xref: /f-stack/app/micro_thread/micro_thread.cpp (revision 73bdce41)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename micro_thread.cpp
22  *  @info  micro thread manager
23  */
24 #include "mt_version.h"
25 #include "micro_thread.h"
26 #include "mt_net.h"
27 #include "valgrind.h"
28 #include <assert.h>
29 #include "mt_sys_hook.h"
30 #include "ff_hook.h"
31 #include "ff_api.h"
32 
33 using namespace NS_MICRO_THREAD;
34 
35 #define  ASSERT(statement)
36 //#define  ASSERT(statement)   assert(statement)
37 
38 extern "C"  int save_context(jmp_buf jbf);
39 
40 extern "C"  void restore_context(jmp_buf jbf, int ret);
41 
42 extern "C"  void replace_esp(jmp_buf jbf, void* esp);
43 
44 Thread::Thread(int stack_size)
45 {
46     _stack_size  = stack_size ? stack_size : ThreadPool::default_stack_size;
47     _wakeup_time = 0;
48     _stack       = NULL;
49     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
50 }
51 
52 static DefaultLogAdapter def_log_adapt;
53 /**
54  *  @brief LINUX x86/x86_64's allocated stacks.
55  */
56 bool Thread::InitStack()
57 {
58     if (_stack) {
59         return true;
60     }
61 
62     ///< stack index and memory are separated to prevent out of bounds.
63     _stack = (MtStack*)calloc(1, sizeof(MtStack));
64     if (NULL == _stack)
65     {
66         MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
67         return false;
68     }
69 
70     int memsize = MEM_PAGE_SIZE*2 + _stack_size;
71     memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
72 
73     static int zero_fd = -1;
74     int mmap_flags = MAP_PRIVATE | MAP_ANON;
75     void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
76     if (vaddr == (void *)MAP_FAILED)
77     {
78         MTLOG_ERROR("mmap stack failed, size %d,errmsg: %s.", memsize,strerror(errno));
79         free(_stack);
80         _stack = NULL;
81         return false;
82     }
83     _stack->_vaddr = (char*)vaddr;
84     _stack->_vaddr_size = memsize;
85     _stack->_stk_size = _stack_size;
86     _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
87     _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
88     // valgrind support: register stack frame
89     _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
90 
91     _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
92 
93     mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
94     mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
95 
96     return true;
97 }
98 
99 
100 void Thread::FreeStack()
101 {
102     if (!_stack) {
103         return;
104     }
105     munmap(_stack->_vaddr, _stack->_vaddr_size);
106     // valgrind support: deregister stack frame
107     VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
108     free(_stack);
109     _stack = NULL;
110 }
111 
112 void Thread::InitContext()
113 {
114     if (save_context(_jmpbuf) != 0)
115     {
116         ScheduleObj::Instance()->ScheduleStartRun();
117     }
118 
119     if (_stack != NULL)
120     {
121         replace_esp(_jmpbuf, _stack->_esp);
122     }
123 }
124 
125 void Thread::SwitchContext()
126 {
127     if (save_context(_jmpbuf) == 0)
128     {
129         ScheduleObj::Instance()->ScheduleThread();
130     }
131 }
132 
133 
134 int Thread::SaveContext()
135 {
136     return save_context(_jmpbuf);
137 }
138 
139 void Thread::RestoreContext()
140 {
141     restore_context(_jmpbuf, 1);
142 }
143 
144 
145 bool Thread::Initial()
146 {
147     if (!InitStack())
148     {
149         MTLOG_ERROR("init stack failed");
150         return false;
151     }
152 
153     InitContext();
154 
155     return true;
156 }
157 
158 void Thread::Destroy()
159 {
160     FreeStack();
161     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
162 }
163 
164 void Thread::Reset()
165 {
166     _wakeup_time = 0;
167     SetPrivate(NULL);
168 
169     InitContext();
170     CleanState();
171 }
172 
173 void Thread::sleep(int ms)
174 {
175     utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
176     _wakeup_time = now + ms;
177 
178     if (save_context(_jmpbuf) == 0)
179     {
180         ScheduleObj::Instance()->ScheduleSleep();
181     }
182 }
183 
184 void Thread::Wait()
185 {
186     if (save_context(_jmpbuf) == 0)
187     {
188         ScheduleObj::Instance()->SchedulePend();
189     }
190 }
191 
192 bool Thread::CheckStackHealth(char *esp)
193 {
194     if (!_stack)
195         return false;
196 
197     if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
198         return true;
199     else
200         return false;
201 }
202 
203 MicroThread::MicroThread(ThreadType type)
204 {
205     memset(&_entry, 0, sizeof(_entry));
206     TAILQ_INIT(&_fdset);
207     TAILQ_INIT(&_sub_list);
208     _flag = NOT_INLIST;
209     _type = type;
210     _state = INITIAL;
211     _start = NULL;
212     _args = NULL;
213     _parent = NULL;
214 }
215 
216 void MicroThread::CleanState()
217 {
218     TAILQ_INIT(&_fdset);
219     TAILQ_INIT(&_sub_list);
220     _flag = NOT_INLIST;
221     _type = NORMAL;
222     _state = INITIAL;
223     _start = NULL;
224     _args = NULL;
225     _parent = NULL;
226 }
227 
228 void MicroThread::Run()
229 {
230     if (_start) {
231         _start(_args);
232     }
233 
234     if (this->IsSubThread()) {
235         this->WakeupParent();
236     }
237 
238     ScheduleObj::Instance()->ScheduleReclaim();
239     ScheduleObj::Instance()->ScheduleThread();
240 }
241 
242 void MicroThread::WakeupParent()
243 {
244     MicroThread* parent = this->GetParent();
245     if (parent)
246     {
247         parent->RemoveSubThread(this);
248         if (parent->HasNoSubThread())
249         {
250             ScheduleObj::Instance()->ScheduleUnpend(parent);
251         }
252     }
253     else
254     {
255         MTLOG_ERROR("Sub thread no parent, error");
256     }
257 }
258 
259 bool MicroThread::HasNoSubThread()
260 {
261     return TAILQ_EMPTY(&_sub_list);
262 }
263 
264 void MicroThread::AddSubThread(MicroThread* sub)
265 {
266     ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
267     if (!sub->HasFlag(MicroThread::SUB_LIST))
268     {
269         TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
270         sub->_parent = this;
271     }
272 
273     sub->SetFlag(MicroThread::SUB_LIST);
274 }
275 
276 void MicroThread::RemoveSubThread(MicroThread* sub)
277 {
278     ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
279     if (sub->HasFlag(MicroThread::SUB_LIST))
280     {
281         TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
282         sub->_parent = NULL;
283     }
284 
285     sub->UnsetFlag(MicroThread::SUB_LIST);
286 }
287 
288 ScheduleObj *ScheduleObj::_instance = NULL;
289 inline ScheduleObj* ScheduleObj::Instance()
290 {
291     if (NULL == _instance)
292     {
293         _instance = new ScheduleObj();
294     }
295 
296     return _instance;
297 }
298 
299 void ScheduleObj::ScheduleThread()
300 {
301     MtFrame* frame = MtFrame::Instance();
302     frame->ThreadSchdule();
303 }
304 
305 utime64_t ScheduleObj::ScheduleGetTime()
306 {
307     MtFrame* frame = MtFrame::Instance();
308     if (frame)
309     {
310         return frame->GetLastClock();
311     }
312     else
313     {
314         MTLOG_ERROR("frame time failed, maybe not init");
315         return 0;
316     }
317 }
318 
319 void ScheduleObj::ScheduleSleep()
320 {
321     MtFrame* frame = MtFrame::Instance();
322     MicroThread* thread = frame->GetActiveThread();
323     if ((!frame) || (!thread)) {
324         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
325         return;
326     }
327 
328     frame->InsertSleep(thread);
329     frame->ThreadSchdule();
330 }
331 
332 void ScheduleObj::SchedulePend()
333 {
334     MtFrame* frame = MtFrame::Instance();
335     MicroThread* thread = frame->GetActiveThread();
336     if ((!frame) || (!thread)) {
337         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
338         return;
339     }
340 
341     frame->InsertPend(thread);
342     frame->ThreadSchdule();
343 }
344 
345 void ScheduleObj::ScheduleUnpend(void* pthread)
346 {
347     MtFrame* frame = MtFrame::Instance();
348     MicroThread* thread = (MicroThread*)pthread;
349     if ((!frame) || (!thread)) {
350         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
351         return;
352     }
353 
354     frame->RemovePend(thread);
355     frame->InsertRunable(thread);
356 }
357 
358 void ScheduleObj::ScheduleReclaim()
359 {
360     MtFrame* frame = MtFrame::Instance();
361     MicroThread* thread = frame->GetActiveThread();
362     if ((!frame) || (!thread)) {
363         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
364         return;
365     }
366 
367     frame->FreeThread(thread);
368 }
369 
370 void ScheduleObj::ScheduleStartRun()
371 {
372     MtFrame* frame = MtFrame::Instance();
373     MicroThread* thread = frame->GetActiveThread();
374     if ((!frame) || (!thread)) {
375         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
376         return;
377     }
378 
379     thread->Run();
380 }
381 
382 
383 unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM;   ///< 2000 micro threads.
384 unsigned int ThreadPool::last_default_thread_num = DEFAULT_THREAD_NUM;   ///< 2000 micro threads.
385 unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE;   ///< 128k stack.
386 
387 bool ThreadPool::InitialPool(int max_num)
388 {
389     MicroThread *thread = NULL;
390     for (unsigned int i = 0; i < default_thread_num; i++)
391     {
392         thread = new MicroThread();
393         if ((NULL == thread) || (false == thread->Initial()))
394         {
395             MTLOG_ERROR("init pool, thread %p init failed", thread);
396             if (thread)  delete thread;
397             continue;
398         }
399         thread->SetFlag(MicroThread::FREE_LIST);
400         _freelist.push(thread);
401     }
402 
403     _total_num = _freelist.size();
404     _max_num  = max_num;
405     _use_num = 0;
406     if (_total_num <= 0)
407     {
408         return false;
409     }
410     else
411     {
412         return true;
413     }
414 }
415 
416 void ThreadPool::DestroyPool()
417 {
418     MicroThread* thread = NULL;
419     while (!_freelist.empty())
420     {
421         thread = _freelist.front();
422         _freelist.pop();
423         thread->Destroy();
424         delete thread;
425     }
426 
427     _total_num = 0;
428     _use_num = 0;
429 }
430 
431 MicroThread* ThreadPool::AllocThread()
432 {
433     MT_ATTR_API_SET(492069, _total_num);
434 
435     MicroThread* thread = NULL;
436     if (!_freelist.empty())
437     {
438         thread = _freelist.front();
439         _freelist.pop();
440 
441         ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
442 
443         thread->UnsetFlag(MicroThread::FREE_LIST);
444         _use_num++;
445         return thread;
446     }
447 
448     MT_ATTR_API(320846, 1); // pool no nore
449     if (_total_num >= _max_num)
450     {
451         MT_ATTR_API(361140, 1); // no more quota
452         MTLOG_ERROR("total %d is outof max: %d", _total_num,_max_num);
453         return NULL;
454     }
455 
456     thread = new MicroThread();
457     if ((NULL == thread) || (false == thread->Initial()))
458     {
459         MT_ATTR_API(320847, 1); // pool init fail
460         MTLOG_ERROR("thread alloc failed, thread: %p", thread);
461         if (thread)  delete thread;
462         return NULL;
463     }
464     _total_num++;
465     _use_num++;
466     if(_use_num >(int) default_thread_num){
467         if(((int) default_thread_num * 2 )< _max_num){
468             last_default_thread_num = default_thread_num;
469             default_thread_num = default_thread_num * 2;
470         }
471     }
472 
473     return thread;
474 }
475 
476 void ThreadPool::FreeThread(MicroThread* thread)
477 {
478     ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
479     thread->Reset();
480     _use_num--;
481     _freelist.push(thread);
482     thread->SetFlag(MicroThread::FREE_LIST);
483 
484     unsigned int free_num = _freelist.size();
485     if ((free_num > default_thread_num) && (free_num > 1))
486     {
487         thread = _freelist.front();
488         _freelist.pop();
489         thread->Destroy();
490         delete thread;
491         _total_num--;
492         if(default_thread_num / 2 >= DEFAULT_THREAD_NUM){
493             last_default_thread_num = default_thread_num;
494             default_thread_num = default_thread_num / 2;
495         }
496     }
497 }
498 
499 int ThreadPool::GetUsedNum(void)
500 {
501     return _use_num;
502 }
503 
504 MtFrame *MtFrame::_instance = NULL;
505 inline MtFrame* MtFrame::Instance ()
506 {
507     if (NULL == _instance )
508     {
509         _instance = new MtFrame();
510     }
511 
512     return _instance;
513 }
514 
515 void MtFrame::SetHookFlag() {
516     mt_set_hook_flag();
517 };
518 
519 bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
520 {
521     if(logadpt == NULL){
522         _log_adpt = &def_log_adapt;
523     }else{
524         _log_adpt = logadpt;
525     }
526 
527     if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
528     {
529         MTLOG_ERROR("Init epoll or thread pool failed");
530         this->Destroy();
531         return false;
532     }
533     if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
534     {
535         MTLOG_ERROR("Init heap list failed");
536         this->Destroy();
537         return false;
538     }
539 
540     _timer = new CTimerMng(max_thread_num * 2);
541     if (NULL == _timer)
542     {
543         MTLOG_ERROR("Init heap timer failed");
544         this->Destroy();
545         return false;
546     }
547 
548     _daemon = AllocThread();
549     if (NULL == _daemon)
550     {
551         MTLOG_ERROR("Alloc daemon thread failed");
552         this->Destroy();
553         return false;
554     }
555     _daemon->SetType(MicroThread::DAEMON);
556     _daemon->SetState(MicroThread::RUNABLE);
557     _daemon->SetSartFunc(MtFrame::DaemonRun, this);
558 
559     _primo = new MicroThread(MicroThread::PRIMORDIAL);
560     if (NULL == _primo)
561     {
562         MTLOG_ERROR("new _primo thread failed");
563         this->Destroy();
564         return false;
565     }
566     _primo->SetState(MicroThread::RUNNING);
567     SetActiveThread(_primo);
568 
569     _last_clock = GetSystemMS();
570     TAILQ_INIT(&_iolist);
571     TAILQ_INIT(&_pend_list);
572 
573     //SetHookFlag();
574 
575     return true;
576 
577 }
578 
579 void MtFrame::Destroy(void)
580 {
581     if (NULL == _instance )
582     {
583         return;
584     }
585 
586     if (_primo) {
587         delete _primo;
588         _primo = NULL;
589     }
590 
591     if (_daemon) {
592         FreeThread(_daemon);
593         _daemon = NULL;
594     }
595 
596     TAILQ_INIT(&_iolist);
597 
598     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
599     while (thread)
600     {
601         FreeThread(thread);
602         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
603     }
604 
605     while (!_runlist.empty())
606     {
607         thread = _runlist.front();
608         _runlist.pop();
609         FreeThread(thread);
610     }
611 
612     MicroThread* tmp;
613     TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
614     {
615         TAILQ_REMOVE(&_pend_list, thread, _entry);
616         FreeThread(thread);
617     }
618 
619     if (_timer != NULL)
620     {
621         delete _timer;
622         _timer = NULL;
623     }
624 
625     _instance->DestroyPool();
626     _instance->TermKqueue();
627     delete _instance;
628     _instance = NULL;
629 }
630 
631 char* MtFrame::Version()
632 {
633     return IMT_VERSION;
634 }
635 
636 MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
637 {
638     MtFrame* mtframe = MtFrame::Instance();
639     MicroThread* thread = mtframe->AllocThread();
640     if (NULL == thread)
641     {
642         MTLOG_ERROR("create thread failed");
643         return NULL;
644     }
645     thread->SetSartFunc(entry, args);
646 
647     if (runable) {
648         mtframe->InsertRunable(thread);
649     }
650 
651     return thread;
652 }
653 
654 int MtFrame::Loop(void* args)
655 {
656     MtFrame* mtframe = MtFrame::Instance();
657     MicroThread* daemon = mtframe->DaemonThread();
658 
659     mtframe->KqueueDispatch();
660     mtframe->SetLastClock(mtframe->GetSystemMS());
661     mtframe->WakeupTimeout();
662     mtframe->CheckExpired();
663     daemon->SwitchContext();
664 
665     return 0;
666 }
667 
668 void MtFrame::DaemonRun(void* args)
669 {
670     /*
671     MtFrame* mtframe = MtFrame::Instance();
672     MicroThread* daemon = mtframe->DaemonThread();
673 
674     while (true) {
675         mtframe->KqueueDispatch();
676         mtframe->SetLastClock(mtframe->GetSystemMS());
677         mtframe->WakeupTimeout();
678         mtframe->CheckExpired();
679         daemon->SwitchContext();
680     }
681     */
682     ff_run(MtFrame::Loop, NULL);
683 }
684 
685 MicroThread *MtFrame::GetRootThread()
686 {
687     if (NULL == _curr_thread)
688     {
689         return NULL;
690     }
691 
692     MicroThread::ThreadType type = _curr_thread->GetType();
693     MicroThread *thread = _curr_thread;
694     MicroThread *parent = thread;
695 
696     while (MicroThread::SUB_THREAD == type)
697     {
698         thread = thread->GetParent();
699         if (!thread)
700         {
701             break;
702         }
703 
704         type   = thread->GetType();
705         parent = thread;
706     }
707 
708     return parent;
709 }
710 
711 void MtFrame::ThreadSchdule()
712 {
713     MicroThread* thread = NULL;
714     MtFrame* mtframe = MtFrame::Instance();
715 
716     if (mtframe->_runlist.empty())
717     {
718         thread = mtframe->DaemonThread();
719     }
720     else
721     {
722         thread = mtframe->_runlist.front();
723         mtframe->RemoveRunable(thread);
724     }
725 
726     this->SetActiveThread(thread);
727     thread->SetState(MicroThread::RUNNING);
728     thread->RestoreContext();
729 }
730 
731 void MtFrame::CheckExpired()
732 {
733     static utime64_t check_time = 0;
734 
735     if (_timer != NULL)
736     {
737         _timer->check_expired();
738     }
739 
740    utime64_t now = GetLastClock();
741 
742     if ((now - check_time) > 1000)
743     {
744         CNetMgr::Instance()->RecycleObjs(now);
745         check_time = now;
746     }
747 }
748 
749 void MtFrame::WakeupTimeout()
750 {
751     utime64_t now = GetLastClock();
752     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
753     while (thread && (thread->GetWakeupTime() <= now))
754     {
755         if (thread->HasFlag(MicroThread::IO_LIST))
756         {
757             RemoveIoWait(thread);
758         }
759         else
760         {
761             RemoveSleep(thread);
762         }
763 
764         InsertRunable(thread);
765 
766         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
767     }
768 }
769 
770 int MtFrame::KqueueGetTimeout()
771 {
772     utime64_t now = GetLastClock();
773     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
774     if (!thread)
775     {
776         return 10; //default 10ms epollwait
777     }
778     else if (thread->GetWakeupTime() < now)
779     {
780         return 0;
781     }
782     else
783     {
784         return (int)(thread->GetWakeupTime() - now);
785     }
786 }
787 
788 inline void MtFrame::InsertSleep(MicroThread* thread)
789 {
790     ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
791 
792     thread->SetFlag(MicroThread::SLEEP_LIST);
793     thread->SetState(MicroThread::SLEEPING);
794     int rc = _sleeplist.HeapPush(thread);
795     if (rc < 0)
796     {
797         MT_ATTR_API(320848, 1); // heap error
798         MTLOG_ERROR("Insert heap failed , rc %d", rc);
799     }
800 }
801 
802 inline void MtFrame::RemoveSleep(MicroThread* thread)
803 {
804     ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
805     thread->UnsetFlag(MicroThread::SLEEP_LIST);
806 
807     int rc = _sleeplist.HeapDelete(thread);
808     if (rc < 0)
809     {
810         MT_ATTR_API(320849, 1); // heap error
811         MTLOG_ERROR("remove heap failed , rc %d", rc);
812     }
813 }
814 
815 inline void MtFrame::InsertIoWait(MicroThread* thread)
816 {
817     ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
818     thread->SetFlag(MicroThread::IO_LIST);
819     TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
820     InsertSleep(thread);
821 }
822 
823 void MtFrame::RemoveIoWait(MicroThread* thread)
824 {
825     ASSERT(thread->HasFlag(MicroThread::IO_LIST));
826     thread->UnsetFlag(MicroThread::IO_LIST);
827     TAILQ_REMOVE(&_iolist, thread, _entry);
828 
829     RemoveSleep(thread);
830 }
831 
832 void MtFrame::InsertRunable(MicroThread* thread)
833 {
834     ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
835     thread->SetFlag(MicroThread::RUN_LIST);
836 
837     thread->SetState(MicroThread::RUNABLE);
838     _runlist.push(thread);
839     _waitnum++;
840 }
841 
842 inline void MtFrame::RemoveRunable(MicroThread* thread)
843 {
844     ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
845     ASSERT(thread == _runlist.front());
846     thread->UnsetFlag(MicroThread::RUN_LIST);
847 
848     _runlist.pop();
849     _waitnum--;
850 }
851 
852 void MtFrame::InsertPend(MicroThread* thread)
853 {
854     ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
855     thread->SetFlag(MicroThread::PEND_LIST);
856     TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
857     thread->SetState(MicroThread::PENDING);
858 }
859 
860 void MtFrame::RemovePend(MicroThread* thread)
861 {
862     ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
863     thread->UnsetFlag(MicroThread::PEND_LIST);
864     TAILQ_REMOVE(&_pend_list, thread, _entry);
865 }
866 
867 void MtFrame::WaitNotify(utime64_t timeout)
868 {
869     MicroThread* thread = GetActiveThread();
870 
871     thread->SetWakeupTime(timeout + this->GetLastClock());
872     this->InsertIoWait(thread);
873     thread->SwitchContext();
874 }
875 
876 void MtFrame::NotifyThread(MicroThread* thread)
877 {
878     if(thread == NULL){
879         return;
880     }
881     MicroThread* cur_thread = GetActiveThread();
882     if (thread->HasFlag(MicroThread::IO_LIST))
883     {
884         this->RemoveIoWait(thread);
885         if(cur_thread == this->DaemonThread()){
886             // ���ﲻֱ���еĻ�,���Dz���ʱ,�ᵼ��Ŀ���̵߳ȴ�����ʱ
887             if(cur_thread->SaveContext() == 0){
888                 this->SetActiveThread(thread);
889                 thread->SetState(MicroThread::RUNNING);
890                 thread->RestoreContext();
891             }
892         }else{
893             this->InsertRunable(thread);
894         }
895     }
896 }
897 
898 void MtFrame::SwapDaemonThread()
899 {
900     MicroThread* thread = GetActiveThread();
901     MicroThread* daemon_thread = this->DaemonThread();
902     if(thread != daemon_thread){
903         if(thread->SaveContext() == 0){
904             this->InsertRunable(thread);
905             this->SetActiveThread(daemon_thread);
906             daemon_thread->SetState(MicroThread::RUNNING);
907             daemon_thread->RestoreContext();
908         }
909     }
910 }
911 
912 bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
913 {
914     MicroThread* thread = GetActiveThread();
915     if (NULL == thread)
916     {
917         MTLOG_ERROR("active thread null, epoll schedule failed");
918         return false;
919     }
920 
921     thread->ClearAllFd();
922     if (fdlist)
923     {
924         thread->AddFdList(fdlist);
925     }
926     if (fd)
927     {
928         thread->AddFd(fd);
929     }
930 
931     thread->SetWakeupTime(timeout + this->GetLastClock());
932     if (!this->KqueueAdd(thread->GetFdSet()))
933     {
934         MTLOG_ERROR("epoll add failed, errno: %d", errno);
935         return false;
936     }
937     this->InsertIoWait(thread);
938     thread->SwitchContext();
939 
940     int rcvnum = 0;
941     KqObjList& rcvfds = thread->GetFdSet();
942     KqueuerObj* fdata = NULL;
943     TAILQ_FOREACH(fdata, &rcvfds, _entry)
944     {
945         if (fdata->GetRcvEvents() != 0)
946         {
947             rcvnum++;
948         }
949     }
950     this->KqueueDel(rcvfds);
951 
952     if (rcvnum == 0)
953     {
954         errno = ETIME;
955         return false;
956     }
957 
958     return true;
959 }
960 
961 int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
962 {
963     MtFrame* mtframe = MtFrame::Instance();
964     utime64_t start = mtframe->GetLastClock();
965     MicroThread* thread = mtframe->GetActiveThread();
966     utime64_t now = 0;
967 
968     if(fd<0 || !buf || len<1)
969     {
970         errno = EINVAL;
971         MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
972         return -10;
973     }
974 
975     if (timeout <= -1)
976     {
977         timeout = 0x7fffffff;
978     }
979 
980     while (true)
981     {
982         now = mtframe->GetLastClock();
983         if ((int)(now - start) > timeout)
984         {
985             errno = ETIME;
986             return -1;
987         }
988 
989         KqueuerObj epfd;
990         epfd.SetOsfd(fd);
991         epfd.EnableInput();
992         epfd.SetOwnerThread(thread);
993         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
994         {
995             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
996             return -2;
997         }
998 
999         mt_hook_syscall(recvfrom);
1000         int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
1001         if (n < 0)
1002         {
1003             if (errno == EINTR) {
1004                 continue;
1005             }
1006 
1007             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1008             {
1009                 MTLOG_ERROR("recvfrom failed, errno: %d", errno);
1010                 return -3;
1011             }
1012         }
1013         else
1014         {
1015             return n;
1016         }
1017     }
1018 
1019 }
1020 
1021 int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
1022 {
1023     MtFrame* mtframe = MtFrame::Instance();
1024     utime64_t start = mtframe->GetLastClock();
1025     MicroThread* thread = mtframe->GetActiveThread();
1026     utime64_t now = 0;
1027 
1028     if(fd<0 || !msg || len<1)
1029     {
1030         errno = EINVAL;
1031         MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
1032         return -10;
1033     }
1034 
1035     int n = 0;
1036     mt_hook_syscall(sendto);
1037     while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
1038     {
1039         now = mtframe->GetLastClock();
1040         if ((int)(now - start) > timeout)
1041         {
1042             errno = ETIME;
1043             return -1;
1044         }
1045 
1046         if (errno == EINTR) {
1047             continue;
1048         }
1049 
1050         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1051             MTLOG_ERROR("sendto failed, errno: %d", errno);
1052             return -2;
1053         }
1054 
1055         KqueuerObj epfd;
1056         epfd.SetOsfd(fd);
1057         epfd.EnableOutput();
1058         epfd.SetOwnerThread(thread);
1059         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1060             return -3;
1061         }
1062     }
1063 
1064     return n;
1065 }
1066 
1067 int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1068 {
1069     MtFrame* mtframe = MtFrame::Instance();
1070     utime64_t start = mtframe->GetLastClock();
1071     MicroThread* thread = mtframe->GetActiveThread();
1072     utime64_t now = 0;
1073 
1074     if(fd<0 || !addr || addrlen<1)
1075     {
1076         errno = EINVAL;
1077         MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1078         return -10;
1079     }
1080 
1081     int n = 0;
1082     mt_hook_syscall(connect);
1083     while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1084     {
1085         now = mtframe->GetLastClock();
1086         if ((int)(now - start) > timeout)
1087         {
1088             errno = ETIME;
1089             return -1;
1090         }
1091 
1092         if (errno == EISCONN)
1093         {
1094             return 0;
1095         }
1096 
1097         if (errno == EINTR) {
1098             continue;
1099         }
1100 
1101         if (errno != EINPROGRESS) {
1102             MTLOG_ERROR("connect failed, errno: %d", errno);
1103             return -2;
1104         }
1105 
1106         KqueuerObj epfd;
1107         epfd.SetOsfd(fd);
1108         epfd.EnableOutput();
1109         epfd.SetOwnerThread(thread);
1110         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1111             return -3;
1112         }
1113     }
1114 
1115     return n;
1116 }
1117 
1118 int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1119 {
1120     MtFrame* mtframe = MtFrame::Instance();
1121     utime64_t start = mtframe->GetLastClock();
1122     MicroThread* thread = mtframe->GetActiveThread();
1123     utime64_t now = 0;
1124 
1125     if(fd<0)
1126     {
1127         errno = EINVAL;
1128         MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1129         return -10;
1130     }
1131 
1132     int acceptfd = 0;
1133     mt_hook_syscall(accept);
1134     while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1135     {
1136         now = mtframe->GetLastClock();
1137         if ((int)(now - start) > timeout)
1138         {
1139             errno = ETIME;
1140             return -1;
1141         }
1142 
1143         if (errno == EINTR) {
1144             continue;
1145         }
1146 
1147         if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1148             MTLOG_ERROR("accept failed, errno: %d", errno);
1149             return -2;
1150         }
1151 
1152         KqueuerObj epfd;
1153         epfd.SetOsfd(fd);
1154         epfd.EnableInput();
1155         epfd.SetOwnerThread(thread);
1156         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1157             return -3;
1158         }
1159     }
1160 
1161     return acceptfd;
1162 }
1163 
1164 ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1165 {
1166     MtFrame* mtframe = MtFrame::Instance();
1167     utime64_t start = mtframe->GetLastClock();
1168     MicroThread* thread = mtframe->GetActiveThread();
1169     utime64_t now = 0;
1170 
1171     if(fd<0 || !buf || nbyte<1)
1172     {
1173         errno = EINVAL;
1174         MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1175         return -10;
1176     }
1177 
1178     ssize_t n = 0;
1179     mt_hook_syscall(read);
1180     while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1181     {
1182         now = mtframe->GetLastClock();
1183         if ((int)(now - start) > timeout)
1184         {
1185             errno = ETIME;
1186             return -1;
1187         }
1188 
1189         if (errno == EINTR) {
1190             continue;
1191         }
1192 
1193         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1194             MTLOG_ERROR("read failed, errno: %d", errno);
1195             return -2;
1196         }
1197 
1198         KqueuerObj epfd;
1199         epfd.SetOsfd(fd);
1200         epfd.EnableInput();
1201         epfd.SetOwnerThread(thread);
1202         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1203             return -3;
1204         }
1205     }
1206 
1207     return n;
1208 }
1209 
1210 ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1211 {
1212     MtFrame* mtframe = MtFrame::Instance();
1213     utime64_t start = mtframe->GetLastClock();
1214     MicroThread* thread = mtframe->GetActiveThread();
1215     utime64_t now = 0;
1216 
1217     if(fd<0 || !buf || nbyte<1)
1218     {
1219         errno = EINVAL;
1220         MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1221         return -10;
1222     }
1223 
1224     ssize_t n = 0;
1225     size_t send_len = 0;
1226     while (send_len < nbyte)
1227     {
1228         now = mtframe->GetLastClock();
1229         if ((int)(now - start) > timeout)
1230         {
1231             errno = ETIME;
1232             return -1;
1233         }
1234 
1235         mt_hook_syscall(write);
1236         n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1237         if (n < 0)
1238         {
1239             if (errno == EINTR) {
1240                 continue;
1241             }
1242 
1243             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1244                 MTLOG_ERROR("write failed, errno: %d", errno);
1245                 return -2;
1246             }
1247         }
1248         else
1249         {
1250             send_len += n;
1251             if (send_len >= nbyte) {
1252                 return nbyte;
1253             }
1254         }
1255 
1256         KqueuerObj epfd;
1257         epfd.SetOsfd(fd);
1258         epfd.EnableOutput();
1259         epfd.SetOwnerThread(thread);
1260         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1261             return -3;
1262         }
1263     }
1264 
1265     return nbyte;
1266 }
1267 
1268 int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1269 {
1270     MtFrame* mtframe = MtFrame::Instance();
1271     utime64_t start = mtframe->GetLastClock();
1272     MicroThread* thread = mtframe->GetActiveThread();
1273     utime64_t now = 0;
1274 
1275     if(fd<0 || !buf || len<1)
1276     {
1277         errno = EINVAL;
1278         MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1279         return -10;
1280     }
1281 
1282     if (timeout <= -1)
1283     {
1284         timeout = 0x7fffffff;
1285     }
1286 
1287     while (true)
1288     {
1289         now = mtframe->GetLastClock();
1290         if ((int)(now - start) > timeout)
1291         {
1292             errno = ETIME;
1293             return -1;
1294         }
1295 
1296         KqueuerObj epfd;
1297         epfd.SetOsfd(fd);
1298         epfd.EnableInput();
1299         epfd.SetOwnerThread(thread);
1300         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1301         {
1302             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1303             return -2;
1304         }
1305 
1306         mt_hook_syscall(recv);
1307         int n = ff_hook_recv(fd, buf, len, flags);
1308         if (n < 0)
1309         {
1310             if (errno == EINTR) {
1311                 continue;
1312             }
1313 
1314             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1315             {
1316                 MTLOG_ERROR("recv failed, errno: %d", errno);
1317                 return -3;
1318             }
1319         }
1320         else
1321         {
1322             return n;
1323         }
1324     }
1325 
1326 }
1327 
1328 ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1329 {
1330     MtFrame* mtframe = MtFrame::Instance();
1331     utime64_t start = mtframe->GetLastClock();
1332     MicroThread* thread = mtframe->GetActiveThread();
1333     utime64_t now = 0;
1334 
1335     if(fd<0 || !buf || nbyte<1)
1336     {
1337         errno = EINVAL;
1338         MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1339         return -10;
1340     }
1341 
1342     ssize_t n = 0;
1343     size_t send_len = 0;
1344     while (send_len < nbyte)
1345     {
1346         now = mtframe->GetLastClock();
1347         if ((int)(now - start) > timeout)
1348         {
1349             errno = ETIME;
1350             return -1;
1351         }
1352 
1353         mt_hook_syscall(send);
1354         n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1355         if (n < 0)
1356         {
1357             if (errno == EINTR) {
1358                 continue;
1359             }
1360 
1361             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1362                 MTLOG_ERROR("write failed, errno: %d", errno);
1363                 return -2;
1364             }
1365         }
1366         else
1367         {
1368             send_len += n;
1369             if (send_len >= nbyte) {
1370                 return nbyte;
1371             }
1372         }
1373 
1374         KqueuerObj epfd;
1375         epfd.SetOsfd(fd);
1376         epfd.EnableOutput();
1377         epfd.SetOwnerThread(thread);
1378         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1379             return -3;
1380         }
1381     }
1382 
1383     return nbyte;
1384 }
1385 
1386 void MtFrame::sleep(int ms)
1387 {
1388     MtFrame* frame = MtFrame::Instance();
1389     MicroThread* thread = frame->GetActiveThread();
1390     if (thread != NULL)
1391     {
1392         thread->sleep(ms);
1393     }
1394 }
1395 
1396 int MtFrame::WaitEvents(int fd, int events, int timeout)
1397 {
1398     MtFrame* mtframe = MtFrame::Instance();
1399     utime64_t start = mtframe->GetLastClock();
1400     MicroThread* thread = mtframe->GetActiveThread();
1401     utime64_t now = 0;
1402 
1403     if (timeout <= -1)
1404     {
1405         timeout = 0x7fffffff;
1406     }
1407 
1408     while (true)
1409     {
1410         now = mtframe->GetLastClock();
1411         if ((int)(now - start) > timeout)
1412         {
1413             errno = ETIME;
1414             return 0;
1415         }
1416 
1417         KqueuerObj epfd;
1418         epfd.SetOsfd(fd);
1419         if (events & KQ_EVENT_READ)
1420         {
1421             epfd.EnableInput();
1422         }
1423         if (events & KQ_EVENT_WRITE)
1424         {
1425             epfd.EnableOutput();
1426         }
1427         epfd.SetOwnerThread(thread);
1428 
1429         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1430         {
1431             MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1432             return 0;
1433         }
1434 
1435         return epfd.GetRcvEvents();
1436     }
1437 }
1438