1*a9643ea8Slogwang 2*a9643ea8Slogwang /** 3*a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4*a9643ea8Slogwang * 5*a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6*a9643ea8Slogwang * 7*a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8*a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9*a9643ea8Slogwang * obtain a copy of the License at 10*a9643ea8Slogwang * 11*a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12*a9643ea8Slogwang * 13*a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14*a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15*a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16*a9643ea8Slogwang * and limitations under the License. 17*a9643ea8Slogwang */ 18*a9643ea8Slogwang 19*a9643ea8Slogwang 20*a9643ea8Slogwang /** 21*a9643ea8Slogwang * @filename micro_thread.cpp 22*a9643ea8Slogwang * @info micro thread manager 23*a9643ea8Slogwang */ 24*a9643ea8Slogwang #include "mt_version.h" 25*a9643ea8Slogwang #include "micro_thread.h" 26*a9643ea8Slogwang #include "mt_net.h" 27*a9643ea8Slogwang #include "valgrind.h" 28*a9643ea8Slogwang #include <assert.h> 29*a9643ea8Slogwang #include "mt_sys_hook.h" 30*a9643ea8Slogwang #include "ff_hook.h" 31*a9643ea8Slogwang #include "ff_api.h" 32*a9643ea8Slogwang 33*a9643ea8Slogwang using namespace NS_MICRO_THREAD; 34*a9643ea8Slogwang 35*a9643ea8Slogwang #define ASSERT(statement) 36*a9643ea8Slogwang //#define ASSERT(statement) assert(statement) 37*a9643ea8Slogwang 38*a9643ea8Slogwang /** 39*a9643ea8Slogwang * @brief ���ʵ�ֱ��������ĺ��� 40*a9643ea8Slogwang * @param jbf jmpbuff����ָ�� 41*a9643ea8Slogwang */ 42*a9643ea8Slogwang extern "C" int save_context(jmp_buf jbf); 43*a9643ea8Slogwang 44*a9643ea8Slogwang /** 45*a9643ea8Slogwang * @brief ���ʵ�ָֻ������ĺ��� 46*a9643ea8Slogwang * @param jbf jmpbuff����ָ�� 47*a9643ea8Slogwang * @param ret �лصķ���ֵ, Ĭ��1 48*a9643ea8Slogwang */ 49*a9643ea8Slogwang extern "C" void restore_context(jmp_buf jbf, int ret); 50*a9643ea8Slogwang 51*a9643ea8Slogwang /** 52*a9643ea8Slogwang * @brief ���ʵ���滻����ջ���� 53*a9643ea8Slogwang * @param jbf jmpbuff����ָ�� 54*a9643ea8Slogwang * @param esp ��ջָ�� 55*a9643ea8Slogwang */ 56*a9643ea8Slogwang extern "C" void replace_esp(jmp_buf jbf, void* esp); 57*a9643ea8Slogwang 58*a9643ea8Slogwang /** 59*a9643ea8Slogwang * @brief ���캯��, Ĭ�ϲ���ջ��С 60*a9643ea8Slogwang */ 61*a9643ea8Slogwang Thread::Thread(int stack_size) 62*a9643ea8Slogwang { 63*a9643ea8Slogwang _stack_size = stack_size ? stack_size : ThreadPool::default_stack_size; 64*a9643ea8Slogwang _wakeup_time = 0; 65*a9643ea8Slogwang _stack = NULL; 66*a9643ea8Slogwang memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 67*a9643ea8Slogwang } 68*a9643ea8Slogwang 69*a9643ea8Slogwang 70*a9643ea8Slogwang /** 71*a9643ea8Slogwang * @brief LINUX x86/x86_64�µ�ջ����, �����ܹ�����Ҫע����� 72*a9643ea8Slogwang */ 73*a9643ea8Slogwang bool Thread::InitStack() 74*a9643ea8Slogwang { 75*a9643ea8Slogwang if (_stack) { 76*a9643ea8Slogwang return true; 77*a9643ea8Slogwang } 78*a9643ea8Slogwang 79*a9643ea8Slogwang ///< ջ������ջ�ڴ����, ��Խ�� 80*a9643ea8Slogwang _stack = (MtStack*)calloc(1, sizeof(MtStack)); 81*a9643ea8Slogwang if (NULL == _stack) 82*a9643ea8Slogwang { 83*a9643ea8Slogwang MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack)); 84*a9643ea8Slogwang return false; 85*a9643ea8Slogwang } 86*a9643ea8Slogwang 87*a9643ea8Slogwang int memsize = MEM_PAGE_SIZE*2 + _stack_size; 88*a9643ea8Slogwang memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE; 89*a9643ea8Slogwang 90*a9643ea8Slogwang static int zero_fd = -1; 91*a9643ea8Slogwang int mmap_flags = MAP_PRIVATE | MAP_ANON; 92*a9643ea8Slogwang void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0); 93*a9643ea8Slogwang if (vaddr == (void *)MAP_FAILED) 94*a9643ea8Slogwang { 95*a9643ea8Slogwang MTLOG_ERROR("mmap stack failed, size %d", memsize); 96*a9643ea8Slogwang free(_stack); 97*a9643ea8Slogwang _stack = NULL; 98*a9643ea8Slogwang return false; 99*a9643ea8Slogwang } 100*a9643ea8Slogwang _stack->_vaddr = (char*)vaddr; 101*a9643ea8Slogwang _stack->_vaddr_size = memsize; 102*a9643ea8Slogwang _stack->_stk_size = _stack_size; 103*a9643ea8Slogwang _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE; 104*a9643ea8Slogwang _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size; 105*a9643ea8Slogwang // valgrind support: register stack frame 106*a9643ea8Slogwang _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top); 107*a9643ea8Slogwang 108*a9643ea8Slogwang _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE; 109*a9643ea8Slogwang 110*a9643ea8Slogwang mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE); 111*a9643ea8Slogwang mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE); 112*a9643ea8Slogwang 113*a9643ea8Slogwang return true; 114*a9643ea8Slogwang } 115*a9643ea8Slogwang 116*a9643ea8Slogwang 117*a9643ea8Slogwang /** 118*a9643ea8Slogwang * @brief �ͷŶ�ջ��Ϣ 119*a9643ea8Slogwang */ 120*a9643ea8Slogwang void Thread::FreeStack() 121*a9643ea8Slogwang { 122*a9643ea8Slogwang if (!_stack) { 123*a9643ea8Slogwang return; 124*a9643ea8Slogwang } 125*a9643ea8Slogwang munmap(_stack->_vaddr, _stack->_vaddr_size); 126*a9643ea8Slogwang // valgrind support: deregister stack frame 127*a9643ea8Slogwang VALGRIND_STACK_DEREGISTER(_stack->valgrind_id); 128*a9643ea8Slogwang free(_stack); 129*a9643ea8Slogwang _stack = NULL; 130*a9643ea8Slogwang } 131*a9643ea8Slogwang 132*a9643ea8Slogwang /** 133*a9643ea8Slogwang * @brief ��ʼ��������,���üĴ���,��ջ 134*a9643ea8Slogwang */ 135*a9643ea8Slogwang void Thread::InitContext() 136*a9643ea8Slogwang { 137*a9643ea8Slogwang if (save_context(_jmpbuf) != 0) 138*a9643ea8Slogwang { 139*a9643ea8Slogwang ScheduleObj::Instance()->ScheduleStartRun(); // ֱ�ӵ��� this->run? 140*a9643ea8Slogwang } 141*a9643ea8Slogwang 142*a9643ea8Slogwang if (_stack != NULL) 143*a9643ea8Slogwang { 144*a9643ea8Slogwang replace_esp(_jmpbuf, _stack->_esp); 145*a9643ea8Slogwang } 146*a9643ea8Slogwang } 147*a9643ea8Slogwang 148*a9643ea8Slogwang /** 149*a9643ea8Slogwang * @brief �����л�, ����״̬, �������� 150*a9643ea8Slogwang */ 151*a9643ea8Slogwang void Thread::SwitchContext() 152*a9643ea8Slogwang { 153*a9643ea8Slogwang if (save_context(_jmpbuf) == 0) 154*a9643ea8Slogwang { 155*a9643ea8Slogwang ScheduleObj::Instance()->ScheduleThread(); 156*a9643ea8Slogwang } 157*a9643ea8Slogwang } 158*a9643ea8Slogwang 159*a9643ea8Slogwang /** 160*a9643ea8Slogwang * @brief �ָ�������, �л��ضϵ�,�������� 161*a9643ea8Slogwang */ 162*a9643ea8Slogwang void Thread::RestoreContext() 163*a9643ea8Slogwang { 164*a9643ea8Slogwang restore_context(_jmpbuf, 1); 165*a9643ea8Slogwang } 166*a9643ea8Slogwang 167*a9643ea8Slogwang /** 168*a9643ea8Slogwang * @brief ��ʼ���߳�,���ջ�������ij�ʼ�� 169*a9643ea8Slogwang */ 170*a9643ea8Slogwang bool Thread::Initial() 171*a9643ea8Slogwang { 172*a9643ea8Slogwang if (!InitStack()) 173*a9643ea8Slogwang { 174*a9643ea8Slogwang MTLOG_ERROR("init stack failed"); 175*a9643ea8Slogwang return false; 176*a9643ea8Slogwang } 177*a9643ea8Slogwang 178*a9643ea8Slogwang InitContext(); 179*a9643ea8Slogwang 180*a9643ea8Slogwang return true; 181*a9643ea8Slogwang } 182*a9643ea8Slogwang 183*a9643ea8Slogwang /** 184*a9643ea8Slogwang * @brief ��ֹ�߳�,���ջ���������ͷ� 185*a9643ea8Slogwang */ 186*a9643ea8Slogwang void Thread::Destroy() 187*a9643ea8Slogwang { 188*a9643ea8Slogwang FreeStack(); 189*a9643ea8Slogwang memset(&_jmpbuf, 0, sizeof(_jmpbuf)); 190*a9643ea8Slogwang } 191*a9643ea8Slogwang 192*a9643ea8Slogwang /** 193*a9643ea8Slogwang * @brief �߳�״̬����, �ɸ���״̬ 194*a9643ea8Slogwang */ 195*a9643ea8Slogwang void Thread::Reset() 196*a9643ea8Slogwang { 197*a9643ea8Slogwang _wakeup_time = 0; 198*a9643ea8Slogwang SetPrivate(NULL); 199*a9643ea8Slogwang 200*a9643ea8Slogwang InitContext(); 201*a9643ea8Slogwang CleanState(); 202*a9643ea8Slogwang } 203*a9643ea8Slogwang 204*a9643ea8Slogwang /** 205*a9643ea8Slogwang * @brief �߳���������˯��, ��λ���� 206*a9643ea8Slogwang * @param ms ˯�ߺ����� 207*a9643ea8Slogwang */ 208*a9643ea8Slogwang void Thread::sleep(int ms) 209*a9643ea8Slogwang { 210*a9643ea8Slogwang utime64_t now = ScheduleObj::Instance()->ScheduleGetTime(); 211*a9643ea8Slogwang _wakeup_time = now + ms; 212*a9643ea8Slogwang 213*a9643ea8Slogwang if (save_context(_jmpbuf) == 0) 214*a9643ea8Slogwang { 215*a9643ea8Slogwang ScheduleObj::Instance()->ScheduleSleep(); 216*a9643ea8Slogwang } 217*a9643ea8Slogwang } 218*a9643ea8Slogwang 219*a9643ea8Slogwang /** 220*a9643ea8Slogwang * @brief ��������״̬, �ȴ��������߳̽��� 221*a9643ea8Slogwang */ 222*a9643ea8Slogwang void Thread::Wait() 223*a9643ea8Slogwang { 224*a9643ea8Slogwang if (save_context(_jmpbuf) == 0) 225*a9643ea8Slogwang { 226*a9643ea8Slogwang ScheduleObj::Instance()->SchedulePend(); 227*a9643ea8Slogwang } 228*a9643ea8Slogwang } 229*a9643ea8Slogwang 230*a9643ea8Slogwang /** 231*a9643ea8Slogwang * @brief ��ʼ��������,���üĴ���,��ջ 232*a9643ea8Slogwang */ 233*a9643ea8Slogwang bool Thread::CheckStackHealth(char *esp) 234*a9643ea8Slogwang { 235*a9643ea8Slogwang if (!_stack) 236*a9643ea8Slogwang return false; 237*a9643ea8Slogwang 238*a9643ea8Slogwang if (esp > _stack->_stk_bottom && esp < _stack->_stk_top) 239*a9643ea8Slogwang return true; 240*a9643ea8Slogwang else 241*a9643ea8Slogwang return false; 242*a9643ea8Slogwang } 243*a9643ea8Slogwang 244*a9643ea8Slogwang /** 245*a9643ea8Slogwang * @brief �̹߳���, Ĭ������ͨ�߳� 246*a9643ea8Slogwang * @param type ����, Ĭ����ͨ 247*a9643ea8Slogwang */ 248*a9643ea8Slogwang MicroThread::MicroThread(ThreadType type) 249*a9643ea8Slogwang { 250*a9643ea8Slogwang memset(&_entry, 0, sizeof(_entry)); 251*a9643ea8Slogwang TAILQ_INIT(&_fdset); 252*a9643ea8Slogwang TAILQ_INIT(&_sub_list); 253*a9643ea8Slogwang _flag = NOT_INLIST; 254*a9643ea8Slogwang _type = type; 255*a9643ea8Slogwang _state = INITIAL; 256*a9643ea8Slogwang _start = NULL; 257*a9643ea8Slogwang _args = NULL; 258*a9643ea8Slogwang _parent = NULL; 259*a9643ea8Slogwang } 260*a9643ea8Slogwang 261*a9643ea8Slogwang /** 262*a9643ea8Slogwang * @breif �̸߳���״̬���� 263*a9643ea8Slogwang */ 264*a9643ea8Slogwang void MicroThread::CleanState() 265*a9643ea8Slogwang { 266*a9643ea8Slogwang TAILQ_INIT(&_fdset); 267*a9643ea8Slogwang TAILQ_INIT(&_sub_list); 268*a9643ea8Slogwang _flag = NOT_INLIST; 269*a9643ea8Slogwang _type = NORMAL; 270*a9643ea8Slogwang _state = INITIAL; 271*a9643ea8Slogwang _start = NULL; 272*a9643ea8Slogwang _args = NULL; 273*a9643ea8Slogwang _parent = NULL; 274*a9643ea8Slogwang } 275*a9643ea8Slogwang 276*a9643ea8Slogwang /** 277*a9643ea8Slogwang * @brief �̵߳�ʵ�ʹ������� 278*a9643ea8Slogwang */ 279*a9643ea8Slogwang void MicroThread::Run() 280*a9643ea8Slogwang { 281*a9643ea8Slogwang if (_start) { 282*a9643ea8Slogwang _start(_args); 283*a9643ea8Slogwang } 284*a9643ea8Slogwang 285*a9643ea8Slogwang // �����߳�, �������߳̽��������̬ 286*a9643ea8Slogwang if (this->IsSubThread()) { 287*a9643ea8Slogwang this->WakeupParent(); 288*a9643ea8Slogwang } 289*a9643ea8Slogwang 290*a9643ea8Slogwang ScheduleObj::Instance()->ScheduleReclaim(); 291*a9643ea8Slogwang ScheduleObj::Instance()->ScheduleThread(); 292*a9643ea8Slogwang } 293*a9643ea8Slogwang 294*a9643ea8Slogwang /** 295*a9643ea8Slogwang * @brief �������̻߳��Ѹ��̴߳��� 296*a9643ea8Slogwang */ 297*a9643ea8Slogwang void MicroThread::WakeupParent() 298*a9643ea8Slogwang { 299*a9643ea8Slogwang MicroThread* parent = this->GetParent(); 300*a9643ea8Slogwang if (parent) 301*a9643ea8Slogwang { 302*a9643ea8Slogwang parent->RemoveSubThread(this); 303*a9643ea8Slogwang if (parent->HasNoSubThread()) 304*a9643ea8Slogwang { 305*a9643ea8Slogwang ScheduleObj::Instance()->ScheduleUnpend(parent); 306*a9643ea8Slogwang } 307*a9643ea8Slogwang } 308*a9643ea8Slogwang else 309*a9643ea8Slogwang { 310*a9643ea8Slogwang MTLOG_ERROR("Sub thread no parent, error"); 311*a9643ea8Slogwang } 312*a9643ea8Slogwang } 313*a9643ea8Slogwang 314*a9643ea8Slogwang /** 315*a9643ea8Slogwang * @brief �Ƿ��������Ķ������߳� 316*a9643ea8Slogwang */ 317*a9643ea8Slogwang bool MicroThread::HasNoSubThread() 318*a9643ea8Slogwang { 319*a9643ea8Slogwang return TAILQ_EMPTY(&_sub_list); 320*a9643ea8Slogwang } 321*a9643ea8Slogwang 322*a9643ea8Slogwang /** 323*a9643ea8Slogwang * @brief ��ָ�����̼߳�������߳��б� 324*a9643ea8Slogwang */ 325*a9643ea8Slogwang void MicroThread::AddSubThread(MicroThread* sub) 326*a9643ea8Slogwang { 327*a9643ea8Slogwang ASSERT(!sub->HasFlag(MicroThread::SUB_LIST)); 328*a9643ea8Slogwang if (!sub->HasFlag(MicroThread::SUB_LIST)) 329*a9643ea8Slogwang { 330*a9643ea8Slogwang TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry); 331*a9643ea8Slogwang sub->_parent = this; 332*a9643ea8Slogwang } 333*a9643ea8Slogwang 334*a9643ea8Slogwang sub->SetFlag(MicroThread::SUB_LIST); 335*a9643ea8Slogwang } 336*a9643ea8Slogwang 337*a9643ea8Slogwang /** 338*a9643ea8Slogwang * @brief ��ָ���߳��Ƴ������߳��б� 339*a9643ea8Slogwang */ 340*a9643ea8Slogwang void MicroThread::RemoveSubThread(MicroThread* sub) 341*a9643ea8Slogwang { 342*a9643ea8Slogwang ASSERT(sub->HasFlag(MicroThread::SUB_LIST)); 343*a9643ea8Slogwang if (sub->HasFlag(MicroThread::SUB_LIST)) 344*a9643ea8Slogwang { 345*a9643ea8Slogwang TAILQ_REMOVE(&_sub_list, sub, _sub_entry); 346*a9643ea8Slogwang sub->_parent = NULL; 347*a9643ea8Slogwang } 348*a9643ea8Slogwang 349*a9643ea8Slogwang sub->UnsetFlag(MicroThread::SUB_LIST); 350*a9643ea8Slogwang } 351*a9643ea8Slogwang 352*a9643ea8Slogwang 353*a9643ea8Slogwang /** 354*a9643ea8Slogwang * @brief ��������ʾ����� 355*a9643ea8Slogwang */ 356*a9643ea8Slogwang ScheduleObj *ScheduleObj::_instance = NULL; ///< ��̬�����ʼ�� 357*a9643ea8Slogwang inline ScheduleObj* ScheduleObj::Instance() 358*a9643ea8Slogwang { 359*a9643ea8Slogwang if (NULL == _instance) 360*a9643ea8Slogwang { 361*a9643ea8Slogwang _instance = new ScheduleObj(); 362*a9643ea8Slogwang } 363*a9643ea8Slogwang 364*a9643ea8Slogwang return _instance; 365*a9643ea8Slogwang } 366*a9643ea8Slogwang 367*a9643ea8Slogwang /** 368*a9643ea8Slogwang * @brief ���������߳�������, �����ӿ� 369*a9643ea8Slogwang */ 370*a9643ea8Slogwang void ScheduleObj::ScheduleThread() 371*a9643ea8Slogwang { 372*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 373*a9643ea8Slogwang frame->ThreadSchdule(); 374*a9643ea8Slogwang } 375*a9643ea8Slogwang 376*a9643ea8Slogwang /** 377*a9643ea8Slogwang * @brief ��ȡȫ�ֵ�ʱ���, ���뵥λ 378*a9643ea8Slogwang */ 379*a9643ea8Slogwang utime64_t ScheduleObj::ScheduleGetTime() 380*a9643ea8Slogwang { 381*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 382*a9643ea8Slogwang if (frame) 383*a9643ea8Slogwang { 384*a9643ea8Slogwang return frame->GetLastClock(); 385*a9643ea8Slogwang } 386*a9643ea8Slogwang else 387*a9643ea8Slogwang { 388*a9643ea8Slogwang MTLOG_ERROR("frame time failed, maybe not init"); 389*a9643ea8Slogwang return 0; 390*a9643ea8Slogwang } 391*a9643ea8Slogwang } 392*a9643ea8Slogwang 393*a9643ea8Slogwang /** 394*a9643ea8Slogwang * @brief �̵߳�����������sleep״̬ 395*a9643ea8Slogwang */ 396*a9643ea8Slogwang void ScheduleObj::ScheduleSleep() 397*a9643ea8Slogwang { 398*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 399*a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 400*a9643ea8Slogwang if ((!frame) || (!thread)) { 401*a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 402*a9643ea8Slogwang return; 403*a9643ea8Slogwang } 404*a9643ea8Slogwang 405*a9643ea8Slogwang frame->InsertSleep(thread); 406*a9643ea8Slogwang frame->ThreadSchdule(); 407*a9643ea8Slogwang } 408*a9643ea8Slogwang 409*a9643ea8Slogwang /** 410*a9643ea8Slogwang * @brief �̵߳�����������pend״̬ 411*a9643ea8Slogwang */ 412*a9643ea8Slogwang void ScheduleObj::SchedulePend() 413*a9643ea8Slogwang { 414*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 415*a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 416*a9643ea8Slogwang if ((!frame) || (!thread)) { 417*a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 418*a9643ea8Slogwang return; 419*a9643ea8Slogwang } 420*a9643ea8Slogwang 421*a9643ea8Slogwang frame->InsertPend(thread); 422*a9643ea8Slogwang frame->ThreadSchdule(); 423*a9643ea8Slogwang } 424*a9643ea8Slogwang 425*a9643ea8Slogwang /** 426*a9643ea8Slogwang * @brief �̵߳���ȡ��pend״̬, �ⲿ����ȡ�� 427*a9643ea8Slogwang */ 428*a9643ea8Slogwang void ScheduleObj::ScheduleUnpend(void* pthread) 429*a9643ea8Slogwang { 430*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 431*a9643ea8Slogwang MicroThread* thread = (MicroThread*)pthread; 432*a9643ea8Slogwang if ((!frame) || (!thread)) { 433*a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 434*a9643ea8Slogwang return; 435*a9643ea8Slogwang } 436*a9643ea8Slogwang 437*a9643ea8Slogwang frame->RemovePend(thread); 438*a9643ea8Slogwang frame->InsertRunable(thread); 439*a9643ea8Slogwang } 440*a9643ea8Slogwang 441*a9643ea8Slogwang 442*a9643ea8Slogwang 443*a9643ea8Slogwang /** 444*a9643ea8Slogwang * @brief �߳�ִ����Ϻ�, ���մ��� 445*a9643ea8Slogwang */ 446*a9643ea8Slogwang void ScheduleObj::ScheduleReclaim() 447*a9643ea8Slogwang { 448*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 449*a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 450*a9643ea8Slogwang if ((!frame) || (!thread)) { 451*a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 452*a9643ea8Slogwang return; 453*a9643ea8Slogwang } 454*a9643ea8Slogwang 455*a9643ea8Slogwang frame->FreeThread(thread); 456*a9643ea8Slogwang } 457*a9643ea8Slogwang 458*a9643ea8Slogwang /** 459*a9643ea8Slogwang * @brief ���������ȳ�ʼִ�� 460*a9643ea8Slogwang */ 461*a9643ea8Slogwang void ScheduleObj::ScheduleStartRun() 462*a9643ea8Slogwang { 463*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 464*a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 465*a9643ea8Slogwang if ((!frame) || (!thread)) { 466*a9643ea8Slogwang MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread); 467*a9643ea8Slogwang return; 468*a9643ea8Slogwang } 469*a9643ea8Slogwang 470*a9643ea8Slogwang thread->Run(); 471*a9643ea8Slogwang } 472*a9643ea8Slogwang 473*a9643ea8Slogwang 474*a9643ea8Slogwang /** 475*a9643ea8Slogwang * @brief �̳߳�ȫ�ֲ�����ʼ�� 476*a9643ea8Slogwang */ 477*a9643ea8Slogwang unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< Ĭ��2000�̴߳��� 478*a9643ea8Slogwang unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< Ĭ��128Kջ��С 479*a9643ea8Slogwang 480*a9643ea8Slogwang /** 481*a9643ea8Slogwang * @brief �̳߳س�ʼ�� 482*a9643ea8Slogwang */ 483*a9643ea8Slogwang bool ThreadPool::InitialPool(int max_num) 484*a9643ea8Slogwang { 485*a9643ea8Slogwang MicroThread *thread = NULL; 486*a9643ea8Slogwang for (unsigned int i = 0; i < default_thread_num; i++) 487*a9643ea8Slogwang { 488*a9643ea8Slogwang thread = new MicroThread(); 489*a9643ea8Slogwang if ((NULL == thread) || (false == thread->Initial())) 490*a9643ea8Slogwang { 491*a9643ea8Slogwang MTLOG_ERROR("init pool, thread %p init failed", thread); 492*a9643ea8Slogwang if (thread) delete thread; 493*a9643ea8Slogwang continue; 494*a9643ea8Slogwang } 495*a9643ea8Slogwang thread->SetFlag(MicroThread::FREE_LIST); 496*a9643ea8Slogwang _freelist.push(thread); 497*a9643ea8Slogwang } 498*a9643ea8Slogwang 499*a9643ea8Slogwang _total_num = _freelist.size(); 500*a9643ea8Slogwang _max_num = max_num; 501*a9643ea8Slogwang _use_num = 0; 502*a9643ea8Slogwang if (_total_num <= 0) 503*a9643ea8Slogwang { 504*a9643ea8Slogwang return false; 505*a9643ea8Slogwang } 506*a9643ea8Slogwang else 507*a9643ea8Slogwang { 508*a9643ea8Slogwang return true; 509*a9643ea8Slogwang } 510*a9643ea8Slogwang } 511*a9643ea8Slogwang 512*a9643ea8Slogwang /** 513*a9643ea8Slogwang * @brief �̳߳ط���ʼ�� 514*a9643ea8Slogwang */ 515*a9643ea8Slogwang void ThreadPool::DestroyPool() 516*a9643ea8Slogwang { 517*a9643ea8Slogwang MicroThread* thread = NULL; 518*a9643ea8Slogwang while (!_freelist.empty()) 519*a9643ea8Slogwang { 520*a9643ea8Slogwang thread = _freelist.front(); 521*a9643ea8Slogwang _freelist.pop(); 522*a9643ea8Slogwang thread->Destroy(); 523*a9643ea8Slogwang delete thread; 524*a9643ea8Slogwang } 525*a9643ea8Slogwang 526*a9643ea8Slogwang _total_num = 0; 527*a9643ea8Slogwang _use_num = 0; 528*a9643ea8Slogwang } 529*a9643ea8Slogwang 530*a9643ea8Slogwang /** 531*a9643ea8Slogwang * @brief �̷߳���ӿ� 532*a9643ea8Slogwang * @return �̶߳��� 533*a9643ea8Slogwang */ 534*a9643ea8Slogwang MicroThread* ThreadPool::AllocThread() 535*a9643ea8Slogwang { 536*a9643ea8Slogwang MT_ATTR_API_SET(492069, _total_num); // �̳߳ش�С 537*a9643ea8Slogwang 538*a9643ea8Slogwang MicroThread* thread = NULL; 539*a9643ea8Slogwang if (!_freelist.empty()) 540*a9643ea8Slogwang { 541*a9643ea8Slogwang thread = _freelist.front(); 542*a9643ea8Slogwang _freelist.pop(); 543*a9643ea8Slogwang 544*a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::FREE_LIST)); 545*a9643ea8Slogwang 546*a9643ea8Slogwang thread->UnsetFlag(MicroThread::FREE_LIST); 547*a9643ea8Slogwang _use_num++; 548*a9643ea8Slogwang return thread; 549*a9643ea8Slogwang } 550*a9643ea8Slogwang 551*a9643ea8Slogwang MT_ATTR_API(320846, 1); // pool no nore 552*a9643ea8Slogwang if (_total_num >= _max_num) 553*a9643ea8Slogwang { 554*a9643ea8Slogwang MT_ATTR_API(361140, 1); // no more quota 555*a9643ea8Slogwang return NULL; 556*a9643ea8Slogwang } 557*a9643ea8Slogwang 558*a9643ea8Slogwang thread = new MicroThread(); 559*a9643ea8Slogwang if ((NULL == thread) || (false == thread->Initial())) 560*a9643ea8Slogwang { 561*a9643ea8Slogwang MT_ATTR_API(320847, 1); // pool init fail 562*a9643ea8Slogwang MTLOG_ERROR("thread alloc failed, thread: %p", thread); 563*a9643ea8Slogwang if (thread) delete thread; 564*a9643ea8Slogwang return NULL; 565*a9643ea8Slogwang } 566*a9643ea8Slogwang _total_num++; 567*a9643ea8Slogwang _use_num++; 568*a9643ea8Slogwang 569*a9643ea8Slogwang return thread; 570*a9643ea8Slogwang } 571*a9643ea8Slogwang 572*a9643ea8Slogwang /** 573*a9643ea8Slogwang * @brief �߳��ͷŽӿ� 574*a9643ea8Slogwang * @param thread �̶߳��� 575*a9643ea8Slogwang */ 576*a9643ea8Slogwang void ThreadPool::FreeThread(MicroThread* thread) 577*a9643ea8Slogwang { 578*a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::FREE_LIST)); 579*a9643ea8Slogwang thread->Reset(); 580*a9643ea8Slogwang _use_num--; 581*a9643ea8Slogwang _freelist.push(thread); 582*a9643ea8Slogwang thread->SetFlag(MicroThread::FREE_LIST); 583*a9643ea8Slogwang 584*a9643ea8Slogwang ///< ���ж��� > default_thread_num, ���ͷ����ϵ�, �������ͷŵ�ǰ 585*a9643ea8Slogwang unsigned int free_num = _freelist.size(); 586*a9643ea8Slogwang if ((free_num > default_thread_num) && (free_num > 1)) 587*a9643ea8Slogwang { 588*a9643ea8Slogwang thread = _freelist.front(); 589*a9643ea8Slogwang _freelist.pop(); 590*a9643ea8Slogwang thread->Destroy(); 591*a9643ea8Slogwang delete thread; 592*a9643ea8Slogwang _total_num--; 593*a9643ea8Slogwang } 594*a9643ea8Slogwang } 595*a9643ea8Slogwang 596*a9643ea8Slogwang int ThreadPool::GetUsedNum(void) 597*a9643ea8Slogwang { 598*a9643ea8Slogwang return _use_num; 599*a9643ea8Slogwang } 600*a9643ea8Slogwang 601*a9643ea8Slogwang /** 602*a9643ea8Slogwang * @brief �߳̿����, ȫ��ʵ����ȡ 603*a9643ea8Slogwang */ 604*a9643ea8Slogwang MtFrame *MtFrame::_instance = NULL; 605*a9643ea8Slogwang inline MtFrame* MtFrame::Instance () 606*a9643ea8Slogwang { 607*a9643ea8Slogwang if (NULL == _instance ) 608*a9643ea8Slogwang { 609*a9643ea8Slogwang _instance = new MtFrame(); 610*a9643ea8Slogwang } 611*a9643ea8Slogwang 612*a9643ea8Slogwang return _instance; 613*a9643ea8Slogwang } 614*a9643ea8Slogwang 615*a9643ea8Slogwang /** 616*a9643ea8Slogwang * @brief HOOKϵͳapi������ 617*a9643ea8Slogwang */ 618*a9643ea8Slogwang void MtFrame::SetHookFlag() { 619*a9643ea8Slogwang mt_set_hook_flag(); 620*a9643ea8Slogwang }; 621*a9643ea8Slogwang 622*a9643ea8Slogwang 623*a9643ea8Slogwang /** 624*a9643ea8Slogwang * @brief ��ܳ�ʼ��, Ĭ�ϲ�����־���� 625*a9643ea8Slogwang */ 626*a9643ea8Slogwang bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num) 627*a9643ea8Slogwang { 628*a9643ea8Slogwang _log_adpt = logadpt; 629*a9643ea8Slogwang 630*a9643ea8Slogwang // �������������߳���Ŀ, ���Ե���epoll��ص�fd��Ŀ 631*a9643ea8Slogwang if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num)) 632*a9643ea8Slogwang { 633*a9643ea8Slogwang MTLOG_ERROR("Init epoll or thread pool failed"); 634*a9643ea8Slogwang this->Destroy(); 635*a9643ea8Slogwang return false; 636*a9643ea8Slogwang } 637*a9643ea8Slogwang 638*a9643ea8Slogwang // �������öѴ�С, �Ŵ�Ѹ���Ϊ2�� 639*a9643ea8Slogwang if (_sleeplist.HeapResize(max_thread_num * 2) < 0) 640*a9643ea8Slogwang { 641*a9643ea8Slogwang MTLOG_ERROR("Init heap list failed"); 642*a9643ea8Slogwang this->Destroy(); 643*a9643ea8Slogwang return false; 644*a9643ea8Slogwang } 645*a9643ea8Slogwang 646*a9643ea8Slogwang // ��ʱ�������ʼ��, �Ŵ�Ѹ���Ϊ2�� 647*a9643ea8Slogwang _timer = new CTimerMng(max_thread_num * 2); 648*a9643ea8Slogwang if (NULL == _timer) 649*a9643ea8Slogwang { 650*a9643ea8Slogwang MTLOG_ERROR("Init heap timer failed"); 651*a9643ea8Slogwang this->Destroy(); 652*a9643ea8Slogwang return false; 653*a9643ea8Slogwang } 654*a9643ea8Slogwang 655*a9643ea8Slogwang // �ػ��̵߳�����ʼ�� 656*a9643ea8Slogwang _daemon = AllocThread(); 657*a9643ea8Slogwang if (NULL == _daemon) 658*a9643ea8Slogwang { 659*a9643ea8Slogwang MTLOG_ERROR("Alloc daemon thread failed"); 660*a9643ea8Slogwang this->Destroy(); 661*a9643ea8Slogwang return false; 662*a9643ea8Slogwang } 663*a9643ea8Slogwang _daemon->SetType(MicroThread::DAEMON); 664*a9643ea8Slogwang _daemon->SetState(MicroThread::RUNABLE); 665*a9643ea8Slogwang _daemon->SetSartFunc(MtFrame::DaemonRun, this); 666*a9643ea8Slogwang 667*a9643ea8Slogwang // �����߳�, ����INIT, ����ʼ��ջ, Ҳ�ص�ע��, ����Ҫͳһ���� 668*a9643ea8Slogwang _primo = new MicroThread(MicroThread::PRIMORDIAL); 669*a9643ea8Slogwang if (NULL == _primo) 670*a9643ea8Slogwang { 671*a9643ea8Slogwang MTLOG_ERROR("new _primo thread failed"); 672*a9643ea8Slogwang this->Destroy(); 673*a9643ea8Slogwang return false; 674*a9643ea8Slogwang } 675*a9643ea8Slogwang _primo->SetState(MicroThread::RUNNING); 676*a9643ea8Slogwang SetActiveThread(_primo); 677*a9643ea8Slogwang 678*a9643ea8Slogwang // ��������ʱ��� 679*a9643ea8Slogwang _last_clock = GetSystemMS(); 680*a9643ea8Slogwang TAILQ_INIT(&_iolist); 681*a9643ea8Slogwang TAILQ_INIT(&_pend_list); 682*a9643ea8Slogwang 683*a9643ea8Slogwang //SetHookFlag(); 684*a9643ea8Slogwang 685*a9643ea8Slogwang return true; 686*a9643ea8Slogwang 687*a9643ea8Slogwang } 688*a9643ea8Slogwang 689*a9643ea8Slogwang /** 690*a9643ea8Slogwang * @brief ��ܷ���ʼ�� 691*a9643ea8Slogwang */ 692*a9643ea8Slogwang void MtFrame::Destroy(void) 693*a9643ea8Slogwang { 694*a9643ea8Slogwang if (NULL == _instance ) 695*a9643ea8Slogwang { 696*a9643ea8Slogwang return; 697*a9643ea8Slogwang } 698*a9643ea8Slogwang 699*a9643ea8Slogwang if (_primo) { 700*a9643ea8Slogwang delete _primo; 701*a9643ea8Slogwang _primo = NULL; 702*a9643ea8Slogwang } 703*a9643ea8Slogwang 704*a9643ea8Slogwang if (_daemon) { 705*a9643ea8Slogwang FreeThread(_daemon); 706*a9643ea8Slogwang _daemon = NULL; 707*a9643ea8Slogwang } 708*a9643ea8Slogwang 709*a9643ea8Slogwang TAILQ_INIT(&_iolist); 710*a9643ea8Slogwang 711*a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 712*a9643ea8Slogwang while (thread) 713*a9643ea8Slogwang { 714*a9643ea8Slogwang FreeThread(thread); 715*a9643ea8Slogwang thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop()); 716*a9643ea8Slogwang } 717*a9643ea8Slogwang 718*a9643ea8Slogwang while (!_runlist.empty()) 719*a9643ea8Slogwang { 720*a9643ea8Slogwang thread = _runlist.front(); 721*a9643ea8Slogwang _runlist.pop(); 722*a9643ea8Slogwang FreeThread(thread); 723*a9643ea8Slogwang } 724*a9643ea8Slogwang 725*a9643ea8Slogwang MicroThread* tmp; 726*a9643ea8Slogwang TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp) 727*a9643ea8Slogwang { 728*a9643ea8Slogwang TAILQ_REMOVE(&_pend_list, thread, _entry); 729*a9643ea8Slogwang FreeThread(thread); 730*a9643ea8Slogwang } 731*a9643ea8Slogwang 732*a9643ea8Slogwang if (_timer != NULL) 733*a9643ea8Slogwang { 734*a9643ea8Slogwang delete _timer; 735*a9643ea8Slogwang _timer = NULL; 736*a9643ea8Slogwang } 737*a9643ea8Slogwang 738*a9643ea8Slogwang _instance->DestroyPool(); 739*a9643ea8Slogwang _instance->TermKqueue(); 740*a9643ea8Slogwang delete _instance; 741*a9643ea8Slogwang _instance = NULL; 742*a9643ea8Slogwang } 743*a9643ea8Slogwang 744*a9643ea8Slogwang /** 745*a9643ea8Slogwang * @brief �߳̿�ܰ汾��ȡ 746*a9643ea8Slogwang */ 747*a9643ea8Slogwang char* MtFrame::Version() 748*a9643ea8Slogwang { 749*a9643ea8Slogwang return IMT_VERSION; 750*a9643ea8Slogwang } 751*a9643ea8Slogwang 752*a9643ea8Slogwang /** 753*a9643ea8Slogwang * @brief �̴߳����ӿ� 754*a9643ea8Slogwang * @param entry �߳���ں��� 755*a9643ea8Slogwang * @param args �߳���ڲ��� 756*a9643ea8Slogwang * @return �߳�ָ��, NULL��ʾʧ�� 757*a9643ea8Slogwang */ 758*a9643ea8Slogwang MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable) 759*a9643ea8Slogwang { 760*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 761*a9643ea8Slogwang MicroThread* thread = mtframe->AllocThread(); 762*a9643ea8Slogwang if (NULL == thread) 763*a9643ea8Slogwang { 764*a9643ea8Slogwang MTLOG_ERROR("create thread failed"); 765*a9643ea8Slogwang return NULL; 766*a9643ea8Slogwang } 767*a9643ea8Slogwang thread->SetSartFunc(entry, args); 768*a9643ea8Slogwang 769*a9643ea8Slogwang if (runable) { 770*a9643ea8Slogwang mtframe->InsertRunable(thread); 771*a9643ea8Slogwang } 772*a9643ea8Slogwang 773*a9643ea8Slogwang return thread; 774*a9643ea8Slogwang } 775*a9643ea8Slogwang 776*a9643ea8Slogwang int MtFrame::Loop(void* args) 777*a9643ea8Slogwang { 778*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 779*a9643ea8Slogwang MicroThread* daemon = mtframe->DaemonThread(); 780*a9643ea8Slogwang 781*a9643ea8Slogwang mtframe->KqueueDispatch(); 782*a9643ea8Slogwang mtframe->SetLastClock(mtframe->GetSystemMS()); 783*a9643ea8Slogwang mtframe->WakeupTimeout(); 784*a9643ea8Slogwang mtframe->CheckExpired(); 785*a9643ea8Slogwang daemon->SwitchContext(); 786*a9643ea8Slogwang 787*a9643ea8Slogwang return 0; 788*a9643ea8Slogwang } 789*a9643ea8Slogwang 790*a9643ea8Slogwang /** 791*a9643ea8Slogwang * @brief �ػ��߳���ں���, ����ָ��Ҫ��static���� 792*a9643ea8Slogwang * @param args �߳���ڲ��� 793*a9643ea8Slogwang */ 794*a9643ea8Slogwang void MtFrame::DaemonRun(void* args) 795*a9643ea8Slogwang { 796*a9643ea8Slogwang /* 797*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 798*a9643ea8Slogwang MicroThread* daemon = mtframe->DaemonThread(); 799*a9643ea8Slogwang 800*a9643ea8Slogwang while (true) { 801*a9643ea8Slogwang mtframe->KqueueDispatch(); 802*a9643ea8Slogwang mtframe->SetLastClock(mtframe->GetSystemMS()); 803*a9643ea8Slogwang mtframe->WakeupTimeout(); 804*a9643ea8Slogwang mtframe->CheckExpired(); 805*a9643ea8Slogwang daemon->SwitchContext(); 806*a9643ea8Slogwang } 807*a9643ea8Slogwang */ 808*a9643ea8Slogwang ff_run(MtFrame::Loop, NULL); 809*a9643ea8Slogwang } 810*a9643ea8Slogwang 811*a9643ea8Slogwang /** 812*a9643ea8Slogwang * @brief ��ȡ��ǰ�̵߳ĸ��߳� 813*a9643ea8Slogwang */ 814*a9643ea8Slogwang MicroThread *MtFrame::GetRootThread() 815*a9643ea8Slogwang { 816*a9643ea8Slogwang if (NULL == _curr_thread) 817*a9643ea8Slogwang { 818*a9643ea8Slogwang return NULL; 819*a9643ea8Slogwang } 820*a9643ea8Slogwang 821*a9643ea8Slogwang MicroThread::ThreadType type = _curr_thread->GetType(); 822*a9643ea8Slogwang MicroThread *thread = _curr_thread; 823*a9643ea8Slogwang MicroThread *parent = thread; 824*a9643ea8Slogwang 825*a9643ea8Slogwang while (MicroThread::SUB_THREAD == type) 826*a9643ea8Slogwang { 827*a9643ea8Slogwang thread = thread->GetParent(); 828*a9643ea8Slogwang if (!thread) 829*a9643ea8Slogwang { 830*a9643ea8Slogwang break; 831*a9643ea8Slogwang } 832*a9643ea8Slogwang 833*a9643ea8Slogwang type = thread->GetType(); 834*a9643ea8Slogwang parent = thread; 835*a9643ea8Slogwang } 836*a9643ea8Slogwang 837*a9643ea8Slogwang return parent; 838*a9643ea8Slogwang } 839*a9643ea8Slogwang 840*a9643ea8Slogwang /** 841*a9643ea8Slogwang * @brief ��ܵ����߳����� 842*a9643ea8Slogwang */ 843*a9643ea8Slogwang void MtFrame::ThreadSchdule() 844*a9643ea8Slogwang { 845*a9643ea8Slogwang MicroThread* thread = NULL; 846*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 847*a9643ea8Slogwang 848*a9643ea8Slogwang if (mtframe->_runlist.empty()) 849*a9643ea8Slogwang { 850*a9643ea8Slogwang thread = mtframe->DaemonThread(); 851*a9643ea8Slogwang } 852*a9643ea8Slogwang else 853*a9643ea8Slogwang { 854*a9643ea8Slogwang thread = mtframe->_runlist.front(); 855*a9643ea8Slogwang mtframe->RemoveRunable(thread); 856*a9643ea8Slogwang } 857*a9643ea8Slogwang 858*a9643ea8Slogwang this->SetActiveThread(thread); 859*a9643ea8Slogwang thread->SetState(MicroThread::RUNNING); 860*a9643ea8Slogwang thread->RestoreContext(); 861*a9643ea8Slogwang } 862*a9643ea8Slogwang 863*a9643ea8Slogwang /** 864*a9643ea8Slogwang * @brief ��ܴ���ʱ�ص����� 865*a9643ea8Slogwang */ 866*a9643ea8Slogwang void MtFrame::CheckExpired() 867*a9643ea8Slogwang { 868*a9643ea8Slogwang static utime64_t check_time = 0; 869*a9643ea8Slogwang 870*a9643ea8Slogwang if (_timer != NULL) 871*a9643ea8Slogwang { 872*a9643ea8Slogwang _timer->check_expired(); 873*a9643ea8Slogwang } 874*a9643ea8Slogwang 875*a9643ea8Slogwang utime64_t now = GetLastClock(); 876*a9643ea8Slogwang 877*a9643ea8Slogwang if ((now - check_time) > 1000) 878*a9643ea8Slogwang { 879*a9643ea8Slogwang CNetMgr::Instance()->RecycleObjs(now); 880*a9643ea8Slogwang check_time = now; 881*a9643ea8Slogwang } 882*a9643ea8Slogwang } 883*a9643ea8Slogwang 884*a9643ea8Slogwang /** 885*a9643ea8Slogwang * @brief ��ܼ���ʱ, �������еij�ʱ�߳� 886*a9643ea8Slogwang */ 887*a9643ea8Slogwang void MtFrame::WakeupTimeout() 888*a9643ea8Slogwang { 889*a9643ea8Slogwang utime64_t now = GetLastClock(); 890*a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 891*a9643ea8Slogwang while (thread && (thread->GetWakeupTime() <= now)) 892*a9643ea8Slogwang { 893*a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST)) 894*a9643ea8Slogwang { 895*a9643ea8Slogwang RemoveIoWait(thread); 896*a9643ea8Slogwang } 897*a9643ea8Slogwang else 898*a9643ea8Slogwang { 899*a9643ea8Slogwang RemoveSleep(thread); 900*a9643ea8Slogwang } 901*a9643ea8Slogwang 902*a9643ea8Slogwang InsertRunable(thread); 903*a9643ea8Slogwang 904*a9643ea8Slogwang thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 905*a9643ea8Slogwang } 906*a9643ea8Slogwang } 907*a9643ea8Slogwang 908*a9643ea8Slogwang /** 909*a9643ea8Slogwang * @brief ��ܵ���epoll waitǰ, �ж��ȴ�ʱ����Ϣ 910*a9643ea8Slogwang */ 911*a9643ea8Slogwang int MtFrame::KqueueGetTimeout() 912*a9643ea8Slogwang { 913*a9643ea8Slogwang utime64_t now = GetLastClock(); 914*a9643ea8Slogwang MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop()); 915*a9643ea8Slogwang if (!thread) 916*a9643ea8Slogwang { 917*a9643ea8Slogwang return 10; //Ĭ��10ms epollwait 918*a9643ea8Slogwang } 919*a9643ea8Slogwang else if (thread->GetWakeupTime() < now) 920*a9643ea8Slogwang { 921*a9643ea8Slogwang return 0; 922*a9643ea8Slogwang } 923*a9643ea8Slogwang else 924*a9643ea8Slogwang { 925*a9643ea8Slogwang return (int)(thread->GetWakeupTime() - now); 926*a9643ea8Slogwang } 927*a9643ea8Slogwang } 928*a9643ea8Slogwang 929*a9643ea8Slogwang /** 930*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, ��������� 931*a9643ea8Slogwang * @param thread �̶߳��� 932*a9643ea8Slogwang */ 933*a9643ea8Slogwang inline void MtFrame::InsertSleep(MicroThread* thread) 934*a9643ea8Slogwang { 935*a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST)); 936*a9643ea8Slogwang 937*a9643ea8Slogwang thread->SetFlag(MicroThread::SLEEP_LIST); 938*a9643ea8Slogwang thread->SetState(MicroThread::SLEEPING); 939*a9643ea8Slogwang int rc = _sleeplist.HeapPush(thread); 940*a9643ea8Slogwang if (rc < 0) 941*a9643ea8Slogwang { 942*a9643ea8Slogwang MT_ATTR_API(320848, 1); // heap error 943*a9643ea8Slogwang MTLOG_ERROR("Insert heap failed , rc %d", rc); 944*a9643ea8Slogwang } 945*a9643ea8Slogwang } 946*a9643ea8Slogwang 947*a9643ea8Slogwang /** 948*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, �Ƴ������ 949*a9643ea8Slogwang * @param thread �̶߳��� 950*a9643ea8Slogwang */ 951*a9643ea8Slogwang inline void MtFrame::RemoveSleep(MicroThread* thread) 952*a9643ea8Slogwang { 953*a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST)); 954*a9643ea8Slogwang thread->UnsetFlag(MicroThread::SLEEP_LIST); 955*a9643ea8Slogwang 956*a9643ea8Slogwang int rc = _sleeplist.HeapDelete(thread); 957*a9643ea8Slogwang if (rc < 0) 958*a9643ea8Slogwang { 959*a9643ea8Slogwang MT_ATTR_API(320849, 1); // heap error 960*a9643ea8Slogwang MTLOG_ERROR("remove heap failed , rc %d", rc); 961*a9643ea8Slogwang } 962*a9643ea8Slogwang } 963*a9643ea8Slogwang 964*a9643ea8Slogwang /** 965*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, ִ��IO�ȴ�״̬ 966*a9643ea8Slogwang * @param thread �̶߳��� 967*a9643ea8Slogwang */ 968*a9643ea8Slogwang inline void MtFrame::InsertIoWait(MicroThread* thread) 969*a9643ea8Slogwang { 970*a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::IO_LIST)); 971*a9643ea8Slogwang thread->SetFlag(MicroThread::IO_LIST); 972*a9643ea8Slogwang TAILQ_INSERT_TAIL(&_iolist, thread, _entry); 973*a9643ea8Slogwang InsertSleep(thread); 974*a9643ea8Slogwang } 975*a9643ea8Slogwang 976*a9643ea8Slogwang /** 977*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, �Ƴ�IO�ȴ�״̬ 978*a9643ea8Slogwang * @param thread �̶߳��� 979*a9643ea8Slogwang */ 980*a9643ea8Slogwang void MtFrame::RemoveIoWait(MicroThread* thread) 981*a9643ea8Slogwang { 982*a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::IO_LIST)); 983*a9643ea8Slogwang thread->UnsetFlag(MicroThread::IO_LIST); 984*a9643ea8Slogwang TAILQ_REMOVE(&_iolist, thread, _entry); 985*a9643ea8Slogwang 986*a9643ea8Slogwang RemoveSleep(thread); 987*a9643ea8Slogwang } 988*a9643ea8Slogwang 989*a9643ea8Slogwang /** 990*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, ��������ж��� 991*a9643ea8Slogwang * @param thread �̶߳��� 992*a9643ea8Slogwang */ 993*a9643ea8Slogwang void MtFrame::InsertRunable(MicroThread* thread) 994*a9643ea8Slogwang { 995*a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::RUN_LIST)); 996*a9643ea8Slogwang thread->SetFlag(MicroThread::RUN_LIST); 997*a9643ea8Slogwang 998*a9643ea8Slogwang thread->SetState(MicroThread::RUNABLE); 999*a9643ea8Slogwang _runlist.push(thread); 1000*a9643ea8Slogwang _waitnum++; 1001*a9643ea8Slogwang } 1002*a9643ea8Slogwang 1003*a9643ea8Slogwang /** 1004*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, �Ƴ������ж��� 1005*a9643ea8Slogwang * @param thread �̶߳��� 1006*a9643ea8Slogwang */ 1007*a9643ea8Slogwang inline void MtFrame::RemoveRunable(MicroThread* thread) 1008*a9643ea8Slogwang { 1009*a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::RUN_LIST)); 1010*a9643ea8Slogwang ASSERT(thread == _runlist.front()); 1011*a9643ea8Slogwang thread->UnsetFlag(MicroThread::RUN_LIST); 1012*a9643ea8Slogwang 1013*a9643ea8Slogwang _runlist.pop(); 1014*a9643ea8Slogwang _waitnum--; 1015*a9643ea8Slogwang } 1016*a9643ea8Slogwang 1017*a9643ea8Slogwang 1018*a9643ea8Slogwang /** 1019*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, ִ��pend�ȴ�״̬ 1020*a9643ea8Slogwang * @param thread �̶߳��� 1021*a9643ea8Slogwang */ 1022*a9643ea8Slogwang void MtFrame::InsertPend(MicroThread* thread) 1023*a9643ea8Slogwang { 1024*a9643ea8Slogwang ASSERT(!thread->HasFlag(MicroThread::PEND_LIST)); 1025*a9643ea8Slogwang thread->SetFlag(MicroThread::PEND_LIST); 1026*a9643ea8Slogwang TAILQ_INSERT_TAIL(&_pend_list, thread, _entry); 1027*a9643ea8Slogwang thread->SetState(MicroThread::PENDING); 1028*a9643ea8Slogwang } 1029*a9643ea8Slogwang 1030*a9643ea8Slogwang /** 1031*a9643ea8Slogwang * @brief ��ܹ����̵߳�Ԫ, �Ƴ�PEND�ȴ�״̬ 1032*a9643ea8Slogwang * @param thread �̶߳��� 1033*a9643ea8Slogwang */ 1034*a9643ea8Slogwang void MtFrame::RemovePend(MicroThread* thread) 1035*a9643ea8Slogwang { 1036*a9643ea8Slogwang ASSERT(thread->HasFlag(MicroThread::PEND_LIST)); 1037*a9643ea8Slogwang thread->UnsetFlag(MicroThread::PEND_LIST); 1038*a9643ea8Slogwang TAILQ_REMOVE(&_pend_list, thread, _entry); 1039*a9643ea8Slogwang } 1040*a9643ea8Slogwang 1041*a9643ea8Slogwang /** 1042*a9643ea8Slogwang * @brief �߳������л�, �ȴ������̵߳Ļ��� 1043*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1044*a9643ea8Slogwang */ 1045*a9643ea8Slogwang void MtFrame::WaitNotify(utime64_t timeout) 1046*a9643ea8Slogwang { 1047*a9643ea8Slogwang MicroThread* thread = GetActiveThread(); 1048*a9643ea8Slogwang 1049*a9643ea8Slogwang thread->SetWakeupTime(timeout + this->GetLastClock()); 1050*a9643ea8Slogwang this->InsertIoWait(thread); 1051*a9643ea8Slogwang thread->SwitchContext(); 1052*a9643ea8Slogwang } 1053*a9643ea8Slogwang 1054*a9643ea8Slogwang /** 1055*a9643ea8Slogwang * @brief �̴߳����л�����,���óɹ� ���ó�cpu 1056*a9643ea8Slogwang * @param fdlist ��·������socket�б� 1057*a9643ea8Slogwang * @param fd ���������fd��Ϣ 1058*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1059*a9643ea8Slogwang * @return true �ɹ�, false ʧ�� 1060*a9643ea8Slogwang */ 1061*a9643ea8Slogwang bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) 1062*a9643ea8Slogwang { 1063*a9643ea8Slogwang MicroThread* thread = GetActiveThread(); 1064*a9643ea8Slogwang if (NULL == thread) 1065*a9643ea8Slogwang { 1066*a9643ea8Slogwang MTLOG_ERROR("active thread null, epoll schedule failed"); 1067*a9643ea8Slogwang return false; 1068*a9643ea8Slogwang } 1069*a9643ea8Slogwang 1070*a9643ea8Slogwang // 1. ���ϸ��߳���Ҫ���ĵ�epoll���ȶ��� 1071*a9643ea8Slogwang thread->ClearAllFd(); 1072*a9643ea8Slogwang if (fdlist) 1073*a9643ea8Slogwang { 1074*a9643ea8Slogwang thread->AddFdList(fdlist); 1075*a9643ea8Slogwang } 1076*a9643ea8Slogwang if (fd) 1077*a9643ea8Slogwang { 1078*a9643ea8Slogwang thread->AddFd(fd); 1079*a9643ea8Slogwang } 1080*a9643ea8Slogwang 1081*a9643ea8Slogwang // 2. ����epoll�����¼�, ������ʱʱ��, �л�IO�ȴ�״̬, �����л� 1082*a9643ea8Slogwang thread->SetWakeupTime(timeout + this->GetLastClock()); 1083*a9643ea8Slogwang if (!this->KqueueAdd(thread->GetFdSet())) 1084*a9643ea8Slogwang { 1085*a9643ea8Slogwang MTLOG_ERROR("epoll add failed, errno: %d", errno); 1086*a9643ea8Slogwang return false; 1087*a9643ea8Slogwang } 1088*a9643ea8Slogwang this->InsertIoWait(thread); 1089*a9643ea8Slogwang thread->SwitchContext(); 1090*a9643ea8Slogwang 1091*a9643ea8Slogwang // 3. ����OK, �ж���ʱ, epoll ctrl ��ԭ״̬ 1092*a9643ea8Slogwang int rcvnum = 0; 1093*a9643ea8Slogwang KqObjList& rcvfds = thread->GetFdSet(); 1094*a9643ea8Slogwang KqueuerObj* fdata = NULL; 1095*a9643ea8Slogwang TAILQ_FOREACH(fdata, &rcvfds, _entry) 1096*a9643ea8Slogwang { 1097*a9643ea8Slogwang if (fdata->GetRcvEvents() != 0) 1098*a9643ea8Slogwang { 1099*a9643ea8Slogwang rcvnum++; 1100*a9643ea8Slogwang } 1101*a9643ea8Slogwang } 1102*a9643ea8Slogwang this->KqueueDel(rcvfds); // ��һ��������ADD, DEL �ջ����� 1103*a9643ea8Slogwang 1104*a9643ea8Slogwang if (rcvnum == 0) // ��ʱ����, ���ش��� 1105*a9643ea8Slogwang { 1106*a9643ea8Slogwang errno = ETIME; 1107*a9643ea8Slogwang return false; 1108*a9643ea8Slogwang } 1109*a9643ea8Slogwang 1110*a9643ea8Slogwang return true; 1111*a9643ea8Slogwang } 1112*a9643ea8Slogwang 1113*a9643ea8Slogwang /** 1114*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� recvfrom 1115*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1116*a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 1117*a9643ea8Slogwang * @param len ������Ϣ���������� 1118*a9643ea8Slogwang * @param from ��Դ��ַ��ָ�� 1119*a9643ea8Slogwang * @param fromlen ��Դ��ַ�Ľṹ���� 1120*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1121*a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 1122*a9643ea8Slogwang */ 1123*a9643ea8Slogwang int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 1124*a9643ea8Slogwang { 1125*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1126*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1127*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1128*a9643ea8Slogwang utime64_t now = 0; 1129*a9643ea8Slogwang 1130*a9643ea8Slogwang if(fd<0 || !buf || len<1) 1131*a9643ea8Slogwang { 1132*a9643ea8Slogwang errno = EINVAL; 1133*a9643ea8Slogwang MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno); 1134*a9643ea8Slogwang return -10; 1135*a9643ea8Slogwang } 1136*a9643ea8Slogwang 1137*a9643ea8Slogwang if (timeout <= -1) 1138*a9643ea8Slogwang { 1139*a9643ea8Slogwang timeout = 0x7fffffff; 1140*a9643ea8Slogwang } 1141*a9643ea8Slogwang 1142*a9643ea8Slogwang while (true) 1143*a9643ea8Slogwang { 1144*a9643ea8Slogwang now = mtframe->GetLastClock(); 1145*a9643ea8Slogwang if ((int)(now - start) > timeout) 1146*a9643ea8Slogwang { 1147*a9643ea8Slogwang errno = ETIME; 1148*a9643ea8Slogwang return -1; 1149*a9643ea8Slogwang } 1150*a9643ea8Slogwang 1151*a9643ea8Slogwang KqueuerObj epfd; 1152*a9643ea8Slogwang epfd.SetOsfd(fd); 1153*a9643ea8Slogwang epfd.EnableInput(); 1154*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1155*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1156*a9643ea8Slogwang { 1157*a9643ea8Slogwang MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 1158*a9643ea8Slogwang return -2; 1159*a9643ea8Slogwang } 1160*a9643ea8Slogwang 1161*a9643ea8Slogwang mt_hook_syscall(recvfrom); 1162*a9643ea8Slogwang int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen); 1163*a9643ea8Slogwang if (n < 0) 1164*a9643ea8Slogwang { 1165*a9643ea8Slogwang if (errno == EINTR) { 1166*a9643ea8Slogwang continue; 1167*a9643ea8Slogwang } 1168*a9643ea8Slogwang 1169*a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1170*a9643ea8Slogwang { 1171*a9643ea8Slogwang MTLOG_ERROR("recvfrom failed, errno: %d", errno); 1172*a9643ea8Slogwang return -3; 1173*a9643ea8Slogwang } 1174*a9643ea8Slogwang } 1175*a9643ea8Slogwang else 1176*a9643ea8Slogwang { 1177*a9643ea8Slogwang return n; 1178*a9643ea8Slogwang } 1179*a9643ea8Slogwang } 1180*a9643ea8Slogwang 1181*a9643ea8Slogwang } 1182*a9643ea8Slogwang 1183*a9643ea8Slogwang /** 1184*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� sendto 1185*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1186*a9643ea8Slogwang * @param msg �����͵���Ϣָ�� 1187*a9643ea8Slogwang * @param len �����͵���Ϣ���� 1188*a9643ea8Slogwang * @param to Ŀ�ĵ�ַ��ָ�� 1189*a9643ea8Slogwang * @param tolen Ŀ�ĵ�ַ�Ľṹ���� 1190*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1191*a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 1192*a9643ea8Slogwang */ 1193*a9643ea8Slogwang int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 1194*a9643ea8Slogwang { 1195*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1196*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1197*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1198*a9643ea8Slogwang utime64_t now = 0; 1199*a9643ea8Slogwang 1200*a9643ea8Slogwang if(fd<0 || !msg || len<1) 1201*a9643ea8Slogwang { 1202*a9643ea8Slogwang errno = EINVAL; 1203*a9643ea8Slogwang MTLOG_ERROR("sendto failed, errno: %d (%m)", errno); 1204*a9643ea8Slogwang return -10; 1205*a9643ea8Slogwang } 1206*a9643ea8Slogwang 1207*a9643ea8Slogwang int n = 0; 1208*a9643ea8Slogwang mt_hook_syscall(sendto); 1209*a9643ea8Slogwang while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0) 1210*a9643ea8Slogwang { 1211*a9643ea8Slogwang now = mtframe->GetLastClock(); 1212*a9643ea8Slogwang if ((int)(now - start) > timeout) 1213*a9643ea8Slogwang { 1214*a9643ea8Slogwang errno = ETIME; 1215*a9643ea8Slogwang return -1; 1216*a9643ea8Slogwang } 1217*a9643ea8Slogwang 1218*a9643ea8Slogwang if (errno == EINTR) { 1219*a9643ea8Slogwang continue; 1220*a9643ea8Slogwang } 1221*a9643ea8Slogwang 1222*a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1223*a9643ea8Slogwang MTLOG_ERROR("sendto failed, errno: %d", errno); 1224*a9643ea8Slogwang return -2; 1225*a9643ea8Slogwang } 1226*a9643ea8Slogwang 1227*a9643ea8Slogwang KqueuerObj epfd; 1228*a9643ea8Slogwang epfd.SetOsfd(fd); 1229*a9643ea8Slogwang epfd.EnableOutput(); 1230*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1231*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1232*a9643ea8Slogwang return -3; 1233*a9643ea8Slogwang } 1234*a9643ea8Slogwang } 1235*a9643ea8Slogwang 1236*a9643ea8Slogwang return n; 1237*a9643ea8Slogwang } 1238*a9643ea8Slogwang 1239*a9643ea8Slogwang /** 1240*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� connect 1241*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1242*a9643ea8Slogwang * @param addr ָ��server��Ŀ�ĵ�ַ 1243*a9643ea8Slogwang * @param addrlen ��ַ�ij��� 1244*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1245*a9643ea8Slogwang * @return =0 ���ӳɹ�, <0 ʧ�� 1246*a9643ea8Slogwang */ 1247*a9643ea8Slogwang int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 1248*a9643ea8Slogwang { 1249*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1250*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1251*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1252*a9643ea8Slogwang utime64_t now = 0; 1253*a9643ea8Slogwang 1254*a9643ea8Slogwang if(fd<0 || !addr || addrlen<1) 1255*a9643ea8Slogwang { 1256*a9643ea8Slogwang errno = EINVAL; 1257*a9643ea8Slogwang MTLOG_ERROR("connect failed, errno: %d (%m)", errno); 1258*a9643ea8Slogwang return -10; 1259*a9643ea8Slogwang } 1260*a9643ea8Slogwang 1261*a9643ea8Slogwang int n = 0; 1262*a9643ea8Slogwang mt_hook_syscall(connect); 1263*a9643ea8Slogwang while ((n = ff_hook_connect(fd, addr, addrlen)) < 0) 1264*a9643ea8Slogwang { 1265*a9643ea8Slogwang now = mtframe->GetLastClock(); 1266*a9643ea8Slogwang if ((int)(now - start) > timeout) 1267*a9643ea8Slogwang { 1268*a9643ea8Slogwang errno = ETIME; 1269*a9643ea8Slogwang return -1; 1270*a9643ea8Slogwang } 1271*a9643ea8Slogwang 1272*a9643ea8Slogwang if (errno == EISCONN) // ������, ���سɹ� 1273*a9643ea8Slogwang { 1274*a9643ea8Slogwang return 0; 1275*a9643ea8Slogwang } 1276*a9643ea8Slogwang 1277*a9643ea8Slogwang if (errno == EINTR) { 1278*a9643ea8Slogwang continue; 1279*a9643ea8Slogwang } 1280*a9643ea8Slogwang 1281*a9643ea8Slogwang if (errno != EINPROGRESS) { 1282*a9643ea8Slogwang MTLOG_ERROR("connect failed, errno: %d", errno); 1283*a9643ea8Slogwang return -2; 1284*a9643ea8Slogwang } 1285*a9643ea8Slogwang 1286*a9643ea8Slogwang KqueuerObj epfd; 1287*a9643ea8Slogwang epfd.SetOsfd(fd); 1288*a9643ea8Slogwang epfd.EnableOutput(); 1289*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1290*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1291*a9643ea8Slogwang return -3; 1292*a9643ea8Slogwang } 1293*a9643ea8Slogwang } 1294*a9643ea8Slogwang 1295*a9643ea8Slogwang return n; 1296*a9643ea8Slogwang } 1297*a9643ea8Slogwang 1298*a9643ea8Slogwang /** 1299*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� accept 1300*a9643ea8Slogwang * @param fd �������� 1301*a9643ea8Slogwang * @param addr �ͻ��˵�ַ 1302*a9643ea8Slogwang * @param addrlen ��ַ�ij��� 1303*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1304*a9643ea8Slogwang * @return >=0 accept��socket������, <0 ʧ�� 1305*a9643ea8Slogwang */ 1306*a9643ea8Slogwang int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 1307*a9643ea8Slogwang { 1308*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1309*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1310*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1311*a9643ea8Slogwang utime64_t now = 0; 1312*a9643ea8Slogwang 1313*a9643ea8Slogwang if(fd<0) 1314*a9643ea8Slogwang { 1315*a9643ea8Slogwang errno = EINVAL; 1316*a9643ea8Slogwang MTLOG_ERROR("accept failed, errno: %d (%m)", errno); 1317*a9643ea8Slogwang return -10; 1318*a9643ea8Slogwang } 1319*a9643ea8Slogwang 1320*a9643ea8Slogwang int acceptfd = 0; 1321*a9643ea8Slogwang mt_hook_syscall(accept); 1322*a9643ea8Slogwang while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0) 1323*a9643ea8Slogwang { 1324*a9643ea8Slogwang now = mtframe->GetLastClock(); 1325*a9643ea8Slogwang if ((int)(now - start) > timeout) 1326*a9643ea8Slogwang { 1327*a9643ea8Slogwang errno = ETIME; 1328*a9643ea8Slogwang return -1; 1329*a9643ea8Slogwang } 1330*a9643ea8Slogwang 1331*a9643ea8Slogwang if (errno == EINTR) { 1332*a9643ea8Slogwang continue; 1333*a9643ea8Slogwang } 1334*a9643ea8Slogwang 1335*a9643ea8Slogwang if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) { 1336*a9643ea8Slogwang MTLOG_ERROR("accept failed, errno: %d", errno); 1337*a9643ea8Slogwang return -2; 1338*a9643ea8Slogwang } 1339*a9643ea8Slogwang 1340*a9643ea8Slogwang KqueuerObj epfd; 1341*a9643ea8Slogwang epfd.SetOsfd(fd); 1342*a9643ea8Slogwang epfd.EnableInput(); 1343*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1344*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1345*a9643ea8Slogwang return -3; 1346*a9643ea8Slogwang } 1347*a9643ea8Slogwang } 1348*a9643ea8Slogwang 1349*a9643ea8Slogwang return acceptfd; 1350*a9643ea8Slogwang } 1351*a9643ea8Slogwang 1352*a9643ea8Slogwang 1353*a9643ea8Slogwang /** 1354*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� read 1355*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1356*a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 1357*a9643ea8Slogwang * @param nbyte ������Ϣ���������� 1358*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1359*a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 1360*a9643ea8Slogwang */ 1361*a9643ea8Slogwang ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout) 1362*a9643ea8Slogwang { 1363*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1364*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1365*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1366*a9643ea8Slogwang utime64_t now = 0; 1367*a9643ea8Slogwang 1368*a9643ea8Slogwang if(fd<0 || !buf || nbyte<1) 1369*a9643ea8Slogwang { 1370*a9643ea8Slogwang errno = EINVAL; 1371*a9643ea8Slogwang MTLOG_ERROR("read failed, errno: %d (%m)", errno); 1372*a9643ea8Slogwang return -10; 1373*a9643ea8Slogwang } 1374*a9643ea8Slogwang 1375*a9643ea8Slogwang ssize_t n = 0; 1376*a9643ea8Slogwang mt_hook_syscall(read); 1377*a9643ea8Slogwang while ((n = ff_hook_read(fd, buf, nbyte)) < 0) 1378*a9643ea8Slogwang { 1379*a9643ea8Slogwang now = mtframe->GetLastClock(); 1380*a9643ea8Slogwang if ((int)(now - start) > timeout) 1381*a9643ea8Slogwang { 1382*a9643ea8Slogwang errno = ETIME; 1383*a9643ea8Slogwang return -1; 1384*a9643ea8Slogwang } 1385*a9643ea8Slogwang 1386*a9643ea8Slogwang if (errno == EINTR) { 1387*a9643ea8Slogwang continue; 1388*a9643ea8Slogwang } 1389*a9643ea8Slogwang 1390*a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1391*a9643ea8Slogwang MTLOG_ERROR("read failed, errno: %d", errno); 1392*a9643ea8Slogwang return -2; 1393*a9643ea8Slogwang } 1394*a9643ea8Slogwang 1395*a9643ea8Slogwang KqueuerObj epfd; 1396*a9643ea8Slogwang epfd.SetOsfd(fd); 1397*a9643ea8Slogwang epfd.EnableInput(); 1398*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1399*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1400*a9643ea8Slogwang return -3; 1401*a9643ea8Slogwang } 1402*a9643ea8Slogwang } 1403*a9643ea8Slogwang 1404*a9643ea8Slogwang return n; 1405*a9643ea8Slogwang } 1406*a9643ea8Slogwang 1407*a9643ea8Slogwang /** 1408*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� write 1409*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1410*a9643ea8Slogwang * @param buf �����͵���Ϣָ�� 1411*a9643ea8Slogwang * @param nbyte �����͵���Ϣ���� 1412*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1413*a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 1414*a9643ea8Slogwang */ 1415*a9643ea8Slogwang ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout) 1416*a9643ea8Slogwang { 1417*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1418*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1419*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1420*a9643ea8Slogwang utime64_t now = 0; 1421*a9643ea8Slogwang 1422*a9643ea8Slogwang if(fd<0 || !buf || nbyte<1) 1423*a9643ea8Slogwang { 1424*a9643ea8Slogwang errno = EINVAL; 1425*a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d (%m)", errno); 1426*a9643ea8Slogwang return -10; 1427*a9643ea8Slogwang } 1428*a9643ea8Slogwang 1429*a9643ea8Slogwang ssize_t n = 0; 1430*a9643ea8Slogwang size_t send_len = 0; 1431*a9643ea8Slogwang while (send_len < nbyte) 1432*a9643ea8Slogwang { 1433*a9643ea8Slogwang now = mtframe->GetLastClock(); 1434*a9643ea8Slogwang if ((int)(now - start) > timeout) 1435*a9643ea8Slogwang { 1436*a9643ea8Slogwang errno = ETIME; 1437*a9643ea8Slogwang return -1; 1438*a9643ea8Slogwang } 1439*a9643ea8Slogwang 1440*a9643ea8Slogwang mt_hook_syscall(write); 1441*a9643ea8Slogwang n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len); 1442*a9643ea8Slogwang if (n < 0) 1443*a9643ea8Slogwang { 1444*a9643ea8Slogwang if (errno == EINTR) { 1445*a9643ea8Slogwang continue; 1446*a9643ea8Slogwang } 1447*a9643ea8Slogwang 1448*a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1449*a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d", errno); 1450*a9643ea8Slogwang return -2; 1451*a9643ea8Slogwang } 1452*a9643ea8Slogwang } 1453*a9643ea8Slogwang else 1454*a9643ea8Slogwang { 1455*a9643ea8Slogwang send_len += n; 1456*a9643ea8Slogwang if (send_len >= nbyte) { 1457*a9643ea8Slogwang return nbyte; 1458*a9643ea8Slogwang } 1459*a9643ea8Slogwang } 1460*a9643ea8Slogwang 1461*a9643ea8Slogwang KqueuerObj epfd; 1462*a9643ea8Slogwang epfd.SetOsfd(fd); 1463*a9643ea8Slogwang epfd.EnableOutput(); 1464*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1465*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1466*a9643ea8Slogwang return -3; 1467*a9643ea8Slogwang } 1468*a9643ea8Slogwang } 1469*a9643ea8Slogwang 1470*a9643ea8Slogwang return nbyte; 1471*a9643ea8Slogwang } 1472*a9643ea8Slogwang 1473*a9643ea8Slogwang 1474*a9643ea8Slogwang /** 1475*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� recv 1476*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1477*a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 1478*a9643ea8Slogwang * @param len ������Ϣ���������� 1479*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1480*a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 1481*a9643ea8Slogwang */ 1482*a9643ea8Slogwang int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout) 1483*a9643ea8Slogwang { 1484*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1485*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1486*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1487*a9643ea8Slogwang utime64_t now = 0; 1488*a9643ea8Slogwang 1489*a9643ea8Slogwang if(fd<0 || !buf || len<1) 1490*a9643ea8Slogwang { 1491*a9643ea8Slogwang errno = EINVAL; 1492*a9643ea8Slogwang MTLOG_ERROR("recv failed, errno: %d (%m)", errno); 1493*a9643ea8Slogwang return -10; 1494*a9643ea8Slogwang } 1495*a9643ea8Slogwang 1496*a9643ea8Slogwang if (timeout <= -1) 1497*a9643ea8Slogwang { 1498*a9643ea8Slogwang timeout = 0x7fffffff; 1499*a9643ea8Slogwang } 1500*a9643ea8Slogwang 1501*a9643ea8Slogwang while (true) 1502*a9643ea8Slogwang { 1503*a9643ea8Slogwang now = mtframe->GetLastClock(); 1504*a9643ea8Slogwang if ((int)(now - start) > timeout) 1505*a9643ea8Slogwang { 1506*a9643ea8Slogwang errno = ETIME; 1507*a9643ea8Slogwang return -1; 1508*a9643ea8Slogwang } 1509*a9643ea8Slogwang 1510*a9643ea8Slogwang KqueuerObj epfd; 1511*a9643ea8Slogwang epfd.SetOsfd(fd); 1512*a9643ea8Slogwang epfd.EnableInput(); 1513*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1514*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1515*a9643ea8Slogwang { 1516*a9643ea8Slogwang MTLOG_DEBUG("epoll schedule failed, errno: %d", errno); 1517*a9643ea8Slogwang return -2; 1518*a9643ea8Slogwang } 1519*a9643ea8Slogwang 1520*a9643ea8Slogwang mt_hook_syscall(recv); 1521*a9643ea8Slogwang int n = ff_hook_recv(fd, buf, len, flags); 1522*a9643ea8Slogwang if (n < 0) 1523*a9643ea8Slogwang { 1524*a9643ea8Slogwang if (errno == EINTR) { 1525*a9643ea8Slogwang continue; 1526*a9643ea8Slogwang } 1527*a9643ea8Slogwang 1528*a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) 1529*a9643ea8Slogwang { 1530*a9643ea8Slogwang MTLOG_ERROR("recv failed, errno: %d", errno); 1531*a9643ea8Slogwang return -3; 1532*a9643ea8Slogwang } 1533*a9643ea8Slogwang } 1534*a9643ea8Slogwang else 1535*a9643ea8Slogwang { 1536*a9643ea8Slogwang return n; 1537*a9643ea8Slogwang } 1538*a9643ea8Slogwang } 1539*a9643ea8Slogwang 1540*a9643ea8Slogwang } 1541*a9643ea8Slogwang 1542*a9643ea8Slogwang /** 1543*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� send 1544*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1545*a9643ea8Slogwang * @param buf �����͵���Ϣָ�� 1546*a9643ea8Slogwang * @param nbyte �����͵���Ϣ���� 1547*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1548*a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 1549*a9643ea8Slogwang */ 1550*a9643ea8Slogwang ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 1551*a9643ea8Slogwang { 1552*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1553*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1554*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1555*a9643ea8Slogwang utime64_t now = 0; 1556*a9643ea8Slogwang 1557*a9643ea8Slogwang if(fd<0 || !buf || nbyte<1) 1558*a9643ea8Slogwang { 1559*a9643ea8Slogwang errno = EINVAL; 1560*a9643ea8Slogwang MTLOG_ERROR("send failed, errno: %d (%m)", errno); 1561*a9643ea8Slogwang return -10; 1562*a9643ea8Slogwang } 1563*a9643ea8Slogwang 1564*a9643ea8Slogwang ssize_t n = 0; 1565*a9643ea8Slogwang size_t send_len = 0; 1566*a9643ea8Slogwang while (send_len < nbyte) 1567*a9643ea8Slogwang { 1568*a9643ea8Slogwang now = mtframe->GetLastClock(); 1569*a9643ea8Slogwang if ((int)(now - start) > timeout) 1570*a9643ea8Slogwang { 1571*a9643ea8Slogwang errno = ETIME; 1572*a9643ea8Slogwang return -1; 1573*a9643ea8Slogwang } 1574*a9643ea8Slogwang 1575*a9643ea8Slogwang mt_hook_syscall(send); 1576*a9643ea8Slogwang n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags); 1577*a9643ea8Slogwang if (n < 0) 1578*a9643ea8Slogwang { 1579*a9643ea8Slogwang if (errno == EINTR) { 1580*a9643ea8Slogwang continue; 1581*a9643ea8Slogwang } 1582*a9643ea8Slogwang 1583*a9643ea8Slogwang if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) { 1584*a9643ea8Slogwang MTLOG_ERROR("write failed, errno: %d", errno); 1585*a9643ea8Slogwang return -2; 1586*a9643ea8Slogwang } 1587*a9643ea8Slogwang } 1588*a9643ea8Slogwang else 1589*a9643ea8Slogwang { 1590*a9643ea8Slogwang send_len += n; 1591*a9643ea8Slogwang if (send_len >= nbyte) { 1592*a9643ea8Slogwang return nbyte; 1593*a9643ea8Slogwang } 1594*a9643ea8Slogwang } 1595*a9643ea8Slogwang 1596*a9643ea8Slogwang KqueuerObj epfd; 1597*a9643ea8Slogwang epfd.SetOsfd(fd); 1598*a9643ea8Slogwang epfd.EnableOutput(); 1599*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1600*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) { 1601*a9643ea8Slogwang return -3; 1602*a9643ea8Slogwang } 1603*a9643ea8Slogwang } 1604*a9643ea8Slogwang 1605*a9643ea8Slogwang return nbyte; 1606*a9643ea8Slogwang } 1607*a9643ea8Slogwang 1608*a9643ea8Slogwang 1609*a9643ea8Slogwang 1610*a9643ea8Slogwang /** 1611*a9643ea8Slogwang * @brief �߳�����sleep�ӿ�, ��λms 1612*a9643ea8Slogwang */ 1613*a9643ea8Slogwang void MtFrame::sleep(int ms) 1614*a9643ea8Slogwang { 1615*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 1616*a9643ea8Slogwang MicroThread* thread = frame->GetActiveThread(); 1617*a9643ea8Slogwang if (thread != NULL) 1618*a9643ea8Slogwang { 1619*a9643ea8Slogwang thread->sleep(ms); 1620*a9643ea8Slogwang } 1621*a9643ea8Slogwang } 1622*a9643ea8Slogwang 1623*a9643ea8Slogwang /** 1624*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� recv 1625*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 1626*a9643ea8Slogwang * @param events �¼����� EPOLLIN or EPOLLOUT 1627*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 1628*a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 1629*a9643ea8Slogwang */ 1630*a9643ea8Slogwang int MtFrame::WaitEvents(int fd, int events, int timeout) 1631*a9643ea8Slogwang { 1632*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 1633*a9643ea8Slogwang utime64_t start = mtframe->GetLastClock(); 1634*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 1635*a9643ea8Slogwang utime64_t now = 0; 1636*a9643ea8Slogwang 1637*a9643ea8Slogwang if (timeout <= -1) 1638*a9643ea8Slogwang { 1639*a9643ea8Slogwang timeout = 0x7fffffff; 1640*a9643ea8Slogwang } 1641*a9643ea8Slogwang 1642*a9643ea8Slogwang while (true) 1643*a9643ea8Slogwang { 1644*a9643ea8Slogwang now = mtframe->GetLastClock(); 1645*a9643ea8Slogwang if ((int)(now - start) > timeout) 1646*a9643ea8Slogwang { 1647*a9643ea8Slogwang errno = ETIME; 1648*a9643ea8Slogwang return 0; 1649*a9643ea8Slogwang } 1650*a9643ea8Slogwang 1651*a9643ea8Slogwang KqueuerObj epfd; 1652*a9643ea8Slogwang epfd.SetOsfd(fd); 1653*a9643ea8Slogwang if (events & KQ_EVENT_READ) 1654*a9643ea8Slogwang { 1655*a9643ea8Slogwang epfd.EnableInput(); 1656*a9643ea8Slogwang } 1657*a9643ea8Slogwang if (events & KQ_EVENT_WRITE) 1658*a9643ea8Slogwang { 1659*a9643ea8Slogwang epfd.EnableOutput(); 1660*a9643ea8Slogwang } 1661*a9643ea8Slogwang epfd.SetOwnerThread(thread); 1662*a9643ea8Slogwang 1663*a9643ea8Slogwang if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) 1664*a9643ea8Slogwang { 1665*a9643ea8Slogwang MTLOG_TRACE("epoll schedule failed, errno: %d", errno); 1666*a9643ea8Slogwang return 0; 1667*a9643ea8Slogwang } 1668*a9643ea8Slogwang 1669*a9643ea8Slogwang return epfd.GetRcvEvents(); 1670*a9643ea8Slogwang } 1671*a9643ea8Slogwang } 1672*a9643ea8Slogwang 1673*a9643ea8Slogwang 1674