xref: /f-stack/app/micro_thread/mt_notify.h (revision a9643ea8)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_notify.h
22  *  @info ΢�߳�ע���֪ͨ�����������
23  *  @time 20130926
24  **/
25 
26 #ifndef __MT_NOTIFY_H__
27 #define __MT_NOTIFY_H__
28 
29 #include <netinet/in.h>
30 #include <queue>
31 #include <map>
32 #include "mt_mbuf_pool.h"
33 
34 namespace NS_MICRO_THREAD {
35 
36 using std::queue;
37 using std::map;
38 
39 class SessionProxy;
40 class TcpKeepConn;
41 
42 /**
43  * @brief ֪ͨ��������
44  */
45 enum NTFY_OBJ_TYPE
46 {
47     NTFY_OBJ_UNDEF     = 0,     ///< δ��������Ӷ���
48     NTFY_OBJ_THREAD    = 1,     ///< �����Ӷ���, һ��fd��Ӧһ��thread
49     NTFY_OBJ_KEEPALIVE = 2,     ///< TCP�������ֵ�notify����, ������ thread
50     NTFY_OBJ_SESSION   = 3,     ///< UDP��sessionģ��, ����ij����Ӷ���
51 };
52 
53 /**
54  * @brief Э�����Ͷ���
55  */
56 enum MULTI_PROTO
57 {
58     MT_UNKNOWN = 0,
59     MT_UDP     = 0x1,                ///< �������� UDP
60     MT_TCP     = 0x2                 ///< �������� TCP
61 };
62 
63 /**
64  * @brief ������sessionģ��, �����շ����ȹ���ӿ�
65  */
66 typedef TAILQ_ENTRY(SessionProxy) NtfyEntry;
67 typedef TAILQ_HEAD(__NtfyList, SessionProxy) NtfyList;
68 class ISessionNtfy : public KqueuerObj
69 {
70 public:
71 
72     /**
73      *  @brief ��鱨��������, ͬʱ��ȡsessionid��Ϣ
74      *  @param pkg ����ָ��
75      *  @param len �����ѽ��ճ���
76      *  @param session ������sessionid, �������
77      *  @return <=0 ʧ��, >0 ʵ�ʱ��ij���
78      */
79     virtual int GetSessionId(void* pkg, int len,  int& session) { return 0;};
80 
81     /**
82      *  @brief ����socket, �����ɶ��¼�
83      *  @return fd�ľ��, <0 ʧ��
84      */
85     virtual int CreateSocket(){return -1;};
86 
87     /**
88      *  @brief �ر�socket, ֹͣ�����ɶ��¼�
89      */
90     virtual void CloseSocket(){};
91 
92     /**
93      *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
94      *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
95      */
96     virtual int InputNotify(){return 0;};
97 
98     /**
99      *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
100      *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
101      */
102     virtual int OutputNotify(){return 0;};
103 
104     /**
105      *  @brief �쳣֪ͨ�ӿ�
106      *  @return ���Է���ֵ, ���������¼�����
107      */
108     virtual int HangupNotify(){return 0;};
109 
110     /**
111      *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
112      *  @param args fd���ö����ָ��
113      *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
114      */
115     virtual int KqueueCtlAdd(void* args){return 0;};
116 
117     /**
118      *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
119      *  @param args fd���ö����ָ��
120      *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
121      */
122     virtual int KqueueCtlDel(void* args){return 0;};
123 
124     /**
125      * @brief ���캯����������
126      */
127     ISessionNtfy(): KqueuerObj(0) {
128         _proto = MT_UDP;
129         _buff_size = 0;
130         _msg_buff = NULL;
131         TAILQ_INIT(&_write_list);
132     }
133     virtual ~ISessionNtfy() {   };
134 
135     /**
136      * @brief ���ñ��δ����proto��Ϣ
137      */
138     void SetProtoType(MULTI_PROTO proto) {
139         _proto = proto;
140     };
141 
142     /**
143      * @brief ��ȡ���δ����proto��Ϣ
144      * @return proto type
145      */
146     MULTI_PROTO GetProtoType() {
147         return _proto;
148     };
149 
150     /**
151      * @brief ����buff��С, ����ʵ��ʹ�õ�msgbuff����
152      * @return  0�ɹ�
153      */
154     void SetMsgBuffSize(int buff_size) {
155         _buff_size = buff_size;
156     };
157 
158     /**
159      * @brief ��ȡԤ�õ�buff��С, ��������, ����65535
160      * @return  ����������Ϣbuff�����
161      */
162     int GetMsgBuffSize()     {
163         return (_buff_size > 0) ? _buff_size : 65535;
164     }
165 
166     /**
167      * @brief ֪ͨ�������ȴ�״̬
168      */
169     void InsertWriteWait(SessionProxy* proxy);
170 
171     /**
172      * @brief ֪ͨ����ȡ���ȴ�״̬
173      */
174     void RemoveWriteWait(SessionProxy* proxy);
175 
176     /**
177      * @brief �۲���ģʽ, ֪ͨд�ȴ��߳�
178      * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д
179      */
180     virtual void NotifyWriteWait(){};
181 
182 protected:
183     MULTI_PROTO         _proto;         // Э������ UDP/TCP
184     int                 _buff_size;     // �����Ϣ����
185     NtfyList            _write_list;    // ��д�ȴ�����
186     MtMsgBuf*           _msg_buff;      // ��ʱ�հ���Ż�����
187 };
188 
189 
190 /**
191  * @brief UDP������sessionģ�͵Ļ���ӿ�
192  * @info  ҵ��session��Ҫ�̳иýӿ�, ��������, ʵ�ֻ�ȡGetSessionId����
193  * @info  ������չ, ��ָ�����ض˿ڵ�
194  */
195 class UdpSessionNtfy : public ISessionNtfy
196 {
197 public:
198 
199     /**
200      *  @brief ��鱨��������, ͬʱ��ȡsessionid��Ϣ, �ɼ̳���ʵ����
201      *  @param pkg ����ָ��
202      *  @param len �����ѽ��ճ���
203      *  @param session ������sessionid, �������
204      *  @return <=0 ʧ��, >0 ʵ�ʱ��ij���
205      */
206     virtual int GetSessionId(void* pkg, int len,  int& session) { return 0;};
207 
208 
209 public:
210 
211     /**
212      * @brief ��������������
213      */
214     UdpSessionNtfy() : ISessionNtfy(){
215         ISessionNtfy::SetProtoType(MT_UDP);
216 
217         _local_addr.sin_family = AF_INET;
218         _local_addr.sin_addr.s_addr = 0;
219         _local_addr.sin_port = 0;
220     }
221     virtual ~UdpSessionNtfy() {    };
222 
223     /**
224      * @brief �۲���ģʽ, ֪ͨд�ȴ��߳�
225      * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д
226      */
227     virtual void NotifyWriteWait();
228 
229     /**
230      *  @brief ����socket, �����ɶ��¼�
231      *  @return fd�ľ��, <0 ʧ��
232      */
233     virtual int CreateSocket();
234 
235     /**
236      *  @brief �ر�socket, ֹͣ�����ɶ��¼�
237      */
238     virtual void CloseSocket();
239 
240     /**
241      *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
242      *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
243      */
244     virtual int InputNotify();
245 
246     /**
247      *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
248      *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
249      */
250     virtual int OutputNotify();
251 
252     /**
253      *  @brief �쳣֪ͨ�ӿ�
254      *  @return ���Է���ֵ, ���������¼�����
255      */
256     virtual int HangupNotify();
257 
258     /**
259      *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
260      *  @param args fd���ö����ָ��
261      *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
262      */
263     virtual int KqueueCtlAdd(void* args);
264 
265     /**
266      *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
267      *  @param args fd���ö����ָ��
268      *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
269      */
270     virtual int KqueueCtlDel(void* args);
271 
272 public:
273 
274     /**
275      * @brief ����udp���صı���bind��ַ, �����bind���ͻ, ��ʱͣ��
276      *      ��������, �ܱ�֤ÿ����Ψһport��ʹ��
277      */
278     void SetLocalAddr(struct sockaddr_in* local_addr) {
279         memcpy(&_local_addr, local_addr, sizeof(_local_addr));
280     };
281 
282 protected:
283 
284     struct sockaddr_in  _local_addr;
285 };
286 
287 
288 
289 /**
290  * @brief UDPģʽsessionģ�͵Ĵ���֪ͨ����, �������ӳ�䵽ijһ��session notify
291  * @info  session proxy ������epollע��, �������¼�֪ͨ, ����Ҫ���ij�ʱ��
292  */
293 class SessionProxy  : public KqueuerObj
294 {
295 public:
296     int         _flag;                ///< 0-���ڶ�����, 1-�ڵȴ�����
297     NtfyEntry   _write_entry;         ///< ������д�ȴ����еĹ������
298 
299     /**
300      *  @brief ���ô������, ���������fd���
301      */
302     void SetRealNtfyObj(ISessionNtfy* obj) {
303         _real_ntfy = obj;
304         this->SetOsfd(obj->GetOsfd());
305     };
306 
307     /**
308      *  @brief ��ȡ�������ָ��
309      */
310     ISessionNtfy* GetRealNtfyObj() {
311         return _real_ntfy;
312     };
313 
314 public:
315 
316     /**
317      * @brief ���մ���, ����������
318      */
319     virtual void Reset() {
320         _real_ntfy = NULL;
321         this->KqueuerObj::Reset();
322     };
323 
324     /**
325      *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
326      *  @param args fd���ö����ָ��
327      *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
328      */
329     virtual int KqueueCtlAdd(void* args) {
330         if (!_real_ntfy) {
331             return -1;
332         }
333 
334         int events = this->GetEvents();
335         if (!(events & KQ_EVENT_WRITE)) {
336             return 0;
337         }
338 
339         if (_real_ntfy->KqueueCtlAdd(args) < 0) {
340             return -2;
341         }
342 
343         _real_ntfy->InsertWriteWait(this);
344         return 0;
345     };
346 
347     /**
348      *  @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT
349      *  @param args fd���ö����ָ��
350      *  @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬
351      */
352     virtual int KqueueCtlDel(void* args) {
353         if (!_real_ntfy) {
354             return -1;
355         }
356 
357         int events = this->GetEvents();
358         if (!(events & KQ_EVENT_WRITE)) {
359             return 0;
360         }
361 
362         _real_ntfy->RemoveWriteWait(this);
363         return _real_ntfy->KqueueCtlDel(args);
364     };
365 
366 private:
367     ISessionNtfy*   _real_ntfy;         // ʵ�ʵ�ִ����
368 
369 };
370 
371 /**
372  * @brief TCPģʽ��keepalive֪ͨ����, �������Ŀɶ��¼�, ȷ���Ƿ�Զ˹ر�
373  */
374 class TcpKeepNtfy: public KqueuerObj
375 {
376 public:
377 
378     /**
379      * @brief ���캯��
380      */
381     TcpKeepNtfy() :     _keep_conn(NULL){};
382 
383     /**
384      *  @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
385      *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
386      */
387     virtual int InputNotify();
388 
389     /**
390      *  @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ����
391      *  @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص�����
392      */
393     virtual int OutputNotify();
394 
395     /**
396      *  @brief �쳣֪ͨ�ӿ�
397      *  @return ���Է���ֵ, ���������¼�����
398      */
399     virtual int HangupNotify();
400 
401     /**
402      *  @brief ���ô������
403      */
404     void SetKeepNtfyObj(TcpKeepConn* obj) {
405         _keep_conn = obj;
406     };
407 
408     /**
409      *  @brief ��ȡ�������ָ��
410      */
411     TcpKeepConn* GetKeepNtfyObj() {
412         return _keep_conn;
413     };
414 
415     /**
416      *  @brief ����ʵ�����ӹرղ���
417      */
418     void KeepaliveClose();
419 
420 
421 private:
422     TcpKeepConn*   _keep_conn;         // ʵ�ʵ�����������
423 
424 };
425 
426 
427 /**
428  * @brief ��̬�ڴ��ģ����, ���ڷ���new/delete�Ķ������, ��һ���̶����������
429  */
430 template<typename ValueType>
431 class CPtrPool
432 {
433 public:
434     typedef typename std::queue<ValueType*>  PtrQueue; ///< �ڴ�ָ�����
435 
436 public:
437 
438     /**
439      * @brief ��̬�ڴ�ع��캯��
440      * @param max �����ж��б����ָ��Ԫ��, Ĭ��500
441      */
442     explicit CPtrPool(int max = 500) : _max_free(max), _total(0){};
443 
444     /**
445      * @brief ��̬�ڴ����������, ���������freelist
446      */
447     ~CPtrPool()    {
448         ValueType* ptr = NULL;
449         while (!_ptr_list.empty()) {
450             ptr = _ptr_list.front();
451             _ptr_list.pop();
452             delete ptr;
453         }
454     };
455 
456     /**
457      * @brief �����ڴ�ָ��, ���ȴӻ����ȡ, �޿��п�����̬ new ����
458      * @return ģ�����͵�ָ��Ԫ��, �ձ�ʾ�ڴ�����ʧ��
459      */
460     ValueType* AllocPtr() {
461         ValueType* ptr = NULL;
462         if (!_ptr_list.empty()) {
463             ptr = _ptr_list.front();
464             _ptr_list.pop();
465         } else {
466             ptr = new ValueType;
467             _total++;
468         }
469 
470         return ptr;
471     };
472 
473     /**
474      * @brief �ͷ��ڴ�ָ��, �����ж��г������, ��ֱ���ͷ�, ������л���
475      */
476     void FreePtr(ValueType* ptr) {
477         if ((int)_ptr_list.size() >= _max_free) {
478             delete ptr;
479             _total--;
480         } else {
481             _ptr_list.push(ptr);
482         }
483     };
484 
485 protected:
486     PtrQueue  _ptr_list;           ///<  ���ж���
487     int       _max_free;           ///<  ������Ԫ��
488     int       _total;              ///<  ����new�Ķ������ͳ��
489 };
490 
491 
492 /**
493  * @brief ֪ͨ����ȫ�ֹ�����
494  */
495 class NtfyObjMgr
496 {
497 public:
498 
499     typedef std::map<int, ISessionNtfy*>   SessionMap;
500     typedef CPtrPool<KqueuerObj> NtfyThreadQueue;
501     typedef CPtrPool<SessionProxy>  NtfySessionQueue;
502 
503     /**
504      * @brief �Ự�����ĵ�ȫ�ֹ������ӿ�
505      * @return ȫ�־��ָ��
506      */
507     static NtfyObjMgr* Instance (void);
508 
509     /**
510      * @brief ����ӿ�
511      */
512     static void Destroy(void);
513 
514     /**
515      * @brief ע�᳤����session��Ϣ
516      * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ
517      * @param session �����Ӷ���ָ��, ������������
518      * @return 0 �ɹ�, < 0 ʧ��
519      */
520     int RegisterSession(int session_name, ISessionNtfy* session);
521 
522     /**
523      * @brief ��ȡע�᳤����session��Ϣ
524      * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ
525      * @return ������ָ��, ʧ��ΪNULL
526      */
527     ISessionNtfy* GetNameSession(int session_name);
528 
529     /**
530      * @brief ��ȡͨ��֪ͨ����, ���߳�֪ͨ������session֪ͨ�������
531      * @param type ����, �߳�֪ͨ���ͣ�UDP/TCP SESSION֪ͨ��
532      * @param session_name proxyģ��,һ����ȡsession����
533      * @return ֪ͨ�����ָ��, ʧ��ΪNULL
534      */
535     KqueuerObj* GetNtfyObj(int type, int session_name = 0);
536 
537 
538     /**
539      * @brief �ͷ�֪ͨ����ָ��
540      * @param obj ֪ͨ����
541      */
542     void FreeNtfyObj(KqueuerObj* obj);
543 
544     /**
545      * @brief ��������
546      */
547     ~NtfyObjMgr();
548 
549 private:
550 
551     /**
552      * @brief ��Ϣbuff�Ĺ��캯��
553      */
554     NtfyObjMgr();
555 
556     static NtfyObjMgr * _instance;         ///<  ��������
557     SessionMap _session_map;               ///<  ȫ�ֵ�ע��session����
558     NtfyThreadQueue  _fd_ntfy_pool;        ///<  fd֪ͨ����
559     NtfySessionQueue _udp_proxy_pool;      ///<  fd֪ͨ����
560 };
561 
562 
563 
564 }
565 
566 #endif
567 
568 
569