1
2 /**
3 * Tencent is pleased to support the open source community by making MSEC available.
4 *
5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6 *
7 * Licensed under the GNU General Public License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License. You may
9 * obtain a copy of the License at
10 *
11 * https://opensource.org/licenses/GPL-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software distributed under the
14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15 * either express or implied. See the License for the specific language governing permissions
16 * and limitations under the License.
17 */
18
19
20
/**
 * @filename kqueue_proxy.cpp
 * @info     kqueue event proxy used to manage micro-threads
 */
25
#include "kqueue_proxy.h"

#include <new>

#include "micro_thread.h"
#include "ff_hook.h"
29
30 using namespace NS_MICRO_THREAD;
31
KqueueProxy()32 KqueueProxy::KqueueProxy()
33 {
34 _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM;
35 _kqfd = -1;
36 _evtlist = NULL;
37 _kqrefs = NULL;
38 }
39
InitKqueue(int max_num)40 int KqueueProxy::InitKqueue(int max_num)
41 {
42 int rc = 0;
43 if (max_num > _maxfd)
44 {
45 _maxfd = max_num;
46 }
47
48 _kqfd = ff_kqueue();
49 if (_kqfd < 0)
50 {
51 rc = -1;
52 goto EXIT_LABEL;
53 }
54
55 ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC);
56
57 _kqrefs = new KqFdRef[_maxfd];
58 if (_kqrefs == NULL)
59 {
60 rc = -2;
61 goto EXIT_LABEL;
62 }
63
64 _evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent));
65 if (_evtlist == NULL)
66 {
67 rc = -3;
68 goto EXIT_LABEL;
69 }
70
71 struct rlimit rlim;
72 memset(&rlim, 0, sizeof(rlim));
73 if (getrlimit(RLIMIT_NOFILE, &rlim) == 0)
74 {
75 if ((int)rlim.rlim_max < _maxfd)
76 {
77 rlim.rlim_cur = rlim.rlim_max;
78 setrlimit(RLIMIT_NOFILE, &rlim);
79 rlim.rlim_cur = _maxfd;
80 rlim.rlim_max = _maxfd;
81 setrlimit(RLIMIT_NOFILE, &rlim);
82 }
83 }
84
85 EXIT_LABEL:
86
87 if (rc < 0)
88 {
89 TermKqueue();
90 }
91
92 return rc;
93 }
94
TermKqueue()95 void KqueueProxy::TermKqueue()
96 {
97 if (_kqfd > 0)
98 {
99 close(_kqfd);
100 _kqfd = -1;
101 }
102
103 if (_evtlist != NULL)
104 {
105 free(_evtlist);
106 _evtlist = NULL;
107 }
108
109 if (_kqrefs != NULL)
110 {
111 delete []_kqrefs;
112 _kqrefs = NULL;
113 }
114 }
115
KqueueAdd(KqObjList & obj_list)116 bool KqueueProxy::KqueueAdd(KqObjList& obj_list)
117 {
118 bool ret = true;
119 KqueuerObj *kqobj = NULL;
120 KqueuerObj *kqobj_error = NULL;
121 TAILQ_FOREACH(kqobj, &obj_list, _entry)
122 {
123 if (!KqueueAddObj(kqobj))
124 {
125 MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd());
126 kqueue_assert(0);
127 kqobj_error = kqobj;
128 ret = false;
129 goto EXIT_LABEL;
130 }
131 }
132
133 EXIT_LABEL:
134
135 if (!ret)
136 {
137 TAILQ_FOREACH(kqobj, &obj_list, _entry)
138 {
139 if (kqobj == kqobj_error)
140 {
141 break;
142 }
143 KqueueDelObj(kqobj);
144 }
145 }
146
147 return ret;
148 }
149
KqueueDel(KqObjList & obj_list)150 bool KqueueProxy::KqueueDel(KqObjList& obj_list)
151 {
152 bool ret = true;
153
154 KqueuerObj *kqobj = NULL;
155 TAILQ_FOREACH(kqobj, &obj_list, _entry)
156 {
157 if (!KqueueDelObj(kqobj)) // failed also need continue, be sure ref count ok
158 {
159 MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd());
160 kqueue_assert(0);
161 ret = false;
162 }
163 }
164
165 return ret;
166 }
167
KqueueCtrlAdd(int fd,int events)168 bool KqueueProxy::KqueueCtrlAdd(int fd, int events)
169 {
170 KqFdRef* item = KqFdRefGet(fd);
171 if (item == NULL)
172 {
173 MT_ATTR_API(320851, 1); // fd error, wtf?
174 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
175 kqueue_assert(0);
176 return false;
177 }
178
179 item->AttachEvents(events);
180
181 int old_events = item->GetListenEvents();
182 int new_events = old_events | events;
183 if (old_events == new_events)
184 {
185 return true;
186 }
187
188 KqEvent ke;
189 int ret;
190 if (old_events & KQ_EVENT_WRITE) {
191 EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
192 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
193 if (ret == -1) {
194 // TODO, error check
195 item->DetachEvents(events);
196 kqueue_assert(0);
197 return false;
198 }
199 }
200 if (old_events & KQ_EVENT_READ) {
201 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
202 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
203 if (ret == -1) {
204 // TODO, error check
205 item->DetachEvents(events);
206 kqueue_assert(0);
207 return false;
208 }
209 }
210 if (events & KQ_EVENT_WRITE) {
211 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
212 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
213 if (ret == -1) {
214 // TODO, error check
215 item->DetachEvents(events);
216 kqueue_assert(0);
217 return false;
218 }
219 }
220 if (events & KQ_EVENT_READ) {
221 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
222 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
223 if (ret == -1) {
224 // TODO, error check
225 item->DetachEvents(events);
226 kqueue_assert(0);
227 return false;
228 }
229 }
230
231 item->SetListenEvents(new_events);
232
233 return true;
234 }
235
236
KqueueCtrlDel(int fd,int events)237 bool KqueueProxy::KqueueCtrlDel(int fd, int events)
238 {
239 return KqueueCtrlDelRef(fd, events, false);
240 }
241
KqueueCtrlDelRef(int fd,int events,bool use_ref)242 bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref)
243 {
244 KqFdRef* item = KqFdRefGet(fd);
245 if (item == NULL)
246 {
247 MT_ATTR_API(320851, 1); // fd error
248 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd);
249 kqueue_assert(0);
250 return false;
251
252 }
253
254 item->DetachEvents(events);
255 int old_events = item->GetListenEvents();
256 int new_events = old_events &~ events;
257
258 if (use_ref) {
259 new_events = old_events;
260 if (item->ReadRefCnt() == 0) {
261 new_events = new_events & ~KQ_EVENT_READ;
262 }
263 if (item->WriteRefCnt() == 0) {
264 new_events = new_events & ~KQ_EVENT_WRITE;
265 }
266 }
267
268 if (old_events == new_events)
269 {
270 return true;
271 }
272 KqEvent ke;
273 int ret;
274 if (old_events & KQ_EVENT_WRITE) {
275 EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
276 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
277 if (ret == -1) {
278 kqueue_assert(0);
279 return false;
280 }
281 }
282 if (old_events & KQ_EVENT_READ) {
283 EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
284 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
285 if (ret == -1) {
286 kqueue_assert(0);
287 return false;
288 }
289 }
290
291 if (new_events & KQ_EVENT_WRITE) {
292 EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
293 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
294 if (ret == -1) {
295 kqueue_assert(0);
296 return false;
297 }
298 }
299 if (new_events & KQ_EVENT_READ) {
300 EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
301 ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL);
302 if (ret == -1) {
303 kqueue_assert(0);
304 return false;
305 }
306 }
307
308 item->SetListenEvents(new_events);
309
310 return true;
311 }
312
KqueueAddObj(KqueuerObj * obj)313 bool KqueueProxy::KqueueAddObj(KqueuerObj* obj)
314 {
315 if (obj == NULL)
316 {
317 MTLOG_ERROR("kqobj input invalid, %p", obj);
318 return false;
319 }
320
321 KqFdRef* item = KqFdRefGet(obj->GetOsfd());
322 if (item == NULL)
323 {
324 MT_ATTR_API(320851, 1); // fd error
325 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
326 kqueue_assert(0);
327 return false;
328 }
329
330 int ret = obj->KqueueCtlAdd(item);
331 if (ret < 0) {
332 MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
333 kqueue_assert(0);
334 return false;
335 }
336
337 return true;
338 }
339
KqueueDelObj(KqueuerObj * obj)340 bool KqueueProxy::KqueueDelObj(KqueuerObj* obj)
341 {
342 if (obj == NULL)
343 {
344 MTLOG_ERROR("kqobj input invalid, %p", obj);
345 return false;
346 }
347 KqFdRef* item = KqFdRefGet(obj->GetOsfd());
348 if (item == NULL)
349 {
350 MT_ATTR_API(320851, 1); // fd error
351 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd());
352 kqueue_assert(0);
353 return false;
354 }
355
356 int ret = obj->KqueueCtlDel(item);
357 if (ret < 0) {
358 MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj);
359 kqueue_assert(0);
360 return false;
361 }
362
363 return true;
364 }
365
KqueueRcvEventList(int evtfdnum)366 void KqueueProxy::KqueueRcvEventList(int evtfdnum)
367 {
368 int ret = 0;
369 int osfd = 0;
370 int revents = 0;
371 int tmp_evts = 0;
372 KqFdRef* item = NULL;
373 KqueuerObj* obj = NULL;
374
375 for (int i = 0; i < evtfdnum; i++)
376 {
377 osfd = _evtlist[i].ident;
378
379 item = KqFdRefGet(osfd);
380 if (item == NULL)
381 {
382 MT_ATTR_API(320851, 1); // fd error
383 MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd);
384 kqueue_assert(0);
385 continue;
386 }
387 tmp_evts = _evtlist[i].filter;
388 if (tmp_evts == EVFILT_READ) {
389 revents |= KQ_EVENT_READ;
390 }
391 if (tmp_evts == EVFILT_WRITE) {
392 revents |= KQ_EVENT_WRITE;
393 }
394 obj = item->GetNotifyObj();
395 if (obj == NULL)
396 {
397 MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd);
398 KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE)));
399 continue;
400 }
401 obj->SetRcvEvents(revents);
402
403 if (tmp_evts == EV_ERROR)
404 {
405 obj->HangupNotify();
406 continue;
407 }
408
409 if (revents & KQ_EVENT_READ)
410 {
411 ret = obj->InputNotify();
412 if (ret != 0)
413 {
414 continue;
415 }
416 }
417
418 if (revents & KQ_EVENT_WRITE)
419 {
420 ret = obj->OutputNotify();
421 if (ret != 0)
422 {
423 continue;
424 }
425 }
426 }
427 }
428
KqueueDispatch()429 void KqueueProxy::KqueueDispatch()
430 {
431 int nfd;
432 int wait_time = KqueueGetTimeout();
433 if (wait_time) {
434 struct timespec ts;
435 ts.tv_sec = wait_time / 1000;
436 ts.tv_nsec = 0;
437 nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts);
438 } else {
439 nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL);
440 }
441 if (nfd <= 0)
442 {
443 return;
444 }
445
446 KqueueRcvEventList(nfd);
447 }
448
InputNotify()449 int KqueuerObj::InputNotify()
450 {
451 MicroThread* thread = this->GetOwnerThread();
452 if (thread == NULL)
453 {
454 kqueue_assert(0);
455 MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
456 return -1;
457 }
458
459 if (thread->HasFlag(MicroThread::IO_LIST))
460 {
461 MtFrame* frame = MtFrame::Instance();
462 frame->RemoveIoWait(thread);
463 frame->InsertRunable(thread);
464 }
465
466 return 0;
467 }
468
OutputNotify()469 int KqueuerObj::OutputNotify()
470 {
471 MicroThread* thread = this->GetOwnerThread();
472 if (NULL == thread)
473 {
474 kqueue_assert(0);
475 MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong");
476 return -1;
477 }
478
479 // Multiple events arrive at the same time
480 if (thread->HasFlag(MicroThread::IO_LIST))
481 {
482 MtFrame* frame = MtFrame::Instance();
483 frame->RemoveIoWait(thread);
484 frame->InsertRunable(thread);
485 }
486
487 return 0;
488 }
489
HangupNotify()490 int KqueuerObj::HangupNotify()
491 {
492 MtFrame* frame = MtFrame::Instance();
493 frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents());
494 return 0;
495 }
496
KqueueCtlAdd(void * args)497 int KqueuerObj::KqueueCtlAdd(void* args)
498 {
499 MtFrame* frame = MtFrame::Instance();
500 KqFdRef* fd_ref = (KqFdRef*)args;
501 kqueue_assert(fd_ref != NULL);
502
503 int osfd = this->GetOsfd();
504 int new_events = this->GetEvents();
505
506 // Notify object needs updating
507 KqueuerObj* old_obj = fd_ref->GetNotifyObj();
508 if ((old_obj != NULL) && (old_obj != this))
509 {
510 MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
511 return -1;
512 }
513 fd_ref->SetNotifyObj(this);
514
515 if (!frame->KqueueCtrlAdd(osfd, new_events))
516 {
517 MTLOG_ERROR("kqfd ref add failed, log");
518 fd_ref->SetNotifyObj(old_obj);
519 return -2;
520 }
521
522 return 0;
523 }
524
KqueueCtlDel(void * args)525 int KqueuerObj::KqueueCtlDel(void* args)
526 {
527 MtFrame* frame = MtFrame::Instance();
528 KqFdRef* fd_ref = (KqFdRef*)args;
529 kqueue_assert(fd_ref != NULL);
530
531 int osfd = this->GetOsfd();
532 int events = this->GetEvents();
533
534 KqueuerObj* old_obj = fd_ref->GetNotifyObj();
535 if (old_obj != this)
536 {
537 MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this);
538 return -1;
539 }
540 fd_ref->SetNotifyObj(NULL);
541
542 if (!frame->KqueueCtrlDelRef(osfd, events, false))
543 {
544 MTLOG_ERROR("kqfd ref del failed, log");
545 fd_ref->SetNotifyObj(old_obj);
546 return -2;
547 }
548
549 return 0;
550
551 }
552
553