# Compare two stream IDs of the form "<ms>-<seq>".
#
# The return value is like strcmp() and similar:
#    1 if $a is greater than $b,
#   -1 if $a is smaller than $b,
#    0 if they are equal.
proc streamCompareID {a b} {
    if {$a eq $b} {return 0}
    lassign [split $a -] a_ms a_seq
    lassign [split $b -] b_ms b_seq
    if {$a_ms > $b_ms} {return 1}
    if {$a_ms < $b_ms} {return -1}
    # Same ms case, compare seq.
    if {$a_seq > $b_seq} {return 1}
    if {$a_seq < $b_seq} {return -1}
    # Both parts compare equal even though the two strings differ
    # (for example "5-1" and "5-01"): always return an integer so that
    # callers can safely use the result in numeric comparisons.
    return 0
}

# Return the ID immediately greater than the specified one.
# Note that this function does not care to handle 'seq' overflow
# since it's a 64 bit value.
proc streamNextID {id} {
    lassign [split $id -] ms seq
    incr seq
    join [list $ms $seq] -
}

# Generate a random stream entry ID with the ms part between min and max
# and a low sequence number (0 - 999 range), in order to stress test
# XRANGE against a Tcl implementation implementing the same concept
# with Tcl-only code in a linear array.
#
# Only the ms parts of min_id and max_id are used; their sequence parts
# are ignored.
proc streamRandomID {min_id max_id} {
    lassign [split $min_id -] min_ms min_seq
    lassign [split $max_id -] max_ms max_seq
    set delta [expr {$max_ms-$min_ms+1}]
    set ms [expr {$min_ms+[randomInt $delta]}]
    set seq [randomInt 1000]
    return $ms-$seq
}

# Tcl-side implementation of XRANGE to perform fuzz testing in the Redis
# XRANGE implementation.
#
# 'items' is a list of entries whose first element is the entry ID.
# Returns, in the original order, the entries whose ID is within the
# inclusive range start..end.
proc streamSimulateXRANGE {items start end} {
    set res {}
    foreach i $items {
        set this_id [lindex $i 0]
        if {[streamCompareID $this_id $start] >= 0} {
            if {[streamCompareID $this_id $end] <= 0} {
                lappend res $i
            }
        }
    }
    return $res
}

set content {} ;# Will be populated with Tcl side copy of the stream content.

# Core stream tests: XADD / XRANGE / XREVRANGE / XREAD / XDEL.
# The tests in this group run in order and share state: "mystream" is
# populated by the mass insertion test and reused by the tests after it,
# and the 'items' variable set there is reused by the XRANGE fuzzing test.
start_server {
    tags {"stream"}
} {
    test {XADD can add entries into a stream that XRANGE can fetch} {
        r XADD mystream * item 1 value a
        r XADD mystream * item 2 value b
        assert_equal 2 [r XLEN mystream]
        set items [r XRANGE mystream - +]
        assert_equal [lindex $items 0 1] {item 1 value a}
        assert_equal [lindex $items 1 1] {item 2 value b}
    }

    test {XADD IDs are incremental} {
        set id1 [r XADD mystream * item 1 value a]
        set id2 [r XADD mystream * item 2 value b]
        set id3 [r XADD mystream * item 3 value c]
        assert {[streamCompareID $id1 $id2] == -1}
        assert {[streamCompareID $id2 $id3] == -1}
    }

    test {XADD IDs are incremental when ms is the same as well} {
        # Queue the three XADD calls in a single transaction.
        r multi
        r XADD mystream * item 1 value a
        r XADD mystream * item 2 value b
        r XADD mystream * item 3 value c
        lassign [r exec] id1 id2 id3
        assert {[streamCompareID $id1 $id2] == -1}
        assert {[streamCompareID $id2 $id3] == -1}
    }

    test {XADD with MAXLEN option} {
        r DEL mystream
        for {set j 0} {$j < 1000} {incr j} {
            if {rand() < 0.9} {
                r XADD mystream MAXLEN 5 * xitem $j
            } else {
                r XADD mystream MAXLEN 5 * yitem $j
            }
        }
        # Only the entries carrying the last values (995 onwards) are
        # expected to be still in the stream.
        set res [r xrange mystream - +]
        set expected 995
        foreach r $res {
            assert {[lindex $r 1 1] == $expected}
            incr expected
        }
    }

    test {XADD mass insertion and XLEN} {
        r DEL mystream
        r multi
        for {set j 0} {$j < 10000} {incr j} {
            # From time to time insert a field with a different set
            # of fields in order to stress the stream compression code.
            if {rand() < 0.9} {
                r XADD mystream * item $j
            } else {
                r XADD mystream * item $j otherfield foo
            }
        }
        r exec

        # 'items' is used again later by the XRANGE fuzzing test.
        set items [r XRANGE mystream - +]
        for {set j 0} {$j < 10000} {incr j} {
            assert {[lrange [lindex $items $j 1] 0 1] eq [list item $j]}
        }
        assert {[r xlen mystream] == $j}
    }

    test {XRANGE COUNT works as expected} {
        assert {[llength [r xrange mystream - + COUNT 10]] == 10}
    }

    test {XREVRANGE COUNT works as expected} {
        assert {[llength [r xrevrange mystream + - COUNT 10]] == 10}
    }

    test {XRANGE can be used to iterate the whole stream} {
        set last_id "-"
        set j 0
        while 1 {
            set elements [r xrange mystream $last_id + COUNT 100]
            if {[llength $elements] == 0} break
            foreach e $elements {
                assert {[lrange [lindex $e 1] 0 1] eq [list item $j]}
                incr j
            }
            # Resume from the ID right after the last one received.
            set last_id [streamNextID [lindex $elements end 0]]
        }
        assert {$j == 10000}
    }

    test {XREVRANGE returns the reverse of XRANGE} {
        assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]}
    }

    test {XREAD with non empty stream} {
        set res [r XREAD COUNT 1 STREAMS mystream 0-0]
        assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
    }

    test {Non blocking XREAD with empty streams} {
        set res [r XREAD STREAMS s1 s2 0-0 0-0]
        assert {$res eq {}}
    }

    test {XREAD with non empty second stream} {
        set res [r XREAD COUNT 1 STREAMS nostream mystream 0-0 0-0]
        assert {[lindex $res 0 0] eq {mystream}}
        assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
    }

    test {Blocking XREAD waiting new data} {
        r XADD s2 * old abcd1234
        set rd [redis_deferring_client]
        $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
        r XADD s2 * new abcd1234
        set res [$rd read]
        assert {[lindex $res 0 0] eq {s2}}
        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
        # Release the deferring client connection once we are done with it.
        $rd close
    }

    test {Blocking XREAD waiting old data} {
        set rd [redis_deferring_client]
        $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0-0 $
        r XADD s2 * foo abcd1234
        set res [$rd read]
        assert {[lindex $res 0 0] eq {s2}}
        assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
        # Release the deferring client connection once we are done with it.
        $rd close
    }

    test "XREAD: XADD + DEL should not awake client" {
        set rd [redis_deferring_client]
        r del s1
        $rd XREAD BLOCK 20000 STREAMS s1 $
        # The key is created and deleted inside the same transaction.
        r multi
        r XADD s1 * old abcd1234
        r DEL s1
        r exec
        r XADD s1 * new abcd1234
        set res [$rd read]
        assert {[lindex $res 0 0] eq {s1}}
        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
        # Release the deferring client connection once we are done with it.
        $rd close
    }

    test "XREAD: XADD + DEL + LPUSH should not awake client" {
        set rd [redis_deferring_client]
        r del s1
        $rd XREAD BLOCK 20000 STREAMS s1 $
        # At the end of the transaction the key exists but holds a list.
        r multi
        r XADD s1 * old abcd1234
        r DEL s1
        r LPUSH s1 foo bar
        r exec
        r DEL s1
        r XADD s1 * new abcd1234
        set res [$rd read]
        assert {[lindex $res 0 0] eq {s1}}
        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
        # Release the deferring client connection once we are done with it.
        $rd close
    }

    test {XREAD with same stream name multiple times should work} {
        r XADD s2 * old abcd1234
        set rd [redis_deferring_client]
        $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
        r XADD s2 * new abcd1234
        set res [$rd read]
        assert {[lindex $res 0 0] eq {s2}}
        assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
        # Release the deferring client connection once we are done with it.
        $rd close
    }

    test {XREAD + multiple XADD inside transaction} {
        r XADD s2 * old abcd1234
        set rd [redis_deferring_client]
        $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
        r MULTI
        r XADD s2 * field one
        r XADD s2 * field two
        r XADD s2 * field three
        r EXEC
        set res [$rd read]
        assert {[lindex $res 0 0] eq {s2}}
        assert {[lindex $res 0 1 0 1] eq {field one}}
        assert {[lindex $res 0 1 1 1] eq {field two}}
        # Release the deferring client connection once we are done with it.
        $rd close
    }

    test {XDEL basic test} {
        r del somestream
        r xadd somestream * foo value0
        set id [r xadd somestream * foo value1]
        r xadd somestream * foo value2
        r xdel somestream $id
        assert {[r xlen somestream] == 2}
        set result [r xrange somestream - +]
        assert {[lindex $result 0 1 1] eq {value0}}
        assert {[lindex $result 1 1 1] eq {value2}}
    }

    # Here the idea is to check the consistency of the stream data structure
    # as we remove all the elements down to zero elements.
    test {XDEL fuzz test} {
        r del somestream
        set ids {}
        set x 0; # Length of the stream
        while 1 {
            lappend ids [r xadd somestream * item $x]
            incr x
            # Add enough elements to have a few radix tree nodes inside the stream.
            if {[dict get [r xinfo stream somestream] radix-tree-keys] > 20} break
        }

        # Now remove all the elements till we reach an empty stream
        # and after every deletion, check that the stream is sane enough
        # to report the right number of elements with XRANGE: this will also
        # force accessing the whole data structure to check sanity.
        assert {[r xlen somestream] == $x}

        # We want to remove elements in random order to really test the
        # implementation in a better way.
        set ids [lshuffle $ids]
        foreach id $ids {
            assert {[r xdel somestream $id] == 1}
            incr x -1
            assert {[r xlen somestream] == $x}
            # The test would be too slow calling XRANGE for every iteration.
            # Do it every 100 removals.
            if {$x % 100 == 0} {
                set res [r xrange somestream - +]
                assert {[llength $res] == $x}
            }
        }
    }

    test {XRANGE fuzzing} {
        # 'items' is the copy of mystream fetched by the mass insertion test.
        set low_id [lindex $items 0 0]
        set high_id [lindex $items end 0]
        for {set j 0} {$j < 100} {incr j} {
            set start [streamRandomID $low_id $high_id]
            set end [streamRandomID $low_id $high_id]
            set range [r xrange mystream $start $end]
            set tcl_range [streamSimulateXRANGE $items $start $end]
            if {$range ne $tcl_range} {
                puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
                puts "---"
                puts "XRANGE: '$range'"
                puts "---"
                puts "TCL: '$tcl_range'"
                puts "---"
                fail "XRANGE fuzzing failed, check logs for details"
            }
        }
    }

    test {XREVRANGE regression test for issue #5006} {
        # Add non compressed entries
        r xadd teststream 1234567891230 key1 value1
        r xadd teststream 1234567891240 key2 value2
        r xadd teststream 1234567891250 key3 value3

        # Add SAMEFIELD compressed entries
        r xadd teststream2 1234567891230 key1 value1
        r xadd teststream2 1234567891240 key1 value2
        r xadd teststream2 1234567891250 key1 value3

        assert_equal [r xrevrange teststream 1234567891245 -] {{1234567891240-0 {key2 value2}} {1234567891230-0 {key1 value1}}}

        assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
    }
}

# The following groups run with AOF enabled and reload the dataset with
# DEBUG LOADAOF to check that the stream length is the expected one after
# the reload.
start_server {tags {"stream"} overrides {appendonly yes}} {
    test {XADD with MAXLEN > xlen can propagate correctly} {
        for {set j 0} {$j < 100} {incr j} {
            r XADD mystream * xitem v
        }
        r XADD mystream MAXLEN 200 * xitem v
        incr j
        assert {[r xlen mystream] == $j}
        r debug loadaof
        r XADD mystream * xitem v
        incr j
        assert {[r xlen mystream] == $j}
    }
}

start_server {tags {"stream"} overrides {appendonly yes}} {
    test {XADD with ~ MAXLEN can propagate correctly} {
        for {set j 0} {$j < 100} {incr j} {
            r XADD mystream * xitem v
        }
        r XADD mystream MAXLEN ~ $j * xitem v
        incr j
        assert {[r xlen mystream] == $j}
        # Change the node size before reloading the AOF.
        r config set stream-node-max-entries 1
        r debug loadaof
        r XADD mystream * xitem v
        incr j
        assert {[r xlen mystream] == $j}
    }
}

start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
    test {XTRIM with ~ MAXLEN can propagate correctly} {
        for {set j 0} {$j < 100} {incr j} {
            r XADD mystream * xitem v
        }
        r XTRIM mystream MAXLEN ~ 85
        assert {[r xlen mystream] == 89}
        # Change the node size before reloading the AOF.
        r config set stream-node-max-entries 1
        r debug loadaof
        r XADD mystream * xitem v
        incr j
        assert {[r xlen mystream] == 90}
    }
}

start_server {tags {"xsetid"}} {
    test {XADD can CREATE an empty stream} {
        r XADD mystream MAXLEN 0 * a b
        assert {[dict get [r xinfo stream mystream] length] == 0}
    }

    test {XSETID can set a specific ID} {
        r XSETID mystream "200-0"
        assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
    }

    test {XSETID cannot SETID with smaller ID} {
        r XADD mystream * a b
        catch {r XSETID mystream "1-1"} err
        # Leave the stream empty again before returning the error.
        r XADD mystream MAXLEN 0 * a b
        set err
    } {ERR*smaller*}

    test {XSETID cannot SETID on non-existent key} {
        catch {r XSETID stream 1-1} err
        set _ $err
    } {ERR no such key}
}

start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
    test {Empty stream can be rewrite into AOF correctly} {
        r XADD mystream MAXLEN 0 * a b
        assert {[dict get [r xinfo stream mystream] length] == 0}
        r bgrewriteaof
        waitForBgrewriteaof r
        r debug loadaof
        assert {[dict get [r xinfo stream mystream] length] == 0}
    }

    test {Stream can be rewrite into AOF correctly after XDEL lastid} {
        r XSETID mystream 0-0
        r XADD mystream 1-1 a b
        r XADD mystream 2-2 a b
        assert {[dict get [r xinfo stream mystream] length] == 2}
        r XDEL mystream 2-2
        r bgrewriteaof
        waitForBgrewriteaof r
        r debug loadaof
        # The deleted entry must not come back, while the last generated
        # ID must still be the one of the deleted entry.
        assert {[dict get [r xinfo stream mystream] length] == 1}
        assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
    }
}