xref: /redis-3.2.3/src/blocked.c (revision 32f80e2f)
1 /* blocked.c - generic support for blocking operations like BLPOP & WAIT.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * ---------------------------------------------------------------------------
31  *
32  * API:
33  *
34  * getTimeoutFromObjectOrReply() is just a utility function to parse a
35  * timeout argument since blocking operations usually require a timeout.
36  *
37  * blockClient() sets the CLIENT_BLOCKED flag in the client, and sets the
38  * block type field 'btype' to one of the BLOCKED_* macros.
39  *
40  * unblockClient() unblocks the client doing the following:
41  * 1) It calls the btype-specific function to cleanup the state.
42  * 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
43  * 3) It puts the client into a list of just unblocked clients that are
44  *    processed ASAP in the beforeSleep() event loop callback, so that
45  *    if there is some query buffer to process, we do it. This is also
46  *    required because otherwise there is no 'readable' event fired, we
47  *    already read the pending commands. We also set the CLIENT_UNBLOCKED
48  *    flag to remember the client is in the unblocked_clients list.
49  *
50  * processUnblockedClients() is called inside the beforeSleep() function
51  * to process the query buffer from unblocked clients and remove the clients
52  * from the unblocked_clients list.
53  *
54  * replyToBlockedClientTimedOut() is called by the cron function when
55  * a client blocked reaches the specified timeout (if the timeout is set
56  * to 0, no timeout is processed).
57  * It usually just needs to send a reply to the client.
58  *
59  * When implementing a new type of blocking operation, the implementation
60  * should modify unblockClient() and replyToBlockedClientTimedOut() in order
61  * to handle the btype-specific behavior of these two functions.
62  * If the blocking operation waits for certain keys to change state, the
63  * clusterRedirectBlockedClientIfNeeded() function should also be updated.
64  */
65 
#include "server.h"

#include <limits.h>
67 
68 /* Get a timeout value from an object and store it into 'timeout'.
69  * The final timeout is always stored as milliseconds as a time where the
70  * timeout will expire, however the parsing is performed according to
71  * the 'unit' that can be seconds or milliseconds.
72  *
73  * Note that if the timeout is zero (usually from the point of view of
74  * commands API this means no timeout) the value stored into 'timeout'
75  * is zero. */
getTimeoutFromObjectOrReply(client * c,robj * object,mstime_t * timeout,int unit)76 int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
77     long long tval;
78 
79     if (getLongLongFromObjectOrReply(c,object,&tval,
80         "timeout is not an integer or out of range") != C_OK)
81         return C_ERR;
82 
83     if (tval < 0) {
84         addReplyError(c,"timeout is negative");
85         return C_ERR;
86     }
87 
88     if (tval > 0) {
89         if (unit == UNIT_SECONDS) tval *= 1000;
90         tval += mstime();
91     }
92     *timeout = tval;
93 
94     return C_OK;
95 }
96 
97 /* Block a client for the specific operation type. Once the CLIENT_BLOCKED
98  * flag is set client query buffer is not longer processed, but accumulated,
99  * and will be processed when the client is unblocked. */
blockClient(client * c,int btype)100 void blockClient(client *c, int btype) {
101     c->flags |= CLIENT_BLOCKED;
102     c->btype = btype;
103     server.bpop_blocked_clients++;
104 }
105 
106 /* This function is called in the beforeSleep() function of the event loop
107  * in order to process the pending input buffer of clients that were
108  * unblocked after a blocking operation. */
processUnblockedClients(void)109 void processUnblockedClients(void) {
110     listNode *ln;
111     client *c;
112 
113     while (listLength(server.unblocked_clients)) {
114         ln = listFirst(server.unblocked_clients);
115         serverAssert(ln != NULL);
116         c = ln->value;
117         listDelNode(server.unblocked_clients,ln);
118         c->flags &= ~CLIENT_UNBLOCKED;
119 
120         /* Process remaining data in the input buffer, unless the client
121          * is blocked again. Actually processInputBuffer() checks that the
122          * client is not blocked before to proceed, but things may change and
123          * the code is conceptually more correct this way. */
124         if (!(c->flags & CLIENT_BLOCKED)) {
125             if (c->querybuf && sdslen(c->querybuf) > 0) {
126                 processInputBuffer(c);
127             }
128         }
129     }
130 }
131 
132 /* Unblock a client calling the right function depending on the kind
133  * of operation the client is blocking for. */
unblockClient(client * c)134 void unblockClient(client *c) {
135     if (c->btype == BLOCKED_LIST) {
136         unblockClientWaitingData(c);
137     } else if (c->btype == BLOCKED_WAIT) {
138         unblockClientWaitingReplicas(c);
139     } else {
140         serverPanic("Unknown btype in unblockClient().");
141     }
142     /* Clear the flags, and put the client in the unblocked list so that
143      * we'll process new commands in its query buffer ASAP. */
144     c->flags &= ~CLIENT_BLOCKED;
145     c->btype = BLOCKED_NONE;
146     server.bpop_blocked_clients--;
147     /* The client may already be into the unblocked list because of a previous
148      * blocking operation, don't add back it into the list multiple times. */
149     if (!(c->flags & CLIENT_UNBLOCKED)) {
150         c->flags |= CLIENT_UNBLOCKED;
151         listAddNodeTail(server.unblocked_clients,c);
152     }
153 }
154 
155 /* This function gets called when a blocked client timed out in order to
156  * send it a reply of some kind. */
replyToBlockedClientTimedOut(client * c)157 void replyToBlockedClientTimedOut(client *c) {
158     if (c->btype == BLOCKED_LIST) {
159         addReply(c,shared.nullmultibulk);
160     } else if (c->btype == BLOCKED_WAIT) {
161         addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
162     } else {
163         serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
164     }
165 }
166 
167 /* Mass-unblock clients because something changed in the instance that makes
168  * blocking no longer safe. For example clients blocked in list operations
169  * in an instance which turns from master to slave is unsafe, so this function
170  * is called when a master turns into a slave.
171  *
172  * The semantics is to send an -UNBLOCKED error to the client, disconnecting
173  * it at the same time. */
disconnectAllBlockedClients(void)174 void disconnectAllBlockedClients(void) {
175     listNode *ln;
176     listIter li;
177 
178     listRewind(server.clients,&li);
179     while((ln = listNext(&li))) {
180         client *c = listNodeValue(ln);
181 
182         if (c->flags & CLIENT_BLOCKED) {
183             addReplySds(c,sdsnew(
184                 "-UNBLOCKED force unblock from blocking operation, "
185                 "instance state changed (master -> slave?)\r\n"));
186             unblockClient(c);
187             c->flags |= CLIENT_CLOSE_AFTER_REPLY;
188         }
189     }
190 }
191