1a9643ea8Slogwang
2a9643ea8Slogwang /**
3a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang *
5a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang *
7a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang * obtain a copy of the License at
10a9643ea8Slogwang *
11a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang *
13a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang * and limitations under the License.
17a9643ea8Slogwang */
18a9643ea8Slogwang
19a9643ea8Slogwang
20a9643ea8Slogwang /**
21a9643ea8Slogwang * @filename micro_thread.cpp
22a9643ea8Slogwang * @info micro thread manager
23a9643ea8Slogwang */
24a9643ea8Slogwang #include "mt_version.h"
25a9643ea8Slogwang #include "micro_thread.h"
26a9643ea8Slogwang #include "mt_net.h"
27a9643ea8Slogwang #include "valgrind.h"
28a9643ea8Slogwang #include <assert.h>
29a9643ea8Slogwang #include "mt_sys_hook.h"
30a9643ea8Slogwang #include "ff_hook.h"
31a9643ea8Slogwang #include "ff_api.h"
32a9643ea8Slogwang
33a9643ea8Slogwang using namespace NS_MICRO_THREAD;
34a9643ea8Slogwang
35a9643ea8Slogwang #define ASSERT(statement)
36a9643ea8Slogwang //#define ASSERT(statement) assert(statement)
37a9643ea8Slogwang
38a9643ea8Slogwang extern "C" int save_context(jmp_buf jbf);
39a9643ea8Slogwang
40a9643ea8Slogwang extern "C" void restore_context(jmp_buf jbf, int ret);
41a9643ea8Slogwang
42a9643ea8Slogwang extern "C" void replace_esp(jmp_buf jbf, void* esp);
43a9643ea8Slogwang
Thread(int stack_size)44a9643ea8Slogwang Thread::Thread(int stack_size)
45a9643ea8Slogwang {
46a9643ea8Slogwang _stack_size = stack_size ? stack_size : ThreadPool::default_stack_size;
47a9643ea8Slogwang _wakeup_time = 0;
48a9643ea8Slogwang _stack = NULL;
49a9643ea8Slogwang memset(&_jmpbuf, 0, sizeof(_jmpbuf));
50a9643ea8Slogwang }
51a9643ea8Slogwang
52*84bcae25Swoolen static DefaultLogAdapter def_log_adapt;
53a9643ea8Slogwang /**
545ac59bc4Slogwang * @brief LINUX x86/x86_64's allocated stacks.
55a9643ea8Slogwang */
InitStack()56a9643ea8Slogwang bool Thread::InitStack()
57a9643ea8Slogwang {
58a9643ea8Slogwang if (_stack) {
59a9643ea8Slogwang return true;
60a9643ea8Slogwang }
61a9643ea8Slogwang
625ac59bc4Slogwang ///< stack index and memory are separated to prevent out of bounds.
63a9643ea8Slogwang _stack = (MtStack*)calloc(1, sizeof(MtStack));
64a9643ea8Slogwang if (NULL == _stack)
65a9643ea8Slogwang {
66a9643ea8Slogwang MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
67a9643ea8Slogwang return false;
68a9643ea8Slogwang }
69a9643ea8Slogwang
70a9643ea8Slogwang int memsize = MEM_PAGE_SIZE*2 + _stack_size;
71a9643ea8Slogwang memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
72a9643ea8Slogwang
73a9643ea8Slogwang static int zero_fd = -1;
74a9643ea8Slogwang int mmap_flags = MAP_PRIVATE | MAP_ANON;
75a9643ea8Slogwang void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
76a9643ea8Slogwang if (vaddr == (void *)MAP_FAILED)
77a9643ea8Slogwang {
78*84bcae25Swoolen MTLOG_ERROR("mmap stack failed, size %d,errmsg: %s.", memsize,strerror(errno));
79a9643ea8Slogwang free(_stack);
80a9643ea8Slogwang _stack = NULL;
81a9643ea8Slogwang return false;
82a9643ea8Slogwang }
83a9643ea8Slogwang _stack->_vaddr = (char*)vaddr;
84a9643ea8Slogwang _stack->_vaddr_size = memsize;
85a9643ea8Slogwang _stack->_stk_size = _stack_size;
86a9643ea8Slogwang _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
87a9643ea8Slogwang _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
88a9643ea8Slogwang // valgrind support: register stack frame
89a9643ea8Slogwang _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
90a9643ea8Slogwang
91a9643ea8Slogwang _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
92a9643ea8Slogwang
93a9643ea8Slogwang mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
94a9643ea8Slogwang mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
95a9643ea8Slogwang
96a9643ea8Slogwang return true;
97a9643ea8Slogwang }
98a9643ea8Slogwang
99a9643ea8Slogwang
FreeStack()100a9643ea8Slogwang void Thread::FreeStack()
101a9643ea8Slogwang {
102a9643ea8Slogwang if (!_stack) {
103a9643ea8Slogwang return;
104a9643ea8Slogwang }
105a9643ea8Slogwang munmap(_stack->_vaddr, _stack->_vaddr_size);
106a9643ea8Slogwang // valgrind support: deregister stack frame
107a9643ea8Slogwang VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
108a9643ea8Slogwang free(_stack);
109a9643ea8Slogwang _stack = NULL;
110a9643ea8Slogwang }
111a9643ea8Slogwang
InitContext()112a9643ea8Slogwang void Thread::InitContext()
113a9643ea8Slogwang {
114a9643ea8Slogwang if (save_context(_jmpbuf) != 0)
115a9643ea8Slogwang {
1165ac59bc4Slogwang ScheduleObj::Instance()->ScheduleStartRun();
117a9643ea8Slogwang }
118a9643ea8Slogwang
119a9643ea8Slogwang if (_stack != NULL)
120a9643ea8Slogwang {
121a9643ea8Slogwang replace_esp(_jmpbuf, _stack->_esp);
122a9643ea8Slogwang }
123a9643ea8Slogwang }
124a9643ea8Slogwang
SwitchContext()125a9643ea8Slogwang void Thread::SwitchContext()
126a9643ea8Slogwang {
127a9643ea8Slogwang if (save_context(_jmpbuf) == 0)
128a9643ea8Slogwang {
129a9643ea8Slogwang ScheduleObj::Instance()->ScheduleThread();
130a9643ea8Slogwang }
131a9643ea8Slogwang }
132a9643ea8Slogwang
133*84bcae25Swoolen
SaveContext()134*84bcae25Swoolen int Thread::SaveContext()
135*84bcae25Swoolen {
136*84bcae25Swoolen return save_context(_jmpbuf);
137*84bcae25Swoolen }
138*84bcae25Swoolen
RestoreContext()139a9643ea8Slogwang void Thread::RestoreContext()
140a9643ea8Slogwang {
141a9643ea8Slogwang restore_context(_jmpbuf, 1);
142a9643ea8Slogwang }
143a9643ea8Slogwang
1445ac59bc4Slogwang
Initial()145a9643ea8Slogwang bool Thread::Initial()
146a9643ea8Slogwang {
147a9643ea8Slogwang if (!InitStack())
148a9643ea8Slogwang {
149a9643ea8Slogwang MTLOG_ERROR("init stack failed");
150a9643ea8Slogwang return false;
151a9643ea8Slogwang }
152a9643ea8Slogwang
153a9643ea8Slogwang InitContext();
154a9643ea8Slogwang
155a9643ea8Slogwang return true;
156a9643ea8Slogwang }
157a9643ea8Slogwang
Destroy()158a9643ea8Slogwang void Thread::Destroy()
159a9643ea8Slogwang {
160a9643ea8Slogwang FreeStack();
161a9643ea8Slogwang memset(&_jmpbuf, 0, sizeof(_jmpbuf));
162a9643ea8Slogwang }
163a9643ea8Slogwang
Reset()164a9643ea8Slogwang void Thread::Reset()
165a9643ea8Slogwang {
166a9643ea8Slogwang _wakeup_time = 0;
167a9643ea8Slogwang SetPrivate(NULL);
168a9643ea8Slogwang
169a9643ea8Slogwang InitContext();
170a9643ea8Slogwang CleanState();
171a9643ea8Slogwang }
172a9643ea8Slogwang
sleep(int ms)173a9643ea8Slogwang void Thread::sleep(int ms)
174a9643ea8Slogwang {
175a9643ea8Slogwang utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
176a9643ea8Slogwang _wakeup_time = now + ms;
177a9643ea8Slogwang
178a9643ea8Slogwang if (save_context(_jmpbuf) == 0)
179a9643ea8Slogwang {
180a9643ea8Slogwang ScheduleObj::Instance()->ScheduleSleep();
181a9643ea8Slogwang }
182a9643ea8Slogwang }
183a9643ea8Slogwang
Wait()184a9643ea8Slogwang void Thread::Wait()
185a9643ea8Slogwang {
186a9643ea8Slogwang if (save_context(_jmpbuf) == 0)
187a9643ea8Slogwang {
188a9643ea8Slogwang ScheduleObj::Instance()->SchedulePend();
189a9643ea8Slogwang }
190a9643ea8Slogwang }
191a9643ea8Slogwang
CheckStackHealth(char * esp)192a9643ea8Slogwang bool Thread::CheckStackHealth(char *esp)
193a9643ea8Slogwang {
194a9643ea8Slogwang if (!_stack)
195a9643ea8Slogwang return false;
196a9643ea8Slogwang
197a9643ea8Slogwang if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
198a9643ea8Slogwang return true;
199a9643ea8Slogwang else
200a9643ea8Slogwang return false;
201a9643ea8Slogwang }
202a9643ea8Slogwang
MicroThread(ThreadType type)203a9643ea8Slogwang MicroThread::MicroThread(ThreadType type)
204a9643ea8Slogwang {
205a9643ea8Slogwang memset(&_entry, 0, sizeof(_entry));
206a9643ea8Slogwang TAILQ_INIT(&_fdset);
207a9643ea8Slogwang TAILQ_INIT(&_sub_list);
208a9643ea8Slogwang _flag = NOT_INLIST;
209a9643ea8Slogwang _type = type;
210a9643ea8Slogwang _state = INITIAL;
211a9643ea8Slogwang _start = NULL;
212a9643ea8Slogwang _args = NULL;
213a9643ea8Slogwang _parent = NULL;
214a9643ea8Slogwang }
215a9643ea8Slogwang
CleanState()216a9643ea8Slogwang void MicroThread::CleanState()
217a9643ea8Slogwang {
218a9643ea8Slogwang TAILQ_INIT(&_fdset);
219a9643ea8Slogwang TAILQ_INIT(&_sub_list);
220a9643ea8Slogwang _flag = NOT_INLIST;
221a9643ea8Slogwang _type = NORMAL;
222a9643ea8Slogwang _state = INITIAL;
223a9643ea8Slogwang _start = NULL;
224a9643ea8Slogwang _args = NULL;
225a9643ea8Slogwang _parent = NULL;
226a9643ea8Slogwang }
227a9643ea8Slogwang
Run()228a9643ea8Slogwang void MicroThread::Run()
229a9643ea8Slogwang {
230a9643ea8Slogwang if (_start) {
231a9643ea8Slogwang _start(_args);
232a9643ea8Slogwang }
233a9643ea8Slogwang
234a9643ea8Slogwang if (this->IsSubThread()) {
235a9643ea8Slogwang this->WakeupParent();
236a9643ea8Slogwang }
237a9643ea8Slogwang
238a9643ea8Slogwang ScheduleObj::Instance()->ScheduleReclaim();
239a9643ea8Slogwang ScheduleObj::Instance()->ScheduleThread();
240a9643ea8Slogwang }
241a9643ea8Slogwang
WakeupParent()242a9643ea8Slogwang void MicroThread::WakeupParent()
243a9643ea8Slogwang {
244a9643ea8Slogwang MicroThread* parent = this->GetParent();
245a9643ea8Slogwang if (parent)
246a9643ea8Slogwang {
247a9643ea8Slogwang parent->RemoveSubThread(this);
248a9643ea8Slogwang if (parent->HasNoSubThread())
249a9643ea8Slogwang {
250a9643ea8Slogwang ScheduleObj::Instance()->ScheduleUnpend(parent);
251a9643ea8Slogwang }
252a9643ea8Slogwang }
253a9643ea8Slogwang else
254a9643ea8Slogwang {
255a9643ea8Slogwang MTLOG_ERROR("Sub thread no parent, error");
256a9643ea8Slogwang }
257a9643ea8Slogwang }
258a9643ea8Slogwang
HasNoSubThread()259a9643ea8Slogwang bool MicroThread::HasNoSubThread()
260a9643ea8Slogwang {
261a9643ea8Slogwang return TAILQ_EMPTY(&_sub_list);
262a9643ea8Slogwang }
263a9643ea8Slogwang
AddSubThread(MicroThread * sub)264a9643ea8Slogwang void MicroThread::AddSubThread(MicroThread* sub)
265a9643ea8Slogwang {
266a9643ea8Slogwang ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
267a9643ea8Slogwang if (!sub->HasFlag(MicroThread::SUB_LIST))
268a9643ea8Slogwang {
269a9643ea8Slogwang TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
270a9643ea8Slogwang sub->_parent = this;
271a9643ea8Slogwang }
272a9643ea8Slogwang
273a9643ea8Slogwang sub->SetFlag(MicroThread::SUB_LIST);
274a9643ea8Slogwang }
275a9643ea8Slogwang
RemoveSubThread(MicroThread * sub)276a9643ea8Slogwang void MicroThread::RemoveSubThread(MicroThread* sub)
277a9643ea8Slogwang {
278a9643ea8Slogwang ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
279a9643ea8Slogwang if (sub->HasFlag(MicroThread::SUB_LIST))
280a9643ea8Slogwang {
281a9643ea8Slogwang TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
282a9643ea8Slogwang sub->_parent = NULL;
283a9643ea8Slogwang }
284a9643ea8Slogwang
285a9643ea8Slogwang sub->UnsetFlag(MicroThread::SUB_LIST);
286a9643ea8Slogwang }
287a9643ea8Slogwang
2885ac59bc4Slogwang ScheduleObj *ScheduleObj::_instance = NULL;
Instance()289a9643ea8Slogwang inline ScheduleObj* ScheduleObj::Instance()
290a9643ea8Slogwang {
291a9643ea8Slogwang if (NULL == _instance)
292a9643ea8Slogwang {
293a9643ea8Slogwang _instance = new ScheduleObj();
294a9643ea8Slogwang }
295a9643ea8Slogwang
296a9643ea8Slogwang return _instance;
297a9643ea8Slogwang }
298a9643ea8Slogwang
ScheduleThread()299a9643ea8Slogwang void ScheduleObj::ScheduleThread()
300a9643ea8Slogwang {
301a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
302a9643ea8Slogwang frame->ThreadSchdule();
303a9643ea8Slogwang }
304a9643ea8Slogwang
ScheduleGetTime()305a9643ea8Slogwang utime64_t ScheduleObj::ScheduleGetTime()
306a9643ea8Slogwang {
307a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
308a9643ea8Slogwang if (frame)
309a9643ea8Slogwang {
310a9643ea8Slogwang return frame->GetLastClock();
311a9643ea8Slogwang }
312a9643ea8Slogwang else
313a9643ea8Slogwang {
314a9643ea8Slogwang MTLOG_ERROR("frame time failed, maybe not init");
315a9643ea8Slogwang return 0;
316a9643ea8Slogwang }
317a9643ea8Slogwang }
318a9643ea8Slogwang
ScheduleSleep()319a9643ea8Slogwang void ScheduleObj::ScheduleSleep()
320a9643ea8Slogwang {
321a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
322a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread();
323a9643ea8Slogwang if ((!frame) || (!thread)) {
324a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
325a9643ea8Slogwang return;
326a9643ea8Slogwang }
327a9643ea8Slogwang
328a9643ea8Slogwang frame->InsertSleep(thread);
329a9643ea8Slogwang frame->ThreadSchdule();
330a9643ea8Slogwang }
331a9643ea8Slogwang
SchedulePend()332a9643ea8Slogwang void ScheduleObj::SchedulePend()
333a9643ea8Slogwang {
334a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
335a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread();
336a9643ea8Slogwang if ((!frame) || (!thread)) {
337a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
338a9643ea8Slogwang return;
339a9643ea8Slogwang }
340a9643ea8Slogwang
341a9643ea8Slogwang frame->InsertPend(thread);
342a9643ea8Slogwang frame->ThreadSchdule();
343a9643ea8Slogwang }
344a9643ea8Slogwang
ScheduleUnpend(void * pthread)345a9643ea8Slogwang void ScheduleObj::ScheduleUnpend(void* pthread)
346a9643ea8Slogwang {
347a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
348a9643ea8Slogwang MicroThread* thread = (MicroThread*)pthread;
349a9643ea8Slogwang if ((!frame) || (!thread)) {
350a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
351a9643ea8Slogwang return;
352a9643ea8Slogwang }
353a9643ea8Slogwang
354a9643ea8Slogwang frame->RemovePend(thread);
355a9643ea8Slogwang frame->InsertRunable(thread);
356a9643ea8Slogwang }
357a9643ea8Slogwang
ScheduleReclaim()358a9643ea8Slogwang void ScheduleObj::ScheduleReclaim()
359a9643ea8Slogwang {
360a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
361a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread();
362a9643ea8Slogwang if ((!frame) || (!thread)) {
363a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
364a9643ea8Slogwang return;
365a9643ea8Slogwang }
366a9643ea8Slogwang
367a9643ea8Slogwang frame->FreeThread(thread);
368a9643ea8Slogwang }
369a9643ea8Slogwang
ScheduleStartRun()370a9643ea8Slogwang void ScheduleObj::ScheduleStartRun()
371a9643ea8Slogwang {
372a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
373a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread();
374a9643ea8Slogwang if ((!frame) || (!thread)) {
375a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
376a9643ea8Slogwang return;
377a9643ea8Slogwang }
378a9643ea8Slogwang
379a9643ea8Slogwang thread->Run();
380a9643ea8Slogwang }
381a9643ea8Slogwang
382a9643ea8Slogwang
3835ac59bc4Slogwang unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads.
384*84bcae25Swoolen unsigned int ThreadPool::last_default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads.
3855ac59bc4Slogwang unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< 128k stack.
386a9643ea8Slogwang
InitialPool(int max_num)387a9643ea8Slogwang bool ThreadPool::InitialPool(int max_num)
388a9643ea8Slogwang {
389a9643ea8Slogwang MicroThread *thread = NULL;
390a9643ea8Slogwang for (unsigned int i = 0; i < default_thread_num; i++)
391a9643ea8Slogwang {
392a9643ea8Slogwang thread = new MicroThread();
393a9643ea8Slogwang if ((NULL == thread) || (false == thread->Initial()))
394a9643ea8Slogwang {
395a9643ea8Slogwang MTLOG_ERROR("init pool, thread %p init failed", thread);
396a9643ea8Slogwang if (thread) delete thread;
397a9643ea8Slogwang continue;
398a9643ea8Slogwang }
399a9643ea8Slogwang thread->SetFlag(MicroThread::FREE_LIST);
400a9643ea8Slogwang _freelist.push(thread);
401a9643ea8Slogwang }
402a9643ea8Slogwang
403a9643ea8Slogwang _total_num = _freelist.size();
404a9643ea8Slogwang _max_num = max_num;
405a9643ea8Slogwang _use_num = 0;
406a9643ea8Slogwang if (_total_num <= 0)
407a9643ea8Slogwang {
408a9643ea8Slogwang return false;
409a9643ea8Slogwang }
410a9643ea8Slogwang else
411a9643ea8Slogwang {
412a9643ea8Slogwang return true;
413a9643ea8Slogwang }
414a9643ea8Slogwang }
415a9643ea8Slogwang
DestroyPool()416a9643ea8Slogwang void ThreadPool::DestroyPool()
417a9643ea8Slogwang {
418a9643ea8Slogwang MicroThread* thread = NULL;
419a9643ea8Slogwang while (!_freelist.empty())
420a9643ea8Slogwang {
421a9643ea8Slogwang thread = _freelist.front();
422a9643ea8Slogwang _freelist.pop();
423a9643ea8Slogwang thread->Destroy();
424a9643ea8Slogwang delete thread;
425a9643ea8Slogwang }
426a9643ea8Slogwang
427a9643ea8Slogwang _total_num = 0;
428a9643ea8Slogwang _use_num = 0;
429a9643ea8Slogwang }
430a9643ea8Slogwang
AllocThread()431a9643ea8Slogwang MicroThread* ThreadPool::AllocThread()
432a9643ea8Slogwang {
4335ac59bc4Slogwang MT_ATTR_API_SET(492069, _total_num);
434a9643ea8Slogwang
435a9643ea8Slogwang MicroThread* thread = NULL;
436a9643ea8Slogwang if (!_freelist.empty())
437a9643ea8Slogwang {
438a9643ea8Slogwang thread = _freelist.front();
439a9643ea8Slogwang _freelist.pop();
440a9643ea8Slogwang
441a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
442a9643ea8Slogwang
443a9643ea8Slogwang thread->UnsetFlag(MicroThread::FREE_LIST);
444a9643ea8Slogwang _use_num++;
445a9643ea8Slogwang return thread;
446a9643ea8Slogwang }
447a9643ea8Slogwang
448a9643ea8Slogwang MT_ATTR_API(320846, 1); // pool no nore
449a9643ea8Slogwang if (_total_num >= _max_num)
450a9643ea8Slogwang {
451a9643ea8Slogwang MT_ATTR_API(361140, 1); // no more quota
452*84bcae25Swoolen MTLOG_ERROR("total %d is outof max: %d", _total_num,_max_num);
453a9643ea8Slogwang return NULL;
454a9643ea8Slogwang }
455a9643ea8Slogwang
456a9643ea8Slogwang thread = new MicroThread();
457a9643ea8Slogwang if ((NULL == thread) || (false == thread->Initial()))
458a9643ea8Slogwang {
459a9643ea8Slogwang MT_ATTR_API(320847, 1); // pool init fail
460a9643ea8Slogwang MTLOG_ERROR("thread alloc failed, thread: %p", thread);
461a9643ea8Slogwang if (thread) delete thread;
462a9643ea8Slogwang return NULL;
463a9643ea8Slogwang }
464a9643ea8Slogwang _total_num++;
465a9643ea8Slogwang _use_num++;
466*84bcae25Swoolen if(_use_num >(int) default_thread_num){
467*84bcae25Swoolen if(((int) default_thread_num * 2 )< _max_num){
468*84bcae25Swoolen last_default_thread_num = default_thread_num;
469*84bcae25Swoolen default_thread_num = default_thread_num * 2;
470*84bcae25Swoolen }
471*84bcae25Swoolen }
472a9643ea8Slogwang
473a9643ea8Slogwang return thread;
474a9643ea8Slogwang }
475a9643ea8Slogwang
FreeThread(MicroThread * thread)476a9643ea8Slogwang void ThreadPool::FreeThread(MicroThread* thread)
477a9643ea8Slogwang {
478a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
479a9643ea8Slogwang thread->Reset();
480a9643ea8Slogwang _use_num--;
481a9643ea8Slogwang _freelist.push(thread);
482a9643ea8Slogwang thread->SetFlag(MicroThread::FREE_LIST);
483a9643ea8Slogwang
484a9643ea8Slogwang unsigned int free_num = _freelist.size();
485a9643ea8Slogwang if ((free_num > default_thread_num) && (free_num > 1))
486a9643ea8Slogwang {
487a9643ea8Slogwang thread = _freelist.front();
488a9643ea8Slogwang _freelist.pop();
489a9643ea8Slogwang thread->Destroy();
490a9643ea8Slogwang delete thread;
491a9643ea8Slogwang _total_num--;
492*84bcae25Swoolen if(default_thread_num / 2 >= DEFAULT_THREAD_NUM){
493*84bcae25Swoolen last_default_thread_num = default_thread_num;
494*84bcae25Swoolen default_thread_num = default_thread_num / 2;
495*84bcae25Swoolen }
496a9643ea8Slogwang }
497a9643ea8Slogwang }
498a9643ea8Slogwang
GetUsedNum(void)499a9643ea8Slogwang int ThreadPool::GetUsedNum(void)
500a9643ea8Slogwang {
501a9643ea8Slogwang return _use_num;
502a9643ea8Slogwang }
503a9643ea8Slogwang
504a9643ea8Slogwang MtFrame *MtFrame::_instance = NULL;
Instance()505a9643ea8Slogwang inline MtFrame* MtFrame::Instance ()
506a9643ea8Slogwang {
507a9643ea8Slogwang if (NULL == _instance )
508a9643ea8Slogwang {
509a9643ea8Slogwang _instance = new MtFrame();
510a9643ea8Slogwang }
511a9643ea8Slogwang
512a9643ea8Slogwang return _instance;
513a9643ea8Slogwang }
514a9643ea8Slogwang
SetHookFlag()515a9643ea8Slogwang void MtFrame::SetHookFlag() {
516a9643ea8Slogwang mt_set_hook_flag();
517a9643ea8Slogwang };
518a9643ea8Slogwang
InitFrame(LogAdapter * logadpt,int max_thread_num)519a9643ea8Slogwang bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
520a9643ea8Slogwang {
521*84bcae25Swoolen if(logadpt == NULL){
522*84bcae25Swoolen _log_adpt = &def_log_adapt;
523*84bcae25Swoolen }else{
524a9643ea8Slogwang _log_adpt = logadpt;
525*84bcae25Swoolen }
526a9643ea8Slogwang
527a9643ea8Slogwang if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
528a9643ea8Slogwang {
529a9643ea8Slogwang MTLOG_ERROR("Init epoll or thread pool failed");
530a9643ea8Slogwang this->Destroy();
531a9643ea8Slogwang return false;
532a9643ea8Slogwang }
533a9643ea8Slogwang if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
534a9643ea8Slogwang {
535a9643ea8Slogwang MTLOG_ERROR("Init heap list failed");
536a9643ea8Slogwang this->Destroy();
537a9643ea8Slogwang return false;
538a9643ea8Slogwang }
539a9643ea8Slogwang
540a9643ea8Slogwang _timer = new CTimerMng(max_thread_num * 2);
541a9643ea8Slogwang if (NULL == _timer)
542a9643ea8Slogwang {
543a9643ea8Slogwang MTLOG_ERROR("Init heap timer failed");
544a9643ea8Slogwang this->Destroy();
545a9643ea8Slogwang return false;
546a9643ea8Slogwang }
547a9643ea8Slogwang
548a9643ea8Slogwang _daemon = AllocThread();
549a9643ea8Slogwang if (NULL == _daemon)
550a9643ea8Slogwang {
551a9643ea8Slogwang MTLOG_ERROR("Alloc daemon thread failed");
552a9643ea8Slogwang this->Destroy();
553a9643ea8Slogwang return false;
554a9643ea8Slogwang }
555a9643ea8Slogwang _daemon->SetType(MicroThread::DAEMON);
556a9643ea8Slogwang _daemon->SetState(MicroThread::RUNABLE);
557a9643ea8Slogwang _daemon->SetSartFunc(MtFrame::DaemonRun, this);
558a9643ea8Slogwang
559a9643ea8Slogwang _primo = new MicroThread(MicroThread::PRIMORDIAL);
560a9643ea8Slogwang if (NULL == _primo)
561a9643ea8Slogwang {
562a9643ea8Slogwang MTLOG_ERROR("new _primo thread failed");
563a9643ea8Slogwang this->Destroy();
564a9643ea8Slogwang return false;
565a9643ea8Slogwang }
566a9643ea8Slogwang _primo->SetState(MicroThread::RUNNING);
567a9643ea8Slogwang SetActiveThread(_primo);
568a9643ea8Slogwang
569a9643ea8Slogwang _last_clock = GetSystemMS();
570a9643ea8Slogwang TAILQ_INIT(&_iolist);
571a9643ea8Slogwang TAILQ_INIT(&_pend_list);
572a9643ea8Slogwang
573a9643ea8Slogwang //SetHookFlag();
574a9643ea8Slogwang
575a9643ea8Slogwang return true;
576a9643ea8Slogwang
577a9643ea8Slogwang }
578a9643ea8Slogwang
Destroy(void)579a9643ea8Slogwang void MtFrame::Destroy(void)
580a9643ea8Slogwang {
581a9643ea8Slogwang if (NULL == _instance )
582a9643ea8Slogwang {
583a9643ea8Slogwang return;
584a9643ea8Slogwang }
585a9643ea8Slogwang
586a9643ea8Slogwang if (_primo) {
587a9643ea8Slogwang delete _primo;
588a9643ea8Slogwang _primo = NULL;
589a9643ea8Slogwang }
590a9643ea8Slogwang
591a9643ea8Slogwang if (_daemon) {
592a9643ea8Slogwang FreeThread(_daemon);
593a9643ea8Slogwang _daemon = NULL;
594a9643ea8Slogwang }
595a9643ea8Slogwang
596a9643ea8Slogwang TAILQ_INIT(&_iolist);
597a9643ea8Slogwang
598a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
599a9643ea8Slogwang while (thread)
600a9643ea8Slogwang {
601a9643ea8Slogwang FreeThread(thread);
602a9643ea8Slogwang thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
603a9643ea8Slogwang }
604a9643ea8Slogwang
605a9643ea8Slogwang while (!_runlist.empty())
606a9643ea8Slogwang {
607a9643ea8Slogwang thread = _runlist.front();
608a9643ea8Slogwang _runlist.pop();
609a9643ea8Slogwang FreeThread(thread);
610a9643ea8Slogwang }
611a9643ea8Slogwang
612a9643ea8Slogwang MicroThread* tmp;
613a9643ea8Slogwang TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
614a9643ea8Slogwang {
615a9643ea8Slogwang TAILQ_REMOVE(&_pend_list, thread, _entry);
616a9643ea8Slogwang FreeThread(thread);
617a9643ea8Slogwang }
618a9643ea8Slogwang
619a9643ea8Slogwang if (_timer != NULL)
620a9643ea8Slogwang {
621a9643ea8Slogwang delete _timer;
622a9643ea8Slogwang _timer = NULL;
623a9643ea8Slogwang }
624a9643ea8Slogwang
625a9643ea8Slogwang _instance->DestroyPool();
626a9643ea8Slogwang _instance->TermKqueue();
627a9643ea8Slogwang delete _instance;
628a9643ea8Slogwang _instance = NULL;
629a9643ea8Slogwang }
630a9643ea8Slogwang
Version()631a9643ea8Slogwang char* MtFrame::Version()
632a9643ea8Slogwang {
633a9643ea8Slogwang return IMT_VERSION;
634a9643ea8Slogwang }
635a9643ea8Slogwang
CreateThread(ThreadStart entry,void * args,bool runable)636a9643ea8Slogwang MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
637a9643ea8Slogwang {
638a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
639a9643ea8Slogwang MicroThread* thread = mtframe->AllocThread();
640a9643ea8Slogwang if (NULL == thread)
641a9643ea8Slogwang {
642a9643ea8Slogwang MTLOG_ERROR("create thread failed");
643a9643ea8Slogwang return NULL;
644a9643ea8Slogwang }
645a9643ea8Slogwang thread->SetSartFunc(entry, args);
646a9643ea8Slogwang
647a9643ea8Slogwang if (runable) {
648a9643ea8Slogwang mtframe->InsertRunable(thread);
649a9643ea8Slogwang }
650a9643ea8Slogwang
651a9643ea8Slogwang return thread;
652a9643ea8Slogwang }
653a9643ea8Slogwang
Loop(void * args)654a9643ea8Slogwang int MtFrame::Loop(void* args)
655a9643ea8Slogwang {
656a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
657a9643ea8Slogwang MicroThread* daemon = mtframe->DaemonThread();
658a9643ea8Slogwang
659a9643ea8Slogwang mtframe->KqueueDispatch();
660a9643ea8Slogwang mtframe->SetLastClock(mtframe->GetSystemMS());
661a9643ea8Slogwang mtframe->WakeupTimeout();
662a9643ea8Slogwang mtframe->CheckExpired();
663a9643ea8Slogwang daemon->SwitchContext();
664a9643ea8Slogwang
665a9643ea8Slogwang return 0;
666a9643ea8Slogwang }
667a9643ea8Slogwang
DaemonRun(void * args)668a9643ea8Slogwang void MtFrame::DaemonRun(void* args)
669a9643ea8Slogwang {
670a9643ea8Slogwang /*
671a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
672a9643ea8Slogwang MicroThread* daemon = mtframe->DaemonThread();
673a9643ea8Slogwang
674a9643ea8Slogwang while (true) {
675a9643ea8Slogwang mtframe->KqueueDispatch();
676a9643ea8Slogwang mtframe->SetLastClock(mtframe->GetSystemMS());
677a9643ea8Slogwang mtframe->WakeupTimeout();
678a9643ea8Slogwang mtframe->CheckExpired();
679a9643ea8Slogwang daemon->SwitchContext();
680a9643ea8Slogwang }
681a9643ea8Slogwang */
682a9643ea8Slogwang ff_run(MtFrame::Loop, NULL);
683a9643ea8Slogwang }
684a9643ea8Slogwang
GetRootThread()685a9643ea8Slogwang MicroThread *MtFrame::GetRootThread()
686a9643ea8Slogwang {
687a9643ea8Slogwang if (NULL == _curr_thread)
688a9643ea8Slogwang {
689a9643ea8Slogwang return NULL;
690a9643ea8Slogwang }
691a9643ea8Slogwang
692a9643ea8Slogwang MicroThread::ThreadType type = _curr_thread->GetType();
693a9643ea8Slogwang MicroThread *thread = _curr_thread;
694a9643ea8Slogwang MicroThread *parent = thread;
695a9643ea8Slogwang
696a9643ea8Slogwang while (MicroThread::SUB_THREAD == type)
697a9643ea8Slogwang {
698a9643ea8Slogwang thread = thread->GetParent();
699a9643ea8Slogwang if (!thread)
700a9643ea8Slogwang {
701a9643ea8Slogwang break;
702a9643ea8Slogwang }
703a9643ea8Slogwang
704a9643ea8Slogwang type = thread->GetType();
705a9643ea8Slogwang parent = thread;
706a9643ea8Slogwang }
707a9643ea8Slogwang
708a9643ea8Slogwang return parent;
709a9643ea8Slogwang }
710a9643ea8Slogwang
ThreadSchdule()711a9643ea8Slogwang void MtFrame::ThreadSchdule()
712a9643ea8Slogwang {
713a9643ea8Slogwang MicroThread* thread = NULL;
714a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
715a9643ea8Slogwang
716a9643ea8Slogwang if (mtframe->_runlist.empty())
717a9643ea8Slogwang {
718a9643ea8Slogwang thread = mtframe->DaemonThread();
719a9643ea8Slogwang }
720a9643ea8Slogwang else
721a9643ea8Slogwang {
722a9643ea8Slogwang thread = mtframe->_runlist.front();
723a9643ea8Slogwang mtframe->RemoveRunable(thread);
724a9643ea8Slogwang }
725a9643ea8Slogwang
726a9643ea8Slogwang this->SetActiveThread(thread);
727a9643ea8Slogwang thread->SetState(MicroThread::RUNNING);
728a9643ea8Slogwang thread->RestoreContext();
729a9643ea8Slogwang }
730a9643ea8Slogwang
CheckExpired()731a9643ea8Slogwang void MtFrame::CheckExpired()
732a9643ea8Slogwang {
733a9643ea8Slogwang static utime64_t check_time = 0;
734a9643ea8Slogwang
735a9643ea8Slogwang if (_timer != NULL)
736a9643ea8Slogwang {
737a9643ea8Slogwang _timer->check_expired();
738a9643ea8Slogwang }
739a9643ea8Slogwang
740a9643ea8Slogwang utime64_t now = GetLastClock();
741a9643ea8Slogwang
742a9643ea8Slogwang if ((now - check_time) > 1000)
743a9643ea8Slogwang {
744a9643ea8Slogwang CNetMgr::Instance()->RecycleObjs(now);
745a9643ea8Slogwang check_time = now;
746a9643ea8Slogwang }
747a9643ea8Slogwang }
748a9643ea8Slogwang
WakeupTimeout()749a9643ea8Slogwang void MtFrame::WakeupTimeout()
750a9643ea8Slogwang {
751a9643ea8Slogwang utime64_t now = GetLastClock();
752a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
753a9643ea8Slogwang while (thread && (thread->GetWakeupTime() <= now))
754a9643ea8Slogwang {
755a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST))
756a9643ea8Slogwang {
757a9643ea8Slogwang RemoveIoWait(thread);
758a9643ea8Slogwang }
759a9643ea8Slogwang else
760a9643ea8Slogwang {
761a9643ea8Slogwang RemoveSleep(thread);
762a9643ea8Slogwang }
763a9643ea8Slogwang
764a9643ea8Slogwang InsertRunable(thread);
765a9643ea8Slogwang
766a9643ea8Slogwang thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
767a9643ea8Slogwang }
768a9643ea8Slogwang }
769a9643ea8Slogwang
KqueueGetTimeout()770a9643ea8Slogwang int MtFrame::KqueueGetTimeout()
771a9643ea8Slogwang {
772a9643ea8Slogwang utime64_t now = GetLastClock();
773a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
774a9643ea8Slogwang if (!thread)
775a9643ea8Slogwang {
7765ac59bc4Slogwang return 10; //default 10ms epollwait
777a9643ea8Slogwang }
778a9643ea8Slogwang else if (thread->GetWakeupTime() < now)
779a9643ea8Slogwang {
780a9643ea8Slogwang return 0;
781a9643ea8Slogwang }
782a9643ea8Slogwang else
783a9643ea8Slogwang {
784a9643ea8Slogwang return (int)(thread->GetWakeupTime() - now);
785a9643ea8Slogwang }
786a9643ea8Slogwang }
787a9643ea8Slogwang
InsertSleep(MicroThread * thread)788a9643ea8Slogwang inline void MtFrame::InsertSleep(MicroThread* thread)
789a9643ea8Slogwang {
790a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
791a9643ea8Slogwang
792a9643ea8Slogwang thread->SetFlag(MicroThread::SLEEP_LIST);
793a9643ea8Slogwang thread->SetState(MicroThread::SLEEPING);
794a9643ea8Slogwang int rc = _sleeplist.HeapPush(thread);
795a9643ea8Slogwang if (rc < 0)
796a9643ea8Slogwang {
797a9643ea8Slogwang MT_ATTR_API(320848, 1); // heap error
798a9643ea8Slogwang MTLOG_ERROR("Insert heap failed , rc %d", rc);
799a9643ea8Slogwang }
800a9643ea8Slogwang }
801a9643ea8Slogwang
RemoveSleep(MicroThread * thread)802a9643ea8Slogwang inline void MtFrame::RemoveSleep(MicroThread* thread)
803a9643ea8Slogwang {
804a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
805a9643ea8Slogwang thread->UnsetFlag(MicroThread::SLEEP_LIST);
806a9643ea8Slogwang
807a9643ea8Slogwang int rc = _sleeplist.HeapDelete(thread);
808a9643ea8Slogwang if (rc < 0)
809a9643ea8Slogwang {
810a9643ea8Slogwang MT_ATTR_API(320849, 1); // heap error
811a9643ea8Slogwang MTLOG_ERROR("remove heap failed , rc %d", rc);
812a9643ea8Slogwang }
813a9643ea8Slogwang }
814a9643ea8Slogwang
InsertIoWait(MicroThread * thread)815a9643ea8Slogwang inline void MtFrame::InsertIoWait(MicroThread* thread)
816a9643ea8Slogwang {
817a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
818a9643ea8Slogwang thread->SetFlag(MicroThread::IO_LIST);
819a9643ea8Slogwang TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
820a9643ea8Slogwang InsertSleep(thread);
821a9643ea8Slogwang }
822a9643ea8Slogwang
RemoveIoWait(MicroThread * thread)823a9643ea8Slogwang void MtFrame::RemoveIoWait(MicroThread* thread)
824a9643ea8Slogwang {
825a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::IO_LIST));
826a9643ea8Slogwang thread->UnsetFlag(MicroThread::IO_LIST);
827a9643ea8Slogwang TAILQ_REMOVE(&_iolist, thread, _entry);
828a9643ea8Slogwang
829a9643ea8Slogwang RemoveSleep(thread);
830a9643ea8Slogwang }
831a9643ea8Slogwang
InsertRunable(MicroThread * thread)832a9643ea8Slogwang void MtFrame::InsertRunable(MicroThread* thread)
833a9643ea8Slogwang {
834a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
835a9643ea8Slogwang thread->SetFlag(MicroThread::RUN_LIST);
836a9643ea8Slogwang
837a9643ea8Slogwang thread->SetState(MicroThread::RUNABLE);
838a9643ea8Slogwang _runlist.push(thread);
839a9643ea8Slogwang _waitnum++;
840a9643ea8Slogwang }
841a9643ea8Slogwang
RemoveRunable(MicroThread * thread)842a9643ea8Slogwang inline void MtFrame::RemoveRunable(MicroThread* thread)
843a9643ea8Slogwang {
844a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
845a9643ea8Slogwang ASSERT(thread == _runlist.front());
846a9643ea8Slogwang thread->UnsetFlag(MicroThread::RUN_LIST);
847a9643ea8Slogwang
848a9643ea8Slogwang _runlist.pop();
849a9643ea8Slogwang _waitnum--;
850a9643ea8Slogwang }
851a9643ea8Slogwang
InsertPend(MicroThread * thread)852a9643ea8Slogwang void MtFrame::InsertPend(MicroThread* thread)
853a9643ea8Slogwang {
854a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
855a9643ea8Slogwang thread->SetFlag(MicroThread::PEND_LIST);
856a9643ea8Slogwang TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
857a9643ea8Slogwang thread->SetState(MicroThread::PENDING);
858a9643ea8Slogwang }
859a9643ea8Slogwang
RemovePend(MicroThread * thread)860a9643ea8Slogwang void MtFrame::RemovePend(MicroThread* thread)
861a9643ea8Slogwang {
862a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
863a9643ea8Slogwang thread->UnsetFlag(MicroThread::PEND_LIST);
864a9643ea8Slogwang TAILQ_REMOVE(&_pend_list, thread, _entry);
865a9643ea8Slogwang }
866a9643ea8Slogwang
WaitNotify(utime64_t timeout)867a9643ea8Slogwang void MtFrame::WaitNotify(utime64_t timeout)
868a9643ea8Slogwang {
869a9643ea8Slogwang MicroThread* thread = GetActiveThread();
870a9643ea8Slogwang
871a9643ea8Slogwang thread->SetWakeupTime(timeout + this->GetLastClock());
872a9643ea8Slogwang this->InsertIoWait(thread);
873a9643ea8Slogwang thread->SwitchContext();
874a9643ea8Slogwang }
875a9643ea8Slogwang
NotifyThread(MicroThread * thread)876*84bcae25Swoolen void MtFrame::NotifyThread(MicroThread* thread)
877*84bcae25Swoolen {
878*84bcae25Swoolen if(thread == NULL){
879*84bcae25Swoolen return;
880*84bcae25Swoolen }
881*84bcae25Swoolen MicroThread* cur_thread = GetActiveThread();
882*84bcae25Swoolen if (thread->HasFlag(MicroThread::IO_LIST))
883*84bcae25Swoolen {
884*84bcae25Swoolen this->RemoveIoWait(thread);
885*84bcae25Swoolen if(cur_thread == this->DaemonThread()){
886*84bcae25Swoolen // ���ﲻֱ���еĻ�,���Dz���ʱ,�ᵼ��Ŀ���̵߳ȴ�����ʱ
887*84bcae25Swoolen if(cur_thread->SaveContext() == 0){
888*84bcae25Swoolen this->SetActiveThread(thread);
889*84bcae25Swoolen thread->SetState(MicroThread::RUNNING);
890*84bcae25Swoolen thread->RestoreContext();
891*84bcae25Swoolen }
892*84bcae25Swoolen }else{
893*84bcae25Swoolen this->InsertRunable(thread);
894*84bcae25Swoolen }
895*84bcae25Swoolen }
896*84bcae25Swoolen }
897*84bcae25Swoolen
SwapDaemonThread()898*84bcae25Swoolen void MtFrame::SwapDaemonThread()
899*84bcae25Swoolen {
900*84bcae25Swoolen MicroThread* thread = GetActiveThread();
901*84bcae25Swoolen MicroThread* daemon_thread = this->DaemonThread();
902*84bcae25Swoolen if(thread != daemon_thread){
903*84bcae25Swoolen if(thread->SaveContext() == 0){
904*84bcae25Swoolen this->InsertRunable(thread);
905*84bcae25Swoolen this->SetActiveThread(daemon_thread);
906*84bcae25Swoolen daemon_thread->SetState(MicroThread::RUNNING);
907*84bcae25Swoolen daemon_thread->RestoreContext();
908*84bcae25Swoolen }
909*84bcae25Swoolen }
910*84bcae25Swoolen }
911*84bcae25Swoolen
KqueueSchedule(KqObjList * fdlist,KqueuerObj * fd,int timeout)912a9643ea8Slogwang bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
913a9643ea8Slogwang {
914a9643ea8Slogwang MicroThread* thread = GetActiveThread();
915a9643ea8Slogwang if (NULL == thread)
916a9643ea8Slogwang {
917a9643ea8Slogwang MTLOG_ERROR("active thread null, epoll schedule failed");
918a9643ea8Slogwang return false;
919a9643ea8Slogwang }
920a9643ea8Slogwang
921a9643ea8Slogwang thread->ClearAllFd();
922a9643ea8Slogwang if (fdlist)
923a9643ea8Slogwang {
924a9643ea8Slogwang thread->AddFdList(fdlist);
925a9643ea8Slogwang }
926a9643ea8Slogwang if (fd)
927a9643ea8Slogwang {
928a9643ea8Slogwang thread->AddFd(fd);
929a9643ea8Slogwang }
930a9643ea8Slogwang
931a9643ea8Slogwang thread->SetWakeupTime(timeout + this->GetLastClock());
932a9643ea8Slogwang if (!this->KqueueAdd(thread->GetFdSet()))
933a9643ea8Slogwang {
934a9643ea8Slogwang MTLOG_ERROR("epoll add failed, errno: %d", errno);
935a9643ea8Slogwang return false;
936a9643ea8Slogwang }
937a9643ea8Slogwang this->InsertIoWait(thread);
938a9643ea8Slogwang thread->SwitchContext();
939a9643ea8Slogwang
940a9643ea8Slogwang int rcvnum = 0;
941a9643ea8Slogwang KqObjList& rcvfds = thread->GetFdSet();
942a9643ea8Slogwang KqueuerObj* fdata = NULL;
943a9643ea8Slogwang TAILQ_FOREACH(fdata, &rcvfds, _entry)
944a9643ea8Slogwang {
945a9643ea8Slogwang if (fdata->GetRcvEvents() != 0)
946a9643ea8Slogwang {
947a9643ea8Slogwang rcvnum++;
948a9643ea8Slogwang }
949a9643ea8Slogwang }
9505ac59bc4Slogwang this->KqueueDel(rcvfds);
951a9643ea8Slogwang
9525ac59bc4Slogwang if (rcvnum == 0)
953a9643ea8Slogwang {
954a9643ea8Slogwang errno = ETIME;
955a9643ea8Slogwang return false;
956a9643ea8Slogwang }
957a9643ea8Slogwang
958a9643ea8Slogwang return true;
959a9643ea8Slogwang }
960a9643ea8Slogwang
recvfrom(int fd,void * buf,int len,int flags,struct sockaddr * from,socklen_t * fromlen,int timeout)961a9643ea8Slogwang int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
962a9643ea8Slogwang {
963a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
964a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
965a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
966a9643ea8Slogwang utime64_t now = 0;
967a9643ea8Slogwang
968a9643ea8Slogwang if(fd<0 || !buf || len<1)
969a9643ea8Slogwang {
970a9643ea8Slogwang errno = EINVAL;
971a9643ea8Slogwang MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
972a9643ea8Slogwang return -10;
973a9643ea8Slogwang }
974a9643ea8Slogwang
975a9643ea8Slogwang if (timeout <= -1)
976a9643ea8Slogwang {
977a9643ea8Slogwang timeout = 0x7fffffff;
978a9643ea8Slogwang }
979a9643ea8Slogwang
980a9643ea8Slogwang while (true)
981a9643ea8Slogwang {
982a9643ea8Slogwang now = mtframe->GetLastClock();
983a9643ea8Slogwang if ((int)(now - start) > timeout)
984a9643ea8Slogwang {
985a9643ea8Slogwang errno = ETIME;
986a9643ea8Slogwang return -1;
987a9643ea8Slogwang }
988a9643ea8Slogwang
989a9643ea8Slogwang KqueuerObj epfd;
990a9643ea8Slogwang epfd.SetOsfd(fd);
991a9643ea8Slogwang epfd.EnableInput();
992a9643ea8Slogwang epfd.SetOwnerThread(thread);
993a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
994a9643ea8Slogwang {
995a9643ea8Slogwang MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
996a9643ea8Slogwang return -2;
997a9643ea8Slogwang }
998a9643ea8Slogwang
999a9643ea8Slogwang mt_hook_syscall(recvfrom);
1000a9643ea8Slogwang int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
1001a9643ea8Slogwang if (n < 0)
1002a9643ea8Slogwang {
1003a9643ea8Slogwang if (errno == EINTR) {
1004a9643ea8Slogwang continue;
1005a9643ea8Slogwang }
1006a9643ea8Slogwang
1007a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1008a9643ea8Slogwang {
1009a9643ea8Slogwang MTLOG_ERROR("recvfrom failed, errno: %d", errno);
1010a9643ea8Slogwang return -3;
1011a9643ea8Slogwang }
1012a9643ea8Slogwang }
1013a9643ea8Slogwang else
1014a9643ea8Slogwang {
1015a9643ea8Slogwang return n;
1016a9643ea8Slogwang }
1017a9643ea8Slogwang }
1018a9643ea8Slogwang
1019a9643ea8Slogwang }
1020a9643ea8Slogwang
sendto(int fd,const void * msg,int len,int flags,const struct sockaddr * to,int tolen,int timeout)1021a9643ea8Slogwang int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
1022a9643ea8Slogwang {
1023a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1024a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1025a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1026a9643ea8Slogwang utime64_t now = 0;
1027a9643ea8Slogwang
1028a9643ea8Slogwang if(fd<0 || !msg || len<1)
1029a9643ea8Slogwang {
1030a9643ea8Slogwang errno = EINVAL;
1031a9643ea8Slogwang MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
1032a9643ea8Slogwang return -10;
1033a9643ea8Slogwang }
1034a9643ea8Slogwang
1035a9643ea8Slogwang int n = 0;
1036a9643ea8Slogwang mt_hook_syscall(sendto);
1037a9643ea8Slogwang while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
1038a9643ea8Slogwang {
1039a9643ea8Slogwang now = mtframe->GetLastClock();
1040a9643ea8Slogwang if ((int)(now - start) > timeout)
1041a9643ea8Slogwang {
1042a9643ea8Slogwang errno = ETIME;
1043a9643ea8Slogwang return -1;
1044a9643ea8Slogwang }
1045a9643ea8Slogwang
1046a9643ea8Slogwang if (errno == EINTR) {
1047a9643ea8Slogwang continue;
1048a9643ea8Slogwang }
1049a9643ea8Slogwang
1050a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1051a9643ea8Slogwang MTLOG_ERROR("sendto failed, errno: %d", errno);
1052a9643ea8Slogwang return -2;
1053a9643ea8Slogwang }
1054a9643ea8Slogwang
1055a9643ea8Slogwang KqueuerObj epfd;
1056a9643ea8Slogwang epfd.SetOsfd(fd);
1057a9643ea8Slogwang epfd.EnableOutput();
1058a9643ea8Slogwang epfd.SetOwnerThread(thread);
1059a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1060a9643ea8Slogwang return -3;
1061a9643ea8Slogwang }
1062a9643ea8Slogwang }
1063a9643ea8Slogwang
1064a9643ea8Slogwang return n;
1065a9643ea8Slogwang }
1066a9643ea8Slogwang
connect(int fd,const struct sockaddr * addr,int addrlen,int timeout)1067a9643ea8Slogwang int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1068a9643ea8Slogwang {
1069a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1070a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1071a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1072a9643ea8Slogwang utime64_t now = 0;
1073a9643ea8Slogwang
1074a9643ea8Slogwang if(fd<0 || !addr || addrlen<1)
1075a9643ea8Slogwang {
1076a9643ea8Slogwang errno = EINVAL;
1077a9643ea8Slogwang MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1078a9643ea8Slogwang return -10;
1079a9643ea8Slogwang }
1080a9643ea8Slogwang
1081a9643ea8Slogwang int n = 0;
1082a9643ea8Slogwang mt_hook_syscall(connect);
1083a9643ea8Slogwang while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1084a9643ea8Slogwang {
1085a9643ea8Slogwang now = mtframe->GetLastClock();
1086a9643ea8Slogwang if ((int)(now - start) > timeout)
1087a9643ea8Slogwang {
1088a9643ea8Slogwang errno = ETIME;
1089a9643ea8Slogwang return -1;
1090a9643ea8Slogwang }
1091a9643ea8Slogwang
10925ac59bc4Slogwang if (errno == EISCONN)
1093a9643ea8Slogwang {
1094a9643ea8Slogwang return 0;
1095a9643ea8Slogwang }
1096a9643ea8Slogwang
1097a9643ea8Slogwang if (errno == EINTR) {
1098a9643ea8Slogwang continue;
1099a9643ea8Slogwang }
1100a9643ea8Slogwang
1101a9643ea8Slogwang if (errno != EINPROGRESS) {
1102a9643ea8Slogwang MTLOG_ERROR("connect failed, errno: %d", errno);
1103a9643ea8Slogwang return -2;
1104a9643ea8Slogwang }
1105a9643ea8Slogwang
1106a9643ea8Slogwang KqueuerObj epfd;
1107a9643ea8Slogwang epfd.SetOsfd(fd);
1108a9643ea8Slogwang epfd.EnableOutput();
1109a9643ea8Slogwang epfd.SetOwnerThread(thread);
1110a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1111a9643ea8Slogwang return -3;
1112a9643ea8Slogwang }
1113a9643ea8Slogwang }
1114a9643ea8Slogwang
1115a9643ea8Slogwang return n;
1116a9643ea8Slogwang }
1117a9643ea8Slogwang
accept(int fd,struct sockaddr * addr,socklen_t * addrlen,int timeout)1118a9643ea8Slogwang int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1119a9643ea8Slogwang {
1120a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1121a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1122a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1123a9643ea8Slogwang utime64_t now = 0;
1124a9643ea8Slogwang
1125a9643ea8Slogwang if(fd<0)
1126a9643ea8Slogwang {
1127a9643ea8Slogwang errno = EINVAL;
1128a9643ea8Slogwang MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1129a9643ea8Slogwang return -10;
1130a9643ea8Slogwang }
1131a9643ea8Slogwang
1132a9643ea8Slogwang int acceptfd = 0;
1133a9643ea8Slogwang mt_hook_syscall(accept);
1134a9643ea8Slogwang while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1135a9643ea8Slogwang {
1136a9643ea8Slogwang now = mtframe->GetLastClock();
1137a9643ea8Slogwang if ((int)(now - start) > timeout)
1138a9643ea8Slogwang {
1139a9643ea8Slogwang errno = ETIME;
1140a9643ea8Slogwang return -1;
1141a9643ea8Slogwang }
1142a9643ea8Slogwang
1143a9643ea8Slogwang if (errno == EINTR) {
1144a9643ea8Slogwang continue;
1145a9643ea8Slogwang }
1146a9643ea8Slogwang
1147a9643ea8Slogwang if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1148a9643ea8Slogwang MTLOG_ERROR("accept failed, errno: %d", errno);
1149a9643ea8Slogwang return -2;
1150a9643ea8Slogwang }
1151a9643ea8Slogwang
1152a9643ea8Slogwang KqueuerObj epfd;
1153a9643ea8Slogwang epfd.SetOsfd(fd);
1154a9643ea8Slogwang epfd.EnableInput();
1155a9643ea8Slogwang epfd.SetOwnerThread(thread);
1156a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1157a9643ea8Slogwang return -3;
1158a9643ea8Slogwang }
1159a9643ea8Slogwang }
1160a9643ea8Slogwang
1161a9643ea8Slogwang return acceptfd;
1162a9643ea8Slogwang }
1163a9643ea8Slogwang
read(int fd,void * buf,size_t nbyte,int timeout)1164a9643ea8Slogwang ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1165a9643ea8Slogwang {
1166a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1167a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1168a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1169a9643ea8Slogwang utime64_t now = 0;
1170a9643ea8Slogwang
1171a9643ea8Slogwang if(fd<0 || !buf || nbyte<1)
1172a9643ea8Slogwang {
1173a9643ea8Slogwang errno = EINVAL;
1174a9643ea8Slogwang MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1175a9643ea8Slogwang return -10;
1176a9643ea8Slogwang }
1177a9643ea8Slogwang
1178a9643ea8Slogwang ssize_t n = 0;
1179a9643ea8Slogwang mt_hook_syscall(read);
1180a9643ea8Slogwang while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1181a9643ea8Slogwang {
1182a9643ea8Slogwang now = mtframe->GetLastClock();
1183a9643ea8Slogwang if ((int)(now - start) > timeout)
1184a9643ea8Slogwang {
1185a9643ea8Slogwang errno = ETIME;
1186a9643ea8Slogwang return -1;
1187a9643ea8Slogwang }
1188a9643ea8Slogwang
1189a9643ea8Slogwang if (errno == EINTR) {
1190a9643ea8Slogwang continue;
1191a9643ea8Slogwang }
1192a9643ea8Slogwang
1193a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1194a9643ea8Slogwang MTLOG_ERROR("read failed, errno: %d", errno);
1195a9643ea8Slogwang return -2;
1196a9643ea8Slogwang }
1197a9643ea8Slogwang
1198a9643ea8Slogwang KqueuerObj epfd;
1199a9643ea8Slogwang epfd.SetOsfd(fd);
1200a9643ea8Slogwang epfd.EnableInput();
1201a9643ea8Slogwang epfd.SetOwnerThread(thread);
1202a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1203a9643ea8Slogwang return -3;
1204a9643ea8Slogwang }
1205a9643ea8Slogwang }
1206a9643ea8Slogwang
1207a9643ea8Slogwang return n;
1208a9643ea8Slogwang }
1209a9643ea8Slogwang
write(int fd,const void * buf,size_t nbyte,int timeout)1210a9643ea8Slogwang ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1211a9643ea8Slogwang {
1212a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1213a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1214a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1215a9643ea8Slogwang utime64_t now = 0;
1216a9643ea8Slogwang
1217a9643ea8Slogwang if(fd<0 || !buf || nbyte<1)
1218a9643ea8Slogwang {
1219a9643ea8Slogwang errno = EINVAL;
1220a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1221a9643ea8Slogwang return -10;
1222a9643ea8Slogwang }
1223a9643ea8Slogwang
1224a9643ea8Slogwang ssize_t n = 0;
1225a9643ea8Slogwang size_t send_len = 0;
1226a9643ea8Slogwang while (send_len < nbyte)
1227a9643ea8Slogwang {
1228a9643ea8Slogwang now = mtframe->GetLastClock();
1229a9643ea8Slogwang if ((int)(now - start) > timeout)
1230a9643ea8Slogwang {
1231a9643ea8Slogwang errno = ETIME;
1232a9643ea8Slogwang return -1;
1233a9643ea8Slogwang }
1234a9643ea8Slogwang
1235a9643ea8Slogwang mt_hook_syscall(write);
1236a9643ea8Slogwang n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1237a9643ea8Slogwang if (n < 0)
1238a9643ea8Slogwang {
1239a9643ea8Slogwang if (errno == EINTR) {
1240a9643ea8Slogwang continue;
1241a9643ea8Slogwang }
1242a9643ea8Slogwang
1243a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1244a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d", errno);
1245a9643ea8Slogwang return -2;
1246a9643ea8Slogwang }
1247a9643ea8Slogwang }
1248a9643ea8Slogwang else
1249a9643ea8Slogwang {
1250a9643ea8Slogwang send_len += n;
1251a9643ea8Slogwang if (send_len >= nbyte) {
1252a9643ea8Slogwang return nbyte;
1253a9643ea8Slogwang }
1254a9643ea8Slogwang }
1255a9643ea8Slogwang
1256a9643ea8Slogwang KqueuerObj epfd;
1257a9643ea8Slogwang epfd.SetOsfd(fd);
1258a9643ea8Slogwang epfd.EnableOutput();
1259a9643ea8Slogwang epfd.SetOwnerThread(thread);
1260a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1261a9643ea8Slogwang return -3;
1262a9643ea8Slogwang }
1263a9643ea8Slogwang }
1264a9643ea8Slogwang
1265a9643ea8Slogwang return nbyte;
1266a9643ea8Slogwang }
1267a9643ea8Slogwang
recv(int fd,void * buf,int len,int flags,int timeout)1268a9643ea8Slogwang int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1269a9643ea8Slogwang {
1270a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1271a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1272a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1273a9643ea8Slogwang utime64_t now = 0;
1274a9643ea8Slogwang
1275a9643ea8Slogwang if(fd<0 || !buf || len<1)
1276a9643ea8Slogwang {
1277a9643ea8Slogwang errno = EINVAL;
1278a9643ea8Slogwang MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1279a9643ea8Slogwang return -10;
1280a9643ea8Slogwang }
1281a9643ea8Slogwang
1282a9643ea8Slogwang if (timeout <= -1)
1283a9643ea8Slogwang {
1284a9643ea8Slogwang timeout = 0x7fffffff;
1285a9643ea8Slogwang }
1286a9643ea8Slogwang
1287a9643ea8Slogwang while (true)
1288a9643ea8Slogwang {
1289a9643ea8Slogwang now = mtframe->GetLastClock();
1290a9643ea8Slogwang if ((int)(now - start) > timeout)
1291a9643ea8Slogwang {
1292a9643ea8Slogwang errno = ETIME;
1293a9643ea8Slogwang return -1;
1294a9643ea8Slogwang }
1295a9643ea8Slogwang
1296a9643ea8Slogwang KqueuerObj epfd;
1297a9643ea8Slogwang epfd.SetOsfd(fd);
1298a9643ea8Slogwang epfd.EnableInput();
1299a9643ea8Slogwang epfd.SetOwnerThread(thread);
1300a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1301a9643ea8Slogwang {
1302a9643ea8Slogwang MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1303a9643ea8Slogwang return -2;
1304a9643ea8Slogwang }
1305a9643ea8Slogwang
1306a9643ea8Slogwang mt_hook_syscall(recv);
1307a9643ea8Slogwang int n = ff_hook_recv(fd, buf, len, flags);
1308a9643ea8Slogwang if (n < 0)
1309a9643ea8Slogwang {
1310a9643ea8Slogwang if (errno == EINTR) {
1311a9643ea8Slogwang continue;
1312a9643ea8Slogwang }
1313a9643ea8Slogwang
1314a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1315a9643ea8Slogwang {
1316a9643ea8Slogwang MTLOG_ERROR("recv failed, errno: %d", errno);
1317a9643ea8Slogwang return -3;
1318a9643ea8Slogwang }
1319a9643ea8Slogwang }
1320a9643ea8Slogwang else
1321a9643ea8Slogwang {
1322a9643ea8Slogwang return n;
1323a9643ea8Slogwang }
1324a9643ea8Slogwang }
1325a9643ea8Slogwang
1326a9643ea8Slogwang }
1327a9643ea8Slogwang
send(int fd,const void * buf,size_t nbyte,int flags,int timeout)1328a9643ea8Slogwang ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1329a9643ea8Slogwang {
1330a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1331a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1332a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1333a9643ea8Slogwang utime64_t now = 0;
1334a9643ea8Slogwang
1335a9643ea8Slogwang if(fd<0 || !buf || nbyte<1)
1336a9643ea8Slogwang {
1337a9643ea8Slogwang errno = EINVAL;
1338a9643ea8Slogwang MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1339a9643ea8Slogwang return -10;
1340a9643ea8Slogwang }
1341a9643ea8Slogwang
1342a9643ea8Slogwang ssize_t n = 0;
1343a9643ea8Slogwang size_t send_len = 0;
1344a9643ea8Slogwang while (send_len < nbyte)
1345a9643ea8Slogwang {
1346a9643ea8Slogwang now = mtframe->GetLastClock();
1347a9643ea8Slogwang if ((int)(now - start) > timeout)
1348a9643ea8Slogwang {
1349a9643ea8Slogwang errno = ETIME;
1350a9643ea8Slogwang return -1;
1351a9643ea8Slogwang }
1352a9643ea8Slogwang
1353a9643ea8Slogwang mt_hook_syscall(send);
1354a9643ea8Slogwang n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1355a9643ea8Slogwang if (n < 0)
1356a9643ea8Slogwang {
1357a9643ea8Slogwang if (errno == EINTR) {
1358a9643ea8Slogwang continue;
1359a9643ea8Slogwang }
1360a9643ea8Slogwang
1361a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1362a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d", errno);
1363a9643ea8Slogwang return -2;
1364a9643ea8Slogwang }
1365a9643ea8Slogwang }
1366a9643ea8Slogwang else
1367a9643ea8Slogwang {
1368a9643ea8Slogwang send_len += n;
1369a9643ea8Slogwang if (send_len >= nbyte) {
1370a9643ea8Slogwang return nbyte;
1371a9643ea8Slogwang }
1372a9643ea8Slogwang }
1373a9643ea8Slogwang
1374a9643ea8Slogwang KqueuerObj epfd;
1375a9643ea8Slogwang epfd.SetOsfd(fd);
1376a9643ea8Slogwang epfd.EnableOutput();
1377a9643ea8Slogwang epfd.SetOwnerThread(thread);
1378a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1379a9643ea8Slogwang return -3;
1380a9643ea8Slogwang }
1381a9643ea8Slogwang }
1382a9643ea8Slogwang
1383a9643ea8Slogwang return nbyte;
1384a9643ea8Slogwang }
1385a9643ea8Slogwang
sleep(int ms)1386a9643ea8Slogwang void MtFrame::sleep(int ms)
1387a9643ea8Slogwang {
1388a9643ea8Slogwang MtFrame* frame = MtFrame::Instance();
1389a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread();
1390a9643ea8Slogwang if (thread != NULL)
1391a9643ea8Slogwang {
1392a9643ea8Slogwang thread->sleep(ms);
1393a9643ea8Slogwang }
1394a9643ea8Slogwang }
1395a9643ea8Slogwang
WaitEvents(int fd,int events,int timeout)1396a9643ea8Slogwang int MtFrame::WaitEvents(int fd, int events, int timeout)
1397a9643ea8Slogwang {
1398a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance();
1399a9643ea8Slogwang utime64_t start = mtframe->GetLastClock();
1400a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread();
1401a9643ea8Slogwang utime64_t now = 0;
1402a9643ea8Slogwang
1403a9643ea8Slogwang if (timeout <= -1)
1404a9643ea8Slogwang {
1405a9643ea8Slogwang timeout = 0x7fffffff;
1406a9643ea8Slogwang }
1407a9643ea8Slogwang
1408a9643ea8Slogwang while (true)
1409a9643ea8Slogwang {
1410a9643ea8Slogwang now = mtframe->GetLastClock();
1411a9643ea8Slogwang if ((int)(now - start) > timeout)
1412a9643ea8Slogwang {
1413a9643ea8Slogwang errno = ETIME;
1414a9643ea8Slogwang return 0;
1415a9643ea8Slogwang }
1416a9643ea8Slogwang
1417a9643ea8Slogwang KqueuerObj epfd;
1418a9643ea8Slogwang epfd.SetOsfd(fd);
1419a9643ea8Slogwang if (events & KQ_EVENT_READ)
1420a9643ea8Slogwang {
1421a9643ea8Slogwang epfd.EnableInput();
1422a9643ea8Slogwang }
1423a9643ea8Slogwang if (events & KQ_EVENT_WRITE)
1424a9643ea8Slogwang {
1425a9643ea8Slogwang epfd.EnableOutput();
1426a9643ea8Slogwang }
1427a9643ea8Slogwang epfd.SetOwnerThread(thread);
1428a9643ea8Slogwang
1429a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1430a9643ea8Slogwang {
1431a9643ea8Slogwang MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1432a9643ea8Slogwang return 0;
1433a9643ea8Slogwang }
1434a9643ea8Slogwang
1435a9643ea8Slogwang return epfd.GetRcvEvents();
1436a9643ea8Slogwang }
1437a9643ea8Slogwang }
1438