1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "redis.h" 31 32 /* ================================ MULTI/EXEC ============================== */ 33 34 /* Client state initialization for MULTI/EXEC */ 35 void initClientMultiState(redisClient *c) { 36 c->mstate.commands = NULL; 37 c->mstate.count = 0; 38 } 39 40 /* Release all the resources associated with MULTI/EXEC state */ 41 void freeClientMultiState(redisClient *c) { 42 int j; 43 44 for (j = 0; j < c->mstate.count; j++) { 45 int i; 46 multiCmd *mc = c->mstate.commands+j; 47 48 for (i = 0; i < mc->argc; i++) 49 decrRefCount(mc->argv[i]); 50 zfree(mc->argv); 51 } 52 zfree(c->mstate.commands); 53 } 54 55 /* Add a new command into the MULTI commands queue */ 56 void queueMultiCommand(redisClient *c) { 57 multiCmd *mc; 58 int j; 59 60 c->mstate.commands = zrealloc(c->mstate.commands, 61 sizeof(multiCmd)*(c->mstate.count+1)); 62 mc = c->mstate.commands+c->mstate.count; 63 mc->cmd = c->cmd; 64 mc->argc = c->argc; 65 mc->argv = zmalloc(sizeof(robj*)*c->argc); 66 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); 67 for (j = 0; j < c->argc; j++) 68 incrRefCount(mc->argv[j]); 69 c->mstate.count++; 70 } 71 72 void discardTransaction(redisClient *c) { 73 freeClientMultiState(c); 74 initClientMultiState(c); 75 c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);; 76 unwatchAllKeys(c); 77 } 78 79 /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. 80 * Should be called every time there is an error while queueing a command. */ 81 void flagTransaction(redisClient *c) { 82 if (c->flags & REDIS_MULTI) 83 c->flags |= REDIS_DIRTY_EXEC; 84 } 85 86 void multiCommand(redisClient *c) { 87 if (c->flags & REDIS_MULTI) { 88 addReplyError(c,"MULTI calls can not be nested"); 89 return; 90 } 91 c->flags |= REDIS_MULTI; 92 addReply(c,shared.ok); 93 } 94 95 void discardCommand(redisClient *c) { 96 if (!(c->flags & REDIS_MULTI)) { 97 addReplyError(c,"DISCARD without MULTI"); 98 return; 99 } 100 discardTransaction(c); 101 addReply(c,shared.ok); 102 } 103 104 /* Send a MULTI command to all the slaves and AOF file. Check the execCommand 105 * implementation for more information. */ 106 void execCommandPropagateMulti(redisClient *c) { 107 robj *multistring = createStringObject("MULTI",5); 108 109 propagate(server.multiCommand,c->db->id,&multistring,1, 110 REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); 111 decrRefCount(multistring); 112 } 113 114 void execCommand(redisClient *c) { 115 int j; 116 robj **orig_argv; 117 int orig_argc; 118 struct redisCommand *orig_cmd; 119 int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ 120 121 if (!(c->flags & REDIS_MULTI)) { 122 addReplyError(c,"EXEC without MULTI"); 123 return; 124 } 125 126 /* Check if we need to abort the EXEC because: 127 * 1) Some WATCHed key was touched. 128 * 2) There was a previous error while queueing commands. 129 * A failed EXEC in the first case returns a multi bulk nil object 130 * (technically it is not an error but a special behavior), while 131 * in the second an EXECABORT error is returned. */ 132 if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { 133 addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr : 134 shared.nullmultibulk); 135 discardTransaction(c); 136 goto handle_monitor; 137 } 138 139 /* Exec all the queued commands */ 140 unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ 141 orig_argv = c->argv; 142 orig_argc = c->argc; 143 orig_cmd = c->cmd; 144 addReplyMultiBulkLen(c,c->mstate.count); 145 for (j = 0; j < c->mstate.count; j++) { 146 c->argc = c->mstate.commands[j].argc; 147 c->argv = c->mstate.commands[j].argv; 148 c->cmd = c->mstate.commands[j].cmd; 149 150 /* Propagate a MULTI request once we encounter the first write op. 151 * This way we'll deliver the MULTI/..../EXEC block as a whole and 152 * both the AOF and the replication link will have the same consistency 153 * and atomicity guarantees. */ 154 if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) { 155 execCommandPropagateMulti(c); 156 must_propagate = 1; 157 } 158 159 call(c,REDIS_CALL_FULL); 160 161 /* Commands may alter argc/argv, restore mstate. */ 162 c->mstate.commands[j].argc = c->argc; 163 c->mstate.commands[j].argv = c->argv; 164 c->mstate.commands[j].cmd = c->cmd; 165 } 166 c->argv = orig_argv; 167 c->argc = orig_argc; 168 c->cmd = orig_cmd; 169 discardTransaction(c); 170 /* Make sure the EXEC command will be propagated as well if MULTI 171 * was already propagated. */ 172 if (must_propagate) server.dirty++; 173 174 handle_monitor: 175 /* Send EXEC to clients waiting data from MONITOR. We do it here 176 * since the natural order of commands execution is actually: 177 * MUTLI, EXEC, ... commands inside transaction ... 178 * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command 179 * table, and we do it here with correct ordering. */ 180 if (listLength(server.monitors) && !server.loading) 181 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); 182 } 183 184 /* ===================== WATCH (CAS alike for MULTI/EXEC) =================== 185 * 186 * The implementation uses a per-DB hash table mapping keys to list of clients 187 * WATCHing those keys, so that given a key that is going to be modified 188 * we can mark all the associated clients as dirty. 189 * 190 * Also every client contains a list of WATCHed keys so that's possible to 191 * un-watch such keys when the client is freed or when UNWATCH is called. */ 192 193 /* In the client->watched_keys list we need to use watchedKey structures 194 * as in order to identify a key in Redis we need both the key name and the 195 * DB */ 196 typedef struct watchedKey { 197 robj *key; 198 redisDb *db; 199 } watchedKey; 200 201 /* Watch for the specified key */ 202 void watchForKey(redisClient *c, robj *key) { 203 list *clients = NULL; 204 listIter li; 205 listNode *ln; 206 watchedKey *wk; 207 208 /* Check if we are already watching for this key */ 209 listRewind(c->watched_keys,&li); 210 while((ln = listNext(&li))) { 211 wk = listNodeValue(ln); 212 if (wk->db == c->db && equalStringObjects(key,wk->key)) 213 return; /* Key already watched */ 214 } 215 /* This key is not already watched in this DB. Let's add it */ 216 clients = dictFetchValue(c->db->watched_keys,key); 217 if (!clients) { 218 clients = listCreate(); 219 dictAdd(c->db->watched_keys,key,clients); 220 incrRefCount(key); 221 } 222 listAddNodeTail(clients,c); 223 /* Add the new key to the list of keys watched by this client */ 224 wk = zmalloc(sizeof(*wk)); 225 wk->key = key; 226 wk->db = c->db; 227 incrRefCount(key); 228 listAddNodeTail(c->watched_keys,wk); 229 } 230 231 /* Unwatch all the keys watched by this client. To clean the EXEC dirty 232 * flag is up to the caller. */ 233 void unwatchAllKeys(redisClient *c) { 234 listIter li; 235 listNode *ln; 236 237 if (listLength(c->watched_keys) == 0) return; 238 listRewind(c->watched_keys,&li); 239 while((ln = listNext(&li))) { 240 list *clients; 241 watchedKey *wk; 242 243 /* Lookup the watched key -> clients list and remove the client 244 * from the list */ 245 wk = listNodeValue(ln); 246 clients = dictFetchValue(wk->db->watched_keys, wk->key); 247 redisAssertWithInfo(c,NULL,clients != NULL); 248 listDelNode(clients,listSearchKey(clients,c)); 249 /* Kill the entry at all if this was the only client */ 250 if (listLength(clients) == 0) 251 dictDelete(wk->db->watched_keys, wk->key); 252 /* Remove this watched key from the client->watched list */ 253 listDelNode(c->watched_keys,ln); 254 decrRefCount(wk->key); 255 zfree(wk); 256 } 257 } 258 259 /* "Touch" a key, so that if this key is being WATCHed by some client the 260 * next EXEC will fail. */ 261 void touchWatchedKey(redisDb *db, robj *key) { 262 list *clients; 263 listIter li; 264 listNode *ln; 265 266 if (dictSize(db->watched_keys) == 0) return; 267 clients = dictFetchValue(db->watched_keys, key); 268 if (!clients) return; 269 270 /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ 271 /* Check if we are already watching for this key */ 272 listRewind(clients,&li); 273 while((ln = listNext(&li))) { 274 redisClient *c = listNodeValue(ln); 275 276 c->flags |= REDIS_DIRTY_CAS; 277 } 278 } 279 280 /* On FLUSHDB or FLUSHALL all the watched keys that are present before the 281 * flush but will be deleted as effect of the flushing operation should 282 * be touched. "dbid" is the DB that's getting the flush. -1 if it is 283 * a FLUSHALL operation (all the DBs flushed). */ 284 void touchWatchedKeysOnFlush(int dbid) { 285 listIter li1, li2; 286 listNode *ln; 287 288 /* For every client, check all the waited keys */ 289 listRewind(server.clients,&li1); 290 while((ln = listNext(&li1))) { 291 redisClient *c = listNodeValue(ln); 292 listRewind(c->watched_keys,&li2); 293 while((ln = listNext(&li2))) { 294 watchedKey *wk = listNodeValue(ln); 295 296 /* For every watched key matching the specified DB, if the 297 * key exists, mark the client as dirty, as the key will be 298 * removed. */ 299 if (dbid == -1 || wk->db->id == dbid) { 300 if (dictFind(wk->db->dict, wk->key->ptr) != NULL) 301 c->flags |= REDIS_DIRTY_CAS; 302 } 303 } 304 } 305 } 306 307 void watchCommand(redisClient *c) { 308 int j; 309 310 if (c->flags & REDIS_MULTI) { 311 addReplyError(c,"WATCH inside MULTI is not allowed"); 312 return; 313 } 314 for (j = 1; j < c->argc; j++) 315 watchForKey(c,c->argv[j]); 316 addReply(c,shared.ok); 317 } 318 319 void unwatchCommand(redisClient *c) { 320 unwatchAllKeys(c); 321 c->flags &= (~REDIS_DIRTY_CAS); 322 addReply(c,shared.ok); 323 } 324