xref: /f-stack/app/micro_thread/mt_api.cpp (revision 84bcae25)
1a9643ea8Slogwang 
2a9643ea8Slogwang /**
3a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang  *
5a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang  *
7a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang  * obtain a copy of the License at
10a9643ea8Slogwang  *
11a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang  *
13a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang  * and limitations under the License.
17a9643ea8Slogwang  */
18a9643ea8Slogwang 
19a9643ea8Slogwang 
20a9643ea8Slogwang /**
21a9643ea8Slogwang  *  @filename mt_sys_call.cpp
22a9643ea8Slogwang  */
23a9643ea8Slogwang 
24a9643ea8Slogwang #include "kqueue_proxy.h"
25a9643ea8Slogwang #include "micro_thread.h"
26a9643ea8Slogwang #include "mt_connection.h"
27a9643ea8Slogwang #include "mt_api.h"
28a9643ea8Slogwang #include "ff_api.h"
29a9643ea8Slogwang #include "mt_sys_hook.h"
30a9643ea8Slogwang 
31a9643ea8Slogwang namespace NS_MICRO_THREAD {
32a9643ea8Slogwang 
mt_udpsendrcv(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int & buf_size,int timeout)33a9643ea8Slogwang int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout)
34a9643ea8Slogwang {
35a9643ea8Slogwang     int ret = 0;
36a9643ea8Slogwang     int rc  = 0;
37a9643ea8Slogwang     int flags = 1;
38a9643ea8Slogwang     struct sockaddr_in from_addr = {0};
39a9643ea8Slogwang     int addr_len = sizeof(from_addr);
40a9643ea8Slogwang 
41a9643ea8Slogwang     if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf)
42a9643ea8Slogwang     {
43a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]",
44a9643ea8Slogwang             dst, pkg, rcv_buf, len, buf_size);
45a9643ea8Slogwang         return -10;
46a9643ea8Slogwang     }
47a9643ea8Slogwang 
48a9643ea8Slogwang     int sock = socket(PF_INET, SOCK_DGRAM, 0);
49a9643ea8Slogwang     if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0))
50a9643ea8Slogwang     {
515ac59bc4Slogwang         MT_ATTR_API(320842, 1);
52a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno);
53a9643ea8Slogwang         ret = -1;
54a9643ea8Slogwang         goto EXIT_LABEL;
55a9643ea8Slogwang     }
56a9643ea8Slogwang 
57a9643ea8Slogwang     rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout);
58a9643ea8Slogwang     if (rc < 0)
59a9643ea8Slogwang     {
605ac59bc4Slogwang         MT_ATTR_API(320844, 1);
61a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno);
62a9643ea8Slogwang         ret = -2;
63a9643ea8Slogwang         goto EXIT_LABEL;
64a9643ea8Slogwang     }
65a9643ea8Slogwang 
66a9643ea8Slogwang     rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout);
67a9643ea8Slogwang     if (rc < 0)
68a9643ea8Slogwang     {
695ac59bc4Slogwang         MT_ATTR_API(320845, 1);
70a9643ea8Slogwang         MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno);
71a9643ea8Slogwang         ret = -3;
72a9643ea8Slogwang         goto EXIT_LABEL;
73a9643ea8Slogwang     }
74a9643ea8Slogwang     buf_size = rc;
75a9643ea8Slogwang 
76a9643ea8Slogwang EXIT_LABEL:
77a9643ea8Slogwang 
78a9643ea8Slogwang     if (sock > 0)
79a9643ea8Slogwang     {
80a9643ea8Slogwang         close(sock);
81a9643ea8Slogwang         sock = -1;
82a9643ea8Slogwang     }
83a9643ea8Slogwang 
84a9643ea8Slogwang     return ret;
85a9643ea8Slogwang }
86a9643ea8Slogwang 
mt_tcp_create_sock(void)87a9643ea8Slogwang int mt_tcp_create_sock(void)
88a9643ea8Slogwang {
89a9643ea8Slogwang     int fd;
90a9643ea8Slogwang     int flag;
91a9643ea8Slogwang 
92a9643ea8Slogwang     fd = ::socket(AF_INET, SOCK_STREAM, 0);
93a9643ea8Slogwang     if (fd < 0)
94a9643ea8Slogwang     {
95a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, error: %m");
96a9643ea8Slogwang         return -1;
97a9643ea8Slogwang     }
98a9643ea8Slogwang 
99a9643ea8Slogwang     flag = fcntl(fd, F_GETFL, 0);
100a9643ea8Slogwang     if (flag == -1)
101a9643ea8Slogwang     {
102a9643ea8Slogwang         ::close(fd);
103a9643ea8Slogwang         MTLOG_ERROR("get fd flags failed, error: %m");
104a9643ea8Slogwang         return -2;
105a9643ea8Slogwang     }
106a9643ea8Slogwang 
107a9643ea8Slogwang     if (flag & O_NONBLOCK)
108a9643ea8Slogwang         return fd;
109a9643ea8Slogwang 
110a9643ea8Slogwang     if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1)
111a9643ea8Slogwang     {
112a9643ea8Slogwang         ::close(fd);
113a9643ea8Slogwang         MTLOG_ERROR("set fd flags failed, error: %m");
114a9643ea8Slogwang         return -3;
115a9643ea8Slogwang     }
116a9643ea8Slogwang 
117a9643ea8Slogwang     return fd;
118a9643ea8Slogwang }
119a9643ea8Slogwang 
mt_tcp_get_keep_conn(struct sockaddr_in * dst,int & sock)120a9643ea8Slogwang static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock)
121a9643ea8Slogwang {
122a9643ea8Slogwang     KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0);
123a9643ea8Slogwang     if (NULL == ntfy_obj)
124a9643ea8Slogwang     {
125a9643ea8Slogwang         MTLOG_ERROR("get notify failed, logit");
126a9643ea8Slogwang         return NULL;
127a9643ea8Slogwang     }
128a9643ea8Slogwang 
129a9643ea8Slogwang     TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst));
130a9643ea8Slogwang     if (NULL == conn)
131a9643ea8Slogwang     {
132a9643ea8Slogwang         MTLOG_ERROR("get connection failed, dst[%p]", dst);
133a9643ea8Slogwang         NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj);
134a9643ea8Slogwang         return NULL;
135a9643ea8Slogwang     }
136a9643ea8Slogwang     conn->SetNtfyObj(ntfy_obj);
137a9643ea8Slogwang 
138a9643ea8Slogwang     int osfd = conn->CreateSocket();
139a9643ea8Slogwang     if (osfd < 0)
140a9643ea8Slogwang     {
141a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, true);
142a9643ea8Slogwang         MTLOG_ERROR("create socket failed, ret[%d]", osfd);
143a9643ea8Slogwang         return NULL;
144a9643ea8Slogwang     }
145a9643ea8Slogwang 
146a9643ea8Slogwang     sock = osfd;
147a9643ea8Slogwang     return conn;
148a9643ea8Slogwang }
149a9643ea8Slogwang 
mt_tcp_check_recv(int sock,char * rcv_buf,int & len,int flags,int timeout,MtFuncTcpMsgLen func)150a9643ea8Slogwang static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func)
151a9643ea8Slogwang {
152a9643ea8Slogwang     int recv_len = 0;
153a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
154a9643ea8Slogwang     do
155a9643ea8Slogwang     {
156a9643ea8Slogwang         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
157a9643ea8Slogwang         if (cost_time > (utime64_t)timeout)
158a9643ea8Slogwang         {
159a9643ea8Slogwang             errno = ETIME;
160a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
161a9643ea8Slogwang             return -3;
162a9643ea8Slogwang         }
163a9643ea8Slogwang 
164a9643ea8Slogwang         int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time));
165a9643ea8Slogwang         if (rc < 0)
166a9643ea8Slogwang         {
167a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
168a9643ea8Slogwang             return -3;
169a9643ea8Slogwang         }
170a9643ea8Slogwang         else if (rc == 0)
171a9643ea8Slogwang         {
172a9643ea8Slogwang             len = recv_len;
173a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] remote close", sock);
174a9643ea8Slogwang             return -7;
175a9643ea8Slogwang         }
176a9643ea8Slogwang         recv_len += rc;
177a9643ea8Slogwang 
178a9643ea8Slogwang         rc = func(rcv_buf, recv_len);
179a9643ea8Slogwang         if (rc < 0)
180a9643ea8Slogwang         {
181a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
182a9643ea8Slogwang             return -5;
183a9643ea8Slogwang         }
1845ac59bc4Slogwang         else if (rc == 0)
185a9643ea8Slogwang         {
1865ac59bc4Slogwang             if (len == recv_len)
187a9643ea8Slogwang             {
188a9643ea8Slogwang                 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock);
189a9643ea8Slogwang                 return -6;
190a9643ea8Slogwang             }
191a9643ea8Slogwang             continue;
192a9643ea8Slogwang         }
1935ac59bc4Slogwang         else
194a9643ea8Slogwang         {
1955ac59bc4Slogwang             if (rc > recv_len)
196a9643ea8Slogwang             {
197a9643ea8Slogwang                 continue;
198a9643ea8Slogwang             }
199a9643ea8Slogwang             else
200a9643ea8Slogwang             {
201a9643ea8Slogwang                 len = rc;
202a9643ea8Slogwang                 break;
203a9643ea8Slogwang             }
204a9643ea8Slogwang         }
205a9643ea8Slogwang     } while (true);
206a9643ea8Slogwang 
207a9643ea8Slogwang     return 0;
208a9643ea8Slogwang }
209a9643ea8Slogwang 
mt_tcpsendrcv(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int & buf_size,int timeout,MtFuncTcpMsgLen func)210a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
211a9643ea8Slogwang {
212a9643ea8Slogwang     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
213a9643ea8Slogwang     {
214a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
215a9643ea8Slogwang             dst, pkg, rcv_buf, func, len, buf_size);
216a9643ea8Slogwang         return -10;
217a9643ea8Slogwang     }
218a9643ea8Slogwang 
219a9643ea8Slogwang     int ret = 0, rc = 0;
220a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
221a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
222a9643ea8Slogwang     utime64_t cost_time = 0;
223a9643ea8Slogwang     int time_left = timeout;
224a9643ea8Slogwang 
225a9643ea8Slogwang     int sock = -1;
226a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
227a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
228a9643ea8Slogwang     {
229a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
230a9643ea8Slogwang         ret = -1;
231a9643ea8Slogwang         goto EXIT_LABEL;
232a9643ea8Slogwang     }
233a9643ea8Slogwang 
234a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
235a9643ea8Slogwang     if (rc < 0)
236a9643ea8Slogwang     {
237a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
238a9643ea8Slogwang         ret = -4;
239a9643ea8Slogwang         goto EXIT_LABEL;
240a9643ea8Slogwang     }
241a9643ea8Slogwang 
242a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
243a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
244a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
245a9643ea8Slogwang     if (rc < 0)
246a9643ea8Slogwang     {
247a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
248a9643ea8Slogwang         ret = -2;
249a9643ea8Slogwang         goto EXIT_LABEL;
250a9643ea8Slogwang     }
251a9643ea8Slogwang 
252a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
253a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
254a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
255a9643ea8Slogwang     if (rc < 0)
256a9643ea8Slogwang     {
257a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
258a9643ea8Slogwang         ret = rc;
259a9643ea8Slogwang         goto EXIT_LABEL;
260a9643ea8Slogwang     }
261a9643ea8Slogwang 
262a9643ea8Slogwang     ret = 0;
263a9643ea8Slogwang 
264a9643ea8Slogwang EXIT_LABEL:
265a9643ea8Slogwang 
266a9643ea8Slogwang     if (conn != NULL)
267a9643ea8Slogwang     {
268a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
269a9643ea8Slogwang     }
270a9643ea8Slogwang 
271a9643ea8Slogwang     return ret;
272a9643ea8Slogwang }
273a9643ea8Slogwang 
mt_tcpsendrcv_short(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int & buf_size,int timeout,MtFuncTcpMsgLen func)274a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func)
275a9643ea8Slogwang {
276a9643ea8Slogwang     int ret = 0, rc = 0;
277a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
278a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
279a9643ea8Slogwang     utime64_t cost_time = 0;
280a9643ea8Slogwang     int time_left = timeout;
281a9643ea8Slogwang 
282a9643ea8Slogwang     if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1)
283a9643ea8Slogwang     {
284a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]",
285a9643ea8Slogwang             dst, pkg, rcv_buf, func, len, buf_size);
286a9643ea8Slogwang         return -10;
287a9643ea8Slogwang     }
288a9643ea8Slogwang 
289a9643ea8Slogwang     int sock;
290a9643ea8Slogwang     sock = mt_tcp_create_sock();
291a9643ea8Slogwang     if (sock < 0)
292a9643ea8Slogwang     {
293a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
294a9643ea8Slogwang         return -1;
295a9643ea8Slogwang     }
296a9643ea8Slogwang 
297a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
298a9643ea8Slogwang     if (rc < 0)
299a9643ea8Slogwang     {
300a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
301a9643ea8Slogwang         ret = -4;
302a9643ea8Slogwang         goto EXIT_LABEL;
303a9643ea8Slogwang     }
304a9643ea8Slogwang 
305a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
306a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
307a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
308a9643ea8Slogwang     if (rc < 0)
309a9643ea8Slogwang     {
310a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
311a9643ea8Slogwang         ret = -2;
312a9643ea8Slogwang         goto EXIT_LABEL;
313a9643ea8Slogwang     }
314a9643ea8Slogwang 
315a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
316a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
317a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func);
318a9643ea8Slogwang     if (rc < 0)
319a9643ea8Slogwang     {
320a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
321a9643ea8Slogwang         ret = rc;
322a9643ea8Slogwang         goto EXIT_LABEL;
323a9643ea8Slogwang     }
324a9643ea8Slogwang 
325a9643ea8Slogwang     ret = 0;
326a9643ea8Slogwang 
327a9643ea8Slogwang EXIT_LABEL:
328a9643ea8Slogwang     if (sock >= 0)
329a9643ea8Slogwang         ::close(sock);
330a9643ea8Slogwang 
331a9643ea8Slogwang     return ret;
332a9643ea8Slogwang }
333a9643ea8Slogwang 
mt_tcpsend(struct sockaddr_in * dst,void * pkg,int len,int timeout)334a9643ea8Slogwang int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout)
335a9643ea8Slogwang {
336a9643ea8Slogwang     if (!dst || !pkg || len<1)
337a9643ea8Slogwang     {
338a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
339a9643ea8Slogwang         return -10;
340a9643ea8Slogwang     }
341a9643ea8Slogwang 
342a9643ea8Slogwang     int ret = 0, rc = 0;
343a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
344a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
345a9643ea8Slogwang     utime64_t cost_time = 0;
346a9643ea8Slogwang     int time_left = timeout;
347a9643ea8Slogwang 
348a9643ea8Slogwang     int sock = -1;
349a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
350a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
351a9643ea8Slogwang     {
352a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
353a9643ea8Slogwang         ret = -1;
354a9643ea8Slogwang         goto EXIT_LABEL;
355a9643ea8Slogwang     }
356a9643ea8Slogwang 
357a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
358a9643ea8Slogwang     if (rc < 0)
359a9643ea8Slogwang     {
360a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
361a9643ea8Slogwang         ret = -4;
362a9643ea8Slogwang         goto EXIT_LABEL;
363a9643ea8Slogwang     }
364a9643ea8Slogwang 
365a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
366a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
367a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
368a9643ea8Slogwang     if (rc < 0)
369a9643ea8Slogwang     {
370a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
371a9643ea8Slogwang         ret = -2;
372a9643ea8Slogwang         goto EXIT_LABEL;
373a9643ea8Slogwang     }
374a9643ea8Slogwang 
375a9643ea8Slogwang     ret = 0;
376a9643ea8Slogwang 
377a9643ea8Slogwang EXIT_LABEL:
378a9643ea8Slogwang 
379a9643ea8Slogwang     if (conn != NULL)
380a9643ea8Slogwang     {
381a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
382a9643ea8Slogwang     }
383a9643ea8Slogwang 
384a9643ea8Slogwang     return ret;
385a9643ea8Slogwang }
386a9643ea8Slogwang 
mt_tcpsend_short(struct sockaddr_in * dst,void * pkg,int len,int timeout)387a9643ea8Slogwang int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout)
388a9643ea8Slogwang {
389a9643ea8Slogwang     if (!dst || !pkg || len<1)
390a9643ea8Slogwang     {
391a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len);
392a9643ea8Slogwang         return -10;
393a9643ea8Slogwang     }
394a9643ea8Slogwang 
395a9643ea8Slogwang     int ret = 0, rc = 0;
396a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
397a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
398a9643ea8Slogwang     utime64_t cost_time = 0;
399a9643ea8Slogwang     int time_left = timeout;
400a9643ea8Slogwang 
401a9643ea8Slogwang     int sock = -1;
402a9643ea8Slogwang     sock = mt_tcp_create_sock();
403a9643ea8Slogwang     if (sock < 0)
404a9643ea8Slogwang     {
405a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
406a9643ea8Slogwang         ret = -1;
407a9643ea8Slogwang         goto EXIT_LABEL;
408a9643ea8Slogwang     }
409a9643ea8Slogwang 
410a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
411a9643ea8Slogwang     if (rc < 0)
412a9643ea8Slogwang     {
413a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
414a9643ea8Slogwang         ret = -4;
415a9643ea8Slogwang         goto EXIT_LABEL;
416a9643ea8Slogwang     }
417a9643ea8Slogwang 
418a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
419a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
420a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
421a9643ea8Slogwang     if (rc < 0)
422a9643ea8Slogwang     {
423a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
424a9643ea8Slogwang         ret = -2;
425a9643ea8Slogwang         goto EXIT_LABEL;
426a9643ea8Slogwang     }
427a9643ea8Slogwang 
428a9643ea8Slogwang     ret = 0;
429a9643ea8Slogwang 
430a9643ea8Slogwang EXIT_LABEL:
431a9643ea8Slogwang 
432a9643ea8Slogwang     if (sock >= 0)
433a9643ea8Slogwang         ::close(sock);
434a9643ea8Slogwang 
435a9643ea8Slogwang     return ret;
436a9643ea8Slogwang }
437a9643ea8Slogwang 
mt_tcpsendrcv_ex(struct sockaddr_in * dst,void * pkg,int len,void * rcv_buf,int * buf_size,int timeout,MtFuncTcpMsgLen func,MT_TCP_CONN_TYPE type)438a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type)
439a9643ea8Slogwang {
440a9643ea8Slogwang     if(!dst || !pkg || len<1)
441a9643ea8Slogwang     {
442a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
443a9643ea8Slogwang                     dst, pkg, rcv_buf, func, len, buf_size,type);
444a9643ea8Slogwang         return -10;
445a9643ea8Slogwang     }
446a9643ea8Slogwang 
447a9643ea8Slogwang     switch (type)
448a9643ea8Slogwang     {
449a9643ea8Slogwang         case MT_TCP_LONG:
450a9643ea8Slogwang         {
451a9643ea8Slogwang             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
452a9643ea8Slogwang         }
453a9643ea8Slogwang 
454a9643ea8Slogwang         case MT_TCP_LONG_SNDONLY:
455a9643ea8Slogwang         {
456a9643ea8Slogwang             return mt_tcpsend(dst, pkg, len, timeout);
457a9643ea8Slogwang         }
458a9643ea8Slogwang 
459a9643ea8Slogwang         case MT_TCP_SHORT:
460a9643ea8Slogwang         {
461a9643ea8Slogwang             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func);
462a9643ea8Slogwang         }
463a9643ea8Slogwang 
464a9643ea8Slogwang         case MT_TCP_SHORT_SNDONLY:
465a9643ea8Slogwang         {
466a9643ea8Slogwang             return mt_tcpsend_short(dst, pkg, len, timeout);
467a9643ea8Slogwang         }
468a9643ea8Slogwang 
469a9643ea8Slogwang         default:
470a9643ea8Slogwang         {
471a9643ea8Slogwang             MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]",
472a9643ea8Slogwang                         dst, pkg, rcv_buf, func, len, buf_size,type);
473a9643ea8Slogwang             return -10;
474a9643ea8Slogwang         }
475a9643ea8Slogwang     }
476a9643ea8Slogwang 
477a9643ea8Slogwang     return 0;
478a9643ea8Slogwang }
479a9643ea8Slogwang 
mt_task_process(void * arg)480a9643ea8Slogwang static void mt_task_process(void* arg)
481a9643ea8Slogwang {
482a9643ea8Slogwang     int rc = 0;
483a9643ea8Slogwang     IMtTask* task = (IMtTask*)arg;
484a9643ea8Slogwang     if (!task)
485a9643ea8Slogwang     {
486a9643ea8Slogwang         MTLOG_ERROR("Invalid arg, error");
487a9643ea8Slogwang         return;
488a9643ea8Slogwang     }
489a9643ea8Slogwang 
490a9643ea8Slogwang     rc = task->Process();
491a9643ea8Slogwang     if (rc != 0)
492a9643ea8Slogwang     {
493a9643ea8Slogwang         MTLOG_DEBUG("task process failed(%d), log", rc);
494a9643ea8Slogwang     }
495a9643ea8Slogwang 
496a9643ea8Slogwang     task->SetResult(rc);
497a9643ea8Slogwang 
498a9643ea8Slogwang     return;
499a9643ea8Slogwang };
500a9643ea8Slogwang 
mt_exec_all_task(IMtTaskList & req_list)501a9643ea8Slogwang int mt_exec_all_task(IMtTaskList& req_list)
502a9643ea8Slogwang {
503a9643ea8Slogwang     MtFrame* mtframe    = MtFrame::Instance();
504a9643ea8Slogwang     MicroThread* thread = mtframe->GetActiveThread();
505a9643ea8Slogwang     IMtTask* task       = NULL;
506a9643ea8Slogwang     MicroThread* sub    = NULL;
507a9643ea8Slogwang     MicroThread* tmp    = NULL;
508a9643ea8Slogwang     int rc              = -1;
509a9643ea8Slogwang 
510a9643ea8Slogwang     MicroThread::SubThreadList list;
511a9643ea8Slogwang     TAILQ_INIT(&list);
512a9643ea8Slogwang 
513a9643ea8Slogwang     if (0 == req_list.size())
514a9643ea8Slogwang     {
515a9643ea8Slogwang         MTLOG_DEBUG("no task for execult");
516a9643ea8Slogwang         return 0;
517a9643ea8Slogwang     }
518a9643ea8Slogwang 
519a9643ea8Slogwang     for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it)
520a9643ea8Slogwang     {
521a9643ea8Slogwang         task = *it;
522a9643ea8Slogwang         sub = MtFrame::CreateThread(mt_task_process, task, false);
523a9643ea8Slogwang         if (NULL == sub)
524a9643ea8Slogwang         {
525a9643ea8Slogwang             MTLOG_ERROR("create sub thread failed");
526a9643ea8Slogwang             goto EXIT_LABEL;
527a9643ea8Slogwang         }
528a9643ea8Slogwang 
529a9643ea8Slogwang         sub->SetType(MicroThread::SUB_THREAD);
530a9643ea8Slogwang         TAILQ_INSERT_TAIL(&list, sub, _sub_entry);
531a9643ea8Slogwang     }
532a9643ea8Slogwang 
533a9643ea8Slogwang     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
534a9643ea8Slogwang     {
535a9643ea8Slogwang         TAILQ_REMOVE(&list, sub, _sub_entry);
536a9643ea8Slogwang         thread->AddSubThread(sub);
537a9643ea8Slogwang         mtframe->InsertRunable(sub);
538a9643ea8Slogwang     }
539a9643ea8Slogwang 
540a9643ea8Slogwang     thread->Wait();
541a9643ea8Slogwang     rc = 0;
542a9643ea8Slogwang 
543a9643ea8Slogwang EXIT_LABEL:
544a9643ea8Slogwang 
545a9643ea8Slogwang     TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp)
546a9643ea8Slogwang     {
547a9643ea8Slogwang         TAILQ_REMOVE(&list, sub, _sub_entry);
548a9643ea8Slogwang         mtframe->FreeThread(sub);
549a9643ea8Slogwang     }
550a9643ea8Slogwang 
551a9643ea8Slogwang     return rc;
552a9643ea8Slogwang 
553a9643ea8Slogwang }
554a9643ea8Slogwang 
mt_set_msg_private(void * data)555a9643ea8Slogwang void mt_set_msg_private(void *data)
556a9643ea8Slogwang {
557a9643ea8Slogwang     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
558a9643ea8Slogwang     if (msg_thread != NULL)
559a9643ea8Slogwang         msg_thread->SetPrivate(data);
560a9643ea8Slogwang }
561a9643ea8Slogwang 
mt_get_msg_private()562a9643ea8Slogwang void* mt_get_msg_private()
563a9643ea8Slogwang {
564a9643ea8Slogwang     MicroThread *msg_thread = MtFrame::Instance()->GetRootThread();
565a9643ea8Slogwang     if (NULL == msg_thread)
566a9643ea8Slogwang     {
567a9643ea8Slogwang         return NULL;
568a9643ea8Slogwang     }
569a9643ea8Slogwang 
570a9643ea8Slogwang     return msg_thread->GetPrivate();
571a9643ea8Slogwang }
572a9643ea8Slogwang 
mt_init_frame(int argc,char * const argv[])573a02c88d6Slogwang bool mt_init_frame(int argc, char * const argv[])
574a9643ea8Slogwang {
575a02c88d6Slogwang     if (argc) {
576a02c88d6Slogwang         ff_init(argc, argv);
577a9643ea8Slogwang         ff_set_hook_flag();
578a9643ea8Slogwang     }
579a9643ea8Slogwang     memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab));
580a9643ea8Slogwang     return MtFrame::Instance()->InitFrame();
581a9643ea8Slogwang }
582a9643ea8Slogwang 
mt_set_stack_size(unsigned int bytes)583a9643ea8Slogwang void mt_set_stack_size(unsigned int bytes)
584a9643ea8Slogwang {
585a9643ea8Slogwang     ThreadPool::SetDefaultStackSize(bytes);
586a9643ea8Slogwang }
587a9643ea8Slogwang 
mt_recvfrom(int fd,void * buf,int len,int flags,struct sockaddr * from,socklen_t * fromlen,int timeout)588a9643ea8Slogwang int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout)
589a9643ea8Slogwang {
590a9643ea8Slogwang     return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout);
591a9643ea8Slogwang }
592a9643ea8Slogwang 
mt_sendto(int fd,const void * msg,int len,int flags,const struct sockaddr * to,int tolen,int timeout)593a9643ea8Slogwang int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout)
594a9643ea8Slogwang {
595a9643ea8Slogwang     return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout);
596a9643ea8Slogwang }
597a9643ea8Slogwang 
mt_connect(int fd,const struct sockaddr * addr,int addrlen,int timeout)598a9643ea8Slogwang int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout)
599a9643ea8Slogwang {
600a9643ea8Slogwang     return MtFrame::connect(fd, addr, addrlen, timeout);
601a9643ea8Slogwang }
602a9643ea8Slogwang 
mt_accept(int fd,struct sockaddr * addr,socklen_t * addrlen,int timeout)603a9643ea8Slogwang int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
604a9643ea8Slogwang {
605a9643ea8Slogwang     return MtFrame::accept(fd, addr, addrlen, timeout);
606a9643ea8Slogwang }
607a9643ea8Slogwang 
mt_read(int fd,void * buf,size_t nbyte,int timeout)608a9643ea8Slogwang ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout)
609a9643ea8Slogwang {
610a9643ea8Slogwang     return MtFrame::read(fd, buf, nbyte, timeout);
611a9643ea8Slogwang }
612a9643ea8Slogwang 
mt_write(int fd,const void * buf,size_t nbyte,int timeout)613a9643ea8Slogwang ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout)
614a9643ea8Slogwang {
615a9643ea8Slogwang     return MtFrame::write(fd, buf, nbyte, timeout);
616a9643ea8Slogwang }
617a9643ea8Slogwang 
mt_recv(int fd,void * buf,int len,int flags,int timeout)618a9643ea8Slogwang ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout)
619a9643ea8Slogwang {
620a9643ea8Slogwang     return MtFrame::recv(fd, buf, len, flags, timeout);
621a9643ea8Slogwang }
622a9643ea8Slogwang 
mt_send(int fd,const void * buf,size_t nbyte,int flags,int timeout)623a9643ea8Slogwang ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout)
624a9643ea8Slogwang {
625a9643ea8Slogwang     return MtFrame::send(fd, buf, nbyte, flags, timeout);
626a9643ea8Slogwang }
627a9643ea8Slogwang 
mt_sleep(int ms)628a9643ea8Slogwang void mt_sleep(int ms)
629a9643ea8Slogwang {
630a9643ea8Slogwang     MtFrame::sleep(ms);
631a9643ea8Slogwang }
632a9643ea8Slogwang 
mt_time_ms(void)633a9643ea8Slogwang unsigned long long mt_time_ms(void)
634a9643ea8Slogwang {
635a9643ea8Slogwang     return MtFrame::Instance()->GetLastClock();
636a9643ea8Slogwang }
637a9643ea8Slogwang 
mt_wait_events(int fd,int events,int timeout)638a9643ea8Slogwang int mt_wait_events(int fd, int events, int timeout)
639a9643ea8Slogwang {
640a9643ea8Slogwang     return MtFrame::Instance()->WaitEvents(fd, events, timeout);
641a9643ea8Slogwang }
642a9643ea8Slogwang 
mt_start_thread(void * entry,void * args)643a9643ea8Slogwang void* mt_start_thread(void* entry, void* args)
644a9643ea8Slogwang {
645a9643ea8Slogwang     return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true);
646a9643ea8Slogwang }
647a9643ea8Slogwang 
mt_active_thread()648*84bcae25Swoolen void* mt_active_thread()
649*84bcae25Swoolen {
650*84bcae25Swoolen     return MtFrame::Instance()->GetActiveThread();
651*84bcae25Swoolen }
652*84bcae25Swoolen 
mt_thread_wait(int ms)653*84bcae25Swoolen void mt_thread_wait(int ms)
654*84bcae25Swoolen {
655*84bcae25Swoolen     MtFrame::Instance()->WaitNotify(ms);
656*84bcae25Swoolen }
657*84bcae25Swoolen 
mt_thread_wakeup_wait(void * thread_p)658*84bcae25Swoolen void mt_thread_wakeup_wait(void * thread_p)
659*84bcae25Swoolen {
660*84bcae25Swoolen     MtFrame::Instance()->NotifyThread((MicroThread *) thread_p);
661*84bcae25Swoolen }
662*84bcae25Swoolen 
mt_swap_thread()663*84bcae25Swoolen void mt_swap_thread()
664*84bcae25Swoolen {
665*84bcae25Swoolen     return MtFrame::Instance()->SwapDaemonThread();
666*84bcae25Swoolen }
667*84bcae25Swoolen 
668a9643ea8Slogwang #define BUF_ALIGNMENT_SIZE 4096
669a9643ea8Slogwang #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1))
670a9643ea8Slogwang #define BUF_DEFAULT_SIZE 4096
671a9643ea8Slogwang 
672a9643ea8Slogwang class ScopedBuf
673a9643ea8Slogwang {
674a9643ea8Slogwang public:
ScopedBuf(void * & buf_keeper,bool keep)675a9643ea8Slogwang     ScopedBuf(void*& buf_keeper, bool keep)
676a9643ea8Slogwang     :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep)
677a9643ea8Slogwang     {}
678a9643ea8Slogwang 
Alloc(int len)679a9643ea8Slogwang     int Alloc(int len)
680a9643ea8Slogwang     {
681a9643ea8Slogwang         if(len<len_)
682a9643ea8Slogwang         {
6835ac59bc4Slogwang             return -1;
684a9643ea8Slogwang         }
685a9643ea8Slogwang 
686a9643ea8Slogwang         if(len==0)
687a9643ea8Slogwang         {
688a9643ea8Slogwang             len = BUF_ALIGNMENT_SIZE;
689a9643ea8Slogwang         }
690a9643ea8Slogwang         if(len_==len)
691a9643ea8Slogwang         {
692a9643ea8Slogwang             return 0;
693a9643ea8Slogwang         }
694a9643ea8Slogwang 
695a9643ea8Slogwang         len_ = BUF_ALIGN_SIZE(len);
696a9643ea8Slogwang         if(len_==0)
697a9643ea8Slogwang         {
698a9643ea8Slogwang             len_ = BUF_DEFAULT_SIZE;
699a9643ea8Slogwang         }
700a9643ea8Slogwang         len_watermark_ = len_-BUF_ALIGNMENT_SIZE;
701a9643ea8Slogwang         char* tmp = (char*)realloc(buf_, len_);
702a9643ea8Slogwang         if(tmp==NULL)
703a9643ea8Slogwang         {
7045ac59bc4Slogwang             return -2;
705a9643ea8Slogwang         }
706a9643ea8Slogwang 
707a9643ea8Slogwang         buf_ = tmp;
708a9643ea8Slogwang         return 0;
709a9643ea8Slogwang     }
710a9643ea8Slogwang 
reset()711a9643ea8Slogwang     void reset()
712a9643ea8Slogwang     {
713a9643ea8Slogwang         if(keep_)
714a9643ea8Slogwang         {
715a9643ea8Slogwang             buf_keeper_ = (void*)buf_;
716a9643ea8Slogwang             buf_ = NULL;
717a9643ea8Slogwang         }
718a9643ea8Slogwang     }
719a9643ea8Slogwang 
~ScopedBuf()720a9643ea8Slogwang     ~ScopedBuf()
721a9643ea8Slogwang     {
722a9643ea8Slogwang         if(buf_!=NULL)
723a9643ea8Slogwang         {
724a9643ea8Slogwang             free(buf_);
725a9643ea8Slogwang             buf_ = NULL;
726a9643ea8Slogwang         }
727a9643ea8Slogwang     }
728a9643ea8Slogwang 
729a9643ea8Slogwang public:
730a9643ea8Slogwang     void* &buf_keeper_;
731a9643ea8Slogwang     char* buf_;
732a9643ea8Slogwang     int   len_;
733a9643ea8Slogwang     int   len_watermark_;
734a9643ea8Slogwang     bool  keep_;
735a9643ea8Slogwang 
736a9643ea8Slogwang };
737a9643ea8Slogwang 
mt_tcp_check_recv(int sock,void * & rcv_buf,int & len,int flags,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,bool keep_rcv_buf)738a9643ea8Slogwang static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags,
739a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
740a9643ea8Slogwang {
741a9643ea8Slogwang     int recv_len = 0;
742a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
743a9643ea8Slogwang 
744a9643ea8Slogwang     int rc = 0;
745a9643ea8Slogwang     int ret = 0;
746a9643ea8Slogwang     int pkg_len = 0;
747a9643ea8Slogwang     bool msg_len_detected = false;
748a9643ea8Slogwang 
749a9643ea8Slogwang     ScopedBuf sbuf(rcv_buf, keep_rcv_buf);
750a9643ea8Slogwang     ret = sbuf.Alloc(len);
751a9643ea8Slogwang 
752a9643ea8Slogwang     if(ret!=0)
753a9643ea8Slogwang     {
754a9643ea8Slogwang         MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
755a9643ea8Slogwang         return -11;
756a9643ea8Slogwang     }
757a9643ea8Slogwang 
758a9643ea8Slogwang     do
759a9643ea8Slogwang     {
760a9643ea8Slogwang         utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
761a9643ea8Slogwang         if (cost_time > (utime64_t)timeout)
762a9643ea8Slogwang         {
763a9643ea8Slogwang             errno = ETIME;
764a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock);
765a9643ea8Slogwang             return -3;
766a9643ea8Slogwang         }
767a9643ea8Slogwang 
768a9643ea8Slogwang         rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time));
769a9643ea8Slogwang         if (rc < 0)
770a9643ea8Slogwang         {
771a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc);
772a9643ea8Slogwang             return -3;
773a9643ea8Slogwang         }
7745ac59bc4Slogwang         else if (rc == 0)
775a9643ea8Slogwang         {
776a9643ea8Slogwang 
7775ac59bc4Slogwang             if(recv_len==0)
778a9643ea8Slogwang             {
779a9643ea8Slogwang                 MTLOG_ERROR("tcp socket[%d] remote close", sock);
780a9643ea8Slogwang                 return -7;
781a9643ea8Slogwang             }
782a9643ea8Slogwang 
783a9643ea8Slogwang             rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected);
784a9643ea8Slogwang 
7855ac59bc4Slogwang             if(rc!=recv_len)
786a9643ea8Slogwang             {
787a9643ea8Slogwang                 MTLOG_ERROR("tcp socket[%d] remote close", sock);
788a9643ea8Slogwang                 return -7;
789a9643ea8Slogwang             }
790a9643ea8Slogwang             len = recv_len;
791a9643ea8Slogwang             break;
792a9643ea8Slogwang         }
793a9643ea8Slogwang         recv_len += rc;
794a9643ea8Slogwang 
795a9643ea8Slogwang         if((!msg_len_detected)||recv_len==pkg_len)
796a9643ea8Slogwang         {
797a9643ea8Slogwang             rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected);
798a9643ea8Slogwang             if(msg_len_detected)
799a9643ea8Slogwang             {
800a9643ea8Slogwang                 pkg_len = rc;
801a9643ea8Slogwang             }
802a9643ea8Slogwang         }
803a9643ea8Slogwang         else
804a9643ea8Slogwang         {
805a9643ea8Slogwang             rc = pkg_len;
806a9643ea8Slogwang         }
807a9643ea8Slogwang 
808a9643ea8Slogwang         if (rc < 0)
809a9643ea8Slogwang         {
810a9643ea8Slogwang             MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc);
811a9643ea8Slogwang             return -5;
812a9643ea8Slogwang         }
8135ac59bc4Slogwang         else if (rc == 0)
814a9643ea8Slogwang         {
815a9643ea8Slogwang             if(sbuf.len_ > recv_len)
816a9643ea8Slogwang             {
817a9643ea8Slogwang                 continue;
818a9643ea8Slogwang             }
8195ac59bc4Slogwang 
820a9643ea8Slogwang             ret = sbuf.Alloc(sbuf.len_<<1);
821a9643ea8Slogwang 
822a9643ea8Slogwang             if(ret!=0)
823a9643ea8Slogwang             {
824a9643ea8Slogwang                 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
825a9643ea8Slogwang                 return -11;
826a9643ea8Slogwang             }
827a9643ea8Slogwang         }
8285ac59bc4Slogwang         else
829a9643ea8Slogwang         {
8305ac59bc4Slogwang             if (rc > recv_len)
831a9643ea8Slogwang             {
8325ac59bc4Slogwang                 if(sbuf.len_ > recv_len)
833a9643ea8Slogwang                 {
834a9643ea8Slogwang                     continue;
835a9643ea8Slogwang                 }
836a9643ea8Slogwang 
837a9643ea8Slogwang                 ret = sbuf.Alloc(rc);
838a9643ea8Slogwang 
839a9643ea8Slogwang                 if(ret!=0)
840a9643ea8Slogwang                 {
841a9643ea8Slogwang                     MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret);
842a9643ea8Slogwang                     return -11;
843a9643ea8Slogwang                 }
844a9643ea8Slogwang             }
8455ac59bc4Slogwang             else if(rc==recv_len)
846a9643ea8Slogwang             {
847a9643ea8Slogwang                 len = rc;
848a9643ea8Slogwang                 break;
849a9643ea8Slogwang             }
8505ac59bc4Slogwang             else
851a9643ea8Slogwang             {
852a9643ea8Slogwang                 MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock);
853a9643ea8Slogwang                 return -5;
854a9643ea8Slogwang             }
855a9643ea8Slogwang         }
856a9643ea8Slogwang     } while (true);
857a9643ea8Slogwang 
858a9643ea8Slogwang     sbuf.reset();
859a9643ea8Slogwang 
860a9643ea8Slogwang     return 0;
861a9643ea8Slogwang }
862a9643ea8Slogwang 
mt_tcpsendrcv(struct sockaddr_in * dst,void * pkg,int len,void * & rcv_buf,int & recv_pkg_size,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,bool keep_rcv_buf)863a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
864a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
865a9643ea8Slogwang {
866a9643ea8Slogwang     if(!dst || !pkg || len<1)
867a9643ea8Slogwang     {
868a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
869a9643ea8Slogwang                     dst, pkg, len, check_func);
870a9643ea8Slogwang         return -10;
871a9643ea8Slogwang     }
872a9643ea8Slogwang 
873a9643ea8Slogwang 
874a9643ea8Slogwang     int ret = 0, rc = 0;
875a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
876a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
877a9643ea8Slogwang     utime64_t cost_time = 0;
878a9643ea8Slogwang     int time_left = timeout;
879a9643ea8Slogwang 
880a9643ea8Slogwang     int sock = -1;
881a9643ea8Slogwang     TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock);
882a9643ea8Slogwang     if ((conn == NULL) || (sock < 0))
883a9643ea8Slogwang     {
884a9643ea8Slogwang         MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock);
885a9643ea8Slogwang         ret = -1;
886a9643ea8Slogwang         goto EXIT_LABEL;
887a9643ea8Slogwang     }
888a9643ea8Slogwang 
889a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
890a9643ea8Slogwang     if (rc < 0)
891a9643ea8Slogwang     {
892a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
893a9643ea8Slogwang         ret = -4;
894a9643ea8Slogwang         goto EXIT_LABEL;
895a9643ea8Slogwang     }
896a9643ea8Slogwang 
897a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
898a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
899a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
900a9643ea8Slogwang     if (rc < 0)
901a9643ea8Slogwang     {
902a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
903a9643ea8Slogwang         ret = -2;
904a9643ea8Slogwang         goto EXIT_LABEL;
905a9643ea8Slogwang     }
906a9643ea8Slogwang 
907a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
908a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
909a9643ea8Slogwang 
910a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
911a9643ea8Slogwang     if (rc < 0)
912a9643ea8Slogwang     {
913a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
914a9643ea8Slogwang         ret = rc;
915a9643ea8Slogwang         goto EXIT_LABEL;
916a9643ea8Slogwang     }
917a9643ea8Slogwang 
918a9643ea8Slogwang     ret = 0;
919a9643ea8Slogwang 
920a9643ea8Slogwang EXIT_LABEL:
921a9643ea8Slogwang     if (conn != NULL)
922a9643ea8Slogwang     {
923a9643ea8Slogwang         ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0));
924a9643ea8Slogwang     }
925a9643ea8Slogwang 
926a9643ea8Slogwang     return ret;
927a9643ea8Slogwang }
928a9643ea8Slogwang 
mt_tcpsendrcv_short(struct sockaddr_in * dst,void * pkg,int len,void * & rcv_buf,int & recv_pkg_size,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,bool keep_rcv_buf)929a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size,
930a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf)
931a9643ea8Slogwang {
932a9643ea8Slogwang     int ret = 0, rc = 0;
933a9643ea8Slogwang     int addr_len = sizeof(struct sockaddr_in);
934a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
935a9643ea8Slogwang     utime64_t cost_time = 0;
936a9643ea8Slogwang     int time_left = timeout;
937a9643ea8Slogwang 
938a9643ea8Slogwang     if(!dst || !pkg || len<1)
939a9643ea8Slogwang     {
940a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]",
941a9643ea8Slogwang                     dst, pkg, len, check_func);
942a9643ea8Slogwang         return -10;
943a9643ea8Slogwang     }
944a9643ea8Slogwang 
945a9643ea8Slogwang     int sock;
946a9643ea8Slogwang     sock = mt_tcp_create_sock();
947a9643ea8Slogwang     if (sock < 0)
948a9643ea8Slogwang     {
949a9643ea8Slogwang         MTLOG_ERROR("create tcp socket failed, ret: %d", sock);
950a9643ea8Slogwang         return -1;
951a9643ea8Slogwang     }
952a9643ea8Slogwang 
953a9643ea8Slogwang     rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left);
954a9643ea8Slogwang     if (rc < 0)
955a9643ea8Slogwang     {
956a9643ea8Slogwang         MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc);
957a9643ea8Slogwang         ret = -4;
958a9643ea8Slogwang         goto EXIT_LABEL;
959a9643ea8Slogwang     }
960a9643ea8Slogwang 
961a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
962a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
963a9643ea8Slogwang     rc = MtFrame::send(sock, pkg, len, 0, time_left);
964a9643ea8Slogwang     if (rc < 0)
965a9643ea8Slogwang     {
966a9643ea8Slogwang         MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc);
967a9643ea8Slogwang         ret = -2;
968a9643ea8Slogwang         goto EXIT_LABEL;
969a9643ea8Slogwang     }
970a9643ea8Slogwang 
971a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
972a9643ea8Slogwang     time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0;
973a9643ea8Slogwang     rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf);
974a9643ea8Slogwang 
975a9643ea8Slogwang     if (rc < 0)
976a9643ea8Slogwang     {
977a9643ea8Slogwang         MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc);
978a9643ea8Slogwang         ret = rc;
979a9643ea8Slogwang         goto EXIT_LABEL;
980a9643ea8Slogwang     }
981a9643ea8Slogwang 
982a9643ea8Slogwang     ret = 0;
983a9643ea8Slogwang 
984a9643ea8Slogwang EXIT_LABEL:
985a9643ea8Slogwang     if (sock >= 0)
986a9643ea8Slogwang         ::close(sock);
987a9643ea8Slogwang 
988a9643ea8Slogwang     return ret;
989a9643ea8Slogwang }
990a9643ea8Slogwang 
mt_tcpsendrcv_ex(struct sockaddr_in * dst,void * pkg,int len,void * & rcv_buf,int & rcv_pkg_size,int timeout,MtFuncTcpMsgChecker check_func,void * msg_ctx,MT_TCP_CONN_TYPE type,bool keep_rcv_buf)991a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size,
992a9643ea8Slogwang                      int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx,
993a9643ea8Slogwang                      MT_TCP_CONN_TYPE type, bool keep_rcv_buf)
994a9643ea8Slogwang {
995a9643ea8Slogwang     if(!dst || !pkg || len<1)
996a9643ea8Slogwang     {
997a9643ea8Slogwang         MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
998a9643ea8Slogwang                     dst, pkg, len, check_func, msg_ctx, type);
999a9643ea8Slogwang         return -10;
1000a9643ea8Slogwang     }
1001a9643ea8Slogwang 
1002a9643ea8Slogwang     switch (type)
1003a9643ea8Slogwang     {
1004a9643ea8Slogwang         case MT_TCP_LONG:
1005a9643ea8Slogwang         {
1006a9643ea8Slogwang             return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1007a9643ea8Slogwang         }
1008a9643ea8Slogwang 
1009a9643ea8Slogwang         case MT_TCP_LONG_SNDONLY:
1010a9643ea8Slogwang         {
1011a9643ea8Slogwang             return mt_tcpsend(dst, pkg, len, timeout);
1012a9643ea8Slogwang         }
1013a9643ea8Slogwang 
1014a9643ea8Slogwang         case MT_TCP_SHORT:
1015a9643ea8Slogwang         {
1016a9643ea8Slogwang             return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf);
1017a9643ea8Slogwang         }
1018a9643ea8Slogwang 
1019a9643ea8Slogwang         case MT_TCP_SHORT_SNDONLY:
1020a9643ea8Slogwang         {
1021a9643ea8Slogwang             return mt_tcpsend_short(dst, pkg, len, timeout);
1022a9643ea8Slogwang         }
1023a9643ea8Slogwang 
1024a9643ea8Slogwang         default:
1025a9643ea8Slogwang         {
1026a9643ea8Slogwang             MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]",
1027a9643ea8Slogwang                     dst, pkg, len, check_func, msg_ctx, type);
1028a9643ea8Slogwang             return -10;
1029a9643ea8Slogwang         }
1030a9643ea8Slogwang     }
1031a9643ea8Slogwang 
1032a9643ea8Slogwang     return 0;
1033a9643ea8Slogwang }
1034a9643ea8Slogwang 
1035a9643ea8Slogwang }
1036