1*a9643ea8Slogwang 2*a9643ea8Slogwang /** 3*a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4*a9643ea8Slogwang * 5*a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6*a9643ea8Slogwang * 7*a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8*a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9*a9643ea8Slogwang * obtain a copy of the License at 10*a9643ea8Slogwang * 11*a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12*a9643ea8Slogwang * 13*a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14*a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15*a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16*a9643ea8Slogwang * and limitations under the License. 17*a9643ea8Slogwang */ 18*a9643ea8Slogwang 19*a9643ea8Slogwang 20*a9643ea8Slogwang 21*a9643ea8Slogwang /** 22*a9643ea8Slogwang * @filename kqueue_proxy.cpp 23*a9643ea8Slogwang * @info kqueue for micro thread manage 24*a9643ea8Slogwang */ 25*a9643ea8Slogwang 26*a9643ea8Slogwang #include "kqueue_proxy.h" 27*a9643ea8Slogwang #include "micro_thread.h" 28*a9643ea8Slogwang #include "ff_hook.h" 29*a9643ea8Slogwang 30*a9643ea8Slogwang using namespace NS_MICRO_THREAD; 31*a9643ea8Slogwang 32*a9643ea8Slogwang KqueueProxy::KqueueProxy() 33*a9643ea8Slogwang { 34*a9643ea8Slogwang _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM; 35*a9643ea8Slogwang _kqfd = -1; 36*a9643ea8Slogwang _evtlist = NULL; 37*a9643ea8Slogwang _kqrefs = NULL; 38*a9643ea8Slogwang } 39*a9643ea8Slogwang 40*a9643ea8Slogwang int KqueueProxy::InitKqueue(int max_num) 41*a9643ea8Slogwang { 42*a9643ea8Slogwang int rc = 0; 43*a9643ea8Slogwang if (max_num > _maxfd) 44*a9643ea8Slogwang { 45*a9643ea8Slogwang _maxfd = max_num; 46*a9643ea8Slogwang } 47*a9643ea8Slogwang 48*a9643ea8Slogwang _kqfd = ff_kqueue(); 49*a9643ea8Slogwang if (_kqfd < 0) 50*a9643ea8Slogwang { 51*a9643ea8Slogwang rc = -1; 52*a9643ea8Slogwang goto EXIT_LABEL; 53*a9643ea8Slogwang } 54*a9643ea8Slogwang 55*a9643ea8Slogwang ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC); 56*a9643ea8Slogwang 57*a9643ea8Slogwang _kqrefs = new KqFdRef[_maxfd]; 58*a9643ea8Slogwang if (_kqrefs == NULL) 59*a9643ea8Slogwang { 60*a9643ea8Slogwang rc = -2; 61*a9643ea8Slogwang goto EXIT_LABEL; 62*a9643ea8Slogwang } 63*a9643ea8Slogwang 64*a9643ea8Slogwang _evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent)); 65*a9643ea8Slogwang if (_evtlist == NULL) 66*a9643ea8Slogwang { 67*a9643ea8Slogwang rc = -3; 68*a9643ea8Slogwang goto EXIT_LABEL; 69*a9643ea8Slogwang } 70*a9643ea8Slogwang 71*a9643ea8Slogwang struct rlimit rlim; 72*a9643ea8Slogwang memset(&rlim, 0, sizeof(rlim)); 73*a9643ea8Slogwang if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) 74*a9643ea8Slogwang { 75*a9643ea8Slogwang if ((int)rlim.rlim_max < _maxfd) 76*a9643ea8Slogwang { 77*a9643ea8Slogwang rlim.rlim_cur = rlim.rlim_max; 78*a9643ea8Slogwang setrlimit(RLIMIT_NOFILE, &rlim); 79*a9643ea8Slogwang rlim.rlim_cur = _maxfd; 80*a9643ea8Slogwang rlim.rlim_max = _maxfd; 81*a9643ea8Slogwang setrlimit(RLIMIT_NOFILE, &rlim); 82*a9643ea8Slogwang } 83*a9643ea8Slogwang } 84*a9643ea8Slogwang 85*a9643ea8Slogwang EXIT_LABEL: 86*a9643ea8Slogwang 87*a9643ea8Slogwang if (rc < 0) 88*a9643ea8Slogwang { 89*a9643ea8Slogwang TermKqueue(); 90*a9643ea8Slogwang } 91*a9643ea8Slogwang 92*a9643ea8Slogwang return rc; 93*a9643ea8Slogwang } 94*a9643ea8Slogwang 95*a9643ea8Slogwang void KqueueProxy::TermKqueue() 96*a9643ea8Slogwang { 97*a9643ea8Slogwang if (_kqfd > 0) 98*a9643ea8Slogwang { 99*a9643ea8Slogwang close(_kqfd); 100*a9643ea8Slogwang _kqfd = -1; 101*a9643ea8Slogwang } 102*a9643ea8Slogwang 103*a9643ea8Slogwang if (_evtlist != NULL) 104*a9643ea8Slogwang { 105*a9643ea8Slogwang free(_evtlist); 106*a9643ea8Slogwang _evtlist = NULL; 107*a9643ea8Slogwang } 108*a9643ea8Slogwang 109*a9643ea8Slogwang if (_kqrefs != NULL) 110*a9643ea8Slogwang { 111*a9643ea8Slogwang delete []_kqrefs; 112*a9643ea8Slogwang _kqrefs = NULL; 113*a9643ea8Slogwang } 114*a9643ea8Slogwang } 115*a9643ea8Slogwang 116*a9643ea8Slogwang bool KqueueProxy::KqueueAdd(KqObjList& obj_list) 117*a9643ea8Slogwang { 118*a9643ea8Slogwang bool ret = true; 119*a9643ea8Slogwang KqueuerObj *kqobj = NULL; 120*a9643ea8Slogwang KqueuerObj *kqobj_error = NULL; 121*a9643ea8Slogwang TAILQ_FOREACH(kqobj, &obj_list, _entry) 122*a9643ea8Slogwang { 123*a9643ea8Slogwang if (!KqueueAddObj(kqobj)) 124*a9643ea8Slogwang { 125*a9643ea8Slogwang MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd()); 126*a9643ea8Slogwang kqueue_assert(0); 127*a9643ea8Slogwang kqobj_error = kqobj; 128*a9643ea8Slogwang ret = false; 129*a9643ea8Slogwang goto EXIT_LABEL; 130*a9643ea8Slogwang } 131*a9643ea8Slogwang } 132*a9643ea8Slogwang 133*a9643ea8Slogwang EXIT_LABEL: 134*a9643ea8Slogwang 135*a9643ea8Slogwang if (!ret) 136*a9643ea8Slogwang { 137*a9643ea8Slogwang TAILQ_FOREACH(kqobj, &obj_list, _entry) 138*a9643ea8Slogwang { 139*a9643ea8Slogwang if (kqobj == kqobj_error) 140*a9643ea8Slogwang { 141*a9643ea8Slogwang break; 142*a9643ea8Slogwang } 143*a9643ea8Slogwang KqueueDelObj(kqobj); 144*a9643ea8Slogwang } 145*a9643ea8Slogwang } 146*a9643ea8Slogwang 147*a9643ea8Slogwang return ret; 148*a9643ea8Slogwang } 149*a9643ea8Slogwang 150*a9643ea8Slogwang bool KqueueProxy::KqueueDel(KqObjList& obj_list) 151*a9643ea8Slogwang { 152*a9643ea8Slogwang bool ret = true; 153*a9643ea8Slogwang 154*a9643ea8Slogwang KqueuerObj *kqobj = NULL; 155*a9643ea8Slogwang TAILQ_FOREACH(kqobj, &obj_list, _entry) 156*a9643ea8Slogwang { 157*a9643ea8Slogwang if (!KqueueDelObj(kqobj)) // failed also need continue, be sure ref count ok 158*a9643ea8Slogwang { 159*a9643ea8Slogwang MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd()); 160*a9643ea8Slogwang kqueue_assert(0); 161*a9643ea8Slogwang ret = false; 162*a9643ea8Slogwang } 163*a9643ea8Slogwang } 164*a9643ea8Slogwang 165*a9643ea8Slogwang return ret; 166*a9643ea8Slogwang } 167*a9643ea8Slogwang 168*a9643ea8Slogwang bool KqueueProxy::KqueueCtrlAdd(int fd, int events) 169*a9643ea8Slogwang { 170*a9643ea8Slogwang KqFdRef* item = KqFdRefGet(fd); 171*a9643ea8Slogwang if (item == NULL) 172*a9643ea8Slogwang { 173*a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error, wtf? 174*a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 175*a9643ea8Slogwang kqueue_assert(0); 176*a9643ea8Slogwang return false; 177*a9643ea8Slogwang } 178*a9643ea8Slogwang 179*a9643ea8Slogwang item->AttachEvents(events); 180*a9643ea8Slogwang 181*a9643ea8Slogwang int old_events = item->GetListenEvents(); 182*a9643ea8Slogwang int new_events = old_events | events; 183*a9643ea8Slogwang if (old_events == new_events) 184*a9643ea8Slogwang { 185*a9643ea8Slogwang return true; 186*a9643ea8Slogwang } 187*a9643ea8Slogwang 188*a9643ea8Slogwang KqEvent ke; 189*a9643ea8Slogwang int ret; 190*a9643ea8Slogwang if (CHK_FD_BIT(fd)) { 191*a9643ea8Slogwang fd = CLR_FD_BIT(fd); 192*a9643ea8Slogwang } 193*a9643ea8Slogwang if (old_events & KQ_EVENT_WRITE) { 194*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 195*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 196*a9643ea8Slogwang if (ret == -1) { 197*a9643ea8Slogwang // TODO, error check 198*a9643ea8Slogwang item->DetachEvents(events); 199*a9643ea8Slogwang kqueue_assert(0); 200*a9643ea8Slogwang return false; 201*a9643ea8Slogwang } 202*a9643ea8Slogwang } 203*a9643ea8Slogwang if (old_events & KQ_EVENT_READ) { 204*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 205*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 206*a9643ea8Slogwang if (ret == -1) { 207*a9643ea8Slogwang // TODO, error check 208*a9643ea8Slogwang item->DetachEvents(events); 209*a9643ea8Slogwang kqueue_assert(0); 210*a9643ea8Slogwang return false; 211*a9643ea8Slogwang } 212*a9643ea8Slogwang } 213*a9643ea8Slogwang if (events & KQ_EVENT_WRITE) { 214*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 215*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 216*a9643ea8Slogwang if (ret == -1) { 217*a9643ea8Slogwang // TODO, error check 218*a9643ea8Slogwang item->DetachEvents(events); 219*a9643ea8Slogwang kqueue_assert(0); 220*a9643ea8Slogwang return false; 221*a9643ea8Slogwang } 222*a9643ea8Slogwang } 223*a9643ea8Slogwang if (events & KQ_EVENT_READ) { 224*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 225*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 226*a9643ea8Slogwang if (ret == -1) { 227*a9643ea8Slogwang // TODO, error check 228*a9643ea8Slogwang item->DetachEvents(events); 229*a9643ea8Slogwang kqueue_assert(0); 230*a9643ea8Slogwang return false; 231*a9643ea8Slogwang } 232*a9643ea8Slogwang } 233*a9643ea8Slogwang 234*a9643ea8Slogwang item->SetListenEvents(new_events); 235*a9643ea8Slogwang 236*a9643ea8Slogwang return true; 237*a9643ea8Slogwang } 238*a9643ea8Slogwang 239*a9643ea8Slogwang 240*a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDel(int fd, int events) 241*a9643ea8Slogwang { 242*a9643ea8Slogwang return KqueueCtrlDelRef(fd, events, false); 243*a9643ea8Slogwang } 244*a9643ea8Slogwang 245*a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref) 246*a9643ea8Slogwang { 247*a9643ea8Slogwang KqFdRef* item = KqFdRefGet(fd); 248*a9643ea8Slogwang if (item == NULL) 249*a9643ea8Slogwang { 250*a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 251*a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 252*a9643ea8Slogwang kqueue_assert(0); 253*a9643ea8Slogwang return false; 254*a9643ea8Slogwang 255*a9643ea8Slogwang } 256*a9643ea8Slogwang 257*a9643ea8Slogwang item->DetachEvents(events); 258*a9643ea8Slogwang int old_events = item->GetListenEvents(); 259*a9643ea8Slogwang int new_events = old_events &~ events; 260*a9643ea8Slogwang 261*a9643ea8Slogwang if (use_ref) { 262*a9643ea8Slogwang new_events = old_events; 263*a9643ea8Slogwang if (item->ReadRefCnt() == 0) { 264*a9643ea8Slogwang new_events = new_events & ~KQ_EVENT_READ; 265*a9643ea8Slogwang } 266*a9643ea8Slogwang if (item->WriteRefCnt() == 0) { 267*a9643ea8Slogwang new_events = new_events & ~KQ_EVENT_WRITE; 268*a9643ea8Slogwang } 269*a9643ea8Slogwang } 270*a9643ea8Slogwang 271*a9643ea8Slogwang if (old_events == new_events) 272*a9643ea8Slogwang { 273*a9643ea8Slogwang return true; 274*a9643ea8Slogwang } 275*a9643ea8Slogwang KqEvent ke; 276*a9643ea8Slogwang int ret; 277*a9643ea8Slogwang if (CHK_FD_BIT(fd)) { 278*a9643ea8Slogwang fd = CLR_FD_BIT(fd); 279*a9643ea8Slogwang } 280*a9643ea8Slogwang if (old_events & KQ_EVENT_WRITE) { 281*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 282*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 283*a9643ea8Slogwang if (ret == -1) { 284*a9643ea8Slogwang kqueue_assert(0); 285*a9643ea8Slogwang return false; 286*a9643ea8Slogwang } 287*a9643ea8Slogwang } 288*a9643ea8Slogwang if (old_events & KQ_EVENT_READ) { 289*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 290*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 291*a9643ea8Slogwang if (ret == -1) { 292*a9643ea8Slogwang kqueue_assert(0); 293*a9643ea8Slogwang return false; 294*a9643ea8Slogwang } 295*a9643ea8Slogwang } 296*a9643ea8Slogwang 297*a9643ea8Slogwang if (new_events & KQ_EVENT_WRITE) { 298*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 299*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 300*a9643ea8Slogwang if (ret == -1) { 301*a9643ea8Slogwang kqueue_assert(0); 302*a9643ea8Slogwang return false; 303*a9643ea8Slogwang } 304*a9643ea8Slogwang } 305*a9643ea8Slogwang if (new_events & KQ_EVENT_READ) { 306*a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 307*a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 308*a9643ea8Slogwang if (ret == -1) { 309*a9643ea8Slogwang kqueue_assert(0); 310*a9643ea8Slogwang return false; 311*a9643ea8Slogwang } 312*a9643ea8Slogwang } 313*a9643ea8Slogwang 314*a9643ea8Slogwang item->SetListenEvents(new_events); 315*a9643ea8Slogwang 316*a9643ea8Slogwang return true; 317*a9643ea8Slogwang } 318*a9643ea8Slogwang 319*a9643ea8Slogwang bool KqueueProxy::KqueueAddObj(KqueuerObj* obj) 320*a9643ea8Slogwang { 321*a9643ea8Slogwang if (obj == NULL) 322*a9643ea8Slogwang { 323*a9643ea8Slogwang MTLOG_ERROR("kqobj input invalid, %p", obj); 324*a9643ea8Slogwang return false; 325*a9643ea8Slogwang } 326*a9643ea8Slogwang 327*a9643ea8Slogwang KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 328*a9643ea8Slogwang if (item == NULL) 329*a9643ea8Slogwang { 330*a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 331*a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 332*a9643ea8Slogwang kqueue_assert(0); 333*a9643ea8Slogwang return false; 334*a9643ea8Slogwang } 335*a9643ea8Slogwang 336*a9643ea8Slogwang int ret = obj->KqueueCtlAdd(item); 337*a9643ea8Slogwang if (ret < 0) { 338*a9643ea8Slogwang MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 339*a9643ea8Slogwang kqueue_assert(0); 340*a9643ea8Slogwang return false; 341*a9643ea8Slogwang } 342*a9643ea8Slogwang 343*a9643ea8Slogwang return true; 344*a9643ea8Slogwang } 345*a9643ea8Slogwang 346*a9643ea8Slogwang bool KqueueProxy::KqueueDelObj(KqueuerObj* obj) 347*a9643ea8Slogwang { 348*a9643ea8Slogwang if (obj == NULL) 349*a9643ea8Slogwang { 350*a9643ea8Slogwang MTLOG_ERROR("kqobj input invalid, %p", obj); 351*a9643ea8Slogwang return false; 352*a9643ea8Slogwang } 353*a9643ea8Slogwang KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 354*a9643ea8Slogwang if (item == NULL) 355*a9643ea8Slogwang { 356*a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 357*a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 358*a9643ea8Slogwang kqueue_assert(0); 359*a9643ea8Slogwang return false; 360*a9643ea8Slogwang } 361*a9643ea8Slogwang 362*a9643ea8Slogwang int ret = obj->KqueueCtlDel(item); 363*a9643ea8Slogwang if (ret < 0) { 364*a9643ea8Slogwang MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 365*a9643ea8Slogwang kqueue_assert(0); 366*a9643ea8Slogwang return false; 367*a9643ea8Slogwang } 368*a9643ea8Slogwang 369*a9643ea8Slogwang return true; 370*a9643ea8Slogwang } 371*a9643ea8Slogwang 372*a9643ea8Slogwang void KqueueProxy::KqueueRcvEventList(int evtfdnum) 373*a9643ea8Slogwang { 374*a9643ea8Slogwang int ret = 0; 375*a9643ea8Slogwang int osfd = 0; 376*a9643ea8Slogwang int revents = 0; 377*a9643ea8Slogwang int tmp_evts = 0; 378*a9643ea8Slogwang KqFdRef* item = NULL; 379*a9643ea8Slogwang KqueuerObj* obj = NULL; 380*a9643ea8Slogwang 381*a9643ea8Slogwang for (int i = 0; i < evtfdnum; i++) 382*a9643ea8Slogwang { 383*a9643ea8Slogwang osfd = _evtlist[i].ident |= 1 << FF_FD_BITS; 384*a9643ea8Slogwang 385*a9643ea8Slogwang item = KqFdRefGet(osfd); 386*a9643ea8Slogwang if (item == NULL) 387*a9643ea8Slogwang { 388*a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 389*a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd); 390*a9643ea8Slogwang kqueue_assert(0); 391*a9643ea8Slogwang continue; 392*a9643ea8Slogwang } 393*a9643ea8Slogwang tmp_evts = _evtlist[i].filter; 394*a9643ea8Slogwang if (tmp_evts == EVFILT_READ) { 395*a9643ea8Slogwang revents |= KQ_EVENT_READ; 396*a9643ea8Slogwang } 397*a9643ea8Slogwang if (tmp_evts == EVFILT_WRITE) { 398*a9643ea8Slogwang revents |= KQ_EVENT_WRITE; 399*a9643ea8Slogwang } 400*a9643ea8Slogwang obj = item->GetNotifyObj(); 401*a9643ea8Slogwang if (obj == NULL) 402*a9643ea8Slogwang { 403*a9643ea8Slogwang MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd); 404*a9643ea8Slogwang KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE))); 405*a9643ea8Slogwang continue; 406*a9643ea8Slogwang } 407*a9643ea8Slogwang obj->SetRcvEvents(revents); 408*a9643ea8Slogwang 409*a9643ea8Slogwang if (tmp_evts == EV_ERROR) 410*a9643ea8Slogwang { 411*a9643ea8Slogwang obj->HangupNotify(); 412*a9643ea8Slogwang continue; 413*a9643ea8Slogwang } 414*a9643ea8Slogwang 415*a9643ea8Slogwang if (revents & KQ_EVENT_READ) 416*a9643ea8Slogwang { 417*a9643ea8Slogwang ret = obj->InputNotify(); 418*a9643ea8Slogwang if (ret != 0) 419*a9643ea8Slogwang { 420*a9643ea8Slogwang continue; 421*a9643ea8Slogwang } 422*a9643ea8Slogwang } 423*a9643ea8Slogwang 424*a9643ea8Slogwang if (revents & KQ_EVENT_WRITE) 425*a9643ea8Slogwang { 426*a9643ea8Slogwang ret = obj->OutputNotify(); 427*a9643ea8Slogwang if (ret != 0) 428*a9643ea8Slogwang { 429*a9643ea8Slogwang continue; 430*a9643ea8Slogwang } 431*a9643ea8Slogwang } 432*a9643ea8Slogwang } 433*a9643ea8Slogwang } 434*a9643ea8Slogwang 435*a9643ea8Slogwang void KqueueProxy::KqueueDispatch() 436*a9643ea8Slogwang { 437*a9643ea8Slogwang int nfd; 438*a9643ea8Slogwang int wait_time = KqueueGetTimeout(); 439*a9643ea8Slogwang if (wait_time) { 440*a9643ea8Slogwang struct timespec ts; 441*a9643ea8Slogwang ts.tv_sec = wait_time / 1000; 442*a9643ea8Slogwang ts.tv_nsec = 0; 443*a9643ea8Slogwang nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts); 444*a9643ea8Slogwang } else { 445*a9643ea8Slogwang nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL); 446*a9643ea8Slogwang } 447*a9643ea8Slogwang if (nfd <= 0) 448*a9643ea8Slogwang { 449*a9643ea8Slogwang return; 450*a9643ea8Slogwang } 451*a9643ea8Slogwang 452*a9643ea8Slogwang KqueueRcvEventList(nfd); 453*a9643ea8Slogwang } 454*a9643ea8Slogwang 455*a9643ea8Slogwang int KqueuerObj::InputNotify() 456*a9643ea8Slogwang { 457*a9643ea8Slogwang MicroThread* thread = this->GetOwnerThread(); 458*a9643ea8Slogwang if (thread == NULL) 459*a9643ea8Slogwang { 460*a9643ea8Slogwang kqueue_assert(0); 461*a9643ea8Slogwang MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 462*a9643ea8Slogwang return -1; 463*a9643ea8Slogwang } 464*a9643ea8Slogwang 465*a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST)) 466*a9643ea8Slogwang { 467*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 468*a9643ea8Slogwang frame->RemoveIoWait(thread); 469*a9643ea8Slogwang frame->InsertRunable(thread); 470*a9643ea8Slogwang } 471*a9643ea8Slogwang 472*a9643ea8Slogwang return 0; 473*a9643ea8Slogwang } 474*a9643ea8Slogwang 475*a9643ea8Slogwang int KqueuerObj::OutputNotify() 476*a9643ea8Slogwang { 477*a9643ea8Slogwang MicroThread* thread = this->GetOwnerThread(); 478*a9643ea8Slogwang if (NULL == thread) 479*a9643ea8Slogwang { 480*a9643ea8Slogwang kqueue_assert(0); 481*a9643ea8Slogwang MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 482*a9643ea8Slogwang return -1; 483*a9643ea8Slogwang } 484*a9643ea8Slogwang 485*a9643ea8Slogwang // 多个事件同时到达, 防重复操作 486*a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST)) 487*a9643ea8Slogwang { 488*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 489*a9643ea8Slogwang frame->RemoveIoWait(thread); 490*a9643ea8Slogwang frame->InsertRunable(thread); 491*a9643ea8Slogwang } 492*a9643ea8Slogwang 493*a9643ea8Slogwang return 0; 494*a9643ea8Slogwang } 495*a9643ea8Slogwang 496*a9643ea8Slogwang int KqueuerObj::HangupNotify() 497*a9643ea8Slogwang { 498*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 499*a9643ea8Slogwang frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); 500*a9643ea8Slogwang return 0; 501*a9643ea8Slogwang } 502*a9643ea8Slogwang 503*a9643ea8Slogwang int KqueuerObj::KqueueCtlAdd(void* args) 504*a9643ea8Slogwang { 505*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 506*a9643ea8Slogwang KqFdRef* fd_ref = (KqFdRef*)args; 507*a9643ea8Slogwang kqueue_assert(fd_ref != NULL); 508*a9643ea8Slogwang 509*a9643ea8Slogwang int osfd = this->GetOsfd(); 510*a9643ea8Slogwang int new_events = this->GetEvents(); 511*a9643ea8Slogwang 512*a9643ea8Slogwang // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 513*a9643ea8Slogwang KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 514*a9643ea8Slogwang if ((old_obj != NULL) && (old_obj != this)) 515*a9643ea8Slogwang { 516*a9643ea8Slogwang MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 517*a9643ea8Slogwang return -1; 518*a9643ea8Slogwang } 519*a9643ea8Slogwang fd_ref->SetNotifyObj(this); 520*a9643ea8Slogwang 521*a9643ea8Slogwang // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 522*a9643ea8Slogwang if (!frame->KqueueCtrlAdd(osfd, new_events)) 523*a9643ea8Slogwang { 524*a9643ea8Slogwang MTLOG_ERROR("kqfd ref add failed, log"); 525*a9643ea8Slogwang fd_ref->SetNotifyObj(old_obj); 526*a9643ea8Slogwang return -2; 527*a9643ea8Slogwang } 528*a9643ea8Slogwang 529*a9643ea8Slogwang return 0; 530*a9643ea8Slogwang } 531*a9643ea8Slogwang 532*a9643ea8Slogwang int KqueuerObj::KqueueCtlDel(void* args) 533*a9643ea8Slogwang { 534*a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 535*a9643ea8Slogwang KqFdRef* fd_ref = (KqFdRef*)args; 536*a9643ea8Slogwang kqueue_assert(fd_ref != NULL); 537*a9643ea8Slogwang 538*a9643ea8Slogwang int osfd = this->GetOsfd(); 539*a9643ea8Slogwang int events = this->GetEvents(); 540*a9643ea8Slogwang 541*a9643ea8Slogwang // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 542*a9643ea8Slogwang KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 543*a9643ea8Slogwang if (old_obj != this) 544*a9643ea8Slogwang { 545*a9643ea8Slogwang MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 546*a9643ea8Slogwang return -1; 547*a9643ea8Slogwang } 548*a9643ea8Slogwang fd_ref->SetNotifyObj(NULL); 549*a9643ea8Slogwang 550*a9643ea8Slogwang // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 551*a9643ea8Slogwang if (!frame->KqueueCtrlDelRef(osfd, events, false)) // 引用有风险, 弊大于利, 关闭掉 552*a9643ea8Slogwang { 553*a9643ea8Slogwang MTLOG_ERROR("kqfd ref del failed, log"); 554*a9643ea8Slogwang fd_ref->SetNotifyObj(old_obj); 555*a9643ea8Slogwang return -2; 556*a9643ea8Slogwang } 557*a9643ea8Slogwang 558*a9643ea8Slogwang return 0; 559*a9643ea8Slogwang 560*a9643ea8Slogwang } 561*a9643ea8Slogwang 562