xref: /f-stack/app/micro_thread/mt_net.cpp (revision a9643ea8)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_mbuf_pool.cpp
22  *  @info ΢�߳���Ϣbuf�ع���ʵ��
23  *  @time 20130924
24  **/
25 
26 #include <errno.h>
27 #include <netinet/tcp.h>
28 #include "micro_thread.h"
29 #include "mt_sys_hook.h"
30 #include "ff_hook.h"
31 #include "mt_net.h"
32 
33 
34 using namespace std;
35 using namespace NS_MICRO_THREAD;
36 
37 // �������鹹
38 CNetHelper::CNetHelper()
39 {
40     handler = (void*)CNetMgr::Instance()->AllocNetItem();
41 }
42 
43 CNetHelper::~CNetHelper()
44 {
45     CNetHandler* net_handler = (CNetHandler*)handler;
46     if (handler != NULL)
47     {
48         net_handler->Reset();
49         CNetMgr::Instance()->FreeNetItem(net_handler);
50         handler = NULL;
51     }
52 }
53 
54 // ͬ���շ��ӿ�
55 int32_t CNetHelper::SendRecv(void* data, uint32_t len, uint32_t timeout)
56 {
57     if (handler != NULL) {
58         CNetHandler* net_handler = (CNetHandler*)handler;
59         return net_handler->SendRecv(data, len, timeout);
60     } else {
61         return RC_INVALID_HANDLER;
62     }
63 }
64 
65 // ��ȡ����buff��Ϣ, ��Ч��ֱ��helper����
66 void* CNetHelper::GetRspBuff()
67 {
68     if (handler != NULL) {
69         CNetHandler* net_handler = (CNetHandler*)handler;
70         return net_handler->GetRspBuff();
71     } else {
72         return NULL;
73     }
74 }
75 
76 // ��ȡ����buff��Ϣ, ��Ч��ֱ��helper����
77 uint32_t CNetHelper::GetRspLen()
78 {
79     if (handler != NULL) {
80         CNetHandler* net_handler = (CNetHandler*)handler;
81         return net_handler->GetRspLen();
82     } else {
83         return 0;
84     }
85 }
86 
87 
88 // ת����������Ϣ, �����ȡ
89 char* CNetHelper::GetErrMsg(int32_t result)
90 {
91     static const char* errmsg = "unknown error type";
92 
93     switch (result)
94     {
95         case RC_SUCCESS:
96             errmsg = "success";
97             break;
98 
99         case RC_ERR_SOCKET:
100             errmsg = "create socket failed";
101             break;
102 
103         case RC_SEND_FAIL:
104             errmsg = "send pakeage timeout or failed";
105             break;
106 
107         case RC_RECV_FAIL:
108             errmsg = "recv response timeout or failed";
109             break;
110 
111         case RC_CONNECT_FAIL:
112             errmsg = "connect timeout or failed";
113             break;
114 
115         case RC_CHECK_PKG_FAIL:
116             errmsg = "user package check failed";
117             break;
118 
119         case RC_NO_MORE_BUFF:
120             errmsg = "user response buffer too small";
121             break;
122 
123         case RC_REMOTE_CLOSED:
124             errmsg = "remote close connection";
125             break;
126 
127         case RC_INVALID_PARAM:
128             errmsg = "params invalid";
129             break;
130 
131         case RC_INVALID_HANDLER:
132             errmsg = "net handler invalid";
133             break;
134 
135         case RC_MEM_ERROR:
136             errmsg = "no more memory, alloc failed";
137             break;
138 
139         case RC_CONFLICT_SID:
140             errmsg = "session id with the dest address conflict";
141             break;
142 
143         case RC_KQUEUE_ERROR:
144             errmsg = "epoll system error";
145             break;
146 
147         default:
148             break;
149     }
150 
151     return (char*)errmsg;
152 }
153 
154 // ����Э�������, Ĭ��UDP
155 void CNetHelper::SetProtoType(MT_PROTO_TYPE type)
156 {
157     if (handler != NULL) {
158         CNetHandler* net_handler = (CNetHandler*)handler;
159         return net_handler->SetProtoType(type);
160     }
161 }
162 
163 // ����Ŀ��IP��ַ
164 void CNetHelper::SetDestAddress(struct sockaddr_in* dst)
165 {
166     if (handler != NULL) {
167         CNetHandler* net_handler = (CNetHandler*)handler;
168         return net_handler->SetDestAddress(dst);
169     }
170 }
171 
172 // ����session����session id��Ϣ, �����0
173 void CNetHelper::SetSessionId(uint64_t sid)
174 {
175     if (handler != NULL) {
176         CNetHandler* net_handler = (CNetHandler*)handler;
177         return net_handler->SetSessionId(sid);
178     }
179 }
180 
181 // ����session�����ص�����
182 void CNetHelper::SetSessionCallback(CHECK_SESSION_CALLBACK function)
183 {
184     if (handler != NULL) {
185         CNetHandler* net_handler = (CNetHandler*)handler;
186         return net_handler->SetSessionCallback(function);
187     }
188 }
189 
190 
191 
192 // ����һ��net handler, ���ڸ���
193 void CNetHandler::Reset()
194 {
195     // ȥ��session��connection�������
196     this->Unlink();
197     this->UnRegistSession();
198 
199     // ��̬�ڴ�����
200     if (_rsp_buff != NULL) {
201         delete_sk_buffer(_rsp_buff);
202         _rsp_buff               = NULL;
203     }
204 
205     // �ֶγ�ʼ������
206     _thread                     = NULL;
207     _proto_type                 = NET_PROTO_TCP;
208     _conn_type                  = TYPE_CONN_SESSION;
209     _dest_ipv4.sin_addr.s_addr  = 0;
210     _dest_ipv4.sin_port         = 0;
211     _session_id                 = 0;
212     _callback                   = NULL;
213     _err_no                     = 0;
214     _state_flags                = 0;
215     _conn_ptr                   = NULL;
216     _send_pos                   = 0;
217     _req_len                    = 0;
218     _req_data                   = NULL;
219 
220 }
221 
222 // ���캯��
223 CNetHandler::CNetHandler()
224 {
225     _state_flags = 0;
226     _rsp_buff = NULL;
227 
228     this->Reset();
229 }
230 
231 // ��������
232 CNetHandler::~CNetHandler()
233 {
234     this->Reset();
235 }
236 
237 // ����Ҫ�IJ�����Ϣ
238 int32_t CNetHandler::CheckParams()
239 {
240     // 1. ��������Ч���
241     if ((NULL == _req_data) || (_req_len == 0))
242     {
243         MTLOG_ERROR("param invalid, data[%p], len[%u]", _req_data, _req_len);
244         return RC_INVALID_PARAM;
245     }
246 
247     // 2. Ŀ�ĵ�ַ��Ч���
248     if ((_dest_ipv4.sin_addr.s_addr == 0) || (_dest_ipv4.sin_port == 0))
249     {
250         MTLOG_ERROR("param invalid, ip[%u], port[%u]", _dest_ipv4.sin_addr.s_addr,
251              _dest_ipv4.sin_port);
252         return RC_INVALID_PARAM;
253     }
254 
255     // 3. session ���ͼ����ȷ��
256     if (_conn_type == TYPE_CONN_SESSION)
257     {
258         if ((_callback == NULL) || (_session_id == 0))
259         {
260             MTLOG_ERROR("param invalid, callback[%p], session_id[%llu]", _callback, _session_id);
261             return RC_INVALID_PARAM;
262         }
263 
264         // �����ע��session��Ϣ
265         if (!this->RegistSession())
266         {
267             MTLOG_ERROR("param invalid, session_id[%llu] regist failed", _session_id);
268             return RC_CONFLICT_SID;
269         }
270     }
271 
272     // 4. ����ִ��
273     return 0;
274 }
275 
276 // ��ȡ����, ͬʱ�������ȴ����ӵĶ�����
277 int32_t CNetHandler::GetConnLink()
278 {
279     CDestLinks key;
280     key.SetKeyInfo(_dest_ipv4.sin_addr.s_addr, _dest_ipv4.sin_port, _proto_type, _conn_type);
281 
282     CDestLinks* dest_link = CNetMgr::Instance()->FindCreateDest(&key);
283     if (NULL == dest_link)
284     {
285         MTLOG_ERROR("get dest link handle failed");
286         return RC_MEM_ERROR;
287     }
288 
289     CSockLink* sock_link = dest_link->GetSockLink();
290     if (NULL == sock_link)
291     {
292         MTLOG_ERROR("get sock link handle failed");
293         return RC_MEM_ERROR;
294     }
295 
296     this->Link(sock_link);
297 
298     return 0;
299 }
300 
301 
302 // ����Ҫ�IJ�����Ϣ
303 int32_t CNetHandler::WaitConnect(uint64_t timeout)
304 {
305     CSockLink* conn = (CSockLink*)this->_conn_ptr;
306     if (NULL == conn)
307     {
308         MTLOG_ERROR("get sock link handle failed");
309         return RC_MEM_ERROR;
310     }
311 
312     int32_t fd = conn->CreateSock();
313     if (fd < 0)
314     {
315         MTLOG_ERROR("create sock failed, ret %d[%m]", fd);
316         return RC_ERR_SOCKET;
317     }
318 
319     if (conn->Connect())
320     {
321         MTLOG_DEBUG("sock conncet ok");
322         return RC_SUCCESS;
323     }
324 
325     // ����ȴ�connect����
326     this->SwitchToConn();
327 
328     // �ȴ�������
329     MtFrame* mtframe = MtFrame::Instance();
330     mtframe->WaitNotify(timeout);
331 
332     // ɾ����connect����, ��ʱ��Ҫ�����, ��������
333     this->SwitchToIdle();
334 
335     if (_err_no != 0)
336     {
337         MTLOG_ERROR("connect get out errno %d", _err_no);
338         return _err_no;
339     }
340 
341     // ��ʱ��������ȷ��
342     if (conn->Connected())
343     {
344         MTLOG_DEBUG("connect ok");
345         return 0;
346     }
347     else
348     {
349         MTLOG_TRACE("connect not ok, maybe timeout");
350         return RC_CONNECT_FAIL;
351     }
352 }
353 
354 
355 // ����Ҫ�IJ�����Ϣ
356 int32_t CNetHandler::WaitSend(uint64_t timeout)
357 {
358     CSockLink* conn = (CSockLink*)this->_conn_ptr;
359     if (NULL == conn)
360     {
361         MTLOG_ERROR("get sock link handle failed");
362         return RC_MEM_ERROR;
363     }
364 
365     int32_t ret = conn->SendData(_req_data, _req_len);
366     if (ret < 0)
367     {
368         MTLOG_ERROR("sock send failed, ret %d[%m]", ret);
369         return RC_SEND_FAIL;
370     }
371     this->SkipSendPos(ret);
372 
373     if (_req_len == 0)
374     {
375         MTLOG_DEBUG("sock send ok");
376         return RC_SUCCESS;
377     }
378 
379     // �ȴ���������, �л��ѷ��͵���Ϣ
380     this->SwitchToSend();
381 
382     // �ȴ�������
383     MtFrame* mtframe = MtFrame::Instance();
384     mtframe->WaitNotify(timeout);
385 
386     // ɾ����connect����
387     this->SwitchToIdle();
388 
389     // �쳣���
390     if (_err_no != 0)
391     {
392         MTLOG_ERROR("send get out errno %d", _err_no);
393         return _err_no;
394     }
395 
396     // ��ʱ���
397     if (_req_len == 0)
398     {
399         MTLOG_DEBUG("send req ok, len %u", _send_pos);
400         return 0;
401     }
402     else
403     {
404         MTLOG_TRACE("send req not ok, left len %u", _req_len);
405         return RC_SEND_FAIL;
406     }
407 }
408 
409 // ����Ҫ�IJ�����Ϣ
410 int32_t CNetHandler::WaitRecv(uint64_t timeout)
411 {
412     CSockLink* conn = (CSockLink*)this->_conn_ptr;
413     if (NULL == conn)
414     {
415         MTLOG_ERROR("get sock link handle failed");
416         return RC_MEM_ERROR;
417     }
418 
419     if (_conn_type == TYPE_CONN_SENDONLY)
420     {
421         MTLOG_DEBUG("only send, without recv");
422         return 0;
423     }
424 
425     // �л����ȴ�����
426     this->SwitchToRecv();
427 
428     // �ȴ�������
429     MtFrame* mtframe = MtFrame::Instance();
430     mtframe->WaitNotify(timeout);
431 
432     // ɾ����connect����
433     this->SwitchToIdle();
434 
435     // ��ʱ���
436     if ((_rsp_buff != NULL) && (_rsp_buff->data_len > 0))
437     {
438         MTLOG_DEBUG("recv get rsp, len %d", _rsp_buff->data_len);
439         return 0;
440     }
441     else
442     {
443         MTLOG_TRACE("recv get out errno %d", _err_no);
444         return RC_RECV_FAIL;
445     }
446 }
447 
448 
449 // ͬ���շ��ӿ�, �޸�Ϊsessionר��
450 int32_t CNetHandler::SendRecv(void* data, uint32_t len, uint32_t timeout)
451 {
452     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
453     utime64_t cost_time = 0;
454     uint64_t time_left = timeout;
455     this->_req_data = data;
456     this->_req_len  = len;
457 
458     // 0. ����Ҫ��������Ϣ
459     int32_t ret = this->CheckParams();
460     if (ret < 0)
461     {
462         MTLOG_ERROR("check params failed, ret[%d]", ret);
463         goto EXIT_LABEL;
464     }
465 
466     // 1. ��ȡ������·ָ��
467     ret = this->GetConnLink();
468     if (ret < 0)
469     {
470         MTLOG_ERROR("get sock conn failed, ret: %d", ret);
471         goto EXIT_LABEL;
472     }
473 
474     // 2. �ȴ����ӳɹ�
475     ret = this->WaitConnect(time_left);
476     if (ret < 0)
477     {
478         MTLOG_ERROR("sock connect failed, ret: %d", ret);
479         goto EXIT_LABEL;
480     }
481 
482     // 3. �ȴ����ͳɹ�
483     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
484     time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
485     ret = this->WaitSend(time_left);
486     if (ret < 0)
487     {
488         MTLOG_ERROR("sock send failed, ret: %d", ret);
489         goto EXIT_LABEL;
490     }
491 
492     // 4. �ȴ����ճɹ�
493     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
494     time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
495     ret = this->WaitRecv(time_left);
496     if (ret < 0)
497     {
498         MTLOG_ERROR("sock recv failed, ret: %d", ret);
499         goto EXIT_LABEL;
500     }
501 
502     // 5. �ɹ�����
503     ret = 0;
504 
505 EXIT_LABEL:
506 
507     // ���η������, ��ҪUNLINK, ����NETHANDLER֧��
508     this->Unlink();
509 
510     // �ɹ�ʧ��, ��Ҫȥ��sessionע��
511     this->UnRegistSession();
512 
513     return ret;
514 }
515 
516 
517 // �������͵�������
518 uint32_t CNetHandler::SkipSendPos(uint32_t len)
519 {
520     uint32_t skip_len = (len >= _req_len) ? _req_len : len;
521     _req_len -= skip_len;
522     _send_pos += skip_len;
523     _req_data = (char*)_req_data + skip_len;
524 
525     return skip_len;
526 }
527 
528 // �������Ӷ���
529 void CNetHandler::Link(CSockLink* conn)
530 {
531     this->_conn_ptr = conn;
532     this->SwitchToIdle();
533 }
534 
535 
536 // �������Ӷ���
537 void CNetHandler::Unlink()
538 {
539     if (this->_state_flags != 0)
540     {
541         this->DetachConn();
542     }
543     this->_conn_ptr = NULL;
544 }
545 
546 // �����ڵȴ����Ӷ���
547 void CNetHandler::SwitchToConn()
548 {
549     CSockLink* conn = (CSockLink*)this->_conn_ptr;
550     if (NULL == conn)
551     {
552         MTLOG_ERROR("net handler invalid");
553         return;
554     }
555 
556     this->DetachConn();
557 
558     this->_state_flags |= STATE_IN_CONNECT;
559     conn->AppendToList(CSockLink::LINK_CONN_LIST, this);
560 }
561 
562 // �л������Ͷ���
563 void CNetHandler::SwitchToSend()
564 {
565     CSockLink* conn = (CSockLink*)this->_conn_ptr;
566     if (NULL == conn)
567     {
568         MTLOG_ERROR("net handler invalid");
569         return;
570     }
571 
572     this->DetachConn();
573 
574     this->_state_flags |= STATE_IN_SEND;
575     conn->AppendToList(CSockLink::LINK_SEND_LIST, this);
576 }
577 
578 
579 // �л������ն���
580 void CNetHandler::SwitchToRecv()
581 {
582     CSockLink* conn = (CSockLink*)this->_conn_ptr;
583     if (NULL == conn)
584     {
585         MTLOG_ERROR("net handler invalid");
586         return;
587     }
588 
589     this->DetachConn();
590 
591     this->_state_flags |= STATE_IN_RECV;
592     conn->AppendToList(CSockLink::LINK_RECV_LIST, this);
593 }
594 
595 // �л�������״̬
596 void CNetHandler::SwitchToIdle()
597 {
598     CSockLink* conn = (CSockLink*)this->_conn_ptr;
599     if (NULL == conn)
600     {
601         MTLOG_ERROR("net handler invalid");
602         return;
603     }
604 
605     this->DetachConn();
606 
607     this->_state_flags |= STATE_IN_IDLE;
608     conn->AppendToList(CSockLink::LINK_IDLE_LIST, this);
609 }
610 
611 
612 // ��������״̬����
613 void CNetHandler::DetachConn()
614 {
615     CSockLink* conn = (CSockLink*)this->_conn_ptr;
616     if (NULL == conn)
617     {
618         MTLOG_DEBUG("net handler not set");
619         return;
620     }
621 
622     if (_state_flags == 0)  // ��ʱ���¼�����, 2����֧, ����Ҫ������
623     {
624         return;
625     }
626 
627     if (_state_flags & STATE_IN_CONNECT)
628     {
629         conn->RemoveFromList(CSockLink::LINK_CONN_LIST, this);
630         _state_flags &= ~STATE_IN_CONNECT;
631     }
632 
633     if (_state_flags & STATE_IN_SEND)
634     {
635         conn->RemoveFromList(CSockLink::LINK_SEND_LIST, this);
636         _state_flags &= ~STATE_IN_SEND;
637     }
638 
639     if (_state_flags & STATE_IN_RECV)
640     {
641         conn->RemoveFromList(CSockLink::LINK_RECV_LIST, this);
642         _state_flags &= ~STATE_IN_RECV;
643     }
644 
645     if (_state_flags & STATE_IN_IDLE)
646     {
647         conn->RemoveFromList(CSockLink::LINK_IDLE_LIST, this);
648         _state_flags &= ~STATE_IN_IDLE;
649     }
650 }
651 
652 
653 
654 
655 /**
656  *  @brief �ڵ�Ԫ�ص�hash�㷨, ��ȡkey��hashֵ
657  *  @return �ڵ�Ԫ�ص�hashֵ
658  */
659 uint32_t CNetHandler::HashValue()
660 {
661     uint32_t ip = _dest_ipv4.sin_addr.s_addr;
662     ip ^= (_dest_ipv4.sin_port << 16) | (_proto_type << 8) | (_conn_type << 8);
663 
664     uint32_t hash = (_session_id >> 32) & 0xffffffff;
665     hash ^= _session_id  & 0xffffffff;
666     hash ^= ip;
667 
668     return hash;
669 }
670 
671 /**
672  *  @brief �ڵ�Ԫ�ص�cmp����, ͬһͰID��, ��key�Ƚ�
673  *  @return �ڵ�Ԫ�ص�hashֵ
674  */
675 int32_t CNetHandler::HashCmp(HashKey* rhs)
676 {
677     CNetHandler* data = (CNetHandler*)(rhs);
678     if (!data) {
679         return -1;
680     }
681     if (this->_session_id != data->_session_id)
682     {
683         return (this->_session_id > data->_session_id) ? 1 : -1;
684     }
685 
686     if (this->_dest_ipv4.sin_addr.s_addr != data->_dest_ipv4.sin_addr.s_addr) {
687         return (this->_dest_ipv4.sin_addr.s_addr > data->_dest_ipv4.sin_addr.s_addr) ? 1 : -1;
688     }
689     if (this->_dest_ipv4.sin_port != data->_dest_ipv4.sin_port) {
690         return (this->_dest_ipv4.sin_port > data->_dest_ipv4.sin_port) ? 1 : -1;
691     }
692     if (this->_proto_type != data->_proto_type) {
693         return (this->_proto_type > data->_proto_type) ? 1 : -1;
694     }
695     if (this->_conn_type != data->_conn_type) {
696         return (this->_conn_type > data->_conn_type) ? 1 : -1;
697     }
698 
699     return 0;
700 };
701 
702 
703 // ע��session����
704 bool CNetHandler::RegistSession()
705 {
706     if (CNetMgr::Instance()->FindNetItem(this) != NULL)
707     {
708         return false;
709     }
710 
711     MtFrame* mtframe = MtFrame::Instance();
712     this->_thread = mtframe->GetActiveThread();
713 
714     CNetMgr::Instance()->InsertNetItem(this);
715     this->_state_flags |= STATE_IN_SESSION;
716     return true;
717 }
718 
719 // ȡ��ע��session
720 void CNetHandler::UnRegistSession()
721 {
722     if (this->_state_flags & STATE_IN_SESSION)
723     {
724         CNetMgr::Instance()->RemoveNetItem(this);
725         this->_state_flags &= ~STATE_IN_SESSION;
726     }
727 }
728 
729 
730 // ��ȡ��������
731 TNetItemList* CSockLink::GetItemList(int32_t type)
732 {
733     TNetItemList* list = NULL;
734     switch (type)
735     {
736         case LINK_IDLE_LIST:
737             list = &this->_idle_list;
738             break;
739 
740         case LINK_CONN_LIST:
741             list = &this->_wait_connect;
742             break;
743 
744         case LINK_SEND_LIST:
745             list = &this->_wait_send;
746             break;
747 
748         case LINK_RECV_LIST:
749             list = &this->_wait_recv;
750             break;
751 
752         default:
753             break;
754     }
755 
756     return list;
757 }
758 
759 
760 // ��������Ϣ
761 void CSockLink::AppendToList(int32_t type, CNetHandler* item)
762 {
763     TNetItemList* list = this->GetItemList(type);
764     if (NULL == list)
765     {
766         MTLOG_ERROR("unknown list type: %d", type);
767         return;
768     }
769 
770     TAILQ_INSERT_TAIL(list, item, _link_entry);
771 }
772 
773 // ��������Ϣ
774 void CSockLink::RemoveFromList(int32_t type, CNetHandler* item)
775 {
776     TNetItemList* list = this->GetItemList(type);
777     if (NULL == list)
778     {
779         MTLOG_ERROR("unknown list type: %d", type);
780         return;
781     }
782 
783     TAILQ_REMOVE(list, item, _link_entry);
784 }
785 
786 
787 // ֪ͨ�����߳�
788 void CSockLink::NotifyThread(CNetHandler* item, int32_t result)
789 {
790     static MtFrame* frame = NULL;
791     if (frame == NULL) {
792         frame = MtFrame::Instance();
793     }
794 
795     // ���÷�������Ϣ
796     if (result != RC_SUCCESS)
797     {
798         item->SetErrNo(result);
799     }
800 
801     // ���ÿ�����
802     MicroThread* thread = item->GetThread();
803     if ((thread != NULL) && (thread->HasFlag(MicroThread::IO_LIST)))
804     {
805         frame->RemoveIoWait(thread);
806         frame->InsertRunable(thread);
807     }
808 }
809 
810 
811 // ֪ͨ�����߳�
812 void CSockLink::NotifyAll(int32_t result)
813 {
814     CNetHandler* item = NULL;
815     CNetHandler* tmp = NULL;
816 
817     TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
818     {
819         NotifyThread(item, result);
820         item->Unlink();
821     }
822 
823     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
824     {
825         NotifyThread(item, result);
826         item->Unlink();
827     }
828 
829     TAILQ_FOREACH_SAFE(item, &_wait_recv, _link_entry, tmp)
830     {
831         NotifyThread(item, result);
832         item->Unlink();
833     }
834 
835     TAILQ_FOREACH_SAFE(item, &_idle_list, _link_entry, tmp)
836     {
837         NotifyThread(item, result);
838         item->Unlink();
839     }
840 }
841 
842 
843 // �����ó�ʼ���߼�
844 void CSockLink::Reset()
845 {
846     // �ر�fd, ֪ͨ�������߳�
847     this->Close();
848     this->NotifyAll(_errno);
849 
850     // ����cache����
851     rw_cache_destroy(&_recv_cache);
852     if (_rsp_buff != NULL)
853     {
854         delete_sk_buffer(_rsp_buff);
855         _rsp_buff = NULL;
856     }
857 
858     // ����ȴ�����, ���ѵȴ��߳�
859     TAILQ_INIT(&_wait_connect);
860     TAILQ_INIT(&_wait_send);
861     TAILQ_INIT(&_wait_recv);
862     TAILQ_INIT(&_idle_list);
863 
864     _proto_type     = NET_PROTO_TCP;
865     _errno          = 0;
866     _state          = 0;
867     _last_access    = mt_time_ms();
868     _parents        = NULL;
869 
870     // �����������
871     this->KqueuerObj::Reset();
872 }
873 
874 // ��������������
875 CSockLink::CSockLink()
876 {
877     rw_cache_init(&_recv_cache, NULL);
878     _rsp_buff       = NULL;
879 
880     TAILQ_INIT(&_wait_connect);
881     TAILQ_INIT(&_wait_send);
882     TAILQ_INIT(&_wait_recv);
883     TAILQ_INIT(&_idle_list);
884 
885     _proto_type     = NET_PROTO_TCP;
886     _errno          = 0;
887     _state          = 0;
888     _last_access    = mt_time_ms();
889     _parents        = NULL;
890 }
891 
892 // ��������������
893 CSockLink::~CSockLink()
894 {
895     this->Reset();
896 }
897 
898 
899 // ����Э������, ����buff�ص�ָ��
900 void CSockLink::SetProtoType(MT_PROTO_TYPE type)
901 {
902     _proto_type = type;
903     _recv_cache.pool = CNetMgr::Instance()->GetSkBuffMng(type);
904 }
905 
906 // �ر���·�ľ��
907 void CSockLink::Close()
908 {
909     // 1. ���δ��ʼ��, ֱ�ӷ���
910     if (_fd < 0)
911     {
912         return;
913     }
914 
915     // 2. ��������ͷ�
916     MtFrame::Instance()->KqueueDelObj(this);
917 
918     // 3. �رվ��
919     close(_fd);
920     _fd = -1;
921 }
922 
923 
924 // �쳣��ֹ�Ĵ�����
925 void CSockLink::Destroy()
926 {
927     CDestLinks* dstlink = (CDestLinks*)_parents;
928     if (NULL == dstlink)
929     {
930         MTLOG_ERROR("socket link without parents ptr, maybe wrong");
931         delete this;
932     }
933     else
934     {
935         MTLOG_DEBUG("socket link just free");
936         dstlink->FreeSockLink(this);
937     }
938 }
939 
940 
941 // ������socket���
942 int32_t CSockLink::CreateSock()
943 {
944     if (_fd > 0)        // ��������ʱ, ��������������;
945     {
946         return _fd;
947     }
948 
949     // �������
950     if (NET_PROTO_TCP == _proto_type)
951     {
952         _fd = socket(AF_INET, SOCK_STREAM, 0);
953     }
954     else
955     {
956         _fd = socket(AF_INET, SOCK_DGRAM, 0);
957     }
958 
959     if (_fd < 0)
960     {
961         MTLOG_ERROR("create socket failed, ret %d[%m]", _fd);
962         return -1;
963     }
964 
965     // ���÷�����
966     int flags = 1;
967     if (ioctl(_fd, FIONBIO, &flags) < 0)
968     {
969         MTLOG_ERROR("socket unblock failed, %m");
970         close(_fd);
971         _fd = -1;
972         return -2;
973     }
974 
975     // ѡ������, NODELAY
976     if (NET_PROTO_TCP == _proto_type)
977     {
978         setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
979         this->EnableOutput(); // TCP �ȴ�connect����, ʡһ��epollctrl
980     }
981 
982     // ����epoll���
983     this->EnableInput();
984     if (!MtFrame::Instance()->KqueueAddObj(this))
985     {
986         MTLOG_ERROR("socket epoll mng failed, %m");
987         close(_fd);
988         _fd = -1;
989         return -3;
990     }
991 
992     return _fd;
993 }
994 
995 // ��ȡĿ��ip��Ϣ
996 struct sockaddr_in* CSockLink::GetDestAddr(struct sockaddr_in* addr)
997 {
998     CDestLinks* dstlink = (CDestLinks*)_parents;
999     if ((NULL == _parents) || (NULL == addr)) {
1000         return NULL;
1001     }
1002 
1003     uint32_t ip = 0;
1004     uint16_t port = 0;
1005     dstlink->GetDestIP(ip, port);
1006 
1007     addr->sin_family = AF_INET;
1008     addr->sin_addr.s_addr = ip;
1009     addr->sin_port = port;
1010 
1011     return addr;
1012 }
1013 
1014 
1015 // �������ӹ���
1016 bool CSockLink::Connect()
1017 {
1018     this->_last_access = mt_time_ms();
1019 
1020     // 1. UDP�����ӹ���
1021     if (_proto_type == NET_PROTO_UDP)
1022     {
1023         _state |= LINK_CONNECTED;
1024     }
1025 
1026     // 2. ������״̬, �ɹ�����
1027     if (_state & LINK_CONNECTED)
1028     {
1029         return true;
1030     }
1031 
1032     // 3. ����������, �˳��ȴ�
1033     if (_state & LINK_CONNECTING)
1034     {
1035         return false;
1036     }
1037 
1038     // 4. ������, �״����ӳ���
1039     struct sockaddr_in addr = {0};
1040 
1041     mt_hook_syscall(connect);
1042     int32_t ret = ff_hook_connect(_fd, (struct sockaddr*)this->GetDestAddr(&addr), sizeof(struct sockaddr_in));
1043     if (ret < 0)
1044     {
1045         int32_t err = errno;
1046         if (err == EISCONN)
1047         {
1048             _state |= LINK_CONNECTED;
1049             return true;
1050         }
1051         else
1052         {
1053             _state |= LINK_CONNECTING;
1054             if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR))
1055             {
1056                 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _fd, err);
1057                 return false;
1058             }
1059             else
1060             {
1061                 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _fd, err);
1062                 return false;
1063             }
1064         }
1065     }
1066     else
1067     {
1068         _state |= LINK_CONNECTED;
1069         return true;
1070     }
1071 }
1072 
1073 // ��UDP�ķ�ʽ���͵ȴ�������, һ��ֱ�Ӿͷ���OK
1074 int32_t CSockLink::SendCacheUdp(void* data, uint32_t len)
1075 {
1076     mt_hook_syscall(sendto);
1077     void* buff = NULL;
1078     uint32_t buff_len = 0;
1079 
1080     CNetHandler* item = NULL;
1081     CNetHandler* tmp = NULL;
1082     struct sockaddr_in dst = {0};
1083 
1084     // 1. ���Է��͵ȴ�����, ����ָ�����߳�
1085     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1086     {
1087         item->GetSendData(buff, buff_len);
1088         if ((NULL == buff) || (buff_len == 0))
1089         {
1090             MTLOG_ERROR("get buff ptr invalid, log it");
1091             NotifyThread(item, 0);
1092             item->SwitchToIdle();
1093             continue;
1094         }
1095 
1096         int32_t ret = ff_hook_sendto(_fd, buff, buff_len, 0,
1097                     (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
1098         if (ret == -1)
1099         {
1100             if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
1101             {
1102                 return 0;
1103             }
1104             else
1105             {
1106                 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
1107                           errno, strerror(errno));
1108                 return -2;
1109             }
1110         }
1111 
1112         NotifyThread(item, 0);
1113         item->SwitchToIdle();
1114     }
1115 
1116     // 2. ������OK��, �ٷ��ͱ�������, û�д���������������
1117     if ((data == NULL) || (len == 0))
1118     {
1119         return 0;
1120     }
1121 
1122     int32_t ret = ff_hook_sendto(_fd, data, len, 0,
1123                     (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
1124     if (ret == -1)
1125     {
1126         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
1127         {
1128             return 0;
1129         }
1130         else
1131         {
1132             MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
1133                    errno, strerror(errno));
1134             return -2;
1135         }
1136     }
1137     else
1138     {
1139         return ret;
1140     }
1141 }
1142 
1143 
1144 // TCP�Ļ��巢�ʹ���
1145 int32_t CSockLink::SendCacheTcp(void* data, uint32_t len)
1146 {
1147     void* buff = NULL;
1148     uint32_t buff_len = 0;
1149     struct iovec iov[64];
1150     int32_t count = 0;
1151     CNetHandler* item = NULL;
1152     CNetHandler* tmp = NULL;
1153 
1154     // 1. ���Է��͵ȴ�����, ����ָ�����߳�
1155     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1156     {
1157         item->GetSendData(buff, buff_len);
1158         iov[count].iov_base = buff;
1159         iov[count].iov_len  = (int32_t)buff_len;
1160         count++;
1161         if (count >= 64)
1162         {
1163             break;
1164         }
1165     }
1166     if ((count < 64) && (data != NULL))
1167     {
1168         iov[count].iov_base = data;
1169         iov[count].iov_len  = (int32_t)len;
1170         count++;
1171     }
1172 
1173     ssize_t bytes = writev(_fd, iov, count);
1174     if (bytes < 0)
1175     {
1176         if ((errno == EAGAIN) || (errno == EINTR))
1177         {
1178             return 0;
1179         }
1180         else
1181         {
1182             MTLOG_ERROR("socket writev failed, fd %d, errno %d(%s)", _fd,
1183                    errno, strerror(errno));
1184             return -1;
1185         }
1186     }
1187 
1188     // 2. ���Է��͵ȴ�����, ����ָ�����߳�
1189     uint32_t send_left = (uint32_t)bytes;
1190     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1191     {
1192         send_left -= item->SkipSendPos(send_left);
1193         item->GetSendData(buff, buff_len);
1194         if (buff_len == 0)
1195         {
1196             NotifyThread(item, 0);
1197             item->SwitchToIdle();
1198         }
1199 
1200         if (send_left == 0)
1201         {
1202             break;
1203         }
1204     }
1205 
1206     return send_left;
1207 }
1208 
1209 
1210 // �������ӹ���
1211 int32_t CSockLink::SendData(void* data, uint32_t len)
1212 {
1213     int32_t ret = 0;
1214     bool rc = false;
1215 
1216     this->_last_access = mt_time_ms();
1217 
1218     // 1. ���Է�������, �ȷ����Ŷӵ�����
1219     if (_proto_type == NET_PROTO_UDP)
1220     {
1221         ret = SendCacheUdp(data, len);
1222     }
1223     else
1224     {
1225         ret = SendCacheTcp(data, len);
1226     }
1227 
1228     // 2. ��ǰ�����Ƿ������, �����, ���Բ�������OUT
1229     if (ret < (int32_t)len)
1230     {
1231         this->EnableOutput();
1232         rc = MtFrame::Instance()->KqueueCtrlAdd(_fd, KQ_EVENT_READ);
1233     }
1234     else
1235     {
1236         this->DisableOutput();
1237         rc = MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE);
1238     }
1239 
1240     // 3. ����ˢ�¾��epollע��
1241     if (!rc)
1242     {
1243         MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1244     }
1245 
1246     return ret;
1247 }
1248 
1249 // ���ݷַ��������
1250 int32_t CSockLink::RecvDispath()
1251 {
1252     if (_proto_type == NET_PROTO_UDP)
1253     {
1254         return this->DispathUdp();
1255     }
1256     else
1257     {
1258         return this->DispathTcp();
1259     }
1260 }
1261 
1262 // ���Խ��ո�������ݵ���ʱbuff
1263 void CSockLink::ExtendRecvRsp()
1264 {
1265     if (NULL == _rsp_buff)
1266     {
1267         // buff���η���, ���̫��, �ᵼ���ظ��Ŀ���; ���̫С, �ᵼ��2��check���
1268         // Ȩ��һ��, �ݶ�500�ֽ�����, ���ֳ���, ������ȫ, �˷Ѳ���, С���ֳ���
1269         // ��Ҫrealloc, ��500�ֽڿ�������Ҳ����
1270         _rsp_buff = new_sk_buffer(512);
1271         if (NULL == _rsp_buff)
1272         {
1273             MTLOG_ERROR("no more memory, error");
1274             return;
1275         }
1276     }
1277 
1278     _rsp_buff->data_len +=  read_cache_begin(&_recv_cache, _rsp_buff->data_len,
1279         _rsp_buff->data + _rsp_buff->data_len , _rsp_buff->size - _rsp_buff->data_len);
1280 }
1281 
1282 // ���߻ص�����, ���ȴ��Ŷӵȴ��л�ȡ, ���ݴӸ��ڵ��ȡ
1283 CHECK_SESSION_CALLBACK CSockLink::GetSessionCallback()
1284 {
1285     CHECK_SESSION_CALLBACK check_session = NULL;
1286 
1287     // 1. �Ŷӻ�ȡ�ص�����
1288     CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1289     if (NULL == item)
1290     {
1291         MTLOG_DEBUG("recv data with no wait item, err");
1292         goto EXIT_LABEL;
1293     }
1294 
1295     check_session = item->GetSessionCallback();
1296     if (NULL == check_session)
1297     {
1298         MTLOG_ERROR("recv data with no session callback, err");
1299         goto EXIT_LABEL;
1300     }
1301 
1302 EXIT_LABEL:
1303 
1304     // 2. ������ڵ�ΪNULL, ֱ�ӷ��ض��н��
1305     CDestLinks* dstlink = (CDestLinks*)_parents;
1306     if (NULL == dstlink)
1307     {
1308         return check_session;
1309     }
1310 
1311     // 3. ������н��Ϊ��, ����·Ĭ�ϻ����func; ��Ϊ��, ������
1312     if (check_session != NULL)
1313     {
1314         dstlink->SetDefaultCallback(check_session);
1315     }
1316     else
1317     {
1318         check_session = dstlink->GetDefaultCallback();
1319     }
1320 
1321     return check_session;
1322 }
1323 
1324 
1325 // TCP����������������ַ�
1326 int32_t CSockLink::DispathTcp()
1327 {
1328     // 1. TCP�޵ȴ�����, ������ʽ, �������β, ֻ�ܹر�
1329     CHECK_SESSION_CALLBACK check_session = this->GetSessionCallback();
1330     if (NULL == check_session)
1331     {
1332         MTLOG_ERROR("recv data with no session callback, err");
1333         return -1;
1334     }
1335 
1336     // 2. ����ͬһIP/PORT��Э��, ��������������ͨ��
1337     uint32_t need_len = 0;
1338     uint64_t sid = 0;
1339     int32_t ret = 0;
1340     while (_recv_cache.len > 0)
1341     {
1342         this->ExtendRecvRsp();
1343         if (NULL == _rsp_buff)
1344         {
1345             MTLOG_ERROR("alloc memory, error");
1346             _errno = RC_MEM_ERROR;
1347             return -3;
1348         }
1349 
1350         need_len = 0;
1351         ret = check_session(_rsp_buff->data, _rsp_buff->data_len, &sid, &need_len);
1352 
1353         if (ret < 0)
1354         {
1355             MTLOG_ERROR("user check resp failed, ret %d", ret);
1356             _errno = RC_CHECK_PKG_FAIL;
1357             return -1;
1358         }
1359 
1360         if (ret == 0)
1361         {
1362             // 1. �û������ָ������, Ĭ��2����չ, ���ܻ���Ӱ��
1363             if ((need_len == 0) && (_rsp_buff->data_len == _rsp_buff->size))
1364             {
1365                 MTLOG_DEBUG("recv default buff full[%u], but user no set need length", _rsp_buff->size);
1366                 need_len = _rsp_buff->size * 2;
1367             }
1368 
1369             // 2. ����ʣ��ռ�, ������ȴ�����; �����ռ䳬��, �������
1370             if ((need_len <= _rsp_buff->size) || (need_len > 100*1024*1024))
1371             {
1372                 MTLOG_DEBUG("maybe need wait more data: %u", need_len);
1373                 return 0;
1374             }
1375 
1376             // 3. ��չbuff����, ׼���ٳ���һ��
1377             _rsp_buff = reserve_sk_buffer(_rsp_buff, need_len);
1378             if (NULL == _rsp_buff)
1379             {
1380                 MTLOG_ERROR("no more memory, error");
1381                 _errno = RC_MEM_ERROR;
1382                 return -3;
1383             }
1384 
1385             // 4. �Ѿ��޶����������Ϣ, �ȴ������հ�
1386             if (_rsp_buff->data_len >= _recv_cache.len)
1387             {
1388                 MTLOG_DEBUG("maybe need wait more data, now %u", _recv_cache.len);
1389                 return 0;
1390             }
1391 
1392             // 5. ������������, ���Խ���
1393             continue;
1394         }
1395 
1396         // �����쳣����, ����ʵ��δ������, �����ȴ�
1397         if (ret > (int32_t)_recv_cache.len)
1398         {
1399             MTLOG_DEBUG("maybe pkg not all ok, wait more");
1400             return 0;
1401         }
1402 
1403         // ��ѯ��session�Ķ���
1404         CNetHandler* session = this->FindSession(sid);
1405         if (NULL == session)
1406         {
1407             MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1408             cache_skip_data(&_recv_cache, ret);
1409             delete_sk_buffer(_rsp_buff);
1410             _rsp_buff = NULL;
1411         }
1412         else
1413         {
1414             MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1415             cache_skip_data(&_recv_cache, ret);
1416             this->NotifyThread(session, 0);
1417             session->SwitchToIdle();
1418             _rsp_buff->data_len = ret;  //TCPʵ����Ч�ı��ij��ȱ��
1419             session->SetRespBuff(_rsp_buff);
1420             _rsp_buff = NULL;
1421         }
1422     }
1423 
1424     return 0;
1425 
1426 }
1427 
1428 
1429 // UDP����������������ַ�
1430 int32_t CSockLink::DispathUdp()
1431 {
1432     // 1. UDP���ݴ���, ������Ҳ���Զ��������еı���
1433     CHECK_SESSION_CALLBACK check_session = NULL;
1434     CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1435     if (NULL == item)
1436     {
1437         MTLOG_DEBUG("recv data with no wait item, maybe wrong pkg recv");
1438         // �˴����˳�, ��Ϊ���Զ���UDPӦ��(��ʱ��\������)
1439     }
1440     else
1441     {
1442         check_session = item->GetSessionCallback();
1443         if (NULL == check_session)
1444         {
1445             MTLOG_TRACE("recv data with no session callback, err");
1446         }
1447     }
1448 
1449     // 2. ����ÿ����, �ҵ�session�ʹ���, ������
1450     uint64_t sid = 0;
1451     uint32_t need_len = 0;
1452     int32_t ret = 0;
1453     TSkBuffer* block = NULL;
1454     while ((block = TAILQ_FIRST(&_recv_cache.list)) != NULL)
1455     {
1456         if (check_session == NULL)
1457         {
1458             MTLOG_DEBUG("no recv wait, skip first block");
1459             cache_skip_data(&_recv_cache, block->data_len);
1460             continue;
1461         }
1462 
1463         need_len = 0;
1464         ret = check_session(block->data, block->data_len, &sid, &need_len);
1465         if ((ret <= 0) || (ret > (int32_t)block->data_len))
1466         {
1467             MTLOG_DEBUG("maybe wrong pkg come, skip it");
1468             cache_skip_data(&_recv_cache, block->data_len);
1469             continue;
1470         }
1471 
1472         // ��ѯ��session�Ķ���
1473         CNetHandler* session = this->FindSession(sid);
1474         if (NULL == session)
1475         {
1476             MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1477             cache_skip_data(&_recv_cache, block->data_len);
1478         }
1479         else
1480         {
1481             MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1482             this->NotifyThread(session, 0);
1483             session->SwitchToIdle();
1484             cache_skip_first_buffer(&_recv_cache);
1485             session->SetRespBuff(block);
1486         }
1487     }
1488 
1489     return 0;
1490 }
1491 
1492 
1493 // ��ѯ����sessionid������session��Ϣ
1494 CNetHandler* CSockLink::FindSession(uint64_t sid)
1495 {
1496     CNetHandler key;
1497     CDestLinks* dstlink = (CDestLinks*)_parents;
1498     if (NULL == dstlink)
1499     {
1500         MTLOG_ERROR("session dest link invalid, maybe error");
1501         return NULL;
1502     }
1503     struct sockaddr_in addr;
1504     key.SetDestAddress(this->GetDestAddr(&addr));
1505     key.SetConnType(dstlink->GetConnType());
1506     key.SetProtoType(dstlink->GetProtoType());
1507     key.SetSessionId(sid);
1508 
1509     return CNetMgr::Instance()->FindNetItem(&key);
1510 }
1511 
1512 
1513 
1514 /**
1515  *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
1516  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
1517  */
1518 int CSockLink::InputNotify()
1519 {
1520     int32_t ret = 0;
1521 
1522     this->_last_access = mt_time_ms();
1523 
1524     // 1. ��������, ��������
1525     if (_proto_type == NET_PROTO_UDP)
1526     {
1527         ret = cache_udp_recv(&_recv_cache, _fd, NULL);
1528     }
1529     else
1530     {
1531         ret = cache_tcp_recv(&_recv_cache, _fd);
1532     }
1533 
1534     // 2. ����ʧ������������
1535     if (ret < 0)
1536     {
1537         if (ret == -SK_ERR_NEED_CLOSE)
1538         {
1539             MTLOG_DEBUG("recv on link failed, remote close");
1540             _errno = RC_REMOTE_CLOSED;
1541         }
1542         else
1543         {
1544             MTLOG_ERROR("recv on link failed, close it, ret %d[%m]", ret);
1545             _errno = RC_RECV_FAIL;
1546         }
1547 
1548         this->Destroy();
1549         return -1;
1550     }
1551 
1552     // 3. �ַ�����, ������TCP
1553     ret = this->RecvDispath();
1554     if (ret < 0)
1555     {
1556         MTLOG_DEBUG("recv dispath failed, close it, ret %d[%m]", ret);
1557         this->Destroy();
1558         return -2;
1559     }
1560 
1561     // 4. �ɹ�����
1562     return 0;
1563 
1564 }
1565 
1566 /**
1567  *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
1568  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
1569  */
1570 int CSockLink::OutputNotify()
1571 {
1572     int32_t ret = 0;
1573 
1574     this->_last_access = mt_time_ms();
1575 
1576     // 1. ���ӵȴ�����������л�
1577     if (_state & LINK_CONNECTING)
1578     {
1579         _state &= ~LINK_CONNECTING;
1580         _state |= LINK_CONNECTED;
1581 
1582         CNetHandler* item = NULL;
1583         CNetHandler* tmp = NULL;
1584         TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
1585         {
1586             NotifyThread(item, 0);
1587             item->SwitchToIdle();
1588         }
1589     }
1590 
1591     // 2. ���Է�������
1592     if (_proto_type == NET_PROTO_UDP)
1593     {
1594         ret = SendCacheUdp(NULL, 0);
1595     }
1596     else
1597     {
1598         ret = SendCacheTcp(NULL, 0);
1599     }
1600 
1601     // 3. ����ʧ������������
1602     if (ret < 0)
1603     {
1604         MTLOG_ERROR("Send on link failed, close it, ret %d[%m]", ret);
1605         _errno = RC_SEND_FAIL;
1606         this->Destroy();
1607         return ret;
1608     }
1609 
1610     // 4. ��������, �����������epoll
1611     if (TAILQ_EMPTY(&_wait_send))
1612     {
1613         this->DisableOutput();
1614         if (!MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE))
1615         {
1616             MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1617         }
1618     }
1619 
1620     return 0;
1621 }
1622 
1623 
1624 /**
1625  *  @brief �쳣֪ͨ�ӿ�
1626  *  @return ���Է���ֵ, ���������¼�����
1627  */
1628 int CSockLink::HangupNotify()
1629 {
1630     MTLOG_ERROR("socket epoll error, fd %d", _fd);
1631 
1632     this->_errno = RC_KQUEUE_ERROR;
1633     this->Destroy();
1634     return -1;
1635 }
1636 
1637 
1638 // ���캯��
1639 CDestLinks::CDestLinks()
1640 {
1641     _timeout        = 5*60*1000; // Ĭ��5����
1642     _addr_ipv4      = 0;
1643     _net_port       = 0;
1644     _proto_type     = NET_PROTO_UNDEF;
1645     _conn_type      = TYPE_CONN_SESSION;
1646 
1647     _max_links      = 3; // Ĭ��3��
1648     _curr_link      = 0;
1649     _dflt_callback  = NULL;
1650 
1651     TAILQ_INIT(&_sock_list);
1652 }
1653 
1654 
1655 // ���ø��õĽӿں���
1656 void CDestLinks::Reset()
1657 {
1658     // �������Ӷ���
1659     CSockLink* item = NULL;
1660     CSockLink* temp = NULL;
1661     TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1662     {
1663         item->Destroy();
1664     }
1665     TAILQ_INIT(&_sock_list);
1666 
1667     // ��ʱ��ɾ��
1668     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1669     if (NULL != timer)
1670     {
1671         timer->stop_timer(this);
1672     }
1673 
1674     // ����Ĭ���ֶ���Ϣ
1675     _timeout        = 5*60*1000; // Ĭ��5����
1676     _addr_ipv4      = 0;
1677     _net_port       = 0;
1678     _proto_type     = NET_PROTO_UNDEF;
1679     _conn_type      = TYPE_CONN_SESSION;
1680 
1681     _max_links      = 3; // Ĭ��3��
1682     _curr_link      = 0;
1683 }
1684 
1685 // ���캯��
1686 CDestLinks::~CDestLinks()
1687 {
1688     this->Reset();
1689 }
1690 
1691 // ������ʱ��
1692 void CDestLinks::StartTimer()
1693 {
1694     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1695     if ((NULL == timer) || !timer->start_timer(this, 60*1000))
1696     {
1697         MTLOG_ERROR("obj %p attach timer failed, error", this);
1698     }
1699 }
1700 
1701 
1702 // �ͷ�һ������link
1703 void CDestLinks::FreeSockLink(CSockLink* sock)
1704 {
1705     if ((sock == NULL) || (sock->GetParentsPtr() != (void*)this))
1706     {
1707         MTLOG_ERROR("invalid socklink %p, error", sock);
1708         return;
1709     }
1710 
1711     TAILQ_REMOVE(&_sock_list, sock, _link_entry);
1712     if (this->_curr_link > 0) {
1713         this->_curr_link--;
1714     }
1715 
1716     sock->Reset();
1717     CNetMgr::Instance()->FreeSockLink(sock);
1718 }
1719 
1720 
1721 // ��ȡһ������link, ĿǰΪ��ѯ
1722 CSockLink* CDestLinks::GetSockLink()
1723 {
1724     CSockLink* link = NULL;
1725     if (_curr_link < _max_links)
1726     {
1727         link = CNetMgr::Instance()->AllocSockLink();
1728         if (NULL == link)
1729         {
1730             MTLOG_ERROR("alloc sock link failed, error");
1731             return NULL;
1732         }
1733         link->SetParentsPtr(this);
1734         link->SetProtoType(_proto_type);
1735         TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1736         _curr_link++;
1737     }
1738     else
1739     {
1740         link = TAILQ_FIRST(&_sock_list);
1741         TAILQ_REMOVE(&_sock_list, link, _link_entry);
1742         TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1743     }
1744 
1745     return link;
1746 }
1747 
1748 /**
1749  * @brief ��ʱ֪ͨ����, ��������·, ���������·����
1750  */
1751 void CDestLinks::timer_notify()
1752 {
1753     // 1. ����Ƿ��п��е���·, ɾ����
1754     uint64_t now = mt_time_ms();
1755     CSockLink* item = NULL;
1756     CSockLink* temp = NULL;
1757     TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1758     {
1759         if ((item->GetLastAccess() + this->_timeout) < now)
1760         {
1761             MTLOG_DEBUG("link timeout, last[%llu], now [%llu]", item->GetLastAccess(), now);
1762             item->Destroy();
1763         }
1764     }
1765 
1766     // 2. ����Ƿ�����Ч����·, û����ע��Ŀ����
1767     item = TAILQ_FIRST(&_sock_list);
1768     if (NULL == item)
1769     {
1770         MTLOG_DEBUG("dest links timeout, now [%llu]", now);
1771         CNetMgr::Instance()->DeleteDestLink(this);
1772         return;
1773     }
1774 
1775     // 3. ���¼��붨ʱ������
1776     this->StartTimer();
1777 
1778     return;
1779 }
1780 
1781 
1782 /**
1783  * @brief sessionȫ�ֹ�����
1784  * @return ȫ�־��ָ��
1785  */
1786 CNetMgr* CNetMgr::_instance = NULL;
1787 CNetMgr* CNetMgr::Instance (void)
1788 {
1789     if (NULL == _instance)
1790     {
1791         _instance = new CNetMgr();
1792     }
1793 
1794     return _instance;
1795 }
1796 
1797 /**
1798  * @brief session����ȫ�ֵ����ٽӿ�
1799  */
1800 void CNetMgr::Destroy()
1801 {
1802     if( _instance != NULL )
1803     {
1804         delete _instance;
1805         _instance = NULL;
1806     }
1807 }
1808 
1809 // ��ѯ�Ƿ��Ѿ�����ͬһ��sid�Ķ���
1810 CNetHandler* CNetMgr::FindNetItem(CNetHandler* key)
1811 {
1812     if (NULL == this->_session_hash)
1813     {
1814         return NULL;
1815     }
1816 
1817     return (CNetHandler*)_session_hash->HashFind(key);
1818 }
1819 
1820 // ע��һ��item, �Ȳ�ѯ�����, ��֤�޳�ͻ
1821 void CNetMgr::InsertNetItem(CNetHandler* item)
1822 {
1823     if (NULL == this->_session_hash)
1824     {
1825         return;
1826     }
1827 
1828     int32_t ret = _session_hash->HashInsert(item);
1829     if (ret < 0)
1830     {
1831         MTLOG_ERROR("session insert failed, ret %d", ret);
1832     }
1833 
1834     return;
1835 }
1836 
1837 // �Ƴ�һ��item����
1838 void CNetMgr::RemoveNetItem(CNetHandler* item)
1839 {
1840     CNetHandler* handler =  this->FindNetItem(item);
1841     if (NULL == handler)
1842     {
1843         return;
1844     }
1845 
1846     _session_hash->HashRemove(handler);
1847 }
1848 
1849 // ��ѯ�Ƿ��Ѿ�����ͬһ��sid�Ķ���
1850 CDestLinks* CNetMgr::FindDestLink(CDestLinks* key)
1851 {
1852     if (NULL == this->_ip_hash)
1853     {
1854         return NULL;
1855     }
1856 
1857     return (CDestLinks*)_ip_hash->HashFind(key);
1858 }
1859 
1860 // ע��һ��item, �Ȳ�ѯ�����, ��֤�޳�ͻ
1861 void CNetMgr::InsertDestLink(CDestLinks* item)
1862 {
1863     if (NULL == this->_ip_hash)
1864     {
1865         return;
1866     }
1867 
1868     int32_t ret = _ip_hash->HashInsert(item);
1869     if (ret < 0)
1870     {
1871         MTLOG_ERROR("ip dest insert failed, ret %d", ret);
1872     }
1873 
1874     return;
1875 }
1876 
1877 // �Ƴ�һ��item����
1878 void CNetMgr::RemoveDestLink(CDestLinks* item)
1879 {
1880     CDestLinks* handler =  this->FindDestLink(item);
1881     if (NULL == handler)
1882     {
1883         return;
1884     }
1885 
1886     _ip_hash->HashRemove(handler);
1887 }
1888 
1889 
1890 // ��ѯ����һ��Ŀ��ip��links�ڵ�
1891 CDestLinks* CNetMgr::FindCreateDest(CDestLinks* key)
1892 {
1893     CDestLinks* dest = this->FindDestLink(key);
1894     if (dest != NULL)
1895     {
1896         MTLOG_DEBUG("dest links reuse ok");
1897         return dest;
1898     }
1899 
1900     dest = this->AllocDestLink();
1901     if (NULL == dest)
1902     {
1903         MTLOG_ERROR("dest links alloc failed, log it");
1904         return NULL;
1905     }
1906 
1907     dest->CopyKeyInfo(key);
1908     dest->StartTimer();
1909     this->InsertDestLink(dest);
1910 
1911     return dest;
1912 }
1913 
1914 
1915 // ��ѯ����һ��Ŀ��ip��links�ڵ�
1916 void CNetMgr::DeleteDestLink(CDestLinks* dst)
1917 {
1918     this->RemoveDestLink(dst);
1919     dst->Reset();  // ֱ��freeǰ���� reset
1920     this->FreeDestLink(dst);
1921 }
1922 
1923 
1924 // ���캯��ʵ��
1925 CNetMgr::CNetMgr()
1926 {
1927     sk_buffer_mng_init(&_tcp_pool, 60, 4096);
1928     sk_buffer_mng_init(&_udp_pool, 60, SK_DFLT_BUFF_SIZE);
1929 
1930     _ip_hash = new HashList(100000);
1931     _session_hash = new HashList(100000);
1932 }
1933 
1934 
1935 // ��������ʵ��
1936 CNetMgr::~CNetMgr()
1937 {
1938     // �������е�dest��Դ
1939     if (_ip_hash != NULL)
1940     {
1941         HashKey* hash_item = _ip_hash->HashGetFirst();
1942         while (hash_item)
1943         {
1944             delete hash_item;
1945             hash_item = _ip_hash->HashGetFirst();
1946         }
1947 
1948         delete _ip_hash;
1949         _ip_hash = NULL;
1950     }
1951 
1952     // �������е� netitem ��Դ
1953     if (_session_hash != NULL)
1954     {
1955         HashKey* hash_item = _session_hash->HashGetFirst();
1956         while (hash_item)
1957         {
1958             delete hash_item;
1959             hash_item = _session_hash->HashGetFirst();
1960         }
1961 
1962         delete _session_hash;
1963         _session_hash = NULL;
1964     }
1965 
1966     // ����buff����Դ
1967     sk_buffer_mng_destroy(&_tcp_pool);
1968     sk_buffer_mng_destroy(&_udp_pool);
1969 }
1970 
1971 
1972 /**
1973  * @brief ������Դ��Ϣ
1974  */
1975 void CNetMgr::RecycleObjs(uint64_t now)
1976 {
1977     uint32_t now_s = (uint32_t)(now / 1000);
1978 
1979     recycle_sk_buffer(&_udp_pool, now_s);
1980     recycle_sk_buffer(&_tcp_pool, now_s);
1981 
1982     _net_item_pool.RecycleItem(now);
1983     _sock_link_pool.RecycleItem(now);
1984     _dest_ip_pool.RecycleItem(now);
1985 }
1986 
1987 
1988