1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename kqueue.h 22 * @info kqueue for micro thread manage 23 */ 24 25 #ifndef _KQUEUE_PROXY___ 26 #define _KQUEUE_PROXY___ 27 28 #include <stdlib.h> 29 #include <unistd.h> 30 #include <sys/queue.h> 31 32 #include "ff_api.h" 33 34 #include <set> 35 #include <vector> 36 using std::set; 37 using std::vector; 38 39 #define kqueue_assert(statement) 40 //#define kqueue_assert(statement) assert(statement) 41 42 namespace NS_MICRO_THREAD { 43 44 #define KQ_EVENT_NONE 0 45 #define KQ_EVENT_READ 1 46 #define KQ_EVENT_WRITE 2 47 48 /******************************************************************************/ 49 /* 操作系统头文件适配定义 */ 50 /******************************************************************************/ 51 52 /** 53 * @brief add more detail for linux <sys/queue.h>, freebsd and University of California 54 * @info queue.h version 8.3 (suse) diff version 8.5 (tlinux) 55 */ 56 #ifndef TAILQ_CONCAT 57 58 #define TAILQ_EMPTY(head) ((head)->tqh_first == NULL) 59 #define TAILQ_FIRST(head) ((head)->tqh_first) 60 #define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next) 61 62 #define TAILQ_LAST(head, headname) \ 63 (*(((struct headname *)((head)->tqh_last))->tqh_last)) 64 65 #define TAILQ_FOREACH(var, head, field) \ 66 for ((var) = TAILQ_FIRST((head)); \ 67 (var); \ 68 (var) = TAILQ_NEXT((var), field)) 69 70 #define TAILQ_CONCAT(head1, head2, field) \ 71 do { \ 72 if (!TAILQ_EMPTY(head2)) { \ 73 *(head1)->tqh_last = (head2)->tqh_first; \ 74 (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \ 75 (head1)->tqh_last = (head2)->tqh_last; \ 76 TAILQ_INIT((head2)); \ 77 } \ 78 } while (0) 79 80 #endif 81 82 #ifndef TAILQ_FOREACH_SAFE // tlinux no this define 83 #define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ 84 for ((var) = TAILQ_FIRST((head)); \ 85 (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ 86 (var) = (tvar)) 87 #endif 88 89 90 91 92 /******************************************************************************/ 93 /* Kqueue proxy 定义与实现部分 */ 94 /******************************************************************************/ 95 96 class KqueueProxy; 97 class MicroThread; 98 99 /** 100 * @brief kqueue通知对象基类定义 101 */ 102 class KqueuerObj 103 { 104 protected: 105 int _fd; 106 int _events; 107 int _revents; 108 int _type; 109 MicroThread* _thread; 110 111 public: 112 113 TAILQ_ENTRY(KqueuerObj) _entry; 114 115 explicit KqueuerObj(int fd = -1) { 116 _fd = fd; 117 _events = 0; 118 _revents = 0; 119 _type = 0; 120 _thread = NULL; 121 }; 122 virtual ~KqueuerObj(){}; 123 124 virtual int InputNotify(); 125 virtual int OutputNotify(); 126 virtual int HangupNotify(); 127 virtual int KqueueCtlAdd(void* args); 128 virtual int KqueueCtlDel(void* args); 129 130 /** 131 * @brief fd打开可读事件侦听 132 */ 133 void EnableInput() { _events |= KQ_EVENT_READ; }; 134 135 /** 136 * @brief fd打开可写事件侦听 137 */ 138 void EnableOutput() { _events |= KQ_EVENT_WRITE; }; 139 140 /** 141 * @brief fd关闭可读事件侦听 142 */ 143 void DisableInput() { _events &= ~KQ_EVENT_READ; }; 144 145 /** 146 * @brief fd关闭可写事件侦听 147 */ 148 void DisableOutput() { _events &= ~KQ_EVENT_WRITE; }; 149 150 /** 151 * @brief 系统socket设置读取封装 152 */ 153 int GetOsfd() { return _fd; }; 154 void SetOsfd(int fd) { _fd = fd; }; 155 156 /** 157 * @brief 监听事件与收到事件的访问方法 158 */ 159 int GetEvents() { return _events; }; 160 void SetRcvEvents(int revents) { _revents = revents; }; 161 int GetRcvEvents() { return _revents; }; 162 163 /** 164 * @brief 工厂管理方法, 获取真实类型 165 */ 166 int GetNtfyType() { return _type; }; 167 virtual void Reset() { 168 _fd = -1; 169 _events = 0; 170 _revents = 0; 171 _type = 0; 172 _thread = NULL; 173 }; 174 175 /** 176 * @brief 设置与获取所属的微线程句柄接口 177 * @param thread 关联的线程指针 178 */ 179 void SetOwnerThread(MicroThread* thread) { _thread = thread; }; 180 MicroThread* GetOwnerThread() { return _thread; }; 181 182 }; 183 184 typedef TAILQ_HEAD(__KqFdList, KqueuerObj) KqObjList; ///< 高效的双链管理 185 typedef struct kevent KqEvent; ///< 重定义一下kqueue event 186 187 188 /** 189 * @brief EPOLL支持同一FD多个线程侦听, 建立一个引用计数数组, 元素定义 190 * @info 引用计数弊大于利, 没有实际意义, 字段保留, 功能移除掉 20150623 191 */ 192 class KqFdRef 193 { 194 private: 195 int _wr_ref; ///< 监听写的引用计数 196 int _rd_ref; ///< 监听读的引用计数 197 int _events; ///< 当前正在侦听的事件列表 198 int _revents; ///< 当前该fd收到的事件信息, 仅在epoll_wait后处理中有效 199 KqueuerObj* _kqobj; ///< 单独注册调度器对象,一个fd关联一个对象 200 201 public: 202 203 /** 204 * @brief 构造与析构函数 205 */ 206 KqFdRef() { 207 _wr_ref = 0; 208 _rd_ref = 0; 209 _events = 0; 210 _revents = 0; 211 _kqobj = NULL; 212 }; 213 ~KqFdRef(){}; 214 215 /** 216 * @brief 监听事件获取与设置接口 217 */ 218 void SetListenEvents(int events) { 219 _events = events; 220 }; 221 int GetListenEvents() { 222 return _events; 223 }; 224 225 /** 226 * @brief 监听对象获取与设置接口 227 */ 228 void SetNotifyObj(KqueuerObj* ntfy) { 229 _kqobj = ntfy; 230 }; 231 KqueuerObj* GetNotifyObj() { 232 return _kqobj; 233 }; 234 235 /** 236 * @brief 监听引用计数的更新 237 */ 238 void AttachEvents(int event) { 239 if (event & KQ_EVENT_READ) { 240 _rd_ref++; 241 } 242 if (event & KQ_EVENT_WRITE){ 243 _wr_ref++; 244 } 245 }; 246 void DetachEvents(int event) { 247 if (event & KQ_EVENT_READ) { 248 if (_rd_ref > 0) { 249 _rd_ref--; 250 } else { 251 _rd_ref = 0; 252 } 253 } 254 if (event & KQ_EVENT_WRITE){ 255 if (_wr_ref > 0) { 256 _wr_ref--; 257 } else { 258 _wr_ref = 0; 259 } 260 } 261 }; 262 263 /** 264 * @brief 获取引用计数 265 */ 266 int ReadRefCnt() { return _rd_ref; }; 267 int WriteRefCnt() { return _wr_ref; }; 268 269 }; 270 271 272 class KqueueProxy 273 { 274 public: 275 static const int DEFAULT_MAX_FD_NUM = 100000; 276 277 private: 278 int _kqfd; 279 int _maxfd; 280 KqEvent* _evtlist; 281 KqFdRef* _kqrefs; 282 283 public: 284 KqueueProxy(); 285 virtual ~KqueueProxy(){}; 286 287 int InitKqueue(int max_num); 288 void TermKqueue(void); 289 290 virtual int KqueueGetTimeout(void) { return 0; }; 291 virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) { return false; }; 292 293 bool KqueueAdd(KqObjList& fdset); 294 bool KqueueDel(KqObjList& fdset); 295 void KqueueDispatch(void); 296 bool KqueueAddObj(KqueuerObj* obj); 297 bool KqueueDelObj(KqueuerObj* obj); 298 bool KqueueCtrlAdd(int fd, int new_events); 299 bool KqueueCtrlDel(int fd, int new_events); 300 bool KqueueCtrlDelRef(int fd, int new_events, bool use_ref); 301 302 KqFdRef* KqFdRefGet(int fd) { 303 return ((fd >= _maxfd) || (fd < 0)) ? (KqFdRef*)NULL : &_kqrefs[fd]; 304 } 305 306 void KqueueNtfyReg(int fd, KqueuerObj* obj) { 307 KqFdRef* ref = KqFdRefGet(fd); 308 if (ref) { 309 ref->SetNotifyObj(obj); 310 } 311 }; 312 313 protected: 314 void KqueueRcvEventList(int evtfdnum); 315 }; 316 317 } 318 319 320 #endif 321