1a9643ea8Slogwang 2a9643ea8Slogwang /** 3a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4a9643ea8Slogwang * 5a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6a9643ea8Slogwang * 7a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9a9643ea8Slogwang * obtain a copy of the License at 10a9643ea8Slogwang * 11a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12a9643ea8Slogwang * 13a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16a9643ea8Slogwang * and limitations under the License. 17a9643ea8Slogwang */ 18a9643ea8Slogwang 19a9643ea8Slogwang 20a9643ea8Slogwang /** 21a9643ea8Slogwang * @filename mt_sys_call.cpp 22a9643ea8Slogwang * @info �̷߳�װϵͳapi, ͬ�������߳�API��ʵ���첽���� 23a9643ea8Slogwang */ 24a9643ea8Slogwang 25a9643ea8Slogwang #include "kqueue_proxy.h" 26a9643ea8Slogwang #include "micro_thread.h" 27a9643ea8Slogwang #include "mt_connection.h" 28a9643ea8Slogwang #include "mt_api.h" 29a9643ea8Slogwang #include "ff_api.h" 30a9643ea8Slogwang #include "mt_sys_hook.h" 31a9643ea8Slogwang 32a9643ea8Slogwang namespace NS_MICRO_THREAD { 33a9643ea8Slogwang 34a9643ea8Slogwang /** 35a9643ea8Slogwang * @brief ��������˿ڵ�socket�շ��ӿ�, ��socket������������, ҵ������֤������ 36a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 37a9643ea8Slogwang * @param pkg -�������װ�İ��� 38a9643ea8Slogwang * @param len -�������װ�İ��峤�� 39a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff 40a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 41a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 42a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, �ɴ�ӡerrno, -10 ������Ч 43a9643ea8Slogwang */ 44a9643ea8Slogwang int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout) 45a9643ea8Slogwang { 46a9643ea8Slogwang int ret = 0; 47a9643ea8Slogwang int rc = 0; 48a9643ea8Slogwang int flags = 1; 49a9643ea8Slogwang struct sockaddr_in from_addr = {0}; 50a9643ea8Slogwang int addr_len = sizeof(from_addr); 51a9643ea8Slogwang 52a9643ea8Slogwang if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf) 53a9643ea8Slogwang { 54a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]", 55a9643ea8Slogwang dst, pkg, rcv_buf, len, buf_size); 56a9643ea8Slogwang return -10; 57a9643ea8Slogwang } 58a9643ea8Slogwang 59a9643ea8Slogwang int sock = socket(PF_INET, SOCK_DGRAM, 0); 60a9643ea8Slogwang if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0)) 61a9643ea8Slogwang { 62a9643ea8Slogwang MT_ATTR_API(320842, 1); // socketʧ�� 63a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno); 64a9643ea8Slogwang ret = -1; 65a9643ea8Slogwang goto EXIT_LABEL; 66a9643ea8Slogwang } 67a9643ea8Slogwang 68a9643ea8Slogwang rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout); 69a9643ea8Slogwang if (rc < 0) 70a9643ea8Slogwang { 71a9643ea8Slogwang MT_ATTR_API(320844, 1); // ����ʧ�� 72a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno); 73a9643ea8Slogwang ret = -2; 74a9643ea8Slogwang goto EXIT_LABEL; 75a9643ea8Slogwang } 76a9643ea8Slogwang 77a9643ea8Slogwang rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout); 78a9643ea8Slogwang if (rc < 0) 79a9643ea8Slogwang { 80a9643ea8Slogwang MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ� 81a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno); 82a9643ea8Slogwang ret = -3; 83a9643ea8Slogwang goto EXIT_LABEL; 84a9643ea8Slogwang } 85a9643ea8Slogwang buf_size = rc; 86a9643ea8Slogwang 87a9643ea8Slogwang EXIT_LABEL: 88a9643ea8Slogwang 89a9643ea8Slogwang if (sock > 0) 90a9643ea8Slogwang { 91a9643ea8Slogwang close(sock); 92a9643ea8Slogwang sock = -1; 93a9643ea8Slogwang } 94a9643ea8Slogwang 95a9643ea8Slogwang return ret; 96a9643ea8Slogwang } 97a9643ea8Slogwang 98a9643ea8Slogwang /** 99a9643ea8Slogwang * @brief ����TCP���֣�������Ϊ������ 100a9643ea8Slogwang * @return >=0 �ɹ�, <0 ʧ�� 101a9643ea8Slogwang */ 102a9643ea8Slogwang int mt_tcp_create_sock(void) 103a9643ea8Slogwang { 104a9643ea8Slogwang int fd; 105a9643ea8Slogwang int flag; 106a9643ea8Slogwang 107a9643ea8Slogwang // ����socket 108a9643ea8Slogwang fd = ::socket(AF_INET, SOCK_STREAM, 0); 109a9643ea8Slogwang if (fd < 0) 110a9643ea8Slogwang { 111a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, error: %m"); 112a9643ea8Slogwang return -1; 113a9643ea8Slogwang } 114a9643ea8Slogwang 115a9643ea8Slogwang // ����socket������ 116a9643ea8Slogwang flag = fcntl(fd, F_GETFL, 0); 117a9643ea8Slogwang if (flag == -1) 118a9643ea8Slogwang { 119a9643ea8Slogwang ::close(fd); 120a9643ea8Slogwang MTLOG_ERROR("get fd flags failed, error: %m"); 121a9643ea8Slogwang return -2; 122a9643ea8Slogwang } 123a9643ea8Slogwang 124a9643ea8Slogwang if (flag & O_NONBLOCK) 125a9643ea8Slogwang return fd; 126a9643ea8Slogwang 127a9643ea8Slogwang if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1) 128a9643ea8Slogwang { 129a9643ea8Slogwang ::close(fd); 130a9643ea8Slogwang MTLOG_ERROR("set fd flags failed, error: %m"); 131a9643ea8Slogwang return -3; 132a9643ea8Slogwang } 133a9643ea8Slogwang 134a9643ea8Slogwang return fd; 135a9643ea8Slogwang } 136a9643ea8Slogwang 137a9643ea8Slogwang /** 138a9643ea8Slogwang * @brief TCP��ȡ������֪ͨ������socket 139a9643ea8Slogwang */ 140a9643ea8Slogwang static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock) 141a9643ea8Slogwang { 142a9643ea8Slogwang // 1. ��ȡ�߳�֪ͨע����� 143a9643ea8Slogwang KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0); 144a9643ea8Slogwang if (NULL == ntfy_obj) 145a9643ea8Slogwang { 146a9643ea8Slogwang MTLOG_ERROR("get notify failed, logit"); 147a9643ea8Slogwang return NULL; 148a9643ea8Slogwang } 149a9643ea8Slogwang 150a9643ea8Slogwang // 2. ��ȡ���Ӷ���, ����֪ͨ��Ϣ 151a9643ea8Slogwang TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst)); 152a9643ea8Slogwang if (NULL == conn) 153a9643ea8Slogwang { 154a9643ea8Slogwang MTLOG_ERROR("get connection failed, dst[%p]", dst); 155a9643ea8Slogwang NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj); 156a9643ea8Slogwang return NULL; 157a9643ea8Slogwang } 158a9643ea8Slogwang conn->SetNtfyObj(ntfy_obj); 159a9643ea8Slogwang 160a9643ea8Slogwang // 3. ��������socket��� 161a9643ea8Slogwang int osfd = conn->CreateSocket(); 162a9643ea8Slogwang if (osfd < 0) 163a9643ea8Slogwang { 164a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, true); 165a9643ea8Slogwang MTLOG_ERROR("create socket failed, ret[%d]", osfd); 166a9643ea8Slogwang return NULL; 167a9643ea8Slogwang } 168a9643ea8Slogwang 169a9643ea8Slogwang // 4. �ɹ��������� 170a9643ea8Slogwang sock = osfd; 171a9643ea8Slogwang return conn; 172a9643ea8Slogwang } 173a9643ea8Slogwang 174a9643ea8Slogwang /** 175a9643ea8Slogwang * @brief TCPѭ������, ֱ������OK��ʱ 176a9643ea8Slogwang * [ע��] �����߲�Ҫ�����ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ] 177a9643ea8Slogwang */ 178a9643ea8Slogwang static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func) 179a9643ea8Slogwang { 180a9643ea8Slogwang int recv_len = 0; 181a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 182a9643ea8Slogwang do 183a9643ea8Slogwang { 184a9643ea8Slogwang utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 185a9643ea8Slogwang if (cost_time > (utime64_t)timeout) 186a9643ea8Slogwang { 187a9643ea8Slogwang errno = ETIME; 188a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 189a9643ea8Slogwang return -3; 190a9643ea8Slogwang } 191a9643ea8Slogwang 192a9643ea8Slogwang int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time)); 193a9643ea8Slogwang if (rc < 0) 194a9643ea8Slogwang { 195a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 196a9643ea8Slogwang return -3; 197a9643ea8Slogwang } 198a9643ea8Slogwang else if (rc == 0) 199a9643ea8Slogwang { 200a9643ea8Slogwang len = recv_len; 201a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 202a9643ea8Slogwang return -7; 203a9643ea8Slogwang } 204a9643ea8Slogwang recv_len += rc; 205a9643ea8Slogwang 206a9643ea8Slogwang /* ��鱨�������� */ 207a9643ea8Slogwang rc = func(rcv_buf, recv_len); 208a9643ea8Slogwang if (rc < 0) 209a9643ea8Slogwang { 210a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 211a9643ea8Slogwang return -5; 212a9643ea8Slogwang } 213a9643ea8Slogwang else if (rc == 0) // ����δ������ 214a9643ea8Slogwang { 215a9643ea8Slogwang if (len == recv_len) // û�ռ��ٽ�����, ���� 216a9643ea8Slogwang { 217a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock); 218a9643ea8Slogwang return -6; 219a9643ea8Slogwang } 220a9643ea8Slogwang continue; 221a9643ea8Slogwang } 222a9643ea8Slogwang else // �ɹ����㱨�ij��� 223a9643ea8Slogwang { 224a9643ea8Slogwang if (rc > recv_len) // ���Ļ�δ��ȫ 225a9643ea8Slogwang { 226a9643ea8Slogwang continue; 227a9643ea8Slogwang } 228a9643ea8Slogwang else 229a9643ea8Slogwang { 230a9643ea8Slogwang len = rc; 231a9643ea8Slogwang break; 232a9643ea8Slogwang } 233a9643ea8Slogwang } 234a9643ea8Slogwang } while (true); 235a9643ea8Slogwang 236a9643ea8Slogwang return 0; 237a9643ea8Slogwang } 238a9643ea8Slogwang 239a9643ea8Slogwang /** 240a9643ea8Slogwang * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 241a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 242a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 243a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 244a9643ea8Slogwang * @param pkg -�������װ�İ��� 245a9643ea8Slogwang * @param len -�������װ�İ��峤�� 246a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff 247a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 248a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 249a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 250a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 251a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч 252a9643ea8Slogwang */ 253a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 254a9643ea8Slogwang { 255a9643ea8Slogwang if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 256a9643ea8Slogwang { 257a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 258a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size); 259a9643ea8Slogwang return -10; 260a9643ea8Slogwang } 261a9643ea8Slogwang 262a9643ea8Slogwang int ret = 0, rc = 0; 263a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 264a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 265a9643ea8Slogwang utime64_t cost_time = 0; 266a9643ea8Slogwang int time_left = timeout; 267a9643ea8Slogwang 268a9643ea8Slogwang // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 269a9643ea8Slogwang int sock = -1; 270a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 271a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 272a9643ea8Slogwang { 273a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 274a9643ea8Slogwang ret = -1; 275a9643ea8Slogwang goto EXIT_LABEL; 276a9643ea8Slogwang } 277a9643ea8Slogwang 278a9643ea8Slogwang // 2. ���Լ����½����� 279a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 280a9643ea8Slogwang if (rc < 0) 281a9643ea8Slogwang { 282a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 283a9643ea8Slogwang ret = -4; 284a9643ea8Slogwang goto EXIT_LABEL; 285a9643ea8Slogwang } 286a9643ea8Slogwang 287a9643ea8Slogwang // 3. �������ݴ��� 288a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 289a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 290a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 291a9643ea8Slogwang if (rc < 0) 292a9643ea8Slogwang { 293a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 294a9643ea8Slogwang ret = -2; 295a9643ea8Slogwang goto EXIT_LABEL; 296a9643ea8Slogwang } 297a9643ea8Slogwang 298a9643ea8Slogwang // 4. �������ݴ��� 299a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 300a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 301a9643ea8Slogwang rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 302a9643ea8Slogwang if (rc < 0) 303a9643ea8Slogwang { 304a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 305a9643ea8Slogwang ret = rc; 306a9643ea8Slogwang goto EXIT_LABEL; 307a9643ea8Slogwang } 308a9643ea8Slogwang 309a9643ea8Slogwang ret = 0; 310a9643ea8Slogwang 311a9643ea8Slogwang EXIT_LABEL: 312a9643ea8Slogwang 313a9643ea8Slogwang // ʧ����ǿ���ͷ�����, ����ʱ���� 314a9643ea8Slogwang if (conn != NULL) 315a9643ea8Slogwang { 316a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 317a9643ea8Slogwang } 318a9643ea8Slogwang 319a9643ea8Slogwang return ret; 320a9643ea8Slogwang } 321a9643ea8Slogwang 322a9643ea8Slogwang /** 323a9643ea8Slogwang * @brief TCP�������շ����� 324a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 325a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 326a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 327a9643ea8Slogwang * @param pkg -�������װ�İ��� 328a9643ea8Slogwang * @param len -�������װ�İ��峤�� 329a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff 330a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 331a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 332a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 333a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 334a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч 335a9643ea8Slogwang */ 336a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 337a9643ea8Slogwang { 338a9643ea8Slogwang int ret = 0, rc = 0; 339a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 340a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 341a9643ea8Slogwang utime64_t cost_time = 0; 342a9643ea8Slogwang int time_left = timeout; 343a9643ea8Slogwang 344a9643ea8Slogwang // 1. ������� 345a9643ea8Slogwang if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 346a9643ea8Slogwang { 347a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 348a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size); 349a9643ea8Slogwang return -10; 350a9643ea8Slogwang } 351a9643ea8Slogwang 352a9643ea8Slogwang // 2. ����TCP socket 353a9643ea8Slogwang int sock; 354a9643ea8Slogwang sock = mt_tcp_create_sock(); 355a9643ea8Slogwang if (sock < 0) 356a9643ea8Slogwang { 357a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 358a9643ea8Slogwang return -1; 359a9643ea8Slogwang } 360a9643ea8Slogwang 361a9643ea8Slogwang // 3. ���Լ����½����� 362a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 363a9643ea8Slogwang if (rc < 0) 364a9643ea8Slogwang { 365a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 366a9643ea8Slogwang ret = -4; 367a9643ea8Slogwang goto EXIT_LABEL; 368a9643ea8Slogwang } 369a9643ea8Slogwang 370a9643ea8Slogwang // 4. �������ݴ��� 371a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 372a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 373a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 374a9643ea8Slogwang if (rc < 0) 375a9643ea8Slogwang { 376a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 377a9643ea8Slogwang ret = -2; 378a9643ea8Slogwang goto EXIT_LABEL; 379a9643ea8Slogwang } 380a9643ea8Slogwang 381a9643ea8Slogwang // 5. �������ݴ��� 382a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 383a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 384a9643ea8Slogwang rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 385a9643ea8Slogwang if (rc < 0) 386a9643ea8Slogwang { 387a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 388a9643ea8Slogwang ret = rc; 389a9643ea8Slogwang goto EXIT_LABEL; 390a9643ea8Slogwang } 391a9643ea8Slogwang 392a9643ea8Slogwang ret = 0; 393a9643ea8Slogwang 394a9643ea8Slogwang EXIT_LABEL: 395a9643ea8Slogwang if (sock >= 0) 396a9643ea8Slogwang ::close(sock); 397a9643ea8Slogwang 398a9643ea8Slogwang return ret; 399a9643ea8Slogwang } 400a9643ea8Slogwang 401a9643ea8Slogwang 402a9643ea8Slogwang /** 403a9643ea8Slogwang * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 404a9643ea8Slogwang * [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ] 405a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 406a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 407a9643ea8Slogwang * @param pkg -�������װ�İ��� 408a9643ea8Slogwang * @param len -�������װ�İ��峤�� 409a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 410a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч 411a9643ea8Slogwang */ 412a9643ea8Slogwang int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout) 413a9643ea8Slogwang { 414a9643ea8Slogwang if (!dst || !pkg || len<1) 415a9643ea8Slogwang { 416a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 417a9643ea8Slogwang return -10; 418a9643ea8Slogwang } 419a9643ea8Slogwang 420a9643ea8Slogwang int ret = 0, rc = 0; 421a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 422a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 423a9643ea8Slogwang utime64_t cost_time = 0; 424a9643ea8Slogwang int time_left = timeout; 425a9643ea8Slogwang 426a9643ea8Slogwang // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 427a9643ea8Slogwang int sock = -1; 428a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 429a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 430a9643ea8Slogwang { 431a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 432a9643ea8Slogwang ret = -1; 433a9643ea8Slogwang goto EXIT_LABEL; 434a9643ea8Slogwang } 435a9643ea8Slogwang 436a9643ea8Slogwang // 2. ���Լ����½����� 437a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 438a9643ea8Slogwang if (rc < 0) 439a9643ea8Slogwang { 440a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 441a9643ea8Slogwang ret = -4; 442a9643ea8Slogwang goto EXIT_LABEL; 443a9643ea8Slogwang } 444a9643ea8Slogwang 445a9643ea8Slogwang // 3. �������ݴ��� 446a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 447a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 448a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 449a9643ea8Slogwang if (rc < 0) 450a9643ea8Slogwang { 451a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 452a9643ea8Slogwang ret = -2; 453a9643ea8Slogwang goto EXIT_LABEL; 454a9643ea8Slogwang } 455a9643ea8Slogwang 456a9643ea8Slogwang ret = 0; 457a9643ea8Slogwang 458a9643ea8Slogwang EXIT_LABEL: 459a9643ea8Slogwang 460a9643ea8Slogwang // ʧ����ǿ���ͷ�����, ����ʱ���� 461a9643ea8Slogwang if (conn != NULL) 462a9643ea8Slogwang { 463a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 464a9643ea8Slogwang } 465a9643ea8Slogwang 466a9643ea8Slogwang return ret; 467a9643ea8Slogwang } 468a9643ea8Slogwang 469a9643ea8Slogwang /** 470a9643ea8Slogwang * @brief TCP������ֻ�����սӿ� 471a9643ea8Slogwang * [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ] 472a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 473a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 474a9643ea8Slogwang * @param pkg -�������װ�İ��� 475a9643ea8Slogwang * @param len -�������װ�İ��峤�� 476a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 477a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч 478a9643ea8Slogwang */ 479a9643ea8Slogwang int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout) 480a9643ea8Slogwang { 481a9643ea8Slogwang // 1. ��μ�� 482a9643ea8Slogwang if (!dst || !pkg || len<1) 483a9643ea8Slogwang { 484a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 485a9643ea8Slogwang return -10; 486a9643ea8Slogwang } 487a9643ea8Slogwang 488a9643ea8Slogwang int ret = 0, rc = 0; 489a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 490a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 491a9643ea8Slogwang utime64_t cost_time = 0; 492a9643ea8Slogwang int time_left = timeout; 493a9643ea8Slogwang 494a9643ea8Slogwang // 2. ����TCP socket 495a9643ea8Slogwang int sock = -1; 496a9643ea8Slogwang sock = mt_tcp_create_sock(); 497a9643ea8Slogwang if (sock < 0) 498a9643ea8Slogwang { 499a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 500a9643ea8Slogwang ret = -1; 501a9643ea8Slogwang goto EXIT_LABEL; 502a9643ea8Slogwang } 503a9643ea8Slogwang 504a9643ea8Slogwang // 2. ���Լ����½����� 505a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 506a9643ea8Slogwang if (rc < 0) 507a9643ea8Slogwang { 508a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 509a9643ea8Slogwang ret = -4; 510a9643ea8Slogwang goto EXIT_LABEL; 511a9643ea8Slogwang } 512a9643ea8Slogwang 513a9643ea8Slogwang // 3. �������ݴ��� 514a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 515a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 516a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 517a9643ea8Slogwang if (rc < 0) 518a9643ea8Slogwang { 519a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 520a9643ea8Slogwang ret = -2; 521a9643ea8Slogwang goto EXIT_LABEL; 522a9643ea8Slogwang } 523a9643ea8Slogwang 524a9643ea8Slogwang ret = 0; 525a9643ea8Slogwang 526a9643ea8Slogwang EXIT_LABEL: 527a9643ea8Slogwang 528a9643ea8Slogwang if (sock >= 0) 529a9643ea8Slogwang ::close(sock); 530a9643ea8Slogwang 531a9643ea8Slogwang return ret; 532a9643ea8Slogwang } 533a9643ea8Slogwang 534a9643ea8Slogwang 535a9643ea8Slogwang /** 536a9643ea8Slogwang * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶����� 537a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 538a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 539a9643ea8Slogwang * @param pkg -�������װ�İ��� 540a9643ea8Slogwang * @param len -�������װ�İ��峤�� 541a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff��ֻ�����տ�������ΪNULL 542a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������ȣ�ֻ�����գ�����ΪNULL 543a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 544a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 545a9643ea8Slogwang * @param type - �������� 546a9643ea8Slogwang * MT_TCP_SHORT: һ��һ�������ӣ� 547a9643ea8Slogwang * MT_TCP_LONG : һ��һ�ճ����ӣ� 548a9643ea8Slogwang * MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ� 549a9643ea8Slogwang * MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ� 550a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 551a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 552a9643ea8Slogwang */ 553a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type) 554a9643ea8Slogwang { 555a9643ea8Slogwang if(!dst || !pkg || len<1) 556a9643ea8Slogwang { 557a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 558a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size,type); 559a9643ea8Slogwang return -10; 560a9643ea8Slogwang } 561a9643ea8Slogwang 562a9643ea8Slogwang switch (type) 563a9643ea8Slogwang { 564a9643ea8Slogwang // TCP�����ӵ������� 565a9643ea8Slogwang case MT_TCP_LONG: 566a9643ea8Slogwang { 567a9643ea8Slogwang return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 568a9643ea8Slogwang } 569a9643ea8Slogwang 570a9643ea8Slogwang // TCP������ֻ������ 571a9643ea8Slogwang case MT_TCP_LONG_SNDONLY: 572a9643ea8Slogwang { 573a9643ea8Slogwang return mt_tcpsend(dst, pkg, len, timeout); 574a9643ea8Slogwang } 575a9643ea8Slogwang 576a9643ea8Slogwang // TCP�����ӵ������� 577a9643ea8Slogwang case MT_TCP_SHORT: 578a9643ea8Slogwang { 579a9643ea8Slogwang return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 580a9643ea8Slogwang } 581a9643ea8Slogwang 582a9643ea8Slogwang // TCP������ֻ������ 583a9643ea8Slogwang case MT_TCP_SHORT_SNDONLY: 584a9643ea8Slogwang { 585a9643ea8Slogwang return mt_tcpsend_short(dst, pkg, len, timeout); 586a9643ea8Slogwang } 587a9643ea8Slogwang 588a9643ea8Slogwang default: 589a9643ea8Slogwang { 590a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 591a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size,type); 592a9643ea8Slogwang return -10; 593a9643ea8Slogwang } 594a9643ea8Slogwang } 595a9643ea8Slogwang 596a9643ea8Slogwang return 0; 597a9643ea8Slogwang } 598a9643ea8Slogwang 599a9643ea8Slogwang 600a9643ea8Slogwang 601a9643ea8Slogwang /** 602a9643ea8Slogwang * @brief ������ص��������� 603a9643ea8Slogwang */ 604a9643ea8Slogwang static void mt_task_process(void* arg) 605a9643ea8Slogwang { 606a9643ea8Slogwang int rc = 0; 607a9643ea8Slogwang IMtTask* task = (IMtTask*)arg; 608a9643ea8Slogwang if (!task) 609a9643ea8Slogwang { 610a9643ea8Slogwang MTLOG_ERROR("Invalid arg, error"); 611a9643ea8Slogwang return; 612a9643ea8Slogwang } 613a9643ea8Slogwang 614a9643ea8Slogwang rc = task->Process(); 615a9643ea8Slogwang if (rc != 0) 616a9643ea8Slogwang { 617a9643ea8Slogwang MTLOG_DEBUG("task process failed(%d), log", rc); 618a9643ea8Slogwang } 619a9643ea8Slogwang 620a9643ea8Slogwang task->SetResult(rc); 621a9643ea8Slogwang 622a9643ea8Slogwang return; 623a9643ea8Slogwang }; 624a9643ea8Slogwang 625a9643ea8Slogwang /** 626a9643ea8Slogwang * @brief ��·IO�Ĵ���, ��������̹߳��� 627a9643ea8Slogwang * @param req_list - �����б� 628a9643ea8Slogwang * @return 0 �ɹ�, <0ʧ�� 629a9643ea8Slogwang */ 630a9643ea8Slogwang int mt_exec_all_task(IMtTaskList& req_list) 631a9643ea8Slogwang { 632a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 633a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 634a9643ea8Slogwang IMtTask* task = NULL; 635a9643ea8Slogwang MicroThread* sub = NULL; 636a9643ea8Slogwang MicroThread* tmp = NULL; 637a9643ea8Slogwang int rc = -1; 638a9643ea8Slogwang 639a9643ea8Slogwang MicroThread::SubThreadList list; 640a9643ea8Slogwang TAILQ_INIT(&list); 641a9643ea8Slogwang 642a9643ea8Slogwang // ��ֹû��task�������߳�һֱ����ס 643a9643ea8Slogwang if (0 == req_list.size()) 644a9643ea8Slogwang { 645a9643ea8Slogwang MTLOG_DEBUG("no task for execult"); 646a9643ea8Slogwang return 0; 647a9643ea8Slogwang } 648a9643ea8Slogwang 649a9643ea8Slogwang // 1. �����̶߳��� 650a9643ea8Slogwang for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it) 651a9643ea8Slogwang { 652a9643ea8Slogwang task = *it; 653a9643ea8Slogwang sub = MtFrame::CreateThread(mt_task_process, task, false); 654a9643ea8Slogwang if (NULL == sub) 655a9643ea8Slogwang { 656a9643ea8Slogwang MTLOG_ERROR("create sub thread failed"); 657a9643ea8Slogwang goto EXIT_LABEL; 658a9643ea8Slogwang } 659a9643ea8Slogwang 660a9643ea8Slogwang sub->SetType(MicroThread::SUB_THREAD); 661a9643ea8Slogwang TAILQ_INSERT_TAIL(&list, sub, _sub_entry); 662a9643ea8Slogwang } 663a9643ea8Slogwang 664a9643ea8Slogwang // 2. ����ִ������ 665a9643ea8Slogwang TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 666a9643ea8Slogwang { 667a9643ea8Slogwang TAILQ_REMOVE(&list, sub, _sub_entry); 668a9643ea8Slogwang thread->AddSubThread(sub); 669a9643ea8Slogwang mtframe->InsertRunable(sub); 670a9643ea8Slogwang } 671a9643ea8Slogwang 672a9643ea8Slogwang // 3. �ȴ����߳�ִ�н��� 673a9643ea8Slogwang thread->Wait(); 674a9643ea8Slogwang rc = 0; 675a9643ea8Slogwang 676a9643ea8Slogwang EXIT_LABEL: 677a9643ea8Slogwang 678a9643ea8Slogwang TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 679a9643ea8Slogwang { 680a9643ea8Slogwang TAILQ_REMOVE(&list, sub, _sub_entry); 681a9643ea8Slogwang mtframe->FreeThread(sub); 682a9643ea8Slogwang } 683a9643ea8Slogwang 684a9643ea8Slogwang return rc; 685a9643ea8Slogwang 686a9643ea8Slogwang } 687a9643ea8Slogwang 688a9643ea8Slogwang /** 689a9643ea8Slogwang * @brief ���õ�ǰIMtMsg��˽�б��� 690a9643ea8Slogwang * @info ֻ����ָ�룬�ڴ���Ҫҵ����� 691a9643ea8Slogwang */ 692a9643ea8Slogwang void mt_set_msg_private(void *data) 693a9643ea8Slogwang { 694a9643ea8Slogwang MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 695a9643ea8Slogwang if (msg_thread != NULL) 696a9643ea8Slogwang msg_thread->SetPrivate(data); 697a9643ea8Slogwang } 698a9643ea8Slogwang 699a9643ea8Slogwang /** 700a9643ea8Slogwang * @brief ��ȡ��ǰIMtMsg��˽�б��� 701a9643ea8Slogwang * @return ˽�б���ָ�� 702a9643ea8Slogwang */ 703a9643ea8Slogwang void* mt_get_msg_private() 704a9643ea8Slogwang { 705a9643ea8Slogwang MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 706a9643ea8Slogwang if (NULL == msg_thread) 707a9643ea8Slogwang { 708a9643ea8Slogwang return NULL; 709a9643ea8Slogwang } 710a9643ea8Slogwang 711a9643ea8Slogwang return msg_thread->GetPrivate(); 712a9643ea8Slogwang } 713a9643ea8Slogwang 714a9643ea8Slogwang /** 715a9643ea8Slogwang * @brief �߳̿�ܳ�ʼ�� 716a9643ea8Slogwang * @info ҵ��ʹ��spp�������̣߳���Ҫ���øó�ʼ������ 717a9643ea8Slogwang * @return false:��ʼ��ʧ�� true:��ʼ���ɹ� 718a9643ea8Slogwang */ 719*a02c88d6Slogwang bool mt_init_frame(int argc, char * const argv[]) 720a9643ea8Slogwang { 721*a02c88d6Slogwang if (argc) { 722*a02c88d6Slogwang ff_init(argc, argv); 723a9643ea8Slogwang ff_set_hook_flag(); 724a9643ea8Slogwang } 725a9643ea8Slogwang memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab)); 726a9643ea8Slogwang return MtFrame::Instance()->InitFrame(); 727a9643ea8Slogwang } 728a9643ea8Slogwang 729a9643ea8Slogwang /** 730a9643ea8Slogwang * @brief �����̶߳���ջ�ռ��С 731a9643ea8Slogwang * @info �DZ������ã�Ĭ�ϴ�СΪ128K 732a9643ea8Slogwang */ 733a9643ea8Slogwang void mt_set_stack_size(unsigned int bytes) 734a9643ea8Slogwang { 735a9643ea8Slogwang ThreadPool::SetDefaultStackSize(bytes); 736a9643ea8Slogwang } 737a9643ea8Slogwang 738a9643ea8Slogwang /** 739a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� recvfrom 740a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 741a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 742a9643ea8Slogwang * @param len ������Ϣ���������� 743a9643ea8Slogwang * @param from ��Դ��ַ��ָ�� 744a9643ea8Slogwang * @param fromlen ��Դ��ַ�Ľṹ���� 745a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 746a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 747a9643ea8Slogwang */ 748a9643ea8Slogwang int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 749a9643ea8Slogwang { 750a9643ea8Slogwang return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout); 751a9643ea8Slogwang } 752a9643ea8Slogwang 753a9643ea8Slogwang /** 754a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� sendto 755a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 756a9643ea8Slogwang * @param msg �����͵���Ϣָ�� 757a9643ea8Slogwang * @param len �����͵���Ϣ���� 758a9643ea8Slogwang * @param to Ŀ�ĵ�ַ��ָ�� 759a9643ea8Slogwang * @param tolen Ŀ�ĵ�ַ�Ľṹ���� 760a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 761a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 762a9643ea8Slogwang */ 763a9643ea8Slogwang int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 764a9643ea8Slogwang { 765a9643ea8Slogwang return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout); 766a9643ea8Slogwang } 767a9643ea8Slogwang 768a9643ea8Slogwang /** 769a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� connect 770a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 771a9643ea8Slogwang * @param addr ָ��server��Ŀ�ĵ�ַ 772a9643ea8Slogwang * @param addrlen ��ַ�ij��� 773a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 774a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 775a9643ea8Slogwang */ 776a9643ea8Slogwang int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 777a9643ea8Slogwang { 778a9643ea8Slogwang return MtFrame::connect(fd, addr, addrlen, timeout); 779a9643ea8Slogwang } 780a9643ea8Slogwang 781a9643ea8Slogwang /** 782a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� accept 783a9643ea8Slogwang * @param fd �������� 784a9643ea8Slogwang * @param addr �ͻ��˵�ַ 785a9643ea8Slogwang * @param addrlen ��ַ�ij��� 786a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 787a9643ea8Slogwang * @return >=0 accept��socket������, <0 ʧ�� 788a9643ea8Slogwang */ 789a9643ea8Slogwang int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 790a9643ea8Slogwang { 791a9643ea8Slogwang return MtFrame::accept(fd, addr, addrlen, timeout); 792a9643ea8Slogwang } 793a9643ea8Slogwang 794a9643ea8Slogwang 795a9643ea8Slogwang /** 796a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� read 797a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 798a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 799a9643ea8Slogwang * @param nbyte ������Ϣ���������� 800a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 801a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 802a9643ea8Slogwang */ 803a9643ea8Slogwang ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout) 804a9643ea8Slogwang { 805a9643ea8Slogwang return MtFrame::read(fd, buf, nbyte, timeout); 806a9643ea8Slogwang } 807a9643ea8Slogwang 808a9643ea8Slogwang /** 809a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� write 810a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 811a9643ea8Slogwang * @param buf �����͵���Ϣָ�� 812a9643ea8Slogwang * @param nbyte �����͵���Ϣ���� 813a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 814a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 815a9643ea8Slogwang */ 816a9643ea8Slogwang ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout) 817a9643ea8Slogwang { 818a9643ea8Slogwang return MtFrame::write(fd, buf, nbyte, timeout); 819a9643ea8Slogwang } 820a9643ea8Slogwang 821a9643ea8Slogwang /** 822a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� recv 823a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 824a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 825a9643ea8Slogwang * @param len ������Ϣ���������� 826a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 827a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 828a9643ea8Slogwang */ 829a9643ea8Slogwang ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout) 830a9643ea8Slogwang { 831a9643ea8Slogwang return MtFrame::recv(fd, buf, len, flags, timeout); 832a9643ea8Slogwang } 833a9643ea8Slogwang 834a9643ea8Slogwang /** 835a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� send 836a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 837a9643ea8Slogwang * @param buf �����͵���Ϣָ�� 838a9643ea8Slogwang * @param nbyte �����͵���Ϣ���� 839a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 840a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 841a9643ea8Slogwang */ 842a9643ea8Slogwang ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 843a9643ea8Slogwang { 844a9643ea8Slogwang return MtFrame::send(fd, buf, nbyte, flags, timeout); 845a9643ea8Slogwang } 846a9643ea8Slogwang 847a9643ea8Slogwang /** 848a9643ea8Slogwang * @brief �߳�����sleep�ӿ�, ��λms 849a9643ea8Slogwang */ 850a9643ea8Slogwang void mt_sleep(int ms) 851a9643ea8Slogwang { 852a9643ea8Slogwang MtFrame::sleep(ms); 853a9643ea8Slogwang } 854a9643ea8Slogwang 855a9643ea8Slogwang /** 856a9643ea8Slogwang * @brief �̻߳�ȡϵͳʱ�䣬��λms 857a9643ea8Slogwang */ 858a9643ea8Slogwang unsigned long long mt_time_ms(void) 859a9643ea8Slogwang { 860a9643ea8Slogwang return MtFrame::Instance()->GetLastClock(); 861a9643ea8Slogwang } 862a9643ea8Slogwang 863a9643ea8Slogwang /** 864a9643ea8Slogwang * @brief �̵߳ȴ�epoll�¼��İ������� 865a9643ea8Slogwang */ 866a9643ea8Slogwang int mt_wait_events(int fd, int events, int timeout) 867a9643ea8Slogwang { 868a9643ea8Slogwang return MtFrame::Instance()->WaitEvents(fd, events, timeout); 869a9643ea8Slogwang } 870a9643ea8Slogwang 871a9643ea8Slogwang void* mt_start_thread(void* entry, void* args) 872a9643ea8Slogwang { 873a9643ea8Slogwang return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); 874a9643ea8Slogwang } 875a9643ea8Slogwang 876a9643ea8Slogwang #define BUF_ALIGNMENT_SIZE 4096 877a9643ea8Slogwang #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) 878a9643ea8Slogwang #define BUF_DEFAULT_SIZE 4096 879a9643ea8Slogwang 880a9643ea8Slogwang class ScopedBuf 881a9643ea8Slogwang { 882a9643ea8Slogwang public: 883a9643ea8Slogwang ScopedBuf(void*& buf_keeper, bool keep) 884a9643ea8Slogwang :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep) 885a9643ea8Slogwang {} 886a9643ea8Slogwang 887a9643ea8Slogwang int Alloc(int len) 888a9643ea8Slogwang { 889a9643ea8Slogwang if(len<len_) 890a9643ea8Slogwang { 891a9643ea8Slogwang return -1; // �ռ��СԽ�� 892a9643ea8Slogwang } 893a9643ea8Slogwang 894a9643ea8Slogwang if(len==0) 895a9643ea8Slogwang { 896a9643ea8Slogwang len = BUF_ALIGNMENT_SIZE; 897a9643ea8Slogwang } 898a9643ea8Slogwang if(len_==len) 899a9643ea8Slogwang { 900a9643ea8Slogwang return 0; 901a9643ea8Slogwang } 902a9643ea8Slogwang 903a9643ea8Slogwang len_ = BUF_ALIGN_SIZE(len); 904a9643ea8Slogwang if(len_==0) 905a9643ea8Slogwang { 906a9643ea8Slogwang len_ = BUF_DEFAULT_SIZE; 907a9643ea8Slogwang } 908a9643ea8Slogwang len_watermark_ = len_-BUF_ALIGNMENT_SIZE; 909a9643ea8Slogwang char* tmp = (char*)realloc(buf_, len_); 910a9643ea8Slogwang if(tmp==NULL) 911a9643ea8Slogwang { 912a9643ea8Slogwang return -2; // �����ڴ�ʧ�� 913a9643ea8Slogwang } 914a9643ea8Slogwang 915a9643ea8Slogwang buf_ = tmp; 916a9643ea8Slogwang return 0; 917a9643ea8Slogwang } 918a9643ea8Slogwang 919a9643ea8Slogwang void reset() 920a9643ea8Slogwang { 921a9643ea8Slogwang if(keep_) 922a9643ea8Slogwang { 923a9643ea8Slogwang buf_keeper_ = (void*)buf_; 924a9643ea8Slogwang buf_ = NULL; 925a9643ea8Slogwang } 926a9643ea8Slogwang } 927a9643ea8Slogwang 928a9643ea8Slogwang ~ScopedBuf() 929a9643ea8Slogwang { 930a9643ea8Slogwang if(buf_!=NULL) 931a9643ea8Slogwang { 932a9643ea8Slogwang free(buf_); 933a9643ea8Slogwang buf_ = NULL; 934a9643ea8Slogwang } 935a9643ea8Slogwang } 936a9643ea8Slogwang 937a9643ea8Slogwang public: 938a9643ea8Slogwang void* &buf_keeper_; 939a9643ea8Slogwang char* buf_; 940a9643ea8Slogwang int len_; 941a9643ea8Slogwang int len_watermark_; 942a9643ea8Slogwang bool keep_; 943a9643ea8Slogwang 944a9643ea8Slogwang }; 945a9643ea8Slogwang 946a9643ea8Slogwang /** 947a9643ea8Slogwang * @brief TCPѭ������, ֱ������OK��ʱ 948a9643ea8Slogwang * [ע��] �����߲�Ҫ�����ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ] 949a9643ea8Slogwang */ 950a9643ea8Slogwang static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags, 951a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 952a9643ea8Slogwang { 953a9643ea8Slogwang int recv_len = 0; 954a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 955a9643ea8Slogwang 956a9643ea8Slogwang int rc = 0; 957a9643ea8Slogwang int ret = 0; 958a9643ea8Slogwang int pkg_len = 0; 959a9643ea8Slogwang bool msg_len_detected = false; 960a9643ea8Slogwang 961a9643ea8Slogwang ScopedBuf sbuf(rcv_buf, keep_rcv_buf); 962a9643ea8Slogwang ret = sbuf.Alloc(len); 963a9643ea8Slogwang 964a9643ea8Slogwang if(ret!=0) 965a9643ea8Slogwang { 966a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 967a9643ea8Slogwang return -11; 968a9643ea8Slogwang } 969a9643ea8Slogwang 970a9643ea8Slogwang do 971a9643ea8Slogwang { 972a9643ea8Slogwang utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 973a9643ea8Slogwang if (cost_time > (utime64_t)timeout) 974a9643ea8Slogwang { 975a9643ea8Slogwang errno = ETIME; 976a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 977a9643ea8Slogwang return -3; 978a9643ea8Slogwang } 979a9643ea8Slogwang 980a9643ea8Slogwang rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time)); 981a9643ea8Slogwang if (rc < 0) 982a9643ea8Slogwang { 983a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 984a9643ea8Slogwang return -3; 985a9643ea8Slogwang } 986a9643ea8Slogwang else if (rc == 0) // Զ�˹ر� 987a9643ea8Slogwang { 988a9643ea8Slogwang 989a9643ea8Slogwang if(recv_len==0) // δ�ذ���ֱ�ӷ���Զ�˹ر� 990a9643ea8Slogwang { 991a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 992a9643ea8Slogwang return -7; 993a9643ea8Slogwang } 994a9643ea8Slogwang 995a9643ea8Slogwang /* ��鱨�������� */ 996a9643ea8Slogwang rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected); 997a9643ea8Slogwang 998a9643ea8Slogwang if(rc!=recv_len) // ҵ�����Զ�˹رգ�Ӧ�÷������������ȣ�����<=0,��ʾ���������� 999a9643ea8Slogwang { 1000a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 1001a9643ea8Slogwang return -7; 1002a9643ea8Slogwang } 1003a9643ea8Slogwang len = recv_len; 1004a9643ea8Slogwang break; 1005a9643ea8Slogwang } 1006a9643ea8Slogwang recv_len += rc; 1007a9643ea8Slogwang 1008a9643ea8Slogwang /* ��鱨�������� */ 1009a9643ea8Slogwang if((!msg_len_detected)||recv_len==pkg_len) 1010a9643ea8Slogwang { 1011a9643ea8Slogwang rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected); 1012a9643ea8Slogwang if(msg_len_detected) 1013a9643ea8Slogwang { 1014a9643ea8Slogwang pkg_len = rc; 1015a9643ea8Slogwang } 1016a9643ea8Slogwang } 1017a9643ea8Slogwang else 1018a9643ea8Slogwang { 1019a9643ea8Slogwang rc = pkg_len; 1020a9643ea8Slogwang } 1021a9643ea8Slogwang 1022a9643ea8Slogwang if (rc < 0) 1023a9643ea8Slogwang { 1024a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 1025a9643ea8Slogwang return -5; 1026a9643ea8Slogwang } 1027a9643ea8Slogwang else if (rc == 0) // ����δ������,�Ҳ�ȷ����С 1028a9643ea8Slogwang { 1029a9643ea8Slogwang if(sbuf.len_ > recv_len) 1030a9643ea8Slogwang { 1031a9643ea8Slogwang continue; 1032a9643ea8Slogwang } 1033a9643ea8Slogwang // û�ռ��ٽ�����, 2����С��չbuf 1034a9643ea8Slogwang ret = sbuf.Alloc(sbuf.len_<<1); 1035a9643ea8Slogwang 1036a9643ea8Slogwang if(ret!=0) 1037a9643ea8Slogwang { 1038a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 1039a9643ea8Slogwang return -11; 1040a9643ea8Slogwang } 1041a9643ea8Slogwang } 1042a9643ea8Slogwang else // �ɹ����㱨�ij��� 1043a9643ea8Slogwang { 1044a9643ea8Slogwang if (rc > recv_len) // ���Ļ�δ��ȫ 1045a9643ea8Slogwang { 1046a9643ea8Slogwang if(sbuf.len_ > recv_len) // recv buf���пռ�.��δ����ˮλ 1047a9643ea8Slogwang { 1048a9643ea8Slogwang continue; 1049a9643ea8Slogwang } 1050a9643ea8Slogwang 1051a9643ea8Slogwang // û�ռ��ٽ�����, ����ҵ��ָʾ��С��չ�ڴ� 1052a9643ea8Slogwang ret = sbuf.Alloc(rc); 1053a9643ea8Slogwang 1054a9643ea8Slogwang if(ret!=0) 1055a9643ea8Slogwang { 1056a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 1057a9643ea8Slogwang return -11; 1058a9643ea8Slogwang } 1059a9643ea8Slogwang } 1060a9643ea8Slogwang else if(rc==recv_len) // �հ����� 1061a9643ea8Slogwang { 1062a9643ea8Slogwang len = rc; 1063a9643ea8Slogwang break; 1064a9643ea8Slogwang } 1065a9643ea8Slogwang else // �߳����ģʽ�£�������ճ�� 1066a9643ea8Slogwang { 1067a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock); 1068a9643ea8Slogwang return -5; 1069a9643ea8Slogwang } 1070a9643ea8Slogwang } 1071a9643ea8Slogwang } while (true); 1072a9643ea8Slogwang 1073a9643ea8Slogwang sbuf.reset(); 1074a9643ea8Slogwang 1075a9643ea8Slogwang return 0; 1076a9643ea8Slogwang } 1077a9643ea8Slogwang 1078a9643ea8Slogwang 1079a9643ea8Slogwang 1080a9643ea8Slogwang /** 1081a9643ea8Slogwang * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 1082a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1083a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 1084a9643ea8Slogwang * @param pkg -�������װ�İ��� 1085a9643ea8Slogwang * @param len -�������װ�İ��峤�� 1086a9643ea8Slogwang * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1087a9643ea8Slogwang * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1088a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 1089a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1090a9643ea8Slogwang * @param msg_ctx -�������ĵ������ı����� 1091a9643ea8Slogwang * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1092a9643ea8Slogwang * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1093a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1094a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1095a9643ea8Slogwang */ 1096a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 1097a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 1098a9643ea8Slogwang { 1099a9643ea8Slogwang if(!dst || !pkg || len<1) 1100a9643ea8Slogwang { 1101a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 1102a9643ea8Slogwang dst, pkg, len, check_func); 1103a9643ea8Slogwang return -10; 1104a9643ea8Slogwang } 1105a9643ea8Slogwang 1106a9643ea8Slogwang 1107a9643ea8Slogwang int ret = 0, rc = 0; 1108a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 1109a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 1110a9643ea8Slogwang utime64_t cost_time = 0; 1111a9643ea8Slogwang int time_left = timeout; 1112a9643ea8Slogwang 1113a9643ea8Slogwang // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 1114a9643ea8Slogwang int sock = -1; 1115a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 1116a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 1117a9643ea8Slogwang { 1118a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 1119a9643ea8Slogwang ret = -1; 1120a9643ea8Slogwang goto EXIT_LABEL; 1121a9643ea8Slogwang } 1122a9643ea8Slogwang 1123a9643ea8Slogwang // 2. ���Լ����½����� 1124a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 1125a9643ea8Slogwang if (rc < 0) 1126a9643ea8Slogwang { 1127a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 1128a9643ea8Slogwang ret = -4; 1129a9643ea8Slogwang goto EXIT_LABEL; 1130a9643ea8Slogwang } 1131a9643ea8Slogwang 1132a9643ea8Slogwang // 3. �������ݴ��� 1133a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1134a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1135a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 1136a9643ea8Slogwang if (rc < 0) 1137a9643ea8Slogwang { 1138a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 1139a9643ea8Slogwang ret = -2; 1140a9643ea8Slogwang goto EXIT_LABEL; 1141a9643ea8Slogwang } 1142a9643ea8Slogwang 1143a9643ea8Slogwang // 4. �������ݴ��� 1144a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1145a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1146a9643ea8Slogwang 1147a9643ea8Slogwang rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 1148a9643ea8Slogwang if (rc < 0) 1149a9643ea8Slogwang { 1150a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 1151a9643ea8Slogwang ret = rc; 1152a9643ea8Slogwang goto EXIT_LABEL; 1153a9643ea8Slogwang } 1154a9643ea8Slogwang 1155a9643ea8Slogwang ret = 0; 1156a9643ea8Slogwang 1157a9643ea8Slogwang EXIT_LABEL: 1158a9643ea8Slogwang 1159a9643ea8Slogwang // ʧ����ǿ���ͷ�����, ����ʱ���� 1160a9643ea8Slogwang if (conn != NULL) 1161a9643ea8Slogwang { 1162a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 1163a9643ea8Slogwang } 1164a9643ea8Slogwang 1165a9643ea8Slogwang return ret; 1166a9643ea8Slogwang } 1167a9643ea8Slogwang 1168a9643ea8Slogwang 1169a9643ea8Slogwang /** 1170a9643ea8Slogwang * @brief TCP�������շ����� 1171a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1172a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 1173a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 1174a9643ea8Slogwang * @param pkg -�������װ�İ��� 1175a9643ea8Slogwang * @param len -�������װ�İ��峤�� 1176a9643ea8Slogwang * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1177a9643ea8Slogwang * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1178a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 1179a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1180a9643ea8Slogwang * @param msg_ctx -�������ĵ������ı����� 1181a9643ea8Slogwang * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1182a9643ea8Slogwang * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1183a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1184a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1185a9643ea8Slogwang */ 1186a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 1187a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 1188a9643ea8Slogwang { 1189a9643ea8Slogwang int ret = 0, rc = 0; 1190a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 1191a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 1192a9643ea8Slogwang utime64_t cost_time = 0; 1193a9643ea8Slogwang int time_left = timeout; 1194a9643ea8Slogwang 1195a9643ea8Slogwang // 1. ������� 1196a9643ea8Slogwang if(!dst || !pkg || len<1) 1197a9643ea8Slogwang { 1198a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 1199a9643ea8Slogwang dst, pkg, len, check_func); 1200a9643ea8Slogwang return -10; 1201a9643ea8Slogwang } 1202a9643ea8Slogwang 1203a9643ea8Slogwang // 2. ����TCP socket 1204a9643ea8Slogwang int sock; 1205a9643ea8Slogwang sock = mt_tcp_create_sock(); 1206a9643ea8Slogwang if (sock < 0) 1207a9643ea8Slogwang { 1208a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 1209a9643ea8Slogwang return -1; 1210a9643ea8Slogwang } 1211a9643ea8Slogwang 1212a9643ea8Slogwang // 3. ���Լ����½����� 1213a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 1214a9643ea8Slogwang if (rc < 0) 1215a9643ea8Slogwang { 1216a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 1217a9643ea8Slogwang ret = -4; 1218a9643ea8Slogwang goto EXIT_LABEL; 1219a9643ea8Slogwang } 1220a9643ea8Slogwang 1221a9643ea8Slogwang // 4. �������ݴ��� 1222a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1223a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1224a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 1225a9643ea8Slogwang if (rc < 0) 1226a9643ea8Slogwang { 1227a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 1228a9643ea8Slogwang ret = -2; 1229a9643ea8Slogwang goto EXIT_LABEL; 1230a9643ea8Slogwang } 1231a9643ea8Slogwang 1232a9643ea8Slogwang // 5. �������ݴ��� 1233a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1234a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1235a9643ea8Slogwang rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 1236a9643ea8Slogwang 1237a9643ea8Slogwang if (rc < 0) 1238a9643ea8Slogwang { 1239a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 1240a9643ea8Slogwang ret = rc; 1241a9643ea8Slogwang goto EXIT_LABEL; 1242a9643ea8Slogwang } 1243a9643ea8Slogwang 1244a9643ea8Slogwang ret = 0; 1245a9643ea8Slogwang 1246a9643ea8Slogwang EXIT_LABEL: 1247a9643ea8Slogwang if (sock >= 0) 1248a9643ea8Slogwang ::close(sock); 1249a9643ea8Slogwang 1250a9643ea8Slogwang return ret; 1251a9643ea8Slogwang } 1252a9643ea8Slogwang 1253a9643ea8Slogwang 1254a9643ea8Slogwang 1255a9643ea8Slogwang /** 1256a9643ea8Slogwang * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶����� 1257a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1258a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 1259a9643ea8Slogwang * @param pkg -�������װ�İ��� 1260a9643ea8Slogwang * @param len -�������װ�İ��峤�� 1261a9643ea8Slogwang * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1262a9643ea8Slogwang * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1263a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 1264a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1265a9643ea8Slogwang * @param msg_ctx -�������ĵ������ı����� 1266a9643ea8Slogwang * 1267a9643ea8Slogwang * @param type - �������� 1268a9643ea8Slogwang * MT_TCP_SHORT: һ��һ�������ӣ� 1269a9643ea8Slogwang * MT_TCP_LONG : һ��һ�ճ����ӣ� 1270a9643ea8Slogwang * MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ� 1271a9643ea8Slogwang * MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ� 1272a9643ea8Slogwang * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1273a9643ea8Slogwang * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1274a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1275a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1276a9643ea8Slogwang */ 1277a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size, 1278a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, 1279a9643ea8Slogwang MT_TCP_CONN_TYPE type, bool keep_rcv_buf) 1280a9643ea8Slogwang { 1281a9643ea8Slogwang if(!dst || !pkg || len<1) 1282a9643ea8Slogwang { 1283a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1284a9643ea8Slogwang dst, pkg, len, check_func, msg_ctx, type); 1285a9643ea8Slogwang return -10; 1286a9643ea8Slogwang } 1287a9643ea8Slogwang 1288a9643ea8Slogwang switch (type) 1289a9643ea8Slogwang { 1290a9643ea8Slogwang // TCP�����ӵ������� 1291a9643ea8Slogwang case MT_TCP_LONG: 1292a9643ea8Slogwang { 1293a9643ea8Slogwang return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1294a9643ea8Slogwang } 1295a9643ea8Slogwang 1296a9643ea8Slogwang // TCP������ֻ������ 1297a9643ea8Slogwang case MT_TCP_LONG_SNDONLY: 1298a9643ea8Slogwang { 1299a9643ea8Slogwang return mt_tcpsend(dst, pkg, len, timeout); 1300a9643ea8Slogwang } 1301a9643ea8Slogwang 1302a9643ea8Slogwang // TCP�����ӵ������� 1303a9643ea8Slogwang case MT_TCP_SHORT: 1304a9643ea8Slogwang { 1305a9643ea8Slogwang return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1306a9643ea8Slogwang } 1307a9643ea8Slogwang 1308a9643ea8Slogwang // TCP������ֻ������ 1309a9643ea8Slogwang case MT_TCP_SHORT_SNDONLY: 1310a9643ea8Slogwang { 1311a9643ea8Slogwang return mt_tcpsend_short(dst, pkg, len, timeout); 1312a9643ea8Slogwang } 1313a9643ea8Slogwang 1314a9643ea8Slogwang default: 1315a9643ea8Slogwang { 1316a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1317a9643ea8Slogwang dst, pkg, len, check_func, msg_ctx, type); 1318a9643ea8Slogwang return -10; 1319a9643ea8Slogwang } 1320a9643ea8Slogwang } 1321a9643ea8Slogwang 1322a9643ea8Slogwang return 0; 1323a9643ea8Slogwang } 1324a9643ea8Slogwang 1325a9643ea8Slogwang 1326a9643ea8Slogwang 1327a9643ea8Slogwang } 1328a9643ea8Slogwang 1329a9643ea8Slogwang 1330