1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_mbuf_pool.cpp 22 * @time 20130924 23 **/ 24 25 #include <errno.h> 26 #include <netinet/tcp.h> 27 #include "micro_thread.h" 28 #include "mt_sys_hook.h" 29 #include "ff_hook.h" 30 #include "mt_net.h" 31 32 33 using namespace std; 34 using namespace NS_MICRO_THREAD; 35 36 CNetHelper::CNetHelper() 37 { 38 handler = (void*)CNetMgr::Instance()->AllocNetItem(); 39 } 40 41 CNetHelper::~CNetHelper() 42 { 43 CNetHandler* net_handler = (CNetHandler*)handler; 44 if (handler != NULL) 45 { 46 net_handler->Reset(); 47 CNetMgr::Instance()->FreeNetItem(net_handler); 48 handler = NULL; 49 } 50 } 51 52 int32_t CNetHelper::SendRecv(void* data, uint32_t len, uint32_t timeout) 53 { 54 if (handler != NULL) { 55 CNetHandler* net_handler = (CNetHandler*)handler; 56 return net_handler->SendRecv(data, len, timeout); 57 } else { 58 return RC_INVALID_HANDLER; 59 } 60 } 61 62 void* CNetHelper::GetRspBuff() 63 { 64 if (handler != NULL) { 65 CNetHandler* net_handler = (CNetHandler*)handler; 66 return net_handler->GetRspBuff(); 67 } else { 68 return NULL; 69 } 70 } 71 72 uint32_t CNetHelper::GetRspLen() 73 { 74 if (handler != NULL) { 75 CNetHandler* net_handler = (CNetHandler*)handler; 76 return net_handler->GetRspLen(); 77 } else { 78 return 0; 79 } 80 } 81 82 char* CNetHelper::GetErrMsg(int32_t result) 83 { 84 static const char* errmsg = "unknown error type"; 85 86 switch (result) 87 { 88 case RC_SUCCESS: 89 errmsg = "success"; 90 break; 91 92 case RC_ERR_SOCKET: 93 errmsg = "create socket failed"; 94 break; 95 96 case RC_SEND_FAIL: 97 errmsg = "send pakeage timeout or failed"; 98 break; 99 100 case RC_RECV_FAIL: 101 errmsg = "recv response timeout or failed"; 102 break; 103 104 case RC_CONNECT_FAIL: 105 errmsg = "connect timeout or failed"; 106 break; 107 108 case RC_CHECK_PKG_FAIL: 109 errmsg = "user package check failed"; 110 break; 111 112 case RC_NO_MORE_BUFF: 113 errmsg = "user response buffer too small"; 114 break; 115 116 case RC_REMOTE_CLOSED: 117 errmsg = "remote close connection"; 118 break; 119 120 case RC_INVALID_PARAM: 121 errmsg = "params invalid"; 122 break; 123 124 case RC_INVALID_HANDLER: 125 errmsg = "net handler invalid"; 126 break; 127 128 case RC_MEM_ERROR: 129 errmsg = "no more memory, alloc failed"; 130 break; 131 132 case RC_CONFLICT_SID: 133 errmsg = "session id with the dest address conflict"; 134 break; 135 136 case RC_KQUEUE_ERROR: 137 errmsg = "epoll system error"; 138 break; 139 140 default: 141 break; 142 } 143 144 return (char*)errmsg; 145 } 146 147 void CNetHelper::SetProtoType(MT_PROTO_TYPE type) 148 { 149 if (handler != NULL) { 150 CNetHandler* net_handler = (CNetHandler*)handler; 151 return net_handler->SetProtoType(type); 152 } 153 } 154 155 void CNetHelper::SetDestAddress(struct sockaddr_in* dst) 156 { 157 if (handler != NULL) { 158 CNetHandler* net_handler = (CNetHandler*)handler; 159 return net_handler->SetDestAddress(dst); 160 } 161 } 162 163 void CNetHelper::SetSessionId(uint64_t sid) 164 { 165 if (handler != NULL) { 166 CNetHandler* net_handler = (CNetHandler*)handler; 167 return net_handler->SetSessionId(sid); 168 } 169 } 170 171 void CNetHelper::SetSessionCallback(CHECK_SESSION_CALLBACK function) 172 { 173 if (handler != NULL) { 174 CNetHandler* net_handler = (CNetHandler*)handler; 175 return net_handler->SetSessionCallback(function); 176 } 177 } 178 179 void CNetHandler::Reset() 180 { 181 this->Unlink(); 182 this->UnRegistSession(); 183 184 if (_rsp_buff != NULL) { 185 delete_sk_buffer(_rsp_buff); 186 _rsp_buff = NULL; 187 } 188 189 _thread = NULL; 190 _proto_type = NET_PROTO_TCP; 191 _conn_type = TYPE_CONN_SESSION; 192 _dest_ipv4.sin_addr.s_addr = 0; 193 _dest_ipv4.sin_port = 0; 194 _session_id = 0; 195 _callback = NULL; 196 _err_no = 0; 197 _state_flags = 0; 198 _conn_ptr = NULL; 199 _send_pos = 0; 200 _req_len = 0; 201 _req_data = NULL; 202 203 } 204 205 CNetHandler::CNetHandler() 206 { 207 _state_flags = 0; 208 _rsp_buff = NULL; 209 210 this->Reset(); 211 } 212 213 CNetHandler::~CNetHandler() 214 { 215 this->Reset(); 216 } 217 218 int32_t CNetHandler::CheckParams() 219 { 220 if ((NULL == _req_data) || (_req_len == 0)) 221 { 222 MTLOG_ERROR("param invalid, data[%p], len[%u]", _req_data, _req_len); 223 return RC_INVALID_PARAM; 224 } 225 226 if ((_dest_ipv4.sin_addr.s_addr == 0) || (_dest_ipv4.sin_port == 0)) 227 { 228 MTLOG_ERROR("param invalid, ip[%u], port[%u]", _dest_ipv4.sin_addr.s_addr, 229 _dest_ipv4.sin_port); 230 return RC_INVALID_PARAM; 231 } 232 233 if (_conn_type == TYPE_CONN_SESSION) 234 { 235 if ((_callback == NULL) || (_session_id == 0)) 236 { 237 MTLOG_ERROR("param invalid, callback[%p], session_id[%llu]", _callback, _session_id); 238 return RC_INVALID_PARAM; 239 } 240 241 if (!this->RegistSession()) 242 { 243 MTLOG_ERROR("param invalid, session_id[%llu] regist failed", _session_id); 244 return RC_CONFLICT_SID; 245 } 246 } 247 248 return 0; 249 } 250 251 int32_t CNetHandler::GetConnLink() 252 { 253 CDestLinks key; 254 key.SetKeyInfo(_dest_ipv4.sin_addr.s_addr, _dest_ipv4.sin_port, _proto_type, _conn_type); 255 256 CDestLinks* dest_link = CNetMgr::Instance()->FindCreateDest(&key); 257 if (NULL == dest_link) 258 { 259 MTLOG_ERROR("get dest link handle failed"); 260 return RC_MEM_ERROR; 261 } 262 263 CSockLink* sock_link = dest_link->GetSockLink(); 264 if (NULL == sock_link) 265 { 266 MTLOG_ERROR("get sock link handle failed"); 267 return RC_MEM_ERROR; 268 } 269 270 this->Link(sock_link); 271 272 return 0; 273 } 274 275 int32_t CNetHandler::WaitConnect(uint64_t timeout) 276 { 277 CSockLink* conn = (CSockLink*)this->_conn_ptr; 278 if (NULL == conn) 279 { 280 MTLOG_ERROR("get sock link handle failed"); 281 return RC_MEM_ERROR; 282 } 283 284 int32_t fd = conn->CreateSock(); 285 if (fd < 0) 286 { 287 MTLOG_ERROR("create sock failed, ret %d[%m]", fd); 288 return RC_ERR_SOCKET; 289 } 290 291 if (conn->Connect()) 292 { 293 MTLOG_DEBUG("sock conncet ok"); 294 return RC_SUCCESS; 295 } 296 297 this->SwitchToConn(); 298 299 MtFrame* mtframe = MtFrame::Instance(); 300 mtframe->WaitNotify(timeout); 301 302 this->SwitchToIdle(); 303 304 if (_err_no != 0) 305 { 306 MTLOG_ERROR("connect get out errno %d", _err_no); 307 return _err_no; 308 } 309 310 if (conn->Connected()) 311 { 312 MTLOG_DEBUG("connect ok"); 313 return 0; 314 } 315 else 316 { 317 MTLOG_TRACE("connect not ok, maybe timeout"); 318 return RC_CONNECT_FAIL; 319 } 320 } 321 322 int32_t CNetHandler::WaitSend(uint64_t timeout) 323 { 324 CSockLink* conn = (CSockLink*)this->_conn_ptr; 325 if (NULL == conn) 326 { 327 MTLOG_ERROR("get sock link handle failed"); 328 return RC_MEM_ERROR; 329 } 330 331 int32_t ret = conn->SendData(_req_data, _req_len); 332 if (ret < 0) 333 { 334 MTLOG_ERROR("sock send failed, ret %d[%m]", ret); 335 return RC_SEND_FAIL; 336 } 337 this->SkipSendPos(ret); 338 339 if (_req_len == 0) 340 { 341 MTLOG_DEBUG("sock send ok"); 342 return RC_SUCCESS; 343 } 344 345 this->SwitchToSend(); 346 347 MtFrame* mtframe = MtFrame::Instance(); 348 mtframe->WaitNotify(timeout); 349 350 this->SwitchToIdle(); 351 352 if (_err_no != 0) 353 { 354 MTLOG_ERROR("send get out errno %d", _err_no); 355 return _err_no; 356 } 357 358 if (_req_len == 0) 359 { 360 MTLOG_DEBUG("send req ok, len %u", _send_pos); 361 return 0; 362 } 363 else 364 { 365 MTLOG_TRACE("send req not ok, left len %u", _req_len); 366 return RC_SEND_FAIL; 367 } 368 } 369 370 int32_t CNetHandler::WaitRecv(uint64_t timeout) 371 { 372 CSockLink* conn = (CSockLink*)this->_conn_ptr; 373 if (NULL == conn) 374 { 375 MTLOG_ERROR("get sock link handle failed"); 376 return RC_MEM_ERROR; 377 } 378 379 if (_conn_type == TYPE_CONN_SENDONLY) 380 { 381 MTLOG_DEBUG("only send, without recv"); 382 return 0; 383 } 384 385 this->SwitchToRecv(); 386 387 MtFrame* mtframe = MtFrame::Instance(); 388 mtframe->WaitNotify(timeout); 389 390 this->SwitchToIdle(); 391 392 if ((_rsp_buff != NULL) && (_rsp_buff->data_len > 0)) 393 { 394 MTLOG_DEBUG("recv get rsp, len %d", _rsp_buff->data_len); 395 return 0; 396 } 397 else 398 { 399 MTLOG_TRACE("recv get out errno %d", _err_no); 400 return RC_RECV_FAIL; 401 } 402 } 403 404 int32_t CNetHandler::SendRecv(void* data, uint32_t len, uint32_t timeout) 405 { 406 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 407 utime64_t cost_time = 0; 408 uint64_t time_left = timeout; 409 this->_req_data = data; 410 this->_req_len = len; 411 412 int32_t ret = this->CheckParams(); 413 if (ret < 0) 414 { 415 MTLOG_ERROR("check params failed, ret[%d]", ret); 416 goto EXIT_LABEL; 417 } 418 419 ret = this->GetConnLink(); 420 if (ret < 0) 421 { 422 MTLOG_ERROR("get sock conn failed, ret: %d", ret); 423 goto EXIT_LABEL; 424 } 425 426 ret = this->WaitConnect(time_left); 427 if (ret < 0) 428 { 429 MTLOG_ERROR("sock connect failed, ret: %d", ret); 430 goto EXIT_LABEL; 431 } 432 433 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 434 time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0; 435 ret = this->WaitSend(time_left); 436 if (ret < 0) 437 { 438 MTLOG_ERROR("sock send failed, ret: %d", ret); 439 goto EXIT_LABEL; 440 } 441 442 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 443 time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0; 444 ret = this->WaitRecv(time_left); 445 if (ret < 0) 446 { 447 MTLOG_ERROR("sock recv failed, ret: %d", ret); 448 goto EXIT_LABEL; 449 } 450 451 ret = 0; 452 453 EXIT_LABEL: 454 455 this->Unlink(); 456 457 this->UnRegistSession(); 458 459 return ret; 460 } 461 462 uint32_t CNetHandler::SkipSendPos(uint32_t len) 463 { 464 uint32_t skip_len = (len >= _req_len) ? _req_len : len; 465 _req_len -= skip_len; 466 _send_pos += skip_len; 467 _req_data = (char*)_req_data + skip_len; 468 469 return skip_len; 470 } 471 472 void CNetHandler::Link(CSockLink* conn) 473 { 474 this->_conn_ptr = conn; 475 this->SwitchToIdle(); 476 } 477 478 void CNetHandler::Unlink() 479 { 480 if (this->_state_flags != 0) 481 { 482 this->DetachConn(); 483 } 484 this->_conn_ptr = NULL; 485 } 486 487 void CNetHandler::SwitchToConn() 488 { 489 CSockLink* conn = (CSockLink*)this->_conn_ptr; 490 if (NULL == conn) 491 { 492 MTLOG_ERROR("net handler invalid"); 493 return; 494 } 495 496 this->DetachConn(); 497 498 this->_state_flags |= STATE_IN_CONNECT; 499 conn->AppendToList(CSockLink::LINK_CONN_LIST, this); 500 } 501 502 void CNetHandler::SwitchToSend() 503 { 504 CSockLink* conn = (CSockLink*)this->_conn_ptr; 505 if (NULL == conn) 506 { 507 MTLOG_ERROR("net handler invalid"); 508 return; 509 } 510 511 this->DetachConn(); 512 513 this->_state_flags |= STATE_IN_SEND; 514 conn->AppendToList(CSockLink::LINK_SEND_LIST, this); 515 } 516 517 void CNetHandler::SwitchToRecv() 518 { 519 CSockLink* conn = (CSockLink*)this->_conn_ptr; 520 if (NULL == conn) 521 { 522 MTLOG_ERROR("net handler invalid"); 523 return; 524 } 525 526 this->DetachConn(); 527 528 this->_state_flags |= STATE_IN_RECV; 529 conn->AppendToList(CSockLink::LINK_RECV_LIST, this); 530 } 531 532 void CNetHandler::SwitchToIdle() 533 { 534 CSockLink* conn = (CSockLink*)this->_conn_ptr; 535 if (NULL == conn) 536 { 537 MTLOG_ERROR("net handler invalid"); 538 return; 539 } 540 541 this->DetachConn(); 542 543 this->_state_flags |= STATE_IN_IDLE; 544 conn->AppendToList(CSockLink::LINK_IDLE_LIST, this); 545 } 546 547 void CNetHandler::DetachConn() 548 { 549 CSockLink* conn = (CSockLink*)this->_conn_ptr; 550 if (NULL == conn) 551 { 552 MTLOG_DEBUG("net handler not set"); 553 return; 554 } 555 556 if (_state_flags == 0) 557 { 558 return; 559 } 560 561 if (_state_flags & STATE_IN_CONNECT) 562 { 563 conn->RemoveFromList(CSockLink::LINK_CONN_LIST, this); 564 _state_flags &= ~STATE_IN_CONNECT; 565 } 566 567 if (_state_flags & STATE_IN_SEND) 568 { 569 conn->RemoveFromList(CSockLink::LINK_SEND_LIST, this); 570 _state_flags &= ~STATE_IN_SEND; 571 } 572 573 if (_state_flags & STATE_IN_RECV) 574 { 575 conn->RemoveFromList(CSockLink::LINK_RECV_LIST, this); 576 _state_flags &= ~STATE_IN_RECV; 577 } 578 579 if (_state_flags & STATE_IN_IDLE) 580 { 581 conn->RemoveFromList(CSockLink::LINK_IDLE_LIST, this); 582 _state_flags &= ~STATE_IN_IDLE; 583 } 584 } 585 586 uint32_t CNetHandler::HashValue() 587 { 588 uint32_t ip = _dest_ipv4.sin_addr.s_addr; 589 ip ^= (_dest_ipv4.sin_port << 16) | (_proto_type << 8) | (_conn_type << 8); 590 591 uint32_t hash = (_session_id >> 32) & 0xffffffff; 592 hash ^= _session_id & 0xffffffff; 593 hash ^= ip; 594 595 return hash; 596 } 597 598 int32_t CNetHandler::HashCmp(HashKey* rhs) 599 { 600 CNetHandler* data = (CNetHandler*)(rhs); 601 if (!data) { 602 return -1; 603 } 604 if (this->_session_id != data->_session_id) 605 { 606 return (this->_session_id > data->_session_id) ? 1 : -1; 607 } 608 609 if (this->_dest_ipv4.sin_addr.s_addr != data->_dest_ipv4.sin_addr.s_addr) { 610 return (this->_dest_ipv4.sin_addr.s_addr > data->_dest_ipv4.sin_addr.s_addr) ? 1 : -1; 611 } 612 if (this->_dest_ipv4.sin_port != data->_dest_ipv4.sin_port) { 613 return (this->_dest_ipv4.sin_port > data->_dest_ipv4.sin_port) ? 1 : -1; 614 } 615 if (this->_proto_type != data->_proto_type) { 616 return (this->_proto_type > data->_proto_type) ? 1 : -1; 617 } 618 if (this->_conn_type != data->_conn_type) { 619 return (this->_conn_type > data->_conn_type) ? 1 : -1; 620 } 621 622 return 0; 623 }; 624 625 bool CNetHandler::RegistSession() 626 { 627 if (CNetMgr::Instance()->FindNetItem(this) != NULL) 628 { 629 return false; 630 } 631 632 MtFrame* mtframe = MtFrame::Instance(); 633 this->_thread = mtframe->GetActiveThread(); 634 635 CNetMgr::Instance()->InsertNetItem(this); 636 this->_state_flags |= STATE_IN_SESSION; 637 return true; 638 } 639 640 void CNetHandler::UnRegistSession() 641 { 642 if (this->_state_flags & STATE_IN_SESSION) 643 { 644 CNetMgr::Instance()->RemoveNetItem(this); 645 this->_state_flags &= ~STATE_IN_SESSION; 646 } 647 } 648 649 TNetItemList* CSockLink::GetItemList(int32_t type) 650 { 651 TNetItemList* list = NULL; 652 switch (type) 653 { 654 case LINK_IDLE_LIST: 655 list = &this->_idle_list; 656 break; 657 658 case LINK_CONN_LIST: 659 list = &this->_wait_connect; 660 break; 661 662 case LINK_SEND_LIST: 663 list = &this->_wait_send; 664 break; 665 666 case LINK_RECV_LIST: 667 list = &this->_wait_recv; 668 break; 669 670 default: 671 break; 672 } 673 674 return list; 675 } 676 677 void CSockLink::AppendToList(int32_t type, CNetHandler* item) 678 { 679 TNetItemList* list = this->GetItemList(type); 680 if (NULL == list) 681 { 682 MTLOG_ERROR("unknown list type: %d", type); 683 return; 684 } 685 686 TAILQ_INSERT_TAIL(list, item, _link_entry); 687 } 688 689 void CSockLink::RemoveFromList(int32_t type, CNetHandler* item) 690 { 691 TNetItemList* list = this->GetItemList(type); 692 if (NULL == list) 693 { 694 MTLOG_ERROR("unknown list type: %d", type); 695 return; 696 } 697 698 TAILQ_REMOVE(list, item, _link_entry); 699 } 700 701 void CSockLink::NotifyThread(CNetHandler* item, int32_t result) 702 { 703 static MtFrame* frame = NULL; 704 if (frame == NULL) { 705 frame = MtFrame::Instance(); 706 } 707 708 if (result != RC_SUCCESS) 709 { 710 item->SetErrNo(result); 711 } 712 713 MicroThread* thread = item->GetThread(); 714 if ((thread != NULL) && (thread->HasFlag(MicroThread::IO_LIST))) 715 { 716 frame->RemoveIoWait(thread); 717 frame->InsertRunable(thread); 718 } 719 } 720 721 void CSockLink::NotifyAll(int32_t result) 722 { 723 CNetHandler* item = NULL; 724 CNetHandler* tmp = NULL; 725 726 TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp) 727 { 728 NotifyThread(item, result); 729 item->Unlink(); 730 } 731 732 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 733 { 734 NotifyThread(item, result); 735 item->Unlink(); 736 } 737 738 TAILQ_FOREACH_SAFE(item, &_wait_recv, _link_entry, tmp) 739 { 740 NotifyThread(item, result); 741 item->Unlink(); 742 } 743 744 TAILQ_FOREACH_SAFE(item, &_idle_list, _link_entry, tmp) 745 { 746 NotifyThread(item, result); 747 item->Unlink(); 748 } 749 } 750 751 void CSockLink::Reset() 752 { 753 this->Close(); 754 this->NotifyAll(_errno); 755 756 rw_cache_destroy(&_recv_cache); 757 if (_rsp_buff != NULL) 758 { 759 delete_sk_buffer(_rsp_buff); 760 _rsp_buff = NULL; 761 } 762 763 TAILQ_INIT(&_wait_connect); 764 TAILQ_INIT(&_wait_send); 765 TAILQ_INIT(&_wait_recv); 766 TAILQ_INIT(&_idle_list); 767 768 _proto_type = NET_PROTO_TCP; 769 _errno = 0; 770 _state = 0; 771 _last_access = mt_time_ms(); 772 _parents = NULL; 773 774 this->KqueuerObj::Reset(); 775 } 776 777 CSockLink::CSockLink() 778 { 779 rw_cache_init(&_recv_cache, NULL); 780 _rsp_buff = NULL; 781 782 TAILQ_INIT(&_wait_connect); 783 TAILQ_INIT(&_wait_send); 784 TAILQ_INIT(&_wait_recv); 785 TAILQ_INIT(&_idle_list); 786 787 _proto_type = NET_PROTO_TCP; 788 _errno = 0; 789 _state = 0; 790 _last_access = mt_time_ms(); 791 _parents = NULL; 792 } 793 794 CSockLink::~CSockLink() 795 { 796 this->Reset(); 797 } 798 799 void CSockLink::SetProtoType(MT_PROTO_TYPE type) 800 { 801 _proto_type = type; 802 _recv_cache.pool = CNetMgr::Instance()->GetSkBuffMng(type); 803 } 804 805 void CSockLink::Close() 806 { 807 if (_fd < 0) 808 { 809 return; 810 } 811 812 MtFrame::Instance()->KqueueDelObj(this); 813 814 close(_fd); 815 _fd = -1; 816 } 817 818 void CSockLink::Destroy() 819 { 820 CDestLinks* dstlink = (CDestLinks*)_parents; 821 if (NULL == dstlink) 822 { 823 MTLOG_ERROR("socket link without parents ptr, maybe wrong"); 824 delete this; 825 } 826 else 827 { 828 MTLOG_DEBUG("socket link just free"); 829 dstlink->FreeSockLink(this); 830 } 831 } 832 833 int32_t CSockLink::CreateSock() 834 { 835 if (_fd > 0) 836 { 837 return _fd; 838 } 839 840 if (NET_PROTO_TCP == _proto_type) 841 { 842 _fd = socket(AF_INET, SOCK_STREAM, 0); 843 } 844 else 845 { 846 _fd = socket(AF_INET, SOCK_DGRAM, 0); 847 } 848 849 if (_fd < 0) 850 { 851 MTLOG_ERROR("create socket failed, ret %d[%m]", _fd); 852 return -1; 853 } 854 855 int flags = 1; 856 if (ioctl(_fd, FIONBIO, &flags) < 0) 857 { 858 MTLOG_ERROR("socket unblock failed, %m"); 859 close(_fd); 860 _fd = -1; 861 return -2; 862 } 863 864 if (NET_PROTO_TCP == _proto_type) 865 { 866 setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)); 867 this->EnableOutput(); 868 } 869 870 this->EnableInput(); 871 if (!MtFrame::Instance()->KqueueAddObj(this)) 872 { 873 MTLOG_ERROR("socket epoll mng failed, %m"); 874 close(_fd); 875 _fd = -1; 876 return -3; 877 } 878 879 return _fd; 880 } 881 882 struct sockaddr_in* CSockLink::GetDestAddr(struct sockaddr_in* addr) 883 { 884 CDestLinks* dstlink = (CDestLinks*)_parents; 885 if ((NULL == _parents) || (NULL == addr)) { 886 return NULL; 887 } 888 889 uint32_t ip = 0; 890 uint16_t port = 0; 891 dstlink->GetDestIP(ip, port); 892 893 addr->sin_family = AF_INET; 894 addr->sin_addr.s_addr = ip; 895 addr->sin_port = port; 896 897 return addr; 898 } 899 900 bool CSockLink::Connect() 901 { 902 this->_last_access = mt_time_ms(); 903 904 if (_proto_type == NET_PROTO_UDP) 905 { 906 _state |= LINK_CONNECTED; 907 } 908 909 if (_state & LINK_CONNECTED) 910 { 911 return true; 912 } 913 914 if (_state & LINK_CONNECTING) 915 { 916 return false; 917 } 918 919 struct sockaddr_in addr = {0}; 920 921 mt_hook_syscall(connect); 922 int32_t ret = ff_hook_connect(_fd, (struct sockaddr*)this->GetDestAddr(&addr), sizeof(struct sockaddr_in)); 923 if (ret < 0) 924 { 925 int32_t err = errno; 926 if (err == EISCONN) 927 { 928 _state |= LINK_CONNECTED; 929 return true; 930 } 931 else 932 { 933 _state |= LINK_CONNECTING; 934 if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR)) 935 { 936 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _fd, err); 937 return false; 938 } 939 else 940 { 941 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _fd, err); 942 return false; 943 } 944 } 945 } 946 else 947 { 948 _state |= LINK_CONNECTED; 949 return true; 950 } 951 } 952 953 int32_t CSockLink::SendCacheUdp(void* data, uint32_t len) 954 { 955 mt_hook_syscall(sendto); 956 void* buff = NULL; 957 uint32_t buff_len = 0; 958 959 CNetHandler* item = NULL; 960 CNetHandler* tmp = NULL; 961 struct sockaddr_in dst = {0}; 962 963 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 964 { 965 item->GetSendData(buff, buff_len); 966 if ((NULL == buff) || (buff_len == 0)) 967 { 968 MTLOG_ERROR("get buff ptr invalid, log it"); 969 NotifyThread(item, 0); 970 item->SwitchToIdle(); 971 continue; 972 } 973 974 int32_t ret = ff_hook_sendto(_fd, buff, buff_len, 0, 975 (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in)); 976 if (ret == -1) 977 { 978 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 979 { 980 return 0; 981 } 982 else 983 { 984 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd, 985 errno, strerror(errno)); 986 return -2; 987 } 988 } 989 990 NotifyThread(item, 0); 991 item->SwitchToIdle(); 992 } 993 994 if ((data == NULL) || (len == 0)) 995 { 996 return 0; 997 } 998 999 int32_t ret = ff_hook_sendto(_fd, data, len, 0, 1000 (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in)); 1001 if (ret == -1) 1002 { 1003 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 1004 { 1005 return 0; 1006 } 1007 else 1008 { 1009 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd, 1010 errno, strerror(errno)); 1011 return -2; 1012 } 1013 } 1014 else 1015 { 1016 return ret; 1017 } 1018 } 1019 1020 int32_t CSockLink::SendCacheTcp(void* data, uint32_t len) 1021 { 1022 void* buff = NULL; 1023 uint32_t buff_len = 0; 1024 struct iovec iov[64]; 1025 int32_t count = 0; 1026 CNetHandler* item = NULL; 1027 CNetHandler* tmp = NULL; 1028 1029 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 1030 { 1031 item->GetSendData(buff, buff_len); 1032 iov[count].iov_base = buff; 1033 iov[count].iov_len = (int32_t)buff_len; 1034 count++; 1035 if (count >= 64) 1036 { 1037 break; 1038 } 1039 } 1040 if ((count < 64) && (data != NULL)) 1041 { 1042 iov[count].iov_base = data; 1043 iov[count].iov_len = (int32_t)len; 1044 count++; 1045 } 1046 1047 ssize_t bytes = writev(_fd, iov, count); 1048 if (bytes < 0) 1049 { 1050 if ((errno == EAGAIN) || (errno == EINTR)) 1051 { 1052 return 0; 1053 } 1054 else 1055 { 1056 MTLOG_ERROR("socket writev failed, fd %d, errno %d(%s)", _fd, 1057 errno, strerror(errno)); 1058 return -1; 1059 } 1060 } 1061 1062 uint32_t send_left = (uint32_t)bytes; 1063 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 1064 { 1065 send_left -= item->SkipSendPos(send_left); 1066 item->GetSendData(buff, buff_len); 1067 if (buff_len == 0) 1068 { 1069 NotifyThread(item, 0); 1070 item->SwitchToIdle(); 1071 } 1072 1073 if (send_left == 0) 1074 { 1075 break; 1076 } 1077 } 1078 1079 return send_left; 1080 } 1081 1082 int32_t CSockLink::SendData(void* data, uint32_t len) 1083 { 1084 int32_t ret = 0; 1085 bool rc = false; 1086 1087 this->_last_access = mt_time_ms(); 1088 1089 if (_proto_type == NET_PROTO_UDP) 1090 { 1091 ret = SendCacheUdp(data, len); 1092 } 1093 else 1094 { 1095 ret = SendCacheTcp(data, len); 1096 } 1097 1098 if (ret < (int32_t)len) 1099 { 1100 this->EnableOutput(); 1101 rc = MtFrame::Instance()->KqueueCtrlAdd(_fd, KQ_EVENT_READ); 1102 } 1103 else 1104 { 1105 this->DisableOutput(); 1106 rc = MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE); 1107 } 1108 1109 if (!rc) 1110 { 1111 MTLOG_ERROR("socket epoll mng failed[%m], wait timeout"); 1112 } 1113 1114 return ret; 1115 } 1116 1117 int32_t CSockLink::RecvDispath() 1118 { 1119 if (_proto_type == NET_PROTO_UDP) 1120 { 1121 return this->DispathUdp(); 1122 } 1123 else 1124 { 1125 return this->DispathTcp(); 1126 } 1127 } 1128 1129 void CSockLink::ExtendRecvRsp() 1130 { 1131 if (NULL == _rsp_buff) 1132 { 1133 _rsp_buff = new_sk_buffer(512); 1134 if (NULL == _rsp_buff) 1135 { 1136 MTLOG_ERROR("no more memory, error"); 1137 return; 1138 } 1139 } 1140 1141 _rsp_buff->data_len += read_cache_begin(&_recv_cache, _rsp_buff->data_len, 1142 _rsp_buff->data + _rsp_buff->data_len , _rsp_buff->size - _rsp_buff->data_len); 1143 } 1144 1145 CHECK_SESSION_CALLBACK CSockLink::GetSessionCallback() 1146 { 1147 CHECK_SESSION_CALLBACK check_session = NULL; 1148 1149 CNetHandler* item = TAILQ_FIRST(&_wait_recv); 1150 if (NULL == item) 1151 { 1152 MTLOG_DEBUG("recv data with no wait item, err"); 1153 goto EXIT_LABEL; 1154 } 1155 1156 check_session = item->GetSessionCallback(); 1157 if (NULL == check_session) 1158 { 1159 MTLOG_ERROR("recv data with no session callback, err"); 1160 goto EXIT_LABEL; 1161 } 1162 1163 EXIT_LABEL: 1164 1165 CDestLinks* dstlink = (CDestLinks*)_parents; 1166 if (NULL == dstlink) 1167 { 1168 return check_session; 1169 } 1170 1171 if (check_session != NULL) 1172 { 1173 dstlink->SetDefaultCallback(check_session); 1174 } 1175 else 1176 { 1177 check_session = dstlink->GetDefaultCallback(); 1178 } 1179 1180 return check_session; 1181 } 1182 1183 int32_t CSockLink::DispathTcp() 1184 { 1185 CHECK_SESSION_CALLBACK check_session = this->GetSessionCallback(); 1186 if (NULL == check_session) 1187 { 1188 MTLOG_ERROR("recv data with no session callback, err"); 1189 return -1; 1190 } 1191 1192 uint32_t need_len = 0; 1193 uint64_t sid = 0; 1194 int32_t ret = 0; 1195 while (_recv_cache.len > 0) 1196 { 1197 this->ExtendRecvRsp(); 1198 if (NULL == _rsp_buff) 1199 { 1200 MTLOG_ERROR("alloc memory, error"); 1201 _errno = RC_MEM_ERROR; 1202 return -3; 1203 } 1204 1205 need_len = 0; 1206 ret = check_session(_rsp_buff->data, _rsp_buff->data_len, &sid, &need_len); 1207 1208 if (ret < 0) 1209 { 1210 MTLOG_ERROR("user check resp failed, ret %d", ret); 1211 _errno = RC_CHECK_PKG_FAIL; 1212 return -1; 1213 } 1214 1215 if (ret == 0) 1216 { 1217 if ((need_len == 0) && (_rsp_buff->data_len == _rsp_buff->size)) 1218 { 1219 MTLOG_DEBUG("recv default buff full[%u], but user no set need length", _rsp_buff->size); 1220 need_len = _rsp_buff->size * 2; 1221 } 1222 1223 if ((need_len <= _rsp_buff->size) || (need_len > 100*1024*1024)) 1224 { 1225 MTLOG_DEBUG("maybe need wait more data: %u", need_len); 1226 return 0; 1227 } 1228 1229 _rsp_buff = reserve_sk_buffer(_rsp_buff, need_len); 1230 if (NULL == _rsp_buff) 1231 { 1232 MTLOG_ERROR("no more memory, error"); 1233 _errno = RC_MEM_ERROR; 1234 return -3; 1235 } 1236 1237 if (_rsp_buff->data_len >= _recv_cache.len) 1238 { 1239 MTLOG_DEBUG("maybe need wait more data, now %u", _recv_cache.len); 1240 return 0; 1241 } 1242 1243 continue; 1244 } 1245 1246 if (ret > (int32_t)_recv_cache.len) 1247 { 1248 MTLOG_DEBUG("maybe pkg not all ok, wait more"); 1249 return 0; 1250 } 1251 1252 CNetHandler* session = this->FindSession(sid); 1253 if (NULL == session) 1254 { 1255 MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid); 1256 cache_skip_data(&_recv_cache, ret); 1257 delete_sk_buffer(_rsp_buff); 1258 _rsp_buff = NULL; 1259 } 1260 else 1261 { 1262 MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid); 1263 cache_skip_data(&_recv_cache, ret); 1264 this->NotifyThread(session, 0); 1265 session->SwitchToIdle(); 1266 _rsp_buff->data_len = ret; 1267 session->SetRespBuff(_rsp_buff); 1268 _rsp_buff = NULL; 1269 } 1270 } 1271 1272 return 0; 1273 1274 } 1275 1276 int32_t CSockLink::DispathUdp() 1277 { 1278 CHECK_SESSION_CALLBACK check_session = NULL; 1279 CNetHandler* item = TAILQ_FIRST(&_wait_recv); 1280 if (NULL == item) 1281 { 1282 MTLOG_DEBUG("recv data with no wait item, maybe wrong pkg recv"); 1283 } 1284 else 1285 { 1286 check_session = item->GetSessionCallback(); 1287 if (NULL == check_session) 1288 { 1289 MTLOG_TRACE("recv data with no session callback, err"); 1290 } 1291 } 1292 1293 uint64_t sid = 0; 1294 uint32_t need_len = 0; 1295 int32_t ret = 0; 1296 TSkBuffer* block = NULL; 1297 while ((block = TAILQ_FIRST(&_recv_cache.list)) != NULL) 1298 { 1299 if (check_session == NULL) 1300 { 1301 MTLOG_DEBUG("no recv wait, skip first block"); 1302 cache_skip_data(&_recv_cache, block->data_len); 1303 continue; 1304 } 1305 1306 need_len = 0; 1307 ret = check_session(block->data, block->data_len, &sid, &need_len); 1308 if ((ret <= 0) || (ret > (int32_t)block->data_len)) 1309 { 1310 MTLOG_DEBUG("maybe wrong pkg come, skip it"); 1311 cache_skip_data(&_recv_cache, block->data_len); 1312 continue; 1313 } 1314 1315 CNetHandler* session = this->FindSession(sid); 1316 if (NULL == session) 1317 { 1318 MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid); 1319 cache_skip_data(&_recv_cache, block->data_len); 1320 } 1321 else 1322 { 1323 MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid); 1324 this->NotifyThread(session, 0); 1325 session->SwitchToIdle(); 1326 cache_skip_first_buffer(&_recv_cache); 1327 session->SetRespBuff(block); 1328 } 1329 } 1330 1331 return 0; 1332 } 1333 1334 CNetHandler* CSockLink::FindSession(uint64_t sid) 1335 { 1336 CNetHandler key; 1337 CDestLinks* dstlink = (CDestLinks*)_parents; 1338 if (NULL == dstlink) 1339 { 1340 MTLOG_ERROR("session dest link invalid, maybe error"); 1341 return NULL; 1342 } 1343 struct sockaddr_in addr; 1344 key.SetDestAddress(this->GetDestAddr(&addr)); 1345 key.SetConnType(dstlink->GetConnType()); 1346 key.SetProtoType(dstlink->GetProtoType()); 1347 key.SetSessionId(sid); 1348 1349 return CNetMgr::Instance()->FindNetItem(&key); 1350 } 1351 1352 int CSockLink::InputNotify() 1353 { 1354 int32_t ret = 0; 1355 1356 this->_last_access = mt_time_ms(); 1357 1358 if (_proto_type == NET_PROTO_UDP) 1359 { 1360 ret = cache_udp_recv(&_recv_cache, _fd, NULL); 1361 } 1362 else 1363 { 1364 ret = cache_tcp_recv(&_recv_cache, _fd); 1365 } 1366 1367 if (ret < 0) 1368 { 1369 if (ret == -SK_ERR_NEED_CLOSE) 1370 { 1371 MTLOG_DEBUG("recv on link failed, remote close"); 1372 _errno = RC_REMOTE_CLOSED; 1373 } 1374 else 1375 { 1376 MTLOG_ERROR("recv on link failed, close it, ret %d[%m]", ret); 1377 _errno = RC_RECV_FAIL; 1378 } 1379 1380 this->Destroy(); 1381 return -1; 1382 } 1383 1384 ret = this->RecvDispath(); 1385 if (ret < 0) 1386 { 1387 MTLOG_DEBUG("recv dispath failed, close it, ret %d[%m]", ret); 1388 this->Destroy(); 1389 return -2; 1390 } 1391 1392 return 0; 1393 1394 } 1395 1396 int CSockLink::OutputNotify() 1397 { 1398 int32_t ret = 0; 1399 1400 this->_last_access = mt_time_ms(); 1401 1402 if (_state & LINK_CONNECTING) 1403 { 1404 _state &= ~LINK_CONNECTING; 1405 _state |= LINK_CONNECTED; 1406 1407 CNetHandler* item = NULL; 1408 CNetHandler* tmp = NULL; 1409 TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp) 1410 { 1411 NotifyThread(item, 0); 1412 item->SwitchToIdle(); 1413 } 1414 } 1415 1416 if (_proto_type == NET_PROTO_UDP) 1417 { 1418 ret = SendCacheUdp(NULL, 0); 1419 } 1420 else 1421 { 1422 ret = SendCacheTcp(NULL, 0); 1423 } 1424 1425 if (ret < 0) 1426 { 1427 MTLOG_ERROR("Send on link failed, close it, ret %d[%m]", ret); 1428 _errno = RC_SEND_FAIL; 1429 this->Destroy(); 1430 return ret; 1431 } 1432 1433 if (TAILQ_EMPTY(&_wait_send)) 1434 { 1435 this->DisableOutput(); 1436 if (!MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE)) 1437 { 1438 MTLOG_ERROR("socket epoll mng failed[%m], wait timeout"); 1439 } 1440 } 1441 1442 return 0; 1443 } 1444 1445 int CSockLink::HangupNotify() 1446 { 1447 MTLOG_ERROR("socket epoll error, fd %d", _fd); 1448 1449 this->_errno = RC_KQUEUE_ERROR; 1450 this->Destroy(); 1451 return -1; 1452 } 1453 1454 CDestLinks::CDestLinks() 1455 { 1456 _timeout = 5*60*1000; 1457 _addr_ipv4 = 0; 1458 _net_port = 0; 1459 _proto_type = NET_PROTO_UNDEF; 1460 _conn_type = TYPE_CONN_SESSION; 1461 1462 _max_links = 3; // 默认3个 1463 _curr_link = 0; 1464 _dflt_callback = NULL; 1465 1466 TAILQ_INIT(&_sock_list); 1467 } 1468 1469 void CDestLinks::Reset() 1470 { 1471 CSockLink* item = NULL; 1472 CSockLink* temp = NULL; 1473 TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp) 1474 { 1475 item->Destroy(); 1476 } 1477 TAILQ_INIT(&_sock_list); 1478 1479 CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); 1480 if (NULL != timer) 1481 { 1482 timer->stop_timer(this); 1483 } 1484 1485 _timeout = 5*60*1000; 1486 _addr_ipv4 = 0; 1487 _net_port = 0; 1488 _proto_type = NET_PROTO_UNDEF; 1489 _conn_type = TYPE_CONN_SESSION; 1490 1491 _max_links = 3; 1492 _curr_link = 0; 1493 } 1494 1495 CDestLinks::~CDestLinks() 1496 { 1497 this->Reset(); 1498 } 1499 1500 void CDestLinks::StartTimer() 1501 { 1502 CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); 1503 if ((NULL == timer) || !timer->start_timer(this, 60*1000)) 1504 { 1505 MTLOG_ERROR("obj %p attach timer failed, error", this); 1506 } 1507 } 1508 1509 void CDestLinks::FreeSockLink(CSockLink* sock) 1510 { 1511 if ((sock == NULL) || (sock->GetParentsPtr() != (void*)this)) 1512 { 1513 MTLOG_ERROR("invalid socklink %p, error", sock); 1514 return; 1515 } 1516 1517 TAILQ_REMOVE(&_sock_list, sock, _link_entry); 1518 if (this->_curr_link > 0) { 1519 this->_curr_link--; 1520 } 1521 1522 sock->Reset(); 1523 CNetMgr::Instance()->FreeSockLink(sock); 1524 } 1525 1526 CSockLink* CDestLinks::GetSockLink() 1527 { 1528 CSockLink* link = NULL; 1529 if (_curr_link < _max_links) 1530 { 1531 link = CNetMgr::Instance()->AllocSockLink(); 1532 if (NULL == link) 1533 { 1534 MTLOG_ERROR("alloc sock link failed, error"); 1535 return NULL; 1536 } 1537 link->SetParentsPtr(this); 1538 link->SetProtoType(_proto_type); 1539 TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry); 1540 _curr_link++; 1541 } 1542 else 1543 { 1544 link = TAILQ_FIRST(&_sock_list); 1545 TAILQ_REMOVE(&_sock_list, link, _link_entry); 1546 TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry); 1547 } 1548 1549 return link; 1550 } 1551 1552 void CDestLinks::timer_notify() 1553 { 1554 uint64_t now = mt_time_ms(); 1555 CSockLink* item = NULL; 1556 CSockLink* temp = NULL; 1557 TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp) 1558 { 1559 if ((item->GetLastAccess() + this->_timeout) < now) 1560 { 1561 MTLOG_DEBUG("link timeout, last[%llu], now [%llu]", item->GetLastAccess(), now); 1562 item->Destroy(); 1563 } 1564 } 1565 1566 item = TAILQ_FIRST(&_sock_list); 1567 if (NULL == item) 1568 { 1569 MTLOG_DEBUG("dest links timeout, now [%llu]", now); 1570 CNetMgr::Instance()->DeleteDestLink(this); 1571 return; 1572 } 1573 1574 this->StartTimer(); 1575 1576 return; 1577 } 1578 1579 CNetMgr* CNetMgr::_instance = NULL; 1580 CNetMgr* CNetMgr::Instance (void) 1581 { 1582 if (NULL == _instance) 1583 { 1584 _instance = new CNetMgr(); 1585 } 1586 1587 return _instance; 1588 } 1589 1590 void CNetMgr::Destroy() 1591 { 1592 if( _instance != NULL ) 1593 { 1594 delete _instance; 1595 _instance = NULL; 1596 } 1597 } 1598 1599 CNetHandler* CNetMgr::FindNetItem(CNetHandler* key) 1600 { 1601 if (NULL == this->_session_hash) 1602 { 1603 return NULL; 1604 } 1605 1606 return (CNetHandler*)_session_hash->HashFind(key); 1607 } 1608 1609 void CNetMgr::InsertNetItem(CNetHandler* item) 1610 { 1611 if (NULL == this->_session_hash) 1612 { 1613 return; 1614 } 1615 1616 int32_t ret = _session_hash->HashInsert(item); 1617 if (ret < 0) 1618 { 1619 MTLOG_ERROR("session insert failed, ret %d", ret); 1620 } 1621 1622 return; 1623 } 1624 1625 void CNetMgr::RemoveNetItem(CNetHandler* item) 1626 { 1627 CNetHandler* handler = this->FindNetItem(item); 1628 if (NULL == handler) 1629 { 1630 return; 1631 } 1632 1633 _session_hash->HashRemove(handler); 1634 } 1635 1636 CDestLinks* CNetMgr::FindDestLink(CDestLinks* key) 1637 { 1638 if (NULL == this->_ip_hash) 1639 { 1640 return NULL; 1641 } 1642 1643 return (CDestLinks*)_ip_hash->HashFind(key); 1644 } 1645 1646 void CNetMgr::InsertDestLink(CDestLinks* item) 1647 { 1648 if (NULL == this->_ip_hash) 1649 { 1650 return; 1651 } 1652 1653 int32_t ret = _ip_hash->HashInsert(item); 1654 if (ret < 0) 1655 { 1656 MTLOG_ERROR("ip dest insert failed, ret %d", ret); 1657 } 1658 1659 return; 1660 } 1661 1662 void CNetMgr::RemoveDestLink(CDestLinks* item) 1663 { 1664 CDestLinks* handler = this->FindDestLink(item); 1665 if (NULL == handler) 1666 { 1667 return; 1668 } 1669 1670 _ip_hash->HashRemove(handler); 1671 } 1672 1673 CDestLinks* CNetMgr::FindCreateDest(CDestLinks* key) 1674 { 1675 CDestLinks* dest = this->FindDestLink(key); 1676 if (dest != NULL) 1677 { 1678 MTLOG_DEBUG("dest links reuse ok"); 1679 return dest; 1680 } 1681 1682 dest = this->AllocDestLink(); 1683 if (NULL == dest) 1684 { 1685 MTLOG_ERROR("dest links alloc failed, log it"); 1686 return NULL; 1687 } 1688 1689 dest->CopyKeyInfo(key); 1690 dest->StartTimer(); 1691 this->InsertDestLink(dest); 1692 1693 return dest; 1694 } 1695 1696 void CNetMgr::DeleteDestLink(CDestLinks* dst) 1697 { 1698 this->RemoveDestLink(dst); 1699 dst->Reset(); 1700 this->FreeDestLink(dst); 1701 } 1702 1703 CNetMgr::CNetMgr() 1704 { 1705 sk_buffer_mng_init(&_tcp_pool, 60, 4096); 1706 sk_buffer_mng_init(&_udp_pool, 60, SK_DFLT_BUFF_SIZE); 1707 1708 _ip_hash = new HashList(100000); 1709 _session_hash = new HashList(100000); 1710 } 1711 1712 CNetMgr::~CNetMgr() 1713 { 1714 if (_ip_hash != NULL) 1715 { 1716 HashKey* hash_item = _ip_hash->HashGetFirst(); 1717 while (hash_item) 1718 { 1719 delete hash_item; 1720 hash_item = _ip_hash->HashGetFirst(); 1721 } 1722 1723 delete _ip_hash; 1724 _ip_hash = NULL; 1725 } 1726 1727 if (_session_hash != NULL) 1728 { 1729 HashKey* hash_item = _session_hash->HashGetFirst(); 1730 while (hash_item) 1731 { 1732 delete hash_item; 1733 hash_item = _session_hash->HashGetFirst(); 1734 } 1735 1736 delete _session_hash; 1737 _session_hash = NULL; 1738 } 1739 1740 sk_buffer_mng_destroy(&_tcp_pool); 1741 sk_buffer_mng_destroy(&_udp_pool); 1742 } 1743 1744 void CNetMgr::RecycleObjs(uint64_t now) 1745 { 1746 uint32_t now_s = (uint32_t)(now / 1000); 1747 1748 recycle_sk_buffer(&_udp_pool, now_s); 1749 recycle_sk_buffer(&_tcp_pool, now_s); 1750 1751 _net_item_pool.RecycleItem(now); 1752 _sock_link_pool.RecycleItem(now); 1753 _dest_ip_pool.RecycleItem(now); 1754 } 1755