xref: /f-stack/app/micro_thread/mt_notify.cpp (revision a9643ea8)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_notify.cpp
22  *  @info ΢�̵߳���ע��������ʵ��
23  *  @time 20130924
24  **/
25 #include <fcntl.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30 
31 #include "micro_thread.h"
32 #include "mt_session.h"
33 #include "mt_msg.h"
34 #include "mt_notify.h"
35 #include "mt_connection.h"
36 #include "mt_sys_hook.h"
37 #include "ff_hook.h"
38 
39 using namespace std;
40 using namespace NS_MICRO_THREAD;
41 
42 
43 /**
44  * @brief ֪ͨ�������ȴ�״̬, ����ȴ�������
45  * @param proxy �����sessionģ��
46  */
47 void ISessionNtfy::InsertWriteWait(SessionProxy* proxy)
48 {
49     if (!proxy->_flag) {
50         TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry);
51         proxy->_flag = 1;
52     }
53 }
54 
55 /**
56  * @brief ֪ͨ�����Ƴ��ȴ�״̬
57  * @param proxy �����sessionģ��
58  */
59 void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy)
60 {
61     if (proxy->_flag) {
62         TAILQ_REMOVE(&_write_list, proxy, _write_entry);
63         proxy->_flag = 0;
64     }
65 }
66 
67 /**
68  * @brief �۲���ģʽ, ֪ͨд�ȴ��߳�
69  * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д
70  */
71 void UdpSessionNtfy::NotifyWriteWait()
72 {
73     MtFrame* frame = MtFrame::Instance();
74     SessionProxy* proxy = NULL;
75     MicroThread* thread = NULL;
76     TAILQ_FOREACH(proxy, &_write_list, _write_entry)
77     {
78         proxy->SetRcvEvents(KQ_EVENT_WRITE);
79 
80         thread = proxy->GetOwnerThread();
81         if (thread && thread->HasFlag(MicroThread::IO_LIST))
82         {
83             frame->RemoveIoWait(thread);
84             frame->InsertRunable(thread);
85         }
86     }
87 }
88 
89 /**
90  *  @brief ����socket, �����ɶ��¼�
91  *  @return fd�ľ��, <0 ʧ��
92  */
93 int UdpSessionNtfy::CreateSocket()
94 {
95     // 1. UDP������, ÿ���´�SOCKET
96     int osfd = socket(AF_INET, SOCK_DGRAM, 0);
97     if (osfd < 0)
98     {
99         MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno));
100         return -1;
101     }
102 
103     // 2. ����������
104     int flags = 1;
105     if (ioctl(osfd, FIONBIO, &flags) < 0)
106     {
107         MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno));
108         close(osfd);
109         osfd = -1;
110         return -2;
111     }
112 
113     // ��ѡbindִ��, ���ñ���port��ִ��
114     if (_local_addr.sin_port != 0)
115     {
116         int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr));
117         if (ret < 0)
118         {
119             MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)",  inet_ntoa(_local_addr.sin_addr),
120                     ntohs(_local_addr.sin_port), errno, strerror(errno));
121             close(osfd);
122             osfd = -1;
123             return -3;
124         }
125     }
126 
127     // 3. ���¹�����Ϣ, Ĭ��udp session ���� epollin
128     this->SetOsfd(osfd);
129     this->EnableInput();
130     MtFrame* frame = MtFrame::Instance();
131     frame->KqueueNtfyReg(osfd, this);
132     frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ);
133 
134     return osfd;
135 }
136 
137 
138 /**
139  *  @brief �ر�socket, ֹͣ�����ɶ��¼�
140  */
141 void UdpSessionNtfy::CloseSocket()
142 {
143     int osfd = this->GetOsfd();
144     if (osfd > 0)
145     {
146         MtFrame* frame = MtFrame::Instance();
147         frame->KqueueCtrlDel(osfd, KQ_EVENT_READ);
148         frame->KqueueNtfyReg(osfd, NULL);
149         this->DisableInput();
150         this->SetOsfd(-1);
151         close(osfd);
152     }
153 }
154 
155 
156 /**
157  *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
158  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
159  */
160 int UdpSessionNtfy::InputNotify()
161 {
162     while (1)
163     {
164         int ret = 0;
165         int have_rcv_len = 0;
166 
167         // 1. ��ȡ�հ�������, ����ѡ��δ�����������buff
168         if (!_msg_buff) {
169             _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize());
170             if (NULL == _msg_buff) {
171                 MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize());
172                 return 0;
173             }
174             _msg_buff->SetBuffType(BUFF_RECV);
175         }
176         char* buff = (char*)_msg_buff->GetMsgBuff();
177 
178         // 2. ��ȡsocket, �հ�����
179         int osfd = this->GetOsfd();
180         struct sockaddr_in  from;
181         socklen_t fromlen = sizeof(from);
182         mt_hook_syscall(recvfrom);
183         ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(),
184                        0, (struct sockaddr*)&from, &fromlen);
185         if (ret < 0)
186         {
187             if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS))
188             {
189                 return 0;
190             }
191             else
192             {
193                 MTLOG_ERROR("recv error, fd %d", osfd);
194                 return 0;  // ϵͳ����, UDP �ݲ��ر�
195             }
196         }
197         else if (ret == 0)
198         {
199             MTLOG_DEBUG("remote close connection, fd %d", osfd);
200             return 0;  // �Զ˹ر�, UDP �ݲ��ر�
201         }
202         else
203         {
204             have_rcv_len = ret;
205             _msg_buff->SetHaveRcvLen(have_rcv_len);
206             _msg_buff->SetMsgLen(have_rcv_len);
207         }
208 
209         // 3. �����Ϣ��������, ��ȡsessionid
210         int sessionid = 0;
211         ret = this->GetSessionId(buff, have_rcv_len, sessionid);
212         if (ret <= 0)
213         {
214             MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it",
215                        have_rcv_len, osfd);
216             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
217             _msg_buff = NULL;
218             return 0;
219         }
220 
221         // 4. ӳ���ѯthread���, ����handle���, ���ö��¼�����, �ҽ�msgbuff
222         ISession* session = SessionMgr::Instance()->FindSession(sessionid);
223         if (NULL == session)
224         {
225             MT_ATTR_API(350403, 1); // session �����ѳ�ʱ
226             MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid);
227             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
228             _msg_buff = NULL;
229             return 0;
230         }
231 
232         // 5. �ҽ�recvbuff, �����߳�
233         IMtConnection* conn = session->GetSessionConn();
234         MicroThread* thread = session->GetOwnerThread();
235         if (!thread || !conn || !conn->GetNtfyObj())
236         {
237             MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong",
238                     session, thread, conn);
239             MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff);
240             _msg_buff = NULL;
241             return 0;
242         }
243         MtMsgBuf* msg = conn->GetMtMsgBuff();
244         if (msg) {
245             MsgBuffPool::Instance()->FreeMsgBuf(msg);
246         }
247         conn->SetMtMsgBuff(_msg_buff);
248         _msg_buff = NULL;
249 
250         conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ);
251         if (thread->HasFlag(MicroThread::IO_LIST))
252         {
253             MtFrame* frame = MtFrame::Instance();
254             frame->RemoveIoWait(thread);
255             frame->InsertRunable(thread);
256         }
257     }
258 
259     return 0;
260 }
261 
262 /**
263  *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
264  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
265  */
266 int UdpSessionNtfy::OutputNotify()
267 {
268     NotifyWriteWait();
269     return 0;
270 }
271 
272 /**
273  *  @brief �쳣֪ͨ�ӿ�, �ر�fd����, thread�ȴ�����ʱ
274  *  @return ���Է���ֵ, ���������¼�����
275  */
276 int UdpSessionNtfy::HangupNotify()
277 {
278     // 1. ����epoll ctl�����¼�
279     MtFrame* frame = MtFrame::Instance();
280     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
281 
282     MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd());
283 
284     // 2. ���´�socket
285     CloseSocket();
286 
287     // 3. �ؼ���epoll listen
288     CreateSocket();
289 
290     return 0;
291 }
292 
293 /**
294  *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
295  *  @param args fd���ö����ָ��
296  *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
297  *  @info  Ĭ���Ǽ����ɶ��¼���, ����ֻ�����д�¼��ļ���ɾ��
298  */
299 int UdpSessionNtfy::KqueueCtlAdd(void* args)
300 {
301     MtFrame* frame = MtFrame::Instance();
302     KqFdRef* fd_ref = (KqFdRef*)args;
303     //ASSERT(fd_ref != NULL);
304 
305     int osfd = this->GetOsfd();
306 
307     // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼
308     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
309     if ((old_obj != NULL) && (old_obj != this))
310     {
311         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
312         return -1;
313     }
314 
315     // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ��
316     if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE))
317     {
318         MTLOG_ERROR("epfd ref add failed, log");
319         return -2;
320     }
321     this->EnableOutput();
322 
323     return 0;
324 }
325 
326 /**
327  *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
328  *  @param args fd���ö����ָ��
329  *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
330  */
331 int UdpSessionNtfy::KqueueCtlDel(void* args)
332 {
333     MtFrame* frame = MtFrame::Instance();
334     KqFdRef* fd_ref = (KqFdRef*)args;
335     //ASSERT(fd_ref != NULL);
336 
337     int osfd = this->GetOsfd();
338 
339     // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼
340     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
341     if (old_obj != this)
342     {
343         MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
344         return -1;
345     }
346 
347     // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ��
348     if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE))
349     {
350         MTLOG_ERROR("epfd ref del failed, log");
351         return -2;
352     }
353     this->DisableOutput();
354 
355     return 0;
356 
357 }
358 
359 
360 /**
361  *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
362  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
363  */
364 int TcpKeepNtfy::InputNotify()
365 {
366     KeepaliveClose();
367     return -1;
368 }
369 
370 /**
371  *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
372  *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
373  */
374 int TcpKeepNtfy::OutputNotify()
375 {
376     KeepaliveClose();
377     return -1;
378 }
379 
380 /**
381  *  @brief �쳣֪ͨ�ӿ�
382  *  @return ���Է���ֵ, ���������¼�����
383  */
384 int TcpKeepNtfy::HangupNotify()
385 {
386     KeepaliveClose();
387     return -1;
388 }
389 
390 
391 /**
392  *  @brief ����ʵ�����ӹرղ���
393  */
394 void TcpKeepNtfy::KeepaliveClose()
395 {
396     if (_keep_conn) {
397         MTLOG_DEBUG("remote close, fd %d, close connection", _fd);
398         ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn);
399     } else {
400         MTLOG_ERROR("_keep_conn ptr null, error");
401     }
402 }
403 
404 
405 /**
406  * @brief sessionȫ�ֹ�����
407  * @return ȫ�־��ָ��
408  */
409 NtfyObjMgr* NtfyObjMgr::_instance = NULL;
410 NtfyObjMgr* NtfyObjMgr::Instance (void)
411 {
412     if (NULL == _instance)
413     {
414         _instance = new NtfyObjMgr;
415     }
416 
417     return _instance;
418 }
419 
420 /**
421  * @brief session����ȫ�ֵ����ٽӿ�
422  */
423 void NtfyObjMgr::Destroy()
424 {
425     if( _instance != NULL )
426     {
427         delete _instance;
428         _instance = NULL;
429     }
430 }
431 
432 /**
433  * @brief ��Ϣbuff�Ĺ��캯��
434  */
435 NtfyObjMgr::NtfyObjMgr()
436 {
437 }
438 
439 /**
440  * @brief ��������, ��������Դ, ������������
441  */
442 NtfyObjMgr::~NtfyObjMgr()
443 {
444 }
445 
446 /**
447  * @brief ע�᳤����session��Ϣ
448  * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ
449  * @param session �����Ӷ���ָ��, ������������
450  * @return 0 �ɹ�, < 0 ʧ��
451  */
452 int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session)
453 {
454     if (session_name <= 0 || NULL == session) {
455         MTLOG_ERROR("session %d, register %p failed", session_name, session);
456         return -1;
457     }
458 
459     SessionMap::iterator it = _session_map.find(session_name);
460     if (it != _session_map.end())
461     {
462         MTLOG_ERROR("session %d, register %p already", session_name, session);
463         return -2;
464     }
465 
466     _session_map.insert(SessionMap::value_type(session_name, session));
467 
468     return 0;
469 }
470 
471 /**
472  * @brief ��ȡע�᳤����session��Ϣ
473  * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ
474  * @return ������ָ��, ʧ��ΪNULL
475  */
476 ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name)
477 {
478     SessionMap::iterator it = _session_map.find(session_name);
479     if (it != _session_map.end())
480     {
481         return it->second;
482     }
483     else
484     {
485         return NULL;
486     }
487 }
488 
489 /**
490  * @brief ��ȡͨ��֪ͨ����, ���߳�֪ͨ������session֪ͨ�������
491  * @param type ����, �߳�֪ͨ���ͣ�UDP/TCP SESSION֪ͨ��
492  * @param session_name proxyģ��,һ����ȡsession����
493  * @return ֪ͨ�����ָ��, ʧ��ΪNULL
494  */
495 KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name)
496 {
497     KqueuerObj* obj = NULL;
498     SessionProxy* proxy = NULL;
499 
500     switch (type)
501     {
502         case NTFY_OBJ_THREAD:
503             obj = _fd_ntfy_pool.AllocPtr();
504             break;
505 
506         case NTFY_OBJ_SESSION:
507             proxy = _udp_proxy_pool.AllocPtr();
508             obj = proxy;
509             break;
510 
511         case NTFY_OBJ_KEEPALIVE:    // no need get this now
512             break;
513 
514         default:
515             break;
516     }
517 
518     // ��ȡ�ײ�ij����Ӷ���, ����������ʵ�ʵ�֪ͨ����
519     if (proxy) {
520         ISessionNtfy* ntfy = this->GetNameSession(session_name);
521         if (!ntfy) {
522             MTLOG_ERROR("ntfy get session name(%d) failed", session_name);
523             this->FreeNtfyObj(proxy);
524             obj = NULL;
525         } else {
526             proxy->SetRealNtfyObj(ntfy);
527         }
528     }
529 
530     return obj;
531 
532 }
533 
534 /**
535  * @brief �ͷ�֪ͨ����ָ��
536  * @param obj ֪ͨ����
537  */
538 void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj)
539 {
540     SessionProxy* proxy = NULL;
541     if (!obj) {
542         return;
543     }
544 
545     int type = obj->GetNtfyType();
546     obj->Reset();
547 
548     switch (type)
549     {
550         case NTFY_OBJ_THREAD:
551             return _fd_ntfy_pool.FreePtr(obj);
552             break;
553 
554         case NTFY_OBJ_SESSION:
555             proxy = dynamic_cast<SessionProxy*>(obj);
556             return _udp_proxy_pool.FreePtr(proxy);
557             break;
558 
559         case NTFY_OBJ_KEEPALIVE:
560             break;
561 
562         default:
563             break;
564     }
565 
566     delete obj;
567     return;
568 }
569 
570 
571 
572