1 /*
2 * work_thread.c - threads implementation for blocking worker child.
3 */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORK_THREAD
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25
26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
27 #define CHILD_GONE_RESP CHILD_EXIT_REQ
28 /* Queue size increments:
29 * The request queue grows a bit faster than the response queue -- the
30 * daemon can push requests and pull results faster on avarage than the
31 * worker can process requests and push results... If this really pays
32 * off is debatable.
33 */
34 #define WORKITEMS_ALLOC_INC 16
35 #define RESPONSES_ALLOC_INC 4
36
37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38 * set the maximum to 256kB. If the minimum goes below the
39 * system-defined minimum stack size, we have to adjust accordingly.
40 */
41 #ifndef THREAD_MINSTACKSIZE
42 # define THREAD_MINSTACKSIZE (64U * 1024)
43 #endif
44 #ifndef __sun
45 #if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
46 # undef THREAD_MINSTACKSIZE
47 # define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
48 #endif
49 #endif
50
51 #ifndef THREAD_MAXSTACKSIZE
52 # define THREAD_MAXSTACKSIZE (256U * 1024)
53 #endif
54 #if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
55 # undef THREAD_MAXSTACKSIZE
56 # define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
57 #endif
58
59 /* need a good integer to store a pointer... */
60 #ifndef UINTPTR_T
61 # if defined(UINTPTR_MAX)
62 # define UINTPTR_T uintptr_t
63 # elif defined(UINT_PTR)
64 # define UINTPTR_T UINT_PTR
65 # else
66 # define UINTPTR_T size_t
67 # endif
68 #endif
69
70
71 #ifdef SYS_WINNT
72
73 # define thread_exit(c) _endthreadex(c)
74 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
75 u_int WINAPI blocking_thread(void *);
76 static BOOL same_os_sema(const sem_ref obj, void * osobj);
77
78 #else
79
80 # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
81 # define tickle_sem sem_post
82 void * blocking_thread(void *);
83 static void block_thread_signals(sigset_t *);
84
85 #endif
86
87 #ifdef WORK_PIPE
88 addremove_io_fd_func addremove_io_fd;
89 #else
90 addremove_io_semaphore_func addremove_io_semaphore;
91 #endif
92
93 static void start_blocking_thread(blocking_child *);
94 static void start_blocking_thread_internal(blocking_child *);
95 static void prepare_child_sems(blocking_child *);
96 static int wait_for_sem(sem_ref, struct timespec *);
97 static int ensure_workitems_empty_slot(blocking_child *);
98 static int ensure_workresp_empty_slot(blocking_child *);
99 static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
100 static void cleanup_after_child(blocking_child *);
101
102 static sema_type worker_mmutex;
103 static sem_ref worker_memlock;
104
105 /* --------------------------------------------------------------------
106 * locking the global worker state table (and other global stuff)
107 */
108 void
worker_global_lock(int inOrOut)109 worker_global_lock(
110 int inOrOut)
111 {
112 if (worker_memlock) {
113 if (inOrOut)
114 wait_for_sem(worker_memlock, NULL);
115 else
116 tickle_sem(worker_memlock);
117 }
118 }
119
120 /* --------------------------------------------------------------------
121 * implementation isolation wrapper
122 */
123 void
exit_worker(int exitcode)124 exit_worker(
125 int exitcode
126 )
127 {
128 thread_exit(exitcode); /* see #define thread_exit */
129 }
130
131 /* --------------------------------------------------------------------
132 * sleep for a given time or until the wakup semaphore is tickled.
133 */
134 int
worker_sleep(blocking_child * c,time_t seconds)135 worker_sleep(
136 blocking_child * c,
137 time_t seconds
138 )
139 {
140 struct timespec until;
141 int rc;
142
143 # ifdef HAVE_CLOCK_GETTIME
144 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
145 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
146 return -1;
147 }
148 # else
149 if (0 != getclock(TIMEOFDAY, &until)) {
150 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
151 return -1;
152 }
153 # endif
154 until.tv_sec += seconds;
155 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
156 if (0 == rc)
157 return -1;
158 if (-1 == rc && ETIMEDOUT == errno)
159 return 0;
160 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
161 return -1;
162 }
163
164
165 /* --------------------------------------------------------------------
166 * Wake up a worker that takes a nap.
167 */
168 void
interrupt_worker_sleep(void)169 interrupt_worker_sleep(void)
170 {
171 u_int idx;
172 blocking_child * c;
173
174 for (idx = 0; idx < blocking_children_alloc; idx++) {
175 c = blocking_children[idx];
176 if (NULL == c || NULL == c->wake_scheduled_sleep)
177 continue;
178 tickle_sem(c->wake_scheduled_sleep);
179 }
180 }
181
182 /* --------------------------------------------------------------------
183 * Make sure there is an empty slot at the head of the request
184 * queue. Tell if the queue is currently empty.
185 */
186 static int
ensure_workitems_empty_slot(blocking_child * c)187 ensure_workitems_empty_slot(
188 blocking_child *c
189 )
190 {
191 /*
192 ** !!! PRECONDITION: caller holds access lock!
193 **
194 ** This simply tries to increase the size of the buffer if it
195 ** becomes full. The resize operation does *not* maintain the
196 ** order of requests, but that should be irrelevant since the
197 ** processing is considered asynchronous anyway.
198 **
199 ** Return if the buffer is currently empty.
200 */
201
202 static const size_t each =
203 sizeof(blocking_children[0]->workitems[0]);
204
205 size_t new_alloc;
206 size_t slots_used;
207 size_t sidx;
208
209 slots_used = c->head_workitem - c->tail_workitem;
210 if (slots_used >= c->workitems_alloc) {
211 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
212 c->workitems = erealloc(c->workitems, new_alloc * each);
213 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
214 c->workitems[sidx] = NULL;
215 c->tail_workitem = 0;
216 c->head_workitem = c->workitems_alloc;
217 c->workitems_alloc = new_alloc;
218 }
219 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
220 return (0 == slots_used);
221 }
222
223 /* --------------------------------------------------------------------
224 * Make sure there is an empty slot at the head of the response
225 * queue. Tell if the queue is currently empty.
226 */
227 static int
ensure_workresp_empty_slot(blocking_child * c)228 ensure_workresp_empty_slot(
229 blocking_child *c
230 )
231 {
232 /*
233 ** !!! PRECONDITION: caller holds access lock!
234 **
235 ** Works like the companion function above.
236 */
237
238 static const size_t each =
239 sizeof(blocking_children[0]->responses[0]);
240
241 size_t new_alloc;
242 size_t slots_used;
243 size_t sidx;
244
245 slots_used = c->head_response - c->tail_response;
246 if (slots_used >= c->responses_alloc) {
247 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
248 c->responses = erealloc(c->responses, new_alloc * each);
249 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
250 c->responses[sidx] = NULL;
251 c->tail_response = 0;
252 c->head_response = c->responses_alloc;
253 c->responses_alloc = new_alloc;
254 }
255 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
256 return (0 == slots_used);
257 }
258
259
260 /* --------------------------------------------------------------------
261 * queue_req_pointer() - append a work item or idle exit request to
262 * blocking_workitems[]. Employ proper locking.
263 */
264 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)265 queue_req_pointer(
266 blocking_child * c,
267 blocking_pipe_header * hdr
268 )
269 {
270 size_t qhead;
271
272 /* >>>> ACCESS LOCKING STARTS >>>> */
273 wait_for_sem(c->accesslock, NULL);
274 ensure_workitems_empty_slot(c);
275 qhead = c->head_workitem;
276 c->workitems[qhead % c->workitems_alloc] = hdr;
277 c->head_workitem = 1 + qhead;
278 tickle_sem(c->accesslock);
279 /* <<<< ACCESS LOCKING ENDS <<<< */
280
281 /* queue consumer wake-up notification */
282 tickle_sem(c->workitems_pending);
283
284 return 0;
285 }
286
287 /* --------------------------------------------------------------------
288 * API function to make sure a worker is running, a proper private copy
289 * of the data is made, the data eneterd into the queue and the worker
290 * is signalled.
291 */
292 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)293 send_blocking_req_internal(
294 blocking_child * c,
295 blocking_pipe_header * hdr,
296 void * data
297 )
298 {
299 blocking_pipe_header * threadcopy;
300 size_t payload_octets;
301
302 REQUIRE(hdr != NULL);
303 REQUIRE(data != NULL);
304 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
305
306 if (hdr->octets <= sizeof(*hdr))
307 return 1; /* failure */
308 payload_octets = hdr->octets - sizeof(*hdr);
309
310 if (NULL == c->thread_ref)
311 start_blocking_thread(c);
312 threadcopy = emalloc(hdr->octets);
313 memcpy(threadcopy, hdr, sizeof(*hdr));
314 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
315
316 return queue_req_pointer(c, threadcopy);
317 }
318
319 /* --------------------------------------------------------------------
320 * Wait for the 'incoming queue no longer empty' signal, lock the shared
321 * structure and dequeue an item.
322 */
323 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)324 receive_blocking_req_internal(
325 blocking_child * c
326 )
327 {
328 blocking_pipe_header * req;
329 size_t qhead, qtail;
330
331 req = NULL;
332 do {
333 /* wait for tickle from the producer side */
334 wait_for_sem(c->workitems_pending, NULL);
335
336 /* >>>> ACCESS LOCKING STARTS >>>> */
337 wait_for_sem(c->accesslock, NULL);
338 qhead = c->head_workitem;
339 do {
340 qtail = c->tail_workitem;
341 if (qhead == qtail)
342 break;
343 c->tail_workitem = qtail + 1;
344 qtail %= c->workitems_alloc;
345 req = c->workitems[qtail];
346 c->workitems[qtail] = NULL;
347 } while (NULL == req);
348 tickle_sem(c->accesslock);
349 /* <<<< ACCESS LOCKING ENDS <<<< */
350
351 } while (NULL == req);
352
353 INSIST(NULL != req);
354 if (CHILD_EXIT_REQ == req) { /* idled out */
355 send_blocking_resp_internal(c, CHILD_GONE_RESP);
356 req = NULL;
357 }
358
359 return req;
360 }
361
362 /* --------------------------------------------------------------------
363 * Push a response into the return queue and eventually tickle the
364 * receiver.
365 */
366 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)367 send_blocking_resp_internal(
368 blocking_child * c,
369 blocking_pipe_header * resp
370 )
371 {
372 size_t qhead;
373 int empty;
374
375 /* >>>> ACCESS LOCKING STARTS >>>> */
376 wait_for_sem(c->accesslock, NULL);
377 empty = ensure_workresp_empty_slot(c);
378 qhead = c->head_response;
379 c->responses[qhead % c->responses_alloc] = resp;
380 c->head_response = 1 + qhead;
381 tickle_sem(c->accesslock);
382 /* <<<< ACCESS LOCKING ENDS <<<< */
383
384 /* queue consumer wake-up notification */
385 if (empty)
386 {
387 # ifdef WORK_PIPE
388 if (1 != write(c->resp_write_pipe, "", 1))
389 msyslog(LOG_WARNING, "async resolver: %s",
390 "failed to notify main thread!");
391 # else
392 tickle_sem(c->responses_pending);
393 # endif
394 }
395 return 0;
396 }
397
398
399 #ifndef WORK_PIPE
400
401 /* --------------------------------------------------------------------
402 * Check if a (Windows-)hanndle to a semaphore is actually the same we
403 * are using inside the sema wrapper.
404 */
405 static BOOL
same_os_sema(const sem_ref obj,void * osh)406 same_os_sema(
407 const sem_ref obj,
408 void* osh
409 )
410 {
411 return obj && osh && (obj->shnd == (HANDLE)osh);
412 }
413
414 /* --------------------------------------------------------------------
415 * Find the shared context that associates to an OS handle and make sure
416 * the data is dequeued and processed.
417 */
418 void
handle_blocking_resp_sem(void * context)419 handle_blocking_resp_sem(
420 void * context
421 )
422 {
423 blocking_child * c;
424 u_int idx;
425
426 c = NULL;
427 for (idx = 0; idx < blocking_children_alloc; idx++) {
428 c = blocking_children[idx];
429 if (c != NULL &&
430 c->thread_ref != NULL &&
431 same_os_sema(c->responses_pending, context))
432 break;
433 }
434 if (idx < blocking_children_alloc)
435 process_blocking_resp(c);
436 }
437 #endif /* !WORK_PIPE */
438
439 /* --------------------------------------------------------------------
440 * Fetch the next response from the return queue. In case of signalling
441 * via pipe, make sure the pipe is flushed, too.
442 */
443 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)444 receive_blocking_resp_internal(
445 blocking_child * c
446 )
447 {
448 blocking_pipe_header * removed;
449 size_t qhead, qtail, slot;
450
451 #ifdef WORK_PIPE
452 int rc;
453 char scratch[32];
454
455 do
456 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
457 while (-1 == rc && EINTR == errno);
458 #endif
459
460 /* >>>> ACCESS LOCKING STARTS >>>> */
461 wait_for_sem(c->accesslock, NULL);
462 qhead = c->head_response;
463 qtail = c->tail_response;
464 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
465 slot = qtail % c->responses_alloc;
466 removed = c->responses[slot];
467 c->responses[slot] = NULL;
468 }
469 c->tail_response = qtail;
470 tickle_sem(c->accesslock);
471 /* <<<< ACCESS LOCKING ENDS <<<< */
472
473 if (NULL != removed) {
474 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
475 BLOCKING_RESP_MAGIC == removed->magic_sig);
476 }
477 if (CHILD_GONE_RESP == removed) {
478 cleanup_after_child(c);
479 removed = NULL;
480 }
481
482 return removed;
483 }
484
485 /* --------------------------------------------------------------------
486 * Light up a new worker.
487 */
488 static void
start_blocking_thread(blocking_child * c)489 start_blocking_thread(
490 blocking_child * c
491 )
492 {
493
494 DEBUG_INSIST(!c->reusable);
495
496 prepare_child_sems(c);
497 start_blocking_thread_internal(c);
498 }
499
500 /* --------------------------------------------------------------------
501 * Create a worker thread. There are several differences between POSIX
502 * and Windows, of course -- most notably the Windows thread is no
503 * detached thread, and we keep the handle around until we want to get
504 * rid of the thread. The notification scheme also differs: Windows
505 * makes use of semaphores in both directions, POSIX uses a pipe for
506 * integration with 'select()' or alike.
507 */
508 static void
start_blocking_thread_internal(blocking_child * c)509 start_blocking_thread_internal(
510 blocking_child * c
511 )
512 #ifdef SYS_WINNT
513 {
514 BOOL resumed;
515
516 c->thread_ref = NULL;
517 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
518 c->thr_table[0].thnd =
519 (HANDLE)_beginthreadex(
520 NULL,
521 0,
522 &blocking_thread,
523 c,
524 CREATE_SUSPENDED,
525 NULL);
526
527 if (NULL == c->thr_table[0].thnd) {
528 msyslog(LOG_ERR, "start blocking thread failed: %m");
529 exit(-1);
530 }
531 /* remember the thread priority is only within the process class */
532 if (!SetThreadPriority(c->thr_table[0].thnd,
533 THREAD_PRIORITY_BELOW_NORMAL))
534 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
535
536 resumed = ResumeThread(c->thr_table[0].thnd);
537 DEBUG_INSIST(resumed);
538 c->thread_ref = &c->thr_table[0];
539 }
540 #else /* pthreads start_blocking_thread_internal() follows */
541 {
542 # ifdef NEED_PTHREAD_INIT
543 static int pthread_init_called;
544 # endif
545 pthread_attr_t thr_attr;
546 int rc;
547 int pipe_ends[2]; /* read then write */
548 int is_pipe;
549 int flags;
550 size_t ostacksize;
551 size_t nstacksize;
552 sigset_t saved_sig_mask;
553
554 c->thread_ref = NULL;
555
556 # ifdef NEED_PTHREAD_INIT
557 /*
558 * from lib/isc/unix/app.c:
559 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
560 */
561 if (!pthread_init_called) {
562 pthread_init();
563 pthread_init_called = TRUE;
564 }
565 # endif
566
567 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
568 if (0 != rc) {
569 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
570 exit(1);
571 }
572 c->resp_read_pipe = move_fd(pipe_ends[0]);
573 c->resp_write_pipe = move_fd(pipe_ends[1]);
574 c->ispipe = is_pipe;
575 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
576 if (-1 == flags) {
577 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
578 exit(1);
579 }
580 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
581 if (-1 == rc) {
582 msyslog(LOG_ERR,
583 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
584 exit(1);
585 }
586 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
587 pthread_attr_init(&thr_attr);
588 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
589 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
590 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
591 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
592 if (0 != rc) {
593 msyslog(LOG_ERR,
594 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
595 strerror(rc));
596 } else {
597 if (ostacksize < THREAD_MINSTACKSIZE)
598 nstacksize = THREAD_MINSTACKSIZE;
599 else if (ostacksize > THREAD_MAXSTACKSIZE)
600 nstacksize = THREAD_MAXSTACKSIZE;
601 else
602 nstacksize = ostacksize;
603 if (nstacksize != ostacksize)
604 rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
605 if (0 != rc)
606 msyslog(LOG_ERR,
607 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
608 (u_long)ostacksize, (u_long)nstacksize,
609 strerror(rc));
610 }
611 #else
612 UNUSED_ARG(nstacksize);
613 UNUSED_ARG(ostacksize);
614 #endif
615 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
616 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
617 #endif
618 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
619 block_thread_signals(&saved_sig_mask);
620 rc = pthread_create(&c->thr_table[0], &thr_attr,
621 &blocking_thread, c);
622 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
623 pthread_attr_destroy(&thr_attr);
624 if (0 != rc) {
625 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
626 strerror(rc));
627 exit(1);
628 }
629 c->thread_ref = &c->thr_table[0];
630 }
631 #endif
632
633 /* --------------------------------------------------------------------
634 * block_thread_signals()
635 *
636 * Temporarily block signals used by ntpd main thread, so that signal
637 * mask inherited by child threads leaves them blocked. Returns prior
638 * active signal mask via pmask, to be restored by the main thread
639 * after pthread_create().
640 */
641 #ifndef SYS_WINNT
642 void
block_thread_signals(sigset_t * pmask)643 block_thread_signals(
644 sigset_t * pmask
645 )
646 {
647 sigset_t block;
648
649 sigemptyset(&block);
650 # ifdef HAVE_SIGNALED_IO
651 # ifdef SIGIO
652 sigaddset(&block, SIGIO);
653 # endif
654 # ifdef SIGPOLL
655 sigaddset(&block, SIGPOLL);
656 # endif
657 # endif /* HAVE_SIGNALED_IO */
658 sigaddset(&block, SIGALRM);
659 sigaddset(&block, MOREDEBUGSIG);
660 sigaddset(&block, LESSDEBUGSIG);
661 # ifdef SIGDIE1
662 sigaddset(&block, SIGDIE1);
663 # endif
664 # ifdef SIGDIE2
665 sigaddset(&block, SIGDIE2);
666 # endif
667 # ifdef SIGDIE3
668 sigaddset(&block, SIGDIE3);
669 # endif
670 # ifdef SIGDIE4
671 sigaddset(&block, SIGDIE4);
672 # endif
673 # ifdef SIGBUS
674 sigaddset(&block, SIGBUS);
675 # endif
676 sigemptyset(pmask);
677 pthread_sigmask(SIG_BLOCK, &block, pmask);
678 }
679 #endif /* !SYS_WINNT */
680
681
682 /* --------------------------------------------------------------------
683 * Create & destroy semaphores. This is sufficiently different between
684 * POSIX and Windows to warrant wrapper functions and close enough to
685 * use the concept of synchronization via semaphore for all platforms.
686 */
687 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)688 create_sema(
689 sema_type* semptr,
690 u_int inival,
691 u_int maxval)
692 {
693 #ifdef SYS_WINNT
694
695 long svini, svmax;
696 if (NULL != semptr) {
697 svini = (inival < LONG_MAX)
698 ? (long)inival : LONG_MAX;
699 svmax = (maxval < LONG_MAX && maxval > 0)
700 ? (long)maxval : LONG_MAX;
701 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
702 if (NULL == semptr->shnd)
703 semptr = NULL;
704 }
705
706 #else
707
708 (void)maxval;
709 if (semptr && sem_init(semptr, FALSE, inival))
710 semptr = NULL;
711
712 #endif
713
714 return semptr;
715 }
716
717 /* ------------------------------------------------------------------ */
718 static sem_ref
delete_sema(sem_ref obj)719 delete_sema(
720 sem_ref obj)
721 {
722
723 # ifdef SYS_WINNT
724
725 if (obj) {
726 if (obj->shnd)
727 CloseHandle(obj->shnd);
728 obj->shnd = NULL;
729 }
730
731 # else
732
733 if (obj)
734 sem_destroy(obj);
735
736 # endif
737
738 return NULL;
739 }
740
741 /* --------------------------------------------------------------------
742 * prepare_child_sems()
743 *
744 * create sync & access semaphores
745 *
746 * All semaphores are cleared, only the access semaphore has 1 unit.
747 * Childs wait on 'workitems_pending', then grabs 'sema_access'
748 * and dequeues jobs. When done, 'sema_access' is given one unit back.
749 *
750 * The producer grabs 'sema_access', manages the queue, restores
751 * 'sema_access' and puts one unit into 'workitems_pending'.
752 *
753 * The story goes the same for the response queue.
754 */
755 static void
prepare_child_sems(blocking_child * c)756 prepare_child_sems(
757 blocking_child *c
758 )
759 {
760 if (NULL == worker_memlock)
761 worker_memlock = create_sema(&worker_mmutex, 1, 1);
762
763 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
764 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
765 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
766 # ifndef WORK_PIPE
767 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
768 # endif
769 }
770
771 /* --------------------------------------------------------------------
772 * wait for semaphore. Where the wait can be interrupted, it will
773 * internally resume -- When this function returns, there is either no
774 * semaphore at all, a timeout occurred, or the caller could
775 * successfully take a token from the semaphore.
776 *
777 * For untimed wait, not checking the result of this function at all is
778 * definitely an option.
779 */
780 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)781 wait_for_sem(
782 sem_ref sem,
783 struct timespec * timeout /* wall-clock */
784 )
785 #ifdef SYS_WINNT
786 {
787 struct timespec now;
788 struct timespec delta;
789 DWORD msec;
790 DWORD rc;
791
792 if (!(sem && sem->shnd)) {
793 errno = EINVAL;
794 return -1;
795 }
796
797 if (NULL == timeout) {
798 msec = INFINITE;
799 } else {
800 getclock(TIMEOFDAY, &now);
801 delta = sub_tspec(*timeout, now);
802 if (delta.tv_sec < 0) {
803 msec = 0;
804 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
805 msec = INFINITE;
806 } else {
807 msec = 1000 * (DWORD)delta.tv_sec;
808 msec += delta.tv_nsec / (1000 * 1000);
809 }
810 }
811 rc = WaitForSingleObject(sem->shnd, msec);
812 if (WAIT_OBJECT_0 == rc)
813 return 0;
814 if (WAIT_TIMEOUT == rc) {
815 errno = ETIMEDOUT;
816 return -1;
817 }
818 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
819 errno = EFAULT;
820 return -1;
821 }
822 #else /* pthreads wait_for_sem() follows */
823 {
824 int rc = -1;
825
826 if (sem) do {
827 if (NULL == timeout)
828 rc = sem_wait(sem);
829 else
830 rc = sem_timedwait(sem, timeout);
831 } while (rc == -1 && errno == EINTR);
832 else
833 errno = EINVAL;
834
835 return rc;
836 }
837 #endif
838
839 /* --------------------------------------------------------------------
840 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
841 * calling conventions under Windows and POSIX-defined signature
842 * otherwise.
843 */
844 #ifdef SYS_WINNT
845 u_int WINAPI
846 #else
847 void *
848 #endif
blocking_thread(void * ThreadArg)849 blocking_thread(
850 void * ThreadArg
851 )
852 {
853 blocking_child *c;
854
855 c = ThreadArg;
856 exit_worker(blocking_child_common(c));
857
858 /* NOTREACHED */
859 return 0;
860 }
861
862 /* --------------------------------------------------------------------
863 * req_child_exit() runs in the parent.
864 *
865 * This function is called from from the idle timer, too, and possibly
866 * without a thread being there any longer. Since we have folded up our
867 * tent in that case and all the semaphores are already gone, we simply
868 * ignore this request in this case.
869 *
870 * Since the existence of the semaphores is controlled exclusively by
871 * the parent, there's no risk of data race here.
872 */
873 int
req_child_exit(blocking_child * c)874 req_child_exit(
875 blocking_child *c
876 )
877 {
878 return (c->accesslock)
879 ? queue_req_pointer(c, CHILD_EXIT_REQ)
880 : 0;
881 }
882
883 /* --------------------------------------------------------------------
884 * cleanup_after_child() runs in parent.
885 */
886 static void
cleanup_after_child(blocking_child * c)887 cleanup_after_child(
888 blocking_child * c
889 )
890 {
891 DEBUG_INSIST(!c->reusable);
892
893 # ifdef SYS_WINNT
894 /* The thread was not created in detached state, so we better
895 * clean up.
896 */
897 if (c->thread_ref && c->thread_ref->thnd) {
898 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
899 INSIST(CloseHandle(c->thread_ref->thnd));
900 c->thread_ref->thnd = NULL;
901 }
902 # endif
903 c->thread_ref = NULL;
904
905 /* remove semaphores and (if signalling vi IO) pipes */
906
907 c->accesslock = delete_sema(c->accesslock);
908 c->workitems_pending = delete_sema(c->workitems_pending);
909 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
910
911 # ifdef WORK_PIPE
912 DEBUG_INSIST(-1 != c->resp_read_pipe);
913 DEBUG_INSIST(-1 != c->resp_write_pipe);
914 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
915 close(c->resp_write_pipe);
916 close(c->resp_read_pipe);
917 c->resp_write_pipe = -1;
918 c->resp_read_pipe = -1;
919 # else
920 DEBUG_INSIST(NULL != c->responses_pending);
921 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
922 c->responses_pending = delete_sema(c->responses_pending);
923 # endif
924
925 /* Is it necessary to check if there are pending requests and
926 * responses? If so, and if there are, what to do with them?
927 */
928
929 /* re-init buffer index sequencers */
930 c->head_workitem = 0;
931 c->tail_workitem = 0;
932 c->head_response = 0;
933 c->tail_response = 0;
934
935 c->reusable = TRUE;
936 }
937
938
939 #else /* !WORK_THREAD follows */
940 char work_thread_nonempty_compilation_unit;
941 #endif
942