xref: /f-stack/app/micro_thread/mt_notify.cpp (revision 45438af0)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_notify.cpp
22  *  @time 20130924
23  **/
24 #include <fcntl.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 
30 #include "micro_thread.h"
31 #include "mt_session.h"
32 #include "mt_msg.h"
33 #include "mt_notify.h"
34 #include "mt_connection.h"
35 #include "mt_sys_hook.h"
36 #include "ff_hook.h"
37 
38 using namespace std;
39 using namespace NS_MICRO_THREAD;
40 
41 void ISessionNtfy::InsertWriteWait(SessionProxy* proxy)
42 {
43     if (!proxy->_flag) {
44         TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry);
45         proxy->_flag = 1;
46     }
47 }
48 
49 void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy)
50 {
51     if (proxy->_flag) {
52         TAILQ_REMOVE(&_write_list, proxy, _write_entry);
53         proxy->_flag = 0;
54     }
55 }
56 
57 void UdpSessionNtfy::NotifyWriteWait()
58 {
59     MtFrame* frame = MtFrame::Instance();
60     SessionProxy* proxy = NULL;
61     MicroThread* thread = NULL;
62     TAILQ_FOREACH(proxy, &_write_list, _write_entry)
63     {
64         proxy->SetRcvEvents(KQ_EVENT_WRITE);
65 
66         thread = proxy->GetOwnerThread();
67         if (thread && thread->HasFlag(MicroThread::IO_LIST))
68         {
69             frame->RemoveIoWait(thread);
70             frame->InsertRunable(thread);
71         }
72     }
73 }
74 
75 int UdpSessionNtfy::CreateSocket()
76 {
77     int osfd = socket(AF_INET, SOCK_DGRAM, 0);
78     if (osfd < 0)
79     {
80         MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
81         return -1;
82     }
83 
84     int flags = 1;
85     if (ioctl(osfd, FIONBIO, &flags) < 0)
86     {
87         MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
88         close(osfd);
89         osfd = -1;
90         return -2;
91     }
92 
93     if (_local_addr.sin_port != 0)
94     {
95         int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr));
96         if (ret < 0)
97         {
98             MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)",  inet_ntoa(_local_addr.sin_addr),
99                     ntohs(_local_addr.sin_port), errno, strerror(errno));
100             close(osfd);
101             osfd = -1;
102             return -3;
103         }
104     }
105 
106     this->SetOsfd(osfd);
107     this->EnableInput();
108     MtFrame* frame = MtFrame::Instance();
109     frame->KqueueNtfyReg(osfd, this);
110     frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ);
111 
112     return osfd;
113 }
114 
115 void UdpSessionNtfy::CloseSocket()
116 {
117     int osfd = this->GetOsfd();
118     if (osfd > 0)
119     {
120         MtFrame* frame = MtFrame::Instance();
121         frame->KqueueCtrlDel(osfd, KQ_EVENT_READ);
122         frame->KqueueNtfyReg(osfd, NULL);
123         this->DisableInput();
124         this->SetOsfd(-1);
125         close(osfd);
126     }
127 }
128 
129 int UdpSessionNtfy::InputNotify()
130 {
131     while (1)
132     {
133         int ret = 0;
134         int have_rcv_len = 0;
135 
136         if (!_msg_buff) {
137             _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize());
138             if (NULL == _msg_buff) {
139                 MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize());
140                 return 0;
141             }
142             _msg_buff->SetBuffType(BUFF_RECV);
143         }
144         char* buff = (char*)_msg_buff->GetMsgBuff();
145 
146         int osfd = this->GetOsfd();
147         struct sockaddr_in  from;
148         socklen_t fromlen = sizeof(from);
149         mt_hook_syscall(recvfrom);
150         ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(),
151                        0, (struct sockaddr*)&from, &fromlen);
152         if (ret < 0)
153         {
154             if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
155             {
156                 return 0;
157             }
158             else
159             {
160                 MTLOG_ERROR("recv error, fd %d", osfd);
161                 return 0;
162             }
163         }
164         else if (ret == 0)
165         {
166             MTLOG_DEBUG("remote close connection, fd %d", osfd);
167             return 0;
168         }
169         else
170         {
171             have_rcv_len = ret;
172             _msg_buff->SetHaveRcvLen(have_rcv_len);
173             _msg_buff->SetMsgLen(have_rcv_len);
174         }
175 
176         int sessionid = 0;
177         ret = this->GetSessionId(buff, have_rcv_len, sessionid);
178         if (ret <= 0)
179         {
180             MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it",
181                        have_rcv_len, osfd);
182             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
183             _msg_buff = NULL;
184             return 0;
185         }
186 
187         ISession* session = SessionMgr::Instance()->FindSession(sessionid);
188         if (NULL == session)
189         {
190             MT_ATTR_API(350403, 1);
191             MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid);
192             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
193             _msg_buff = NULL;
194             return 0;
195         }
196 
197         IMtConnection* conn = session->GetSessionConn();
198         MicroThread* thread = session->GetOwnerThread();
199         if (!thread || !conn || !conn->GetNtfyObj())
200         {
201             MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong",
202                     session, thread, conn);
203             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
204             _msg_buff = NULL;
205             return 0;
206         }
207         MtMsgBuf* msg = conn->GetMtMsgBuff();
208         if (msg) {
209             MsgBuffPool::Instance()->FreeMsgBuf(msg);
210         }
211         conn->SetMtMsgBuff(_msg_buff);
212         _msg_buff = NULL;
213 
214         conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ);
215         if (thread->HasFlag(MicroThread::IO_LIST))
216         {
217             MtFrame* frame = MtFrame::Instance();
218             frame->RemoveIoWait(thread);
219             frame->InsertRunable(thread);
220         }
221     }
222 
223     return 0;
224 }
225 
226 int UdpSessionNtfy::OutputNotify()
227 {
228     NotifyWriteWait();
229     return 0;
230 }
231 
232 int UdpSessionNtfy::HangupNotify()
233 {
234     MtFrame* frame = MtFrame::Instance();
235     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
236 
237     MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd());
238 
239     CloseSocket();
240 
241     CreateSocket();
242 
243     return 0;
244 }
245 
246 int UdpSessionNtfy::KqueueCtlAdd(void* args)
247 {
248     MtFrame* frame = MtFrame::Instance();
249     KqFdRef* fd_ref = (KqFdRef*)args;
250     //ASSERT(fd_ref != NULL);
251 
252     int osfd = this->GetOsfd();
253 
254     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
255     if ((old_obj != NULL) && (old_obj != this))
256     {
257         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
258         return -1;
259     }
260 
261     if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE))
262     {
263         MTLOG_ERROR("epfd ref add failed, log");
264         return -2;
265     }
266     this->EnableOutput();
267 
268     return 0;
269 }
270 
271 int UdpSessionNtfy::KqueueCtlDel(void* args)
272 {
273     MtFrame* frame = MtFrame::Instance();
274     KqFdRef* fd_ref = (KqFdRef*)args;
275     //ASSERT(fd_ref != NULL);
276 
277     int osfd = this->GetOsfd();
278 
279     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
280     if (old_obj != this)
281     {
282         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
283         return -1;
284     }
285 
286     if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE))
287     {
288         MTLOG_ERROR("epfd ref del failed, log");
289         return -2;
290     }
291     this->DisableOutput();
292 
293     return 0;
294 
295 }
296 
297 int TcpKeepNtfy::InputNotify()
298 {
299     KeepaliveClose();
300     return -1;
301 }
302 
303 int TcpKeepNtfy::OutputNotify()
304 {
305     KeepaliveClose();
306     return -1;
307 }
308 
309 int TcpKeepNtfy::HangupNotify()
310 {
311     KeepaliveClose();
312     return -1;
313 }
314 
315 void TcpKeepNtfy::KeepaliveClose()
316 {
317     if (_keep_conn) {
318         MTLOG_DEBUG("remote close, fd %d, close connection", _fd);
319         ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn);
320     } else {
321         MTLOG_ERROR("_keep_conn ptr null, error");
322     }
323 }
324 
325 NtfyObjMgr* NtfyObjMgr::_instance = NULL;
326 NtfyObjMgr* NtfyObjMgr::Instance (void)
327 {
328     if (NULL == _instance)
329     {
330         _instance = new NtfyObjMgr;
331     }
332 
333     return _instance;
334 }
335 
336 void NtfyObjMgr::Destroy()
337 {
338     if( _instance != NULL )
339     {
340         delete _instance;
341         _instance = NULL;
342     }
343 }
344 
345 NtfyObjMgr::NtfyObjMgr()
346 {
347 }
348 
349 NtfyObjMgr::~NtfyObjMgr()
350 {
351 }
352 
353 int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session)
354 {
355     if (session_name <= 0 || NULL == session) {
356         MTLOG_ERROR("session %d, register %p failed", session_name, session);
357         return -1;
358     }
359 
360     SessionMap::iterator it = _session_map.find(session_name);
361     if (it != _session_map.end())
362     {
363         MTLOG_ERROR("session %d, register %p already", session_name, session);
364         return -2;
365     }
366 
367     _session_map.insert(SessionMap::value_type(session_name, session));
368 
369     return 0;
370 }
371 
372 ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name)
373 {
374     SessionMap::iterator it = _session_map.find(session_name);
375     if (it != _session_map.end())
376     {
377         return it->second;
378     }
379     else
380     {
381         return NULL;
382     }
383 }
384 
385 KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name)
386 {
387     KqueuerObj* obj = NULL;
388     SessionProxy* proxy = NULL;
389 
390     switch (type)
391     {
392         case NTFY_OBJ_THREAD:
393             obj = _fd_ntfy_pool.AllocPtr();
394             break;
395 
396         case NTFY_OBJ_SESSION:
397             proxy = _udp_proxy_pool.AllocPtr();
398             obj = proxy;
399             break;
400 
401         case NTFY_OBJ_KEEPALIVE:    // no need get this now
402             break;
403 
404         default:
405             break;
406     }
407 
408     if (proxy) {
409         ISessionNtfy* ntfy = this->GetNameSession(session_name);
410         if (!ntfy) {
411             MTLOG_ERROR("ntfy get session name(%d) failed", session_name);
412             this->FreeNtfyObj(proxy);
413             obj = NULL;
414         } else {
415             proxy->SetRealNtfyObj(ntfy);
416         }
417     }
418 
419     return obj;
420 
421 }
422 
423 void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj)
424 {
425     SessionProxy* proxy = NULL;
426     if (!obj) {
427         return;
428     }
429 
430     int type = obj->GetNtfyType();
431     obj->Reset();
432 
433     switch (type)
434     {
435         case NTFY_OBJ_THREAD:
436             return _fd_ntfy_pool.FreePtr(obj);
437             break;
438 
439         case NTFY_OBJ_SESSION:
440             proxy = dynamic_cast<SessionProxy*>(obj);
441             return _udp_proxy_pool.FreePtr(proxy);
442             break;
443 
444         case NTFY_OBJ_KEEPALIVE:
445             break;
446 
447         default:
448             break;
449     }
450 
451     delete obj;
452     return;
453 }
454