xref: /f-stack/app/micro_thread/mt_notify.h (revision 9bd490e8)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_notify.h
22  *  @time 20130926
23  **/
24 
25 #ifndef __MT_NOTIFY_H__
26 #define __MT_NOTIFY_H__
27 
28 #include <netinet/in.h>
29 #include <queue>
30 #include <map>
31 #include "mt_mbuf_pool.h"
32 
33 namespace NS_MICRO_THREAD {
34 
35 using std::queue;
36 using std::map;
37 
38 class SessionProxy;
39 class TcpKeepConn;
40 
41 enum NTFY_OBJ_TYPE
42 {
43     NTFY_OBJ_UNDEF     = 0,
44     NTFY_OBJ_THREAD    = 1,
45     NTFY_OBJ_KEEPALIVE = 2,
46     NTFY_OBJ_SESSION   = 3,
47 };
48 
49 enum MULTI_PROTO
50 {
51     MT_UNKNOWN = 0,
52     MT_UDP     = 0x1,
53     MT_TCP     = 0x2
54 };
55 
56 typedef TAILQ_ENTRY(SessionProxy) NtfyEntry;
57 typedef TAILQ_HEAD(__NtfyList, SessionProxy) NtfyList;
58 class ISessionNtfy : public KqueuerObj
59 {
60 public:
61 
62     virtual int GetSessionId(void* pkg, int len,  int& session) { return 0;};
63 
64     virtual int CreateSocket(){return -1;};
65 
66     virtual void CloseSocket(){};
67 
68     virtual int InputNotify(){return 0;};
69 
70     virtual int OutputNotify(){return 0;};
71 
72     virtual int HangupNotify(){return 0;};
73 
74     virtual int KqueueCtlAdd(void* args){return 0;};
75 
76     virtual int KqueueCtlDel(void* args){return 0;};
77 
78     ISessionNtfy(): KqueuerObj(0) {
79         _proto = MT_UDP;
80         _buff_size = 0;
81         _msg_buff = NULL;
82         TAILQ_INIT(&_write_list);
83     }
84     virtual ~ISessionNtfy() {   };
85 
86     void SetProtoType(MULTI_PROTO proto) {
87         _proto = proto;
88     };
89 
90     MULTI_PROTO GetProtoType() {
91         return _proto;
92     };
93 
94     void SetMsgBuffSize(int buff_size) {
95         _buff_size = buff_size;
96     };
97 
98     int GetMsgBuffSize()     {
99         return (_buff_size > 0) ? _buff_size : 65535;
100     }
101 
102     void InsertWriteWait(SessionProxy* proxy);
103 
104     void RemoveWriteWait(SessionProxy* proxy);
105 
106 
107     virtual void NotifyWriteWait(){};
108 
109 protected:
110     MULTI_PROTO         _proto;
111     int                 _buff_size;
112     NtfyList            _write_list;
113     MtMsgBuf*           _msg_buff;
114 };
115 
116 
117 class UdpSessionNtfy : public ISessionNtfy
118 {
119 public:
120 
121     virtual int GetSessionId(void* pkg, int len,  int& session) { return 0;};
122 
123 
124 public:
125 
126     UdpSessionNtfy() : ISessionNtfy(){
127         ISessionNtfy::SetProtoType(MT_UDP);
128 
129         _local_addr.sin_family = AF_INET;
130         _local_addr.sin_addr.s_addr = 0;
131         _local_addr.sin_port = 0;
132     }
133     virtual ~UdpSessionNtfy() {    };
134 
135     virtual void NotifyWriteWait();
136 
137     virtual int CreateSocket();
138 
139     virtual void CloseSocket();
140 
141     virtual int InputNotify();
142 
143     virtual int OutputNotify();
144 
145     virtual int HangupNotify();
146 
147     virtual int KqueueCtlAdd(void* args);
148 
149     virtual int KqueueCtlDel(void* args);
150 
151 public:
152 
153     void SetLocalAddr(struct sockaddr_in* local_addr) {
154         memcpy(&_local_addr, local_addr, sizeof(_local_addr));
155     };
156 
157 protected:
158 
159     struct sockaddr_in  _local_addr;
160 };
161 
162 
163 class SessionProxy  : public KqueuerObj
164 {
165 public:
166     int         _flag;
167     NtfyEntry   _write_entry;
168 
169     void SetRealNtfyObj(ISessionNtfy* obj) {
170         _real_ntfy = obj;
171         this->SetOsfd(obj->GetOsfd());
172     };
173 
174     ISessionNtfy* GetRealNtfyObj() {
175         return _real_ntfy;
176     };
177 
178 public:
179 
180     virtual void Reset() {
181         _real_ntfy = NULL;
182         this->KqueuerObj::Reset();
183     };
184 
185     virtual int KqueueCtlAdd(void* args) {
186         if (!_real_ntfy) {
187             return -1;
188         }
189 
190         int events = this->GetEvents();
191         if (!(events & KQ_EVENT_WRITE)) {
192             return 0;
193         }
194 
195         if (_real_ntfy->KqueueCtlAdd(args) < 0) {
196             return -2;
197         }
198 
199         _real_ntfy->InsertWriteWait(this);
200         return 0;
201     };
202 
203     virtual int KqueueCtlDel(void* args) {
204         if (!_real_ntfy) {
205             return -1;
206         }
207 
208         int events = this->GetEvents();
209         if (!(events & KQ_EVENT_WRITE)) {
210             return 0;
211         }
212 
213         _real_ntfy->RemoveWriteWait(this);
214         return _real_ntfy->KqueueCtlDel(args);
215     };
216 
217 private:
218     ISessionNtfy*   _real_ntfy;
219 
220 };
221 
222 class TcpKeepNtfy: public KqueuerObj
223 {
224 public:
225 
226     TcpKeepNtfy() :     _keep_conn(NULL){};
227 
228     virtual int InputNotify();
229 
230     virtual int OutputNotify();
231 
232     virtual int HangupNotify();
233 
234     void SetKeepNtfyObj(TcpKeepConn* obj) {
235         _keep_conn = obj;
236     };
237 
238     TcpKeepConn* GetKeepNtfyObj() {
239         return _keep_conn;
240     };
241 
242     void KeepaliveClose();
243 
244 
245 private:
246     TcpKeepConn*   _keep_conn;
247 
248 };
249 
250 template<typename ValueType>
251 class CPtrPool
252 {
253 public:
254     typedef typename std::queue<ValueType*>  PtrQueue;
255 
256 public:
257 
258     explicit CPtrPool(int max = 500) : _max_free(max), _total(0){};
259 
260     ~CPtrPool()    {
261         ValueType* ptr = NULL;
262         while (!_ptr_list.empty()) {
263             ptr = _ptr_list.front();
264             _ptr_list.pop();
265             delete ptr;
266         }
267     };
268 
269     ValueType* AllocPtr() {
270         ValueType* ptr = NULL;
271         if (!_ptr_list.empty()) {
272             ptr = _ptr_list.front();
273             _ptr_list.pop();
274         } else {
275             ptr = new ValueType;
276             _total++;
277         }
278 
279         return ptr;
280     };
281 
282     void FreePtr(ValueType* ptr) {
283         if ((int)_ptr_list.size() >= _max_free) {
284             delete ptr;
285             _total--;
286         } else {
287             _ptr_list.push(ptr);
288         }
289     };
290 
291 protected:
292     PtrQueue  _ptr_list;
293     int       _max_free;
294     int       _total;
295 };
296 
297 class NtfyObjMgr
298 {
299 public:
300 
301     typedef std::map<int, ISessionNtfy*>   SessionMap;
302     typedef CPtrPool<KqueuerObj> NtfyThreadQueue;
303     typedef CPtrPool<SessionProxy>  NtfySessionQueue;
304 
305     static NtfyObjMgr* Instance (void);
306 
307     static void Destroy(void);
308 
309     int RegisterSession(int session_name, ISessionNtfy* session);
310 
311     ISessionNtfy* GetNameSession(int session_name);
312 
313     KqueuerObj* GetNtfyObj(int type, int session_name = 0);
314 
315     void FreeNtfyObj(KqueuerObj* obj);
316 
317     ~NtfyObjMgr();
318 
319 private:
320 
321     NtfyObjMgr();
322 
323     static NtfyObjMgr * _instance;
324     SessionMap _session_map;
325     NtfyThreadQueue  _fd_ntfy_pool;
326     NtfySessionQueue _udp_proxy_pool;
327 };
328 
329 }
330 
331 #endif
332