1 /* Redis Sentinel implementation
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "server.h"
32 #include "hiredis.h"
33 #include "async.h"
34
35 #include <ctype.h>
36 #include <arpa/inet.h>
37 #include <sys/socket.h>
38 #include <sys/wait.h>
39 #include <fcntl.h>
40
41 extern char **environ;
42
/* TCP port that initSentinelConfig() assigns to server.port when the server
 * runs in Sentinel mode. */
#define REDIS_SENTINEL_PORT 26379
44
45 /* ======================== Sentinel global state =========================== */
46
/* Address object, used to describe an ip:port pair. */
typedef struct sentinelAddr {
    char *ip;   /* IP address string; the constructors in this file allocate
                   it with sdsnew() and releaseSentinelAddr() frees it with
                   sdsfree(). */
    int port;   /* TCP port; createSentinelAddr() accepts only 0..65535. */
} sentinelAddr;
52
/* Bit flags stored in the 'flags' field of a sentinelRedisInstance.
 * The first three describe the kind of instance being monitored, the
 * remaining ones describe its failure / failover state. */
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)
#define SRI_S_DOWN (1<<3) /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */
#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
                                  its master is down. */
#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
                                           this master. */
#define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
69
/* Periods, timeouts and defaults.
 *
 * Note: times are in milliseconds. A few entries below (slave priority,
 * parallel syncs, max pending commands, deny-scripts-reconfig) are plain
 * numbers or booleans rather than durations. */
#define SENTINEL_INFO_PERIOD 10000 /* 10 seconds. */
#define SENTINEL_PING_PERIOD 1000 /* 1 second. */
#define SENTINEL_ASK_PERIOD 1000 /* 1 second. */
#define SENTINEL_PUBLISH_PERIOD 2000 /* 2 seconds. */
#define SENTINEL_DEFAULT_DOWN_AFTER 30000 /* 30 seconds. */
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello" /* Pub/Sub channel name. */
#define SENTINEL_TILT_TRIGGER 2000 /* 2 seconds. */
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30) /* 30 seconds. */
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_SLAVE_RECONF_TIMEOUT 10000 /* 10 seconds. */
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000 /* 15 seconds. */
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000) /* 3 minutes. */
#define SENTINEL_MAX_PENDING_COMMANDS 100 /* Presumably a count compared with
                                             instanceLink->pending_commands;
                                             confirm at the use site. */
#define SENTINEL_ELECTION_TIMEOUT 10000 /* 10 seconds. */
#define SENTINEL_MAX_DESYNC 1000 /* Presumably milliseconds; confirm at the
                                    use site. */
#define SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG 1 /* Initial value of
                                                    sentinel.deny_scripts_reconfig. */
88
/* Failover state machine states, stored in the 'failover_state' field of a
 * sentinelRedisInstance. */
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */

/* Values for the 'slave_master_link_status' field (master link status as
 * reported by the slave's INFO output). */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
100
/* Generic flags that can be used with different functions.
 * They use higher bits (16 and up) to avoid colliding with the function
 * specific flags. */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT (1<<16)
#define SENTINEL_LEADER (1<<17)
#define SENTINEL_OBSERVER (1<<18)
108
/* Script execution flags and limits.
 * The first two are values for sentinelScriptJob->flags; the others bound
 * the scripts queue and the retry logic. */
#define SENTINEL_SCRIPT_NONE 0
#define SENTINEL_SCRIPT_RUNNING 1 /* Set while the job has a forked child. */
#define SENTINEL_SCRIPT_MAX_QUEUE 256 /* Max jobs kept in the scripts queue. */
#define SENTINEL_SCRIPT_MAX_RUNNING 16 /* Max scripts running at once. */
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
117
/* SENTINEL SIMULATE-FAILURE command flags, stored as a bitmask in
 * sentinel.simfailure_flags. */
#define SENTINEL_SIMFAILURE_NONE 0
#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0)
#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1)
122
/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
 * monitoring many masters, we have different instances representing the
 * same Sentinels, one per master, and we need to share the hiredis connections
 * among them. Otherwise if 5 Sentinels are monitoring 100 masters we create
 * 500 outgoing connections instead of 5.
 *
 * So this structure represents a reference counted link in terms of the two
 * hiredis connections for commands and Pub/Sub, and the fields needed for
 * failure detection, since the ping/pong times are now local to the link: if
 * the link is available, the instance is available. This way we don't just
 * have 5 connections instead of 500, we also send 5 pings instead of 500.
 *
 * Links are shared only for Sentinels: master and slave instances have
 * a link with refcount = 1, always. */
typedef struct instanceLink {
    int refcount;          /* Number of sentinelRedisInstance owners. */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    int pending_commands;  /* Number of commands sent waiting for a reply. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
    mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
                                 only used to avoid sending too many pings
                                 during failure. Idle time is computed using
                                 the act_ping_time field. */
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    mstime_t last_reconn_time;  /* Last reconnection attempt performed when
                                   the link was down. */
} instanceLink;
163
/* A monitored instance. The SRI_MASTER / SRI_SLAVE / SRI_SENTINEL bits in
 * 'flags' tell which kind of instance this object describes; some fields
 * below are meaningful only for one of those kinds, as noted. */
typedef struct sentinelRedisInstance {
    int flags;      /* See SRI_... defines */
    char *name;     /* Master name from the point of view of this sentinel. */
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    uint64_t config_epoch;  /* Configuration epoch. */
    sentinelAddr *addr; /* ip:port of this instance (sentinelEvent() prints it
                           for every instance type, not only for masters). */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */
    dict *renamed_commands;     /* Commands renamed in this instance:
                                   Sentinel will use the alternative commands
                                   mapped on this table to send things like
                                   SLAVEOF, CONFIG, INFO, ... */

    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass;    /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVEOF <new> */
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO,
                                     see SENTINEL_MASTER_LINK_STATUS_* */
    unsigned long long slave_repl_offset; /* Slave replication offset. */
    /* Failover */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this Sentinel voted as leader. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time;   /* Last failover attempt start time. */
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
    sds info; /* cached INFO output */
} sentinelRedisInstance;
231
/* Main state: the single global 'sentinel' object, initialized by
 * initSentinel(). */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. All zero bytes until
                                        an ID is loaded or generated. */
    uint64_t current_epoch;         /* Current epoch. */
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    int tilt;           /* Are we in TILT mode? */
    int running_scripts;    /* Number of scripts in execution right now. */
    mstime_t tilt_start_time;       /* When TILT started. */
    mstime_t previous_time;         /* Last time we ran the time handler. */
    list *scripts_queue;            /* Queue of user scripts to execute. */
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
    unsigned long simfailure_flags; /* Failures simulation, see
                                       SENTINEL_SIMFAILURE_* defines. */
    int deny_scripts_reconfig; /* Controls whether SENTINEL SET ... may change
                                  script paths at runtime. NOTE(review): going
                                  by the name, non-zero presumably means
                                  "deny"; confirm in the SET handler. */
} sentinel;
252
/* A script execution job, stored as a node value in sentinel.scripts_queue. */
typedef struct sentinelScriptJob {
    int flags;              /* Script job flags: SENTINEL_SCRIPT_* */
    int retry_num;          /* Number of times we tried to execute it. */
    char **argv;            /* Arguments to call the script: a NULL terminated
                               array of sds strings, argv[0] being the script
                               path. */
    mstime_t start_time;    /* Script execution time if the script is running,
                               otherwise 0 if we are allowed to retry the
                               execution at any time. If the script is not
                               running and it's not 0, it means: do not run
                               before the specified time. */
    pid_t pid;              /* Script execution pid. */
} sentinelScriptJob;
265
266 /* ======================= hiredis ae.c adapters =============================
267 * Note: this implementation is taken from hiredis/adapters/ae.h, however
268 * we have our modified copy for Sentinel in order to use our allocator
269 * and to have full control over how the adapter works. */
270
/* Per-connection state binding a hiredis async context to an ae event loop.
 * Allocated by redisAeAttach() and freed by redisAeCleanup(). */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* The hiredis async context served. */
    aeEventLoop *loop;          /* Event loop the file events are registered on. */
    int fd;                     /* Socket descriptor, copied from the context
                                   at attach time. */
    int reading, writing;       /* Non-zero while the readable / writable
                                   file event is registered. */
} redisAeEvents;
277
/* ae readable-event handler: forwards the notification to hiredis.
 * 'privdata' is the redisAeEvents container registered with the event;
 * the remaining arguments are not used. */
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleRead(events->context);
}
284
/* ae writable-event handler: forwards the notification to hiredis.
 * 'privdata' is the redisAeEvents container registered with the event;
 * the remaining arguments are not used. */
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleWrite(events->context);
}
291
redisAeAddRead(void * privdata)292 static void redisAeAddRead(void *privdata) {
293 redisAeEvents *e = (redisAeEvents*)privdata;
294 aeEventLoop *loop = e->loop;
295 if (!e->reading) {
296 e->reading = 1;
297 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
298 }
299 }
300
redisAeDelRead(void * privdata)301 static void redisAeDelRead(void *privdata) {
302 redisAeEvents *e = (redisAeEvents*)privdata;
303 aeEventLoop *loop = e->loop;
304 if (e->reading) {
305 e->reading = 0;
306 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
307 }
308 }
309
redisAeAddWrite(void * privdata)310 static void redisAeAddWrite(void *privdata) {
311 redisAeEvents *e = (redisAeEvents*)privdata;
312 aeEventLoop *loop = e->loop;
313 if (!e->writing) {
314 e->writing = 1;
315 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
316 }
317 }
318
redisAeDelWrite(void * privdata)319 static void redisAeDelWrite(void *privdata) {
320 redisAeEvents *e = (redisAeEvents*)privdata;
321 aeEventLoop *loop = e->loop;
322 if (e->writing) {
323 e->writing = 0;
324 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
325 }
326 }
327
/* hiredis "cleanup" hook: stop listening in both directions, then release
 * the redisAeEvents container passed as 'privdata'. */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
334
/* Bind the hiredis async context 'ac' to the ae event loop 'loop'.
 *
 * A redisAeEvents container is allocated and installed as the context event
 * data, together with the hooks hiredis uses to start and stop listening
 * for events. The container is released by the cleanup hook.
 *
 * Returns C_ERR if the context already has event data attached, C_OK
 * otherwise. */
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
    redisAeEvents *events;

    /* Nothing should be attached when something is already attached. */
    if (ac->ev.data != NULL) return C_ERR;

    /* Container for the context and the read/write registration state. */
    events = zmalloc(sizeof(*events));
    events->context = ac;
    events->loop = loop;
    events->fd = ac->c.fd;
    events->reading = 0;
    events->writing = 0;

    /* Install the container and the start/stop listening hooks. */
    ac->ev.data = events;
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;

    return C_OK;
}
360
361 /* ============================= Prototypes ================================= */
362
/* Forward declarations of functions referenced before their definition.
 * Some (e.g. sentinelEvent, sentinelScheduleScriptExecution,
 * sentinelGenerateInitialMonitorEvents) are defined later in this file. */
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
void instanceLinkConnectionError(const redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
void sentinelAbortFailover(sentinelRedisInstance *ri);
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
void sentinelScheduleScriptExecution(char *path, ...);
void sentinelStartFailover(sentinelRedisInstance *master);
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
void sentinelFlushConfig(void);
void sentinelGenerateInitialMonitorEvents(void);
int sentinelSendPing(sentinelRedisInstance *ri);
int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master);
sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid);
void sentinelSimFailureCrash(void);
386
387 /* ========================= Dictionary types =============================== */
388
/* Hash / compare helpers used by the dictionary types below (not defined in
 * the part of the file shown here), plus the instance release function
 * called by the value destructor. */
uint64_t dictSdsHash(const void *key);
uint64_t dictSdsCaseHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
int dictSdsKeyCaseCompare(void *privdata, const void *key1, const void *key2);
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
394
dictInstancesValDestructor(void * privdata,void * obj)395 void dictInstancesValDestructor (void *privdata, void *obj) {
396 UNUSED(privdata);
397 releaseSentinelRedisInstance(obj);
398 }
399
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * Values are released through dictInstancesValDestructor() when an entry is
 * removed; keys have no destructor, so the dictionary never frees them.
 *
 * NOTE(review): an older comment here also described a use as the
 * sentinelRedisInstance->sentinels dictionary mapping sentinels ip:port to
 * the last seen time in Pub/Sub hello messages. Since the value destructor
 * releases sentinelRedisInstance objects, that description looks stale;
 * confirm against the code that populates 'sentinels'. */
dictType instancesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    dictInstancesValDestructor /* val destructor */
};
412
/* Instance runid (sds) -> votes (long casted to void*)
 *
 * This is used by the sentinelGetObjectiveLeader() function in order to
 * count the votes and understand who is the leader. Neither keys nor values
 * have a destructor: the dictionary does not free either of them. */
dictType leaderVotesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};
425
/* Instance renamed commands table (sentinelRedisInstance->renamed_commands).
 *
 * Uses the case-insensitive sds hash and compare helpers, and installs
 * dictSdsDestructor for both keys and values, so the dictionary owns both. */
dictType renamedCommandsDictType = {
    dictSdsCaseHash,           /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCaseCompare,     /* key compare */
    dictSdsDestructor,         /* key destructor */
    dictSdsDestructor          /* val destructor */
};
435
436 /* =========================== Initialization =============================== */
437
/* Handlers of the Sentinel specific commands referenced by the
 * sentinelcmds[] table below. */
void sentinelCommand(client *c);
void sentinelInfoCommand(client *c);
void sentinelSetCommand(client *c);
void sentinelPublishCommand(client *c);
void sentinelRoleCommand(client *c);
443
/* Commands available in Sentinel mode. initSentinel() empties the regular
 * command table and registers exactly these entries, keyed by command name.
 *
 * Each initializer follows the field order of struct redisCommand, which is
 * declared outside this file: the first three fields are presumably the
 * command name, the handler and the arity, followed by a flags string;
 * check the struct declaration for the rest. */
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
    {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}
};
458
459 /* This function overwrites a few normal Redis config default with Sentinel
460 * specific defaults. */
initSentinelConfig(void)461 void initSentinelConfig(void) {
462 server.port = REDIS_SENTINEL_PORT;
463 server.protected_mode = 0; /* Sentinel must be exposed. */
464 }
465
466 /* Perform the Sentinel mode initialization. */
initSentinel(void)467 void initSentinel(void) {
468 unsigned int j;
469
470 /* Remove usual Redis commands from the command table, then just add
471 * the SENTINEL command. */
472 dictEmpty(server.commands,NULL);
473 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
474 int retval;
475 struct redisCommand *cmd = sentinelcmds+j;
476
477 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
478 serverAssert(retval == DICT_OK);
479 }
480
481 /* Initialize various data structures. */
482 sentinel.current_epoch = 0;
483 sentinel.masters = dictCreate(&instancesDictType,NULL);
484 sentinel.tilt = 0;
485 sentinel.tilt_start_time = 0;
486 sentinel.previous_time = mstime();
487 sentinel.running_scripts = 0;
488 sentinel.scripts_queue = listCreate();
489 sentinel.announce_ip = NULL;
490 sentinel.announce_port = 0;
491 sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
492 sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
493 memset(sentinel.myid,0,sizeof(sentinel.myid));
494 }
495
496 /* This function gets called when the server is in Sentinel mode, started,
497 * loaded the configuration, and is ready for normal operations. */
sentinelIsRunning(void)498 void sentinelIsRunning(void) {
499 int j;
500
501 if (server.configfile == NULL) {
502 serverLog(LL_WARNING,
503 "Sentinel started without a config file. Exiting...");
504 exit(1);
505 } else if (access(server.configfile,W_OK) == -1) {
506 serverLog(LL_WARNING,
507 "Sentinel config file %s is not writable: %s. Exiting...",
508 server.configfile,strerror(errno));
509 exit(1);
510 }
511
512 /* If this Sentinel has yet no ID set in the configuration file, we
513 * pick a random one and persist the config on disk. From now on this
514 * will be this Sentinel ID across restarts. */
515 for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
516 if (sentinel.myid[j] != 0) break;
517
518 if (j == CONFIG_RUN_ID_SIZE) {
519 /* Pick ID and persist the config. */
520 getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
521 sentinelFlushConfig();
522 }
523
524 /* Log its ID to make debugging of issues simpler. */
525 serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
526
527 /* We want to generate a +monitor event for every configured master
528 * at startup. */
529 sentinelGenerateInitialMonitorEvents();
530 }
531
532 /* ============================== sentinelAddr ============================== */
533
534 /* Create a sentinelAddr object and return it on success.
535 * On error NULL is returned and errno is set to:
536 * ENOENT: Can't resolve the hostname.
537 * EINVAL: Invalid port number.
538 */
createSentinelAddr(char * hostname,int port)539 sentinelAddr *createSentinelAddr(char *hostname, int port) {
540 char ip[NET_IP_STR_LEN];
541 sentinelAddr *sa;
542
543 if (port < 0 || port > 65535) {
544 errno = EINVAL;
545 return NULL;
546 }
547 if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
548 errno = ENOENT;
549 return NULL;
550 }
551 sa = zmalloc(sizeof(*sa));
552 sa->ip = sdsnew(ip);
553 sa->port = port;
554 return sa;
555 }
556
557 /* Return a duplicate of the source address. */
dupSentinelAddr(sentinelAddr * src)558 sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
559 sentinelAddr *sa;
560
561 sa = zmalloc(sizeof(*sa));
562 sa->ip = sdsnew(src->ip);
563 sa->port = src->port;
564 return sa;
565 }
566
567 /* Free a Sentinel address. Can't fail. */
releaseSentinelAddr(sentinelAddr * sa)568 void releaseSentinelAddr(sentinelAddr *sa) {
569 sdsfree(sa->ip);
570 zfree(sa);
571 }
572
573 /* Return non-zero if two addresses are equal. */
sentinelAddrIsEqual(sentinelAddr * a,sentinelAddr * b)574 int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
575 return a->port == b->port && !strcasecmp(a->ip,b->ip);
576 }
577
578 /* =========================== Events notification ========================== */
579
580 /* Send an event to log, pub/sub, user notification script.
581 *
582 * 'level' is the log level for logging. Only LL_WARNING events will trigger
583 * the execution of the user notification script.
584 *
585 * 'type' is the message type, also used as a pub/sub channel name.
586 *
587 * 'ri', is the redis instance target of this event if applicable, and is
588 * used to obtain the path of the notification script to execute.
589 *
590 * The remaining arguments are printf-alike.
591 * If the format specifier starts with the two characters "%@" then ri is
592 * not NULL, and the message is prefixed with an instance identifier in the
593 * following format:
594 *
595 * <instance type> <instance name> <ip> <port>
596 *
597 * If the instance type is not master, than the additional string is
598 * added to specify the originating master:
599 *
600 * @ <master name> <master ip> <master port>
601 *
602 * Any other specifier after "%@" is processed by printf itself.
603 */
sentinelEvent(int level,char * type,sentinelRedisInstance * ri,const char * fmt,...)604 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
605 const char *fmt, ...) {
606 va_list ap;
607 char msg[LOG_MAX_LEN];
608 robj *channel, *payload;
609
610 /* Handle %@ */
611 if (fmt[0] == '%' && fmt[1] == '@') {
612 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
613 NULL : ri->master;
614
615 if (master) {
616 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
617 sentinelRedisInstanceTypeStr(ri),
618 ri->name, ri->addr->ip, ri->addr->port,
619 master->name, master->addr->ip, master->addr->port);
620 } else {
621 snprintf(msg, sizeof(msg), "%s %s %s %d",
622 sentinelRedisInstanceTypeStr(ri),
623 ri->name, ri->addr->ip, ri->addr->port);
624 }
625 fmt += 2;
626 } else {
627 msg[0] = '\0';
628 }
629
630 /* Use vsprintf for the rest of the formatting if any. */
631 if (fmt[0] != '\0') {
632 va_start(ap, fmt);
633 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
634 va_end(ap);
635 }
636
637 /* Log the message if the log level allows it to be logged. */
638 if (level >= server.verbosity)
639 serverLog(level,"%s %s",type,msg);
640
641 /* Publish the message via Pub/Sub if it's not a debugging one. */
642 if (level != LL_DEBUG) {
643 channel = createStringObject(type,strlen(type));
644 payload = createStringObject(msg,strlen(msg));
645 pubsubPublishMessage(channel,payload);
646 decrRefCount(channel);
647 decrRefCount(payload);
648 }
649
650 /* Call the notification script if applicable. */
651 if (level == LL_WARNING && ri != NULL) {
652 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
653 ri : ri->master;
654 if (master && master->notification_script) {
655 sentinelScheduleScriptExecution(master->notification_script,
656 type,msg,NULL);
657 }
658 }
659 }
660
661 /* This function is called only at startup and is used to generate a
662 * +monitor event for every configured master. The same events are also
663 * generated when a master to monitor is added at runtime via the
664 * SENTINEL MONITOR command. */
sentinelGenerateInitialMonitorEvents(void)665 void sentinelGenerateInitialMonitorEvents(void) {
666 dictIterator *di;
667 dictEntry *de;
668
669 di = dictGetIterator(sentinel.masters);
670 while((de = dictNext(di)) != NULL) {
671 sentinelRedisInstance *ri = dictGetVal(de);
672 sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
673 }
674 dictReleaseIterator(di);
675 }
676
677 /* ============================ script execution ============================ */
678
679 /* Release a script job structure and all the associated data. */
sentinelReleaseScriptJob(sentinelScriptJob * sj)680 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
681 int j = 0;
682
683 while(sj->argv[j]) sdsfree(sj->argv[j++]);
684 zfree(sj->argv);
685 zfree(sj);
686 }
687
688 #define SENTINEL_SCRIPT_MAX_ARGS 16
sentinelScheduleScriptExecution(char * path,...)689 void sentinelScheduleScriptExecution(char *path, ...) {
690 va_list ap;
691 char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
692 int argc = 1;
693 sentinelScriptJob *sj;
694
695 va_start(ap, path);
696 while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
697 argv[argc] = va_arg(ap,char*);
698 if (!argv[argc]) break;
699 argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
700 argc++;
701 }
702 va_end(ap);
703 argv[0] = sdsnew(path);
704
705 sj = zmalloc(sizeof(*sj));
706 sj->flags = SENTINEL_SCRIPT_NONE;
707 sj->retry_num = 0;
708 sj->argv = zmalloc(sizeof(char*)*(argc+1));
709 sj->start_time = 0;
710 sj->pid = 0;
711 memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
712
713 listAddNodeTail(sentinel.scripts_queue,sj);
714
715 /* Remove the oldest non running script if we already hit the limit. */
716 if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
717 listNode *ln;
718 listIter li;
719
720 listRewind(sentinel.scripts_queue,&li);
721 while ((ln = listNext(&li)) != NULL) {
722 sj = ln->value;
723
724 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
725 /* The first node is the oldest as we add on tail. */
726 listDelNode(sentinel.scripts_queue,ln);
727 sentinelReleaseScriptJob(sj);
728 break;
729 }
730 serverAssert(listLength(sentinel.scripts_queue) <=
731 SENTINEL_SCRIPT_MAX_QUEUE);
732 }
733 }
734
735 /* Lookup a script in the scripts queue via pid, and returns the list node
736 * (so that we can easily remove it from the queue if needed). */
sentinelGetScriptListNodeByPid(pid_t pid)737 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
738 listNode *ln;
739 listIter li;
740
741 listRewind(sentinel.scripts_queue,&li);
742 while ((ln = listNext(&li)) != NULL) {
743 sentinelScriptJob *sj = ln->value;
744
745 if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
746 return ln;
747 }
748 return NULL;
749 }
750
751 /* Run pending scripts if we are not already at max number of running
752 * scripts. */
sentinelRunPendingScripts(void)753 void sentinelRunPendingScripts(void) {
754 listNode *ln;
755 listIter li;
756 mstime_t now = mstime();
757
758 /* Find jobs that are not running and run them, from the top to the
759 * tail of the queue, so we run older jobs first. */
760 listRewind(sentinel.scripts_queue,&li);
761 while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
762 (ln = listNext(&li)) != NULL)
763 {
764 sentinelScriptJob *sj = ln->value;
765 pid_t pid;
766
767 /* Skip if already running. */
768 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
769
770 /* Skip if it's a retry, but not enough time has elapsed. */
771 if (sj->start_time && sj->start_time > now) continue;
772
773 sj->flags |= SENTINEL_SCRIPT_RUNNING;
774 sj->start_time = mstime();
775 sj->retry_num++;
776 pid = fork();
777
778 if (pid == -1) {
779 /* Parent (fork error).
780 * We report fork errors as signal 99, in order to unify the
781 * reporting with other kind of errors. */
782 sentinelEvent(LL_WARNING,"-script-error",NULL,
783 "%s %d %d", sj->argv[0], 99, 0);
784 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
785 sj->pid = 0;
786 } else if (pid == 0) {
787 /* Child */
788 resetCpuAffinity(NULL);
789 execve(sj->argv[0],sj->argv,environ);
790 /* If we are here an error occurred. */
791 _exit(2); /* Don't retry execution. */
792 } else {
793 sentinel.running_scripts++;
794 sj->pid = pid;
795 sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
796 }
797 }
798 }
799
800 /* How much to delay the execution of a script that we need to retry after
801 * an error?
802 *
803 * We double the retry delay for every further retry we do. So for instance
804 * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
805 * starting from the second attempt to execute the script the delays are:
806 * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
sentinelScriptRetryDelay(int retry_num)807 mstime_t sentinelScriptRetryDelay(int retry_num) {
808 mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
809
810 while (retry_num-- > 1) delay *= 2;
811 return delay;
812 }
813
814 /* Check for scripts that terminated, and remove them from the queue if the
815 * script terminated successfully. If instead the script was terminated by
816 * a signal, or returned exit code "1", it is scheduled to run again if
817 * the max number of retries did not already elapsed. */
sentinelCollectTerminatedScripts(void)818 void sentinelCollectTerminatedScripts(void) {
819 int statloc;
820 pid_t pid;
821
822 while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
823 int exitcode = WEXITSTATUS(statloc);
824 int bysignal = 0;
825 listNode *ln;
826 sentinelScriptJob *sj;
827
828 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
829 sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
830 (long)pid, exitcode, bysignal);
831
832 ln = sentinelGetScriptListNodeByPid(pid);
833 if (ln == NULL) {
834 serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
835 continue;
836 }
837 sj = ln->value;
838
839 /* If the script was terminated by a signal or returns an
840 * exit code of "1" (that means: please retry), we reschedule it
841 * if the max number of retries is not already reached. */
842 if ((bysignal || exitcode == 1) &&
843 sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
844 {
845 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
846 sj->pid = 0;
847 sj->start_time = mstime() +
848 sentinelScriptRetryDelay(sj->retry_num);
849 } else {
850 /* Otherwise let's remove the script, but log the event if the
851 * execution did not terminated in the best of the ways. */
852 if (bysignal || exitcode != 0) {
853 sentinelEvent(LL_WARNING,"-script-error",NULL,
854 "%s %d %d", sj->argv[0], bysignal, exitcode);
855 }
856 listDelNode(sentinel.scripts_queue,ln);
857 sentinelReleaseScriptJob(sj);
858 sentinel.running_scripts--;
859 }
860 }
861 }
862
863 /* Kill scripts in timeout, they'll be collected by the
864 * sentinelCollectTerminatedScripts() function. */
sentinelKillTimedoutScripts(void)865 void sentinelKillTimedoutScripts(void) {
866 listNode *ln;
867 listIter li;
868 mstime_t now = mstime();
869
870 listRewind(sentinel.scripts_queue,&li);
871 while ((ln = listNext(&li)) != NULL) {
872 sentinelScriptJob *sj = ln->value;
873
874 if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
875 (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
876 {
877 sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
878 sj->argv[0], (long)sj->pid);
879 kill(sj->pid,SIGKILL);
880 }
881 }
882 }
883
884 /* Implements SENTINEL PENDING-SCRIPTS command. */
sentinelPendingScriptsCommand(client * c)885 void sentinelPendingScriptsCommand(client *c) {
886 listNode *ln;
887 listIter li;
888
889 addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
890 listRewind(sentinel.scripts_queue,&li);
891 while ((ln = listNext(&li)) != NULL) {
892 sentinelScriptJob *sj = ln->value;
893 int j = 0;
894
895 addReplyMultiBulkLen(c,10);
896
897 addReplyBulkCString(c,"argv");
898 while (sj->argv[j]) j++;
899 addReplyMultiBulkLen(c,j);
900 j = 0;
901 while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
902
903 addReplyBulkCString(c,"flags");
904 addReplyBulkCString(c,
905 (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
906
907 addReplyBulkCString(c,"pid");
908 addReplyBulkLongLong(c,sj->pid);
909
910 if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
911 addReplyBulkCString(c,"run-time");
912 addReplyBulkLongLong(c,mstime() - sj->start_time);
913 } else {
914 mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
915 if (delay < 0) delay = 0;
916 addReplyBulkCString(c,"run-delay");
917 addReplyBulkLongLong(c,delay);
918 }
919
920 addReplyBulkCString(c,"retry-num");
921 addReplyBulkLongLong(c,sj->retry_num);
922 }
923 }
924
925 /* This function calls, if any, the client reconfiguration script with the
926 * following parameters:
927 *
928 * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
929 *
930 * It is called every time a failover is performed.
931 *
932 * <state> is currently always "failover".
933 * <role> is either "leader" or "observer".
934 *
935 * from/to fields are respectively master -> promoted slave addresses for
936 * "start" and "end". */
sentinelCallClientReconfScript(sentinelRedisInstance * master,int role,char * state,sentinelAddr * from,sentinelAddr * to)937 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
938 char fromport[32], toport[32];
939
940 if (master->client_reconfig_script == NULL) return;
941 ll2string(fromport,sizeof(fromport),from->port);
942 ll2string(toport,sizeof(toport),to->port);
943 sentinelScheduleScriptExecution(master->client_reconfig_script,
944 master->name,
945 (role == SENTINEL_LEADER) ? "leader" : "observer",
946 state, from->ip, fromport, to->ip, toport, NULL);
947 }
948
949 /* =============================== instanceLink ============================= */
950
951 /* Create a not yet connected link object. */
createInstanceLink(void)952 instanceLink *createInstanceLink(void) {
953 instanceLink *link = zmalloc(sizeof(*link));
954
955 link->refcount = 1;
956 link->disconnected = 1;
957 link->pending_commands = 0;
958 link->cc = NULL;
959 link->pc = NULL;
960 link->cc_conn_time = 0;
961 link->pc_conn_time = 0;
962 link->last_reconn_time = 0;
963 link->pc_last_activity = 0;
964 /* We set the act_ping_time to "now" even if we actually don't have yet
965 * a connection with the node, nor we sent a ping.
966 * This is useful to detect a timeout in case we'll not be able to connect
967 * with the node at all. */
968 link->act_ping_time = mstime();
969 link->last_ping_time = 0;
970 link->last_avail_time = mstime();
971 link->last_pong_time = mstime();
972 return link;
973 }
974
975 /* Disconnect an hiredis connection in the context of an instance link. */
instanceLinkCloseConnection(instanceLink * link,redisAsyncContext * c)976 void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
977 if (c == NULL) return;
978
979 if (link->cc == c) {
980 link->cc = NULL;
981 link->pending_commands = 0;
982 }
983 if (link->pc == c) link->pc = NULL;
984 c->data = NULL;
985 link->disconnected = 1;
986 redisAsyncFree(c);
987 }
988
989 /* Decrement the refcount of a link object, if it drops to zero, actually
990 * free it and return NULL. Otherwise don't do anything and return the pointer
991 * to the object.
992 *
993 * If we are not going to free the link and ri is not NULL, we rebind all the
994 * pending requests in link->cc (hiredis connection for commands) to a
995 * callback that will just ignore them. This is useful to avoid processing
996 * replies for an instance that no longer exists. */
releaseInstanceLink(instanceLink * link,sentinelRedisInstance * ri)997 instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
998 {
999 serverAssert(link->refcount > 0);
1000 link->refcount--;
1001 if (link->refcount != 0) {
1002 if (ri && ri->link->cc) {
1003 /* This instance may have pending callbacks in the hiredis async
1004 * context, having as 'privdata' the instance that we are going to
1005 * free. Let's rewrite the callback list, directly exploiting
1006 * hiredis internal data structures, in order to bind them with
1007 * a callback that will ignore the reply at all. */
1008 redisCallback *cb;
1009 redisCallbackList *callbacks = &link->cc->replies;
1010
1011 cb = callbacks->head;
1012 while(cb) {
1013 if (cb->privdata == ri) {
1014 cb->fn = sentinelDiscardReplyCallback;
1015 cb->privdata = NULL; /* Not strictly needed. */
1016 }
1017 cb = cb->next;
1018 }
1019 }
1020 return link; /* Other active users. */
1021 }
1022
1023 instanceLinkCloseConnection(link,link->cc);
1024 instanceLinkCloseConnection(link,link->pc);
1025 zfree(link);
1026 return NULL;
1027 }
1028
1029 /* This function will attempt to share the instance link we already have
1030 * for the same Sentinel in the context of a different master, with the
1031 * instance we are passing as argument.
1032 *
1033 * This way multiple Sentinel objects that refer all to the same physical
1034 * Sentinel instance but in the context of different masters will use
1035 * a single connection, will send a single PING per second for failure
1036 * detection and so forth.
1037 *
1038 * Return C_OK if a matching Sentinel was found in the context of a
1039 * different master and sharing was performed. Otherwise C_ERR
1040 * is returned. */
sentinelTryConnectionSharing(sentinelRedisInstance * ri)1041 int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
1042 serverAssert(ri->flags & SRI_SENTINEL);
1043 dictIterator *di;
1044 dictEntry *de;
1045
1046 if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
1047 if (ri->link->refcount > 1) return C_ERR; /* Already shared. */
1048
1049 di = dictGetIterator(sentinel.masters);
1050 while((de = dictNext(di)) != NULL) {
1051 sentinelRedisInstance *master = dictGetVal(de), *match;
1052 /* We want to share with the same physical Sentinel referenced
1053 * in other masters, so skip our master. */
1054 if (master == ri->master) continue;
1055 match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
1056 NULL,0,ri->runid);
1057 if (match == NULL) continue; /* No match. */
1058 if (match == ri) continue; /* Should never happen but... safer. */
1059
1060 /* We identified a matching Sentinel, great! Let's free our link
1061 * and use the one of the matching Sentinel. */
1062 releaseInstanceLink(ri->link,NULL);
1063 ri->link = match->link;
1064 match->link->refcount++;
1065 return C_OK;
1066 }
1067 dictReleaseIterator(di);
1068 return C_ERR;
1069 }
1070
1071 /* When we detect a Sentinel to switch address (reporting a different IP/port
1072 * pair in Hello messages), let's update all the matching Sentinels in the
1073 * context of other masters as well and disconnect the links, so that everybody
1074 * will be updated.
1075 *
1076 * Return the number of updated Sentinel addresses. */
sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance * ri)1077 int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
1078 serverAssert(ri->flags & SRI_SENTINEL);
1079 dictIterator *di;
1080 dictEntry *de;
1081 int reconfigured = 0;
1082
1083 di = dictGetIterator(sentinel.masters);
1084 while((de = dictNext(di)) != NULL) {
1085 sentinelRedisInstance *master = dictGetVal(de), *match;
1086 match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
1087 NULL,0,ri->runid);
1088 /* If there is no match, this master does not know about this
1089 * Sentinel, try with the next one. */
1090 if (match == NULL) continue;
1091
1092 /* Disconnect the old links if connected. */
1093 if (match->link->cc != NULL)
1094 instanceLinkCloseConnection(match->link,match->link->cc);
1095 if (match->link->pc != NULL)
1096 instanceLinkCloseConnection(match->link,match->link->pc);
1097
1098 if (match == ri) continue; /* Address already updated for it. */
1099
1100 /* Update the address of the matching Sentinel by copying the address
1101 * of the Sentinel object that received the address update. */
1102 releaseSentinelAddr(match->addr);
1103 match->addr = dupSentinelAddr(ri->addr);
1104 reconfigured++;
1105 }
1106 dictReleaseIterator(di);
1107 if (reconfigured)
1108 sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
1109 "%@ %d additional matching instances", reconfigured);
1110 return reconfigured;
1111 }
1112
1113 /* This function is called when an hiredis connection reported an error.
1114 * We set it to NULL and mark the link as disconnected so that it will be
1115 * reconnected again.
1116 *
1117 * Note: we don't free the hiredis context as hiredis will do it for us
1118 * for async connections. */
instanceLinkConnectionError(const redisAsyncContext * c)1119 void instanceLinkConnectionError(const redisAsyncContext *c) {
1120 instanceLink *link = c->data;
1121 int pubsub;
1122
1123 if (!link) return;
1124
1125 pubsub = (link->pc == c);
1126 if (pubsub)
1127 link->pc = NULL;
1128 else
1129 link->cc = NULL;
1130 link->disconnected = 1;
1131 }
1132
1133 /* Hiredis connection established / disconnected callbacks. We need them
1134 * just to cleanup our link state. */
sentinelLinkEstablishedCallback(const redisAsyncContext * c,int status)1135 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1136 if (status != C_OK) instanceLinkConnectionError(c);
1137 }
1138
/* Hiredis "disconnected" callback. Whatever the status is, the link
 * state is cleaned up like for any other connection error. */
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
    instanceLinkConnectionError(c);
    UNUSED(status);
}
1143
1144 /* ========================== sentinelRedisInstance ========================= */
1145
1146 /* Create a redis instance, the following fields must be populated by the
1147 * caller if needed:
1148 * runid: set to NULL but will be populated once INFO output is received.
1149 * info_refresh: is set to 0 to mean that we never received INFO so far.
1150 *
1151 * If SRI_MASTER is set into initial flags the instance is added to
1152 * sentinel.masters table.
1153 *
1154 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
1155 * instance is added into master->slaves or master->sentinels table.
1156 *
1157 * If the instance is a slave or sentinel, the name parameter is ignored and
1158 * is created automatically as hostname:port.
1159 *
1160 * The function fails if hostname can't be resolved or port is out of range.
1161 * When this happens NULL is returned and errno is set accordingly to the
1162 * createSentinelAddr() function.
1163 *
1164 * The function may also fail and return NULL with errno set to EBUSY if
1165 * a master with the same name, a slave with the same address, or a sentinel
1166 * with the same ID already exists. */
1167
createSentinelRedisInstance(char * name,int flags,char * hostname,int port,int quorum,sentinelRedisInstance * master)1168 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
1169 sentinelRedisInstance *ri;
1170 sentinelAddr *addr;
1171 dict *table = NULL;
1172 char slavename[NET_PEER_ID_LEN], *sdsname;
1173
1174 serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
1175 serverAssert((flags & SRI_MASTER) || master != NULL);
1176
1177 /* Check address validity. */
1178 addr = createSentinelAddr(hostname,port);
1179 if (addr == NULL) return NULL;
1180
1181 /* For slaves use ip:port as name. */
1182 if (flags & SRI_SLAVE) {
1183 anetFormatAddr(slavename, sizeof(slavename), hostname, port);
1184 name = slavename;
1185 }
1186
1187 /* Make sure the entry is not duplicated. This may happen when the same
1188 * name for a master is used multiple times inside the configuration or
1189 * if we try to add multiple times a slave or sentinel with same ip/port
1190 * to a master. */
1191 if (flags & SRI_MASTER) table = sentinel.masters;
1192 else if (flags & SRI_SLAVE) table = master->slaves;
1193 else if (flags & SRI_SENTINEL) table = master->sentinels;
1194 sdsname = sdsnew(name);
1195 if (dictFind(table,sdsname)) {
1196 releaseSentinelAddr(addr);
1197 sdsfree(sdsname);
1198 errno = EBUSY;
1199 return NULL;
1200 }
1201
1202 /* Create the instance object. */
1203 ri = zmalloc(sizeof(*ri));
1204 /* Note that all the instances are started in the disconnected state,
1205 * the event loop will take care of connecting them. */
1206 ri->flags = flags;
1207 ri->name = sdsname;
1208 ri->runid = NULL;
1209 ri->config_epoch = 0;
1210 ri->addr = addr;
1211 ri->link = createInstanceLink();
1212 ri->last_pub_time = mstime();
1213 ri->last_hello_time = mstime();
1214 ri->last_master_down_reply_time = mstime();
1215 ri->s_down_since_time = 0;
1216 ri->o_down_since_time = 0;
1217 ri->down_after_period = master ? master->down_after_period :
1218 SENTINEL_DEFAULT_DOWN_AFTER;
1219 ri->master_link_down_time = 0;
1220 ri->auth_pass = NULL;
1221 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
1222 ri->slave_reconf_sent_time = 0;
1223 ri->slave_master_host = NULL;
1224 ri->slave_master_port = 0;
1225 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
1226 ri->slave_repl_offset = 0;
1227 ri->sentinels = dictCreate(&instancesDictType,NULL);
1228 ri->quorum = quorum;
1229 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
1230 ri->master = master;
1231 ri->slaves = dictCreate(&instancesDictType,NULL);
1232 ri->info_refresh = 0;
1233 ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL);
1234
1235 /* Failover state. */
1236 ri->leader = NULL;
1237 ri->leader_epoch = 0;
1238 ri->failover_epoch = 0;
1239 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1240 ri->failover_state_change_time = 0;
1241 ri->failover_start_time = 0;
1242 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
1243 ri->failover_delay_logged = 0;
1244 ri->promoted_slave = NULL;
1245 ri->notification_script = NULL;
1246 ri->client_reconfig_script = NULL;
1247 ri->info = NULL;
1248
1249 /* Role */
1250 ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
1251 ri->role_reported_time = mstime();
1252 ri->slave_conf_change_time = mstime();
1253
1254 /* Add into the right table. */
1255 dictAdd(table, ri->name, ri);
1256 return ri;
1257 }
1258
1259 /* Release this instance and all its slaves, sentinels, hiredis connections.
1260 * This function does not take care of unlinking the instance from the main
1261 * masters table (if it is a master) or from its master sentinels/slaves table
1262 * if it is a slave or sentinel. */
releaseSentinelRedisInstance(sentinelRedisInstance * ri)1263 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
1264 /* Release all its slaves or sentinels if any. */
1265 dictRelease(ri->sentinels);
1266 dictRelease(ri->slaves);
1267
1268 /* Disconnect the instance. */
1269 releaseInstanceLink(ri->link,ri);
1270
1271 /* Free other resources. */
1272 sdsfree(ri->name);
1273 sdsfree(ri->runid);
1274 sdsfree(ri->notification_script);
1275 sdsfree(ri->client_reconfig_script);
1276 sdsfree(ri->slave_master_host);
1277 sdsfree(ri->leader);
1278 sdsfree(ri->auth_pass);
1279 sdsfree(ri->info);
1280 releaseSentinelAddr(ri->addr);
1281 dictRelease(ri->renamed_commands);
1282
1283 /* Clear state into the master if needed. */
1284 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
1285 ri->master->promoted_slave = NULL;
1286
1287 zfree(ri);
1288 }
1289
1290 /* Lookup a slave in a master Redis instance, by ip and port. */
sentinelRedisInstanceLookupSlave(sentinelRedisInstance * ri,char * ip,int port)1291 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
1292 sentinelRedisInstance *ri, char *ip, int port)
1293 {
1294 sds key;
1295 sentinelRedisInstance *slave;
1296 char buf[NET_PEER_ID_LEN];
1297
1298 serverAssert(ri->flags & SRI_MASTER);
1299 anetFormatAddr(buf,sizeof(buf),ip,port);
1300 key = sdsnew(buf);
1301 slave = dictFetchValue(ri->slaves,key);
1302 sdsfree(key);
1303 return slave;
1304 }
1305
1306 /* Return the name of the type of the instance as a string. */
sentinelRedisInstanceTypeStr(sentinelRedisInstance * ri)1307 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
1308 if (ri->flags & SRI_MASTER) return "master";
1309 else if (ri->flags & SRI_SLAVE) return "slave";
1310 else if (ri->flags & SRI_SENTINEL) return "sentinel";
1311 else return "unknown";
1312 }
1313
1314 /* This function remove the Sentinel with the specified ID from the
1315 * specified master.
1316 *
1317 * If "runid" is NULL the function returns ASAP.
1318 *
1319 * This function is useful because on Sentinels address switch, we want to
1320 * remove our old entry and add a new one for the same ID but with the new
1321 * address.
1322 *
1323 * The function returns 1 if the matching Sentinel was removed, otherwise
1324 * 0 if there was no Sentinel with this ID. */
removeMatchingSentinelFromMaster(sentinelRedisInstance * master,char * runid)1325 int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) {
1326 dictIterator *di;
1327 dictEntry *de;
1328 int removed = 0;
1329
1330 if (runid == NULL) return 0;
1331
1332 di = dictGetSafeIterator(master->sentinels);
1333 while((de = dictNext(di)) != NULL) {
1334 sentinelRedisInstance *ri = dictGetVal(de);
1335
1336 if (ri->runid && strcmp(ri->runid,runid) == 0) {
1337 dictDelete(master->sentinels,ri->name);
1338 removed++;
1339 }
1340 }
1341 dictReleaseIterator(di);
1342 return removed;
1343 }
1344
1345 /* Search an instance with the same runid, ip and port into a dictionary
1346 * of instances. Return NULL if not found, otherwise return the instance
1347 * pointer.
1348 *
1349 * runid or ip can be NULL. In such a case the search is performed only
1350 * by the non-NULL field. */
getSentinelRedisInstanceByAddrAndRunID(dict * instances,char * ip,int port,char * runid)1351 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1352 dictIterator *di;
1353 dictEntry *de;
1354 sentinelRedisInstance *instance = NULL;
1355
1356 serverAssert(ip || runid); /* User must pass at least one search param. */
1357 di = dictGetIterator(instances);
1358 while((de = dictNext(di)) != NULL) {
1359 sentinelRedisInstance *ri = dictGetVal(de);
1360
1361 if (runid && !ri->runid) continue;
1362 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1363 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1364 ri->addr->port == port)))
1365 {
1366 instance = ri;
1367 break;
1368 }
1369 }
1370 dictReleaseIterator(di);
1371 return instance;
1372 }
1373
1374 /* Master lookup by name */
sentinelGetMasterByName(char * name)1375 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1376 sentinelRedisInstance *ri;
1377 sds sdsname = sdsnew(name);
1378
1379 ri = dictFetchValue(sentinel.masters,sdsname);
1380 sdsfree(sdsname);
1381 return ri;
1382 }
1383
1384 /* Add the specified flags to all the instances in the specified dictionary. */
sentinelAddFlagsToDictOfRedisInstances(dict * instances,int flags)1385 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1386 dictIterator *di;
1387 dictEntry *de;
1388
1389 di = dictGetIterator(instances);
1390 while((de = dictNext(di)) != NULL) {
1391 sentinelRedisInstance *ri = dictGetVal(de);
1392 ri->flags |= flags;
1393 }
1394 dictReleaseIterator(di);
1395 }
1396
1397 /* Remove the specified flags to all the instances in the specified
1398 * dictionary. */
sentinelDelFlagsToDictOfRedisInstances(dict * instances,int flags)1399 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1400 dictIterator *di;
1401 dictEntry *de;
1402
1403 di = dictGetIterator(instances);
1404 while((de = dictNext(di)) != NULL) {
1405 sentinelRedisInstance *ri = dictGetVal(de);
1406 ri->flags &= ~flags;
1407 }
1408 dictReleaseIterator(di);
1409 }
1410
1411 /* Reset the state of a monitored master:
1412 * 1) Remove all slaves.
1413 * 2) Remove all sentinels.
1414 * 3) Remove most of the flags resulting from runtime operations.
1415 * 4) Reset timers to their default value. For example after a reset it will be
1416 * possible to failover again the same master ASAP, without waiting the
1417 * failover timeout delay.
1418 * 5) In the process of doing this undo the failover if in progress.
1419 * 6) Disconnect the connections with the master (will reconnect automatically).
1420 */
1421
1422 #define SENTINEL_RESET_NO_SENTINELS (1<<0)
sentinelResetMaster(sentinelRedisInstance * ri,int flags)1423 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1424 serverAssert(ri->flags & SRI_MASTER);
1425 dictRelease(ri->slaves);
1426 ri->slaves = dictCreate(&instancesDictType,NULL);
1427 if (!(flags & SENTINEL_RESET_NO_SENTINELS)) {
1428 dictRelease(ri->sentinels);
1429 ri->sentinels = dictCreate(&instancesDictType,NULL);
1430 }
1431 instanceLinkCloseConnection(ri->link,ri->link->cc);
1432 instanceLinkCloseConnection(ri->link,ri->link->pc);
1433 ri->flags &= SRI_MASTER;
1434 if (ri->leader) {
1435 sdsfree(ri->leader);
1436 ri->leader = NULL;
1437 }
1438 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1439 ri->failover_state_change_time = 0;
1440 ri->failover_start_time = 0; /* We can failover again ASAP. */
1441 ri->promoted_slave = NULL;
1442 sdsfree(ri->runid);
1443 sdsfree(ri->slave_master_host);
1444 ri->runid = NULL;
1445 ri->slave_master_host = NULL;
1446 ri->link->act_ping_time = mstime();
1447 ri->link->last_ping_time = 0;
1448 ri->link->last_avail_time = mstime();
1449 ri->link->last_pong_time = mstime();
1450 ri->role_reported_time = mstime();
1451 ri->role_reported = SRI_MASTER;
1452 if (flags & SENTINEL_GENERATE_EVENT)
1453 sentinelEvent(LL_WARNING,"+reset-master",ri,"%@");
1454 }
1455
1456 /* Call sentinelResetMaster() on every master with a name matching the specified
1457 * pattern. */
sentinelResetMastersByPattern(char * pattern,int flags)1458 int sentinelResetMastersByPattern(char *pattern, int flags) {
1459 dictIterator *di;
1460 dictEntry *de;
1461 int reset = 0;
1462
1463 di = dictGetIterator(sentinel.masters);
1464 while((de = dictNext(di)) != NULL) {
1465 sentinelRedisInstance *ri = dictGetVal(de);
1466
1467 if (ri->name) {
1468 if (stringmatch(pattern,ri->name,0)) {
1469 sentinelResetMaster(ri,flags);
1470 reset++;
1471 }
1472 }
1473 }
1474 dictReleaseIterator(di);
1475 return reset;
1476 }
1477
1478 /* Reset the specified master with sentinelResetMaster(), and also change
1479 * the ip:port address, but take the name of the instance unmodified.
1480 *
1481 * This is used to handle the +switch-master event.
1482 *
1483 * The function returns C_ERR if the address can't be resolved for some
1484 * reason. Otherwise C_OK is returned. */
sentinelResetMasterAndChangeAddress(sentinelRedisInstance * master,char * ip,int port)1485 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1486 sentinelAddr *oldaddr, *newaddr;
1487 sentinelAddr **slaves = NULL;
1488 int numslaves = 0, j;
1489 dictIterator *di;
1490 dictEntry *de;
1491
1492 newaddr = createSentinelAddr(ip,port);
1493 if (newaddr == NULL) return C_ERR;
1494
1495 /* Make a list of slaves to add back after the reset.
1496 * Don't include the one having the address we are switching to. */
1497 di = dictGetIterator(master->slaves);
1498 while((de = dictNext(di)) != NULL) {
1499 sentinelRedisInstance *slave = dictGetVal(de);
1500
1501 if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
1502 slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
1503 slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
1504 slave->addr->port);
1505 }
1506 dictReleaseIterator(di);
1507
1508 /* If we are switching to a different address, include the old address
1509 * as a slave as well, so that we'll be able to sense / reconfigure
1510 * the old master. */
1511 if (!sentinelAddrIsEqual(newaddr,master->addr)) {
1512 slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
1513 slaves[numslaves++] = createSentinelAddr(master->addr->ip,
1514 master->addr->port);
1515 }
1516
1517 /* Reset and switch address. */
1518 sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
1519 oldaddr = master->addr;
1520 master->addr = newaddr;
1521 master->o_down_since_time = 0;
1522 master->s_down_since_time = 0;
1523
1524 /* Add slaves back. */
1525 for (j = 0; j < numslaves; j++) {
1526 sentinelRedisInstance *slave;
1527
1528 slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
1529 slaves[j]->port, master->quorum, master);
1530 releaseSentinelAddr(slaves[j]);
1531 if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
1532 }
1533 zfree(slaves);
1534
1535 /* Release the old address at the end so we are safe even if the function
1536 * gets the master->addr->ip and master->addr->port as arguments. */
1537 releaseSentinelAddr(oldaddr);
1538 sentinelFlushConfig();
1539 return C_OK;
1540 }
1541
1542 /* Return non-zero if there was no SDOWN or ODOWN error associated to this
1543 * instance in the latest 'ms' milliseconds. */
sentinelRedisInstanceNoDownFor(sentinelRedisInstance * ri,mstime_t ms)1544 int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) {
1545 mstime_t most_recent;
1546
1547 most_recent = ri->s_down_since_time;
1548 if (ri->o_down_since_time > most_recent)
1549 most_recent = ri->o_down_since_time;
1550 return most_recent == 0 || (mstime() - most_recent) > ms;
1551 }
1552
1553 /* Return the current master address, that is, its address or the address
1554 * of the promoted slave if already operational. */
sentinelGetCurrentMasterAddress(sentinelRedisInstance * master)1555 sentinelAddr *sentinelGetCurrentMasterAddress(sentinelRedisInstance *master) {
1556 /* If we are failing over the master, and the state is already
1557 * SENTINEL_FAILOVER_STATE_RECONF_SLAVES or greater, it means that we
1558 * already have the new configuration epoch in the master, and the
1559 * slave acknowledged the configuration switch. Advertise the new
1560 * address. */
1561 if ((master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1562 master->promoted_slave &&
1563 master->failover_state >= SENTINEL_FAILOVER_STATE_RECONF_SLAVES)
1564 {
1565 return master->promoted_slave->addr;
1566 } else {
1567 return master->addr;
1568 }
1569 }
1570
1571 /* This function sets the down_after_period field value in 'master' to all
1572 * the slaves and sentinel instances connected to this master. */
sentinelPropagateDownAfterPeriod(sentinelRedisInstance * master)1573 void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) {
1574 dictIterator *di;
1575 dictEntry *de;
1576 int j;
1577 dict *d[] = {master->slaves, master->sentinels, NULL};
1578
1579 for (j = 0; d[j]; j++) {
1580 di = dictGetIterator(d[j]);
1581 while((de = dictNext(di)) != NULL) {
1582 sentinelRedisInstance *ri = dictGetVal(de);
1583 ri->down_after_period = master->down_after_period;
1584 }
1585 dictReleaseIterator(di);
1586 }
1587 }
1588
/* Return the type of the instance as a string: "master", "slave" or
 * "sentinel" depending on the first type flag found set, checked in
 * this order, or "unknown" if none is set. */
char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
    int type_flags = ri->flags;

    if (type_flags & SRI_MASTER) return "master";
    if (type_flags & SRI_SLAVE) return "slave";
    if (type_flags & SRI_SENTINEL) return "sentinel";
    return "unknown";
}
1595
1596 /* This function is used in order to send commands to Redis instances: the
1597 * commands we send from Sentinel may be renamed, a common case is a master
1598 * with CONFIG and SLAVEOF commands renamed for security concerns. In that
1599 * case we check the ri->renamed_command table (or if the instance is a slave,
1600 * we check the one of the master), and map the command that we should send
1601 * to the set of renamed commads. However, if the command was not renamed,
1602 * we just return "command" itself. */
sentinelInstanceMapCommand(sentinelRedisInstance * ri,char * command)1603 char *sentinelInstanceMapCommand(sentinelRedisInstance *ri, char *command) {
1604 sds sc = sdsnew(command);
1605 if (ri->master) ri = ri->master;
1606 char *retval = dictFetchValue(ri->renamed_commands, sc);
1607 sdsfree(sc);
1608 return retval ? retval : command;
1609 }
1610
1611 /* ============================ Config handling ============================= */
/* Apply a single Sentinel configuration directive.
 *
 * argv[0] is the directive name (for instance "monitor") and the following
 * elements are its arguments; 'argc' is the total number of elements. A
 * directive is recognized only when both its name (compared case
 * insensitively) and its argument count match.
 *
 * Returns NULL when the directive was applied, otherwise a string literal
 * describing the error. Every per-master directive requires that the master
 * was already created by a previous "monitor" directive. */
char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            /* Map the failure reason to a message. Note that any errno
             * value not listed here falls out of the switch and the
             * function ends up returning NULL (no error reported). */
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }
    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
        /* down-after-milliseconds <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        /* The value is stored before being validated: on error the field
         * is left holding the rejected value. */
        ri->down_after_period = atoi(argv[2]);
        if (ri->down_after_period <= 0)
            return "negative or zero time parameter.";
        /* Slaves and sentinels of this master inherit the same period. */
        sentinelPropagateDownAfterPeriod(ri);
    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
        /* failover-timeout <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->failover_timeout = atoi(argv[2]);
        if (ri->failover_timeout <= 0)
            return "negative or zero time parameter.";
    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
        /* parallel-syncs <name> <numreplicas>
         * The value is stored as is, without any range check. */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->parallel_syncs = atoi(argv[2]);
    } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
        /* notification-script <name> <path> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        /* The script must exist and be executable by this process. */
        if (access(argv[2],X_OK) == -1)
            return "Notification script seems non existing or non executable.";
        ri->notification_script = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
        /* client-reconfig-script <name> <path> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        /* The script must exist and be executable by this process. */
        if (access(argv[2],X_OK) == -1)
            return "Client reconfiguration script seems non existing or "
                   "non executable.";
        ri->client_reconfig_script = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
        /* auth-pass <name> <password> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->auth_pass = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
        /* current-epoch <epoch>
         * The global epoch only moves forward: smaller values are ignored. */
        unsigned long long current_epoch = strtoull(argv[1],NULL,10);
        if (current_epoch > sentinel.current_epoch)
            sentinel.current_epoch = current_epoch;
    } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
        /* myid <id>
         * The id must be exactly CONFIG_RUN_ID_SIZE characters long. Only
         * those characters are copied, no nul terminator is written here. */
        if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
            return "Malformed Sentinel id in myid option.";
        memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
    } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
        /* config-epoch <name> <epoch> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->config_epoch = strtoull(argv[2],NULL,10);
        /* The following update of current_epoch is not really useful as
         * now the current epoch is persisted on the config file, but
         * we leave this check here for redundancy. */
        if (ri->config_epoch > sentinel.current_epoch)
            sentinel.current_epoch = ri->config_epoch;
    } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
        /* leader-epoch <name> <epoch> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->leader_epoch = strtoull(argv[2],NULL,10);
    } else if ((!strcasecmp(argv[0],"known-slave") ||
                !strcasecmp(argv[0],"known-replica")) && argc == 4)
    {
        sentinelRedisInstance *slave;

        /* known-replica <name> <ip> <port>
         * "known-slave" is accepted as an alias of the same directive. */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
                    atoi(argv[3]), ri->quorum, ri)) == NULL)
        {
            return "Wrong hostname or port for replica.";
        }
    } else if (!strcasecmp(argv[0],"known-sentinel") &&
               (argc == 4 || argc == 5)) {
        sentinelRedisInstance *si;

        /* The four-argument form is accepted but does nothing at all: only
         * the form carrying a runid creates a sentinel instance. */
        if (argc == 5) { /* Ignore the old form without runid. */
            /* known-sentinel <name> <ip> <port> [runid] */
            ri = sentinelGetMasterByName(argv[1]);
            if (!ri) return "No such master with specified name.";
            /* The runid is used both as the instance name and as its
             * runid field. */
            if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
                        atoi(argv[3]), ri->quorum, ri)) == NULL)
            {
                return "Wrong hostname or port for sentinel.";
            }
            si->runid = sdsnew(argv[4]);
            sentinelTryConnectionSharing(si);
        }
    } else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
        /* rename-command <name> <command> <renamed-command> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        sds oldcmd = sdsnew(argv[2]);
        sds newcmd = sdsnew(argv[3]);
        /* When the insertion fails the two strings were not stored in the
         * table, so they are released here before reporting the error. */
        if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
            sdsfree(oldcmd);
            sdsfree(newcmd);
            return "Same command renamed multiple times with rename-command.";
        }
    } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
        /* announce-ip <ip-address>
         * An empty string is silently ignored. */
        if (strlen(argv[1]))
            sentinel.announce_ip = sdsnew(argv[1]);
    } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
        /* announce-port <port> */
        sentinel.announce_port = atoi(argv[1]);
    } else if (!strcasecmp(argv[0],"deny-scripts-reconfig") && argc == 2) {
        /* deny-scripts-reconfig <yes|no>
         * On a malformed value the field is left set to -1. */
        if ((sentinel.deny_scripts_reconfig = yesnotoi(argv[1])) == -1) {
            return "Please specify yes or no for the "
                   "deny-scripts-reconfig options.";
        }
    } else {
        /* Unknown directive name, or known name with a wrong number of
         * arguments. */
        return "Unrecognized sentinel configuration statement.";
    }
    return NULL;
}
1751
/* Implements CONFIG REWRITE for "sentinel" option.
 * This is used not just to rewrite the configuration given by the user
 * (the configured masters) but also in order to retain the state of
 * Sentinel across restarts: config epoch of masters, associated slaves
 * and sentinel instances, and so forth.
 *
 * Every directive is formatted into a freshly allocated sds string and
 * handed to rewriteConfigRewriteLine() under the "sentinel" option name.
 * The strings are never freed in this function.
 * NOTE(review): presumably rewriteConfigRewriteLine() takes ownership of
 * 'line', and its last argument forces the line to be emitted -- confirm
 * against its definition. */
void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
    dictIterator *di, *di2;
    dictEntry *de;
    sds line;

    /* sentinel unique ID. */
    line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid);
    rewriteConfigRewriteLine(state,"sentinel",line,1);

    /* sentinel deny-scripts-reconfig. The last argument is non zero only
     * when the current value differs from the default. */
    line = sdscatprintf(sdsempty(), "sentinel deny-scripts-reconfig %s",
        sentinel.deny_scripts_reconfig ? "yes" : "no");
    rewriteConfigRewriteLine(state,"sentinel",line,
        sentinel.deny_scripts_reconfig != SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG);

    /* For every master emit a "sentinel monitor" config entry. */
    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *master, *ri;
        sentinelAddr *master_addr;

        /* sentinel monitor. The address written is the one returned by
         * sentinelGetCurrentMasterAddress(), not necessarily master->addr. */
        master = dictGetVal(de);
        master_addr = sentinelGetCurrentMasterAddress(master);
        line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d",
            master->name, master_addr->ip, master_addr->port,
            master->quorum);
        rewriteConfigRewriteLine(state,"sentinel",line,1);

        /* sentinel down-after-milliseconds (only if not the default) */
        if (master->down_after_period != SENTINEL_DEFAULT_DOWN_AFTER) {
            line = sdscatprintf(sdsempty(),
                "sentinel down-after-milliseconds %s %ld",
                master->name, (long) master->down_after_period);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel failover-timeout (only if not the default) */
        if (master->failover_timeout != SENTINEL_DEFAULT_FAILOVER_TIMEOUT) {
            line = sdscatprintf(sdsempty(),
                "sentinel failover-timeout %s %ld",
                master->name, (long) master->failover_timeout);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel parallel-syncs (only if not the default) */
        if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) {
            line = sdscatprintf(sdsempty(),
                "sentinel parallel-syncs %s %d",
                master->name, master->parallel_syncs);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel notification-script (only if configured) */
        if (master->notification_script) {
            line = sdscatprintf(sdsempty(),
                "sentinel notification-script %s %s",
                master->name, master->notification_script);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel client-reconfig-script (only if configured) */
        if (master->client_reconfig_script) {
            line = sdscatprintf(sdsempty(),
                "sentinel client-reconfig-script %s %s",
                master->name, master->client_reconfig_script);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel auth-pass (only if configured). The password is written
         * verbatim, without any quoting. */
        if (master->auth_pass) {
            line = sdscatprintf(sdsempty(),
                "sentinel auth-pass %s %s",
                master->name, master->auth_pass);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel config-epoch (always emitted) */
        line = sdscatprintf(sdsempty(),
            "sentinel config-epoch %s %llu",
            master->name, (unsigned long long) master->config_epoch);
        rewriteConfigRewriteLine(state,"sentinel",line,1);

        /* sentinel leader-epoch (always emitted) */
        line = sdscatprintf(sdsempty(),
            "sentinel leader-epoch %s %llu",
            master->name, (unsigned long long) master->leader_epoch);
        rewriteConfigRewriteLine(state,"sentinel",line,1);

        /* sentinel known-replica, one line per slave of this master.
         *
         * The inner loops reuse 'de': this is harmless for the outer loop
         * since it reloads 'de' from dictNext(di) at every iteration. */
        di2 = dictGetIterator(master->slaves);
        while((de = dictNext(di2)) != NULL) {
            sentinelAddr *slave_addr;

            ri = dictGetVal(de);
            slave_addr = ri->addr;

            /* If master_addr (obtained using sentinelGetCurrentMasterAddress()
             * so it may be the address of the promoted slave) is equal to this
             * slave's address, a failover is in progress and the slave was
             * already successfully promoted. So as the address of this slave
             * we use the old master address instead. */
            if (sentinelAddrIsEqual(slave_addr,master_addr))
                slave_addr = master->addr;
            line = sdscatprintf(sdsempty(),
                "sentinel known-replica %s %s %d",
                master->name, slave_addr->ip, slave_addr->port);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }
        dictReleaseIterator(di2);

        /* sentinel known-sentinel, one line per sentinel of this master.
         * Sentinels without a known runid are not persisted. */
        di2 = dictGetIterator(master->sentinels);
        while((de = dictNext(di2)) != NULL) {
            ri = dictGetVal(de);
            if (ri->runid == NULL) continue;
            line = sdscatprintf(sdsempty(),
                "sentinel known-sentinel %s %s %d %s",
                master->name, ri->addr->ip, ri->addr->port, ri->runid);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }
        dictReleaseIterator(di2);

        /* sentinel rename-command, one line per renamed command: the
         * dictionary key is the original name, the value the new one. */
        di2 = dictGetIterator(master->renamed_commands);
        while((de = dictNext(di2)) != NULL) {
            sds oldname = dictGetKey(de);
            sds newname = dictGetVal(de);
            line = sdscatprintf(sdsempty(),
                "sentinel rename-command %s %s %s",
                master->name, oldname, newname);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }
        dictReleaseIterator(di2);
    }

    /* sentinel current-epoch is a global state valid for all the masters. */
    line = sdscatprintf(sdsempty(),
        "sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch);
    rewriteConfigRewriteLine(state,"sentinel",line,1);

    /* sentinel announce-ip (only if configured). Unlike the other values
     * this one is appended with sdscatrepr(), so it is written in quoted
     * and escaped form. */
    if (sentinel.announce_ip) {
        line = sdsnew("sentinel announce-ip ");
        line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip));
        rewriteConfigRewriteLine(state,"sentinel",line,1);
    }

    /* sentinel announce-port (only if non zero). */
    if (sentinel.announce_port) {
        line = sdscatprintf(sdsempty(),"sentinel announce-port %d",
                            sentinel.announce_port);
        rewriteConfigRewriteLine(state,"sentinel",line,1);
    }

    /* The iterator over the masters is released only here, after the
     * global options were emitted. */
    dictReleaseIterator(di);
}
1914
1915 /* This function uses the config rewriting Redis engine in order to persist
1916 * the state of the Sentinel in the current configuration file.
1917 *
1918 * Before returning the function calls fsync() against the generated
1919 * configuration file to make sure changes are committed to disk.
1920 *
1921 * On failure the function logs a warning on the Redis log. */
sentinelFlushConfig(void)1922 void sentinelFlushConfig(void) {
1923 int fd = -1;
1924 int saved_hz = server.hz;
1925 int rewrite_status;
1926
1927 server.hz = CONFIG_DEFAULT_HZ;
1928 rewrite_status = rewriteConfig(server.configfile);
1929 server.hz = saved_hz;
1930
1931 if (rewrite_status == -1) goto werr;
1932 if ((fd = open(server.configfile,O_RDONLY)) == -1) goto werr;
1933 if (fsync(fd) == -1) goto werr;
1934 if (close(fd) == EOF) goto werr;
1935 return;
1936
1937 werr:
1938 if (fd != -1) close(fd);
1939 serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
1940 }
1941
1942 /* ====================== hiredis connection handling ======================= */
1943
1944 /* Send the AUTH command with the specified master password if needed.
1945 * Note that for slaves the password set for the master is used.
1946 *
1947 * In case this Sentinel requires a password as well, via the "requirepass"
1948 * configuration directive, we assume we should use the local password in
1949 * order to authenticate when connecting with the other Sentinels as well.
1950 * So basically all the Sentinels share the same password and use it to
1951 * authenticate reciprocally.
1952 *
1953 * We don't check at all if the command was successfully transmitted
1954 * to the instance as if it fails Sentinel will detect the instance down,
1955 * will disconnect and reconnect the link and so forth. */
sentinelSendAuthIfNeeded(sentinelRedisInstance * ri,redisAsyncContext * c)1956 void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
1957 char *auth_pass = NULL;
1958
1959 if (ri->flags & SRI_MASTER) {
1960 auth_pass = ri->auth_pass;
1961 } else if (ri->flags & SRI_SLAVE) {
1962 auth_pass = ri->master->auth_pass;
1963 } else if (ri->flags & SRI_SENTINEL) {
1964 if (server.requirepass) auth_pass = server.requirepass;
1965 }
1966
1967 if (auth_pass) {
1968 if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
1969 sentinelInstanceMapCommand(ri,"AUTH"),
1970 auth_pass) == C_OK) ri->link->pending_commands++;
1971 }
1972 }
1973
1974 /* Use CLIENT SETNAME to name the connection in the Redis instance as
1975 * sentinel-<first_8_chars_of_runid>-<connection_type>
1976 * The connection type is "cmd" or "pubsub" as specified by 'type'.
1977 *
1978 * This makes it possible to list all the sentinel instances connected
1979 * to a Redis servewr with CLIENT LIST, grepping for a specific name format. */
sentinelSetClientName(sentinelRedisInstance * ri,redisAsyncContext * c,char * type)1980 void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
1981 char name[64];
1982
1983 snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
1984 if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
1985 "%s SETNAME %s",
1986 sentinelInstanceMapCommand(ri,"CLIENT"),
1987 name) == C_OK)
1988 {
1989 ri->link->pending_commands++;
1990 }
1991 }
1992
1993 /* Create the async connections for the instance link if the link
1994 * is disconnected. Note that link->disconnected is true even if just
1995 * one of the two links (commands and pub/sub) is missing. */
sentinelReconnectInstance(sentinelRedisInstance * ri)1996 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1997 if (ri->link->disconnected == 0) return;
1998 if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
1999 instanceLink *link = ri->link;
2000 mstime_t now = mstime();
2001
2002 if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
2003 ri->link->last_reconn_time = now;
2004
2005 /* Commands connection. */
2006 if (link->cc == NULL) {
2007 link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
2008 if (link->cc->err) {
2009 sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
2010 link->cc->errstr);
2011 instanceLinkCloseConnection(link,link->cc);
2012 } else {
2013 link->pending_commands = 0;
2014 link->cc_conn_time = mstime();
2015 link->cc->data = link;
2016 redisAeAttach(server.el,link->cc);
2017 redisAsyncSetConnectCallback(link->cc,
2018 sentinelLinkEstablishedCallback);
2019 redisAsyncSetDisconnectCallback(link->cc,
2020 sentinelDisconnectCallback);
2021 sentinelSendAuthIfNeeded(ri,link->cc);
2022 sentinelSetClientName(ri,link->cc,"cmd");
2023
2024 /* Send a PING ASAP when reconnecting. */
2025 sentinelSendPing(ri);
2026 }
2027 }
2028 /* Pub / Sub */
2029 if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
2030 link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
2031 if (link->pc->err) {
2032 sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
2033 link->pc->errstr);
2034 instanceLinkCloseConnection(link,link->pc);
2035 } else {
2036 int retval;
2037
2038 link->pc_conn_time = mstime();
2039 link->pc->data = link;
2040 redisAeAttach(server.el,link->pc);
2041 redisAsyncSetConnectCallback(link->pc,
2042 sentinelLinkEstablishedCallback);
2043 redisAsyncSetDisconnectCallback(link->pc,
2044 sentinelDisconnectCallback);
2045 sentinelSendAuthIfNeeded(ri,link->pc);
2046 sentinelSetClientName(ri,link->pc,"pubsub");
2047 /* Now we subscribe to the Sentinels "Hello" channel. */
2048 retval = redisAsyncCommand(link->pc,
2049 sentinelReceiveHelloMessages, ri, "%s %s",
2050 sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
2051 SENTINEL_HELLO_CHANNEL);
2052 if (retval != C_OK) {
2053 /* If we can't subscribe, the Pub/Sub connection is useless
2054 * and we can simply disconnect it and try again. */
2055 instanceLinkCloseConnection(link,link->pc);
2056 return;
2057 }
2058 }
2059 }
2060 /* Clear the disconnected status only if we have both the connections
2061 * (or just the commands connection if this is a sentinel instance). */
2062 if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
2063 link->disconnected = 0;
2064 }
2065
2066 /* ======================== Redis instances pinging ======================== */
2067
2068 /* Return true if master looks "sane", that is:
2069 * 1) It is actually a master in the current configuration.
2070 * 2) It reports itself as a master.
2071 * 3) It is not SDOWN or ODOWN.
2072 * 4) We obtained last INFO no more than two times the INFO period time ago. */
sentinelMasterLooksSane(sentinelRedisInstance * master)2073 int sentinelMasterLooksSane(sentinelRedisInstance *master) {
2074 return
2075 master->flags & SRI_MASTER &&
2076 master->role_reported == SRI_MASTER &&
2077 (master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 &&
2078 (mstime() - master->info_refresh) < SENTINEL_INFO_PERIOD*2;
2079 }
2080
2081 /* Process the INFO output from masters. */
sentinelRefreshInstanceInfo(sentinelRedisInstance * ri,const char * info)2082 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
2083 sds *lines;
2084 int numlines, j;
2085 int role = 0;
2086
2087 /* cache full INFO output for instance */
2088 sdsfree(ri->info);
2089 ri->info = sdsnew(info);
2090
2091 /* The following fields must be reset to a given value in the case they
2092 * are not found at all in the INFO output. */
2093 ri->master_link_down_time = 0;
2094
2095 /* Process line by line. */
2096 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
2097 for (j = 0; j < numlines; j++) {
2098 sentinelRedisInstance *slave;
2099 sds l = lines[j];
2100
2101 /* run_id:<40 hex chars>*/
2102 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
2103 if (ri->runid == NULL) {
2104 ri->runid = sdsnewlen(l+7,40);
2105 } else {
2106 if (strncmp(ri->runid,l+7,40) != 0) {
2107 sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
2108 sdsfree(ri->runid);
2109 ri->runid = sdsnewlen(l+7,40);
2110 }
2111 }
2112 }
2113
2114 /* old versions: slave0:<ip>,<port>,<state>
2115 * new versions: slave0:ip=127.0.0.1,port=9999,... */
2116 if ((ri->flags & SRI_MASTER) &&
2117 sdslen(l) >= 7 &&
2118 !memcmp(l,"slave",5) && isdigit(l[5]))
2119 {
2120 char *ip, *port, *end;
2121
2122 if (strstr(l,"ip=") == NULL) {
2123 /* Old format. */
2124 ip = strchr(l,':'); if (!ip) continue;
2125 ip++; /* Now ip points to start of ip address. */
2126 port = strchr(ip,','); if (!port) continue;
2127 *port = '\0'; /* nul term for easy access. */
2128 port++; /* Now port points to start of port number. */
2129 end = strchr(port,','); if (!end) continue;
2130 *end = '\0'; /* nul term for easy access. */
2131 } else {
2132 /* New format. */
2133 ip = strstr(l,"ip="); if (!ip) continue;
2134 ip += 3; /* Now ip points to start of ip address. */
2135 port = strstr(l,"port="); if (!port) continue;
2136 port += 5; /* Now port points to start of port number. */
2137 /* Nul term both fields for easy access. */
2138 end = strchr(ip,','); if (end) *end = '\0';
2139 end = strchr(port,','); if (end) *end = '\0';
2140 }
2141
2142 /* Check if we already have this slave into our table,
2143 * otherwise add it. */
2144 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
2145 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
2146 atoi(port), ri->quorum, ri)) != NULL)
2147 {
2148 sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
2149 sentinelFlushConfig();
2150 }
2151 }
2152 }
2153
2154 /* master_link_down_since_seconds:<seconds> */
2155 if (sdslen(l) >= 32 &&
2156 !memcmp(l,"master_link_down_since_seconds",30))
2157 {
2158 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
2159 }
2160
2161 /* role:<role> */
2162 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
2163 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
2164
2165 if (role == SRI_SLAVE) {
2166 /* master_host:<host> */
2167 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
2168 if (ri->slave_master_host == NULL ||
2169 strcasecmp(l+12,ri->slave_master_host))
2170 {
2171 sdsfree(ri->slave_master_host);
2172 ri->slave_master_host = sdsnew(l+12);
2173 ri->slave_conf_change_time = mstime();
2174 }
2175 }
2176
2177 /* master_port:<port> */
2178 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
2179 int slave_master_port = atoi(l+12);
2180
2181 if (ri->slave_master_port != slave_master_port) {
2182 ri->slave_master_port = slave_master_port;
2183 ri->slave_conf_change_time = mstime();
2184 }
2185 }
2186
2187 /* master_link_status:<status> */
2188 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
2189 ri->slave_master_link_status =
2190 (strcasecmp(l+19,"up") == 0) ?
2191 SENTINEL_MASTER_LINK_STATUS_UP :
2192 SENTINEL_MASTER_LINK_STATUS_DOWN;
2193 }
2194
2195 /* slave_priority:<priority> */
2196 if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
2197 ri->slave_priority = atoi(l+15);
2198
2199 /* slave_repl_offset:<offset> */
2200 if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
2201 ri->slave_repl_offset = strtoull(l+18,NULL,10);
2202 }
2203 }
2204 ri->info_refresh = mstime();
2205 sdsfreesplitres(lines,numlines);
2206
2207 /* ---------------------------- Acting half -----------------------------
2208 * Some things will not happen if sentinel.tilt is true, but some will
2209 * still be processed. */
2210
2211 /* Remember when the role changed. */
2212 if (role != ri->role_reported) {
2213 ri->role_reported_time = mstime();
2214 ri->role_reported = role;
2215 if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
2216 /* Log the event with +role-change if the new role is coherent or
2217 * with -role-change if there is a mismatch with the current config. */
2218 sentinelEvent(LL_VERBOSE,
2219 ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
2220 "+role-change" : "-role-change",
2221 ri, "%@ new reported role is %s",
2222 role == SRI_MASTER ? "master" : "slave",
2223 ri->flags & SRI_MASTER ? "master" : "slave");
2224 }
2225
2226 /* None of the following conditions are processed when in tilt mode, so
2227 * return asap. */
2228 if (sentinel.tilt) return;
2229
2230 /* Handle master -> slave role switch. */
2231 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
2232 /* Nothing to do, but masters claiming to be slaves are
2233 * considered to be unreachable by Sentinel, so eventually
2234 * a failover will be triggered. */
2235 }
2236
2237 /* Handle slave -> master role switch. */
2238 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
2239 /* If this is a promoted slave we can change state to the
2240 * failover state machine. */
2241 if ((ri->flags & SRI_PROMOTED) &&
2242 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2243 (ri->master->failover_state ==
2244 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
2245 {
2246 /* Now that we are sure the slave was reconfigured as a master
2247 * set the master configuration epoch to the epoch we won the
2248 * election to perform this failover. This will force the other
2249 * Sentinels to update their config (assuming there is not
2250 * a newer one already available). */
2251 ri->master->config_epoch = ri->master->failover_epoch;
2252 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2253 ri->master->failover_state_change_time = mstime();
2254 sentinelFlushConfig();
2255 sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
2256 if (sentinel.simfailure_flags &
2257 SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
2258 sentinelSimFailureCrash();
2259 sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
2260 ri->master,"%@");
2261 sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
2262 "start",ri->master->addr,ri->addr);
2263 sentinelForceHelloUpdateForMaster(ri->master);
2264 } else {
2265 /* A slave turned into a master. We want to force our view and
2266 * reconfigure as slave. Wait some time after the change before
2267 * going forward, to receive new configs if any. */
2268 mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
2269
2270 if (!(ri->flags & SRI_PROMOTED) &&
2271 sentinelMasterLooksSane(ri->master) &&
2272 sentinelRedisInstanceNoDownFor(ri,wait_time) &&
2273 mstime() - ri->role_reported_time > wait_time)
2274 {
2275 int retval = sentinelSendSlaveOf(ri,
2276 ri->master->addr->ip,
2277 ri->master->addr->port);
2278 if (retval == C_OK)
2279 sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
2280 }
2281 }
2282 }
2283
2284 /* Handle slaves replicating to a different master address. */
2285 if ((ri->flags & SRI_SLAVE) &&
2286 role == SRI_SLAVE &&
2287 (ri->slave_master_port != ri->master->addr->port ||
2288 strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
2289 {
2290 mstime_t wait_time = ri->master->failover_timeout;
2291
2292 /* Make sure the master is sane before reconfiguring this instance
2293 * into a slave. */
2294 if (sentinelMasterLooksSane(ri->master) &&
2295 sentinelRedisInstanceNoDownFor(ri,wait_time) &&
2296 mstime() - ri->slave_conf_change_time > wait_time)
2297 {
2298 int retval = sentinelSendSlaveOf(ri,
2299 ri->master->addr->ip,
2300 ri->master->addr->port);
2301 if (retval == C_OK)
2302 sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
2303 }
2304 }
2305
2306 /* Detect if the slave that is in the process of being reconfigured
2307 * changed state. */
2308 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
2309 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
2310 {
2311 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
2312 if ((ri->flags & SRI_RECONF_SENT) &&
2313 ri->slave_master_host &&
2314 strcmp(ri->slave_master_host,
2315 ri->master->promoted_slave->addr->ip) == 0 &&
2316 ri->slave_master_port == ri->master->promoted_slave->addr->port)
2317 {
2318 ri->flags &= ~SRI_RECONF_SENT;
2319 ri->flags |= SRI_RECONF_INPROG;
2320 sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
2321 }
2322
2323 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
2324 if ((ri->flags & SRI_RECONF_INPROG) &&
2325 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
2326 {
2327 ri->flags &= ~SRI_RECONF_INPROG;
2328 ri->flags |= SRI_RECONF_DONE;
2329 sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
2330 }
2331 }
2332 }
2333
/* Callback for the reply to an INFO command. 'privdata' is the instance
 * the command was sent to, c->data the link of the connection.
 *
 * Nothing is done when there is no reply or no link. Otherwise the reply
 * is accounted for and, if it is a string, it is handed over to
 * sentinelRefreshInstanceInfo(). */
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    instanceLink *link = c->data;
    redisReply *info = reply;
    sentinelRedisInstance *ri = privdata;

    if (info == NULL || link == NULL) return;
    link->pending_commands--;

    if (info->type != REDIS_REPLY_STRING) return;
    sentinelRefreshInstanceInfo(ri,info->str);
}
2346
2347 /* Just discard the reply. We use this when we are not monitoring the return
2348 * value of the command but its effects directly. */
sentinelDiscardReplyCallback(redisAsyncContext * c,void * reply,void * privdata)2349 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
2350 instanceLink *link = c->data;
2351 UNUSED(reply);
2352 UNUSED(privdata);
2353
2354 if (link) link->pending_commands--;
2355 }
2356
/* Callback for the reply to a PING command. 'privdata' is the instance
 * that was pinged, c->data the link of the connection.
 *
 * Nothing is done when there is no reply or no link. Otherwise:
 *
 * - A status or error reply starting with PONG, LOADING or MASTERDOWN is
 *   an acceptable reply: the instance is considered available and the
 *   pending ping is flagged as answered.
 * - A reply starting with BUSY, received from an instance that is in
 *   SDOWN state, triggers a SCRIPT KILL command, sent at most once as
 *   tracked by the SRI_SCRIPT_KILL_SENT flag.
 *
 * Whatever the reply is, link->last_pong_time is refreshed. */
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;

    if (r == NULL || link == NULL) return;
    link->pending_commands--;

    if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) {
        /* Update the "instance available" field only if this is an
         * acceptable reply. */
        int acceptable = !strncmp(r->str,"PONG",4) ||
                         !strncmp(r->str,"LOADING",7) ||
                         !strncmp(r->str,"MASTERDOWN",10);

        if (acceptable) {
            link->last_avail_time = mstime();
            link->act_ping_time = 0; /* Flag the pong as received. */
        } else if (!strncmp(r->str,"BUSY",4) &&
                   (ri->flags & SRI_S_DOWN) &&
                   !(ri->flags & SRI_SCRIPT_KILL_SENT))
        {
            /* The instance appears to be down because of a busy script:
             * try to kill it. The flag is set even if the command could
             * not be queued. */
            char *scriptcmd = sentinelInstanceMapCommand(ri,"SCRIPT");
            int retval = redisAsyncCommand(ri->link->cc,
                            sentinelDiscardReplyCallback, ri,
                            "%s KILL", scriptcmd);

            if (retval == C_OK) ri->link->pending_commands++;
            ri->flags |= SRI_SCRIPT_KILL_SENT;
        }
    }
    link->last_pong_time = mstime();
}
2396
2397 /* This is called when we get the reply about the PUBLISH command we send
2398 * to the master to advertise this sentinel. */
sentinelPublishReplyCallback(redisAsyncContext * c,void * reply,void * privdata)2399 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
2400 sentinelRedisInstance *ri = privdata;
2401 instanceLink *link = c->data;
2402 redisReply *r;
2403
2404 if (!reply || !link) return;
2405 link->pending_commands--;
2406 r = reply;
2407
2408 /* Only update pub_time if we actually published our message. Otherwise
2409 * we'll retry again in 100 milliseconds. */
2410 if (r->type != REDIS_REPLY_ERROR)
2411 ri->last_pub_time = mstime();
2412 }
2413
2414 /* Process an hello message received via Pub/Sub in master or slave instance,
2415 * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
2416 *
2417 * If the master name specified in the message is not known, the message is
2418 * discarded. */
sentinelProcessHelloMessage(char * hello,int hello_len)2419 void sentinelProcessHelloMessage(char *hello, int hello_len) {
2420 /* Format is composed of 8 tokens:
2421 * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
2422 * 5=master_ip,6=master_port,7=master_config_epoch. */
2423 int numtokens, port, removed, master_port;
2424 uint64_t current_epoch, master_config_epoch;
2425 char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
2426 sentinelRedisInstance *si, *master;
2427
2428 if (numtokens == 8) {
2429 /* Obtain a reference to the master this hello message is about */
2430 master = sentinelGetMasterByName(token[4]);
2431 if (!master) goto cleanup; /* Unknown master, skip the message. */
2432
2433 /* First, try to see if we already have this sentinel. */
2434 port = atoi(token[1]);
2435 master_port = atoi(token[6]);
2436 si = getSentinelRedisInstanceByAddrAndRunID(
2437 master->sentinels,token[0],port,token[2]);
2438 current_epoch = strtoull(token[3],NULL,10);
2439 master_config_epoch = strtoull(token[7],NULL,10);
2440
2441 if (!si) {
2442 /* If not, remove all the sentinels that have the same runid
2443 * because there was an address change, and add the same Sentinel
2444 * with the new address back. */
2445 removed = removeMatchingSentinelFromMaster(master,token[2]);
2446 if (removed) {
2447 sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
2448 "%@ ip %s port %d for %s", token[0],port,token[2]);
2449 } else {
2450 /* Check if there is another Sentinel with the same address this
2451 * new one is reporting. What we do if this happens is to set its
2452 * port to 0, to signal the address is invalid. We'll update it
2453 * later if we get an HELLO message. */
2454 sentinelRedisInstance *other =
2455 getSentinelRedisInstanceByAddrAndRunID(
2456 master->sentinels, token[0],port,NULL);
2457 if (other) {
2458 sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
2459 other->addr->port = 0; /* It means: invalid address. */
2460 sentinelUpdateSentinelAddressInAllMasters(other);
2461 }
2462 }
2463
2464 /* Add the new sentinel. */
2465 si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
2466 token[0],port,master->quorum,master);
2467
2468 if (si) {
2469 if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
2470 /* The runid is NULL after a new instance creation and
2471 * for Sentinels we don't have a later chance to fill it,
2472 * so do it now. */
2473 si->runid = sdsnew(token[2]);
2474 sentinelTryConnectionSharing(si);
2475 if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
2476 sentinelFlushConfig();
2477 }
2478 }
2479
2480 /* Update local current_epoch if received current_epoch is greater.*/
2481 if (current_epoch > sentinel.current_epoch) {
2482 sentinel.current_epoch = current_epoch;
2483 sentinelFlushConfig();
2484 sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
2485 (unsigned long long) sentinel.current_epoch);
2486 }
2487
2488 /* Update master info if received configuration is newer. */
2489 if (si && master->config_epoch < master_config_epoch) {
2490 master->config_epoch = master_config_epoch;
2491 if (master_port != master->addr->port ||
2492 strcmp(master->addr->ip, token[5]))
2493 {
2494 sentinelAddr *old_addr;
2495
2496 sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
2497 sentinelEvent(LL_WARNING,"+switch-master",
2498 master,"%s %s %d %s %d",
2499 master->name,
2500 master->addr->ip, master->addr->port,
2501 token[5], master_port);
2502
2503 old_addr = dupSentinelAddr(master->addr);
2504 sentinelResetMasterAndChangeAddress(master, token[5], master_port);
2505 sentinelCallClientReconfScript(master,
2506 SENTINEL_OBSERVER,"start",
2507 old_addr,master->addr);
2508 releaseSentinelAddr(old_addr);
2509 }
2510 }
2511
2512 /* Update the state of the Sentinel. */
2513 if (si) si->last_hello_time = mstime();
2514 }
2515
2516 cleanup:
2517 sdsfreesplitres(token,numtokens);
2518 }
2519
2520
2521 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
2522 * to discover other sentinels attached at the same master. */
sentinelReceiveHelloMessages(redisAsyncContext * c,void * reply,void * privdata)2523 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
2524 sentinelRedisInstance *ri = privdata;
2525 redisReply *r;
2526 UNUSED(c);
2527
2528 if (!reply || !ri) return;
2529 r = reply;
2530
2531 /* Update the last activity in the pubsub channel. Note that since we
2532 * receive our messages as well this timestamp can be used to detect
2533 * if the link is probably disconnected even if it seems otherwise. */
2534 ri->link->pc_last_activity = mstime();
2535
2536 /* Sanity check in the reply we expect, so that the code that follows
2537 * can avoid to check for details. */
2538 if (r->type != REDIS_REPLY_ARRAY ||
2539 r->elements != 3 ||
2540 r->element[0]->type != REDIS_REPLY_STRING ||
2541 r->element[1]->type != REDIS_REPLY_STRING ||
2542 r->element[2]->type != REDIS_REPLY_STRING ||
2543 strcmp(r->element[0]->str,"message") != 0) return;
2544
2545 /* We are not interested in meeting ourselves */
2546 if (strstr(r->element[2]->str,sentinel.myid) != NULL) return;
2547
2548 sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
2549 }
2550
2551 /* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
2552 * instance in order to broadcast the current configuration for this
2553 * master, and to advertise the existence of this Sentinel at the same time.
2554 *
2555 * The message has the following format:
2556 *
2557 * sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
2558 * master_name,master_ip,master_port,master_config_epoch.
2559 *
2560 * Returns C_OK if the PUBLISH was queued correctly, otherwise
2561 * C_ERR is returned. */
sentinelSendHello(sentinelRedisInstance * ri)2562 int sentinelSendHello(sentinelRedisInstance *ri) {
2563 char ip[NET_IP_STR_LEN];
2564 char payload[NET_IP_STR_LEN+1024];
2565 int retval;
2566 char *announce_ip;
2567 int announce_port;
2568 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
2569 sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
2570
2571 if (ri->link->disconnected) return C_ERR;
2572
2573 /* Use the specified announce address if specified, otherwise try to
2574 * obtain our own IP address. */
2575 if (sentinel.announce_ip) {
2576 announce_ip = sentinel.announce_ip;
2577 } else {
2578 if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
2579 return C_ERR;
2580 announce_ip = ip;
2581 }
2582 announce_port = sentinel.announce_port ?
2583 sentinel.announce_port : server.port;
2584
2585 /* Format and send the Hello message. */
2586 snprintf(payload,sizeof(payload),
2587 "%s,%d,%s,%llu," /* Info about this sentinel. */
2588 "%s,%s,%d,%llu", /* Info about current master. */
2589 announce_ip, announce_port, sentinel.myid,
2590 (unsigned long long) sentinel.current_epoch,
2591 /* --- */
2592 master->name,master_addr->ip,master_addr->port,
2593 (unsigned long long) master->config_epoch);
2594 retval = redisAsyncCommand(ri->link->cc,
2595 sentinelPublishReplyCallback, ri, "%s %s %s",
2596 sentinelInstanceMapCommand(ri,"PUBLISH"),
2597 SENTINEL_HELLO_CHANNEL,payload);
2598 if (retval != C_OK) return C_ERR;
2599 ri->link->pending_commands++;
2600 return C_OK;
2601 }
2602
2603 /* Reset last_pub_time in all the instances in the specified dictionary
2604 * in order to force the delivery of an Hello update ASAP. */
sentinelForceHelloUpdateDictOfRedisInstances(dict * instances)2605 void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) {
2606 dictIterator *di;
2607 dictEntry *de;
2608
2609 di = dictGetSafeIterator(instances);
2610 while((de = dictNext(di)) != NULL) {
2611 sentinelRedisInstance *ri = dictGetVal(de);
2612 if (ri->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
2613 ri->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
2614 }
2615 dictReleaseIterator(di);
2616 }
2617
2618 /* This function forces the delivery of an "Hello" message (see
2619 * sentinelSendHello() top comment for further information) to all the Redis
2620 * and Sentinel instances related to the specified 'master'.
2621 *
2622 * It is technically not needed since we send an update to every instance
2623 * with a period of SENTINEL_PUBLISH_PERIOD milliseconds, however when a
2624 * Sentinel upgrades a configuration it is a good idea to deliever an update
2625 * to the other Sentinels ASAP. */
sentinelForceHelloUpdateForMaster(sentinelRedisInstance * master)2626 int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
2627 if (!(master->flags & SRI_MASTER)) return C_ERR;
2628 if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
2629 master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
2630 sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels);
2631 sentinelForceHelloUpdateDictOfRedisInstances(master->slaves);
2632 return C_OK;
2633 }
2634
2635 /* Send a PING to the specified instance and refresh the act_ping_time
2636 * if it is zero (that is, if we received a pong for the previous ping).
2637 *
2638 * On error zero is returned, and we can't consider the PING command
2639 * queued in the connection. */
sentinelSendPing(sentinelRedisInstance * ri)2640 int sentinelSendPing(sentinelRedisInstance *ri) {
2641 int retval = redisAsyncCommand(ri->link->cc,
2642 sentinelPingReplyCallback, ri, "%s",
2643 sentinelInstanceMapCommand(ri,"PING"));
2644 if (retval == C_OK) {
2645 ri->link->pending_commands++;
2646 ri->link->last_ping_time = mstime();
2647 /* We update the active ping time only if we received the pong for
2648 * the previous ping, otherwise we are technically waiting since the
2649 * first ping that did not receive a reply. */
2650 if (ri->link->act_ping_time == 0)
2651 ri->link->act_ping_time = ri->link->last_ping_time;
2652 return 1;
2653 } else {
2654 return 0;
2655 }
2656 }
2657
2658 /* Send periodic PING, INFO, and PUBLISH to the Hello channel to
2659 * the specified master or slave instance. */
sentinelSendPeriodicCommands(sentinelRedisInstance * ri)2660 void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
2661 mstime_t now = mstime();
2662 mstime_t info_period, ping_period;
2663 int retval;
2664
2665 /* Return ASAP if we have already a PING or INFO already pending, or
2666 * in the case the instance is not properly connected. */
2667 if (ri->link->disconnected) return;
2668
2669 /* For INFO, PING, PUBLISH that are not critical commands to send we
2670 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
2671 * want to use a lot of memory just because a link is not working
2672 * properly (note that anyway there is a redundant protection about this,
2673 * that is, the link will be disconnected and reconnected if a long
2674 * timeout condition is detected. */
2675 if (ri->link->pending_commands >=
2676 SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
2677
2678 /* If this is a slave of a master in O_DOWN condition we start sending
2679 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
2680 * period. In this state we want to closely monitor slaves in case they
2681 * are turned into masters by another Sentinel, or by the sysadmin.
2682 *
2683 * Similarly we monitor the INFO output more often if the slave reports
2684 * to be disconnected from the master, so that we can have a fresh
2685 * disconnection time figure. */
2686 if ((ri->flags & SRI_SLAVE) &&
2687 ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
2688 (ri->master_link_down_time != 0)))
2689 {
2690 info_period = 1000;
2691 } else {
2692 info_period = SENTINEL_INFO_PERIOD;
2693 }
2694
2695 /* We ping instances every time the last received pong is older than
2696 * the configured 'down-after-milliseconds' time, but every second
2697 * anyway if 'down-after-milliseconds' is greater than 1 second. */
2698 ping_period = ri->down_after_period;
2699 if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
2700
2701 /* Send INFO to masters and slaves, not sentinels. */
2702 if ((ri->flags & SRI_SENTINEL) == 0 &&
2703 (ri->info_refresh == 0 ||
2704 (now - ri->info_refresh) > info_period))
2705 {
2706 retval = redisAsyncCommand(ri->link->cc,
2707 sentinelInfoReplyCallback, ri, "%s",
2708 sentinelInstanceMapCommand(ri,"INFO"));
2709 if (retval == C_OK) ri->link->pending_commands++;
2710 }
2711
2712 /* Send PING to all the three kinds of instances. */
2713 if ((now - ri->link->last_pong_time) > ping_period &&
2714 (now - ri->link->last_ping_time) > ping_period/2) {
2715 sentinelSendPing(ri);
2716 }
2717
2718 /* PUBLISH hello messages to all the three kinds of instances. */
2719 if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
2720 sentinelSendHello(ri);
2721 }
2722 }
2723
2724 /* =========================== SENTINEL command ============================= */
2725
sentinelFailoverStateStr(int state)2726 const char *sentinelFailoverStateStr(int state) {
2727 switch(state) {
2728 case SENTINEL_FAILOVER_STATE_NONE: return "none";
2729 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
2730 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
2731 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
2732 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
2733 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
2734 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
2735 default: return "unknown";
2736 }
2737 }
2738
2739 /* Redis instance to Redis protocol representation. */
addReplySentinelRedisInstance(client * c,sentinelRedisInstance * ri)2740 void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) {
2741 char *flags = sdsempty();
2742 void *mbl;
2743 int fields = 0;
2744
2745 mbl = addDeferredMultiBulkLength(c);
2746
2747 addReplyBulkCString(c,"name");
2748 addReplyBulkCString(c,ri->name);
2749 fields++;
2750
2751 addReplyBulkCString(c,"ip");
2752 addReplyBulkCString(c,ri->addr->ip);
2753 fields++;
2754
2755 addReplyBulkCString(c,"port");
2756 addReplyBulkLongLong(c,ri->addr->port);
2757 fields++;
2758
2759 addReplyBulkCString(c,"runid");
2760 addReplyBulkCString(c,ri->runid ? ri->runid : "");
2761 fields++;
2762
2763 addReplyBulkCString(c,"flags");
2764 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
2765 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
2766 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
2767 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
2768 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
2769 if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
2770 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
2771 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
2772 flags = sdscat(flags,"failover_in_progress,");
2773 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
2774 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
2775 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
2776 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
2777
2778 if (sdslen(flags) != 0) sdsrange(flags,0,-2); /* remove last "," */
2779 addReplyBulkCString(c,flags);
2780 sdsfree(flags);
2781 fields++;
2782
2783 addReplyBulkCString(c,"link-pending-commands");
2784 addReplyBulkLongLong(c,ri->link->pending_commands);
2785 fields++;
2786
2787 addReplyBulkCString(c,"link-refcount");
2788 addReplyBulkLongLong(c,ri->link->refcount);
2789 fields++;
2790
2791 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2792 addReplyBulkCString(c,"failover-state");
2793 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
2794 fields++;
2795 }
2796
2797 addReplyBulkCString(c,"last-ping-sent");
2798 addReplyBulkLongLong(c,
2799 ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0);
2800 fields++;
2801
2802 addReplyBulkCString(c,"last-ok-ping-reply");
2803 addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
2804 fields++;
2805
2806 addReplyBulkCString(c,"last-ping-reply");
2807 addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
2808 fields++;
2809
2810 if (ri->flags & SRI_S_DOWN) {
2811 addReplyBulkCString(c,"s-down-time");
2812 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
2813 fields++;
2814 }
2815
2816 if (ri->flags & SRI_O_DOWN) {
2817 addReplyBulkCString(c,"o-down-time");
2818 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
2819 fields++;
2820 }
2821
2822 addReplyBulkCString(c,"down-after-milliseconds");
2823 addReplyBulkLongLong(c,ri->down_after_period);
2824 fields++;
2825
2826 /* Masters and Slaves */
2827 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2828 addReplyBulkCString(c,"info-refresh");
2829 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
2830 fields++;
2831
2832 addReplyBulkCString(c,"role-reported");
2833 addReplyBulkCString(c, (ri->role_reported == SRI_MASTER) ? "master" :
2834 "slave");
2835 fields++;
2836
2837 addReplyBulkCString(c,"role-reported-time");
2838 addReplyBulkLongLong(c,mstime() - ri->role_reported_time);
2839 fields++;
2840 }
2841
2842 /* Only masters */
2843 if (ri->flags & SRI_MASTER) {
2844 addReplyBulkCString(c,"config-epoch");
2845 addReplyBulkLongLong(c,ri->config_epoch);
2846 fields++;
2847
2848 addReplyBulkCString(c,"num-slaves");
2849 addReplyBulkLongLong(c,dictSize(ri->slaves));
2850 fields++;
2851
2852 addReplyBulkCString(c,"num-other-sentinels");
2853 addReplyBulkLongLong(c,dictSize(ri->sentinels));
2854 fields++;
2855
2856 addReplyBulkCString(c,"quorum");
2857 addReplyBulkLongLong(c,ri->quorum);
2858 fields++;
2859
2860 addReplyBulkCString(c,"failover-timeout");
2861 addReplyBulkLongLong(c,ri->failover_timeout);
2862 fields++;
2863
2864 addReplyBulkCString(c,"parallel-syncs");
2865 addReplyBulkLongLong(c,ri->parallel_syncs);
2866 fields++;
2867
2868 if (ri->notification_script) {
2869 addReplyBulkCString(c,"notification-script");
2870 addReplyBulkCString(c,ri->notification_script);
2871 fields++;
2872 }
2873
2874 if (ri->client_reconfig_script) {
2875 addReplyBulkCString(c,"client-reconfig-script");
2876 addReplyBulkCString(c,ri->client_reconfig_script);
2877 fields++;
2878 }
2879 }
2880
2881 /* Only slaves */
2882 if (ri->flags & SRI_SLAVE) {
2883 addReplyBulkCString(c,"master-link-down-time");
2884 addReplyBulkLongLong(c,ri->master_link_down_time);
2885 fields++;
2886
2887 addReplyBulkCString(c,"master-link-status");
2888 addReplyBulkCString(c,
2889 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
2890 "ok" : "err");
2891 fields++;
2892
2893 addReplyBulkCString(c,"master-host");
2894 addReplyBulkCString(c,
2895 ri->slave_master_host ? ri->slave_master_host : "?");
2896 fields++;
2897
2898 addReplyBulkCString(c,"master-port");
2899 addReplyBulkLongLong(c,ri->slave_master_port);
2900 fields++;
2901
2902 addReplyBulkCString(c,"slave-priority");
2903 addReplyBulkLongLong(c,ri->slave_priority);
2904 fields++;
2905
2906 addReplyBulkCString(c,"slave-repl-offset");
2907 addReplyBulkLongLong(c,ri->slave_repl_offset);
2908 fields++;
2909 }
2910
2911 /* Only sentinels */
2912 if (ri->flags & SRI_SENTINEL) {
2913 addReplyBulkCString(c,"last-hello-message");
2914 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
2915 fields++;
2916
2917 addReplyBulkCString(c,"voted-leader");
2918 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
2919 fields++;
2920
2921 addReplyBulkCString(c,"voted-leader-epoch");
2922 addReplyBulkLongLong(c,ri->leader_epoch);
2923 fields++;
2924 }
2925
2926 setDeferredMultiBulkLength(c,mbl,fields*2);
2927 }
2928
2929 /* Output a number of instances contained inside a dictionary as
2930 * Redis protocol. */
addReplyDictOfRedisInstances(client * c,dict * instances)2931 void addReplyDictOfRedisInstances(client *c, dict *instances) {
2932 dictIterator *di;
2933 dictEntry *de;
2934
2935 di = dictGetIterator(instances);
2936 addReplyMultiBulkLen(c,dictSize(instances));
2937 while((de = dictNext(di)) != NULL) {
2938 sentinelRedisInstance *ri = dictGetVal(de);
2939
2940 addReplySentinelRedisInstance(c,ri);
2941 }
2942 dictReleaseIterator(di);
2943 }
2944
2945 /* Lookup the named master into sentinel.masters.
2946 * If the master is not found reply to the client with an error and returns
2947 * NULL. */
sentinelGetMasterByNameOrReplyError(client * c,robj * name)2948 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c,
2949 robj *name)
2950 {
2951 sentinelRedisInstance *ri;
2952
2953 ri = dictFetchValue(sentinel.masters,name->ptr);
2954 if (!ri) {
2955 addReplyError(c,"No such master with that name");
2956 return NULL;
2957 }
2958 return ri;
2959 }
2960
2961 #define SENTINEL_ISQR_OK 0
2962 #define SENTINEL_ISQR_NOQUORUM (1<<0)
2963 #define SENTINEL_ISQR_NOAUTH (1<<1)
sentinelIsQuorumReachable(sentinelRedisInstance * master,int * usableptr)2964 int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
2965 dictIterator *di;
2966 dictEntry *de;
2967 int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */
2968 int result = SENTINEL_ISQR_OK;
2969 int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */
2970
2971 di = dictGetIterator(master->sentinels);
2972 while((de = dictNext(di)) != NULL) {
2973 sentinelRedisInstance *ri = dictGetVal(de);
2974
2975 if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
2976 usable++;
2977 }
2978 dictReleaseIterator(di);
2979
2980 if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;
2981 if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
2982 if (usableptr) *usableptr = usable;
2983 return result;
2984 }
2985
/* Implementation of the SENTINEL command.
 *
 * The subcommand name is read from c->argv[1] and compared in a case
 * insensitive way. Every branch validates its own number of arguments and
 * jumps to 'numargserr' when it is wrong. Unknown subcommands are replied
 * with an error.
 *
 * NOTE(review): c->argv[1] is accessed unconditionally, so the caller is
 * presumably expected to guarantee at least one argument after the command
 * name (command table arity) -- confirm. */
void sentinelCommand(client *c) {
    if (!strcasecmp(c->argv[1]->ptr,"masters")) {
        /* SENTINEL MASTERS */
        if (c->argc != 2) goto numargserr;
        addReplyDictOfRedisInstances(c,sentinel.masters);
    } else if (!strcasecmp(c->argv[1]->ptr,"master")) {
        /* SENTINEL MASTER <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        addReplySentinelRedisInstance(c,ri);
    } else if (!strcasecmp(c->argv[1]->ptr,"slaves") ||
               !strcasecmp(c->argv[1]->ptr,"replicas"))
    {
        /* SENTINEL REPLICAS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->slaves);
    } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
        /* SENTINEL SENTINELS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->sentinels);
    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
         *
         * Arguments:
         *
         * ip and port are the ip and port of the master we want to be
         * checked by Sentinel. Note that the command will not check by
         * name but just by master, in theory different Sentinels may monitor
         * different masters with the same name.
         *
         * current-epoch is needed in order to understand if we are allowed
         * to vote for a failover leader or not. Each Sentinel can vote just
         * one time per epoch.
         *
         * runid is "*" if we are not seeking for a vote from the Sentinel
         * in order to elect the failover leader. Otherwise it is set to the
         * runid we want the Sentinel to vote if it did not already voted.
         */
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
            getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
                                                              != C_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;

        /* Vote for the master (or fetch the previous vote) if the request
         * includes a runid, otherwise the sender is not seeking for a vote. */
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                        c->argv[5]->ptr,
                                        &leader_epoch);
        }

        /* Reply with a three-elements multi-bulk reply:
         * down state, leader, vote epoch. */
        addReplyMultiBulkLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
        /* SENTINEL RESET <pattern> */
        if (c->argc != 3) goto numargserr;
        addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
    } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
        /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        ri = sentinelGetMasterByName(c->argv[2]->ptr);
        if (ri == NULL) {
            addReply(c,shared.nullmultibulk);
        } else {
            sentinelAddr *addr = sentinelGetCurrentMasterAddress(ri);

            addReplyMultiBulkLen(c,2);
            addReplyBulkCString(c,addr->ip);
            addReplyBulkLongLong(c,addr->port);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
        /* SENTINEL FAILOVER <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
            addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
            return;
        }
        if (sentinelSelectSlave(ri) == NULL) {
            addReplySds(c,sdsnew("-NOGOODSLAVE No suitable replica to promote\r\n"));
            return;
        }
        serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
            ri->name);
        sentinelStartFailover(ri);
        ri->flags |= SRI_FORCE_FAILOVER;
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
        /* SENTINEL PENDING-SCRIPTS */

        if (c->argc != 2) goto numargserr;
        sentinelPendingScriptsCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"monitor")) {
        /* SENTINEL MONITOR <name> <ip> <port> <quorum> */
        sentinelRedisInstance *ri;
        long quorum, port;
        char ip[NET_IP_STR_LEN];

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum")
            != C_OK) return;
        if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port")
            != C_OK) return;

        if (quorum <= 0) {
            addReplyError(c, "Quorum must be 1 or greater.");
            return;
        }

        /* Make sure the IP field is actually a valid IP before passing it
         * to createSentinelRedisInstance(), otherwise we may trigger a
         * DNS lookup at runtime. */
        if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) {
            addReplyError(c,"Invalid IP address specified");
            return;
        }

        /* Parameters are valid. Try to create the master instance. */
        ri = createSentinelRedisInstance(c->argv[2]->ptr,SRI_MASTER,
                c->argv[3]->ptr,port,quorum,NULL);
        if (ri == NULL) {
            switch(errno) {
            case EBUSY:
                addReplyError(c,"Duplicated master name");
                break;
            case EINVAL:
                addReplyError(c,"Invalid port number");
                break;
            default:
                addReplyError(c,"Unspecified error adding the instance");
                break;
            }
        } else {
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) {
        /* SENTINEL FLUSHCONFIG */
        if (c->argc != 2) goto numargserr;
        sentinelFlushConfig();
        addReply(c,shared.ok);
        return;
    } else if (!strcasecmp(c->argv[1]->ptr,"remove")) {
        /* SENTINEL REMOVE <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        /* The event is generated before the deletion, while 'ri' is still
         * referenced by the masters table. */
        sentinelEvent(LL_WARNING,"-monitor",ri,"%@");
        dictDelete(sentinel.masters,c->argv[2]->ptr);
        sentinelFlushConfig();
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) {
        /* SENTINEL CKQUORUM <name> */
        sentinelRedisInstance *ri;
        int usable;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        int result = sentinelIsQuorumReachable(ri,&usable);
        if (result == SENTINEL_ISQR_OK) {
            addReplySds(c, sdscatfmt(sdsempty(),
                "+OK %i usable Sentinels. Quorum and failover authorization "
                "can be reached\r\n",usable));
        } else {
            sds e = sdscatfmt(sdsempty(),
                "-NOQUORUM %i usable Sentinels. ",usable);
            if (result & SENTINEL_ISQR_NOQUORUM)
                e = sdscat(e,"Not enough available Sentinels to reach the"
                             " specified quorum for this master");
            if (result & SENTINEL_ISQR_NOAUTH) {
                if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". ");
                e = sdscat(e, "Not enough available Sentinels to reach the"
                              " majority and authorize a failover");
            }
            e = sdscat(e,"\r\n");
            addReplySds(c,e);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"set")) {
        /* SENTINEL SET: arguments are parsed by sentinelSetCommand(). */
        if (c->argc < 3) goto numargserr;
        sentinelSetCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) {
        /* SENTINEL INFO-CACHE [<name> ...]
         *
         * Without names all the masters are reported. */
        if (c->argc < 2) goto numargserr;
        mstime_t now = mstime();

        /* Create an ad-hoc dictionary type so that we can iterate
         * a dictionary composed of just the master groups the user
         * requested. The value destructor is cleared because the temporary
         * dictionary only borrows the instances: releasing it must not
         * free them. */
        dictType copy_keeper = instancesDictType;
        copy_keeper.valDestructor = NULL;
        dict *masters_local = sentinel.masters;
        if (c->argc > 2) {
            masters_local = dictCreate(&copy_keeper, NULL);

            for (int i = 2; i < c->argc; i++) {
                sentinelRedisInstance *ri;
                ri = sentinelGetMasterByName(c->argv[i]->ptr);
                if (!ri) continue; /* ignore non-existing names */
                dictAdd(masters_local, ri->name, ri);
            }
        }

        /* Reply format:
         *   1.) master name
         *   2.) 1.) info from master
         *       2.) info from replica
         *       ...
         *   3.) other master name
         *   ...
         */
        addReplyMultiBulkLen(c,dictSize(masters_local) * 2);

        dictIterator  *di;
        dictEntry *de;
        di = dictGetIterator(masters_local);
        while ((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            addReplyBulkCBuffer(c,ri->name,strlen(ri->name));
            addReplyMultiBulkLen(c,dictSize(ri->slaves) + 1); /* +1 for self */
            addReplyMultiBulkLen(c,2);
            addReplyLongLong(c, now - ri->info_refresh);
            if (ri->info)
                addReplyBulkCBuffer(c,ri->info,sdslen(ri->info));
            else
                addReply(c,shared.nullbulk);

            dictIterator *sdi;
            dictEntry *sde;
            sdi = dictGetIterator(ri->slaves);
            while ((sde = dictNext(sdi)) != NULL) {
                sentinelRedisInstance *sri = dictGetVal(sde);
                addReplyMultiBulkLen(c,2);
                addReplyLongLong(c, now - sri->info_refresh);
                if (sri->info)
                    addReplyBulkCBuffer(c,sri->info,sdslen(sri->info));
                else
                    addReply(c,shared.nullbulk);
            }
            dictReleaseIterator(sdi);
        }
        dictReleaseIterator(di);
        if (masters_local != sentinel.masters) dictRelease(masters_local);
    } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) {
        /* SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag> */
        int j;

        sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
        for (j = 2; j < c->argc; j++) {
            if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after being successfully elected as failover "
                    "leader");
            } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after promoting the selected replica to master");
            } else if (!strcasecmp(c->argv[j]->ptr,"help")) {
                /* The list of flags is the whole reply: return now,
                 * otherwise the +OK below would be sent as a second reply
                 * to the same request, desynchronizing the client. */
                addReplyMultiBulkLen(c,2);
                addReplyBulkCString(c,"crash-after-election");
                addReplyBulkCString(c,"crash-after-promotion");
                return;
            } else {
                addReplyError(c,"Unknown failure simulation specified");
                return;
            }
        }
        addReply(c,shared.ok);
    } else {
        addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
                               (char*)c->argv[1]->ptr);
    }
    return;

numargserr:
    addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
                          (char*)c->argv[1]->ptr);
}
3305
/* Append the generic Redis INFO section 'name' to the sds string 'info' if
 * that section was requested.
 *
 * The macro expands inside a scope that must provide the locals
 * 'defsections', 'allsections', 'section', 'sections' and 'info'. When a
 * section was already emitted, "\r\n" is appended first as a separator. */
#define info_section_from_redis(name) do { \
    if (defsections || allsections || !strcasecmp(section,name)) { \
        if (sections++) info = sdscat(info,"\r\n"); \
        sds generated = genRedisInfoString(name); \
        info = sdscatlen(info,generated,sdslen(generated)); \
        sdsfree(generated); \
    } \
} while(0)
3315
3316 /* SENTINEL INFO [section] */
sentinelInfoCommand(client * c)3317 void sentinelInfoCommand(client *c) {
3318 if (c->argc > 2) {
3319 addReply(c,shared.syntaxerr);
3320 return;
3321 }
3322
3323 int defsections = 0, allsections = 0;
3324 char *section = c->argc == 2 ? c->argv[1]->ptr : NULL;
3325 if (section) {
3326 allsections = !strcasecmp(section,"all");
3327 defsections = !strcasecmp(section,"default");
3328 } else {
3329 defsections = 1;
3330 }
3331
3332 int sections = 0;
3333 sds info = sdsempty();
3334
3335 info_section_from_redis("server");
3336 info_section_from_redis("clients");
3337 info_section_from_redis("cpu");
3338 info_section_from_redis("stats");
3339
3340 if (defsections || allsections || !strcasecmp(section,"sentinel")) {
3341 dictIterator *di;
3342 dictEntry *de;
3343 int master_id = 0;
3344
3345 if (sections++) info = sdscat(info,"\r\n");
3346 info = sdscatprintf(info,
3347 "# Sentinel\r\n"
3348 "sentinel_masters:%lu\r\n"
3349 "sentinel_tilt:%d\r\n"
3350 "sentinel_running_scripts:%d\r\n"
3351 "sentinel_scripts_queue_length:%ld\r\n"
3352 "sentinel_simulate_failure_flags:%lu\r\n",
3353 dictSize(sentinel.masters),
3354 sentinel.tilt,
3355 sentinel.running_scripts,
3356 listLength(sentinel.scripts_queue),
3357 sentinel.simfailure_flags);
3358
3359 di = dictGetIterator(sentinel.masters);
3360 while((de = dictNext(di)) != NULL) {
3361 sentinelRedisInstance *ri = dictGetVal(de);
3362 char *status = "ok";
3363
3364 if (ri->flags & SRI_O_DOWN) status = "odown";
3365 else if (ri->flags & SRI_S_DOWN) status = "sdown";
3366 info = sdscatprintf(info,
3367 "master%d:name=%s,status=%s,address=%s:%d,"
3368 "slaves=%lu,sentinels=%lu\r\n",
3369 master_id++, ri->name, status,
3370 ri->addr->ip, ri->addr->port,
3371 dictSize(ri->slaves),
3372 dictSize(ri->sentinels)+1);
3373 }
3374 dictReleaseIterator(di);
3375 }
3376
3377 addReplyBulkSds(c, info);
3378 }
3379
3380 /* Implements Sentinel version of the ROLE command. The output is
3381 * "sentinel" and the list of currently monitored master names. */
sentinelRoleCommand(client * c)3382 void sentinelRoleCommand(client *c) {
3383 dictIterator *di;
3384 dictEntry *de;
3385
3386 addReplyMultiBulkLen(c,2);
3387 addReplyBulkCBuffer(c,"sentinel",8);
3388 addReplyMultiBulkLen(c,dictSize(sentinel.masters));
3389
3390 di = dictGetIterator(sentinel.masters);
3391 while((de = dictNext(di)) != NULL) {
3392 sentinelRedisInstance *ri = dictGetVal(de);
3393
3394 addReplyBulkCString(c,ri->name);
3395 }
3396 dictReleaseIterator(di);
3397 }
3398
3399 /* SENTINEL SET <mastername> [<option> <value> ...] */
sentinelSetCommand(client * c)3400 void sentinelSetCommand(client *c) {
3401 sentinelRedisInstance *ri;
3402 int j, changes = 0;
3403 int badarg = 0; /* Bad argument position for error reporting. */
3404 char *option;
3405
3406 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
3407 == NULL) return;
3408
3409 /* Process option - value pairs. */
3410 for (j = 3; j < c->argc; j++) {
3411 int moreargs = (c->argc-1) - j;
3412 option = c->argv[j]->ptr;
3413 long long ll;
3414 int old_j = j; /* Used to know what to log as an event. */
3415
3416 if (!strcasecmp(option,"down-after-milliseconds") && moreargs > 0) {
3417 /* down-after-millisecodns <milliseconds> */
3418 robj *o = c->argv[++j];
3419 if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3420 badarg = j;
3421 goto badfmt;
3422 }
3423 ri->down_after_period = ll;
3424 sentinelPropagateDownAfterPeriod(ri);
3425 changes++;
3426 } else if (!strcasecmp(option,"failover-timeout") && moreargs > 0) {
3427 /* failover-timeout <milliseconds> */
3428 robj *o = c->argv[++j];
3429 if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3430 badarg = j;
3431 goto badfmt;
3432 }
3433 ri->failover_timeout = ll;
3434 changes++;
3435 } else if (!strcasecmp(option,"parallel-syncs") && moreargs > 0) {
3436 /* parallel-syncs <milliseconds> */
3437 robj *o = c->argv[++j];
3438 if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3439 badarg = j;
3440 goto badfmt;
3441 }
3442 ri->parallel_syncs = ll;
3443 changes++;
3444 } else if (!strcasecmp(option,"notification-script") && moreargs > 0) {
3445 /* notification-script <path> */
3446 char *value = c->argv[++j]->ptr;
3447 if (sentinel.deny_scripts_reconfig) {
3448 addReplyError(c,
3449 "Reconfiguration of scripts path is denied for "
3450 "security reasons. Check the deny-scripts-reconfig "
3451 "configuration directive in your Sentinel configuration");
3452 return;
3453 }
3454
3455 if (strlen(value) && access(value,X_OK) == -1) {
3456 addReplyError(c,
3457 "Notification script seems non existing or non executable");
3458 if (changes) sentinelFlushConfig();
3459 return;
3460 }
3461 sdsfree(ri->notification_script);
3462 ri->notification_script = strlen(value) ? sdsnew(value) : NULL;
3463 changes++;
3464 } else if (!strcasecmp(option,"client-reconfig-script") && moreargs > 0) {
3465 /* client-reconfig-script <path> */
3466 char *value = c->argv[++j]->ptr;
3467 if (sentinel.deny_scripts_reconfig) {
3468 addReplyError(c,
3469 "Reconfiguration of scripts path is denied for "
3470 "security reasons. Check the deny-scripts-reconfig "
3471 "configuration directive in your Sentinel configuration");
3472 return;
3473 }
3474
3475 if (strlen(value) && access(value,X_OK) == -1) {
3476 addReplyError(c,
3477 "Client reconfiguration script seems non existing or "
3478 "non executable");
3479 if (changes) sentinelFlushConfig();
3480 return;
3481 }
3482 sdsfree(ri->client_reconfig_script);
3483 ri->client_reconfig_script = strlen(value) ? sdsnew(value) : NULL;
3484 changes++;
3485 } else if (!strcasecmp(option,"auth-pass") && moreargs > 0) {
3486 /* auth-pass <password> */
3487 char *value = c->argv[++j]->ptr;
3488 sdsfree(ri->auth_pass);
3489 ri->auth_pass = strlen(value) ? sdsnew(value) : NULL;
3490 changes++;
3491 } else if (!strcasecmp(option,"quorum") && moreargs > 0) {
3492 /* quorum <count> */
3493 robj *o = c->argv[++j];
3494 if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3495 badarg = j;
3496 goto badfmt;
3497 }
3498 ri->quorum = ll;
3499 changes++;
3500 } else if (!strcasecmp(option,"rename-command") && moreargs > 1) {
3501 /* rename-command <oldname> <newname> */
3502 sds oldname = c->argv[++j]->ptr;
3503 sds newname = c->argv[++j]->ptr;
3504
3505 if ((sdslen(oldname) == 0) || (sdslen(newname) == 0)) {
3506 badarg = sdslen(newname) ? j-1 : j;
3507 goto badfmt;
3508 }
3509
3510 /* Remove any older renaming for this command. */
3511 dictDelete(ri->renamed_commands,oldname);
3512
3513 /* If the target name is the same as the source name there
3514 * is no need to add an entry mapping to itself. */
3515 if (!dictSdsKeyCaseCompare(NULL,oldname,newname)) {
3516 oldname = sdsdup(oldname);
3517 newname = sdsdup(newname);
3518 dictAdd(ri->renamed_commands,oldname,newname);
3519 }
3520 changes++;
3521 } else {
3522 addReplyErrorFormat(c,"Unknown option or number of arguments for "
3523 "SENTINEL SET '%s'", option);
3524 if (changes) sentinelFlushConfig();
3525 return;
3526 }
3527
3528 /* Log the event. */
3529 int numargs = j-old_j+1;
3530 switch(numargs) {
3531 case 2:
3532 sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",c->argv[old_j]->ptr,
3533 c->argv[old_j+1]->ptr);
3534 break;
3535 case 3:
3536 sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",c->argv[old_j]->ptr,
3537 c->argv[old_j+1]->ptr,
3538 c->argv[old_j+2]->ptr);
3539 break;
3540 default:
3541 sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",c->argv[old_j]->ptr);
3542 break;
3543 }
3544 }
3545
3546 if (changes) sentinelFlushConfig();
3547 addReply(c,shared.ok);
3548 return;
3549
3550 badfmt: /* Bad format errors */
3551 if (changes) sentinelFlushConfig();
3552 addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL SET '%s'",
3553 (char*)c->argv[badarg]->ptr,option);
3554 }
3555
3556 /* Our fake PUBLISH command: it is actually useful only to receive hello messages
3557 * from the other sentinel instances, and publishing to a channel other than
3558 * SENTINEL_HELLO_CHANNEL is forbidden.
3559 *
3560 * Because we have a Sentinel PUBLISH, the code to send hello messages is the same
3561 * for all the three kind of instances: masters, slaves, sentinels. */
sentinelPublishCommand(client * c)3562 void sentinelPublishCommand(client *c) {
3563 if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
3564 addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
3565 return;
3566 }
3567 sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
3568 addReplyLongLong(c,1);
3569 }
3570
3571 /* ===================== SENTINEL availability checks ======================= */
3572
3573 /* Is this instance down from our point of view? */
sentinelCheckSubjectivelyDown(sentinelRedisInstance * ri)3574 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
3575 mstime_t elapsed = 0;
3576
3577 if (ri->link->act_ping_time)
3578 elapsed = mstime() - ri->link->act_ping_time;
3579 else if (ri->link->disconnected)
3580 elapsed = mstime() - ri->link->last_avail_time;
3581
3582 /* Check if we are in need for a reconnection of one of the
3583 * links, because we are detecting low activity.
3584 *
3585 * 1) Check if the command link seems connected, was connected not less
3586 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
3587 * pending ping for more than half the timeout. */
3588 if (ri->link->cc &&
3589 (mstime() - ri->link->cc_conn_time) >
3590 SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
3591 ri->link->act_ping_time != 0 && /* There is a pending ping... */
3592 /* The pending ping is delayed, and we did not receive
3593 * error replies as well. */
3594 (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
3595 (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
3596 {
3597 instanceLinkCloseConnection(ri->link,ri->link->cc);
3598 }
3599
3600 /* 2) Check if the pubsub link seems connected, was connected not less
3601 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
3602 * activity in the Pub/Sub channel for more than
3603 * SENTINEL_PUBLISH_PERIOD * 3.
3604 */
3605 if (ri->link->pc &&
3606 (mstime() - ri->link->pc_conn_time) >
3607 SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
3608 (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
3609 {
3610 instanceLinkCloseConnection(ri->link,ri->link->pc);
3611 }
3612
3613 /* Update the SDOWN flag. We believe the instance is SDOWN if:
3614 *
3615 * 1) It is not replying.
3616 * 2) We believe it is a master, it reports to be a slave for enough time
3617 * to meet the down_after_period, plus enough time to get two times
3618 * INFO report from the instance. */
3619 if (elapsed > ri->down_after_period ||
3620 (ri->flags & SRI_MASTER &&
3621 ri->role_reported == SRI_SLAVE &&
3622 mstime() - ri->role_reported_time >
3623 (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
3624 {
3625 /* Is subjectively down */
3626 if ((ri->flags & SRI_S_DOWN) == 0) {
3627 sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
3628 ri->s_down_since_time = mstime();
3629 ri->flags |= SRI_S_DOWN;
3630 }
3631 } else {
3632 /* Is subjectively up */
3633 if (ri->flags & SRI_S_DOWN) {
3634 sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
3635 ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
3636 }
3637 }
3638 }
3639
3640 /* Is this instance down according to the configured quorum?
3641 *
3642 * Note that ODOWN is a weak quorum, it only means that enough Sentinels
3643 * reported in a given time range that the instance was not reachable.
3644 * However messages can be delayed so there are no strong guarantees about
3645 * N instances agreeing at the same time about the down state. */
sentinelCheckObjectivelyDown(sentinelRedisInstance * master)3646 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
3647 dictIterator *di;
3648 dictEntry *de;
3649 unsigned int quorum = 0, odown = 0;
3650
3651 if (master->flags & SRI_S_DOWN) {
3652 /* Is down for enough sentinels? */
3653 quorum = 1; /* the current sentinel. */
3654 /* Count all the other sentinels. */
3655 di = dictGetIterator(master->sentinels);
3656 while((de = dictNext(di)) != NULL) {
3657 sentinelRedisInstance *ri = dictGetVal(de);
3658
3659 if (ri->flags & SRI_MASTER_DOWN) quorum++;
3660 }
3661 dictReleaseIterator(di);
3662 if (quorum >= master->quorum) odown = 1;
3663 }
3664
3665 /* Set the flag accordingly to the outcome. */
3666 if (odown) {
3667 if ((master->flags & SRI_O_DOWN) == 0) {
3668 sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
3669 quorum, master->quorum);
3670 master->flags |= SRI_O_DOWN;
3671 master->o_down_since_time = mstime();
3672 }
3673 } else {
3674 if (master->flags & SRI_O_DOWN) {
3675 sentinelEvent(LL_WARNING,"-odown",master,"%@");
3676 master->flags &= ~SRI_O_DOWN;
3677 }
3678 }
3679 }
3680
3681 /* Receive the SENTINEL is-master-down-by-addr reply, see the
3682 * sentinelAskMasterStateToOtherSentinels() function for more information. */
sentinelReceiveIsMasterDownReply(redisAsyncContext * c,void * reply,void * privdata)3683 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
3684 sentinelRedisInstance *ri = privdata;
3685 instanceLink *link = c->data;
3686 redisReply *r;
3687
3688 if (!reply || !link) return;
3689 link->pending_commands--;
3690 r = reply;
3691
3692 /* Ignore every error or unexpected reply.
3693 * Note that if the command returns an error for any reason we'll
3694 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
3695 if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
3696 r->element[0]->type == REDIS_REPLY_INTEGER &&
3697 r->element[1]->type == REDIS_REPLY_STRING &&
3698 r->element[2]->type == REDIS_REPLY_INTEGER)
3699 {
3700 ri->last_master_down_reply_time = mstime();
3701 if (r->element[0]->integer == 1) {
3702 ri->flags |= SRI_MASTER_DOWN;
3703 } else {
3704 ri->flags &= ~SRI_MASTER_DOWN;
3705 }
3706 if (strcmp(r->element[1]->str,"*")) {
3707 /* If the runid in the reply is not "*" the Sentinel actually
3708 * replied with a vote. */
3709 sdsfree(ri->leader);
3710 if ((long long)ri->leader_epoch != r->element[2]->integer)
3711 serverLog(LL_WARNING,
3712 "%s voted for %s %llu", ri->name,
3713 r->element[1]->str,
3714 (unsigned long long) r->element[2]->integer);
3715 ri->leader = sdsnew(r->element[1]->str);
3716 ri->leader_epoch = r->element[2]->integer;
3717 }
3718 }
3719 }
3720
3721 /* If we think the master is down, we start sending
3722 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
3723 * in order to get the replies that allow to reach the quorum
3724 * needed to mark the master in ODOWN state and trigger a failover. */
3725 #define SENTINEL_ASK_FORCED (1<<0)
sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance * master,int flags)3726 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
3727 dictIterator *di;
3728 dictEntry *de;
3729
3730 di = dictGetIterator(master->sentinels);
3731 while((de = dictNext(di)) != NULL) {
3732 sentinelRedisInstance *ri = dictGetVal(de);
3733 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
3734 char port[32];
3735 int retval;
3736
3737 /* If the master state from other sentinel is too old, we clear it. */
3738 if (elapsed > SENTINEL_ASK_PERIOD*5) {
3739 ri->flags &= ~SRI_MASTER_DOWN;
3740 sdsfree(ri->leader);
3741 ri->leader = NULL;
3742 }
3743
3744 /* Only ask if master is down to other sentinels if:
3745 *
3746 * 1) We believe it is down, or there is a failover in progress.
3747 * 2) Sentinel is connected.
3748 * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
3749 if ((master->flags & SRI_S_DOWN) == 0) continue;
3750 if (ri->link->disconnected) continue;
3751 if (!(flags & SENTINEL_ASK_FORCED) &&
3752 mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
3753 continue;
3754
3755 /* Ask */
3756 ll2string(port,sizeof(port),master->addr->port);
3757 retval = redisAsyncCommand(ri->link->cc,
3758 sentinelReceiveIsMasterDownReply, ri,
3759 "%s is-master-down-by-addr %s %s %llu %s",
3760 sentinelInstanceMapCommand(ri,"SENTINEL"),
3761 master->addr->ip, port,
3762 sentinel.current_epoch,
3763 (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
3764 sentinel.myid : "*");
3765 if (retval == C_OK) ri->link->pending_commands++;
3766 }
3767 dictReleaseIterator(di);
3768 }
3769
3770 /* =============================== FAILOVER ================================= */
3771
3772 /* Crash because of user request via SENTINEL simulate-failure command. */
sentinelSimFailureCrash(void)3773 void sentinelSimFailureCrash(void) {
3774 serverLog(LL_WARNING,
3775 "Sentinel CRASH because of SENTINEL simulate-failure");
3776 exit(99);
3777 }
3778
3779 /* Vote for the sentinel with 'req_runid' or return the old vote if already
3780 * voted for the specified 'req_epoch' or one greater.
3781 *
3782 * If a vote is not available returns NULL, otherwise return the Sentinel
3783 * runid and populate the leader_epoch with the epoch of the vote. */
sentinelVoteLeader(sentinelRedisInstance * master,uint64_t req_epoch,char * req_runid,uint64_t * leader_epoch)3784 char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
3785 if (req_epoch > sentinel.current_epoch) {
3786 sentinel.current_epoch = req_epoch;
3787 sentinelFlushConfig();
3788 sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
3789 (unsigned long long) sentinel.current_epoch);
3790 }
3791
3792 if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
3793 {
3794 sdsfree(master->leader);
3795 master->leader = sdsnew(req_runid);
3796 master->leader_epoch = sentinel.current_epoch;
3797 sentinelFlushConfig();
3798 sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
3799 master->leader, (unsigned long long) master->leader_epoch);
3800 /* If we did not voted for ourselves, set the master failover start
3801 * time to now, in order to force a delay before we can start a
3802 * failover for the same master. */
3803 if (strcasecmp(master->leader,sentinel.myid))
3804 master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
3805 }
3806
3807 *leader_epoch = master->leader_epoch;
3808 return master->leader ? sdsnew(master->leader) : NULL;
3809 }
3810
/* Pairs a runid with a vote counter.
 * NOTE(review): the election code visible near this definition tallies the
 * votes in a dict and does not reference this struct - confirm whether it
 * is still used anywhere. */
struct sentinelLeader {
    char *runid;         /* presumably the run ID of a candidate leader */
    unsigned long votes; /* presumably the votes counted for that run ID */
};
3815
3816 /* Helper function for sentinelGetLeader, increment the counter
3817 * relative to the specified runid. */
sentinelLeaderIncr(dict * counters,char * runid)3818 int sentinelLeaderIncr(dict *counters, char *runid) {
3819 dictEntry *existing, *de;
3820 uint64_t oldval;
3821
3822 de = dictAddRaw(counters,runid,&existing);
3823 if (existing) {
3824 oldval = dictGetUnsignedIntegerVal(existing);
3825 dictSetUnsignedIntegerVal(existing,oldval+1);
3826 return oldval+1;
3827 } else {
3828 serverAssert(de != NULL);
3829 dictSetUnsignedIntegerVal(de,1);
3830 return 1;
3831 }
3832 }
3833
3834 /* Scan all the Sentinels attached to this master to check if there
3835 * is a leader for the specified epoch.
3836 *
3837 * To be a leader for a given epoch, we should have the majority of
3838 * the Sentinels we know (ever seen since the last SENTINEL RESET) that
3839 * reported the same instance as leader for the same epoch. */
sentinelGetLeader(sentinelRedisInstance * master,uint64_t epoch)3840 char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
3841 dict *counters;
3842 dictIterator *di;
3843 dictEntry *de;
3844 unsigned int voters = 0, voters_quorum;
3845 char *myvote;
3846 char *winner = NULL;
3847 uint64_t leader_epoch;
3848 uint64_t max_votes = 0;
3849
3850 serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
3851 counters = dictCreate(&leaderVotesDictType,NULL);
3852
3853 voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/
3854
3855 /* Count other sentinels votes */
3856 di = dictGetIterator(master->sentinels);
3857 while((de = dictNext(di)) != NULL) {
3858 sentinelRedisInstance *ri = dictGetVal(de);
3859 if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
3860 sentinelLeaderIncr(counters,ri->leader);
3861 }
3862 dictReleaseIterator(di);
3863
3864 /* Check what's the winner. For the winner to win, it needs two conditions:
3865 * 1) Absolute majority between voters (50% + 1).
3866 * 2) And anyway at least master->quorum votes. */
3867 di = dictGetIterator(counters);
3868 while((de = dictNext(di)) != NULL) {
3869 uint64_t votes = dictGetUnsignedIntegerVal(de);
3870
3871 if (votes > max_votes) {
3872 max_votes = votes;
3873 winner = dictGetKey(de);
3874 }
3875 }
3876 dictReleaseIterator(di);
3877
3878 /* Count this Sentinel vote:
3879 * if this Sentinel did not voted yet, either vote for the most
3880 * common voted sentinel, or for itself if no vote exists at all. */
3881 if (winner)
3882 myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
3883 else
3884 myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
3885
3886 if (myvote && leader_epoch == epoch) {
3887 uint64_t votes = sentinelLeaderIncr(counters,myvote);
3888
3889 if (votes > max_votes) {
3890 max_votes = votes;
3891 winner = myvote;
3892 }
3893 }
3894
3895 voters_quorum = voters/2+1;
3896 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
3897 winner = NULL;
3898
3899 winner = winner ? sdsnew(winner) : NULL;
3900 sdsfree(myvote);
3901 dictRelease(counters);
3902 return winner;
3903 }
3904
3905 /* Send SLAVEOF to the specified instance, always followed by a
3906 * CONFIG REWRITE command in order to store the new configuration on disk
3907 * when possible (that is, if the Redis instance is recent enough to support
3908 * config rewriting, and if the server was started with a configuration file).
3909 *
3910 * If Host is NULL the function sends "SLAVEOF NO ONE".
3911 *
3912 * The command returns C_OK if the SLAVEOF command was accepted for
3913 * (later) delivery otherwise C_ERR. The command replies are just
3914 * discarded. */
sentinelSendSlaveOf(sentinelRedisInstance * ri,char * host,int port)3915 int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
3916 char portstr[32];
3917 int retval;
3918
3919 ll2string(portstr,sizeof(portstr),port);
3920
3921 /* If host is NULL we send SLAVEOF NO ONE that will turn the instance
3922 * into a master. */
3923 if (host == NULL) {
3924 host = "NO";
3925 memcpy(portstr,"ONE",4);
3926 }
3927
3928 /* In order to send SLAVEOF in a safe way, we send a transaction performing
3929 * the following tasks:
3930 * 1) Reconfigure the instance according to the specified host/port params.
3931 * 2) Rewrite the configuration.
3932 * 3) Disconnect all clients (but this one sending the commnad) in order
3933 * to trigger the ask-master-on-reconnection protocol for connected
3934 * clients.
3935 *
3936 * Note that we don't check the replies returned by commands, since we
3937 * will observe instead the effects in the next INFO output. */
3938 retval = redisAsyncCommand(ri->link->cc,
3939 sentinelDiscardReplyCallback, ri, "%s",
3940 sentinelInstanceMapCommand(ri,"MULTI"));
3941 if (retval == C_ERR) return retval;
3942 ri->link->pending_commands++;
3943
3944 retval = redisAsyncCommand(ri->link->cc,
3945 sentinelDiscardReplyCallback, ri, "%s %s %s",
3946 sentinelInstanceMapCommand(ri,"SLAVEOF"),
3947 host, portstr);
3948 if (retval == C_ERR) return retval;
3949 ri->link->pending_commands++;
3950
3951 retval = redisAsyncCommand(ri->link->cc,
3952 sentinelDiscardReplyCallback, ri, "%s REWRITE",
3953 sentinelInstanceMapCommand(ri,"CONFIG"));
3954 if (retval == C_ERR) return retval;
3955 ri->link->pending_commands++;
3956
3957 /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
3958 * however sending it to an instance not understanding this command is not
3959 * an issue because CLIENT is variadic command, so Redis will not
3960 * recognized as a syntax error, and the transaction will not fail (but
3961 * only the unsupported command will fail). */
3962 retval = redisAsyncCommand(ri->link->cc,
3963 sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal",
3964 sentinelInstanceMapCommand(ri,"CLIENT"));
3965 if (retval == C_ERR) return retval;
3966 ri->link->pending_commands++;
3967
3968 retval = redisAsyncCommand(ri->link->cc,
3969 sentinelDiscardReplyCallback, ri, "%s",
3970 sentinelInstanceMapCommand(ri,"EXEC"));
3971 if (retval == C_ERR) return retval;
3972 ri->link->pending_commands++;
3973
3974 return C_OK;
3975 }
3976
3977 /* Setup the master state to start a failover. */
sentinelStartFailover(sentinelRedisInstance * master)3978 void sentinelStartFailover(sentinelRedisInstance *master) {
3979 serverAssert(master->flags & SRI_MASTER);
3980
3981 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
3982 master->flags |= SRI_FAILOVER_IN_PROGRESS;
3983 master->failover_epoch = ++sentinel.current_epoch;
3984 sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
3985 (unsigned long long) sentinel.current_epoch);
3986 sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
3987 master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
3988 master->failover_state_change_time = mstime();
3989 }
3990
3991 /* This function checks if there are the conditions to start the failover,
3992 * that is:
3993 *
3994 * 1) Master must be in ODOWN condition.
3995 * 2) No failover already in progress.
3996 * 3) No failover already attempted recently.
3997 *
3998 * We still don't know if we'll win the election so it is possible that we
3999 * start the failover but that we'll not be able to act.
4000 *
4001 * Return non-zero if a failover was started. */
sentinelStartFailoverIfNeeded(sentinelRedisInstance * master)4002 int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
4003 /* We can't failover if the master is not in O_DOWN state. */
4004 if (!(master->flags & SRI_O_DOWN)) return 0;
4005
4006 /* Failover already in progress? */
4007 if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
4008
4009 /* Last failover attempt started too little time ago? */
4010 if (mstime() - master->failover_start_time <
4011 master->failover_timeout*2)
4012 {
4013 if (master->failover_delay_logged != master->failover_start_time) {
4014 time_t clock = (master->failover_start_time +
4015 master->failover_timeout*2) / 1000;
4016 char ctimebuf[26];
4017
4018 ctime_r(&clock,ctimebuf);
4019 ctimebuf[24] = '\0'; /* Remove newline. */
4020 master->failover_delay_logged = master->failover_start_time;
4021 serverLog(LL_WARNING,
4022 "Next failover delay: I will not start a failover before %s",
4023 ctimebuf);
4024 }
4025 return 0;
4026 }
4027
4028 sentinelStartFailover(master);
4029 return 1;
4030 }
4031
4032 /* Select a suitable slave to promote. The current algorithm only uses
4033 * the following parameters:
4034 *
4035 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
4036 * 2) Last time the slave replied to ping no more than 5 times the PING period.
4037 * 3) info_refresh not older than 3 times the INFO refresh period.
4038 * 4) master_link_down_time no more than:
4039 * (now - master->s_down_since_time) + (master->down_after_period * 10).
4040 * Basically since the master is down from our POV, the slave reports
4041 * to be disconnected no more than 10 times the configured down-after-period.
4042 * This is pretty much black magic but the idea is, the master was not
4043 * available so the slave may be lagging, but not over a certain time.
4044 * Anyway we'll select the best slave according to replication offset.
4045 * 5) Slave priority can't be zero, otherwise the slave is discarded.
4046 *
4047 * Among all the slaves matching the above conditions we select the slave
4048 * with, in order of sorting key:
4049 *
4050 * - lower slave_priority.
4051 * - bigger processed replication offset.
4052 * - lexicographically smaller runid.
4053 *
 * Basically if the priority is the same, the slave that processed more
 * commands from the master is selected.
4056 *
4057 * The function returns the pointer to the selected slave, otherwise
4058 * NULL if no suitable slave was found.
4059 */
4060
4061 /* Helper for sentinelSelectSlave(). This is used by qsort() in order to
4062 * sort suitable slaves in a "better first" order, to take the first of
4063 * the list. */
compareSlavesForPromotion(const void * a,const void * b)4064 int compareSlavesForPromotion(const void *a, const void *b) {
4065 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
4066 **sb = (sentinelRedisInstance **)b;
4067 char *sa_runid, *sb_runid;
4068
4069 if ((*sa)->slave_priority != (*sb)->slave_priority)
4070 return (*sa)->slave_priority - (*sb)->slave_priority;
4071
4072 /* If priority is the same, select the slave with greater replication
4073 * offset (processed more data from the master). */
4074 if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
4075 return -1; /* a < b */
4076 } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
4077 return 1; /* a > b */
4078 }
4079
4080 /* If the replication offset is the same select the slave with that has
4081 * the lexicographically smaller runid. Note that we try to handle runid
4082 * == NULL as there are old Redis versions that don't publish runid in
4083 * INFO. A NULL runid is considered bigger than any other runid. */
4084 sa_runid = (*sa)->runid;
4085 sb_runid = (*sb)->runid;
4086 if (sa_runid == NULL && sb_runid == NULL) return 0;
4087 else if (sa_runid == NULL) return 1; /* a > b */
4088 else if (sb_runid == NULL) return -1; /* a < b */
4089 return strcasecmp(sa_runid, sb_runid);
4090 }
4091
/* Return the best slave of 'master' to promote, or NULL if no slave passes
 * the filters below. Suitable slaves are sorted with
 * compareSlavesForPromotion() and the first one is returned. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
    int count = 0;

    /* Longest master link down time tolerated for a slave: ten times the
     * down-after period, plus the time the master has been in SDOWN. */
    mstime_t max_master_down_time = master->down_after_period * 10;
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;

    /* If the master is in SDOWN state we get INFO for slaves every second.
     * Otherwise we get it with the usual period so we need to account for
     * a larger delay. */
    mstime_t info_validity_time = (master->flags & SRI_S_DOWN) ?
        SENTINEL_PING_PERIOD*5 : SENTINEL_INFO_PERIOD*3;

    dictIterator *di = dictGetIterator(master->slaves);
    dictEntry *de;
    for (de = dictNext(di); de != NULL; de = dictNext(di)) {
        sentinelRedisInstance *slave = dictGetVal(de);

        /* Skip slaves that are down, disconnected, not available recently
         * or explicitly excluded with a priority of zero. */
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        if (slave->link->disconnected) continue;
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        if (slave->slave_priority == 0) continue;

        /* Skip slaves with stale INFO data, or disconnected from the master
         * for too long. */
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;

        candidates[count++] = slave;
    }
    dictReleaseIterator(di);

    sentinelRedisInstance *selected = NULL;
    if (count > 0) {
        qsort(candidates,count,sizeof(candidates[0]),
            compareSlavesForPromotion);
        selected = candidates[0];
    }
    zfree(candidates);
    return selected;
}
4135
4136 /* ---------------- Failover state machine implementation ------------------- */
/* Handler for SENTINEL_FAILOVER_STATE_WAIT_START.
 *
 * Moves the failover to the SELECT_SLAVE state if this Sentinel is the
 * leader for the failover epoch, or if the failover was forced
 * (SRI_FORCE_FAILOVER). Otherwise it keeps waiting, and aborts the
 * failover once the election timeout has elapsed. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *elected;
    int elected_is_me, can_proceed;

    /* Check if we are the leader for the failover epoch. */
    elected = sentinelGetLeader(ri, ri->failover_epoch);
    elected_is_me = (elected != NULL) &&
                    (strcasecmp(elected,sentinel.myid) == 0);
    sdsfree(elected);

    /* We go forward only if we are the leader, or if this is a forced
     * failover requested via SENTINEL FAILOVER. */
    can_proceed = elected_is_me || (ri->flags & SRI_FORCE_FAILOVER);
    if (!can_proceed) {
        /* The election timeout is the MIN between
         * SENTINEL_ELECTION_TIMEOUT and the configured failover timeout. */
        int max_wait = SENTINEL_ELECTION_TIMEOUT;

        if (max_wait > ri->failover_timeout)
            max_wait = ri->failover_timeout;

        /* Abort the failover if I'm not the leader after some time. */
        if (mstime() - ri->failover_start_time > max_wait) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();

    /* Switch to the next state. */
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}
4169
/* Handler for SENTINEL_FAILOVER_STATE_SELECT_SLAVE.
 *
 * Picks the slave to promote via sentinelSelectSlave(). No timeout is
 * handled here since the function either aborts the failover (no suitable
 * slave) or immediately moves to the SEND_SLAVEOF_NOONE state. */
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *candidate = sentinelSelectSlave(ri);

    if (candidate == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
        return;
    }

    /* Remember the selected slave and flag it as the promoted one. */
    sentinelEvent(LL_WARNING,"+selected-slave",candidate,"%@");
    candidate->flags |= SRI_PROMOTED;
    ri->promoted_slave = candidate;
    ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
        candidate, "%@");
}
4188
/* Handler for SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE.
 *
 * Sends SLAVEOF NO ONE to the promoted slave and, if the command was sent
 * successfully, moves the failover to the WAIT_PROMOTION state. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    sentinelRedisInstance *target = ri->promoted_slave;

    /* We can't send the command to the promoted slave if it is now
     * disconnected. This state is retried at every call until the failover
     * timeout is reached, then the failover is aborted. */
    if (target->link->disconnected) {
        mstime_t waited = mstime() - ri->failover_state_change_time;

        if (waited > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* Turn the slave into a master. The reply is not inspected here:
     * whether it worked is checked indirectly, by observing if INFO
     * reports a different role (master instead of slave). If the command
     * could not be sent we stay in this state and try again later. */
    if (sentinelSendSlaveOf(target,NULL,0) != C_OK) return;

    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        target,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}
4214
4215 /* We actually wait for promotion indirectly checking with INFO when the
4216 * slave turns into a master. */
sentinelFailoverWaitPromotion(sentinelRedisInstance * ri)4217 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
4218 /* Just handle the timeout. Switching to the next state is handled
4219 * by the function parsing the INFO command of the promoted slave. */
4220 if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
4221 sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
4222 sentinelAbortFailover(ri);
4223 }
4224 }
4225
/* Check if the failover of 'master' can be considered finished, and if so
 * move it to the SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state.
 *
 * The failover ends when every slave that is not in SDOWN state is either
 * the promoted one or flagged SRI_RECONF_DONE, or when more than
 * failover_timeout milliseconds elapsed since the last state change. In the
 * timeout case a best effort SLAVEOF is also sent to the slaves that were
 * not reconfigured yet.
 *
 * Nothing is done while there is no promoted slave or the promoted slave
 * is in SDOWN state. */
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider failover finished if the promoted slave is
     * not reachable. */
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    /* The failover terminates once all the reachable slaves are properly
     * configured. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    /* Force end of failover on timeout. */
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    /* When the failover ended because of the timeout, send a best effort
     * SLAVEOF command to all the slaves still not reconfigured, so that
     * they replicate with the new master. */
    if (timeout) {
        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;

            /* The promoted slave must be skipped as well: it is the new
             * master, and must never be asked to replicate from its own
             * address. */
            if (slave->flags &
                (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            if (slave->link->disconnected) continue;

            retval = sentinelSendSlaveOf(slave,
                    master->promoted_slave->addr->ip,
                    master->promoted_slave->addr->port);
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}
4288
4289 /* Send SLAVE OF <new master address> to all the remaining slaves that
4290 * still don't appear to have the configuration updated. */
sentinelFailoverReconfNextSlave(sentinelRedisInstance * master)4291 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
4292 dictIterator *di;
4293 dictEntry *de;
4294 int in_progress = 0;
4295
4296 di = dictGetIterator(master->slaves);
4297 while((de = dictNext(di)) != NULL) {
4298 sentinelRedisInstance *slave = dictGetVal(de);
4299
4300 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
4301 in_progress++;
4302 }
4303 dictReleaseIterator(di);
4304
4305 di = dictGetIterator(master->slaves);
4306 while(in_progress < master->parallel_syncs &&
4307 (de = dictNext(di)) != NULL)
4308 {
4309 sentinelRedisInstance *slave = dictGetVal(de);
4310 int retval;
4311
4312 /* Skip the promoted slave, and already configured slaves. */
4313 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
4314
4315 /* If too much time elapsed without the slave moving forward to
4316 * the next state, consider it reconfigured even if it is not.
4317 * Sentinels will detect the slave as misconfigured and fix its
4318 * configuration later. */
4319 if ((slave->flags & SRI_RECONF_SENT) &&
4320 (mstime() - slave->slave_reconf_sent_time) >
4321 SENTINEL_SLAVE_RECONF_TIMEOUT)
4322 {
4323 sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
4324 slave->flags &= ~SRI_RECONF_SENT;
4325 slave->flags |= SRI_RECONF_DONE;
4326 }
4327
4328 /* Nothing to do for instances that are disconnected or already
4329 * in RECONF_SENT state. */
4330 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
4331 if (slave->link->disconnected) continue;
4332
4333 /* Send SLAVEOF <new master>. */
4334 retval = sentinelSendSlaveOf(slave,
4335 master->promoted_slave->addr->ip,
4336 master->promoted_slave->addr->port);
4337 if (retval == C_OK) {
4338 slave->flags |= SRI_RECONF_SENT;
4339 slave->slave_reconf_sent_time = mstime();
4340 sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
4341 in_progress++;
4342 }
4343 }
4344 dictReleaseIterator(di);
4345
4346 /* Check if all the slaves are reconfigured and handle timeout. */
4347 sentinelFailoverDetectEnd(master);
4348 }
4349
4350 /* This function is called when the slave is in
4351 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
4352 * to remove it from the master table and add the promoted slave instead. */
sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance * master)4353 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
4354 sentinelRedisInstance *ref = master->promoted_slave ?
4355 master->promoted_slave : master;
4356
4357 sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
4358 master->name, master->addr->ip, master->addr->port,
4359 ref->addr->ip, ref->addr->port);
4360
4361 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
4362 }
4363
/* Run one step of the failover state machine for the master 'ri',
 * dispatching to the handler of the current failover state. Nothing is done
 * if no failover is in progress, or if the current state has no handler
 * here (for instance SENTINEL_FAILOVER_STATE_UPDATE_CONFIG). */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    int state;

    serverAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    /* The state is sampled once: exactly one handler runs per call, even
     * if the handler itself moves the failover to the next state. */
    state = ri->failover_state;
    if (state == SENTINEL_FAILOVER_STATE_WAIT_START) {
        sentinelFailoverWaitStart(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_SELECT_SLAVE) {
        sentinelFailoverSelectSlave(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE) {
        sentinelFailoverSendSlaveOfNoOne(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION) {
        sentinelFailoverWaitPromotion(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES) {
        sentinelFailoverReconfNextSlave(ri);
    }
}
4387
4388 /* Abort a failover in progress:
4389 *
4390 * This function can only be called before the promoted slave acknowledged
4391 * the slave -> master switch. Otherwise the failover can't be aborted and
4392 * will reach its end (possibly by timeout). */
sentinelAbortFailover(sentinelRedisInstance * ri)4393 void sentinelAbortFailover(sentinelRedisInstance *ri) {
4394 serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
4395 serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);
4396
4397 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER);
4398 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
4399 ri->failover_state_change_time = mstime();
4400 if (ri->promoted_slave) {
4401 ri->promoted_slave->flags &= ~SRI_PROMOTED;
4402 ri->promoted_slave = NULL;
4403 }
4404 }
4405
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking in design. The function is called every second.
 * -------------------------------------------------------------------------- */
4410
4411 /* Perform scheduled operations for the specified Redis instance. */
sentinelHandleRedisInstance(sentinelRedisInstance * ri)4412 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
4413 /* ========== MONITORING HALF ============ */
4414 /* Every kind of instance */
4415 sentinelReconnectInstance(ri);
4416 sentinelSendPeriodicCommands(ri);
4417
4418 /* ============== ACTING HALF ============= */
4419 /* We don't proceed with the acting half if we are in TILT mode.
4420 * TILT happens when we find something odd with the time, like a
4421 * sudden change in the clock. */
4422 if (sentinel.tilt) {
4423 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
4424 sentinel.tilt = 0;
4425 sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
4426 }
4427
4428 /* Every kind of instance */
4429 sentinelCheckSubjectivelyDown(ri);
4430
4431 /* Masters and slaves */
4432 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
4433 /* Nothing so far. */
4434 }
4435
4436 /* Only masters */
4437 if (ri->flags & SRI_MASTER) {
4438 sentinelCheckObjectivelyDown(ri);
4439 if (sentinelStartFailoverIfNeeded(ri))
4440 sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
4441 sentinelFailoverStateMachine(ri);
4442 sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
4443 }
4444 }
4445
4446 /* Perform scheduled operations for all the instances in the dictionary.
4447 * Recursively call the function against dictionaries of slaves. */
sentinelHandleDictOfRedisInstances(dict * instances)4448 void sentinelHandleDictOfRedisInstances(dict *instances) {
4449 dictIterator *di;
4450 dictEntry *de;
4451 sentinelRedisInstance *switch_to_promoted = NULL;
4452
4453 /* There are a number of things we need to perform against every master. */
4454 di = dictGetIterator(instances);
4455 while((de = dictNext(di)) != NULL) {
4456 sentinelRedisInstance *ri = dictGetVal(de);
4457
4458 sentinelHandleRedisInstance(ri);
4459 if (ri->flags & SRI_MASTER) {
4460 sentinelHandleDictOfRedisInstances(ri->slaves);
4461 sentinelHandleDictOfRedisInstances(ri->sentinels);
4462 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
4463 switch_to_promoted = ri;
4464 }
4465 }
4466 }
4467 if (switch_to_promoted)
4468 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
4469 dictReleaseIterator(di);
4470 }
4471
4472 /* This function checks if we need to enter the TITL mode.
4473 *
4474 * The TILT mode is entered if we detect that between two invocations of the
4475 * timer interrupt, a negative amount of time, or too much time has passed.
4476 * Note that we expect that more or less just 100 milliseconds will pass
4477 * if everything is fine. However we'll see a negative number or a
4478 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
4479 * following conditions happen:
4480 *
4481 * 1) The Sentiel process for some time is blocked, for every kind of
4482 * random reason: the load is huge, the computer was frozen for some time
4483 * in I/O or alike, the process was stopped by a signal. Everything.
4484 * 2) The system clock was altered significantly.
4485 *
4486 * Under both this conditions we'll see everything as timed out and failing
4487 * without good reasons. Instead we enter the TILT mode and wait
4488 * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
4489 *
4490 * During TILT time we still collect information, we just do not act. */
sentinelCheckTiltCondition(void)4491 void sentinelCheckTiltCondition(void) {
4492 mstime_t now = mstime();
4493 mstime_t delta = now - sentinel.previous_time;
4494
4495 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
4496 sentinel.tilt = 1;
4497 sentinel.tilt_start_time = mstime();
4498 sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
4499 }
4500 sentinel.previous_time = mstime();
4501 }
4502
sentinelTimer(void)4503 void sentinelTimer(void) {
4504 sentinelCheckTiltCondition();
4505 sentinelHandleDictOfRedisInstances(sentinel.masters);
4506 sentinelRunPendingScripts();
4507 sentinelCollectTerminatedScripts();
4508 sentinelKillTimedoutScripts();
4509
4510 /* We continuously change the frequency of the Redis "timer interrupt"
4511 * in order to desynchronize every Sentinel from every other.
4512 * This non-determinism avoids that Sentinels started at the same time
4513 * exactly continue to stay synchronized asking to be voted at the
4514 * same time again and again (resulting in nobody likely winning the
4515 * election because of split brain voting). */
4516 server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
4517 }
4518
4519