xref: /f-stack/app/micro_thread/kqueue_proxy.cpp (revision a02c88d6)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 
21 /**
22  *  @filename kqueue_proxy.cpp
23  *  @info     kqueue for micro thread manage
24  */
25 
26 #include "kqueue_proxy.h"
27 #include "micro_thread.h"
28 #include "ff_hook.h"
29 
30 using namespace NS_MICRO_THREAD;
31 
32 KqueueProxy::KqueueProxy()
33 {
34     _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM;
35     _kqfd = -1;
36     _evtlist = NULL;
37     _kqrefs = NULL;
38 }
39 
40 int KqueueProxy::InitKqueue(int max_num)
41 {
42 	int rc = 0;
43 	if (max_num > _maxfd)
44 	{
45 		_maxfd = max_num;
46 	}
47 
48 	_kqfd = ff_kqueue();
49 	if (_kqfd < 0)
50 	{
51 		rc = -1;
52 		goto EXIT_LABEL;
53 	}
54 
55 	ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC);
56 
57 	_kqrefs = new KqFdRef[_maxfd];
58 	if (_kqrefs == NULL)
59 	{
60 		rc = -2;
61 		goto EXIT_LABEL;
62 	}
63 
64 	_evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent));
65 	if (_evtlist == NULL)
66 	{
67 		rc = -3;
68 		goto EXIT_LABEL;
69 	}
70 
71     struct rlimit rlim;
72     memset(&rlim, 0, sizeof(rlim));
73     if (getrlimit(RLIMIT_NOFILE, &rlim) == 0)
74     {
75         if ((int)rlim.rlim_max < _maxfd)
76         {
77             rlim.rlim_cur = rlim.rlim_max;
78             setrlimit(RLIMIT_NOFILE, &rlim);
79             rlim.rlim_cur = _maxfd;
80             rlim.rlim_max = _maxfd;
81             setrlimit(RLIMIT_NOFILE, &rlim);
82         }
83     }
84 
85 EXIT_LABEL:
86 
87     if (rc < 0)
88     {
89         TermKqueue();
90     }
91 
92     return rc;
93 }
94 
95 void KqueueProxy::TermKqueue()
96 {
97     if (_kqfd > 0)
98     {
99         close(_kqfd);
100         _kqfd = -1;
101     }
102 
103     if (_evtlist != NULL)
104     {
105         free(_evtlist);
106         _evtlist = NULL;
107     }
108 
109     if (_kqrefs != NULL)
110     {
111         delete []_kqrefs;
112         _kqrefs = NULL;
113     }
114 }
115 
116 bool KqueueProxy::KqueueAdd(KqObjList& obj_list)
117 {
118 	bool ret = true;
119 	KqueuerObj *kqobj = NULL;
120 	KqueuerObj *kqobj_error = NULL;
121 	TAILQ_FOREACH(kqobj, &obj_list, _entry)
122 	{
123 		if (!KqueueAddObj(kqobj))
124 		{
125             MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd());
126             kqueue_assert(0);
127             kqobj_error = kqobj;
128             ret = false;
129             goto EXIT_LABEL;
130 		}
131 	}
132 
133 EXIT_LABEL:
134 
135     if (!ret)
136     {
137         TAILQ_FOREACH(kqobj, &obj_list, _entry)
138         {
139             if (kqobj == kqobj_error)
140             {
141                 break;
142             }
143             KqueueDelObj(kqobj);
144         }
145     }
146 
147     return ret;
148 }
149 
150 bool KqueueProxy::KqueueDel(KqObjList& obj_list)
151 {
152     bool ret = true;
153 
154     KqueuerObj *kqobj = NULL;
155     TAILQ_FOREACH(kqobj, &obj_list, _entry)
156     {
157         if (!KqueueDelObj(kqobj))  // failed also need continue, be sure ref count ok
158         {
159             MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd());
160             kqueue_assert(0);
161             ret = false;
162         }
163     }
164 
165     return ret;
166 }
167 
168 bool KqueueProxy::KqueueCtrlAdd(int fd, int events)
169 {
170 	KqFdRef* item = KqFdRefGet(fd);
171 	if (item == NULL)
172 	{
173         MT_ATTR_API(320851, 1); // fd error, wtf?
174         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
175         kqueue_assert(0);
176         return false;
177 	}
178 
179 	item->AttachEvents(events);
180 
181 	int old_events = item->GetListenEvents();
182 	int new_events = old_events | events;
183 	if (old_events == new_events)
184 	{
185 		return true;
186 	}
187 
188 	KqEvent ke;
189 	int ret;
190 	if (old_events & KQ_EVENT_WRITE) {
191 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
192 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
193 		if (ret == -1) {
194 			// TODO, error check
195 			item->DetachEvents(events);
196 			kqueue_assert(0);
197 			return false;
198 		}
199 	}
200 	if (old_events & KQ_EVENT_READ) {
201 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
202 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
203 		if (ret == -1) {
204 			// TODO, error check
205 			item->DetachEvents(events);
206 			kqueue_assert(0);
207 			return false;
208 		}
209 	}
210 	if (events & KQ_EVENT_WRITE) {
211 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
212 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
213 		if (ret == -1) {
214 			// TODO, error check
215 			item->DetachEvents(events);
216 			kqueue_assert(0);
217 			return false;
218 		}
219 	}
220 	if (events & KQ_EVENT_READ) {
221 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
222 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
223 		if (ret == -1) {
224 			// TODO, error check
225 			item->DetachEvents(events);
226 			kqueue_assert(0);
227 			return false;
228 		}
229 	}
230 
231 	item->SetListenEvents(new_events);
232 
233 	return true;
234 }
235 
236 
237 bool KqueueProxy::KqueueCtrlDel(int fd, int events)
238 {
239 	return KqueueCtrlDelRef(fd, events, false);
240 }
241 
242 bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref)
243 {
244 	KqFdRef* item = KqFdRefGet(fd);
245 	if (item == NULL)
246 	{
247         MT_ATTR_API(320851, 1); // fd error
248         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
249         kqueue_assert(0);
250         return false;
251 
252 	}
253 
254 	item->DetachEvents(events);
255 	int old_events = item->GetListenEvents();
256 	int new_events = old_events &~ events;
257 
258 	if (use_ref) {
259 		new_events = old_events;
260 		if (item->ReadRefCnt() == 0) {
261 			new_events = new_events & ~KQ_EVENT_READ;
262 		}
263 		if (item->WriteRefCnt() == 0) {
264 			new_events = new_events & ~KQ_EVENT_WRITE;
265 		}
266 	}
267 
268 	if (old_events == new_events)
269 	{
270 		return true;
271 	}
272 	KqEvent ke;
273 	int ret;
274 	if (old_events & KQ_EVENT_WRITE) {
275 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
276 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
277 		if (ret == -1) {
278 			kqueue_assert(0);
279 			return false;
280 		}
281 	}
282 	if (old_events & KQ_EVENT_READ) {
283 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
284 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
285 		if (ret == -1) {
286 			kqueue_assert(0);
287 			return false;
288 		}
289 	}
290 
291 	if (new_events & KQ_EVENT_WRITE) {
292 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
293 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
294 		if (ret == -1) {
295 			kqueue_assert(0);
296 			return false;
297 		}
298 	}
299 	if (new_events & KQ_EVENT_READ) {
300 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
301 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
302 		if (ret == -1) {
303 			kqueue_assert(0);
304 			return false;
305 		}
306 	}
307 
308 	item->SetListenEvents(new_events);
309 
310 	return true;
311 }
312 
313 bool KqueueProxy::KqueueAddObj(KqueuerObj* obj)
314 {
315 	if (obj == NULL)
316 	{
317         MTLOG_ERROR("kqobj input invalid, %p", obj);
318         return false;
319 	}
320 
321 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
322 	if (item == NULL)
323 	{
324         MT_ATTR_API(320851, 1); // fd error
325         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
326         kqueue_assert(0);
327         return false;
328 	}
329 
330 	int ret = obj->KqueueCtlAdd(item);
331 	if (ret < 0) {
332         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
333         kqueue_assert(0);
334         return false;
335 	}
336 
337 	return true;
338 }
339 
340 bool KqueueProxy::KqueueDelObj(KqueuerObj* obj)
341 {
342 	if (obj == NULL)
343 	{
344         MTLOG_ERROR("kqobj input invalid, %p", obj);
345         return false;
346 	}
347 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
348 	if (item == NULL)
349 	{
350         MT_ATTR_API(320851, 1); // fd error
351         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
352         kqueue_assert(0);
353         return false;
354 	}
355 
356 	int ret = obj->KqueueCtlDel(item);
357 	if (ret < 0) {
358         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
359         kqueue_assert(0);
360         return false;
361 	}
362 
363 	return true;
364 }
365 
366 void KqueueProxy::KqueueRcvEventList(int evtfdnum)
367 {
368 	int ret = 0;
369 	int osfd = 0;
370 	int revents = 0;
371 	int tmp_evts = 0;
372 	KqFdRef* item = NULL;
373 	KqueuerObj* obj = NULL;
374 
375 	for (int i = 0; i < evtfdnum; i++)
376 	{
377 		osfd = _evtlist[i].ident;
378 
379 		item = KqFdRefGet(osfd);
380 		if (item == NULL)
381 		{
382             MT_ATTR_API(320851, 1); // fd error
383             MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd);
384             kqueue_assert(0);
385             continue;
386 		}
387 		tmp_evts = _evtlist[i].filter;
388 		if (tmp_evts == EVFILT_READ) {
389 			revents |= KQ_EVENT_READ;
390 		}
391 		if (tmp_evts == EVFILT_WRITE) {
392 			revents |= KQ_EVENT_WRITE;
393 		}
394 		obj = item->GetNotifyObj();
395 		if (obj == NULL)
396 		{
397             MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd);
398             KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE)));
399             continue;
400 		}
401 		obj->SetRcvEvents(revents);
402 
403 		if (tmp_evts == EV_ERROR)
404 		{
405 			obj->HangupNotify();
406 			continue;
407 		}
408 
409 		if (revents & KQ_EVENT_READ)
410 		{
411 			ret = obj->InputNotify();
412 			if (ret != 0)
413 			{
414 				continue;
415 			}
416 		}
417 
418 		if (revents & KQ_EVENT_WRITE)
419 		{
420 			ret = obj->OutputNotify();
421 			if (ret != 0)
422 			{
423 				continue;
424 			}
425 		}
426 	}
427 }
428 
429 void KqueueProxy::KqueueDispatch()
430 {
431 	int nfd;
432 	int wait_time = KqueueGetTimeout();
433 	if (wait_time) {
434 		struct timespec ts;
435 		ts.tv_sec = wait_time / 1000;
436 		ts.tv_nsec = 0;
437 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts);
438 	} else {
439 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL);
440 	}
441 	if (nfd <= 0)
442 	{
443 		return;
444 	}
445 
446 	KqueueRcvEventList(nfd);
447 }
448 
449 int KqueuerObj::InputNotify()
450 {
451 	MicroThread* thread = this->GetOwnerThread();
452 	if (thread == NULL)
453 	{
454 		kqueue_assert(0);
455         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
456         return -1;
457 	}
458 
459 	if (thread->HasFlag(MicroThread::IO_LIST))
460 	{
461         MtFrame* frame = MtFrame::Instance();
462         frame->RemoveIoWait(thread);
463         frame->InsertRunable(thread);
464 	}
465 
466 	return 0;
467 }
468 
469 int KqueuerObj::OutputNotify()
470 {
471     MicroThread* thread = this->GetOwnerThread();
472     if (NULL == thread)
473     {
474         kqueue_assert(0);
475         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
476         return -1;
477     }
478 
479     // 多个事件同时到达, 防重复操作
480     if (thread->HasFlag(MicroThread::IO_LIST))
481     {
482         MtFrame* frame = MtFrame::Instance();
483         frame->RemoveIoWait(thread);
484         frame->InsertRunable(thread);
485     }
486 
487     return 0;
488 }
489 
490 int KqueuerObj::HangupNotify()
491 {
492     MtFrame* frame = MtFrame::Instance();
493     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
494     return 0;
495 }
496 
497 int KqueuerObj::KqueueCtlAdd(void* args)
498 {
499     MtFrame* frame = MtFrame::Instance();
500     KqFdRef* fd_ref = (KqFdRef*)args;
501     kqueue_assert(fd_ref != NULL);
502 
503     int osfd = this->GetOsfd();
504     int new_events = this->GetEvents();
505 
506     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
507     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
508     if ((old_obj != NULL) && (old_obj != this))
509     {
510         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
511         return -1;
512     }
513     fd_ref->SetNotifyObj(this);
514 
515     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
516     if (!frame->KqueueCtrlAdd(osfd, new_events))
517     {
518         MTLOG_ERROR("kqfd ref add failed, log");
519         fd_ref->SetNotifyObj(old_obj);
520         return -2;
521     }
522 
523     return 0;
524 }
525 
526 int KqueuerObj::KqueueCtlDel(void* args)
527 {
528     MtFrame* frame = MtFrame::Instance();
529     KqFdRef* fd_ref = (KqFdRef*)args;
530     kqueue_assert(fd_ref != NULL);
531 
532     int osfd = this->GetOsfd();
533     int events = this->GetEvents();
534 
535     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
536     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
537     if (old_obj != this)
538     {
539         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
540         return -1;
541     }
542     fd_ref->SetNotifyObj(NULL);
543 
544     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
545     if (!frame->KqueueCtrlDelRef(osfd, events, false)) // 引用有风险, 弊大于利, 关闭掉
546     {
547         MTLOG_ERROR("kqfd ref del failed, log");
548         fd_ref->SetNotifyObj(old_obj);
549         return -2;
550     }
551 
552     return 0;
553 
554 }
555 
556