xref: /f-stack/app/micro_thread/mt_net.cpp (revision 39be5a50)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_net.cpp
22  *  @time 20130924
23  **/
24 
25 #include <errno.h>
26 #include <netinet/tcp.h>
27 #include "micro_thread.h"
28 #include "mt_sys_hook.h"
29 #include "ff_hook.h"
30 #include "mt_net.h"
31 
32 
33 using namespace std;
34 using namespace NS_MICRO_THREAD;
35 
36 CNetHelper::CNetHelper()
37 {
38     handler = (void*)CNetMgr::Instance()->AllocNetItem();
39 }
40 
41 CNetHelper::~CNetHelper()
42 {
43     CNetHandler* net_handler = (CNetHandler*)handler;
44     if (handler != NULL)
45     {
46         net_handler->Reset();
47         CNetMgr::Instance()->FreeNetItem(net_handler);
48         handler = NULL;
49     }
50 }
51 
52 int32_t CNetHelper::SendRecv(void* data, uint32_t len, uint32_t timeout)
53 {
54     if (handler != NULL) {
55         CNetHandler* net_handler = (CNetHandler*)handler;
56         return net_handler->SendRecv(data, len, timeout);
57     } else {
58         return RC_INVALID_HANDLER;
59     }
60 }
61 
62 void* CNetHelper::GetRspBuff()
63 {
64     if (handler != NULL) {
65         CNetHandler* net_handler = (CNetHandler*)handler;
66         return net_handler->GetRspBuff();
67     } else {
68         return NULL;
69     }
70 }
71 
72 uint32_t CNetHelper::GetRspLen()
73 {
74     if (handler != NULL) {
75         CNetHandler* net_handler = (CNetHandler*)handler;
76         return net_handler->GetRspLen();
77     } else {
78         return 0;
79     }
80 }
81 
82 char* CNetHelper::GetErrMsg(int32_t result)
83 {
84     static const char* errmsg = "unknown error type";
85 
86     switch (result)
87     {
88         case RC_SUCCESS:
89             errmsg = "success";
90             break;
91 
92         case RC_ERR_SOCKET:
93             errmsg = "create socket failed";
94             break;
95 
96         case RC_SEND_FAIL:
97             errmsg = "send pakeage timeout or failed";
98             break;
99 
100         case RC_RECV_FAIL:
101             errmsg = "recv response timeout or failed";
102             break;
103 
104         case RC_CONNECT_FAIL:
105             errmsg = "connect timeout or failed";
106             break;
107 
108         case RC_CHECK_PKG_FAIL:
109             errmsg = "user package check failed";
110             break;
111 
112         case RC_NO_MORE_BUFF:
113             errmsg = "user response buffer too small";
114             break;
115 
116         case RC_REMOTE_CLOSED:
117             errmsg = "remote close connection";
118             break;
119 
120         case RC_INVALID_PARAM:
121             errmsg = "params invalid";
122             break;
123 
124         case RC_INVALID_HANDLER:
125             errmsg = "net handler invalid";
126             break;
127 
128         case RC_MEM_ERROR:
129             errmsg = "no more memory, alloc failed";
130             break;
131 
132         case RC_CONFLICT_SID:
133             errmsg = "session id with the dest address conflict";
134             break;
135 
136         case RC_KQUEUE_ERROR:
137             errmsg = "epoll system error";
138             break;
139 
140         default:
141             break;
142     }
143 
144     return (char*)errmsg;
145 }
146 
147 void CNetHelper::SetProtoType(MT_PROTO_TYPE type)
148 {
149     if (handler != NULL) {
150         CNetHandler* net_handler = (CNetHandler*)handler;
151         return net_handler->SetProtoType(type);
152     }
153 }
154 
155 void CNetHelper::SetDestAddress(struct sockaddr_in* dst)
156 {
157     if (handler != NULL) {
158         CNetHandler* net_handler = (CNetHandler*)handler;
159         return net_handler->SetDestAddress(dst);
160     }
161 }
162 
163 void CNetHelper::SetSessionId(uint64_t sid)
164 {
165     if (handler != NULL) {
166         CNetHandler* net_handler = (CNetHandler*)handler;
167         return net_handler->SetSessionId(sid);
168     }
169 }
170 
171 void CNetHelper::SetSessionCallback(CHECK_SESSION_CALLBACK function)
172 {
173     if (handler != NULL) {
174         CNetHandler* net_handler = (CNetHandler*)handler;
175         return net_handler->SetSessionCallback(function);
176     }
177 }
178 
179 void CNetHandler::Reset()
180 {
181     this->Unlink();
182     this->UnRegistSession();
183 
184     if (_rsp_buff != NULL) {
185         delete_sk_buffer(_rsp_buff);
186         _rsp_buff               = NULL;
187     }
188 
189     _thread                     = NULL;
190     _proto_type                 = NET_PROTO_TCP;
191     _conn_type                  = TYPE_CONN_SESSION;
192     _dest_ipv4.sin_addr.s_addr  = 0;
193     _dest_ipv4.sin_port         = 0;
194     _session_id                 = 0;
195     _callback                   = NULL;
196     _err_no                     = 0;
197     _state_flags                = 0;
198     _conn_ptr                   = NULL;
199     _send_pos                   = 0;
200     _req_len                    = 0;
201     _req_data                   = NULL;
202 
203 }
204 
205 CNetHandler::CNetHandler()
206 {
207     _state_flags = 0;
208     _rsp_buff = NULL;
209 
210     this->Reset();
211 }
212 
213 CNetHandler::~CNetHandler()
214 {
215     this->Reset();
216 }
217 
218 int32_t CNetHandler::CheckParams()
219 {
220     if ((NULL == _req_data) || (_req_len == 0))
221     {
222         MTLOG_ERROR("param invalid, data[%p], len[%u]", _req_data, _req_len);
223         return RC_INVALID_PARAM;
224     }
225 
226     if ((_dest_ipv4.sin_addr.s_addr == 0) || (_dest_ipv4.sin_port == 0))
227     {
228         MTLOG_ERROR("param invalid, ip[%u], port[%u]", _dest_ipv4.sin_addr.s_addr,
229              _dest_ipv4.sin_port);
230         return RC_INVALID_PARAM;
231     }
232 
233     if (_conn_type == TYPE_CONN_SESSION)
234     {
235         if ((_callback == NULL) || (_session_id == 0))
236         {
237             MTLOG_ERROR("param invalid, callback[%p], session_id[%llu]", _callback, _session_id);
238             return RC_INVALID_PARAM;
239         }
240 
241         if (!this->RegistSession())
242         {
243             MTLOG_ERROR("param invalid, session_id[%llu] regist failed", _session_id);
244             return RC_CONFLICT_SID;
245         }
246     }
247 
248     return 0;
249 }
250 
251 int32_t CNetHandler::GetConnLink()
252 {
253     CDestLinks key;
254     key.SetKeyInfo(_dest_ipv4.sin_addr.s_addr, _dest_ipv4.sin_port, _proto_type, _conn_type);
255 
256     CDestLinks* dest_link = CNetMgr::Instance()->FindCreateDest(&key);
257     if (NULL == dest_link)
258     {
259         MTLOG_ERROR("get dest link handle failed");
260         return RC_MEM_ERROR;
261     }
262 
263     CSockLink* sock_link = dest_link->GetSockLink();
264     if (NULL == sock_link)
265     {
266         MTLOG_ERROR("get sock link handle failed");
267         return RC_MEM_ERROR;
268     }
269 
270     this->Link(sock_link);
271 
272     return 0;
273 }
274 
275 int32_t CNetHandler::WaitConnect(uint64_t timeout)
276 {
277     CSockLink* conn = (CSockLink*)this->_conn_ptr;
278     if (NULL == conn)
279     {
280         MTLOG_ERROR("get sock link handle failed");
281         return RC_MEM_ERROR;
282     }
283 
284     int32_t fd = conn->CreateSock();
285     if (fd < 0)
286     {
287         MTLOG_ERROR("create sock failed, ret %d[%m]", fd);
288         return RC_ERR_SOCKET;
289     }
290 
291     if (conn->Connect())
292     {
293         MTLOG_DEBUG("sock conncet ok");
294         return RC_SUCCESS;
295     }
296 
297     this->SwitchToConn();
298 
299     MtFrame* mtframe = MtFrame::Instance();
300     mtframe->WaitNotify(timeout);
301 
302     this->SwitchToIdle();
303 
304     if (_err_no != 0)
305     {
306         MTLOG_ERROR("connect get out errno %d", _err_no);
307         return _err_no;
308     }
309 
310     if (conn->Connected())
311     {
312         MTLOG_DEBUG("connect ok");
313         return 0;
314     }
315     else
316     {
317         MTLOG_TRACE("connect not ok, maybe timeout");
318         return RC_CONNECT_FAIL;
319     }
320 }
321 
322 int32_t CNetHandler::WaitSend(uint64_t timeout)
323 {
324     CSockLink* conn = (CSockLink*)this->_conn_ptr;
325     if (NULL == conn)
326     {
327         MTLOG_ERROR("get sock link handle failed");
328         return RC_MEM_ERROR;
329     }
330 
331     int32_t ret = conn->SendData(_req_data, _req_len);
332     if (ret < 0)
333     {
334         MTLOG_ERROR("sock send failed, ret %d[%m]", ret);
335         return RC_SEND_FAIL;
336     }
337     this->SkipSendPos(ret);
338 
339     if (_req_len == 0)
340     {
341         MTLOG_DEBUG("sock send ok");
342         return RC_SUCCESS;
343     }
344 
345     this->SwitchToSend();
346 
347     MtFrame* mtframe = MtFrame::Instance();
348     mtframe->WaitNotify(timeout);
349 
350     this->SwitchToIdle();
351 
352     if (_err_no != 0)
353     {
354         MTLOG_ERROR("send get out errno %d", _err_no);
355         return _err_no;
356     }
357 
358     if (_req_len == 0)
359     {
360         MTLOG_DEBUG("send req ok, len %u", _send_pos);
361         return 0;
362     }
363     else
364     {
365         MTLOG_TRACE("send req not ok, left len %u", _req_len);
366         return RC_SEND_FAIL;
367     }
368 }
369 
370 int32_t CNetHandler::WaitRecv(uint64_t timeout)
371 {
372     CSockLink* conn = (CSockLink*)this->_conn_ptr;
373     if (NULL == conn)
374     {
375         MTLOG_ERROR("get sock link handle failed");
376         return RC_MEM_ERROR;
377     }
378 
379     if (_conn_type == TYPE_CONN_SENDONLY)
380     {
381         MTLOG_DEBUG("only send, without recv");
382         return 0;
383     }
384 
385     this->SwitchToRecv();
386 
387     MtFrame* mtframe = MtFrame::Instance();
388     mtframe->WaitNotify(timeout);
389 
390     this->SwitchToIdle();
391 
392     if ((_rsp_buff != NULL) && (_rsp_buff->data_len > 0))
393     {
394         MTLOG_DEBUG("recv get rsp, len %d", _rsp_buff->data_len);
395         return 0;
396     }
397     else
398     {
399         MTLOG_TRACE("recv get out errno %d", _err_no);
400         return RC_RECV_FAIL;
401     }
402 }
403 
404 int32_t CNetHandler::SendRecv(void* data, uint32_t len, uint32_t timeout)
405 {
406     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
407     utime64_t cost_time = 0;
408     uint64_t time_left = timeout;
409     this->_req_data = data;
410     this->_req_len  = len;
411 
412     int32_t ret = this->CheckParams();
413     if (ret < 0)
414     {
415         MTLOG_ERROR("check params failed, ret[%d]", ret);
416         goto EXIT_LABEL;
417     }
418 
419     ret = this->GetConnLink();
420     if (ret < 0)
421     {
422         MTLOG_ERROR("get sock conn failed, ret: %d", ret);
423         goto EXIT_LABEL;
424     }
425 
426     ret = this->WaitConnect(time_left);
427     if (ret < 0)
428     {
429         MTLOG_ERROR("sock connect failed, ret: %d", ret);
430         goto EXIT_LABEL;
431     }
432 
433     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
434     time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
435     ret = this->WaitSend(time_left);
436     if (ret < 0)
437     {
438         MTLOG_ERROR("sock send failed, ret: %d", ret);
439         goto EXIT_LABEL;
440     }
441 
442     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
443     time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
444     ret = this->WaitRecv(time_left);
445     if (ret < 0)
446     {
447         MTLOG_ERROR("sock recv failed, ret: %d", ret);
448         goto EXIT_LABEL;
449     }
450 
451     ret = 0;
452 
453 EXIT_LABEL:
454 
455     this->Unlink();
456 
457     this->UnRegistSession();
458 
459     return ret;
460 }
461 
462 uint32_t CNetHandler::SkipSendPos(uint32_t len)
463 {
464     uint32_t skip_len = (len >= _req_len) ? _req_len : len;
465     _req_len -= skip_len;
466     _send_pos += skip_len;
467     _req_data = (char*)_req_data + skip_len;
468 
469     return skip_len;
470 }
471 
472 void CNetHandler::Link(CSockLink* conn)
473 {
474     this->_conn_ptr = conn;
475     this->SwitchToIdle();
476 }
477 
478 void CNetHandler::Unlink()
479 {
480     if (this->_state_flags != 0)
481     {
482         this->DetachConn();
483     }
484     this->_conn_ptr = NULL;
485 }
486 
487 void CNetHandler::SwitchToConn()
488 {
489     CSockLink* conn = (CSockLink*)this->_conn_ptr;
490     if (NULL == conn)
491     {
492         MTLOG_ERROR("net handler invalid");
493         return;
494     }
495 
496     this->DetachConn();
497 
498     this->_state_flags |= STATE_IN_CONNECT;
499     conn->AppendToList(CSockLink::LINK_CONN_LIST, this);
500 }
501 
502 void CNetHandler::SwitchToSend()
503 {
504     CSockLink* conn = (CSockLink*)this->_conn_ptr;
505     if (NULL == conn)
506     {
507         MTLOG_ERROR("net handler invalid");
508         return;
509     }
510 
511     this->DetachConn();
512 
513     this->_state_flags |= STATE_IN_SEND;
514     conn->AppendToList(CSockLink::LINK_SEND_LIST, this);
515 }
516 
517 void CNetHandler::SwitchToRecv()
518 {
519     CSockLink* conn = (CSockLink*)this->_conn_ptr;
520     if (NULL == conn)
521     {
522         MTLOG_ERROR("net handler invalid");
523         return;
524     }
525 
526     this->DetachConn();
527 
528     this->_state_flags |= STATE_IN_RECV;
529     conn->AppendToList(CSockLink::LINK_RECV_LIST, this);
530 }
531 
532 void CNetHandler::SwitchToIdle()
533 {
534     CSockLink* conn = (CSockLink*)this->_conn_ptr;
535     if (NULL == conn)
536     {
537         MTLOG_ERROR("net handler invalid");
538         return;
539     }
540 
541     this->DetachConn();
542 
543     this->_state_flags |= STATE_IN_IDLE;
544     conn->AppendToList(CSockLink::LINK_IDLE_LIST, this);
545 }
546 
547 void CNetHandler::DetachConn()
548 {
549     CSockLink* conn = (CSockLink*)this->_conn_ptr;
550     if (NULL == conn)
551     {
552         MTLOG_DEBUG("net handler not set");
553         return;
554     }
555 
556     if (_state_flags == 0)
557     {
558         return;
559     }
560 
561     if (_state_flags & STATE_IN_CONNECT)
562     {
563         conn->RemoveFromList(CSockLink::LINK_CONN_LIST, this);
564         _state_flags &= ~STATE_IN_CONNECT;
565     }
566 
567     if (_state_flags & STATE_IN_SEND)
568     {
569         conn->RemoveFromList(CSockLink::LINK_SEND_LIST, this);
570         _state_flags &= ~STATE_IN_SEND;
571     }
572 
573     if (_state_flags & STATE_IN_RECV)
574     {
575         conn->RemoveFromList(CSockLink::LINK_RECV_LIST, this);
576         _state_flags &= ~STATE_IN_RECV;
577     }
578 
579     if (_state_flags & STATE_IN_IDLE)
580     {
581         conn->RemoveFromList(CSockLink::LINK_IDLE_LIST, this);
582         _state_flags &= ~STATE_IN_IDLE;
583     }
584 }
585 
586 uint32_t CNetHandler::HashValue()
587 {
588     uint32_t ip = _dest_ipv4.sin_addr.s_addr;
589     ip ^= (_dest_ipv4.sin_port << 16) | (_proto_type << 8) | (_conn_type << 8);
590 
591     uint32_t hash = (_session_id >> 32) & 0xffffffff;
592     hash ^= _session_id  & 0xffffffff;
593     hash ^= ip;
594 
595     return hash;
596 }
597 
598 int32_t CNetHandler::HashCmp(HashKey* rhs)
599 {
600     CNetHandler* data = (CNetHandler*)(rhs);
601     if (!data) {
602         return -1;
603     }
604     if (this->_session_id != data->_session_id)
605     {
606         return (this->_session_id > data->_session_id) ? 1 : -1;
607     }
608 
609     if (this->_dest_ipv4.sin_addr.s_addr != data->_dest_ipv4.sin_addr.s_addr) {
610         return (this->_dest_ipv4.sin_addr.s_addr > data->_dest_ipv4.sin_addr.s_addr) ? 1 : -1;
611     }
612     if (this->_dest_ipv4.sin_port != data->_dest_ipv4.sin_port) {
613         return (this->_dest_ipv4.sin_port > data->_dest_ipv4.sin_port) ? 1 : -1;
614     }
615     if (this->_proto_type != data->_proto_type) {
616         return (this->_proto_type > data->_proto_type) ? 1 : -1;
617     }
618     if (this->_conn_type != data->_conn_type) {
619         return (this->_conn_type > data->_conn_type) ? 1 : -1;
620     }
621 
622     return 0;
623 };
624 
625 bool CNetHandler::RegistSession()
626 {
627     if (CNetMgr::Instance()->FindNetItem(this) != NULL)
628     {
629         return false;
630     }
631 
632     MtFrame* mtframe = MtFrame::Instance();
633     this->_thread = mtframe->GetActiveThread();
634 
635     CNetMgr::Instance()->InsertNetItem(this);
636     this->_state_flags |= STATE_IN_SESSION;
637     return true;
638 }
639 
640 void CNetHandler::UnRegistSession()
641 {
642     if (this->_state_flags & STATE_IN_SESSION)
643     {
644         CNetMgr::Instance()->RemoveNetItem(this);
645         this->_state_flags &= ~STATE_IN_SESSION;
646     }
647 }
648 
649 TNetItemList* CSockLink::GetItemList(int32_t type)
650 {
651     TNetItemList* list = NULL;
652     switch (type)
653     {
654         case LINK_IDLE_LIST:
655             list = &this->_idle_list;
656             break;
657 
658         case LINK_CONN_LIST:
659             list = &this->_wait_connect;
660             break;
661 
662         case LINK_SEND_LIST:
663             list = &this->_wait_send;
664             break;
665 
666         case LINK_RECV_LIST:
667             list = &this->_wait_recv;
668             break;
669 
670         default:
671             break;
672     }
673 
674     return list;
675 }
676 
677 void CSockLink::AppendToList(int32_t type, CNetHandler* item)
678 {
679     TNetItemList* list = this->GetItemList(type);
680     if (NULL == list)
681     {
682         MTLOG_ERROR("unknown list type: %d", type);
683         return;
684     }
685 
686     TAILQ_INSERT_TAIL(list, item, _link_entry);
687 }
688 
689 void CSockLink::RemoveFromList(int32_t type, CNetHandler* item)
690 {
691     TNetItemList* list = this->GetItemList(type);
692     if (NULL == list)
693     {
694         MTLOG_ERROR("unknown list type: %d", type);
695         return;
696     }
697 
698     TAILQ_REMOVE(list, item, _link_entry);
699 }
700 
701 void CSockLink::NotifyThread(CNetHandler* item, int32_t result)
702 {
703     static MtFrame* frame = NULL;
704     if (frame == NULL) {
705         frame = MtFrame::Instance();
706     }
707 
708     if (result != RC_SUCCESS)
709     {
710         item->SetErrNo(result);
711     }
712 
713     MicroThread* thread = item->GetThread();
714     if ((thread != NULL) && (thread->HasFlag(MicroThread::IO_LIST)))
715     {
716         frame->RemoveIoWait(thread);
717         frame->InsertRunable(thread);
718     }
719 }
720 
721 void CSockLink::NotifyAll(int32_t result)
722 {
723     CNetHandler* item = NULL;
724     CNetHandler* tmp = NULL;
725 
726     TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
727     {
728         NotifyThread(item, result);
729         item->Unlink();
730     }
731 
732     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
733     {
734         NotifyThread(item, result);
735         item->Unlink();
736     }
737 
738     TAILQ_FOREACH_SAFE(item, &_wait_recv, _link_entry, tmp)
739     {
740         NotifyThread(item, result);
741         item->Unlink();
742     }
743 
744     TAILQ_FOREACH_SAFE(item, &_idle_list, _link_entry, tmp)
745     {
746         NotifyThread(item, result);
747         item->Unlink();
748     }
749 }
750 
751 void CSockLink::Reset()
752 {
753     this->Close();
754     this->NotifyAll(_errno);
755 
756     rw_cache_destroy(&_recv_cache);
757     if (_rsp_buff != NULL)
758     {
759         delete_sk_buffer(_rsp_buff);
760         _rsp_buff = NULL;
761     }
762 
763     TAILQ_INIT(&_wait_connect);
764     TAILQ_INIT(&_wait_send);
765     TAILQ_INIT(&_wait_recv);
766     TAILQ_INIT(&_idle_list);
767 
768     _proto_type     = NET_PROTO_TCP;
769     _errno          = 0;
770     _state          = 0;
771     _last_access    = mt_time_ms();
772     _parents        = NULL;
773 
774     this->KqueuerObj::Reset();
775 }
776 
777 CSockLink::CSockLink()
778 {
779     rw_cache_init(&_recv_cache, NULL);
780     _rsp_buff       = NULL;
781 
782     TAILQ_INIT(&_wait_connect);
783     TAILQ_INIT(&_wait_send);
784     TAILQ_INIT(&_wait_recv);
785     TAILQ_INIT(&_idle_list);
786 
787     _proto_type     = NET_PROTO_TCP;
788     _errno          = 0;
789     _state          = 0;
790     _last_access    = mt_time_ms();
791     _parents        = NULL;
792 }
793 
794 CSockLink::~CSockLink()
795 {
796     this->Reset();
797 }
798 
799 void CSockLink::SetProtoType(MT_PROTO_TYPE type)
800 {
801     _proto_type = type;
802     _recv_cache.pool = CNetMgr::Instance()->GetSkBuffMng(type);
803 }
804 
805 void CSockLink::Close()
806 {
807     if (_fd < 0)
808     {
809         return;
810     }
811 
812     MtFrame::Instance()->KqueueDelObj(this);
813 
814     close(_fd);
815     _fd = -1;
816 }
817 
818 void CSockLink::Destroy()
819 {
820     CDestLinks* dstlink = (CDestLinks*)_parents;
821     if (NULL == dstlink)
822     {
823         MTLOG_ERROR("socket link without parents ptr, maybe wrong");
824         delete this;
825     }
826     else
827     {
828         MTLOG_DEBUG("socket link just free");
829         dstlink->FreeSockLink(this);
830     }
831 }
832 
833 int32_t CSockLink::CreateSock()
834 {
835     if (_fd > 0)
836     {
837         return _fd;
838     }
839 
840     if (NET_PROTO_TCP == _proto_type)
841     {
842         _fd = socket(AF_INET, SOCK_STREAM, 0);
843     }
844     else
845     {
846         _fd = socket(AF_INET, SOCK_DGRAM, 0);
847     }
848 
849     if (_fd < 0)
850     {
851         MTLOG_ERROR("create socket failed, ret %d[%m]", _fd);
852         return -1;
853     }
854 
855     int flags = 1;
856     if (ioctl(_fd, FIONBIO, &flags) < 0)
857     {
858         MTLOG_ERROR("socket unblock failed, %m");
859         close(_fd);
860         _fd = -1;
861         return -2;
862     }
863 
864     if (NET_PROTO_TCP == _proto_type)
865     {
866         setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
867         this->EnableOutput();
868     }
869 
870     this->EnableInput();
871     if (!MtFrame::Instance()->KqueueAddObj(this))
872     {
873         MTLOG_ERROR("socket epoll mng failed, %m");
874         close(_fd);
875         _fd = -1;
876         return -3;
877     }
878 
879     return _fd;
880 }
881 
882 struct sockaddr_in* CSockLink::GetDestAddr(struct sockaddr_in* addr)
883 {
884     CDestLinks* dstlink = (CDestLinks*)_parents;
885     if ((NULL == _parents) || (NULL == addr)) {
886         return NULL;
887     }
888 
889     uint32_t ip = 0;
890     uint16_t port = 0;
891     dstlink->GetDestIP(ip, port);
892 
893     addr->sin_family = AF_INET;
894     addr->sin_addr.s_addr = ip;
895     addr->sin_port = port;
896 
897     return addr;
898 }
899 
900 bool CSockLink::Connect()
901 {
902     this->_last_access = mt_time_ms();
903 
904     if (_proto_type == NET_PROTO_UDP)
905     {
906         _state |= LINK_CONNECTED;
907     }
908 
909     if (_state & LINK_CONNECTED)
910     {
911         return true;
912     }
913 
914     if (_state & LINK_CONNECTING)
915     {
916         return false;
917     }
918 
919     struct sockaddr_in addr = {0};
920 
921     mt_hook_syscall(connect);
922     int32_t ret = ff_hook_connect(_fd, (struct sockaddr*)this->GetDestAddr(&addr), sizeof(struct sockaddr_in));
923     if (ret < 0)
924     {
925         int32_t err = errno;
926         if (err == EISCONN)
927         {
928             _state |= LINK_CONNECTED;
929             return true;
930         }
931         else
932         {
933             _state |= LINK_CONNECTING;
934             if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR))
935             {
936                 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _fd, err);
937                 return false;
938             }
939             else
940             {
941                 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _fd, err);
942                 return false;
943             }
944         }
945     }
946     else
947     {
948         _state |= LINK_CONNECTED;
949         return true;
950     }
951 }
952 
953 int32_t CSockLink::SendCacheUdp(void* data, uint32_t len)
954 {
955     mt_hook_syscall(sendto);
956     void* buff = NULL;
957     uint32_t buff_len = 0;
958 
959     CNetHandler* item = NULL;
960     CNetHandler* tmp = NULL;
961     struct sockaddr_in dst = {0};
962 
963     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
964     {
965         item->GetSendData(buff, buff_len);
966         if ((NULL == buff) || (buff_len == 0))
967         {
968             MTLOG_ERROR("get buff ptr invalid, log it");
969             NotifyThread(item, 0);
970             item->SwitchToIdle();
971             continue;
972         }
973 
974         int32_t ret = ff_hook_sendto(_fd, buff, buff_len, 0,
975                     (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
976         if (ret == -1)
977         {
978             if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
979             {
980                 return 0;
981             }
982             else
983             {
984                 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
985                           errno, strerror(errno));
986                 return -2;
987             }
988         }
989 
990         NotifyThread(item, 0);
991         item->SwitchToIdle();
992     }
993 
994     if ((data == NULL) || (len == 0))
995     {
996         return 0;
997     }
998 
999     int32_t ret = ff_hook_sendto(_fd, data, len, 0,
1000                     (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
1001     if (ret == -1)
1002     {
1003         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
1004         {
1005             return 0;
1006         }
1007         else
1008         {
1009             MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
1010                    errno, strerror(errno));
1011             return -2;
1012         }
1013     }
1014     else
1015     {
1016         return ret;
1017     }
1018 }
1019 
1020 int32_t CSockLink::SendCacheTcp(void* data, uint32_t len)
1021 {
1022     void* buff = NULL;
1023     uint32_t buff_len = 0;
1024     struct iovec iov[64];
1025     int32_t count = 0;
1026     CNetHandler* item = NULL;
1027     CNetHandler* tmp = NULL;
1028 
1029     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1030     {
1031         item->GetSendData(buff, buff_len);
1032         iov[count].iov_base = buff;
1033         iov[count].iov_len  = (int32_t)buff_len;
1034         count++;
1035         if (count >= 64)
1036         {
1037             break;
1038         }
1039     }
1040     if ((count < 64) && (data != NULL))
1041     {
1042         iov[count].iov_base = data;
1043         iov[count].iov_len  = (int32_t)len;
1044         count++;
1045     }
1046 
1047     ssize_t bytes = writev(_fd, iov, count);
1048     if (bytes < 0)
1049     {
1050         if ((errno == EAGAIN) || (errno == EINTR))
1051         {
1052             return 0;
1053         }
1054         else
1055         {
1056             MTLOG_ERROR("socket writev failed, fd %d, errno %d(%s)", _fd,
1057                    errno, strerror(errno));
1058             return -1;
1059         }
1060     }
1061 
1062     uint32_t send_left = (uint32_t)bytes;
1063     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1064     {
1065         send_left -= item->SkipSendPos(send_left);
1066         item->GetSendData(buff, buff_len);
1067         if (buff_len == 0)
1068         {
1069             NotifyThread(item, 0);
1070             item->SwitchToIdle();
1071         }
1072 
1073         if (send_left == 0)
1074         {
1075             break;
1076         }
1077     }
1078 
1079     return send_left;
1080 }
1081 
1082 int32_t CSockLink::SendData(void* data, uint32_t len)
1083 {
1084     int32_t ret = 0;
1085     bool rc = false;
1086 
1087     this->_last_access = mt_time_ms();
1088 
1089     if (_proto_type == NET_PROTO_UDP)
1090     {
1091         ret = SendCacheUdp(data, len);
1092     }
1093     else
1094     {
1095         ret = SendCacheTcp(data, len);
1096     }
1097 
1098     if (ret < (int32_t)len)
1099     {
1100         this->EnableOutput();
1101         rc = MtFrame::Instance()->KqueueCtrlAdd(_fd, KQ_EVENT_READ);
1102     }
1103     else
1104     {
1105         this->DisableOutput();
1106         rc = MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE);
1107     }
1108 
1109     if (!rc)
1110     {
1111         MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1112     }
1113 
1114     return ret;
1115 }
1116 
1117 int32_t CSockLink::RecvDispath()
1118 {
1119     if (_proto_type == NET_PROTO_UDP)
1120     {
1121         return this->DispathUdp();
1122     }
1123     else
1124     {
1125         return this->DispathTcp();
1126     }
1127 }
1128 
1129 void CSockLink::ExtendRecvRsp()
1130 {
1131     if (NULL == _rsp_buff)
1132     {
1133         _rsp_buff = new_sk_buffer(512);
1134         if (NULL == _rsp_buff)
1135         {
1136             MTLOG_ERROR("no more memory, error");
1137             return;
1138         }
1139     }
1140 
1141     _rsp_buff->data_len +=  read_cache_begin(&_recv_cache, _rsp_buff->data_len,
1142         _rsp_buff->data + _rsp_buff->data_len , _rsp_buff->size - _rsp_buff->data_len);
1143 }
1144 
1145 CHECK_SESSION_CALLBACK CSockLink::GetSessionCallback()
1146 {
1147     CHECK_SESSION_CALLBACK check_session = NULL;
1148 
1149     CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1150     if (NULL == item)
1151     {
1152         MTLOG_DEBUG("recv data with no wait item, err");
1153         goto EXIT_LABEL;
1154     }
1155 
1156     check_session = item->GetSessionCallback();
1157     if (NULL == check_session)
1158     {
1159         MTLOG_ERROR("recv data with no session callback, err");
1160         goto EXIT_LABEL;
1161     }
1162 
1163 EXIT_LABEL:
1164 
1165     CDestLinks* dstlink = (CDestLinks*)_parents;
1166     if (NULL == dstlink)
1167     {
1168         return check_session;
1169     }
1170 
1171     if (check_session != NULL)
1172     {
1173         dstlink->SetDefaultCallback(check_session);
1174     }
1175     else
1176     {
1177         check_session = dstlink->GetDefaultCallback();
1178     }
1179 
1180     return check_session;
1181 }
1182 
1183 int32_t CSockLink::DispathTcp()
1184 {
1185     CHECK_SESSION_CALLBACK check_session = this->GetSessionCallback();
1186     if (NULL == check_session)
1187     {
1188         MTLOG_ERROR("recv data with no session callback, err");
1189         return -1;
1190     }
1191 
1192     uint32_t need_len = 0;
1193     uint64_t sid = 0;
1194     int32_t ret = 0;
1195     while (_recv_cache.len > 0)
1196     {
1197         this->ExtendRecvRsp();
1198         if (NULL == _rsp_buff)
1199         {
1200             MTLOG_ERROR("alloc memory, error");
1201             _errno = RC_MEM_ERROR;
1202             return -3;
1203         }
1204 
1205         need_len = 0;
1206         ret = check_session(_rsp_buff->data, _rsp_buff->data_len, &sid, &need_len);
1207 
1208         if (ret < 0)
1209         {
1210             MTLOG_ERROR("user check resp failed, ret %d", ret);
1211             _errno = RC_CHECK_PKG_FAIL;
1212             return -1;
1213         }
1214 
1215         if (ret == 0)
1216         {
1217             if ((need_len == 0) && (_rsp_buff->data_len == _rsp_buff->size))
1218             {
1219                 MTLOG_DEBUG("recv default buff full[%u], but user no set need length", _rsp_buff->size);
1220                 need_len = _rsp_buff->size * 2;
1221             }
1222 
1223             if ((need_len <= _rsp_buff->size) || (need_len > 100*1024*1024))
1224             {
1225                 MTLOG_DEBUG("maybe need wait more data: %u", need_len);
1226                 return 0;
1227             }
1228 
1229             _rsp_buff = reserve_sk_buffer(_rsp_buff, need_len);
1230             if (NULL == _rsp_buff)
1231             {
1232                 MTLOG_ERROR("no more memory, error");
1233                 _errno = RC_MEM_ERROR;
1234                 return -3;
1235             }
1236 
1237             if (_rsp_buff->data_len >= _recv_cache.len)
1238             {
1239                 MTLOG_DEBUG("maybe need wait more data, now %u", _recv_cache.len);
1240                 return 0;
1241             }
1242 
1243             continue;
1244         }
1245 
1246         if (ret > (int32_t)_recv_cache.len)
1247         {
1248             MTLOG_DEBUG("maybe pkg not all ok, wait more");
1249             return 0;
1250         }
1251 
1252         CNetHandler* session = this->FindSession(sid);
1253         if (NULL == session)
1254         {
1255             MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1256             cache_skip_data(&_recv_cache, ret);
1257             delete_sk_buffer(_rsp_buff);
1258             _rsp_buff = NULL;
1259         }
1260         else
1261         {
1262             MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1263             cache_skip_data(&_recv_cache, ret);
1264             this->NotifyThread(session, 0);
1265             session->SwitchToIdle();
1266             _rsp_buff->data_len = ret;
1267             session->SetRespBuff(_rsp_buff);
1268             _rsp_buff = NULL;
1269         }
1270     }
1271 
1272     return 0;
1273 
1274 }
1275 
1276 int32_t CSockLink::DispathUdp()
1277 {
1278     CHECK_SESSION_CALLBACK check_session = NULL;
1279     CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1280     if (NULL == item)
1281     {
1282         MTLOG_DEBUG("recv data with no wait item, maybe wrong pkg recv");
1283     }
1284     else
1285     {
1286         check_session = item->GetSessionCallback();
1287         if (NULL == check_session)
1288         {
1289             MTLOG_TRACE("recv data with no session callback, err");
1290         }
1291     }
1292 
1293     uint64_t sid = 0;
1294     uint32_t need_len = 0;
1295     int32_t ret = 0;
1296     TSkBuffer* block = NULL;
1297     while ((block = TAILQ_FIRST(&_recv_cache.list)) != NULL)
1298     {
1299         if (check_session == NULL)
1300         {
1301             MTLOG_DEBUG("no recv wait, skip first block");
1302             cache_skip_data(&_recv_cache, block->data_len);
1303             continue;
1304         }
1305 
1306         need_len = 0;
1307         ret = check_session(block->data, block->data_len, &sid, &need_len);
1308         if ((ret <= 0) || (ret > (int32_t)block->data_len))
1309         {
1310             MTLOG_DEBUG("maybe wrong pkg come, skip it");
1311             cache_skip_data(&_recv_cache, block->data_len);
1312             continue;
1313         }
1314 
1315         CNetHandler* session = this->FindSession(sid);
1316         if (NULL == session)
1317         {
1318             MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1319             cache_skip_data(&_recv_cache, block->data_len);
1320         }
1321         else
1322         {
1323             MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1324             this->NotifyThread(session, 0);
1325             session->SwitchToIdle();
1326             cache_skip_first_buffer(&_recv_cache);
1327             session->SetRespBuff(block);
1328         }
1329     }
1330 
1331     return 0;
1332 }
1333 
1334 CNetHandler* CSockLink::FindSession(uint64_t sid)
1335 {
1336     CNetHandler key;
1337     CDestLinks* dstlink = (CDestLinks*)_parents;
1338     if (NULL == dstlink)
1339     {
1340         MTLOG_ERROR("session dest link invalid, maybe error");
1341         return NULL;
1342     }
1343     struct sockaddr_in addr;
1344     key.SetDestAddress(this->GetDestAddr(&addr));
1345     key.SetConnType(dstlink->GetConnType());
1346     key.SetProtoType(dstlink->GetProtoType());
1347     key.SetSessionId(sid);
1348 
1349     return CNetMgr::Instance()->FindNetItem(&key);
1350 }
1351 
1352 int CSockLink::InputNotify()
1353 {
1354     int32_t ret = 0;
1355 
1356     this->_last_access = mt_time_ms();
1357 
1358     if (_proto_type == NET_PROTO_UDP)
1359     {
1360         ret = cache_udp_recv(&_recv_cache, _fd, NULL);
1361     }
1362     else
1363     {
1364         ret = cache_tcp_recv(&_recv_cache, _fd);
1365     }
1366 
1367     if (ret < 0)
1368     {
1369         if (ret == -SK_ERR_NEED_CLOSE)
1370         {
1371             MTLOG_DEBUG("recv on link failed, remote close");
1372             _errno = RC_REMOTE_CLOSED;
1373         }
1374         else
1375         {
1376             MTLOG_ERROR("recv on link failed, close it, ret %d[%m]", ret);
1377             _errno = RC_RECV_FAIL;
1378         }
1379 
1380         this->Destroy();
1381         return -1;
1382     }
1383 
1384     ret = this->RecvDispath();
1385     if (ret < 0)
1386     {
1387         MTLOG_DEBUG("recv dispath failed, close it, ret %d[%m]", ret);
1388         this->Destroy();
1389         return -2;
1390     }
1391 
1392     return 0;
1393 
1394 }
1395 
1396 int CSockLink::OutputNotify()
1397 {
1398     int32_t ret = 0;
1399 
1400     this->_last_access = mt_time_ms();
1401 
1402     if (_state & LINK_CONNECTING)
1403     {
1404         _state &= ~LINK_CONNECTING;
1405         _state |= LINK_CONNECTED;
1406 
1407         CNetHandler* item = NULL;
1408         CNetHandler* tmp = NULL;
1409         TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
1410         {
1411             NotifyThread(item, 0);
1412             item->SwitchToIdle();
1413         }
1414     }
1415 
1416     if (_proto_type == NET_PROTO_UDP)
1417     {
1418         ret = SendCacheUdp(NULL, 0);
1419     }
1420     else
1421     {
1422         ret = SendCacheTcp(NULL, 0);
1423     }
1424 
1425     if (ret < 0)
1426     {
1427         MTLOG_ERROR("Send on link failed, close it, ret %d[%m]", ret);
1428         _errno = RC_SEND_FAIL;
1429         this->Destroy();
1430         return ret;
1431     }
1432 
1433     if (TAILQ_EMPTY(&_wait_send))
1434     {
1435         this->DisableOutput();
1436         if (!MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE))
1437         {
1438             MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1439         }
1440     }
1441 
1442     return 0;
1443 }
1444 
1445 int CSockLink::HangupNotify()
1446 {
1447     MTLOG_ERROR("socket epoll error, fd %d", _fd);
1448 
1449     this->_errno = RC_KQUEUE_ERROR;
1450     this->Destroy();
1451     return -1;
1452 }
1453 
1454 CDestLinks::CDestLinks()
1455 {
1456     _timeout        = 5*60*1000;
1457     _addr_ipv4      = 0;
1458     _net_port       = 0;
1459     _proto_type     = NET_PROTO_UNDEF;
1460     _conn_type      = TYPE_CONN_SESSION;
1461 
1462     _max_links      = 3; // 默认3个
1463     _curr_link      = 0;
1464     _dflt_callback  = NULL;
1465 
1466     TAILQ_INIT(&_sock_list);
1467 }
1468 
1469 void CDestLinks::Reset()
1470 {
1471     CSockLink* item = NULL;
1472     CSockLink* temp = NULL;
1473     TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1474     {
1475         item->Destroy();
1476     }
1477     TAILQ_INIT(&_sock_list);
1478 
1479     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1480     if (NULL != timer)
1481     {
1482         timer->stop_timer(this);
1483     }
1484 
1485     _timeout        = 5*60*1000;
1486     _addr_ipv4      = 0;
1487     _net_port       = 0;
1488     _proto_type     = NET_PROTO_UNDEF;
1489     _conn_type      = TYPE_CONN_SESSION;
1490 
1491     _max_links      = 3;
1492     _curr_link      = 0;
1493 }
1494 
1495 CDestLinks::~CDestLinks()
1496 {
1497     this->Reset();
1498 }
1499 
1500 void CDestLinks::StartTimer()
1501 {
1502     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1503     if ((NULL == timer) || !timer->start_timer(this, 60*1000))
1504     {
1505         MTLOG_ERROR("obj %p attach timer failed, error", this);
1506     }
1507 }
1508 
1509 void CDestLinks::FreeSockLink(CSockLink* sock)
1510 {
1511     if ((sock == NULL) || (sock->GetParentsPtr() != (void*)this))
1512     {
1513         MTLOG_ERROR("invalid socklink %p, error", sock);
1514         return;
1515     }
1516 
1517     TAILQ_REMOVE(&_sock_list, sock, _link_entry);
1518     if (this->_curr_link > 0) {
1519         this->_curr_link--;
1520     }
1521 
1522     sock->Reset();
1523     CNetMgr::Instance()->FreeSockLink(sock);
1524 }
1525 
1526 CSockLink* CDestLinks::GetSockLink()
1527 {
1528     CSockLink* link = NULL;
1529     if (_curr_link < _max_links)
1530     {
1531         link = CNetMgr::Instance()->AllocSockLink();
1532         if (NULL == link)
1533         {
1534             MTLOG_ERROR("alloc sock link failed, error");
1535             return NULL;
1536         }
1537         link->SetParentsPtr(this);
1538         link->SetProtoType(_proto_type);
1539         TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1540         _curr_link++;
1541     }
1542     else
1543     {
1544         link = TAILQ_FIRST(&_sock_list);
1545         TAILQ_REMOVE(&_sock_list, link, _link_entry);
1546         TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1547     }
1548 
1549     return link;
1550 }
1551 
1552 void CDestLinks::timer_notify()
1553 {
1554     uint64_t now = mt_time_ms();
1555     CSockLink* item = NULL;
1556     CSockLink* temp = NULL;
1557     TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1558     {
1559         if ((item->GetLastAccess() + this->_timeout) < now)
1560         {
1561             MTLOG_DEBUG("link timeout, last[%llu], now [%llu]", item->GetLastAccess(), now);
1562             item->Destroy();
1563         }
1564     }
1565 
1566     item = TAILQ_FIRST(&_sock_list);
1567     if (NULL == item)
1568     {
1569         MTLOG_DEBUG("dest links timeout, now [%llu]", now);
1570         CNetMgr::Instance()->DeleteDestLink(this);
1571         return;
1572     }
1573 
1574     this->StartTimer();
1575 
1576     return;
1577 }
1578 
1579 CNetMgr* CNetMgr::_instance = NULL;
1580 CNetMgr* CNetMgr::Instance (void)
1581 {
1582     if (NULL == _instance)
1583     {
1584         _instance = new CNetMgr();
1585     }
1586 
1587     return _instance;
1588 }
1589 
1590 void CNetMgr::Destroy()
1591 {
1592     if( _instance != NULL )
1593     {
1594         delete _instance;
1595         _instance = NULL;
1596     }
1597 }
1598 
1599 CNetHandler* CNetMgr::FindNetItem(CNetHandler* key)
1600 {
1601     if (NULL == this->_session_hash)
1602     {
1603         return NULL;
1604     }
1605 
1606     return (CNetHandler*)_session_hash->HashFind(key);
1607 }
1608 
1609 void CNetMgr::InsertNetItem(CNetHandler* item)
1610 {
1611     if (NULL == this->_session_hash)
1612     {
1613         return;
1614     }
1615 
1616     int32_t ret = _session_hash->HashInsert(item);
1617     if (ret < 0)
1618     {
1619         MTLOG_ERROR("session insert failed, ret %d", ret);
1620     }
1621 
1622     return;
1623 }
1624 
1625 void CNetMgr::RemoveNetItem(CNetHandler* item)
1626 {
1627     CNetHandler* handler =  this->FindNetItem(item);
1628     if (NULL == handler)
1629     {
1630         return;
1631     }
1632 
1633     _session_hash->HashRemove(handler);
1634 }
1635 
1636 CDestLinks* CNetMgr::FindDestLink(CDestLinks* key)
1637 {
1638     if (NULL == this->_ip_hash)
1639     {
1640         return NULL;
1641     }
1642 
1643     return (CDestLinks*)_ip_hash->HashFind(key);
1644 }
1645 
1646 void CNetMgr::InsertDestLink(CDestLinks* item)
1647 {
1648     if (NULL == this->_ip_hash)
1649     {
1650         return;
1651     }
1652 
1653     int32_t ret = _ip_hash->HashInsert(item);
1654     if (ret < 0)
1655     {
1656         MTLOG_ERROR("ip dest insert failed, ret %d", ret);
1657     }
1658 
1659     return;
1660 }
1661 
1662 void CNetMgr::RemoveDestLink(CDestLinks* item)
1663 {
1664     CDestLinks* handler =  this->FindDestLink(item);
1665     if (NULL == handler)
1666     {
1667         return;
1668     }
1669 
1670     _ip_hash->HashRemove(handler);
1671 }
1672 
1673 CDestLinks* CNetMgr::FindCreateDest(CDestLinks* key)
1674 {
1675     CDestLinks* dest = this->FindDestLink(key);
1676     if (dest != NULL)
1677     {
1678         MTLOG_DEBUG("dest links reuse ok");
1679         return dest;
1680     }
1681 
1682     dest = this->AllocDestLink();
1683     if (NULL == dest)
1684     {
1685         MTLOG_ERROR("dest links alloc failed, log it");
1686         return NULL;
1687     }
1688 
1689     dest->CopyKeyInfo(key);
1690     dest->StartTimer();
1691     this->InsertDestLink(dest);
1692 
1693     return dest;
1694 }
1695 
1696 void CNetMgr::DeleteDestLink(CDestLinks* dst)
1697 {
1698     this->RemoveDestLink(dst);
1699     dst->Reset();
1700     this->FreeDestLink(dst);
1701 }
1702 
1703 CNetMgr::CNetMgr()
1704 {
1705     sk_buffer_mng_init(&_tcp_pool, 60, 4096);
1706     sk_buffer_mng_init(&_udp_pool, 60, SK_DFLT_BUFF_SIZE);
1707 
1708     _ip_hash = new HashList(100000);
1709     _session_hash = new HashList(100000);
1710 }
1711 
1712 CNetMgr::~CNetMgr()
1713 {
1714     if (_ip_hash != NULL)
1715     {
1716         HashKey* hash_item = _ip_hash->HashGetFirst();
1717         while (hash_item)
1718         {
1719             delete hash_item;
1720             hash_item = _ip_hash->HashGetFirst();
1721         }
1722 
1723         delete _ip_hash;
1724         _ip_hash = NULL;
1725     }
1726 
1727     if (_session_hash != NULL)
1728     {
1729         HashKey* hash_item = _session_hash->HashGetFirst();
1730         while (hash_item)
1731         {
1732             delete hash_item;
1733             hash_item = _session_hash->HashGetFirst();
1734         }
1735 
1736         delete _session_hash;
1737         _session_hash = NULL;
1738     }
1739 
1740     sk_buffer_mng_destroy(&_tcp_pool);
1741     sk_buffer_mng_destroy(&_udp_pool);
1742 }
1743 
1744 void CNetMgr::RecycleObjs(uint64_t now)
1745 {
1746     uint32_t now_s = (uint32_t)(now / 1000);
1747 
1748     recycle_sk_buffer(&_udp_pool, now_s);
1749     recycle_sk_buffer(&_tcp_pool, now_s);
1750 
1751     _net_item_pool.RecycleItem(now);
1752     _sock_link_pool.RecycleItem(now);
1753     _dest_ip_pool.RecycleItem(now);
1754 }
1755