1package MemcachedTest;
2use strict;
3use IO::Socket::INET;
4use IO::Socket::UNIX;
5use POSIX ":sys_wait_h";
6use Exporter 'import';
7use Carp qw(croak);
8use vars qw(@EXPORT);
9
10# Instead of doing the substitution with Autoconf, we assume that
11# cwd == builddir.
12use Cwd;
13my $builddir = getcwd;
14
15my @unixsockets = ();
16
17@EXPORT = qw(new_memcached sleep
18             mem_get_is mem_gets mem_gets_is mem_stats mem_move_time
19             supports_sasl free_port supports_drop_priv supports_extstore
20             wait_ext_flush supports_tls enabled_tls_testing run_help
21             supports_unix_socket get_memcached_exe supports_proxy);
22
23use constant MAX_READ_WRITE_SIZE => 16384;
24use constant SRV_CRT => "server_crt.pem";
25use constant SRV_KEY => "server_key.pem";
26use constant CLIENT_CRT => "client_crt.pem";
27use constant CLIENT_KEY => "client_key.pem";
28use constant CA_CRT => "cacert.pem";
29
30my $testdir = $builddir . "/t/";
31my $client_crt = $testdir. CLIENT_CRT;
32my $client_key = $testdir. CLIENT_KEY;
33my $server_crt = $testdir . SRV_CRT;
34my $server_key = $testdir . SRV_KEY;
35
36my $tls_checked = 0;
37
38sub sleep {
39    my $n = shift;
40    select undef, undef, undef, $n;
41}
42
43# Wait until all items have flushed
44sub wait_ext_flush {
45    my $sock = shift;
46    my $target = shift || 0;
47    my $sum = $target + 1;
48    while ($sum > $target) {
49        my $s = mem_stats($sock, "items");
50        $sum = 0;
51        for my $key (keys %$s) {
52            if ($key =~ m/items:(\d+):number/) {
53                # Ignore classes which can contain extstore items
54                next if $1 < 3;
55                $sum += $s->{$key};
56            }
57        }
58        sleep 1 if $sum > $target;
59    }
60}
61
62sub mem_stats {
63    my ($sock, $type) = @_;
64    $type = $type ? " $type" : "";
65    print $sock "stats$type\r\n";
66    my $stats = {};
67    while (<$sock>) {
68        last if /^(\.|END)/;
69        /^(STAT|ITEM) (\S+)\s+([^\r\n]+)/;
70        #print " slabs: $_";
71        $stats->{$2} = $3;
72    }
73    return $stats;
74}
75
76sub mem_move_time {
77    my ($sock, $move) = @_;
78    print $sock "debugtime $move\r\n";
79    <$sock>;
80}
81
82sub mem_get_is {
83    # works on single-line values only.  no newlines in value.
84    my ($sock_opts, $key, $val, $msg) = @_;
85    my $opts = ref $sock_opts eq "HASH" ? $sock_opts : {};
86    my $sock = ref $sock_opts eq "HASH" ? $opts->{sock} : $sock_opts;
87
88    my $expect_flags = $opts->{flags} || 0;
89    my $dval = defined $val ? "'$val'" : "<undef>";
90    $msg ||= "$key == $dval";
91
92    print $sock "get $key\r\n";
93    if (! defined $val) {
94        my $line = scalar <$sock>;
95        if ($line =~ /^VALUE/) {
96            $line .= scalar(<$sock>) . scalar(<$sock>);
97        }
98        Test::More::is($line, "END\r\n", $msg);
99    } else {
100        my $len = length($val);
101        my $body = scalar(<$sock>);
102        my $expected = "VALUE $key $expect_flags $len\r\n$val\r\nEND\r\n";
103        if (!$body || $body =~ /^END/) {
104            Test::More::is($body, $expected, $msg);
105            return;
106        }
107        $body .= scalar(<$sock>) . scalar(<$sock>);
108        Test::More::is($body, $expected, $msg);
109    }
110}
111
112sub mem_gets {
113    # works on single-line values only.  no newlines in value.
114    my ($sock_opts, $key) = @_;
115    my $opts = ref $sock_opts eq "HASH" ? $sock_opts : {};
116    my $sock = ref $sock_opts eq "HASH" ? $opts->{sock} : $sock_opts;
117    my $val;
118    my $expect_flags = $opts->{flags} || 0;
119
120    print $sock "gets $key\r\n";
121    my $response = <$sock>;
122    if ($response =~ /^END/) {
123        return "NOT_FOUND";
124    }
125    else
126    {
127        $response =~ /VALUE (.*) (\d+) (\d+) (\d+)/;
128        my $flags = $2;
129        my $len = $3;
130        my $identifier = $4;
131        read $sock, $val , $len;
132        # get the END
133        $_ = <$sock>;
134        $_ = <$sock>;
135
136        return ($identifier,$val);
137    }
138
139}
140sub mem_gets_is {
141    # works on single-line values only.  no newlines in value.
142    my ($sock_opts, $identifier, $key, $val, $msg) = @_;
143    my $opts = ref $sock_opts eq "HASH" ? $sock_opts : {};
144    my $sock = ref $sock_opts eq "HASH" ? $opts->{sock} : $sock_opts;
145
146    my $expect_flags = $opts->{flags} || 0;
147    my $dval = defined $val ? "'$val'" : "<undef>";
148    $msg ||= "$key == $dval";
149
150    print $sock "gets $key\r\n";
151    if (! defined $val) {
152        my $line = scalar <$sock>;
153        if ($line =~ /^VALUE/) {
154            $line .= scalar(<$sock>) . scalar(<$sock>);
155        }
156        Test::More::is($line, "END\r\n", $msg);
157    } else {
158        my $len = length($val);
159        my $body = scalar(<$sock>);
160        my $expected = "VALUE $key $expect_flags $len $identifier\r\n$val\r\nEND\r\n";
161        if (!$body || $body =~ /^END/) {
162            Test::More::is($body, $expected, $msg);
163            return;
164        }
165        $body .= scalar(<$sock>) . scalar(<$sock>);
166        Test::More::is($body, $expected, $msg);
167    }
168}
169
170sub free_port {
171    my $type = shift || "tcp";
172    my $sock;
173    my $port;
174    while (!$sock) {
175        $port = int(rand(20000)) + 30000;
176        if (enabled_tls_testing()) {
177            $sock = eval qq{ IO::Socket::SSL->new(LocalAddr => '127.0.0.1',
178                                      LocalPort => $port,
179                                      Proto     => '$type',
180                                      ReuseAddr => 1,
181                                      SSL_verify_mode => SSL_VERIFY_NONE);
182                                      };
183             die $@ if $@; # sanity check.
184        } else {
185            $sock = IO::Socket::INET->new(LocalAddr => '127.0.0.1',
186                                      LocalPort => $port,
187                                      Proto     => $type,
188                                      ReuseAddr => 1);
189        }
190    }
191    return $port;
192}
193
194sub print_help {
195    my $exe = get_memcached_exe();
196    my $output = `$exe -h`;
197    return $output;
198}
199
200sub supports_udp {
201    my $output = print_help();
202    return 0 if $output =~ /^memcached 1\.1\./;
203    return 1;
204}
205
206sub supports_sasl {
207    my $output = print_help();
208    return 1 if $output =~ /sasl/i;
209    return 0;
210}
211
212sub supports_extstore {
213    my $output = print_help();
214    return 1 if $output =~ /ext_path/i;
215    return 0;
216}
217
218sub supports_proxy {
219    my $output = print_help();
220    return 1 if $output =~ /proxy_config/i;
221    return 0;
222}
223
224sub supports_tls {
225    my $output = print_help();
226    return 1 if $output =~ /enable-ssl/i;
227    return 0;
228}
229
230sub supports_unix_socket {
231    my $output = print_help();
232    return 1 if $output =~ /unix-socket/i;
233    return 0;
234}
235
236sub enabled_tls_testing {
237    if ($tls_checked) {
238        return 1;
239    } elsif (supports_tls() && $ENV{SSL_TEST}) {
240        eval "use IO::Socket::SSL";
241        croak("IO::Socket::SSL not installed or failed to load, cannot run SSL tests as requested") if $@;
242        $tls_checked = 1;
243        return 1;
244    }
245}
246
247sub supports_drop_priv {
248    my $output = print_help();
249    return 1 if $output =~ /no_drop_privileges/i;
250    return 0;
251}
252
253sub get_memcached_exe {
254    my $exe = "$builddir/memcached-debug";
255    croak("memcached binary doesn't exist.  Haven't run 'make' ?\n") unless -e $exe;
256    croak("memcached binary not executable\n") unless -x _;
257    return $exe;
258}
259
260sub run_help {
261    my $exe = get_memcached_exe();
262    return system("$exe -h");
263}
264
265# -1 if the pid is actually dead.
266sub is_running {
267    return unless defined $_[0];
268    return waitpid($_[0], WNOHANG) >= 0 ? 1 : 0;
269}
270
271sub new_memcached {
272    my ($args, $passed_port) = @_;
273    my $port = $passed_port;
274    my $host = '127.0.0.1';
275    my $ssl_enabled  = enabled_tls_testing();
276    my $unix_socket_disabled  = !supports_unix_socket();
277    my $use_external = 0;
278    if ($ENV{T_MEMD_EXTERNAL}) {
279        $use_external = $ENV{T_MEMD_EXTERNAL};
280    }
281
282    if ($ENV{T_MEMD_USE_DAEMON}) {
283        my ($host, $port) = ($ENV{T_MEMD_USE_DAEMON} =~ m/^([^:]+):(\d+)$/);
284        my $conn;
285        if ($ssl_enabled) {
286            $conn = eval qq{IO::Socket::SSL->new(PeerAddr => "$host:$port",
287                                        SSL_verify_mode => SSL_VERIFY_NONE,
288                                        SSL_cert_file => '$client_crt',
289                                        SSL_key_file => '$client_key');
290                                        };
291             die $@ if $@; # sanity check.
292        } else {
293            $conn = IO::Socket::INET->new(PeerAddr => "$host:$port");
294        }
295        if ($conn) {
296            return Memcached::Handle->new(conn => $conn,
297                                          host => $host,
298                                          port => $port);
299        }
300        croak("Failed to connect to specified memcached server.") unless $conn;
301    }
302
303    if ($< == 0) {
304        $args .= " -u root";
305    }
306    $args .= " -o relaxed_privileges";
307
308    my $udpport;
309    if ($args =~ /-l (\S+)/ || (($ssl_enabled || $unix_socket_disabled) && ($args !~ /-s (\S+)/))) {
310        if (!$port) {
311            $port = free_port();
312        }
313        $udpport = free_port("udp");
314        $args .= " -p $port";
315        if (supports_udp() && $args !~ /-U (\S+)/) {
316            $args .= " -U $udpport";
317        }
318        if ($ssl_enabled) {
319            $args .= " -Z";
320            if ($args !~ /-o ssl_chain_cert=(\S+)/) {
321                $args .= " -o ssl_chain_cert=$server_crt";
322            }
323            if ($args !~ /-o ssl_key=(\S+)/) {
324                $args .= " -o ssl_key=$server_key";
325            }
326        }
327    } elsif ($args !~ /-s (\S+)/) {
328        my $num = @unixsockets;
329        my $file = "/tmp/memcachetest.$$.$num";
330        if ($use_external) {
331            $file = "/tmp/memcachedtest.$use_external.$num";
332        }
333        $args .= " -s $file";
334        push(@unixsockets, $file);
335    }
336
337    my $wait_tries = 60;
338    my $exe = get_memcached_exe();
339    my $childpid;
340    if ($use_external) {
341        print STDERR "External daemon requested. Start arguments:\n$exe $args\n";
342        $wait_tries = 10000;
343        $childpid;
344    } else {
345        $childpid = fork();
346
347        unless ($childpid) {
348            my $valgrind = "";
349            my $valgrind_args = "--quiet --error-exitcode=1 --exit-on-first-error=yes";
350            if ($ENV{VALGRIND_ARGS}) {
351                $valgrind_args = $ENV{VALGRIND_ARGS};
352            }
353            if ($ENV{VALGRIND_TEST}) {
354                $valgrind = "valgrind $valgrind_args";
355                # NOTE: caller file stuff.
356                $valgrind .= " $ENV{VALGRIND_EXTRA_ARGS}";
357            }
358            my $cmd = "$builddir/timedrun 600 $valgrind $exe $args";
359            #print STDERR "RUN: $cmd\n\n";
360            exec $cmd;
361            exit; # never gets here.
362        }
363    }
364
365    # unix domain sockets
366    if ($args =~ /-s (\S+)/) {
367        # A slow/emulated/valgrinded/etc system may take longer than a second
368        # for the unix socket to appear.
369        my $filename = $1;
370        for (1..$wait_tries) {
371            my $conn = IO::Socket::UNIX->new(Peer => $filename);
372
373            if ($conn) {
374                return Memcached::Handle->new(pid  => $childpid,
375                                              conn => $conn,
376                                              domainsocket => $filename,
377                                              host => $host,
378                                              port => $port);
379            } else {
380                if (!$ENV{T_MEMD_EXTERNAL}) {
381                    croak("Failed to connect to unix socket: memcached not running") unless is_running($childpid);
382                }
383                select undef, undef, undef, 0.20;
384            }
385        }
386        croak("Failed to connect to unix domain socket: $! '$filename'") if $@;
387    }
388
389    # try to connect / find open port, only if we're not using unix domain
390    # sockets
391
392    for (1..80) {
393        my $conn;
394        if ($ssl_enabled) {
395            $conn = eval qq{ IO::Socket::SSL->new(PeerAddr => "127.0.0.1:$port",
396                                        SSL_verify_mode => SSL_VERIFY_NONE,
397                                        SSL_cert_file => '$client_crt',
398                                        SSL_key_file => '$client_key');
399                                        };
400            die $@ if $@; # sanity check.
401        } else {
402            $conn = IO::Socket::INET->new(PeerAddr => "127.0.0.1:$port");
403        }
404        if ($conn) {
405            return Memcached::Handle->new(pid  => $childpid,
406                                          conn => $conn,
407                                          udpport => $udpport,
408                                          host => $host,
409                                          port => $port);
410        }
411        croak("Failed to connect: memcached not running") unless is_running($childpid);
412        select undef, undef, undef, 0.25;
413    }
414    croak("Failed to startup/connect to memcached server.");
415}
416
417END {
418    for (@unixsockets) {
419        unlink $_;
420    }
421}
422
423############################################################################
424package Memcached::Handle;
425use POSIX ":sys_wait_h";
426sub new {
427    my ($class, %params) = @_;
428    return bless \%params, $class;
429}
430
431sub DESTROY {
432    my $self = shift;
433    if ($self->{pid}) {
434        kill 2, $self->{pid};
435    } else {
436        print STDERR "WANT TO ISSUE KILL: 2\n";
437    }
438}
439
440sub stop {
441    my $self = shift;
442    if ($self->{pid}) {
443        kill 15, $self->{pid};
444    } else {
445        print STDERR "WANT TO ISSUE KILL: 15\n";
446    }
447}
448
449sub graceful_stop {
450    my $self = shift;
451    if ($self->{pid}) {
452        kill 'SIGUSR1', $self->{pid};
453    } else {
454        print STDERR "WANT TO ISSUE KILL: SIGUSR1\n";
455    }
456}
457
458sub reload {
459    my $self = shift;
460    if ($self->{pid}) {
461        kill 'SIGHUP', $self->{pid};
462    } else {
463        print STDERR "WANT TO ISSUE KILL: SIGHUP\n";
464    }
465}
466
467# -1 if the pid is actually dead.
468sub is_running {
469    my $self = shift;
470    if ($self->{pid}) {
471        return waitpid($self->{pid}, WNOHANG) >= 0 ? 1 : 0;
472    } else {
473        print STDERR "WANTED TO CHECK IF DAEMON IS RUNNING\n";
474    }
475}
476
477sub host { $_[0]{host} }
478sub port { $_[0]{port} }
479sub udpport { $_[0]{udpport} }
480
481sub sock {
482    my $self = shift;
483
484    if ($self->{conn} && ($self->{domainsocket} || getpeername($self->{conn}))) {
485        return $self->{conn};
486    }
487    return $self->new_sock;
488}
489
490sub new_sock {
491    my $self = shift;
492    if ($self->{domainsocket}) {
493        return IO::Socket::UNIX->new(Peer => $self->{domainsocket});
494    } elsif (MemcachedTest::enabled_tls_testing()) {
495        my $ssl_session_cache = shift;
496        my $ssl_version = shift;
497        return eval qq{ IO::Socket::SSL->new(PeerAddr => "$self->{host}:$self->{port}",
498                                    SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE,
499                                    SSL_session_cache => \$ssl_session_cache,
500                                    SSL_version => '$ssl_version',
501                                    SSL_cert_file => '$client_crt',
502                                    SSL_key_file => '$client_key');
503                                    };
504    } else {
505        return IO::Socket::INET->new(PeerAddr => "$self->{host}:$self->{port}");
506    }
507}
508
509# needed for a specific test
510sub new_nocert_tls_sock {
511    my $self = shift;
512    if (MemcachedTest::enabled_tls_testing()) {
513        my $port = shift;
514        my $ssl_version = shift;
515        return eval qq{ IO::Socket::SSL->new(PeerAddr => "$self->{host}:$port",
516                                    SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE,
517                                    SSL_version => '$ssl_version');
518                                    };
519    }
520}
521
522sub new_udp_sock {
523    my $self = shift;
524    return IO::Socket::INET->new(PeerAddr => '127.0.0.1',
525                                 PeerPort => $self->{udpport},
526                                 Proto    => 'udp',
527                                 LocalAddr => '127.0.0.1',
528                                 LocalPort => MemcachedTest::free_port('udp'),
529        );
530
531}
532
533############################################################################
534package Memcached::ProxyTest;
535use IO::Socket qw(AF_INET SOCK_STREAM);
536# We call out to Test::More because of some package instancing. Not completely
537# sure if this is necessary anymore?
538use strict;
539use warnings;
540
541sub new {
542    my ($class, %p) = @_;
543
544    die "needs servers argument"
545        unless exists $p{servers} && ref($p{servers}) eq 'ARRAY';
546
547    $p{_srv} = [];
548    for my $port (@{$p{servers}}) {
549        my $srv = _mock_server($port);
550        Test::More::ok(defined $srv, "mock server object created");
551        push(@{$p{_srv}}, $srv);
552    }
553
554    $p{_csel} = IO::Select->new();
555
556    return bless \%p, $class;
557}
558
559sub _mock_server {
560    my $port = shift;
561    my $srv = IO::Socket->new(
562        Domain => AF_INET,
563        Type => SOCK_STREAM,
564        Proto => 'tcp',
565        LocalHost => '127.0.0.1',
566        LocalPort => $port,
567        ReusePort => 1,
568        Listen => 5) || die "IO::Socket: $@";
569    return $srv;
570}
571
572sub _accept_backend {
573    my $srv = shift;
574    my $be = $srv->accept();
575    $be->autoflush(1);
576    Test::More::ok(defined $be, "mock backend created");
577    Test::More::like(<$be>, qr/version/, "received version command");
578    my $sent = $be->send("VERSION 1.0.0-mock\r\n");
579    Test::More::cmp_ok($sent, '>', 0, "wrote VERSION back to backend");
580
581    return $be;
582}
583
584sub accept_backends {
585    my $self = shift;
586    $self->{_be} = [];
587    for my $srv (@{$self->{_srv}}) {
588        my $be = _accept_backend($srv);
589        push(@{$self->{_be}}, $be);
590    }
591}
592
593sub accept_backend {
594    my $self = shift;
595    my $idx = shift;
596    $self->{_be}->[$idx] = _accept_backend($self->{_srv}->[$idx]);
597}
598
599sub close_backend {
600    my $self = shift;
601    my $idx = shift;
602    $self->{_be}->[$idx]->close;
603}
604
605sub srv_accept_waiting {
606    my $self = shift;
607    my $idx = shift;
608    my $wait = shift || 1;
609    my $s = IO::Select->new();
610    $s->add($self->{_srv}->[$idx]);
611    return $s->can_read($wait);
612}
613
614sub set_c {
615    my $self = shift;
616    my $sel = $self->{_csel};
617    if (exists $self->{_c}) {
618        $sel->remove($self->{_c});
619    }
620    $self->{_c} = shift;
621    $sel->add($self->{_c});
622}
623
624sub check_c {
625    my $self = shift;
626    my $c = $self->{_c};
627    print $c "version\r\n";
628    Test::More::like(scalar <$c>, qr/VERSION /, "version received");
629}
630
631sub wait_c {
632    my ($self, $wait) = @_;
633    return $self->{_csel}->can_read($wait);
634}
635
636# Remembers the last command sent to the client socket.
637sub c_send {
638    my $self = shift;
639    my $cmd = shift;
640    my $c = $self->{_c};
641    print $c $cmd;
642    $self->{_cmd} = $cmd;
643}
644
645# Backends can be specified as a bare number, and array reference ([0,1,2]),
646# or 'all' to run against all available backends.
647sub _be_list {
648    my $self = shift;
649    my $list = shift;
650    my @l = ();
651    if (ref $list eq '') {
652        if (exists $self->{_be}->[$list]) {
653            push(@l, $self->{_be}->[$list]);
654        } elsif ($list eq 'all') {
655            @l = @{$self->{_be}};
656        } else {
657            die "unknown argument";
658        }
659    } elsif (ref $list eq 'ARRAY') {
660        for my $i (@$list) {
661            push(@l, $self->{_be}->[$i]);
662        }
663    }
664    return \@l;
665}
666
667# Check that the last command sent to the client arrives at the backends.
668# This is a common case so this saves typing/errors while writing tests.
669sub be_recv_c {
670    my $self = shift;
671    my $list = shift;
672    my $detail = shift || 'be received data';
673    die "issue a a command with c_send before calling be_recv_c" unless exists $self->{_cmd};
674
675    my $l = $self->_be_list($list);
676    my $cmd = $self->{_cmd};
677    my @cmds = split(/(?<=\r\n)/, $cmd);
678    for my $be (@$l) {
679        for my $c (@cmds) {
680            Test::More::is(scalar <$be>, $c, $detail);
681        }
682    }
683}
684
685sub be_recv_like {
686    my $self = shift;
687    my $list = shift;
688    my $cmd = shift || die "must provide a command to check";
689    my $detail = shift || 'be received data';
690
691    my $l = $self->_be_list($list);
692    for my $be (@$l) {
693        Test::More::like(scalar <$be>, $cmd, $detail);
694    }
695}
696
697# Receive a different/specific string to the backend socket.
698sub be_recv {
699    my $self = shift;
700    my $list = shift;
701    my $cmd = shift || die "must provide a command to check";
702    my $detail = shift || 'be received data';
703
704    my $l = $self->_be_list($list);
705    for my $be (@$l) {
706        Test::More::is(scalar <$be>, $cmd, $detail);
707    }
708}
709
710# Sends a specific command to a backend socket back towards the proxy.
711# Remembers the last command sent.
712sub be_send {
713    my $self = shift;
714    my $list = shift;
715    my $cmd = shift || die "must provide data to return";
716
717    $self->{_becmd} = $cmd;
718    my $l = $self->_be_list($list);
719    for my $be (@$l) {
720        print $be $cmd;
721    }
722}
723
724# Client receives the last command sent to any backend. This is also a common
725# case so we save some typing/errors by remembering this here.
726sub c_recv_be {
727    my $self = shift;
728    my $detail = shift || 'client received be response';
729
730    my $cmd = $self->{_becmd};
731    my $c = $self->{_c};
732    Test::More::is(scalar <$c>, $cmd, $detail);
733}
734
735# Client to receive an arbitrary string.
736sub c_recv {
737    my $self = shift;
738    my $cmd = shift;
739    my $detail = shift || 'client received response';
740
741    my $c = $self->{_c};
742    Test::More::is(scalar <$c>, $cmd, $detail);
743}
744
745# Clear out any remembered commands and check the client pipe is clear.
746sub clear {
747    my $self = shift;
748    delete $self->{_becmd} if exists $self->{_becmd};
749    delete $self->{_cmd} if exists $self->{_cmd};
750    $self->check_c();
751}
752
7531;
754