xref: /f-stack/app/micro_thread/kqueue_proxy.h (revision 7abd0fb2)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename kqueue.h
22  *  @info     kqueue for micro thread manage
23  */
24 
25 #ifndef _KQUEUE_PROXY___
26 #define _KQUEUE_PROXY___
27 
28 #include <stdlib.h>
29 #include <unistd.h>
30 #include <sys/queue.h>
31 
32 #include "ff_api.h"
33 
34 #include <set>
35 #include <vector>
36 using std::set;
37 using std::vector;
38 
39 #define kqueue_assert(statement)
40 //#define kqueue_assert(statement) assert(statement)
41 
42 namespace NS_MICRO_THREAD {
43 
44 #define KQ_EVENT_NONE 0
45 #define KQ_EVENT_READ 1
46 #define KQ_EVENT_WRITE 2
47 
48 /******************************************************************************/
49 /*  操作系统头文件适配定义                                                    */
50 /******************************************************************************/
51 
52 /**
53  * @brief add more detail for linux <sys/queue.h>, freebsd and University of California
54  * @info  queue.h version 8.3 (suse)  diff version 8.5 (tlinux)
55  */
56 #ifndef TAILQ_CONCAT
57 
58 #define TAILQ_EMPTY(head)   ((head)->tqh_first == NULL)
59 #define TAILQ_FIRST(head)   ((head)->tqh_first)
60 #define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next)
61 
62 #define TAILQ_LAST(head, headname) \
63         (*(((struct headname *)((head)->tqh_last))->tqh_last))
64 
65 #define TAILQ_FOREACH(var, head, field)                                     \
66         for ((var) = TAILQ_FIRST((head));                                   \
67              (var);                                                         \
68              (var) = TAILQ_NEXT((var), field))
69 
70 #define TAILQ_CONCAT(head1, head2, field)                                   \
71 do {                                                                        \
72     if (!TAILQ_EMPTY(head2)) {                                              \
73         *(head1)->tqh_last = (head2)->tqh_first;                            \
74         (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last;             \
75         (head1)->tqh_last = (head2)->tqh_last;                              \
76         TAILQ_INIT((head2));                                                \
77     }                                                                       \
78 } while (0)
79 
80 #endif
81 
82 #ifndef TAILQ_FOREACH_SAFE      // tlinux no this define
83 #define TAILQ_FOREACH_SAFE(var, head, field, tvar)                          \
84         for ((var) = TAILQ_FIRST((head));                                   \
85              (var) && ((tvar) = TAILQ_NEXT((var), field), 1);               \
86              (var) = (tvar))
87 #endif
88 
89 
90 
91 
92 /******************************************************************************/
93 /*  Kqueue proxy 定义与实现部分                                                */
94 /******************************************************************************/
95 
96 class KqueueProxy;
97 class MicroThread;
98 
99 /**
100  *  @brief kqueue通知对象基类定义
101  */
102 class KqueuerObj
103 {
104 	protected:
105 		int _fd;
106 		int _events;
107 		int _revents;
108 		int _type;
109 		MicroThread* _thread;
110 
111 	public:
112 
113 		TAILQ_ENTRY(KqueuerObj) _entry;
114 
115 		explicit KqueuerObj(int fd = -1) {
116 			_fd       = fd;
117 			_events   = 0;
118 			_revents  = 0;
119 			_type     = 0;
120 			_thread   = NULL;
121 		};
122 		virtual ~KqueuerObj(){};
123 
124 		virtual int InputNotify();
125 		virtual int OutputNotify();
126 		virtual int HangupNotify();
127 		virtual int KqueueCtlAdd(void* args);
128 		virtual int KqueueCtlDel(void* args);
129 
130 		/**
131 		 *  @brief fd打开可读事件侦听
132 		 */
133 		void EnableInput() {    _events |= KQ_EVENT_READ; };
134 
135 		/**
136 		 *  @brief fd打开可写事件侦听
137 		 */
138 		void EnableOutput() {     _events |= KQ_EVENT_WRITE; };
139 
140 		/**
141 		 *  @brief fd关闭可读事件侦听
142 		 */
143 		void DisableInput() {   _events &= ~KQ_EVENT_READ; };
144 
145 		/**
146 		 *  @brief fd关闭可写事件侦听
147 		 */
148 		void DisableOutput() {    _events &= ~KQ_EVENT_WRITE; };
149 
150 		/**
151 		 *  @brief 系统socket设置读取封装
152 		 */
153 		int GetOsfd() { return _fd; };
154 		void SetOsfd(int fd) {   _fd = fd; };
155 
156 		/**
157 		 *  @brief 监听事件与收到事件的访问方法
158 		 */
159 		int GetEvents() { return _events; };
160 		void SetRcvEvents(int revents) { _revents = revents; };
161 		int GetRcvEvents() { return _revents; };
162 
163 		/**
164 		 *  @brief 工厂管理方法, 获取真实类型
165 		 */
166 		int GetNtfyType() {    return _type; };
167 		virtual void Reset() {
168 			_fd      = -1;
169 			_events  = 0;
170 			_revents = 0;
171 			_type    = 0;
172 			_thread  = NULL;
173 		};
174 
175 		/**
176 		 *  @brief 设置与获取所属的微线程句柄接口
177 		 *  @param thread 关联的线程指针
178 		 */
179 		void SetOwnerThread(MicroThread* thread) {      _thread = thread; };
180 		MicroThread* GetOwnerThread() {        return _thread; };
181 
182 };
183 
184 typedef TAILQ_HEAD(__KqFdList, KqueuerObj) KqObjList;  ///< 高效的双链管理
185 typedef struct kevent KqEvent;                 ///< 重定义一下kqueue event
186 
187 
188 /**
189  *  @brief EPOLL支持同一FD多个线程侦听, 建立一个引用计数数组, 元素定义
190  *  @info  引用计数弊大于利, 没有实际意义, 字段保留, 功能移除掉 20150623
191  */
192 class KqFdRef
193 {
194 private:
195     int _wr_ref;             ///< 监听写的引用计数
196     int _rd_ref;             ///< 监听读的引用计数
197     int _events;             ///< 当前正在侦听的事件列表
198     int _revents;            ///< 当前该fd收到的事件信息, 仅在epoll_wait后处理中有效
199     KqueuerObj* _kqobj;      ///< 单独注册调度器对象,一个fd关联一个对象
200 
201 public:
202 
203     /**
204      *  @brief 构造与析构函数
205      */
206     KqFdRef() {
207         _wr_ref  = 0;
208         _rd_ref  = 0;
209         _events  = 0;
210         _revents = 0;
211         _kqobj   = NULL;
212     };
213     ~KqFdRef(){};
214 
215     /**
216      *  @brief 监听事件获取与设置接口
217      */
218     void SetListenEvents(int events) {
219         _events = events;
220     };
221     int GetListenEvents() {
222         return _events;
223     };
224 
225     /**
226      *  @brief 监听对象获取与设置接口
227      */
228     void SetNotifyObj(KqueuerObj* ntfy) {
229         _kqobj = ntfy;
230     };
231     KqueuerObj* GetNotifyObj() {
232         return _kqobj;
233     };
234 
235     /**
236      *  @brief 监听引用计数的更新
237      */
238     void AttachEvents(int event) {
239         if (event & KQ_EVENT_READ) {
240             _rd_ref++;
241         }
242         if (event & KQ_EVENT_WRITE){
243             _wr_ref++;
244         }
245     };
246     void DetachEvents(int event) {
247         if (event & KQ_EVENT_READ) {
248             if (_rd_ref > 0) {
249                 _rd_ref--;
250             } else {
251                 _rd_ref = 0;
252             }
253         }
254         if (event & KQ_EVENT_WRITE){
255             if (_wr_ref > 0) {
256                 _wr_ref--;
257             } else {
258                 _wr_ref = 0;
259             }
260         }
261     };
262 
263     /**
264      * @brief 获取引用计数
265      */
266     int ReadRefCnt() { return _rd_ref; };
267     int WriteRefCnt() { return _wr_ref; };
268 
269 };
270 
271 
272 class KqueueProxy
273 {
274 	public:
275 		static const int DEFAULT_MAX_FD_NUM = 100000;
276 
277 	private:
278 		int                       _kqfd;
279 		int                       _maxfd;
280 		KqEvent*                  _evtlist;
281 		KqFdRef*                  _kqrefs;
282 
283 	public:
284 		KqueueProxy();
285 		virtual ~KqueueProxy(){};
286 
287 		int InitKqueue(int max_num);
288 		void TermKqueue(void);
289 
290 		virtual int KqueueGetTimeout(void) { return 0; };
291 		virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) { return false; };
292 
293 		bool KqueueAdd(KqObjList& fdset);
294 		bool KqueueDel(KqObjList& fdset);
295 		void KqueueDispatch(void);
296 		bool KqueueAddObj(KqueuerObj* obj);
297 		bool KqueueDelObj(KqueuerObj* obj);
298 		bool KqueueCtrlAdd(int fd, int new_events);
299 		bool KqueueCtrlDel(int fd, int new_events);
300 		bool KqueueCtrlDelRef(int fd, int new_events, bool use_ref);
301 
302 		KqFdRef* KqFdRefGet(int fd) {
303 			return ((fd >= _maxfd) || (fd < 0)) ? (KqFdRef*)NULL : &_kqrefs[fd];
304 		}
305 
306 		void KqueueNtfyReg(int fd, KqueuerObj* obj) {
307 			KqFdRef* ref = KqFdRefGet(fd);
308 			if (ref) {
309 				ref->SetNotifyObj(obj);
310 			}
311 		};
312 
313 	protected:
314 		void KqueueRcvEventList(int evtfdnum);
315 };
316 
317 }
318 
319 
320 #endif
321