1*a9643ea8Slogwang 2*a9643ea8Slogwang /** 3*a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4*a9643ea8Slogwang * 5*a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6*a9643ea8Slogwang * 7*a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8*a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9*a9643ea8Slogwang * obtain a copy of the License at 10*a9643ea8Slogwang * 11*a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12*a9643ea8Slogwang * 13*a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14*a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15*a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16*a9643ea8Slogwang * and limitations under the License. 17*a9643ea8Slogwang */ 18*a9643ea8Slogwang 19*a9643ea8Slogwang 20*a9643ea8Slogwang /** 21*a9643ea8Slogwang * @file mt_notify.cpp 22*a9643ea8Slogwang * @info �̵߳���ע��������ʵ�� 23*a9643ea8Slogwang * @time 20130924 24*a9643ea8Slogwang **/ 25*a9643ea8Slogwang #include <fcntl.h> 26*a9643ea8Slogwang #include <sys/types.h> 27*a9643ea8Slogwang #include <sys/socket.h> 28*a9643ea8Slogwang #include <netinet/in.h> 29*a9643ea8Slogwang #include <arpa/inet.h> 30*a9643ea8Slogwang 31*a9643ea8Slogwang #include "micro_thread.h" 32*a9643ea8Slogwang #include "mt_session.h" 33*a9643ea8Slogwang #include "mt_msg.h" 34*a9643ea8Slogwang #include "mt_notify.h" 35*a9643ea8Slogwang #include "mt_connection.h" 36*a9643ea8Slogwang #include "mt_sys_hook.h" 37*a9643ea8Slogwang #include "ff_hook.h" 38*a9643ea8Slogwang 39*a9643ea8Slogwang using namespace std; 40*a9643ea8Slogwang using namespace NS_MICRO_THREAD; 41*a9643ea8Slogwang 42*a9643ea8Slogwang 43*a9643ea8Slogwang /** 44*a9643ea8Slogwang * @brief ֪ͨ�������ȴ�״̬, ����ȴ������� 45*a9643ea8Slogwang * @param proxy �����sessionģ�� 46*a9643ea8Slogwang */ 47*a9643ea8Slogwang void ISessionNtfy::InsertWriteWait(SessionProxy* proxy) 48*a9643ea8Slogwang { 49*a9643ea8Slogwang if (!proxy->_flag) { 50*a9643ea8Slogwang TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry); 51*a9643ea8Slogwang proxy->_flag = 1; 52*a9643ea8Slogwang } 53*a9643ea8Slogwang } 54*a9643ea8Slogwang 55*a9643ea8Slogwang /** 56*a9643ea8Slogwang * @brief ֪ͨ�����Ƴ��ȴ�״̬ 57*a9643ea8Slogwang * @param proxy �����sessionģ�� 58*a9643ea8Slogwang */ 59*a9643ea8Slogwang void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy) 60*a9643ea8Slogwang { 61*a9643ea8Slogwang if (proxy->_flag) { 62*a9643ea8Slogwang TAILQ_REMOVE(&_write_list, proxy, _write_entry); 63*a9643ea8Slogwang proxy->_flag = 0; 64*a9643ea8Slogwang } 65*a9643ea8Slogwang } 66*a9643ea8Slogwang 67*a9643ea8Slogwang /** 68*a9643ea8Slogwang * @brief �۲���ģʽ, ֪ͨд�ȴ��߳� 69*a9643ea8Slogwang * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д 70*a9643ea8Slogwang */ 71*a9643ea8Slogwang void UdpSessionNtfy::NotifyWriteWait() 72*a9643ea8Slogwang { 73*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 74*a9643ea8Slogwang SessionProxy* proxy = NULL; 75*a9643ea8Slogwang MicroThread* thread = NULL; 76*a9643ea8Slogwang TAILQ_FOREACH(proxy, &_write_list, _write_entry) 77*a9643ea8Slogwang { 78*a9643ea8Slogwang proxy->SetRcvEvents(KQ_EVENT_WRITE); 79*a9643ea8Slogwang 80*a9643ea8Slogwang thread = proxy->GetOwnerThread(); 81*a9643ea8Slogwang if (thread && thread->HasFlag(MicroThread::IO_LIST)) 82*a9643ea8Slogwang { 83*a9643ea8Slogwang frame->RemoveIoWait(thread); 84*a9643ea8Slogwang frame->InsertRunable(thread); 85*a9643ea8Slogwang } 86*a9643ea8Slogwang } 87*a9643ea8Slogwang } 88*a9643ea8Slogwang 89*a9643ea8Slogwang /** 90*a9643ea8Slogwang * @brief ����socket, �����ɶ��¼� 91*a9643ea8Slogwang * @return fd�ľ��, <0 ʧ�� 92*a9643ea8Slogwang */ 93*a9643ea8Slogwang int UdpSessionNtfy::CreateSocket() 94*a9643ea8Slogwang { 95*a9643ea8Slogwang // 1. UDP������, ÿ���´�SOCKET 96*a9643ea8Slogwang int osfd = socket(AF_INET, SOCK_DGRAM, 0); 97*a9643ea8Slogwang if (osfd < 0) 98*a9643ea8Slogwang { 99*a9643ea8Slogwang MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno)); 100*a9643ea8Slogwang return -1; 101*a9643ea8Slogwang } 102*a9643ea8Slogwang 103*a9643ea8Slogwang // 2. ���������� 104*a9643ea8Slogwang int flags = 1; 105*a9643ea8Slogwang if (ioctl(osfd, FIONBIO, &flags) < 0) 106*a9643ea8Slogwang { 107*a9643ea8Slogwang MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno)); 108*a9643ea8Slogwang close(osfd); 109*a9643ea8Slogwang osfd = -1; 110*a9643ea8Slogwang return -2; 111*a9643ea8Slogwang } 112*a9643ea8Slogwang 113*a9643ea8Slogwang // ��ѡbindִ��, ���ñ���port��ִ�� 114*a9643ea8Slogwang if (_local_addr.sin_port != 0) 115*a9643ea8Slogwang { 116*a9643ea8Slogwang int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr)); 117*a9643ea8Slogwang if (ret < 0) 118*a9643ea8Slogwang { 119*a9643ea8Slogwang MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)", inet_ntoa(_local_addr.sin_addr), 120*a9643ea8Slogwang ntohs(_local_addr.sin_port), errno, strerror(errno)); 121*a9643ea8Slogwang close(osfd); 122*a9643ea8Slogwang osfd = -1; 123*a9643ea8Slogwang return -3; 124*a9643ea8Slogwang } 125*a9643ea8Slogwang } 126*a9643ea8Slogwang 127*a9643ea8Slogwang // 3. ���¹�����Ϣ, Ĭ��udp session ���� epollin 128*a9643ea8Slogwang this->SetOsfd(osfd); 129*a9643ea8Slogwang this->EnableInput(); 130*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 131*a9643ea8Slogwang frame->KqueueNtfyReg(osfd, this); 132*a9643ea8Slogwang frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ); 133*a9643ea8Slogwang 134*a9643ea8Slogwang return osfd; 135*a9643ea8Slogwang } 136*a9643ea8Slogwang 137*a9643ea8Slogwang 138*a9643ea8Slogwang /** 139*a9643ea8Slogwang * @brief �ر�socket, ֹͣ�����ɶ��¼� 140*a9643ea8Slogwang */ 141*a9643ea8Slogwang void UdpSessionNtfy::CloseSocket() 142*a9643ea8Slogwang { 143*a9643ea8Slogwang int osfd = this->GetOsfd(); 144*a9643ea8Slogwang if (osfd > 0) 145*a9643ea8Slogwang { 146*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 147*a9643ea8Slogwang frame->KqueueCtrlDel(osfd, KQ_EVENT_READ); 148*a9643ea8Slogwang frame->KqueueNtfyReg(osfd, NULL); 149*a9643ea8Slogwang this->DisableInput(); 150*a9643ea8Slogwang this->SetOsfd(-1); 151*a9643ea8Slogwang close(osfd); 152*a9643ea8Slogwang } 153*a9643ea8Slogwang } 154*a9643ea8Slogwang 155*a9643ea8Slogwang 156*a9643ea8Slogwang /** 157*a9643ea8Slogwang * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 158*a9643ea8Slogwang * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 159*a9643ea8Slogwang */ 160*a9643ea8Slogwang int UdpSessionNtfy::InputNotify() 161*a9643ea8Slogwang { 162*a9643ea8Slogwang while (1) 163*a9643ea8Slogwang { 164*a9643ea8Slogwang int ret = 0; 165*a9643ea8Slogwang int have_rcv_len = 0; 166*a9643ea8Slogwang 167*a9643ea8Slogwang // 1. ��ȡ�հ�������, ����ѡ��δ�����������buff 168*a9643ea8Slogwang if (!_msg_buff) { 169*a9643ea8Slogwang _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize()); 170*a9643ea8Slogwang if (NULL == _msg_buff) { 171*a9643ea8Slogwang MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize()); 172*a9643ea8Slogwang return 0; 173*a9643ea8Slogwang } 174*a9643ea8Slogwang _msg_buff->SetBuffType(BUFF_RECV); 175*a9643ea8Slogwang } 176*a9643ea8Slogwang char* buff = (char*)_msg_buff->GetMsgBuff(); 177*a9643ea8Slogwang 178*a9643ea8Slogwang // 2. ��ȡsocket, �հ����� 179*a9643ea8Slogwang int osfd = this->GetOsfd(); 180*a9643ea8Slogwang struct sockaddr_in from; 181*a9643ea8Slogwang socklen_t fromlen = sizeof(from); 182*a9643ea8Slogwang mt_hook_syscall(recvfrom); 183*a9643ea8Slogwang ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(), 184*a9643ea8Slogwang 0, (struct sockaddr*)&from, &fromlen); 185*a9643ea8Slogwang if (ret < 0) 186*a9643ea8Slogwang { 187*a9643ea8Slogwang if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 188*a9643ea8Slogwang { 189*a9643ea8Slogwang return 0; 190*a9643ea8Slogwang } 191*a9643ea8Slogwang else 192*a9643ea8Slogwang { 193*a9643ea8Slogwang MTLOG_ERROR("recv error, fd %d", osfd); 194*a9643ea8Slogwang return 0; // ϵͳ����, UDP �ݲ��ر� 195*a9643ea8Slogwang } 196*a9643ea8Slogwang } 197*a9643ea8Slogwang else if (ret == 0) 198*a9643ea8Slogwang { 199*a9643ea8Slogwang MTLOG_DEBUG("remote close connection, fd %d", osfd); 200*a9643ea8Slogwang return 0; // �Զ˹ر�, UDP �ݲ��ر� 201*a9643ea8Slogwang } 202*a9643ea8Slogwang else 203*a9643ea8Slogwang { 204*a9643ea8Slogwang have_rcv_len = ret; 205*a9643ea8Slogwang _msg_buff->SetHaveRcvLen(have_rcv_len); 206*a9643ea8Slogwang _msg_buff->SetMsgLen(have_rcv_len); 207*a9643ea8Slogwang } 208*a9643ea8Slogwang 209*a9643ea8Slogwang // 3. �����Ϣ��������, ��ȡsessionid 210*a9643ea8Slogwang int sessionid = 0; 211*a9643ea8Slogwang ret = this->GetSessionId(buff, have_rcv_len, sessionid); 212*a9643ea8Slogwang if (ret <= 0) 213*a9643ea8Slogwang { 214*a9643ea8Slogwang MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it", 215*a9643ea8Slogwang have_rcv_len, osfd); 216*a9643ea8Slogwang MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 217*a9643ea8Slogwang _msg_buff = NULL; 218*a9643ea8Slogwang return 0; 219*a9643ea8Slogwang } 220*a9643ea8Slogwang 221*a9643ea8Slogwang // 4. ӳ���ѯthread���, ����handle���, ���ö��¼�����, �ҽ�msgbuff 222*a9643ea8Slogwang ISession* session = SessionMgr::Instance()->FindSession(sessionid); 223*a9643ea8Slogwang if (NULL == session) 224*a9643ea8Slogwang { 225*a9643ea8Slogwang MT_ATTR_API(350403, 1); // session �����ѳ�ʱ 226*a9643ea8Slogwang MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid); 227*a9643ea8Slogwang MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 228*a9643ea8Slogwang _msg_buff = NULL; 229*a9643ea8Slogwang return 0; 230*a9643ea8Slogwang } 231*a9643ea8Slogwang 232*a9643ea8Slogwang // 5. �ҽ�recvbuff, �����߳� 233*a9643ea8Slogwang IMtConnection* conn = session->GetSessionConn(); 234*a9643ea8Slogwang MicroThread* thread = session->GetOwnerThread(); 235*a9643ea8Slogwang if (!thread || !conn || !conn->GetNtfyObj()) 236*a9643ea8Slogwang { 237*a9643ea8Slogwang MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong", 238*a9643ea8Slogwang session, thread, conn); 239*a9643ea8Slogwang MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 240*a9643ea8Slogwang _msg_buff = NULL; 241*a9643ea8Slogwang return 0; 242*a9643ea8Slogwang } 243*a9643ea8Slogwang MtMsgBuf* msg = conn->GetMtMsgBuff(); 244*a9643ea8Slogwang if (msg) { 245*a9643ea8Slogwang MsgBuffPool::Instance()->FreeMsgBuf(msg); 246*a9643ea8Slogwang } 247*a9643ea8Slogwang conn->SetMtMsgBuff(_msg_buff); 248*a9643ea8Slogwang _msg_buff = NULL; 249*a9643ea8Slogwang 250*a9643ea8Slogwang conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ); 251*a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST)) 252*a9643ea8Slogwang { 253*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 254*a9643ea8Slogwang frame->RemoveIoWait(thread); 255*a9643ea8Slogwang frame->InsertRunable(thread); 256*a9643ea8Slogwang } 257*a9643ea8Slogwang } 258*a9643ea8Slogwang 259*a9643ea8Slogwang return 0; 260*a9643ea8Slogwang } 261*a9643ea8Slogwang 262*a9643ea8Slogwang /** 263*a9643ea8Slogwang * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 264*a9643ea8Slogwang * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 265*a9643ea8Slogwang */ 266*a9643ea8Slogwang int UdpSessionNtfy::OutputNotify() 267*a9643ea8Slogwang { 268*a9643ea8Slogwang NotifyWriteWait(); 269*a9643ea8Slogwang return 0; 270*a9643ea8Slogwang } 271*a9643ea8Slogwang 272*a9643ea8Slogwang /** 273*a9643ea8Slogwang * @brief �쳣֪ͨ�ӿ�, �ر�fd����, thread�ȴ�����ʱ 274*a9643ea8Slogwang * @return ���Է���ֵ, ���������¼����� 275*a9643ea8Slogwang */ 276*a9643ea8Slogwang int UdpSessionNtfy::HangupNotify() 277*a9643ea8Slogwang { 278*a9643ea8Slogwang // 1. ����epoll ctl�����¼� 279*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 280*a9643ea8Slogwang frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); 281*a9643ea8Slogwang 282*a9643ea8Slogwang MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd()); 283*a9643ea8Slogwang 284*a9643ea8Slogwang // 2. ���´�socket 285*a9643ea8Slogwang CloseSocket(); 286*a9643ea8Slogwang 287*a9643ea8Slogwang // 3. �ؼ���epoll listen 288*a9643ea8Slogwang CreateSocket(); 289*a9643ea8Slogwang 290*a9643ea8Slogwang return 0; 291*a9643ea8Slogwang } 292*a9643ea8Slogwang 293*a9643ea8Slogwang /** 294*a9643ea8Slogwang * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 295*a9643ea8Slogwang * @param args fd���ö����ָ�� 296*a9643ea8Slogwang * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 297*a9643ea8Slogwang * @info Ĭ���Ǽ����ɶ��¼���, ����ֻ�����д�¼��ļ���ɾ�� 298*a9643ea8Slogwang */ 299*a9643ea8Slogwang int UdpSessionNtfy::KqueueCtlAdd(void* args) 300*a9643ea8Slogwang { 301*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 302*a9643ea8Slogwang KqFdRef* fd_ref = (KqFdRef*)args; 303*a9643ea8Slogwang //ASSERT(fd_ref != NULL); 304*a9643ea8Slogwang 305*a9643ea8Slogwang int osfd = this->GetOsfd(); 306*a9643ea8Slogwang 307*a9643ea8Slogwang // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼ 308*a9643ea8Slogwang KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 309*a9643ea8Slogwang if ((old_obj != NULL) && (old_obj != this)) 310*a9643ea8Slogwang { 311*a9643ea8Slogwang MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 312*a9643ea8Slogwang return -1; 313*a9643ea8Slogwang } 314*a9643ea8Slogwang 315*a9643ea8Slogwang // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ�� 316*a9643ea8Slogwang if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE)) 317*a9643ea8Slogwang { 318*a9643ea8Slogwang MTLOG_ERROR("epfd ref add failed, log"); 319*a9643ea8Slogwang return -2; 320*a9643ea8Slogwang } 321*a9643ea8Slogwang this->EnableOutput(); 322*a9643ea8Slogwang 323*a9643ea8Slogwang return 0; 324*a9643ea8Slogwang } 325*a9643ea8Slogwang 326*a9643ea8Slogwang /** 327*a9643ea8Slogwang * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 328*a9643ea8Slogwang * @param args fd���ö����ָ�� 329*a9643ea8Slogwang * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 330*a9643ea8Slogwang */ 331*a9643ea8Slogwang int UdpSessionNtfy::KqueueCtlDel(void* args) 332*a9643ea8Slogwang { 333*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 334*a9643ea8Slogwang KqFdRef* fd_ref = (KqFdRef*)args; 335*a9643ea8Slogwang //ASSERT(fd_ref != NULL); 336*a9643ea8Slogwang 337*a9643ea8Slogwang int osfd = this->GetOsfd(); 338*a9643ea8Slogwang 339*a9643ea8Slogwang // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼ 340*a9643ea8Slogwang KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 341*a9643ea8Slogwang if (old_obj != this) 342*a9643ea8Slogwang { 343*a9643ea8Slogwang MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 344*a9643ea8Slogwang return -1; 345*a9643ea8Slogwang } 346*a9643ea8Slogwang 347*a9643ea8Slogwang // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ�� 348*a9643ea8Slogwang if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE)) 349*a9643ea8Slogwang { 350*a9643ea8Slogwang MTLOG_ERROR("epfd ref del failed, log"); 351*a9643ea8Slogwang return -2; 352*a9643ea8Slogwang } 353*a9643ea8Slogwang this->DisableOutput(); 354*a9643ea8Slogwang 355*a9643ea8Slogwang return 0; 356*a9643ea8Slogwang 357*a9643ea8Slogwang } 358*a9643ea8Slogwang 359*a9643ea8Slogwang 360*a9643ea8Slogwang /** 361*a9643ea8Slogwang * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 362*a9643ea8Slogwang * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 363*a9643ea8Slogwang */ 364*a9643ea8Slogwang int TcpKeepNtfy::InputNotify() 365*a9643ea8Slogwang { 366*a9643ea8Slogwang KeepaliveClose(); 367*a9643ea8Slogwang return -1; 368*a9643ea8Slogwang } 369*a9643ea8Slogwang 370*a9643ea8Slogwang /** 371*a9643ea8Slogwang * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 372*a9643ea8Slogwang * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 373*a9643ea8Slogwang */ 374*a9643ea8Slogwang int TcpKeepNtfy::OutputNotify() 375*a9643ea8Slogwang { 376*a9643ea8Slogwang KeepaliveClose(); 377*a9643ea8Slogwang return -1; 378*a9643ea8Slogwang } 379*a9643ea8Slogwang 380*a9643ea8Slogwang /** 381*a9643ea8Slogwang * @brief �쳣֪ͨ�ӿ� 382*a9643ea8Slogwang * @return ���Է���ֵ, ���������¼����� 383*a9643ea8Slogwang */ 384*a9643ea8Slogwang int TcpKeepNtfy::HangupNotify() 385*a9643ea8Slogwang { 386*a9643ea8Slogwang KeepaliveClose(); 387*a9643ea8Slogwang return -1; 388*a9643ea8Slogwang } 389*a9643ea8Slogwang 390*a9643ea8Slogwang 391*a9643ea8Slogwang /** 392*a9643ea8Slogwang * @brief ����ʵ�����ӹرղ��� 393*a9643ea8Slogwang */ 394*a9643ea8Slogwang void TcpKeepNtfy::KeepaliveClose() 395*a9643ea8Slogwang { 396*a9643ea8Slogwang if (_keep_conn) { 397*a9643ea8Slogwang MTLOG_DEBUG("remote close, fd %d, close connection", _fd); 398*a9643ea8Slogwang ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn); 399*a9643ea8Slogwang } else { 400*a9643ea8Slogwang MTLOG_ERROR("_keep_conn ptr null, error"); 401*a9643ea8Slogwang } 402*a9643ea8Slogwang } 403*a9643ea8Slogwang 404*a9643ea8Slogwang 405*a9643ea8Slogwang /** 406*a9643ea8Slogwang * @brief sessionȫ�ֹ����� 407*a9643ea8Slogwang * @return ȫ�־��ָ�� 408*a9643ea8Slogwang */ 409*a9643ea8Slogwang NtfyObjMgr* NtfyObjMgr::_instance = NULL; 410*a9643ea8Slogwang NtfyObjMgr* NtfyObjMgr::Instance (void) 411*a9643ea8Slogwang { 412*a9643ea8Slogwang if (NULL == _instance) 413*a9643ea8Slogwang { 414*a9643ea8Slogwang _instance = new NtfyObjMgr; 415*a9643ea8Slogwang } 416*a9643ea8Slogwang 417*a9643ea8Slogwang return _instance; 418*a9643ea8Slogwang } 419*a9643ea8Slogwang 420*a9643ea8Slogwang /** 421*a9643ea8Slogwang * @brief session����ȫ�ֵ����ٽӿ� 422*a9643ea8Slogwang */ 423*a9643ea8Slogwang void NtfyObjMgr::Destroy() 424*a9643ea8Slogwang { 425*a9643ea8Slogwang if( _instance != NULL ) 426*a9643ea8Slogwang { 427*a9643ea8Slogwang delete _instance; 428*a9643ea8Slogwang _instance = NULL; 429*a9643ea8Slogwang } 430*a9643ea8Slogwang } 431*a9643ea8Slogwang 432*a9643ea8Slogwang /** 433*a9643ea8Slogwang * @brief ��Ϣbuff�Ĺ��캯�� 434*a9643ea8Slogwang */ 435*a9643ea8Slogwang NtfyObjMgr::NtfyObjMgr() 436*a9643ea8Slogwang { 437*a9643ea8Slogwang } 438*a9643ea8Slogwang 439*a9643ea8Slogwang /** 440*a9643ea8Slogwang * @brief ��������, ��������Դ, ������������ 441*a9643ea8Slogwang */ 442*a9643ea8Slogwang NtfyObjMgr::~NtfyObjMgr() 443*a9643ea8Slogwang { 444*a9643ea8Slogwang } 445*a9643ea8Slogwang 446*a9643ea8Slogwang /** 447*a9643ea8Slogwang * @brief ע�᳤����session��Ϣ 448*a9643ea8Slogwang * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ 449*a9643ea8Slogwang * @param session �����Ӷ���ָ��, ������������ 450*a9643ea8Slogwang * @return 0 �ɹ�, < 0 ʧ�� 451*a9643ea8Slogwang */ 452*a9643ea8Slogwang int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session) 453*a9643ea8Slogwang { 454*a9643ea8Slogwang if (session_name <= 0 || NULL == session) { 455*a9643ea8Slogwang MTLOG_ERROR("session %d, register %p failed", session_name, session); 456*a9643ea8Slogwang return -1; 457*a9643ea8Slogwang } 458*a9643ea8Slogwang 459*a9643ea8Slogwang SessionMap::iterator it = _session_map.find(session_name); 460*a9643ea8Slogwang if (it != _session_map.end()) 461*a9643ea8Slogwang { 462*a9643ea8Slogwang MTLOG_ERROR("session %d, register %p already", session_name, session); 463*a9643ea8Slogwang return -2; 464*a9643ea8Slogwang } 465*a9643ea8Slogwang 466*a9643ea8Slogwang _session_map.insert(SessionMap::value_type(session_name, session)); 467*a9643ea8Slogwang 468*a9643ea8Slogwang return 0; 469*a9643ea8Slogwang } 470*a9643ea8Slogwang 471*a9643ea8Slogwang /** 472*a9643ea8Slogwang * @brief ��ȡע�᳤����session��Ϣ 473*a9643ea8Slogwang * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ 474*a9643ea8Slogwang * @return ������ָ��, ʧ��ΪNULL 475*a9643ea8Slogwang */ 476*a9643ea8Slogwang ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name) 477*a9643ea8Slogwang { 478*a9643ea8Slogwang SessionMap::iterator it = _session_map.find(session_name); 479*a9643ea8Slogwang if (it != _session_map.end()) 480*a9643ea8Slogwang { 481*a9643ea8Slogwang return it->second; 482*a9643ea8Slogwang } 483*a9643ea8Slogwang else 484*a9643ea8Slogwang { 485*a9643ea8Slogwang return NULL; 486*a9643ea8Slogwang } 487*a9643ea8Slogwang } 488*a9643ea8Slogwang 489*a9643ea8Slogwang /** 490*a9643ea8Slogwang * @brief ��ȡͨ��֪ͨ����, ���߳�֪ͨ������session֪ͨ������� 491*a9643ea8Slogwang * @param type ����, �߳�֪ͨ���ͣ�UDP/TCP SESSION֪ͨ�� 492*a9643ea8Slogwang * @param session_name proxyģ��,һ����ȡsession���� 493*a9643ea8Slogwang * @return ֪ͨ�����ָ��, ʧ��ΪNULL 494*a9643ea8Slogwang */ 495*a9643ea8Slogwang KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name) 496*a9643ea8Slogwang { 497*a9643ea8Slogwang KqueuerObj* obj = NULL; 498*a9643ea8Slogwang SessionProxy* proxy = NULL; 499*a9643ea8Slogwang 500*a9643ea8Slogwang switch (type) 501*a9643ea8Slogwang { 502*a9643ea8Slogwang case NTFY_OBJ_THREAD: 503*a9643ea8Slogwang obj = _fd_ntfy_pool.AllocPtr(); 504*a9643ea8Slogwang break; 505*a9643ea8Slogwang 506*a9643ea8Slogwang case NTFY_OBJ_SESSION: 507*a9643ea8Slogwang proxy = _udp_proxy_pool.AllocPtr(); 508*a9643ea8Slogwang obj = proxy; 509*a9643ea8Slogwang break; 510*a9643ea8Slogwang 511*a9643ea8Slogwang case NTFY_OBJ_KEEPALIVE: // no need get this now 512*a9643ea8Slogwang break; 513*a9643ea8Slogwang 514*a9643ea8Slogwang default: 515*a9643ea8Slogwang break; 516*a9643ea8Slogwang } 517*a9643ea8Slogwang 518*a9643ea8Slogwang // ��ȡ�ײ�ij����Ӷ���, ����������ʵ�ʵ�֪ͨ���� 519*a9643ea8Slogwang if (proxy) { 520*a9643ea8Slogwang ISessionNtfy* ntfy = this->GetNameSession(session_name); 521*a9643ea8Slogwang if (!ntfy) { 522*a9643ea8Slogwang MTLOG_ERROR("ntfy get session name(%d) failed", session_name); 523*a9643ea8Slogwang this->FreeNtfyObj(proxy); 524*a9643ea8Slogwang obj = NULL; 525*a9643ea8Slogwang } else { 526*a9643ea8Slogwang proxy->SetRealNtfyObj(ntfy); 527*a9643ea8Slogwang } 528*a9643ea8Slogwang } 529*a9643ea8Slogwang 530*a9643ea8Slogwang return obj; 531*a9643ea8Slogwang 532*a9643ea8Slogwang } 533*a9643ea8Slogwang 534*a9643ea8Slogwang /** 535*a9643ea8Slogwang * @brief �ͷ�֪ͨ����ָ�� 536*a9643ea8Slogwang * @param obj ֪ͨ���� 537*a9643ea8Slogwang */ 538*a9643ea8Slogwang void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj) 539*a9643ea8Slogwang { 540*a9643ea8Slogwang SessionProxy* proxy = NULL; 541*a9643ea8Slogwang if (!obj) { 542*a9643ea8Slogwang return; 543*a9643ea8Slogwang } 544*a9643ea8Slogwang 545*a9643ea8Slogwang int type = obj->GetNtfyType(); 546*a9643ea8Slogwang obj->Reset(); 547*a9643ea8Slogwang 548*a9643ea8Slogwang switch (type) 549*a9643ea8Slogwang { 550*a9643ea8Slogwang case NTFY_OBJ_THREAD: 551*a9643ea8Slogwang return _fd_ntfy_pool.FreePtr(obj); 552*a9643ea8Slogwang break; 553*a9643ea8Slogwang 554*a9643ea8Slogwang case NTFY_OBJ_SESSION: 555*a9643ea8Slogwang proxy = dynamic_cast<SessionProxy*>(obj); 556*a9643ea8Slogwang return _udp_proxy_pool.FreePtr(proxy); 557*a9643ea8Slogwang break; 558*a9643ea8Slogwang 559*a9643ea8Slogwang case NTFY_OBJ_KEEPALIVE: 560*a9643ea8Slogwang break; 561*a9643ea8Slogwang 562*a9643ea8Slogwang default: 563*a9643ea8Slogwang break; 564*a9643ea8Slogwang } 565*a9643ea8Slogwang 566*a9643ea8Slogwang delete obj; 567*a9643ea8Slogwang return; 568*a9643ea8Slogwang } 569*a9643ea8Slogwang 570*a9643ea8Slogwang 571*a9643ea8Slogwang 572