xref: /f-stack/app/micro_thread/mt_api.cpp (revision a02c88d6)
1a9643ea8Slogwang 
2a9643ea8Slogwang /**
3a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang  *
5a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang  *
7a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang  * obtain a copy of the License at
10a9643ea8Slogwang  *
11a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang  *
13a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang  * and limitations under the License.
17a9643ea8Slogwang  */
18a9643ea8Slogwang 
19a9643ea8Slogwang 
/**
 *  @filename mt_sys_call.cpp
 *  @info  Micro-thread wrappers around system APIs: callers invoke the micro-thread API synchronously while the work is scheduled asynchronously
 */
24a9643ea8Slogwang 
25a9643ea8Slogwang #include "kqueue_proxy.h"
26a9643ea8Slogwang #include "micro_thread.h"
27a9643ea8Slogwang #include "mt_connection.h"
28a9643ea8Slogwang #include "mt_api.h"
29a9643ea8Slogwang #include "ff_api.h"
30a9643ea8Slogwang #include "mt_sys_hook.h"
31a9643ea8Slogwang 
32a9643ea8Slogwang namespace NS_MICRO_THREAD {
33a9643ea8Slogwang 
34a9643ea8Slogwang /**
35a9643ea8Slogwang  * @brief ��������˿ڵ�socket�շ��ӿ�, ��socket������������, ҵ������֤������
36a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
37a9643ea8Slogwang  * @param pkg -�������װ�İ���
38a9643ea8Slogwang  * @param len -�������װ�İ��峤��
39a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff
40a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
41a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
42a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, �ɴ�ӡerrno, -10 ������Ч
43a9643ea8Slogwang  */
44a9643ea8Slogwang int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout)
45a9643ea8Slogwang {
46a9643ea8Slogwang     int ret = 0;
47a9643ea8Slogwang     int rc  = 0;
48a9643ea8Slogwang     int flags = 1;
49a9643ea8Slogwang     struct sockaddr_in from_addr = {0};
50a9643ea8Slogwang     int addr_len = sizeof(from_addr);
51a9643ea8Slogwang 
52a9643ea8Slogwang     if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf)
53a9643ea8Slogwang     {
54a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]",
55a9643ea8Slogwang             dst, pkg, rcv_buf, len, buf_size);
56a9643ea8Slogwang         return -10;
57a9643ea8Slogwang     }
58a9643ea8Slogwang 
59a9643ea8Slogwang     int sock = socket(PF_INET, SOCK_DGRAM, 0);
60a9643ea8Slogwang     if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0))
61a9643ea8Slogwang     {
62a9643ea8Slogwang         MT_ATTR_API(320842, 1); // socketʧ��
63a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno);
64a9643ea8Slogwang         ret = -1;
65a9643ea8Slogwang         goto EXIT_LABEL;
66a9643ea8Slogwang     }
67a9643ea8Slogwang 
68a9643ea8Slogwang     rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout);
69a9643ea8Slogwang     if (rc < 0)
70a9643ea8Slogwang     {
71a9643ea8Slogwang         MT_ATTR_API(320844, 1); // ����ʧ��
72a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno);
73a9643ea8Slogwang         ret = -2;
74a9643ea8Slogwang         goto EXIT_LABEL;
75a9643ea8Slogwang     }
76a9643ea8Slogwang 
77a9643ea8Slogwang     rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout);
78a9643ea8Slogwang     if (rc < 0)
79a9643ea8Slogwang     {
80a9643ea8Slogwang         MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ�
81a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno);
82a9643ea8Slogwang         ret = -3;
83a9643ea8Slogwang         goto EXIT_LABEL;
84a9643ea8Slogwang     }
85a9643ea8Slogwang     buf_size = rc;
86a9643ea8Slogwang 
87a9643ea8Slogwang EXIT_LABEL:
88a9643ea8Slogwang 
89a9643ea8Slogwang     if (sock > 0)
90a9643ea8Slogwang     {
91a9643ea8Slogwang         close(sock);
92a9643ea8Slogwang         sock = -1;
93a9643ea8Slogwang     }
94a9643ea8Slogwang 
95a9643ea8Slogwang     return ret;
96a9643ea8Slogwang }
97a9643ea8Slogwang 
98a9643ea8Slogwang /**
99a9643ea8Slogwang  * @brief  ����TCP�׽��֣�������Ϊ������
100a9643ea8Slogwang  * @return >=0 �ɹ�, <0 ʧ��
101a9643ea8Slogwang  */
102a9643ea8Slogwang int mt_tcp_create_sock(void)
103a9643ea8Slogwang {
104a9643ea8Slogwang     int fd;
105a9643ea8Slogwang     int flag;
106a9643ea8Slogwang 
107a9643ea8Slogwang     // ����socket
108a9643ea8Slogwang     fd = ::socket(AF_INET, SOCK_STREAM, 0);
109a9643ea8Slogwang     if (fd < 0)
110a9643ea8Slogwang     {
111a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, error: %m");
112a9643ea8Slogwang         return -1;
113a9643ea8Slogwang     }
114a9643ea8Slogwang 
115a9643ea8Slogwang     // ����socket������
116a9643ea8Slogwang     flag = fcntl(fd, F_GETFL, 0);
117a9643ea8Slogwang     if (flag == -1)
118a9643ea8Slogwang     {
119a9643ea8Slogwang         ::close(fd);
120a9643ea8Slogwang         MTLOG_ERROR("get fd flags failed, error: %m");
121a9643ea8Slogwang         return -2;
122a9643ea8Slogwang     }
123a9643ea8Slogwang 
124a9643ea8Slogwang     if (flag & O_NONBLOCK)
125a9643ea8Slogwang         return fd;
126a9643ea8Slogwang 
127a9643ea8Slogwang     if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1)
128a9643ea8Slogwang     {
129a9643ea8Slogwang         ::close(fd);
130a9643ea8Slogwang         MTLOG_ERROR("set fd flags failed, error: %m");
131a9643ea8Slogwang         return -3;
132a9643ea8Slogwang     }
133a9643ea8Slogwang 
134a9643ea8Slogwang     return fd;
135a9643ea8Slogwang }
136a9643ea8Slogwang 
137a9643ea8Slogwang /**
138a9643ea8Slogwang  * @brief TCP��ȡ������֪ͨ������socket
139a9643ea8Slogwang  */
140a9643ea8Slogwang static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock)
141a9643ea8Slogwang {
142a9643ea8Slogwang     // 1. ��ȡ�߳�֪ͨע�����
143a9643ea8Slogwang     KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0);
144a9643ea8Slogwang     if (NULL == ntfy_obj)
145a9643ea8Slogwang     {
146a9643ea8Slogwang         MTLOG_ERROR("get notify failed, logit");
147a9643ea8Slogwang         return NULL;
148a9643ea8Slogwang     }
149a9643ea8Slogwang 
150a9643ea8Slogwang     // 2. ��ȡ���Ӷ���, ����֪ͨ��Ϣ
151a9643ea8Slogwang     TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst));
152a9643ea8Slogwang     if (NULL == conn)
153a9643ea8Slogwang     {
154a9643ea8Slogwang         MTLOG_ERROR("get connection failed, dst[%p]", dst);
155a9643ea8Slogwang         NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj);
156a9643ea8Slogwang         return NULL;
157a9643ea8Slogwang     }
158a9643ea8Slogwang     conn->SetNtfyObj(ntfy_obj);
159a9643ea8Slogwang 
160a9643ea8Slogwang     // 3. ��������socket���
161a9643ea8Slogwang     int osfd = conn->CreateSocket();
162a9643ea8Slogwang     if (osfd < 0)
163a9643ea8Slogwang     {
164a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, true);
165a9643ea8Slogwang         MTLOG_ERROR("create socket failed, ret[%d]", osfd);
166a9643ea8Slogwang         return NULL;
167a9643ea8Slogwang     }
168a9643ea8Slogwang 
169a9643ea8Slogwang     // 4. �ɹ���������
170a9643ea8Slogwang     sock = osfd;
171a9643ea8Slogwang     return conn;
172a9643ea8Slogwang }
173a9643ea8Slogwang 
174a9643ea8Slogwang /**
175a9643ea8Slogwang  * @brief TCPѭ������, ֱ������OK��ʱ
176a9643ea8Slogwang  *       [ע��] �����߲�Ҫ�����޸ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ]
177a9643ea8Slogwang  */
178a9643ea8Slogwang static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func)
179a9643ea8Slogwang {
180a9643ea8Slogwang     int recv_len = 0;
181a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
182a9643ea8Slogwang     do
183a9643ea8Slogwang     {
184a9643ea8Slogwang         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
185a9643ea8Slogwang         if (cost_time > (utime64_t)timeout)
186a9643ea8Slogwang         {
187a9643ea8Slogwang             errno = ETIME;
188a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
189a9643ea8Slogwang             return -3;
190a9643ea8Slogwang         }
191a9643ea8Slogwang 
192a9643ea8Slogwang         int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time));
193a9643ea8Slogwang         if (rc < 0)
194a9643ea8Slogwang         {
195a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
196a9643ea8Slogwang             return -3;
197a9643ea8Slogwang         }
198a9643ea8Slogwang 		else if (rc == 0)
199a9643ea8Slogwang         {
200a9643ea8Slogwang         	len = recv_len;
201a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] remote close", sock);
202a9643ea8Slogwang             return -7;
203a9643ea8Slogwang         }
204a9643ea8Slogwang         recv_len += rc;
205a9643ea8Slogwang 
206a9643ea8Slogwang         /* ��鱨�������� */
207a9643ea8Slogwang         rc = func(rcv_buf, recv_len);
208a9643ea8Slogwang         if (rc < 0)
209a9643ea8Slogwang         {
210a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
211a9643ea8Slogwang             return -5;
212a9643ea8Slogwang         }
213a9643ea8Slogwang         else if (rc == 0) // ����δ������
214a9643ea8Slogwang         {
215a9643ea8Slogwang             if (len == recv_len) // û�ռ��ٽ�����, ����
216a9643ea8Slogwang             {
217a9643ea8Slogwang                 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock);
218a9643ea8Slogwang                 return -6;
219a9643ea8Slogwang             }
220a9643ea8Slogwang             continue;
221a9643ea8Slogwang         }
222a9643ea8Slogwang         else    // �ɹ����㱨�ij���
223a9643ea8Slogwang         {
224a9643ea8Slogwang             if (rc > recv_len) // ���Ļ�δ��ȫ
225a9643ea8Slogwang             {
226a9643ea8Slogwang                 continue;
227a9643ea8Slogwang             }
228a9643ea8Slogwang             else
229a9643ea8Slogwang             {
230a9643ea8Slogwang                 len = rc;
231a9643ea8Slogwang                 break;
232a9643ea8Slogwang             }
233a9643ea8Slogwang         }
234a9643ea8Slogwang     } while (true);
235a9643ea8Slogwang 
236a9643ea8Slogwang     return 0;
237a9643ea8Slogwang }
238a9643ea8Slogwang 
239a9643ea8Slogwang /**
240a9643ea8Slogwang  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
241a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
242a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
243a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
244a9643ea8Slogwang  * @param pkg -�������װ�İ���
245a9643ea8Slogwang  * @param len -�������װ�İ��峤��
246a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff
247a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
248a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
249a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
250a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
251a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч
252a9643ea8Slogwang  */
253a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
254a9643ea8Slogwang {
255a9643ea8Slogwang     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
256a9643ea8Slogwang     {
257a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
258a9643ea8Slogwang             dst, pkg, rcv_buf, func, len, buf_size);
259a9643ea8Slogwang         return -10;
260a9643ea8Slogwang     }
261a9643ea8Slogwang 
262a9643ea8Slogwang     int ret = 0, rc = 0;
263a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
264a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
265a9643ea8Slogwang     utime64_t cost_time = 0;
266a9643ea8Slogwang     int time_left = timeout;
267a9643ea8Slogwang 
268a9643ea8Slogwang     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
269a9643ea8Slogwang     int sock = -1;
270a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
271a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
272a9643ea8Slogwang     {
273a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
274a9643ea8Slogwang         ret = -1;
275a9643ea8Slogwang         goto EXIT_LABEL;
276a9643ea8Slogwang     }
277a9643ea8Slogwang 
278a9643ea8Slogwang     // 2. ���Լ����½�����
279a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
280a9643ea8Slogwang     if (rc < 0)
281a9643ea8Slogwang     {
282a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
283a9643ea8Slogwang         ret = -4;
284a9643ea8Slogwang         goto EXIT_LABEL;
285a9643ea8Slogwang     }
286a9643ea8Slogwang 
287a9643ea8Slogwang     // 3. �������ݴ���
288a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
289a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
290a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
291a9643ea8Slogwang     if (rc < 0)
292a9643ea8Slogwang     {
293a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
294a9643ea8Slogwang         ret = -2;
295a9643ea8Slogwang         goto EXIT_LABEL;
296a9643ea8Slogwang     }
297a9643ea8Slogwang 
298a9643ea8Slogwang     // 4. �������ݴ���
299a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
300a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
301a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
302a9643ea8Slogwang     if (rc < 0)
303a9643ea8Slogwang     {
304a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
305a9643ea8Slogwang         ret = rc;
306a9643ea8Slogwang         goto EXIT_LABEL;
307a9643ea8Slogwang     }
308a9643ea8Slogwang 
309a9643ea8Slogwang     ret = 0;
310a9643ea8Slogwang 
311a9643ea8Slogwang EXIT_LABEL:
312a9643ea8Slogwang 
313a9643ea8Slogwang     // ʧ����ǿ���ͷ�����, ����ʱ����
314a9643ea8Slogwang     if (conn != NULL)
315a9643ea8Slogwang     {
316a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
317a9643ea8Slogwang     }
318a9643ea8Slogwang 
319a9643ea8Slogwang     return ret;
320a9643ea8Slogwang }
321a9643ea8Slogwang 
322a9643ea8Slogwang /**
323a9643ea8Slogwang  * @brief TCP�������շ�����
324a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
325a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
326a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
327a9643ea8Slogwang  * @param pkg -�������װ�İ���
328a9643ea8Slogwang  * @param len -�������װ�İ��峤��
329a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff
330a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ�������
331a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
332a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
333a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
334a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч
335a9643ea8Slogwang  */
336a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
337a9643ea8Slogwang {
338a9643ea8Slogwang     int ret = 0, rc = 0;
339a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
340a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
341a9643ea8Slogwang     utime64_t cost_time = 0;
342a9643ea8Slogwang     int time_left = timeout;
343a9643ea8Slogwang 
344a9643ea8Slogwang     // 1. �������
345a9643ea8Slogwang     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
346a9643ea8Slogwang     {
347a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
348a9643ea8Slogwang             dst, pkg, rcv_buf, func, len, buf_size);
349a9643ea8Slogwang         return -10;
350a9643ea8Slogwang     }
351a9643ea8Slogwang 
352a9643ea8Slogwang     // 2. ����TCP socket
353a9643ea8Slogwang     int sock;
354a9643ea8Slogwang     sock = mt_tcp_create_sock();
355a9643ea8Slogwang     if (sock < 0)
356a9643ea8Slogwang     {
357a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
358a9643ea8Slogwang         return -1;
359a9643ea8Slogwang     }
360a9643ea8Slogwang 
361a9643ea8Slogwang     // 3. ���Լ����½�����
362a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
363a9643ea8Slogwang     if (rc < 0)
364a9643ea8Slogwang     {
365a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
366a9643ea8Slogwang         ret = -4;
367a9643ea8Slogwang         goto EXIT_LABEL;
368a9643ea8Slogwang     }
369a9643ea8Slogwang 
370a9643ea8Slogwang     // 4. �������ݴ���
371a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
372a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
373a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
374a9643ea8Slogwang     if (rc < 0)
375a9643ea8Slogwang     {
376a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
377a9643ea8Slogwang         ret = -2;
378a9643ea8Slogwang         goto EXIT_LABEL;
379a9643ea8Slogwang     }
380a9643ea8Slogwang 
381a9643ea8Slogwang     // 5. �������ݴ���
382a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
383a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
384a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
385a9643ea8Slogwang     if (rc < 0)
386a9643ea8Slogwang     {
387a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
388a9643ea8Slogwang         ret = rc;
389a9643ea8Slogwang         goto EXIT_LABEL;
390a9643ea8Slogwang     }
391a9643ea8Slogwang 
392a9643ea8Slogwang     ret = 0;
393a9643ea8Slogwang 
394a9643ea8Slogwang EXIT_LABEL:
395a9643ea8Slogwang     if (sock >= 0)
396a9643ea8Slogwang         ::close(sock);
397a9643ea8Slogwang 
398a9643ea8Slogwang     return ret;
399a9643ea8Slogwang }
400a9643ea8Slogwang 
401a9643ea8Slogwang 
402a9643ea8Slogwang /**
403a9643ea8Slogwang  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
404a9643ea8Slogwang  *        [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ]
405a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
406a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
407a9643ea8Slogwang  * @param pkg -�������װ�İ���
408a9643ea8Slogwang  * @param len -�������װ�İ��峤��
409a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
410a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч
411a9643ea8Slogwang  */
412a9643ea8Slogwang int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout)
413a9643ea8Slogwang {
414a9643ea8Slogwang     if (!dst || !pkg || len<1)
415a9643ea8Slogwang     {
416a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
417a9643ea8Slogwang         return -10;
418a9643ea8Slogwang     }
419a9643ea8Slogwang 
420a9643ea8Slogwang     int ret = 0, rc = 0;
421a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
422a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
423a9643ea8Slogwang     utime64_t cost_time = 0;
424a9643ea8Slogwang     int time_left = timeout;
425a9643ea8Slogwang 
426a9643ea8Slogwang     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
427a9643ea8Slogwang     int sock = -1;
428a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
429a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
430a9643ea8Slogwang     {
431a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
432a9643ea8Slogwang         ret = -1;
433a9643ea8Slogwang         goto EXIT_LABEL;
434a9643ea8Slogwang     }
435a9643ea8Slogwang 
436a9643ea8Slogwang     // 2. ���Լ����½�����
437a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
438a9643ea8Slogwang     if (rc < 0)
439a9643ea8Slogwang     {
440a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
441a9643ea8Slogwang         ret = -4;
442a9643ea8Slogwang         goto EXIT_LABEL;
443a9643ea8Slogwang     }
444a9643ea8Slogwang 
445a9643ea8Slogwang     // 3. �������ݴ���
446a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
447a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
448a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
449a9643ea8Slogwang     if (rc < 0)
450a9643ea8Slogwang     {
451a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
452a9643ea8Slogwang         ret = -2;
453a9643ea8Slogwang         goto EXIT_LABEL;
454a9643ea8Slogwang     }
455a9643ea8Slogwang 
456a9643ea8Slogwang     ret = 0;
457a9643ea8Slogwang 
458a9643ea8Slogwang EXIT_LABEL:
459a9643ea8Slogwang 
460a9643ea8Slogwang     // ʧ����ǿ���ͷ�����, ����ʱ����
461a9643ea8Slogwang     if (conn != NULL)
462a9643ea8Slogwang     {
463a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
464a9643ea8Slogwang     }
465a9643ea8Slogwang 
466a9643ea8Slogwang     return ret;
467a9643ea8Slogwang }
468a9643ea8Slogwang 
469a9643ea8Slogwang /**
470a9643ea8Slogwang  * @brief TCP������ֻ�����սӿ�
471a9643ea8Slogwang  *        [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ]
472a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
473a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
474a9643ea8Slogwang  * @param pkg -�������װ�İ���
475a9643ea8Slogwang  * @param len -�������װ�İ��峤��
476a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
477a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч
478a9643ea8Slogwang  */
479a9643ea8Slogwang int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout)
480a9643ea8Slogwang {
481a9643ea8Slogwang     // 1. ��μ��
482a9643ea8Slogwang     if (!dst || !pkg || len<1)
483a9643ea8Slogwang     {
484a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
485a9643ea8Slogwang         return -10;
486a9643ea8Slogwang     }
487a9643ea8Slogwang 
488a9643ea8Slogwang     int ret = 0, rc = 0;
489a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
490a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
491a9643ea8Slogwang     utime64_t cost_time = 0;
492a9643ea8Slogwang     int time_left = timeout;
493a9643ea8Slogwang 
494a9643ea8Slogwang     // 2. ����TCP socket
495a9643ea8Slogwang     int sock = -1;
496a9643ea8Slogwang     sock = mt_tcp_create_sock();
497a9643ea8Slogwang     if (sock < 0)
498a9643ea8Slogwang     {
499a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
500a9643ea8Slogwang         ret = -1;
501a9643ea8Slogwang         goto EXIT_LABEL;
502a9643ea8Slogwang     }
503a9643ea8Slogwang 
504a9643ea8Slogwang     // 2. ���Լ����½�����
505a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
506a9643ea8Slogwang     if (rc < 0)
507a9643ea8Slogwang     {
508a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
509a9643ea8Slogwang         ret = -4;
510a9643ea8Slogwang         goto EXIT_LABEL;
511a9643ea8Slogwang     }
512a9643ea8Slogwang 
513a9643ea8Slogwang     // 3. �������ݴ���
514a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
515a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
516a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
517a9643ea8Slogwang     if (rc < 0)
518a9643ea8Slogwang     {
519a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
520a9643ea8Slogwang         ret = -2;
521a9643ea8Slogwang         goto EXIT_LABEL;
522a9643ea8Slogwang     }
523a9643ea8Slogwang 
524a9643ea8Slogwang     ret = 0;
525a9643ea8Slogwang 
526a9643ea8Slogwang EXIT_LABEL:
527a9643ea8Slogwang 
528a9643ea8Slogwang     if (sock >= 0)
529a9643ea8Slogwang         ::close(sock);
530a9643ea8Slogwang 
531a9643ea8Slogwang     return ret;
532a9643ea8Slogwang }
533a9643ea8Slogwang 
534a9643ea8Slogwang 
535a9643ea8Slogwang /**
536a9643ea8Slogwang  * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶�����
537a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
538a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
539a9643ea8Slogwang  * @param pkg -�������װ�İ���
540a9643ea8Slogwang  * @param len -�������װ�İ��峤��
541a9643ea8Slogwang  * @param rcv_buf -����Ӧ�����buff��ֻ�����տ�������ΪNULL
542a9643ea8Slogwang  * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, �޸�ΪӦ������ȣ�ֻ�����գ�����ΪNULL
543a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
544a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
545a9643ea8Slogwang  * @param type - ��������
546a9643ea8Slogwang  *               MT_TCP_SHORT: һ��һ�������ӣ�
547a9643ea8Slogwang  *               MT_TCP_LONG : һ��һ�ճ����ӣ�
548a9643ea8Slogwang  *               MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ�
549a9643ea8Slogwang  *               MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ�
550a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
551a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
552a9643ea8Slogwang  */
553a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type)
554a9643ea8Slogwang {
555a9643ea8Slogwang 	if(!dst || !pkg || len<1)
556a9643ea8Slogwang 	{
557a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
558a9643ea8Slogwang                     dst, pkg, rcv_buf, func, len, buf_size,type);
559a9643ea8Slogwang         return -10;
560a9643ea8Slogwang 	}
561a9643ea8Slogwang 
562a9643ea8Slogwang     switch (type)
563a9643ea8Slogwang     {
564a9643ea8Slogwang         // TCP�����ӵ�������
565a9643ea8Slogwang         case MT_TCP_LONG:
566a9643ea8Slogwang         {
567a9643ea8Slogwang             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
568a9643ea8Slogwang         }
569a9643ea8Slogwang 
570a9643ea8Slogwang         // TCP������ֻ������
571a9643ea8Slogwang         case MT_TCP_LONG_SNDONLY:
572a9643ea8Slogwang         {
573a9643ea8Slogwang             return mt_tcpsend(dst, pkg, len, timeout);
574a9643ea8Slogwang         }
575a9643ea8Slogwang 
576a9643ea8Slogwang         // TCP�����ӵ�������
577a9643ea8Slogwang         case MT_TCP_SHORT:
578a9643ea8Slogwang         {
579a9643ea8Slogwang             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
580a9643ea8Slogwang         }
581a9643ea8Slogwang 
582a9643ea8Slogwang         // TCP������ֻ������
583a9643ea8Slogwang         case MT_TCP_SHORT_SNDONLY:
584a9643ea8Slogwang         {
585a9643ea8Slogwang             return mt_tcpsend_short(dst, pkg, len, timeout);
586a9643ea8Slogwang         }
587a9643ea8Slogwang 
588a9643ea8Slogwang         default:
589a9643ea8Slogwang         {
590a9643ea8Slogwang             MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
591a9643ea8Slogwang                         dst, pkg, rcv_buf, func, len, buf_size,type);
592a9643ea8Slogwang             return -10;
593a9643ea8Slogwang         }
594a9643ea8Slogwang     }
595a9643ea8Slogwang 
596a9643ea8Slogwang     return 0;
597a9643ea8Slogwang }
598a9643ea8Slogwang 
599a9643ea8Slogwang 
600a9643ea8Slogwang 
601a9643ea8Slogwang /**
602a9643ea8Slogwang  * @brief ������ص���������
603a9643ea8Slogwang  */
604a9643ea8Slogwang static void mt_task_process(void* arg)
605a9643ea8Slogwang {
606a9643ea8Slogwang     int rc = 0;
607a9643ea8Slogwang     IMtTask* task = (IMtTask*)arg;
608a9643ea8Slogwang     if (!task)
609a9643ea8Slogwang     {
610a9643ea8Slogwang         MTLOG_ERROR("Invalid arg, error");
611a9643ea8Slogwang         return;
612a9643ea8Slogwang     }
613a9643ea8Slogwang 
614a9643ea8Slogwang     rc = task->Process();
615a9643ea8Slogwang     if (rc != 0)
616a9643ea8Slogwang     {
617a9643ea8Slogwang         MTLOG_DEBUG("task process failed(%d), log", rc);
618a9643ea8Slogwang     }
619a9643ea8Slogwang 
620a9643ea8Slogwang     task->SetResult(rc);
621a9643ea8Slogwang 
622a9643ea8Slogwang     return;
623a9643ea8Slogwang };
624a9643ea8Slogwang 
625a9643ea8Slogwang /**
626a9643ea8Slogwang  * @brief ��·IO�Ĵ���, ��������̹߳���
627a9643ea8Slogwang  * @param req_list - �����б�
628a9643ea8Slogwang  * @return 0 �ɹ�, <0ʧ��
629a9643ea8Slogwang  */
630a9643ea8Slogwang int mt_exec_all_task(IMtTaskList& req_list)
631a9643ea8Slogwang {
632a9643ea8Slogwang     MtFrame* mtframe    = MtFrame::Instance();
633a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
634a9643ea8Slogwang     IMtTask* task       = NULL;
635a9643ea8Slogwang     MicroThread* sub    = NULL;
636a9643ea8Slogwang     MicroThread* tmp    = NULL;
637a9643ea8Slogwang     int rc              = -1;
638a9643ea8Slogwang 
639a9643ea8Slogwang     MicroThread::SubThreadList list;
640a9643ea8Slogwang     TAILQ_INIT(&list);
641a9643ea8Slogwang 
642a9643ea8Slogwang     // ��ֹû��task������΢�߳�һֱ����ס
643a9643ea8Slogwang     if (0 == req_list.size())
644a9643ea8Slogwang     {
645a9643ea8Slogwang         MTLOG_DEBUG("no task for execult");
646a9643ea8Slogwang         return 0;
647a9643ea8Slogwang     }
648a9643ea8Slogwang 
649a9643ea8Slogwang     // 1. �����̶߳���
650a9643ea8Slogwang     for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it)
651a9643ea8Slogwang     {
652a9643ea8Slogwang         task = *it;
653a9643ea8Slogwang         sub = MtFrame::CreateThread(mt_task_process, task, false);
654a9643ea8Slogwang         if (NULL == sub)
655a9643ea8Slogwang         {
656a9643ea8Slogwang             MTLOG_ERROR("create sub thread failed");
657a9643ea8Slogwang             goto EXIT_LABEL;
658a9643ea8Slogwang         }
659a9643ea8Slogwang 
660a9643ea8Slogwang         sub->SetType(MicroThread::SUB_THREAD);
661a9643ea8Slogwang         TAILQ_INSERT_TAIL(&list, sub, _sub_entry);
662a9643ea8Slogwang     }
663a9643ea8Slogwang 
664a9643ea8Slogwang     // 2. ����ִ������
665a9643ea8Slogwang     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
666a9643ea8Slogwang     {
667a9643ea8Slogwang         TAILQ_REMOVE(&list, sub, _sub_entry);
668a9643ea8Slogwang         thread->AddSubThread(sub);
669a9643ea8Slogwang         mtframe->InsertRunable(sub);
670a9643ea8Slogwang     }
671a9643ea8Slogwang 
672a9643ea8Slogwang     // 3. �ȴ����߳�ִ�н���
673a9643ea8Slogwang     thread->Wait();
674a9643ea8Slogwang     rc = 0;
675a9643ea8Slogwang 
676a9643ea8Slogwang EXIT_LABEL:
677a9643ea8Slogwang 
678a9643ea8Slogwang     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
679a9643ea8Slogwang     {
680a9643ea8Slogwang         TAILQ_REMOVE(&list, sub, _sub_entry);
681a9643ea8Slogwang         mtframe->FreeThread(sub);
682a9643ea8Slogwang     }
683a9643ea8Slogwang 
684a9643ea8Slogwang     return rc;
685a9643ea8Slogwang 
686a9643ea8Slogwang }
687a9643ea8Slogwang 
688a9643ea8Slogwang /**
689a9643ea8Slogwang  * @brief ���õ�ǰIMtMsg��˽�б���
690a9643ea8Slogwang  * @info  ֻ����ָ�룬�ڴ���Ҫҵ�����
691a9643ea8Slogwang  */
692a9643ea8Slogwang void mt_set_msg_private(void *data)
693a9643ea8Slogwang {
694a9643ea8Slogwang     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
695a9643ea8Slogwang     if (msg_thread != NULL)
696a9643ea8Slogwang         msg_thread->SetPrivate(data);
697a9643ea8Slogwang }
698a9643ea8Slogwang 
699a9643ea8Slogwang /**
700a9643ea8Slogwang  * @brief  ��ȡ��ǰIMtMsg��˽�б���
701a9643ea8Slogwang  * @return ˽�б���ָ��
702a9643ea8Slogwang  */
703a9643ea8Slogwang void* mt_get_msg_private()
704a9643ea8Slogwang {
705a9643ea8Slogwang     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
706a9643ea8Slogwang     if (NULL == msg_thread)
707a9643ea8Slogwang     {
708a9643ea8Slogwang         return NULL;
709a9643ea8Slogwang     }
710a9643ea8Slogwang 
711a9643ea8Slogwang     return msg_thread->GetPrivate();
712a9643ea8Slogwang }
713a9643ea8Slogwang 
714a9643ea8Slogwang /**
715a9643ea8Slogwang  * @brief  ΢�߳̿�ܳ�ʼ��
716a9643ea8Slogwang  * @info   ҵ��ʹ��spp������΢�̣߳���Ҫ���øó�ʼ������
717a9643ea8Slogwang  * @return false:��ʼ��ʧ��  true:��ʼ���ɹ�
718a9643ea8Slogwang  */
719*a02c88d6Slogwang bool mt_init_frame(int argc, char * const argv[])
720a9643ea8Slogwang {
721*a02c88d6Slogwang 	if (argc) {
722*a02c88d6Slogwang 		ff_init(argc, argv);
723a9643ea8Slogwang 		ff_set_hook_flag();
724a9643ea8Slogwang 	}
725a9643ea8Slogwang 	memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab));
726a9643ea8Slogwang     return MtFrame::Instance()->InitFrame();
727a9643ea8Slogwang }
728a9643ea8Slogwang 
729a9643ea8Slogwang /**
730a9643ea8Slogwang  * @brief ����΢�̶߳���ջ�ռ��С
731a9643ea8Slogwang  * @info  �DZ������ã�Ĭ�ϴ�СΪ128K
732a9643ea8Slogwang  */
733a9643ea8Slogwang void mt_set_stack_size(unsigned int bytes)
734a9643ea8Slogwang {
735a9643ea8Slogwang     ThreadPool::SetDefaultStackSize(bytes);
736a9643ea8Slogwang }
737a9643ea8Slogwang 
738a9643ea8Slogwang /**
739a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� recvfrom
740a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
741a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
742a9643ea8Slogwang  * @param len ������Ϣ����������
743a9643ea8Slogwang  * @param from ��Դ��ַ��ָ��
744a9643ea8Slogwang  * @param fromlen ��Դ��ַ�Ľṹ����
745a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
746a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
747a9643ea8Slogwang  */
748a9643ea8Slogwang int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
749a9643ea8Slogwang {
750a9643ea8Slogwang     return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout);
751a9643ea8Slogwang }
752a9643ea8Slogwang 
753a9643ea8Slogwang /**
754a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� sendto
755a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
756a9643ea8Slogwang  * @param msg �����͵���Ϣָ��
757a9643ea8Slogwang  * @param len �����͵���Ϣ����
758a9643ea8Slogwang  * @param to Ŀ�ĵ�ַ��ָ��
759a9643ea8Slogwang  * @param tolen Ŀ�ĵ�ַ�Ľṹ����
760a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
761a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
762a9643ea8Slogwang  */
763a9643ea8Slogwang int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
764a9643ea8Slogwang {
765a9643ea8Slogwang     return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout);
766a9643ea8Slogwang }
767a9643ea8Slogwang 
768a9643ea8Slogwang /**
769a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� connect
770a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
771a9643ea8Slogwang  * @param addr ָ��server��Ŀ�ĵ�ַ
772a9643ea8Slogwang  * @param addrlen ��ַ�ij���
773a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
774a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
775a9643ea8Slogwang  */
776a9643ea8Slogwang int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
777a9643ea8Slogwang {
778a9643ea8Slogwang     return MtFrame::connect(fd, addr, addrlen, timeout);
779a9643ea8Slogwang }
780a9643ea8Slogwang 
781a9643ea8Slogwang /**
782a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� accept
783a9643ea8Slogwang  * @param fd �����׽���
784a9643ea8Slogwang  * @param addr �ͻ��˵�ַ
785a9643ea8Slogwang  * @param addrlen ��ַ�ij���
786a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
787a9643ea8Slogwang  * @return >=0 accept��socket������, <0 ʧ��
788a9643ea8Slogwang  */
789a9643ea8Slogwang int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
790a9643ea8Slogwang {
791a9643ea8Slogwang     return MtFrame::accept(fd, addr, addrlen, timeout);
792a9643ea8Slogwang }
793a9643ea8Slogwang 
794a9643ea8Slogwang 
795a9643ea8Slogwang /**
796a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� read
797a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
798a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
799a9643ea8Slogwang  * @param nbyte ������Ϣ����������
800a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
801a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
802a9643ea8Slogwang  */
803a9643ea8Slogwang ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout)
804a9643ea8Slogwang {
805a9643ea8Slogwang     return MtFrame::read(fd, buf, nbyte, timeout);
806a9643ea8Slogwang }
807a9643ea8Slogwang 
808a9643ea8Slogwang /**
809a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� write
810a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
811a9643ea8Slogwang  * @param buf �����͵���Ϣָ��
812a9643ea8Slogwang  * @param nbyte �����͵���Ϣ����
813a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
814a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
815a9643ea8Slogwang  */
816a9643ea8Slogwang ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout)
817a9643ea8Slogwang {
818a9643ea8Slogwang     return MtFrame::write(fd, buf, nbyte, timeout);
819a9643ea8Slogwang }
820a9643ea8Slogwang 
821a9643ea8Slogwang /**
822a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� recv
823a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
824a9643ea8Slogwang  * @param buf ������Ϣ������ָ��
825a9643ea8Slogwang  * @param len ������Ϣ����������
826a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
827a9643ea8Slogwang  * @return >0 �ɹ����ճ���, <0 ʧ��
828a9643ea8Slogwang  */
829a9643ea8Slogwang ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout)
830a9643ea8Slogwang {
831a9643ea8Slogwang     return MtFrame::recv(fd, buf, len, flags, timeout);
832a9643ea8Slogwang }
833a9643ea8Slogwang 
834a9643ea8Slogwang /**
835a9643ea8Slogwang  * @brief ΢�̰߳�����ϵͳIO���� send
836a9643ea8Slogwang  * @param fd ϵͳsocket��Ϣ
837a9643ea8Slogwang  * @param buf �����͵���Ϣָ��
838a9643ea8Slogwang  * @param nbyte �����͵���Ϣ����
839a9643ea8Slogwang  * @param timeout ��ȴ�ʱ��, ����
840a9643ea8Slogwang  * @return >0 �ɹ����ͳ���, <0 ʧ��
841a9643ea8Slogwang  */
842a9643ea8Slogwang ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
843a9643ea8Slogwang {
844a9643ea8Slogwang     return MtFrame::send(fd, buf, nbyte, flags, timeout);
845a9643ea8Slogwang }
846a9643ea8Slogwang 
847a9643ea8Slogwang /**
848a9643ea8Slogwang  * @brief ΢�߳�����sleep�ӿ�, ��λms
849a9643ea8Slogwang  */
850a9643ea8Slogwang void mt_sleep(int ms)
851a9643ea8Slogwang {
852a9643ea8Slogwang     MtFrame::sleep(ms);
853a9643ea8Slogwang }
854a9643ea8Slogwang 
855a9643ea8Slogwang /**
856a9643ea8Slogwang  * @brief ΢�̻߳�ȡϵͳʱ�䣬��λms
857a9643ea8Slogwang  */
858a9643ea8Slogwang unsigned long long mt_time_ms(void)
859a9643ea8Slogwang {
860a9643ea8Slogwang     return MtFrame::Instance()->GetLastClock();
861a9643ea8Slogwang }
862a9643ea8Slogwang 
863a9643ea8Slogwang /**
864a9643ea8Slogwang  * @brief ΢�̵߳ȴ�epoll�¼��İ�������
865a9643ea8Slogwang  */
866a9643ea8Slogwang int mt_wait_events(int fd, int events, int timeout)
867a9643ea8Slogwang {
868a9643ea8Slogwang     return MtFrame::Instance()->WaitEvents(fd, events, timeout);
869a9643ea8Slogwang }
870a9643ea8Slogwang 
871a9643ea8Slogwang void* mt_start_thread(void* entry, void* args)
872a9643ea8Slogwang {
873a9643ea8Slogwang     return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true);
874a9643ea8Slogwang }
875a9643ea8Slogwang 
#define BUF_ALIGNMENT_SIZE 4096
#define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1))
#define BUF_DEFAULT_SIZE 4096

/**
 * @brief Scope-bound, growable receive buffer backed by realloc/free.
 *
 * The buffer is freed by the destructor unless reset() has handed it over to
 * the caller-supplied keeper pointer (only done when keep is true).
 * Capacity is always a multiple of BUF_ALIGNMENT_SIZE and can only grow.
 */
class ScopedBuf
{
public:
    /**
     * @param buf_keeper - pointer that receives the buffer on reset() when keep is true
     * @param keep       - true: reset() transfers ownership to buf_keeper;
     *                     false: the buffer is always freed by the destructor
     */
    ScopedBuf(void*& buf_keeper, bool keep)
    :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep)
    {}

    /**
     * @brief Grow the buffer so it can hold at least len bytes.
     * @param len - requested capacity; 0 selects BUF_ALIGNMENT_SIZE
     * @return 0 on success (or when the capacity already equals len),
     *         -1 if len is smaller than the current capacity, negative, or
     *            too large to be rounded up without overflowing int,
     *         -2 if realloc fails (the existing buffer and len_ stay valid)
     */
    int Alloc(int len)
    {
        if (len < len_)
        {
            return -1; // shrinking (or a negative size) is not supported
        }

        if (len == 0)
        {
            len = BUF_ALIGNMENT_SIZE;
        }
        if (len_ == len)
        {
            return 0;
        }

        // Round up in unsigned arithmetic: a request close to INT_MAX must not
        // overflow a signed int while being aligned.
        const unsigned int int_max = ~0U >> 1;
        const unsigned int aligned = BUF_ALIGN_SIZE((unsigned int)len);
        if (aligned > int_max)
        {
            return -1;
        }

        char* grown = (char*)realloc(buf_, aligned);
        if (grown == NULL)
        {
            return -2; // out of memory; buf_/len_ still describe the old buffer
        }

        // Commit the new size only once the memory really exists.
        buf_ = grown;
        len_ = (int)aligned;
        len_watermark_ = len_ - BUF_ALIGNMENT_SIZE;
        return 0;
    }

    /**
     * @brief Hand the buffer to buf_keeper_ when keep_ is set; afterwards the
     *        destructor no longer frees it. No-op when keep_ is false.
     */
    void reset()
    {
        if (keep_)
        {
            buf_keeper_ = (void*)buf_;
            buf_ = NULL;
        }
    }

    ~ScopedBuf()
    {
        if (buf_ != NULL)
        {
            free(buf_);
            buf_ = NULL;
        }
    }

public:
    void* &buf_keeper_;     // caller's pointer that may receive the buffer
    char* buf_;             // owned storage, NULL until Alloc succeeds
    int   len_;             // current capacity of buf_ in bytes
    int   len_watermark_;   // len_ minus one alignment unit
    bool  keep_;            // whether reset() transfers ownership

private:
    // Non-copyable: a copy would free the same buffer twice.
    ScopedBuf(const ScopedBuf&);
    ScopedBuf& operator=(const ScopedBuf&);
};
945a9643ea8Slogwang 
946a9643ea8Slogwang /**
947a9643ea8Slogwang  * @brief TCPѭ������, ֱ������OK��ʱ
948a9643ea8Slogwang  *       [ע��] �����߲�Ҫ�����޸ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ]
949a9643ea8Slogwang  */
950a9643ea8Slogwang static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags,
951a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
952a9643ea8Slogwang {
953a9643ea8Slogwang     int recv_len = 0;
954a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
955a9643ea8Slogwang 
956a9643ea8Slogwang 	int rc = 0;
957a9643ea8Slogwang 	int ret = 0;
958a9643ea8Slogwang     int pkg_len = 0;
959a9643ea8Slogwang     bool msg_len_detected = false;
960a9643ea8Slogwang 
961a9643ea8Slogwang 	ScopedBuf sbuf(rcv_buf, keep_rcv_buf);
962a9643ea8Slogwang 	ret = sbuf.Alloc(len);
963a9643ea8Slogwang 
964a9643ea8Slogwang 	if(ret!=0)
965a9643ea8Slogwang 	{
966a9643ea8Slogwang         MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
967a9643ea8Slogwang 		return -11;
968a9643ea8Slogwang 	}
969a9643ea8Slogwang 
970a9643ea8Slogwang     do
971a9643ea8Slogwang     {
972a9643ea8Slogwang         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
973a9643ea8Slogwang         if (cost_time > (utime64_t)timeout)
974a9643ea8Slogwang         {
975a9643ea8Slogwang             errno = ETIME;
976a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
977a9643ea8Slogwang             return -3;
978a9643ea8Slogwang         }
979a9643ea8Slogwang 
980a9643ea8Slogwang         rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time));
981a9643ea8Slogwang         if (rc < 0)
982a9643ea8Slogwang         {
983a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
984a9643ea8Slogwang             return -3;
985a9643ea8Slogwang         }
986a9643ea8Slogwang 		else if (rc == 0) // Զ�˹ر�
987a9643ea8Slogwang         {
988a9643ea8Slogwang 
989a9643ea8Slogwang 			if(recv_len==0) // δ�ذ���ֱ�ӷ���Զ�˹ر�
990a9643ea8Slogwang 			{
991a9643ea8Slogwang 	            MTLOG_ERROR("tcp socket[%d] remote close", sock);
992a9643ea8Slogwang 	            return -7;
993a9643ea8Slogwang 			}
994a9643ea8Slogwang 
995a9643ea8Slogwang 	        /* ��鱨�������� */
996a9643ea8Slogwang 		    rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected);
997a9643ea8Slogwang 
998a9643ea8Slogwang 			if(rc!=recv_len) // ҵ�����Զ�˹رգ�Ӧ�÷������������ȣ�����<=0,��ʾ����������
999a9643ea8Slogwang 			{
1000a9643ea8Slogwang 	            MTLOG_ERROR("tcp socket[%d] remote close", sock);
1001a9643ea8Slogwang     	        return -7;
1002a9643ea8Slogwang 			}
1003a9643ea8Slogwang         	len = recv_len;
1004a9643ea8Slogwang 			break;
1005a9643ea8Slogwang         }
1006a9643ea8Slogwang         recv_len += rc;
1007a9643ea8Slogwang 
1008a9643ea8Slogwang         /* ��鱨�������� */
1009a9643ea8Slogwang         if((!msg_len_detected)||recv_len==pkg_len)
1010a9643ea8Slogwang         {
1011a9643ea8Slogwang             rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected);
1012a9643ea8Slogwang             if(msg_len_detected)
1013a9643ea8Slogwang             {
1014a9643ea8Slogwang                 pkg_len = rc;
1015a9643ea8Slogwang             }
1016a9643ea8Slogwang         }
1017a9643ea8Slogwang         else
1018a9643ea8Slogwang         {
1019a9643ea8Slogwang             rc = pkg_len;
1020a9643ea8Slogwang         }
1021a9643ea8Slogwang 
1022a9643ea8Slogwang         if (rc < 0)
1023a9643ea8Slogwang         {
1024a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
1025a9643ea8Slogwang             return -5;
1026a9643ea8Slogwang         }
1027a9643ea8Slogwang         else if (rc == 0) // ����δ������,�Ҳ�ȷ����С
1028a9643ea8Slogwang         {
1029a9643ea8Slogwang         	if(sbuf.len_ > recv_len)
1030a9643ea8Slogwang         	{
1031a9643ea8Slogwang         		continue;
1032a9643ea8Slogwang         	}
1033a9643ea8Slogwang             // û�ռ��ٽ�����, 2����С��չbuf
1034a9643ea8Slogwang 			ret = sbuf.Alloc(sbuf.len_<<1);
1035a9643ea8Slogwang 
1036a9643ea8Slogwang 			if(ret!=0)
1037a9643ea8Slogwang 			{
1038a9643ea8Slogwang 		        MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
1039a9643ea8Slogwang 				return -11;
1040a9643ea8Slogwang 			}
1041a9643ea8Slogwang         }
1042a9643ea8Slogwang         else    // �ɹ����㱨�ij���
1043a9643ea8Slogwang         {
1044a9643ea8Slogwang             if (rc > recv_len) // ���Ļ�δ��ȫ
1045a9643ea8Slogwang             {
1046a9643ea8Slogwang             	if(sbuf.len_ > recv_len) // recv buf���пռ�.��δ����ˮλ
1047a9643ea8Slogwang             	{
1048a9643ea8Slogwang             		continue;
1049a9643ea8Slogwang             	}
1050a9643ea8Slogwang 
1051a9643ea8Slogwang 	            // û�ռ��ٽ�����, ����ҵ��ָʾ��С��չ�ڴ�
1052a9643ea8Slogwang 				ret = sbuf.Alloc(rc);
1053a9643ea8Slogwang 
1054a9643ea8Slogwang 				if(ret!=0)
1055a9643ea8Slogwang 				{
1056a9643ea8Slogwang 			        MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
1057a9643ea8Slogwang 					return -11;
1058a9643ea8Slogwang 				}
1059a9643ea8Slogwang             }
1060a9643ea8Slogwang             else if(rc==recv_len) // �հ�����
1061a9643ea8Slogwang             {
1062a9643ea8Slogwang                 len = rc;
1063a9643ea8Slogwang                 break;
1064a9643ea8Slogwang             }
1065a9643ea8Slogwang             else // ΢�߳����ģʽ�£�������ճ��
1066a9643ea8Slogwang             {
1067a9643ea8Slogwang 	            MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock);
1068a9643ea8Slogwang 	            return -5;
1069a9643ea8Slogwang             }
1070a9643ea8Slogwang         }
1071a9643ea8Slogwang     } while (true);
1072a9643ea8Slogwang 
1073a9643ea8Slogwang 	sbuf.reset();
1074a9643ea8Slogwang 
1075a9643ea8Slogwang     return 0;
1076a9643ea8Slogwang }
1077a9643ea8Slogwang 
1078a9643ea8Slogwang 
1079a9643ea8Slogwang 
1080a9643ea8Slogwang /**
1081a9643ea8Slogwang  * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10����
1082a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1083a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
1084a9643ea8Slogwang  * @param pkg -�������װ�İ���
1085a9643ea8Slogwang  * @param len -�������װ�İ��峤��
1086a9643ea8Slogwang  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1087a9643ea8Slogwang  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1088a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
1089a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1090a9643ea8Slogwang  * @param msg_ctx -�������ĵ������ı�����
1091a9643ea8Slogwang  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1092a9643ea8Slogwang  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1093a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1094a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1095a9643ea8Slogwang  */
1096a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
1097a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
1098a9643ea8Slogwang {
1099a9643ea8Slogwang 	if(!dst || !pkg || len<1)
1100a9643ea8Slogwang 	{
1101a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
1102a9643ea8Slogwang                     dst, pkg, len, check_func);
1103a9643ea8Slogwang         return -10;
1104a9643ea8Slogwang 	}
1105a9643ea8Slogwang 
1106a9643ea8Slogwang 
1107a9643ea8Slogwang     int ret = 0, rc = 0;
1108a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
1109a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
1110a9643ea8Slogwang     utime64_t cost_time = 0;
1111a9643ea8Slogwang     int time_left = timeout;
1112a9643ea8Slogwang 
1113a9643ea8Slogwang     // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ����
1114a9643ea8Slogwang     int sock = -1;
1115a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
1116a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
1117a9643ea8Slogwang     {
1118a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
1119a9643ea8Slogwang         ret = -1;
1120a9643ea8Slogwang         goto EXIT_LABEL;
1121a9643ea8Slogwang     }
1122a9643ea8Slogwang 
1123a9643ea8Slogwang     // 2. ���Լ����½�����
1124a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
1125a9643ea8Slogwang     if (rc < 0)
1126a9643ea8Slogwang     {
1127a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
1128a9643ea8Slogwang         ret = -4;
1129a9643ea8Slogwang         goto EXIT_LABEL;
1130a9643ea8Slogwang     }
1131a9643ea8Slogwang 
1132a9643ea8Slogwang     // 3. �������ݴ���
1133a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1134a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1135a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
1136a9643ea8Slogwang     if (rc < 0)
1137a9643ea8Slogwang     {
1138a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
1139a9643ea8Slogwang         ret = -2;
1140a9643ea8Slogwang         goto EXIT_LABEL;
1141a9643ea8Slogwang     }
1142a9643ea8Slogwang 
1143a9643ea8Slogwang     // 4. �������ݴ���
1144a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1145a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1146a9643ea8Slogwang 
1147a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
1148a9643ea8Slogwang     if (rc < 0)
1149a9643ea8Slogwang     {
1150a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
1151a9643ea8Slogwang         ret = rc;
1152a9643ea8Slogwang         goto EXIT_LABEL;
1153a9643ea8Slogwang     }
1154a9643ea8Slogwang 
1155a9643ea8Slogwang     ret = 0;
1156a9643ea8Slogwang 
1157a9643ea8Slogwang EXIT_LABEL:
1158a9643ea8Slogwang 
1159a9643ea8Slogwang     // ʧ����ǿ���ͷ�����, ����ʱ����
1160a9643ea8Slogwang     if (conn != NULL)
1161a9643ea8Slogwang     {
1162a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
1163a9643ea8Slogwang     }
1164a9643ea8Slogwang 
1165a9643ea8Slogwang     return ret;
1166a9643ea8Slogwang }
1167a9643ea8Slogwang 
1168a9643ea8Slogwang 
1169a9643ea8Slogwang /**
1170a9643ea8Slogwang  * @brief TCP�������շ�����
1171a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1172a9643ea8Slogwang  *        [ע��] �޸Ľӿڣ���ע�ⲻҪ����޸ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ]
1173a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
1174a9643ea8Slogwang  * @param pkg -�������װ�İ���
1175a9643ea8Slogwang  * @param len -�������װ�İ��峤��
1176a9643ea8Slogwang  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1177a9643ea8Slogwang  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1178a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
1179a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1180a9643ea8Slogwang  * @param msg_ctx -�������ĵ������ı�����
1181a9643ea8Slogwang  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1182a9643ea8Slogwang  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1183a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1184a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1185a9643ea8Slogwang  */
1186a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
1187a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
1188a9643ea8Slogwang {
1189a9643ea8Slogwang     int ret = 0, rc = 0;
1190a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
1191a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
1192a9643ea8Slogwang     utime64_t cost_time = 0;
1193a9643ea8Slogwang     int time_left = timeout;
1194a9643ea8Slogwang 
1195a9643ea8Slogwang     // 1. �������
1196a9643ea8Slogwang 	if(!dst || !pkg || len<1)
1197a9643ea8Slogwang 	{
1198a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
1199a9643ea8Slogwang                     dst, pkg, len, check_func);
1200a9643ea8Slogwang         return -10;
1201a9643ea8Slogwang 	}
1202a9643ea8Slogwang 
1203a9643ea8Slogwang     // 2. ����TCP socket
1204a9643ea8Slogwang     int sock;
1205a9643ea8Slogwang     sock = mt_tcp_create_sock();
1206a9643ea8Slogwang     if (sock < 0)
1207a9643ea8Slogwang     {
1208a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
1209a9643ea8Slogwang         return -1;
1210a9643ea8Slogwang     }
1211a9643ea8Slogwang 
1212a9643ea8Slogwang     // 3. ���Լ����½�����
1213a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
1214a9643ea8Slogwang     if (rc < 0)
1215a9643ea8Slogwang     {
1216a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
1217a9643ea8Slogwang         ret = -4;
1218a9643ea8Slogwang         goto EXIT_LABEL;
1219a9643ea8Slogwang     }
1220a9643ea8Slogwang 
1221a9643ea8Slogwang     // 4. �������ݴ���
1222a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1223a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1224a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
1225a9643ea8Slogwang     if (rc < 0)
1226a9643ea8Slogwang     {
1227a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
1228a9643ea8Slogwang         ret = -2;
1229a9643ea8Slogwang         goto EXIT_LABEL;
1230a9643ea8Slogwang     }
1231a9643ea8Slogwang 
1232a9643ea8Slogwang     // 5. �������ݴ���
1233a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
1234a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
1235a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
1236a9643ea8Slogwang 
1237a9643ea8Slogwang     if (rc < 0)
1238a9643ea8Slogwang     {
1239a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
1240a9643ea8Slogwang         ret = rc;
1241a9643ea8Slogwang         goto EXIT_LABEL;
1242a9643ea8Slogwang     }
1243a9643ea8Slogwang 
1244a9643ea8Slogwang     ret = 0;
1245a9643ea8Slogwang 
1246a9643ea8Slogwang EXIT_LABEL:
1247a9643ea8Slogwang     if (sock >= 0)
1248a9643ea8Slogwang         ::close(sock);
1249a9643ea8Slogwang 
1250a9643ea8Slogwang     return ret;
1251a9643ea8Slogwang }
1252a9643ea8Slogwang 
1253a9643ea8Slogwang 
1254a9643ea8Slogwang 
1255a9643ea8Slogwang /**
1256a9643ea8Slogwang  * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶�����
1257a9643ea8Slogwang  *        [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ]
1258a9643ea8Slogwang  * @param dst -�����͵�Ŀ�ĵ�ַ
1259a9643ea8Slogwang  * @param pkg -�������װ�İ���
1260a9643ea8Slogwang  * @param len -�������װ�İ��峤��
1261a9643ea8Slogwang  * @param rcv_buf -���������������ο����� keep_rcv_buf��
1262a9643ea8Slogwang  * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ�
1263a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
1264a9643ea8Slogwang  * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ��
1265a9643ea8Slogwang  * @param msg_ctx -�������ĵ������ı�����
1266a9643ea8Slogwang  *
1267a9643ea8Slogwang  * @param type - ��������
1268a9643ea8Slogwang  *               MT_TCP_SHORT: һ��һ�������ӣ�
1269a9643ea8Slogwang  *               MT_TCP_LONG : һ��һ�ճ����ӣ�
1270a9643ea8Slogwang  *               MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ�
1271a9643ea8Slogwang  *               MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ�
1272a9643ea8Slogwang  * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿
1273a9643ea8Slogwang  *               ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿
1274a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��,
1275a9643ea8Slogwang  *          -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч
1276a9643ea8Slogwang  */
1277a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size,
1278a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx,
1279a9643ea8Slogwang                      MT_TCP_CONN_TYPE type, bool keep_rcv_buf)
1280a9643ea8Slogwang {
1281a9643ea8Slogwang 	if(!dst || !pkg || len<1)
1282a9643ea8Slogwang 	{
1283a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1284a9643ea8Slogwang                     dst, pkg, len, check_func, msg_ctx, type);
1285a9643ea8Slogwang         return -10;
1286a9643ea8Slogwang 	}
1287a9643ea8Slogwang 
1288a9643ea8Slogwang     switch (type)
1289a9643ea8Slogwang     {
1290a9643ea8Slogwang         // TCP�����ӵ�������
1291a9643ea8Slogwang         case MT_TCP_LONG:
1292a9643ea8Slogwang         {
1293a9643ea8Slogwang             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1294a9643ea8Slogwang         }
1295a9643ea8Slogwang 
1296a9643ea8Slogwang         // TCP������ֻ������
1297a9643ea8Slogwang         case MT_TCP_LONG_SNDONLY:
1298a9643ea8Slogwang         {
1299a9643ea8Slogwang             return mt_tcpsend(dst, pkg, len, timeout);
1300a9643ea8Slogwang         }
1301a9643ea8Slogwang 
1302a9643ea8Slogwang         // TCP�����ӵ�������
1303a9643ea8Slogwang         case MT_TCP_SHORT:
1304a9643ea8Slogwang         {
1305a9643ea8Slogwang             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1306a9643ea8Slogwang         }
1307a9643ea8Slogwang 
1308a9643ea8Slogwang         // TCP������ֻ������
1309a9643ea8Slogwang         case MT_TCP_SHORT_SNDONLY:
1310a9643ea8Slogwang         {
1311a9643ea8Slogwang             return mt_tcpsend_short(dst, pkg, len, timeout);
1312a9643ea8Slogwang         }
1313a9643ea8Slogwang 
1314a9643ea8Slogwang         default:
1315a9643ea8Slogwang         {
1316a9643ea8Slogwang 	        MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1317a9643ea8Slogwang                     dst, pkg, len, check_func, msg_ctx, type);
1318a9643ea8Slogwang             return -10;
1319a9643ea8Slogwang         }
1320a9643ea8Slogwang     }
1321a9643ea8Slogwang 
1322a9643ea8Slogwang     return 0;
1323a9643ea8Slogwang }
1324a9643ea8Slogwang 
1325a9643ea8Slogwang 
1326a9643ea8Slogwang 
1327a9643ea8Slogwang }
1328a9643ea8Slogwang 
1329a9643ea8Slogwang 
1330