xref: /f-stack/app/micro_thread/micro_thread.cpp (revision 5ac59bc4)
1a9643ea8Slogwang 
2a9643ea8Slogwang /**
3a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang  *
5a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang  *
7a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang  * obtain a copy of the License at
10a9643ea8Slogwang  *
11a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang  *
13a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang  * and limitations under the License.
17a9643ea8Slogwang  */
18a9643ea8Slogwang 
19a9643ea8Slogwang 
20a9643ea8Slogwang /**
21a9643ea8Slogwang  *  @filename micro_thread.cpp
22a9643ea8Slogwang  *  @info  micro thread manager
23a9643ea8Slogwang  */
24a9643ea8Slogwang #include "mt_version.h"
25a9643ea8Slogwang #include "micro_thread.h"
26a9643ea8Slogwang #include "mt_net.h"
27a9643ea8Slogwang #include "valgrind.h"
28a9643ea8Slogwang #include <assert.h>
29a9643ea8Slogwang #include "mt_sys_hook.h"
30a9643ea8Slogwang #include "ff_hook.h"
31a9643ea8Slogwang #include "ff_api.h"
32a9643ea8Slogwang 
33a9643ea8Slogwang using namespace NS_MICRO_THREAD;
34a9643ea8Slogwang 
// Assertions are compiled out by default; swap in the commented definition
// below to route ASSERT through the standard assert() macro.
#define  ASSERT(statement)
//#define  ASSERT(statement)   assert(statement)

// Context-switch primitives with C linkage; they are only declared here and
// are used by Thread to record, patch and resume an execution context.
extern "C"
{
    int  save_context(jmp_buf jbf);
    void restore_context(jmp_buf jbf, int ret);
    void replace_esp(jmp_buf jbf, void* esp);
}
43a9643ea8Slogwang 
44a9643ea8Slogwang Thread::Thread(int stack_size)
45a9643ea8Slogwang {
46a9643ea8Slogwang     _stack_size  = stack_size ? stack_size : ThreadPool::default_stack_size;
47a9643ea8Slogwang     _wakeup_time = 0;
48a9643ea8Slogwang     _stack       = NULL;
49a9643ea8Slogwang     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
50a9643ea8Slogwang }
51a9643ea8Slogwang 
52a9643ea8Slogwang 
53a9643ea8Slogwang /**
54*5ac59bc4Slogwang  *  @brief LINUX x86/x86_64's allocated stacks.
55a9643ea8Slogwang  */
56a9643ea8Slogwang bool Thread::InitStack()
57a9643ea8Slogwang {
58a9643ea8Slogwang     if (_stack) {
59a9643ea8Slogwang         return true;
60a9643ea8Slogwang     }
61a9643ea8Slogwang 
62*5ac59bc4Slogwang     ///< stack index and memory are separated to prevent out of bounds.
63a9643ea8Slogwang     _stack = (MtStack*)calloc(1, sizeof(MtStack));
64a9643ea8Slogwang     if (NULL == _stack)
65a9643ea8Slogwang     {
66a9643ea8Slogwang         MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
67a9643ea8Slogwang         return false;
68a9643ea8Slogwang     }
69a9643ea8Slogwang 
70a9643ea8Slogwang     int memsize = MEM_PAGE_SIZE*2 + _stack_size;
71a9643ea8Slogwang     memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
72a9643ea8Slogwang 
73a9643ea8Slogwang     static int zero_fd = -1;
74a9643ea8Slogwang     int mmap_flags = MAP_PRIVATE | MAP_ANON;
75a9643ea8Slogwang     void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
76a9643ea8Slogwang     if (vaddr == (void *)MAP_FAILED)
77a9643ea8Slogwang     {
78a9643ea8Slogwang         MTLOG_ERROR("mmap stack failed, size %d", memsize);
79a9643ea8Slogwang         free(_stack);
80a9643ea8Slogwang         _stack = NULL;
81a9643ea8Slogwang         return false;
82a9643ea8Slogwang     }
83a9643ea8Slogwang     _stack->_vaddr = (char*)vaddr;
84a9643ea8Slogwang     _stack->_vaddr_size = memsize;
85a9643ea8Slogwang     _stack->_stk_size = _stack_size;
86a9643ea8Slogwang     _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
87a9643ea8Slogwang     _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
88a9643ea8Slogwang     // valgrind support: register stack frame
89a9643ea8Slogwang     _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
90a9643ea8Slogwang 
91a9643ea8Slogwang     _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
92a9643ea8Slogwang 
93a9643ea8Slogwang     mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
94a9643ea8Slogwang     mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
95a9643ea8Slogwang 
96a9643ea8Slogwang     return true;
97a9643ea8Slogwang }
98a9643ea8Slogwang 
99a9643ea8Slogwang 
100a9643ea8Slogwang void Thread::FreeStack()
101a9643ea8Slogwang {
102a9643ea8Slogwang     if (!_stack) {
103a9643ea8Slogwang         return;
104a9643ea8Slogwang     }
105a9643ea8Slogwang     munmap(_stack->_vaddr, _stack->_vaddr_size);
106a9643ea8Slogwang     // valgrind support: deregister stack frame
107a9643ea8Slogwang     VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
108a9643ea8Slogwang     free(_stack);
109a9643ea8Slogwang     _stack = NULL;
110a9643ea8Slogwang }
111a9643ea8Slogwang 
112a9643ea8Slogwang void Thread::InitContext()
113a9643ea8Slogwang {
114a9643ea8Slogwang     if (save_context(_jmpbuf) != 0)
115a9643ea8Slogwang     {
116*5ac59bc4Slogwang         ScheduleObj::Instance()->ScheduleStartRun();
117a9643ea8Slogwang     }
118a9643ea8Slogwang 
119a9643ea8Slogwang     if (_stack != NULL)
120a9643ea8Slogwang     {
121a9643ea8Slogwang         replace_esp(_jmpbuf, _stack->_esp);
122a9643ea8Slogwang     }
123a9643ea8Slogwang }
124a9643ea8Slogwang 
125a9643ea8Slogwang void Thread::SwitchContext()
126a9643ea8Slogwang {
127a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
128a9643ea8Slogwang     {
129a9643ea8Slogwang         ScheduleObj::Instance()->ScheduleThread();
130a9643ea8Slogwang     }
131a9643ea8Slogwang }
132a9643ea8Slogwang 
133a9643ea8Slogwang void Thread::RestoreContext()
134a9643ea8Slogwang {
135a9643ea8Slogwang     restore_context(_jmpbuf, 1);
136a9643ea8Slogwang }
137a9643ea8Slogwang 
138*5ac59bc4Slogwang 
139a9643ea8Slogwang bool Thread::Initial()
140a9643ea8Slogwang {
141a9643ea8Slogwang     if (!InitStack())
142a9643ea8Slogwang     {
143a9643ea8Slogwang         MTLOG_ERROR("init stack failed");
144a9643ea8Slogwang         return false;
145a9643ea8Slogwang     }
146a9643ea8Slogwang 
147a9643ea8Slogwang     InitContext();
148a9643ea8Slogwang 
149a9643ea8Slogwang     return true;
150a9643ea8Slogwang }
151a9643ea8Slogwang 
152a9643ea8Slogwang void Thread::Destroy()
153a9643ea8Slogwang {
154a9643ea8Slogwang     FreeStack();
155a9643ea8Slogwang     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
156a9643ea8Slogwang }
157a9643ea8Slogwang 
158a9643ea8Slogwang void Thread::Reset()
159a9643ea8Slogwang {
160a9643ea8Slogwang     _wakeup_time = 0;
161a9643ea8Slogwang     SetPrivate(NULL);
162a9643ea8Slogwang 
163a9643ea8Slogwang     InitContext();
164a9643ea8Slogwang     CleanState();
165a9643ea8Slogwang }
166a9643ea8Slogwang 
167a9643ea8Slogwang void Thread::sleep(int ms)
168a9643ea8Slogwang {
169a9643ea8Slogwang     utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
170a9643ea8Slogwang     _wakeup_time = now + ms;
171a9643ea8Slogwang 
172a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
173a9643ea8Slogwang     {
174a9643ea8Slogwang         ScheduleObj::Instance()->ScheduleSleep();
175a9643ea8Slogwang     }
176a9643ea8Slogwang }
177a9643ea8Slogwang 
178a9643ea8Slogwang void Thread::Wait()
179a9643ea8Slogwang {
180a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
181a9643ea8Slogwang     {
182a9643ea8Slogwang         ScheduleObj::Instance()->SchedulePend();
183a9643ea8Slogwang     }
184a9643ea8Slogwang }
185a9643ea8Slogwang 
186a9643ea8Slogwang bool Thread::CheckStackHealth(char *esp)
187a9643ea8Slogwang {
188a9643ea8Slogwang     if (!_stack)
189a9643ea8Slogwang         return false;
190a9643ea8Slogwang 
191a9643ea8Slogwang     if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
192a9643ea8Slogwang         return true;
193a9643ea8Slogwang     else
194a9643ea8Slogwang         return false;
195a9643ea8Slogwang }
196a9643ea8Slogwang 
197a9643ea8Slogwang MicroThread::MicroThread(ThreadType type)
198a9643ea8Slogwang {
199a9643ea8Slogwang     memset(&_entry, 0, sizeof(_entry));
200a9643ea8Slogwang     TAILQ_INIT(&_fdset);
201a9643ea8Slogwang     TAILQ_INIT(&_sub_list);
202a9643ea8Slogwang     _flag = NOT_INLIST;
203a9643ea8Slogwang     _type = type;
204a9643ea8Slogwang     _state = INITIAL;
205a9643ea8Slogwang     _start = NULL;
206a9643ea8Slogwang     _args = NULL;
207a9643ea8Slogwang     _parent = NULL;
208a9643ea8Slogwang }
209a9643ea8Slogwang 
210a9643ea8Slogwang void MicroThread::CleanState()
211a9643ea8Slogwang {
212a9643ea8Slogwang     TAILQ_INIT(&_fdset);
213a9643ea8Slogwang     TAILQ_INIT(&_sub_list);
214a9643ea8Slogwang     _flag = NOT_INLIST;
215a9643ea8Slogwang     _type = NORMAL;
216a9643ea8Slogwang     _state = INITIAL;
217a9643ea8Slogwang     _start = NULL;
218a9643ea8Slogwang     _args = NULL;
219a9643ea8Slogwang     _parent = NULL;
220a9643ea8Slogwang }
221a9643ea8Slogwang 
222a9643ea8Slogwang void MicroThread::Run()
223a9643ea8Slogwang {
224a9643ea8Slogwang     if (_start) {
225a9643ea8Slogwang         _start(_args);
226a9643ea8Slogwang     }
227a9643ea8Slogwang 
228a9643ea8Slogwang     if (this->IsSubThread()) {
229a9643ea8Slogwang         this->WakeupParent();
230a9643ea8Slogwang     }
231a9643ea8Slogwang 
232a9643ea8Slogwang     ScheduleObj::Instance()->ScheduleReclaim();
233a9643ea8Slogwang     ScheduleObj::Instance()->ScheduleThread();
234a9643ea8Slogwang }
235a9643ea8Slogwang 
236a9643ea8Slogwang void MicroThread::WakeupParent()
237a9643ea8Slogwang {
238a9643ea8Slogwang     MicroThread* parent = this->GetParent();
239a9643ea8Slogwang     if (parent)
240a9643ea8Slogwang     {
241a9643ea8Slogwang         parent->RemoveSubThread(this);
242a9643ea8Slogwang         if (parent->HasNoSubThread())
243a9643ea8Slogwang         {
244a9643ea8Slogwang             ScheduleObj::Instance()->ScheduleUnpend(parent);
245a9643ea8Slogwang         }
246a9643ea8Slogwang     }
247a9643ea8Slogwang     else
248a9643ea8Slogwang     {
249a9643ea8Slogwang         MTLOG_ERROR("Sub thread no parent, error");
250a9643ea8Slogwang     }
251a9643ea8Slogwang }
252a9643ea8Slogwang 
253a9643ea8Slogwang bool MicroThread::HasNoSubThread()
254a9643ea8Slogwang {
255a9643ea8Slogwang     return TAILQ_EMPTY(&_sub_list);
256a9643ea8Slogwang }
257a9643ea8Slogwang 
258a9643ea8Slogwang void MicroThread::AddSubThread(MicroThread* sub)
259a9643ea8Slogwang {
260a9643ea8Slogwang     ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
261a9643ea8Slogwang     if (!sub->HasFlag(MicroThread::SUB_LIST))
262a9643ea8Slogwang     {
263a9643ea8Slogwang         TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
264a9643ea8Slogwang         sub->_parent = this;
265a9643ea8Slogwang     }
266a9643ea8Slogwang 
267a9643ea8Slogwang     sub->SetFlag(MicroThread::SUB_LIST);
268a9643ea8Slogwang }
269a9643ea8Slogwang 
270a9643ea8Slogwang void MicroThread::RemoveSubThread(MicroThread* sub)
271a9643ea8Slogwang {
272a9643ea8Slogwang     ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
273a9643ea8Slogwang     if (sub->HasFlag(MicroThread::SUB_LIST))
274a9643ea8Slogwang     {
275a9643ea8Slogwang         TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
276a9643ea8Slogwang         sub->_parent = NULL;
277a9643ea8Slogwang     }
278a9643ea8Slogwang 
279a9643ea8Slogwang     sub->UnsetFlag(MicroThread::SUB_LIST);
280a9643ea8Slogwang }
281a9643ea8Slogwang 
282*5ac59bc4Slogwang ScheduleObj *ScheduleObj::_instance = NULL;
283a9643ea8Slogwang inline ScheduleObj* ScheduleObj::Instance()
284a9643ea8Slogwang {
285a9643ea8Slogwang     if (NULL == _instance)
286a9643ea8Slogwang     {
287a9643ea8Slogwang         _instance = new ScheduleObj();
288a9643ea8Slogwang     }
289a9643ea8Slogwang 
290a9643ea8Slogwang     return _instance;
291a9643ea8Slogwang }
292a9643ea8Slogwang 
293a9643ea8Slogwang void ScheduleObj::ScheduleThread()
294a9643ea8Slogwang {
295a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
296a9643ea8Slogwang     frame->ThreadSchdule();
297a9643ea8Slogwang }
298a9643ea8Slogwang 
299a9643ea8Slogwang utime64_t ScheduleObj::ScheduleGetTime()
300a9643ea8Slogwang {
301a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
302a9643ea8Slogwang     if (frame)
303a9643ea8Slogwang     {
304a9643ea8Slogwang         return frame->GetLastClock();
305a9643ea8Slogwang     }
306a9643ea8Slogwang     else
307a9643ea8Slogwang     {
308a9643ea8Slogwang         MTLOG_ERROR("frame time failed, maybe not init");
309a9643ea8Slogwang         return 0;
310a9643ea8Slogwang     }
311a9643ea8Slogwang }
312a9643ea8Slogwang 
313a9643ea8Slogwang void ScheduleObj::ScheduleSleep()
314a9643ea8Slogwang {
315a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
316a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
317a9643ea8Slogwang     if ((!frame) || (!thread)) {
318a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
319a9643ea8Slogwang         return;
320a9643ea8Slogwang     }
321a9643ea8Slogwang 
322a9643ea8Slogwang     frame->InsertSleep(thread);
323a9643ea8Slogwang     frame->ThreadSchdule();
324a9643ea8Slogwang }
325a9643ea8Slogwang 
326a9643ea8Slogwang void ScheduleObj::SchedulePend()
327a9643ea8Slogwang {
328a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
329a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
330a9643ea8Slogwang     if ((!frame) || (!thread)) {
331a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
332a9643ea8Slogwang         return;
333a9643ea8Slogwang     }
334a9643ea8Slogwang 
335a9643ea8Slogwang     frame->InsertPend(thread);
336a9643ea8Slogwang     frame->ThreadSchdule();
337a9643ea8Slogwang }
338a9643ea8Slogwang 
339a9643ea8Slogwang void ScheduleObj::ScheduleUnpend(void* pthread)
340a9643ea8Slogwang {
341a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
342a9643ea8Slogwang     MicroThread* thread = (MicroThread*)pthread;
343a9643ea8Slogwang     if ((!frame) || (!thread)) {
344a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
345a9643ea8Slogwang         return;
346a9643ea8Slogwang     }
347a9643ea8Slogwang 
348a9643ea8Slogwang     frame->RemovePend(thread);
349a9643ea8Slogwang     frame->InsertRunable(thread);
350a9643ea8Slogwang }
351a9643ea8Slogwang 
352a9643ea8Slogwang void ScheduleObj::ScheduleReclaim()
353a9643ea8Slogwang {
354a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
355a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
356a9643ea8Slogwang     if ((!frame) || (!thread)) {
357a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
358a9643ea8Slogwang         return;
359a9643ea8Slogwang     }
360a9643ea8Slogwang 
361a9643ea8Slogwang     frame->FreeThread(thread);
362a9643ea8Slogwang }
363a9643ea8Slogwang 
364a9643ea8Slogwang void ScheduleObj::ScheduleStartRun()
365a9643ea8Slogwang {
366a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
367a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
368a9643ea8Slogwang     if ((!frame) || (!thread)) {
369a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
370a9643ea8Slogwang         return;
371a9643ea8Slogwang     }
372a9643ea8Slogwang 
373a9643ea8Slogwang     thread->Run();
374a9643ea8Slogwang }
375a9643ea8Slogwang 
376a9643ea8Slogwang 
377*5ac59bc4Slogwang unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM;   ///< 2000 micro threads.
378*5ac59bc4Slogwang unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE;   ///< 128k stack.
379a9643ea8Slogwang 
380a9643ea8Slogwang bool ThreadPool::InitialPool(int max_num)
381a9643ea8Slogwang {
382a9643ea8Slogwang     MicroThread *thread = NULL;
383a9643ea8Slogwang     for (unsigned int i = 0; i < default_thread_num; i++)
384a9643ea8Slogwang     {
385a9643ea8Slogwang         thread = new MicroThread();
386a9643ea8Slogwang         if ((NULL == thread) || (false == thread->Initial()))
387a9643ea8Slogwang         {
388a9643ea8Slogwang             MTLOG_ERROR("init pool, thread %p init failed", thread);
389a9643ea8Slogwang             if (thread)  delete thread;
390a9643ea8Slogwang             continue;
391a9643ea8Slogwang         }
392a9643ea8Slogwang         thread->SetFlag(MicroThread::FREE_LIST);
393a9643ea8Slogwang         _freelist.push(thread);
394a9643ea8Slogwang     }
395a9643ea8Slogwang 
396a9643ea8Slogwang     _total_num = _freelist.size();
397a9643ea8Slogwang     _max_num  = max_num;
398a9643ea8Slogwang     _use_num = 0;
399a9643ea8Slogwang     if (_total_num <= 0)
400a9643ea8Slogwang     {
401a9643ea8Slogwang         return false;
402a9643ea8Slogwang     }
403a9643ea8Slogwang     else
404a9643ea8Slogwang     {
405a9643ea8Slogwang         return true;
406a9643ea8Slogwang     }
407a9643ea8Slogwang }
408a9643ea8Slogwang 
409a9643ea8Slogwang void ThreadPool::DestroyPool()
410a9643ea8Slogwang {
411a9643ea8Slogwang     MicroThread* thread = NULL;
412a9643ea8Slogwang     while (!_freelist.empty())
413a9643ea8Slogwang     {
414a9643ea8Slogwang         thread = _freelist.front();
415a9643ea8Slogwang         _freelist.pop();
416a9643ea8Slogwang         thread->Destroy();
417a9643ea8Slogwang         delete thread;
418a9643ea8Slogwang     }
419a9643ea8Slogwang 
420a9643ea8Slogwang     _total_num = 0;
421a9643ea8Slogwang     _use_num = 0;
422a9643ea8Slogwang }
423a9643ea8Slogwang 
424a9643ea8Slogwang MicroThread* ThreadPool::AllocThread()
425a9643ea8Slogwang {
426*5ac59bc4Slogwang     MT_ATTR_API_SET(492069, _total_num);
427a9643ea8Slogwang 
428a9643ea8Slogwang     MicroThread* thread = NULL;
429a9643ea8Slogwang     if (!_freelist.empty())
430a9643ea8Slogwang     {
431a9643ea8Slogwang         thread = _freelist.front();
432a9643ea8Slogwang         _freelist.pop();
433a9643ea8Slogwang 
434a9643ea8Slogwang         ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
435a9643ea8Slogwang 
436a9643ea8Slogwang         thread->UnsetFlag(MicroThread::FREE_LIST);
437a9643ea8Slogwang         _use_num++;
438a9643ea8Slogwang         return thread;
439a9643ea8Slogwang     }
440a9643ea8Slogwang 
441a9643ea8Slogwang     MT_ATTR_API(320846, 1); // pool no nore
442a9643ea8Slogwang     if (_total_num >= _max_num)
443a9643ea8Slogwang     {
444a9643ea8Slogwang         MT_ATTR_API(361140, 1); // no more quota
445a9643ea8Slogwang         return NULL;
446a9643ea8Slogwang     }
447a9643ea8Slogwang 
448a9643ea8Slogwang     thread = new MicroThread();
449a9643ea8Slogwang     if ((NULL == thread) || (false == thread->Initial()))
450a9643ea8Slogwang     {
451a9643ea8Slogwang         MT_ATTR_API(320847, 1); // pool init fail
452a9643ea8Slogwang         MTLOG_ERROR("thread alloc failed, thread: %p", thread);
453a9643ea8Slogwang         if (thread)  delete thread;
454a9643ea8Slogwang         return NULL;
455a9643ea8Slogwang     }
456a9643ea8Slogwang     _total_num++;
457a9643ea8Slogwang     _use_num++;
458a9643ea8Slogwang 
459a9643ea8Slogwang     return thread;
460a9643ea8Slogwang }
461a9643ea8Slogwang 
462a9643ea8Slogwang void ThreadPool::FreeThread(MicroThread* thread)
463a9643ea8Slogwang {
464a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
465a9643ea8Slogwang     thread->Reset();
466a9643ea8Slogwang     _use_num--;
467a9643ea8Slogwang     _freelist.push(thread);
468a9643ea8Slogwang     thread->SetFlag(MicroThread::FREE_LIST);
469a9643ea8Slogwang 
470a9643ea8Slogwang     unsigned int free_num = _freelist.size();
471a9643ea8Slogwang     if ((free_num > default_thread_num) && (free_num > 1))
472a9643ea8Slogwang     {
473a9643ea8Slogwang         thread = _freelist.front();
474a9643ea8Slogwang         _freelist.pop();
475a9643ea8Slogwang         thread->Destroy();
476a9643ea8Slogwang         delete thread;
477a9643ea8Slogwang         _total_num--;
478a9643ea8Slogwang     }
479a9643ea8Slogwang }
480a9643ea8Slogwang 
481a9643ea8Slogwang int ThreadPool::GetUsedNum(void)
482a9643ea8Slogwang {
483a9643ea8Slogwang     return _use_num;
484a9643ea8Slogwang }
485a9643ea8Slogwang 
486a9643ea8Slogwang MtFrame *MtFrame::_instance = NULL;
487a9643ea8Slogwang inline MtFrame* MtFrame::Instance ()
488a9643ea8Slogwang {
489a9643ea8Slogwang     if (NULL == _instance )
490a9643ea8Slogwang     {
491a9643ea8Slogwang         _instance = new MtFrame();
492a9643ea8Slogwang     }
493a9643ea8Slogwang 
494a9643ea8Slogwang     return _instance;
495a9643ea8Slogwang }
496a9643ea8Slogwang 
497a9643ea8Slogwang void MtFrame::SetHookFlag() {
498a9643ea8Slogwang     mt_set_hook_flag();
499a9643ea8Slogwang };
500a9643ea8Slogwang 
501a9643ea8Slogwang bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
502a9643ea8Slogwang {
503a9643ea8Slogwang     _log_adpt = logadpt;
504a9643ea8Slogwang 
505a9643ea8Slogwang     if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
506a9643ea8Slogwang     {
507a9643ea8Slogwang         MTLOG_ERROR("Init epoll or thread pool failed");
508a9643ea8Slogwang         this->Destroy();
509a9643ea8Slogwang         return false;
510a9643ea8Slogwang     }
511a9643ea8Slogwang     if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
512a9643ea8Slogwang     {
513a9643ea8Slogwang         MTLOG_ERROR("Init heap list failed");
514a9643ea8Slogwang         this->Destroy();
515a9643ea8Slogwang         return false;
516a9643ea8Slogwang     }
517a9643ea8Slogwang 
518a9643ea8Slogwang     _timer = new CTimerMng(max_thread_num * 2);
519a9643ea8Slogwang     if (NULL == _timer)
520a9643ea8Slogwang     {
521a9643ea8Slogwang         MTLOG_ERROR("Init heap timer failed");
522a9643ea8Slogwang         this->Destroy();
523a9643ea8Slogwang         return false;
524a9643ea8Slogwang     }
525a9643ea8Slogwang 
526a9643ea8Slogwang     _daemon = AllocThread();
527a9643ea8Slogwang     if (NULL == _daemon)
528a9643ea8Slogwang     {
529a9643ea8Slogwang         MTLOG_ERROR("Alloc daemon thread failed");
530a9643ea8Slogwang         this->Destroy();
531a9643ea8Slogwang         return false;
532a9643ea8Slogwang     }
533a9643ea8Slogwang     _daemon->SetType(MicroThread::DAEMON);
534a9643ea8Slogwang     _daemon->SetState(MicroThread::RUNABLE);
535a9643ea8Slogwang     _daemon->SetSartFunc(MtFrame::DaemonRun, this);
536a9643ea8Slogwang 
537a9643ea8Slogwang     _primo = new MicroThread(MicroThread::PRIMORDIAL);
538a9643ea8Slogwang     if (NULL == _primo)
539a9643ea8Slogwang     {
540a9643ea8Slogwang         MTLOG_ERROR("new _primo thread failed");
541a9643ea8Slogwang         this->Destroy();
542a9643ea8Slogwang         return false;
543a9643ea8Slogwang     }
544a9643ea8Slogwang     _primo->SetState(MicroThread::RUNNING);
545a9643ea8Slogwang     SetActiveThread(_primo);
546a9643ea8Slogwang 
547a9643ea8Slogwang     _last_clock = GetSystemMS();
548a9643ea8Slogwang     TAILQ_INIT(&_iolist);
549a9643ea8Slogwang     TAILQ_INIT(&_pend_list);
550a9643ea8Slogwang 
551a9643ea8Slogwang     //SetHookFlag();
552a9643ea8Slogwang 
553a9643ea8Slogwang     return true;
554a9643ea8Slogwang 
555a9643ea8Slogwang }
556a9643ea8Slogwang 
557a9643ea8Slogwang void MtFrame::Destroy(void)
558a9643ea8Slogwang {
559a9643ea8Slogwang     if (NULL == _instance )
560a9643ea8Slogwang     {
561a9643ea8Slogwang         return;
562a9643ea8Slogwang     }
563a9643ea8Slogwang 
564a9643ea8Slogwang     if (_primo) {
565a9643ea8Slogwang         delete _primo;
566a9643ea8Slogwang         _primo = NULL;
567a9643ea8Slogwang     }
568a9643ea8Slogwang 
569a9643ea8Slogwang     if (_daemon) {
570a9643ea8Slogwang         FreeThread(_daemon);
571a9643ea8Slogwang         _daemon = NULL;
572a9643ea8Slogwang     }
573a9643ea8Slogwang 
574a9643ea8Slogwang     TAILQ_INIT(&_iolist);
575a9643ea8Slogwang 
576a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
577a9643ea8Slogwang     while (thread)
578a9643ea8Slogwang     {
579a9643ea8Slogwang         FreeThread(thread);
580a9643ea8Slogwang         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
581a9643ea8Slogwang     }
582a9643ea8Slogwang 
583a9643ea8Slogwang     while (!_runlist.empty())
584a9643ea8Slogwang     {
585a9643ea8Slogwang         thread = _runlist.front();
586a9643ea8Slogwang         _runlist.pop();
587a9643ea8Slogwang         FreeThread(thread);
588a9643ea8Slogwang     }
589a9643ea8Slogwang 
590a9643ea8Slogwang     MicroThread* tmp;
591a9643ea8Slogwang     TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
592a9643ea8Slogwang     {
593a9643ea8Slogwang         TAILQ_REMOVE(&_pend_list, thread, _entry);
594a9643ea8Slogwang         FreeThread(thread);
595a9643ea8Slogwang     }
596a9643ea8Slogwang 
597a9643ea8Slogwang     if (_timer != NULL)
598a9643ea8Slogwang     {
599a9643ea8Slogwang         delete _timer;
600a9643ea8Slogwang         _timer = NULL;
601a9643ea8Slogwang     }
602a9643ea8Slogwang 
603a9643ea8Slogwang     _instance->DestroyPool();
604a9643ea8Slogwang     _instance->TermKqueue();
605a9643ea8Slogwang     delete _instance;
606a9643ea8Slogwang     _instance = NULL;
607a9643ea8Slogwang }
608a9643ea8Slogwang 
609a9643ea8Slogwang char* MtFrame::Version()
610a9643ea8Slogwang {
611a9643ea8Slogwang     return IMT_VERSION;
612a9643ea8Slogwang }
613a9643ea8Slogwang 
614a9643ea8Slogwang MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
615a9643ea8Slogwang {
616a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
617a9643ea8Slogwang     MicroThread* thread = mtframe->AllocThread();
618a9643ea8Slogwang     if (NULL == thread)
619a9643ea8Slogwang     {
620a9643ea8Slogwang         MTLOG_ERROR("create thread failed");
621a9643ea8Slogwang         return NULL;
622a9643ea8Slogwang     }
623a9643ea8Slogwang     thread->SetSartFunc(entry, args);
624a9643ea8Slogwang 
625a9643ea8Slogwang     if (runable) {
626a9643ea8Slogwang         mtframe->InsertRunable(thread);
627a9643ea8Slogwang     }
628a9643ea8Slogwang 
629a9643ea8Slogwang     return thread;
630a9643ea8Slogwang }
631a9643ea8Slogwang 
632a9643ea8Slogwang int MtFrame::Loop(void* args)
633a9643ea8Slogwang {
634a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
635a9643ea8Slogwang     MicroThread* daemon = mtframe->DaemonThread();
636a9643ea8Slogwang 
637a9643ea8Slogwang     mtframe->KqueueDispatch();
638a9643ea8Slogwang     mtframe->SetLastClock(mtframe->GetSystemMS());
639a9643ea8Slogwang     mtframe->WakeupTimeout();
640a9643ea8Slogwang     mtframe->CheckExpired();
641a9643ea8Slogwang     daemon->SwitchContext();
642a9643ea8Slogwang 
643a9643ea8Slogwang     return 0;
644a9643ea8Slogwang }
645a9643ea8Slogwang 
646a9643ea8Slogwang void MtFrame::DaemonRun(void* args)
647a9643ea8Slogwang {
648a9643ea8Slogwang     /*
649a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
650a9643ea8Slogwang     MicroThread* daemon = mtframe->DaemonThread();
651a9643ea8Slogwang 
652a9643ea8Slogwang     while (true) {
653a9643ea8Slogwang         mtframe->KqueueDispatch();
654a9643ea8Slogwang         mtframe->SetLastClock(mtframe->GetSystemMS());
655a9643ea8Slogwang         mtframe->WakeupTimeout();
656a9643ea8Slogwang         mtframe->CheckExpired();
657a9643ea8Slogwang         daemon->SwitchContext();
658a9643ea8Slogwang     }
659a9643ea8Slogwang     */
660a9643ea8Slogwang     ff_run(MtFrame::Loop, NULL);
661a9643ea8Slogwang }
662a9643ea8Slogwang 
663a9643ea8Slogwang MicroThread *MtFrame::GetRootThread()
664a9643ea8Slogwang {
665a9643ea8Slogwang     if (NULL == _curr_thread)
666a9643ea8Slogwang     {
667a9643ea8Slogwang         return NULL;
668a9643ea8Slogwang     }
669a9643ea8Slogwang 
670a9643ea8Slogwang     MicroThread::ThreadType type = _curr_thread->GetType();
671a9643ea8Slogwang     MicroThread *thread = _curr_thread;
672a9643ea8Slogwang     MicroThread *parent = thread;
673a9643ea8Slogwang 
674a9643ea8Slogwang     while (MicroThread::SUB_THREAD == type)
675a9643ea8Slogwang     {
676a9643ea8Slogwang         thread = thread->GetParent();
677a9643ea8Slogwang         if (!thread)
678a9643ea8Slogwang         {
679a9643ea8Slogwang             break;
680a9643ea8Slogwang         }
681a9643ea8Slogwang 
682a9643ea8Slogwang         type   = thread->GetType();
683a9643ea8Slogwang         parent = thread;
684a9643ea8Slogwang     }
685a9643ea8Slogwang 
686a9643ea8Slogwang     return parent;
687a9643ea8Slogwang }
688a9643ea8Slogwang 
689a9643ea8Slogwang void MtFrame::ThreadSchdule()
690a9643ea8Slogwang {
691a9643ea8Slogwang     MicroThread* thread = NULL;
692a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
693a9643ea8Slogwang 
694a9643ea8Slogwang     if (mtframe->_runlist.empty())
695a9643ea8Slogwang     {
696a9643ea8Slogwang         thread = mtframe->DaemonThread();
697a9643ea8Slogwang     }
698a9643ea8Slogwang     else
699a9643ea8Slogwang     {
700a9643ea8Slogwang         thread = mtframe->_runlist.front();
701a9643ea8Slogwang         mtframe->RemoveRunable(thread);
702a9643ea8Slogwang     }
703a9643ea8Slogwang 
704a9643ea8Slogwang     this->SetActiveThread(thread);
705a9643ea8Slogwang     thread->SetState(MicroThread::RUNNING);
706a9643ea8Slogwang     thread->RestoreContext();
707a9643ea8Slogwang }
708a9643ea8Slogwang 
709a9643ea8Slogwang void MtFrame::CheckExpired()
710a9643ea8Slogwang {
711a9643ea8Slogwang     static utime64_t check_time = 0;
712a9643ea8Slogwang 
713a9643ea8Slogwang     if (_timer != NULL)
714a9643ea8Slogwang     {
715a9643ea8Slogwang         _timer->check_expired();
716a9643ea8Slogwang     }
717a9643ea8Slogwang 
718a9643ea8Slogwang    utime64_t now = GetLastClock();
719a9643ea8Slogwang 
720a9643ea8Slogwang     if ((now - check_time) > 1000)
721a9643ea8Slogwang     {
722a9643ea8Slogwang         CNetMgr::Instance()->RecycleObjs(now);
723a9643ea8Slogwang         check_time = now;
724a9643ea8Slogwang     }
725a9643ea8Slogwang }
726a9643ea8Slogwang 
727a9643ea8Slogwang void MtFrame::WakeupTimeout()
728a9643ea8Slogwang {
729a9643ea8Slogwang     utime64_t now = GetLastClock();
730a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
731a9643ea8Slogwang     while (thread && (thread->GetWakeupTime() <= now))
732a9643ea8Slogwang     {
733a9643ea8Slogwang         if (thread->HasFlag(MicroThread::IO_LIST))
734a9643ea8Slogwang         {
735a9643ea8Slogwang             RemoveIoWait(thread);
736a9643ea8Slogwang         }
737a9643ea8Slogwang         else
738a9643ea8Slogwang         {
739a9643ea8Slogwang             RemoveSleep(thread);
740a9643ea8Slogwang         }
741a9643ea8Slogwang 
742a9643ea8Slogwang         InsertRunable(thread);
743a9643ea8Slogwang 
744a9643ea8Slogwang         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
745a9643ea8Slogwang     }
746a9643ea8Slogwang }
747a9643ea8Slogwang 
748a9643ea8Slogwang int MtFrame::KqueueGetTimeout()
749a9643ea8Slogwang {
750a9643ea8Slogwang     utime64_t now = GetLastClock();
751a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
752a9643ea8Slogwang     if (!thread)
753a9643ea8Slogwang     {
754*5ac59bc4Slogwang         return 10; //default 10ms epollwait
755a9643ea8Slogwang     }
756a9643ea8Slogwang     else if (thread->GetWakeupTime() < now)
757a9643ea8Slogwang     {
758a9643ea8Slogwang         return 0;
759a9643ea8Slogwang     }
760a9643ea8Slogwang     else
761a9643ea8Slogwang     {
762a9643ea8Slogwang         return (int)(thread->GetWakeupTime() - now);
763a9643ea8Slogwang     }
764a9643ea8Slogwang }
765a9643ea8Slogwang 
766a9643ea8Slogwang inline void MtFrame::InsertSleep(MicroThread* thread)
767a9643ea8Slogwang {
768a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
769a9643ea8Slogwang 
770a9643ea8Slogwang     thread->SetFlag(MicroThread::SLEEP_LIST);
771a9643ea8Slogwang     thread->SetState(MicroThread::SLEEPING);
772a9643ea8Slogwang     int rc = _sleeplist.HeapPush(thread);
773a9643ea8Slogwang     if (rc < 0)
774a9643ea8Slogwang     {
775a9643ea8Slogwang         MT_ATTR_API(320848, 1); // heap error
776a9643ea8Slogwang         MTLOG_ERROR("Insert heap failed , rc %d", rc);
777a9643ea8Slogwang     }
778a9643ea8Slogwang }
779a9643ea8Slogwang 
780a9643ea8Slogwang inline void MtFrame::RemoveSleep(MicroThread* thread)
781a9643ea8Slogwang {
782a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
783a9643ea8Slogwang     thread->UnsetFlag(MicroThread::SLEEP_LIST);
784a9643ea8Slogwang 
785a9643ea8Slogwang     int rc = _sleeplist.HeapDelete(thread);
786a9643ea8Slogwang     if (rc < 0)
787a9643ea8Slogwang     {
788a9643ea8Slogwang         MT_ATTR_API(320849, 1); // heap error
789a9643ea8Slogwang         MTLOG_ERROR("remove heap failed , rc %d", rc);
790a9643ea8Slogwang     }
791a9643ea8Slogwang }
792a9643ea8Slogwang 
793a9643ea8Slogwang inline void MtFrame::InsertIoWait(MicroThread* thread)
794a9643ea8Slogwang {
795a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
796a9643ea8Slogwang     thread->SetFlag(MicroThread::IO_LIST);
797a9643ea8Slogwang     TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
798a9643ea8Slogwang     InsertSleep(thread);
799a9643ea8Slogwang }
800a9643ea8Slogwang 
801a9643ea8Slogwang void MtFrame::RemoveIoWait(MicroThread* thread)
802a9643ea8Slogwang {
803a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::IO_LIST));
804a9643ea8Slogwang     thread->UnsetFlag(MicroThread::IO_LIST);
805a9643ea8Slogwang     TAILQ_REMOVE(&_iolist, thread, _entry);
806a9643ea8Slogwang 
807a9643ea8Slogwang     RemoveSleep(thread);
808a9643ea8Slogwang }
809a9643ea8Slogwang 
810a9643ea8Slogwang void MtFrame::InsertRunable(MicroThread* thread)
811a9643ea8Slogwang {
812a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
813a9643ea8Slogwang     thread->SetFlag(MicroThread::RUN_LIST);
814a9643ea8Slogwang 
815a9643ea8Slogwang     thread->SetState(MicroThread::RUNABLE);
816a9643ea8Slogwang     _runlist.push(thread);
817a9643ea8Slogwang     _waitnum++;
818a9643ea8Slogwang }
819a9643ea8Slogwang 
820a9643ea8Slogwang inline void MtFrame::RemoveRunable(MicroThread* thread)
821a9643ea8Slogwang {
822a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
823a9643ea8Slogwang     ASSERT(thread == _runlist.front());
824a9643ea8Slogwang     thread->UnsetFlag(MicroThread::RUN_LIST);
825a9643ea8Slogwang 
826a9643ea8Slogwang     _runlist.pop();
827a9643ea8Slogwang     _waitnum--;
828a9643ea8Slogwang }
829a9643ea8Slogwang 
830a9643ea8Slogwang void MtFrame::InsertPend(MicroThread* thread)
831a9643ea8Slogwang {
832a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
833a9643ea8Slogwang     thread->SetFlag(MicroThread::PEND_LIST);
834a9643ea8Slogwang     TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
835a9643ea8Slogwang     thread->SetState(MicroThread::PENDING);
836a9643ea8Slogwang }
837a9643ea8Slogwang 
838a9643ea8Slogwang void MtFrame::RemovePend(MicroThread* thread)
839a9643ea8Slogwang {
840a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
841a9643ea8Slogwang     thread->UnsetFlag(MicroThread::PEND_LIST);
842a9643ea8Slogwang     TAILQ_REMOVE(&_pend_list, thread, _entry);
843a9643ea8Slogwang }
844a9643ea8Slogwang 
845a9643ea8Slogwang void MtFrame::WaitNotify(utime64_t timeout)
846a9643ea8Slogwang {
847a9643ea8Slogwang     MicroThread* thread = GetActiveThread();
848a9643ea8Slogwang 
849a9643ea8Slogwang     thread->SetWakeupTime(timeout + this->GetLastClock());
850a9643ea8Slogwang     this->InsertIoWait(thread);
851a9643ea8Slogwang     thread->SwitchContext();
852a9643ea8Slogwang }
853a9643ea8Slogwang 
854a9643ea8Slogwang bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
855a9643ea8Slogwang {
856a9643ea8Slogwang     MicroThread* thread = GetActiveThread();
857a9643ea8Slogwang     if (NULL == thread)
858a9643ea8Slogwang     {
859a9643ea8Slogwang         MTLOG_ERROR("active thread null, epoll schedule failed");
860a9643ea8Slogwang         return false;
861a9643ea8Slogwang     }
862a9643ea8Slogwang 
863a9643ea8Slogwang     thread->ClearAllFd();
864a9643ea8Slogwang     if (fdlist)
865a9643ea8Slogwang     {
866a9643ea8Slogwang         thread->AddFdList(fdlist);
867a9643ea8Slogwang     }
868a9643ea8Slogwang     if (fd)
869a9643ea8Slogwang     {
870a9643ea8Slogwang         thread->AddFd(fd);
871a9643ea8Slogwang     }
872a9643ea8Slogwang 
873a9643ea8Slogwang     thread->SetWakeupTime(timeout + this->GetLastClock());
874a9643ea8Slogwang     if (!this->KqueueAdd(thread->GetFdSet()))
875a9643ea8Slogwang     {
876a9643ea8Slogwang         MTLOG_ERROR("epoll add failed, errno: %d", errno);
877a9643ea8Slogwang         return false;
878a9643ea8Slogwang     }
879a9643ea8Slogwang     this->InsertIoWait(thread);
880a9643ea8Slogwang     thread->SwitchContext();
881a9643ea8Slogwang 
882a9643ea8Slogwang     int rcvnum = 0;
883a9643ea8Slogwang     KqObjList& rcvfds = thread->GetFdSet();
884a9643ea8Slogwang     KqueuerObj* fdata = NULL;
885a9643ea8Slogwang     TAILQ_FOREACH(fdata, &rcvfds, _entry)
886a9643ea8Slogwang     {
887a9643ea8Slogwang         if (fdata->GetRcvEvents() != 0)
888a9643ea8Slogwang         {
889a9643ea8Slogwang             rcvnum++;
890a9643ea8Slogwang         }
891a9643ea8Slogwang     }
892*5ac59bc4Slogwang     this->KqueueDel(rcvfds);
893a9643ea8Slogwang 
894*5ac59bc4Slogwang     if (rcvnum == 0)
895a9643ea8Slogwang     {
896a9643ea8Slogwang         errno = ETIME;
897a9643ea8Slogwang         return false;
898a9643ea8Slogwang     }
899a9643ea8Slogwang 
900a9643ea8Slogwang     return true;
901a9643ea8Slogwang }
902a9643ea8Slogwang 
903a9643ea8Slogwang int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
904a9643ea8Slogwang {
905a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
906a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
907a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
908a9643ea8Slogwang     utime64_t now = 0;
909a9643ea8Slogwang 
910a9643ea8Slogwang     if(fd<0 || !buf || len<1)
911a9643ea8Slogwang     {
912a9643ea8Slogwang         errno = EINVAL;
913a9643ea8Slogwang         MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
914a9643ea8Slogwang         return -10;
915a9643ea8Slogwang     }
916a9643ea8Slogwang 
917a9643ea8Slogwang     if (timeout <= -1)
918a9643ea8Slogwang     {
919a9643ea8Slogwang         timeout = 0x7fffffff;
920a9643ea8Slogwang     }
921a9643ea8Slogwang 
922a9643ea8Slogwang     while (true)
923a9643ea8Slogwang     {
924a9643ea8Slogwang         now = mtframe->GetLastClock();
925a9643ea8Slogwang         if ((int)(now - start) > timeout)
926a9643ea8Slogwang         {
927a9643ea8Slogwang             errno = ETIME;
928a9643ea8Slogwang             return -1;
929a9643ea8Slogwang         }
930a9643ea8Slogwang 
931a9643ea8Slogwang         KqueuerObj epfd;
932a9643ea8Slogwang         epfd.SetOsfd(fd);
933a9643ea8Slogwang         epfd.EnableInput();
934a9643ea8Slogwang         epfd.SetOwnerThread(thread);
935a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
936a9643ea8Slogwang         {
937a9643ea8Slogwang             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
938a9643ea8Slogwang             return -2;
939a9643ea8Slogwang         }
940a9643ea8Slogwang 
941a9643ea8Slogwang         mt_hook_syscall(recvfrom);
942a9643ea8Slogwang         int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
943a9643ea8Slogwang         if (n < 0)
944a9643ea8Slogwang         {
945a9643ea8Slogwang             if (errno == EINTR) {
946a9643ea8Slogwang                 continue;
947a9643ea8Slogwang             }
948a9643ea8Slogwang 
949a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
950a9643ea8Slogwang             {
951a9643ea8Slogwang                 MTLOG_ERROR("recvfrom failed, errno: %d", errno);
952a9643ea8Slogwang                 return -3;
953a9643ea8Slogwang             }
954a9643ea8Slogwang         }
955a9643ea8Slogwang         else
956a9643ea8Slogwang         {
957a9643ea8Slogwang             return n;
958a9643ea8Slogwang         }
959a9643ea8Slogwang     }
960a9643ea8Slogwang 
961a9643ea8Slogwang }
962a9643ea8Slogwang 
963a9643ea8Slogwang int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
964a9643ea8Slogwang {
965a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
966a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
967a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
968a9643ea8Slogwang     utime64_t now = 0;
969a9643ea8Slogwang 
970a9643ea8Slogwang     if(fd<0 || !msg || len<1)
971a9643ea8Slogwang     {
972a9643ea8Slogwang         errno = EINVAL;
973a9643ea8Slogwang         MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
974a9643ea8Slogwang         return -10;
975a9643ea8Slogwang     }
976a9643ea8Slogwang 
977a9643ea8Slogwang     int n = 0;
978a9643ea8Slogwang     mt_hook_syscall(sendto);
979a9643ea8Slogwang     while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
980a9643ea8Slogwang     {
981a9643ea8Slogwang         now = mtframe->GetLastClock();
982a9643ea8Slogwang         if ((int)(now - start) > timeout)
983a9643ea8Slogwang         {
984a9643ea8Slogwang             errno = ETIME;
985a9643ea8Slogwang             return -1;
986a9643ea8Slogwang         }
987a9643ea8Slogwang 
988a9643ea8Slogwang         if (errno == EINTR) {
989a9643ea8Slogwang             continue;
990a9643ea8Slogwang         }
991a9643ea8Slogwang 
992a9643ea8Slogwang         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
993a9643ea8Slogwang             MTLOG_ERROR("sendto failed, errno: %d", errno);
994a9643ea8Slogwang             return -2;
995a9643ea8Slogwang         }
996a9643ea8Slogwang 
997a9643ea8Slogwang         KqueuerObj epfd;
998a9643ea8Slogwang         epfd.SetOsfd(fd);
999a9643ea8Slogwang         epfd.EnableOutput();
1000a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1001a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1002a9643ea8Slogwang             return -3;
1003a9643ea8Slogwang         }
1004a9643ea8Slogwang     }
1005a9643ea8Slogwang 
1006a9643ea8Slogwang     return n;
1007a9643ea8Slogwang }
1008a9643ea8Slogwang 
1009a9643ea8Slogwang int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1010a9643ea8Slogwang {
1011a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1012a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1013a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1014a9643ea8Slogwang     utime64_t now = 0;
1015a9643ea8Slogwang 
1016a9643ea8Slogwang     if(fd<0 || !addr || addrlen<1)
1017a9643ea8Slogwang     {
1018a9643ea8Slogwang         errno = EINVAL;
1019a9643ea8Slogwang         MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1020a9643ea8Slogwang         return -10;
1021a9643ea8Slogwang     }
1022a9643ea8Slogwang 
1023a9643ea8Slogwang     int n = 0;
1024a9643ea8Slogwang     mt_hook_syscall(connect);
1025a9643ea8Slogwang     while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1026a9643ea8Slogwang     {
1027a9643ea8Slogwang         now = mtframe->GetLastClock();
1028a9643ea8Slogwang         if ((int)(now - start) > timeout)
1029a9643ea8Slogwang         {
1030a9643ea8Slogwang             errno = ETIME;
1031a9643ea8Slogwang             return -1;
1032a9643ea8Slogwang         }
1033a9643ea8Slogwang 
1034*5ac59bc4Slogwang         if (errno == EISCONN)
1035a9643ea8Slogwang         {
1036a9643ea8Slogwang             return 0;
1037a9643ea8Slogwang         }
1038a9643ea8Slogwang 
1039a9643ea8Slogwang         if (errno == EINTR) {
1040a9643ea8Slogwang             continue;
1041a9643ea8Slogwang         }
1042a9643ea8Slogwang 
1043a9643ea8Slogwang         if (errno != EINPROGRESS) {
1044a9643ea8Slogwang             MTLOG_ERROR("connect failed, errno: %d", errno);
1045a9643ea8Slogwang             return -2;
1046a9643ea8Slogwang         }
1047a9643ea8Slogwang 
1048a9643ea8Slogwang         KqueuerObj epfd;
1049a9643ea8Slogwang         epfd.SetOsfd(fd);
1050a9643ea8Slogwang         epfd.EnableOutput();
1051a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1052a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1053a9643ea8Slogwang             return -3;
1054a9643ea8Slogwang         }
1055a9643ea8Slogwang     }
1056a9643ea8Slogwang 
1057a9643ea8Slogwang     return n;
1058a9643ea8Slogwang }
1059a9643ea8Slogwang 
1060a9643ea8Slogwang int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1061a9643ea8Slogwang {
1062a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1063a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1064a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1065a9643ea8Slogwang     utime64_t now = 0;
1066a9643ea8Slogwang 
1067a9643ea8Slogwang     if(fd<0)
1068a9643ea8Slogwang     {
1069a9643ea8Slogwang         errno = EINVAL;
1070a9643ea8Slogwang         MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1071a9643ea8Slogwang         return -10;
1072a9643ea8Slogwang     }
1073a9643ea8Slogwang 
1074a9643ea8Slogwang     int acceptfd = 0;
1075a9643ea8Slogwang     mt_hook_syscall(accept);
1076a9643ea8Slogwang     while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1077a9643ea8Slogwang     {
1078a9643ea8Slogwang         now = mtframe->GetLastClock();
1079a9643ea8Slogwang         if ((int)(now - start) > timeout)
1080a9643ea8Slogwang         {
1081a9643ea8Slogwang             errno = ETIME;
1082a9643ea8Slogwang             return -1;
1083a9643ea8Slogwang         }
1084a9643ea8Slogwang 
1085a9643ea8Slogwang         if (errno == EINTR) {
1086a9643ea8Slogwang             continue;
1087a9643ea8Slogwang         }
1088a9643ea8Slogwang 
1089a9643ea8Slogwang         if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1090a9643ea8Slogwang             MTLOG_ERROR("accept failed, errno: %d", errno);
1091a9643ea8Slogwang             return -2;
1092a9643ea8Slogwang         }
1093a9643ea8Slogwang 
1094a9643ea8Slogwang         KqueuerObj epfd;
1095a9643ea8Slogwang         epfd.SetOsfd(fd);
1096a9643ea8Slogwang         epfd.EnableInput();
1097a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1098a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1099a9643ea8Slogwang             return -3;
1100a9643ea8Slogwang         }
1101a9643ea8Slogwang     }
1102a9643ea8Slogwang 
1103a9643ea8Slogwang     return acceptfd;
1104a9643ea8Slogwang }
1105a9643ea8Slogwang 
1106a9643ea8Slogwang ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1107a9643ea8Slogwang {
1108a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1109a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1110a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1111a9643ea8Slogwang     utime64_t now = 0;
1112a9643ea8Slogwang 
1113a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1114a9643ea8Slogwang     {
1115a9643ea8Slogwang         errno = EINVAL;
1116a9643ea8Slogwang         MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1117a9643ea8Slogwang         return -10;
1118a9643ea8Slogwang     }
1119a9643ea8Slogwang 
1120a9643ea8Slogwang     ssize_t n = 0;
1121a9643ea8Slogwang     mt_hook_syscall(read);
1122a9643ea8Slogwang     while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1123a9643ea8Slogwang     {
1124a9643ea8Slogwang         now = mtframe->GetLastClock();
1125a9643ea8Slogwang         if ((int)(now - start) > timeout)
1126a9643ea8Slogwang         {
1127a9643ea8Slogwang             errno = ETIME;
1128a9643ea8Slogwang             return -1;
1129a9643ea8Slogwang         }
1130a9643ea8Slogwang 
1131a9643ea8Slogwang         if (errno == EINTR) {
1132a9643ea8Slogwang             continue;
1133a9643ea8Slogwang         }
1134a9643ea8Slogwang 
1135a9643ea8Slogwang         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1136a9643ea8Slogwang             MTLOG_ERROR("read failed, errno: %d", errno);
1137a9643ea8Slogwang             return -2;
1138a9643ea8Slogwang         }
1139a9643ea8Slogwang 
1140a9643ea8Slogwang         KqueuerObj epfd;
1141a9643ea8Slogwang         epfd.SetOsfd(fd);
1142a9643ea8Slogwang         epfd.EnableInput();
1143a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1144a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1145a9643ea8Slogwang             return -3;
1146a9643ea8Slogwang         }
1147a9643ea8Slogwang     }
1148a9643ea8Slogwang 
1149a9643ea8Slogwang     return n;
1150a9643ea8Slogwang }
1151a9643ea8Slogwang 
1152a9643ea8Slogwang ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1153a9643ea8Slogwang {
1154a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1155a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1156a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1157a9643ea8Slogwang     utime64_t now = 0;
1158a9643ea8Slogwang 
1159a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1160a9643ea8Slogwang     {
1161a9643ea8Slogwang         errno = EINVAL;
1162a9643ea8Slogwang         MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1163a9643ea8Slogwang         return -10;
1164a9643ea8Slogwang     }
1165a9643ea8Slogwang 
1166a9643ea8Slogwang     ssize_t n = 0;
1167a9643ea8Slogwang     size_t send_len = 0;
1168a9643ea8Slogwang     while (send_len < nbyte)
1169a9643ea8Slogwang     {
1170a9643ea8Slogwang         now = mtframe->GetLastClock();
1171a9643ea8Slogwang         if ((int)(now - start) > timeout)
1172a9643ea8Slogwang         {
1173a9643ea8Slogwang             errno = ETIME;
1174a9643ea8Slogwang             return -1;
1175a9643ea8Slogwang         }
1176a9643ea8Slogwang 
1177a9643ea8Slogwang         mt_hook_syscall(write);
1178a9643ea8Slogwang         n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1179a9643ea8Slogwang         if (n < 0)
1180a9643ea8Slogwang         {
1181a9643ea8Slogwang             if (errno == EINTR) {
1182a9643ea8Slogwang                 continue;
1183a9643ea8Slogwang             }
1184a9643ea8Slogwang 
1185a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1186a9643ea8Slogwang                 MTLOG_ERROR("write failed, errno: %d", errno);
1187a9643ea8Slogwang                 return -2;
1188a9643ea8Slogwang             }
1189a9643ea8Slogwang         }
1190a9643ea8Slogwang         else
1191a9643ea8Slogwang         {
1192a9643ea8Slogwang             send_len += n;
1193a9643ea8Slogwang             if (send_len >= nbyte) {
1194a9643ea8Slogwang                 return nbyte;
1195a9643ea8Slogwang             }
1196a9643ea8Slogwang         }
1197a9643ea8Slogwang 
1198a9643ea8Slogwang         KqueuerObj epfd;
1199a9643ea8Slogwang         epfd.SetOsfd(fd);
1200a9643ea8Slogwang         epfd.EnableOutput();
1201a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1202a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1203a9643ea8Slogwang             return -3;
1204a9643ea8Slogwang         }
1205a9643ea8Slogwang     }
1206a9643ea8Slogwang 
1207a9643ea8Slogwang     return nbyte;
1208a9643ea8Slogwang }
1209a9643ea8Slogwang 
1210a9643ea8Slogwang int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1211a9643ea8Slogwang {
1212a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1213a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1214a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1215a9643ea8Slogwang     utime64_t now = 0;
1216a9643ea8Slogwang 
1217a9643ea8Slogwang     if(fd<0 || !buf || len<1)
1218a9643ea8Slogwang     {
1219a9643ea8Slogwang         errno = EINVAL;
1220a9643ea8Slogwang         MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1221a9643ea8Slogwang         return -10;
1222a9643ea8Slogwang     }
1223a9643ea8Slogwang 
1224a9643ea8Slogwang     if (timeout <= -1)
1225a9643ea8Slogwang     {
1226a9643ea8Slogwang         timeout = 0x7fffffff;
1227a9643ea8Slogwang     }
1228a9643ea8Slogwang 
1229a9643ea8Slogwang     while (true)
1230a9643ea8Slogwang     {
1231a9643ea8Slogwang         now = mtframe->GetLastClock();
1232a9643ea8Slogwang         if ((int)(now - start) > timeout)
1233a9643ea8Slogwang         {
1234a9643ea8Slogwang             errno = ETIME;
1235a9643ea8Slogwang             return -1;
1236a9643ea8Slogwang         }
1237a9643ea8Slogwang 
1238a9643ea8Slogwang         KqueuerObj epfd;
1239a9643ea8Slogwang         epfd.SetOsfd(fd);
1240a9643ea8Slogwang         epfd.EnableInput();
1241a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1242a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1243a9643ea8Slogwang         {
1244a9643ea8Slogwang             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1245a9643ea8Slogwang             return -2;
1246a9643ea8Slogwang         }
1247a9643ea8Slogwang 
1248a9643ea8Slogwang         mt_hook_syscall(recv);
1249a9643ea8Slogwang         int n = ff_hook_recv(fd, buf, len, flags);
1250a9643ea8Slogwang         if (n < 0)
1251a9643ea8Slogwang         {
1252a9643ea8Slogwang             if (errno == EINTR) {
1253a9643ea8Slogwang                 continue;
1254a9643ea8Slogwang             }
1255a9643ea8Slogwang 
1256a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1257a9643ea8Slogwang             {
1258a9643ea8Slogwang                 MTLOG_ERROR("recv failed, errno: %d", errno);
1259a9643ea8Slogwang                 return -3;
1260a9643ea8Slogwang             }
1261a9643ea8Slogwang         }
1262a9643ea8Slogwang         else
1263a9643ea8Slogwang         {
1264a9643ea8Slogwang             return n;
1265a9643ea8Slogwang         }
1266a9643ea8Slogwang     }
1267a9643ea8Slogwang 
1268a9643ea8Slogwang }
1269a9643ea8Slogwang 
1270a9643ea8Slogwang ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1271a9643ea8Slogwang {
1272a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1273a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1274a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1275a9643ea8Slogwang     utime64_t now = 0;
1276a9643ea8Slogwang 
1277a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1278a9643ea8Slogwang     {
1279a9643ea8Slogwang         errno = EINVAL;
1280a9643ea8Slogwang         MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1281a9643ea8Slogwang         return -10;
1282a9643ea8Slogwang     }
1283a9643ea8Slogwang 
1284a9643ea8Slogwang     ssize_t n = 0;
1285a9643ea8Slogwang     size_t send_len = 0;
1286a9643ea8Slogwang     while (send_len < nbyte)
1287a9643ea8Slogwang     {
1288a9643ea8Slogwang         now = mtframe->GetLastClock();
1289a9643ea8Slogwang         if ((int)(now - start) > timeout)
1290a9643ea8Slogwang         {
1291a9643ea8Slogwang             errno = ETIME;
1292a9643ea8Slogwang             return -1;
1293a9643ea8Slogwang         }
1294a9643ea8Slogwang 
1295a9643ea8Slogwang         mt_hook_syscall(send);
1296a9643ea8Slogwang         n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1297a9643ea8Slogwang         if (n < 0)
1298a9643ea8Slogwang         {
1299a9643ea8Slogwang             if (errno == EINTR) {
1300a9643ea8Slogwang                 continue;
1301a9643ea8Slogwang             }
1302a9643ea8Slogwang 
1303a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1304a9643ea8Slogwang                 MTLOG_ERROR("write failed, errno: %d", errno);
1305a9643ea8Slogwang                 return -2;
1306a9643ea8Slogwang             }
1307a9643ea8Slogwang         }
1308a9643ea8Slogwang         else
1309a9643ea8Slogwang         {
1310a9643ea8Slogwang             send_len += n;
1311a9643ea8Slogwang             if (send_len >= nbyte) {
1312a9643ea8Slogwang                 return nbyte;
1313a9643ea8Slogwang             }
1314a9643ea8Slogwang         }
1315a9643ea8Slogwang 
1316a9643ea8Slogwang         KqueuerObj epfd;
1317a9643ea8Slogwang         epfd.SetOsfd(fd);
1318a9643ea8Slogwang         epfd.EnableOutput();
1319a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1320a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1321a9643ea8Slogwang             return -3;
1322a9643ea8Slogwang         }
1323a9643ea8Slogwang     }
1324a9643ea8Slogwang 
1325a9643ea8Slogwang     return nbyte;
1326a9643ea8Slogwang }
1327a9643ea8Slogwang 
1328a9643ea8Slogwang void MtFrame::sleep(int ms)
1329a9643ea8Slogwang {
1330a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
1331a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
1332a9643ea8Slogwang     if (thread != NULL)
1333a9643ea8Slogwang     {
1334a9643ea8Slogwang         thread->sleep(ms);
1335a9643ea8Slogwang     }
1336a9643ea8Slogwang }
1337a9643ea8Slogwang 
1338a9643ea8Slogwang int MtFrame::WaitEvents(int fd, int events, int timeout)
1339a9643ea8Slogwang {
1340a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1341a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1342a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1343a9643ea8Slogwang     utime64_t now = 0;
1344a9643ea8Slogwang 
1345a9643ea8Slogwang     if (timeout <= -1)
1346a9643ea8Slogwang     {
1347a9643ea8Slogwang         timeout = 0x7fffffff;
1348a9643ea8Slogwang     }
1349a9643ea8Slogwang 
1350a9643ea8Slogwang     while (true)
1351a9643ea8Slogwang     {
1352a9643ea8Slogwang         now = mtframe->GetLastClock();
1353a9643ea8Slogwang         if ((int)(now - start) > timeout)
1354a9643ea8Slogwang         {
1355a9643ea8Slogwang             errno = ETIME;
1356a9643ea8Slogwang             return 0;
1357a9643ea8Slogwang         }
1358a9643ea8Slogwang 
1359a9643ea8Slogwang         KqueuerObj epfd;
1360a9643ea8Slogwang         epfd.SetOsfd(fd);
1361a9643ea8Slogwang         if (events & KQ_EVENT_READ)
1362a9643ea8Slogwang         {
1363a9643ea8Slogwang             epfd.EnableInput();
1364a9643ea8Slogwang         }
1365a9643ea8Slogwang         if (events & KQ_EVENT_WRITE)
1366a9643ea8Slogwang         {
1367a9643ea8Slogwang             epfd.EnableOutput();
1368a9643ea8Slogwang         }
1369a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1370a9643ea8Slogwang 
1371a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1372a9643ea8Slogwang         {
1373a9643ea8Slogwang             MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1374a9643ea8Slogwang             return 0;
1375a9643ea8Slogwang         }
1376a9643ea8Slogwang 
1377a9643ea8Slogwang         return epfd.GetRcvEvents();
1378a9643ea8Slogwang     }
1379a9643ea8Slogwang }
1380