1a9643ea8Slogwang 2a9643ea8Slogwang /** 3a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4a9643ea8Slogwang * 5a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6a9643ea8Slogwang * 7a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9a9643ea8Slogwang * obtain a copy of the License at 10a9643ea8Slogwang * 11a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12a9643ea8Slogwang * 13a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16a9643ea8Slogwang * and limitations under the License. 17a9643ea8Slogwang */ 18a9643ea8Slogwang 19a9643ea8Slogwang 20a9643ea8Slogwang /** 21a9643ea8Slogwang * @file mt_notify.h 22a9643ea8Slogwang * @time 20130926 23a9643ea8Slogwang **/ 24a9643ea8Slogwang 25a9643ea8Slogwang #ifndef __MT_NOTIFY_H__ 26a9643ea8Slogwang #define __MT_NOTIFY_H__ 27a9643ea8Slogwang 28a9643ea8Slogwang #include <netinet/in.h> 29a9643ea8Slogwang #include <queue> 30a9643ea8Slogwang #include <map> 31a9643ea8Slogwang #include "mt_mbuf_pool.h" 32a9643ea8Slogwang 33a9643ea8Slogwang namespace NS_MICRO_THREAD { 34a9643ea8Slogwang 35a9643ea8Slogwang using std::queue; 36a9643ea8Slogwang using std::map; 37a9643ea8Slogwang 38a9643ea8Slogwang class SessionProxy; 39a9643ea8Slogwang class TcpKeepConn; 40a9643ea8Slogwang 41a9643ea8Slogwang enum NTFY_OBJ_TYPE 42a9643ea8Slogwang { 43*35a81399Slogwang NTFY_OBJ_UNDEF = 0, 44*35a81399Slogwang NTFY_OBJ_THREAD = 1, 45*35a81399Slogwang NTFY_OBJ_KEEPALIVE = 2, 46*35a81399Slogwang NTFY_OBJ_SESSION = 3, 47a9643ea8Slogwang }; 48a9643ea8Slogwang 49a9643ea8Slogwang enum MULTI_PROTO 50a9643ea8Slogwang { 51a9643ea8Slogwang MT_UNKNOWN = 0, 52*35a81399Slogwang MT_UDP = 0x1, 53*35a81399Slogwang MT_TCP = 0x2 54a9643ea8Slogwang }; 55a9643ea8Slogwang 56a9643ea8Slogwang typedef TAILQ_ENTRY(SessionProxy) NtfyEntry; 57a9643ea8Slogwang typedef TAILQ_HEAD(__NtfyList, SessionProxy) NtfyList; 58a9643ea8Slogwang class ISessionNtfy : public KqueuerObj 59a9643ea8Slogwang { 60a9643ea8Slogwang public: 61a9643ea8Slogwang GetSessionId(void * pkg,int len,int & session)62a9643ea8Slogwang virtual int GetSessionId(void* pkg, int len, int& session) { return 0;}; 63a9643ea8Slogwang CreateSocket()64a9643ea8Slogwang virtual int CreateSocket(){return -1;}; 65a9643ea8Slogwang CloseSocket()66a9643ea8Slogwang virtual void CloseSocket(){}; 67a9643ea8Slogwang InputNotify()68a9643ea8Slogwang virtual int InputNotify(){return 0;}; 69a9643ea8Slogwang OutputNotify()70a9643ea8Slogwang virtual int OutputNotify(){return 0;}; 71a9643ea8Slogwang HangupNotify()72a9643ea8Slogwang virtual int HangupNotify(){return 0;}; 73a9643ea8Slogwang KqueueCtlAdd(void * args)74a9643ea8Slogwang virtual int KqueueCtlAdd(void* args){return 0;}; 75a9643ea8Slogwang KqueueCtlDel(void * args)76a9643ea8Slogwang virtual int KqueueCtlDel(void* args){return 0;}; 77a9643ea8Slogwang ISessionNtfy()78a9643ea8Slogwang ISessionNtfy(): KqueuerObj(0) { 79a9643ea8Slogwang _proto = MT_UDP; 80a9643ea8Slogwang _buff_size = 0; 81a9643ea8Slogwang _msg_buff = NULL; 82a9643ea8Slogwang TAILQ_INIT(&_write_list); 83a9643ea8Slogwang } ~ISessionNtfy()84a9643ea8Slogwang virtual ~ISessionNtfy() { }; 85a9643ea8Slogwang SetProtoType(MULTI_PROTO proto)86a9643ea8Slogwang void SetProtoType(MULTI_PROTO proto) { 87a9643ea8Slogwang _proto = proto; 88a9643ea8Slogwang }; 89a9643ea8Slogwang GetProtoType()90a9643ea8Slogwang MULTI_PROTO GetProtoType() { 91a9643ea8Slogwang return _proto; 92a9643ea8Slogwang }; 93a9643ea8Slogwang SetMsgBuffSize(int buff_size)94a9643ea8Slogwang void SetMsgBuffSize(int buff_size) { 95a9643ea8Slogwang _buff_size = buff_size; 96a9643ea8Slogwang }; 97a9643ea8Slogwang GetMsgBuffSize()98a9643ea8Slogwang int GetMsgBuffSize() { 99a9643ea8Slogwang return (_buff_size > 0) ? _buff_size : 65535; 100a9643ea8Slogwang } 101a9643ea8Slogwang 102a9643ea8Slogwang void InsertWriteWait(SessionProxy* proxy); 103a9643ea8Slogwang 104a9643ea8Slogwang void RemoveWriteWait(SessionProxy* proxy); 105a9643ea8Slogwang 106*35a81399Slogwang NotifyWriteWait()107a9643ea8Slogwang virtual void NotifyWriteWait(){}; 108a9643ea8Slogwang 109a9643ea8Slogwang protected: 110*35a81399Slogwang MULTI_PROTO _proto; 111*35a81399Slogwang int _buff_size; 112*35a81399Slogwang NtfyList _write_list; 113*35a81399Slogwang MtMsgBuf* _msg_buff; 114a9643ea8Slogwang }; 115a9643ea8Slogwang 116a9643ea8Slogwang 117a9643ea8Slogwang class UdpSessionNtfy : public ISessionNtfy 118a9643ea8Slogwang { 119a9643ea8Slogwang public: 120a9643ea8Slogwang GetSessionId(void * pkg,int len,int & session)121a9643ea8Slogwang virtual int GetSessionId(void* pkg, int len, int& session) { return 0;}; 122a9643ea8Slogwang 123a9643ea8Slogwang 124a9643ea8Slogwang public: 125a9643ea8Slogwang UdpSessionNtfy()126a9643ea8Slogwang UdpSessionNtfy() : ISessionNtfy(){ 127a9643ea8Slogwang ISessionNtfy::SetProtoType(MT_UDP); 128a9643ea8Slogwang 129a9643ea8Slogwang _local_addr.sin_family = AF_INET; 130a9643ea8Slogwang _local_addr.sin_addr.s_addr = 0; 131a9643ea8Slogwang _local_addr.sin_port = 0; 132a9643ea8Slogwang } ~UdpSessionNtfy()133a9643ea8Slogwang virtual ~UdpSessionNtfy() { }; 134a9643ea8Slogwang 135a9643ea8Slogwang virtual void NotifyWriteWait(); 136a9643ea8Slogwang 137a9643ea8Slogwang virtual int CreateSocket(); 138a9643ea8Slogwang 139a9643ea8Slogwang virtual void CloseSocket(); 140a9643ea8Slogwang 141a9643ea8Slogwang virtual int InputNotify(); 142a9643ea8Slogwang 143a9643ea8Slogwang virtual int OutputNotify(); 144a9643ea8Slogwang 145a9643ea8Slogwang virtual int HangupNotify(); 146a9643ea8Slogwang 147a9643ea8Slogwang virtual int KqueueCtlAdd(void* args); 148a9643ea8Slogwang 149a9643ea8Slogwang virtual int KqueueCtlDel(void* args); 150a9643ea8Slogwang 151a9643ea8Slogwang public: 152a9643ea8Slogwang SetLocalAddr(struct sockaddr_in * local_addr)153a9643ea8Slogwang void SetLocalAddr(struct sockaddr_in* local_addr) { 154a9643ea8Slogwang memcpy(&_local_addr, local_addr, sizeof(_local_addr)); 155a9643ea8Slogwang }; 156a9643ea8Slogwang 157a9643ea8Slogwang protected: 158a9643ea8Slogwang 159a9643ea8Slogwang struct sockaddr_in _local_addr; 160a9643ea8Slogwang }; 161a9643ea8Slogwang 162a9643ea8Slogwang 163a9643ea8Slogwang class SessionProxy : public KqueuerObj 164a9643ea8Slogwang { 165a9643ea8Slogwang public: 166*35a81399Slogwang int _flag; 167*35a81399Slogwang NtfyEntry _write_entry; 168a9643ea8Slogwang SetRealNtfyObj(ISessionNtfy * obj)169a9643ea8Slogwang void SetRealNtfyObj(ISessionNtfy* obj) { 170a9643ea8Slogwang _real_ntfy = obj; 171a9643ea8Slogwang this->SetOsfd(obj->GetOsfd()); 172a9643ea8Slogwang }; 173a9643ea8Slogwang GetRealNtfyObj()174a9643ea8Slogwang ISessionNtfy* GetRealNtfyObj() { 175a9643ea8Slogwang return _real_ntfy; 176a9643ea8Slogwang }; 177a9643ea8Slogwang 178a9643ea8Slogwang public: 179a9643ea8Slogwang Reset()180a9643ea8Slogwang virtual void Reset() { 181a9643ea8Slogwang _real_ntfy = NULL; 182a9643ea8Slogwang this->KqueuerObj::Reset(); 183a9643ea8Slogwang }; 184a9643ea8Slogwang KqueueCtlAdd(void * args)185a9643ea8Slogwang virtual int KqueueCtlAdd(void* args) { 186a9643ea8Slogwang if (!_real_ntfy) { 187a9643ea8Slogwang return -1; 188a9643ea8Slogwang } 189a9643ea8Slogwang 190a9643ea8Slogwang int events = this->GetEvents(); 191a9643ea8Slogwang if (!(events & KQ_EVENT_WRITE)) { 192a9643ea8Slogwang return 0; 193a9643ea8Slogwang } 194a9643ea8Slogwang 195a9643ea8Slogwang if (_real_ntfy->KqueueCtlAdd(args) < 0) { 196a9643ea8Slogwang return -2; 197a9643ea8Slogwang } 198a9643ea8Slogwang 199a9643ea8Slogwang _real_ntfy->InsertWriteWait(this); 200a9643ea8Slogwang return 0; 201a9643ea8Slogwang }; 202a9643ea8Slogwang KqueueCtlDel(void * args)203a9643ea8Slogwang virtual int KqueueCtlDel(void* args) { 204a9643ea8Slogwang if (!_real_ntfy) { 205a9643ea8Slogwang return -1; 206a9643ea8Slogwang } 207a9643ea8Slogwang 208a9643ea8Slogwang int events = this->GetEvents(); 209a9643ea8Slogwang if (!(events & KQ_EVENT_WRITE)) { 210a9643ea8Slogwang return 0; 211a9643ea8Slogwang } 212a9643ea8Slogwang 213a9643ea8Slogwang _real_ntfy->RemoveWriteWait(this); 214a9643ea8Slogwang return _real_ntfy->KqueueCtlDel(args); 215a9643ea8Slogwang }; 216a9643ea8Slogwang 217a9643ea8Slogwang private: 218*35a81399Slogwang ISessionNtfy* _real_ntfy; 219a9643ea8Slogwang 220a9643ea8Slogwang }; 221a9643ea8Slogwang 222a9643ea8Slogwang class TcpKeepNtfy: public KqueuerObj 223a9643ea8Slogwang { 224a9643ea8Slogwang public: 225a9643ea8Slogwang TcpKeepNtfy()226a9643ea8Slogwang TcpKeepNtfy() : _keep_conn(NULL){}; 227a9643ea8Slogwang 228a9643ea8Slogwang virtual int InputNotify(); 229a9643ea8Slogwang 230a9643ea8Slogwang virtual int OutputNotify(); 231a9643ea8Slogwang 232a9643ea8Slogwang virtual int HangupNotify(); 233a9643ea8Slogwang SetKeepNtfyObj(TcpKeepConn * obj)234a9643ea8Slogwang void SetKeepNtfyObj(TcpKeepConn* obj) { 235a9643ea8Slogwang _keep_conn = obj; 236a9643ea8Slogwang }; 237a9643ea8Slogwang GetKeepNtfyObj()238a9643ea8Slogwang TcpKeepConn* GetKeepNtfyObj() { 239a9643ea8Slogwang return _keep_conn; 240a9643ea8Slogwang }; 241a9643ea8Slogwang 242a9643ea8Slogwang void KeepaliveClose(); 243a9643ea8Slogwang 244a9643ea8Slogwang 245a9643ea8Slogwang private: 246*35a81399Slogwang TcpKeepConn* _keep_conn; 247a9643ea8Slogwang 248a9643ea8Slogwang }; 249a9643ea8Slogwang 250a9643ea8Slogwang template<typename ValueType> 251a9643ea8Slogwang class CPtrPool 252a9643ea8Slogwang { 253a9643ea8Slogwang public: 254*35a81399Slogwang typedef typename std::queue<ValueType*> PtrQueue; 255a9643ea8Slogwang 256a9643ea8Slogwang public: 257a9643ea8Slogwang _max_free(max)258a9643ea8Slogwang explicit CPtrPool(int max = 500) : _max_free(max), _total(0){}; 259a9643ea8Slogwang ~CPtrPool()260a9643ea8Slogwang ~CPtrPool() { 261a9643ea8Slogwang ValueType* ptr = NULL; 262a9643ea8Slogwang while (!_ptr_list.empty()) { 263a9643ea8Slogwang ptr = _ptr_list.front(); 264a9643ea8Slogwang _ptr_list.pop(); 265a9643ea8Slogwang delete ptr; 266a9643ea8Slogwang } 267a9643ea8Slogwang }; 268a9643ea8Slogwang AllocPtr()269a9643ea8Slogwang ValueType* AllocPtr() { 270a9643ea8Slogwang ValueType* ptr = NULL; 271a9643ea8Slogwang if (!_ptr_list.empty()) { 272a9643ea8Slogwang ptr = _ptr_list.front(); 273a9643ea8Slogwang _ptr_list.pop(); 274a9643ea8Slogwang } else { 275a9643ea8Slogwang ptr = new ValueType; 276a9643ea8Slogwang _total++; 277a9643ea8Slogwang } 278a9643ea8Slogwang 279a9643ea8Slogwang return ptr; 280a9643ea8Slogwang }; 281a9643ea8Slogwang FreePtr(ValueType * ptr)282a9643ea8Slogwang void FreePtr(ValueType* ptr) { 283a9643ea8Slogwang if ((int)_ptr_list.size() >= _max_free) { 284a9643ea8Slogwang delete ptr; 285a9643ea8Slogwang _total--; 286a9643ea8Slogwang } else { 287a9643ea8Slogwang _ptr_list.push(ptr); 288a9643ea8Slogwang } 289a9643ea8Slogwang }; 290a9643ea8Slogwang 291a9643ea8Slogwang protected: 292*35a81399Slogwang PtrQueue _ptr_list; 293*35a81399Slogwang int _max_free; 294*35a81399Slogwang int _total; 295a9643ea8Slogwang }; 296a9643ea8Slogwang 297a9643ea8Slogwang class NtfyObjMgr 298a9643ea8Slogwang { 299a9643ea8Slogwang public: 300a9643ea8Slogwang 301a9643ea8Slogwang typedef std::map<int, ISessionNtfy*> SessionMap; 302a9643ea8Slogwang typedef CPtrPool<KqueuerObj> NtfyThreadQueue; 303a9643ea8Slogwang typedef CPtrPool<SessionProxy> NtfySessionQueue; 304a9643ea8Slogwang 305a9643ea8Slogwang static NtfyObjMgr* Instance (void); 306a9643ea8Slogwang 307a9643ea8Slogwang static void Destroy(void); 308a9643ea8Slogwang 309a9643ea8Slogwang int RegisterSession(int session_name, ISessionNtfy* session); 310a9643ea8Slogwang 311a9643ea8Slogwang ISessionNtfy* GetNameSession(int session_name); 312a9643ea8Slogwang 313a9643ea8Slogwang KqueuerObj* GetNtfyObj(int type, int session_name = 0); 314a9643ea8Slogwang 315a9643ea8Slogwang void FreeNtfyObj(KqueuerObj* obj); 316a9643ea8Slogwang 317a9643ea8Slogwang ~NtfyObjMgr(); 318a9643ea8Slogwang 319a9643ea8Slogwang private: 320a9643ea8Slogwang 321a9643ea8Slogwang NtfyObjMgr(); 322a9643ea8Slogwang 323*35a81399Slogwang static NtfyObjMgr * _instance; 324*35a81399Slogwang SessionMap _session_map; 325*35a81399Slogwang NtfyThreadQueue _fd_ntfy_pool; 326*35a81399Slogwang NtfySessionQueue _udp_proxy_pool; 327a9643ea8Slogwang }; 328a9643ea8Slogwang 329a9643ea8Slogwang } 330a9643ea8Slogwang 331a9643ea8Slogwang #endif 332