xref: /f-stack/app/micro_thread/micro_thread.cpp (revision a9643ea8)
1*a9643ea8Slogwang 
2*a9643ea8Slogwang /**
3*a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6*a9643ea8Slogwang  *
7*a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8*a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9*a9643ea8Slogwang  * obtain a copy of the License at
10*a9643ea8Slogwang  *
11*a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12*a9643ea8Slogwang  *
13*a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14*a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15*a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16*a9643ea8Slogwang  * and limitations under the License.
17*a9643ea8Slogwang  */
18*a9643ea8Slogwang 
19*a9643ea8Slogwang 
20*a9643ea8Slogwang /**
21*a9643ea8Slogwang  *  @filename micro_thread.cpp
22*a9643ea8Slogwang  *  @info  micro thread manager
23*a9643ea8Slogwang  */
24*a9643ea8Slogwang #include "mt_version.h"
25*a9643ea8Slogwang #include "micro_thread.h"
26*a9643ea8Slogwang #include "mt_net.h"
27*a9643ea8Slogwang #include "valgrind.h"
28*a9643ea8Slogwang #include <assert.h>
29*a9643ea8Slogwang #include "mt_sys_hook.h"
30*a9643ea8Slogwang #include "ff_hook.h"
31*a9643ea8Slogwang #include "ff_api.h"
32*a9643ea8Slogwang 
33*a9643ea8Slogwang using namespace NS_MICRO_THREAD;
34*a9643ea8Slogwang 
// Assertions are compiled out: ASSERT(x) expands to nothing, so its argument
// is never evaluated. The commented-out alternative below forwards to the
// standard assert() instead.
#define  ASSERT(statement)
//#define  ASSERT(statement)   assert(statement)

/**
 *  @brief Save the current execution context into a jump buffer.
 *  @param jbf jump buffer that receives the context
 *  @return Callers in this file treat 0 as "returned from the direct call"
 *          and non-zero as "resumed through restore_context()".
 *  @note C linkage, defined outside this file (presumably in assembly --
 *        TODO confirm).
 */
extern "C"  int save_context(jmp_buf jbf);

/**
 *  @brief Resume the execution context stored in a jump buffer.
 *  @param jbf jump buffer previously filled by save_context()
 *  @param ret value reported at the resume point; this file always passes 1
 *  @note C linkage, defined outside this file.
 */
extern "C"  void restore_context(jmp_buf jbf, int ret);

/**
 *  @brief Replace the stack pointer recorded in a jump buffer.
 *  @param jbf jump buffer to patch
 *  @param esp new stack pointer value
 *  @note C linkage, defined outside this file.
 */
extern "C"  void replace_esp(jmp_buf jbf, void* esp);
57*a9643ea8Slogwang 
58*a9643ea8Slogwang /**
59*a9643ea8Slogwang  * @brief ���캯��, Ĭ�ϲ���ջ��С
60*a9643ea8Slogwang  */
61*a9643ea8Slogwang Thread::Thread(int stack_size)
62*a9643ea8Slogwang {
63*a9643ea8Slogwang     _stack_size  = stack_size ? stack_size : ThreadPool::default_stack_size;
64*a9643ea8Slogwang     _wakeup_time = 0;
65*a9643ea8Slogwang     _stack       = NULL;
66*a9643ea8Slogwang     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
67*a9643ea8Slogwang }
68*a9643ea8Slogwang 
69*a9643ea8Slogwang 
70*a9643ea8Slogwang /**
71*a9643ea8Slogwang  *  @brief LINUX x86/x86_64�µ�ջ����, �����ܹ�����Ҫע�����
72*a9643ea8Slogwang  */
73*a9643ea8Slogwang bool Thread::InitStack()
74*a9643ea8Slogwang {
75*a9643ea8Slogwang     if (_stack) {
76*a9643ea8Slogwang         return true;
77*a9643ea8Slogwang     }
78*a9643ea8Slogwang 
79*a9643ea8Slogwang     ///< ջ������ջ�ڴ����, ��Խ��
80*a9643ea8Slogwang     _stack = (MtStack*)calloc(1, sizeof(MtStack));
81*a9643ea8Slogwang     if (NULL == _stack)
82*a9643ea8Slogwang     {
83*a9643ea8Slogwang         MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
84*a9643ea8Slogwang         return false;
85*a9643ea8Slogwang     }
86*a9643ea8Slogwang 
87*a9643ea8Slogwang     int memsize = MEM_PAGE_SIZE*2 + _stack_size;
88*a9643ea8Slogwang     memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
89*a9643ea8Slogwang 
90*a9643ea8Slogwang     static int zero_fd = -1;
91*a9643ea8Slogwang     int mmap_flags = MAP_PRIVATE | MAP_ANON;
92*a9643ea8Slogwang     void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
93*a9643ea8Slogwang     if (vaddr == (void *)MAP_FAILED)
94*a9643ea8Slogwang     {
95*a9643ea8Slogwang         MTLOG_ERROR("mmap stack failed, size %d", memsize);
96*a9643ea8Slogwang         free(_stack);
97*a9643ea8Slogwang         _stack = NULL;
98*a9643ea8Slogwang         return false;
99*a9643ea8Slogwang     }
100*a9643ea8Slogwang     _stack->_vaddr = (char*)vaddr;
101*a9643ea8Slogwang     _stack->_vaddr_size = memsize;
102*a9643ea8Slogwang     _stack->_stk_size = _stack_size;
103*a9643ea8Slogwang     _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
104*a9643ea8Slogwang     _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
105*a9643ea8Slogwang 	// valgrind support: register stack frame
106*a9643ea8Slogwang 	_stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
107*a9643ea8Slogwang 
108*a9643ea8Slogwang     _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
109*a9643ea8Slogwang 
110*a9643ea8Slogwang     mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
111*a9643ea8Slogwang     mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
112*a9643ea8Slogwang 
113*a9643ea8Slogwang     return true;
114*a9643ea8Slogwang }
115*a9643ea8Slogwang 
116*a9643ea8Slogwang 
117*a9643ea8Slogwang /**
118*a9643ea8Slogwang  * @brief �ͷŶ�ջ��Ϣ
119*a9643ea8Slogwang  */
120*a9643ea8Slogwang void Thread::FreeStack()
121*a9643ea8Slogwang {
122*a9643ea8Slogwang     if (!_stack) {
123*a9643ea8Slogwang         return;
124*a9643ea8Slogwang     }
125*a9643ea8Slogwang     munmap(_stack->_vaddr, _stack->_vaddr_size);
126*a9643ea8Slogwang 	// valgrind support: deregister stack frame
127*a9643ea8Slogwang 	VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
128*a9643ea8Slogwang     free(_stack);
129*a9643ea8Slogwang     _stack = NULL;
130*a9643ea8Slogwang }
131*a9643ea8Slogwang 
/**
 * @brief Prepare the saved context so the thread starts on its own stack.
 *
 * The context is captured with save_context(); on that direct call the
 * result is 0 and, if a stack exists, the saved stack pointer is replaced
 * with the stack's initial _esp. When the context is later resumed,
 * save_context() reports non-zero and the scheduler's start routine is
 * entered.
 *
 * The statement order is significant; do not rearrange.
 */
void Thread::InitContext()
{
    // Non-zero: we arrived here through a context restore, not the direct call.
    if (save_context(_jmpbuf) != 0)
    {
        ScheduleObj::Instance()->ScheduleStartRun(); // original note (garbled) appears to ask: call this->Run() directly?
    }

    // Point the saved context at this thread's private stack.
    if (_stack != NULL)
    {
        replace_esp(_jmpbuf, _stack->_esp);
    }
}
147*a9643ea8Slogwang 
148*a9643ea8Slogwang /**
149*a9643ea8Slogwang  * @brief �����л�, ����״̬, ��������
150*a9643ea8Slogwang  */
151*a9643ea8Slogwang void Thread::SwitchContext()
152*a9643ea8Slogwang {
153*a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
154*a9643ea8Slogwang     {
155*a9643ea8Slogwang         ScheduleObj::Instance()->ScheduleThread();
156*a9643ea8Slogwang     }
157*a9643ea8Slogwang }
158*a9643ea8Slogwang 
159*a9643ea8Slogwang /**
160*a9643ea8Slogwang  * @brief �ָ�������, �л��ضϵ�,��������
161*a9643ea8Slogwang  */
162*a9643ea8Slogwang void Thread::RestoreContext()
163*a9643ea8Slogwang {
164*a9643ea8Slogwang     restore_context(_jmpbuf, 1);
165*a9643ea8Slogwang }
166*a9643ea8Slogwang 
167*a9643ea8Slogwang /**
168*a9643ea8Slogwang  * @brief ��ʼ���߳�,���ջ�������ij�ʼ��
169*a9643ea8Slogwang  */
170*a9643ea8Slogwang bool Thread::Initial()
171*a9643ea8Slogwang {
172*a9643ea8Slogwang     if (!InitStack())
173*a9643ea8Slogwang     {
174*a9643ea8Slogwang         MTLOG_ERROR("init stack failed");
175*a9643ea8Slogwang         return false;
176*a9643ea8Slogwang     }
177*a9643ea8Slogwang 
178*a9643ea8Slogwang     InitContext();
179*a9643ea8Slogwang 
180*a9643ea8Slogwang     return true;
181*a9643ea8Slogwang }
182*a9643ea8Slogwang 
183*a9643ea8Slogwang /**
184*a9643ea8Slogwang  * @brief ��ֹ�߳�,���ջ���������ͷ�
185*a9643ea8Slogwang  */
186*a9643ea8Slogwang void Thread::Destroy()
187*a9643ea8Slogwang {
188*a9643ea8Slogwang     FreeStack();
189*a9643ea8Slogwang     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
190*a9643ea8Slogwang }
191*a9643ea8Slogwang 
192*a9643ea8Slogwang /**
193*a9643ea8Slogwang  * @brief �߳�״̬����, �ɸ���״̬
194*a9643ea8Slogwang  */
195*a9643ea8Slogwang void Thread::Reset()
196*a9643ea8Slogwang {
197*a9643ea8Slogwang     _wakeup_time = 0;
198*a9643ea8Slogwang     SetPrivate(NULL);
199*a9643ea8Slogwang 
200*a9643ea8Slogwang     InitContext();
201*a9643ea8Slogwang     CleanState();
202*a9643ea8Slogwang }
203*a9643ea8Slogwang 
204*a9643ea8Slogwang /**
205*a9643ea8Slogwang  * @brief �߳���������˯��, ��λ����
206*a9643ea8Slogwang  * @param ms ˯�ߺ�����
207*a9643ea8Slogwang  */
208*a9643ea8Slogwang void Thread::sleep(int ms)
209*a9643ea8Slogwang {
210*a9643ea8Slogwang     utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
211*a9643ea8Slogwang     _wakeup_time = now + ms;
212*a9643ea8Slogwang 
213*a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
214*a9643ea8Slogwang     {
215*a9643ea8Slogwang         ScheduleObj::Instance()->ScheduleSleep();
216*a9643ea8Slogwang     }
217*a9643ea8Slogwang }
218*a9643ea8Slogwang 
219*a9643ea8Slogwang /**
220*a9643ea8Slogwang  * @brief ��������״̬, �ȴ��������߳̽���
221*a9643ea8Slogwang  */
222*a9643ea8Slogwang void Thread::Wait()
223*a9643ea8Slogwang {
224*a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
225*a9643ea8Slogwang     {
226*a9643ea8Slogwang         ScheduleObj::Instance()->SchedulePend();
227*a9643ea8Slogwang     }
228*a9643ea8Slogwang }
229*a9643ea8Slogwang 
230*a9643ea8Slogwang /**
231*a9643ea8Slogwang  * @brief ��ʼ��������,���üĴ���,��ջ
232*a9643ea8Slogwang  */
233*a9643ea8Slogwang bool Thread::CheckStackHealth(char *esp)
234*a9643ea8Slogwang {
235*a9643ea8Slogwang 	if (!_stack)
236*a9643ea8Slogwang 		return false;
237*a9643ea8Slogwang 
238*a9643ea8Slogwang 	if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
239*a9643ea8Slogwang 		return true;
240*a9643ea8Slogwang 	else
241*a9643ea8Slogwang 		return false;
242*a9643ea8Slogwang }
243*a9643ea8Slogwang 
244*a9643ea8Slogwang /**
245*a9643ea8Slogwang  * @brief ΢�̹߳���, Ĭ������ͨ�߳�
246*a9643ea8Slogwang  * @param type ����, Ĭ����ͨ
247*a9643ea8Slogwang  */
248*a9643ea8Slogwang MicroThread::MicroThread(ThreadType type)
249*a9643ea8Slogwang {
250*a9643ea8Slogwang     memset(&_entry, 0, sizeof(_entry));
251*a9643ea8Slogwang     TAILQ_INIT(&_fdset);
252*a9643ea8Slogwang     TAILQ_INIT(&_sub_list);
253*a9643ea8Slogwang     _flag = NOT_INLIST;
254*a9643ea8Slogwang     _type = type;
255*a9643ea8Slogwang     _state = INITIAL;
256*a9643ea8Slogwang     _start = NULL;
257*a9643ea8Slogwang     _args = NULL;
258*a9643ea8Slogwang     _parent = NULL;
259*a9643ea8Slogwang }
260*a9643ea8Slogwang 
261*a9643ea8Slogwang /**
262*a9643ea8Slogwang  * @breif ΢�̸߳���״̬����
263*a9643ea8Slogwang  */
264*a9643ea8Slogwang void MicroThread::CleanState()
265*a9643ea8Slogwang {
266*a9643ea8Slogwang     TAILQ_INIT(&_fdset);
267*a9643ea8Slogwang     TAILQ_INIT(&_sub_list);
268*a9643ea8Slogwang     _flag = NOT_INLIST;
269*a9643ea8Slogwang     _type = NORMAL;
270*a9643ea8Slogwang     _state = INITIAL;
271*a9643ea8Slogwang     _start = NULL;
272*a9643ea8Slogwang     _args = NULL;
273*a9643ea8Slogwang     _parent = NULL;
274*a9643ea8Slogwang }
275*a9643ea8Slogwang 
276*a9643ea8Slogwang /**
277*a9643ea8Slogwang  * @brief �̵߳�ʵ�ʹ�������
278*a9643ea8Slogwang  */
279*a9643ea8Slogwang void MicroThread::Run()
280*a9643ea8Slogwang {
281*a9643ea8Slogwang     if (_start) {
282*a9643ea8Slogwang         _start(_args);
283*a9643ea8Slogwang     }
284*a9643ea8Slogwang 
285*a9643ea8Slogwang     // �����߳�, �������߳̽��������̬
286*a9643ea8Slogwang     if (this->IsSubThread()) {
287*a9643ea8Slogwang         this->WakeupParent();
288*a9643ea8Slogwang     }
289*a9643ea8Slogwang 
290*a9643ea8Slogwang     ScheduleObj::Instance()->ScheduleReclaim();
291*a9643ea8Slogwang     ScheduleObj::Instance()->ScheduleThread();
292*a9643ea8Slogwang }
293*a9643ea8Slogwang 
294*a9643ea8Slogwang /**
295*a9643ea8Slogwang  * @brief �������̻߳��Ѹ��̴߳���
296*a9643ea8Slogwang  */
297*a9643ea8Slogwang void MicroThread::WakeupParent()
298*a9643ea8Slogwang {
299*a9643ea8Slogwang     MicroThread* parent = this->GetParent();
300*a9643ea8Slogwang     if (parent)
301*a9643ea8Slogwang     {
302*a9643ea8Slogwang         parent->RemoveSubThread(this);
303*a9643ea8Slogwang         if (parent->HasNoSubThread())
304*a9643ea8Slogwang         {
305*a9643ea8Slogwang             ScheduleObj::Instance()->ScheduleUnpend(parent);
306*a9643ea8Slogwang         }
307*a9643ea8Slogwang     }
308*a9643ea8Slogwang     else
309*a9643ea8Slogwang     {
310*a9643ea8Slogwang         MTLOG_ERROR("Sub thread no parent, error");
311*a9643ea8Slogwang     }
312*a9643ea8Slogwang }
313*a9643ea8Slogwang 
314*a9643ea8Slogwang /**
315*a9643ea8Slogwang  * @brief �Ƿ��������Ķ������߳�
316*a9643ea8Slogwang  */
317*a9643ea8Slogwang bool MicroThread::HasNoSubThread()
318*a9643ea8Slogwang {
319*a9643ea8Slogwang     return TAILQ_EMPTY(&_sub_list);
320*a9643ea8Slogwang }
321*a9643ea8Slogwang 
322*a9643ea8Slogwang /**
323*a9643ea8Slogwang  * @brief ��ָ�����̼߳�������߳��б�
324*a9643ea8Slogwang  */
325*a9643ea8Slogwang void MicroThread::AddSubThread(MicroThread* sub)
326*a9643ea8Slogwang {
327*a9643ea8Slogwang     ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
328*a9643ea8Slogwang     if (!sub->HasFlag(MicroThread::SUB_LIST))
329*a9643ea8Slogwang     {
330*a9643ea8Slogwang         TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
331*a9643ea8Slogwang         sub->_parent = this;
332*a9643ea8Slogwang     }
333*a9643ea8Slogwang 
334*a9643ea8Slogwang     sub->SetFlag(MicroThread::SUB_LIST);
335*a9643ea8Slogwang }
336*a9643ea8Slogwang 
337*a9643ea8Slogwang /**
338*a9643ea8Slogwang  * @brief ��ָ���߳��Ƴ������߳��б�
339*a9643ea8Slogwang  */
340*a9643ea8Slogwang void MicroThread::RemoveSubThread(MicroThread* sub)
341*a9643ea8Slogwang {
342*a9643ea8Slogwang     ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
343*a9643ea8Slogwang     if (sub->HasFlag(MicroThread::SUB_LIST))
344*a9643ea8Slogwang     {
345*a9643ea8Slogwang         TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
346*a9643ea8Slogwang         sub->_parent = NULL;
347*a9643ea8Slogwang     }
348*a9643ea8Slogwang 
349*a9643ea8Slogwang     sub->UnsetFlag(MicroThread::SUB_LIST);
350*a9643ea8Slogwang }
351*a9643ea8Slogwang 
352*a9643ea8Slogwang 
353*a9643ea8Slogwang /**
354*a9643ea8Slogwang  * @brief ��������ʾ�����
355*a9643ea8Slogwang  */
356*a9643ea8Slogwang ScheduleObj *ScheduleObj::_instance = NULL;     ///< ��̬�����ʼ��
357*a9643ea8Slogwang inline ScheduleObj* ScheduleObj::Instance()
358*a9643ea8Slogwang {
359*a9643ea8Slogwang     if (NULL == _instance)
360*a9643ea8Slogwang     {
361*a9643ea8Slogwang         _instance = new ScheduleObj();
362*a9643ea8Slogwang     }
363*a9643ea8Slogwang 
364*a9643ea8Slogwang     return _instance;
365*a9643ea8Slogwang }
366*a9643ea8Slogwang 
367*a9643ea8Slogwang /**
368*a9643ea8Slogwang  * @brief ��������΢�߳�������, �����ӿ�
369*a9643ea8Slogwang  */
370*a9643ea8Slogwang void ScheduleObj::ScheduleThread()
371*a9643ea8Slogwang {
372*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
373*a9643ea8Slogwang     frame->ThreadSchdule();
374*a9643ea8Slogwang }
375*a9643ea8Slogwang 
376*a9643ea8Slogwang /**
377*a9643ea8Slogwang  * @brief ��ȡȫ�ֵ�ʱ���, ���뵥λ
378*a9643ea8Slogwang  */
379*a9643ea8Slogwang utime64_t ScheduleObj::ScheduleGetTime()
380*a9643ea8Slogwang {
381*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
382*a9643ea8Slogwang     if (frame)
383*a9643ea8Slogwang     {
384*a9643ea8Slogwang         return frame->GetLastClock();
385*a9643ea8Slogwang     }
386*a9643ea8Slogwang     else
387*a9643ea8Slogwang     {
388*a9643ea8Slogwang         MTLOG_ERROR("frame time failed, maybe not init");
389*a9643ea8Slogwang         return 0;
390*a9643ea8Slogwang     }
391*a9643ea8Slogwang }
392*a9643ea8Slogwang 
393*a9643ea8Slogwang /**
394*a9643ea8Slogwang  * @brief �̵߳�����������sleep״̬
395*a9643ea8Slogwang  */
396*a9643ea8Slogwang void ScheduleObj::ScheduleSleep()
397*a9643ea8Slogwang {
398*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
399*a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
400*a9643ea8Slogwang     if ((!frame) || (!thread)) {
401*a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
402*a9643ea8Slogwang         return;
403*a9643ea8Slogwang     }
404*a9643ea8Slogwang 
405*a9643ea8Slogwang     frame->InsertSleep(thread);
406*a9643ea8Slogwang     frame->ThreadSchdule();
407*a9643ea8Slogwang }
408*a9643ea8Slogwang 
409*a9643ea8Slogwang /**
410*a9643ea8Slogwang  * @brief �̵߳�����������pend״̬
411*a9643ea8Slogwang  */
412*a9643ea8Slogwang void ScheduleObj::SchedulePend()
413*a9643ea8Slogwang {
414*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
415*a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
416*a9643ea8Slogwang     if ((!frame) || (!thread)) {
417*a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
418*a9643ea8Slogwang         return;
419*a9643ea8Slogwang     }
420*a9643ea8Slogwang 
421*a9643ea8Slogwang     frame->InsertPend(thread);
422*a9643ea8Slogwang     frame->ThreadSchdule();
423*a9643ea8Slogwang }
424*a9643ea8Slogwang 
425*a9643ea8Slogwang /**
426*a9643ea8Slogwang  * @brief �̵߳���ȡ��pend״̬, �ⲿ����ȡ��
427*a9643ea8Slogwang  */
428*a9643ea8Slogwang void ScheduleObj::ScheduleUnpend(void* pthread)
429*a9643ea8Slogwang {
430*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
431*a9643ea8Slogwang     MicroThread* thread = (MicroThread*)pthread;
432*a9643ea8Slogwang     if ((!frame) || (!thread)) {
433*a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
434*a9643ea8Slogwang         return;
435*a9643ea8Slogwang     }
436*a9643ea8Slogwang 
437*a9643ea8Slogwang     frame->RemovePend(thread);
438*a9643ea8Slogwang     frame->InsertRunable(thread);
439*a9643ea8Slogwang }
440*a9643ea8Slogwang 
441*a9643ea8Slogwang 
442*a9643ea8Slogwang 
443*a9643ea8Slogwang /**
444*a9643ea8Slogwang  * @brief �߳�ִ����Ϻ�, ���մ���
445*a9643ea8Slogwang  */
446*a9643ea8Slogwang void ScheduleObj::ScheduleReclaim()
447*a9643ea8Slogwang {
448*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
449*a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
450*a9643ea8Slogwang     if ((!frame) || (!thread)) {
451*a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
452*a9643ea8Slogwang         return;
453*a9643ea8Slogwang     }
454*a9643ea8Slogwang 
455*a9643ea8Slogwang     frame->FreeThread(thread);
456*a9643ea8Slogwang }
457*a9643ea8Slogwang 
458*a9643ea8Slogwang /**
459*a9643ea8Slogwang  * @brief ���������ȳ�ʼִ��
460*a9643ea8Slogwang  */
461*a9643ea8Slogwang void ScheduleObj::ScheduleStartRun()
462*a9643ea8Slogwang {
463*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
464*a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
465*a9643ea8Slogwang     if ((!frame) || (!thread)) {
466*a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
467*a9643ea8Slogwang         return;
468*a9643ea8Slogwang     }
469*a9643ea8Slogwang 
470*a9643ea8Slogwang     thread->Run();
471*a9643ea8Slogwang }
472*a9643ea8Slogwang 
473*a9643ea8Slogwang 
/**
 * @brief Pool-wide defaults, initialised from macros defined elsewhere.
 */
unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM;   ///< threads pre-created by InitialPool(); original note says 2000 -- confirm against DEFAULT_THREAD_NUM
unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE;   ///< stack size used when a Thread is built with size 0; original note says 128K -- confirm against DEFAULT_STACK_SIZE
479*a9643ea8Slogwang 
480*a9643ea8Slogwang /**
481*a9643ea8Slogwang  * @brief ΢�̳߳س�ʼ��
482*a9643ea8Slogwang  */
483*a9643ea8Slogwang bool ThreadPool::InitialPool(int max_num)
484*a9643ea8Slogwang {
485*a9643ea8Slogwang     MicroThread *thread = NULL;
486*a9643ea8Slogwang     for (unsigned int i = 0; i < default_thread_num; i++)
487*a9643ea8Slogwang     {
488*a9643ea8Slogwang         thread = new MicroThread();
489*a9643ea8Slogwang         if ((NULL == thread) || (false == thread->Initial()))
490*a9643ea8Slogwang         {
491*a9643ea8Slogwang             MTLOG_ERROR("init pool, thread %p init failed", thread);
492*a9643ea8Slogwang             if (thread)  delete thread;
493*a9643ea8Slogwang             continue;
494*a9643ea8Slogwang         }
495*a9643ea8Slogwang         thread->SetFlag(MicroThread::FREE_LIST);
496*a9643ea8Slogwang         _freelist.push(thread);
497*a9643ea8Slogwang     }
498*a9643ea8Slogwang 
499*a9643ea8Slogwang     _total_num = _freelist.size();
500*a9643ea8Slogwang     _max_num  = max_num;
501*a9643ea8Slogwang     _use_num = 0;
502*a9643ea8Slogwang     if (_total_num <= 0)
503*a9643ea8Slogwang     {
504*a9643ea8Slogwang         return false;
505*a9643ea8Slogwang     }
506*a9643ea8Slogwang     else
507*a9643ea8Slogwang     {
508*a9643ea8Slogwang         return true;
509*a9643ea8Slogwang     }
510*a9643ea8Slogwang }
511*a9643ea8Slogwang 
512*a9643ea8Slogwang /**
513*a9643ea8Slogwang  * @brief ΢�̳߳ط���ʼ��
514*a9643ea8Slogwang  */
515*a9643ea8Slogwang void ThreadPool::DestroyPool()
516*a9643ea8Slogwang {
517*a9643ea8Slogwang     MicroThread* thread = NULL;
518*a9643ea8Slogwang     while (!_freelist.empty())
519*a9643ea8Slogwang     {
520*a9643ea8Slogwang         thread = _freelist.front();
521*a9643ea8Slogwang         _freelist.pop();
522*a9643ea8Slogwang         thread->Destroy();
523*a9643ea8Slogwang         delete thread;
524*a9643ea8Slogwang     }
525*a9643ea8Slogwang 
526*a9643ea8Slogwang     _total_num = 0;
527*a9643ea8Slogwang     _use_num = 0;
528*a9643ea8Slogwang }
529*a9643ea8Slogwang 
530*a9643ea8Slogwang /**
531*a9643ea8Slogwang  * @brief ΢�̷߳���ӿ�
532*a9643ea8Slogwang  * @return ΢�̶߳���
533*a9643ea8Slogwang  */
534*a9643ea8Slogwang MicroThread* ThreadPool::AllocThread()
535*a9643ea8Slogwang {
536*a9643ea8Slogwang     MT_ATTR_API_SET(492069, _total_num); // ΢�̳߳ش�С
537*a9643ea8Slogwang 
538*a9643ea8Slogwang     MicroThread* thread = NULL;
539*a9643ea8Slogwang     if (!_freelist.empty())
540*a9643ea8Slogwang     {
541*a9643ea8Slogwang         thread = _freelist.front();
542*a9643ea8Slogwang         _freelist.pop();
543*a9643ea8Slogwang 
544*a9643ea8Slogwang         ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
545*a9643ea8Slogwang 
546*a9643ea8Slogwang         thread->UnsetFlag(MicroThread::FREE_LIST);
547*a9643ea8Slogwang         _use_num++;
548*a9643ea8Slogwang         return thread;
549*a9643ea8Slogwang     }
550*a9643ea8Slogwang 
551*a9643ea8Slogwang     MT_ATTR_API(320846, 1); // pool no nore
552*a9643ea8Slogwang     if (_total_num >= _max_num)
553*a9643ea8Slogwang     {
554*a9643ea8Slogwang         MT_ATTR_API(361140, 1); // no more quota
555*a9643ea8Slogwang         return NULL;
556*a9643ea8Slogwang     }
557*a9643ea8Slogwang 
558*a9643ea8Slogwang     thread = new MicroThread();
559*a9643ea8Slogwang     if ((NULL == thread) || (false == thread->Initial()))
560*a9643ea8Slogwang     {
561*a9643ea8Slogwang         MT_ATTR_API(320847, 1); // pool init fail
562*a9643ea8Slogwang         MTLOG_ERROR("thread alloc failed, thread: %p", thread);
563*a9643ea8Slogwang         if (thread)  delete thread;
564*a9643ea8Slogwang         return NULL;
565*a9643ea8Slogwang     }
566*a9643ea8Slogwang     _total_num++;
567*a9643ea8Slogwang     _use_num++;
568*a9643ea8Slogwang 
569*a9643ea8Slogwang     return thread;
570*a9643ea8Slogwang }
571*a9643ea8Slogwang 
572*a9643ea8Slogwang /**
573*a9643ea8Slogwang  * @brief ΢�߳��ͷŽӿ�
574*a9643ea8Slogwang  * @param thread ΢�̶߳���
575*a9643ea8Slogwang  */
576*a9643ea8Slogwang void ThreadPool::FreeThread(MicroThread* thread)
577*a9643ea8Slogwang {
578*a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
579*a9643ea8Slogwang     thread->Reset();
580*a9643ea8Slogwang     _use_num--;
581*a9643ea8Slogwang     _freelist.push(thread);
582*a9643ea8Slogwang     thread->SetFlag(MicroThread::FREE_LIST);
583*a9643ea8Slogwang 
584*a9643ea8Slogwang     ///< ���ж��� > default_thread_num, ���ͷ����ϵ�, �������ͷŵ�ǰ
585*a9643ea8Slogwang     unsigned int free_num = _freelist.size();
586*a9643ea8Slogwang     if ((free_num > default_thread_num) && (free_num > 1))
587*a9643ea8Slogwang     {
588*a9643ea8Slogwang         thread = _freelist.front();
589*a9643ea8Slogwang         _freelist.pop();
590*a9643ea8Slogwang         thread->Destroy();
591*a9643ea8Slogwang         delete thread;
592*a9643ea8Slogwang         _total_num--;
593*a9643ea8Slogwang     }
594*a9643ea8Slogwang }
595*a9643ea8Slogwang 
596*a9643ea8Slogwang int ThreadPool::GetUsedNum(void)
597*a9643ea8Slogwang {
598*a9643ea8Slogwang 	return _use_num;
599*a9643ea8Slogwang }
600*a9643ea8Slogwang 
601*a9643ea8Slogwang /**
602*a9643ea8Slogwang  * @brief ΢�߳̿����, ȫ��ʵ����ȡ
603*a9643ea8Slogwang  */
604*a9643ea8Slogwang MtFrame *MtFrame::_instance = NULL;
605*a9643ea8Slogwang inline MtFrame* MtFrame::Instance ()
606*a9643ea8Slogwang {
607*a9643ea8Slogwang     if (NULL == _instance )
608*a9643ea8Slogwang     {
609*a9643ea8Slogwang         _instance = new MtFrame();
610*a9643ea8Slogwang     }
611*a9643ea8Slogwang 
612*a9643ea8Slogwang     return _instance;
613*a9643ea8Slogwang }
614*a9643ea8Slogwang 
615*a9643ea8Slogwang /**
616*a9643ea8Slogwang  * @brief HOOKϵͳapi������
617*a9643ea8Slogwang  */
618*a9643ea8Slogwang void MtFrame::SetHookFlag() {
619*a9643ea8Slogwang     mt_set_hook_flag();
620*a9643ea8Slogwang };
621*a9643ea8Slogwang 
622*a9643ea8Slogwang 
623*a9643ea8Slogwang /**
624*a9643ea8Slogwang  * @brief ��ܳ�ʼ��, Ĭ�ϲ�����־����
625*a9643ea8Slogwang  */
626*a9643ea8Slogwang bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
627*a9643ea8Slogwang {
628*a9643ea8Slogwang     _log_adpt = logadpt;
629*a9643ea8Slogwang 
630*a9643ea8Slogwang     // �������������߳���Ŀ, ���Ե���epoll��ص�fd��Ŀ
631*a9643ea8Slogwang     if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
632*a9643ea8Slogwang     {
633*a9643ea8Slogwang         MTLOG_ERROR("Init epoll or thread pool failed");
634*a9643ea8Slogwang         this->Destroy();
635*a9643ea8Slogwang         return false;
636*a9643ea8Slogwang     }
637*a9643ea8Slogwang 
638*a9643ea8Slogwang     // �������öѴ�С, �Ŵ�Ѹ���Ϊ2��
639*a9643ea8Slogwang     if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
640*a9643ea8Slogwang     {
641*a9643ea8Slogwang         MTLOG_ERROR("Init heap list failed");
642*a9643ea8Slogwang         this->Destroy();
643*a9643ea8Slogwang         return false;
644*a9643ea8Slogwang     }
645*a9643ea8Slogwang 
646*a9643ea8Slogwang     // ��ʱ�������ʼ��, �Ŵ�Ѹ���Ϊ2��
647*a9643ea8Slogwang     _timer = new CTimerMng(max_thread_num * 2);
648*a9643ea8Slogwang     if (NULL == _timer)
649*a9643ea8Slogwang     {
650*a9643ea8Slogwang         MTLOG_ERROR("Init heap timer failed");
651*a9643ea8Slogwang         this->Destroy();
652*a9643ea8Slogwang         return false;
653*a9643ea8Slogwang     }
654*a9643ea8Slogwang 
655*a9643ea8Slogwang     // �ػ��̵߳�����ʼ��
656*a9643ea8Slogwang     _daemon = AllocThread();
657*a9643ea8Slogwang     if (NULL == _daemon)
658*a9643ea8Slogwang     {
659*a9643ea8Slogwang         MTLOG_ERROR("Alloc daemon thread failed");
660*a9643ea8Slogwang         this->Destroy();
661*a9643ea8Slogwang         return false;
662*a9643ea8Slogwang     }
663*a9643ea8Slogwang     _daemon->SetType(MicroThread::DAEMON);
664*a9643ea8Slogwang     _daemon->SetState(MicroThread::RUNABLE);
665*a9643ea8Slogwang     _daemon->SetSartFunc(MtFrame::DaemonRun, this);
666*a9643ea8Slogwang 
667*a9643ea8Slogwang     // �����߳�, ����INIT, ����ʼ��ջ, Ҳ�޻ص�ע��, ����Ҫͳһ����
668*a9643ea8Slogwang     _primo = new MicroThread(MicroThread::PRIMORDIAL);
669*a9643ea8Slogwang     if (NULL == _primo)
670*a9643ea8Slogwang     {
671*a9643ea8Slogwang         MTLOG_ERROR("new _primo thread failed");
672*a9643ea8Slogwang         this->Destroy();
673*a9643ea8Slogwang         return false;
674*a9643ea8Slogwang     }
675*a9643ea8Slogwang     _primo->SetState(MicroThread::RUNNING);
676*a9643ea8Slogwang     SetActiveThread(_primo);
677*a9643ea8Slogwang 
678*a9643ea8Slogwang     // ��������ʱ���
679*a9643ea8Slogwang     _last_clock = GetSystemMS();
680*a9643ea8Slogwang     TAILQ_INIT(&_iolist);
681*a9643ea8Slogwang     TAILQ_INIT(&_pend_list);
682*a9643ea8Slogwang 
683*a9643ea8Slogwang 	//SetHookFlag();
684*a9643ea8Slogwang 
685*a9643ea8Slogwang     return true;
686*a9643ea8Slogwang 
687*a9643ea8Slogwang }
688*a9643ea8Slogwang 
689*a9643ea8Slogwang /**
690*a9643ea8Slogwang  * @brief ��ܷ���ʼ��
691*a9643ea8Slogwang  */
692*a9643ea8Slogwang void MtFrame::Destroy(void)
693*a9643ea8Slogwang {
694*a9643ea8Slogwang     if (NULL == _instance )
695*a9643ea8Slogwang     {
696*a9643ea8Slogwang         return;
697*a9643ea8Slogwang     }
698*a9643ea8Slogwang 
699*a9643ea8Slogwang     if (_primo) {
700*a9643ea8Slogwang         delete _primo;
701*a9643ea8Slogwang         _primo = NULL;
702*a9643ea8Slogwang     }
703*a9643ea8Slogwang 
704*a9643ea8Slogwang     if (_daemon) {
705*a9643ea8Slogwang         FreeThread(_daemon);
706*a9643ea8Slogwang         _daemon = NULL;
707*a9643ea8Slogwang     }
708*a9643ea8Slogwang 
709*a9643ea8Slogwang     TAILQ_INIT(&_iolist);
710*a9643ea8Slogwang 
711*a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
712*a9643ea8Slogwang     while (thread)
713*a9643ea8Slogwang     {
714*a9643ea8Slogwang         FreeThread(thread);
715*a9643ea8Slogwang         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
716*a9643ea8Slogwang     }
717*a9643ea8Slogwang 
718*a9643ea8Slogwang     while (!_runlist.empty())
719*a9643ea8Slogwang     {
720*a9643ea8Slogwang         thread = _runlist.front();
721*a9643ea8Slogwang         _runlist.pop();
722*a9643ea8Slogwang         FreeThread(thread);
723*a9643ea8Slogwang     }
724*a9643ea8Slogwang 
725*a9643ea8Slogwang     MicroThread* tmp;
726*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
727*a9643ea8Slogwang     {
728*a9643ea8Slogwang         TAILQ_REMOVE(&_pend_list, thread, _entry);
729*a9643ea8Slogwang         FreeThread(thread);
730*a9643ea8Slogwang     }
731*a9643ea8Slogwang 
732*a9643ea8Slogwang     if (_timer != NULL)
733*a9643ea8Slogwang     {
734*a9643ea8Slogwang         delete _timer;
735*a9643ea8Slogwang         _timer = NULL;
736*a9643ea8Slogwang     }
737*a9643ea8Slogwang 
738*a9643ea8Slogwang     _instance->DestroyPool();
739*a9643ea8Slogwang     _instance->TermKqueue();
740*a9643ea8Slogwang     delete _instance;
741*a9643ea8Slogwang     _instance = NULL;
742*a9643ea8Slogwang }
743*a9643ea8Slogwang 
744*a9643ea8Slogwang /**
745*a9643ea8Slogwang  * @brief ΢�߳̿�ܰ汾��ȡ
746*a9643ea8Slogwang  */
747*a9643ea8Slogwang char* MtFrame::Version()
748*a9643ea8Slogwang {
749*a9643ea8Slogwang     return IMT_VERSION;
750*a9643ea8Slogwang }
751*a9643ea8Slogwang 
752*a9643ea8Slogwang /**
753*a9643ea8Slogwang  * @brief ΢�̴߳����ӿ�
754*a9643ea8Slogwang  * @param entry �߳���ں���
755*a9643ea8Slogwang  * @param args  �߳���ڲ���
756*a9643ea8Slogwang  * @return ΢�߳�ָ��, NULL��ʾʧ��
757*a9643ea8Slogwang  */
758*a9643ea8Slogwang MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
759*a9643ea8Slogwang {
760*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
761*a9643ea8Slogwang     MicroThread* thread = mtframe->AllocThread();
762*a9643ea8Slogwang     if (NULL == thread)
763*a9643ea8Slogwang     {
764*a9643ea8Slogwang         MTLOG_ERROR("create thread failed");
765*a9643ea8Slogwang         return NULL;
766*a9643ea8Slogwang     }
767*a9643ea8Slogwang     thread->SetSartFunc(entry, args);
768*a9643ea8Slogwang 
769*a9643ea8Slogwang     if (runable) {
770*a9643ea8Slogwang         mtframe->InsertRunable(thread);
771*a9643ea8Slogwang     }
772*a9643ea8Slogwang 
773*a9643ea8Slogwang     return thread;
774*a9643ea8Slogwang }
775*a9643ea8Slogwang 
776*a9643ea8Slogwang int MtFrame::Loop(void* args)
777*a9643ea8Slogwang {
778*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
779*a9643ea8Slogwang     MicroThread* daemon = mtframe->DaemonThread();
780*a9643ea8Slogwang 
781*a9643ea8Slogwang 	mtframe->KqueueDispatch();
782*a9643ea8Slogwang 	mtframe->SetLastClock(mtframe->GetSystemMS());
783*a9643ea8Slogwang 	mtframe->WakeupTimeout();
784*a9643ea8Slogwang 	mtframe->CheckExpired();
785*a9643ea8Slogwang 	daemon->SwitchContext();
786*a9643ea8Slogwang 
787*a9643ea8Slogwang 	return 0;
788*a9643ea8Slogwang }
789*a9643ea8Slogwang 
790*a9643ea8Slogwang /**
791*a9643ea8Slogwang  * @brief �ػ��߳���ں���, ����ָ��Ҫ��static����
792*a9643ea8Slogwang  * @param args  �߳���ڲ���
793*a9643ea8Slogwang  */
794*a9643ea8Slogwang void MtFrame::DaemonRun(void* args)
795*a9643ea8Slogwang {
796*a9643ea8Slogwang 	/*
797*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
798*a9643ea8Slogwang     MicroThread* daemon = mtframe->DaemonThread();
799*a9643ea8Slogwang 
800*a9643ea8Slogwang 	while (true) {
801*a9643ea8Slogwang 		mtframe->KqueueDispatch();
802*a9643ea8Slogwang 		mtframe->SetLastClock(mtframe->GetSystemMS());
803*a9643ea8Slogwang 		mtframe->WakeupTimeout();
804*a9643ea8Slogwang 		mtframe->CheckExpired();
805*a9643ea8Slogwang 		daemon->SwitchContext();
806*a9643ea8Slogwang 	}
807*a9643ea8Slogwang 	*/
808*a9643ea8Slogwang 	ff_run(MtFrame::Loop, NULL);
809*a9643ea8Slogwang }
810*a9643ea8Slogwang 
811*a9643ea8Slogwang /**
812*a9643ea8Slogwang  * @brief ��ȡ��ǰ�̵߳ĸ��߳�
813*a9643ea8Slogwang  */
814*a9643ea8Slogwang MicroThread *MtFrame::GetRootThread()
815*a9643ea8Slogwang {
816*a9643ea8Slogwang     if (NULL == _curr_thread)
817*a9643ea8Slogwang     {
818*a9643ea8Slogwang         return NULL;
819*a9643ea8Slogwang     }
820*a9643ea8Slogwang 
821*a9643ea8Slogwang     MicroThread::ThreadType type = _curr_thread->GetType();
822*a9643ea8Slogwang     MicroThread *thread = _curr_thread;
823*a9643ea8Slogwang     MicroThread *parent = thread;
824*a9643ea8Slogwang 
825*a9643ea8Slogwang     while (MicroThread::SUB_THREAD == type)
826*a9643ea8Slogwang     {
827*a9643ea8Slogwang         thread = thread->GetParent();
828*a9643ea8Slogwang         if (!thread)
829*a9643ea8Slogwang         {
830*a9643ea8Slogwang             break;
831*a9643ea8Slogwang         }
832*a9643ea8Slogwang 
833*a9643ea8Slogwang         type   = thread->GetType();
834*a9643ea8Slogwang         parent = thread;
835*a9643ea8Slogwang     }
836*a9643ea8Slogwang 
837*a9643ea8Slogwang     return parent;
838*a9643ea8Slogwang }
839*a9643ea8Slogwang 
840*a9643ea8Slogwang /**
841*a9643ea8Slogwang  * @brief ��ܵ����߳�����
842*a9643ea8Slogwang  */
843*a9643ea8Slogwang void MtFrame::ThreadSchdule()
844*a9643ea8Slogwang {
845*a9643ea8Slogwang     MicroThread* thread = NULL;
846*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
847*a9643ea8Slogwang 
848*a9643ea8Slogwang     if (mtframe->_runlist.empty())
849*a9643ea8Slogwang     {
850*a9643ea8Slogwang         thread = mtframe->DaemonThread();
851*a9643ea8Slogwang     }
852*a9643ea8Slogwang     else
853*a9643ea8Slogwang     {
854*a9643ea8Slogwang         thread = mtframe->_runlist.front();
855*a9643ea8Slogwang         mtframe->RemoveRunable(thread);
856*a9643ea8Slogwang     }
857*a9643ea8Slogwang 
858*a9643ea8Slogwang     this->SetActiveThread(thread);
859*a9643ea8Slogwang     thread->SetState(MicroThread::RUNNING);
860*a9643ea8Slogwang     thread->RestoreContext();
861*a9643ea8Slogwang }
862*a9643ea8Slogwang 
863*a9643ea8Slogwang /**
864*a9643ea8Slogwang  * @brief ��ܴ���ʱ�ص�����
865*a9643ea8Slogwang  */
866*a9643ea8Slogwang void MtFrame::CheckExpired()
867*a9643ea8Slogwang {
868*a9643ea8Slogwang     static utime64_t check_time = 0;
869*a9643ea8Slogwang 
870*a9643ea8Slogwang     if (_timer != NULL)
871*a9643ea8Slogwang     {
872*a9643ea8Slogwang         _timer->check_expired();
873*a9643ea8Slogwang     }
874*a9643ea8Slogwang 
875*a9643ea8Slogwang    utime64_t now = GetLastClock();
876*a9643ea8Slogwang 
877*a9643ea8Slogwang     if ((now - check_time) > 1000)
878*a9643ea8Slogwang     {
879*a9643ea8Slogwang         CNetMgr::Instance()->RecycleObjs(now);
880*a9643ea8Slogwang         check_time = now;
881*a9643ea8Slogwang     }
882*a9643ea8Slogwang }
883*a9643ea8Slogwang 
884*a9643ea8Slogwang /**
885*a9643ea8Slogwang  * @brief ��ܼ�⵽��ʱ, �������еij�ʱ�߳�
886*a9643ea8Slogwang  */
887*a9643ea8Slogwang void MtFrame::WakeupTimeout()
888*a9643ea8Slogwang {
889*a9643ea8Slogwang     utime64_t now = GetLastClock();
890*a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
891*a9643ea8Slogwang     while (thread && (thread->GetWakeupTime() <= now))
892*a9643ea8Slogwang     {
893*a9643ea8Slogwang         if (thread->HasFlag(MicroThread::IO_LIST))
894*a9643ea8Slogwang 	    {
895*a9643ea8Slogwang             RemoveIoWait(thread);
896*a9643ea8Slogwang         }
897*a9643ea8Slogwang         else
898*a9643ea8Slogwang         {
899*a9643ea8Slogwang             RemoveSleep(thread);
900*a9643ea8Slogwang         }
901*a9643ea8Slogwang 
902*a9643ea8Slogwang         InsertRunable(thread);
903*a9643ea8Slogwang 
904*a9643ea8Slogwang         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
905*a9643ea8Slogwang     }
906*a9643ea8Slogwang }
907*a9643ea8Slogwang 
908*a9643ea8Slogwang /**
909*a9643ea8Slogwang  * @brief ��ܵ���epoll waitǰ, �ж��ȴ�ʱ����Ϣ
910*a9643ea8Slogwang  */
911*a9643ea8Slogwang int MtFrame::KqueueGetTimeout()
912*a9643ea8Slogwang {
913*a9643ea8Slogwang     utime64_t now = GetLastClock();
914*a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
915*a9643ea8Slogwang     if (!thread)
916*a9643ea8Slogwang     {
917*a9643ea8Slogwang         return 10; //Ĭ��10ms epollwait
918*a9643ea8Slogwang     }
919*a9643ea8Slogwang     else if (thread->GetWakeupTime() < now)
920*a9643ea8Slogwang     {
921*a9643ea8Slogwang         return 0;
922*a9643ea8Slogwang     }
923*a9643ea8Slogwang     else
924*a9643ea8Slogwang     {
925*a9643ea8Slogwang         return (int)(thread->GetWakeupTime() - now);
926*a9643ea8Slogwang     }
927*a9643ea8Slogwang }
928*a9643ea8Slogwang 
929*a9643ea8Slogwang /**
930*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, ���������
931*a9643ea8Slogwang  * @param thread ΢�̶߳���
932*a9643ea8Slogwang  */
933*a9643ea8Slogwang inline void MtFrame::InsertSleep(MicroThread* thread)
934*a9643ea8Slogwang {
935*a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
936*a9643ea8Slogwang 
937*a9643ea8Slogwang     thread->SetFlag(MicroThread::SLEEP_LIST);
938*a9643ea8Slogwang     thread->SetState(MicroThread::SLEEPING);
939*a9643ea8Slogwang     int rc = _sleeplist.HeapPush(thread);
940*a9643ea8Slogwang     if (rc < 0)
941*a9643ea8Slogwang     {
942*a9643ea8Slogwang         MT_ATTR_API(320848, 1); // heap error
943*a9643ea8Slogwang         MTLOG_ERROR("Insert heap failed , rc %d", rc);
944*a9643ea8Slogwang     }
945*a9643ea8Slogwang }
946*a9643ea8Slogwang 
947*a9643ea8Slogwang /**
948*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, �Ƴ������
949*a9643ea8Slogwang  * @param thread ΢�̶߳���
950*a9643ea8Slogwang  */
951*a9643ea8Slogwang inline void MtFrame::RemoveSleep(MicroThread* thread)
952*a9643ea8Slogwang {
953*a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
954*a9643ea8Slogwang     thread->UnsetFlag(MicroThread::SLEEP_LIST);
955*a9643ea8Slogwang 
956*a9643ea8Slogwang     int rc = _sleeplist.HeapDelete(thread);
957*a9643ea8Slogwang     if (rc < 0)
958*a9643ea8Slogwang     {
959*a9643ea8Slogwang         MT_ATTR_API(320849, 1); // heap error
960*a9643ea8Slogwang         MTLOG_ERROR("remove heap failed , rc %d", rc);
961*a9643ea8Slogwang     }
962*a9643ea8Slogwang }
963*a9643ea8Slogwang 
964*a9643ea8Slogwang /**
965*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, ִ��IO�ȴ�״̬
966*a9643ea8Slogwang  * @param thread ΢�̶߳���
967*a9643ea8Slogwang  */
968*a9643ea8Slogwang inline void MtFrame::InsertIoWait(MicroThread* thread)
969*a9643ea8Slogwang {
970*a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
971*a9643ea8Slogwang     thread->SetFlag(MicroThread::IO_LIST);
972*a9643ea8Slogwang     TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
973*a9643ea8Slogwang     InsertSleep(thread);
974*a9643ea8Slogwang }
975*a9643ea8Slogwang 
976*a9643ea8Slogwang /**
977*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, �Ƴ�IO�ȴ�״̬
978*a9643ea8Slogwang  * @param thread ΢�̶߳���
979*a9643ea8Slogwang  */
980*a9643ea8Slogwang void MtFrame::RemoveIoWait(MicroThread* thread)
981*a9643ea8Slogwang {
982*a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::IO_LIST));
983*a9643ea8Slogwang     thread->UnsetFlag(MicroThread::IO_LIST);
984*a9643ea8Slogwang     TAILQ_REMOVE(&_iolist, thread, _entry);
985*a9643ea8Slogwang 
986*a9643ea8Slogwang     RemoveSleep(thread);
987*a9643ea8Slogwang }
988*a9643ea8Slogwang 
989*a9643ea8Slogwang /**
990*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, ��������ж���
991*a9643ea8Slogwang  * @param thread ΢�̶߳���
992*a9643ea8Slogwang  */
993*a9643ea8Slogwang void MtFrame::InsertRunable(MicroThread* thread)
994*a9643ea8Slogwang {
995*a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
996*a9643ea8Slogwang     thread->SetFlag(MicroThread::RUN_LIST);
997*a9643ea8Slogwang 
998*a9643ea8Slogwang     thread->SetState(MicroThread::RUNABLE);
999*a9643ea8Slogwang     _runlist.push(thread);
1000*a9643ea8Slogwang     _waitnum++;
1001*a9643ea8Slogwang }
1002*a9643ea8Slogwang 
1003*a9643ea8Slogwang /**
1004*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, �Ƴ������ж���
1005*a9643ea8Slogwang  * @param thread ΢�̶߳���
1006*a9643ea8Slogwang  */
1007*a9643ea8Slogwang inline void MtFrame::RemoveRunable(MicroThread* thread)
1008*a9643ea8Slogwang {
1009*a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
1010*a9643ea8Slogwang     ASSERT(thread == _runlist.front());
1011*a9643ea8Slogwang     thread->UnsetFlag(MicroThread::RUN_LIST);
1012*a9643ea8Slogwang 
1013*a9643ea8Slogwang     _runlist.pop();
1014*a9643ea8Slogwang     _waitnum--;
1015*a9643ea8Slogwang }
1016*a9643ea8Slogwang 
1017*a9643ea8Slogwang 
1018*a9643ea8Slogwang /**
1019*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, ִ��pend�ȴ�״̬
1020*a9643ea8Slogwang  * @param thread ΢�̶߳���
1021*a9643ea8Slogwang  */
1022*a9643ea8Slogwang void MtFrame::InsertPend(MicroThread* thread)
1023*a9643ea8Slogwang {
1024*a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
1025*a9643ea8Slogwang     thread->SetFlag(MicroThread::PEND_LIST);
1026*a9643ea8Slogwang     TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
1027*a9643ea8Slogwang     thread->SetState(MicroThread::PENDING);
1028*a9643ea8Slogwang }
1029*a9643ea8Slogwang 
1030*a9643ea8Slogwang /**
1031*a9643ea8Slogwang  * @brief ��ܹ����̵߳�Ԫ, �Ƴ�PEND�ȴ�״̬
1032*a9643ea8Slogwang  * @param thread ΢�̶߳���
1033*a9643ea8Slogwang  */
1034*a9643ea8Slogwang void MtFrame::RemovePend(MicroThread* thread)
1035*a9643ea8Slogwang {
1036*a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
1037*a9643ea8Slogwang     thread->UnsetFlag(MicroThread::PEND_LIST);
1038*a9643ea8Slogwang     TAILQ_REMOVE(&_pend_list, thread, _entry);
1039*a9643ea8Slogwang }
1040*a9643ea8Slogwang 
1041*a9643ea8Slogwang /**
1042*a9643ea8Slogwang  * @brief ΢�߳������л�, �ȴ������̵߳Ļ���
1043*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1044*a9643ea8Slogwang  */
1045*a9643ea8Slogwang void MtFrame::WaitNotify(utime64_t timeout)
1046*a9643ea8Slogwang {
1047*a9643ea8Slogwang     MicroThread* thread = GetActiveThread();
1048*a9643ea8Slogwang 
1049*a9643ea8Slogwang     thread->SetWakeupTime(timeout + this->GetLastClock());
1050*a9643ea8Slogwang     this->InsertIoWait(thread);
1051*a9643ea8Slogwang     thread->SwitchContext();
1052*a9643ea8Slogwang }
1053*a9643ea8Slogwang 
1054*a9643ea8Slogwang /**
1055*a9643ea8Slogwang  * @brief ΢�̴߳����л�����,���óɹ� ���ó�cpu
1056*a9643ea8Slogwang  * @param fdlist ��·������socket�б�
1057*a9643ea8Slogwang  * @param fd ���������fd��Ϣ
1058*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1059*a9643ea8Slogwang  * @return true �ɹ�, false ʧ��
1060*a9643ea8Slogwang  */
1061*a9643ea8Slogwang bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
1062*a9643ea8Slogwang {
1063*a9643ea8Slogwang     MicroThread* thread = GetActiveThread();
1064*a9643ea8Slogwang     if (NULL == thread)
1065*a9643ea8Slogwang     {
1066*a9643ea8Slogwang         MTLOG_ERROR("active thread null, epoll schedule failed");
1067*a9643ea8Slogwang         return false;
1068*a9643ea8Slogwang     }
1069*a9643ea8Slogwang 
1070*a9643ea8Slogwang     // 1. ���ϸ��߳���Ҫ���ĵ�epoll���ȶ���
1071*a9643ea8Slogwang     thread->ClearAllFd();
1072*a9643ea8Slogwang     if (fdlist)
1073*a9643ea8Slogwang     {
1074*a9643ea8Slogwang         thread->AddFdList(fdlist);
1075*a9643ea8Slogwang     }
1076*a9643ea8Slogwang     if (fd)
1077*a9643ea8Slogwang     {
1078*a9643ea8Slogwang         thread->AddFd(fd);
1079*a9643ea8Slogwang     }
1080*a9643ea8Slogwang 
1081*a9643ea8Slogwang     // 2. ����epoll�����¼�, ������ʱʱ��, �л�IO�ȴ�״̬, �����л�
1082*a9643ea8Slogwang     thread->SetWakeupTime(timeout + this->GetLastClock());
1083*a9643ea8Slogwang     if (!this->KqueueAdd(thread->GetFdSet()))
1084*a9643ea8Slogwang     {
1085*a9643ea8Slogwang         MTLOG_ERROR("epoll add failed, errno: %d", errno);
1086*a9643ea8Slogwang         return false;
1087*a9643ea8Slogwang     }
1088*a9643ea8Slogwang     this->InsertIoWait(thread);
1089*a9643ea8Slogwang     thread->SwitchContext();
1090*a9643ea8Slogwang 
1091*a9643ea8Slogwang     // 3. ����OK, �ж���ʱ, epoll ctrl ��ԭ״̬
1092*a9643ea8Slogwang     int rcvnum = 0;
1093*a9643ea8Slogwang     KqObjList& rcvfds = thread->GetFdSet();
1094*a9643ea8Slogwang     KqueuerObj* fdata = NULL;
1095*a9643ea8Slogwang     TAILQ_FOREACH(fdata, &rcvfds, _entry)
1096*a9643ea8Slogwang     {
1097*a9643ea8Slogwang         if (fdata->GetRcvEvents() != 0)
1098*a9643ea8Slogwang         {
1099*a9643ea8Slogwang             rcvnum++;
1100*a9643ea8Slogwang         }
1101*a9643ea8Slogwang     }
1102*a9643ea8Slogwang     this->KqueueDel(rcvfds);     // ��һ��������ADD, DEL �ջ�����
1103*a9643ea8Slogwang 
1104*a9643ea8Slogwang     if (rcvnum == 0)    // ��ʱ����, ���ش���
1105*a9643ea8Slogwang     {
1106*a9643ea8Slogwang         errno = ETIME;
1107*a9643ea8Slogwang         return false;
1108*a9643ea8Slogwang     }
1109*a9643ea8Slogwang 
1110*a9643ea8Slogwang     return true;
1111*a9643ea8Slogwang }
1112*a9643ea8Slogwang 
1113*a9643ea8Slogwang /**
1114*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� recvfrom
1115*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1116*a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
1117*a9643ea8Slogwang  * @param len ������Ϣ����������
1118*a9643ea8Slogwang  * @param from ��Դ��ַ��ָ��
1119*a9643ea8Slogwang  * @param fromlen ��Դ��ַ�Ľṹ����
1120*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1121*a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
1122*a9643ea8Slogwang  */
1123*a9643ea8Slogwang int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
1124*a9643ea8Slogwang {
1125*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1126*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1127*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1128*a9643ea8Slogwang     utime64_t now = 0;
1129*a9643ea8Slogwang 
1130*a9643ea8Slogwang     if(fd<0 || !buf || len<1)
1131*a9643ea8Slogwang     {
1132*a9643ea8Slogwang     	errno = EINVAL;
1133*a9643ea8Slogwang     	MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
1134*a9643ea8Slogwang     	return -10;
1135*a9643ea8Slogwang     }
1136*a9643ea8Slogwang 
1137*a9643ea8Slogwang     if (timeout <= -1)
1138*a9643ea8Slogwang     {
1139*a9643ea8Slogwang         timeout = 0x7fffffff;
1140*a9643ea8Slogwang     }
1141*a9643ea8Slogwang 
1142*a9643ea8Slogwang     while (true)
1143*a9643ea8Slogwang     {
1144*a9643ea8Slogwang         now = mtframe->GetLastClock();
1145*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1146*a9643ea8Slogwang         {
1147*a9643ea8Slogwang             errno = ETIME;
1148*a9643ea8Slogwang             return -1;
1149*a9643ea8Slogwang         }
1150*a9643ea8Slogwang 
1151*a9643ea8Slogwang         KqueuerObj epfd;
1152*a9643ea8Slogwang         epfd.SetOsfd(fd);
1153*a9643ea8Slogwang         epfd.EnableInput();
1154*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1155*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1156*a9643ea8Slogwang         {
1157*a9643ea8Slogwang             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1158*a9643ea8Slogwang             return -2;
1159*a9643ea8Slogwang         }
1160*a9643ea8Slogwang 
1161*a9643ea8Slogwang         mt_hook_syscall(recvfrom);
1162*a9643ea8Slogwang         int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
1163*a9643ea8Slogwang         if (n < 0)
1164*a9643ea8Slogwang         {
1165*a9643ea8Slogwang             if (errno == EINTR) {
1166*a9643ea8Slogwang                 continue;
1167*a9643ea8Slogwang             }
1168*a9643ea8Slogwang 
1169*a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1170*a9643ea8Slogwang             {
1171*a9643ea8Slogwang                 MTLOG_ERROR("recvfrom failed, errno: %d", errno);
1172*a9643ea8Slogwang                 return -3;
1173*a9643ea8Slogwang             }
1174*a9643ea8Slogwang         }
1175*a9643ea8Slogwang         else
1176*a9643ea8Slogwang         {
1177*a9643ea8Slogwang             return n;
1178*a9643ea8Slogwang         }
1179*a9643ea8Slogwang     }
1180*a9643ea8Slogwang 
1181*a9643ea8Slogwang }
1182*a9643ea8Slogwang 
1183*a9643ea8Slogwang /**
1184*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� sendto
1185*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1186*a9643ea8Slogwang  * @param msg �����͵���Ϣָ��
1187*a9643ea8Slogwang  * @param len �����͵���Ϣ����
1188*a9643ea8Slogwang  * @param to Ŀ�ĵ�ַ��ָ��
1189*a9643ea8Slogwang  * @param tolen Ŀ�ĵ�ַ�Ľṹ����
1190*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1191*a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
1192*a9643ea8Slogwang  */
1193*a9643ea8Slogwang int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
1194*a9643ea8Slogwang {
1195*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1196*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1197*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1198*a9643ea8Slogwang     utime64_t now = 0;
1199*a9643ea8Slogwang 
1200*a9643ea8Slogwang     if(fd<0 || !msg || len<1)
1201*a9643ea8Slogwang     {
1202*a9643ea8Slogwang     	errno = EINVAL;
1203*a9643ea8Slogwang     	MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
1204*a9643ea8Slogwang     	return -10;
1205*a9643ea8Slogwang     }
1206*a9643ea8Slogwang 
1207*a9643ea8Slogwang     int n = 0;
1208*a9643ea8Slogwang     mt_hook_syscall(sendto);
1209*a9643ea8Slogwang     while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
1210*a9643ea8Slogwang     {
1211*a9643ea8Slogwang         now = mtframe->GetLastClock();
1212*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1213*a9643ea8Slogwang         {
1214*a9643ea8Slogwang             errno = ETIME;
1215*a9643ea8Slogwang             return -1;
1216*a9643ea8Slogwang         }
1217*a9643ea8Slogwang 
1218*a9643ea8Slogwang         if (errno == EINTR) {
1219*a9643ea8Slogwang             continue;
1220*a9643ea8Slogwang         }
1221*a9643ea8Slogwang 
1222*a9643ea8Slogwang         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1223*a9643ea8Slogwang             MTLOG_ERROR("sendto failed, errno: %d", errno);
1224*a9643ea8Slogwang             return -2;
1225*a9643ea8Slogwang         }
1226*a9643ea8Slogwang 
1227*a9643ea8Slogwang         KqueuerObj epfd;
1228*a9643ea8Slogwang         epfd.SetOsfd(fd);
1229*a9643ea8Slogwang         epfd.EnableOutput();
1230*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1231*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1232*a9643ea8Slogwang             return -3;
1233*a9643ea8Slogwang         }
1234*a9643ea8Slogwang     }
1235*a9643ea8Slogwang 
1236*a9643ea8Slogwang     return n;
1237*a9643ea8Slogwang }
1238*a9643ea8Slogwang 
1239*a9643ea8Slogwang /**
1240*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� connect
1241*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1242*a9643ea8Slogwang  * @param addr ָ��server��Ŀ�ĵ�ַ
1243*a9643ea8Slogwang  * @param addrlen ��ַ�ij���
1244*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1245*a9643ea8Slogwang  * @return =0 ���ӳɹ�, <0 ʧ��
1246*a9643ea8Slogwang  */
1247*a9643ea8Slogwang int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1248*a9643ea8Slogwang {
1249*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1250*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1251*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1252*a9643ea8Slogwang     utime64_t now = 0;
1253*a9643ea8Slogwang 
1254*a9643ea8Slogwang     if(fd<0 || !addr || addrlen<1)
1255*a9643ea8Slogwang     {
1256*a9643ea8Slogwang     	errno = EINVAL;
1257*a9643ea8Slogwang     	MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1258*a9643ea8Slogwang     	return -10;
1259*a9643ea8Slogwang     }
1260*a9643ea8Slogwang 
1261*a9643ea8Slogwang     int n = 0;
1262*a9643ea8Slogwang     mt_hook_syscall(connect);
1263*a9643ea8Slogwang     while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1264*a9643ea8Slogwang     {
1265*a9643ea8Slogwang         now = mtframe->GetLastClock();
1266*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1267*a9643ea8Slogwang         {
1268*a9643ea8Slogwang             errno = ETIME;
1269*a9643ea8Slogwang             return -1;
1270*a9643ea8Slogwang         }
1271*a9643ea8Slogwang 
1272*a9643ea8Slogwang         if (errno == EISCONN)   // ������, ���سɹ�
1273*a9643ea8Slogwang         {
1274*a9643ea8Slogwang             return 0;
1275*a9643ea8Slogwang         }
1276*a9643ea8Slogwang 
1277*a9643ea8Slogwang         if (errno == EINTR) {
1278*a9643ea8Slogwang             continue;
1279*a9643ea8Slogwang         }
1280*a9643ea8Slogwang 
1281*a9643ea8Slogwang         if (errno != EINPROGRESS) {
1282*a9643ea8Slogwang             MTLOG_ERROR("connect failed, errno: %d", errno);
1283*a9643ea8Slogwang             return -2;
1284*a9643ea8Slogwang         }
1285*a9643ea8Slogwang 
1286*a9643ea8Slogwang         KqueuerObj epfd;
1287*a9643ea8Slogwang         epfd.SetOsfd(fd);
1288*a9643ea8Slogwang         epfd.EnableOutput();
1289*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1290*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1291*a9643ea8Slogwang             return -3;
1292*a9643ea8Slogwang         }
1293*a9643ea8Slogwang     }
1294*a9643ea8Slogwang 
1295*a9643ea8Slogwang     return n;
1296*a9643ea8Slogwang }
1297*a9643ea8Slogwang 
1298*a9643ea8Slogwang /**
1299*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� accept
1300*a9643ea8Slogwang  * @param fd �����׽���
1301*a9643ea8Slogwang  * @param addr �ͻ��˵�ַ
1302*a9643ea8Slogwang  * @param addrlen ��ַ�ij���
1303*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1304*a9643ea8Slogwang  * @return >=0 accept��socket������, <0 ʧ��
1305*a9643ea8Slogwang  */
1306*a9643ea8Slogwang int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1307*a9643ea8Slogwang {
1308*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1309*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1310*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1311*a9643ea8Slogwang     utime64_t now = 0;
1312*a9643ea8Slogwang 
1313*a9643ea8Slogwang     if(fd<0)
1314*a9643ea8Slogwang     {
1315*a9643ea8Slogwang     	errno = EINVAL;
1316*a9643ea8Slogwang     	MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1317*a9643ea8Slogwang     	return -10;
1318*a9643ea8Slogwang     }
1319*a9643ea8Slogwang 
1320*a9643ea8Slogwang     int acceptfd = 0;
1321*a9643ea8Slogwang     mt_hook_syscall(accept);
1322*a9643ea8Slogwang     while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1323*a9643ea8Slogwang     {
1324*a9643ea8Slogwang         now = mtframe->GetLastClock();
1325*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1326*a9643ea8Slogwang         {
1327*a9643ea8Slogwang             errno = ETIME;
1328*a9643ea8Slogwang             return -1;
1329*a9643ea8Slogwang         }
1330*a9643ea8Slogwang 
1331*a9643ea8Slogwang         if (errno == EINTR) {
1332*a9643ea8Slogwang             continue;
1333*a9643ea8Slogwang         }
1334*a9643ea8Slogwang 
1335*a9643ea8Slogwang         if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1336*a9643ea8Slogwang             MTLOG_ERROR("accept failed, errno: %d", errno);
1337*a9643ea8Slogwang             return -2;
1338*a9643ea8Slogwang         }
1339*a9643ea8Slogwang 
1340*a9643ea8Slogwang         KqueuerObj epfd;
1341*a9643ea8Slogwang         epfd.SetOsfd(fd);
1342*a9643ea8Slogwang         epfd.EnableInput();
1343*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1344*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1345*a9643ea8Slogwang             return -3;
1346*a9643ea8Slogwang         }
1347*a9643ea8Slogwang     }
1348*a9643ea8Slogwang 
1349*a9643ea8Slogwang     return acceptfd;
1350*a9643ea8Slogwang }
1351*a9643ea8Slogwang 
1352*a9643ea8Slogwang 
1353*a9643ea8Slogwang /**
1354*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� read
1355*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1356*a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
1357*a9643ea8Slogwang  * @param nbyte ������Ϣ����������
1358*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1359*a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
1360*a9643ea8Slogwang  */
1361*a9643ea8Slogwang ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1362*a9643ea8Slogwang {
1363*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1364*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1365*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1366*a9643ea8Slogwang     utime64_t now = 0;
1367*a9643ea8Slogwang 
1368*a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1369*a9643ea8Slogwang     {
1370*a9643ea8Slogwang     	errno = EINVAL;
1371*a9643ea8Slogwang     	MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1372*a9643ea8Slogwang     	return -10;
1373*a9643ea8Slogwang     }
1374*a9643ea8Slogwang 
1375*a9643ea8Slogwang     ssize_t n = 0;
1376*a9643ea8Slogwang     mt_hook_syscall(read);
1377*a9643ea8Slogwang     while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1378*a9643ea8Slogwang     {
1379*a9643ea8Slogwang         now = mtframe->GetLastClock();
1380*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1381*a9643ea8Slogwang         {
1382*a9643ea8Slogwang             errno = ETIME;
1383*a9643ea8Slogwang             return -1;
1384*a9643ea8Slogwang         }
1385*a9643ea8Slogwang 
1386*a9643ea8Slogwang         if (errno == EINTR) {
1387*a9643ea8Slogwang             continue;
1388*a9643ea8Slogwang         }
1389*a9643ea8Slogwang 
1390*a9643ea8Slogwang         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1391*a9643ea8Slogwang             MTLOG_ERROR("read failed, errno: %d", errno);
1392*a9643ea8Slogwang             return -2;
1393*a9643ea8Slogwang         }
1394*a9643ea8Slogwang 
1395*a9643ea8Slogwang         KqueuerObj epfd;
1396*a9643ea8Slogwang         epfd.SetOsfd(fd);
1397*a9643ea8Slogwang         epfd.EnableInput();
1398*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1399*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1400*a9643ea8Slogwang             return -3;
1401*a9643ea8Slogwang         }
1402*a9643ea8Slogwang     }
1403*a9643ea8Slogwang 
1404*a9643ea8Slogwang     return n;
1405*a9643ea8Slogwang }
1406*a9643ea8Slogwang 
1407*a9643ea8Slogwang /**
1408*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� write
1409*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1410*a9643ea8Slogwang  * @param buf �����͵���Ϣָ��
1411*a9643ea8Slogwang  * @param nbyte �����͵���Ϣ����
1412*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1413*a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
1414*a9643ea8Slogwang  */
1415*a9643ea8Slogwang ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1416*a9643ea8Slogwang {
1417*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1418*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1419*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1420*a9643ea8Slogwang     utime64_t now = 0;
1421*a9643ea8Slogwang 
1422*a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1423*a9643ea8Slogwang     {
1424*a9643ea8Slogwang     	errno = EINVAL;
1425*a9643ea8Slogwang     	MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1426*a9643ea8Slogwang     	return -10;
1427*a9643ea8Slogwang     }
1428*a9643ea8Slogwang 
1429*a9643ea8Slogwang     ssize_t n = 0;
1430*a9643ea8Slogwang     size_t send_len = 0;
1431*a9643ea8Slogwang     while (send_len < nbyte)
1432*a9643ea8Slogwang     {
1433*a9643ea8Slogwang         now = mtframe->GetLastClock();
1434*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1435*a9643ea8Slogwang         {
1436*a9643ea8Slogwang             errno = ETIME;
1437*a9643ea8Slogwang             return -1;
1438*a9643ea8Slogwang         }
1439*a9643ea8Slogwang 
1440*a9643ea8Slogwang         mt_hook_syscall(write);
1441*a9643ea8Slogwang         n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1442*a9643ea8Slogwang         if (n < 0)
1443*a9643ea8Slogwang         {
1444*a9643ea8Slogwang             if (errno == EINTR) {
1445*a9643ea8Slogwang                 continue;
1446*a9643ea8Slogwang             }
1447*a9643ea8Slogwang 
1448*a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1449*a9643ea8Slogwang                 MTLOG_ERROR("write failed, errno: %d", errno);
1450*a9643ea8Slogwang                 return -2;
1451*a9643ea8Slogwang             }
1452*a9643ea8Slogwang         }
1453*a9643ea8Slogwang         else
1454*a9643ea8Slogwang         {
1455*a9643ea8Slogwang             send_len += n;
1456*a9643ea8Slogwang             if (send_len >= nbyte) {
1457*a9643ea8Slogwang                 return nbyte;
1458*a9643ea8Slogwang             }
1459*a9643ea8Slogwang         }
1460*a9643ea8Slogwang 
1461*a9643ea8Slogwang         KqueuerObj epfd;
1462*a9643ea8Slogwang         epfd.SetOsfd(fd);
1463*a9643ea8Slogwang         epfd.EnableOutput();
1464*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1465*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1466*a9643ea8Slogwang             return -3;
1467*a9643ea8Slogwang         }
1468*a9643ea8Slogwang     }
1469*a9643ea8Slogwang 
1470*a9643ea8Slogwang     return nbyte;
1471*a9643ea8Slogwang }
1472*a9643ea8Slogwang 
1473*a9643ea8Slogwang 
1474*a9643ea8Slogwang /**
1475*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� recv
1476*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1477*a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
1478*a9643ea8Slogwang  * @param len ������Ϣ����������
1479*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1480*a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
1481*a9643ea8Slogwang  */
1482*a9643ea8Slogwang int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1483*a9643ea8Slogwang {
1484*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1485*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1486*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1487*a9643ea8Slogwang     utime64_t now = 0;
1488*a9643ea8Slogwang 
1489*a9643ea8Slogwang     if(fd<0 || !buf || len<1)
1490*a9643ea8Slogwang     {
1491*a9643ea8Slogwang     	errno = EINVAL;
1492*a9643ea8Slogwang     	MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1493*a9643ea8Slogwang     	return -10;
1494*a9643ea8Slogwang     }
1495*a9643ea8Slogwang 
1496*a9643ea8Slogwang     if (timeout <= -1)
1497*a9643ea8Slogwang     {
1498*a9643ea8Slogwang         timeout = 0x7fffffff;
1499*a9643ea8Slogwang     }
1500*a9643ea8Slogwang 
1501*a9643ea8Slogwang     while (true)
1502*a9643ea8Slogwang     {
1503*a9643ea8Slogwang         now = mtframe->GetLastClock();
1504*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1505*a9643ea8Slogwang         {
1506*a9643ea8Slogwang             errno = ETIME;
1507*a9643ea8Slogwang             return -1;
1508*a9643ea8Slogwang         }
1509*a9643ea8Slogwang 
1510*a9643ea8Slogwang         KqueuerObj epfd;
1511*a9643ea8Slogwang         epfd.SetOsfd(fd);
1512*a9643ea8Slogwang         epfd.EnableInput();
1513*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1514*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1515*a9643ea8Slogwang         {
1516*a9643ea8Slogwang             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1517*a9643ea8Slogwang             return -2;
1518*a9643ea8Slogwang         }
1519*a9643ea8Slogwang 
1520*a9643ea8Slogwang         mt_hook_syscall(recv);
1521*a9643ea8Slogwang         int n = ff_hook_recv(fd, buf, len, flags);
1522*a9643ea8Slogwang         if (n < 0)
1523*a9643ea8Slogwang         {
1524*a9643ea8Slogwang             if (errno == EINTR) {
1525*a9643ea8Slogwang                 continue;
1526*a9643ea8Slogwang             }
1527*a9643ea8Slogwang 
1528*a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1529*a9643ea8Slogwang             {
1530*a9643ea8Slogwang                 MTLOG_ERROR("recv failed, errno: %d", errno);
1531*a9643ea8Slogwang                 return -3;
1532*a9643ea8Slogwang             }
1533*a9643ea8Slogwang         }
1534*a9643ea8Slogwang         else
1535*a9643ea8Slogwang         {
1536*a9643ea8Slogwang             return n;
1537*a9643ea8Slogwang         }
1538*a9643ea8Slogwang     }
1539*a9643ea8Slogwang 
1540*a9643ea8Slogwang }
1541*a9643ea8Slogwang 
1542*a9643ea8Slogwang /**
1543*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� send
1544*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1545*a9643ea8Slogwang  * @param buf �����͵���Ϣָ��
1546*a9643ea8Slogwang  * @param nbyte �����͵���Ϣ����
1547*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1548*a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
1549*a9643ea8Slogwang  */
1550*a9643ea8Slogwang ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1551*a9643ea8Slogwang {
1552*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1553*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1554*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1555*a9643ea8Slogwang     utime64_t now = 0;
1556*a9643ea8Slogwang 
1557*a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1558*a9643ea8Slogwang     {
1559*a9643ea8Slogwang     	errno = EINVAL;
1560*a9643ea8Slogwang     	MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1561*a9643ea8Slogwang     	return -10;
1562*a9643ea8Slogwang     }
1563*a9643ea8Slogwang 
1564*a9643ea8Slogwang     ssize_t n = 0;
1565*a9643ea8Slogwang     size_t send_len = 0;
1566*a9643ea8Slogwang     while (send_len < nbyte)
1567*a9643ea8Slogwang     {
1568*a9643ea8Slogwang         now = mtframe->GetLastClock();
1569*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1570*a9643ea8Slogwang         {
1571*a9643ea8Slogwang             errno = ETIME;
1572*a9643ea8Slogwang             return -1;
1573*a9643ea8Slogwang         }
1574*a9643ea8Slogwang 
1575*a9643ea8Slogwang         mt_hook_syscall(send);
1576*a9643ea8Slogwang         n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1577*a9643ea8Slogwang         if (n < 0)
1578*a9643ea8Slogwang         {
1579*a9643ea8Slogwang             if (errno == EINTR) {
1580*a9643ea8Slogwang                 continue;
1581*a9643ea8Slogwang             }
1582*a9643ea8Slogwang 
1583*a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1584*a9643ea8Slogwang                 MTLOG_ERROR("write failed, errno: %d", errno);
1585*a9643ea8Slogwang                 return -2;
1586*a9643ea8Slogwang             }
1587*a9643ea8Slogwang         }
1588*a9643ea8Slogwang         else
1589*a9643ea8Slogwang         {
1590*a9643ea8Slogwang             send_len += n;
1591*a9643ea8Slogwang             if (send_len >= nbyte) {
1592*a9643ea8Slogwang                 return nbyte;
1593*a9643ea8Slogwang             }
1594*a9643ea8Slogwang         }
1595*a9643ea8Slogwang 
1596*a9643ea8Slogwang         KqueuerObj epfd;
1597*a9643ea8Slogwang         epfd.SetOsfd(fd);
1598*a9643ea8Slogwang         epfd.EnableOutput();
1599*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1600*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1601*a9643ea8Slogwang             return -3;
1602*a9643ea8Slogwang         }
1603*a9643ea8Slogwang     }
1604*a9643ea8Slogwang 
1605*a9643ea8Slogwang     return nbyte;
1606*a9643ea8Slogwang }
1607*a9643ea8Slogwang 
1608*a9643ea8Slogwang 
1609*a9643ea8Slogwang 
1610*a9643ea8Slogwang /**
1611*a9643ea8Slogwang  * @brief ΢�߳�����sleep�ӿ�, ��λms
1612*a9643ea8Slogwang  */
1613*a9643ea8Slogwang void MtFrame::sleep(int ms)
1614*a9643ea8Slogwang {
1615*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
1616*a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
1617*a9643ea8Slogwang     if (thread != NULL)
1618*a9643ea8Slogwang     {
1619*a9643ea8Slogwang         thread->sleep(ms);
1620*a9643ea8Slogwang     }
1621*a9643ea8Slogwang }
1622*a9643ea8Slogwang 
1623*a9643ea8Slogwang /**
1624*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� recv
1625*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
1626*a9643ea8Slogwang  * @param events �¼�����  EPOLLIN or EPOLLOUT
1627*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
1628*a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
1629*a9643ea8Slogwang  */
1630*a9643ea8Slogwang int MtFrame::WaitEvents(int fd, int events, int timeout)
1631*a9643ea8Slogwang {
1632*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1633*a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1634*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1635*a9643ea8Slogwang     utime64_t now = 0;
1636*a9643ea8Slogwang 
1637*a9643ea8Slogwang     if (timeout <= -1)
1638*a9643ea8Slogwang     {
1639*a9643ea8Slogwang         timeout = 0x7fffffff;
1640*a9643ea8Slogwang     }
1641*a9643ea8Slogwang 
1642*a9643ea8Slogwang     while (true)
1643*a9643ea8Slogwang     {
1644*a9643ea8Slogwang         now = mtframe->GetLastClock();
1645*a9643ea8Slogwang         if ((int)(now - start) > timeout)
1646*a9643ea8Slogwang         {
1647*a9643ea8Slogwang             errno = ETIME;
1648*a9643ea8Slogwang             return 0;
1649*a9643ea8Slogwang         }
1650*a9643ea8Slogwang 
1651*a9643ea8Slogwang         KqueuerObj epfd;
1652*a9643ea8Slogwang         epfd.SetOsfd(fd);
1653*a9643ea8Slogwang         if (events & KQ_EVENT_READ)
1654*a9643ea8Slogwang         {
1655*a9643ea8Slogwang             epfd.EnableInput();
1656*a9643ea8Slogwang         }
1657*a9643ea8Slogwang         if (events & KQ_EVENT_WRITE)
1658*a9643ea8Slogwang         {
1659*a9643ea8Slogwang             epfd.EnableOutput();
1660*a9643ea8Slogwang         }
1661*a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1662*a9643ea8Slogwang 
1663*a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1664*a9643ea8Slogwang         {
1665*a9643ea8Slogwang             MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1666*a9643ea8Slogwang             return 0;
1667*a9643ea8Slogwang         }
1668*a9643ea8Slogwang 
1669*a9643ea8Slogwang         return epfd.GetRcvEvents();
1670*a9643ea8Slogwang     }
1671*a9643ea8Slogwang }
1672*a9643ea8Slogwang 
1673*a9643ea8Slogwang 
1674