xref: /f-stack/app/micro_thread/kqueue_proxy.cpp (revision a9643ea8)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 
21 /**
22  *  @filename kqueue_proxy.cpp
23  *  @info     kqueue for micro thread manage
24  */
25 
26 #include "kqueue_proxy.h"
27 #include "micro_thread.h"
28 #include "ff_hook.h"
29 
30 using namespace NS_MICRO_THREAD;
31 
32 KqueueProxy::KqueueProxy()
33 {
34     _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM;
35     _kqfd = -1;
36     _evtlist = NULL;
37     _kqrefs = NULL;
38 }
39 
40 int KqueueProxy::InitKqueue(int max_num)
41 {
42 	int rc = 0;
43 	if (max_num > _maxfd)
44 	{
45 		_maxfd = max_num;
46 	}
47 
48 	_kqfd = ff_kqueue();
49 	if (_kqfd < 0)
50 	{
51 		rc = -1;
52 		goto EXIT_LABEL;
53 	}
54 
55 	ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC);
56 
57 	_kqrefs = new KqFdRef[_maxfd];
58 	if (_kqrefs == NULL)
59 	{
60 		rc = -2;
61 		goto EXIT_LABEL;
62 	}
63 
64 	_evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent));
65 	if (_evtlist == NULL)
66 	{
67 		rc = -3;
68 		goto EXIT_LABEL;
69 	}
70 
71     struct rlimit rlim;
72     memset(&rlim, 0, sizeof(rlim));
73     if (getrlimit(RLIMIT_NOFILE, &rlim) == 0)
74     {
75         if ((int)rlim.rlim_max < _maxfd)
76         {
77             rlim.rlim_cur = rlim.rlim_max;
78             setrlimit(RLIMIT_NOFILE, &rlim);
79             rlim.rlim_cur = _maxfd;
80             rlim.rlim_max = _maxfd;
81             setrlimit(RLIMIT_NOFILE, &rlim);
82         }
83     }
84 
85 EXIT_LABEL:
86 
87     if (rc < 0)
88     {
89         TermKqueue();
90     }
91 
92     return rc;
93 }
94 
95 void KqueueProxy::TermKqueue()
96 {
97     if (_kqfd > 0)
98     {
99         close(_kqfd);
100         _kqfd = -1;
101     }
102 
103     if (_evtlist != NULL)
104     {
105         free(_evtlist);
106         _evtlist = NULL;
107     }
108 
109     if (_kqrefs != NULL)
110     {
111         delete []_kqrefs;
112         _kqrefs = NULL;
113     }
114 }
115 
116 bool KqueueProxy::KqueueAdd(KqObjList& obj_list)
117 {
118 	bool ret = true;
119 	KqueuerObj *kqobj = NULL;
120 	KqueuerObj *kqobj_error = NULL;
121 	TAILQ_FOREACH(kqobj, &obj_list, _entry)
122 	{
123 		if (!KqueueAddObj(kqobj))
124 		{
125             MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd());
126             kqueue_assert(0);
127             kqobj_error = kqobj;
128             ret = false;
129             goto EXIT_LABEL;
130 		}
131 	}
132 
133 EXIT_LABEL:
134 
135     if (!ret)
136     {
137         TAILQ_FOREACH(kqobj, &obj_list, _entry)
138         {
139             if (kqobj == kqobj_error)
140             {
141                 break;
142             }
143             KqueueDelObj(kqobj);
144         }
145     }
146 
147     return ret;
148 }
149 
150 bool KqueueProxy::KqueueDel(KqObjList& obj_list)
151 {
152     bool ret = true;
153 
154     KqueuerObj *kqobj = NULL;
155     TAILQ_FOREACH(kqobj, &obj_list, _entry)
156     {
157         if (!KqueueDelObj(kqobj))  // failed also need continue, be sure ref count ok
158         {
159             MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd());
160             kqueue_assert(0);
161             ret = false;
162         }
163     }
164 
165     return ret;
166 }
167 
168 bool KqueueProxy::KqueueCtrlAdd(int fd, int events)
169 {
170 	KqFdRef* item = KqFdRefGet(fd);
171 	if (item == NULL)
172 	{
173         MT_ATTR_API(320851, 1); // fd error, wtf?
174         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
175         kqueue_assert(0);
176         return false;
177 	}
178 
179 	item->AttachEvents(events);
180 
181 	int old_events = item->GetListenEvents();
182 	int new_events = old_events | events;
183 	if (old_events == new_events)
184 	{
185 		return true;
186 	}
187 
188 	KqEvent ke;
189 	int ret;
190 	if (CHK_FD_BIT(fd)) {
191 		fd = CLR_FD_BIT(fd);
192     }
193 	if (old_events & KQ_EVENT_WRITE) {
194 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
195 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
196 		if (ret == -1) {
197 			// TODO, error check
198 			item->DetachEvents(events);
199 			kqueue_assert(0);
200 			return false;
201 		}
202 	}
203 	if (old_events & KQ_EVENT_READ) {
204 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
205 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
206 		if (ret == -1) {
207 			// TODO, error check
208 			item->DetachEvents(events);
209 			kqueue_assert(0);
210 			return false;
211 		}
212 	}
213 	if (events & KQ_EVENT_WRITE) {
214 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
215 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
216 		if (ret == -1) {
217 			// TODO, error check
218 			item->DetachEvents(events);
219 			kqueue_assert(0);
220 			return false;
221 		}
222 	}
223 	if (events & KQ_EVENT_READ) {
224 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
225 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
226 		if (ret == -1) {
227 			// TODO, error check
228 			item->DetachEvents(events);
229 			kqueue_assert(0);
230 			return false;
231 		}
232 	}
233 
234 	item->SetListenEvents(new_events);
235 
236 	return true;
237 }
238 
239 
240 bool KqueueProxy::KqueueCtrlDel(int fd, int events)
241 {
242 	return KqueueCtrlDelRef(fd, events, false);
243 }
244 
245 bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref)
246 {
247 	KqFdRef* item = KqFdRefGet(fd);
248 	if (item == NULL)
249 	{
250         MT_ATTR_API(320851, 1); // fd error
251         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
252         kqueue_assert(0);
253         return false;
254 
255 	}
256 
257 	item->DetachEvents(events);
258 	int old_events = item->GetListenEvents();
259 	int new_events = old_events &~ events;
260 
261 	if (use_ref) {
262 		new_events = old_events;
263 		if (item->ReadRefCnt() == 0) {
264 			new_events = new_events & ~KQ_EVENT_READ;
265 		}
266 		if (item->WriteRefCnt() == 0) {
267 			new_events = new_events & ~KQ_EVENT_WRITE;
268 		}
269 	}
270 
271 	if (old_events == new_events)
272 	{
273 		return true;
274 	}
275 	KqEvent ke;
276 	int ret;
277 	if (CHK_FD_BIT(fd)) {
278 		fd = CLR_FD_BIT(fd);
279     }
280 	if (old_events & KQ_EVENT_WRITE) {
281 		EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
282 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
283 		if (ret == -1) {
284 			kqueue_assert(0);
285 			return false;
286 		}
287 	}
288 	if (old_events & KQ_EVENT_READ) {
289 		EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
290 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
291 		if (ret == -1) {
292 			kqueue_assert(0);
293 			return false;
294 		}
295 	}
296 
297 	if (new_events & KQ_EVENT_WRITE) {
298 		EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
299 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
300 		if (ret == -1) {
301 			kqueue_assert(0);
302 			return false;
303 		}
304 	}
305 	if (new_events & KQ_EVENT_READ) {
306 		EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
307 		ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
308 		if (ret == -1) {
309 			kqueue_assert(0);
310 			return false;
311 		}
312 	}
313 
314 	item->SetListenEvents(new_events);
315 
316 	return true;
317 }
318 
319 bool KqueueProxy::KqueueAddObj(KqueuerObj* obj)
320 {
321 	if (obj == NULL)
322 	{
323         MTLOG_ERROR("kqobj input invalid, %p", obj);
324         return false;
325 	}
326 
327 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
328 	if (item == NULL)
329 	{
330         MT_ATTR_API(320851, 1); // fd error
331         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
332         kqueue_assert(0);
333         return false;
334 	}
335 
336 	int ret = obj->KqueueCtlAdd(item);
337 	if (ret < 0) {
338         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
339         kqueue_assert(0);
340         return false;
341 	}
342 
343 	return true;
344 }
345 
346 bool KqueueProxy::KqueueDelObj(KqueuerObj* obj)
347 {
348 	if (obj == NULL)
349 	{
350         MTLOG_ERROR("kqobj input invalid, %p", obj);
351         return false;
352 	}
353 	KqFdRef* item = KqFdRefGet(obj->GetOsfd());
354 	if (item == NULL)
355 	{
356         MT_ATTR_API(320851, 1); // fd error
357         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
358         kqueue_assert(0);
359         return false;
360 	}
361 
362 	int ret = obj->KqueueCtlDel(item);
363 	if (ret < 0) {
364         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
365         kqueue_assert(0);
366         return false;
367 	}
368 
369 	return true;
370 }
371 
372 void KqueueProxy::KqueueRcvEventList(int evtfdnum)
373 {
374 	int ret = 0;
375 	int osfd = 0;
376 	int revents = 0;
377 	int tmp_evts = 0;
378 	KqFdRef* item = NULL;
379 	KqueuerObj* obj = NULL;
380 
381 	for (int i = 0; i < evtfdnum; i++)
382 	{
383 		osfd = _evtlist[i].ident |= 1 << FF_FD_BITS;
384 
385 		item = KqFdRefGet(osfd);
386 		if (item == NULL)
387 		{
388             MT_ATTR_API(320851, 1); // fd error
389             MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd);
390             kqueue_assert(0);
391             continue;
392 		}
393 		tmp_evts = _evtlist[i].filter;
394 		if (tmp_evts == EVFILT_READ) {
395 			revents |= KQ_EVENT_READ;
396 		}
397 		if (tmp_evts == EVFILT_WRITE) {
398 			revents |= KQ_EVENT_WRITE;
399 		}
400 		obj = item->GetNotifyObj();
401 		if (obj == NULL)
402 		{
403             MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd);
404             KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE)));
405             continue;
406 		}
407 		obj->SetRcvEvents(revents);
408 
409 		if (tmp_evts == EV_ERROR)
410 		{
411 			obj->HangupNotify();
412 			continue;
413 		}
414 
415 		if (revents & KQ_EVENT_READ)
416 		{
417 			ret = obj->InputNotify();
418 			if (ret != 0)
419 			{
420 				continue;
421 			}
422 		}
423 
424 		if (revents & KQ_EVENT_WRITE)
425 		{
426 			ret = obj->OutputNotify();
427 			if (ret != 0)
428 			{
429 				continue;
430 			}
431 		}
432 	}
433 }
434 
435 void KqueueProxy::KqueueDispatch()
436 {
437 	int nfd;
438 	int wait_time = KqueueGetTimeout();
439 	if (wait_time) {
440 		struct timespec ts;
441 		ts.tv_sec = wait_time / 1000;
442 		ts.tv_nsec = 0;
443 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts);
444 	} else {
445 		nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL);
446 	}
447 	if (nfd <= 0)
448 	{
449 		return;
450 	}
451 
452 	KqueueRcvEventList(nfd);
453 }
454 
455 int KqueuerObj::InputNotify()
456 {
457 	MicroThread* thread = this->GetOwnerThread();
458 	if (thread == NULL)
459 	{
460 		kqueue_assert(0);
461         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
462         return -1;
463 	}
464 
465 	if (thread->HasFlag(MicroThread::IO_LIST))
466 	{
467         MtFrame* frame = MtFrame::Instance();
468         frame->RemoveIoWait(thread);
469         frame->InsertRunable(thread);
470 	}
471 
472 	return 0;
473 }
474 
475 int KqueuerObj::OutputNotify()
476 {
477     MicroThread* thread = this->GetOwnerThread();
478     if (NULL == thread)
479     {
480         kqueue_assert(0);
481         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
482         return -1;
483     }
484 
485     // 多个事件同时到达, 防重复操作
486     if (thread->HasFlag(MicroThread::IO_LIST))
487     {
488         MtFrame* frame = MtFrame::Instance();
489         frame->RemoveIoWait(thread);
490         frame->InsertRunable(thread);
491     }
492 
493     return 0;
494 }
495 
496 int KqueuerObj::HangupNotify()
497 {
498     MtFrame* frame = MtFrame::Instance();
499     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
500     return 0;
501 }
502 
503 int KqueuerObj::KqueueCtlAdd(void* args)
504 {
505     MtFrame* frame = MtFrame::Instance();
506     KqFdRef* fd_ref = (KqFdRef*)args;
507     kqueue_assert(fd_ref != NULL);
508 
509     int osfd = this->GetOsfd();
510     int new_events = this->GetEvents();
511 
512     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
513     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
514     if ((old_obj != NULL) && (old_obj != this))
515     {
516         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
517         return -1;
518     }
519     fd_ref->SetNotifyObj(this);
520 
521     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
522     if (!frame->KqueueCtrlAdd(osfd, new_events))
523     {
524         MTLOG_ERROR("kqfd ref add failed, log");
525         fd_ref->SetNotifyObj(old_obj);
526         return -2;
527     }
528 
529     return 0;
530 }
531 
532 int KqueuerObj::KqueueCtlDel(void* args)
533 {
534     MtFrame* frame = MtFrame::Instance();
535     KqFdRef* fd_ref = (KqFdRef*)args;
536     kqueue_assert(fd_ref != NULL);
537 
538     int osfd = this->GetOsfd();
539     int events = this->GetEvents();
540 
541     // 通知对象需要更新, FD通知对象理论上不会复用, 这里做冲突检查, 异常log记录
542     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
543     if (old_obj != this)
544     {
545         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
546         return -1;
547     }
548     fd_ref->SetNotifyObj(NULL);
549 
550     // 调用框架的epoll ctl接口, 屏蔽epoll ctrl细节
551     if (!frame->KqueueCtrlDelRef(osfd, events, false)) // 引用有风险, 弊大于利, 关闭掉
552     {
553         MTLOG_ERROR("kqfd ref del failed, log");
554         fd_ref->SetNotifyObj(old_obj);
555         return -2;
556     }
557 
558     return 0;
559 
560 }
561 
562