1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 21 /** 22 * @filename kqueue_proxy.cpp 23 * @info kqueue for micro thread manage 24 */ 25 26 #include "kqueue_proxy.h" 27 #include "micro_thread.h" 28 #include "ff_hook.h" 29 30 using namespace NS_MICRO_THREAD; 31 32 KqueueProxy::KqueueProxy() 33 { 34 _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM; 35 _kqfd = -1; 36 _evtlist = NULL; 37 _kqrefs = NULL; 38 } 39 40 int KqueueProxy::InitKqueue(int max_num) 41 { 42 int rc = 0; 43 if (max_num > _maxfd) 44 { 45 _maxfd = max_num; 46 } 47 48 _kqfd = ff_kqueue(); 49 if (_kqfd < 0) 50 { 51 rc = -1; 52 goto EXIT_LABEL; 53 } 54 55 ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC); 56 57 _kqrefs = new KqFdRef[_maxfd]; 58 if (_kqrefs == NULL) 59 { 60 rc = -2; 61 goto EXIT_LABEL; 62 } 63 64 _evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent)); 65 if (_evtlist == NULL) 66 { 67 rc = -3; 68 goto EXIT_LABEL; 69 } 70 71 struct rlimit rlim; 72 memset(&rlim, 0, sizeof(rlim)); 73 if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) 74 { 75 if ((int)rlim.rlim_max < _maxfd) 76 { 77 rlim.rlim_cur = rlim.rlim_max; 78 setrlimit(RLIMIT_NOFILE, &rlim); 79 rlim.rlim_cur = _maxfd; 80 rlim.rlim_max = _maxfd; 81 setrlimit(RLIMIT_NOFILE, &rlim); 82 } 83 } 84 85 EXIT_LABEL: 86 87 if (rc < 0) 88 { 89 TermKqueue(); 90 } 91 92 return rc; 93 } 94 95 void KqueueProxy::TermKqueue() 96 { 97 if (_kqfd > 0) 98 { 99 close(_kqfd); 100 _kqfd = -1; 101 } 102 103 if (_evtlist != NULL) 104 { 105 free(_evtlist); 106 _evtlist = NULL; 107 } 108 109 if (_kqrefs != NULL) 110 { 111 delete []_kqrefs; 112 _kqrefs = NULL; 113 } 114 } 115 116 bool KqueueProxy::KqueueAdd(KqObjList& obj_list) 117 { 118 bool ret = true; 119 KqueuerObj *kqobj = NULL; 120 KqueuerObj *kqobj_error = NULL; 121 TAILQ_FOREACH(kqobj, &obj_list, _entry) 122 { 123 if (!KqueueAddObj(kqobj)) 124 { 125 MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd()); 126 kqueue_assert(0); 127 kqobj_error = kqobj; 128 ret = false; 129 goto EXIT_LABEL; 130 } 131 } 132 133 EXIT_LABEL: 134 135 if (!ret) 136 { 137 TAILQ_FOREACH(kqobj, &obj_list, _entry) 138 { 139 if (kqobj == kqobj_error) 140 { 141 break; 142 } 143 KqueueDelObj(kqobj); 144 } 145 } 146 147 return ret; 148 } 149 150 bool KqueueProxy::KqueueDel(KqObjList& obj_list) 151 { 152 bool ret = true; 153 154 KqueuerObj *kqobj = NULL; 155 TAILQ_FOREACH(kqobj, &obj_list, _entry) 156 { 157 if (!KqueueDelObj(kqobj)) // failed also need continue, be sure ref count ok 158 { 159 MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd()); 160 kqueue_assert(0); 161 ret = false; 162 } 163 } 164 165 return ret; 166 } 167 168 bool KqueueProxy::KqueueCtrlAdd(int fd, int events) 169 { 170 KqFdRef* item = KqFdRefGet(fd); 171 if (item == NULL) 172 { 173 MT_ATTR_API(320851, 1); // fd error, wtf? 174 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 175 kqueue_assert(0); 176 return false; 177 } 178 179 item->AttachEvents(events); 180 181 int old_events = item->GetListenEvents(); 182 int new_events = old_events | events; 183 if (old_events == new_events) 184 { 185 return true; 186 } 187 188 KqEvent ke; 189 int ret; 190 if (old_events & KQ_EVENT_WRITE) { 191 EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 192 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 193 if (ret == -1) { 194 // TODO, error check 195 item->DetachEvents(events); 196 kqueue_assert(0); 197 return false; 198 } 199 } 200 if (old_events & KQ_EVENT_READ) { 201 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 202 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 203 if (ret == -1) { 204 // TODO, error check 205 item->DetachEvents(events); 206 kqueue_assert(0); 207 return false; 208 } 209 } 210 if (events & KQ_EVENT_WRITE) { 211 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 212 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 213 if (ret == -1) { 214 // TODO, error check 215 item->DetachEvents(events); 216 kqueue_assert(0); 217 return false; 218 } 219 } 220 if (events & KQ_EVENT_READ) { 221 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 222 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 223 if (ret == -1) { 224 // TODO, error check 225 item->DetachEvents(events); 226 kqueue_assert(0); 227 return false; 228 } 229 } 230 231 item->SetListenEvents(new_events); 232 233 return true; 234 } 235 236 237 bool KqueueProxy::KqueueCtrlDel(int fd, int events) 238 { 239 return KqueueCtrlDelRef(fd, events, false); 240 } 241 242 bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref) 243 { 244 KqFdRef* item = KqFdRefGet(fd); 245 if (item == NULL) 246 { 247 MT_ATTR_API(320851, 1); // fd error 248 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 249 kqueue_assert(0); 250 return false; 251 252 } 253 254 item->DetachEvents(events); 255 int old_events = item->GetListenEvents(); 256 int new_events = old_events &~ events; 257 258 if (use_ref) { 259 new_events = old_events; 260 if (item->ReadRefCnt() == 0) { 261 new_events = new_events & ~KQ_EVENT_READ; 262 } 263 if (item->WriteRefCnt() == 0) { 264 new_events = new_events & ~KQ_EVENT_WRITE; 265 } 266 } 267 268 if (old_events == new_events) 269 { 270 return true; 271 } 272 KqEvent ke; 273 int ret; 274 if (old_events & KQ_EVENT_WRITE) { 275 EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 276 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 277 if (ret == -1) { 278 kqueue_assert(0); 279 return false; 280 } 281 } 282 if (old_events & KQ_EVENT_READ) { 283 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 284 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 285 if (ret == -1) { 286 kqueue_assert(0); 287 return false; 288 } 289 } 290 291 if (new_events & KQ_EVENT_WRITE) { 292 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 293 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 294 if (ret == -1) { 295 kqueue_assert(0); 296 return false; 297 } 298 } 299 if (new_events & KQ_EVENT_READ) { 300 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 301 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 302 if (ret == -1) { 303 kqueue_assert(0); 304 return false; 305 } 306 } 307 308 item->SetListenEvents(new_events); 309 310 return true; 311 } 312 313 bool KqueueProxy::KqueueAddObj(KqueuerObj* obj) 314 { 315 if (obj == NULL) 316 { 317 MTLOG_ERROR("kqobj input invalid, %p", obj); 318 return false; 319 } 320 321 KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 322 if (item == NULL) 323 { 324 MT_ATTR_API(320851, 1); // fd error 325 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 326 kqueue_assert(0); 327 return false; 328 } 329 330 int ret = obj->KqueueCtlAdd(item); 331 if (ret < 0) { 332 MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 333 kqueue_assert(0); 334 return false; 335 } 336 337 return true; 338 } 339 340 bool KqueueProxy::KqueueDelObj(KqueuerObj* obj) 341 { 342 if (obj == NULL) 343 { 344 MTLOG_ERROR("kqobj input invalid, %p", obj); 345 return false; 346 } 347 KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 348 if (item == NULL) 349 { 350 MT_ATTR_API(320851, 1); // fd error 351 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 352 kqueue_assert(0); 353 return false; 354 } 355 356 int ret = obj->KqueueCtlDel(item); 357 if (ret < 0) { 358 MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 359 kqueue_assert(0); 360 return false; 361 } 362 363 return true; 364 } 365 366 void KqueueProxy::KqueueRcvEventList(int evtfdnum) 367 { 368 int ret = 0; 369 int osfd = 0; 370 int revents = 0; 371 int tmp_evts = 0; 372 KqFdRef* item = NULL; 373 KqueuerObj* obj = NULL; 374 375 for (int i = 0; i < evtfdnum; i++) 376 { 377 osfd = _evtlist[i].ident; 378 379 item = KqFdRefGet(osfd); 380 if (item == NULL) 381 { 382 MT_ATTR_API(320851, 1); // fd error 383 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd); 384 kqueue_assert(0); 385 continue; 386 } 387 tmp_evts = _evtlist[i].filter; 388 if (tmp_evts == EVFILT_READ) { 389 revents |= KQ_EVENT_READ; 390 } 391 if (tmp_evts == EVFILT_WRITE) { 392 revents |= KQ_EVENT_WRITE; 393 } 394 obj = item->GetNotifyObj(); 395 if (obj == NULL) 396 { 397 MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd); 398 KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE))); 399 continue; 400 } 401 obj->SetRcvEvents(revents); 402 403 if (tmp_evts == EV_ERROR) 404 { 405 obj->HangupNotify(); 406 continue; 407 } 408 409 if (revents & KQ_EVENT_READ) 410 { 411 ret = obj->InputNotify(); 412 if (ret != 0) 413 { 414 continue; 415 } 416 } 417 418 if (revents & KQ_EVENT_WRITE) 419 { 420 ret = obj->OutputNotify(); 421 if (ret != 0) 422 { 423 continue; 424 } 425 } 426 } 427 } 428 429 void KqueueProxy::KqueueDispatch() 430 { 431 int nfd; 432 int wait_time = KqueueGetTimeout(); 433 if (wait_time) { 434 struct timespec ts; 435 ts.tv_sec = wait_time / 1000; 436 ts.tv_nsec = 0; 437 nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts); 438 } else { 439 nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL); 440 } 441 if (nfd <= 0) 442 { 443 return; 444 } 445 446 KqueueRcvEventList(nfd); 447 } 448 449 int KqueuerObj::InputNotify() 450 { 451 MicroThread* thread = this->GetOwnerThread(); 452 if (thread == NULL) 453 { 454 kqueue_assert(0); 455 MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 456 return -1; 457 } 458 459 if (thread->HasFlag(MicroThread::IO_LIST)) 460 { 461 MtFrame* frame = MtFrame::Instance(); 462 frame->RemoveIoWait(thread); 463 frame->InsertRunable(thread); 464 } 465 466 return 0; 467 } 468 469 int KqueuerObj::OutputNotify() 470 { 471 MicroThread* thread = this->GetOwnerThread(); 472 if (NULL == thread) 473 { 474 kqueue_assert(0); 475 MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 476 return -1; 477 } 478 479 // Multiple events arrive at the same time 480 if (thread->HasFlag(MicroThread::IO_LIST)) 481 { 482 MtFrame* frame = MtFrame::Instance(); 483 frame->RemoveIoWait(thread); 484 frame->InsertRunable(thread); 485 } 486 487 return 0; 488 } 489 490 int KqueuerObj::HangupNotify() 491 { 492 MtFrame* frame = MtFrame::Instance(); 493 frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); 494 return 0; 495 } 496 497 int KqueuerObj::KqueueCtlAdd(void* args) 498 { 499 MtFrame* frame = MtFrame::Instance(); 500 KqFdRef* fd_ref = (KqFdRef*)args; 501 kqueue_assert(fd_ref != NULL); 502 503 int osfd = this->GetOsfd(); 504 int new_events = this->GetEvents(); 505 506 // Notify object needs updating 507 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 508 if ((old_obj != NULL) && (old_obj != this)) 509 { 510 MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 511 return -1; 512 } 513 fd_ref->SetNotifyObj(this); 514 515 if (!frame->KqueueCtrlAdd(osfd, new_events)) 516 { 517 MTLOG_ERROR("kqfd ref add failed, log"); 518 fd_ref->SetNotifyObj(old_obj); 519 return -2; 520 } 521 522 return 0; 523 } 524 525 int KqueuerObj::KqueueCtlDel(void* args) 526 { 527 MtFrame* frame = MtFrame::Instance(); 528 KqFdRef* fd_ref = (KqFdRef*)args; 529 kqueue_assert(fd_ref != NULL); 530 531 int osfd = this->GetOsfd(); 532 int events = this->GetEvents(); 533 534 KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 535 if (old_obj != this) 536 { 537 MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 538 return -1; 539 } 540 fd_ref->SetNotifyObj(NULL); 541 542 if (!frame->KqueueCtrlDelRef(osfd, events, false)) 543 { 544 MTLOG_ERROR("kqfd ref del failed, log"); 545 fd_ref->SetNotifyObj(old_obj); 546 return -2; 547 } 548 549 return 0; 550 551 } 552 553