1#!/usr/bin/env perl
2
3use strict;
4use warnings;
5use Test::More;
6use FindBin qw($Bin);
7use lib "$Bin/lib";
8use Carp qw(croak);
9use MemcachedTest;
10use IO::Select;
11use IO::Socket qw(AF_INET SOCK_STREAM);
12
13if (!supports_proxy()) {
14    plan skip_all => 'proxy not enabled';
15    exit 0;
16}
17
18# Set up some server sockets.
19sub mock_server {
20    my $port = shift;
21    my $srv = IO::Socket->new(
22        Domain => AF_INET,
23        Type => SOCK_STREAM,
24        Proto => 'tcp',
25        LocalHost => '127.0.0.1',
26        LocalPort => $port,
27        ReusePort => 1,
28        Listen => 5) || die "IO::Socket: $@";
29    return $srv;
30}
31
32sub accept_backend {
33    my $srv = shift;
34    my $be = $srv->accept();
35    $be->autoflush(1);
36    ok(defined $be, "mock backend created");
37    like(<$be>, qr/version/, "received version command");
38    print $be "VERSION 1.0.0-mock\r\n";
39
40    return $be;
41}
42
43# Put a version command down the pipe to ensure the socket is clear.
44# client version commands skip the proxy code
45sub check_version {
46    my $ps = shift;
47    print $ps "version\r\n";
48    like(<$ps>, qr/VERSION /, "version received");
49}
50
51sub wait_reload {
52    my $w = shift;
53    like(<$w>, qr/ts=(\S+) gid=\d+ type=proxy_conf status=start/, "reload started");
54    like(<$w>, qr/ts=(\S+) gid=\d+ type=proxy_conf status=done/, "reload completed");
55}
56
57my @mocksrvs = ();
58note "making mock servers";
59for my $port (11711, 11712, 11713) {
60    my $srv = mock_server($port);
61    ok(defined $srv, "mock server created");
62    push(@mocksrvs, $srv);
63}
64
65# Start up a clean server.
66# Since limits are per worker thread, cut the worker threads down to 1 to ease
67# testing.
68my $p_srv = new_memcached('-o proxy_config=./t/proxylimits.lua -t 1');
69my $ps = $p_srv->sock;
70$ps->autoflush(1);
71my @mbe = ();
72my $watcher;
73
74subtest 'active request limit' => sub {
75    for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) {
76        my $be = accept_backend($msrv);
77        push(@mbe, $be);
78    }
79
80    my $stats = mem_stats($ps, 'proxy');
81    isnt($stats->{active_req_limit}, 0, "active request limit is set");
82
83    # active request limit is 4, pipeline 6 requests and ensure the last two
84    # get junked
85    my $cmd = '';
86    for ('a', 'b', 'c', 'd', 'e', 'f') {
87        $cmd .= "mg /test/$_\r\n";
88    }
89    print $ps $cmd;
90
91    # Lua config only sends commands to the first backend for this test.
92    my $be = $mbe[0];
93    for (1 .. 4) {
94        like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
95        print $be "EN\r\n";
96    }
97    my $s = IO::Select->new();
98    $s->add($be);
99    my @readable = $s->can_read(0.25);
100    is(scalar @readable, 0, "no more pending reads on backend");
101
102    for (1 .. 4) {
103        is(scalar <$ps>, "EN\r\n", "received miss from backend");
104    }
105
106    is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got error back");
107    is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got two limit errors");
108
109    # Test turning the limit back off.
110    $watcher = $p_srv->new_sock;
111    print $watcher "watch proxyevents\n";
112    is(<$watcher>, "OK\r\n", "watcher enabled");
113    $p_srv->reload();
114    wait_reload($watcher);
115
116    $stats = mem_stats($ps, 'proxy');
117    is($stats->{active_req_limit}, 0, "active request limit unset");
118
119    $cmd = '';
120    for ('a', 'b', 'c', 'd', 'e', 'f') {
121        $cmd .= "mg /test/$_\r\n";
122    }
123    print $ps $cmd;
124    for (1 .. 6) {
125        like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
126        print $be "EN\r\n";
127    }
128    for (1 .. 6) {
129        is(scalar <$ps>, "EN\r\n", "received miss from backend");
130    }
131};
132
133subtest 'buffer memory limit' => sub {
134    # Test the buffer memory limiter.
135    # - limit per worker will be 1/t global limit
136    $p_srv->reload();
137    wait_reload($watcher);
138    # Get a secondary client to trample limit.
139    my $sps = $p_srv->new_sock;
140
141    my $stats = mem_stats($ps, 'proxy');
142    isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set");
143
144    # - test SET commands with values, but nothing being read on backend
145    my $data = 'x' x 30000;
146    my $cmd = "ms foo 30000 T30\r\n" . $data . "\r\n";
147    print $ps $cmd;
148
149    my $be = $mbe[0];
150    my $s = IO::Select->new;
151    $s->add($be);
152    # Wait until the backend has the request queued, then send the second one.
153    my @readable = $s->can_read(1);
154    print $sps $cmd;
155
156    my $res;
157    is(scalar <$be>, "ms foo 30000 T30\r\n", "received first ms");
158    $res = scalar <$be>;
159    print $be "HD\r\n";
160
161    # The second request should have been caught by the memory limiter
162    is(scalar <$sps>, "SERVER_ERROR out of memory\r\n", "got server error");
163    # FIXME: The original response cannot succeed because we cannot allocate
164    # enough memory to read the response from the backend.
165    # This is conveniently testing both paths right here but I would prefer
166    # something better.
167    # TODO: need to see if it's possible to surface an OOM from the backend
168    # handler, but that requires more rewiring.
169    is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "first request succeeded");
170
171    # Backend gets killed from a read OOM, so we need to re-establish.
172    $be = $mbe[0] = accept_backend($mocksrvs[0]);
173    like(<$watcher>, qr/error=outofmemory/, "OOM log line");
174
175    # Memory limits won't drop until the garbage collectors run, which
176    # requires a bit more push, so instead we raise the limits here so we can
177    # retry from the pre-existing connections to test swallow mode.
178    $p_srv->reload();
179    wait_reload($watcher);
180
181    # Test sending another request down both pipes to ensure they still work.
182    $cmd = "ms foo 2 T30\r\nhi\r\n";
183    print $ps $cmd;
184    is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
185    is(scalar <$be>, "hi\r\n", "client works after oom");
186    print $be "HD\r\n";
187    is(scalar <$ps>, "HD\r\n", "client received resp after oom");
188    print $sps $cmd;
189    is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
190    is(scalar <$be>, "hi\r\n", "client works after oom");
191    print $be "HD\r\n";
192    is(scalar <$sps>, "HD\r\n", "client received resp after oom");
193
194    # - test disabling the limiter
195    $stats = mem_stats($ps, 'proxy');
196    isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set");
197    $p_srv->reload();
198    wait_reload($watcher);
199
200    $stats = mem_stats($ps, 'proxy');
201    is($stats->{buffer_memory_limit}, 0, "buf mem limit is not set");
202
203    # - test GET commands but don't read back, large backend values
204    # extended testing:
205    # - create errors while holding the buffers?
206};
207
208check_version($ps);
209
210subtest 'memory used counter' => sub {
211    my $be = $mbe[0];
212
213    my $stats = mem_stats($ps, 'proxy');
214    my $used = $stats->{buffer_memory_used};
215    # check a very high number at first. The next batch of requests should
216    # kick the GC enough times to free memory from the previous set of tests.
217    # The rest should be much lower.
218    cmp_ok($used, '<', 1000000, "pre: buffer memory usage not inflated: $used");
219
220    my $cmd = "get foo\r\n";
221    for (1 .. 100) {
222        print $ps $cmd;
223        {
224            my $res = scalar <$be>;
225            print $be "VALUE foo 0 2\r\nhi\r\n";
226            print $be "END\r\n";
227        }
228        my $res = scalar <$ps>;
229        $res = scalar <$ps>;
230        $res = scalar <$ps>;
231    }
232
233    $stats = mem_stats($ps, 'proxy');
234    $used = $stats->{buffer_memory_used};
235    cmp_ok($used, '<', 1000, "mid: buffer memory usage not inflated: $used");
236
237    $cmd = "get foo foo foo foo\r\n";
238    for (1 .. 50) {
239        print $ps $cmd;
240        for (1 .. 4) {
241            my $res = scalar <$be>;
242            print $be "VALUE foo 0 2\r\nhi\r\n";
243            print $be "END\r\n";
244        }
245        for (1 .. 4) {
246            my $res = scalar <$ps>;
247            $res = scalar <$ps>;
248        }
249        # END
250        my $res = scalar <$ps>;
251    }
252
253    $stats = mem_stats($ps, 'proxy');
254    $used = $stats->{buffer_memory_used};
255    cmp_ok($used, '<', 1000, "multiget: buffer memory usage not inflated: $used");
256
257    $cmd = "get foo\r\n";
258    for (1 .. 200) {
259        print $ps $cmd;
260        {
261            my $res = scalar <$be>;
262            print $be "VALUE foo 0 2\r\nhi\r\n";
263            print $be "END\r\n";
264        }
265        my $res = scalar <$ps>;
266        $res = scalar <$ps>;
267        $res = scalar <$ps>;
268    }
269
270    $stats = mem_stats($ps, 'proxy');
271    $used = $stats->{buffer_memory_used};
272    cmp_ok($used, '<', 1000, "post: buffer memory usage not inflated: $used");
273};
274
275# TODO:
276# check reqlimit/bwlimit counters
277
278done_testing();
279