xref: /f-stack/app/micro_thread/micro_thread.h (revision 45438af0)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename micro_thread.h
22  *  @info  micro thread manager
23  */
24 
25 #ifndef ___MICRO_THREAD_H__
26 #define ___MICRO_THREAD_H__
27 
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <unistd.h>
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <sys/ioctl.h>
34 #include <sys/uio.h>
35 #include <sys/time.h>
36 #include <sys/mman.h>
37 #include <sys/resource.h>
38 #include <sys/queue.h>
39 #include <fcntl.h>
40 #include <signal.h>
41 #include <errno.h>
42 #include <setjmp.h>
43 
44 #include <set>
45 #include <vector>
46 #include <queue>
47 #include "heap.h"
48 #include "kqueue_proxy.h"
49 #include "heap_timer.h"
50 
51 using std::vector;
52 using std::set;
53 using std::queue;
54 
55 namespace NS_MICRO_THREAD {
56 
57 #define STACK_PAD_SIZE      128
58 #define MEM_PAGE_SIZE       4096
59 #define DEFAULT_STACK_SIZE  128*1024
60 #define DEFAULT_THREAD_NUM  2000
61 
62 typedef unsigned long long  utime64_t;
63 typedef void (*ThreadStart)(void*);
64 
65 class ScheduleObj
66 {
67 public:
68 
69     static ScheduleObj* Instance (void);
70 
71     utime64_t ScheduleGetTime(void);
72 
73     void ScheduleThread(void);
74 
75     void ScheduleSleep(void);
76 
77     void SchedulePend(void);
78 
79     void ScheduleUnpend(void* thread);
80 
81     void ScheduleReclaim(void);
82 
83     void ScheduleStartRun(void);
84 
85 private:
86     static ScheduleObj* _instance;
87 };
88 
89 struct MtStack
90 {
91     int  _stk_size;
92     int  _vaddr_size;
93     char *_vaddr;
94     void *_esp;
95     char *_stk_bottom;
96     char *_stk_top;
97     void *_private;
98     int valgrind_id;
99 };
100 
101 class Thread : public  HeapEntry
102 {
103 public:
104 
105     explicit Thread(int stack_size = 0);
106     virtual ~Thread(){};
107 
108     virtual void Run(void){};
109 
110     bool Initial(void);
111 
112     void Destroy(void);
113 
114     void Reset(void);
115 
116     void sleep(int ms);
117 
118     void Wait();
119 
120     void SwitchContext(void);
121 
122     void RestoreContext(void);
123 
124     utime64_t GetWakeupTime(void) {
125         return _wakeup_time;
126     };
127 
128     void SetWakeupTime(utime64_t waketime) {
129         _wakeup_time = waketime;
130     };
131 
132     void SetPrivate(void *data)
133     {
134         _stack->_private = data;
135     }
136 
137     void* GetPrivate()
138     {
139         return _stack->_private;
140     }
141 
142     bool CheckStackHealth(char *esp);
143 
144 protected:
145 
146     virtual void CleanState(void){};
147 
148     virtual bool InitStack(void);
149 
150     virtual void FreeStack(void);
151 
152     virtual void InitContext(void);
153 
154 private:
155     MtStack* _stack;
156     jmp_buf _jmpbuf;
157     int _stack_size;
158     utime64_t _wakeup_time;
159 };
160 
161 class MicroThread : public Thread
162 {
163 public:
164     enum ThreadType
165     {
166         NORMAL          =   0,   ///< normal thread, no dynamic allocated stack infomations.
167         PRIMORDIAL      =   1,   ///< primordial thread, created when frame initialized.
168         DAEMON          =   2,   ///< daemon thread, IO event management and scheduling trigger.
169         SUB_THREAD      =   3,   ///< sub thread, run simple task.
170     };
171 
172     enum ThreadFlag
173     {
174         NOT_INLIST    =  0x0,
175         FREE_LIST    =  0x1,
176         IO_LIST        =  0x2,
177         SLEEP_LIST    =  0x4,
178         RUN_LIST    =  0x8,
179         PEND_LIST   =  0x10,
180         SUB_LIST    =  0x20,
181 
182     };
183 
184     enum ThreadState
185     {
186         INITIAL         =  0,
187         RUNABLE         =  1,
188         RUNNING         =  2,
189         SLEEPING        =  3,
190         PENDING         =  4,
191     };
192 
193     typedef TAILQ_ENTRY(MicroThread) ThreadLink;
194     typedef TAILQ_HEAD(__ThreadSubTailq, MicroThread) SubThreadList;
195 
196 public:
197 
198     MicroThread(ThreadType type = NORMAL);
199     ~MicroThread(){};
200 
201     ThreadLink _entry;
202     ThreadLink _sub_entry;
203 
204     virtual utime64_t HeapValue() {
205         return GetWakeupTime();
206     };
207 
208     virtual void Run(void);
209 
210     void ClearAllFd(void) {
211         TAILQ_INIT(&_fdset);
212     };
213     void AddFd(KqueuerObj* efpd) {
214         TAILQ_INSERT_TAIL(&_fdset, efpd, _entry);
215     };
216     void AddFdList(KqObjList* fdset) {
217         TAILQ_CONCAT(&_fdset, fdset, _entry);
218     };
219     KqObjList& GetFdSet(void) {
220         return _fdset;
221     };
222 
223     void SetType(ThreadType type) {
224         _type = type;
225     };
226     ThreadType GetType(void) {
227         return _type;
228     };
229 
230     bool IsDaemon(void) {
231         return (DAEMON == _type);
232     };
233     bool IsPrimo(void) {
234         return (PRIMORDIAL == _type);
235     };
236     bool IsSubThread(void) {
237         return (SUB_THREAD == _type);
238     };
239 
240     void SetParent(MicroThread* parent) {
241         _parent = parent;
242     };
243     MicroThread* GetParent() {
244         return _parent;
245     };
246     void WakeupParent();
247 
248     void AddSubThread(MicroThread* sub);
249     void RemoveSubThread(MicroThread* sub);
250     bool HasNoSubThread();
251 
252     void SetState(ThreadState state) {
253         _state = state;
254     };
255     ThreadState GetState(void) {
256         return _state;
257     }
258 
259     void SetFlag(ThreadFlag flag) {
260     _flag = (ThreadFlag)(_flag | flag);
261     };
262     void UnsetFlag(ThreadFlag flag) {
263         _flag = (ThreadFlag)(_flag & ~flag);
264     };
265     bool HasFlag(ThreadFlag flag) {
266     return _flag & flag;
267     };
268     ThreadFlag GetFlag() {
269         return _flag;
270     };
271 
272     void SetSartFunc(ThreadStart func, void* args) {
273         _start = func;
274         _args  = args;
275     };
276 
277     void* GetThreadArgs() {
278         return _args;
279     }
280 
281 protected:
282 
283     virtual void CleanState(void);
284 
285 private:
286     ThreadState _state;
287     ThreadType _type;
288     ThreadFlag _flag;
289     KqObjList _fdset;
290     SubThreadList _sub_list;
291     MicroThread* _parent;
292     ThreadStart _start;
293     void* _args;
294 
295 };
296 typedef std::set<MicroThread*> ThreadSet;
297 typedef std::queue<MicroThread*> ThreadList;
298 
299 
300 class LogAdapter
301 {
302 public:
303 
304     LogAdapter(){};
305     virtual ~LogAdapter(){};
306 
307     virtual bool CheckDebug(){ return true;};
308     virtual bool CheckTrace(){ return true;};
309     virtual bool CheckError(){ return true;};
310 
311     virtual void LogDebug(char* fmt, ...){};
312     virtual void LogTrace(char* fmt, ...){};
313     virtual void LogError(char* fmt, ...){};
314 
315     virtual void AttrReportAdd(int attr, int iValue){};
316     virtual void AttrReportSet(int attr, int iValue){};
317 
318 };
319 
320 
321 class ThreadPool
322 {
323 public:
324 
325     static unsigned int default_thread_num;
326     static unsigned int default_stack_size;
327 
328     static void SetDefaultThreadNum(unsigned int num) {
329         default_thread_num = num;
330     };
331 
332     static void SetDefaultStackSize(unsigned int size) {
333         default_stack_size = (size + MEM_PAGE_SIZE - 1) / MEM_PAGE_SIZE * MEM_PAGE_SIZE;
334     };
335 
336     bool InitialPool(int max_num);
337 
338     void DestroyPool (void);
339 
340     MicroThread* AllocThread(void);
341 
342     void FreeThread(MicroThread* thread);
343 
344     int GetUsedNum(void);
345 
346 private:
347     ThreadList      _freelist;
348     int             _total_num;
349     int             _use_num;
350     int             _max_num;
351 };
352 
353 typedef TAILQ_HEAD(__ThreadTailq, MicroThread) ThreadTailq;
354 
355 class MtFrame : public KqueueProxy, public ThreadPool
356 {
357 private:
358     static MtFrame* _instance;
359     LogAdapter*     _log_adpt;
360     ThreadList      _runlist;
361     ThreadTailq     _iolist;
362     ThreadTailq     _pend_list;
363     HeapList        _sleeplist;
364     MicroThread*    _daemon;
365     MicroThread*    _primo;
366     MicroThread*    _curr_thread;
367     utime64_t       _last_clock;
368     int             _waitnum;
369     CTimerMng*      _timer;
370     int             _realtime;
371 
372 public:
373     friend class ScheduleObj;
374 
375 public:
376 
377     static MtFrame* Instance (void);
378 
379     static int sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout);
380 
381     static int recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout);
382 
383     static int connect(int fd, const struct sockaddr *addr, int addrlen, int timeout);
384 
385     static int accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout);
386 
387     static ssize_t read(int fd, void *buf, size_t nbyte, int timeout);
388 
389     static ssize_t write(int fd, const void *buf, size_t nbyte, int timeout);
390 
391     static int recv(int fd, void *buf, int len, int flags, int timeout);
392 
393     static ssize_t send(int fd, const void *buf, size_t nbyte, int flags, int timeout);
394 
395     static void sleep(int ms);
396 
397     static int WaitEvents(int fd, int events, int timeout);
398 
399     static MicroThread* CreateThread(ThreadStart entry, void *args, bool runable = true);
400 
401     static void DaemonRun(void* args);
402     static int Loop(void* args);
403 
404     MicroThread *GetRootThread();
405 
406     bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = 50000);
407 
408     void SetHookFlag();
409 
410     void Destroy (void);
411 
412     char* Version(void);
413 
414     utime64_t GetLastClock(void) {
415         if(_realtime)
416         {
417             return GetSystemMS();
418         }
419         return _last_clock;
420     };
421 
422     MicroThread* GetActiveThread(void) {
423         return _curr_thread;
424     };
425 
426     int RunWaitNum(void) {
427         return _waitnum;
428     };
429 
430     LogAdapter* GetLogAdpt(void) {
431         return _log_adpt;
432     };
433 
434     CTimerMng* GetTimerMng(void) {
435         return _timer;
436     };
437 
438     virtual int KqueueGetTimeout(void);
439 
440     virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout);
441 
442     void WaitNotify(utime64_t timeout);
443 
444     void RemoveIoWait(MicroThread* thread);
445 
446     void InsertRunable(MicroThread* thread);
447 
448     void InsertPend(MicroThread* thread);
449 
450     void RemovePend(MicroThread* thread);
451 
452     void SetRealTime(int realtime_)
453     {
454         _realtime =realtime_;
455     }
456 private:
457 
458     MtFrame():_realtime(1){ _curr_thread = NULL; };
459 
460     MicroThread* DaemonThread(void){
461         return _daemon;
462     };
463 
464     void ThreadSchdule(void);
465 
466     void CheckExpired();
467 
468     void WakeupTimeout(void);
469 
470     void SetLastClock(utime64_t clock) {
471         _last_clock = clock;
472     };
473 
474     void SetActiveThread(MicroThread* thread) {
475         _curr_thread = thread;
476     };
477 
478     utime64_t GetSystemMS(void) {
479         struct timeval tv;
480         gettimeofday(&tv, NULL);
481         return (tv.tv_sec * 1000ULL + tv.tv_usec / 1000ULL);
482     };
483 
484     void InsertSleep(MicroThread* thread);
485 
486     void RemoveSleep(MicroThread* thread);
487 
488     void InsertIoWait(MicroThread* thread);
489 
490     void RemoveRunable(MicroThread* thread);
491 
492 };
493 
494 #define MTLOG_DEBUG(fmt, args...)                                              \
495 do {                                                                           \
496        register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \
497        if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckDebug())           \
498        {                                                                       \
499           fm->GetLogAdpt()->LogDebug((char*)"[%-10s][%-4d][%-10s]"fmt,         \
500                 __FILE__, __LINE__, __FUNCTION__, ##args);                     \
501        }                                                                       \
502 } while (0)
503 
504 #define MTLOG_TRACE(fmt, args...)                                              \
505 do {                                                                           \
506        register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \
507        if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckTrace())           \
508        {                                                                       \
509           fm->GetLogAdpt()->LogTrace((char*)"[%-10s][%-4d][%-10s]"fmt,         \
510                 __FILE__, __LINE__, __FUNCTION__, ##args);                     \
511        }                                                                       \
512 } while (0)
513 
514 #define MTLOG_ERROR(fmt, args...)                                              \
515 do {                                                                           \
516        register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \
517        if (fm && fm->GetLogAdpt() && fm->GetLogAdpt()->CheckError())           \
518        {                                                                       \
519           fm->GetLogAdpt()->LogError((char*)"[%-10s][%-4d][%-10s]"fmt,         \
520                 __FILE__, __LINE__, __FUNCTION__, ##args);                     \
521        }                                                                       \
522 } while (0)
523 
524 #define MT_ATTR_API(ATTR, VALUE)                                               \
525 do {                                                                           \
526        register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \
527        if (fm && fm->GetLogAdpt())                                             \
528        {                                                                       \
529           fm->GetLogAdpt()->AttrReportAdd(ATTR, VALUE);                        \
530        }                                                                       \
531 } while (0)
532 
533 #define MT_ATTR_API_SET(ATTR, VALUE)                                               \
534        do {                                                                           \
535               register NS_MICRO_THREAD::MtFrame *fm = NS_MICRO_THREAD::MtFrame::Instance(); \
536               if (fm && fm->GetLogAdpt())                                              \
537               {                                                                       \
538                  fm->GetLogAdpt()->AttrReportSet(ATTR, VALUE);                          \
539               }                                                                       \
540        } while (0)
541 
542 
543 
544 }// NAMESPACE NS_MICRO_THREAD
545 
546 #endif
547 
548