1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename micro_thread.h 22 * @info micro thread manager 23 */ 24 25 #ifndef ___MICRO_THREAD_H__ 26 #define ___MICRO_THREAD_H__ 27 28 #include <stdlib.h> 29 #include <stdio.h> 30 #include <unistd.h> 31 #include <sys/types.h> 32 #include <sys/socket.h> 33 #include <sys/ioctl.h> 34 #include <sys/uio.h> 35 #include <sys/time.h> 36 #include <sys/mman.h> 37 #include <sys/resource.h> 38 #include <sys/queue.h> 39 #include <fcntl.h> 40 #include <signal.h> 41 #include <errno.h> 42 #include <setjmp.h> 43 #include <stdarg.h> 44 45 #include <set> 46 #include <vector> 47 #include <queue> 48 #include "heap.h" 49 #include "kqueue_proxy.h" 50 #include "heap_timer.h" 51 52 using std::vector; 53 using std::set; 54 using std::queue; 55 56 namespace NS_MICRO_THREAD { 57 58 #define STACK_PAD_SIZE 128 59 #define MEM_PAGE_SIZE 4096 60 #define DEFAULT_STACK_SIZE STACK_PAD_SIZE * 1024 61 #define DEFAULT_THREAD_NUM 5000 62 #define MAX_THREAD_NUM 800000 63 64 typedef unsigned long long utime64_t; 65 typedef void (*ThreadStart)(void*); 66 67 class ScheduleObj 68 { 69 public: 70 71 static ScheduleObj* Instance (void); 72 73 utime64_t ScheduleGetTime(void); 74 75 void ScheduleThread(void); 76 77 void ScheduleSleep(void); 78 79 void SchedulePend(void); 80 81 void ScheduleUnpend(void* thread); 82 83 void ScheduleReclaim(void); 84 85 void ScheduleStartRun(void); 86 87 private: 88 static ScheduleObj* _instance; 89 }; 90 91 struct MtStack 92 { 93 int _stk_size; 94 int _vaddr_size; 95 char *_vaddr; 96 void *_esp; 97 char *_stk_bottom; 98 char *_stk_top; 99 void *_private; 100 int valgrind_id; 101 }; 102 103 class Thread : public HeapEntry 104 { 105 public: 106 107 explicit Thread(int stack_size = 0); 108 virtual ~Thread(){}; 109 110 virtual void Run(void){}; 111 112 bool Initial(void); 113 114 void Destroy(void); 115 116 void Reset(void); 117 118 void sleep(int ms); 119 120 void Wait(); 121 122 void SwitchContext(void); 123 124 int SaveContext(void); 125 126 void RestoreContext(void); 127 128 utime64_t GetWakeupTime(void) { 129 return _wakeup_time; 130 }; 131 132 void SetWakeupTime(utime64_t waketime) { 133 _wakeup_time = waketime; 134 }; 135 136 void SetPrivate(void *data) 137 { 138 _stack->_private = data; 139 } 140 141 void* GetPrivate() 142 { 143 return _stack->_private; 144 } 145 146 bool CheckStackHealth(char *esp); 147 148 protected: 149 150 virtual void CleanState(void){}; 151 152 virtual bool InitStack(void); 153 154 virtual void FreeStack(void); 155 156 virtual void InitContext(void); 157 158 private: 159 MtStack* _stack; 160 jmp_buf _jmpbuf; 161 int _stack_size; 162 utime64_t _wakeup_time; 163 }; 164 165 class MicroThread : public Thread 166 { 167 public: 168 enum ThreadType 169 { 170 NORMAL = 0, ///< normal thread, no dynamic allocated stack infomations. 171 PRIMORDIAL = 1, ///< primordial thread, created when frame initialized. 172 DAEMON = 2, ///< daemon thread, IO event management and scheduling trigger. 173 SUB_THREAD = 3, ///< sub thread, run simple task. 174 }; 175 176 enum ThreadFlag 177 { 178 NOT_INLIST = 0x0, 179 FREE_LIST = 0x1, 180 IO_LIST = 0x2, 181 SLEEP_LIST = 0x4, 182 RUN_LIST = 0x8, 183 PEND_LIST = 0x10, 184 SUB_LIST = 0x20, 185 186 }; 187 188 enum ThreadState 189 { 190 INITIAL = 0, 191 RUNABLE = 1, 192 RUNNING = 2, 193 SLEEPING = 3, 194 PENDING = 4, 195 }; 196 197 typedef TAILQ_ENTRY(MicroThread) ThreadLink; 198 typedef TAILQ_HEAD(__ThreadSubTailq, MicroThread) SubThreadList; 199 200 public: 201 202 MicroThread(ThreadType type = NORMAL); 203 ~MicroThread(){}; 204 205 ThreadLink _entry; 206 ThreadLink _sub_entry; 207 208 virtual utime64_t HeapValue() { 209 return GetWakeupTime(); 210 }; 211 212 virtual void Run(void); 213 214 void ClearAllFd(void) { 215 TAILQ_INIT(&_fdset); 216 }; 217 void AddFd(KqueuerObj* efpd) { 218 TAILQ_INSERT_TAIL(&_fdset, efpd, _entry); 219 }; 220 void AddFdList(KqObjList* fdset) { 221 TAILQ_CONCAT(&_fdset, fdset, _entry); 222 }; 223 KqObjList& GetFdSet(void) { 224 return _fdset; 225 }; 226 227 void SetType(ThreadType type) { 228 _type = type; 229 }; 230 ThreadType GetType(void) { 231 return _type; 232 }; 233 234 bool IsDaemon(void) { 235 return (DAEMON == _type); 236 }; 237 bool IsPrimo(void) { 238 return (PRIMORDIAL == _type); 239 }; 240 bool IsSubThread(void) { 241 return (SUB_THREAD == _type); 242 }; 243 244 void SetParent(MicroThread* parent) { 245 _parent = parent; 246 }; 247 MicroThread* GetParent() { 248 return _parent; 249 }; 250 void WakeupParent(); 251 252 void AddSubThread(MicroThread* sub); 253 void RemoveSubThread(MicroThread* sub); 254 bool HasNoSubThread(); 255 256 void SetState(ThreadState state) { 257 _state = state; 258 }; 259 ThreadState GetState(void) { 260 return _state; 261 } 262 263 void SetFlag(ThreadFlag flag) { 264 _flag = (ThreadFlag)(_flag | flag); 265 }; 266 void UnsetFlag(ThreadFlag flag) { 267 _flag = (ThreadFlag)(_flag & ~flag); 268 }; 269 bool HasFlag(ThreadFlag flag) { 270 return _flag & flag; 271 }; 272 ThreadFlag GetFlag() { 273 return _flag; 274 }; 275 276 void SetSartFunc(ThreadStart func, void* args) { 277 _start = func; 278 _args = args; 279 }; 280 281 void* GetThreadArgs() { 282 return _args; 283 } 284 285 protected: 286 287 virtual void CleanState(void); 288 289 private: 290 ThreadState _state; 291 ThreadType _type; 292 ThreadFlag _flag; 293 KqObjList _fdset; 294 SubThreadList _sub_list; 295 MicroThread* _parent; 296 ThreadStart _start; 297 void* _args; 298 299 }; 300 typedef std::set<MicroThread*> ThreadSet; 301 typedef std::queue<MicroThread*> ThreadList; 302 303 304 class LogAdapter 305 { 306 public: 307 308 LogAdapter(){}; 309 virtual ~LogAdapter(){}; 310 311 virtual bool CheckDebug(){ return true;}; 312 virtual bool CheckTrace(){ return true;}; 313 virtual bool CheckError(){ return true;}; 314 315 virtual void LogDebug(char* fmt, ...){}; 316 virtual void LogTrace(char* fmt, ...){}; 317 virtual void LogError(char* fmt, ...){}; 318 319 virtual void AttrReportAdd(int attr, int iValue){}; 320 virtual void AttrReportSet(int attr, int iValue){}; 321 322 }; 323 324 class DefaultLogAdapter :public LogAdapter 325 { 326 public: 327 328 329 bool CheckDebug(){ return false;}; 330 bool CheckTrace(){ return false;}; 331 bool CheckError(){ return false;}; 332 333 inline void LogDebug(char* fmt, ...){ 334 va_list args; 335 char szBuff[1024]; 336 va_start(args, fmt); 337 memset(szBuff, 0, sizeof(szBuff)); 338 vsprintf(szBuff, fmt, args); 339 va_end(args); 340 printf("%s\n",szBuff); 341 }; 342 inline void LogTrace(char* fmt, ...){ 343 va_list args; 344 char szBuff[1024]; 345 va_start(args, fmt); 346 memset(szBuff, 0, sizeof(szBuff)); 347 vsprintf(szBuff, fmt, args); 348 va_end(args); 349 printf("%s\n",szBuff); 350 }; 351 inline void LogError(char* fmt, ...){ 352 va_list args; 353 char szBuff[1024]; 354 va_start(args, fmt); 355 memset(szBuff, 0, sizeof(szBuff)); 356 vsprintf(szBuff, fmt, args); 357 va_end(args); 358 printf("%s\n",szBuff); 359 }; 360 361 }; 362 363 class ThreadPool 364 { 365 public: 366 367 static unsigned int default_thread_num; 368 static unsigned int last_default_thread_num; 369 static unsigned int default_stack_size; 370 371 static void SetDefaultThreadNum(unsigned int num) { 372 default_thread_num = num; 373 }; 374 375 static void SetDefaultStackSize(unsigned int size) { 376 default_stack_size = (size + MEM_PAGE_SIZE - 1) / MEM_PAGE_SIZE * MEM_PAGE_SIZE; 377 }; 378 379 bool InitialPool(int max_num); 380 381 void DestroyPool (void); 382 383 MicroThread* AllocThread(void); 384 385 void FreeThread(MicroThread* thread); 386 387 int GetUsedNum(void); 388 389 private: 390 ThreadList _freelist; 391 int _total_num; 392 int _use_num; 393 int _max_num; 394 }; 395 396 typedef TAILQ_HEAD(__ThreadTailq, MicroThread) ThreadTailq; 397 398 class MtFrame : public KqueueProxy, public ThreadPool 399 { 400 private: 401 static MtFrame* _instance; 402 LogAdapter* _log_adpt; 403 ThreadList _runlist; 404 ThreadTailq _iolist; 405 ThreadTailq _pend_list; 406 HeapList _sleeplist; 407 MicroThread* _daemon; 408 MicroThread* _primo; 409 MicroThread* _curr_thread; 410 utime64_t _last_clock; 411 int _waitnum; 412 CTimerMng* _timer; 413 int _realtime; 414 415 public: 416 friend class ScheduleObj; 417 418 public: 419 420 static MtFrame* Instance (void); 421 422 static int sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout); 423 424 static int recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout); 425 426 static int connect(int fd, const struct sockaddr *addr, int addrlen, int timeout); 427 428 static int accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout); 429 430 static ssize_t read(int fd, void *buf, size_t nbyte, int timeout); 431 432 static ssize_t write(int fd, const void *buf, size_t nbyte, int timeout); 433 434 static int recv(int fd, void *buf, int len, int flags, int timeout); 435 436 static ssize_t send(int fd, const void *buf, size_t nbyte, int flags, int timeout); 437 438 static void sleep(int ms); 439 440 static int WaitEvents(int fd, int events, int timeout); 441 442 static MicroThread* CreateThread(ThreadStart entry, void *args, bool runable = true); 443 444 static void DaemonRun(void* args); 445 static int Loop(void* args); 446 447 MicroThread *GetRootThread(); 448 449 bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = MAX_THREAD_NUM); 450 451 void SetHookFlag(); 452 453 void Destroy (void); 454 455 char* Version(void); 456 457 utime64_t GetLastClock(void) { 458 if(_realtime) 459 { 460 return GetSystemMS(); 461 } 462 return _last_clock; 463 }; 464 465 MicroThread* GetActiveThread(void) { 466 return _curr_thread; 467 }; 468 469 int RunWaitNum(void) { 470 return _waitnum; 471 }; 472 473 LogAdapter* GetLogAdpt(void) { 474 return _log_adpt; 475 }; 476 477 CTimerMng* GetTimerMng(void) { 478 return _timer; 479 }; 480 481 virtual int KqueueGetTimeout(void); 482 483 virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout); 484 485 void WaitNotify(utime64_t timeout); 486 487 void NotifyThread(MicroThread* thread); 488 489 void SwapDaemonThread(); 490 491 void RemoveIoWait(MicroThread* thread); 492 493 void InsertRunable(MicroThread* thread); 494 495 void InsertPend(MicroThread* thread); 496 497 void RemovePend(MicroThread* thread); 498 499 void SetRealTime(int realtime_) 500 { 501 _realtime =realtime_; 502 } 503 private: 504 505 MtFrame():_realtime(1){ _curr_thread = NULL; }; 506 507 MicroThread* DaemonThread(void){ 508 return _daemon; 509 }; 510 511 void ThreadSchdule(void); 512 513 void CheckExpired(); 514 515 void WakeupTimeout(void); 516 517 void SetLastClock(utime64_t clock) { 518 _last_clock = clock; 519 }; 520 521 void SetActiveThread(MicroThread* thread) { 522 _curr_thread = thread; 523 }; 524 525 utime64_t GetSystemMS(void) { 526 struct timeval tv; 527 gettimeofday(&tv, NULL); 528 return (tv.tv_sec * 1000ULL + tv.tv_usec / 1000ULL); 529 }; 530 531 void InsertSleep(MicroThread* thread); 532 533 void RemoveSleep(MicroThread* thread); 534 535 void InsertIoWait(MicroThread* thread); 536 537 void RemoveRunable(MicroThread* thread); 538 539 }; 540 541 #define MTLOG_DEBUG(fmt, args...) \ 542 do { \ 543 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 544 if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckDebug()) \ 545 { \ 546 fm->GetLogAdpt()->LogDebug((char*)"[%-10s][%-4d][%-10s]"fmt, \ 547 __FILE__, __LINE__, __FUNCTION__, ##args); \ 548 } \ 549 } while (0) 550 551 #define MTLOG_TRACE(fmt, args...) \ 552 do { \ 553 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 554 if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckTrace()) \ 555 { \ 556 fm->GetLogAdpt()->LogTrace((char*)"[%-10s][%-4d][%-10s]"fmt, \ 557 __FILE__, __LINE__, __FUNCTION__, ##args); \ 558 } \ 559 } while (0) 560 561 #define MTLOG_ERROR(fmt, args...) \ 562 do { \ 563 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 564 if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckError()) \ 565 { \ 566 fm->GetLogAdpt()->LogError((char*)"[%-10s][%-4d][%-10s]"fmt, \ 567 __FILE__, __LINE__, __FUNCTION__, ##args); \ 568 } \ 569 } while (0) 570 571 #define MT_ATTR_API(ATTR, VALUE) \ 572 do { \ 573 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 574 if (fm && fm->GetLogAdpt()) \ 575 { \ 576 fm->GetLogAdpt()->AttrReportAdd(ATTR, VALUE); \ 577 } \ 578 } while (0) 579 580 #define MT_ATTR_API_SET(ATTR, VALUE) \ 581 do { \ 582 register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \ 583 if (fm && fm->GetLogAdpt()) \ 584 { \ 585 fm->GetLogAdpt()->AttrReportSet(ATTR, VALUE); \ 586 } \ 587 } while (0) 588 589 590 591 }// NAMESPACE NS_MICRO_THREAD 592 593 #endif 594 595