1 /* 2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * * Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * * Neither the name of Redis nor the names of its contributors may be used 14 * to endorse or promote products derived from this software without 15 * specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27 * POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include "server.h" 31 32 /* ================================ MULTI/EXEC ============================== */ 33 34 /* Client state initialization for MULTI/EXEC */ 35 void initClientMultiState(client *c) { 36 c->mstate.commands = NULL; 37 c->mstate.count = 0; 38 c->mstate.cmd_flags = 0; 39 } 40 41 /* Release all the resources associated with MULTI/EXEC state */ 42 void freeClientMultiState(client *c) { 43 int j; 44 45 for (j = 0; j < c->mstate.count; j++) { 46 int i; 47 multiCmd *mc = c->mstate.commands+j; 48 49 for (i = 0; i < mc->argc; i++) 50 decrRefCount(mc->argv[i]); 51 zfree(mc->argv); 52 } 53 zfree(c->mstate.commands); 54 } 55 56 /* Add a new command into the MULTI commands queue */ 57 void queueMultiCommand(client *c) { 58 multiCmd *mc; 59 int j; 60 61 c->mstate.commands = zrealloc(c->mstate.commands, 62 sizeof(multiCmd)*(c->mstate.count+1)); 63 mc = c->mstate.commands+c->mstate.count; 64 mc->cmd = c->cmd; 65 mc->argc = c->argc; 66 mc->argv = zmalloc(sizeof(robj*)*c->argc); 67 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); 68 for (j = 0; j < c->argc; j++) 69 incrRefCount(mc->argv[j]); 70 c->mstate.count++; 71 c->mstate.cmd_flags |= c->cmd->flags; 72 } 73 74 void discardTransaction(client *c) { 75 freeClientMultiState(c); 76 initClientMultiState(c); 77 c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); 78 unwatchAllKeys(c); 79 } 80 81 /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. 82 * Should be called every time there is an error while queueing a command. */ 83 void flagTransaction(client *c) { 84 if (c->flags & CLIENT_MULTI) 85 c->flags |= CLIENT_DIRTY_EXEC; 86 } 87 88 void multiCommand(client *c) { 89 if (c->flags & CLIENT_MULTI) { 90 addReplyError(c,"MULTI calls can not be nested"); 91 return; 92 } 93 c->flags |= CLIENT_MULTI; 94 addReply(c,shared.ok); 95 } 96 97 void discardCommand(client *c) { 98 if (!(c->flags & CLIENT_MULTI)) { 99 addReplyError(c,"DISCARD without MULTI"); 100 return; 101 } 102 discardTransaction(c); 103 addReply(c,shared.ok); 104 } 105 106 /* Send a MULTI command to all the slaves and AOF file. Check the execCommand 107 * implementation for more information. */ 108 void execCommandPropagateMulti(client *c) { 109 robj *multistring = createStringObject("MULTI",5); 110 111 propagate(server.multiCommand,c->db->id,&multistring,1, 112 PROPAGATE_AOF|PROPAGATE_REPL); 113 decrRefCount(multistring); 114 } 115 116 void execCommand(client *c) { 117 int j; 118 robj **orig_argv; 119 int orig_argc; 120 struct redisCommand *orig_cmd; 121 int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ 122 int was_master = server.masterhost == NULL; 123 124 if (!(c->flags & CLIENT_MULTI)) { 125 addReplyError(c,"EXEC without MULTI"); 126 return; 127 } 128 129 /* Check if we need to abort the EXEC because: 130 * 1) Some WATCHed key was touched. 131 * 2) There was a previous error while queueing commands. 132 * A failed EXEC in the first case returns a multi bulk nil object 133 * (technically it is not an error but a special behavior), while 134 * in the second an EXECABORT error is returned. */ 135 if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) { 136 addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr : 137 shared.nullmultibulk); 138 discardTransaction(c); 139 goto handle_monitor; 140 } 141 142 /* If there are write commands inside the transaction, and this is a read 143 * only slave, we want to send an error. This happens when the transaction 144 * was initiated when the instance was a master or a writable replica and 145 * then the configuration changed (for example instance was turned into 146 * a replica). */ 147 if (!server.loading && server.masterhost && server.repl_slave_ro && 148 !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) 149 { 150 addReplyError(c, 151 "Transaction contains write commands but instance " 152 "is now a read-only slave. EXEC aborted."); 153 discardTransaction(c); 154 goto handle_monitor; 155 } 156 157 /* Exec all the queued commands */ 158 unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ 159 orig_argv = c->argv; 160 orig_argc = c->argc; 161 orig_cmd = c->cmd; 162 addReplyMultiBulkLen(c,c->mstate.count); 163 for (j = 0; j < c->mstate.count; j++) { 164 c->argc = c->mstate.commands[j].argc; 165 c->argv = c->mstate.commands[j].argv; 166 c->cmd = c->mstate.commands[j].cmd; 167 168 /* Propagate a MULTI request once we encounter the first command which 169 * is not readonly nor an administrative one. 170 * This way we'll deliver the MULTI/..../EXEC block as a whole and 171 * both the AOF and the replication link will have the same consistency 172 * and atomicity guarantees. */ 173 if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { 174 execCommandPropagateMulti(c); 175 must_propagate = 1; 176 } 177 178 call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL); 179 180 /* Commands may alter argc/argv, restore mstate. */ 181 c->mstate.commands[j].argc = c->argc; 182 c->mstate.commands[j].argv = c->argv; 183 c->mstate.commands[j].cmd = c->cmd; 184 } 185 c->argv = orig_argv; 186 c->argc = orig_argc; 187 c->cmd = orig_cmd; 188 discardTransaction(c); 189 190 /* Make sure the EXEC command will be propagated as well if MULTI 191 * was already propagated. */ 192 if (must_propagate) { 193 int is_master = server.masterhost == NULL; 194 server.dirty++; 195 /* If inside the MULTI/EXEC block this instance was suddenly 196 * switched from master to slave (using the SLAVEOF command), the 197 * initial MULTI was propagated into the replication backlog, but the 198 * rest was not. We need to make sure to at least terminate the 199 * backlog with the final EXEC. */ 200 if (server.repl_backlog && was_master && !is_master) { 201 char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; 202 feedReplicationBacklog(execcmd,strlen(execcmd)); 203 } 204 } 205 206 handle_monitor: 207 /* Send EXEC to clients waiting data from MONITOR. We do it here 208 * since the natural order of commands execution is actually: 209 * MUTLI, EXEC, ... commands inside transaction ... 210 * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command 211 * table, and we do it here with correct ordering. */ 212 if (listLength(server.monitors) && !server.loading) 213 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); 214 } 215 216 /* ===================== WATCH (CAS alike for MULTI/EXEC) =================== 217 * 218 * The implementation uses a per-DB hash table mapping keys to list of clients 219 * WATCHing those keys, so that given a key that is going to be modified 220 * we can mark all the associated clients as dirty. 221 * 222 * Also every client contains a list of WATCHed keys so that's possible to 223 * un-watch such keys when the client is freed or when UNWATCH is called. */ 224 225 /* In the client->watched_keys list we need to use watchedKey structures 226 * as in order to identify a key in Redis we need both the key name and the 227 * DB */ 228 typedef struct watchedKey { 229 robj *key; 230 redisDb *db; 231 } watchedKey; 232 233 /* Watch for the specified key */ 234 void watchForKey(client *c, robj *key) { 235 list *clients = NULL; 236 listIter li; 237 listNode *ln; 238 watchedKey *wk; 239 240 /* Check if we are already watching for this key */ 241 listRewind(c->watched_keys,&li); 242 while((ln = listNext(&li))) { 243 wk = listNodeValue(ln); 244 if (wk->db == c->db && equalStringObjects(key,wk->key)) 245 return; /* Key already watched */ 246 } 247 /* This key is not already watched in this DB. Let's add it */ 248 clients = dictFetchValue(c->db->watched_keys,key); 249 if (!clients) { 250 clients = listCreate(); 251 dictAdd(c->db->watched_keys,key,clients); 252 incrRefCount(key); 253 } 254 listAddNodeTail(clients,c); 255 /* Add the new key to the list of keys watched by this client */ 256 wk = zmalloc(sizeof(*wk)); 257 wk->key = key; 258 wk->db = c->db; 259 incrRefCount(key); 260 listAddNodeTail(c->watched_keys,wk); 261 } 262 263 /* Unwatch all the keys watched by this client. To clean the EXEC dirty 264 * flag is up to the caller. */ 265 void unwatchAllKeys(client *c) { 266 listIter li; 267 listNode *ln; 268 269 if (listLength(c->watched_keys) == 0) return; 270 listRewind(c->watched_keys,&li); 271 while((ln = listNext(&li))) { 272 list *clients; 273 watchedKey *wk; 274 275 /* Lookup the watched key -> clients list and remove the client 276 * from the list */ 277 wk = listNodeValue(ln); 278 clients = dictFetchValue(wk->db->watched_keys, wk->key); 279 serverAssertWithInfo(c,NULL,clients != NULL); 280 listDelNode(clients,listSearchKey(clients,c)); 281 /* Kill the entry at all if this was the only client */ 282 if (listLength(clients) == 0) 283 dictDelete(wk->db->watched_keys, wk->key); 284 /* Remove this watched key from the client->watched list */ 285 listDelNode(c->watched_keys,ln); 286 decrRefCount(wk->key); 287 zfree(wk); 288 } 289 } 290 291 /* "Touch" a key, so that if this key is being WATCHed by some client the 292 * next EXEC will fail. */ 293 void touchWatchedKey(redisDb *db, robj *key) { 294 list *clients; 295 listIter li; 296 listNode *ln; 297 298 if (dictSize(db->watched_keys) == 0) return; 299 clients = dictFetchValue(db->watched_keys, key); 300 if (!clients) return; 301 302 /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */ 303 /* Check if we are already watching for this key */ 304 listRewind(clients,&li); 305 while((ln = listNext(&li))) { 306 client *c = listNodeValue(ln); 307 308 c->flags |= CLIENT_DIRTY_CAS; 309 } 310 } 311 312 /* On FLUSHDB or FLUSHALL all the watched keys that are present before the 313 * flush but will be deleted as effect of the flushing operation should 314 * be touched. "dbid" is the DB that's getting the flush. -1 if it is 315 * a FLUSHALL operation (all the DBs flushed). */ 316 void touchWatchedKeysOnFlush(int dbid) { 317 listIter li1, li2; 318 listNode *ln; 319 320 /* For every client, check all the waited keys */ 321 listRewind(server.clients,&li1); 322 while((ln = listNext(&li1))) { 323 client *c = listNodeValue(ln); 324 listRewind(c->watched_keys,&li2); 325 while((ln = listNext(&li2))) { 326 watchedKey *wk = listNodeValue(ln); 327 328 /* For every watched key matching the specified DB, if the 329 * key exists, mark the client as dirty, as the key will be 330 * removed. */ 331 if (dbid == -1 || wk->db->id == dbid) { 332 if (dictFind(wk->db->dict, wk->key->ptr) != NULL) 333 c->flags |= CLIENT_DIRTY_CAS; 334 } 335 } 336 } 337 } 338 339 void watchCommand(client *c) { 340 int j; 341 342 if (c->flags & CLIENT_MULTI) { 343 addReplyError(c,"WATCH inside MULTI is not allowed"); 344 return; 345 } 346 for (j = 1; j < c->argc; j++) 347 watchForKey(c,c->argv[j]); 348 addReply(c,shared.ok); 349 } 350 351 void unwatchCommand(client *c) { 352 unwatchAllKeys(c); 353 c->flags &= (~CLIENT_DIRTY_CAS); 354 addReply(c,shared.ok); 355 } 356