xref: /f-stack/app/micro_thread/micro_thread.cpp (revision eb3a5857)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename micro_thread.cpp
22  *  @info  micro thread manager
23  */
24 #include "mt_version.h"
25 #include "micro_thread.h"
26 #include "mt_net.h"
27 #include "valgrind.h"
28 #include <assert.h>
29 #include "mt_sys_hook.h"
30 #include "ff_hook.h"
31 #include "ff_api.h"
32 
33 using namespace NS_MICRO_THREAD;
34 
// Debug assertion hook. As defined here it expands to nothing, so every
// ASSERT(...) in this file is compiled out; switch to the commented-out
// definition below to forward the checks to assert().
#define  ASSERT(statement)
//#define  ASSERT(statement)   assert(statement)
37 
// Context-switch primitives with C linkage; their definitions are not in this file.
// NOTE(review): the call sites below suggest setjmp/longjmp-like semantics - confirm
// against the implementation.

// Captures the current context into jbf. Callers in this file treat a return of 0
// as the direct call and a non-zero return as a later resumption.
extern "C"  int save_context(jmp_buf jbf);

// Resumes the context stored in jbf; this file always passes ret == 1.
extern "C"  void restore_context(jmp_buf jbf, int ret);

// Called with a saved context and a thread's private stack pointer.
// NOTE(review): presumably rewrites the stack pointer stored in jbf - confirm.
extern "C"  void replace_esp(jmp_buf jbf, void* esp);
43 
44 Thread::Thread(int stack_size)
45 {
46     _stack_size  = stack_size ? stack_size : ThreadPool::default_stack_size;
47     _wakeup_time = 0;
48     _stack       = NULL;
49     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
50 }
51 
52 
53 /**
54  *  @brief LINUX x86/x86_64's allocated stacks.
55  */
56 bool Thread::InitStack()
57 {
58     if (_stack) {
59         return true;
60     }
61 
62     ///< stack index and memory are separated to prevent out of bounds.
63     _stack = (MtStack*)calloc(1, sizeof(MtStack));
64     if (NULL == _stack)
65     {
66         MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
67         return false;
68     }
69 
70     int memsize = MEM_PAGE_SIZE*2 + _stack_size;
71     memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
72 
73     static int zero_fd = -1;
74     int mmap_flags = MAP_PRIVATE | MAP_ANON;
75     void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
76     if (vaddr == (void *)MAP_FAILED)
77     {
78         MTLOG_ERROR("mmap stack failed, size %d", memsize);
79         free(_stack);
80         _stack = NULL;
81         return false;
82     }
83     _stack->_vaddr = (char*)vaddr;
84     _stack->_vaddr_size = memsize;
85     _stack->_stk_size = _stack_size;
86     _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
87     _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
88     // valgrind support: register stack frame
89     _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
90 
91     _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
92 
93     mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
94     mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
95 
96     return true;
97 }
98 
99 
100 void Thread::FreeStack()
101 {
102     if (!_stack) {
103         return;
104     }
105     munmap(_stack->_vaddr, _stack->_vaddr_size);
106     // valgrind support: deregister stack frame
107     VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
108     free(_stack);
109     _stack = NULL;
110 }
111 
/**
 *  @brief Capture the initial context of this thread into _jmpbuf.
 *
 *  On the direct return from save_context() (value 0) nothing is run; if a
 *  stack exists, replace_esp() is applied to the saved context with the
 *  stack's _esp. When save_context() returns non-zero, the thread's start
 *  routine is launched through ScheduleObj::ScheduleStartRun().
 */
void Thread::InitContext()
{
    // Non-zero return: the saved context is being resumed, so start running.
    if (save_context(_jmpbuf) != 0)
    {
        ScheduleObj::Instance()->ScheduleStartRun();
    }

    // NOTE(review): presumably repoints the saved context at the private stack - confirm.
    if (_stack != NULL)
    {
        replace_esp(_jmpbuf, _stack->_esp);
    }
}
124 
125 void Thread::SwitchContext()
126 {
127     if (save_context(_jmpbuf) == 0)
128     {
129         ScheduleObj::Instance()->ScheduleThread();
130     }
131 }
132 
133 void Thread::RestoreContext()
134 {
135     restore_context(_jmpbuf, 1);
136 }
137 
138 
139 bool Thread::Initial()
140 {
141     if (!InitStack())
142     {
143         MTLOG_ERROR("init stack failed");
144         return false;
145     }
146 
147     InitContext();
148 
149     return true;
150 }
151 
152 void Thread::Destroy()
153 {
154     FreeStack();
155     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
156 }
157 
/**
 *  @brief Return the thread to a reusable state.
 *
 *  Clears the wakeup time and private pointer, captures a fresh context via
 *  InitContext(), then resets state through CleanState().
 */
void Thread::Reset()
{
    _wakeup_time = 0;
    SetPrivate(NULL);

    // Capture a fresh context before the remaining state is cleared.
    InitContext();
    CleanState();
}
166 
167 void Thread::sleep(int ms)
168 {
169     utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
170     _wakeup_time = now + ms;
171 
172     if (save_context(_jmpbuf) == 0)
173     {
174         ScheduleObj::Instance()->ScheduleSleep();
175     }
176 }
177 
178 void Thread::Wait()
179 {
180     if (save_context(_jmpbuf) == 0)
181     {
182         ScheduleObj::Instance()->SchedulePend();
183     }
184 }
185 
186 bool Thread::CheckStackHealth(char *esp)
187 {
188     if (!_stack)
189         return false;
190 
191     if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
192         return true;
193     else
194         return false;
195 }
196 
197 MicroThread::MicroThread(ThreadType type)
198 {
199     memset(&_entry, 0, sizeof(_entry));
200     TAILQ_INIT(&_fdset);
201     TAILQ_INIT(&_sub_list);
202     _flag = NOT_INLIST;
203     _type = type;
204     _state = INITIAL;
205     _start = NULL;
206     _args = NULL;
207     _parent = NULL;
208 }
209 
210 void MicroThread::CleanState()
211 {
212     TAILQ_INIT(&_fdset);
213     TAILQ_INIT(&_sub_list);
214     _flag = NOT_INLIST;
215     _type = NORMAL;
216     _state = INITIAL;
217     _start = NULL;
218     _args = NULL;
219     _parent = NULL;
220 }
221 
/**
 *  @brief Body of a micro thread: run the start routine, notify the parent if
 *         this is a sub thread, then reclaim the thread and schedule the next one.
 */
void MicroThread::Run()
{
    // Run the user entry point, if one was set.
    if (_start) {
        _start(_args);
    }

    // A finished sub thread detaches from its parent, which may unpend the parent.
    if (this->IsSubThread()) {
        this->WakeupParent();
    }

    // Hand the thread back through the scheduler first, then switch away.
    ScheduleObj::Instance()->ScheduleReclaim();
    ScheduleObj::Instance()->ScheduleThread();
}
235 
236 void MicroThread::WakeupParent()
237 {
238     MicroThread* parent = this->GetParent();
239     if (parent)
240     {
241         parent->RemoveSubThread(this);
242         if (parent->HasNoSubThread())
243         {
244             ScheduleObj::Instance()->ScheduleUnpend(parent);
245         }
246     }
247     else
248     {
249         MTLOG_ERROR("Sub thread no parent, error");
250     }
251 }
252 
253 bool MicroThread::HasNoSubThread()
254 {
255     return TAILQ_EMPTY(&_sub_list);
256 }
257 
258 void MicroThread::AddSubThread(MicroThread* sub)
259 {
260     ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
261     if (!sub->HasFlag(MicroThread::SUB_LIST))
262     {
263         TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
264         sub->_parent = this;
265     }
266 
267     sub->SetFlag(MicroThread::SUB_LIST);
268 }
269 
270 void MicroThread::RemoveSubThread(MicroThread* sub)
271 {
272     ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
273     if (sub->HasFlag(MicroThread::SUB_LIST))
274     {
275         TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
276         sub->_parent = NULL;
277     }
278 
279     sub->UnsetFlag(MicroThread::SUB_LIST);
280 }
281 
282 ScheduleObj *ScheduleObj::_instance = NULL;
283 inline ScheduleObj* ScheduleObj::Instance()
284 {
285     if (NULL == _instance)
286     {
287         _instance = new ScheduleObj();
288     }
289 
290     return _instance;
291 }
292 
293 void ScheduleObj::ScheduleThread()
294 {
295     MtFrame* frame = MtFrame::Instance();
296     frame->ThreadSchdule();
297 }
298 
299 utime64_t ScheduleObj::ScheduleGetTime()
300 {
301     MtFrame* frame = MtFrame::Instance();
302     if (frame)
303     {
304         return frame->GetLastClock();
305     }
306     else
307     {
308         MTLOG_ERROR("frame time failed, maybe not init");
309         return 0;
310     }
311 }
312 
313 void ScheduleObj::ScheduleSleep()
314 {
315     MtFrame* frame = MtFrame::Instance();
316     MicroThread* thread = frame->GetActiveThread();
317     if ((!frame) || (!thread)) {
318         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
319         return;
320     }
321 
322     frame->InsertSleep(thread);
323     frame->ThreadSchdule();
324 }
325 
326 void ScheduleObj::SchedulePend()
327 {
328     MtFrame* frame = MtFrame::Instance();
329     MicroThread* thread = frame->GetActiveThread();
330     if ((!frame) || (!thread)) {
331         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
332         return;
333     }
334 
335     frame->InsertPend(thread);
336     frame->ThreadSchdule();
337 }
338 
339 void ScheduleObj::ScheduleUnpend(void* pthread)
340 {
341     MtFrame* frame = MtFrame::Instance();
342     MicroThread* thread = (MicroThread*)pthread;
343     if ((!frame) || (!thread)) {
344         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
345         return;
346     }
347 
348     frame->RemovePend(thread);
349     frame->InsertRunable(thread);
350 }
351 
352 void ScheduleObj::ScheduleReclaim()
353 {
354     MtFrame* frame = MtFrame::Instance();
355     MicroThread* thread = frame->GetActiveThread();
356     if ((!frame) || (!thread)) {
357         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
358         return;
359     }
360 
361     frame->FreeThread(thread);
362 }
363 
364 void ScheduleObj::ScheduleStartRun()
365 {
366     MtFrame* frame = MtFrame::Instance();
367     MicroThread* thread = frame->GetActiveThread();
368     if ((!frame) || (!thread)) {
369         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
370         return;
371     }
372 
373     thread->Run();
374 }
375 
376 
// Pool-wide defaults: how many threads InitialPool() pre-creates (also the free-list
// size FreeThread() trims back to) and the stack size used when a Thread is built with size 0.
unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM;   ///< 2000 micro threads.
unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE;   ///< 128k stack.
379 
380 bool ThreadPool::InitialPool(int max_num)
381 {
382     MicroThread *thread = NULL;
383     for (unsigned int i = 0; i < default_thread_num; i++)
384     {
385         thread = new MicroThread();
386         if ((NULL == thread) || (false == thread->Initial()))
387         {
388             MTLOG_ERROR("init pool, thread %p init failed", thread);
389             if (thread)  delete thread;
390             continue;
391         }
392         thread->SetFlag(MicroThread::FREE_LIST);
393         _freelist.push(thread);
394     }
395 
396     _total_num = _freelist.size();
397     _max_num  = max_num;
398     _use_num = 0;
399     if (_total_num <= 0)
400     {
401         return false;
402     }
403     else
404     {
405         return true;
406     }
407 }
408 
409 void ThreadPool::DestroyPool()
410 {
411     MicroThread* thread = NULL;
412     while (!_freelist.empty())
413     {
414         thread = _freelist.front();
415         _freelist.pop();
416         thread->Destroy();
417         delete thread;
418     }
419 
420     _total_num = 0;
421     _use_num = 0;
422 }
423 
424 MicroThread* ThreadPool::AllocThread()
425 {
426     MT_ATTR_API_SET(492069, _total_num);
427 
428     MicroThread* thread = NULL;
429     if (!_freelist.empty())
430     {
431         thread = _freelist.front();
432         _freelist.pop();
433 
434         ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
435 
436         thread->UnsetFlag(MicroThread::FREE_LIST);
437         _use_num++;
438         return thread;
439     }
440 
441     MT_ATTR_API(320846, 1); // pool no nore
442     if (_total_num >= _max_num)
443     {
444         MT_ATTR_API(361140, 1); // no more quota
445         return NULL;
446     }
447 
448     thread = new MicroThread();
449     if ((NULL == thread) || (false == thread->Initial()))
450     {
451         MT_ATTR_API(320847, 1); // pool init fail
452         MTLOG_ERROR("thread alloc failed, thread: %p", thread);
453         if (thread)  delete thread;
454         return NULL;
455     }
456     _total_num++;
457     _use_num++;
458 
459     return thread;
460 }
461 
462 void ThreadPool::FreeThread(MicroThread* thread)
463 {
464     ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
465     thread->Reset();
466     _use_num--;
467     _freelist.push(thread);
468     thread->SetFlag(MicroThread::FREE_LIST);
469 
470     unsigned int free_num = _freelist.size();
471     if ((free_num > default_thread_num) && (free_num > 1))
472     {
473         thread = _freelist.front();
474         _freelist.pop();
475         thread->Destroy();
476         delete thread;
477         _total_num--;
478     }
479 }
480 
481 int ThreadPool::GetUsedNum(void)
482 {
483     return _use_num;
484 }
485 
486 MtFrame *MtFrame::_instance = NULL;
487 inline MtFrame* MtFrame::Instance ()
488 {
489     if (NULL == _instance )
490     {
491         _instance = new MtFrame();
492     }
493 
494     return _instance;
495 }
496 
497 void MtFrame::SetHookFlag() {
498     mt_set_hook_flag();
499 };
500 
501 bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
502 {
503     _log_adpt = logadpt;
504 
505     if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
506     {
507         MTLOG_ERROR("Init epoll or thread pool failed");
508         this->Destroy();
509         return false;
510     }
511     if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
512     {
513         MTLOG_ERROR("Init heap list failed");
514         this->Destroy();
515         return false;
516     }
517 
518     _timer = new CTimerMng(max_thread_num * 2);
519     if (NULL == _timer)
520     {
521         MTLOG_ERROR("Init heap timer failed");
522         this->Destroy();
523         return false;
524     }
525 
526     _daemon = AllocThread();
527     if (NULL == _daemon)
528     {
529         MTLOG_ERROR("Alloc daemon thread failed");
530         this->Destroy();
531         return false;
532     }
533     _daemon->SetType(MicroThread::DAEMON);
534     _daemon->SetState(MicroThread::RUNABLE);
535     _daemon->SetSartFunc(MtFrame::DaemonRun, this);
536 
537     _primo = new MicroThread(MicroThread::PRIMORDIAL);
538     if (NULL == _primo)
539     {
540         MTLOG_ERROR("new _primo thread failed");
541         this->Destroy();
542         return false;
543     }
544     _primo->SetState(MicroThread::RUNNING);
545     SetActiveThread(_primo);
546 
547     _last_clock = GetSystemMS();
548     TAILQ_INIT(&_iolist);
549     TAILQ_INIT(&_pend_list);
550 
551     //SetHookFlag();
552 
553     return true;
554 
555 }
556 
/**
 *  @brief Tear down the frame: release the primordial and daemon threads, drain
 *         the sleep heap, run list and pend list, delete the timer manager,
 *         then destroy the pool, the kqueue and the singleton itself.
 *
 *  Does nothing when no singleton instance exists.
 */
void MtFrame::Destroy(void)
{
    if (NULL == _instance )
    {
        return;
    }

    if (_primo) {
        delete _primo;
        _primo = NULL;
    }

    if (_daemon) {
        FreeThread(_daemon);
        _daemon = NULL;
    }

    // The IO list is simply reset; its entries are not released here.
    // NOTE(review): InsertIoWait() also pushes such threads onto the sleep heap,
    // so they are presumably released by the heap drain below - confirm.
    TAILQ_INIT(&_iolist);

    // Drain the sleep heap.
    MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
    while (thread)
    {
        FreeThread(thread);
        thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
    }

    // Drain the run queue.
    while (!_runlist.empty())
    {
        thread = _runlist.front();
        _runlist.pop();
        FreeThread(thread);
    }

    // Drain the pend list; the _SAFE variant is used because entries are removed while iterating.
    MicroThread* tmp;
    TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
    {
        TAILQ_REMOVE(&_pend_list, thread, _entry);
        FreeThread(thread);
    }

    if (_timer != NULL)
    {
        delete _timer;
        _timer = NULL;
    }

    // Finally release the pool, the kqueue and the singleton object.
    // NOTE(review): when called on the singleton this deletes `this` - callers
    // must not touch the object afterwards.
    _instance->DestroyPool();
    _instance->TermKqueue();
    delete _instance;
    _instance = NULL;
}
608 
609 char* MtFrame::Version()
610 {
611     return IMT_VERSION;
612 }
613 
614 MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
615 {
616     MtFrame* mtframe = MtFrame::Instance();
617     MicroThread* thread = mtframe->AllocThread();
618     if (NULL == thread)
619     {
620         MTLOG_ERROR("create thread failed");
621         return NULL;
622     }
623     thread->SetSartFunc(entry, args);
624 
625     if (runable) {
626         mtframe->InsertRunable(thread);
627     }
628 
629     return thread;
630 }
631 
632 int MtFrame::Loop(void* args)
633 {
634     MtFrame* mtframe = MtFrame::Instance();
635     MicroThread* daemon = mtframe->DaemonThread();
636 
637     mtframe->KqueueDispatch();
638     mtframe->SetLastClock(mtframe->GetSystemMS());
639     mtframe->WakeupTimeout();
640     mtframe->CheckExpired();
641     daemon->SwitchContext();
642 
643     return 0;
644 }
645 
646 void MtFrame::DaemonRun(void* args)
647 {
648     /*
649     MtFrame* mtframe = MtFrame::Instance();
650     MicroThread* daemon = mtframe->DaemonThread();
651 
652     while (true) {
653         mtframe->KqueueDispatch();
654         mtframe->SetLastClock(mtframe->GetSystemMS());
655         mtframe->WakeupTimeout();
656         mtframe->CheckExpired();
657         daemon->SwitchContext();
658     }
659     */
660     ff_run(MtFrame::Loop, NULL);
661 }
662 
663 MicroThread *MtFrame::GetRootThread()
664 {
665     if (NULL == _curr_thread)
666     {
667         return NULL;
668     }
669 
670     MicroThread::ThreadType type = _curr_thread->GetType();
671     MicroThread *thread = _curr_thread;
672     MicroThread *parent = thread;
673 
674     while (MicroThread::SUB_THREAD == type)
675     {
676         thread = thread->GetParent();
677         if (!thread)
678         {
679             break;
680         }
681 
682         type   = thread->GetType();
683         parent = thread;
684     }
685 
686     return parent;
687 }
688 
689 void MtFrame::ThreadSchdule()
690 {
691     MicroThread* thread = NULL;
692     MtFrame* mtframe = MtFrame::Instance();
693 
694     if (mtframe->_runlist.empty())
695     {
696         thread = mtframe->DaemonThread();
697     }
698     else
699     {
700         thread = mtframe->_runlist.front();
701         mtframe->RemoveRunable(thread);
702     }
703 
704     this->SetActiveThread(thread);
705     thread->SetState(MicroThread::RUNNING);
706     thread->RestoreContext();
707 }
708 
709 void MtFrame::CheckExpired()
710 {
711     static utime64_t check_time = 0;
712 
713     if (_timer != NULL)
714     {
715         _timer->check_expired();
716     }
717 
718    utime64_t now = GetLastClock();
719 
720     if ((now - check_time) > 1000)
721     {
722         CNetMgr::Instance()->RecycleObjs(now);
723         check_time = now;
724     }
725 }
726 
727 void MtFrame::WakeupTimeout()
728 {
729     utime64_t now = GetLastClock();
730     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
731     while (thread && (thread->GetWakeupTime() <= now))
732     {
733         if (thread->HasFlag(MicroThread::IO_LIST))
734         {
735             RemoveIoWait(thread);
736         }
737         else
738         {
739             RemoveSleep(thread);
740         }
741 
742         InsertRunable(thread);
743 
744         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
745     }
746 }
747 
748 int MtFrame::KqueueGetTimeout()
749 {
750     utime64_t now = GetLastClock();
751     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
752     if (!thread)
753     {
754         return 10; //default 10ms epollwait
755     }
756     else if (thread->GetWakeupTime() < now)
757     {
758         return 0;
759     }
760     else
761     {
762         return (int)(thread->GetWakeupTime() - now);
763     }
764 }
765 
766 inline void MtFrame::InsertSleep(MicroThread* thread)
767 {
768     ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
769 
770     thread->SetFlag(MicroThread::SLEEP_LIST);
771     thread->SetState(MicroThread::SLEEPING);
772     int rc = _sleeplist.HeapPush(thread);
773     if (rc < 0)
774     {
775         MT_ATTR_API(320848, 1); // heap error
776         MTLOG_ERROR("Insert heap failed , rc %d", rc);
777     }
778 }
779 
780 inline void MtFrame::RemoveSleep(MicroThread* thread)
781 {
782     ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
783     thread->UnsetFlag(MicroThread::SLEEP_LIST);
784 
785     int rc = _sleeplist.HeapDelete(thread);
786     if (rc < 0)
787     {
788         MT_ATTR_API(320849, 1); // heap error
789         MTLOG_ERROR("remove heap failed , rc %d", rc);
790     }
791 }
792 
793 inline void MtFrame::InsertIoWait(MicroThread* thread)
794 {
795     ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
796     thread->SetFlag(MicroThread::IO_LIST);
797     TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
798     InsertSleep(thread);
799 }
800 
801 void MtFrame::RemoveIoWait(MicroThread* thread)
802 {
803     ASSERT(thread->HasFlag(MicroThread::IO_LIST));
804     thread->UnsetFlag(MicroThread::IO_LIST);
805     TAILQ_REMOVE(&_iolist, thread, _entry);
806 
807     RemoveSleep(thread);
808 }
809 
810 void MtFrame::InsertRunable(MicroThread* thread)
811 {
812     ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
813     thread->SetFlag(MicroThread::RUN_LIST);
814 
815     thread->SetState(MicroThread::RUNABLE);
816     _runlist.push(thread);
817     _waitnum++;
818 }
819 
820 inline void MtFrame::RemoveRunable(MicroThread* thread)
821 {
822     ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
823     ASSERT(thread == _runlist.front());
824     thread->UnsetFlag(MicroThread::RUN_LIST);
825 
826     _runlist.pop();
827     _waitnum--;
828 }
829 
830 void MtFrame::InsertPend(MicroThread* thread)
831 {
832     ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
833     thread->SetFlag(MicroThread::PEND_LIST);
834     TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
835     thread->SetState(MicroThread::PENDING);
836 }
837 
838 void MtFrame::RemovePend(MicroThread* thread)
839 {
840     ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
841     thread->UnsetFlag(MicroThread::PEND_LIST);
842     TAILQ_REMOVE(&_pend_list, thread, _entry);
843 }
844 
845 void MtFrame::WaitNotify(utime64_t timeout)
846 {
847     MicroThread* thread = GetActiveThread();
848 
849     thread->SetWakeupTime(timeout + this->GetLastClock());
850     this->InsertIoWait(thread);
851     thread->SwitchContext();
852 }
853 
854 bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
855 {
856     MicroThread* thread = GetActiveThread();
857     if (NULL == thread)
858     {
859         MTLOG_ERROR("active thread null, epoll schedule failed");
860         return false;
861     }
862 
863     thread->ClearAllFd();
864     if (fdlist)
865     {
866         thread->AddFdList(fdlist);
867     }
868     if (fd)
869     {
870         thread->AddFd(fd);
871     }
872 
873     thread->SetWakeupTime(timeout + this->GetLastClock());
874     if (!this->KqueueAdd(thread->GetFdSet()))
875     {
876         MTLOG_ERROR("epoll add failed, errno: %d", errno);
877         return false;
878     }
879     this->InsertIoWait(thread);
880     thread->SwitchContext();
881 
882     int rcvnum = 0;
883     KqObjList& rcvfds = thread->GetFdSet();
884     KqueuerObj* fdata = NULL;
885     TAILQ_FOREACH(fdata, &rcvfds, _entry)
886     {
887         if (fdata->GetRcvEvents() != 0)
888         {
889             rcvnum++;
890         }
891     }
892     this->KqueueDel(rcvfds);
893 
894     if (rcvnum == 0)
895     {
896         errno = ETIME;
897         return false;
898     }
899 
900     return true;
901 }
902 
903 int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
904 {
905     MtFrame* mtframe = MtFrame::Instance();
906     utime64_t start = mtframe->GetLastClock();
907     MicroThread* thread = mtframe->GetActiveThread();
908     utime64_t now = 0;
909 
910     if(fd<0 || !buf || len<1)
911     {
912         errno = EINVAL;
913         MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
914         return -10;
915     }
916 
917     if (timeout <= -1)
918     {
919         timeout = 0x7fffffff;
920     }
921 
922     while (true)
923     {
924         now = mtframe->GetLastClock();
925         if ((int)(now - start) > timeout)
926         {
927             errno = ETIME;
928             return -1;
929         }
930 
931         KqueuerObj epfd;
932         epfd.SetOsfd(fd);
933         epfd.EnableInput();
934         epfd.SetOwnerThread(thread);
935         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
936         {
937             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
938             return -2;
939         }
940 
941         mt_hook_syscall(recvfrom);
942         int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
943         if (n < 0)
944         {
945             if (errno == EINTR) {
946                 continue;
947             }
948 
949             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
950             {
951                 MTLOG_ERROR("recvfrom failed, errno: %d", errno);
952                 return -3;
953             }
954         }
955         else
956         {
957             return n;
958         }
959     }
960 
961 }
962 
963 int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
964 {
965     MtFrame* mtframe = MtFrame::Instance();
966     utime64_t start = mtframe->GetLastClock();
967     MicroThread* thread = mtframe->GetActiveThread();
968     utime64_t now = 0;
969 
970     if(fd<0 || !msg || len<1)
971     {
972         errno = EINVAL;
973         MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
974         return -10;
975     }
976 
977     int n = 0;
978     mt_hook_syscall(sendto);
979     while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
980     {
981         now = mtframe->GetLastClock();
982         if ((int)(now - start) > timeout)
983         {
984             errno = ETIME;
985             return -1;
986         }
987 
988         if (errno == EINTR) {
989             continue;
990         }
991 
992         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
993             MTLOG_ERROR("sendto failed, errno: %d", errno);
994             return -2;
995         }
996 
997         KqueuerObj epfd;
998         epfd.SetOsfd(fd);
999         epfd.EnableOutput();
1000         epfd.SetOwnerThread(thread);
1001         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1002             return -3;
1003         }
1004     }
1005 
1006     return n;
1007 }
1008 
1009 int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1010 {
1011     MtFrame* mtframe = MtFrame::Instance();
1012     utime64_t start = mtframe->GetLastClock();
1013     MicroThread* thread = mtframe->GetActiveThread();
1014     utime64_t now = 0;
1015 
1016     if(fd<0 || !addr || addrlen<1)
1017     {
1018         errno = EINVAL;
1019         MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1020         return -10;
1021     }
1022 
1023     int n = 0;
1024     mt_hook_syscall(connect);
1025     while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1026     {
1027         now = mtframe->GetLastClock();
1028         if ((int)(now - start) > timeout)
1029         {
1030             errno = ETIME;
1031             return -1;
1032         }
1033 
1034         if (errno == EISCONN)
1035         {
1036             return 0;
1037         }
1038 
1039         if (errno == EINTR) {
1040             continue;
1041         }
1042 
1043         if (errno != EINPROGRESS) {
1044             MTLOG_ERROR("connect failed, errno: %d", errno);
1045             return -2;
1046         }
1047 
1048         KqueuerObj epfd;
1049         epfd.SetOsfd(fd);
1050         epfd.EnableOutput();
1051         epfd.SetOwnerThread(thread);
1052         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1053             return -3;
1054         }
1055     }
1056 
1057     return n;
1058 }
1059 
1060 int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1061 {
1062     MtFrame* mtframe = MtFrame::Instance();
1063     utime64_t start = mtframe->GetLastClock();
1064     MicroThread* thread = mtframe->GetActiveThread();
1065     utime64_t now = 0;
1066 
1067     if(fd<0)
1068     {
1069         errno = EINVAL;
1070         MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1071         return -10;
1072     }
1073 
1074     int acceptfd = 0;
1075     mt_hook_syscall(accept);
1076     while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1077     {
1078         now = mtframe->GetLastClock();
1079         if ((int)(now - start) > timeout)
1080         {
1081             errno = ETIME;
1082             return -1;
1083         }
1084 
1085         if (errno == EINTR) {
1086             continue;
1087         }
1088 
1089         if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1090             MTLOG_ERROR("accept failed, errno: %d", errno);
1091             return -2;
1092         }
1093 
1094         KqueuerObj epfd;
1095         epfd.SetOsfd(fd);
1096         epfd.EnableInput();
1097         epfd.SetOwnerThread(thread);
1098         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1099             return -3;
1100         }
1101     }
1102 
1103     return acceptfd;
1104 }
1105 
1106 ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1107 {
1108     MtFrame* mtframe = MtFrame::Instance();
1109     utime64_t start = mtframe->GetLastClock();
1110     MicroThread* thread = mtframe->GetActiveThread();
1111     utime64_t now = 0;
1112 
1113     if(fd<0 || !buf || nbyte<1)
1114     {
1115         errno = EINVAL;
1116         MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1117         return -10;
1118     }
1119 
1120     ssize_t n = 0;
1121     mt_hook_syscall(read);
1122     while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1123     {
1124         now = mtframe->GetLastClock();
1125         if ((int)(now - start) > timeout)
1126         {
1127             errno = ETIME;
1128             return -1;
1129         }
1130 
1131         if (errno == EINTR) {
1132             continue;
1133         }
1134 
1135         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1136             MTLOG_ERROR("read failed, errno: %d", errno);
1137             return -2;
1138         }
1139 
1140         KqueuerObj epfd;
1141         epfd.SetOsfd(fd);
1142         epfd.EnableInput();
1143         epfd.SetOwnerThread(thread);
1144         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1145             return -3;
1146         }
1147     }
1148 
1149     return n;
1150 }
1151 
1152 ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1153 {
1154     MtFrame* mtframe = MtFrame::Instance();
1155     utime64_t start = mtframe->GetLastClock();
1156     MicroThread* thread = mtframe->GetActiveThread();
1157     utime64_t now = 0;
1158 
1159     if(fd<0 || !buf || nbyte<1)
1160     {
1161         errno = EINVAL;
1162         MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1163         return -10;
1164     }
1165 
1166     ssize_t n = 0;
1167     size_t send_len = 0;
1168     while (send_len < nbyte)
1169     {
1170         now = mtframe->GetLastClock();
1171         if ((int)(now - start) > timeout)
1172         {
1173             errno = ETIME;
1174             return -1;
1175         }
1176 
1177         mt_hook_syscall(write);
1178         n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1179         if (n < 0)
1180         {
1181             if (errno == EINTR) {
1182                 continue;
1183             }
1184 
1185             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1186                 MTLOG_ERROR("write failed, errno: %d", errno);
1187                 return -2;
1188             }
1189         }
1190         else
1191         {
1192             send_len += n;
1193             if (send_len >= nbyte) {
1194                 return nbyte;
1195             }
1196         }
1197 
1198         KqueuerObj epfd;
1199         epfd.SetOsfd(fd);
1200         epfd.EnableOutput();
1201         epfd.SetOwnerThread(thread);
1202         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1203             return -3;
1204         }
1205     }
1206 
1207     return nbyte;
1208 }
1209 
1210 int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1211 {
1212     MtFrame* mtframe = MtFrame::Instance();
1213     utime64_t start = mtframe->GetLastClock();
1214     MicroThread* thread = mtframe->GetActiveThread();
1215     utime64_t now = 0;
1216 
1217     if(fd<0 || !buf || len<1)
1218     {
1219         errno = EINVAL;
1220         MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1221         return -10;
1222     }
1223 
1224     if (timeout <= -1)
1225     {
1226         timeout = 0x7fffffff;
1227     }
1228 
1229     while (true)
1230     {
1231         now = mtframe->GetLastClock();
1232         if ((int)(now - start) > timeout)
1233         {
1234             errno = ETIME;
1235             return -1;
1236         }
1237 
1238         KqueuerObj epfd;
1239         epfd.SetOsfd(fd);
1240         epfd.EnableInput();
1241         epfd.SetOwnerThread(thread);
1242         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1243         {
1244             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1245             return -2;
1246         }
1247 
1248         mt_hook_syscall(recv);
1249         int n = ff_hook_recv(fd, buf, len, flags);
1250         if (n < 0)
1251         {
1252             if (errno == EINTR) {
1253                 continue;
1254             }
1255 
1256             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1257             {
1258                 MTLOG_ERROR("recv failed, errno: %d", errno);
1259                 return -3;
1260             }
1261         }
1262         else
1263         {
1264             return n;
1265         }
1266     }
1267 
1268 }
1269 
1270 ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1271 {
1272     MtFrame* mtframe = MtFrame::Instance();
1273     utime64_t start = mtframe->GetLastClock();
1274     MicroThread* thread = mtframe->GetActiveThread();
1275     utime64_t now = 0;
1276 
1277     if(fd<0 || !buf || nbyte<1)
1278     {
1279         errno = EINVAL;
1280         MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1281         return -10;
1282     }
1283 
1284     ssize_t n = 0;
1285     size_t send_len = 0;
1286     while (send_len < nbyte)
1287     {
1288         now = mtframe->GetLastClock();
1289         if ((int)(now - start) > timeout)
1290         {
1291             errno = ETIME;
1292             return -1;
1293         }
1294 
1295         mt_hook_syscall(send);
1296         n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1297         if (n < 0)
1298         {
1299             if (errno == EINTR) {
1300                 continue;
1301             }
1302 
1303             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1304                 MTLOG_ERROR("write failed, errno: %d", errno);
1305                 return -2;
1306             }
1307         }
1308         else
1309         {
1310             send_len += n;
1311             if (send_len >= nbyte) {
1312                 return nbyte;
1313             }
1314         }
1315 
1316         KqueuerObj epfd;
1317         epfd.SetOsfd(fd);
1318         epfd.EnableOutput();
1319         epfd.SetOwnerThread(thread);
1320         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1321             return -3;
1322         }
1323     }
1324 
1325     return nbyte;
1326 }
1327 
1328 void MtFrame::sleep(int ms)
1329 {
1330     MtFrame* frame = MtFrame::Instance();
1331     MicroThread* thread = frame->GetActiveThread();
1332     if (thread != NULL)
1333     {
1334         thread->sleep(ms);
1335     }
1336 }
1337 
1338 int MtFrame::WaitEvents(int fd, int events, int timeout)
1339 {
1340     MtFrame* mtframe = MtFrame::Instance();
1341     utime64_t start = mtframe->GetLastClock();
1342     MicroThread* thread = mtframe->GetActiveThread();
1343     utime64_t now = 0;
1344 
1345     if (timeout <= -1)
1346     {
1347         timeout = 0x7fffffff;
1348     }
1349 
1350     while (true)
1351     {
1352         now = mtframe->GetLastClock();
1353         if ((int)(now - start) > timeout)
1354         {
1355             errno = ETIME;
1356             return 0;
1357         }
1358 
1359         KqueuerObj epfd;
1360         epfd.SetOsfd(fd);
1361         if (events & KQ_EVENT_READ)
1362         {
1363             epfd.EnableInput();
1364         }
1365         if (events & KQ_EVENT_WRITE)
1366         {
1367             epfd.EnableOutput();
1368         }
1369         epfd.SetOwnerThread(thread);
1370 
1371         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1372         {
1373             MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1374             return 0;
1375         }
1376 
1377         return epfd.GetRcvEvents();
1378     }
1379 }
1380