1#!/usr/bin/env perl
2
3use strict;
4use warnings;
5use Test::More;
6use FindBin qw($Bin);
7use lib "$Bin/lib";
8use MemcachedTest;
9use Data::Dumper qw/Dumper/;
10
11my $ext_path;
12
13if (!supports_extstore()) {
14    plan skip_all => 'extstore not enabled';
15    exit 0;
16}
17
18$ext_path = "/tmp/extstore.$$";
19
20my $server = new_memcached("-m 64 -U 0 -o ext_page_size=8,ext_wbuf_size=2,ext_threads=1,ext_io_depth=2,ext_item_size=512,ext_item_age=2,ext_recache_rate=10000,ext_max_frag=0.9,ext_path=$ext_path:64m,no_lru_crawler,slab_automove=0,ext_max_sleep=100000");
21ok($server, "started the server");
22
23# Based almost 100% off testClient.py which is:
24# Copyright (c) 2007  Dustin Sallings <[email protected]>
25
26# Command constants
27use constant CMD_GET        => 0x00;
28use constant CMD_SET        => 0x01;
29use constant CMD_ADD        => 0x02;
30use constant CMD_REPLACE    => 0x03;
31use constant CMD_DELETE     => 0x04;
32use constant CMD_INCR       => 0x05;
33use constant CMD_DECR       => 0x06;
34use constant CMD_QUIT       => 0x07;
35use constant CMD_FLUSH      => 0x08;
36use constant CMD_GETQ       => 0x09;
37use constant CMD_NOOP       => 0x0A;
38use constant CMD_VERSION    => 0x0B;
39use constant CMD_GETK       => 0x0C;
40use constant CMD_GETKQ      => 0x0D;
41use constant CMD_APPEND     => 0x0E;
42use constant CMD_PREPEND    => 0x0F;
43use constant CMD_STAT       => 0x10;
44use constant CMD_SETQ       => 0x11;
45use constant CMD_ADDQ       => 0x12;
46use constant CMD_REPLACEQ   => 0x13;
47use constant CMD_DELETEQ    => 0x14;
48use constant CMD_INCREMENTQ => 0x15;
49use constant CMD_DECREMENTQ => 0x16;
50use constant CMD_QUITQ      => 0x17;
51use constant CMD_FLUSHQ     => 0x18;
52use constant CMD_APPENDQ    => 0x19;
53use constant CMD_PREPENDQ   => 0x1A;
54use constant CMD_TOUCH      => 0x1C;
55use constant CMD_GAT        => 0x1D;
56use constant CMD_GATQ       => 0x1E;
57use constant CMD_GATK       => 0x23;
58use constant CMD_GATKQ      => 0x24;
59
60# REQ and RES formats are divided even though they currently share
61# the same format, since they _could_ differ in the future.
62use constant REQ_PKT_FMT      => "CCnCCnNNNN";
63use constant RES_PKT_FMT      => "CCnCCnNNNN";
64use constant INCRDECR_PKT_FMT => "NNNNN";
65use constant MIN_RECV_BYTES   => length(pack(RES_PKT_FMT));
66use constant REQ_MAGIC        => 0x80;
67use constant RES_MAGIC        => 0x81;
68
69my $mc = MC::Client->new;
70
71# Wait until all items have flushed
72sub wait_for_ext {
73    my $sum = 1;
74    while ($sum != 0) {
75        my %s = $mc->stats("items");
76        $sum = 0;
77        for my $key (keys %s) {
78            if ($key =~ m/items:(\d+):number/) {
79                # Ignore classes which can contain extstore items
80                next if $1 < 3;
81                $sum += $s{$key};
82            }
83        }
84        sleep 1 if $sum != 0;
85    }
86}
87
88my $check = sub {
89    my ($key, $orig_flags, $orig_val) = @_;
90    my ($flags, $val, $cas) = $mc->get($key);
91    is($flags, $orig_flags, "Flags is set properly");
92    ok($val eq $orig_val || $val == $orig_val, $val . " = " . $orig_val);
93};
94
95my $set = sub {
96    my ($key, $exp, $orig_flags, $orig_value) = @_;
97    $mc->set($key, $orig_value, $orig_flags, $exp);
98    $check->($key, $orig_flags, $orig_value);
99};
100
101my $empty = sub {
102    my $key = shift;
103    my $rv =()= eval { $mc->get($key) };
104    is($rv, 0, "Didn't get a result from get");
105    ok($@->not_found, "We got a not found error when we expected one");
106};
107
108my $delete = sub {
109    my ($key, $when) = @_;
110    $mc->delete($key, $when);
111    $empty->($key);
112};
113
114my $value;
115my $bigvalue;
116{
117    my @chars = ("C".."Z");
118    for (1 .. 20000) {
119        $value .= $chars[rand @chars];
120    }
121    for (1 .. 800000) {
122        $bigvalue .= $chars[rand @chars];
123    }
124}
125
126# diag "small object";
127$set->('x', 10, 19, "somevalue");
128
129# check extstore counters
130{
131    my %stats = $mc->stats('');
132    is($stats{extstore_objects_written}, 0);
133}
134
135# diag "Delete";
136#$delete->('x');
137
138# diag "Flush";
139#$empty->('y');
140
141# fill some larger objects
142{
143    my $keycount = 1000;
144    for (1 .. $keycount) {
145        $set->("nfoo$_", 0, 19, $value);
146    }
147    # wait for a flush
148    wait_for_ext();
149    # value returns for one flushed object.
150    $check->('nfoo1', 19, $value);
151
152    # check extstore counters
153    my %stats = $mc->stats('');
154    cmp_ok($stats{extstore_page_allocs}, '>', 0, 'at least one page allocated');
155    cmp_ok($stats{extstore_objects_written}, '>', $keycount / 2, 'some objects written');
156    cmp_ok($stats{extstore_bytes_written}, '>', length($value) * 2, 'some bytes written');
157    cmp_ok($stats{get_extstore}, '>', 0, 'one object was fetched');
158    cmp_ok($stats{extstore_objects_read}, '>', 0, 'one object read');
159    cmp_ok($stats{extstore_bytes_read}, '>', length($value), 'some bytes read');
160    # Test multiget
161    my $rv = $mc->get_multi(qw(nfoo2 nfoo3 noexist));
162    is($rv->{nfoo2}->[1], $value, 'multiget nfoo2');
163    is($rv->{nfoo3}->[1], $value, 'multiget nfoo2');
164
165    # Remove half of the keys for the next test.
166    for (1 .. $keycount) {
167        next unless $_ % 2 == 0;
168        $delete->("nfoo$_");
169    }
170
171    my %stats2 = $mc->stats('');
172    cmp_ok($stats{extstore_bytes_used}, '>', $stats2{extstore_bytes_used},
173        'bytes used dropped after deletions');
174    cmp_ok($stats{extstore_objects_used}, '>', $stats2{extstore_objects_used},
175        'objects used dropped after deletions');
176    is($stats2{badcrc_from_extstore}, 0, 'CRC checks successful');
177
178    # delete the rest
179    for (1 .. $keycount) {
180        next unless $_ % 2 == 1;
181        $delete->("nfoo$_");
182    }
183}
184
185# check evictions and misses
186{
187    my $keycount = 1250;
188    for (1 .. $keycount) {
189        $set->("mfoo$_", 0, 19, $value);
190    }
191    wait_for_ext();
192    for ($keycount .. ($keycount*3)) {
193        $set->("mfoo$_", 0, 19, $value);
194    }
195    wait_for_ext();
196    # FIXME: Need to sample through a few values, or fix eviction to be
197    # more accurate. On 32bit systems some pages unused to this point get
198    # filled after the first few items, then the eviction algo pulls those
199    # pages since they have the lowest version number, leaving older objects
200    # in memory and evicting newer ones.
201    for (1 .. ($keycount*3)) {
202        next unless $_ % 100 == 0;
203        eval { $mc->get("mfoo$_"); };
204    }
205
206    my %s = $mc->stats('');
207    cmp_ok($s{extstore_objects_evicted}, '>', 0);
208    cmp_ok($s{miss_from_extstore}, '>', 0);
209}
210
211# store and re-fetch a chunked value
212{
213    my %stats = $mc->stats('');
214    $set->("bigvalue", 0, 0, $bigvalue);
215    wait_for_ext();
216    $check->("bigvalue", 0, $bigvalue);
217    my %stats2 = $mc->stats('');
218
219    cmp_ok($stats2{extstore_objects_written}, '>',
220        $stats{extstore_objects_written}, "a large value flushed");
221}
222
223# ensure ASCII can still fetch the chunked value.
224{
225    my $ns = $server->new_sock;
226
227    my %s1 = $mc->stats('');
228    mem_get_is($ns, "bigvalue", $bigvalue);
229    print $ns "extstore recache_rate 1\r\n";
230    is(scalar <$ns>, "OK\r\n", "recache rate upped");
231    for (1..3) {
232        mem_get_is($ns, "bigvalue", $bigvalue);
233        $check->('bigvalue', 0, $bigvalue);
234    }
235    my %s2 = $mc->stats('');
236    cmp_ok($s2{recache_from_extstore}, '>', $s1{recache_from_extstore},
237        'a new recache happened');
238
239}
240
241done_testing();
242
243END {
244    unlink $ext_path if $ext_path;
245}
246# ######################################################################
247# Test ends around here.
248# ######################################################################
249
250package MC::Client;
251
252use strict;
253use warnings;
254use fields qw(socket);
255use IO::Socket::INET;
256
257sub new {
258    my $self = shift;
259    my ($s) = @_;
260    $s = $server unless defined $s;
261    my $sock = $s->sock;
262    $self = fields::new($self);
263    $self->{socket} = $sock;
264    return $self;
265}
266
267sub build_command {
268    my $self = shift;
269    die "Not enough args to send_command" unless @_ >= 4;
270    my ($cmd, $key, $val, $opaque, $extra_header, $cas) = @_;
271
272    $extra_header = '' unless defined $extra_header;
273    my $keylen    = length($key);
274    my $vallen    = length($val);
275    my $extralen  = length($extra_header);
276    my $datatype  = 0;  # field for future use
277    my $reserved  = 0;  # field for future use
278    my $totallen  = $keylen + $vallen + $extralen;
279    my $ident_hi  = 0;
280    my $ident_lo  = 0;
281
282    if ($cas) {
283        $ident_hi = int($cas / 2 ** 32);
284        $ident_lo = int($cas % 2 ** 32);
285    }
286
287    my $msg = pack(::REQ_PKT_FMT, ::REQ_MAGIC, $cmd, $keylen, $extralen,
288                   $datatype, $reserved, $totallen, $opaque, $ident_hi,
289                   $ident_lo);
290    my $full_msg = $msg . $extra_header . $key . $val;
291    return $full_msg;
292}
293
294sub send_command {
295    my $self = shift;
296    die "Not enough args to send_command" unless @_ >= 4;
297    my ($cmd, $key, $val, $opaque, $extra_header, $cas) = @_;
298
299    my $full_msg = $self->build_command($cmd, $key, $val, $opaque, $extra_header, $cas);
300
301    my $sent = 0;
302    my $data_len =  length($full_msg);
303    while ($sent < $data_len) {
304        my $sent_bytes = $self->{socket}->syswrite($full_msg,
305                                    $data_len - $sent > MemcachedTest::MAX_READ_WRITE_SIZE ?
306                                        MemcachedTest::MAX_READ_WRITE_SIZE : ($data_len - $sent),
307                                    $sent);
308        last if ($sent_bytes <= 0);
309        $sent += $sent_bytes;
310    }
311    die("Send failed:  $!") unless $data_len;
312
313    if($sent != length($full_msg)) {
314        die("only sent $sent of " . length($full_msg) . " bytes");
315    }
316}
317
318sub flush_socket {
319    my $self = shift;
320    $self->{socket}->flush;
321}
322
323# Send a silent command and ensure it doesn't respond.
324sub send_silent {
325    my $self = shift;
326    die "Not enough args to send_silent" unless @_ >= 4;
327    my ($cmd, $key, $val, $opaque, $extra_header, $cas) = @_;
328
329    $self->send_command($cmd, $key, $val, $opaque, $extra_header, $cas);
330    $self->send_command(::CMD_NOOP, '', '', $opaque + 1);
331
332    my ($ropaque, $data) = $self->_handle_single_response;
333    Test::More::is($ropaque, $opaque + 1);
334}
335
336sub silent_mutation {
337    my $self = shift;
338    my ($cmd, $key, $value) = @_;
339
340    $empty->($key);
341    my $extra = pack "NN", 82, 0;
342    $mc->send_silent($cmd, $key, $value, 7278552, $extra, 0);
343    $check->($key, 82, $value);
344}
345
346sub _handle_single_response {
347    my $self = shift;
348    my $myopaque = shift;
349
350    my $hdr = "";
351    while(::MIN_RECV_BYTES - length($hdr) > 0) {
352        $self->{socket}->sysread(my $response, ::MIN_RECV_BYTES - length($hdr));
353        $hdr .= $response;
354    }
355    Test::More::is(length($hdr), ::MIN_RECV_BYTES, "Expected read length");
356
357    my ($magic, $cmd, $keylen, $extralen, $datatype, $status, $remaining,
358        $opaque, $ident_hi, $ident_lo) = unpack(::RES_PKT_FMT, $hdr);
359    Test::More::is($magic, ::RES_MAGIC, "Got proper response magic");
360
361    my $cas = ($ident_hi * 2 ** 32) + $ident_lo;
362
363    return ($opaque, '', $cas, 0) if($remaining == 0);
364
365    # fetch the value
366    my $rv="";
367    while($remaining - length($rv) > 0) {
368        $self->{socket}->sysread(my $buf, $remaining - length($rv));
369        $rv .= $buf;
370    }
371    if(length($rv) != $remaining) {
372        my $found = length($rv);
373        die("Expected $remaining bytes, got $found");
374    }
375    if (defined $myopaque) {
376        Test::More::is($opaque, $myopaque, "Expected opaque");
377    } else {
378        Test::More::pass("Implicit pass since myopaque is undefined");
379    }
380
381    if ($status) {
382        die MC::Error->new($status, $rv);
383    }
384
385    return ($opaque, $rv, $cas, $keylen);
386}
387
388sub _do_command {
389    my $self = shift;
390    die unless @_ >= 3;
391    my ($cmd, $key, $val, $extra_header, $cas) = @_;
392
393    $extra_header = '' unless defined $extra_header;
394    my $opaque = int(rand(2**32));
395    $self->send_command($cmd, $key, $val, $opaque, $extra_header, $cas);
396    my (undef, $rv, $rcas) = $self->_handle_single_response($opaque);
397    return ($rv, $rcas);
398}
399
400sub _incrdecr_header {
401    my $self = shift;
402    my ($amt, $init, $exp) = @_;
403
404    my $amt_hi = int($amt / 2 ** 32);
405    my $amt_lo = int($amt % 2 ** 32);
406
407    my $init_hi = int($init / 2 ** 32);
408    my $init_lo = int($init % 2 ** 32);
409
410    my $extra_header = pack(::INCRDECR_PKT_FMT, $amt_hi, $amt_lo, $init_hi,
411                            $init_lo, $exp);
412
413    return $extra_header;
414}
415
416sub _incrdecr_cas {
417    my $self = shift;
418    my ($cmd, $key, $amt, $init, $exp) = @_;
419
420    my ($data, $rcas) = $self->_do_command($cmd, $key, '',
421                                           $self->_incrdecr_header($amt, $init, $exp));
422
423    my $header = substr $data, 0, 8, '';
424    my ($resp_hi, $resp_lo) = unpack "NN", $header;
425    my $resp = ($resp_hi * 2 ** 32) + $resp_lo;
426
427    return $resp, $rcas;
428}
429
430sub _incrdecr {
431    my $self = shift;
432    my ($v, $c) = $self->_incrdecr_cas(@_);
433    return $v
434}
435
436sub silent_incrdecr {
437    my $self = shift;
438    my ($cmd, $key, $amt, $init, $exp) = @_;
439    my $opaque = 8275753;
440
441    $mc->send_silent($cmd, $key, '', $opaque,
442                     $mc->_incrdecr_header($amt, $init, $exp));
443}
444
445sub stats {
446    my $self = shift;
447    my $key  = shift;
448    my $cas = 0;
449    my $opaque = int(rand(2**32));
450    $self->send_command(::CMD_STAT, $key, '', $opaque, '', $cas);
451
452    my %rv = ();
453    my $found_key = '';
454    my $found_val = '';
455    do {
456        my ($op, $data, $cas, $keylen) = $self->_handle_single_response($opaque);
457        if($keylen > 0) {
458            $found_key = substr($data, 0, $keylen);
459            $found_val = substr($data, $keylen);
460            $rv{$found_key} = $found_val;
461        } else {
462            $found_key = '';
463        }
464    } while($found_key ne '');
465    return %rv;
466}
467
468sub get {
469    my $self = shift;
470    my $key  = shift;
471    my ($rv, $cas) = $self->_do_command(::CMD_GET, $key, '', '');
472
473    my $header = substr $rv, 0, 4, '';
474    my $flags  = unpack("N", $header);
475
476    return ($flags, $rv, $cas);
477}
478
479sub get_multi {
480    my $self = shift;
481    my @keys = @_;
482
483    for (my $i = 0; $i < @keys; $i++) {
484        $self->send_command(::CMD_GETQ, $keys[$i], '', $i, '', 0);
485    }
486
487    my $terminal = @keys + 10;
488    $self->send_command(::CMD_NOOP, '', '', $terminal);
489
490    my %return;
491    while (1) {
492        my ($opaque, $data) = $self->_handle_single_response;
493        last if $opaque == $terminal;
494
495        my $header = substr $data, 0, 4, '';
496        my $flags  = unpack("N", $header);
497
498        $return{$keys[$opaque]} = [$flags, $data];
499    }
500
501    return %return if wantarray;
502    return \%return;
503}
504
505sub touch {
506    my $self = shift;
507    my ($key, $expire) = @_;
508    my $extra_header = pack "N", $expire;
509    my $cas = 0;
510    return $self->_do_command(::CMD_TOUCH, $key, '', $extra_header, $cas);
511}
512
513sub gat {
514    my $self   = shift;
515    my $key    = shift;
516    my $expire = shift;
517    my $extra_header = pack "N", $expire;
518    my ($rv, $cas) = $self->_do_command(::CMD_GAT, $key, '', $extra_header);
519
520    my $header = substr $rv, 0, 4, '';
521    my $flags  = unpack("N", $header);
522
523    return ($flags, $rv, $cas);
524}
525
526sub version {
527    my $self = shift;
528    return $self->_do_command(::CMD_VERSION, '', '');
529}
530
531sub flush {
532    my $self = shift;
533    return $self->_do_command(::CMD_FLUSH, '', '');
534}
535
536sub add {
537    my $self = shift;
538    my ($key, $val, $flags, $expire) = @_;
539    my $extra_header = pack "NN", $flags, $expire;
540    my $cas = 0;
541    return $self->_do_command(::CMD_ADD, $key, $val, $extra_header, $cas);
542}
543
544sub set {
545    my $self = shift;
546    my ($key, $val, $flags, $expire, $cas) = @_;
547    my $extra_header = pack "NN", $flags, $expire;
548    return $self->_do_command(::CMD_SET, $key, $val, $extra_header, $cas);
549}
550
551sub _append_prepend {
552    my $self = shift;
553    my ($cmd, $key, $val, $cas) = @_;
554    return $self->_do_command($cmd, $key, $val, '', $cas);
555}
556
557sub replace {
558    my $self = shift;
559    my ($key, $val, $flags, $expire) = @_;
560    my $extra_header = pack "NN", $flags, $expire;
561    my $cas = 0;
562    return $self->_do_command(::CMD_REPLACE, $key, $val, $extra_header, $cas);
563}
564
565sub delete {
566    my $self = shift;
567    my ($key) = @_;
568    return $self->_do_command(::CMD_DELETE, $key, '');
569}
570
571sub incr {
572    my $self = shift;
573    my ($key, $amt, $init, $exp) = @_;
574    $amt = 1 unless defined $amt;
575    $init = 0 unless defined $init;
576    $exp = 0 unless defined $exp;
577
578    return $self->_incrdecr(::CMD_INCR, $key, $amt, $init, $exp);
579}
580
581sub incr_cas {
582    my $self = shift;
583    my ($key, $amt, $init, $exp) = @_;
584    $amt = 1 unless defined $amt;
585    $init = 0 unless defined $init;
586    $exp = 0 unless defined $exp;
587
588    return $self->_incrdecr_cas(::CMD_INCR, $key, $amt, $init, $exp);
589}
590
591sub decr {
592    my $self = shift;
593    my ($key, $amt, $init, $exp) = @_;
594    $amt = 1 unless defined $amt;
595    $init = 0 unless defined $init;
596    $exp = 0 unless defined $exp;
597
598    return $self->_incrdecr(::CMD_DECR, $key, $amt, $init, $exp);
599}
600
601sub noop {
602    my $self = shift;
603    return $self->_do_command(::CMD_NOOP, '', '');
604}
605
606package MC::Error;
607
608use strict;
609use warnings;
610
611use constant ERR_UNKNOWN_CMD  => 0x81;
612use constant ERR_NOT_FOUND    => 0x1;
613use constant ERR_EXISTS       => 0x2;
614use constant ERR_TOO_BIG      => 0x3;
615use constant ERR_EINVAL       => 0x4;
616use constant ERR_NOT_STORED   => 0x5;
617use constant ERR_DELTA_BADVAL => 0x6;
618
619use overload '""' => sub {
620    my $self = shift;
621    return "Memcache Error ($self->[0]): $self->[1]";
622};
623
624sub new {
625    my $class = shift;
626    my $error = [@_];
627    my $self = bless $error, (ref $class || $class);
628
629    return $self;
630}
631
632sub not_found {
633    my $self = shift;
634    return $self->[0] == ERR_NOT_FOUND;
635}
636
637sub exists {
638    my $self = shift;
639    return $self->[0] == ERR_EXISTS;
640}
641
642sub too_big {
643    my $self = shift;
644    return $self->[0] == ERR_TOO_BIG;
645}
646
647sub delta_badval {
648    my $self = shift;
649    return $self->[0] == ERR_DELTA_BADVAL;
650}
651
652sub einval {
653    my $self = shift;
654    return $self->[0] == ERR_EINVAL;
655}
656
657# vim: filetype=perl
658
659