1package MemcachedTest; 2use strict; 3use IO::Socket::INET; 4use IO::Socket::UNIX; 5use POSIX ":sys_wait_h"; 6use Exporter 'import'; 7use Carp qw(croak); 8use vars qw(@EXPORT); 9 10# Instead of doing the substitution with Autoconf, we assume that 11# cwd == builddir. 12use Cwd; 13my $builddir = getcwd; 14 15my @unixsockets = (); 16 17@EXPORT = qw(new_memcached sleep 18 mem_get_is mem_gets mem_gets_is mem_stats mem_move_time 19 supports_sasl free_port supports_drop_priv supports_extstore 20 wait_ext_flush supports_tls enabled_tls_testing run_help 21 supports_unix_socket get_memcached_exe supports_proxy); 22 23use constant MAX_READ_WRITE_SIZE => 16384; 24use constant SRV_CRT => "server_crt.pem"; 25use constant SRV_KEY => "server_key.pem"; 26use constant CLIENT_CRT => "client_crt.pem"; 27use constant CLIENT_KEY => "client_key.pem"; 28use constant CA_CRT => "cacert.pem"; 29 30my $testdir = $builddir . "/t/"; 31my $client_crt = $testdir. CLIENT_CRT; 32my $client_key = $testdir. CLIENT_KEY; 33my $server_crt = $testdir . SRV_CRT; 34my $server_key = $testdir . SRV_KEY; 35 36my $tls_checked = 0; 37 38sub sleep { 39 my $n = shift; 40 select undef, undef, undef, $n; 41} 42 43# Wait until all items have flushed 44sub wait_ext_flush { 45 my $sock = shift; 46 my $target = shift || 0; 47 my $sum = $target + 1; 48 while ($sum > $target) { 49 my $s = mem_stats($sock, "items"); 50 $sum = 0; 51 for my $key (keys %$s) { 52 if ($key =~ m/items:(\d+):number/) { 53 # Ignore classes which can contain extstore items 54 next if $1 < 3; 55 $sum += $s->{$key}; 56 } 57 } 58 sleep 1 if $sum > $target; 59 } 60} 61 62sub mem_stats { 63 my ($sock, $type) = @_; 64 $type = $type ? " $type" : ""; 65 print $sock "stats$type\r\n"; 66 my $stats = {}; 67 while (<$sock>) { 68 last if /^(\.|END)/; 69 /^(STAT|ITEM) (\S+)\s+([^\r\n]+)/; 70 #print " slabs: $_"; 71 $stats->{$2} = $3; 72 } 73 return $stats; 74} 75 76sub mem_move_time { 77 my ($sock, $move) = @_; 78 print $sock "debugtime $move\r\n"; 79 <$sock>; 80} 81 82sub mem_get_is { 83 # works on single-line values only. no newlines in value. 84 my ($sock_opts, $key, $val, $msg) = @_; 85 my $opts = ref $sock_opts eq "HASH" ? $sock_opts : {}; 86 my $sock = ref $sock_opts eq "HASH" ? $opts->{sock} : $sock_opts; 87 88 my $expect_flags = $opts->{flags} || 0; 89 my $dval = defined $val ? "'$val'" : "<undef>"; 90 $msg ||= "$key == $dval"; 91 92 print $sock "get $key\r\n"; 93 if (! defined $val) { 94 my $line = scalar <$sock>; 95 if ($line =~ /^VALUE/) { 96 $line .= scalar(<$sock>) . scalar(<$sock>); 97 } 98 Test::More::is($line, "END\r\n", $msg); 99 } else { 100 my $len = length($val); 101 my $body = scalar(<$sock>); 102 my $expected = "VALUE $key $expect_flags $len\r\n$val\r\nEND\r\n"; 103 if (!$body || $body =~ /^END/) { 104 Test::More::is($body, $expected, $msg); 105 return; 106 } 107 $body .= scalar(<$sock>) . scalar(<$sock>); 108 Test::More::is($body, $expected, $msg); 109 } 110} 111 112sub mem_gets { 113 # works on single-line values only. no newlines in value. 114 my ($sock_opts, $key) = @_; 115 my $opts = ref $sock_opts eq "HASH" ? $sock_opts : {}; 116 my $sock = ref $sock_opts eq "HASH" ? $opts->{sock} : $sock_opts; 117 my $val; 118 my $expect_flags = $opts->{flags} || 0; 119 120 print $sock "gets $key\r\n"; 121 my $response = <$sock>; 122 if ($response =~ /^END/) { 123 return "NOT_FOUND"; 124 } 125 else 126 { 127 $response =~ /VALUE (.*) (\d+) (\d+) (\d+)/; 128 my $flags = $2; 129 my $len = $3; 130 my $identifier = $4; 131 read $sock, $val , $len; 132 # get the END 133 $_ = <$sock>; 134 $_ = <$sock>; 135 136 return ($identifier,$val); 137 } 138 139} 140sub mem_gets_is { 141 # works on single-line values only. no newlines in value. 142 my ($sock_opts, $identifier, $key, $val, $msg) = @_; 143 my $opts = ref $sock_opts eq "HASH" ? $sock_opts : {}; 144 my $sock = ref $sock_opts eq "HASH" ? $opts->{sock} : $sock_opts; 145 146 my $expect_flags = $opts->{flags} || 0; 147 my $dval = defined $val ? "'$val'" : "<undef>"; 148 $msg ||= "$key == $dval"; 149 150 print $sock "gets $key\r\n"; 151 if (! defined $val) { 152 my $line = scalar <$sock>; 153 if ($line =~ /^VALUE/) { 154 $line .= scalar(<$sock>) . scalar(<$sock>); 155 } 156 Test::More::is($line, "END\r\n", $msg); 157 } else { 158 my $len = length($val); 159 my $body = scalar(<$sock>); 160 my $expected = "VALUE $key $expect_flags $len $identifier\r\n$val\r\nEND\r\n"; 161 if (!$body || $body =~ /^END/) { 162 Test::More::is($body, $expected, $msg); 163 return; 164 } 165 $body .= scalar(<$sock>) . scalar(<$sock>); 166 Test::More::is($body, $expected, $msg); 167 } 168} 169 170sub free_port { 171 my $type = shift || "tcp"; 172 my $sock; 173 my $port; 174 while (!$sock) { 175 $port = int(rand(20000)) + 30000; 176 if (enabled_tls_testing()) { 177 $sock = eval qq{ IO::Socket::SSL->new(LocalAddr => '127.0.0.1', 178 LocalPort => $port, 179 Proto => '$type', 180 ReuseAddr => 1, 181 SSL_verify_mode => SSL_VERIFY_NONE); 182 }; 183 die $@ if $@; # sanity check. 184 } else { 185 $sock = IO::Socket::INET->new(LocalAddr => '127.0.0.1', 186 LocalPort => $port, 187 Proto => $type, 188 ReuseAddr => 1); 189 } 190 } 191 return $port; 192} 193 194sub print_help { 195 my $exe = get_memcached_exe(); 196 my $output = `$exe -h`; 197 return $output; 198} 199 200sub supports_udp { 201 my $output = print_help(); 202 return 0 if $output =~ /^memcached 1\.1\./; 203 return 1; 204} 205 206sub supports_sasl { 207 my $output = print_help(); 208 return 1 if $output =~ /sasl/i; 209 return 0; 210} 211 212sub supports_extstore { 213 my $output = print_help(); 214 return 1 if $output =~ /ext_path/i; 215 return 0; 216} 217 218sub supports_proxy { 219 my $output = print_help(); 220 return 1 if $output =~ /proxy_config/i; 221 return 0; 222} 223 224sub supports_tls { 225 my $output = print_help(); 226 return 1 if $output =~ /enable-ssl/i; 227 return 0; 228} 229 230sub supports_unix_socket { 231 my $output = print_help(); 232 return 1 if $output =~ /unix-socket/i; 233 return 0; 234} 235 236sub enabled_tls_testing { 237 if ($tls_checked) { 238 return 1; 239 } elsif (supports_tls() && $ENV{SSL_TEST}) { 240 eval "use IO::Socket::SSL"; 241 croak("IO::Socket::SSL not installed or failed to load, cannot run SSL tests as requested") if $@; 242 $tls_checked = 1; 243 return 1; 244 } 245} 246 247sub supports_drop_priv { 248 my $output = print_help(); 249 return 1 if $output =~ /no_drop_privileges/i; 250 return 0; 251} 252 253sub get_memcached_exe { 254 my $exe = "$builddir/memcached-debug"; 255 croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe; 256 croak("memcached binary not executable\n") unless -x _; 257 return $exe; 258} 259 260sub run_help { 261 my $exe = get_memcached_exe(); 262 return system("$exe -h"); 263} 264 265# -1 if the pid is actually dead. 266sub is_running { 267 return unless defined $_[0]; 268 return waitpid($_[0], WNOHANG) >= 0 ? 1 : 0; 269} 270 271sub new_memcached { 272 my ($args, $passed_port) = @_; 273 my $port = $passed_port; 274 my $host = '127.0.0.1'; 275 my $ssl_enabled = enabled_tls_testing(); 276 my $unix_socket_disabled = !supports_unix_socket(); 277 my $use_external = 0; 278 if ($ENV{T_MEMD_EXTERNAL}) { 279 $use_external = $ENV{T_MEMD_EXTERNAL}; 280 } 281 282 if ($ENV{T_MEMD_USE_DAEMON}) { 283 my ($host, $port) = ($ENV{T_MEMD_USE_DAEMON} =~ m/^([^:]+):(\d+)$/); 284 my $conn; 285 if ($ssl_enabled) { 286 $conn = eval qq{IO::Socket::SSL->new(PeerAddr => "$host:$port", 287 SSL_verify_mode => SSL_VERIFY_NONE, 288 SSL_cert_file => '$client_crt', 289 SSL_key_file => '$client_key'); 290 }; 291 die $@ if $@; # sanity check. 292 } else { 293 $conn = IO::Socket::INET->new(PeerAddr => "$host:$port"); 294 } 295 if ($conn) { 296 return Memcached::Handle->new(conn => $conn, 297 host => $host, 298 port => $port); 299 } 300 croak("Failed to connect to specified memcached server.") unless $conn; 301 } 302 303 if ($< == 0) { 304 $args .= " -u root"; 305 } 306 $args .= " -o relaxed_privileges"; 307 308 my $udpport; 309 if ($args =~ /-l (\S+)/ || (($ssl_enabled || $unix_socket_disabled) && ($args !~ /-s (\S+)/))) { 310 if (!$port) { 311 $port = free_port(); 312 } 313 $udpport = free_port("udp"); 314 $args .= " -p $port"; 315 if (supports_udp() && $args !~ /-U (\S+)/) { 316 $args .= " -U $udpport"; 317 } 318 if ($ssl_enabled) { 319 $args .= " -Z"; 320 if ($args !~ /-o ssl_chain_cert=(\S+)/) { 321 $args .= " -o ssl_chain_cert=$server_crt"; 322 } 323 if ($args !~ /-o ssl_key=(\S+)/) { 324 $args .= " -o ssl_key=$server_key"; 325 } 326 } 327 } elsif ($args !~ /-s (\S+)/) { 328 my $num = @unixsockets; 329 my $file = "/tmp/memcachetest.$$.$num"; 330 if ($use_external) { 331 $file = "/tmp/memcachedtest.$use_external.$num"; 332 } 333 $args .= " -s $file"; 334 push(@unixsockets, $file); 335 } 336 337 my $wait_tries = 60; 338 my $exe = get_memcached_exe(); 339 my $childpid; 340 if ($use_external) { 341 print STDERR "External daemon requested. Start arguments:\n$exe $args\n"; 342 $wait_tries = 10000; 343 $childpid; 344 } else { 345 $childpid = fork(); 346 347 unless ($childpid) { 348 my $valgrind = ""; 349 my $valgrind_args = "--quiet --error-exitcode=1 --exit-on-first-error=yes"; 350 if ($ENV{VALGRIND_ARGS}) { 351 $valgrind_args = $ENV{VALGRIND_ARGS}; 352 } 353 if ($ENV{VALGRIND_TEST}) { 354 $valgrind = "valgrind $valgrind_args"; 355 # NOTE: caller file stuff. 356 $valgrind .= " $ENV{VALGRIND_EXTRA_ARGS}"; 357 } 358 my $cmd = "$builddir/timedrun 600 $valgrind $exe $args"; 359 #print STDERR "RUN: $cmd\n\n"; 360 exec $cmd; 361 exit; # never gets here. 362 } 363 } 364 365 # unix domain sockets 366 if ($args =~ /-s (\S+)/) { 367 # A slow/emulated/valgrinded/etc system may take longer than a second 368 # for the unix socket to appear. 369 my $filename = $1; 370 for (1..$wait_tries) { 371 my $conn = IO::Socket::UNIX->new(Peer => $filename); 372 373 if ($conn) { 374 return Memcached::Handle->new(pid => $childpid, 375 conn => $conn, 376 domainsocket => $filename, 377 host => $host, 378 port => $port); 379 } else { 380 if (!$ENV{T_MEMD_EXTERNAL}) { 381 croak("Failed to connect to unix socket: memcached not running") unless is_running($childpid); 382 } 383 select undef, undef, undef, 0.20; 384 } 385 } 386 croak("Failed to connect to unix domain socket: $! '$filename'") if $@; 387 } 388 389 # try to connect / find open port, only if we're not using unix domain 390 # sockets 391 392 for (1..80) { 393 my $conn; 394 if ($ssl_enabled) { 395 $conn = eval qq{ IO::Socket::SSL->new(PeerAddr => "127.0.0.1:$port", 396 SSL_verify_mode => SSL_VERIFY_NONE, 397 SSL_cert_file => '$client_crt', 398 SSL_key_file => '$client_key'); 399 }; 400 die $@ if $@; # sanity check. 401 } else { 402 $conn = IO::Socket::INET->new(PeerAddr => "127.0.0.1:$port"); 403 } 404 if ($conn) { 405 return Memcached::Handle->new(pid => $childpid, 406 conn => $conn, 407 udpport => $udpport, 408 host => $host, 409 port => $port); 410 } 411 croak("Failed to connect: memcached not running") unless is_running($childpid); 412 select undef, undef, undef, 0.25; 413 } 414 croak("Failed to startup/connect to memcached server."); 415} 416 417END { 418 for (@unixsockets) { 419 unlink $_; 420 } 421} 422 423############################################################################ 424package Memcached::Handle; 425use POSIX ":sys_wait_h"; 426sub new { 427 my ($class, %params) = @_; 428 return bless \%params, $class; 429} 430 431sub DESTROY { 432 my $self = shift; 433 if ($self->{pid}) { 434 kill 2, $self->{pid}; 435 } else { 436 print STDERR "WANT TO ISSUE KILL: 2\n"; 437 } 438} 439 440sub stop { 441 my $self = shift; 442 if ($self->{pid}) { 443 kill 15, $self->{pid}; 444 } else { 445 print STDERR "WANT TO ISSUE KILL: 15\n"; 446 } 447} 448 449sub graceful_stop { 450 my $self = shift; 451 if ($self->{pid}) { 452 kill 'SIGUSR1', $self->{pid}; 453 } else { 454 print STDERR "WANT TO ISSUE KILL: SIGUSR1\n"; 455 } 456} 457 458sub reload { 459 my $self = shift; 460 if ($self->{pid}) { 461 kill 'SIGHUP', $self->{pid}; 462 } else { 463 print STDERR "WANT TO ISSUE KILL: SIGHUP\n"; 464 } 465} 466 467# -1 if the pid is actually dead. 468sub is_running { 469 my $self = shift; 470 if ($self->{pid}) { 471 return waitpid($self->{pid}, WNOHANG) >= 0 ? 1 : 0; 472 } else { 473 print STDERR "WANTED TO CHECK IF DAEMON IS RUNNING\n"; 474 } 475} 476 477sub host { $_[0]{host} } 478sub port { $_[0]{port} } 479sub udpport { $_[0]{udpport} } 480 481sub sock { 482 my $self = shift; 483 484 if ($self->{conn} && ($self->{domainsocket} || getpeername($self->{conn}))) { 485 return $self->{conn}; 486 } 487 return $self->new_sock; 488} 489 490sub new_sock { 491 my $self = shift; 492 if ($self->{domainsocket}) { 493 return IO::Socket::UNIX->new(Peer => $self->{domainsocket}); 494 } elsif (MemcachedTest::enabled_tls_testing()) { 495 my $ssl_session_cache = shift; 496 my $ssl_version = shift; 497 return eval qq{ IO::Socket::SSL->new(PeerAddr => "$self->{host}:$self->{port}", 498 SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE, 499 SSL_session_cache => \$ssl_session_cache, 500 SSL_version => '$ssl_version', 501 SSL_cert_file => '$client_crt', 502 SSL_key_file => '$client_key'); 503 }; 504 } else { 505 return IO::Socket::INET->new(PeerAddr => "$self->{host}:$self->{port}"); 506 } 507} 508 509# needed for a specific test 510sub new_nocert_tls_sock { 511 my $self = shift; 512 if (MemcachedTest::enabled_tls_testing()) { 513 my $port = shift; 514 my $ssl_version = shift; 515 return eval qq{ IO::Socket::SSL->new(PeerAddr => "$self->{host}:$port", 516 SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_NONE, 517 SSL_version => '$ssl_version'); 518 }; 519 } 520} 521 522sub new_udp_sock { 523 my $self = shift; 524 return IO::Socket::INET->new(PeerAddr => '127.0.0.1', 525 PeerPort => $self->{udpport}, 526 Proto => 'udp', 527 LocalAddr => '127.0.0.1', 528 LocalPort => MemcachedTest::free_port('udp'), 529 ); 530 531} 532 533############################################################################ 534package Memcached::ProxyTest; 535use IO::Socket qw(AF_INET SOCK_STREAM); 536# We call out to Test::More because of some package instancing. Not completely 537# sure if this is necessary anymore? 538use strict; 539use warnings; 540 541sub new { 542 my ($class, %p) = @_; 543 544 die "needs servers argument" 545 unless exists $p{servers} && ref($p{servers}) eq 'ARRAY'; 546 547 $p{_srv} = []; 548 for my $port (@{$p{servers}}) { 549 my $srv = _mock_server($port); 550 Test::More::ok(defined $srv, "mock server object created"); 551 push(@{$p{_srv}}, $srv); 552 } 553 554 $p{_csel} = IO::Select->new(); 555 556 return bless \%p, $class; 557} 558 559sub _mock_server { 560 my $port = shift; 561 my $srv = IO::Socket->new( 562 Domain => AF_INET, 563 Type => SOCK_STREAM, 564 Proto => 'tcp', 565 LocalHost => '127.0.0.1', 566 LocalPort => $port, 567 ReusePort => 1, 568 Listen => 5) || die "IO::Socket: $@"; 569 return $srv; 570} 571 572sub _accept_backend { 573 my $srv = shift; 574 my $be = $srv->accept(); 575 $be->autoflush(1); 576 Test::More::ok(defined $be, "mock backend created"); 577 Test::More::like(<$be>, qr/version/, "received version command"); 578 my $sent = $be->send("VERSION 1.0.0-mock\r\n"); 579 Test::More::cmp_ok($sent, '>', 0, "wrote VERSION back to backend"); 580 581 return $be; 582} 583 584sub accept_backends { 585 my $self = shift; 586 $self->{_be} = []; 587 for my $srv (@{$self->{_srv}}) { 588 my $be = _accept_backend($srv); 589 push(@{$self->{_be}}, $be); 590 } 591} 592 593sub accept_backend { 594 my $self = shift; 595 my $idx = shift; 596 $self->{_be}->[$idx] = _accept_backend($self->{_srv}->[$idx]); 597} 598 599sub close_backend { 600 my $self = shift; 601 my $idx = shift; 602 $self->{_be}->[$idx]->close; 603} 604 605sub srv_accept_waiting { 606 my $self = shift; 607 my $idx = shift; 608 my $wait = shift || 1; 609 my $s = IO::Select->new(); 610 $s->add($self->{_srv}->[$idx]); 611 return $s->can_read($wait); 612} 613 614sub set_c { 615 my $self = shift; 616 my $sel = $self->{_csel}; 617 if (exists $self->{_c}) { 618 $sel->remove($self->{_c}); 619 } 620 $self->{_c} = shift; 621 $sel->add($self->{_c}); 622} 623 624sub check_c { 625 my $self = shift; 626 my $c = $self->{_c}; 627 print $c "version\r\n"; 628 Test::More::like(scalar <$c>, qr/VERSION /, "version received"); 629} 630 631sub wait_c { 632 my ($self, $wait) = @_; 633 return $self->{_csel}->can_read($wait); 634} 635 636# Remembers the last command sent to the client socket. 637sub c_send { 638 my $self = shift; 639 my $cmd = shift; 640 my $c = $self->{_c}; 641 print $c $cmd; 642 $self->{_cmd} = $cmd; 643} 644 645# Backends can be specified as a bare number, and array reference ([0,1,2]), 646# or 'all' to run against all available backends. 647sub _be_list { 648 my $self = shift; 649 my $list = shift; 650 my @l = (); 651 if (ref $list eq '') { 652 if (exists $self->{_be}->[$list]) { 653 push(@l, $self->{_be}->[$list]); 654 } elsif ($list eq 'all') { 655 @l = @{$self->{_be}}; 656 } else { 657 die "unknown argument"; 658 } 659 } elsif (ref $list eq 'ARRAY') { 660 for my $i (@$list) { 661 push(@l, $self->{_be}->[$i]); 662 } 663 } 664 return \@l; 665} 666 667# Check that the last command sent to the client arrives at the backends. 668# This is a common case so this saves typing/errors while writing tests. 669sub be_recv_c { 670 my $self = shift; 671 my $list = shift; 672 my $detail = shift || 'be received data'; 673 die "issue a a command with c_send before calling be_recv_c" unless exists $self->{_cmd}; 674 675 my $l = $self->_be_list($list); 676 my $cmd = $self->{_cmd}; 677 my @cmds = split(/(?<=\r\n)/, $cmd); 678 for my $be (@$l) { 679 for my $c (@cmds) { 680 Test::More::is(scalar <$be>, $c, $detail); 681 } 682 } 683} 684 685sub be_recv_like { 686 my $self = shift; 687 my $list = shift; 688 my $cmd = shift || die "must provide a command to check"; 689 my $detail = shift || 'be received data'; 690 691 my $l = $self->_be_list($list); 692 for my $be (@$l) { 693 Test::More::like(scalar <$be>, $cmd, $detail); 694 } 695} 696 697# Receive a different/specific string to the backend socket. 698sub be_recv { 699 my $self = shift; 700 my $list = shift; 701 my $cmd = shift || die "must provide a command to check"; 702 my $detail = shift || 'be received data'; 703 704 my $l = $self->_be_list($list); 705 for my $be (@$l) { 706 Test::More::is(scalar <$be>, $cmd, $detail); 707 } 708} 709 710# Sends a specific command to a backend socket back towards the proxy. 711# Remembers the last command sent. 712sub be_send { 713 my $self = shift; 714 my $list = shift; 715 my $cmd = shift || die "must provide data to return"; 716 717 $self->{_becmd} = $cmd; 718 my $l = $self->_be_list($list); 719 for my $be (@$l) { 720 print $be $cmd; 721 } 722} 723 724# Client receives the last command sent to any backend. This is also a common 725# case so we save some typing/errors by remembering this here. 726sub c_recv_be { 727 my $self = shift; 728 my $detail = shift || 'client received be response'; 729 730 my $cmd = $self->{_becmd}; 731 my $c = $self->{_c}; 732 Test::More::is(scalar <$c>, $cmd, $detail); 733} 734 735# Client to receive an arbitrary string. 736sub c_recv { 737 my $self = shift; 738 my $cmd = shift; 739 my $detail = shift || 'client received response'; 740 741 my $c = $self->{_c}; 742 Test::More::is(scalar <$c>, $cmd, $detail); 743} 744 745# Clear out any remembered commands and check the client pipe is clear. 746sub clear { 747 my $self = shift; 748 delete $self->{_becmd} if exists $self->{_becmd}; 749 delete $self->{_cmd} if exists $self->{_cmd}; 750 $self->check_c(); 751} 752 7531; 754