1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename micro_thread.h 22 * @info micro thread manager 23 */ 24 25 #ifndef ___MICRO_THREAD_H__ 26 #define ___MICRO_THREAD_H__ 27 28 #include <stdlib.h> 29 #include <stdio.h> 30 #include <unistd.h> 31 #include <sys/types.h> 32 #include <sys/socket.h> 33 #include <sys/ioctl.h> 34 #include <sys/uio.h> 35 #include <sys/time.h> 36 #include <sys/mman.h> 37 #include <sys/resource.h> 38 #include <sys/queue.h> 39 #include <fcntl.h> 40 #include <signal.h> 41 #include <errno.h> 42 #include <setjmp.h> 43 44 #include <set> 45 #include <vector> 46 #include <queue> 47 #include "heap.h" 48 #include "kqueue_proxy.h" 49 #include "heap_timer.h" 50 51 using std::vector; 52 using std::set; 53 using std::queue; 54 55 namespace NS_MICRO_THREAD { 56 57 #define STACK_PAD_SIZE 128 58 #define MEM_PAGE_SIZE 4096 59 #define DEFAULT_STACK_SIZE 128*1024 60 #define DEFAULT_THREAD_NUM 2000 61 62 typedef unsigned long long utime64_t; 63 typedef void (*ThreadStart)(void*); 64 65 class ScheduleObj 66 { 67 public: 68 69 static ScheduleObj* Instance (void); 70 71 utime64_t ScheduleGetTime(void); 72 73 void ScheduleThread(void); 74 75 void ScheduleSleep(void); 76 77 void SchedulePend(void); 78 79 void ScheduleUnpend(void* thread); 80 81 void ScheduleReclaim(void); 82 83 void ScheduleStartRun(void); 84 85 private: 86 static ScheduleObj* _instance; 87 }; 88 89 struct MtStack 90 { 91 int _stk_size; 92 int _vaddr_size; 93 char *_vaddr; 94 void *_esp; 95 char *_stk_bottom; 96 char *_stk_top; 97 void *_private; 98 int valgrind_id; 99 }; 100 101 class Thread : public HeapEntry 102 { 103 public: 104 105 explicit Thread(int stack_size = 0); 106 virtual ~Thread(){}; 107 108 virtual void Run(void){}; 109 110 bool Initial(void); 111 112 void Destroy(void); 113 114 void Reset(void); 115 116 void sleep(int ms); 117 118 void Wait(); 119 120 void SwitchContext(void); 121 122 void RestoreContext(void); 123 124 utime64_t GetWakeupTime(void) { 125 return _wakeup_time; 126 }; 127 128 void SetWakeupTime(utime64_t waketime) { 129 _wakeup_time = waketime; 130 }; 131 132 void SetPrivate(void *data) 133 { 134 _stack->_private = data; 135 } 136 137 void* GetPrivate() 138 { 139 return _stack->_private; 140 } 141 142 bool CheckStackHealth(char *esp); 143 144 protected: 145 146 virtual void CleanState(void){}; 147 148 virtual bool InitStack(void); 149 150 virtual void FreeStack(void); 151 152 virtual void InitContext(void); 153 154 private: 155 MtStack* _stack; 156 jmp_buf _jmpbuf; 157 int _stack_size; 158 utime64_t _wakeup_time; 159 }; 160 161 class MicroThread : public Thread 162 { 163 public: 164 enum ThreadType 165 { 166 NORMAL = 0, ///< normal thread, no dynamic allocated stack infomations. 167 PRIMORDIAL = 1, ///< primordial thread, created when frame initialized. 168 DAEMON = 2, ///< daemon thread, IO event management and scheduling trigger. 169 SUB_THREAD = 3, ///< sub thread, run simple task. 170 }; 171 172 enum ThreadFlag 173 { 174 NOT_INLIST = 0x0, 175 FREE_LIST = 0x1, 176 IO_LIST = 0x2, 177 SLEEP_LIST = 0x4, 178 RUN_LIST = 0x8, 179 PEND_LIST = 0x10, 180 SUB_LIST = 0x20, 181 182 }; 183 184 enum ThreadState 185 { 186 INITIAL = 0, 187 RUNABLE = 1, 188 RUNNING = 2, 189 SLEEPING = 3, 190 PENDING = 4, 191 }; 192 193 typedef TAILQ_ENTRY(MicroThread) ThreadLink; 194 typedef TAILQ_HEAD(__ThreadSubTailq, MicroThread) SubThreadList; 195 196 public: 197 198 MicroThread(ThreadType type = NORMAL); 199 ~MicroThread(){}; 200 201 ThreadLink _entry; 202 ThreadLink _sub_entry; 203 204 virtual utime64_t HeapValue() { 205 return GetWakeupTime(); 206 }; 207 208 virtual void Run(void); 209 210 void ClearAllFd(void) { 211 TAILQ_INIT(&_fdset); 212 }; 213 void AddFd(KqueuerObj* efpd) { 214 TAILQ_INSERT_TAIL(&_fdset, efpd, _entry); 215 }; 216 void AddFdList(KqObjList* fdset) { 217 TAILQ_CONCAT(&_fdset, fdset, _entry); 218 }; 219 KqObjList& GetFdSet(void) { 220 return _fdset; 221 }; 222 223 void SetType(ThreadType type) { 224 _type = type; 225 }; 226 ThreadType GetType(void) { 227 return _type; 228 }; 229 230 bool IsDaemon(void) { 231 return (DAEMON == _type); 232 }; 233 bool IsPrimo(void) { 234 return (PRIMORDIAL == _type); 235 }; 236 bool IsSubThread(void) { 237 return (SUB_THREAD == _type); 238 }; 239 240 void SetParent(MicroThread* parent) { 241 _parent = parent; 242 }; 243 MicroThread* GetParent() { 244 return _parent; 245 }; 246 void WakeupParent(); 247 248 void AddSubThread(MicroThread* sub); 249 void RemoveSubThread(MicroThread* sub); 250 bool HasNoSubThread(); 251 252 void SetState(ThreadState state) { 253 _state = state; 254 }; 255 ThreadState GetState(void) { 256 return _state; 257 } 258 259 void SetFlag(ThreadFlag flag) { 260 _flag = (ThreadFlag)(_flag | flag); 261 }; 262 void UnsetFlag(ThreadFlag flag) { 263 _flag = (ThreadFlag)(_flag & ~flag); 264 }; 265 bool HasFlag(ThreadFlag flag) { 266 return _flag & flag; 267 }; 268 ThreadFlag GetFlag() { 269 return _flag; 270 }; 271 272 void SetSartFunc(ThreadStart func, void* args) { 273 _start = func; 274 _args = args; 275 }; 276 277 void* GetThreadArgs() { 278 return _args; 279 } 280 281 protected: 282 283 virtual void CleanState(void); 284 285 private: 286 ThreadState _state; 287 ThreadType _type; 288 ThreadFlag _flag; 289 KqObjList _fdset; 290 SubThreadList _sub_list; 291 MicroThread* _parent; 292 ThreadStart _start; 293 void* _args; 294 295 }; 296 typedef std::set<MicroThread*> ThreadSet; 297 typedef std::queue<MicroThread*> ThreadList; 298 299 300 class LogAdapter 301 { 302 public: 303 304 LogAdapter(){}; 305 virtual ~LogAdapter(){}; 306 307 virtual bool CheckDebug(){ return true;}; 308 virtual bool CheckTrace(){ return true;}; 309 virtual bool CheckError(){ return true;}; 310 311 virtual void LogDebug(char* fmt, ...){}; 312 virtual void LogTrace(char* fmt, ...){}; 313 virtual void LogError(char* fmt, ...){}; 314 315 virtual void AttrReportAdd(int attr, int iValue){}; 316 virtual void AttrReportSet(int attr, int iValue){}; 317 318 }; 319 320 321 class ThreadPool 322 { 323 public: 324 325 static unsigned int default_thread_num; 326 static unsigned int default_stack_size; 327 328 static void SetDefaultThreadNum(unsigned int num) { 329 default_thread_num = num; 330 }; 331 332 static void SetDefaultStackSize(unsigned int size) { 333 default_stack_size = (size + MEM_PAGE_SIZE - 1) / MEM_PAGE_SIZE * MEM_PAGE_SIZE; 334 }; 335 336 bool InitialPool(int max_num); 337 338 void DestroyPool (void); 339 340 MicroThread* AllocThread(void); 341 342 void FreeThread(MicroThread* thread); 343 344 int GetUsedNum(void); 345 346 private: 347 ThreadList _freelist; 348 int _total_num; 349 int _use_num; 350 int _max_num; 351 }; 352 353 typedef TAILQ_HEAD(__ThreadTailq, MicroThread) ThreadTailq; 354 355 class MtFrame : public KqueueProxy, public ThreadPool 356 { 357 private: 358 static MtFrame* _instance; 359 LogAdapter* _log_adpt; 360 ThreadList _runlist; 361 ThreadTailq _iolist; 362 ThreadTailq _pend_list; 363 HeapList _sleeplist; 364 MicroThread* _daemon; 365 MicroThread* _primo; 366 MicroThread* _curr_thread; 367 utime64_t _last_clock; 368 int _waitnum; 369 CTimerMng* _timer; 370 int _realtime; 371 372 public: 373 friend class ScheduleObj; 374 375 public: 376 377 static MtFrame* Instance (void); 378 379 static int sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout); 380 381 static int recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout); 382 383 static int connect(int fd, const struct sockaddr *addr, int addrlen, int timeout); 384 385 static int accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout); 386 387 static ssize_t read(int fd, void *buf, size_t nbyte, int timeout); 388 389 static ssize_t write(int fd, const void *buf, size_t nbyte, int timeout); 390 391 static int recv(int fd, void *buf, int len, int flags, int timeout); 392 393 static ssize_t send(int fd, const void *buf, size_t nbyte, int flags, int timeout); 394 395 static void sleep(int ms); 396 397 static int WaitEvents(int fd, int events, int timeout); 398 399 static MicroThread* CreateThread(ThreadStart entry, void *args, bool runable = true); 400 401 static void DaemonRun(void* args); 402 static int Loop(void* args); 403 404 MicroThread *GetRootThread(); 405 406 bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = 50000); 407 408 void SetHookFlag(); 409 410 void Destroy (void); 411 412 char* Version(void); 413 414 utime64_t GetLastClock(void) { 415 if(_realtime) 416 { 417 return GetSystemMS(); 418 } 419 return _last_clock; 420 }; 421 422 MicroThread* GetActiveThread(void) { 423 return _curr_thread; 424 }; 425 426 int RunWaitNum(void) { 427 return _waitnum; 428 }; 429 430 LogAdapter* GetLogAdpt(void) { 431 return _log_adpt; 432 }; 433 434 CTimerMng* GetTimerMng(void) { 435 return _timer; 436 }; 437 438 virtual int KqueueGetTimeout(void); 439 440 virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout); 441 442 void WaitNotify(utime64_t timeout); 443 444 void RemoveIoWait(MicroThread* thread); 445 446 void InsertRunable(MicroThread* thread); 447 448 void InsertPend(MicroThread* thread); 449 450 void RemovePend(MicroThread* thread); 451 452 void SetRealTime(int realtime_) 453 { 454 _realtime =realtime_; 455 } 456 private: 457 458 MtFrame():_realtime(1){ _curr_thread = NULL; }; 459 460 MicroThread* DaemonThread(void){ 461 return _daemon; 462 }; 463 464 void ThreadSchdule(void); 465 466 void CheckExpired(); 467 468 void WakeupTimeout(void); 469 470 void SetLastClock(utime64_t clock) { 471 _last_clock = clock; 472 }; 473 474 void SetActiveThread(MicroThread* thread) { 475 _curr_thread = thread; 476 }; 477 478 utime64_t GetSystemMS(void) { 479 struct timeval tv; 480 gettimeofday(&tv, NULL); 481 return (tv.tv_sec * 1000ULL + tv.tv_usec / 1000ULL); 482 }; 483 484 void InsertSleep(MicroThread* thread); 485 486 void RemoveSleep(MicroThread* thread); 487 488 void InsertIoWait(MicroThread* thread); 489 490 void RemoveRunable(MicroThread* thread); 491 492 }; 493 494 #define MTLOG_DEBUG(fmt, args...) \ 495 do { \ 496 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 497 if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckDebug()) \ 498 { \ 499 fm->GetLogAdpt()->LogDebug((char*)"[%-10s][%-4d][%-10s]"fmt, \ 500 __FILE__, __LINE__, __FUNCTION__, ##args); \ 501 } \ 502 } while (0) 503 504 #define MTLOG_TRACE(fmt, args...) \ 505 do { \ 506 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 507 if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckTrace()) \ 508 { \ 509 fm->GetLogAdpt()->LogTrace((char*)"[%-10s][%-4d][%-10s]"fmt, \ 510 __FILE__, __LINE__, __FUNCTION__, ##args); \ 511 } \ 512 } while (0) 513 514 #define MTLOG_ERROR(fmt, args...) \ 515 do { \ 516 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 517 if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckError()) \ 518 { \ 519 fm->GetLogAdpt()->LogError((char*)"[%-10s][%-4d][%-10s]"fmt, \ 520 __FILE__, __LINE__, __FUNCTION__, ##args); \ 521 } \ 522 } while (0) 523 524 #define MT_ATTR_API(ATTR, VALUE) \ 525 do { \ 526 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 527 if (fm && fm->GetLogAdpt()) \ 528 { \ 529 fm->GetLogAdpt()->AttrReportAdd(ATTR, VALUE); \ 530 } \ 531 } while (0) 532 533 #define MT_ATTR_API_SET(ATTR, VALUE) \ 534 do { \ 535 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 536 if (fm && fm->GetLogAdpt()) \ 537 { \ 538 fm->GetLogAdpt()->AttrReportSet(ATTR, VALUE); \ 539 } \ 540 } while (0) 541 542 543 544 }// NAMESPACE NS_MICRO_THREAD 545 546 #endif 547 548