1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_connection.cpp
22  *  @time 20130924
23  **/
24 #include <fcntl.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 
30 #include "micro_thread.h"
31 #include "mt_msg.h"
32 #include "mt_notify.h"
33 #include "mt_connection.h"
34 #include "mt_sys_hook.h"
35 #include "ff_hook.h"
36 
37 using namespace std;
38 using namespace NS_MICRO_THREAD;
39 
40 IMtConnection::IMtConnection()
41 {
42     _type       = OBJ_CONN_UNDEF;
43     _action     = NULL;
44     _ntfy_obj   = NULL;
45     _msg_buff   = NULL;
46 }
47 IMtConnection::~IMtConnection()
48 {
49     if (_ntfy_obj) {
50         NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj);
51         _ntfy_obj = NULL;
52     }
53 
54     if (_msg_buff) {
55         MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
56         _msg_buff = NULL;
57     }
58 }
59 
60 void IMtConnection::Reset()
61 {
62     if (_ntfy_obj) {
63         NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj);
64         _ntfy_obj = NULL;
65     }
66 
67     if (_msg_buff) {
68         MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
69         _msg_buff = NULL;
70     }
71 
72     _action     = NULL;
73     _ntfy_obj   = NULL;
74     _msg_buff   = NULL;
75 }
76 
77 int UdpShortConn::CreateSocket()
78 {
79     _osfd = socket(AF_INET, SOCK_DGRAM, 0);
80     if (_osfd < 0)
81     {
82         MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
83         return -1;
84     }
85 
86     int flags = 1;
87     if (ioctl(_osfd, FIONBIO, &flags) < 0)
88     {
89         MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
90         close(_osfd);
91         _osfd = -1;
92         return -2;
93     }
94 
95     if (_ntfy_obj) {
96         _ntfy_obj->SetOsfd(_osfd);
97     }
98 
99     return _osfd;
100 }
101 
102 int UdpShortConn::CloseSocket()
103 {
104     if (_osfd < 0)
105     {
106         return 0;
107     }
108 
109     close(_osfd);
110     _osfd = -1;
111 
112     return 0;
113 }
114 
115 int UdpShortConn::SendData()
116 {
117     if (!_action || !_msg_buff) {
118         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
119         return -100;
120     }
121 
122     mt_hook_syscall(sendto);
123     int ret = ff_hook_sendto(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0,
124                 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
125     if (ret == -1)
126     {
127         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
128         {
129             return 0;
130         }
131         else
132         {
133             MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _osfd,
134                       errno, strerror(errno));
135             return -2;
136         }
137     }
138     else
139     {
140         _msg_buff->SetHaveSndLen(ret);
141         return ret;
142     }
143 }
144 
145 int UdpShortConn::RecvData()
146 {
147     if (!_action || !_msg_buff) {
148         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
149         return -100;
150     }
151 
152     struct sockaddr_in  from;
153     socklen_t fromlen = sizeof(from);
154     mt_hook_syscall(recvfrom);
155     int ret = ff_hook_recvfrom(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMaxLen(),
156                        0, (struct sockaddr*)&from, &fromlen);
157     if (ret < 0)
158     {
159         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
160         {
161             return 0;
162         }
163         else
164         {
165             MTLOG_ERROR("socket recv failed, fd %d, errno %d(%s)", _osfd,
166                       errno, strerror(errno));
167             return -2;
168         }
169     }
170     else if (ret == 0)
171     {
172         return -1;
173     }
174     else
175     {
176         _msg_buff->SetHaveRcvLen(ret);
177     }
178 
179     ret = _action->DoInput();
180     if (ret > 0)
181     {
182         _msg_buff->SetMsgLen(ret);
183         return ret;
184     }
185     else if (ret == 0)
186     {
187         return 0;
188     }
189     else if (ret == -65535)
190     {
191         _msg_buff->SetHaveRcvLen(0);
192         return 0;
193     }
194     else
195     {
196         return -1;
197     }
198 }
199 
200 void UdpShortConn::Reset()
201 {
202     CloseSocket();
203     this->IMtConnection::Reset();
204 }
205 
206 int TcpKeepConn::OpenCnnect()
207 {
208     if (!_action || !_msg_buff) {
209         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
210         return -100;
211     }
212 
213     int err = 0;
214     mt_hook_syscall(connect);
215     int ret = ff_hook_connect(_osfd, (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
216     if (ret < 0)
217     {
218         err = errno;
219         if (err == EISCONN)
220         {
221             return 0;
222         }
223         else
224         {
225             if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR))
226             {
227                 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _osfd, err);
228                 return -1;
229             }
230             else
231             {
232                 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _osfd, err);
233                 return -2;
234             }
235         }
236     }
237     else
238     {
239         return 0;
240     }
241 }
242 
243 int TcpKeepConn::CreateSocket()
244 {
245     if (_osfd > 0)
246     {
247         if (_ntfy_obj) {
248             _ntfy_obj->SetOsfd(_osfd);
249         }
250 
251         return _osfd;
252     }
253 
254     _osfd = socket(AF_INET, SOCK_STREAM, 0);
255     if (_osfd < 0)
256     {
257         MTLOG_ERROR("create tcp socket failed, error: %d", errno);
258         return -1;
259     }
260 
261     int flags = 1;
262     if (ioctl(_osfd, FIONBIO, &flags) < 0)
263     {
264         MTLOG_ERROR("set tcp socket unblock failed, error: %d", errno);
265         close(_osfd);
266         _osfd = -1;
267         return -2;
268     }
269 
270     _keep_ntfy.SetOsfd(_osfd);
271     _keep_ntfy.DisableOutput();
272     _keep_ntfy.EnableInput();
273 
274     if (_ntfy_obj) {
275         _ntfy_obj->SetOsfd(_osfd);
276     }
277 
278     return _osfd;
279 }
280 
281 int TcpKeepConn::SendData()
282 {
283     if (!_action || !_msg_buff) {
284         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
285         return -100;
286     }
287 
288     char* msg_ptr = (char*)_msg_buff->GetMsgBuff();
289     int msg_len = _msg_buff->GetMsgLen();
290     int have_send_len = _msg_buff->GetHaveSndLen();
291     mt_hook_syscall(send);
292     int ret = ff_hook_send(_osfd, msg_ptr + have_send_len, msg_len - have_send_len, 0);
293     if (ret == -1)
294     {
295         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
296         {
297             return 0;
298         }
299         else
300         {
301             MTLOG_ERROR("send tcp socket failed, error: %d", errno);
302             return -1;
303         }
304     }
305     else
306     {
307         have_send_len += ret;
308         _msg_buff->SetHaveSndLen(have_send_len);
309     }
310 
311     if (have_send_len >= msg_len)
312     {
313         return msg_len;
314     }
315     else
316     {
317         return 0;
318     }
319 }
320 
321 int TcpKeepConn::RecvData()
322 {
323     if (!_action || !_msg_buff) {
324         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
325         return -100;
326     }
327 
328     char* msg_ptr = (char*)_msg_buff->GetMsgBuff();
329     int max_len = _msg_buff->GetMaxLen();
330     int have_rcv_len = _msg_buff->GetHaveRcvLen();
331     mt_hook_syscall(recv);
332     int ret = ff_hook_recv(_osfd, (char*)msg_ptr + have_rcv_len, max_len - have_rcv_len, 0);
333     if (ret < 0)
334     {
335         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
336         {
337             return 0;
338         }
339         else
340         {
341             MTLOG_ERROR("recv tcp socket failed, error: %d", errno);
342             return -2;
343         }
344     }
345     else if (ret == 0)
346     {
347         MTLOG_ERROR("tcp remote close, address: %s[%d]",
348                 inet_ntoa(_dst_addr.sin_addr), ntohs(_dst_addr.sin_port));
349         return -1;
350     }
351     else
352     {
353         have_rcv_len += ret;
354         _msg_buff->SetHaveRcvLen(have_rcv_len);
355     }
356 
357     ret = _action->DoInput();
358     if (ret > 0)
359     {
360         _msg_buff->SetMsgLen(have_rcv_len);
361         return ret;
362     }
363     else if (ret == 0)
364     {
365         return 0;
366     }
367     else
368     {
369         return -1;
370     }
371 }
372 
373 int TcpKeepConn::CloseSocket()
374 {
375     if (_osfd < 0)
376     {
377         return 0;
378     }
379     _keep_ntfy.SetOsfd(-1);
380 
381     close(_osfd);
382     _osfd = -1;
383 
384     return 0;
385 }
386 
387 void TcpKeepConn::Reset()
388 {
389     memset(&_dst_addr, 0 ,sizeof(_dst_addr));
390     CloseSocket();
391     this->IMtConnection::Reset();
392 }
393 
394 void TcpKeepConn::ConnReuseClean()
395 {
396     this->IMtConnection::Reset();
397 }
398 
399 bool TcpKeepConn::IdleAttach()
400 {
401     if (_osfd < 0) {
402         MTLOG_ERROR("obj %p attach failed, fd %d error", this, _osfd);
403         return false;
404     }
405 
406     if (_keep_flag & TCP_KEEP_IN_KQUEUE) {
407         MTLOG_ERROR("obj %p repeat attach, error", this);
408         return true;
409     }
410 
411     _keep_ntfy.DisableOutput();
412     _keep_ntfy.EnableInput();
413 
414     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
415     if ((NULL == timer) || !timer->start_timer(this, _keep_time))
416     {
417         MTLOG_ERROR("obj %p attach timer failed, error", this);
418         return false;
419     }
420 
421     if (MtFrame::Instance()->KqueueAddObj(&_keep_ntfy))
422     {
423         _keep_flag |= TCP_KEEP_IN_KQUEUE;
424         return true;
425     }
426     else
427     {
428         MTLOG_ERROR("obj %p attach failed, error", this);
429         return false;
430     }
431 }
432 
433 bool TcpKeepConn::IdleDetach()
434 {
435     if (_osfd < 0) {
436         MTLOG_ERROR("obj %p detach failed, fd %d error", this, _osfd);
437         return false;
438     }
439 
440     if (!(_keep_flag & TCP_KEEP_IN_KQUEUE)) {
441         MTLOG_DEBUG("obj %p repeat detach, error", this);
442         return true;
443     }
444 
445     _keep_ntfy.DisableOutput();
446     _keep_ntfy.EnableInput();
447 
448     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
449     if (NULL != timer)
450     {
451         timer->stop_timer(this);
452     }
453 
454     if (MtFrame::Instance()->KqueueDelObj(&_keep_ntfy))
455     {
456         _keep_flag &= ~TCP_KEEP_IN_KQUEUE;
457         return true;
458     }
459     else
460     {
461         MTLOG_ERROR("obj %p detach failed, error", this);
462         return false;
463     }
464 }
465 
466 void TcpKeepConn::timer_notify()
467 {
468     MTLOG_DEBUG("keep timeout[%u], fd %d, close connection", _keep_time, _osfd);
469     ConnectionMgr::Instance()->CloseIdleTcpKeep(this);
470 }
471 
472 TcpKeepMgr::TcpKeepMgr()
473 {
474     _keep_hash = new HashList(10000);
475 }
476 
477 TcpKeepMgr::~TcpKeepMgr()
478 {
479     if (!_keep_hash) {
480         return;
481     }
482 
483     HashKey* hash_item = _keep_hash->HashGetFirst();
484     while (hash_item)
485     {
486         delete hash_item;
487         hash_item = _keep_hash->HashGetFirst();
488     }
489 
490     delete _keep_hash;
491     _keep_hash = NULL;
492 }
493 
494 TcpKeepConn* TcpKeepMgr::GetTcpKeepConn(struct sockaddr_in* dst)
495 {
496     TcpKeepConn* conn = NULL;
497     if (NULL == dst)
498     {
499         MTLOG_ERROR("input param dst null, error");
500         return NULL;
501     }
502 
503     TcpKeepKey key(dst);
504     TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
505     if ((NULL == conn_list) || (NULL == conn_list->GetFirstConn()))
506     {
507         conn = _mem_queue.AllocPtr();
508         if (conn) {
509             conn->SetDestAddr(dst);
510         }
511     }
512     else
513     {
514         conn = conn_list->GetFirstConn();
515         conn_list->RemoveConn(conn);
516         conn->IdleDetach();
517     }
518 
519     return conn;
520 }
521 
522 bool TcpKeepMgr::RemoveTcpKeepConn(TcpKeepConn* conn)
523 {
524     struct sockaddr_in* dst = conn->GetDestAddr();
525     if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0))
526     {
527         MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
528         return false;
529     }
530 
531     TcpKeepKey key(dst);
532     TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
533     if (!conn_list)
534     {
535         MTLOG_ERROR("no conn cache list, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
536         return false;
537     }
538 
539     conn->IdleDetach();
540     conn_list->RemoveConn(conn);
541 
542     return true;
543 
544 }
545 
546 bool TcpKeepMgr::CacheTcpKeepConn(TcpKeepConn* conn)
547 {
548     struct sockaddr_in* dst = conn->GetDestAddr();
549     if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0))
550     {
551         MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
552         return false;
553     }
554 
555     TcpKeepKey key(dst);
556     TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
557     if (!conn_list)
558     {
559         conn_list = new TcpKeepKey(conn->GetDestAddr());
560         if (!conn_list) {
561             MTLOG_ERROR("new conn list failed, error");
562             return false;
563         }
564         _keep_hash->HashInsert(conn_list);
565     }
566 
567     if (!conn->IdleAttach())
568     {
569         MTLOG_ERROR("conn IdleAttach failed, error");
570         return false;
571     }
572 
573     conn->ConnReuseClean();
574     conn_list->InsertConn(conn);
575 
576 
577     return true;
578 
579 }
580 
581 void TcpKeepMgr::FreeTcpKeepConn(TcpKeepConn* conn, bool force_free)
582 {
583     if (force_free)
584     {
585         conn->Reset();
586         _mem_queue.FreePtr(conn);
587         return;
588     }
589     else
590     {
591         if (!CacheTcpKeepConn(conn))
592         {
593             conn->Reset();
594             _mem_queue.FreePtr(conn);
595             return;
596         }
597     }
598 }
599 
600 int UdpSessionConn::CreateSocket()
601 {
602     if (!_action || !_ntfy_obj) {
603         MTLOG_ERROR("conn not set action %p, or _ntfy_obj %p, error", _action, _ntfy_obj);
604         return -100;
605     }
606     SessionProxy* proxy = dynamic_cast<SessionProxy*>(_ntfy_obj);
607     if (!proxy) {
608         MTLOG_ERROR("ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj);
609         return -200;
610     }
611     ISessionNtfy* real_ntfy = proxy->GetRealNtfyObj();
612     if (!real_ntfy) {
613         MTLOG_ERROR("real ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj);
614         return -300;
615     }
616 
617     int osfd = real_ntfy->GetOsfd();
618     if (osfd <= 0)
619     {
620         osfd = real_ntfy->CreateSocket();
621         if (osfd <= 0) {
622             MTLOG_ERROR("real ntfy obj create fd failed, _ntfy_obj %p, error", real_ntfy);
623             return -400;
624         }
625     }
626     _ntfy_obj->SetOsfd(osfd);
627 
628     return osfd;
629 }
630 
631 int UdpSessionConn::CloseSocket()
632 {
633     return 0;
634 }
635 
636 int UdpSessionConn::SendData()
637 {
638     if (!_action || !_msg_buff || !_ntfy_obj) {
639         MTLOG_ERROR("conn not set action %p, or msg %p, ntfy %p error", _action, _msg_buff, _ntfy_obj);
640         return -100;
641     }
642 
643     mt_hook_syscall(sendto);
644     int ret = ff_hook_sendto(_ntfy_obj->GetOsfd(), _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0,
645                 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
646     if (ret == -1)
647     {
648         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
649         {
650             return 0;
651         }
652         else
653         {
654             MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _ntfy_obj->GetOsfd(),
655                       errno, strerror(errno));
656             return -2;
657         }
658     }
659     else
660     {
661         _msg_buff->SetHaveSndLen(ret);
662         return ret;
663     }
664 }
665 
666 int UdpSessionConn::RecvData()
667 {
668     if (!_ntfy_obj || !_msg_buff) {
669         MTLOG_ERROR("conn not set _ntfy_obj %p, or msg %p, error", _ntfy_obj, _msg_buff);
670         return -100;
671     }
672 
673     if (_ntfy_obj->GetRcvEvents() <= 0) {
674         MTLOG_DEBUG("conn _ntfy_obj %p, no recv event, retry it", _ntfy_obj);
675         return 0;
676     }
677 
678     int msg_len = _msg_buff->GetMsgLen();
679     if (BUFF_RECV == _msg_buff->GetBuffType())
680     {
681         return msg_len;
682     }
683     else
684     {
685         MTLOG_DEBUG("conn msg buff %p, no recv comm", _msg_buff);
686         return 0;
687     }
688 }
689 
690 ConnectionMgr* ConnectionMgr::_instance = NULL;
691 ConnectionMgr* ConnectionMgr::Instance (void)
692 {
693     if (NULL == _instance)
694     {
695         _instance = new ConnectionMgr();
696     }
697 
698     return _instance;
699 }
700 
701 void ConnectionMgr::Destroy()
702 {
703     if( _instance != NULL )
704     {
705         delete _instance;
706         _instance = NULL;
707     }
708 }
709 
710 ConnectionMgr::ConnectionMgr()
711 {
712 }
713 
714 ConnectionMgr::~ConnectionMgr()
715 {
716 }
717 
718 IMtConnection* ConnectionMgr::GetConnection(CONN_OBJ_TYPE type, struct sockaddr_in* dst)
719 {
720     switch (type)
721     {
722         case OBJ_SHORT_CONN:
723             return _udp_short_queue.AllocPtr();
724             break;
725 
726         case OBJ_TCP_KEEP:
727             return _tcp_keep_mgr.GetTcpKeepConn(dst);
728             break;
729 
730         case OBJ_UDP_SESSION:
731             return _udp_session_queue.AllocPtr();
732             break;
733 
734         default:
735             return NULL;
736             break;
737     }
738 
739 }
740 
741 void ConnectionMgr::FreeConnection(IMtConnection* conn, bool force_free)
742 {
743     if (!conn) {
744         return;
745     }
746     CONN_OBJ_TYPE type = conn->GetConnType();
747 
748     switch (type)
749     {
750         case OBJ_SHORT_CONN:
751             conn->Reset();
752             return _udp_short_queue.FreePtr(dynamic_cast<UdpShortConn*>(conn));
753             break;
754 
755         case OBJ_TCP_KEEP:
756             return _tcp_keep_mgr.FreeTcpKeepConn(dynamic_cast<TcpKeepConn*>(conn), force_free);
757             break;
758 
759         case OBJ_UDP_SESSION:
760             conn->Reset();
761             return _udp_session_queue.FreePtr(dynamic_cast<UdpSessionConn*>(conn));
762             break;
763 
764         default:
765             break;
766     }
767 
768     delete conn;
769     return;
770 }
771 
772 void ConnectionMgr::CloseIdleTcpKeep(TcpKeepConn* conn)
773 {
774     _tcp_keep_mgr.RemoveTcpKeepConn(conn);
775     _tcp_keep_mgr.FreeTcpKeepConn(conn, true);
776 }
777