1
2 /*
3 * Copyright (C) Nginx, Inc.
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) Ruslan Ermilov
6 */
7
8
9 #include <ngx_config.h>
10 #include <ngx_core.h>
11 #include <ngx_thread_pool.h>
12
13
14 typedef struct {
15 ngx_array_t pools;
16 } ngx_thread_pool_conf_t;
17
18
19 typedef struct {
20 ngx_thread_task_t *first;
21 ngx_thread_task_t **last;
22 } ngx_thread_pool_queue_t;
23
24 #define ngx_thread_pool_queue_init(q) \
25 (q)->first = NULL; \
26 (q)->last = &(q)->first
27
28
29 struct ngx_thread_pool_s {
30 ngx_thread_mutex_t mtx;
31 ngx_thread_pool_queue_t queue;
32 ngx_int_t waiting;
33 ngx_thread_cond_t cond;
34
35 ngx_log_t *log;
36
37 ngx_str_t name;
38 ngx_uint_t threads;
39 ngx_int_t max_queue;
40
41 u_char *file;
42 ngx_uint_t line;
43 };
44
45
46 static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
47 ngx_pool_t *pool);
48 static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
49 static void ngx_thread_pool_exit_handler(void *data, ngx_log_t *log);
50
51 static void *ngx_thread_pool_cycle(void *data);
52 static void ngx_thread_pool_handler(ngx_event_t *ev);
53
54 static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
55
56 static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle);
57 static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf);
58
59 static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle);
60 static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle);
61
62
63 static ngx_command_t ngx_thread_pool_commands[] = {
64
65 { ngx_string("thread_pool"),
66 NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23,
67 ngx_thread_pool,
68 0,
69 0,
70 NULL },
71
72 ngx_null_command
73 };
74
75
76 static ngx_core_module_t ngx_thread_pool_module_ctx = {
77 ngx_string("thread_pool"),
78 ngx_thread_pool_create_conf,
79 ngx_thread_pool_init_conf
80 };
81
82
83 ngx_module_t ngx_thread_pool_module = {
84 NGX_MODULE_V1,
85 &ngx_thread_pool_module_ctx, /* module context */
86 ngx_thread_pool_commands, /* module directives */
87 NGX_CORE_MODULE, /* module type */
88 NULL, /* init master */
89 NULL, /* init module */
90 ngx_thread_pool_init_worker, /* init process */
91 NULL, /* init thread */
92 NULL, /* exit thread */
93 ngx_thread_pool_exit_worker, /* exit process */
94 NULL, /* exit master */
95 NGX_MODULE_V1_PADDING
96 };
97
98
99 static ngx_str_t ngx_thread_pool_default = ngx_string("default");
100
101 static ngx_uint_t ngx_thread_pool_task_id;
102 static ngx_atomic_t ngx_thread_pool_done_lock;
103 static ngx_thread_pool_queue_t ngx_thread_pool_done;
104
105
106 static ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t * tp,ngx_log_t * log,ngx_pool_t * pool)107 ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
108 {
109 int err;
110 pthread_t tid;
111 ngx_uint_t n;
112 pthread_attr_t attr;
113
114 if (ngx_notify == NULL) {
115 ngx_log_error(NGX_LOG_ALERT, log, 0,
116 "the configured event method cannot be used with thread pools");
117 return NGX_ERROR;
118 }
119
120 ngx_thread_pool_queue_init(&tp->queue);
121
122 if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
123 return NGX_ERROR;
124 }
125
126 if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
127 (void) ngx_thread_mutex_destroy(&tp->mtx, log);
128 return NGX_ERROR;
129 }
130
131 tp->log = log;
132
133 err = pthread_attr_init(&attr);
134 if (err) {
135 ngx_log_error(NGX_LOG_ALERT, log, err,
136 "pthread_attr_init() failed");
137 return NGX_ERROR;
138 }
139
140 err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
141 if (err) {
142 ngx_log_error(NGX_LOG_ALERT, log, err,
143 "pthread_attr_setdetachstate() failed");
144 return NGX_ERROR;
145 }
146
147 #if 0
148 err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
149 if (err) {
150 ngx_log_error(NGX_LOG_ALERT, log, err,
151 "pthread_attr_setstacksize() failed");
152 return NGX_ERROR;
153 }
154 #endif
155
156 for (n = 0; n < tp->threads; n++) {
157 err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
158 if (err) {
159 ngx_log_error(NGX_LOG_ALERT, log, err,
160 "pthread_create() failed");
161 return NGX_ERROR;
162 }
163 }
164
165 (void) pthread_attr_destroy(&attr);
166
167 return NGX_OK;
168 }
169
170
171 static void
ngx_thread_pool_destroy(ngx_thread_pool_t * tp)172 ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
173 {
174 ngx_uint_t n;
175 ngx_thread_task_t task;
176 volatile ngx_uint_t lock;
177
178 ngx_memzero(&task, sizeof(ngx_thread_task_t));
179
180 task.handler = ngx_thread_pool_exit_handler;
181 task.ctx = (void *) &lock;
182
183 for (n = 0; n < tp->threads; n++) {
184 lock = 1;
185
186 if (ngx_thread_task_post(tp, &task) != NGX_OK) {
187 return;
188 }
189
190 while (lock) {
191 ngx_sched_yield();
192 }
193
194 task.event.active = 0;
195 }
196
197 (void) ngx_thread_cond_destroy(&tp->cond, tp->log);
198
199 (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
200 }
201
202
203 static void
ngx_thread_pool_exit_handler(void * data,ngx_log_t * log)204 ngx_thread_pool_exit_handler(void *data, ngx_log_t *log)
205 {
206 ngx_uint_t *lock = data;
207
208 *lock = 0;
209
210 pthread_exit(0);
211 }
212
213
214 ngx_thread_task_t *
ngx_thread_task_alloc(ngx_pool_t * pool,size_t size)215 ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
216 {
217 ngx_thread_task_t *task;
218
219 task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
220 if (task == NULL) {
221 return NULL;
222 }
223
224 task->ctx = task + 1;
225
226 return task;
227 }
228
229
230 ngx_int_t
ngx_thread_task_post(ngx_thread_pool_t * tp,ngx_thread_task_t * task)231 ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
232 {
233 if (task->event.active) {
234 ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
235 "task #%ui already active", task->id);
236 return NGX_ERROR;
237 }
238
239 if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
240 return NGX_ERROR;
241 }
242
243 if (tp->waiting >= tp->max_queue) {
244 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
245
246 ngx_log_error(NGX_LOG_ERR, tp->log, 0,
247 "thread pool \"%V\" queue overflow: %i tasks waiting",
248 &tp->name, tp->waiting);
249 return NGX_ERROR;
250 }
251
252 task->event.active = 1;
253
254 task->id = ngx_thread_pool_task_id++;
255 task->next = NULL;
256
257 if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
258 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
259 return NGX_ERROR;
260 }
261
262 *tp->queue.last = task;
263 tp->queue.last = &task->next;
264
265 tp->waiting++;
266
267 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
268
269 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
270 "task #%ui added to thread pool \"%V\"",
271 task->id, &tp->name);
272
273 return NGX_OK;
274 }
275
276
277 static void *
ngx_thread_pool_cycle(void * data)278 ngx_thread_pool_cycle(void *data)
279 {
280 ngx_thread_pool_t *tp = data;
281
282 int err;
283 sigset_t set;
284 ngx_thread_task_t *task;
285
286 #if 0
287 ngx_time_update();
288 #endif
289
290 ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
291 "thread in pool \"%V\" started", &tp->name);
292
293 sigfillset(&set);
294
295 sigdelset(&set, SIGILL);
296 sigdelset(&set, SIGFPE);
297 sigdelset(&set, SIGSEGV);
298 sigdelset(&set, SIGBUS);
299
300 err = pthread_sigmask(SIG_BLOCK, &set, NULL);
301 if (err) {
302 ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
303 return NULL;
304 }
305
306 for ( ;; ) {
307 if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
308 return NULL;
309 }
310
311 /* the number may become negative */
312 tp->waiting--;
313
314 while (tp->queue.first == NULL) {
315 if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
316 != NGX_OK)
317 {
318 (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
319 return NULL;
320 }
321 }
322
323 task = tp->queue.first;
324 tp->queue.first = task->next;
325
326 if (tp->queue.first == NULL) {
327 tp->queue.last = &tp->queue.first;
328 }
329
330 if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
331 return NULL;
332 }
333
334 #if 0
335 ngx_time_update();
336 #endif
337
338 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
339 "run task #%ui in thread pool \"%V\"",
340 task->id, &tp->name);
341
342 task->handler(task->ctx, tp->log);
343
344 ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
345 "complete task #%ui in thread pool \"%V\"",
346 task->id, &tp->name);
347
348 task->next = NULL;
349
350 ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
351
352 *ngx_thread_pool_done.last = task;
353 ngx_thread_pool_done.last = &task->next;
354
355 ngx_memory_barrier();
356
357 ngx_unlock(&ngx_thread_pool_done_lock);
358
359 (void) ngx_notify(ngx_thread_pool_handler);
360 }
361 }
362
363
364 static void
ngx_thread_pool_handler(ngx_event_t * ev)365 ngx_thread_pool_handler(ngx_event_t *ev)
366 {
367 ngx_event_t *event;
368 ngx_thread_task_t *task;
369
370 ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");
371
372 ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
373
374 task = ngx_thread_pool_done.first;
375 ngx_thread_pool_done.first = NULL;
376 ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
377
378 ngx_memory_barrier();
379
380 ngx_unlock(&ngx_thread_pool_done_lock);
381
382 while (task) {
383 ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
384 "run completion handler for task #%ui", task->id);
385
386 event = &task->event;
387 task = task->next;
388
389 event->complete = 1;
390 event->active = 0;
391
392 event->handler(event);
393 }
394 }
395
396
397 static void *
ngx_thread_pool_create_conf(ngx_cycle_t * cycle)398 ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
399 {
400 ngx_thread_pool_conf_t *tcf;
401
402 tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
403 if (tcf == NULL) {
404 return NULL;
405 }
406
407 if (ngx_array_init(&tcf->pools, cycle->pool, 4,
408 sizeof(ngx_thread_pool_t *))
409 != NGX_OK)
410 {
411 return NULL;
412 }
413
414 return tcf;
415 }
416
417
418 static char *
ngx_thread_pool_init_conf(ngx_cycle_t * cycle,void * conf)419 ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
420 {
421 ngx_thread_pool_conf_t *tcf = conf;
422
423 ngx_uint_t i;
424 ngx_thread_pool_t **tpp;
425
426 tpp = tcf->pools.elts;
427
428 for (i = 0; i < tcf->pools.nelts; i++) {
429
430 if (tpp[i]->threads) {
431 continue;
432 }
433
434 if (tpp[i]->name.len == ngx_thread_pool_default.len
435 && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data,
436 ngx_thread_pool_default.len)
437 == 0)
438 {
439 tpp[i]->threads = 32;
440 tpp[i]->max_queue = 65536;
441 continue;
442 }
443
444 ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
445 "unknown thread pool \"%V\" in %s:%ui",
446 &tpp[i]->name, tpp[i]->file, tpp[i]->line);
447
448 return NGX_CONF_ERROR;
449 }
450
451 return NGX_CONF_OK;
452 }
453
454
455 static char *
ngx_thread_pool(ngx_conf_t * cf,ngx_command_t * cmd,void * conf)456 ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
457 {
458 ngx_str_t *value;
459 ngx_uint_t i;
460 ngx_thread_pool_t *tp;
461
462 value = cf->args->elts;
463
464 tp = ngx_thread_pool_add(cf, &value[1]);
465
466 if (tp == NULL) {
467 return NGX_CONF_ERROR;
468 }
469
470 if (tp->threads) {
471 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
472 "duplicate thread pool \"%V\"", &tp->name);
473 return NGX_CONF_ERROR;
474 }
475
476 tp->max_queue = 65536;
477
478 for (i = 2; i < cf->args->nelts; i++) {
479
480 if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
481
482 tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);
483
484 if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) {
485 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
486 "invalid threads value \"%V\"", &value[i]);
487 return NGX_CONF_ERROR;
488 }
489
490 continue;
491 }
492
493 if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
494
495 tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);
496
497 if (tp->max_queue == NGX_ERROR) {
498 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
499 "invalid max_queue value \"%V\"", &value[i]);
500 return NGX_CONF_ERROR;
501 }
502
503 continue;
504 }
505 }
506
507 if (tp->threads == 0) {
508 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
509 "\"%V\" must have \"threads\" parameter",
510 &cmd->name);
511 return NGX_CONF_ERROR;
512 }
513
514 return NGX_CONF_OK;
515 }
516
517
518 ngx_thread_pool_t *
ngx_thread_pool_add(ngx_conf_t * cf,ngx_str_t * name)519 ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name)
520 {
521 ngx_thread_pool_t *tp, **tpp;
522 ngx_thread_pool_conf_t *tcf;
523
524 if (name == NULL) {
525 name = &ngx_thread_pool_default;
526 }
527
528 tp = ngx_thread_pool_get(cf->cycle, name);
529
530 if (tp) {
531 return tp;
532 }
533
534 tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t));
535 if (tp == NULL) {
536 return NULL;
537 }
538
539 tp->name = *name;
540 tp->file = cf->conf_file->file.name.data;
541 tp->line = cf->conf_file->line;
542
543 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
544 ngx_thread_pool_module);
545
546 tpp = ngx_array_push(&tcf->pools);
547 if (tpp == NULL) {
548 return NULL;
549 }
550
551 *tpp = tp;
552
553 return tp;
554 }
555
556
557 ngx_thread_pool_t *
ngx_thread_pool_get(ngx_cycle_t * cycle,ngx_str_t * name)558 ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
559 {
560 ngx_uint_t i;
561 ngx_thread_pool_t **tpp;
562 ngx_thread_pool_conf_t *tcf;
563
564 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
565 ngx_thread_pool_module);
566
567 tpp = tcf->pools.elts;
568
569 for (i = 0; i < tcf->pools.nelts; i++) {
570
571 if (tpp[i]->name.len == name->len
572 && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0)
573 {
574 return tpp[i];
575 }
576 }
577
578 return NULL;
579 }
580
581
582 static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t * cycle)583 ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
584 {
585 ngx_uint_t i;
586 ngx_thread_pool_t **tpp;
587 ngx_thread_pool_conf_t *tcf;
588
589 if (ngx_process != NGX_PROCESS_WORKER
590 && ngx_process != NGX_PROCESS_SINGLE)
591 {
592 return NGX_OK;
593 }
594
595 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
596 ngx_thread_pool_module);
597
598 if (tcf == NULL) {
599 return NGX_OK;
600 }
601
602 ngx_thread_pool_queue_init(&ngx_thread_pool_done);
603
604 tpp = tcf->pools.elts;
605
606 for (i = 0; i < tcf->pools.nelts; i++) {
607 if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
608 return NGX_ERROR;
609 }
610 }
611
612 return NGX_OK;
613 }
614
615
616 static void
ngx_thread_pool_exit_worker(ngx_cycle_t * cycle)617 ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
618 {
619 ngx_uint_t i;
620 ngx_thread_pool_t **tpp;
621 ngx_thread_pool_conf_t *tcf;
622
623 if (ngx_process != NGX_PROCESS_WORKER
624 && ngx_process != NGX_PROCESS_SINGLE)
625 {
626 return;
627 }
628
629 tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
630 ngx_thread_pool_module);
631
632 if (tcf == NULL) {
633 return;
634 }
635
636 tpp = tcf->pools.elts;
637
638 for (i = 0; i < tcf->pools.nelts; i++) {
639 ngx_thread_pool_destroy(tpp[i]);
640 }
641 }
642