1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename micro_thread.cpp 22 * @info micro thread manager 23 */ 24 #include "mt_version.h" 25 #include "micro_thread.h" 26 #include "mt_net.h" 27 #include "valgrind.h" 28 #include <assert.h> 29 #include "mt_sys_hook.h" 30 #include "ff_hook.h" 31 #include "ff_api.h" 32 33 using namespace NS_MICRO_THREAD; 34 35 #define ASSERT(statement) 36 //#define ASSERT(statement) assert(statement) 37 38 /** 39 * @brief ���ʵ�ֱ��������ĺ��� 40 * @param jbf jmpbuff����ָ�� 41 */ 42 extern "C" int save_context(jmp_buf jbf); 43 44 /** 45 * @brief ���ʵ�ָֻ������ĺ��� 46 * @param jbf jmpbuff����ָ�� 47 * @param ret �лصķ���ֵ, Ĭ��1 48 */ 49 extern "C" void restore_context(jmp_buf jbf, int ret); 50 51 /** 52 * @brief ���ʵ���滻����ջ���� 53 * @param jbf jmpbuff����ָ�� 54 * @param esp ��ջָ�� 55 */ 56 extern "C" void replace_esp(jmp_buf jbf, void* esp); 57 58 /** 59 * @brief ���캯��, Ĭ�ϲ���ջ��С 60 */ 61 Thread::Thread(int stack_size) 62 { 63 _stack_size = stack_size ? stack_size : ThreadPool::default_stack_size; 64 _wakeup_time = 0; 65 _stack = NULL; 66 memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 67 } 68 69 70 /** 71 * @brief LINUX x86/x86_64�µ�ջ����, �����ܹ�����Ҫע����� 72 */ 73 bool Thread::InitStack() 74 { 75 if (_stack) { 76 return true; 77 } 78 79 ///< ջ������ջ�ڴ����, ��Խ�� 80 _stack = (MtStack*)calloc(1, sizeof(MtStack)); 81 if (NULL == _stack) 82 { 83 MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack)); 84 return false; 85 } 86 87 int memsize = MEM_PAGE_SIZE*2 + _stack_size; 88 memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE; 89 90 static int zero_fd = -1; 91 int mmap_flags = MAP_PRIVATE | MAP_ANON; 92 void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0); 93 if (vaddr == (void *)MAP_FAILED) 94 { 95 MTLOG_ERROR("mmap stack failed, size %d", memsize); 96 free(_stack); 97 _stack = NULL; 98 return false; 99 } 100 _stack->_vaddr = (char*)vaddr; 101 _stack->_vaddr_size = memsize; 102 _stack->_stk_size = _stack_size; 103 _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE; 104 _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size; 105 // valgrind support: register stack frame 106 _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top); 107 108 _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE; 109 110 mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE); 111 mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE); 112 113 return true; 114 } 115 116 117 /** 118 * @brief �ͷŶ�ջ��Ϣ 119 */ 120 void Thread::FreeStack() 121 { 122 if (!_stack) { 123 return; 124 } 125 munmap(_stack->_vaddr, _stack->_vaddr_size); 126 // valgrind support: deregister stack frame 127 VALGRIND_STACK_DEREGISTER(_stack->valgrind_id); 128 free(_stack); 129 _stack = NULL; 130 } 131 132 /** 133 * @brief ��ʼ��������,���üĴ���,��ջ 134 */ 135 void Thread::InitContext() 136 { 137 if (save_context(_jmpbuf) != 0) 138 { 139 ScheduleObj::Instance()->ScheduleStartRun(); // ֱ�ӵ��� this->run? 140 } 141 142 if (_stack != NULL) 143 { 144 replace_esp(_jmpbuf, _stack->_esp); 145 } 146 } 147 148 /** 149 * @brief �����л�, ����״̬, �������� 150 */ 151 void Thread::SwitchContext() 152 { 153 if (save_context(_jmpbuf) == 0) 154 { 155 ScheduleObj::Instance()->ScheduleThread(); 156 } 157 } 158 159 /** 160 * @brief �ָ�������, �л��ضϵ�,�������� 161 */ 162 void Thread::RestoreContext() 163 { 164 restore_context(_jmpbuf, 1); 165 } 166 167 /** 168 * @brief ��ʼ���߳�,���ջ�������ij�ʼ�� 169 */ 170 bool Thread::Initial() 171 { 172 if (!InitStack()) 173 { 174 MTLOG_ERROR("init stack failed"); 175 return false; 176 } 177 178 InitContext(); 179 180 return true; 181 } 182 183 /** 184 * @brief ��ֹ�߳�,���ջ���������ͷ� 185 */ 186 void Thread::Destroy() 187 { 188 FreeStack(); 189 memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 190 } 191 192 /** 193 * @brief �߳�״̬����, �ɸ���״̬ 194 */ 195 void Thread::Reset() 196 { 197 _wakeup_time = 0; 198 SetPrivate(NULL); 199 200 InitContext(); 201 CleanState(); 202 } 203 204 /** 205 * @brief �߳���������˯��, ��λ���� 206 * @param ms ˯�ߺ����� 207 */ 208 void Thread::sleep(int ms) 209 { 210 utime64_t now = ScheduleObj::Instance()->ScheduleGetTime(); 211 _wakeup_time = now + ms; 212 213 if (save_context(_jmpbuf) == 0) 214 { 215 ScheduleObj::Instance()->ScheduleSleep(); 216 } 217 } 218 219 /** 220 * @brief ��������״̬, �ȴ��������߳̽��� 221 */ 222 void Thread::Wait() 223 { 224 if (save_context(_jmpbuf) == 0) 225 { 226 ScheduleObj::Instance()->SchedulePend(); 227 } 228 } 229 230 /** 231 * @brief ��ʼ��������,���üĴ���,��ջ 232 */ 233 bool Thread::CheckStackHealth(char *esp) 234 { 235 if (!_stack) 236 return false; 237 238 if (esp > _stack->_stk_bottom && esp < _stack->_stk_top) 239 return true; 240 else 241 return false; 242 } 243 244 /** 245 * @brief �̹߳���, Ĭ������ͨ�߳� 246 * @param type ����, Ĭ����ͨ 247 */ 248 MicroThread::MicroThread(ThreadType type) 249 { 250 memset(&_entry, 0, sizeof(_entry)); 251 TAILQ_INIT(&_fdset); 252 TAILQ_INIT(&_sub_list); 253 _flag = NOT_INLIST; 254 _type = type; 255 _state = INITIAL; 256 _start = NULL; 257 _args = NULL; 258 _parent = NULL; 259 } 260 261 /** 262 * @breif �̸߳���״̬���� 263 */ 264 void MicroThread::CleanState() 265 { 266 TAILQ_INIT(&_fdset); 267 TAILQ_INIT(&_sub_list); 268 _flag = NOT_INLIST; 269 _type = NORMAL; 270 _state = INITIAL; 271 _start = NULL; 272 _args = NULL; 273 _parent = NULL; 274 } 275 276 /** 277 * @brief �̵߳�ʵ�ʹ������� 278 */ 279 void MicroThread::Run() 280 { 281 if (_start) { 282 _start(_args); 283 } 284 285 // �����߳�, �������߳̽��������̬ 286 if (this->IsSubThread()) { 287 this->WakeupParent(); 288 } 289 290 ScheduleObj::Instance()->ScheduleReclaim(); 291 ScheduleObj::Instance()->ScheduleThread(); 292 } 293 294 /** 295 * @brief �������̻߳��Ѹ��̴߳��� 296 */ 297 void MicroThread::WakeupParent() 298 { 299 MicroThread* parent = this->GetParent(); 300 if (parent) 301 { 302 parent->RemoveSubThread(this); 303 if (parent->HasNoSubThread()) 304 { 305 ScheduleObj::Instance()->ScheduleUnpend(parent); 306 } 307 } 308 else 309 { 310 MTLOG_ERROR("Sub thread no parent, error"); 311 } 312 } 313 314 /** 315 * @brief �Ƿ��������Ķ������߳� 316 */ 317 bool MicroThread::HasNoSubThread() 318 { 319 return TAILQ_EMPTY(&_sub_list); 320 } 321 322 /** 323 * @brief ��ָ�����̼߳�������߳��б� 324 */ 325 void MicroThread::AddSubThread(MicroThread* sub) 326 { 327 ASSERT(!sub->HasFlag(MicroThread::SUB_LIST)); 328 if (!sub->HasFlag(MicroThread::SUB_LIST)) 329 { 330 TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry); 331 sub->_parent = this; 332 } 333 334 sub->SetFlag(MicroThread::SUB_LIST); 335 } 336 337 /** 338 * @brief ��ָ���߳��Ƴ������߳��б� 339 */ 340 void MicroThread::RemoveSubThread(MicroThread* sub) 341 { 342 ASSERT(sub->HasFlag(MicroThread::SUB_LIST)); 343 if (sub->HasFlag(MicroThread::SUB_LIST)) 344 { 345 TAILQ_REMOVE(&_sub_list, sub, _sub_entry); 346 sub->_parent = NULL; 347 } 348 349 sub->UnsetFlag(MicroThread::SUB_LIST); 350 } 351 352 353 /** 354 * @brief ��������ʾ����� 355 */ 356 ScheduleObj *ScheduleObj::_instance = NULL; ///< ��̬�����ʼ�� 357 inline ScheduleObj* ScheduleObj::Instance() 358 { 359 if (NULL == _instance) 360 { 361 _instance = new ScheduleObj(); 362 } 363 364 return _instance; 365 } 366 367 /** 368 * @brief ���������߳�������, �����ӿ� 369 */ 370 void ScheduleObj::ScheduleThread() 371 { 372 MtFrame* frame = MtFrame::Instance(); 373 frame->ThreadSchdule(); 374 } 375 376 /** 377 * @brief ��ȡȫ�ֵ�ʱ���, ���뵥λ 378 */ 379 utime64_t ScheduleObj::ScheduleGetTime() 380 { 381 MtFrame* frame = MtFrame::Instance(); 382 if (frame) 383 { 384 return frame->GetLastClock(); 385 } 386 else 387 { 388 MTLOG_ERROR("frame time failed, maybe not init"); 389 return 0; 390 } 391 } 392 393 /** 394 * @brief �̵߳�����������sleep״̬ 395 */ 396 void ScheduleObj::ScheduleSleep() 397 { 398 MtFrame* frame = MtFrame::Instance(); 399 MicroThread* thread = frame->GetActiveThread(); 400 if ((!frame) || (!thread)) { 401 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 402 return; 403 } 404 405 frame->InsertSleep(thread); 406 frame->ThreadSchdule(); 407 } 408 409 /** 410 * @brief �̵߳�����������pend״̬ 411 */ 412 void ScheduleObj::SchedulePend() 413 { 414 MtFrame* frame = MtFrame::Instance(); 415 MicroThread* thread = frame->GetActiveThread(); 416 if ((!frame) || (!thread)) { 417 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 418 return; 419 } 420 421 frame->InsertPend(thread); 422 frame->ThreadSchdule(); 423 } 424 425 /** 426 * @brief �̵߳���ȡ��pend״̬, �ⲿ����ȡ�� 427 */ 428 void ScheduleObj::ScheduleUnpend(void* pthread) 429 { 430 MtFrame* frame = MtFrame::Instance(); 431 MicroThread* thread = (MicroThread*)pthread; 432 if ((!frame) || (!thread)) { 433 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 434 return; 435 } 436 437 frame->RemovePend(thread); 438 frame->InsertRunable(thread); 439 } 440 441 442 443 /** 444 * @brief �߳�ִ����Ϻ�, ���մ��� 445 */ 446 void ScheduleObj::ScheduleReclaim() 447 { 448 MtFrame* frame = MtFrame::Instance(); 449 MicroThread* thread = frame->GetActiveThread(); 450 if ((!frame) || (!thread)) { 451 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 452 return; 453 } 454 455 frame->FreeThread(thread); 456 } 457 458 /** 459 * @brief ���������ȳ�ʼִ�� 460 */ 461 void ScheduleObj::ScheduleStartRun() 462 { 463 MtFrame* frame = MtFrame::Instance(); 464 MicroThread* thread = frame->GetActiveThread(); 465 if ((!frame) || (!thread)) { 466 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 467 return; 468 } 469 470 thread->Run(); 471 } 472 473 474 /** 475 * @brief �̳߳�ȫ�ֲ�����ʼ�� 476 */ 477 unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< Ĭ��2000�̴߳��� 478 unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< Ĭ��128Kջ��С 479 480 /** 481 * @brief �̳߳س�ʼ�� 482 */ 483 bool ThreadPool::InitialPool(int max_num) 484 { 485 MicroThread *thread = NULL; 486 for (unsigned int i = 0; i < default_thread_num; i++) 487 { 488 thread = new MicroThread(); 489 if ((NULL == thread) || (false == thread->Initial())) 490 { 491 MTLOG_ERROR("init pool, thread %p init failed", thread); 492 if (thread) delete thread; 493 continue; 494 } 495 thread->SetFlag(MicroThread::FREE_LIST); 496 _freelist.push(thread); 497 } 498 499 _total_num = _freelist.size(); 500 _max_num = max_num; 501 _use_num = 0; 502 if (_total_num <= 0) 503 { 504 return false; 505 } 506 else 507 { 508 return true; 509 } 510 } 511 512 /** 513 * @brief �̳߳ط���ʼ�� 514 */ 515 void ThreadPool::DestroyPool() 516 { 517 MicroThread* thread = NULL; 518 while (!_freelist.empty()) 519 { 520 thread = _freelist.front(); 521 _freelist.pop(); 522 thread->Destroy(); 523 delete thread; 524 } 525 526 _total_num = 0; 527 _use_num = 0; 528 } 529 530 /** 531 * @brief �̷߳���ӿ� 532 * @return �̶߳��� 533 */ 534 MicroThread* ThreadPool::AllocThread() 535 { 536 MT_ATTR_API_SET(492069, _total_num); // �̳߳ش�С 537 538 MicroThread* thread = NULL; 539 if (!_freelist.empty()) 540 { 541 thread = _freelist.front(); 542 _freelist.pop(); 543 544 ASSERT(thread->HasFlag(MicroThread::FREE_LIST)); 545 546 thread->UnsetFlag(MicroThread::FREE_LIST); 547 _use_num++; 548 return thread; 549 } 550 551 MT_ATTR_API(320846, 1); // pool no nore 552 if (_total_num >= _max_num) 553 { 554 MT_ATTR_API(361140, 1); // no more quota 555 return NULL; 556 } 557 558 thread = new MicroThread(); 559 if ((NULL == thread) || (false == thread->Initial())) 560 { 561 MT_ATTR_API(320847, 1); // pool init fail 562 MTLOG_ERROR("thread alloc failed, thread: %p", thread); 563 if (thread) delete thread; 564 return NULL; 565 } 566 _total_num++; 567 _use_num++; 568 569 return thread; 570 } 571 572 /** 573 * @brief �߳��ͷŽӿ� 574 * @param thread �̶߳��� 575 */ 576 void ThreadPool::FreeThread(MicroThread* thread) 577 { 578 ASSERT(!thread->HasFlag(MicroThread::FREE_LIST)); 579 thread->Reset(); 580 _use_num--; 581 _freelist.push(thread); 582 thread->SetFlag(MicroThread::FREE_LIST); 583 584 ///< ���ж��� > default_thread_num, ���ͷ����ϵ�, �������ͷŵ�ǰ 585 unsigned int free_num = _freelist.size(); 586 if ((free_num > default_thread_num) && (free_num > 1)) 587 { 588 thread = _freelist.front(); 589 _freelist.pop(); 590 thread->Destroy(); 591 delete thread; 592 _total_num--; 593 } 594 } 595 596 int ThreadPool::GetUsedNum(void) 597 { 598 return _use_num; 599 } 600 601 /** 602 * @brief �߳̿����, ȫ��ʵ����ȡ 603 */ 604 MtFrame *MtFrame::_instance = NULL; 605 inline MtFrame* MtFrame::Instance () 606 { 607 if (NULL == _instance ) 608 { 609 _instance = new MtFrame(); 610 } 611 612 return _instance; 613 } 614 615 /** 616 * @brief HOOKϵͳapi������ 617 */ 618 void MtFrame::SetHookFlag() { 619 mt_set_hook_flag(); 620 }; 621 622 623 /** 624 * @brief ��ܳ�ʼ��, Ĭ�ϲ�����־���� 625 */ 626 bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num) 627 { 628 _log_adpt = logadpt; 629 630 // �������������߳���Ŀ, ���Ե���epoll��ص�fd��Ŀ 631 if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num)) 632 { 633 MTLOG_ERROR("Init epoll or thread pool failed"); 634 this->Destroy(); 635 return false; 636 } 637 638 // �������öѴ�С, �Ŵ�Ѹ���Ϊ2�� 639 if (_sleeplist.HeapResize(max_thread_num * 2) < 0) 640 { 641 MTLOG_ERROR("Init heap list failed"); 642 this->Destroy(); 643 return false; 644 } 645 646 // ��ʱ�������ʼ��, �Ŵ�Ѹ���Ϊ2�� 647 _timer = new CTimerMng(max_thread_num * 2); 648 if (NULL == _timer) 649 { 650 MTLOG_ERROR("Init heap timer failed"); 651 this->Destroy(); 652 return false; 653 } 654 655 // �ػ��̵߳�����ʼ�� 656 _daemon = AllocThread(); 657 if (NULL == _daemon) 658 { 659 MTLOG_ERROR("Alloc daemon thread failed"); 660 this->Destroy(); 661 return false; 662 } 663 _daemon->SetType(MicroThread::DAEMON); 664 _daemon->SetState(MicroThread::RUNABLE); 665 _daemon->SetSartFunc(MtFrame::DaemonRun, this); 666 667 // �����߳�, ����INIT, ����ʼ��ջ, Ҳ�ص�ע��, ����Ҫͳһ���� 668 _primo = new MicroThread(MicroThread::PRIMORDIAL); 669 if (NULL == _primo) 670 { 671 MTLOG_ERROR("new _primo thread failed"); 672 this->Destroy(); 673 return false; 674 } 675 _primo->SetState(MicroThread::RUNNING); 676 SetActiveThread(_primo); 677 678 // ��������ʱ��� 679 _last_clock = GetSystemMS(); 680 TAILQ_INIT(&_iolist); 681 TAILQ_INIT(&_pend_list); 682 683 //SetHookFlag(); 684 685 return true; 686 687 } 688 689 /** 690 * @brief ��ܷ���ʼ�� 691 */ 692 void MtFrame::Destroy(void) 693 { 694 if (NULL == _instance ) 695 { 696 return; 697 } 698 699 if (_primo) { 700 delete _primo; 701 _primo = NULL; 702 } 703 704 if (_daemon) { 705 FreeThread(_daemon); 706 _daemon = NULL; 707 } 708 709 TAILQ_INIT(&_iolist); 710 711 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 712 while (thread) 713 { 714 FreeThread(thread); 715 thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 716 } 717 718 while (!_runlist.empty()) 719 { 720 thread = _runlist.front(); 721 _runlist.pop(); 722 FreeThread(thread); 723 } 724 725 MicroThread* tmp; 726 TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp) 727 { 728 TAILQ_REMOVE(&_pend_list, thread, _entry); 729 FreeThread(thread); 730 } 731 732 if (_timer != NULL) 733 { 734 delete _timer; 735 _timer = NULL; 736 } 737 738 _instance->DestroyPool(); 739 _instance->TermKqueue(); 740 delete _instance; 741 _instance = NULL; 742 } 743 744 /** 745 * @brief �߳̿�ܰ汾��ȡ 746 */ 747 char* MtFrame::Version() 748 { 749 return IMT_VERSION; 750 } 751 752 /** 753 * @brief �̴߳����ӿ� 754 * @param entry �߳���ں��� 755 * @param args �߳���ڲ��� 756 * @return �߳�ָ��, NULL��ʾʧ�� 757 */ 758 MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable) 759 { 760 MtFrame* mtframe = MtFrame::Instance(); 761 MicroThread* thread = mtframe->AllocThread(); 762 if (NULL == thread) 763 { 764 MTLOG_ERROR("create thread failed"); 765 return NULL; 766 } 767 thread->SetSartFunc(entry, args); 768 769 if (runable) { 770 mtframe->InsertRunable(thread); 771 } 772 773 return thread; 774 } 775 776 int MtFrame::Loop(void* args) 777 { 778 MtFrame* mtframe = MtFrame::Instance(); 779 MicroThread* daemon = mtframe->DaemonThread(); 780 781 mtframe->KqueueDispatch(); 782 mtframe->SetLastClock(mtframe->GetSystemMS()); 783 mtframe->WakeupTimeout(); 784 mtframe->CheckExpired(); 785 daemon->SwitchContext(); 786 787 return 0; 788 } 789 790 /** 791 * @brief �ػ��߳���ں���, ����ָ��Ҫ��static���� 792 * @param args �߳���ڲ��� 793 */ 794 void MtFrame::DaemonRun(void* args) 795 { 796 /* 797 MtFrame* mtframe = MtFrame::Instance(); 798 MicroThread* daemon = mtframe->DaemonThread(); 799 800 while (true) { 801 mtframe->KqueueDispatch(); 802 mtframe->SetLastClock(mtframe->GetSystemMS()); 803 mtframe->WakeupTimeout(); 804 mtframe->CheckExpired(); 805 daemon->SwitchContext(); 806 } 807 */ 808 ff_run(MtFrame::Loop, NULL); 809 } 810 811 /** 812 * @brief ��ȡ��ǰ�̵߳ĸ��߳� 813 */ 814 MicroThread *MtFrame::GetRootThread() 815 { 816 if (NULL == _curr_thread) 817 { 818 return NULL; 819 } 820 821 MicroThread::ThreadType type = _curr_thread->GetType(); 822 MicroThread *thread = _curr_thread; 823 MicroThread *parent = thread; 824 825 while (MicroThread::SUB_THREAD == type) 826 { 827 thread = thread->GetParent(); 828 if (!thread) 829 { 830 break; 831 } 832 833 type = thread->GetType(); 834 parent = thread; 835 } 836 837 return parent; 838 } 839 840 /** 841 * @brief ��ܵ����߳����� 842 */ 843 void MtFrame::ThreadSchdule() 844 { 845 MicroThread* thread = NULL; 846 MtFrame* mtframe = MtFrame::Instance(); 847 848 if (mtframe->_runlist.empty()) 849 { 850 thread = mtframe->DaemonThread(); 851 } 852 else 853 { 854 thread = mtframe->_runlist.front(); 855 mtframe->RemoveRunable(thread); 856 } 857 858 this->SetActiveThread(thread); 859 thread->SetState(MicroThread::RUNNING); 860 thread->RestoreContext(); 861 } 862 863 /** 864 * @brief ��ܴ���ʱ�ص����� 865 */ 866 void MtFrame::CheckExpired() 867 { 868 static utime64_t check_time = 0; 869 870 if (_timer != NULL) 871 { 872 _timer->check_expired(); 873 } 874 875 utime64_t now = GetLastClock(); 876 877 if ((now - check_time) > 1000) 878 { 879 CNetMgr::Instance()->RecycleObjs(now); 880 check_time = now; 881 } 882 } 883 884 /** 885 * @brief ��ܼ���ʱ, �������еij�ʱ�߳� 886 */ 887 void MtFrame::WakeupTimeout() 888 { 889 utime64_t now = GetLastClock(); 890 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 891 while (thread && (thread->GetWakeupTime() <= now)) 892 { 893 if (thread->HasFlag(MicroThread::IO_LIST)) 894 { 895 RemoveIoWait(thread); 896 } 897 else 898 { 899 RemoveSleep(thread); 900 } 901 902 InsertRunable(thread); 903 904 thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 905 } 906 } 907 908 /** 909 * @brief ��ܵ���epoll waitǰ, �ж��ȴ�ʱ����Ϣ 910 */ 911 int MtFrame::KqueueGetTimeout() 912 { 913 utime64_t now = GetLastClock(); 914 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 915 if (!thread) 916 { 917 return 10; //Ĭ��10ms epollwait 918 } 919 else if (thread->GetWakeupTime() < now) 920 { 921 return 0; 922 } 923 else 924 { 925 return (int)(thread->GetWakeupTime() - now); 926 } 927 } 928 929 /** 930 * @brief ��ܹ����̵߳�Ԫ, ��������� 931 * @param thread �̶߳��� 932 */ 933 inline void MtFrame::InsertSleep(MicroThread* thread) 934 { 935 ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST)); 936 937 thread->SetFlag(MicroThread::SLEEP_LIST); 938 thread->SetState(MicroThread::SLEEPING); 939 int rc = _sleeplist.HeapPush(thread); 940 if (rc < 0) 941 { 942 MT_ATTR_API(320848, 1); // heap error 943 MTLOG_ERROR("Insert heap failed , rc %d", rc); 944 } 945 } 946 947 /** 948 * @brief ��ܹ����̵߳�Ԫ, �Ƴ������ 949 * @param thread �̶߳��� 950 */ 951 inline void MtFrame::RemoveSleep(MicroThread* thread) 952 { 953 ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST)); 954 thread->UnsetFlag(MicroThread::SLEEP_LIST); 955 956 int rc = _sleeplist.HeapDelete(thread); 957 if (rc < 0) 958 { 959 MT_ATTR_API(320849, 1); // heap error 960 MTLOG_ERROR("remove heap failed , rc %d", rc); 961 } 962 } 963 964 /** 965 * @brief ��ܹ����̵߳�Ԫ, ִ��IO�ȴ�״̬ 966 * @param thread �̶߳��� 967 */ 968 inline void MtFrame::InsertIoWait(MicroThread* thread) 969 { 970 ASSERT(!thread->HasFlag(MicroThread::IO_LIST)); 971 thread->SetFlag(MicroThread::IO_LIST); 972 TAILQ_INSERT_TAIL(&_iolist, thread, _entry); 973 InsertSleep(thread); 974 } 975 976 /** 977 * @brief ��ܹ����̵߳�Ԫ, �Ƴ�IO�ȴ�״̬ 978 * @param thread �̶߳��� 979 */ 980 void MtFrame::RemoveIoWait(MicroThread* thread) 981 { 982 ASSERT(thread->HasFlag(MicroThread::IO_LIST)); 983 thread->UnsetFlag(MicroThread::IO_LIST); 984 TAILQ_REMOVE(&_iolist, thread, _entry); 985 986 RemoveSleep(thread); 987 } 988 989 /** 990 * @brief ��ܹ����̵߳�Ԫ, ��������ж��� 991 * @param thread �̶߳��� 992 */ 993 void MtFrame::InsertRunable(MicroThread* thread) 994 { 995 ASSERT(!thread->HasFlag(MicroThread::RUN_LIST)); 996 thread->SetFlag(MicroThread::RUN_LIST); 997 998 thread->SetState(MicroThread::RUNABLE); 999 _runlist.push(thread); 1000 _waitnum++; 1001 } 1002 1003 /** 1004 * @brief ��ܹ����̵߳�Ԫ, �Ƴ������ж��� 1005 * @param thread �̶߳��� 1006 */ 1007 inline void MtFrame::RemoveRunable(MicroThread* thread) 1008 { 1009 ASSERT(thread->HasFlag(MicroThread::RUN_LIST)); 1010 ASSERT(thread == _runlist.front()); 1011 thread->UnsetFlag(MicroThread::RUN_LIST); 1012 1013 _runlist.pop(); 1014 _waitnum--; 1015 } 1016 1017 1018 /** 1019 * @brief ��ܹ����̵߳�Ԫ, ִ��pend�ȴ�״̬ 1020 * @param thread �̶߳��� 1021 */ 1022 void MtFrame::InsertPend(MicroThread* thread) 1023 { 1024 ASSERT(!thread->HasFlag(MicroThread::PEND_LIST)); 1025 thread->SetFlag(MicroThread::PEND_LIST); 1026 TAILQ_INSERT_TAIL(&_pend_list, thread, _entry); 1027 thread->SetState(MicroThread::PENDING); 1028 } 1029 1030 /** 1031 * @brief ��ܹ����̵߳�Ԫ, �Ƴ�PEND�ȴ�״̬ 1032 * @param thread �̶߳��� 1033 */ 1034 void MtFrame::RemovePend(MicroThread* thread) 1035 { 1036 ASSERT(thread->HasFlag(MicroThread::PEND_LIST)); 1037 thread->UnsetFlag(MicroThread::PEND_LIST); 1038 TAILQ_REMOVE(&_pend_list, thread, _entry); 1039 } 1040 1041 /** 1042 * @brief �߳������л�, �ȴ������̵߳Ļ��� 1043 * @param timeout ��ȴ�ʱ��, ���� 1044 */ 1045 void MtFrame::WaitNotify(utime64_t timeout) 1046 { 1047 MicroThread* thread = GetActiveThread(); 1048 1049 thread->SetWakeupTime(timeout + this->GetLastClock()); 1050 this->InsertIoWait(thread); 1051 thread->SwitchContext(); 1052 } 1053 1054 /** 1055 * @brief �̴߳����л�����,���óɹ� ���ó�cpu 1056 * @param fdlist ��·������socket�б� 1057 * @param fd ���������fd��Ϣ 1058 * @param timeout ��ȴ�ʱ��, ���� 1059 * @return true �ɹ�, false ʧ�� 1060 */ 1061 bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) 1062 { 1063 MicroThread* thread = GetActiveThread(); 1064 if (NULL == thread) 1065 { 1066 MTLOG_ERROR("active thread null, epoll schedule failed"); 1067 return false; 1068 } 1069 1070 // 1. ���ϸ��߳���Ҫ���ĵ�epoll���ȶ��� 1071 thread->ClearAllFd(); 1072 if (fdlist) 1073 { 1074 thread->AddFdList(fdlist); 1075 } 1076 if (fd) 1077 { 1078 thread->AddFd(fd); 1079 } 1080 1081 // 2. ����epoll�����¼�, ������ʱʱ��, �л�IO�ȴ�״̬, �����л� 1082 thread->SetWakeupTime(timeout + this->GetLastClock()); 1083 if (!this->KqueueAdd(thread->GetFdSet())) 1084 { 1085 MTLOG_ERROR("epoll add failed, errno: %d", errno); 1086 return false; 1087 } 1088 this->InsertIoWait(thread); 1089 thread->SwitchContext(); 1090 1091 // 3. ����OK, �ж���ʱ, epoll ctrl ��ԭ״̬ 1092 int rcvnum = 0; 1093 KqObjList& rcvfds = thread->GetFdSet(); 1094 KqueuerObj* fdata = NULL; 1095 TAILQ_FOREACH(fdata, &rcvfds, _entry) 1096 { 1097 if (fdata->GetRcvEvents() != 0) 1098 { 1099 rcvnum++; 1100 } 1101 } 1102 this->KqueueDel(rcvfds); // ��һ��������ADD, DEL �ջ����� 1103 1104 if (rcvnum == 0) // ��ʱ����, ���ش��� 1105 { 1106 errno = ETIME; 1107 return false; 1108 } 1109 1110 return true; 1111 } 1112 1113 /** 1114 * @brief �̰߳�����ϵͳIO���� recvfrom 1115 * @param fd ϵͳsocket��Ϣ 1116 * @param buf ������Ϣ������ָ�� 1117 * @param len ������Ϣ���������� 1118 * @param from ��Դ��ַ��ָ�� 1119 * @param fromlen ��Դ��ַ�Ľṹ���� 1120 * @param timeout ��ȴ�ʱ��, ���� 1121 * @return >0 �ɹ����ճ���, <0 ʧ�� 1122 */ 1123 int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 1124 { 1125 MtFrame* mtframe = MtFrame::Instance(); 1126 utime64_t start = mtframe->GetLastClock(); 1127 MicroThread* thread = mtframe->GetActiveThread(); 1128 utime64_t now = 0; 1129 1130 if(fd<0 || !buf || len<1) 1131 { 1132 errno = EINVAL; 1133 MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno); 1134 return -10; 1135 } 1136 1137 if (timeout <= -1) 1138 { 1139 timeout = 0x7fffffff; 1140 } 1141 1142 while (true) 1143 { 1144 now = mtframe->GetLastClock(); 1145 if ((int)(now - start) > timeout) 1146 { 1147 errno = ETIME; 1148 return -1; 1149 } 1150 1151 KqueuerObj epfd; 1152 epfd.SetOsfd(fd); 1153 epfd.EnableInput(); 1154 epfd.SetOwnerThread(thread); 1155 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1156 { 1157 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 1158 return -2; 1159 } 1160 1161 mt_hook_syscall(recvfrom); 1162 int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen); 1163 if (n < 0) 1164 { 1165 if (errno == EINTR) { 1166 continue; 1167 } 1168 1169 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1170 { 1171 MTLOG_ERROR("recvfrom failed, errno: %d", errno); 1172 return -3; 1173 } 1174 } 1175 else 1176 { 1177 return n; 1178 } 1179 } 1180 1181 } 1182 1183 /** 1184 * @brief �̰߳�����ϵͳIO���� sendto 1185 * @param fd ϵͳsocket��Ϣ 1186 * @param msg �����͵���Ϣָ�� 1187 * @param len �����͵���Ϣ���� 1188 * @param to Ŀ�ĵ�ַ��ָ�� 1189 * @param tolen Ŀ�ĵ�ַ�Ľṹ���� 1190 * @param timeout ��ȴ�ʱ��, ���� 1191 * @return >0 �ɹ����ͳ���, <0 ʧ�� 1192 */ 1193 int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 1194 { 1195 MtFrame* mtframe = MtFrame::Instance(); 1196 utime64_t start = mtframe->GetLastClock(); 1197 MicroThread* thread = mtframe->GetActiveThread(); 1198 utime64_t now = 0; 1199 1200 if(fd<0 || !msg || len<1) 1201 { 1202 errno = EINVAL; 1203 MTLOG_ERROR("sendto failed, errno: %d (%m)", errno); 1204 return -10; 1205 } 1206 1207 int n = 0; 1208 mt_hook_syscall(sendto); 1209 while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0) 1210 { 1211 now = mtframe->GetLastClock(); 1212 if ((int)(now - start) > timeout) 1213 { 1214 errno = ETIME; 1215 return -1; 1216 } 1217 1218 if (errno == EINTR) { 1219 continue; 1220 } 1221 1222 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1223 MTLOG_ERROR("sendto failed, errno: %d", errno); 1224 return -2; 1225 } 1226 1227 KqueuerObj epfd; 1228 epfd.SetOsfd(fd); 1229 epfd.EnableOutput(); 1230 epfd.SetOwnerThread(thread); 1231 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1232 return -3; 1233 } 1234 } 1235 1236 return n; 1237 } 1238 1239 /** 1240 * @brief �̰߳�����ϵͳIO���� connect 1241 * @param fd ϵͳsocket��Ϣ 1242 * @param addr ָ��server��Ŀ�ĵ�ַ 1243 * @param addrlen ��ַ�ij��� 1244 * @param timeout ��ȴ�ʱ��, ���� 1245 * @return =0 ���ӳɹ�, <0 ʧ�� 1246 */ 1247 int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 1248 { 1249 MtFrame* mtframe = MtFrame::Instance(); 1250 utime64_t start = mtframe->GetLastClock(); 1251 MicroThread* thread = mtframe->GetActiveThread(); 1252 utime64_t now = 0; 1253 1254 if(fd<0 || !addr || addrlen<1) 1255 { 1256 errno = EINVAL; 1257 MTLOG_ERROR("connect failed, errno: %d (%m)", errno); 1258 return -10; 1259 } 1260 1261 int n = 0; 1262 mt_hook_syscall(connect); 1263 while ((n = ff_hook_connect(fd, addr, addrlen)) < 0) 1264 { 1265 now = mtframe->GetLastClock(); 1266 if ((int)(now - start) > timeout) 1267 { 1268 errno = ETIME; 1269 return -1; 1270 } 1271 1272 if (errno == EISCONN) // ������, ���سɹ� 1273 { 1274 return 0; 1275 } 1276 1277 if (errno == EINTR) { 1278 continue; 1279 } 1280 1281 if (errno != EINPROGRESS) { 1282 MTLOG_ERROR("connect failed, errno: %d", errno); 1283 return -2; 1284 } 1285 1286 KqueuerObj epfd; 1287 epfd.SetOsfd(fd); 1288 epfd.EnableOutput(); 1289 epfd.SetOwnerThread(thread); 1290 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1291 return -3; 1292 } 1293 } 1294 1295 return n; 1296 } 1297 1298 /** 1299 * @brief �̰߳�����ϵͳIO���� accept 1300 * @param fd �������� 1301 * @param addr �ͻ��˵�ַ 1302 * @param addrlen ��ַ�ij��� 1303 * @param timeout ��ȴ�ʱ��, ���� 1304 * @return >=0 accept��socket������, <0 ʧ�� 1305 */ 1306 int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 1307 { 1308 MtFrame* mtframe = MtFrame::Instance(); 1309 utime64_t start = mtframe->GetLastClock(); 1310 MicroThread* thread = mtframe->GetActiveThread(); 1311 utime64_t now = 0; 1312 1313 if(fd<0) 1314 { 1315 errno = EINVAL; 1316 MTLOG_ERROR("accept failed, errno: %d (%m)", errno); 1317 return -10; 1318 } 1319 1320 int acceptfd = 0; 1321 mt_hook_syscall(accept); 1322 while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0) 1323 { 1324 now = mtframe->GetLastClock(); 1325 if ((int)(now - start) > timeout) 1326 { 1327 errno = ETIME; 1328 return -1; 1329 } 1330 1331 if (errno == EINTR) { 1332 continue; 1333 } 1334 1335 if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) { 1336 MTLOG_ERROR("accept failed, errno: %d", errno); 1337 return -2; 1338 } 1339 1340 KqueuerObj epfd; 1341 epfd.SetOsfd(fd); 1342 epfd.EnableInput(); 1343 epfd.SetOwnerThread(thread); 1344 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1345 return -3; 1346 } 1347 } 1348 1349 return acceptfd; 1350 } 1351 1352 1353 /** 1354 * @brief �̰߳�����ϵͳIO���� read 1355 * @param fd ϵͳsocket��Ϣ 1356 * @param buf ������Ϣ������ָ�� 1357 * @param nbyte ������Ϣ���������� 1358 * @param timeout ��ȴ�ʱ��, ���� 1359 * @return >0 �ɹ����ճ���, <0 ʧ�� 1360 */ 1361 ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout) 1362 { 1363 MtFrame* mtframe = MtFrame::Instance(); 1364 utime64_t start = mtframe->GetLastClock(); 1365 MicroThread* thread = mtframe->GetActiveThread(); 1366 utime64_t now = 0; 1367 1368 if(fd<0 || !buf || nbyte<1) 1369 { 1370 errno = EINVAL; 1371 MTLOG_ERROR("read failed, errno: %d (%m)", errno); 1372 return -10; 1373 } 1374 1375 ssize_t n = 0; 1376 mt_hook_syscall(read); 1377 while ((n = ff_hook_read(fd, buf, nbyte)) < 0) 1378 { 1379 now = mtframe->GetLastClock(); 1380 if ((int)(now - start) > timeout) 1381 { 1382 errno = ETIME; 1383 return -1; 1384 } 1385 1386 if (errno == EINTR) { 1387 continue; 1388 } 1389 1390 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1391 MTLOG_ERROR("read failed, errno: %d", errno); 1392 return -2; 1393 } 1394 1395 KqueuerObj epfd; 1396 epfd.SetOsfd(fd); 1397 epfd.EnableInput(); 1398 epfd.SetOwnerThread(thread); 1399 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1400 return -3; 1401 } 1402 } 1403 1404 return n; 1405 } 1406 1407 /** 1408 * @brief �̰߳�����ϵͳIO���� write 1409 * @param fd ϵͳsocket��Ϣ 1410 * @param buf �����͵���Ϣָ�� 1411 * @param nbyte �����͵���Ϣ���� 1412 * @param timeout ��ȴ�ʱ��, ���� 1413 * @return >0 �ɹ����ͳ���, <0 ʧ�� 1414 */ 1415 ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout) 1416 { 1417 MtFrame* mtframe = MtFrame::Instance(); 1418 utime64_t start = mtframe->GetLastClock(); 1419 MicroThread* thread = mtframe->GetActiveThread(); 1420 utime64_t now = 0; 1421 1422 if(fd<0 || !buf || nbyte<1) 1423 { 1424 errno = EINVAL; 1425 MTLOG_ERROR("write failed, errno: %d (%m)", errno); 1426 return -10; 1427 } 1428 1429 ssize_t n = 0; 1430 size_t send_len = 0; 1431 while (send_len < nbyte) 1432 { 1433 now = mtframe->GetLastClock(); 1434 if ((int)(now - start) > timeout) 1435 { 1436 errno = ETIME; 1437 return -1; 1438 } 1439 1440 mt_hook_syscall(write); 1441 n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len); 1442 if (n < 0) 1443 { 1444 if (errno == EINTR) { 1445 continue; 1446 } 1447 1448 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1449 MTLOG_ERROR("write failed, errno: %d", errno); 1450 return -2; 1451 } 1452 } 1453 else 1454 { 1455 send_len += n; 1456 if (send_len >= nbyte) { 1457 return nbyte; 1458 } 1459 } 1460 1461 KqueuerObj epfd; 1462 epfd.SetOsfd(fd); 1463 epfd.EnableOutput(); 1464 epfd.SetOwnerThread(thread); 1465 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1466 return -3; 1467 } 1468 } 1469 1470 return nbyte; 1471 } 1472 1473 1474 /** 1475 * @brief �̰߳�����ϵͳIO���� recv 1476 * @param fd ϵͳsocket��Ϣ 1477 * @param buf ������Ϣ������ָ�� 1478 * @param len ������Ϣ���������� 1479 * @param timeout ��ȴ�ʱ��, ���� 1480 * @return >0 �ɹ����ճ���, <0 ʧ�� 1481 */ 1482 int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout) 1483 { 1484 MtFrame* mtframe = MtFrame::Instance(); 1485 utime64_t start = mtframe->GetLastClock(); 1486 MicroThread* thread = mtframe->GetActiveThread(); 1487 utime64_t now = 0; 1488 1489 if(fd<0 || !buf || len<1) 1490 { 1491 errno = EINVAL; 1492 MTLOG_ERROR("recv failed, errno: %d (%m)", errno); 1493 return -10; 1494 } 1495 1496 if (timeout <= -1) 1497 { 1498 timeout = 0x7fffffff; 1499 } 1500 1501 while (true) 1502 { 1503 now = mtframe->GetLastClock(); 1504 if ((int)(now - start) > timeout) 1505 { 1506 errno = ETIME; 1507 return -1; 1508 } 1509 1510 KqueuerObj epfd; 1511 epfd.SetOsfd(fd); 1512 epfd.EnableInput(); 1513 epfd.SetOwnerThread(thread); 1514 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1515 { 1516 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 1517 return -2; 1518 } 1519 1520 mt_hook_syscall(recv); 1521 int n = ff_hook_recv(fd, buf, len, flags); 1522 if (n < 0) 1523 { 1524 if (errno == EINTR) { 1525 continue; 1526 } 1527 1528 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1529 { 1530 MTLOG_ERROR("recv failed, errno: %d", errno); 1531 return -3; 1532 } 1533 } 1534 else 1535 { 1536 return n; 1537 } 1538 } 1539 1540 } 1541 1542 /** 1543 * @brief �̰߳�����ϵͳIO���� send 1544 * @param fd ϵͳsocket��Ϣ 1545 * @param buf �����͵���Ϣָ�� 1546 * @param nbyte �����͵���Ϣ���� 1547 * @param timeout ��ȴ�ʱ��, ���� 1548 * @return >0 �ɹ����ͳ���, <0 ʧ�� 1549 */ 1550 ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 1551 { 1552 MtFrame* mtframe = MtFrame::Instance(); 1553 utime64_t start = mtframe->GetLastClock(); 1554 MicroThread* thread = mtframe->GetActiveThread(); 1555 utime64_t now = 0; 1556 1557 if(fd<0 || !buf || nbyte<1) 1558 { 1559 errno = EINVAL; 1560 MTLOG_ERROR("send failed, errno: %d (%m)", errno); 1561 return -10; 1562 } 1563 1564 ssize_t n = 0; 1565 size_t send_len = 0; 1566 while (send_len < nbyte) 1567 { 1568 now = mtframe->GetLastClock(); 1569 if ((int)(now - start) > timeout) 1570 { 1571 errno = ETIME; 1572 return -1; 1573 } 1574 1575 mt_hook_syscall(send); 1576 n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags); 1577 if (n < 0) 1578 { 1579 if (errno == EINTR) { 1580 continue; 1581 } 1582 1583 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1584 MTLOG_ERROR("write failed, errno: %d", errno); 1585 return -2; 1586 } 1587 } 1588 else 1589 { 1590 send_len += n; 1591 if (send_len >= nbyte) { 1592 return nbyte; 1593 } 1594 } 1595 1596 KqueuerObj epfd; 1597 epfd.SetOsfd(fd); 1598 epfd.EnableOutput(); 1599 epfd.SetOwnerThread(thread); 1600 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1601 return -3; 1602 } 1603 } 1604 1605 return nbyte; 1606 } 1607 1608 1609 1610 /** 1611 * @brief �߳�����sleep�ӿ�, ��λms 1612 */ 1613 void MtFrame::sleep(int ms) 1614 { 1615 MtFrame* frame = MtFrame::Instance(); 1616 MicroThread* thread = frame->GetActiveThread(); 1617 if (thread != NULL) 1618 { 1619 thread->sleep(ms); 1620 } 1621 } 1622 1623 /** 1624 * @brief �̰߳�����ϵͳIO���� recv 1625 * @param fd ϵͳsocket��Ϣ 1626 * @param events �¼����� EPOLLIN or EPOLLOUT 1627 * @param timeout ��ȴ�ʱ��, ���� 1628 * @return >0 �ɹ����ճ���, <0 ʧ�� 1629 */ 1630 int MtFrame::WaitEvents(int fd, int events, int timeout) 1631 { 1632 MtFrame* mtframe = MtFrame::Instance(); 1633 utime64_t start = mtframe->GetLastClock(); 1634 MicroThread* thread = mtframe->GetActiveThread(); 1635 utime64_t now = 0; 1636 1637 if (timeout <= -1) 1638 { 1639 timeout = 0x7fffffff; 1640 } 1641 1642 while (true) 1643 { 1644 now = mtframe->GetLastClock(); 1645 if ((int)(now - start) > timeout) 1646 { 1647 errno = ETIME; 1648 return 0; 1649 } 1650 1651 KqueuerObj epfd; 1652 epfd.SetOsfd(fd); 1653 if (events & KQ_EVENT_READ) 1654 { 1655 epfd.EnableInput(); 1656 } 1657 if (events & KQ_EVENT_WRITE) 1658 { 1659 epfd.EnableOutput(); 1660 } 1661 epfd.SetOwnerThread(thread); 1662 1663 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1664 { 1665 MTLOG_TRACE("epoll schedule failed, errno: %d", errno); 1666 return 0; 1667 } 1668 1669 return epfd.GetRcvEvents(); 1670 } 1671 } 1672 1673 1674