xref: /f-stack/app/micro_thread/mt_notify.cpp (revision a9643ea8)
1*a9643ea8Slogwang 
2*a9643ea8Slogwang /**
3*a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6*a9643ea8Slogwang  *
7*a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8*a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9*a9643ea8Slogwang  * obtain a copy of the License at
10*a9643ea8Slogwang  *
11*a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12*a9643ea8Slogwang  *
13*a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14*a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15*a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16*a9643ea8Slogwang  * and limitations under the License.
17*a9643ea8Slogwang  */
18*a9643ea8Slogwang 
19*a9643ea8Slogwang 
20*a9643ea8Slogwang /**
21*a9643ea8Slogwang  *  @file mt_notify.cpp
22*a9643ea8Slogwang  *  @info ΢�̵߳���ע��������ʵ��
23*a9643ea8Slogwang  *  @time 20130924
24*a9643ea8Slogwang  **/
25*a9643ea8Slogwang #include <fcntl.h>
26*a9643ea8Slogwang #include <sys/types.h>
27*a9643ea8Slogwang #include <sys/socket.h>
28*a9643ea8Slogwang #include <netinet/in.h>
29*a9643ea8Slogwang #include <arpa/inet.h>
30*a9643ea8Slogwang 
31*a9643ea8Slogwang #include "micro_thread.h"
32*a9643ea8Slogwang #include "mt_session.h"
33*a9643ea8Slogwang #include "mt_msg.h"
34*a9643ea8Slogwang #include "mt_notify.h"
35*a9643ea8Slogwang #include "mt_connection.h"
36*a9643ea8Slogwang #include "mt_sys_hook.h"
37*a9643ea8Slogwang #include "ff_hook.h"
38*a9643ea8Slogwang 
39*a9643ea8Slogwang using namespace std;
40*a9643ea8Slogwang using namespace NS_MICRO_THREAD;
41*a9643ea8Slogwang 
42*a9643ea8Slogwang 
43*a9643ea8Slogwang /**
44*a9643ea8Slogwang  * @brief ֪ͨ�������ȴ�״̬, ����ȴ�������
45*a9643ea8Slogwang  * @param proxy �����sessionģ��
46*a9643ea8Slogwang  */
47*a9643ea8Slogwang void ISessionNtfy::InsertWriteWait(SessionProxy* proxy)
48*a9643ea8Slogwang {
49*a9643ea8Slogwang     if (!proxy->_flag) {
50*a9643ea8Slogwang         TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry);
51*a9643ea8Slogwang         proxy->_flag = 1;
52*a9643ea8Slogwang     }
53*a9643ea8Slogwang }
54*a9643ea8Slogwang 
55*a9643ea8Slogwang /**
56*a9643ea8Slogwang  * @brief ֪ͨ�����Ƴ��ȴ�״̬
57*a9643ea8Slogwang  * @param proxy �����sessionģ��
58*a9643ea8Slogwang  */
59*a9643ea8Slogwang void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy)
60*a9643ea8Slogwang {
61*a9643ea8Slogwang     if (proxy->_flag) {
62*a9643ea8Slogwang         TAILQ_REMOVE(&_write_list, proxy, _write_entry);
63*a9643ea8Slogwang         proxy->_flag = 0;
64*a9643ea8Slogwang     }
65*a9643ea8Slogwang }
66*a9643ea8Slogwang 
67*a9643ea8Slogwang /**
68*a9643ea8Slogwang  * @brief �۲���ģʽ, ֪ͨд�ȴ��߳�
69*a9643ea8Slogwang  * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д
70*a9643ea8Slogwang  */
71*a9643ea8Slogwang void UdpSessionNtfy::NotifyWriteWait()
72*a9643ea8Slogwang {
73*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
74*a9643ea8Slogwang     SessionProxy* proxy = NULL;
75*a9643ea8Slogwang     MicroThread* thread = NULL;
76*a9643ea8Slogwang     TAILQ_FOREACH(proxy, &_write_list, _write_entry)
77*a9643ea8Slogwang     {
78*a9643ea8Slogwang         proxy->SetRcvEvents(KQ_EVENT_WRITE);
79*a9643ea8Slogwang 
80*a9643ea8Slogwang         thread = proxy->GetOwnerThread();
81*a9643ea8Slogwang         if (thread && thread->HasFlag(MicroThread::IO_LIST))
82*a9643ea8Slogwang         {
83*a9643ea8Slogwang             frame->RemoveIoWait(thread);
84*a9643ea8Slogwang             frame->InsertRunable(thread);
85*a9643ea8Slogwang         }
86*a9643ea8Slogwang     }
87*a9643ea8Slogwang }
88*a9643ea8Slogwang 
89*a9643ea8Slogwang /**
90*a9643ea8Slogwang  *  @brief ����socket, �����ɶ��¼�
91*a9643ea8Slogwang  *  @return fd�ľ��, <0 ʧ��
92*a9643ea8Slogwang  */
93*a9643ea8Slogwang int UdpSessionNtfy::CreateSocket()
94*a9643ea8Slogwang {
95*a9643ea8Slogwang     // 1. UDP������, ÿ���´�SOCKET
96*a9643ea8Slogwang     int osfd = socket(AF_INET, SOCK_DGRAM, 0);
97*a9643ea8Slogwang     if (osfd < 0)
98*a9643ea8Slogwang     {
99*a9643ea8Slogwang         MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
100*a9643ea8Slogwang         return -1;
101*a9643ea8Slogwang     }
102*a9643ea8Slogwang 
103*a9643ea8Slogwang     // 2. ����������
104*a9643ea8Slogwang     int flags = 1;
105*a9643ea8Slogwang     if (ioctl(osfd, FIONBIO, &flags) < 0)
106*a9643ea8Slogwang     {
107*a9643ea8Slogwang         MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
108*a9643ea8Slogwang         close(osfd);
109*a9643ea8Slogwang         osfd = -1;
110*a9643ea8Slogwang         return -2;
111*a9643ea8Slogwang     }
112*a9643ea8Slogwang 
113*a9643ea8Slogwang     // ��ѡbindִ��, ���ñ���port��ִ��
114*a9643ea8Slogwang     if (_local_addr.sin_port != 0)
115*a9643ea8Slogwang     {
116*a9643ea8Slogwang         int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr));
117*a9643ea8Slogwang         if (ret < 0)
118*a9643ea8Slogwang         {
119*a9643ea8Slogwang             MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)",  inet_ntoa(_local_addr.sin_addr),
120*a9643ea8Slogwang                     ntohs(_local_addr.sin_port), errno, strerror(errno));
121*a9643ea8Slogwang             close(osfd);
122*a9643ea8Slogwang             osfd = -1;
123*a9643ea8Slogwang             return -3;
124*a9643ea8Slogwang         }
125*a9643ea8Slogwang     }
126*a9643ea8Slogwang 
127*a9643ea8Slogwang     // 3. ���¹�����Ϣ, Ĭ��udp session ���� epollin
128*a9643ea8Slogwang     this->SetOsfd(osfd);
129*a9643ea8Slogwang     this->EnableInput();
130*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
131*a9643ea8Slogwang     frame->KqueueNtfyReg(osfd, this);
132*a9643ea8Slogwang     frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ);
133*a9643ea8Slogwang 
134*a9643ea8Slogwang     return osfd;
135*a9643ea8Slogwang }
136*a9643ea8Slogwang 
137*a9643ea8Slogwang 
138*a9643ea8Slogwang /**
139*a9643ea8Slogwang  *  @brief �ر�socket, ֹͣ�����ɶ��¼�
140*a9643ea8Slogwang  */
141*a9643ea8Slogwang void UdpSessionNtfy::CloseSocket()
142*a9643ea8Slogwang {
143*a9643ea8Slogwang     int osfd = this->GetOsfd();
144*a9643ea8Slogwang     if (osfd > 0)
145*a9643ea8Slogwang     {
146*a9643ea8Slogwang         MtFrame* frame = MtFrame::Instance();
147*a9643ea8Slogwang         frame->KqueueCtrlDel(osfd, KQ_EVENT_READ);
148*a9643ea8Slogwang         frame->KqueueNtfyReg(osfd, NULL);
149*a9643ea8Slogwang         this->DisableInput();
150*a9643ea8Slogwang         this->SetOsfd(-1);
151*a9643ea8Slogwang         close(osfd);
152*a9643ea8Slogwang     }
153*a9643ea8Slogwang }
154*a9643ea8Slogwang 
155*a9643ea8Slogwang 
156*a9643ea8Slogwang /**
157*a9643ea8Slogwang  *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
158*a9643ea8Slogwang  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
159*a9643ea8Slogwang  */
160*a9643ea8Slogwang int UdpSessionNtfy::InputNotify()
161*a9643ea8Slogwang {
162*a9643ea8Slogwang     while (1)
163*a9643ea8Slogwang     {
164*a9643ea8Slogwang         int ret = 0;
165*a9643ea8Slogwang         int have_rcv_len = 0;
166*a9643ea8Slogwang 
167*a9643ea8Slogwang         // 1. ��ȡ�հ�������, ����ѡ��δ�����������buff
168*a9643ea8Slogwang         if (!_msg_buff) {
169*a9643ea8Slogwang             _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize());
170*a9643ea8Slogwang             if (NULL == _msg_buff) {
171*a9643ea8Slogwang                 MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize());
172*a9643ea8Slogwang                 return 0;
173*a9643ea8Slogwang             }
174*a9643ea8Slogwang             _msg_buff->SetBuffType(BUFF_RECV);
175*a9643ea8Slogwang         }
176*a9643ea8Slogwang         char* buff = (char*)_msg_buff->GetMsgBuff();
177*a9643ea8Slogwang 
178*a9643ea8Slogwang         // 2. ��ȡsocket, �հ�����
179*a9643ea8Slogwang         int osfd = this->GetOsfd();
180*a9643ea8Slogwang         struct sockaddr_in  from;
181*a9643ea8Slogwang         socklen_t fromlen = sizeof(from);
182*a9643ea8Slogwang         mt_hook_syscall(recvfrom);
183*a9643ea8Slogwang         ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(),
184*a9643ea8Slogwang                        0, (struct sockaddr*)&from, &fromlen);
185*a9643ea8Slogwang         if (ret < 0)
186*a9643ea8Slogwang         {
187*a9643ea8Slogwang             if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
188*a9643ea8Slogwang             {
189*a9643ea8Slogwang                 return 0;
190*a9643ea8Slogwang             }
191*a9643ea8Slogwang             else
192*a9643ea8Slogwang             {
193*a9643ea8Slogwang                 MTLOG_ERROR("recv error, fd %d", osfd);
194*a9643ea8Slogwang                 return 0;  // ϵͳ����, UDP �ݲ��ر�
195*a9643ea8Slogwang             }
196*a9643ea8Slogwang         }
197*a9643ea8Slogwang         else if (ret == 0)
198*a9643ea8Slogwang         {
199*a9643ea8Slogwang             MTLOG_DEBUG("remote close connection, fd %d", osfd);
200*a9643ea8Slogwang             return 0;  // �Զ˹ر�, UDP �ݲ��ر�
201*a9643ea8Slogwang         }
202*a9643ea8Slogwang         else
203*a9643ea8Slogwang         {
204*a9643ea8Slogwang             have_rcv_len = ret;
205*a9643ea8Slogwang             _msg_buff->SetHaveRcvLen(have_rcv_len);
206*a9643ea8Slogwang             _msg_buff->SetMsgLen(have_rcv_len);
207*a9643ea8Slogwang         }
208*a9643ea8Slogwang 
209*a9643ea8Slogwang         // 3. �����Ϣ��������, ��ȡsessionid
210*a9643ea8Slogwang         int sessionid = 0;
211*a9643ea8Slogwang         ret = this->GetSessionId(buff, have_rcv_len, sessionid);
212*a9643ea8Slogwang         if (ret <= 0)
213*a9643ea8Slogwang         {
214*a9643ea8Slogwang             MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it",
215*a9643ea8Slogwang                        have_rcv_len, osfd);
216*a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
217*a9643ea8Slogwang             _msg_buff = NULL;
218*a9643ea8Slogwang             return 0;
219*a9643ea8Slogwang         }
220*a9643ea8Slogwang 
221*a9643ea8Slogwang         // 4. ӳ���ѯthread���, ����handle���, ���ö��¼�����, �ҽ�msgbuff
222*a9643ea8Slogwang         ISession* session = SessionMgr::Instance()->FindSession(sessionid);
223*a9643ea8Slogwang         if (NULL == session)
224*a9643ea8Slogwang         {
225*a9643ea8Slogwang             MT_ATTR_API(350403, 1); // session �����ѳ�ʱ
226*a9643ea8Slogwang             MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid);
227*a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
228*a9643ea8Slogwang             _msg_buff = NULL;
229*a9643ea8Slogwang             return 0;
230*a9643ea8Slogwang         }
231*a9643ea8Slogwang 
232*a9643ea8Slogwang         // 5. �ҽ�recvbuff, �����߳�
233*a9643ea8Slogwang         IMtConnection* conn = session->GetSessionConn();
234*a9643ea8Slogwang         MicroThread* thread = session->GetOwnerThread();
235*a9643ea8Slogwang         if (!thread || !conn || !conn->GetNtfyObj())
236*a9643ea8Slogwang         {
237*a9643ea8Slogwang             MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong",
238*a9643ea8Slogwang                     session, thread, conn);
239*a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
240*a9643ea8Slogwang             _msg_buff = NULL;
241*a9643ea8Slogwang             return 0;
242*a9643ea8Slogwang         }
243*a9643ea8Slogwang         MtMsgBuf* msg = conn->GetMtMsgBuff();
244*a9643ea8Slogwang         if (msg) {
245*a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(msg);
246*a9643ea8Slogwang         }
247*a9643ea8Slogwang         conn->SetMtMsgBuff(_msg_buff);
248*a9643ea8Slogwang         _msg_buff = NULL;
249*a9643ea8Slogwang 
250*a9643ea8Slogwang         conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ);
251*a9643ea8Slogwang         if (thread->HasFlag(MicroThread::IO_LIST))
252*a9643ea8Slogwang         {
253*a9643ea8Slogwang             MtFrame* frame = MtFrame::Instance();
254*a9643ea8Slogwang             frame->RemoveIoWait(thread);
255*a9643ea8Slogwang             frame->InsertRunable(thread);
256*a9643ea8Slogwang         }
257*a9643ea8Slogwang     }
258*a9643ea8Slogwang 
259*a9643ea8Slogwang     return 0;
260*a9643ea8Slogwang }
261*a9643ea8Slogwang 
262*a9643ea8Slogwang /**
263*a9643ea8Slogwang  *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
264*a9643ea8Slogwang  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
265*a9643ea8Slogwang  */
266*a9643ea8Slogwang int UdpSessionNtfy::OutputNotify()
267*a9643ea8Slogwang {
268*a9643ea8Slogwang     NotifyWriteWait();
269*a9643ea8Slogwang     return 0;
270*a9643ea8Slogwang }
271*a9643ea8Slogwang 
272*a9643ea8Slogwang /**
273*a9643ea8Slogwang  *  @brief �쳣֪ͨ�ӿ�, �ر�fd����, thread�ȴ�����ʱ
274*a9643ea8Slogwang  *  @return ���Է���ֵ, ���������¼�����
275*a9643ea8Slogwang  */
276*a9643ea8Slogwang int UdpSessionNtfy::HangupNotify()
277*a9643ea8Slogwang {
278*a9643ea8Slogwang     // 1. ����epoll ctl�����¼�
279*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
280*a9643ea8Slogwang     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
281*a9643ea8Slogwang 
282*a9643ea8Slogwang     MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd());
283*a9643ea8Slogwang 
284*a9643ea8Slogwang     // 2. ���´�socket
285*a9643ea8Slogwang     CloseSocket();
286*a9643ea8Slogwang 
287*a9643ea8Slogwang     // 3. �ؼ���epoll listen
288*a9643ea8Slogwang     CreateSocket();
289*a9643ea8Slogwang 
290*a9643ea8Slogwang     return 0;
291*a9643ea8Slogwang }
292*a9643ea8Slogwang 
293*a9643ea8Slogwang /**
294*a9643ea8Slogwang  *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
295*a9643ea8Slogwang  *  @param args fd���ö����ָ��
296*a9643ea8Slogwang  *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
297*a9643ea8Slogwang  *  @info  Ĭ���Ǽ����ɶ��¼���, ����ֻ�����д�¼��ļ���ɾ��
298*a9643ea8Slogwang  */
299*a9643ea8Slogwang int UdpSessionNtfy::KqueueCtlAdd(void* args)
300*a9643ea8Slogwang {
301*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
302*a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
303*a9643ea8Slogwang     //ASSERT(fd_ref != NULL);
304*a9643ea8Slogwang 
305*a9643ea8Slogwang     int osfd = this->GetOsfd();
306*a9643ea8Slogwang 
307*a9643ea8Slogwang     // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼
308*a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
309*a9643ea8Slogwang     if ((old_obj != NULL) && (old_obj != this))
310*a9643ea8Slogwang     {
311*a9643ea8Slogwang         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
312*a9643ea8Slogwang         return -1;
313*a9643ea8Slogwang     }
314*a9643ea8Slogwang 
315*a9643ea8Slogwang     // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ��
316*a9643ea8Slogwang     if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE))
317*a9643ea8Slogwang     {
318*a9643ea8Slogwang         MTLOG_ERROR("epfd ref add failed, log");
319*a9643ea8Slogwang         return -2;
320*a9643ea8Slogwang     }
321*a9643ea8Slogwang     this->EnableOutput();
322*a9643ea8Slogwang 
323*a9643ea8Slogwang     return 0;
324*a9643ea8Slogwang }
325*a9643ea8Slogwang 
326*a9643ea8Slogwang /**
327*a9643ea8Slogwang  *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
328*a9643ea8Slogwang  *  @param args fd���ö����ָ��
329*a9643ea8Slogwang  *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
330*a9643ea8Slogwang  */
331*a9643ea8Slogwang int UdpSessionNtfy::KqueueCtlDel(void* args)
332*a9643ea8Slogwang {
333*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
334*a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
335*a9643ea8Slogwang     //ASSERT(fd_ref != NULL);
336*a9643ea8Slogwang 
337*a9643ea8Slogwang     int osfd = this->GetOsfd();
338*a9643ea8Slogwang 
339*a9643ea8Slogwang     // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼
340*a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
341*a9643ea8Slogwang     if (old_obj != this)
342*a9643ea8Slogwang     {
343*a9643ea8Slogwang         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
344*a9643ea8Slogwang         return -1;
345*a9643ea8Slogwang     }
346*a9643ea8Slogwang 
347*a9643ea8Slogwang     // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ��
348*a9643ea8Slogwang     if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE))
349*a9643ea8Slogwang     {
350*a9643ea8Slogwang         MTLOG_ERROR("epfd ref del failed, log");
351*a9643ea8Slogwang         return -2;
352*a9643ea8Slogwang     }
353*a9643ea8Slogwang     this->DisableOutput();
354*a9643ea8Slogwang 
355*a9643ea8Slogwang     return 0;
356*a9643ea8Slogwang 
357*a9643ea8Slogwang }
358*a9643ea8Slogwang 
359*a9643ea8Slogwang 
360*a9643ea8Slogwang /**
361*a9643ea8Slogwang  *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
362*a9643ea8Slogwang  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
363*a9643ea8Slogwang  */
364*a9643ea8Slogwang int TcpKeepNtfy::InputNotify()
365*a9643ea8Slogwang {
366*a9643ea8Slogwang     KeepaliveClose();
367*a9643ea8Slogwang     return -1;
368*a9643ea8Slogwang }
369*a9643ea8Slogwang 
370*a9643ea8Slogwang /**
371*a9643ea8Slogwang  *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
372*a9643ea8Slogwang  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
373*a9643ea8Slogwang  */
374*a9643ea8Slogwang int TcpKeepNtfy::OutputNotify()
375*a9643ea8Slogwang {
376*a9643ea8Slogwang     KeepaliveClose();
377*a9643ea8Slogwang     return -1;
378*a9643ea8Slogwang }
379*a9643ea8Slogwang 
380*a9643ea8Slogwang /**
381*a9643ea8Slogwang  *  @brief �쳣֪ͨ�ӿ�
382*a9643ea8Slogwang  *  @return ���Է���ֵ, ���������¼�����
383*a9643ea8Slogwang  */
384*a9643ea8Slogwang int TcpKeepNtfy::HangupNotify()
385*a9643ea8Slogwang {
386*a9643ea8Slogwang     KeepaliveClose();
387*a9643ea8Slogwang     return -1;
388*a9643ea8Slogwang }
389*a9643ea8Slogwang 
390*a9643ea8Slogwang 
391*a9643ea8Slogwang /**
392*a9643ea8Slogwang  *  @brief ����ʵ�����ӹرղ���
393*a9643ea8Slogwang  */
394*a9643ea8Slogwang void TcpKeepNtfy::KeepaliveClose()
395*a9643ea8Slogwang {
396*a9643ea8Slogwang     if (_keep_conn) {
397*a9643ea8Slogwang         MTLOG_DEBUG("remote close, fd %d, close connection", _fd);
398*a9643ea8Slogwang         ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn);
399*a9643ea8Slogwang     } else {
400*a9643ea8Slogwang         MTLOG_ERROR("_keep_conn ptr null, error");
401*a9643ea8Slogwang     }
402*a9643ea8Slogwang }
403*a9643ea8Slogwang 
404*a9643ea8Slogwang 
405*a9643ea8Slogwang /**
406*a9643ea8Slogwang  * @brief sessionȫ�ֹ�����
407*a9643ea8Slogwang  * @return ȫ�־��ָ��
408*a9643ea8Slogwang  */
409*a9643ea8Slogwang NtfyObjMgr* NtfyObjMgr::_instance = NULL;
410*a9643ea8Slogwang NtfyObjMgr* NtfyObjMgr::Instance (void)
411*a9643ea8Slogwang {
412*a9643ea8Slogwang     if (NULL == _instance)
413*a9643ea8Slogwang     {
414*a9643ea8Slogwang         _instance = new NtfyObjMgr;
415*a9643ea8Slogwang     }
416*a9643ea8Slogwang 
417*a9643ea8Slogwang     return _instance;
418*a9643ea8Slogwang }
419*a9643ea8Slogwang 
420*a9643ea8Slogwang /**
421*a9643ea8Slogwang  * @brief session����ȫ�ֵ����ٽӿ�
422*a9643ea8Slogwang  */
423*a9643ea8Slogwang void NtfyObjMgr::Destroy()
424*a9643ea8Slogwang {
425*a9643ea8Slogwang     if( _instance != NULL )
426*a9643ea8Slogwang     {
427*a9643ea8Slogwang         delete _instance;
428*a9643ea8Slogwang         _instance = NULL;
429*a9643ea8Slogwang     }
430*a9643ea8Slogwang }
431*a9643ea8Slogwang 
432*a9643ea8Slogwang /**
433*a9643ea8Slogwang  * @brief ��Ϣbuff�Ĺ��캯��
434*a9643ea8Slogwang  */
435*a9643ea8Slogwang NtfyObjMgr::NtfyObjMgr()
436*a9643ea8Slogwang {
437*a9643ea8Slogwang }
438*a9643ea8Slogwang 
439*a9643ea8Slogwang /**
440*a9643ea8Slogwang  * @brief ��������, ��������Դ, ������������
441*a9643ea8Slogwang  */
442*a9643ea8Slogwang NtfyObjMgr::~NtfyObjMgr()
443*a9643ea8Slogwang {
444*a9643ea8Slogwang }
445*a9643ea8Slogwang 
446*a9643ea8Slogwang /**
447*a9643ea8Slogwang  * @brief ע�᳤����session��Ϣ
448*a9643ea8Slogwang  * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ
449*a9643ea8Slogwang  * @param session �����Ӷ���ָ��, ������������
450*a9643ea8Slogwang  * @return 0 �ɹ�, < 0 ʧ��
451*a9643ea8Slogwang  */
452*a9643ea8Slogwang int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session)
453*a9643ea8Slogwang {
454*a9643ea8Slogwang     if (session_name <= 0 || NULL == session) {
455*a9643ea8Slogwang         MTLOG_ERROR("session %d, register %p failed", session_name, session);
456*a9643ea8Slogwang         return -1;
457*a9643ea8Slogwang     }
458*a9643ea8Slogwang 
459*a9643ea8Slogwang     SessionMap::iterator it = _session_map.find(session_name);
460*a9643ea8Slogwang     if (it != _session_map.end())
461*a9643ea8Slogwang     {
462*a9643ea8Slogwang         MTLOG_ERROR("session %d, register %p already", session_name, session);
463*a9643ea8Slogwang         return -2;
464*a9643ea8Slogwang     }
465*a9643ea8Slogwang 
466*a9643ea8Slogwang     _session_map.insert(SessionMap::value_type(session_name, session));
467*a9643ea8Slogwang 
468*a9643ea8Slogwang     return 0;
469*a9643ea8Slogwang }
470*a9643ea8Slogwang 
471*a9643ea8Slogwang /**
472*a9643ea8Slogwang  * @brief ��ȡע�᳤����session��Ϣ
473*a9643ea8Slogwang  * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ
474*a9643ea8Slogwang  * @return ������ָ��, ʧ��ΪNULL
475*a9643ea8Slogwang  */
476*a9643ea8Slogwang ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name)
477*a9643ea8Slogwang {
478*a9643ea8Slogwang     SessionMap::iterator it = _session_map.find(session_name);
479*a9643ea8Slogwang     if (it != _session_map.end())
480*a9643ea8Slogwang     {
481*a9643ea8Slogwang         return it->second;
482*a9643ea8Slogwang     }
483*a9643ea8Slogwang     else
484*a9643ea8Slogwang     {
485*a9643ea8Slogwang         return NULL;
486*a9643ea8Slogwang     }
487*a9643ea8Slogwang }
488*a9643ea8Slogwang 
489*a9643ea8Slogwang /**
490*a9643ea8Slogwang  * @brief ��ȡͨ��֪ͨ����, ���߳�֪ͨ������session֪ͨ�������
491*a9643ea8Slogwang  * @param type ����, �߳�֪ͨ���ͣ�UDP/TCP SESSION֪ͨ��
492*a9643ea8Slogwang  * @param session_name proxyģ��,һ����ȡsession����
493*a9643ea8Slogwang  * @return ֪ͨ�����ָ��, ʧ��ΪNULL
494*a9643ea8Slogwang  */
495*a9643ea8Slogwang KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name)
496*a9643ea8Slogwang {
497*a9643ea8Slogwang     KqueuerObj* obj = NULL;
498*a9643ea8Slogwang     SessionProxy* proxy = NULL;
499*a9643ea8Slogwang 
500*a9643ea8Slogwang     switch (type)
501*a9643ea8Slogwang     {
502*a9643ea8Slogwang         case NTFY_OBJ_THREAD:
503*a9643ea8Slogwang             obj = _fd_ntfy_pool.AllocPtr();
504*a9643ea8Slogwang             break;
505*a9643ea8Slogwang 
506*a9643ea8Slogwang         case NTFY_OBJ_SESSION:
507*a9643ea8Slogwang             proxy = _udp_proxy_pool.AllocPtr();
508*a9643ea8Slogwang             obj = proxy;
509*a9643ea8Slogwang             break;
510*a9643ea8Slogwang 
511*a9643ea8Slogwang         case NTFY_OBJ_KEEPALIVE:    // no need get this now
512*a9643ea8Slogwang             break;
513*a9643ea8Slogwang 
514*a9643ea8Slogwang         default:
515*a9643ea8Slogwang             break;
516*a9643ea8Slogwang     }
517*a9643ea8Slogwang 
518*a9643ea8Slogwang     // ��ȡ�ײ�ij����Ӷ���, ����������ʵ�ʵ�֪ͨ����
519*a9643ea8Slogwang     if (proxy) {
520*a9643ea8Slogwang         ISessionNtfy* ntfy = this->GetNameSession(session_name);
521*a9643ea8Slogwang         if (!ntfy) {
522*a9643ea8Slogwang             MTLOG_ERROR("ntfy get session name(%d) failed", session_name);
523*a9643ea8Slogwang             this->FreeNtfyObj(proxy);
524*a9643ea8Slogwang             obj = NULL;
525*a9643ea8Slogwang         } else {
526*a9643ea8Slogwang             proxy->SetRealNtfyObj(ntfy);
527*a9643ea8Slogwang         }
528*a9643ea8Slogwang     }
529*a9643ea8Slogwang 
530*a9643ea8Slogwang     return obj;
531*a9643ea8Slogwang 
532*a9643ea8Slogwang }
533*a9643ea8Slogwang 
534*a9643ea8Slogwang /**
535*a9643ea8Slogwang  * @brief �ͷ�֪ͨ����ָ��
536*a9643ea8Slogwang  * @param obj ֪ͨ����
537*a9643ea8Slogwang  */
538*a9643ea8Slogwang void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj)
539*a9643ea8Slogwang {
540*a9643ea8Slogwang     SessionProxy* proxy = NULL;
541*a9643ea8Slogwang     if (!obj) {
542*a9643ea8Slogwang         return;
543*a9643ea8Slogwang     }
544*a9643ea8Slogwang 
545*a9643ea8Slogwang     int type = obj->GetNtfyType();
546*a9643ea8Slogwang     obj->Reset();
547*a9643ea8Slogwang 
548*a9643ea8Slogwang     switch (type)
549*a9643ea8Slogwang     {
550*a9643ea8Slogwang         case NTFY_OBJ_THREAD:
551*a9643ea8Slogwang             return _fd_ntfy_pool.FreePtr(obj);
552*a9643ea8Slogwang             break;
553*a9643ea8Slogwang 
554*a9643ea8Slogwang         case NTFY_OBJ_SESSION:
555*a9643ea8Slogwang             proxy = dynamic_cast<SessionProxy*>(obj);
556*a9643ea8Slogwang             return _udp_proxy_pool.FreePtr(proxy);
557*a9643ea8Slogwang             break;
558*a9643ea8Slogwang 
559*a9643ea8Slogwang         case NTFY_OBJ_KEEPALIVE:
560*a9643ea8Slogwang             break;
561*a9643ea8Slogwang 
562*a9643ea8Slogwang         default:
563*a9643ea8Slogwang             break;
564*a9643ea8Slogwang     }
565*a9643ea8Slogwang 
566*a9643ea8Slogwang     delete obj;
567*a9643ea8Slogwang     return;
568*a9643ea8Slogwang }
569*a9643ea8Slogwang 
570*a9643ea8Slogwang 
571*a9643ea8Slogwang 
572