1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_connection.cpp
22  *  @info ΢�߳���Ϣ�������ӹ���ʵ��
23  *  @time 20130924
24  **/
25 #include <fcntl.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30 
31 #include "micro_thread.h"
32 #include "mt_msg.h"
33 #include "mt_notify.h"
34 #include "mt_connection.h"
35 #include "mt_sys_hook.h"
36 #include "ff_hook.h"
37 
38 using namespace std;
39 using namespace NS_MICRO_THREAD;
40 
41 
42 /**
43  * @brief  ΢�߳����ӻ��๹��������
44  */
45 IMtConnection::IMtConnection()
46 {
47     _type       = OBJ_CONN_UNDEF;
48     _action     = NULL;
49     _ntfy_obj   = NULL;
50     _msg_buff   = NULL;
51 }
52 IMtConnection::~IMtConnection()
53 {
54     if (_ntfy_obj) {
55         NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj);
56         _ntfy_obj = NULL;
57     }
58 
59     if (_msg_buff) {
60         MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
61         _msg_buff = NULL;
62     }
63 }
64 
65 
66 /**
67  * @brief ���ӻ��ո����������
68  */
69 void IMtConnection::Reset()
70 {
71     if (_ntfy_obj) {
72         NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj);
73         _ntfy_obj = NULL;
74     }
75 
76     if (_msg_buff) {
77         MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
78         _msg_buff = NULL;
79     }
80 
81     _action     = NULL;
82     _ntfy_obj   = NULL;
83     _msg_buff   = NULL;
84 }
85 
86 
87 /**
88  * @brief  ���ӵ�socket����, �������ӵ�Э�����͵�
89  * @return >0 -�ɹ�, ����ϵͳfd, < 0 ʧ��
90  */
91 int UdpShortConn::CreateSocket()
92 {
93     // 1. UDP������, ÿ���´�SOCKET
94     _osfd = socket(AF_INET, SOCK_DGRAM, 0);
95     if (_osfd < 0)
96     {
97         MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
98         return -1;
99     }
100 
101     // 2. ����������
102     int flags = 1;
103     if (ioctl(_osfd, FIONBIO, &flags) < 0)
104     {
105         MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
106         close(_osfd);
107         _osfd = -1;
108         return -2;
109     }
110 
111     // 3. ���¹�����Ϣ
112     if (_ntfy_obj) {
113         _ntfy_obj->SetOsfd(_osfd);
114     }
115 
116     return _osfd;
117 }
118 
119 /**
120  * @brief �ر�ϵͳsocket, ����һЩ״̬
121  */
122 int UdpShortConn::CloseSocket()
123 {
124     if (_osfd < 0)
125     {
126         return 0;
127     }
128 
129     close(_osfd);
130     _osfd = -1;
131 
132     return 0;
133 }
134 
135 
136 /**
137  * @brief ���Է�������, ������һ��
138  * @return 0 ���ͱ��ж�, ������. <0 ϵͳʧ��. >0 ���η��ͳɹ�
139  */
140 int UdpShortConn::SendData()
141 {
142     if (!_action || !_msg_buff) {
143         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
144         return -100;
145     }
146 
147     mt_hook_syscall(sendto);
148     int ret = ff_hook_sendto(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0,
149                 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
150     if (ret == -1)
151     {
152         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
153         {
154             return 0;
155         }
156         else
157         {
158             MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _osfd,
159                       errno, strerror(errno));
160             return -2;
161         }
162     }
163     else
164     {
165         _msg_buff->SetHaveSndLen(ret);
166         return ret;
167     }
168 }
169 
170 /**
171  * @brief ���Խ�������, ������һ��
172  * @param buff ���ջ�����ָ��
173  * @return -1 �Զ˹ر�. -2 ϵͳʧ��. >0 ���ν��ճɹ�
174  */
175 int UdpShortConn::RecvData()
176 {
177     if (!_action || !_msg_buff) {
178         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
179         return -100;
180     }
181 
182     struct sockaddr_in  from;
183     socklen_t fromlen = sizeof(from);
184     mt_hook_syscall(recvfrom);
185     int ret = ff_hook_recvfrom(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMaxLen(),
186                        0, (struct sockaddr*)&from, &fromlen);
187     if (ret < 0)
188     {
189         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
190         {
191             return 0;
192         }
193         else
194         {
195             MTLOG_ERROR("socket recv failed, fd %d, errno %d(%s)", _osfd,
196                       errno, strerror(errno));
197             return -2;  // ϵͳ����
198         }
199     }
200     else if (ret == 0)
201     {
202         return -1;  // �Զ˹ر�
203     }
204     else
205     {
206         _msg_buff->SetHaveRcvLen(ret);
207     }
208 
209     // �����ļ��, >0 �հ�����; =0 �����ȴ�; <0(-65535����)�����쳣
210     ret = _action->DoInput();
211     if (ret > 0)
212     {
213         _msg_buff->SetMsgLen(ret);
214         return ret;
215     }
216     else if (ret == 0)
217     {
218         return 0;
219     }
220     else if (ret == -65535)
221     {
222         _msg_buff->SetHaveRcvLen(0);
223         return 0;
224     }
225     else
226     {
227         return -1;
228     }
229 }
230 
231 /**
232  * @brief ���ӻ��ո����������
233  */
234 void UdpShortConn::Reset()
235 {
236     CloseSocket();
237     this->IMtConnection::Reset();
238 }
239 
240 
241 /**
242  * @brief  ���Ӵ���Զ�˻Ựͨ��, ��TCP��connect��
243  * @return 0 -�ɹ�, < 0 ʧ��
244  */
245 int TcpKeepConn::OpenCnnect()
246 {
247     if (!_action || !_msg_buff) {
248         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
249         return -100;
250     }
251 
252     int err = 0;
253     mt_hook_syscall(connect);
254     int ret = ff_hook_connect(_osfd, (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
255     if (ret < 0)
256     {
257         err = errno;
258         if (err == EISCONN)
259         {
260             return 0;
261         }
262         else
263         {
264             if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR))
265             {
266                 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _osfd, err);
267                 return -1;
268             }
269             else
270             {
271                 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _osfd, err);
272                 return -2;
273             }
274         }
275     }
276     else
277     {
278         return 0;
279     }
280 }
281 
282 /**
283  * @brief ����sock��TCP��������
284  */
285 int TcpKeepConn::CreateSocket()
286 {
287     if (_osfd > 0)        // ��������ʱ, ��������������; ������������ntfyfd
288     {
289         if (_ntfy_obj) {
290             _ntfy_obj->SetOsfd(_osfd);
291         }
292 
293         return _osfd;
294     }
295 
296     // ��һ�ν���ʱ, ����socket
297     _osfd = socket(AF_INET, SOCK_STREAM, 0);
298     if (_osfd < 0)
299     {
300         MTLOG_ERROR("create tcp socket failed, error: %d", errno);
301         return -1;
302     }
303 
304     // ����������
305     int flags = 1;
306     if (ioctl(_osfd, FIONBIO, &flags) < 0)
307     {
308         MTLOG_ERROR("set tcp socket unblock failed, error: %d", errno);
309         close(_osfd);
310         _osfd = -1;
311         return -2;
312     }
313 
314     // ���¹�����Ϣ
315     _keep_ntfy.SetOsfd(_osfd);
316     _keep_ntfy.DisableOutput();
317     _keep_ntfy.EnableInput();
318 
319     if (_ntfy_obj) {
320         _ntfy_obj->SetOsfd(_osfd);
321     }
322 
323     return _osfd;
324 }
325 
326 /**
327  * @brief ���Է�������, ������һ��
328  * @param dst  ����Ŀ�ĵ�ַ
329  * @param buff ���ͻ�����ָ��
330  * @param size �����͵������
331  * @return 0 ���ͱ��ж�, ������. <0 ϵͳʧ��. >0 ���η��ͳɹ�
332  */
333 int TcpKeepConn::SendData()
334 {
335     if (!_action || !_msg_buff) {
336         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
337         return -100;
338     }
339 
340     char* msg_ptr = (char*)_msg_buff->GetMsgBuff();
341     int msg_len = _msg_buff->GetMsgLen();
342     int have_send_len = _msg_buff->GetHaveSndLen();
343     mt_hook_syscall(send);
344     int ret = ff_hook_send(_osfd, msg_ptr + have_send_len, msg_len - have_send_len, 0);
345     if (ret == -1)
346     {
347         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
348         {
349             return 0;
350         }
351         else
352         {
353             MTLOG_ERROR("send tcp socket failed, error: %d", errno);
354             return -1;
355         }
356     }
357     else
358     {
359         have_send_len += ret;
360         _msg_buff->SetHaveSndLen(have_send_len);
361     }
362 
363     // ȫ���������, ���سɹ�, ��������ȴ�
364     if (have_send_len >= msg_len)
365     {
366         return msg_len;
367     }
368     else
369     {
370         return 0;
371     }
372 }
373 
374 /**
375  * @brief ���Խ�������, ������һ��
376  * @param buff ���ջ�����ָ��
377  * @return -1 �Զ˹ر�. -2 ϵͳʧ��. >0 ���ν��ճɹ�
378  */
379 int TcpKeepConn::RecvData()
380 {
381     if (!_action || !_msg_buff) {
382         MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff);
383         return -100;
384     }
385 
386     char* msg_ptr = (char*)_msg_buff->GetMsgBuff();
387     int max_len = _msg_buff->GetMaxLen();
388     int have_rcv_len = _msg_buff->GetHaveRcvLen();
389     mt_hook_syscall(recv);
390     int ret = ff_hook_recv(_osfd, (char*)msg_ptr + have_rcv_len, max_len - have_rcv_len, 0);
391     if (ret < 0)
392     {
393         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
394         {
395             return 0;
396         }
397         else
398         {
399             MTLOG_ERROR("recv tcp socket failed, error: %d", errno);
400             return -2;  // ϵͳ����
401         }
402     }
403     else if (ret == 0)
404     {
405         MTLOG_ERROR("tcp remote close, address: %s[%d]",
406                 inet_ntoa(_dst_addr.sin_addr), ntohs(_dst_addr.sin_port));
407         return -1;  // �Զ˹ر�
408     }
409     else
410     {
411         have_rcv_len += ret;
412         _msg_buff->SetHaveRcvLen(have_rcv_len);
413     }
414 
415     // �����ļ��, >0 �հ�����; =0 �����ȴ�; <0(-65535����)�����쳣
416     ret = _action->DoInput();
417     if (ret > 0)
418     {
419         _msg_buff->SetMsgLen(have_rcv_len);
420         return ret;
421     }
422     else if (ret == 0)
423     {
424         return 0;
425     }
426     else
427     {
428         return -1;
429     }
430 }
431 
432 /**
433  * @brief �ر�ϵͳsocket, ����һЩ״̬
434  */
435 int TcpKeepConn::CloseSocket()
436 {
437     if (_osfd < 0)
438     {
439         return 0;
440     }
441     _keep_ntfy.SetOsfd(-1);
442 
443     close(_osfd);
444     _osfd = -1;
445 
446     return 0;
447 }
448 
449 /**
450  * @brief ���ӻ��ո����������
451  */
452 void TcpKeepConn::Reset()
453 {
454     memset(&_dst_addr, 0 ,sizeof(_dst_addr));
455     CloseSocket();
456     this->IMtConnection::Reset();
457 }
458 
459 /**
460  * @brief ���ӻ��ո����������
461  */
462 void TcpKeepConn::ConnReuseClean()
463 {
464     this->IMtConnection::Reset();
465 }
466 
467 /**
468  * @brief Idle���洦��, epoll ����Զ�˹رյ�
469  */
470 bool TcpKeepConn::IdleAttach()
471 {
472     if (_osfd < 0) {
473         MTLOG_ERROR("obj %p attach failed, fd %d error", this, _osfd);
474         return false;
475     }
476 
477     if (_keep_flag & TCP_KEEP_IN_KQUEUE) {
478         MTLOG_ERROR("obj %p repeat attach, error", this);
479         return true;
480     }
481 
482     _keep_ntfy.DisableOutput();
483     _keep_ntfy.EnableInput();
484 
485     // ���ʱ�����
486     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
487     if ((NULL == timer) || !timer->start_timer(this, _keep_time))
488     {
489         MTLOG_ERROR("obj %p attach timer failed, error", this);
490         return false;
491     }
492 
493     if (MtFrame::Instance()->KqueueAddObj(&_keep_ntfy))
494     {
495         _keep_flag |= TCP_KEEP_IN_KQUEUE;
496         return true;
497     }
498     else
499     {
500         MTLOG_ERROR("obj %p attach failed, error", this);
501         return false;
502     }
503 }
504 
505 /**
506  * @brief Idleȡ�����洦��, �����ɿ����߳�����Զ�˹ر�
507  */
508 bool TcpKeepConn::IdleDetach()
509 {
510     if (_osfd < 0) {
511         MTLOG_ERROR("obj %p detach failed, fd %d error", this, _osfd);
512         return false;
513     }
514 
515     if (!(_keep_flag & TCP_KEEP_IN_KQUEUE)) {
516         MTLOG_DEBUG("obj %p repeat detach, error", this);
517         return true;
518     }
519 
520     _keep_ntfy.DisableOutput();
521     _keep_ntfy.EnableInput();
522 
523     // ���ʱ��ɾ��
524     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
525     if (NULL != timer)
526     {
527         timer->stop_timer(this);
528     }
529 
530     if (MtFrame::Instance()->KqueueDelObj(&_keep_ntfy))
531     {
532         _keep_flag &= ~TCP_KEEP_IN_KQUEUE;
533         return true;
534     }
535     else
536     {
537         MTLOG_ERROR("obj %p detach failed, error", this);
538         return false;
539     }
540 }
541 
542 
543 /**
544  * @brief ��ʱ֪ͨ����, ����ʵ���߼�
545  */
546 void TcpKeepConn::timer_notify()
547 {
548     MTLOG_DEBUG("keep timeout[%u], fd %d, close connection", _keep_time, _osfd);
549     ConnectionMgr::Instance()->CloseIdleTcpKeep(this);
550 }
551 
552 /**
553  * @brief ��������������
554  */
555 TcpKeepMgr::TcpKeepMgr()
556 {
557     _keep_hash = new HashList(10000);
558 }
559 
560 TcpKeepMgr::~TcpKeepMgr()
561 {
562     if (!_keep_hash) {
563         return;
564     }
565 
566     HashKey* hash_item = _keep_hash->HashGetFirst();
567     while (hash_item)
568     {
569         delete hash_item;
570         hash_item = _keep_hash->HashGetFirst();
571     }
572 
573     delete _keep_hash;
574     _keep_hash = NULL;
575 }
576 
577 
578 /**
579  * @brief ��IP��ַ��ȡTCP�ı�������
580  */
581 TcpKeepConn* TcpKeepMgr::GetTcpKeepConn(struct sockaddr_in* dst)
582 {
583     TcpKeepConn* conn = NULL;
584     if (NULL == dst)
585     {
586         MTLOG_ERROR("input param dst null, error");
587         return NULL;
588     }
589 
590     TcpKeepKey key(dst);
591     TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
592     if ((NULL == conn_list) || (NULL == conn_list->GetFirstConn()))
593     {
594         conn = _mem_queue.AllocPtr();
595         if (conn) {
596             conn->SetDestAddr(dst);
597         }
598     }
599     else
600     {
601         conn = conn_list->GetFirstConn();
602         conn_list->RemoveConn(conn);
603         conn->IdleDetach();
604     }
605 
606     return conn;
607 }
608 
609 /**
610  * @brief ��IP��ַ����TCP�ı�������
611  */
612 bool TcpKeepMgr::RemoveTcpKeepConn(TcpKeepConn* conn)
613 {
614     struct sockaddr_in* dst = conn->GetDestAddr();
615     if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0))
616     {
617         MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
618         return false;
619     }
620 
621     TcpKeepKey key(dst);
622     TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
623     if (!conn_list)
624     {
625         MTLOG_ERROR("no conn cache list, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
626         return false;
627     }
628 
629     conn->IdleDetach();
630     conn_list->RemoveConn(conn);
631 
632     return true;
633 
634 }
635 
636 
637 /**
638  * @brief ��IP��ַ����TCP�ı�������
639  */
640 bool TcpKeepMgr::CacheTcpKeepConn(TcpKeepConn* conn)
641 {
642     struct sockaddr_in* dst = conn->GetDestAddr();
643     if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0))
644     {
645         MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port);
646         return false;
647     }
648 
649     TcpKeepKey key(dst);
650     TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key);
651     if (!conn_list)
652     {
653         conn_list = new TcpKeepKey(conn->GetDestAddr());
654         if (!conn_list) {
655             MTLOG_ERROR("new conn list failed, error");
656             return false;
657         }
658         _keep_hash->HashInsert(conn_list);
659     }
660 
661     if (!conn->IdleAttach())
662     {
663         MTLOG_ERROR("conn IdleAttach failed, error");
664         return false;
665     }
666 
667     conn->ConnReuseClean();
668     conn_list->InsertConn(conn);
669 
670 
671     return true;
672 
673 }
674 
675 /**
676  * @brief �رջ���tcp������
677  */
678 void TcpKeepMgr::FreeTcpKeepConn(TcpKeepConn* conn, bool force_free)
679 {
680     if (force_free)
681     {
682         conn->Reset();
683         _mem_queue.FreePtr(conn);
684         return;
685     }
686     else
687     {
688         if (!CacheTcpKeepConn(conn))
689         {
690             conn->Reset();
691             _mem_queue.FreePtr(conn);
692             return;
693         }
694     }
695 }
696 
697 
698 
699 /**
700  * @brief  ���ӵ�socket����, �������ӵ�Э�����͵�
701  * @return >0 -�ɹ�, ����ϵͳfd, < 0 ʧ��
702  */
703 int UdpSessionConn::CreateSocket()
704 {
705     // 1. session������, ��֪ͨ����������fd
706     if (!_action || !_ntfy_obj) {
707         MTLOG_ERROR("conn not set action %p, or _ntfy_obj %p, error", _action, _ntfy_obj);
708         return -100;
709     }
710     SessionProxy* proxy = dynamic_cast<SessionProxy*>(_ntfy_obj);
711     if (!proxy) {
712         MTLOG_ERROR("ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj);
713         return -200;
714     }
715     ISessionNtfy* real_ntfy = proxy->GetRealNtfyObj();
716     if (!real_ntfy) {
717         MTLOG_ERROR("real ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj);
718         return -300;
719     }
720 
721     // 2. ί�ɴ���, ���¾����Ϣ
722     int osfd = real_ntfy->GetOsfd();
723     if (osfd <= 0)
724     {
725         osfd = real_ntfy->CreateSocket();
726         if (osfd <= 0) {
727             MTLOG_ERROR("real ntfy obj create fd failed, _ntfy_obj %p, error", real_ntfy);
728             return -400;
729         }
730     }
731     _ntfy_obj->SetOsfd(osfd);
732 
733     return osfd;
734 }
735 
736 /**
737  * @brief �ر�ϵͳsocket, ����һЩ״̬
738  */
739 int UdpSessionConn::CloseSocket()
740 {
741     return 0;
742 }
743 
744 
745 /**
746  * @brief ���Է�������, ������һ��
747  * @return 0 ���ͱ��ж�, ������. <0 ϵͳʧ��. >0 ���η��ͳɹ�
748  */
749 int UdpSessionConn::SendData()
750 {
751     if (!_action || !_msg_buff || !_ntfy_obj) {
752         MTLOG_ERROR("conn not set action %p, or msg %p, ntfy %p error", _action, _msg_buff, _ntfy_obj);
753         return -100;
754     }
755 
756     mt_hook_syscall(sendto);
757     int ret = ff_hook_sendto(_ntfy_obj->GetOsfd(), _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0,
758                 (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in));
759     if (ret == -1)
760     {
761         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
762         {
763             return 0;
764         }
765         else
766         {
767             MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _ntfy_obj->GetOsfd(),
768                       errno, strerror(errno));
769             return -2;
770         }
771     }
772     else
773     {
774         _msg_buff->SetHaveSndLen(ret);
775         return ret;
776     }
777 }
778 
779 /**
780  * @brief ���Խ�������, ������һ��
781  * @param buff ���ջ�����ָ��
782  * @return 0 -�����ȴ�����; >0 ���ճɹ�; < 0 ʧ��
783  */
784 int UdpSessionConn::RecvData()
785 {
786     if (!_ntfy_obj || !_msg_buff) {
787         MTLOG_ERROR("conn not set _ntfy_obj %p, or msg %p, error", _ntfy_obj, _msg_buff);
788         return -100;
789     }
790 
791     if (_ntfy_obj->GetRcvEvents() <= 0) {
792         MTLOG_DEBUG("conn _ntfy_obj %p, no recv event, retry it", _ntfy_obj);
793         return 0;
794     }
795 
796     //  UDP Session ֪ͨ���滻msg buff, ͨ��type�ж�
797     int msg_len = _msg_buff->GetMsgLen();
798     if (BUFF_RECV == _msg_buff->GetBuffType())
799     {
800         return msg_len;
801     }
802     else
803     {
804         MTLOG_DEBUG("conn msg buff %p, no recv comm", _msg_buff);
805         return 0;
806     }
807 }
808 
809 
810 /**
811  * @brief sessionȫ�ֹ�����
812  * @return ȫ�־��ָ��
813  */
814 ConnectionMgr* ConnectionMgr::_instance = NULL;
815 ConnectionMgr* ConnectionMgr::Instance (void)
816 {
817     if (NULL == _instance)
818     {
819         _instance = new ConnectionMgr();
820     }
821 
822     return _instance;
823 }
824 
825 /**
826  * @brief session����ȫ�ֵ����ٽӿ�
827  */
828 void ConnectionMgr::Destroy()
829 {
830     if( _instance != NULL )
831     {
832         delete _instance;
833         _instance = NULL;
834     }
835 }
836 
837 /**
838  * @brief ��Ϣbuff�Ĺ��캯��
839  */
840 ConnectionMgr::ConnectionMgr()
841 {
842 }
843 
844 /**
845  * @brief ��������, ��������Դ, ������������
846  */
847 ConnectionMgr::~ConnectionMgr()
848 {
849 }
850 
851 /**
852  * @brief ��ȡ�ӿ�
853  */
854 IMtConnection* ConnectionMgr::GetConnection(CONN_OBJ_TYPE type, struct sockaddr_in* dst)
855 {
856     switch (type)
857     {
858         case OBJ_SHORT_CONN:
859             return _udp_short_queue.AllocPtr();
860             break;
861 
862         case OBJ_TCP_KEEP:
863             return _tcp_keep_mgr.GetTcpKeepConn(dst);
864             break;
865 
866         case OBJ_UDP_SESSION:
867             return _udp_session_queue.AllocPtr();
868             break;
869 
870         default:
871             return NULL;
872             break;
873     }
874 
875 }
876 
877 /**
878  * @brief ���սӿ�
879  */
880 void ConnectionMgr::FreeConnection(IMtConnection* conn, bool force_free)
881 {
882     if (!conn) {
883         return;
884     }
885     CONN_OBJ_TYPE type = conn->GetConnType();
886 
887     switch (type)
888     {
889         case OBJ_SHORT_CONN:
890             conn->Reset();
891             return _udp_short_queue.FreePtr(dynamic_cast<UdpShortConn*>(conn));
892             break;
893 
894         case OBJ_TCP_KEEP:
895             return _tcp_keep_mgr.FreeTcpKeepConn(dynamic_cast<TcpKeepConn*>(conn), force_free);
896             break;
897 
898         case OBJ_UDP_SESSION:
899             conn->Reset();
900             return _udp_session_queue.FreePtr(dynamic_cast<UdpSessionConn*>(conn));
901             break;
902 
903         default:
904             break;
905     }
906 
907     delete conn;
908     return;
909 }
910 
911 
912 /**
913  * @brief �ر�idle��tcp������
914  */
915 void ConnectionMgr::CloseIdleTcpKeep(TcpKeepConn* conn)
916 {
917     _tcp_keep_mgr.RemoveTcpKeepConn(conn);
918     _tcp_keep_mgr.FreeTcpKeepConn(conn, true);
919 }
920 
921 
922