1a9643ea8Slogwang 2a9643ea8Slogwang /** 3a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4a9643ea8Slogwang * 5a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6a9643ea8Slogwang * 7a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9a9643ea8Slogwang * obtain a copy of the License at 10a9643ea8Slogwang * 11a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12a9643ea8Slogwang * 13a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16a9643ea8Slogwang * and limitations under the License. 17a9643ea8Slogwang */ 18a9643ea8Slogwang 19a9643ea8Slogwang 20a9643ea8Slogwang 21a9643ea8Slogwang /** 22a9643ea8Slogwang * @filename kqueue_proxy.cpp 23a9643ea8Slogwang * @info kqueue for micro thread manage 24a9643ea8Slogwang */ 25a9643ea8Slogwang 26a9643ea8Slogwang #include "kqueue_proxy.h" 27a9643ea8Slogwang #include "micro_thread.h" 28a9643ea8Slogwang #include "ff_hook.h" 29a9643ea8Slogwang 30a9643ea8Slogwang using namespace NS_MICRO_THREAD; 31a9643ea8Slogwang 32a9643ea8Slogwang KqueueProxy::KqueueProxy() 33a9643ea8Slogwang { 34a9643ea8Slogwang _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM; 35a9643ea8Slogwang _kqfd = -1; 36a9643ea8Slogwang _evtlist = NULL; 37a9643ea8Slogwang _kqrefs = NULL; 38a9643ea8Slogwang } 39a9643ea8Slogwang 40a9643ea8Slogwang int KqueueProxy::InitKqueue(int max_num) 41a9643ea8Slogwang { 42a9643ea8Slogwang int rc = 0; 43a9643ea8Slogwang if (max_num > _maxfd) 44a9643ea8Slogwang { 45a9643ea8Slogwang _maxfd = max_num; 46a9643ea8Slogwang } 47a9643ea8Slogwang 48a9643ea8Slogwang _kqfd = ff_kqueue(); 49a9643ea8Slogwang if (_kqfd < 0) 50a9643ea8Slogwang { 51a9643ea8Slogwang rc = -1; 52a9643ea8Slogwang goto EXIT_LABEL; 53a9643ea8Slogwang } 54a9643ea8Slogwang 55a9643ea8Slogwang ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC); 56a9643ea8Slogwang 57a9643ea8Slogwang _kqrefs = new KqFdRef[_maxfd]; 58a9643ea8Slogwang if (_kqrefs == NULL) 59a9643ea8Slogwang { 60a9643ea8Slogwang rc = -2; 61a9643ea8Slogwang goto EXIT_LABEL; 62a9643ea8Slogwang } 63a9643ea8Slogwang 64a9643ea8Slogwang _evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent)); 65a9643ea8Slogwang if (_evtlist == NULL) 66a9643ea8Slogwang { 67a9643ea8Slogwang rc = -3; 68a9643ea8Slogwang goto EXIT_LABEL; 69a9643ea8Slogwang } 70a9643ea8Slogwang 71a9643ea8Slogwang struct rlimit rlim; 72a9643ea8Slogwang memset(&rlim, 0, sizeof(rlim)); 73a9643ea8Slogwang if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) 74a9643ea8Slogwang { 75a9643ea8Slogwang if ((int)rlim.rlim_max < _maxfd) 76a9643ea8Slogwang { 77a9643ea8Slogwang rlim.rlim_cur = rlim.rlim_max; 78a9643ea8Slogwang setrlimit(RLIMIT_NOFILE, &rlim); 79a9643ea8Slogwang rlim.rlim_cur = _maxfd; 80a9643ea8Slogwang rlim.rlim_max = _maxfd; 81a9643ea8Slogwang setrlimit(RLIMIT_NOFILE, &rlim); 82a9643ea8Slogwang } 83a9643ea8Slogwang } 84a9643ea8Slogwang 85a9643ea8Slogwang EXIT_LABEL: 86a9643ea8Slogwang 87a9643ea8Slogwang if (rc < 0) 88a9643ea8Slogwang { 89a9643ea8Slogwang TermKqueue(); 90a9643ea8Slogwang } 91a9643ea8Slogwang 92a9643ea8Slogwang return rc; 93a9643ea8Slogwang } 94a9643ea8Slogwang 95a9643ea8Slogwang void KqueueProxy::TermKqueue() 96a9643ea8Slogwang { 97a9643ea8Slogwang if (_kqfd > 0) 98a9643ea8Slogwang { 99a9643ea8Slogwang close(_kqfd); 100a9643ea8Slogwang _kqfd = -1; 101a9643ea8Slogwang } 102a9643ea8Slogwang 103a9643ea8Slogwang if (_evtlist != NULL) 104a9643ea8Slogwang { 105a9643ea8Slogwang free(_evtlist); 106a9643ea8Slogwang _evtlist = NULL; 107a9643ea8Slogwang } 108a9643ea8Slogwang 109a9643ea8Slogwang if (_kqrefs != NULL) 110a9643ea8Slogwang { 111a9643ea8Slogwang delete []_kqrefs; 112a9643ea8Slogwang _kqrefs = NULL; 113a9643ea8Slogwang } 114a9643ea8Slogwang } 115a9643ea8Slogwang 116a9643ea8Slogwang bool KqueueProxy::KqueueAdd(KqObjList& obj_list) 117a9643ea8Slogwang { 118a9643ea8Slogwang bool ret = true; 119a9643ea8Slogwang KqueuerObj *kqobj = NULL; 120a9643ea8Slogwang KqueuerObj *kqobj_error = NULL; 121a9643ea8Slogwang TAILQ_FOREACH(kqobj, &obj_list, _entry) 122a9643ea8Slogwang { 123a9643ea8Slogwang if (!KqueueAddObj(kqobj)) 124a9643ea8Slogwang { 125a9643ea8Slogwang MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd()); 126a9643ea8Slogwang kqueue_assert(0); 127a9643ea8Slogwang kqobj_error = kqobj; 128a9643ea8Slogwang ret = false; 129a9643ea8Slogwang goto EXIT_LABEL; 130a9643ea8Slogwang } 131a9643ea8Slogwang } 132a9643ea8Slogwang 133a9643ea8Slogwang EXIT_LABEL: 134a9643ea8Slogwang 135a9643ea8Slogwang if (!ret) 136a9643ea8Slogwang { 137a9643ea8Slogwang TAILQ_FOREACH(kqobj, &obj_list, _entry) 138a9643ea8Slogwang { 139a9643ea8Slogwang if (kqobj == kqobj_error) 140a9643ea8Slogwang { 141a9643ea8Slogwang break; 142a9643ea8Slogwang } 143a9643ea8Slogwang KqueueDelObj(kqobj); 144a9643ea8Slogwang } 145a9643ea8Slogwang } 146a9643ea8Slogwang 147a9643ea8Slogwang return ret; 148a9643ea8Slogwang } 149a9643ea8Slogwang 150a9643ea8Slogwang bool KqueueProxy::KqueueDel(KqObjList& obj_list) 151a9643ea8Slogwang { 152a9643ea8Slogwang bool ret = true; 153a9643ea8Slogwang 154a9643ea8Slogwang KqueuerObj *kqobj = NULL; 155a9643ea8Slogwang TAILQ_FOREACH(kqobj, &obj_list, _entry) 156a9643ea8Slogwang { 157a9643ea8Slogwang if (!KqueueDelObj(kqobj)) // failed also need continue, be sure ref count ok 158a9643ea8Slogwang { 159a9643ea8Slogwang MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd()); 160a9643ea8Slogwang kqueue_assert(0); 161a9643ea8Slogwang ret = false; 162a9643ea8Slogwang } 163a9643ea8Slogwang } 164a9643ea8Slogwang 165a9643ea8Slogwang return ret; 166a9643ea8Slogwang } 167a9643ea8Slogwang 168a9643ea8Slogwang bool KqueueProxy::KqueueCtrlAdd(int fd, int events) 169a9643ea8Slogwang { 170a9643ea8Slogwang KqFdRef* item = KqFdRefGet(fd); 171a9643ea8Slogwang if (item == NULL) 172a9643ea8Slogwang { 173a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error, wtf? 174a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 175a9643ea8Slogwang kqueue_assert(0); 176a9643ea8Slogwang return false; 177a9643ea8Slogwang } 178a9643ea8Slogwang 179a9643ea8Slogwang item->AttachEvents(events); 180a9643ea8Slogwang 181a9643ea8Slogwang int old_events = item->GetListenEvents(); 182a9643ea8Slogwang int new_events = old_events | events; 183a9643ea8Slogwang if (old_events == new_events) 184a9643ea8Slogwang { 185a9643ea8Slogwang return true; 186a9643ea8Slogwang } 187a9643ea8Slogwang 188a9643ea8Slogwang KqEvent ke; 189a9643ea8Slogwang int ret; 190a9643ea8Slogwang if (old_events & KQ_EVENT_WRITE) { 191a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 192a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 193a9643ea8Slogwang if (ret == -1) { 194a9643ea8Slogwang // TODO, error check 195a9643ea8Slogwang item->DetachEvents(events); 196a9643ea8Slogwang kqueue_assert(0); 197a9643ea8Slogwang return false; 198a9643ea8Slogwang } 199a9643ea8Slogwang } 200a9643ea8Slogwang if (old_events & KQ_EVENT_READ) { 201a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 202a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 203a9643ea8Slogwang if (ret == -1) { 204a9643ea8Slogwang // TODO, error check 205a9643ea8Slogwang item->DetachEvents(events); 206a9643ea8Slogwang kqueue_assert(0); 207a9643ea8Slogwang return false; 208a9643ea8Slogwang } 209a9643ea8Slogwang } 210a9643ea8Slogwang if (events & KQ_EVENT_WRITE) { 211a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 212a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 213a9643ea8Slogwang if (ret == -1) { 214a9643ea8Slogwang // TODO, error check 215a9643ea8Slogwang item->DetachEvents(events); 216a9643ea8Slogwang kqueue_assert(0); 217a9643ea8Slogwang return false; 218a9643ea8Slogwang } 219a9643ea8Slogwang } 220a9643ea8Slogwang if (events & KQ_EVENT_READ) { 221a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 222a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 223a9643ea8Slogwang if (ret == -1) { 224a9643ea8Slogwang // TODO, error check 225a9643ea8Slogwang item->DetachEvents(events); 226a9643ea8Slogwang kqueue_assert(0); 227a9643ea8Slogwang return false; 228a9643ea8Slogwang } 229a9643ea8Slogwang } 230a9643ea8Slogwang 231a9643ea8Slogwang item->SetListenEvents(new_events); 232a9643ea8Slogwang 233a9643ea8Slogwang return true; 234a9643ea8Slogwang } 235a9643ea8Slogwang 236a9643ea8Slogwang 237a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDel(int fd, int events) 238a9643ea8Slogwang { 239a9643ea8Slogwang return KqueueCtrlDelRef(fd, events, false); 240a9643ea8Slogwang } 241a9643ea8Slogwang 242a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref) 243a9643ea8Slogwang { 244a9643ea8Slogwang KqFdRef* item = KqFdRefGet(fd); 245a9643ea8Slogwang if (item == NULL) 246a9643ea8Slogwang { 247a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 248a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); 249a9643ea8Slogwang kqueue_assert(0); 250a9643ea8Slogwang return false; 251a9643ea8Slogwang 252a9643ea8Slogwang } 253a9643ea8Slogwang 254a9643ea8Slogwang item->DetachEvents(events); 255a9643ea8Slogwang int old_events = item->GetListenEvents(); 256a9643ea8Slogwang int new_events = old_events &~ events; 257a9643ea8Slogwang 258a9643ea8Slogwang if (use_ref) { 259a9643ea8Slogwang new_events = old_events; 260a9643ea8Slogwang if (item->ReadRefCnt() == 0) { 261a9643ea8Slogwang new_events = new_events & ~KQ_EVENT_READ; 262a9643ea8Slogwang } 263a9643ea8Slogwang if (item->WriteRefCnt() == 0) { 264a9643ea8Slogwang new_events = new_events & ~KQ_EVENT_WRITE; 265a9643ea8Slogwang } 266a9643ea8Slogwang } 267a9643ea8Slogwang 268a9643ea8Slogwang if (old_events == new_events) 269a9643ea8Slogwang { 270a9643ea8Slogwang return true; 271a9643ea8Slogwang } 272a9643ea8Slogwang KqEvent ke; 273a9643ea8Slogwang int ret; 274a9643ea8Slogwang if (old_events & KQ_EVENT_WRITE) { 275a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); 276a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 277a9643ea8Slogwang if (ret == -1) { 278a9643ea8Slogwang kqueue_assert(0); 279a9643ea8Slogwang return false; 280a9643ea8Slogwang } 281a9643ea8Slogwang } 282a9643ea8Slogwang if (old_events & KQ_EVENT_READ) { 283a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); 284a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 285a9643ea8Slogwang if (ret == -1) { 286a9643ea8Slogwang kqueue_assert(0); 287a9643ea8Slogwang return false; 288a9643ea8Slogwang } 289a9643ea8Slogwang } 290a9643ea8Slogwang 291a9643ea8Slogwang if (new_events & KQ_EVENT_WRITE) { 292a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); 293a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 294a9643ea8Slogwang if (ret == -1) { 295a9643ea8Slogwang kqueue_assert(0); 296a9643ea8Slogwang return false; 297a9643ea8Slogwang } 298a9643ea8Slogwang } 299a9643ea8Slogwang if (new_events & KQ_EVENT_READ) { 300a9643ea8Slogwang EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); 301a9643ea8Slogwang ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); 302a9643ea8Slogwang if (ret == -1) { 303a9643ea8Slogwang kqueue_assert(0); 304a9643ea8Slogwang return false; 305a9643ea8Slogwang } 306a9643ea8Slogwang } 307a9643ea8Slogwang 308a9643ea8Slogwang item->SetListenEvents(new_events); 309a9643ea8Slogwang 310a9643ea8Slogwang return true; 311a9643ea8Slogwang } 312a9643ea8Slogwang 313a9643ea8Slogwang bool KqueueProxy::KqueueAddObj(KqueuerObj* obj) 314a9643ea8Slogwang { 315a9643ea8Slogwang if (obj == NULL) 316a9643ea8Slogwang { 317a9643ea8Slogwang MTLOG_ERROR("kqobj input invalid, %p", obj); 318a9643ea8Slogwang return false; 319a9643ea8Slogwang } 320a9643ea8Slogwang 321a9643ea8Slogwang KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 322a9643ea8Slogwang if (item == NULL) 323a9643ea8Slogwang { 324a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 325a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 326a9643ea8Slogwang kqueue_assert(0); 327a9643ea8Slogwang return false; 328a9643ea8Slogwang } 329a9643ea8Slogwang 330a9643ea8Slogwang int ret = obj->KqueueCtlAdd(item); 331a9643ea8Slogwang if (ret < 0) { 332a9643ea8Slogwang MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 333a9643ea8Slogwang kqueue_assert(0); 334a9643ea8Slogwang return false; 335a9643ea8Slogwang } 336a9643ea8Slogwang 337a9643ea8Slogwang return true; 338a9643ea8Slogwang } 339a9643ea8Slogwang 340a9643ea8Slogwang bool KqueueProxy::KqueueDelObj(KqueuerObj* obj) 341a9643ea8Slogwang { 342a9643ea8Slogwang if (obj == NULL) 343a9643ea8Slogwang { 344a9643ea8Slogwang MTLOG_ERROR("kqobj input invalid, %p", obj); 345a9643ea8Slogwang return false; 346a9643ea8Slogwang } 347a9643ea8Slogwang KqFdRef* item = KqFdRefGet(obj->GetOsfd()); 348a9643ea8Slogwang if (item == NULL) 349a9643ea8Slogwang { 350a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 351a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); 352a9643ea8Slogwang kqueue_assert(0); 353a9643ea8Slogwang return false; 354a9643ea8Slogwang } 355a9643ea8Slogwang 356a9643ea8Slogwang int ret = obj->KqueueCtlDel(item); 357a9643ea8Slogwang if (ret < 0) { 358a9643ea8Slogwang MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); 359a9643ea8Slogwang kqueue_assert(0); 360a9643ea8Slogwang return false; 361a9643ea8Slogwang } 362a9643ea8Slogwang 363a9643ea8Slogwang return true; 364a9643ea8Slogwang } 365a9643ea8Slogwang 366a9643ea8Slogwang void KqueueProxy::KqueueRcvEventList(int evtfdnum) 367a9643ea8Slogwang { 368a9643ea8Slogwang int ret = 0; 369a9643ea8Slogwang int osfd = 0; 370a9643ea8Slogwang int revents = 0; 371a9643ea8Slogwang int tmp_evts = 0; 372a9643ea8Slogwang KqFdRef* item = NULL; 373a9643ea8Slogwang KqueuerObj* obj = NULL; 374a9643ea8Slogwang 375a9643ea8Slogwang for (int i = 0; i < evtfdnum; i++) 376a9643ea8Slogwang { 377*a02c88d6Slogwang osfd = _evtlist[i].ident; 378a9643ea8Slogwang 379a9643ea8Slogwang item = KqFdRefGet(osfd); 380a9643ea8Slogwang if (item == NULL) 381a9643ea8Slogwang { 382a9643ea8Slogwang MT_ATTR_API(320851, 1); // fd error 383a9643ea8Slogwang MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd); 384a9643ea8Slogwang kqueue_assert(0); 385a9643ea8Slogwang continue; 386a9643ea8Slogwang } 387a9643ea8Slogwang tmp_evts = _evtlist[i].filter; 388a9643ea8Slogwang if (tmp_evts == EVFILT_READ) { 389a9643ea8Slogwang revents |= KQ_EVENT_READ; 390a9643ea8Slogwang } 391a9643ea8Slogwang if (tmp_evts == EVFILT_WRITE) { 392a9643ea8Slogwang revents |= KQ_EVENT_WRITE; 393a9643ea8Slogwang } 394a9643ea8Slogwang obj = item->GetNotifyObj(); 395a9643ea8Slogwang if (obj == NULL) 396a9643ea8Slogwang { 397a9643ea8Slogwang MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd); 398a9643ea8Slogwang KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE))); 399a9643ea8Slogwang continue; 400a9643ea8Slogwang } 401a9643ea8Slogwang obj->SetRcvEvents(revents); 402a9643ea8Slogwang 403a9643ea8Slogwang if (tmp_evts == EV_ERROR) 404a9643ea8Slogwang { 405a9643ea8Slogwang obj->HangupNotify(); 406a9643ea8Slogwang continue; 407a9643ea8Slogwang } 408a9643ea8Slogwang 409a9643ea8Slogwang if (revents & KQ_EVENT_READ) 410a9643ea8Slogwang { 411a9643ea8Slogwang ret = obj->InputNotify(); 412a9643ea8Slogwang if (ret != 0) 413a9643ea8Slogwang { 414a9643ea8Slogwang continue; 415a9643ea8Slogwang } 416a9643ea8Slogwang } 417a9643ea8Slogwang 418a9643ea8Slogwang if (revents & KQ_EVENT_WRITE) 419a9643ea8Slogwang { 420a9643ea8Slogwang ret = obj->OutputNotify(); 421a9643ea8Slogwang if (ret != 0) 422a9643ea8Slogwang { 423a9643ea8Slogwang continue; 424a9643ea8Slogwang } 425a9643ea8Slogwang } 426a9643ea8Slogwang } 427a9643ea8Slogwang } 428a9643ea8Slogwang 429a9643ea8Slogwang void KqueueProxy::KqueueDispatch() 430a9643ea8Slogwang { 431a9643ea8Slogwang int nfd; 432a9643ea8Slogwang int wait_time = KqueueGetTimeout(); 433a9643ea8Slogwang if (wait_time) { 434a9643ea8Slogwang struct timespec ts; 435a9643ea8Slogwang ts.tv_sec = wait_time / 1000; 436a9643ea8Slogwang ts.tv_nsec = 0; 437a9643ea8Slogwang nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts); 438a9643ea8Slogwang } else { 439a9643ea8Slogwang nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL); 440a9643ea8Slogwang } 441a9643ea8Slogwang if (nfd <= 0) 442a9643ea8Slogwang { 443a9643ea8Slogwang return; 444a9643ea8Slogwang } 445a9643ea8Slogwang 446a9643ea8Slogwang KqueueRcvEventList(nfd); 447a9643ea8Slogwang } 448a9643ea8Slogwang 449a9643ea8Slogwang int KqueuerObj::InputNotify() 450a9643ea8Slogwang { 451a9643ea8Slogwang MicroThread* thread = this->GetOwnerThread(); 452a9643ea8Slogwang if (thread == NULL) 453a9643ea8Slogwang { 454a9643ea8Slogwang kqueue_assert(0); 455a9643ea8Slogwang MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 456a9643ea8Slogwang return -1; 457a9643ea8Slogwang } 458a9643ea8Slogwang 459a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST)) 460a9643ea8Slogwang { 461a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 462a9643ea8Slogwang frame->RemoveIoWait(thread); 463a9643ea8Slogwang frame->InsertRunable(thread); 464a9643ea8Slogwang } 465a9643ea8Slogwang 466a9643ea8Slogwang return 0; 467a9643ea8Slogwang } 468a9643ea8Slogwang 469a9643ea8Slogwang int KqueuerObj::OutputNotify() 470a9643ea8Slogwang { 471a9643ea8Slogwang MicroThread* thread = this->GetOwnerThread(); 472a9643ea8Slogwang if (NULL == thread) 473a9643ea8Slogwang { 474a9643ea8Slogwang kqueue_assert(0); 475a9643ea8Slogwang MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); 476a9643ea8Slogwang return -1; 477a9643ea8Slogwang } 478a9643ea8Slogwang 479a9643ea8Slogwang // 多个事件同时到达, 防重复操作 480a9643ea8Slogwang if (thread->HasFlag(MicroThread::IO_LIST)) 481a9643ea8Slogwang { 482a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 483a9643ea8Slogwang frame->RemoveIoWait(thread); 484a9643ea8Slogwang frame->InsertRunable(thread); 485a9643ea8Slogwang } 486a9643ea8Slogwang 487a9643ea8Slogwang return 0; 488a9643ea8Slogwang } 489a9643ea8Slogwang 490a9643ea8Slogwang int KqueuerObj::HangupNotify() 491a9643ea8Slogwang { 492a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 493a9643ea8Slogwang frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); 494a9643ea8Slogwang return 0; 495a9643ea8Slogwang } 496a9643ea8Slogwang 497a9643ea8Slogwang int KqueuerObj::KqueueCtlAdd(void* args) 498a9643ea8Slogwang { 499a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 500a9643ea8Slogwang KqFdRef* fd_ref = (KqFdRef*)args; 501a9643ea8Slogwang kqueue_assert(fd_ref != NULL); 502a9643ea8Slogwang 503a9643ea8Slogwang int osfd = this->GetOsfd(); 504a9643ea8Slogwang int new_events = this->GetEvents(); 505a9643ea8Slogwang 506a9643ea8Slogwang // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 507a9643ea8Slogwang KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 508a9643ea8Slogwang if ((old_obj != NULL) && (old_obj != this)) 509a9643ea8Slogwang { 510a9643ea8Slogwang MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 511a9643ea8Slogwang return -1; 512a9643ea8Slogwang } 513a9643ea8Slogwang fd_ref->SetNotifyObj(this); 514a9643ea8Slogwang 515a9643ea8Slogwang // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 516a9643ea8Slogwang if (!frame->KqueueCtrlAdd(osfd, new_events)) 517a9643ea8Slogwang { 518a9643ea8Slogwang MTLOG_ERROR("kqfd ref add failed, log"); 519a9643ea8Slogwang fd_ref->SetNotifyObj(old_obj); 520a9643ea8Slogwang return -2; 521a9643ea8Slogwang } 522a9643ea8Slogwang 523a9643ea8Slogwang return 0; 524a9643ea8Slogwang } 525a9643ea8Slogwang 526a9643ea8Slogwang int KqueuerObj::KqueueCtlDel(void* args) 527a9643ea8Slogwang { 528a9643ea8Slogwang MtFrame* frame = MtFrame::Instance(); 529a9643ea8Slogwang KqFdRef* fd_ref = (KqFdRef*)args; 530a9643ea8Slogwang kqueue_assert(fd_ref != NULL); 531a9643ea8Slogwang 532a9643ea8Slogwang int osfd = this->GetOsfd(); 533a9643ea8Slogwang int events = this->GetEvents(); 534a9643ea8Slogwang 535a9643ea8Slogwang // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录 536a9643ea8Slogwang KqueuerObj* old_obj = fd_ref->GetNotifyObj(); 537a9643ea8Slogwang if (old_obj != this) 538a9643ea8Slogwang { 539a9643ea8Slogwang MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); 540a9643ea8Slogwang return -1; 541a9643ea8Slogwang } 542a9643ea8Slogwang fd_ref->SetNotifyObj(NULL); 543a9643ea8Slogwang 544a9643ea8Slogwang // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节 545a9643ea8Slogwang if (!frame->KqueueCtrlDelRef(osfd, events, false)) // 引用有风险, 弊大于利, 关闭掉 546a9643ea8Slogwang { 547a9643ea8Slogwang MTLOG_ERROR("kqfd ref del failed, log"); 548a9643ea8Slogwang fd_ref->SetNotifyObj(old_obj); 549a9643ea8Slogwang return -2; 550a9643ea8Slogwang } 551a9643ea8Slogwang 552a9643ea8Slogwang return 0; 553a9643ea8Slogwang 554a9643ea8Slogwang } 555a9643ea8Slogwang 556