xref: /f-stack/app/micro_thread/mt_api.cpp (revision a9643ea8)
1*a9643ea8Slogwang 
2*a9643ea8Slogwang /**
3*a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6*a9643ea8Slogwang  *
7*a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8*a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9*a9643ea8Slogwang  * obtain a copy of the License at
10*a9643ea8Slogwang  *
11*a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12*a9643ea8Slogwang  *
13*a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14*a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15*a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16*a9643ea8Slogwang  * and limitations under the License.
17*a9643ea8Slogwang  */
18*a9643ea8Slogwang 
19*a9643ea8Slogwang 
20*a9643ea8Slogwang /**
21*a9643ea8Slogwang  *  @filename mt_sys_call.cpp
22*a9643ea8Slogwang  *  @info  ΢�̷߳�װϵͳapi, ͬ������΢�߳�API��ʵ���첽����
23*a9643ea8Slogwang  */
24*a9643ea8Slogwang 
25*a9643ea8Slogwang #include "kqueue_proxy.h"
26*a9643ea8Slogwang #include "micro_thread.h"
27*a9643ea8Slogwang #include "mt_connection.h"
28*a9643ea8Slogwang #include "mt_api.h"
29*a9643ea8Slogwang #include "ff_api.h"
30*a9643ea8Slogwang #include "mt_sys_hook.h"
31*a9643ea8Slogwang 
32*a9643ea8Slogwang namespace NS_MICRO_THREAD {
33*a9643ea8Slogwang 
34*a9643ea8Slogwang /**
35*a9643ea8Slogwang  * @brief ��������˿ڵ�socket�շ��ӿ�, ��socket������������, ҵ������֤������
36*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
37*a9643ea8Slogwang  * @param pkg -�������װ�İ���
38*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
39*a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff
40*a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
41*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
42*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, �ɴ�ӡerrno, -10 ������Ч
43*a9643ea8Slogwang  */
44*a9643ea8Slogwang int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout)
45*a9643ea8Slogwang {
46*a9643ea8Slogwang     int ret = 0;
47*a9643ea8Slogwang     int rc  = 0;
48*a9643ea8Slogwang     int flags = 1;
49*a9643ea8Slogwang     struct sockaddr_in from_addr = {0};
50*a9643ea8Slogwang     int addr_len = sizeof(from_addr);
51*a9643ea8Slogwang 
52*a9643ea8Slogwang     if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf)
53*a9643ea8Slogwang     {
54*a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]",
55*a9643ea8Slogwang             dst, pkg, rcv_buf, len, buf_size);
56*a9643ea8Slogwang         return -10;
57*a9643ea8Slogwang     }
58*a9643ea8Slogwang 
59*a9643ea8Slogwang     int sock = socket(PF_INET, SOCK_DGRAM, 0);
60*a9643ea8Slogwang     if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0))
61*a9643ea8Slogwang     {
62*a9643ea8Slogwang         MT_ATTR_API(320842, 1); // socketʧ��
63*a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno);
64*a9643ea8Slogwang         ret = -1;
65*a9643ea8Slogwang         goto EXIT_LABEL;
66*a9643ea8Slogwang     }
67*a9643ea8Slogwang 
68*a9643ea8Slogwang     rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout);
69*a9643ea8Slogwang     if (rc < 0)
70*a9643ea8Slogwang     {
71*a9643ea8Slogwang         MT_ATTR_API(320844, 1); // ����ʧ��
72*a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno);
73*a9643ea8Slogwang         ret = -2;
74*a9643ea8Slogwang         goto EXIT_LABEL;
75*a9643ea8Slogwang     }
76*a9643ea8Slogwang 
77*a9643ea8Slogwang     rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout);
78*a9643ea8Slogwang     if (rc < 0)
79*a9643ea8Slogwang     {
80*a9643ea8Slogwang         MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ�
81*a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno);
82*a9643ea8Slogwang         ret = -3;
83*a9643ea8Slogwang         goto EXIT_LABEL;
84*a9643ea8Slogwang     }
85*a9643ea8Slogwang     buf_size = rc;
86*a9643ea8Slogwang 
87*a9643ea8Slogwang EXIT_LABEL:
88*a9643ea8Slogwang 
89*a9643ea8Slogwang     if (sock > 0)
90*a9643ea8Slogwang     {
91*a9643ea8Slogwang         close(sock);
92*a9643ea8Slogwang         sock = -1;
93*a9643ea8Slogwang     }
94*a9643ea8Slogwang 
95*a9643ea8Slogwang     return ret;
96*a9643ea8Slogwang }
97*a9643ea8Slogwang 
98*a9643ea8Slogwang /**
99*a9643ea8Slogwang  * @brief  ����TCP�׽��֣�������Ϊ������
100*a9643ea8Slogwang  * @return >=0 �ɹ�, <0 ʧ��
101*a9643ea8Slogwang  */
102*a9643ea8Slogwang int mt_tcp_create_sock(void)
103*a9643ea8Slogwang {
104*a9643ea8Slogwang     int fd;
105*a9643ea8Slogwang     int flag;
106*a9643ea8Slogwang 
107*a9643ea8Slogwang     // ����socket
108*a9643ea8Slogwang     fd = ::socket(AF_INET, SOCK_STREAM, 0);
109*a9643ea8Slogwang     if (fd < 0)
110*a9643ea8Slogwang     {
111*a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, error: %m");
112*a9643ea8Slogwang         return -1;
113*a9643ea8Slogwang     }
114*a9643ea8Slogwang 
115*a9643ea8Slogwang     // ����socket������
116*a9643ea8Slogwang     flag = fcntl(fd, F_GETFL, 0);
117*a9643ea8Slogwang     if (flag == -1)
118*a9643ea8Slogwang     {
119*a9643ea8Slogwang         ::close(fd);
120*a9643ea8Slogwang         MTLOG_ERROR("get fd flags failed, error: %m");
121*a9643ea8Slogwang         return -2;
122*a9643ea8Slogwang     }
123*a9643ea8Slogwang 
124*a9643ea8Slogwang     if (flag & O_NONBLOCK)
125*a9643ea8Slogwang         return fd;
126*a9643ea8Slogwang 
127*a9643ea8Slogwang     if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1)
128*a9643ea8Slogwang     {
129*a9643ea8Slogwang         ::close(fd);
130*a9643ea8Slogwang         MTLOG_ERROR("set fd flags failed, error: %m");
131*a9643ea8Slogwang         return -3;
132*a9643ea8Slogwang     }
133*a9643ea8Slogwang 
134*a9643ea8Slogwang     return fd;
135*a9643ea8Slogwang }
136*a9643ea8Slogwang 
137*a9643ea8Slogwang /**
138*a9643ea8Slogwang  * @brief TCP��ȡ������֪ͨ������socket
139*a9643ea8Slogwang  */
140*a9643ea8Slogwang static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock)
141*a9643ea8Slogwang {
142*a9643ea8Slogwang     // 1. ��ȡ�߳�֪ͨע�����
143*a9643ea8Slogwang     KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0);
144*a9643ea8Slogwang     if (NULL == ntfy_obj)
145*a9643ea8Slogwang     {
146*a9643ea8Slogwang         MTLOG_ERROR("get notify failed, logit");
147*a9643ea8Slogwang         return NULL;
148*a9643ea8Slogwang     }
149*a9643ea8Slogwang 
150*a9643ea8Slogwang     // 2. ��ȡ���Ӷ���, ����֪ͨ��Ϣ
151*a9643ea8Slogwang     TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst));
152*a9643ea8Slogwang     if (NULL == conn)
153*a9643ea8Slogwang     {
154*a9643ea8Slogwang         MTLOG_ERROR("get connection failed, dst[%p]", dst);
155*a9643ea8Slogwang         NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj);
156*a9643ea8Slogwang         return NULL;
157*a9643ea8Slogwang     }
158*a9643ea8Slogwang     conn->SetNtfyObj(ntfy_obj);
159*a9643ea8Slogwang 
160*a9643ea8Slogwang     // 3. ��������socket���
161*a9643ea8Slogwang     int osfd = conn->CreateSocket();
162*a9643ea8Slogwang     if (osfd < 0)
163*a9643ea8Slogwang     {
164*a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, true);
165*a9643ea8Slogwang         MTLOG_ERROR("create socket failed, ret[%d]", osfd);
166*a9643ea8Slogwang         return NULL;
167*a9643ea8Slogwang     }
168*a9643ea8Slogwang 
169*a9643ea8Slogwang     // 4. �ɹ���������
170*a9643ea8Slogwang     sock = osfd;
171*a9643ea8Slogwang     return conn;
172*a9643ea8Slogwang }
173*a9643ea8Slogwang 
174*a9643ea8Slogwang /**
175*a9643ea8Slogwang  * @brief TCPѭ������, ֱ������OK��ʱ
176*a9643ea8Slogwang  *       [ע��] �����߲�Ҫ�����޸ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ]
177*a9643ea8Slogwang  */
178*a9643ea8Slogwang static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func)
179*a9643ea8Slogwang {
180*a9643ea8Slogwang     int recv_len = 0;
181*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
182*a9643ea8Slogwang     do
183*a9643ea8Slogwang     {
184*a9643ea8Slogwang         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
185*a9643ea8Slogwang         if (cost_time > (utime64_t)timeout)
186*a9643ea8Slogwang         {
187*a9643ea8Slogwang             errno = ETIME;
188*a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
189*a9643ea8Slogwang             return -3;
190*a9643ea8Slogwang         }
191*a9643ea8Slogwang 
192*a9643ea8Slogwang         int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time));
193*a9643ea8Slogwang         if (rc < 0)
194*a9643ea8Slogwang         {
195*a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
196*a9643ea8Slogwang             return -3;
197*a9643ea8Slogwang         }
198*a9643ea8Slogwang 		else if (rc == 0)
199*a9643ea8Slogwang         {
200*a9643ea8Slogwang         	len = recv_len;
201*a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] remote close", sock);
202*a9643ea8Slogwang             return -7;
203*a9643ea8Slogwang         }
204*a9643ea8Slogwang         recv_len += rc;
205*a9643ea8Slogwang 
206*a9643ea8Slogwang         /* ��鱨�������� */
207*a9643ea8Slogwang         rc = func(rcv_buf, recv_len);
208*a9643ea8Slogwang         if (rc < 0)
209*a9643ea8Slogwang         {
210*a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
211*a9643ea8Slogwang             return -5;
212*a9643ea8Slogwang         }
213*a9643ea8Slogwang         else if (rc == 0) // ����δ������
214*a9643ea8Slogwang         {
215*a9643ea8Slogwang             if (len == recv_len) // û�ռ��ٽ�����, ����
216*a9643ea8Slogwang             {
217*a9643ea8Slogwang                 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock);
218*a9643ea8Slogwang                 return -6;
219*a9643ea8Slogwang             }
220*a9643ea8Slogwang             continue;
221*a9643ea8Slogwang         }
222*a9643ea8Slogwang         else    // �ɹ����㱨�ij���
223*a9643ea8Slogwang         {
224*a9643ea8Slogwang             if (rc > recv_len) // ���Ļ�δ��ȫ
225*a9643ea8Slogwang             {
226*a9643ea8Slogwang                 continue;
227*a9643ea8Slogwang             }
228*a9643ea8Slogwang             else
229*a9643ea8Slogwang             {
230*a9643ea8Slogwang                 len = rc;
231*a9643ea8Slogwang                 break;
232*a9643ea8Slogwang             }
233*a9643ea8Slogwang         }
234*a9643ea8Slogwang     } while (true);
235*a9643ea8Slogwang 
236*a9643ea8Slogwang     return 0;
237*a9643ea8Slogwang }
238*a9643ea8Slogwang 
239*a9643ea8Slogwang /**
240*a9643ea8Slogwang  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
241*a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
242*a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
243*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
244*a9643ea8Slogwang  * @param pkg -�������װ�İ���
245*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
246*a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff
247*a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
248*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
249*a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
250*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
251*a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч
252*a9643ea8Slogwang  */
253*a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
254*a9643ea8Slogwang {
255*a9643ea8Slogwang     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
256*a9643ea8Slogwang     {
257*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
258*a9643ea8Slogwang             dst, pkg, rcv_buf, func, len, buf_size);
259*a9643ea8Slogwang         return -10;
260*a9643ea8Slogwang     }
261*a9643ea8Slogwang 
262*a9643ea8Slogwang     int ret = 0, rc = 0;
263*a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
264*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
265*a9643ea8Slogwang     utime64_t cost_time = 0;
266*a9643ea8Slogwang     int time_left = timeout;
267*a9643ea8Slogwang 
268*a9643ea8Slogwang     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
269*a9643ea8Slogwang     int sock = -1;
270*a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
271*a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
272*a9643ea8Slogwang     {
273*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
274*a9643ea8Slogwang         ret = -1;
275*a9643ea8Slogwang         goto EXIT_LABEL;
276*a9643ea8Slogwang     }
277*a9643ea8Slogwang 
278*a9643ea8Slogwang     // 2. ���Լ����½�����
279*a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
280*a9643ea8Slogwang     if (rc < 0)
281*a9643ea8Slogwang     {
282*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
283*a9643ea8Slogwang         ret = -4;
284*a9643ea8Slogwang         goto EXIT_LABEL;
285*a9643ea8Slogwang     }
286*a9643ea8Slogwang 
287*a9643ea8Slogwang     // 3. �������ݴ���
288*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
289*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
290*a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
291*a9643ea8Slogwang     if (rc < 0)
292*a9643ea8Slogwang     {
293*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
294*a9643ea8Slogwang         ret = -2;
295*a9643ea8Slogwang         goto EXIT_LABEL;
296*a9643ea8Slogwang     }
297*a9643ea8Slogwang 
298*a9643ea8Slogwang     // 4. �������ݴ���
299*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
300*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
301*a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
302*a9643ea8Slogwang     if (rc < 0)
303*a9643ea8Slogwang     {
304*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
305*a9643ea8Slogwang         ret = rc;
306*a9643ea8Slogwang         goto EXIT_LABEL;
307*a9643ea8Slogwang     }
308*a9643ea8Slogwang 
309*a9643ea8Slogwang     ret = 0;
310*a9643ea8Slogwang 
311*a9643ea8Slogwang EXIT_LABEL:
312*a9643ea8Slogwang 
313*a9643ea8Slogwang     // ʧ����ǿ���ͷ�����, ����ʱ����
314*a9643ea8Slogwang     if (conn != NULL)
315*a9643ea8Slogwang     {
316*a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
317*a9643ea8Slogwang     }
318*a9643ea8Slogwang 
319*a9643ea8Slogwang     return ret;
320*a9643ea8Slogwang }
321*a9643ea8Slogwang 
322*a9643ea8Slogwang /**
323*a9643ea8Slogwang  * @brief TCP�������շ�����
324*a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
325*a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
326*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
327*a9643ea8Slogwang  * @param pkg -�������װ�İ���
328*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
329*a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff
330*a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
331*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
332*a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
333*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
334*a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч
335*a9643ea8Slogwang  */
336*a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
337*a9643ea8Slogwang {
338*a9643ea8Slogwang     int ret = 0, rc = 0;
339*a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
340*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
341*a9643ea8Slogwang     utime64_t cost_time = 0;
342*a9643ea8Slogwang     int time_left = timeout;
343*a9643ea8Slogwang 
344*a9643ea8Slogwang     // 1. �������
345*a9643ea8Slogwang     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
346*a9643ea8Slogwang     {
347*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
348*a9643ea8Slogwang             dst, pkg, rcv_buf, func, len, buf_size);
349*a9643ea8Slogwang         return -10;
350*a9643ea8Slogwang     }
351*a9643ea8Slogwang 
352*a9643ea8Slogwang     // 2. ����TCP socket
353*a9643ea8Slogwang     int sock;
354*a9643ea8Slogwang     sock = mt_tcp_create_sock();
355*a9643ea8Slogwang     if (sock < 0)
356*a9643ea8Slogwang     {
357*a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
358*a9643ea8Slogwang         return -1;
359*a9643ea8Slogwang     }
360*a9643ea8Slogwang 
361*a9643ea8Slogwang     // 3. ���Լ����½�����
362*a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
363*a9643ea8Slogwang     if (rc < 0)
364*a9643ea8Slogwang     {
365*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
366*a9643ea8Slogwang         ret = -4;
367*a9643ea8Slogwang         goto EXIT_LABEL;
368*a9643ea8Slogwang     }
369*a9643ea8Slogwang 
370*a9643ea8Slogwang     // 4. �������ݴ���
371*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
372*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
373*a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
374*a9643ea8Slogwang     if (rc < 0)
375*a9643ea8Slogwang     {
376*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
377*a9643ea8Slogwang         ret = -2;
378*a9643ea8Slogwang         goto EXIT_LABEL;
379*a9643ea8Slogwang     }
380*a9643ea8Slogwang 
381*a9643ea8Slogwang     // 5. �������ݴ���
382*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
383*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
384*a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
385*a9643ea8Slogwang     if (rc < 0)
386*a9643ea8Slogwang     {
387*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
388*a9643ea8Slogwang         ret = rc;
389*a9643ea8Slogwang         goto EXIT_LABEL;
390*a9643ea8Slogwang     }
391*a9643ea8Slogwang 
392*a9643ea8Slogwang     ret = 0;
393*a9643ea8Slogwang 
394*a9643ea8Slogwang EXIT_LABEL:
395*a9643ea8Slogwang     if (sock >= 0)
396*a9643ea8Slogwang         ::close(sock);
397*a9643ea8Slogwang 
398*a9643ea8Slogwang     return ret;
399*a9643ea8Slogwang }
400*a9643ea8Slogwang 
401*a9643ea8Slogwang 
402*a9643ea8Slogwang /**
403*a9643ea8Slogwang  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
404*a9643ea8Slogwang  *        [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ]
405*a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
406*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
407*a9643ea8Slogwang  * @param pkg -�������װ�İ���
408*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
409*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
410*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч
411*a9643ea8Slogwang  */
412*a9643ea8Slogwang int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout)
413*a9643ea8Slogwang {
414*a9643ea8Slogwang     if (!dst || !pkg || len<1)
415*a9643ea8Slogwang     {
416*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
417*a9643ea8Slogwang         return -10;
418*a9643ea8Slogwang     }
419*a9643ea8Slogwang 
420*a9643ea8Slogwang     int ret = 0, rc = 0;
421*a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
422*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
423*a9643ea8Slogwang     utime64_t cost_time = 0;
424*a9643ea8Slogwang     int time_left = timeout;
425*a9643ea8Slogwang 
426*a9643ea8Slogwang     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
427*a9643ea8Slogwang     int sock = -1;
428*a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
429*a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
430*a9643ea8Slogwang     {
431*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
432*a9643ea8Slogwang         ret = -1;
433*a9643ea8Slogwang         goto EXIT_LABEL;
434*a9643ea8Slogwang     }
435*a9643ea8Slogwang 
436*a9643ea8Slogwang     // 2. ���Լ����½�����
437*a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
438*a9643ea8Slogwang     if (rc < 0)
439*a9643ea8Slogwang     {
440*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
441*a9643ea8Slogwang         ret = -4;
442*a9643ea8Slogwang         goto EXIT_LABEL;
443*a9643ea8Slogwang     }
444*a9643ea8Slogwang 
445*a9643ea8Slogwang     // 3. �������ݴ���
446*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
447*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
448*a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
449*a9643ea8Slogwang     if (rc < 0)
450*a9643ea8Slogwang     {
451*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
452*a9643ea8Slogwang         ret = -2;
453*a9643ea8Slogwang         goto EXIT_LABEL;
454*a9643ea8Slogwang     }
455*a9643ea8Slogwang 
456*a9643ea8Slogwang     ret = 0;
457*a9643ea8Slogwang 
458*a9643ea8Slogwang EXIT_LABEL:
459*a9643ea8Slogwang 
460*a9643ea8Slogwang     // ʧ����ǿ���ͷ�����, ����ʱ����
461*a9643ea8Slogwang     if (conn != NULL)
462*a9643ea8Slogwang     {
463*a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
464*a9643ea8Slogwang     }
465*a9643ea8Slogwang 
466*a9643ea8Slogwang     return ret;
467*a9643ea8Slogwang }
468*a9643ea8Slogwang 
469*a9643ea8Slogwang /**
470*a9643ea8Slogwang  * @brief TCP������ֻ�����սӿ�
471*a9643ea8Slogwang  *        [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ]
472*a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
473*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
474*a9643ea8Slogwang  * @param pkg -�������װ�İ���
475*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
476*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
477*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч
478*a9643ea8Slogwang  */
479*a9643ea8Slogwang int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout)
480*a9643ea8Slogwang {
481*a9643ea8Slogwang     // 1. ��μ��
482*a9643ea8Slogwang     if (!dst || !pkg || len<1)
483*a9643ea8Slogwang     {
484*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
485*a9643ea8Slogwang         return -10;
486*a9643ea8Slogwang     }
487*a9643ea8Slogwang 
488*a9643ea8Slogwang     int ret = 0, rc = 0;
489*a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
490*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
491*a9643ea8Slogwang     utime64_t cost_time = 0;
492*a9643ea8Slogwang     int time_left = timeout;
493*a9643ea8Slogwang 
494*a9643ea8Slogwang     // 2. ����TCP socket
495*a9643ea8Slogwang     int sock = -1;
496*a9643ea8Slogwang     sock = mt_tcp_create_sock();
497*a9643ea8Slogwang     if (sock < 0)
498*a9643ea8Slogwang     {
499*a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
500*a9643ea8Slogwang         ret = -1;
501*a9643ea8Slogwang         goto EXIT_LABEL;
502*a9643ea8Slogwang     }
503*a9643ea8Slogwang 
504*a9643ea8Slogwang     // 2. ���Լ����½�����
505*a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
506*a9643ea8Slogwang     if (rc < 0)
507*a9643ea8Slogwang     {
508*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
509*a9643ea8Slogwang         ret = -4;
510*a9643ea8Slogwang         goto EXIT_LABEL;
511*a9643ea8Slogwang     }
512*a9643ea8Slogwang 
513*a9643ea8Slogwang     // 3. �������ݴ���
514*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
515*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
516*a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
517*a9643ea8Slogwang     if (rc < 0)
518*a9643ea8Slogwang     {
519*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
520*a9643ea8Slogwang         ret = -2;
521*a9643ea8Slogwang         goto EXIT_LABEL;
522*a9643ea8Slogwang     }
523*a9643ea8Slogwang 
524*a9643ea8Slogwang     ret = 0;
525*a9643ea8Slogwang 
526*a9643ea8Slogwang EXIT_LABEL:
527*a9643ea8Slogwang 
528*a9643ea8Slogwang     if (sock >= 0)
529*a9643ea8Slogwang         ::close(sock);
530*a9643ea8Slogwang 
531*a9643ea8Slogwang     return ret;
532*a9643ea8Slogwang }
533*a9643ea8Slogwang 
534*a9643ea8Slogwang 
535*a9643ea8Slogwang /**
536*a9643ea8Slogwang  * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶�����
537*a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
538*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
539*a9643ea8Slogwang  * @param pkg -�������װ�İ���
540*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
541*a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff��ֻ�����տ�������ΪNULL
542*a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ������ȣ�ֻ�����գ�����ΪNULL
543*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
544*a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
545*a9643ea8Slogwang  * @param type - ��������
546*a9643ea8Slogwang  *               MT_TCP_SHORT: һ��һ�������ӣ�
547*a9643ea8Slogwang  *               MT_TCP_LONG : һ��һ�ճ����ӣ�
548*a9643ea8Slogwang  *               MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ�
549*a9643ea8Slogwang  *               MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ�
550*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
551*a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
552*a9643ea8Slogwang  */
553*a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type)
554*a9643ea8Slogwang {
555*a9643ea8Slogwang 	if(!dst || !pkg || len<1)
556*a9643ea8Slogwang 	{
557*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
558*a9643ea8Slogwang                     dst, pkg, rcv_buf, func, len, buf_size,type);
559*a9643ea8Slogwang         return -10;
560*a9643ea8Slogwang 	}
561*a9643ea8Slogwang 
562*a9643ea8Slogwang     switch (type)
563*a9643ea8Slogwang     {
564*a9643ea8Slogwang         // TCP�����ӵ�������
565*a9643ea8Slogwang         case MT_TCP_LONG:
566*a9643ea8Slogwang         {
567*a9643ea8Slogwang             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
568*a9643ea8Slogwang         }
569*a9643ea8Slogwang 
570*a9643ea8Slogwang         // TCP������ֻ������
571*a9643ea8Slogwang         case MT_TCP_LONG_SNDONLY:
572*a9643ea8Slogwang         {
573*a9643ea8Slogwang             return mt_tcpsend(dst, pkg, len, timeout);
574*a9643ea8Slogwang         }
575*a9643ea8Slogwang 
576*a9643ea8Slogwang         // TCP�����ӵ�������
577*a9643ea8Slogwang         case MT_TCP_SHORT:
578*a9643ea8Slogwang         {
579*a9643ea8Slogwang             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
580*a9643ea8Slogwang         }
581*a9643ea8Slogwang 
582*a9643ea8Slogwang         // TCP������ֻ������
583*a9643ea8Slogwang         case MT_TCP_SHORT_SNDONLY:
584*a9643ea8Slogwang         {
585*a9643ea8Slogwang             return mt_tcpsend_short(dst, pkg, len, timeout);
586*a9643ea8Slogwang         }
587*a9643ea8Slogwang 
588*a9643ea8Slogwang         default:
589*a9643ea8Slogwang         {
590*a9643ea8Slogwang             MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
591*a9643ea8Slogwang                         dst, pkg, rcv_buf, func, len, buf_size,type);
592*a9643ea8Slogwang             return -10;
593*a9643ea8Slogwang         }
594*a9643ea8Slogwang     }
595*a9643ea8Slogwang 
596*a9643ea8Slogwang     return 0;
597*a9643ea8Slogwang }
598*a9643ea8Slogwang 
599*a9643ea8Slogwang 
600*a9643ea8Slogwang 
601*a9643ea8Slogwang /**
602*a9643ea8Slogwang  * @brief ������ص���������
603*a9643ea8Slogwang  */
604*a9643ea8Slogwang static void mt_task_process(void* arg)
605*a9643ea8Slogwang {
606*a9643ea8Slogwang     int rc = 0;
607*a9643ea8Slogwang     IMtTask* task = (IMtTask*)arg;
608*a9643ea8Slogwang     if (!task)
609*a9643ea8Slogwang     {
610*a9643ea8Slogwang         MTLOG_ERROR("Invalid arg, error");
611*a9643ea8Slogwang         return;
612*a9643ea8Slogwang     }
613*a9643ea8Slogwang 
614*a9643ea8Slogwang     rc = task->Process();
615*a9643ea8Slogwang     if (rc != 0)
616*a9643ea8Slogwang     {
617*a9643ea8Slogwang         MTLOG_DEBUG("task process failed(%d), log", rc);
618*a9643ea8Slogwang     }
619*a9643ea8Slogwang 
620*a9643ea8Slogwang     task->SetResult(rc);
621*a9643ea8Slogwang 
622*a9643ea8Slogwang     return;
623*a9643ea8Slogwang };
624*a9643ea8Slogwang 
625*a9643ea8Slogwang /**
626*a9643ea8Slogwang  * @brief ��·IO�Ĵ���, ��������̹߳���
627*a9643ea8Slogwang  * @param req_list - �����б�
628*a9643ea8Slogwang  * @return 0 �ɹ�, <0ʧ��
629*a9643ea8Slogwang  */
630*a9643ea8Slogwang int mt_exec_all_task(IMtTaskList& req_list)
631*a9643ea8Slogwang {
632*a9643ea8Slogwang     MtFrame* mtframe    = MtFrame::Instance();
633*a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
634*a9643ea8Slogwang     IMtTask* task       = NULL;
635*a9643ea8Slogwang     MicroThread* sub    = NULL;
636*a9643ea8Slogwang     MicroThread* tmp    = NULL;
637*a9643ea8Slogwang     int rc              = -1;
638*a9643ea8Slogwang 
639*a9643ea8Slogwang     MicroThread::SubThreadList list;
640*a9643ea8Slogwang     TAILQ_INIT(&list);
641*a9643ea8Slogwang 
642*a9643ea8Slogwang     // ��ֹû��task������΢�߳�һֱ����ס
643*a9643ea8Slogwang     if (0 == req_list.size())
644*a9643ea8Slogwang     {
645*a9643ea8Slogwang         MTLOG_DEBUG("no task for execult");
646*a9643ea8Slogwang         return 0;
647*a9643ea8Slogwang     }
648*a9643ea8Slogwang 
649*a9643ea8Slogwang     // 1. �����̶߳���
650*a9643ea8Slogwang     for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it)
651*a9643ea8Slogwang     {
652*a9643ea8Slogwang         task = *it;
653*a9643ea8Slogwang         sub = MtFrame::CreateThread(mt_task_process, task, false);
654*a9643ea8Slogwang         if (NULL == sub)
655*a9643ea8Slogwang         {
656*a9643ea8Slogwang             MTLOG_ERROR("create sub thread failed");
657*a9643ea8Slogwang             goto EXIT_LABEL;
658*a9643ea8Slogwang         }
659*a9643ea8Slogwang 
660*a9643ea8Slogwang         sub->SetType(MicroThread::SUB_THREAD);
661*a9643ea8Slogwang         TAILQ_INSERT_TAIL(&list, sub, _sub_entry);
662*a9643ea8Slogwang     }
663*a9643ea8Slogwang 
664*a9643ea8Slogwang     // 2. ����ִ������
665*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
666*a9643ea8Slogwang     {
667*a9643ea8Slogwang         TAILQ_REMOVE(&list, sub, _sub_entry);
668*a9643ea8Slogwang         thread->AddSubThread(sub);
669*a9643ea8Slogwang         mtframe->InsertRunable(sub);
670*a9643ea8Slogwang     }
671*a9643ea8Slogwang 
672*a9643ea8Slogwang     // 3. �ȴ����߳�ִ�н���
673*a9643ea8Slogwang     thread->Wait();
674*a9643ea8Slogwang     rc = 0;
675*a9643ea8Slogwang 
676*a9643ea8Slogwang EXIT_LABEL:
677*a9643ea8Slogwang 
678*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
679*a9643ea8Slogwang     {
680*a9643ea8Slogwang         TAILQ_REMOVE(&list, sub, _sub_entry);
681*a9643ea8Slogwang         mtframe->FreeThread(sub);
682*a9643ea8Slogwang     }
683*a9643ea8Slogwang 
684*a9643ea8Slogwang     return rc;
685*a9643ea8Slogwang 
686*a9643ea8Slogwang }
687*a9643ea8Slogwang 
688*a9643ea8Slogwang /**
689*a9643ea8Slogwang  * @brief ���õ�ǰIMtMsg��˽�б���
690*a9643ea8Slogwang  * @info  ֻ����ָ�룬�ڴ���Ҫҵ�����
691*a9643ea8Slogwang  */
692*a9643ea8Slogwang void mt_set_msg_private(void *data)
693*a9643ea8Slogwang {
694*a9643ea8Slogwang     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
695*a9643ea8Slogwang     if (msg_thread != NULL)
696*a9643ea8Slogwang         msg_thread->SetPrivate(data);
697*a9643ea8Slogwang }
698*a9643ea8Slogwang 
699*a9643ea8Slogwang /**
700*a9643ea8Slogwang  * @brief  ��ȡ��ǰIMtMsg��˽�б���
701*a9643ea8Slogwang  * @return ˽�б���ָ��
702*a9643ea8Slogwang  */
703*a9643ea8Slogwang void* mt_get_msg_private()
704*a9643ea8Slogwang {
705*a9643ea8Slogwang     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
706*a9643ea8Slogwang     if (NULL == msg_thread)
707*a9643ea8Slogwang     {
708*a9643ea8Slogwang         return NULL;
709*a9643ea8Slogwang     }
710*a9643ea8Slogwang 
711*a9643ea8Slogwang     return msg_thread->GetPrivate();
712*a9643ea8Slogwang }
713*a9643ea8Slogwang 
714*a9643ea8Slogwang /**
715*a9643ea8Slogwang  * @brief  ΢�߳̿�ܳ�ʼ��
716*a9643ea8Slogwang  * @info   ҵ��ʹ��spp������΢�̣߳���Ҫ���øó�ʼ������
717*a9643ea8Slogwang  * @return false:��ʼ��ʧ��  true:��ʼ���ɹ�
718*a9643ea8Slogwang  */
719*a9643ea8Slogwang bool mt_init_frame(const char *conf, int argc, char * const argv[])
720*a9643ea8Slogwang {
721*a9643ea8Slogwang 	if (conf && argc) {
722*a9643ea8Slogwang 		ff_init(conf, argc, argv);
723*a9643ea8Slogwang 		ff_set_hook_flag();
724*a9643ea8Slogwang 	}
725*a9643ea8Slogwang 	memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab));
726*a9643ea8Slogwang     return MtFrame::Instance()->InitFrame();
727*a9643ea8Slogwang }
728*a9643ea8Slogwang 
729*a9643ea8Slogwang /**
730*a9643ea8Slogwang  * @brief ����΢�̶߳���ջ�ռ��С
731*a9643ea8Slogwang  * @info  �DZ������ã�Ĭ�ϴ�СΪ128K
732*a9643ea8Slogwang  */
733*a9643ea8Slogwang void mt_set_stack_size(unsigned int bytes)
734*a9643ea8Slogwang {
735*a9643ea8Slogwang     ThreadPool::SetDefaultStackSize(bytes);
736*a9643ea8Slogwang }
737*a9643ea8Slogwang 
738*a9643ea8Slogwang /**
739*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� recvfrom
740*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
741*a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
742*a9643ea8Slogwang  * @param len ������Ϣ����������
743*a9643ea8Slogwang  * @param from ��Դ��ַ��ָ��
744*a9643ea8Slogwang  * @param fromlen ��Դ��ַ�Ľṹ����
745*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
746*a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
747*a9643ea8Slogwang  */
748*a9643ea8Slogwang int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
749*a9643ea8Slogwang {
750*a9643ea8Slogwang     return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout);
751*a9643ea8Slogwang }
752*a9643ea8Slogwang 
753*a9643ea8Slogwang /**
754*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� sendto
755*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
756*a9643ea8Slogwang  * @param msg �����͵���Ϣָ��
757*a9643ea8Slogwang  * @param len �����͵���Ϣ����
758*a9643ea8Slogwang  * @param to Ŀ�ĵ�ַ��ָ��
759*a9643ea8Slogwang  * @param tolen Ŀ�ĵ�ַ�Ľṹ����
760*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
761*a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
762*a9643ea8Slogwang  */
763*a9643ea8Slogwang int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
764*a9643ea8Slogwang {
765*a9643ea8Slogwang     return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout);
766*a9643ea8Slogwang }
767*a9643ea8Slogwang 
768*a9643ea8Slogwang /**
769*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� connect
770*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
771*a9643ea8Slogwang  * @param addr ָ��server��Ŀ�ĵ�ַ
772*a9643ea8Slogwang  * @param addrlen ��ַ�ij���
773*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
774*a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
775*a9643ea8Slogwang  */
776*a9643ea8Slogwang int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
777*a9643ea8Slogwang {
778*a9643ea8Slogwang     return MtFrame::connect(fd, addr, addrlen, timeout);
779*a9643ea8Slogwang }
780*a9643ea8Slogwang 
781*a9643ea8Slogwang /**
782*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� accept
783*a9643ea8Slogwang  * @param fd �����׽���
784*a9643ea8Slogwang  * @param addr �ͻ��˵�ַ
785*a9643ea8Slogwang  * @param addrlen ��ַ�ij���
786*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
787*a9643ea8Slogwang  * @return >=0 accept��socket������, <0 ʧ��
788*a9643ea8Slogwang  */
789*a9643ea8Slogwang int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
790*a9643ea8Slogwang {
791*a9643ea8Slogwang     return MtFrame::accept(fd, addr, addrlen, timeout);
792*a9643ea8Slogwang }
793*a9643ea8Slogwang 
794*a9643ea8Slogwang 
795*a9643ea8Slogwang /**
796*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� read
797*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
798*a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
799*a9643ea8Slogwang  * @param nbyte ������Ϣ����������
800*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
801*a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
802*a9643ea8Slogwang  */
803*a9643ea8Slogwang ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout)
804*a9643ea8Slogwang {
805*a9643ea8Slogwang     return MtFrame::read(fd, buf, nbyte, timeout);
806*a9643ea8Slogwang }
807*a9643ea8Slogwang 
808*a9643ea8Slogwang /**
809*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� write
810*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
811*a9643ea8Slogwang  * @param buf �����͵���Ϣָ��
812*a9643ea8Slogwang  * @param nbyte �����͵���Ϣ����
813*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
814*a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
815*a9643ea8Slogwang  */
816*a9643ea8Slogwang ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout)
817*a9643ea8Slogwang {
818*a9643ea8Slogwang     return MtFrame::write(fd, buf, nbyte, timeout);
819*a9643ea8Slogwang }
820*a9643ea8Slogwang 
821*a9643ea8Slogwang /**
822*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� recv
823*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
824*a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
825*a9643ea8Slogwang  * @param len ������Ϣ����������
826*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
827*a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
828*a9643ea8Slogwang  */
829*a9643ea8Slogwang ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout)
830*a9643ea8Slogwang {
831*a9643ea8Slogwang     return MtFrame::recv(fd, buf, len, flags, timeout);
832*a9643ea8Slogwang }
833*a9643ea8Slogwang 
834*a9643ea8Slogwang /**
835*a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� send
836*a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
837*a9643ea8Slogwang  * @param buf �����͵���Ϣָ��
838*a9643ea8Slogwang  * @param nbyte �����͵���Ϣ����
839*a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
840*a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
841*a9643ea8Slogwang  */
842*a9643ea8Slogwang ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
843*a9643ea8Slogwang {
844*a9643ea8Slogwang     return MtFrame::send(fd, buf, nbyte, flags, timeout);
845*a9643ea8Slogwang }
846*a9643ea8Slogwang 
847*a9643ea8Slogwang /**
848*a9643ea8Slogwang  * @brief ΢�߳�����sleep�ӿ�, ��λms
849*a9643ea8Slogwang  */
850*a9643ea8Slogwang void mt_sleep(int ms)
851*a9643ea8Slogwang {
852*a9643ea8Slogwang     MtFrame::sleep(ms);
853*a9643ea8Slogwang }
854*a9643ea8Slogwang 
855*a9643ea8Slogwang /**
856*a9643ea8Slogwang  * @brief ΢�̻߳�ȡϵͳʱ�䣬��λms
857*a9643ea8Slogwang  */
858*a9643ea8Slogwang unsigned long long mt_time_ms(void)
859*a9643ea8Slogwang {
860*a9643ea8Slogwang     return MtFrame::Instance()->GetLastClock();
861*a9643ea8Slogwang }
862*a9643ea8Slogwang 
863*a9643ea8Slogwang /**
864*a9643ea8Slogwang  * @brief ΢�̵߳ȴ�epoll�¼��İ�������
865*a9643ea8Slogwang  */
866*a9643ea8Slogwang int mt_wait_events(int fd, int events, int timeout)
867*a9643ea8Slogwang {
868*a9643ea8Slogwang     return MtFrame::Instance()->WaitEvents(fd, events, timeout);
869*a9643ea8Slogwang }
870*a9643ea8Slogwang 
871*a9643ea8Slogwang void* mt_start_thread(void* entry, void* args)
872*a9643ea8Slogwang {
873*a9643ea8Slogwang     return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true);
874*a9643ea8Slogwang }
875*a9643ea8Slogwang 
876*a9643ea8Slogwang #define BUF_ALIGNMENT_SIZE 4096
877*a9643ea8Slogwang #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1))
878*a9643ea8Slogwang #define BUF_DEFAULT_SIZE 4096
879*a9643ea8Slogwang 
880*a9643ea8Slogwang class ScopedBuf
881*a9643ea8Slogwang {
882*a9643ea8Slogwang public:
883*a9643ea8Slogwang 	ScopedBuf(void*& buf_keeper, bool keep)
884*a9643ea8Slogwang 	:buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep)
885*a9643ea8Slogwang 	{}
886*a9643ea8Slogwang 
887*a9643ea8Slogwang 	int Alloc(int len)
888*a9643ea8Slogwang 	{
889*a9643ea8Slogwang 		if(len<len_)
890*a9643ea8Slogwang 		{
891*a9643ea8Slogwang 			return -1; // �ռ��СԽ��
892*a9643ea8Slogwang 		}
893*a9643ea8Slogwang 
894*a9643ea8Slogwang         if(len==0)
895*a9643ea8Slogwang         {
896*a9643ea8Slogwang             len = BUF_ALIGNMENT_SIZE;
897*a9643ea8Slogwang         }
898*a9643ea8Slogwang 		if(len_==len)
899*a9643ea8Slogwang 		{
900*a9643ea8Slogwang 			return 0;
901*a9643ea8Slogwang 		}
902*a9643ea8Slogwang 
903*a9643ea8Slogwang 		len_ = BUF_ALIGN_SIZE(len);
904*a9643ea8Slogwang 		if(len_==0)
905*a9643ea8Slogwang 		{
906*a9643ea8Slogwang 			len_ = BUF_DEFAULT_SIZE;
907*a9643ea8Slogwang 		}
908*a9643ea8Slogwang 		len_watermark_ = len_-BUF_ALIGNMENT_SIZE;
909*a9643ea8Slogwang 		char* tmp = (char*)realloc(buf_, len_);
910*a9643ea8Slogwang 		if(tmp==NULL)
911*a9643ea8Slogwang 		{
912*a9643ea8Slogwang 	        return -2; // �����ڴ�ʧ��
913*a9643ea8Slogwang 		}
914*a9643ea8Slogwang 
915*a9643ea8Slogwang 		buf_ = tmp;
916*a9643ea8Slogwang 		return 0;
917*a9643ea8Slogwang 	}
918*a9643ea8Slogwang 
919*a9643ea8Slogwang 	void reset()
920*a9643ea8Slogwang 	{
921*a9643ea8Slogwang 		if(keep_)
922*a9643ea8Slogwang 		{
923*a9643ea8Slogwang 			buf_keeper_ = (void*)buf_;
924*a9643ea8Slogwang 			buf_ = NULL;
925*a9643ea8Slogwang 		}
926*a9643ea8Slogwang 	}
927*a9643ea8Slogwang 
928*a9643ea8Slogwang 	~ScopedBuf()
929*a9643ea8Slogwang 	{
930*a9643ea8Slogwang 		if(buf_!=NULL)
931*a9643ea8Slogwang 		{
932*a9643ea8Slogwang 			free(buf_);
933*a9643ea8Slogwang 			buf_ = NULL;
934*a9643ea8Slogwang 		}
935*a9643ea8Slogwang 	}
936*a9643ea8Slogwang 
937*a9643ea8Slogwang public:
938*a9643ea8Slogwang 	void* &buf_keeper_;
939*a9643ea8Slogwang 	char* buf_;
940*a9643ea8Slogwang 	int   len_;
941*a9643ea8Slogwang     int   len_watermark_;
942*a9643ea8Slogwang 	bool  keep_;
943*a9643ea8Slogwang 
944*a9643ea8Slogwang };
945*a9643ea8Slogwang 
946*a9643ea8Slogwang /**
947*a9643ea8Slogwang  * @brief TCPѭ������, ֱ������OK��ʱ
948*a9643ea8Slogwang  *       [ע��] �����߲�Ҫ�����޸ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ]
949*a9643ea8Slogwang  */
950*a9643ea8Slogwang static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags,
951*a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
952*a9643ea8Slogwang {
953*a9643ea8Slogwang     int recv_len = 0;
954*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
955*a9643ea8Slogwang 
956*a9643ea8Slogwang 	int rc = 0;
957*a9643ea8Slogwang 	int ret = 0;
958*a9643ea8Slogwang     int pkg_len = 0;
959*a9643ea8Slogwang     bool msg_len_detected = false;
960*a9643ea8Slogwang 
961*a9643ea8Slogwang 	ScopedBuf sbuf(rcv_buf, keep_rcv_buf);
962*a9643ea8Slogwang 	ret = sbuf.Alloc(len);
963*a9643ea8Slogwang 
964*a9643ea8Slogwang 	if(ret!=0)
965*a9643ea8Slogwang 	{
966*a9643ea8Slogwang         MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
967*a9643ea8Slogwang 		return -11;
968*a9643ea8Slogwang 	}
969*a9643ea8Slogwang 
970*a9643ea8Slogwang     do
971*a9643ea8Slogwang     {
972*a9643ea8Slogwang         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
973*a9643ea8Slogwang         if (cost_time > (utime64_t)timeout)
974*a9643ea8Slogwang         {
975*a9643ea8Slogwang             errno = ETIME;
976*a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
977*a9643ea8Slogwang             return -3;
978*a9643ea8Slogwang         }
979*a9643ea8Slogwang 
980*a9643ea8Slogwang         rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time));
981*a9643ea8Slogwang         if (rc < 0)
982*a9643ea8Slogwang         {
983*a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
984*a9643ea8Slogwang             return -3;
985*a9643ea8Slogwang         }
986*a9643ea8Slogwang 		else if (rc == 0) // Զ�˹ر�
987*a9643ea8Slogwang         {
988*a9643ea8Slogwang 
989*a9643ea8Slogwang 			if(recv_len==0) // δ�ذ���ֱ�ӷ���Զ�˹ر�
990*a9643ea8Slogwang 			{
991*a9643ea8Slogwang 	            MTLOG_ERROR("tcp socket[%d] remote close", sock);
992*a9643ea8Slogwang 	            return -7;
993*a9643ea8Slogwang 			}
994*a9643ea8Slogwang 
995*a9643ea8Slogwang 	        /* ��鱨�������� */
996*a9643ea8Slogwang 		    rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected);
997*a9643ea8Slogwang 
998*a9643ea8Slogwang 			if(rc!=recv_len) // ҵ�����Զ�˹رգ�Ӧ�÷������������ȣ�����<=0,��ʾ����������
999*a9643ea8Slogwang 			{
1000*a9643ea8Slogwang 	            MTLOG_ERROR("tcp socket[%d] remote close", sock);
1001*a9643ea8Slogwang     	        return -7;
1002*a9643ea8Slogwang 			}
1003*a9643ea8Slogwang         	len = recv_len;
1004*a9643ea8Slogwang 			break;
1005*a9643ea8Slogwang         }
1006*a9643ea8Slogwang         recv_len += rc;
1007*a9643ea8Slogwang 
1008*a9643ea8Slogwang         /* ��鱨�������� */
1009*a9643ea8Slogwang         if((!msg_len_detected)||recv_len==pkg_len)
1010*a9643ea8Slogwang         {
1011*a9643ea8Slogwang             rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected);
1012*a9643ea8Slogwang             if(msg_len_detected)
1013*a9643ea8Slogwang             {
1014*a9643ea8Slogwang                 pkg_len = rc;
1015*a9643ea8Slogwang             }
1016*a9643ea8Slogwang         }
1017*a9643ea8Slogwang         else
1018*a9643ea8Slogwang         {
1019*a9643ea8Slogwang             rc = pkg_len;
1020*a9643ea8Slogwang         }
1021*a9643ea8Slogwang 
1022*a9643ea8Slogwang         if (rc < 0)
1023*a9643ea8Slogwang         {
1024*a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
1025*a9643ea8Slogwang             return -5;
1026*a9643ea8Slogwang         }
1027*a9643ea8Slogwang         else if (rc == 0) // ����δ������,�Ҳ�ȷ����С
1028*a9643ea8Slogwang         {
1029*a9643ea8Slogwang         	if(sbuf.len_ > recv_len)
1030*a9643ea8Slogwang         	{
1031*a9643ea8Slogwang         		continue;
1032*a9643ea8Slogwang         	}
1033*a9643ea8Slogwang             // û�ռ��ٽ�����, 2����С��չbuf
1034*a9643ea8Slogwang 			ret = sbuf.Alloc(sbuf.len_<<1);
1035*a9643ea8Slogwang 
1036*a9643ea8Slogwang 			if(ret!=0)
1037*a9643ea8Slogwang 			{
1038*a9643ea8Slogwang 		        MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
1039*a9643ea8Slogwang 				return -11;
1040*a9643ea8Slogwang 			}
1041*a9643ea8Slogwang         }
1042*a9643ea8Slogwang         else    // �ɹ����㱨�ij���
1043*a9643ea8Slogwang         {
1044*a9643ea8Slogwang             if (rc > recv_len) // ���Ļ�δ��ȫ
1045*a9643ea8Slogwang             {
1046*a9643ea8Slogwang             	if(sbuf.len_ > recv_len) // recv buf���пռ�.��δ����ˮλ
1047*a9643ea8Slogwang             	{
1048*a9643ea8Slogwang             		continue;
1049*a9643ea8Slogwang             	}
1050*a9643ea8Slogwang 
1051*a9643ea8Slogwang 	            // û�ռ��ٽ�����, ����ҵ��ָʾ��С��չ�ڴ�
1052*a9643ea8Slogwang 				ret = sbuf.Alloc(rc);
1053*a9643ea8Slogwang 
1054*a9643ea8Slogwang 				if(ret!=0)
1055*a9643ea8Slogwang 				{
1056*a9643ea8Slogwang 			        MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
1057*a9643ea8Slogwang 					return -11;
1058*a9643ea8Slogwang 				}
1059*a9643ea8Slogwang             }
1060*a9643ea8Slogwang             else if(rc==recv_len) // �հ�����
1061*a9643ea8Slogwang             {
1062*a9643ea8Slogwang                 len = rc;
1063*a9643ea8Slogwang                 break;
1064*a9643ea8Slogwang             }
1065*a9643ea8Slogwang             else // ΢�߳����ģʽ�£�������ճ��
1066*a9643ea8Slogwang             {
1067*a9643ea8Slogwang 	            MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock);
1068*a9643ea8Slogwang 	            return -5;
1069*a9643ea8Slogwang             }
1070*a9643ea8Slogwang         }
1071*a9643ea8Slogwang     } while (true);
1072*a9643ea8Slogwang 
1073*a9643ea8Slogwang 	sbuf.reset();
1074*a9643ea8Slogwang 
1075*a9643ea8Slogwang     return 0;
1076*a9643ea8Slogwang }
1077*a9643ea8Slogwang 
1078*a9643ea8Slogwang 
1079*a9643ea8Slogwang 
1080*a9643ea8Slogwang /**
1081*a9643ea8Slogwang  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
1082*a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1083*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
1084*a9643ea8Slogwang  * @param pkg -�������װ�İ���
1085*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
1086*a9643ea8Slogwang  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1087*a9643ea8Slogwang  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1088*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
1089*a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1090*a9643ea8Slogwang  * @param msg_ctx -�������ĵ������ı�����
1091*a9643ea8Slogwang  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1092*a9643ea8Slogwang  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1093*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1094*a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1095*a9643ea8Slogwang  */
1096*a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
1097*a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
1098*a9643ea8Slogwang {
1099*a9643ea8Slogwang 	if(!dst || !pkg || len<1)
1100*a9643ea8Slogwang 	{
1101*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
1102*a9643ea8Slogwang                     dst, pkg, len, check_func);
1103*a9643ea8Slogwang         return -10;
1104*a9643ea8Slogwang 	}
1105*a9643ea8Slogwang 
1106*a9643ea8Slogwang 
1107*a9643ea8Slogwang     int ret = 0, rc = 0;
1108*a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
1109*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
1110*a9643ea8Slogwang     utime64_t cost_time = 0;
1111*a9643ea8Slogwang     int time_left = timeout;
1112*a9643ea8Slogwang 
1113*a9643ea8Slogwang     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
1114*a9643ea8Slogwang     int sock = -1;
1115*a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
1116*a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
1117*a9643ea8Slogwang     {
1118*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
1119*a9643ea8Slogwang         ret = -1;
1120*a9643ea8Slogwang         goto EXIT_LABEL;
1121*a9643ea8Slogwang     }
1122*a9643ea8Slogwang 
1123*a9643ea8Slogwang     // 2. ���Լ����½�����
1124*a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
1125*a9643ea8Slogwang     if (rc < 0)
1126*a9643ea8Slogwang     {
1127*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
1128*a9643ea8Slogwang         ret = -4;
1129*a9643ea8Slogwang         goto EXIT_LABEL;
1130*a9643ea8Slogwang     }
1131*a9643ea8Slogwang 
1132*a9643ea8Slogwang     // 3. �������ݴ���
1133*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1134*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1135*a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
1136*a9643ea8Slogwang     if (rc < 0)
1137*a9643ea8Slogwang     {
1138*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
1139*a9643ea8Slogwang         ret = -2;
1140*a9643ea8Slogwang         goto EXIT_LABEL;
1141*a9643ea8Slogwang     }
1142*a9643ea8Slogwang 
1143*a9643ea8Slogwang     // 4. �������ݴ���
1144*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1145*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1146*a9643ea8Slogwang 
1147*a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
1148*a9643ea8Slogwang     if (rc < 0)
1149*a9643ea8Slogwang     {
1150*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
1151*a9643ea8Slogwang         ret = rc;
1152*a9643ea8Slogwang         goto EXIT_LABEL;
1153*a9643ea8Slogwang     }
1154*a9643ea8Slogwang 
1155*a9643ea8Slogwang     ret = 0;
1156*a9643ea8Slogwang 
1157*a9643ea8Slogwang EXIT_LABEL:
1158*a9643ea8Slogwang 
1159*a9643ea8Slogwang     // ʧ����ǿ���ͷ�����, ����ʱ����
1160*a9643ea8Slogwang     if (conn != NULL)
1161*a9643ea8Slogwang     {
1162*a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
1163*a9643ea8Slogwang     }
1164*a9643ea8Slogwang 
1165*a9643ea8Slogwang     return ret;
1166*a9643ea8Slogwang }
1167*a9643ea8Slogwang 
1168*a9643ea8Slogwang 
1169*a9643ea8Slogwang /**
1170*a9643ea8Slogwang  * @brief TCP�������շ�����
1171*a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1172*a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
1173*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
1174*a9643ea8Slogwang  * @param pkg -�������װ�İ���
1175*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
1176*a9643ea8Slogwang  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1177*a9643ea8Slogwang  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1178*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
1179*a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1180*a9643ea8Slogwang  * @param msg_ctx -�������ĵ������ı�����
1181*a9643ea8Slogwang  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1182*a9643ea8Slogwang  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1183*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1184*a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1185*a9643ea8Slogwang  */
1186*a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
1187*a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
1188*a9643ea8Slogwang {
1189*a9643ea8Slogwang     int ret = 0, rc = 0;
1190*a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
1191*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
1192*a9643ea8Slogwang     utime64_t cost_time = 0;
1193*a9643ea8Slogwang     int time_left = timeout;
1194*a9643ea8Slogwang 
1195*a9643ea8Slogwang     // 1. �������
1196*a9643ea8Slogwang 	if(!dst || !pkg || len<1)
1197*a9643ea8Slogwang 	{
1198*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
1199*a9643ea8Slogwang                     dst, pkg, len, check_func);
1200*a9643ea8Slogwang         return -10;
1201*a9643ea8Slogwang 	}
1202*a9643ea8Slogwang 
1203*a9643ea8Slogwang     // 2. ����TCP socket
1204*a9643ea8Slogwang     int sock;
1205*a9643ea8Slogwang     sock = mt_tcp_create_sock();
1206*a9643ea8Slogwang     if (sock < 0)
1207*a9643ea8Slogwang     {
1208*a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
1209*a9643ea8Slogwang         return -1;
1210*a9643ea8Slogwang     }
1211*a9643ea8Slogwang 
1212*a9643ea8Slogwang     // 3. ���Լ����½�����
1213*a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
1214*a9643ea8Slogwang     if (rc < 0)
1215*a9643ea8Slogwang     {
1216*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
1217*a9643ea8Slogwang         ret = -4;
1218*a9643ea8Slogwang         goto EXIT_LABEL;
1219*a9643ea8Slogwang     }
1220*a9643ea8Slogwang 
1221*a9643ea8Slogwang     // 4. �������ݴ���
1222*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1223*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1224*a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
1225*a9643ea8Slogwang     if (rc < 0)
1226*a9643ea8Slogwang     {
1227*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
1228*a9643ea8Slogwang         ret = -2;
1229*a9643ea8Slogwang         goto EXIT_LABEL;
1230*a9643ea8Slogwang     }
1231*a9643ea8Slogwang 
1232*a9643ea8Slogwang     // 5. �������ݴ���
1233*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1234*a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1235*a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
1236*a9643ea8Slogwang 
1237*a9643ea8Slogwang     if (rc < 0)
1238*a9643ea8Slogwang     {
1239*a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
1240*a9643ea8Slogwang         ret = rc;
1241*a9643ea8Slogwang         goto EXIT_LABEL;
1242*a9643ea8Slogwang     }
1243*a9643ea8Slogwang 
1244*a9643ea8Slogwang     ret = 0;
1245*a9643ea8Slogwang 
1246*a9643ea8Slogwang EXIT_LABEL:
1247*a9643ea8Slogwang     if (sock >= 0)
1248*a9643ea8Slogwang         ::close(sock);
1249*a9643ea8Slogwang 
1250*a9643ea8Slogwang     return ret;
1251*a9643ea8Slogwang }
1252*a9643ea8Slogwang 
1253*a9643ea8Slogwang 
1254*a9643ea8Slogwang 
1255*a9643ea8Slogwang /**
1256*a9643ea8Slogwang  * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶�����
1257*a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1258*a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
1259*a9643ea8Slogwang  * @param pkg -�������װ�İ���
1260*a9643ea8Slogwang  * @param len -�������װ�İ��峤��
1261*a9643ea8Slogwang  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1262*a9643ea8Slogwang  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1263*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
1264*a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1265*a9643ea8Slogwang  * @param msg_ctx -�������ĵ������ı�����
1266*a9643ea8Slogwang  *
1267*a9643ea8Slogwang  * @param type - ��������
1268*a9643ea8Slogwang  *               MT_TCP_SHORT: һ��һ�������ӣ�
1269*a9643ea8Slogwang  *               MT_TCP_LONG : һ��һ�ճ����ӣ�
1270*a9643ea8Slogwang  *               MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ�
1271*a9643ea8Slogwang  *               MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ�
1272*a9643ea8Slogwang  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1273*a9643ea8Slogwang  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1274*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1275*a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1276*a9643ea8Slogwang  */
1277*a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size,
1278*a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx,
1279*a9643ea8Slogwang                      MT_TCP_CONN_TYPE type, bool keep_rcv_buf)
1280*a9643ea8Slogwang {
1281*a9643ea8Slogwang 	if(!dst || !pkg || len<1)
1282*a9643ea8Slogwang 	{
1283*a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1284*a9643ea8Slogwang                     dst, pkg, len, check_func, msg_ctx, type);
1285*a9643ea8Slogwang         return -10;
1286*a9643ea8Slogwang 	}
1287*a9643ea8Slogwang 
1288*a9643ea8Slogwang     switch (type)
1289*a9643ea8Slogwang     {
1290*a9643ea8Slogwang         // TCP�����ӵ�������
1291*a9643ea8Slogwang         case MT_TCP_LONG:
1292*a9643ea8Slogwang         {
1293*a9643ea8Slogwang             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1294*a9643ea8Slogwang         }
1295*a9643ea8Slogwang 
1296*a9643ea8Slogwang         // TCP������ֻ������
1297*a9643ea8Slogwang         case MT_TCP_LONG_SNDONLY:
1298*a9643ea8Slogwang         {
1299*a9643ea8Slogwang             return mt_tcpsend(dst, pkg, len, timeout);
1300*a9643ea8Slogwang         }
1301*a9643ea8Slogwang 
1302*a9643ea8Slogwang         // TCP�����ӵ�������
1303*a9643ea8Slogwang         case MT_TCP_SHORT:
1304*a9643ea8Slogwang         {
1305*a9643ea8Slogwang             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1306*a9643ea8Slogwang         }
1307*a9643ea8Slogwang 
1308*a9643ea8Slogwang         // TCP������ֻ������
1309*a9643ea8Slogwang         case MT_TCP_SHORT_SNDONLY:
1310*a9643ea8Slogwang         {
1311*a9643ea8Slogwang             return mt_tcpsend_short(dst, pkg, len, timeout);
1312*a9643ea8Slogwang         }
1313*a9643ea8Slogwang 
1314*a9643ea8Slogwang         default:
1315*a9643ea8Slogwang         {
1316*a9643ea8Slogwang 	        MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1317*a9643ea8Slogwang                     dst, pkg, len, check_func, msg_ctx, type);
1318*a9643ea8Slogwang             return -10;
1319*a9643ea8Slogwang         }
1320*a9643ea8Slogwang     }
1321*a9643ea8Slogwang 
1322*a9643ea8Slogwang     return 0;
1323*a9643ea8Slogwang }
1324*a9643ea8Slogwang 
1325*a9643ea8Slogwang 
1326*a9643ea8Slogwang 
1327*a9643ea8Slogwang }
1328*a9643ea8Slogwang 
1329*a9643ea8Slogwang 
1330