// xref: /f-stack/app/micro_thread/kqueue_proxy.cpp (revision a02c88d6)

/**
 * Tencent is pleased to support the open source community by making MSEC available.
 *
 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
 *
 * Licensed under the GNU General Public License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License. You may
 * obtain a copy of the License at
 *
 *     https://opensource.org/licenses/GPL-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */



/**
 *  @filename kqueue_proxy.cpp
 *  @info     kqueue proxy for micro-thread management
 */

#include "kqueue_proxy.h"

#include <new>

#include "micro_thread.h"
#include "ff_hook.h"

using namespace NS_MICRO_THREAD;
31a9643ea8Slogwang 
32a9643ea8Slogwang KqueueProxy::KqueueProxy()
33a9643ea8Slogwang {
34a9643ea8Slogwang     _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM;
35a9643ea8Slogwang     _kqfd = -1;
36a9643ea8Slogwang     _evtlist = NULL;
37a9643ea8Slogwang     _kqrefs = NULL;
38a9643ea8Slogwang }
39a9643ea8Slogwang 
40a9643ea8Slogwang int KqueueProxy::InitKqueue(int max_num)
41a9643ea8Slogwang {
42a9643ea8Slogwang 	int rc = 0;
43a9643ea8Slogwang 	if (max_num > _maxfd)
44a9643ea8Slogwang 	{
45a9643ea8Slogwang 		_maxfd = max_num;
46a9643ea8Slogwang 	}
47a9643ea8Slogwang 
48a9643ea8Slogwang 	_kqfd = ff_kqueue();
49a9643ea8Slogwang 	if (_kqfd < 0)
50a9643ea8Slogwang 	{
51a9643ea8Slogwang 		rc = -1;
52a9643ea8Slogwang 		goto EXIT_LABEL;
53a9643ea8Slogwang 	}
54a9643ea8Slogwang 
55a9643ea8Slogwang 	ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC);
56a9643ea8Slogwang 
57a9643ea8Slogwang 	_kqrefs = new KqFdRef[_maxfd];
58a9643ea8Slogwang 	if (_kqrefs == NULL)
59a9643ea8Slogwang 	{
60a9643ea8Slogwang 		rc = -2;
61a9643ea8Slogwang 		goto EXIT_LABEL;
62a9643ea8Slogwang 	}
63a9643ea8Slogwang 
64a9643ea8Slogwang 	_evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent));
65a9643ea8Slogwang 	if (_evtlist == NULL)
66a9643ea8Slogwang 	{
67a9643ea8Slogwang 		rc = -3;
68a9643ea8Slogwang 		goto EXIT_LABEL;
69a9643ea8Slogwang 	}
70a9643ea8Slogwang 
71a9643ea8Slogwang     struct rlimit rlim;
72a9643ea8Slogwang     memset(&rlim, 0, sizeof(rlim));
73a9643ea8Slogwang     if (getrlimit(RLIMIT_NOFILE, &rlim) == 0)
74a9643ea8Slogwang     {
75a9643ea8Slogwang         if ((int)rlim.rlim_max < _maxfd)
76a9643ea8Slogwang         {
77a9643ea8Slogwang             rlim.rlim_cur = rlim.rlim_max;
78a9643ea8Slogwang             setrlimit(RLIMIT_NOFILE, &rlim);
79a9643ea8Slogwang             rlim.rlim_cur = _maxfd;
80a9643ea8Slogwang             rlim.rlim_max = _maxfd;
81a9643ea8Slogwang             setrlimit(RLIMIT_NOFILE, &rlim);
82a9643ea8Slogwang         }
83a9643ea8Slogwang     }
84a9643ea8Slogwang 
85a9643ea8Slogwang EXIT_LABEL:
86a9643ea8Slogwang 
87a9643ea8Slogwang     if (rc < 0)
88a9643ea8Slogwang     {
89a9643ea8Slogwang         TermKqueue();
90a9643ea8Slogwang     }
91a9643ea8Slogwang 
92a9643ea8Slogwang     return rc;
93a9643ea8Slogwang }
94a9643ea8Slogwang 
95a9643ea8Slogwang void KqueueProxy::TermKqueue()
96a9643ea8Slogwang {
97a9643ea8Slogwang     if (_kqfd > 0)
98a9643ea8Slogwang     {
99a9643ea8Slogwang         close(_kqfd);
100a9643ea8Slogwang         _kqfd = -1;
101a9643ea8Slogwang     }
102a9643ea8Slogwang 
103a9643ea8Slogwang     if (_evtlist != NULL)
104a9643ea8Slogwang     {
105a9643ea8Slogwang         free(_evtlist);
106a9643ea8Slogwang         _evtlist = NULL;
107a9643ea8Slogwang     }
108a9643ea8Slogwang 
109a9643ea8Slogwang     if (_kqrefs != NULL)
110a9643ea8Slogwang     {
111a9643ea8Slogwang         delete []_kqrefs;
112a9643ea8Slogwang         _kqrefs = NULL;
113a9643ea8Slogwang     }
114a9643ea8Slogwang }
115a9643ea8Slogwang 
116a9643ea8Slogwang bool KqueueProxy::KqueueAdd(KqObjList& obj_list)
117a9643ea8Slogwang {
118a9643ea8Slogwang 	bool ret = true;
119a9643ea8Slogwang 	KqueuerObj *kqobj = NULL;
120a9643ea8Slogwang 	KqueuerObj *kqobj_error = NULL;
121a9643ea8Slogwang 	TAILQ_FOREACH(kqobj, &obj_list, _entry)
122a9643ea8Slogwang 	{
123a9643ea8Slogwang 		if (!KqueueAddObj(kqobj))
124a9643ea8Slogwang 		{
125a9643ea8Slogwang             MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd());
126a9643ea8Slogwang             kqueue_assert(0);
127a9643ea8Slogwang             kqobj_error = kqobj;
128a9643ea8Slogwang             ret = false;
129a9643ea8Slogwang             goto EXIT_LABEL;
130a9643ea8Slogwang 		}
131a9643ea8Slogwang 	}
132a9643ea8Slogwang 
133a9643ea8Slogwang EXIT_LABEL:
134a9643ea8Slogwang 
135a9643ea8Slogwang     if (!ret)
136a9643ea8Slogwang     {
137a9643ea8Slogwang         TAILQ_FOREACH(kqobj, &obj_list, _entry)
138a9643ea8Slogwang         {
139a9643ea8Slogwang             if (kqobj == kqobj_error)
140a9643ea8Slogwang             {
141a9643ea8Slogwang                 break;
142a9643ea8Slogwang             }
143a9643ea8Slogwang             KqueueDelObj(kqobj);
144a9643ea8Slogwang         }
145a9643ea8Slogwang     }
146a9643ea8Slogwang 
147a9643ea8Slogwang     return ret;
148a9643ea8Slogwang }
149a9643ea8Slogwang 
150a9643ea8Slogwang bool KqueueProxy::KqueueDel(KqObjList& obj_list)
151a9643ea8Slogwang {
152a9643ea8Slogwang     bool ret = true;
153a9643ea8Slogwang 
154a9643ea8Slogwang     KqueuerObj *kqobj = NULL;
155a9643ea8Slogwang     TAILQ_FOREACH(kqobj, &obj_list, _entry)
156a9643ea8Slogwang     {
157a9643ea8Slogwang         if (!KqueueDelObj(kqobj))  // failed also need continue, be sure ref count ok
158a9643ea8Slogwang         {
159a9643ea8Slogwang             MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd());
160a9643ea8Slogwang             kqueue_assert(0);
161a9643ea8Slogwang             ret = false;
162a9643ea8Slogwang         }
163a9643ea8Slogwang     }
164a9643ea8Slogwang 
165a9643ea8Slogwang     return ret;
166a9643ea8Slogwang }
167a9643ea8Slogwang 
168a9643ea8Slogwang bool KqueueProxy::KqueueCtrlAdd(int fd, int events)
169a9643ea8Slogwang {
170a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(fd);
171a9643ea8Slogwang 	if (item == NULL)
172a9643ea8Slogwang 	{
173a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error, wtf?
174a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
175a9643ea8Slogwang         kqueue_assert(0);
176a9643ea8Slogwang         return false;
177a9643ea8Slogwang 	}
178a9643ea8Slogwang 
179a9643ea8Slogwang 	item->AttachEvents(events);
180a9643ea8Slogwang 
181a9643ea8Slogwang 	int old_events = item->GetListenEvents();
182a9643ea8Slogwang 	int new_events = old_events | events;
183a9643ea8Slogwang 	if (old_events == new_events)
184a9643ea8Slogwang 	{
185a9643ea8Slogwang 		return true;
186a9643ea8Slogwang 	}
187a9643ea8Slogwang 
188a9643ea8Slogwang 	KqEvent ke;
189a9643ea8Slogwang 	int ret;
190a9643ea8Slogwang 	if (old_events & KQ_EVENT_WRITE) {
191a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
192a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
193a9643ea8Slogwang 		if (ret == -1) {
194a9643ea8Slogwang 			// TODO, error check
195a9643ea8Slogwang 			item->DetachEvents(events);
196a9643ea8Slogwang 			kqueue_assert(0);
197a9643ea8Slogwang 			return false;
198a9643ea8Slogwang 		}
199a9643ea8Slogwang 	}
200a9643ea8Slogwang 	if (old_events & KQ_EVENT_READ) {
201a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
202a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
203a9643ea8Slogwang 		if (ret == -1) {
204a9643ea8Slogwang 			// TODO, error check
205a9643ea8Slogwang 			item->DetachEvents(events);
206a9643ea8Slogwang 			kqueue_assert(0);
207a9643ea8Slogwang 			return false;
208a9643ea8Slogwang 		}
209a9643ea8Slogwang 	}
210a9643ea8Slogwang 	if (events & KQ_EVENT_WRITE) {
211a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
212a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
213a9643ea8Slogwang 		if (ret == -1) {
214a9643ea8Slogwang 			// TODO, error check
215a9643ea8Slogwang 			item->DetachEvents(events);
216a9643ea8Slogwang 			kqueue_assert(0);
217a9643ea8Slogwang 			return false;
218a9643ea8Slogwang 		}
219a9643ea8Slogwang 	}
220a9643ea8Slogwang 	if (events & KQ_EVENT_READ) {
221a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
222a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
223a9643ea8Slogwang 		if (ret == -1) {
224a9643ea8Slogwang 			// TODO, error check
225a9643ea8Slogwang 			item->DetachEvents(events);
226a9643ea8Slogwang 			kqueue_assert(0);
227a9643ea8Slogwang 			return false;
228a9643ea8Slogwang 		}
229a9643ea8Slogwang 	}
230a9643ea8Slogwang 
231a9643ea8Slogwang 	item->SetListenEvents(new_events);
232a9643ea8Slogwang 
233a9643ea8Slogwang 	return true;
234a9643ea8Slogwang }
235a9643ea8Slogwang 
236a9643ea8Slogwang 
237a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDel(int fd, int events)
238a9643ea8Slogwang {
239a9643ea8Slogwang 	return KqueueCtrlDelRef(fd, events, false);
240a9643ea8Slogwang }
241a9643ea8Slogwang 
242a9643ea8Slogwang bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref)
243a9643ea8Slogwang {
244a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(fd);
245a9643ea8Slogwang 	if (item == NULL)
246a9643ea8Slogwang 	{
247a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error
248a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
249a9643ea8Slogwang         kqueue_assert(0);
250a9643ea8Slogwang         return false;
251a9643ea8Slogwang 
252a9643ea8Slogwang 	}
253a9643ea8Slogwang 
254a9643ea8Slogwang 	item->DetachEvents(events);
255a9643ea8Slogwang 	int old_events = item->GetListenEvents();
256a9643ea8Slogwang 	int new_events = old_events &~ events;
257a9643ea8Slogwang 
258a9643ea8Slogwang 	if (use_ref) {
259a9643ea8Slogwang 		new_events = old_events;
260a9643ea8Slogwang 		if (item->ReadRefCnt() == 0) {
261a9643ea8Slogwang 			new_events = new_events & ~KQ_EVENT_READ;
262a9643ea8Slogwang 		}
263a9643ea8Slogwang 		if (item->WriteRefCnt() == 0) {
264a9643ea8Slogwang 			new_events = new_events & ~KQ_EVENT_WRITE;
265a9643ea8Slogwang 		}
266a9643ea8Slogwang 	}
267a9643ea8Slogwang 
268a9643ea8Slogwang 	if (old_events == new_events)
269a9643ea8Slogwang 	{
270a9643ea8Slogwang 		return true;
271a9643ea8Slogwang 	}
272a9643ea8Slogwang 	KqEvent ke;
273a9643ea8Slogwang 	int ret;
274a9643ea8Slogwang 	if (old_events & KQ_EVENT_WRITE) {
275a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
276a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
277a9643ea8Slogwang 		if (ret == -1) {
278a9643ea8Slogwang 			kqueue_assert(0);
279a9643ea8Slogwang 			return false;
280a9643ea8Slogwang 		}
281a9643ea8Slogwang 	}
282a9643ea8Slogwang 	if (old_events & KQ_EVENT_READ) {
283a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
284a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
285a9643ea8Slogwang 		if (ret == -1) {
286a9643ea8Slogwang 			kqueue_assert(0);
287a9643ea8Slogwang 			return false;
288a9643ea8Slogwang 		}
289a9643ea8Slogwang 	}
290a9643ea8Slogwang 
291a9643ea8Slogwang 	if (new_events & KQ_EVENT_WRITE) {
292a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
293a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
294a9643ea8Slogwang 		if (ret == -1) {
295a9643ea8Slogwang 			kqueue_assert(0);
296a9643ea8Slogwang 			return false;
297a9643ea8Slogwang 		}
298a9643ea8Slogwang 	}
299a9643ea8Slogwang 	if (new_events & KQ_EVENT_READ) {
300a9643ea8Slogwang 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
301a9643ea8Slogwang 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
302a9643ea8Slogwang 		if (ret == -1) {
303a9643ea8Slogwang 			kqueue_assert(0);
304a9643ea8Slogwang 			return false;
305a9643ea8Slogwang 		}
306a9643ea8Slogwang 	}
307a9643ea8Slogwang 
308a9643ea8Slogwang 	item->SetListenEvents(new_events);
309a9643ea8Slogwang 
310a9643ea8Slogwang 	return true;
311a9643ea8Slogwang }
312a9643ea8Slogwang 
313a9643ea8Slogwang bool KqueueProxy::KqueueAddObj(KqueuerObj* obj)
314a9643ea8Slogwang {
315a9643ea8Slogwang 	if (obj == NULL)
316a9643ea8Slogwang 	{
317a9643ea8Slogwang         MTLOG_ERROR("kqobj input invalid, %p", obj);
318a9643ea8Slogwang         return false;
319a9643ea8Slogwang 	}
320a9643ea8Slogwang 
321a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
322a9643ea8Slogwang 	if (item == NULL)
323a9643ea8Slogwang 	{
324a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error
325a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
326a9643ea8Slogwang         kqueue_assert(0);
327a9643ea8Slogwang         return false;
328a9643ea8Slogwang 	}
329a9643ea8Slogwang 
330a9643ea8Slogwang 	int ret = obj->KqueueCtlAdd(item);
331a9643ea8Slogwang 	if (ret < 0) {
332a9643ea8Slogwang         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
333a9643ea8Slogwang         kqueue_assert(0);
334a9643ea8Slogwang         return false;
335a9643ea8Slogwang 	}
336a9643ea8Slogwang 
337a9643ea8Slogwang 	return true;
338a9643ea8Slogwang }
339a9643ea8Slogwang 
340a9643ea8Slogwang bool KqueueProxy::KqueueDelObj(KqueuerObj* obj)
341a9643ea8Slogwang {
342a9643ea8Slogwang 	if (obj == NULL)
343a9643ea8Slogwang 	{
344a9643ea8Slogwang         MTLOG_ERROR("kqobj input invalid, %p", obj);
345a9643ea8Slogwang         return false;
346a9643ea8Slogwang 	}
347a9643ea8Slogwang 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
348a9643ea8Slogwang 	if (item == NULL)
349a9643ea8Slogwang 	{
350a9643ea8Slogwang         MT_ATTR_API(320851, 1); // fd error
351a9643ea8Slogwang         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
352a9643ea8Slogwang         kqueue_assert(0);
353a9643ea8Slogwang         return false;
354a9643ea8Slogwang 	}
355a9643ea8Slogwang 
356a9643ea8Slogwang 	int ret = obj->KqueueCtlDel(item);
357a9643ea8Slogwang 	if (ret < 0) {
358a9643ea8Slogwang         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
359a9643ea8Slogwang         kqueue_assert(0);
360a9643ea8Slogwang         return false;
361a9643ea8Slogwang 	}
362a9643ea8Slogwang 
363a9643ea8Slogwang 	return true;
364a9643ea8Slogwang }
365a9643ea8Slogwang 
366a9643ea8Slogwang void KqueueProxy::KqueueRcvEventList(int evtfdnum)
367a9643ea8Slogwang {
368a9643ea8Slogwang 	int ret = 0;
369a9643ea8Slogwang 	int osfd = 0;
370a9643ea8Slogwang 	int revents = 0;
371a9643ea8Slogwang 	int tmp_evts = 0;
372a9643ea8Slogwang 	KqFdRef* item = NULL;
373a9643ea8Slogwang 	KqueuerObj* obj = NULL;
374a9643ea8Slogwang 
375a9643ea8Slogwang 	for (int i = 0; i < evtfdnum; i++)
376a9643ea8Slogwang 	{
377*a02c88d6Slogwang 		osfd = _evtlist[i].ident;
378a9643ea8Slogwang 
379a9643ea8Slogwang 		item = KqFdRefGet(osfd);
380a9643ea8Slogwang 		if (item == NULL)
381a9643ea8Slogwang 		{
382a9643ea8Slogwang             MT_ATTR_API(320851, 1); // fd error
383a9643ea8Slogwang             MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd);
384a9643ea8Slogwang             kqueue_assert(0);
385a9643ea8Slogwang             continue;
386a9643ea8Slogwang 		}
387a9643ea8Slogwang 		tmp_evts = _evtlist[i].filter;
388a9643ea8Slogwang 		if (tmp_evts == EVFILT_READ) {
389a9643ea8Slogwang 			revents |= KQ_EVENT_READ;
390a9643ea8Slogwang 		}
391a9643ea8Slogwang 		if (tmp_evts == EVFILT_WRITE) {
392a9643ea8Slogwang 			revents |= KQ_EVENT_WRITE;
393a9643ea8Slogwang 		}
394a9643ea8Slogwang 		obj = item->GetNotifyObj();
395a9643ea8Slogwang 		if (obj == NULL)
396a9643ea8Slogwang 		{
397a9643ea8Slogwang             MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd);
398a9643ea8Slogwang             KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE)));
399a9643ea8Slogwang             continue;
400a9643ea8Slogwang 		}
401a9643ea8Slogwang 		obj->SetRcvEvents(revents);
402a9643ea8Slogwang 
403a9643ea8Slogwang 		if (tmp_evts == EV_ERROR)
404a9643ea8Slogwang 		{
405a9643ea8Slogwang 			obj->HangupNotify();
406a9643ea8Slogwang 			continue;
407a9643ea8Slogwang 		}
408a9643ea8Slogwang 
409a9643ea8Slogwang 		if (revents & KQ_EVENT_READ)
410a9643ea8Slogwang 		{
411a9643ea8Slogwang 			ret = obj->InputNotify();
412a9643ea8Slogwang 			if (ret != 0)
413a9643ea8Slogwang 			{
414a9643ea8Slogwang 				continue;
415a9643ea8Slogwang 			}
416a9643ea8Slogwang 		}
417a9643ea8Slogwang 
418a9643ea8Slogwang 		if (revents & KQ_EVENT_WRITE)
419a9643ea8Slogwang 		{
420a9643ea8Slogwang 			ret = obj->OutputNotify();
421a9643ea8Slogwang 			if (ret != 0)
422a9643ea8Slogwang 			{
423a9643ea8Slogwang 				continue;
424a9643ea8Slogwang 			}
425a9643ea8Slogwang 		}
426a9643ea8Slogwang 	}
427a9643ea8Slogwang }
428a9643ea8Slogwang 
429a9643ea8Slogwang void KqueueProxy::KqueueDispatch()
430a9643ea8Slogwang {
431a9643ea8Slogwang 	int nfd;
432a9643ea8Slogwang 	int wait_time = KqueueGetTimeout();
433a9643ea8Slogwang 	if (wait_time) {
434a9643ea8Slogwang 		struct timespec ts;
435a9643ea8Slogwang 		ts.tv_sec = wait_time / 1000;
436a9643ea8Slogwang 		ts.tv_nsec = 0;
437a9643ea8Slogwang 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts);
438a9643ea8Slogwang 	} else {
439a9643ea8Slogwang 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL);
440a9643ea8Slogwang 	}
441a9643ea8Slogwang 	if (nfd <= 0)
442a9643ea8Slogwang 	{
443a9643ea8Slogwang 		return;
444a9643ea8Slogwang 	}
445a9643ea8Slogwang 
446a9643ea8Slogwang 	KqueueRcvEventList(nfd);
447a9643ea8Slogwang }
448a9643ea8Slogwang 
449a9643ea8Slogwang int KqueuerObj::InputNotify()
450a9643ea8Slogwang {
451a9643ea8Slogwang 	MicroThread* thread = this->GetOwnerThread();
452a9643ea8Slogwang 	if (thread == NULL)
453a9643ea8Slogwang 	{
454a9643ea8Slogwang 		kqueue_assert(0);
455a9643ea8Slogwang         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
456a9643ea8Slogwang         return -1;
457a9643ea8Slogwang 	}
458a9643ea8Slogwang 
459a9643ea8Slogwang 	if (thread->HasFlag(MicroThread::IO_LIST))
460a9643ea8Slogwang 	{
461a9643ea8Slogwang         MtFrame* frame = MtFrame::Instance();
462a9643ea8Slogwang         frame->RemoveIoWait(thread);
463a9643ea8Slogwang         frame->InsertRunable(thread);
464a9643ea8Slogwang 	}
465a9643ea8Slogwang 
466a9643ea8Slogwang 	return 0;
467a9643ea8Slogwang }
468a9643ea8Slogwang 
469a9643ea8Slogwang int KqueuerObj::OutputNotify()
470a9643ea8Slogwang {
471a9643ea8Slogwang     MicroThread* thread = this->GetOwnerThread();
472a9643ea8Slogwang     if (NULL == thread)
473a9643ea8Slogwang     {
474a9643ea8Slogwang         kqueue_assert(0);
475a9643ea8Slogwang         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
476a9643ea8Slogwang         return -1;
477a9643ea8Slogwang     }
478a9643ea8Slogwang 
479a9643ea8Slogwang     // 多个事件同时到达, 防重复操作
480a9643ea8Slogwang     if (thread->HasFlag(MicroThread::IO_LIST))
481a9643ea8Slogwang     {
482a9643ea8Slogwang         MtFrame* frame = MtFrame::Instance();
483a9643ea8Slogwang         frame->RemoveIoWait(thread);
484a9643ea8Slogwang         frame->InsertRunable(thread);
485a9643ea8Slogwang     }
486a9643ea8Slogwang 
487a9643ea8Slogwang     return 0;
488a9643ea8Slogwang }
489a9643ea8Slogwang 
490a9643ea8Slogwang int KqueuerObj::HangupNotify()
491a9643ea8Slogwang {
492a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
493a9643ea8Slogwang     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
494a9643ea8Slogwang     return 0;
495a9643ea8Slogwang }
496a9643ea8Slogwang 
497a9643ea8Slogwang int KqueuerObj::KqueueCtlAdd(void* args)
498a9643ea8Slogwang {
499a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
500a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
501a9643ea8Slogwang     kqueue_assert(fd_ref != NULL);
502a9643ea8Slogwang 
503a9643ea8Slogwang     int osfd = this->GetOsfd();
504a9643ea8Slogwang     int new_events = this->GetEvents();
505a9643ea8Slogwang 
506a9643ea8Slogwang     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
507a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
508a9643ea8Slogwang     if ((old_obj != NULL) && (old_obj != this))
509a9643ea8Slogwang     {
510a9643ea8Slogwang         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
511a9643ea8Slogwang         return -1;
512a9643ea8Slogwang     }
513a9643ea8Slogwang     fd_ref->SetNotifyObj(this);
514a9643ea8Slogwang 
515a9643ea8Slogwang     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
516a9643ea8Slogwang     if (!frame->KqueueCtrlAdd(osfd, new_events))
517a9643ea8Slogwang     {
518a9643ea8Slogwang         MTLOG_ERROR("kqfd ref add failed, log");
519a9643ea8Slogwang         fd_ref->SetNotifyObj(old_obj);
520a9643ea8Slogwang         return -2;
521a9643ea8Slogwang     }
522a9643ea8Slogwang 
523a9643ea8Slogwang     return 0;
524a9643ea8Slogwang }
525a9643ea8Slogwang 
526a9643ea8Slogwang int KqueuerObj::KqueueCtlDel(void* args)
527a9643ea8Slogwang {
528a9643ea8Slogwang     MtFrame* frame = MtFrame::Instance();
529a9643ea8Slogwang     KqFdRef* fd_ref = (KqFdRef*)args;
530a9643ea8Slogwang     kqueue_assert(fd_ref != NULL);
531a9643ea8Slogwang 
532a9643ea8Slogwang     int osfd = this->GetOsfd();
533a9643ea8Slogwang     int events = this->GetEvents();
534a9643ea8Slogwang 
535a9643ea8Slogwang     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
536a9643ea8Slogwang     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
537a9643ea8Slogwang     if (old_obj != this)
538a9643ea8Slogwang     {
539a9643ea8Slogwang         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
540a9643ea8Slogwang         return -1;
541a9643ea8Slogwang     }
542a9643ea8Slogwang     fd_ref->SetNotifyObj(NULL);
543a9643ea8Slogwang 
544a9643ea8Slogwang     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
545a9643ea8Slogwang     if (!frame->KqueueCtrlDelRef(osfd, events, false)) // 引用有风险, 弊大于利, 关闭掉
546a9643ea8Slogwang     {
547a9643ea8Slogwang         MTLOG_ERROR("kqfd ref del failed, log");
548a9643ea8Slogwang         fd_ref->SetNotifyObj(old_obj);
549a9643ea8Slogwang         return -2;
550a9643ea8Slogwang     }
551a9643ea8Slogwang 
552a9643ea8Slogwang     return 0;
553a9643ea8Slogwang 
554a9643ea8Slogwang }
555a9643ea8Slogwang 
556