xref: /f-stack/app/micro_thread/mt_notify.cpp (revision 35a81399)
1a9643ea8Slogwang 
2a9643ea8Slogwang /**
3a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang  *
5a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang  *
7a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang  * obtain a copy of the License at
10a9643ea8Slogwang  *
11a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang  *
13a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang  * and limitations under the License.
17a9643ea8Slogwang  */
18a9643ea8Slogwang 
19a9643ea8Slogwang 
20a9643ea8Slogwang /**
21a9643ea8Slogwang  *  @file mt_notify.cpp
22a9643ea8Slogwang  *  @time 20130924
23a9643ea8Slogwang  **/
24a9643ea8Slogwang #include <fcntl.h>
25a9643ea8Slogwang #include <sys/types.h>
26a9643ea8Slogwang #include <sys/socket.h>
27a9643ea8Slogwang #include <netinet/in.h>
28a9643ea8Slogwang #include <arpa/inet.h>
29a9643ea8Slogwang 
30a9643ea8Slogwang #include "micro_thread.h"
31a9643ea8Slogwang #include "mt_session.h"
32a9643ea8Slogwang #include "mt_msg.h"
33a9643ea8Slogwang #include "mt_notify.h"
34a9643ea8Slogwang #include "mt_connection.h"
35a9643ea8Slogwang #include "mt_sys_hook.h"
36a9643ea8Slogwang #include "ff_hook.h"
37a9643ea8Slogwang 
38a9643ea8Slogwang using namespace std;
39a9643ea8Slogwang using namespace NS_MICRO_THREAD;
40a9643ea8Slogwang 
InsertWriteWait(SessionProxy * proxy)41a9643ea8Slogwang void ISessionNtfy::InsertWriteWait(SessionProxy* proxy)
42a9643ea8Slogwang {
43a9643ea8Slogwang     if (!proxy->_flag) {
44a9643ea8Slogwang         TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry);
45a9643ea8Slogwang         proxy->_flag = 1;
46a9643ea8Slogwang     }
47a9643ea8Slogwang }
48a9643ea8Slogwang 
RemoveWriteWait(SessionProxy * proxy)49a9643ea8Slogwang void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy)
50a9643ea8Slogwang {
51a9643ea8Slogwang     if (proxy->_flag) {
52a9643ea8Slogwang         TAILQ_REMOVE(&_write_list, proxy, _write_entry);
53a9643ea8Slogwang         proxy->_flag = 0;
54a9643ea8Slogwang     }
55a9643ea8Slogwang }
56a9643ea8Slogwang 
NotifyWriteWait()57a9643ea8Slogwang void UdpSessionNtfy::NotifyWriteWait()
58a9643ea8Slogwang {
59a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
60a9643ea8Slogwang     SessionProxy* proxy = NULL;
61a9643ea8Slogwang     MicroThread* thread = NULL;
62a9643ea8Slogwang     TAILQ_FOREACH(proxy, &_write_list, _write_entry)
63a9643ea8Slogwang     {
64a9643ea8Slogwang         proxy->SetRcvEvents(KQ_EVENT_WRITE);
65a9643ea8Slogwang 
66a9643ea8Slogwang         thread = proxy->GetOwnerThread();
67a9643ea8Slogwang         if (thread && thread->HasFlag(MicroThread::IO_LIST))
68a9643ea8Slogwang         {
69a9643ea8Slogwang             frame->RemoveIoWait(thread);
70a9643ea8Slogwang             frame->InsertRunable(thread);
71a9643ea8Slogwang         }
72a9643ea8Slogwang     }
73a9643ea8Slogwang }
74a9643ea8Slogwang 
CreateSocket()75a9643ea8Slogwang int UdpSessionNtfy::CreateSocket()
76a9643ea8Slogwang {
77a9643ea8Slogwang     int osfd = socket(AF_INET, SOCK_DGRAM, 0);
78a9643ea8Slogwang     if (osfd < 0)
79a9643ea8Slogwang     {
80a9643ea8Slogwang         MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
81a9643ea8Slogwang         return -1;
82a9643ea8Slogwang     }
83a9643ea8Slogwang 
84a9643ea8Slogwang     int flags = 1;
85a9643ea8Slogwang     if (ioctl(osfd, FIONBIO, &flags) < 0)
86a9643ea8Slogwang     {
87a9643ea8Slogwang         MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
88a9643ea8Slogwang         close(osfd);
89a9643ea8Slogwang         osfd = -1;
90a9643ea8Slogwang         return -2;
91a9643ea8Slogwang     }
92a9643ea8Slogwang 
93a9643ea8Slogwang     if (_local_addr.sin_port != 0)
94a9643ea8Slogwang     {
95a9643ea8Slogwang         int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr));
96a9643ea8Slogwang         if (ret < 0)
97a9643ea8Slogwang         {
98a9643ea8Slogwang             MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)",  inet_ntoa(_local_addr.sin_addr),
99a9643ea8Slogwang                     ntohs(_local_addr.sin_port), errno, strerror(errno));
100a9643ea8Slogwang             close(osfd);
101a9643ea8Slogwang             osfd = -1;
102a9643ea8Slogwang             return -3;
103a9643ea8Slogwang         }
104a9643ea8Slogwang     }
105a9643ea8Slogwang 
106a9643ea8Slogwang     this->SetOsfd(osfd);
107a9643ea8Slogwang     this->EnableInput();
108a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
109a9643ea8Slogwang     frame->KqueueNtfyReg(osfd, this);
110a9643ea8Slogwang     frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ);
111a9643ea8Slogwang 
112a9643ea8Slogwang     return osfd;
113a9643ea8Slogwang }
114a9643ea8Slogwang 
CloseSocket()115a9643ea8Slogwang void UdpSessionNtfy::CloseSocket()
116a9643ea8Slogwang {
117a9643ea8Slogwang     int osfd = this->GetOsfd();
118a9643ea8Slogwang     if (osfd > 0)
119a9643ea8Slogwang     {
120a9643ea8Slogwang         MtFrame* frame = MtFrame::Instance();
121a9643ea8Slogwang         frame->KqueueCtrlDel(osfd, KQ_EVENT_READ);
122a9643ea8Slogwang         frame->KqueueNtfyReg(osfd, NULL);
123a9643ea8Slogwang         this->DisableInput();
124a9643ea8Slogwang         this->SetOsfd(-1);
125a9643ea8Slogwang         close(osfd);
126a9643ea8Slogwang     }
127a9643ea8Slogwang }
128a9643ea8Slogwang 
InputNotify()129a9643ea8Slogwang int UdpSessionNtfy::InputNotify()
130a9643ea8Slogwang {
131a9643ea8Slogwang     while (1)
132a9643ea8Slogwang     {
133a9643ea8Slogwang         int ret = 0;
134a9643ea8Slogwang         int have_rcv_len = 0;
135a9643ea8Slogwang 
136a9643ea8Slogwang         if (!_msg_buff) {
137a9643ea8Slogwang             _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize());
138a9643ea8Slogwang             if (NULL == _msg_buff) {
139a9643ea8Slogwang                 MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize());
140a9643ea8Slogwang                 return 0;
141a9643ea8Slogwang             }
142a9643ea8Slogwang             _msg_buff->SetBuffType(BUFF_RECV);
143a9643ea8Slogwang         }
144a9643ea8Slogwang         char* buff = (char*)_msg_buff->GetMsgBuff();
145a9643ea8Slogwang 
146a9643ea8Slogwang         int osfd = this->GetOsfd();
147a9643ea8Slogwang         struct sockaddr_in  from;
148a9643ea8Slogwang         socklen_t fromlen = sizeof(from);
149a9643ea8Slogwang         mt_hook_syscall(recvfrom);
150a9643ea8Slogwang         ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(),
151a9643ea8Slogwang                        0, (struct sockaddr*)&from, &fromlen);
152a9643ea8Slogwang         if (ret < 0)
153a9643ea8Slogwang         {
154a9643ea8Slogwang             if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
155a9643ea8Slogwang             {
156a9643ea8Slogwang                 return 0;
157a9643ea8Slogwang             }
158a9643ea8Slogwang             else
159a9643ea8Slogwang             {
160a9643ea8Slogwang                 MTLOG_ERROR("recv error, fd %d", osfd);
161*35a81399Slogwang                 return 0;
162a9643ea8Slogwang             }
163a9643ea8Slogwang         }
164a9643ea8Slogwang         else if (ret == 0)
165a9643ea8Slogwang         {
166a9643ea8Slogwang             MTLOG_DEBUG("remote close connection, fd %d", osfd);
167*35a81399Slogwang             return 0;
168a9643ea8Slogwang         }
169a9643ea8Slogwang         else
170a9643ea8Slogwang         {
171a9643ea8Slogwang             have_rcv_len = ret;
172a9643ea8Slogwang             _msg_buff->SetHaveRcvLen(have_rcv_len);
173a9643ea8Slogwang             _msg_buff->SetMsgLen(have_rcv_len);
174a9643ea8Slogwang         }
175a9643ea8Slogwang 
176a9643ea8Slogwang         int sessionid = 0;
177a9643ea8Slogwang         ret = this->GetSessionId(buff, have_rcv_len, sessionid);
178a9643ea8Slogwang         if (ret <= 0)
179a9643ea8Slogwang         {
180a9643ea8Slogwang             MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it",
181a9643ea8Slogwang                        have_rcv_len, osfd);
182a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
183a9643ea8Slogwang             _msg_buff = NULL;
184a9643ea8Slogwang             return 0;
185a9643ea8Slogwang         }
186a9643ea8Slogwang 
187a9643ea8Slogwang         ISession* session = SessionMgr::Instance()->FindSession(sessionid);
188a9643ea8Slogwang         if (NULL == session)
189a9643ea8Slogwang         {
190*35a81399Slogwang             MT_ATTR_API(350403, 1);
191a9643ea8Slogwang             MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid);
192a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
193a9643ea8Slogwang             _msg_buff = NULL;
194a9643ea8Slogwang             return 0;
195a9643ea8Slogwang         }
196a9643ea8Slogwang 
197a9643ea8Slogwang         IMtConnection* conn = session->GetSessionConn();
198a9643ea8Slogwang         MicroThread* thread = session->GetOwnerThread();
199a9643ea8Slogwang         if (!thread || !conn || !conn->GetNtfyObj())
200a9643ea8Slogwang         {
201a9643ea8Slogwang             MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong",
202a9643ea8Slogwang                     session, thread, conn);
203a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
204a9643ea8Slogwang             _msg_buff = NULL;
205a9643ea8Slogwang             return 0;
206a9643ea8Slogwang         }
207a9643ea8Slogwang         MtMsgBuf* msg = conn->GetMtMsgBuff();
208a9643ea8Slogwang         if (msg) {
209a9643ea8Slogwang             MsgBuffPool::Instance()->FreeMsgBuf(msg);
210a9643ea8Slogwang         }
211a9643ea8Slogwang         conn->SetMtMsgBuff(_msg_buff);
212a9643ea8Slogwang         _msg_buff = NULL;
213a9643ea8Slogwang 
214a9643ea8Slogwang         conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ);
215a9643ea8Slogwang         if (thread->HasFlag(MicroThread::IO_LIST))
216a9643ea8Slogwang         {
217a9643ea8Slogwang             MtFrame* frame = MtFrame::Instance();
218a9643ea8Slogwang             frame->RemoveIoWait(thread);
219a9643ea8Slogwang             frame->InsertRunable(thread);
220a9643ea8Slogwang         }
221a9643ea8Slogwang     }
222a9643ea8Slogwang 
223a9643ea8Slogwang     return 0;
224a9643ea8Slogwang }
225a9643ea8Slogwang 
OutputNotify()226a9643ea8Slogwang int UdpSessionNtfy::OutputNotify()
227a9643ea8Slogwang {
228a9643ea8Slogwang     NotifyWriteWait();
229a9643ea8Slogwang     return 0;
230a9643ea8Slogwang }
231a9643ea8Slogwang 
HangupNotify()232a9643ea8Slogwang int UdpSessionNtfy::HangupNotify()
233a9643ea8Slogwang {
234a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
235a9643ea8Slogwang     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
236a9643ea8Slogwang 
237a9643ea8Slogwang     MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd());
238a9643ea8Slogwang 
239a9643ea8Slogwang     CloseSocket();
240a9643ea8Slogwang 
241a9643ea8Slogwang     CreateSocket();
242a9643ea8Slogwang 
243a9643ea8Slogwang     return 0;
244a9643ea8Slogwang }
245a9643ea8Slogwang 
KqueueCtlAdd(void * args)246a9643ea8Slogwang int UdpSessionNtfy::KqueueCtlAdd(void* args)
247a9643ea8Slogwang {
248a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
249a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
250a9643ea8Slogwang     //ASSERT(fd_ref != NULL);
251a9643ea8Slogwang 
252a9643ea8Slogwang     int osfd = this->GetOsfd();
253a9643ea8Slogwang 
254a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
255a9643ea8Slogwang     if ((old_obj != NULL) && (old_obj != this))
256a9643ea8Slogwang     {
257a9643ea8Slogwang         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
258a9643ea8Slogwang         return -1;
259a9643ea8Slogwang     }
260a9643ea8Slogwang 
261a9643ea8Slogwang     if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE))
262a9643ea8Slogwang     {
263a9643ea8Slogwang         MTLOG_ERROR("epfd ref add failed, log");
264a9643ea8Slogwang         return -2;
265a9643ea8Slogwang     }
266a9643ea8Slogwang     this->EnableOutput();
267a9643ea8Slogwang 
268a9643ea8Slogwang     return 0;
269a9643ea8Slogwang }
270a9643ea8Slogwang 
KqueueCtlDel(void * args)271a9643ea8Slogwang int UdpSessionNtfy::KqueueCtlDel(void* args)
272a9643ea8Slogwang {
273a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
274a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
275a9643ea8Slogwang     //ASSERT(fd_ref != NULL);
276a9643ea8Slogwang 
277a9643ea8Slogwang     int osfd = this->GetOsfd();
278a9643ea8Slogwang 
279a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
280a9643ea8Slogwang     if (old_obj != this)
281a9643ea8Slogwang     {
282a9643ea8Slogwang         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
283a9643ea8Slogwang         return -1;
284a9643ea8Slogwang     }
285a9643ea8Slogwang 
286a9643ea8Slogwang     if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE))
287a9643ea8Slogwang     {
288a9643ea8Slogwang         MTLOG_ERROR("epfd ref del failed, log");
289a9643ea8Slogwang         return -2;
290a9643ea8Slogwang     }
291a9643ea8Slogwang     this->DisableOutput();
292a9643ea8Slogwang 
293a9643ea8Slogwang     return 0;
294a9643ea8Slogwang 
295a9643ea8Slogwang }
296a9643ea8Slogwang 
InputNotify()297a9643ea8Slogwang int TcpKeepNtfy::InputNotify()
298a9643ea8Slogwang {
299a9643ea8Slogwang     KeepaliveClose();
300a9643ea8Slogwang     return -1;
301a9643ea8Slogwang }
302a9643ea8Slogwang 
OutputNotify()303a9643ea8Slogwang int TcpKeepNtfy::OutputNotify()
304a9643ea8Slogwang {
305a9643ea8Slogwang     KeepaliveClose();
306a9643ea8Slogwang     return -1;
307a9643ea8Slogwang }
308a9643ea8Slogwang 
HangupNotify()309a9643ea8Slogwang int TcpKeepNtfy::HangupNotify()
310a9643ea8Slogwang {
311a9643ea8Slogwang     KeepaliveClose();
312a9643ea8Slogwang     return -1;
313a9643ea8Slogwang }
314a9643ea8Slogwang 
KeepaliveClose()315a9643ea8Slogwang void TcpKeepNtfy::KeepaliveClose()
316a9643ea8Slogwang {
317a9643ea8Slogwang     if (_keep_conn) {
318a9643ea8Slogwang         MTLOG_DEBUG("remote close, fd %d, close connection", _fd);
319a9643ea8Slogwang         ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn);
320a9643ea8Slogwang     } else {
321a9643ea8Slogwang         MTLOG_ERROR("_keep_conn ptr null, error");
322a9643ea8Slogwang     }
323a9643ea8Slogwang }
324a9643ea8Slogwang 
325a9643ea8Slogwang NtfyObjMgr* NtfyObjMgr::_instance = NULL;
Instance(void)326a9643ea8Slogwang NtfyObjMgr* NtfyObjMgr::Instance (void)
327a9643ea8Slogwang {
328a9643ea8Slogwang     if (NULL == _instance)
329a9643ea8Slogwang     {
330a9643ea8Slogwang         _instance = new NtfyObjMgr;
331a9643ea8Slogwang     }
332a9643ea8Slogwang 
333a9643ea8Slogwang     return _instance;
334a9643ea8Slogwang }
335a9643ea8Slogwang 
Destroy()336a9643ea8Slogwang void NtfyObjMgr::Destroy()
337a9643ea8Slogwang {
338a9643ea8Slogwang     if( _instance != NULL )
339a9643ea8Slogwang     {
340a9643ea8Slogwang         delete _instance;
341a9643ea8Slogwang         _instance = NULL;
342a9643ea8Slogwang     }
343a9643ea8Slogwang }
344a9643ea8Slogwang 
NtfyObjMgr()345a9643ea8Slogwang NtfyObjMgr::NtfyObjMgr()
346a9643ea8Slogwang {
347a9643ea8Slogwang }
348a9643ea8Slogwang 
~NtfyObjMgr()349a9643ea8Slogwang NtfyObjMgr::~NtfyObjMgr()
350a9643ea8Slogwang {
351a9643ea8Slogwang }
352a9643ea8Slogwang 
RegisterSession(int session_name,ISessionNtfy * session)353a9643ea8Slogwang int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session)
354a9643ea8Slogwang {
355a9643ea8Slogwang     if (session_name <= 0 || NULL == session) {
356a9643ea8Slogwang         MTLOG_ERROR("session %d, register %p failed", session_name, session);
357a9643ea8Slogwang         return -1;
358a9643ea8Slogwang     }
359a9643ea8Slogwang 
360a9643ea8Slogwang     SessionMap::iterator it = _session_map.find(session_name);
361a9643ea8Slogwang     if (it != _session_map.end())
362a9643ea8Slogwang     {
363a9643ea8Slogwang         MTLOG_ERROR("session %d, register %p already", session_name, session);
364a9643ea8Slogwang         return -2;
365a9643ea8Slogwang     }
366a9643ea8Slogwang 
367a9643ea8Slogwang     _session_map.insert(SessionMap::value_type(session_name, session));
368a9643ea8Slogwang 
369a9643ea8Slogwang     return 0;
370a9643ea8Slogwang }
371a9643ea8Slogwang 
GetNameSession(int session_name)372a9643ea8Slogwang ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name)
373a9643ea8Slogwang {
374a9643ea8Slogwang     SessionMap::iterator it = _session_map.find(session_name);
375a9643ea8Slogwang     if (it != _session_map.end())
376a9643ea8Slogwang     {
377a9643ea8Slogwang         return it->second;
378a9643ea8Slogwang     }
379a9643ea8Slogwang     else
380a9643ea8Slogwang     {
381a9643ea8Slogwang         return NULL;
382a9643ea8Slogwang     }
383a9643ea8Slogwang }
384a9643ea8Slogwang 
GetNtfyObj(int type,int session_name)385a9643ea8Slogwang KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name)
386a9643ea8Slogwang {
387a9643ea8Slogwang     KqueuerObj* obj = NULL;
388a9643ea8Slogwang     SessionProxy* proxy = NULL;
389a9643ea8Slogwang 
390a9643ea8Slogwang     switch (type)
391a9643ea8Slogwang     {
392a9643ea8Slogwang         case NTFY_OBJ_THREAD:
393a9643ea8Slogwang             obj = _fd_ntfy_pool.AllocPtr();
394a9643ea8Slogwang             break;
395a9643ea8Slogwang 
396a9643ea8Slogwang         case NTFY_OBJ_SESSION:
397a9643ea8Slogwang             proxy = _udp_proxy_pool.AllocPtr();
398a9643ea8Slogwang             obj = proxy;
399a9643ea8Slogwang             break;
400a9643ea8Slogwang 
401a9643ea8Slogwang         case NTFY_OBJ_KEEPALIVE:    // no need get this now
402a9643ea8Slogwang             break;
403a9643ea8Slogwang 
404a9643ea8Slogwang         default:
405a9643ea8Slogwang             break;
406a9643ea8Slogwang     }
407a9643ea8Slogwang 
408a9643ea8Slogwang     if (proxy) {
409a9643ea8Slogwang         ISessionNtfy* ntfy = this->GetNameSession(session_name);
410a9643ea8Slogwang         if (!ntfy) {
411a9643ea8Slogwang             MTLOG_ERROR("ntfy get session name(%d) failed", session_name);
412a9643ea8Slogwang             this->FreeNtfyObj(proxy);
413a9643ea8Slogwang             obj = NULL;
414a9643ea8Slogwang         } else {
415a9643ea8Slogwang             proxy->SetRealNtfyObj(ntfy);
416a9643ea8Slogwang         }
417a9643ea8Slogwang     }
418a9643ea8Slogwang 
419a9643ea8Slogwang     return obj;
420a9643ea8Slogwang 
421a9643ea8Slogwang }
422a9643ea8Slogwang 
FreeNtfyObj(KqueuerObj * obj)423a9643ea8Slogwang void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj)
424a9643ea8Slogwang {
425a9643ea8Slogwang     SessionProxy* proxy = NULL;
426a9643ea8Slogwang     if (!obj) {
427a9643ea8Slogwang         return;
428a9643ea8Slogwang     }
429a9643ea8Slogwang 
430a9643ea8Slogwang     int type = obj->GetNtfyType();
431a9643ea8Slogwang     obj->Reset();
432a9643ea8Slogwang 
433a9643ea8Slogwang     switch (type)
434a9643ea8Slogwang     {
435a9643ea8Slogwang         case NTFY_OBJ_THREAD:
436a9643ea8Slogwang             return _fd_ntfy_pool.FreePtr(obj);
437a9643ea8Slogwang             break;
438a9643ea8Slogwang 
439a9643ea8Slogwang         case NTFY_OBJ_SESSION:
440a9643ea8Slogwang             proxy = dynamic_cast<SessionProxy*>(obj);
441a9643ea8Slogwang             return _udp_proxy_pool.FreePtr(proxy);
442a9643ea8Slogwang             break;
443a9643ea8Slogwang 
444a9643ea8Slogwang         case NTFY_OBJ_KEEPALIVE:
445a9643ea8Slogwang             break;
446a9643ea8Slogwang 
447a9643ea8Slogwang         default:
448a9643ea8Slogwang             break;
449a9643ea8Slogwang     }
450a9643ea8Slogwang 
451a9643ea8Slogwang     delete obj;
452a9643ea8Slogwang     return;
453a9643ea8Slogwang }
454