1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_notify.cpp 22 * @time 20130924 23 **/ 24 #include <fcntl.h> 25 #include <sys/types.h> 26 #include <sys/socket.h> 27 #include <netinet/in.h> 28 #include <arpa/inet.h> 29 30 #include "micro_thread.h" 31 #include "mt_session.h" 32 #include "mt_msg.h" 33 #include "mt_notify.h" 34 #include "mt_connection.h" 35 #include "mt_sys_hook.h" 36 #include "ff_hook.h" 37 38 using namespace std; 39 using namespace NS_MICRO_THREAD; 40 41 void ISessionNtfy::InsertWriteWait(SessionProxy* proxy) 42 { 43 if (!proxy->_flag) { 44 TAILQ_INSERT_TAIL(&_write_list, proxy, _write_entry); 45 proxy->_flag = 1; 46 } 47 } 48 49 void ISessionNtfy::RemoveWriteWait(SessionProxy* proxy) 50 { 51 if (proxy->_flag) { 52 TAILQ_REMOVE(&_write_list, proxy, _write_entry); 53 proxy->_flag = 0; 54 } 55 } 56 57 void UdpSessionNtfy::NotifyWriteWait() 58 { 59 MtFrame* frame = MtFrame::Instance(); 60 SessionProxy* proxy = NULL; 61 MicroThread* thread = NULL; 62 TAILQ_FOREACH(proxy, &_write_list, _write_entry) 63 { 64 proxy->SetRcvEvents(KQ_EVENT_WRITE); 65 66 thread = proxy->GetOwnerThread(); 67 if (thread && thread->HasFlag(MicroThread::IO_LIST)) 68 { 69 frame->RemoveIoWait(thread); 70 frame->InsertRunable(thread); 71 } 72 } 73 } 74 75 int UdpSessionNtfy::CreateSocket() 76 { 77 int osfd = socket(AF_INET, SOCK_DGRAM, 0); 78 if (osfd < 0) 79 { 80 MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno)); 81 return -1; 82 } 83 84 int flags = 1; 85 if (ioctl(osfd, FIONBIO, &flags) < 0) 86 { 87 MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno)); 88 close(osfd); 89 osfd = -1; 90 return -2; 91 } 92 93 if (_local_addr.sin_port != 0) 94 { 95 int ret = bind(osfd, (struct sockaddr *)&_local_addr, sizeof(_local_addr)); 96 if (ret < 0) 97 { 98 MTLOG_ERROR("socket bind(%s:%d) failed, errno %d(%s)", inet_ntoa(_local_addr.sin_addr), 99 ntohs(_local_addr.sin_port), errno, strerror(errno)); 100 close(osfd); 101 osfd = -1; 102 return -3; 103 } 104 } 105 106 this->SetOsfd(osfd); 107 this->EnableInput(); 108 MtFrame* frame = MtFrame::Instance(); 109 frame->KqueueNtfyReg(osfd, this); 110 frame->KqueueCtrlAdd(osfd, KQ_EVENT_READ); 111 112 return osfd; 113 } 114 115 void UdpSessionNtfy::CloseSocket() 116 { 117 int osfd = this->GetOsfd(); 118 if (osfd > 0) 119 { 120 MtFrame* frame = MtFrame::Instance(); 121 frame->KqueueCtrlDel(osfd, KQ_EVENT_READ); 122 frame->KqueueNtfyReg(osfd, NULL); 123 this->DisableInput(); 124 this->SetOsfd(-1); 125 close(osfd); 126 } 127 } 128 129 int UdpSessionNtfy::InputNotify() 130 { 131 while (1) 132 { 133 int ret = 0; 134 int have_rcv_len = 0; 135 136 if (!_msg_buff) { 137 _msg_buff = MsgBuffPool::Instance()->GetMsgBuf(this->GetMsgBuffSize()); 138 if (NULL == _msg_buff) { 139 MTLOG_ERROR("Get memory failed, size %d, wait next time", this->GetMsgBuffSize()); 140 return 0; 141 } 142 _msg_buff->SetBuffType(BUFF_RECV); 143 } 144 char* buff = (char*)_msg_buff->GetMsgBuff(); 145 146 int osfd = this->GetOsfd(); 147 struct sockaddr_in from; 148 socklen_t fromlen = sizeof(from); 149 mt_hook_syscall(recvfrom); 150 ret = ff_hook_recvfrom(osfd, buff, _msg_buff->GetMaxLen(), 151 0, (struct sockaddr*)&from, &fromlen); 152 if (ret < 0) 153 { 154 if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) 155 { 156 return 0; 157 } 158 else 159 { 160 MTLOG_ERROR("recv error, fd %d", osfd); 161 return 0; 162 } 163 } 164 else if (ret == 0) 165 { 166 MTLOG_DEBUG("remote close connection, fd %d", osfd); 167 return 0; 168 } 169 else 170 { 171 have_rcv_len = ret; 172 _msg_buff->SetHaveRcvLen(have_rcv_len); 173 _msg_buff->SetMsgLen(have_rcv_len); 174 } 175 176 int sessionid = 0; 177 ret = this->GetSessionId(buff, have_rcv_len, sessionid); 178 if (ret <= 0) 179 { 180 MTLOG_ERROR("recv get session failed, len %d, fd %d, drop it", 181 have_rcv_len, osfd); 182 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 183 _msg_buff = NULL; 184 return 0; 185 } 186 187 ISession* session = SessionMgr::Instance()->FindSession(sessionid); 188 if (NULL == session) 189 { 190 MT_ATTR_API(350403, 1); 191 MTLOG_DEBUG("session %d, not find, maybe timeout, drop pkg", sessionid); 192 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 193 _msg_buff = NULL; 194 return 0; 195 } 196 197 IMtConnection* conn = session->GetSessionConn(); 198 MicroThread* thread = session->GetOwnerThread(); 199 if (!thread || !conn || !conn->GetNtfyObj()) 200 { 201 MTLOG_ERROR("sesson obj %p, no thread ptr %p, no conn %p wrong", 202 session, thread, conn); 203 MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); 204 _msg_buff = NULL; 205 return 0; 206 } 207 MtMsgBuf* msg = conn->GetMtMsgBuff(); 208 if (msg) { 209 MsgBuffPool::Instance()->FreeMsgBuf(msg); 210 } 211 conn->SetMtMsgBuff(_msg_buff); 212 _msg_buff = NULL; 213 214 conn->GetNtfyObj()->SetRcvEvents(KQ_EVENT_READ); 215 if (thread->HasFlag(MicroThread::IO_LIST)) 216 { 217 MtFrame* frame = MtFrame::Instance(); 218 frame->RemoveIoWait(thread); 219 frame->InsertRunable(thread); 220 } 221 } 222 223 return 0; 224 } 225 226 int UdpSessionNtfy::OutputNotify() 227 { 228 NotifyWriteWait(); 229 return 0; 230 } 231 232 int UdpSessionNtfy::HangupNotify() 233 { 234 MtFrame* frame = MtFrame::Instance(); 235 frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); 236 237 MTLOG_ERROR("sesson obj %p, recv error event. fd %d", this, this->GetOsfd()); 238 239 CloseSocket(); 240 241 CreateSocket(); 242 243 return 0; 244 } 245 246 int UdpSessionNtfy::KqueueCtlAdd(void* args) 247 { 248 MtFrame* frame = MtFrame::Instance(); 249 KqFdRef* fd_ref = (KqFdRef*)args; 250 //ASSERT(fd_ref != NULL); 251 252 int osfd = this->GetOsfd(); 253 254 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 255 if ((old_obj != NULL) && (old_obj != this)) 256 { 257 MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 258 return -1; 259 } 260 261 if (!frame->KqueueCtrlAdd(osfd, KQ_EVENT_WRITE)) 262 { 263 MTLOG_ERROR("epfd ref add failed, log"); 264 return -2; 265 } 266 this->EnableOutput(); 267 268 return 0; 269 } 270 271 int UdpSessionNtfy::KqueueCtlDel(void* args) 272 { 273 MtFrame* frame = MtFrame::Instance(); 274 KqFdRef* fd_ref = (KqFdRef*)args; 275 //ASSERT(fd_ref != NULL); 276 277 int osfd = this->GetOsfd(); 278 279 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 280 if (old_obj != this) 281 { 282 MTLOG_ERROR("epfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 283 return -1; 284 } 285 286 if (!frame->KqueueCtrlDel(osfd, KQ_EVENT_WRITE)) 287 { 288 MTLOG_ERROR("epfd ref del failed, log"); 289 return -2; 290 } 291 this->DisableOutput(); 292 293 return 0; 294 295 } 296 297 int TcpKeepNtfy::InputNotify() 298 { 299 KeepaliveClose(); 300 return -1; 301 } 302 303 int TcpKeepNtfy::OutputNotify() 304 { 305 KeepaliveClose(); 306 return -1; 307 } 308 309 int TcpKeepNtfy::HangupNotify() 310 { 311 KeepaliveClose(); 312 return -1; 313 } 314 315 void TcpKeepNtfy::KeepaliveClose() 316 { 317 if (_keep_conn) { 318 MTLOG_DEBUG("remote close, fd %d, close connection", _fd); 319 ConnectionMgr::Instance()->CloseIdleTcpKeep(_keep_conn); 320 } else { 321 MTLOG_ERROR("_keep_conn ptr null, error"); 322 } 323 } 324 325 NtfyObjMgr* NtfyObjMgr::_instance = NULL; 326 NtfyObjMgr* NtfyObjMgr::Instance (void) 327 { 328 if (NULL == _instance) 329 { 330 _instance = new NtfyObjMgr; 331 } 332 333 return _instance; 334 } 335 336 void NtfyObjMgr::Destroy() 337 { 338 if( _instance != NULL ) 339 { 340 delete _instance; 341 _instance = NULL; 342 } 343 } 344 345 NtfyObjMgr::NtfyObjMgr() 346 { 347 } 348 349 NtfyObjMgr::~NtfyObjMgr() 350 { 351 } 352 353 int NtfyObjMgr::RegisterSession(int session_name, ISessionNtfy* session) 354 { 355 if (session_name <= 0 || NULL == session) { 356 MTLOG_ERROR("session %d, register %p failed", session_name, session); 357 return -1; 358 } 359 360 SessionMap::iterator it = _session_map.find(session_name); 361 if (it != _session_map.end()) 362 { 363 MTLOG_ERROR("session %d, register %p already", session_name, session); 364 return -2; 365 } 366 367 _session_map.insert(SessionMap::value_type(session_name, session)); 368 369 return 0; 370 } 371 372 ISessionNtfy* NtfyObjMgr::GetNameSession(int session_name) 373 { 374 SessionMap::iterator it = _session_map.find(session_name); 375 if (it != _session_map.end()) 376 { 377 return it->second; 378 } 379 else 380 { 381 return NULL; 382 } 383 } 384 385 KqueuerObj* NtfyObjMgr::GetNtfyObj(int type, int session_name) 386 { 387 KqueuerObj* obj = NULL; 388 SessionProxy* proxy = NULL; 389 390 switch (type) 391 { 392 case NTFY_OBJ_THREAD: 393 obj = _fd_ntfy_pool.AllocPtr(); 394 break; 395 396 case NTFY_OBJ_SESSION: 397 proxy = _udp_proxy_pool.AllocPtr(); 398 obj = proxy; 399 break; 400 401 case NTFY_OBJ_KEEPALIVE: // no need get this now 402 break; 403 404 default: 405 break; 406 } 407 408 if (proxy) { 409 ISessionNtfy* ntfy = this->GetNameSession(session_name); 410 if (!ntfy) { 411 MTLOG_ERROR("ntfy get session name(%d) failed", session_name); 412 this->FreeNtfyObj(proxy); 413 obj = NULL; 414 } else { 415 proxy->SetRealNtfyObj(ntfy); 416 } 417 } 418 419 return obj; 420 421 } 422 423 void NtfyObjMgr::FreeNtfyObj(KqueuerObj* obj) 424 { 425 SessionProxy* proxy = NULL; 426 if (!obj) { 427 return; 428 } 429 430 int type = obj->GetNtfyType(); 431 obj->Reset(); 432 433 switch (type) 434 { 435 case NTFY_OBJ_THREAD: 436 return _fd_ntfy_pool.FreePtr(obj); 437 break; 438 439 case NTFY_OBJ_SESSION: 440 proxy = dynamic_cast<SessionProxy*>(obj); 441 return _udp_proxy_pool.FreePtr(proxy); 442 break; 443 444 case NTFY_OBJ_KEEPALIVE: 445 break; 446 447 default: 448 break; 449 } 450 451 delete obj; 452 return; 453 } 454