1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_mbuf_pool.cpp 22 * @info �߳���Ϣbuf�ع���ʵ�� 23 * @time 20130924 24 **/ 25 26 #include <errno.h> 27 #include <netinet/tcp.h> 28 #include "micro_thread.h" 29 #include "mt_sys_hook.h" 30 #include "ff_hook.h" 31 #include "mt_net.h" 32 33 34 using namespace std; 35 using namespace NS_MICRO_THREAD; 36 37 // �������鹹 38 CNetHelper::CNetHelper() 39 { 40 handler = (void*)CNetMgr::Instance()->AllocNetItem(); 41 } 42 43 CNetHelper::~CNetHelper() 44 { 45 CNetHandler* net_handler = (CNetHandler*)handler; 46 if (handler != NULL) 47 { 48 net_handler->Reset(); 49 CNetMgr::Instance()->FreeNetItem(net_handler); 50 handler = NULL; 51 } 52 } 53 54 // ͬ���շ��ӿ� 55 int32_t CNetHelper::SendRecv(void* data, uint32_t len, uint32_t timeout) 56 { 57 if (handler != NULL) { 58 CNetHandler* net_handler = (CNetHandler*)handler; 59 return net_handler->SendRecv(data, len, timeout); 60 } else { 61 return RC_INVALID_HANDLER; 62 } 63 } 64 65 // ��ȡ����buff��Ϣ, ��Ч��ֱ��helper���� 66 void* CNetHelper::GetRspBuff() 67 { 68 if (handler != NULL) { 69 CNetHandler* net_handler = (CNetHandler*)handler; 70 return net_handler->GetRspBuff(); 71 } else { 72 return NULL; 73 } 74 } 75 76 // ��ȡ����buff��Ϣ, ��Ч��ֱ��helper���� 77 uint32_t CNetHelper::GetRspLen() 78 { 79 if (handler != NULL) { 80 CNetHandler* net_handler = (CNetHandler*)handler; 81 return net_handler->GetRspLen(); 82 } else { 83 return 0; 84 } 85 } 86 87 88 // ת����������Ϣ, �����ȡ 89 char* CNetHelper::GetErrMsg(int32_t result) 90 { 91 static const char* errmsg = "unknown error type"; 92 93 switch (result) 94 { 95 case RC_SUCCESS: 96 errmsg = "success"; 97 break; 98 99 case RC_ERR_SOCKET: 100 errmsg = "create socket failed"; 101 break; 102 103 case RC_SEND_FAIL: 104 errmsg = "send pakeage timeout or failed"; 105 break; 106 107 case RC_RECV_FAIL: 108 errmsg = "recv response timeout or failed"; 109 break; 110 111 case RC_CONNECT_FAIL: 112 errmsg = "connect timeout or failed"; 113 break; 114 115 case RC_CHECK_PKG_FAIL: 116 errmsg = "user package check failed"; 117 break; 118 119 case RC_NO_MORE_BUFF: 120 errmsg = "user response buffer too small"; 121 break; 122 123 case RC_REMOTE_CLOSED: 124 errmsg = "remote close connection"; 125 break; 126 127 case RC_INVALID_PARAM: 128 errmsg = "params invalid"; 129 break; 130 131 case RC_INVALID_HANDLER: 132 errmsg = "net handler invalid"; 133 break; 134 135 case RC_MEM_ERROR: 136 errmsg = "no more memory, alloc failed"; 137 break; 138 139 case RC_CONFLICT_SID: 140 errmsg = "session id with the dest address conflict"; 141 break; 142 143 case RC_KQUEUE_ERROR: 144 errmsg = "epoll system error"; 145 break; 146 147 default: 148 break; 149 } 150 151 return (char*)errmsg; 152 } 153 154 // ����Э�������, Ĭ��UDP 155 void CNetHelper::SetProtoType(MT_PROTO_TYPE type) 156 { 157 if (handler != NULL) { 158 CNetHandler* net_handler = (CNetHandler*)handler; 159 return net_handler->SetProtoType(type); 160 } 161 } 162 163 // ����Ŀ��IP��ַ 164 void CNetHelper::SetDestAddress(struct sockaddr_in* dst) 165 { 166 if (handler != NULL) { 167 CNetHandler* net_handler = (CNetHandler*)handler; 168 return net_handler->SetDestAddress(dst); 169 } 170 } 171 172 // ����session����session id��Ϣ, �����0 173 void CNetHelper::SetSessionId(uint64_t sid) 174 { 175 if (handler != NULL) { 176 CNetHandler* net_handler = (CNetHandler*)handler; 177 return net_handler->SetSessionId(sid); 178 } 179 } 180 181 // ����session�����ص����� 182 void CNetHelper::SetSessionCallback(CHECK_SESSION_CALLBACK function) 183 { 184 if (handler != NULL) { 185 CNetHandler* net_handler = (CNetHandler*)handler; 186 return net_handler->SetSessionCallback(function); 187 } 188 } 189 190 191 192 // ����һ��net handler, ���ڸ��� 193 void CNetHandler::Reset() 194 { 195 // ȥ��session��connection������� 196 this->Unlink(); 197 this->UnRegistSession(); 198 199 // ��̬�ڴ����� 200 if (_rsp_buff != NULL) { 201 delete_sk_buffer(_rsp_buff); 202 _rsp_buff = NULL; 203 } 204 205 // �ֶγ�ʼ������ 206 _thread = NULL; 207 _proto_type = NET_PROTO_TCP; 208 _conn_type = TYPE_CONN_SESSION; 209 _dest_ipv4.sin_addr.s_addr = 0; 210 _dest_ipv4.sin_port = 0; 211 _session_id = 0; 212 _callback = NULL; 213 _err_no = 0; 214 _state_flags = 0; 215 _conn_ptr = NULL; 216 _send_pos = 0; 217 _req_len = 0; 218 _req_data = NULL; 219 220 } 221 222 // ���캯�� 223 CNetHandler::CNetHandler() 224 { 225 _state_flags = 0; 226 _rsp_buff = NULL; 227 228 this->Reset(); 229 } 230 231 // �������� 232 CNetHandler::~CNetHandler() 233 { 234 this->Reset(); 235 } 236 237 // ����Ҫ�IJ�����Ϣ 238 int32_t CNetHandler::CheckParams() 239 { 240 // 1. ��������Ч��� 241 if ((NULL == _req_data) || (_req_len == 0)) 242 { 243 MTLOG_ERROR("param invalid, data[%p], len[%u]", _req_data, _req_len); 244 return RC_INVALID_PARAM; 245 } 246 247 // 2. Ŀ�ĵ�ַ��Ч��� 248 if ((_dest_ipv4.sin_addr.s_addr == 0) || (_dest_ipv4.sin_port == 0)) 249 { 250 MTLOG_ERROR("param invalid, ip[%u], port[%u]", _dest_ipv4.sin_addr.s_addr, 251 _dest_ipv4.sin_port); 252 return RC_INVALID_PARAM; 253 } 254 255 // 3. session ���ͼ����ȷ�� 256 if (_conn_type == TYPE_CONN_SESSION) 257 { 258 if ((_callback == NULL) || (_session_id == 0)) 259 { 260 MTLOG_ERROR("param invalid, callback[%p], session_id[%llu]", _callback, _session_id); 261 return RC_INVALID_PARAM; 262 } 263 264 // �����ע��session��Ϣ 265 if (!this->RegistSession()) 266 { 267 MTLOG_ERROR("param invalid, session_id[%llu] regist failed", _session_id); 268 return RC_CONFLICT_SID; 269 } 270 } 271 272 // 4. ����ִ�� 273 return 0; 274 } 275 276 // ��ȡ����, ͬʱ�������ȴ����ӵĶ����� 277 int32_t CNetHandler::GetConnLink() 278 { 279 CDestLinks key; 280 key.SetKeyInfo(_dest_ipv4.sin_addr.s_addr, _dest_ipv4.sin_port, _proto_type, _conn_type); 281 282 CDestLinks* dest_link = CNetMgr::Instance()->FindCreateDest(&key); 283 if (NULL == dest_link) 284 { 285 MTLOG_ERROR("get dest link handle failed"); 286 return RC_MEM_ERROR; 287 } 288 289 CSockLink* sock_link = dest_link->GetSockLink(); 290 if (NULL == sock_link) 291 { 292 MTLOG_ERROR("get sock link handle failed"); 293 return RC_MEM_ERROR; 294 } 295 296 this->Link(sock_link); 297 298 return 0; 299 } 300 301 302 // ����Ҫ�IJ�����Ϣ 303 int32_t CNetHandler::WaitConnect(uint64_t timeout) 304 { 305 CSockLink* conn = (CSockLink*)this->_conn_ptr; 306 if (NULL == conn) 307 { 308 MTLOG_ERROR("get sock link handle failed"); 309 return RC_MEM_ERROR; 310 } 311 312 int32_t fd = conn->CreateSock(); 313 if (fd < 0) 314 { 315 MTLOG_ERROR("create sock failed, ret %d[%m]", fd); 316 return RC_ERR_SOCKET; 317 } 318 319 if (conn->Connect()) 320 { 321 MTLOG_DEBUG("sock conncet ok"); 322 return RC_SUCCESS; 323 } 324 325 // ����ȴ�connect���� 326 this->SwitchToConn(); 327 328 // �ȴ������� 329 MtFrame* mtframe = MtFrame::Instance(); 330 mtframe->WaitNotify(timeout); 331 332 // ɾ����connect����, ��ʱ��Ҫ�����, �������� 333 this->SwitchToIdle(); 334 335 if (_err_no != 0) 336 { 337 MTLOG_ERROR("connect get out errno %d", _err_no); 338 return _err_no; 339 } 340 341 // ��ʱ��������ȷ�� 342 if (conn->Connected()) 343 { 344 MTLOG_DEBUG("connect ok"); 345 return 0; 346 } 347 else 348 { 349 MTLOG_TRACE("connect not ok, maybe timeout"); 350 return RC_CONNECT_FAIL; 351 } 352 } 353 354 355 // ����Ҫ�IJ�����Ϣ 356 int32_t CNetHandler::WaitSend(uint64_t timeout) 357 { 358 CSockLink* conn = (CSockLink*)this->_conn_ptr; 359 if (NULL == conn) 360 { 361 MTLOG_ERROR("get sock link handle failed"); 362 return RC_MEM_ERROR; 363 } 364 365 int32_t ret = conn->SendData(_req_data, _req_len); 366 if (ret < 0) 367 { 368 MTLOG_ERROR("sock send failed, ret %d[%m]", ret); 369 return RC_SEND_FAIL; 370 } 371 this->SkipSendPos(ret); 372 373 if (_req_len == 0) 374 { 375 MTLOG_DEBUG("sock send ok"); 376 return RC_SUCCESS; 377 } 378 379 // �ȴ���������, �л��ѷ��͵���Ϣ 380 this->SwitchToSend(); 381 382 // �ȴ������� 383 MtFrame* mtframe = MtFrame::Instance(); 384 mtframe->WaitNotify(timeout); 385 386 // ɾ����connect���� 387 this->SwitchToIdle(); 388 389 // �쳣��� 390 if (_err_no != 0) 391 { 392 MTLOG_ERROR("send get out errno %d", _err_no); 393 return _err_no; 394 } 395 396 // ��ʱ��� 397 if (_req_len == 0) 398 { 399 MTLOG_DEBUG("send req ok, len %u", _send_pos); 400 return 0; 401 } 402 else 403 { 404 MTLOG_TRACE("send req not ok, left len %u", _req_len); 405 return RC_SEND_FAIL; 406 } 407 } 408 409 // ����Ҫ�IJ�����Ϣ 410 int32_t CNetHandler::WaitRecv(uint64_t timeout) 411 { 412 CSockLink* conn = (CSockLink*)this->_conn_ptr; 413 if (NULL == conn) 414 { 415 MTLOG_ERROR("get sock link handle failed"); 416 return RC_MEM_ERROR; 417 } 418 419 if (_conn_type == TYPE_CONN_SENDONLY) 420 { 421 MTLOG_DEBUG("only send, without recv"); 422 return 0; 423 } 424 425 // �л����ȴ����� 426 this->SwitchToRecv(); 427 428 // �ȴ������� 429 MtFrame* mtframe = MtFrame::Instance(); 430 mtframe->WaitNotify(timeout); 431 432 // ɾ����connect���� 433 this->SwitchToIdle(); 434 435 // ��ʱ��� 436 if ((_rsp_buff != NULL) && (_rsp_buff->data_len > 0)) 437 { 438 MTLOG_DEBUG("recv get rsp, len %d", _rsp_buff->data_len); 439 return 0; 440 } 441 else 442 { 443 MTLOG_TRACE("recv get out errno %d", _err_no); 444 return RC_RECV_FAIL; 445 } 446 } 447 448 449 // ͬ���շ��ӿ�, ��Ϊsessionר�� 450 int32_t CNetHandler::SendRecv(void* data, uint32_t len, uint32_t timeout) 451 { 452 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 453 utime64_t cost_time = 0; 454 uint64_t time_left = timeout; 455 this->_req_data = data; 456 this->_req_len = len; 457 458 // 0. ����Ҫ��������Ϣ 459 int32_t ret = this->CheckParams(); 460 if (ret < 0) 461 { 462 MTLOG_ERROR("check params failed, ret[%d]", ret); 463 goto EXIT_LABEL; 464 } 465 466 // 1. ��ȡ������·ָ�� 467 ret = this->GetConnLink(); 468 if (ret < 0) 469 { 470 MTLOG_ERROR("get sock conn failed, ret: %d", ret); 471 goto EXIT_LABEL; 472 } 473 474 // 2. �ȴ����ӳɹ� 475 ret = this->WaitConnect(time_left); 476 if (ret < 0) 477 { 478 MTLOG_ERROR("sock connect failed, ret: %d", ret); 479 goto EXIT_LABEL; 480 } 481 482 // 3. �ȴ����ͳɹ� 483 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 484 time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0; 485 ret = this->WaitSend(time_left); 486 if (ret < 0) 487 { 488 MTLOG_ERROR("sock send failed, ret: %d", ret); 489 goto EXIT_LABEL; 490 } 491 492 // 4. �ȴ����ճɹ� 493 cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 494 time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0; 495 ret = this->WaitRecv(time_left); 496 if (ret < 0) 497 { 498 MTLOG_ERROR("sock recv failed, ret: %d", ret); 499 goto EXIT_LABEL; 500 } 501 502 // 5. �ɹ����� 503 ret = 0; 504 505 EXIT_LABEL: 506 507 // ���η������, ��ҪUNLINK, ����NETHANDLER֧�� 508 this->Unlink(); 509 510 // �ɹ�ʧ��, ��Ҫȥ��sessionע�� 511 this->UnRegistSession(); 512 513 return ret; 514 } 515 516 517 // �������͵����� 518 uint32_t CNetHandler::SkipSendPos(uint32_t len) 519 { 520 uint32_t skip_len = (len >= _req_len) ? _req_len : len; 521 _req_len -= skip_len; 522 _send_pos += skip_len; 523 _req_data = (char*)_req_data + skip_len; 524 525 return skip_len; 526 } 527 528 // �������Ӷ��� 529 void CNetHandler::Link(CSockLink* conn) 530 { 531 this->_conn_ptr = conn; 532 this->SwitchToIdle(); 533 } 534 535 536 // �������Ӷ��� 537 void CNetHandler::Unlink() 538 { 539 if (this->_state_flags != 0) 540 { 541 this->DetachConn(); 542 } 543 this->_conn_ptr = NULL; 544 } 545 546 // �����ڵȴ����Ӷ��� 547 void CNetHandler::SwitchToConn() 548 { 549 CSockLink* conn = (CSockLink*)this->_conn_ptr; 550 if (NULL == conn) 551 { 552 MTLOG_ERROR("net handler invalid"); 553 return; 554 } 555 556 this->DetachConn(); 557 558 this->_state_flags |= STATE_IN_CONNECT; 559 conn->AppendToList(CSockLink::LINK_CONN_LIST, this); 560 } 561 562 // �л������Ͷ��� 563 void CNetHandler::SwitchToSend() 564 { 565 CSockLink* conn = (CSockLink*)this->_conn_ptr; 566 if (NULL == conn) 567 { 568 MTLOG_ERROR("net handler invalid"); 569 return; 570 } 571 572 this->DetachConn(); 573 574 this->_state_flags |= STATE_IN_SEND; 575 conn->AppendToList(CSockLink::LINK_SEND_LIST, this); 576 } 577 578 579 // �л������ն��� 580 void CNetHandler::SwitchToRecv() 581 { 582 CSockLink* conn = (CSockLink*)this->_conn_ptr; 583 if (NULL == conn) 584 { 585 MTLOG_ERROR("net handler invalid"); 586 return; 587 } 588 589 this->DetachConn(); 590 591 this->_state_flags |= STATE_IN_RECV; 592 conn->AppendToList(CSockLink::LINK_RECV_LIST, this); 593 } 594 595 // �л�������״̬ 596 void CNetHandler::SwitchToIdle() 597 { 598 CSockLink* conn = (CSockLink*)this->_conn_ptr; 599 if (NULL == conn) 600 { 601 MTLOG_ERROR("net handler invalid"); 602 return; 603 } 604 605 this->DetachConn(); 606 607 this->_state_flags |= STATE_IN_IDLE; 608 conn->AppendToList(CSockLink::LINK_IDLE_LIST, this); 609 } 610 611 612 // ��������״̬���� 613 void CNetHandler::DetachConn() 614 { 615 CSockLink* conn = (CSockLink*)this->_conn_ptr; 616 if (NULL == conn) 617 { 618 MTLOG_DEBUG("net handler not set"); 619 return; 620 } 621 622 if (_state_flags == 0) // ��ʱ���¼�����, 2����֧, ����Ҫ������ 623 { 624 return; 625 } 626 627 if (_state_flags & STATE_IN_CONNECT) 628 { 629 conn->RemoveFromList(CSockLink::LINK_CONN_LIST, this); 630 _state_flags &= ~STATE_IN_CONNECT; 631 } 632 633 if (_state_flags & STATE_IN_SEND) 634 { 635 conn->RemoveFromList(CSockLink::LINK_SEND_LIST, this); 636 _state_flags &= ~STATE_IN_SEND; 637 } 638 639 if (_state_flags & STATE_IN_RECV) 640 { 641 conn->RemoveFromList(CSockLink::LINK_RECV_LIST, this); 642 _state_flags &= ~STATE_IN_RECV; 643 } 644 645 if (_state_flags & STATE_IN_IDLE) 646 { 647 conn->RemoveFromList(CSockLink::LINK_IDLE_LIST, this); 648 _state_flags &= ~STATE_IN_IDLE; 649 } 650 } 651 652 653 654 655 /** 656 * @brief �ڵ�Ԫ�ص�hash�㷨, ��ȡkey��hashֵ 657 * @return �ڵ�Ԫ�ص�hashֵ 658 */ 659 uint32_t CNetHandler::HashValue() 660 { 661 uint32_t ip = _dest_ipv4.sin_addr.s_addr; 662 ip ^= (_dest_ipv4.sin_port << 16) | (_proto_type << 8) | (_conn_type << 8); 663 664 uint32_t hash = (_session_id >> 32) & 0xffffffff; 665 hash ^= _session_id & 0xffffffff; 666 hash ^= ip; 667 668 return hash; 669 } 670 671 /** 672 * @brief �ڵ�Ԫ�ص�cmp����, ͬһͰID��, ��key�Ƚ� 673 * @return �ڵ�Ԫ�ص�hashֵ 674 */ 675 int32_t CNetHandler::HashCmp(HashKey* rhs) 676 { 677 CNetHandler* data = (CNetHandler*)(rhs); 678 if (!data) { 679 return -1; 680 } 681 if (this->_session_id != data->_session_id) 682 { 683 return (this->_session_id > data->_session_id) ? 1 : -1; 684 } 685 686 if (this->_dest_ipv4.sin_addr.s_addr != data->_dest_ipv4.sin_addr.s_addr) { 687 return (this->_dest_ipv4.sin_addr.s_addr > data->_dest_ipv4.sin_addr.s_addr) ? 1 : -1; 688 } 689 if (this->_dest_ipv4.sin_port != data->_dest_ipv4.sin_port) { 690 return (this->_dest_ipv4.sin_port > data->_dest_ipv4.sin_port) ? 1 : -1; 691 } 692 if (this->_proto_type != data->_proto_type) { 693 return (this->_proto_type > data->_proto_type) ? 1 : -1; 694 } 695 if (this->_conn_type != data->_conn_type) { 696 return (this->_conn_type > data->_conn_type) ? 1 : -1; 697 } 698 699 return 0; 700 }; 701 702 703 // ע��session���� 704 bool CNetHandler::RegistSession() 705 { 706 if (CNetMgr::Instance()->FindNetItem(this) != NULL) 707 { 708 return false; 709 } 710 711 MtFrame* mtframe = MtFrame::Instance(); 712 this->_thread = mtframe->GetActiveThread(); 713 714 CNetMgr::Instance()->InsertNetItem(this); 715 this->_state_flags |= STATE_IN_SESSION; 716 return true; 717 } 718 719 // ȡ��ע��session 720 void CNetHandler::UnRegistSession() 721 { 722 if (this->_state_flags & STATE_IN_SESSION) 723 { 724 CNetMgr::Instance()->RemoveNetItem(this); 725 this->_state_flags &= ~STATE_IN_SESSION; 726 } 727 } 728 729 730 // ��ȡ�������� 731 TNetItemList* CSockLink::GetItemList(int32_t type) 732 { 733 TNetItemList* list = NULL; 734 switch (type) 735 { 736 case LINK_IDLE_LIST: 737 list = &this->_idle_list; 738 break; 739 740 case LINK_CONN_LIST: 741 list = &this->_wait_connect; 742 break; 743 744 case LINK_SEND_LIST: 745 list = &this->_wait_send; 746 break; 747 748 case LINK_RECV_LIST: 749 list = &this->_wait_recv; 750 break; 751 752 default: 753 break; 754 } 755 756 return list; 757 } 758 759 760 // ��������Ϣ 761 void CSockLink::AppendToList(int32_t type, CNetHandler* item) 762 { 763 TNetItemList* list = this->GetItemList(type); 764 if (NULL == list) 765 { 766 MTLOG_ERROR("unknown list type: %d", type); 767 return; 768 } 769 770 TAILQ_INSERT_TAIL(list, item, _link_entry); 771 } 772 773 // ��������Ϣ 774 void CSockLink::RemoveFromList(int32_t type, CNetHandler* item) 775 { 776 TNetItemList* list = this->GetItemList(type); 777 if (NULL == list) 778 { 779 MTLOG_ERROR("unknown list type: %d", type); 780 return; 781 } 782 783 TAILQ_REMOVE(list, item, _link_entry); 784 } 785 786 787 // ֪ͨ�����߳� 788 void CSockLink::NotifyThread(CNetHandler* item, int32_t result) 789 { 790 static MtFrame* frame = NULL; 791 if (frame == NULL) { 792 frame = MtFrame::Instance(); 793 } 794 795 // ���÷�������Ϣ 796 if (result != RC_SUCCESS) 797 { 798 item->SetErrNo(result); 799 } 800 801 // ���ÿ����� 802 MicroThread* thread = item->GetThread(); 803 if ((thread != NULL) && (thread->HasFlag(MicroThread::IO_LIST))) 804 { 805 frame->RemoveIoWait(thread); 806 frame->InsertRunable(thread); 807 } 808 } 809 810 811 // ֪ͨ�����߳� 812 void CSockLink::NotifyAll(int32_t result) 813 { 814 CNetHandler* item = NULL; 815 CNetHandler* tmp = NULL; 816 817 TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp) 818 { 819 NotifyThread(item, result); 820 item->Unlink(); 821 } 822 823 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 824 { 825 NotifyThread(item, result); 826 item->Unlink(); 827 } 828 829 TAILQ_FOREACH_SAFE(item, &_wait_recv, _link_entry, tmp) 830 { 831 NotifyThread(item, result); 832 item->Unlink(); 833 } 834 835 TAILQ_FOREACH_SAFE(item, &_idle_list, _link_entry, tmp) 836 { 837 NotifyThread(item, result); 838 item->Unlink(); 839 } 840 } 841 842 843 // �����ó�ʼ���� 844 void CSockLink::Reset() 845 { 846 // �ر�fd, ֪ͨ�������߳� 847 this->Close(); 848 this->NotifyAll(_errno); 849 850 // ����cache���� 851 rw_cache_destroy(&_recv_cache); 852 if (_rsp_buff != NULL) 853 { 854 delete_sk_buffer(_rsp_buff); 855 _rsp_buff = NULL; 856 } 857 858 // ����ȴ�����, ���ѵȴ��߳� 859 TAILQ_INIT(&_wait_connect); 860 TAILQ_INIT(&_wait_send); 861 TAILQ_INIT(&_wait_recv); 862 TAILQ_INIT(&_idle_list); 863 864 _proto_type = NET_PROTO_TCP; 865 _errno = 0; 866 _state = 0; 867 _last_access = mt_time_ms(); 868 _parents = NULL; 869 870 // ����������� 871 this->KqueuerObj::Reset(); 872 } 873 874 // �������������� 875 CSockLink::CSockLink() 876 { 877 rw_cache_init(&_recv_cache, NULL); 878 _rsp_buff = NULL; 879 880 TAILQ_INIT(&_wait_connect); 881 TAILQ_INIT(&_wait_send); 882 TAILQ_INIT(&_wait_recv); 883 TAILQ_INIT(&_idle_list); 884 885 _proto_type = NET_PROTO_TCP; 886 _errno = 0; 887 _state = 0; 888 _last_access = mt_time_ms(); 889 _parents = NULL; 890 } 891 892 // �������������� 893 CSockLink::~CSockLink() 894 { 895 this->Reset(); 896 } 897 898 899 // ����Э������, ����buff�ص�ָ�� 900 void CSockLink::SetProtoType(MT_PROTO_TYPE type) 901 { 902 _proto_type = type; 903 _recv_cache.pool = CNetMgr::Instance()->GetSkBuffMng(type); 904 } 905 906 // �ر���·�ľ�� 907 void CSockLink::Close() 908 { 909 // 1. ���δ��ʼ��, ֱ�ӷ��� 910 if (_fd < 0) 911 { 912 return; 913 } 914 915 // 2. ��������ͷ� 916 MtFrame::Instance()->KqueueDelObj(this); 917 918 // 3. �رվ�� 919 close(_fd); 920 _fd = -1; 921 } 922 923 924 // �쳣��ֹ�Ĵ����� 925 void CSockLink::Destroy() 926 { 927 CDestLinks* dstlink = (CDestLinks*)_parents; 928 if (NULL == dstlink) 929 { 930 MTLOG_ERROR("socket link without parents ptr, maybe wrong"); 931 delete this; 932 } 933 else 934 { 935 MTLOG_DEBUG("socket link just free"); 936 dstlink->FreeSockLink(this); 937 } 938 } 939 940 941 // ����socket��� 942 int32_t CSockLink::CreateSock() 943 { 944 if (_fd > 0) // ��������ʱ, ��������������; 945 { 946 return _fd; 947 } 948 949 // ����� 950 if (NET_PROTO_TCP == _proto_type) 951 { 952 _fd = socket(AF_INET, SOCK_STREAM, 0); 953 } 954 else 955 { 956 _fd = socket(AF_INET, SOCK_DGRAM, 0); 957 } 958 959 if (_fd < 0) 960 { 961 MTLOG_ERROR("create socket failed, ret %d[%m]", _fd); 962 return -1; 963 } 964 965 // ���÷����� 966 int flags = 1; 967 if (ioctl(_fd, FIONBIO, &flags) < 0) 968 { 969 MTLOG_ERROR("socket unblock failed, %m"); 970 close(_fd); 971 _fd = -1; 972 return -2; 973 } 974 975 // ѡ������, NODELAY 976 if (NET_PROTO_TCP == _proto_type) 977 { 978 setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)); 979 this->EnableOutput(); // TCP �ȴ�connect����, ʡһ��epollctrl 980 } 981 982 // ����epoll��� 983 this->EnableInput(); 984 if (!MtFrame::Instance()->KqueueAddObj(this)) 985 { 986 MTLOG_ERROR("socket epoll mng failed, %m"); 987 close(_fd); 988 _fd = -1; 989 return -3; 990 } 991 992 return _fd; 993 } 994 995 // ��ȡĿ��ip��Ϣ 996 struct sockaddr_in* CSockLink::GetDestAddr(struct sockaddr_in* addr) 997 { 998 CDestLinks* dstlink = (CDestLinks*)_parents; 999 if ((NULL == _parents) || (NULL == addr)) { 1000 return NULL; 1001 } 1002 1003 uint32_t ip = 0; 1004 uint16_t port = 0; 1005 dstlink->GetDestIP(ip, port); 1006 1007 addr->sin_family = AF_INET; 1008 addr->sin_addr.s_addr = ip; 1009 addr->sin_port = port; 1010 1011 return addr; 1012 } 1013 1014 1015 // �������ӹ��� 1016 bool CSockLink::Connect() 1017 { 1018 this->_last_access = mt_time_ms(); 1019 1020 // 1. UDP�����ӹ��� 1021 if (_proto_type == NET_PROTO_UDP) 1022 { 1023 _state |= LINK_CONNECTED; 1024 } 1025 1026 // 2. ������״̬, �ɹ����� 1027 if (_state & LINK_CONNECTED) 1028 { 1029 return true; 1030 } 1031 1032 // 3. ����������, �˳��ȴ� 1033 if (_state & LINK_CONNECTING) 1034 { 1035 return false; 1036 } 1037 1038 // 4. ������, �״����ӳ��� 1039 struct sockaddr_in addr = {0}; 1040 1041 mt_hook_syscall(connect); 1042 int32_t ret = ff_hook_connect(_fd, (struct sockaddr*)this->GetDestAddr(&addr), sizeof(struct sockaddr_in)); 1043 if (ret < 0) 1044 { 1045 int32_t err = errno; 1046 if (err == EISCONN) 1047 { 1048 _state |= LINK_CONNECTED; 1049 return true; 1050 } 1051 else 1052 { 1053 _state |= LINK_CONNECTING; 1054 if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR)) 1055 { 1056 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _fd, err); 1057 return false; 1058 } 1059 else 1060 { 1061 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _fd, err); 1062 return false; 1063 } 1064 } 1065 } 1066 else 1067 { 1068 _state |= LINK_CONNECTED; 1069 return true; 1070 } 1071 } 1072 1073 // ��UDP�ķ�ʽ���͵ȴ�������, һ��ֱ�Ӿͷ���OK 1074 int32_t CSockLink::SendCacheUdp(void* data, uint32_t len) 1075 { 1076 mt_hook_syscall(sendto); 1077 void* buff = NULL; 1078 uint32_t buff_len = 0; 1079 1080 CNetHandler* item = NULL; 1081 CNetHandler* tmp = NULL; 1082 struct sockaddr_in dst = {0}; 1083 1084 // 1. ���Է��͵ȴ�����, ����ָ�����߳� 1085 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 1086 { 1087 item->GetSendData(buff, buff_len); 1088 if ((NULL == buff) || (buff_len == 0)) 1089 { 1090 MTLOG_ERROR("get buff ptr invalid, log it"); 1091 NotifyThread(item, 0); 1092 item->SwitchToIdle(); 1093 continue; 1094 } 1095 1096 int32_t ret = ff_hook_sendto(_fd, buff, buff_len, 0, 1097 (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in)); 1098 if (ret == -1) 1099 { 1100 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 1101 { 1102 return 0; 1103 } 1104 else 1105 { 1106 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd, 1107 errno, strerror(errno)); 1108 return -2; 1109 } 1110 } 1111 1112 NotifyThread(item, 0); 1113 item->SwitchToIdle(); 1114 } 1115 1116 // 2. ������OK��, �ٷ��ͱ�������, û�д��������������� 1117 if ((data == NULL) || (len == 0)) 1118 { 1119 return 0; 1120 } 1121 1122 int32_t ret = ff_hook_sendto(_fd, data, len, 0, 1123 (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in)); 1124 if (ret == -1) 1125 { 1126 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 1127 { 1128 return 0; 1129 } 1130 else 1131 { 1132 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd, 1133 errno, strerror(errno)); 1134 return -2; 1135 } 1136 } 1137 else 1138 { 1139 return ret; 1140 } 1141 } 1142 1143 1144 // TCP�Ļ��巢�ʹ��� 1145 int32_t CSockLink::SendCacheTcp(void* data, uint32_t len) 1146 { 1147 void* buff = NULL; 1148 uint32_t buff_len = 0; 1149 struct iovec iov[64]; 1150 int32_t count = 0; 1151 CNetHandler* item = NULL; 1152 CNetHandler* tmp = NULL; 1153 1154 // 1. ���Է��͵ȴ�����, ����ָ�����߳� 1155 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 1156 { 1157 item->GetSendData(buff, buff_len); 1158 iov[count].iov_base = buff; 1159 iov[count].iov_len = (int32_t)buff_len; 1160 count++; 1161 if (count >= 64) 1162 { 1163 break; 1164 } 1165 } 1166 if ((count < 64) && (data != NULL)) 1167 { 1168 iov[count].iov_base = data; 1169 iov[count].iov_len = (int32_t)len; 1170 count++; 1171 } 1172 1173 ssize_t bytes = writev(_fd, iov, count); 1174 if (bytes < 0) 1175 { 1176 if ((errno == EAGAIN) || (errno == EINTR)) 1177 { 1178 return 0; 1179 } 1180 else 1181 { 1182 MTLOG_ERROR("socket writev failed, fd %d, errno %d(%s)", _fd, 1183 errno, strerror(errno)); 1184 return -1; 1185 } 1186 } 1187 1188 // 2. ���Է��͵ȴ�����, ����ָ�����߳� 1189 uint32_t send_left = (uint32_t)bytes; 1190 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp) 1191 { 1192 send_left -= item->SkipSendPos(send_left); 1193 item->GetSendData(buff, buff_len); 1194 if (buff_len == 0) 1195 { 1196 NotifyThread(item, 0); 1197 item->SwitchToIdle(); 1198 } 1199 1200 if (send_left == 0) 1201 { 1202 break; 1203 } 1204 } 1205 1206 return send_left; 1207 } 1208 1209 1210 // �������ӹ��� 1211 int32_t CSockLink::SendData(void* data, uint32_t len) 1212 { 1213 int32_t ret = 0; 1214 bool rc = false; 1215 1216 this->_last_access = mt_time_ms(); 1217 1218 // 1. ���Է�������, �ȷ����Ŷӵ����� 1219 if (_proto_type == NET_PROTO_UDP) 1220 { 1221 ret = SendCacheUdp(data, len); 1222 } 1223 else 1224 { 1225 ret = SendCacheTcp(data, len); 1226 } 1227 1228 // 2. ��ǰ�����Ƿ������, �����, ���Բ�������OUT 1229 if (ret < (int32_t)len) 1230 { 1231 this->EnableOutput(); 1232 rc = MtFrame::Instance()->KqueueCtrlAdd(_fd, KQ_EVENT_READ); 1233 } 1234 else 1235 { 1236 this->DisableOutput(); 1237 rc = MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE); 1238 } 1239 1240 // 3. ����ˢ�¾��epollע�� 1241 if (!rc) 1242 { 1243 MTLOG_ERROR("socket epoll mng failed[%m], wait timeout"); 1244 } 1245 1246 return ret; 1247 } 1248 1249 // ���ݷַ�������� 1250 int32_t CSockLink::RecvDispath() 1251 { 1252 if (_proto_type == NET_PROTO_UDP) 1253 { 1254 return this->DispathUdp(); 1255 } 1256 else 1257 { 1258 return this->DispathTcp(); 1259 } 1260 } 1261 1262 // ���Խ��ո�������ݵ���ʱbuff 1263 void CSockLink::ExtendRecvRsp() 1264 { 1265 if (NULL == _rsp_buff) 1266 { 1267 // buff���η���, ���̫��, �ᵼ���ظ��Ŀ���; ���̫С, �ᵼ��2��check��� 1268 // Ȩ��һ��, �ݶ�500�ֽ�����, �ֳ���, ������ȫ, �˷Ѳ���, С���ֳ��� 1269 // ��Ҫrealloc, ��500�ֽڿ�������Ҳ���� 1270 _rsp_buff = new_sk_buffer(512); 1271 if (NULL == _rsp_buff) 1272 { 1273 MTLOG_ERROR("no more memory, error"); 1274 return; 1275 } 1276 } 1277 1278 _rsp_buff->data_len += read_cache_begin(&_recv_cache, _rsp_buff->data_len, 1279 _rsp_buff->data + _rsp_buff->data_len , _rsp_buff->size - _rsp_buff->data_len); 1280 } 1281 1282 // ���ص�����, ���ȴ��Ŷӵȴ��л�ȡ, ���ݴӸ��ڵ��ȡ 1283 CHECK_SESSION_CALLBACK CSockLink::GetSessionCallback() 1284 { 1285 CHECK_SESSION_CALLBACK check_session = NULL; 1286 1287 // 1. �Ŷӻ�ȡ�ص����� 1288 CNetHandler* item = TAILQ_FIRST(&_wait_recv); 1289 if (NULL == item) 1290 { 1291 MTLOG_DEBUG("recv data with no wait item, err"); 1292 goto EXIT_LABEL; 1293 } 1294 1295 check_session = item->GetSessionCallback(); 1296 if (NULL == check_session) 1297 { 1298 MTLOG_ERROR("recv data with no session callback, err"); 1299 goto EXIT_LABEL; 1300 } 1301 1302 EXIT_LABEL: 1303 1304 // 2. ������ڵ�ΪNULL, ֱ�ӷ��ض��н�� 1305 CDestLinks* dstlink = (CDestLinks*)_parents; 1306 if (NULL == dstlink) 1307 { 1308 return check_session; 1309 } 1310 1311 // 3. ������н��Ϊ��, ����·Ĭ�ϻ����func; ��Ϊ��, ���� 1312 if (check_session != NULL) 1313 { 1314 dstlink->SetDefaultCallback(check_session); 1315 } 1316 else 1317 { 1318 check_session = dstlink->GetDefaultCallback(); 1319 } 1320 1321 return check_session; 1322 } 1323 1324 1325 // TCP����������������ַ� 1326 int32_t CSockLink::DispathTcp() 1327 { 1328 // 1. TCP�ȴ�����, ������ʽ, �������β, ֻ�ܹر� 1329 CHECK_SESSION_CALLBACK check_session = this->GetSessionCallback(); 1330 if (NULL == check_session) 1331 { 1332 MTLOG_ERROR("recv data with no session callback, err"); 1333 return -1; 1334 } 1335 1336 // 2. ����ͬһIP/PORT��Э��, ��������������ͨ�� 1337 uint32_t need_len = 0; 1338 uint64_t sid = 0; 1339 int32_t ret = 0; 1340 while (_recv_cache.len > 0) 1341 { 1342 this->ExtendRecvRsp(); 1343 if (NULL == _rsp_buff) 1344 { 1345 MTLOG_ERROR("alloc memory, error"); 1346 _errno = RC_MEM_ERROR; 1347 return -3; 1348 } 1349 1350 need_len = 0; 1351 ret = check_session(_rsp_buff->data, _rsp_buff->data_len, &sid, &need_len); 1352 1353 if (ret < 0) 1354 { 1355 MTLOG_ERROR("user check resp failed, ret %d", ret); 1356 _errno = RC_CHECK_PKG_FAIL; 1357 return -1; 1358 } 1359 1360 if (ret == 0) 1361 { 1362 // 1. �û������ָ������, Ĭ��2����չ, ���ܻ���Ӱ�� 1363 if ((need_len == 0) && (_rsp_buff->data_len == _rsp_buff->size)) 1364 { 1365 MTLOG_DEBUG("recv default buff full[%u], but user no set need length", _rsp_buff->size); 1366 need_len = _rsp_buff->size * 2; 1367 } 1368 1369 // 2. ����ʣ��ռ�, ������ȴ�����; �����ռ䳬��, ������� 1370 if ((need_len <= _rsp_buff->size) || (need_len > 100*1024*1024)) 1371 { 1372 MTLOG_DEBUG("maybe need wait more data: %u", need_len); 1373 return 0; 1374 } 1375 1376 // 3. ��չbuff����, ���ٳ���һ�� 1377 _rsp_buff = reserve_sk_buffer(_rsp_buff, need_len); 1378 if (NULL == _rsp_buff) 1379 { 1380 MTLOG_ERROR("no more memory, error"); 1381 _errno = RC_MEM_ERROR; 1382 return -3; 1383 } 1384 1385 // 4. �Ѿ������������Ϣ, �ȴ������հ� 1386 if (_rsp_buff->data_len >= _recv_cache.len) 1387 { 1388 MTLOG_DEBUG("maybe need wait more data, now %u", _recv_cache.len); 1389 return 0; 1390 } 1391 1392 // 5. ������������, ���Խ��� 1393 continue; 1394 } 1395 1396 // �����쳣����, ����ʵ��δ������, �����ȴ� 1397 if (ret > (int32_t)_recv_cache.len) 1398 { 1399 MTLOG_DEBUG("maybe pkg not all ok, wait more"); 1400 return 0; 1401 } 1402 1403 // ��ѯ��session�Ķ��� 1404 CNetHandler* session = this->FindSession(sid); 1405 if (NULL == session) 1406 { 1407 MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid); 1408 cache_skip_data(&_recv_cache, ret); 1409 delete_sk_buffer(_rsp_buff); 1410 _rsp_buff = NULL; 1411 } 1412 else 1413 { 1414 MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid); 1415 cache_skip_data(&_recv_cache, ret); 1416 this->NotifyThread(session, 0); 1417 session->SwitchToIdle(); 1418 _rsp_buff->data_len = ret; //TCPʵ����Ч�ı��ij��ȱ�� 1419 session->SetRespBuff(_rsp_buff); 1420 _rsp_buff = NULL; 1421 } 1422 } 1423 1424 return 0; 1425 1426 } 1427 1428 1429 // UDP����������������ַ� 1430 int32_t CSockLink::DispathUdp() 1431 { 1432 // 1. UDP���ݴ���, ������Ҳ���Զ��������еı��� 1433 CHECK_SESSION_CALLBACK check_session = NULL; 1434 CNetHandler* item = TAILQ_FIRST(&_wait_recv); 1435 if (NULL == item) 1436 { 1437 MTLOG_DEBUG("recv data with no wait item, maybe wrong pkg recv"); 1438 // �˴����˳�, ��Ϊ���Զ���UDPӦ��(��ʱ��\������) 1439 } 1440 else 1441 { 1442 check_session = item->GetSessionCallback(); 1443 if (NULL == check_session) 1444 { 1445 MTLOG_TRACE("recv data with no session callback, err"); 1446 } 1447 } 1448 1449 // 2. ����ÿ����, �ҵ�session�ʹ���, ������ 1450 uint64_t sid = 0; 1451 uint32_t need_len = 0; 1452 int32_t ret = 0; 1453 TSkBuffer* block = NULL; 1454 while ((block = TAILQ_FIRST(&_recv_cache.list)) != NULL) 1455 { 1456 if (check_session == NULL) 1457 { 1458 MTLOG_DEBUG("no recv wait, skip first block"); 1459 cache_skip_data(&_recv_cache, block->data_len); 1460 continue; 1461 } 1462 1463 need_len = 0; 1464 ret = check_session(block->data, block->data_len, &sid, &need_len); 1465 if ((ret <= 0) || (ret > (int32_t)block->data_len)) 1466 { 1467 MTLOG_DEBUG("maybe wrong pkg come, skip it"); 1468 cache_skip_data(&_recv_cache, block->data_len); 1469 continue; 1470 } 1471 1472 // ��ѯ��session�Ķ��� 1473 CNetHandler* session = this->FindSession(sid); 1474 if (NULL == session) 1475 { 1476 MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid); 1477 cache_skip_data(&_recv_cache, block->data_len); 1478 } 1479 else 1480 { 1481 MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid); 1482 this->NotifyThread(session, 0); 1483 session->SwitchToIdle(); 1484 cache_skip_first_buffer(&_recv_cache); 1485 session->SetRespBuff(block); 1486 } 1487 } 1488 1489 return 0; 1490 } 1491 1492 1493 // ��ѯ����sessionid������session��Ϣ 1494 CNetHandler* CSockLink::FindSession(uint64_t sid) 1495 { 1496 CNetHandler key; 1497 CDestLinks* dstlink = (CDestLinks*)_parents; 1498 if (NULL == dstlink) 1499 { 1500 MTLOG_ERROR("session dest link invalid, maybe error"); 1501 return NULL; 1502 } 1503 struct sockaddr_in addr; 1504 key.SetDestAddress(this->GetDestAddr(&addr)); 1505 key.SetConnType(dstlink->GetConnType()); 1506 key.SetProtoType(dstlink->GetProtoType()); 1507 key.SetSessionId(sid); 1508 1509 return CNetMgr::Instance()->FindNetItem(&key); 1510 } 1511 1512 1513 1514 /** 1515 * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 1516 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 1517 */ 1518 int CSockLink::InputNotify() 1519 { 1520 int32_t ret = 0; 1521 1522 this->_last_access = mt_time_ms(); 1523 1524 // 1. ��������, �������� 1525 if (_proto_type == NET_PROTO_UDP) 1526 { 1527 ret = cache_udp_recv(&_recv_cache, _fd, NULL); 1528 } 1529 else 1530 { 1531 ret = cache_tcp_recv(&_recv_cache, _fd); 1532 } 1533 1534 // 2. ����ʧ������������ 1535 if (ret < 0) 1536 { 1537 if (ret == -SK_ERR_NEED_CLOSE) 1538 { 1539 MTLOG_DEBUG("recv on link failed, remote close"); 1540 _errno = RC_REMOTE_CLOSED; 1541 } 1542 else 1543 { 1544 MTLOG_ERROR("recv on link failed, close it, ret %d[%m]", ret); 1545 _errno = RC_RECV_FAIL; 1546 } 1547 1548 this->Destroy(); 1549 return -1; 1550 } 1551 1552 // 3. �ַ�����, ������TCP 1553 ret = this->RecvDispath(); 1554 if (ret < 0) 1555 { 1556 MTLOG_DEBUG("recv dispath failed, close it, ret %d[%m]", ret); 1557 this->Destroy(); 1558 return -2; 1559 } 1560 1561 // 4. �ɹ����� 1562 return 0; 1563 1564 } 1565 1566 /** 1567 * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 1568 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 1569 */ 1570 int CSockLink::OutputNotify() 1571 { 1572 int32_t ret = 0; 1573 1574 this->_last_access = mt_time_ms(); 1575 1576 // 1. ���ӵȴ�����������л� 1577 if (_state & LINK_CONNECTING) 1578 { 1579 _state &= ~LINK_CONNECTING; 1580 _state |= LINK_CONNECTED; 1581 1582 CNetHandler* item = NULL; 1583 CNetHandler* tmp = NULL; 1584 TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp) 1585 { 1586 NotifyThread(item, 0); 1587 item->SwitchToIdle(); 1588 } 1589 } 1590 1591 // 2. ���Է������� 1592 if (_proto_type == NET_PROTO_UDP) 1593 { 1594 ret = SendCacheUdp(NULL, 0); 1595 } 1596 else 1597 { 1598 ret = SendCacheTcp(NULL, 0); 1599 } 1600 1601 // 3. ����ʧ������������ 1602 if (ret < 0) 1603 { 1604 MTLOG_ERROR("Send on link failed, close it, ret %d[%m]", ret); 1605 _errno = RC_SEND_FAIL; 1606 this->Destroy(); 1607 return ret; 1608 } 1609 1610 // 4. ��������, �����������epoll 1611 if (TAILQ_EMPTY(&_wait_send)) 1612 { 1613 this->DisableOutput(); 1614 if (!MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE)) 1615 { 1616 MTLOG_ERROR("socket epoll mng failed[%m], wait timeout"); 1617 } 1618 } 1619 1620 return 0; 1621 } 1622 1623 1624 /** 1625 * @brief �쳣֪ͨ�ӿ� 1626 * @return ���Է���ֵ, ���������¼����� 1627 */ 1628 int CSockLink::HangupNotify() 1629 { 1630 MTLOG_ERROR("socket epoll error, fd %d", _fd); 1631 1632 this->_errno = RC_KQUEUE_ERROR; 1633 this->Destroy(); 1634 return -1; 1635 } 1636 1637 1638 // ���캯�� 1639 CDestLinks::CDestLinks() 1640 { 1641 _timeout = 5*60*1000; // Ĭ��5���� 1642 _addr_ipv4 = 0; 1643 _net_port = 0; 1644 _proto_type = NET_PROTO_UNDEF; 1645 _conn_type = TYPE_CONN_SESSION; 1646 1647 _max_links = 3; // Ĭ��3�� 1648 _curr_link = 0; 1649 _dflt_callback = NULL; 1650 1651 TAILQ_INIT(&_sock_list); 1652 } 1653 1654 1655 // ���ø��õĽӿں��� 1656 void CDestLinks::Reset() 1657 { 1658 // �������Ӷ��� 1659 CSockLink* item = NULL; 1660 CSockLink* temp = NULL; 1661 TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp) 1662 { 1663 item->Destroy(); 1664 } 1665 TAILQ_INIT(&_sock_list); 1666 1667 // ��ʱ��ɾ�� 1668 CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); 1669 if (NULL != timer) 1670 { 1671 timer->stop_timer(this); 1672 } 1673 1674 // ����Ĭ���ֶ���Ϣ 1675 _timeout = 5*60*1000; // Ĭ��5���� 1676 _addr_ipv4 = 0; 1677 _net_port = 0; 1678 _proto_type = NET_PROTO_UNDEF; 1679 _conn_type = TYPE_CONN_SESSION; 1680 1681 _max_links = 3; // Ĭ��3�� 1682 _curr_link = 0; 1683 } 1684 1685 // ���캯�� 1686 CDestLinks::~CDestLinks() 1687 { 1688 this->Reset(); 1689 } 1690 1691 // ������ʱ�� 1692 void CDestLinks::StartTimer() 1693 { 1694 CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); 1695 if ((NULL == timer) || !timer->start_timer(this, 60*1000)) 1696 { 1697 MTLOG_ERROR("obj %p attach timer failed, error", this); 1698 } 1699 } 1700 1701 1702 // �ͷ�һ������link 1703 void CDestLinks::FreeSockLink(CSockLink* sock) 1704 { 1705 if ((sock == NULL) || (sock->GetParentsPtr() != (void*)this)) 1706 { 1707 MTLOG_ERROR("invalid socklink %p, error", sock); 1708 return; 1709 } 1710 1711 TAILQ_REMOVE(&_sock_list, sock, _link_entry); 1712 if (this->_curr_link > 0) { 1713 this->_curr_link--; 1714 } 1715 1716 sock->Reset(); 1717 CNetMgr::Instance()->FreeSockLink(sock); 1718 } 1719 1720 1721 // ��ȡһ������link, ĿǰΪ��ѯ 1722 CSockLink* CDestLinks::GetSockLink() 1723 { 1724 CSockLink* link = NULL; 1725 if (_curr_link < _max_links) 1726 { 1727 link = CNetMgr::Instance()->AllocSockLink(); 1728 if (NULL == link) 1729 { 1730 MTLOG_ERROR("alloc sock link failed, error"); 1731 return NULL; 1732 } 1733 link->SetParentsPtr(this); 1734 link->SetProtoType(_proto_type); 1735 TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry); 1736 _curr_link++; 1737 } 1738 else 1739 { 1740 link = TAILQ_FIRST(&_sock_list); 1741 TAILQ_REMOVE(&_sock_list, link, _link_entry); 1742 TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry); 1743 } 1744 1745 return link; 1746 } 1747 1748 /** 1749 * @brief ��ʱ֪ͨ����, ��������·, ���������·���� 1750 */ 1751 void CDestLinks::timer_notify() 1752 { 1753 // 1. ����Ƿ��п��е���·, ɾ���� 1754 uint64_t now = mt_time_ms(); 1755 CSockLink* item = NULL; 1756 CSockLink* temp = NULL; 1757 TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp) 1758 { 1759 if ((item->GetLastAccess() + this->_timeout) < now) 1760 { 1761 MTLOG_DEBUG("link timeout, last[%llu], now [%llu]", item->GetLastAccess(), now); 1762 item->Destroy(); 1763 } 1764 } 1765 1766 // 2. ����Ƿ�����Ч����·, û����ע��Ŀ���� 1767 item = TAILQ_FIRST(&_sock_list); 1768 if (NULL == item) 1769 { 1770 MTLOG_DEBUG("dest links timeout, now [%llu]", now); 1771 CNetMgr::Instance()->DeleteDestLink(this); 1772 return; 1773 } 1774 1775 // 3. ���¼��붨ʱ������ 1776 this->StartTimer(); 1777 1778 return; 1779 } 1780 1781 1782 /** 1783 * @brief sessionȫ�ֹ����� 1784 * @return ȫ�־��ָ�� 1785 */ 1786 CNetMgr* CNetMgr::_instance = NULL; 1787 CNetMgr* CNetMgr::Instance (void) 1788 { 1789 if (NULL == _instance) 1790 { 1791 _instance = new CNetMgr(); 1792 } 1793 1794 return _instance; 1795 } 1796 1797 /** 1798 * @brief session����ȫ�ֵ����ٽӿ� 1799 */ 1800 void CNetMgr::Destroy() 1801 { 1802 if( _instance != NULL ) 1803 { 1804 delete _instance; 1805 _instance = NULL; 1806 } 1807 } 1808 1809 // ��ѯ�Ƿ��Ѿ�����ͬһ��sid�Ķ��� 1810 CNetHandler* CNetMgr::FindNetItem(CNetHandler* key) 1811 { 1812 if (NULL == this->_session_hash) 1813 { 1814 return NULL; 1815 } 1816 1817 return (CNetHandler*)_session_hash->HashFind(key); 1818 } 1819 1820 // ע��һ��item, �Ȳ�ѯ�����, ��֤��ͻ 1821 void CNetMgr::InsertNetItem(CNetHandler* item) 1822 { 1823 if (NULL == this->_session_hash) 1824 { 1825 return; 1826 } 1827 1828 int32_t ret = _session_hash->HashInsert(item); 1829 if (ret < 0) 1830 { 1831 MTLOG_ERROR("session insert failed, ret %d", ret); 1832 } 1833 1834 return; 1835 } 1836 1837 // �Ƴ�һ��item���� 1838 void CNetMgr::RemoveNetItem(CNetHandler* item) 1839 { 1840 CNetHandler* handler = this->FindNetItem(item); 1841 if (NULL == handler) 1842 { 1843 return; 1844 } 1845 1846 _session_hash->HashRemove(handler); 1847 } 1848 1849 // ��ѯ�Ƿ��Ѿ�����ͬһ��sid�Ķ��� 1850 CDestLinks* CNetMgr::FindDestLink(CDestLinks* key) 1851 { 1852 if (NULL == this->_ip_hash) 1853 { 1854 return NULL; 1855 } 1856 1857 return (CDestLinks*)_ip_hash->HashFind(key); 1858 } 1859 1860 // ע��һ��item, �Ȳ�ѯ�����, ��֤��ͻ 1861 void CNetMgr::InsertDestLink(CDestLinks* item) 1862 { 1863 if (NULL == this->_ip_hash) 1864 { 1865 return; 1866 } 1867 1868 int32_t ret = _ip_hash->HashInsert(item); 1869 if (ret < 0) 1870 { 1871 MTLOG_ERROR("ip dest insert failed, ret %d", ret); 1872 } 1873 1874 return; 1875 } 1876 1877 // �Ƴ�һ��item���� 1878 void CNetMgr::RemoveDestLink(CDestLinks* item) 1879 { 1880 CDestLinks* handler = this->FindDestLink(item); 1881 if (NULL == handler) 1882 { 1883 return; 1884 } 1885 1886 _ip_hash->HashRemove(handler); 1887 } 1888 1889 1890 // ��ѯ��һ��Ŀ��ip��links�ڵ� 1891 CDestLinks* CNetMgr::FindCreateDest(CDestLinks* key) 1892 { 1893 CDestLinks* dest = this->FindDestLink(key); 1894 if (dest != NULL) 1895 { 1896 MTLOG_DEBUG("dest links reuse ok"); 1897 return dest; 1898 } 1899 1900 dest = this->AllocDestLink(); 1901 if (NULL == dest) 1902 { 1903 MTLOG_ERROR("dest links alloc failed, log it"); 1904 return NULL; 1905 } 1906 1907 dest->CopyKeyInfo(key); 1908 dest->StartTimer(); 1909 this->InsertDestLink(dest); 1910 1911 return dest; 1912 } 1913 1914 1915 // ��ѯ��һ��Ŀ��ip��links�ڵ� 1916 void CNetMgr::DeleteDestLink(CDestLinks* dst) 1917 { 1918 this->RemoveDestLink(dst); 1919 dst->Reset(); // ֱ��freeǰ���� reset 1920 this->FreeDestLink(dst); 1921 } 1922 1923 1924 // ���캯��ʵ�� 1925 CNetMgr::CNetMgr() 1926 { 1927 sk_buffer_mng_init(&_tcp_pool, 60, 4096); 1928 sk_buffer_mng_init(&_udp_pool, 60, SK_DFLT_BUFF_SIZE); 1929 1930 _ip_hash = new HashList(100000); 1931 _session_hash = new HashList(100000); 1932 } 1933 1934 1935 // ��������ʵ�� 1936 CNetMgr::~CNetMgr() 1937 { 1938 // �������е�dest��Դ 1939 if (_ip_hash != NULL) 1940 { 1941 HashKey* hash_item = _ip_hash->HashGetFirst(); 1942 while (hash_item) 1943 { 1944 delete hash_item; 1945 hash_item = _ip_hash->HashGetFirst(); 1946 } 1947 1948 delete _ip_hash; 1949 _ip_hash = NULL; 1950 } 1951 1952 // �������е� netitem ��Դ 1953 if (_session_hash != NULL) 1954 { 1955 HashKey* hash_item = _session_hash->HashGetFirst(); 1956 while (hash_item) 1957 { 1958 delete hash_item; 1959 hash_item = _session_hash->HashGetFirst(); 1960 } 1961 1962 delete _session_hash; 1963 _session_hash = NULL; 1964 } 1965 1966 // ����buff����Դ 1967 sk_buffer_mng_destroy(&_tcp_pool); 1968 sk_buffer_mng_destroy(&_udp_pool); 1969 } 1970 1971 1972 /** 1973 * @brief ������Դ��Ϣ 1974 */ 1975 void CNetMgr::RecycleObjs(uint64_t now) 1976 { 1977 uint32_t now_s = (uint32_t)(now / 1000); 1978 1979 recycle_sk_buffer(&_udp_pool, now_s); 1980 recycle_sk_buffer(&_tcp_pool, now_s); 1981 1982 _net_item_pool.RecycleItem(now); 1983 _sock_link_pool.RecycleItem(now); 1984 _dest_ip_pool.RecycleItem(now); 1985 } 1986 1987 1988