xref: /f-stack/app/micro_thread/micro_thread.h (revision 4418919f)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename micro_thread.h
22  *  @info  micro thread manager
23  */
24 
25 #ifndef ___MICRO_THREAD_H__
26 #define ___MICRO_THREAD_H__
27 
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <unistd.h>
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <sys/ioctl.h>
34 #include <sys/uio.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <sys/resource.h>
38 #include <sys/queue.h>
39 #include <fcntl.h>
40 #include <signal.h>
41 #include <errno.h>
42 #include <setjmp.h>
43 #include <stdarg.h>
44 
45 #include <set>
46 #include <vector>
47 #include <queue>
48 #include "heap.h"
49 #include "kqueue_proxy.h"
50 #include "heap_timer.h"
51 
52 using std::vector;
53 using std::set;
54 using std::queue;
55 
56 namespace NS_MICRO_THREAD {
57 
/*
 * Sizing constants for the micro-thread framework.
 *
 * STACK_PAD_SIZE     - multiplier used to derive DEFAULT_STACK_SIZE
 *                      (presumably also a stack padding amount; confirm
 *                      against the implementation file).
 * MEM_PAGE_SIZE      - granularity that ThreadPool::SetDefaultStackSize()
 *                      rounds stack sizes up to.
 * DEFAULT_STACK_SIZE - STACK_PAD_SIZE * 1024 (= 131072). The expansion is
 *                      parenthesised so the macro behaves as a single value
 *                      inside larger expressions such as `x / DEFAULT_STACK_SIZE`.
 * DEFAULT_THREAD_NUM - presumably the initial pool population; confirm
 *                      against the implementation file.
 * MAX_THREAD_NUM     - default upper bound passed to MtFrame::InitFrame().
 */
#define STACK_PAD_SIZE      128
#define MEM_PAGE_SIZE       4096
#define DEFAULT_STACK_SIZE  (STACK_PAD_SIZE * 1024)
#define DEFAULT_THREAD_NUM  5000
#define MAX_THREAD_NUM      800000
63 
/// 64-bit unsigned time value; MtFrame::GetSystemMS() returns milliseconds in this type.
typedef unsigned long long utime64_t;

/// Entry-point signature for a micro thread: receives one opaque argument.
typedef void (*ThreadStart)(void*);
66 
67 class ScheduleObj
68 {
69 public:
70 
71     static ScheduleObj* Instance (void);
72 
73     utime64_t ScheduleGetTime(void);
74 
75     void ScheduleThread(void);
76 
77     void ScheduleSleep(void);
78 
79     void SchedulePend(void);
80 
81     void ScheduleUnpend(void* thread);
82 
83     void ScheduleReclaim(void);
84 
85     void ScheduleStartRun(void);
86 
87 private:
88     static ScheduleObj* _instance;
89 };
90 
/**
 * @brief Bookkeeping record for one micro-thread stack.
 *
 * Plain aggregate; field order is kept as-is because code outside this
 * header may depend on the layout. Field meanings below are inferred from
 * the names unless noted -- confirm against Thread::InitStack().
 */
struct MtStack
{
    int   _stk_size;     ///< usable stack size (presumably bytes)
    int   _vaddr_size;   ///< size of the whole mapped region (presumably bytes)
    char* _vaddr;        ///< base address of the mapped region
    void* _esp;          ///< stack pointer value for the context
    char* _stk_bottom;   ///< one end of the usable stack range
    char* _stk_top;      ///< the other end of the usable stack range
    void* _private;      ///< user data slot, accessed via Thread::SetPrivate()/GetPrivate()
    int   valgrind_id;   ///< presumably the id from valgrind stack registration
};
102 
103 class Thread : public  HeapEntry
104 {
105 public:
106 
107     explicit Thread(int stack_size = 0);
108     virtual ~Thread(){};
109 
110     virtual void Run(void){};
111 
112     bool Initial(void);
113 
114     void Destroy(void);
115 
116     void Reset(void);
117 
118     void sleep(int ms);
119 
120     void Wait();
121 
122     void SwitchContext(void);
123 
124     int SaveContext(void);
125 
126     void RestoreContext(void);
127 
128     utime64_t GetWakeupTime(void) {
129         return _wakeup_time;
130     };
131 
132     void SetWakeupTime(utime64_t waketime) {
133         _wakeup_time = waketime;
134     };
135 
136     void SetPrivate(void *data)
137     {
138         _stack->_private = data;
139     }
140 
141     void* GetPrivate()
142     {
143         return _stack->_private;
144     }
145 
146     bool CheckStackHealth(char *esp);
147 
148 protected:
149 
150     virtual void CleanState(void){};
151 
152     virtual bool InitStack(void);
153 
154     virtual void FreeStack(void);
155 
156     virtual void InitContext(void);
157 
158 private:
159     MtStack* _stack;
160     jmp_buf _jmpbuf;
161     int _stack_size;
162     utime64_t _wakeup_time;
163 };
164 
165 class MicroThread : public Thread
166 {
167 public:
168     enum ThreadType
169     {
170         NORMAL          =   0,   ///< normal thread, no dynamic allocated stack infomations.
171         PRIMORDIAL      =   1,   ///< primordial thread, created when frame initialized.
172         DAEMON          =   2,   ///< daemon thread, IO event management and scheduling trigger.
173         SUB_THREAD      =   3,   ///< sub thread, run simple task.
174     };
175 
176     enum ThreadFlag
177     {
178         NOT_INLIST    =  0x0,
179         FREE_LIST    =  0x1,
180         IO_LIST        =  0x2,
181         SLEEP_LIST    =  0x4,
182         RUN_LIST    =  0x8,
183         PEND_LIST   =  0x10,
184         SUB_LIST    =  0x20,
185 
186     };
187 
188     enum ThreadState
189     {
190         INITIAL         =  0,
191         RUNABLE         =  1,
192         RUNNING         =  2,
193         SLEEPING        =  3,
194         PENDING         =  4,
195     };
196 
197     typedef TAILQ_ENTRY(MicroThread) ThreadLink;
198     typedef TAILQ_HEAD(__ThreadSubTailq, MicroThread) SubThreadList;
199 
200 public:
201 
202     MicroThread(ThreadType type = NORMAL);
203     ~MicroThread(){};
204 
205     ThreadLink _entry;
206     ThreadLink _sub_entry;
207 
208     virtual utime64_t HeapValue() {
209         return GetWakeupTime();
210     };
211 
212     virtual void Run(void);
213 
214     void ClearAllFd(void) {
215         TAILQ_INIT(&_fdset);
216     };
217     void AddFd(KqueuerObj* efpd) {
218         TAILQ_INSERT_TAIL(&_fdset, efpd, _entry);
219     };
220     void AddFdList(KqObjList* fdset) {
221         TAILQ_CONCAT(&_fdset, fdset, _entry);
222     };
223     KqObjList& GetFdSet(void) {
224         return _fdset;
225     };
226 
227     void SetType(ThreadType type) {
228         _type = type;
229     };
230     ThreadType GetType(void) {
231         return _type;
232     };
233 
234     bool IsDaemon(void) {
235         return (DAEMON == _type);
236     };
237     bool IsPrimo(void) {
238         return (PRIMORDIAL == _type);
239     };
240     bool IsSubThread(void) {
241         return (SUB_THREAD == _type);
242     };
243 
244     void SetParent(MicroThread* parent) {
245         _parent = parent;
246     };
247     MicroThread* GetParent() {
248         return _parent;
249     };
250     void WakeupParent();
251 
252     void AddSubThread(MicroThread* sub);
253     void RemoveSubThread(MicroThread* sub);
254     bool HasNoSubThread();
255 
256     void SetState(ThreadState state) {
257         _state = state;
258     };
259     ThreadState GetState(void) {
260         return _state;
261     }
262 
263     void SetFlag(ThreadFlag flag) {
264     _flag = (ThreadFlag)(_flag | flag);
265     };
266     void UnsetFlag(ThreadFlag flag) {
267         _flag = (ThreadFlag)(_flag & ~flag);
268     };
269     bool HasFlag(ThreadFlag flag) {
270     return _flag & flag;
271     };
272     ThreadFlag GetFlag() {
273         return _flag;
274     };
275 
276     void SetSartFunc(ThreadStart func, void* args) {
277         _start = func;
278         _args  = args;
279     };
280 
281     void* GetThreadArgs() {
282         return _args;
283     }
284 
285 protected:
286 
287     virtual void CleanState(void);
288 
289 private:
290     ThreadState _state;
291     ThreadType _type;
292     ThreadFlag _flag;
293     KqObjList _fdset;
294     SubThreadList _sub_list;
295     MicroThread* _parent;
296     ThreadStart _start;
297     void* _args;
298 
299 };
300 typedef std::set<MicroThread*> ThreadSet;
301 typedef std::queue<MicroThread*> ThreadList;
302 
303 
/**
 * @brief Logging and attribute-reporting hook interface.
 *
 * The MTLOG_* and MT_ATTR_* macros call these virtuals on the adapter that
 * MtFrame::GetLogAdpt() returns. The base implementation enables every
 * level and discards every message / report.
 */
class LogAdapter
{
public:

    LogAdapter() {}
    virtual ~LogAdapter() {}

    /// Level gates; the MTLOG_* macros consult these before formatting.
    virtual bool CheckDebug() { return true; }
    virtual bool CheckTrace() { return true; }
    virtual bool CheckError() { return true; }

    /// printf-style sinks; no-ops in the base class.
    virtual void LogDebug(char* fmt, ...) {}
    virtual void LogTrace(char* fmt, ...) {}
    virtual void LogError(char* fmt, ...) {}

    /// Attribute reporting hooks; no-ops in the base class.
    virtual void AttrReportAdd(int attr, int iValue) {}
    virtual void AttrReportSet(int attr, int iValue) {}

};

/**
 * @brief Adapter that reports every level as disabled and, when a Log*
 *        method is called directly, prints the formatted line to stdout.
 *
 * Formatting is bounded: a message longer than the internal buffer is
 * truncated instead of overrunning the stack buffer.
 */
class DefaultLogAdapter : public LogAdapter
{
public:

    bool CheckDebug() { return false; }
    bool CheckTrace() { return false; }
    bool CheckError() { return false; }

    /// Formats @p fmt with the variadic arguments and prints it plus '\n'.
    void LogDebug(char* fmt, ...) {
        va_list args;
        va_start(args, fmt);
        PrintLine(fmt, args);
        va_end(args);
    }

    /// Formats @p fmt with the variadic arguments and prints it plus '\n'.
    void LogTrace(char* fmt, ...) {
        va_list args;
        va_start(args, fmt);
        PrintLine(fmt, args);
        va_end(args);
    }

    /// Formats @p fmt with the variadic arguments and prints it plus '\n'.
    void LogError(char* fmt, ...) {
        va_list args;
        va_start(args, fmt);
        PrintLine(fmt, args);
        va_end(args);
    }

private:

    enum { LINE_BUFF_SIZE = 1024 };   ///< same capacity the adapter always used

    /// Shared formatter for the three Log* methods.
    /// Writes at most LINE_BUFF_SIZE - 1 characters, then a newline.
    static void PrintLine(const char* fmt, va_list args) {
        if (fmt == NULL) {
            return;   // nothing sensible to print without a format string
        }
        char szBuff[LINE_BUFF_SIZE];
        szBuff[0] = '\0';
        // vsnprintf never writes past sizeof(szBuff) and always terminates.
        if (vsnprintf(szBuff, sizeof(szBuff), fmt, args) < 0) {
            szBuff[0] = '\0';   // formatting error: emit an empty line
        }
        printf("%s\n", szBuff);
    }

};
362 
363 class ThreadPool
364 {
365 public:
366 
367     static unsigned int default_thread_num;
368     static unsigned int last_default_thread_num;
369     static unsigned int default_stack_size;
370 
371     static void SetDefaultThreadNum(unsigned int num) {
372         default_thread_num = num;
373     };
374 
375     static void SetDefaultStackSize(unsigned int size) {
376         default_stack_size = (size + MEM_PAGE_SIZE - 1) / MEM_PAGE_SIZE * MEM_PAGE_SIZE;
377     };
378 
379     bool InitialPool(int max_num);
380 
381     void DestroyPool (void);
382 
383     MicroThread* AllocThread(void);
384 
385     void FreeThread(MicroThread* thread);
386 
387     int GetUsedNum(void);
388 
389 private:
390     ThreadList      _freelist;
391     int             _total_num;
392     int             _use_num;
393     int             _max_num;
394 };
395 
396 typedef TAILQ_HEAD(__ThreadTailq, MicroThread) ThreadTailq;
397 
398 class MtFrame : public KqueueProxy, public ThreadPool
399 {
400 private:
401     static MtFrame* _instance;
402     LogAdapter*     _log_adpt;
403     ThreadList      _runlist;
404     ThreadTailq     _iolist;
405     ThreadTailq     _pend_list;
406     HeapList        _sleeplist;
407     MicroThread*    _daemon;
408     MicroThread*    _primo;
409     MicroThread*    _curr_thread;
410     utime64_t       _last_clock;
411     int             _waitnum;
412     CTimerMng*      _timer;
413     int             _realtime;
414 
415 public:
416     friend class ScheduleObj;
417 
418 public:
419 
420     static MtFrame* Instance (void);
421 
422     static int sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout);
423 
424     static int recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout);
425 
426     static int connect(int fd, const struct sockaddr *addr, int addrlen, int timeout);
427 
428     static int accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout);
429 
430     static ssize_t read(int fd, void *buf, size_t nbyte, int timeout);
431 
432     static ssize_t write(int fd, const void *buf, size_t nbyte, int timeout);
433 
434     static int recv(int fd, void *buf, int len, int flags, int timeout);
435 
436     static ssize_t send(int fd, const void *buf, size_t nbyte, int flags, int timeout);
437 
438     static void sleep(int ms);
439 
440     static int WaitEvents(int fd, int events, int timeout);
441 
442     static MicroThread* CreateThread(ThreadStart entry, void *args, bool runable = true);
443 
444     static void DaemonRun(void* args);
445     static int Loop(void* args);
446 
447     MicroThread *GetRootThread();
448 
449     bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = MAX_THREAD_NUM);
450 
451     void SetHookFlag();
452 
453     void Destroy (void);
454 
455     char* Version(void);
456 
457     utime64_t GetLastClock(void) {
458         if(_realtime)
459         {
460             return GetSystemMS();
461         }
462         return _last_clock;
463     };
464 
465     MicroThread* GetActiveThread(void) {
466         return _curr_thread;
467     };
468 
469     int RunWaitNum(void) {
470         return _waitnum;
471     };
472 
473     LogAdapter* GetLogAdpt(void) {
474         return _log_adpt;
475     };
476 
477     CTimerMng* GetTimerMng(void) {
478         return _timer;
479     };
480 
481     virtual int KqueueGetTimeout(void);
482 
483     virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout);
484 
485     void WaitNotify(utime64_t timeout);
486 
487     void NotifyThread(MicroThread* thread);
488 
489     void SwapDaemonThread();
490 
491     void RemoveIoWait(MicroThread* thread);
492 
493     void InsertRunable(MicroThread* thread);
494 
495     void InsertPend(MicroThread* thread);
496 
497     void RemovePend(MicroThread* thread);
498 
499     void SetRealTime(int realtime_)
500     {
501         _realtime =realtime_;
502     }
503 private:
504 
505     MtFrame():_realtime(1){ _curr_thread = NULL; };
506 
507     MicroThread* DaemonThread(void){
508         return _daemon;
509     };
510 
511     void ThreadSchdule(void);
512 
513     void CheckExpired();
514 
515     void WakeupTimeout(void);
516 
517     void SetLastClock(utime64_t clock) {
518         _last_clock = clock;
519     };
520 
521     void SetActiveThread(MicroThread* thread) {
522         _curr_thread = thread;
523     };
524 
525     utime64_t GetSystemMS(void) {
526         struct timeval tv;
527         gettimeofday(&tv, NULL);
528         return (tv.tv_sec * 1000ULL + tv.tv_usec / 1000ULL);
529     };
530 
531     void InsertSleep(MicroThread* thread);
532 
533     void RemoveSleep(MicroThread* thread);
534 
535     void InsertIoWait(MicroThread* thread);
536 
537     void RemoveRunable(MicroThread* thread);
538 
539 };
540 
/*
 * MTLOG_DEBUG(fmt, args...): send a printf-style debug message to the
 * frame's LogAdapter, prefixed with file, line and function. Nothing is
 * formatted unless a frame and adapter exist and CheckDebug() is true.
 * fmt must be a string literal (it is concatenated with the prefix).
 * No `register` (ill-formed since C++17); a space separates the prefix
 * literal from fmt so C++11 does not lex it as a literal suffix; the local
 * has a macro-specific name so it cannot shadow a caller variable.
 */
#define MTLOG_DEBUG(fmt, args...)                                              \
do {                                                                           \
       NS_MICRO_THREAD::MtFrame *mt_log_fm_ = NS_MICRO_THREAD::MtFrame::Instance(); \
       if (mt_log_fm_ && mt_log_fm_->GetLogAdpt() && mt_log_fm_->GetLogAdpt()->CheckDebug()) \
       {                                                                       \
          mt_log_fm_->GetLogAdpt()->LogDebug((char*)"[%-10s][%-4d][%-10s]" fmt, \
                __FILE__, __LINE__, __FUNCTION__, ##args);                     \
       }                                                                       \
} while (0)
550 
/*
 * MTLOG_TRACE(fmt, args...): send a printf-style trace message to the
 * frame's LogAdapter, prefixed with file, line and function. Nothing is
 * formatted unless a frame and adapter exist and CheckTrace() is true.
 * fmt must be a string literal (it is concatenated with the prefix).
 * No `register` (ill-formed since C++17); a space separates the prefix
 * literal from fmt so C++11 does not lex it as a literal suffix; the local
 * has a macro-specific name so it cannot shadow a caller variable.
 */
#define MTLOG_TRACE(fmt, args...)                                              \
do {                                                                           \
       NS_MICRO_THREAD::MtFrame *mt_log_fm_ = NS_MICRO_THREAD::MtFrame::Instance(); \
       if (mt_log_fm_ && mt_log_fm_->GetLogAdpt() && mt_log_fm_->GetLogAdpt()->CheckTrace()) \
       {                                                                       \
          mt_log_fm_->GetLogAdpt()->LogTrace((char*)"[%-10s][%-4d][%-10s]" fmt, \
                __FILE__, __LINE__, __FUNCTION__, ##args);                     \
       }                                                                       \
} while (0)
560 
/*
 * MTLOG_ERROR(fmt, args...): send a printf-style error message to the
 * frame's LogAdapter, prefixed with file, line and function. Nothing is
 * formatted unless a frame and adapter exist and CheckError() is true.
 * fmt must be a string literal (it is concatenated with the prefix).
 * No `register` (ill-formed since C++17); a space separates the prefix
 * literal from fmt so C++11 does not lex it as a literal suffix; the local
 * has a macro-specific name so it cannot shadow a caller variable.
 */
#define MTLOG_ERROR(fmt, args...)                                              \
do {                                                                           \
       NS_MICRO_THREAD::MtFrame *mt_log_fm_ = NS_MICRO_THREAD::MtFrame::Instance(); \
       if (mt_log_fm_ && mt_log_fm_->GetLogAdpt() && mt_log_fm_->GetLogAdpt()->CheckError()) \
       {                                                                       \
          mt_log_fm_->GetLogAdpt()->LogError((char*)"[%-10s][%-4d][%-10s]" fmt, \
                __FILE__, __LINE__, __FUNCTION__, ##args);                     \
       }                                                                       \
} while (0)
570 
/*
 * MT_ATTR_API(ATTR, VALUE): forward (ATTR, VALUE) to the frame adapter's
 * AttrReportAdd(); does nothing when no frame or adapter is available.
 * No `register` (ill-formed since C++17); the local has a macro-specific
 * name so it cannot shadow a caller variable used in ATTR / VALUE.
 */
#define MT_ATTR_API(ATTR, VALUE)                                               \
do {                                                                           \
       NS_MICRO_THREAD::MtFrame *mt_attr_fm_ = NS_MICRO_THREAD::MtFrame::Instance(); \
       if (mt_attr_fm_ && mt_attr_fm_->GetLogAdpt())                           \
       {                                                                       \
          mt_attr_fm_->GetLogAdpt()->AttrReportAdd(ATTR, VALUE);               \
       }                                                                       \
} while (0)
579 
/*
 * MT_ATTR_API_SET(ATTR, VALUE): forward (ATTR, VALUE) to the frame
 * adapter's AttrReportSet(); does nothing when no frame or adapter is
 * available.
 * No `register` (ill-formed since C++17); the local has a macro-specific
 * name so it cannot shadow a caller variable used in ATTR / VALUE.
 */
#define MT_ATTR_API_SET(ATTR, VALUE)                                           \
do {                                                                           \
       NS_MICRO_THREAD::MtFrame *mt_attr_fm_ = NS_MICRO_THREAD::MtFrame::Instance(); \
       if (mt_attr_fm_ && mt_attr_fm_->GetLogAdpt())                           \
       {                                                                       \
          mt_attr_fm_->GetLogAdpt()->AttrReportSet(ATTR, VALUE);               \
       }                                                                       \
} while (0)
588 
589 
590 
591 }// NAMESPACE NS_MICRO_THREAD
592 
593 #endif
594 
595