1# return value is like strcmp() and similar.
2proc streamCompareID {a b} {
3    if {$a eq $b} {return 0}
4    lassign [split $a -] a_ms a_seq
5    lassign [split $b -] b_ms b_seq
6    if {$a_ms > $b_ms} {return 1}
7    if {$a_ms < $b_ms} {return -1}
8    # Same ms case, compare seq.
9    if {$a_seq > $b_seq} {return 1}
10    if {$a_seq < $b_seq} {return -1}
11}
12
13# return the ID immediately greater than the specified one.
14# Note that this function does not care to handle 'seq' overflow
15# since it's a 64 bit value.
16proc streamNextID {id} {
17    lassign [split $id -] ms seq
18    incr seq
19    join [list $ms $seq] -
20}
21
22# Generate a random stream entry ID with the ms part between min and max
23# and a low sequence number (0 - 999 range), in order to stress test
24# XRANGE against a Tcl implementation implementing the same concept
25# with Tcl-only code in a linear array.
26proc streamRandomID {min_id max_id} {
27    lassign [split $min_id -] min_ms min_seq
28    lassign [split $max_id -] max_ms max_seq
29    set delta [expr {$max_ms-$min_ms+1}]
30    set ms [expr {$min_ms+[randomInt $delta]}]
31    set seq [randomInt 1000]
32    return $ms-$seq
33}
34
35# Tcl-side implementation of XRANGE to perform fuzz testing in the Redis
36# XRANGE implementation.
37proc streamSimulateXRANGE {items start end} {
38    set res {}
39    foreach i $items  {
40        set this_id [lindex $i 0]
41        if {[streamCompareID $this_id $start] >= 0} {
42            if {[streamCompareID $this_id $end] <= 0} {
43                lappend res $i
44            }
45        }
46    }
47    return $res
48}
49
50set content {} ;# Will be populated with Tcl side copy of the stream content.
51
52start_server {
53    tags {"stream"}
54} {
55    test {XADD can add entries into a stream that XRANGE can fetch} {
56        r XADD mystream * item 1 value a
57        r XADD mystream * item 2 value b
58        assert_equal 2 [r XLEN mystream]
59        set items [r XRANGE mystream - +]
60        assert_equal [lindex $items 0 1] {item 1 value a}
61        assert_equal [lindex $items 1 1] {item 2 value b}
62    }
63
64    test {XADD IDs are incremental} {
65        set id1 [r XADD mystream * item 1 value a]
66        set id2 [r XADD mystream * item 2 value b]
67        set id3 [r XADD mystream * item 3 value c]
68        assert {[streamCompareID $id1 $id2] == -1}
69        assert {[streamCompareID $id2 $id3] == -1}
70    }
71
72    test {XADD IDs are incremental when ms is the same as well} {
73        r multi
74        r XADD mystream * item 1 value a
75        r XADD mystream * item 2 value b
76        r XADD mystream * item 3 value c
77        lassign [r exec] id1 id2 id3
78        assert {[streamCompareID $id1 $id2] == -1}
79        assert {[streamCompareID $id2 $id3] == -1}
80    }
81
82    test {XADD with MAXLEN option} {
83        r DEL mystream
84        for {set j 0} {$j < 1000} {incr j} {
85            if {rand() < 0.9} {
86                r XADD mystream MAXLEN 5 * xitem $j
87            } else {
88                r XADD mystream MAXLEN 5 * yitem $j
89            }
90        }
91        set res [r xrange mystream - +]
92        set expected 995
93        foreach r $res {
94            assert {[lindex $r 1 1] == $expected}
95            incr expected
96        }
97    }
98
99    test {XADD mass insertion and XLEN} {
100        r DEL mystream
101        r multi
102        for {set j 0} {$j < 10000} {incr j} {
103            # From time to time insert a field with a different set
104            # of fields in order to stress the stream compression code.
105            if {rand() < 0.9} {
106                r XADD mystream * item $j
107            } else {
108                r XADD mystream * item $j otherfield foo
109            }
110        }
111        r exec
112
113        set items [r XRANGE mystream - +]
114        for {set j 0} {$j < 10000} {incr j} {
115            assert {[lrange [lindex $items $j 1] 0 1] eq [list item $j]}
116        }
117        assert {[r xlen mystream] == $j}
118    }
119
120    test {XRANGE COUNT works as expected} {
121        assert {[llength [r xrange mystream - + COUNT 10]] == 10}
122    }
123
124    test {XREVRANGE COUNT works as expected} {
125        assert {[llength [r xrevrange mystream + - COUNT 10]] == 10}
126    }
127
128    test {XRANGE can be used to iterate the whole stream} {
129        set last_id "-"
130        set j 0
131        while 1 {
132            set elements [r xrange mystream $last_id + COUNT 100]
133            if {[llength $elements] == 0} break
134            foreach e $elements {
135                assert {[lrange [lindex $e 1] 0 1] eq [list item $j]}
136                incr j;
137            }
138            set last_id [streamNextID [lindex $elements end 0]]
139        }
140        assert {$j == 10000}
141    }
142
143    test {XREVRANGE returns the reverse of XRANGE} {
144        assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]}
145    }
146
147    test {XREAD with non empty stream} {
148        set res [r XREAD COUNT 1 STREAMS mystream 0-0]
149        assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
150    }
151
152    test {Non blocking XREAD with empty streams} {
153        set res [r XREAD STREAMS s1 s2 0-0 0-0]
154        assert {$res eq {}}
155    }
156
157    test {XREAD with non empty second stream} {
158        set res [r XREAD COUNT 1 STREAMS nostream mystream 0-0 0-0]
159        assert {[lindex $res 0 0] eq {mystream}}
160        assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
161    }
162
163    test {Blocking XREAD waiting new data} {
164        r XADD s2 * old abcd1234
165        set rd [redis_deferring_client]
166        $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
167        r XADD s2 * new abcd1234
168        set res [$rd read]
169        assert {[lindex $res 0 0] eq {s2}}
170        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
171    }
172
173    test {Blocking XREAD waiting old data} {
174        set rd [redis_deferring_client]
175        $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0-0 $
176        r XADD s2 * foo abcd1234
177        set res [$rd read]
178        assert {[lindex $res 0 0] eq {s2}}
179        assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
180    }
181
182    test "XREAD: XADD + DEL should not awake client" {
183        set rd [redis_deferring_client]
184        r del s1
185        $rd XREAD BLOCK 20000 STREAMS s1 $
186        r multi
187        r XADD s1 * old abcd1234
188        r DEL s1
189        r exec
190        r XADD s1 * new abcd1234
191        set res [$rd read]
192        assert {[lindex $res 0 0] eq {s1}}
193        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
194    }
195
196    test "XREAD: XADD + DEL + LPUSH should not awake client" {
197        set rd [redis_deferring_client]
198        r del s1
199        $rd XREAD BLOCK 20000 STREAMS s1 $
200        r multi
201        r XADD s1 * old abcd1234
202        r DEL s1
203        r LPUSH s1 foo bar
204        r exec
205        r DEL s1
206        r XADD s1 * new abcd1234
207        set res [$rd read]
208        assert {[lindex $res 0 0] eq {s1}}
209        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
210    }
211
212    test {XREAD with same stream name multiple times should work} {
213        r XADD s2 * old abcd1234
214        set rd [redis_deferring_client]
215        $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
216        r XADD s2 * new abcd1234
217        set res [$rd read]
218        assert {[lindex $res 0 0] eq {s2}}
219        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
220    }
221
222    test {XREAD + multiple XADD inside transaction} {
223        r XADD s2 * old abcd1234
224        set rd [redis_deferring_client]
225        $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
226        r MULTI
227        r XADD s2 * field one
228        r XADD s2 * field two
229        r XADD s2 * field three
230        r EXEC
231        set res [$rd read]
232        assert {[lindex $res 0 0] eq {s2}}
233        assert {[lindex $res 0 1 0 1] eq {field one}}
234        assert {[lindex $res 0 1 1 1] eq {field two}}
235    }
236
237    test {XDEL basic test} {
238        r del somestream
239        r xadd somestream * foo value0
240        set id [r xadd somestream * foo value1]
241        r xadd somestream * foo value2
242        r xdel somestream $id
243        assert {[r xlen somestream] == 2}
244        set result [r xrange somestream - +]
245        assert {[lindex $result 0 1 1] eq {value0}}
246        assert {[lindex $result 1 1 1] eq {value2}}
247    }
248
249    # Here the idea is to check the consistency of the stream data structure
250    # as we remove all the elements down to zero elements.
251    test {XDEL fuzz test} {
252        r del somestream
253        set ids {}
254        set x 0; # Length of the stream
255        while 1 {
256            lappend ids [r xadd somestream * item $x]
257            incr x
258            # Add enough elements to have a few radix tree nodes inside the stream.
259            if {[dict get [r xinfo stream somestream] radix-tree-keys] > 20} break
260        }
261
262        # Now remove all the elements till we reach an empty stream
263        # and after every deletion, check that the stream is sane enough
264        # to report the right number of elements with XRANGE: this will also
265        # force accessing the whole data structure to check sanity.
266        assert {[r xlen somestream] == $x}
267
268        # We want to remove elements in random order to really test the
269        # implementation in a better way.
270        set ids [lshuffle $ids]
271        foreach id $ids {
272            assert {[r xdel somestream $id] == 1}
273            incr x -1
274            assert {[r xlen somestream] == $x}
275            # The test would be too slow calling XRANGE for every iteration.
276            # Do it every 100 removal.
277            if {$x % 100 == 0} {
278                set res [r xrange somestream - +]
279                assert {[llength $res] == $x}
280            }
281        }
282    }
283
284    test {XRANGE fuzzing} {
285        set low_id [lindex $items 0 0]
286        set high_id [lindex $items end 0]
287        for {set j 0} {$j < 100} {incr j} {
288            set start [streamRandomID $low_id $high_id]
289            set end [streamRandomID $low_id $high_id]
290            set range [r xrange mystream $start $end]
291            set tcl_range [streamSimulateXRANGE $items $start $end]
292            if {$range ne $tcl_range} {
293                puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
294                puts "---"
295                puts "XRANGE: '$range'"
296                puts "---"
297                puts "TCL: '$tcl_range'"
298                puts "---"
299                fail "XRANGE fuzzing failed, check logs for details"
300            }
301        }
302    }
303
304    test {XREVRANGE regression test for issue #5006} {
305        # Add non compressed entries
306        r xadd teststream 1234567891230 key1 value1
307        r xadd teststream 1234567891240 key2 value2
308        r xadd teststream 1234567891250 key3 value3
309
310        # Add SAMEFIELD compressed entries
311        r xadd teststream2 1234567891230 key1 value1
312        r xadd teststream2 1234567891240 key1 value2
313        r xadd teststream2 1234567891250 key1 value3
314
315        assert_equal [r xrevrange teststream 1234567891245 -] {{1234567891240-0 {key2 value2}} {1234567891230-0 {key1 value1}}}
316
317        assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
318    }
319}
320
321start_server {tags {"stream"} overrides {appendonly yes}} {
322    test {XADD with MAXLEN > xlen can propagate correctly} {
323        for {set j 0} {$j < 100} {incr j} {
324            r XADD mystream * xitem v
325        }
326        r XADD mystream MAXLEN 200 * xitem v
327        incr j
328        assert {[r xlen mystream] == $j}
329        r debug loadaof
330        r XADD mystream * xitem v
331        incr j
332        assert {[r xlen mystream] == $j}
333    }
334}
335
336start_server {tags {"stream"} overrides {appendonly yes}} {
337    test {XADD with ~ MAXLEN can propagate correctly} {
338        for {set j 0} {$j < 100} {incr j} {
339            r XADD mystream * xitem v
340        }
341        r XADD mystream MAXLEN ~ $j * xitem v
342        incr j
343        assert {[r xlen mystream] == $j}
344        r config set stream-node-max-entries 1
345        r debug loadaof
346        r XADD mystream * xitem v
347        incr j
348        assert {[r xlen mystream] == $j}
349    }
350}
351
352start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
353    test {XTRIM with ~ MAXLEN can propagate correctly} {
354        for {set j 0} {$j < 100} {incr j} {
355            r XADD mystream * xitem v
356        }
357        r XTRIM mystream MAXLEN ~ 85
358        assert {[r xlen mystream] == 89}
359        r config set stream-node-max-entries 1
360        r debug loadaof
361        r XADD mystream * xitem v
362        incr j
363        assert {[r xlen mystream] == 90}
364    }
365}
366
367start_server {tags {"xsetid"}} {
368    test {XADD can CREATE an empty stream} {
369        r XADD mystream MAXLEN 0 * a b
370        assert {[dict get [r xinfo stream mystream] length] == 0}
371    }
372
373    test {XSETID can set a specific ID} {
374        r XSETID mystream "200-0"
375        assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
376    }
377
378    test {XSETID cannot SETID with smaller ID} {
379        r XADD mystream * a b
380        catch {r XSETID mystream "1-1"} err
381        r XADD mystream MAXLEN 0 * a b
382        set err
383    } {ERR*smaller*}
384
385    test {XSETID cannot SETID on non-existent key} {
386        catch {r XSETID stream 1-1} err
387        set _ $err
388    } {ERR no such key}
389}
390
391start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
392    test {Empty stream can be rewrite into AOF correctly} {
393        r XADD mystream MAXLEN 0 * a b
394        assert {[dict get [r xinfo stream mystream] length] == 0}
395        r bgrewriteaof
396        waitForBgrewriteaof r
397        r debug loadaof
398        assert {[dict get [r xinfo stream mystream] length] == 0}
399    }
400
401    test {Stream can be rewrite into AOF correctly after XDEL lastid} {
402        r XSETID mystream 0-0
403        r XADD mystream 1-1 a b
404        r XADD mystream 2-2 a b
405        assert {[dict get [r xinfo stream mystream] length] == 2}
406        r XDEL mystream 2-2
407        r bgrewriteaof
408        waitForBgrewriteaof r
409        r debug loadaof
410        assert {[dict get [r xinfo stream mystream] length] == 1}
411        assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
412    }
413}
414