1a9643ea8Slogwang 2a9643ea8Slogwang /** 3a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4a9643ea8Slogwang * 5a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6a9643ea8Slogwang * 7a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9a9643ea8Slogwang * obtain a copy of the License at 10a9643ea8Slogwang * 11a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12a9643ea8Slogwang * 13a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16a9643ea8Slogwang * and limitations under the License. 17a9643ea8Slogwang */ 18a9643ea8Slogwang 19a9643ea8Slogwang 20a9643ea8Slogwang /** 21a9643ea8Slogwang * @filename micro_thread.cpp 22a9643ea8Slogwang * @info micro thread manager 23a9643ea8Slogwang */ 24a9643ea8Slogwang #include "mt_version.h" 25a9643ea8Slogwang #include "micro_thread.h" 26a9643ea8Slogwang #include "mt_net.h" 27a9643ea8Slogwang #include "valgrind.h" 28a9643ea8Slogwang #include <assert.h> 29a9643ea8Slogwang #include "mt_sys_hook.h" 30a9643ea8Slogwang #include "ff_hook.h" 31a9643ea8Slogwang #include "ff_api.h" 32a9643ea8Slogwang 33a9643ea8Slogwang using namespace NS_MICRO_THREAD; 34a9643ea8Slogwang 35a9643ea8Slogwang #define ASSERT(statement) 36a9643ea8Slogwang //#define ASSERT(statement) assert(statement) 37a9643ea8Slogwang 38a9643ea8Slogwang extern "C" int save_context(jmp_buf jbf); 39a9643ea8Slogwang 40a9643ea8Slogwang extern "C" void restore_context(jmp_buf jbf, int ret); 41a9643ea8Slogwang 42a9643ea8Slogwang extern "C" void replace_esp(jmp_buf jbf, void* esp); 43a9643ea8Slogwang 44a9643ea8Slogwang Thread::Thread(int stack_size) 45a9643ea8Slogwang { 46a9643ea8Slogwang _stack_size = stack_size ? stack_size : ThreadPool::default_stack_size; 47a9643ea8Slogwang _wakeup_time = 0; 48a9643ea8Slogwang _stack = NULL; 49a9643ea8Slogwang memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 50a9643ea8Slogwang } 51a9643ea8Slogwang 52a9643ea8Slogwang 53a9643ea8Slogwang /** 54*5ac59bc4Slogwang * @brief LINUX x86/x86_64's allocated stacks. 55a9643ea8Slogwang */ 56a9643ea8Slogwang bool Thread::InitStack() 57a9643ea8Slogwang { 58a9643ea8Slogwang if (_stack) { 59a9643ea8Slogwang return true; 60a9643ea8Slogwang } 61a9643ea8Slogwang 62*5ac59bc4Slogwang ///< stack index and memory are separated to prevent out of bounds. 63a9643ea8Slogwang _stack = (MtStack*)calloc(1, sizeof(MtStack)); 64a9643ea8Slogwang if (NULL == _stack) 65a9643ea8Slogwang { 66a9643ea8Slogwang MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack)); 67a9643ea8Slogwang return false; 68a9643ea8Slogwang } 69a9643ea8Slogwang 70a9643ea8Slogwang int memsize = MEM_PAGE_SIZE*2 + _stack_size; 71a9643ea8Slogwang memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE; 72a9643ea8Slogwang 73a9643ea8Slogwang static int zero_fd = -1; 74a9643ea8Slogwang int mmap_flags = MAP_PRIVATE | MAP_ANON; 75a9643ea8Slogwang void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0); 76a9643ea8Slogwang if (vaddr == (void *)MAP_FAILED) 77a9643ea8Slogwang { 78a9643ea8Slogwang MTLOG_ERROR("mmap stack failed, size %d", memsize); 79a9643ea8Slogwang free(_stack); 80a9643ea8Slogwang _stack = NULL; 81a9643ea8Slogwang return false; 82a9643ea8Slogwang } 83a9643ea8Slogwang _stack->_vaddr = (char*)vaddr; 84a9643ea8Slogwang _stack->_vaddr_size = memsize; 85a9643ea8Slogwang _stack->_stk_size = _stack_size; 86a9643ea8Slogwang _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE; 87a9643ea8Slogwang _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size; 88a9643ea8Slogwang // valgrind support: register stack frame 89a9643ea8Slogwang _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top); 90a9643ea8Slogwang 91a9643ea8Slogwang _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE; 92a9643ea8Slogwang 93a9643ea8Slogwang mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE); 94a9643ea8Slogwang mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE); 95a9643ea8Slogwang 96a9643ea8Slogwang return true; 97a9643ea8Slogwang } 98a9643ea8Slogwang 99a9643ea8Slogwang 100a9643ea8Slogwang void Thread::FreeStack() 101a9643ea8Slogwang { 102a9643ea8Slogwang if (!_stack) { 103a9643ea8Slogwang return; 104a9643ea8Slogwang } 105a9643ea8Slogwang munmap(_stack->_vaddr, _stack->_vaddr_size); 106a9643ea8Slogwang // valgrind support: deregister stack frame 107a9643ea8Slogwang VALGRIND_STACK_DEREGISTER(_stack->valgrind_id); 108a9643ea8Slogwang free(_stack); 109a9643ea8Slogwang _stack = NULL; 110a9643ea8Slogwang } 111a9643ea8Slogwang 112a9643ea8Slogwang void Thread::InitContext() 113a9643ea8Slogwang { 114a9643ea8Slogwang if (save_context(_jmpbuf) != 0) 115a9643ea8Slogwang { 116*5ac59bc4Slogwang ScheduleObj::Instance()->ScheduleStartRun(); 117a9643ea8Slogwang } 118a9643ea8Slogwang 119a9643ea8Slogwang if (_stack != NULL) 120a9643ea8Slogwang { 121a9643ea8Slogwang replace_esp(_jmpbuf, _stack->_esp); 122a9643ea8Slogwang } 123a9643ea8Slogwang } 124a9643ea8Slogwang 125a9643ea8Slogwang void Thread::SwitchContext() 126a9643ea8Slogwang { 127a9643ea8Slogwang if (save_context(_jmpbuf) == 0) 128a9643ea8Slogwang { 129a9643ea8Slogwang ScheduleObj::Instance()->ScheduleThread(); 130a9643ea8Slogwang } 131a9643ea8Slogwang } 132a9643ea8Slogwang 133a9643ea8Slogwang void Thread::RestoreContext() 134a9643ea8Slogwang { 135a9643ea8Slogwang restore_context(_jmpbuf, 1); 136a9643ea8Slogwang } 137a9643ea8Slogwang 138*5ac59bc4Slogwang 139a9643ea8Slogwang bool Thread::Initial() 140a9643ea8Slogwang { 141a9643ea8Slogwang if (!InitStack()) 142a9643ea8Slogwang { 143a9643ea8Slogwang MTLOG_ERROR("init stack failed"); 144a9643ea8Slogwang return false; 145a9643ea8Slogwang } 146a9643ea8Slogwang 147a9643ea8Slogwang InitContext(); 148a9643ea8Slogwang 149a9643ea8Slogwang return true; 150a9643ea8Slogwang } 151a9643ea8Slogwang 152a9643ea8Slogwang void Thread::Destroy() 153a9643ea8Slogwang { 154a9643ea8Slogwang FreeStack(); 155a9643ea8Slogwang memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 156a9643ea8Slogwang } 157a9643ea8Slogwang 158a9643ea8Slogwang void Thread::Reset() 159a9643ea8Slogwang { 160a9643ea8Slogwang _wakeup_time = 0; 161a9643ea8Slogwang SetPrivate(NULL); 162a9643ea8Slogwang 163a9643ea8Slogwang InitContext(); 164a9643ea8Slogwang CleanState(); 165a9643ea8Slogwang } 166a9643ea8Slogwang 167a9643ea8Slogwang void Thread::sleep(int ms) 168a9643ea8Slogwang { 169a9643ea8Slogwang utime64_t now = ScheduleObj::Instance()->ScheduleGetTime(); 170a9643ea8Slogwang _wakeup_time = now + ms; 171a9643ea8Slogwang 172a9643ea8Slogwang if (save_context(_jmpbuf) == 0) 173a9643ea8Slogwang { 174a9643ea8Slogwang ScheduleObj::Instance()->ScheduleSleep(); 175a9643ea8Slogwang } 176a9643ea8Slogwang } 177a9643ea8Slogwang 178a9643ea8Slogwang void Thread::Wait() 179a9643ea8Slogwang { 180a9643ea8Slogwang if (save_context(_jmpbuf) == 0) 181a9643ea8Slogwang { 182a9643ea8Slogwang ScheduleObj::Instance()->SchedulePend(); 183a9643ea8Slogwang } 184a9643ea8Slogwang } 185a9643ea8Slogwang 186a9643ea8Slogwang bool Thread::CheckStackHealth(char *esp) 187a9643ea8Slogwang { 188a9643ea8Slogwang if (!_stack) 189a9643ea8Slogwang return false; 190a9643ea8Slogwang 191a9643ea8Slogwang if (esp > _stack->_stk_bottom && esp < _stack->_stk_top) 192a9643ea8Slogwang return true; 193a9643ea8Slogwang else 194a9643ea8Slogwang return false; 195a9643ea8Slogwang } 196a9643ea8Slogwang 197a9643ea8Slogwang MicroThread::MicroThread(ThreadType type) 198a9643ea8Slogwang { 199a9643ea8Slogwang memset(&_entry, 0, sizeof(_entry)); 200a9643ea8Slogwang TAILQ_INIT(&_fdset); 201a9643ea8Slogwang TAILQ_INIT(&_sub_list); 202a9643ea8Slogwang _flag = NOT_INLIST; 203a9643ea8Slogwang _type = type; 204a9643ea8Slogwang _state = INITIAL; 205a9643ea8Slogwang _start = NULL; 206a9643ea8Slogwang _args = NULL; 207a9643ea8Slogwang _parent = NULL; 208a9643ea8Slogwang } 209a9643ea8Slogwang 210a9643ea8Slogwang void MicroThread::CleanState() 211a9643ea8Slogwang { 212a9643ea8Slogwang TAILQ_INIT(&_fdset); 213a9643ea8Slogwang TAILQ_INIT(&_sub_list); 214a9643ea8Slogwang _flag = NOT_INLIST; 215a9643ea8Slogwang _type = NORMAL; 216a9643ea8Slogwang _state = INITIAL; 217a9643ea8Slogwang _start = NULL; 218a9643ea8Slogwang _args = NULL; 219a9643ea8Slogwang _parent = NULL; 220a9643ea8Slogwang } 221a9643ea8Slogwang 222a9643ea8Slogwang void MicroThread::Run() 223a9643ea8Slogwang { 224a9643ea8Slogwang if (_start) { 225a9643ea8Slogwang _start(_args); 226a9643ea8Slogwang } 227a9643ea8Slogwang 228a9643ea8Slogwang if (this->IsSubThread()) { 229a9643ea8Slogwang this->WakeupParent(); 230a9643ea8Slogwang } 231a9643ea8Slogwang 232a9643ea8Slogwang ScheduleObj::Instance()->ScheduleReclaim(); 233a9643ea8Slogwang ScheduleObj::Instance()->ScheduleThread(); 234a9643ea8Slogwang } 235a9643ea8Slogwang 236a9643ea8Slogwang void MicroThread::WakeupParent() 237a9643ea8Slogwang { 238a9643ea8Slogwang MicroThread* parent = this->GetParent(); 239a9643ea8Slogwang if (parent) 240a9643ea8Slogwang { 241a9643ea8Slogwang parent->RemoveSubThread(this); 242a9643ea8Slogwang if (parent->HasNoSubThread()) 243a9643ea8Slogwang { 244a9643ea8Slogwang ScheduleObj::Instance()->ScheduleUnpend(parent); 245a9643ea8Slogwang } 246a9643ea8Slogwang } 247a9643ea8Slogwang else 248a9643ea8Slogwang { 249a9643ea8Slogwang MTLOG_ERROR("Sub thread no parent, error"); 250a9643ea8Slogwang } 251a9643ea8Slogwang } 252a9643ea8Slogwang 253a9643ea8Slogwang bool MicroThread::HasNoSubThread() 254a9643ea8Slogwang { 255a9643ea8Slogwang return TAILQ_EMPTY(&_sub_list); 256a9643ea8Slogwang } 257a9643ea8Slogwang 258a9643ea8Slogwang void MicroThread::AddSubThread(MicroThread* sub) 259a9643ea8Slogwang { 260a9643ea8Slogwang ASSERT(!sub->HasFlag(MicroThread::SUB_LIST)); 261a9643ea8Slogwang if (!sub->HasFlag(MicroThread::SUB_LIST)) 262a9643ea8Slogwang { 263a9643ea8Slogwang TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry); 264a9643ea8Slogwang sub->_parent = this; 265a9643ea8Slogwang } 266a9643ea8Slogwang 267a9643ea8Slogwang sub->SetFlag(MicroThread::SUB_LIST); 268a9643ea8Slogwang } 269a9643ea8Slogwang 270a9643ea8Slogwang void MicroThread::RemoveSubThread(MicroThread* sub) 271a9643ea8Slogwang { 272a9643ea8Slogwang ASSERT(sub->HasFlag(MicroThread::SUB_LIST)); 273a9643ea8Slogwang if (sub->HasFlag(MicroThread::SUB_LIST)) 274a9643ea8Slogwang { 275a9643ea8Slogwang TAILQ_REMOVE(&_sub_list, sub, _sub_entry); 276a9643ea8Slogwang sub->_parent = NULL; 277a9643ea8Slogwang } 278a9643ea8Slogwang 279a9643ea8Slogwang sub->UnsetFlag(MicroThread::SUB_LIST); 280a9643ea8Slogwang } 281a9643ea8Slogwang 282*5ac59bc4Slogwang ScheduleObj *ScheduleObj::_instance = NULL; 283a9643ea8Slogwang inline ScheduleObj* ScheduleObj::Instance() 284a9643ea8Slogwang { 285a9643ea8Slogwang if (NULL == _instance) 286a9643ea8Slogwang { 287a9643ea8Slogwang _instance = new ScheduleObj(); 288a9643ea8Slogwang } 289a9643ea8Slogwang 290a9643ea8Slogwang return _instance; 291a9643ea8Slogwang } 292a9643ea8Slogwang 293a9643ea8Slogwang void ScheduleObj::ScheduleThread() 294a9643ea8Slogwang { 295a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 296a9643ea8Slogwang frame->ThreadSchdule(); 297a9643ea8Slogwang } 298a9643ea8Slogwang 299a9643ea8Slogwang utime64_t ScheduleObj::ScheduleGetTime() 300a9643ea8Slogwang { 301a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 302a9643ea8Slogwang if (frame) 303a9643ea8Slogwang { 304a9643ea8Slogwang return frame->GetLastClock(); 305a9643ea8Slogwang } 306a9643ea8Slogwang else 307a9643ea8Slogwang { 308a9643ea8Slogwang MTLOG_ERROR("frame time failed, maybe not init"); 309a9643ea8Slogwang return 0; 310a9643ea8Slogwang } 311a9643ea8Slogwang } 312a9643ea8Slogwang 313a9643ea8Slogwang void ScheduleObj::ScheduleSleep() 314a9643ea8Slogwang { 315a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 316a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 317a9643ea8Slogwang if ((!frame) || (!thread)) { 318a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 319a9643ea8Slogwang return; 320a9643ea8Slogwang } 321a9643ea8Slogwang 322a9643ea8Slogwang frame->InsertSleep(thread); 323a9643ea8Slogwang frame->ThreadSchdule(); 324a9643ea8Slogwang } 325a9643ea8Slogwang 326a9643ea8Slogwang void ScheduleObj::SchedulePend() 327a9643ea8Slogwang { 328a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 329a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 330a9643ea8Slogwang if ((!frame) || (!thread)) { 331a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 332a9643ea8Slogwang return; 333a9643ea8Slogwang } 334a9643ea8Slogwang 335a9643ea8Slogwang frame->InsertPend(thread); 336a9643ea8Slogwang frame->ThreadSchdule(); 337a9643ea8Slogwang } 338a9643ea8Slogwang 339a9643ea8Slogwang void ScheduleObj::ScheduleUnpend(void* pthread) 340a9643ea8Slogwang { 341a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 342a9643ea8Slogwang MicroThread* thread = (MicroThread*)pthread; 343a9643ea8Slogwang if ((!frame) || (!thread)) { 344a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 345a9643ea8Slogwang return; 346a9643ea8Slogwang } 347a9643ea8Slogwang 348a9643ea8Slogwang frame->RemovePend(thread); 349a9643ea8Slogwang frame->InsertRunable(thread); 350a9643ea8Slogwang } 351a9643ea8Slogwang 352a9643ea8Slogwang void ScheduleObj::ScheduleReclaim() 353a9643ea8Slogwang { 354a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 355a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 356a9643ea8Slogwang if ((!frame) || (!thread)) { 357a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 358a9643ea8Slogwang return; 359a9643ea8Slogwang } 360a9643ea8Slogwang 361a9643ea8Slogwang frame->FreeThread(thread); 362a9643ea8Slogwang } 363a9643ea8Slogwang 364a9643ea8Slogwang void ScheduleObj::ScheduleStartRun() 365a9643ea8Slogwang { 366a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 367a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 368a9643ea8Slogwang if ((!frame) || (!thread)) { 369a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 370a9643ea8Slogwang return; 371a9643ea8Slogwang } 372a9643ea8Slogwang 373a9643ea8Slogwang thread->Run(); 374a9643ea8Slogwang } 375a9643ea8Slogwang 376a9643ea8Slogwang 377*5ac59bc4Slogwang unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. 378*5ac59bc4Slogwang unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< 128k stack. 379a9643ea8Slogwang 380a9643ea8Slogwang bool ThreadPool::InitialPool(int max_num) 381a9643ea8Slogwang { 382a9643ea8Slogwang MicroThread *thread = NULL; 383a9643ea8Slogwang for (unsigned int i = 0; i < default_thread_num; i++) 384a9643ea8Slogwang { 385a9643ea8Slogwang thread = new MicroThread(); 386a9643ea8Slogwang if ((NULL == thread) || (false == thread->Initial())) 387a9643ea8Slogwang { 388a9643ea8Slogwang MTLOG_ERROR("init pool, thread %p init failed", thread); 389a9643ea8Slogwang if (thread) delete thread; 390a9643ea8Slogwang continue; 391a9643ea8Slogwang } 392a9643ea8Slogwang thread->SetFlag(MicroThread::FREE_LIST); 393a9643ea8Slogwang _freelist.push(thread); 394a9643ea8Slogwang } 395a9643ea8Slogwang 396a9643ea8Slogwang _total_num = _freelist.size(); 397a9643ea8Slogwang _max_num = max_num; 398a9643ea8Slogwang _use_num = 0; 399a9643ea8Slogwang if (_total_num <= 0) 400a9643ea8Slogwang { 401a9643ea8Slogwang return false; 402a9643ea8Slogwang } 403a9643ea8Slogwang else 404a9643ea8Slogwang { 405a9643ea8Slogwang return true; 406a9643ea8Slogwang } 407a9643ea8Slogwang } 408a9643ea8Slogwang 409a9643ea8Slogwang void ThreadPool::DestroyPool() 410a9643ea8Slogwang { 411a9643ea8Slogwang MicroThread* thread = NULL; 412a9643ea8Slogwang while (!_freelist.empty()) 413a9643ea8Slogwang { 414a9643ea8Slogwang thread = _freelist.front(); 415a9643ea8Slogwang _freelist.pop(); 416a9643ea8Slogwang thread->Destroy(); 417a9643ea8Slogwang delete thread; 418a9643ea8Slogwang } 419a9643ea8Slogwang 420a9643ea8Slogwang _total_num = 0; 421a9643ea8Slogwang _use_num = 0; 422a9643ea8Slogwang } 423a9643ea8Slogwang 424a9643ea8Slogwang MicroThread* ThreadPool::AllocThread() 425a9643ea8Slogwang { 426*5ac59bc4Slogwang MT_ATTR_API_SET(492069, _total_num); 427a9643ea8Slogwang 428a9643ea8Slogwang MicroThread* thread = NULL; 429a9643ea8Slogwang if (!_freelist.empty()) 430a9643ea8Slogwang { 431a9643ea8Slogwang thread = _freelist.front(); 432a9643ea8Slogwang _freelist.pop(); 433a9643ea8Slogwang 434a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::FREE_LIST)); 435a9643ea8Slogwang 436a9643ea8Slogwang thread->UnsetFlag(MicroThread::FREE_LIST); 437a9643ea8Slogwang _use_num++; 438a9643ea8Slogwang return thread; 439a9643ea8Slogwang } 440a9643ea8Slogwang 441a9643ea8Slogwang MT_ATTR_API(320846, 1); // pool no nore 442a9643ea8Slogwang if (_total_num >= _max_num) 443a9643ea8Slogwang { 444a9643ea8Slogwang MT_ATTR_API(361140, 1); // no more quota 445a9643ea8Slogwang return NULL; 446a9643ea8Slogwang } 447a9643ea8Slogwang 448a9643ea8Slogwang thread = new MicroThread(); 449a9643ea8Slogwang if ((NULL == thread) || (false == thread->Initial())) 450a9643ea8Slogwang { 451a9643ea8Slogwang MT_ATTR_API(320847, 1); // pool init fail 452a9643ea8Slogwang MTLOG_ERROR("thread alloc failed, thread: %p", thread); 453a9643ea8Slogwang if (thread) delete thread; 454a9643ea8Slogwang return NULL; 455a9643ea8Slogwang } 456a9643ea8Slogwang _total_num++; 457a9643ea8Slogwang _use_num++; 458a9643ea8Slogwang 459a9643ea8Slogwang return thread; 460a9643ea8Slogwang } 461a9643ea8Slogwang 462a9643ea8Slogwang void ThreadPool::FreeThread(MicroThread* thread) 463a9643ea8Slogwang { 464a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::FREE_LIST)); 465a9643ea8Slogwang thread->Reset(); 466a9643ea8Slogwang _use_num--; 467a9643ea8Slogwang _freelist.push(thread); 468a9643ea8Slogwang thread->SetFlag(MicroThread::FREE_LIST); 469a9643ea8Slogwang 470a9643ea8Slogwang unsigned int free_num = _freelist.size(); 471a9643ea8Slogwang if ((free_num > default_thread_num) && (free_num > 1)) 472a9643ea8Slogwang { 473a9643ea8Slogwang thread = _freelist.front(); 474a9643ea8Slogwang _freelist.pop(); 475a9643ea8Slogwang thread->Destroy(); 476a9643ea8Slogwang delete thread; 477a9643ea8Slogwang _total_num--; 478a9643ea8Slogwang } 479a9643ea8Slogwang } 480a9643ea8Slogwang 481a9643ea8Slogwang int ThreadPool::GetUsedNum(void) 482a9643ea8Slogwang { 483a9643ea8Slogwang return _use_num; 484a9643ea8Slogwang } 485a9643ea8Slogwang 486a9643ea8Slogwang MtFrame *MtFrame::_instance = NULL; 487a9643ea8Slogwang inline MtFrame* MtFrame::Instance () 488a9643ea8Slogwang { 489a9643ea8Slogwang if (NULL == _instance ) 490a9643ea8Slogwang { 491a9643ea8Slogwang _instance = new MtFrame(); 492a9643ea8Slogwang } 493a9643ea8Slogwang 494a9643ea8Slogwang return _instance; 495a9643ea8Slogwang } 496a9643ea8Slogwang 497a9643ea8Slogwang void MtFrame::SetHookFlag() { 498a9643ea8Slogwang mt_set_hook_flag(); 499a9643ea8Slogwang }; 500a9643ea8Slogwang 501a9643ea8Slogwang bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num) 502a9643ea8Slogwang { 503a9643ea8Slogwang _log_adpt = logadpt; 504a9643ea8Slogwang 505a9643ea8Slogwang if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num)) 506a9643ea8Slogwang { 507a9643ea8Slogwang MTLOG_ERROR("Init epoll or thread pool failed"); 508a9643ea8Slogwang this->Destroy(); 509a9643ea8Slogwang return false; 510a9643ea8Slogwang } 511a9643ea8Slogwang if (_sleeplist.HeapResize(max_thread_num * 2) < 0) 512a9643ea8Slogwang { 513a9643ea8Slogwang MTLOG_ERROR("Init heap list failed"); 514a9643ea8Slogwang this->Destroy(); 515a9643ea8Slogwang return false; 516a9643ea8Slogwang } 517a9643ea8Slogwang 518a9643ea8Slogwang _timer = new CTimerMng(max_thread_num * 2); 519a9643ea8Slogwang if (NULL == _timer) 520a9643ea8Slogwang { 521a9643ea8Slogwang MTLOG_ERROR("Init heap timer failed"); 522a9643ea8Slogwang this->Destroy(); 523a9643ea8Slogwang return false; 524a9643ea8Slogwang } 525a9643ea8Slogwang 526a9643ea8Slogwang _daemon = AllocThread(); 527a9643ea8Slogwang if (NULL == _daemon) 528a9643ea8Slogwang { 529a9643ea8Slogwang MTLOG_ERROR("Alloc daemon thread failed"); 530a9643ea8Slogwang this->Destroy(); 531a9643ea8Slogwang return false; 532a9643ea8Slogwang } 533a9643ea8Slogwang _daemon->SetType(MicroThread::DAEMON); 534a9643ea8Slogwang _daemon->SetState(MicroThread::RUNABLE); 535a9643ea8Slogwang _daemon->SetSartFunc(MtFrame::DaemonRun, this); 536a9643ea8Slogwang 537a9643ea8Slogwang _primo = new MicroThread(MicroThread::PRIMORDIAL); 538a9643ea8Slogwang if (NULL == _primo) 539a9643ea8Slogwang { 540a9643ea8Slogwang MTLOG_ERROR("new _primo thread failed"); 541a9643ea8Slogwang this->Destroy(); 542a9643ea8Slogwang return false; 543a9643ea8Slogwang } 544a9643ea8Slogwang _primo->SetState(MicroThread::RUNNING); 545a9643ea8Slogwang SetActiveThread(_primo); 546a9643ea8Slogwang 547a9643ea8Slogwang _last_clock = GetSystemMS(); 548a9643ea8Slogwang TAILQ_INIT(&_iolist); 549a9643ea8Slogwang TAILQ_INIT(&_pend_list); 550a9643ea8Slogwang 551a9643ea8Slogwang //SetHookFlag(); 552a9643ea8Slogwang 553a9643ea8Slogwang return true; 554a9643ea8Slogwang 555a9643ea8Slogwang } 556a9643ea8Slogwang 557a9643ea8Slogwang void MtFrame::Destroy(void) 558a9643ea8Slogwang { 559a9643ea8Slogwang if (NULL == _instance ) 560a9643ea8Slogwang { 561a9643ea8Slogwang return; 562a9643ea8Slogwang } 563a9643ea8Slogwang 564a9643ea8Slogwang if (_primo) { 565a9643ea8Slogwang delete _primo; 566a9643ea8Slogwang _primo = NULL; 567a9643ea8Slogwang } 568a9643ea8Slogwang 569a9643ea8Slogwang if (_daemon) { 570a9643ea8Slogwang FreeThread(_daemon); 571a9643ea8Slogwang _daemon = NULL; 572a9643ea8Slogwang } 573a9643ea8Slogwang 574a9643ea8Slogwang TAILQ_INIT(&_iolist); 575a9643ea8Slogwang 576a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 577a9643ea8Slogwang while (thread) 578a9643ea8Slogwang { 579a9643ea8Slogwang FreeThread(thread); 580a9643ea8Slogwang thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 581a9643ea8Slogwang } 582a9643ea8Slogwang 583a9643ea8Slogwang while (!_runlist.empty()) 584a9643ea8Slogwang { 585a9643ea8Slogwang thread = _runlist.front(); 586a9643ea8Slogwang _runlist.pop(); 587a9643ea8Slogwang FreeThread(thread); 588a9643ea8Slogwang } 589a9643ea8Slogwang 590a9643ea8Slogwang MicroThread* tmp; 591a9643ea8Slogwang TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp) 592a9643ea8Slogwang { 593a9643ea8Slogwang TAILQ_REMOVE(&_pend_list, thread, _entry); 594a9643ea8Slogwang FreeThread(thread); 595a9643ea8Slogwang } 596a9643ea8Slogwang 597a9643ea8Slogwang if (_timer != NULL) 598a9643ea8Slogwang { 599a9643ea8Slogwang delete _timer; 600a9643ea8Slogwang _timer = NULL; 601a9643ea8Slogwang } 602a9643ea8Slogwang 603a9643ea8Slogwang _instance->DestroyPool(); 604a9643ea8Slogwang _instance->TermKqueue(); 605a9643ea8Slogwang delete _instance; 606a9643ea8Slogwang _instance = NULL; 607a9643ea8Slogwang } 608a9643ea8Slogwang 609a9643ea8Slogwang char* MtFrame::Version() 610a9643ea8Slogwang { 611a9643ea8Slogwang return IMT_VERSION; 612a9643ea8Slogwang } 613a9643ea8Slogwang 614a9643ea8Slogwang MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable) 615a9643ea8Slogwang { 616a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 617a9643ea8Slogwang MicroThread* thread = mtframe->AllocThread(); 618a9643ea8Slogwang if (NULL == thread) 619a9643ea8Slogwang { 620a9643ea8Slogwang MTLOG_ERROR("create thread failed"); 621a9643ea8Slogwang return NULL; 622a9643ea8Slogwang } 623a9643ea8Slogwang thread->SetSartFunc(entry, args); 624a9643ea8Slogwang 625a9643ea8Slogwang if (runable) { 626a9643ea8Slogwang mtframe->InsertRunable(thread); 627a9643ea8Slogwang } 628a9643ea8Slogwang 629a9643ea8Slogwang return thread; 630a9643ea8Slogwang } 631a9643ea8Slogwang 632a9643ea8Slogwang int MtFrame::Loop(void* args) 633a9643ea8Slogwang { 634a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 635a9643ea8Slogwang MicroThread* daemon = mtframe->DaemonThread(); 636a9643ea8Slogwang 637a9643ea8Slogwang mtframe->KqueueDispatch(); 638a9643ea8Slogwang mtframe->SetLastClock(mtframe->GetSystemMS()); 639a9643ea8Slogwang mtframe->WakeupTimeout(); 640a9643ea8Slogwang mtframe->CheckExpired(); 641a9643ea8Slogwang daemon->SwitchContext(); 642a9643ea8Slogwang 643a9643ea8Slogwang return 0; 644a9643ea8Slogwang } 645a9643ea8Slogwang 646a9643ea8Slogwang void MtFrame::DaemonRun(void* args) 647a9643ea8Slogwang { 648a9643ea8Slogwang /* 649a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 650a9643ea8Slogwang MicroThread* daemon = mtframe->DaemonThread(); 651a9643ea8Slogwang 652a9643ea8Slogwang while (true) { 653a9643ea8Slogwang mtframe->KqueueDispatch(); 654a9643ea8Slogwang mtframe->SetLastClock(mtframe->GetSystemMS()); 655a9643ea8Slogwang mtframe->WakeupTimeout(); 656a9643ea8Slogwang mtframe->CheckExpired(); 657a9643ea8Slogwang daemon->SwitchContext(); 658a9643ea8Slogwang } 659a9643ea8Slogwang */ 660a9643ea8Slogwang ff_run(MtFrame::Loop, NULL); 661a9643ea8Slogwang } 662a9643ea8Slogwang 663a9643ea8Slogwang MicroThread *MtFrame::GetRootThread() 664a9643ea8Slogwang { 665a9643ea8Slogwang if (NULL == _curr_thread) 666a9643ea8Slogwang { 667a9643ea8Slogwang return NULL; 668a9643ea8Slogwang } 669a9643ea8Slogwang 670a9643ea8Slogwang MicroThread::ThreadType type = _curr_thread->GetType(); 671a9643ea8Slogwang MicroThread *thread = _curr_thread; 672a9643ea8Slogwang MicroThread *parent = thread; 673a9643ea8Slogwang 674a9643ea8Slogwang while (MicroThread::SUB_THREAD == type) 675a9643ea8Slogwang { 676a9643ea8Slogwang thread = thread->GetParent(); 677a9643ea8Slogwang if (!thread) 678a9643ea8Slogwang { 679a9643ea8Slogwang break; 680a9643ea8Slogwang } 681a9643ea8Slogwang 682a9643ea8Slogwang type = thread->GetType(); 683a9643ea8Slogwang parent = thread; 684a9643ea8Slogwang } 685a9643ea8Slogwang 686a9643ea8Slogwang return parent; 687a9643ea8Slogwang } 688a9643ea8Slogwang 689a9643ea8Slogwang void MtFrame::ThreadSchdule() 690a9643ea8Slogwang { 691a9643ea8Slogwang MicroThread* thread = NULL; 692a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 693a9643ea8Slogwang 694a9643ea8Slogwang if (mtframe->_runlist.empty()) 695a9643ea8Slogwang { 696a9643ea8Slogwang thread = mtframe->DaemonThread(); 697a9643ea8Slogwang } 698a9643ea8Slogwang else 699a9643ea8Slogwang { 700a9643ea8Slogwang thread = mtframe->_runlist.front(); 701a9643ea8Slogwang mtframe->RemoveRunable(thread); 702a9643ea8Slogwang } 703a9643ea8Slogwang 704a9643ea8Slogwang this->SetActiveThread(thread); 705a9643ea8Slogwang thread->SetState(MicroThread::RUNNING); 706a9643ea8Slogwang thread->RestoreContext(); 707a9643ea8Slogwang } 708a9643ea8Slogwang 709a9643ea8Slogwang void MtFrame::CheckExpired() 710a9643ea8Slogwang { 711a9643ea8Slogwang static utime64_t check_time = 0; 712a9643ea8Slogwang 713a9643ea8Slogwang if (_timer != NULL) 714a9643ea8Slogwang { 715a9643ea8Slogwang _timer->check_expired(); 716a9643ea8Slogwang } 717a9643ea8Slogwang 718a9643ea8Slogwang utime64_t now = GetLastClock(); 719a9643ea8Slogwang 720a9643ea8Slogwang if ((now - check_time) > 1000) 721a9643ea8Slogwang { 722a9643ea8Slogwang CNetMgr::Instance()->RecycleObjs(now); 723a9643ea8Slogwang check_time = now; 724a9643ea8Slogwang } 725a9643ea8Slogwang } 726a9643ea8Slogwang 727a9643ea8Slogwang void MtFrame::WakeupTimeout() 728a9643ea8Slogwang { 729a9643ea8Slogwang utime64_t now = GetLastClock(); 730a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 731a9643ea8Slogwang while (thread && (thread->GetWakeupTime() <= now)) 732a9643ea8Slogwang { 733a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST)) 734a9643ea8Slogwang { 735a9643ea8Slogwang RemoveIoWait(thread); 736a9643ea8Slogwang } 737a9643ea8Slogwang else 738a9643ea8Slogwang { 739a9643ea8Slogwang RemoveSleep(thread); 740a9643ea8Slogwang } 741a9643ea8Slogwang 742a9643ea8Slogwang InsertRunable(thread); 743a9643ea8Slogwang 744a9643ea8Slogwang thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 745a9643ea8Slogwang } 746a9643ea8Slogwang } 747a9643ea8Slogwang 748a9643ea8Slogwang int MtFrame::KqueueGetTimeout() 749a9643ea8Slogwang { 750a9643ea8Slogwang utime64_t now = GetLastClock(); 751a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 752a9643ea8Slogwang if (!thread) 753a9643ea8Slogwang { 754*5ac59bc4Slogwang return 10; //default 10ms epollwait 755a9643ea8Slogwang } 756a9643ea8Slogwang else if (thread->GetWakeupTime() < now) 757a9643ea8Slogwang { 758a9643ea8Slogwang return 0; 759a9643ea8Slogwang } 760a9643ea8Slogwang else 761a9643ea8Slogwang { 762a9643ea8Slogwang return (int)(thread->GetWakeupTime() - now); 763a9643ea8Slogwang } 764a9643ea8Slogwang } 765a9643ea8Slogwang 766a9643ea8Slogwang inline void MtFrame::InsertSleep(MicroThread* thread) 767a9643ea8Slogwang { 768a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST)); 769a9643ea8Slogwang 770a9643ea8Slogwang thread->SetFlag(MicroThread::SLEEP_LIST); 771a9643ea8Slogwang thread->SetState(MicroThread::SLEEPING); 772a9643ea8Slogwang int rc = _sleeplist.HeapPush(thread); 773a9643ea8Slogwang if (rc < 0) 774a9643ea8Slogwang { 775a9643ea8Slogwang MT_ATTR_API(320848, 1); // heap error 776a9643ea8Slogwang MTLOG_ERROR("Insert heap failed , rc %d", rc); 777a9643ea8Slogwang } 778a9643ea8Slogwang } 779a9643ea8Slogwang 780a9643ea8Slogwang inline void MtFrame::RemoveSleep(MicroThread* thread) 781a9643ea8Slogwang { 782a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST)); 783a9643ea8Slogwang thread->UnsetFlag(MicroThread::SLEEP_LIST); 784a9643ea8Slogwang 785a9643ea8Slogwang int rc = _sleeplist.HeapDelete(thread); 786a9643ea8Slogwang if (rc < 0) 787a9643ea8Slogwang { 788a9643ea8Slogwang MT_ATTR_API(320849, 1); // heap error 789a9643ea8Slogwang MTLOG_ERROR("remove heap failed , rc %d", rc); 790a9643ea8Slogwang } 791a9643ea8Slogwang } 792a9643ea8Slogwang 793a9643ea8Slogwang inline void MtFrame::InsertIoWait(MicroThread* thread) 794a9643ea8Slogwang { 795a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::IO_LIST)); 796a9643ea8Slogwang thread->SetFlag(MicroThread::IO_LIST); 797a9643ea8Slogwang TAILQ_INSERT_TAIL(&_iolist, thread, _entry); 798a9643ea8Slogwang InsertSleep(thread); 799a9643ea8Slogwang } 800a9643ea8Slogwang 801a9643ea8Slogwang void MtFrame::RemoveIoWait(MicroThread* thread) 802a9643ea8Slogwang { 803a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::IO_LIST)); 804a9643ea8Slogwang thread->UnsetFlag(MicroThread::IO_LIST); 805a9643ea8Slogwang TAILQ_REMOVE(&_iolist, thread, _entry); 806a9643ea8Slogwang 807a9643ea8Slogwang RemoveSleep(thread); 808a9643ea8Slogwang } 809a9643ea8Slogwang 810a9643ea8Slogwang void MtFrame::InsertRunable(MicroThread* thread) 811a9643ea8Slogwang { 812a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::RUN_LIST)); 813a9643ea8Slogwang thread->SetFlag(MicroThread::RUN_LIST); 814a9643ea8Slogwang 815a9643ea8Slogwang thread->SetState(MicroThread::RUNABLE); 816a9643ea8Slogwang _runlist.push(thread); 817a9643ea8Slogwang _waitnum++; 818a9643ea8Slogwang } 819a9643ea8Slogwang 820a9643ea8Slogwang inline void MtFrame::RemoveRunable(MicroThread* thread) 821a9643ea8Slogwang { 822a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::RUN_LIST)); 823a9643ea8Slogwang ASSERT(thread == _runlist.front()); 824a9643ea8Slogwang thread->UnsetFlag(MicroThread::RUN_LIST); 825a9643ea8Slogwang 826a9643ea8Slogwang _runlist.pop(); 827a9643ea8Slogwang _waitnum--; 828a9643ea8Slogwang } 829a9643ea8Slogwang 830a9643ea8Slogwang void MtFrame::InsertPend(MicroThread* thread) 831a9643ea8Slogwang { 832a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::PEND_LIST)); 833a9643ea8Slogwang thread->SetFlag(MicroThread::PEND_LIST); 834a9643ea8Slogwang TAILQ_INSERT_TAIL(&_pend_list, thread, _entry); 835a9643ea8Slogwang thread->SetState(MicroThread::PENDING); 836a9643ea8Slogwang } 837a9643ea8Slogwang 838a9643ea8Slogwang void MtFrame::RemovePend(MicroThread* thread) 839a9643ea8Slogwang { 840a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::PEND_LIST)); 841a9643ea8Slogwang thread->UnsetFlag(MicroThread::PEND_LIST); 842a9643ea8Slogwang TAILQ_REMOVE(&_pend_list, thread, _entry); 843a9643ea8Slogwang } 844a9643ea8Slogwang 845a9643ea8Slogwang void MtFrame::WaitNotify(utime64_t timeout) 846a9643ea8Slogwang { 847a9643ea8Slogwang MicroThread* thread = GetActiveThread(); 848a9643ea8Slogwang 849a9643ea8Slogwang thread->SetWakeupTime(timeout + this->GetLastClock()); 850a9643ea8Slogwang this->InsertIoWait(thread); 851a9643ea8Slogwang thread->SwitchContext(); 852a9643ea8Slogwang } 853a9643ea8Slogwang 854a9643ea8Slogwang bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) 855a9643ea8Slogwang { 856a9643ea8Slogwang MicroThread* thread = GetActiveThread(); 857a9643ea8Slogwang if (NULL == thread) 858a9643ea8Slogwang { 859a9643ea8Slogwang MTLOG_ERROR("active thread null, epoll schedule failed"); 860a9643ea8Slogwang return false; 861a9643ea8Slogwang } 862a9643ea8Slogwang 863a9643ea8Slogwang thread->ClearAllFd(); 864a9643ea8Slogwang if (fdlist) 865a9643ea8Slogwang { 866a9643ea8Slogwang thread->AddFdList(fdlist); 867a9643ea8Slogwang } 868a9643ea8Slogwang if (fd) 869a9643ea8Slogwang { 870a9643ea8Slogwang thread->AddFd(fd); 871a9643ea8Slogwang } 872a9643ea8Slogwang 873a9643ea8Slogwang thread->SetWakeupTime(timeout + this->GetLastClock()); 874a9643ea8Slogwang if (!this->KqueueAdd(thread->GetFdSet())) 875a9643ea8Slogwang { 876a9643ea8Slogwang MTLOG_ERROR("epoll add failed, errno: %d", errno); 877a9643ea8Slogwang return false; 878a9643ea8Slogwang } 879a9643ea8Slogwang this->InsertIoWait(thread); 880a9643ea8Slogwang thread->SwitchContext(); 881a9643ea8Slogwang 882a9643ea8Slogwang int rcvnum = 0; 883a9643ea8Slogwang KqObjList& rcvfds = thread->GetFdSet(); 884a9643ea8Slogwang KqueuerObj* fdata = NULL; 885a9643ea8Slogwang TAILQ_FOREACH(fdata, &rcvfds, _entry) 886a9643ea8Slogwang { 887a9643ea8Slogwang if (fdata->GetRcvEvents() != 0) 888a9643ea8Slogwang { 889a9643ea8Slogwang rcvnum++; 890a9643ea8Slogwang } 891a9643ea8Slogwang } 892*5ac59bc4Slogwang this->KqueueDel(rcvfds); 893a9643ea8Slogwang 894*5ac59bc4Slogwang if (rcvnum == 0) 895a9643ea8Slogwang { 896a9643ea8Slogwang errno = ETIME; 897a9643ea8Slogwang return false; 898a9643ea8Slogwang } 899a9643ea8Slogwang 900a9643ea8Slogwang return true; 901a9643ea8Slogwang } 902a9643ea8Slogwang 903a9643ea8Slogwang int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 904a9643ea8Slogwang { 905a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 906a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 907a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 908a9643ea8Slogwang utime64_t now = 0; 909a9643ea8Slogwang 910a9643ea8Slogwang if(fd<0 || !buf || len<1) 911a9643ea8Slogwang { 912a9643ea8Slogwang errno = EINVAL; 913a9643ea8Slogwang MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno); 914a9643ea8Slogwang return -10; 915a9643ea8Slogwang } 916a9643ea8Slogwang 917a9643ea8Slogwang if (timeout <= -1) 918a9643ea8Slogwang { 919a9643ea8Slogwang timeout = 0x7fffffff; 920a9643ea8Slogwang } 921a9643ea8Slogwang 922a9643ea8Slogwang while (true) 923a9643ea8Slogwang { 924a9643ea8Slogwang now = mtframe->GetLastClock(); 925a9643ea8Slogwang if ((int)(now - start) > timeout) 926a9643ea8Slogwang { 927a9643ea8Slogwang errno = ETIME; 928a9643ea8Slogwang return -1; 929a9643ea8Slogwang } 930a9643ea8Slogwang 931a9643ea8Slogwang KqueuerObj epfd; 932a9643ea8Slogwang epfd.SetOsfd(fd); 933a9643ea8Slogwang epfd.EnableInput(); 934a9643ea8Slogwang epfd.SetOwnerThread(thread); 935a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 936a9643ea8Slogwang { 937a9643ea8Slogwang MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 938a9643ea8Slogwang return -2; 939a9643ea8Slogwang } 940a9643ea8Slogwang 941a9643ea8Slogwang mt_hook_syscall(recvfrom); 942a9643ea8Slogwang int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen); 943a9643ea8Slogwang if (n < 0) 944a9643ea8Slogwang { 945a9643ea8Slogwang if (errno == EINTR) { 946a9643ea8Slogwang continue; 947a9643ea8Slogwang } 948a9643ea8Slogwang 949a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 950a9643ea8Slogwang { 951a9643ea8Slogwang MTLOG_ERROR("recvfrom failed, errno: %d", errno); 952a9643ea8Slogwang return -3; 953a9643ea8Slogwang } 954a9643ea8Slogwang } 955a9643ea8Slogwang else 956a9643ea8Slogwang { 957a9643ea8Slogwang return n; 958a9643ea8Slogwang } 959a9643ea8Slogwang } 960a9643ea8Slogwang 961a9643ea8Slogwang } 962a9643ea8Slogwang 963a9643ea8Slogwang int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 964a9643ea8Slogwang { 965a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 966a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 967a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 968a9643ea8Slogwang utime64_t now = 0; 969a9643ea8Slogwang 970a9643ea8Slogwang if(fd<0 || !msg || len<1) 971a9643ea8Slogwang { 972a9643ea8Slogwang errno = EINVAL; 973a9643ea8Slogwang MTLOG_ERROR("sendto failed, errno: %d (%m)", errno); 974a9643ea8Slogwang return -10; 975a9643ea8Slogwang } 976a9643ea8Slogwang 977a9643ea8Slogwang int n = 0; 978a9643ea8Slogwang mt_hook_syscall(sendto); 979a9643ea8Slogwang while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0) 980a9643ea8Slogwang { 981a9643ea8Slogwang now = mtframe->GetLastClock(); 982a9643ea8Slogwang if ((int)(now - start) > timeout) 983a9643ea8Slogwang { 984a9643ea8Slogwang errno = ETIME; 985a9643ea8Slogwang return -1; 986a9643ea8Slogwang } 987a9643ea8Slogwang 988a9643ea8Slogwang if (errno == EINTR) { 989a9643ea8Slogwang continue; 990a9643ea8Slogwang } 991a9643ea8Slogwang 992a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 993a9643ea8Slogwang MTLOG_ERROR("sendto failed, errno: %d", errno); 994a9643ea8Slogwang return -2; 995a9643ea8Slogwang } 996a9643ea8Slogwang 997a9643ea8Slogwang KqueuerObj epfd; 998a9643ea8Slogwang epfd.SetOsfd(fd); 999a9643ea8Slogwang epfd.EnableOutput(); 1000a9643ea8Slogwang epfd.SetOwnerThread(thread); 1001a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1002a9643ea8Slogwang return -3; 1003a9643ea8Slogwang } 1004a9643ea8Slogwang } 1005a9643ea8Slogwang 1006a9643ea8Slogwang return n; 1007a9643ea8Slogwang } 1008a9643ea8Slogwang 1009a9643ea8Slogwang int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 1010a9643ea8Slogwang { 1011a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1012a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1013a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1014a9643ea8Slogwang utime64_t now = 0; 1015a9643ea8Slogwang 1016a9643ea8Slogwang if(fd<0 || !addr || addrlen<1) 1017a9643ea8Slogwang { 1018a9643ea8Slogwang errno = EINVAL; 1019a9643ea8Slogwang MTLOG_ERROR("connect failed, errno: %d (%m)", errno); 1020a9643ea8Slogwang return -10; 1021a9643ea8Slogwang } 1022a9643ea8Slogwang 1023a9643ea8Slogwang int n = 0; 1024a9643ea8Slogwang mt_hook_syscall(connect); 1025a9643ea8Slogwang while ((n = ff_hook_connect(fd, addr, addrlen)) < 0) 1026a9643ea8Slogwang { 1027a9643ea8Slogwang now = mtframe->GetLastClock(); 1028a9643ea8Slogwang if ((int)(now - start) > timeout) 1029a9643ea8Slogwang { 1030a9643ea8Slogwang errno = ETIME; 1031a9643ea8Slogwang return -1; 1032a9643ea8Slogwang } 1033a9643ea8Slogwang 1034*5ac59bc4Slogwang if (errno == EISCONN) 1035a9643ea8Slogwang { 1036a9643ea8Slogwang return 0; 1037a9643ea8Slogwang } 1038a9643ea8Slogwang 1039a9643ea8Slogwang if (errno == EINTR) { 1040a9643ea8Slogwang continue; 1041a9643ea8Slogwang } 1042a9643ea8Slogwang 1043a9643ea8Slogwang if (errno != EINPROGRESS) { 1044a9643ea8Slogwang MTLOG_ERROR("connect failed, errno: %d", errno); 1045a9643ea8Slogwang return -2; 1046a9643ea8Slogwang } 1047a9643ea8Slogwang 1048a9643ea8Slogwang KqueuerObj epfd; 1049a9643ea8Slogwang epfd.SetOsfd(fd); 1050a9643ea8Slogwang epfd.EnableOutput(); 1051a9643ea8Slogwang epfd.SetOwnerThread(thread); 1052a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1053a9643ea8Slogwang return -3; 1054a9643ea8Slogwang } 1055a9643ea8Slogwang } 1056a9643ea8Slogwang 1057a9643ea8Slogwang return n; 1058a9643ea8Slogwang } 1059a9643ea8Slogwang 1060a9643ea8Slogwang int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 1061a9643ea8Slogwang { 1062a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1063a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1064a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1065a9643ea8Slogwang utime64_t now = 0; 1066a9643ea8Slogwang 1067a9643ea8Slogwang if(fd<0) 1068a9643ea8Slogwang { 1069a9643ea8Slogwang errno = EINVAL; 1070a9643ea8Slogwang MTLOG_ERROR("accept failed, errno: %d (%m)", errno); 1071a9643ea8Slogwang return -10; 1072a9643ea8Slogwang } 1073a9643ea8Slogwang 1074a9643ea8Slogwang int acceptfd = 0; 1075a9643ea8Slogwang mt_hook_syscall(accept); 1076a9643ea8Slogwang while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0) 1077a9643ea8Slogwang { 1078a9643ea8Slogwang now = mtframe->GetLastClock(); 1079a9643ea8Slogwang if ((int)(now - start) > timeout) 1080a9643ea8Slogwang { 1081a9643ea8Slogwang errno = ETIME; 1082a9643ea8Slogwang return -1; 1083a9643ea8Slogwang } 1084a9643ea8Slogwang 1085a9643ea8Slogwang if (errno == EINTR) { 1086a9643ea8Slogwang continue; 1087a9643ea8Slogwang } 1088a9643ea8Slogwang 1089a9643ea8Slogwang if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) { 1090a9643ea8Slogwang MTLOG_ERROR("accept failed, errno: %d", errno); 1091a9643ea8Slogwang return -2; 1092a9643ea8Slogwang } 1093a9643ea8Slogwang 1094a9643ea8Slogwang KqueuerObj epfd; 1095a9643ea8Slogwang epfd.SetOsfd(fd); 1096a9643ea8Slogwang epfd.EnableInput(); 1097a9643ea8Slogwang epfd.SetOwnerThread(thread); 1098a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1099a9643ea8Slogwang return -3; 1100a9643ea8Slogwang } 1101a9643ea8Slogwang } 1102a9643ea8Slogwang 1103a9643ea8Slogwang return acceptfd; 1104a9643ea8Slogwang } 1105a9643ea8Slogwang 1106a9643ea8Slogwang ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout) 1107a9643ea8Slogwang { 1108a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1109a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1110a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1111a9643ea8Slogwang utime64_t now = 0; 1112a9643ea8Slogwang 1113a9643ea8Slogwang if(fd<0 || !buf || nbyte<1) 1114a9643ea8Slogwang { 1115a9643ea8Slogwang errno = EINVAL; 1116a9643ea8Slogwang MTLOG_ERROR("read failed, errno: %d (%m)", errno); 1117a9643ea8Slogwang return -10; 1118a9643ea8Slogwang } 1119a9643ea8Slogwang 1120a9643ea8Slogwang ssize_t n = 0; 1121a9643ea8Slogwang mt_hook_syscall(read); 1122a9643ea8Slogwang while ((n = ff_hook_read(fd, buf, nbyte)) < 0) 1123a9643ea8Slogwang { 1124a9643ea8Slogwang now = mtframe->GetLastClock(); 1125a9643ea8Slogwang if ((int)(now - start) > timeout) 1126a9643ea8Slogwang { 1127a9643ea8Slogwang errno = ETIME; 1128a9643ea8Slogwang return -1; 1129a9643ea8Slogwang } 1130a9643ea8Slogwang 1131a9643ea8Slogwang if (errno == EINTR) { 1132a9643ea8Slogwang continue; 1133a9643ea8Slogwang } 1134a9643ea8Slogwang 1135a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1136a9643ea8Slogwang MTLOG_ERROR("read failed, errno: %d", errno); 1137a9643ea8Slogwang return -2; 1138a9643ea8Slogwang } 1139a9643ea8Slogwang 1140a9643ea8Slogwang KqueuerObj epfd; 1141a9643ea8Slogwang epfd.SetOsfd(fd); 1142a9643ea8Slogwang epfd.EnableInput(); 1143a9643ea8Slogwang epfd.SetOwnerThread(thread); 1144a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1145a9643ea8Slogwang return -3; 1146a9643ea8Slogwang } 1147a9643ea8Slogwang } 1148a9643ea8Slogwang 1149a9643ea8Slogwang return n; 1150a9643ea8Slogwang } 1151a9643ea8Slogwang 1152a9643ea8Slogwang ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout) 1153a9643ea8Slogwang { 1154a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1155a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1156a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1157a9643ea8Slogwang utime64_t now = 0; 1158a9643ea8Slogwang 1159a9643ea8Slogwang if(fd<0 || !buf || nbyte<1) 1160a9643ea8Slogwang { 1161a9643ea8Slogwang errno = EINVAL; 1162a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d (%m)", errno); 1163a9643ea8Slogwang return -10; 1164a9643ea8Slogwang } 1165a9643ea8Slogwang 1166a9643ea8Slogwang ssize_t n = 0; 1167a9643ea8Slogwang size_t send_len = 0; 1168a9643ea8Slogwang while (send_len < nbyte) 1169a9643ea8Slogwang { 1170a9643ea8Slogwang now = mtframe->GetLastClock(); 1171a9643ea8Slogwang if ((int)(now - start) > timeout) 1172a9643ea8Slogwang { 1173a9643ea8Slogwang errno = ETIME; 1174a9643ea8Slogwang return -1; 1175a9643ea8Slogwang } 1176a9643ea8Slogwang 1177a9643ea8Slogwang mt_hook_syscall(write); 1178a9643ea8Slogwang n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len); 1179a9643ea8Slogwang if (n < 0) 1180a9643ea8Slogwang { 1181a9643ea8Slogwang if (errno == EINTR) { 1182a9643ea8Slogwang continue; 1183a9643ea8Slogwang } 1184a9643ea8Slogwang 1185a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1186a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d", errno); 1187a9643ea8Slogwang return -2; 1188a9643ea8Slogwang } 1189a9643ea8Slogwang } 1190a9643ea8Slogwang else 1191a9643ea8Slogwang { 1192a9643ea8Slogwang send_len += n; 1193a9643ea8Slogwang if (send_len >= nbyte) { 1194a9643ea8Slogwang return nbyte; 1195a9643ea8Slogwang } 1196a9643ea8Slogwang } 1197a9643ea8Slogwang 1198a9643ea8Slogwang KqueuerObj epfd; 1199a9643ea8Slogwang epfd.SetOsfd(fd); 1200a9643ea8Slogwang epfd.EnableOutput(); 1201a9643ea8Slogwang epfd.SetOwnerThread(thread); 1202a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1203a9643ea8Slogwang return -3; 1204a9643ea8Slogwang } 1205a9643ea8Slogwang } 1206a9643ea8Slogwang 1207a9643ea8Slogwang return nbyte; 1208a9643ea8Slogwang } 1209a9643ea8Slogwang 1210a9643ea8Slogwang int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout) 1211a9643ea8Slogwang { 1212a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1213a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1214a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1215a9643ea8Slogwang utime64_t now = 0; 1216a9643ea8Slogwang 1217a9643ea8Slogwang if(fd<0 || !buf || len<1) 1218a9643ea8Slogwang { 1219a9643ea8Slogwang errno = EINVAL; 1220a9643ea8Slogwang MTLOG_ERROR("recv failed, errno: %d (%m)", errno); 1221a9643ea8Slogwang return -10; 1222a9643ea8Slogwang } 1223a9643ea8Slogwang 1224a9643ea8Slogwang if (timeout <= -1) 1225a9643ea8Slogwang { 1226a9643ea8Slogwang timeout = 0x7fffffff; 1227a9643ea8Slogwang } 1228a9643ea8Slogwang 1229a9643ea8Slogwang while (true) 1230a9643ea8Slogwang { 1231a9643ea8Slogwang now = mtframe->GetLastClock(); 1232a9643ea8Slogwang if ((int)(now - start) > timeout) 1233a9643ea8Slogwang { 1234a9643ea8Slogwang errno = ETIME; 1235a9643ea8Slogwang return -1; 1236a9643ea8Slogwang } 1237a9643ea8Slogwang 1238a9643ea8Slogwang KqueuerObj epfd; 1239a9643ea8Slogwang epfd.SetOsfd(fd); 1240a9643ea8Slogwang epfd.EnableInput(); 1241a9643ea8Slogwang epfd.SetOwnerThread(thread); 1242a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1243a9643ea8Slogwang { 1244a9643ea8Slogwang MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 1245a9643ea8Slogwang return -2; 1246a9643ea8Slogwang } 1247a9643ea8Slogwang 1248a9643ea8Slogwang mt_hook_syscall(recv); 1249a9643ea8Slogwang int n = ff_hook_recv(fd, buf, len, flags); 1250a9643ea8Slogwang if (n < 0) 1251a9643ea8Slogwang { 1252a9643ea8Slogwang if (errno == EINTR) { 1253a9643ea8Slogwang continue; 1254a9643ea8Slogwang } 1255a9643ea8Slogwang 1256a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1257a9643ea8Slogwang { 1258a9643ea8Slogwang MTLOG_ERROR("recv failed, errno: %d", errno); 1259a9643ea8Slogwang return -3; 1260a9643ea8Slogwang } 1261a9643ea8Slogwang } 1262a9643ea8Slogwang else 1263a9643ea8Slogwang { 1264a9643ea8Slogwang return n; 1265a9643ea8Slogwang } 1266a9643ea8Slogwang } 1267a9643ea8Slogwang 1268a9643ea8Slogwang } 1269a9643ea8Slogwang 1270a9643ea8Slogwang ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 1271a9643ea8Slogwang { 1272a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1273a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1274a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1275a9643ea8Slogwang utime64_t now = 0; 1276a9643ea8Slogwang 1277a9643ea8Slogwang if(fd<0 || !buf || nbyte<1) 1278a9643ea8Slogwang { 1279a9643ea8Slogwang errno = EINVAL; 1280a9643ea8Slogwang MTLOG_ERROR("send failed, errno: %d (%m)", errno); 1281a9643ea8Slogwang return -10; 1282a9643ea8Slogwang } 1283a9643ea8Slogwang 1284a9643ea8Slogwang ssize_t n = 0; 1285a9643ea8Slogwang size_t send_len = 0; 1286a9643ea8Slogwang while (send_len < nbyte) 1287a9643ea8Slogwang { 1288a9643ea8Slogwang now = mtframe->GetLastClock(); 1289a9643ea8Slogwang if ((int)(now - start) > timeout) 1290a9643ea8Slogwang { 1291a9643ea8Slogwang errno = ETIME; 1292a9643ea8Slogwang return -1; 1293a9643ea8Slogwang } 1294a9643ea8Slogwang 1295a9643ea8Slogwang mt_hook_syscall(send); 1296a9643ea8Slogwang n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags); 1297a9643ea8Slogwang if (n < 0) 1298a9643ea8Slogwang { 1299a9643ea8Slogwang if (errno == EINTR) { 1300a9643ea8Slogwang continue; 1301a9643ea8Slogwang } 1302a9643ea8Slogwang 1303a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1304a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d", errno); 1305a9643ea8Slogwang return -2; 1306a9643ea8Slogwang } 1307a9643ea8Slogwang } 1308a9643ea8Slogwang else 1309a9643ea8Slogwang { 1310a9643ea8Slogwang send_len += n; 1311a9643ea8Slogwang if (send_len >= nbyte) { 1312a9643ea8Slogwang return nbyte; 1313a9643ea8Slogwang } 1314a9643ea8Slogwang } 1315a9643ea8Slogwang 1316a9643ea8Slogwang KqueuerObj epfd; 1317a9643ea8Slogwang epfd.SetOsfd(fd); 1318a9643ea8Slogwang epfd.EnableOutput(); 1319a9643ea8Slogwang epfd.SetOwnerThread(thread); 1320a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1321a9643ea8Slogwang return -3; 1322a9643ea8Slogwang } 1323a9643ea8Slogwang } 1324a9643ea8Slogwang 1325a9643ea8Slogwang return nbyte; 1326a9643ea8Slogwang } 1327a9643ea8Slogwang 1328a9643ea8Slogwang void MtFrame::sleep(int ms) 1329a9643ea8Slogwang { 1330a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 1331a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 1332a9643ea8Slogwang if (thread != NULL) 1333a9643ea8Slogwang { 1334a9643ea8Slogwang thread->sleep(ms); 1335a9643ea8Slogwang } 1336a9643ea8Slogwang } 1337a9643ea8Slogwang 1338a9643ea8Slogwang int MtFrame::WaitEvents(int fd, int events, int timeout) 1339a9643ea8Slogwang { 1340a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1341a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1342a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1343a9643ea8Slogwang utime64_t now = 0; 1344a9643ea8Slogwang 1345a9643ea8Slogwang if (timeout <= -1) 1346a9643ea8Slogwang { 1347a9643ea8Slogwang timeout = 0x7fffffff; 1348a9643ea8Slogwang } 1349a9643ea8Slogwang 1350a9643ea8Slogwang while (true) 1351a9643ea8Slogwang { 1352a9643ea8Slogwang now = mtframe->GetLastClock(); 1353a9643ea8Slogwang if ((int)(now - start) > timeout) 1354a9643ea8Slogwang { 1355a9643ea8Slogwang errno = ETIME; 1356a9643ea8Slogwang return 0; 1357a9643ea8Slogwang } 1358a9643ea8Slogwang 1359a9643ea8Slogwang KqueuerObj epfd; 1360a9643ea8Slogwang epfd.SetOsfd(fd); 1361a9643ea8Slogwang if (events & KQ_EVENT_READ) 1362a9643ea8Slogwang { 1363a9643ea8Slogwang epfd.EnableInput(); 1364a9643ea8Slogwang } 1365a9643ea8Slogwang if (events & KQ_EVENT_WRITE) 1366a9643ea8Slogwang { 1367a9643ea8Slogwang epfd.EnableOutput(); 1368a9643ea8Slogwang } 1369a9643ea8Slogwang epfd.SetOwnerThread(thread); 1370a9643ea8Slogwang 1371a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1372a9643ea8Slogwang { 1373a9643ea8Slogwang MTLOG_TRACE("epoll schedule failed, errno: %d", errno); 1374a9643ea8Slogwang return 0; 1375a9643ea8Slogwang } 1376a9643ea8Slogwang 1377a9643ea8Slogwang return epfd.GetRcvEvents(); 1378a9643ea8Slogwang } 1379a9643ea8Slogwang } 1380