xref: /f-stack/app/micro_thread/micro_thread.cpp (revision 84bcae25)
1a9643ea8Slogwang 
2a9643ea8Slogwang /**
3a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang  *
5a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang  *
7a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang  * obtain a copy of the License at
10a9643ea8Slogwang  *
11a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang  *
13a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang  * and limitations under the License.
17a9643ea8Slogwang  */
18a9643ea8Slogwang 
19a9643ea8Slogwang 
20a9643ea8Slogwang /**
21a9643ea8Slogwang  *  @filename micro_thread.cpp
22a9643ea8Slogwang  *  @info  micro thread manager
23a9643ea8Slogwang  */
24a9643ea8Slogwang #include "mt_version.h"
25a9643ea8Slogwang #include "micro_thread.h"
26a9643ea8Slogwang #include "mt_net.h"
27a9643ea8Slogwang #include "valgrind.h"
28a9643ea8Slogwang #include <assert.h>
29a9643ea8Slogwang #include "mt_sys_hook.h"
30a9643ea8Slogwang #include "ff_hook.h"
31a9643ea8Slogwang #include "ff_api.h"
32a9643ea8Slogwang 
33a9643ea8Slogwang using namespace NS_MICRO_THREAD;
34a9643ea8Slogwang 
35a9643ea8Slogwang #define  ASSERT(statement)
36a9643ea8Slogwang //#define  ASSERT(statement)   assert(statement)
37a9643ea8Slogwang 
38a9643ea8Slogwang extern "C"  int save_context(jmp_buf jbf);
39a9643ea8Slogwang 
40a9643ea8Slogwang extern "C"  void restore_context(jmp_buf jbf, int ret);
41a9643ea8Slogwang 
42a9643ea8Slogwang extern "C"  void replace_esp(jmp_buf jbf, void* esp);
43a9643ea8Slogwang 
Thread(int stack_size)44a9643ea8Slogwang Thread::Thread(int stack_size)
45a9643ea8Slogwang {
46a9643ea8Slogwang     _stack_size  = stack_size ? stack_size : ThreadPool::default_stack_size;
47a9643ea8Slogwang     _wakeup_time = 0;
48a9643ea8Slogwang     _stack       = NULL;
49a9643ea8Slogwang     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
50a9643ea8Slogwang }
51a9643ea8Slogwang 
52*84bcae25Swoolen static DefaultLogAdapter def_log_adapt;
53a9643ea8Slogwang /**
545ac59bc4Slogwang  *  @brief LINUX x86/x86_64's allocated stacks.
55a9643ea8Slogwang  */
InitStack()56a9643ea8Slogwang bool Thread::InitStack()
57a9643ea8Slogwang {
58a9643ea8Slogwang     if (_stack) {
59a9643ea8Slogwang         return true;
60a9643ea8Slogwang     }
61a9643ea8Slogwang 
625ac59bc4Slogwang     ///< stack index and memory are separated to prevent out of bounds.
63a9643ea8Slogwang     _stack = (MtStack*)calloc(1, sizeof(MtStack));
64a9643ea8Slogwang     if (NULL == _stack)
65a9643ea8Slogwang     {
66a9643ea8Slogwang         MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
67a9643ea8Slogwang         return false;
68a9643ea8Slogwang     }
69a9643ea8Slogwang 
70a9643ea8Slogwang     int memsize = MEM_PAGE_SIZE*2 + _stack_size;
71a9643ea8Slogwang     memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
72a9643ea8Slogwang 
73a9643ea8Slogwang     static int zero_fd = -1;
74a9643ea8Slogwang     int mmap_flags = MAP_PRIVATE | MAP_ANON;
75a9643ea8Slogwang     void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
76a9643ea8Slogwang     if (vaddr == (void *)MAP_FAILED)
77a9643ea8Slogwang     {
78*84bcae25Swoolen         MTLOG_ERROR("mmap stack failed, size %d,errmsg: %s.", memsize,strerror(errno));
79a9643ea8Slogwang         free(_stack);
80a9643ea8Slogwang         _stack = NULL;
81a9643ea8Slogwang         return false;
82a9643ea8Slogwang     }
83a9643ea8Slogwang     _stack->_vaddr = (char*)vaddr;
84a9643ea8Slogwang     _stack->_vaddr_size = memsize;
85a9643ea8Slogwang     _stack->_stk_size = _stack_size;
86a9643ea8Slogwang     _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
87a9643ea8Slogwang     _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
88a9643ea8Slogwang     // valgrind support: register stack frame
89a9643ea8Slogwang     _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
90a9643ea8Slogwang 
91a9643ea8Slogwang     _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
92a9643ea8Slogwang 
93a9643ea8Slogwang     mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
94a9643ea8Slogwang     mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
95a9643ea8Slogwang 
96a9643ea8Slogwang     return true;
97a9643ea8Slogwang }
98a9643ea8Slogwang 
99a9643ea8Slogwang 
FreeStack()100a9643ea8Slogwang void Thread::FreeStack()
101a9643ea8Slogwang {
102a9643ea8Slogwang     if (!_stack) {
103a9643ea8Slogwang         return;
104a9643ea8Slogwang     }
105a9643ea8Slogwang     munmap(_stack->_vaddr, _stack->_vaddr_size);
106a9643ea8Slogwang     // valgrind support: deregister stack frame
107a9643ea8Slogwang     VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
108a9643ea8Slogwang     free(_stack);
109a9643ea8Slogwang     _stack = NULL;
110a9643ea8Slogwang }
111a9643ea8Slogwang 
InitContext()112a9643ea8Slogwang void Thread::InitContext()
113a9643ea8Slogwang {
114a9643ea8Slogwang     if (save_context(_jmpbuf) != 0)
115a9643ea8Slogwang     {
1165ac59bc4Slogwang         ScheduleObj::Instance()->ScheduleStartRun();
117a9643ea8Slogwang     }
118a9643ea8Slogwang 
119a9643ea8Slogwang     if (_stack != NULL)
120a9643ea8Slogwang     {
121a9643ea8Slogwang         replace_esp(_jmpbuf, _stack->_esp);
122a9643ea8Slogwang     }
123a9643ea8Slogwang }
124a9643ea8Slogwang 
SwitchContext()125a9643ea8Slogwang void Thread::SwitchContext()
126a9643ea8Slogwang {
127a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
128a9643ea8Slogwang     {
129a9643ea8Slogwang         ScheduleObj::Instance()->ScheduleThread();
130a9643ea8Slogwang     }
131a9643ea8Slogwang }
132a9643ea8Slogwang 
133*84bcae25Swoolen 
SaveContext()134*84bcae25Swoolen int Thread::SaveContext()
135*84bcae25Swoolen {
136*84bcae25Swoolen     return save_context(_jmpbuf);
137*84bcae25Swoolen }
138*84bcae25Swoolen 
RestoreContext()139a9643ea8Slogwang void Thread::RestoreContext()
140a9643ea8Slogwang {
141a9643ea8Slogwang     restore_context(_jmpbuf, 1);
142a9643ea8Slogwang }
143a9643ea8Slogwang 
1445ac59bc4Slogwang 
Initial()145a9643ea8Slogwang bool Thread::Initial()
146a9643ea8Slogwang {
147a9643ea8Slogwang     if (!InitStack())
148a9643ea8Slogwang     {
149a9643ea8Slogwang         MTLOG_ERROR("init stack failed");
150a9643ea8Slogwang         return false;
151a9643ea8Slogwang     }
152a9643ea8Slogwang 
153a9643ea8Slogwang     InitContext();
154a9643ea8Slogwang 
155a9643ea8Slogwang     return true;
156a9643ea8Slogwang }
157a9643ea8Slogwang 
Destroy()158a9643ea8Slogwang void Thread::Destroy()
159a9643ea8Slogwang {
160a9643ea8Slogwang     FreeStack();
161a9643ea8Slogwang     memset(&_jmpbuf, 0, sizeof(_jmpbuf));
162a9643ea8Slogwang }
163a9643ea8Slogwang 
Reset()164a9643ea8Slogwang void Thread::Reset()
165a9643ea8Slogwang {
166a9643ea8Slogwang     _wakeup_time = 0;
167a9643ea8Slogwang     SetPrivate(NULL);
168a9643ea8Slogwang 
169a9643ea8Slogwang     InitContext();
170a9643ea8Slogwang     CleanState();
171a9643ea8Slogwang }
172a9643ea8Slogwang 
sleep(int ms)173a9643ea8Slogwang void Thread::sleep(int ms)
174a9643ea8Slogwang {
175a9643ea8Slogwang     utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
176a9643ea8Slogwang     _wakeup_time = now + ms;
177a9643ea8Slogwang 
178a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
179a9643ea8Slogwang     {
180a9643ea8Slogwang         ScheduleObj::Instance()->ScheduleSleep();
181a9643ea8Slogwang     }
182a9643ea8Slogwang }
183a9643ea8Slogwang 
Wait()184a9643ea8Slogwang void Thread::Wait()
185a9643ea8Slogwang {
186a9643ea8Slogwang     if (save_context(_jmpbuf) == 0)
187a9643ea8Slogwang     {
188a9643ea8Slogwang         ScheduleObj::Instance()->SchedulePend();
189a9643ea8Slogwang     }
190a9643ea8Slogwang }
191a9643ea8Slogwang 
CheckStackHealth(char * esp)192a9643ea8Slogwang bool Thread::CheckStackHealth(char *esp)
193a9643ea8Slogwang {
194a9643ea8Slogwang     if (!_stack)
195a9643ea8Slogwang         return false;
196a9643ea8Slogwang 
197a9643ea8Slogwang     if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
198a9643ea8Slogwang         return true;
199a9643ea8Slogwang     else
200a9643ea8Slogwang         return false;
201a9643ea8Slogwang }
202a9643ea8Slogwang 
MicroThread(ThreadType type)203a9643ea8Slogwang MicroThread::MicroThread(ThreadType type)
204a9643ea8Slogwang {
205a9643ea8Slogwang     memset(&_entry, 0, sizeof(_entry));
206a9643ea8Slogwang     TAILQ_INIT(&_fdset);
207a9643ea8Slogwang     TAILQ_INIT(&_sub_list);
208a9643ea8Slogwang     _flag = NOT_INLIST;
209a9643ea8Slogwang     _type = type;
210a9643ea8Slogwang     _state = INITIAL;
211a9643ea8Slogwang     _start = NULL;
212a9643ea8Slogwang     _args = NULL;
213a9643ea8Slogwang     _parent = NULL;
214a9643ea8Slogwang }
215a9643ea8Slogwang 
CleanState()216a9643ea8Slogwang void MicroThread::CleanState()
217a9643ea8Slogwang {
218a9643ea8Slogwang     TAILQ_INIT(&_fdset);
219a9643ea8Slogwang     TAILQ_INIT(&_sub_list);
220a9643ea8Slogwang     _flag = NOT_INLIST;
221a9643ea8Slogwang     _type = NORMAL;
222a9643ea8Slogwang     _state = INITIAL;
223a9643ea8Slogwang     _start = NULL;
224a9643ea8Slogwang     _args = NULL;
225a9643ea8Slogwang     _parent = NULL;
226a9643ea8Slogwang }
227a9643ea8Slogwang 
Run()228a9643ea8Slogwang void MicroThread::Run()
229a9643ea8Slogwang {
230a9643ea8Slogwang     if (_start) {
231a9643ea8Slogwang         _start(_args);
232a9643ea8Slogwang     }
233a9643ea8Slogwang 
234a9643ea8Slogwang     if (this->IsSubThread()) {
235a9643ea8Slogwang         this->WakeupParent();
236a9643ea8Slogwang     }
237a9643ea8Slogwang 
238a9643ea8Slogwang     ScheduleObj::Instance()->ScheduleReclaim();
239a9643ea8Slogwang     ScheduleObj::Instance()->ScheduleThread();
240a9643ea8Slogwang }
241a9643ea8Slogwang 
WakeupParent()242a9643ea8Slogwang void MicroThread::WakeupParent()
243a9643ea8Slogwang {
244a9643ea8Slogwang     MicroThread* parent = this->GetParent();
245a9643ea8Slogwang     if (parent)
246a9643ea8Slogwang     {
247a9643ea8Slogwang         parent->RemoveSubThread(this);
248a9643ea8Slogwang         if (parent->HasNoSubThread())
249a9643ea8Slogwang         {
250a9643ea8Slogwang             ScheduleObj::Instance()->ScheduleUnpend(parent);
251a9643ea8Slogwang         }
252a9643ea8Slogwang     }
253a9643ea8Slogwang     else
254a9643ea8Slogwang     {
255a9643ea8Slogwang         MTLOG_ERROR("Sub thread no parent, error");
256a9643ea8Slogwang     }
257a9643ea8Slogwang }
258a9643ea8Slogwang 
HasNoSubThread()259a9643ea8Slogwang bool MicroThread::HasNoSubThread()
260a9643ea8Slogwang {
261a9643ea8Slogwang     return TAILQ_EMPTY(&_sub_list);
262a9643ea8Slogwang }
263a9643ea8Slogwang 
AddSubThread(MicroThread * sub)264a9643ea8Slogwang void MicroThread::AddSubThread(MicroThread* sub)
265a9643ea8Slogwang {
266a9643ea8Slogwang     ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
267a9643ea8Slogwang     if (!sub->HasFlag(MicroThread::SUB_LIST))
268a9643ea8Slogwang     {
269a9643ea8Slogwang         TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
270a9643ea8Slogwang         sub->_parent = this;
271a9643ea8Slogwang     }
272a9643ea8Slogwang 
273a9643ea8Slogwang     sub->SetFlag(MicroThread::SUB_LIST);
274a9643ea8Slogwang }
275a9643ea8Slogwang 
RemoveSubThread(MicroThread * sub)276a9643ea8Slogwang void MicroThread::RemoveSubThread(MicroThread* sub)
277a9643ea8Slogwang {
278a9643ea8Slogwang     ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
279a9643ea8Slogwang     if (sub->HasFlag(MicroThread::SUB_LIST))
280a9643ea8Slogwang     {
281a9643ea8Slogwang         TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
282a9643ea8Slogwang         sub->_parent = NULL;
283a9643ea8Slogwang     }
284a9643ea8Slogwang 
285a9643ea8Slogwang     sub->UnsetFlag(MicroThread::SUB_LIST);
286a9643ea8Slogwang }
287a9643ea8Slogwang 
2885ac59bc4Slogwang ScheduleObj *ScheduleObj::_instance = NULL;
Instance()289a9643ea8Slogwang inline ScheduleObj* ScheduleObj::Instance()
290a9643ea8Slogwang {
291a9643ea8Slogwang     if (NULL == _instance)
292a9643ea8Slogwang     {
293a9643ea8Slogwang         _instance = new ScheduleObj();
294a9643ea8Slogwang     }
295a9643ea8Slogwang 
296a9643ea8Slogwang     return _instance;
297a9643ea8Slogwang }
298a9643ea8Slogwang 
ScheduleThread()299a9643ea8Slogwang void ScheduleObj::ScheduleThread()
300a9643ea8Slogwang {
301a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
302a9643ea8Slogwang     frame->ThreadSchdule();
303a9643ea8Slogwang }
304a9643ea8Slogwang 
ScheduleGetTime()305a9643ea8Slogwang utime64_t ScheduleObj::ScheduleGetTime()
306a9643ea8Slogwang {
307a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
308a9643ea8Slogwang     if (frame)
309a9643ea8Slogwang     {
310a9643ea8Slogwang         return frame->GetLastClock();
311a9643ea8Slogwang     }
312a9643ea8Slogwang     else
313a9643ea8Slogwang     {
314a9643ea8Slogwang         MTLOG_ERROR("frame time failed, maybe not init");
315a9643ea8Slogwang         return 0;
316a9643ea8Slogwang     }
317a9643ea8Slogwang }
318a9643ea8Slogwang 
ScheduleSleep()319a9643ea8Slogwang void ScheduleObj::ScheduleSleep()
320a9643ea8Slogwang {
321a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
322a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
323a9643ea8Slogwang     if ((!frame) || (!thread)) {
324a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
325a9643ea8Slogwang         return;
326a9643ea8Slogwang     }
327a9643ea8Slogwang 
328a9643ea8Slogwang     frame->InsertSleep(thread);
329a9643ea8Slogwang     frame->ThreadSchdule();
330a9643ea8Slogwang }
331a9643ea8Slogwang 
SchedulePend()332a9643ea8Slogwang void ScheduleObj::SchedulePend()
333a9643ea8Slogwang {
334a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
335a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
336a9643ea8Slogwang     if ((!frame) || (!thread)) {
337a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
338a9643ea8Slogwang         return;
339a9643ea8Slogwang     }
340a9643ea8Slogwang 
341a9643ea8Slogwang     frame->InsertPend(thread);
342a9643ea8Slogwang     frame->ThreadSchdule();
343a9643ea8Slogwang }
344a9643ea8Slogwang 
ScheduleUnpend(void * pthread)345a9643ea8Slogwang void ScheduleObj::ScheduleUnpend(void* pthread)
346a9643ea8Slogwang {
347a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
348a9643ea8Slogwang     MicroThread* thread = (MicroThread*)pthread;
349a9643ea8Slogwang     if ((!frame) || (!thread)) {
350a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
351a9643ea8Slogwang         return;
352a9643ea8Slogwang     }
353a9643ea8Slogwang 
354a9643ea8Slogwang     frame->RemovePend(thread);
355a9643ea8Slogwang     frame->InsertRunable(thread);
356a9643ea8Slogwang }
357a9643ea8Slogwang 
ScheduleReclaim()358a9643ea8Slogwang void ScheduleObj::ScheduleReclaim()
359a9643ea8Slogwang {
360a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
361a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
362a9643ea8Slogwang     if ((!frame) || (!thread)) {
363a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
364a9643ea8Slogwang         return;
365a9643ea8Slogwang     }
366a9643ea8Slogwang 
367a9643ea8Slogwang     frame->FreeThread(thread);
368a9643ea8Slogwang }
369a9643ea8Slogwang 
ScheduleStartRun()370a9643ea8Slogwang void ScheduleObj::ScheduleStartRun()
371a9643ea8Slogwang {
372a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
373a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
374a9643ea8Slogwang     if ((!frame) || (!thread)) {
375a9643ea8Slogwang         MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
376a9643ea8Slogwang         return;
377a9643ea8Slogwang     }
378a9643ea8Slogwang 
379a9643ea8Slogwang     thread->Run();
380a9643ea8Slogwang }
381a9643ea8Slogwang 
382a9643ea8Slogwang 
3835ac59bc4Slogwang unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM;   ///< 2000 micro threads.
384*84bcae25Swoolen unsigned int ThreadPool::last_default_thread_num = DEFAULT_THREAD_NUM;   ///< 2000 micro threads.
3855ac59bc4Slogwang unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE;   ///< 128k stack.
386a9643ea8Slogwang 
InitialPool(int max_num)387a9643ea8Slogwang bool ThreadPool::InitialPool(int max_num)
388a9643ea8Slogwang {
389a9643ea8Slogwang     MicroThread *thread = NULL;
390a9643ea8Slogwang     for (unsigned int i = 0; i < default_thread_num; i++)
391a9643ea8Slogwang     {
392a9643ea8Slogwang         thread = new MicroThread();
393a9643ea8Slogwang         if ((NULL == thread) || (false == thread->Initial()))
394a9643ea8Slogwang         {
395a9643ea8Slogwang             MTLOG_ERROR("init pool, thread %p init failed", thread);
396a9643ea8Slogwang             if (thread)  delete thread;
397a9643ea8Slogwang             continue;
398a9643ea8Slogwang         }
399a9643ea8Slogwang         thread->SetFlag(MicroThread::FREE_LIST);
400a9643ea8Slogwang         _freelist.push(thread);
401a9643ea8Slogwang     }
402a9643ea8Slogwang 
403a9643ea8Slogwang     _total_num = _freelist.size();
404a9643ea8Slogwang     _max_num  = max_num;
405a9643ea8Slogwang     _use_num = 0;
406a9643ea8Slogwang     if (_total_num <= 0)
407a9643ea8Slogwang     {
408a9643ea8Slogwang         return false;
409a9643ea8Slogwang     }
410a9643ea8Slogwang     else
411a9643ea8Slogwang     {
412a9643ea8Slogwang         return true;
413a9643ea8Slogwang     }
414a9643ea8Slogwang }
415a9643ea8Slogwang 
DestroyPool()416a9643ea8Slogwang void ThreadPool::DestroyPool()
417a9643ea8Slogwang {
418a9643ea8Slogwang     MicroThread* thread = NULL;
419a9643ea8Slogwang     while (!_freelist.empty())
420a9643ea8Slogwang     {
421a9643ea8Slogwang         thread = _freelist.front();
422a9643ea8Slogwang         _freelist.pop();
423a9643ea8Slogwang         thread->Destroy();
424a9643ea8Slogwang         delete thread;
425a9643ea8Slogwang     }
426a9643ea8Slogwang 
427a9643ea8Slogwang     _total_num = 0;
428a9643ea8Slogwang     _use_num = 0;
429a9643ea8Slogwang }
430a9643ea8Slogwang 
AllocThread()431a9643ea8Slogwang MicroThread* ThreadPool::AllocThread()
432a9643ea8Slogwang {
4335ac59bc4Slogwang     MT_ATTR_API_SET(492069, _total_num);
434a9643ea8Slogwang 
435a9643ea8Slogwang     MicroThread* thread = NULL;
436a9643ea8Slogwang     if (!_freelist.empty())
437a9643ea8Slogwang     {
438a9643ea8Slogwang         thread = _freelist.front();
439a9643ea8Slogwang         _freelist.pop();
440a9643ea8Slogwang 
441a9643ea8Slogwang         ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
442a9643ea8Slogwang 
443a9643ea8Slogwang         thread->UnsetFlag(MicroThread::FREE_LIST);
444a9643ea8Slogwang         _use_num++;
445a9643ea8Slogwang         return thread;
446a9643ea8Slogwang     }
447a9643ea8Slogwang 
448a9643ea8Slogwang     MT_ATTR_API(320846, 1); // pool no nore
449a9643ea8Slogwang     if (_total_num >= _max_num)
450a9643ea8Slogwang     {
451a9643ea8Slogwang         MT_ATTR_API(361140, 1); // no more quota
452*84bcae25Swoolen         MTLOG_ERROR("total %d is outof max: %d", _total_num,_max_num);
453a9643ea8Slogwang         return NULL;
454a9643ea8Slogwang     }
455a9643ea8Slogwang 
456a9643ea8Slogwang     thread = new MicroThread();
457a9643ea8Slogwang     if ((NULL == thread) || (false == thread->Initial()))
458a9643ea8Slogwang     {
459a9643ea8Slogwang         MT_ATTR_API(320847, 1); // pool init fail
460a9643ea8Slogwang         MTLOG_ERROR("thread alloc failed, thread: %p", thread);
461a9643ea8Slogwang         if (thread)  delete thread;
462a9643ea8Slogwang         return NULL;
463a9643ea8Slogwang     }
464a9643ea8Slogwang     _total_num++;
465a9643ea8Slogwang     _use_num++;
466*84bcae25Swoolen     if(_use_num >(int) default_thread_num){
467*84bcae25Swoolen         if(((int) default_thread_num * 2 )< _max_num){
468*84bcae25Swoolen             last_default_thread_num = default_thread_num;
469*84bcae25Swoolen             default_thread_num = default_thread_num * 2;
470*84bcae25Swoolen         }
471*84bcae25Swoolen     }
472a9643ea8Slogwang 
473a9643ea8Slogwang     return thread;
474a9643ea8Slogwang }
475a9643ea8Slogwang 
FreeThread(MicroThread * thread)476a9643ea8Slogwang void ThreadPool::FreeThread(MicroThread* thread)
477a9643ea8Slogwang {
478a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
479a9643ea8Slogwang     thread->Reset();
480a9643ea8Slogwang     _use_num--;
481a9643ea8Slogwang     _freelist.push(thread);
482a9643ea8Slogwang     thread->SetFlag(MicroThread::FREE_LIST);
483a9643ea8Slogwang 
484a9643ea8Slogwang     unsigned int free_num = _freelist.size();
485a9643ea8Slogwang     if ((free_num > default_thread_num) && (free_num > 1))
486a9643ea8Slogwang     {
487a9643ea8Slogwang         thread = _freelist.front();
488a9643ea8Slogwang         _freelist.pop();
489a9643ea8Slogwang         thread->Destroy();
490a9643ea8Slogwang         delete thread;
491a9643ea8Slogwang         _total_num--;
492*84bcae25Swoolen         if(default_thread_num / 2 >= DEFAULT_THREAD_NUM){
493*84bcae25Swoolen             last_default_thread_num = default_thread_num;
494*84bcae25Swoolen             default_thread_num = default_thread_num / 2;
495*84bcae25Swoolen         }
496a9643ea8Slogwang     }
497a9643ea8Slogwang }
498a9643ea8Slogwang 
GetUsedNum(void)499a9643ea8Slogwang int ThreadPool::GetUsedNum(void)
500a9643ea8Slogwang {
501a9643ea8Slogwang     return _use_num;
502a9643ea8Slogwang }
503a9643ea8Slogwang 
504a9643ea8Slogwang MtFrame *MtFrame::_instance = NULL;
Instance()505a9643ea8Slogwang inline MtFrame* MtFrame::Instance ()
506a9643ea8Slogwang {
507a9643ea8Slogwang     if (NULL == _instance )
508a9643ea8Slogwang     {
509a9643ea8Slogwang         _instance = new MtFrame();
510a9643ea8Slogwang     }
511a9643ea8Slogwang 
512a9643ea8Slogwang     return _instance;
513a9643ea8Slogwang }
514a9643ea8Slogwang 
SetHookFlag()515a9643ea8Slogwang void MtFrame::SetHookFlag() {
516a9643ea8Slogwang     mt_set_hook_flag();
517a9643ea8Slogwang };
518a9643ea8Slogwang 
InitFrame(LogAdapter * logadpt,int max_thread_num)519a9643ea8Slogwang bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
520a9643ea8Slogwang {
521*84bcae25Swoolen     if(logadpt == NULL){
522*84bcae25Swoolen         _log_adpt = &def_log_adapt;
523*84bcae25Swoolen     }else{
524a9643ea8Slogwang         _log_adpt = logadpt;
525*84bcae25Swoolen     }
526a9643ea8Slogwang 
527a9643ea8Slogwang     if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
528a9643ea8Slogwang     {
529a9643ea8Slogwang         MTLOG_ERROR("Init epoll or thread pool failed");
530a9643ea8Slogwang         this->Destroy();
531a9643ea8Slogwang         return false;
532a9643ea8Slogwang     }
533a9643ea8Slogwang     if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
534a9643ea8Slogwang     {
535a9643ea8Slogwang         MTLOG_ERROR("Init heap list failed");
536a9643ea8Slogwang         this->Destroy();
537a9643ea8Slogwang         return false;
538a9643ea8Slogwang     }
539a9643ea8Slogwang 
540a9643ea8Slogwang     _timer = new CTimerMng(max_thread_num * 2);
541a9643ea8Slogwang     if (NULL == _timer)
542a9643ea8Slogwang     {
543a9643ea8Slogwang         MTLOG_ERROR("Init heap timer failed");
544a9643ea8Slogwang         this->Destroy();
545a9643ea8Slogwang         return false;
546a9643ea8Slogwang     }
547a9643ea8Slogwang 
548a9643ea8Slogwang     _daemon = AllocThread();
549a9643ea8Slogwang     if (NULL == _daemon)
550a9643ea8Slogwang     {
551a9643ea8Slogwang         MTLOG_ERROR("Alloc daemon thread failed");
552a9643ea8Slogwang         this->Destroy();
553a9643ea8Slogwang         return false;
554a9643ea8Slogwang     }
555a9643ea8Slogwang     _daemon->SetType(MicroThread::DAEMON);
556a9643ea8Slogwang     _daemon->SetState(MicroThread::RUNABLE);
557a9643ea8Slogwang     _daemon->SetSartFunc(MtFrame::DaemonRun, this);
558a9643ea8Slogwang 
559a9643ea8Slogwang     _primo = new MicroThread(MicroThread::PRIMORDIAL);
560a9643ea8Slogwang     if (NULL == _primo)
561a9643ea8Slogwang     {
562a9643ea8Slogwang         MTLOG_ERROR("new _primo thread failed");
563a9643ea8Slogwang         this->Destroy();
564a9643ea8Slogwang         return false;
565a9643ea8Slogwang     }
566a9643ea8Slogwang     _primo->SetState(MicroThread::RUNNING);
567a9643ea8Slogwang     SetActiveThread(_primo);
568a9643ea8Slogwang 
569a9643ea8Slogwang     _last_clock = GetSystemMS();
570a9643ea8Slogwang     TAILQ_INIT(&_iolist);
571a9643ea8Slogwang     TAILQ_INIT(&_pend_list);
572a9643ea8Slogwang 
573a9643ea8Slogwang     //SetHookFlag();
574a9643ea8Slogwang 
575a9643ea8Slogwang     return true;
576a9643ea8Slogwang 
577a9643ea8Slogwang }
578a9643ea8Slogwang 
Destroy(void)579a9643ea8Slogwang void MtFrame::Destroy(void)
580a9643ea8Slogwang {
581a9643ea8Slogwang     if (NULL == _instance )
582a9643ea8Slogwang     {
583a9643ea8Slogwang         return;
584a9643ea8Slogwang     }
585a9643ea8Slogwang 
586a9643ea8Slogwang     if (_primo) {
587a9643ea8Slogwang         delete _primo;
588a9643ea8Slogwang         _primo = NULL;
589a9643ea8Slogwang     }
590a9643ea8Slogwang 
591a9643ea8Slogwang     if (_daemon) {
592a9643ea8Slogwang         FreeThread(_daemon);
593a9643ea8Slogwang         _daemon = NULL;
594a9643ea8Slogwang     }
595a9643ea8Slogwang 
596a9643ea8Slogwang     TAILQ_INIT(&_iolist);
597a9643ea8Slogwang 
598a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
599a9643ea8Slogwang     while (thread)
600a9643ea8Slogwang     {
601a9643ea8Slogwang         FreeThread(thread);
602a9643ea8Slogwang         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
603a9643ea8Slogwang     }
604a9643ea8Slogwang 
605a9643ea8Slogwang     while (!_runlist.empty())
606a9643ea8Slogwang     {
607a9643ea8Slogwang         thread = _runlist.front();
608a9643ea8Slogwang         _runlist.pop();
609a9643ea8Slogwang         FreeThread(thread);
610a9643ea8Slogwang     }
611a9643ea8Slogwang 
612a9643ea8Slogwang     MicroThread* tmp;
613a9643ea8Slogwang     TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
614a9643ea8Slogwang     {
615a9643ea8Slogwang         TAILQ_REMOVE(&_pend_list, thread, _entry);
616a9643ea8Slogwang         FreeThread(thread);
617a9643ea8Slogwang     }
618a9643ea8Slogwang 
619a9643ea8Slogwang     if (_timer != NULL)
620a9643ea8Slogwang     {
621a9643ea8Slogwang         delete _timer;
622a9643ea8Slogwang         _timer = NULL;
623a9643ea8Slogwang     }
624a9643ea8Slogwang 
625a9643ea8Slogwang     _instance->DestroyPool();
626a9643ea8Slogwang     _instance->TermKqueue();
627a9643ea8Slogwang     delete _instance;
628a9643ea8Slogwang     _instance = NULL;
629a9643ea8Slogwang }
630a9643ea8Slogwang 
Version()631a9643ea8Slogwang char* MtFrame::Version()
632a9643ea8Slogwang {
633a9643ea8Slogwang     return IMT_VERSION;
634a9643ea8Slogwang }
635a9643ea8Slogwang 
CreateThread(ThreadStart entry,void * args,bool runable)636a9643ea8Slogwang MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
637a9643ea8Slogwang {
638a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
639a9643ea8Slogwang     MicroThread* thread = mtframe->AllocThread();
640a9643ea8Slogwang     if (NULL == thread)
641a9643ea8Slogwang     {
642a9643ea8Slogwang         MTLOG_ERROR("create thread failed");
643a9643ea8Slogwang         return NULL;
644a9643ea8Slogwang     }
645a9643ea8Slogwang     thread->SetSartFunc(entry, args);
646a9643ea8Slogwang 
647a9643ea8Slogwang     if (runable) {
648a9643ea8Slogwang         mtframe->InsertRunable(thread);
649a9643ea8Slogwang     }
650a9643ea8Slogwang 
651a9643ea8Slogwang     return thread;
652a9643ea8Slogwang }
653a9643ea8Slogwang 
Loop(void * args)654a9643ea8Slogwang int MtFrame::Loop(void* args)
655a9643ea8Slogwang {
656a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
657a9643ea8Slogwang     MicroThread* daemon = mtframe->DaemonThread();
658a9643ea8Slogwang 
659a9643ea8Slogwang     mtframe->KqueueDispatch();
660a9643ea8Slogwang     mtframe->SetLastClock(mtframe->GetSystemMS());
661a9643ea8Slogwang     mtframe->WakeupTimeout();
662a9643ea8Slogwang     mtframe->CheckExpired();
663a9643ea8Slogwang     daemon->SwitchContext();
664a9643ea8Slogwang 
665a9643ea8Slogwang     return 0;
666a9643ea8Slogwang }
667a9643ea8Slogwang 
DaemonRun(void * args)668a9643ea8Slogwang void MtFrame::DaemonRun(void* args)
669a9643ea8Slogwang {
670a9643ea8Slogwang     /*
671a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
672a9643ea8Slogwang     MicroThread* daemon = mtframe->DaemonThread();
673a9643ea8Slogwang 
674a9643ea8Slogwang     while (true) {
675a9643ea8Slogwang         mtframe->KqueueDispatch();
676a9643ea8Slogwang         mtframe->SetLastClock(mtframe->GetSystemMS());
677a9643ea8Slogwang         mtframe->WakeupTimeout();
678a9643ea8Slogwang         mtframe->CheckExpired();
679a9643ea8Slogwang         daemon->SwitchContext();
680a9643ea8Slogwang     }
681a9643ea8Slogwang     */
682a9643ea8Slogwang     ff_run(MtFrame::Loop, NULL);
683a9643ea8Slogwang }
684a9643ea8Slogwang 
GetRootThread()685a9643ea8Slogwang MicroThread *MtFrame::GetRootThread()
686a9643ea8Slogwang {
687a9643ea8Slogwang     if (NULL == _curr_thread)
688a9643ea8Slogwang     {
689a9643ea8Slogwang         return NULL;
690a9643ea8Slogwang     }
691a9643ea8Slogwang 
692a9643ea8Slogwang     MicroThread::ThreadType type = _curr_thread->GetType();
693a9643ea8Slogwang     MicroThread *thread = _curr_thread;
694a9643ea8Slogwang     MicroThread *parent = thread;
695a9643ea8Slogwang 
696a9643ea8Slogwang     while (MicroThread::SUB_THREAD == type)
697a9643ea8Slogwang     {
698a9643ea8Slogwang         thread = thread->GetParent();
699a9643ea8Slogwang         if (!thread)
700a9643ea8Slogwang         {
701a9643ea8Slogwang             break;
702a9643ea8Slogwang         }
703a9643ea8Slogwang 
704a9643ea8Slogwang         type   = thread->GetType();
705a9643ea8Slogwang         parent = thread;
706a9643ea8Slogwang     }
707a9643ea8Slogwang 
708a9643ea8Slogwang     return parent;
709a9643ea8Slogwang }
710a9643ea8Slogwang 
ThreadSchdule()711a9643ea8Slogwang void MtFrame::ThreadSchdule()
712a9643ea8Slogwang {
713a9643ea8Slogwang     MicroThread* thread = NULL;
714a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
715a9643ea8Slogwang 
716a9643ea8Slogwang     if (mtframe->_runlist.empty())
717a9643ea8Slogwang     {
718a9643ea8Slogwang         thread = mtframe->DaemonThread();
719a9643ea8Slogwang     }
720a9643ea8Slogwang     else
721a9643ea8Slogwang     {
722a9643ea8Slogwang         thread = mtframe->_runlist.front();
723a9643ea8Slogwang         mtframe->RemoveRunable(thread);
724a9643ea8Slogwang     }
725a9643ea8Slogwang 
726a9643ea8Slogwang     this->SetActiveThread(thread);
727a9643ea8Slogwang     thread->SetState(MicroThread::RUNNING);
728a9643ea8Slogwang     thread->RestoreContext();
729a9643ea8Slogwang }
730a9643ea8Slogwang 
CheckExpired()731a9643ea8Slogwang void MtFrame::CheckExpired()
732a9643ea8Slogwang {
733a9643ea8Slogwang     static utime64_t check_time = 0;
734a9643ea8Slogwang 
735a9643ea8Slogwang     if (_timer != NULL)
736a9643ea8Slogwang     {
737a9643ea8Slogwang         _timer->check_expired();
738a9643ea8Slogwang     }
739a9643ea8Slogwang 
740a9643ea8Slogwang    utime64_t now = GetLastClock();
741a9643ea8Slogwang 
742a9643ea8Slogwang     if ((now - check_time) > 1000)
743a9643ea8Slogwang     {
744a9643ea8Slogwang         CNetMgr::Instance()->RecycleObjs(now);
745a9643ea8Slogwang         check_time = now;
746a9643ea8Slogwang     }
747a9643ea8Slogwang }
748a9643ea8Slogwang 
WakeupTimeout()749a9643ea8Slogwang void MtFrame::WakeupTimeout()
750a9643ea8Slogwang {
751a9643ea8Slogwang     utime64_t now = GetLastClock();
752a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
753a9643ea8Slogwang     while (thread && (thread->GetWakeupTime() <= now))
754a9643ea8Slogwang     {
755a9643ea8Slogwang         if (thread->HasFlag(MicroThread::IO_LIST))
756a9643ea8Slogwang         {
757a9643ea8Slogwang             RemoveIoWait(thread);
758a9643ea8Slogwang         }
759a9643ea8Slogwang         else
760a9643ea8Slogwang         {
761a9643ea8Slogwang             RemoveSleep(thread);
762a9643ea8Slogwang         }
763a9643ea8Slogwang 
764a9643ea8Slogwang         InsertRunable(thread);
765a9643ea8Slogwang 
766a9643ea8Slogwang         thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
767a9643ea8Slogwang     }
768a9643ea8Slogwang }
769a9643ea8Slogwang 
KqueueGetTimeout()770a9643ea8Slogwang int MtFrame::KqueueGetTimeout()
771a9643ea8Slogwang {
772a9643ea8Slogwang     utime64_t now = GetLastClock();
773a9643ea8Slogwang     MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
774a9643ea8Slogwang     if (!thread)
775a9643ea8Slogwang     {
7765ac59bc4Slogwang         return 10; //default 10ms epollwait
777a9643ea8Slogwang     }
778a9643ea8Slogwang     else if (thread->GetWakeupTime() < now)
779a9643ea8Slogwang     {
780a9643ea8Slogwang         return 0;
781a9643ea8Slogwang     }
782a9643ea8Slogwang     else
783a9643ea8Slogwang     {
784a9643ea8Slogwang         return (int)(thread->GetWakeupTime() - now);
785a9643ea8Slogwang     }
786a9643ea8Slogwang }
787a9643ea8Slogwang 
InsertSleep(MicroThread * thread)788a9643ea8Slogwang inline void MtFrame::InsertSleep(MicroThread* thread)
789a9643ea8Slogwang {
790a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
791a9643ea8Slogwang 
792a9643ea8Slogwang     thread->SetFlag(MicroThread::SLEEP_LIST);
793a9643ea8Slogwang     thread->SetState(MicroThread::SLEEPING);
794a9643ea8Slogwang     int rc = _sleeplist.HeapPush(thread);
795a9643ea8Slogwang     if (rc < 0)
796a9643ea8Slogwang     {
797a9643ea8Slogwang         MT_ATTR_API(320848, 1); // heap error
798a9643ea8Slogwang         MTLOG_ERROR("Insert heap failed , rc %d", rc);
799a9643ea8Slogwang     }
800a9643ea8Slogwang }
801a9643ea8Slogwang 
RemoveSleep(MicroThread * thread)802a9643ea8Slogwang inline void MtFrame::RemoveSleep(MicroThread* thread)
803a9643ea8Slogwang {
804a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
805a9643ea8Slogwang     thread->UnsetFlag(MicroThread::SLEEP_LIST);
806a9643ea8Slogwang 
807a9643ea8Slogwang     int rc = _sleeplist.HeapDelete(thread);
808a9643ea8Slogwang     if (rc < 0)
809a9643ea8Slogwang     {
810a9643ea8Slogwang         MT_ATTR_API(320849, 1); // heap error
811a9643ea8Slogwang         MTLOG_ERROR("remove heap failed , rc %d", rc);
812a9643ea8Slogwang     }
813a9643ea8Slogwang }
814a9643ea8Slogwang 
InsertIoWait(MicroThread * thread)815a9643ea8Slogwang inline void MtFrame::InsertIoWait(MicroThread* thread)
816a9643ea8Slogwang {
817a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
818a9643ea8Slogwang     thread->SetFlag(MicroThread::IO_LIST);
819a9643ea8Slogwang     TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
820a9643ea8Slogwang     InsertSleep(thread);
821a9643ea8Slogwang }
822a9643ea8Slogwang 
RemoveIoWait(MicroThread * thread)823a9643ea8Slogwang void MtFrame::RemoveIoWait(MicroThread* thread)
824a9643ea8Slogwang {
825a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::IO_LIST));
826a9643ea8Slogwang     thread->UnsetFlag(MicroThread::IO_LIST);
827a9643ea8Slogwang     TAILQ_REMOVE(&_iolist, thread, _entry);
828a9643ea8Slogwang 
829a9643ea8Slogwang     RemoveSleep(thread);
830a9643ea8Slogwang }
831a9643ea8Slogwang 
InsertRunable(MicroThread * thread)832a9643ea8Slogwang void MtFrame::InsertRunable(MicroThread* thread)
833a9643ea8Slogwang {
834a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
835a9643ea8Slogwang     thread->SetFlag(MicroThread::RUN_LIST);
836a9643ea8Slogwang 
837a9643ea8Slogwang     thread->SetState(MicroThread::RUNABLE);
838a9643ea8Slogwang     _runlist.push(thread);
839a9643ea8Slogwang     _waitnum++;
840a9643ea8Slogwang }
841a9643ea8Slogwang 
RemoveRunable(MicroThread * thread)842a9643ea8Slogwang inline void MtFrame::RemoveRunable(MicroThread* thread)
843a9643ea8Slogwang {
844a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
845a9643ea8Slogwang     ASSERT(thread == _runlist.front());
846a9643ea8Slogwang     thread->UnsetFlag(MicroThread::RUN_LIST);
847a9643ea8Slogwang 
848a9643ea8Slogwang     _runlist.pop();
849a9643ea8Slogwang     _waitnum--;
850a9643ea8Slogwang }
851a9643ea8Slogwang 
InsertPend(MicroThread * thread)852a9643ea8Slogwang void MtFrame::InsertPend(MicroThread* thread)
853a9643ea8Slogwang {
854a9643ea8Slogwang     ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
855a9643ea8Slogwang     thread->SetFlag(MicroThread::PEND_LIST);
856a9643ea8Slogwang     TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
857a9643ea8Slogwang     thread->SetState(MicroThread::PENDING);
858a9643ea8Slogwang }
859a9643ea8Slogwang 
RemovePend(MicroThread * thread)860a9643ea8Slogwang void MtFrame::RemovePend(MicroThread* thread)
861a9643ea8Slogwang {
862a9643ea8Slogwang     ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
863a9643ea8Slogwang     thread->UnsetFlag(MicroThread::PEND_LIST);
864a9643ea8Slogwang     TAILQ_REMOVE(&_pend_list, thread, _entry);
865a9643ea8Slogwang }
866a9643ea8Slogwang 
WaitNotify(utime64_t timeout)867a9643ea8Slogwang void MtFrame::WaitNotify(utime64_t timeout)
868a9643ea8Slogwang {
869a9643ea8Slogwang     MicroThread* thread = GetActiveThread();
870a9643ea8Slogwang 
871a9643ea8Slogwang     thread->SetWakeupTime(timeout + this->GetLastClock());
872a9643ea8Slogwang     this->InsertIoWait(thread);
873a9643ea8Slogwang     thread->SwitchContext();
874a9643ea8Slogwang }
875a9643ea8Slogwang 
NotifyThread(MicroThread * thread)876*84bcae25Swoolen void MtFrame::NotifyThread(MicroThread* thread)
877*84bcae25Swoolen {
878*84bcae25Swoolen     if(thread == NULL){
879*84bcae25Swoolen         return;
880*84bcae25Swoolen     }
881*84bcae25Swoolen     MicroThread* cur_thread = GetActiveThread();
882*84bcae25Swoolen     if (thread->HasFlag(MicroThread::IO_LIST))
883*84bcae25Swoolen     {
884*84bcae25Swoolen         this->RemoveIoWait(thread);
885*84bcae25Swoolen         if(cur_thread == this->DaemonThread()){
886*84bcae25Swoolen             // ���ﲻֱ���еĻ�,���Dz���ʱ,�ᵼ��Ŀ���̵߳ȴ�����ʱ
887*84bcae25Swoolen             if(cur_thread->SaveContext() == 0){
888*84bcae25Swoolen                 this->SetActiveThread(thread);
889*84bcae25Swoolen                 thread->SetState(MicroThread::RUNNING);
890*84bcae25Swoolen                 thread->RestoreContext();
891*84bcae25Swoolen             }
892*84bcae25Swoolen         }else{
893*84bcae25Swoolen             this->InsertRunable(thread);
894*84bcae25Swoolen         }
895*84bcae25Swoolen     }
896*84bcae25Swoolen }
897*84bcae25Swoolen 
SwapDaemonThread()898*84bcae25Swoolen void MtFrame::SwapDaemonThread()
899*84bcae25Swoolen {
900*84bcae25Swoolen     MicroThread* thread = GetActiveThread();
901*84bcae25Swoolen     MicroThread* daemon_thread = this->DaemonThread();
902*84bcae25Swoolen     if(thread != daemon_thread){
903*84bcae25Swoolen         if(thread->SaveContext() == 0){
904*84bcae25Swoolen             this->InsertRunable(thread);
905*84bcae25Swoolen             this->SetActiveThread(daemon_thread);
906*84bcae25Swoolen             daemon_thread->SetState(MicroThread::RUNNING);
907*84bcae25Swoolen             daemon_thread->RestoreContext();
908*84bcae25Swoolen         }
909*84bcae25Swoolen     }
910*84bcae25Swoolen }
911*84bcae25Swoolen 
KqueueSchedule(KqObjList * fdlist,KqueuerObj * fd,int timeout)912a9643ea8Slogwang bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
913a9643ea8Slogwang {
914a9643ea8Slogwang     MicroThread* thread = GetActiveThread();
915a9643ea8Slogwang     if (NULL == thread)
916a9643ea8Slogwang     {
917a9643ea8Slogwang         MTLOG_ERROR("active thread null, epoll schedule failed");
918a9643ea8Slogwang         return false;
919a9643ea8Slogwang     }
920a9643ea8Slogwang 
921a9643ea8Slogwang     thread->ClearAllFd();
922a9643ea8Slogwang     if (fdlist)
923a9643ea8Slogwang     {
924a9643ea8Slogwang         thread->AddFdList(fdlist);
925a9643ea8Slogwang     }
926a9643ea8Slogwang     if (fd)
927a9643ea8Slogwang     {
928a9643ea8Slogwang         thread->AddFd(fd);
929a9643ea8Slogwang     }
930a9643ea8Slogwang 
931a9643ea8Slogwang     thread->SetWakeupTime(timeout + this->GetLastClock());
932a9643ea8Slogwang     if (!this->KqueueAdd(thread->GetFdSet()))
933a9643ea8Slogwang     {
934a9643ea8Slogwang         MTLOG_ERROR("epoll add failed, errno: %d", errno);
935a9643ea8Slogwang         return false;
936a9643ea8Slogwang     }
937a9643ea8Slogwang     this->InsertIoWait(thread);
938a9643ea8Slogwang     thread->SwitchContext();
939a9643ea8Slogwang 
940a9643ea8Slogwang     int rcvnum = 0;
941a9643ea8Slogwang     KqObjList& rcvfds = thread->GetFdSet();
942a9643ea8Slogwang     KqueuerObj* fdata = NULL;
943a9643ea8Slogwang     TAILQ_FOREACH(fdata, &rcvfds, _entry)
944a9643ea8Slogwang     {
945a9643ea8Slogwang         if (fdata->GetRcvEvents() != 0)
946a9643ea8Slogwang         {
947a9643ea8Slogwang             rcvnum++;
948a9643ea8Slogwang         }
949a9643ea8Slogwang     }
9505ac59bc4Slogwang     this->KqueueDel(rcvfds);
951a9643ea8Slogwang 
9525ac59bc4Slogwang     if (rcvnum == 0)
953a9643ea8Slogwang     {
954a9643ea8Slogwang         errno = ETIME;
955a9643ea8Slogwang         return false;
956a9643ea8Slogwang     }
957a9643ea8Slogwang 
958a9643ea8Slogwang     return true;
959a9643ea8Slogwang }
960a9643ea8Slogwang 
recvfrom(int fd,void * buf,int len,int flags,struct sockaddr * from,socklen_t * fromlen,int timeout)961a9643ea8Slogwang int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
962a9643ea8Slogwang {
963a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
964a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
965a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
966a9643ea8Slogwang     utime64_t now = 0;
967a9643ea8Slogwang 
968a9643ea8Slogwang     if(fd<0 || !buf || len<1)
969a9643ea8Slogwang     {
970a9643ea8Slogwang         errno = EINVAL;
971a9643ea8Slogwang         MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
972a9643ea8Slogwang         return -10;
973a9643ea8Slogwang     }
974a9643ea8Slogwang 
975a9643ea8Slogwang     if (timeout <= -1)
976a9643ea8Slogwang     {
977a9643ea8Slogwang         timeout = 0x7fffffff;
978a9643ea8Slogwang     }
979a9643ea8Slogwang 
980a9643ea8Slogwang     while (true)
981a9643ea8Slogwang     {
982a9643ea8Slogwang         now = mtframe->GetLastClock();
983a9643ea8Slogwang         if ((int)(now - start) > timeout)
984a9643ea8Slogwang         {
985a9643ea8Slogwang             errno = ETIME;
986a9643ea8Slogwang             return -1;
987a9643ea8Slogwang         }
988a9643ea8Slogwang 
989a9643ea8Slogwang         KqueuerObj epfd;
990a9643ea8Slogwang         epfd.SetOsfd(fd);
991a9643ea8Slogwang         epfd.EnableInput();
992a9643ea8Slogwang         epfd.SetOwnerThread(thread);
993a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
994a9643ea8Slogwang         {
995a9643ea8Slogwang             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
996a9643ea8Slogwang             return -2;
997a9643ea8Slogwang         }
998a9643ea8Slogwang 
999a9643ea8Slogwang         mt_hook_syscall(recvfrom);
1000a9643ea8Slogwang         int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
1001a9643ea8Slogwang         if (n < 0)
1002a9643ea8Slogwang         {
1003a9643ea8Slogwang             if (errno == EINTR) {
1004a9643ea8Slogwang                 continue;
1005a9643ea8Slogwang             }
1006a9643ea8Slogwang 
1007a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1008a9643ea8Slogwang             {
1009a9643ea8Slogwang                 MTLOG_ERROR("recvfrom failed, errno: %d", errno);
1010a9643ea8Slogwang                 return -3;
1011a9643ea8Slogwang             }
1012a9643ea8Slogwang         }
1013a9643ea8Slogwang         else
1014a9643ea8Slogwang         {
1015a9643ea8Slogwang             return n;
1016a9643ea8Slogwang         }
1017a9643ea8Slogwang     }
1018a9643ea8Slogwang 
1019a9643ea8Slogwang }
1020a9643ea8Slogwang 
sendto(int fd,const void * msg,int len,int flags,const struct sockaddr * to,int tolen,int timeout)1021a9643ea8Slogwang int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
1022a9643ea8Slogwang {
1023a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1024a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1025a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1026a9643ea8Slogwang     utime64_t now = 0;
1027a9643ea8Slogwang 
1028a9643ea8Slogwang     if(fd<0 || !msg || len<1)
1029a9643ea8Slogwang     {
1030a9643ea8Slogwang         errno = EINVAL;
1031a9643ea8Slogwang         MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
1032a9643ea8Slogwang         return -10;
1033a9643ea8Slogwang     }
1034a9643ea8Slogwang 
1035a9643ea8Slogwang     int n = 0;
1036a9643ea8Slogwang     mt_hook_syscall(sendto);
1037a9643ea8Slogwang     while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
1038a9643ea8Slogwang     {
1039a9643ea8Slogwang         now = mtframe->GetLastClock();
1040a9643ea8Slogwang         if ((int)(now - start) > timeout)
1041a9643ea8Slogwang         {
1042a9643ea8Slogwang             errno = ETIME;
1043a9643ea8Slogwang             return -1;
1044a9643ea8Slogwang         }
1045a9643ea8Slogwang 
1046a9643ea8Slogwang         if (errno == EINTR) {
1047a9643ea8Slogwang             continue;
1048a9643ea8Slogwang         }
1049a9643ea8Slogwang 
1050a9643ea8Slogwang         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1051a9643ea8Slogwang             MTLOG_ERROR("sendto failed, errno: %d", errno);
1052a9643ea8Slogwang             return -2;
1053a9643ea8Slogwang         }
1054a9643ea8Slogwang 
1055a9643ea8Slogwang         KqueuerObj epfd;
1056a9643ea8Slogwang         epfd.SetOsfd(fd);
1057a9643ea8Slogwang         epfd.EnableOutput();
1058a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1059a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1060a9643ea8Slogwang             return -3;
1061a9643ea8Slogwang         }
1062a9643ea8Slogwang     }
1063a9643ea8Slogwang 
1064a9643ea8Slogwang     return n;
1065a9643ea8Slogwang }
1066a9643ea8Slogwang 
connect(int fd,const struct sockaddr * addr,int addrlen,int timeout)1067a9643ea8Slogwang int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1068a9643ea8Slogwang {
1069a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1070a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1071a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1072a9643ea8Slogwang     utime64_t now = 0;
1073a9643ea8Slogwang 
1074a9643ea8Slogwang     if(fd<0 || !addr || addrlen<1)
1075a9643ea8Slogwang     {
1076a9643ea8Slogwang         errno = EINVAL;
1077a9643ea8Slogwang         MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1078a9643ea8Slogwang         return -10;
1079a9643ea8Slogwang     }
1080a9643ea8Slogwang 
1081a9643ea8Slogwang     int n = 0;
1082a9643ea8Slogwang     mt_hook_syscall(connect);
1083a9643ea8Slogwang     while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1084a9643ea8Slogwang     {
1085a9643ea8Slogwang         now = mtframe->GetLastClock();
1086a9643ea8Slogwang         if ((int)(now - start) > timeout)
1087a9643ea8Slogwang         {
1088a9643ea8Slogwang             errno = ETIME;
1089a9643ea8Slogwang             return -1;
1090a9643ea8Slogwang         }
1091a9643ea8Slogwang 
10925ac59bc4Slogwang         if (errno == EISCONN)
1093a9643ea8Slogwang         {
1094a9643ea8Slogwang             return 0;
1095a9643ea8Slogwang         }
1096a9643ea8Slogwang 
1097a9643ea8Slogwang         if (errno == EINTR) {
1098a9643ea8Slogwang             continue;
1099a9643ea8Slogwang         }
1100a9643ea8Slogwang 
1101a9643ea8Slogwang         if (errno != EINPROGRESS) {
1102a9643ea8Slogwang             MTLOG_ERROR("connect failed, errno: %d", errno);
1103a9643ea8Slogwang             return -2;
1104a9643ea8Slogwang         }
1105a9643ea8Slogwang 
1106a9643ea8Slogwang         KqueuerObj epfd;
1107a9643ea8Slogwang         epfd.SetOsfd(fd);
1108a9643ea8Slogwang         epfd.EnableOutput();
1109a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1110a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1111a9643ea8Slogwang             return -3;
1112a9643ea8Slogwang         }
1113a9643ea8Slogwang     }
1114a9643ea8Slogwang 
1115a9643ea8Slogwang     return n;
1116a9643ea8Slogwang }
1117a9643ea8Slogwang 
accept(int fd,struct sockaddr * addr,socklen_t * addrlen,int timeout)1118a9643ea8Slogwang int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1119a9643ea8Slogwang {
1120a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1121a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1122a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1123a9643ea8Slogwang     utime64_t now = 0;
1124a9643ea8Slogwang 
1125a9643ea8Slogwang     if(fd<0)
1126a9643ea8Slogwang     {
1127a9643ea8Slogwang         errno = EINVAL;
1128a9643ea8Slogwang         MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1129a9643ea8Slogwang         return -10;
1130a9643ea8Slogwang     }
1131a9643ea8Slogwang 
1132a9643ea8Slogwang     int acceptfd = 0;
1133a9643ea8Slogwang     mt_hook_syscall(accept);
1134a9643ea8Slogwang     while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1135a9643ea8Slogwang     {
1136a9643ea8Slogwang         now = mtframe->GetLastClock();
1137a9643ea8Slogwang         if ((int)(now - start) > timeout)
1138a9643ea8Slogwang         {
1139a9643ea8Slogwang             errno = ETIME;
1140a9643ea8Slogwang             return -1;
1141a9643ea8Slogwang         }
1142a9643ea8Slogwang 
1143a9643ea8Slogwang         if (errno == EINTR) {
1144a9643ea8Slogwang             continue;
1145a9643ea8Slogwang         }
1146a9643ea8Slogwang 
1147a9643ea8Slogwang         if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1148a9643ea8Slogwang             MTLOG_ERROR("accept failed, errno: %d", errno);
1149a9643ea8Slogwang             return -2;
1150a9643ea8Slogwang         }
1151a9643ea8Slogwang 
1152a9643ea8Slogwang         KqueuerObj epfd;
1153a9643ea8Slogwang         epfd.SetOsfd(fd);
1154a9643ea8Slogwang         epfd.EnableInput();
1155a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1156a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1157a9643ea8Slogwang             return -3;
1158a9643ea8Slogwang         }
1159a9643ea8Slogwang     }
1160a9643ea8Slogwang 
1161a9643ea8Slogwang     return acceptfd;
1162a9643ea8Slogwang }
1163a9643ea8Slogwang 
read(int fd,void * buf,size_t nbyte,int timeout)1164a9643ea8Slogwang ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1165a9643ea8Slogwang {
1166a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1167a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1168a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1169a9643ea8Slogwang     utime64_t now = 0;
1170a9643ea8Slogwang 
1171a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1172a9643ea8Slogwang     {
1173a9643ea8Slogwang         errno = EINVAL;
1174a9643ea8Slogwang         MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1175a9643ea8Slogwang         return -10;
1176a9643ea8Slogwang     }
1177a9643ea8Slogwang 
1178a9643ea8Slogwang     ssize_t n = 0;
1179a9643ea8Slogwang     mt_hook_syscall(read);
1180a9643ea8Slogwang     while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1181a9643ea8Slogwang     {
1182a9643ea8Slogwang         now = mtframe->GetLastClock();
1183a9643ea8Slogwang         if ((int)(now - start) > timeout)
1184a9643ea8Slogwang         {
1185a9643ea8Slogwang             errno = ETIME;
1186a9643ea8Slogwang             return -1;
1187a9643ea8Slogwang         }
1188a9643ea8Slogwang 
1189a9643ea8Slogwang         if (errno == EINTR) {
1190a9643ea8Slogwang             continue;
1191a9643ea8Slogwang         }
1192a9643ea8Slogwang 
1193a9643ea8Slogwang         if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1194a9643ea8Slogwang             MTLOG_ERROR("read failed, errno: %d", errno);
1195a9643ea8Slogwang             return -2;
1196a9643ea8Slogwang         }
1197a9643ea8Slogwang 
1198a9643ea8Slogwang         KqueuerObj epfd;
1199a9643ea8Slogwang         epfd.SetOsfd(fd);
1200a9643ea8Slogwang         epfd.EnableInput();
1201a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1202a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1203a9643ea8Slogwang             return -3;
1204a9643ea8Slogwang         }
1205a9643ea8Slogwang     }
1206a9643ea8Slogwang 
1207a9643ea8Slogwang     return n;
1208a9643ea8Slogwang }
1209a9643ea8Slogwang 
write(int fd,const void * buf,size_t nbyte,int timeout)1210a9643ea8Slogwang ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1211a9643ea8Slogwang {
1212a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1213a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1214a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1215a9643ea8Slogwang     utime64_t now = 0;
1216a9643ea8Slogwang 
1217a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1218a9643ea8Slogwang     {
1219a9643ea8Slogwang         errno = EINVAL;
1220a9643ea8Slogwang         MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1221a9643ea8Slogwang         return -10;
1222a9643ea8Slogwang     }
1223a9643ea8Slogwang 
1224a9643ea8Slogwang     ssize_t n = 0;
1225a9643ea8Slogwang     size_t send_len = 0;
1226a9643ea8Slogwang     while (send_len < nbyte)
1227a9643ea8Slogwang     {
1228a9643ea8Slogwang         now = mtframe->GetLastClock();
1229a9643ea8Slogwang         if ((int)(now - start) > timeout)
1230a9643ea8Slogwang         {
1231a9643ea8Slogwang             errno = ETIME;
1232a9643ea8Slogwang             return -1;
1233a9643ea8Slogwang         }
1234a9643ea8Slogwang 
1235a9643ea8Slogwang         mt_hook_syscall(write);
1236a9643ea8Slogwang         n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1237a9643ea8Slogwang         if (n < 0)
1238a9643ea8Slogwang         {
1239a9643ea8Slogwang             if (errno == EINTR) {
1240a9643ea8Slogwang                 continue;
1241a9643ea8Slogwang             }
1242a9643ea8Slogwang 
1243a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1244a9643ea8Slogwang                 MTLOG_ERROR("write failed, errno: %d", errno);
1245a9643ea8Slogwang                 return -2;
1246a9643ea8Slogwang             }
1247a9643ea8Slogwang         }
1248a9643ea8Slogwang         else
1249a9643ea8Slogwang         {
1250a9643ea8Slogwang             send_len += n;
1251a9643ea8Slogwang             if (send_len >= nbyte) {
1252a9643ea8Slogwang                 return nbyte;
1253a9643ea8Slogwang             }
1254a9643ea8Slogwang         }
1255a9643ea8Slogwang 
1256a9643ea8Slogwang         KqueuerObj epfd;
1257a9643ea8Slogwang         epfd.SetOsfd(fd);
1258a9643ea8Slogwang         epfd.EnableOutput();
1259a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1260a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1261a9643ea8Slogwang             return -3;
1262a9643ea8Slogwang         }
1263a9643ea8Slogwang     }
1264a9643ea8Slogwang 
1265a9643ea8Slogwang     return nbyte;
1266a9643ea8Slogwang }
1267a9643ea8Slogwang 
recv(int fd,void * buf,int len,int flags,int timeout)1268a9643ea8Slogwang int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1269a9643ea8Slogwang {
1270a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1271a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1272a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1273a9643ea8Slogwang     utime64_t now = 0;
1274a9643ea8Slogwang 
1275a9643ea8Slogwang     if(fd<0 || !buf || len<1)
1276a9643ea8Slogwang     {
1277a9643ea8Slogwang         errno = EINVAL;
1278a9643ea8Slogwang         MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1279a9643ea8Slogwang         return -10;
1280a9643ea8Slogwang     }
1281a9643ea8Slogwang 
1282a9643ea8Slogwang     if (timeout <= -1)
1283a9643ea8Slogwang     {
1284a9643ea8Slogwang         timeout = 0x7fffffff;
1285a9643ea8Slogwang     }
1286a9643ea8Slogwang 
1287a9643ea8Slogwang     while (true)
1288a9643ea8Slogwang     {
1289a9643ea8Slogwang         now = mtframe->GetLastClock();
1290a9643ea8Slogwang         if ((int)(now - start) > timeout)
1291a9643ea8Slogwang         {
1292a9643ea8Slogwang             errno = ETIME;
1293a9643ea8Slogwang             return -1;
1294a9643ea8Slogwang         }
1295a9643ea8Slogwang 
1296a9643ea8Slogwang         KqueuerObj epfd;
1297a9643ea8Slogwang         epfd.SetOsfd(fd);
1298a9643ea8Slogwang         epfd.EnableInput();
1299a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1300a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1301a9643ea8Slogwang         {
1302a9643ea8Slogwang             MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1303a9643ea8Slogwang             return -2;
1304a9643ea8Slogwang         }
1305a9643ea8Slogwang 
1306a9643ea8Slogwang         mt_hook_syscall(recv);
1307a9643ea8Slogwang         int n = ff_hook_recv(fd, buf, len, flags);
1308a9643ea8Slogwang         if (n < 0)
1309a9643ea8Slogwang         {
1310a9643ea8Slogwang             if (errno == EINTR) {
1311a9643ea8Slogwang                 continue;
1312a9643ea8Slogwang             }
1313a9643ea8Slogwang 
1314a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1315a9643ea8Slogwang             {
1316a9643ea8Slogwang                 MTLOG_ERROR("recv failed, errno: %d", errno);
1317a9643ea8Slogwang                 return -3;
1318a9643ea8Slogwang             }
1319a9643ea8Slogwang         }
1320a9643ea8Slogwang         else
1321a9643ea8Slogwang         {
1322a9643ea8Slogwang             return n;
1323a9643ea8Slogwang         }
1324a9643ea8Slogwang     }
1325a9643ea8Slogwang 
1326a9643ea8Slogwang }
1327a9643ea8Slogwang 
send(int fd,const void * buf,size_t nbyte,int flags,int timeout)1328a9643ea8Slogwang ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1329a9643ea8Slogwang {
1330a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1331a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1332a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1333a9643ea8Slogwang     utime64_t now = 0;
1334a9643ea8Slogwang 
1335a9643ea8Slogwang     if(fd<0 || !buf || nbyte<1)
1336a9643ea8Slogwang     {
1337a9643ea8Slogwang         errno = EINVAL;
1338a9643ea8Slogwang         MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1339a9643ea8Slogwang         return -10;
1340a9643ea8Slogwang     }
1341a9643ea8Slogwang 
1342a9643ea8Slogwang     ssize_t n = 0;
1343a9643ea8Slogwang     size_t send_len = 0;
1344a9643ea8Slogwang     while (send_len < nbyte)
1345a9643ea8Slogwang     {
1346a9643ea8Slogwang         now = mtframe->GetLastClock();
1347a9643ea8Slogwang         if ((int)(now - start) > timeout)
1348a9643ea8Slogwang         {
1349a9643ea8Slogwang             errno = ETIME;
1350a9643ea8Slogwang             return -1;
1351a9643ea8Slogwang         }
1352a9643ea8Slogwang 
1353a9643ea8Slogwang         mt_hook_syscall(send);
1354a9643ea8Slogwang         n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1355a9643ea8Slogwang         if (n < 0)
1356a9643ea8Slogwang         {
1357a9643ea8Slogwang             if (errno == EINTR) {
1358a9643ea8Slogwang                 continue;
1359a9643ea8Slogwang             }
1360a9643ea8Slogwang 
1361a9643ea8Slogwang             if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1362a9643ea8Slogwang                 MTLOG_ERROR("write failed, errno: %d", errno);
1363a9643ea8Slogwang                 return -2;
1364a9643ea8Slogwang             }
1365a9643ea8Slogwang         }
1366a9643ea8Slogwang         else
1367a9643ea8Slogwang         {
1368a9643ea8Slogwang             send_len += n;
1369a9643ea8Slogwang             if (send_len >= nbyte) {
1370a9643ea8Slogwang                 return nbyte;
1371a9643ea8Slogwang             }
1372a9643ea8Slogwang         }
1373a9643ea8Slogwang 
1374a9643ea8Slogwang         KqueuerObj epfd;
1375a9643ea8Slogwang         epfd.SetOsfd(fd);
1376a9643ea8Slogwang         epfd.EnableOutput();
1377a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1378a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1379a9643ea8Slogwang             return -3;
1380a9643ea8Slogwang         }
1381a9643ea8Slogwang     }
1382a9643ea8Slogwang 
1383a9643ea8Slogwang     return nbyte;
1384a9643ea8Slogwang }
1385a9643ea8Slogwang 
sleep(int ms)1386a9643ea8Slogwang void MtFrame::sleep(int ms)
1387a9643ea8Slogwang {
1388a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
1389a9643ea8Slogwang     MicroThread* thread = frame->GetActiveThread();
1390a9643ea8Slogwang     if (thread != NULL)
1391a9643ea8Slogwang     {
1392a9643ea8Slogwang         thread->sleep(ms);
1393a9643ea8Slogwang     }
1394a9643ea8Slogwang }
1395a9643ea8Slogwang 
WaitEvents(int fd,int events,int timeout)1396a9643ea8Slogwang int MtFrame::WaitEvents(int fd, int events, int timeout)
1397a9643ea8Slogwang {
1398a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
1399a9643ea8Slogwang     utime64_t start = mtframe->GetLastClock();
1400a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
1401a9643ea8Slogwang     utime64_t now = 0;
1402a9643ea8Slogwang 
1403a9643ea8Slogwang     if (timeout <= -1)
1404a9643ea8Slogwang     {
1405a9643ea8Slogwang         timeout = 0x7fffffff;
1406a9643ea8Slogwang     }
1407a9643ea8Slogwang 
1408a9643ea8Slogwang     while (true)
1409a9643ea8Slogwang     {
1410a9643ea8Slogwang         now = mtframe->GetLastClock();
1411a9643ea8Slogwang         if ((int)(now - start) > timeout)
1412a9643ea8Slogwang         {
1413a9643ea8Slogwang             errno = ETIME;
1414a9643ea8Slogwang             return 0;
1415a9643ea8Slogwang         }
1416a9643ea8Slogwang 
1417a9643ea8Slogwang         KqueuerObj epfd;
1418a9643ea8Slogwang         epfd.SetOsfd(fd);
1419a9643ea8Slogwang         if (events & KQ_EVENT_READ)
1420a9643ea8Slogwang         {
1421a9643ea8Slogwang             epfd.EnableInput();
1422a9643ea8Slogwang         }
1423a9643ea8Slogwang         if (events & KQ_EVENT_WRITE)
1424a9643ea8Slogwang         {
1425a9643ea8Slogwang             epfd.EnableOutput();
1426a9643ea8Slogwang         }
1427a9643ea8Slogwang         epfd.SetOwnerThread(thread);
1428a9643ea8Slogwang 
1429a9643ea8Slogwang         if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1430a9643ea8Slogwang         {
1431a9643ea8Slogwang             MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1432a9643ea8Slogwang             return 0;
1433a9643ea8Slogwang         }
1434a9643ea8Slogwang 
1435a9643ea8Slogwang         return epfd.GetRcvEvents();
1436a9643ea8Slogwang     }
1437a9643ea8Slogwang }
1438