1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_notify.h 22 * @time 20130926 23 **/ 24 25 #ifndef __MT_NOTIFY_H__ 26 #define __MT_NOTIFY_H__ 27 28 #include <netinet/in.h> 29 #include <queue> 30 #include <map> 31 #include "mt_mbuf_pool.h" 32 33 namespace NS_MICRO_THREAD { 34 35 using std::queue; 36 using std::map; 37 38 class SessionProxy; 39 class TcpKeepConn; 40 41 enum NTFY_OBJ_TYPE 42 { 43 NTFY_OBJ_UNDEF = 0, 44 NTFY_OBJ_THREAD = 1, 45 NTFY_OBJ_KEEPALIVE = 2, 46 NTFY_OBJ_SESSION = 3, 47 }; 48 49 enum MULTI_PROTO 50 { 51 MT_UNKNOWN = 0, 52 MT_UDP = 0x1, 53 MT_TCP = 0x2 54 }; 55 56 typedef TAILQ_ENTRY(SessionProxy) NtfyEntry; 57 typedef TAILQ_HEAD(__NtfyList, SessionProxy) NtfyList; 58 class ISessionNtfy : public KqueuerObj 59 { 60 public: 61 GetSessionId(void * pkg,int len,int & session)62 virtual int GetSessionId(void* pkg, int len, int& session) { return 0;}; 63 CreateSocket()64 virtual int CreateSocket(){return -1;}; 65 CloseSocket()66 virtual void CloseSocket(){}; 67 InputNotify()68 virtual int InputNotify(){return 0;}; 69 OutputNotify()70 virtual int OutputNotify(){return 0;}; 71 HangupNotify()72 virtual int HangupNotify(){return 0;}; 73 KqueueCtlAdd(void * args)74 virtual int KqueueCtlAdd(void* args){return 0;}; 75 KqueueCtlDel(void * args)76 virtual int KqueueCtlDel(void* args){return 0;}; 77 ISessionNtfy()78 ISessionNtfy(): KqueuerObj(0) { 79 _proto = MT_UDP; 80 _buff_size = 0; 81 _msg_buff = NULL; 82 TAILQ_INIT(&_write_list); 83 } ~ISessionNtfy()84 virtual ~ISessionNtfy() { }; 85 SetProtoType(MULTI_PROTO proto)86 void SetProtoType(MULTI_PROTO proto) { 87 _proto = proto; 88 }; 89 GetProtoType()90 MULTI_PROTO GetProtoType() { 91 return _proto; 92 }; 93 SetMsgBuffSize(int buff_size)94 void SetMsgBuffSize(int buff_size) { 95 _buff_size = buff_size; 96 }; 97 GetMsgBuffSize()98 int GetMsgBuffSize() { 99 return (_buff_size > 0) ? _buff_size : 65535; 100 } 101 102 void InsertWriteWait(SessionProxy* proxy); 103 104 void RemoveWriteWait(SessionProxy* proxy); 105 106 NotifyWriteWait()107 virtual void NotifyWriteWait(){}; 108 109 protected: 110 MULTI_PROTO _proto; 111 int _buff_size; 112 NtfyList _write_list; 113 MtMsgBuf* _msg_buff; 114 }; 115 116 117 class UdpSessionNtfy : public ISessionNtfy 118 { 119 public: 120 GetSessionId(void * pkg,int len,int & session)121 virtual int GetSessionId(void* pkg, int len, int& session) { return 0;}; 122 123 124 public: 125 UdpSessionNtfy()126 UdpSessionNtfy() : ISessionNtfy(){ 127 ISessionNtfy::SetProtoType(MT_UDP); 128 129 _local_addr.sin_family = AF_INET; 130 _local_addr.sin_addr.s_addr = 0; 131 _local_addr.sin_port = 0; 132 } ~UdpSessionNtfy()133 virtual ~UdpSessionNtfy() { }; 134 135 virtual void NotifyWriteWait(); 136 137 virtual int CreateSocket(); 138 139 virtual void CloseSocket(); 140 141 virtual int InputNotify(); 142 143 virtual int OutputNotify(); 144 145 virtual int HangupNotify(); 146 147 virtual int KqueueCtlAdd(void* args); 148 149 virtual int KqueueCtlDel(void* args); 150 151 public: 152 SetLocalAddr(struct sockaddr_in * local_addr)153 void SetLocalAddr(struct sockaddr_in* local_addr) { 154 memcpy(&_local_addr, local_addr, sizeof(_local_addr)); 155 }; 156 157 protected: 158 159 struct sockaddr_in _local_addr; 160 }; 161 162 163 class SessionProxy : public KqueuerObj 164 { 165 public: 166 int _flag; 167 NtfyEntry _write_entry; 168 SetRealNtfyObj(ISessionNtfy * obj)169 void SetRealNtfyObj(ISessionNtfy* obj) { 170 _real_ntfy = obj; 171 this->SetOsfd(obj->GetOsfd()); 172 }; 173 GetRealNtfyObj()174 ISessionNtfy* GetRealNtfyObj() { 175 return _real_ntfy; 176 }; 177 178 public: 179 Reset()180 virtual void Reset() { 181 _real_ntfy = NULL; 182 this->KqueuerObj::Reset(); 183 }; 184 KqueueCtlAdd(void * args)185 virtual int KqueueCtlAdd(void* args) { 186 if (!_real_ntfy) { 187 return -1; 188 } 189 190 int events = this->GetEvents(); 191 if (!(events & KQ_EVENT_WRITE)) { 192 return 0; 193 } 194 195 if (_real_ntfy->KqueueCtlAdd(args) < 0) { 196 return -2; 197 } 198 199 _real_ntfy->InsertWriteWait(this); 200 return 0; 201 }; 202 KqueueCtlDel(void * args)203 virtual int KqueueCtlDel(void* args) { 204 if (!_real_ntfy) { 205 return -1; 206 } 207 208 int events = this->GetEvents(); 209 if (!(events & KQ_EVENT_WRITE)) { 210 return 0; 211 } 212 213 _real_ntfy->RemoveWriteWait(this); 214 return _real_ntfy->KqueueCtlDel(args); 215 }; 216 217 private: 218 ISessionNtfy* _real_ntfy; 219 220 }; 221 222 class TcpKeepNtfy: public KqueuerObj 223 { 224 public: 225 TcpKeepNtfy()226 TcpKeepNtfy() : _keep_conn(NULL){}; 227 228 virtual int InputNotify(); 229 230 virtual int OutputNotify(); 231 232 virtual int HangupNotify(); 233 SetKeepNtfyObj(TcpKeepConn * obj)234 void SetKeepNtfyObj(TcpKeepConn* obj) { 235 _keep_conn = obj; 236 }; 237 GetKeepNtfyObj()238 TcpKeepConn* GetKeepNtfyObj() { 239 return _keep_conn; 240 }; 241 242 void KeepaliveClose(); 243 244 245 private: 246 TcpKeepConn* _keep_conn; 247 248 }; 249 250 template<typename ValueType> 251 class CPtrPool 252 { 253 public: 254 typedef typename std::queue<ValueType*> PtrQueue; 255 256 public: 257 _max_free(max)258 explicit CPtrPool(int max = 500) : _max_free(max), _total(0){}; 259 ~CPtrPool()260 ~CPtrPool() { 261 ValueType* ptr = NULL; 262 while (!_ptr_list.empty()) { 263 ptr = _ptr_list.front(); 264 _ptr_list.pop(); 265 delete ptr; 266 } 267 }; 268 AllocPtr()269 ValueType* AllocPtr() { 270 ValueType* ptr = NULL; 271 if (!_ptr_list.empty()) { 272 ptr = _ptr_list.front(); 273 _ptr_list.pop(); 274 } else { 275 ptr = new ValueType; 276 _total++; 277 } 278 279 return ptr; 280 }; 281 FreePtr(ValueType * ptr)282 void FreePtr(ValueType* ptr) { 283 if ((int)_ptr_list.size() >= _max_free) { 284 delete ptr; 285 _total--; 286 } else { 287 _ptr_list.push(ptr); 288 } 289 }; 290 291 protected: 292 PtrQueue _ptr_list; 293 int _max_free; 294 int _total; 295 }; 296 297 class NtfyObjMgr 298 { 299 public: 300 301 typedef std::map<int, ISessionNtfy*> SessionMap; 302 typedef CPtrPool<KqueuerObj> NtfyThreadQueue; 303 typedef CPtrPool<SessionProxy> NtfySessionQueue; 304 305 static NtfyObjMgr* Instance (void); 306 307 static void Destroy(void); 308 309 int RegisterSession(int session_name, ISessionNtfy* session); 310 311 ISessionNtfy* GetNameSession(int session_name); 312 313 KqueuerObj* GetNtfyObj(int type, int session_name = 0); 314 315 void FreeNtfyObj(KqueuerObj* obj); 316 317 ~NtfyObjMgr(); 318 319 private: 320 321 NtfyObjMgr(); 322 323 static NtfyObjMgr * _instance; 324 SessionMap _session_map; 325 NtfyThreadQueue _fd_ntfy_pool; 326 NtfySessionQueue _udp_proxy_pool; 327 }; 328 329 } 330 331 #endif 332