1start_server {
2    tags {"stream"}
3} {
4    test {XGROUP CREATE: creation and duplicate group name detection} {
5        r DEL mystream
6        r XADD mystream * foo bar
7        r XGROUP CREATE mystream mygroup $
8        catch {r XGROUP CREATE mystream mygroup $} err
9        set err
10    } {BUSYGROUP*}
11
12    test {XGROUP CREATE: automatic stream creation fails without MKSTREAM} {
13        r DEL mystream
14        catch {r XGROUP CREATE mystream mygroup $} err
15        set err
16    } {ERR*}
17
18    test {XGROUP CREATE: automatic stream creation works with MKSTREAM} {
19        r DEL mystream
20        r XGROUP CREATE mystream mygroup $ MKSTREAM
21    } {OK}
22
23    test {XREADGROUP will return only new elements} {
24        r XADD mystream * a 1
25        r XADD mystream * b 2
26        # XREADGROUP should return only the new elements "a 1" "b 1"
27        # and not the element "foo bar" which was pre existing in the
28        # stream (see previous test)
29        set reply [
30            r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">"
31        ]
32        assert {[llength [lindex $reply 0 1]] == 2}
33        lindex $reply 0 1 0 1
34    } {a 1}
35
36    test {XREADGROUP can read the history of the elements we own} {
37        # Add a few more elements
38        r XADD mystream * c 3
39        r XADD mystream * d 4
40        # Read a few elements using a different consumer name
41        set reply [
42            r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">"
43        ]
44        assert {[llength [lindex $reply 0 1]] == 2}
45        assert {[lindex $reply 0 1 0 1] eq {c 3}}
46
47        set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0]
48        set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0]
49        assert {[lindex $r1 0 1 0 1] eq {a 1}}
50        assert {[lindex $r2 0 1 0 1] eq {c 3}}
51    }
52
53    test {XPENDING is able to return pending items} {
54        set pending [r XPENDING mystream mygroup - + 10]
55        assert {[llength $pending] == 4}
56        for {set j 0} {$j < 4} {incr j} {
57            set item [lindex $pending $j]
58            if {$j < 2} {
59                set owner client-1
60            } else {
61                set owner client-2
62            }
63            assert {[lindex $item 1] eq $owner}
64            assert {[lindex $item 1] eq $owner}
65        }
66    }
67
68    test {XPENDING can return single consumer items} {
69        set pending [r XPENDING mystream mygroup - + 10 client-1]
70        assert {[llength $pending] == 2}
71    }
72
73    test {XACK is able to remove items from the client/group PEL} {
74        set pending [r XPENDING mystream mygroup - + 10 client-1]
75        set id1 [lindex $pending 0 0]
76        set id2 [lindex $pending 1 0]
77        assert {[r XACK mystream mygroup $id1] eq 1}
78        set pending [r XPENDING mystream mygroup - + 10 client-1]
79        assert {[llength $pending] == 1}
80        set id [lindex $pending 0 0]
81        assert {$id eq $id2}
82        set global_pel [r XPENDING mystream mygroup - + 10]
83        assert {[llength $global_pel] == 3}
84    }
85
86    test {XACK can't remove the same item multiple times} {
87        assert {[r XACK mystream mygroup $id1] eq 0}
88    }
89
90    test {XACK is able to accept multiple arguments} {
91        # One of the IDs was already removed, so it should ack
92        # just ID2.
93        assert {[r XACK mystream mygroup $id1 $id2] eq 1}
94    }
95
96    test {PEL NACK reassignment after XGROUP SETID event} {
97        r del events
98        r xadd events * f1 v1
99        r xadd events * f1 v1
100        r xadd events * f1 v1
101        r xadd events * f1 v1
102        r xgroup create events g1 $
103        r xadd events * f1 v1
104        set c [llength [lindex [r xreadgroup group g1 c1 streams events >] 0 1]]
105        assert {$c == 1}
106        r xgroup setid events g1 -
107        set c [llength [lindex [r xreadgroup group g1 c2 streams events >] 0 1]]
108        assert {$c == 5}
109    }
110
111    test {XREADGROUP will not report data on empty history. Bug #5577} {
112        r del events
113        r xadd events * a 1
114        r xadd events * b 2
115        r xadd events * c 3
116        r xgroup create events mygroup 0
117
118        # Current local PEL should be empty
119        set res [r xpending events mygroup - + 10]
120        assert {[llength $res] == 0}
121
122        # So XREADGROUP should read an empty history as well
123        set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
124        assert {[llength [lindex $res 0 1]] == 0}
125
126        # We should fetch all the elements in the stream asking for >
127        set res [r xreadgroup group mygroup myconsumer count 3 streams events >]
128        assert {[llength [lindex $res 0 1]] == 3}
129
130        # Now the history is populated with three not acked entries
131        set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
132        assert {[llength [lindex $res 0 1]] == 3}
133    }
134
135    test {XREADGROUP history reporting of deleted entries. Bug #5570} {
136        r del mystream
137        r XGROUP CREATE mystream mygroup $ MKSTREAM
138        r XADD mystream 1 field1 A
139        r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
140        r XADD mystream MAXLEN 1 2 field1 B
141        r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
142
143        # Now we have two pending entries, however one should be deleted
144        # and one should be ok (we should only see "B")
145        set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1]
146        assert {[lindex $res 0 1 0] == {1-0 {}}}
147        assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
148    }
149
150    test {XCLAIM can claim PEL items from another consumer} {
151        # Add 3 items into the stream, and create a consumer group
152        r del mystream
153        set id1 [r XADD mystream * a 1]
154        set id2 [r XADD mystream * b 2]
155        set id3 [r XADD mystream * c 3]
156        r XGROUP CREATE mystream mygroup 0
157
158        # Client 1 reads item 1 from the stream without acknowledgements.
159        # Client 2 then claims pending item 1 from the PEL of client 1
160        set reply [
161            r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
162        ]
163        assert {[llength [lindex $reply 0 1 0 1]] == 2}
164        assert {[lindex $reply 0 1 0 1] eq {a 1}}
165        r debug sleep 0.2
166        set reply [
167            r XCLAIM mystream mygroup client2 10 $id1
168        ]
169        assert {[llength [lindex $reply 0 1]] == 2}
170        assert {[lindex $reply 0 1] eq {a 1}}
171
172        # Client 1 reads another 2 items from stream
173        r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
174        r debug sleep 0.2
175
176        # Delete item 2 from the stream. Now client 1 has PEL that contains
177        # only item 3. Try to use client 2 to claim the deleted item 2
178        # from the PEL of client 1, this should return nil
179        r XDEL mystream $id2
180        set reply [
181            r XCLAIM mystream mygroup client2 10 $id2
182        ]
183        assert {[llength $reply] == 1}
184        assert_equal "" [lindex $reply 0]
185
186        # Delete item 3 from the stream. Now client 1 has PEL that is empty.
187        # Try to use client 2 to claim the deleted item 3 from the PEL
188        # of client 1, this should return nil
189        r debug sleep 0.2
190        r XDEL mystream $id3
191        set reply [
192            r XCLAIM mystream mygroup client2 10 $id3
193        ]
194        assert {[llength $reply] == 1}
195        assert_equal "" [lindex $reply 0]
196    }
197
198    test {XCLAIM without JUSTID increments delivery count} {
199        # Add 3 items into the stream, and create a consumer group
200        r del mystream
201        set id1 [r XADD mystream * a 1]
202        set id2 [r XADD mystream * b 2]
203        set id3 [r XADD mystream * c 3]
204        r XGROUP CREATE mystream mygroup 0
205
206        # Client 1 reads item 1 from the stream without acknowledgements.
207        # Client 2 then claims pending item 1 from the PEL of client 1
208        set reply [
209            r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
210        ]
211        assert {[llength [lindex $reply 0 1 0 1]] == 2}
212        assert {[lindex $reply 0 1 0 1] eq {a 1}}
213        r debug sleep 0.2
214        set reply [
215            r XCLAIM mystream mygroup client2 10 $id1
216        ]
217        assert {[llength [lindex $reply 0 1]] == 2}
218        assert {[lindex $reply 0 1] eq {a 1}}
219
220        set reply [
221            r XPENDING mystream mygroup - + 10
222        ]
223        assert {[llength [lindex $reply 0]] == 4}
224        assert {[lindex $reply 0 3] == 2}
225
226        # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID
227        r debug sleep 0.2
228        set reply [
229            r XCLAIM mystream mygroup client3 10 $id1 JUSTID
230        ]
231        assert {[llength $reply] == 1}
232        assert {[lindex $reply 0] eq $id1}
233
234        set reply [
235            r XPENDING mystream mygroup - + 10
236        ]
237        assert {[llength [lindex $reply 0]] == 4}
238        assert {[lindex $reply 0 3] == 2}
239    }
240
241    start_server {} {
242        set master [srv -1 client]
243        set master_host [srv -1 host]
244        set master_port [srv -1 port]
245        set slave [srv 0 client]
246
247        foreach noack {0 1} {
248            test "Consumer group last ID propagation to slave (NOACK=$noack)" {
249                $slave slaveof $master_host $master_port
250                wait_for_condition 50 100 {
251                    [s 0 master_link_status] eq {up}
252                } else {
253                    fail "Replication not started."
254                }
255
256                $master del stream
257                $master xadd stream * a 1
258                $master xadd stream * a 2
259                $master xadd stream * a 3
260                $master xgroup create stream mygroup 0
261
262                # Consume the first two items on the master
263                for {set j 0} {$j < 2} {incr j} {
264                    if {$noack} {
265                        set item [$master xreadgroup group mygroup \
266                                  myconsumer COUNT 1 NOACK STREAMS stream >]
267                    } else {
268                        set item [$master xreadgroup group mygroup \
269                                  myconsumer COUNT 1 STREAMS stream >]
270                    }
271                    set id [lindex $item 0 1 0 0]
272                    if {$noack == 0} {
273                        assert {[$master xack stream mygroup $id] eq "1"}
274                    }
275                }
276
277                wait_for_ofs_sync $master $slave
278
279                # Turn slave into master
280                $slave slaveof no one
281
282                set item [$slave xreadgroup group mygroup myconsumer \
283                          COUNT 1 STREAMS stream >]
284
285                # The consumed enty should be the third
286                set myentry [lindex $item 0 1 0 1]
287                assert {$myentry eq {a 3}}
288            }
289        }
290    }
291}
292