1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename micro_thread.cpp 22 * @info micro thread manager 23 */ 24 #include "mt_version.h" 25 #include "micro_thread.h" 26 #include "mt_net.h" 27 #include "valgrind.h" 28 #include <assert.h> 29 #include "mt_sys_hook.h" 30 #include "ff_hook.h" 31 #include "ff_api.h" 32 33 using namespace NS_MICRO_THREAD; 34 35 #define ASSERT(statement) 36 //#define ASSERT(statement) assert(statement) 37 38 extern "C" int save_context(jmp_buf jbf); 39 40 extern "C" void restore_context(jmp_buf jbf, int ret); 41 42 extern "C" void replace_esp(jmp_buf jbf, void* esp); 43 44 Thread::Thread(int stack_size) 45 { 46 _stack_size = stack_size ? stack_size : ThreadPool::default_stack_size; 47 _wakeup_time = 0; 48 _stack = NULL; 49 memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 50 } 51 52 static DefaultLogAdapter def_log_adapt; 53 /** 54 * @brief LINUX x86/x86_64's allocated stacks. 55 */ 56 bool Thread::InitStack() 57 { 58 if (_stack) { 59 return true; 60 } 61 62 ///< stack index and memory are separated to prevent out of bounds. 63 _stack = (MtStack*)calloc(1, sizeof(MtStack)); 64 if (NULL == _stack) 65 { 66 MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack)); 67 return false; 68 } 69 70 int memsize = MEM_PAGE_SIZE*2 + _stack_size; 71 memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE; 72 73 static int zero_fd = -1; 74 int mmap_flags = MAP_PRIVATE | MAP_ANON; 75 void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0); 76 if (vaddr == (void *)MAP_FAILED) 77 { 78 MTLOG_ERROR("mmap stack failed, size %d,errmsg: %s.", memsize,strerror(errno)); 79 free(_stack); 80 _stack = NULL; 81 return false; 82 } 83 _stack->_vaddr = (char*)vaddr; 84 _stack->_vaddr_size = memsize; 85 _stack->_stk_size = _stack_size; 86 _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE; 87 _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size; 88 // valgrind support: register stack frame 89 _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top); 90 91 _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE; 92 93 mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE); 94 mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE); 95 96 return true; 97 } 98 99 100 void Thread::FreeStack() 101 { 102 if (!_stack) { 103 return; 104 } 105 munmap(_stack->_vaddr, _stack->_vaddr_size); 106 // valgrind support: deregister stack frame 107 VALGRIND_STACK_DEREGISTER(_stack->valgrind_id); 108 free(_stack); 109 _stack = NULL; 110 } 111 112 void Thread::InitContext() 113 { 114 if (save_context(_jmpbuf) != 0) 115 { 116 ScheduleObj::Instance()->ScheduleStartRun(); 117 } 118 119 if (_stack != NULL) 120 { 121 replace_esp(_jmpbuf, _stack->_esp); 122 } 123 } 124 125 void Thread::SwitchContext() 126 { 127 if (save_context(_jmpbuf) == 0) 128 { 129 ScheduleObj::Instance()->ScheduleThread(); 130 } 131 } 132 133 134 int Thread::SaveContext() 135 { 136 return save_context(_jmpbuf); 137 } 138 139 void Thread::RestoreContext() 140 { 141 restore_context(_jmpbuf, 1); 142 } 143 144 145 bool Thread::Initial() 146 { 147 if (!InitStack()) 148 { 149 MTLOG_ERROR("init stack failed"); 150 return false; 151 } 152 153 InitContext(); 154 155 return true; 156 } 157 158 void Thread::Destroy() 159 { 160 FreeStack(); 161 memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 162 } 163 164 void Thread::Reset() 165 { 166 _wakeup_time = 0; 167 SetPrivate(NULL); 168 169 InitContext(); 170 CleanState(); 171 } 172 173 void Thread::sleep(int ms) 174 { 175 utime64_t now = ScheduleObj::Instance()->ScheduleGetTime(); 176 _wakeup_time = now + ms; 177 178 if (save_context(_jmpbuf) == 0) 179 { 180 ScheduleObj::Instance()->ScheduleSleep(); 181 } 182 } 183 184 void Thread::Wait() 185 { 186 if (save_context(_jmpbuf) == 0) 187 { 188 ScheduleObj::Instance()->SchedulePend(); 189 } 190 } 191 192 bool Thread::CheckStackHealth(char *esp) 193 { 194 if (!_stack) 195 return false; 196 197 if (esp > _stack->_stk_bottom && esp < _stack->_stk_top) 198 return true; 199 else 200 return false; 201 } 202 203 MicroThread::MicroThread(ThreadType type) 204 { 205 memset(&_entry, 0, sizeof(_entry)); 206 TAILQ_INIT(&_fdset); 207 TAILQ_INIT(&_sub_list); 208 _flag = NOT_INLIST; 209 _type = type; 210 _state = INITIAL; 211 _start = NULL; 212 _args = NULL; 213 _parent = NULL; 214 } 215 216 void MicroThread::CleanState() 217 { 218 TAILQ_INIT(&_fdset); 219 TAILQ_INIT(&_sub_list); 220 _flag = NOT_INLIST; 221 _type = NORMAL; 222 _state = INITIAL; 223 _start = NULL; 224 _args = NULL; 225 _parent = NULL; 226 } 227 228 void MicroThread::Run() 229 { 230 if (_start) { 231 _start(_args); 232 } 233 234 if (this->IsSubThread()) { 235 this->WakeupParent(); 236 } 237 238 ScheduleObj::Instance()->ScheduleReclaim(); 239 ScheduleObj::Instance()->ScheduleThread(); 240 } 241 242 void MicroThread::WakeupParent() 243 { 244 MicroThread* parent = this->GetParent(); 245 if (parent) 246 { 247 parent->RemoveSubThread(this); 248 if (parent->HasNoSubThread()) 249 { 250 ScheduleObj::Instance()->ScheduleUnpend(parent); 251 } 252 } 253 else 254 { 255 MTLOG_ERROR("Sub thread no parent, error"); 256 } 257 } 258 259 bool MicroThread::HasNoSubThread() 260 { 261 return TAILQ_EMPTY(&_sub_list); 262 } 263 264 void MicroThread::AddSubThread(MicroThread* sub) 265 { 266 ASSERT(!sub->HasFlag(MicroThread::SUB_LIST)); 267 if (!sub->HasFlag(MicroThread::SUB_LIST)) 268 { 269 TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry); 270 sub->_parent = this; 271 } 272 273 sub->SetFlag(MicroThread::SUB_LIST); 274 } 275 276 void MicroThread::RemoveSubThread(MicroThread* sub) 277 { 278 ASSERT(sub->HasFlag(MicroThread::SUB_LIST)); 279 if (sub->HasFlag(MicroThread::SUB_LIST)) 280 { 281 TAILQ_REMOVE(&_sub_list, sub, _sub_entry); 282 sub->_parent = NULL; 283 } 284 285 sub->UnsetFlag(MicroThread::SUB_LIST); 286 } 287 288 ScheduleObj *ScheduleObj::_instance = NULL; 289 inline ScheduleObj* ScheduleObj::Instance() 290 { 291 if (NULL == _instance) 292 { 293 _instance = new ScheduleObj(); 294 } 295 296 return _instance; 297 } 298 299 void ScheduleObj::ScheduleThread() 300 { 301 MtFrame* frame = MtFrame::Instance(); 302 frame->ThreadSchdule(); 303 } 304 305 utime64_t ScheduleObj::ScheduleGetTime() 306 { 307 MtFrame* frame = MtFrame::Instance(); 308 if (frame) 309 { 310 return frame->GetLastClock(); 311 } 312 else 313 { 314 MTLOG_ERROR("frame time failed, maybe not init"); 315 return 0; 316 } 317 } 318 319 void ScheduleObj::ScheduleSleep() 320 { 321 MtFrame* frame = MtFrame::Instance(); 322 MicroThread* thread = frame->GetActiveThread(); 323 if ((!frame) || (!thread)) { 324 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 325 return; 326 } 327 328 frame->InsertSleep(thread); 329 frame->ThreadSchdule(); 330 } 331 332 void ScheduleObj::SchedulePend() 333 { 334 MtFrame* frame = MtFrame::Instance(); 335 MicroThread* thread = frame->GetActiveThread(); 336 if ((!frame) || (!thread)) { 337 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 338 return; 339 } 340 341 frame->InsertPend(thread); 342 frame->ThreadSchdule(); 343 } 344 345 void ScheduleObj::ScheduleUnpend(void* pthread) 346 { 347 MtFrame* frame = MtFrame::Instance(); 348 MicroThread* thread = (MicroThread*)pthread; 349 if ((!frame) || (!thread)) { 350 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 351 return; 352 } 353 354 frame->RemovePend(thread); 355 frame->InsertRunable(thread); 356 } 357 358 void ScheduleObj::ScheduleReclaim() 359 { 360 MtFrame* frame = MtFrame::Instance(); 361 MicroThread* thread = frame->GetActiveThread(); 362 if ((!frame) || (!thread)) { 363 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 364 return; 365 } 366 367 frame->FreeThread(thread); 368 } 369 370 void ScheduleObj::ScheduleStartRun() 371 { 372 MtFrame* frame = MtFrame::Instance(); 373 MicroThread* thread = frame->GetActiveThread(); 374 if ((!frame) || (!thread)) { 375 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 376 return; 377 } 378 379 thread->Run(); 380 } 381 382 383 unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. 384 unsigned int ThreadPool::last_default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. 385 unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< 128k stack. 386 387 bool ThreadPool::InitialPool(int max_num) 388 { 389 MicroThread *thread = NULL; 390 for (unsigned int i = 0; i < default_thread_num; i++) 391 { 392 thread = new MicroThread(); 393 if ((NULL == thread) || (false == thread->Initial())) 394 { 395 MTLOG_ERROR("init pool, thread %p init failed", thread); 396 if (thread) delete thread; 397 continue; 398 } 399 thread->SetFlag(MicroThread::FREE_LIST); 400 _freelist.push(thread); 401 } 402 403 _total_num = _freelist.size(); 404 _max_num = max_num; 405 _use_num = 0; 406 if (_total_num <= 0) 407 { 408 return false; 409 } 410 else 411 { 412 return true; 413 } 414 } 415 416 void ThreadPool::DestroyPool() 417 { 418 MicroThread* thread = NULL; 419 while (!_freelist.empty()) 420 { 421 thread = _freelist.front(); 422 _freelist.pop(); 423 thread->Destroy(); 424 delete thread; 425 } 426 427 _total_num = 0; 428 _use_num = 0; 429 } 430 431 MicroThread* ThreadPool::AllocThread() 432 { 433 MT_ATTR_API_SET(492069, _total_num); 434 435 MicroThread* thread = NULL; 436 if (!_freelist.empty()) 437 { 438 thread = _freelist.front(); 439 _freelist.pop(); 440 441 ASSERT(thread->HasFlag(MicroThread::FREE_LIST)); 442 443 thread->UnsetFlag(MicroThread::FREE_LIST); 444 _use_num++; 445 return thread; 446 } 447 448 MT_ATTR_API(320846, 1); // pool no nore 449 if (_total_num >= _max_num) 450 { 451 MT_ATTR_API(361140, 1); // no more quota 452 MTLOG_ERROR("total %d is outof max: %d", _total_num,_max_num); 453 return NULL; 454 } 455 456 thread = new MicroThread(); 457 if ((NULL == thread) || (false == thread->Initial())) 458 { 459 MT_ATTR_API(320847, 1); // pool init fail 460 MTLOG_ERROR("thread alloc failed, thread: %p", thread); 461 if (thread) delete thread; 462 return NULL; 463 } 464 _total_num++; 465 _use_num++; 466 if(_use_num >(int) default_thread_num){ 467 if(((int) default_thread_num * 2 )< _max_num){ 468 last_default_thread_num = default_thread_num; 469 default_thread_num = default_thread_num * 2; 470 } 471 } 472 473 return thread; 474 } 475 476 void ThreadPool::FreeThread(MicroThread* thread) 477 { 478 ASSERT(!thread->HasFlag(MicroThread::FREE_LIST)); 479 thread->Reset(); 480 _use_num--; 481 _freelist.push(thread); 482 thread->SetFlag(MicroThread::FREE_LIST); 483 484 unsigned int free_num = _freelist.size(); 485 if ((free_num > default_thread_num) && (free_num > 1)) 486 { 487 thread = _freelist.front(); 488 _freelist.pop(); 489 thread->Destroy(); 490 delete thread; 491 _total_num--; 492 if(default_thread_num / 2 >= DEFAULT_THREAD_NUM){ 493 last_default_thread_num = default_thread_num; 494 default_thread_num = default_thread_num / 2; 495 } 496 } 497 } 498 499 int ThreadPool::GetUsedNum(void) 500 { 501 return _use_num; 502 } 503 504 MtFrame *MtFrame::_instance = NULL; 505 inline MtFrame* MtFrame::Instance () 506 { 507 if (NULL == _instance ) 508 { 509 _instance = new MtFrame(); 510 } 511 512 return _instance; 513 } 514 515 void MtFrame::SetHookFlag() { 516 mt_set_hook_flag(); 517 }; 518 519 bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num) 520 { 521 if(logadpt == NULL){ 522 _log_adpt = &def_log_adapt; 523 }else{ 524 _log_adpt = logadpt; 525 } 526 527 if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num)) 528 { 529 MTLOG_ERROR("Init epoll or thread pool failed"); 530 this->Destroy(); 531 return false; 532 } 533 if (_sleeplist.HeapResize(max_thread_num * 2) < 0) 534 { 535 MTLOG_ERROR("Init heap list failed"); 536 this->Destroy(); 537 return false; 538 } 539 540 _timer = new CTimerMng(max_thread_num * 2); 541 if (NULL == _timer) 542 { 543 MTLOG_ERROR("Init heap timer failed"); 544 this->Destroy(); 545 return false; 546 } 547 548 _daemon = AllocThread(); 549 if (NULL == _daemon) 550 { 551 MTLOG_ERROR("Alloc daemon thread failed"); 552 this->Destroy(); 553 return false; 554 } 555 _daemon->SetType(MicroThread::DAEMON); 556 _daemon->SetState(MicroThread::RUNABLE); 557 _daemon->SetSartFunc(MtFrame::DaemonRun, this); 558 559 _primo = new MicroThread(MicroThread::PRIMORDIAL); 560 if (NULL == _primo) 561 { 562 MTLOG_ERROR("new _primo thread failed"); 563 this->Destroy(); 564 return false; 565 } 566 _primo->SetState(MicroThread::RUNNING); 567 SetActiveThread(_primo); 568 569 _last_clock = GetSystemMS(); 570 TAILQ_INIT(&_iolist); 571 TAILQ_INIT(&_pend_list); 572 573 //SetHookFlag(); 574 575 return true; 576 577 } 578 579 void MtFrame::Destroy(void) 580 { 581 if (NULL == _instance ) 582 { 583 return; 584 } 585 586 if (_primo) { 587 delete _primo; 588 _primo = NULL; 589 } 590 591 if (_daemon) { 592 FreeThread(_daemon); 593 _daemon = NULL; 594 } 595 596 TAILQ_INIT(&_iolist); 597 598 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 599 while (thread) 600 { 601 FreeThread(thread); 602 thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 603 } 604 605 while (!_runlist.empty()) 606 { 607 thread = _runlist.front(); 608 _runlist.pop(); 609 FreeThread(thread); 610 } 611 612 MicroThread* tmp; 613 TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp) 614 { 615 TAILQ_REMOVE(&_pend_list, thread, _entry); 616 FreeThread(thread); 617 } 618 619 if (_timer != NULL) 620 { 621 delete _timer; 622 _timer = NULL; 623 } 624 625 _instance->DestroyPool(); 626 _instance->TermKqueue(); 627 delete _instance; 628 _instance = NULL; 629 } 630 631 char* MtFrame::Version() 632 { 633 return IMT_VERSION; 634 } 635 636 MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable) 637 { 638 MtFrame* mtframe = MtFrame::Instance(); 639 MicroThread* thread = mtframe->AllocThread(); 640 if (NULL == thread) 641 { 642 MTLOG_ERROR("create thread failed"); 643 return NULL; 644 } 645 thread->SetSartFunc(entry, args); 646 647 if (runable) { 648 mtframe->InsertRunable(thread); 649 } 650 651 return thread; 652 } 653 654 int MtFrame::Loop(void* args) 655 { 656 MtFrame* mtframe = MtFrame::Instance(); 657 MicroThread* daemon = mtframe->DaemonThread(); 658 659 mtframe->KqueueDispatch(); 660 mtframe->SetLastClock(mtframe->GetSystemMS()); 661 mtframe->WakeupTimeout(); 662 mtframe->CheckExpired(); 663 daemon->SwitchContext(); 664 665 return 0; 666 } 667 668 void MtFrame::DaemonRun(void* args) 669 { 670 /* 671 MtFrame* mtframe = MtFrame::Instance(); 672 MicroThread* daemon = mtframe->DaemonThread(); 673 674 while (true) { 675 mtframe->KqueueDispatch(); 676 mtframe->SetLastClock(mtframe->GetSystemMS()); 677 mtframe->WakeupTimeout(); 678 mtframe->CheckExpired(); 679 daemon->SwitchContext(); 680 } 681 */ 682 ff_run(MtFrame::Loop, NULL); 683 } 684 685 MicroThread *MtFrame::GetRootThread() 686 { 687 if (NULL == _curr_thread) 688 { 689 return NULL; 690 } 691 692 MicroThread::ThreadType type = _curr_thread->GetType(); 693 MicroThread *thread = _curr_thread; 694 MicroThread *parent = thread; 695 696 while (MicroThread::SUB_THREAD == type) 697 { 698 thread = thread->GetParent(); 699 if (!thread) 700 { 701 break; 702 } 703 704 type = thread->GetType(); 705 parent = thread; 706 } 707 708 return parent; 709 } 710 711 void MtFrame::ThreadSchdule() 712 { 713 MicroThread* thread = NULL; 714 MtFrame* mtframe = MtFrame::Instance(); 715 716 if (mtframe->_runlist.empty()) 717 { 718 thread = mtframe->DaemonThread(); 719 } 720 else 721 { 722 thread = mtframe->_runlist.front(); 723 mtframe->RemoveRunable(thread); 724 } 725 726 this->SetActiveThread(thread); 727 thread->SetState(MicroThread::RUNNING); 728 thread->RestoreContext(); 729 } 730 731 void MtFrame::CheckExpired() 732 { 733 static utime64_t check_time = 0; 734 735 if (_timer != NULL) 736 { 737 _timer->check_expired(); 738 } 739 740 utime64_t now = GetLastClock(); 741 742 if ((now - check_time) > 1000) 743 { 744 CNetMgr::Instance()->RecycleObjs(now); 745 check_time = now; 746 } 747 } 748 749 void MtFrame::WakeupTimeout() 750 { 751 utime64_t now = GetLastClock(); 752 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 753 while (thread && (thread->GetWakeupTime() <= now)) 754 { 755 if (thread->HasFlag(MicroThread::IO_LIST)) 756 { 757 RemoveIoWait(thread); 758 } 759 else 760 { 761 RemoveSleep(thread); 762 } 763 764 InsertRunable(thread); 765 766 thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 767 } 768 } 769 770 int MtFrame::KqueueGetTimeout() 771 { 772 utime64_t now = GetLastClock(); 773 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 774 if (!thread) 775 { 776 return 10; //default 10ms epollwait 777 } 778 else if (thread->GetWakeupTime() < now) 779 { 780 return 0; 781 } 782 else 783 { 784 return (int)(thread->GetWakeupTime() - now); 785 } 786 } 787 788 inline void MtFrame::InsertSleep(MicroThread* thread) 789 { 790 ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST)); 791 792 thread->SetFlag(MicroThread::SLEEP_LIST); 793 thread->SetState(MicroThread::SLEEPING); 794 int rc = _sleeplist.HeapPush(thread); 795 if (rc < 0) 796 { 797 MT_ATTR_API(320848, 1); // heap error 798 MTLOG_ERROR("Insert heap failed , rc %d", rc); 799 } 800 } 801 802 inline void MtFrame::RemoveSleep(MicroThread* thread) 803 { 804 ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST)); 805 thread->UnsetFlag(MicroThread::SLEEP_LIST); 806 807 int rc = _sleeplist.HeapDelete(thread); 808 if (rc < 0) 809 { 810 MT_ATTR_API(320849, 1); // heap error 811 MTLOG_ERROR("remove heap failed , rc %d", rc); 812 } 813 } 814 815 inline void MtFrame::InsertIoWait(MicroThread* thread) 816 { 817 ASSERT(!thread->HasFlag(MicroThread::IO_LIST)); 818 thread->SetFlag(MicroThread::IO_LIST); 819 TAILQ_INSERT_TAIL(&_iolist, thread, _entry); 820 InsertSleep(thread); 821 } 822 823 void MtFrame::RemoveIoWait(MicroThread* thread) 824 { 825 ASSERT(thread->HasFlag(MicroThread::IO_LIST)); 826 thread->UnsetFlag(MicroThread::IO_LIST); 827 TAILQ_REMOVE(&_iolist, thread, _entry); 828 829 RemoveSleep(thread); 830 } 831 832 void MtFrame::InsertRunable(MicroThread* thread) 833 { 834 ASSERT(!thread->HasFlag(MicroThread::RUN_LIST)); 835 thread->SetFlag(MicroThread::RUN_LIST); 836 837 thread->SetState(MicroThread::RUNABLE); 838 _runlist.push(thread); 839 _waitnum++; 840 } 841 842 inline void MtFrame::RemoveRunable(MicroThread* thread) 843 { 844 ASSERT(thread->HasFlag(MicroThread::RUN_LIST)); 845 ASSERT(thread == _runlist.front()); 846 thread->UnsetFlag(MicroThread::RUN_LIST); 847 848 _runlist.pop(); 849 _waitnum--; 850 } 851 852 void MtFrame::InsertPend(MicroThread* thread) 853 { 854 ASSERT(!thread->HasFlag(MicroThread::PEND_LIST)); 855 thread->SetFlag(MicroThread::PEND_LIST); 856 TAILQ_INSERT_TAIL(&_pend_list, thread, _entry); 857 thread->SetState(MicroThread::PENDING); 858 } 859 860 void MtFrame::RemovePend(MicroThread* thread) 861 { 862 ASSERT(thread->HasFlag(MicroThread::PEND_LIST)); 863 thread->UnsetFlag(MicroThread::PEND_LIST); 864 TAILQ_REMOVE(&_pend_list, thread, _entry); 865 } 866 867 void MtFrame::WaitNotify(utime64_t timeout) 868 { 869 MicroThread* thread = GetActiveThread(); 870 871 thread->SetWakeupTime(timeout + this->GetLastClock()); 872 this->InsertIoWait(thread); 873 thread->SwitchContext(); 874 } 875 876 void MtFrame::NotifyThread(MicroThread* thread) 877 { 878 if(thread == NULL){ 879 return; 880 } 881 MicroThread* cur_thread = GetActiveThread(); 882 if (thread->HasFlag(MicroThread::IO_LIST)) 883 { 884 this->RemoveIoWait(thread); 885 if(cur_thread == this->DaemonThread()){ 886 // ���ﲻֱ���еĻ�,���Dz���ʱ,�ᵼ��Ŀ���̵߳ȴ�����ʱ 887 if(cur_thread->SaveContext() == 0){ 888 this->SetActiveThread(thread); 889 thread->SetState(MicroThread::RUNNING); 890 thread->RestoreContext(); 891 } 892 }else{ 893 this->InsertRunable(thread); 894 } 895 } 896 } 897 898 void MtFrame::SwapDaemonThread() 899 { 900 MicroThread* thread = GetActiveThread(); 901 MicroThread* daemon_thread = this->DaemonThread(); 902 if(thread != daemon_thread){ 903 if(thread->SaveContext() == 0){ 904 this->InsertRunable(thread); 905 this->SetActiveThread(daemon_thread); 906 daemon_thread->SetState(MicroThread::RUNNING); 907 daemon_thread->RestoreContext(); 908 } 909 } 910 } 911 912 bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) 913 { 914 MicroThread* thread = GetActiveThread(); 915 if (NULL == thread) 916 { 917 MTLOG_ERROR("active thread null, epoll schedule failed"); 918 return false; 919 } 920 921 thread->ClearAllFd(); 922 if (fdlist) 923 { 924 thread->AddFdList(fdlist); 925 } 926 if (fd) 927 { 928 thread->AddFd(fd); 929 } 930 931 thread->SetWakeupTime(timeout + this->GetLastClock()); 932 if (!this->KqueueAdd(thread->GetFdSet())) 933 { 934 MTLOG_ERROR("epoll add failed, errno: %d", errno); 935 return false; 936 } 937 this->InsertIoWait(thread); 938 thread->SwitchContext(); 939 940 int rcvnum = 0; 941 KqObjList& rcvfds = thread->GetFdSet(); 942 KqueuerObj* fdata = NULL; 943 TAILQ_FOREACH(fdata, &rcvfds, _entry) 944 { 945 if (fdata->GetRcvEvents() != 0) 946 { 947 rcvnum++; 948 } 949 } 950 this->KqueueDel(rcvfds); 951 952 if (rcvnum == 0) 953 { 954 errno = ETIME; 955 return false; 956 } 957 958 return true; 959 } 960 961 int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 962 { 963 MtFrame* mtframe = MtFrame::Instance(); 964 utime64_t start = mtframe->GetLastClock(); 965 MicroThread* thread = mtframe->GetActiveThread(); 966 utime64_t now = 0; 967 968 if(fd<0 || !buf || len<1) 969 { 970 errno = EINVAL; 971 MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno); 972 return -10; 973 } 974 975 if (timeout <= -1) 976 { 977 timeout = 0x7fffffff; 978 } 979 980 while (true) 981 { 982 now = mtframe->GetLastClock(); 983 if ((int)(now - start) > timeout) 984 { 985 errno = ETIME; 986 return -1; 987 } 988 989 KqueuerObj epfd; 990 epfd.SetOsfd(fd); 991 epfd.EnableInput(); 992 epfd.SetOwnerThread(thread); 993 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 994 { 995 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 996 return -2; 997 } 998 999 mt_hook_syscall(recvfrom); 1000 int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen); 1001 if (n < 0) 1002 { 1003 if (errno == EINTR) { 1004 continue; 1005 } 1006 1007 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1008 { 1009 MTLOG_ERROR("recvfrom failed, errno: %d", errno); 1010 return -3; 1011 } 1012 } 1013 else 1014 { 1015 return n; 1016 } 1017 } 1018 1019 } 1020 1021 int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 1022 { 1023 MtFrame* mtframe = MtFrame::Instance(); 1024 utime64_t start = mtframe->GetLastClock(); 1025 MicroThread* thread = mtframe->GetActiveThread(); 1026 utime64_t now = 0; 1027 1028 if(fd<0 || !msg || len<1) 1029 { 1030 errno = EINVAL; 1031 MTLOG_ERROR("sendto failed, errno: %d (%m)", errno); 1032 return -10; 1033 } 1034 1035 int n = 0; 1036 mt_hook_syscall(sendto); 1037 while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0) 1038 { 1039 now = mtframe->GetLastClock(); 1040 if ((int)(now - start) > timeout) 1041 { 1042 errno = ETIME; 1043 return -1; 1044 } 1045 1046 if (errno == EINTR) { 1047 continue; 1048 } 1049 1050 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1051 MTLOG_ERROR("sendto failed, errno: %d", errno); 1052 return -2; 1053 } 1054 1055 KqueuerObj epfd; 1056 epfd.SetOsfd(fd); 1057 epfd.EnableOutput(); 1058 epfd.SetOwnerThread(thread); 1059 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1060 return -3; 1061 } 1062 } 1063 1064 return n; 1065 } 1066 1067 int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 1068 { 1069 MtFrame* mtframe = MtFrame::Instance(); 1070 utime64_t start = mtframe->GetLastClock(); 1071 MicroThread* thread = mtframe->GetActiveThread(); 1072 utime64_t now = 0; 1073 1074 if(fd<0 || !addr || addrlen<1) 1075 { 1076 errno = EINVAL; 1077 MTLOG_ERROR("connect failed, errno: %d (%m)", errno); 1078 return -10; 1079 } 1080 1081 int n = 0; 1082 mt_hook_syscall(connect); 1083 while ((n = ff_hook_connect(fd, addr, addrlen)) < 0) 1084 { 1085 now = mtframe->GetLastClock(); 1086 if ((int)(now - start) > timeout) 1087 { 1088 errno = ETIME; 1089 return -1; 1090 } 1091 1092 if (errno == EISCONN) 1093 { 1094 return 0; 1095 } 1096 1097 if (errno == EINTR) { 1098 continue; 1099 } 1100 1101 if (errno != EINPROGRESS) { 1102 MTLOG_ERROR("connect failed, errno: %d", errno); 1103 return -2; 1104 } 1105 1106 KqueuerObj epfd; 1107 epfd.SetOsfd(fd); 1108 epfd.EnableOutput(); 1109 epfd.SetOwnerThread(thread); 1110 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1111 return -3; 1112 } 1113 } 1114 1115 return n; 1116 } 1117 1118 int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 1119 { 1120 MtFrame* mtframe = MtFrame::Instance(); 1121 utime64_t start = mtframe->GetLastClock(); 1122 MicroThread* thread = mtframe->GetActiveThread(); 1123 utime64_t now = 0; 1124 1125 if(fd<0) 1126 { 1127 errno = EINVAL; 1128 MTLOG_ERROR("accept failed, errno: %d (%m)", errno); 1129 return -10; 1130 } 1131 1132 int acceptfd = 0; 1133 mt_hook_syscall(accept); 1134 while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0) 1135 { 1136 now = mtframe->GetLastClock(); 1137 if ((int)(now - start) > timeout) 1138 { 1139 errno = ETIME; 1140 return -1; 1141 } 1142 1143 if (errno == EINTR) { 1144 continue; 1145 } 1146 1147 if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) { 1148 MTLOG_ERROR("accept failed, errno: %d", errno); 1149 return -2; 1150 } 1151 1152 KqueuerObj epfd; 1153 epfd.SetOsfd(fd); 1154 epfd.EnableInput(); 1155 epfd.SetOwnerThread(thread); 1156 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1157 return -3; 1158 } 1159 } 1160 1161 return acceptfd; 1162 } 1163 1164 ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout) 1165 { 1166 MtFrame* mtframe = MtFrame::Instance(); 1167 utime64_t start = mtframe->GetLastClock(); 1168 MicroThread* thread = mtframe->GetActiveThread(); 1169 utime64_t now = 0; 1170 1171 if(fd<0 || !buf || nbyte<1) 1172 { 1173 errno = EINVAL; 1174 MTLOG_ERROR("read failed, errno: %d (%m)", errno); 1175 return -10; 1176 } 1177 1178 ssize_t n = 0; 1179 mt_hook_syscall(read); 1180 while ((n = ff_hook_read(fd, buf, nbyte)) < 0) 1181 { 1182 now = mtframe->GetLastClock(); 1183 if ((int)(now - start) > timeout) 1184 { 1185 errno = ETIME; 1186 return -1; 1187 } 1188 1189 if (errno == EINTR) { 1190 continue; 1191 } 1192 1193 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1194 MTLOG_ERROR("read failed, errno: %d", errno); 1195 return -2; 1196 } 1197 1198 KqueuerObj epfd; 1199 epfd.SetOsfd(fd); 1200 epfd.EnableInput(); 1201 epfd.SetOwnerThread(thread); 1202 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1203 return -3; 1204 } 1205 } 1206 1207 return n; 1208 } 1209 1210 ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout) 1211 { 1212 MtFrame* mtframe = MtFrame::Instance(); 1213 utime64_t start = mtframe->GetLastClock(); 1214 MicroThread* thread = mtframe->GetActiveThread(); 1215 utime64_t now = 0; 1216 1217 if(fd<0 || !buf || nbyte<1) 1218 { 1219 errno = EINVAL; 1220 MTLOG_ERROR("write failed, errno: %d (%m)", errno); 1221 return -10; 1222 } 1223 1224 ssize_t n = 0; 1225 size_t send_len = 0; 1226 while (send_len < nbyte) 1227 { 1228 now = mtframe->GetLastClock(); 1229 if ((int)(now - start) > timeout) 1230 { 1231 errno = ETIME; 1232 return -1; 1233 } 1234 1235 mt_hook_syscall(write); 1236 n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len); 1237 if (n < 0) 1238 { 1239 if (errno == EINTR) { 1240 continue; 1241 } 1242 1243 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1244 MTLOG_ERROR("write failed, errno: %d", errno); 1245 return -2; 1246 } 1247 } 1248 else 1249 { 1250 send_len += n; 1251 if (send_len >= nbyte) { 1252 return nbyte; 1253 } 1254 } 1255 1256 KqueuerObj epfd; 1257 epfd.SetOsfd(fd); 1258 epfd.EnableOutput(); 1259 epfd.SetOwnerThread(thread); 1260 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1261 return -3; 1262 } 1263 } 1264 1265 return nbyte; 1266 } 1267 1268 int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout) 1269 { 1270 MtFrame* mtframe = MtFrame::Instance(); 1271 utime64_t start = mtframe->GetLastClock(); 1272 MicroThread* thread = mtframe->GetActiveThread(); 1273 utime64_t now = 0; 1274 1275 if(fd<0 || !buf || len<1) 1276 { 1277 errno = EINVAL; 1278 MTLOG_ERROR("recv failed, errno: %d (%m)", errno); 1279 return -10; 1280 } 1281 1282 if (timeout <= -1) 1283 { 1284 timeout = 0x7fffffff; 1285 } 1286 1287 while (true) 1288 { 1289 now = mtframe->GetLastClock(); 1290 if ((int)(now - start) > timeout) 1291 { 1292 errno = ETIME; 1293 return -1; 1294 } 1295 1296 KqueuerObj epfd; 1297 epfd.SetOsfd(fd); 1298 epfd.EnableInput(); 1299 epfd.SetOwnerThread(thread); 1300 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1301 { 1302 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 1303 return -2; 1304 } 1305 1306 mt_hook_syscall(recv); 1307 int n = ff_hook_recv(fd, buf, len, flags); 1308 if (n < 0) 1309 { 1310 if (errno == EINTR) { 1311 continue; 1312 } 1313 1314 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1315 { 1316 MTLOG_ERROR("recv failed, errno: %d", errno); 1317 return -3; 1318 } 1319 } 1320 else 1321 { 1322 return n; 1323 } 1324 } 1325 1326 } 1327 1328 ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 1329 { 1330 MtFrame* mtframe = MtFrame::Instance(); 1331 utime64_t start = mtframe->GetLastClock(); 1332 MicroThread* thread = mtframe->GetActiveThread(); 1333 utime64_t now = 0; 1334 1335 if(fd<0 || !buf || nbyte<1) 1336 { 1337 errno = EINVAL; 1338 MTLOG_ERROR("send failed, errno: %d (%m)", errno); 1339 return -10; 1340 } 1341 1342 ssize_t n = 0; 1343 size_t send_len = 0; 1344 while (send_len < nbyte) 1345 { 1346 now = mtframe->GetLastClock(); 1347 if ((int)(now - start) > timeout) 1348 { 1349 errno = ETIME; 1350 return -1; 1351 } 1352 1353 mt_hook_syscall(send); 1354 n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags); 1355 if (n < 0) 1356 { 1357 if (errno == EINTR) { 1358 continue; 1359 } 1360 1361 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1362 MTLOG_ERROR("write failed, errno: %d", errno); 1363 return -2; 1364 } 1365 } 1366 else 1367 { 1368 send_len += n; 1369 if (send_len >= nbyte) { 1370 return nbyte; 1371 } 1372 } 1373 1374 KqueuerObj epfd; 1375 epfd.SetOsfd(fd); 1376 epfd.EnableOutput(); 1377 epfd.SetOwnerThread(thread); 1378 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1379 return -3; 1380 } 1381 } 1382 1383 return nbyte; 1384 } 1385 1386 void MtFrame::sleep(int ms) 1387 { 1388 MtFrame* frame = MtFrame::Instance(); 1389 MicroThread* thread = frame->GetActiveThread(); 1390 if (thread != NULL) 1391 { 1392 thread->sleep(ms); 1393 } 1394 } 1395 1396 int MtFrame::WaitEvents(int fd, int events, int timeout) 1397 { 1398 MtFrame* mtframe = MtFrame::Instance(); 1399 utime64_t start = mtframe->GetLastClock(); 1400 MicroThread* thread = mtframe->GetActiveThread(); 1401 utime64_t now = 0; 1402 1403 if (timeout <= -1) 1404 { 1405 timeout = 0x7fffffff; 1406 } 1407 1408 while (true) 1409 { 1410 now = mtframe->GetLastClock(); 1411 if ((int)(now - start) > timeout) 1412 { 1413 errno = ETIME; 1414 return 0; 1415 } 1416 1417 KqueuerObj epfd; 1418 epfd.SetOsfd(fd); 1419 if (events & KQ_EVENT_READ) 1420 { 1421 epfd.EnableInput(); 1422 } 1423 if (events & KQ_EVENT_WRITE) 1424 { 1425 epfd.EnableOutput(); 1426 } 1427 epfd.SetOwnerThread(thread); 1428 1429 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1430 { 1431 MTLOG_TRACE("epoll schedule failed, errno: %d", errno); 1432 return 0; 1433 } 1434 1435 return epfd.GetRcvEvents(); 1436 } 1437 } 1438