1
2 /**
3 * Tencent is pleased to support the open source community by making MSEC available.
4 *
5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6 *
7 * Licensed under the GNU General Public License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License. You may
9 * obtain a copy of the License at
10 *
11 * https://opensource.org/licenses/GPL-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software distributed under the
14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15 * either express or implied. See the License for the specific language governing permissions
16 * and limitations under the License.
17 */
18
19
20 /**
21 * @file mt_connection.cpp
22 * @time 20130924
23 **/
24 #include <fcntl.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29
30 #include "micro_thread.h"
31 #include "mt_msg.h"
32 #include "mt_notify.h"
33 #include "mt_connection.h"
34 #include "mt_sys_hook.h"
35 #include "ff_hook.h"
36
37 using namespace std;
38 using namespace NS_MICRO_THREAD;
39
IMtConnection()40 IMtConnection::IMtConnection()
41 {
42 _type = OBJ_CONN_UNDEF;
43 _action = NULL;
44 _ntfy_obj = NULL;
45 _msg_buff = NULL;
46 }
~IMtConnection()47 IMtConnection::~IMtConnection()
48 {
49 if (_ntfy_obj) {
50 NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj);
51 _ntfy_obj = NULL;
52 }
53
54 if (_msg_buff) {
55 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
56 _msg_buff = NULL;
57 }
58 }
59
Reset()60 void IMtConnection::Reset()
61 {
62 if (_ntfy_obj) {
63 NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj);
64 _ntfy_obj = NULL;
65 }
66
67 if (_msg_buff) {
68 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
69 _msg_buff = NULL;
70 }
71
72 _action = NULL;
73 _ntfy_obj = NULL;
74 _msg_buff = NULL;
75 }
76
CreateSocket()77 int UdpShortConn::CreateSocket()
78 {
79 _osfd = socket(AF_INET, SOCK_DGRAM, 0);
80 if (_osfd < 0)
81 {
82 MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
83 return -1;
84 }
85
86 int flags = 1;
87 if (ioctl(_osfd, FIONBIO, &flags) < 0)
88 {
89 MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
90 close(_osfd);
91 _osfd = -1;
92 return -2;
93 }
94
95 if (_ntfy_obj) {
96 _ntfy_obj->SetOsfd(_osfd);
97 }
98
99 return _osfd;
100 }
101
CloseSocket()102 int UdpShortConn::CloseSocket()
103 {
104 if (_osfd < 0)
105 {
106 return 0;
107 }
108
109 close(_osfd);
110 _osfd = -1;
111
112 return 0;
113 }
114
SendData()115 int UdpShortConn::SendData()
116 {
117 if (!_action || !_msg_buff) {
118 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
119 return -100;
120 }
121
122 mt_hook_syscall(sendto);
123 int ret = ff_hook_sendto(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0,
124 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
125 if (ret == -1)
126 {
127 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
128 {
129 return 0;
130 }
131 else
132 {
133 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _osfd,
134 errno, strerror(errno));
135 return -2;
136 }
137 }
138 else
139 {
140 _msg_buff->SetHaveSndLen(ret);
141 return ret;
142 }
143 }
144
RecvData()145 int UdpShortConn::RecvData()
146 {
147 if (!_action || !_msg_buff) {
148 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
149 return -100;
150 }
151
152 struct sockaddr_in from;
153 socklen_t fromlen = sizeof(from);
154 mt_hook_syscall(recvfrom);
155 int ret = ff_hook_recvfrom(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMaxLen(),
156 0, (struct sockaddr*)&from, &fromlen);
157 if (ret < 0)
158 {
159 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
160 {
161 return 0;
162 }
163 else
164 {
165 MTLOG_ERROR("socket recv failed, fd %d, errno %d(%s)", _osfd,
166 errno, strerror(errno));
167 return -2;
168 }
169 }
170 else if (ret == 0)
171 {
172 return -1;
173 }
174 else
175 {
176 _msg_buff->SetHaveRcvLen(ret);
177 }
178
179 ret = _action->DoInput();
180 if (ret > 0)
181 {
182 _msg_buff->SetMsgLen(ret);
183 return ret;
184 }
185 else if (ret == 0)
186 {
187 return 0;
188 }
189 else if (ret == -65535)
190 {
191 _msg_buff->SetHaveRcvLen(0);
192 return 0;
193 }
194 else
195 {
196 return -1;
197 }
198 }
199
Reset()200 void UdpShortConn::Reset()
201 {
202 CloseSocket();
203 this->IMtConnection::Reset();
204 }
205
OpenCnnect()206 int TcpKeepConn::OpenCnnect()
207 {
208 if (!_action || !_msg_buff) {
209 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
210 return -100;
211 }
212
213 int err = 0;
214 mt_hook_syscall(connect);
215 int ret = ff_hook_connect(_osfd, (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
216 if (ret < 0)
217 {
218 err = errno;
219 if (err == EISCONN)
220 {
221 return 0;
222 }
223 else
224 {
225 if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR))
226 {
227 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _osfd, err);
228 return -1;
229 }
230 else
231 {
232 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _osfd, err);
233 return -2;
234 }
235 }
236 }
237 else
238 {
239 return 0;
240 }
241 }
242
CreateSocket()243 int TcpKeepConn::CreateSocket()
244 {
245 if (_osfd > 0)
246 {
247 if (_ntfy_obj) {
248 _ntfy_obj->SetOsfd(_osfd);
249 }
250
251 return _osfd;
252 }
253
254 _osfd = socket(AF_INET, SOCK_STREAM, 0);
255 if (_osfd < 0)
256 {
257 MTLOG_ERROR("create tcp socket failed, error: %d", errno);
258 return -1;
259 }
260
261 int flags = 1;
262 if (ioctl(_osfd, FIONBIO, &flags) < 0)
263 {
264 MTLOG_ERROR("set tcp socket unblock failed, error: %d", errno);
265 close(_osfd);
266 _osfd = -1;
267 return -2;
268 }
269
270 _keep_ntfy.SetOsfd(_osfd);
271 _keep_ntfy.DisableOutput();
272 _keep_ntfy.EnableInput();
273
274 if (_ntfy_obj) {
275 _ntfy_obj->SetOsfd(_osfd);
276 }
277
278 return _osfd;
279 }
280
SendData()281 int TcpKeepConn::SendData()
282 {
283 if (!_action || !_msg_buff) {
284 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
285 return -100;
286 }
287
288 char* msg_ptr = (char*)_msg_buff->GetMsgBuff();
289 int msg_len = _msg_buff->GetMsgLen();
290 int have_send_len = _msg_buff->GetHaveSndLen();
291 mt_hook_syscall(send);
292 int ret = ff_hook_send(_osfd, msg_ptr + have_send_len, msg_len - have_send_len, 0);
293 if (ret == -1)
294 {
295 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
296 {
297 return 0;
298 }
299 else
300 {
301 MTLOG_ERROR("send tcp socket failed, error: %d", errno);
302 return -1;
303 }
304 }
305 else
306 {
307 have_send_len += ret;
308 _msg_buff->SetHaveSndLen(have_send_len);
309 }
310
311 if (have_send_len >= msg_len)
312 {
313 return msg_len;
314 }
315 else
316 {
317 return 0;
318 }
319 }
320
RecvData()321 int TcpKeepConn::RecvData()
322 {
323 if (!_action || !_msg_buff) {
324 MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
325 return -100;
326 }
327
328 char* msg_ptr = (char*)_msg_buff->GetMsgBuff();
329 int max_len = _msg_buff->GetMaxLen();
330 int have_rcv_len = _msg_buff->GetHaveRcvLen();
331 mt_hook_syscall(recv);
332 int ret = ff_hook_recv(_osfd, (char*)msg_ptr + have_rcv_len, max_len - have_rcv_len, 0);
333 if (ret < 0)
334 {
335 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
336 {
337 return 0;
338 }
339 else
340 {
341 MTLOG_ERROR("recv tcp socket failed, error: %d", errno);
342 return -2;
343 }
344 }
345 else if (ret == 0)
346 {
347 MTLOG_ERROR("tcp remote close, address: %s[%d]",
348 inet_ntoa(_dst_addr.sin_addr), ntohs(_dst_addr.sin_port));
349 return -1;
350 }
351 else
352 {
353 have_rcv_len += ret;
354 _msg_buff->SetHaveRcvLen(have_rcv_len);
355 }
356
357 ret = _action->DoInput();
358 if (ret > 0)
359 {
360 _msg_buff->SetMsgLen(have_rcv_len);
361 return ret;
362 }
363 else if (ret == 0)
364 {
365 return 0;
366 }
367 else
368 {
369 return -1;
370 }
371 }
372
CloseSocket()373 int TcpKeepConn::CloseSocket()
374 {
375 if (_osfd < 0)
376 {
377 return 0;
378 }
379 _keep_ntfy.SetOsfd(-1);
380
381 close(_osfd);
382 _osfd = -1;
383
384 return 0;
385 }
386
Reset()387 void TcpKeepConn::Reset()
388 {
389 memset(&_dst_addr, 0 ,sizeof(_dst_addr));
390 CloseSocket();
391 this->IMtConnection::Reset();
392 }
393
ConnReuseClean()394 void TcpKeepConn::ConnReuseClean()
395 {
396 this->IMtConnection::Reset();
397 }
398
IdleAttach()399 bool TcpKeepConn::IdleAttach()
400 {
401 if (_osfd < 0) {
402 MTLOG_ERROR("obj %p attach failed, fd %d error", this, _osfd);
403 return false;
404 }
405
406 if (_keep_flag & TCP_KEEP_IN_KQUEUE) {
407 MTLOG_ERROR("obj %p repeat attach, error", this);
408 return true;
409 }
410
411 _keep_ntfy.DisableOutput();
412 _keep_ntfy.EnableInput();
413
414 CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
415 if ((NULL == timer) || !timer->start_timer(this, _keep_time))
416 {
417 MTLOG_ERROR("obj %p attach timer failed, error", this);
418 return false;
419 }
420
421 if (MtFrame::Instance()->KqueueAddObj(&_keep_ntfy))
422 {
423 _keep_flag |= TCP_KEEP_IN_KQUEUE;
424 return true;
425 }
426 else
427 {
428 MTLOG_ERROR("obj %p attach failed, error", this);
429 return false;
430 }
431 }
432
IdleDetach()433 bool TcpKeepConn::IdleDetach()
434 {
435 if (_osfd < 0) {
436 MTLOG_ERROR("obj %p detach failed, fd %d error", this, _osfd);
437 return false;
438 }
439
440 if (!(_keep_flag & TCP_KEEP_IN_KQUEUE)) {
441 MTLOG_DEBUG("obj %p repeat detach, error", this);
442 return true;
443 }
444
445 _keep_ntfy.DisableOutput();
446 _keep_ntfy.EnableInput();
447
448 CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
449 if (NULL != timer)
450 {
451 timer->stop_timer(this);
452 }
453
454 if (MtFrame::Instance()->KqueueDelObj(&_keep_ntfy))
455 {
456 _keep_flag &= ~TCP_KEEP_IN_KQUEUE;
457 return true;
458 }
459 else
460 {
461 MTLOG_ERROR("obj %p detach failed, error", this);
462 return false;
463 }
464 }
465
timer_notify()466 void TcpKeepConn::timer_notify()
467 {
468 MTLOG_DEBUG("keep timeout[%u], fd %d, close connection", _keep_time, _osfd);
469 ConnectionMgr::Instance()->CloseIdleTcpKeep(this);
470 }
471
TcpKeepMgr()472 TcpKeepMgr::TcpKeepMgr()
473 {
474 _keep_hash = new HashList(10000);
475 }
476
~TcpKeepMgr()477 TcpKeepMgr::~TcpKeepMgr()
478 {
479 if (!_keep_hash) {
480 return;
481 }
482
483 HashKey* hash_item = _keep_hash->HashGetFirst();
484 while (hash_item)
485 {
486 delete hash_item;
487 hash_item = _keep_hash->HashGetFirst();
488 }
489
490 delete _keep_hash;
491 _keep_hash = NULL;
492 }
493
GetTcpKeepConn(struct sockaddr_in * dst)494 TcpKeepConn* TcpKeepMgr::GetTcpKeepConn(struct sockaddr_in* dst)
495 {
496 TcpKeepConn* conn = NULL;
497 if (NULL == dst)
498 {
499 MTLOG_ERROR("input param dst null, error");
500 return NULL;
501 }
502
503 TcpKeepKey key(dst);
504 TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
505 if ((NULL == conn_list) || (NULL == conn_list->GetFirstConn()))
506 {
507 conn = _mem_queue.AllocPtr();
508 if (conn) {
509 conn->SetDestAddr(dst);
510 }
511 }
512 else
513 {
514 conn = conn_list->GetFirstConn();
515 conn_list->RemoveConn(conn);
516 conn->IdleDetach();
517 }
518
519 return conn;
520 }
521
RemoveTcpKeepConn(TcpKeepConn * conn)522 bool TcpKeepMgr::RemoveTcpKeepConn(TcpKeepConn* conn)
523 {
524 struct sockaddr_in* dst = conn->GetDestAddr();
525 if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0))
526 {
527 MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
528 return false;
529 }
530
531 TcpKeepKey key(dst);
532 TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
533 if (!conn_list)
534 {
535 MTLOG_ERROR("no conn cache list, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
536 return false;
537 }
538
539 conn->IdleDetach();
540 conn_list->RemoveConn(conn);
541
542 return true;
543
544 }
545
CacheTcpKeepConn(TcpKeepConn * conn)546 bool TcpKeepMgr::CacheTcpKeepConn(TcpKeepConn* conn)
547 {
548 struct sockaddr_in* dst = conn->GetDestAddr();
549 if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0))
550 {
551 MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
552 return false;
553 }
554
555 TcpKeepKey key(dst);
556 TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
557 if (!conn_list)
558 {
559 conn_list = new TcpKeepKey(conn->GetDestAddr());
560 if (!conn_list) {
561 MTLOG_ERROR("new conn list failed, error");
562 return false;
563 }
564 _keep_hash->HashInsert(conn_list);
565 }
566
567 if (!conn->IdleAttach())
568 {
569 MTLOG_ERROR("conn IdleAttach failed, error");
570 return false;
571 }
572
573 conn->ConnReuseClean();
574 conn_list->InsertConn(conn);
575
576
577 return true;
578
579 }
580
FreeTcpKeepConn(TcpKeepConn * conn,bool force_free)581 void TcpKeepMgr::FreeTcpKeepConn(TcpKeepConn* conn, bool force_free)
582 {
583 if (force_free)
584 {
585 conn->Reset();
586 _mem_queue.FreePtr(conn);
587 return;
588 }
589 else
590 {
591 if (!CacheTcpKeepConn(conn))
592 {
593 conn->Reset();
594 _mem_queue.FreePtr(conn);
595 return;
596 }
597 }
598 }
599
CreateSocket()600 int UdpSessionConn::CreateSocket()
601 {
602 if (!_action || !_ntfy_obj) {
603 MTLOG_ERROR("conn not set action %p, or _ntfy_obj %p, error", _action, _ntfy_obj);
604 return -100;
605 }
606 SessionProxy* proxy = dynamic_cast<SessionProxy*>(_ntfy_obj);
607 if (!proxy) {
608 MTLOG_ERROR("ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj);
609 return -200;
610 }
611 ISessionNtfy* real_ntfy = proxy->GetRealNtfyObj();
612 if (!real_ntfy) {
613 MTLOG_ERROR("real ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj);
614 return -300;
615 }
616
617 int osfd = real_ntfy->GetOsfd();
618 if (osfd <= 0)
619 {
620 osfd = real_ntfy->CreateSocket();
621 if (osfd <= 0) {
622 MTLOG_ERROR("real ntfy obj create fd failed, _ntfy_obj %p, error", real_ntfy);
623 return -400;
624 }
625 }
626 _ntfy_obj->SetOsfd(osfd);
627
628 return osfd;
629 }
630
CloseSocket()631 int UdpSessionConn::CloseSocket()
632 {
633 return 0;
634 }
635
SendData()636 int UdpSessionConn::SendData()
637 {
638 if (!_action || !_msg_buff || !_ntfy_obj) {
639 MTLOG_ERROR("conn not set action %p, or msg %p, ntfy %p error", _action, _msg_buff, _ntfy_obj);
640 return -100;
641 }
642
643 mt_hook_syscall(sendto);
644 int ret = ff_hook_sendto(_ntfy_obj->GetOsfd(), _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0,
645 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
646 if (ret == -1)
647 {
648 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
649 {
650 return 0;
651 }
652 else
653 {
654 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _ntfy_obj->GetOsfd(),
655 errno, strerror(errno));
656 return -2;
657 }
658 }
659 else
660 {
661 _msg_buff->SetHaveSndLen(ret);
662 return ret;
663 }
664 }
665
RecvData()666 int UdpSessionConn::RecvData()
667 {
668 if (!_ntfy_obj || !_msg_buff) {
669 MTLOG_ERROR("conn not set _ntfy_obj %p, or msg %p, error", _ntfy_obj, _msg_buff);
670 return -100;
671 }
672
673 if (_ntfy_obj->GetRcvEvents() <= 0) {
674 MTLOG_DEBUG("conn _ntfy_obj %p, no recv event, retry it", _ntfy_obj);
675 return 0;
676 }
677
678 int msg_len = _msg_buff->GetMsgLen();
679 if (BUFF_RECV == _msg_buff->GetBuffType())
680 {
681 return msg_len;
682 }
683 else
684 {
685 MTLOG_DEBUG("conn msg buff %p, no recv comm", _msg_buff);
686 return 0;
687 }
688 }
689
690 ConnectionMgr* ConnectionMgr::_instance = NULL;
Instance(void)691 ConnectionMgr* ConnectionMgr::Instance (void)
692 {
693 if (NULL == _instance)
694 {
695 _instance = new ConnectionMgr();
696 }
697
698 return _instance;
699 }
700
Destroy()701 void ConnectionMgr::Destroy()
702 {
703 if( _instance != NULL )
704 {
705 delete _instance;
706 _instance = NULL;
707 }
708 }
709
ConnectionMgr()710 ConnectionMgr::ConnectionMgr()
711 {
712 }
713
~ConnectionMgr()714 ConnectionMgr::~ConnectionMgr()
715 {
716 }
717
GetConnection(CONN_OBJ_TYPE type,struct sockaddr_in * dst)718 IMtConnection* ConnectionMgr::GetConnection(CONN_OBJ_TYPE type, struct sockaddr_in* dst)
719 {
720 switch (type)
721 {
722 case OBJ_SHORT_CONN:
723 return _udp_short_queue.AllocPtr();
724 break;
725
726 case OBJ_TCP_KEEP:
727 return _tcp_keep_mgr.GetTcpKeepConn(dst);
728 break;
729
730 case OBJ_UDP_SESSION:
731 return _udp_session_queue.AllocPtr();
732 break;
733
734 default:
735 return NULL;
736 break;
737 }
738
739 }
740
FreeConnection(IMtConnection * conn,bool force_free)741 void ConnectionMgr::FreeConnection(IMtConnection* conn, bool force_free)
742 {
743 if (!conn) {
744 return;
745 }
746 CONN_OBJ_TYPE type = conn->GetConnType();
747
748 switch (type)
749 {
750 case OBJ_SHORT_CONN:
751 conn->Reset();
752 return _udp_short_queue.FreePtr(dynamic_cast<UdpShortConn*>(conn));
753 break;
754
755 case OBJ_TCP_KEEP:
756 return _tcp_keep_mgr.FreeTcpKeepConn(dynamic_cast<TcpKeepConn*>(conn), force_free);
757 break;
758
759 case OBJ_UDP_SESSION:
760 conn->Reset();
761 return _udp_session_queue.FreePtr(dynamic_cast<UdpSessionConn*>(conn));
762 break;
763
764 default:
765 break;
766 }
767
768 delete conn;
769 return;
770 }
771
CloseIdleTcpKeep(TcpKeepConn * conn)772 void ConnectionMgr::CloseIdleTcpKeep(TcpKeepConn* conn)
773 {
774 _tcp_keep_mgr.RemoveTcpKeepConn(conn);
775 _tcp_keep_mgr.FreeTcpKeepConn(conn, true);
776 }
777