1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename mt_sys_call.cpp 22 */ 23 24 #include "kqueue_proxy.h" 25 #include "micro_thread.h" 26 #include "mt_connection.h" 27 #include "mt_api.h" 28 #include "ff_api.h" 29 #include "mt_sys_hook.h" 30 31 namespace NS_MICRO_THREAD { 32 33 int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout) 34 { 35 int ret = 0; 36 int rc = 0; 37 int flags = 1; 38 struct sockaddr_in from_addr = {0}; 39 int addr_len = sizeof(from_addr); 40 41 if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf) 42 { 43 MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]", 44 dst, pkg, rcv_buf, len, buf_size); 45 return -10; 46 } 47 48 int sock = socket(PF_INET, SOCK_DGRAM, 0); 49 if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0)) 50 { 51 MT_ATTR_API(320842, 1); 52 MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno); 53 ret = -1; 54 goto EXIT_LABEL; 55 } 56 57 rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout); 58 if (rc < 0) 59 { 60 MT_ATTR_API(320844, 1); 61 MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno); 62 ret = -2; 63 goto EXIT_LABEL; 64 } 65 66 rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout); 67 if (rc < 0) 68 { 69 MT_ATTR_API(320845, 1); 70 MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno); 71 ret = -3; 72 goto EXIT_LABEL; 73 } 74 buf_size = rc; 75 76 EXIT_LABEL: 77 78 if (sock > 0) 79 { 80 close(sock); 81 sock = -1; 82 } 83 84 return ret; 85 } 86 87 int mt_tcp_create_sock(void) 88 { 89 int fd; 90 int flag; 91 92 fd = ::socket(AF_INET, SOCK_STREAM, 0); 93 if (fd < 0) 94 { 95 MTLOG_ERROR("create tcp socket failed, error: %m"); 96 return -1; 97 } 98 99 flag = fcntl(fd, F_GETFL, 0); 100 if (flag == -1) 101 { 102 ::close(fd); 103 MTLOG_ERROR("get fd flags failed, error: %m"); 104 return -2; 105 } 106 107 if (flag & O_NONBLOCK) 108 return fd; 109 110 if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1) 111 { 112 ::close(fd); 113 MTLOG_ERROR("set fd flags failed, error: %m"); 114 return -3; 115 } 116 117 return fd; 118 } 119 120 static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock) 121 { 122 KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0); 123 if (NULL == ntfy_obj) 124 { 125 MTLOG_ERROR("get notify failed, logit"); 126 return NULL; 127 } 128 129 TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst)); 130 if (NULL == conn) 131 { 132 MTLOG_ERROR("get connection failed, dst[%p]", dst); 133 NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj); 134 return NULL; 135 } 136 conn->SetNtfyObj(ntfy_obj); 137 138 int osfd = conn->CreateSocket(); 139 if (osfd < 0) 140 { 141 ConnectionMgr::Instance()->FreeConnection(conn, true); 142 MTLOG_ERROR("create socket failed, ret[%d]", osfd); 143 return NULL; 144 } 145 146 sock = osfd; 147 return conn; 148 } 149 150 static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func) 151 { 152 int recv_len = 0; 153 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 154 do 155 { 156 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 157 if (cost_time > (utime64_t)timeout) 158 { 159 errno = ETIME; 160 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 161 return -3; 162 } 163 164 int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time)); 165 if (rc < 0) 166 { 167 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 168 return -3; 169 } 170 else if (rc == 0) 171 { 172 len = recv_len; 173 MTLOG_ERROR("tcp socket[%d] remote close", sock); 174 return -7; 175 } 176 recv_len += rc; 177 178 rc = func(rcv_buf, recv_len); 179 if (rc < 0) 180 { 181 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 182 return -5; 183 } 184 else if (rc == 0) 185 { 186 if (len == recv_len) 187 { 188 MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock); 189 return -6; 190 } 191 continue; 192 } 193 else 194 { 195 if (rc > recv_len) 196 { 197 continue; 198 } 199 else 200 { 201 len = rc; 202 break; 203 } 204 } 205 } while (true); 206 207 return 0; 208 } 209 210 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 211 { 212 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 213 { 214 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 215 dst, pkg, rcv_buf, func, len, buf_size); 216 return -10; 217 } 218 219 int ret = 0, rc = 0; 220 int addr_len = sizeof(struct sockaddr_in); 221 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 222 utime64_t cost_time = 0; 223 int time_left = timeout; 224 225 int sock = -1; 226 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 227 if ((conn == NULL) || (sock < 0)) 228 { 229 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 230 ret = -1; 231 goto EXIT_LABEL; 232 } 233 234 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 235 if (rc < 0) 236 { 237 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 238 ret = -4; 239 goto EXIT_LABEL; 240 } 241 242 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 243 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 244 rc = MtFrame::send(sock, pkg, len, 0, time_left); 245 if (rc < 0) 246 { 247 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 248 ret = -2; 249 goto EXIT_LABEL; 250 } 251 252 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 253 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 254 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 255 if (rc < 0) 256 { 257 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 258 ret = rc; 259 goto EXIT_LABEL; 260 } 261 262 ret = 0; 263 264 EXIT_LABEL: 265 266 if (conn != NULL) 267 { 268 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 269 } 270 271 return ret; 272 } 273 274 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 275 { 276 int ret = 0, rc = 0; 277 int addr_len = sizeof(struct sockaddr_in); 278 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 279 utime64_t cost_time = 0; 280 int time_left = timeout; 281 282 if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 283 { 284 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 285 dst, pkg, rcv_buf, func, len, buf_size); 286 return -10; 287 } 288 289 int sock; 290 sock = mt_tcp_create_sock(); 291 if (sock < 0) 292 { 293 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 294 return -1; 295 } 296 297 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 298 if (rc < 0) 299 { 300 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 301 ret = -4; 302 goto EXIT_LABEL; 303 } 304 305 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 306 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 307 rc = MtFrame::send(sock, pkg, len, 0, time_left); 308 if (rc < 0) 309 { 310 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 311 ret = -2; 312 goto EXIT_LABEL; 313 } 314 315 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 316 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 317 rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 318 if (rc < 0) 319 { 320 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 321 ret = rc; 322 goto EXIT_LABEL; 323 } 324 325 ret = 0; 326 327 EXIT_LABEL: 328 if (sock >= 0) 329 ::close(sock); 330 331 return ret; 332 } 333 334 int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout) 335 { 336 if (!dst || !pkg || len<1) 337 { 338 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 339 return -10; 340 } 341 342 int ret = 0, rc = 0; 343 int addr_len = sizeof(struct sockaddr_in); 344 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 345 utime64_t cost_time = 0; 346 int time_left = timeout; 347 348 int sock = -1; 349 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 350 if ((conn == NULL) || (sock < 0)) 351 { 352 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 353 ret = -1; 354 goto EXIT_LABEL; 355 } 356 357 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 358 if (rc < 0) 359 { 360 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 361 ret = -4; 362 goto EXIT_LABEL; 363 } 364 365 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 366 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 367 rc = MtFrame::send(sock, pkg, len, 0, time_left); 368 if (rc < 0) 369 { 370 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 371 ret = -2; 372 goto EXIT_LABEL; 373 } 374 375 ret = 0; 376 377 EXIT_LABEL: 378 379 if (conn != NULL) 380 { 381 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 382 } 383 384 return ret; 385 } 386 387 int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout) 388 { 389 if (!dst || !pkg || len<1) 390 { 391 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 392 return -10; 393 } 394 395 int ret = 0, rc = 0; 396 int addr_len = sizeof(struct sockaddr_in); 397 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 398 utime64_t cost_time = 0; 399 int time_left = timeout; 400 401 int sock = -1; 402 sock = mt_tcp_create_sock(); 403 if (sock < 0) 404 { 405 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 406 ret = -1; 407 goto EXIT_LABEL; 408 } 409 410 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 411 if (rc < 0) 412 { 413 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 414 ret = -4; 415 goto EXIT_LABEL; 416 } 417 418 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 419 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 420 rc = MtFrame::send(sock, pkg, len, 0, time_left); 421 if (rc < 0) 422 { 423 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 424 ret = -2; 425 goto EXIT_LABEL; 426 } 427 428 ret = 0; 429 430 EXIT_LABEL: 431 432 if (sock >= 0) 433 ::close(sock); 434 435 return ret; 436 } 437 438 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type) 439 { 440 if(!dst || !pkg || len<1) 441 { 442 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 443 dst, pkg, rcv_buf, func, len, buf_size,type); 444 return -10; 445 } 446 447 switch (type) 448 { 449 case MT_TCP_LONG: 450 { 451 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 452 } 453 454 case MT_TCP_LONG_SNDONLY: 455 { 456 return mt_tcpsend(dst, pkg, len, timeout); 457 } 458 459 case MT_TCP_SHORT: 460 { 461 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 462 } 463 464 case MT_TCP_SHORT_SNDONLY: 465 { 466 return mt_tcpsend_short(dst, pkg, len, timeout); 467 } 468 469 default: 470 { 471 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 472 dst, pkg, rcv_buf, func, len, buf_size,type); 473 return -10; 474 } 475 } 476 477 return 0; 478 } 479 480 static void mt_task_process(void* arg) 481 { 482 int rc = 0; 483 IMtTask* task = (IMtTask*)arg; 484 if (!task) 485 { 486 MTLOG_ERROR("Invalid arg, error"); 487 return; 488 } 489 490 rc = task->Process(); 491 if (rc != 0) 492 { 493 MTLOG_DEBUG("task process failed(%d), log", rc); 494 } 495 496 task->SetResult(rc); 497 498 return; 499 }; 500 501 int mt_exec_all_task(IMtTaskList& req_list) 502 { 503 MtFrame* mtframe = MtFrame::Instance(); 504 MicroThread* thread = mtframe->GetActiveThread(); 505 IMtTask* task = NULL; 506 MicroThread* sub = NULL; 507 MicroThread* tmp = NULL; 508 int rc = -1; 509 510 MicroThread::SubThreadList list; 511 TAILQ_INIT(&list); 512 513 if (0 == req_list.size()) 514 { 515 MTLOG_DEBUG("no task for execult"); 516 return 0; 517 } 518 519 for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it) 520 { 521 task = *it; 522 sub = MtFrame::CreateThread(mt_task_process, task, false); 523 if (NULL == sub) 524 { 525 MTLOG_ERROR("create sub thread failed"); 526 goto EXIT_LABEL; 527 } 528 529 sub->SetType(MicroThread::SUB_THREAD); 530 TAILQ_INSERT_TAIL(&list, sub, _sub_entry); 531 } 532 533 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 534 { 535 TAILQ_REMOVE(&list, sub, _sub_entry); 536 thread->AddSubThread(sub); 537 mtframe->InsertRunable(sub); 538 } 539 540 thread->Wait(); 541 rc = 0; 542 543 EXIT_LABEL: 544 545 TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 546 { 547 TAILQ_REMOVE(&list, sub, _sub_entry); 548 mtframe->FreeThread(sub); 549 } 550 551 return rc; 552 553 } 554 555 void mt_set_msg_private(void *data) 556 { 557 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 558 if (msg_thread != NULL) 559 msg_thread->SetPrivate(data); 560 } 561 562 void* mt_get_msg_private() 563 { 564 MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 565 if (NULL == msg_thread) 566 { 567 return NULL; 568 } 569 570 return msg_thread->GetPrivate(); 571 } 572 573 bool mt_init_frame(int argc, char * const argv[]) 574 { 575 if (argc) { 576 ff_init(argc, argv); 577 ff_set_hook_flag(); 578 } 579 memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab)); 580 return MtFrame::Instance()->InitFrame(); 581 } 582 583 void mt_set_stack_size(unsigned int bytes) 584 { 585 ThreadPool::SetDefaultStackSize(bytes); 586 } 587 588 int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 589 { 590 return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout); 591 } 592 593 int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 594 { 595 return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout); 596 } 597 598 int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 599 { 600 return MtFrame::connect(fd, addr, addrlen, timeout); 601 } 602 603 int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 604 { 605 return MtFrame::accept(fd, addr, addrlen, timeout); 606 } 607 608 ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout) 609 { 610 return MtFrame::read(fd, buf, nbyte, timeout); 611 } 612 613 ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout) 614 { 615 return MtFrame::write(fd, buf, nbyte, timeout); 616 } 617 618 ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout) 619 { 620 return MtFrame::recv(fd, buf, len, flags, timeout); 621 } 622 623 ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 624 { 625 return MtFrame::send(fd, buf, nbyte, flags, timeout); 626 } 627 628 void mt_sleep(int ms) 629 { 630 MtFrame::sleep(ms); 631 } 632 633 unsigned long long mt_time_ms(void) 634 { 635 return MtFrame::Instance()->GetLastClock(); 636 } 637 638 int mt_wait_events(int fd, int events, int timeout) 639 { 640 return MtFrame::Instance()->WaitEvents(fd, events, timeout); 641 } 642 643 void* mt_start_thread(void* entry, void* args) 644 { 645 return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); 646 } 647 648 void* mt_active_thread() 649 { 650 return MtFrame::Instance()->GetActiveThread(); 651 } 652 653 void mt_thread_wait(int ms) 654 { 655 MtFrame::Instance()->WaitNotify(ms); 656 } 657 658 void mt_thread_wakeup_wait(void * thread_p) 659 { 660 MtFrame::Instance()->NotifyThread((MicroThread *) thread_p); 661 } 662 663 void mt_swap_thread() 664 { 665 return MtFrame::Instance()->SwapDaemonThread(); 666 } 667 668 #define BUF_ALIGNMENT_SIZE 4096 669 #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) 670 #define BUF_DEFAULT_SIZE 4096 671 672 class ScopedBuf 673 { 674 public: 675 ScopedBuf(void*& buf_keeper, bool keep) 676 :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep) 677 {} 678 679 int Alloc(int len) 680 { 681 if(len<len_) 682 { 683 return -1; 684 } 685 686 if(len==0) 687 { 688 len = BUF_ALIGNMENT_SIZE; 689 } 690 if(len_==len) 691 { 692 return 0; 693 } 694 695 len_ = BUF_ALIGN_SIZE(len); 696 if(len_==0) 697 { 698 len_ = BUF_DEFAULT_SIZE; 699 } 700 len_watermark_ = len_-BUF_ALIGNMENT_SIZE; 701 char* tmp = (char*)realloc(buf_, len_); 702 if(tmp==NULL) 703 { 704 return -2; 705 } 706 707 buf_ = tmp; 708 return 0; 709 } 710 711 void reset() 712 { 713 if(keep_) 714 { 715 buf_keeper_ = (void*)buf_; 716 buf_ = NULL; 717 } 718 } 719 720 ~ScopedBuf() 721 { 722 if(buf_!=NULL) 723 { 724 free(buf_); 725 buf_ = NULL; 726 } 727 } 728 729 public: 730 void* &buf_keeper_; 731 char* buf_; 732 int len_; 733 int len_watermark_; 734 bool keep_; 735 736 }; 737 738 static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags, 739 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 740 { 741 int recv_len = 0; 742 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 743 744 int rc = 0; 745 int ret = 0; 746 int pkg_len = 0; 747 bool msg_len_detected = false; 748 749 ScopedBuf sbuf(rcv_buf, keep_rcv_buf); 750 ret = sbuf.Alloc(len); 751 752 if(ret!=0) 753 { 754 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 755 return -11; 756 } 757 758 do 759 { 760 utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 761 if (cost_time > (utime64_t)timeout) 762 { 763 errno = ETIME; 764 MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 765 return -3; 766 } 767 768 rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time)); 769 if (rc < 0) 770 { 771 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 772 return -3; 773 } 774 else if (rc == 0) 775 { 776 777 if(recv_len==0) 778 { 779 MTLOG_ERROR("tcp socket[%d] remote close", sock); 780 return -7; 781 } 782 783 rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected); 784 785 if(rc!=recv_len) 786 { 787 MTLOG_ERROR("tcp socket[%d] remote close", sock); 788 return -7; 789 } 790 len = recv_len; 791 break; 792 } 793 recv_len += rc; 794 795 if((!msg_len_detected)||recv_len==pkg_len) 796 { 797 rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected); 798 if(msg_len_detected) 799 { 800 pkg_len = rc; 801 } 802 } 803 else 804 { 805 rc = pkg_len; 806 } 807 808 if (rc < 0) 809 { 810 MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 811 return -5; 812 } 813 else if (rc == 0) 814 { 815 if(sbuf.len_ > recv_len) 816 { 817 continue; 818 } 819 820 ret = sbuf.Alloc(sbuf.len_<<1); 821 822 if(ret!=0) 823 { 824 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 825 return -11; 826 } 827 } 828 else 829 { 830 if (rc > recv_len) 831 { 832 if(sbuf.len_ > recv_len) 833 { 834 continue; 835 } 836 837 ret = sbuf.Alloc(rc); 838 839 if(ret!=0) 840 { 841 MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 842 return -11; 843 } 844 } 845 else if(rc==recv_len) 846 { 847 len = rc; 848 break; 849 } 850 else 851 { 852 MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock); 853 return -5; 854 } 855 } 856 } while (true); 857 858 sbuf.reset(); 859 860 return 0; 861 } 862 863 int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 864 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 865 { 866 if(!dst || !pkg || len<1) 867 { 868 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 869 dst, pkg, len, check_func); 870 return -10; 871 } 872 873 874 int ret = 0, rc = 0; 875 int addr_len = sizeof(struct sockaddr_in); 876 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 877 utime64_t cost_time = 0; 878 int time_left = timeout; 879 880 int sock = -1; 881 TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 882 if ((conn == NULL) || (sock < 0)) 883 { 884 MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 885 ret = -1; 886 goto EXIT_LABEL; 887 } 888 889 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 890 if (rc < 0) 891 { 892 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 893 ret = -4; 894 goto EXIT_LABEL; 895 } 896 897 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 898 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 899 rc = MtFrame::send(sock, pkg, len, 0, time_left); 900 if (rc < 0) 901 { 902 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 903 ret = -2; 904 goto EXIT_LABEL; 905 } 906 907 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 908 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 909 910 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 911 if (rc < 0) 912 { 913 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 914 ret = rc; 915 goto EXIT_LABEL; 916 } 917 918 ret = 0; 919 920 EXIT_LABEL: 921 if (conn != NULL) 922 { 923 ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 924 } 925 926 return ret; 927 } 928 929 int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 930 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 931 { 932 int ret = 0, rc = 0; 933 int addr_len = sizeof(struct sockaddr_in); 934 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 935 utime64_t cost_time = 0; 936 int time_left = timeout; 937 938 if(!dst || !pkg || len<1) 939 { 940 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 941 dst, pkg, len, check_func); 942 return -10; 943 } 944 945 int sock; 946 sock = mt_tcp_create_sock(); 947 if (sock < 0) 948 { 949 MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 950 return -1; 951 } 952 953 rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 954 if (rc < 0) 955 { 956 MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 957 ret = -4; 958 goto EXIT_LABEL; 959 } 960 961 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 962 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 963 rc = MtFrame::send(sock, pkg, len, 0, time_left); 964 if (rc < 0) 965 { 966 MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 967 ret = -2; 968 goto EXIT_LABEL; 969 } 970 971 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 972 time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 973 rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 974 975 if (rc < 0) 976 { 977 MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 978 ret = rc; 979 goto EXIT_LABEL; 980 } 981 982 ret = 0; 983 984 EXIT_LABEL: 985 if (sock >= 0) 986 ::close(sock); 987 988 return ret; 989 } 990 991 int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size, 992 int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, 993 MT_TCP_CONN_TYPE type, bool keep_rcv_buf) 994 { 995 if(!dst || !pkg || len<1) 996 { 997 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 998 dst, pkg, len, check_func, msg_ctx, type); 999 return -10; 1000 } 1001 1002 switch (type) 1003 { 1004 case MT_TCP_LONG: 1005 { 1006 return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1007 } 1008 1009 case MT_TCP_LONG_SNDONLY: 1010 { 1011 return mt_tcpsend(dst, pkg, len, timeout); 1012 } 1013 1014 case MT_TCP_SHORT: 1015 { 1016 return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1017 } 1018 1019 case MT_TCP_SHORT_SNDONLY: 1020 { 1021 return mt_tcpsend_short(dst, pkg, len, timeout); 1022 } 1023 1024 default: 1025 { 1026 MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1027 dst, pkg, len, check_func, msg_ctx, type); 1028 return -10; 1029 } 1030 } 1031 1032 return 0; 1033 } 1034 1035 } 1036