xref: /f-stack/app/micro_thread/mt_net.cpp (revision a9643ea8)
1*a9643ea8Slogwang 
2*a9643ea8Slogwang /**
3*a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6*a9643ea8Slogwang  *
7*a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8*a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9*a9643ea8Slogwang  * obtain a copy of the License at
10*a9643ea8Slogwang  *
11*a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12*a9643ea8Slogwang  *
13*a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14*a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15*a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16*a9643ea8Slogwang  * and limitations under the License.
17*a9643ea8Slogwang  */
18*a9643ea8Slogwang 
19*a9643ea8Slogwang 
/**
 *  @file mt_net.cpp
 *  @info Micro-thread network helper implementation (CNetHelper, CNetHandler, CSockLink)
 *  @time 20130924
 **/
25*a9643ea8Slogwang 
26*a9643ea8Slogwang #include <errno.h>
27*a9643ea8Slogwang #include <netinet/tcp.h>
28*a9643ea8Slogwang #include "micro_thread.h"
29*a9643ea8Slogwang #include "mt_sys_hook.h"
30*a9643ea8Slogwang #include "ff_hook.h"
31*a9643ea8Slogwang #include "mt_net.h"
32*a9643ea8Slogwang 
33*a9643ea8Slogwang 
34*a9643ea8Slogwang using namespace std;
35*a9643ea8Slogwang using namespace NS_MICRO_THREAD;
36*a9643ea8Slogwang 
37*a9643ea8Slogwang // �������鹹
38*a9643ea8Slogwang CNetHelper::CNetHelper()
39*a9643ea8Slogwang {
40*a9643ea8Slogwang     handler = (void*)CNetMgr::Instance()->AllocNetItem();
41*a9643ea8Slogwang }
42*a9643ea8Slogwang 
43*a9643ea8Slogwang CNetHelper::~CNetHelper()
44*a9643ea8Slogwang {
45*a9643ea8Slogwang     CNetHandler* net_handler = (CNetHandler*)handler;
46*a9643ea8Slogwang     if (handler != NULL)
47*a9643ea8Slogwang     {
48*a9643ea8Slogwang         net_handler->Reset();
49*a9643ea8Slogwang         CNetMgr::Instance()->FreeNetItem(net_handler);
50*a9643ea8Slogwang         handler = NULL;
51*a9643ea8Slogwang     }
52*a9643ea8Slogwang }
53*a9643ea8Slogwang 
54*a9643ea8Slogwang // ͬ���շ��ӿ�
55*a9643ea8Slogwang int32_t CNetHelper::SendRecv(void* data, uint32_t len, uint32_t timeout)
56*a9643ea8Slogwang {
57*a9643ea8Slogwang     if (handler != NULL) {
58*a9643ea8Slogwang         CNetHandler* net_handler = (CNetHandler*)handler;
59*a9643ea8Slogwang         return net_handler->SendRecv(data, len, timeout);
60*a9643ea8Slogwang     } else {
61*a9643ea8Slogwang         return RC_INVALID_HANDLER;
62*a9643ea8Slogwang     }
63*a9643ea8Slogwang }
64*a9643ea8Slogwang 
65*a9643ea8Slogwang // ��ȡ����buff��Ϣ, ��Ч��ֱ��helper����
66*a9643ea8Slogwang void* CNetHelper::GetRspBuff()
67*a9643ea8Slogwang {
68*a9643ea8Slogwang     if (handler != NULL) {
69*a9643ea8Slogwang         CNetHandler* net_handler = (CNetHandler*)handler;
70*a9643ea8Slogwang         return net_handler->GetRspBuff();
71*a9643ea8Slogwang     } else {
72*a9643ea8Slogwang         return NULL;
73*a9643ea8Slogwang     }
74*a9643ea8Slogwang }
75*a9643ea8Slogwang 
76*a9643ea8Slogwang // ��ȡ����buff��Ϣ, ��Ч��ֱ��helper����
77*a9643ea8Slogwang uint32_t CNetHelper::GetRspLen()
78*a9643ea8Slogwang {
79*a9643ea8Slogwang     if (handler != NULL) {
80*a9643ea8Slogwang         CNetHandler* net_handler = (CNetHandler*)handler;
81*a9643ea8Slogwang         return net_handler->GetRspLen();
82*a9643ea8Slogwang     } else {
83*a9643ea8Slogwang         return 0;
84*a9643ea8Slogwang     }
85*a9643ea8Slogwang }
86*a9643ea8Slogwang 
87*a9643ea8Slogwang 
88*a9643ea8Slogwang // ת����������Ϣ, �����ȡ
89*a9643ea8Slogwang char* CNetHelper::GetErrMsg(int32_t result)
90*a9643ea8Slogwang {
91*a9643ea8Slogwang     static const char* errmsg = "unknown error type";
92*a9643ea8Slogwang 
93*a9643ea8Slogwang     switch (result)
94*a9643ea8Slogwang     {
95*a9643ea8Slogwang         case RC_SUCCESS:
96*a9643ea8Slogwang             errmsg = "success";
97*a9643ea8Slogwang             break;
98*a9643ea8Slogwang 
99*a9643ea8Slogwang         case RC_ERR_SOCKET:
100*a9643ea8Slogwang             errmsg = "create socket failed";
101*a9643ea8Slogwang             break;
102*a9643ea8Slogwang 
103*a9643ea8Slogwang         case RC_SEND_FAIL:
104*a9643ea8Slogwang             errmsg = "send pakeage timeout or failed";
105*a9643ea8Slogwang             break;
106*a9643ea8Slogwang 
107*a9643ea8Slogwang         case RC_RECV_FAIL:
108*a9643ea8Slogwang             errmsg = "recv response timeout or failed";
109*a9643ea8Slogwang             break;
110*a9643ea8Slogwang 
111*a9643ea8Slogwang         case RC_CONNECT_FAIL:
112*a9643ea8Slogwang             errmsg = "connect timeout or failed";
113*a9643ea8Slogwang             break;
114*a9643ea8Slogwang 
115*a9643ea8Slogwang         case RC_CHECK_PKG_FAIL:
116*a9643ea8Slogwang             errmsg = "user package check failed";
117*a9643ea8Slogwang             break;
118*a9643ea8Slogwang 
119*a9643ea8Slogwang         case RC_NO_MORE_BUFF:
120*a9643ea8Slogwang             errmsg = "user response buffer too small";
121*a9643ea8Slogwang             break;
122*a9643ea8Slogwang 
123*a9643ea8Slogwang         case RC_REMOTE_CLOSED:
124*a9643ea8Slogwang             errmsg = "remote close connection";
125*a9643ea8Slogwang             break;
126*a9643ea8Slogwang 
127*a9643ea8Slogwang         case RC_INVALID_PARAM:
128*a9643ea8Slogwang             errmsg = "params invalid";
129*a9643ea8Slogwang             break;
130*a9643ea8Slogwang 
131*a9643ea8Slogwang         case RC_INVALID_HANDLER:
132*a9643ea8Slogwang             errmsg = "net handler invalid";
133*a9643ea8Slogwang             break;
134*a9643ea8Slogwang 
135*a9643ea8Slogwang         case RC_MEM_ERROR:
136*a9643ea8Slogwang             errmsg = "no more memory, alloc failed";
137*a9643ea8Slogwang             break;
138*a9643ea8Slogwang 
139*a9643ea8Slogwang         case RC_CONFLICT_SID:
140*a9643ea8Slogwang             errmsg = "session id with the dest address conflict";
141*a9643ea8Slogwang             break;
142*a9643ea8Slogwang 
143*a9643ea8Slogwang         case RC_KQUEUE_ERROR:
144*a9643ea8Slogwang             errmsg = "epoll system error";
145*a9643ea8Slogwang             break;
146*a9643ea8Slogwang 
147*a9643ea8Slogwang         default:
148*a9643ea8Slogwang             break;
149*a9643ea8Slogwang     }
150*a9643ea8Slogwang 
151*a9643ea8Slogwang     return (char*)errmsg;
152*a9643ea8Slogwang }
153*a9643ea8Slogwang 
154*a9643ea8Slogwang // ����Э�������, Ĭ��UDP
155*a9643ea8Slogwang void CNetHelper::SetProtoType(MT_PROTO_TYPE type)
156*a9643ea8Slogwang {
157*a9643ea8Slogwang     if (handler != NULL) {
158*a9643ea8Slogwang         CNetHandler* net_handler = (CNetHandler*)handler;
159*a9643ea8Slogwang         return net_handler->SetProtoType(type);
160*a9643ea8Slogwang     }
161*a9643ea8Slogwang }
162*a9643ea8Slogwang 
163*a9643ea8Slogwang // ����Ŀ��IP��ַ
164*a9643ea8Slogwang void CNetHelper::SetDestAddress(struct sockaddr_in* dst)
165*a9643ea8Slogwang {
166*a9643ea8Slogwang     if (handler != NULL) {
167*a9643ea8Slogwang         CNetHandler* net_handler = (CNetHandler*)handler;
168*a9643ea8Slogwang         return net_handler->SetDestAddress(dst);
169*a9643ea8Slogwang     }
170*a9643ea8Slogwang }
171*a9643ea8Slogwang 
172*a9643ea8Slogwang // ����session����session id��Ϣ, �����0
173*a9643ea8Slogwang void CNetHelper::SetSessionId(uint64_t sid)
174*a9643ea8Slogwang {
175*a9643ea8Slogwang     if (handler != NULL) {
176*a9643ea8Slogwang         CNetHandler* net_handler = (CNetHandler*)handler;
177*a9643ea8Slogwang         return net_handler->SetSessionId(sid);
178*a9643ea8Slogwang     }
179*a9643ea8Slogwang }
180*a9643ea8Slogwang 
181*a9643ea8Slogwang // ����session�����ص�����
182*a9643ea8Slogwang void CNetHelper::SetSessionCallback(CHECK_SESSION_CALLBACK function)
183*a9643ea8Slogwang {
184*a9643ea8Slogwang     if (handler != NULL) {
185*a9643ea8Slogwang         CNetHandler* net_handler = (CNetHandler*)handler;
186*a9643ea8Slogwang         return net_handler->SetSessionCallback(function);
187*a9643ea8Slogwang     }
188*a9643ea8Slogwang }
189*a9643ea8Slogwang 
190*a9643ea8Slogwang 
191*a9643ea8Slogwang 
192*a9643ea8Slogwang // ����һ��net handler, ���ڸ���
193*a9643ea8Slogwang void CNetHandler::Reset()
194*a9643ea8Slogwang {
195*a9643ea8Slogwang     // ȥ��session��connection�������
196*a9643ea8Slogwang     this->Unlink();
197*a9643ea8Slogwang     this->UnRegistSession();
198*a9643ea8Slogwang 
199*a9643ea8Slogwang     // ��̬�ڴ�����
200*a9643ea8Slogwang     if (_rsp_buff != NULL) {
201*a9643ea8Slogwang         delete_sk_buffer(_rsp_buff);
202*a9643ea8Slogwang         _rsp_buff               = NULL;
203*a9643ea8Slogwang     }
204*a9643ea8Slogwang 
205*a9643ea8Slogwang     // �ֶγ�ʼ������
206*a9643ea8Slogwang     _thread                     = NULL;
207*a9643ea8Slogwang     _proto_type                 = NET_PROTO_TCP;
208*a9643ea8Slogwang     _conn_type                  = TYPE_CONN_SESSION;
209*a9643ea8Slogwang     _dest_ipv4.sin_addr.s_addr  = 0;
210*a9643ea8Slogwang     _dest_ipv4.sin_port         = 0;
211*a9643ea8Slogwang     _session_id                 = 0;
212*a9643ea8Slogwang     _callback                   = NULL;
213*a9643ea8Slogwang     _err_no                     = 0;
214*a9643ea8Slogwang     _state_flags                = 0;
215*a9643ea8Slogwang     _conn_ptr                   = NULL;
216*a9643ea8Slogwang     _send_pos                   = 0;
217*a9643ea8Slogwang     _req_len                    = 0;
218*a9643ea8Slogwang     _req_data                   = NULL;
219*a9643ea8Slogwang 
220*a9643ea8Slogwang }
221*a9643ea8Slogwang 
222*a9643ea8Slogwang // ���캯��
223*a9643ea8Slogwang CNetHandler::CNetHandler()
224*a9643ea8Slogwang {
225*a9643ea8Slogwang     _state_flags = 0;
226*a9643ea8Slogwang     _rsp_buff = NULL;
227*a9643ea8Slogwang 
228*a9643ea8Slogwang     this->Reset();
229*a9643ea8Slogwang }
230*a9643ea8Slogwang 
231*a9643ea8Slogwang // ��������
232*a9643ea8Slogwang CNetHandler::~CNetHandler()
233*a9643ea8Slogwang {
234*a9643ea8Slogwang     this->Reset();
235*a9643ea8Slogwang }
236*a9643ea8Slogwang 
237*a9643ea8Slogwang // ����Ҫ�IJ�����Ϣ
238*a9643ea8Slogwang int32_t CNetHandler::CheckParams()
239*a9643ea8Slogwang {
240*a9643ea8Slogwang     // 1. ��������Ч���
241*a9643ea8Slogwang     if ((NULL == _req_data) || (_req_len == 0))
242*a9643ea8Slogwang     {
243*a9643ea8Slogwang         MTLOG_ERROR("param invalid, data[%p], len[%u]", _req_data, _req_len);
244*a9643ea8Slogwang         return RC_INVALID_PARAM;
245*a9643ea8Slogwang     }
246*a9643ea8Slogwang 
247*a9643ea8Slogwang     // 2. Ŀ�ĵ�ַ��Ч���
248*a9643ea8Slogwang     if ((_dest_ipv4.sin_addr.s_addr == 0) || (_dest_ipv4.sin_port == 0))
249*a9643ea8Slogwang     {
250*a9643ea8Slogwang         MTLOG_ERROR("param invalid, ip[%u], port[%u]", _dest_ipv4.sin_addr.s_addr,
251*a9643ea8Slogwang              _dest_ipv4.sin_port);
252*a9643ea8Slogwang         return RC_INVALID_PARAM;
253*a9643ea8Slogwang     }
254*a9643ea8Slogwang 
255*a9643ea8Slogwang     // 3. session ���ͼ����ȷ��
256*a9643ea8Slogwang     if (_conn_type == TYPE_CONN_SESSION)
257*a9643ea8Slogwang     {
258*a9643ea8Slogwang         if ((_callback == NULL) || (_session_id == 0))
259*a9643ea8Slogwang         {
260*a9643ea8Slogwang             MTLOG_ERROR("param invalid, callback[%p], session_id[%llu]", _callback, _session_id);
261*a9643ea8Slogwang             return RC_INVALID_PARAM;
262*a9643ea8Slogwang         }
263*a9643ea8Slogwang 
264*a9643ea8Slogwang         // �����ע��session��Ϣ
265*a9643ea8Slogwang         if (!this->RegistSession())
266*a9643ea8Slogwang         {
267*a9643ea8Slogwang             MTLOG_ERROR("param invalid, session_id[%llu] regist failed", _session_id);
268*a9643ea8Slogwang             return RC_CONFLICT_SID;
269*a9643ea8Slogwang         }
270*a9643ea8Slogwang     }
271*a9643ea8Slogwang 
272*a9643ea8Slogwang     // 4. ����ִ��
273*a9643ea8Slogwang     return 0;
274*a9643ea8Slogwang }
275*a9643ea8Slogwang 
276*a9643ea8Slogwang // ��ȡ����, ͬʱ�������ȴ����ӵĶ�����
277*a9643ea8Slogwang int32_t CNetHandler::GetConnLink()
278*a9643ea8Slogwang {
279*a9643ea8Slogwang     CDestLinks key;
280*a9643ea8Slogwang     key.SetKeyInfo(_dest_ipv4.sin_addr.s_addr, _dest_ipv4.sin_port, _proto_type, _conn_type);
281*a9643ea8Slogwang 
282*a9643ea8Slogwang     CDestLinks* dest_link = CNetMgr::Instance()->FindCreateDest(&key);
283*a9643ea8Slogwang     if (NULL == dest_link)
284*a9643ea8Slogwang     {
285*a9643ea8Slogwang         MTLOG_ERROR("get dest link handle failed");
286*a9643ea8Slogwang         return RC_MEM_ERROR;
287*a9643ea8Slogwang     }
288*a9643ea8Slogwang 
289*a9643ea8Slogwang     CSockLink* sock_link = dest_link->GetSockLink();
290*a9643ea8Slogwang     if (NULL == sock_link)
291*a9643ea8Slogwang     {
292*a9643ea8Slogwang         MTLOG_ERROR("get sock link handle failed");
293*a9643ea8Slogwang         return RC_MEM_ERROR;
294*a9643ea8Slogwang     }
295*a9643ea8Slogwang 
296*a9643ea8Slogwang     this->Link(sock_link);
297*a9643ea8Slogwang 
298*a9643ea8Slogwang     return 0;
299*a9643ea8Slogwang }
300*a9643ea8Slogwang 
301*a9643ea8Slogwang 
302*a9643ea8Slogwang // ����Ҫ�IJ�����Ϣ
303*a9643ea8Slogwang int32_t CNetHandler::WaitConnect(uint64_t timeout)
304*a9643ea8Slogwang {
305*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
306*a9643ea8Slogwang     if (NULL == conn)
307*a9643ea8Slogwang     {
308*a9643ea8Slogwang         MTLOG_ERROR("get sock link handle failed");
309*a9643ea8Slogwang         return RC_MEM_ERROR;
310*a9643ea8Slogwang     }
311*a9643ea8Slogwang 
312*a9643ea8Slogwang     int32_t fd = conn->CreateSock();
313*a9643ea8Slogwang     if (fd < 0)
314*a9643ea8Slogwang     {
315*a9643ea8Slogwang         MTLOG_ERROR("create sock failed, ret %d[%m]", fd);
316*a9643ea8Slogwang         return RC_ERR_SOCKET;
317*a9643ea8Slogwang     }
318*a9643ea8Slogwang 
319*a9643ea8Slogwang     if (conn->Connect())
320*a9643ea8Slogwang     {
321*a9643ea8Slogwang         MTLOG_DEBUG("sock conncet ok");
322*a9643ea8Slogwang         return RC_SUCCESS;
323*a9643ea8Slogwang     }
324*a9643ea8Slogwang 
325*a9643ea8Slogwang     // ����ȴ�connect����
326*a9643ea8Slogwang     this->SwitchToConn();
327*a9643ea8Slogwang 
328*a9643ea8Slogwang     // �ȴ�������
329*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
330*a9643ea8Slogwang     mtframe->WaitNotify(timeout);
331*a9643ea8Slogwang 
332*a9643ea8Slogwang     // ɾ����connect����, ��ʱ��Ҫ�����, ��������
333*a9643ea8Slogwang     this->SwitchToIdle();
334*a9643ea8Slogwang 
335*a9643ea8Slogwang     if (_err_no != 0)
336*a9643ea8Slogwang     {
337*a9643ea8Slogwang         MTLOG_ERROR("connect get out errno %d", _err_no);
338*a9643ea8Slogwang         return _err_no;
339*a9643ea8Slogwang     }
340*a9643ea8Slogwang 
341*a9643ea8Slogwang     // ��ʱ��������ȷ��
342*a9643ea8Slogwang     if (conn->Connected())
343*a9643ea8Slogwang     {
344*a9643ea8Slogwang         MTLOG_DEBUG("connect ok");
345*a9643ea8Slogwang         return 0;
346*a9643ea8Slogwang     }
347*a9643ea8Slogwang     else
348*a9643ea8Slogwang     {
349*a9643ea8Slogwang         MTLOG_TRACE("connect not ok, maybe timeout");
350*a9643ea8Slogwang         return RC_CONNECT_FAIL;
351*a9643ea8Slogwang     }
352*a9643ea8Slogwang }
353*a9643ea8Slogwang 
354*a9643ea8Slogwang 
355*a9643ea8Slogwang // ����Ҫ�IJ�����Ϣ
356*a9643ea8Slogwang int32_t CNetHandler::WaitSend(uint64_t timeout)
357*a9643ea8Slogwang {
358*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
359*a9643ea8Slogwang     if (NULL == conn)
360*a9643ea8Slogwang     {
361*a9643ea8Slogwang         MTLOG_ERROR("get sock link handle failed");
362*a9643ea8Slogwang         return RC_MEM_ERROR;
363*a9643ea8Slogwang     }
364*a9643ea8Slogwang 
365*a9643ea8Slogwang     int32_t ret = conn->SendData(_req_data, _req_len);
366*a9643ea8Slogwang     if (ret < 0)
367*a9643ea8Slogwang     {
368*a9643ea8Slogwang         MTLOG_ERROR("sock send failed, ret %d[%m]", ret);
369*a9643ea8Slogwang         return RC_SEND_FAIL;
370*a9643ea8Slogwang     }
371*a9643ea8Slogwang     this->SkipSendPos(ret);
372*a9643ea8Slogwang 
373*a9643ea8Slogwang     if (_req_len == 0)
374*a9643ea8Slogwang     {
375*a9643ea8Slogwang         MTLOG_DEBUG("sock send ok");
376*a9643ea8Slogwang         return RC_SUCCESS;
377*a9643ea8Slogwang     }
378*a9643ea8Slogwang 
379*a9643ea8Slogwang     // �ȴ���������, �л��ѷ��͵���Ϣ
380*a9643ea8Slogwang     this->SwitchToSend();
381*a9643ea8Slogwang 
382*a9643ea8Slogwang     // �ȴ�������
383*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
384*a9643ea8Slogwang     mtframe->WaitNotify(timeout);
385*a9643ea8Slogwang 
386*a9643ea8Slogwang     // ɾ����connect����
387*a9643ea8Slogwang     this->SwitchToIdle();
388*a9643ea8Slogwang 
389*a9643ea8Slogwang     // �쳣���
390*a9643ea8Slogwang     if (_err_no != 0)
391*a9643ea8Slogwang     {
392*a9643ea8Slogwang         MTLOG_ERROR("send get out errno %d", _err_no);
393*a9643ea8Slogwang         return _err_no;
394*a9643ea8Slogwang     }
395*a9643ea8Slogwang 
396*a9643ea8Slogwang     // ��ʱ���
397*a9643ea8Slogwang     if (_req_len == 0)
398*a9643ea8Slogwang     {
399*a9643ea8Slogwang         MTLOG_DEBUG("send req ok, len %u", _send_pos);
400*a9643ea8Slogwang         return 0;
401*a9643ea8Slogwang     }
402*a9643ea8Slogwang     else
403*a9643ea8Slogwang     {
404*a9643ea8Slogwang         MTLOG_TRACE("send req not ok, left len %u", _req_len);
405*a9643ea8Slogwang         return RC_SEND_FAIL;
406*a9643ea8Slogwang     }
407*a9643ea8Slogwang }
408*a9643ea8Slogwang 
409*a9643ea8Slogwang // ����Ҫ�IJ�����Ϣ
410*a9643ea8Slogwang int32_t CNetHandler::WaitRecv(uint64_t timeout)
411*a9643ea8Slogwang {
412*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
413*a9643ea8Slogwang     if (NULL == conn)
414*a9643ea8Slogwang     {
415*a9643ea8Slogwang         MTLOG_ERROR("get sock link handle failed");
416*a9643ea8Slogwang         return RC_MEM_ERROR;
417*a9643ea8Slogwang     }
418*a9643ea8Slogwang 
419*a9643ea8Slogwang     if (_conn_type == TYPE_CONN_SENDONLY)
420*a9643ea8Slogwang     {
421*a9643ea8Slogwang         MTLOG_DEBUG("only send, without recv");
422*a9643ea8Slogwang         return 0;
423*a9643ea8Slogwang     }
424*a9643ea8Slogwang 
425*a9643ea8Slogwang     // �л����ȴ�����
426*a9643ea8Slogwang     this->SwitchToRecv();
427*a9643ea8Slogwang 
428*a9643ea8Slogwang     // �ȴ�������
429*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
430*a9643ea8Slogwang     mtframe->WaitNotify(timeout);
431*a9643ea8Slogwang 
432*a9643ea8Slogwang     // ɾ����connect����
433*a9643ea8Slogwang     this->SwitchToIdle();
434*a9643ea8Slogwang 
435*a9643ea8Slogwang     // ��ʱ���
436*a9643ea8Slogwang     if ((_rsp_buff != NULL) && (_rsp_buff->data_len > 0))
437*a9643ea8Slogwang     {
438*a9643ea8Slogwang         MTLOG_DEBUG("recv get rsp, len %d", _rsp_buff->data_len);
439*a9643ea8Slogwang         return 0;
440*a9643ea8Slogwang     }
441*a9643ea8Slogwang     else
442*a9643ea8Slogwang     {
443*a9643ea8Slogwang         MTLOG_TRACE("recv get out errno %d", _err_no);
444*a9643ea8Slogwang         return RC_RECV_FAIL;
445*a9643ea8Slogwang     }
446*a9643ea8Slogwang }
447*a9643ea8Slogwang 
448*a9643ea8Slogwang 
449*a9643ea8Slogwang // ͬ���շ��ӿ�, �޸�Ϊsessionר��
450*a9643ea8Slogwang int32_t CNetHandler::SendRecv(void* data, uint32_t len, uint32_t timeout)
451*a9643ea8Slogwang {
452*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
453*a9643ea8Slogwang     utime64_t cost_time = 0;
454*a9643ea8Slogwang     uint64_t time_left = timeout;
455*a9643ea8Slogwang     this->_req_data = data;
456*a9643ea8Slogwang     this->_req_len  = len;
457*a9643ea8Slogwang 
458*a9643ea8Slogwang     // 0. ����Ҫ��������Ϣ
459*a9643ea8Slogwang     int32_t ret = this->CheckParams();
460*a9643ea8Slogwang     if (ret < 0)
461*a9643ea8Slogwang     {
462*a9643ea8Slogwang         MTLOG_ERROR("check params failed, ret[%d]", ret);
463*a9643ea8Slogwang         goto EXIT_LABEL;
464*a9643ea8Slogwang     }
465*a9643ea8Slogwang 
466*a9643ea8Slogwang     // 1. ��ȡ������·ָ��
467*a9643ea8Slogwang     ret = this->GetConnLink();
468*a9643ea8Slogwang     if (ret < 0)
469*a9643ea8Slogwang     {
470*a9643ea8Slogwang         MTLOG_ERROR("get sock conn failed, ret: %d", ret);
471*a9643ea8Slogwang         goto EXIT_LABEL;
472*a9643ea8Slogwang     }
473*a9643ea8Slogwang 
474*a9643ea8Slogwang     // 2. �ȴ����ӳɹ�
475*a9643ea8Slogwang     ret = this->WaitConnect(time_left);
476*a9643ea8Slogwang     if (ret < 0)
477*a9643ea8Slogwang     {
478*a9643ea8Slogwang         MTLOG_ERROR("sock connect failed, ret: %d", ret);
479*a9643ea8Slogwang         goto EXIT_LABEL;
480*a9643ea8Slogwang     }
481*a9643ea8Slogwang 
482*a9643ea8Slogwang     // 3. �ȴ����ͳɹ�
483*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
484*a9643ea8Slogwang     time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
485*a9643ea8Slogwang     ret = this->WaitSend(time_left);
486*a9643ea8Slogwang     if (ret < 0)
487*a9643ea8Slogwang     {
488*a9643ea8Slogwang         MTLOG_ERROR("sock send failed, ret: %d", ret);
489*a9643ea8Slogwang         goto EXIT_LABEL;
490*a9643ea8Slogwang     }
491*a9643ea8Slogwang 
492*a9643ea8Slogwang     // 4. �ȴ����ճɹ�
493*a9643ea8Slogwang     cost_time = MtFrame::Instance()->GetLastClock() - start_ms;
494*a9643ea8Slogwang     time_left = (timeout > (uint32_t)cost_time) ? (timeout - (uint32_t)cost_time) : 0;
495*a9643ea8Slogwang     ret = this->WaitRecv(time_left);
496*a9643ea8Slogwang     if (ret < 0)
497*a9643ea8Slogwang     {
498*a9643ea8Slogwang         MTLOG_ERROR("sock recv failed, ret: %d", ret);
499*a9643ea8Slogwang         goto EXIT_LABEL;
500*a9643ea8Slogwang     }
501*a9643ea8Slogwang 
502*a9643ea8Slogwang     // 5. �ɹ�����
503*a9643ea8Slogwang     ret = 0;
504*a9643ea8Slogwang 
505*a9643ea8Slogwang EXIT_LABEL:
506*a9643ea8Slogwang 
507*a9643ea8Slogwang     // ���η������, ��ҪUNLINK, ����NETHANDLER֧��
508*a9643ea8Slogwang     this->Unlink();
509*a9643ea8Slogwang 
510*a9643ea8Slogwang     // �ɹ�ʧ��, ��Ҫȥ��sessionע��
511*a9643ea8Slogwang     this->UnRegistSession();
512*a9643ea8Slogwang 
513*a9643ea8Slogwang     return ret;
514*a9643ea8Slogwang }
515*a9643ea8Slogwang 
516*a9643ea8Slogwang 
517*a9643ea8Slogwang // �������͵�������
518*a9643ea8Slogwang uint32_t CNetHandler::SkipSendPos(uint32_t len)
519*a9643ea8Slogwang {
520*a9643ea8Slogwang     uint32_t skip_len = (len >= _req_len) ? _req_len : len;
521*a9643ea8Slogwang     _req_len -= skip_len;
522*a9643ea8Slogwang     _send_pos += skip_len;
523*a9643ea8Slogwang     _req_data = (char*)_req_data + skip_len;
524*a9643ea8Slogwang 
525*a9643ea8Slogwang     return skip_len;
526*a9643ea8Slogwang }
527*a9643ea8Slogwang 
528*a9643ea8Slogwang // �������Ӷ���
529*a9643ea8Slogwang void CNetHandler::Link(CSockLink* conn)
530*a9643ea8Slogwang {
531*a9643ea8Slogwang     this->_conn_ptr = conn;
532*a9643ea8Slogwang     this->SwitchToIdle();
533*a9643ea8Slogwang }
534*a9643ea8Slogwang 
535*a9643ea8Slogwang 
536*a9643ea8Slogwang // �������Ӷ���
537*a9643ea8Slogwang void CNetHandler::Unlink()
538*a9643ea8Slogwang {
539*a9643ea8Slogwang     if (this->_state_flags != 0)
540*a9643ea8Slogwang     {
541*a9643ea8Slogwang         this->DetachConn();
542*a9643ea8Slogwang     }
543*a9643ea8Slogwang     this->_conn_ptr = NULL;
544*a9643ea8Slogwang }
545*a9643ea8Slogwang 
546*a9643ea8Slogwang // �����ڵȴ����Ӷ���
547*a9643ea8Slogwang void CNetHandler::SwitchToConn()
548*a9643ea8Slogwang {
549*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
550*a9643ea8Slogwang     if (NULL == conn)
551*a9643ea8Slogwang     {
552*a9643ea8Slogwang         MTLOG_ERROR("net handler invalid");
553*a9643ea8Slogwang         return;
554*a9643ea8Slogwang     }
555*a9643ea8Slogwang 
556*a9643ea8Slogwang     this->DetachConn();
557*a9643ea8Slogwang 
558*a9643ea8Slogwang     this->_state_flags |= STATE_IN_CONNECT;
559*a9643ea8Slogwang     conn->AppendToList(CSockLink::LINK_CONN_LIST, this);
560*a9643ea8Slogwang }
561*a9643ea8Slogwang 
562*a9643ea8Slogwang // �л������Ͷ���
563*a9643ea8Slogwang void CNetHandler::SwitchToSend()
564*a9643ea8Slogwang {
565*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
566*a9643ea8Slogwang     if (NULL == conn)
567*a9643ea8Slogwang     {
568*a9643ea8Slogwang         MTLOG_ERROR("net handler invalid");
569*a9643ea8Slogwang         return;
570*a9643ea8Slogwang     }
571*a9643ea8Slogwang 
572*a9643ea8Slogwang     this->DetachConn();
573*a9643ea8Slogwang 
574*a9643ea8Slogwang     this->_state_flags |= STATE_IN_SEND;
575*a9643ea8Slogwang     conn->AppendToList(CSockLink::LINK_SEND_LIST, this);
576*a9643ea8Slogwang }
577*a9643ea8Slogwang 
578*a9643ea8Slogwang 
579*a9643ea8Slogwang // �л������ն���
580*a9643ea8Slogwang void CNetHandler::SwitchToRecv()
581*a9643ea8Slogwang {
582*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
583*a9643ea8Slogwang     if (NULL == conn)
584*a9643ea8Slogwang     {
585*a9643ea8Slogwang         MTLOG_ERROR("net handler invalid");
586*a9643ea8Slogwang         return;
587*a9643ea8Slogwang     }
588*a9643ea8Slogwang 
589*a9643ea8Slogwang     this->DetachConn();
590*a9643ea8Slogwang 
591*a9643ea8Slogwang     this->_state_flags |= STATE_IN_RECV;
592*a9643ea8Slogwang     conn->AppendToList(CSockLink::LINK_RECV_LIST, this);
593*a9643ea8Slogwang }
594*a9643ea8Slogwang 
595*a9643ea8Slogwang // �л�������״̬
596*a9643ea8Slogwang void CNetHandler::SwitchToIdle()
597*a9643ea8Slogwang {
598*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
599*a9643ea8Slogwang     if (NULL == conn)
600*a9643ea8Slogwang     {
601*a9643ea8Slogwang         MTLOG_ERROR("net handler invalid");
602*a9643ea8Slogwang         return;
603*a9643ea8Slogwang     }
604*a9643ea8Slogwang 
605*a9643ea8Slogwang     this->DetachConn();
606*a9643ea8Slogwang 
607*a9643ea8Slogwang     this->_state_flags |= STATE_IN_IDLE;
608*a9643ea8Slogwang     conn->AppendToList(CSockLink::LINK_IDLE_LIST, this);
609*a9643ea8Slogwang }
610*a9643ea8Slogwang 
611*a9643ea8Slogwang 
612*a9643ea8Slogwang // ��������״̬����
613*a9643ea8Slogwang void CNetHandler::DetachConn()
614*a9643ea8Slogwang {
615*a9643ea8Slogwang     CSockLink* conn = (CSockLink*)this->_conn_ptr;
616*a9643ea8Slogwang     if (NULL == conn)
617*a9643ea8Slogwang     {
618*a9643ea8Slogwang         MTLOG_DEBUG("net handler not set");
619*a9643ea8Slogwang         return;
620*a9643ea8Slogwang     }
621*a9643ea8Slogwang 
622*a9643ea8Slogwang     if (_state_flags == 0)  // ��ʱ���¼�����, 2����֧, ����Ҫ������
623*a9643ea8Slogwang     {
624*a9643ea8Slogwang         return;
625*a9643ea8Slogwang     }
626*a9643ea8Slogwang 
627*a9643ea8Slogwang     if (_state_flags & STATE_IN_CONNECT)
628*a9643ea8Slogwang     {
629*a9643ea8Slogwang         conn->RemoveFromList(CSockLink::LINK_CONN_LIST, this);
630*a9643ea8Slogwang         _state_flags &= ~STATE_IN_CONNECT;
631*a9643ea8Slogwang     }
632*a9643ea8Slogwang 
633*a9643ea8Slogwang     if (_state_flags & STATE_IN_SEND)
634*a9643ea8Slogwang     {
635*a9643ea8Slogwang         conn->RemoveFromList(CSockLink::LINK_SEND_LIST, this);
636*a9643ea8Slogwang         _state_flags &= ~STATE_IN_SEND;
637*a9643ea8Slogwang     }
638*a9643ea8Slogwang 
639*a9643ea8Slogwang     if (_state_flags & STATE_IN_RECV)
640*a9643ea8Slogwang     {
641*a9643ea8Slogwang         conn->RemoveFromList(CSockLink::LINK_RECV_LIST, this);
642*a9643ea8Slogwang         _state_flags &= ~STATE_IN_RECV;
643*a9643ea8Slogwang     }
644*a9643ea8Slogwang 
645*a9643ea8Slogwang     if (_state_flags & STATE_IN_IDLE)
646*a9643ea8Slogwang     {
647*a9643ea8Slogwang         conn->RemoveFromList(CSockLink::LINK_IDLE_LIST, this);
648*a9643ea8Slogwang         _state_flags &= ~STATE_IN_IDLE;
649*a9643ea8Slogwang     }
650*a9643ea8Slogwang }
651*a9643ea8Slogwang 
652*a9643ea8Slogwang 
653*a9643ea8Slogwang 
654*a9643ea8Slogwang 
655*a9643ea8Slogwang /**
656*a9643ea8Slogwang  *  @brief �ڵ�Ԫ�ص�hash�㷨, ��ȡkey��hashֵ
657*a9643ea8Slogwang  *  @return �ڵ�Ԫ�ص�hashֵ
658*a9643ea8Slogwang  */
659*a9643ea8Slogwang uint32_t CNetHandler::HashValue()
660*a9643ea8Slogwang {
661*a9643ea8Slogwang     uint32_t ip = _dest_ipv4.sin_addr.s_addr;
662*a9643ea8Slogwang     ip ^= (_dest_ipv4.sin_port << 16) | (_proto_type << 8) | (_conn_type << 8);
663*a9643ea8Slogwang 
664*a9643ea8Slogwang     uint32_t hash = (_session_id >> 32) & 0xffffffff;
665*a9643ea8Slogwang     hash ^= _session_id  & 0xffffffff;
666*a9643ea8Slogwang     hash ^= ip;
667*a9643ea8Slogwang 
668*a9643ea8Slogwang     return hash;
669*a9643ea8Slogwang }
670*a9643ea8Slogwang 
671*a9643ea8Slogwang /**
672*a9643ea8Slogwang  *  @brief �ڵ�Ԫ�ص�cmp����, ͬһͰID��, ��key�Ƚ�
673*a9643ea8Slogwang  *  @return �ڵ�Ԫ�ص�hashֵ
674*a9643ea8Slogwang  */
675*a9643ea8Slogwang int32_t CNetHandler::HashCmp(HashKey* rhs)
676*a9643ea8Slogwang {
677*a9643ea8Slogwang     CNetHandler* data = (CNetHandler*)(rhs);
678*a9643ea8Slogwang     if (!data) {
679*a9643ea8Slogwang         return -1;
680*a9643ea8Slogwang     }
681*a9643ea8Slogwang     if (this->_session_id != data->_session_id)
682*a9643ea8Slogwang     {
683*a9643ea8Slogwang         return (this->_session_id > data->_session_id) ? 1 : -1;
684*a9643ea8Slogwang     }
685*a9643ea8Slogwang 
686*a9643ea8Slogwang     if (this->_dest_ipv4.sin_addr.s_addr != data->_dest_ipv4.sin_addr.s_addr) {
687*a9643ea8Slogwang         return (this->_dest_ipv4.sin_addr.s_addr > data->_dest_ipv4.sin_addr.s_addr) ? 1 : -1;
688*a9643ea8Slogwang     }
689*a9643ea8Slogwang     if (this->_dest_ipv4.sin_port != data->_dest_ipv4.sin_port) {
690*a9643ea8Slogwang         return (this->_dest_ipv4.sin_port > data->_dest_ipv4.sin_port) ? 1 : -1;
691*a9643ea8Slogwang     }
692*a9643ea8Slogwang     if (this->_proto_type != data->_proto_type) {
693*a9643ea8Slogwang         return (this->_proto_type > data->_proto_type) ? 1 : -1;
694*a9643ea8Slogwang     }
695*a9643ea8Slogwang     if (this->_conn_type != data->_conn_type) {
696*a9643ea8Slogwang         return (this->_conn_type > data->_conn_type) ? 1 : -1;
697*a9643ea8Slogwang     }
698*a9643ea8Slogwang 
699*a9643ea8Slogwang     return 0;
700*a9643ea8Slogwang };
701*a9643ea8Slogwang 
702*a9643ea8Slogwang 
703*a9643ea8Slogwang // ע��session����
704*a9643ea8Slogwang bool CNetHandler::RegistSession()
705*a9643ea8Slogwang {
706*a9643ea8Slogwang     if (CNetMgr::Instance()->FindNetItem(this) != NULL)
707*a9643ea8Slogwang     {
708*a9643ea8Slogwang         return false;
709*a9643ea8Slogwang     }
710*a9643ea8Slogwang 
711*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
712*a9643ea8Slogwang     this->_thread = mtframe->GetActiveThread();
713*a9643ea8Slogwang 
714*a9643ea8Slogwang     CNetMgr::Instance()->InsertNetItem(this);
715*a9643ea8Slogwang     this->_state_flags |= STATE_IN_SESSION;
716*a9643ea8Slogwang     return true;
717*a9643ea8Slogwang }
718*a9643ea8Slogwang 
719*a9643ea8Slogwang // ȡ��ע��session
720*a9643ea8Slogwang void CNetHandler::UnRegistSession()
721*a9643ea8Slogwang {
722*a9643ea8Slogwang     if (this->_state_flags & STATE_IN_SESSION)
723*a9643ea8Slogwang     {
724*a9643ea8Slogwang         CNetMgr::Instance()->RemoveNetItem(this);
725*a9643ea8Slogwang         this->_state_flags &= ~STATE_IN_SESSION;
726*a9643ea8Slogwang     }
727*a9643ea8Slogwang }
728*a9643ea8Slogwang 
729*a9643ea8Slogwang 
730*a9643ea8Slogwang // ��ȡ��������
731*a9643ea8Slogwang TNetItemList* CSockLink::GetItemList(int32_t type)
732*a9643ea8Slogwang {
733*a9643ea8Slogwang     TNetItemList* list = NULL;
734*a9643ea8Slogwang     switch (type)
735*a9643ea8Slogwang     {
736*a9643ea8Slogwang         case LINK_IDLE_LIST:
737*a9643ea8Slogwang             list = &this->_idle_list;
738*a9643ea8Slogwang             break;
739*a9643ea8Slogwang 
740*a9643ea8Slogwang         case LINK_CONN_LIST:
741*a9643ea8Slogwang             list = &this->_wait_connect;
742*a9643ea8Slogwang             break;
743*a9643ea8Slogwang 
744*a9643ea8Slogwang         case LINK_SEND_LIST:
745*a9643ea8Slogwang             list = &this->_wait_send;
746*a9643ea8Slogwang             break;
747*a9643ea8Slogwang 
748*a9643ea8Slogwang         case LINK_RECV_LIST:
749*a9643ea8Slogwang             list = &this->_wait_recv;
750*a9643ea8Slogwang             break;
751*a9643ea8Slogwang 
752*a9643ea8Slogwang         default:
753*a9643ea8Slogwang             break;
754*a9643ea8Slogwang     }
755*a9643ea8Slogwang 
756*a9643ea8Slogwang     return list;
757*a9643ea8Slogwang }
758*a9643ea8Slogwang 
759*a9643ea8Slogwang 
760*a9643ea8Slogwang // ��������Ϣ
761*a9643ea8Slogwang void CSockLink::AppendToList(int32_t type, CNetHandler* item)
762*a9643ea8Slogwang {
763*a9643ea8Slogwang     TNetItemList* list = this->GetItemList(type);
764*a9643ea8Slogwang     if (NULL == list)
765*a9643ea8Slogwang     {
766*a9643ea8Slogwang         MTLOG_ERROR("unknown list type: %d", type);
767*a9643ea8Slogwang         return;
768*a9643ea8Slogwang     }
769*a9643ea8Slogwang 
770*a9643ea8Slogwang     TAILQ_INSERT_TAIL(list, item, _link_entry);
771*a9643ea8Slogwang }
772*a9643ea8Slogwang 
773*a9643ea8Slogwang // ��������Ϣ
774*a9643ea8Slogwang void CSockLink::RemoveFromList(int32_t type, CNetHandler* item)
775*a9643ea8Slogwang {
776*a9643ea8Slogwang     TNetItemList* list = this->GetItemList(type);
777*a9643ea8Slogwang     if (NULL == list)
778*a9643ea8Slogwang     {
779*a9643ea8Slogwang         MTLOG_ERROR("unknown list type: %d", type);
780*a9643ea8Slogwang         return;
781*a9643ea8Slogwang     }
782*a9643ea8Slogwang 
783*a9643ea8Slogwang     TAILQ_REMOVE(list, item, _link_entry);
784*a9643ea8Slogwang }
785*a9643ea8Slogwang 
786*a9643ea8Slogwang 
787*a9643ea8Slogwang // ֪ͨ�����߳�
788*a9643ea8Slogwang void CSockLink::NotifyThread(CNetHandler* item, int32_t result)
789*a9643ea8Slogwang {
790*a9643ea8Slogwang     static MtFrame* frame = NULL;
791*a9643ea8Slogwang     if (frame == NULL) {
792*a9643ea8Slogwang         frame = MtFrame::Instance();
793*a9643ea8Slogwang     }
794*a9643ea8Slogwang 
795*a9643ea8Slogwang     // ���÷�������Ϣ
796*a9643ea8Slogwang     if (result != RC_SUCCESS)
797*a9643ea8Slogwang     {
798*a9643ea8Slogwang         item->SetErrNo(result);
799*a9643ea8Slogwang     }
800*a9643ea8Slogwang 
801*a9643ea8Slogwang     // ���ÿ�����
802*a9643ea8Slogwang     MicroThread* thread = item->GetThread();
803*a9643ea8Slogwang     if ((thread != NULL) && (thread->HasFlag(MicroThread::IO_LIST)))
804*a9643ea8Slogwang     {
805*a9643ea8Slogwang         frame->RemoveIoWait(thread);
806*a9643ea8Slogwang         frame->InsertRunable(thread);
807*a9643ea8Slogwang     }
808*a9643ea8Slogwang }
809*a9643ea8Slogwang 
810*a9643ea8Slogwang 
811*a9643ea8Slogwang // ֪ͨ�����߳�
812*a9643ea8Slogwang void CSockLink::NotifyAll(int32_t result)
813*a9643ea8Slogwang {
814*a9643ea8Slogwang     CNetHandler* item = NULL;
815*a9643ea8Slogwang     CNetHandler* tmp = NULL;
816*a9643ea8Slogwang 
817*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
818*a9643ea8Slogwang     {
819*a9643ea8Slogwang         NotifyThread(item, result);
820*a9643ea8Slogwang         item->Unlink();
821*a9643ea8Slogwang     }
822*a9643ea8Slogwang 
823*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
824*a9643ea8Slogwang     {
825*a9643ea8Slogwang         NotifyThread(item, result);
826*a9643ea8Slogwang         item->Unlink();
827*a9643ea8Slogwang     }
828*a9643ea8Slogwang 
829*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_wait_recv, _link_entry, tmp)
830*a9643ea8Slogwang     {
831*a9643ea8Slogwang         NotifyThread(item, result);
832*a9643ea8Slogwang         item->Unlink();
833*a9643ea8Slogwang     }
834*a9643ea8Slogwang 
835*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_idle_list, _link_entry, tmp)
836*a9643ea8Slogwang     {
837*a9643ea8Slogwang         NotifyThread(item, result);
838*a9643ea8Slogwang         item->Unlink();
839*a9643ea8Slogwang     }
840*a9643ea8Slogwang }
841*a9643ea8Slogwang 
842*a9643ea8Slogwang 
843*a9643ea8Slogwang // �����ó�ʼ���߼�
844*a9643ea8Slogwang void CSockLink::Reset()
845*a9643ea8Slogwang {
846*a9643ea8Slogwang     // �ر�fd, ֪ͨ�������߳�
847*a9643ea8Slogwang     this->Close();
848*a9643ea8Slogwang     this->NotifyAll(_errno);
849*a9643ea8Slogwang 
850*a9643ea8Slogwang     // ����cache����
851*a9643ea8Slogwang     rw_cache_destroy(&_recv_cache);
852*a9643ea8Slogwang     if (_rsp_buff != NULL)
853*a9643ea8Slogwang     {
854*a9643ea8Slogwang         delete_sk_buffer(_rsp_buff);
855*a9643ea8Slogwang         _rsp_buff = NULL;
856*a9643ea8Slogwang     }
857*a9643ea8Slogwang 
858*a9643ea8Slogwang     // ����ȴ�����, ���ѵȴ��߳�
859*a9643ea8Slogwang     TAILQ_INIT(&_wait_connect);
860*a9643ea8Slogwang     TAILQ_INIT(&_wait_send);
861*a9643ea8Slogwang     TAILQ_INIT(&_wait_recv);
862*a9643ea8Slogwang     TAILQ_INIT(&_idle_list);
863*a9643ea8Slogwang 
864*a9643ea8Slogwang     _proto_type     = NET_PROTO_TCP;
865*a9643ea8Slogwang     _errno          = 0;
866*a9643ea8Slogwang     _state          = 0;
867*a9643ea8Slogwang     _last_access    = mt_time_ms();
868*a9643ea8Slogwang     _parents        = NULL;
869*a9643ea8Slogwang 
870*a9643ea8Slogwang     // �����������
871*a9643ea8Slogwang     this->KqueuerObj::Reset();
872*a9643ea8Slogwang }
873*a9643ea8Slogwang 
874*a9643ea8Slogwang // ��������������
875*a9643ea8Slogwang CSockLink::CSockLink()
876*a9643ea8Slogwang {
877*a9643ea8Slogwang     rw_cache_init(&_recv_cache, NULL);
878*a9643ea8Slogwang     _rsp_buff       = NULL;
879*a9643ea8Slogwang 
880*a9643ea8Slogwang     TAILQ_INIT(&_wait_connect);
881*a9643ea8Slogwang     TAILQ_INIT(&_wait_send);
882*a9643ea8Slogwang     TAILQ_INIT(&_wait_recv);
883*a9643ea8Slogwang     TAILQ_INIT(&_idle_list);
884*a9643ea8Slogwang 
885*a9643ea8Slogwang     _proto_type     = NET_PROTO_TCP;
886*a9643ea8Slogwang     _errno          = 0;
887*a9643ea8Slogwang     _state          = 0;
888*a9643ea8Slogwang     _last_access    = mt_time_ms();
889*a9643ea8Slogwang     _parents        = NULL;
890*a9643ea8Slogwang }
891*a9643ea8Slogwang 
892*a9643ea8Slogwang // ��������������
893*a9643ea8Slogwang CSockLink::~CSockLink()
894*a9643ea8Slogwang {
895*a9643ea8Slogwang     this->Reset();
896*a9643ea8Slogwang }
897*a9643ea8Slogwang 
898*a9643ea8Slogwang 
899*a9643ea8Slogwang // ����Э������, ����buff�ص�ָ��
900*a9643ea8Slogwang void CSockLink::SetProtoType(MT_PROTO_TYPE type)
901*a9643ea8Slogwang {
902*a9643ea8Slogwang     _proto_type = type;
903*a9643ea8Slogwang     _recv_cache.pool = CNetMgr::Instance()->GetSkBuffMng(type);
904*a9643ea8Slogwang }
905*a9643ea8Slogwang 
906*a9643ea8Slogwang // �ر���·�ľ��
907*a9643ea8Slogwang void CSockLink::Close()
908*a9643ea8Slogwang {
909*a9643ea8Slogwang     // 1. ���δ��ʼ��, ֱ�ӷ���
910*a9643ea8Slogwang     if (_fd < 0)
911*a9643ea8Slogwang     {
912*a9643ea8Slogwang         return;
913*a9643ea8Slogwang     }
914*a9643ea8Slogwang 
915*a9643ea8Slogwang     // 2. ��������ͷ�
916*a9643ea8Slogwang     MtFrame::Instance()->KqueueDelObj(this);
917*a9643ea8Slogwang 
918*a9643ea8Slogwang     // 3. �رվ��
919*a9643ea8Slogwang     close(_fd);
920*a9643ea8Slogwang     _fd = -1;
921*a9643ea8Slogwang }
922*a9643ea8Slogwang 
923*a9643ea8Slogwang 
924*a9643ea8Slogwang // �쳣��ֹ�Ĵ�����
925*a9643ea8Slogwang void CSockLink::Destroy()
926*a9643ea8Slogwang {
927*a9643ea8Slogwang     CDestLinks* dstlink = (CDestLinks*)_parents;
928*a9643ea8Slogwang     if (NULL == dstlink)
929*a9643ea8Slogwang     {
930*a9643ea8Slogwang         MTLOG_ERROR("socket link without parents ptr, maybe wrong");
931*a9643ea8Slogwang         delete this;
932*a9643ea8Slogwang     }
933*a9643ea8Slogwang     else
934*a9643ea8Slogwang     {
935*a9643ea8Slogwang         MTLOG_DEBUG("socket link just free");
936*a9643ea8Slogwang         dstlink->FreeSockLink(this);
937*a9643ea8Slogwang     }
938*a9643ea8Slogwang }
939*a9643ea8Slogwang 
940*a9643ea8Slogwang 
941*a9643ea8Slogwang // ������socket���
942*a9643ea8Slogwang int32_t CSockLink::CreateSock()
943*a9643ea8Slogwang {
944*a9643ea8Slogwang     if (_fd > 0)        // ��������ʱ, ��������������;
945*a9643ea8Slogwang     {
946*a9643ea8Slogwang         return _fd;
947*a9643ea8Slogwang     }
948*a9643ea8Slogwang 
949*a9643ea8Slogwang     // �������
950*a9643ea8Slogwang     if (NET_PROTO_TCP == _proto_type)
951*a9643ea8Slogwang     {
952*a9643ea8Slogwang         _fd = socket(AF_INET, SOCK_STREAM, 0);
953*a9643ea8Slogwang     }
954*a9643ea8Slogwang     else
955*a9643ea8Slogwang     {
956*a9643ea8Slogwang         _fd = socket(AF_INET, SOCK_DGRAM, 0);
957*a9643ea8Slogwang     }
958*a9643ea8Slogwang 
959*a9643ea8Slogwang     if (_fd < 0)
960*a9643ea8Slogwang     {
961*a9643ea8Slogwang         MTLOG_ERROR("create socket failed, ret %d[%m]", _fd);
962*a9643ea8Slogwang         return -1;
963*a9643ea8Slogwang     }
964*a9643ea8Slogwang 
965*a9643ea8Slogwang     // ���÷�����
966*a9643ea8Slogwang     int flags = 1;
967*a9643ea8Slogwang     if (ioctl(_fd, FIONBIO, &flags) < 0)
968*a9643ea8Slogwang     {
969*a9643ea8Slogwang         MTLOG_ERROR("socket unblock failed, %m");
970*a9643ea8Slogwang         close(_fd);
971*a9643ea8Slogwang         _fd = -1;
972*a9643ea8Slogwang         return -2;
973*a9643ea8Slogwang     }
974*a9643ea8Slogwang 
975*a9643ea8Slogwang     // ѡ������, NODELAY
976*a9643ea8Slogwang     if (NET_PROTO_TCP == _proto_type)
977*a9643ea8Slogwang     {
978*a9643ea8Slogwang         setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
979*a9643ea8Slogwang         this->EnableOutput(); // TCP �ȴ�connect����, ʡһ��epollctrl
980*a9643ea8Slogwang     }
981*a9643ea8Slogwang 
982*a9643ea8Slogwang     // ����epoll���
983*a9643ea8Slogwang     this->EnableInput();
984*a9643ea8Slogwang     if (!MtFrame::Instance()->KqueueAddObj(this))
985*a9643ea8Slogwang     {
986*a9643ea8Slogwang         MTLOG_ERROR("socket epoll mng failed, %m");
987*a9643ea8Slogwang         close(_fd);
988*a9643ea8Slogwang         _fd = -1;
989*a9643ea8Slogwang         return -3;
990*a9643ea8Slogwang     }
991*a9643ea8Slogwang 
992*a9643ea8Slogwang     return _fd;
993*a9643ea8Slogwang }
994*a9643ea8Slogwang 
995*a9643ea8Slogwang // ��ȡĿ��ip��Ϣ
996*a9643ea8Slogwang struct sockaddr_in* CSockLink::GetDestAddr(struct sockaddr_in* addr)
997*a9643ea8Slogwang {
998*a9643ea8Slogwang     CDestLinks* dstlink = (CDestLinks*)_parents;
999*a9643ea8Slogwang     if ((NULL == _parents) || (NULL == addr)) {
1000*a9643ea8Slogwang         return NULL;
1001*a9643ea8Slogwang     }
1002*a9643ea8Slogwang 
1003*a9643ea8Slogwang     uint32_t ip = 0;
1004*a9643ea8Slogwang     uint16_t port = 0;
1005*a9643ea8Slogwang     dstlink->GetDestIP(ip, port);
1006*a9643ea8Slogwang 
1007*a9643ea8Slogwang     addr->sin_family = AF_INET;
1008*a9643ea8Slogwang     addr->sin_addr.s_addr = ip;
1009*a9643ea8Slogwang     addr->sin_port = port;
1010*a9643ea8Slogwang 
1011*a9643ea8Slogwang     return addr;
1012*a9643ea8Slogwang }
1013*a9643ea8Slogwang 
1014*a9643ea8Slogwang 
1015*a9643ea8Slogwang // �������ӹ���
1016*a9643ea8Slogwang bool CSockLink::Connect()
1017*a9643ea8Slogwang {
1018*a9643ea8Slogwang     this->_last_access = mt_time_ms();
1019*a9643ea8Slogwang 
1020*a9643ea8Slogwang     // 1. UDP�����ӹ���
1021*a9643ea8Slogwang     if (_proto_type == NET_PROTO_UDP)
1022*a9643ea8Slogwang     {
1023*a9643ea8Slogwang         _state |= LINK_CONNECTED;
1024*a9643ea8Slogwang     }
1025*a9643ea8Slogwang 
1026*a9643ea8Slogwang     // 2. ������״̬, �ɹ�����
1027*a9643ea8Slogwang     if (_state & LINK_CONNECTED)
1028*a9643ea8Slogwang     {
1029*a9643ea8Slogwang         return true;
1030*a9643ea8Slogwang     }
1031*a9643ea8Slogwang 
1032*a9643ea8Slogwang     // 3. ����������, �˳��ȴ�
1033*a9643ea8Slogwang     if (_state & LINK_CONNECTING)
1034*a9643ea8Slogwang     {
1035*a9643ea8Slogwang         return false;
1036*a9643ea8Slogwang     }
1037*a9643ea8Slogwang 
1038*a9643ea8Slogwang     // 4. ������, �״����ӳ���
1039*a9643ea8Slogwang     struct sockaddr_in addr = {0};
1040*a9643ea8Slogwang 
1041*a9643ea8Slogwang     mt_hook_syscall(connect);
1042*a9643ea8Slogwang     int32_t ret = ff_hook_connect(_fd, (struct sockaddr*)this->GetDestAddr(&addr), sizeof(struct sockaddr_in));
1043*a9643ea8Slogwang     if (ret < 0)
1044*a9643ea8Slogwang     {
1045*a9643ea8Slogwang         int32_t err = errno;
1046*a9643ea8Slogwang         if (err == EISCONN)
1047*a9643ea8Slogwang         {
1048*a9643ea8Slogwang             _state |= LINK_CONNECTED;
1049*a9643ea8Slogwang             return true;
1050*a9643ea8Slogwang         }
1051*a9643ea8Slogwang         else
1052*a9643ea8Slogwang         {
1053*a9643ea8Slogwang             _state |= LINK_CONNECTING;
1054*a9643ea8Slogwang             if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR))
1055*a9643ea8Slogwang             {
1056*a9643ea8Slogwang                 MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _fd, err);
1057*a9643ea8Slogwang                 return false;
1058*a9643ea8Slogwang             }
1059*a9643ea8Slogwang             else
1060*a9643ea8Slogwang             {
1061*a9643ea8Slogwang                 MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _fd, err);
1062*a9643ea8Slogwang                 return false;
1063*a9643ea8Slogwang             }
1064*a9643ea8Slogwang         }
1065*a9643ea8Slogwang     }
1066*a9643ea8Slogwang     else
1067*a9643ea8Slogwang     {
1068*a9643ea8Slogwang         _state |= LINK_CONNECTED;
1069*a9643ea8Slogwang         return true;
1070*a9643ea8Slogwang     }
1071*a9643ea8Slogwang }
1072*a9643ea8Slogwang 
1073*a9643ea8Slogwang // ��UDP�ķ�ʽ���͵ȴ�������, һ��ֱ�Ӿͷ���OK
1074*a9643ea8Slogwang int32_t CSockLink::SendCacheUdp(void* data, uint32_t len)
1075*a9643ea8Slogwang {
1076*a9643ea8Slogwang     mt_hook_syscall(sendto);
1077*a9643ea8Slogwang     void* buff = NULL;
1078*a9643ea8Slogwang     uint32_t buff_len = 0;
1079*a9643ea8Slogwang 
1080*a9643ea8Slogwang     CNetHandler* item = NULL;
1081*a9643ea8Slogwang     CNetHandler* tmp = NULL;
1082*a9643ea8Slogwang     struct sockaddr_in dst = {0};
1083*a9643ea8Slogwang 
1084*a9643ea8Slogwang     // 1. ���Է��͵ȴ�����, ����ָ�����߳�
1085*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1086*a9643ea8Slogwang     {
1087*a9643ea8Slogwang         item->GetSendData(buff, buff_len);
1088*a9643ea8Slogwang         if ((NULL == buff) || (buff_len == 0))
1089*a9643ea8Slogwang         {
1090*a9643ea8Slogwang             MTLOG_ERROR("get buff ptr invalid, log it");
1091*a9643ea8Slogwang             NotifyThread(item, 0);
1092*a9643ea8Slogwang             item->SwitchToIdle();
1093*a9643ea8Slogwang             continue;
1094*a9643ea8Slogwang         }
1095*a9643ea8Slogwang 
1096*a9643ea8Slogwang         int32_t ret = ff_hook_sendto(_fd, buff, buff_len, 0,
1097*a9643ea8Slogwang                     (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
1098*a9643ea8Slogwang         if (ret == -1)
1099*a9643ea8Slogwang         {
1100*a9643ea8Slogwang             if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
1101*a9643ea8Slogwang             {
1102*a9643ea8Slogwang                 return 0;
1103*a9643ea8Slogwang             }
1104*a9643ea8Slogwang             else
1105*a9643ea8Slogwang             {
1106*a9643ea8Slogwang                 MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
1107*a9643ea8Slogwang                           errno, strerror(errno));
1108*a9643ea8Slogwang                 return -2;
1109*a9643ea8Slogwang             }
1110*a9643ea8Slogwang         }
1111*a9643ea8Slogwang 
1112*a9643ea8Slogwang         NotifyThread(item, 0);
1113*a9643ea8Slogwang         item->SwitchToIdle();
1114*a9643ea8Slogwang     }
1115*a9643ea8Slogwang 
1116*a9643ea8Slogwang     // 2. ������OK��, �ٷ��ͱ�������, û�д���������������
1117*a9643ea8Slogwang     if ((data == NULL) || (len == 0))
1118*a9643ea8Slogwang     {
1119*a9643ea8Slogwang         return 0;
1120*a9643ea8Slogwang     }
1121*a9643ea8Slogwang 
1122*a9643ea8Slogwang     int32_t ret = ff_hook_sendto(_fd, data, len, 0,
1123*a9643ea8Slogwang                     (struct sockaddr*)this->GetDestAddr(&dst), sizeof(struct sockaddr_in));
1124*a9643ea8Slogwang     if (ret == -1)
1125*a9643ea8Slogwang     {
1126*a9643ea8Slogwang         if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
1127*a9643ea8Slogwang         {
1128*a9643ea8Slogwang             return 0;
1129*a9643ea8Slogwang         }
1130*a9643ea8Slogwang         else
1131*a9643ea8Slogwang         {
1132*a9643ea8Slogwang             MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _fd,
1133*a9643ea8Slogwang                    errno, strerror(errno));
1134*a9643ea8Slogwang             return -2;
1135*a9643ea8Slogwang         }
1136*a9643ea8Slogwang     }
1137*a9643ea8Slogwang     else
1138*a9643ea8Slogwang     {
1139*a9643ea8Slogwang         return ret;
1140*a9643ea8Slogwang     }
1141*a9643ea8Slogwang }
1142*a9643ea8Slogwang 
1143*a9643ea8Slogwang 
1144*a9643ea8Slogwang // TCP�Ļ��巢�ʹ���
1145*a9643ea8Slogwang int32_t CSockLink::SendCacheTcp(void* data, uint32_t len)
1146*a9643ea8Slogwang {
1147*a9643ea8Slogwang     void* buff = NULL;
1148*a9643ea8Slogwang     uint32_t buff_len = 0;
1149*a9643ea8Slogwang     struct iovec iov[64];
1150*a9643ea8Slogwang     int32_t count = 0;
1151*a9643ea8Slogwang     CNetHandler* item = NULL;
1152*a9643ea8Slogwang     CNetHandler* tmp = NULL;
1153*a9643ea8Slogwang 
1154*a9643ea8Slogwang     // 1. ���Է��͵ȴ�����, ����ָ�����߳�
1155*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1156*a9643ea8Slogwang     {
1157*a9643ea8Slogwang         item->GetSendData(buff, buff_len);
1158*a9643ea8Slogwang         iov[count].iov_base = buff;
1159*a9643ea8Slogwang         iov[count].iov_len  = (int32_t)buff_len;
1160*a9643ea8Slogwang         count++;
1161*a9643ea8Slogwang         if (count >= 64)
1162*a9643ea8Slogwang         {
1163*a9643ea8Slogwang             break;
1164*a9643ea8Slogwang         }
1165*a9643ea8Slogwang     }
1166*a9643ea8Slogwang     if ((count < 64) && (data != NULL))
1167*a9643ea8Slogwang     {
1168*a9643ea8Slogwang         iov[count].iov_base = data;
1169*a9643ea8Slogwang         iov[count].iov_len  = (int32_t)len;
1170*a9643ea8Slogwang         count++;
1171*a9643ea8Slogwang     }
1172*a9643ea8Slogwang 
1173*a9643ea8Slogwang     ssize_t bytes = writev(_fd, iov, count);
1174*a9643ea8Slogwang     if (bytes < 0)
1175*a9643ea8Slogwang     {
1176*a9643ea8Slogwang         if ((errno == EAGAIN) || (errno == EINTR))
1177*a9643ea8Slogwang         {
1178*a9643ea8Slogwang             return 0;
1179*a9643ea8Slogwang         }
1180*a9643ea8Slogwang         else
1181*a9643ea8Slogwang         {
1182*a9643ea8Slogwang             MTLOG_ERROR("socket writev failed, fd %d, errno %d(%s)", _fd,
1183*a9643ea8Slogwang                    errno, strerror(errno));
1184*a9643ea8Slogwang             return -1;
1185*a9643ea8Slogwang         }
1186*a9643ea8Slogwang     }
1187*a9643ea8Slogwang 
1188*a9643ea8Slogwang     // 2. ���Է��͵ȴ�����, ����ָ�����߳�
1189*a9643ea8Slogwang     uint32_t send_left = (uint32_t)bytes;
1190*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_wait_send, _link_entry, tmp)
1191*a9643ea8Slogwang     {
1192*a9643ea8Slogwang         send_left -= item->SkipSendPos(send_left);
1193*a9643ea8Slogwang         item->GetSendData(buff, buff_len);
1194*a9643ea8Slogwang         if (buff_len == 0)
1195*a9643ea8Slogwang         {
1196*a9643ea8Slogwang             NotifyThread(item, 0);
1197*a9643ea8Slogwang             item->SwitchToIdle();
1198*a9643ea8Slogwang         }
1199*a9643ea8Slogwang 
1200*a9643ea8Slogwang         if (send_left == 0)
1201*a9643ea8Slogwang         {
1202*a9643ea8Slogwang             break;
1203*a9643ea8Slogwang         }
1204*a9643ea8Slogwang     }
1205*a9643ea8Slogwang 
1206*a9643ea8Slogwang     return send_left;
1207*a9643ea8Slogwang }
1208*a9643ea8Slogwang 
1209*a9643ea8Slogwang 
1210*a9643ea8Slogwang // �������ӹ���
1211*a9643ea8Slogwang int32_t CSockLink::SendData(void* data, uint32_t len)
1212*a9643ea8Slogwang {
1213*a9643ea8Slogwang     int32_t ret = 0;
1214*a9643ea8Slogwang     bool rc = false;
1215*a9643ea8Slogwang 
1216*a9643ea8Slogwang     this->_last_access = mt_time_ms();
1217*a9643ea8Slogwang 
1218*a9643ea8Slogwang     // 1. ���Է�������, �ȷ����Ŷӵ�����
1219*a9643ea8Slogwang     if (_proto_type == NET_PROTO_UDP)
1220*a9643ea8Slogwang     {
1221*a9643ea8Slogwang         ret = SendCacheUdp(data, len);
1222*a9643ea8Slogwang     }
1223*a9643ea8Slogwang     else
1224*a9643ea8Slogwang     {
1225*a9643ea8Slogwang         ret = SendCacheTcp(data, len);
1226*a9643ea8Slogwang     }
1227*a9643ea8Slogwang 
1228*a9643ea8Slogwang     // 2. ��ǰ�����Ƿ������, �����, ���Բ�������OUT
1229*a9643ea8Slogwang     if (ret < (int32_t)len)
1230*a9643ea8Slogwang     {
1231*a9643ea8Slogwang         this->EnableOutput();
1232*a9643ea8Slogwang         rc = MtFrame::Instance()->KqueueCtrlAdd(_fd, KQ_EVENT_READ);
1233*a9643ea8Slogwang     }
1234*a9643ea8Slogwang     else
1235*a9643ea8Slogwang     {
1236*a9643ea8Slogwang         this->DisableOutput();
1237*a9643ea8Slogwang         rc = MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE);
1238*a9643ea8Slogwang     }
1239*a9643ea8Slogwang 
1240*a9643ea8Slogwang     // 3. ����ˢ�¾��epollע��
1241*a9643ea8Slogwang     if (!rc)
1242*a9643ea8Slogwang     {
1243*a9643ea8Slogwang         MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1244*a9643ea8Slogwang     }
1245*a9643ea8Slogwang 
1246*a9643ea8Slogwang     return ret;
1247*a9643ea8Slogwang }
1248*a9643ea8Slogwang 
1249*a9643ea8Slogwang // ���ݷַ��������
1250*a9643ea8Slogwang int32_t CSockLink::RecvDispath()
1251*a9643ea8Slogwang {
1252*a9643ea8Slogwang     if (_proto_type == NET_PROTO_UDP)
1253*a9643ea8Slogwang     {
1254*a9643ea8Slogwang         return this->DispathUdp();
1255*a9643ea8Slogwang     }
1256*a9643ea8Slogwang     else
1257*a9643ea8Slogwang     {
1258*a9643ea8Slogwang         return this->DispathTcp();
1259*a9643ea8Slogwang     }
1260*a9643ea8Slogwang }
1261*a9643ea8Slogwang 
1262*a9643ea8Slogwang // ���Խ��ո�������ݵ���ʱbuff
1263*a9643ea8Slogwang void CSockLink::ExtendRecvRsp()
1264*a9643ea8Slogwang {
1265*a9643ea8Slogwang     if (NULL == _rsp_buff)
1266*a9643ea8Slogwang     {
1267*a9643ea8Slogwang         // buff���η���, ���̫��, �ᵼ���ظ��Ŀ���; ���̫С, �ᵼ��2��check���
1268*a9643ea8Slogwang         // Ȩ��һ��, �ݶ�500�ֽ�����, ���ֳ���, ������ȫ, �˷Ѳ���, С���ֳ���
1269*a9643ea8Slogwang         // ��Ҫrealloc, ��500�ֽڿ�������Ҳ����
1270*a9643ea8Slogwang         _rsp_buff = new_sk_buffer(512);
1271*a9643ea8Slogwang         if (NULL == _rsp_buff)
1272*a9643ea8Slogwang         {
1273*a9643ea8Slogwang             MTLOG_ERROR("no more memory, error");
1274*a9643ea8Slogwang             return;
1275*a9643ea8Slogwang         }
1276*a9643ea8Slogwang     }
1277*a9643ea8Slogwang 
1278*a9643ea8Slogwang     _rsp_buff->data_len +=  read_cache_begin(&_recv_cache, _rsp_buff->data_len,
1279*a9643ea8Slogwang         _rsp_buff->data + _rsp_buff->data_len , _rsp_buff->size - _rsp_buff->data_len);
1280*a9643ea8Slogwang }
1281*a9643ea8Slogwang 
1282*a9643ea8Slogwang // ���߻ص�����, ���ȴ��Ŷӵȴ��л�ȡ, ���ݴӸ��ڵ��ȡ
1283*a9643ea8Slogwang CHECK_SESSION_CALLBACK CSockLink::GetSessionCallback()
1284*a9643ea8Slogwang {
1285*a9643ea8Slogwang     CHECK_SESSION_CALLBACK check_session = NULL;
1286*a9643ea8Slogwang 
1287*a9643ea8Slogwang     // 1. �Ŷӻ�ȡ�ص�����
1288*a9643ea8Slogwang     CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1289*a9643ea8Slogwang     if (NULL == item)
1290*a9643ea8Slogwang     {
1291*a9643ea8Slogwang         MTLOG_DEBUG("recv data with no wait item, err");
1292*a9643ea8Slogwang         goto EXIT_LABEL;
1293*a9643ea8Slogwang     }
1294*a9643ea8Slogwang 
1295*a9643ea8Slogwang     check_session = item->GetSessionCallback();
1296*a9643ea8Slogwang     if (NULL == check_session)
1297*a9643ea8Slogwang     {
1298*a9643ea8Slogwang         MTLOG_ERROR("recv data with no session callback, err");
1299*a9643ea8Slogwang         goto EXIT_LABEL;
1300*a9643ea8Slogwang     }
1301*a9643ea8Slogwang 
1302*a9643ea8Slogwang EXIT_LABEL:
1303*a9643ea8Slogwang 
1304*a9643ea8Slogwang     // 2. ������ڵ�ΪNULL, ֱ�ӷ��ض��н��
1305*a9643ea8Slogwang     CDestLinks* dstlink = (CDestLinks*)_parents;
1306*a9643ea8Slogwang     if (NULL == dstlink)
1307*a9643ea8Slogwang     {
1308*a9643ea8Slogwang         return check_session;
1309*a9643ea8Slogwang     }
1310*a9643ea8Slogwang 
1311*a9643ea8Slogwang     // 3. ������н��Ϊ��, ����·Ĭ�ϻ����func; ��Ϊ��, ������
1312*a9643ea8Slogwang     if (check_session != NULL)
1313*a9643ea8Slogwang     {
1314*a9643ea8Slogwang         dstlink->SetDefaultCallback(check_session);
1315*a9643ea8Slogwang     }
1316*a9643ea8Slogwang     else
1317*a9643ea8Slogwang     {
1318*a9643ea8Slogwang         check_session = dstlink->GetDefaultCallback();
1319*a9643ea8Slogwang     }
1320*a9643ea8Slogwang 
1321*a9643ea8Slogwang     return check_session;
1322*a9643ea8Slogwang }
1323*a9643ea8Slogwang 
1324*a9643ea8Slogwang 
1325*a9643ea8Slogwang // TCP����������������ַ�
1326*a9643ea8Slogwang int32_t CSockLink::DispathTcp()
1327*a9643ea8Slogwang {
1328*a9643ea8Slogwang     // 1. TCP�޵ȴ�����, ������ʽ, �������β, ֻ�ܹر�
1329*a9643ea8Slogwang     CHECK_SESSION_CALLBACK check_session = this->GetSessionCallback();
1330*a9643ea8Slogwang     if (NULL == check_session)
1331*a9643ea8Slogwang     {
1332*a9643ea8Slogwang         MTLOG_ERROR("recv data with no session callback, err");
1333*a9643ea8Slogwang         return -1;
1334*a9643ea8Slogwang     }
1335*a9643ea8Slogwang 
1336*a9643ea8Slogwang     // 2. ����ͬһIP/PORT��Э��, ��������������ͨ��
1337*a9643ea8Slogwang     uint32_t need_len = 0;
1338*a9643ea8Slogwang     uint64_t sid = 0;
1339*a9643ea8Slogwang     int32_t ret = 0;
1340*a9643ea8Slogwang     while (_recv_cache.len > 0)
1341*a9643ea8Slogwang     {
1342*a9643ea8Slogwang         this->ExtendRecvRsp();
1343*a9643ea8Slogwang         if (NULL == _rsp_buff)
1344*a9643ea8Slogwang         {
1345*a9643ea8Slogwang             MTLOG_ERROR("alloc memory, error");
1346*a9643ea8Slogwang             _errno = RC_MEM_ERROR;
1347*a9643ea8Slogwang             return -3;
1348*a9643ea8Slogwang         }
1349*a9643ea8Slogwang 
1350*a9643ea8Slogwang         need_len = 0;
1351*a9643ea8Slogwang         ret = check_session(_rsp_buff->data, _rsp_buff->data_len, &sid, &need_len);
1352*a9643ea8Slogwang 
1353*a9643ea8Slogwang         if (ret < 0)
1354*a9643ea8Slogwang         {
1355*a9643ea8Slogwang             MTLOG_ERROR("user check resp failed, ret %d", ret);
1356*a9643ea8Slogwang             _errno = RC_CHECK_PKG_FAIL;
1357*a9643ea8Slogwang             return -1;
1358*a9643ea8Slogwang         }
1359*a9643ea8Slogwang 
1360*a9643ea8Slogwang         if (ret == 0)
1361*a9643ea8Slogwang         {
1362*a9643ea8Slogwang             // 1. �û������ָ������, Ĭ��2����չ, ���ܻ���Ӱ��
1363*a9643ea8Slogwang             if ((need_len == 0) && (_rsp_buff->data_len == _rsp_buff->size))
1364*a9643ea8Slogwang             {
1365*a9643ea8Slogwang                 MTLOG_DEBUG("recv default buff full[%u], but user no set need length", _rsp_buff->size);
1366*a9643ea8Slogwang                 need_len = _rsp_buff->size * 2;
1367*a9643ea8Slogwang             }
1368*a9643ea8Slogwang 
1369*a9643ea8Slogwang             // 2. ����ʣ��ռ�, ������ȴ�����; �����ռ䳬��, �������
1370*a9643ea8Slogwang             if ((need_len <= _rsp_buff->size) || (need_len > 100*1024*1024))
1371*a9643ea8Slogwang             {
1372*a9643ea8Slogwang                 MTLOG_DEBUG("maybe need wait more data: %u", need_len);
1373*a9643ea8Slogwang                 return 0;
1374*a9643ea8Slogwang             }
1375*a9643ea8Slogwang 
1376*a9643ea8Slogwang             // 3. ��չbuff����, ׼���ٳ���һ��
1377*a9643ea8Slogwang             _rsp_buff = reserve_sk_buffer(_rsp_buff, need_len);
1378*a9643ea8Slogwang             if (NULL == _rsp_buff)
1379*a9643ea8Slogwang             {
1380*a9643ea8Slogwang                 MTLOG_ERROR("no more memory, error");
1381*a9643ea8Slogwang                 _errno = RC_MEM_ERROR;
1382*a9643ea8Slogwang                 return -3;
1383*a9643ea8Slogwang             }
1384*a9643ea8Slogwang 
1385*a9643ea8Slogwang             // 4. �Ѿ��޶����������Ϣ, �ȴ������հ�
1386*a9643ea8Slogwang             if (_rsp_buff->data_len >= _recv_cache.len)
1387*a9643ea8Slogwang             {
1388*a9643ea8Slogwang                 MTLOG_DEBUG("maybe need wait more data, now %u", _recv_cache.len);
1389*a9643ea8Slogwang                 return 0;
1390*a9643ea8Slogwang             }
1391*a9643ea8Slogwang 
1392*a9643ea8Slogwang             // 5. ������������, ���Խ���
1393*a9643ea8Slogwang             continue;
1394*a9643ea8Slogwang         }
1395*a9643ea8Slogwang 
1396*a9643ea8Slogwang         // �����쳣����, ����ʵ��δ������, �����ȴ�
1397*a9643ea8Slogwang         if (ret > (int32_t)_recv_cache.len)
1398*a9643ea8Slogwang         {
1399*a9643ea8Slogwang             MTLOG_DEBUG("maybe pkg not all ok, wait more");
1400*a9643ea8Slogwang             return 0;
1401*a9643ea8Slogwang         }
1402*a9643ea8Slogwang 
1403*a9643ea8Slogwang         // ��ѯ��session�Ķ���
1404*a9643ea8Slogwang         CNetHandler* session = this->FindSession(sid);
1405*a9643ea8Slogwang         if (NULL == session)
1406*a9643ea8Slogwang         {
1407*a9643ea8Slogwang             MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1408*a9643ea8Slogwang             cache_skip_data(&_recv_cache, ret);
1409*a9643ea8Slogwang             delete_sk_buffer(_rsp_buff);
1410*a9643ea8Slogwang             _rsp_buff = NULL;
1411*a9643ea8Slogwang         }
1412*a9643ea8Slogwang         else
1413*a9643ea8Slogwang         {
1414*a9643ea8Slogwang             MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1415*a9643ea8Slogwang             cache_skip_data(&_recv_cache, ret);
1416*a9643ea8Slogwang             this->NotifyThread(session, 0);
1417*a9643ea8Slogwang             session->SwitchToIdle();
1418*a9643ea8Slogwang             _rsp_buff->data_len = ret;  //TCPʵ����Ч�ı��ij��ȱ��
1419*a9643ea8Slogwang             session->SetRespBuff(_rsp_buff);
1420*a9643ea8Slogwang             _rsp_buff = NULL;
1421*a9643ea8Slogwang         }
1422*a9643ea8Slogwang     }
1423*a9643ea8Slogwang 
1424*a9643ea8Slogwang     return 0;
1425*a9643ea8Slogwang 
1426*a9643ea8Slogwang }
1427*a9643ea8Slogwang 
1428*a9643ea8Slogwang 
1429*a9643ea8Slogwang // UDP����������������ַ�
1430*a9643ea8Slogwang int32_t CSockLink::DispathUdp()
1431*a9643ea8Slogwang {
1432*a9643ea8Slogwang     // 1. UDP���ݴ���, ������Ҳ���Զ��������еı���
1433*a9643ea8Slogwang     CHECK_SESSION_CALLBACK check_session = NULL;
1434*a9643ea8Slogwang     CNetHandler* item = TAILQ_FIRST(&_wait_recv);
1435*a9643ea8Slogwang     if (NULL == item)
1436*a9643ea8Slogwang     {
1437*a9643ea8Slogwang         MTLOG_DEBUG("recv data with no wait item, maybe wrong pkg recv");
1438*a9643ea8Slogwang         // �˴����˳�, ��Ϊ���Զ���UDPӦ��(��ʱ��\������)
1439*a9643ea8Slogwang     }
1440*a9643ea8Slogwang     else
1441*a9643ea8Slogwang     {
1442*a9643ea8Slogwang         check_session = item->GetSessionCallback();
1443*a9643ea8Slogwang         if (NULL == check_session)
1444*a9643ea8Slogwang         {
1445*a9643ea8Slogwang             MTLOG_TRACE("recv data with no session callback, err");
1446*a9643ea8Slogwang         }
1447*a9643ea8Slogwang     }
1448*a9643ea8Slogwang 
1449*a9643ea8Slogwang     // 2. ����ÿ����, �ҵ�session�ʹ���, ������
1450*a9643ea8Slogwang     uint64_t sid = 0;
1451*a9643ea8Slogwang     uint32_t need_len = 0;
1452*a9643ea8Slogwang     int32_t ret = 0;
1453*a9643ea8Slogwang     TSkBuffer* block = NULL;
1454*a9643ea8Slogwang     while ((block = TAILQ_FIRST(&_recv_cache.list)) != NULL)
1455*a9643ea8Slogwang     {
1456*a9643ea8Slogwang         if (check_session == NULL)
1457*a9643ea8Slogwang         {
1458*a9643ea8Slogwang             MTLOG_DEBUG("no recv wait, skip first block");
1459*a9643ea8Slogwang             cache_skip_data(&_recv_cache, block->data_len);
1460*a9643ea8Slogwang             continue;
1461*a9643ea8Slogwang         }
1462*a9643ea8Slogwang 
1463*a9643ea8Slogwang         need_len = 0;
1464*a9643ea8Slogwang         ret = check_session(block->data, block->data_len, &sid, &need_len);
1465*a9643ea8Slogwang         if ((ret <= 0) || (ret > (int32_t)block->data_len))
1466*a9643ea8Slogwang         {
1467*a9643ea8Slogwang             MTLOG_DEBUG("maybe wrong pkg come, skip it");
1468*a9643ea8Slogwang             cache_skip_data(&_recv_cache, block->data_len);
1469*a9643ea8Slogwang             continue;
1470*a9643ea8Slogwang         }
1471*a9643ea8Slogwang 
1472*a9643ea8Slogwang         // ��ѯ��session�Ķ���
1473*a9643ea8Slogwang         CNetHandler* session = this->FindSession(sid);
1474*a9643ea8Slogwang         if (NULL == session)
1475*a9643ea8Slogwang         {
1476*a9643ea8Slogwang             MTLOG_DEBUG("session id %llu, find failed, maybe timeout", sid);
1477*a9643ea8Slogwang             cache_skip_data(&_recv_cache, block->data_len);
1478*a9643ea8Slogwang         }
1479*a9643ea8Slogwang         else
1480*a9643ea8Slogwang         {
1481*a9643ea8Slogwang             MTLOG_DEBUG("session id %llu, find ok, wakeup it", sid);
1482*a9643ea8Slogwang             this->NotifyThread(session, 0);
1483*a9643ea8Slogwang             session->SwitchToIdle();
1484*a9643ea8Slogwang             cache_skip_first_buffer(&_recv_cache);
1485*a9643ea8Slogwang             session->SetRespBuff(block);
1486*a9643ea8Slogwang         }
1487*a9643ea8Slogwang     }
1488*a9643ea8Slogwang 
1489*a9643ea8Slogwang     return 0;
1490*a9643ea8Slogwang }
1491*a9643ea8Slogwang 
1492*a9643ea8Slogwang 
1493*a9643ea8Slogwang // ��ѯ����sessionid������session��Ϣ
1494*a9643ea8Slogwang CNetHandler* CSockLink::FindSession(uint64_t sid)
1495*a9643ea8Slogwang {
1496*a9643ea8Slogwang     CNetHandler key;
1497*a9643ea8Slogwang     CDestLinks* dstlink = (CDestLinks*)_parents;
1498*a9643ea8Slogwang     if (NULL == dstlink)
1499*a9643ea8Slogwang     {
1500*a9643ea8Slogwang         MTLOG_ERROR("session dest link invalid, maybe error");
1501*a9643ea8Slogwang         return NULL;
1502*a9643ea8Slogwang     }
1503*a9643ea8Slogwang     struct sockaddr_in addr;
1504*a9643ea8Slogwang     key.SetDestAddress(this->GetDestAddr(&addr));
1505*a9643ea8Slogwang     key.SetConnType(dstlink->GetConnType());
1506*a9643ea8Slogwang     key.SetProtoType(dstlink->GetProtoType());
1507*a9643ea8Slogwang     key.SetSessionId(sid);
1508*a9643ea8Slogwang 
1509*a9643ea8Slogwang     return CNetMgr::Instance()->FindNetItem(&key);
1510*a9643ea8Slogwang }
1511*a9643ea8Slogwang 
1512*a9643ea8Slogwang 
1513*a9643ea8Slogwang 
1514*a9643ea8Slogwang /**
1515*a9643ea8Slogwang  *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
1516*a9643ea8Slogwang  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
1517*a9643ea8Slogwang  */
1518*a9643ea8Slogwang int CSockLink::InputNotify()
1519*a9643ea8Slogwang {
1520*a9643ea8Slogwang     int32_t ret = 0;
1521*a9643ea8Slogwang 
1522*a9643ea8Slogwang     this->_last_access = mt_time_ms();
1523*a9643ea8Slogwang 
1524*a9643ea8Slogwang     // 1. ��������, ��������
1525*a9643ea8Slogwang     if (_proto_type == NET_PROTO_UDP)
1526*a9643ea8Slogwang     {
1527*a9643ea8Slogwang         ret = cache_udp_recv(&_recv_cache, _fd, NULL);
1528*a9643ea8Slogwang     }
1529*a9643ea8Slogwang     else
1530*a9643ea8Slogwang     {
1531*a9643ea8Slogwang         ret = cache_tcp_recv(&_recv_cache, _fd);
1532*a9643ea8Slogwang     }
1533*a9643ea8Slogwang 
1534*a9643ea8Slogwang     // 2. ����ʧ������������
1535*a9643ea8Slogwang     if (ret < 0)
1536*a9643ea8Slogwang     {
1537*a9643ea8Slogwang         if (ret == -SK_ERR_NEED_CLOSE)
1538*a9643ea8Slogwang         {
1539*a9643ea8Slogwang             MTLOG_DEBUG("recv on link failed, remote close");
1540*a9643ea8Slogwang             _errno = RC_REMOTE_CLOSED;
1541*a9643ea8Slogwang         }
1542*a9643ea8Slogwang         else
1543*a9643ea8Slogwang         {
1544*a9643ea8Slogwang             MTLOG_ERROR("recv on link failed, close it, ret %d[%m]", ret);
1545*a9643ea8Slogwang             _errno = RC_RECV_FAIL;
1546*a9643ea8Slogwang         }
1547*a9643ea8Slogwang 
1548*a9643ea8Slogwang         this->Destroy();
1549*a9643ea8Slogwang         return -1;
1550*a9643ea8Slogwang     }
1551*a9643ea8Slogwang 
1552*a9643ea8Slogwang     // 3. �ַ�����, ������TCP
1553*a9643ea8Slogwang     ret = this->RecvDispath();
1554*a9643ea8Slogwang     if (ret < 0)
1555*a9643ea8Slogwang     {
1556*a9643ea8Slogwang         MTLOG_DEBUG("recv dispath failed, close it, ret %d[%m]", ret);
1557*a9643ea8Slogwang         this->Destroy();
1558*a9643ea8Slogwang         return -2;
1559*a9643ea8Slogwang     }
1560*a9643ea8Slogwang 
1561*a9643ea8Slogwang     // 4. �ɹ�����
1562*a9643ea8Slogwang     return 0;
1563*a9643ea8Slogwang 
1564*a9643ea8Slogwang }
1565*a9643ea8Slogwang 
1566*a9643ea8Slogwang /**
1567*a9643ea8Slogwang  *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
1568*a9643ea8Slogwang  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
1569*a9643ea8Slogwang  */
1570*a9643ea8Slogwang int CSockLink::OutputNotify()
1571*a9643ea8Slogwang {
1572*a9643ea8Slogwang     int32_t ret = 0;
1573*a9643ea8Slogwang 
1574*a9643ea8Slogwang     this->_last_access = mt_time_ms();
1575*a9643ea8Slogwang 
1576*a9643ea8Slogwang     // 1. ���ӵȴ�����������л�
1577*a9643ea8Slogwang     if (_state & LINK_CONNECTING)
1578*a9643ea8Slogwang     {
1579*a9643ea8Slogwang         _state &= ~LINK_CONNECTING;
1580*a9643ea8Slogwang         _state |= LINK_CONNECTED;
1581*a9643ea8Slogwang 
1582*a9643ea8Slogwang         CNetHandler* item = NULL;
1583*a9643ea8Slogwang         CNetHandler* tmp = NULL;
1584*a9643ea8Slogwang         TAILQ_FOREACH_SAFE(item, &_wait_connect, _link_entry, tmp)
1585*a9643ea8Slogwang         {
1586*a9643ea8Slogwang             NotifyThread(item, 0);
1587*a9643ea8Slogwang             item->SwitchToIdle();
1588*a9643ea8Slogwang         }
1589*a9643ea8Slogwang     }
1590*a9643ea8Slogwang 
1591*a9643ea8Slogwang     // 2. ���Է�������
1592*a9643ea8Slogwang     if (_proto_type == NET_PROTO_UDP)
1593*a9643ea8Slogwang     {
1594*a9643ea8Slogwang         ret = SendCacheUdp(NULL, 0);
1595*a9643ea8Slogwang     }
1596*a9643ea8Slogwang     else
1597*a9643ea8Slogwang     {
1598*a9643ea8Slogwang         ret = SendCacheTcp(NULL, 0);
1599*a9643ea8Slogwang     }
1600*a9643ea8Slogwang 
1601*a9643ea8Slogwang     // 3. ����ʧ������������
1602*a9643ea8Slogwang     if (ret < 0)
1603*a9643ea8Slogwang     {
1604*a9643ea8Slogwang         MTLOG_ERROR("Send on link failed, close it, ret %d[%m]", ret);
1605*a9643ea8Slogwang         _errno = RC_SEND_FAIL;
1606*a9643ea8Slogwang         this->Destroy();
1607*a9643ea8Slogwang         return ret;
1608*a9643ea8Slogwang     }
1609*a9643ea8Slogwang 
1610*a9643ea8Slogwang     // 4. ��������, �����������epoll
1611*a9643ea8Slogwang     if (TAILQ_EMPTY(&_wait_send))
1612*a9643ea8Slogwang     {
1613*a9643ea8Slogwang         this->DisableOutput();
1614*a9643ea8Slogwang         if (!MtFrame::Instance()->KqueueCtrlDel(_fd, KQ_EVENT_WRITE))
1615*a9643ea8Slogwang         {
1616*a9643ea8Slogwang             MTLOG_ERROR("socket epoll mng failed[%m], wait timeout");
1617*a9643ea8Slogwang         }
1618*a9643ea8Slogwang     }
1619*a9643ea8Slogwang 
1620*a9643ea8Slogwang     return 0;
1621*a9643ea8Slogwang }
1622*a9643ea8Slogwang 
1623*a9643ea8Slogwang 
1624*a9643ea8Slogwang /**
1625*a9643ea8Slogwang  *  @brief �쳣֪ͨ�ӿ�
1626*a9643ea8Slogwang  *  @return ���Է���ֵ, ���������¼�����
1627*a9643ea8Slogwang  */
1628*a9643ea8Slogwang int CSockLink::HangupNotify()
1629*a9643ea8Slogwang {
1630*a9643ea8Slogwang     MTLOG_ERROR("socket epoll error, fd %d", _fd);
1631*a9643ea8Slogwang 
1632*a9643ea8Slogwang     this->_errno = RC_KQUEUE_ERROR;
1633*a9643ea8Slogwang     this->Destroy();
1634*a9643ea8Slogwang     return -1;
1635*a9643ea8Slogwang }
1636*a9643ea8Slogwang 
1637*a9643ea8Slogwang 
1638*a9643ea8Slogwang // ���캯��
1639*a9643ea8Slogwang CDestLinks::CDestLinks()
1640*a9643ea8Slogwang {
1641*a9643ea8Slogwang     _timeout        = 5*60*1000; // Ĭ��5����
1642*a9643ea8Slogwang     _addr_ipv4      = 0;
1643*a9643ea8Slogwang     _net_port       = 0;
1644*a9643ea8Slogwang     _proto_type     = NET_PROTO_UNDEF;
1645*a9643ea8Slogwang     _conn_type      = TYPE_CONN_SESSION;
1646*a9643ea8Slogwang 
1647*a9643ea8Slogwang     _max_links      = 3; // Ĭ��3��
1648*a9643ea8Slogwang     _curr_link      = 0;
1649*a9643ea8Slogwang     _dflt_callback  = NULL;
1650*a9643ea8Slogwang 
1651*a9643ea8Slogwang     TAILQ_INIT(&_sock_list);
1652*a9643ea8Slogwang }
1653*a9643ea8Slogwang 
1654*a9643ea8Slogwang 
1655*a9643ea8Slogwang // ���ø��õĽӿں���
1656*a9643ea8Slogwang void CDestLinks::Reset()
1657*a9643ea8Slogwang {
1658*a9643ea8Slogwang     // �������Ӷ���
1659*a9643ea8Slogwang     CSockLink* item = NULL;
1660*a9643ea8Slogwang     CSockLink* temp = NULL;
1661*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1662*a9643ea8Slogwang     {
1663*a9643ea8Slogwang         item->Destroy();
1664*a9643ea8Slogwang     }
1665*a9643ea8Slogwang     TAILQ_INIT(&_sock_list);
1666*a9643ea8Slogwang 
1667*a9643ea8Slogwang     // ��ʱ��ɾ��
1668*a9643ea8Slogwang     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1669*a9643ea8Slogwang     if (NULL != timer)
1670*a9643ea8Slogwang     {
1671*a9643ea8Slogwang         timer->stop_timer(this);
1672*a9643ea8Slogwang     }
1673*a9643ea8Slogwang 
1674*a9643ea8Slogwang     // ����Ĭ���ֶ���Ϣ
1675*a9643ea8Slogwang     _timeout        = 5*60*1000; // Ĭ��5����
1676*a9643ea8Slogwang     _addr_ipv4      = 0;
1677*a9643ea8Slogwang     _net_port       = 0;
1678*a9643ea8Slogwang     _proto_type     = NET_PROTO_UNDEF;
1679*a9643ea8Slogwang     _conn_type      = TYPE_CONN_SESSION;
1680*a9643ea8Slogwang 
1681*a9643ea8Slogwang     _max_links      = 3; // Ĭ��3��
1682*a9643ea8Slogwang     _curr_link      = 0;
1683*a9643ea8Slogwang }
1684*a9643ea8Slogwang 
1685*a9643ea8Slogwang // ���캯��
1686*a9643ea8Slogwang CDestLinks::~CDestLinks()
1687*a9643ea8Slogwang {
1688*a9643ea8Slogwang     this->Reset();
1689*a9643ea8Slogwang }
1690*a9643ea8Slogwang 
1691*a9643ea8Slogwang // ������ʱ��
1692*a9643ea8Slogwang void CDestLinks::StartTimer()
1693*a9643ea8Slogwang {
1694*a9643ea8Slogwang     CTimerMng* timer = MtFrame::Instance()->GetTimerMng();
1695*a9643ea8Slogwang     if ((NULL == timer) || !timer->start_timer(this, 60*1000))
1696*a9643ea8Slogwang     {
1697*a9643ea8Slogwang         MTLOG_ERROR("obj %p attach timer failed, error", this);
1698*a9643ea8Slogwang     }
1699*a9643ea8Slogwang }
1700*a9643ea8Slogwang 
1701*a9643ea8Slogwang 
1702*a9643ea8Slogwang // �ͷ�һ������link
1703*a9643ea8Slogwang void CDestLinks::FreeSockLink(CSockLink* sock)
1704*a9643ea8Slogwang {
1705*a9643ea8Slogwang     if ((sock == NULL) || (sock->GetParentsPtr() != (void*)this))
1706*a9643ea8Slogwang     {
1707*a9643ea8Slogwang         MTLOG_ERROR("invalid socklink %p, error", sock);
1708*a9643ea8Slogwang         return;
1709*a9643ea8Slogwang     }
1710*a9643ea8Slogwang 
1711*a9643ea8Slogwang     TAILQ_REMOVE(&_sock_list, sock, _link_entry);
1712*a9643ea8Slogwang     if (this->_curr_link > 0) {
1713*a9643ea8Slogwang         this->_curr_link--;
1714*a9643ea8Slogwang     }
1715*a9643ea8Slogwang 
1716*a9643ea8Slogwang     sock->Reset();
1717*a9643ea8Slogwang     CNetMgr::Instance()->FreeSockLink(sock);
1718*a9643ea8Slogwang }
1719*a9643ea8Slogwang 
1720*a9643ea8Slogwang 
1721*a9643ea8Slogwang // ��ȡһ������link, ĿǰΪ��ѯ
1722*a9643ea8Slogwang CSockLink* CDestLinks::GetSockLink()
1723*a9643ea8Slogwang {
1724*a9643ea8Slogwang     CSockLink* link = NULL;
1725*a9643ea8Slogwang     if (_curr_link < _max_links)
1726*a9643ea8Slogwang     {
1727*a9643ea8Slogwang         link = CNetMgr::Instance()->AllocSockLink();
1728*a9643ea8Slogwang         if (NULL == link)
1729*a9643ea8Slogwang         {
1730*a9643ea8Slogwang             MTLOG_ERROR("alloc sock link failed, error");
1731*a9643ea8Slogwang             return NULL;
1732*a9643ea8Slogwang         }
1733*a9643ea8Slogwang         link->SetParentsPtr(this);
1734*a9643ea8Slogwang         link->SetProtoType(_proto_type);
1735*a9643ea8Slogwang         TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1736*a9643ea8Slogwang         _curr_link++;
1737*a9643ea8Slogwang     }
1738*a9643ea8Slogwang     else
1739*a9643ea8Slogwang     {
1740*a9643ea8Slogwang         link = TAILQ_FIRST(&_sock_list);
1741*a9643ea8Slogwang         TAILQ_REMOVE(&_sock_list, link, _link_entry);
1742*a9643ea8Slogwang         TAILQ_INSERT_TAIL(&_sock_list, link, _link_entry);
1743*a9643ea8Slogwang     }
1744*a9643ea8Slogwang 
1745*a9643ea8Slogwang     return link;
1746*a9643ea8Slogwang }
1747*a9643ea8Slogwang 
1748*a9643ea8Slogwang /**
1749*a9643ea8Slogwang  * @brief ��ʱ֪ͨ����, ��������·, ���������·����
1750*a9643ea8Slogwang  */
1751*a9643ea8Slogwang void CDestLinks::timer_notify()
1752*a9643ea8Slogwang {
1753*a9643ea8Slogwang     // 1. ����Ƿ��п��е���·, ɾ����
1754*a9643ea8Slogwang     uint64_t now = mt_time_ms();
1755*a9643ea8Slogwang     CSockLink* item = NULL;
1756*a9643ea8Slogwang     CSockLink* temp = NULL;
1757*a9643ea8Slogwang     TAILQ_FOREACH_SAFE(item, &_sock_list, _link_entry, temp)
1758*a9643ea8Slogwang     {
1759*a9643ea8Slogwang         if ((item->GetLastAccess() + this->_timeout) < now)
1760*a9643ea8Slogwang         {
1761*a9643ea8Slogwang             MTLOG_DEBUG("link timeout, last[%llu], now [%llu]", item->GetLastAccess(), now);
1762*a9643ea8Slogwang             item->Destroy();
1763*a9643ea8Slogwang         }
1764*a9643ea8Slogwang     }
1765*a9643ea8Slogwang 
1766*a9643ea8Slogwang     // 2. ����Ƿ�����Ч����·, û����ע��Ŀ����
1767*a9643ea8Slogwang     item = TAILQ_FIRST(&_sock_list);
1768*a9643ea8Slogwang     if (NULL == item)
1769*a9643ea8Slogwang     {
1770*a9643ea8Slogwang         MTLOG_DEBUG("dest links timeout, now [%llu]", now);
1771*a9643ea8Slogwang         CNetMgr::Instance()->DeleteDestLink(this);
1772*a9643ea8Slogwang         return;
1773*a9643ea8Slogwang     }
1774*a9643ea8Slogwang 
1775*a9643ea8Slogwang     // 3. ���¼��붨ʱ������
1776*a9643ea8Slogwang     this->StartTimer();
1777*a9643ea8Slogwang 
1778*a9643ea8Slogwang     return;
1779*a9643ea8Slogwang }
1780*a9643ea8Slogwang 
1781*a9643ea8Slogwang 
1782*a9643ea8Slogwang /**
1783*a9643ea8Slogwang  * @brief sessionȫ�ֹ�����
1784*a9643ea8Slogwang  * @return ȫ�־��ָ��
1785*a9643ea8Slogwang  */
1786*a9643ea8Slogwang CNetMgr* CNetMgr::_instance = NULL;
1787*a9643ea8Slogwang CNetMgr* CNetMgr::Instance (void)
1788*a9643ea8Slogwang {
1789*a9643ea8Slogwang     if (NULL == _instance)
1790*a9643ea8Slogwang     {
1791*a9643ea8Slogwang         _instance = new CNetMgr();
1792*a9643ea8Slogwang     }
1793*a9643ea8Slogwang 
1794*a9643ea8Slogwang     return _instance;
1795*a9643ea8Slogwang }
1796*a9643ea8Slogwang 
1797*a9643ea8Slogwang /**
1798*a9643ea8Slogwang  * @brief session����ȫ�ֵ����ٽӿ�
1799*a9643ea8Slogwang  */
1800*a9643ea8Slogwang void CNetMgr::Destroy()
1801*a9643ea8Slogwang {
1802*a9643ea8Slogwang     if( _instance != NULL )
1803*a9643ea8Slogwang     {
1804*a9643ea8Slogwang         delete _instance;
1805*a9643ea8Slogwang         _instance = NULL;
1806*a9643ea8Slogwang     }
1807*a9643ea8Slogwang }
1808*a9643ea8Slogwang 
1809*a9643ea8Slogwang // ��ѯ�Ƿ��Ѿ�����ͬһ��sid�Ķ���
1810*a9643ea8Slogwang CNetHandler* CNetMgr::FindNetItem(CNetHandler* key)
1811*a9643ea8Slogwang {
1812*a9643ea8Slogwang     if (NULL == this->_session_hash)
1813*a9643ea8Slogwang     {
1814*a9643ea8Slogwang         return NULL;
1815*a9643ea8Slogwang     }
1816*a9643ea8Slogwang 
1817*a9643ea8Slogwang     return (CNetHandler*)_session_hash->HashFind(key);
1818*a9643ea8Slogwang }
1819*a9643ea8Slogwang 
1820*a9643ea8Slogwang // ע��һ��item, �Ȳ�ѯ�����, ��֤�޳�ͻ
1821*a9643ea8Slogwang void CNetMgr::InsertNetItem(CNetHandler* item)
1822*a9643ea8Slogwang {
1823*a9643ea8Slogwang     if (NULL == this->_session_hash)
1824*a9643ea8Slogwang     {
1825*a9643ea8Slogwang         return;
1826*a9643ea8Slogwang     }
1827*a9643ea8Slogwang 
1828*a9643ea8Slogwang     int32_t ret = _session_hash->HashInsert(item);
1829*a9643ea8Slogwang     if (ret < 0)
1830*a9643ea8Slogwang     {
1831*a9643ea8Slogwang         MTLOG_ERROR("session insert failed, ret %d", ret);
1832*a9643ea8Slogwang     }
1833*a9643ea8Slogwang 
1834*a9643ea8Slogwang     return;
1835*a9643ea8Slogwang }
1836*a9643ea8Slogwang 
1837*a9643ea8Slogwang // �Ƴ�һ��item����
1838*a9643ea8Slogwang void CNetMgr::RemoveNetItem(CNetHandler* item)
1839*a9643ea8Slogwang {
1840*a9643ea8Slogwang     CNetHandler* handler =  this->FindNetItem(item);
1841*a9643ea8Slogwang     if (NULL == handler)
1842*a9643ea8Slogwang     {
1843*a9643ea8Slogwang         return;
1844*a9643ea8Slogwang     }
1845*a9643ea8Slogwang 
1846*a9643ea8Slogwang     _session_hash->HashRemove(handler);
1847*a9643ea8Slogwang }
1848*a9643ea8Slogwang 
1849*a9643ea8Slogwang // ��ѯ�Ƿ��Ѿ�����ͬһ��sid�Ķ���
1850*a9643ea8Slogwang CDestLinks* CNetMgr::FindDestLink(CDestLinks* key)
1851*a9643ea8Slogwang {
1852*a9643ea8Slogwang     if (NULL == this->_ip_hash)
1853*a9643ea8Slogwang     {
1854*a9643ea8Slogwang         return NULL;
1855*a9643ea8Slogwang     }
1856*a9643ea8Slogwang 
1857*a9643ea8Slogwang     return (CDestLinks*)_ip_hash->HashFind(key);
1858*a9643ea8Slogwang }
1859*a9643ea8Slogwang 
1860*a9643ea8Slogwang // ע��һ��item, �Ȳ�ѯ�����, ��֤�޳�ͻ
1861*a9643ea8Slogwang void CNetMgr::InsertDestLink(CDestLinks* item)
1862*a9643ea8Slogwang {
1863*a9643ea8Slogwang     if (NULL == this->_ip_hash)
1864*a9643ea8Slogwang     {
1865*a9643ea8Slogwang         return;
1866*a9643ea8Slogwang     }
1867*a9643ea8Slogwang 
1868*a9643ea8Slogwang     int32_t ret = _ip_hash->HashInsert(item);
1869*a9643ea8Slogwang     if (ret < 0)
1870*a9643ea8Slogwang     {
1871*a9643ea8Slogwang         MTLOG_ERROR("ip dest insert failed, ret %d", ret);
1872*a9643ea8Slogwang     }
1873*a9643ea8Slogwang 
1874*a9643ea8Slogwang     return;
1875*a9643ea8Slogwang }
1876*a9643ea8Slogwang 
1877*a9643ea8Slogwang // �Ƴ�һ��item����
1878*a9643ea8Slogwang void CNetMgr::RemoveDestLink(CDestLinks* item)
1879*a9643ea8Slogwang {
1880*a9643ea8Slogwang     CDestLinks* handler =  this->FindDestLink(item);
1881*a9643ea8Slogwang     if (NULL == handler)
1882*a9643ea8Slogwang     {
1883*a9643ea8Slogwang         return;
1884*a9643ea8Slogwang     }
1885*a9643ea8Slogwang 
1886*a9643ea8Slogwang     _ip_hash->HashRemove(handler);
1887*a9643ea8Slogwang }
1888*a9643ea8Slogwang 
1889*a9643ea8Slogwang 
1890*a9643ea8Slogwang // ��ѯ����һ��Ŀ��ip��links�ڵ�
1891*a9643ea8Slogwang CDestLinks* CNetMgr::FindCreateDest(CDestLinks* key)
1892*a9643ea8Slogwang {
1893*a9643ea8Slogwang     CDestLinks* dest = this->FindDestLink(key);
1894*a9643ea8Slogwang     if (dest != NULL)
1895*a9643ea8Slogwang     {
1896*a9643ea8Slogwang         MTLOG_DEBUG("dest links reuse ok");
1897*a9643ea8Slogwang         return dest;
1898*a9643ea8Slogwang     }
1899*a9643ea8Slogwang 
1900*a9643ea8Slogwang     dest = this->AllocDestLink();
1901*a9643ea8Slogwang     if (NULL == dest)
1902*a9643ea8Slogwang     {
1903*a9643ea8Slogwang         MTLOG_ERROR("dest links alloc failed, log it");
1904*a9643ea8Slogwang         return NULL;
1905*a9643ea8Slogwang     }
1906*a9643ea8Slogwang 
1907*a9643ea8Slogwang     dest->CopyKeyInfo(key);
1908*a9643ea8Slogwang     dest->StartTimer();
1909*a9643ea8Slogwang     this->InsertDestLink(dest);
1910*a9643ea8Slogwang 
1911*a9643ea8Slogwang     return dest;
1912*a9643ea8Slogwang }
1913*a9643ea8Slogwang 
1914*a9643ea8Slogwang 
1915*a9643ea8Slogwang // ��ѯ����һ��Ŀ��ip��links�ڵ�
1916*a9643ea8Slogwang void CNetMgr::DeleteDestLink(CDestLinks* dst)
1917*a9643ea8Slogwang {
1918*a9643ea8Slogwang     this->RemoveDestLink(dst);
1919*a9643ea8Slogwang     dst->Reset();  // ֱ��freeǰ���� reset
1920*a9643ea8Slogwang     this->FreeDestLink(dst);
1921*a9643ea8Slogwang }
1922*a9643ea8Slogwang 
1923*a9643ea8Slogwang 
1924*a9643ea8Slogwang // ���캯��ʵ��
1925*a9643ea8Slogwang CNetMgr::CNetMgr()
1926*a9643ea8Slogwang {
1927*a9643ea8Slogwang     sk_buffer_mng_init(&_tcp_pool, 60, 4096);
1928*a9643ea8Slogwang     sk_buffer_mng_init(&_udp_pool, 60, SK_DFLT_BUFF_SIZE);
1929*a9643ea8Slogwang 
1930*a9643ea8Slogwang     _ip_hash = new HashList(100000);
1931*a9643ea8Slogwang     _session_hash = new HashList(100000);
1932*a9643ea8Slogwang }
1933*a9643ea8Slogwang 
1934*a9643ea8Slogwang 
1935*a9643ea8Slogwang // ��������ʵ��
1936*a9643ea8Slogwang CNetMgr::~CNetMgr()
1937*a9643ea8Slogwang {
1938*a9643ea8Slogwang     // �������е�dest��Դ
1939*a9643ea8Slogwang     if (_ip_hash != NULL)
1940*a9643ea8Slogwang     {
1941*a9643ea8Slogwang         HashKey* hash_item = _ip_hash->HashGetFirst();
1942*a9643ea8Slogwang         while (hash_item)
1943*a9643ea8Slogwang         {
1944*a9643ea8Slogwang             delete hash_item;
1945*a9643ea8Slogwang             hash_item = _ip_hash->HashGetFirst();
1946*a9643ea8Slogwang         }
1947*a9643ea8Slogwang 
1948*a9643ea8Slogwang         delete _ip_hash;
1949*a9643ea8Slogwang         _ip_hash = NULL;
1950*a9643ea8Slogwang     }
1951*a9643ea8Slogwang 
1952*a9643ea8Slogwang     // �������е� netitem ��Դ
1953*a9643ea8Slogwang     if (_session_hash != NULL)
1954*a9643ea8Slogwang     {
1955*a9643ea8Slogwang         HashKey* hash_item = _session_hash->HashGetFirst();
1956*a9643ea8Slogwang         while (hash_item)
1957*a9643ea8Slogwang         {
1958*a9643ea8Slogwang             delete hash_item;
1959*a9643ea8Slogwang             hash_item = _session_hash->HashGetFirst();
1960*a9643ea8Slogwang         }
1961*a9643ea8Slogwang 
1962*a9643ea8Slogwang         delete _session_hash;
1963*a9643ea8Slogwang         _session_hash = NULL;
1964*a9643ea8Slogwang     }
1965*a9643ea8Slogwang 
1966*a9643ea8Slogwang     // ����buff����Դ
1967*a9643ea8Slogwang     sk_buffer_mng_destroy(&_tcp_pool);
1968*a9643ea8Slogwang     sk_buffer_mng_destroy(&_udp_pool);
1969*a9643ea8Slogwang }
1970*a9643ea8Slogwang 
1971*a9643ea8Slogwang 
1972*a9643ea8Slogwang /**
1973*a9643ea8Slogwang  * @brief ������Դ��Ϣ
1974*a9643ea8Slogwang  */
1975*a9643ea8Slogwang void CNetMgr::RecycleObjs(uint64_t now)
1976*a9643ea8Slogwang {
1977*a9643ea8Slogwang     uint32_t now_s = (uint32_t)(now / 1000);
1978*a9643ea8Slogwang 
1979*a9643ea8Slogwang     recycle_sk_buffer(&_udp_pool, now_s);
1980*a9643ea8Slogwang     recycle_sk_buffer(&_tcp_pool, now_s);
1981*a9643ea8Slogwang 
1982*a9643ea8Slogwang     _net_item_pool.RecycleItem(now);
1983*a9643ea8Slogwang     _sock_link_pool.RecycleItem(now);
1984*a9643ea8Slogwang     _dest_ip_pool.RecycleItem(now);
1985*a9643ea8Slogwang }
1986*a9643ea8Slogwang 
1987*a9643ea8Slogwang 
1988