xref: /redis-3.2.3/src/sentinel.c (revision 5b5e6520)
1 /* Redis Sentinel implementation
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 #include "server.h"
32 #include "hiredis.h"
33 #include "async.h"
34 
35 #include <ctype.h>
36 #include <arpa/inet.h>
37 #include <sys/socket.h>
38 #include <sys/wait.h>
39 #include <fcntl.h>
40 
/* The process environment vector.
 * NOTE(review): its use site is outside the portion of the file reviewed
 * here. */
extern char **environ;

/* TCP port that initSentinelConfig() stores into server.port. */
#define REDIS_SENTINEL_PORT 26379
44 
45 /* ======================== Sentinel global state =========================== */
46 
/* Address object, used to describe an ip:port pair.
 *
 * Objects are built by createSentinelAddr() / dupSentinelAddr() and destroyed
 * by releaseSentinelAddr(). */
typedef struct sentinelAddr {
    char *ip;   /* IP as a string. The constructors store an sds here and
                   releaseSentinelAddr() frees it with sdsfree(). */
    int port;   /* createSentinelAddr() only accepts values in 0..65535. */
} sentinelAddr;
52 
/* Bit flags stored in sentinelRedisInstance.flags. They describe what kind
 * of instance Sentinel is monitoring and the state it is currently in. */
#define SRI_MASTER  (1<<0)  /* The instance is a master. */
#define SRI_SLAVE   (1<<1)  /* The instance is a slave. */
#define SRI_SENTINEL (1<<2) /* The instance is another Sentinel. */
#define SRI_S_DOWN (1<<3)   /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<4)   /* Objectively down (confirmed by others). */
#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
                                   its master is down. */
#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
                                           this master. */
#define SRI_PROMOTED (1<<7)            /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<8)     /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<9)   /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<10)     /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<11)  /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
69 
/* Periods, timeouts and other defaults.
 *
 * Note: times are in milliseconds. Not every constant in this group is a
 * duration: SENTINEL_HELLO_CHANNEL is a Pub/Sub channel name, while
 * SENTINEL_DEFAULT_SLAVE_PRIORITY, SENTINEL_DEFAULT_PARALLEL_SYNCS and
 * SENTINEL_MAX_PENDING_COMMANDS are plain values/counters. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 2000
#define SENTINEL_DEFAULT_DOWN_AFTER 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30) /* 30 ping periods. */
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_SLAVE_RECONF_TIMEOUT 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000) /* 3 minutes. */
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_ELECTION_TIMEOUT 10000
#define SENTINEL_MAX_DESYNC 1000
87 
/* Failover state machine: the possible values of
 * sentinelRedisInstance.failover_state, listed in increasing numeric order. */
#define SENTINEL_FAILOVER_STATE_NONE 0  /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1  /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */

/* Replication link status of a slave.
 * NOTE(review): presumably the values stored in
 * sentinelRedisInstance.slave_master_link_status -- confirm at the use site. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
99 
/* Generic flags that can be used with different functions.
 * They use higher bits to avoid colliding with the function specific
 * flags. */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT (1<<16)
#define SENTINEL_LEADER (1<<17)
#define SENTINEL_OBSERVER (1<<18)

/* Script execution flags and limits.
 * SENTINEL_SCRIPT_NONE / SENTINEL_SCRIPT_RUNNING are the values used for
 * sentinelScriptJob.flags; SENTINEL_SCRIPT_MAX_QUEUE bounds the length of
 * sentinel.scripts_queue and SENTINEL_SCRIPT_MAX_RUNNING bounds
 * sentinel.running_scripts. */
#define SENTINEL_SCRIPT_NONE 0
#define SENTINEL_SCRIPT_RUNNING 1
#define SENTINEL_SCRIPT_MAX_QUEUE 256
#define SENTINEL_SCRIPT_MAX_RUNNING 16
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */

/* SENTINEL SIMULATE-FAILURE command flags, stored in
 * sentinel.simfailure_flags. */
#define SENTINEL_SIMFAILURE_NONE 0
#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0)
#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1)
121 
/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
 * monitoring many masters, we have different instances representing the
 * same Sentinels, one per master, and we need to share the hiredis connections
 * among them. Otherwise if 5 Sentinels are monitoring 100 masters we create
 * 500 outgoing connections instead of 5.
 *
 * So this structure represents a reference counted link in terms of the two
 * hiredis connections for commands and Pub/Sub, and the fields needed for
 * failure detection, since the ping/pong times are now local to the link: if
 * the link is available, the instance is available. This way we don't just
 * have 5 connections instead of 500, we also send 5 pings instead of 500.
 *
 * Links are shared only for Sentinels: master and slave instances have
 * a link with refcount = 1, always. */
typedef struct instanceLink {
    int refcount;          /* Number of sentinelRedisInstance owners. */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    int pending_commands;  /* Number of commands sent waiting for a reply. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
    mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
                                 only used to avoid sending too many pings
                                 during failure. Idle time is computed using
                                 the act_ping_time field. */
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    mstime_t last_reconn_time;  /* Last reconnection attempt performed when
                                   the link was down. */
} instanceLink;
162 
/* Everything Sentinel knows about one monitored instance. The same structure
 * describes masters, slaves and other Sentinels: the SRI_* bits in 'flags'
 * tell which one it is, and only the matching group of fields below is
 * meaningful for a given instance. */
typedef struct sentinelRedisInstance {
    int flags;      /* See SRI_... defines */
    char *name;     /* Master name from the point of view of this sentinel. */
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    uint64_t config_epoch;  /* Configuration epoch. */
    sentinelAddr *addr; /* Address of this instance (sentinelEvent() prints it
                           for every instance type, not only for masters). */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */

    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass;    /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    struct sentinelRedisInstance *master; /* Master instance if it's slave.
                                             sentinelEvent() follows this
                                             pointer for every instance that
                                             lacks SRI_MASTER. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */
    /* Failover */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this Sentinel voted as leader. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time;   /* Last failover attempt start time. */
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
    sds info; /* cached INFO output */
} sentinelRedisInstance;
226 
/* Main state: the single global 'sentinel' object, initialized by
 * initSentinel(). */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. All zero bytes means
                                        "not set yet": sentinelIsRunning()
                                        then generates a random one. */
    uint64_t current_epoch;         /* Current epoch. */
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    int tilt;           /* Are we in TILT mode? */
    int running_scripts;    /* Number of scripts in execution right now. */
    mstime_t tilt_start_time;       /* When TILT started. */
    mstime_t previous_time;         /* Last time we ran the time handler. */
    list *scripts_queue;            /* Queue of user scripts to execute. */
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
    unsigned long simfailure_flags; /* Failures simulation. */
} sentinel;
245 
/* A script execution job. Jobs are created by
 * sentinelScheduleScriptExecution(), live in sentinel.scripts_queue and are
 * destroyed by sentinelReleaseScriptJob(). */
typedef struct sentinelScriptJob {
    int flags;              /* Script job flags: SENTINEL_SCRIPT_* */
    int retry_num;          /* Number of times we tried to execute it. */
    char **argv;            /* Arguments to call the script: a NULL-terminated
                               vector of sds strings, argv[0] being the
                               script path. */
    mstime_t start_time;    /* Script execution time if the script is running,
                               otherwise 0 if we are allowed to retry the
                               execution at any time. If the script is not
                               running and it's not 0, it means: do not run
                               before the specified time. */
    pid_t pid;              /* Script execution pid. */
} sentinelScriptJob;
258 
/* ======================= hiredis ae.c adapters =============================
 * Note: this implementation is taken from hiredis/adapters/ae.h, however
 * we have our modified copy for Sentinel in order to use our allocator
 * and to have full control over how the adapter works. */

/* Glue object tying one hiredis async context to an ae event loop. It is
 * allocated by redisAeAttach() and freed by redisAeCleanup(). */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* Context handed to redisAsyncHandleRead()
                                   and redisAsyncHandleWrite(). */
    aeEventLoop *loop;          /* Loop on which file events are registered. */
    int fd;                     /* Descriptor copied from the context at
                                   attach time. */
    int reading, writing;       /* Non-zero while the AE_READABLE /
                                   AE_WRITABLE file event is registered. */
} redisAeEvents;
270 
/* ae file-event handler registered for AE_READABLE: forwards the
 * notification to hiredis for the context stored in 'privdata' (a
 * redisAeEvents pointer). The loop, descriptor and mask are not needed. */
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleRead(events->context);
}
277 
/* ae file-event handler registered for AE_WRITABLE: forwards the
 * notification to hiredis for the context stored in 'privdata' (a
 * redisAeEvents pointer). The loop, descriptor and mask are not needed. */
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleWrite(events->context);
}
284 
redisAeAddRead(void * privdata)285 static void redisAeAddRead(void *privdata) {
286     redisAeEvents *e = (redisAeEvents*)privdata;
287     aeEventLoop *loop = e->loop;
288     if (!e->reading) {
289         e->reading = 1;
290         aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
291     }
292 }
293 
redisAeDelRead(void * privdata)294 static void redisAeDelRead(void *privdata) {
295     redisAeEvents *e = (redisAeEvents*)privdata;
296     aeEventLoop *loop = e->loop;
297     if (e->reading) {
298         e->reading = 0;
299         aeDeleteFileEvent(loop,e->fd,AE_READABLE);
300     }
301 }
302 
redisAeAddWrite(void * privdata)303 static void redisAeAddWrite(void *privdata) {
304     redisAeEvents *e = (redisAeEvents*)privdata;
305     aeEventLoop *loop = e->loop;
306     if (!e->writing) {
307         e->writing = 1;
308         aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
309     }
310 }
311 
redisAeDelWrite(void * privdata)312 static void redisAeDelWrite(void *privdata) {
313     redisAeEvents *e = (redisAeEvents*)privdata;
314     aeEventLoop *loop = e->loop;
315     if (e->writing) {
316         e->writing = 0;
317         aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
318     }
319 }
320 
redisAeCleanup(void * privdata)321 static void redisAeCleanup(void *privdata) {
322     redisAeEvents *e = (redisAeEvents*)privdata;
323     redisAeDelRead(privdata);
324     redisAeDelWrite(privdata);
325     zfree(e);
326 }
327 
/* Bind the hiredis async context 'ac' to the ae event loop 'loop'.
 *
 * Returns C_ERR, without touching 'ac', if an adapter is already attached
 * to the context. Otherwise a redisAeEvents object is allocated, the
 * context event hooks are pointed to the redisAe* functions above, and C_OK
 * is returned. No file event is registered here: that happens when the
 * add-read / add-write hooks are invoked. */
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
    redisAeEvents *events;

    /* Only one adapter per context. */
    if (ac->ev.data != NULL) return C_ERR;

    /* Adapter state: nothing is being watched yet. */
    events = zmalloc(sizeof(*events));
    events->context = ac;
    events->loop = loop;
    events->fd = ac->c.fd;
    events->reading = 0;
    events->writing = 0;

    /* Install the hooks used to start/stop listening for events. */
    ac->ev.data = events;
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;

    return C_OK;
}
353 
354 /* ============================= Prototypes ================================= */
355 
/* Forward declarations, needed because the code below calls these functions
 * before the compiler has seen a definition for them (for instance
 * sentinelIsRunning() calls sentinelFlushConfig() and
 * sentinelGenerateInitialMonitorEvents()). */
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
void instanceLinkConnectionError(const redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
void sentinelAbortFailover(sentinelRedisInstance *ri);
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
void sentinelScheduleScriptExecution(char *path, ...); /* Variadic list must end with NULL. */
void sentinelStartFailover(sentinelRedisInstance *master);
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
void sentinelFlushConfig(void);
void sentinelGenerateInitialMonitorEvents(void);
int sentinelSendPing(sentinelRedisInstance *ri);
int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master);
sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid);
void sentinelSimFailureCrash(void);
379 
380 /* ========================= Dictionary types =============================== */
381 
382 unsigned int dictSdsHash(const void *key);
383 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
384 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
385 
dictInstancesValDestructor(void * privdata,void * obj)386 void dictInstancesValDestructor (void *privdata, void *obj) {
387     UNUSED(privdata);
388     releaseSentinelRedisInstance(obj);
389 }
390 
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * This is the type of the sentinel.masters dictionary (see initSentinel()).
 * Keys are neither duplicated nor freed by the dictionary; values are
 * released with releaseSentinelRedisInstance() when an entry is removed.
 *
 * NOTE(review): an earlier version of this comment said the type is also
 * used for a per-master 'sentinels' dictionary mapping ip:port to a last seen
 * time. Given the value destructor below, any dictionary of this type must
 * hold sentinelRedisInstance pointers as values. */
dictType instancesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    dictInstancesValDestructor /* val destructor */
};
403 
/* Instance runid (sds) -> votes (long casted to void*)
 *
 * This is useful into sentinelGetObjectiveLeader() function in order to
 * count the votes and understand who is the leader.
 *
 * No destructor is installed: the dictionary frees neither keys nor
 * values. */
dictType leaderVotesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};
416 
417 /* =========================== Initialization =============================== */
418 
/* Handlers for the Sentinel specific commands referenced by the table
 * below. */
void sentinelCommand(client *c);
void sentinelInfoCommand(client *c);
void sentinelSetCommand(client *c);
void sentinelPublishCommand(client *c);
void sentinelRoleCommand(client *c);

/* The only commands available in Sentinel mode: initSentinel() empties the
 * regular command table and registers these entries, keyed by name. Each
 * entry starts with the command name followed by its handler; the remaining
 * fields follow the struct redisCommand layout, which is declared outside
 * this file. */
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
438 
/* This function overwrites a few normal Redis config defaults with Sentinel
 * specific defaults. Currently the only one is the listening port, which is
 * set to REDIS_SENTINEL_PORT. */
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
}
444 
445 /* Perform the Sentinel mode initialization. */
initSentinel(void)446 void initSentinel(void) {
447     unsigned int j;
448 
449     /* Remove usual Redis commands from the command table, then just add
450      * the SENTINEL command. */
451     dictEmpty(server.commands,NULL);
452     for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
453         int retval;
454         struct redisCommand *cmd = sentinelcmds+j;
455 
456         retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
457         serverAssert(retval == DICT_OK);
458     }
459 
460     /* Initialize various data structures. */
461     sentinel.current_epoch = 0;
462     sentinel.masters = dictCreate(&instancesDictType,NULL);
463     sentinel.tilt = 0;
464     sentinel.tilt_start_time = 0;
465     sentinel.previous_time = mstime();
466     sentinel.running_scripts = 0;
467     sentinel.scripts_queue = listCreate();
468     sentinel.announce_ip = NULL;
469     sentinel.announce_port = 0;
470     sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
471     memset(sentinel.myid,0,sizeof(sentinel.myid));
472 }
473 
474 /* This function gets called when the server is in Sentinel mode, started,
475  * loaded the configuration, and is ready for normal operations. */
sentinelIsRunning(void)476 void sentinelIsRunning(void) {
477     int j;
478 
479     if (server.configfile == NULL) {
480         serverLog(LL_WARNING,
481             "Sentinel started without a config file. Exiting...");
482         exit(1);
483     } else if (access(server.configfile,W_OK) == -1) {
484         serverLog(LL_WARNING,
485             "Sentinel config file %s is not writable: %s. Exiting...",
486             server.configfile,strerror(errno));
487         exit(1);
488     }
489 
490     /* If this Sentinel has yet no ID set in the configuration file, we
491      * pick a random one and persist the config on disk. From now on this
492      * will be this Sentinel ID across restarts. */
493     for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
494         if (sentinel.myid[j] != 0) break;
495 
496     if (j == CONFIG_RUN_ID_SIZE) {
497         /* Pick ID and presist the config. */
498         getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
499         sentinelFlushConfig();
500     }
501 
502     /* Log its ID to make debugging of issues simpler. */
503     serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
504 
505     /* We want to generate a +monitor event for every configured master
506      * at startup. */
507     sentinelGenerateInitialMonitorEvents();
508 }
509 
510 /* ============================== sentinelAddr ============================== */
511 
512 /* Create a sentinelAddr object and return it on success.
513  * On error NULL is returned and errno is set to:
514  *  ENOENT: Can't resolve the hostname.
515  *  EINVAL: Invalid port number.
516  */
createSentinelAddr(char * hostname,int port)517 sentinelAddr *createSentinelAddr(char *hostname, int port) {
518     char ip[NET_IP_STR_LEN];
519     sentinelAddr *sa;
520 
521     if (port < 0 || port > 65535) {
522         errno = EINVAL;
523         return NULL;
524     }
525     if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
526         errno = ENOENT;
527         return NULL;
528     }
529     sa = zmalloc(sizeof(*sa));
530     sa->ip = sdsnew(ip);
531     sa->port = port;
532     return sa;
533 }
534 
535 /* Return a duplicate of the source address. */
dupSentinelAddr(sentinelAddr * src)536 sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
537     sentinelAddr *sa;
538 
539     sa = zmalloc(sizeof(*sa));
540     sa->ip = sdsnew(src->ip);
541     sa->port = src->port;
542     return sa;
543 }
544 
545 /* Free a Sentinel address. Can't fail. */
releaseSentinelAddr(sentinelAddr * sa)546 void releaseSentinelAddr(sentinelAddr *sa) {
547     sdsfree(sa->ip);
548     zfree(sa);
549 }
550 
551 /* Return non-zero if two addresses are equal. */
sentinelAddrIsEqual(sentinelAddr * a,sentinelAddr * b)552 int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
553     return a->port == b->port && !strcasecmp(a->ip,b->ip);
554 }
555 
556 /* =========================== Events notification ========================== */
557 
558 /* Send an event to log, pub/sub, user notification script.
559  *
560  * 'level' is the log level for logging. Only LL_WARNING events will trigger
561  * the execution of the user notification script.
562  *
563  * 'type' is the message type, also used as a pub/sub channel name.
564  *
565  * 'ri', is the redis instance target of this event if applicable, and is
566  * used to obtain the path of the notification script to execute.
567  *
568  * The remaining arguments are printf-alike.
569  * If the format specifier starts with the two characters "%@" then ri is
570  * not NULL, and the message is prefixed with an instance identifier in the
571  * following format:
572  *
573  *  <instance type> <instance name> <ip> <port>
574  *
575  *  If the instance type is not master, than the additional string is
576  *  added to specify the originating master:
577  *
578  *  @ <master name> <master ip> <master port>
579  *
580  *  Any other specifier after "%@" is processed by printf itself.
581  */
sentinelEvent(int level,char * type,sentinelRedisInstance * ri,const char * fmt,...)582 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
583                    const char *fmt, ...) {
584     va_list ap;
585     char msg[LOG_MAX_LEN];
586     robj *channel, *payload;
587 
588     /* Handle %@ */
589     if (fmt[0] == '%' && fmt[1] == '@') {
590         sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
591                                          NULL : ri->master;
592 
593         if (master) {
594             snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
595                 sentinelRedisInstanceTypeStr(ri),
596                 ri->name, ri->addr->ip, ri->addr->port,
597                 master->name, master->addr->ip, master->addr->port);
598         } else {
599             snprintf(msg, sizeof(msg), "%s %s %s %d",
600                 sentinelRedisInstanceTypeStr(ri),
601                 ri->name, ri->addr->ip, ri->addr->port);
602         }
603         fmt += 2;
604     } else {
605         msg[0] = '\0';
606     }
607 
608     /* Use vsprintf for the rest of the formatting if any. */
609     if (fmt[0] != '\0') {
610         va_start(ap, fmt);
611         vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
612         va_end(ap);
613     }
614 
615     /* Log the message if the log level allows it to be logged. */
616     if (level >= server.verbosity)
617         serverLog(level,"%s %s",type,msg);
618 
619     /* Publish the message via Pub/Sub if it's not a debugging one. */
620     if (level != LL_DEBUG) {
621         channel = createStringObject(type,strlen(type));
622         payload = createStringObject(msg,strlen(msg));
623         pubsubPublishMessage(channel,payload);
624         decrRefCount(channel);
625         decrRefCount(payload);
626     }
627 
628     /* Call the notification script if applicable. */
629     if (level == LL_WARNING && ri != NULL) {
630         sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
631                                          ri : ri->master;
632         if (master && master->notification_script) {
633             sentinelScheduleScriptExecution(master->notification_script,
634                 type,msg,NULL);
635         }
636     }
637 }
638 
639 /* This function is called only at startup and is used to generate a
640  * +monitor event for every configured master. The same events are also
641  * generated when a master to monitor is added at runtime via the
642  * SENTINEL MONITOR command. */
sentinelGenerateInitialMonitorEvents(void)643 void sentinelGenerateInitialMonitorEvents(void) {
644     dictIterator *di;
645     dictEntry *de;
646 
647     di = dictGetIterator(sentinel.masters);
648     while((de = dictNext(di)) != NULL) {
649         sentinelRedisInstance *ri = dictGetVal(de);
650         sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
651     }
652     dictReleaseIterator(di);
653 }
654 
655 /* ============================ script execution ============================ */
656 
657 /* Release a script job structure and all the associated data. */
sentinelReleaseScriptJob(sentinelScriptJob * sj)658 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
659     int j = 0;
660 
661     while(sj->argv[j]) sdsfree(sj->argv[j++]);
662     zfree(sj->argv);
663     zfree(sj);
664 }
665 
666 #define SENTINEL_SCRIPT_MAX_ARGS 16
sentinelScheduleScriptExecution(char * path,...)667 void sentinelScheduleScriptExecution(char *path, ...) {
668     va_list ap;
669     char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
670     int argc = 1;
671     sentinelScriptJob *sj;
672 
673     va_start(ap, path);
674     while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
675         argv[argc] = va_arg(ap,char*);
676         if (!argv[argc]) break;
677         argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
678         argc++;
679     }
680     va_end(ap);
681     argv[0] = sdsnew(path);
682 
683     sj = zmalloc(sizeof(*sj));
684     sj->flags = SENTINEL_SCRIPT_NONE;
685     sj->retry_num = 0;
686     sj->argv = zmalloc(sizeof(char*)*(argc+1));
687     sj->start_time = 0;
688     sj->pid = 0;
689     memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
690 
691     listAddNodeTail(sentinel.scripts_queue,sj);
692 
693     /* Remove the oldest non running script if we already hit the limit. */
694     if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
695         listNode *ln;
696         listIter li;
697 
698         listRewind(sentinel.scripts_queue,&li);
699         while ((ln = listNext(&li)) != NULL) {
700             sj = ln->value;
701 
702             if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
703             /* The first node is the oldest as we add on tail. */
704             listDelNode(sentinel.scripts_queue,ln);
705             sentinelReleaseScriptJob(sj);
706             break;
707         }
708         serverAssert(listLength(sentinel.scripts_queue) <=
709                     SENTINEL_SCRIPT_MAX_QUEUE);
710     }
711 }
712 
713 /* Lookup a script in the scripts queue via pid, and returns the list node
714  * (so that we can easily remove it from the queue if needed). */
sentinelGetScriptListNodeByPid(pid_t pid)715 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
716     listNode *ln;
717     listIter li;
718 
719     listRewind(sentinel.scripts_queue,&li);
720     while ((ln = listNext(&li)) != NULL) {
721         sentinelScriptJob *sj = ln->value;
722 
723         if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
724             return ln;
725     }
726     return NULL;
727 }
728 
729 /* Run pending scripts if we are not already at max number of running
730  * scripts. */
sentinelRunPendingScripts(void)731 void sentinelRunPendingScripts(void) {
732     listNode *ln;
733     listIter li;
734     mstime_t now = mstime();
735 
736     /* Find jobs that are not running and run them, from the top to the
737      * tail of the queue, so we run older jobs first. */
738     listRewind(sentinel.scripts_queue,&li);
739     while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
740            (ln = listNext(&li)) != NULL)
741     {
742         sentinelScriptJob *sj = ln->value;
743         pid_t pid;
744 
745         /* Skip if already running. */
746         if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
747 
748         /* Skip if it's a retry, but not enough time has elapsed. */
749         if (sj->start_time && sj->start_time > now) continue;
750 
751         sj->flags |= SENTINEL_SCRIPT_RUNNING;
752         sj->start_time = mstime();
753         sj->retry_num++;
754         pid = fork();
755 
756         if (pid == -1) {
757             /* Parent (fork error).
758              * We report fork errors as signal 99, in order to unify the
759              * reporting with other kind of errors. */
760             sentinelEvent(LL_WARNING,"-script-error",NULL,
761                           "%s %d %d", sj->argv[0], 99, 0);
762             sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
763             sj->pid = 0;
764         } else if (pid == 0) {
765             /* Child */
766             execve(sj->argv[0],sj->argv,environ);
767             /* If we are here an error occurred. */
768             _exit(2); /* Don't retry execution. */
769         } else {
770             sentinel.running_scripts++;
771             sj->pid = pid;
772             sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
773         }
774     }
775 }
776 
777 /* How much to delay the execution of a script that we need to retry after
778  * an error?
779  *
780  * We double the retry delay for every further retry we do. So for instance
781  * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
782  * starting from the second attempt to execute the script the delays are:
783  * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
sentinelScriptRetryDelay(int retry_num)784 mstime_t sentinelScriptRetryDelay(int retry_num) {
785     mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
786 
787     while (retry_num-- > 1) delay *= 2;
788     return delay;
789 }
790 
791 /* Check for scripts that terminated, and remove them from the queue if the
792  * script terminated successfully. If instead the script was terminated by
793  * a signal, or returned exit code "1", it is scheduled to run again if
794  * the max number of retries did not already elapsed. */
sentinelCollectTerminatedScripts(void)795 void sentinelCollectTerminatedScripts(void) {
796     int statloc;
797     pid_t pid;
798 
799     while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
800         int exitcode = WEXITSTATUS(statloc);
801         int bysignal = 0;
802         listNode *ln;
803         sentinelScriptJob *sj;
804 
805         if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
806         sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
807             (long)pid, exitcode, bysignal);
808 
809         ln = sentinelGetScriptListNodeByPid(pid);
810         if (ln == NULL) {
811             serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
812             continue;
813         }
814         sj = ln->value;
815 
816         /* If the script was terminated by a signal or returns an
817          * exit code of "1" (that means: please retry), we reschedule it
818          * if the max number of retries is not already reached. */
819         if ((bysignal || exitcode == 1) &&
820             sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
821         {
822             sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
823             sj->pid = 0;
824             sj->start_time = mstime() +
825                              sentinelScriptRetryDelay(sj->retry_num);
826         } else {
827             /* Otherwise let's remove the script, but log the event if the
828              * execution did not terminated in the best of the ways. */
829             if (bysignal || exitcode != 0) {
830                 sentinelEvent(LL_WARNING,"-script-error",NULL,
831                               "%s %d %d", sj->argv[0], bysignal, exitcode);
832             }
833             listDelNode(sentinel.scripts_queue,ln);
834             sentinelReleaseScriptJob(sj);
835             sentinel.running_scripts--;
836         }
837     }
838 }
839 
840 /* Kill scripts in timeout, they'll be collected by the
841  * sentinelCollectTerminatedScripts() function. */
sentinelKillTimedoutScripts(void)842 void sentinelKillTimedoutScripts(void) {
843     listNode *ln;
844     listIter li;
845     mstime_t now = mstime();
846 
847     listRewind(sentinel.scripts_queue,&li);
848     while ((ln = listNext(&li)) != NULL) {
849         sentinelScriptJob *sj = ln->value;
850 
851         if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
852             (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
853         {
854             sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
855                 sj->argv[0], (long)sj->pid);
856             kill(sj->pid,SIGKILL);
857         }
858     }
859 }
860 
861 /* Implements SENTINEL PENDING-SCRIPTS command. */
sentinelPendingScriptsCommand(client * c)862 void sentinelPendingScriptsCommand(client *c) {
863     listNode *ln;
864     listIter li;
865 
866     addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
867     listRewind(sentinel.scripts_queue,&li);
868     while ((ln = listNext(&li)) != NULL) {
869         sentinelScriptJob *sj = ln->value;
870         int j = 0;
871 
872         addReplyMultiBulkLen(c,10);
873 
874         addReplyBulkCString(c,"argv");
875         while (sj->argv[j]) j++;
876         addReplyMultiBulkLen(c,j);
877         j = 0;
878         while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
879 
880         addReplyBulkCString(c,"flags");
881         addReplyBulkCString(c,
882             (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
883 
884         addReplyBulkCString(c,"pid");
885         addReplyBulkLongLong(c,sj->pid);
886 
887         if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
888             addReplyBulkCString(c,"run-time");
889             addReplyBulkLongLong(c,mstime() - sj->start_time);
890         } else {
891             mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
892             if (delay < 0) delay = 0;
893             addReplyBulkCString(c,"run-delay");
894             addReplyBulkLongLong(c,delay);
895         }
896 
897         addReplyBulkCString(c,"retry-num");
898         addReplyBulkLongLong(c,sj->retry_num);
899     }
900 }
901 
902 /* This function calls, if any, the client reconfiguration script with the
903  * following parameters:
904  *
905  * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
906  *
907  * It is called every time a failover is performed.
908  *
909  * <state> is currently always "failover".
910  * <role> is either "leader" or "observer".
911  *
912  * from/to fields are respectively master -> promoted slave addresses for
913  * "start" and "end". */
sentinelCallClientReconfScript(sentinelRedisInstance * master,int role,char * state,sentinelAddr * from,sentinelAddr * to)914 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
915     char fromport[32], toport[32];
916 
917     if (master->client_reconfig_script == NULL) return;
918     ll2string(fromport,sizeof(fromport),from->port);
919     ll2string(toport,sizeof(toport),to->port);
920     sentinelScheduleScriptExecution(master->client_reconfig_script,
921         master->name,
922         (role == SENTINEL_LEADER) ? "leader" : "observer",
923         state, from->ip, fromport, to->ip, toport, NULL);
924 }
925 
926 /* =============================== instanceLink ============================= */
927 
928 /* Create a not yet connected link object. */
createInstanceLink(void)929 instanceLink *createInstanceLink(void) {
930     instanceLink *link = zmalloc(sizeof(*link));
931 
932     link->refcount = 1;
933     link->disconnected = 1;
934     link->pending_commands = 0;
935     link->cc = NULL;
936     link->pc = NULL;
937     link->cc_conn_time = 0;
938     link->pc_conn_time = 0;
939     link->last_reconn_time = 0;
940     link->pc_last_activity = 0;
941     /* We set the act_ping_time to "now" even if we actually don't have yet
942      * a connection with the node, nor we sent a ping.
943      * This is useful to detect a timeout in case we'll not be able to connect
944      * with the node at all. */
945     link->act_ping_time = mstime();
946     link->last_ping_time = 0;
947     link->last_avail_time = mstime();
948     link->last_pong_time = mstime();
949     return link;
950 }
951 
952 /* Disconnect an hiredis connection in the context of an instance link. */
instanceLinkCloseConnection(instanceLink * link,redisAsyncContext * c)953 void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
954     if (c == NULL) return;
955 
956     if (link->cc == c) {
957         link->cc = NULL;
958         link->pending_commands = 0;
959     }
960     if (link->pc == c) link->pc = NULL;
961     c->data = NULL;
962     link->disconnected = 1;
963     redisAsyncFree(c);
964 }
965 
966 /* Decrement the refcount of a link object, if it drops to zero, actually
967  * free it and return NULL. Otherwise don't do anything and return the pointer
968  * to the object.
969  *
970  * If we are not going to free the link and ri is not NULL, we rebind all the
971  * pending requests in link->cc (hiredis connection for commands) to a
972  * callback that will just ignore them. This is useful to avoid processing
973  * replies for an instance that no longer exists. */
releaseInstanceLink(instanceLink * link,sentinelRedisInstance * ri)974 instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
975 {
976     serverAssert(link->refcount > 0);
977     link->refcount--;
978     if (link->refcount != 0) {
979         if (ri && ri->link->cc) {
980             /* This instance may have pending callbacks in the hiredis async
981              * context, having as 'privdata' the instance that we are going to
982              * free. Let's rewrite the callback list, directly exploiting
983              * hiredis internal data structures, in order to bind them with
984              * a callback that will ignore the reply at all. */
985             redisCallback *cb;
986             redisCallbackList *callbacks = &link->cc->replies;
987 
988             cb = callbacks->head;
989             while(cb) {
990                 if (cb->privdata == ri) {
991                     cb->fn = sentinelDiscardReplyCallback;
992                     cb->privdata = NULL; /* Not strictly needed. */
993                 }
994                 cb = cb->next;
995             }
996         }
997         return link; /* Other active users. */
998     }
999 
1000     instanceLinkCloseConnection(link,link->cc);
1001     instanceLinkCloseConnection(link,link->pc);
1002     zfree(link);
1003     return NULL;
1004 }
1005 
1006 /* This function will attempt to share the instance link we already have
1007  * for the same Sentinel in the context of a different master, with the
1008  * instance we are passing as argument.
1009  *
1010  * This way multiple Sentinel objects that refer all to the same physical
1011  * Sentinel instance but in the context of different masters will use
1012  * a single connection, will send a single PING per second for failure
1013  * detection and so forth.
1014  *
1015  * Return C_OK if a matching Sentinel was found in the context of a
1016  * different master and sharing was performed. Otherwise C_ERR
1017  * is returned. */
sentinelTryConnectionSharing(sentinelRedisInstance * ri)1018 int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
1019     serverAssert(ri->flags & SRI_SENTINEL);
1020     dictIterator *di;
1021     dictEntry *de;
1022 
1023     if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
1024     if (ri->link->refcount > 1) return C_ERR; /* Already shared. */
1025 
1026     di = dictGetIterator(sentinel.masters);
1027     while((de = dictNext(di)) != NULL) {
1028         sentinelRedisInstance *master = dictGetVal(de), *match;
1029         /* We want to share with the same physical Sentinel referenced
1030          * in other masters, so skip our master. */
1031         if (master == ri->master) continue;
1032         match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
1033                                                        NULL,0,ri->runid);
1034         if (match == NULL) continue; /* No match. */
1035         if (match == ri) continue; /* Should never happen but... safer. */
1036 
1037         /* We identified a matching Sentinel, great! Let's free our link
1038          * and use the one of the matching Sentinel. */
1039         releaseInstanceLink(ri->link,NULL);
1040         ri->link = match->link;
1041         match->link->refcount++;
1042         return C_OK;
1043     }
1044     dictReleaseIterator(di);
1045     return C_ERR;
1046 }
1047 
1048 /* When we detect a Sentinel to switch address (reporting a different IP/port
1049  * pair in Hello messages), let's update all the matching Sentinels in the
1050  * context of other masters as well and disconnect the links, so that everybody
1051  * will be updated.
1052  *
1053  * Return the number of updated Sentinel addresses. */
sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance * ri)1054 int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
1055     serverAssert(ri->flags & SRI_SENTINEL);
1056     dictIterator *di;
1057     dictEntry *de;
1058     int reconfigured = 0;
1059 
1060     di = dictGetIterator(sentinel.masters);
1061     while((de = dictNext(di)) != NULL) {
1062         sentinelRedisInstance *master = dictGetVal(de), *match;
1063         match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
1064                                                        NULL,0,ri->runid);
1065         /* If there is no match, this master does not know about this
1066          * Sentinel, try with the next one. */
1067         if (match == NULL) continue;
1068 
1069         /* Disconnect the old links if connected. */
1070         if (match->link->cc != NULL)
1071             instanceLinkCloseConnection(match->link,match->link->cc);
1072         if (match->link->pc != NULL)
1073             instanceLinkCloseConnection(match->link,match->link->pc);
1074 
1075         if (match == ri) continue; /* Address already updated for it. */
1076 
1077         /* Update the address of the matching Sentinel by copying the address
1078          * of the Sentinel object that received the address update. */
1079         releaseSentinelAddr(match->addr);
1080         match->addr = dupSentinelAddr(ri->addr);
1081         reconfigured++;
1082     }
1083     dictReleaseIterator(di);
1084     if (reconfigured)
1085         sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
1086                     "%@ %d additional matching instances", reconfigured);
1087     return reconfigured;
1088 }
1089 
1090 /* This function is called when an hiredis connection reported an error.
1091  * We set it to NULL and mark the link as disconnected so that it will be
1092  * reconnected again.
1093  *
1094  * Note: we don't free the hiredis context as hiredis will do it for us
1095  * for async connections. */
instanceLinkConnectionError(const redisAsyncContext * c)1096 void instanceLinkConnectionError(const redisAsyncContext *c) {
1097     instanceLink *link = c->data;
1098     int pubsub;
1099 
1100     if (!link) return;
1101 
1102     pubsub = (link->pc == c);
1103     if (pubsub)
1104         link->pc = NULL;
1105     else
1106         link->cc = NULL;
1107     link->disconnected = 1;
1108 }
1109 
1110 /* Hiredis connection established / disconnected callbacks. We need them
1111  * just to cleanup our link state. */
sentinelLinkEstablishedCallback(const redisAsyncContext * c,int status)1112 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1113     if (status != C_OK) instanceLinkConnectionError(c);
1114 }
1115 
/* Hiredis "disconnected" callback. Whatever the status is, the link state
 * is cleaned up exactly as in the case of a connection error. */
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
    UNUSED(status);
    instanceLinkConnectionError(c);
}
1120 
1121 /* ========================== sentinelRedisInstance ========================= */
1122 
1123 /* Create a redis instance, the following fields must be populated by the
1124  * caller if needed:
1125  * runid: set to NULL but will be populated once INFO output is received.
1126  * info_refresh: is set to 0 to mean that we never received INFO so far.
1127  *
1128  * If SRI_MASTER is set into initial flags the instance is added to
1129  * sentinel.masters table.
1130  *
1131  * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
1132  * instance is added into master->slaves or master->sentinels table.
1133  *
1134  * If the instance is a slave or sentinel, the name parameter is ignored and
1135  * is created automatically as hostname:port.
1136  *
1137  * The function fails if hostname can't be resolved or port is out of range.
1138  * When this happens NULL is returned and errno is set accordingly to the
1139  * createSentinelAddr() function.
1140  *
1141  * The function may also fail and return NULL with errno set to EBUSY if
1142  * a master with the same name, a slave with the same address, or a sentinel
1143  * with the same ID already exists. */
1144 
createSentinelRedisInstance(char * name,int flags,char * hostname,int port,int quorum,sentinelRedisInstance * master)1145 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
1146     sentinelRedisInstance *ri;
1147     sentinelAddr *addr;
1148     dict *table = NULL;
1149     char slavename[NET_PEER_ID_LEN], *sdsname;
1150 
1151     serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
1152     serverAssert((flags & SRI_MASTER) || master != NULL);
1153 
1154     /* Check address validity. */
1155     addr = createSentinelAddr(hostname,port);
1156     if (addr == NULL) return NULL;
1157 
1158     /* For slaves use ip:port as name. */
1159     if (flags & SRI_SLAVE) {
1160         anetFormatAddr(slavename, sizeof(slavename), hostname, port);
1161         name = slavename;
1162     }
1163 
1164     /* Make sure the entry is not duplicated. This may happen when the same
1165      * name for a master is used multiple times inside the configuration or
1166      * if we try to add multiple times a slave or sentinel with same ip/port
1167      * to a master. */
1168     if (flags & SRI_MASTER) table = sentinel.masters;
1169     else if (flags & SRI_SLAVE) table = master->slaves;
1170     else if (flags & SRI_SENTINEL) table = master->sentinels;
1171     sdsname = sdsnew(name);
1172     if (dictFind(table,sdsname)) {
1173         releaseSentinelAddr(addr);
1174         sdsfree(sdsname);
1175         errno = EBUSY;
1176         return NULL;
1177     }
1178 
1179     /* Create the instance object. */
1180     ri = zmalloc(sizeof(*ri));
1181     /* Note that all the instances are started in the disconnected state,
1182      * the event loop will take care of connecting them. */
1183     ri->flags = flags;
1184     ri->name = sdsname;
1185     ri->runid = NULL;
1186     ri->config_epoch = 0;
1187     ri->addr = addr;
1188     ri->link = createInstanceLink();
1189     ri->last_pub_time = mstime();
1190     ri->last_hello_time = mstime();
1191     ri->last_master_down_reply_time = mstime();
1192     ri->s_down_since_time = 0;
1193     ri->o_down_since_time = 0;
1194     ri->down_after_period = master ? master->down_after_period :
1195                             SENTINEL_DEFAULT_DOWN_AFTER;
1196     ri->master_link_down_time = 0;
1197     ri->auth_pass = NULL;
1198     ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
1199     ri->slave_reconf_sent_time = 0;
1200     ri->slave_master_host = NULL;
1201     ri->slave_master_port = 0;
1202     ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
1203     ri->slave_repl_offset = 0;
1204     ri->sentinels = dictCreate(&instancesDictType,NULL);
1205     ri->quorum = quorum;
1206     ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
1207     ri->master = master;
1208     ri->slaves = dictCreate(&instancesDictType,NULL);
1209     ri->info_refresh = 0;
1210 
1211     /* Failover state. */
1212     ri->leader = NULL;
1213     ri->leader_epoch = 0;
1214     ri->failover_epoch = 0;
1215     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1216     ri->failover_state_change_time = 0;
1217     ri->failover_start_time = 0;
1218     ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
1219     ri->failover_delay_logged = 0;
1220     ri->promoted_slave = NULL;
1221     ri->notification_script = NULL;
1222     ri->client_reconfig_script = NULL;
1223     ri->info = NULL;
1224 
1225     /* Role */
1226     ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
1227     ri->role_reported_time = mstime();
1228     ri->slave_conf_change_time = mstime();
1229 
1230     /* Add into the right table. */
1231     dictAdd(table, ri->name, ri);
1232     return ri;
1233 }
1234 
1235 /* Release this instance and all its slaves, sentinels, hiredis connections.
1236  * This function does not take care of unlinking the instance from the main
1237  * masters table (if it is a master) or from its master sentinels/slaves table
1238  * if it is a slave or sentinel. */
releaseSentinelRedisInstance(sentinelRedisInstance * ri)1239 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
1240     /* Release all its slaves or sentinels if any. */
1241     dictRelease(ri->sentinels);
1242     dictRelease(ri->slaves);
1243 
1244     /* Disconnect the instance. */
1245     releaseInstanceLink(ri->link,ri);
1246 
1247     /* Free other resources. */
1248     sdsfree(ri->name);
1249     sdsfree(ri->runid);
1250     sdsfree(ri->notification_script);
1251     sdsfree(ri->client_reconfig_script);
1252     sdsfree(ri->slave_master_host);
1253     sdsfree(ri->leader);
1254     sdsfree(ri->auth_pass);
1255     sdsfree(ri->info);
1256     releaseSentinelAddr(ri->addr);
1257 
1258     /* Clear state into the master if needed. */
1259     if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
1260         ri->master->promoted_slave = NULL;
1261 
1262     zfree(ri);
1263 }
1264 
1265 /* Lookup a slave in a master Redis instance, by ip and port. */
sentinelRedisInstanceLookupSlave(sentinelRedisInstance * ri,char * ip,int port)1266 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
1267                 sentinelRedisInstance *ri, char *ip, int port)
1268 {
1269     sds key;
1270     sentinelRedisInstance *slave;
1271     char buf[NET_PEER_ID_LEN];
1272 
1273     serverAssert(ri->flags & SRI_MASTER);
1274     anetFormatAddr(buf,sizeof(buf),ip,port);
1275     key = sdsnew(buf);
1276     slave = dictFetchValue(ri->slaves,key);
1277     sdsfree(key);
1278     return slave;
1279 }
1280 
1281 /* Return the name of the type of the instance as a string. */
sentinelRedisInstanceTypeStr(sentinelRedisInstance * ri)1282 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
1283     if (ri->flags & SRI_MASTER) return "master";
1284     else if (ri->flags & SRI_SLAVE) return "slave";
1285     else if (ri->flags & SRI_SENTINEL) return "sentinel";
1286     else return "unknown";
1287 }
1288 
1289 /* This function remove the Sentinel with the specified ID from the
1290  * specified master.
1291  *
1292  * If "runid" is NULL the function returns ASAP.
1293  *
1294  * This function is useful because on Sentinels address switch, we want to
1295  * remove our old entry and add a new one for the same ID but with the new
1296  * address.
1297  *
1298  * The function returns 1 if the matching Sentinel was removed, otherwise
1299  * 0 if there was no Sentinel with this ID. */
removeMatchingSentinelFromMaster(sentinelRedisInstance * master,char * runid)1300 int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) {
1301     dictIterator *di;
1302     dictEntry *de;
1303     int removed = 0;
1304 
1305     if (runid == NULL) return 0;
1306 
1307     di = dictGetSafeIterator(master->sentinels);
1308     while((de = dictNext(di)) != NULL) {
1309         sentinelRedisInstance *ri = dictGetVal(de);
1310 
1311         if (ri->runid && strcmp(ri->runid,runid) == 0) {
1312             dictDelete(master->sentinels,ri->name);
1313             removed++;
1314         }
1315     }
1316     dictReleaseIterator(di);
1317     return removed;
1318 }
1319 
1320 /* Search an instance with the same runid, ip and port into a dictionary
1321  * of instances. Return NULL if not found, otherwise return the instance
1322  * pointer.
1323  *
1324  * runid or ip can be NULL. In such a case the search is performed only
1325  * by the non-NULL field. */
getSentinelRedisInstanceByAddrAndRunID(dict * instances,char * ip,int port,char * runid)1326 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1327     dictIterator *di;
1328     dictEntry *de;
1329     sentinelRedisInstance *instance = NULL;
1330 
1331     serverAssert(ip || runid);   /* User must pass at least one search param. */
1332     di = dictGetIterator(instances);
1333     while((de = dictNext(di)) != NULL) {
1334         sentinelRedisInstance *ri = dictGetVal(de);
1335 
1336         if (runid && !ri->runid) continue;
1337         if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1338             (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1339                             ri->addr->port == port)))
1340         {
1341             instance = ri;
1342             break;
1343         }
1344     }
1345     dictReleaseIterator(di);
1346     return instance;
1347 }
1348 
1349 /* Master lookup by name */
sentinelGetMasterByName(char * name)1350 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1351     sentinelRedisInstance *ri;
1352     sds sdsname = sdsnew(name);
1353 
1354     ri = dictFetchValue(sentinel.masters,sdsname);
1355     sdsfree(sdsname);
1356     return ri;
1357 }
1358 
1359 /* Add the specified flags to all the instances in the specified dictionary. */
sentinelAddFlagsToDictOfRedisInstances(dict * instances,int flags)1360 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1361     dictIterator *di;
1362     dictEntry *de;
1363 
1364     di = dictGetIterator(instances);
1365     while((de = dictNext(di)) != NULL) {
1366         sentinelRedisInstance *ri = dictGetVal(de);
1367         ri->flags |= flags;
1368     }
1369     dictReleaseIterator(di);
1370 }
1371 
1372 /* Remove the specified flags to all the instances in the specified
1373  * dictionary. */
sentinelDelFlagsToDictOfRedisInstances(dict * instances,int flags)1374 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1375     dictIterator *di;
1376     dictEntry *de;
1377 
1378     di = dictGetIterator(instances);
1379     while((de = dictNext(di)) != NULL) {
1380         sentinelRedisInstance *ri = dictGetVal(de);
1381         ri->flags &= ~flags;
1382     }
1383     dictReleaseIterator(di);
1384 }
1385 
1386 /* Reset the state of a monitored master:
1387  * 1) Remove all slaves.
1388  * 2) Remove all sentinels.
1389  * 3) Remove most of the flags resulting from runtime operations.
1390  * 4) Reset timers to their default value. For example after a reset it will be
1391  *    possible to failover again the same master ASAP, without waiting the
1392  *    failover timeout delay.
1393  * 5) In the process of doing this undo the failover if in progress.
1394  * 6) Disconnect the connections with the master (will reconnect automatically).
1395  */
1396 
1397 #define SENTINEL_RESET_NO_SENTINELS (1<<0)
sentinelResetMaster(sentinelRedisInstance * ri,int flags)1398 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1399     serverAssert(ri->flags & SRI_MASTER);
1400     dictRelease(ri->slaves);
1401     ri->slaves = dictCreate(&instancesDictType,NULL);
1402     if (!(flags & SENTINEL_RESET_NO_SENTINELS)) {
1403         dictRelease(ri->sentinels);
1404         ri->sentinels = dictCreate(&instancesDictType,NULL);
1405     }
1406     instanceLinkCloseConnection(ri->link,ri->link->cc);
1407     instanceLinkCloseConnection(ri->link,ri->link->pc);
1408     ri->flags &= SRI_MASTER;
1409     if (ri->leader) {
1410         sdsfree(ri->leader);
1411         ri->leader = NULL;
1412     }
1413     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1414     ri->failover_state_change_time = 0;
1415     ri->failover_start_time = 0; /* We can failover again ASAP. */
1416     ri->promoted_slave = NULL;
1417     sdsfree(ri->runid);
1418     sdsfree(ri->slave_master_host);
1419     ri->runid = NULL;
1420     ri->slave_master_host = NULL;
1421     ri->link->act_ping_time = mstime();
1422     ri->link->last_ping_time = 0;
1423     ri->link->last_avail_time = mstime();
1424     ri->link->last_pong_time = mstime();
1425     ri->role_reported_time = mstime();
1426     ri->role_reported = SRI_MASTER;
1427     if (flags & SENTINEL_GENERATE_EVENT)
1428         sentinelEvent(LL_WARNING,"+reset-master",ri,"%@");
1429 }
1430 
1431 /* Call sentinelResetMaster() on every master with a name matching the specified
1432  * pattern. */
sentinelResetMastersByPattern(char * pattern,int flags)1433 int sentinelResetMastersByPattern(char *pattern, int flags) {
1434     dictIterator *di;
1435     dictEntry *de;
1436     int reset = 0;
1437 
1438     di = dictGetIterator(sentinel.masters);
1439     while((de = dictNext(di)) != NULL) {
1440         sentinelRedisInstance *ri = dictGetVal(de);
1441 
1442         if (ri->name) {
1443             if (stringmatch(pattern,ri->name,0)) {
1444                 sentinelResetMaster(ri,flags);
1445                 reset++;
1446             }
1447         }
1448     }
1449     dictReleaseIterator(di);
1450     return reset;
1451 }
1452 
1453 /* Reset the specified master with sentinelResetMaster(), and also change
1454  * the ip:port address, but take the name of the instance unmodified.
1455  *
1456  * This is used to handle the +switch-master event.
1457  *
1458  * The function returns C_ERR if the address can't be resolved for some
1459  * reason. Otherwise C_OK is returned.  */
sentinelResetMasterAndChangeAddress(sentinelRedisInstance * master,char * ip,int port)1460 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1461     sentinelAddr *oldaddr, *newaddr;
1462     sentinelAddr **slaves = NULL;
1463     int numslaves = 0, j;
1464     dictIterator *di;
1465     dictEntry *de;
1466 
1467     newaddr = createSentinelAddr(ip,port);
1468     if (newaddr == NULL) return C_ERR;
1469 
1470     /* Make a list of slaves to add back after the reset.
1471      * Don't include the one having the address we are switching to. */
1472     di = dictGetIterator(master->slaves);
1473     while((de = dictNext(di)) != NULL) {
1474         sentinelRedisInstance *slave = dictGetVal(de);
1475 
1476         if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
1477         slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
1478         slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
1479                                                  slave->addr->port);
1480     }
1481     dictReleaseIterator(di);
1482 
1483     /* If we are switching to a different address, include the old address
1484      * as a slave as well, so that we'll be able to sense / reconfigure
1485      * the old master. */
1486     if (!sentinelAddrIsEqual(newaddr,master->addr)) {
1487         slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
1488         slaves[numslaves++] = createSentinelAddr(master->addr->ip,
1489                                                  master->addr->port);
1490     }
1491 
1492     /* Reset and switch address. */
1493     sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
1494     oldaddr = master->addr;
1495     master->addr = newaddr;
1496     master->o_down_since_time = 0;
1497     master->s_down_since_time = 0;
1498 
1499     /* Add slaves back. */
1500     for (j = 0; j < numslaves; j++) {
1501         sentinelRedisInstance *slave;
1502 
1503         slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
1504                     slaves[j]->port, master->quorum, master);
1505         releaseSentinelAddr(slaves[j]);
1506         if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
1507     }
1508     zfree(slaves);
1509 
1510     /* Release the old address at the end so we are safe even if the function
1511      * gets the master->addr->ip and master->addr->port as arguments. */
1512     releaseSentinelAddr(oldaddr);
1513     sentinelFlushConfig();
1514     return C_OK;
1515 }
1516 
1517 /* Return non-zero if there was no SDOWN or ODOWN error associated to this
1518  * instance in the latest 'ms' milliseconds. */
sentinelRedisInstanceNoDownFor(sentinelRedisInstance * ri,mstime_t ms)1519 int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) {
1520     mstime_t most_recent;
1521 
1522     most_recent = ri->s_down_since_time;
1523     if (ri->o_down_since_time > most_recent)
1524         most_recent = ri->o_down_since_time;
1525     return most_recent == 0 || (mstime() - most_recent) > ms;
1526 }
1527 
1528 /* Return the current master address, that is, its address or the address
1529  * of the promoted slave if already operational. */
sentinelGetCurrentMasterAddress(sentinelRedisInstance * master)1530 sentinelAddr *sentinelGetCurrentMasterAddress(sentinelRedisInstance *master) {
1531     /* If we are failing over the master, and the state is already
1532      * SENTINEL_FAILOVER_STATE_RECONF_SLAVES or greater, it means that we
1533      * already have the new configuration epoch in the master, and the
1534      * slave acknowledged the configuration switch. Advertise the new
1535      * address. */
1536     if ((master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1537         master->promoted_slave &&
1538         master->failover_state >= SENTINEL_FAILOVER_STATE_RECONF_SLAVES)
1539     {
1540         return master->promoted_slave->addr;
1541     } else {
1542         return master->addr;
1543     }
1544 }
1545 
1546 /* This function sets the down_after_period field value in 'master' to all
1547  * the slaves and sentinel instances connected to this master. */
sentinelPropagateDownAfterPeriod(sentinelRedisInstance * master)1548 void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) {
1549     dictIterator *di;
1550     dictEntry *de;
1551     int j;
1552     dict *d[] = {master->slaves, master->sentinels, NULL};
1553 
1554     for (j = 0; d[j]; j++) {
1555         di = dictGetIterator(d[j]);
1556         while((de = dictNext(di)) != NULL) {
1557             sentinelRedisInstance *ri = dictGetVal(de);
1558             ri->down_after_period = master->down_after_period;
1559         }
1560         dictReleaseIterator(di);
1561     }
1562 }
1563 
/* Return a constant string describing the type of the instance according to
 * its flags: "master", "slave" or "sentinel" (checked in this order), or
 * "unknown" if none of the three type flags is set. The returned string is
 * a literal and must not be freed or modified by the caller. */
char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
    char *type = "unknown";

    if (ri->flags & SRI_MASTER) {
        type = "master";
    } else if (ri->flags & SRI_SLAVE) {
        type = "slave";
    } else if (ri->flags & SRI_SENTINEL) {
        type = "sentinel";
    }
    return type;
}
1570 
1571 /* ============================ Config handling ============================= */
sentinelHandleConfiguration(char ** argv,int argc)1572 char *sentinelHandleConfiguration(char **argv, int argc) {
1573     sentinelRedisInstance *ri;
1574 
1575     if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1576         /* monitor <name> <host> <port> <quorum> */
1577         int quorum = atoi(argv[4]);
1578 
1579         if (quorum <= 0) return "Quorum must be 1 or greater.";
1580         if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1581                                         atoi(argv[3]),quorum,NULL) == NULL)
1582         {
1583             switch(errno) {
1584             case EBUSY: return "Duplicated master name.";
1585             case ENOENT: return "Can't resolve master instance hostname.";
1586             case EINVAL: return "Invalid port number";
1587             }
1588         }
1589     } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1590         /* down-after-milliseconds <name> <milliseconds> */
1591         ri = sentinelGetMasterByName(argv[1]);
1592         if (!ri) return "No such master with specified name.";
1593         ri->down_after_period = atoi(argv[2]);
1594         if (ri->down_after_period <= 0)
1595             return "negative or zero time parameter.";
1596         sentinelPropagateDownAfterPeriod(ri);
1597     } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1598         /* failover-timeout <name> <milliseconds> */
1599         ri = sentinelGetMasterByName(argv[1]);
1600         if (!ri) return "No such master with specified name.";
1601         ri->failover_timeout = atoi(argv[2]);
1602         if (ri->failover_timeout <= 0)
1603             return "negative or zero time parameter.";
1604    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1605         /* parallel-syncs <name> <milliseconds> */
1606         ri = sentinelGetMasterByName(argv[1]);
1607         if (!ri) return "No such master with specified name.";
1608         ri->parallel_syncs = atoi(argv[2]);
1609    } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1610         /* notification-script <name> <path> */
1611         ri = sentinelGetMasterByName(argv[1]);
1612         if (!ri) return "No such master with specified name.";
1613         if (access(argv[2],X_OK) == -1)
1614             return "Notification script seems non existing or non executable.";
1615         ri->notification_script = sdsnew(argv[2]);
1616    } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1617         /* client-reconfig-script <name> <path> */
1618         ri = sentinelGetMasterByName(argv[1]);
1619         if (!ri) return "No such master with specified name.";
1620         if (access(argv[2],X_OK) == -1)
1621             return "Client reconfiguration script seems non existing or "
1622                    "non executable.";
1623         ri->client_reconfig_script = sdsnew(argv[2]);
1624    } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
1625         /* auth-pass <name> <password> */
1626         ri = sentinelGetMasterByName(argv[1]);
1627         if (!ri) return "No such master with specified name.";
1628         ri->auth_pass = sdsnew(argv[2]);
1629     } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
1630         /* current-epoch <epoch> */
1631         unsigned long long current_epoch = strtoull(argv[1],NULL,10);
1632         if (current_epoch > sentinel.current_epoch)
1633             sentinel.current_epoch = current_epoch;
1634     } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
1635         if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
1636             return "Malformed Sentinel id in myid option.";
1637         memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
1638     } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
1639         /* config-epoch <name> <epoch> */
1640         ri = sentinelGetMasterByName(argv[1]);
1641         if (!ri) return "No such master with specified name.";
1642         ri->config_epoch = strtoull(argv[2],NULL,10);
1643         /* The following update of current_epoch is not really useful as
1644          * now the current epoch is persisted on the config file, but
1645          * we leave this check here for redundancy. */
1646         if (ri->config_epoch > sentinel.current_epoch)
1647             sentinel.current_epoch = ri->config_epoch;
1648     } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
1649         /* leader-epoch <name> <epoch> */
1650         ri = sentinelGetMasterByName(argv[1]);
1651         if (!ri) return "No such master with specified name.";
1652         ri->leader_epoch = strtoull(argv[2],NULL,10);
1653     } else if (!strcasecmp(argv[0],"known-slave") && argc == 4) {
1654         sentinelRedisInstance *slave;
1655 
1656         /* known-slave <name> <ip> <port> */
1657         ri = sentinelGetMasterByName(argv[1]);
1658         if (!ri) return "No such master with specified name.";
1659         if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
1660                     atoi(argv[3]), ri->quorum, ri)) == NULL)
1661         {
1662             return "Wrong hostname or port for slave.";
1663         }
1664     } else if (!strcasecmp(argv[0],"known-sentinel") &&
1665                (argc == 4 || argc == 5)) {
1666         sentinelRedisInstance *si;
1667 
1668         if (argc == 5) { /* Ignore the old form without runid. */
1669             /* known-sentinel <name> <ip> <port> [runid] */
1670             ri = sentinelGetMasterByName(argv[1]);
1671             if (!ri) return "No such master with specified name.";
1672             if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
1673                         atoi(argv[3]), ri->quorum, ri)) == NULL)
1674             {
1675                 return "Wrong hostname or port for sentinel.";
1676             }
1677             si->runid = sdsnew(argv[4]);
1678             sentinelTryConnectionSharing(si);
1679         }
1680     } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
1681         /* announce-ip <ip-address> */
1682         if (strlen(argv[1]))
1683             sentinel.announce_ip = sdsnew(argv[1]);
1684     } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
1685         /* announce-port <port> */
1686         sentinel.announce_port = atoi(argv[1]);
1687     } else {
1688         return "Unrecognized sentinel configuration statement.";
1689     }
1690     return NULL;
1691 }
1692 
1693 /* Implements CONFIG REWRITE for "sentinel" option.
1694  * This is used not just to rewrite the configuration given by the user
1695  * (the configured masters) but also in order to retain the state of
1696  * Sentinel across restarts: config epoch of masters, associated slaves
1697  * and sentinel instances, and so forth. */
rewriteConfigSentinelOption(struct rewriteConfigState * state)1698 void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
1699     dictIterator *di, *di2;
1700     dictEntry *de;
1701     sds line;
1702 
1703     /* sentinel unique ID. */
1704     line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid);
1705     rewriteConfigRewriteLine(state,"sentinel",line,1);
1706 
1707     /* For every master emit a "sentinel monitor" config entry. */
1708     di = dictGetIterator(sentinel.masters);
1709     while((de = dictNext(di)) != NULL) {
1710         sentinelRedisInstance *master, *ri;
1711         sentinelAddr *master_addr;
1712 
1713         /* sentinel monitor */
1714         master = dictGetVal(de);
1715         master_addr = sentinelGetCurrentMasterAddress(master);
1716         line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d",
1717             master->name, master_addr->ip, master_addr->port,
1718             master->quorum);
1719         rewriteConfigRewriteLine(state,"sentinel",line,1);
1720 
1721         /* sentinel down-after-milliseconds */
1722         if (master->down_after_period != SENTINEL_DEFAULT_DOWN_AFTER) {
1723             line = sdscatprintf(sdsempty(),
1724                 "sentinel down-after-milliseconds %s %ld",
1725                 master->name, (long) master->down_after_period);
1726             rewriteConfigRewriteLine(state,"sentinel",line,1);
1727         }
1728 
1729         /* sentinel failover-timeout */
1730         if (master->failover_timeout != SENTINEL_DEFAULT_FAILOVER_TIMEOUT) {
1731             line = sdscatprintf(sdsempty(),
1732                 "sentinel failover-timeout %s %ld",
1733                 master->name, (long) master->failover_timeout);
1734             rewriteConfigRewriteLine(state,"sentinel",line,1);
1735         }
1736 
1737         /* sentinel parallel-syncs */
1738         if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) {
1739             line = sdscatprintf(sdsempty(),
1740                 "sentinel parallel-syncs %s %d",
1741                 master->name, master->parallel_syncs);
1742             rewriteConfigRewriteLine(state,"sentinel",line,1);
1743         }
1744 
1745         /* sentinel notification-script */
1746         if (master->notification_script) {
1747             line = sdscatprintf(sdsempty(),
1748                 "sentinel notification-script %s %s",
1749                 master->name, master->notification_script);
1750             rewriteConfigRewriteLine(state,"sentinel",line,1);
1751         }
1752 
1753         /* sentinel client-reconfig-script */
1754         if (master->client_reconfig_script) {
1755             line = sdscatprintf(sdsempty(),
1756                 "sentinel client-reconfig-script %s %s",
1757                 master->name, master->client_reconfig_script);
1758             rewriteConfigRewriteLine(state,"sentinel",line,1);
1759         }
1760 
1761         /* sentinel auth-pass */
1762         if (master->auth_pass) {
1763             line = sdscatprintf(sdsempty(),
1764                 "sentinel auth-pass %s %s",
1765                 master->name, master->auth_pass);
1766             rewriteConfigRewriteLine(state,"sentinel",line,1);
1767         }
1768 
1769         /* sentinel config-epoch */
1770         line = sdscatprintf(sdsempty(),
1771             "sentinel config-epoch %s %llu",
1772             master->name, (unsigned long long) master->config_epoch);
1773         rewriteConfigRewriteLine(state,"sentinel",line,1);
1774 
1775         /* sentinel leader-epoch */
1776         line = sdscatprintf(sdsempty(),
1777             "sentinel leader-epoch %s %llu",
1778             master->name, (unsigned long long) master->leader_epoch);
1779         rewriteConfigRewriteLine(state,"sentinel",line,1);
1780 
1781         /* sentinel known-slave */
1782         di2 = dictGetIterator(master->slaves);
1783         while((de = dictNext(di2)) != NULL) {
1784             sentinelAddr *slave_addr;
1785 
1786             ri = dictGetVal(de);
1787             slave_addr = ri->addr;
1788 
1789             /* If master_addr (obtained using sentinelGetCurrentMasterAddress()
1790              * so it may be the address of the promoted slave) is equal to this
1791              * slave's address, a failover is in progress and the slave was
1792              * already successfully promoted. So as the address of this slave
1793              * we use the old master address instead. */
1794             if (sentinelAddrIsEqual(slave_addr,master_addr))
1795                 slave_addr = master->addr;
1796             line = sdscatprintf(sdsempty(),
1797                 "sentinel known-slave %s %s %d",
1798                 master->name, slave_addr->ip, slave_addr->port);
1799             rewriteConfigRewriteLine(state,"sentinel",line,1);
1800         }
1801         dictReleaseIterator(di2);
1802 
1803         /* sentinel known-sentinel */
1804         di2 = dictGetIterator(master->sentinels);
1805         while((de = dictNext(di2)) != NULL) {
1806             ri = dictGetVal(de);
1807             if (ri->runid == NULL) continue;
1808             line = sdscatprintf(sdsempty(),
1809                 "sentinel known-sentinel %s %s %d %s",
1810                 master->name, ri->addr->ip, ri->addr->port, ri->runid);
1811             rewriteConfigRewriteLine(state,"sentinel",line,1);
1812         }
1813         dictReleaseIterator(di2);
1814     }
1815 
1816     /* sentinel current-epoch is a global state valid for all the masters. */
1817     line = sdscatprintf(sdsempty(),
1818         "sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch);
1819     rewriteConfigRewriteLine(state,"sentinel",line,1);
1820 
1821     /* sentinel announce-ip. */
1822     if (sentinel.announce_ip) {
1823         line = sdsnew("sentinel announce-ip ");
1824         line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip));
1825         rewriteConfigRewriteLine(state,"sentinel",line,1);
1826     }
1827 
1828     /* sentinel announce-port. */
1829     if (sentinel.announce_port) {
1830         line = sdscatprintf(sdsempty(),"sentinel announce-port %d",
1831                             sentinel.announce_port);
1832         rewriteConfigRewriteLine(state,"sentinel",line,1);
1833     }
1834 
1835     dictReleaseIterator(di);
1836 }
1837 
1838 /* This function uses the config rewriting Redis engine in order to persist
1839  * the state of the Sentinel in the current configuration file.
1840  *
1841  * Before returning the function calls fsync() against the generated
1842  * configuration file to make sure changes are committed to disk.
1843  *
1844  * On failure the function logs a warning on the Redis log. */
sentinelFlushConfig(void)1845 void sentinelFlushConfig(void) {
1846     int fd = -1;
1847     int saved_hz = server.hz;
1848     int rewrite_status;
1849 
1850     server.hz = CONFIG_DEFAULT_HZ;
1851     rewrite_status = rewriteConfig(server.configfile);
1852     server.hz = saved_hz;
1853 
1854     if (rewrite_status == -1) goto werr;
1855     if ((fd = open(server.configfile,O_RDONLY)) == -1) goto werr;
1856     if (fsync(fd) == -1) goto werr;
1857     if (close(fd) == EOF) goto werr;
1858     return;
1859 
1860 werr:
1861     if (fd != -1) close(fd);
1862     serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
1863 }
1864 
1865 /* ====================== hiredis connection handling ======================= */
1866 
1867 /* Send the AUTH command with the specified master password if needed.
1868  * Note that for slaves the password set for the master is used.
1869  *
1870  * We don't check at all if the command was successfully transmitted
1871  * to the instance as if it fails Sentinel will detect the instance down,
1872  * will disconnect and reconnect the link and so forth. */
sentinelSendAuthIfNeeded(sentinelRedisInstance * ri,redisAsyncContext * c)1873 void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
1874     char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass :
1875                                                  ri->master->auth_pass;
1876 
1877     if (auth_pass) {
1878         if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "AUTH %s",
1879             auth_pass) == C_OK) ri->link->pending_commands++;
1880     }
1881 }
1882 
1883 /* Use CLIENT SETNAME to name the connection in the Redis instance as
1884  * sentinel-<first_8_chars_of_runid>-<connection_type>
1885  * The connection type is "cmd" or "pubsub" as specified by 'type'.
1886  *
1887  * This makes it possible to list all the sentinel instances connected
1888  * to a Redis servewr with CLIENT LIST, grepping for a specific name format. */
sentinelSetClientName(sentinelRedisInstance * ri,redisAsyncContext * c,char * type)1889 void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
1890     char name[64];
1891 
1892     snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
1893     if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
1894         "CLIENT SETNAME %s", name) == C_OK)
1895     {
1896         ri->link->pending_commands++;
1897     }
1898 }
1899 
1900 /* Create the async connections for the instance link if the link
1901  * is disconnected. Note that link->disconnected is true even if just
1902  * one of the two links (commands and pub/sub) is missing. */
sentinelReconnectInstance(sentinelRedisInstance * ri)1903 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1904     if (ri->link->disconnected == 0) return;
1905     if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
1906     instanceLink *link = ri->link;
1907     mstime_t now = mstime();
1908 
1909     if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
1910     ri->link->last_reconn_time = now;
1911 
1912     /* Commands connection. */
1913     if (link->cc == NULL) {
1914         link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
1915         if (link->cc->err) {
1916             sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
1917                 link->cc->errstr);
1918             instanceLinkCloseConnection(link,link->cc);
1919         } else {
1920             link->pending_commands = 0;
1921             link->cc_conn_time = mstime();
1922             link->cc->data = link;
1923             redisAeAttach(server.el,link->cc);
1924             redisAsyncSetConnectCallback(link->cc,
1925                     sentinelLinkEstablishedCallback);
1926             redisAsyncSetDisconnectCallback(link->cc,
1927                     sentinelDisconnectCallback);
1928             sentinelSendAuthIfNeeded(ri,link->cc);
1929             sentinelSetClientName(ri,link->cc,"cmd");
1930 
1931             /* Send a PING ASAP when reconnecting. */
1932             sentinelSendPing(ri);
1933         }
1934     }
1935     /* Pub / Sub */
1936     if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
1937         link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
1938         if (link->pc->err) {
1939             sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1940                 link->pc->errstr);
1941             instanceLinkCloseConnection(link,link->pc);
1942         } else {
1943             int retval;
1944 
1945             link->pc_conn_time = mstime();
1946             link->pc->data = link;
1947             redisAeAttach(server.el,link->pc);
1948             redisAsyncSetConnectCallback(link->pc,
1949                     sentinelLinkEstablishedCallback);
1950             redisAsyncSetDisconnectCallback(link->pc,
1951                     sentinelDisconnectCallback);
1952             sentinelSendAuthIfNeeded(ri,link->pc);
1953             sentinelSetClientName(ri,link->pc,"pubsub");
1954             /* Now we subscribe to the Sentinels "Hello" channel. */
1955             retval = redisAsyncCommand(link->pc,
1956                 sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
1957                     SENTINEL_HELLO_CHANNEL);
1958             if (retval != C_OK) {
1959                 /* If we can't subscribe, the Pub/Sub connection is useless
1960                  * and we can simply disconnect it and try again. */
1961                 instanceLinkCloseConnection(link,link->pc);
1962                 return;
1963             }
1964         }
1965     }
1966     /* Clear the disconnected status only if we have both the connections
1967      * (or just the commands connection if this is a sentinel instance). */
1968     if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
1969         link->disconnected = 0;
1970 }
1971 
1972 /* ======================== Redis instances pinging  ======================== */
1973 
1974 /* Return true if master looks "sane", that is:
1975  * 1) It is actually a master in the current configuration.
1976  * 2) It reports itself as a master.
1977  * 3) It is not SDOWN or ODOWN.
1978  * 4) We obtained last INFO no more than two times the INFO period time ago. */
sentinelMasterLooksSane(sentinelRedisInstance * master)1979 int sentinelMasterLooksSane(sentinelRedisInstance *master) {
1980     return
1981         master->flags & SRI_MASTER &&
1982         master->role_reported == SRI_MASTER &&
1983         (master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 &&
1984         (mstime() - master->info_refresh) < SENTINEL_INFO_PERIOD*2;
1985 }
1986 
1987 /* Process the INFO output from masters. */
sentinelRefreshInstanceInfo(sentinelRedisInstance * ri,const char * info)1988 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1989     sds *lines;
1990     int numlines, j;
1991     int role = 0;
1992 
1993     /* cache full INFO output for instance */
1994     sdsfree(ri->info);
1995     ri->info = sdsnew(info);
1996 
1997     /* The following fields must be reset to a given value in the case they
1998      * are not found at all in the INFO output. */
1999     ri->master_link_down_time = 0;
2000 
2001     /* Process line by line. */
2002     lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
2003     for (j = 0; j < numlines; j++) {
2004         sentinelRedisInstance *slave;
2005         sds l = lines[j];
2006 
2007         /* run_id:<40 hex chars>*/
2008         if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
2009             if (ri->runid == NULL) {
2010                 ri->runid = sdsnewlen(l+7,40);
2011             } else {
2012                 if (strncmp(ri->runid,l+7,40) != 0) {
2013                     sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
2014                     sdsfree(ri->runid);
2015                     ri->runid = sdsnewlen(l+7,40);
2016                 }
2017             }
2018         }
2019 
2020         /* old versions: slave0:<ip>,<port>,<state>
2021          * new versions: slave0:ip=127.0.0.1,port=9999,... */
2022         if ((ri->flags & SRI_MASTER) &&
2023             sdslen(l) >= 7 &&
2024             !memcmp(l,"slave",5) && isdigit(l[5]))
2025         {
2026             char *ip, *port, *end;
2027 
2028             if (strstr(l,"ip=") == NULL) {
2029                 /* Old format. */
2030                 ip = strchr(l,':'); if (!ip) continue;
2031                 ip++; /* Now ip points to start of ip address. */
2032                 port = strchr(ip,','); if (!port) continue;
2033                 *port = '\0'; /* nul term for easy access. */
2034                 port++; /* Now port points to start of port number. */
2035                 end = strchr(port,','); if (!end) continue;
2036                 *end = '\0'; /* nul term for easy access. */
2037             } else {
2038                 /* New format. */
2039                 ip = strstr(l,"ip="); if (!ip) continue;
2040                 ip += 3; /* Now ip points to start of ip address. */
2041                 port = strstr(l,"port="); if (!port) continue;
2042                 port += 5; /* Now port points to start of port number. */
2043                 /* Nul term both fields for easy access. */
2044                 end = strchr(ip,','); if (end) *end = '\0';
2045                 end = strchr(port,','); if (end) *end = '\0';
2046             }
2047 
2048             /* Check if we already have this slave into our table,
2049              * otherwise add it. */
2050             if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
2051                 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
2052                             atoi(port), ri->quorum, ri)) != NULL)
2053                 {
2054                     sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
2055                     sentinelFlushConfig();
2056                 }
2057             }
2058         }
2059 
2060         /* master_link_down_since_seconds:<seconds> */
2061         if (sdslen(l) >= 32 &&
2062             !memcmp(l,"master_link_down_since_seconds",30))
2063         {
2064             ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
2065         }
2066 
2067         /* role:<role> */
2068         if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
2069         else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
2070 
2071         if (role == SRI_SLAVE) {
2072             /* master_host:<host> */
2073             if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
2074                 if (ri->slave_master_host == NULL ||
2075                     strcasecmp(l+12,ri->slave_master_host))
2076                 {
2077                     sdsfree(ri->slave_master_host);
2078                     ri->slave_master_host = sdsnew(l+12);
2079                     ri->slave_conf_change_time = mstime();
2080                 }
2081             }
2082 
2083             /* master_port:<port> */
2084             if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
2085                 int slave_master_port = atoi(l+12);
2086 
2087                 if (ri->slave_master_port != slave_master_port) {
2088                     ri->slave_master_port = slave_master_port;
2089                     ri->slave_conf_change_time = mstime();
2090                 }
2091             }
2092 
2093             /* master_link_status:<status> */
2094             if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
2095                 ri->slave_master_link_status =
2096                     (strcasecmp(l+19,"up") == 0) ?
2097                     SENTINEL_MASTER_LINK_STATUS_UP :
2098                     SENTINEL_MASTER_LINK_STATUS_DOWN;
2099             }
2100 
2101             /* slave_priority:<priority> */
2102             if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
2103                 ri->slave_priority = atoi(l+15);
2104 
2105             /* slave_repl_offset:<offset> */
2106             if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
2107                 ri->slave_repl_offset = strtoull(l+18,NULL,10);
2108         }
2109     }
2110     ri->info_refresh = mstime();
2111     sdsfreesplitres(lines,numlines);
2112 
2113     /* ---------------------------- Acting half -----------------------------
2114      * Some things will not happen if sentinel.tilt is true, but some will
2115      * still be processed. */
2116 
2117     /* Remember when the role changed. */
2118     if (role != ri->role_reported) {
2119         ri->role_reported_time = mstime();
2120         ri->role_reported = role;
2121         if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
2122         /* Log the event with +role-change if the new role is coherent or
2123          * with -role-change if there is a mismatch with the current config. */
2124         sentinelEvent(LL_VERBOSE,
2125             ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
2126             "+role-change" : "-role-change",
2127             ri, "%@ new reported role is %s",
2128             role == SRI_MASTER ? "master" : "slave",
2129             ri->flags & SRI_MASTER ? "master" : "slave");
2130     }
2131 
2132     /* None of the following conditions are processed when in tilt mode, so
2133      * return asap. */
2134     if (sentinel.tilt) return;
2135 
2136     /* Handle master -> slave role switch. */
2137     if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
2138         /* Nothing to do, but masters claiming to be slaves are
2139          * considered to be unreachable by Sentinel, so eventually
2140          * a failover will be triggered. */
2141     }
2142 
2143     /* Handle slave -> master role switch. */
2144     if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
2145         /* If this is a promoted slave we can change state to the
2146          * failover state machine. */
2147         if ((ri->flags & SRI_PROMOTED) &&
2148             (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2149             (ri->master->failover_state ==
2150                 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
2151         {
2152             /* Now that we are sure the slave was reconfigured as a master
2153              * set the master configuration epoch to the epoch we won the
2154              * election to perform this failover. This will force the other
2155              * Sentinels to update their config (assuming there is not
2156              * a newer one already available). */
2157             ri->master->config_epoch = ri->master->failover_epoch;
2158             ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2159             ri->master->failover_state_change_time = mstime();
2160             sentinelFlushConfig();
2161             sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
2162             if (sentinel.simfailure_flags &
2163                 SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
2164                 sentinelSimFailureCrash();
2165             sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
2166                 ri->master,"%@");
2167             sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
2168                 "start",ri->master->addr,ri->addr);
2169             sentinelForceHelloUpdateForMaster(ri->master);
2170         } else {
2171             /* A slave turned into a master. We want to force our view and
2172              * reconfigure as slave. Wait some time after the change before
2173              * going forward, to receive new configs if any. */
2174             mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
2175 
2176             if (!(ri->flags & SRI_PROMOTED) &&
2177                  sentinelMasterLooksSane(ri->master) &&
2178                  sentinelRedisInstanceNoDownFor(ri,wait_time) &&
2179                  mstime() - ri->role_reported_time > wait_time)
2180             {
2181                 int retval = sentinelSendSlaveOf(ri,
2182                         ri->master->addr->ip,
2183                         ri->master->addr->port);
2184                 if (retval == C_OK)
2185                     sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
2186             }
2187         }
2188     }
2189 
2190     /* Handle slaves replicating to a different master address. */
2191     if ((ri->flags & SRI_SLAVE) &&
2192         role == SRI_SLAVE &&
2193         (ri->slave_master_port != ri->master->addr->port ||
2194          strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
2195     {
2196         mstime_t wait_time = ri->master->failover_timeout;
2197 
2198         /* Make sure the master is sane before reconfiguring this instance
2199          * into a slave. */
2200         if (sentinelMasterLooksSane(ri->master) &&
2201             sentinelRedisInstanceNoDownFor(ri,wait_time) &&
2202             mstime() - ri->slave_conf_change_time > wait_time)
2203         {
2204             int retval = sentinelSendSlaveOf(ri,
2205                     ri->master->addr->ip,
2206                     ri->master->addr->port);
2207             if (retval == C_OK)
2208                 sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
2209         }
2210     }
2211 
2212     /* Detect if the slave that is in the process of being reconfigured
2213      * changed state. */
2214     if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
2215         (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
2216     {
2217         /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
2218         if ((ri->flags & SRI_RECONF_SENT) &&
2219             ri->slave_master_host &&
2220             strcmp(ri->slave_master_host,
2221                     ri->master->promoted_slave->addr->ip) == 0 &&
2222             ri->slave_master_port == ri->master->promoted_slave->addr->port)
2223         {
2224             ri->flags &= ~SRI_RECONF_SENT;
2225             ri->flags |= SRI_RECONF_INPROG;
2226             sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
2227         }
2228 
2229         /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
2230         if ((ri->flags & SRI_RECONF_INPROG) &&
2231             ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
2232         {
2233             ri->flags &= ~SRI_RECONF_INPROG;
2234             ri->flags |= SRI_RECONF_DONE;
2235             sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
2236         }
2237     }
2238 }
2239 
/* Reply handler for the INFO command queued by sentinelSendPeriodicCommands().
 *
 * 'privdata' is the instance the INFO was sent to, while c->data is the
 * instanceLink the command was queued on. If either the reply or the link
 * is missing the function returns without touching anything, otherwise the
 * pending commands counter of the link is decremented and, when the reply
 * is a string, its content is handed to sentinelRefreshInstanceInfo(). */
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    instanceLink *cmdlink = c->data;
    redisReply *info = reply;

    if (info == NULL || cmdlink == NULL) return;

    cmdlink->pending_commands--;
    if (info->type != REDIS_REPLY_STRING) return;
    sentinelRefreshInstanceInfo((sentinelRedisInstance*)privdata,info->str);
}
2252 
2253 /* Just discard the reply. We use this when we are not monitoring the return
2254  * value of the command but its effects directly. */
sentinelDiscardReplyCallback(redisAsyncContext * c,void * reply,void * privdata)2255 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
2256     instanceLink *link = c->data;
2257     UNUSED(reply);
2258     UNUSED(privdata);
2259 
2260     if (link) link->pending_commands--;
2261 }
2262 
/* Reply handler for the PING command queued by sentinelSendPing().
 *
 * 'privdata' is the pinged instance, c->data is the instanceLink used to
 * send the command. Nothing is done if the reply or the link is missing.
 *
 * For status and error replies:
 *
 * - A reply starting with PONG, LOADING or MASTERDOWN is an acceptable one:
 *   last_avail_time is refreshed and act_ping_time is cleared to signal that
 *   the pong was received.
 * - A reply starting with BUSY, for an instance flagged SRI_S_DOWN that did
 *   not get a SCRIPT KILL yet, triggers a SCRIPT KILL. The
 *   SRI_SCRIPT_KILL_SENT flag is set even if the command can't be queued.
 *
 * last_pong_time is refreshed for every reply, whatever its type or
 * content. */
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *cmdlink = c->data;
    redisReply *pong = reply;

    if (pong == NULL || cmdlink == NULL) return;
    cmdlink->pending_commands--;

    if (pong->type == REDIS_REPLY_STATUS || pong->type == REDIS_REPLY_ERROR) {
        /* Only these replies update the "instance available" time. */
        int acceptable = !strncmp(pong->str,"PONG",4) ||
                         !strncmp(pong->str,"LOADING",7) ||
                         !strncmp(pong->str,"MASTERDOWN",10);

        if (acceptable) {
            cmdlink->last_avail_time = mstime();
            cmdlink->act_ping_time = 0; /* The pong was received. */
        } else if (!strncmp(pong->str,"BUSY",4) &&
                   (ri->flags & SRI_S_DOWN) &&
                   !(ri->flags & SRI_SCRIPT_KILL_SENT))
        {
            /* The instance looks down because of a busy script: try to
             * kill the script, just one time. */
            int queued = redisAsyncCommand(ri->link->cc,
                            sentinelDiscardReplyCallback, ri,
                            "SCRIPT KILL");

            if (queued == C_OK) ri->link->pending_commands++;
            ri->flags |= SRI_SCRIPT_KILL_SENT;
        }
    }
    cmdlink->last_pong_time = mstime();
}
2299 
2300 /* This is called when we get the reply about the PUBLISH command we send
2301  * to the master to advertise this sentinel. */
sentinelPublishReplyCallback(redisAsyncContext * c,void * reply,void * privdata)2302 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
2303     sentinelRedisInstance *ri = privdata;
2304     instanceLink *link = c->data;
2305     redisReply *r;
2306 
2307     if (!reply || !link) return;
2308     link->pending_commands--;
2309     r = reply;
2310 
2311     /* Only update pub_time if we actually published our message. Otherwise
2312      * we'll retry again in 100 milliseconds. */
2313     if (r->type != REDIS_REPLY_ERROR)
2314         ri->last_pub_time = mstime();
2315 }
2316 
2317 /* Process an hello message received via Pub/Sub in master or slave instance,
2318  * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
2319  *
2320  * If the master name specified in the message is not known, the message is
2321  * discarded. */
sentinelProcessHelloMessage(char * hello,int hello_len)2322 void sentinelProcessHelloMessage(char *hello, int hello_len) {
2323     /* Format is composed of 8 tokens:
2324      * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
2325      * 5=master_ip,6=master_port,7=master_config_epoch. */
2326     int numtokens, port, removed, master_port;
2327     uint64_t current_epoch, master_config_epoch;
2328     char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
2329     sentinelRedisInstance *si, *master;
2330 
2331     if (numtokens == 8) {
2332         /* Obtain a reference to the master this hello message is about */
2333         master = sentinelGetMasterByName(token[4]);
2334         if (!master) goto cleanup; /* Unknown master, skip the message. */
2335 
2336         /* First, try to see if we already have this sentinel. */
2337         port = atoi(token[1]);
2338         master_port = atoi(token[6]);
2339         si = getSentinelRedisInstanceByAddrAndRunID(
2340                         master->sentinels,token[0],port,token[2]);
2341         current_epoch = strtoull(token[3],NULL,10);
2342         master_config_epoch = strtoull(token[7],NULL,10);
2343 
2344         if (!si) {
2345             /* If not, remove all the sentinels that have the same runid
2346              * because there was an address change, and add the same Sentinel
2347              * with the new address back. */
2348             removed = removeMatchingSentinelFromMaster(master,token[2]);
2349             if (removed) {
2350                 sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
2351                     "%@ ip %s port %d for %s", token[0],port,token[2]);
2352             } else {
2353                 /* Check if there is another Sentinel with the same address this
2354                  * new one is reporting. What we do if this happens is to set its
2355                  * port to 0, to signal the address is invalid. We'll update it
2356                  * later if we get an HELLO message. */
2357                 sentinelRedisInstance *other =
2358                     getSentinelRedisInstanceByAddrAndRunID(
2359                         master->sentinels, token[0],port,NULL);
2360                 if (other) {
2361                     sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
2362                     other->addr->port = 0; /* It means: invalid address. */
2363                     sentinelUpdateSentinelAddressInAllMasters(other);
2364                 }
2365             }
2366 
2367             /* Add the new sentinel. */
2368             si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
2369                             token[0],port,master->quorum,master);
2370 
2371             if (si) {
2372                 if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
2373                 /* The runid is NULL after a new instance creation and
2374                  * for Sentinels we don't have a later chance to fill it,
2375                  * so do it now. */
2376                 si->runid = sdsnew(token[2]);
2377                 sentinelTryConnectionSharing(si);
2378                 if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
2379                 sentinelFlushConfig();
2380             }
2381         }
2382 
2383         /* Update local current_epoch if received current_epoch is greater.*/
2384         if (current_epoch > sentinel.current_epoch) {
2385             sentinel.current_epoch = current_epoch;
2386             sentinelFlushConfig();
2387             sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
2388                 (unsigned long long) sentinel.current_epoch);
2389         }
2390 
2391         /* Update master info if received configuration is newer. */
2392         if (si && master->config_epoch < master_config_epoch) {
2393             master->config_epoch = master_config_epoch;
2394             if (master_port != master->addr->port ||
2395                 strcmp(master->addr->ip, token[5]))
2396             {
2397                 sentinelAddr *old_addr;
2398 
2399                 sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
2400                 sentinelEvent(LL_WARNING,"+switch-master",
2401                     master,"%s %s %d %s %d",
2402                     master->name,
2403                     master->addr->ip, master->addr->port,
2404                     token[5], master_port);
2405 
2406                 old_addr = dupSentinelAddr(master->addr);
2407                 sentinelResetMasterAndChangeAddress(master, token[5], master_port);
2408                 sentinelCallClientReconfScript(master,
2409                     SENTINEL_OBSERVER,"start",
2410                     old_addr,master->addr);
2411                 releaseSentinelAddr(old_addr);
2412             }
2413         }
2414 
2415         /* Update the state of the Sentinel. */
2416         if (si) si->last_hello_time = mstime();
2417     }
2418 
2419 cleanup:
2420     sdsfreesplitres(token,numtokens);
2421 }
2422 
2423 
2424 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
2425  * to discover other sentinels attached at the same master. */
sentinelReceiveHelloMessages(redisAsyncContext * c,void * reply,void * privdata)2426 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
2427     sentinelRedisInstance *ri = privdata;
2428     redisReply *r;
2429     UNUSED(c);
2430 
2431     if (!reply || !ri) return;
2432     r = reply;
2433 
2434     /* Update the last activity in the pubsub channel. Note that since we
2435      * receive our messages as well this timestamp can be used to detect
2436      * if the link is probably disconnected even if it seems otherwise. */
2437     ri->link->pc_last_activity = mstime();
2438 
2439     /* Sanity check in the reply we expect, so that the code that follows
2440      * can avoid to check for details. */
2441     if (r->type != REDIS_REPLY_ARRAY ||
2442         r->elements != 3 ||
2443         r->element[0]->type != REDIS_REPLY_STRING ||
2444         r->element[1]->type != REDIS_REPLY_STRING ||
2445         r->element[2]->type != REDIS_REPLY_STRING ||
2446         strcmp(r->element[0]->str,"message") != 0) return;
2447 
2448     /* We are not interested in meeting ourselves */
2449     if (strstr(r->element[2]->str,sentinel.myid) != NULL) return;
2450 
2451     sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
2452 }
2453 
2454 /* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
2455  * instance in order to broadcast the current configuraiton for this
2456  * master, and to advertise the existence of this Sentinel at the same time.
2457  *
2458  * The message has the following format:
2459  *
2460  * sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
2461  * master_name,master_ip,master_port,master_config_epoch.
2462  *
2463  * Returns C_OK if the PUBLISH was queued correctly, otherwise
2464  * C_ERR is returned. */
sentinelSendHello(sentinelRedisInstance * ri)2465 int sentinelSendHello(sentinelRedisInstance *ri) {
2466     char ip[NET_IP_STR_LEN];
2467     char payload[NET_IP_STR_LEN+1024];
2468     int retval;
2469     char *announce_ip;
2470     int announce_port;
2471     sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
2472     sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
2473 
2474     if (ri->link->disconnected) return C_ERR;
2475 
2476     /* Use the specified announce address if specified, otherwise try to
2477      * obtain our own IP address. */
2478     if (sentinel.announce_ip) {
2479         announce_ip = sentinel.announce_ip;
2480     } else {
2481         if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
2482             return C_ERR;
2483         announce_ip = ip;
2484     }
2485     announce_port = sentinel.announce_port ?
2486                     sentinel.announce_port : server.port;
2487 
2488     /* Format and send the Hello message. */
2489     snprintf(payload,sizeof(payload),
2490         "%s,%d,%s,%llu," /* Info about this sentinel. */
2491         "%s,%s,%d,%llu", /* Info about current master. */
2492         announce_ip, announce_port, sentinel.myid,
2493         (unsigned long long) sentinel.current_epoch,
2494         /* --- */
2495         master->name,master_addr->ip,master_addr->port,
2496         (unsigned long long) master->config_epoch);
2497     retval = redisAsyncCommand(ri->link->cc,
2498         sentinelPublishReplyCallback, ri, "PUBLISH %s %s",
2499             SENTINEL_HELLO_CHANNEL,payload);
2500     if (retval != C_OK) return C_ERR;
2501     ri->link->pending_commands++;
2502     return C_OK;
2503 }
2504 
2505 /* Reset last_pub_time in all the instances in the specified dictionary
2506  * in order to force the delivery of an Hello update ASAP. */
sentinelForceHelloUpdateDictOfRedisInstances(dict * instances)2507 void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) {
2508     dictIterator *di;
2509     dictEntry *de;
2510 
2511     di = dictGetSafeIterator(instances);
2512     while((de = dictNext(di)) != NULL) {
2513         sentinelRedisInstance *ri = dictGetVal(de);
2514         if (ri->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
2515             ri->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
2516     }
2517     dictReleaseIterator(di);
2518 }
2519 
2520 /* This function forces the delivery of an "Hello" message (see
2521  * sentinelSendHello() top comment for further information) to all the Redis
2522  * and Sentinel instances related to the specified 'master'.
2523  *
2524  * It is technically not needed since we send an update to every instance
2525  * with a period of SENTINEL_PUBLISH_PERIOD milliseconds, however when a
2526  * Sentinel upgrades a configuration it is a good idea to deliever an update
2527  * to the other Sentinels ASAP. */
sentinelForceHelloUpdateForMaster(sentinelRedisInstance * master)2528 int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
2529     if (!(master->flags & SRI_MASTER)) return C_ERR;
2530     if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
2531         master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
2532     sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels);
2533     sentinelForceHelloUpdateDictOfRedisInstances(master->slaves);
2534     return C_OK;
2535 }
2536 
2537 /* Send a PING to the specified instance and refresh the act_ping_time
2538  * if it is zero (that is, if we received a pong for the previous ping).
2539  *
2540  * On error zero is returned, and we can't consider the PING command
2541  * queued in the connection. */
sentinelSendPing(sentinelRedisInstance * ri)2542 int sentinelSendPing(sentinelRedisInstance *ri) {
2543     int retval = redisAsyncCommand(ri->link->cc,
2544         sentinelPingReplyCallback, ri, "PING");
2545     if (retval == C_OK) {
2546         ri->link->pending_commands++;
2547         ri->link->last_ping_time = mstime();
2548         /* We update the active ping time only if we received the pong for
2549          * the previous ping, otherwise we are technically waiting since the
2550          * first ping that did not received a reply. */
2551         if (ri->link->act_ping_time == 0)
2552             ri->link->act_ping_time = ri->link->last_ping_time;
2553         return 1;
2554     } else {
2555         return 0;
2556     }
2557 }
2558 
2559 /* Send periodic PING, INFO, and PUBLISH to the Hello channel to
2560  * the specified master or slave instance. */
sentinelSendPeriodicCommands(sentinelRedisInstance * ri)2561 void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
2562     mstime_t now = mstime();
2563     mstime_t info_period, ping_period;
2564     int retval;
2565 
2566     /* Return ASAP if we have already a PING or INFO already pending, or
2567      * in the case the instance is not properly connected. */
2568     if (ri->link->disconnected) return;
2569 
2570     /* For INFO, PING, PUBLISH that are not critical commands to send we
2571      * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
2572      * want to use a lot of memory just because a link is not working
2573      * properly (note that anyway there is a redundant protection about this,
2574      * that is, the link will be disconnected and reconnected if a long
2575      * timeout condition is detected. */
2576     if (ri->link->pending_commands >=
2577         SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
2578 
2579     /* If this is a slave of a master in O_DOWN condition we start sending
2580      * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
2581      * period. In this state we want to closely monitor slaves in case they
2582      * are turned into masters by another Sentinel, or by the sysadmin.
2583      *
2584      * Similarly we monitor the INFO output more often if the slave reports
2585      * to be disconnected from the master, so that we can have a fresh
2586      * disconnection time figure. */
2587     if ((ri->flags & SRI_SLAVE) &&
2588         ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
2589          (ri->master_link_down_time != 0)))
2590     {
2591         info_period = 1000;
2592     } else {
2593         info_period = SENTINEL_INFO_PERIOD;
2594     }
2595 
2596     /* We ping instances every time the last received pong is older than
2597      * the configured 'down-after-milliseconds' time, but every second
2598      * anyway if 'down-after-milliseconds' is greater than 1 second. */
2599     ping_period = ri->down_after_period;
2600     if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
2601 
2602     if ((ri->flags & SRI_SENTINEL) == 0 &&
2603         (ri->info_refresh == 0 ||
2604         (now - ri->info_refresh) > info_period))
2605     {
2606         /* Send INFO to masters and slaves, not sentinels. */
2607         retval = redisAsyncCommand(ri->link->cc,
2608             sentinelInfoReplyCallback, ri, "INFO");
2609         if (retval == C_OK) ri->link->pending_commands++;
2610     } else if ((now - ri->link->last_pong_time) > ping_period &&
2611                (now - ri->link->last_ping_time) > ping_period/2) {
2612         /* Send PING to all the three kinds of instances. */
2613         sentinelSendPing(ri);
2614     } else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
2615         /* PUBLISH hello messages to all the three kinds of instances. */
2616         sentinelSendHello(ri);
2617     }
2618 }
2619 
2620 /* =========================== SENTINEL command ============================= */
2621 
sentinelFailoverStateStr(int state)2622 const char *sentinelFailoverStateStr(int state) {
2623     switch(state) {
2624     case SENTINEL_FAILOVER_STATE_NONE: return "none";
2625     case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
2626     case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
2627     case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
2628     case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
2629     case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
2630     case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
2631     default: return "unknown";
2632     }
2633 }
2634 
2635 /* Redis instance to Redis protocol representation. */
addReplySentinelRedisInstance(client * c,sentinelRedisInstance * ri)2636 void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) {
2637     char *flags = sdsempty();
2638     void *mbl;
2639     int fields = 0;
2640 
2641     mbl = addDeferredMultiBulkLength(c);
2642 
2643     addReplyBulkCString(c,"name");
2644     addReplyBulkCString(c,ri->name);
2645     fields++;
2646 
2647     addReplyBulkCString(c,"ip");
2648     addReplyBulkCString(c,ri->addr->ip);
2649     fields++;
2650 
2651     addReplyBulkCString(c,"port");
2652     addReplyBulkLongLong(c,ri->addr->port);
2653     fields++;
2654 
2655     addReplyBulkCString(c,"runid");
2656     addReplyBulkCString(c,ri->runid ? ri->runid : "");
2657     fields++;
2658 
2659     addReplyBulkCString(c,"flags");
2660     if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
2661     if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
2662     if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
2663     if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
2664     if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
2665     if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
2666     if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
2667     if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
2668         flags = sdscat(flags,"failover_in_progress,");
2669     if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
2670     if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
2671     if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
2672     if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
2673 
2674     if (sdslen(flags) != 0) sdsrange(flags,0,-2); /* remove last "," */
2675     addReplyBulkCString(c,flags);
2676     sdsfree(flags);
2677     fields++;
2678 
2679     addReplyBulkCString(c,"link-pending-commands");
2680     addReplyBulkLongLong(c,ri->link->pending_commands);
2681     fields++;
2682 
2683     addReplyBulkCString(c,"link-refcount");
2684     addReplyBulkLongLong(c,ri->link->refcount);
2685     fields++;
2686 
2687     if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2688         addReplyBulkCString(c,"failover-state");
2689         addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
2690         fields++;
2691     }
2692 
2693     addReplyBulkCString(c,"last-ping-sent");
2694     addReplyBulkLongLong(c,
2695         ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0);
2696     fields++;
2697 
2698     addReplyBulkCString(c,"last-ok-ping-reply");
2699     addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
2700     fields++;
2701 
2702     addReplyBulkCString(c,"last-ping-reply");
2703     addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
2704     fields++;
2705 
2706     if (ri->flags & SRI_S_DOWN) {
2707         addReplyBulkCString(c,"s-down-time");
2708         addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
2709         fields++;
2710     }
2711 
2712     if (ri->flags & SRI_O_DOWN) {
2713         addReplyBulkCString(c,"o-down-time");
2714         addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
2715         fields++;
2716     }
2717 
2718     addReplyBulkCString(c,"down-after-milliseconds");
2719     addReplyBulkLongLong(c,ri->down_after_period);
2720     fields++;
2721 
2722     /* Masters and Slaves */
2723     if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2724         addReplyBulkCString(c,"info-refresh");
2725         addReplyBulkLongLong(c,mstime() - ri->info_refresh);
2726         fields++;
2727 
2728         addReplyBulkCString(c,"role-reported");
2729         addReplyBulkCString(c, (ri->role_reported == SRI_MASTER) ? "master" :
2730                                                                    "slave");
2731         fields++;
2732 
2733         addReplyBulkCString(c,"role-reported-time");
2734         addReplyBulkLongLong(c,mstime() - ri->role_reported_time);
2735         fields++;
2736     }
2737 
2738     /* Only masters */
2739     if (ri->flags & SRI_MASTER) {
2740         addReplyBulkCString(c,"config-epoch");
2741         addReplyBulkLongLong(c,ri->config_epoch);
2742         fields++;
2743 
2744         addReplyBulkCString(c,"num-slaves");
2745         addReplyBulkLongLong(c,dictSize(ri->slaves));
2746         fields++;
2747 
2748         addReplyBulkCString(c,"num-other-sentinels");
2749         addReplyBulkLongLong(c,dictSize(ri->sentinels));
2750         fields++;
2751 
2752         addReplyBulkCString(c,"quorum");
2753         addReplyBulkLongLong(c,ri->quorum);
2754         fields++;
2755 
2756         addReplyBulkCString(c,"failover-timeout");
2757         addReplyBulkLongLong(c,ri->failover_timeout);
2758         fields++;
2759 
2760         addReplyBulkCString(c,"parallel-syncs");
2761         addReplyBulkLongLong(c,ri->parallel_syncs);
2762         fields++;
2763 
2764         if (ri->notification_script) {
2765             addReplyBulkCString(c,"notification-script");
2766             addReplyBulkCString(c,ri->notification_script);
2767             fields++;
2768         }
2769 
2770         if (ri->client_reconfig_script) {
2771             addReplyBulkCString(c,"client-reconfig-script");
2772             addReplyBulkCString(c,ri->client_reconfig_script);
2773             fields++;
2774         }
2775     }
2776 
2777     /* Only slaves */
2778     if (ri->flags & SRI_SLAVE) {
2779         addReplyBulkCString(c,"master-link-down-time");
2780         addReplyBulkLongLong(c,ri->master_link_down_time);
2781         fields++;
2782 
2783         addReplyBulkCString(c,"master-link-status");
2784         addReplyBulkCString(c,
2785             (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
2786             "ok" : "err");
2787         fields++;
2788 
2789         addReplyBulkCString(c,"master-host");
2790         addReplyBulkCString(c,
2791             ri->slave_master_host ? ri->slave_master_host : "?");
2792         fields++;
2793 
2794         addReplyBulkCString(c,"master-port");
2795         addReplyBulkLongLong(c,ri->slave_master_port);
2796         fields++;
2797 
2798         addReplyBulkCString(c,"slave-priority");
2799         addReplyBulkLongLong(c,ri->slave_priority);
2800         fields++;
2801 
2802         addReplyBulkCString(c,"slave-repl-offset");
2803         addReplyBulkLongLong(c,ri->slave_repl_offset);
2804         fields++;
2805     }
2806 
2807     /* Only sentinels */
2808     if (ri->flags & SRI_SENTINEL) {
2809         addReplyBulkCString(c,"last-hello-message");
2810         addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
2811         fields++;
2812 
2813         addReplyBulkCString(c,"voted-leader");
2814         addReplyBulkCString(c,ri->leader ? ri->leader : "?");
2815         fields++;
2816 
2817         addReplyBulkCString(c,"voted-leader-epoch");
2818         addReplyBulkLongLong(c,ri->leader_epoch);
2819         fields++;
2820     }
2821 
2822     setDeferredMultiBulkLength(c,mbl,fields*2);
2823 }
2824 
2825 /* Output a number of instances contained inside a dictionary as
2826  * Redis protocol. */
addReplyDictOfRedisInstances(client * c,dict * instances)2827 void addReplyDictOfRedisInstances(client *c, dict *instances) {
2828     dictIterator *di;
2829     dictEntry *de;
2830 
2831     di = dictGetIterator(instances);
2832     addReplyMultiBulkLen(c,dictSize(instances));
2833     while((de = dictNext(di)) != NULL) {
2834         sentinelRedisInstance *ri = dictGetVal(de);
2835 
2836         addReplySentinelRedisInstance(c,ri);
2837     }
2838     dictReleaseIterator(di);
2839 }
2840 
2841 /* Lookup the named master into sentinel.masters.
2842  * If the master is not found reply to the client with an error and returns
2843  * NULL. */
sentinelGetMasterByNameOrReplyError(client * c,robj * name)2844 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c,
2845                         robj *name)
2846 {
2847     sentinelRedisInstance *ri;
2848 
2849     ri = dictFetchValue(sentinel.masters,name->ptr);
2850     if (!ri) {
2851         addReplyError(c,"No such master with that name");
2852         return NULL;
2853     }
2854     return ri;
2855 }
2856 
2857 #define SENTINEL_ISQR_OK 0
2858 #define SENTINEL_ISQR_NOQUORUM (1<<0)
2859 #define SENTINEL_ISQR_NOAUTH (1<<1)
sentinelIsQuorumReachable(sentinelRedisInstance * master,int * usableptr)2860 int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
2861     dictIterator *di;
2862     dictEntry *de;
2863     int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */
2864     int result = SENTINEL_ISQR_OK;
2865     int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */
2866 
2867     di = dictGetIterator(master->sentinels);
2868     while((de = dictNext(di)) != NULL) {
2869         sentinelRedisInstance *ri = dictGetVal(de);
2870 
2871         if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
2872         usable++;
2873     }
2874     dictReleaseIterator(di);
2875 
2876     if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;
2877     if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
2878     if (usableptr) *usableptr = usable;
2879     return result;
2880 }
2881 
/* SENTINEL <subcommand> [<arg> ...]
 *
 * Dispatcher for the SENTINEL subcommands. The subcommand name is read from
 * argv[1] and matched case-insensitively. Most branches validate their own
 * argument count and jump to the 'numargserr' label when it does not match;
 * unknown subcommands get an "Unknown sentinel subcommand" error.
 *
 * Every code path queues exactly one reply for the client. */
void sentinelCommand(client *c) {
    if (!strcasecmp(c->argv[1]->ptr,"masters")) {
        /* SENTINEL MASTERS */
        if (c->argc != 2) goto numargserr;
        addReplyDictOfRedisInstances(c,sentinel.masters);
    } else if (!strcasecmp(c->argv[1]->ptr,"master")) {
        /* SENTINEL MASTER <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        addReplySentinelRedisInstance(c,ri);
    } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
        /* SENTINEL SLAVES <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->slaves);
    } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
        /* SENTINEL SENTINELS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->sentinels);
    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
         *
         * Arguments:
         *
         * ip and port are the ip and port of the master we want to be
         * checked by Sentinel. Note that the command will not check by
         * name but just by address: in theory different Sentinels may
         * monitor different masters with the same name.
         *
         * current-epoch is needed in order to understand if we are allowed
         * to vote for a failover leader or not. Each Sentinel can vote just
         * one time per epoch.
         *
         * runid is "*" if we are not seeking for a vote from the Sentinel
         * in order to elect the failover leader. Otherwise it is set to the
         * runid we want the Sentinel to vote for, if it did not already vote.
         */
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
            getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
                                                              != C_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;

        /* Vote for the master (or fetch the previous vote) if the request
         * includes a runid, otherwise the sender is not seeking for a vote. */
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }

        /* Reply with a three-elements multi-bulk reply:
         * down state, leader, vote epoch. */
        addReplyMultiBulkLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
        /* SENTINEL RESET <pattern> */
        if (c->argc != 3) goto numargserr;
        addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
    } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
        /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        ri = sentinelGetMasterByName(c->argv[2]->ptr);
        if (ri == NULL) {
            /* Unknown name: a null multi-bulk rather than an error. */
            addReply(c,shared.nullmultibulk);
        } else {
            sentinelAddr *addr = sentinelGetCurrentMasterAddress(ri);

            addReplyMultiBulkLen(c,2);
            addReplyBulkCString(c,addr->ip);
            addReplyBulkLongLong(c,addr->port);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
        /* SENTINEL FAILOVER <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
            addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
            return;
        }
        if (sentinelSelectSlave(ri) == NULL) {
            addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
            return;
        }
        serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
            ri->name);
        sentinelStartFailover(ri);
        ri->flags |= SRI_FORCE_FAILOVER;
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
        /* SENTINEL PENDING-SCRIPTS */

        if (c->argc != 2) goto numargserr;
        sentinelPendingScriptsCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"monitor")) {
        /* SENTINEL MONITOR <name> <ip> <port> <quorum> */
        sentinelRedisInstance *ri;
        long quorum, port;
        char ip[NET_IP_STR_LEN];

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum")
            != C_OK) return;
        if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port")
            != C_OK) return;

        if (quorum <= 0) {
            addReplyError(c, "Quorum must be 1 or greater.");
            return;
        }

        /* Make sure the IP field is actually a valid IP before passing it
         * to createSentinelRedisInstance(), otherwise we may trigger a
         * DNS lookup at runtime. */
        if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) {
            addReplyError(c,"Invalid IP address specified");
            return;
        }

        /* Parameters are valid. Try to create the master instance. */
        ri = createSentinelRedisInstance(c->argv[2]->ptr,SRI_MASTER,
                c->argv[3]->ptr,port,quorum,NULL);
        if (ri == NULL) {
            /* The failure reason is reported through errno. */
            switch(errno) {
            case EBUSY:
                addReplyError(c,"Duplicated master name");
                break;
            case EINVAL:
                addReplyError(c,"Invalid port number");
                break;
            default:
                addReplyError(c,"Unspecified error adding the instance");
                break;
            }
        } else {
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) {
        /* SENTINEL FLUSHCONFIG */
        if (c->argc != 2) goto numargserr;
        sentinelFlushConfig();
        addReply(c,shared.ok);
        return;
    } else if (!strcasecmp(c->argv[1]->ptr,"remove")) {
        /* SENTINEL REMOVE <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        /* Emit the event before deleting: it needs the instance. */
        sentinelEvent(LL_WARNING,"-monitor",ri,"%@");
        dictDelete(sentinel.masters,c->argv[2]->ptr);
        sentinelFlushConfig();
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) {
        /* SENTINEL CKQUORUM <name> */
        sentinelRedisInstance *ri;
        int usable;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        int result = sentinelIsQuorumReachable(ri,&usable);
        if (result == SENTINEL_ISQR_OK) {
            addReplySds(c, sdscatfmt(sdsempty(),
                "+OK %i usable Sentinels. Quorum and failover authorization "
                "can be reached\r\n",usable));
        } else {
            /* Build a single error line describing every failed check. */
            sds e = sdscatfmt(sdsempty(),
                "-NOQUORUM %i usable Sentinels. ",usable);
            if (result & SENTINEL_ISQR_NOQUORUM)
                e = sdscat(e,"Not enough available Sentinels to reach the"
                             " specified quorum for this master");
            if (result & SENTINEL_ISQR_NOAUTH) {
                if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". ");
                e = sdscat(e, "Not enough available Sentinels to reach the"
                              " majority and authorize a failover");
            }
            e = sdscat(e,"\r\n");
            addReplySds(c,e);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"set")) {
        /* SENTINEL SET <mastername> [<option> <value> ...]
         * The argument count must be odd so that every option has a value. */
        if (c->argc < 3 || c->argc % 2 == 0) goto numargserr;
        sentinelSetCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) {
        /* SENTINEL INFO-CACHE [<name> ...]
         * Without names, every monitored master is reported. */
        if (c->argc < 2) goto numargserr;
        mstime_t now = mstime();

        /* Create an ad-hoc dictionary type so that we can iterate
         * a dictionary composed of just the master groups the user
         * requested. The value destructor is cleared so that releasing
         * the temporary dictionary does not free the instances. */
        dictType copy_keeper = instancesDictType;
        copy_keeper.valDestructor = NULL;
        dict *masters_local = sentinel.masters;
        if (c->argc > 2) {
            masters_local = dictCreate(&copy_keeper, NULL);

            for (int i = 2; i < c->argc; i++) {
                sentinelRedisInstance *ri;
                ri = sentinelGetMasterByName(c->argv[i]->ptr);
                if (!ri) continue; /* ignore non-existing names */
                dictAdd(masters_local, ri->name, ri);
            }
        }

        /* Reply format:
         *   1.) master name
         *   2.) 1.) info from master
         *       2.) info from replica
         *       ...
         *   3.) other master name
         *   ...
         */
        addReplyMultiBulkLen(c,dictSize(masters_local) * 2);

        dictIterator  *di;
        dictEntry *de;
        di = dictGetIterator(masters_local);
        while ((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            addReplyBulkCBuffer(c,ri->name,strlen(ri->name));
            addReplyMultiBulkLen(c,dictSize(ri->slaves) + 1); /* +1 for self */
            /* Each entry is a pair: age of the cached info, cached info. */
            addReplyMultiBulkLen(c,2);
            addReplyLongLong(c, now - ri->info_refresh);
            if (ri->info)
                addReplyBulkCBuffer(c,ri->info,sdslen(ri->info));
            else
                addReply(c,shared.nullbulk);

            dictIterator *sdi;
            dictEntry *sde;
            sdi = dictGetIterator(ri->slaves);
            while ((sde = dictNext(sdi)) != NULL) {
                sentinelRedisInstance *sri = dictGetVal(sde);
                addReplyMultiBulkLen(c,2);
                addReplyLongLong(c, now - sri->info_refresh);
                if (sri->info)
                    addReplyBulkCBuffer(c,sri->info,sdslen(sri->info));
                else
                    addReply(c,shared.nullbulk);
            }
            dictReleaseIterator(sdi);
        }
        dictReleaseIterator(di);
        if (masters_local != sentinel.masters) dictRelease(masters_local);
    } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) {
        /* SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag>
         * The current flags are always cleared first, so calling the
         * command without flags disables every simulated failure. */
        int j;

        sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
        for (j = 2; j < c->argc; j++) {
            if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after being successfully elected as failover "
                    "leader");
            } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after promoting the selected slave to master");
            } else if (!strcasecmp(c->argv[j]->ptr,"help")) {
                addReplyMultiBulkLen(c,2);
                addReplyBulkCString(c,"crash-after-election");
                addReplyBulkCString(c,"crash-after-promotion");
                /* The help listing is the whole reply: stop here, otherwise
                 * a second reply (+OK) would be appended to it and the
                 * client would get two replies for one command. */
                return;
            } else {
                addReplyError(c,"Unknown failure simulation specified");
                return;
            }
        }
        addReply(c,shared.ok);
    } else {
        addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
                               (char*)c->argv[1]->ptr);
    }
    return;

numargserr:
    addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
                          (char*)c->argv[1]->ptr);
}
3199 
3200 #define info_section_from_redis(section_name) do { \
3201     if (defsections || allsections || !strcasecmp(section,section_name)) { \
3202         sds redissection; \
3203         if (sections++) info = sdscat(info,"\r\n"); \
3204         redissection = genRedisInfoString(section_name); \
3205         info = sdscatlen(info,redissection,sdslen(redissection)); \
3206         sdsfree(redissection); \
3207     } \
3208 } while(0)
3209 
3210 /* SENTINEL INFO [section] */
sentinelInfoCommand(client * c)3211 void sentinelInfoCommand(client *c) {
3212     if (c->argc > 2) {
3213         addReply(c,shared.syntaxerr);
3214         return;
3215     }
3216 
3217     int defsections = 0, allsections = 0;
3218     char *section = c->argc == 2 ? c->argv[1]->ptr : NULL;
3219     if (section) {
3220         allsections = !strcasecmp(section,"all");
3221         defsections = !strcasecmp(section,"default");
3222     } else {
3223         defsections = 1;
3224     }
3225 
3226     int sections = 0;
3227     sds info = sdsempty();
3228 
3229     info_section_from_redis("server");
3230     info_section_from_redis("clients");
3231     info_section_from_redis("cpu");
3232     info_section_from_redis("stats");
3233 
3234     if (defsections || allsections || !strcasecmp(section,"sentinel")) {
3235         dictIterator *di;
3236         dictEntry *de;
3237         int master_id = 0;
3238 
3239         if (sections++) info = sdscat(info,"\r\n");
3240         info = sdscatprintf(info,
3241             "# Sentinel\r\n"
3242             "sentinel_masters:%lu\r\n"
3243             "sentinel_tilt:%d\r\n"
3244             "sentinel_running_scripts:%d\r\n"
3245             "sentinel_scripts_queue_length:%ld\r\n"
3246             "sentinel_simulate_failure_flags:%lu\r\n",
3247             dictSize(sentinel.masters),
3248             sentinel.tilt,
3249             sentinel.running_scripts,
3250             listLength(sentinel.scripts_queue),
3251             sentinel.simfailure_flags);
3252 
3253         di = dictGetIterator(sentinel.masters);
3254         while((de = dictNext(di)) != NULL) {
3255             sentinelRedisInstance *ri = dictGetVal(de);
3256             char *status = "ok";
3257 
3258             if (ri->flags & SRI_O_DOWN) status = "odown";
3259             else if (ri->flags & SRI_S_DOWN) status = "sdown";
3260             info = sdscatprintf(info,
3261                 "master%d:name=%s,status=%s,address=%s:%d,"
3262                 "slaves=%lu,sentinels=%lu\r\n",
3263                 master_id++, ri->name, status,
3264                 ri->addr->ip, ri->addr->port,
3265                 dictSize(ri->slaves),
3266                 dictSize(ri->sentinels)+1);
3267         }
3268         dictReleaseIterator(di);
3269     }
3270 
3271     addReplyBulkSds(c, info);
3272 }
3273 
3274 /* Implements Sentinel verison of the ROLE command. The output is
3275  * "sentinel" and the list of currently monitored master names. */
sentinelRoleCommand(client * c)3276 void sentinelRoleCommand(client *c) {
3277     dictIterator *di;
3278     dictEntry *de;
3279 
3280     addReplyMultiBulkLen(c,2);
3281     addReplyBulkCBuffer(c,"sentinel",8);
3282     addReplyMultiBulkLen(c,dictSize(sentinel.masters));
3283 
3284     di = dictGetIterator(sentinel.masters);
3285     while((de = dictNext(di)) != NULL) {
3286         sentinelRedisInstance *ri = dictGetVal(de);
3287 
3288         addReplyBulkCString(c,ri->name);
3289     }
3290     dictReleaseIterator(di);
3291 }
3292 
3293 /* SENTINEL SET <mastername> [<option> <value> ...] */
sentinelSetCommand(client * c)3294 void sentinelSetCommand(client *c) {
3295     sentinelRedisInstance *ri;
3296     int j, changes = 0;
3297     char *option, *value;
3298 
3299     if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
3300         == NULL) return;
3301 
3302     /* Process option - value pairs. */
3303     for (j = 3; j < c->argc; j += 2) {
3304         option = c->argv[j]->ptr;
3305         value = c->argv[j+1]->ptr;
3306         robj *o = c->argv[j+1];
3307         long long ll;
3308 
3309         if (!strcasecmp(option,"down-after-milliseconds")) {
3310             /* down-after-millisecodns <milliseconds> */
3311             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
3312                 goto badfmt;
3313             ri->down_after_period = ll;
3314             sentinelPropagateDownAfterPeriod(ri);
3315             changes++;
3316         } else if (!strcasecmp(option,"failover-timeout")) {
3317             /* failover-timeout <milliseconds> */
3318             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
3319                 goto badfmt;
3320             ri->failover_timeout = ll;
3321             changes++;
3322        } else if (!strcasecmp(option,"parallel-syncs")) {
3323             /* parallel-syncs <milliseconds> */
3324             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
3325                 goto badfmt;
3326             ri->parallel_syncs = ll;
3327             changes++;
3328        } else if (!strcasecmp(option,"notification-script")) {
3329             /* notification-script <path> */
3330             if (strlen(value) && access(value,X_OK) == -1) {
3331                 addReplyError(c,
3332                     "Notification script seems non existing or non executable");
3333                 if (changes) sentinelFlushConfig();
3334                 return;
3335             }
3336             sdsfree(ri->notification_script);
3337             ri->notification_script = strlen(value) ? sdsnew(value) : NULL;
3338             changes++;
3339        } else if (!strcasecmp(option,"client-reconfig-script")) {
3340             /* client-reconfig-script <path> */
3341             if (strlen(value) && access(value,X_OK) == -1) {
3342                 addReplyError(c,
3343                     "Client reconfiguration script seems non existing or "
3344                     "non executable");
3345                 if (changes) sentinelFlushConfig();
3346                 return;
3347             }
3348             sdsfree(ri->client_reconfig_script);
3349             ri->client_reconfig_script = strlen(value) ? sdsnew(value) : NULL;
3350             changes++;
3351        } else if (!strcasecmp(option,"auth-pass")) {
3352             /* auth-pass <password> */
3353             sdsfree(ri->auth_pass);
3354             ri->auth_pass = strlen(value) ? sdsnew(value) : NULL;
3355             changes++;
3356        } else if (!strcasecmp(option,"quorum")) {
3357             /* quorum <count> */
3358             if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0)
3359                 goto badfmt;
3360             ri->quorum = ll;
3361             changes++;
3362         } else {
3363             addReplyErrorFormat(c,"Unknown option '%s' for SENTINEL SET",
3364                 option);
3365             if (changes) sentinelFlushConfig();
3366             return;
3367         }
3368         sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",option,value);
3369     }
3370 
3371     if (changes) sentinelFlushConfig();
3372     addReply(c,shared.ok);
3373     return;
3374 
3375 badfmt: /* Bad format errors */
3376     if (changes) sentinelFlushConfig();
3377     addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL SET '%s'",
3378             value, option);
3379 }
3380 
3381 /* Our fake PUBLISH command: it is actually useful only to receive hello messages
3382  * from the other sentinel instances, and publishing to a channel other than
3383  * SENTINEL_HELLO_CHANNEL is forbidden.
3384  *
3385  * Because we have a Sentinel PUBLISH, the code to send hello messages is the same
3386  * for all the three kind of instances: masters, slaves, sentinels. */
sentinelPublishCommand(client * c)3387 void sentinelPublishCommand(client *c) {
3388     if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
3389         addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
3390         return;
3391     }
3392     sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
3393     addReplyLongLong(c,1);
3394 }
3395 
3396 /* ===================== SENTINEL availability checks ======================= */
3397 
3398 /* Is this instance down from our point of view? */
sentinelCheckSubjectivelyDown(sentinelRedisInstance * ri)3399 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
3400     mstime_t elapsed = 0;
3401 
3402     if (ri->link->act_ping_time)
3403         elapsed = mstime() - ri->link->act_ping_time;
3404     else if (ri->link->disconnected)
3405         elapsed = mstime() - ri->link->last_avail_time;
3406 
3407     /* Check if we are in need for a reconnection of one of the
3408      * links, because we are detecting low activity.
3409      *
3410      * 1) Check if the command link seems connected, was connected not less
3411      *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
3412      *    pending ping for more than half the timeout. */
3413     if (ri->link->cc &&
3414         (mstime() - ri->link->cc_conn_time) >
3415         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
3416         ri->link->act_ping_time != 0 && /* Ther is a pending ping... */
3417         /* The pending ping is delayed, and we did not received
3418          * error replies as well. */
3419         (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
3420         (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
3421     {
3422         instanceLinkCloseConnection(ri->link,ri->link->cc);
3423     }
3424 
3425     /* 2) Check if the pubsub link seems connected, was connected not less
3426      *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
3427      *    activity in the Pub/Sub channel for more than
3428      *    SENTINEL_PUBLISH_PERIOD * 3.
3429      */
3430     if (ri->link->pc &&
3431         (mstime() - ri->link->pc_conn_time) >
3432          SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
3433         (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
3434     {
3435         instanceLinkCloseConnection(ri->link,ri->link->pc);
3436     }
3437 
3438     /* Update the SDOWN flag. We believe the instance is SDOWN if:
3439      *
3440      * 1) It is not replying.
3441      * 2) We believe it is a master, it reports to be a slave for enough time
3442      *    to meet the down_after_period, plus enough time to get two times
3443      *    INFO report from the instance. */
3444     if (elapsed > ri->down_after_period ||
3445         (ri->flags & SRI_MASTER &&
3446          ri->role_reported == SRI_SLAVE &&
3447          mstime() - ri->role_reported_time >
3448           (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
3449     {
3450         /* Is subjectively down */
3451         if ((ri->flags & SRI_S_DOWN) == 0) {
3452             sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
3453             ri->s_down_since_time = mstime();
3454             ri->flags |= SRI_S_DOWN;
3455         }
3456     } else {
3457         /* Is subjectively up */
3458         if (ri->flags & SRI_S_DOWN) {
3459             sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
3460             ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
3461         }
3462     }
3463 }
3464 
3465 /* Is this instance down according to the configured quorum?
3466  *
3467  * Note that ODOWN is a weak quorum, it only means that enough Sentinels
3468  * reported in a given time range that the instance was not reachable.
3469  * However messages can be delayed so there are no strong guarantees about
3470  * N instances agreeing at the same time about the down state. */
sentinelCheckObjectivelyDown(sentinelRedisInstance * master)3471 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
3472     dictIterator *di;
3473     dictEntry *de;
3474     unsigned int quorum = 0, odown = 0;
3475 
3476     if (master->flags & SRI_S_DOWN) {
3477         /* Is down for enough sentinels? */
3478         quorum = 1; /* the current sentinel. */
3479         /* Count all the other sentinels. */
3480         di = dictGetIterator(master->sentinels);
3481         while((de = dictNext(di)) != NULL) {
3482             sentinelRedisInstance *ri = dictGetVal(de);
3483 
3484             if (ri->flags & SRI_MASTER_DOWN) quorum++;
3485         }
3486         dictReleaseIterator(di);
3487         if (quorum >= master->quorum) odown = 1;
3488     }
3489 
3490     /* Set the flag accordingly to the outcome. */
3491     if (odown) {
3492         if ((master->flags & SRI_O_DOWN) == 0) {
3493             sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
3494                 quorum, master->quorum);
3495             master->flags |= SRI_O_DOWN;
3496             master->o_down_since_time = mstime();
3497         }
3498     } else {
3499         if (master->flags & SRI_O_DOWN) {
3500             sentinelEvent(LL_WARNING,"-odown",master,"%@");
3501             master->flags &= ~SRI_O_DOWN;
3502         }
3503     }
3504 }
3505 
3506 /* Receive the SENTINEL is-master-down-by-addr reply, see the
3507  * sentinelAskMasterStateToOtherSentinels() function for more information. */
sentinelReceiveIsMasterDownReply(redisAsyncContext * c,void * reply,void * privdata)3508 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
3509     sentinelRedisInstance *ri = privdata;
3510     instanceLink *link = c->data;
3511     redisReply *r;
3512 
3513     if (!reply || !link) return;
3514     link->pending_commands--;
3515     r = reply;
3516 
3517     /* Ignore every error or unexpected reply.
3518      * Note that if the command returns an error for any reason we'll
3519      * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
3520     if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
3521         r->element[0]->type == REDIS_REPLY_INTEGER &&
3522         r->element[1]->type == REDIS_REPLY_STRING &&
3523         r->element[2]->type == REDIS_REPLY_INTEGER)
3524     {
3525         ri->last_master_down_reply_time = mstime();
3526         if (r->element[0]->integer == 1) {
3527             ri->flags |= SRI_MASTER_DOWN;
3528         } else {
3529             ri->flags &= ~SRI_MASTER_DOWN;
3530         }
3531         if (strcmp(r->element[1]->str,"*")) {
3532             /* If the runid in the reply is not "*" the Sentinel actually
3533              * replied with a vote. */
3534             sdsfree(ri->leader);
3535             if ((long long)ri->leader_epoch != r->element[2]->integer)
3536                 serverLog(LL_WARNING,
3537                     "%s voted for %s %llu", ri->name,
3538                     r->element[1]->str,
3539                     (unsigned long long) r->element[2]->integer);
3540             ri->leader = sdsnew(r->element[1]->str);
3541             ri->leader_epoch = r->element[2]->integer;
3542         }
3543     }
3544 }
3545 
3546 /* If we think the master is down, we start sending
3547  * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
3548  * in order to get the replies that allow to reach the quorum
3549  * needed to mark the master in ODOWN state and trigger a failover. */
3550 #define SENTINEL_ASK_FORCED (1<<0)
sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance * master,int flags)3551 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
3552     dictIterator *di;
3553     dictEntry *de;
3554 
3555     di = dictGetIterator(master->sentinels);
3556     while((de = dictNext(di)) != NULL) {
3557         sentinelRedisInstance *ri = dictGetVal(de);
3558         mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
3559         char port[32];
3560         int retval;
3561 
3562         /* If the master state from other sentinel is too old, we clear it. */
3563         if (elapsed > SENTINEL_ASK_PERIOD*5) {
3564             ri->flags &= ~SRI_MASTER_DOWN;
3565             sdsfree(ri->leader);
3566             ri->leader = NULL;
3567         }
3568 
3569         /* Only ask if master is down to other sentinels if:
3570          *
3571          * 1) We believe it is down, or there is a failover in progress.
3572          * 2) Sentinel is connected.
3573          * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
3574         if ((master->flags & SRI_S_DOWN) == 0) continue;
3575         if (ri->link->disconnected) continue;
3576         if (!(flags & SENTINEL_ASK_FORCED) &&
3577             mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
3578             continue;
3579 
3580         /* Ask */
3581         ll2string(port,sizeof(port),master->addr->port);
3582         retval = redisAsyncCommand(ri->link->cc,
3583                     sentinelReceiveIsMasterDownReply, ri,
3584                     "SENTINEL is-master-down-by-addr %s %s %llu %s",
3585                     master->addr->ip, port,
3586                     sentinel.current_epoch,
3587                     (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
3588                     sentinel.myid : "*");
3589         if (retval == C_OK) ri->link->pending_commands++;
3590     }
3591     dictReleaseIterator(di);
3592 }
3593 
3594 /* =============================== FAILOVER ================================= */
3595 
3596 /* Crash because of user request via SENTINEL simulate-failure command. */
sentinelSimFailureCrash(void)3597 void sentinelSimFailureCrash(void) {
3598     serverLog(LL_WARNING,
3599         "Sentinel CRASH because of SENTINEL simulate-failure");
3600     exit(99);
3601 }
3602 
3603 /* Vote for the sentinel with 'req_runid' or return the old vote if already
3604  * voted for the specifed 'req_epoch' or one greater.
3605  *
3606  * If a vote is not available returns NULL, otherwise return the Sentinel
3607  * runid and populate the leader_epoch with the epoch of the vote. */
sentinelVoteLeader(sentinelRedisInstance * master,uint64_t req_epoch,char * req_runid,uint64_t * leader_epoch)3608 char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
3609     if (req_epoch > sentinel.current_epoch) {
3610         sentinel.current_epoch = req_epoch;
3611         sentinelFlushConfig();
3612         sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
3613             (unsigned long long) sentinel.current_epoch);
3614     }
3615 
3616     if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
3617     {
3618         sdsfree(master->leader);
3619         master->leader = sdsnew(req_runid);
3620         master->leader_epoch = sentinel.current_epoch;
3621         sentinelFlushConfig();
3622         sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
3623             master->leader, (unsigned long long) master->leader_epoch);
3624         /* If we did not voted for ourselves, set the master failover start
3625          * time to now, in order to force a delay before we can start a
3626          * failover for the same master. */
3627         if (strcasecmp(master->leader,sentinel.myid))
3628             master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
3629     }
3630 
3631     *leader_epoch = master->leader_epoch;
3632     return master->leader ? sdsnew(master->leader) : NULL;
3633 }
3634 
/* Association between a runid and a number of votes.
 * NOTE(review): presumably one entry per candidate leader; the election
 * code nearby keeps its counters in a dict instead, so confirm whether this
 * structure is still referenced anywhere. */
struct sentinelLeader {
    char *runid;            /* Run ID the votes refer to. */
    unsigned long votes;    /* Number of votes counted for 'runid'. */
};
3639 
3640 /* Helper function for sentinelGetLeader, increment the counter
3641  * relative to the specified runid. */
sentinelLeaderIncr(dict * counters,char * runid)3642 int sentinelLeaderIncr(dict *counters, char *runid) {
3643     dictEntry *de = dictFind(counters,runid);
3644     uint64_t oldval;
3645 
3646     if (de) {
3647         oldval = dictGetUnsignedIntegerVal(de);
3648         dictSetUnsignedIntegerVal(de,oldval+1);
3649         return oldval+1;
3650     } else {
3651         de = dictAddRaw(counters,runid);
3652         serverAssert(de != NULL);
3653         dictSetUnsignedIntegerVal(de,1);
3654         return 1;
3655     }
3656 }
3657 
3658 /* Scan all the Sentinels attached to this master to check if there
3659  * is a leader for the specified epoch.
3660  *
3661  * To be a leader for a given epoch, we should have the majority of
3662  * the Sentinels we know (ever seen since the last SENTINEL RESET) that
3663  * reported the same instance as leader for the same epoch. */
sentinelGetLeader(sentinelRedisInstance * master,uint64_t epoch)3664 char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
3665     dict *counters;
3666     dictIterator *di;
3667     dictEntry *de;
3668     unsigned int voters = 0, voters_quorum;
3669     char *myvote;
3670     char *winner = NULL;
3671     uint64_t leader_epoch;
3672     uint64_t max_votes = 0;
3673 
3674     serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
3675     counters = dictCreate(&leaderVotesDictType,NULL);
3676 
3677     voters = dictSize(master->sentinels)+1; /* All the other sentinels and me. */
3678 
3679     /* Count other sentinels votes */
3680     di = dictGetIterator(master->sentinels);
3681     while((de = dictNext(di)) != NULL) {
3682         sentinelRedisInstance *ri = dictGetVal(de);
3683         if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
3684             sentinelLeaderIncr(counters,ri->leader);
3685     }
3686     dictReleaseIterator(di);
3687 
3688     /* Check what's the winner. For the winner to win, it needs two conditions:
3689      * 1) Absolute majority between voters (50% + 1).
3690      * 2) And anyway at least master->quorum votes. */
3691     di = dictGetIterator(counters);
3692     while((de = dictNext(di)) != NULL) {
3693         uint64_t votes = dictGetUnsignedIntegerVal(de);
3694 
3695         if (votes > max_votes) {
3696             max_votes = votes;
3697             winner = dictGetKey(de);
3698         }
3699     }
3700     dictReleaseIterator(di);
3701 
3702     /* Count this Sentinel vote:
3703      * if this Sentinel did not voted yet, either vote for the most
3704      * common voted sentinel, or for itself if no vote exists at all. */
3705     if (winner)
3706         myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
3707     else
3708         myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
3709 
3710     if (myvote && leader_epoch == epoch) {
3711         uint64_t votes = sentinelLeaderIncr(counters,myvote);
3712 
3713         if (votes > max_votes) {
3714             max_votes = votes;
3715             winner = myvote;
3716         }
3717     }
3718 
3719     voters_quorum = voters/2+1;
3720     if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
3721         winner = NULL;
3722 
3723     winner = winner ? sdsnew(winner) : NULL;
3724     sdsfree(myvote);
3725     dictRelease(counters);
3726     return winner;
3727 }
3728 
3729 /* Send SLAVEOF to the specified instance, always followed by a
3730  * CONFIG REWRITE command in order to store the new configuration on disk
3731  * when possible (that is, if the Redis instance is recent enough to support
3732  * config rewriting, and if the server was started with a configuration file).
3733  *
3734  * If Host is NULL the function sends "SLAVEOF NO ONE".
3735  *
3736  * The command returns C_OK if the SLAVEOF command was accepted for
3737  * (later) delivery otherwise C_ERR. The command replies are just
3738  * discarded. */
sentinelSendSlaveOf(sentinelRedisInstance * ri,char * host,int port)3739 int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
3740     char portstr[32];
3741     int retval;
3742 
3743     ll2string(portstr,sizeof(portstr),port);
3744 
3745     /* If host is NULL we send SLAVEOF NO ONE that will turn the instance
3746      * into a master. */
3747     if (host == NULL) {
3748         host = "NO";
3749         memcpy(portstr,"ONE",4);
3750     }
3751 
3752     /* In order to send SLAVEOF in a safe way, we send a transaction performing
3753      * the following tasks:
3754      * 1) Reconfigure the instance according to the specified host/port params.
3755      * 2) Rewrite the configuraiton.
3756      * 3) Disconnect all clients (but this one sending the commnad) in order
3757      *    to trigger the ask-master-on-reconnection protocol for connected
3758      *    clients.
3759      *
3760      * Note that we don't check the replies returned by commands, since we
3761      * will observe instead the effects in the next INFO output. */
3762     retval = redisAsyncCommand(ri->link->cc,
3763         sentinelDiscardReplyCallback, ri, "MULTI");
3764     if (retval == C_ERR) return retval;
3765     ri->link->pending_commands++;
3766 
3767     retval = redisAsyncCommand(ri->link->cc,
3768         sentinelDiscardReplyCallback, ri, "SLAVEOF %s %s", host, portstr);
3769     if (retval == C_ERR) return retval;
3770     ri->link->pending_commands++;
3771 
3772     retval = redisAsyncCommand(ri->link->cc,
3773         sentinelDiscardReplyCallback, ri, "CONFIG REWRITE");
3774     if (retval == C_ERR) return retval;
3775     ri->link->pending_commands++;
3776 
3777     /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
3778      * however sending it to an instance not understanding this command is not
3779      * an issue because CLIENT is variadic command, so Redis will not
3780      * recognized as a syntax error, and the transaction will not fail (but
3781      * only the unsupported command will fail). */
3782     retval = redisAsyncCommand(ri->link->cc,
3783         sentinelDiscardReplyCallback, ri, "CLIENT KILL TYPE normal");
3784     if (retval == C_ERR) return retval;
3785     ri->link->pending_commands++;
3786 
3787     retval = redisAsyncCommand(ri->link->cc,
3788         sentinelDiscardReplyCallback, ri, "EXEC");
3789     if (retval == C_ERR) return retval;
3790     ri->link->pending_commands++;
3791 
3792     return C_OK;
3793 }
3794 
3795 /* Setup the master state to start a failover. */
sentinelStartFailover(sentinelRedisInstance * master)3796 void sentinelStartFailover(sentinelRedisInstance *master) {
3797     serverAssert(master->flags & SRI_MASTER);
3798 
3799     master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
3800     master->flags |= SRI_FAILOVER_IN_PROGRESS;
3801     master->failover_epoch = ++sentinel.current_epoch;
3802     sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
3803         (unsigned long long) sentinel.current_epoch);
3804     sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
3805     master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
3806     master->failover_state_change_time = mstime();
3807 }
3808 
3809 /* This function checks if there are the conditions to start the failover,
3810  * that is:
3811  *
3812  * 1) Master must be in ODOWN condition.
3813  * 2) No failover already in progress.
3814  * 3) No failover already attempted recently.
3815  *
3816  * We still don't know if we'll win the election so it is possible that we
3817  * start the failover but that we'll not be able to act.
3818  *
3819  * Return non-zero if a failover was started. */
sentinelStartFailoverIfNeeded(sentinelRedisInstance * master)3820 int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
3821     /* We can't failover if the master is not in O_DOWN state. */
3822     if (!(master->flags & SRI_O_DOWN)) return 0;
3823 
3824     /* Failover already in progress? */
3825     if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
3826 
3827     /* Last failover attempt started too little time ago? */
3828     if (mstime() - master->failover_start_time <
3829         master->failover_timeout*2)
3830     {
3831         if (master->failover_delay_logged != master->failover_start_time) {
3832             time_t clock = (master->failover_start_time +
3833                             master->failover_timeout*2) / 1000;
3834             char ctimebuf[26];
3835 
3836             ctime_r(&clock,ctimebuf);
3837             ctimebuf[24] = '\0'; /* Remove newline. */
3838             master->failover_delay_logged = master->failover_start_time;
3839             serverLog(LL_WARNING,
3840                 "Next failover delay: I will not start a failover before %s",
3841                 ctimebuf);
3842         }
3843         return 0;
3844     }
3845 
3846     sentinelStartFailover(master);
3847     return 1;
3848 }
3849 
3850 /* Select a suitable slave to promote. The current algorithm only uses
3851  * the following parameters:
3852  *
3853  * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
3854  * 2) Last time the slave replied to ping no more than 5 times the PING period.
3855  * 3) info_refresh not older than 3 times the INFO refresh period.
3856  * 4) master_link_down_time no more than:
3857  *     (now - master->s_down_since_time) + (master->down_after_period * 10).
3858  *    Basically since the master is down from our POV, the slave reports
3859  *    to be disconnected no more than 10 times the configured down-after-period.
3860  *    This is pretty much black magic but the idea is, the master was not
3861  *    available so the slave may be lagging, but not over a certain time.
3862  *    Anyway we'll select the best slave according to replication offset.
3863  * 5) Slave priority can't be zero, otherwise the slave is discarded.
3864  *
3865  * Among all the slaves matching the above conditions we select the slave
3866  * with, in order of sorting key:
3867  *
3868  * - lower slave_priority.
3869  * - bigger processed replication offset.
3870  * - lexicographically smaller runid.
3871  *
 * Basically if the priority is the same, the slave that processed more
 * commands from the master is selected.
3874  *
3875  * The function returns the pointer to the selected slave, otherwise
3876  * NULL if no suitable slave was found.
3877  */
3878 
3879 /* Helper for sentinelSelectSlave(). This is used by qsort() in order to
3880  * sort suitable slaves in a "better first" order, to take the first of
3881  * the list. */
compareSlavesForPromotion(const void * a,const void * b)3882 int compareSlavesForPromotion(const void *a, const void *b) {
3883     sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
3884                           **sb = (sentinelRedisInstance **)b;
3885     char *sa_runid, *sb_runid;
3886 
3887     if ((*sa)->slave_priority != (*sb)->slave_priority)
3888         return (*sa)->slave_priority - (*sb)->slave_priority;
3889 
3890     /* If priority is the same, select the slave with greater replication
3891      * offset (processed more data from the master). */
3892     if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
3893         return -1; /* a < b */
3894     } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
3895         return 1; /* a > b */
3896     }
3897 
3898     /* If the replication offset is the same select the slave with that has
3899      * the lexicographically smaller runid. Note that we try to handle runid
3900      * == NULL as there are old Redis versions that don't publish runid in
3901      * INFO. A NULL runid is considered bigger than any other runid. */
3902     sa_runid = (*sa)->runid;
3903     sb_runid = (*sb)->runid;
3904     if (sa_runid == NULL && sb_runid == NULL) return 0;
3905     else if (sa_runid == NULL) return 1;  /* a > b */
3906     else if (sb_runid == NULL) return -1; /* a < b */
3907     return strcasecmp(sa_runid, sb_runid);
3908 }
3909 
/* Select the slave of 'master' to promote, or return NULL if no slave is
 * suitable.
 *
 * A slave is a candidate only if it is not flagged SDOWN/ODOWN, its link
 * is connected and was available within the last five PING periods, its
 * priority is not zero, its INFO output is recent enough, and the time it
 * reports to be disconnected from the master is not greater than the time
 * the master has been in SDOWN state plus ten times the master
 * down-after-period.
 *
 * Candidates are sorted with compareSlavesForPromotion() and the first one
 * is returned. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
    sentinelRedisInstance *best = NULL;
    int count = 0;
    dictIterator *it;
    dictEntry *entry;
    mstime_t max_master_down_time = master->down_after_period * 10;
    mstime_t info_validity_time;

    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;

    /* If the master is in SDOWN state we get INFO for slaves every second.
     * Otherwise we get it with the usual period so we need to account for
     * a larger delay. */
    info_validity_time = (master->flags & SRI_S_DOWN) ?
                         SENTINEL_PING_PERIOD*5 : SENTINEL_INFO_PERIOD*3;

    it = dictGetIterator(master->slaves);
    while ((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(entry);

        if (!(slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) &&
            !slave->link->disconnected &&
            mstime() - slave->link->last_avail_time <=
                SENTINEL_PING_PERIOD*5 &&
            slave->slave_priority != 0 &&
            mstime() - slave->info_refresh <= info_validity_time &&
            slave->master_link_down_time <= max_master_down_time)
        {
            candidates[count++] = slave;
        }
    }
    dictReleaseIterator(it);

    /* Best candidate first. */
    if (count > 0) {
        qsort(candidates,count,sizeof(candidates[0]),
            compareSlavesForPromotion);
        best = candidates[0];
    }
    zfree(candidates);
    return best;
}
3953 
3954 /* ---------------- Failover state machine implementation ------------------- */
/* Handler of the SENTINEL_FAILOVER_STATE_WAIT_START state.
 *
 * The failover moves to the SELECT_SLAVE state only if this Sentinel is the
 * leader for the failover epoch, or if the failover was forced (the
 * SRI_FORCE_FAILOVER flag is set). Otherwise we keep waiting, and the
 * failover is aborted once the election timeout elapsed since the failover
 * start time. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int elected;

    /* Are we the leader for the failover epoch? */
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    elected = leader && strcasecmp(leader,sentinel.myid) == 0;
    sdsfree(leader);

    if (elected || (ri->flags & SRI_FORCE_FAILOVER)) {
        sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
        if (sentinel.simfailure_flags &
            SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        {
            sentinelSimFailureCrash();
        }
        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
        return;
    }

    /* Not the leader and not a forced failover: we can't continue. Abort
     * if too much time elapsed. The election timeout is the MIN between
     * SENTINEL_ELECTION_TIMEOUT and the configured failover timeout. */
    {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        if (ri->failover_timeout < election_timeout)
            election_timeout = ri->failover_timeout;
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
    }
}
3987 
/* Handler of the SENTINEL_FAILOVER_STATE_SELECT_SLAVE state: pick the
 * slave to promote. There is no timeout to handle here since the function
 * either aborts the failover (no suitable slave) or moves it to the
 * SEND_SLAVEOF_NOONE state. */
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *chosen = sentinelSelectSlave(ri);

    if (chosen == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
        return;
    }

    sentinelEvent(LL_WARNING,"+selected-slave",chosen,"%@");
    chosen->flags |= SRI_PROMOTED;
    ri->promoted_slave = chosen;
    ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
        chosen, "%@");
}
4006 
/* Handler of the SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE state: send
 * SLAVEOF NO ONE to the promoted slave, then move to the WAIT_PROMOTION
 * state. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    sentinelRedisInstance *promoted = ri->promoted_slave;

    /* The command can't be sent while the promoted slave is disconnected:
     * stay in this state and retry at the next call, aborting the failover
     * once the failover timeout is reached. */
    if (promoted->link->disconnected) {
        mstime_t waited = mstime() - ri->failover_state_change_time;

        if (waited > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* Turn the slave into a master. The reply is not checked: whether it
     * worked is detected indirectly, observing if INFO returns a different
     * role (master instead of slave). If the command can't be queued we
     * just retry later. */
    if (sentinelSendSlaveOf(promoted,NULL,0) != C_OK) return;

    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}
4032 
4033 /* We actually wait for promotion indirectly checking with INFO when the
4034  * slave turns into a master. */
sentinelFailoverWaitPromotion(sentinelRedisInstance * ri)4035 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
4036     /* Just handle the timeout. Switching to the next state is handled
4037      * by the function parsing the INFO command of the promoted slave. */
4038     if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
4039         sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
4040         sentinelAbortFailover(ri);
4041     }
4042 }
4043 
/* Check if the failover of 'master' can be considered finished, and if so
 * move it to the SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state.
 *
 * The failover ends when every reachable slave (not in SDOWN state), but
 * the promoted one, is flagged as reconfigured, or when the failover
 * timeout elapsed since the last state change. In the latter case a best
 * effort SLAVEOF is also sent to the slaves still to be reconfigured.
 *
 * Nothing is done if there is no promoted slave or if it is in SDOWN
 * state. */
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider failover finished if the promoted slave is
     * not reachable. */
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    /* The failover terminates once all the reachable slaves are properly
     * configured. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    /* Force end of failover on timeout. */
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    /* If I'm the leader it is a good idea to send a best effort SLAVEOF
     * command to all the slaves still not reconfigured to replicate with
     * the new master. */
    if (timeout) {
        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;

            /* Skip the promoted slave as well: it is the new master, and
             * must never be told to replicate from its own address. */
            if (slave->flags &
                (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            if (slave->link->disconnected) continue;

            retval = sentinelSendSlaveOf(slave,
                    master->promoted_slave->addr->ip,
                    master->promoted_slave->addr->port);
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}
4106 
4107 /* Send SLAVE OF <new master address> to all the remaining slaves that
4108  * still don't appear to have the configuration updated. */
sentinelFailoverReconfNextSlave(sentinelRedisInstance * master)4109 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
4110     dictIterator *di;
4111     dictEntry *de;
4112     int in_progress = 0;
4113 
4114     di = dictGetIterator(master->slaves);
4115     while((de = dictNext(di)) != NULL) {
4116         sentinelRedisInstance *slave = dictGetVal(de);
4117 
4118         if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
4119             in_progress++;
4120     }
4121     dictReleaseIterator(di);
4122 
4123     di = dictGetIterator(master->slaves);
4124     while(in_progress < master->parallel_syncs &&
4125           (de = dictNext(di)) != NULL)
4126     {
4127         sentinelRedisInstance *slave = dictGetVal(de);
4128         int retval;
4129 
4130         /* Skip the promoted slave, and already configured slaves. */
4131         if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
4132 
4133         /* If too much time elapsed without the slave moving forward to
4134          * the next state, consider it reconfigured even if it is not.
4135          * Sentinels will detect the slave as misconfigured and fix its
4136          * configuration later. */
4137         if ((slave->flags & SRI_RECONF_SENT) &&
4138             (mstime() - slave->slave_reconf_sent_time) >
4139             SENTINEL_SLAVE_RECONF_TIMEOUT)
4140         {
4141             sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
4142             slave->flags &= ~SRI_RECONF_SENT;
4143             slave->flags |= SRI_RECONF_DONE;
4144         }
4145 
4146         /* Nothing to do for instances that are disconnected or already
4147          * in RECONF_SENT state. */
4148         if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
4149         if (slave->link->disconnected) continue;
4150 
4151         /* Send SLAVEOF <new master>. */
4152         retval = sentinelSendSlaveOf(slave,
4153                 master->promoted_slave->addr->ip,
4154                 master->promoted_slave->addr->port);
4155         if (retval == C_OK) {
4156             slave->flags |= SRI_RECONF_SENT;
4157             slave->slave_reconf_sent_time = mstime();
4158             sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
4159             in_progress++;
4160         }
4161     }
4162     dictReleaseIterator(di);
4163 
4164     /* Check if all the slaves are reconfigured and handle timeout. */
4165     sentinelFailoverDetectEnd(master);
4166 }
4167 
4168 /* This function is called when the slave is in
4169  * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
4170  * to remove it from the master table and add the promoted slave instead. */
sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance * master)4171 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
4172     sentinelRedisInstance *ref = master->promoted_slave ?
4173                                  master->promoted_slave : master;
4174 
4175     sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
4176         master->name, master->addr->ip, master->addr->port,
4177         ref->addr->ip, ref->addr->port);
4178 
4179     sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
4180 }
4181 
/* Run one step of the failover state machine for the master 'ri': the
 * handler associated with the current failover state is called. Nothing
 * is done if no failover is in progress, or if the current state has no
 * handler in this function. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) == 0) return;

    switch(ri->failover_state) {
    case SENTINEL_FAILOVER_STATE_WAIT_START:
        sentinelFailoverWaitStart(ri);
        break;
    case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
        sentinelFailoverSelectSlave(ri);
        break;
    case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
        sentinelFailoverSendSlaveOfNoOne(ri);
        break;
    case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
        sentinelFailoverWaitPromotion(ri);
        break;
    case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
        sentinelFailoverReconfNextSlave(ri);
        break;
    default:
        /* Every other state is not handled here. */
        break;
    }
}
4205 
4206 /* Abort a failover in progress:
4207  *
4208  * This function can only be called before the promoted slave acknowledged
4209  * the slave -> master switch. Otherwise the failover can't be aborted and
4210  * will reach its end (possibly by timeout). */
sentinelAbortFailover(sentinelRedisInstance * ri)4211 void sentinelAbortFailover(sentinelRedisInstance *ri) {
4212     serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
4213     serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);
4214 
4215     ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER);
4216     ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
4217     ri->failover_state_change_time = mstime();
4218     if (ri->promoted_slave) {
4219         ri->promoted_slave->flags &= ~SRI_PROMOTED;
4220         ri->promoted_slave = NULL;
4221     }
4222 }
4223 
4224 /* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking by design. The function is called every second.
4227  * -------------------------------------------------------------------------- */
4228 
4229 /* Perform scheduled operations for the specified Redis instance. */
sentinelHandleRedisInstance(sentinelRedisInstance * ri)4230 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
4231     /* ========== MONITORING HALF ============ */
4232     /* Every kind of instance */
4233     sentinelReconnectInstance(ri);
4234     sentinelSendPeriodicCommands(ri);
4235 
4236     /* ============== ACTING HALF ============= */
4237     /* We don't proceed with the acting half if we are in TILT mode.
4238      * TILT happens when we find something odd with the time, like a
4239      * sudden change in the clock. */
4240     if (sentinel.tilt) {
4241         if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
4242         sentinel.tilt = 0;
4243         sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
4244     }
4245 
4246     /* Every kind of instance */
4247     sentinelCheckSubjectivelyDown(ri);
4248 
4249     /* Masters and slaves */
4250     if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
4251         /* Nothing so far. */
4252     }
4253 
4254     /* Only masters */
4255     if (ri->flags & SRI_MASTER) {
4256         sentinelCheckObjectivelyDown(ri);
4257         if (sentinelStartFailoverIfNeeded(ri))
4258             sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
4259         sentinelFailoverStateMachine(ri);
4260         sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
4261     }
4262 }
4263 
4264 /* Perform scheduled operations for all the instances in the dictionary.
4265  * Recursively call the function against dictionaries of slaves. */
sentinelHandleDictOfRedisInstances(dict * instances)4266 void sentinelHandleDictOfRedisInstances(dict *instances) {
4267     dictIterator *di;
4268     dictEntry *de;
4269     sentinelRedisInstance *switch_to_promoted = NULL;
4270 
4271     /* There are a number of things we need to perform against every master. */
4272     di = dictGetIterator(instances);
4273     while((de = dictNext(di)) != NULL) {
4274         sentinelRedisInstance *ri = dictGetVal(de);
4275 
4276         sentinelHandleRedisInstance(ri);
4277         if (ri->flags & SRI_MASTER) {
4278             sentinelHandleDictOfRedisInstances(ri->slaves);
4279             sentinelHandleDictOfRedisInstances(ri->sentinels);
4280             if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
4281                 switch_to_promoted = ri;
4282             }
4283         }
4284     }
4285     if (switch_to_promoted)
4286         sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
4287     dictReleaseIterator(di);
4288 }
4289 
4290 /* This function checks if we need to enter the TITL mode.
4291  *
4292  * The TILT mode is entered if we detect that between two invocations of the
4293  * timer interrupt, a negative amount of time, or too much time has passed.
4294  * Note that we expect that more or less just 100 milliseconds will pass
4295  * if everything is fine. However we'll see a negative number or a
4296  * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
4297  * following conditions happen:
4298  *
4299  * 1) The Sentiel process for some time is blocked, for every kind of
4300  * random reason: the load is huge, the computer was frozen for some time
4301  * in I/O or alike, the process was stopped by a signal. Everything.
4302  * 2) The system clock was altered significantly.
4303  *
4304  * Under both this conditions we'll see everything as timed out and failing
4305  * without good reasons. Instead we enter the TILT mode and wait
4306  * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
4307  *
4308  * During TILT time we still collect information, we just do not act. */
sentinelCheckTiltCondition(void)4309 void sentinelCheckTiltCondition(void) {
4310     mstime_t now = mstime();
4311     mstime_t delta = now - sentinel.previous_time;
4312 
4313     if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
4314         sentinel.tilt = 1;
4315         sentinel.tilt_start_time = mstime();
4316         sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
4317     }
4318     sentinel.previous_time = mstime();
4319 }
4320 
sentinelTimer(void)4321 void sentinelTimer(void) {
4322     sentinelCheckTiltCondition();
4323     sentinelHandleDictOfRedisInstances(sentinel.masters);
4324     sentinelRunPendingScripts();
4325     sentinelCollectTerminatedScripts();
4326     sentinelKillTimedoutScripts();
4327 
4328     /* We continuously change the frequency of the Redis "timer interrupt"
4329      * in order to desynchronize every Sentinel from every other.
4330      * This non-determinism avoids that Sentinels started at the same time
4331      * exactly continue to stay synchronized asking to be voted at the
4332      * same time again and again (resulting in nobody likely winning the
4333      * election because of split brain voting). */
4334     server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
4335 }
4336 
4337