xref: /f-stack/app/micro_thread/kqueue_proxy.cpp (revision 5e0cf829)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 
21 /**
22  *  @filename kqueue_proxy.cpp
23  *  @info     kqueue for micro thread manage
24  */
25 
26 #include "kqueue_proxy.h"
27 #include "micro_thread.h"
28 #include "ff_hook.h"
29 
30 using namespace NS_MICRO_THREAD;
31 
32 KqueueProxy::KqueueProxy()
33 {
34     _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM;
35     _kqfd = -1;
36     _evtlist = NULL;
37     _kqrefs = NULL;
38 }
39 
40 int KqueueProxy::InitKqueue(int max_num)
41 {
42     int rc = 0;
43     if (max_num > _maxfd)
44     {
45         _maxfd = max_num;
46     }
47 
48     _kqfd = ff_kqueue();
49     if (_kqfd < 0)
50     {
51         rc = -1;
52         goto EXIT_LABEL;
53     }
54 
55     ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC);
56 
57     _kqrefs = new KqFdRef[_maxfd];
58     if (_kqrefs == NULL)
59     {
60         rc = -2;
61         goto EXIT_LABEL;
62     }
63 
64     _evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent));
65     if (_evtlist == NULL)
66     {
67         rc = -3;
68         goto EXIT_LABEL;
69     }
70 
71     struct rlimit rlim;
72     memset(&rlim, 0, sizeof(rlim));
73     if (getrlimit(RLIMIT_NOFILE, &rlim) == 0)
74     {
75         if ((int)rlim.rlim_max < _maxfd)
76         {
77             rlim.rlim_cur = rlim.rlim_max;
78             setrlimit(RLIMIT_NOFILE, &rlim);
79             rlim.rlim_cur = _maxfd;
80             rlim.rlim_max = _maxfd;
81             setrlimit(RLIMIT_NOFILE, &rlim);
82         }
83     }
84 
85 EXIT_LABEL:
86 
87     if (rc < 0)
88     {
89         TermKqueue();
90     }
91 
92     return rc;
93 }
94 
95 void KqueueProxy::TermKqueue()
96 {
97     if (_kqfd > 0)
98     {
99         close(_kqfd);
100         _kqfd = -1;
101     }
102 
103     if (_evtlist != NULL)
104     {
105         free(_evtlist);
106         _evtlist = NULL;
107     }
108 
109     if (_kqrefs != NULL)
110     {
111         delete []_kqrefs;
112         _kqrefs = NULL;
113     }
114 }
115 
116 bool KqueueProxy::KqueueAdd(KqObjList& obj_list)
117 {
118     bool ret = true;
119     KqueuerObj *kqobj = NULL;
120     KqueuerObj *kqobj_error = NULL;
121     TAILQ_FOREACH(kqobj, &obj_list, _entry)
122     {
123         if (!KqueueAddObj(kqobj))
124         {
125             MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd());
126             kqueue_assert(0);
127             kqobj_error = kqobj;
128             ret = false;
129             goto EXIT_LABEL;
130         }
131     }
132 
133 EXIT_LABEL:
134 
135     if (!ret)
136     {
137         TAILQ_FOREACH(kqobj, &obj_list, _entry)
138         {
139             if (kqobj == kqobj_error)
140             {
141                 break;
142             }
143             KqueueDelObj(kqobj);
144         }
145     }
146 
147     return ret;
148 }
149 
150 bool KqueueProxy::KqueueDel(KqObjList& obj_list)
151 {
152     bool ret = true;
153 
154     KqueuerObj *kqobj = NULL;
155     TAILQ_FOREACH(kqobj, &obj_list, _entry)
156     {
157         if (!KqueueDelObj(kqobj))  // failed also need continue, be sure ref count ok
158         {
159             MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd());
160             kqueue_assert(0);
161             ret = false;
162         }
163     }
164 
165     return ret;
166 }
167 
168 bool KqueueProxy::KqueueCtrlAdd(int fd, int events)
169 {
170     KqFdRef* item = KqFdRefGet(fd);
171     if (item == NULL)
172     {
173         MT_ATTR_API(320851, 1); // fd error, wtf?
174         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
175         kqueue_assert(0);
176         return false;
177     }
178 
179     item->AttachEvents(events);
180 
181     int old_events = item->GetListenEvents();
182     int new_events = old_events | events;
183     if (old_events == new_events)
184     {
185         return true;
186     }
187 
188     KqEvent ke;
189     int ret;
190     if (old_events & KQ_EVENT_WRITE) {
191         EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
192         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
193         if (ret == -1) {
194             // TODO, error check
195             item->DetachEvents(events);
196             kqueue_assert(0);
197             return false;
198         }
199     }
200     if (old_events & KQ_EVENT_READ) {
201         EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
202         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
203         if (ret == -1) {
204             // TODO, error check
205             item->DetachEvents(events);
206             kqueue_assert(0);
207             return false;
208         }
209     }
210     if (events & KQ_EVENT_WRITE) {
211         EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
212         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
213         if (ret == -1) {
214             // TODO, error check
215             item->DetachEvents(events);
216             kqueue_assert(0);
217             return false;
218         }
219     }
220     if (events & KQ_EVENT_READ) {
221         EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
222         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
223         if (ret == -1) {
224             // TODO, error check
225             item->DetachEvents(events);
226             kqueue_assert(0);
227             return false;
228         }
229     }
230 
231     item->SetListenEvents(new_events);
232 
233     return true;
234 }
235 
236 
237 bool KqueueProxy::KqueueCtrlDel(int fd, int events)
238 {
239     return KqueueCtrlDelRef(fd, events, false);
240 }
241 
242 bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref)
243 {
244     KqFdRef* item = KqFdRefGet(fd);
245     if (item == NULL)
246     {
247         MT_ATTR_API(320851, 1); // fd error
248         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
249         kqueue_assert(0);
250         return false;
251 
252     }
253 
254     item->DetachEvents(events);
255     int old_events = item->GetListenEvents();
256     int new_events = old_events &~ events;
257 
258     if (use_ref) {
259         new_events = old_events;
260         if (item->ReadRefCnt() == 0) {
261             new_events = new_events & ~KQ_EVENT_READ;
262         }
263         if (item->WriteRefCnt() == 0) {
264             new_events = new_events & ~KQ_EVENT_WRITE;
265         }
266     }
267 
268     if (old_events == new_events)
269     {
270         return true;
271     }
272     KqEvent ke;
273     int ret;
274     if (old_events & KQ_EVENT_WRITE) {
275         EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
276         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
277         if (ret == -1) {
278             kqueue_assert(0);
279             return false;
280         }
281     }
282     if (old_events & KQ_EVENT_READ) {
283         EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
284         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
285         if (ret == -1) {
286             kqueue_assert(0);
287             return false;
288         }
289     }
290 
291     if (new_events & KQ_EVENT_WRITE) {
292         EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
293         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
294         if (ret == -1) {
295             kqueue_assert(0);
296             return false;
297         }
298     }
299     if (new_events & KQ_EVENT_READ) {
300         EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
301         ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
302         if (ret == -1) {
303             kqueue_assert(0);
304             return false;
305         }
306     }
307 
308     item->SetListenEvents(new_events);
309 
310     return true;
311 }
312 
313 bool KqueueProxy::KqueueAddObj(KqueuerObj* obj)
314 {
315     if (obj == NULL)
316     {
317         MTLOG_ERROR("kqobj input invalid, %p", obj);
318         return false;
319     }
320 
321     KqFdRef* item = KqFdRefGet(obj->GetOsfd());
322     if (item == NULL)
323     {
324         MT_ATTR_API(320851, 1); // fd error
325         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
326         kqueue_assert(0);
327         return false;
328     }
329 
330     int ret = obj->KqueueCtlAdd(item);
331     if (ret < 0) {
332         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
333         kqueue_assert(0);
334         return false;
335     }
336 
337     return true;
338 }
339 
340 bool KqueueProxy::KqueueDelObj(KqueuerObj* obj)
341 {
342     if (obj == NULL)
343     {
344         MTLOG_ERROR("kqobj input invalid, %p", obj);
345         return false;
346     }
347     KqFdRef* item = KqFdRefGet(obj->GetOsfd());
348     if (item == NULL)
349     {
350         MT_ATTR_API(320851, 1); // fd error
351         MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
352         kqueue_assert(0);
353         return false;
354     }
355 
356     int ret = obj->KqueueCtlDel(item);
357     if (ret < 0) {
358         MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
359         kqueue_assert(0);
360         return false;
361     }
362 
363     return true;
364 }
365 
366 void KqueueProxy::KqueueRcvEventList(int evtfdnum)
367 {
368     int ret = 0;
369     int osfd = 0;
370     int revents = 0;
371     int tmp_evts = 0;
372     KqFdRef* item = NULL;
373     KqueuerObj* obj = NULL;
374 
375     for (int i = 0; i < evtfdnum; i++)
376     {
377         osfd = _evtlist[i].ident;
378 
379         item = KqFdRefGet(osfd);
380         if (item == NULL)
381         {
382             MT_ATTR_API(320851, 1); // fd error
383             MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd);
384             kqueue_assert(0);
385             continue;
386         }
387         tmp_evts = _evtlist[i].filter;
388         if (tmp_evts == EVFILT_READ) {
389             revents |= KQ_EVENT_READ;
390         }
391         if (tmp_evts == EVFILT_WRITE) {
392             revents |= KQ_EVENT_WRITE;
393         }
394         obj = item->GetNotifyObj();
395         if (obj == NULL)
396         {
397             MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd);
398             KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE)));
399             continue;
400         }
401         obj->SetRcvEvents(revents);
402 
403         if (tmp_evts == EV_ERROR)
404         {
405             obj->HangupNotify();
406             continue;
407         }
408 
409         if (revents & KQ_EVENT_READ)
410         {
411             ret = obj->InputNotify();
412             if (ret != 0)
413             {
414                 continue;
415             }
416         }
417 
418         if (revents & KQ_EVENT_WRITE)
419         {
420             ret = obj->OutputNotify();
421             if (ret != 0)
422             {
423                 continue;
424             }
425         }
426     }
427 }
428 
429 void KqueueProxy::KqueueDispatch()
430 {
431     int nfd;
432     int wait_time = KqueueGetTimeout();
433     if (wait_time) {
434         struct timespec ts;
435         ts.tv_sec = wait_time / 1000;
436         ts.tv_nsec = 0;
437         nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts);
438     } else {
439         nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL);
440     }
441     if (nfd <= 0)
442     {
443         return;
444     }
445 
446     KqueueRcvEventList(nfd);
447 }
448 
449 int KqueuerObj::InputNotify()
450 {
451     MicroThread* thread = this->GetOwnerThread();
452     if (thread == NULL)
453     {
454         kqueue_assert(0);
455         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
456         return -1;
457     }
458 
459     if (thread->HasFlag(MicroThread::IO_LIST))
460     {
461         MtFrame* frame = MtFrame::Instance();
462         frame->RemoveIoWait(thread);
463         frame->InsertRunable(thread);
464     }
465 
466     return 0;
467 }
468 
469 int KqueuerObj::OutputNotify()
470 {
471     MicroThread* thread = this->GetOwnerThread();
472     if (NULL == thread)
473     {
474         kqueue_assert(0);
475         MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
476         return -1;
477     }
478 
479     // Multiple events arrive at the same time
480     if (thread->HasFlag(MicroThread::IO_LIST))
481     {
482         MtFrame* frame = MtFrame::Instance();
483         frame->RemoveIoWait(thread);
484         frame->InsertRunable(thread);
485     }
486 
487     return 0;
488 }
489 
490 int KqueuerObj::HangupNotify()
491 {
492     MtFrame* frame = MtFrame::Instance();
493     frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
494     return 0;
495 }
496 
497 int KqueuerObj::KqueueCtlAdd(void* args)
498 {
499     MtFrame* frame = MtFrame::Instance();
500     KqFdRef* fd_ref = (KqFdRef*)args;
501     kqueue_assert(fd_ref != NULL);
502 
503     int osfd = this->GetOsfd();
504     int new_events = this->GetEvents();
505 
506     // Notify object needs updating
507     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
508     if ((old_obj != NULL) && (old_obj != this))
509     {
510         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
511         return -1;
512     }
513     fd_ref->SetNotifyObj(this);
514 
515     if (!frame->KqueueCtrlAdd(osfd, new_events))
516     {
517         MTLOG_ERROR("kqfd ref add failed, log");
518         fd_ref->SetNotifyObj(old_obj);
519         return -2;
520     }
521 
522     return 0;
523 }
524 
525 int KqueuerObj::KqueueCtlDel(void* args)
526 {
527     MtFrame* frame = MtFrame::Instance();
528     KqFdRef* fd_ref = (KqFdRef*)args;
529     kqueue_assert(fd_ref != NULL);
530 
531     int osfd = this->GetOsfd();
532     int events = this->GetEvents();
533 
534     KqueuerObj* old_obj = fd_ref->GetNotifyObj();
535     if (old_obj != this)
536     {
537         MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
538         return -1;
539     }
540     fd_ref->SetNotifyObj(NULL);
541 
542     if (!frame->KqueueCtrlDelRef(osfd, events, false))
543     {
544         MTLOG_ERROR("kqfd ref del failed, log");
545         fd_ref->SetNotifyObj(old_obj);
546         return -2;
547     }
548 
549     return 0;
550 
551 }
552 
553