1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_notify.h 22 * @info �߳�ע���֪ͨ����������� 23 * @time 20130926 24 **/ 25 26 #ifndef __MT_NOTIFY_H__ 27 #define __MT_NOTIFY_H__ 28 29 #include <netinet/in.h> 30 #include <queue> 31 #include <map> 32 #include "mt_mbuf_pool.h" 33 34 namespace NS_MICRO_THREAD { 35 36 using std::queue; 37 using std::map; 38 39 class SessionProxy; 40 class TcpKeepConn; 41 42 /** 43 * @brief ֪ͨ�������� 44 */ 45 enum NTFY_OBJ_TYPE 46 { 47 NTFY_OBJ_UNDEF = 0, ///< δ��������Ӷ��� 48 NTFY_OBJ_THREAD = 1, ///< �����Ӷ���, һ��fd��Ӧһ��thread 49 NTFY_OBJ_KEEPALIVE = 2, ///< TCP�������ֵ�notify����, ������ thread 50 NTFY_OBJ_SESSION = 3, ///< UDP��sessionģ��, ����ij����Ӷ��� 51 }; 52 53 /** 54 * @brief Э�����Ͷ��� 55 */ 56 enum MULTI_PROTO 57 { 58 MT_UNKNOWN = 0, 59 MT_UDP = 0x1, ///< �������� UDP 60 MT_TCP = 0x2 ///< �������� TCP 61 }; 62 63 /** 64 * @brief ������sessionģ��, �����շ����ȹ���ӿ� 65 */ 66 typedef TAILQ_ENTRY(SessionProxy) NtfyEntry; 67 typedef TAILQ_HEAD(__NtfyList, SessionProxy) NtfyList; 68 class ISessionNtfy : public KqueuerObj 69 { 70 public: 71 72 /** 73 * @brief ��鱨��������, ͬʱ��ȡsessionid��Ϣ 74 * @param pkg ����ָ�� 75 * @param len �����ѽ��ճ��� 76 * @param session ������sessionid, ������� 77 * @return <=0 ʧ��, >0 ʵ�ʱ��ij��� 78 */ 79 virtual int GetSessionId(void* pkg, int len, int& session) { return 0;}; 80 81 /** 82 * @brief ����socket, �����ɶ��¼� 83 * @return fd�ľ��, <0 ʧ�� 84 */ 85 virtual int CreateSocket(){return -1;}; 86 87 /** 88 * @brief �ر�socket, ֹͣ�����ɶ��¼� 89 */ 90 virtual void CloseSocket(){}; 91 92 /** 93 * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 94 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 95 */ 96 virtual int InputNotify(){return 0;}; 97 98 /** 99 * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 100 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 101 */ 102 virtual int OutputNotify(){return 0;}; 103 104 /** 105 * @brief �쳣֪ͨ�ӿ� 106 * @return ���Է���ֵ, ���������¼����� 107 */ 108 virtual int HangupNotify(){return 0;}; 109 110 /** 111 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 112 * @param args fd���ö����ָ�� 113 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 114 */ 115 virtual int KqueueCtlAdd(void* args){return 0;}; 116 117 /** 118 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 119 * @param args fd���ö����ָ�� 120 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 121 */ 122 virtual int KqueueCtlDel(void* args){return 0;}; 123 124 /** 125 * @brief ���캯���������� 126 */ 127 ISessionNtfy(): KqueuerObj(0) { 128 _proto = MT_UDP; 129 _buff_size = 0; 130 _msg_buff = NULL; 131 TAILQ_INIT(&_write_list); 132 } 133 virtual ~ISessionNtfy() { }; 134 135 /** 136 * @brief ���ñ��δ����proto��Ϣ 137 */ 138 void SetProtoType(MULTI_PROTO proto) { 139 _proto = proto; 140 }; 141 142 /** 143 * @brief ��ȡ���δ����proto��Ϣ 144 * @return proto type 145 */ 146 MULTI_PROTO GetProtoType() { 147 return _proto; 148 }; 149 150 /** 151 * @brief ����buff��С, ����ʵ��ʹ�õ�msgbuff���� 152 * @return 0�ɹ� 153 */ 154 void SetMsgBuffSize(int buff_size) { 155 _buff_size = buff_size; 156 }; 157 158 /** 159 * @brief ��ȡԤ�õ�buff��С, ��������, ����65535 160 * @return ����������Ϣbuff��� 161 */ 162 int GetMsgBuffSize() { 163 return (_buff_size > 0) ? _buff_size : 65535; 164 } 165 166 /** 167 * @brief ֪ͨ�������ȴ�״̬ 168 */ 169 void InsertWriteWait(SessionProxy* proxy); 170 171 /** 172 * @brief ֪ͨ����ȡ���ȴ�״̬ 173 */ 174 void RemoveWriteWait(SessionProxy* proxy); 175 176 /** 177 * @brief �۲���ģʽ, ֪ͨд�ȴ��߳� 178 * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д 179 */ 180 virtual void NotifyWriteWait(){}; 181 182 protected: 183 MULTI_PROTO _proto; // Э������ UDP/TCP 184 int _buff_size; // �����Ϣ���� 185 NtfyList _write_list; // ��д�ȴ����� 186 MtMsgBuf* _msg_buff; // ��ʱ�հ���Ż����� 187 }; 188 189 190 /** 191 * @brief UDP������sessionģ�͵Ļ���ӿ� 192 * @info ҵ��session��Ҫ�̳иýӿ�, ��������, ʵ�ֻ�ȡGetSessionId���� 193 * @info ������չ, ��ָ�����ض˿ڵ� 194 */ 195 class UdpSessionNtfy : public ISessionNtfy 196 { 197 public: 198 199 /** 200 * @brief ��鱨��������, ͬʱ��ȡsessionid��Ϣ, �ɼ̳���ʵ���� 201 * @param pkg ����ָ�� 202 * @param len �����ѽ��ճ��� 203 * @param session ������sessionid, ������� 204 * @return <=0 ʧ��, >0 ʵ�ʱ��ij��� 205 */ 206 virtual int GetSessionId(void* pkg, int len, int& session) { return 0;}; 207 208 209 public: 210 211 /** 212 * @brief �������������� 213 */ 214 UdpSessionNtfy() : ISessionNtfy(){ 215 ISessionNtfy::SetProtoType(MT_UDP); 216 217 _local_addr.sin_family = AF_INET; 218 _local_addr.sin_addr.s_addr = 0; 219 _local_addr.sin_port = 0; 220 } 221 virtual ~UdpSessionNtfy() { }; 222 223 /** 224 * @brief �۲���ģʽ, ֪ͨд�ȴ��߳� 225 * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д 226 */ 227 virtual void NotifyWriteWait(); 228 229 /** 230 * @brief ����socket, �����ɶ��¼� 231 * @return fd�ľ��, <0 ʧ�� 232 */ 233 virtual int CreateSocket(); 234 235 /** 236 * @brief �ر�socket, ֹͣ�����ɶ��¼� 237 */ 238 virtual void CloseSocket(); 239 240 /** 241 * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 242 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 243 */ 244 virtual int InputNotify(); 245 246 /** 247 * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 248 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 249 */ 250 virtual int OutputNotify(); 251 252 /** 253 * @brief �쳣֪ͨ�ӿ� 254 * @return ���Է���ֵ, ���������¼����� 255 */ 256 virtual int HangupNotify(); 257 258 /** 259 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 260 * @param args fd���ö����ָ�� 261 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 262 */ 263 virtual int KqueueCtlAdd(void* args); 264 265 /** 266 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 267 * @param args fd���ö����ָ�� 268 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 269 */ 270 virtual int KqueueCtlDel(void* args); 271 272 public: 273 274 /** 275 * @brief ����udp���صı���bind��ַ, �����bind���ͻ, ��ʱͣ�� 276 * ��������, �ܱ�֤ÿ����Ψһport��ʹ�� 277 */ 278 void SetLocalAddr(struct sockaddr_in* local_addr) { 279 memcpy(&_local_addr, local_addr, sizeof(_local_addr)); 280 }; 281 282 protected: 283 284 struct sockaddr_in _local_addr; 285 }; 286 287 288 289 /** 290 * @brief UDPģʽsessionģ�͵Ĵ���֪ͨ����, �������ӳ�䵽ijһ��session notify 291 * @info session proxy ������epollע��, �������¼�֪ͨ, ����Ҫ���ij�ʱ�� 292 */ 293 class SessionProxy : public KqueuerObj 294 { 295 public: 296 int _flag; ///< 0-���ڶ�����, 1-�ڵȴ����� 297 NtfyEntry _write_entry; ///< ������д�ȴ����еĹ������ 298 299 /** 300 * @brief ���ô������, ���������fd��� 301 */ 302 void SetRealNtfyObj(ISessionNtfy* obj) { 303 _real_ntfy = obj; 304 this->SetOsfd(obj->GetOsfd()); 305 }; 306 307 /** 308 * @brief ��ȡ�������ָ�� 309 */ 310 ISessionNtfy* GetRealNtfyObj() { 311 return _real_ntfy; 312 }; 313 314 public: 315 316 /** 317 * @brief ���մ���, ���������� 318 */ 319 virtual void Reset() { 320 _real_ntfy = NULL; 321 this->KqueuerObj::Reset(); 322 }; 323 324 /** 325 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 326 * @param args fd���ö����ָ�� 327 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 328 */ 329 virtual int KqueueCtlAdd(void* args) { 330 if (!_real_ntfy) { 331 return -1; 332 } 333 334 int events = this->GetEvents(); 335 if (!(events & KQ_EVENT_WRITE)) { 336 return 0; 337 } 338 339 if (_real_ntfy->KqueueCtlAdd(args) < 0) { 340 return -2; 341 } 342 343 _real_ntfy->InsertWriteWait(this); 344 return 0; 345 }; 346 347 /** 348 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 349 * @param args fd���ö����ָ�� 350 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 351 */ 352 virtual int KqueueCtlDel(void* args) { 353 if (!_real_ntfy) { 354 return -1; 355 } 356 357 int events = this->GetEvents(); 358 if (!(events & KQ_EVENT_WRITE)) { 359 return 0; 360 } 361 362 _real_ntfy->RemoveWriteWait(this); 363 return _real_ntfy->KqueueCtlDel(args); 364 }; 365 366 private: 367 ISessionNtfy* _real_ntfy; // ʵ�ʵ�ִ���� 368 369 }; 370 371 /** 372 * @brief TCPģʽ��keepalive֪ͨ����, �������Ŀɶ��¼�, ȷ���Ƿ�Զ˹ر� 373 */ 374 class TcpKeepNtfy: public KqueuerObj 375 { 376 public: 377 378 /** 379 * @brief ���캯�� 380 */ 381 TcpKeepNtfy() : _keep_conn(NULL){}; 382 383 /** 384 * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 385 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 386 */ 387 virtual int InputNotify(); 388 389 /** 390 * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 391 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 392 */ 393 virtual int OutputNotify(); 394 395 /** 396 * @brief �쳣֪ͨ�ӿ� 397 * @return ���Է���ֵ, ���������¼����� 398 */ 399 virtual int HangupNotify(); 400 401 /** 402 * @brief ���ô������ 403 */ 404 void SetKeepNtfyObj(TcpKeepConn* obj) { 405 _keep_conn = obj; 406 }; 407 408 /** 409 * @brief ��ȡ�������ָ�� 410 */ 411 TcpKeepConn* GetKeepNtfyObj() { 412 return _keep_conn; 413 }; 414 415 /** 416 * @brief ����ʵ�����ӹرղ��� 417 */ 418 void KeepaliveClose(); 419 420 421 private: 422 TcpKeepConn* _keep_conn; // ʵ�ʵ����������� 423 424 }; 425 426 427 /** 428 * @brief ��̬�ڴ��ģ����, ���ڷ���new/delete�Ķ������, ��һ���̶���������� 429 */ 430 template<typename ValueType> 431 class CPtrPool 432 { 433 public: 434 typedef typename std::queue<ValueType*> PtrQueue; ///< �ڴ�ָ����� 435 436 public: 437 438 /** 439 * @brief ��̬�ڴ�ع��캯�� 440 * @param max �����ж��б����ָ��Ԫ��, Ĭ��500 441 */ 442 explicit CPtrPool(int max = 500) : _max_free(max), _total(0){}; 443 444 /** 445 * @brief ��̬�ڴ����������, ���������freelist 446 */ 447 ~CPtrPool() { 448 ValueType* ptr = NULL; 449 while (!_ptr_list.empty()) { 450 ptr = _ptr_list.front(); 451 _ptr_list.pop(); 452 delete ptr; 453 } 454 }; 455 456 /** 457 * @brief �����ڴ�ָ��, ���ȴӻ����ȡ, ���п�����̬ new ���� 458 * @return ģ�����͵�ָ��Ԫ��, �ձ�ʾ�ڴ�����ʧ�� 459 */ 460 ValueType* AllocPtr() { 461 ValueType* ptr = NULL; 462 if (!_ptr_list.empty()) { 463 ptr = _ptr_list.front(); 464 _ptr_list.pop(); 465 } else { 466 ptr = new ValueType; 467 _total++; 468 } 469 470 return ptr; 471 }; 472 473 /** 474 * @brief �ͷ��ڴ�ָ��, �����ж��г������, ��ֱ���ͷ�, ������л��� 475 */ 476 void FreePtr(ValueType* ptr) { 477 if ((int)_ptr_list.size() >= _max_free) { 478 delete ptr; 479 _total--; 480 } else { 481 _ptr_list.push(ptr); 482 } 483 }; 484 485 protected: 486 PtrQueue _ptr_list; ///< ���ж��� 487 int _max_free; ///< ������Ԫ�� 488 int _total; ///< ����new�Ķ������ͳ�� 489 }; 490 491 492 /** 493 * @brief ֪ͨ����ȫ�ֹ����� 494 */ 495 class NtfyObjMgr 496 { 497 public: 498 499 typedef std::map<int, ISessionNtfy*> SessionMap; 500 typedef CPtrPool<KqueuerObj> NtfyThreadQueue; 501 typedef CPtrPool<SessionProxy> NtfySessionQueue; 502 503 /** 504 * @brief �Ự�����ĵ�ȫ�ֹ������ӿ� 505 * @return ȫ�־��ָ�� 506 */ 507 static NtfyObjMgr* Instance (void); 508 509 /** 510 * @brief ����ӿ� 511 */ 512 static void Destroy(void); 513 514 /** 515 * @brief ע�᳤����session��Ϣ 516 * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ 517 * @param session �����Ӷ���ָ��, ������������ 518 * @return 0 �ɹ�, < 0 ʧ�� 519 */ 520 int RegisterSession(int session_name, ISessionNtfy* session); 521 522 /** 523 * @brief ��ȡע�᳤����session��Ϣ 524 * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ 525 * @return ������ָ��, ʧ��ΪNULL 526 */ 527 ISessionNtfy* GetNameSession(int session_name); 528 529 /** 530 * @brief ��ȡͨ��֪ͨ����, ���߳�֪ͨ������session֪ͨ������� 531 * @param type ����, �߳�֪ͨ���ͣ�UDP/TCP SESSION֪ͨ�� 532 * @param session_name proxyģ��,һ����ȡsession���� 533 * @return ֪ͨ�����ָ��, ʧ��ΪNULL 534 */ 535 KqueuerObj* GetNtfyObj(int type, int session_name = 0); 536 537 538 /** 539 * @brief �ͷ�֪ͨ����ָ�� 540 * @param obj ֪ͨ���� 541 */ 542 void FreeNtfyObj(KqueuerObj* obj); 543 544 /** 545 * @brief �������� 546 */ 547 ~NtfyObjMgr(); 548 549 private: 550 551 /** 552 * @brief ��Ϣbuff�Ĺ��캯�� 553 */ 554 NtfyObjMgr(); 555 556 static NtfyObjMgr * _instance; ///< �������� 557 SessionMap _session_map; ///< ȫ�ֵ�ע��session���� 558 NtfyThreadQueue _fd_ntfy_pool; ///< fd֪ͨ���� 559 NtfySessionQueue _udp_proxy_pool; ///< fd֪ͨ���� 560 }; 561 562 563 564 } 565 566 #endif 567 568 569