1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Nginx, Inc.
5  */
6 
7 
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_stream.h>
11 
12 
13 static ngx_int_t ngx_stream_upstream_add_variables(ngx_conf_t *cf);
14 static ngx_int_t ngx_stream_upstream_addr_variable(ngx_stream_session_t *s,
15     ngx_stream_variable_value_t *v, uintptr_t data);
16 static ngx_int_t ngx_stream_upstream_response_time_variable(
17     ngx_stream_session_t *s, ngx_stream_variable_value_t *v, uintptr_t data);
18 static ngx_int_t ngx_stream_upstream_bytes_variable(ngx_stream_session_t *s,
19     ngx_stream_variable_value_t *v, uintptr_t data);
20 
21 static char *ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd,
22     void *dummy);
23 static char *ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd,
24     void *conf);
25 static void *ngx_stream_upstream_create_main_conf(ngx_conf_t *cf);
26 static char *ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf);
27 
28 
29 static ngx_command_t  ngx_stream_upstream_commands[] = {
30 
31     { ngx_string("upstream"),
32       NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_TAKE1,
33       ngx_stream_upstream,
34       0,
35       0,
36       NULL },
37 
38     { ngx_string("server"),
39       NGX_STREAM_UPS_CONF|NGX_CONF_1MORE,
40       ngx_stream_upstream_server,
41       NGX_STREAM_SRV_CONF_OFFSET,
42       0,
43       NULL },
44 
45       ngx_null_command
46 };
47 
48 
49 static ngx_stream_module_t  ngx_stream_upstream_module_ctx = {
50     ngx_stream_upstream_add_variables,     /* preconfiguration */
51     NULL,                                  /* postconfiguration */
52 
53     ngx_stream_upstream_create_main_conf,  /* create main configuration */
54     ngx_stream_upstream_init_main_conf,    /* init main configuration */
55 
56     NULL,                                  /* create server configuration */
57     NULL                                   /* merge server configuration */
58 };
59 
60 
61 ngx_module_t  ngx_stream_upstream_module = {
62     NGX_MODULE_V1,
63     &ngx_stream_upstream_module_ctx,       /* module context */
64     ngx_stream_upstream_commands,          /* module directives */
65     NGX_STREAM_MODULE,                     /* module type */
66     NULL,                                  /* init master */
67     NULL,                                  /* init module */
68     NULL,                                  /* init process */
69     NULL,                                  /* init thread */
70     NULL,                                  /* exit thread */
71     NULL,                                  /* exit process */
72     NULL,                                  /* exit master */
73     NGX_MODULE_V1_PADDING
74 };
75 
76 
77 static ngx_stream_variable_t  ngx_stream_upstream_vars[] = {
78 
79     { ngx_string("upstream_addr"), NULL,
80       ngx_stream_upstream_addr_variable, 0,
81       NGX_STREAM_VAR_NOCACHEABLE, 0 },
82 
83     { ngx_string("upstream_bytes_sent"), NULL,
84       ngx_stream_upstream_bytes_variable, 0,
85       NGX_STREAM_VAR_NOCACHEABLE, 0 },
86 
87     { ngx_string("upstream_connect_time"), NULL,
88       ngx_stream_upstream_response_time_variable, 2,
89       NGX_STREAM_VAR_NOCACHEABLE, 0 },
90 
91     { ngx_string("upstream_first_byte_time"), NULL,
92       ngx_stream_upstream_response_time_variable, 1,
93       NGX_STREAM_VAR_NOCACHEABLE, 0 },
94 
95     { ngx_string("upstream_session_time"), NULL,
96       ngx_stream_upstream_response_time_variable, 0,
97       NGX_STREAM_VAR_NOCACHEABLE, 0 },
98 
99     { ngx_string("upstream_bytes_received"), NULL,
100       ngx_stream_upstream_bytes_variable, 1,
101       NGX_STREAM_VAR_NOCACHEABLE, 0 },
102 
103       ngx_stream_null_variable
104 };
105 
106 
107 static ngx_int_t
ngx_stream_upstream_add_variables(ngx_conf_t * cf)108 ngx_stream_upstream_add_variables(ngx_conf_t *cf)
109 {
110     ngx_stream_variable_t  *var, *v;
111 
112     for (v = ngx_stream_upstream_vars; v->name.len; v++) {
113         var = ngx_stream_add_variable(cf, &v->name, v->flags);
114         if (var == NULL) {
115             return NGX_ERROR;
116         }
117 
118         var->get_handler = v->get_handler;
119         var->data = v->data;
120     }
121 
122     return NGX_OK;
123 }
124 
125 
126 static ngx_int_t
ngx_stream_upstream_addr_variable(ngx_stream_session_t * s,ngx_stream_variable_value_t * v,uintptr_t data)127 ngx_stream_upstream_addr_variable(ngx_stream_session_t *s,
128     ngx_stream_variable_value_t *v, uintptr_t data)
129 {
130     u_char                       *p;
131     size_t                        len;
132     ngx_uint_t                    i;
133     ngx_stream_upstream_state_t  *state;
134 
135     v->valid = 1;
136     v->no_cacheable = 0;
137     v->not_found = 0;
138 
139     if (s->upstream_states == NULL || s->upstream_states->nelts == 0) {
140         v->not_found = 1;
141         return NGX_OK;
142     }
143 
144     len = 0;
145     state = s->upstream_states->elts;
146 
147     for (i = 0; i < s->upstream_states->nelts; i++) {
148         if (state[i].peer) {
149             len += state[i].peer->len;
150         }
151 
152         len += 2;
153     }
154 
155     p = ngx_pnalloc(s->connection->pool, len);
156     if (p == NULL) {
157         return NGX_ERROR;
158     }
159 
160     v->data = p;
161 
162     i = 0;
163 
164     for ( ;; ) {
165         if (state[i].peer) {
166             p = ngx_cpymem(p, state[i].peer->data, state[i].peer->len);
167         }
168 
169         if (++i == s->upstream_states->nelts) {
170             break;
171         }
172 
173         *p++ = ',';
174         *p++ = ' ';
175     }
176 
177     v->len = p - v->data;
178 
179     return NGX_OK;
180 }
181 
182 
183 static ngx_int_t
ngx_stream_upstream_bytes_variable(ngx_stream_session_t * s,ngx_stream_variable_value_t * v,uintptr_t data)184 ngx_stream_upstream_bytes_variable(ngx_stream_session_t *s,
185     ngx_stream_variable_value_t *v, uintptr_t data)
186 {
187     u_char                       *p;
188     size_t                        len;
189     ngx_uint_t                    i;
190     ngx_stream_upstream_state_t  *state;
191 
192     v->valid = 1;
193     v->no_cacheable = 0;
194     v->not_found = 0;
195 
196     if (s->upstream_states == NULL || s->upstream_states->nelts == 0) {
197         v->not_found = 1;
198         return NGX_OK;
199     }
200 
201     len = s->upstream_states->nelts * (NGX_OFF_T_LEN + 2);
202 
203     p = ngx_pnalloc(s->connection->pool, len);
204     if (p == NULL) {
205         return NGX_ERROR;
206     }
207 
208     v->data = p;
209 
210     i = 0;
211     state = s->upstream_states->elts;
212 
213     for ( ;; ) {
214 
215         if (data == 1) {
216             p = ngx_sprintf(p, "%O", state[i].bytes_received);
217 
218         } else {
219             p = ngx_sprintf(p, "%O", state[i].bytes_sent);
220         }
221 
222         if (++i == s->upstream_states->nelts) {
223             break;
224         }
225 
226         *p++ = ',';
227         *p++ = ' ';
228     }
229 
230     v->len = p - v->data;
231 
232     return NGX_OK;
233 }
234 
235 
236 static ngx_int_t
ngx_stream_upstream_response_time_variable(ngx_stream_session_t * s,ngx_stream_variable_value_t * v,uintptr_t data)237 ngx_stream_upstream_response_time_variable(ngx_stream_session_t *s,
238     ngx_stream_variable_value_t *v, uintptr_t data)
239 {
240     u_char                       *p;
241     size_t                        len;
242     ngx_uint_t                    i;
243     ngx_msec_int_t                ms;
244     ngx_stream_upstream_state_t  *state;
245 
246     v->valid = 1;
247     v->no_cacheable = 0;
248     v->not_found = 0;
249 
250     if (s->upstream_states == NULL || s->upstream_states->nelts == 0) {
251         v->not_found = 1;
252         return NGX_OK;
253     }
254 
255     len = s->upstream_states->nelts * (NGX_TIME_T_LEN + 4 + 2);
256 
257     p = ngx_pnalloc(s->connection->pool, len);
258     if (p == NULL) {
259         return NGX_ERROR;
260     }
261 
262     v->data = p;
263 
264     i = 0;
265     state = s->upstream_states->elts;
266 
267     for ( ;; ) {
268 
269         if (data == 1) {
270             ms = state[i].first_byte_time;
271 
272         } else if (data == 2) {
273             ms = state[i].connect_time;
274 
275         } else {
276             ms = state[i].response_time;
277         }
278 
279         if (ms != -1) {
280             ms = ngx_max(ms, 0);
281             p = ngx_sprintf(p, "%T.%03M", (time_t) ms / 1000, ms % 1000);
282 
283         } else {
284             *p++ = '-';
285         }
286 
287         if (++i == s->upstream_states->nelts) {
288             break;
289         }
290 
291         *p++ = ',';
292         *p++ = ' ';
293     }
294 
295     v->len = p - v->data;
296 
297     return NGX_OK;
298 }
299 
300 
301 static char *
ngx_stream_upstream(ngx_conf_t * cf,ngx_command_t * cmd,void * dummy)302 ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
303 {
304     char                            *rv;
305     void                            *mconf;
306     ngx_str_t                       *value;
307     ngx_url_t                        u;
308     ngx_uint_t                       m;
309     ngx_conf_t                       pcf;
310     ngx_stream_module_t             *module;
311     ngx_stream_conf_ctx_t           *ctx, *stream_ctx;
312     ngx_stream_upstream_srv_conf_t  *uscf;
313 
314     ngx_memzero(&u, sizeof(ngx_url_t));
315 
316     value = cf->args->elts;
317     u.host = value[1];
318     u.no_resolve = 1;
319     u.no_port = 1;
320 
321     uscf = ngx_stream_upstream_add(cf, &u, NGX_STREAM_UPSTREAM_CREATE
322                                            |NGX_STREAM_UPSTREAM_WEIGHT
323                                            |NGX_STREAM_UPSTREAM_MAX_CONNS
324                                            |NGX_STREAM_UPSTREAM_MAX_FAILS
325                                            |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
326                                            |NGX_STREAM_UPSTREAM_DOWN
327                                            |NGX_STREAM_UPSTREAM_BACKUP);
328     if (uscf == NULL) {
329         return NGX_CONF_ERROR;
330     }
331 
332 
333     ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
334     if (ctx == NULL) {
335         return NGX_CONF_ERROR;
336     }
337 
338     stream_ctx = cf->ctx;
339     ctx->main_conf = stream_ctx->main_conf;
340 
341     /* the upstream{}'s srv_conf */
342 
343     ctx->srv_conf = ngx_pcalloc(cf->pool,
344                                 sizeof(void *) * ngx_stream_max_module);
345     if (ctx->srv_conf == NULL) {
346         return NGX_CONF_ERROR;
347     }
348 
349     ctx->srv_conf[ngx_stream_upstream_module.ctx_index] = uscf;
350 
351     uscf->srv_conf = ctx->srv_conf;
352 
353     for (m = 0; cf->cycle->modules[m]; m++) {
354         if (cf->cycle->modules[m]->type != NGX_STREAM_MODULE) {
355             continue;
356         }
357 
358         module = cf->cycle->modules[m]->ctx;
359 
360         if (module->create_srv_conf) {
361             mconf = module->create_srv_conf(cf);
362             if (mconf == NULL) {
363                 return NGX_CONF_ERROR;
364             }
365 
366             ctx->srv_conf[cf->cycle->modules[m]->ctx_index] = mconf;
367         }
368     }
369 
370     uscf->servers = ngx_array_create(cf->pool, 4,
371                                      sizeof(ngx_stream_upstream_server_t));
372     if (uscf->servers == NULL) {
373         return NGX_CONF_ERROR;
374     }
375 
376 
377     /* parse inside upstream{} */
378 
379     pcf = *cf;
380     cf->ctx = ctx;
381     cf->cmd_type = NGX_STREAM_UPS_CONF;
382 
383     rv = ngx_conf_parse(cf, NULL);
384 
385     *cf = pcf;
386 
387     if (rv != NGX_CONF_OK) {
388         return rv;
389     }
390 
391     if (uscf->servers->nelts == 0) {
392         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
393                            "no servers are inside upstream");
394         return NGX_CONF_ERROR;
395     }
396 
397     return rv;
398 }
399 
400 
401 static char *
ngx_stream_upstream_server(ngx_conf_t * cf,ngx_command_t * cmd,void * conf)402 ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
403 {
404     ngx_stream_upstream_srv_conf_t  *uscf = conf;
405 
406     time_t                         fail_timeout;
407     ngx_str_t                     *value, s;
408     ngx_url_t                      u;
409     ngx_int_t                      weight, max_conns, max_fails;
410     ngx_uint_t                     i;
411     ngx_stream_upstream_server_t  *us;
412 
413     us = ngx_array_push(uscf->servers);
414     if (us == NULL) {
415         return NGX_CONF_ERROR;
416     }
417 
418     ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
419 
420     value = cf->args->elts;
421 
422     weight = 1;
423     max_conns = 0;
424     max_fails = 1;
425     fail_timeout = 10;
426 
427     for (i = 2; i < cf->args->nelts; i++) {
428 
429         if (ngx_strncmp(value[i].data, "weight=", 7) == 0) {
430 
431             if (!(uscf->flags & NGX_STREAM_UPSTREAM_WEIGHT)) {
432                 goto not_supported;
433             }
434 
435             weight = ngx_atoi(&value[i].data[7], value[i].len - 7);
436 
437             if (weight == NGX_ERROR || weight == 0) {
438                 goto invalid;
439             }
440 
441             continue;
442         }
443 
444         if (ngx_strncmp(value[i].data, "max_conns=", 10) == 0) {
445 
446             if (!(uscf->flags & NGX_STREAM_UPSTREAM_MAX_CONNS)) {
447                 goto not_supported;
448             }
449 
450             max_conns = ngx_atoi(&value[i].data[10], value[i].len - 10);
451 
452             if (max_conns == NGX_ERROR) {
453                 goto invalid;
454             }
455 
456             continue;
457         }
458 
459         if (ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {
460 
461             if (!(uscf->flags & NGX_STREAM_UPSTREAM_MAX_FAILS)) {
462                 goto not_supported;
463             }
464 
465             max_fails = ngx_atoi(&value[i].data[10], value[i].len - 10);
466 
467             if (max_fails == NGX_ERROR) {
468                 goto invalid;
469             }
470 
471             continue;
472         }
473 
474         if (ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) {
475 
476             if (!(uscf->flags & NGX_STREAM_UPSTREAM_FAIL_TIMEOUT)) {
477                 goto not_supported;
478             }
479 
480             s.len = value[i].len - 13;
481             s.data = &value[i].data[13];
482 
483             fail_timeout = ngx_parse_time(&s, 1);
484 
485             if (fail_timeout == (time_t) NGX_ERROR) {
486                 goto invalid;
487             }
488 
489             continue;
490         }
491 
492         if (ngx_strcmp(value[i].data, "backup") == 0) {
493 
494             if (!(uscf->flags & NGX_STREAM_UPSTREAM_BACKUP)) {
495                 goto not_supported;
496             }
497 
498             us->backup = 1;
499 
500             continue;
501         }
502 
503         if (ngx_strcmp(value[i].data, "down") == 0) {
504 
505             if (!(uscf->flags & NGX_STREAM_UPSTREAM_DOWN)) {
506                 goto not_supported;
507             }
508 
509             us->down = 1;
510 
511             continue;
512         }
513 
514         goto invalid;
515     }
516 
517     ngx_memzero(&u, sizeof(ngx_url_t));
518 
519     u.url = value[1];
520 
521     if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
522         if (u.err) {
523             ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
524                                "%s in upstream \"%V\"", u.err, &u.url);
525         }
526 
527         return NGX_CONF_ERROR;
528     }
529 
530     if (u.no_port) {
531         ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
532                            "no port in upstream \"%V\"", &u.url);
533         return NGX_CONF_ERROR;
534     }
535 
536     us->name = u.url;
537     us->addrs = u.addrs;
538     us->naddrs = u.naddrs;
539     us->weight = weight;
540     us->max_conns = max_conns;
541     us->max_fails = max_fails;
542     us->fail_timeout = fail_timeout;
543 
544     return NGX_CONF_OK;
545 
546 invalid:
547 
548     ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
549                        "invalid parameter \"%V\"", &value[i]);
550 
551     return NGX_CONF_ERROR;
552 
553 not_supported:
554 
555     ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
556                        "balancing method does not support parameter \"%V\"",
557                        &value[i]);
558 
559     return NGX_CONF_ERROR;
560 }
561 
562 
563 ngx_stream_upstream_srv_conf_t *
ngx_stream_upstream_add(ngx_conf_t * cf,ngx_url_t * u,ngx_uint_t flags)564 ngx_stream_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags)
565 {
566     ngx_uint_t                        i;
567     ngx_stream_upstream_server_t     *us;
568     ngx_stream_upstream_srv_conf_t   *uscf, **uscfp;
569     ngx_stream_upstream_main_conf_t  *umcf;
570 
571     if (!(flags & NGX_STREAM_UPSTREAM_CREATE)) {
572 
573         if (ngx_parse_url(cf->pool, u) != NGX_OK) {
574             if (u->err) {
575                 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
576                                    "%s in upstream \"%V\"", u->err, &u->url);
577             }
578 
579             return NULL;
580         }
581     }
582 
583     umcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_upstream_module);
584 
585     uscfp = umcf->upstreams.elts;
586 
587     for (i = 0; i < umcf->upstreams.nelts; i++) {
588 
589         if (uscfp[i]->host.len != u->host.len
590             || ngx_strncasecmp(uscfp[i]->host.data, u->host.data, u->host.len)
591                != 0)
592         {
593             continue;
594         }
595 
596         if ((flags & NGX_STREAM_UPSTREAM_CREATE)
597              && (uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE))
598         {
599             ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
600                                "duplicate upstream \"%V\"", &u->host);
601             return NULL;
602         }
603 
604         if ((uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE) && !u->no_port) {
605             ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
606                                "upstream \"%V\" may not have port %d",
607                                &u->host, u->port);
608             return NULL;
609         }
610 
611         if ((flags & NGX_STREAM_UPSTREAM_CREATE) && !uscfp[i]->no_port) {
612             ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
613                           "upstream \"%V\" may not have port %d in %s:%ui",
614                           &u->host, uscfp[i]->port,
615                           uscfp[i]->file_name, uscfp[i]->line);
616             return NULL;
617         }
618 
619         if (uscfp[i]->port != u->port) {
620             continue;
621         }
622 
623         if (flags & NGX_STREAM_UPSTREAM_CREATE) {
624             uscfp[i]->flags = flags;
625         }
626 
627         return uscfp[i];
628     }
629 
630     uscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_srv_conf_t));
631     if (uscf == NULL) {
632         return NULL;
633     }
634 
635     uscf->flags = flags;
636     uscf->host = u->host;
637     uscf->file_name = cf->conf_file->file.name.data;
638     uscf->line = cf->conf_file->line;
639     uscf->port = u->port;
640     uscf->no_port = u->no_port;
641 
642     if (u->naddrs == 1 && (u->port || u->family == AF_UNIX)) {
643         uscf->servers = ngx_array_create(cf->pool, 1,
644                                          sizeof(ngx_stream_upstream_server_t));
645         if (uscf->servers == NULL) {
646             return NULL;
647         }
648 
649         us = ngx_array_push(uscf->servers);
650         if (us == NULL) {
651             return NULL;
652         }
653 
654         ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
655 
656         us->addrs = u->addrs;
657         us->naddrs = 1;
658     }
659 
660     uscfp = ngx_array_push(&umcf->upstreams);
661     if (uscfp == NULL) {
662         return NULL;
663     }
664 
665     *uscfp = uscf;
666 
667     return uscf;
668 }
669 
670 
671 static void *
ngx_stream_upstream_create_main_conf(ngx_conf_t * cf)672 ngx_stream_upstream_create_main_conf(ngx_conf_t *cf)
673 {
674     ngx_stream_upstream_main_conf_t  *umcf;
675 
676     umcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_main_conf_t));
677     if (umcf == NULL) {
678         return NULL;
679     }
680 
681     if (ngx_array_init(&umcf->upstreams, cf->pool, 4,
682                        sizeof(ngx_stream_upstream_srv_conf_t *))
683         != NGX_OK)
684     {
685         return NULL;
686     }
687 
688     return umcf;
689 }
690 
691 
692 static char *
ngx_stream_upstream_init_main_conf(ngx_conf_t * cf,void * conf)693 ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf)
694 {
695     ngx_stream_upstream_main_conf_t *umcf = conf;
696 
697     ngx_uint_t                        i;
698     ngx_stream_upstream_init_pt       init;
699     ngx_stream_upstream_srv_conf_t  **uscfp;
700 
701     uscfp = umcf->upstreams.elts;
702 
703     for (i = 0; i < umcf->upstreams.nelts; i++) {
704 
705         init = uscfp[i]->peer.init_upstream
706                                          ? uscfp[i]->peer.init_upstream
707                                          : ngx_stream_upstream_init_round_robin;
708 
709         if (init(cf, uscfp[i]) != NGX_OK) {
710             return NGX_CONF_ERROR;
711         }
712     }
713 
714     return NGX_CONF_OK;
715 }
716