1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename mt_sys_call.cpp 22 */ 23 24 #include "kqueue_proxy.h" 25 #include "micro_thread.h" 26 #include "mt_connection.h" 27 #include "mt_api.h" 28 #include "ff_api.h" 29 #include "mt_sys_hook.h" 30 31 namespace NS_MICRO_THREAD { 32 33 int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout) 34 { 35 int ret = 0; 36 int rc = 0; 37 int flags = 1; 38 struct sockaddr_in from_addr = {0}; 39 int addr_len = sizeof(from_addr); 40 41 if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf) 42 { 43 MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]", 44 dst, pkg, rcv_buf, len, buf_size); 45 return -10; 46 } 47 48 int sock = socket(PF_INET, SOCK_DGRAM, 0); 49 if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0)) 50 { 51 MT_ATTR_API(320842, 1); 52 MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno); 53 ret = -1; 54 goto EXIT_LABEL; 55 } 56 57 rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout); 58 if (rc < 0) 59 { 60 MT_ATTR_API(320844, 1); 61 MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno); 62 ret = -2; 63 goto EXIT_LABEL; 64 } 65 66 rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout); 67 if (rc < 0) 68 { 69 MT_ATTR_API(320845, 1); 70 MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno); 71 ret = -3; 72 goto EXIT_LABEL; 73 } 74 buf_size = rc; 75 76 EXIT_LABEL: 77 78 if (sock > 0) 79 { 80 close(sock); 81 sock = -1; 82 } 83 84 return ret; 85 } 86 87 int mt_tcp_create_sock(void) 88 { 89 int fd; 90 int flag; 91 92 fd = ::socket(AF_INET, SOCK_STREAM, 0); 93 if (fd < 0) 94 { 95 MTLOG_ERROR("create tcp socket failed, error: %m"); 96 return -1; 97 } 98 99 flag = fcntl(fd, F_GETFL, 0); 100 if (flag == -1) 101 { 102 ::close(fd); 103 MTLOG_ERROR("get fd flags failed, error: %m"); 104 return -2; 105 } 106 107 if (flag & O_NONBLOCK) 108 return fd; 109 110 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1) 111 { 112 ::close(fd); 113 MTLOG_ERROR("set fd flags failed, error: %m"); 114 return -3; 115 } 116 117 return fd; 118 } 119 120 static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock) 121 { 122 KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0); 123 if (NULL == ntfy_obj) 124 { 125 MTLOG_ERROR("get notify failed, logit"); 126 return NULL; 127 } 128 129 TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst)); 130 if (NULL == conn) 131 { 132 MTLOG_ERROR("get connection failed, dst[%p]", dst); 133 NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj); 134 return NULL; 135 } 136 conn->SetNtfyObj(ntfy_obj); 137 138 int osfd = conn->CreateSocket(); 139 if (osfd < 0) 140 { 141 ConnectionMgr::Instance()->FreeConnection(conn, true); 142 MTLOG_ERROR("create socket failed, ret[%d]", osfd); 143 return NULL; 144 } 145 146 sock = osfd; 147 return conn; 148 } 149 150 static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func) 151 { 152 int recv_len = 0; 153 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 154 do 155 { 156 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 157 if (cost_time > (utime64_t)timeout) 158 { 159 errno = ETIME; 160 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 161 return -3; 162 } 163 164 int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time)); 165 if (rc < 0) 166 { 167 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 168 return -3; 169 } 170 else if (rc == 0) 171 { 172 len = recv_len; 173 MTLOG_ERROR("tcp socket[%d] remote close", sock); 174 return -7; 175 } 176 recv_len += rc; 177 178 rc = func(rcv_buf, recv_len); 179 if (rc < 0) 180 { 181 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 182 return -5; 183 } 184 else if (rc == 0) 185 { 186 if (len == recv_len) 187 { 188 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock); 189 return -6; 190 } 191 continue; 192 } 193 else 194 { 195 if (rc > recv_len) 196 { 197 continue; 198 } 199 else 200 { 201 len = rc; 202 break; 203 } 204 } 205 } while (true); 206 207 return 0; 208 } 209 210 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 211 { 212 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 213 { 214 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 215 dst, pkg, rcv_buf, func, len, buf_size); 216 return -10; 217 } 218 219 int ret = 0, rc = 0; 220 int addr_len = sizeof(struct sockaddr_in); 221 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 222 utime64_t cost_time = 0; 223 int time_left = timeout; 224 225 int sock = -1; 226 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 227 if ((conn == NULL) || (sock < 0)) 228 { 229 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 230 ret = -1; 231 goto EXIT_LABEL; 232 } 233 234 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 235 if (rc < 0) 236 { 237 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 238 ret = -4; 239 goto EXIT_LABEL; 240 } 241 242 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 243 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 244 rc = MtFrame::send(sock, pkg, len, 0, time_left); 245 if (rc < 0) 246 { 247 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 248 ret = -2; 249 goto EXIT_LABEL; 250 } 251 252 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 253 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 254 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 255 if (rc < 0) 256 { 257 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 258 ret = rc; 259 goto EXIT_LABEL; 260 } 261 262 ret = 0; 263 264 EXIT_LABEL: 265 266 if (conn != NULL) 267 { 268 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 269 } 270 271 return ret; 272 } 273 274 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 275 { 276 int ret = 0, rc = 0; 277 int addr_len = sizeof(struct sockaddr_in); 278 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 279 utime64_t cost_time = 0; 280 int time_left = timeout; 281 282 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 283 { 284 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 285 dst, pkg, rcv_buf, func, len, buf_size); 286 return -10; 287 } 288 289 int sock; 290 sock = mt_tcp_create_sock(); 291 if (sock < 0) 292 { 293 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 294 return -1; 295 } 296 297 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 298 if (rc < 0) 299 { 300 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 301 ret = -4; 302 goto EXIT_LABEL; 303 } 304 305 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 306 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 307 rc = MtFrame::send(sock, pkg, len, 0, time_left); 308 if (rc < 0) 309 { 310 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 311 ret = -2; 312 goto EXIT_LABEL; 313 } 314 315 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 316 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 317 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 318 if (rc < 0) 319 { 320 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 321 ret = rc; 322 goto EXIT_LABEL; 323 } 324 325 ret = 0; 326 327 EXIT_LABEL: 328 if (sock >= 0) 329 ::close(sock); 330 331 return ret; 332 } 333 334 int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout) 335 { 336 if (!dst || !pkg || len<1) 337 { 338 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 339 return -10; 340 } 341 342 int ret = 0, rc = 0; 343 int addr_len = sizeof(struct sockaddr_in); 344 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 345 utime64_t cost_time = 0; 346 int time_left = timeout; 347 348 int sock = -1; 349 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 350 if ((conn == NULL) || (sock < 0)) 351 { 352 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 353 ret = -1; 354 goto EXIT_LABEL; 355 } 356 357 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 358 if (rc < 0) 359 { 360 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 361 ret = -4; 362 goto EXIT_LABEL; 363 } 364 365 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 366 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 367 rc = MtFrame::send(sock, pkg, len, 0, time_left); 368 if (rc < 0) 369 { 370 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 371 ret = -2; 372 goto EXIT_LABEL; 373 } 374 375 ret = 0; 376 377 EXIT_LABEL: 378 379 if (conn != NULL) 380 { 381 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 382 } 383 384 return ret; 385 } 386 387 int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout) 388 { 389 if (!dst || !pkg || len<1) 390 { 391 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 392 return -10; 393 } 394 395 int ret = 0, rc = 0; 396 int addr_len = sizeof(struct sockaddr_in); 397 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 398 utime64_t cost_time = 0; 399 int time_left = timeout; 400 401 int sock = -1; 402 sock = mt_tcp_create_sock(); 403 if (sock < 0) 404 { 405 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 406 ret = -1; 407 goto EXIT_LABEL; 408 } 409 410 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 411 if (rc < 0) 412 { 413 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 414 ret = -4; 415 goto EXIT_LABEL; 416 } 417 418 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 419 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 420 rc = MtFrame::send(sock, pkg, len, 0, time_left); 421 if (rc < 0) 422 { 423 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 424 ret = -2; 425 goto EXIT_LABEL; 426 } 427 428 ret = 0; 429 430 EXIT_LABEL: 431 432 if (sock >= 0) 433 ::close(sock); 434 435 return ret; 436 } 437 438 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type) 439 { 440 if(!dst || !pkg || len<1) 441 { 442 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 443 dst, pkg, rcv_buf, func, len, buf_size,type); 444 return -10; 445 } 446 447 switch (type) 448 { 449 case MT_TCP_LONG: 450 { 451 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 452 } 453 454 case MT_TCP_LONG_SNDONLY: 455 { 456 return mt_tcpsend(dst, pkg, len, timeout); 457 } 458 459 case MT_TCP_SHORT: 460 { 461 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 462 } 463 464 case MT_TCP_SHORT_SNDONLY: 465 { 466 return mt_tcpsend_short(dst, pkg, len, timeout); 467 } 468 469 default: 470 { 471 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 472 dst, pkg, rcv_buf, func, len, buf_size,type); 473 return -10; 474 } 475 } 476 477 return 0; 478 } 479 480 static void mt_task_process(void* arg) 481 { 482 int rc = 0; 483 IMtTask* task = (IMtTask*)arg; 484 if (!task) 485 { 486 MTLOG_ERROR("Invalid arg, error"); 487 return; 488 } 489 490 rc = task->Process(); 491 if (rc != 0) 492 { 493 MTLOG_DEBUG("task process failed(%d), log", rc); 494 } 495 496 task->SetResult(rc); 497 498 return; 499 }; 500 501 int mt_exec_all_task(IMtTaskList& req_list) 502 { 503 MtFrame* mtframe = MtFrame::Instance(); 504 MicroThread* thread = mtframe->GetActiveThread(); 505 IMtTask* task = NULL; 506 MicroThread* sub = NULL; 507 MicroThread* tmp = NULL; 508 int rc = -1; 509 510 MicroThread::SubThreadList list; 511 TAILQ_INIT(&list); 512 513 if (0 == req_list.size()) 514 { 515 MTLOG_DEBUG("no task for execult"); 516 return 0; 517 } 518 519 for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it) 520 { 521 task = *it; 522 sub = MtFrame::CreateThread(mt_task_process, task, false); 523 if (NULL == sub) 524 { 525 MTLOG_ERROR("create sub thread failed"); 526 goto EXIT_LABEL; 527 } 528 529 sub->SetType(MicroThread::SUB_THREAD); 530 TAILQ_INSERT_TAIL(&list, sub, _sub_entry); 531 } 532 533 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 534 { 535 TAILQ_REMOVE(&list, sub, _sub_entry); 536 thread->AddSubThread(sub); 537 mtframe->InsertRunable(sub); 538 } 539 540 thread->Wait(); 541 rc = 0; 542 543 EXIT_LABEL: 544 545 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 546 { 547 TAILQ_REMOVE(&list, sub, _sub_entry); 548 mtframe->FreeThread(sub); 549 } 550 551 return rc; 552 553 } 554 555 void mt_set_msg_private(void *data) 556 { 557 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 558 if (msg_thread != NULL) 559 msg_thread->SetPrivate(data); 560 } 561 562 void* mt_get_msg_private() 563 { 564 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 565 if (NULL == msg_thread) 566 { 567 return NULL; 568 } 569 570 return msg_thread->GetPrivate(); 571 } 572 573 bool mt_init_frame(int argc, char * const argv[]) 574 { 575 if (argc) { 576 ff_init(argc, argv); 577 ff_set_hook_flag(); 578 } 579 memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab)); 580 return MtFrame::Instance()->InitFrame(); 581 } 582 583 void mt_set_stack_size(unsigned int bytes) 584 { 585 ThreadPool::SetDefaultStackSize(bytes); 586 } 587 588 int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 589 { 590 return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout); 591 } 592 593 int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 594 { 595 return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout); 596 } 597 598 int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 599 { 600 return MtFrame::connect(fd, addr, addrlen, timeout); 601 } 602 603 int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 604 { 605 return MtFrame::accept(fd, addr, addrlen, timeout); 606 } 607 608 ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout) 609 { 610 return MtFrame::read(fd, buf, nbyte, timeout); 611 } 612 613 ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout) 614 { 615 return MtFrame::write(fd, buf, nbyte, timeout); 616 } 617 618 ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout) 619 { 620 return MtFrame::recv(fd, buf, len, flags, timeout); 621 } 622 623 ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 624 { 625 return MtFrame::send(fd, buf, nbyte, flags, timeout); 626 } 627 628 void mt_sleep(int ms) 629 { 630 MtFrame::sleep(ms); 631 } 632 633 unsigned long long mt_time_ms(void) 634 { 635 return MtFrame::Instance()->GetLastClock(); 636 } 637 638 int mt_wait_events(int fd, int events, int timeout) 639 { 640 return MtFrame::Instance()->WaitEvents(fd, events, timeout); 641 } 642 643 void* mt_start_thread(void* entry, void* args) 644 { 645 return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); 646 } 647 648 #define BUF_ALIGNMENT_SIZE 4096 649 #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) 650 #define BUF_DEFAULT_SIZE 4096 651 652 class ScopedBuf 653 { 654 public: 655 ScopedBuf(void*& buf_keeper, bool keep) 656 :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep) 657 {} 658 659 int Alloc(int len) 660 { 661 if(len<len_) 662 { 663 return -1; 664 } 665 666 if(len==0) 667 { 668 len = BUF_ALIGNMENT_SIZE; 669 } 670 if(len_==len) 671 { 672 return 0; 673 } 674 675 len_ = BUF_ALIGN_SIZE(len); 676 if(len_==0) 677 { 678 len_ = BUF_DEFAULT_SIZE; 679 } 680 len_watermark_ = len_-BUF_ALIGNMENT_SIZE; 681 char* tmp = (char*)realloc(buf_, len_); 682 if(tmp==NULL) 683 { 684 return -2; 685 } 686 687 buf_ = tmp; 688 return 0; 689 } 690 691 void reset() 692 { 693 if(keep_) 694 { 695 buf_keeper_ = (void*)buf_; 696 buf_ = NULL; 697 } 698 } 699 700 ~ScopedBuf() 701 { 702 if(buf_!=NULL) 703 { 704 free(buf_); 705 buf_ = NULL; 706 } 707 } 708 709 public: 710 void* &buf_keeper_; 711 char* buf_; 712 int len_; 713 int len_watermark_; 714 bool keep_; 715 716 }; 717 718 static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags, 719 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 720 { 721 int recv_len = 0; 722 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 723 724 int rc = 0; 725 int ret = 0; 726 int pkg_len = 0; 727 bool msg_len_detected = false; 728 729 ScopedBuf sbuf(rcv_buf, keep_rcv_buf); 730 ret = sbuf.Alloc(len); 731 732 if(ret!=0) 733 { 734 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 735 return -11; 736 } 737 738 do 739 { 740 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 741 if (cost_time > (utime64_t)timeout) 742 { 743 errno = ETIME; 744 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 745 return -3; 746 } 747 748 rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time)); 749 if (rc < 0) 750 { 751 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 752 return -3; 753 } 754 else if (rc == 0) 755 { 756 757 if(recv_len==0) 758 { 759 MTLOG_ERROR("tcp socket[%d] remote close", sock); 760 return -7; 761 } 762 763 rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected); 764 765 if(rc!=recv_len) 766 { 767 MTLOG_ERROR("tcp socket[%d] remote close", sock); 768 return -7; 769 } 770 len = recv_len; 771 break; 772 } 773 recv_len += rc; 774 775 if((!msg_len_detected)||recv_len==pkg_len) 776 { 777 rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected); 778 if(msg_len_detected) 779 { 780 pkg_len = rc; 781 } 782 } 783 else 784 { 785 rc = pkg_len; 786 } 787 788 if (rc < 0) 789 { 790 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 791 return -5; 792 } 793 else if (rc == 0) 794 { 795 if(sbuf.len_ > recv_len) 796 { 797 continue; 798 } 799 800 ret = sbuf.Alloc(sbuf.len_<<1); 801 802 if(ret!=0) 803 { 804 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 805 return -11; 806 } 807 } 808 else 809 { 810 if (rc > recv_len) 811 { 812 if(sbuf.len_ > recv_len) 813 { 814 continue; 815 } 816 817 ret = sbuf.Alloc(rc); 818 819 if(ret!=0) 820 { 821 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 822 return -11; 823 } 824 } 825 else if(rc==recv_len) 826 { 827 len = rc; 828 break; 829 } 830 else 831 { 832 MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock); 833 return -5; 834 } 835 } 836 } while (true); 837 838 sbuf.reset(); 839 840 return 0; 841 } 842 843 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 844 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 845 { 846 if(!dst || !pkg || len<1) 847 { 848 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 849 dst, pkg, len, check_func); 850 return -10; 851 } 852 853 854 int ret = 0, rc = 0; 855 int addr_len = sizeof(struct sockaddr_in); 856 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 857 utime64_t cost_time = 0; 858 int time_left = timeout; 859 860 int sock = -1; 861 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 862 if ((conn == NULL) || (sock < 0)) 863 { 864 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 865 ret = -1; 866 goto EXIT_LABEL; 867 } 868 869 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 870 if (rc < 0) 871 { 872 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 873 ret = -4; 874 goto EXIT_LABEL; 875 } 876 877 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 878 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 879 rc = MtFrame::send(sock, pkg, len, 0, time_left); 880 if (rc < 0) 881 { 882 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 883 ret = -2; 884 goto EXIT_LABEL; 885 } 886 887 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 888 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 889 890 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 891 if (rc < 0) 892 { 893 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 894 ret = rc; 895 goto EXIT_LABEL; 896 } 897 898 ret = 0; 899 900 EXIT_LABEL: 901 if (conn != NULL) 902 { 903 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 904 } 905 906 return ret; 907 } 908 909 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 910 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 911 { 912 int ret = 0, rc = 0; 913 int addr_len = sizeof(struct sockaddr_in); 914 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 915 utime64_t cost_time = 0; 916 int time_left = timeout; 917 918 if(!dst || !pkg || len<1) 919 { 920 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 921 dst, pkg, len, check_func); 922 return -10; 923 } 924 925 int sock; 926 sock = mt_tcp_create_sock(); 927 if (sock < 0) 928 { 929 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 930 return -1; 931 } 932 933 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 934 if (rc < 0) 935 { 936 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 937 ret = -4; 938 goto EXIT_LABEL; 939 } 940 941 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 942 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 943 rc = MtFrame::send(sock, pkg, len, 0, time_left); 944 if (rc < 0) 945 { 946 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 947 ret = -2; 948 goto EXIT_LABEL; 949 } 950 951 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 952 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 953 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 954 955 if (rc < 0) 956 { 957 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 958 ret = rc; 959 goto EXIT_LABEL; 960 } 961 962 ret = 0; 963 964 EXIT_LABEL: 965 if (sock >= 0) 966 ::close(sock); 967 968 return ret; 969 } 970 971 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size, 972 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, 973 MT_TCP_CONN_TYPE type, bool keep_rcv_buf) 974 { 975 if(!dst || !pkg || len<1) 976 { 977 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 978 dst, pkg, len, check_func, msg_ctx, type); 979 return -10; 980 } 981 982 switch (type) 983 { 984 case MT_TCP_LONG: 985 { 986 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 987 } 988 989 case MT_TCP_LONG_SNDONLY: 990 { 991 return mt_tcpsend(dst, pkg, len, timeout); 992 } 993 994 case MT_TCP_SHORT: 995 { 996 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 997 } 998 999 case MT_TCP_SHORT_SNDONLY: 1000 { 1001 return mt_tcpsend_short(dst, pkg, len, timeout); 1002 } 1003 1004 default: 1005 { 1006 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1007 dst, pkg, len, check_func, msg_ctx, type); 1008 return -10; 1009 } 1010 } 1011 1012 return 0; 1013 } 1014 1015 } 1016