xref: /f-stack/app/micro_thread/mt_notify.h (revision 35a81399)
1a9643ea8Slogwang 
2a9643ea8Slogwang /**
3a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang  *
5a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang  *
7a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang  * obtain a copy of the License at
10a9643ea8Slogwang  *
11a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang  *
13a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang  * and limitations under the License.
17a9643ea8Slogwang  */
18a9643ea8Slogwang 
19a9643ea8Slogwang 
20a9643ea8Slogwang /**
21a9643ea8Slogwang  *  @file mt_notify.h
22a9643ea8Slogwang  *  @time 20130926
23a9643ea8Slogwang  **/
24a9643ea8Slogwang 
25a9643ea8Slogwang #ifndef __MT_NOTIFY_H__
26a9643ea8Slogwang #define __MT_NOTIFY_H__
27a9643ea8Slogwang 
28a9643ea8Slogwang #include <netinet/in.h>
29a9643ea8Slogwang #include <queue>
30a9643ea8Slogwang #include <map>
31a9643ea8Slogwang #include "mt_mbuf_pool.h"
32a9643ea8Slogwang 
33a9643ea8Slogwang namespace NS_MICRO_THREAD {
34a9643ea8Slogwang 
35a9643ea8Slogwang using std::queue;
36a9643ea8Slogwang using std::map;
37a9643ea8Slogwang 
38a9643ea8Slogwang class SessionProxy;
39a9643ea8Slogwang class TcpKeepConn;
40a9643ea8Slogwang 
41a9643ea8Slogwang enum NTFY_OBJ_TYPE
42a9643ea8Slogwang {
43*35a81399Slogwang     NTFY_OBJ_UNDEF     = 0,
44*35a81399Slogwang     NTFY_OBJ_THREAD    = 1,
45*35a81399Slogwang     NTFY_OBJ_KEEPALIVE = 2,
46*35a81399Slogwang     NTFY_OBJ_SESSION   = 3,
47a9643ea8Slogwang };
48a9643ea8Slogwang 
49a9643ea8Slogwang enum MULTI_PROTO
50a9643ea8Slogwang {
51a9643ea8Slogwang     MT_UNKNOWN = 0,
52*35a81399Slogwang     MT_UDP     = 0x1,
53*35a81399Slogwang     MT_TCP     = 0x2
54a9643ea8Slogwang };
55a9643ea8Slogwang 
56a9643ea8Slogwang typedef TAILQ_ENTRY(SessionProxy) NtfyEntry;
57a9643ea8Slogwang typedef TAILQ_HEAD(__NtfyList, SessionProxy) NtfyList;
58a9643ea8Slogwang class ISessionNtfy : public KqueuerObj
59a9643ea8Slogwang {
60a9643ea8Slogwang public:
61a9643ea8Slogwang 
GetSessionId(void * pkg,int len,int & session)62a9643ea8Slogwang     virtual int GetSessionId(void* pkg, int len,  int& session) { return 0;};
63a9643ea8Slogwang 
CreateSocket()64a9643ea8Slogwang     virtual int CreateSocket(){return -1;};
65a9643ea8Slogwang 
CloseSocket()66a9643ea8Slogwang     virtual void CloseSocket(){};
67a9643ea8Slogwang 
InputNotify()68a9643ea8Slogwang     virtual int InputNotify(){return 0;};
69a9643ea8Slogwang 
OutputNotify()70a9643ea8Slogwang     virtual int OutputNotify(){return 0;};
71a9643ea8Slogwang 
HangupNotify()72a9643ea8Slogwang     virtual int HangupNotify(){return 0;};
73a9643ea8Slogwang 
KqueueCtlAdd(void * args)74a9643ea8Slogwang     virtual int KqueueCtlAdd(void* args){return 0;};
75a9643ea8Slogwang 
KqueueCtlDel(void * args)76a9643ea8Slogwang     virtual int KqueueCtlDel(void* args){return 0;};
77a9643ea8Slogwang 
ISessionNtfy()78a9643ea8Slogwang     ISessionNtfy(): KqueuerObj(0) {
79a9643ea8Slogwang         _proto = MT_UDP;
80a9643ea8Slogwang         _buff_size = 0;
81a9643ea8Slogwang         _msg_buff = NULL;
82a9643ea8Slogwang         TAILQ_INIT(&_write_list);
83a9643ea8Slogwang     }
~ISessionNtfy()84a9643ea8Slogwang     virtual ~ISessionNtfy() {   };
85a9643ea8Slogwang 
SetProtoType(MULTI_PROTO proto)86a9643ea8Slogwang     void SetProtoType(MULTI_PROTO proto) {
87a9643ea8Slogwang         _proto = proto;
88a9643ea8Slogwang     };
89a9643ea8Slogwang 
GetProtoType()90a9643ea8Slogwang     MULTI_PROTO GetProtoType() {
91a9643ea8Slogwang         return _proto;
92a9643ea8Slogwang     };
93a9643ea8Slogwang 
SetMsgBuffSize(int buff_size)94a9643ea8Slogwang     void SetMsgBuffSize(int buff_size) {
95a9643ea8Slogwang         _buff_size = buff_size;
96a9643ea8Slogwang     };
97a9643ea8Slogwang 
GetMsgBuffSize()98a9643ea8Slogwang     int GetMsgBuffSize()     {
99a9643ea8Slogwang         return (_buff_size > 0) ? _buff_size : 65535;
100a9643ea8Slogwang     }
101a9643ea8Slogwang 
102a9643ea8Slogwang     void InsertWriteWait(SessionProxy* proxy);
103a9643ea8Slogwang 
104a9643ea8Slogwang     void RemoveWriteWait(SessionProxy* proxy);
105a9643ea8Slogwang 
106*35a81399Slogwang 
NotifyWriteWait()107a9643ea8Slogwang     virtual void NotifyWriteWait(){};
108a9643ea8Slogwang 
109a9643ea8Slogwang protected:
110*35a81399Slogwang     MULTI_PROTO         _proto;
111*35a81399Slogwang     int                 _buff_size;
112*35a81399Slogwang     NtfyList            _write_list;
113*35a81399Slogwang     MtMsgBuf*           _msg_buff;
114a9643ea8Slogwang };
115a9643ea8Slogwang 
116a9643ea8Slogwang 
117a9643ea8Slogwang class UdpSessionNtfy : public ISessionNtfy
118a9643ea8Slogwang {
119a9643ea8Slogwang public:
120a9643ea8Slogwang 
GetSessionId(void * pkg,int len,int & session)121a9643ea8Slogwang     virtual int GetSessionId(void* pkg, int len,  int& session) { return 0;};
122a9643ea8Slogwang 
123a9643ea8Slogwang 
124a9643ea8Slogwang public:
125a9643ea8Slogwang 
UdpSessionNtfy()126a9643ea8Slogwang     UdpSessionNtfy() : ISessionNtfy(){
127a9643ea8Slogwang         ISessionNtfy::SetProtoType(MT_UDP);
128a9643ea8Slogwang 
129a9643ea8Slogwang         _local_addr.sin_family = AF_INET;
130a9643ea8Slogwang         _local_addr.sin_addr.s_addr = 0;
131a9643ea8Slogwang         _local_addr.sin_port = 0;
132a9643ea8Slogwang     }
~UdpSessionNtfy()133a9643ea8Slogwang     virtual ~UdpSessionNtfy() {    };
134a9643ea8Slogwang 
135a9643ea8Slogwang     virtual void NotifyWriteWait();
136a9643ea8Slogwang 
137a9643ea8Slogwang     virtual int CreateSocket();
138a9643ea8Slogwang 
139a9643ea8Slogwang     virtual void CloseSocket();
140a9643ea8Slogwang 
141a9643ea8Slogwang     virtual int InputNotify();
142a9643ea8Slogwang 
143a9643ea8Slogwang     virtual int OutputNotify();
144a9643ea8Slogwang 
145a9643ea8Slogwang     virtual int HangupNotify();
146a9643ea8Slogwang 
147a9643ea8Slogwang     virtual int KqueueCtlAdd(void* args);
148a9643ea8Slogwang 
149a9643ea8Slogwang     virtual int KqueueCtlDel(void* args);
150a9643ea8Slogwang 
151a9643ea8Slogwang public:
152a9643ea8Slogwang 
SetLocalAddr(struct sockaddr_in * local_addr)153a9643ea8Slogwang     void SetLocalAddr(struct sockaddr_in* local_addr) {
154a9643ea8Slogwang         memcpy(&_local_addr, local_addr, sizeof(_local_addr));
155a9643ea8Slogwang     };
156a9643ea8Slogwang 
157a9643ea8Slogwang protected:
158a9643ea8Slogwang 
159a9643ea8Slogwang     struct sockaddr_in  _local_addr;
160a9643ea8Slogwang };
161a9643ea8Slogwang 
162a9643ea8Slogwang 
163a9643ea8Slogwang class SessionProxy  : public KqueuerObj
164a9643ea8Slogwang {
165a9643ea8Slogwang public:
166*35a81399Slogwang     int         _flag;
167*35a81399Slogwang     NtfyEntry   _write_entry;
168a9643ea8Slogwang 
SetRealNtfyObj(ISessionNtfy * obj)169a9643ea8Slogwang     void SetRealNtfyObj(ISessionNtfy* obj) {
170a9643ea8Slogwang         _real_ntfy = obj;
171a9643ea8Slogwang         this->SetOsfd(obj->GetOsfd());
172a9643ea8Slogwang     };
173a9643ea8Slogwang 
GetRealNtfyObj()174a9643ea8Slogwang     ISessionNtfy* GetRealNtfyObj() {
175a9643ea8Slogwang         return _real_ntfy;
176a9643ea8Slogwang     };
177a9643ea8Slogwang 
178a9643ea8Slogwang public:
179a9643ea8Slogwang 
Reset()180a9643ea8Slogwang     virtual void Reset() {
181a9643ea8Slogwang         _real_ntfy = NULL;
182a9643ea8Slogwang         this->KqueuerObj::Reset();
183a9643ea8Slogwang     };
184a9643ea8Slogwang 
KqueueCtlAdd(void * args)185a9643ea8Slogwang     virtual int KqueueCtlAdd(void* args) {
186a9643ea8Slogwang         if (!_real_ntfy) {
187a9643ea8Slogwang             return -1;
188a9643ea8Slogwang         }
189a9643ea8Slogwang 
190a9643ea8Slogwang         int events = this->GetEvents();
191a9643ea8Slogwang         if (!(events & KQ_EVENT_WRITE)) {
192a9643ea8Slogwang             return 0;
193a9643ea8Slogwang         }
194a9643ea8Slogwang 
195a9643ea8Slogwang         if (_real_ntfy->KqueueCtlAdd(args) < 0) {
196a9643ea8Slogwang             return -2;
197a9643ea8Slogwang         }
198a9643ea8Slogwang 
199a9643ea8Slogwang         _real_ntfy->InsertWriteWait(this);
200a9643ea8Slogwang         return 0;
201a9643ea8Slogwang     };
202a9643ea8Slogwang 
KqueueCtlDel(void * args)203a9643ea8Slogwang     virtual int KqueueCtlDel(void* args) {
204a9643ea8Slogwang         if (!_real_ntfy) {
205a9643ea8Slogwang             return -1;
206a9643ea8Slogwang         }
207a9643ea8Slogwang 
208a9643ea8Slogwang         int events = this->GetEvents();
209a9643ea8Slogwang         if (!(events & KQ_EVENT_WRITE)) {
210a9643ea8Slogwang             return 0;
211a9643ea8Slogwang         }
212a9643ea8Slogwang 
213a9643ea8Slogwang         _real_ntfy->RemoveWriteWait(this);
214a9643ea8Slogwang         return _real_ntfy->KqueueCtlDel(args);
215a9643ea8Slogwang     };
216a9643ea8Slogwang 
217a9643ea8Slogwang private:
218*35a81399Slogwang     ISessionNtfy*   _real_ntfy;
219a9643ea8Slogwang 
220a9643ea8Slogwang };
221a9643ea8Slogwang 
222a9643ea8Slogwang class TcpKeepNtfy: public KqueuerObj
223a9643ea8Slogwang {
224a9643ea8Slogwang public:
225a9643ea8Slogwang 
TcpKeepNtfy()226a9643ea8Slogwang     TcpKeepNtfy() :     _keep_conn(NULL){};
227a9643ea8Slogwang 
228a9643ea8Slogwang     virtual int InputNotify();
229a9643ea8Slogwang 
230a9643ea8Slogwang     virtual int OutputNotify();
231a9643ea8Slogwang 
232a9643ea8Slogwang     virtual int HangupNotify();
233a9643ea8Slogwang 
SetKeepNtfyObj(TcpKeepConn * obj)234a9643ea8Slogwang     void SetKeepNtfyObj(TcpKeepConn* obj) {
235a9643ea8Slogwang         _keep_conn = obj;
236a9643ea8Slogwang     };
237a9643ea8Slogwang 
GetKeepNtfyObj()238a9643ea8Slogwang     TcpKeepConn* GetKeepNtfyObj() {
239a9643ea8Slogwang         return _keep_conn;
240a9643ea8Slogwang     };
241a9643ea8Slogwang 
242a9643ea8Slogwang     void KeepaliveClose();
243a9643ea8Slogwang 
244a9643ea8Slogwang 
245a9643ea8Slogwang private:
246*35a81399Slogwang     TcpKeepConn*   _keep_conn;
247a9643ea8Slogwang 
248a9643ea8Slogwang };
249a9643ea8Slogwang 
250a9643ea8Slogwang template<typename ValueType>
251a9643ea8Slogwang class CPtrPool
252a9643ea8Slogwang {
253a9643ea8Slogwang public:
254*35a81399Slogwang     typedef typename std::queue<ValueType*>  PtrQueue;
255a9643ea8Slogwang 
256a9643ea8Slogwang public:
257a9643ea8Slogwang 
_max_free(max)258a9643ea8Slogwang     explicit CPtrPool(int max = 500) : _max_free(max), _total(0){};
259a9643ea8Slogwang 
~CPtrPool()260a9643ea8Slogwang     ~CPtrPool()    {
261a9643ea8Slogwang         ValueType* ptr = NULL;
262a9643ea8Slogwang         while (!_ptr_list.empty()) {
263a9643ea8Slogwang             ptr = _ptr_list.front();
264a9643ea8Slogwang             _ptr_list.pop();
265a9643ea8Slogwang             delete ptr;
266a9643ea8Slogwang         }
267a9643ea8Slogwang     };
268a9643ea8Slogwang 
AllocPtr()269a9643ea8Slogwang     ValueType* AllocPtr() {
270a9643ea8Slogwang         ValueType* ptr = NULL;
271a9643ea8Slogwang         if (!_ptr_list.empty()) {
272a9643ea8Slogwang             ptr = _ptr_list.front();
273a9643ea8Slogwang             _ptr_list.pop();
274a9643ea8Slogwang         } else {
275a9643ea8Slogwang             ptr = new ValueType;
276a9643ea8Slogwang             _total++;
277a9643ea8Slogwang         }
278a9643ea8Slogwang 
279a9643ea8Slogwang         return ptr;
280a9643ea8Slogwang     };
281a9643ea8Slogwang 
FreePtr(ValueType * ptr)282a9643ea8Slogwang     void FreePtr(ValueType* ptr) {
283a9643ea8Slogwang         if ((int)_ptr_list.size() >= _max_free) {
284a9643ea8Slogwang             delete ptr;
285a9643ea8Slogwang             _total--;
286a9643ea8Slogwang         } else {
287a9643ea8Slogwang             _ptr_list.push(ptr);
288a9643ea8Slogwang         }
289a9643ea8Slogwang     };
290a9643ea8Slogwang 
291a9643ea8Slogwang protected:
292*35a81399Slogwang     PtrQueue  _ptr_list;
293*35a81399Slogwang     int       _max_free;
294*35a81399Slogwang     int       _total;
295a9643ea8Slogwang };
296a9643ea8Slogwang 
297a9643ea8Slogwang class NtfyObjMgr
298a9643ea8Slogwang {
299a9643ea8Slogwang public:
300a9643ea8Slogwang 
301a9643ea8Slogwang     typedef std::map<int, ISessionNtfy*>   SessionMap;
302a9643ea8Slogwang     typedef CPtrPool<KqueuerObj> NtfyThreadQueue;
303a9643ea8Slogwang     typedef CPtrPool<SessionProxy>  NtfySessionQueue;
304a9643ea8Slogwang 
305a9643ea8Slogwang     static NtfyObjMgr* Instance (void);
306a9643ea8Slogwang 
307a9643ea8Slogwang     static void Destroy(void);
308a9643ea8Slogwang 
309a9643ea8Slogwang     int RegisterSession(int session_name, ISessionNtfy* session);
310a9643ea8Slogwang 
311a9643ea8Slogwang     ISessionNtfy* GetNameSession(int session_name);
312a9643ea8Slogwang 
313a9643ea8Slogwang     KqueuerObj* GetNtfyObj(int type, int session_name = 0);
314a9643ea8Slogwang 
315a9643ea8Slogwang     void FreeNtfyObj(KqueuerObj* obj);
316a9643ea8Slogwang 
317a9643ea8Slogwang     ~NtfyObjMgr();
318a9643ea8Slogwang 
319a9643ea8Slogwang private:
320a9643ea8Slogwang 
321a9643ea8Slogwang     NtfyObjMgr();
322a9643ea8Slogwang 
323*35a81399Slogwang     static NtfyObjMgr * _instance;
324*35a81399Slogwang     SessionMap _session_map;
325*35a81399Slogwang     NtfyThreadQueue  _fd_ntfy_pool;
326*35a81399Slogwang     NtfySessionQueue _udp_proxy_pool;
327a9643ea8Slogwang };
328a9643ea8Slogwang 
329a9643ea8Slogwang }
330a9643ea8Slogwang 
331a9643ea8Slogwang #endif
332