# Consumer group tests for Redis streams: XGROUP, XREADGROUP, XPENDING,
# XACK and XCLAIM, plus propagation of the group last delivered ID to a
# replica.
#
# NOTE: the tests in this file are order dependent. Several of them rely on
# the server state (stream content, pending entries) and on Tcl variables
# (for instance id1 and id2) left behind by the tests that precede them.
start_server {
    tags {"stream"}
} {
    test {XGROUP CREATE: creation and duplicate group name detection} {
        r DEL mystream
        r XADD mystream * foo bar
        r XGROUP CREATE mystream mygroup $
        # Creating the same group a second time must fail.
        catch {r XGROUP CREATE mystream mygroup $} err
        set err
    } {BUSYGROUP*}

    test {XGROUP CREATE: automatic stream creation fails without MKSTREAM} {
        r DEL mystream
        catch {r XGROUP CREATE mystream mygroup $} err
        set err
    } {ERR*}

    test {XGROUP CREATE: automatic stream creation works with MKSTREAM} {
        r DEL mystream
        r XGROUP CREATE mystream mygroup $ MKSTREAM
    } {OK}

    test {XREADGROUP will return only new elements} {
        r XADD mystream * a 1
        r XADD mystream * b 2
        # XREADGROUP should return only the new elements "a 1" "b 2".
        # The group was created with "$" on a stream that was deleted and
        # recreated via MKSTREAM (see previous test), so these two entries
        # are the only ones that can be delivered.
        set reply [
            r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
        ]
        assert {[llength [lindex $reply 0 1]] == 2}
        lindex $reply 0 1 0 1
    } {a 1}

    test {XREADGROUP can read the history of the elements we own} {
        # Add a few more elements
        r XADD mystream * c 3
        r XADD mystream * d 4
        # Read a few elements using a different consumer name
        set reply [
            r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
        ]
        assert {[llength [lindex $reply 0 1]] == 2}
        assert {[lindex $reply 0 1 0 1] eq {c 3}}

        # Asking for ID 0 instead of ">" returns the history of each
        # consumer: every consumer must only see the entries it owns.
        set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
        set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
        assert {[lindex $r1 0 1 0 1] eq {a 1}}
        assert {[lindex $r2 0 1 0 1] eq {c 3}}
    }

    test {XPENDING is able to return pending items} {
        # Four entries were delivered by the previous tests and none was
        # acknowledged: the first two to client-1, the last two to client-2.
        set pending [r XPENDING mystream mygroup - + 10]
        assert {[llength $pending] == 4}
        for {set j 0} {$j < 4} {incr j} {
            set item [lindex $pending $j]
            if {$j < 2} {
                set owner client-1
            } else {
                set owner client-2
            }
            # Element 1 of every pending item is the owning consumer name.
            assert {[lindex $item 1] eq $owner}
        }
    }

    test {XPENDING can return single consumer items} {
        set pending [r XPENDING mystream mygroup - + 10 client-1]
        assert {[llength $pending] == 2}
    }

    test {XACK is able to remove items from the client/group PEL} {
        # id1 and id2 are also used by the two XACK tests that follow.
        set pending [r XPENDING mystream mygroup - + 10 client-1]
        set id1 [lindex $pending 0 0]
        set id2 [lindex $pending 1 0]
        assert {[r XACK mystream mygroup $id1] eq 1}
        set pending [r XPENDING mystream mygroup - + 10 client-1]
        assert {[llength $pending] == 1}
        set id [lindex $pending 0 0]
        assert {$id eq $id2}
        set global_pel [r XPENDING mystream mygroup - + 10]
        assert {[llength $global_pel] == 3}
    }

    test {XACK can't remove the same item multiple times} {
        # id1 was already acknowledged by the previous test.
        assert {[r XACK mystream mygroup $id1] eq 0}
    }

    test {XACK is able to accept multiple arguments} {
        # One of the IDs was already removed, so it should ack
        # just ID2.
        assert {[r XACK mystream mygroup $id1 $id2] eq 1}
    }

    test {PEL NACK reassignment after XGROUP SETID event} {
        r del events
        r xadd events * f1 v1
        r xadd events * f1 v1
        r xadd events * f1 v1
        r xadd events * f1 v1
        r xgroup create events g1 $
        r xadd events * f1 v1
        # Only the entry added after the group creation is delivered to c1.
        set c [llength [lindex [r xreadgroup group g1 c1 streams events >] 0 1]]
        assert {$c == 1}
        # After the group ID is reset, all the five entries are delivered
        # to c2, including the one still pending for c1.
        r xgroup setid events g1 -
        set c [llength [lindex [r xreadgroup group g1 c2 streams events >] 0 1]]
        assert {$c == 5}
    }

    test {XREADGROUP will not report data on empty history. Bug #5577} {
        r del events
        r xadd events * a 1
        r xadd events * b 2
        r xadd events * c 3
        r xgroup create events mygroup 0

        # Current local PEL should be empty
        set res [r xpending events mygroup - + 10]
        assert {[llength $res] == 0}

        # So XREADGROUP should read an empty history as well
        set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
        assert {[llength [lindex $res 0 1]] == 0}

        # We should fetch all the elements in the stream asking for >
        set res [r xreadgroup group mygroup myconsumer count 3 streams events >]
        assert {[llength [lindex $res 0 1]] == 3}

        # Now the history is populated with three not acked entries
        set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
        assert {[llength [lindex $res 0 1]] == 3}
    }

    test {XREADGROUP history reporting of deleted entries. Bug #5570} {
        r del mystream
        r XGROUP CREATE mystream mygroup $ MKSTREAM
        r XADD mystream 1 field1 A
        r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
        # MAXLEN 1 evicts entry 1-0 from the stream while it is still
        # referenced by the consumer PEL.
        r XADD mystream MAXLEN 1 2 field1 B
        r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >

        # Now we have two pending entries, however one should be deleted
        # and one should be ok (we should only see "B")
        set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1]
        assert {[lindex $res 0 1 0] == {1-0 {}}}
        assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
    }

    test {XCLAIM can claim PEL items from another consumer} {
        # Add 3 items into the stream, and create a consumer group
        r del mystream
        set id1 [r XADD mystream * a 1]
        set id2 [r XADD mystream * b 2]
        set id3 [r XADD mystream * c 3]
        r XGROUP CREATE mystream mygroup 0

        # Client 1 reads item 1 from the stream without acknowledgements.
        # Client 2 then claims pending item 1 from the PEL of client 1
        set reply [
            r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
        ]
        assert {[llength [lindex $reply 0 1 0 1]] == 2}
        assert {[lindex $reply 0 1 0 1] eq {a 1}}
        # Let the entry become idle for longer than the 10 milliseconds
        # min-idle-time passed to XCLAIM below.
        r debug sleep 0.2
        set reply [
            r XCLAIM mystream mygroup client2 10 $id1
        ]
        assert {[llength [lindex $reply 0 1]] == 2}
        assert {[lindex $reply 0 1] eq {a 1}}

        # Client 1 reads another 2 items from stream
        r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
        r debug sleep 0.2

        # Delete item 2 from the stream. Now client 1 has PEL that contains
        # only item 3. Try to use client 2 to claim the deleted item 2
        # from the PEL of client 1, this should return nil
        r XDEL mystream $id2
        set reply [
            r XCLAIM mystream mygroup client2 10 $id2
        ]
        assert {[llength $reply] == 1}
        assert_equal "" [lindex $reply 0]

        # Delete item 3 from the stream. Now client 1 has PEL that is empty.
        # Try to use client 2 to claim the deleted item 3 from the PEL
        # of client 1, this should return nil
        r debug sleep 0.2
        r XDEL mystream $id3
        set reply [
            r XCLAIM mystream mygroup client2 10 $id3
        ]
        assert {[llength $reply] == 1}
        assert_equal "" [lindex $reply 0]
    }

    test {XCLAIM without JUSTID increments delivery count} {
        # Add 3 items into the stream, and create a consumer group
        r del mystream
        set id1 [r XADD mystream * a 1]
        set id2 [r XADD mystream * b 2]
        set id3 [r XADD mystream * c 3]
        r XGROUP CREATE mystream mygroup 0

        # Client 1 reads item 1 from the stream without acknowledgements.
        # Client 2 then claims pending item 1 from the PEL of client 1
        set reply [
            r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
        ]
        assert {[llength [lindex $reply 0 1 0 1]] == 2}
        assert {[lindex $reply 0 1 0 1] eq {a 1}}
        r debug sleep 0.2
        set reply [
            r XCLAIM mystream mygroup client2 10 $id1
        ]
        assert {[llength [lindex $reply 0 1]] == 2}
        assert {[lindex $reply 0 1] eq {a 1}}

        # Element 3 of a pending item is its delivery counter: one delivery
        # for the XREADGROUP plus one for the XCLAIM without JUSTID.
        set reply [
            r XPENDING mystream mygroup - + 10
        ]
        assert {[llength [lindex $reply 0]] == 4}
        assert {[lindex $reply 0 3] == 2}

        # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID
        r debug sleep 0.2
        set reply [
            r XCLAIM mystream mygroup client3 10 $id1 JUSTID
        ]
        assert {[llength $reply] == 1}
        assert {[lindex $reply 0] eq $id1}

        # The claim with JUSTID must leave the delivery counter at 2.
        set reply [
            r XPENDING mystream mygroup - + 10
        ]
        assert {[llength [lindex $reply 0]] == 4}
        assert {[lindex $reply 0 3] == 2}
    }

    start_server {} {
        # The outer server (level -1) plays the master role, the server
        # started here (level 0) is the slave.
        set master [srv -1 client]
        set master_host [srv -1 host]
        set master_port [srv -1 port]
        set slave [srv 0 client]

        foreach noack {0 1} {
            test "Consumer group last ID propagation to slave (NOACK=$noack)" {
                $slave slaveof $master_host $master_port
                wait_for_condition 50 100 {
                    [s 0 master_link_status] eq {up}
                } else {
                    fail "Replication not started."
                }

                $master del stream
                $master xadd stream * a 1
                $master xadd stream * a 2
                $master xadd stream * a 3
                $master xgroup create stream mygroup 0

                # Consume the first two items on the master
                for {set j 0} {$j < 2} {incr j} {
                    if {$noack} {
                        set item [$master xreadgroup group mygroup \
                                  myconsumer COUNT 1 NOACK STREAMS stream >]
                    } else {
                        set item [$master xreadgroup group mygroup \
                                  myconsumer COUNT 1 STREAMS stream >]
                    }
                    set id [lindex $item 0 1 0 0]
                    # Without NOACK the entry is pending and must be
                    # acknowledged explicitly.
                    if {$noack == 0} {
                        assert {[$master xack stream mygroup $id] eq "1"}
                    }
                }

                # NOTE(review): wait_for_ofs_sync is a harness helper defined
                # elsewhere; presumably it waits until the slave caught up
                # with the master replication offset -- confirm.
                wait_for_ofs_sync $master $slave

                # Turn slave into master
                $slave slaveof no one

                set item [$slave xreadgroup group mygroup myconsumer \
                          COUNT 1 STREAMS stream >]

                # The consumed entry should be the third
                set myentry [lindex $item 0 1 0 1]
                assert {$myentry eq {a 3}}
            }
        }
    }
}