1a9643ea8Slogwang 2a9643ea8Slogwang /** 3a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4a9643ea8Slogwang * 5a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6a9643ea8Slogwang * 7a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9a9643ea8Slogwang * obtain a copy of the License at 10a9643ea8Slogwang * 11a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12a9643ea8Slogwang * 13a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16a9643ea8Slogwang * and limitations under the License. 17a9643ea8Slogwang */ 18a9643ea8Slogwang 19a9643ea8Slogwang 20a9643ea8Slogwang /** 21a9643ea8Slogwang * @filename mt_sys_call.cpp 22a9643ea8Slogwang */ 23a9643ea8Slogwang 24a9643ea8Slogwang #include "kqueue_proxy.h" 25a9643ea8Slogwang #include "micro_thread.h" 26a9643ea8Slogwang #include "mt_connection.h" 27a9643ea8Slogwang #include "mt_api.h" 28a9643ea8Slogwang #include "ff_api.h" 29a9643ea8Slogwang #include "mt_sys_hook.h" 30a9643ea8Slogwang 31a9643ea8Slogwang namespace NS_MICRO_THREAD { 32a9643ea8Slogwang 33a9643ea8Slogwang int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout) 34a9643ea8Slogwang { 35a9643ea8Slogwang int ret = 0; 36a9643ea8Slogwang int rc = 0; 37a9643ea8Slogwang int flags = 1; 38a9643ea8Slogwang struct sockaddr_in from_addr = {0}; 39a9643ea8Slogwang int addr_len = sizeof(from_addr); 40a9643ea8Slogwang 41a9643ea8Slogwang if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf) 42a9643ea8Slogwang { 43a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]", 44a9643ea8Slogwang dst, pkg, rcv_buf, len, buf_size); 45a9643ea8Slogwang return -10; 46a9643ea8Slogwang } 47a9643ea8Slogwang 48a9643ea8Slogwang int sock = socket(PF_INET, SOCK_DGRAM, 0); 49a9643ea8Slogwang if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0)) 50a9643ea8Slogwang { 51*5ac59bc4Slogwang MT_ATTR_API(320842, 1); 52a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno); 53a9643ea8Slogwang ret = -1; 54a9643ea8Slogwang goto EXIT_LABEL; 55a9643ea8Slogwang } 56a9643ea8Slogwang 57a9643ea8Slogwang rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout); 58a9643ea8Slogwang if (rc < 0) 59a9643ea8Slogwang { 60*5ac59bc4Slogwang MT_ATTR_API(320844, 1); 61a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno); 62a9643ea8Slogwang ret = -2; 63a9643ea8Slogwang goto EXIT_LABEL; 64a9643ea8Slogwang } 65a9643ea8Slogwang 66a9643ea8Slogwang rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout); 67a9643ea8Slogwang if (rc < 0) 68a9643ea8Slogwang { 69*5ac59bc4Slogwang MT_ATTR_API(320845, 1); 70a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno); 71a9643ea8Slogwang ret = -3; 72a9643ea8Slogwang goto EXIT_LABEL; 73a9643ea8Slogwang } 74a9643ea8Slogwang buf_size = rc; 75a9643ea8Slogwang 76a9643ea8Slogwang EXIT_LABEL: 77a9643ea8Slogwang 78a9643ea8Slogwang if (sock > 0) 79a9643ea8Slogwang { 80a9643ea8Slogwang close(sock); 81a9643ea8Slogwang sock = -1; 82a9643ea8Slogwang } 83a9643ea8Slogwang 84a9643ea8Slogwang return ret; 85a9643ea8Slogwang } 86a9643ea8Slogwang 87a9643ea8Slogwang int mt_tcp_create_sock(void) 88a9643ea8Slogwang { 89a9643ea8Slogwang int fd; 90a9643ea8Slogwang int flag; 91a9643ea8Slogwang 92a9643ea8Slogwang fd = ::socket(AF_INET, SOCK_STREAM, 0); 93a9643ea8Slogwang if (fd < 0) 94a9643ea8Slogwang { 95a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, error: %m"); 96a9643ea8Slogwang return -1; 97a9643ea8Slogwang } 98a9643ea8Slogwang 99a9643ea8Slogwang flag = fcntl(fd, F_GETFL, 0); 100a9643ea8Slogwang if (flag == -1) 101a9643ea8Slogwang { 102a9643ea8Slogwang ::close(fd); 103a9643ea8Slogwang MTLOG_ERROR("get fd flags failed, error: %m"); 104a9643ea8Slogwang return -2; 105a9643ea8Slogwang } 106a9643ea8Slogwang 107a9643ea8Slogwang if (flag & O_NONBLOCK) 108a9643ea8Slogwang return fd; 109a9643ea8Slogwang 110a9643ea8Slogwang if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1) 111a9643ea8Slogwang { 112a9643ea8Slogwang ::close(fd); 113a9643ea8Slogwang MTLOG_ERROR("set fd flags failed, error: %m"); 114a9643ea8Slogwang return -3; 115a9643ea8Slogwang } 116a9643ea8Slogwang 117a9643ea8Slogwang return fd; 118a9643ea8Slogwang } 119a9643ea8Slogwang 120a9643ea8Slogwang static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock) 121a9643ea8Slogwang { 122a9643ea8Slogwang KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0); 123a9643ea8Slogwang if (NULL == ntfy_obj) 124a9643ea8Slogwang { 125a9643ea8Slogwang MTLOG_ERROR("get notify failed, logit"); 126a9643ea8Slogwang return NULL; 127a9643ea8Slogwang } 128a9643ea8Slogwang 129a9643ea8Slogwang TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst)); 130a9643ea8Slogwang if (NULL == conn) 131a9643ea8Slogwang { 132a9643ea8Slogwang MTLOG_ERROR("get connection failed, dst[%p]", dst); 133a9643ea8Slogwang NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj); 134a9643ea8Slogwang return NULL; 135a9643ea8Slogwang } 136a9643ea8Slogwang conn->SetNtfyObj(ntfy_obj); 137a9643ea8Slogwang 138a9643ea8Slogwang int osfd = conn->CreateSocket(); 139a9643ea8Slogwang if (osfd < 0) 140a9643ea8Slogwang { 141a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, true); 142a9643ea8Slogwang MTLOG_ERROR("create socket failed, ret[%d]", osfd); 143a9643ea8Slogwang return NULL; 144a9643ea8Slogwang } 145a9643ea8Slogwang 146a9643ea8Slogwang sock = osfd; 147a9643ea8Slogwang return conn; 148a9643ea8Slogwang } 149a9643ea8Slogwang 150a9643ea8Slogwang static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func) 151a9643ea8Slogwang { 152a9643ea8Slogwang int recv_len = 0; 153a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 154a9643ea8Slogwang do 155a9643ea8Slogwang { 156a9643ea8Slogwang utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 157a9643ea8Slogwang if (cost_time > (utime64_t)timeout) 158a9643ea8Slogwang { 159a9643ea8Slogwang errno = ETIME; 160a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 161a9643ea8Slogwang return -3; 162a9643ea8Slogwang } 163a9643ea8Slogwang 164a9643ea8Slogwang int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time)); 165a9643ea8Slogwang if (rc < 0) 166a9643ea8Slogwang { 167a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 168a9643ea8Slogwang return -3; 169a9643ea8Slogwang } 170a9643ea8Slogwang else if (rc == 0) 171a9643ea8Slogwang { 172a9643ea8Slogwang len = recv_len; 173a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 174a9643ea8Slogwang return -7; 175a9643ea8Slogwang } 176a9643ea8Slogwang recv_len += rc; 177a9643ea8Slogwang 178a9643ea8Slogwang rc = func(rcv_buf, recv_len); 179a9643ea8Slogwang if (rc < 0) 180a9643ea8Slogwang { 181a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 182a9643ea8Slogwang return -5; 183a9643ea8Slogwang } 184*5ac59bc4Slogwang else if (rc == 0) 185a9643ea8Slogwang { 186*5ac59bc4Slogwang if (len == recv_len) 187a9643ea8Slogwang { 188a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock); 189a9643ea8Slogwang return -6; 190a9643ea8Slogwang } 191a9643ea8Slogwang continue; 192a9643ea8Slogwang } 193*5ac59bc4Slogwang else 194a9643ea8Slogwang { 195*5ac59bc4Slogwang if (rc > recv_len) 196a9643ea8Slogwang { 197a9643ea8Slogwang continue; 198a9643ea8Slogwang } 199a9643ea8Slogwang else 200a9643ea8Slogwang { 201a9643ea8Slogwang len = rc; 202a9643ea8Slogwang break; 203a9643ea8Slogwang } 204a9643ea8Slogwang } 205a9643ea8Slogwang } while (true); 206a9643ea8Slogwang 207a9643ea8Slogwang return 0; 208a9643ea8Slogwang } 209a9643ea8Slogwang 210a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 211a9643ea8Slogwang { 212a9643ea8Slogwang if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 213a9643ea8Slogwang { 214a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 215a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size); 216a9643ea8Slogwang return -10; 217a9643ea8Slogwang } 218a9643ea8Slogwang 219a9643ea8Slogwang int ret = 0, rc = 0; 220a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 221a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 222a9643ea8Slogwang utime64_t cost_time = 0; 223a9643ea8Slogwang int time_left = timeout; 224a9643ea8Slogwang 225a9643ea8Slogwang int sock = -1; 226a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 227a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 228a9643ea8Slogwang { 229a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 230a9643ea8Slogwang ret = -1; 231a9643ea8Slogwang goto EXIT_LABEL; 232a9643ea8Slogwang } 233a9643ea8Slogwang 234a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 235a9643ea8Slogwang if (rc < 0) 236a9643ea8Slogwang { 237a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 238a9643ea8Slogwang ret = -4; 239a9643ea8Slogwang goto EXIT_LABEL; 240a9643ea8Slogwang } 241a9643ea8Slogwang 242a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 243a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 244a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 245a9643ea8Slogwang if (rc < 0) 246a9643ea8Slogwang { 247a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 248a9643ea8Slogwang ret = -2; 249a9643ea8Slogwang goto EXIT_LABEL; 250a9643ea8Slogwang } 251a9643ea8Slogwang 252a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 253a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 254a9643ea8Slogwang rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 255a9643ea8Slogwang if (rc < 0) 256a9643ea8Slogwang { 257a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 258a9643ea8Slogwang ret = rc; 259a9643ea8Slogwang goto EXIT_LABEL; 260a9643ea8Slogwang } 261a9643ea8Slogwang 262a9643ea8Slogwang ret = 0; 263a9643ea8Slogwang 264a9643ea8Slogwang EXIT_LABEL: 265a9643ea8Slogwang 266a9643ea8Slogwang if (conn != NULL) 267a9643ea8Slogwang { 268a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 269a9643ea8Slogwang } 270a9643ea8Slogwang 271a9643ea8Slogwang return ret; 272a9643ea8Slogwang } 273a9643ea8Slogwang 274a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 275a9643ea8Slogwang { 276a9643ea8Slogwang int ret = 0, rc = 0; 277a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 278a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 279a9643ea8Slogwang utime64_t cost_time = 0; 280a9643ea8Slogwang int time_left = timeout; 281a9643ea8Slogwang 282a9643ea8Slogwang if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 283a9643ea8Slogwang { 284a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 285a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size); 286a9643ea8Slogwang return -10; 287a9643ea8Slogwang } 288a9643ea8Slogwang 289a9643ea8Slogwang int sock; 290a9643ea8Slogwang sock = mt_tcp_create_sock(); 291a9643ea8Slogwang if (sock < 0) 292a9643ea8Slogwang { 293a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 294a9643ea8Slogwang return -1; 295a9643ea8Slogwang } 296a9643ea8Slogwang 297a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 298a9643ea8Slogwang if (rc < 0) 299a9643ea8Slogwang { 300a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 301a9643ea8Slogwang ret = -4; 302a9643ea8Slogwang goto EXIT_LABEL; 303a9643ea8Slogwang } 304a9643ea8Slogwang 305a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 306a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 307a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 308a9643ea8Slogwang if (rc < 0) 309a9643ea8Slogwang { 310a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 311a9643ea8Slogwang ret = -2; 312a9643ea8Slogwang goto EXIT_LABEL; 313a9643ea8Slogwang } 314a9643ea8Slogwang 315a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 316a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 317a9643ea8Slogwang rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 318a9643ea8Slogwang if (rc < 0) 319a9643ea8Slogwang { 320a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 321a9643ea8Slogwang ret = rc; 322a9643ea8Slogwang goto EXIT_LABEL; 323a9643ea8Slogwang } 324a9643ea8Slogwang 325a9643ea8Slogwang ret = 0; 326a9643ea8Slogwang 327a9643ea8Slogwang EXIT_LABEL: 328a9643ea8Slogwang if (sock >= 0) 329a9643ea8Slogwang ::close(sock); 330a9643ea8Slogwang 331a9643ea8Slogwang return ret; 332a9643ea8Slogwang } 333a9643ea8Slogwang 334a9643ea8Slogwang int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout) 335a9643ea8Slogwang { 336a9643ea8Slogwang if (!dst || !pkg || len<1) 337a9643ea8Slogwang { 338a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 339a9643ea8Slogwang return -10; 340a9643ea8Slogwang } 341a9643ea8Slogwang 342a9643ea8Slogwang int ret = 0, rc = 0; 343a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 344a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 345a9643ea8Slogwang utime64_t cost_time = 0; 346a9643ea8Slogwang int time_left = timeout; 347a9643ea8Slogwang 348a9643ea8Slogwang int sock = -1; 349a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 350a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 351a9643ea8Slogwang { 352a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 353a9643ea8Slogwang ret = -1; 354a9643ea8Slogwang goto EXIT_LABEL; 355a9643ea8Slogwang } 356a9643ea8Slogwang 357a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 358a9643ea8Slogwang if (rc < 0) 359a9643ea8Slogwang { 360a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 361a9643ea8Slogwang ret = -4; 362a9643ea8Slogwang goto EXIT_LABEL; 363a9643ea8Slogwang } 364a9643ea8Slogwang 365a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 366a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 367a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 368a9643ea8Slogwang if (rc < 0) 369a9643ea8Slogwang { 370a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 371a9643ea8Slogwang ret = -2; 372a9643ea8Slogwang goto EXIT_LABEL; 373a9643ea8Slogwang } 374a9643ea8Slogwang 375a9643ea8Slogwang ret = 0; 376a9643ea8Slogwang 377a9643ea8Slogwang EXIT_LABEL: 378a9643ea8Slogwang 379a9643ea8Slogwang if (conn != NULL) 380a9643ea8Slogwang { 381a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 382a9643ea8Slogwang } 383a9643ea8Slogwang 384a9643ea8Slogwang return ret; 385a9643ea8Slogwang } 386a9643ea8Slogwang 387a9643ea8Slogwang int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout) 388a9643ea8Slogwang { 389a9643ea8Slogwang if (!dst || !pkg || len<1) 390a9643ea8Slogwang { 391a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 392a9643ea8Slogwang return -10; 393a9643ea8Slogwang } 394a9643ea8Slogwang 395a9643ea8Slogwang int ret = 0, rc = 0; 396a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 397a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 398a9643ea8Slogwang utime64_t cost_time = 0; 399a9643ea8Slogwang int time_left = timeout; 400a9643ea8Slogwang 401a9643ea8Slogwang int sock = -1; 402a9643ea8Slogwang sock = mt_tcp_create_sock(); 403a9643ea8Slogwang if (sock < 0) 404a9643ea8Slogwang { 405a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 406a9643ea8Slogwang ret = -1; 407a9643ea8Slogwang goto EXIT_LABEL; 408a9643ea8Slogwang } 409a9643ea8Slogwang 410a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 411a9643ea8Slogwang if (rc < 0) 412a9643ea8Slogwang { 413a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 414a9643ea8Slogwang ret = -4; 415a9643ea8Slogwang goto EXIT_LABEL; 416a9643ea8Slogwang } 417a9643ea8Slogwang 418a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 419a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 420a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 421a9643ea8Slogwang if (rc < 0) 422a9643ea8Slogwang { 423a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 424a9643ea8Slogwang ret = -2; 425a9643ea8Slogwang goto EXIT_LABEL; 426a9643ea8Slogwang } 427a9643ea8Slogwang 428a9643ea8Slogwang ret = 0; 429a9643ea8Slogwang 430a9643ea8Slogwang EXIT_LABEL: 431a9643ea8Slogwang 432a9643ea8Slogwang if (sock >= 0) 433a9643ea8Slogwang ::close(sock); 434a9643ea8Slogwang 435a9643ea8Slogwang return ret; 436a9643ea8Slogwang } 437a9643ea8Slogwang 438a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type) 439a9643ea8Slogwang { 440a9643ea8Slogwang if(!dst || !pkg || len<1) 441a9643ea8Slogwang { 442a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 443a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size,type); 444a9643ea8Slogwang return -10; 445a9643ea8Slogwang } 446a9643ea8Slogwang 447a9643ea8Slogwang switch (type) 448a9643ea8Slogwang { 449a9643ea8Slogwang case MT_TCP_LONG: 450a9643ea8Slogwang { 451a9643ea8Slogwang return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 452a9643ea8Slogwang } 453a9643ea8Slogwang 454a9643ea8Slogwang case MT_TCP_LONG_SNDONLY: 455a9643ea8Slogwang { 456a9643ea8Slogwang return mt_tcpsend(dst, pkg, len, timeout); 457a9643ea8Slogwang } 458a9643ea8Slogwang 459a9643ea8Slogwang case MT_TCP_SHORT: 460a9643ea8Slogwang { 461a9643ea8Slogwang return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 462a9643ea8Slogwang } 463a9643ea8Slogwang 464a9643ea8Slogwang case MT_TCP_SHORT_SNDONLY: 465a9643ea8Slogwang { 466a9643ea8Slogwang return mt_tcpsend_short(dst, pkg, len, timeout); 467a9643ea8Slogwang } 468a9643ea8Slogwang 469a9643ea8Slogwang default: 470a9643ea8Slogwang { 471a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 472a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size,type); 473a9643ea8Slogwang return -10; 474a9643ea8Slogwang } 475a9643ea8Slogwang } 476a9643ea8Slogwang 477a9643ea8Slogwang return 0; 478a9643ea8Slogwang } 479a9643ea8Slogwang 480a9643ea8Slogwang static void mt_task_process(void* arg) 481a9643ea8Slogwang { 482a9643ea8Slogwang int rc = 0; 483a9643ea8Slogwang IMtTask* task = (IMtTask*)arg; 484a9643ea8Slogwang if (!task) 485a9643ea8Slogwang { 486a9643ea8Slogwang MTLOG_ERROR("Invalid arg, error"); 487a9643ea8Slogwang return; 488a9643ea8Slogwang } 489a9643ea8Slogwang 490a9643ea8Slogwang rc = task->Process(); 491a9643ea8Slogwang if (rc != 0) 492a9643ea8Slogwang { 493a9643ea8Slogwang MTLOG_DEBUG("task process failed(%d), log", rc); 494a9643ea8Slogwang } 495a9643ea8Slogwang 496a9643ea8Slogwang task->SetResult(rc); 497a9643ea8Slogwang 498a9643ea8Slogwang return; 499a9643ea8Slogwang }; 500a9643ea8Slogwang 501a9643ea8Slogwang int mt_exec_all_task(IMtTaskList& req_list) 502a9643ea8Slogwang { 503a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 504a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 505a9643ea8Slogwang IMtTask* task = NULL; 506a9643ea8Slogwang MicroThread* sub = NULL; 507a9643ea8Slogwang MicroThread* tmp = NULL; 508a9643ea8Slogwang int rc = -1; 509a9643ea8Slogwang 510a9643ea8Slogwang MicroThread::SubThreadList list; 511a9643ea8Slogwang TAILQ_INIT(&list); 512a9643ea8Slogwang 513a9643ea8Slogwang if (0 == req_list.size()) 514a9643ea8Slogwang { 515a9643ea8Slogwang MTLOG_DEBUG("no task for execult"); 516a9643ea8Slogwang return 0; 517a9643ea8Slogwang } 518a9643ea8Slogwang 519a9643ea8Slogwang for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it) 520a9643ea8Slogwang { 521a9643ea8Slogwang task = *it; 522a9643ea8Slogwang sub = MtFrame::CreateThread(mt_task_process, task, false); 523a9643ea8Slogwang if (NULL == sub) 524a9643ea8Slogwang { 525a9643ea8Slogwang MTLOG_ERROR("create sub thread failed"); 526a9643ea8Slogwang goto EXIT_LABEL; 527a9643ea8Slogwang } 528a9643ea8Slogwang 529a9643ea8Slogwang sub->SetType(MicroThread::SUB_THREAD); 530a9643ea8Slogwang TAILQ_INSERT_TAIL(&list, sub, _sub_entry); 531a9643ea8Slogwang } 532a9643ea8Slogwang 533a9643ea8Slogwang TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 534a9643ea8Slogwang { 535a9643ea8Slogwang TAILQ_REMOVE(&list, sub, _sub_entry); 536a9643ea8Slogwang thread->AddSubThread(sub); 537a9643ea8Slogwang mtframe->InsertRunable(sub); 538a9643ea8Slogwang } 539a9643ea8Slogwang 540a9643ea8Slogwang thread->Wait(); 541a9643ea8Slogwang rc = 0; 542a9643ea8Slogwang 543a9643ea8Slogwang EXIT_LABEL: 544a9643ea8Slogwang 545a9643ea8Slogwang TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 546a9643ea8Slogwang { 547a9643ea8Slogwang TAILQ_REMOVE(&list, sub, _sub_entry); 548a9643ea8Slogwang mtframe->FreeThread(sub); 549a9643ea8Slogwang } 550a9643ea8Slogwang 551a9643ea8Slogwang return rc; 552a9643ea8Slogwang 553a9643ea8Slogwang } 554a9643ea8Slogwang 555a9643ea8Slogwang void mt_set_msg_private(void *data) 556a9643ea8Slogwang { 557a9643ea8Slogwang MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 558a9643ea8Slogwang if (msg_thread != NULL) 559a9643ea8Slogwang msg_thread->SetPrivate(data); 560a9643ea8Slogwang } 561a9643ea8Slogwang 562a9643ea8Slogwang void* mt_get_msg_private() 563a9643ea8Slogwang { 564a9643ea8Slogwang MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 565a9643ea8Slogwang if (NULL == msg_thread) 566a9643ea8Slogwang { 567a9643ea8Slogwang return NULL; 568a9643ea8Slogwang } 569a9643ea8Slogwang 570a9643ea8Slogwang return msg_thread->GetPrivate(); 571a9643ea8Slogwang } 572a9643ea8Slogwang 573a02c88d6Slogwang bool mt_init_frame(int argc, char * const argv[]) 574a9643ea8Slogwang { 575a02c88d6Slogwang if (argc) { 576a02c88d6Slogwang ff_init(argc, argv); 577a9643ea8Slogwang ff_set_hook_flag(); 578a9643ea8Slogwang } 579a9643ea8Slogwang memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab)); 580a9643ea8Slogwang return MtFrame::Instance()->InitFrame(); 581a9643ea8Slogwang } 582a9643ea8Slogwang 583a9643ea8Slogwang void mt_set_stack_size(unsigned int bytes) 584a9643ea8Slogwang { 585a9643ea8Slogwang ThreadPool::SetDefaultStackSize(bytes); 586a9643ea8Slogwang } 587a9643ea8Slogwang 588a9643ea8Slogwang int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 589a9643ea8Slogwang { 590a9643ea8Slogwang return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout); 591a9643ea8Slogwang } 592a9643ea8Slogwang 593a9643ea8Slogwang int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 594a9643ea8Slogwang { 595a9643ea8Slogwang return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout); 596a9643ea8Slogwang } 597a9643ea8Slogwang 598a9643ea8Slogwang int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 599a9643ea8Slogwang { 600a9643ea8Slogwang return MtFrame::connect(fd, addr, addrlen, timeout); 601a9643ea8Slogwang } 602a9643ea8Slogwang 603a9643ea8Slogwang int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 604a9643ea8Slogwang { 605a9643ea8Slogwang return MtFrame::accept(fd, addr, addrlen, timeout); 606a9643ea8Slogwang } 607a9643ea8Slogwang 608a9643ea8Slogwang ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout) 609a9643ea8Slogwang { 610a9643ea8Slogwang return MtFrame::read(fd, buf, nbyte, timeout); 611a9643ea8Slogwang } 612a9643ea8Slogwang 613a9643ea8Slogwang ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout) 614a9643ea8Slogwang { 615a9643ea8Slogwang return MtFrame::write(fd, buf, nbyte, timeout); 616a9643ea8Slogwang } 617a9643ea8Slogwang 618a9643ea8Slogwang ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout) 619a9643ea8Slogwang { 620a9643ea8Slogwang return MtFrame::recv(fd, buf, len, flags, timeout); 621a9643ea8Slogwang } 622a9643ea8Slogwang 623a9643ea8Slogwang ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 624a9643ea8Slogwang { 625a9643ea8Slogwang return MtFrame::send(fd, buf, nbyte, flags, timeout); 626a9643ea8Slogwang } 627a9643ea8Slogwang 628a9643ea8Slogwang void mt_sleep(int ms) 629a9643ea8Slogwang { 630a9643ea8Slogwang MtFrame::sleep(ms); 631a9643ea8Slogwang } 632a9643ea8Slogwang 633a9643ea8Slogwang unsigned long long mt_time_ms(void) 634a9643ea8Slogwang { 635a9643ea8Slogwang return MtFrame::Instance()->GetLastClock(); 636a9643ea8Slogwang } 637a9643ea8Slogwang 638a9643ea8Slogwang int mt_wait_events(int fd, int events, int timeout) 639a9643ea8Slogwang { 640a9643ea8Slogwang return MtFrame::Instance()->WaitEvents(fd, events, timeout); 641a9643ea8Slogwang } 642a9643ea8Slogwang 643a9643ea8Slogwang void* mt_start_thread(void* entry, void* args) 644a9643ea8Slogwang { 645a9643ea8Slogwang return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); 646a9643ea8Slogwang } 647a9643ea8Slogwang 648a9643ea8Slogwang #define BUF_ALIGNMENT_SIZE 4096 649a9643ea8Slogwang #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) 650a9643ea8Slogwang #define BUF_DEFAULT_SIZE 4096 651a9643ea8Slogwang 652a9643ea8Slogwang class ScopedBuf 653a9643ea8Slogwang { 654a9643ea8Slogwang public: 655a9643ea8Slogwang ScopedBuf(void*& buf_keeper, bool keep) 656a9643ea8Slogwang :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep) 657a9643ea8Slogwang {} 658a9643ea8Slogwang 659a9643ea8Slogwang int Alloc(int len) 660a9643ea8Slogwang { 661a9643ea8Slogwang if(len<len_) 662a9643ea8Slogwang { 663*5ac59bc4Slogwang return -1; 664a9643ea8Slogwang } 665a9643ea8Slogwang 666a9643ea8Slogwang if(len==0) 667a9643ea8Slogwang { 668a9643ea8Slogwang len = BUF_ALIGNMENT_SIZE; 669a9643ea8Slogwang } 670a9643ea8Slogwang if(len_==len) 671a9643ea8Slogwang { 672a9643ea8Slogwang return 0; 673a9643ea8Slogwang } 674a9643ea8Slogwang 675a9643ea8Slogwang len_ = BUF_ALIGN_SIZE(len); 676a9643ea8Slogwang if(len_==0) 677a9643ea8Slogwang { 678a9643ea8Slogwang len_ = BUF_DEFAULT_SIZE; 679a9643ea8Slogwang } 680a9643ea8Slogwang len_watermark_ = len_-BUF_ALIGNMENT_SIZE; 681a9643ea8Slogwang char* tmp = (char*)realloc(buf_, len_); 682a9643ea8Slogwang if(tmp==NULL) 683a9643ea8Slogwang { 684*5ac59bc4Slogwang return -2; 685a9643ea8Slogwang } 686a9643ea8Slogwang 687a9643ea8Slogwang buf_ = tmp; 688a9643ea8Slogwang return 0; 689a9643ea8Slogwang } 690a9643ea8Slogwang 691a9643ea8Slogwang void reset() 692a9643ea8Slogwang { 693a9643ea8Slogwang if(keep_) 694a9643ea8Slogwang { 695a9643ea8Slogwang buf_keeper_ = (void*)buf_; 696a9643ea8Slogwang buf_ = NULL; 697a9643ea8Slogwang } 698a9643ea8Slogwang } 699a9643ea8Slogwang 700a9643ea8Slogwang ~ScopedBuf() 701a9643ea8Slogwang { 702a9643ea8Slogwang if(buf_!=NULL) 703a9643ea8Slogwang { 704a9643ea8Slogwang free(buf_); 705a9643ea8Slogwang buf_ = NULL; 706a9643ea8Slogwang } 707a9643ea8Slogwang } 708a9643ea8Slogwang 709a9643ea8Slogwang public: 710a9643ea8Slogwang void* &buf_keeper_; 711a9643ea8Slogwang char* buf_; 712a9643ea8Slogwang int len_; 713a9643ea8Slogwang int len_watermark_; 714a9643ea8Slogwang bool keep_; 715a9643ea8Slogwang 716a9643ea8Slogwang }; 717a9643ea8Slogwang 718a9643ea8Slogwang static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags, 719a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 720a9643ea8Slogwang { 721a9643ea8Slogwang int recv_len = 0; 722a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 723a9643ea8Slogwang 724a9643ea8Slogwang int rc = 0; 725a9643ea8Slogwang int ret = 0; 726a9643ea8Slogwang int pkg_len = 0; 727a9643ea8Slogwang bool msg_len_detected = false; 728a9643ea8Slogwang 729a9643ea8Slogwang ScopedBuf sbuf(rcv_buf, keep_rcv_buf); 730a9643ea8Slogwang ret = sbuf.Alloc(len); 731a9643ea8Slogwang 732a9643ea8Slogwang if(ret!=0) 733a9643ea8Slogwang { 734a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 735a9643ea8Slogwang return -11; 736a9643ea8Slogwang } 737a9643ea8Slogwang 738a9643ea8Slogwang do 739a9643ea8Slogwang { 740a9643ea8Slogwang utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 741a9643ea8Slogwang if (cost_time > (utime64_t)timeout) 742a9643ea8Slogwang { 743a9643ea8Slogwang errno = ETIME; 744a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 745a9643ea8Slogwang return -3; 746a9643ea8Slogwang } 747a9643ea8Slogwang 748a9643ea8Slogwang rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time)); 749a9643ea8Slogwang if (rc < 0) 750a9643ea8Slogwang { 751a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 752a9643ea8Slogwang return -3; 753a9643ea8Slogwang } 754*5ac59bc4Slogwang else if (rc == 0) 755a9643ea8Slogwang { 756a9643ea8Slogwang 757*5ac59bc4Slogwang if(recv_len==0) 758a9643ea8Slogwang { 759a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 760a9643ea8Slogwang return -7; 761a9643ea8Slogwang } 762a9643ea8Slogwang 763a9643ea8Slogwang rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected); 764a9643ea8Slogwang 765*5ac59bc4Slogwang if(rc!=recv_len) 766a9643ea8Slogwang { 767a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 768a9643ea8Slogwang return -7; 769a9643ea8Slogwang } 770a9643ea8Slogwang len = recv_len; 771a9643ea8Slogwang break; 772a9643ea8Slogwang } 773a9643ea8Slogwang recv_len += rc; 774a9643ea8Slogwang 775a9643ea8Slogwang if((!msg_len_detected)||recv_len==pkg_len) 776a9643ea8Slogwang { 777a9643ea8Slogwang rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected); 778a9643ea8Slogwang if(msg_len_detected) 779a9643ea8Slogwang { 780a9643ea8Slogwang pkg_len = rc; 781a9643ea8Slogwang } 782a9643ea8Slogwang } 783a9643ea8Slogwang else 784a9643ea8Slogwang { 785a9643ea8Slogwang rc = pkg_len; 786a9643ea8Slogwang } 787a9643ea8Slogwang 788a9643ea8Slogwang if (rc < 0) 789a9643ea8Slogwang { 790a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 791a9643ea8Slogwang return -5; 792a9643ea8Slogwang } 793*5ac59bc4Slogwang else if (rc == 0) 794a9643ea8Slogwang { 795a9643ea8Slogwang if(sbuf.len_ > recv_len) 796a9643ea8Slogwang { 797a9643ea8Slogwang continue; 798a9643ea8Slogwang } 799*5ac59bc4Slogwang 800a9643ea8Slogwang ret = sbuf.Alloc(sbuf.len_<<1); 801a9643ea8Slogwang 802a9643ea8Slogwang if(ret!=0) 803a9643ea8Slogwang { 804a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 805a9643ea8Slogwang return -11; 806a9643ea8Slogwang } 807a9643ea8Slogwang } 808*5ac59bc4Slogwang else 809a9643ea8Slogwang { 810*5ac59bc4Slogwang if (rc > recv_len) 811a9643ea8Slogwang { 812*5ac59bc4Slogwang if(sbuf.len_ > recv_len) 813a9643ea8Slogwang { 814a9643ea8Slogwang continue; 815a9643ea8Slogwang } 816a9643ea8Slogwang 817a9643ea8Slogwang ret = sbuf.Alloc(rc); 818a9643ea8Slogwang 819a9643ea8Slogwang if(ret!=0) 820a9643ea8Slogwang { 821a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 822a9643ea8Slogwang return -11; 823a9643ea8Slogwang } 824a9643ea8Slogwang } 825*5ac59bc4Slogwang else if(rc==recv_len) 826a9643ea8Slogwang { 827a9643ea8Slogwang len = rc; 828a9643ea8Slogwang break; 829a9643ea8Slogwang } 830*5ac59bc4Slogwang else 831a9643ea8Slogwang { 832a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock); 833a9643ea8Slogwang return -5; 834a9643ea8Slogwang } 835a9643ea8Slogwang } 836a9643ea8Slogwang } while (true); 837a9643ea8Slogwang 838a9643ea8Slogwang sbuf.reset(); 839a9643ea8Slogwang 840a9643ea8Slogwang return 0; 841a9643ea8Slogwang } 842a9643ea8Slogwang 843a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 844a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 845a9643ea8Slogwang { 846a9643ea8Slogwang if(!dst || !pkg || len<1) 847a9643ea8Slogwang { 848a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 849a9643ea8Slogwang dst, pkg, len, check_func); 850a9643ea8Slogwang return -10; 851a9643ea8Slogwang } 852a9643ea8Slogwang 853a9643ea8Slogwang 854a9643ea8Slogwang int ret = 0, rc = 0; 855a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 856a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 857a9643ea8Slogwang utime64_t cost_time = 0; 858a9643ea8Slogwang int time_left = timeout; 859a9643ea8Slogwang 860a9643ea8Slogwang int sock = -1; 861a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 862a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 863a9643ea8Slogwang { 864a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 865a9643ea8Slogwang ret = -1; 866a9643ea8Slogwang goto EXIT_LABEL; 867a9643ea8Slogwang } 868a9643ea8Slogwang 869a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 870a9643ea8Slogwang if (rc < 0) 871a9643ea8Slogwang { 872a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 873a9643ea8Slogwang ret = -4; 874a9643ea8Slogwang goto EXIT_LABEL; 875a9643ea8Slogwang } 876a9643ea8Slogwang 877a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 878a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 879a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 880a9643ea8Slogwang if (rc < 0) 881a9643ea8Slogwang { 882a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 883a9643ea8Slogwang ret = -2; 884a9643ea8Slogwang goto EXIT_LABEL; 885a9643ea8Slogwang } 886a9643ea8Slogwang 887a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 888a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 889a9643ea8Slogwang 890a9643ea8Slogwang rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 891a9643ea8Slogwang if (rc < 0) 892a9643ea8Slogwang { 893a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 894a9643ea8Slogwang ret = rc; 895a9643ea8Slogwang goto EXIT_LABEL; 896a9643ea8Slogwang } 897a9643ea8Slogwang 898a9643ea8Slogwang ret = 0; 899a9643ea8Slogwang 900a9643ea8Slogwang EXIT_LABEL: 901a9643ea8Slogwang if (conn != NULL) 902a9643ea8Slogwang { 903a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 904a9643ea8Slogwang } 905a9643ea8Slogwang 906a9643ea8Slogwang return ret; 907a9643ea8Slogwang } 908a9643ea8Slogwang 909a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 910a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 911a9643ea8Slogwang { 912a9643ea8Slogwang int ret = 0, rc = 0; 913a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 914a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 915a9643ea8Slogwang utime64_t cost_time = 0; 916a9643ea8Slogwang int time_left = timeout; 917a9643ea8Slogwang 918a9643ea8Slogwang if(!dst || !pkg || len<1) 919a9643ea8Slogwang { 920a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 921a9643ea8Slogwang dst, pkg, len, check_func); 922a9643ea8Slogwang return -10; 923a9643ea8Slogwang } 924a9643ea8Slogwang 925a9643ea8Slogwang int sock; 926a9643ea8Slogwang sock = mt_tcp_create_sock(); 927a9643ea8Slogwang if (sock < 0) 928a9643ea8Slogwang { 929a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 930a9643ea8Slogwang return -1; 931a9643ea8Slogwang } 932a9643ea8Slogwang 933a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 934a9643ea8Slogwang if (rc < 0) 935a9643ea8Slogwang { 936a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 937a9643ea8Slogwang ret = -4; 938a9643ea8Slogwang goto EXIT_LABEL; 939a9643ea8Slogwang } 940a9643ea8Slogwang 941a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 942a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 943a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 944a9643ea8Slogwang if (rc < 0) 945a9643ea8Slogwang { 946a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 947a9643ea8Slogwang ret = -2; 948a9643ea8Slogwang goto EXIT_LABEL; 949a9643ea8Slogwang } 950a9643ea8Slogwang 951a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 952a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 953a9643ea8Slogwang rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 954a9643ea8Slogwang 955a9643ea8Slogwang if (rc < 0) 956a9643ea8Slogwang { 957a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 958a9643ea8Slogwang ret = rc; 959a9643ea8Slogwang goto EXIT_LABEL; 960a9643ea8Slogwang } 961a9643ea8Slogwang 962a9643ea8Slogwang ret = 0; 963a9643ea8Slogwang 964a9643ea8Slogwang EXIT_LABEL: 965a9643ea8Slogwang if (sock >= 0) 966a9643ea8Slogwang ::close(sock); 967a9643ea8Slogwang 968a9643ea8Slogwang return ret; 969a9643ea8Slogwang } 970a9643ea8Slogwang 971a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size, 972a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, 973a9643ea8Slogwang MT_TCP_CONN_TYPE type, bool keep_rcv_buf) 974a9643ea8Slogwang { 975a9643ea8Slogwang if(!dst || !pkg || len<1) 976a9643ea8Slogwang { 977a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 978a9643ea8Slogwang dst, pkg, len, check_func, msg_ctx, type); 979a9643ea8Slogwang return -10; 980a9643ea8Slogwang } 981a9643ea8Slogwang 982a9643ea8Slogwang switch (type) 983a9643ea8Slogwang { 984a9643ea8Slogwang case MT_TCP_LONG: 985a9643ea8Slogwang { 986a9643ea8Slogwang return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 987a9643ea8Slogwang } 988a9643ea8Slogwang 989a9643ea8Slogwang case MT_TCP_LONG_SNDONLY: 990a9643ea8Slogwang { 991a9643ea8Slogwang return mt_tcpsend(dst, pkg, len, timeout); 992a9643ea8Slogwang } 993a9643ea8Slogwang 994a9643ea8Slogwang case MT_TCP_SHORT: 995a9643ea8Slogwang { 996a9643ea8Slogwang return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 997a9643ea8Slogwang } 998a9643ea8Slogwang 999a9643ea8Slogwang case MT_TCP_SHORT_SNDONLY: 1000a9643ea8Slogwang { 1001a9643ea8Slogwang return mt_tcpsend_short(dst, pkg, len, timeout); 1002a9643ea8Slogwang } 1003a9643ea8Slogwang 1004a9643ea8Slogwang default: 1005a9643ea8Slogwang { 1006a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1007a9643ea8Slogwang dst, pkg, len, check_func, msg_ctx, type); 1008a9643ea8Slogwang return -10; 1009a9643ea8Slogwang } 1010a9643ea8Slogwang } 1011a9643ea8Slogwang 1012a9643ea8Slogwang return 0; 1013a9643ea8Slogwang } 1014a9643ea8Slogwang 1015a9643ea8Slogwang } 1016