1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31
32 /* ================================ MULTI/EXEC ============================== */
33
34 /* Client state initialization for MULTI/EXEC */
initClientMultiState(client * c)35 void initClientMultiState(client *c) {
36 c->mstate.commands = NULL;
37 c->mstate.count = 0;
38 c->mstate.cmd_flags = 0;
39 }
40
41 /* Release all the resources associated with MULTI/EXEC state */
freeClientMultiState(client * c)42 void freeClientMultiState(client *c) {
43 int j;
44
45 for (j = 0; j < c->mstate.count; j++) {
46 int i;
47 multiCmd *mc = c->mstate.commands+j;
48
49 for (i = 0; i < mc->argc; i++)
50 decrRefCount(mc->argv[i]);
51 zfree(mc->argv);
52 }
53 zfree(c->mstate.commands);
54 }
55
56 /* Add a new command into the MULTI commands queue */
queueMultiCommand(client * c)57 void queueMultiCommand(client *c) {
58 multiCmd *mc;
59 int j;
60
61 c->mstate.commands = zrealloc(c->mstate.commands,
62 sizeof(multiCmd)*(c->mstate.count+1));
63 mc = c->mstate.commands+c->mstate.count;
64 mc->cmd = c->cmd;
65 mc->argc = c->argc;
66 mc->argv = zmalloc(sizeof(robj*)*c->argc);
67 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
68 for (j = 0; j < c->argc; j++)
69 incrRefCount(mc->argv[j]);
70 c->mstate.count++;
71 c->mstate.cmd_flags |= c->cmd->flags;
72 }
73
/* Drop the client's transaction: free and re-initialize the MULTI/EXEC
 * queue, clear the CLIENT_MULTI, CLIENT_DIRTY_CAS and CLIENT_DIRTY_EXEC
 * flags, and unwatch every key the client was watching. */
void discardTransaction(client *c) {
    const int txn_flags = CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC;

    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~txn_flags;
    unwatchAllKeys(c);
}
80
81 /* Flag the transacation as DIRTY_EXEC so that EXEC will fail.
82 * Should be called every time there is an error while queueing a command. */
flagTransaction(client * c)83 void flagTransaction(client *c) {
84 if (c->flags & CLIENT_MULTI)
85 c->flags |= CLIENT_DIRTY_EXEC;
86 }
87
/* MULTI command: put the client into MULTI state and reply OK. A MULTI
 * issued while already in MULTI state is answered with an error and changes
 * nothing. */
void multiCommand(client *c) {
    int already_in_multi = (c->flags & CLIENT_MULTI) != 0;

    if (already_in_multi) {
        addReplyError(c,"MULTI calls can not be nested");
    } else {
        c->flags |= CLIENT_MULTI;
        addReply(c,shared.ok);
    }
}
96
/* DISCARD command: throw away the current transaction and reply OK. When
 * the client is not in MULTI state an error is returned instead. */
void discardCommand(client *c) {
    int in_multi = (c->flags & CLIENT_MULTI) != 0;

    if (in_multi) {
        discardTransaction(c);
        addReply(c,shared.ok);
    } else {
        addReplyError(c,"DISCARD without MULTI");
    }
}
105
106 /* Send a MULTI command to all the slaves and AOF file. Check the execCommand
107 * implementation for more information. */
execCommandPropagateMulti(client * c)108 void execCommandPropagateMulti(client *c) {
109 robj *multistring = createStringObject("MULTI",5);
110
111 propagate(server.multiCommand,c->db->id,&multistring,1,
112 PROPAGATE_AOF|PROPAGATE_REPL);
113 decrRefCount(multistring);
114 }
115
/* EXEC command: run, in order, every command queued since MULTI.
 *
 * Replies with an error when the client is not in MULTI state. The
 * transaction is discarded without running anything when a WATCHed key was
 * touched, when a queueing error was flagged, or when the queue holds write
 * commands but this instance is now a read-only slave. Otherwise the reply
 * is a multi bulk with one element per queued command.
 *
 * Except for the "EXEC without MULTI" error path, EXEC itself is fed to the
 * MONITOR clients at the end of the function (unless the server is
 * loading). */
void execCommand(client *c) {
    int idx;
    robj **saved_argv;
    int saved_argc;
    struct redisCommand *saved_cmd;
    int multi_propagated = 0; /* Was MULTI already sent to AOF / slaves? */
    int started_as_master = (server.masterhost == NULL);

    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    /* Two conditions abort the EXEC before anything is executed:
     * 1) A WATCHed key was touched: the reply is a nil multi bulk
     *    (technically not an error but a special behavior).
     * 2) An error happened while queueing commands: the reply is the
     *    EXECABORT error. This one takes precedence if both are set. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        if (c->flags & CLIENT_DIRTY_EXEC) {
            addReply(c,shared.execaborterr);
        } else {
            addReply(c,shared.nullmultibulk);
        }
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Refuse to run write commands on a read-only slave. This can happen
     * when the transaction was started while the instance was a master or a
     * writable replica, and the configuration changed afterwards (for
     * example the instance was turned into a replica). */
    if (!server.loading && server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) && (c->mstate.cmd_flags & CMD_WRITE))
    {
        addReplyError(c,
            "Transaction contains write commands but instance "
            "is now a read-only slave. EXEC aborted.");
        discardTransaction(c);
        goto handle_monitor;
    }

    /* Execute the queue. Unwatch first so that no CPU is wasted on watched
     * keys bookkeeping while the queued commands run. */
    unwatchAllKeys(c);
    saved_argv = c->argv;
    saved_argc = c->argc;
    saved_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for (idx = 0; idx < c->mstate.count; idx++) {
        multiCmd *queued = c->mstate.commands+idx;

        /* Make the queued command the client's current command. */
        c->argc = queued->argc;
        c->argv = queued->argv;
        c->cmd = queued->cmd;

        /* The first command that is neither read-only nor administrative
         * triggers the propagation of MULTI, so that the whole
         * MULTI/.../EXEC block is delivered as a unit and both the AOF and
         * the replication link get the same consistency and atomicity
         * guarantees. */
        if (!multi_propagated &&
            !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
        {
            execCommandPropagateMulti(c);
            multi_propagated = 1;
        }

        call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);

        /* The command may have altered argc/argv: store the current values
         * back into the queue entry. The entry is looked up again rather
         * than reusing the pointer taken before call(). */
        queued = c->mstate.commands+idx;
        queued->argc = c->argc;
        queued->argv = c->argv;
        queued->cmd = c->cmd;
    }
    c->argv = saved_argv;
    c->argc = saved_argc;
    c->cmd = saved_cmd;
    discardTransaction(c);

    /* If MULTI was propagated, make sure EXEC gets propagated as well. */
    if (multi_propagated) {
        int now_master = (server.masterhost == NULL);

        server.dirty++;
        /* If this instance was switched from master to slave (SLAVEOF)
         * while inside the MULTI/EXEC block, the initial MULTI reached the
         * replication backlog but the rest did not. At least terminate the
         * backlog with the final EXEC. */
        if (server.repl_backlog && started_as_master && !now_master) {
            char *exec_proto = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(exec_proto,strlen(exec_proto));
        }
    }

handle_monitor:
    /* Feed EXEC to the clients waiting for data from MONITOR. It is done
     * here because the natural order of commands execution is actually:
     * MULTI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command table,
     * and it is sent from here with the correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
215
216 /* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
217 *
218 * The implementation uses a per-DB hash table mapping keys to list of clients
219 * WATCHing those keys, so that given a key that is going to be modified
220 * we can mark all the associated clients as dirty.
221 *
 * Every client also keeps a list of the keys it WATCHes, so that those keys
 * can be un-watched when the client is freed or when UNWATCH is called. */
224
225 /* In the client->watched_keys list we need to use watchedKey structures
226 * as in order to identify a key in Redis we need both the key name and the
227 * DB */
228 typedef struct watchedKey {
229 robj *key;
230 redisDb *db;
231 } watchedKey;
232
233 /* Watch for the specified key */
watchForKey(client * c,robj * key)234 void watchForKey(client *c, robj *key) {
235 list *clients = NULL;
236 listIter li;
237 listNode *ln;
238 watchedKey *wk;
239
240 /* Check if we are already watching for this key */
241 listRewind(c->watched_keys,&li);
242 while((ln = listNext(&li))) {
243 wk = listNodeValue(ln);
244 if (wk->db == c->db && equalStringObjects(key,wk->key))
245 return; /* Key already watched */
246 }
247 /* This key is not already watched in this DB. Let's add it */
248 clients = dictFetchValue(c->db->watched_keys,key);
249 if (!clients) {
250 clients = listCreate();
251 dictAdd(c->db->watched_keys,key,clients);
252 incrRefCount(key);
253 }
254 listAddNodeTail(clients,c);
255 /* Add the new key to the list of keys watched by this client */
256 wk = zmalloc(sizeof(*wk));
257 wk->key = key;
258 wk->db = c->db;
259 incrRefCount(key);
260 listAddNodeTail(c->watched_keys,wk);
261 }
262
263 /* Unwatch all the keys watched by this client. To clean the EXEC dirty
264 * flag is up to the caller. */
unwatchAllKeys(client * c)265 void unwatchAllKeys(client *c) {
266 listIter li;
267 listNode *ln;
268
269 if (listLength(c->watched_keys) == 0) return;
270 listRewind(c->watched_keys,&li);
271 while((ln = listNext(&li))) {
272 list *clients;
273 watchedKey *wk;
274
275 /* Lookup the watched key -> clients list and remove the client
276 * from the list */
277 wk = listNodeValue(ln);
278 clients = dictFetchValue(wk->db->watched_keys, wk->key);
279 serverAssertWithInfo(c,NULL,clients != NULL);
280 listDelNode(clients,listSearchKey(clients,c));
281 /* Kill the entry at all if this was the only client */
282 if (listLength(clients) == 0)
283 dictDelete(wk->db->watched_keys, wk->key);
284 /* Remove this watched key from the client->watched list */
285 listDelNode(c->watched_keys,ln);
286 decrRefCount(wk->key);
287 zfree(wk);
288 }
289 }
290
291 /* "Touch" a key, so that if this key is being WATCHed by some client the
292 * next EXEC will fail. */
touchWatchedKey(redisDb * db,robj * key)293 void touchWatchedKey(redisDb *db, robj *key) {
294 list *clients;
295 listIter li;
296 listNode *ln;
297
298 if (dictSize(db->watched_keys) == 0) return;
299 clients = dictFetchValue(db->watched_keys, key);
300 if (!clients) return;
301
302 /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
303 /* Check if we are already watching for this key */
304 listRewind(clients,&li);
305 while((ln = listNext(&li))) {
306 client *c = listNodeValue(ln);
307
308 c->flags |= CLIENT_DIRTY_CAS;
309 }
310 }
311
312 /* On FLUSHDB or FLUSHALL all the watched keys that are present before the
313 * flush but will be deleted as effect of the flushing operation should
314 * be touched. "dbid" is the DB that's getting the flush. -1 if it is
315 * a FLUSHALL operation (all the DBs flushed). */
touchWatchedKeysOnFlush(int dbid)316 void touchWatchedKeysOnFlush(int dbid) {
317 listIter li1, li2;
318 listNode *ln;
319
320 /* For every client, check all the waited keys */
321 listRewind(server.clients,&li1);
322 while((ln = listNext(&li1))) {
323 client *c = listNodeValue(ln);
324 listRewind(c->watched_keys,&li2);
325 while((ln = listNext(&li2))) {
326 watchedKey *wk = listNodeValue(ln);
327
328 /* For every watched key matching the specified DB, if the
329 * key exists, mark the client as dirty, as the key will be
330 * removed. */
331 if (dbid == -1 || wk->db->id == dbid) {
332 if (dictFind(wk->db->dict, wk->key->ptr) != NULL)
333 c->flags |= CLIENT_DIRTY_CAS;
334 }
335 }
336 }
337 }
338
/* WATCH command: watch every key given as argument (argv[1] onward) and
 * reply OK. Inside a MULTI block the command is refused with an error and
 * no key is watched. */
void watchCommand(client *c) {
    int arg = 1;

    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    while (arg < c->argc) {
        watchForKey(c,c->argv[arg]);
        arg++;
    }
    addReply(c,shared.ok);
}
350
/* UNWATCH command: forget every key watched by the client, clear its
 * CLIENT_DIRTY_CAS flag, and reply OK. */
void unwatchCommand(client *c) {
    unwatchAllKeys(c);
    c->flags &= ~CLIENT_DIRTY_CAS;
    addReply(c,shared.ok);
}
356