xref: /f-stack/app/redis-5.0.5/src/sentinel.c (revision 572c4311)
1 /* Redis Sentinel implementation
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "server.h"
32 #include "hiredis.h"
33 #include "async.h"
34 
35 #include <ctype.h>
36 #include <arpa/inet.h>
37 #include <sys/socket.h>
38 #include <sys/wait.h>
39 #include <fcntl.h>
40 
41 extern char **environ;
42 
43 #define REDIS_SENTINEL_PORT 26379
44 
45 /* ======================== Sentinel global state =========================== */
46 
47 /* Address object, used to describe an ip:port pair. */
48 typedef struct sentinelAddr {
49     char *ip;   /* IP string; created with sdsnew() and freed with sdsfree(). */
50     int port;   /* Port number; createSentinelAddr() accepts 0-65535. */
51 } sentinelAddr;
52 
53 /* Bit flags stored in sentinelRedisInstance->flags. */
54 #define SRI_MASTER  (1<<0)
55 #define SRI_SLAVE   (1<<1)
56 #define SRI_SENTINEL (1<<2)
57 #define SRI_S_DOWN (1<<3)   /* Subjectively down (no quorum). */
58 #define SRI_O_DOWN (1<<4)   /* Objectively down (confirmed by others). */
59 #define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
60                                    its master is down. */
61 #define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
62                                            this master. */
63 #define SRI_PROMOTED (1<<7)            /* Slave selected for promotion. */
64 #define SRI_RECONF_SENT (1<<8)     /* SLAVEOF <newmaster> sent. */
65 #define SRI_RECONF_INPROG (1<<9)   /* Slave synchronization in progress. */
66 #define SRI_RECONF_DONE (1<<10)     /* Slave synchronized with new master. */
67 #define SRI_FORCE_FAILOVER (1<<11)  /* Force failover with master up. */
68 #define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
69 
70 /* Sentinel periods, timeouts and defaults. Note: times are in milliseconds.
 * NOTE(review): a few entries (slave priority, parallel syncs, max pending
 * commands, deny-scripts-reconfig) look like plain values rather than
 * durations -- confirm against their users. */
71 #define SENTINEL_INFO_PERIOD 10000
72 #define SENTINEL_PING_PERIOD 1000
73 #define SENTINEL_ASK_PERIOD 1000
74 #define SENTINEL_PUBLISH_PERIOD 2000
75 #define SENTINEL_DEFAULT_DOWN_AFTER 30000
76 #define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
77 #define SENTINEL_TILT_TRIGGER 2000
78 #define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
79 #define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
80 #define SENTINEL_SLAVE_RECONF_TIMEOUT 10000
81 #define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
82 #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
83 #define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000)
84 #define SENTINEL_MAX_PENDING_COMMANDS 100
85 #define SENTINEL_ELECTION_TIMEOUT 10000
86 #define SENTINEL_MAX_DESYNC 1000
87 #define SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG 1
88 
89 /* Failover state machine states (see sentinelRedisInstance->failover_state). */
90 #define SENTINEL_FAILOVER_STATE_NONE 0  /* No failover in progress. */
91 #define SENTINEL_FAILOVER_STATE_WAIT_START 1  /* Wait for failover_start_time */
92 #define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
93 #define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
94 #define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
95 #define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
96 #define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */
97 
/* NOTE(review): presumably the values of
 * sentinelRedisInstance->slave_master_link_status -- confirm. */
98 #define SENTINEL_MASTER_LINK_STATUS_UP 0
99 #define SENTINEL_MASTER_LINK_STATUS_DOWN 1
100 
101 /* Generic flags that can be used with different functions.
102  * They use higher bits (16 and above) to avoid colliding with the
103  * function specific flags. */
104 #define SENTINEL_NO_FLAGS 0
105 #define SENTINEL_GENERATE_EVENT (1<<16)
106 #define SENTINEL_LEADER (1<<17)
107 #define SENTINEL_OBSERVER (1<<18)
108 
109 /* Script execution flags (sentinelScriptJob->flags) and limits. */
110 #define SENTINEL_SCRIPT_NONE 0
111 #define SENTINEL_SCRIPT_RUNNING 1
112 #define SENTINEL_SCRIPT_MAX_QUEUE 256 /* Max jobs kept in sentinel.scripts_queue. */
113 #define SENTINEL_SCRIPT_MAX_RUNNING 16
114 #define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
115 #define SENTINEL_SCRIPT_MAX_RETRY 10
116 #define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
117 
118 /* SENTINEL SIMULATE-FAILURE command flags (sentinel.simfailure_flags). */
119 #define SENTINEL_SIMFAILURE_NONE 0
120 #define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0)
121 #define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1)
122 
123 /* The link to a sentinelRedisInstance. When we have the same set of Sentinels
124  * monitoring many masters, we have different instances representing the
125  * same Sentinels, one per master, and we need to share the hiredis connections
126  * among them. Otherwise if 5 Sentinels are monitoring 100 masters we create
127  * 500 outgoing connections instead of 5.
128  *
129  * So this structure represents a reference counted link in terms of the two
130  * hiredis connections for commands and Pub/Sub, and the fields needed for
131  * failure detection, since the ping/pong time are now local to the link: if
132  * the link is available, the instance is available. This way we don't just
133  * have 5 connections instead of 500, we also send 5 pings instead of 500.
134  *
135  * Links are shared only for Sentinels: master and slave instances have
136  * a link with refcount = 1, always. */
137 typedef struct instanceLink {
138     int refcount;          /* Number of sentinelRedisInstance owners. */
139     int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
140     int pending_commands;  /* Number of commands sent waiting for a reply. */
141     redisAsyncContext *cc; /* Hiredis context for commands. */
142     redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
143     mstime_t cc_conn_time; /* cc connection time. */
144     mstime_t pc_conn_time; /* pc connection time. */
145     mstime_t pc_last_activity; /* Last time we received any message. */
146     mstime_t last_avail_time; /* Last time the instance replied to ping with
147                                  a reply we consider valid. */
148     mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
149                                  received after it) was sent. This field is
150                                  set to 0 when a pong is received, and set again
151                                  to the current time if the value is 0 and a new
152                                  ping is sent. */
153     mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
154                                  only used to avoid sending too many pings
155                                  during failure. Idle time is computed using
156                                  the act_ping_time field. */
157     mstime_t last_pong_time;  /* Last time the instance replied to ping,
158                                  whatever the reply was. That's used to check
159                                  if the link is idle and must be reconnected. */
160     mstime_t last_reconn_time;  /* Last reconnection attempt performed when
161                                    the link was down. */
162 } instanceLink;
163 
/* State kept for one instance known to this Sentinel. The 'flags' field
 * (SRI_MASTER / SRI_SLAVE / SRI_SENTINEL) tells which kind of instance it is;
 * the sections below are only meaningful for the matching kind. */
164 typedef struct sentinelRedisInstance {
165     int flags;      /* See SRI_... defines */
166     char *name;     /* Master name from the point of view of this sentinel. */
167     char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
168     uint64_t config_epoch;  /* Configuration epoch. */
169     sentinelAddr *addr; /* Master host. */
170     instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
171     mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
172     mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
173                                  we received a hello from this Sentinel
174                                  via Pub/Sub. */
175     mstime_t last_master_down_reply_time; /* Time of last reply to
176                                              SENTINEL is-master-down command. */
177     mstime_t s_down_since_time; /* Subjectively down since time. */
178     mstime_t o_down_since_time; /* Objectively down since time. */
179     mstime_t down_after_period; /* Consider it down after that period. */
180     mstime_t info_refresh;  /* Time at which we received INFO output from it. */
181     dict *renamed_commands;     /* Commands renamed in this instance:
182                                    Sentinel will use the alternative commands
183                                    mapped on this table to send things like
184                                    SLAVEOF, CONFIG, INFO, ... */
185 
186     /* Role and the first time we observed it.
187      * This is useful in order to delay replacing what the instance reports
188      * with our own configuration. We need to always wait some time in order
189      * to give a chance to the leader to report the new configuration before
190      * we do silly things. */
191     int role_reported;
192     mstime_t role_reported_time;
193     mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
194 
195     /* Master specific. */
196     dict *sentinels;    /* Other sentinels monitoring the same master. */
197     dict *slaves;       /* Slaves for this master instance. */
198     unsigned int quorum;/* Number of sentinels that need to agree on failure. */
199     int parallel_syncs; /* How many slaves to reconfigure at same time. */
200     char *auth_pass;    /* Password to use for AUTH against master & slaves. */
201 
202     /* Slave specific. */
203     mstime_t master_link_down_time; /* Slave replication link down time. */
204     int slave_priority; /* Slave priority according to its INFO output. */
205     mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
206     struct sentinelRedisInstance *master; /* Master instance if it's slave. */
207     char *slave_master_host;    /* Master host as reported by INFO */
208     int slave_master_port;      /* Master port as reported by INFO */
209     int slave_master_link_status; /* Master link status as reported by INFO */
210     unsigned long long slave_repl_offset; /* Slave replication offset. */
211     /* Failover */
212     char *leader;       /* If this is a master instance, this is the runid of
213                            the Sentinel that should perform the failover. If
214                            this is a Sentinel, this is the runid of the Sentinel
215                            that this Sentinel voted as leader. */
216     uint64_t leader_epoch; /* Epoch of the 'leader' field. */
217     uint64_t failover_epoch; /* Epoch of the currently started failover. */
218     int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
219     mstime_t failover_state_change_time;
220     mstime_t failover_start_time;   /* Last failover attempt start time. */
221     mstime_t failover_timeout;      /* Max time to refresh failover state. */
222     mstime_t failover_delay_logged; /* For what failover_start_time value we
223                                        logged the failover delay. */
224     struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
225     /* Scripts executed to notify admin or reconfigure clients: when they
226      * are set to NULL no script is executed. */
227     char *notification_script;
228     char *client_reconfig_script;
229     sds info; /* cached INFO output */
230 } sentinelRedisInstance;
231 
232 /* Main state: the single global 'sentinel' object, set up by initSentinel(). */
233 struct sentinelState {
234     char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
235     uint64_t current_epoch;         /* Current epoch. */
236     dict *masters;      /* Dictionary of master sentinelRedisInstances.
237                            Key is the instance name, value is the
238                            sentinelRedisInstance structure pointer. */
239     int tilt;           /* Are we in TILT mode? */
240     int running_scripts;    /* Number of scripts in execution right now. */
241     mstime_t tilt_start_time;       /* When TILT started. */
242     mstime_t previous_time;         /* Last time we ran the time handler. */
243     list *scripts_queue;            /* Queue of user scripts to execute. */
244     char *announce_ip;  /* IP addr that is gossiped to other sentinels if
245                            not NULL. */
246     int announce_port;  /* Port that is gossiped to other sentinels if
247                            non zero. */
248     unsigned long simfailure_flags; /* Failures simulation. */
249     int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
250                                   paths at runtime? NOTE(review): given the
                                  field name, non-zero presumably means
                                  "deny" -- confirm against the users. */
251 } sentinel;
252 
253 /* A script execution job, stored as a node value in sentinel.scripts_queue. */
254 typedef struct sentinelScriptJob {
255     int flags;              /* Script job flags: SENTINEL_SCRIPT_* */
256     int retry_num;          /* Number of times we tried to execute it. */
257     char **argv;            /* Arguments to call the script: NULL-terminated
                               array of sds strings, argv[0] is the path. */
258     mstime_t start_time;    /* Script execution time if the script is running,
259                                otherwise 0 if we are allowed to retry the
260                                execution at any time. If the script is not
261                                running and it's not 0, it means: do not run
262                                before the specified time. */
263     pid_t pid;              /* Script execution pid. */
264 } sentinelScriptJob;
265 
266 /* ======================= hiredis ae.c adapters =============================
267  * Note: this implementation is taken from hiredis/adapters/ae.h, however
268  * we have our modified copy for Sentinel in order to use our allocator
269  * and to have full control over how the adapter works. */
270 
/* Per-connection glue between a hiredis async context and the ae event loop.
 * Allocated by redisAeAttach() and freed by redisAeCleanup(). */
271 typedef struct redisAeEvents {
272     redisAsyncContext *context; /* The hiredis async context served. */
273     aeEventLoop *loop;          /* Event loop the fd is registered on. */
274     int fd;                     /* Socket fd, copied from the context. */
275     int reading, writing;       /* Non-zero while the read / write file
                                   event is registered. */
276 } redisAeEvents;
277 
/* ae readable-event handler: forwards the event to hiredis by calling
 * redisAsyncHandleRead() on the context stored in 'privdata'. */
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleRead(events->context);
}
284 
/* ae writable-event handler: forwards the event to hiredis by calling
 * redisAsyncHandleWrite() on the context stored in 'privdata'. */
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleWrite(events->context);
}
291 
redisAeAddRead(void * privdata)292 static void redisAeAddRead(void *privdata) {
293     redisAeEvents *e = (redisAeEvents*)privdata;
294     aeEventLoop *loop = e->loop;
295     if (!e->reading) {
296         e->reading = 1;
297         aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
298     }
299 }
300 
redisAeDelRead(void * privdata)301 static void redisAeDelRead(void *privdata) {
302     redisAeEvents *e = (redisAeEvents*)privdata;
303     aeEventLoop *loop = e->loop;
304     if (e->reading) {
305         e->reading = 0;
306         aeDeleteFileEvent(loop,e->fd,AE_READABLE);
307     }
308 }
309 
redisAeAddWrite(void * privdata)310 static void redisAeAddWrite(void *privdata) {
311     redisAeEvents *e = (redisAeEvents*)privdata;
312     aeEventLoop *loop = e->loop;
313     if (!e->writing) {
314         e->writing = 1;
315         aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
316     }
317 }
318 
redisAeDelWrite(void * privdata)319 static void redisAeDelWrite(void *privdata) {
320     redisAeEvents *e = (redisAeEvents*)privdata;
321     aeEventLoop *loop = e->loop;
322     if (e->writing) {
323         e->writing = 0;
324         aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
325     }
326 }
327 
/* hiredis 'cleanup' hook: drop both file events (if registered) and free
 * the redisAeEvents container allocated by redisAeAttach(). */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
334 
/* Bind the hiredis async context 'ac' to the ae event loop 'loop'.
 *
 * Allocates a redisAeEvents container, stores it in ac->ev.data and installs
 * the add/del/cleanup hooks defined above. Returns C_OK on success, or C_ERR
 * when the context already has event data attached. */
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
    redisAeEvents *events;

    /* Refuse to attach twice to the same context. */
    if (ac->ev.data != NULL) return C_ERR;

    /* Container holding the context and the state of its r/w events. */
    events = zmalloc(sizeof(*events));
    events->context = ac;
    events->loop = loop;
    events->fd = ac->c.fd;
    events->reading = 0;
    events->writing = 0;

    /* Hooks hiredis calls to start/stop listening for events. */
    ac->ev.data = events;
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;

    return C_OK;
}
360 
361 /* ============================= Prototypes ================================= */
362 
363 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
364 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
365 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
366 sentinelRedisInstance *sentinelGetMasterByName(char *name);
367 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
368 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
369 int yesnotoi(char *s);
370 void instanceLinkConnectionError(const redisAsyncContext *c);
371 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
372 void sentinelAbortFailover(sentinelRedisInstance *ri);
373 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
374 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
375 void sentinelScheduleScriptExecution(char *path, ...);
376 void sentinelStartFailover(sentinelRedisInstance *master);
377 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
378 int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
379 char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
380 void sentinelFlushConfig(void);
381 void sentinelGenerateInitialMonitorEvents(void);
382 int sentinelSendPing(sentinelRedisInstance *ri);
383 int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master);
384 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid);
385 void sentinelSimFailureCrash(void);
386 
387 /* ========================= Dictionary types =============================== */
388 
389 uint64_t dictSdsHash(const void *key);
390 uint64_t dictSdsCaseHash(const void *key);
391 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
392 int dictSdsKeyCaseCompare(void *privdata, const void *key1, const void *key2);
393 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
394 
dictInstancesValDestructor(void * privdata,void * obj)395 void dictInstancesValDestructor (void *privdata, void *obj) {
396     UNUSED(privdata);
397     releaseSentinelRedisInstance(obj);
398 }
399 
400 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
401  *
402  * also used for: sentinelRedisInstance->sentinels dictionary that maps
403  * sentinels ip:port to last seen time in Pub/Sub hello message.
 *
 * The dict does not free keys (no key destructor); values are released
 * through dictInstancesValDestructor(). */
404 dictType instancesDictType = {
405     dictSdsHash,               /* hash function */
406     NULL,                      /* key dup */
407     NULL,                      /* val dup */
408     dictSdsKeyCompare,         /* key compare */
409     NULL,                      /* key destructor */
410     dictInstancesValDestructor /* val destructor */
411 };
412 
413 /* Instance runid (sds) -> votes (long casted to void*)
414  *
415  * This is useful into sentinelGetObjectiveLeader() function in order to
416  * count the votes and understand who is the leader.
 *
 * Neither keys nor values are freed by the dict (no destructors). */
417 dictType leaderVotesDictType = {
418     dictSdsHash,               /* hash function */
419     NULL,                      /* key dup */
420     NULL,                      /* val dup */
421     dictSdsKeyCompare,         /* key compare */
422     NULL,                      /* key destructor */
423     NULL                       /* val destructor */
424 };
425 
426 /* Instance renamed commands table (sentinelRedisInstance->renamed_commands).
 * Both keys and values are freed with dictSdsDestructor(). The *Case*
 * hash/compare variants are used, presumably making lookups
 * case-insensitive -- confirm against their definitions. */
427 dictType renamedCommandsDictType = {
428     dictSdsCaseHash,           /* hash function */
429     NULL,                      /* key dup */
430     NULL,                      /* val dup */
431     dictSdsKeyCaseCompare,     /* key compare */
432     dictSdsDestructor,         /* key destructor */
433     dictSdsDestructor          /* val destructor */
434 };
435 
436 /* =========================== Initialization =============================== */
437 
438 void sentinelCommand(client *c);
439 void sentinelInfoCommand(client *c);
440 void sentinelSetCommand(client *c);
441 void sentinelPublishCommand(client *c);
442 void sentinelRoleCommand(client *c);
443 
/* Commands available in Sentinel mode: initSentinel() empties the regular
 * command table and registers exactly these entries, keyed by name. */
444 struct redisCommand sentinelcmds[] = {
445     {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
446     {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
447     {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
448     {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
449     {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
450     {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
451     {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
452     {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
453     {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
454     {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
455     {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
456     {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}
457 };
458 
459 /* This function overwrites a few normal Redis config default with Sentinel
460  * specific defaults. */
initSentinelConfig(void)461 void initSentinelConfig(void) {
462     server.port = REDIS_SENTINEL_PORT;
463     server.protected_mode = 0; /* Sentinel must be exposed. */
464 }
465 
466 /* Perform the Sentinel mode initialization. */
initSentinel(void)467 void initSentinel(void) {
468     unsigned int j;
469 
470     /* Remove usual Redis commands from the command table, then just add
471      * the SENTINEL command. */
472     dictEmpty(server.commands,NULL);
473     for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
474         int retval;
475         struct redisCommand *cmd = sentinelcmds+j;
476 
477         retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
478         serverAssert(retval == DICT_OK);
479     }
480 
481     /* Initialize various data structures. */
482     sentinel.current_epoch = 0;
483     sentinel.masters = dictCreate(&instancesDictType,NULL);
484     sentinel.tilt = 0;
485     sentinel.tilt_start_time = 0;
486     sentinel.previous_time = mstime();
487     sentinel.running_scripts = 0;
488     sentinel.scripts_queue = listCreate();
489     sentinel.announce_ip = NULL;
490     sentinel.announce_port = 0;
491     sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
492     sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
493     memset(sentinel.myid,0,sizeof(sentinel.myid));
494 }
495 
496 /* This function gets called when the server is in Sentinel mode, started,
497  * loaded the configuration, and is ready for normal operations. */
sentinelIsRunning(void)498 void sentinelIsRunning(void) {
499     int j;
500 
501     if (server.configfile == NULL) {
502         serverLog(LL_WARNING,
503             "Sentinel started without a config file. Exiting...");
504         exit(1);
505     } else if (access(server.configfile,W_OK) == -1) {
506         serverLog(LL_WARNING,
507             "Sentinel config file %s is not writable: %s. Exiting...",
508             server.configfile,strerror(errno));
509         exit(1);
510     }
511 
512     /* If this Sentinel has yet no ID set in the configuration file, we
513      * pick a random one and persist the config on disk. From now on this
514      * will be this Sentinel ID across restarts. */
515     for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
516         if (sentinel.myid[j] != 0) break;
517 
518     if (j == CONFIG_RUN_ID_SIZE) {
519         /* Pick ID and persist the config. */
520         getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
521         sentinelFlushConfig();
522     }
523 
524     /* Log its ID to make debugging of issues simpler. */
525     serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
526 
527     /* We want to generate a +monitor event for every configured master
528      * at startup. */
529     sentinelGenerateInitialMonitorEvents();
530 }
531 
532 /* ============================== sentinelAddr ============================== */
533 
534 /* Create a sentinelAddr object and return it on success.
535  * On error NULL is returned and errno is set to:
536  *  ENOENT: Can't resolve the hostname.
537  *  EINVAL: Invalid port number.
538  */
createSentinelAddr(char * hostname,int port)539 sentinelAddr *createSentinelAddr(char *hostname, int port) {
540     char ip[NET_IP_STR_LEN];
541     sentinelAddr *sa;
542 
543     if (port < 0 || port > 65535) {
544         errno = EINVAL;
545         return NULL;
546     }
547     if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
548         errno = ENOENT;
549         return NULL;
550     }
551     sa = zmalloc(sizeof(*sa));
552     sa->ip = sdsnew(ip);
553     sa->port = port;
554     return sa;
555 }
556 
557 /* Return a duplicate of the source address. */
dupSentinelAddr(sentinelAddr * src)558 sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
559     sentinelAddr *sa;
560 
561     sa = zmalloc(sizeof(*sa));
562     sa->ip = sdsnew(src->ip);
563     sa->port = src->port;
564     return sa;
565 }
566 
567 /* Free a Sentinel address. Can't fail. */
releaseSentinelAddr(sentinelAddr * sa)568 void releaseSentinelAddr(sentinelAddr *sa) {
569     sdsfree(sa->ip);
570     zfree(sa);
571 }
572 
573 /* Return non-zero if two addresses are equal. */
sentinelAddrIsEqual(sentinelAddr * a,sentinelAddr * b)574 int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
575     return a->port == b->port && !strcasecmp(a->ip,b->ip);
576 }
577 
578 /* =========================== Events notification ========================== */
579 
580 /* Send an event to log, pub/sub, user notification script.
581  *
582  * 'level' is the log level for logging. Only LL_WARNING events will trigger
583  * the execution of the user notification script.
584  *
585  * 'type' is the message type, also used as a pub/sub channel name.
586  *
587  * 'ri', is the redis instance target of this event if applicable, and is
588  * used to obtain the path of the notification script to execute.
589  *
590  * The remaining arguments are printf-alike.
591  * If the format specifier starts with the two characters "%@" then ri is
592  * not NULL, and the message is prefixed with an instance identifier in the
593  * following format:
594  *
595  *  <instance type> <instance name> <ip> <port>
596  *
597  *  If the instance type is not master, than the additional string is
598  *  added to specify the originating master:
599  *
600  *  @ <master name> <master ip> <master port>
601  *
602  *  Any other specifier after "%@" is processed by printf itself.
603  */
sentinelEvent(int level,char * type,sentinelRedisInstance * ri,const char * fmt,...)604 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
605                    const char *fmt, ...) {
606     va_list ap;
607     char msg[LOG_MAX_LEN];
608     robj *channel, *payload;
609 
610     /* Handle %@ */
611     if (fmt[0] == '%' && fmt[1] == '@') {
612         sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
613                                          NULL : ri->master;
614 
615         if (master) {
616             snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
617                 sentinelRedisInstanceTypeStr(ri),
618                 ri->name, ri->addr->ip, ri->addr->port,
619                 master->name, master->addr->ip, master->addr->port);
620         } else {
621             snprintf(msg, sizeof(msg), "%s %s %s %d",
622                 sentinelRedisInstanceTypeStr(ri),
623                 ri->name, ri->addr->ip, ri->addr->port);
624         }
625         fmt += 2;
626     } else {
627         msg[0] = '\0';
628     }
629 
630     /* Use vsprintf for the rest of the formatting if any. */
631     if (fmt[0] != '\0') {
632         va_start(ap, fmt);
633         vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
634         va_end(ap);
635     }
636 
637     /* Log the message if the log level allows it to be logged. */
638     if (level >= server.verbosity)
639         serverLog(level,"%s %s",type,msg);
640 
641     /* Publish the message via Pub/Sub if it's not a debugging one. */
642     if (level != LL_DEBUG) {
643         channel = createStringObject(type,strlen(type));
644         payload = createStringObject(msg,strlen(msg));
645         pubsubPublishMessage(channel,payload);
646         decrRefCount(channel);
647         decrRefCount(payload);
648     }
649 
650     /* Call the notification script if applicable. */
651     if (level == LL_WARNING && ri != NULL) {
652         sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
653                                          ri : ri->master;
654         if (master && master->notification_script) {
655             sentinelScheduleScriptExecution(master->notification_script,
656                 type,msg,NULL);
657         }
658     }
659 }
660 
661 /* This function is called only at startup and is used to generate a
662  * +monitor event for every configured master. The same events are also
663  * generated when a master to monitor is added at runtime via the
664  * SENTINEL MONITOR command. */
sentinelGenerateInitialMonitorEvents(void)665 void sentinelGenerateInitialMonitorEvents(void) {
666     dictIterator *di;
667     dictEntry *de;
668 
669     di = dictGetIterator(sentinel.masters);
670     while((de = dictNext(di)) != NULL) {
671         sentinelRedisInstance *ri = dictGetVal(de);
672         sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
673     }
674     dictReleaseIterator(di);
675 }
676 
677 /* ============================ script execution ============================ */
678 
679 /* Release a script job structure and all the associated data. */
sentinelReleaseScriptJob(sentinelScriptJob * sj)680 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
681     int j = 0;
682 
683     while(sj->argv[j]) sdsfree(sj->argv[j++]);
684     zfree(sj->argv);
685     zfree(sj);
686 }
687 
688 #define SENTINEL_SCRIPT_MAX_ARGS 16
sentinelScheduleScriptExecution(char * path,...)689 void sentinelScheduleScriptExecution(char *path, ...) {
690     va_list ap;
691     char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
692     int argc = 1;
693     sentinelScriptJob *sj;
694 
695     va_start(ap, path);
696     while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
697         argv[argc] = va_arg(ap,char*);
698         if (!argv[argc]) break;
699         argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
700         argc++;
701     }
702     va_end(ap);
703     argv[0] = sdsnew(path);
704 
705     sj = zmalloc(sizeof(*sj));
706     sj->flags = SENTINEL_SCRIPT_NONE;
707     sj->retry_num = 0;
708     sj->argv = zmalloc(sizeof(char*)*(argc+1));
709     sj->start_time = 0;
710     sj->pid = 0;
711     memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
712 
713     listAddNodeTail(sentinel.scripts_queue,sj);
714 
715     /* Remove the oldest non running script if we already hit the limit. */
716     if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
717         listNode *ln;
718         listIter li;
719 
720         listRewind(sentinel.scripts_queue,&li);
721         while ((ln = listNext(&li)) != NULL) {
722             sj = ln->value;
723 
724             if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
725             /* The first node is the oldest as we add on tail. */
726             listDelNode(sentinel.scripts_queue,ln);
727             sentinelReleaseScriptJob(sj);
728             break;
729         }
730         serverAssert(listLength(sentinel.scripts_queue) <=
731                     SENTINEL_SCRIPT_MAX_QUEUE);
732     }
733 }
734 
735 /* Lookup a script in the scripts queue via pid, and returns the list node
736  * (so that we can easily remove it from the queue if needed). */
sentinelGetScriptListNodeByPid(pid_t pid)737 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
738     listNode *ln;
739     listIter li;
740 
741     listRewind(sentinel.scripts_queue,&li);
742     while ((ln = listNext(&li)) != NULL) {
743         sentinelScriptJob *sj = ln->value;
744 
745         if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
746             return ln;
747     }
748     return NULL;
749 }
750 
751 /* Run pending scripts if we are not already at max number of running
752  * scripts. */
sentinelRunPendingScripts(void)753 void sentinelRunPendingScripts(void) {
754     listNode *ln;
755     listIter li;
756     mstime_t now = mstime();
757 
758     /* Find jobs that are not running and run them, from the top to the
759      * tail of the queue, so we run older jobs first. */
760     listRewind(sentinel.scripts_queue,&li);
761     while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
762            (ln = listNext(&li)) != NULL)
763     {
764         sentinelScriptJob *sj = ln->value;
765         pid_t pid;
766 
767         /* Skip if already running. */
768         if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
769 
770         /* Skip if it's a retry, but not enough time has elapsed. */
771         if (sj->start_time && sj->start_time > now) continue;
772 
773         sj->flags |= SENTINEL_SCRIPT_RUNNING;
774         sj->start_time = mstime();
775         sj->retry_num++;
776         pid = fork();
777 
778         if (pid == -1) {
779             /* Parent (fork error).
780              * We report fork errors as signal 99, in order to unify the
781              * reporting with other kind of errors. */
782             sentinelEvent(LL_WARNING,"-script-error",NULL,
783                           "%s %d %d", sj->argv[0], 99, 0);
784             sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
785             sj->pid = 0;
786         } else if (pid == 0) {
787             /* Child */
788             resetCpuAffinity(NULL);
789             execve(sj->argv[0],sj->argv,environ);
790             /* If we are here an error occurred. */
791             _exit(2); /* Don't retry execution. */
792         } else {
793             sentinel.running_scripts++;
794             sj->pid = pid;
795             sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
796         }
797     }
798 }
799 
800 /* How much to delay the execution of a script that we need to retry after
801  * an error?
802  *
803  * We double the retry delay for every further retry we do. So for instance
804  * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
805  * starting from the second attempt to execute the script the delays are:
806  * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
sentinelScriptRetryDelay(int retry_num)807 mstime_t sentinelScriptRetryDelay(int retry_num) {
808     mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
809 
810     while (retry_num-- > 1) delay *= 2;
811     return delay;
812 }
813 
814 /* Check for scripts that terminated, and remove them from the queue if the
815  * script terminated successfully. If instead the script was terminated by
816  * a signal, or returned exit code "1", it is scheduled to run again if
817  * the max number of retries did not already elapsed. */
sentinelCollectTerminatedScripts(void)818 void sentinelCollectTerminatedScripts(void) {
819     int statloc;
820     pid_t pid;
821 
822     while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
823         int exitcode = WEXITSTATUS(statloc);
824         int bysignal = 0;
825         listNode *ln;
826         sentinelScriptJob *sj;
827 
828         if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
829         sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
830             (long)pid, exitcode, bysignal);
831 
832         ln = sentinelGetScriptListNodeByPid(pid);
833         if (ln == NULL) {
834             serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
835             continue;
836         }
837         sj = ln->value;
838 
839         /* If the script was terminated by a signal or returns an
840          * exit code of "1" (that means: please retry), we reschedule it
841          * if the max number of retries is not already reached. */
842         if ((bysignal || exitcode == 1) &&
843             sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
844         {
845             sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
846             sj->pid = 0;
847             sj->start_time = mstime() +
848                              sentinelScriptRetryDelay(sj->retry_num);
849         } else {
850             /* Otherwise let's remove the script, but log the event if the
851              * execution did not terminated in the best of the ways. */
852             if (bysignal || exitcode != 0) {
853                 sentinelEvent(LL_WARNING,"-script-error",NULL,
854                               "%s %d %d", sj->argv[0], bysignal, exitcode);
855             }
856             listDelNode(sentinel.scripts_queue,ln);
857             sentinelReleaseScriptJob(sj);
858             sentinel.running_scripts--;
859         }
860     }
861 }
862 
863 /* Kill scripts in timeout, they'll be collected by the
864  * sentinelCollectTerminatedScripts() function. */
sentinelKillTimedoutScripts(void)865 void sentinelKillTimedoutScripts(void) {
866     listNode *ln;
867     listIter li;
868     mstime_t now = mstime();
869 
870     listRewind(sentinel.scripts_queue,&li);
871     while ((ln = listNext(&li)) != NULL) {
872         sentinelScriptJob *sj = ln->value;
873 
874         if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
875             (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
876         {
877             sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
878                 sj->argv[0], (long)sj->pid);
879             kill(sj->pid,SIGKILL);
880         }
881     }
882 }
883 
884 /* Implements SENTINEL PENDING-SCRIPTS command. */
sentinelPendingScriptsCommand(client * c)885 void sentinelPendingScriptsCommand(client *c) {
886     listNode *ln;
887     listIter li;
888 
889     addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
890     listRewind(sentinel.scripts_queue,&li);
891     while ((ln = listNext(&li)) != NULL) {
892         sentinelScriptJob *sj = ln->value;
893         int j = 0;
894 
895         addReplyMultiBulkLen(c,10);
896 
897         addReplyBulkCString(c,"argv");
898         while (sj->argv[j]) j++;
899         addReplyMultiBulkLen(c,j);
900         j = 0;
901         while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
902 
903         addReplyBulkCString(c,"flags");
904         addReplyBulkCString(c,
905             (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
906 
907         addReplyBulkCString(c,"pid");
908         addReplyBulkLongLong(c,sj->pid);
909 
910         if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
911             addReplyBulkCString(c,"run-time");
912             addReplyBulkLongLong(c,mstime() - sj->start_time);
913         } else {
914             mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
915             if (delay < 0) delay = 0;
916             addReplyBulkCString(c,"run-delay");
917             addReplyBulkLongLong(c,delay);
918         }
919 
920         addReplyBulkCString(c,"retry-num");
921         addReplyBulkLongLong(c,sj->retry_num);
922     }
923 }
924 
925 /* This function calls, if any, the client reconfiguration script with the
926  * following parameters:
927  *
928  * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
929  *
930  * It is called every time a failover is performed.
931  *
932  * <state> is currently always "failover".
933  * <role> is either "leader" or "observer".
934  *
935  * from/to fields are respectively master -> promoted slave addresses for
936  * "start" and "end". */
sentinelCallClientReconfScript(sentinelRedisInstance * master,int role,char * state,sentinelAddr * from,sentinelAddr * to)937 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
938     char fromport[32], toport[32];
939 
940     if (master->client_reconfig_script == NULL) return;
941     ll2string(fromport,sizeof(fromport),from->port);
942     ll2string(toport,sizeof(toport),to->port);
943     sentinelScheduleScriptExecution(master->client_reconfig_script,
944         master->name,
945         (role == SENTINEL_LEADER) ? "leader" : "observer",
946         state, from->ip, fromport, to->ip, toport, NULL);
947 }
948 
949 /* =============================== instanceLink ============================= */
950 
951 /* Create a not yet connected link object. */
createInstanceLink(void)952 instanceLink *createInstanceLink(void) {
953     instanceLink *link = zmalloc(sizeof(*link));
954 
955     link->refcount = 1;
956     link->disconnected = 1;
957     link->pending_commands = 0;
958     link->cc = NULL;
959     link->pc = NULL;
960     link->cc_conn_time = 0;
961     link->pc_conn_time = 0;
962     link->last_reconn_time = 0;
963     link->pc_last_activity = 0;
964     /* We set the act_ping_time to "now" even if we actually don't have yet
965      * a connection with the node, nor we sent a ping.
966      * This is useful to detect a timeout in case we'll not be able to connect
967      * with the node at all. */
968     link->act_ping_time = mstime();
969     link->last_ping_time = 0;
970     link->last_avail_time = mstime();
971     link->last_pong_time = mstime();
972     return link;
973 }
974 
975 /* Disconnect an hiredis connection in the context of an instance link. */
instanceLinkCloseConnection(instanceLink * link,redisAsyncContext * c)976 void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
977     if (c == NULL) return;
978 
979     if (link->cc == c) {
980         link->cc = NULL;
981         link->pending_commands = 0;
982     }
983     if (link->pc == c) link->pc = NULL;
984     c->data = NULL;
985     link->disconnected = 1;
986     redisAsyncFree(c);
987 }
988 
989 /* Decrement the refcount of a link object, if it drops to zero, actually
990  * free it and return NULL. Otherwise don't do anything and return the pointer
991  * to the object.
992  *
993  * If we are not going to free the link and ri is not NULL, we rebind all the
994  * pending requests in link->cc (hiredis connection for commands) to a
995  * callback that will just ignore them. This is useful to avoid processing
996  * replies for an instance that no longer exists. */
releaseInstanceLink(instanceLink * link,sentinelRedisInstance * ri)997 instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
998 {
999     serverAssert(link->refcount > 0);
1000     link->refcount--;
1001     if (link->refcount != 0) {
1002         if (ri && ri->link->cc) {
1003             /* This instance may have pending callbacks in the hiredis async
1004              * context, having as 'privdata' the instance that we are going to
1005              * free. Let's rewrite the callback list, directly exploiting
1006              * hiredis internal data structures, in order to bind them with
1007              * a callback that will ignore the reply at all. */
1008             redisCallback *cb;
1009             redisCallbackList *callbacks = &link->cc->replies;
1010 
1011             cb = callbacks->head;
1012             while(cb) {
1013                 if (cb->privdata == ri) {
1014                     cb->fn = sentinelDiscardReplyCallback;
1015                     cb->privdata = NULL; /* Not strictly needed. */
1016                 }
1017                 cb = cb->next;
1018             }
1019         }
1020         return link; /* Other active users. */
1021     }
1022 
1023     instanceLinkCloseConnection(link,link->cc);
1024     instanceLinkCloseConnection(link,link->pc);
1025     zfree(link);
1026     return NULL;
1027 }
1028 
1029 /* This function will attempt to share the instance link we already have
1030  * for the same Sentinel in the context of a different master, with the
1031  * instance we are passing as argument.
1032  *
1033  * This way multiple Sentinel objects that refer all to the same physical
1034  * Sentinel instance but in the context of different masters will use
1035  * a single connection, will send a single PING per second for failure
1036  * detection and so forth.
1037  *
1038  * Return C_OK if a matching Sentinel was found in the context of a
1039  * different master and sharing was performed. Otherwise C_ERR
1040  * is returned. */
sentinelTryConnectionSharing(sentinelRedisInstance * ri)1041 int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
1042     serverAssert(ri->flags & SRI_SENTINEL);
1043     dictIterator *di;
1044     dictEntry *de;
1045 
1046     if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
1047     if (ri->link->refcount > 1) return C_ERR; /* Already shared. */
1048 
1049     di = dictGetIterator(sentinel.masters);
1050     while((de = dictNext(di)) != NULL) {
1051         sentinelRedisInstance *master = dictGetVal(de), *match;
1052         /* We want to share with the same physical Sentinel referenced
1053          * in other masters, so skip our master. */
1054         if (master == ri->master) continue;
1055         match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
1056                                                        NULL,0,ri->runid);
1057         if (match == NULL) continue; /* No match. */
1058         if (match == ri) continue; /* Should never happen but... safer. */
1059 
1060         /* We identified a matching Sentinel, great! Let's free our link
1061          * and use the one of the matching Sentinel. */
1062         releaseInstanceLink(ri->link,NULL);
1063         ri->link = match->link;
1064         match->link->refcount++;
1065         return C_OK;
1066     }
1067     dictReleaseIterator(di);
1068     return C_ERR;
1069 }
1070 
1071 /* When we detect a Sentinel to switch address (reporting a different IP/port
1072  * pair in Hello messages), let's update all the matching Sentinels in the
1073  * context of other masters as well and disconnect the links, so that everybody
1074  * will be updated.
1075  *
1076  * Return the number of updated Sentinel addresses. */
sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance * ri)1077 int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
1078     serverAssert(ri->flags & SRI_SENTINEL);
1079     dictIterator *di;
1080     dictEntry *de;
1081     int reconfigured = 0;
1082 
1083     di = dictGetIterator(sentinel.masters);
1084     while((de = dictNext(di)) != NULL) {
1085         sentinelRedisInstance *master = dictGetVal(de), *match;
1086         match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
1087                                                        NULL,0,ri->runid);
1088         /* If there is no match, this master does not know about this
1089          * Sentinel, try with the next one. */
1090         if (match == NULL) continue;
1091 
1092         /* Disconnect the old links if connected. */
1093         if (match->link->cc != NULL)
1094             instanceLinkCloseConnection(match->link,match->link->cc);
1095         if (match->link->pc != NULL)
1096             instanceLinkCloseConnection(match->link,match->link->pc);
1097 
1098         if (match == ri) continue; /* Address already updated for it. */
1099 
1100         /* Update the address of the matching Sentinel by copying the address
1101          * of the Sentinel object that received the address update. */
1102         releaseSentinelAddr(match->addr);
1103         match->addr = dupSentinelAddr(ri->addr);
1104         reconfigured++;
1105     }
1106     dictReleaseIterator(di);
1107     if (reconfigured)
1108         sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
1109                     "%@ %d additional matching instances", reconfigured);
1110     return reconfigured;
1111 }
1112 
1113 /* This function is called when an hiredis connection reported an error.
1114  * We set it to NULL and mark the link as disconnected so that it will be
1115  * reconnected again.
1116  *
1117  * Note: we don't free the hiredis context as hiredis will do it for us
1118  * for async connections. */
instanceLinkConnectionError(const redisAsyncContext * c)1119 void instanceLinkConnectionError(const redisAsyncContext *c) {
1120     instanceLink *link = c->data;
1121     int pubsub;
1122 
1123     if (!link) return;
1124 
1125     pubsub = (link->pc == c);
1126     if (pubsub)
1127         link->pc = NULL;
1128     else
1129         link->cc = NULL;
1130     link->disconnected = 1;
1131 }
1132 
1133 /* Hiredis connection established / disconnected callbacks. We need them
1134  * just to cleanup our link state. */
sentinelLinkEstablishedCallback(const redisAsyncContext * c,int status)1135 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1136     if (status != C_OK) instanceLinkConnectionError(c);
1137 }
1138 
/* Hiredis "disconnected" callback: whatever the reported status is, the
 * link state is cleaned up so that the connection gets re-established. */
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
    UNUSED(status);

    instanceLinkConnectionError(c);
}
1143 
1144 /* ========================== sentinelRedisInstance ========================= */
1145 
1146 /* Create a redis instance, the following fields must be populated by the
1147  * caller if needed:
1148  * runid: set to NULL but will be populated once INFO output is received.
1149  * info_refresh: is set to 0 to mean that we never received INFO so far.
1150  *
1151  * If SRI_MASTER is set into initial flags the instance is added to
1152  * sentinel.masters table.
1153  *
1154  * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
1155  * instance is added into master->slaves or master->sentinels table.
1156  *
1157  * If the instance is a slave or sentinel, the name parameter is ignored and
1158  * is created automatically as hostname:port.
1159  *
1160  * The function fails if hostname can't be resolved or port is out of range.
1161  * When this happens NULL is returned and errno is set accordingly to the
1162  * createSentinelAddr() function.
1163  *
1164  * The function may also fail and return NULL with errno set to EBUSY if
1165  * a master with the same name, a slave with the same address, or a sentinel
1166  * with the same ID already exists. */
1167 
createSentinelRedisInstance(char * name,int flags,char * hostname,int port,int quorum,sentinelRedisInstance * master)1168 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
1169     sentinelRedisInstance *ri;
1170     sentinelAddr *addr;
1171     dict *table = NULL;
1172     char slavename[NET_PEER_ID_LEN], *sdsname;
1173 
1174     serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
1175     serverAssert((flags & SRI_MASTER) || master != NULL);
1176 
1177     /* Check address validity. */
1178     addr = createSentinelAddr(hostname,port);
1179     if (addr == NULL) return NULL;
1180 
1181     /* For slaves use ip:port as name. */
1182     if (flags & SRI_SLAVE) {
1183         anetFormatAddr(slavename, sizeof(slavename), hostname, port);
1184         name = slavename;
1185     }
1186 
1187     /* Make sure the entry is not duplicated. This may happen when the same
1188      * name for a master is used multiple times inside the configuration or
1189      * if we try to add multiple times a slave or sentinel with same ip/port
1190      * to a master. */
1191     if (flags & SRI_MASTER) table = sentinel.masters;
1192     else if (flags & SRI_SLAVE) table = master->slaves;
1193     else if (flags & SRI_SENTINEL) table = master->sentinels;
1194     sdsname = sdsnew(name);
1195     if (dictFind(table,sdsname)) {
1196         releaseSentinelAddr(addr);
1197         sdsfree(sdsname);
1198         errno = EBUSY;
1199         return NULL;
1200     }
1201 
1202     /* Create the instance object. */
1203     ri = zmalloc(sizeof(*ri));
1204     /* Note that all the instances are started in the disconnected state,
1205      * the event loop will take care of connecting them. */
1206     ri->flags = flags;
1207     ri->name = sdsname;
1208     ri->runid = NULL;
1209     ri->config_epoch = 0;
1210     ri->addr = addr;
1211     ri->link = createInstanceLink();
1212     ri->last_pub_time = mstime();
1213     ri->last_hello_time = mstime();
1214     ri->last_master_down_reply_time = mstime();
1215     ri->s_down_since_time = 0;
1216     ri->o_down_since_time = 0;
1217     ri->down_after_period = master ? master->down_after_period :
1218                             SENTINEL_DEFAULT_DOWN_AFTER;
1219     ri->master_link_down_time = 0;
1220     ri->auth_pass = NULL;
1221     ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
1222     ri->slave_reconf_sent_time = 0;
1223     ri->slave_master_host = NULL;
1224     ri->slave_master_port = 0;
1225     ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
1226     ri->slave_repl_offset = 0;
1227     ri->sentinels = dictCreate(&instancesDictType,NULL);
1228     ri->quorum = quorum;
1229     ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
1230     ri->master = master;
1231     ri->slaves = dictCreate(&instancesDictType,NULL);
1232     ri->info_refresh = 0;
1233     ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL);
1234 
1235     /* Failover state. */
1236     ri->leader = NULL;
1237     ri->leader_epoch = 0;
1238     ri->failover_epoch = 0;
1239     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1240     ri->failover_state_change_time = 0;
1241     ri->failover_start_time = 0;
1242     ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
1243     ri->failover_delay_logged = 0;
1244     ri->promoted_slave = NULL;
1245     ri->notification_script = NULL;
1246     ri->client_reconfig_script = NULL;
1247     ri->info = NULL;
1248 
1249     /* Role */
1250     ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
1251     ri->role_reported_time = mstime();
1252     ri->slave_conf_change_time = mstime();
1253 
1254     /* Add into the right table. */
1255     dictAdd(table, ri->name, ri);
1256     return ri;
1257 }
1258 
1259 /* Release this instance and all its slaves, sentinels, hiredis connections.
1260  * This function does not take care of unlinking the instance from the main
1261  * masters table (if it is a master) or from its master sentinels/slaves table
1262  * if it is a slave or sentinel. */
releaseSentinelRedisInstance(sentinelRedisInstance * ri)1263 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
1264     /* Release all its slaves or sentinels if any. */
1265     dictRelease(ri->sentinels);
1266     dictRelease(ri->slaves);
1267 
1268     /* Disconnect the instance. */
1269     releaseInstanceLink(ri->link,ri);
1270 
1271     /* Free other resources. */
1272     sdsfree(ri->name);
1273     sdsfree(ri->runid);
1274     sdsfree(ri->notification_script);
1275     sdsfree(ri->client_reconfig_script);
1276     sdsfree(ri->slave_master_host);
1277     sdsfree(ri->leader);
1278     sdsfree(ri->auth_pass);
1279     sdsfree(ri->info);
1280     releaseSentinelAddr(ri->addr);
1281     dictRelease(ri->renamed_commands);
1282 
1283     /* Clear state into the master if needed. */
1284     if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
1285         ri->master->promoted_slave = NULL;
1286 
1287     zfree(ri);
1288 }
1289 
1290 /* Lookup a slave in a master Redis instance, by ip and port. */
sentinelRedisInstanceLookupSlave(sentinelRedisInstance * ri,char * ip,int port)1291 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
1292                 sentinelRedisInstance *ri, char *ip, int port)
1293 {
1294     sds key;
1295     sentinelRedisInstance *slave;
1296     char buf[NET_PEER_ID_LEN];
1297 
1298     serverAssert(ri->flags & SRI_MASTER);
1299     anetFormatAddr(buf,sizeof(buf),ip,port);
1300     key = sdsnew(buf);
1301     slave = dictFetchValue(ri->slaves,key);
1302     sdsfree(key);
1303     return slave;
1304 }
1305 
1306 /* Return the name of the type of the instance as a string. */
sentinelRedisInstanceTypeStr(sentinelRedisInstance * ri)1307 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
1308     if (ri->flags & SRI_MASTER) return "master";
1309     else if (ri->flags & SRI_SLAVE) return "slave";
1310     else if (ri->flags & SRI_SENTINEL) return "sentinel";
1311     else return "unknown";
1312 }
1313 
1314 /* This function remove the Sentinel with the specified ID from the
1315  * specified master.
1316  *
1317  * If "runid" is NULL the function returns ASAP.
1318  *
1319  * This function is useful because on Sentinels address switch, we want to
1320  * remove our old entry and add a new one for the same ID but with the new
1321  * address.
1322  *
1323  * The function returns 1 if the matching Sentinel was removed, otherwise
1324  * 0 if there was no Sentinel with this ID. */
removeMatchingSentinelFromMaster(sentinelRedisInstance * master,char * runid)1325 int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) {
1326     dictIterator *di;
1327     dictEntry *de;
1328     int removed = 0;
1329 
1330     if (runid == NULL) return 0;
1331 
1332     di = dictGetSafeIterator(master->sentinels);
1333     while((de = dictNext(di)) != NULL) {
1334         sentinelRedisInstance *ri = dictGetVal(de);
1335 
1336         if (ri->runid && strcmp(ri->runid,runid) == 0) {
1337             dictDelete(master->sentinels,ri->name);
1338             removed++;
1339         }
1340     }
1341     dictReleaseIterator(di);
1342     return removed;
1343 }
1344 
1345 /* Search an instance with the same runid, ip and port into a dictionary
1346  * of instances. Return NULL if not found, otherwise return the instance
1347  * pointer.
1348  *
1349  * runid or ip can be NULL. In such a case the search is performed only
1350  * by the non-NULL field. */
getSentinelRedisInstanceByAddrAndRunID(dict * instances,char * ip,int port,char * runid)1351 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1352     dictIterator *di;
1353     dictEntry *de;
1354     sentinelRedisInstance *instance = NULL;
1355 
1356     serverAssert(ip || runid);   /* User must pass at least one search param. */
1357     di = dictGetIterator(instances);
1358     while((de = dictNext(di)) != NULL) {
1359         sentinelRedisInstance *ri = dictGetVal(de);
1360 
1361         if (runid && !ri->runid) continue;
1362         if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1363             (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1364                             ri->addr->port == port)))
1365         {
1366             instance = ri;
1367             break;
1368         }
1369     }
1370     dictReleaseIterator(di);
1371     return instance;
1372 }
1373 
1374 /* Master lookup by name */
sentinelGetMasterByName(char * name)1375 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1376     sentinelRedisInstance *ri;
1377     sds sdsname = sdsnew(name);
1378 
1379     ri = dictFetchValue(sentinel.masters,sdsname);
1380     sdsfree(sdsname);
1381     return ri;
1382 }
1383 
1384 /* Add the specified flags to all the instances in the specified dictionary. */
sentinelAddFlagsToDictOfRedisInstances(dict * instances,int flags)1385 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1386     dictIterator *di;
1387     dictEntry *de;
1388 
1389     di = dictGetIterator(instances);
1390     while((de = dictNext(di)) != NULL) {
1391         sentinelRedisInstance *ri = dictGetVal(de);
1392         ri->flags |= flags;
1393     }
1394     dictReleaseIterator(di);
1395 }
1396 
1397 /* Remove the specified flags to all the instances in the specified
1398  * dictionary. */
sentinelDelFlagsToDictOfRedisInstances(dict * instances,int flags)1399 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1400     dictIterator *di;
1401     dictEntry *de;
1402 
1403     di = dictGetIterator(instances);
1404     while((de = dictNext(di)) != NULL) {
1405         sentinelRedisInstance *ri = dictGetVal(de);
1406         ri->flags &= ~flags;
1407     }
1408     dictReleaseIterator(di);
1409 }
1410 
1411 /* Reset the state of a monitored master:
1412  * 1) Remove all slaves.
1413  * 2) Remove all sentinels.
1414  * 3) Remove most of the flags resulting from runtime operations.
1415  * 4) Reset timers to their default value. For example after a reset it will be
1416  *    possible to failover again the same master ASAP, without waiting the
1417  *    failover timeout delay.
1418  * 5) In the process of doing this undo the failover if in progress.
1419  * 6) Disconnect the connections with the master (will reconnect automatically).
1420  */
1421 
1422 #define SENTINEL_RESET_NO_SENTINELS (1<<0)
sentinelResetMaster(sentinelRedisInstance * ri,int flags)1423 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1424     serverAssert(ri->flags & SRI_MASTER);
1425     dictRelease(ri->slaves);
1426     ri->slaves = dictCreate(&instancesDictType,NULL);
1427     if (!(flags & SENTINEL_RESET_NO_SENTINELS)) {
1428         dictRelease(ri->sentinels);
1429         ri->sentinels = dictCreate(&instancesDictType,NULL);
1430     }
1431     instanceLinkCloseConnection(ri->link,ri->link->cc);
1432     instanceLinkCloseConnection(ri->link,ri->link->pc);
1433     ri->flags &= SRI_MASTER;
1434     if (ri->leader) {
1435         sdsfree(ri->leader);
1436         ri->leader = NULL;
1437     }
1438     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1439     ri->failover_state_change_time = 0;
1440     ri->failover_start_time = 0; /* We can failover again ASAP. */
1441     ri->promoted_slave = NULL;
1442     sdsfree(ri->runid);
1443     sdsfree(ri->slave_master_host);
1444     ri->runid = NULL;
1445     ri->slave_master_host = NULL;
1446     ri->link->act_ping_time = mstime();
1447     ri->link->last_ping_time = 0;
1448     ri->link->last_avail_time = mstime();
1449     ri->link->last_pong_time = mstime();
1450     ri->role_reported_time = mstime();
1451     ri->role_reported = SRI_MASTER;
1452     if (flags & SENTINEL_GENERATE_EVENT)
1453         sentinelEvent(LL_WARNING,"+reset-master",ri,"%@");
1454 }
1455 
1456 /* Call sentinelResetMaster() on every master with a name matching the specified
1457  * pattern. */
sentinelResetMastersByPattern(char * pattern,int flags)1458 int sentinelResetMastersByPattern(char *pattern, int flags) {
1459     dictIterator *di;
1460     dictEntry *de;
1461     int reset = 0;
1462 
1463     di = dictGetIterator(sentinel.masters);
1464     while((de = dictNext(di)) != NULL) {
1465         sentinelRedisInstance *ri = dictGetVal(de);
1466 
1467         if (ri->name) {
1468             if (stringmatch(pattern,ri->name,0)) {
1469                 sentinelResetMaster(ri,flags);
1470                 reset++;
1471             }
1472         }
1473     }
1474     dictReleaseIterator(di);
1475     return reset;
1476 }
1477 
1478 /* Reset the specified master with sentinelResetMaster(), and also change
1479  * the ip:port address, but take the name of the instance unmodified.
1480  *
1481  * This is used to handle the +switch-master event.
1482  *
1483  * The function returns C_ERR if the address can't be resolved for some
1484  * reason. Otherwise C_OK is returned.  */
sentinelResetMasterAndChangeAddress(sentinelRedisInstance * master,char * ip,int port)1485 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1486     sentinelAddr *oldaddr, *newaddr;
1487     sentinelAddr **slaves = NULL;
1488     int numslaves = 0, j;
1489     dictIterator *di;
1490     dictEntry *de;
1491 
1492     newaddr = createSentinelAddr(ip,port);
1493     if (newaddr == NULL) return C_ERR;
1494 
1495     /* Make a list of slaves to add back after the reset.
1496      * Don't include the one having the address we are switching to. */
1497     di = dictGetIterator(master->slaves);
1498     while((de = dictNext(di)) != NULL) {
1499         sentinelRedisInstance *slave = dictGetVal(de);
1500 
1501         if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
1502         slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
1503         slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
1504                                                  slave->addr->port);
1505     }
1506     dictReleaseIterator(di);
1507 
1508     /* If we are switching to a different address, include the old address
1509      * as a slave as well, so that we'll be able to sense / reconfigure
1510      * the old master. */
1511     if (!sentinelAddrIsEqual(newaddr,master->addr)) {
1512         slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
1513         slaves[numslaves++] = createSentinelAddr(master->addr->ip,
1514                                                  master->addr->port);
1515     }
1516 
1517     /* Reset and switch address. */
1518     sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
1519     oldaddr = master->addr;
1520     master->addr = newaddr;
1521     master->o_down_since_time = 0;
1522     master->s_down_since_time = 0;
1523 
1524     /* Add slaves back. */
1525     for (j = 0; j < numslaves; j++) {
1526         sentinelRedisInstance *slave;
1527 
1528         slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
1529                     slaves[j]->port, master->quorum, master);
1530         releaseSentinelAddr(slaves[j]);
1531         if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
1532     }
1533     zfree(slaves);
1534 
1535     /* Release the old address at the end so we are safe even if the function
1536      * gets the master->addr->ip and master->addr->port as arguments. */
1537     releaseSentinelAddr(oldaddr);
1538     sentinelFlushConfig();
1539     return C_OK;
1540 }
1541 
1542 /* Return non-zero if there was no SDOWN or ODOWN error associated to this
1543  * instance in the latest 'ms' milliseconds. */
sentinelRedisInstanceNoDownFor(sentinelRedisInstance * ri,mstime_t ms)1544 int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) {
1545     mstime_t most_recent;
1546 
1547     most_recent = ri->s_down_since_time;
1548     if (ri->o_down_since_time > most_recent)
1549         most_recent = ri->o_down_since_time;
1550     return most_recent == 0 || (mstime() - most_recent) > ms;
1551 }
1552 
1553 /* Return the current master address, that is, its address or the address
1554  * of the promoted slave if already operational. */
sentinelGetCurrentMasterAddress(sentinelRedisInstance * master)1555 sentinelAddr *sentinelGetCurrentMasterAddress(sentinelRedisInstance *master) {
1556     /* If we are failing over the master, and the state is already
1557      * SENTINEL_FAILOVER_STATE_RECONF_SLAVES or greater, it means that we
1558      * already have the new configuration epoch in the master, and the
1559      * slave acknowledged the configuration switch. Advertise the new
1560      * address. */
1561     if ((master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1562         master->promoted_slave &&
1563         master->failover_state >= SENTINEL_FAILOVER_STATE_RECONF_SLAVES)
1564     {
1565         return master->promoted_slave->addr;
1566     } else {
1567         return master->addr;
1568     }
1569 }
1570 
1571 /* This function sets the down_after_period field value in 'master' to all
1572  * the slaves and sentinel instances connected to this master. */
sentinelPropagateDownAfterPeriod(sentinelRedisInstance * master)1573 void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) {
1574     dictIterator *di;
1575     dictEntry *de;
1576     int j;
1577     dict *d[] = {master->slaves, master->sentinels, NULL};
1578 
1579     for (j = 0; d[j]; j++) {
1580         di = dictGetIterator(d[j]);
1581         while((de = dictNext(di)) != NULL) {
1582             sentinelRedisInstance *ri = dictGetVal(de);
1583             ri->down_after_period = master->down_after_period;
1584         }
1585         dictReleaseIterator(di);
1586     }
1587 }
1588 
/* Return a static string naming the kind of instance according to its flags:
 * "master", "slave" or "sentinel" (checked in this order), or "unknown" when
 * none of the three type flags is set. */
char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
    if (ri->flags & SRI_MASTER) return "master";
    if (ri->flags & SRI_SLAVE) return "slave";
    if (ri->flags & SRI_SENTINEL) return "sentinel";
    return "unknown";
}
1595 
1596 /* This function is used in order to send commands to Redis instances: the
1597  * commands we send from Sentinel may be renamed, a common case is a master
1598  * with CONFIG and SLAVEOF commands renamed for security concerns. In that
1599  * case we check the ri->renamed_command table (or if the instance is a slave,
1600  * we check the one of the master), and map the command that we should send
1601  * to the set of renamed commads. However, if the command was not renamed,
1602  * we just return "command" itself. */
sentinelInstanceMapCommand(sentinelRedisInstance * ri,char * command)1603 char *sentinelInstanceMapCommand(sentinelRedisInstance *ri, char *command) {
1604     sds sc = sdsnew(command);
1605     if (ri->master) ri = ri->master;
1606     char *retval = dictFetchValue(ri->renamed_commands, sc);
1607     sdsfree(sc);
1608     return retval ? retval : command;
1609 }
1610 
1611 /* ============================ Config handling ============================= */
sentinelHandleConfiguration(char ** argv,int argc)1612 char *sentinelHandleConfiguration(char **argv, int argc) {
1613     sentinelRedisInstance *ri;
1614 
1615     if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1616         /* monitor <name> <host> <port> <quorum> */
1617         int quorum = atoi(argv[4]);
1618 
1619         if (quorum <= 0) return "Quorum must be 1 or greater.";
1620         if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1621                                         atoi(argv[3]),quorum,NULL) == NULL)
1622         {
1623             switch(errno) {
1624             case EBUSY: return "Duplicated master name.";
1625             case ENOENT: return "Can't resolve master instance hostname.";
1626             case EINVAL: return "Invalid port number";
1627             }
1628         }
1629     } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1630         /* down-after-milliseconds <name> <milliseconds> */
1631         ri = sentinelGetMasterByName(argv[1]);
1632         if (!ri) return "No such master with specified name.";
1633         ri->down_after_period = atoi(argv[2]);
1634         if (ri->down_after_period <= 0)
1635             return "negative or zero time parameter.";
1636         sentinelPropagateDownAfterPeriod(ri);
1637     } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1638         /* failover-timeout <name> <milliseconds> */
1639         ri = sentinelGetMasterByName(argv[1]);
1640         if (!ri) return "No such master with specified name.";
1641         ri->failover_timeout = atoi(argv[2]);
1642         if (ri->failover_timeout <= 0)
1643             return "negative or zero time parameter.";
1644    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1645         /* parallel-syncs <name> <milliseconds> */
1646         ri = sentinelGetMasterByName(argv[1]);
1647         if (!ri) return "No such master with specified name.";
1648         ri->parallel_syncs = atoi(argv[2]);
1649    } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1650         /* notification-script <name> <path> */
1651         ri = sentinelGetMasterByName(argv[1]);
1652         if (!ri) return "No such master with specified name.";
1653         if (access(argv[2],X_OK) == -1)
1654             return "Notification script seems non existing or non executable.";
1655         ri->notification_script = sdsnew(argv[2]);
1656    } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1657         /* client-reconfig-script <name> <path> */
1658         ri = sentinelGetMasterByName(argv[1]);
1659         if (!ri) return "No such master with specified name.";
1660         if (access(argv[2],X_OK) == -1)
1661             return "Client reconfiguration script seems non existing or "
1662                    "non executable.";
1663         ri->client_reconfig_script = sdsnew(argv[2]);
1664    } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
1665         /* auth-pass <name> <password> */
1666         ri = sentinelGetMasterByName(argv[1]);
1667         if (!ri) return "No such master with specified name.";
1668         ri->auth_pass = sdsnew(argv[2]);
1669     } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
1670         /* current-epoch <epoch> */
1671         unsigned long long current_epoch = strtoull(argv[1],NULL,10);
1672         if (current_epoch > sentinel.current_epoch)
1673             sentinel.current_epoch = current_epoch;
1674     } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
1675         if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
1676             return "Malformed Sentinel id in myid option.";
1677         memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
1678     } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
1679         /* config-epoch <name> <epoch> */
1680         ri = sentinelGetMasterByName(argv[1]);
1681         if (!ri) return "No such master with specified name.";
1682         ri->config_epoch = strtoull(argv[2],NULL,10);
1683         /* The following update of current_epoch is not really useful as
1684          * now the current epoch is persisted on the config file, but
1685          * we leave this check here for redundancy. */
1686         if (ri->config_epoch > sentinel.current_epoch)
1687             sentinel.current_epoch = ri->config_epoch;
1688     } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
1689         /* leader-epoch <name> <epoch> */
1690         ri = sentinelGetMasterByName(argv[1]);
1691         if (!ri) return "No such master with specified name.";
1692         ri->leader_epoch = strtoull(argv[2],NULL,10);
1693     } else if ((!strcasecmp(argv[0],"known-slave") ||
1694                 !strcasecmp(argv[0],"known-replica")) && argc == 4)
1695     {
1696         sentinelRedisInstance *slave;
1697 
1698         /* known-replica <name> <ip> <port> */
1699         ri = sentinelGetMasterByName(argv[1]);
1700         if (!ri) return "No such master with specified name.";
1701         if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
1702                     atoi(argv[3]), ri->quorum, ri)) == NULL)
1703         {
1704             return "Wrong hostname or port for replica.";
1705         }
1706     } else if (!strcasecmp(argv[0],"known-sentinel") &&
1707                (argc == 4 || argc == 5)) {
1708         sentinelRedisInstance *si;
1709 
1710         if (argc == 5) { /* Ignore the old form without runid. */
1711             /* known-sentinel <name> <ip> <port> [runid] */
1712             ri = sentinelGetMasterByName(argv[1]);
1713             if (!ri) return "No such master with specified name.";
1714             if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
1715                         atoi(argv[3]), ri->quorum, ri)) == NULL)
1716             {
1717                 return "Wrong hostname or port for sentinel.";
1718             }
1719             si->runid = sdsnew(argv[4]);
1720             sentinelTryConnectionSharing(si);
1721         }
1722     } else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
1723         /* rename-command <name> <command> <renamed-command> */
1724         ri = sentinelGetMasterByName(argv[1]);
1725         if (!ri) return "No such master with specified name.";
1726         sds oldcmd = sdsnew(argv[2]);
1727         sds newcmd = sdsnew(argv[3]);
1728         if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
1729             sdsfree(oldcmd);
1730             sdsfree(newcmd);
1731             return "Same command renamed multiple times with rename-command.";
1732         }
1733     } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
1734         /* announce-ip <ip-address> */
1735         if (strlen(argv[1]))
1736             sentinel.announce_ip = sdsnew(argv[1]);
1737     } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
1738         /* announce-port <port> */
1739         sentinel.announce_port = atoi(argv[1]);
1740     } else if (!strcasecmp(argv[0],"deny-scripts-reconfig") && argc == 2) {
1741         /* deny-scripts-reconfig <yes|no> */
1742         if ((sentinel.deny_scripts_reconfig = yesnotoi(argv[1])) == -1) {
1743             return "Please specify yes or no for the "
1744                    "deny-scripts-reconfig options.";
1745         }
1746     } else {
1747         return "Unrecognized sentinel configuration statement.";
1748     }
1749     return NULL;
1750 }
1751 
1752 /* Implements CONFIG REWRITE for "sentinel" option.
1753  * This is used not just to rewrite the configuration given by the user
1754  * (the configured masters) but also in order to retain the state of
1755  * Sentinel across restarts: config epoch of masters, associated slaves
1756  * and sentinel instances, and so forth. */
rewriteConfigSentinelOption(struct rewriteConfigState * state)1757 void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
1758     dictIterator *di, *di2;
1759     dictEntry *de;
1760     sds line;
1761 
1762     /* sentinel unique ID. */
1763     line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid);
1764     rewriteConfigRewriteLine(state,"sentinel",line,1);
1765 
1766     /* sentinel deny-scripts-reconfig. */
1767     line = sdscatprintf(sdsempty(), "sentinel deny-scripts-reconfig %s",
1768         sentinel.deny_scripts_reconfig ? "yes" : "no");
1769     rewriteConfigRewriteLine(state,"sentinel",line,
1770         sentinel.deny_scripts_reconfig != SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG);
1771 
1772     /* For every master emit a "sentinel monitor" config entry. */
1773     di = dictGetIterator(sentinel.masters);
1774     while((de = dictNext(di)) != NULL) {
1775         sentinelRedisInstance *master, *ri;
1776         sentinelAddr *master_addr;
1777 
1778         /* sentinel monitor */
1779         master = dictGetVal(de);
1780         master_addr = sentinelGetCurrentMasterAddress(master);
1781         line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d",
1782             master->name, master_addr->ip, master_addr->port,
1783             master->quorum);
1784         rewriteConfigRewriteLine(state,"sentinel",line,1);
1785 
1786         /* sentinel down-after-milliseconds */
1787         if (master->down_after_period != SENTINEL_DEFAULT_DOWN_AFTER) {
1788             line = sdscatprintf(sdsempty(),
1789                 "sentinel down-after-milliseconds %s %ld",
1790                 master->name, (long) master->down_after_period);
1791             rewriteConfigRewriteLine(state,"sentinel",line,1);
1792         }
1793 
1794         /* sentinel failover-timeout */
1795         if (master->failover_timeout != SENTINEL_DEFAULT_FAILOVER_TIMEOUT) {
1796             line = sdscatprintf(sdsempty(),
1797                 "sentinel failover-timeout %s %ld",
1798                 master->name, (long) master->failover_timeout);
1799             rewriteConfigRewriteLine(state,"sentinel",line,1);
1800         }
1801 
1802         /* sentinel parallel-syncs */
1803         if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) {
1804             line = sdscatprintf(sdsempty(),
1805                 "sentinel parallel-syncs %s %d",
1806                 master->name, master->parallel_syncs);
1807             rewriteConfigRewriteLine(state,"sentinel",line,1);
1808         }
1809 
1810         /* sentinel notification-script */
1811         if (master->notification_script) {
1812             line = sdscatprintf(sdsempty(),
1813                 "sentinel notification-script %s %s",
1814                 master->name, master->notification_script);
1815             rewriteConfigRewriteLine(state,"sentinel",line,1);
1816         }
1817 
1818         /* sentinel client-reconfig-script */
1819         if (master->client_reconfig_script) {
1820             line = sdscatprintf(sdsempty(),
1821                 "sentinel client-reconfig-script %s %s",
1822                 master->name, master->client_reconfig_script);
1823             rewriteConfigRewriteLine(state,"sentinel",line,1);
1824         }
1825 
1826         /* sentinel auth-pass */
1827         if (master->auth_pass) {
1828             line = sdscatprintf(sdsempty(),
1829                 "sentinel auth-pass %s %s",
1830                 master->name, master->auth_pass);
1831             rewriteConfigRewriteLine(state,"sentinel",line,1);
1832         }
1833 
1834         /* sentinel config-epoch */
1835         line = sdscatprintf(sdsempty(),
1836             "sentinel config-epoch %s %llu",
1837             master->name, (unsigned long long) master->config_epoch);
1838         rewriteConfigRewriteLine(state,"sentinel",line,1);
1839 
1840         /* sentinel leader-epoch */
1841         line = sdscatprintf(sdsempty(),
1842             "sentinel leader-epoch %s %llu",
1843             master->name, (unsigned long long) master->leader_epoch);
1844         rewriteConfigRewriteLine(state,"sentinel",line,1);
1845 
1846         /* sentinel known-slave */
1847         di2 = dictGetIterator(master->slaves);
1848         while((de = dictNext(di2)) != NULL) {
1849             sentinelAddr *slave_addr;
1850 
1851             ri = dictGetVal(de);
1852             slave_addr = ri->addr;
1853 
1854             /* If master_addr (obtained using sentinelGetCurrentMasterAddress()
1855              * so it may be the address of the promoted slave) is equal to this
1856              * slave's address, a failover is in progress and the slave was
1857              * already successfully promoted. So as the address of this slave
1858              * we use the old master address instead. */
1859             if (sentinelAddrIsEqual(slave_addr,master_addr))
1860                 slave_addr = master->addr;
1861             line = sdscatprintf(sdsempty(),
1862                 "sentinel known-replica %s %s %d",
1863                 master->name, slave_addr->ip, slave_addr->port);
1864             rewriteConfigRewriteLine(state,"sentinel",line,1);
1865         }
1866         dictReleaseIterator(di2);
1867 
1868         /* sentinel known-sentinel */
1869         di2 = dictGetIterator(master->sentinels);
1870         while((de = dictNext(di2)) != NULL) {
1871             ri = dictGetVal(de);
1872             if (ri->runid == NULL) continue;
1873             line = sdscatprintf(sdsempty(),
1874                 "sentinel known-sentinel %s %s %d %s",
1875                 master->name, ri->addr->ip, ri->addr->port, ri->runid);
1876             rewriteConfigRewriteLine(state,"sentinel",line,1);
1877         }
1878         dictReleaseIterator(di2);
1879 
1880         /* sentinel rename-command */
1881         di2 = dictGetIterator(master->renamed_commands);
1882         while((de = dictNext(di2)) != NULL) {
1883             sds oldname = dictGetKey(de);
1884             sds newname = dictGetVal(de);
1885             line = sdscatprintf(sdsempty(),
1886                 "sentinel rename-command %s %s %s",
1887                 master->name, oldname, newname);
1888             rewriteConfigRewriteLine(state,"sentinel",line,1);
1889         }
1890         dictReleaseIterator(di2);
1891     }
1892 
1893     /* sentinel current-epoch is a global state valid for all the masters. */
1894     line = sdscatprintf(sdsempty(),
1895         "sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch);
1896     rewriteConfigRewriteLine(state,"sentinel",line,1);
1897 
1898     /* sentinel announce-ip. */
1899     if (sentinel.announce_ip) {
1900         line = sdsnew("sentinel announce-ip ");
1901         line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip));
1902         rewriteConfigRewriteLine(state,"sentinel",line,1);
1903     }
1904 
1905     /* sentinel announce-port. */
1906     if (sentinel.announce_port) {
1907         line = sdscatprintf(sdsempty(),"sentinel announce-port %d",
1908                             sentinel.announce_port);
1909         rewriteConfigRewriteLine(state,"sentinel",line,1);
1910     }
1911 
1912     dictReleaseIterator(di);
1913 }
1914 
1915 /* This function uses the config rewriting Redis engine in order to persist
1916  * the state of the Sentinel in the current configuration file.
1917  *
1918  * Before returning the function calls fsync() against the generated
1919  * configuration file to make sure changes are committed to disk.
1920  *
1921  * On failure the function logs a warning on the Redis log. */
sentinelFlushConfig(void)1922 void sentinelFlushConfig(void) {
1923     int fd = -1;
1924     int saved_hz = server.hz;
1925     int rewrite_status;
1926 
1927     server.hz = CONFIG_DEFAULT_HZ;
1928     rewrite_status = rewriteConfig(server.configfile);
1929     server.hz = saved_hz;
1930 
1931     if (rewrite_status == -1) goto werr;
1932     if ((fd = open(server.configfile,O_RDONLY)) == -1) goto werr;
1933     if (fsync(fd) == -1) goto werr;
1934     if (close(fd) == EOF) goto werr;
1935     return;
1936 
1937 werr:
1938     if (fd != -1) close(fd);
1939     serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
1940 }
1941 
1942 /* ====================== hiredis connection handling ======================= */
1943 
1944 /* Send the AUTH command with the specified master password if needed.
1945  * Note that for slaves the password set for the master is used.
1946  *
1947  * In case this Sentinel requires a password as well, via the "requirepass"
1948  * configuration directive, we assume we should use the local password in
1949  * order to authenticate when connecting with the other Sentinels as well.
1950  * So basically all the Sentinels share the same password and use it to
1951  * authenticate reciprocally.
1952  *
1953  * We don't check at all if the command was successfully transmitted
1954  * to the instance as if it fails Sentinel will detect the instance down,
1955  * will disconnect and reconnect the link and so forth. */
sentinelSendAuthIfNeeded(sentinelRedisInstance * ri,redisAsyncContext * c)1956 void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
1957     char *auth_pass = NULL;
1958 
1959     if (ri->flags & SRI_MASTER) {
1960         auth_pass = ri->auth_pass;
1961     } else if (ri->flags & SRI_SLAVE) {
1962         auth_pass = ri->master->auth_pass;
1963     } else if (ri->flags & SRI_SENTINEL) {
1964         if (server.requirepass) auth_pass = server.requirepass;
1965     }
1966 
1967     if (auth_pass) {
1968         if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
1969             sentinelInstanceMapCommand(ri,"AUTH"),
1970             auth_pass) == C_OK) ri->link->pending_commands++;
1971     }
1972 }
1973 
1974 /* Use CLIENT SETNAME to name the connection in the Redis instance as
1975  * sentinel-<first_8_chars_of_runid>-<connection_type>
1976  * The connection type is "cmd" or "pubsub" as specified by 'type'.
1977  *
1978  * This makes it possible to list all the sentinel instances connected
1979  * to a Redis servewr with CLIENT LIST, grepping for a specific name format. */
sentinelSetClientName(sentinelRedisInstance * ri,redisAsyncContext * c,char * type)1980 void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
1981     char name[64];
1982 
1983     snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
1984     if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
1985         "%s SETNAME %s",
1986         sentinelInstanceMapCommand(ri,"CLIENT"),
1987         name) == C_OK)
1988     {
1989         ri->link->pending_commands++;
1990     }
1991 }
1992 
1993 /* Create the async connections for the instance link if the link
1994  * is disconnected. Note that link->disconnected is true even if just
1995  * one of the two links (commands and pub/sub) is missing. */
sentinelReconnectInstance(sentinelRedisInstance * ri)1996 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1997     if (ri->link->disconnected == 0) return;
1998     if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
1999     instanceLink *link = ri->link;
2000     mstime_t now = mstime();
2001 
2002     if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
2003     ri->link->last_reconn_time = now;
2004 
2005     /* Commands connection. */
2006     if (link->cc == NULL) {
2007         link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
2008         if (link->cc->err) {
2009             sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
2010                 link->cc->errstr);
2011             instanceLinkCloseConnection(link,link->cc);
2012         } else {
2013             link->pending_commands = 0;
2014             link->cc_conn_time = mstime();
2015             link->cc->data = link;
2016             redisAeAttach(server.el,link->cc);
2017             redisAsyncSetConnectCallback(link->cc,
2018                     sentinelLinkEstablishedCallback);
2019             redisAsyncSetDisconnectCallback(link->cc,
2020                     sentinelDisconnectCallback);
2021             sentinelSendAuthIfNeeded(ri,link->cc);
2022             sentinelSetClientName(ri,link->cc,"cmd");
2023 
2024             /* Send a PING ASAP when reconnecting. */
2025             sentinelSendPing(ri);
2026         }
2027     }
2028     /* Pub / Sub */
2029     if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
2030         link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
2031         if (link->pc->err) {
2032             sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
2033                 link->pc->errstr);
2034             instanceLinkCloseConnection(link,link->pc);
2035         } else {
2036             int retval;
2037 
2038             link->pc_conn_time = mstime();
2039             link->pc->data = link;
2040             redisAeAttach(server.el,link->pc);
2041             redisAsyncSetConnectCallback(link->pc,
2042                     sentinelLinkEstablishedCallback);
2043             redisAsyncSetDisconnectCallback(link->pc,
2044                     sentinelDisconnectCallback);
2045             sentinelSendAuthIfNeeded(ri,link->pc);
2046             sentinelSetClientName(ri,link->pc,"pubsub");
2047             /* Now we subscribe to the Sentinels "Hello" channel. */
2048             retval = redisAsyncCommand(link->pc,
2049                 sentinelReceiveHelloMessages, ri, "%s %s",
2050                 sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
2051                 SENTINEL_HELLO_CHANNEL);
2052             if (retval != C_OK) {
2053                 /* If we can't subscribe, the Pub/Sub connection is useless
2054                  * and we can simply disconnect it and try again. */
2055                 instanceLinkCloseConnection(link,link->pc);
2056                 return;
2057             }
2058         }
2059     }
2060     /* Clear the disconnected status only if we have both the connections
2061      * (or just the commands connection if this is a sentinel instance). */
2062     if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
2063         link->disconnected = 0;
2064 }
2065 
2066 /* ======================== Redis instances pinging  ======================== */
2067 
2068 /* Return true if master looks "sane", that is:
2069  * 1) It is actually a master in the current configuration.
2070  * 2) It reports itself as a master.
2071  * 3) It is not SDOWN or ODOWN.
2072  * 4) We obtained last INFO no more than two times the INFO period time ago. */
sentinelMasterLooksSane(sentinelRedisInstance * master)2073 int sentinelMasterLooksSane(sentinelRedisInstance *master) {
2074     return
2075         master->flags & SRI_MASTER &&
2076         master->role_reported == SRI_MASTER &&
2077         (master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 &&
2078         (mstime() - master->info_refresh) < SENTINEL_INFO_PERIOD*2;
2079 }
2080 
2081 /* Process the INFO output from masters. */
sentinelRefreshInstanceInfo(sentinelRedisInstance * ri,const char * info)2082 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
2083     sds *lines;
2084     int numlines, j;
2085     int role = 0;
2086 
2087     /* cache full INFO output for instance */
2088     sdsfree(ri->info);
2089     ri->info = sdsnew(info);
2090 
2091     /* The following fields must be reset to a given value in the case they
2092      * are not found at all in the INFO output. */
2093     ri->master_link_down_time = 0;
2094 
2095     /* Process line by line. */
2096     lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
2097     for (j = 0; j < numlines; j++) {
2098         sentinelRedisInstance *slave;
2099         sds l = lines[j];
2100 
2101         /* run_id:<40 hex chars>*/
2102         if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
2103             if (ri->runid == NULL) {
2104                 ri->runid = sdsnewlen(l+7,40);
2105             } else {
2106                 if (strncmp(ri->runid,l+7,40) != 0) {
2107                     sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
2108                     sdsfree(ri->runid);
2109                     ri->runid = sdsnewlen(l+7,40);
2110                 }
2111             }
2112         }
2113 
2114         /* old versions: slave0:<ip>,<port>,<state>
2115          * new versions: slave0:ip=127.0.0.1,port=9999,... */
2116         if ((ri->flags & SRI_MASTER) &&
2117             sdslen(l) >= 7 &&
2118             !memcmp(l,"slave",5) && isdigit(l[5]))
2119         {
2120             char *ip, *port, *end;
2121 
2122             if (strstr(l,"ip=") == NULL) {
2123                 /* Old format. */
2124                 ip = strchr(l,':'); if (!ip) continue;
2125                 ip++; /* Now ip points to start of ip address. */
2126                 port = strchr(ip,','); if (!port) continue;
2127                 *port = '\0'; /* nul term for easy access. */
2128                 port++; /* Now port points to start of port number. */
2129                 end = strchr(port,','); if (!end) continue;
2130                 *end = '\0'; /* nul term for easy access. */
2131             } else {
2132                 /* New format. */
2133                 ip = strstr(l,"ip="); if (!ip) continue;
2134                 ip += 3; /* Now ip points to start of ip address. */
2135                 port = strstr(l,"port="); if (!port) continue;
2136                 port += 5; /* Now port points to start of port number. */
2137                 /* Nul term both fields for easy access. */
2138                 end = strchr(ip,','); if (end) *end = '\0';
2139                 end = strchr(port,','); if (end) *end = '\0';
2140             }
2141 
2142             /* Check if we already have this slave into our table,
2143              * otherwise add it. */
2144             if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
2145                 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
2146                             atoi(port), ri->quorum, ri)) != NULL)
2147                 {
2148                     sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
2149                     sentinelFlushConfig();
2150                 }
2151             }
2152         }
2153 
2154         /* master_link_down_since_seconds:<seconds> */
2155         if (sdslen(l) >= 32 &&
2156             !memcmp(l,"master_link_down_since_seconds",30))
2157         {
2158             ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
2159         }
2160 
2161         /* role:<role> */
2162         if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
2163         else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
2164 
2165         if (role == SRI_SLAVE) {
2166             /* master_host:<host> */
2167             if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
2168                 if (ri->slave_master_host == NULL ||
2169                     strcasecmp(l+12,ri->slave_master_host))
2170                 {
2171                     sdsfree(ri->slave_master_host);
2172                     ri->slave_master_host = sdsnew(l+12);
2173                     ri->slave_conf_change_time = mstime();
2174                 }
2175             }
2176 
2177             /* master_port:<port> */
2178             if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
2179                 int slave_master_port = atoi(l+12);
2180 
2181                 if (ri->slave_master_port != slave_master_port) {
2182                     ri->slave_master_port = slave_master_port;
2183                     ri->slave_conf_change_time = mstime();
2184                 }
2185             }
2186 
2187             /* master_link_status:<status> */
2188             if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
2189                 ri->slave_master_link_status =
2190                     (strcasecmp(l+19,"up") == 0) ?
2191                     SENTINEL_MASTER_LINK_STATUS_UP :
2192                     SENTINEL_MASTER_LINK_STATUS_DOWN;
2193             }
2194 
2195             /* slave_priority:<priority> */
2196             if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
2197                 ri->slave_priority = atoi(l+15);
2198 
2199             /* slave_repl_offset:<offset> */
2200             if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
2201                 ri->slave_repl_offset = strtoull(l+18,NULL,10);
2202         }
2203     }
2204     ri->info_refresh = mstime();
2205     sdsfreesplitres(lines,numlines);
2206 
2207     /* ---------------------------- Acting half -----------------------------
2208      * Some things will not happen if sentinel.tilt is true, but some will
2209      * still be processed. */
2210 
2211     /* Remember when the role changed. */
2212     if (role != ri->role_reported) {
2213         ri->role_reported_time = mstime();
2214         ri->role_reported = role;
2215         if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
2216         /* Log the event with +role-change if the new role is coherent or
2217          * with -role-change if there is a mismatch with the current config. */
2218         sentinelEvent(LL_VERBOSE,
2219             ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
2220             "+role-change" : "-role-change",
2221             ri, "%@ new reported role is %s",
2222             role == SRI_MASTER ? "master" : "slave",
2223             ri->flags & SRI_MASTER ? "master" : "slave");
2224     }
2225 
2226     /* None of the following conditions are processed when in tilt mode, so
2227      * return asap. */
2228     if (sentinel.tilt) return;
2229 
2230     /* Handle master -> slave role switch. */
2231     if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
2232         /* Nothing to do, but masters claiming to be slaves are
2233          * considered to be unreachable by Sentinel, so eventually
2234          * a failover will be triggered. */
2235     }
2236 
2237     /* Handle slave -> master role switch. */
2238     if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
2239         /* If this is a promoted slave we can change state to the
2240          * failover state machine. */
2241         if ((ri->flags & SRI_PROMOTED) &&
2242             (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2243             (ri->master->failover_state ==
2244                 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
2245         {
2246             /* Now that we are sure the slave was reconfigured as a master
2247              * set the master configuration epoch to the epoch we won the
2248              * election to perform this failover. This will force the other
2249              * Sentinels to update their config (assuming there is not
2250              * a newer one already available). */
2251             ri->master->config_epoch = ri->master->failover_epoch;
2252             ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2253             ri->master->failover_state_change_time = mstime();
2254             sentinelFlushConfig();
2255             sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
2256             if (sentinel.simfailure_flags &
2257                 SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
2258                 sentinelSimFailureCrash();
2259             sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
2260                 ri->master,"%@");
2261             sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
2262                 "start",ri->master->addr,ri->addr);
2263             sentinelForceHelloUpdateForMaster(ri->master);
2264         } else {
2265             /* A slave turned into a master. We want to force our view and
2266              * reconfigure as slave. Wait some time after the change before
2267              * going forward, to receive new configs if any. */
2268             mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
2269 
2270             if (!(ri->flags & SRI_PROMOTED) &&
2271                  sentinelMasterLooksSane(ri->master) &&
2272                  sentinelRedisInstanceNoDownFor(ri,wait_time) &&
2273                  mstime() - ri->role_reported_time > wait_time)
2274             {
2275                 int retval = sentinelSendSlaveOf(ri,
2276                         ri->master->addr->ip,
2277                         ri->master->addr->port);
2278                 if (retval == C_OK)
2279                     sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
2280             }
2281         }
2282     }
2283 
2284     /* Handle slaves replicating to a different master address. */
2285     if ((ri->flags & SRI_SLAVE) &&
2286         role == SRI_SLAVE &&
2287         (ri->slave_master_port != ri->master->addr->port ||
2288          strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
2289     {
2290         mstime_t wait_time = ri->master->failover_timeout;
2291 
2292         /* Make sure the master is sane before reconfiguring this instance
2293          * into a slave. */
2294         if (sentinelMasterLooksSane(ri->master) &&
2295             sentinelRedisInstanceNoDownFor(ri,wait_time) &&
2296             mstime() - ri->slave_conf_change_time > wait_time)
2297         {
2298             int retval = sentinelSendSlaveOf(ri,
2299                     ri->master->addr->ip,
2300                     ri->master->addr->port);
2301             if (retval == C_OK)
2302                 sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
2303         }
2304     }
2305 
2306     /* Detect if the slave that is in the process of being reconfigured
2307      * changed state. */
2308     if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
2309         (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
2310     {
2311         /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
2312         if ((ri->flags & SRI_RECONF_SENT) &&
2313             ri->slave_master_host &&
2314             strcmp(ri->slave_master_host,
2315                     ri->master->promoted_slave->addr->ip) == 0 &&
2316             ri->slave_master_port == ri->master->promoted_slave->addr->port)
2317         {
2318             ri->flags &= ~SRI_RECONF_SENT;
2319             ri->flags |= SRI_RECONF_INPROG;
2320             sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
2321         }
2322 
2323         /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
2324         if ((ri->flags & SRI_RECONF_INPROG) &&
2325             ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
2326         {
2327             ri->flags &= ~SRI_RECONF_INPROG;
2328             ri->flags |= SRI_RECONF_DONE;
2329             sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
2330         }
2331     }
2332 }
2333 
/* Callback for the replies to the INFO commands we send to instances.
 * 'privdata' is the instance the INFO was sent to, while the link is taken
 * from the async context. Calls without a reply or without a link are
 * ignored; otherwise the pending commands counter of the link is
 * decremented and string replies are handed to
 * sentinelRefreshInstanceInfo(). */
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *instance = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;

    if (r == NULL || link == NULL) return;
    link->pending_commands--;

    /* Replies of any other type are silently discarded. */
    if (r->type != REDIS_REPLY_STRING) return;
    sentinelRefreshInstanceInfo(instance,r->str);
}
2346 
2347 /* Just discard the reply. We use this when we are not monitoring the return
2348  * value of the command but its effects directly. */
sentinelDiscardReplyCallback(redisAsyncContext * c,void * reply,void * privdata)2349 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
2350     instanceLink *link = c->data;
2351     UNUSED(reply);
2352     UNUSED(privdata);
2353 
2354     if (link) link->pending_commands--;
2355 }
2356 
/* Callback for the replies to the PING commands we send to instances.
 * 'privdata' is the pinged instance, the link is taken from the async
 * context. Calls without a reply or without a link are ignored.
 *
 * Status and error replies starting with PONG, LOADING or MASTERDOWN are
 * considered acceptable: they refresh last_avail_time and clear
 * act_ping_time. A reply starting with BUSY, received from an instance
 * flagged as SDOWN, triggers a single SCRIPT KILL. In every case
 * last_pong_time is refreshed. */
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;

    if (r == NULL || link == NULL) return;
    link->pending_commands--;

    if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) {
        /* Only the following replies update the "instance available"
         * field. */
        int acceptable = !strncmp(r->str,"PONG",4) ||
                         !strncmp(r->str,"LOADING",7) ||
                         !strncmp(r->str,"MASTERDOWN",10);

        if (acceptable) {
            link->last_avail_time = mstime();
            link->act_ping_time = 0; /* Flag the pong as received. */
        } else if (!strncmp(r->str,"BUSY",4) &&
                   (ri->flags & SRI_S_DOWN) &&
                   !(ri->flags & SRI_SCRIPT_KILL_SENT))
        {
            /* The instance appears to be down because of a busy script:
             * send a SCRIPT KILL command. The flag is set even if the
             * command was not queued. */
            int queued = redisAsyncCommand(ri->link->cc,
                            sentinelDiscardReplyCallback, ri,
                            "%s KILL",
                            sentinelInstanceMapCommand(ri,"SCRIPT"));

            if (queued == C_OK) ri->link->pending_commands++;
            ri->flags |= SRI_SCRIPT_KILL_SENT;
        }
    }
    link->last_pong_time = mstime();
}
2396 
2397 /* This is called when we get the reply about the PUBLISH command we send
2398  * to the master to advertise this sentinel. */
sentinelPublishReplyCallback(redisAsyncContext * c,void * reply,void * privdata)2399 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
2400     sentinelRedisInstance *ri = privdata;
2401     instanceLink *link = c->data;
2402     redisReply *r;
2403 
2404     if (!reply || !link) return;
2405     link->pending_commands--;
2406     r = reply;
2407 
2408     /* Only update pub_time if we actually published our message. Otherwise
2409      * we'll retry again in 100 milliseconds. */
2410     if (r->type != REDIS_REPLY_ERROR)
2411         ri->last_pub_time = mstime();
2412 }
2413 
2414 /* Process an hello message received via Pub/Sub in master or slave instance,
2415  * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
2416  *
2417  * If the master name specified in the message is not known, the message is
2418  * discarded. */
sentinelProcessHelloMessage(char * hello,int hello_len)2419 void sentinelProcessHelloMessage(char *hello, int hello_len) {
2420     /* Format is composed of 8 tokens:
2421      * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
2422      * 5=master_ip,6=master_port,7=master_config_epoch. */
2423     int numtokens, port, removed, master_port;
2424     uint64_t current_epoch, master_config_epoch;
2425     char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
2426     sentinelRedisInstance *si, *master;
2427 
2428     if (numtokens == 8) {
2429         /* Obtain a reference to the master this hello message is about */
2430         master = sentinelGetMasterByName(token[4]);
2431         if (!master) goto cleanup; /* Unknown master, skip the message. */
2432 
2433         /* First, try to see if we already have this sentinel. */
2434         port = atoi(token[1]);
2435         master_port = atoi(token[6]);
2436         si = getSentinelRedisInstanceByAddrAndRunID(
2437                         master->sentinels,token[0],port,token[2]);
2438         current_epoch = strtoull(token[3],NULL,10);
2439         master_config_epoch = strtoull(token[7],NULL,10);
2440 
2441         if (!si) {
2442             /* If not, remove all the sentinels that have the same runid
2443              * because there was an address change, and add the same Sentinel
2444              * with the new address back. */
2445             removed = removeMatchingSentinelFromMaster(master,token[2]);
2446             if (removed) {
2447                 sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
2448                     "%@ ip %s port %d for %s", token[0],port,token[2]);
2449             } else {
2450                 /* Check if there is another Sentinel with the same address this
2451                  * new one is reporting. What we do if this happens is to set its
2452                  * port to 0, to signal the address is invalid. We'll update it
2453                  * later if we get an HELLO message. */
2454                 sentinelRedisInstance *other =
2455                     getSentinelRedisInstanceByAddrAndRunID(
2456                         master->sentinels, token[0],port,NULL);
2457                 if (other) {
2458                     sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
2459                     other->addr->port = 0; /* It means: invalid address. */
2460                     sentinelUpdateSentinelAddressInAllMasters(other);
2461                 }
2462             }
2463 
2464             /* Add the new sentinel. */
2465             si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
2466                             token[0],port,master->quorum,master);
2467 
2468             if (si) {
2469                 if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
2470                 /* The runid is NULL after a new instance creation and
2471                  * for Sentinels we don't have a later chance to fill it,
2472                  * so do it now. */
2473                 si->runid = sdsnew(token[2]);
2474                 sentinelTryConnectionSharing(si);
2475                 if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
2476                 sentinelFlushConfig();
2477             }
2478         }
2479 
2480         /* Update local current_epoch if received current_epoch is greater.*/
2481         if (current_epoch > sentinel.current_epoch) {
2482             sentinel.current_epoch = current_epoch;
2483             sentinelFlushConfig();
2484             sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
2485                 (unsigned long long) sentinel.current_epoch);
2486         }
2487 
2488         /* Update master info if received configuration is newer. */
2489         if (si && master->config_epoch < master_config_epoch) {
2490             master->config_epoch = master_config_epoch;
2491             if (master_port != master->addr->port ||
2492                 strcmp(master->addr->ip, token[5]))
2493             {
2494                 sentinelAddr *old_addr;
2495 
2496                 sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
2497                 sentinelEvent(LL_WARNING,"+switch-master",
2498                     master,"%s %s %d %s %d",
2499                     master->name,
2500                     master->addr->ip, master->addr->port,
2501                     token[5], master_port);
2502 
2503                 old_addr = dupSentinelAddr(master->addr);
2504                 sentinelResetMasterAndChangeAddress(master, token[5], master_port);
2505                 sentinelCallClientReconfScript(master,
2506                     SENTINEL_OBSERVER,"start",
2507                     old_addr,master->addr);
2508                 releaseSentinelAddr(old_addr);
2509             }
2510         }
2511 
2512         /* Update the state of the Sentinel. */
2513         if (si) si->last_hello_time = mstime();
2514     }
2515 
2516 cleanup:
2517     sdsfreesplitres(token,numtokens);
2518 }
2519 
2520 
2521 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
2522  * to discover other sentinels attached at the same master. */
sentinelReceiveHelloMessages(redisAsyncContext * c,void * reply,void * privdata)2523 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
2524     sentinelRedisInstance *ri = privdata;
2525     redisReply *r;
2526     UNUSED(c);
2527 
2528     if (!reply || !ri) return;
2529     r = reply;
2530 
2531     /* Update the last activity in the pubsub channel. Note that since we
2532      * receive our messages as well this timestamp can be used to detect
2533      * if the link is probably disconnected even if it seems otherwise. */
2534     ri->link->pc_last_activity = mstime();
2535 
2536     /* Sanity check in the reply we expect, so that the code that follows
2537      * can avoid to check for details. */
2538     if (r->type != REDIS_REPLY_ARRAY ||
2539         r->elements != 3 ||
2540         r->element[0]->type != REDIS_REPLY_STRING ||
2541         r->element[1]->type != REDIS_REPLY_STRING ||
2542         r->element[2]->type != REDIS_REPLY_STRING ||
2543         strcmp(r->element[0]->str,"message") != 0) return;
2544 
2545     /* We are not interested in meeting ourselves */
2546     if (strstr(r->element[2]->str,sentinel.myid) != NULL) return;
2547 
2548     sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
2549 }
2550 
2551 /* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
2552  * instance in order to broadcast the current configuration for this
2553  * master, and to advertise the existence of this Sentinel at the same time.
2554  *
2555  * The message has the following format:
2556  *
2557  * sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
2558  * master_name,master_ip,master_port,master_config_epoch.
2559  *
2560  * Returns C_OK if the PUBLISH was queued correctly, otherwise
2561  * C_ERR is returned. */
sentinelSendHello(sentinelRedisInstance * ri)2562 int sentinelSendHello(sentinelRedisInstance *ri) {
2563     char ip[NET_IP_STR_LEN];
2564     char payload[NET_IP_STR_LEN+1024];
2565     int retval;
2566     char *announce_ip;
2567     int announce_port;
2568     sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
2569     sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
2570 
2571     if (ri->link->disconnected) return C_ERR;
2572 
2573     /* Use the specified announce address if specified, otherwise try to
2574      * obtain our own IP address. */
2575     if (sentinel.announce_ip) {
2576         announce_ip = sentinel.announce_ip;
2577     } else {
2578         if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
2579             return C_ERR;
2580         announce_ip = ip;
2581     }
2582     announce_port = sentinel.announce_port ?
2583                     sentinel.announce_port : server.port;
2584 
2585     /* Format and send the Hello message. */
2586     snprintf(payload,sizeof(payload),
2587         "%s,%d,%s,%llu," /* Info about this sentinel. */
2588         "%s,%s,%d,%llu", /* Info about current master. */
2589         announce_ip, announce_port, sentinel.myid,
2590         (unsigned long long) sentinel.current_epoch,
2591         /* --- */
2592         master->name,master_addr->ip,master_addr->port,
2593         (unsigned long long) master->config_epoch);
2594     retval = redisAsyncCommand(ri->link->cc,
2595         sentinelPublishReplyCallback, ri, "%s %s %s",
2596         sentinelInstanceMapCommand(ri,"PUBLISH"),
2597         SENTINEL_HELLO_CHANNEL,payload);
2598     if (retval != C_OK) return C_ERR;
2599     ri->link->pending_commands++;
2600     return C_OK;
2601 }
2602 
2603 /* Reset last_pub_time in all the instances in the specified dictionary
2604  * in order to force the delivery of an Hello update ASAP. */
sentinelForceHelloUpdateDictOfRedisInstances(dict * instances)2605 void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) {
2606     dictIterator *di;
2607     dictEntry *de;
2608 
2609     di = dictGetSafeIterator(instances);
2610     while((de = dictNext(di)) != NULL) {
2611         sentinelRedisInstance *ri = dictGetVal(de);
2612         if (ri->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
2613             ri->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
2614     }
2615     dictReleaseIterator(di);
2616 }
2617 
2618 /* This function forces the delivery of an "Hello" message (see
2619  * sentinelSendHello() top comment for further information) to all the Redis
2620  * and Sentinel instances related to the specified 'master'.
2621  *
2622  * It is technically not needed since we send an update to every instance
2623  * with a period of SENTINEL_PUBLISH_PERIOD milliseconds, however when a
2624  * Sentinel upgrades a configuration it is a good idea to deliever an update
2625  * to the other Sentinels ASAP. */
sentinelForceHelloUpdateForMaster(sentinelRedisInstance * master)2626 int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
2627     if (!(master->flags & SRI_MASTER)) return C_ERR;
2628     if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
2629         master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
2630     sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels);
2631     sentinelForceHelloUpdateDictOfRedisInstances(master->slaves);
2632     return C_OK;
2633 }
2634 
2635 /* Send a PING to the specified instance and refresh the act_ping_time
2636  * if it is zero (that is, if we received a pong for the previous ping).
2637  *
2638  * On error zero is returned, and we can't consider the PING command
2639  * queued in the connection. */
sentinelSendPing(sentinelRedisInstance * ri)2640 int sentinelSendPing(sentinelRedisInstance *ri) {
2641     int retval = redisAsyncCommand(ri->link->cc,
2642         sentinelPingReplyCallback, ri, "%s",
2643         sentinelInstanceMapCommand(ri,"PING"));
2644     if (retval == C_OK) {
2645         ri->link->pending_commands++;
2646         ri->link->last_ping_time = mstime();
2647         /* We update the active ping time only if we received the pong for
2648          * the previous ping, otherwise we are technically waiting since the
2649          * first ping that did not receive a reply. */
2650         if (ri->link->act_ping_time == 0)
2651             ri->link->act_ping_time = ri->link->last_ping_time;
2652         return 1;
2653     } else {
2654         return 0;
2655     }
2656 }
2657 
2658 /* Send periodic PING, INFO, and PUBLISH to the Hello channel to
2659  * the specified master or slave instance. */
sentinelSendPeriodicCommands(sentinelRedisInstance * ri)2660 void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
2661     mstime_t now = mstime();
2662     mstime_t info_period, ping_period;
2663     int retval;
2664 
2665     /* Return ASAP if we have already a PING or INFO already pending, or
2666      * in the case the instance is not properly connected. */
2667     if (ri->link->disconnected) return;
2668 
2669     /* For INFO, PING, PUBLISH that are not critical commands to send we
2670      * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
2671      * want to use a lot of memory just because a link is not working
2672      * properly (note that anyway there is a redundant protection about this,
2673      * that is, the link will be disconnected and reconnected if a long
2674      * timeout condition is detected. */
2675     if (ri->link->pending_commands >=
2676         SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
2677 
2678     /* If this is a slave of a master in O_DOWN condition we start sending
2679      * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
2680      * period. In this state we want to closely monitor slaves in case they
2681      * are turned into masters by another Sentinel, or by the sysadmin.
2682      *
2683      * Similarly we monitor the INFO output more often if the slave reports
2684      * to be disconnected from the master, so that we can have a fresh
2685      * disconnection time figure. */
2686     if ((ri->flags & SRI_SLAVE) &&
2687         ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
2688          (ri->master_link_down_time != 0)))
2689     {
2690         info_period = 1000;
2691     } else {
2692         info_period = SENTINEL_INFO_PERIOD;
2693     }
2694 
2695     /* We ping instances every time the last received pong is older than
2696      * the configured 'down-after-milliseconds' time, but every second
2697      * anyway if 'down-after-milliseconds' is greater than 1 second. */
2698     ping_period = ri->down_after_period;
2699     if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
2700 
2701     /* Send INFO to masters and slaves, not sentinels. */
2702     if ((ri->flags & SRI_SENTINEL) == 0 &&
2703         (ri->info_refresh == 0 ||
2704         (now - ri->info_refresh) > info_period))
2705     {
2706         retval = redisAsyncCommand(ri->link->cc,
2707             sentinelInfoReplyCallback, ri, "%s",
2708             sentinelInstanceMapCommand(ri,"INFO"));
2709         if (retval == C_OK) ri->link->pending_commands++;
2710     }
2711 
2712     /* Send PING to all the three kinds of instances. */
2713     if ((now - ri->link->last_pong_time) > ping_period &&
2714                (now - ri->link->last_ping_time) > ping_period/2) {
2715         sentinelSendPing(ri);
2716     }
2717 
2718     /* PUBLISH hello messages to all the three kinds of instances. */
2719     if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
2720         sentinelSendHello(ri);
2721     }
2722 }
2723 
2724 /* =========================== SENTINEL command ============================= */
2725 
sentinelFailoverStateStr(int state)2726 const char *sentinelFailoverStateStr(int state) {
2727     switch(state) {
2728     case SENTINEL_FAILOVER_STATE_NONE: return "none";
2729     case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
2730     case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
2731     case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
2732     case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
2733     case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
2734     case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
2735     default: return "unknown";
2736     }
2737 }
2738 
2739 /* Redis instance to Redis protocol representation. */
addReplySentinelRedisInstance(client * c,sentinelRedisInstance * ri)2740 void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) {
2741     char *flags = sdsempty();
2742     void *mbl;
2743     int fields = 0;
2744 
2745     mbl = addDeferredMultiBulkLength(c);
2746 
2747     addReplyBulkCString(c,"name");
2748     addReplyBulkCString(c,ri->name);
2749     fields++;
2750 
2751     addReplyBulkCString(c,"ip");
2752     addReplyBulkCString(c,ri->addr->ip);
2753     fields++;
2754 
2755     addReplyBulkCString(c,"port");
2756     addReplyBulkLongLong(c,ri->addr->port);
2757     fields++;
2758 
2759     addReplyBulkCString(c,"runid");
2760     addReplyBulkCString(c,ri->runid ? ri->runid : "");
2761     fields++;
2762 
2763     addReplyBulkCString(c,"flags");
2764     if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
2765     if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
2766     if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
2767     if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
2768     if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
2769     if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
2770     if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
2771     if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
2772         flags = sdscat(flags,"failover_in_progress,");
2773     if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
2774     if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
2775     if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
2776     if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
2777 
2778     if (sdslen(flags) != 0) sdsrange(flags,0,-2); /* remove last "," */
2779     addReplyBulkCString(c,flags);
2780     sdsfree(flags);
2781     fields++;
2782 
2783     addReplyBulkCString(c,"link-pending-commands");
2784     addReplyBulkLongLong(c,ri->link->pending_commands);
2785     fields++;
2786 
2787     addReplyBulkCString(c,"link-refcount");
2788     addReplyBulkLongLong(c,ri->link->refcount);
2789     fields++;
2790 
2791     if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2792         addReplyBulkCString(c,"failover-state");
2793         addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
2794         fields++;
2795     }
2796 
2797     addReplyBulkCString(c,"last-ping-sent");
2798     addReplyBulkLongLong(c,
2799         ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0);
2800     fields++;
2801 
2802     addReplyBulkCString(c,"last-ok-ping-reply");
2803     addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
2804     fields++;
2805 
2806     addReplyBulkCString(c,"last-ping-reply");
2807     addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
2808     fields++;
2809 
2810     if (ri->flags & SRI_S_DOWN) {
2811         addReplyBulkCString(c,"s-down-time");
2812         addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
2813         fields++;
2814     }
2815 
2816     if (ri->flags & SRI_O_DOWN) {
2817         addReplyBulkCString(c,"o-down-time");
2818         addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
2819         fields++;
2820     }
2821 
2822     addReplyBulkCString(c,"down-after-milliseconds");
2823     addReplyBulkLongLong(c,ri->down_after_period);
2824     fields++;
2825 
2826     /* Masters and Slaves */
2827     if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2828         addReplyBulkCString(c,"info-refresh");
2829         addReplyBulkLongLong(c,mstime() - ri->info_refresh);
2830         fields++;
2831 
2832         addReplyBulkCString(c,"role-reported");
2833         addReplyBulkCString(c, (ri->role_reported == SRI_MASTER) ? "master" :
2834                                                                    "slave");
2835         fields++;
2836 
2837         addReplyBulkCString(c,"role-reported-time");
2838         addReplyBulkLongLong(c,mstime() - ri->role_reported_time);
2839         fields++;
2840     }
2841 
2842     /* Only masters */
2843     if (ri->flags & SRI_MASTER) {
2844         addReplyBulkCString(c,"config-epoch");
2845         addReplyBulkLongLong(c,ri->config_epoch);
2846         fields++;
2847 
2848         addReplyBulkCString(c,"num-slaves");
2849         addReplyBulkLongLong(c,dictSize(ri->slaves));
2850         fields++;
2851 
2852         addReplyBulkCString(c,"num-other-sentinels");
2853         addReplyBulkLongLong(c,dictSize(ri->sentinels));
2854         fields++;
2855 
2856         addReplyBulkCString(c,"quorum");
2857         addReplyBulkLongLong(c,ri->quorum);
2858         fields++;
2859 
2860         addReplyBulkCString(c,"failover-timeout");
2861         addReplyBulkLongLong(c,ri->failover_timeout);
2862         fields++;
2863 
2864         addReplyBulkCString(c,"parallel-syncs");
2865         addReplyBulkLongLong(c,ri->parallel_syncs);
2866         fields++;
2867 
2868         if (ri->notification_script) {
2869             addReplyBulkCString(c,"notification-script");
2870             addReplyBulkCString(c,ri->notification_script);
2871             fields++;
2872         }
2873 
2874         if (ri->client_reconfig_script) {
2875             addReplyBulkCString(c,"client-reconfig-script");
2876             addReplyBulkCString(c,ri->client_reconfig_script);
2877             fields++;
2878         }
2879     }
2880 
2881     /* Only slaves */
2882     if (ri->flags & SRI_SLAVE) {
2883         addReplyBulkCString(c,"master-link-down-time");
2884         addReplyBulkLongLong(c,ri->master_link_down_time);
2885         fields++;
2886 
2887         addReplyBulkCString(c,"master-link-status");
2888         addReplyBulkCString(c,
2889             (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
2890             "ok" : "err");
2891         fields++;
2892 
2893         addReplyBulkCString(c,"master-host");
2894         addReplyBulkCString(c,
2895             ri->slave_master_host ? ri->slave_master_host : "?");
2896         fields++;
2897 
2898         addReplyBulkCString(c,"master-port");
2899         addReplyBulkLongLong(c,ri->slave_master_port);
2900         fields++;
2901 
2902         addReplyBulkCString(c,"slave-priority");
2903         addReplyBulkLongLong(c,ri->slave_priority);
2904         fields++;
2905 
2906         addReplyBulkCString(c,"slave-repl-offset");
2907         addReplyBulkLongLong(c,ri->slave_repl_offset);
2908         fields++;
2909     }
2910 
2911     /* Only sentinels */
2912     if (ri->flags & SRI_SENTINEL) {
2913         addReplyBulkCString(c,"last-hello-message");
2914         addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
2915         fields++;
2916 
2917         addReplyBulkCString(c,"voted-leader");
2918         addReplyBulkCString(c,ri->leader ? ri->leader : "?");
2919         fields++;
2920 
2921         addReplyBulkCString(c,"voted-leader-epoch");
2922         addReplyBulkLongLong(c,ri->leader_epoch);
2923         fields++;
2924     }
2925 
2926     setDeferredMultiBulkLength(c,mbl,fields*2);
2927 }
2928 
2929 /* Output a number of instances contained inside a dictionary as
2930  * Redis protocol. */
addReplyDictOfRedisInstances(client * c,dict * instances)2931 void addReplyDictOfRedisInstances(client *c, dict *instances) {
2932     dictIterator *di;
2933     dictEntry *de;
2934 
2935     di = dictGetIterator(instances);
2936     addReplyMultiBulkLen(c,dictSize(instances));
2937     while((de = dictNext(di)) != NULL) {
2938         sentinelRedisInstance *ri = dictGetVal(de);
2939 
2940         addReplySentinelRedisInstance(c,ri);
2941     }
2942     dictReleaseIterator(di);
2943 }
2944 
2945 /* Lookup the named master into sentinel.masters.
2946  * If the master is not found reply to the client with an error and returns
2947  * NULL. */
sentinelGetMasterByNameOrReplyError(client * c,robj * name)2948 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c,
2949                         robj *name)
2950 {
2951     sentinelRedisInstance *ri;
2952 
2953     ri = dictFetchValue(sentinel.masters,name->ptr);
2954     if (!ri) {
2955         addReplyError(c,"No such master with that name");
2956         return NULL;
2957     }
2958     return ri;
2959 }
2960 
2961 #define SENTINEL_ISQR_OK 0
2962 #define SENTINEL_ISQR_NOQUORUM (1<<0)
2963 #define SENTINEL_ISQR_NOAUTH (1<<1)
sentinelIsQuorumReachable(sentinelRedisInstance * master,int * usableptr)2964 int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
2965     dictIterator *di;
2966     dictEntry *de;
2967     int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */
2968     int result = SENTINEL_ISQR_OK;
2969     int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */
2970 
2971     di = dictGetIterator(master->sentinels);
2972     while((de = dictNext(di)) != NULL) {
2973         sentinelRedisInstance *ri = dictGetVal(de);
2974 
2975         if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
2976         usable++;
2977     }
2978     dictReleaseIterator(di);
2979 
2980     if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;
2981     if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
2982     if (usableptr) *usableptr = usable;
2983     return result;
2984 }
2985 
/* Implementation of the SENTINEL command.
 *
 * The subcommand name is taken from argv[1] (compared case insensitively)
 * and dispatched to the matching branch below. Each branch validates its
 * own number of arguments and jumps to 'numargserr' when it is wrong.
 * Unknown subcommands get an error reply.
 *
 * Subcommands handled here:
 *
 *   MASTERS, MASTER <name>, SLAVES|REPLICAS <master-name>,
 *   SENTINELS <master-name>,
 *   IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>,
 *   RESET <pattern>, GET-MASTER-ADDR-BY-NAME <master-name>,
 *   FAILOVER <master-name>, PENDING-SCRIPTS,
 *   MONITOR <name> <ip> <port> <quorum>, FLUSHCONFIG, REMOVE <name>,
 *   CKQUORUM <name>, SET <name> ..., INFO-CACHE [<name> ...],
 *   SIMULATE-FAILURE <flag> ... <flag>
 *
 * Every code path must produce exactly one reply for the client. */
void sentinelCommand(client *c) {
    if (!strcasecmp(c->argv[1]->ptr,"masters")) {
        /* SENTINEL MASTERS */
        if (c->argc != 2) goto numargserr;
        addReplyDictOfRedisInstances(c,sentinel.masters);
    } else if (!strcasecmp(c->argv[1]->ptr,"master")) {
        /* SENTINEL MASTER <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        addReplySentinelRedisInstance(c,ri);
    } else if (!strcasecmp(c->argv[1]->ptr,"slaves") ||
               !strcasecmp(c->argv[1]->ptr,"replicas"))
    {
        /* SENTINEL REPLICAS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->slaves);
    } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
        /* SENTINEL SENTINELS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->sentinels);
    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
         *
         * Arguments:
         *
         * ip and port are the ip and port of the master we want to be
         * checked by Sentinel. Note that the command will not check by
         * name but just by master, in theory different Sentinels may monitor
         * different masters with the same name.
         *
         * current-epoch is needed in order to understand if we are allowed
         * to vote for a failover leader or not. Each Sentinel can vote just
         * one time per epoch.
         *
         * runid is "*" if we are not seeking for a vote from the Sentinel
         * in order to elect the failover leader. Otherwise it is set to the
         * runid we want the Sentinel to vote if it did not already voted.
         */
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
            getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
                                                              != C_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;

        /* Vote for the master (or fetch the previous vote) if the request
         * includes a runid, otherwise the sender is not seeking for a vote. */
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }

        /* Reply with a three-elements multi-bulk reply:
         * down state, leader, vote epoch. */
        addReplyMultiBulkLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
        /* SENTINEL RESET <pattern> */
        if (c->argc != 3) goto numargserr;
        addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
    } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
        /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        ri = sentinelGetMasterByName(c->argv[2]->ptr);
        if (ri == NULL) {
            /* Unknown master: a null multi bulk, not an error. */
            addReply(c,shared.nullmultibulk);
        } else {
            sentinelAddr *addr = sentinelGetCurrentMasterAddress(ri);

            addReplyMultiBulkLen(c,2);
            addReplyBulkCString(c,addr->ip);
            addReplyBulkLongLong(c,addr->port);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
        /* SENTINEL FAILOVER <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
            addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
            return;
        }
        if (sentinelSelectSlave(ri) == NULL) {
            addReplySds(c,sdsnew("-NOGOODSLAVE No suitable replica to promote\r\n"));
            return;
        }
        serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
            ri->name);
        sentinelStartFailover(ri);
        ri->flags |= SRI_FORCE_FAILOVER;
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
        /* SENTINEL PENDING-SCRIPTS */

        if (c->argc != 2) goto numargserr;
        sentinelPendingScriptsCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"monitor")) {
        /* SENTINEL MONITOR <name> <ip> <port> <quorum> */
        sentinelRedisInstance *ri;
        long quorum, port;
        char ip[NET_IP_STR_LEN];

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum")
            != C_OK) return;
        if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port")
            != C_OK) return;

        if (quorum <= 0) {
            addReplyError(c, "Quorum must be 1 or greater.");
            return;
        }

        /* Make sure the IP field is actually a valid IP before passing it
         * to createSentinelRedisInstance(), otherwise we may trigger a
         * DNS lookup at runtime. */
        if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) {
            addReplyError(c,"Invalid IP address specified");
            return;
        }

        /* Parameters are valid. Try to create the master instance. */
        ri = createSentinelRedisInstance(c->argv[2]->ptr,SRI_MASTER,
                c->argv[3]->ptr,port,quorum,NULL);
        if (ri == NULL) {
            /* The failure reason is reported via errno. */
            switch(errno) {
            case EBUSY:
                addReplyError(c,"Duplicated master name");
                break;
            case EINVAL:
                addReplyError(c,"Invalid port number");
                break;
            default:
                addReplyError(c,"Unspecified error adding the instance");
                break;
            }
        } else {
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) {
        /* SENTINEL FLUSHCONFIG */
        if (c->argc != 2) goto numargserr;
        sentinelFlushConfig();
        addReply(c,shared.ok);
        return;
    } else if (!strcasecmp(c->argv[1]->ptr,"remove")) {
        /* SENTINEL REMOVE <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        /* Emit the event before deleting: 'ri' is still needed to format
         * it. */
        sentinelEvent(LL_WARNING,"-monitor",ri,"%@");
        dictDelete(sentinel.masters,c->argv[2]->ptr);
        sentinelFlushConfig();
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) {
        /* SENTINEL CKQUORUM <name> */
        sentinelRedisInstance *ri;
        int usable;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        int result = sentinelIsQuorumReachable(ri,&usable);
        if (result == SENTINEL_ISQR_OK) {
            addReplySds(c, sdscatfmt(sdsempty(),
                "+OK %i usable Sentinels. Quorum and failover authorization "
                "can be reached\r\n",usable));
        } else {
            /* Build a single error line describing every failed check. */
            sds e = sdscatfmt(sdsempty(),
                "-NOQUORUM %i usable Sentinels. ",usable);
            if (result & SENTINEL_ISQR_NOQUORUM)
                e = sdscat(e,"Not enough available Sentinels to reach the"
                             " specified quorum for this master");
            if (result & SENTINEL_ISQR_NOAUTH) {
                if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". ");
                e = sdscat(e, "Not enough available Sentinels to reach the"
                              " majority and authorize a failover");
            }
            e = sdscat(e,"\r\n");
            addReplySds(c,e);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"set")) {
        /* SENTINEL SET <name> [<option> <value> ...] */
        if (c->argc < 3) goto numargserr;
        sentinelSetCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) {
        /* SENTINEL INFO-CACHE [<name> ...]
         *
         * Without names every monitored master is reported. */
        if (c->argc < 2) goto numargserr;
        mstime_t now = mstime();

        /* Create an ad-hoc dictionary type so that we can iterate
         * a dictionary composed of just the master groups the user
         * requested. The value destructor is cleared so that releasing
         * the temporary dictionary does not free the instances, which are
         * still referenced by sentinel.masters. */
        dictType copy_keeper = instancesDictType;
        copy_keeper.valDestructor = NULL;
        dict *masters_local = sentinel.masters;
        if (c->argc > 2) {
            masters_local = dictCreate(&copy_keeper, NULL);

            for (int i = 2; i < c->argc; i++) {
                sentinelRedisInstance *ri;
                ri = sentinelGetMasterByName(c->argv[i]->ptr);
                if (!ri) continue; /* ignore non-existing names */
                dictAdd(masters_local, ri->name, ri);
            }
        }

        /* Reply format:
         *   1.) master name
         *   2.) 1.) info from master
         *       2.) info from replica
         *       ...
         *   3.) other master name
         *   ...
         */
        addReplyMultiBulkLen(c,dictSize(masters_local) * 2);

        dictIterator  *di;
        dictEntry *de;
        di = dictGetIterator(masters_local);
        while ((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            addReplyBulkCBuffer(c,ri->name,strlen(ri->name));
            addReplyMultiBulkLen(c,dictSize(ri->slaves) + 1); /* +1 for self */
            /* Each entry is: milliseconds since the info was refreshed,
             * followed by the cached info (or a null bulk if missing). */
            addReplyMultiBulkLen(c,2);
            addReplyLongLong(c, now - ri->info_refresh);
            if (ri->info)
                addReplyBulkCBuffer(c,ri->info,sdslen(ri->info));
            else
                addReply(c,shared.nullbulk);

            dictIterator *sdi;
            dictEntry *sde;
            sdi = dictGetIterator(ri->slaves);
            while ((sde = dictNext(sdi)) != NULL) {
                sentinelRedisInstance *sri = dictGetVal(sde);
                addReplyMultiBulkLen(c,2);
                addReplyLongLong(c, now - sri->info_refresh);
                if (sri->info)
                    addReplyBulkCBuffer(c,sri->info,sdslen(sri->info));
                else
                    addReply(c,shared.nullbulk);
            }
            dictReleaseIterator(sdi);
        }
        dictReleaseIterator(di);
        if (masters_local != sentinel.masters) dictRelease(masters_local);
    } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) {
        /* SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag> */
        int j;

        /* The flags given replace any previously configured one. */
        sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
        for (j = 2; j < c->argc; j++) {
            if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after being successfully elected as failover "
                    "leader");
            } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after promoting the selected replica to master");
            } else if (!strcasecmp(c->argv[j]->ptr,"help")) {
                /* The list of supported flags is the whole reply: return
                 * here, otherwise the +OK below would be sent as a second
                 * reply to the same command. */
                addReplyMultiBulkLen(c,2);
                addReplyBulkCString(c,"crash-after-election");
                addReplyBulkCString(c,"crash-after-promotion");
                return;
            } else {
                addReplyError(c,"Unknown failure simulation specified");
                return;
            }
        }
        addReply(c,shared.ok);
    } else {
        addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
                               (char*)c->argv[1]->ptr);
    }
    return;

numargserr:
    addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
                          (char*)c->argv[1]->ptr);
}
3305 
3306 #define info_section_from_redis(section_name) do { \
3307     if (defsections || allsections || !strcasecmp(section,section_name)) { \
3308         sds redissection; \
3309         if (sections++) info = sdscat(info,"\r\n"); \
3310         redissection = genRedisInfoString(section_name); \
3311         info = sdscatlen(info,redissection,sdslen(redissection)); \
3312         sdsfree(redissection); \
3313     } \
3314 } while(0)
3315 
3316 /* SENTINEL INFO [section] */
sentinelInfoCommand(client * c)3317 void sentinelInfoCommand(client *c) {
3318     if (c->argc > 2) {
3319         addReply(c,shared.syntaxerr);
3320         return;
3321     }
3322 
3323     int defsections = 0, allsections = 0;
3324     char *section = c->argc == 2 ? c->argv[1]->ptr : NULL;
3325     if (section) {
3326         allsections = !strcasecmp(section,"all");
3327         defsections = !strcasecmp(section,"default");
3328     } else {
3329         defsections = 1;
3330     }
3331 
3332     int sections = 0;
3333     sds info = sdsempty();
3334 
3335     info_section_from_redis("server");
3336     info_section_from_redis("clients");
3337     info_section_from_redis("cpu");
3338     info_section_from_redis("stats");
3339 
3340     if (defsections || allsections || !strcasecmp(section,"sentinel")) {
3341         dictIterator *di;
3342         dictEntry *de;
3343         int master_id = 0;
3344 
3345         if (sections++) info = sdscat(info,"\r\n");
3346         info = sdscatprintf(info,
3347             "# Sentinel\r\n"
3348             "sentinel_masters:%lu\r\n"
3349             "sentinel_tilt:%d\r\n"
3350             "sentinel_running_scripts:%d\r\n"
3351             "sentinel_scripts_queue_length:%ld\r\n"
3352             "sentinel_simulate_failure_flags:%lu\r\n",
3353             dictSize(sentinel.masters),
3354             sentinel.tilt,
3355             sentinel.running_scripts,
3356             listLength(sentinel.scripts_queue),
3357             sentinel.simfailure_flags);
3358 
3359         di = dictGetIterator(sentinel.masters);
3360         while((de = dictNext(di)) != NULL) {
3361             sentinelRedisInstance *ri = dictGetVal(de);
3362             char *status = "ok";
3363 
3364             if (ri->flags & SRI_O_DOWN) status = "odown";
3365             else if (ri->flags & SRI_S_DOWN) status = "sdown";
3366             info = sdscatprintf(info,
3367                 "master%d:name=%s,status=%s,address=%s:%d,"
3368                 "slaves=%lu,sentinels=%lu\r\n",
3369                 master_id++, ri->name, status,
3370                 ri->addr->ip, ri->addr->port,
3371                 dictSize(ri->slaves),
3372                 dictSize(ri->sentinels)+1);
3373         }
3374         dictReleaseIterator(di);
3375     }
3376 
3377     addReplyBulkSds(c, info);
3378 }
3379 
3380 /* Implements Sentinel version of the ROLE command. The output is
3381  * "sentinel" and the list of currently monitored master names. */
sentinelRoleCommand(client * c)3382 void sentinelRoleCommand(client *c) {
3383     dictIterator *di;
3384     dictEntry *de;
3385 
3386     addReplyMultiBulkLen(c,2);
3387     addReplyBulkCBuffer(c,"sentinel",8);
3388     addReplyMultiBulkLen(c,dictSize(sentinel.masters));
3389 
3390     di = dictGetIterator(sentinel.masters);
3391     while((de = dictNext(di)) != NULL) {
3392         sentinelRedisInstance *ri = dictGetVal(de);
3393 
3394         addReplyBulkCString(c,ri->name);
3395     }
3396     dictReleaseIterator(di);
3397 }
3398 
3399 /* SENTINEL SET <mastername> [<option> <value> ...] */
sentinelSetCommand(client * c)3400 void sentinelSetCommand(client *c) {
3401     sentinelRedisInstance *ri;
3402     int j, changes = 0;
3403     int badarg = 0; /* Bad argument position for error reporting. */
3404     char *option;
3405 
3406     if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
3407         == NULL) return;
3408 
3409     /* Process option - value pairs. */
3410     for (j = 3; j < c->argc; j++) {
3411         int moreargs = (c->argc-1) - j;
3412         option = c->argv[j]->ptr;
3413         long long ll;
3414         int old_j = j; /* Used to know what to log as an event. */
3415 
3416         if (!strcasecmp(option,"down-after-milliseconds") && moreargs > 0) {
3417             /* down-after-millisecodns <milliseconds> */
3418             robj *o = c->argv[++j];
3419             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3420                 badarg = j;
3421                 goto badfmt;
3422             }
3423             ri->down_after_period = ll;
3424             sentinelPropagateDownAfterPeriod(ri);
3425             changes++;
3426         } else if (!strcasecmp(option,"failover-timeout") && moreargs > 0) {
3427             /* failover-timeout <milliseconds> */
3428             robj *o = c->argv[++j];
3429             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3430                 badarg = j;
3431                 goto badfmt;
3432             }
3433             ri->failover_timeout = ll;
3434             changes++;
3435         } else if (!strcasecmp(option,"parallel-syncs") && moreargs > 0) {
3436             /* parallel-syncs <milliseconds> */
3437             robj *o = c->argv[++j];
3438             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3439                 badarg = j;
3440                 goto badfmt;
3441             }
3442             ri->parallel_syncs = ll;
3443             changes++;
3444         } else if (!strcasecmp(option,"notification-script") && moreargs > 0) {
3445             /* notification-script <path> */
3446             char *value = c->argv[++j]->ptr;
3447             if (sentinel.deny_scripts_reconfig) {
3448                 addReplyError(c,
3449                     "Reconfiguration of scripts path is denied for "
3450                     "security reasons. Check the deny-scripts-reconfig "
3451                     "configuration directive in your Sentinel configuration");
3452                 return;
3453             }
3454 
3455             if (strlen(value) && access(value,X_OK) == -1) {
3456                 addReplyError(c,
3457                     "Notification script seems non existing or non executable");
3458                 if (changes) sentinelFlushConfig();
3459                 return;
3460             }
3461             sdsfree(ri->notification_script);
3462             ri->notification_script = strlen(value) ? sdsnew(value) : NULL;
3463             changes++;
3464         } else if (!strcasecmp(option,"client-reconfig-script") && moreargs > 0) {
3465             /* client-reconfig-script <path> */
3466             char *value = c->argv[++j]->ptr;
3467             if (sentinel.deny_scripts_reconfig) {
3468                 addReplyError(c,
3469                     "Reconfiguration of scripts path is denied for "
3470                     "security reasons. Check the deny-scripts-reconfig "
3471                     "configuration directive in your Sentinel configuration");
3472                 return;
3473             }
3474 
3475             if (strlen(value) && access(value,X_OK) == -1) {
3476                 addReplyError(c,
3477                     "Client reconfiguration script seems non existing or "
3478                     "non executable");
3479                 if (changes) sentinelFlushConfig();
3480                 return;
3481             }
3482             sdsfree(ri->client_reconfig_script);
3483             ri->client_reconfig_script = strlen(value) ? sdsnew(value) : NULL;
3484             changes++;
3485         } else if (!strcasecmp(option,"auth-pass") && moreargs > 0) {
3486             /* auth-pass <password> */
3487             char *value = c->argv[++j]->ptr;
3488             sdsfree(ri->auth_pass);
3489             ri->auth_pass = strlen(value) ? sdsnew(value) : NULL;
3490             changes++;
3491         } else if (!strcasecmp(option,"quorum") && moreargs > 0) {
3492             /* quorum <count> */
3493             robj *o = c->argv[++j];
3494             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
3495                 badarg = j;
3496                 goto badfmt;
3497             }
3498             ri->quorum = ll;
3499             changes++;
3500         } else if (!strcasecmp(option,"rename-command") && moreargs > 1) {
3501             /* rename-command <oldname> <newname> */
3502             sds oldname = c->argv[++j]->ptr;
3503             sds newname = c->argv[++j]->ptr;
3504 
3505             if ((sdslen(oldname) == 0) || (sdslen(newname) == 0)) {
3506                 badarg = sdslen(newname) ? j-1 : j;
3507                 goto badfmt;
3508             }
3509 
3510             /* Remove any older renaming for this command. */
3511             dictDelete(ri->renamed_commands,oldname);
3512 
3513             /* If the target name is the same as the source name there
3514              * is no need to add an entry mapping to itself. */
3515             if (!dictSdsKeyCaseCompare(NULL,oldname,newname)) {
3516                 oldname = sdsdup(oldname);
3517                 newname = sdsdup(newname);
3518                 dictAdd(ri->renamed_commands,oldname,newname);
3519             }
3520             changes++;
3521         } else {
3522             addReplyErrorFormat(c,"Unknown option or number of arguments for "
3523                                   "SENTINEL SET '%s'", option);
3524             if (changes) sentinelFlushConfig();
3525             return;
3526         }
3527 
3528         /* Log the event. */
3529         int numargs = j-old_j+1;
3530         switch(numargs) {
3531         case 2:
3532             sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",c->argv[old_j]->ptr,
3533                                                           c->argv[old_j+1]->ptr);
3534             break;
3535         case 3:
3536             sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",c->argv[old_j]->ptr,
3537                                                              c->argv[old_j+1]->ptr,
3538                                                              c->argv[old_j+2]->ptr);
3539             break;
3540         default:
3541             sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",c->argv[old_j]->ptr);
3542             break;
3543         }
3544     }
3545 
3546     if (changes) sentinelFlushConfig();
3547     addReply(c,shared.ok);
3548     return;
3549 
3550 badfmt: /* Bad format errors */
3551     if (changes) sentinelFlushConfig();
3552     addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL SET '%s'",
3553         (char*)c->argv[badarg]->ptr,option);
3554 }
3555 
3556 /* Our fake PUBLISH command: it is actually useful only to receive hello messages
3557  * from the other sentinel instances, and publishing to a channel other than
3558  * SENTINEL_HELLO_CHANNEL is forbidden.
3559  *
3560  * Because we have a Sentinel PUBLISH, the code to send hello messages is the same
3561  * for all the three kind of instances: masters, slaves, sentinels. */
sentinelPublishCommand(client * c)3562 void sentinelPublishCommand(client *c) {
3563     if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
3564         addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
3565         return;
3566     }
3567     sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
3568     addReplyLongLong(c,1);
3569 }
3570 
3571 /* ===================== SENTINEL availability checks ======================= */
3572 
3573 /* Is this instance down from our point of view? */
sentinelCheckSubjectivelyDown(sentinelRedisInstance * ri)3574 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
3575     mstime_t elapsed = 0;
3576 
3577     if (ri->link->act_ping_time)
3578         elapsed = mstime() - ri->link->act_ping_time;
3579     else if (ri->link->disconnected)
3580         elapsed = mstime() - ri->link->last_avail_time;
3581 
3582     /* Check if we are in need for a reconnection of one of the
3583      * links, because we are detecting low activity.
3584      *
3585      * 1) Check if the command link seems connected, was connected not less
3586      *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
3587      *    pending ping for more than half the timeout. */
3588     if (ri->link->cc &&
3589         (mstime() - ri->link->cc_conn_time) >
3590         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
3591         ri->link->act_ping_time != 0 && /* There is a pending ping... */
3592         /* The pending ping is delayed, and we did not receive
3593          * error replies as well. */
3594         (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
3595         (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
3596     {
3597         instanceLinkCloseConnection(ri->link,ri->link->cc);
3598     }
3599 
3600     /* 2) Check if the pubsub link seems connected, was connected not less
3601      *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
3602      *    activity in the Pub/Sub channel for more than
3603      *    SENTINEL_PUBLISH_PERIOD * 3.
3604      */
3605     if (ri->link->pc &&
3606         (mstime() - ri->link->pc_conn_time) >
3607          SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
3608         (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
3609     {
3610         instanceLinkCloseConnection(ri->link,ri->link->pc);
3611     }
3612 
3613     /* Update the SDOWN flag. We believe the instance is SDOWN if:
3614      *
3615      * 1) It is not replying.
3616      * 2) We believe it is a master, it reports to be a slave for enough time
3617      *    to meet the down_after_period, plus enough time to get two times
3618      *    INFO report from the instance. */
3619     if (elapsed > ri->down_after_period ||
3620         (ri->flags & SRI_MASTER &&
3621          ri->role_reported == SRI_SLAVE &&
3622          mstime() - ri->role_reported_time >
3623           (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
3624     {
3625         /* Is subjectively down */
3626         if ((ri->flags & SRI_S_DOWN) == 0) {
3627             sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
3628             ri->s_down_since_time = mstime();
3629             ri->flags |= SRI_S_DOWN;
3630         }
3631     } else {
3632         /* Is subjectively up */
3633         if (ri->flags & SRI_S_DOWN) {
3634             sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
3635             ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
3636         }
3637     }
3638 }
3639 
3640 /* Is this instance down according to the configured quorum?
3641  *
3642  * Note that ODOWN is a weak quorum, it only means that enough Sentinels
3643  * reported in a given time range that the instance was not reachable.
3644  * However messages can be delayed so there are no strong guarantees about
3645  * N instances agreeing at the same time about the down state. */
sentinelCheckObjectivelyDown(sentinelRedisInstance * master)3646 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
3647     dictIterator *di;
3648     dictEntry *de;
3649     unsigned int quorum = 0, odown = 0;
3650 
3651     if (master->flags & SRI_S_DOWN) {
3652         /* Is down for enough sentinels? */
3653         quorum = 1; /* the current sentinel. */
3654         /* Count all the other sentinels. */
3655         di = dictGetIterator(master->sentinels);
3656         while((de = dictNext(di)) != NULL) {
3657             sentinelRedisInstance *ri = dictGetVal(de);
3658 
3659             if (ri->flags & SRI_MASTER_DOWN) quorum++;
3660         }
3661         dictReleaseIterator(di);
3662         if (quorum >= master->quorum) odown = 1;
3663     }
3664 
3665     /* Set the flag accordingly to the outcome. */
3666     if (odown) {
3667         if ((master->flags & SRI_O_DOWN) == 0) {
3668             sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
3669                 quorum, master->quorum);
3670             master->flags |= SRI_O_DOWN;
3671             master->o_down_since_time = mstime();
3672         }
3673     } else {
3674         if (master->flags & SRI_O_DOWN) {
3675             sentinelEvent(LL_WARNING,"-odown",master,"%@");
3676             master->flags &= ~SRI_O_DOWN;
3677         }
3678     }
3679 }
3680 
3681 /* Receive the SENTINEL is-master-down-by-addr reply, see the
3682  * sentinelAskMasterStateToOtherSentinels() function for more information. */
sentinelReceiveIsMasterDownReply(redisAsyncContext * c,void * reply,void * privdata)3683 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
3684     sentinelRedisInstance *ri = privdata;
3685     instanceLink *link = c->data;
3686     redisReply *r;
3687 
3688     if (!reply || !link) return;
3689     link->pending_commands--;
3690     r = reply;
3691 
3692     /* Ignore every error or unexpected reply.
3693      * Note that if the command returns an error for any reason we'll
3694      * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
3695     if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
3696         r->element[0]->type == REDIS_REPLY_INTEGER &&
3697         r->element[1]->type == REDIS_REPLY_STRING &&
3698         r->element[2]->type == REDIS_REPLY_INTEGER)
3699     {
3700         ri->last_master_down_reply_time = mstime();
3701         if (r->element[0]->integer == 1) {
3702             ri->flags |= SRI_MASTER_DOWN;
3703         } else {
3704             ri->flags &= ~SRI_MASTER_DOWN;
3705         }
3706         if (strcmp(r->element[1]->str,"*")) {
3707             /* If the runid in the reply is not "*" the Sentinel actually
3708              * replied with a vote. */
3709             sdsfree(ri->leader);
3710             if ((long long)ri->leader_epoch != r->element[2]->integer)
3711                 serverLog(LL_WARNING,
3712                     "%s voted for %s %llu", ri->name,
3713                     r->element[1]->str,
3714                     (unsigned long long) r->element[2]->integer);
3715             ri->leader = sdsnew(r->element[1]->str);
3716             ri->leader_epoch = r->element[2]->integer;
3717         }
3718     }
3719 }
3720 
3721 /* If we think the master is down, we start sending
3722  * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
3723  * in order to get the replies that allow to reach the quorum
3724  * needed to mark the master in ODOWN state and trigger a failover. */
3725 #define SENTINEL_ASK_FORCED (1<<0)
sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance * master,int flags)3726 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
3727     dictIterator *di;
3728     dictEntry *de;
3729 
3730     di = dictGetIterator(master->sentinels);
3731     while((de = dictNext(di)) != NULL) {
3732         sentinelRedisInstance *ri = dictGetVal(de);
3733         mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
3734         char port[32];
3735         int retval;
3736 
3737         /* If the master state from other sentinel is too old, we clear it. */
3738         if (elapsed > SENTINEL_ASK_PERIOD*5) {
3739             ri->flags &= ~SRI_MASTER_DOWN;
3740             sdsfree(ri->leader);
3741             ri->leader = NULL;
3742         }
3743 
3744         /* Only ask if master is down to other sentinels if:
3745          *
3746          * 1) We believe it is down, or there is a failover in progress.
3747          * 2) Sentinel is connected.
3748          * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
3749         if ((master->flags & SRI_S_DOWN) == 0) continue;
3750         if (ri->link->disconnected) continue;
3751         if (!(flags & SENTINEL_ASK_FORCED) &&
3752             mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
3753             continue;
3754 
3755         /* Ask */
3756         ll2string(port,sizeof(port),master->addr->port);
3757         retval = redisAsyncCommand(ri->link->cc,
3758                     sentinelReceiveIsMasterDownReply, ri,
3759                     "%s is-master-down-by-addr %s %s %llu %s",
3760                     sentinelInstanceMapCommand(ri,"SENTINEL"),
3761                     master->addr->ip, port,
3762                     sentinel.current_epoch,
3763                     (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
3764                     sentinel.myid : "*");
3765         if (retval == C_OK) ri->link->pending_commands++;
3766     }
3767     dictReleaseIterator(di);
3768 }
3769 
3770 /* =============================== FAILOVER ================================= */
3771 
3772 /* Crash because of user request via SENTINEL simulate-failure command. */
sentinelSimFailureCrash(void)3773 void sentinelSimFailureCrash(void) {
3774     serverLog(LL_WARNING,
3775         "Sentinel CRASH because of SENTINEL simulate-failure");
3776     exit(99);
3777 }
3778 
3779 /* Vote for the sentinel with 'req_runid' or return the old vote if already
3780  * voted for the specified 'req_epoch' or one greater.
3781  *
3782  * If a vote is not available returns NULL, otherwise return the Sentinel
3783  * runid and populate the leader_epoch with the epoch of the vote. */
sentinelVoteLeader(sentinelRedisInstance * master,uint64_t req_epoch,char * req_runid,uint64_t * leader_epoch)3784 char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
3785     if (req_epoch > sentinel.current_epoch) {
3786         sentinel.current_epoch = req_epoch;
3787         sentinelFlushConfig();
3788         sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
3789             (unsigned long long) sentinel.current_epoch);
3790     }
3791 
3792     if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
3793     {
3794         sdsfree(master->leader);
3795         master->leader = sdsnew(req_runid);
3796         master->leader_epoch = sentinel.current_epoch;
3797         sentinelFlushConfig();
3798         sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
3799             master->leader, (unsigned long long) master->leader_epoch);
3800         /* If we did not voted for ourselves, set the master failover start
3801          * time to now, in order to force a delay before we can start a
3802          * failover for the same master. */
3803         if (strcasecmp(master->leader,sentinel.myid))
3804             master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
3805     }
3806 
3807     *leader_epoch = master->leader_epoch;
3808     return master->leader ? sdsnew(master->leader) : NULL;
3809 }
3810 
/* A run ID together with a vote counter. */
struct sentinelLeader {
    char *runid;            /* Run ID string. */
    unsigned long votes;    /* Vote counter. */
};
3815 
3816 /* Helper function for sentinelGetLeader, increment the counter
3817  * relative to the specified runid. */
sentinelLeaderIncr(dict * counters,char * runid)3818 int sentinelLeaderIncr(dict *counters, char *runid) {
3819     dictEntry *existing, *de;
3820     uint64_t oldval;
3821 
3822     de = dictAddRaw(counters,runid,&existing);
3823     if (existing) {
3824         oldval = dictGetUnsignedIntegerVal(existing);
3825         dictSetUnsignedIntegerVal(existing,oldval+1);
3826         return oldval+1;
3827     } else {
3828         serverAssert(de != NULL);
3829         dictSetUnsignedIntegerVal(de,1);
3830         return 1;
3831     }
3832 }
3833 
3834 /* Scan all the Sentinels attached to this master to check if there
3835  * is a leader for the specified epoch.
3836  *
3837  * To be a leader for a given epoch, we should have the majority of
3838  * the Sentinels we know (ever seen since the last SENTINEL RESET) that
3839  * reported the same instance as leader for the same epoch. */
sentinelGetLeader(sentinelRedisInstance * master,uint64_t epoch)3840 char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
3841     dict *counters;
3842     dictIterator *di;
3843     dictEntry *de;
3844     unsigned int voters = 0, voters_quorum;
3845     char *myvote;
3846     char *winner = NULL;
3847     uint64_t leader_epoch;
3848     uint64_t max_votes = 0;
3849 
3850     serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
3851     counters = dictCreate(&leaderVotesDictType,NULL);
3852 
3853     voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/
3854 
3855     /* Count other sentinels votes */
3856     di = dictGetIterator(master->sentinels);
3857     while((de = dictNext(di)) != NULL) {
3858         sentinelRedisInstance *ri = dictGetVal(de);
3859         if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
3860             sentinelLeaderIncr(counters,ri->leader);
3861     }
3862     dictReleaseIterator(di);
3863 
3864     /* Check what's the winner. For the winner to win, it needs two conditions:
3865      * 1) Absolute majority between voters (50% + 1).
3866      * 2) And anyway at least master->quorum votes. */
3867     di = dictGetIterator(counters);
3868     while((de = dictNext(di)) != NULL) {
3869         uint64_t votes = dictGetUnsignedIntegerVal(de);
3870 
3871         if (votes > max_votes) {
3872             max_votes = votes;
3873             winner = dictGetKey(de);
3874         }
3875     }
3876     dictReleaseIterator(di);
3877 
3878     /* Count this Sentinel vote:
3879      * if this Sentinel did not voted yet, either vote for the most
3880      * common voted sentinel, or for itself if no vote exists at all. */
3881     if (winner)
3882         myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
3883     else
3884         myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
3885 
3886     if (myvote && leader_epoch == epoch) {
3887         uint64_t votes = sentinelLeaderIncr(counters,myvote);
3888 
3889         if (votes > max_votes) {
3890             max_votes = votes;
3891             winner = myvote;
3892         }
3893     }
3894 
3895     voters_quorum = voters/2+1;
3896     if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
3897         winner = NULL;
3898 
3899     winner = winner ? sdsnew(winner) : NULL;
3900     sdsfree(myvote);
3901     dictRelease(counters);
3902     return winner;
3903 }
3904 
3905 /* Send SLAVEOF to the specified instance, always followed by a
3906  * CONFIG REWRITE command in order to store the new configuration on disk
3907  * when possible (that is, if the Redis instance is recent enough to support
3908  * config rewriting, and if the server was started with a configuration file).
3909  *
3910  * If Host is NULL the function sends "SLAVEOF NO ONE".
3911  *
3912  * The command returns C_OK if the SLAVEOF command was accepted for
3913  * (later) delivery otherwise C_ERR. The command replies are just
3914  * discarded. */
sentinelSendSlaveOf(sentinelRedisInstance * ri,char * host,int port)3915 int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
3916     char portstr[32];
3917     int retval;
3918 
3919     ll2string(portstr,sizeof(portstr),port);
3920 
3921     /* If host is NULL we send SLAVEOF NO ONE that will turn the instance
3922      * into a master. */
3923     if (host == NULL) {
3924         host = "NO";
3925         memcpy(portstr,"ONE",4);
3926     }
3927 
3928     /* In order to send SLAVEOF in a safe way, we send a transaction performing
3929      * the following tasks:
3930      * 1) Reconfigure the instance according to the specified host/port params.
3931      * 2) Rewrite the configuration.
3932      * 3) Disconnect all clients (but this one sending the commnad) in order
3933      *    to trigger the ask-master-on-reconnection protocol for connected
3934      *    clients.
3935      *
3936      * Note that we don't check the replies returned by commands, since we
3937      * will observe instead the effects in the next INFO output. */
3938     retval = redisAsyncCommand(ri->link->cc,
3939         sentinelDiscardReplyCallback, ri, "%s",
3940         sentinelInstanceMapCommand(ri,"MULTI"));
3941     if (retval == C_ERR) return retval;
3942     ri->link->pending_commands++;
3943 
3944     retval = redisAsyncCommand(ri->link->cc,
3945         sentinelDiscardReplyCallback, ri, "%s %s %s",
3946         sentinelInstanceMapCommand(ri,"SLAVEOF"),
3947         host, portstr);
3948     if (retval == C_ERR) return retval;
3949     ri->link->pending_commands++;
3950 
3951     retval = redisAsyncCommand(ri->link->cc,
3952         sentinelDiscardReplyCallback, ri, "%s REWRITE",
3953         sentinelInstanceMapCommand(ri,"CONFIG"));
3954     if (retval == C_ERR) return retval;
3955     ri->link->pending_commands++;
3956 
3957     /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
3958      * however sending it to an instance not understanding this command is not
3959      * an issue because CLIENT is variadic command, so Redis will not
3960      * recognized as a syntax error, and the transaction will not fail (but
3961      * only the unsupported command will fail). */
3962     retval = redisAsyncCommand(ri->link->cc,
3963         sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal",
3964         sentinelInstanceMapCommand(ri,"CLIENT"));
3965     if (retval == C_ERR) return retval;
3966     ri->link->pending_commands++;
3967 
3968     retval = redisAsyncCommand(ri->link->cc,
3969         sentinelDiscardReplyCallback, ri, "%s",
3970         sentinelInstanceMapCommand(ri,"EXEC"));
3971     if (retval == C_ERR) return retval;
3972     ri->link->pending_commands++;
3973 
3974     return C_OK;
3975 }
3976 
3977 /* Setup the master state to start a failover. */
sentinelStartFailover(sentinelRedisInstance * master)3978 void sentinelStartFailover(sentinelRedisInstance *master) {
3979     serverAssert(master->flags & SRI_MASTER);
3980 
3981     master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
3982     master->flags |= SRI_FAILOVER_IN_PROGRESS;
3983     master->failover_epoch = ++sentinel.current_epoch;
3984     sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
3985         (unsigned long long) sentinel.current_epoch);
3986     sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
3987     master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
3988     master->failover_state_change_time = mstime();
3989 }
3990 
3991 /* This function checks if there are the conditions to start the failover,
3992  * that is:
3993  *
3994  * 1) Master must be in ODOWN condition.
3995  * 2) No failover already in progress.
3996  * 3) No failover already attempted recently.
3997  *
3998  * We still don't know if we'll win the election so it is possible that we
3999  * start the failover but that we'll not be able to act.
4000  *
4001  * Return non-zero if a failover was started. */
sentinelStartFailoverIfNeeded(sentinelRedisInstance * master)4002 int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
4003     /* We can't failover if the master is not in O_DOWN state. */
4004     if (!(master->flags & SRI_O_DOWN)) return 0;
4005 
4006     /* Failover already in progress? */
4007     if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
4008 
4009     /* Last failover attempt started too little time ago? */
4010     if (mstime() - master->failover_start_time <
4011         master->failover_timeout*2)
4012     {
4013         if (master->failover_delay_logged != master->failover_start_time) {
4014             time_t clock = (master->failover_start_time +
4015                             master->failover_timeout*2) / 1000;
4016             char ctimebuf[26];
4017 
4018             ctime_r(&clock,ctimebuf);
4019             ctimebuf[24] = '\0'; /* Remove newline. */
4020             master->failover_delay_logged = master->failover_start_time;
4021             serverLog(LL_WARNING,
4022                 "Next failover delay: I will not start a failover before %s",
4023                 ctimebuf);
4024         }
4025         return 0;
4026     }
4027 
4028     sentinelStartFailover(master);
4029     return 1;
4030 }
4031 
/* Select a suitable slave to promote. The current algorithm only uses
 * the following parameters:
 *
 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
 * 2) Last time the slave replied to ping no more than 5 times the PING period.
 * 3) info_refresh not older than 3 times the INFO refresh period.
 * 4) master_link_down_time no more than:
 *     (now - master->s_down_since_time) + (master->down_after_period * 10).
 *    Basically since the master is down from our POV, the slave reports
 *    to be disconnected no more than 10 times the configured down-after-period.
 *    This is pretty much black magic but the idea is, the master was not
 *    available so the slave may be lagging, but not over a certain time.
 *    Anyway we'll select the best slave according to replication offset.
 * 5) Slave priority can't be zero, otherwise the slave is discarded.
 *
 * Among all the slaves matching the above conditions we select the slave
 * with, in order of sorting key:
 *
 * - lower slave_priority.
 * - bigger processed replication offset.
 * - lexicographically smaller runid.
 *
 * Basically if the priority is the same, the slave that processed more
 * commands from the master is selected.
 *
 * The function returns the pointer to the selected slave, otherwise
 * NULL if no suitable slave was found.
 */
4060 
4061 /* Helper for sentinelSelectSlave(). This is used by qsort() in order to
4062  * sort suitable slaves in a "better first" order, to take the first of
4063  * the list. */
compareSlavesForPromotion(const void * a,const void * b)4064 int compareSlavesForPromotion(const void *a, const void *b) {
4065     sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
4066                           **sb = (sentinelRedisInstance **)b;
4067     char *sa_runid, *sb_runid;
4068 
4069     if ((*sa)->slave_priority != (*sb)->slave_priority)
4070         return (*sa)->slave_priority - (*sb)->slave_priority;
4071 
4072     /* If priority is the same, select the slave with greater replication
4073      * offset (processed more data from the master). */
4074     if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
4075         return -1; /* a < b */
4076     } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
4077         return 1; /* a > b */
4078     }
4079 
4080     /* If the replication offset is the same select the slave with that has
4081      * the lexicographically smaller runid. Note that we try to handle runid
4082      * == NULL as there are old Redis versions that don't publish runid in
4083      * INFO. A NULL runid is considered bigger than any other runid. */
4084     sa_runid = (*sa)->runid;
4085     sb_runid = (*sb)->runid;
4086     if (sa_runid == NULL && sb_runid == NULL) return 0;
4087     else if (sa_runid == NULL) return 1;  /* a > b */
4088     else if (sb_runid == NULL) return -1; /* a < b */
4089     return strcasecmp(sa_runid, sb_runid);
4090 }
4091 
/* Return the best slave of 'master' to promote, or NULL when none of the
 * slaves is eligible. Eligible slaves are collected into a temporary array
 * which is then sorted with compareSlavesForPromotion(): the first element
 * is the selected one. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
    sentinelRedisInstance *best = NULL;
    int count = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = master->down_after_period * 10;
    mstime_t info_validity_time;

    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;

    /* If the master is in SDOWN state we get INFO for slaves every second.
     * Otherwise we get it with the usual period so we need to account for
     * a larger delay. */
    info_validity_time = (master->flags & SRI_S_DOWN) ?
                         SENTINEL_PING_PERIOD*5 : SENTINEL_INFO_PERIOD*3;

    di = dictGetIterator(master->slaves);
    for (de = dictNext(di); de != NULL; de = dictNext(di)) {
        sentinelRedisInstance *slave = dictGetVal(de);

        /* Skip slaves that are down, disconnected, not available for too
         * long, or with priority zero. */
        if ((slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) ||
            slave->link->disconnected ||
            mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5 ||
            slave->slave_priority == 0)
        {
            continue;
        }

        /* Skip slaves with a stale INFO, or reporting the link with the
         * master as down for too long. */
        if (mstime() - slave->info_refresh > info_validity_time ||
            slave->master_link_down_time > max_master_down_time)
        {
            continue;
        }

        candidates[count++] = slave;
    }
    dictReleaseIterator(di);

    if (count > 0) {
        qsort(candidates,count,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        best = candidates[0];
    }
    zfree(candidates);
    return best;
}
4135 
4136 /* ---------------- Failover state machine implementation ------------------- */
/* WAIT_START state: go on with the failover only once this Sentinel is the
 * elected leader for the failover epoch (or the failover is a forced one),
 * aborting if the election does not succeed in time. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader = sentinelGetLeader(ri, ri->failover_epoch);
    int isleader = (leader != NULL) && (strcasecmp(leader,sentinel.myid) == 0);

    sdsfree(leader);

    /* Being the leader, or running a failover forced via SENTINEL FAILOVER,
     * is enough to move to the slave selection. */
    if (isleader || (ri->flags & SRI_FORCE_FAILOVER)) {
        sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
        if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
            sentinelSimFailureCrash();
        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
        return;
    }

    /* Not the leader: keep waiting, but give up after the election timeout,
     * which is the MIN between SENTINEL_ELECTION_TIMEOUT and the configured
     * failover timeout. */
    {
        int max_wait = SENTINEL_ELECTION_TIMEOUT;

        if (max_wait > ri->failover_timeout)
            max_wait = ri->failover_timeout;
        if (mstime() - ri->failover_start_time > max_wait) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
    }
}
4169 
/* SELECT_SLAVE state: pick the slave to promote. No timeout is handled
 * here since the function either aborts the failover or moves straight to
 * the next state. */
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *chosen = sentinelSelectSlave(ri);

    if (chosen == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
        return;
    }

    /* Flag the slave as promoted and remember it in the master. */
    sentinelEvent(LL_WARNING,"+selected-slave",chosen,"%@");
    chosen->flags |= SRI_PROMOTED;
    ri->promoted_slave = chosen;
    ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
        chosen, "%@");
}
4188 
/* SEND_SLAVEOF_NOONE state: ask the promoted slave to become a master. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    sentinelRedisInstance *promoted = ri->promoted_slave;

    /* Nothing can be sent while the promoted slave is disconnected: stay
     * in this state and retry at the next call, aborting the failover
     * once the failover timeout elapsed. */
    if (promoted->link->disconnected) {
        mstime_t waited = mstime() - ri->failover_state_change_time;

        if (waited > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* SLAVEOF NO ONE turns the slave into a master. The reply is not
     * inspected: whether it worked is detected indirectly, observing INFO
     * reporting a different role (master instead of slave). If the command
     * can't be queued the state is left unchanged. */
    if (sentinelSendSlaveOf(promoted,NULL,0) != C_OK) return;

    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        promoted,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}
4214 
4215 /* We actually wait for promotion indirectly checking with INFO when the
4216  * slave turns into a master. */
sentinelFailoverWaitPromotion(sentinelRedisInstance * ri)4217 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
4218     /* Just handle the timeout. Switching to the next state is handled
4219      * by the function parsing the INFO command of the promoted slave. */
4220     if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
4221         sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
4222         sentinelAbortFailover(ri);
4223     }
4224 }
4225 
/* Check if the slaves reconfiguration phase of the failover of 'master'
 * is over, and if so move the failover to
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG.
 *
 * The phase is considered finished when either:
 *
 * 1) Every slave that is not flagged SRI_S_DOWN is either the promoted
 *    slave or already flagged SRI_RECONF_DONE.
 * 2) More than master->failover_timeout milliseconds elapsed since the
 *    last failover state change. In this case a best effort SLAVEOF is
 *    also sent to the slaves that were not reconfigured yet.
 *
 * Nothing is done while there is no promoted slave or while the promoted
 * slave is flagged SRI_S_DOWN. */
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider failover finished if the promoted slave is
     * not reachable. */
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    /* The failover terminates once all the reachable slaves are properly
     * configured. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    /* Force end of failover on timeout. */
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    /* When the failover ended because of the timeout, send a best effort
     * SLAVEOF command to all the connected slaves still not reconfigured,
     * so that they replicate with the new master. */
    if (timeout) {
        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;

            /* The promoted slave is the new master: it must never be told
             * to replicate from its own address, so skip it here exactly
             * as the counting loop above does. */
            if (slave->flags &
                (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            if (slave->link->disconnected) continue;

            retval = sentinelSendSlaveOf(slave,
                    master->promoted_slave->addr->ip,
                    master->promoted_slave->addr->port);
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}
4288 
4289 /* Send SLAVE OF <new master address> to all the remaining slaves that
4290  * still don't appear to have the configuration updated. */
sentinelFailoverReconfNextSlave(sentinelRedisInstance * master)4291 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
4292     dictIterator *di;
4293     dictEntry *de;
4294     int in_progress = 0;
4295 
4296     di = dictGetIterator(master->slaves);
4297     while((de = dictNext(di)) != NULL) {
4298         sentinelRedisInstance *slave = dictGetVal(de);
4299 
4300         if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
4301             in_progress++;
4302     }
4303     dictReleaseIterator(di);
4304 
4305     di = dictGetIterator(master->slaves);
4306     while(in_progress < master->parallel_syncs &&
4307           (de = dictNext(di)) != NULL)
4308     {
4309         sentinelRedisInstance *slave = dictGetVal(de);
4310         int retval;
4311 
4312         /* Skip the promoted slave, and already configured slaves. */
4313         if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
4314 
4315         /* If too much time elapsed without the slave moving forward to
4316          * the next state, consider it reconfigured even if it is not.
4317          * Sentinels will detect the slave as misconfigured and fix its
4318          * configuration later. */
4319         if ((slave->flags & SRI_RECONF_SENT) &&
4320             (mstime() - slave->slave_reconf_sent_time) >
4321             SENTINEL_SLAVE_RECONF_TIMEOUT)
4322         {
4323             sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
4324             slave->flags &= ~SRI_RECONF_SENT;
4325             slave->flags |= SRI_RECONF_DONE;
4326         }
4327 
4328         /* Nothing to do for instances that are disconnected or already
4329          * in RECONF_SENT state. */
4330         if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
4331         if (slave->link->disconnected) continue;
4332 
4333         /* Send SLAVEOF <new master>. */
4334         retval = sentinelSendSlaveOf(slave,
4335                 master->promoted_slave->addr->ip,
4336                 master->promoted_slave->addr->port);
4337         if (retval == C_OK) {
4338             slave->flags |= SRI_RECONF_SENT;
4339             slave->slave_reconf_sent_time = mstime();
4340             sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
4341             in_progress++;
4342         }
4343     }
4344     dictReleaseIterator(di);
4345 
4346     /* Check if all the slaves are reconfigured and handle timeout. */
4347     sentinelFailoverDetectEnd(master);
4348 }
4349 
4350 /* This function is called when the slave is in
4351  * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
4352  * to remove it from the master table and add the promoted slave instead. */
sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance * master)4353 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
4354     sentinelRedisInstance *ref = master->promoted_slave ?
4355                                  master->promoted_slave : master;
4356 
4357     sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
4358         master->name, master->addr->ip, master->addr->port,
4359         ref->addr->ip, ref->addr->port);
4360 
4361     sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
4362 }
4363 
/* Run one step of the failover state machine for the master 'ri'.
 *
 * The instance must be a master (this is asserted). When no failover is in
 * progress the function returns immediately, otherwise the handler matching
 * the current failover state is called. States not listed below have no
 * handler here and are ignored. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) == 0) return;

    switch (ri->failover_state) {
    case SENTINEL_FAILOVER_STATE_WAIT_START:
        sentinelFailoverWaitStart(ri); break;
    case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
        sentinelFailoverSelectSlave(ri); break;
    case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
        sentinelFailoverSendSlaveOfNoOne(ri); break;
    case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
        sentinelFailoverWaitPromotion(ri); break;
    case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
        sentinelFailoverReconfNextSlave(ri); break;
    default:
        /* Nothing to do in the other states. */
        break;
    }
}
4387 
4388 /* Abort a failover in progress:
4389  *
4390  * This function can only be called before the promoted slave acknowledged
4391  * the slave -> master switch. Otherwise the failover can't be aborted and
4392  * will reach its end (possibly by timeout). */
sentinelAbortFailover(sentinelRedisInstance * ri)4393 void sentinelAbortFailover(sentinelRedisInstance *ri) {
4394     serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
4395     serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);
4396 
4397     ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER);
4398     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
4399     ri->failover_state_change_time = mstime();
4400     if (ri->promoted_slave) {
4401         ri->promoted_slave->flags &= ~SRI_PROMOTED;
4402         ri->promoted_slave = NULL;
4403     }
4404 }
4405 
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking in design. The function is called every second.
 * -------------------------------------------------------------------------- */
4410 
4411 /* Perform scheduled operations for the specified Redis instance. */
sentinelHandleRedisInstance(sentinelRedisInstance * ri)4412 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
4413     /* ========== MONITORING HALF ============ */
4414     /* Every kind of instance */
4415     sentinelReconnectInstance(ri);
4416     sentinelSendPeriodicCommands(ri);
4417 
4418     /* ============== ACTING HALF ============= */
4419     /* We don't proceed with the acting half if we are in TILT mode.
4420      * TILT happens when we find something odd with the time, like a
4421      * sudden change in the clock. */
4422     if (sentinel.tilt) {
4423         if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
4424         sentinel.tilt = 0;
4425         sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
4426     }
4427 
4428     /* Every kind of instance */
4429     sentinelCheckSubjectivelyDown(ri);
4430 
4431     /* Masters and slaves */
4432     if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
4433         /* Nothing so far. */
4434     }
4435 
4436     /* Only masters */
4437     if (ri->flags & SRI_MASTER) {
4438         sentinelCheckObjectivelyDown(ri);
4439         if (sentinelStartFailoverIfNeeded(ri))
4440             sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
4441         sentinelFailoverStateMachine(ri);
4442         sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
4443     }
4444 }
4445 
4446 /* Perform scheduled operations for all the instances in the dictionary.
4447  * Recursively call the function against dictionaries of slaves. */
sentinelHandleDictOfRedisInstances(dict * instances)4448 void sentinelHandleDictOfRedisInstances(dict *instances) {
4449     dictIterator *di;
4450     dictEntry *de;
4451     sentinelRedisInstance *switch_to_promoted = NULL;
4452 
4453     /* There are a number of things we need to perform against every master. */
4454     di = dictGetIterator(instances);
4455     while((de = dictNext(di)) != NULL) {
4456         sentinelRedisInstance *ri = dictGetVal(de);
4457 
4458         sentinelHandleRedisInstance(ri);
4459         if (ri->flags & SRI_MASTER) {
4460             sentinelHandleDictOfRedisInstances(ri->slaves);
4461             sentinelHandleDictOfRedisInstances(ri->sentinels);
4462             if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
4463                 switch_to_promoted = ri;
4464             }
4465         }
4466     }
4467     if (switch_to_promoted)
4468         sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
4469     dictReleaseIterator(di);
4470 }
4471 
4472 /* This function checks if we need to enter the TITL mode.
4473  *
4474  * The TILT mode is entered if we detect that between two invocations of the
4475  * timer interrupt, a negative amount of time, or too much time has passed.
4476  * Note that we expect that more or less just 100 milliseconds will pass
4477  * if everything is fine. However we'll see a negative number or a
4478  * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
4479  * following conditions happen:
4480  *
4481  * 1) The Sentiel process for some time is blocked, for every kind of
4482  * random reason: the load is huge, the computer was frozen for some time
4483  * in I/O or alike, the process was stopped by a signal. Everything.
4484  * 2) The system clock was altered significantly.
4485  *
4486  * Under both this conditions we'll see everything as timed out and failing
4487  * without good reasons. Instead we enter the TILT mode and wait
4488  * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
4489  *
4490  * During TILT time we still collect information, we just do not act. */
sentinelCheckTiltCondition(void)4491 void sentinelCheckTiltCondition(void) {
4492     mstime_t now = mstime();
4493     mstime_t delta = now - sentinel.previous_time;
4494 
4495     if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
4496         sentinel.tilt = 1;
4497         sentinel.tilt_start_time = mstime();
4498         sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
4499     }
4500     sentinel.previous_time = mstime();
4501 }
4502 
sentinelTimer(void)4503 void sentinelTimer(void) {
4504     sentinelCheckTiltCondition();
4505     sentinelHandleDictOfRedisInstances(sentinel.masters);
4506     sentinelRunPendingScripts();
4507     sentinelCollectTerminatedScripts();
4508     sentinelKillTimedoutScripts();
4509 
4510     /* We continuously change the frequency of the Redis "timer interrupt"
4511      * in order to desynchronize every Sentinel from every other.
4512      * This non-determinism avoids that Sentinels started at the same time
4513      * exactly continue to stay synchronized asking to be voted at the
4514      * same time again and again (resulting in nobody likely winning the
4515      * election because of split brain voting). */
4516     server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
4517 }
4518 
4519