1
2 /**
3 * Tencent is pleased to support the open source community by making MSEC available.
4 *
5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6 *
7 * Licensed under the GNU General Public License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License. You may
9 * obtain a copy of the License at
10 *
11 * https://opensource.org/licenses/GPL-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software distributed under the
14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15 * either express or implied. See the License for the specific language governing permissions
16 * and limitations under the License.
17 */
18
19
20 /**
21 * @file mt_mbuf_pool.cpp
22 * @time 20130924
23 **/
24
25 #include <errno.h>
26 #include <netinet/tcp.h>
27 #include "micro_thread.h"
28 #include "mt_sys_hook.h"
29 #include "ff_hook.h"
30 #include "mt_net.h"
31
32
33 using namespace std;
34 using namespace NS_MICRO_THREAD;
35
CNetHelper()36 CNetHelper::CNetHelper()
37 {
38 handler = (void*)CNetMgr::Instance()->AllocNetItem();
39 }
40
~CNetHelper()41 CNetHelper::~CNetHelper()
42 {
43 CNetHandler* net_handler = (CNetHandler*)handler;
44 if (handler != NULL)
45 {
46 net_handler->Reset();
47 CNetMgr::Instance()->FreeNetItem(net_handler);
48 handler = NULL;
49 }
50 }
51
SendRecv(void * data,uint32_t len,uint32_t timeout)52 int32_t CNetHelper::SendRecv(void* data, uint32_t len, uint32_t timeout)
53 {
54 if (handler != NULL) {
55 CNetHandler* net_handler = (CNetHandler*)handler;
56 return net_handler->SendRecv(data, len, timeout);
57 } else {
58 return RC_INVALID_HANDLER;
59 }
60 }
61
GetRspBuff()62 void* CNetHelper::GetRspBuff()
63 {
64 if (handler != NULL) {
65 CNetHandler* net_handler = (CNetHandler*)handler;
66 return net_handler->GetRspBuff();
67 } else {
68 return NULL;
69 }
70 }
71
GetRspLen()72 uint32_t CNetHelper::GetRspLen()
73 {
74 if (handler != NULL) {
75 CNetHandler* net_handler = (CNetHandler*)handler;
76 return net_handler->GetRspLen();
77 } else {
78 return 0;
79 }
80 }
81
GetErrMsg(int32_t result)82 char* CNetHelper::GetErrMsg(int32_t result)
83 {
84 static const char* errmsg = "unknown error type";
85
86 switch (result)
87 {
88 case RC_SUCCESS:
89 errmsg = "success";
90 break;
91
92 case RC_ERR_SOCKET:
93 errmsg = "create socket failed";
94 break;
95
96 case RC_SEND_FAIL:
97 errmsg = "send pakeage timeout or failed";
98 break;
99
100 case RC_RECV_FAIL:
101 errmsg = "recv response timeout or failed";
102 break;
103
104 case RC_CONNECT_FAIL:
105 errmsg = "connect timeout or failed";
106 break;
107
108 case RC_CHECK_PKG_FAIL:
109 errmsg = "user package check failed";
110 break;
111
112 case RC_NO_MORE_BUFF:
113 errmsg = "user response buffer too small";
114 break;
115
116 case RC_REMOTE_CLOSED:
117 errmsg = "remote close connection";
118 break;
119
120 case RC_INVALID_PARAM:
121 errmsg = "params invalid";
122 break;
123
124 case RC_INVALID_HANDLER:
125 errmsg = "net handler invalid";
126 break;
127
128 case RC_MEM_ERROR:
129 errmsg = "no more memory, alloc failed";
130 break;
131
132 case RC_CONFLICT_SID:
133 errmsg = "session id with the dest address conflict";
134 break;
135
136 case RC_KQUEUE_ERROR:
137 errmsg = "epoll system error";
138 break;
139
140 default:
141 break;
142 }
143
144 return (char*)errmsg;
145 }
146
SetProtoType(MT_PROTO_TYPE type)147 void CNetHelper::SetProtoType(MT_PROTO_TYPE type)
148 {
149 if (handler != NULL) {
150 CNetHandler* net_handler = (CNetHandler*)handler;
151 return net_handler->SetProtoType(type);
152 }
153 }
154
SetDestAddress(struct sockaddr_in * dst)155 void CNetHelper::SetDestAddress(struct sockaddr_in* dst)
156 {
157 if (handler != NULL) {
158 CNetHandler* net_handler = (CNetHandler*)handler;
159 return net_handler->SetDestAddress(dst);
160 }
161 }
162
SetSessionId(uint64_t sid)163 void CNetHelper::SetSessionId(uint64_t sid)
164 {
165 if (handler != NULL) {
166 CNetHandler* net_handler = (CNetHandler*)handler;
167 return net_handler->SetSessionId(sid);
168 }
169 }
170
SetSessionCallback(CHECK_SESSION_CALLBACK function)171 void CNetHelper::SetSessionCallback(CHECK_SESSION_CALLBACK function)
172 {
173 if (handler != NULL) {
174 CNetHandler* net_handler = (CNetHandler*)handler;
175 return net_handler->SetSessionCallback(function);
176 }
177 }
178
Reset()179 void CNetHandler::Reset()
180 {
181 this->Unlink();
182 this->UnRegistSession();
183
184 if (_rsp_buff != NULL) {
185 delete_sk_buffer(_rsp_buff);
186 _rsp_buff = NULL;
187 }
188
189 _thread = NULL;
190 _proto_type = NET_PROTO_TCP;
191 _conn_type = TYPE_CONN_SESSION;
192 _dest_ipv4.sin_addr.s_addr = 0;
193 _dest_ipv4.sin_port = 0;
194 _session_id = 0;
195 _callback = NULL;
196 _err_no = 0;
197 _state_flags = 0;
198 _conn_ptr = NULL;
199 _send_pos = 0;
200 _req_len = 0;
201 _req_data = NULL;
202
203 }
204
CNetHandler()205 CNetHandler::CNetHandler()
206 {
207 _state_flags = 0;
208 _rsp_buff = NULL;
209
210 this->Reset();
211 }
212
~CNetHandler()213 CNetHandler::~CNetHandler()
214 {
215 this->Reset();
216 }
217
CheckParams()218 int32_t CNetHandler::CheckParams()
219 {
220 if ((NULL == _req_data) || (_req_len == 0))
221 {
222 MTLOG_ERROR("param invalid, data[%p], len[%u]", _req_data, _req_len);
223 return RC_INVALID_PARAM;
224 }
225
226 if ((_dest_ipv4.sin_addr.s_addr == 0) || (_dest_ipv4.sin_port == 0))
227 {
228 MTLOG_ERROR("param invalid, ip[%u], port[%u]", _dest_ipv4.sin_addr.s_addr,
229 _dest_ipv4.sin_port);
230 return RC_INVALID_PARAM;
231 }
232
233 if (_conn_type == TYPE_CONN_SESSION)
234 {
235 if ((_callback == NULL) || (_session_id == 0))
236 {
237 MTLOG_ERROR("param invalid, callback[%p], session_id[%llu]", _callback, _session_id);
238 return RC_INVALID_PARAM;
239 }
240
241 if (!this->RegistSession())
242 {
243 MTLOG_ERROR("param invalid, session_id[%llu] regist failed", _session_id);
244 return RC_CONFLICT_SID;
245 }
246 }
247
248 return 0;
249 }
250
GetConnLink()251 int32_t CNetHandler::GetConnLink()
252 {
253 CDestLinks key;
254 key.SetKeyInfo(_dest_ipv4.sin_addr.s_addr, _dest_ipv4.sin_port, _proto_type, _conn_type);
255
256 CDestLinks* dest_link = CNetMgr::Instance()->FindCreateDest(&key);
257 if (NULL == dest_link)
258 {
259 MTLOG_ERROR("get dest link handle failed");
260 return RC_MEM_ERROR;
261 }
262
263 CSockLink* sock_link = dest_link->GetSockLink();
264 if (NULL == sock_link)
265 {
266 MTLOG_ERROR("get sock link handle failed");
267 return RC_MEM_ERROR;
268 }
269
270 this->Link(sock_link);
271
272 return 0;
273 }
274
WaitConnect(uint64_t timeout)275 int32_t CNetHandler::WaitConnect(uint64_t timeout)
276 {
277 CSockLink* conn = (CSockLink*)this->_conn_ptr;
278 if (NULL == conn)
279 {
280 MTLOG_ERROR("get sock link handle failed");
281 return RC_MEM_ERROR;
282 }
283
284 int32_t fd = conn->CreateSock();
285 if (fd < 0)
286 {
287 MTLOG_ERROR("create sock failed, ret %d[%m]", fd);
288 return RC_ERR_SOCKET;
289 }
290
291 if (conn->Connect())
292 {
293 MTLOG_DEBUG("sock conncet ok");
294 return RC_SUCCESS;
295 }
296
297 this->SwitchToConn();
298
299 MtFrame* mtframe = MtFrame::Instance();
300 mtframe->WaitNotify(timeout);
301
302 this->SwitchToIdle();
303
304 if (_err_no != 0)
305 {
306 MTLOG_ERROR("connect get out errno %d", _err_no);
307 return _err_no;
308 }
309
310 if (conn->Connected())
311 {
312 MTLOG_DEBUG("connect ok");
313 return 0;
314 }
315 else
316 {
317 MTLOG_TRACE("connect not ok, maybe timeout");
318 return RC_CONNECT_FAIL;
319 }
320 }
321
WaitSend(uint64_t timeout)322 int32_t CNetHandler::WaitSend(uint64_t timeout)
323 {
324 CSockLink* conn = (CSockLink*)this->_conn_ptr;
325 if (NULL == conn)
326 {
327 MTLOG_ERROR("get sock link handle failed");
328 return RC_MEM_ERROR;
329 }
330
331 int32_t ret = conn->SendData(_req_data, _req_len);
332 if (ret < 0)
333 {
334 MTLOG_ERROR("sock send failed, ret %d[%m]", ret);
335 return RC_SEND_FAIL;
336 }
337 this->SkipSendPos(ret);
338
339 if (_req_len == 0)
340 {
341 MTLOG_DEBUG("sock send ok");
342 return RC_SUCCESS;
343 }
344
345 this->SwitchToSend();
346
347 MtFrame* mtframe = MtFrame::Instance();
348 mtframe->WaitNotify(timeout);
349
350 this->SwitchToIdle();
351
352 if (_err_no != 0)
353 {
354 MTLOG_ERROR("send get out errno %d", _err_no);
355 return _err_no;
356 }
357
358 if (_req_len == 0)
359 {
360 MTLOG_DEBUG("send req ok, len %u", _send_pos);
361 return 0;
362 }
363 else
364 {
365 MTLOG_TRACE("send req not ok, left len %u", _req_len);
366 return RC_SEND_FAIL;
367 }
368 }
369
WaitRecv(uint64_t timeout)370 int32_t CNetHandler::WaitRecv(uint64_t timeout)
371 {
372 CSockLink* conn = (CSockLink*)this->_conn_ptr;
373 if (NULL == conn)
374 {
375 MTLOG_ERROR("get sock link handle failed");
376 return RC_MEM_ERROR;
377 }
378
379 if (_conn_type == TYPE_CONN_SENDONLY)
380 {
381 MTLOG_DEBUG("only send, without recv");
382 return 0;
383 }
384
385 this->SwitchToRecv();
386
387 MtFrame* mtframe = MtFrame::Instance();
388 mtframe->WaitNotify(timeout);
389
390 this->SwitchToIdle();
391
392 if ((_rsp_buff != NULL) && (_rsp_buff->data_len > 0))
393 {
394 MTLOG_DEBUG("recv get rsp, len %d", _rsp_buff->data_len);
395 return 0;
396 }
397 else
398 {
399 MTLOG_TRACE("recv get out errno %d", _err_no);
400 return RC_RECV_FAIL;
401 }
402 }
403
SendRecv(void * data,uint32_t len,uint32_t timeout)404 int32_t CNetHandler::SendRecv(void* data, uint32_t len, uint32_t timeout)
405 {
406 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
407 utime64_t cost_time = 0;
408 uint64_t time_left = timeout;
409 this->_req_data = data;
410 this->_req_len = len;
411
412 int32_t ret = this->CheckParams();
413 if (ret < 0)
414 {
415 MTLOG_ERROR("check params failed, ret[%d]", ret);
416 goto EXIT_LABEL;
417 }
418
419 ret = this->GetConnLink();
420 if (ret < 0)
421 {
422 MTLOG_ERROR("get sock conn failed, ret: %d", ret);
423 goto EXIT_LABEL;
424 }
425
426 ret = this->WaitConnect(time_left);
427 if (ret < 0)
428 {
429 MTLOG_ERROR("sock connect failed, ret: %d", ret);
430 goto EXIT_LABEL;
431 }
432
433 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
434 time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
435 ret = this->WaitSend(time_left);
436 if (ret < 0)
437 {
438 MTLOG_ERROR("sock send failed, ret: %d", ret);
439 goto EXIT_LABEL;
440 }
441
442 cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
443 time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
444 ret = this->WaitRecv(time_left);
445 if (ret < 0)
446 {
447 MTLOG_ERROR("sock recv failed, ret: %d", ret);
448 goto EXIT_LABEL;
449 }
450
451 ret = 0;
452
453 EXIT_LABEL:
454
455 this->Unlink();
456
457 this->UnRegistSession();
458
459 return ret;
460 }
461
SkipSendPos(uint32_t len)462 uint32_t CNetHandler::SkipSendPos(uint32_t len)
463 {
464 uint32_t skip_len = (len >= _req_len) ? _req_len : len;
465 _req_len -= skip_len;
466 _send_pos += skip_len;
467 _req_data = (char*)_req_data + skip_len;
468
469 return skip_len;
470 }
471
Link(CSockLink * conn)472 void CNetHandler::Link(CSockLink* conn)
473 {
474 this->_conn_ptr = conn;
475 this->SwitchToIdle();
476 }
477
Unlink()478 void CNetHandler::Unlink()
479 {
480 if (this->_state_flags != 0)
481 {
482 this->DetachConn();
483 }
484 this->_conn_ptr = NULL;
485 }
486
SwitchToConn()487 void CNetHandler::SwitchToConn()
488 {
489 CSockLink* conn = (CSockLink*)this->_conn_ptr;
490 if (NULL == conn)
491 {
492 MTLOG_ERROR("net handler invalid");
493 return;
494 }
495
496 this->DetachConn();
497
498 this->_state_flags |= STATE_IN_CONNECT;
499 conn->AppendToList(CSockLink::LINK_CONN_LIST, this);
500 }
501
SwitchToSend()502 void CNetHandler::SwitchToSend()
503 {
504 CSockLink* conn = (CSockLink*)this->_conn_ptr;
505 if (NULL == conn)
506 {
507 MTLOG_ERROR("net handler invalid");
508 return;
509 }
510
511 this->DetachConn();
512
513 this->_state_flags |= STATE_IN_SEND;
514 conn->AppendToList(CSockLink::LINK_SEND_LIST, this);
515 }
516
SwitchToRecv()517 void CNetHandler::SwitchToRecv()
518 {
519 CSockLink* conn = (CSockLink*)this->_conn_ptr;
520 if (NULL == conn)
521 {
522 MTLOG_ERROR("net handler invalid");
523 return;
524 }
525
526 this->DetachConn();
527
528 this->_state_flags |= STATE_IN_RECV;
529 conn->AppendToList(CSockLink::LINK_RECV_LIST, this);
530 }
531
SwitchToIdle()532 void CNetHandler::SwitchToIdle()
533 {
534 CSockLink* conn = (CSockLink*)this->_conn_ptr;
535 if (NULL == conn)
536 {
537 MTLOG_ERROR("net handler invalid");
538 return;
539 }
540
541 this->DetachConn();
542
543 this->_state_flags |= STATE_IN_IDLE;
544 conn->AppendToList(CSockLink::LINK_IDLE_LIST, this);
545 }
546
DetachConn()547 void CNetHandler::DetachConn()
548 {
549 CSockLink* conn = (CSockLink*)this->_conn_ptr;
550 if (NULL == conn)
551 {
552 MTLOG_DEBUG("net handler not set");
553 return;
554 }
555
556 if (_state_flags == 0)
557 {
558 return;
559 }
560
561 if (_state_flags & STATE_IN_CONNECT)
562 {
563 conn->RemoveFromList(CSockLink::LINK_CONN_LIST, this);
564 _state_flags &= ~STATE_IN_CONNECT;
565 }
566
567 if (_state_flags & STATE_IN_SEND)
568 {
569 conn->RemoveFromList(CSockLink::LINK_SEND_LIST, this);
570 _state_flags &= ~STATE_IN_SEND;
571 }
572
573 if (_state_flags & STATE_IN_RECV)
574 {
575 conn->RemoveFromList(CSockLink::LINK_RECV_LIST, this);
576 _state_flags &= ~STATE_IN_RECV;
577 }
578
579 if (_state_flags & STATE_IN_IDLE)
580 {
581 conn->RemoveFromList(CSockLink::LINK_IDLE_LIST, this);
582 _state_flags &= ~STATE_IN_IDLE;
583 }
584 }
585
HashValue()586 uint32_t CNetHandler::HashValue()
587 {
588 uint32_t ip = _dest_ipv4.sin_addr.s_addr;
589 ip ^= (_dest_ipv4.sin_port << 16) | (_proto_type << 8) | (_conn_type << 8);
590
591 uint32_t hash = (_session_id >> 32) & 0xffffffff;
592 hash ^= _session_id & 0xffffffff;
593 hash ^= ip;
594
595 return hash;
596 }
597
HashCmp(HashKey * rhs)598 int32_t CNetHandler::HashCmp(HashKey* rhs)
599 {
600 CNetHandler* data = (CNetHandler*)(rhs);
601 if (!data) {
602 return -1;
603 }
604 if (this->_session_id != data->_session_id)
605 {
606 return (this->_session_id > data->_session_id) ? 1 : -1;
607 }
608
609 if (this->_dest_ipv4.sin_addr.s_addr != data->_dest_ipv4.sin_addr.s_addr) {
610 return (this->_dest_ipv4.sin_addr.s_addr > data->_dest_ipv4.sin_addr.s_addr) ? 1 : -1;
611 }
612 if (this->_dest_ipv4.sin_port != data->_dest_ipv4.sin_port) {
613 return (this->_dest_ipv4.sin_port > data->_dest_ipv4.sin_port) ? 1 : -1;
614 }
615 if (this->_proto_type != data->_proto_type) {
616 return (this->_proto_type > data->_proto_type) ? 1 : -1;
617 }
618 if (this->_conn_type != data->_conn_type) {
619 return (this->_conn_type > data->_conn_type) ? 1 : -1;
620 }
621
622 return 0;
623 };
624
RegistSession()625 bool CNetHandler::RegistSession()
626 {
627 if (CNetMgr::Instance()->FindNetItem(this) != NULL)
628 {
629 return false;
630 }
631
632 MtFrame* mtframe = MtFrame::Instance();
633 this->_thread = mtframe->GetActiveThread();
634
635 CNetMgr::Instance()->InsertNetItem(this);
636 this->_state_flags |= STATE_IN_SESSION;
637 return true;
638 }
639
UnRegistSession()640 void CNetHandler::UnRegistSession()
641 {
642 if (this->_state_flags & STATE_IN_SESSION)
643 {
644 CNetMgr::Instance()->RemoveNetItem(this);
645 this->_state_flags &= ~STATE_IN_SESSION;
646 }
647 }
648
GetItemList(int32_t type)649 TNetItemList* CSockLink::GetItemList(int32_t type)
650 {
651 TNetItemList* list = NULL;
652 switch (type)
653 {
654 case LINK_IDLE_LIST:
655 list = &this->_idle_list;
656 break;
657
658 case LINK_CONN_LIST:
659 list = &this->_wait_connect;
660 break;
661
662 case LINK_SEND_LIST:
663 list = &this->_wait_send;
664 break;
665
666 case LINK_RECV_LIST:
667 list = &this->_wait_recv;
668 break;
669
670 default:
671 break;
672 }
673
674 return list;
675 }
676
AppendToList(int32_t type,CNetHandler * item)677 void CSockLink::AppendToList(int32_t type, CNetHandler* item)
678 {
679 TNetItemList* list = this->GetItemList(type);
680 if (NULL == list)
681 {
682 MTLOG_ERROR("unknown list type: %d", type);
683 return;
684 }
685
686 TAILQ_INSERT_TAIL(list, item, _link_entry);
687 }
688
RemoveFromList(int32_t type,CNetHandler * item)689 void CSockLink::RemoveFromList(int32_t type, CNetHandler* item)
690 {
691 TNetItemList* list = this->GetItemList(type);
692 if (NULL == list)
693 {
694 MTLOG_ERROR("unknown list type: %d", type);
695 return;
696 }
697
698 TAILQ_REMOVE(list, item, _link_entry);
699 }
700
NotifyThread(CNetHandler * item,int32_t result)701 void CSockLink::NotifyThread(CNetHandler* item, int32_t result)
702 {
703 static MtFrame* frame = NULL;
704 if (frame == NULL) {
705 frame = MtFrame::Instance();
706 }
707
708 if (result != RC_SUCCESS)
709 {
710 item->SetErrNo(result);
711 }
712
713 MicroThread* thread = item->GetThread();
714 if ((thread != NULL) && (thread->HasFlag(MicroThread::IO_LIST)))
715 {
716 frame->RemoveIoWait(thread);
717 frame->InsertRunable(thread);
718 }
719 }
720
NotifyAll(int32_t result)721 void CSockLink::NotifyAll(int32_t result)
722 {
723 CNetHandler* item = NULL;
724 CNetHandler* tmp = NULL;
725
726 TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
727 {
728 NotifyThread(item, result);
729 item->Unlink();
730 }
731
732 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
733 {
734 NotifyThread(item, result);
735 item->Unlink();
736 }
737
738 TAILQ_FOREACH_SAFE(item, &_wait_recv, _link_entry, tmp)
739 {
740 NotifyThread(item, result);
741 item->Unlink();
742 }
743
744 TAILQ_FOREACH_SAFE(item, &_idle_list, _link_entry, tmp)
745 {
746 NotifyThread(item, result);
747 item->Unlink();
748 }
749 }
750
Reset()751 void CSockLink::Reset()
752 {
753 this->Close();
754 this->NotifyAll(_errno);
755
756 rw_cache_destroy(&_recv_cache);
757 if (_rsp_buff != NULL)
758 {
759 delete_sk_buffer(_rsp_buff);
760 _rsp_buff = NULL;
761 }
762
763 TAILQ_INIT(&_wait_connect);
764 TAILQ_INIT(&_wait_send);
765 TAILQ_INIT(&_wait_recv);
766 TAILQ_INIT(&_idle_list);
767
768 _proto_type = NET_PROTO_TCP;
769 _errno = 0;
770 _state = 0;
771 _last_access = mt_time_ms();
772 _parents = NULL;
773
774 this->KqueuerObj::Reset();
775 }
776
CSockLink()777 CSockLink::CSockLink()
778 {
779 rw_cache_init(&_recv_cache, NULL);
780 _rsp_buff = NULL;
781
782 TAILQ_INIT(&_wait_connect);
783 TAILQ_INIT(&_wait_send);
784 TAILQ_INIT(&_wait_recv);
785 TAILQ_INIT(&_idle_list);
786
787 _proto_type = NET_PROTO_TCP;
788 _errno = 0;
789 _state = 0;
790 _last_access = mt_time_ms();
791 _parents = NULL;
792 }
793
~CSockLink()794 CSockLink::~CSockLink()
795 {
796 this->Reset();
797 }
798
SetProtoType(MT_PROTO_TYPE type)799 void CSockLink::SetProtoType(MT_PROTO_TYPE type)
800 {
801 _proto_type = type;
802 _recv_cache.pool = CNetMgr::Instance()->GetSkBuffMng(type);
803 }
804
Close()805 void CSockLink::Close()
806 {
807 if (_fd < 0)
808 {
809 return;
810 }
811
812 MtFrame::Instance()->KqueueDelObj(this);
813
814 close(_fd);
815 _fd = -1;
816 }
817
Destroy()818 void CSockLink::Destroy()
819 {
820 CDestLinks* dstlink = (CDestLinks*)_parents;
821 if (NULL == dstlink)
822 {
823 MTLOG_ERROR("socket link without parents ptr, maybe wrong");
824 delete this;
825 }
826 else
827 {
828 MTLOG_DEBUG("socket link just free");
829 dstlink->FreeSockLink(this);
830 }
831 }
832
CreateSock()833 int32_t CSockLink::CreateSock()
834 {
835 if (_fd > 0)
836 {
837 return _fd;
838 }
839
840 if (NET_PROTO_TCP == _proto_type)
841 {
842 _fd = socket(AF_INET, SOCK_STREAM, 0);
843 }
844 else
845 {
846 _fd = socket(AF_INET, SOCK_DGRAM, 0);
847 }
848
849 if (_fd < 0)
850 {
851 MTLOG_ERROR("create socket failed, ret %d[%m]", _fd);
852 return -1;
853 }
854
855 int flags = 1;
856 if (ioctl(_fd, FIONBIO, &flags) < 0)
857 {
858 MTLOG_ERROR("socket unblock failed, %m");
859 close(_fd);
860 _fd = -1;
861 return -2;
862 }
863
864 if (NET_PROTO_TCP == _proto_type)
865 {
866 setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
867 this->EnableOutput();
868 }
869
870 this->EnableInput();
871 if (!MtFrame::Instance()->KqueueAddObj(this))
872 {
873 MTLOG_ERROR("socket epoll mng failed, %m");
874 close(_fd);
875 _fd = -1;
876 return -3;
877 }
878
879 return _fd;
880 }
881
GetDestAddr(struct sockaddr_in * addr)882 struct sockaddr_in* CSockLink::GetDestAddr(struct sockaddr_in* addr)
883 {
884 CDestLinks* dstlink = (CDestLinks*)_parents;
885 if ((NULL == _parents) || (NULL == addr)) {
886 return NULL;
887 }
888
889 uint32_t ip = 0;
890 uint16_t port = 0;
891 dstlink->GetDestIP(ip, port);
892
893 addr->sin_family = AF_INET;
894 addr->sin_addr.s_addr = ip;
895 addr->sin_port = port;
896
897 return addr;
898 }
899
Connect()900 bool CSockLink::Connect()
901 {
902 this->_last_access = mt_time_ms();
903
904 if (_proto_type == NET_PROTO_UDP)
905 {
906 _state |= LINK_CONNECTED;
907 }
908
909 if (_state & LINK_CONNECTED)
910 {
911 return true;
912 }
913
914 if (_state & LINK_CONNECTING)
915 {
916 return false;
917 }
918
919 struct sockaddr_in addr = {0};
920
921 mt_hook_syscall(connect);
922 int32_t ret = ff_hook_connect(_fd, (struct sockaddr*)this->GetDestAddr(&addr), sizeof(struct sockaddr_in));
923 if (ret < 0)
924 {
925 int32_t err = errno;
926 if (err == EISCONN)
927 {
928 _state |= LINK_CONNECTED;
929 return true;
930 }
931 else
932 {
933 _state |= LINK_CONNECTING;
934 if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR))
935 {
936 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _fd, err);
937 return false;
938 }
939 else
940 {
941 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _fd, err);
942 return false;
943 }
944 }
945 }
946 else
947 {
948 _state |= LINK_CONNECTED;
949 return true;
950 }
951 }
952
SendCacheUdp(void * data,uint32_t len)953 int32_t CSockLink::SendCacheUdp(void* data, uint32_t len)
954 {
955 mt_hook_syscall(sendto);
956 void* buff = NULL;
957 uint32_t buff_len = 0;
958
959 CNetHandler* item = NULL;
960 CNetHandler* tmp = NULL;
961 struct sockaddr_in dst = {0};
962
963 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
964 {
965 item->GetSendData(buff, buff_len);
966 if ((NULL == buff) || (buff_len == 0))
967 {
968 MTLOG_ERROR("get buff ptr invalid, log it");
969 NotifyThread(item, 0);
970 item->SwitchToIdle();
971 continue;
972 }
973
974 int32_t ret = ff_hook_sendto(_fd, buff, buff_len, 0,
975 (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
976 if (ret == -1)
977 {
978 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
979 {
980 return 0;
981 }
982 else
983 {
984 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
985 errno, strerror(errno));
986 return -2;
987 }
988 }
989
990 NotifyThread(item, 0);
991 item->SwitchToIdle();
992 }
993
994 if ((data == NULL) || (len == 0))
995 {
996 return 0;
997 }
998
999 int32_t ret = ff_hook_sendto(_fd, data, len, 0,
1000 (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
1001 if (ret == -1)
1002 {
1003 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
1004 {
1005 return 0;
1006 }
1007 else
1008 {
1009 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
1010 errno, strerror(errno));
1011 return -2;
1012 }
1013 }
1014 else
1015 {
1016 return ret;
1017 }
1018 }
1019
SendCacheTcp(void * data,uint32_t len)1020 int32_t CSockLink::SendCacheTcp(void* data, uint32_t len)
1021 {
1022 void* buff = NULL;
1023 uint32_t buff_len = 0;
1024 struct iovec iov[64];
1025 int32_t count = 0;
1026 CNetHandler* item = NULL;
1027 CNetHandler* tmp = NULL;
1028
1029 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1030 {
1031 item->GetSendData(buff, buff_len);
1032 iov[count].iov_base = buff;
1033 iov[count].iov_len = (int32_t)buff_len;
1034 count++;
1035 if (count >= 64)
1036 {
1037 break;
1038 }
1039 }
1040 if ((count < 64) && (data != NULL))
1041 {
1042 iov[count].iov_base = data;
1043 iov[count].iov_len = (int32_t)len;
1044 count++;
1045 }
1046
1047 ssize_t bytes = writev(_fd, iov, count);
1048 if (bytes < 0)
1049 {
1050 if ((errno == EAGAIN) || (errno == EINTR))
1051 {
1052 return 0;
1053 }
1054 else
1055 {
1056 MTLOG_ERROR("socket writev failed, fd %d, errno %d(%s)", _fd,
1057 errno, strerror(errno));
1058 return -1;
1059 }
1060 }
1061
1062 uint32_t send_left = (uint32_t)bytes;
1063 TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1064 {
1065 send_left -= item->SkipSendPos(send_left);
1066 item->GetSendData(buff, buff_len);
1067 if (buff_len == 0)
1068 {
1069 NotifyThread(item, 0);
1070 item->SwitchToIdle();
1071 }
1072
1073 if (send_left == 0)
1074 {
1075 break;
1076 }
1077 }
1078
1079 return send_left;
1080 }
1081
SendData(void * data,uint32_t len)1082 int32_t CSockLink::SendData(void* data, uint32_t len)
1083 {
1084 int32_t ret = 0;
1085 bool rc = false;
1086
1087 this->_last_access = mt_time_ms();
1088
1089 if (_proto_type == NET_PROTO_UDP)
1090 {
1091 ret = SendCacheUdp(data, len);
1092 }
1093 else
1094 {
1095 ret = SendCacheTcp(data, len);
1096 }
1097
1098 if (ret < (int32_t)len)
1099 {
1100 this->EnableOutput();
1101 rc = MtFrame::Instance()->KqueueCtrlAdd(_fd, KQ_EVENT_READ);
1102 }
1103 else
1104 {
1105 this->DisableOutput();
1106 rc = MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE);
1107 }
1108
1109 if (!rc)
1110 {
1111 MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1112 }
1113
1114 return ret;
1115 }
1116
RecvDispath()1117 int32_t CSockLink::RecvDispath()
1118 {
1119 if (_proto_type == NET_PROTO_UDP)
1120 {
1121 return this->DispathUdp();
1122 }
1123 else
1124 {
1125 return this->DispathTcp();
1126 }
1127 }
1128
ExtendRecvRsp()1129 void CSockLink::ExtendRecvRsp()
1130 {
1131 if (NULL == _rsp_buff)
1132 {
1133 _rsp_buff = new_sk_buffer(512);
1134 if (NULL == _rsp_buff)
1135 {
1136 MTLOG_ERROR("no more memory, error");
1137 return;
1138 }
1139 }
1140
1141 _rsp_buff->data_len += read_cache_begin(&_recv_cache, _rsp_buff->data_len,
1142 _rsp_buff->data + _rsp_buff->data_len , _rsp_buff->size - _rsp_buff->data_len);
1143 }
1144
GetSessionCallback()1145 CHECK_SESSION_CALLBACK CSockLink::GetSessionCallback()
1146 {
1147 CHECK_SESSION_CALLBACK check_session = NULL;
1148
1149 CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1150 if (NULL == item)
1151 {
1152 MTLOG_DEBUG("recv data with no wait item, err");
1153 goto EXIT_LABEL;
1154 }
1155
1156 check_session = item->GetSessionCallback();
1157 if (NULL == check_session)
1158 {
1159 MTLOG_ERROR("recv data with no session callback, err");
1160 goto EXIT_LABEL;
1161 }
1162
1163 EXIT_LABEL:
1164
1165 CDestLinks* dstlink = (CDestLinks*)_parents;
1166 if (NULL == dstlink)
1167 {
1168 return check_session;
1169 }
1170
1171 if (check_session != NULL)
1172 {
1173 dstlink->SetDefaultCallback(check_session);
1174 }
1175 else
1176 {
1177 check_session = dstlink->GetDefaultCallback();
1178 }
1179
1180 return check_session;
1181 }
1182
DispathTcp()1183 int32_t CSockLink::DispathTcp()
1184 {
1185 CHECK_SESSION_CALLBACK check_session = this->GetSessionCallback();
1186 if (NULL == check_session)
1187 {
1188 MTLOG_ERROR("recv data with no session callback, err");
1189 return -1;
1190 }
1191
1192 uint32_t need_len = 0;
1193 uint64_t sid = 0;
1194 int32_t ret = 0;
1195 while (_recv_cache.len > 0)
1196 {
1197 this->ExtendRecvRsp();
1198 if (NULL == _rsp_buff)
1199 {
1200 MTLOG_ERROR("alloc memory, error");
1201 _errno = RC_MEM_ERROR;
1202 return -3;
1203 }
1204
1205 need_len = 0;
1206 ret = check_session(_rsp_buff->data, _rsp_buff->data_len, &sid, &need_len);
1207
1208 if (ret < 0)
1209 {
1210 MTLOG_ERROR("user check resp failed, ret %d", ret);
1211 _errno = RC_CHECK_PKG_FAIL;
1212 return -1;
1213 }
1214
1215 if (ret == 0)
1216 {
1217 if ((need_len == 0) && (_rsp_buff->data_len == _rsp_buff->size))
1218 {
1219 MTLOG_DEBUG("recv default buff full[%u], but user no set need length", _rsp_buff->size);
1220 need_len = _rsp_buff->size * 2;
1221 }
1222
1223 if ((need_len <= _rsp_buff->size) || (need_len > 100*1024*1024))
1224 {
1225 MTLOG_DEBUG("maybe need wait more data: %u", need_len);
1226 return 0;
1227 }
1228
1229 _rsp_buff = reserve_sk_buffer(_rsp_buff, need_len);
1230 if (NULL == _rsp_buff)
1231 {
1232 MTLOG_ERROR("no more memory, error");
1233 _errno = RC_MEM_ERROR;
1234 return -3;
1235 }
1236
1237 if (_rsp_buff->data_len >= _recv_cache.len)
1238 {
1239 MTLOG_DEBUG("maybe need wait more data, now %u", _recv_cache.len);
1240 return 0;
1241 }
1242
1243 continue;
1244 }
1245
1246 if (ret > (int32_t)_recv_cache.len)
1247 {
1248 MTLOG_DEBUG("maybe pkg not all ok, wait more");
1249 return 0;
1250 }
1251
1252 CNetHandler* session = this->FindSession(sid);
1253 if (NULL == session)
1254 {
1255 MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1256 cache_skip_data(&_recv_cache, ret);
1257 delete_sk_buffer(_rsp_buff);
1258 _rsp_buff = NULL;
1259 }
1260 else
1261 {
1262 MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1263 cache_skip_data(&_recv_cache, ret);
1264 this->NotifyThread(session, 0);
1265 session->SwitchToIdle();
1266 _rsp_buff->data_len = ret;
1267 session->SetRespBuff(_rsp_buff);
1268 _rsp_buff = NULL;
1269 }
1270 }
1271
1272 return 0;
1273
1274 }
1275
DispathUdp()1276 int32_t CSockLink::DispathUdp()
1277 {
1278 CHECK_SESSION_CALLBACK check_session = NULL;
1279 CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1280 if (NULL == item)
1281 {
1282 MTLOG_DEBUG("recv data with no wait item, maybe wrong pkg recv");
1283 }
1284 else
1285 {
1286 check_session = item->GetSessionCallback();
1287 if (NULL == check_session)
1288 {
1289 MTLOG_TRACE("recv data with no session callback, err");
1290 }
1291 }
1292
1293 uint64_t sid = 0;
1294 uint32_t need_len = 0;
1295 int32_t ret = 0;
1296 TSkBuffer* block = NULL;
1297 while ((block = TAILQ_FIRST(&_recv_cache.list)) != NULL)
1298 {
1299 if (check_session == NULL)
1300 {
1301 MTLOG_DEBUG("no recv wait, skip first block");
1302 cache_skip_data(&_recv_cache, block->data_len);
1303 continue;
1304 }
1305
1306 need_len = 0;
1307 ret = check_session(block->data, block->data_len, &sid, &need_len);
1308 if ((ret <= 0) || (ret > (int32_t)block->data_len))
1309 {
1310 MTLOG_DEBUG("maybe wrong pkg come, skip it");
1311 cache_skip_data(&_recv_cache, block->data_len);
1312 continue;
1313 }
1314
1315 CNetHandler* session = this->FindSession(sid);
1316 if (NULL == session)
1317 {
1318 MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1319 cache_skip_data(&_recv_cache, block->data_len);
1320 }
1321 else
1322 {
1323 MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1324 this->NotifyThread(session, 0);
1325 session->SwitchToIdle();
1326 cache_skip_first_buffer(&_recv_cache);
1327 session->SetRespBuff(block);
1328 }
1329 }
1330
1331 return 0;
1332 }
1333
FindSession(uint64_t sid)1334 CNetHandler* CSockLink::FindSession(uint64_t sid)
1335 {
1336 CNetHandler key;
1337 CDestLinks* dstlink = (CDestLinks*)_parents;
1338 if (NULL == dstlink)
1339 {
1340 MTLOG_ERROR("session dest link invalid, maybe error");
1341 return NULL;
1342 }
1343 struct sockaddr_in addr;
1344 key.SetDestAddress(this->GetDestAddr(&addr));
1345 key.SetConnType(dstlink->GetConnType());
1346 key.SetProtoType(dstlink->GetProtoType());
1347 key.SetSessionId(sid);
1348
1349 return CNetMgr::Instance()->FindNetItem(&key);
1350 }
1351
InputNotify()1352 int CSockLink::InputNotify()
1353 {
1354 int32_t ret = 0;
1355
1356 this->_last_access = mt_time_ms();
1357
1358 if (_proto_type == NET_PROTO_UDP)
1359 {
1360 ret = cache_udp_recv(&_recv_cache, _fd, NULL);
1361 }
1362 else
1363 {
1364 ret = cache_tcp_recv(&_recv_cache, _fd);
1365 }
1366
1367 if (ret < 0)
1368 {
1369 if (ret == -SK_ERR_NEED_CLOSE)
1370 {
1371 MTLOG_DEBUG("recv on link failed, remote close");
1372 _errno = RC_REMOTE_CLOSED;
1373 }
1374 else
1375 {
1376 MTLOG_ERROR("recv on link failed, close it, ret %d[%m]", ret);
1377 _errno = RC_RECV_FAIL;
1378 }
1379
1380 this->Destroy();
1381 return -1;
1382 }
1383
1384 ret = this->RecvDispath();
1385 if (ret < 0)
1386 {
1387 MTLOG_DEBUG("recv dispath failed, close it, ret %d[%m]", ret);
1388 this->Destroy();
1389 return -2;
1390 }
1391
1392 return 0;
1393
1394 }
1395
OutputNotify()1396 int CSockLink::OutputNotify()
1397 {
1398 int32_t ret = 0;
1399
1400 this->_last_access = mt_time_ms();
1401
1402 if (_state & LINK_CONNECTING)
1403 {
1404 _state &= ~LINK_CONNECTING;
1405 _state |= LINK_CONNECTED;
1406
1407 CNetHandler* item = NULL;
1408 CNetHandler* tmp = NULL;
1409 TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
1410 {
1411 NotifyThread(item, 0);
1412 item->SwitchToIdle();
1413 }
1414 }
1415
1416 if (_proto_type == NET_PROTO_UDP)
1417 {
1418 ret = SendCacheUdp(NULL, 0);
1419 }
1420 else
1421 {
1422 ret = SendCacheTcp(NULL, 0);
1423 }
1424
1425 if (ret < 0)
1426 {
1427 MTLOG_ERROR("Send on link failed, close it, ret %d[%m]", ret);
1428 _errno = RC_SEND_FAIL;
1429 this->Destroy();
1430 return ret;
1431 }
1432
1433 if (TAILQ_EMPTY(&_wait_send))
1434 {
1435 this->DisableOutput();
1436 if (!MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE))
1437 {
1438 MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1439 }
1440 }
1441
1442 return 0;
1443 }
1444
HangupNotify()1445 int CSockLink::HangupNotify()
1446 {
1447 MTLOG_ERROR("socket epoll error, fd %d", _fd);
1448
1449 this->_errno = RC_KQUEUE_ERROR;
1450 this->Destroy();
1451 return -1;
1452 }
1453
CDestLinks()1454 CDestLinks::CDestLinks()
1455 {
1456 _timeout = 5*60*1000;
1457 _addr_ipv4 = 0;
1458 _net_port = 0;
1459 _proto_type = NET_PROTO_UNDEF;
1460 _conn_type = TYPE_CONN_SESSION;
1461
1462 _max_links = 3; // 默认3个
1463 _curr_link = 0;
1464 _dflt_callback = NULL;
1465
1466 TAILQ_INIT(&_sock_list);
1467 }
1468
Reset()1469 void CDestLinks::Reset()
1470 {
1471 CSockLink* item = NULL;
1472 CSockLink* temp = NULL;
1473 TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1474 {
1475 item->Destroy();
1476 }
1477 TAILQ_INIT(&_sock_list);
1478
1479 CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1480 if (NULL != timer)
1481 {
1482 timer->stop_timer(this);
1483 }
1484
1485 _timeout = 5*60*1000;
1486 _addr_ipv4 = 0;
1487 _net_port = 0;
1488 _proto_type = NET_PROTO_UNDEF;
1489 _conn_type = TYPE_CONN_SESSION;
1490
1491 _max_links = 3;
1492 _curr_link = 0;
1493 }
1494
~CDestLinks()1495 CDestLinks::~CDestLinks()
1496 {
1497 this->Reset();
1498 }
1499
StartTimer()1500 void CDestLinks::StartTimer()
1501 {
1502 CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1503 if ((NULL == timer) || !timer->start_timer(this, 60*1000))
1504 {
1505 MTLOG_ERROR("obj %p attach timer failed, error", this);
1506 }
1507 }
1508
FreeSockLink(CSockLink * sock)1509 void CDestLinks::FreeSockLink(CSockLink* sock)
1510 {
1511 if ((sock == NULL) || (sock->GetParentsPtr() != (void*)this))
1512 {
1513 MTLOG_ERROR("invalid socklink %p, error", sock);
1514 return;
1515 }
1516
1517 TAILQ_REMOVE(&_sock_list, sock, _link_entry);
1518 if (this->_curr_link > 0) {
1519 this->_curr_link--;
1520 }
1521
1522 sock->Reset();
1523 CNetMgr::Instance()->FreeSockLink(sock);
1524 }
1525
GetSockLink()1526 CSockLink* CDestLinks::GetSockLink()
1527 {
1528 CSockLink* link = NULL;
1529 if (_curr_link < _max_links)
1530 {
1531 link = CNetMgr::Instance()->AllocSockLink();
1532 if (NULL == link)
1533 {
1534 MTLOG_ERROR("alloc sock link failed, error");
1535 return NULL;
1536 }
1537 link->SetParentsPtr(this);
1538 link->SetProtoType(_proto_type);
1539 TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1540 _curr_link++;
1541 }
1542 else
1543 {
1544 link = TAILQ_FIRST(&_sock_list);
1545 TAILQ_REMOVE(&_sock_list, link, _link_entry);
1546 TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1547 }
1548
1549 return link;
1550 }
1551
timer_notify()1552 void CDestLinks::timer_notify()
1553 {
1554 uint64_t now = mt_time_ms();
1555 CSockLink* item = NULL;
1556 CSockLink* temp = NULL;
1557 TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1558 {
1559 if ((item->GetLastAccess() + this->_timeout) < now)
1560 {
1561 MTLOG_DEBUG("link timeout, last[%llu], now [%llu]", item->GetLastAccess(), now);
1562 item->Destroy();
1563 }
1564 }
1565
1566 item = TAILQ_FIRST(&_sock_list);
1567 if (NULL == item)
1568 {
1569 MTLOG_DEBUG("dest links timeout, now [%llu]", now);
1570 CNetMgr::Instance()->DeleteDestLink(this);
1571 return;
1572 }
1573
1574 this->StartTimer();
1575
1576 return;
1577 }
1578
1579 CNetMgr* CNetMgr::_instance = NULL;
Instance(void)1580 CNetMgr* CNetMgr::Instance (void)
1581 {
1582 if (NULL == _instance)
1583 {
1584 _instance = new CNetMgr();
1585 }
1586
1587 return _instance;
1588 }
1589
Destroy()1590 void CNetMgr::Destroy()
1591 {
1592 if( _instance != NULL )
1593 {
1594 delete _instance;
1595 _instance = NULL;
1596 }
1597 }
1598
FindNetItem(CNetHandler * key)1599 CNetHandler* CNetMgr::FindNetItem(CNetHandler* key)
1600 {
1601 if (NULL == this->_session_hash)
1602 {
1603 return NULL;
1604 }
1605
1606 return (CNetHandler*)_session_hash->HashFind(key);
1607 }
1608
InsertNetItem(CNetHandler * item)1609 void CNetMgr::InsertNetItem(CNetHandler* item)
1610 {
1611 if (NULL == this->_session_hash)
1612 {
1613 return;
1614 }
1615
1616 int32_t ret = _session_hash->HashInsert(item);
1617 if (ret < 0)
1618 {
1619 MTLOG_ERROR("session insert failed, ret %d", ret);
1620 }
1621
1622 return;
1623 }
1624
RemoveNetItem(CNetHandler * item)1625 void CNetMgr::RemoveNetItem(CNetHandler* item)
1626 {
1627 CNetHandler* handler = this->FindNetItem(item);
1628 if (NULL == handler)
1629 {
1630 return;
1631 }
1632
1633 _session_hash->HashRemove(handler);
1634 }
1635
FindDestLink(CDestLinks * key)1636 CDestLinks* CNetMgr::FindDestLink(CDestLinks* key)
1637 {
1638 if (NULL == this->_ip_hash)
1639 {
1640 return NULL;
1641 }
1642
1643 return (CDestLinks*)_ip_hash->HashFind(key);
1644 }
1645
InsertDestLink(CDestLinks * item)1646 void CNetMgr::InsertDestLink(CDestLinks* item)
1647 {
1648 if (NULL == this->_ip_hash)
1649 {
1650 return;
1651 }
1652
1653 int32_t ret = _ip_hash->HashInsert(item);
1654 if (ret < 0)
1655 {
1656 MTLOG_ERROR("ip dest insert failed, ret %d", ret);
1657 }
1658
1659 return;
1660 }
1661
RemoveDestLink(CDestLinks * item)1662 void CNetMgr::RemoveDestLink(CDestLinks* item)
1663 {
1664 CDestLinks* handler = this->FindDestLink(item);
1665 if (NULL == handler)
1666 {
1667 return;
1668 }
1669
1670 _ip_hash->HashRemove(handler);
1671 }
1672
FindCreateDest(CDestLinks * key)1673 CDestLinks* CNetMgr::FindCreateDest(CDestLinks* key)
1674 {
1675 CDestLinks* dest = this->FindDestLink(key);
1676 if (dest != NULL)
1677 {
1678 MTLOG_DEBUG("dest links reuse ok");
1679 return dest;
1680 }
1681
1682 dest = this->AllocDestLink();
1683 if (NULL == dest)
1684 {
1685 MTLOG_ERROR("dest links alloc failed, log it");
1686 return NULL;
1687 }
1688
1689 dest->CopyKeyInfo(key);
1690 dest->StartTimer();
1691 this->InsertDestLink(dest);
1692
1693 return dest;
1694 }
1695
DeleteDestLink(CDestLinks * dst)1696 void CNetMgr::DeleteDestLink(CDestLinks* dst)
1697 {
1698 this->RemoveDestLink(dst);
1699 dst->Reset();
1700 this->FreeDestLink(dst);
1701 }
1702
CNetMgr()1703 CNetMgr::CNetMgr()
1704 {
1705 sk_buffer_mng_init(&_tcp_pool, 60, 4096);
1706 sk_buffer_mng_init(&_udp_pool, 60, SK_DFLT_BUFF_SIZE);
1707
1708 _ip_hash = new HashList(100000);
1709 _session_hash = new HashList(100000);
1710 }
1711
~CNetMgr()1712 CNetMgr::~CNetMgr()
1713 {
1714 if (_ip_hash != NULL)
1715 {
1716 HashKey* hash_item = _ip_hash->HashGetFirst();
1717 while (hash_item)
1718 {
1719 delete hash_item;
1720 hash_item = _ip_hash->HashGetFirst();
1721 }
1722
1723 delete _ip_hash;
1724 _ip_hash = NULL;
1725 }
1726
1727 if (_session_hash != NULL)
1728 {
1729 HashKey* hash_item = _session_hash->HashGetFirst();
1730 while (hash_item)
1731 {
1732 delete hash_item;
1733 hash_item = _session_hash->HashGetFirst();
1734 }
1735
1736 delete _session_hash;
1737 _session_hash = NULL;
1738 }
1739
1740 sk_buffer_mng_destroy(&_tcp_pool);
1741 sk_buffer_mng_destroy(&_udp_pool);
1742 }
1743
RecycleObjs(uint64_t now)1744 void CNetMgr::RecycleObjs(uint64_t now)
1745 {
1746 uint32_t now_s = (uint32_t)(now / 1000);
1747
1748 recycle_sk_buffer(&_udp_pool, now_s);
1749 recycle_sk_buffer(&_tcp_pool, now_s);
1750
1751 _net_item_pool.RecycleItem(now);
1752 _sock_link_pool.RecycleItem(now);
1753 _dest_ip_pool.RecycleItem(now);
1754 }
1755