1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_notify.cpp 22 * @info �̵߳���ע��������ʵ�� 23 * @time 20130924 24 **/ 25 #include <fcntl.h> 26 #include <sys/types.h> 27 #include <sys/socket.h> 28 #include <netinet/in.h> 29 #include <arpa/inet.h> 30 31 #include "micro_thread.h" 32 #include "mt_session.h" 33 #include "mt_msg.h" 34 #include "mt_notify.h" 35 #include "mt_connection.h" 36 #include "mt_sys_hook.h" 37 #include "ff_hook.h" 38 39 using namespace std; 40 using namespace NS_MICRO_THREAD; 41 42 43 /** 44 * @brief ֪ͨ�������ȴ�״̬, ����ȴ������� 45 * @param proxy �����sessionģ�� 46 */ 47 void ISessionNtfy::InsertWriteWait(SessionProxy* proxy) 48 { 49 if (!proxy->_flag) { 50 TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry); 51 proxy->_flag = 1; 52 } 53 } 54 55 /** 56 * @brief ֪ͨ�����Ƴ��ȴ�״̬ 57 * @param proxy �����sessionģ�� 58 */ 59 void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy) 60 { 61 if (proxy->_flag) { 62 TAILQ_REMOVE(&_write_list, proxy, _write_entry); 63 proxy->_flag = 0; 64 } 65 } 66 67 /** 68 * @brief �۲���ģʽ, ֪ͨд�ȴ��߳� 69 * @info UDP����֪ͨÿ���߳�ִ��д����, TCP��Ҫ�Ŷ�д 70 */ 71 void UdpSessionNtfy::NotifyWriteWait() 72 { 73 MtFrame* frame = MtFrame::Instance(); 74 SessionProxy* proxy = NULL; 75 MicroThread* thread = NULL; 76 TAILQ_FOREACH(proxy, &_write_list, _write_entry) 77 { 78 proxy->SetRcvEvents(KQ_EVENT_WRITE); 79 80 thread = proxy->GetOwnerThread(); 81 if (thread && thread->HasFlag(MicroThread::IO_LIST)) 82 { 83 frame->RemoveIoWait(thread); 84 frame->InsertRunable(thread); 85 } 86 } 87 } 88 89 /** 90 * @brief ����socket, �����ɶ��¼� 91 * @return fd�ľ��, <0 ʧ�� 92 */ 93 int UdpSessionNtfy::CreateSocket() 94 { 95 // 1. UDP������, ÿ���´�SOCKET 96 int osfd = socket(AF_INET, SOCK_DGRAM, 0); 97 if (osfd < 0) 98 { 99 MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno)); 100 return -1; 101 } 102 103 // 2. ���������� 104 int flags = 1; 105 if (ioctl(osfd, FIONBIO, &flags) < 0) 106 { 107 MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno)); 108 close(osfd); 109 osfd = -1; 110 return -2; 111 } 112 113 // ��ѡbindִ��, ���ñ���port��ִ�� 114 if (_local_addr.sin_port != 0) 115 { 116 int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr)); 117 if (ret < 0) 118 { 119 MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)", inet_ntoa(_local_addr.sin_addr), 120 ntohs(_local_addr.sin_port), errno, strerror(errno)); 121 close(osfd); 122 osfd = -1; 123 return -3; 124 } 125 } 126 127 // 3. ���¹�����Ϣ, Ĭ��udp session ���� epollin 128 this->SetOsfd(osfd); 129 this->EnableInput(); 130 MtFrame* frame = MtFrame::Instance(); 131 frame->KqueueNtfyReg(osfd, this); 132 frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ); 133 134 return osfd; 135 } 136 137 138 /** 139 * @brief �ر�socket, ֹͣ�����ɶ��¼� 140 */ 141 void UdpSessionNtfy::CloseSocket() 142 { 143 int osfd = this->GetOsfd(); 144 if (osfd > 0) 145 { 146 MtFrame* frame = MtFrame::Instance(); 147 frame->KqueueCtrlDel(osfd, KQ_EVENT_READ); 148 frame->KqueueNtfyReg(osfd, NULL); 149 this->DisableInput(); 150 this->SetOsfd(-1); 151 close(osfd); 152 } 153 } 154 155 156 /** 157 * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 158 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 159 */ 160 int UdpSessionNtfy::InputNotify() 161 { 162 while (1) 163 { 164 int ret = 0; 165 int have_rcv_len = 0; 166 167 // 1. ��ȡ�հ�������, ����ѡ��δ�����������buff 168 if (!_msg_buff) { 169 _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize()); 170 if (NULL == _msg_buff) { 171 MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize()); 172 return 0; 173 } 174 _msg_buff->SetBuffType(BUFF_RECV); 175 } 176 char* buff = (char*)_msg_buff->GetMsgBuff(); 177 178 // 2. ��ȡsocket, �հ����� 179 int osfd = this->GetOsfd(); 180 struct sockaddr_in from; 181 socklen_t fromlen = sizeof(from); 182 mt_hook_syscall(recvfrom); 183 ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(), 184 0, (struct sockaddr*)&from, &fromlen); 185 if (ret < 0) 186 { 187 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 188 { 189 return 0; 190 } 191 else 192 { 193 MTLOG_ERROR("recv error, fd %d", osfd); 194 return 0; // ϵͳ����, UDP �ݲ��ر� 195 } 196 } 197 else if (ret == 0) 198 { 199 MTLOG_DEBUG("remote close connection, fd %d", osfd); 200 return 0; // �Զ˹ر�, UDP �ݲ��ر� 201 } 202 else 203 { 204 have_rcv_len = ret; 205 _msg_buff->SetHaveRcvLen(have_rcv_len); 206 _msg_buff->SetMsgLen(have_rcv_len); 207 } 208 209 // 3. �����Ϣ��������, ��ȡsessionid 210 int sessionid = 0; 211 ret = this->GetSessionId(buff, have_rcv_len, sessionid); 212 if (ret <= 0) 213 { 214 MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it", 215 have_rcv_len, osfd); 216 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 217 _msg_buff = NULL; 218 return 0; 219 } 220 221 // 4. ӳ���ѯthread���, ����handle���, ���ö��¼�����, �ҽ�msgbuff 222 ISession* session = SessionMgr::Instance()->FindSession(sessionid); 223 if (NULL == session) 224 { 225 MT_ATTR_API(350403, 1); // session �����ѳ�ʱ 226 MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid); 227 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 228 _msg_buff = NULL; 229 return 0; 230 } 231 232 // 5. �ҽ�recvbuff, �����߳� 233 IMtConnection* conn = session->GetSessionConn(); 234 MicroThread* thread = session->GetOwnerThread(); 235 if (!thread || !conn || !conn->GetNtfyObj()) 236 { 237 MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong", 238 session, thread, conn); 239 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 240 _msg_buff = NULL; 241 return 0; 242 } 243 MtMsgBuf* msg = conn->GetMtMsgBuff(); 244 if (msg) { 245 MsgBuffPool::Instance()->FreeMsgBuf(msg); 246 } 247 conn->SetMtMsgBuff(_msg_buff); 248 _msg_buff = NULL; 249 250 conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ); 251 if (thread->HasFlag(MicroThread::IO_LIST)) 252 { 253 MtFrame* frame = MtFrame::Instance(); 254 frame->RemoveIoWait(thread); 255 frame->InsertRunable(thread); 256 } 257 } 258 259 return 0; 260 } 261 262 /** 263 * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 264 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 265 */ 266 int UdpSessionNtfy::OutputNotify() 267 { 268 NotifyWriteWait(); 269 return 0; 270 } 271 272 /** 273 * @brief �쳣֪ͨ�ӿ�, �ر�fd����, thread�ȴ�����ʱ 274 * @return ���Է���ֵ, ���������¼����� 275 */ 276 int UdpSessionNtfy::HangupNotify() 277 { 278 // 1. ����epoll ctl�����¼� 279 MtFrame* frame = MtFrame::Instance(); 280 frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); 281 282 MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd()); 283 284 // 2. ���´�socket 285 CloseSocket(); 286 287 // 3. �ؼ���epoll listen 288 CreateSocket(); 289 290 return 0; 291 } 292 293 /** 294 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 295 * @param args fd���ö����ָ�� 296 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 297 * @info Ĭ���Ǽ����ɶ��¼���, ����ֻ�����д�¼��ļ���ɾ�� 298 */ 299 int UdpSessionNtfy::KqueueCtlAdd(void* args) 300 { 301 MtFrame* frame = MtFrame::Instance(); 302 KqFdRef* fd_ref = (KqFdRef*)args; 303 //ASSERT(fd_ref != NULL); 304 305 int osfd = this->GetOsfd(); 306 307 // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼ 308 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 309 if ((old_obj != NULL) && (old_obj != this)) 310 { 311 MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 312 return -1; 313 } 314 315 // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ�� 316 if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE)) 317 { 318 MTLOG_ERROR("epfd ref add failed, log"); 319 return -2; 320 } 321 this->EnableOutput(); 322 323 return 0; 324 } 325 326 /** 327 * @brief ����epoll�����¼��Ļص��ӿ�, ������ʼ��EPOLLIN, ż��EPOLLOUT 328 * @param args fd���ö����ָ�� 329 * @return 0 �ɹ�, < 0 ʧ��, Ҫ������ع�������ǰ״̬ 330 */ 331 int UdpSessionNtfy::KqueueCtlDel(void* args) 332 { 333 MtFrame* frame = MtFrame::Instance(); 334 KqFdRef* fd_ref = (KqFdRef*)args; 335 //ASSERT(fd_ref != NULL); 336 337 int osfd = this->GetOsfd(); 338 339 // ֪ͨ������Ҫ����, FD֪ͨ���������ϲ��Ḵ��, ��������ͻ���, �쳣log��¼ 340 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 341 if (old_obj != this) 342 { 343 MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 344 return -1; 345 } 346 347 // ���ÿ�ܵ�epoll ctl�ӿ�, ����epoll ctrlϸ�� 348 if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE)) 349 { 350 MTLOG_ERROR("epfd ref del failed, log"); 351 return -2; 352 } 353 this->DisableOutput(); 354 355 return 0; 356 357 } 358 359 360 /** 361 * @brief �ɶ��¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 362 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 363 */ 364 int TcpKeepNtfy::InputNotify() 365 { 366 KeepaliveClose(); 367 return -1; 368 } 369 370 /** 371 * @brief ��д�¼�֪ͨ�ӿ�, ����֪ͨ������ܻ��ƻ�����, ���÷���ֵ���� 372 * @return 0 ��fd�ɼ������������¼�; !=0 ��fd�������ص����� 373 */ 374 int TcpKeepNtfy::OutputNotify() 375 { 376 KeepaliveClose(); 377 return -1; 378 } 379 380 /** 381 * @brief �쳣֪ͨ�ӿ� 382 * @return ���Է���ֵ, ���������¼����� 383 */ 384 int TcpKeepNtfy::HangupNotify() 385 { 386 KeepaliveClose(); 387 return -1; 388 } 389 390 391 /** 392 * @brief ����ʵ�����ӹرղ��� 393 */ 394 void TcpKeepNtfy::KeepaliveClose() 395 { 396 if (_keep_conn) { 397 MTLOG_DEBUG("remote close, fd %d, close connection", _fd); 398 ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn); 399 } else { 400 MTLOG_ERROR("_keep_conn ptr null, error"); 401 } 402 } 403 404 405 /** 406 * @brief sessionȫ�ֹ����� 407 * @return ȫ�־��ָ�� 408 */ 409 NtfyObjMgr* NtfyObjMgr::_instance = NULL; 410 NtfyObjMgr* NtfyObjMgr::Instance (void) 411 { 412 if (NULL == _instance) 413 { 414 _instance = new NtfyObjMgr; 415 } 416 417 return _instance; 418 } 419 420 /** 421 * @brief session����ȫ�ֵ����ٽӿ� 422 */ 423 void NtfyObjMgr::Destroy() 424 { 425 if( _instance != NULL ) 426 { 427 delete _instance; 428 _instance = NULL; 429 } 430 } 431 432 /** 433 * @brief ��Ϣbuff�Ĺ��캯�� 434 */ 435 NtfyObjMgr::NtfyObjMgr() 436 { 437 } 438 439 /** 440 * @brief ��������, ��������Դ, ������������ 441 */ 442 NtfyObjMgr::~NtfyObjMgr() 443 { 444 } 445 446 /** 447 * @brief ע�᳤����session��Ϣ 448 * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ 449 * @param session �����Ӷ���ָ��, ������������ 450 * @return 0 �ɹ�, < 0 ʧ�� 451 */ 452 int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session) 453 { 454 if (session_name <= 0 || NULL == session) { 455 MTLOG_ERROR("session %d, register %p failed", session_name, session); 456 return -1; 457 } 458 459 SessionMap::iterator it = _session_map.find(session_name); 460 if (it != _session_map.end()) 461 { 462 MTLOG_ERROR("session %d, register %p already", session_name, session); 463 return -2; 464 } 465 466 _session_map.insert(SessionMap::value_type(session_name, session)); 467 468 return 0; 469 } 470 471 /** 472 * @brief ��ȡע�᳤����session��Ϣ 473 * @param session_name �����ӵı�ʶ, ÿ�����Ӵ���һ��session��װ��ʽ 474 * @return ������ָ��, ʧ��ΪNULL 475 */ 476 ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name) 477 { 478 SessionMap::iterator it = _session_map.find(session_name); 479 if (it != _session_map.end()) 480 { 481 return it->second; 482 } 483 else 484 { 485 return NULL; 486 } 487 } 488 489 /** 490 * @brief ��ȡͨ��֪ͨ����, ���߳�֪ͨ������session֪ͨ������� 491 * @param type ����, �߳�֪ͨ���ͣ�UDP/TCP SESSION֪ͨ�� 492 * @param session_name proxyģ��,һ����ȡsession���� 493 * @return ֪ͨ�����ָ��, ʧ��ΪNULL 494 */ 495 KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name) 496 { 497 KqueuerObj* obj = NULL; 498 SessionProxy* proxy = NULL; 499 500 switch (type) 501 { 502 case NTFY_OBJ_THREAD: 503 obj = _fd_ntfy_pool.AllocPtr(); 504 break; 505 506 case NTFY_OBJ_SESSION: 507 proxy = _udp_proxy_pool.AllocPtr(); 508 obj = proxy; 509 break; 510 511 case NTFY_OBJ_KEEPALIVE: // no need get this now 512 break; 513 514 default: 515 break; 516 } 517 518 // ��ȡ�ײ�ij����Ӷ���, ����������ʵ�ʵ�֪ͨ���� 519 if (proxy) { 520 ISessionNtfy* ntfy = this->GetNameSession(session_name); 521 if (!ntfy) { 522 MTLOG_ERROR("ntfy get session name(%d) failed", session_name); 523 this->FreeNtfyObj(proxy); 524 obj = NULL; 525 } else { 526 proxy->SetRealNtfyObj(ntfy); 527 } 528 } 529 530 return obj; 531 532 } 533 534 /** 535 * @brief �ͷ�֪ͨ����ָ�� 536 * @param obj ֪ͨ���� 537 */ 538 void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj) 539 { 540 SessionProxy* proxy = NULL; 541 if (!obj) { 542 return; 543 } 544 545 int type = obj->GetNtfyType(); 546 obj->Reset(); 547 548 switch (type) 549 { 550 case NTFY_OBJ_THREAD: 551 return _fd_ntfy_pool.FreePtr(obj); 552 break; 553 554 case NTFY_OBJ_SESSION: 555 proxy = dynamic_cast<SessionProxy*>(obj); 556 return _udp_proxy_pool.FreePtr(proxy); 557 break; 558 559 case NTFY_OBJ_KEEPALIVE: 560 break; 561 562 default: 563 break; 564 } 565 566 delete obj; 567 return; 568 } 569 570 571 572