1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename micro_thread.cpp 22 * @info micro thread manager 23 */ 24 #include "mt_version.h" 25 #include "micro_thread.h" 26 #include "mt_net.h" 27 #include "valgrind.h" 28 #include <assert.h> 29 #include "mt_sys_hook.h" 30 #include "ff_hook.h" 31 #include "ff_api.h" 32 33 using namespace NS_MICRO_THREAD; 34 35 #define ASSERT(statement) 36 //#define ASSERT(statement) assert(statement) 37 38 extern "C" int save_context(jmp_buf jbf); 39 40 extern "C" void restore_context(jmp_buf jbf, int ret); 41 42 extern "C" void replace_esp(jmp_buf jbf, void* esp); 43 44 Thread::Thread(int stack_size) 45 { 46 _stack_size = stack_size ? stack_size : ThreadPool::default_stack_size; 47 _wakeup_time = 0; 48 _stack = NULL; 49 memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 50 } 51 52 53 /** 54 * @brief LINUX x86/x86_64's allocated stacks. 55 */ 56 bool Thread::InitStack() 57 { 58 if (_stack) { 59 return true; 60 } 61 62 ///< stack index and memory are separated to prevent out of bounds. 63 _stack = (MtStack*)calloc(1, sizeof(MtStack)); 64 if (NULL == _stack) 65 { 66 MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack)); 67 return false; 68 } 69 70 int memsize = MEM_PAGE_SIZE*2 + _stack_size; 71 memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE; 72 73 static int zero_fd = -1; 74 int mmap_flags = MAP_PRIVATE | MAP_ANON; 75 void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0); 76 if (vaddr == (void *)MAP_FAILED) 77 { 78 MTLOG_ERROR("mmap stack failed, size %d", memsize); 79 free(_stack); 80 _stack = NULL; 81 return false; 82 } 83 _stack->_vaddr = (char*)vaddr; 84 _stack->_vaddr_size = memsize; 85 _stack->_stk_size = _stack_size; 86 _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE; 87 _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size; 88 // valgrind support: register stack frame 89 _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top); 90 91 _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE; 92 93 mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE); 94 mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE); 95 96 return true; 97 } 98 99 100 void Thread::FreeStack() 101 { 102 if (!_stack) { 103 return; 104 } 105 munmap(_stack->_vaddr, _stack->_vaddr_size); 106 // valgrind support: deregister stack frame 107 VALGRIND_STACK_DEREGISTER(_stack->valgrind_id); 108 free(_stack); 109 _stack = NULL; 110 } 111 112 void Thread::InitContext() 113 { 114 if (save_context(_jmpbuf) != 0) 115 { 116 ScheduleObj::Instance()->ScheduleStartRun(); 117 } 118 119 if (_stack != NULL) 120 { 121 replace_esp(_jmpbuf, _stack->_esp); 122 } 123 } 124 125 void Thread::SwitchContext() 126 { 127 if (save_context(_jmpbuf) == 0) 128 { 129 ScheduleObj::Instance()->ScheduleThread(); 130 } 131 } 132 133 void Thread::RestoreContext() 134 { 135 restore_context(_jmpbuf, 1); 136 } 137 138 139 bool Thread::Initial() 140 { 141 if (!InitStack()) 142 { 143 MTLOG_ERROR("init stack failed"); 144 return false; 145 } 146 147 InitContext(); 148 149 return true; 150 } 151 152 void Thread::Destroy() 153 { 154 FreeStack(); 155 memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 156 } 157 158 void Thread::Reset() 159 { 160 _wakeup_time = 0; 161 SetPrivate(NULL); 162 163 InitContext(); 164 CleanState(); 165 } 166 167 void Thread::sleep(int ms) 168 { 169 utime64_t now = ScheduleObj::Instance()->ScheduleGetTime(); 170 _wakeup_time = now + ms; 171 172 if (save_context(_jmpbuf) == 0) 173 { 174 ScheduleObj::Instance()->ScheduleSleep(); 175 } 176 } 177 178 void Thread::Wait() 179 { 180 if (save_context(_jmpbuf) == 0) 181 { 182 ScheduleObj::Instance()->SchedulePend(); 183 } 184 } 185 186 bool Thread::CheckStackHealth(char *esp) 187 { 188 if (!_stack) 189 return false; 190 191 if (esp > _stack->_stk_bottom && esp < _stack->_stk_top) 192 return true; 193 else 194 return false; 195 } 196 197 MicroThread::MicroThread(ThreadType type) 198 { 199 memset(&_entry, 0, sizeof(_entry)); 200 TAILQ_INIT(&_fdset); 201 TAILQ_INIT(&_sub_list); 202 _flag = NOT_INLIST; 203 _type = type; 204 _state = INITIAL; 205 _start = NULL; 206 _args = NULL; 207 _parent = NULL; 208 } 209 210 void MicroThread::CleanState() 211 { 212 TAILQ_INIT(&_fdset); 213 TAILQ_INIT(&_sub_list); 214 _flag = NOT_INLIST; 215 _type = NORMAL; 216 _state = INITIAL; 217 _start = NULL; 218 _args = NULL; 219 _parent = NULL; 220 } 221 222 void MicroThread::Run() 223 { 224 if (_start) { 225 _start(_args); 226 } 227 228 if (this->IsSubThread()) { 229 this->WakeupParent(); 230 } 231 232 ScheduleObj::Instance()->ScheduleReclaim(); 233 ScheduleObj::Instance()->ScheduleThread(); 234 } 235 236 void MicroThread::WakeupParent() 237 { 238 MicroThread* parent = this->GetParent(); 239 if (parent) 240 { 241 parent->RemoveSubThread(this); 242 if (parent->HasNoSubThread()) 243 { 244 ScheduleObj::Instance()->ScheduleUnpend(parent); 245 } 246 } 247 else 248 { 249 MTLOG_ERROR("Sub thread no parent, error"); 250 } 251 } 252 253 bool MicroThread::HasNoSubThread() 254 { 255 return TAILQ_EMPTY(&_sub_list); 256 } 257 258 void MicroThread::AddSubThread(MicroThread* sub) 259 { 260 ASSERT(!sub->HasFlag(MicroThread::SUB_LIST)); 261 if (!sub->HasFlag(MicroThread::SUB_LIST)) 262 { 263 TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry); 264 sub->_parent = this; 265 } 266 267 sub->SetFlag(MicroThread::SUB_LIST); 268 } 269 270 void MicroThread::RemoveSubThread(MicroThread* sub) 271 { 272 ASSERT(sub->HasFlag(MicroThread::SUB_LIST)); 273 if (sub->HasFlag(MicroThread::SUB_LIST)) 274 { 275 TAILQ_REMOVE(&_sub_list, sub, _sub_entry); 276 sub->_parent = NULL; 277 } 278 279 sub->UnsetFlag(MicroThread::SUB_LIST); 280 } 281 282 ScheduleObj *ScheduleObj::_instance = NULL; 283 inline ScheduleObj* ScheduleObj::Instance() 284 { 285 if (NULL == _instance) 286 { 287 _instance = new ScheduleObj(); 288 } 289 290 return _instance; 291 } 292 293 void ScheduleObj::ScheduleThread() 294 { 295 MtFrame* frame = MtFrame::Instance(); 296 frame->ThreadSchdule(); 297 } 298 299 utime64_t ScheduleObj::ScheduleGetTime() 300 { 301 MtFrame* frame = MtFrame::Instance(); 302 if (frame) 303 { 304 return frame->GetLastClock(); 305 } 306 else 307 { 308 MTLOG_ERROR("frame time failed, maybe not init"); 309 return 0; 310 } 311 } 312 313 void ScheduleObj::ScheduleSleep() 314 { 315 MtFrame* frame = MtFrame::Instance(); 316 MicroThread* thread = frame->GetActiveThread(); 317 if ((!frame) || (!thread)) { 318 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 319 return; 320 } 321 322 frame->InsertSleep(thread); 323 frame->ThreadSchdule(); 324 } 325 326 void ScheduleObj::SchedulePend() 327 { 328 MtFrame* frame = MtFrame::Instance(); 329 MicroThread* thread = frame->GetActiveThread(); 330 if ((!frame) || (!thread)) { 331 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 332 return; 333 } 334 335 frame->InsertPend(thread); 336 frame->ThreadSchdule(); 337 } 338 339 void ScheduleObj::ScheduleUnpend(void* pthread) 340 { 341 MtFrame* frame = MtFrame::Instance(); 342 MicroThread* thread = (MicroThread*)pthread; 343 if ((!frame) || (!thread)) { 344 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 345 return; 346 } 347 348 frame->RemovePend(thread); 349 frame->InsertRunable(thread); 350 } 351 352 void ScheduleObj::ScheduleReclaim() 353 { 354 MtFrame* frame = MtFrame::Instance(); 355 MicroThread* thread = frame->GetActiveThread(); 356 if ((!frame) || (!thread)) { 357 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 358 return; 359 } 360 361 frame->FreeThread(thread); 362 } 363 364 void ScheduleObj::ScheduleStartRun() 365 { 366 MtFrame* frame = MtFrame::Instance(); 367 MicroThread* thread = frame->GetActiveThread(); 368 if ((!frame) || (!thread)) { 369 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 370 return; 371 } 372 373 thread->Run(); 374 } 375 376 377 unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. 378 unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< 128k stack. 379 380 bool ThreadPool::InitialPool(int max_num) 381 { 382 MicroThread *thread = NULL; 383 for (unsigned int i = 0; i < default_thread_num; i++) 384 { 385 thread = new MicroThread(); 386 if ((NULL == thread) || (false == thread->Initial())) 387 { 388 MTLOG_ERROR("init pool, thread %p init failed", thread); 389 if (thread) delete thread; 390 continue; 391 } 392 thread->SetFlag(MicroThread::FREE_LIST); 393 _freelist.push(thread); 394 } 395 396 _total_num = _freelist.size(); 397 _max_num = max_num; 398 _use_num = 0; 399 if (_total_num <= 0) 400 { 401 return false; 402 } 403 else 404 { 405 return true; 406 } 407 } 408 409 void ThreadPool::DestroyPool() 410 { 411 MicroThread* thread = NULL; 412 while (!_freelist.empty()) 413 { 414 thread = _freelist.front(); 415 _freelist.pop(); 416 thread->Destroy(); 417 delete thread; 418 } 419 420 _total_num = 0; 421 _use_num = 0; 422 } 423 424 MicroThread* ThreadPool::AllocThread() 425 { 426 MT_ATTR_API_SET(492069, _total_num); 427 428 MicroThread* thread = NULL; 429 if (!_freelist.empty()) 430 { 431 thread = _freelist.front(); 432 _freelist.pop(); 433 434 ASSERT(thread->HasFlag(MicroThread::FREE_LIST)); 435 436 thread->UnsetFlag(MicroThread::FREE_LIST); 437 _use_num++; 438 return thread; 439 } 440 441 MT_ATTR_API(320846, 1); // pool no nore 442 if (_total_num >= _max_num) 443 { 444 MT_ATTR_API(361140, 1); // no more quota 445 return NULL; 446 } 447 448 thread = new MicroThread(); 449 if ((NULL == thread) || (false == thread->Initial())) 450 { 451 MT_ATTR_API(320847, 1); // pool init fail 452 MTLOG_ERROR("thread alloc failed, thread: %p", thread); 453 if (thread) delete thread; 454 return NULL; 455 } 456 _total_num++; 457 _use_num++; 458 459 return thread; 460 } 461 462 void ThreadPool::FreeThread(MicroThread* thread) 463 { 464 ASSERT(!thread->HasFlag(MicroThread::FREE_LIST)); 465 thread->Reset(); 466 _use_num--; 467 _freelist.push(thread); 468 thread->SetFlag(MicroThread::FREE_LIST); 469 470 unsigned int free_num = _freelist.size(); 471 if ((free_num > default_thread_num) && (free_num > 1)) 472 { 473 thread = _freelist.front(); 474 _freelist.pop(); 475 thread->Destroy(); 476 delete thread; 477 _total_num--; 478 } 479 } 480 481 int ThreadPool::GetUsedNum(void) 482 { 483 return _use_num; 484 } 485 486 MtFrame *MtFrame::_instance = NULL; 487 inline MtFrame* MtFrame::Instance () 488 { 489 if (NULL == _instance ) 490 { 491 _instance = new MtFrame(); 492 } 493 494 return _instance; 495 } 496 497 void MtFrame::SetHookFlag() { 498 mt_set_hook_flag(); 499 }; 500 501 bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num) 502 { 503 _log_adpt = logadpt; 504 505 if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num)) 506 { 507 MTLOG_ERROR("Init epoll or thread pool failed"); 508 this->Destroy(); 509 return false; 510 } 511 if (_sleeplist.HeapResize(max_thread_num * 2) < 0) 512 { 513 MTLOG_ERROR("Init heap list failed"); 514 this->Destroy(); 515 return false; 516 } 517 518 _timer = new CTimerMng(max_thread_num * 2); 519 if (NULL == _timer) 520 { 521 MTLOG_ERROR("Init heap timer failed"); 522 this->Destroy(); 523 return false; 524 } 525 526 _daemon = AllocThread(); 527 if (NULL == _daemon) 528 { 529 MTLOG_ERROR("Alloc daemon thread failed"); 530 this->Destroy(); 531 return false; 532 } 533 _daemon->SetType(MicroThread::DAEMON); 534 _daemon->SetState(MicroThread::RUNABLE); 535 _daemon->SetSartFunc(MtFrame::DaemonRun, this); 536 537 _primo = new MicroThread(MicroThread::PRIMORDIAL); 538 if (NULL == _primo) 539 { 540 MTLOG_ERROR("new _primo thread failed"); 541 this->Destroy(); 542 return false; 543 } 544 _primo->SetState(MicroThread::RUNNING); 545 SetActiveThread(_primo); 546 547 _last_clock = GetSystemMS(); 548 TAILQ_INIT(&_iolist); 549 TAILQ_INIT(&_pend_list); 550 551 //SetHookFlag(); 552 553 return true; 554 555 } 556 557 void MtFrame::Destroy(void) 558 { 559 if (NULL == _instance ) 560 { 561 return; 562 } 563 564 if (_primo) { 565 delete _primo; 566 _primo = NULL; 567 } 568 569 if (_daemon) { 570 FreeThread(_daemon); 571 _daemon = NULL; 572 } 573 574 TAILQ_INIT(&_iolist); 575 576 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 577 while (thread) 578 { 579 FreeThread(thread); 580 thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 581 } 582 583 while (!_runlist.empty()) 584 { 585 thread = _runlist.front(); 586 _runlist.pop(); 587 FreeThread(thread); 588 } 589 590 MicroThread* tmp; 591 TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp) 592 { 593 TAILQ_REMOVE(&_pend_list, thread, _entry); 594 FreeThread(thread); 595 } 596 597 if (_timer != NULL) 598 { 599 delete _timer; 600 _timer = NULL; 601 } 602 603 _instance->DestroyPool(); 604 _instance->TermKqueue(); 605 delete _instance; 606 _instance = NULL; 607 } 608 609 char* MtFrame::Version() 610 { 611 return IMT_VERSION; 612 } 613 614 MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable) 615 { 616 MtFrame* mtframe = MtFrame::Instance(); 617 MicroThread* thread = mtframe->AllocThread(); 618 if (NULL == thread) 619 { 620 MTLOG_ERROR("create thread failed"); 621 return NULL; 622 } 623 thread->SetSartFunc(entry, args); 624 625 if (runable) { 626 mtframe->InsertRunable(thread); 627 } 628 629 return thread; 630 } 631 632 int MtFrame::Loop(void* args) 633 { 634 MtFrame* mtframe = MtFrame::Instance(); 635 MicroThread* daemon = mtframe->DaemonThread(); 636 637 mtframe->KqueueDispatch(); 638 mtframe->SetLastClock(mtframe->GetSystemMS()); 639 mtframe->WakeupTimeout(); 640 mtframe->CheckExpired(); 641 daemon->SwitchContext(); 642 643 return 0; 644 } 645 646 void MtFrame::DaemonRun(void* args) 647 { 648 /* 649 MtFrame* mtframe = MtFrame::Instance(); 650 MicroThread* daemon = mtframe->DaemonThread(); 651 652 while (true) { 653 mtframe->KqueueDispatch(); 654 mtframe->SetLastClock(mtframe->GetSystemMS()); 655 mtframe->WakeupTimeout(); 656 mtframe->CheckExpired(); 657 daemon->SwitchContext(); 658 } 659 */ 660 ff_run(MtFrame::Loop, NULL); 661 } 662 663 MicroThread *MtFrame::GetRootThread() 664 { 665 if (NULL == _curr_thread) 666 { 667 return NULL; 668 } 669 670 MicroThread::ThreadType type = _curr_thread->GetType(); 671 MicroThread *thread = _curr_thread; 672 MicroThread *parent = thread; 673 674 while (MicroThread::SUB_THREAD == type) 675 { 676 thread = thread->GetParent(); 677 if (!thread) 678 { 679 break; 680 } 681 682 type = thread->GetType(); 683 parent = thread; 684 } 685 686 return parent; 687 } 688 689 void MtFrame::ThreadSchdule() 690 { 691 MicroThread* thread = NULL; 692 MtFrame* mtframe = MtFrame::Instance(); 693 694 if (mtframe->_runlist.empty()) 695 { 696 thread = mtframe->DaemonThread(); 697 } 698 else 699 { 700 thread = mtframe->_runlist.front(); 701 mtframe->RemoveRunable(thread); 702 } 703 704 this->SetActiveThread(thread); 705 thread->SetState(MicroThread::RUNNING); 706 thread->RestoreContext(); 707 } 708 709 void MtFrame::CheckExpired() 710 { 711 static utime64_t check_time = 0; 712 713 if (_timer != NULL) 714 { 715 _timer->check_expired(); 716 } 717 718 utime64_t now = GetLastClock(); 719 720 if ((now - check_time) > 1000) 721 { 722 CNetMgr::Instance()->RecycleObjs(now); 723 check_time = now; 724 } 725 } 726 727 void MtFrame::WakeupTimeout() 728 { 729 utime64_t now = GetLastClock(); 730 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 731 while (thread && (thread->GetWakeupTime() <= now)) 732 { 733 if (thread->HasFlag(MicroThread::IO_LIST)) 734 { 735 RemoveIoWait(thread); 736 } 737 else 738 { 739 RemoveSleep(thread); 740 } 741 742 InsertRunable(thread); 743 744 thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 745 } 746 } 747 748 int MtFrame::KqueueGetTimeout() 749 { 750 utime64_t now = GetLastClock(); 751 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 752 if (!thread) 753 { 754 return 10; //default 10ms epollwait 755 } 756 else if (thread->GetWakeupTime() < now) 757 { 758 return 0; 759 } 760 else 761 { 762 return (int)(thread->GetWakeupTime() - now); 763 } 764 } 765 766 inline void MtFrame::InsertSleep(MicroThread* thread) 767 { 768 ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST)); 769 770 thread->SetFlag(MicroThread::SLEEP_LIST); 771 thread->SetState(MicroThread::SLEEPING); 772 int rc = _sleeplist.HeapPush(thread); 773 if (rc < 0) 774 { 775 MT_ATTR_API(320848, 1); // heap error 776 MTLOG_ERROR("Insert heap failed , rc %d", rc); 777 } 778 } 779 780 inline void MtFrame::RemoveSleep(MicroThread* thread) 781 { 782 ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST)); 783 thread->UnsetFlag(MicroThread::SLEEP_LIST); 784 785 int rc = _sleeplist.HeapDelete(thread); 786 if (rc < 0) 787 { 788 MT_ATTR_API(320849, 1); // heap error 789 MTLOG_ERROR("remove heap failed , rc %d", rc); 790 } 791 } 792 793 inline void MtFrame::InsertIoWait(MicroThread* thread) 794 { 795 ASSERT(!thread->HasFlag(MicroThread::IO_LIST)); 796 thread->SetFlag(MicroThread::IO_LIST); 797 TAILQ_INSERT_TAIL(&_iolist, thread, _entry); 798 InsertSleep(thread); 799 } 800 801 void MtFrame::RemoveIoWait(MicroThread* thread) 802 { 803 ASSERT(thread->HasFlag(MicroThread::IO_LIST)); 804 thread->UnsetFlag(MicroThread::IO_LIST); 805 TAILQ_REMOVE(&_iolist, thread, _entry); 806 807 RemoveSleep(thread); 808 } 809 810 void MtFrame::InsertRunable(MicroThread* thread) 811 { 812 ASSERT(!thread->HasFlag(MicroThread::RUN_LIST)); 813 thread->SetFlag(MicroThread::RUN_LIST); 814 815 thread->SetState(MicroThread::RUNABLE); 816 _runlist.push(thread); 817 _waitnum++; 818 } 819 820 inline void MtFrame::RemoveRunable(MicroThread* thread) 821 { 822 ASSERT(thread->HasFlag(MicroThread::RUN_LIST)); 823 ASSERT(thread == _runlist.front()); 824 thread->UnsetFlag(MicroThread::RUN_LIST); 825 826 _runlist.pop(); 827 _waitnum--; 828 } 829 830 void MtFrame::InsertPend(MicroThread* thread) 831 { 832 ASSERT(!thread->HasFlag(MicroThread::PEND_LIST)); 833 thread->SetFlag(MicroThread::PEND_LIST); 834 TAILQ_INSERT_TAIL(&_pend_list, thread, _entry); 835 thread->SetState(MicroThread::PENDING); 836 } 837 838 void MtFrame::RemovePend(MicroThread* thread) 839 { 840 ASSERT(thread->HasFlag(MicroThread::PEND_LIST)); 841 thread->UnsetFlag(MicroThread::PEND_LIST); 842 TAILQ_REMOVE(&_pend_list, thread, _entry); 843 } 844 845 void MtFrame::WaitNotify(utime64_t timeout) 846 { 847 MicroThread* thread = GetActiveThread(); 848 849 thread->SetWakeupTime(timeout + this->GetLastClock()); 850 this->InsertIoWait(thread); 851 thread->SwitchContext(); 852 } 853 854 bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) 855 { 856 MicroThread* thread = GetActiveThread(); 857 if (NULL == thread) 858 { 859 MTLOG_ERROR("active thread null, epoll schedule failed"); 860 return false; 861 } 862 863 thread->ClearAllFd(); 864 if (fdlist) 865 { 866 thread->AddFdList(fdlist); 867 } 868 if (fd) 869 { 870 thread->AddFd(fd); 871 } 872 873 thread->SetWakeupTime(timeout + this->GetLastClock()); 874 if (!this->KqueueAdd(thread->GetFdSet())) 875 { 876 MTLOG_ERROR("epoll add failed, errno: %d", errno); 877 return false; 878 } 879 this->InsertIoWait(thread); 880 thread->SwitchContext(); 881 882 int rcvnum = 0; 883 KqObjList& rcvfds = thread->GetFdSet(); 884 KqueuerObj* fdata = NULL; 885 TAILQ_FOREACH(fdata, &rcvfds, _entry) 886 { 887 if (fdata->GetRcvEvents() != 0) 888 { 889 rcvnum++; 890 } 891 } 892 this->KqueueDel(rcvfds); 893 894 if (rcvnum == 0) 895 { 896 errno = ETIME; 897 return false; 898 } 899 900 return true; 901 } 902 903 int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 904 { 905 MtFrame* mtframe = MtFrame::Instance(); 906 utime64_t start = mtframe->GetLastClock(); 907 MicroThread* thread = mtframe->GetActiveThread(); 908 utime64_t now = 0; 909 910 if(fd<0 || !buf || len<1) 911 { 912 errno = EINVAL; 913 MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno); 914 return -10; 915 } 916 917 if (timeout <= -1) 918 { 919 timeout = 0x7fffffff; 920 } 921 922 while (true) 923 { 924 now = mtframe->GetLastClock(); 925 if ((int)(now - start) > timeout) 926 { 927 errno = ETIME; 928 return -1; 929 } 930 931 KqueuerObj epfd; 932 epfd.SetOsfd(fd); 933 epfd.EnableInput(); 934 epfd.SetOwnerThread(thread); 935 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 936 { 937 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 938 return -2; 939 } 940 941 mt_hook_syscall(recvfrom); 942 int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen); 943 if (n < 0) 944 { 945 if (errno == EINTR) { 946 continue; 947 } 948 949 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 950 { 951 MTLOG_ERROR("recvfrom failed, errno: %d", errno); 952 return -3; 953 } 954 } 955 else 956 { 957 return n; 958 } 959 } 960 961 } 962 963 int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 964 { 965 MtFrame* mtframe = MtFrame::Instance(); 966 utime64_t start = mtframe->GetLastClock(); 967 MicroThread* thread = mtframe->GetActiveThread(); 968 utime64_t now = 0; 969 970 if(fd<0 || !msg || len<1) 971 { 972 errno = EINVAL; 973 MTLOG_ERROR("sendto failed, errno: %d (%m)", errno); 974 return -10; 975 } 976 977 int n = 0; 978 mt_hook_syscall(sendto); 979 while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0) 980 { 981 now = mtframe->GetLastClock(); 982 if ((int)(now - start) > timeout) 983 { 984 errno = ETIME; 985 return -1; 986 } 987 988 if (errno == EINTR) { 989 continue; 990 } 991 992 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 993 MTLOG_ERROR("sendto failed, errno: %d", errno); 994 return -2; 995 } 996 997 KqueuerObj epfd; 998 epfd.SetOsfd(fd); 999 epfd.EnableOutput(); 1000 epfd.SetOwnerThread(thread); 1001 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1002 return -3; 1003 } 1004 } 1005 1006 return n; 1007 } 1008 1009 int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 1010 { 1011 MtFrame* mtframe = MtFrame::Instance(); 1012 utime64_t start = mtframe->GetLastClock(); 1013 MicroThread* thread = mtframe->GetActiveThread(); 1014 utime64_t now = 0; 1015 1016 if(fd<0 || !addr || addrlen<1) 1017 { 1018 errno = EINVAL; 1019 MTLOG_ERROR("connect failed, errno: %d (%m)", errno); 1020 return -10; 1021 } 1022 1023 int n = 0; 1024 mt_hook_syscall(connect); 1025 while ((n = ff_hook_connect(fd, addr, addrlen)) < 0) 1026 { 1027 now = mtframe->GetLastClock(); 1028 if ((int)(now - start) > timeout) 1029 { 1030 errno = ETIME; 1031 return -1; 1032 } 1033 1034 if (errno == EISCONN) 1035 { 1036 return 0; 1037 } 1038 1039 if (errno == EINTR) { 1040 continue; 1041 } 1042 1043 if (errno != EINPROGRESS) { 1044 MTLOG_ERROR("connect failed, errno: %d", errno); 1045 return -2; 1046 } 1047 1048 KqueuerObj epfd; 1049 epfd.SetOsfd(fd); 1050 epfd.EnableOutput(); 1051 epfd.SetOwnerThread(thread); 1052 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1053 return -3; 1054 } 1055 } 1056 1057 return n; 1058 } 1059 1060 int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 1061 { 1062 MtFrame* mtframe = MtFrame::Instance(); 1063 utime64_t start = mtframe->GetLastClock(); 1064 MicroThread* thread = mtframe->GetActiveThread(); 1065 utime64_t now = 0; 1066 1067 if(fd<0) 1068 { 1069 errno = EINVAL; 1070 MTLOG_ERROR("accept failed, errno: %d (%m)", errno); 1071 return -10; 1072 } 1073 1074 int acceptfd = 0; 1075 mt_hook_syscall(accept); 1076 while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0) 1077 { 1078 now = mtframe->GetLastClock(); 1079 if ((int)(now - start) > timeout) 1080 { 1081 errno = ETIME; 1082 return -1; 1083 } 1084 1085 if (errno == EINTR) { 1086 continue; 1087 } 1088 1089 if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) { 1090 MTLOG_ERROR("accept failed, errno: %d", errno); 1091 return -2; 1092 } 1093 1094 KqueuerObj epfd; 1095 epfd.SetOsfd(fd); 1096 epfd.EnableInput(); 1097 epfd.SetOwnerThread(thread); 1098 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1099 return -3; 1100 } 1101 } 1102 1103 return acceptfd; 1104 } 1105 1106 ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout) 1107 { 1108 MtFrame* mtframe = MtFrame::Instance(); 1109 utime64_t start = mtframe->GetLastClock(); 1110 MicroThread* thread = mtframe->GetActiveThread(); 1111 utime64_t now = 0; 1112 1113 if(fd<0 || !buf || nbyte<1) 1114 { 1115 errno = EINVAL; 1116 MTLOG_ERROR("read failed, errno: %d (%m)", errno); 1117 return -10; 1118 } 1119 1120 ssize_t n = 0; 1121 mt_hook_syscall(read); 1122 while ((n = ff_hook_read(fd, buf, nbyte)) < 0) 1123 { 1124 now = mtframe->GetLastClock(); 1125 if ((int)(now - start) > timeout) 1126 { 1127 errno = ETIME; 1128 return -1; 1129 } 1130 1131 if (errno == EINTR) { 1132 continue; 1133 } 1134 1135 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1136 MTLOG_ERROR("read failed, errno: %d", errno); 1137 return -2; 1138 } 1139 1140 KqueuerObj epfd; 1141 epfd.SetOsfd(fd); 1142 epfd.EnableInput(); 1143 epfd.SetOwnerThread(thread); 1144 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1145 return -3; 1146 } 1147 } 1148 1149 return n; 1150 } 1151 1152 ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout) 1153 { 1154 MtFrame* mtframe = MtFrame::Instance(); 1155 utime64_t start = mtframe->GetLastClock(); 1156 MicroThread* thread = mtframe->GetActiveThread(); 1157 utime64_t now = 0; 1158 1159 if(fd<0 || !buf || nbyte<1) 1160 { 1161 errno = EINVAL; 1162 MTLOG_ERROR("write failed, errno: %d (%m)", errno); 1163 return -10; 1164 } 1165 1166 ssize_t n = 0; 1167 size_t send_len = 0; 1168 while (send_len < nbyte) 1169 { 1170 now = mtframe->GetLastClock(); 1171 if ((int)(now - start) > timeout) 1172 { 1173 errno = ETIME; 1174 return -1; 1175 } 1176 1177 mt_hook_syscall(write); 1178 n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len); 1179 if (n < 0) 1180 { 1181 if (errno == EINTR) { 1182 continue; 1183 } 1184 1185 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1186 MTLOG_ERROR("write failed, errno: %d", errno); 1187 return -2; 1188 } 1189 } 1190 else 1191 { 1192 send_len += n; 1193 if (send_len >= nbyte) { 1194 return nbyte; 1195 } 1196 } 1197 1198 KqueuerObj epfd; 1199 epfd.SetOsfd(fd); 1200 epfd.EnableOutput(); 1201 epfd.SetOwnerThread(thread); 1202 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1203 return -3; 1204 } 1205 } 1206 1207 return nbyte; 1208 } 1209 1210 int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout) 1211 { 1212 MtFrame* mtframe = MtFrame::Instance(); 1213 utime64_t start = mtframe->GetLastClock(); 1214 MicroThread* thread = mtframe->GetActiveThread(); 1215 utime64_t now = 0; 1216 1217 if(fd<0 || !buf || len<1) 1218 { 1219 errno = EINVAL; 1220 MTLOG_ERROR("recv failed, errno: %d (%m)", errno); 1221 return -10; 1222 } 1223 1224 if (timeout <= -1) 1225 { 1226 timeout = 0x7fffffff; 1227 } 1228 1229 while (true) 1230 { 1231 now = mtframe->GetLastClock(); 1232 if ((int)(now - start) > timeout) 1233 { 1234 errno = ETIME; 1235 return -1; 1236 } 1237 1238 KqueuerObj epfd; 1239 epfd.SetOsfd(fd); 1240 epfd.EnableInput(); 1241 epfd.SetOwnerThread(thread); 1242 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1243 { 1244 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 1245 return -2; 1246 } 1247 1248 mt_hook_syscall(recv); 1249 int n = ff_hook_recv(fd, buf, len, flags); 1250 if (n < 0) 1251 { 1252 if (errno == EINTR) { 1253 continue; 1254 } 1255 1256 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1257 { 1258 MTLOG_ERROR("recv failed, errno: %d", errno); 1259 return -3; 1260 } 1261 } 1262 else 1263 { 1264 return n; 1265 } 1266 } 1267 1268 } 1269 1270 ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 1271 { 1272 MtFrame* mtframe = MtFrame::Instance(); 1273 utime64_t start = mtframe->GetLastClock(); 1274 MicroThread* thread = mtframe->GetActiveThread(); 1275 utime64_t now = 0; 1276 1277 if(fd<0 || !buf || nbyte<1) 1278 { 1279 errno = EINVAL; 1280 MTLOG_ERROR("send failed, errno: %d (%m)", errno); 1281 return -10; 1282 } 1283 1284 ssize_t n = 0; 1285 size_t send_len = 0; 1286 while (send_len < nbyte) 1287 { 1288 now = mtframe->GetLastClock(); 1289 if ((int)(now - start) > timeout) 1290 { 1291 errno = ETIME; 1292 return -1; 1293 } 1294 1295 mt_hook_syscall(send); 1296 n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags); 1297 if (n < 0) 1298 { 1299 if (errno == EINTR) { 1300 continue; 1301 } 1302 1303 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1304 MTLOG_ERROR("write failed, errno: %d", errno); 1305 return -2; 1306 } 1307 } 1308 else 1309 { 1310 send_len += n; 1311 if (send_len >= nbyte) { 1312 return nbyte; 1313 } 1314 } 1315 1316 KqueuerObj epfd; 1317 epfd.SetOsfd(fd); 1318 epfd.EnableOutput(); 1319 epfd.SetOwnerThread(thread); 1320 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1321 return -3; 1322 } 1323 } 1324 1325 return nbyte; 1326 } 1327 1328 void MtFrame::sleep(int ms) 1329 { 1330 MtFrame* frame = MtFrame::Instance(); 1331 MicroThread* thread = frame->GetActiveThread(); 1332 if (thread != NULL) 1333 { 1334 thread->sleep(ms); 1335 } 1336 } 1337 1338 int MtFrame::WaitEvents(int fd, int events, int timeout) 1339 { 1340 MtFrame* mtframe = MtFrame::Instance(); 1341 utime64_t start = mtframe->GetLastClock(); 1342 MicroThread* thread = mtframe->GetActiveThread(); 1343 utime64_t now = 0; 1344 1345 if (timeout <= -1) 1346 { 1347 timeout = 0x7fffffff; 1348 } 1349 1350 while (true) 1351 { 1352 now = mtframe->GetLastClock(); 1353 if ((int)(now - start) > timeout) 1354 { 1355 errno = ETIME; 1356 return 0; 1357 } 1358 1359 KqueuerObj epfd; 1360 epfd.SetOsfd(fd); 1361 if (events & KQ_EVENT_READ) 1362 { 1363 epfd.EnableInput(); 1364 } 1365 if (events & KQ_EVENT_WRITE) 1366 { 1367 epfd.EnableOutput(); 1368 } 1369 epfd.SetOwnerThread(thread); 1370 1371 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1372 { 1373 MTLOG_TRACE("epoll schedule failed, errno: %d", errno); 1374 return 0; 1375 } 1376 1377 return epfd.GetRcvEvents(); 1378 } 1379 } 1380