1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename mt_sys_call.cpp 22 * @info �̷߳�װϵͳapi, ͬ�������߳�API��ʵ���첽���� 23 */ 24 25 #include "kqueue_proxy.h" 26 #include "micro_thread.h" 27 #include "mt_connection.h" 28 #include "mt_api.h" 29 #include "ff_api.h" 30 #include "mt_sys_hook.h" 31 32 namespace NS_MICRO_THREAD { 33 34 /** 35 * @brief ��������˿ڵ�socket�շ��ӿ�, ��socket������������, ҵ������֤������ 36 * @param dst -�����͵�Ŀ�ĵ�ַ 37 * @param pkg -�������װ�İ��� 38 * @param len -�������װ�İ��峤�� 39 * @param rcv_buf -����Ӧ�����buff 40 * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 41 * @param timeout -��ʱʱ��, ��λms 42 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, �ɴ�ӡerrno, -10 ������Ч 43 */ 44 int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout) 45 { 46 int ret = 0; 47 int rc = 0; 48 int flags = 1; 49 struct sockaddr_in from_addr = {0}; 50 int addr_len = sizeof(from_addr); 51 52 if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf) 53 { 54 MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]", 55 dst, pkg, rcv_buf, len, buf_size); 56 return -10; 57 } 58 59 int sock = socket(PF_INET, SOCK_DGRAM, 0); 60 if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0)) 61 { 62 MT_ATTR_API(320842, 1); // socketʧ�� 63 MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno); 64 ret = -1; 65 goto EXIT_LABEL; 66 } 67 68 rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout); 69 if (rc < 0) 70 { 71 MT_ATTR_API(320844, 1); // ����ʧ�� 72 MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno); 73 ret = -2; 74 goto EXIT_LABEL; 75 } 76 77 rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout); 78 if (rc < 0) 79 { 80 MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ� 81 MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno); 82 ret = -3; 83 goto EXIT_LABEL; 84 } 85 buf_size = rc; 86 87 EXIT_LABEL: 88 89 if (sock > 0) 90 { 91 close(sock); 92 sock = -1; 93 } 94 95 return ret; 96 } 97 98 /** 99 * @brief ����TCP���֣�������Ϊ������ 100 * @return >=0 �ɹ�, <0 ʧ�� 101 */ 102 int mt_tcp_create_sock(void) 103 { 104 int fd; 105 int flag; 106 107 // ����socket 108 fd = ::socket(AF_INET, SOCK_STREAM, 0); 109 if (fd < 0) 110 { 111 MTLOG_ERROR("create tcp socket failed, error: %m"); 112 return -1; 113 } 114 115 // ����socket������ 116 flag = fcntl(fd, F_GETFL, 0); 117 if (flag == -1) 118 { 119 ::close(fd); 120 MTLOG_ERROR("get fd flags failed, error: %m"); 121 return -2; 122 } 123 124 if (flag & O_NONBLOCK) 125 return fd; 126 127 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1) 128 { 129 ::close(fd); 130 MTLOG_ERROR("set fd flags failed, error: %m"); 131 return -3; 132 } 133 134 return fd; 135 } 136 137 /** 138 * @brief TCP��ȡ������֪ͨ������socket 139 */ 140 static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock) 141 { 142 // 1. ��ȡ�߳�֪ͨע����� 143 KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0); 144 if (NULL == ntfy_obj) 145 { 146 MTLOG_ERROR("get notify failed, logit"); 147 return NULL; 148 } 149 150 // 2. ��ȡ���Ӷ���, ����֪ͨ��Ϣ 151 TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst)); 152 if (NULL == conn) 153 { 154 MTLOG_ERROR("get connection failed, dst[%p]", dst); 155 NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj); 156 return NULL; 157 } 158 conn->SetNtfyObj(ntfy_obj); 159 160 // 3. ��������socket��� 161 int osfd = conn->CreateSocket(); 162 if (osfd < 0) 163 { 164 ConnectionMgr::Instance()->FreeConnection(conn, true); 165 MTLOG_ERROR("create socket failed, ret[%d]", osfd); 166 return NULL; 167 } 168 169 // 4. �ɹ��������� 170 sock = osfd; 171 return conn; 172 } 173 174 /** 175 * @brief TCPѭ������, ֱ������OK��ʱ 176 * [ע��] �����߲�Ҫ�����ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ] 177 */ 178 static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func) 179 { 180 int recv_len = 0; 181 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 182 do 183 { 184 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 185 if (cost_time > (utime64_t)timeout) 186 { 187 errno = ETIME; 188 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 189 return -3; 190 } 191 192 int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time)); 193 if (rc < 0) 194 { 195 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 196 return -3; 197 } 198 else if (rc == 0) 199 { 200 len = recv_len; 201 MTLOG_ERROR("tcp socket[%d] remote close", sock); 202 return -7; 203 } 204 recv_len += rc; 205 206 /* ��鱨�������� */ 207 rc = func(rcv_buf, recv_len); 208 if (rc < 0) 209 { 210 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 211 return -5; 212 } 213 else if (rc == 0) // ����δ������ 214 { 215 if (len == recv_len) // û�ռ��ٽ�����, ���� 216 { 217 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock); 218 return -6; 219 } 220 continue; 221 } 222 else // �ɹ����㱨�ij��� 223 { 224 if (rc > recv_len) // ���Ļ�δ��ȫ 225 { 226 continue; 227 } 228 else 229 { 230 len = rc; 231 break; 232 } 233 } 234 } while (true); 235 236 return 0; 237 } 238 239 /** 240 * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 241 * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 242 * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 243 * @param dst -�����͵�Ŀ�ĵ�ַ 244 * @param pkg -�������װ�İ��� 245 * @param len -�������װ�İ��峤�� 246 * @param rcv_buf -����Ӧ�����buff 247 * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 248 * @param timeout -��ʱʱ��, ��λms 249 * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 250 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 251 * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч 252 */ 253 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 254 { 255 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 256 { 257 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 258 dst, pkg, rcv_buf, func, len, buf_size); 259 return -10; 260 } 261 262 int ret = 0, rc = 0; 263 int addr_len = sizeof(struct sockaddr_in); 264 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 265 utime64_t cost_time = 0; 266 int time_left = timeout; 267 268 // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 269 int sock = -1; 270 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 271 if ((conn == NULL) || (sock < 0)) 272 { 273 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 274 ret = -1; 275 goto EXIT_LABEL; 276 } 277 278 // 2. ���Լ����½����� 279 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 280 if (rc < 0) 281 { 282 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 283 ret = -4; 284 goto EXIT_LABEL; 285 } 286 287 // 3. �������ݴ��� 288 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 289 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 290 rc = MtFrame::send(sock, pkg, len, 0, time_left); 291 if (rc < 0) 292 { 293 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 294 ret = -2; 295 goto EXIT_LABEL; 296 } 297 298 // 4. �������ݴ��� 299 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 300 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 301 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 302 if (rc < 0) 303 { 304 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 305 ret = rc; 306 goto EXIT_LABEL; 307 } 308 309 ret = 0; 310 311 EXIT_LABEL: 312 313 // ʧ����ǿ���ͷ�����, ����ʱ���� 314 if (conn != NULL) 315 { 316 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 317 } 318 319 return ret; 320 } 321 322 /** 323 * @brief TCP�������շ����� 324 * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 325 * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 326 * @param dst -�����͵�Ŀ�ĵ�ַ 327 * @param pkg -�������װ�İ��� 328 * @param len -�������װ�İ��峤�� 329 * @param rcv_buf -����Ӧ�����buff 330 * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 331 * @param timeout -��ʱʱ��, ��λms 332 * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 333 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 334 * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч 335 */ 336 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 337 { 338 int ret = 0, rc = 0; 339 int addr_len = sizeof(struct sockaddr_in); 340 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 341 utime64_t cost_time = 0; 342 int time_left = timeout; 343 344 // 1. ������� 345 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 346 { 347 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 348 dst, pkg, rcv_buf, func, len, buf_size); 349 return -10; 350 } 351 352 // 2. ����TCP socket 353 int sock; 354 sock = mt_tcp_create_sock(); 355 if (sock < 0) 356 { 357 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 358 return -1; 359 } 360 361 // 3. ���Լ����½����� 362 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 363 if (rc < 0) 364 { 365 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 366 ret = -4; 367 goto EXIT_LABEL; 368 } 369 370 // 4. �������ݴ��� 371 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 372 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 373 rc = MtFrame::send(sock, pkg, len, 0, time_left); 374 if (rc < 0) 375 { 376 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 377 ret = -2; 378 goto EXIT_LABEL; 379 } 380 381 // 5. �������ݴ��� 382 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 383 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 384 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 385 if (rc < 0) 386 { 387 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 388 ret = rc; 389 goto EXIT_LABEL; 390 } 391 392 ret = 0; 393 394 EXIT_LABEL: 395 if (sock >= 0) 396 ::close(sock); 397 398 return ret; 399 } 400 401 402 /** 403 * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 404 * [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ] 405 * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 406 * @param dst -�����͵�Ŀ�ĵ�ַ 407 * @param pkg -�������װ�İ��� 408 * @param len -�������װ�İ��峤�� 409 * @param timeout -��ʱʱ��, ��λms 410 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч 411 */ 412 int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout) 413 { 414 if (!dst || !pkg || len<1) 415 { 416 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 417 return -10; 418 } 419 420 int ret = 0, rc = 0; 421 int addr_len = sizeof(struct sockaddr_in); 422 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 423 utime64_t cost_time = 0; 424 int time_left = timeout; 425 426 // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 427 int sock = -1; 428 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 429 if ((conn == NULL) || (sock < 0)) 430 { 431 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 432 ret = -1; 433 goto EXIT_LABEL; 434 } 435 436 // 2. ���Լ����½����� 437 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 438 if (rc < 0) 439 { 440 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 441 ret = -4; 442 goto EXIT_LABEL; 443 } 444 445 // 3. �������ݴ��� 446 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 447 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 448 rc = MtFrame::send(sock, pkg, len, 0, time_left); 449 if (rc < 0) 450 { 451 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 452 ret = -2; 453 goto EXIT_LABEL; 454 } 455 456 ret = 0; 457 458 EXIT_LABEL: 459 460 // ʧ����ǿ���ͷ�����, ����ʱ���� 461 if (conn != NULL) 462 { 463 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 464 } 465 466 return ret; 467 } 468 469 /** 470 * @brief TCP������ֻ�����սӿ� 471 * [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ] 472 * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 473 * @param dst -�����͵�Ŀ�ĵ�ַ 474 * @param pkg -�������װ�İ��� 475 * @param len -�������װ�İ��峤�� 476 * @param timeout -��ʱʱ��, ��λms 477 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч 478 */ 479 int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout) 480 { 481 // 1. ��μ�� 482 if (!dst || !pkg || len<1) 483 { 484 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 485 return -10; 486 } 487 488 int ret = 0, rc = 0; 489 int addr_len = sizeof(struct sockaddr_in); 490 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 491 utime64_t cost_time = 0; 492 int time_left = timeout; 493 494 // 2. ����TCP socket 495 int sock = -1; 496 sock = mt_tcp_create_sock(); 497 if (sock < 0) 498 { 499 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 500 ret = -1; 501 goto EXIT_LABEL; 502 } 503 504 // 2. ���Լ����½����� 505 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 506 if (rc < 0) 507 { 508 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 509 ret = -4; 510 goto EXIT_LABEL; 511 } 512 513 // 3. �������ݴ��� 514 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 515 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 516 rc = MtFrame::send(sock, pkg, len, 0, time_left); 517 if (rc < 0) 518 { 519 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 520 ret = -2; 521 goto EXIT_LABEL; 522 } 523 524 ret = 0; 525 526 EXIT_LABEL: 527 528 if (sock >= 0) 529 ::close(sock); 530 531 return ret; 532 } 533 534 535 /** 536 * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶����� 537 * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 538 * @param dst -�����͵�Ŀ�ĵ�ַ 539 * @param pkg -�������װ�İ��� 540 * @param len -�������װ�İ��峤�� 541 * @param rcv_buf -����Ӧ�����buff��ֻ�����տ�������ΪNULL 542 * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������ȣ�ֻ�����գ�����ΪNULL 543 * @param timeout -��ʱʱ��, ��λms 544 * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 545 * @param type - �������� 546 * MT_TCP_SHORT: һ��һ�������ӣ� 547 * MT_TCP_LONG : һ��һ�ճ����ӣ� 548 * MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ� 549 * MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ� 550 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 551 * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 552 */ 553 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type) 554 { 555 if(!dst || !pkg || len<1) 556 { 557 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 558 dst, pkg, rcv_buf, func, len, buf_size,type); 559 return -10; 560 } 561 562 switch (type) 563 { 564 // TCP�����ӵ������� 565 case MT_TCP_LONG: 566 { 567 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 568 } 569 570 // TCP������ֻ������ 571 case MT_TCP_LONG_SNDONLY: 572 { 573 return mt_tcpsend(dst, pkg, len, timeout); 574 } 575 576 // TCP�����ӵ������� 577 case MT_TCP_SHORT: 578 { 579 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 580 } 581 582 // TCP������ֻ������ 583 case MT_TCP_SHORT_SNDONLY: 584 { 585 return mt_tcpsend_short(dst, pkg, len, timeout); 586 } 587 588 default: 589 { 590 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 591 dst, pkg, rcv_buf, func, len, buf_size,type); 592 return -10; 593 } 594 } 595 596 return 0; 597 } 598 599 600 601 /** 602 * @brief ������ص��������� 603 */ 604 static void mt_task_process(void* arg) 605 { 606 int rc = 0; 607 IMtTask* task = (IMtTask*)arg; 608 if (!task) 609 { 610 MTLOG_ERROR("Invalid arg, error"); 611 return; 612 } 613 614 rc = task->Process(); 615 if (rc != 0) 616 { 617 MTLOG_DEBUG("task process failed(%d), log", rc); 618 } 619 620 task->SetResult(rc); 621 622 return; 623 }; 624 625 /** 626 * @brief ��·IO�Ĵ���, ��������̹߳��� 627 * @param req_list - �����б� 628 * @return 0 �ɹ�, <0ʧ�� 629 */ 630 int mt_exec_all_task(IMtTaskList& req_list) 631 { 632 MtFrame* mtframe = MtFrame::Instance(); 633 MicroThread* thread = mtframe->GetActiveThread(); 634 IMtTask* task = NULL; 635 MicroThread* sub = NULL; 636 MicroThread* tmp = NULL; 637 int rc = -1; 638 639 MicroThread::SubThreadList list; 640 TAILQ_INIT(&list); 641 642 // ��ֹû��task�������߳�һֱ����ס 643 if (0 == req_list.size()) 644 { 645 MTLOG_DEBUG("no task for execult"); 646 return 0; 647 } 648 649 // 1. �����̶߳��� 650 for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it) 651 { 652 task = *it; 653 sub = MtFrame::CreateThread(mt_task_process, task, false); 654 if (NULL == sub) 655 { 656 MTLOG_ERROR("create sub thread failed"); 657 goto EXIT_LABEL; 658 } 659 660 sub->SetType(MicroThread::SUB_THREAD); 661 TAILQ_INSERT_TAIL(&list, sub, _sub_entry); 662 } 663 664 // 2. ����ִ������ 665 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 666 { 667 TAILQ_REMOVE(&list, sub, _sub_entry); 668 thread->AddSubThread(sub); 669 mtframe->InsertRunable(sub); 670 } 671 672 // 3. �ȴ����߳�ִ�н��� 673 thread->Wait(); 674 rc = 0; 675 676 EXIT_LABEL: 677 678 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 679 { 680 TAILQ_REMOVE(&list, sub, _sub_entry); 681 mtframe->FreeThread(sub); 682 } 683 684 return rc; 685 686 } 687 688 /** 689 * @brief ���õ�ǰIMtMsg��˽�б��� 690 * @info ֻ����ָ�룬�ڴ���Ҫҵ����� 691 */ 692 void mt_set_msg_private(void *data) 693 { 694 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 695 if (msg_thread != NULL) 696 msg_thread->SetPrivate(data); 697 } 698 699 /** 700 * @brief ��ȡ��ǰIMtMsg��˽�б��� 701 * @return ˽�б���ָ�� 702 */ 703 void* mt_get_msg_private() 704 { 705 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 706 if (NULL == msg_thread) 707 { 708 return NULL; 709 } 710 711 return msg_thread->GetPrivate(); 712 } 713 714 /** 715 * @brief �߳̿�ܳ�ʼ�� 716 * @info ҵ��ʹ��spp�������̣߳���Ҫ���øó�ʼ������ 717 * @return false:��ʼ��ʧ�� true:��ʼ���ɹ� 718 */ 719 bool mt_init_frame(int argc, char * const argv[]) 720 { 721 if (argc) { 722 ff_init(argc, argv); 723 ff_set_hook_flag(); 724 } 725 memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab)); 726 return MtFrame::Instance()->InitFrame(); 727 } 728 729 /** 730 * @brief �����̶߳���ջ�ռ��С 731 * @info �DZ������ã�Ĭ�ϴ�СΪ128K 732 */ 733 void mt_set_stack_size(unsigned int bytes) 734 { 735 ThreadPool::SetDefaultStackSize(bytes); 736 } 737 738 /** 739 * @brief �̰߳�����ϵͳIO���� recvfrom 740 * @param fd ϵͳsocket��Ϣ 741 * @param buf ������Ϣ������ָ�� 742 * @param len ������Ϣ���������� 743 * @param from ��Դ��ַ��ָ�� 744 * @param fromlen ��Դ��ַ�Ľṹ���� 745 * @param timeout ��ȴ�ʱ��, ���� 746 * @return >0 �ɹ����ճ���, <0 ʧ�� 747 */ 748 int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 749 { 750 return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout); 751 } 752 753 /** 754 * @brief �̰߳�����ϵͳIO���� sendto 755 * @param fd ϵͳsocket��Ϣ 756 * @param msg �����͵���Ϣָ�� 757 * @param len �����͵���Ϣ���� 758 * @param to Ŀ�ĵ�ַ��ָ�� 759 * @param tolen Ŀ�ĵ�ַ�Ľṹ���� 760 * @param timeout ��ȴ�ʱ��, ���� 761 * @return >0 �ɹ����ͳ���, <0 ʧ�� 762 */ 763 int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 764 { 765 return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout); 766 } 767 768 /** 769 * @brief �̰߳�����ϵͳIO���� connect 770 * @param fd ϵͳsocket��Ϣ 771 * @param addr ָ��server��Ŀ�ĵ�ַ 772 * @param addrlen ��ַ�ij��� 773 * @param timeout ��ȴ�ʱ��, ���� 774 * @return >0 �ɹ����ͳ���, <0 ʧ�� 775 */ 776 int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 777 { 778 return MtFrame::connect(fd, addr, addrlen, timeout); 779 } 780 781 /** 782 * @brief �̰߳�����ϵͳIO���� accept 783 * @param fd �������� 784 * @param addr �ͻ��˵�ַ 785 * @param addrlen ��ַ�ij��� 786 * @param timeout ��ȴ�ʱ��, ���� 787 * @return >=0 accept��socket������, <0 ʧ�� 788 */ 789 int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 790 { 791 return MtFrame::accept(fd, addr, addrlen, timeout); 792 } 793 794 795 /** 796 * @brief �̰߳�����ϵͳIO���� read 797 * @param fd ϵͳsocket��Ϣ 798 * @param buf ������Ϣ������ָ�� 799 * @param nbyte ������Ϣ���������� 800 * @param timeout ��ȴ�ʱ��, ���� 801 * @return >0 �ɹ����ճ���, <0 ʧ�� 802 */ 803 ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout) 804 { 805 return MtFrame::read(fd, buf, nbyte, timeout); 806 } 807 808 /** 809 * @brief �̰߳�����ϵͳIO���� write 810 * @param fd ϵͳsocket��Ϣ 811 * @param buf �����͵���Ϣָ�� 812 * @param nbyte �����͵���Ϣ���� 813 * @param timeout ��ȴ�ʱ��, ���� 814 * @return >0 �ɹ����ͳ���, <0 ʧ�� 815 */ 816 ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout) 817 { 818 return MtFrame::write(fd, buf, nbyte, timeout); 819 } 820 821 /** 822 * @brief �̰߳�����ϵͳIO���� recv 823 * @param fd ϵͳsocket��Ϣ 824 * @param buf ������Ϣ������ָ�� 825 * @param len ������Ϣ���������� 826 * @param timeout ��ȴ�ʱ��, ���� 827 * @return >0 �ɹ����ճ���, <0 ʧ�� 828 */ 829 ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout) 830 { 831 return MtFrame::recv(fd, buf, len, flags, timeout); 832 } 833 834 /** 835 * @brief �̰߳�����ϵͳIO���� send 836 * @param fd ϵͳsocket��Ϣ 837 * @param buf �����͵���Ϣָ�� 838 * @param nbyte �����͵���Ϣ���� 839 * @param timeout ��ȴ�ʱ��, ���� 840 * @return >0 �ɹ����ͳ���, <0 ʧ�� 841 */ 842 ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 843 { 844 return MtFrame::send(fd, buf, nbyte, flags, timeout); 845 } 846 847 /** 848 * @brief �߳�����sleep�ӿ�, ��λms 849 */ 850 void mt_sleep(int ms) 851 { 852 MtFrame::sleep(ms); 853 } 854 855 /** 856 * @brief �̻߳�ȡϵͳʱ�䣬��λms 857 */ 858 unsigned long long mt_time_ms(void) 859 { 860 return MtFrame::Instance()->GetLastClock(); 861 } 862 863 /** 864 * @brief �̵߳ȴ�epoll�¼��İ������� 865 */ 866 int mt_wait_events(int fd, int events, int timeout) 867 { 868 return MtFrame::Instance()->WaitEvents(fd, events, timeout); 869 } 870 871 void* mt_start_thread(void* entry, void* args) 872 { 873 return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); 874 } 875 876 #define BUF_ALIGNMENT_SIZE 4096 877 #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) 878 #define BUF_DEFAULT_SIZE 4096 879 880 class ScopedBuf 881 { 882 public: 883 ScopedBuf(void*& buf_keeper, bool keep) 884 :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep) 885 {} 886 887 int Alloc(int len) 888 { 889 if(len<len_) 890 { 891 return -1; // �ռ��СԽ�� 892 } 893 894 if(len==0) 895 { 896 len = BUF_ALIGNMENT_SIZE; 897 } 898 if(len_==len) 899 { 900 return 0; 901 } 902 903 len_ = BUF_ALIGN_SIZE(len); 904 if(len_==0) 905 { 906 len_ = BUF_DEFAULT_SIZE; 907 } 908 len_watermark_ = len_-BUF_ALIGNMENT_SIZE; 909 char* tmp = (char*)realloc(buf_, len_); 910 if(tmp==NULL) 911 { 912 return -2; // �����ڴ�ʧ�� 913 } 914 915 buf_ = tmp; 916 return 0; 917 } 918 919 void reset() 920 { 921 if(keep_) 922 { 923 buf_keeper_ = (void*)buf_; 924 buf_ = NULL; 925 } 926 } 927 928 ~ScopedBuf() 929 { 930 if(buf_!=NULL) 931 { 932 free(buf_); 933 buf_ = NULL; 934 } 935 } 936 937 public: 938 void* &buf_keeper_; 939 char* buf_; 940 int len_; 941 int len_watermark_; 942 bool keep_; 943 944 }; 945 946 /** 947 * @brief TCPѭ������, ֱ������OK��ʱ 948 * [ע��] �����߲�Ҫ�����ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ] 949 */ 950 static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags, 951 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 952 { 953 int recv_len = 0; 954 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 955 956 int rc = 0; 957 int ret = 0; 958 int pkg_len = 0; 959 bool msg_len_detected = false; 960 961 ScopedBuf sbuf(rcv_buf, keep_rcv_buf); 962 ret = sbuf.Alloc(len); 963 964 if(ret!=0) 965 { 966 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 967 return -11; 968 } 969 970 do 971 { 972 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 973 if (cost_time > (utime64_t)timeout) 974 { 975 errno = ETIME; 976 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 977 return -3; 978 } 979 980 rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time)); 981 if (rc < 0) 982 { 983 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 984 return -3; 985 } 986 else if (rc == 0) // Զ�˹ر� 987 { 988 989 if(recv_len==0) // δ�ذ���ֱ�ӷ���Զ�˹ر� 990 { 991 MTLOG_ERROR("tcp socket[%d] remote close", sock); 992 return -7; 993 } 994 995 /* ��鱨�������� */ 996 rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected); 997 998 if(rc!=recv_len) // ҵ�����Զ�˹رգ�Ӧ�÷������������ȣ�����<=0,��ʾ���������� 999 { 1000 MTLOG_ERROR("tcp socket[%d] remote close", sock); 1001 return -7; 1002 } 1003 len = recv_len; 1004 break; 1005 } 1006 recv_len += rc; 1007 1008 /* ��鱨�������� */ 1009 if((!msg_len_detected)||recv_len==pkg_len) 1010 { 1011 rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected); 1012 if(msg_len_detected) 1013 { 1014 pkg_len = rc; 1015 } 1016 } 1017 else 1018 { 1019 rc = pkg_len; 1020 } 1021 1022 if (rc < 0) 1023 { 1024 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 1025 return -5; 1026 } 1027 else if (rc == 0) // ����δ������,�Ҳ�ȷ����С 1028 { 1029 if(sbuf.len_ > recv_len) 1030 { 1031 continue; 1032 } 1033 // û�ռ��ٽ�����, 2����С��չbuf 1034 ret = sbuf.Alloc(sbuf.len_<<1); 1035 1036 if(ret!=0) 1037 { 1038 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 1039 return -11; 1040 } 1041 } 1042 else // �ɹ����㱨�ij��� 1043 { 1044 if (rc > recv_len) // ���Ļ�δ��ȫ 1045 { 1046 if(sbuf.len_ > recv_len) // recv buf���пռ�.��δ����ˮλ 1047 { 1048 continue; 1049 } 1050 1051 // û�ռ��ٽ�����, ����ҵ��ָʾ��С��չ�ڴ� 1052 ret = sbuf.Alloc(rc); 1053 1054 if(ret!=0) 1055 { 1056 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 1057 return -11; 1058 } 1059 } 1060 else if(rc==recv_len) // �հ����� 1061 { 1062 len = rc; 1063 break; 1064 } 1065 else // �߳����ģʽ�£�������ճ�� 1066 { 1067 MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock); 1068 return -5; 1069 } 1070 } 1071 } while (true); 1072 1073 sbuf.reset(); 1074 1075 return 0; 1076 } 1077 1078 1079 1080 /** 1081 * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 1082 * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1083 * @param dst -�����͵�Ŀ�ĵ�ַ 1084 * @param pkg -�������װ�İ��� 1085 * @param len -�������װ�İ��峤�� 1086 * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1087 * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1088 * @param timeout -��ʱʱ��, ��λms 1089 * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1090 * @param msg_ctx -�������ĵ������ı����� 1091 * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1092 * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1093 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1094 * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1095 */ 1096 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 1097 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 1098 { 1099 if(!dst || !pkg || len<1) 1100 { 1101 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 1102 dst, pkg, len, check_func); 1103 return -10; 1104 } 1105 1106 1107 int ret = 0, rc = 0; 1108 int addr_len = sizeof(struct sockaddr_in); 1109 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 1110 utime64_t cost_time = 0; 1111 int time_left = timeout; 1112 1113 // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 1114 int sock = -1; 1115 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 1116 if ((conn == NULL) || (sock < 0)) 1117 { 1118 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 1119 ret = -1; 1120 goto EXIT_LABEL; 1121 } 1122 1123 // 2. ���Լ����½����� 1124 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 1125 if (rc < 0) 1126 { 1127 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 1128 ret = -4; 1129 goto EXIT_LABEL; 1130 } 1131 1132 // 3. �������ݴ��� 1133 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1134 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1135 rc = MtFrame::send(sock, pkg, len, 0, time_left); 1136 if (rc < 0) 1137 { 1138 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 1139 ret = -2; 1140 goto EXIT_LABEL; 1141 } 1142 1143 // 4. �������ݴ��� 1144 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1145 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1146 1147 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 1148 if (rc < 0) 1149 { 1150 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 1151 ret = rc; 1152 goto EXIT_LABEL; 1153 } 1154 1155 ret = 0; 1156 1157 EXIT_LABEL: 1158 1159 // ʧ����ǿ���ͷ�����, ����ʱ���� 1160 if (conn != NULL) 1161 { 1162 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 1163 } 1164 1165 return ret; 1166 } 1167 1168 1169 /** 1170 * @brief TCP�������շ����� 1171 * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1172 * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 1173 * @param dst -�����͵�Ŀ�ĵ�ַ 1174 * @param pkg -�������װ�İ��� 1175 * @param len -�������װ�İ��峤�� 1176 * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1177 * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1178 * @param timeout -��ʱʱ��, ��λms 1179 * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1180 * @param msg_ctx -�������ĵ������ı����� 1181 * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1182 * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1183 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1184 * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1185 */ 1186 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 1187 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 1188 { 1189 int ret = 0, rc = 0; 1190 int addr_len = sizeof(struct sockaddr_in); 1191 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 1192 utime64_t cost_time = 0; 1193 int time_left = timeout; 1194 1195 // 1. ������� 1196 if(!dst || !pkg || len<1) 1197 { 1198 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 1199 dst, pkg, len, check_func); 1200 return -10; 1201 } 1202 1203 // 2. ����TCP socket 1204 int sock; 1205 sock = mt_tcp_create_sock(); 1206 if (sock < 0) 1207 { 1208 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 1209 return -1; 1210 } 1211 1212 // 3. ���Լ����½����� 1213 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 1214 if (rc < 0) 1215 { 1216 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 1217 ret = -4; 1218 goto EXIT_LABEL; 1219 } 1220 1221 // 4. �������ݴ��� 1222 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1223 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1224 rc = MtFrame::send(sock, pkg, len, 0, time_left); 1225 if (rc < 0) 1226 { 1227 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 1228 ret = -2; 1229 goto EXIT_LABEL; 1230 } 1231 1232 // 5. �������ݴ��� 1233 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1234 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1235 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 1236 1237 if (rc < 0) 1238 { 1239 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 1240 ret = rc; 1241 goto EXIT_LABEL; 1242 } 1243 1244 ret = 0; 1245 1246 EXIT_LABEL: 1247 if (sock >= 0) 1248 ::close(sock); 1249 1250 return ret; 1251 } 1252 1253 1254 1255 /** 1256 * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶����� 1257 * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1258 * @param dst -�����͵�Ŀ�ĵ�ַ 1259 * @param pkg -�������װ�İ��� 1260 * @param len -�������װ�İ��峤�� 1261 * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1262 * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1263 * @param timeout -��ʱʱ��, ��λms 1264 * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1265 * @param msg_ctx -�������ĵ������ı����� 1266 * 1267 * @param type - �������� 1268 * MT_TCP_SHORT: һ��һ�������ӣ� 1269 * MT_TCP_LONG : һ��һ�ճ����ӣ� 1270 * MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ� 1271 * MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ� 1272 * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1273 * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1274 * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1275 * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1276 */ 1277 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size, 1278 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, 1279 MT_TCP_CONN_TYPE type, bool keep_rcv_buf) 1280 { 1281 if(!dst || !pkg || len<1) 1282 { 1283 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1284 dst, pkg, len, check_func, msg_ctx, type); 1285 return -10; 1286 } 1287 1288 switch (type) 1289 { 1290 // TCP�����ӵ������� 1291 case MT_TCP_LONG: 1292 { 1293 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1294 } 1295 1296 // TCP������ֻ������ 1297 case MT_TCP_LONG_SNDONLY: 1298 { 1299 return mt_tcpsend(dst, pkg, len, timeout); 1300 } 1301 1302 // TCP�����ӵ������� 1303 case MT_TCP_SHORT: 1304 { 1305 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1306 } 1307 1308 // TCP������ֻ������ 1309 case MT_TCP_SHORT_SNDONLY: 1310 { 1311 return mt_tcpsend_short(dst, pkg, len, timeout); 1312 } 1313 1314 default: 1315 { 1316 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1317 dst, pkg, len, check_func, msg_ctx, type); 1318 return -10; 1319 } 1320 } 1321 1322 return 0; 1323 } 1324 1325 1326 1327 } 1328 1329 1330