xref: /f-stack/app/micro_thread/mt_api.cpp (revision 3b2bd0f6)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename mt_sys_call.cpp
22  *  @info  ΢�̷߳�װϵͳapi, ͬ������΢�߳�API��ʵ���첽����
23  */
24 
25 #include "kqueue_proxy.h"
26 #include "micro_thread.h"
27 #include "mt_connection.h"
28 #include "mt_api.h"
29 #include "ff_api.h"
30 #include "mt_sys_hook.h"
31 
32 namespace NS_MICRO_THREAD {
33 
34 /**
35  * @brief ��������˿ڵ�socket�շ��ӿ�, ��socket������������, ҵ������֤������
36  * @param dst -�����͵�Ŀ�ĵ�ַ
37  * @param pkg -�������װ�İ���
38  * @param len -�������װ�İ��峤��
39  * @param rcv_buf -����Ӧ�����buff
40  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
41  * @param timeout -��ʱʱ��, ��λms
42  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, �ɴ�ӡerrno, -10 ������Ч
43  */
44 int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout)
45 {
46     int ret = 0;
47     int rc  = 0;
48     int flags = 1;
49     struct sockaddr_in from_addr = {0};
50     int addr_len = sizeof(from_addr);
51 
52     if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf)
53     {
54         MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]",
55             dst, pkg, rcv_buf, len, buf_size);
56         return -10;
57     }
58 
59     int sock = socket(PF_INET, SOCK_DGRAM, 0);
60     if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0))
61     {
62         MT_ATTR_API(320842, 1); // socketʧ��
63         MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno);
64         ret = -1;
65         goto EXIT_LABEL;
66     }
67 
68     rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout);
69     if (rc < 0)
70     {
71         MT_ATTR_API(320844, 1); // ����ʧ��
72         MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno);
73         ret = -2;
74         goto EXIT_LABEL;
75     }
76 
77     rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout);
78     if (rc < 0)
79     {
80         MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ�
81         MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno);
82         ret = -3;
83         goto EXIT_LABEL;
84     }
85     buf_size = rc;
86 
87 EXIT_LABEL:
88 
89     if (sock > 0)
90     {
91         close(sock);
92         sock = -1;
93     }
94 
95     return ret;
96 }
97 
98 /**
99  * @brief  ����TCP�׽��֣�������Ϊ������
100  * @return >=0 �ɹ�, <0 ʧ��
101  */
102 int mt_tcp_create_sock(void)
103 {
104     int fd;
105     int flag;
106 
107     // ����socket
108     fd = ::socket(AF_INET, SOCK_STREAM, 0);
109     if (fd < 0)
110     {
111         MTLOG_ERROR("create tcp socket failed, error: %m");
112         return -1;
113     }
114 
115     // ����socket������
116     flag = fcntl(fd, F_GETFL, 0);
117     if (flag == -1)
118     {
119         ::close(fd);
120         MTLOG_ERROR("get fd flags failed, error: %m");
121         return -2;
122     }
123 
124     if (flag & O_NONBLOCK)
125         return fd;
126 
127     if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1)
128     {
129         ::close(fd);
130         MTLOG_ERROR("set fd flags failed, error: %m");
131         return -3;
132     }
133 
134     return fd;
135 }
136 
137 /**
138  * @brief TCP��ȡ������֪ͨ������socket
139  */
140 static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock)
141 {
142     // 1. ��ȡ�߳�֪ͨע�����
143     KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0);
144     if (NULL == ntfy_obj)
145     {
146         MTLOG_ERROR("get notify failed, logit");
147         return NULL;
148     }
149 
150     // 2. ��ȡ���Ӷ���, ����֪ͨ��Ϣ
151     TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst));
152     if (NULL == conn)
153     {
154         MTLOG_ERROR("get connection failed, dst[%p]", dst);
155         NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj);
156         return NULL;
157     }
158     conn->SetNtfyObj(ntfy_obj);
159 
160     // 3. ��������socket���
161     int osfd = conn->CreateSocket();
162     if (osfd < 0)
163     {
164         ConnectionMgr::Instance()->FreeConnection(conn, true);
165         MTLOG_ERROR("create socket failed, ret[%d]", osfd);
166         return NULL;
167     }
168 
169     // 4. �ɹ���������
170     sock = osfd;
171     return conn;
172 }
173 
174 /**
175  * @brief TCPѭ������, ֱ������OK��ʱ
176  *       [ע��] �����߲�Ҫ�����޸ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ]
177  */
178 static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func)
179 {
180     int recv_len = 0;
181     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
182     do
183     {
184         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
185         if (cost_time > (utime64_t)timeout)
186         {
187             errno = ETIME;
188             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
189             return -3;
190         }
191 
192         int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time));
193         if (rc < 0)
194         {
195             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
196             return -3;
197         }
198 		else if (rc == 0)
199         {
200         	len = recv_len;
201             MTLOG_ERROR("tcp socket[%d] remote close", sock);
202             return -7;
203         }
204         recv_len += rc;
205 
206         /* ��鱨�������� */
207         rc = func(rcv_buf, recv_len);
208         if (rc < 0)
209         {
210             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
211             return -5;
212         }
213         else if (rc == 0) // ����δ������
214         {
215             if (len == recv_len) // û�ռ��ٽ�����, ����
216             {
217                 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock);
218                 return -6;
219             }
220             continue;
221         }
222         else    // �ɹ����㱨�ij���
223         {
224             if (rc > recv_len) // ���Ļ�δ��ȫ
225             {
226                 continue;
227             }
228             else
229             {
230                 len = rc;
231                 break;
232             }
233         }
234     } while (true);
235 
236     return 0;
237 }
238 
239 /**
240  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
241  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
242  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
243  * @param dst -�����͵�Ŀ�ĵ�ַ
244  * @param pkg -�������װ�İ���
245  * @param len -�������װ�İ��峤��
246  * @param rcv_buf -����Ӧ�����buff
247  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
248  * @param timeout -��ʱʱ��, ��λms
249  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
250  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
251  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч
252  */
253 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
254 {
255     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
256     {
257         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
258             dst, pkg, rcv_buf, func, len, buf_size);
259         return -10;
260     }
261 
262     int ret = 0, rc = 0;
263     int addr_len = sizeof(struct sockaddr_in);
264     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
265     utime64_t cost_time = 0;
266     int time_left = timeout;
267 
268     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
269     int sock = -1;
270     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
271     if ((conn == NULL) || (sock < 0))
272     {
273         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
274         ret = -1;
275         goto EXIT_LABEL;
276     }
277 
278     // 2. ���Լ����½�����
279     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
280     if (rc < 0)
281     {
282         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
283         ret = -4;
284         goto EXIT_LABEL;
285     }
286 
287     // 3. �������ݴ���
288     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
289     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
290     rc = MtFrame::send(sock, pkg, len, 0, time_left);
291     if (rc < 0)
292     {
293         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
294         ret = -2;
295         goto EXIT_LABEL;
296     }
297 
298     // 4. �������ݴ���
299     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
300     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
301     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
302     if (rc < 0)
303     {
304         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
305         ret = rc;
306         goto EXIT_LABEL;
307     }
308 
309     ret = 0;
310 
311 EXIT_LABEL:
312 
313     // ʧ����ǿ���ͷ�����, ����ʱ����
314     if (conn != NULL)
315     {
316         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
317     }
318 
319     return ret;
320 }
321 
322 /**
323  * @brief TCP�������շ�����
324  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
325  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
326  * @param dst -�����͵�Ŀ�ĵ�ַ
327  * @param pkg -�������װ�İ���
328  * @param len -�������װ�İ��峤��
329  * @param rcv_buf -����Ӧ�����buff
330  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
331  * @param timeout -��ʱʱ��, ��λms
332  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
333  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
334  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч
335  */
336 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
337 {
338     int ret = 0, rc = 0;
339     int addr_len = sizeof(struct sockaddr_in);
340     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
341     utime64_t cost_time = 0;
342     int time_left = timeout;
343 
344     // 1. �������
345     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
346     {
347         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
348             dst, pkg, rcv_buf, func, len, buf_size);
349         return -10;
350     }
351 
352     // 2. ����TCP socket
353     int sock;
354     sock = mt_tcp_create_sock();
355     if (sock < 0)
356     {
357         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
358         return -1;
359     }
360 
361     // 3. ���Լ����½�����
362     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
363     if (rc < 0)
364     {
365         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
366         ret = -4;
367         goto EXIT_LABEL;
368     }
369 
370     // 4. �������ݴ���
371     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
372     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
373     rc = MtFrame::send(sock, pkg, len, 0, time_left);
374     if (rc < 0)
375     {
376         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
377         ret = -2;
378         goto EXIT_LABEL;
379     }
380 
381     // 5. �������ݴ���
382     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
383     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
384     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
385     if (rc < 0)
386     {
387         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
388         ret = rc;
389         goto EXIT_LABEL;
390     }
391 
392     ret = 0;
393 
394 EXIT_LABEL:
395     if (sock >= 0)
396         ::close(sock);
397 
398     return ret;
399 }
400 
401 
402 /**
403  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
404  *        [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ]
405  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
406  * @param dst -�����͵�Ŀ�ĵ�ַ
407  * @param pkg -�������װ�İ���
408  * @param len -�������װ�İ��峤��
409  * @param timeout -��ʱʱ��, ��λms
410  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч
411  */
412 int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout)
413 {
414     if (!dst || !pkg || len<1)
415     {
416         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
417         return -10;
418     }
419 
420     int ret = 0, rc = 0;
421     int addr_len = sizeof(struct sockaddr_in);
422     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
423     utime64_t cost_time = 0;
424     int time_left = timeout;
425 
426     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
427     int sock = -1;
428     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
429     if ((conn == NULL) || (sock < 0))
430     {
431         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
432         ret = -1;
433         goto EXIT_LABEL;
434     }
435 
436     // 2. ���Լ����½�����
437     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
438     if (rc < 0)
439     {
440         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
441         ret = -4;
442         goto EXIT_LABEL;
443     }
444 
445     // 3. �������ݴ���
446     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
447     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
448     rc = MtFrame::send(sock, pkg, len, 0, time_left);
449     if (rc < 0)
450     {
451         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
452         ret = -2;
453         goto EXIT_LABEL;
454     }
455 
456     ret = 0;
457 
458 EXIT_LABEL:
459 
460     // ʧ����ǿ���ͷ�����, ����ʱ����
461     if (conn != NULL)
462     {
463         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
464     }
465 
466     return ret;
467 }
468 
469 /**
470  * @brief TCP������ֻ�����սӿ�
471  *        [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ]
472  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
473  * @param dst -�����͵�Ŀ�ĵ�ַ
474  * @param pkg -�������װ�İ���
475  * @param len -�������װ�İ��峤��
476  * @param timeout -��ʱʱ��, ��λms
477  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч
478  */
479 int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout)
480 {
481     // 1. ��μ��
482     if (!dst || !pkg || len<1)
483     {
484         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
485         return -10;
486     }
487 
488     int ret = 0, rc = 0;
489     int addr_len = sizeof(struct sockaddr_in);
490     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
491     utime64_t cost_time = 0;
492     int time_left = timeout;
493 
494     // 2. ����TCP socket
495     int sock = -1;
496     sock = mt_tcp_create_sock();
497     if (sock < 0)
498     {
499         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
500         ret = -1;
501         goto EXIT_LABEL;
502     }
503 
504     // 2. ���Լ����½�����
505     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
506     if (rc < 0)
507     {
508         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
509         ret = -4;
510         goto EXIT_LABEL;
511     }
512 
513     // 3. �������ݴ���
514     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
515     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
516     rc = MtFrame::send(sock, pkg, len, 0, time_left);
517     if (rc < 0)
518     {
519         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
520         ret = -2;
521         goto EXIT_LABEL;
522     }
523 
524     ret = 0;
525 
526 EXIT_LABEL:
527 
528     if (sock >= 0)
529         ::close(sock);
530 
531     return ret;
532 }
533 
534 
535 /**
536  * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶�����
537  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
538  * @param dst -�����͵�Ŀ�ĵ�ַ
539  * @param pkg -�������װ�İ���
540  * @param len -�������װ�İ��峤��
541  * @param rcv_buf -����Ӧ�����buff��ֻ�����տ�������ΪNULL
542  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ������ȣ�ֻ�����գ�����ΪNULL
543  * @param timeout -��ʱʱ��, ��λms
544  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
545  * @param type - ��������
546  *               MT_TCP_SHORT: һ��һ�������ӣ�
547  *               MT_TCP_LONG : һ��һ�ճ����ӣ�
548  *               MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ�
549  *               MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ�
550  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
551  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
552  */
553 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type)
554 {
555 	if(!dst || !pkg || len<1)
556 	{
557         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
558                     dst, pkg, rcv_buf, func, len, buf_size,type);
559         return -10;
560 	}
561 
562     switch (type)
563     {
564         // TCP�����ӵ�������
565         case MT_TCP_LONG:
566         {
567             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
568         }
569 
570         // TCP������ֻ������
571         case MT_TCP_LONG_SNDONLY:
572         {
573             return mt_tcpsend(dst, pkg, len, timeout);
574         }
575 
576         // TCP�����ӵ�������
577         case MT_TCP_SHORT:
578         {
579             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
580         }
581 
582         // TCP������ֻ������
583         case MT_TCP_SHORT_SNDONLY:
584         {
585             return mt_tcpsend_short(dst, pkg, len, timeout);
586         }
587 
588         default:
589         {
590             MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
591                         dst, pkg, rcv_buf, func, len, buf_size,type);
592             return -10;
593         }
594     }
595 
596     return 0;
597 }
598 
599 
600 
601 /**
602  * @brief ������ص���������
603  */
604 static void mt_task_process(void* arg)
605 {
606     int rc = 0;
607     IMtTask* task = (IMtTask*)arg;
608     if (!task)
609     {
610         MTLOG_ERROR("Invalid arg, error");
611         return;
612     }
613 
614     rc = task->Process();
615     if (rc != 0)
616     {
617         MTLOG_DEBUG("task process failed(%d), log", rc);
618     }
619 
620     task->SetResult(rc);
621 
622     return;
623 };
624 
625 /**
626  * @brief ��·IO�Ĵ���, ��������̹߳���
627  * @param req_list - �����б�
628  * @return 0 �ɹ�, <0ʧ��
629  */
630 int mt_exec_all_task(IMtTaskList& req_list)
631 {
632     MtFrame* mtframe    = MtFrame::Instance();
633     MicroThread* thread = mtframe->GetActiveThread();
634     IMtTask* task       = NULL;
635     MicroThread* sub    = NULL;
636     MicroThread* tmp    = NULL;
637     int rc              = -1;
638 
639     MicroThread::SubThreadList list;
640     TAILQ_INIT(&list);
641 
642     // ��ֹû��task������΢�߳�һֱ����ס
643     if (0 == req_list.size())
644     {
645         MTLOG_DEBUG("no task for execult");
646         return 0;
647     }
648 
649     // 1. �����̶߳���
650     for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it)
651     {
652         task = *it;
653         sub = MtFrame::CreateThread(mt_task_process, task, false);
654         if (NULL == sub)
655         {
656             MTLOG_ERROR("create sub thread failed");
657             goto EXIT_LABEL;
658         }
659 
660         sub->SetType(MicroThread::SUB_THREAD);
661         TAILQ_INSERT_TAIL(&list, sub, _sub_entry);
662     }
663 
664     // 2. ����ִ������
665     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
666     {
667         TAILQ_REMOVE(&list, sub, _sub_entry);
668         thread->AddSubThread(sub);
669         mtframe->InsertRunable(sub);
670     }
671 
672     // 3. �ȴ����߳�ִ�н���
673     thread->Wait();
674     rc = 0;
675 
676 EXIT_LABEL:
677 
678     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
679     {
680         TAILQ_REMOVE(&list, sub, _sub_entry);
681         mtframe->FreeThread(sub);
682     }
683 
684     return rc;
685 
686 }
687 
688 /**
689  * @brief ���õ�ǰIMtMsg��˽�б���
690  * @info  ֻ����ָ�룬�ڴ���Ҫҵ�����
691  */
692 void mt_set_msg_private(void *data)
693 {
694     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
695     if (msg_thread != NULL)
696         msg_thread->SetPrivate(data);
697 }
698 
699 /**
700  * @brief  ��ȡ��ǰIMtMsg��˽�б���
701  * @return ˽�б���ָ��
702  */
703 void* mt_get_msg_private()
704 {
705     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
706     if (NULL == msg_thread)
707     {
708         return NULL;
709     }
710 
711     return msg_thread->GetPrivate();
712 }
713 
714 /**
715  * @brief  ΢�߳̿�ܳ�ʼ��
716  * @info   ҵ��ʹ��spp������΢�̣߳���Ҫ���øó�ʼ������
717  * @return false:��ʼ��ʧ��  true:��ʼ���ɹ�
718  */
719 bool mt_init_frame(int argc, char * const argv[])
720 {
721 	if (argc) {
722 		ff_init(argc, argv);
723 		ff_set_hook_flag();
724 	}
725 	memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab));
726     return MtFrame::Instance()->InitFrame();
727 }
728 
729 /**
730  * @brief ����΢�̶߳���ջ�ռ��С
731  * @info  �DZ������ã�Ĭ�ϴ�СΪ128K
732  */
733 void mt_set_stack_size(unsigned int bytes)
734 {
735     ThreadPool::SetDefaultStackSize(bytes);
736 }
737 
738 /**
739  * @brief ΢�̰߳�����ϵͳIO���� recvfrom
740  * @param fd ϵͳsocket��Ϣ
741  * @param buf ������Ϣ������ָ��
742  * @param len ������Ϣ����������
743  * @param from ��Դ��ַ��ָ��
744  * @param fromlen ��Դ��ַ�Ľṹ����
745  * @param timeout ��ȴ�ʱ��, ����
746  * @return >0 �ɹ����ճ���, <0 ʧ��
747  */
748 int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
749 {
750     return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout);
751 }
752 
753 /**
754  * @brief ΢�̰߳�����ϵͳIO���� sendto
755  * @param fd ϵͳsocket��Ϣ
756  * @param msg �����͵���Ϣָ��
757  * @param len �����͵���Ϣ����
758  * @param to Ŀ�ĵ�ַ��ָ��
759  * @param tolen Ŀ�ĵ�ַ�Ľṹ����
760  * @param timeout ��ȴ�ʱ��, ����
761  * @return >0 �ɹ����ͳ���, <0 ʧ��
762  */
763 int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
764 {
765     return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout);
766 }
767 
768 /**
769  * @brief ΢�̰߳�����ϵͳIO���� connect
770  * @param fd ϵͳsocket��Ϣ
771  * @param addr ָ��server��Ŀ�ĵ�ַ
772  * @param addrlen ��ַ�ij���
773  * @param timeout ��ȴ�ʱ��, ����
774  * @return >0 �ɹ����ͳ���, <0 ʧ��
775  */
776 int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
777 {
778     return MtFrame::connect(fd, addr, addrlen, timeout);
779 }
780 
781 /**
782  * @brief ΢�̰߳�����ϵͳIO���� accept
783  * @param fd �����׽���
784  * @param addr �ͻ��˵�ַ
785  * @param addrlen ��ַ�ij���
786  * @param timeout ��ȴ�ʱ��, ����
787  * @return >=0 accept��socket������, <0 ʧ��
788  */
789 int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
790 {
791     return MtFrame::accept(fd, addr, addrlen, timeout);
792 }
793 
794 
795 /**
796  * @brief ΢�̰߳�����ϵͳIO���� read
797  * @param fd ϵͳsocket��Ϣ
798  * @param buf ������Ϣ������ָ��
799  * @param nbyte ������Ϣ����������
800  * @param timeout ��ȴ�ʱ��, ����
801  * @return >0 �ɹ����ճ���, <0 ʧ��
802  */
803 ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout)
804 {
805     return MtFrame::read(fd, buf, nbyte, timeout);
806 }
807 
808 /**
809  * @brief ΢�̰߳�����ϵͳIO���� write
810  * @param fd ϵͳsocket��Ϣ
811  * @param buf �����͵���Ϣָ��
812  * @param nbyte �����͵���Ϣ����
813  * @param timeout ��ȴ�ʱ��, ����
814  * @return >0 �ɹ����ͳ���, <0 ʧ��
815  */
816 ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout)
817 {
818     return MtFrame::write(fd, buf, nbyte, timeout);
819 }
820 
821 /**
822  * @brief ΢�̰߳�����ϵͳIO���� recv
823  * @param fd ϵͳsocket��Ϣ
824  * @param buf ������Ϣ������ָ��
825  * @param len ������Ϣ����������
826  * @param timeout ��ȴ�ʱ��, ����
827  * @return >0 �ɹ����ճ���, <0 ʧ��
828  */
829 ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout)
830 {
831     return MtFrame::recv(fd, buf, len, flags, timeout);
832 }
833 
834 /**
835  * @brief ΢�̰߳�����ϵͳIO���� send
836  * @param fd ϵͳsocket��Ϣ
837  * @param buf �����͵���Ϣָ��
838  * @param nbyte �����͵���Ϣ����
839  * @param timeout ��ȴ�ʱ��, ����
840  * @return >0 �ɹ����ͳ���, <0 ʧ��
841  */
842 ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
843 {
844     return MtFrame::send(fd, buf, nbyte, flags, timeout);
845 }
846 
847 /**
848  * @brief ΢�߳�����sleep�ӿ�, ��λms
849  */
850 void mt_sleep(int ms)
851 {
852     MtFrame::sleep(ms);
853 }
854 
855 /**
856  * @brief ΢�̻߳�ȡϵͳʱ�䣬��λms
857  */
858 unsigned long long mt_time_ms(void)
859 {
860     return MtFrame::Instance()->GetLastClock();
861 }
862 
863 /**
864  * @brief ΢�̵߳ȴ�epoll�¼��İ�������
865  */
866 int mt_wait_events(int fd, int events, int timeout)
867 {
868     return MtFrame::Instance()->WaitEvents(fd, events, timeout);
869 }
870 
871 void* mt_start_thread(void* entry, void* args)
872 {
873     return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true);
874 }
875 
876 #define BUF_ALIGNMENT_SIZE 4096
877 #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1))
878 #define BUF_DEFAULT_SIZE 4096
879 
880 class ScopedBuf
881 {
882 public:
883 	ScopedBuf(void*& buf_keeper, bool keep)
884 	:buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep)
885 	{}
886 
887 	int Alloc(int len)
888 	{
889 		if(len<len_)
890 		{
891 			return -1; // �ռ��СԽ��
892 		}
893 
894         if(len==0)
895         {
896             len = BUF_ALIGNMENT_SIZE;
897         }
898 		if(len_==len)
899 		{
900 			return 0;
901 		}
902 
903 		len_ = BUF_ALIGN_SIZE(len);
904 		if(len_==0)
905 		{
906 			len_ = BUF_DEFAULT_SIZE;
907 		}
908 		len_watermark_ = len_-BUF_ALIGNMENT_SIZE;
909 		char* tmp = (char*)realloc(buf_, len_);
910 		if(tmp==NULL)
911 		{
912 	        return -2; // �����ڴ�ʧ��
913 		}
914 
915 		buf_ = tmp;
916 		return 0;
917 	}
918 
919 	void reset()
920 	{
921 		if(keep_)
922 		{
923 			buf_keeper_ = (void*)buf_;
924 			buf_ = NULL;
925 		}
926 	}
927 
928 	~ScopedBuf()
929 	{
930 		if(buf_!=NULL)
931 		{
932 			free(buf_);
933 			buf_ = NULL;
934 		}
935 	}
936 
937 public:
938 	void* &buf_keeper_;
939 	char* buf_;
940 	int   len_;
941     int   len_watermark_;
942 	bool  keep_;
943 
944 };
945 
946 /**
947  * @brief TCPѭ������, ֱ������OK��ʱ
948  *       [ע��] �����߲�Ҫ�����޸ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ]
949  */
950 static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags,
951                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
952 {
953     int recv_len = 0;
954     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
955 
956 	int rc = 0;
957 	int ret = 0;
958     int pkg_len = 0;
959     bool msg_len_detected = false;
960 
961 	ScopedBuf sbuf(rcv_buf, keep_rcv_buf);
962 	ret = sbuf.Alloc(len);
963 
964 	if(ret!=0)
965 	{
966         MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
967 		return -11;
968 	}
969 
970     do
971     {
972         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
973         if (cost_time > (utime64_t)timeout)
974         {
975             errno = ETIME;
976             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
977             return -3;
978         }
979 
980         rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time));
981         if (rc < 0)
982         {
983             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
984             return -3;
985         }
986 		else if (rc == 0) // Զ�˹ر�
987         {
988 
989 			if(recv_len==0) // δ�ذ���ֱ�ӷ���Զ�˹ر�
990 			{
991 	            MTLOG_ERROR("tcp socket[%d] remote close", sock);
992 	            return -7;
993 			}
994 
995 	        /* ��鱨�������� */
996 		    rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected);
997 
998 			if(rc!=recv_len) // ҵ�����Զ�˹رգ�Ӧ�÷������������ȣ�����<=0,��ʾ����������
999 			{
1000 	            MTLOG_ERROR("tcp socket[%d] remote close", sock);
1001     	        return -7;
1002 			}
1003         	len = recv_len;
1004 			break;
1005         }
1006         recv_len += rc;
1007 
1008         /* ��鱨�������� */
1009         if((!msg_len_detected)||recv_len==pkg_len)
1010         {
1011             rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected);
1012             if(msg_len_detected)
1013             {
1014                 pkg_len = rc;
1015             }
1016         }
1017         else
1018         {
1019             rc = pkg_len;
1020         }
1021 
1022         if (rc < 0)
1023         {
1024             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
1025             return -5;
1026         }
1027         else if (rc == 0) // ����δ������,�Ҳ�ȷ����С
1028         {
1029         	if(sbuf.len_ > recv_len)
1030         	{
1031         		continue;
1032         	}
1033             // û�ռ��ٽ�����, 2����С��չbuf
1034 			ret = sbuf.Alloc(sbuf.len_<<1);
1035 
1036 			if(ret!=0)
1037 			{
1038 		        MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
1039 				return -11;
1040 			}
1041         }
1042         else    // �ɹ����㱨�ij���
1043         {
1044             if (rc > recv_len) // ���Ļ�δ��ȫ
1045             {
1046             	if(sbuf.len_ > recv_len) // recv buf���пռ�.��δ����ˮλ
1047             	{
1048             		continue;
1049             	}
1050 
1051 	            // û�ռ��ٽ�����, ����ҵ��ָʾ��С��չ�ڴ�
1052 				ret = sbuf.Alloc(rc);
1053 
1054 				if(ret!=0)
1055 				{
1056 			        MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
1057 					return -11;
1058 				}
1059             }
1060             else if(rc==recv_len) // �հ�����
1061             {
1062                 len = rc;
1063                 break;
1064             }
1065             else // ΢�߳����ģʽ�£�������ճ��
1066             {
1067 	            MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock);
1068 	            return -5;
1069             }
1070         }
1071     } while (true);
1072 
1073 	sbuf.reset();
1074 
1075     return 0;
1076 }
1077 
1078 
1079 
1080 /**
1081  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
1082  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1083  * @param dst -�����͵�Ŀ�ĵ�ַ
1084  * @param pkg -�������װ�İ���
1085  * @param len -�������װ�İ��峤��
1086  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1087  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1088  * @param timeout -��ʱʱ��, ��λms
1089  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1090  * @param msg_ctx -�������ĵ������ı�����
1091  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1092  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1093  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1094  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1095  */
1096 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
1097                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
1098 {
1099 	if(!dst || !pkg || len<1)
1100 	{
1101         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
1102                     dst, pkg, len, check_func);
1103         return -10;
1104 	}
1105 
1106 
1107     int ret = 0, rc = 0;
1108     int addr_len = sizeof(struct sockaddr_in);
1109     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
1110     utime64_t cost_time = 0;
1111     int time_left = timeout;
1112 
1113     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
1114     int sock = -1;
1115     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
1116     if ((conn == NULL) || (sock < 0))
1117     {
1118         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
1119         ret = -1;
1120         goto EXIT_LABEL;
1121     }
1122 
1123     // 2. ���Լ����½�����
1124     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
1125     if (rc < 0)
1126     {
1127         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
1128         ret = -4;
1129         goto EXIT_LABEL;
1130     }
1131 
1132     // 3. �������ݴ���
1133     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1134     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1135     rc = MtFrame::send(sock, pkg, len, 0, time_left);
1136     if (rc < 0)
1137     {
1138         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
1139         ret = -2;
1140         goto EXIT_LABEL;
1141     }
1142 
1143     // 4. �������ݴ���
1144     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1145     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1146 
1147     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
1148     if (rc < 0)
1149     {
1150         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
1151         ret = rc;
1152         goto EXIT_LABEL;
1153     }
1154 
1155     ret = 0;
1156 
1157 EXIT_LABEL:
1158 
1159     // ʧ����ǿ���ͷ�����, ����ʱ����
1160     if (conn != NULL)
1161     {
1162         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
1163     }
1164 
1165     return ret;
1166 }
1167 
1168 
1169 /**
1170  * @brief TCP�������շ�����
1171  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1172  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
1173  * @param dst -�����͵�Ŀ�ĵ�ַ
1174  * @param pkg -�������װ�İ���
1175  * @param len -�������װ�İ��峤��
1176  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1177  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1178  * @param timeout -��ʱʱ��, ��λms
1179  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1180  * @param msg_ctx -�������ĵ������ı�����
1181  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1182  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1183  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1184  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1185  */
1186 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
1187                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
1188 {
1189     int ret = 0, rc = 0;
1190     int addr_len = sizeof(struct sockaddr_in);
1191     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
1192     utime64_t cost_time = 0;
1193     int time_left = timeout;
1194 
1195     // 1. �������
1196 	if(!dst || !pkg || len<1)
1197 	{
1198         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
1199                     dst, pkg, len, check_func);
1200         return -10;
1201 	}
1202 
1203     // 2. ����TCP socket
1204     int sock;
1205     sock = mt_tcp_create_sock();
1206     if (sock < 0)
1207     {
1208         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
1209         return -1;
1210     }
1211 
1212     // 3. ���Լ����½�����
1213     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
1214     if (rc < 0)
1215     {
1216         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
1217         ret = -4;
1218         goto EXIT_LABEL;
1219     }
1220 
1221     // 4. �������ݴ���
1222     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1223     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1224     rc = MtFrame::send(sock, pkg, len, 0, time_left);
1225     if (rc < 0)
1226     {
1227         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
1228         ret = -2;
1229         goto EXIT_LABEL;
1230     }
1231 
1232     // 5. �������ݴ���
1233     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1234     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1235     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
1236 
1237     if (rc < 0)
1238     {
1239         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
1240         ret = rc;
1241         goto EXIT_LABEL;
1242     }
1243 
1244     ret = 0;
1245 
1246 EXIT_LABEL:
1247     if (sock >= 0)
1248         ::close(sock);
1249 
1250     return ret;
1251 }
1252 
1253 
1254 
1255 /**
1256  * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶�����
1257  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1258  * @param dst -�����͵�Ŀ�ĵ�ַ
1259  * @param pkg -�������װ�İ���
1260  * @param len -�������װ�İ��峤��
1261  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1262  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1263  * @param timeout -��ʱʱ��, ��λms
1264  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1265  * @param msg_ctx -�������ĵ������ı�����
1266  *
1267  * @param type - ��������
1268  *               MT_TCP_SHORT: һ��һ�������ӣ�
1269  *               MT_TCP_LONG : һ��һ�ճ����ӣ�
1270  *               MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ�
1271  *               MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ�
1272  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1273  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1274  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1275  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1276  */
1277 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size,
1278                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx,
1279                      MT_TCP_CONN_TYPE type, bool keep_rcv_buf)
1280 {
1281 	if(!dst || !pkg || len<1)
1282 	{
1283         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1284                     dst, pkg, len, check_func, msg_ctx, type);
1285         return -10;
1286 	}
1287 
1288     switch (type)
1289     {
1290         // TCP�����ӵ�������
1291         case MT_TCP_LONG:
1292         {
1293             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1294         }
1295 
1296         // TCP������ֻ������
1297         case MT_TCP_LONG_SNDONLY:
1298         {
1299             return mt_tcpsend(dst, pkg, len, timeout);
1300         }
1301 
1302         // TCP�����ӵ�������
1303         case MT_TCP_SHORT:
1304         {
1305             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1306         }
1307 
1308         // TCP������ֻ������
1309         case MT_TCP_SHORT_SNDONLY:
1310         {
1311             return mt_tcpsend_short(dst, pkg, len, timeout);
1312         }
1313 
1314         default:
1315         {
1316 	        MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1317                     dst, pkg, len, check_func, msg_ctx, type);
1318             return -10;
1319         }
1320     }
1321 
1322     return 0;
1323 }
1324 
1325 
1326 
1327 }
1328 
1329 
1330