1*a9643ea8Slogwang 2*a9643ea8Slogwang /** 3*a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4*a9643ea8Slogwang * 5*a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6*a9643ea8Slogwang * 7*a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8*a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9*a9643ea8Slogwang * obtain a copy of the License at 10*a9643ea8Slogwang * 11*a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12*a9643ea8Slogwang * 13*a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14*a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15*a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16*a9643ea8Slogwang * and limitations under the License. 17*a9643ea8Slogwang */ 18*a9643ea8Slogwang 19*a9643ea8Slogwang 20*a9643ea8Slogwang /** 21*a9643ea8Slogwang * @filename kqueue.h 22*a9643ea8Slogwang * @info kqueue for micro thread manage 23*a9643ea8Slogwang */ 24*a9643ea8Slogwang 25*a9643ea8Slogwang #ifndef _KQUEUE_PROXY___ 26*a9643ea8Slogwang #define _KQUEUE_PROXY___ 27*a9643ea8Slogwang 28*a9643ea8Slogwang #include <stdlib.h> 29*a9643ea8Slogwang #include <unistd.h> 30*a9643ea8Slogwang #include <sys/queue.h> 31*a9643ea8Slogwang 32*a9643ea8Slogwang #include "ff_api.h" 33*a9643ea8Slogwang 34*a9643ea8Slogwang #include <set> 35*a9643ea8Slogwang #include <vector> 36*a9643ea8Slogwang using std::set; 37*a9643ea8Slogwang using std::vector; 38*a9643ea8Slogwang 39*a9643ea8Slogwang #define kqueue_assert(statement) 40*a9643ea8Slogwang //#define kqueue_assert(statement) assert(statement) 41*a9643ea8Slogwang 42*a9643ea8Slogwang namespace NS_MICRO_THREAD { 43*a9643ea8Slogwang 44*a9643ea8Slogwang #define KQ_EVENT_NONE 0 45*a9643ea8Slogwang #define KQ_EVENT_READ 1 46*a9643ea8Slogwang #define KQ_EVENT_WRITE 2 47*a9643ea8Slogwang 48*a9643ea8Slogwang /******************************************************************************/ 49*a9643ea8Slogwang /* 操作系统头文件适配定义 */ 50*a9643ea8Slogwang /******************************************************************************/ 51*a9643ea8Slogwang 52*a9643ea8Slogwang /** 53*a9643ea8Slogwang * @brief add more detail for linux <sys/queue.h>, freebsd and University of California 54*a9643ea8Slogwang * @info queue.h version 8.3 (suse) diff version 8.5 (tlinux) 55*a9643ea8Slogwang */ 56*a9643ea8Slogwang #ifndef TAILQ_CONCAT 57*a9643ea8Slogwang 58*a9643ea8Slogwang #define TAILQ_EMPTY(head) ((head)->tqh_first == NULL) 59*a9643ea8Slogwang #define TAILQ_FIRST(head) ((head)->tqh_first) 60*a9643ea8Slogwang #define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next) 61*a9643ea8Slogwang 62*a9643ea8Slogwang #define TAILQ_LAST(head, headname) \ 63*a9643ea8Slogwang (*(((struct headname *)((head)->tqh_last))->tqh_last)) 64*a9643ea8Slogwang 65*a9643ea8Slogwang #define TAILQ_FOREACH(var, head, field) \ 66*a9643ea8Slogwang for ((var) = TAILQ_FIRST((head)); \ 67*a9643ea8Slogwang (var); \ 68*a9643ea8Slogwang (var) = TAILQ_NEXT((var), field)) 69*a9643ea8Slogwang 70*a9643ea8Slogwang #define TAILQ_CONCAT(head1, head2, field) \ 71*a9643ea8Slogwang do { \ 72*a9643ea8Slogwang if (!TAILQ_EMPTY(head2)) { \ 73*a9643ea8Slogwang *(head1)->tqh_last = (head2)->tqh_first; \ 74*a9643ea8Slogwang (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \ 75*a9643ea8Slogwang (head1)->tqh_last = (head2)->tqh_last; \ 76*a9643ea8Slogwang TAILQ_INIT((head2)); \ 77*a9643ea8Slogwang } \ 78*a9643ea8Slogwang } while (0) 79*a9643ea8Slogwang 80*a9643ea8Slogwang #endif 81*a9643ea8Slogwang 82*a9643ea8Slogwang #ifndef TAILQ_FOREACH_SAFE // tlinux no this define 83*a9643ea8Slogwang #define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ 84*a9643ea8Slogwang for ((var) = TAILQ_FIRST((head)); \ 85*a9643ea8Slogwang (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ 86*a9643ea8Slogwang (var) = (tvar)) 87*a9643ea8Slogwang #endif 88*a9643ea8Slogwang 89*a9643ea8Slogwang 90*a9643ea8Slogwang 91*a9643ea8Slogwang 92*a9643ea8Slogwang /******************************************************************************/ 93*a9643ea8Slogwang /* Kqueue proxy 定义与实现部分 */ 94*a9643ea8Slogwang /******************************************************************************/ 95*a9643ea8Slogwang 96*a9643ea8Slogwang class KqueueProxy; 97*a9643ea8Slogwang class MicroThread; 98*a9643ea8Slogwang 99*a9643ea8Slogwang /** 100*a9643ea8Slogwang * @brief kqueue通知对象基类定义 101*a9643ea8Slogwang */ 102*a9643ea8Slogwang class KqueuerObj 103*a9643ea8Slogwang { 104*a9643ea8Slogwang protected: 105*a9643ea8Slogwang int _fd; 106*a9643ea8Slogwang int _events; 107*a9643ea8Slogwang int _revents; 108*a9643ea8Slogwang int _type; 109*a9643ea8Slogwang MicroThread* _thread; 110*a9643ea8Slogwang 111*a9643ea8Slogwang public: 112*a9643ea8Slogwang 113*a9643ea8Slogwang TAILQ_ENTRY(KqueuerObj) _entry; 114*a9643ea8Slogwang 115*a9643ea8Slogwang explicit KqueuerObj(int fd = -1) { 116*a9643ea8Slogwang _fd = fd; 117*a9643ea8Slogwang _events = 0; 118*a9643ea8Slogwang _revents = 0; 119*a9643ea8Slogwang _type = 0; 120*a9643ea8Slogwang _thread = NULL; 121*a9643ea8Slogwang }; 122*a9643ea8Slogwang virtual ~KqueuerObj(){}; 123*a9643ea8Slogwang 124*a9643ea8Slogwang virtual int InputNotify(); 125*a9643ea8Slogwang virtual int OutputNotify(); 126*a9643ea8Slogwang virtual int HangupNotify(); 127*a9643ea8Slogwang virtual int KqueueCtlAdd(void* args); 128*a9643ea8Slogwang virtual int KqueueCtlDel(void* args); 129*a9643ea8Slogwang 130*a9643ea8Slogwang /** 131*a9643ea8Slogwang * @brief fd打开可读事件侦听 132*a9643ea8Slogwang */ 133*a9643ea8Slogwang void EnableInput() { _events |= KQ_EVENT_READ; }; 134*a9643ea8Slogwang 135*a9643ea8Slogwang /** 136*a9643ea8Slogwang * @brief fd打开可写事件侦听 137*a9643ea8Slogwang */ 138*a9643ea8Slogwang void EnableOutput() { _events |= KQ_EVENT_WRITE; }; 139*a9643ea8Slogwang 140*a9643ea8Slogwang /** 141*a9643ea8Slogwang * @brief fd关闭可读事件侦听 142*a9643ea8Slogwang */ 143*a9643ea8Slogwang void DisableInput() { _events &= ~KQ_EVENT_READ; }; 144*a9643ea8Slogwang 145*a9643ea8Slogwang /** 146*a9643ea8Slogwang * @brief fd关闭可写事件侦听 147*a9643ea8Slogwang */ 148*a9643ea8Slogwang void DisableOutput() { _events &= ~KQ_EVENT_WRITE; }; 149*a9643ea8Slogwang 150*a9643ea8Slogwang /** 151*a9643ea8Slogwang * @brief 系统socket设置读取封装 152*a9643ea8Slogwang */ 153*a9643ea8Slogwang int GetOsfd() { return _fd; }; 154*a9643ea8Slogwang void SetOsfd(int fd) { _fd = fd; }; 155*a9643ea8Slogwang 156*a9643ea8Slogwang /** 157*a9643ea8Slogwang * @brief 监听事件与收到事件的访问方法 158*a9643ea8Slogwang */ 159*a9643ea8Slogwang int GetEvents() { return _events; }; 160*a9643ea8Slogwang void SetRcvEvents(int revents) { _revents = revents; }; 161*a9643ea8Slogwang int GetRcvEvents() { return _revents; }; 162*a9643ea8Slogwang 163*a9643ea8Slogwang /** 164*a9643ea8Slogwang * @brief 工厂管理方法, 获取真实类型 165*a9643ea8Slogwang */ 166*a9643ea8Slogwang int GetNtfyType() { return _type; }; 167*a9643ea8Slogwang virtual void Reset() { 168*a9643ea8Slogwang _fd = -1; 169*a9643ea8Slogwang _events = 0; 170*a9643ea8Slogwang _revents = 0; 171*a9643ea8Slogwang _type = 0; 172*a9643ea8Slogwang _thread = NULL; 173*a9643ea8Slogwang }; 174*a9643ea8Slogwang 175*a9643ea8Slogwang /** 176*a9643ea8Slogwang * @brief 设置与获取所属的微线程句柄接口 177*a9643ea8Slogwang * @param thread 关联的线程指针 178*a9643ea8Slogwang */ 179*a9643ea8Slogwang void SetOwnerThread(MicroThread* thread) { _thread = thread; }; 180*a9643ea8Slogwang MicroThread* GetOwnerThread() { return _thread; }; 181*a9643ea8Slogwang 182*a9643ea8Slogwang }; 183*a9643ea8Slogwang 184*a9643ea8Slogwang typedef TAILQ_HEAD(__KqFdList, KqueuerObj) KqObjList; ///< 高效的双链管理 185*a9643ea8Slogwang typedef struct kevent KqEvent; ///< 重定义一下kqueue event 186*a9643ea8Slogwang 187*a9643ea8Slogwang 188*a9643ea8Slogwang /** 189*a9643ea8Slogwang * @brief EPOLL支持同一FD多个线程侦听, 建立一个引用计数数组, 元素定义 190*a9643ea8Slogwang * @info 引用计数弊大于利, 没有实际意义, 字段保留, 功能移除掉 20150623 191*a9643ea8Slogwang */ 192*a9643ea8Slogwang class KqFdRef 193*a9643ea8Slogwang { 194*a9643ea8Slogwang private: 195*a9643ea8Slogwang int _wr_ref; ///< 监听写的引用计数 196*a9643ea8Slogwang int _rd_ref; ///< 监听读的引用计数 197*a9643ea8Slogwang int _events; ///< 当前正在侦听的事件列表 198*a9643ea8Slogwang int _revents; ///< 当前该fd收到的事件信息, 仅在epoll_wait后处理中有效 199*a9643ea8Slogwang KqueuerObj* _kqobj; ///< 单独注册调度器对象,一个fd关联一个对象 200*a9643ea8Slogwang 201*a9643ea8Slogwang public: 202*a9643ea8Slogwang 203*a9643ea8Slogwang /** 204*a9643ea8Slogwang * @brief 构造与析构函数 205*a9643ea8Slogwang */ 206*a9643ea8Slogwang KqFdRef() { 207*a9643ea8Slogwang _wr_ref = 0; 208*a9643ea8Slogwang _rd_ref = 0; 209*a9643ea8Slogwang _events = 0; 210*a9643ea8Slogwang _revents = 0; 211*a9643ea8Slogwang _kqobj = NULL; 212*a9643ea8Slogwang }; 213*a9643ea8Slogwang ~KqFdRef(){}; 214*a9643ea8Slogwang 215*a9643ea8Slogwang /** 216*a9643ea8Slogwang * @brief 监听事件获取与设置接口 217*a9643ea8Slogwang */ 218*a9643ea8Slogwang void SetListenEvents(int events) { 219*a9643ea8Slogwang _events = events; 220*a9643ea8Slogwang }; 221*a9643ea8Slogwang int GetListenEvents() { 222*a9643ea8Slogwang return _events; 223*a9643ea8Slogwang }; 224*a9643ea8Slogwang 225*a9643ea8Slogwang /** 226*a9643ea8Slogwang * @brief 监听对象获取与设置接口 227*a9643ea8Slogwang */ 228*a9643ea8Slogwang void SetNotifyObj(KqueuerObj* ntfy) { 229*a9643ea8Slogwang _kqobj = ntfy; 230*a9643ea8Slogwang }; 231*a9643ea8Slogwang KqueuerObj* GetNotifyObj() { 232*a9643ea8Slogwang return _kqobj; 233*a9643ea8Slogwang }; 234*a9643ea8Slogwang 235*a9643ea8Slogwang /** 236*a9643ea8Slogwang * @brief 监听引用计数的更新 237*a9643ea8Slogwang */ 238*a9643ea8Slogwang void AttachEvents(int event) { 239*a9643ea8Slogwang if (event & KQ_EVENT_READ) { 240*a9643ea8Slogwang _rd_ref++; 241*a9643ea8Slogwang } 242*a9643ea8Slogwang if (event & KQ_EVENT_WRITE){ 243*a9643ea8Slogwang _wr_ref++; 244*a9643ea8Slogwang } 245*a9643ea8Slogwang }; 246*a9643ea8Slogwang void DetachEvents(int event) { 247*a9643ea8Slogwang if (event & KQ_EVENT_READ) { 248*a9643ea8Slogwang if (_rd_ref > 0) { 249*a9643ea8Slogwang _rd_ref--; 250*a9643ea8Slogwang } else { 251*a9643ea8Slogwang _rd_ref = 0; 252*a9643ea8Slogwang } 253*a9643ea8Slogwang } 254*a9643ea8Slogwang if (event & KQ_EVENT_WRITE){ 255*a9643ea8Slogwang if (_wr_ref > 0) { 256*a9643ea8Slogwang _wr_ref--; 257*a9643ea8Slogwang } else { 258*a9643ea8Slogwang _wr_ref = 0; 259*a9643ea8Slogwang } 260*a9643ea8Slogwang } 261*a9643ea8Slogwang }; 262*a9643ea8Slogwang 263*a9643ea8Slogwang /** 264*a9643ea8Slogwang * @brief 获取引用计数 265*a9643ea8Slogwang */ 266*a9643ea8Slogwang int ReadRefCnt() { return _rd_ref; }; 267*a9643ea8Slogwang int WriteRefCnt() { return _wr_ref; }; 268*a9643ea8Slogwang 269*a9643ea8Slogwang }; 270*a9643ea8Slogwang 271*a9643ea8Slogwang 272*a9643ea8Slogwang class KqueueProxy 273*a9643ea8Slogwang { 274*a9643ea8Slogwang public: 275*a9643ea8Slogwang static const int DEFAULT_MAX_FD_NUM = 100000; 276*a9643ea8Slogwang 277*a9643ea8Slogwang private: 278*a9643ea8Slogwang int _kqfd; 279*a9643ea8Slogwang int _maxfd; 280*a9643ea8Slogwang KqEvent* _evtlist; 281*a9643ea8Slogwang KqFdRef* _kqrefs; 282*a9643ea8Slogwang 283*a9643ea8Slogwang public: 284*a9643ea8Slogwang KqueueProxy(); 285*a9643ea8Slogwang virtual ~KqueueProxy(){}; 286*a9643ea8Slogwang 287*a9643ea8Slogwang int InitKqueue(int max_num); 288*a9643ea8Slogwang void TermKqueue(void); 289*a9643ea8Slogwang 290*a9643ea8Slogwang virtual int KqueueGetTimeout(void) { return 0; }; 291*a9643ea8Slogwang virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) { return false; }; 292*a9643ea8Slogwang 293*a9643ea8Slogwang bool KqueueAdd(KqObjList& fdset); 294*a9643ea8Slogwang bool KqueueDel(KqObjList& fdset); 295*a9643ea8Slogwang void KqueueDispatch(void); 296*a9643ea8Slogwang bool KqueueAddObj(KqueuerObj* obj); 297*a9643ea8Slogwang bool KqueueDelObj(KqueuerObj* obj); 298*a9643ea8Slogwang bool KqueueCtrlAdd(int fd, int new_events); 299*a9643ea8Slogwang bool KqueueCtrlDel(int fd, int new_events); 300*a9643ea8Slogwang bool KqueueCtrlDelRef(int fd, int new_events, bool use_ref); 301*a9643ea8Slogwang 302*a9643ea8Slogwang KqFdRef* KqFdRefGet(int fd) { 303*a9643ea8Slogwang return ((fd >= _maxfd) || (fd < 0)) ? (KqFdRef*)NULL : &_kqrefs[fd]; 304*a9643ea8Slogwang } 305*a9643ea8Slogwang 306*a9643ea8Slogwang void KqueueNtfyReg(int fd, KqueuerObj* obj) { 307*a9643ea8Slogwang KqFdRef* ref = KqFdRefGet(fd); 308*a9643ea8Slogwang if (ref) { 309*a9643ea8Slogwang ref->SetNotifyObj(obj); 310*a9643ea8Slogwang } 311*a9643ea8Slogwang }; 312*a9643ea8Slogwang 313*a9643ea8Slogwang protected: 314*a9643ea8Slogwang void KqueueRcvEventList(int evtfdnum); 315*a9643ea8Slogwang }; 316*a9643ea8Slogwang 317*a9643ea8Slogwang } 318*a9643ea8Slogwang 319*a9643ea8Slogwang 320*a9643ea8Slogwang #endif 321