#!/usr/bin/env perl

use strict;
use warnings;
use Test::More;
use FindBin qw($Bin);
use lib "$Bin/lib";
use Carp qw(croak);
use MemcachedTest;
use IO::Select;
use IO::Socket qw(AF_INET SOCK_STREAM);

if (!supports_proxy()) {
    plan skip_all => 'proxy not enabled';
    exit 0;
}

# Set up some server sockets.
#
# mock_server($port)
#   Opens a listening TCP socket on 127.0.0.1:$port that stands in for a
#   memcached backend. Dies if the socket cannot be created, so the returned
#   handle is always defined.
sub mock_server {
    my $port = shift;
    my $srv = IO::Socket->new(
        Domain    => AF_INET,
        Type      => SOCK_STREAM,
        Proto     => 'tcp',
        LocalHost => '127.0.0.1',
        LocalPort => $port,
        ReusePort => 1,
        Listen    => 5) || die "IO::Socket: $@";
    return $srv;
}

# accept_backend($srv)
#   Accepts one connection on the mock server socket $srv, expects a
#   "version" command as the first line on it and answers with a mock
#   VERSION line. Returns the accepted (autoflushed) backend socket.
#   Croaks if accept() fails.
sub accept_backend {
    my $srv = shift;
    my $be = $srv->accept();
    # Assert before touching the handle: calling a method on an undefined
    # $be would die with an unhelpful message and skip the assertion.
    ok(defined $be, "mock backend created")
        or croak "accept on mock server failed: $!";
    $be->autoflush(1);
    # Test::More's prototypes put <$be> in scalar context: one line is read.
    like(<$be>, qr/version/, "received version command");
    print $be "VERSION 1.0.0-mock\r\n";

    return $be;
}

# Put a version command down the pipe to ensure the socket is clear.
# client version commands skip the proxy code
sub check_version {
    my $ps = shift;
    print $ps "version\r\n";
    like(<$ps>, qr/VERSION /, "version received");
}

# wait_reload($w)
#   Reads two lines from the watcher socket $w and checks that they are the
#   proxy_conf "start" and "done" log events, in that order.
sub wait_reload {
    my $w = shift;
    like(<$w>, qr/ts=\S+ gid=\d+ type=proxy_conf status=start/, "reload started");
    like(<$w>, qr/ts=\S+ gid=\d+ type=proxy_conf status=done/, "reload completed");
}

my @mocksrvs = ();
note "making mock servers";
for my $port (11711, 11712, 11713) {
    my $srv = mock_server($port);
    ok(defined $srv, "mock server created");
    push(@mocksrvs, $srv);
}

# Start up a clean server.
# Since limits are per worker thread, cut the worker threads down to 1 to ease
# testing.
my $p_srv = new_memcached('-o proxy_config=./t/proxylimits.lua -t 1');
my $ps = $p_srv->sock;
$ps->autoflush(1);
my @mbe = ();
my $watcher;

# pipelined_mg(@suffixes)
#   Builds one string holding an "mg /test/<suffix>" command per suffix, so
#   the whole batch can be written to the client socket in a single print.
sub pipelined_mg {
    my @suffixes = @_;
    return join('', map { "mg /test/$_\r\n" } @suffixes);
}

# exchange_gets($client, $backend, $rounds, $nkeys)
#   Repeats $rounds times: send a "get" for $nkeys copies of key "foo" on
#   $client, answer $nkeys requests on $backend with a two byte value, then
#   drain the reply from $client. Nothing is asserted on the lines read; the
#   caller only inspects the proxy stats afterwards.
sub exchange_gets {
    my ($client, $backend, $rounds, $nkeys) = @_;
    my $cmd = 'get' . (' foo' x $nkeys) . "\r\n";

    for my $round (1 .. $rounds) {
        print $client $cmd;
        # The test answers one backend request per requested key.
        for my $key (1 .. $nkeys) {
            my $request = <$backend>;
            print $backend "VALUE foo 0 2\r\nhi\r\n";
            print $backend "END\r\n";
        }
        # Per key: a VALUE line and a data line; then one closing END.
        for my $n (1 .. 2 * $nkeys + 1) {
            my $line = <$client>;
        }
    }
    return;
}

subtest 'active request limit' => sub {
    for my $msrv (@mocksrvs) {
        push(@mbe, accept_backend($msrv));
    }

    my $stats = mem_stats($ps, 'proxy');
    isnt($stats->{active_req_limit}, 0, "active request limit is set");

    # active request limit is 4, pipeline 6 requests and ensure the last two
    # get junked
    my $cmd = pipelined_mg(qw(a b c d e f));
    print $ps $cmd;

    # Lua config only sends commands to the first backend for this test.
    my $be = $mbe[0];
    for (1 .. 4) {
        like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
        print $be "EN\r\n";
    }
    my $s = IO::Select->new();
    $s->add($be);
    my @readable = $s->can_read(0.25);
    is(scalar @readable, 0, "no more pending reads on backend");

    for (1 .. 4) {
        is(scalar <$ps>, "EN\r\n", "received miss from backend");
    }

    is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got error back");
    is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got two limit errors");

    # Test turning the limit back off.
    $watcher = $p_srv->new_sock;
    print $watcher "watch proxyevents\n";
    is(<$watcher>, "OK\r\n", "watcher enabled");
    $p_srv->reload();
    wait_reload($watcher);

    $stats = mem_stats($ps, 'proxy');
    is($stats->{active_req_limit}, 0, "active request limit unset");

    # With the limit off all six pipelined requests must reach the backend.
    print $ps pipelined_mg(qw(a b c d e f));
    for (1 .. 6) {
        like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
        print $be "EN\r\n";
    }
    for (1 .. 6) {
        is(scalar <$ps>, "EN\r\n", "received miss from backend");
    }
};

subtest 'buffer memory limit' => sub {
    # Test the buffer memory limiter.
    # - limit per worker will be 1/t global limit
    $p_srv->reload();
    wait_reload($watcher);
    # Get a secondary client to trample limit.
    my $sps = $p_srv->new_sock;

    my $stats = mem_stats($ps, 'proxy');
    isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set");

    # - test SET commands with values, but nothing being read on backend
    my $data = 'x' x 30000;
    my $cmd = "ms foo 30000 T30\r\n" . $data . "\r\n";
    print $ps $cmd;

    my $be = $mbe[0];
    my $s = IO::Select->new;
    $s->add($be);
    # Wait until the backend has the request queued, then send the second one.
    my @readable = $s->can_read(1);
    # Only a hint, not a failure: the blocking reads below still wait for the
    # request, but the outcome depends on the first request being queued first.
    diag "backend not readable after 1s; request ordering is not guaranteed"
        unless @readable;
    print $sps $cmd;

    is(scalar <$be>, "ms foo 30000 T30\r\n", "received first ms");
    my $res = scalar <$be>;
    # Compare lengths rather than content to keep failure output short.
    is(length($res), length($data) + 2, "received first ms value");
    print $be "HD\r\n";

    # The second request should have been caught by the memory limiter
    is(scalar <$sps>, "SERVER_ERROR out of memory\r\n", "got server error");
    # FIXME: The original response cannot succeed because we cannot allocate
    # enough memory to read the response from the backend.
    # This is conveniently testing both paths right here but I would prefer
    # something better.
    # TODO: need to see if it's possible to surface an OOM from the backend
    # handler, but that requires more rewiring.
    is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "first request fails with backend failure");

    # Backend gets killed from a read OOM, so we need to re-establish.
    $be = $mbe[0] = accept_backend($mocksrvs[0]);
    like(<$watcher>, qr/error=outofmemory/, "OOM log line");

    # Memory limits won't drop until the garbage collectors run, which
    # requires a bit more push, so instead we raise the limits here so we can
    # retry from the pre-existing connections to test swallow mode.
    $p_srv->reload();
    wait_reload($watcher);

    # Test sending another request down both pipes to ensure they still work.
    $cmd = "ms foo 2 T30\r\nhi\r\n";
    for my $client ($ps, $sps) {
        print $client $cmd;
        is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
        is(scalar <$be>, "hi\r\n", "client works after oom");
        print $be "HD\r\n";
        is(scalar <$client>, "HD\r\n", "client received resp after oom");
    }

    # - test disabling the limiter
    $stats = mem_stats($ps, 'proxy');
    isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set");
    $p_srv->reload();
    wait_reload($watcher);

    $stats = mem_stats($ps, 'proxy');
    is($stats->{buffer_memory_limit}, 0, "buf mem limit is not set");

    # - test GET commands but don't read back, large backend values
    # extended testing:
    # - create errors while holding the buffers?
};

check_version($ps);

subtest 'memory used counter' => sub {
    my $be = $mbe[0];

    my $stats = mem_stats($ps, 'proxy');
    my $used = $stats->{buffer_memory_used};
    # check a very high number at first. The next batch of requests should
    # kick the GC enough times to free memory from the previous set of tests.
    # The rest should be much lower.
    cmp_ok($used, '<', 1000000, "pre: buffer memory usage not inflated: $used");

    # 100 single-key gets.
    exchange_gets($ps, $be, 100, 1);

    $stats = mem_stats($ps, 'proxy');
    $used = $stats->{buffer_memory_used};
    cmp_ok($used, '<', 1000, "mid: buffer memory usage not inflated: $used");

    # 50 multigets of four keys each.
    exchange_gets($ps, $be, 50, 4);

    $stats = mem_stats($ps, 'proxy');
    $used = $stats->{buffer_memory_used};
    cmp_ok($used, '<', 1000, "multiget: buffer memory usage not inflated: $used");

    # 200 more single-key gets.
    exchange_gets($ps, $be, 200, 1);

    $stats = mem_stats($ps, 'proxy');
    $used = $stats->{buffer_memory_used};
    cmp_ok($used, '<', 1000, "post: buffer memory usage not inflated: $used");
};

# TODO:
# check reqlimit/bwlimit counters

done_testing();