xref: /f-stack/app/micro_thread/kqueue_proxy.h (revision a9643ea8)
1*a9643ea8Slogwang 
2*a9643ea8Slogwang /**
3*a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6*a9643ea8Slogwang  *
7*a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8*a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9*a9643ea8Slogwang  * obtain a copy of the License at
10*a9643ea8Slogwang  *
11*a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12*a9643ea8Slogwang  *
13*a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14*a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15*a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16*a9643ea8Slogwang  * and limitations under the License.
17*a9643ea8Slogwang  */
18*a9643ea8Slogwang 
19*a9643ea8Slogwang 
20*a9643ea8Slogwang /**
21*a9643ea8Slogwang  *  @filename kqueue.h
22*a9643ea8Slogwang  *  @info     kqueue for micro thread manage
23*a9643ea8Slogwang  */
24*a9643ea8Slogwang 
25*a9643ea8Slogwang #ifndef _KQUEUE_PROXY___
26*a9643ea8Slogwang #define _KQUEUE_PROXY___
27*a9643ea8Slogwang 
28*a9643ea8Slogwang #include <stdlib.h>
29*a9643ea8Slogwang #include <unistd.h>
30*a9643ea8Slogwang #include <sys/queue.h>
31*a9643ea8Slogwang 
32*a9643ea8Slogwang #include "ff_api.h"
33*a9643ea8Slogwang 
34*a9643ea8Slogwang #include <set>
35*a9643ea8Slogwang #include <vector>
36*a9643ea8Slogwang using std::set;
37*a9643ea8Slogwang using std::vector;
38*a9643ea8Slogwang 
39*a9643ea8Slogwang #define kqueue_assert(statement)
40*a9643ea8Slogwang //#define kqueue_assert(statement) assert(statement)
41*a9643ea8Slogwang 
42*a9643ea8Slogwang namespace NS_MICRO_THREAD {
43*a9643ea8Slogwang 
44*a9643ea8Slogwang #define KQ_EVENT_NONE 0
45*a9643ea8Slogwang #define KQ_EVENT_READ 1
46*a9643ea8Slogwang #define KQ_EVENT_WRITE 2
47*a9643ea8Slogwang 
48*a9643ea8Slogwang /******************************************************************************/
49*a9643ea8Slogwang /*  操作系统头文件适配定义                                                    */
50*a9643ea8Slogwang /******************************************************************************/
51*a9643ea8Slogwang 
52*a9643ea8Slogwang /**
53*a9643ea8Slogwang  * @brief add more detail for linux <sys/queue.h>, freebsd and University of California
54*a9643ea8Slogwang  * @info  queue.h version 8.3 (suse)  diff version 8.5 (tlinux)
55*a9643ea8Slogwang  */
56*a9643ea8Slogwang #ifndef TAILQ_CONCAT
57*a9643ea8Slogwang 
58*a9643ea8Slogwang #define TAILQ_EMPTY(head)   ((head)->tqh_first == NULL)
59*a9643ea8Slogwang #define TAILQ_FIRST(head)   ((head)->tqh_first)
60*a9643ea8Slogwang #define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next)
61*a9643ea8Slogwang 
62*a9643ea8Slogwang #define TAILQ_LAST(head, headname) \
63*a9643ea8Slogwang         (*(((struct headname *)((head)->tqh_last))->tqh_last))
64*a9643ea8Slogwang 
65*a9643ea8Slogwang #define TAILQ_FOREACH(var, head, field)                                     \
66*a9643ea8Slogwang         for ((var) = TAILQ_FIRST((head));                                   \
67*a9643ea8Slogwang              (var);                                                         \
68*a9643ea8Slogwang              (var) = TAILQ_NEXT((var), field))
69*a9643ea8Slogwang 
70*a9643ea8Slogwang #define TAILQ_CONCAT(head1, head2, field)                                   \
71*a9643ea8Slogwang do {                                                                        \
72*a9643ea8Slogwang     if (!TAILQ_EMPTY(head2)) {                                              \
73*a9643ea8Slogwang         *(head1)->tqh_last = (head2)->tqh_first;                            \
74*a9643ea8Slogwang         (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last;             \
75*a9643ea8Slogwang         (head1)->tqh_last = (head2)->tqh_last;                              \
76*a9643ea8Slogwang         TAILQ_INIT((head2));                                                \
77*a9643ea8Slogwang     }                                                                       \
78*a9643ea8Slogwang } while (0)
79*a9643ea8Slogwang 
80*a9643ea8Slogwang #endif
81*a9643ea8Slogwang 
82*a9643ea8Slogwang #ifndef TAILQ_FOREACH_SAFE      // tlinux no this define
83*a9643ea8Slogwang #define TAILQ_FOREACH_SAFE(var, head, field, tvar)                          \
84*a9643ea8Slogwang         for ((var) = TAILQ_FIRST((head));                                   \
85*a9643ea8Slogwang              (var) && ((tvar) = TAILQ_NEXT((var), field), 1);               \
86*a9643ea8Slogwang              (var) = (tvar))
87*a9643ea8Slogwang #endif
88*a9643ea8Slogwang 
89*a9643ea8Slogwang 
90*a9643ea8Slogwang 
91*a9643ea8Slogwang 
92*a9643ea8Slogwang /******************************************************************************/
93*a9643ea8Slogwang /*  Kqueue proxy 定义与实现部分                                                */
94*a9643ea8Slogwang /******************************************************************************/
95*a9643ea8Slogwang 
96*a9643ea8Slogwang class KqueueProxy;
97*a9643ea8Slogwang class MicroThread;
98*a9643ea8Slogwang 
99*a9643ea8Slogwang /**
100*a9643ea8Slogwang  *  @brief kqueue通知对象基类定义
101*a9643ea8Slogwang  */
102*a9643ea8Slogwang class KqueuerObj
103*a9643ea8Slogwang {
104*a9643ea8Slogwang 	protected:
105*a9643ea8Slogwang 		int _fd;
106*a9643ea8Slogwang 		int _events;
107*a9643ea8Slogwang 		int _revents;
108*a9643ea8Slogwang 		int _type;
109*a9643ea8Slogwang 		MicroThread* _thread;
110*a9643ea8Slogwang 
111*a9643ea8Slogwang 	public:
112*a9643ea8Slogwang 
113*a9643ea8Slogwang 		TAILQ_ENTRY(KqueuerObj) _entry;
114*a9643ea8Slogwang 
115*a9643ea8Slogwang 		explicit KqueuerObj(int fd = -1) {
116*a9643ea8Slogwang 			_fd       = fd;
117*a9643ea8Slogwang 			_events   = 0;
118*a9643ea8Slogwang 			_revents  = 0;
119*a9643ea8Slogwang 			_type     = 0;
120*a9643ea8Slogwang 			_thread   = NULL;
121*a9643ea8Slogwang 		};
122*a9643ea8Slogwang 		virtual ~KqueuerObj(){};
123*a9643ea8Slogwang 
124*a9643ea8Slogwang 		virtual int InputNotify();
125*a9643ea8Slogwang 		virtual int OutputNotify();
126*a9643ea8Slogwang 		virtual int HangupNotify();
127*a9643ea8Slogwang 		virtual int KqueueCtlAdd(void* args);
128*a9643ea8Slogwang 		virtual int KqueueCtlDel(void* args);
129*a9643ea8Slogwang 
130*a9643ea8Slogwang 		/**
131*a9643ea8Slogwang 		 *  @brief fd打开可读事件侦听
132*a9643ea8Slogwang 		 */
133*a9643ea8Slogwang 		void EnableInput() {    _events |= KQ_EVENT_READ; };
134*a9643ea8Slogwang 
135*a9643ea8Slogwang 		/**
136*a9643ea8Slogwang 		 *  @brief fd打开可写事件侦听
137*a9643ea8Slogwang 		 */
138*a9643ea8Slogwang 		void EnableOutput() {     _events |= KQ_EVENT_WRITE; };
139*a9643ea8Slogwang 
140*a9643ea8Slogwang 		/**
141*a9643ea8Slogwang 		 *  @brief fd关闭可读事件侦听
142*a9643ea8Slogwang 		 */
143*a9643ea8Slogwang 		void DisableInput() {   _events &= ~KQ_EVENT_READ; };
144*a9643ea8Slogwang 
145*a9643ea8Slogwang 		/**
146*a9643ea8Slogwang 		 *  @brief fd关闭可写事件侦听
147*a9643ea8Slogwang 		 */
148*a9643ea8Slogwang 		void DisableOutput() {    _events &= ~KQ_EVENT_WRITE; };
149*a9643ea8Slogwang 
150*a9643ea8Slogwang 		/**
151*a9643ea8Slogwang 		 *  @brief 系统socket设置读取封装
152*a9643ea8Slogwang 		 */
153*a9643ea8Slogwang 		int GetOsfd() { return _fd; };
154*a9643ea8Slogwang 		void SetOsfd(int fd) {   _fd = fd; };
155*a9643ea8Slogwang 
156*a9643ea8Slogwang 		/**
157*a9643ea8Slogwang 		 *  @brief 监听事件与收到事件的访问方法
158*a9643ea8Slogwang 		 */
159*a9643ea8Slogwang 		int GetEvents() { return _events; };
160*a9643ea8Slogwang 		void SetRcvEvents(int revents) { _revents = revents; };
161*a9643ea8Slogwang 		int GetRcvEvents() { return _revents; };
162*a9643ea8Slogwang 
163*a9643ea8Slogwang 		/**
164*a9643ea8Slogwang 		 *  @brief 工厂管理方法, 获取真实类型
165*a9643ea8Slogwang 		 */
166*a9643ea8Slogwang 		int GetNtfyType() {    return _type; };
167*a9643ea8Slogwang 		virtual void Reset() {
168*a9643ea8Slogwang 			_fd      = -1;
169*a9643ea8Slogwang 			_events  = 0;
170*a9643ea8Slogwang 			_revents = 0;
171*a9643ea8Slogwang 			_type    = 0;
172*a9643ea8Slogwang 			_thread  = NULL;
173*a9643ea8Slogwang 		};
174*a9643ea8Slogwang 
175*a9643ea8Slogwang 		/**
176*a9643ea8Slogwang 		 *  @brief 设置与获取所属的微线程句柄接口
177*a9643ea8Slogwang 		 *  @param thread 关联的线程指针
178*a9643ea8Slogwang 		 */
179*a9643ea8Slogwang 		void SetOwnerThread(MicroThread* thread) {      _thread = thread; };
180*a9643ea8Slogwang 		MicroThread* GetOwnerThread() {        return _thread; };
181*a9643ea8Slogwang 
182*a9643ea8Slogwang };
183*a9643ea8Slogwang 
184*a9643ea8Slogwang typedef TAILQ_HEAD(__KqFdList, KqueuerObj) KqObjList;  ///< 高效的双链管理
185*a9643ea8Slogwang typedef struct kevent KqEvent;                 ///< 重定义一下kqueue event
186*a9643ea8Slogwang 
187*a9643ea8Slogwang 
188*a9643ea8Slogwang /**
189*a9643ea8Slogwang  *  @brief EPOLL支持同一FD多个线程侦听, 建立一个引用计数数组, 元素定义
190*a9643ea8Slogwang  *  @info  引用计数弊大于利, 没有实际意义, 字段保留, 功能移除掉 20150623
191*a9643ea8Slogwang  */
192*a9643ea8Slogwang class KqFdRef
193*a9643ea8Slogwang {
194*a9643ea8Slogwang private:
195*a9643ea8Slogwang     int _wr_ref;             ///< 监听写的引用计数
196*a9643ea8Slogwang     int _rd_ref;             ///< 监听读的引用计数
197*a9643ea8Slogwang     int _events;             ///< 当前正在侦听的事件列表
198*a9643ea8Slogwang     int _revents;            ///< 当前该fd收到的事件信息, 仅在epoll_wait后处理中有效
199*a9643ea8Slogwang     KqueuerObj* _kqobj;      ///< 单独注册调度器对象,一个fd关联一个对象
200*a9643ea8Slogwang 
201*a9643ea8Slogwang public:
202*a9643ea8Slogwang 
203*a9643ea8Slogwang     /**
204*a9643ea8Slogwang      *  @brief 构造与析构函数
205*a9643ea8Slogwang      */
206*a9643ea8Slogwang     KqFdRef() {
207*a9643ea8Slogwang         _wr_ref  = 0;
208*a9643ea8Slogwang         _rd_ref  = 0;
209*a9643ea8Slogwang         _events  = 0;
210*a9643ea8Slogwang         _revents = 0;
211*a9643ea8Slogwang         _kqobj   = NULL;
212*a9643ea8Slogwang     };
213*a9643ea8Slogwang     ~KqFdRef(){};
214*a9643ea8Slogwang 
215*a9643ea8Slogwang     /**
216*a9643ea8Slogwang      *  @brief 监听事件获取与设置接口
217*a9643ea8Slogwang      */
218*a9643ea8Slogwang     void SetListenEvents(int events) {
219*a9643ea8Slogwang         _events = events;
220*a9643ea8Slogwang     };
221*a9643ea8Slogwang     int GetListenEvents() {
222*a9643ea8Slogwang         return _events;
223*a9643ea8Slogwang     };
224*a9643ea8Slogwang 
225*a9643ea8Slogwang     /**
226*a9643ea8Slogwang      *  @brief 监听对象获取与设置接口
227*a9643ea8Slogwang      */
228*a9643ea8Slogwang     void SetNotifyObj(KqueuerObj* ntfy) {
229*a9643ea8Slogwang         _kqobj = ntfy;
230*a9643ea8Slogwang     };
231*a9643ea8Slogwang     KqueuerObj* GetNotifyObj() {
232*a9643ea8Slogwang         return _kqobj;
233*a9643ea8Slogwang     };
234*a9643ea8Slogwang 
235*a9643ea8Slogwang     /**
236*a9643ea8Slogwang      *  @brief 监听引用计数的更新
237*a9643ea8Slogwang      */
238*a9643ea8Slogwang     void AttachEvents(int event) {
239*a9643ea8Slogwang         if (event & KQ_EVENT_READ) {
240*a9643ea8Slogwang             _rd_ref++;
241*a9643ea8Slogwang         }
242*a9643ea8Slogwang         if (event & KQ_EVENT_WRITE){
243*a9643ea8Slogwang             _wr_ref++;
244*a9643ea8Slogwang         }
245*a9643ea8Slogwang     };
246*a9643ea8Slogwang     void DetachEvents(int event) {
247*a9643ea8Slogwang         if (event & KQ_EVENT_READ) {
248*a9643ea8Slogwang             if (_rd_ref > 0) {
249*a9643ea8Slogwang                 _rd_ref--;
250*a9643ea8Slogwang             } else {
251*a9643ea8Slogwang                 _rd_ref = 0;
252*a9643ea8Slogwang             }
253*a9643ea8Slogwang         }
254*a9643ea8Slogwang         if (event & KQ_EVENT_WRITE){
255*a9643ea8Slogwang             if (_wr_ref > 0) {
256*a9643ea8Slogwang                 _wr_ref--;
257*a9643ea8Slogwang             } else {
258*a9643ea8Slogwang                 _wr_ref = 0;
259*a9643ea8Slogwang             }
260*a9643ea8Slogwang         }
261*a9643ea8Slogwang     };
262*a9643ea8Slogwang 
263*a9643ea8Slogwang     /**
264*a9643ea8Slogwang      * @brief 获取引用计数
265*a9643ea8Slogwang      */
266*a9643ea8Slogwang     int ReadRefCnt() { return _rd_ref; };
267*a9643ea8Slogwang     int WriteRefCnt() { return _wr_ref; };
268*a9643ea8Slogwang 
269*a9643ea8Slogwang };
270*a9643ea8Slogwang 
271*a9643ea8Slogwang 
272*a9643ea8Slogwang class KqueueProxy
273*a9643ea8Slogwang {
274*a9643ea8Slogwang 	public:
275*a9643ea8Slogwang 		static const int DEFAULT_MAX_FD_NUM = 100000;
276*a9643ea8Slogwang 
277*a9643ea8Slogwang 	private:
278*a9643ea8Slogwang 		int                       _kqfd;
279*a9643ea8Slogwang 		int                       _maxfd;
280*a9643ea8Slogwang 		KqEvent*                  _evtlist;
281*a9643ea8Slogwang 		KqFdRef*                  _kqrefs;
282*a9643ea8Slogwang 
283*a9643ea8Slogwang 	public:
284*a9643ea8Slogwang 		KqueueProxy();
285*a9643ea8Slogwang 		virtual ~KqueueProxy(){};
286*a9643ea8Slogwang 
287*a9643ea8Slogwang 		int InitKqueue(int max_num);
288*a9643ea8Slogwang 		void TermKqueue(void);
289*a9643ea8Slogwang 
290*a9643ea8Slogwang 		virtual int KqueueGetTimeout(void) { return 0; };
291*a9643ea8Slogwang 		virtual bool KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) { return false; };
292*a9643ea8Slogwang 
293*a9643ea8Slogwang 		bool KqueueAdd(KqObjList& fdset);
294*a9643ea8Slogwang 		bool KqueueDel(KqObjList& fdset);
295*a9643ea8Slogwang 		void KqueueDispatch(void);
296*a9643ea8Slogwang 		bool KqueueAddObj(KqueuerObj* obj);
297*a9643ea8Slogwang 		bool KqueueDelObj(KqueuerObj* obj);
298*a9643ea8Slogwang 		bool KqueueCtrlAdd(int fd, int new_events);
299*a9643ea8Slogwang 		bool KqueueCtrlDel(int fd, int new_events);
300*a9643ea8Slogwang 		bool KqueueCtrlDelRef(int fd, int new_events, bool use_ref);
301*a9643ea8Slogwang 
302*a9643ea8Slogwang 		KqFdRef* KqFdRefGet(int fd) {
303*a9643ea8Slogwang 			return ((fd >= _maxfd) || (fd < 0)) ? (KqFdRef*)NULL : &_kqrefs[fd];
304*a9643ea8Slogwang 		}
305*a9643ea8Slogwang 
306*a9643ea8Slogwang 		void KqueueNtfyReg(int fd, KqueuerObj* obj) {
307*a9643ea8Slogwang 			KqFdRef* ref = KqFdRefGet(fd);
308*a9643ea8Slogwang 			if (ref) {
309*a9643ea8Slogwang 				ref->SetNotifyObj(obj);
310*a9643ea8Slogwang 			}
311*a9643ea8Slogwang 		};
312*a9643ea8Slogwang 
313*a9643ea8Slogwang 	protected:
314*a9643ea8Slogwang 		void KqueueRcvEventList(int evtfdnum);
315*a9643ea8Slogwang };
316*a9643ea8Slogwang 
317*a9643ea8Slogwang }
318*a9643ea8Slogwang 
319*a9643ea8Slogwang 
320*a9643ea8Slogwang #endif
321