xref: /f-stack/app/micro_thread/mt_api.cpp (revision 552bc48c)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
 *  @filename mt_api.cpp
22  */
23 
#include <limits.h>

#include "kqueue_proxy.h"
#include "micro_thread.h"
#include "mt_connection.h"
#include "mt_api.h"
#include "ff_api.h"
#include "mt_sys_hook.h"
30 
31 namespace NS_MICRO_THREAD {
32 
33 int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout)
34 {
35     int ret = 0;
36     int rc  = 0;
37     int flags = 1;
38     struct sockaddr_in from_addr = {0};
39     int addr_len = sizeof(from_addr);
40 
41     if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf)
42     {
43         MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]",
44             dst, pkg, rcv_buf, len, buf_size);
45         return -10;
46     }
47 
48     int sock = socket(PF_INET, SOCK_DGRAM, 0);
49     if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0))
50     {
51         MT_ATTR_API(320842, 1);
52         MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno);
53         ret = -1;
54         goto EXIT_LABEL;
55     }
56 
57     rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout);
58     if (rc < 0)
59     {
60         MT_ATTR_API(320844, 1);
61         MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno);
62         ret = -2;
63         goto EXIT_LABEL;
64     }
65 
66     rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout);
67     if (rc < 0)
68     {
69         MT_ATTR_API(320845, 1);
70         MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno);
71         ret = -3;
72         goto EXIT_LABEL;
73     }
74     buf_size = rc;
75 
76 EXIT_LABEL:
77 
78     if (sock > 0)
79     {
80         close(sock);
81         sock = -1;
82     }
83 
84     return ret;
85 }
86 
87 int mt_tcp_create_sock(void)
88 {
89     int fd;
90     int flag;
91 
92     fd = ::socket(AF_INET, SOCK_STREAM, 0);
93     if (fd < 0)
94     {
95         MTLOG_ERROR("create tcp socket failed, error: %m");
96         return -1;
97     }
98 
99     flag = fcntl(fd, F_GETFL, 0);
100     if (flag == -1)
101     {
102         ::close(fd);
103         MTLOG_ERROR("get fd flags failed, error: %m");
104         return -2;
105     }
106 
107     if (flag & O_NONBLOCK)
108         return fd;
109 
110     if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1)
111     {
112         ::close(fd);
113         MTLOG_ERROR("set fd flags failed, error: %m");
114         return -3;
115     }
116 
117     return fd;
118 }
119 
120 static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock)
121 {
122     KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0);
123     if (NULL == ntfy_obj)
124     {
125         MTLOG_ERROR("get notify failed, logit");
126         return NULL;
127     }
128 
129     TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst));
130     if (NULL == conn)
131     {
132         MTLOG_ERROR("get connection failed, dst[%p]", dst);
133         NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj);
134         return NULL;
135     }
136     conn->SetNtfyObj(ntfy_obj);
137 
138     int osfd = conn->CreateSocket();
139     if (osfd < 0)
140     {
141         ConnectionMgr::Instance()->FreeConnection(conn, true);
142         MTLOG_ERROR("create socket failed, ret[%d]", osfd);
143         return NULL;
144     }
145 
146     sock = osfd;
147     return conn;
148 }
149 
150 static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func)
151 {
152     int recv_len = 0;
153     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
154     do
155     {
156         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
157         if (cost_time > (utime64_t)timeout)
158         {
159             errno = ETIME;
160             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
161             return -3;
162         }
163 
164         int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time));
165         if (rc < 0)
166         {
167             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
168             return -3;
169         }
170         else if (rc == 0)
171         {
172             len = recv_len;
173             MTLOG_ERROR("tcp socket[%d] remote close", sock);
174             return -7;
175         }
176         recv_len += rc;
177 
178         rc = func(rcv_buf, recv_len);
179         if (rc < 0)
180         {
181             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
182             return -5;
183         }
184         else if (rc == 0)
185         {
186             if (len == recv_len)
187             {
188                 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock);
189                 return -6;
190             }
191             continue;
192         }
193         else
194         {
195             if (rc > recv_len)
196             {
197                 continue;
198             }
199             else
200             {
201                 len = rc;
202                 break;
203             }
204         }
205     } while (true);
206 
207     return 0;
208 }
209 
210 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
211 {
212     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
213     {
214         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
215             dst, pkg, rcv_buf, func, len, buf_size);
216         return -10;
217     }
218 
219     int ret = 0, rc = 0;
220     int addr_len = sizeof(struct sockaddr_in);
221     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
222     utime64_t cost_time = 0;
223     int time_left = timeout;
224 
225     int sock = -1;
226     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
227     if ((conn == NULL) || (sock < 0))
228     {
229         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
230         ret = -1;
231         goto EXIT_LABEL;
232     }
233 
234     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
235     if (rc < 0)
236     {
237         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
238         ret = -4;
239         goto EXIT_LABEL;
240     }
241 
242     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
243     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
244     rc = MtFrame::send(sock, pkg, len, 0, time_left);
245     if (rc < 0)
246     {
247         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
248         ret = -2;
249         goto EXIT_LABEL;
250     }
251 
252     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
253     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
254     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
255     if (rc < 0)
256     {
257         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
258         ret = rc;
259         goto EXIT_LABEL;
260     }
261 
262     ret = 0;
263 
264 EXIT_LABEL:
265 
266     if (conn != NULL)
267     {
268         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
269     }
270 
271     return ret;
272 }
273 
274 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
275 {
276     int ret = 0, rc = 0;
277     int addr_len = sizeof(struct sockaddr_in);
278     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
279     utime64_t cost_time = 0;
280     int time_left = timeout;
281 
282     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
283     {
284         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
285             dst, pkg, rcv_buf, func, len, buf_size);
286         return -10;
287     }
288 
289     int sock;
290     sock = mt_tcp_create_sock();
291     if (sock < 0)
292     {
293         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
294         return -1;
295     }
296 
297     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
298     if (rc < 0)
299     {
300         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
301         ret = -4;
302         goto EXIT_LABEL;
303     }
304 
305     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
306     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
307     rc = MtFrame::send(sock, pkg, len, 0, time_left);
308     if (rc < 0)
309     {
310         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
311         ret = -2;
312         goto EXIT_LABEL;
313     }
314 
315     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
316     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
317     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
318     if (rc < 0)
319     {
320         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
321         ret = rc;
322         goto EXIT_LABEL;
323     }
324 
325     ret = 0;
326 
327 EXIT_LABEL:
328     if (sock >= 0)
329         ::close(sock);
330 
331     return ret;
332 }
333 
334 int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout)
335 {
336     if (!dst || !pkg || len<1)
337     {
338         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
339         return -10;
340     }
341 
342     int ret = 0, rc = 0;
343     int addr_len = sizeof(struct sockaddr_in);
344     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
345     utime64_t cost_time = 0;
346     int time_left = timeout;
347 
348     int sock = -1;
349     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
350     if ((conn == NULL) || (sock < 0))
351     {
352         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
353         ret = -1;
354         goto EXIT_LABEL;
355     }
356 
357     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
358     if (rc < 0)
359     {
360         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
361         ret = -4;
362         goto EXIT_LABEL;
363     }
364 
365     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
366     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
367     rc = MtFrame::send(sock, pkg, len, 0, time_left);
368     if (rc < 0)
369     {
370         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
371         ret = -2;
372         goto EXIT_LABEL;
373     }
374 
375     ret = 0;
376 
377 EXIT_LABEL:
378 
379     if (conn != NULL)
380     {
381         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
382     }
383 
384     return ret;
385 }
386 
387 int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout)
388 {
389     if (!dst || !pkg || len<1)
390     {
391         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
392         return -10;
393     }
394 
395     int ret = 0, rc = 0;
396     int addr_len = sizeof(struct sockaddr_in);
397     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
398     utime64_t cost_time = 0;
399     int time_left = timeout;
400 
401     int sock = -1;
402     sock = mt_tcp_create_sock();
403     if (sock < 0)
404     {
405         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
406         ret = -1;
407         goto EXIT_LABEL;
408     }
409 
410     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
411     if (rc < 0)
412     {
413         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
414         ret = -4;
415         goto EXIT_LABEL;
416     }
417 
418     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
419     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
420     rc = MtFrame::send(sock, pkg, len, 0, time_left);
421     if (rc < 0)
422     {
423         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
424         ret = -2;
425         goto EXIT_LABEL;
426     }
427 
428     ret = 0;
429 
430 EXIT_LABEL:
431 
432     if (sock >= 0)
433         ::close(sock);
434 
435     return ret;
436 }
437 
438 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type)
439 {
440     if(!dst || !pkg || len<1)
441     {
442         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
443                     dst, pkg, rcv_buf, func, len, buf_size,type);
444         return -10;
445     }
446 
447     switch (type)
448     {
449         case MT_TCP_LONG:
450         {
451             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
452         }
453 
454         case MT_TCP_LONG_SNDONLY:
455         {
456             return mt_tcpsend(dst, pkg, len, timeout);
457         }
458 
459         case MT_TCP_SHORT:
460         {
461             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
462         }
463 
464         case MT_TCP_SHORT_SNDONLY:
465         {
466             return mt_tcpsend_short(dst, pkg, len, timeout);
467         }
468 
469         default:
470         {
471             MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
472                         dst, pkg, rcv_buf, func, len, buf_size,type);
473             return -10;
474         }
475     }
476 
477     return 0;
478 }
479 
480 static void mt_task_process(void* arg)
481 {
482     int rc = 0;
483     IMtTask* task = (IMtTask*)arg;
484     if (!task)
485     {
486         MTLOG_ERROR("Invalid arg, error");
487         return;
488     }
489 
490     rc = task->Process();
491     if (rc != 0)
492     {
493         MTLOG_DEBUG("task process failed(%d), log", rc);
494     }
495 
496     task->SetResult(rc);
497 
498     return;
499 };
500 
501 int mt_exec_all_task(IMtTaskList& req_list)
502 {
503     MtFrame* mtframe    = MtFrame::Instance();
504     MicroThread* thread = mtframe->GetActiveThread();
505     IMtTask* task       = NULL;
506     MicroThread* sub    = NULL;
507     MicroThread* tmp    = NULL;
508     int rc              = -1;
509 
510     MicroThread::SubThreadList list;
511     TAILQ_INIT(&list);
512 
513     if (0 == req_list.size())
514     {
515         MTLOG_DEBUG("no task for execult");
516         return 0;
517     }
518 
519     for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it)
520     {
521         task = *it;
522         sub = MtFrame::CreateThread(mt_task_process, task, false);
523         if (NULL == sub)
524         {
525             MTLOG_ERROR("create sub thread failed");
526             goto EXIT_LABEL;
527         }
528 
529         sub->SetType(MicroThread::SUB_THREAD);
530         TAILQ_INSERT_TAIL(&list, sub, _sub_entry);
531     }
532 
533     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
534     {
535         TAILQ_REMOVE(&list, sub, _sub_entry);
536         thread->AddSubThread(sub);
537         mtframe->InsertRunable(sub);
538     }
539 
540     thread->Wait();
541     rc = 0;
542 
543 EXIT_LABEL:
544 
545     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
546     {
547         TAILQ_REMOVE(&list, sub, _sub_entry);
548         mtframe->FreeThread(sub);
549     }
550 
551     return rc;
552 
553 }
554 
555 void mt_set_msg_private(void *data)
556 {
557     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
558     if (msg_thread != NULL)
559         msg_thread->SetPrivate(data);
560 }
561 
562 void* mt_get_msg_private()
563 {
564     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
565     if (NULL == msg_thread)
566     {
567         return NULL;
568     }
569 
570     return msg_thread->GetPrivate();
571 }
572 
573 bool mt_init_frame(int argc, char * const argv[])
574 {
575     if (argc) {
576         ff_init(argc, argv);
577         ff_set_hook_flag();
578     }
579     memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab));
580     return MtFrame::Instance()->InitFrame();
581 }
582 
583 void mt_set_stack_size(unsigned int bytes)
584 {
585     ThreadPool::SetDefaultStackSize(bytes);
586 }
587 
588 int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
589 {
590     return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout);
591 }
592 
593 int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
594 {
595     return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout);
596 }
597 
598 int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
599 {
600     return MtFrame::connect(fd, addr, addrlen, timeout);
601 }
602 
603 int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
604 {
605     return MtFrame::accept(fd, addr, addrlen, timeout);
606 }
607 
608 ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout)
609 {
610     return MtFrame::read(fd, buf, nbyte, timeout);
611 }
612 
613 ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout)
614 {
615     return MtFrame::write(fd, buf, nbyte, timeout);
616 }
617 
618 ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout)
619 {
620     return MtFrame::recv(fd, buf, len, flags, timeout);
621 }
622 
623 ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
624 {
625     return MtFrame::send(fd, buf, nbyte, flags, timeout);
626 }
627 
628 void mt_sleep(int ms)
629 {
630     MtFrame::sleep(ms);
631 }
632 
633 unsigned long long mt_time_ms(void)
634 {
635     return MtFrame::Instance()->GetLastClock();
636 }
637 
638 int mt_wait_events(int fd, int events, int timeout)
639 {
640     return MtFrame::Instance()->WaitEvents(fd, events, timeout);
641 }
642 
643 void* mt_start_thread(void* entry, void* args)
644 {
645     return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true);
646 }
647 
648 void* mt_active_thread()
649 {
650     return MtFrame::Instance()->GetActiveThread();
651 }
652 
653 void mt_thread_wait(int ms)
654 {
655     MtFrame::Instance()->WaitNotify(ms);
656 }
657 
658 void mt_thread_wakeup_wait(void * thread_p)
659 {
660     MtFrame::Instance()->NotifyThread((MicroThread *) thread_p);
661 }
662 
663 void mt_swap_thread()
664 {
665     return MtFrame::Instance()->SwapDaemonThread();
666 }
667 
// Receive buffers are sized in whole multiples of this many bytes.
#define BUF_ALIGNMENT_SIZE 4096
// Round x up to the next multiple of BUF_ALIGNMENT_SIZE. The caller must make
// sure that x + BUF_ALIGNMENT_SIZE - 1 does not overflow the type of x.
#define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1))
// Capacity used if rounding ever produces zero.
#define BUF_DEFAULT_SIZE 4096

/**
 * Grow-only receive buffer that is freed on destruction unless ownership is
 * handed to the caller through reset().
 *
 * The buffer is managed with realloc()/free(), so a pointer handed out via
 * reset() must eventually be released with free().
 */
class ScopedBuf
{
public:
    /**
     * @param buf_keeper caller's pointer that receives the buffer on reset()
     * @param keep       true to hand the buffer to buf_keeper on reset(),
     *                   false to always free it in the destructor
     */
    ScopedBuf(void*& buf_keeper, bool keep)
    :buf_keeper_(buf_keeper),buf_(NULL),len_(0),len_watermark_(0),keep_(keep)
    {}

    /**
     * Ensure the buffer holds at least len bytes, rounded up to a multiple of
     * BUF_ALIGNMENT_SIZE. Existing contents are preserved.
     *
     * @param len requested capacity; 0 requests one alignment unit
     * @return 0 on success, -1 if len is smaller than the current capacity
     *         (shrinking is not supported) or negative, -2 if the request is
     *         too large or the allocation fails. On failure the buffer and
     *         its recorded capacity are left exactly as they were.
     */
    int Alloc(int len)
    {
        if(len<len_)
        {
            return -1;
        }

        if(len==0)
        {
            len = BUF_ALIGNMENT_SIZE;
        }
        // Only skip the allocation when a buffer of that size really exists.
        if(buf_!=NULL && len_==len)
        {
            return 0;
        }

        // Rounding up adds BUF_ALIGNMENT_SIZE-1; refuse requests for which
        // that addition would overflow int.
        if(len > INT_MAX-(BUF_ALIGNMENT_SIZE-1))
        {
            return -2;
        }

        int new_len = BUF_ALIGN_SIZE(len);
        if(new_len==0)
        {
            new_len = BUF_DEFAULT_SIZE;
        }

        char* tmp = (char*)realloc(buf_, new_len);
        if(tmp==NULL)
        {
            // realloc leaves the old block intact; keep len_ in sync with it.
            return -2;
        }

        // Commit the new size only after the allocation has succeeded.
        buf_ = tmp;
        len_ = new_len;
        len_watermark_ = new_len-BUF_ALIGNMENT_SIZE;
        return 0;
    }

    /**
     * When constructed with keep == true, publish the buffer through
     * buf_keeper and give up ownership so the destructor will not free it.
     * Does nothing when keep == false.
     */
    void reset()
    {
        if(keep_)
        {
            buf_keeper_ = (void*)buf_;
            buf_ = NULL;
        }
    }

    ~ScopedBuf()
    {
        if(buf_!=NULL)
        {
            free(buf_);
            buf_ = NULL;
        }
    }

public:
    void* &buf_keeper_;     // caller's pointer that receives the buffer on reset()
    char* buf_;             // owned buffer, NULL when none is held
    int   len_;             // capacity of buf_ in bytes
    int   len_watermark_;   // len_ minus one alignment unit
    bool  keep_;            // whether reset() hands the buffer to buf_keeper_

private:
    // Not copyable: a copy would make two objects free the same buffer.
    ScopedBuf(const ScopedBuf&);
    ScopedBuf& operator=(const ScopedBuf&);
};
737 
738 static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags,
739                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
740 {
741     int recv_len = 0;
742     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
743 
744     int rc = 0;
745     int ret = 0;
746     int pkg_len = 0;
747     bool msg_len_detected = false;
748 
749     ScopedBuf sbuf(rcv_buf, keep_rcv_buf);
750     ret = sbuf.Alloc(len);
751 
752     if(ret!=0)
753     {
754         MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
755         return -11;
756     }
757 
758     do
759     {
760         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
761         if (cost_time > (utime64_t)timeout)
762         {
763             errno = ETIME;
764             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
765             return -3;
766         }
767 
768         rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time));
769         if (rc < 0)
770         {
771             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
772             return -3;
773         }
774         else if (rc == 0)
775         {
776 
777             if(recv_len==0)
778             {
779                 MTLOG_ERROR("tcp socket[%d] remote close", sock);
780                 return -7;
781             }
782 
783             rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected);
784 
785             if(rc!=recv_len)
786             {
787                 MTLOG_ERROR("tcp socket[%d] remote close", sock);
788                 return -7;
789             }
790             len = recv_len;
791             break;
792         }
793         recv_len += rc;
794 
795         if((!msg_len_detected)||recv_len==pkg_len)
796         {
797             rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected);
798             if(msg_len_detected)
799             {
800                 pkg_len = rc;
801             }
802         }
803         else
804         {
805             rc = pkg_len;
806         }
807 
808         if (rc < 0)
809         {
810             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
811             return -5;
812         }
813         else if (rc == 0)
814         {
815             if(sbuf.len_ > recv_len)
816             {
817                 continue;
818             }
819 
820             ret = sbuf.Alloc(sbuf.len_<<1);
821 
822             if(ret!=0)
823             {
824                 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
825                 return -11;
826             }
827         }
828         else
829         {
830             if (rc > recv_len)
831             {
832                 if(sbuf.len_ > recv_len)
833                 {
834                     continue;
835                 }
836 
837                 ret = sbuf.Alloc(rc);
838 
839                 if(ret!=0)
840                 {
841                     MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
842                     return -11;
843                 }
844             }
845             else if(rc==recv_len)
846             {
847                 len = rc;
848                 break;
849             }
850             else
851             {
852                 MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock);
853                 return -5;
854             }
855         }
856     } while (true);
857 
858     sbuf.reset();
859 
860     return 0;
861 }
862 
863 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
864                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
865 {
866     if(!dst || !pkg || len<1)
867     {
868         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
869                     dst, pkg, len, check_func);
870         return -10;
871     }
872 
873 
874     int ret = 0, rc = 0;
875     int addr_len = sizeof(struct sockaddr_in);
876     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
877     utime64_t cost_time = 0;
878     int time_left = timeout;
879 
880     int sock = -1;
881     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
882     if ((conn == NULL) || (sock < 0))
883     {
884         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
885         ret = -1;
886         goto EXIT_LABEL;
887     }
888 
889     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
890     if (rc < 0)
891     {
892         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
893         ret = -4;
894         goto EXIT_LABEL;
895     }
896 
897     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
898     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
899     rc = MtFrame::send(sock, pkg, len, 0, time_left);
900     if (rc < 0)
901     {
902         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
903         ret = -2;
904         goto EXIT_LABEL;
905     }
906 
907     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
908     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
909 
910     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
911     if (rc < 0)
912     {
913         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
914         ret = rc;
915         goto EXIT_LABEL;
916     }
917 
918     ret = 0;
919 
920 EXIT_LABEL:
921     if (conn != NULL)
922     {
923         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
924     }
925 
926     return ret;
927 }
928 
929 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
930                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
931 {
932     int ret = 0, rc = 0;
933     int addr_len = sizeof(struct sockaddr_in);
934     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
935     utime64_t cost_time = 0;
936     int time_left = timeout;
937 
938     if(!dst || !pkg || len<1)
939     {
940         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
941                     dst, pkg, len, check_func);
942         return -10;
943     }
944 
945     int sock;
946     sock = mt_tcp_create_sock();
947     if (sock < 0)
948     {
949         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
950         return -1;
951     }
952 
953     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
954     if (rc < 0)
955     {
956         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
957         ret = -4;
958         goto EXIT_LABEL;
959     }
960 
961     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
962     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
963     rc = MtFrame::send(sock, pkg, len, 0, time_left);
964     if (rc < 0)
965     {
966         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
967         ret = -2;
968         goto EXIT_LABEL;
969     }
970 
971     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
972     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
973     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
974 
975     if (rc < 0)
976     {
977         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
978         ret = rc;
979         goto EXIT_LABEL;
980     }
981 
982     ret = 0;
983 
984 EXIT_LABEL:
985     if (sock >= 0)
986         ::close(sock);
987 
988     return ret;
989 }
990 
991 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size,
992                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx,
993                      MT_TCP_CONN_TYPE type, bool keep_rcv_buf)
994 {
995     if(!dst || !pkg || len<1)
996     {
997         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
998                     dst, pkg, len, check_func, msg_ctx, type);
999         return -10;
1000     }
1001 
1002     switch (type)
1003     {
1004         case MT_TCP_LONG:
1005         {
1006             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1007         }
1008 
1009         case MT_TCP_LONG_SNDONLY:
1010         {
1011             return mt_tcpsend(dst, pkg, len, timeout);
1012         }
1013 
1014         case MT_TCP_SHORT:
1015         {
1016             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1017         }
1018 
1019         case MT_TCP_SHORT_SNDONLY:
1020         {
1021             return mt_tcpsend_short(dst, pkg, len, timeout);
1022         }
1023 
1024         default:
1025         {
1026             MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1027                     dst, pkg, len, check_func, msg_ctx, type);
1028             return -10;
1029         }
1030     }
1031 
1032     return 0;
1033 }
1034 
1035 }
1036