1 /* blocked.c - generic support for blocking operations like BLPOP & WAIT.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 * ---------------------------------------------------------------------------
31 *
32 * API:
33 *
 * getTimeoutFromObjectOrReply() is just a utility function to parse a
 * timeout argument since blocking operations usually require a timeout.
36 *
 * blockClient() sets the CLIENT_BLOCKED flag in the client, and sets the
 * specified block type 'btype' field to one of the BLOCKED_* macros.
39 *
40 * unblockClient() unblocks the client doing the following:
41 * 1) It calls the btype-specific function to cleanup the state.
42 * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
43 * 3) It puts the client into a list of just unblocked clients that are
44 * processed ASAP in the beforeSleep() event loop callback, so that
45 * if there is some query buffer to process, we do it. This is also
46 * required because otherwise there is no 'readable' event fired, we
47 * already read the pending commands. We also set the CLIENT_UNBLOCKED
48 * flag to remember the client is in the unblocked_clients list.
49 *
 * processUnblockedClients() is called inside the beforeSleep() function
 * to process the query buffer from unblocked clients and remove the clients
 * from the unblocked_clients list.
53 *
 * replyToBlockedClientTimedOut() is called by the cron function when
 * a blocked client reaches the specified timeout (if the timeout is set
 * to 0, no timeout is processed).
 * It usually just needs to send a reply to the client.
58 *
 * When implementing a new type of blocking operation, the implementation
 * should modify unblockClient() and replyToBlockedClientTimedOut() in order
 * to handle the btype-specific behavior of these two functions.
 * If the blocking operation waits for certain keys to change state, the
 * clusterRedirectBlockedClientIfNeeded() function should also be updated.
64 */
65
#include "server.h"
#include <limits.h>
67
68 /* Get a timeout value from an object and store it into 'timeout'.
69 * The final timeout is always stored as milliseconds as a time where the
70 * timeout will expire, however the parsing is performed according to
71 * the 'unit' that can be seconds or milliseconds.
72 *
73 * Note that if the timeout is zero (usually from the point of view of
74 * commands API this means no timeout) the value stored into 'timeout'
75 * is zero. */
getTimeoutFromObjectOrReply(client * c,robj * object,mstime_t * timeout,int unit)76 int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
77 long long tval;
78
79 if (getLongLongFromObjectOrReply(c,object,&tval,
80 "timeout is not an integer or out of range") != C_OK)
81 return C_ERR;
82
83 if (tval < 0) {
84 addReplyError(c,"timeout is negative");
85 return C_ERR;
86 }
87
88 if (tval > 0) {
89 if (unit == UNIT_SECONDS) tval *= 1000;
90 tval += mstime();
91 }
92 *timeout = tval;
93
94 return C_OK;
95 }
96
97 /* Block a client for the specific operation type. Once the CLIENT_BLOCKED
98 * flag is set client query buffer is not longer processed, but accumulated,
99 * and will be processed when the client is unblocked. */
blockClient(client * c,int btype)100 void blockClient(client *c, int btype) {
101 c->flags |= CLIENT_BLOCKED;
102 c->btype = btype;
103 server.bpop_blocked_clients++;
104 }
105
106 /* This function is called in the beforeSleep() function of the event loop
107 * in order to process the pending input buffer of clients that were
108 * unblocked after a blocking operation. */
processUnblockedClients(void)109 void processUnblockedClients(void) {
110 listNode *ln;
111 client *c;
112
113 while (listLength(server.unblocked_clients)) {
114 ln = listFirst(server.unblocked_clients);
115 serverAssert(ln != NULL);
116 c = ln->value;
117 listDelNode(server.unblocked_clients,ln);
118 c->flags &= ~CLIENT_UNBLOCKED;
119
120 /* Process remaining data in the input buffer, unless the client
121 * is blocked again. Actually processInputBuffer() checks that the
122 * client is not blocked before to proceed, but things may change and
123 * the code is conceptually more correct this way. */
124 if (!(c->flags & CLIENT_BLOCKED)) {
125 if (c->querybuf && sdslen(c->querybuf) > 0) {
126 processInputBuffer(c);
127 }
128 }
129 }
130 }
131
132 /* Unblock a client calling the right function depending on the kind
133 * of operation the client is blocking for. */
unblockClient(client * c)134 void unblockClient(client *c) {
135 if (c->btype == BLOCKED_LIST) {
136 unblockClientWaitingData(c);
137 } else if (c->btype == BLOCKED_WAIT) {
138 unblockClientWaitingReplicas(c);
139 } else {
140 serverPanic("Unknown btype in unblockClient().");
141 }
142 /* Clear the flags, and put the client in the unblocked list so that
143 * we'll process new commands in its query buffer ASAP. */
144 c->flags &= ~CLIENT_BLOCKED;
145 c->btype = BLOCKED_NONE;
146 server.bpop_blocked_clients--;
147 /* The client may already be into the unblocked list because of a previous
148 * blocking operation, don't add back it into the list multiple times. */
149 if (!(c->flags & CLIENT_UNBLOCKED)) {
150 c->flags |= CLIENT_UNBLOCKED;
151 listAddNodeTail(server.unblocked_clients,c);
152 }
153 }
154
155 /* This function gets called when a blocked client timed out in order to
156 * send it a reply of some kind. */
replyToBlockedClientTimedOut(client * c)157 void replyToBlockedClientTimedOut(client *c) {
158 if (c->btype == BLOCKED_LIST) {
159 addReply(c,shared.nullmultibulk);
160 } else if (c->btype == BLOCKED_WAIT) {
161 addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
162 } else {
163 serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
164 }
165 }
166
167 /* Mass-unblock clients because something changed in the instance that makes
168 * blocking no longer safe. For example clients blocked in list operations
169 * in an instance which turns from master to slave is unsafe, so this function
170 * is called when a master turns into a slave.
171 *
172 * The semantics is to send an -UNBLOCKED error to the client, disconnecting
173 * it at the same time. */
disconnectAllBlockedClients(void)174 void disconnectAllBlockedClients(void) {
175 listNode *ln;
176 listIter li;
177
178 listRewind(server.clients,&li);
179 while((ln = listNext(&li))) {
180 client *c = listNodeValue(ln);
181
182 if (c->flags & CLIENT_BLOCKED) {
183 addReplySds(c,sdsnew(
184 "-UNBLOCKED force unblock from blocking operation, "
185 "instance state changed (master -> slave?)\r\n"));
186 unblockClient(c);
187 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
188 }
189 }
190 }
191