1
2 /**
3 * Tencent is pleased to support the open source community by making MSEC available.
4 *
5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6 *
7 * Licensed under the GNU General Public License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License. You may
9 * obtain a copy of the License at
10 *
11 * https://opensource.org/licenses/GPL-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software distributed under the
14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15 * either express or implied. See the License for the specific language governing permissions
16 * and limitations under the License.
17 */
18
19
20 /**
21 * @filename mt_sys_call.cpp
22 */
23
24 #include "kqueue_proxy.h"
25 #include "micro_thread.h"
26 #include "mt_connection.h"
27 #include "mt_api.h"
28 #include "ff_api.h"
29 #include "mt_sys_hook.h"
30
31 namespace NS_MICRO_THREAD {
32
mt_udpsendrcv(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int & buf_size,int timeout)33 int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout)
34 {
35 int ret = 0;
36 int rc = 0;
37 int flags = 1;
38 struct sockaddr_in from_addr = {0};
39 int addr_len = sizeof(from_addr);
40
41 if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf)
42 {
43 MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]",
44 dst, pkg, rcv_buf, len, buf_size);
45 return -10;
46 }
47
48 int sock = socket(PF_INET, SOCK_DGRAM, 0);
49 if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0))
50 {
51 MT_ATTR_API(320842, 1);
52 MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno);
53 ret = -1;
54 goto EXIT_LABEL;
55 }
56
57 rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout);
58 if (rc < 0)
59 {
60 MT_ATTR_API(320844, 1);
61 MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno);
62 ret = -2;
63 goto EXIT_LABEL;
64 }
65
66 rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout);
67 if (rc < 0)
68 {
69 MT_ATTR_API(320845, 1);
70 MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno);
71 ret = -3;
72 goto EXIT_LABEL;
73 }
74 buf_size = rc;
75
76 EXIT_LABEL:
77
78 if (sock > 0)
79 {
80 close(sock);
81 sock = -1;
82 }
83
84 return ret;
85 }
86
mt_tcp_create_sock(void)87 int mt_tcp_create_sock(void)
88 {
89 int fd;
90 int flag;
91
92 fd = ::socket(AF_INET, SOCK_STREAM, 0);
93 if (fd < 0)
94 {
95 MTLOG_ERROR("create tcp socket failed, error: %m");
96 return -1;
97 }
98
99 flag = fcntl(fd, F_GETFL, 0);
100 if (flag == -1)
101 {
102 ::close(fd);
103 MTLOG_ERROR("get fd flags failed, error: %m");
104 return -2;
105 }
106
107 if (flag & O_NONBLOCK)
108 return fd;
109
110 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1)
111 {
112 ::close(fd);
113 MTLOG_ERROR("set fd flags failed, error: %m");
114 return -3;
115 }
116
117 return fd;
118 }
119
mt_tcp_get_keep_conn(struct sockaddr_in * dst,int & sock)120 static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock)
121 {
122 KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0);
123 if (NULL == ntfy_obj)
124 {
125 MTLOG_ERROR("get notify failed, logit");
126 return NULL;
127 }
128
129 TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst));
130 if (NULL == conn)
131 {
132 MTLOG_ERROR("get connection failed, dst[%p]", dst);
133 NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj);
134 return NULL;
135 }
136 conn->SetNtfyObj(ntfy_obj);
137
138 int osfd = conn->CreateSocket();
139 if (osfd < 0)
140 {
141 ConnectionMgr::Instance()->FreeConnection(conn, true);
142 MTLOG_ERROR("create socket failed, ret[%d]", osfd);
143 return NULL;
144 }
145
146 sock = osfd;
147 return conn;
148 }
149
mt_tcp_check_recv(int sock,char * rcv_buf,int & len,int flags,int timeout,MtFuncTcpMsgLen func)150 static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func)
151 {
152 int recv_len = 0;
153 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
154 do
155 {
156 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
157 if (cost_time > (utime64_t)timeout)
158 {
159 errno = ETIME;
160 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
161 return -3;
162 }
163
164 int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time));
165 if (rc < 0)
166 {
167 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
168 return -3;
169 }
170 else if (rc == 0)
171 {
172 len = recv_len;
173 MTLOG_ERROR("tcp socket[%d] remote close", sock);
174 return -7;
175 }
176 recv_len += rc;
177
178 rc = func(rcv_buf, recv_len);
179 if (rc < 0)
180 {
181 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
182 return -5;
183 }
184 else if (rc == 0)
185 {
186 if (len == recv_len)
187 {
188 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock);
189 return -6;
190 }
191 continue;
192 }
193 else
194 {
195 if (rc > recv_len)
196 {
197 continue;
198 }
199 else
200 {
201 len = rc;
202 break;
203 }
204 }
205 } while (true);
206
207 return 0;
208 }
209
mt_tcpsendrcv(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int & buf_size,int timeout,MtFuncTcpMsgLen func)210 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
211 {
212 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
213 {
214 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
215 dst, pkg, rcv_buf, func, len, buf_size);
216 return -10;
217 }
218
219 int ret = 0, rc = 0;
220 int addr_len = sizeof(struct sockaddr_in);
221 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
222 utime64_t cost_time = 0;
223 int time_left = timeout;
224
225 int sock = -1;
226 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
227 if ((conn == NULL) || (sock < 0))
228 {
229 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
230 ret = -1;
231 goto EXIT_LABEL;
232 }
233
234 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
235 if (rc < 0)
236 {
237 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
238 ret = -4;
239 goto EXIT_LABEL;
240 }
241
242 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
243 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
244 rc = MtFrame::send(sock, pkg, len, 0, time_left);
245 if (rc < 0)
246 {
247 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
248 ret = -2;
249 goto EXIT_LABEL;
250 }
251
252 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
253 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
254 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
255 if (rc < 0)
256 {
257 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
258 ret = rc;
259 goto EXIT_LABEL;
260 }
261
262 ret = 0;
263
264 EXIT_LABEL:
265
266 if (conn != NULL)
267 {
268 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
269 }
270
271 return ret;
272 }
273
mt_tcpsendrcv_short(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int & buf_size,int timeout,MtFuncTcpMsgLen func)274 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
275 {
276 int ret = 0, rc = 0;
277 int addr_len = sizeof(struct sockaddr_in);
278 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
279 utime64_t cost_time = 0;
280 int time_left = timeout;
281
282 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
283 {
284 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
285 dst, pkg, rcv_buf, func, len, buf_size);
286 return -10;
287 }
288
289 int sock;
290 sock = mt_tcp_create_sock();
291 if (sock < 0)
292 {
293 MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
294 return -1;
295 }
296
297 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
298 if (rc < 0)
299 {
300 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
301 ret = -4;
302 goto EXIT_LABEL;
303 }
304
305 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
306 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
307 rc = MtFrame::send(sock, pkg, len, 0, time_left);
308 if (rc < 0)
309 {
310 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
311 ret = -2;
312 goto EXIT_LABEL;
313 }
314
315 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
316 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
317 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
318 if (rc < 0)
319 {
320 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
321 ret = rc;
322 goto EXIT_LABEL;
323 }
324
325 ret = 0;
326
327 EXIT_LABEL:
328 if (sock >= 0)
329 ::close(sock);
330
331 return ret;
332 }
333
mt_tcpsend(struct sockaddr_in * dst,void * pkg,int len,int timeout)334 int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout)
335 {
336 if (!dst || !pkg || len<1)
337 {
338 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
339 return -10;
340 }
341
342 int ret = 0, rc = 0;
343 int addr_len = sizeof(struct sockaddr_in);
344 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
345 utime64_t cost_time = 0;
346 int time_left = timeout;
347
348 int sock = -1;
349 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
350 if ((conn == NULL) || (sock < 0))
351 {
352 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
353 ret = -1;
354 goto EXIT_LABEL;
355 }
356
357 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
358 if (rc < 0)
359 {
360 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
361 ret = -4;
362 goto EXIT_LABEL;
363 }
364
365 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
366 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
367 rc = MtFrame::send(sock, pkg, len, 0, time_left);
368 if (rc < 0)
369 {
370 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
371 ret = -2;
372 goto EXIT_LABEL;
373 }
374
375 ret = 0;
376
377 EXIT_LABEL:
378
379 if (conn != NULL)
380 {
381 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
382 }
383
384 return ret;
385 }
386
mt_tcpsend_short(struct sockaddr_in * dst,void * pkg,int len,int timeout)387 int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout)
388 {
389 if (!dst || !pkg || len<1)
390 {
391 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
392 return -10;
393 }
394
395 int ret = 0, rc = 0;
396 int addr_len = sizeof(struct sockaddr_in);
397 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
398 utime64_t cost_time = 0;
399 int time_left = timeout;
400
401 int sock = -1;
402 sock = mt_tcp_create_sock();
403 if (sock < 0)
404 {
405 MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
406 ret = -1;
407 goto EXIT_LABEL;
408 }
409
410 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
411 if (rc < 0)
412 {
413 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
414 ret = -4;
415 goto EXIT_LABEL;
416 }
417
418 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
419 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
420 rc = MtFrame::send(sock, pkg, len, 0, time_left);
421 if (rc < 0)
422 {
423 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
424 ret = -2;
425 goto EXIT_LABEL;
426 }
427
428 ret = 0;
429
430 EXIT_LABEL:
431
432 if (sock >= 0)
433 ::close(sock);
434
435 return ret;
436 }
437
mt_tcpsendrcv_ex(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int * buf_size,int timeout,MtFuncTcpMsgLen func,MT_TCP_CONN_TYPE type)438 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type)
439 {
440 if(!dst || !pkg || len<1)
441 {
442 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
443 dst, pkg, rcv_buf, func, len, buf_size,type);
444 return -10;
445 }
446
447 switch (type)
448 {
449 case MT_TCP_LONG:
450 {
451 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
452 }
453
454 case MT_TCP_LONG_SNDONLY:
455 {
456 return mt_tcpsend(dst, pkg, len, timeout);
457 }
458
459 case MT_TCP_SHORT:
460 {
461 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
462 }
463
464 case MT_TCP_SHORT_SNDONLY:
465 {
466 return mt_tcpsend_short(dst, pkg, len, timeout);
467 }
468
469 default:
470 {
471 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
472 dst, pkg, rcv_buf, func, len, buf_size,type);
473 return -10;
474 }
475 }
476
477 return 0;
478 }
479
mt_task_process(void * arg)480 static void mt_task_process(void* arg)
481 {
482 int rc = 0;
483 IMtTask* task = (IMtTask*)arg;
484 if (!task)
485 {
486 MTLOG_ERROR("Invalid arg, error");
487 return;
488 }
489
490 rc = task->Process();
491 if (rc != 0)
492 {
493 MTLOG_DEBUG("task process failed(%d), log", rc);
494 }
495
496 task->SetResult(rc);
497
498 return;
499 };
500
mt_exec_all_task(IMtTaskList & req_list)501 int mt_exec_all_task(IMtTaskList& req_list)
502 {
503 MtFrame* mtframe = MtFrame::Instance();
504 MicroThread* thread = mtframe->GetActiveThread();
505 IMtTask* task = NULL;
506 MicroThread* sub = NULL;
507 MicroThread* tmp = NULL;
508 int rc = -1;
509
510 MicroThread::SubThreadList list;
511 TAILQ_INIT(&list);
512
513 if (0 == req_list.size())
514 {
515 MTLOG_DEBUG("no task for execult");
516 return 0;
517 }
518
519 for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it)
520 {
521 task = *it;
522 sub = MtFrame::CreateThread(mt_task_process, task, false);
523 if (NULL == sub)
524 {
525 MTLOG_ERROR("create sub thread failed");
526 goto EXIT_LABEL;
527 }
528
529 sub->SetType(MicroThread::SUB_THREAD);
530 TAILQ_INSERT_TAIL(&list, sub, _sub_entry);
531 }
532
533 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
534 {
535 TAILQ_REMOVE(&list, sub, _sub_entry);
536 thread->AddSubThread(sub);
537 mtframe->InsertRunable(sub);
538 }
539
540 thread->Wait();
541 rc = 0;
542
543 EXIT_LABEL:
544
545 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
546 {
547 TAILQ_REMOVE(&list, sub, _sub_entry);
548 mtframe->FreeThread(sub);
549 }
550
551 return rc;
552
553 }
554
mt_set_msg_private(void * data)555 void mt_set_msg_private(void *data)
556 {
557 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
558 if (msg_thread != NULL)
559 msg_thread->SetPrivate(data);
560 }
561
mt_get_msg_private()562 void* mt_get_msg_private()
563 {
564 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
565 if (NULL == msg_thread)
566 {
567 return NULL;
568 }
569
570 return msg_thread->GetPrivate();
571 }
572
mt_init_frame(int argc,char * const argv[])573 bool mt_init_frame(int argc, char * const argv[])
574 {
575 if (argc) {
576 ff_init(argc, argv);
577 ff_set_hook_flag();
578 }
579 memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab));
580 return MtFrame::Instance()->InitFrame();
581 }
582
mt_set_stack_size(unsigned int bytes)583 void mt_set_stack_size(unsigned int bytes)
584 {
585 ThreadPool::SetDefaultStackSize(bytes);
586 }
587
mt_recvfrom(int fd,void * buf,int len,int flags,struct sockaddr * from,socklen_t * fromlen,int timeout)588 int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
589 {
590 return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout);
591 }
592
mt_sendto(int fd,const void * msg,int len,int flags,const struct sockaddr * to,int tolen,int timeout)593 int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
594 {
595 return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout);
596 }
597
mt_connect(int fd,const struct sockaddr * addr,int addrlen,int timeout)598 int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
599 {
600 return MtFrame::connect(fd, addr, addrlen, timeout);
601 }
602
mt_accept(int fd,struct sockaddr * addr,socklen_t * addrlen,int timeout)603 int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
604 {
605 return MtFrame::accept(fd, addr, addrlen, timeout);
606 }
607
mt_read(int fd,void * buf,size_t nbyte,int timeout)608 ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout)
609 {
610 return MtFrame::read(fd, buf, nbyte, timeout);
611 }
612
mt_write(int fd,const void * buf,size_t nbyte,int timeout)613 ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout)
614 {
615 return MtFrame::write(fd, buf, nbyte, timeout);
616 }
617
mt_recv(int fd,void * buf,int len,int flags,int timeout)618 ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout)
619 {
620 return MtFrame::recv(fd, buf, len, flags, timeout);
621 }
622
mt_send(int fd,const void * buf,size_t nbyte,int flags,int timeout)623 ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
624 {
625 return MtFrame::send(fd, buf, nbyte, flags, timeout);
626 }
627
mt_sleep(int ms)628 void mt_sleep(int ms)
629 {
630 MtFrame::sleep(ms);
631 }
632
mt_time_ms(void)633 unsigned long long mt_time_ms(void)
634 {
635 return MtFrame::Instance()->GetLastClock();
636 }
637
mt_wait_events(int fd,int events,int timeout)638 int mt_wait_events(int fd, int events, int timeout)
639 {
640 return MtFrame::Instance()->WaitEvents(fd, events, timeout);
641 }
642
mt_start_thread(void * entry,void * args)643 void* mt_start_thread(void* entry, void* args)
644 {
645 return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true);
646 }
647
mt_active_thread()648 void* mt_active_thread()
649 {
650 return MtFrame::Instance()->GetActiveThread();
651 }
652
mt_thread_wait(int ms)653 void mt_thread_wait(int ms)
654 {
655 MtFrame::Instance()->WaitNotify(ms);
656 }
657
mt_thread_wakeup_wait(void * thread_p)658 void mt_thread_wakeup_wait(void * thread_p)
659 {
660 MtFrame::Instance()->NotifyThread((MicroThread *) thread_p);
661 }
662
mt_swap_thread()663 void mt_swap_thread()
664 {
665 return MtFrame::Instance()->SwapDaemonThread();
666 }
667
#define BUF_ALIGNMENT_SIZE 4096
#define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1))
#define BUF_DEFAULT_SIZE 4096

/**
 * @brief Growable heap buffer that is freed on scope exit unless handed to the caller.
 *
 * The buffer is managed with realloc()/free(). Its size is always a multiple of
 * BUF_ALIGNMENT_SIZE. Calling reset() with keep == true moves the buffer into the
 * caller's pointer (buf_keeper_) and stops this object from freeing it.
 */
class ScopedBuf
{
public:
    /**
     * @param buf_keeper caller pointer that receives the buffer in reset()
     * @param keep       when false, reset() does nothing and the buffer is always freed
     */
    ScopedBuf(void*& buf_keeper, bool keep)
        : buf_keeper_(buf_keeper), buf_(NULL), len_(0), len_watermark_(0), keep_(keep)
    {}

    /**
     * @brief Grow the buffer so that it holds at least len bytes.
     *
     * A len of 0 requests BUF_ALIGNMENT_SIZE bytes. Requests smaller than the current
     * size are rejected; the buffer never shrinks. On any failure the existing buffer
     * and len_ are left untouched.
     *
     * @return 0 on success (or when len equals the current size),
     *         -1 if len is negative or smaller than the current size,
     *         -2 if the aligned size does not fit in an int or realloc() failed
     */
    int Alloc(int len)
    {
        if (len < 0 || len < len_)
        {
            return -1;
        }

        if (len == 0)
        {
            len = BUF_ALIGNMENT_SIZE;
        }
        if (len_ == len)
        {
            return 0;
        }

        // Round up in unsigned arithmetic so a request close to the int limit
        // cannot overflow a signed int.
        const unsigned int mask = BUF_ALIGNMENT_SIZE - 1u;
        const unsigned int aligned = ((unsigned int)len + mask) & ~mask;
        const unsigned int int_limit = ~0u >> 1;
        if (aligned > int_limit)
        {
            return -2;
        }

        char* tmp = (char*)realloc(buf_, aligned);
        if (tmp == NULL)
        {
            return -2;
        }

        // Commit the new size only after the allocation succeeded.
        buf_ = tmp;
        len_ = (int)aligned;
        len_watermark_ = len_ - BUF_ALIGNMENT_SIZE;
        return 0;
    }

    /**
     * @brief Hand the buffer over to buf_keeper_ when keep_ is set.
     *
     * After the hand-over this object is empty (buf_ == NULL, len_ == 0), so the
     * destructor will not free the buffer. Whatever buf_keeper_ pointed to before is
     * overwritten, not freed.
     */
    void reset()
    {
        if (keep_)
        {
            buf_keeper_ = (void*)buf_;
            buf_ = NULL;
            len_ = 0;
            len_watermark_ = 0;
        }
    }

    ~ScopedBuf()
    {
        if (buf_ != NULL)
        {
            free(buf_);
            buf_ = NULL;
        }
    }

private:
    // Copying would make two objects free the same buffer; declared but not defined.
    ScopedBuf(const ScopedBuf&);
    ScopedBuf& operator=(const ScopedBuf&);

public:
    void* &buf_keeper_;     // caller pointer that receives the buffer in reset()
    char* buf_;             // owned buffer, NULL when empty
    int len_;               // current buffer size in bytes
    int len_watermark_;     // len_ minus one alignment unit; not read inside this class
    bool keep_;             // whether reset() hands the buffer to buf_keeper_

};
737
mt_tcp_check_recv(int sock,void * & rcv_buf,int & len,int flags,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,bool keep_rcv_buf)738 static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags,
739 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
740 {
741 int recv_len = 0;
742 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
743
744 int rc = 0;
745 int ret = 0;
746 int pkg_len = 0;
747 bool msg_len_detected = false;
748
749 ScopedBuf sbuf(rcv_buf, keep_rcv_buf);
750 ret = sbuf.Alloc(len);
751
752 if(ret!=0)
753 {
754 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
755 return -11;
756 }
757
758 do
759 {
760 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
761 if (cost_time > (utime64_t)timeout)
762 {
763 errno = ETIME;
764 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
765 return -3;
766 }
767
768 rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time));
769 if (rc < 0)
770 {
771 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
772 return -3;
773 }
774 else if (rc == 0)
775 {
776
777 if(recv_len==0)
778 {
779 MTLOG_ERROR("tcp socket[%d] remote close", sock);
780 return -7;
781 }
782
783 rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected);
784
785 if(rc!=recv_len)
786 {
787 MTLOG_ERROR("tcp socket[%d] remote close", sock);
788 return -7;
789 }
790 len = recv_len;
791 break;
792 }
793 recv_len += rc;
794
795 if((!msg_len_detected)||recv_len==pkg_len)
796 {
797 rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected);
798 if(msg_len_detected)
799 {
800 pkg_len = rc;
801 }
802 }
803 else
804 {
805 rc = pkg_len;
806 }
807
808 if (rc < 0)
809 {
810 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
811 return -5;
812 }
813 else if (rc == 0)
814 {
815 if(sbuf.len_ > recv_len)
816 {
817 continue;
818 }
819
820 ret = sbuf.Alloc(sbuf.len_<<1);
821
822 if(ret!=0)
823 {
824 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
825 return -11;
826 }
827 }
828 else
829 {
830 if (rc > recv_len)
831 {
832 if(sbuf.len_ > recv_len)
833 {
834 continue;
835 }
836
837 ret = sbuf.Alloc(rc);
838
839 if(ret!=0)
840 {
841 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
842 return -11;
843 }
844 }
845 else if(rc==recv_len)
846 {
847 len = rc;
848 break;
849 }
850 else
851 {
852 MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock);
853 return -5;
854 }
855 }
856 } while (true);
857
858 sbuf.reset();
859
860 return 0;
861 }
862
mt_tcpsendrcv(struct sockaddr_in * dst,void * pkg,int len,void * & rcv_buf,int & recv_pkg_size,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,bool keep_rcv_buf)863 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
864 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
865 {
866 if(!dst || !pkg || len<1)
867 {
868 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
869 dst, pkg, len, check_func);
870 return -10;
871 }
872
873
874 int ret = 0, rc = 0;
875 int addr_len = sizeof(struct sockaddr_in);
876 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
877 utime64_t cost_time = 0;
878 int time_left = timeout;
879
880 int sock = -1;
881 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
882 if ((conn == NULL) || (sock < 0))
883 {
884 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
885 ret = -1;
886 goto EXIT_LABEL;
887 }
888
889 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
890 if (rc < 0)
891 {
892 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
893 ret = -4;
894 goto EXIT_LABEL;
895 }
896
897 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
898 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
899 rc = MtFrame::send(sock, pkg, len, 0, time_left);
900 if (rc < 0)
901 {
902 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
903 ret = -2;
904 goto EXIT_LABEL;
905 }
906
907 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
908 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
909
910 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
911 if (rc < 0)
912 {
913 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
914 ret = rc;
915 goto EXIT_LABEL;
916 }
917
918 ret = 0;
919
920 EXIT_LABEL:
921 if (conn != NULL)
922 {
923 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
924 }
925
926 return ret;
927 }
928
mt_tcpsendrcv_short(struct sockaddr_in * dst,void * pkg,int len,void * & rcv_buf,int & recv_pkg_size,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,bool keep_rcv_buf)929 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
930 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
931 {
932 int ret = 0, rc = 0;
933 int addr_len = sizeof(struct sockaddr_in);
934 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
935 utime64_t cost_time = 0;
936 int time_left = timeout;
937
938 if(!dst || !pkg || len<1)
939 {
940 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
941 dst, pkg, len, check_func);
942 return -10;
943 }
944
945 int sock;
946 sock = mt_tcp_create_sock();
947 if (sock < 0)
948 {
949 MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
950 return -1;
951 }
952
953 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
954 if (rc < 0)
955 {
956 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
957 ret = -4;
958 goto EXIT_LABEL;
959 }
960
961 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
962 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
963 rc = MtFrame::send(sock, pkg, len, 0, time_left);
964 if (rc < 0)
965 {
966 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
967 ret = -2;
968 goto EXIT_LABEL;
969 }
970
971 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
972 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
973 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
974
975 if (rc < 0)
976 {
977 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
978 ret = rc;
979 goto EXIT_LABEL;
980 }
981
982 ret = 0;
983
984 EXIT_LABEL:
985 if (sock >= 0)
986 ::close(sock);
987
988 return ret;
989 }
990
mt_tcpsendrcv_ex(struct sockaddr_in * dst,void * pkg,int len,void * & rcv_buf,int & rcv_pkg_size,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,MT_TCP_CONN_TYPE type,bool keep_rcv_buf)991 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size,
992 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx,
993 MT_TCP_CONN_TYPE type, bool keep_rcv_buf)
994 {
995 if(!dst || !pkg || len<1)
996 {
997 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
998 dst, pkg, len, check_func, msg_ctx, type);
999 return -10;
1000 }
1001
1002 switch (type)
1003 {
1004 case MT_TCP_LONG:
1005 {
1006 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1007 }
1008
1009 case MT_TCP_LONG_SNDONLY:
1010 {
1011 return mt_tcpsend(dst, pkg, len, timeout);
1012 }
1013
1014 case MT_TCP_SHORT:
1015 {
1016 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1017 }
1018
1019 case MT_TCP_SHORT_SNDONLY:
1020 {
1021 return mt_tcpsend_short(dst, pkg, len, timeout);
1022 }
1023
1024 default:
1025 {
1026 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1027 dst, pkg, len, check_func, msg_ctx, type);
1028 return -10;
1029 }
1030 }
1031
1032 return 0;
1033 }
1034
1035 }
1036