xref: /f-stack/app/micro_thread/micro_thread.cpp (revision a9643ea8)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename micro_thread.cpp
22  *  @info  micro thread manager
23  */
24 #include "mt_version.h"
25 #include "micro_thread.h"
26 #include "mt_net.h"
27 #include "valgrind.h"
28 #include <assert.h>
29 #include "mt_sys_hook.h"
30 #include "ff_hook.h"
31 #include "ff_api.h"
32 
33 using namespace NS_MICRO_THREAD;
34 
35 #define  ASSERT(statement)
36 //#define  ASSERT(statement)   assert(statement)
37 
38 /**
39  *  @brief ���ʵ�ֱ��������ĺ���
40  *  @param jbf jmpbuff����ָ��
41  */
42 extern "C"  int save_context(jmp_buf jbf);
43 
44 /**
45  *  @brief ���ʵ�ָֻ������ĺ���
46  *  @param jbf jmpbuff����ָ��
47  *  @param ret �лصķ���ֵ, Ĭ��1
48  */
49 extern "C"  void restore_context(jmp_buf jbf, int ret);
50 
51 /**
52  *  @brief ���ʵ���滻����ջ����
53  *  @param jbf jmpbuff����ָ��
54  *  @param esp ��ջָ��
55  */
56 extern "C"  void replace_esp(jmp_buf jbf, void* esp);
57 
58 /**
59  * @brief ���캯��, Ĭ�ϲ���ջ��С
60  */
61 Thread::Thread(int stack_size)
62 {
63     _stack_size  = stack_size ? stack_size : ThreadPool::default_stack_size;
64     _wakeup_time = 0;
65     _stack       = NULL;
66     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
67 }
68 
69 
70 /**
71  *  @brief LINUX x86/x86_64�µ�ջ����, �����ܹ�����Ҫע�����
72  */
73 bool Thread::InitStack()
74 {
75     if (_stack) {
76         return true;
77     }
78 
79     ///< ջ������ջ�ڴ����, ��Խ��
80     _stack = (MtStack*)calloc(1, sizeof(MtStack));
81     if (NULL == _stack)
82     {
83         MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
84         return false;
85     }
86 
87     int memsize = MEM_PAGE_SIZE*2 + _stack_size;
88     memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
89 
90     static int zero_fd = -1;
91     int mmap_flags = MAP_PRIVATE | MAP_ANON;
92     void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
93     if (vaddr == (void *)MAP_FAILED)
94     {
95         MTLOG_ERROR("mmap stack failed, size %d", memsize);
96         free(_stack);
97         _stack = NULL;
98         return false;
99     }
100     _stack->_vaddr = (char*)vaddr;
101     _stack->_vaddr_size = memsize;
102     _stack->_stk_size = _stack_size;
103     _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
104     _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
105 	// valgrind support: register stack frame
106 	_stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
107 
108     _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
109 
110     mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
111     mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
112 
113     return true;
114 }
115 
116 
117 /**
118  * @brief �ͷŶ�ջ��Ϣ
119  */
120 void Thread::FreeStack()
121 {
122     if (!_stack) {
123         return;
124     }
125     munmap(_stack->_vaddr, _stack->_vaddr_size);
126 	// valgrind support: deregister stack frame
127 	VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
128     free(_stack);
129     _stack = NULL;
130 }
131 
132 /**
133  * @brief ��ʼ��������,���üĴ���,��ջ
134  */
135 void Thread::InitContext()
136 {
137     if (save_context(_jmpbuf) != 0)
138     {
139         ScheduleObj::Instance()->ScheduleStartRun(); // ֱ�ӵ��� this->run?
140     }
141 
142     if (_stack != NULL)
143     {
144         replace_esp(_jmpbuf, _stack->_esp);
145     }
146 }
147 
148 /**
149  * @brief �����л�, ����״̬, ��������
150  */
151 void Thread::SwitchContext()
152 {
153     if (save_context(_jmpbuf) == 0)
154     {
155         ScheduleObj::Instance()->ScheduleThread();
156     }
157 }
158 
159 /**
160  * @brief �ָ�������, �л��ضϵ�,��������
161  */
162 void Thread::RestoreContext()
163 {
164     restore_context(_jmpbuf, 1);
165 }
166 
167 /**
168  * @brief ��ʼ���߳�,���ջ�������ij�ʼ��
169  */
170 bool Thread::Initial()
171 {
172     if (!InitStack())
173     {
174         MTLOG_ERROR("init stack failed");
175         return false;
176     }
177 
178     InitContext();
179 
180     return true;
181 }
182 
183 /**
184  * @brief ��ֹ�߳�,���ջ���������ͷ�
185  */
186 void Thread::Destroy()
187 {
188     FreeStack();
189     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
190 }
191 
192 /**
193  * @brief �߳�״̬����, �ɸ���״̬
194  */
195 void Thread::Reset()
196 {
197     _wakeup_time = 0;
198     SetPrivate(NULL);
199 
200     InitContext();
201     CleanState();
202 }
203 
204 /**
205  * @brief �߳���������˯��, ��λ����
206  * @param ms ˯�ߺ�����
207  */
208 void Thread::sleep(int ms)
209 {
210     utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
211     _wakeup_time = now + ms;
212 
213     if (save_context(_jmpbuf) == 0)
214     {
215         ScheduleObj::Instance()->ScheduleSleep();
216     }
217 }
218 
219 /**
220  * @brief ��������״̬, �ȴ��������߳̽���
221  */
222 void Thread::Wait()
223 {
224     if (save_context(_jmpbuf) == 0)
225     {
226         ScheduleObj::Instance()->SchedulePend();
227     }
228 }
229 
230 /**
231  * @brief ��ʼ��������,���üĴ���,��ջ
232  */
233 bool Thread::CheckStackHealth(char *esp)
234 {
235 	if (!_stack)
236 		return false;
237 
238 	if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
239 		return true;
240 	else
241 		return false;
242 }
243 
244 /**
245  * @brief ΢�̹߳���, Ĭ������ͨ�߳�
246  * @param type ����, Ĭ����ͨ
247  */
248 MicroThread::MicroThread(ThreadType type)
249 {
250     memset(&_entry, 0, sizeof(_entry));
251     TAILQ_INIT(&_fdset);
252     TAILQ_INIT(&_sub_list);
253     _flag = NOT_INLIST;
254     _type = type;
255     _state = INITIAL;
256     _start = NULL;
257     _args = NULL;
258     _parent = NULL;
259 }
260 
261 /**
262  * @breif ΢�̸߳���״̬����
263  */
264 void MicroThread::CleanState()
265 {
266     TAILQ_INIT(&_fdset);
267     TAILQ_INIT(&_sub_list);
268     _flag = NOT_INLIST;
269     _type = NORMAL;
270     _state = INITIAL;
271     _start = NULL;
272     _args = NULL;
273     _parent = NULL;
274 }
275 
276 /**
277  * @brief �̵߳�ʵ�ʹ�������
278  */
279 void MicroThread::Run()
280 {
281     if (_start) {
282         _start(_args);
283     }
284 
285     // �����߳�, �������߳̽��������̬
286     if (this->IsSubThread()) {
287         this->WakeupParent();
288     }
289 
290     ScheduleObj::Instance()->ScheduleReclaim();
291     ScheduleObj::Instance()->ScheduleThread();
292 }
293 
294 /**
295  * @brief �������̻߳��Ѹ��̴߳���
296  */
297 void MicroThread::WakeupParent()
298 {
299     MicroThread* parent = this->GetParent();
300     if (parent)
301     {
302         parent->RemoveSubThread(this);
303         if (parent->HasNoSubThread())
304         {
305             ScheduleObj::Instance()->ScheduleUnpend(parent);
306         }
307     }
308     else
309     {
310         MTLOG_ERROR("Sub thread no parent, error");
311     }
312 }
313 
314 /**
315  * @brief �Ƿ��������Ķ������߳�
316  */
317 bool MicroThread::HasNoSubThread()
318 {
319     return TAILQ_EMPTY(&_sub_list);
320 }
321 
322 /**
323  * @brief ��ָ�����̼߳�������߳��б�
324  */
325 void MicroThread::AddSubThread(MicroThread* sub)
326 {
327     ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
328     if (!sub->HasFlag(MicroThread::SUB_LIST))
329     {
330         TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
331         sub->_parent = this;
332     }
333 
334     sub->SetFlag(MicroThread::SUB_LIST);
335 }
336 
337 /**
338  * @brief ��ָ���߳��Ƴ������߳��б�
339  */
340 void MicroThread::RemoveSubThread(MicroThread* sub)
341 {
342     ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
343     if (sub->HasFlag(MicroThread::SUB_LIST))
344     {
345         TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
346         sub->_parent = NULL;
347     }
348 
349     sub->UnsetFlag(MicroThread::SUB_LIST);
350 }
351 
352 
353 /**
354  * @brief ��������ʾ�����
355  */
356 ScheduleObj *ScheduleObj::_instance = NULL;     ///< ��̬�����ʼ��
357 inline ScheduleObj* ScheduleObj::Instance()
358 {
359     if (NULL == _instance)
360     {
361         _instance = new ScheduleObj();
362     }
363 
364     return _instance;
365 }
366 
367 /**
368  * @brief ��������΢�߳�������, �����ӿ�
369  */
370 void ScheduleObj::ScheduleThread()
371 {
372     MtFrame* frame = MtFrame::Instance();
373     frame->ThreadSchdule();
374 }
375 
376 /**
377  * @brief ��ȡȫ�ֵ�ʱ���, ���뵥λ
378  */
379 utime64_t ScheduleObj::ScheduleGetTime()
380 {
381     MtFrame* frame = MtFrame::Instance();
382     if (frame)
383     {
384         return frame->GetLastClock();
385     }
386     else
387     {
388         MTLOG_ERROR("frame time failed, maybe not init");
389         return 0;
390     }
391 }
392 
393 /**
394  * @brief �̵߳�����������sleep״̬
395  */
396 void ScheduleObj::ScheduleSleep()
397 {
398     MtFrame* frame = MtFrame::Instance();
399     MicroThread* thread = frame->GetActiveThread();
400     if ((!frame) || (!thread)) {
401         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
402         return;
403     }
404 
405     frame->InsertSleep(thread);
406     frame->ThreadSchdule();
407 }
408 
409 /**
410  * @brief �̵߳�����������pend״̬
411  */
412 void ScheduleObj::SchedulePend()
413 {
414     MtFrame* frame = MtFrame::Instance();
415     MicroThread* thread = frame->GetActiveThread();
416     if ((!frame) || (!thread)) {
417         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
418         return;
419     }
420 
421     frame->InsertPend(thread);
422     frame->ThreadSchdule();
423 }
424 
425 /**
426  * @brief �̵߳���ȡ��pend״̬, �ⲿ����ȡ��
427  */
428 void ScheduleObj::ScheduleUnpend(void* pthread)
429 {
430     MtFrame* frame = MtFrame::Instance();
431     MicroThread* thread = (MicroThread*)pthread;
432     if ((!frame) || (!thread)) {
433         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
434         return;
435     }
436 
437     frame->RemovePend(thread);
438     frame->InsertRunable(thread);
439 }
440 
441 
442 
443 /**
444  * @brief �߳�ִ����Ϻ�, ���մ���
445  */
446 void ScheduleObj::ScheduleReclaim()
447 {
448     MtFrame* frame = MtFrame::Instance();
449     MicroThread* thread = frame->GetActiveThread();
450     if ((!frame) || (!thread)) {
451         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
452         return;
453     }
454 
455     frame->FreeThread(thread);
456 }
457 
458 /**
459  * @brief ���������ȳ�ʼִ��
460  */
461 void ScheduleObj::ScheduleStartRun()
462 {
463     MtFrame* frame = MtFrame::Instance();
464     MicroThread* thread = frame->GetActiveThread();
465     if ((!frame) || (!thread)) {
466         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
467         return;
468     }
469 
470     thread->Run();
471 }
472 
473 
474 /**
475  * @brief ΢�̳߳�ȫ�ֲ�����ʼ��
476  */
477 unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM;   ///< Ĭ��2000΢�̴߳���
478 unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE;   ///< Ĭ��128Kջ��С
479 
480 /**
481  * @brief ΢�̳߳س�ʼ��
482  */
483 bool ThreadPool::InitialPool(int max_num)
484 {
485     MicroThread *thread = NULL;
486     for (unsigned int i = 0; i < default_thread_num; i++)
487     {
488         thread = new MicroThread();
489         if ((NULL == thread) || (false == thread->Initial()))
490         {
491             MTLOG_ERROR("init pool, thread %p init failed", thread);
492             if (thread)  delete thread;
493             continue;
494         }
495         thread->SetFlag(MicroThread::FREE_LIST);
496         _freelist.push(thread);
497     }
498 
499     _total_num = _freelist.size();
500     _max_num  = max_num;
501     _use_num = 0;
502     if (_total_num <= 0)
503     {
504         return false;
505     }
506     else
507     {
508         return true;
509     }
510 }
511 
512 /**
513  * @brief ΢�̳߳ط���ʼ��
514  */
515 void ThreadPool::DestroyPool()
516 {
517     MicroThread* thread = NULL;
518     while (!_freelist.empty())
519     {
520         thread = _freelist.front();
521         _freelist.pop();
522         thread->Destroy();
523         delete thread;
524     }
525 
526     _total_num = 0;
527     _use_num = 0;
528 }
529 
530 /**
531  * @brief ΢�̷߳���ӿ�
532  * @return ΢�̶߳���
533  */
534 MicroThread* ThreadPool::AllocThread()
535 {
536     MT_ATTR_API_SET(492069, _total_num); // ΢�̳߳ش�С
537 
538     MicroThread* thread = NULL;
539     if (!_freelist.empty())
540     {
541         thread = _freelist.front();
542         _freelist.pop();
543 
544         ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
545 
546         thread->UnsetFlag(MicroThread::FREE_LIST);
547         _use_num++;
548         return thread;
549     }
550 
551     MT_ATTR_API(320846, 1); // pool no nore
552     if (_total_num >= _max_num)
553     {
554         MT_ATTR_API(361140, 1); // no more quota
555         return NULL;
556     }
557 
558     thread = new MicroThread();
559     if ((NULL == thread) || (false == thread->Initial()))
560     {
561         MT_ATTR_API(320847, 1); // pool init fail
562         MTLOG_ERROR("thread alloc failed, thread: %p", thread);
563         if (thread)  delete thread;
564         return NULL;
565     }
566     _total_num++;
567     _use_num++;
568 
569     return thread;
570 }
571 
572 /**
573  * @brief ΢�߳��ͷŽӿ�
574  * @param thread ΢�̶߳���
575  */
576 void ThreadPool::FreeThread(MicroThread* thread)
577 {
578     ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
579     thread->Reset();
580     _use_num--;
581     _freelist.push(thread);
582     thread->SetFlag(MicroThread::FREE_LIST);
583 
584     ///< ���ж��� > default_thread_num, ���ͷ����ϵ�, �������ͷŵ�ǰ
585     unsigned int free_num = _freelist.size();
586     if ((free_num > default_thread_num) && (free_num > 1))
587     {
588         thread = _freelist.front();
589         _freelist.pop();
590         thread->Destroy();
591         delete thread;
592         _total_num--;
593     }
594 }
595 
596 int ThreadPool::GetUsedNum(void)
597 {
598 	return _use_num;
599 }
600 
601 /**
602  * @brief ΢�߳̿����, ȫ��ʵ����ȡ
603  */
604 MtFrame *MtFrame::_instance = NULL;
605 inline MtFrame* MtFrame::Instance ()
606 {
607     if (NULL == _instance )
608     {
609         _instance = new MtFrame();
610     }
611 
612     return _instance;
613 }
614 
615 /**
616  * @brief HOOKϵͳapi������
617  */
618 void MtFrame::SetHookFlag() {
619     mt_set_hook_flag();
620 };
621 
622 
623 /**
624  * @brief ��ܳ�ʼ��, Ĭ�ϲ�����־����
625  */
626 bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
627 {
628     _log_adpt = logadpt;
629 
630     // �������������߳���Ŀ, ���Ե���epoll��ص�fd��Ŀ
631     if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
632     {
633         MTLOG_ERROR("Init epoll or thread pool failed");
634         this->Destroy();
635         return false;
636     }
637 
638     // �������öѴ�С, �Ŵ�Ѹ���Ϊ2��
639     if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
640     {
641         MTLOG_ERROR("Init heap list failed");
642         this->Destroy();
643         return false;
644     }
645 
646     // ��ʱ�������ʼ��, �Ŵ�Ѹ���Ϊ2��
647     _timer = new CTimerMng(max_thread_num * 2);
648     if (NULL == _timer)
649     {
650         MTLOG_ERROR("Init heap timer failed");
651         this->Destroy();
652         return false;
653     }
654 
655     // �ػ��̵߳�����ʼ��
656     _daemon = AllocThread();
657     if (NULL == _daemon)
658     {
659         MTLOG_ERROR("Alloc daemon thread failed");
660         this->Destroy();
661         return false;
662     }
663     _daemon->SetType(MicroThread::DAEMON);
664     _daemon->SetState(MicroThread::RUNABLE);
665     _daemon->SetSartFunc(MtFrame::DaemonRun, this);
666 
667     // �����߳�, ����INIT, ����ʼ��ջ, Ҳ�޻ص�ע��, ����Ҫͳһ����
668     _primo = new MicroThread(MicroThread::PRIMORDIAL);
669     if (NULL == _primo)
670     {
671         MTLOG_ERROR("new _primo thread failed");
672         this->Destroy();
673         return false;
674     }
675     _primo->SetState(MicroThread::RUNNING);
676     SetActiveThread(_primo);
677 
678     // ��������ʱ���
679     _last_clock = GetSystemMS();
680     TAILQ_INIT(&_iolist);
681     TAILQ_INIT(&_pend_list);
682 
683 	//SetHookFlag();
684 
685     return true;
686 
687 }
688 
689 /**
690  * @brief ��ܷ���ʼ��
691  */
692 void MtFrame::Destroy(void)
693 {
694     if (NULL == _instance )
695     {
696         return;
697     }
698 
699     if (_primo) {
700         delete _primo;
701         _primo = NULL;
702     }
703 
704     if (_daemon) {
705         FreeThread(_daemon);
706         _daemon = NULL;
707     }
708 
709     TAILQ_INIT(&_iolist);
710 
711     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
712     while (thread)
713     {
714         FreeThread(thread);
715         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
716     }
717 
718     while (!_runlist.empty())
719     {
720         thread = _runlist.front();
721         _runlist.pop();
722         FreeThread(thread);
723     }
724 
725     MicroThread* tmp;
726     TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
727     {
728         TAILQ_REMOVE(&_pend_list, thread, _entry);
729         FreeThread(thread);
730     }
731 
732     if (_timer != NULL)
733     {
734         delete _timer;
735         _timer = NULL;
736     }
737 
738     _instance->DestroyPool();
739     _instance->TermKqueue();
740     delete _instance;
741     _instance = NULL;
742 }
743 
744 /**
745  * @brief ΢�߳̿�ܰ汾��ȡ
746  */
747 char* MtFrame::Version()
748 {
749     return IMT_VERSION;
750 }
751 
752 /**
753  * @brief ΢�̴߳����ӿ�
754  * @param entry �߳���ں���
755  * @param args  �߳���ڲ���
756  * @return ΢�߳�ָ��, NULL��ʾʧ��
757  */
758 MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
759 {
760     MtFrame* mtframe = MtFrame::Instance();
761     MicroThread* thread = mtframe->AllocThread();
762     if (NULL == thread)
763     {
764         MTLOG_ERROR("create thread failed");
765         return NULL;
766     }
767     thread->SetSartFunc(entry, args);
768 
769     if (runable) {
770         mtframe->InsertRunable(thread);
771     }
772 
773     return thread;
774 }
775 
776 int MtFrame::Loop(void* args)
777 {
778     MtFrame* mtframe = MtFrame::Instance();
779     MicroThread* daemon = mtframe->DaemonThread();
780 
781 	mtframe->KqueueDispatch();
782 	mtframe->SetLastClock(mtframe->GetSystemMS());
783 	mtframe->WakeupTimeout();
784 	mtframe->CheckExpired();
785 	daemon->SwitchContext();
786 
787 	return 0;
788 }
789 
790 /**
791  * @brief �ػ��߳���ں���, ����ָ��Ҫ��static����
792  * @param args  �߳���ڲ���
793  */
794 void MtFrame::DaemonRun(void* args)
795 {
796 	/*
797     MtFrame* mtframe = MtFrame::Instance();
798     MicroThread* daemon = mtframe->DaemonThread();
799 
800 	while (true) {
801 		mtframe->KqueueDispatch();
802 		mtframe->SetLastClock(mtframe->GetSystemMS());
803 		mtframe->WakeupTimeout();
804 		mtframe->CheckExpired();
805 		daemon->SwitchContext();
806 	}
807 	*/
808 	ff_run(MtFrame::Loop, NULL);
809 }
810 
811 /**
812  * @brief ��ȡ��ǰ�̵߳ĸ��߳�
813  */
814 MicroThread *MtFrame::GetRootThread()
815 {
816     if (NULL == _curr_thread)
817     {
818         return NULL;
819     }
820 
821     MicroThread::ThreadType type = _curr_thread->GetType();
822     MicroThread *thread = _curr_thread;
823     MicroThread *parent = thread;
824 
825     while (MicroThread::SUB_THREAD == type)
826     {
827         thread = thread->GetParent();
828         if (!thread)
829         {
830             break;
831         }
832 
833         type   = thread->GetType();
834         parent = thread;
835     }
836 
837     return parent;
838 }
839 
840 /**
841  * @brief ��ܵ����߳�����
842  */
843 void MtFrame::ThreadSchdule()
844 {
845     MicroThread* thread = NULL;
846     MtFrame* mtframe = MtFrame::Instance();
847 
848     if (mtframe->_runlist.empty())
849     {
850         thread = mtframe->DaemonThread();
851     }
852     else
853     {
854         thread = mtframe->_runlist.front();
855         mtframe->RemoveRunable(thread);
856     }
857 
858     this->SetActiveThread(thread);
859     thread->SetState(MicroThread::RUNNING);
860     thread->RestoreContext();
861 }
862 
863 /**
864  * @brief ��ܴ���ʱ�ص�����
865  */
866 void MtFrame::CheckExpired()
867 {
868     static utime64_t check_time = 0;
869 
870     if (_timer != NULL)
871     {
872         _timer->check_expired();
873     }
874 
875    utime64_t now = GetLastClock();
876 
877     if ((now - check_time) > 1000)
878     {
879         CNetMgr::Instance()->RecycleObjs(now);
880         check_time = now;
881     }
882 }
883 
884 /**
885  * @brief ��ܼ�⵽��ʱ, �������еij�ʱ�߳�
886  */
887 void MtFrame::WakeupTimeout()
888 {
889     utime64_t now = GetLastClock();
890     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
891     while (thread && (thread->GetWakeupTime() <= now))
892     {
893         if (thread->HasFlag(MicroThread::IO_LIST))
894 	    {
895             RemoveIoWait(thread);
896         }
897         else
898         {
899             RemoveSleep(thread);
900         }
901 
902         InsertRunable(thread);
903 
904         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
905     }
906 }
907 
908 /**
909  * @brief ��ܵ���epoll waitǰ, �ж��ȴ�ʱ����Ϣ
910  */
911 int MtFrame::KqueueGetTimeout()
912 {
913     utime64_t now = GetLastClock();
914     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
915     if (!thread)
916     {
917         return 10; //Ĭ��10ms epollwait
918     }
919     else if (thread->GetWakeupTime() < now)
920     {
921         return 0;
922     }
923     else
924     {
925         return (int)(thread->GetWakeupTime() - now);
926     }
927 }
928 
929 /**
930  * @brief ��ܹ����̵߳�Ԫ, ���������
931  * @param thread ΢�̶߳���
932  */
933 inline void MtFrame::InsertSleep(MicroThread* thread)
934 {
935     ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
936 
937     thread->SetFlag(MicroThread::SLEEP_LIST);
938     thread->SetState(MicroThread::SLEEPING);
939     int rc = _sleeplist.HeapPush(thread);
940     if (rc < 0)
941     {
942         MT_ATTR_API(320848, 1); // heap error
943         MTLOG_ERROR("Insert heap failed , rc %d", rc);
944     }
945 }
946 
947 /**
948  * @brief ��ܹ����̵߳�Ԫ, �Ƴ������
949  * @param thread ΢�̶߳���
950  */
951 inline void MtFrame::RemoveSleep(MicroThread* thread)
952 {
953     ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
954     thread->UnsetFlag(MicroThread::SLEEP_LIST);
955 
956     int rc = _sleeplist.HeapDelete(thread);
957     if (rc < 0)
958     {
959         MT_ATTR_API(320849, 1); // heap error
960         MTLOG_ERROR("remove heap failed , rc %d", rc);
961     }
962 }
963 
964 /**
965  * @brief ��ܹ����̵߳�Ԫ, ִ��IO�ȴ�״̬
966  * @param thread ΢�̶߳���
967  */
968 inline void MtFrame::InsertIoWait(MicroThread* thread)
969 {
970     ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
971     thread->SetFlag(MicroThread::IO_LIST);
972     TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
973     InsertSleep(thread);
974 }
975 
976 /**
977  * @brief ��ܹ����̵߳�Ԫ, �Ƴ�IO�ȴ�״̬
978  * @param thread ΢�̶߳���
979  */
980 void MtFrame::RemoveIoWait(MicroThread* thread)
981 {
982     ASSERT(thread->HasFlag(MicroThread::IO_LIST));
983     thread->UnsetFlag(MicroThread::IO_LIST);
984     TAILQ_REMOVE(&_iolist, thread, _entry);
985 
986     RemoveSleep(thread);
987 }
988 
989 /**
990  * @brief ��ܹ����̵߳�Ԫ, ��������ж���
991  * @param thread ΢�̶߳���
992  */
993 void MtFrame::InsertRunable(MicroThread* thread)
994 {
995     ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
996     thread->SetFlag(MicroThread::RUN_LIST);
997 
998     thread->SetState(MicroThread::RUNABLE);
999     _runlist.push(thread);
1000     _waitnum++;
1001 }
1002 
1003 /**
1004  * @brief ��ܹ����̵߳�Ԫ, �Ƴ������ж���
1005  * @param thread ΢�̶߳���
1006  */
1007 inline void MtFrame::RemoveRunable(MicroThread* thread)
1008 {
1009     ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
1010     ASSERT(thread == _runlist.front());
1011     thread->UnsetFlag(MicroThread::RUN_LIST);
1012 
1013     _runlist.pop();
1014     _waitnum--;
1015 }
1016 
1017 
1018 /**
1019  * @brief ��ܹ����̵߳�Ԫ, ִ��pend�ȴ�״̬
1020  * @param thread ΢�̶߳���
1021  */
1022 void MtFrame::InsertPend(MicroThread* thread)
1023 {
1024     ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
1025     thread->SetFlag(MicroThread::PEND_LIST);
1026     TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
1027     thread->SetState(MicroThread::PENDING);
1028 }
1029 
1030 /**
1031  * @brief ��ܹ����̵߳�Ԫ, �Ƴ�PEND�ȴ�״̬
1032  * @param thread ΢�̶߳���
1033  */
1034 void MtFrame::RemovePend(MicroThread* thread)
1035 {
1036     ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
1037     thread->UnsetFlag(MicroThread::PEND_LIST);
1038     TAILQ_REMOVE(&_pend_list, thread, _entry);
1039 }
1040 
1041 /**
1042  * @brief ΢�߳������л�, �ȴ������̵߳Ļ���
1043  * @param timeout ��ȴ�ʱ��, ����
1044  */
1045 void MtFrame::WaitNotify(utime64_t timeout)
1046 {
1047     MicroThread* thread = GetActiveThread();
1048 
1049     thread->SetWakeupTime(timeout + this->GetLastClock());
1050     this->InsertIoWait(thread);
1051     thread->SwitchContext();
1052 }
1053 
1054 /**
1055  * @brief ΢�̴߳����л�����,���óɹ� ���ó�cpu
1056  * @param fdlist ��·������socket�б�
1057  * @param fd ���������fd��Ϣ
1058  * @param timeout ��ȴ�ʱ��, ����
1059  * @return true �ɹ�, false ʧ��
1060  */
1061 bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
1062 {
1063     MicroThread* thread = GetActiveThread();
1064     if (NULL == thread)
1065     {
1066         MTLOG_ERROR("active thread null, epoll schedule failed");
1067         return false;
1068     }
1069 
1070     // 1. ���ϸ��߳���Ҫ���ĵ�epoll���ȶ���
1071     thread->ClearAllFd();
1072     if (fdlist)
1073     {
1074         thread->AddFdList(fdlist);
1075     }
1076     if (fd)
1077     {
1078         thread->AddFd(fd);
1079     }
1080 
1081     // 2. ����epoll�����¼�, ������ʱʱ��, �л�IO�ȴ�״̬, �����л�
1082     thread->SetWakeupTime(timeout + this->GetLastClock());
1083     if (!this->KqueueAdd(thread->GetFdSet()))
1084     {
1085         MTLOG_ERROR("epoll add failed, errno: %d", errno);
1086         return false;
1087     }
1088     this->InsertIoWait(thread);
1089     thread->SwitchContext();
1090 
1091     // 3. ����OK, �ж���ʱ, epoll ctrl ��ԭ״̬
1092     int rcvnum = 0;
1093     KqObjList& rcvfds = thread->GetFdSet();
1094     KqueuerObj* fdata = NULL;
1095     TAILQ_FOREACH(fdata, &rcvfds, _entry)
1096     {
1097         if (fdata->GetRcvEvents() != 0)
1098         {
1099             rcvnum++;
1100         }
1101     }
1102     this->KqueueDel(rcvfds);     // ��һ��������ADD, DEL �ջ�����
1103 
1104     if (rcvnum == 0)    // ��ʱ����, ���ش���
1105     {
1106         errno = ETIME;
1107         return false;
1108     }
1109 
1110     return true;
1111 }
1112 
1113 /**
1114  * @brief ΢�̰߳�����ϵͳIO���� recvfrom
1115  * @param fd ϵͳsocket��Ϣ
1116  * @param buf ������Ϣ������ָ��
1117  * @param len ������Ϣ����������
1118  * @param from ��Դ��ַ��ָ��
1119  * @param fromlen ��Դ��ַ�Ľṹ����
1120  * @param timeout ��ȴ�ʱ��, ����
1121  * @return >0 �ɹ����ճ���, <0 ʧ��
1122  */
1123 int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
1124 {
1125     MtFrame* mtframe = MtFrame::Instance();
1126     utime64_t start = mtframe->GetLastClock();
1127     MicroThread* thread = mtframe->GetActiveThread();
1128     utime64_t now = 0;
1129 
1130     if(fd<0 || !buf || len<1)
1131     {
1132     	errno = EINVAL;
1133     	MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
1134     	return -10;
1135     }
1136 
1137     if (timeout <= -1)
1138     {
1139         timeout = 0x7fffffff;
1140     }
1141 
1142     while (true)
1143     {
1144         now = mtframe->GetLastClock();
1145         if ((int)(now - start) > timeout)
1146         {
1147             errno = ETIME;
1148             return -1;
1149         }
1150 
1151         KqueuerObj epfd;
1152         epfd.SetOsfd(fd);
1153         epfd.EnableInput();
1154         epfd.SetOwnerThread(thread);
1155         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1156         {
1157             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1158             return -2;
1159         }
1160 
1161         mt_hook_syscall(recvfrom);
1162         int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
1163         if (n < 0)
1164         {
1165             if (errno == EINTR) {
1166                 continue;
1167             }
1168 
1169             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1170             {
1171                 MTLOG_ERROR("recvfrom failed, errno: %d", errno);
1172                 return -3;
1173             }
1174         }
1175         else
1176         {
1177             return n;
1178         }
1179     }
1180 
1181 }
1182 
1183 /**
1184  * @brief ΢�̰߳�����ϵͳIO���� sendto
1185  * @param fd ϵͳsocket��Ϣ
1186  * @param msg �����͵���Ϣָ��
1187  * @param len �����͵���Ϣ����
1188  * @param to Ŀ�ĵ�ַ��ָ��
1189  * @param tolen Ŀ�ĵ�ַ�Ľṹ����
1190  * @param timeout ��ȴ�ʱ��, ����
1191  * @return >0 �ɹ����ͳ���, <0 ʧ��
1192  */
1193 int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
1194 {
1195     MtFrame* mtframe = MtFrame::Instance();
1196     utime64_t start = mtframe->GetLastClock();
1197     MicroThread* thread = mtframe->GetActiveThread();
1198     utime64_t now = 0;
1199 
1200     if(fd<0 || !msg || len<1)
1201     {
1202     	errno = EINVAL;
1203     	MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
1204     	return -10;
1205     }
1206 
1207     int n = 0;
1208     mt_hook_syscall(sendto);
1209     while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
1210     {
1211         now = mtframe->GetLastClock();
1212         if ((int)(now - start) > timeout)
1213         {
1214             errno = ETIME;
1215             return -1;
1216         }
1217 
1218         if (errno == EINTR) {
1219             continue;
1220         }
1221 
1222         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1223             MTLOG_ERROR("sendto failed, errno: %d", errno);
1224             return -2;
1225         }
1226 
1227         KqueuerObj epfd;
1228         epfd.SetOsfd(fd);
1229         epfd.EnableOutput();
1230         epfd.SetOwnerThread(thread);
1231         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1232             return -3;
1233         }
1234     }
1235 
1236     return n;
1237 }
1238 
1239 /**
1240  * @brief ΢�̰߳�����ϵͳIO���� connect
1241  * @param fd ϵͳsocket��Ϣ
1242  * @param addr ָ��server��Ŀ�ĵ�ַ
1243  * @param addrlen ��ַ�ij���
1244  * @param timeout ��ȴ�ʱ��, ����
1245  * @return =0 ���ӳɹ�, <0 ʧ��
1246  */
1247 int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1248 {
1249     MtFrame* mtframe = MtFrame::Instance();
1250     utime64_t start = mtframe->GetLastClock();
1251     MicroThread* thread = mtframe->GetActiveThread();
1252     utime64_t now = 0;
1253 
1254     if(fd<0 || !addr || addrlen<1)
1255     {
1256     	errno = EINVAL;
1257     	MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1258     	return -10;
1259     }
1260 
1261     int n = 0;
1262     mt_hook_syscall(connect);
1263     while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1264     {
1265         now = mtframe->GetLastClock();
1266         if ((int)(now - start) > timeout)
1267         {
1268             errno = ETIME;
1269             return -1;
1270         }
1271 
1272         if (errno == EISCONN)   // ������, ���سɹ�
1273         {
1274             return 0;
1275         }
1276 
1277         if (errno == EINTR) {
1278             continue;
1279         }
1280 
1281         if (errno != EINPROGRESS) {
1282             MTLOG_ERROR("connect failed, errno: %d", errno);
1283             return -2;
1284         }
1285 
1286         KqueuerObj epfd;
1287         epfd.SetOsfd(fd);
1288         epfd.EnableOutput();
1289         epfd.SetOwnerThread(thread);
1290         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1291             return -3;
1292         }
1293     }
1294 
1295     return n;
1296 }
1297 
1298 /**
1299  * @brief ΢�̰߳�����ϵͳIO���� accept
1300  * @param fd �����׽���
1301  * @param addr �ͻ��˵�ַ
1302  * @param addrlen ��ַ�ij���
1303  * @param timeout ��ȴ�ʱ��, ����
1304  * @return >=0 accept��socket������, <0 ʧ��
1305  */
1306 int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1307 {
1308     MtFrame* mtframe = MtFrame::Instance();
1309     utime64_t start = mtframe->GetLastClock();
1310     MicroThread* thread = mtframe->GetActiveThread();
1311     utime64_t now = 0;
1312 
1313     if(fd<0)
1314     {
1315     	errno = EINVAL;
1316     	MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1317     	return -10;
1318     }
1319 
1320     int acceptfd = 0;
1321     mt_hook_syscall(accept);
1322     while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1323     {
1324         now = mtframe->GetLastClock();
1325         if ((int)(now - start) > timeout)
1326         {
1327             errno = ETIME;
1328             return -1;
1329         }
1330 
1331         if (errno == EINTR) {
1332             continue;
1333         }
1334 
1335         if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1336             MTLOG_ERROR("accept failed, errno: %d", errno);
1337             return -2;
1338         }
1339 
1340         KqueuerObj epfd;
1341         epfd.SetOsfd(fd);
1342         epfd.EnableInput();
1343         epfd.SetOwnerThread(thread);
1344         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1345             return -3;
1346         }
1347     }
1348 
1349     return acceptfd;
1350 }
1351 
1352 
1353 /**
1354  * @brief ΢�̰߳�����ϵͳIO���� read
1355  * @param fd ϵͳsocket��Ϣ
1356  * @param buf ������Ϣ������ָ��
1357  * @param nbyte ������Ϣ����������
1358  * @param timeout ��ȴ�ʱ��, ����
1359  * @return >0 �ɹ����ճ���, <0 ʧ��
1360  */
1361 ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1362 {
1363     MtFrame* mtframe = MtFrame::Instance();
1364     utime64_t start = mtframe->GetLastClock();
1365     MicroThread* thread = mtframe->GetActiveThread();
1366     utime64_t now = 0;
1367 
1368     if(fd<0 || !buf || nbyte<1)
1369     {
1370     	errno = EINVAL;
1371     	MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1372     	return -10;
1373     }
1374 
1375     ssize_t n = 0;
1376     mt_hook_syscall(read);
1377     while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1378     {
1379         now = mtframe->GetLastClock();
1380         if ((int)(now - start) > timeout)
1381         {
1382             errno = ETIME;
1383             return -1;
1384         }
1385 
1386         if (errno == EINTR) {
1387             continue;
1388         }
1389 
1390         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1391             MTLOG_ERROR("read failed, errno: %d", errno);
1392             return -2;
1393         }
1394 
1395         KqueuerObj epfd;
1396         epfd.SetOsfd(fd);
1397         epfd.EnableInput();
1398         epfd.SetOwnerThread(thread);
1399         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1400             return -3;
1401         }
1402     }
1403 
1404     return n;
1405 }
1406 
1407 /**
1408  * @brief ΢�̰߳�����ϵͳIO���� write
1409  * @param fd ϵͳsocket��Ϣ
1410  * @param buf �����͵���Ϣָ��
1411  * @param nbyte �����͵���Ϣ����
1412  * @param timeout ��ȴ�ʱ��, ����
1413  * @return >0 �ɹ����ͳ���, <0 ʧ��
1414  */
1415 ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1416 {
1417     MtFrame* mtframe = MtFrame::Instance();
1418     utime64_t start = mtframe->GetLastClock();
1419     MicroThread* thread = mtframe->GetActiveThread();
1420     utime64_t now = 0;
1421 
1422     if(fd<0 || !buf || nbyte<1)
1423     {
1424     	errno = EINVAL;
1425     	MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1426     	return -10;
1427     }
1428 
1429     ssize_t n = 0;
1430     size_t send_len = 0;
1431     while (send_len < nbyte)
1432     {
1433         now = mtframe->GetLastClock();
1434         if ((int)(now - start) > timeout)
1435         {
1436             errno = ETIME;
1437             return -1;
1438         }
1439 
1440         mt_hook_syscall(write);
1441         n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1442         if (n < 0)
1443         {
1444             if (errno == EINTR) {
1445                 continue;
1446             }
1447 
1448             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1449                 MTLOG_ERROR("write failed, errno: %d", errno);
1450                 return -2;
1451             }
1452         }
1453         else
1454         {
1455             send_len += n;
1456             if (send_len >= nbyte) {
1457                 return nbyte;
1458             }
1459         }
1460 
1461         KqueuerObj epfd;
1462         epfd.SetOsfd(fd);
1463         epfd.EnableOutput();
1464         epfd.SetOwnerThread(thread);
1465         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1466             return -3;
1467         }
1468     }
1469 
1470     return nbyte;
1471 }
1472 
1473 
1474 /**
1475  * @brief ΢�̰߳�����ϵͳIO���� recv
1476  * @param fd ϵͳsocket��Ϣ
1477  * @param buf ������Ϣ������ָ��
1478  * @param len ������Ϣ����������
1479  * @param timeout ��ȴ�ʱ��, ����
1480  * @return >0 �ɹ����ճ���, <0 ʧ��
1481  */
1482 int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1483 {
1484     MtFrame* mtframe = MtFrame::Instance();
1485     utime64_t start = mtframe->GetLastClock();
1486     MicroThread* thread = mtframe->GetActiveThread();
1487     utime64_t now = 0;
1488 
1489     if(fd<0 || !buf || len<1)
1490     {
1491     	errno = EINVAL;
1492     	MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1493     	return -10;
1494     }
1495 
1496     if (timeout <= -1)
1497     {
1498         timeout = 0x7fffffff;
1499     }
1500 
1501     while (true)
1502     {
1503         now = mtframe->GetLastClock();
1504         if ((int)(now - start) > timeout)
1505         {
1506             errno = ETIME;
1507             return -1;
1508         }
1509 
1510         KqueuerObj epfd;
1511         epfd.SetOsfd(fd);
1512         epfd.EnableInput();
1513         epfd.SetOwnerThread(thread);
1514         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1515         {
1516             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1517             return -2;
1518         }
1519 
1520         mt_hook_syscall(recv);
1521         int n = ff_hook_recv(fd, buf, len, flags);
1522         if (n < 0)
1523         {
1524             if (errno == EINTR) {
1525                 continue;
1526             }
1527 
1528             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1529             {
1530                 MTLOG_ERROR("recv failed, errno: %d", errno);
1531                 return -3;
1532             }
1533         }
1534         else
1535         {
1536             return n;
1537         }
1538     }
1539 
1540 }
1541 
1542 /**
1543  * @brief ΢�̰߳�����ϵͳIO���� send
1544  * @param fd ϵͳsocket��Ϣ
1545  * @param buf �����͵���Ϣָ��
1546  * @param nbyte �����͵���Ϣ����
1547  * @param timeout ��ȴ�ʱ��, ����
1548  * @return >0 �ɹ����ͳ���, <0 ʧ��
1549  */
1550 ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1551 {
1552     MtFrame* mtframe = MtFrame::Instance();
1553     utime64_t start = mtframe->GetLastClock();
1554     MicroThread* thread = mtframe->GetActiveThread();
1555     utime64_t now = 0;
1556 
1557     if(fd<0 || !buf || nbyte<1)
1558     {
1559     	errno = EINVAL;
1560     	MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1561     	return -10;
1562     }
1563 
1564     ssize_t n = 0;
1565     size_t send_len = 0;
1566     while (send_len < nbyte)
1567     {
1568         now = mtframe->GetLastClock();
1569         if ((int)(now - start) > timeout)
1570         {
1571             errno = ETIME;
1572             return -1;
1573         }
1574 
1575         mt_hook_syscall(send);
1576         n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1577         if (n < 0)
1578         {
1579             if (errno == EINTR) {
1580                 continue;
1581             }
1582 
1583             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1584                 MTLOG_ERROR("write failed, errno: %d", errno);
1585                 return -2;
1586             }
1587         }
1588         else
1589         {
1590             send_len += n;
1591             if (send_len >= nbyte) {
1592                 return nbyte;
1593             }
1594         }
1595 
1596         KqueuerObj epfd;
1597         epfd.SetOsfd(fd);
1598         epfd.EnableOutput();
1599         epfd.SetOwnerThread(thread);
1600         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1601             return -3;
1602         }
1603     }
1604 
1605     return nbyte;
1606 }
1607 
1608 
1609 
1610 /**
1611  * @brief ΢�߳�����sleep�ӿ�, ��λms
1612  */
1613 void MtFrame::sleep(int ms)
1614 {
1615     MtFrame* frame = MtFrame::Instance();
1616     MicroThread* thread = frame->GetActiveThread();
1617     if (thread != NULL)
1618     {
1619         thread->sleep(ms);
1620     }
1621 }
1622 
1623 /**
1624  * @brief ΢�̰߳�����ϵͳIO���� recv
1625  * @param fd ϵͳsocket��Ϣ
1626  * @param events �¼�����  EPOLLIN or EPOLLOUT
1627  * @param timeout ��ȴ�ʱ��, ����
1628  * @return >0 �ɹ����ճ���, <0 ʧ��
1629  */
1630 int MtFrame::WaitEvents(int fd, int events, int timeout)
1631 {
1632     MtFrame* mtframe = MtFrame::Instance();
1633     utime64_t start = mtframe->GetLastClock();
1634     MicroThread* thread = mtframe->GetActiveThread();
1635     utime64_t now = 0;
1636 
1637     if (timeout <= -1)
1638     {
1639         timeout = 0x7fffffff;
1640     }
1641 
1642     while (true)
1643     {
1644         now = mtframe->GetLastClock();
1645         if ((int)(now - start) > timeout)
1646         {
1647             errno = ETIME;
1648             return 0;
1649         }
1650 
1651         KqueuerObj epfd;
1652         epfd.SetOsfd(fd);
1653         if (events & KQ_EVENT_READ)
1654         {
1655             epfd.EnableInput();
1656         }
1657         if (events & KQ_EVENT_WRITE)
1658         {
1659             epfd.EnableOutput();
1660         }
1661         epfd.SetOwnerThread(thread);
1662 
1663         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1664         {
1665             MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1666             return 0;
1667         }
1668 
1669         return epfd.GetRcvEvents();
1670     }
1671 }
1672 
1673 
1674