1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 21 /** 22 * @filename kqueue_proxy.cpp 23 * @info kqueue for micro thread manage 24 */ 25 26 #include "kqueue_proxy.h" 27 #include "micro_thread.h" 28 #include "ff_hook.h" 29 30 using namespace NS_MICRO_THREAD; 31 32 KqueueProxy::KqueueProxy() 33 { 34 _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM; 35 _kqfd = -1; 36 _evtlist = NULL; 37 _kqrefs = NULL; 38 } 39 40 int KqueueProxy::InitKqueue(int max_num) 41 { 42 int rc = 0; 43 if (max_num > _maxfd) 44 { 45 _maxfd = max_num; 46 } 47 48 _kqfd = ff_kqueue(); 49 if (_kqfd < 0) 50 { 51 rc = -1; 52 goto EXIT_LABEL; 53 } 54 55 ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC); 56 57 _kqrefs = new KqFdRef[_maxfd]; 58 if (_kqrefs == NULL) 59 { 60 rc = -2; 61 goto EXIT_LABEL; 62 } 63 64 _evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent)); 65 if (_evtlist == NULL) 66 { 67 rc = -3; 68 goto EXIT_LABEL; 69 } 70 71 struct rlimit rlim; 72 memset(&rlim, 0, sizeof(rlim)); 73 if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) 74 { 75 if ((int)rlim.rlim_max < _maxfd) 76 { 77 rlim.rlim_cur = rlim.rlim_max; 78 setrlimit(RLIMIT_NOFILE, &rlim); 79 rlim.rlim_cur = _maxfd; 80 rlim.rlim_max = _maxfd; 81 setrlimit(RLIMIT_NOFILE, &rlim); 82 } 83 } 84 85 EXIT_LABEL: 86 87 if (rc < 0) 88 { 89 TermKqueue(); 90 } 91 92 return rc; 93 } 94 95 void KqueueProxy::TermKqueue() 96 { 97 if (_kqfd > 0) 98 { 99 close(_kqfd); 100 _kqfd = -1; 101 } 102 103 if (_evtlist != NULL) 104 { 105 free(_evtlist); 106 _evtlist = NULL; 107 } 108 109 if (_kqrefs != NULL) 110 { 111 delete []_kqrefs; 112 _kqrefs = NULL; 113 } 114 } 115 116 bool KqueueProxy::KqueueAdd(KqObjList& obj_list) 117 { 118 bool ret = true; 119 KqueuerObj *kqobj = NULL; 120 KqueuerObj *kqobj_error = NULL; 121 TAILQ_FOREACH(kqobj, &obj_list, _entry) 122 { 123 if (!KqueueAddObj(kqobj)) 124 { 125 MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd()); 126 kqueue_assert(0); 127 kqobj_error = kqobj; 128 ret = false; 129 goto EXIT_LABEL; 130 } 131 } 132 133 EXIT_LABEL: 134 135 if (!ret) 136 { 137 TAILQ_FOREACH(kqobj, &obj_list, _entry) 138 { 139 if (kqobj == kqobj_error) 140 { 141 break; 142 } 143 KqueueDelObj(kqobj); 144 } 145 } 146 147 return ret; 148 } 149 150 bool KqueueProxy::KqueueDel(KqObjList& obj_list) 151 { 152 bool ret = true; 153 154 KqueuerObj *kqobj = NULL; 155 TAILQ_FOREACH(kqobj, &obj_list, _entry) 156 { 157 if (!KqueueDelObj(kqobj)) // failed also need continue, be sure ref count ok 158 { 159 MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd()); 160 kqueue_assert(0); 161 ret = false; 162 } 163 } 164 165 return ret; 166 } 167 168 bool KqueueProxy::KqueueCtrlAdd(int fd, int events) 169 { 170 KqFdRef* item = KqFdRefGet(fd); 171 if (item == NULL) 172 { 173 MT_ATTR_API(320851, 1); // fd error, wtf? 174 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 175 kqueue_assert(0); 176 return false; 177 } 178 179 item->AttachEvents(events); 180 181 int old_events = item->GetListenEvents(); 182 int new_events = old_events | events; 183 if (old_events == new_events) 184 { 185 return true; 186 } 187 188 KqEvent ke; 189 int ret; 190 if (CHK_FD_BIT(fd)) { 191 fd = CLR_FD_BIT(fd); 192 } 193 if (old_events & KQ_EVENT_WRITE) { 194 EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 195 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 196 if (ret == -1) { 197 // TODO, error check 198 item->DetachEvents(events); 199 kqueue_assert(0); 200 return false; 201 } 202 } 203 if (old_events & KQ_EVENT_READ) { 204 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 205 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 206 if (ret == -1) { 207 // TODO, error check 208 item->DetachEvents(events); 209 kqueue_assert(0); 210 return false; 211 } 212 } 213 if (events & KQ_EVENT_WRITE) { 214 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 215 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 216 if (ret == -1) { 217 // TODO, error check 218 item->DetachEvents(events); 219 kqueue_assert(0); 220 return false; 221 } 222 } 223 if (events & KQ_EVENT_READ) { 224 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 225 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 226 if (ret == -1) { 227 // TODO, error check 228 item->DetachEvents(events); 229 kqueue_assert(0); 230 return false; 231 } 232 } 233 234 item->SetListenEvents(new_events); 235 236 return true; 237 } 238 239 240 bool KqueueProxy::KqueueCtrlDel(int fd, int events) 241 { 242 return KqueueCtrlDelRef(fd, events, false); 243 } 244 245 bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref) 246 { 247 KqFdRef* item = KqFdRefGet(fd); 248 if (item == NULL) 249 { 250 MT_ATTR_API(320851, 1); // fd error 251 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 252 kqueue_assert(0); 253 return false; 254 255 } 256 257 item->DetachEvents(events); 258 int old_events = item->GetListenEvents(); 259 int new_events = old_events &~ events; 260 261 if (use_ref) { 262 new_events = old_events; 263 if (item->ReadRefCnt() == 0) { 264 new_events = new_events & ~KQ_EVENT_READ; 265 } 266 if (item->WriteRefCnt() == 0) { 267 new_events = new_events & ~KQ_EVENT_WRITE; 268 } 269 } 270 271 if (old_events == new_events) 272 { 273 return true; 274 } 275 KqEvent ke; 276 int ret; 277 if (CHK_FD_BIT(fd)) { 278 fd = CLR_FD_BIT(fd); 279 } 280 if (old_events & KQ_EVENT_WRITE) { 281 EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 282 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 283 if (ret == -1) { 284 kqueue_assert(0); 285 return false; 286 } 287 } 288 if (old_events & KQ_EVENT_READ) { 289 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 290 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 291 if (ret == -1) { 292 kqueue_assert(0); 293 return false; 294 } 295 } 296 297 if (new_events & KQ_EVENT_WRITE) { 298 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 299 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 300 if (ret == -1) { 301 kqueue_assert(0); 302 return false; 303 } 304 } 305 if (new_events & KQ_EVENT_READ) { 306 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 307 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 308 if (ret == -1) { 309 kqueue_assert(0); 310 return false; 311 } 312 } 313 314 item->SetListenEvents(new_events); 315 316 return true; 317 } 318 319 bool KqueueProxy::KqueueAddObj(KqueuerObj* obj) 320 { 321 if (obj == NULL) 322 { 323 MTLOG_ERROR("kqobj input invalid, %p", obj); 324 return false; 325 } 326 327 KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 328 if (item == NULL) 329 { 330 MT_ATTR_API(320851, 1); // fd error 331 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 332 kqueue_assert(0); 333 return false; 334 } 335 336 int ret = obj->KqueueCtlAdd(item); 337 if (ret < 0) { 338 MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 339 kqueue_assert(0); 340 return false; 341 } 342 343 return true; 344 } 345 346 bool KqueueProxy::KqueueDelObj(KqueuerObj* obj) 347 { 348 if (obj == NULL) 349 { 350 MTLOG_ERROR("kqobj input invalid, %p", obj); 351 return false; 352 } 353 KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 354 if (item == NULL) 355 { 356 MT_ATTR_API(320851, 1); // fd error 357 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 358 kqueue_assert(0); 359 return false; 360 } 361 362 int ret = obj->KqueueCtlDel(item); 363 if (ret < 0) { 364 MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 365 kqueue_assert(0); 366 return false; 367 } 368 369 return true; 370 } 371 372 void KqueueProxy::KqueueRcvEventList(int evtfdnum) 373 { 374 int ret = 0; 375 int osfd = 0; 376 int revents = 0; 377 int tmp_evts = 0; 378 KqFdRef* item = NULL; 379 KqueuerObj* obj = NULL; 380 381 for (int i = 0; i < evtfdnum; i++) 382 { 383 osfd = _evtlist[i].ident |= 1 << FF_FD_BITS; 384 385 item = KqFdRefGet(osfd); 386 if (item == NULL) 387 { 388 MT_ATTR_API(320851, 1); // fd error 389 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd); 390 kqueue_assert(0); 391 continue; 392 } 393 tmp_evts = _evtlist[i].filter; 394 if (tmp_evts == EVFILT_READ) { 395 revents |= KQ_EVENT_READ; 396 } 397 if (tmp_evts == EVFILT_WRITE) { 398 revents |= KQ_EVENT_WRITE; 399 } 400 obj = item->GetNotifyObj(); 401 if (obj == NULL) 402 { 403 MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd); 404 KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE))); 405 continue; 406 } 407 obj->SetRcvEvents(revents); 408 409 if (tmp_evts == EV_ERROR) 410 { 411 obj->HangupNotify(); 412 continue; 413 } 414 415 if (revents & KQ_EVENT_READ) 416 { 417 ret = obj->InputNotify(); 418 if (ret != 0) 419 { 420 continue; 421 } 422 } 423 424 if (revents & KQ_EVENT_WRITE) 425 { 426 ret = obj->OutputNotify(); 427 if (ret != 0) 428 { 429 continue; 430 } 431 } 432 } 433 } 434 435 void KqueueProxy::KqueueDispatch() 436 { 437 int nfd; 438 int wait_time = KqueueGetTimeout(); 439 if (wait_time) { 440 struct timespec ts; 441 ts.tv_sec = wait_time / 1000; 442 ts.tv_nsec = 0; 443 nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts); 444 } else { 445 nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL); 446 } 447 if (nfd <= 0) 448 { 449 return; 450 } 451 452 KqueueRcvEventList(nfd); 453 } 454 455 int KqueuerObj::InputNotify() 456 { 457 MicroThread* thread = this->GetOwnerThread(); 458 if (thread == NULL) 459 { 460 kqueue_assert(0); 461 MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 462 return -1; 463 } 464 465 if (thread->HasFlag(MicroThread::IO_LIST)) 466 { 467 MtFrame* frame = MtFrame::Instance(); 468 frame->RemoveIoWait(thread); 469 frame->InsertRunable(thread); 470 } 471 472 return 0; 473 } 474 475 int KqueuerObj::OutputNotify() 476 { 477 MicroThread* thread = this->GetOwnerThread(); 478 if (NULL == thread) 479 { 480 kqueue_assert(0); 481 MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 482 return -1; 483 } 484 485 // 多个事件同时到达, 防重复操作 486 if (thread->HasFlag(MicroThread::IO_LIST)) 487 { 488 MtFrame* frame = MtFrame::Instance(); 489 frame->RemoveIoWait(thread); 490 frame->InsertRunable(thread); 491 } 492 493 return 0; 494 } 495 496 int KqueuerObj::HangupNotify() 497 { 498 MtFrame* frame = MtFrame::Instance(); 499 frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); 500 return 0; 501 } 502 503 int KqueuerObj::KqueueCtlAdd(void* args) 504 { 505 MtFrame* frame = MtFrame::Instance(); 506 KqFdRef* fd_ref = (KqFdRef*)args; 507 kqueue_assert(fd_ref != NULL); 508 509 int osfd = this->GetOsfd(); 510 int new_events = this->GetEvents(); 511 512 // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 513 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 514 if ((old_obj != NULL) && (old_obj != this)) 515 { 516 MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 517 return -1; 518 } 519 fd_ref->SetNotifyObj(this); 520 521 // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 522 if (!frame->KqueueCtrlAdd(osfd, new_events)) 523 { 524 MTLOG_ERROR("kqfd ref add failed, log"); 525 fd_ref->SetNotifyObj(old_obj); 526 return -2; 527 } 528 529 return 0; 530 } 531 532 int KqueuerObj::KqueueCtlDel(void* args) 533 { 534 MtFrame* frame = MtFrame::Instance(); 535 KqFdRef* fd_ref = (KqFdRef*)args; 536 kqueue_assert(fd_ref != NULL); 537 538 int osfd = this->GetOsfd(); 539 int events = this->GetEvents(); 540 541 // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 542 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 543 if (old_obj != this) 544 { 545 MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 546 return -1; 547 } 548 fd_ref->SetNotifyObj(NULL); 549 550 // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 551 if (!frame->KqueueCtrlDelRef(osfd, events, false)) // 引用有风险, 弊大于利, 关闭掉 552 { 553 MTLOG_ERROR("kqfd ref del failed, log"); 554 fd_ref->SetNotifyObj(old_obj); 555 return -2; 556 } 557 558 return 0; 559 560 } 561 562