1
2 /**
3 * Tencent is pleased to support the open source community by making MSEC available.
4 *
5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6 *
7 * Licensed under the GNU General Public License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License. You may
9 * obtain a copy of the License at
10 *
11 * https://opensource.org/licenses/GPL-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software distributed under the
14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15 * either express or implied. See the License for the specific language governing permissions
16 * and limitations under the License.
17 */
18
19
20 /**
21 * @filename micro_thread.cpp
22 * @info micro thread manager
23 */
24 #include "mt_version.h"
25 #include "micro_thread.h"
26 #include "mt_net.h"
27 #include "valgrind.h"
28 #include <assert.h>
29 #include "mt_sys_hook.h"
30 #include "ff_hook.h"
31 #include "ff_api.h"
32
33 using namespace NS_MICRO_THREAD;
34
// Assertions are compiled out in this file: ASSERT(x) expands to nothing, so
// the argument expression is never evaluated. The commented-out alternative
// below maps it onto the standard assert().
#define ASSERT(statement)
//#define ASSERT(statement) assert(statement)

// Context-switch primitives with C linkage; they are only declared here.
// Callers in this file treat a zero return from save_context() as "context
// just captured" and a non-zero return as "context resumed".
extern "C" int save_context(jmp_buf jbf);

// Resumes a context previously captured in jbf; this file always passes ret = 1.
extern "C" void restore_context(jmp_buf jbf, int ret);

// Rewrites the stack pointer stored in jbf; used by Thread::InitContext() to
// point a freshly captured context at the thread's private stack.
extern "C" void replace_esp(jmp_buf jbf, void* esp);
43
Thread(int stack_size)44 Thread::Thread(int stack_size)
45 {
46 _stack_size = stack_size ? stack_size : ThreadPool::default_stack_size;
47 _wakeup_time = 0;
48 _stack = NULL;
49 memset(&_jmpbuf, 0, sizeof(_jmpbuf));
50 }
51
// File-local fallback log adapter; MtFrame::InitFrame() installs it when the
// caller passes a NULL LogAdapter.
static DefaultLogAdapter def_log_adapt;
53 /**
54 * @brief LINUX x86/x86_64's allocated stacks.
55 */
InitStack()56 bool Thread::InitStack()
57 {
58 if (_stack) {
59 return true;
60 }
61
62 ///< stack index and memory are separated to prevent out of bounds.
63 _stack = (MtStack*)calloc(1, sizeof(MtStack));
64 if (NULL == _stack)
65 {
66 MTLOG_ERROR("calloc stack failed, size %u", sizeof(MtStack));
67 return false;
68 }
69
70 int memsize = MEM_PAGE_SIZE*2 + _stack_size;
71 memsize = (memsize + MEM_PAGE_SIZE - 1)/MEM_PAGE_SIZE*MEM_PAGE_SIZE;
72
73 static int zero_fd = -1;
74 int mmap_flags = MAP_PRIVATE | MAP_ANON;
75 void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0);
76 if (vaddr == (void *)MAP_FAILED)
77 {
78 MTLOG_ERROR("mmap stack failed, size %d,errmsg: %s.", memsize,strerror(errno));
79 free(_stack);
80 _stack = NULL;
81 return false;
82 }
83 _stack->_vaddr = (char*)vaddr;
84 _stack->_vaddr_size = memsize;
85 _stack->_stk_size = _stack_size;
86 _stack->_stk_bottom = _stack->_vaddr + MEM_PAGE_SIZE;
87 _stack->_stk_top = _stack->_stk_bottom + _stack->_stk_size;
88 // valgrind support: register stack frame
89 _stack->valgrind_id = VALGRIND_STACK_REGISTER(_stack->_stk_bottom, _stack->_stk_top);
90
91 _stack->_esp = _stack->_stk_top - STACK_PAD_SIZE;
92
93 mprotect(_stack->_vaddr, MEM_PAGE_SIZE, PROT_NONE);
94 mprotect(_stack->_stk_top, MEM_PAGE_SIZE, PROT_NONE);
95
96 return true;
97 }
98
99
FreeStack()100 void Thread::FreeStack()
101 {
102 if (!_stack) {
103 return;
104 }
105 munmap(_stack->_vaddr, _stack->_vaddr_size);
106 // valgrind support: deregister stack frame
107 VALGRIND_STACK_DEREGISTER(_stack->valgrind_id);
108 free(_stack);
109 _stack = NULL;
110 }
111
/**
 * @brief Capture the initial execution context of this thread and point it at
 *        the thread's private stack.
 *
 * A zero return from save_context() means the context was just captured, so the
 * start-run branch is skipped and only the saved stack pointer is patched.
 * A non-zero return (this file resumes contexts with restore_context(_jmpbuf, 1))
 * enters the branch that starts the thread body through the scheduler.
 */
void Thread::InitContext()
{
    if (save_context(_jmpbuf) != 0)
    {
        // Reached when the saved context is resumed: run the active thread's entry.
        ScheduleObj::Instance()->ScheduleStartRun();
    }

    if (_stack != NULL)
    {
        // Make the captured context execute on the private stack from now on.
        replace_esp(_jmpbuf, _stack->_esp);
    }
}
124
/**
 * @brief Save the current context into this thread and hand control to the scheduler.
 *
 * When this thread's context is resumed later, save_context() returns non-zero
 * and the function simply returns to its caller.
 */
void Thread::SwitchContext()
{
    if (save_context(_jmpbuf) == 0)
    {
        // Context just captured: let the scheduler pick the next thread to run.
        ScheduleObj::Instance()->ScheduleThread();
    }
}
132
133
/**
 * @brief Capture the current context into this thread's jump buffer.
 * @return the value returned by save_context(): callers in this file treat 0 as
 *         "just captured" and non-zero as "resumed".
 *
 * NOTE(review): the context is captured inside this wrapper's frame; confirm that
 * save_context()/restore_context() tolerate resuming after the wrapper returned.
 */
int Thread::SaveContext()
{
    return save_context(_jmpbuf);
}
138
/**
 * @brief Resume the context stored in this thread's jump buffer.
 *
 * The value 1 is passed through so the matching save_context() call site
 * observes a non-zero return.
 */
void Thread::RestoreContext()
{
    restore_context(_jmpbuf, 1);
}
143
144
Initial()145 bool Thread::Initial()
146 {
147 if (!InitStack())
148 {
149 MTLOG_ERROR("init stack failed");
150 return false;
151 }
152
153 InitContext();
154
155 return true;
156 }
157
Destroy()158 void Thread::Destroy()
159 {
160 FreeStack();
161 memset(&_jmpbuf, 0, sizeof(_jmpbuf));
162 }
163
/**
 * @brief Return the thread to a reusable state (used when it goes back to the pool).
 *
 * Clears the wake-up time and private pointer, re-captures the initial context
 * on the existing stack, then resets the derived-class state via CleanState().
 */
void Thread::Reset()
{
    _wakeup_time = 0;
    SetPrivate(NULL);

    // Re-arm the entry context so the next run starts from the beginning.
    InitContext();
    CleanState();
}
172
/**
 * @brief Suspend this thread until at least @p ms after the scheduler's current time.
 * @param ms delay added to ScheduleGetTime() to form the wake-up time
 *           (presumably milliseconds, per the parameter name — TODO confirm the clock unit).
 */
void Thread::sleep(int ms)
{
    utime64_t now = ScheduleObj::Instance()->ScheduleGetTime();
    _wakeup_time = now + ms;

    if (save_context(_jmpbuf) == 0)
    {
        // Context just captured: queue on the sleep list and yield.
        ScheduleObj::Instance()->ScheduleSleep();
    }
}
183
/**
 * @brief Suspend this thread on the pend list until something un-pends it
 *        (see ScheduleObj::ScheduleUnpend()).
 */
void Thread::Wait()
{
    if (save_context(_jmpbuf) == 0)
    {
        // Context just captured: move to the pend list and yield.
        ScheduleObj::Instance()->SchedulePend();
    }
}
191
CheckStackHealth(char * esp)192 bool Thread::CheckStackHealth(char *esp)
193 {
194 if (!_stack)
195 return false;
196
197 if (esp > _stack->_stk_bottom && esp < _stack->_stk_top)
198 return true;
199 else
200 return false;
201 }
202
MicroThread(ThreadType type)203 MicroThread::MicroThread(ThreadType type)
204 {
205 memset(&_entry, 0, sizeof(_entry));
206 TAILQ_INIT(&_fdset);
207 TAILQ_INIT(&_sub_list);
208 _flag = NOT_INLIST;
209 _type = type;
210 _state = INITIAL;
211 _start = NULL;
212 _args = NULL;
213 _parent = NULL;
214 }
215
CleanState()216 void MicroThread::CleanState()
217 {
218 TAILQ_INIT(&_fdset);
219 TAILQ_INIT(&_sub_list);
220 _flag = NOT_INLIST;
221 _type = NORMAL;
222 _state = INITIAL;
223 _start = NULL;
224 _args = NULL;
225 _parent = NULL;
226 }
227
Run()228 void MicroThread::Run()
229 {
230 if (_start) {
231 _start(_args);
232 }
233
234 if (this->IsSubThread()) {
235 this->WakeupParent();
236 }
237
238 ScheduleObj::Instance()->ScheduleReclaim();
239 ScheduleObj::Instance()->ScheduleThread();
240 }
241
WakeupParent()242 void MicroThread::WakeupParent()
243 {
244 MicroThread* parent = this->GetParent();
245 if (parent)
246 {
247 parent->RemoveSubThread(this);
248 if (parent->HasNoSubThread())
249 {
250 ScheduleObj::Instance()->ScheduleUnpend(parent);
251 }
252 }
253 else
254 {
255 MTLOG_ERROR("Sub thread no parent, error");
256 }
257 }
258
HasNoSubThread()259 bool MicroThread::HasNoSubThread()
260 {
261 return TAILQ_EMPTY(&_sub_list);
262 }
263
AddSubThread(MicroThread * sub)264 void MicroThread::AddSubThread(MicroThread* sub)
265 {
266 ASSERT(!sub->HasFlag(MicroThread::SUB_LIST));
267 if (!sub->HasFlag(MicroThread::SUB_LIST))
268 {
269 TAILQ_INSERT_TAIL(&_sub_list, sub, _sub_entry);
270 sub->_parent = this;
271 }
272
273 sub->SetFlag(MicroThread::SUB_LIST);
274 }
275
RemoveSubThread(MicroThread * sub)276 void MicroThread::RemoveSubThread(MicroThread* sub)
277 {
278 ASSERT(sub->HasFlag(MicroThread::SUB_LIST));
279 if (sub->HasFlag(MicroThread::SUB_LIST))
280 {
281 TAILQ_REMOVE(&_sub_list, sub, _sub_entry);
282 sub->_parent = NULL;
283 }
284
285 sub->UnsetFlag(MicroThread::SUB_LIST);
286 }
287
288 ScheduleObj *ScheduleObj::_instance = NULL;
Instance()289 inline ScheduleObj* ScheduleObj::Instance()
290 {
291 if (NULL == _instance)
292 {
293 _instance = new ScheduleObj();
294 }
295
296 return _instance;
297 }
298
ScheduleThread()299 void ScheduleObj::ScheduleThread()
300 {
301 MtFrame* frame = MtFrame::Instance();
302 frame->ThreadSchdule();
303 }
304
ScheduleGetTime()305 utime64_t ScheduleObj::ScheduleGetTime()
306 {
307 MtFrame* frame = MtFrame::Instance();
308 if (frame)
309 {
310 return frame->GetLastClock();
311 }
312 else
313 {
314 MTLOG_ERROR("frame time failed, maybe not init");
315 return 0;
316 }
317 }
318
ScheduleSleep()319 void ScheduleObj::ScheduleSleep()
320 {
321 MtFrame* frame = MtFrame::Instance();
322 MicroThread* thread = frame->GetActiveThread();
323 if ((!frame) || (!thread)) {
324 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
325 return;
326 }
327
328 frame->InsertSleep(thread);
329 frame->ThreadSchdule();
330 }
331
SchedulePend()332 void ScheduleObj::SchedulePend()
333 {
334 MtFrame* frame = MtFrame::Instance();
335 MicroThread* thread = frame->GetActiveThread();
336 if ((!frame) || (!thread)) {
337 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
338 return;
339 }
340
341 frame->InsertPend(thread);
342 frame->ThreadSchdule();
343 }
344
ScheduleUnpend(void * pthread)345 void ScheduleObj::ScheduleUnpend(void* pthread)
346 {
347 MtFrame* frame = MtFrame::Instance();
348 MicroThread* thread = (MicroThread*)pthread;
349 if ((!frame) || (!thread)) {
350 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
351 return;
352 }
353
354 frame->RemovePend(thread);
355 frame->InsertRunable(thread);
356 }
357
ScheduleReclaim()358 void ScheduleObj::ScheduleReclaim()
359 {
360 MtFrame* frame = MtFrame::Instance();
361 MicroThread* thread = frame->GetActiveThread();
362 if ((!frame) || (!thread)) {
363 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
364 return;
365 }
366
367 frame->FreeThread(thread);
368 }
369
ScheduleStartRun()370 void ScheduleObj::ScheduleStartRun()
371 {
372 MtFrame* frame = MtFrame::Instance();
373 MicroThread* thread = frame->GetActiveThread();
374 if ((!frame) || (!thread)) {
375 MTLOG_ERROR("frame and act thread null, %p, %p", frame, thread);
376 return;
377 }
378
379 thread->Run();
380 }
381
382
// Pool tuning knobs shared by every ThreadPool.
// default_thread_num is both the number of threads pre-created by InitialPool()
// and the free-list size above which FreeThread() starts destroying threads; it is
// doubled/halved at run time by AllocThread()/FreeThread(), which record the
// previous value in last_default_thread_num.
unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< original note: 2000 micro threads (macro value not visible here).
unsigned int ThreadPool::last_default_thread_num = DEFAULT_THREAD_NUM; ///< original note: 2000 micro threads (macro value not visible here).
unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< original note: 128k stack (macro value not visible here).
386
InitialPool(int max_num)387 bool ThreadPool::InitialPool(int max_num)
388 {
389 MicroThread *thread = NULL;
390 for (unsigned int i = 0; i < default_thread_num; i++)
391 {
392 thread = new MicroThread();
393 if ((NULL == thread) || (false == thread->Initial()))
394 {
395 MTLOG_ERROR("init pool, thread %p init failed", thread);
396 if (thread) delete thread;
397 continue;
398 }
399 thread->SetFlag(MicroThread::FREE_LIST);
400 _freelist.push(thread);
401 }
402
403 _total_num = _freelist.size();
404 _max_num = max_num;
405 _use_num = 0;
406 if (_total_num <= 0)
407 {
408 return false;
409 }
410 else
411 {
412 return true;
413 }
414 }
415
DestroyPool()416 void ThreadPool::DestroyPool()
417 {
418 MicroThread* thread = NULL;
419 while (!_freelist.empty())
420 {
421 thread = _freelist.front();
422 _freelist.pop();
423 thread->Destroy();
424 delete thread;
425 }
426
427 _total_num = 0;
428 _use_num = 0;
429 }
430
AllocThread()431 MicroThread* ThreadPool::AllocThread()
432 {
433 MT_ATTR_API_SET(492069, _total_num);
434
435 MicroThread* thread = NULL;
436 if (!_freelist.empty())
437 {
438 thread = _freelist.front();
439 _freelist.pop();
440
441 ASSERT(thread->HasFlag(MicroThread::FREE_LIST));
442
443 thread->UnsetFlag(MicroThread::FREE_LIST);
444 _use_num++;
445 return thread;
446 }
447
448 MT_ATTR_API(320846, 1); // pool no nore
449 if (_total_num >= _max_num)
450 {
451 MT_ATTR_API(361140, 1); // no more quota
452 MTLOG_ERROR("total %d is outof max: %d", _total_num,_max_num);
453 return NULL;
454 }
455
456 thread = new MicroThread();
457 if ((NULL == thread) || (false == thread->Initial()))
458 {
459 MT_ATTR_API(320847, 1); // pool init fail
460 MTLOG_ERROR("thread alloc failed, thread: %p", thread);
461 if (thread) delete thread;
462 return NULL;
463 }
464 _total_num++;
465 _use_num++;
466 if(_use_num >(int) default_thread_num){
467 if(((int) default_thread_num * 2 )< _max_num){
468 last_default_thread_num = default_thread_num;
469 default_thread_num = default_thread_num * 2;
470 }
471 }
472
473 return thread;
474 }
475
FreeThread(MicroThread * thread)476 void ThreadPool::FreeThread(MicroThread* thread)
477 {
478 ASSERT(!thread->HasFlag(MicroThread::FREE_LIST));
479 thread->Reset();
480 _use_num--;
481 _freelist.push(thread);
482 thread->SetFlag(MicroThread::FREE_LIST);
483
484 unsigned int free_num = _freelist.size();
485 if ((free_num > default_thread_num) && (free_num > 1))
486 {
487 thread = _freelist.front();
488 _freelist.pop();
489 thread->Destroy();
490 delete thread;
491 _total_num--;
492 if(default_thread_num / 2 >= DEFAULT_THREAD_NUM){
493 last_default_thread_num = default_thread_num;
494 default_thread_num = default_thread_num / 2;
495 }
496 }
497 }
498
/**
 * @brief Number of threads currently handed out by the pool.
 * @return the _use_num counter maintained by AllocThread()/FreeThread().
 */
int ThreadPool::GetUsedNum(void)
{
    return _use_num;
}
503
504 MtFrame *MtFrame::_instance = NULL;
Instance()505 inline MtFrame* MtFrame::Instance ()
506 {
507 if (NULL == _instance )
508 {
509 _instance = new MtFrame();
510 }
511
512 return _instance;
513 }
514
SetHookFlag()515 void MtFrame::SetHookFlag() {
516 mt_set_hook_flag();
517 };
518
InitFrame(LogAdapter * logadpt,int max_thread_num)519 bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num)
520 {
521 if(logadpt == NULL){
522 _log_adpt = &def_log_adapt;
523 }else{
524 _log_adpt = logadpt;
525 }
526
527 if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num))
528 {
529 MTLOG_ERROR("Init epoll or thread pool failed");
530 this->Destroy();
531 return false;
532 }
533 if (_sleeplist.HeapResize(max_thread_num * 2) < 0)
534 {
535 MTLOG_ERROR("Init heap list failed");
536 this->Destroy();
537 return false;
538 }
539
540 _timer = new CTimerMng(max_thread_num * 2);
541 if (NULL == _timer)
542 {
543 MTLOG_ERROR("Init heap timer failed");
544 this->Destroy();
545 return false;
546 }
547
548 _daemon = AllocThread();
549 if (NULL == _daemon)
550 {
551 MTLOG_ERROR("Alloc daemon thread failed");
552 this->Destroy();
553 return false;
554 }
555 _daemon->SetType(MicroThread::DAEMON);
556 _daemon->SetState(MicroThread::RUNABLE);
557 _daemon->SetSartFunc(MtFrame::DaemonRun, this);
558
559 _primo = new MicroThread(MicroThread::PRIMORDIAL);
560 if (NULL == _primo)
561 {
562 MTLOG_ERROR("new _primo thread failed");
563 this->Destroy();
564 return false;
565 }
566 _primo->SetState(MicroThread::RUNNING);
567 SetActiveThread(_primo);
568
569 _last_clock = GetSystemMS();
570 TAILQ_INIT(&_iolist);
571 TAILQ_INIT(&_pend_list);
572
573 //SetHookFlag();
574
575 return true;
576
577 }
578
/**
 * @brief Tear down the frame singleton: release the primordial and daemon threads,
 *        return every sleeping/runnable/pending thread to the pool, delete the
 *        timer, destroy the pool, close kqueue and finally delete the singleton.
 *
 * Does nothing when no singleton exists.
 * NOTE(review): the last steps delete _instance; when this == _instance the
 * object is gone on return, so callers must not touch it afterwards.
 */
void MtFrame::Destroy(void)
{
    if (NULL == _instance )
    {
        return;
    }

    if (_primo) {
        delete _primo;
        _primo = NULL;
    }

    if (_daemon) {
        FreeThread(_daemon);
        _daemon = NULL;
    }

    // I/O waiters are also kept on the sleep heap (see InsertIoWait), so the
    // I/O list is simply reset and the threads are released via the heap below.
    TAILQ_INIT(&_iolist);

    MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
    while (thread)
    {
        FreeThread(thread);
        thread = dynamic_cast<MicroThread*>(_sleeplist.HeapPop());
    }

    while (!_runlist.empty())
    {
        thread = _runlist.front();
        _runlist.pop();
        FreeThread(thread);
    }

    // _SAFE variant: the current element is unlinked inside the loop body.
    MicroThread* tmp;
    TAILQ_FOREACH_SAFE(thread, &_pend_list, _entry, tmp)
    {
        TAILQ_REMOVE(&_pend_list, thread, _entry);
        FreeThread(thread);
    }

    if (_timer != NULL)
    {
        delete _timer;
        _timer = NULL;
    }

    // Everything was returned to the pool above; now destroy the pool itself.
    _instance->DestroyPool();
    _instance->TermKqueue();
    delete _instance;
    _instance = NULL;
}
630
/**
 * @brief Return the library version.
 * @return the IMT_VERSION macro (presumably a string literal from mt_version.h —
 *         callers should treat the result as read-only).
 */
char* MtFrame::Version()
{
    return IMT_VERSION;
}
635
CreateThread(ThreadStart entry,void * args,bool runable)636 MicroThread* MtFrame::CreateThread(ThreadStart entry, void *args, bool runable)
637 {
638 MtFrame* mtframe = MtFrame::Instance();
639 MicroThread* thread = mtframe->AllocThread();
640 if (NULL == thread)
641 {
642 MTLOG_ERROR("create thread failed");
643 return NULL;
644 }
645 thread->SetSartFunc(entry, args);
646
647 if (runable) {
648 mtframe->InsertRunable(thread);
649 }
650
651 return thread;
652 }
653
/**
 * @brief One iteration of the daemon loop, passed to ff_run() by DaemonRun().
 *
 * Dispatches kqueue events, refreshes the cached clock, wakes timed-out
 * threads, runs expiry checks, then yields from the daemon thread so other
 * threads can run.
 *
 * @param args unused.
 * @return always 0.
 */
int MtFrame::Loop(void* args)
{
    MtFrame* mtframe = MtFrame::Instance();
    MicroThread* daemon = mtframe->DaemonThread();

    mtframe->KqueueDispatch();
    mtframe->SetLastClock(mtframe->GetSystemMS());
    mtframe->WakeupTimeout();
    mtframe->CheckExpired();
    // Save the daemon's context and let the scheduler pick the next thread.
    daemon->SwitchContext();

    return 0;
}
667
/**
 * @brief Entry function of the daemon thread: hands MtFrame::Loop to ff_run().
 *
 * The commented-out block below is the earlier in-process loop; its body now
 * lives in MtFrame::Loop().
 *
 * @param args unused here (InitFrame() passes the frame pointer).
 */
void MtFrame::DaemonRun(void* args)
{
    /*
    MtFrame* mtframe = MtFrame::Instance();
    MicroThread* daemon = mtframe->DaemonThread();

    while (true) {
        mtframe->KqueueDispatch();
        mtframe->SetLastClock(mtframe->GetSystemMS());
        mtframe->WakeupTimeout();
        mtframe->CheckExpired();
        daemon->SwitchContext();
    }
    */
    ff_run(MtFrame::Loop, NULL);
}
684
GetRootThread()685 MicroThread *MtFrame::GetRootThread()
686 {
687 if (NULL == _curr_thread)
688 {
689 return NULL;
690 }
691
692 MicroThread::ThreadType type = _curr_thread->GetType();
693 MicroThread *thread = _curr_thread;
694 MicroThread *parent = thread;
695
696 while (MicroThread::SUB_THREAD == type)
697 {
698 thread = thread->GetParent();
699 if (!thread)
700 {
701 break;
702 }
703
704 type = thread->GetType();
705 parent = thread;
706 }
707
708 return parent;
709 }
710
ThreadSchdule()711 void MtFrame::ThreadSchdule()
712 {
713 MicroThread* thread = NULL;
714 MtFrame* mtframe = MtFrame::Instance();
715
716 if (mtframe->_runlist.empty())
717 {
718 thread = mtframe->DaemonThread();
719 }
720 else
721 {
722 thread = mtframe->_runlist.front();
723 mtframe->RemoveRunable(thread);
724 }
725
726 this->SetActiveThread(thread);
727 thread->SetState(MicroThread::RUNNING);
728 thread->RestoreContext();
729 }
730
CheckExpired()731 void MtFrame::CheckExpired()
732 {
733 static utime64_t check_time = 0;
734
735 if (_timer != NULL)
736 {
737 _timer->check_expired();
738 }
739
740 utime64_t now = GetLastClock();
741
742 if ((now - check_time) > 1000)
743 {
744 CNetMgr::Instance()->RecycleObjs(now);
745 check_time = now;
746 }
747 }
748
WakeupTimeout()749 void MtFrame::WakeupTimeout()
750 {
751 utime64_t now = GetLastClock();
752 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
753 while (thread && (thread->GetWakeupTime() <= now))
754 {
755 if (thread->HasFlag(MicroThread::IO_LIST))
756 {
757 RemoveIoWait(thread);
758 }
759 else
760 {
761 RemoveSleep(thread);
762 }
763
764 InsertRunable(thread);
765
766 thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
767 }
768 }
769
KqueueGetTimeout()770 int MtFrame::KqueueGetTimeout()
771 {
772 utime64_t now = GetLastClock();
773 MicroThread* thread = dynamic_cast<MicroThread*>(_sleeplist.HeapTop());
774 if (!thread)
775 {
776 return 10; //default 10ms epollwait
777 }
778 else if (thread->GetWakeupTime() < now)
779 {
780 return 0;
781 }
782 else
783 {
784 return (int)(thread->GetWakeupTime() - now);
785 }
786 }
787
InsertSleep(MicroThread * thread)788 inline void MtFrame::InsertSleep(MicroThread* thread)
789 {
790 ASSERT(!thread->HasFlag(MicroThread::SLEEP_LIST));
791
792 thread->SetFlag(MicroThread::SLEEP_LIST);
793 thread->SetState(MicroThread::SLEEPING);
794 int rc = _sleeplist.HeapPush(thread);
795 if (rc < 0)
796 {
797 MT_ATTR_API(320848, 1); // heap error
798 MTLOG_ERROR("Insert heap failed , rc %d", rc);
799 }
800 }
801
RemoveSleep(MicroThread * thread)802 inline void MtFrame::RemoveSleep(MicroThread* thread)
803 {
804 ASSERT(thread->HasFlag(MicroThread::SLEEP_LIST));
805 thread->UnsetFlag(MicroThread::SLEEP_LIST);
806
807 int rc = _sleeplist.HeapDelete(thread);
808 if (rc < 0)
809 {
810 MT_ATTR_API(320849, 1); // heap error
811 MTLOG_ERROR("remove heap failed , rc %d", rc);
812 }
813 }
814
InsertIoWait(MicroThread * thread)815 inline void MtFrame::InsertIoWait(MicroThread* thread)
816 {
817 ASSERT(!thread->HasFlag(MicroThread::IO_LIST));
818 thread->SetFlag(MicroThread::IO_LIST);
819 TAILQ_INSERT_TAIL(&_iolist, thread, _entry);
820 InsertSleep(thread);
821 }
822
RemoveIoWait(MicroThread * thread)823 void MtFrame::RemoveIoWait(MicroThread* thread)
824 {
825 ASSERT(thread->HasFlag(MicroThread::IO_LIST));
826 thread->UnsetFlag(MicroThread::IO_LIST);
827 TAILQ_REMOVE(&_iolist, thread, _entry);
828
829 RemoveSleep(thread);
830 }
831
InsertRunable(MicroThread * thread)832 void MtFrame::InsertRunable(MicroThread* thread)
833 {
834 ASSERT(!thread->HasFlag(MicroThread::RUN_LIST));
835 thread->SetFlag(MicroThread::RUN_LIST);
836
837 thread->SetState(MicroThread::RUNABLE);
838 _runlist.push(thread);
839 _waitnum++;
840 }
841
RemoveRunable(MicroThread * thread)842 inline void MtFrame::RemoveRunable(MicroThread* thread)
843 {
844 ASSERT(thread->HasFlag(MicroThread::RUN_LIST));
845 ASSERT(thread == _runlist.front());
846 thread->UnsetFlag(MicroThread::RUN_LIST);
847
848 _runlist.pop();
849 _waitnum--;
850 }
851
InsertPend(MicroThread * thread)852 void MtFrame::InsertPend(MicroThread* thread)
853 {
854 ASSERT(!thread->HasFlag(MicroThread::PEND_LIST));
855 thread->SetFlag(MicroThread::PEND_LIST);
856 TAILQ_INSERT_TAIL(&_pend_list, thread, _entry);
857 thread->SetState(MicroThread::PENDING);
858 }
859
RemovePend(MicroThread * thread)860 void MtFrame::RemovePend(MicroThread* thread)
861 {
862 ASSERT(thread->HasFlag(MicroThread::PEND_LIST));
863 thread->UnsetFlag(MicroThread::PEND_LIST);
864 TAILQ_REMOVE(&_pend_list, thread, _entry);
865 }
866
/**
 * @brief Block the active thread until NotifyThread() wakes it or the timeout elapses.
 * @param timeout delay added to the cached clock to form the wake-up time.
 */
void MtFrame::WaitNotify(utime64_t timeout)
{
    MicroThread* thread = GetActiveThread();

    thread->SetWakeupTime(timeout + this->GetLastClock());
    // Queue as an I/O waiter (which also arms the timeout), then yield.
    this->InsertIoWait(thread);
    thread->SwitchContext();
}
875
/**
 * @brief Wake a thread that is parked on the I/O wait list.
 *
 * Threads not flagged IO_LIST (and NULL) are ignored. When called from the
 * daemon thread, control switches to the target immediately; otherwise the
 * target is appended to the run list.
 *
 * @param thread thread to wake; may be NULL.
 */
void MtFrame::NotifyThread(MicroThread* thread)
{
    if(thread == NULL){
        return;
    }
    MicroThread* cur_thread = GetActiveThread();
    if (thread->HasFlag(MicroThread::IO_LIST))
    {
        this->RemoveIoWait(thread);
        if(cur_thread == this->DaemonThread()){
            // NOTE(review): the original comment here was mis-encoded; it appears to
            // say that without switching directly the target thread would wait until
            // its timeout — TODO confirm against the upstream source.
            if(cur_thread->SaveContext() == 0){
                this->SetActiveThread(thread);
                thread->SetState(MicroThread::RUNNING);
                thread->RestoreContext();
            }
        }else{
            this->InsertRunable(thread);
        }
    }
}
897
/**
 * @brief Yield from the active thread straight to the daemon thread.
 *
 * The active thread is put back on the run list so it resumes later; nothing
 * happens when the daemon thread is already active.
 */
void MtFrame::SwapDaemonThread()
{
    MicroThread* thread = GetActiveThread();
    MicroThread* daemon_thread = this->DaemonThread();
    if(thread != daemon_thread){
        if(thread->SaveContext() == 0){
            // Context just captured: requeue ourselves and jump to the daemon.
            this->InsertRunable(thread);
            this->SetActiveThread(daemon_thread);
            daemon_thread->SetState(MicroThread::RUNNING);
            daemon_thread->RestoreContext();
        }
    }
}
911
/**
 * @brief Block the active thread until one of its fds reports an event or the
 *        timeout elapses.
 *
 * The thread's fd set is rebuilt from @p fdlist and/or @p fd, registered with
 * kqueue, and the thread yields as an I/O waiter. After it resumes, the fds are
 * unregistered and the number of fds with received events is counted.
 *
 * @param fdlist  optional list of fd objects to wait on.
 * @param fd      optional single fd object to wait on.
 * @param timeout delay added to the cached clock to form the wake-up time.
 * @return true when at least one fd received events; false when there is no
 *         active thread, registration failed, or nothing was received (errno is
 *         set to ETIME in that last case).
 */
bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout)
{
    MicroThread* thread = GetActiveThread();
    if (NULL == thread)
    {
        MTLOG_ERROR("active thread null, epoll schedule failed");
        return false;
    }

    // Start from an empty fd set for this wait.
    thread->ClearAllFd();
    if (fdlist)
    {
        thread->AddFdList(fdlist);
    }
    if (fd)
    {
        thread->AddFd(fd);
    }

    thread->SetWakeupTime(timeout + this->GetLastClock());
    if (!this->KqueueAdd(thread->GetFdSet()))
    {
        MTLOG_ERROR("epoll add failed, errno: %d", errno);
        return false;
    }
    this->InsertIoWait(thread);
    // Yield; execution continues below once this thread is resumed.
    thread->SwitchContext();

    int rcvnum = 0;
    KqObjList& rcvfds = thread->GetFdSet();
    KqueuerObj* fdata = NULL;
    TAILQ_FOREACH(fdata, &rcvfds, _entry)
    {
        if (fdata->GetRcvEvents() != 0)
        {
            rcvnum++;
        }
    }
    this->KqueueDel(rcvfds);

    // Resumed without any event: report it as a timeout.
    if (rcvnum == 0)
    {
        errno = ETIME;
        return false;
    }

    return true;
}
960
recvfrom(int fd,void * buf,int len,int flags,struct sockaddr * from,socklen_t * fromlen,int timeout)961 int MtFrame::recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
962 {
963 MtFrame* mtframe = MtFrame::Instance();
964 utime64_t start = mtframe->GetLastClock();
965 MicroThread* thread = mtframe->GetActiveThread();
966 utime64_t now = 0;
967
968 if(fd<0 || !buf || len<1)
969 {
970 errno = EINVAL;
971 MTLOG_ERROR("recvfrom failed, errno: %d (%m)", errno);
972 return -10;
973 }
974
975 if (timeout <= -1)
976 {
977 timeout = 0x7fffffff;
978 }
979
980 while (true)
981 {
982 now = mtframe->GetLastClock();
983 if ((int)(now - start) > timeout)
984 {
985 errno = ETIME;
986 return -1;
987 }
988
989 KqueuerObj epfd;
990 epfd.SetOsfd(fd);
991 epfd.EnableInput();
992 epfd.SetOwnerThread(thread);
993 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
994 {
995 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
996 return -2;
997 }
998
999 mt_hook_syscall(recvfrom);
1000 int n = ff_hook_recvfrom(fd, buf, len, flags, from, fromlen);
1001 if (n < 0)
1002 {
1003 if (errno == EINTR) {
1004 continue;
1005 }
1006
1007 if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1008 {
1009 MTLOG_ERROR("recvfrom failed, errno: %d", errno);
1010 return -3;
1011 }
1012 }
1013 else
1014 {
1015 return n;
1016 }
1017 }
1018
1019 }
1020
sendto(int fd,const void * msg,int len,int flags,const struct sockaddr * to,int tolen,int timeout)1021 int MtFrame::sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
1022 {
1023 MtFrame* mtframe = MtFrame::Instance();
1024 utime64_t start = mtframe->GetLastClock();
1025 MicroThread* thread = mtframe->GetActiveThread();
1026 utime64_t now = 0;
1027
1028 if(fd<0 || !msg || len<1)
1029 {
1030 errno = EINVAL;
1031 MTLOG_ERROR("sendto failed, errno: %d (%m)", errno);
1032 return -10;
1033 }
1034
1035 int n = 0;
1036 mt_hook_syscall(sendto);
1037 while ((n = ff_hook_sendto(fd, msg, len, flags, to, tolen)) < 0)
1038 {
1039 now = mtframe->GetLastClock();
1040 if ((int)(now - start) > timeout)
1041 {
1042 errno = ETIME;
1043 return -1;
1044 }
1045
1046 if (errno == EINTR) {
1047 continue;
1048 }
1049
1050 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1051 MTLOG_ERROR("sendto failed, errno: %d", errno);
1052 return -2;
1053 }
1054
1055 KqueuerObj epfd;
1056 epfd.SetOsfd(fd);
1057 epfd.EnableOutput();
1058 epfd.SetOwnerThread(thread);
1059 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1060 return -3;
1061 }
1062 }
1063
1064 return n;
1065 }
1066
connect(int fd,const struct sockaddr * addr,int addrlen,int timeout)1067 int MtFrame::connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
1068 {
1069 MtFrame* mtframe = MtFrame::Instance();
1070 utime64_t start = mtframe->GetLastClock();
1071 MicroThread* thread = mtframe->GetActiveThread();
1072 utime64_t now = 0;
1073
1074 if(fd<0 || !addr || addrlen<1)
1075 {
1076 errno = EINVAL;
1077 MTLOG_ERROR("connect failed, errno: %d (%m)", errno);
1078 return -10;
1079 }
1080
1081 int n = 0;
1082 mt_hook_syscall(connect);
1083 while ((n = ff_hook_connect(fd, addr, addrlen)) < 0)
1084 {
1085 now = mtframe->GetLastClock();
1086 if ((int)(now - start) > timeout)
1087 {
1088 errno = ETIME;
1089 return -1;
1090 }
1091
1092 if (errno == EISCONN)
1093 {
1094 return 0;
1095 }
1096
1097 if (errno == EINTR) {
1098 continue;
1099 }
1100
1101 if (errno != EINPROGRESS) {
1102 MTLOG_ERROR("connect failed, errno: %d", errno);
1103 return -2;
1104 }
1105
1106 KqueuerObj epfd;
1107 epfd.SetOsfd(fd);
1108 epfd.EnableOutput();
1109 epfd.SetOwnerThread(thread);
1110 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1111 return -3;
1112 }
1113 }
1114
1115 return n;
1116 }
1117
accept(int fd,struct sockaddr * addr,socklen_t * addrlen,int timeout)1118 int MtFrame::accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
1119 {
1120 MtFrame* mtframe = MtFrame::Instance();
1121 utime64_t start = mtframe->GetLastClock();
1122 MicroThread* thread = mtframe->GetActiveThread();
1123 utime64_t now = 0;
1124
1125 if(fd<0)
1126 {
1127 errno = EINVAL;
1128 MTLOG_ERROR("accept failed, errno: %d (%m)", errno);
1129 return -10;
1130 }
1131
1132 int acceptfd = 0;
1133 mt_hook_syscall(accept);
1134 while ((acceptfd = ff_hook_accept(fd, addr, addrlen)) < 0)
1135 {
1136 now = mtframe->GetLastClock();
1137 if ((int)(now - start) > timeout)
1138 {
1139 errno = ETIME;
1140 return -1;
1141 }
1142
1143 if (errno == EINTR) {
1144 continue;
1145 }
1146
1147 if (!((errno == EAGAIN) || (errno == EWOULDBLOCK))) {
1148 MTLOG_ERROR("accept failed, errno: %d", errno);
1149 return -2;
1150 }
1151
1152 KqueuerObj epfd;
1153 epfd.SetOsfd(fd);
1154 epfd.EnableInput();
1155 epfd.SetOwnerThread(thread);
1156 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1157 return -3;
1158 }
1159 }
1160
1161 return acceptfd;
1162 }
1163
read(int fd,void * buf,size_t nbyte,int timeout)1164 ssize_t MtFrame::read(int fd, void *buf, size_t nbyte, int timeout)
1165 {
1166 MtFrame* mtframe = MtFrame::Instance();
1167 utime64_t start = mtframe->GetLastClock();
1168 MicroThread* thread = mtframe->GetActiveThread();
1169 utime64_t now = 0;
1170
1171 if(fd<0 || !buf || nbyte<1)
1172 {
1173 errno = EINVAL;
1174 MTLOG_ERROR("read failed, errno: %d (%m)", errno);
1175 return -10;
1176 }
1177
1178 ssize_t n = 0;
1179 mt_hook_syscall(read);
1180 while ((n = ff_hook_read(fd, buf, nbyte)) < 0)
1181 {
1182 now = mtframe->GetLastClock();
1183 if ((int)(now - start) > timeout)
1184 {
1185 errno = ETIME;
1186 return -1;
1187 }
1188
1189 if (errno == EINTR) {
1190 continue;
1191 }
1192
1193 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1194 MTLOG_ERROR("read failed, errno: %d", errno);
1195 return -2;
1196 }
1197
1198 KqueuerObj epfd;
1199 epfd.SetOsfd(fd);
1200 epfd.EnableInput();
1201 epfd.SetOwnerThread(thread);
1202 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1203 return -3;
1204 }
1205 }
1206
1207 return n;
1208 }
1209
write(int fd,const void * buf,size_t nbyte,int timeout)1210 ssize_t MtFrame::write(int fd, const void *buf, size_t nbyte, int timeout)
1211 {
1212 MtFrame* mtframe = MtFrame::Instance();
1213 utime64_t start = mtframe->GetLastClock();
1214 MicroThread* thread = mtframe->GetActiveThread();
1215 utime64_t now = 0;
1216
1217 if(fd<0 || !buf || nbyte<1)
1218 {
1219 errno = EINVAL;
1220 MTLOG_ERROR("write failed, errno: %d (%m)", errno);
1221 return -10;
1222 }
1223
1224 ssize_t n = 0;
1225 size_t send_len = 0;
1226 while (send_len < nbyte)
1227 {
1228 now = mtframe->GetLastClock();
1229 if ((int)(now - start) > timeout)
1230 {
1231 errno = ETIME;
1232 return -1;
1233 }
1234
1235 mt_hook_syscall(write);
1236 n = ff_hook_write(fd, (char*)buf + send_len, nbyte - send_len);
1237 if (n < 0)
1238 {
1239 if (errno == EINTR) {
1240 continue;
1241 }
1242
1243 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1244 MTLOG_ERROR("write failed, errno: %d", errno);
1245 return -2;
1246 }
1247 }
1248 else
1249 {
1250 send_len += n;
1251 if (send_len >= nbyte) {
1252 return nbyte;
1253 }
1254 }
1255
1256 KqueuerObj epfd;
1257 epfd.SetOsfd(fd);
1258 epfd.EnableOutput();
1259 epfd.SetOwnerThread(thread);
1260 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1261 return -3;
1262 }
1263 }
1264
1265 return nbyte;
1266 }
1267
recv(int fd,void * buf,int len,int flags,int timeout)1268 int MtFrame::recv(int fd, void *buf, int len, int flags, int timeout)
1269 {
1270 MtFrame* mtframe = MtFrame::Instance();
1271 utime64_t start = mtframe->GetLastClock();
1272 MicroThread* thread = mtframe->GetActiveThread();
1273 utime64_t now = 0;
1274
1275 if(fd<0 || !buf || len<1)
1276 {
1277 errno = EINVAL;
1278 MTLOG_ERROR("recv failed, errno: %d (%m)", errno);
1279 return -10;
1280 }
1281
1282 if (timeout <= -1)
1283 {
1284 timeout = 0x7fffffff;
1285 }
1286
1287 while (true)
1288 {
1289 now = mtframe->GetLastClock();
1290 if ((int)(now - start) > timeout)
1291 {
1292 errno = ETIME;
1293 return -1;
1294 }
1295
1296 KqueuerObj epfd;
1297 epfd.SetOsfd(fd);
1298 epfd.EnableInput();
1299 epfd.SetOwnerThread(thread);
1300 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1301 {
1302 MTLOG_DEBUG("epoll schedule failed, errno: %d", errno);
1303 return -2;
1304 }
1305
1306 mt_hook_syscall(recv);
1307 int n = ff_hook_recv(fd, buf, len, flags);
1308 if (n < 0)
1309 {
1310 if (errno == EINTR) {
1311 continue;
1312 }
1313
1314 if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
1315 {
1316 MTLOG_ERROR("recv failed, errno: %d", errno);
1317 return -3;
1318 }
1319 }
1320 else
1321 {
1322 return n;
1323 }
1324 }
1325
1326 }
1327
send(int fd,const void * buf,size_t nbyte,int flags,int timeout)1328 ssize_t MtFrame::send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
1329 {
1330 MtFrame* mtframe = MtFrame::Instance();
1331 utime64_t start = mtframe->GetLastClock();
1332 MicroThread* thread = mtframe->GetActiveThread();
1333 utime64_t now = 0;
1334
1335 if(fd<0 || !buf || nbyte<1)
1336 {
1337 errno = EINVAL;
1338 MTLOG_ERROR("send failed, errno: %d (%m)", errno);
1339 return -10;
1340 }
1341
1342 ssize_t n = 0;
1343 size_t send_len = 0;
1344 while (send_len < nbyte)
1345 {
1346 now = mtframe->GetLastClock();
1347 if ((int)(now - start) > timeout)
1348 {
1349 errno = ETIME;
1350 return -1;
1351 }
1352
1353 mt_hook_syscall(send);
1354 n = ff_hook_send(fd, (char*)buf + send_len, nbyte - send_len, flags);
1355 if (n < 0)
1356 {
1357 if (errno == EINTR) {
1358 continue;
1359 }
1360
1361 if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
1362 MTLOG_ERROR("write failed, errno: %d", errno);
1363 return -2;
1364 }
1365 }
1366 else
1367 {
1368 send_len += n;
1369 if (send_len >= nbyte) {
1370 return nbyte;
1371 }
1372 }
1373
1374 KqueuerObj epfd;
1375 epfd.SetOsfd(fd);
1376 epfd.EnableOutput();
1377 epfd.SetOwnerThread(thread);
1378 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout)) {
1379 return -3;
1380 }
1381 }
1382
1383 return nbyte;
1384 }
1385
sleep(int ms)1386 void MtFrame::sleep(int ms)
1387 {
1388 MtFrame* frame = MtFrame::Instance();
1389 MicroThread* thread = frame->GetActiveThread();
1390 if (thread != NULL)
1391 {
1392 thread->sleep(ms);
1393 }
1394 }
1395
WaitEvents(int fd,int events,int timeout)1396 int MtFrame::WaitEvents(int fd, int events, int timeout)
1397 {
1398 MtFrame* mtframe = MtFrame::Instance();
1399 utime64_t start = mtframe->GetLastClock();
1400 MicroThread* thread = mtframe->GetActiveThread();
1401 utime64_t now = 0;
1402
1403 if (timeout <= -1)
1404 {
1405 timeout = 0x7fffffff;
1406 }
1407
1408 while (true)
1409 {
1410 now = mtframe->GetLastClock();
1411 if ((int)(now - start) > timeout)
1412 {
1413 errno = ETIME;
1414 return 0;
1415 }
1416
1417 KqueuerObj epfd;
1418 epfd.SetOsfd(fd);
1419 if (events & KQ_EVENT_READ)
1420 {
1421 epfd.EnableInput();
1422 }
1423 if (events & KQ_EVENT_WRITE)
1424 {
1425 epfd.EnableOutput();
1426 }
1427 epfd.SetOwnerThread(thread);
1428
1429 if (!mtframe->KqueueSchedule(NULL, &epfd, timeout))
1430 {
1431 MTLOG_TRACE("epoll schedule failed, errno: %d", errno);
1432 return 0;
1433 }
1434
1435 return epfd.GetRcvEvents();
1436 }
1437 }
1438