xref: /f-stack/app/micro_thread/kqueue_proxy.cpp (revision a9643ea8)
1*a9643ea8Slogwang 
2*a9643ea8Slogwang /**
3*a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6*a9643ea8Slogwang  *
7*a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8*a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9*a9643ea8Slogwang  * obtain a copy of the License at
10*a9643ea8Slogwang  *
11*a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12*a9643ea8Slogwang  *
13*a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14*a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15*a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16*a9643ea8Slogwang  * and limitations under the License.
17*a9643ea8Slogwang  */
18*a9643ea8Slogwang 
19*a9643ea8Slogwang 
20*a9643ea8Slogwang 
21*a9643ea8Slogwang /**
22*a9643ea8Slogwang  *  @filename kqueue_proxy.cpp
23*a9643ea8Slogwang  *  @info     kqueue for micro thread manage
24*a9643ea8Slogwang  */
25*a9643ea8Slogwang 
26*a9643ea8Slogwang #include "kqueue_proxy.h"
27*a9643ea8Slogwang #include "micro_thread.h"
28*a9643ea8Slogwang #include "ff_hook.h"
29*a9643ea8Slogwang 
30*a9643ea8Slogwang using namespace NS_MICRO_THREAD;
31*a9643ea8Slogwang 
32*a9643ea8Slogwang KqueueProxy::KqueueProxy()
33*a9643ea8Slogwang {
34*a9643ea8Slogwang     _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM;
35*a9643ea8Slogwang     _kqfd = -1;
36*a9643ea8Slogwang     _evtlist = NULL;
37*a9643ea8Slogwang     _kqrefs = NULL;
38*a9643ea8Slogwang }
39*a9643ea8Slogwang 
40*a9643ea8Slogwang int KqueueProxy::InitKqueue(int max_num)
41*a9643ea8Slogwang {
42*a9643ea8Slogwang 	int rc = 0;
43*a9643ea8Slogwang 	if (max_num > _maxfd)
44*a9643ea8Slogwang 	{
45*a9643ea8Slogwang 		_maxfd = max_num;
46*a9643ea8Slogwang 	}
47*a9643ea8Slogwang 
48*a9643ea8Slogwang 	_kqfd = ff_kqueue();
49*a9643ea8Slogwang 	if (_kqfd < 0)
50*a9643ea8Slogwang 	{
51*a9643ea8Slogwang 		rc = -1;
52*a9643ea8Slogwang 		goto EXIT_LABEL;
53*a9643ea8Slogwang 	}
54*a9643ea8Slogwang 
55*a9643ea8Slogwang 	ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC);
56*a9643ea8Slogwang 
57*a9643ea8Slogwang 	_kqrefs = new KqFdRef[_maxfd];
58*a9643ea8Slogwang 	if (_kqrefs == NULL)
59*a9643ea8Slogwang 	{
60*a9643ea8Slogwang 		rc = -2;
61*a9643ea8Slogwang 		goto EXIT_LABEL;
62*a9643ea8Slogwang 	}
63*a9643ea8Slogwang 
64*a9643ea8Slogwang 	_evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent));
65*a9643ea8Slogwang 	if (_evtlist == NULL)
66*a9643ea8Slogwang 	{
67*a9643ea8Slogwang 		rc = -3;
68*a9643ea8Slogwang 		goto EXIT_LABEL;
69*a9643ea8Slogwang 	}
70*a9643ea8Slogwang 
71*a9643ea8Slogwang     struct rlimit rlim;
72*a9643ea8Slogwang     memset(&rlim, 0, sizeof(rlim));
73*a9643ea8Slogwang     if (getrlimit(RLIMIT_NOFILE, &rlim) == 0)
74*a9643ea8Slogwang     {
75*a9643ea8Slogwang         if ((int)rlim.rlim_max < _maxfd)
76*a9643ea8Slogwang         {
77*a9643ea8Slogwang             rlim.rlim_cur = rlim.rlim_max;
78*a9643ea8Slogwang             setrlimit(RLIMIT_NOFILE, &rlim);
79*a9643ea8Slogwang             rlim.rlim_cur = _maxfd;
80*a9643ea8Slogwang             rlim.rlim_max = _maxfd;
81*a9643ea8Slogwang             setrlimit(RLIMIT_NOFILE, &rlim);
82*a9643ea8Slogwang         }
83*a9643ea8Slogwang     }
84*a9643ea8Slogwang 
85*a9643ea8Slogwang EXIT_LABEL:
86*a9643ea8Slogwang 
87*a9643ea8Slogwang     if (rc < 0)
88*a9643ea8Slogwang     {
89*a9643ea8Slogwang         TermKqueue();
90*a9643ea8Slogwang     }
91*a9643ea8Slogwang 
92*a9643ea8Slogwang     return rc;
93*a9643ea8Slogwang }
94*a9643ea8Slogwang 
95*a9643ea8Slogwang void KqueueProxy::TermKqueue()
96*a9643ea8Slogwang {
97*a9643ea8Slogwang     if (_kqfd > 0)
98*a9643ea8Slogwang     {
99*a9643ea8Slogwang         close(_kqfd);
100*a9643ea8Slogwang         _kqfd = -1;
101*a9643ea8Slogwang     }
102*a9643ea8Slogwang 
103*a9643ea8Slogwang     if (_evtlist != NULL)
104*a9643ea8Slogwang     {
105*a9643ea8Slogwang         free(_evtlist);
106*a9643ea8Slogwang         _evtlist = NULL;
107*a9643ea8Slogwang     }
108*a9643ea8Slogwang 
109*a9643ea8Slogwang     if (_kqrefs != NULL)
110*a9643ea8Slogwang     {
111*a9643ea8Slogwang         delete []_kqrefs;
112*a9643ea8Slogwang         _kqrefs = NULL;
113*a9643ea8Slogwang     }
114*a9643ea8Slogwang }
115*a9643ea8Slogwang 
116*a9643ea8Slogwang bool KqueueProxy::KqueueAdd(KqObjList& obj_list)
117*a9643ea8Slogwang {
118*a9643ea8Slogwang 	bool ret = true;
119*a9643ea8Slogwang 	KqueuerObj *kqobj = NULL;
120*a9643ea8Slogwang 	KqueuerObj *kqobj_error = NULL;
121*a9643ea8Slogwang 	TAILQ_FOREACH(kqobj, &obj_list, _entry)
122*a9643ea8Slogwang 	{
123*a9643ea8Slogwang 		if (!KqueueAddObj(kqobj))
124*a9643ea8Slogwang 		{
125*a9643ea8Slogwang             MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd());
126*a9643ea8Slogwang             kqueue_assert(0);
127*a9643ea8Slogwang             kqobj_error = kqobj;
128*a9643ea8Slogwang             ret = false;
129*a9643ea8Slogwang             goto EXIT_LABEL;
130*a9643ea8Slogwang 		}
131*a9643ea8Slogwang 	}
132*a9643ea8Slogwang 
133*a9643ea8Slogwang EXIT_LABEL:
134*a9643ea8Slogwang 
135*a9643ea8Slogwang     if (!ret)
136*a9643ea8Slogwang     {
137*a9643ea8Slogwang         TAILQ_FOREACH(kqobj, &obj_list, _entry)
138*a9643ea8Slogwang         {
139*a9643ea8Slogwang             if (kqobj == kqobj_error)
140*a9643ea8Slogwang             {
141*a9643ea8Slogwang                 break;
142*a9643ea8Slogwang             }
143*a9643ea8Slogwang             KqueueDelObj(kqobj);
144*a9643ea8Slogwang         }
145*a9643ea8Slogwang     }
146*a9643ea8Slogwang 
147*a9643ea8Slogwang     return ret;
148*a9643ea8Slogwang }
149*a9643ea8Slogwang 
150*a9643ea8Slogwang bool KqueueProxy::KqueueDel(KqObjList& obj_list)
151*a9643ea8Slogwang {
152*a9643ea8Slogwang     bool ret = true;
153*a9643ea8Slogwang 
154*a9643ea8Slogwang     KqueuerObj *kqobj = NULL;
155*a9643ea8Slogwang     TAILQ_FOREACH(kqobj, &obj_list, _entry)
156*a9643ea8Slogwang     {
157*a9643ea8Slogwang         if (!KqueueDelObj(kqobj))  // failed also need continue, be sure ref count ok
158*a9643ea8Slogwang         {
159*a9643ea8Slogwang             MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd());
160*a9643ea8Slogwang             kqueue_assert(0);
161*a9643ea8Slogwang             ret = false;
162*a9643ea8Slogwang         }
163*a9643ea8Slogwang     }
164*a9643ea8Slogwang 
165*a9643ea8Slogwang     return ret;
166*a9643ea8Slogwang }
167*a9643ea8Slogwang 
168*a9643ea8Slogwang bool KqueueProxy::KqueueCtrlAdd(int fd, int events)
169*a9643ea8Slogwang {
170*a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(fd);
171*a9643ea8Slogwang 	if (item == NULL)
172*a9643ea8Slogwang 	{
173*a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error, wtf?
174*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
175*a9643ea8Slogwang         kqueue_assert(0);
176*a9643ea8Slogwang         return false;
177*a9643ea8Slogwang 	}
178*a9643ea8Slogwang 
179*a9643ea8Slogwang 	item->AttachEvents(events);
180*a9643ea8Slogwang 
181*a9643ea8Slogwang 	int old_events = item->GetListenEvents();
182*a9643ea8Slogwang 	int new_events = old_events | events;
183*a9643ea8Slogwang 	if (old_events == new_events)
184*a9643ea8Slogwang 	{
185*a9643ea8Slogwang 		return true;
186*a9643ea8Slogwang 	}
187*a9643ea8Slogwang 
188*a9643ea8Slogwang 	KqEvent ke;
189*a9643ea8Slogwang 	int ret;
190*a9643ea8Slogwang 	if (CHK_FD_BIT(fd)) {
191*a9643ea8Slogwang 		fd = CLR_FD_BIT(fd);
192*a9643ea8Slogwang     }
193*a9643ea8Slogwang 	if (old_events & KQ_EVENT_WRITE) {
194*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
195*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
196*a9643ea8Slogwang 		if (ret == -1) {
197*a9643ea8Slogwang 			// TODO, error check
198*a9643ea8Slogwang 			item->DetachEvents(events);
199*a9643ea8Slogwang 			kqueue_assert(0);
200*a9643ea8Slogwang 			return false;
201*a9643ea8Slogwang 		}
202*a9643ea8Slogwang 	}
203*a9643ea8Slogwang 	if (old_events & KQ_EVENT_READ) {
204*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
205*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
206*a9643ea8Slogwang 		if (ret == -1) {
207*a9643ea8Slogwang 			// TODO, error check
208*a9643ea8Slogwang 			item->DetachEvents(events);
209*a9643ea8Slogwang 			kqueue_assert(0);
210*a9643ea8Slogwang 			return false;
211*a9643ea8Slogwang 		}
212*a9643ea8Slogwang 	}
213*a9643ea8Slogwang 	if (events & KQ_EVENT_WRITE) {
214*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
215*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
216*a9643ea8Slogwang 		if (ret == -1) {
217*a9643ea8Slogwang 			// TODO, error check
218*a9643ea8Slogwang 			item->DetachEvents(events);
219*a9643ea8Slogwang 			kqueue_assert(0);
220*a9643ea8Slogwang 			return false;
221*a9643ea8Slogwang 		}
222*a9643ea8Slogwang 	}
223*a9643ea8Slogwang 	if (events & KQ_EVENT_READ) {
224*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
225*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
226*a9643ea8Slogwang 		if (ret == -1) {
227*a9643ea8Slogwang 			// TODO, error check
228*a9643ea8Slogwang 			item->DetachEvents(events);
229*a9643ea8Slogwang 			kqueue_assert(0);
230*a9643ea8Slogwang 			return false;
231*a9643ea8Slogwang 		}
232*a9643ea8Slogwang 	}
233*a9643ea8Slogwang 
234*a9643ea8Slogwang 	item->SetListenEvents(new_events);
235*a9643ea8Slogwang 
236*a9643ea8Slogwang 	return true;
237*a9643ea8Slogwang }
238*a9643ea8Slogwang 
239*a9643ea8Slogwang 
240*a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDel(int fd, int events)
241*a9643ea8Slogwang {
242*a9643ea8Slogwang 	return KqueueCtrlDelRef(fd, events, false);
243*a9643ea8Slogwang }
244*a9643ea8Slogwang 
245*a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref)
246*a9643ea8Slogwang {
247*a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(fd);
248*a9643ea8Slogwang 	if (item == NULL)
249*a9643ea8Slogwang 	{
250*a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error
251*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
252*a9643ea8Slogwang         kqueue_assert(0);
253*a9643ea8Slogwang         return false;
254*a9643ea8Slogwang 
255*a9643ea8Slogwang 	}
256*a9643ea8Slogwang 
257*a9643ea8Slogwang 	item->DetachEvents(events);
258*a9643ea8Slogwang 	int old_events = item->GetListenEvents();
259*a9643ea8Slogwang 	int new_events = old_events &~ events;
260*a9643ea8Slogwang 
261*a9643ea8Slogwang 	if (use_ref) {
262*a9643ea8Slogwang 		new_events = old_events;
263*a9643ea8Slogwang 		if (item->ReadRefCnt() == 0) {
264*a9643ea8Slogwang 			new_events = new_events & ~KQ_EVENT_READ;
265*a9643ea8Slogwang 		}
266*a9643ea8Slogwang 		if (item->WriteRefCnt() == 0) {
267*a9643ea8Slogwang 			new_events = new_events & ~KQ_EVENT_WRITE;
268*a9643ea8Slogwang 		}
269*a9643ea8Slogwang 	}
270*a9643ea8Slogwang 
271*a9643ea8Slogwang 	if (old_events == new_events)
272*a9643ea8Slogwang 	{
273*a9643ea8Slogwang 		return true;
274*a9643ea8Slogwang 	}
275*a9643ea8Slogwang 	KqEvent ke;
276*a9643ea8Slogwang 	int ret;
277*a9643ea8Slogwang 	if (CHK_FD_BIT(fd)) {
278*a9643ea8Slogwang 		fd = CLR_FD_BIT(fd);
279*a9643ea8Slogwang     }
280*a9643ea8Slogwang 	if (old_events & KQ_EVENT_WRITE) {
281*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
282*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
283*a9643ea8Slogwang 		if (ret == -1) {
284*a9643ea8Slogwang 			kqueue_assert(0);
285*a9643ea8Slogwang 			return false;
286*a9643ea8Slogwang 		}
287*a9643ea8Slogwang 	}
288*a9643ea8Slogwang 	if (old_events & KQ_EVENT_READ) {
289*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
290*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
291*a9643ea8Slogwang 		if (ret == -1) {
292*a9643ea8Slogwang 			kqueue_assert(0);
293*a9643ea8Slogwang 			return false;
294*a9643ea8Slogwang 		}
295*a9643ea8Slogwang 	}
296*a9643ea8Slogwang 
297*a9643ea8Slogwang 	if (new_events & KQ_EVENT_WRITE) {
298*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
299*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
300*a9643ea8Slogwang 		if (ret == -1) {
301*a9643ea8Slogwang 			kqueue_assert(0);
302*a9643ea8Slogwang 			return false;
303*a9643ea8Slogwang 		}
304*a9643ea8Slogwang 	}
305*a9643ea8Slogwang 	if (new_events & KQ_EVENT_READ) {
306*a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
307*a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
308*a9643ea8Slogwang 		if (ret == -1) {
309*a9643ea8Slogwang 			kqueue_assert(0);
310*a9643ea8Slogwang 			return false;
311*a9643ea8Slogwang 		}
312*a9643ea8Slogwang 	}
313*a9643ea8Slogwang 
314*a9643ea8Slogwang 	item->SetListenEvents(new_events);
315*a9643ea8Slogwang 
316*a9643ea8Slogwang 	return true;
317*a9643ea8Slogwang }
318*a9643ea8Slogwang 
319*a9643ea8Slogwang bool KqueueProxy::KqueueAddObj(KqueuerObj* obj)
320*a9643ea8Slogwang {
321*a9643ea8Slogwang 	if (obj == NULL)
322*a9643ea8Slogwang 	{
323*a9643ea8Slogwang         MTLOG_ERROR("kqobj input invalid, %p", obj);
324*a9643ea8Slogwang         return false;
325*a9643ea8Slogwang 	}
326*a9643ea8Slogwang 
327*a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
328*a9643ea8Slogwang 	if (item == NULL)
329*a9643ea8Slogwang 	{
330*a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error
331*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
332*a9643ea8Slogwang         kqueue_assert(0);
333*a9643ea8Slogwang         return false;
334*a9643ea8Slogwang 	}
335*a9643ea8Slogwang 
336*a9643ea8Slogwang 	int ret = obj->KqueueCtlAdd(item);
337*a9643ea8Slogwang 	if (ret < 0) {
338*a9643ea8Slogwang         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
339*a9643ea8Slogwang         kqueue_assert(0);
340*a9643ea8Slogwang         return false;
341*a9643ea8Slogwang 	}
342*a9643ea8Slogwang 
343*a9643ea8Slogwang 	return true;
344*a9643ea8Slogwang }
345*a9643ea8Slogwang 
346*a9643ea8Slogwang bool KqueueProxy::KqueueDelObj(KqueuerObj* obj)
347*a9643ea8Slogwang {
348*a9643ea8Slogwang 	if (obj == NULL)
349*a9643ea8Slogwang 	{
350*a9643ea8Slogwang         MTLOG_ERROR("kqobj input invalid, %p", obj);
351*a9643ea8Slogwang         return false;
352*a9643ea8Slogwang 	}
353*a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
354*a9643ea8Slogwang 	if (item == NULL)
355*a9643ea8Slogwang 	{
356*a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error
357*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
358*a9643ea8Slogwang         kqueue_assert(0);
359*a9643ea8Slogwang         return false;
360*a9643ea8Slogwang 	}
361*a9643ea8Slogwang 
362*a9643ea8Slogwang 	int ret = obj->KqueueCtlDel(item);
363*a9643ea8Slogwang 	if (ret < 0) {
364*a9643ea8Slogwang         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
365*a9643ea8Slogwang         kqueue_assert(0);
366*a9643ea8Slogwang         return false;
367*a9643ea8Slogwang 	}
368*a9643ea8Slogwang 
369*a9643ea8Slogwang 	return true;
370*a9643ea8Slogwang }
371*a9643ea8Slogwang 
372*a9643ea8Slogwang void KqueueProxy::KqueueRcvEventList(int evtfdnum)
373*a9643ea8Slogwang {
374*a9643ea8Slogwang 	int ret = 0;
375*a9643ea8Slogwang 	int osfd = 0;
376*a9643ea8Slogwang 	int revents = 0;
377*a9643ea8Slogwang 	int tmp_evts = 0;
378*a9643ea8Slogwang 	KqFdRef* item = NULL;
379*a9643ea8Slogwang 	KqueuerObj* obj = NULL;
380*a9643ea8Slogwang 
381*a9643ea8Slogwang 	for (int i = 0; i < evtfdnum; i++)
382*a9643ea8Slogwang 	{
383*a9643ea8Slogwang 		osfd = _evtlist[i].ident |= 1 << FF_FD_BITS;
384*a9643ea8Slogwang 
385*a9643ea8Slogwang 		item = KqFdRefGet(osfd);
386*a9643ea8Slogwang 		if (item == NULL)
387*a9643ea8Slogwang 		{
388*a9643ea8Slogwang             MT_ATTR_API(320851, 1); // fd error
389*a9643ea8Slogwang             MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd);
390*a9643ea8Slogwang             kqueue_assert(0);
391*a9643ea8Slogwang             continue;
392*a9643ea8Slogwang 		}
393*a9643ea8Slogwang 		tmp_evts = _evtlist[i].filter;
394*a9643ea8Slogwang 		if (tmp_evts == EVFILT_READ) {
395*a9643ea8Slogwang 			revents |= KQ_EVENT_READ;
396*a9643ea8Slogwang 		}
397*a9643ea8Slogwang 		if (tmp_evts == EVFILT_WRITE) {
398*a9643ea8Slogwang 			revents |= KQ_EVENT_WRITE;
399*a9643ea8Slogwang 		}
400*a9643ea8Slogwang 		obj = item->GetNotifyObj();
401*a9643ea8Slogwang 		if (obj == NULL)
402*a9643ea8Slogwang 		{
403*a9643ea8Slogwang             MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd);
404*a9643ea8Slogwang             KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE)));
405*a9643ea8Slogwang             continue;
406*a9643ea8Slogwang 		}
407*a9643ea8Slogwang 		obj->SetRcvEvents(revents);
408*a9643ea8Slogwang 
409*a9643ea8Slogwang 		if (tmp_evts == EV_ERROR)
410*a9643ea8Slogwang 		{
411*a9643ea8Slogwang 			obj->HangupNotify();
412*a9643ea8Slogwang 			continue;
413*a9643ea8Slogwang 		}
414*a9643ea8Slogwang 
415*a9643ea8Slogwang 		if (revents & KQ_EVENT_READ)
416*a9643ea8Slogwang 		{
417*a9643ea8Slogwang 			ret = obj->InputNotify();
418*a9643ea8Slogwang 			if (ret != 0)
419*a9643ea8Slogwang 			{
420*a9643ea8Slogwang 				continue;
421*a9643ea8Slogwang 			}
422*a9643ea8Slogwang 		}
423*a9643ea8Slogwang 
424*a9643ea8Slogwang 		if (revents & KQ_EVENT_WRITE)
425*a9643ea8Slogwang 		{
426*a9643ea8Slogwang 			ret = obj->OutputNotify();
427*a9643ea8Slogwang 			if (ret != 0)
428*a9643ea8Slogwang 			{
429*a9643ea8Slogwang 				continue;
430*a9643ea8Slogwang 			}
431*a9643ea8Slogwang 		}
432*a9643ea8Slogwang 	}
433*a9643ea8Slogwang }
434*a9643ea8Slogwang 
435*a9643ea8Slogwang void KqueueProxy::KqueueDispatch()
436*a9643ea8Slogwang {
437*a9643ea8Slogwang 	int nfd;
438*a9643ea8Slogwang 	int wait_time = KqueueGetTimeout();
439*a9643ea8Slogwang 	if (wait_time) {
440*a9643ea8Slogwang 		struct timespec ts;
441*a9643ea8Slogwang 		ts.tv_sec = wait_time / 1000;
442*a9643ea8Slogwang 		ts.tv_nsec = 0;
443*a9643ea8Slogwang 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts);
444*a9643ea8Slogwang 	} else {
445*a9643ea8Slogwang 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL);
446*a9643ea8Slogwang 	}
447*a9643ea8Slogwang 	if (nfd <= 0)
448*a9643ea8Slogwang 	{
449*a9643ea8Slogwang 		return;
450*a9643ea8Slogwang 	}
451*a9643ea8Slogwang 
452*a9643ea8Slogwang 	KqueueRcvEventList(nfd);
453*a9643ea8Slogwang }
454*a9643ea8Slogwang 
455*a9643ea8Slogwang int KqueuerObj::InputNotify()
456*a9643ea8Slogwang {
457*a9643ea8Slogwang 	MicroThread* thread = this->GetOwnerThread();
458*a9643ea8Slogwang 	if (thread == NULL)
459*a9643ea8Slogwang 	{
460*a9643ea8Slogwang 		kqueue_assert(0);
461*a9643ea8Slogwang         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
462*a9643ea8Slogwang         return -1;
463*a9643ea8Slogwang 	}
464*a9643ea8Slogwang 
465*a9643ea8Slogwang 	if (thread->HasFlag(MicroThread::IO_LIST))
466*a9643ea8Slogwang 	{
467*a9643ea8Slogwang         MtFrame* frame = MtFrame::Instance();
468*a9643ea8Slogwang         frame->RemoveIoWait(thread);
469*a9643ea8Slogwang         frame->InsertRunable(thread);
470*a9643ea8Slogwang 	}
471*a9643ea8Slogwang 
472*a9643ea8Slogwang 	return 0;
473*a9643ea8Slogwang }
474*a9643ea8Slogwang 
475*a9643ea8Slogwang int KqueuerObj::OutputNotify()
476*a9643ea8Slogwang {
477*a9643ea8Slogwang     MicroThread* thread = this->GetOwnerThread();
478*a9643ea8Slogwang     if (NULL == thread)
479*a9643ea8Slogwang     {
480*a9643ea8Slogwang         kqueue_assert(0);
481*a9643ea8Slogwang         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
482*a9643ea8Slogwang         return -1;
483*a9643ea8Slogwang     }
484*a9643ea8Slogwang 
485*a9643ea8Slogwang     // 多个事件同时到达, 防重复操作
486*a9643ea8Slogwang     if (thread->HasFlag(MicroThread::IO_LIST))
487*a9643ea8Slogwang     {
488*a9643ea8Slogwang         MtFrame* frame = MtFrame::Instance();
489*a9643ea8Slogwang         frame->RemoveIoWait(thread);
490*a9643ea8Slogwang         frame->InsertRunable(thread);
491*a9643ea8Slogwang     }
492*a9643ea8Slogwang 
493*a9643ea8Slogwang     return 0;
494*a9643ea8Slogwang }
495*a9643ea8Slogwang 
496*a9643ea8Slogwang int KqueuerObj::HangupNotify()
497*a9643ea8Slogwang {
498*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
499*a9643ea8Slogwang     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
500*a9643ea8Slogwang     return 0;
501*a9643ea8Slogwang }
502*a9643ea8Slogwang 
503*a9643ea8Slogwang int KqueuerObj::KqueueCtlAdd(void* args)
504*a9643ea8Slogwang {
505*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
506*a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
507*a9643ea8Slogwang     kqueue_assert(fd_ref != NULL);
508*a9643ea8Slogwang 
509*a9643ea8Slogwang     int osfd = this->GetOsfd();
510*a9643ea8Slogwang     int new_events = this->GetEvents();
511*a9643ea8Slogwang 
512*a9643ea8Slogwang     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
513*a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
514*a9643ea8Slogwang     if ((old_obj != NULL) && (old_obj != this))
515*a9643ea8Slogwang     {
516*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
517*a9643ea8Slogwang         return -1;
518*a9643ea8Slogwang     }
519*a9643ea8Slogwang     fd_ref->SetNotifyObj(this);
520*a9643ea8Slogwang 
521*a9643ea8Slogwang     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
522*a9643ea8Slogwang     if (!frame->KqueueCtrlAdd(osfd, new_events))
523*a9643ea8Slogwang     {
524*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref add failed, log");
525*a9643ea8Slogwang         fd_ref->SetNotifyObj(old_obj);
526*a9643ea8Slogwang         return -2;
527*a9643ea8Slogwang     }
528*a9643ea8Slogwang 
529*a9643ea8Slogwang     return 0;
530*a9643ea8Slogwang }
531*a9643ea8Slogwang 
532*a9643ea8Slogwang int KqueuerObj::KqueueCtlDel(void* args)
533*a9643ea8Slogwang {
534*a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
535*a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
536*a9643ea8Slogwang     kqueue_assert(fd_ref != NULL);
537*a9643ea8Slogwang 
538*a9643ea8Slogwang     int osfd = this->GetOsfd();
539*a9643ea8Slogwang     int events = this->GetEvents();
540*a9643ea8Slogwang 
541*a9643ea8Slogwang     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
542*a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
543*a9643ea8Slogwang     if (old_obj != this)
544*a9643ea8Slogwang     {
545*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
546*a9643ea8Slogwang         return -1;
547*a9643ea8Slogwang     }
548*a9643ea8Slogwang     fd_ref->SetNotifyObj(NULL);
549*a9643ea8Slogwang 
550*a9643ea8Slogwang     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
551*a9643ea8Slogwang     if (!frame->KqueueCtrlDelRef(osfd, events, false)) // 引用有风险, 弊大于利, 关闭掉
552*a9643ea8Slogwang     {
553*a9643ea8Slogwang         MTLOG_ERROR("kqfd ref del failed, log");
554*a9643ea8Slogwang         fd_ref->SetNotifyObj(old_obj);
555*a9643ea8Slogwang         return -2;
556*a9643ea8Slogwang     }
557*a9643ea8Slogwang 
558*a9643ea8Slogwang     return 0;
559*a9643ea8Slogwang 
560*a9643ea8Slogwang }
561*a9643ea8Slogwang 
562