1 /* ====================================================================
2  *    Licensed to the Apache Software Foundation (ASF) under one
3  *    or more contributor license agreements.  See the NOTICE file
4  *    distributed with this work for additional information
5  *    regarding copyright ownership.  The ASF licenses this file
6  *    to you under the Apache License, Version 2.0 (the
7  *    "License"); you may not use this file except in compliance
8  *    with the License.  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *    Unless required by applicable law or agreed to in writing,
13  *    software distributed under the License is distributed on an
14  *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  *    KIND, either express or implied.  See the License for the
16  *    specific language governing permissions and limitations
17  *    under the License.
18  * ====================================================================
19  */
20 
21 #include <apr_pools.h>
22 #include <apr_strings.h>
23 #include <apr_lib.h>
24 #include <apr_date.h>
25 
26 #include "serf.h"
27 #include "serf_bucket_util.h"
28 #include "serf_bucket_types.h"
29 
30 #include <stdlib.h>
31 
32 /* This is an implementation of Bidirectional Web Transfer Protocol (BWTP)
33  * See:
34  *   http://bwtp.wikidot.com/
35  */
36 
37 typedef struct {
38     int channel;
39     int open;
40     int type; /* 0 = header, 1 = message */ /* TODO enum? */
41     const char *phrase;
42     serf_bucket_t *headers;
43 
44     char req_line[1000];
45 } frame_context_t;
46 
47 typedef struct {
48     serf_bucket_t *stream;
49     serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
50     serf_bucket_t *headers;     /* holds parsed headers */
51 
52     enum {
53         STATE_STATUS_LINE,      /* reading status line */
54         STATE_HEADERS,          /* reading headers */
55         STATE_BODY,             /* reading body */
56         STATE_DONE              /* we've sent EOF */
57     } state;
58 
59     /* Buffer for accumulating a line from the response. */
60     serf_linebuf_t linebuf;
61 
62     int type; /* 0 = header, 1 = message */ /* TODO enum? */
63     int channel;
64     char *phrase;
65     apr_size_t length;
66 } incoming_context_t;
67 
68 
serf_bucket_bwtp_channel_close(int channel,serf_bucket_alloc_t * allocator)69 serf_bucket_t *serf_bucket_bwtp_channel_close(
70     int channel,
71     serf_bucket_alloc_t *allocator)
72 {
73     frame_context_t *ctx;
74 
75     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
76     ctx->type = 0;
77     ctx->open = 0;
78     ctx->channel = channel;
79     ctx->phrase = "CLOSED";
80     ctx->headers = serf_bucket_headers_create(allocator);
81 
82     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
83 }
84 
serf_bucket_bwtp_channel_open(int channel,const char * uri,serf_bucket_alloc_t * allocator)85 serf_bucket_t *serf_bucket_bwtp_channel_open(
86     int channel,
87     const char *uri,
88     serf_bucket_alloc_t *allocator)
89 {
90     frame_context_t *ctx;
91 
92     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
93     ctx->type = 0;
94     ctx->open = 1;
95     ctx->channel = channel;
96     ctx->phrase = uri;
97     ctx->headers = serf_bucket_headers_create(allocator);
98 
99     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
100 }
101 
serf_bucket_bwtp_header_create(int channel,const char * phrase,serf_bucket_alloc_t * allocator)102 serf_bucket_t *serf_bucket_bwtp_header_create(
103     int channel,
104     const char *phrase,
105     serf_bucket_alloc_t *allocator)
106 {
107     frame_context_t *ctx;
108 
109     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
110     ctx->type = 0;
111     ctx->open = 0;
112     ctx->channel = channel;
113     ctx->phrase = phrase;
114     ctx->headers = serf_bucket_headers_create(allocator);
115 
116     return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
117 }
118 
/* Build a BWM (message) frame bucket for CHANNEL.
 *
 * NOTE(review): BODY is never stored or referenced here, and
 * frame_context_t has no field for it.  As a result serialize_data() does
 * not emit the payload, and the caller's bucket is not taken over.
 * TODO: add a body member plus serialization/destroy support.
 */
serf_bucket_t *serf_bucket_bwtp_message_create(
    int channel,
    serf_bucket_t *body,
    serf_bucket_alloc_t *allocator)
{
    frame_context_t *frame = serf_bucket_mem_alloc(allocator, sizeof(*frame));

    frame->type = 1;        /* message frame */
    frame->open = 0;
    frame->channel = channel;
    frame->phrase = "MESSAGE";
    frame->headers = serf_bucket_headers_create(allocator);

    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, frame);
}
135 
/* Return the channel id of an outgoing or incoming BWTP frame bucket,
 * or -1 if BUCKET is neither kind. */
int serf_bucket_bwtp_frame_get_channel(
    serf_bucket_t *bucket)
{
    int channel = -1;

    if (SERF_BUCKET_IS_BWTP_FRAME(bucket))
        channel = ((frame_context_t *)bucket->data)->channel;
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket))
        channel = ((incoming_context_t *)bucket->data)->channel;

    return channel;
}
152 
/* Return the frame type (0 = header, 1 = message) of a BWTP frame bucket,
 * or -1 if BUCKET is not a BWTP frame. */
int serf_bucket_bwtp_frame_get_type(
    serf_bucket_t *bucket)
{
    int type = -1;

    if (SERF_BUCKET_IS_BWTP_FRAME(bucket))
        type = ((frame_context_t *)bucket->data)->type;
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket))
        type = ((incoming_context_t *)bucket->data)->type;

    return type;
}
169 
/* Return the phrase of a BWTP frame bucket (may itself be NULL),
 * or NULL if BUCKET is not a BWTP frame. */
const char *serf_bucket_bwtp_frame_get_phrase(
    serf_bucket_t *bucket)
{
    const char *phrase = NULL;

    if (SERF_BUCKET_IS_BWTP_FRAME(bucket))
        phrase = ((frame_context_t *)bucket->data)->phrase;
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket))
        phrase = ((incoming_context_t *)bucket->data)->phrase;

    return phrase;
}
186 
/* Return the headers bucket owned by a BWTP frame bucket,
 * or NULL if BUCKET is not a BWTP frame. */
serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
    serf_bucket_t *bucket)
{
    serf_bucket_t *hdrs = NULL;

    if (SERF_BUCKET_IS_BWTP_FRAME(bucket))
        hdrs = ((frame_context_t *)bucket->data)->headers;
    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket))
        hdrs = ((incoming_context_t *)bucket->data)->headers;

    return hdrs;
}
203 
count_size(void * baton,const char * key,const char * value)204 static int count_size(void *baton, const char *key, const char *value)
205 {
206     apr_size_t *c = baton;
207     /* TODO Deal with folding.  Yikes. */
208 
209     /* Add in ": " and CRLF - so an extra four bytes. */
210     *c += strlen(key) + strlen(value) + 4;
211 
212     return 0;
213 }
214 
calc_header_size(serf_bucket_t * hdrs)215 static apr_size_t calc_header_size(serf_bucket_t *hdrs)
216 {
217     apr_size_t size = 0;
218 
219     serf_bucket_headers_do(hdrs, count_size, &size);
220 
221     return size;
222 }
223 
/* Convert a not-yet-read BWTP frame bucket into an aggregate bucket holding
 * its wire form:
 *
 *     "BWH|BWM <channel> <header-size-hex> [OPEN ]<phrase>\r\n" + headers
 *
 * BUCKET itself becomes the aggregate, so existing pointers to it stay
 * valid.  Ownership of ctx->headers moves into the aggregate, and the frame
 * context is freed.
 */
static void serialize_data(serf_bucket_t *bucket)
{
    frame_context_t *ctx = bucket->data;
    serf_bucket_t *new_bucket;
    apr_size_t req_len;

    /* Serialize the frame line into one string and wrap a bucket around it.
     *
     * APR_UINT64_T_HEX_FMT reads an apr_uint64_t from the varargs, but
     * calc_header_size() returns apr_size_t.  That type is narrower on
     * 32-bit platforms, so the value must be widened explicitly; passing it
     * uncast is undefined behavior.
     *
     * NOTE(review): the channel is written in decimal ("%d"), while
     * parse_status_line() reads it back in base 16.  The two agree only for
     * channels 0-9.  This is left unchanged because it defines the wire
     * format; confirm against the BWTP spec.
     */
    req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
                           "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
                           (ctx->type ? "BWM" : "BWH"),
                           ctx->channel,
                           (apr_uint64_t)calc_header_size(ctx->headers),
                           (ctx->open ? "OPEN " : ""),
                           ctx->phrase);
    new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
                                                bucket->allocator);

    /* BUCKET must become an aggregate so that a pointer to it still
     * represents the "right" data. */
    serf_bucket_aggregate_become(bucket);

    /* Frame line first, then the header block. */
    serf_bucket_aggregate_append(bucket, new_bucket);
    serf_bucket_aggregate_append(bucket, ctx->headers);

    /* The private context is no longer referenced by any bucket. */
    serf_bucket_mem_free(bucket->allocator, ctx);
}
258 
serf_bwtp_frame_read(serf_bucket_t * bucket,apr_size_t requested,const char ** data,apr_size_t * len)259 static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
260                                          apr_size_t requested,
261                                          const char **data, apr_size_t *len)
262 {
263     /* Seralize our private data into a new aggregate bucket. */
264     serialize_data(bucket);
265 
266     /* Delegate to the "new" aggregate bucket to do the read. */
267     return serf_bucket_read(bucket, requested, data, len);
268 }
269 
serf_bwtp_frame_readline(serf_bucket_t * bucket,int acceptable,int * found,const char ** data,apr_size_t * len)270 static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
271                                              int acceptable, int *found,
272                                              const char **data, apr_size_t *len)
273 {
274     /* Seralize our private data into a new aggregate bucket. */
275     serialize_data(bucket);
276 
277     /* Delegate to the "new" aggregate bucket to do the readline. */
278     return serf_bucket_readline(bucket, acceptable, found, data, len);
279 }
280 
serf_bwtp_frame_read_iovec(serf_bucket_t * bucket,apr_size_t requested,int vecs_size,struct iovec * vecs,int * vecs_used)281 static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
282                                                apr_size_t requested,
283                                                int vecs_size,
284                                                struct iovec *vecs,
285                                                int *vecs_used)
286 {
287     /* Seralize our private data into a new aggregate bucket. */
288     serialize_data(bucket);
289 
290     /* Delegate to the "new" aggregate bucket to do the read. */
291     return serf_bucket_read_iovec(bucket, requested,
292                                   vecs_size, vecs, vecs_used);
293 }
294 
serf_bwtp_frame_peek(serf_bucket_t * bucket,const char ** data,apr_size_t * len)295 static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
296                                          const char **data,
297                                          apr_size_t *len)
298 {
299     /* Seralize our private data into a new aggregate bucket. */
300     serialize_data(bucket);
301 
302     /* Delegate to the "new" aggregate bucket to do the peek. */
303     return serf_bucket_peek(bucket, data, len);
304 }
305 
306 const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
307     "BWTP-FRAME",
308     serf_bwtp_frame_read,
309     serf_bwtp_frame_readline,
310     serf_bwtp_frame_read_iovec,
311     serf_default_read_for_sendfile,
312     serf_default_read_bucket,
313     serf_bwtp_frame_peek,
314     serf_default_destroy_and_data,
315 };
316 
317 
/* Create a bucket that parses one BWTP frame from STREAM.
 *
 * The new bucket takes over STREAM (bwtp_incoming_destroy_and_data()
 * destroys it).
 */
serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
    serf_bucket_t *stream,
    serf_bucket_alloc_t *allocator)
{
    incoming_context_t *ctx;

    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
    ctx->stream = stream;
    ctx->body = NULL;
    ctx->headers = serf_bucket_headers_create(allocator);
    ctx->state = STATE_STATUS_LINE;
    /* The type is unknown until the frame line is parsed.  -1 is the same
     * "unknown" value that parse_status_line() uses.  Previously this field
     * was left uninitialized, so serf_bucket_bwtp_frame_get_type() called
     * before parsing returned garbage. */
    ctx->type = -1;
    ctx->length = 0;
    ctx->channel = -1;
    ctx->phrase = NULL;

    serf_linebuf_init(&ctx->linebuf);

    return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
}
337 
/* Destroy an incoming frame bucket together with everything it owns: the
 * parsed phrase, the source stream, the body wrapper (if one was created)
 * and the headers bucket.
 */
static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
{
    incoming_context_t *ctx = bucket->data;
    int owns_phrase = (ctx->state != STATE_STATUS_LINE && ctx->phrase != NULL);

    /* The phrase is only allocated once the frame line has been parsed. */
    if (owns_phrase)
        serf_bucket_mem_free(bucket->allocator, ctx->phrase);

    serf_bucket_destroy(ctx->stream);
    if (ctx->body)
        serf_bucket_destroy(ctx->body);
    serf_bucket_destroy(ctx->headers);

    serf_default_destroy_and_data(bucket);
}
353 
fetch_line(incoming_context_t * ctx,int acceptable)354 static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
355 {
356     return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
357 }
358 
parse_status_line(incoming_context_t * ctx,serf_bucket_alloc_t * allocator)359 static apr_status_t parse_status_line(incoming_context_t *ctx,
360                                       serf_bucket_alloc_t *allocator)
361 {
362     int res;
363     char *reason; /* ### stupid APR interface makes this non-const */
364 
365     /* ctx->linebuf.line should be of form: BW* */
366     res = apr_date_checkmask(ctx->linebuf.line, "BW*");
367     if (!res) {
368         /* Not an BWTP response?  Well, at least we won't understand it. */
369         return APR_EGENERAL;
370     }
371 
372     if (ctx->linebuf.line[2] == 'H') {
373         ctx->type = 0;
374     }
375     else if (ctx->linebuf.line[2] == 'M') {
376         ctx->type = 1;
377     }
378     else {
379         ctx->type = -1;
380     }
381 
382     ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
383 
384     /* Skip leading spaces for the reason string. */
385     if (apr_isspace(*reason)) {
386         reason++;
387     }
388 
389     ctx->length = apr_strtoi64(reason, &reason, 16);
390 
391     /* Skip leading spaces for the reason string. */
392     if (reason - ctx->linebuf.line < ctx->linebuf.used) {
393         if (apr_isspace(*reason)) {
394             reason++;
395         }
396 
397         ctx->phrase = serf_bstrmemdup(allocator, reason,
398                                       ctx->linebuf.used
399                                       - (reason - ctx->linebuf.line));
400     } else {
401         ctx->phrase = NULL;
402     }
403 
404     return APR_SUCCESS;
405 }
406 
407 /* This code should be replaced with header buckets. */
fetch_headers(serf_bucket_t * bkt,incoming_context_t * ctx)408 static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
409 {
410     apr_status_t status;
411 
412     /* RFC 2616 says that CRLF is the only line ending, but we can easily
413      * accept any kind of line ending.
414      */
415     status = fetch_line(ctx, SERF_NEWLINE_ANY);
416     if (SERF_BUCKET_READ_ERROR(status)) {
417         return status;
418     }
419     /* Something was read. Process it. */
420 
421     if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
422         const char *end_key;
423         const char *c;
424 
425         end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
426         if (!c) {
427             /* Bad headers? */
428             return APR_EGENERAL;
429         }
430 
431         /* Skip over initial : and spaces. */
432         while (apr_isspace(*++c))
433             continue;
434 
435         /* Always copy the headers (from the linebuf into new mem). */
436         /* ### we should be able to optimize some mem copies */
437         serf_bucket_headers_setx(
438             ctx->headers,
439             ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
440             c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
441     }
442 
443     return status;
444 }
445 
446 /* Perform one iteration of the state machine.
447  *
448  * Will return when one the following conditions occurred:
449  *  1) a state change
450  *  2) an error
451  *  3) the stream is not ready or at EOF
452  *  4) APR_SUCCESS, meaning the machine can be run again immediately
453  */
run_machine(serf_bucket_t * bkt,incoming_context_t * ctx)454 static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
455 {
456     apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
457 
458     switch (ctx->state) {
459     case STATE_STATUS_LINE:
460         /* RFC 2616 says that CRLF is the only line ending, but we can easily
461          * accept any kind of line ending.
462          */
463         status = fetch_line(ctx, SERF_NEWLINE_ANY);
464         if (SERF_BUCKET_READ_ERROR(status))
465             return status;
466 
467         if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
468             /* The Status-Line is in the line buffer. Process it. */
469             status = parse_status_line(ctx, bkt->allocator);
470             if (status)
471                 return status;
472 
473             if (ctx->length) {
474                 ctx->body =
475                     serf_bucket_barrier_create(ctx->stream, bkt->allocator);
476                 ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
477                                                      bkt->allocator);
478                 if (!ctx->type) {
479                     ctx->state = STATE_HEADERS;
480                 } else {
481                     ctx->state = STATE_BODY;
482                 }
483             } else {
484                 ctx->state = STATE_DONE;
485             }
486         }
487         else {
488             /* The connection closed before we could get the next
489              * response.  Treat the request as lost so that our upper
490              * end knows the server never tried to give us a response.
491              */
492             if (APR_STATUS_IS_EOF(status)) {
493                 return SERF_ERROR_REQUEST_LOST;
494             }
495         }
496         break;
497     case STATE_HEADERS:
498         status = fetch_headers(ctx->body, ctx);
499         if (SERF_BUCKET_READ_ERROR(status))
500             return status;
501 
502         /* If an empty line was read, then we hit the end of the headers.
503          * Move on to the body.
504          */
505         if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
506             /* Advance the state. */
507             ctx->state = STATE_DONE;
508         }
509         break;
510     case STATE_BODY:
511         /* Don't do anything. */
512         break;
513     case STATE_DONE:
514         return APR_EOF;
515     default:
516         /* Not reachable */
517         return APR_EGENERAL;
518     }
519 
520     return status;
521 }
522 
wait_for_body(serf_bucket_t * bkt,incoming_context_t * ctx)523 static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
524 {
525     apr_status_t status;
526 
527     /* Keep reading and moving through states if we aren't at the BODY */
528     while (ctx->state != STATE_BODY) {
529         status = run_machine(bkt, ctx);
530 
531         /* Anything other than APR_SUCCESS means that we cannot immediately
532          * read again (for now).
533          */
534         if (status)
535             return status;
536     }
537     /* in STATE_BODY */
538 
539     return APR_SUCCESS;
540 }
541 
serf_bucket_bwtp_incoming_frame_wait_for_headers(serf_bucket_t * bucket)542 apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
543     serf_bucket_t *bucket)
544 {
545     incoming_context_t *ctx = bucket->data;
546 
547     return wait_for_body(bucket, ctx);
548 }
549 
bwtp_incoming_read(serf_bucket_t * bucket,apr_size_t requested,const char ** data,apr_size_t * len)550 static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
551                                        apr_size_t requested,
552                                        const char **data, apr_size_t *len)
553 {
554     incoming_context_t *ctx = bucket->data;
555     apr_status_t rv;
556 
557     rv = wait_for_body(bucket, ctx);
558     if (rv) {
559         /* It's not possible to have read anything yet! */
560         if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
561             *len = 0;
562         }
563         return rv;
564     }
565 
566     rv = serf_bucket_read(ctx->body, requested, data, len);
567     if (APR_STATUS_IS_EOF(rv)) {
568         ctx->state = STATE_DONE;
569     }
570     return rv;
571 }
572 
bwtp_incoming_readline(serf_bucket_t * bucket,int acceptable,int * found,const char ** data,apr_size_t * len)573 static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
574                                            int acceptable, int *found,
575                                            const char **data, apr_size_t *len)
576 {
577     incoming_context_t *ctx = bucket->data;
578     apr_status_t rv;
579 
580     rv = wait_for_body(bucket, ctx);
581     if (rv) {
582         return rv;
583     }
584 
585     /* Delegate to the stream bucket to do the readline. */
586     return serf_bucket_readline(ctx->body, acceptable, found, data, len);
587 }
588 
589 /* ### need to implement */
590 #define bwtp_incoming_peek NULL
591 
592 const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
593     "BWTP-INCOMING",
594     bwtp_incoming_read,
595     bwtp_incoming_readline,
596     serf_default_read_iovec,
597     serf_default_read_for_sendfile,
598     serf_default_read_bucket,
599     bwtp_incoming_peek,
600     bwtp_incoming_destroy_and_data,
601 };
602