1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_connection.cpp 22 * @time 20130924 23 **/ 24 #include <fcntl.h> 25 #include <sys/types.h> 26 #include <sys/socket.h> 27 #include <netinet/in.h> 28 #include <arpa/inet.h> 29 30 #include "micro_thread.h" 31 #include "mt_msg.h" 32 #include "mt_notify.h" 33 #include "mt_connection.h" 34 #include "mt_sys_hook.h" 35 #include "ff_hook.h" 36 37 using namespace std; 38 using namespace NS_MICRO_THREAD; 39 40 IMtConnection::IMtConnection() 41 { 42 _type = OBJ_CONN_UNDEF; 43 _action = NULL; 44 _ntfy_obj = NULL; 45 _msg_buff = NULL; 46 } 47 IMtConnection::~IMtConnection() 48 { 49 if (_ntfy_obj) { 50 NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj); 51 _ntfy_obj = NULL; 52 } 53 54 if (_msg_buff) { 55 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 56 _msg_buff = NULL; 57 } 58 } 59 60 void IMtConnection::Reset() 61 { 62 if (_ntfy_obj) { 63 NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj); 64 _ntfy_obj = NULL; 65 } 66 67 if (_msg_buff) { 68 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 69 _msg_buff = NULL; 70 } 71 72 _action = NULL; 73 _ntfy_obj = NULL; 74 _msg_buff = NULL; 75 } 76 77 int UdpShortConn::CreateSocket() 78 { 79 _osfd = socket(AF_INET, SOCK_DGRAM, 0); 80 if (_osfd < 0) 81 { 82 MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno)); 83 return -1; 84 } 85 86 int flags = 1; 87 if (ioctl(_osfd, FIONBIO, &flags) < 0) 88 { 89 MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno)); 90 close(_osfd); 91 _osfd = -1; 92 return -2; 93 } 94 95 if (_ntfy_obj) { 96 _ntfy_obj->SetOsfd(_osfd); 97 } 98 99 return _osfd; 100 } 101 102 int UdpShortConn::CloseSocket() 103 { 104 if (_osfd < 0) 105 { 106 return 0; 107 } 108 109 close(_osfd); 110 _osfd = -1; 111 112 return 0; 113 } 114 115 int UdpShortConn::SendData() 116 { 117 if (!_action || !_msg_buff) { 118 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); 119 return -100; 120 } 121 122 mt_hook_syscall(sendto); 123 int ret = ff_hook_sendto(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0, 124 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in)); 125 if (ret == -1) 126 { 127 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 128 { 129 return 0; 130 } 131 else 132 { 133 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _osfd, 134 errno, strerror(errno)); 135 return -2; 136 } 137 } 138 else 139 { 140 _msg_buff->SetHaveSndLen(ret); 141 return ret; 142 } 143 } 144 145 int UdpShortConn::RecvData() 146 { 147 if (!_action || !_msg_buff) { 148 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); 149 return -100; 150 } 151 152 struct sockaddr_in from; 153 socklen_t fromlen = sizeof(from); 154 mt_hook_syscall(recvfrom); 155 int ret = ff_hook_recvfrom(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMaxLen(), 156 0, (struct sockaddr*)&from, &fromlen); 157 if (ret < 0) 158 { 159 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 160 { 161 return 0; 162 } 163 else 164 { 165 MTLOG_ERROR("socket recv failed, fd %d, errno %d(%s)", _osfd, 166 errno, strerror(errno)); 167 return -2; 168 } 169 } 170 else if (ret == 0) 171 { 172 return -1; 173 } 174 else 175 { 176 _msg_buff->SetHaveRcvLen(ret); 177 } 178 179 ret = _action->DoInput(); 180 if (ret > 0) 181 { 182 _msg_buff->SetMsgLen(ret); 183 return ret; 184 } 185 else if (ret == 0) 186 { 187 return 0; 188 } 189 else if (ret == -65535) 190 { 191 _msg_buff->SetHaveRcvLen(0); 192 return 0; 193 } 194 else 195 { 196 return -1; 197 } 198 } 199 200 void UdpShortConn::Reset() 201 { 202 CloseSocket(); 203 this->IMtConnection::Reset(); 204 } 205 206 int TcpKeepConn::OpenCnnect() 207 { 208 if (!_action || !_msg_buff) { 209 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); 210 return -100; 211 } 212 213 int err = 0; 214 mt_hook_syscall(connect); 215 int ret = ff_hook_connect(_osfd, (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in)); 216 if (ret < 0) 217 { 218 err = errno; 219 if (err == EISCONN) 220 { 221 return 0; 222 } 223 else 224 { 225 if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR)) 226 { 227 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _osfd, err); 228 return -1; 229 } 230 else 231 { 232 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _osfd, err); 233 return -2; 234 } 235 } 236 } 237 else 238 { 239 return 0; 240 } 241 } 242 243 int TcpKeepConn::CreateSocket() 244 { 245 if (_osfd > 0) 246 { 247 if (_ntfy_obj) { 248 _ntfy_obj->SetOsfd(_osfd); 249 } 250 251 return _osfd; 252 } 253 254 _osfd = socket(AF_INET, SOCK_STREAM, 0); 255 if (_osfd < 0) 256 { 257 MTLOG_ERROR("create tcp socket failed, error: %d", errno); 258 return -1; 259 } 260 261 int flags = 1; 262 if (ioctl(_osfd, FIONBIO, &flags) < 0) 263 { 264 MTLOG_ERROR("set tcp socket unblock failed, error: %d", errno); 265 close(_osfd); 266 _osfd = -1; 267 return -2; 268 } 269 270 _keep_ntfy.SetOsfd(_osfd); 271 _keep_ntfy.DisableOutput(); 272 _keep_ntfy.EnableInput(); 273 274 if (_ntfy_obj) { 275 _ntfy_obj->SetOsfd(_osfd); 276 } 277 278 return _osfd; 279 } 280 281 int TcpKeepConn::SendData() 282 { 283 if (!_action || !_msg_buff) { 284 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); 285 return -100; 286 } 287 288 char* msg_ptr = (char*)_msg_buff->GetMsgBuff(); 289 int msg_len = _msg_buff->GetMsgLen(); 290 int have_send_len = _msg_buff->GetHaveSndLen(); 291 mt_hook_syscall(send); 292 int ret = ff_hook_send(_osfd, msg_ptr + have_send_len, msg_len - have_send_len, 0); 293 if (ret == -1) 294 { 295 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 296 { 297 return 0; 298 } 299 else 300 { 301 MTLOG_ERROR("send tcp socket failed, error: %d", errno); 302 return -1; 303 } 304 } 305 else 306 { 307 have_send_len += ret; 308 _msg_buff->SetHaveSndLen(have_send_len); 309 } 310 311 if (have_send_len >= msg_len) 312 { 313 return msg_len; 314 } 315 else 316 { 317 return 0; 318 } 319 } 320 321 int TcpKeepConn::RecvData() 322 { 323 if (!_action || !_msg_buff) { 324 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); 325 return -100; 326 } 327 328 char* msg_ptr = (char*)_msg_buff->GetMsgBuff(); 329 int max_len = _msg_buff->GetMaxLen(); 330 int have_rcv_len = _msg_buff->GetHaveRcvLen(); 331 mt_hook_syscall(recv); 332 int ret = ff_hook_recv(_osfd, (char*)msg_ptr + have_rcv_len, max_len - have_rcv_len, 0); 333 if (ret < 0) 334 { 335 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 336 { 337 return 0; 338 } 339 else 340 { 341 MTLOG_ERROR("recv tcp socket failed, error: %d", errno); 342 return -2; 343 } 344 } 345 else if (ret == 0) 346 { 347 MTLOG_ERROR("tcp remote close, address: %s[%d]", 348 inet_ntoa(_dst_addr.sin_addr), ntohs(_dst_addr.sin_port)); 349 return -1; 350 } 351 else 352 { 353 have_rcv_len += ret; 354 _msg_buff->SetHaveRcvLen(have_rcv_len); 355 } 356 357 ret = _action->DoInput(); 358 if (ret > 0) 359 { 360 _msg_buff->SetMsgLen(have_rcv_len); 361 return ret; 362 } 363 else if (ret == 0) 364 { 365 return 0; 366 } 367 else 368 { 369 return -1; 370 } 371 } 372 373 int TcpKeepConn::CloseSocket() 374 { 375 if (_osfd < 0) 376 { 377 return 0; 378 } 379 _keep_ntfy.SetOsfd(-1); 380 381 close(_osfd); 382 _osfd = -1; 383 384 return 0; 385 } 386 387 void TcpKeepConn::Reset() 388 { 389 memset(&_dst_addr, 0 ,sizeof(_dst_addr)); 390 CloseSocket(); 391 this->IMtConnection::Reset(); 392 } 393 394 void TcpKeepConn::ConnReuseClean() 395 { 396 this->IMtConnection::Reset(); 397 } 398 399 bool TcpKeepConn::IdleAttach() 400 { 401 if (_osfd < 0) { 402 MTLOG_ERROR("obj %p attach failed, fd %d error", this, _osfd); 403 return false; 404 } 405 406 if (_keep_flag & TCP_KEEP_IN_KQUEUE) { 407 MTLOG_ERROR("obj %p repeat attach, error", this); 408 return true; 409 } 410 411 _keep_ntfy.DisableOutput(); 412 _keep_ntfy.EnableInput(); 413 414 CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); 415 if ((NULL == timer) || !timer->start_timer(this, _keep_time)) 416 { 417 MTLOG_ERROR("obj %p attach timer failed, error", this); 418 return false; 419 } 420 421 if (MtFrame::Instance()->KqueueAddObj(&_keep_ntfy)) 422 { 423 _keep_flag |= TCP_KEEP_IN_KQUEUE; 424 return true; 425 } 426 else 427 { 428 MTLOG_ERROR("obj %p attach failed, error", this); 429 return false; 430 } 431 } 432 433 bool TcpKeepConn::IdleDetach() 434 { 435 if (_osfd < 0) { 436 MTLOG_ERROR("obj %p detach failed, fd %d error", this, _osfd); 437 return false; 438 } 439 440 if (!(_keep_flag & TCP_KEEP_IN_KQUEUE)) { 441 MTLOG_DEBUG("obj %p repeat detach, error", this); 442 return true; 443 } 444 445 _keep_ntfy.DisableOutput(); 446 _keep_ntfy.EnableInput(); 447 448 CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); 449 if (NULL != timer) 450 { 451 timer->stop_timer(this); 452 } 453 454 if (MtFrame::Instance()->KqueueDelObj(&_keep_ntfy)) 455 { 456 _keep_flag &= ~TCP_KEEP_IN_KQUEUE; 457 return true; 458 } 459 else 460 { 461 MTLOG_ERROR("obj %p detach failed, error", this); 462 return false; 463 } 464 } 465 466 void TcpKeepConn::timer_notify() 467 { 468 MTLOG_DEBUG("keep timeout[%u], fd %d, close connection", _keep_time, _osfd); 469 ConnectionMgr::Instance()->CloseIdleTcpKeep(this); 470 } 471 472 TcpKeepMgr::TcpKeepMgr() 473 { 474 _keep_hash = new HashList(10000); 475 } 476 477 TcpKeepMgr::~TcpKeepMgr() 478 { 479 if (!_keep_hash) { 480 return; 481 } 482 483 HashKey* hash_item = _keep_hash->HashGetFirst(); 484 while (hash_item) 485 { 486 delete hash_item; 487 hash_item = _keep_hash->HashGetFirst(); 488 } 489 490 delete _keep_hash; 491 _keep_hash = NULL; 492 } 493 494 TcpKeepConn* TcpKeepMgr::GetTcpKeepConn(struct sockaddr_in* dst) 495 { 496 TcpKeepConn* conn = NULL; 497 if (NULL == dst) 498 { 499 MTLOG_ERROR("input param dst null, error"); 500 return NULL; 501 } 502 503 TcpKeepKey key(dst); 504 TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key); 505 if ((NULL == conn_list) || (NULL == conn_list->GetFirstConn())) 506 { 507 conn = _mem_queue.AllocPtr(); 508 if (conn) { 509 conn->SetDestAddr(dst); 510 } 511 } 512 else 513 { 514 conn = conn_list->GetFirstConn(); 515 conn_list->RemoveConn(conn); 516 conn->IdleDetach(); 517 } 518 519 return conn; 520 } 521 522 bool TcpKeepMgr::RemoveTcpKeepConn(TcpKeepConn* conn) 523 { 524 struct sockaddr_in* dst = conn->GetDestAddr(); 525 if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0)) 526 { 527 MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port); 528 return false; 529 } 530 531 TcpKeepKey key(dst); 532 TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key); 533 if (!conn_list) 534 { 535 MTLOG_ERROR("no conn cache list, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port); 536 return false; 537 } 538 539 conn->IdleDetach(); 540 conn_list->RemoveConn(conn); 541 542 return true; 543 544 } 545 546 bool TcpKeepMgr::CacheTcpKeepConn(TcpKeepConn* conn) 547 { 548 struct sockaddr_in* dst = conn->GetDestAddr(); 549 if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0)) 550 { 551 MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port); 552 return false; 553 } 554 555 TcpKeepKey key(dst); 556 TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key); 557 if (!conn_list) 558 { 559 conn_list = new TcpKeepKey(conn->GetDestAddr()); 560 if (!conn_list) { 561 MTLOG_ERROR("new conn list failed, error"); 562 return false; 563 } 564 _keep_hash->HashInsert(conn_list); 565 } 566 567 if (!conn->IdleAttach()) 568 { 569 MTLOG_ERROR("conn IdleAttach failed, error"); 570 return false; 571 } 572 573 conn->ConnReuseClean(); 574 conn_list->InsertConn(conn); 575 576 577 return true; 578 579 } 580 581 void TcpKeepMgr::FreeTcpKeepConn(TcpKeepConn* conn, bool force_free) 582 { 583 if (force_free) 584 { 585 conn->Reset(); 586 _mem_queue.FreePtr(conn); 587 return; 588 } 589 else 590 { 591 if (!CacheTcpKeepConn(conn)) 592 { 593 conn->Reset(); 594 _mem_queue.FreePtr(conn); 595 return; 596 } 597 } 598 } 599 600 int UdpSessionConn::CreateSocket() 601 { 602 if (!_action || !_ntfy_obj) { 603 MTLOG_ERROR("conn not set action %p, or _ntfy_obj %p, error", _action, _ntfy_obj); 604 return -100; 605 } 606 SessionProxy* proxy = dynamic_cast<SessionProxy*>(_ntfy_obj); 607 if (!proxy) { 608 MTLOG_ERROR("ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj); 609 return -200; 610 } 611 ISessionNtfy* real_ntfy = proxy->GetRealNtfyObj(); 612 if (!real_ntfy) { 613 MTLOG_ERROR("real ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj); 614 return -300; 615 } 616 617 int osfd = real_ntfy->GetOsfd(); 618 if (osfd <= 0) 619 { 620 osfd = real_ntfy->CreateSocket(); 621 if (osfd <= 0) { 622 MTLOG_ERROR("real ntfy obj create fd failed, _ntfy_obj %p, error", real_ntfy); 623 return -400; 624 } 625 } 626 _ntfy_obj->SetOsfd(osfd); 627 628 return osfd; 629 } 630 631 int UdpSessionConn::CloseSocket() 632 { 633 return 0; 634 } 635 636 int UdpSessionConn::SendData() 637 { 638 if (!_action || !_msg_buff || !_ntfy_obj) { 639 MTLOG_ERROR("conn not set action %p, or msg %p, ntfy %p error", _action, _msg_buff, _ntfy_obj); 640 return -100; 641 } 642 643 mt_hook_syscall(sendto); 644 int ret = ff_hook_sendto(_ntfy_obj->GetOsfd(), _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0, 645 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in)); 646 if (ret == -1) 647 { 648 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 649 { 650 return 0; 651 } 652 else 653 { 654 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _ntfy_obj->GetOsfd(), 655 errno, strerror(errno)); 656 return -2; 657 } 658 } 659 else 660 { 661 _msg_buff->SetHaveSndLen(ret); 662 return ret; 663 } 664 } 665 666 int UdpSessionConn::RecvData() 667 { 668 if (!_ntfy_obj || !_msg_buff) { 669 MTLOG_ERROR("conn not set _ntfy_obj %p, or msg %p, error", _ntfy_obj, _msg_buff); 670 return -100; 671 } 672 673 if (_ntfy_obj->GetRcvEvents() <= 0) { 674 MTLOG_DEBUG("conn _ntfy_obj %p, no recv event, retry it", _ntfy_obj); 675 return 0; 676 } 677 678 int msg_len = _msg_buff->GetMsgLen(); 679 if (BUFF_RECV == _msg_buff->GetBuffType()) 680 { 681 return msg_len; 682 } 683 else 684 { 685 MTLOG_DEBUG("conn msg buff %p, no recv comm", _msg_buff); 686 return 0; 687 } 688 } 689 690 ConnectionMgr* ConnectionMgr::_instance = NULL; 691 ConnectionMgr* ConnectionMgr::Instance (void) 692 { 693 if (NULL == _instance) 694 { 695 _instance = new ConnectionMgr(); 696 } 697 698 return _instance; 699 } 700 701 void ConnectionMgr::Destroy() 702 { 703 if( _instance != NULL ) 704 { 705 delete _instance; 706 _instance = NULL; 707 } 708 } 709 710 ConnectionMgr::ConnectionMgr() 711 { 712 } 713 714 ConnectionMgr::~ConnectionMgr() 715 { 716 } 717 718 IMtConnection* ConnectionMgr::GetConnection(CONN_OBJ_TYPE type, struct sockaddr_in* dst) 719 { 720 switch (type) 721 { 722 case OBJ_SHORT_CONN: 723 return _udp_short_queue.AllocPtr(); 724 break; 725 726 case OBJ_TCP_KEEP: 727 return _tcp_keep_mgr.GetTcpKeepConn(dst); 728 break; 729 730 case OBJ_UDP_SESSION: 731 return _udp_session_queue.AllocPtr(); 732 break; 733 734 default: 735 return NULL; 736 break; 737 } 738 739 } 740 741 void ConnectionMgr::FreeConnection(IMtConnection* conn, bool force_free) 742 { 743 if (!conn) { 744 return; 745 } 746 CONN_OBJ_TYPE type = conn->GetConnType(); 747 748 switch (type) 749 { 750 case OBJ_SHORT_CONN: 751 conn->Reset(); 752 return _udp_short_queue.FreePtr(dynamic_cast<UdpShortConn*>(conn)); 753 break; 754 755 case OBJ_TCP_KEEP: 756 return _tcp_keep_mgr.FreeTcpKeepConn(dynamic_cast<TcpKeepConn*>(conn), force_free); 757 break; 758 759 case OBJ_UDP_SESSION: 760 conn->Reset(); 761 return _udp_session_queue.FreePtr(dynamic_cast<UdpSessionConn*>(conn)); 762 break; 763 764 default: 765 break; 766 } 767 768 delete conn; 769 return; 770 } 771 772 void ConnectionMgr::CloseIdleTcpKeep(TcpKeepConn* conn) 773 { 774 _tcp_keep_mgr.RemoveTcpKeepConn(conn); 775 _tcp_keep_mgr.FreeTcpKeepConn(conn, true); 776 } 777