Lines Matching refs:server

79     serverAssert(server.repl_backlog == NULL);  in createReplicationBacklog()
80 server.repl_backlog = zmalloc(server.repl_backlog_size); in createReplicationBacklog()
81 server.repl_backlog_histlen = 0; in createReplicationBacklog()
82 server.repl_backlog_idx = 0; in createReplicationBacklog()
87 server.repl_backlog_off = server.master_repl_offset+1; in createReplicationBacklog()
99 if (server.repl_backlog_size == newsize) return; in resizeReplicationBacklog()
101 server.repl_backlog_size = newsize; in resizeReplicationBacklog()
102 if (server.repl_backlog != NULL) { in resizeReplicationBacklog()
108 zfree(server.repl_backlog); in resizeReplicationBacklog()
109 server.repl_backlog = zmalloc(server.repl_backlog_size); in resizeReplicationBacklog()
110 server.repl_backlog_histlen = 0; in resizeReplicationBacklog()
111 server.repl_backlog_idx = 0; in resizeReplicationBacklog()
113 server.repl_backlog_off = server.master_repl_offset+1; in resizeReplicationBacklog()
118 serverAssert(listLength(server.slaves) == 0); in freeReplicationBacklog()
119 zfree(server.repl_backlog); in freeReplicationBacklog()
120 server.repl_backlog = NULL; in freeReplicationBacklog()
130 server.master_repl_offset += len; in feedReplicationBacklog()
135 size_t thislen = server.repl_backlog_size - server.repl_backlog_idx; in feedReplicationBacklog()
137 memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen); in feedReplicationBacklog()
138 server.repl_backlog_idx += thislen; in feedReplicationBacklog()
139 if (server.repl_backlog_idx == server.repl_backlog_size) in feedReplicationBacklog()
140 server.repl_backlog_idx = 0; in feedReplicationBacklog()
143 server.repl_backlog_histlen += thislen; in feedReplicationBacklog()
145 if (server.repl_backlog_histlen > server.repl_backlog_size) in feedReplicationBacklog()
146 server.repl_backlog_histlen = server.repl_backlog_size; in feedReplicationBacklog()
148 server.repl_backlog_off = server.master_repl_offset - in feedReplicationBacklog()
149 server.repl_backlog_histlen + 1; in feedReplicationBacklog()
185 if (server.masterhost != NULL) return; in replicationFeedSlaves()
189 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; in replicationFeedSlaves()
192 serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); in replicationFeedSlaves()
195 if (server.slaveseldb != dictid) { in replicationFeedSlaves()
212 if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); in replicationFeedSlaves()
225 server.slaveseldb = dictid; in replicationFeedSlaves()
228 if (server.repl_backlog) { in replicationFeedSlaves()
293 if (server.repl_backlog) feedReplicationBacklog(buf,buflen); in replicationFeedSlavesFromMasterStream()
317 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); in replicationFeedMonitors()
350 if (server.repl_backlog_histlen == 0) { in addReplyReplicationBacklog()
356 server.repl_backlog_size); in addReplyReplicationBacklog()
358 server.repl_backlog_off); in addReplyReplicationBacklog()
360 server.repl_backlog_histlen); in addReplyReplicationBacklog()
362 server.repl_backlog_idx); in addReplyReplicationBacklog()
365 skip = offset - server.repl_backlog_off; in addReplyReplicationBacklog()
370 j = (server.repl_backlog_idx + in addReplyReplicationBacklog()
371 (server.repl_backlog_size-server.repl_backlog_histlen)) % in addReplyReplicationBacklog()
372 server.repl_backlog_size; in addReplyReplicationBacklog()
376 j = (j + skip) % server.repl_backlog_size; in addReplyReplicationBacklog()
380 len = server.repl_backlog_histlen - skip; in addReplyReplicationBacklog()
384 ((server.repl_backlog_size - j) < len) ? in addReplyReplicationBacklog()
385 (server.repl_backlog_size - j) : len; in addReplyReplicationBacklog()
388 addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen)); in addReplyReplicationBacklog()
392 return server.repl_backlog_histlen - skip; in addReplyReplicationBacklog()
400 return server.master_repl_offset; in getPsyncInitialOffset()
428 server.slaveseldb = -1; in replicationSetupSlaveForFullResync()
434 server.replid,offset); in replicationSetupSlaveForFullResync()
466 if (strcasecmp(master_replid, server.replid) && in masterTryPartialResynchronization()
467 (strcasecmp(master_replid, server.replid2) || in masterTryPartialResynchronization()
468 psync_offset > server.second_replid_offset)) in masterTryPartialResynchronization()
472 if (strcasecmp(master_replid, server.replid) && in masterTryPartialResynchronization()
473 strcasecmp(master_replid, server.replid2)) in masterTryPartialResynchronization()
478 master_replid, server.replid, server.replid2); in masterTryPartialResynchronization()
482 "up to %lld", psync_offset, server.second_replid_offset); in masterTryPartialResynchronization()
492 if (!server.repl_backlog || in masterTryPartialResynchronization()
493 psync_offset < server.repl_backlog_off || in masterTryPartialResynchronization()
494 psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) in masterTryPartialResynchronization()
498 if (psync_offset > server.master_repl_offset) { in masterTryPartialResynchronization()
511 c->repl_ack_time = server.unixtime; in masterTryPartialResynchronization()
513 listAddNodeTail(server.slaves,c); in masterTryPartialResynchronization()
518 buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); in masterTryPartialResynchronization()
566 int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); in startBgsaveForReplication()
581 retval = rdbSaveBackground(server.rdb_filename,rsiptr); in startBgsaveForReplication()
592 listRewind(server.slaves,&li); in startBgsaveForReplication()
599 listDelNode(server.slaves,ln); in startBgsaveForReplication()
611 listRewind(server.slaves,&li); in startBgsaveForReplication()
635 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { in syncCommand()
663 server.stat_sync_partial_ok++; in syncCommand()
672 if (master_replid[0] != '?') server.stat_sync_partial_err++; in syncCommand()
682 server.stat_sync_full++; in syncCommand()
687 if (server.repl_disable_tcp_nodelay) in syncCommand()
691 listAddNodeTail(server.slaves,c); in syncCommand()
694 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { in syncCommand()
704 if (server.rdb_child_pid != -1 && in syncCommand()
705 server.rdb_child_type == RDB_CHILD_TYPE_DISK) in syncCommand()
714 listRewind(server.slaves,&li); in syncCommand()
734 } else if (server.rdb_child_pid != -1 && in syncCommand()
735 server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) in syncCommand()
744 if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { in syncCommand()
748 if (server.repl_diskless_sync_delay) in syncCommand()
754 if (server.aof_child_pid == -1) { in syncCommand()
823 c->repl_ack_time = server.unixtime; in replconfCommand()
834 if (server.masterhost && server.master) replicationSendAck(); in replconfCommand()
860 slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ in putSlaveOnline()
861 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, in putSlaveOnline()
890 server.stat_net_output_bytes += nwritten; in sendBulkToSlave()
919 server.stat_net_output_bytes += nwritten; in sendBulkToSlave()
923 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); in sendBulkToSlave()
948 listRewind(server.slaves,&li); in updateSlavesWaitingBgsave()
975 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */ in updateSlavesWaitingBgsave()
982 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || in updateSlavesWaitingBgsave()
994 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); in updateSlavesWaitingBgsave()
995 … if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { in updateSlavesWaitingBgsave()
1010 getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE); in changeReplicationId()
1011 server.replid[CONFIG_RUN_ID_SIZE] = '\0'; in changeReplicationId()
1018 memset(server.replid2,'0',sizeof(server.replid)); in clearReplicationId2()
1019 server.replid2[CONFIG_RUN_ID_SIZE] = '\0'; in clearReplicationId2()
1020 server.second_replid_offset = -1; in clearReplicationId2()
1029 memcpy(server.replid2,server.replid,sizeof(server.replid)); in shiftReplicationId()
1037 server.second_replid_offset = server.master_repl_offset+1; in shiftReplicationId()
1039 …lid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, se… in shiftReplicationId()
1047 return server.repl_state >= REPL_STATE_RECEIVE_PONG && in slaveIsInHandshakeState()
1048 server.repl_state <= REPL_STATE_RECEIVE_PSYNC; in slaveIsInHandshakeState()
1063 if (write(server.repl_transfer_s,"\n",1) == -1) { in replicationSendNewlineToMaster()
1080 server.master = createClient(fd); in replicationCreateMasterClient()
1081 server.master->flags |= CLIENT_MASTER; in replicationCreateMasterClient()
1082 server.master->authenticated = 1; in replicationCreateMasterClient()
1083 server.master->reploff = server.master_initial_offset; in replicationCreateMasterClient()
1084 server.master->read_reploff = server.master->reploff; in replicationCreateMasterClient()
1085 memcpy(server.master->replid, server.master_replid, in replicationCreateMasterClient()
1086 sizeof(server.master_replid)); in replicationCreateMasterClient()
1089 if (server.master->reploff == -1) in replicationCreateMasterClient()
1090 server.master->flags |= CLIENT_PRE_PSYNC; in replicationCreateMasterClient()
1091 if (dbid != -1) selectDb(server.master,dbid); in replicationCreateMasterClient()
1133 if (server.repl_transfer_size == -1) { in readSyncBulkPayload()
1134 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { in readSyncBulkPayload()
1150 server.repl_transfer_lastio = server.unixtime; in readSyncBulkPayload()
1173 server.repl_transfer_size = 0; in readSyncBulkPayload()
1178 server.repl_transfer_size = strtol(buf+1,NULL,10); in readSyncBulkPayload()
1181 (long long) server.repl_transfer_size); in readSyncBulkPayload()
1190 left = server.repl_transfer_size - server.repl_transfer_read; in readSyncBulkPayload()
1201 server.stat_net_input_bytes += nread; in readSyncBulkPayload()
1219 server.repl_transfer_lastio = server.unixtime; in readSyncBulkPayload()
1220 if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { in readSyncBulkPayload()
1225 server.repl_transfer_read += nread; in readSyncBulkPayload()
1229 if (ftruncate(server.repl_transfer_fd, in readSyncBulkPayload()
1230 server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) in readSyncBulkPayload()
1240 if (server.repl_transfer_read >= in readSyncBulkPayload()
1241 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) in readSyncBulkPayload()
1243 off_t sync_size = server.repl_transfer_read - in readSyncBulkPayload()
1244 server.repl_transfer_last_fsync_off; in readSyncBulkPayload()
1245 rdb_fsync_range(server.repl_transfer_fd, in readSyncBulkPayload()
1246 server.repl_transfer_last_fsync_off, sync_size); in readSyncBulkPayload()
1247 server.repl_transfer_last_fsync_off += sync_size; in readSyncBulkPayload()
1252 if (server.repl_transfer_read == server.repl_transfer_size) in readSyncBulkPayload()
1257 int aof_is_enabled = server.aof_state != AOF_OFF; in readSyncBulkPayload()
1260 if (server.rdb_child_pid != -1) { in readSyncBulkPayload()
1266 (long) server.rdb_child_pid); in readSyncBulkPayload()
1267 kill(server.rdb_child_pid,SIGUSR1); in readSyncBulkPayload()
1268 rdbRemoveTempFile(server.rdb_child_pid); in readSyncBulkPayload()
1271 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { in readSyncBulkPayload()
1283 server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, in readSyncBulkPayload()
1289 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); in readSyncBulkPayload()
1292 if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { in readSyncBulkPayload()
1301 zfree(server.repl_transfer_tmpfile); in readSyncBulkPayload()
1302 close(server.repl_transfer_fd); in readSyncBulkPayload()
1303 replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); in readSyncBulkPayload()
1304 server.repl_state = REPL_STATE_CONNECTED; in readSyncBulkPayload()
1305 server.repl_down_since = 0; in readSyncBulkPayload()
1309 memcpy(server.replid,server.master->replid,sizeof(server.replid)); in readSyncBulkPayload()
1310 server.master_repl_offset = server.master->reploff; in readSyncBulkPayload()
1316 if (server.repl_backlog == NULL) createReplicationBacklog(); in readSyncBulkPayload()
1368 if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) in sendSynchronousCommand()
1382 if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) in sendSynchronousCommand()
1388 server.repl_transfer_lastio = server.unixtime; in sendSynchronousCommand()
1460 server.master_initial_offset = -1; in slaveTryPartialResynchronization()
1462 if (server.cached_master) { in slaveTryPartialResynchronization()
1463 psync_replid = server.cached_master->replid; in slaveTryPartialResynchronization()
1464 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); in slaveTryPartialResynchronization()
1477 aeDeleteFileEvent(server.el,fd,AE_READABLE); in slaveTryPartialResynchronization()
1492 aeDeleteFileEvent(server.el,fd,AE_READABLE); in slaveTryPartialResynchronization()
1512 memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1); in slaveTryPartialResynchronization()
1514 memcpy(server.master_replid, replid, offset-replid-1); in slaveTryPartialResynchronization()
1515 server.master_replid[CONFIG_RUN_ID_SIZE] = '\0'; in slaveTryPartialResynchronization()
1516 server.master_initial_offset = strtoll(offset,NULL,10); in slaveTryPartialResynchronization()
1518 server.master_replid, in slaveTryPartialResynchronization()
1519 server.master_initial_offset); in slaveTryPartialResynchronization()
1545 if (strcmp(new,server.cached_master->replid)) { in slaveTryPartialResynchronization()
1550 memcpy(server.replid2,server.cached_master->replid, in slaveTryPartialResynchronization()
1551 sizeof(server.replid2)); in slaveTryPartialResynchronization()
1552 server.second_replid_offset = server.master_repl_offset+1; in slaveTryPartialResynchronization()
1556 memcpy(server.replid,new,sizeof(server.replid)); in slaveTryPartialResynchronization()
1557 memcpy(server.cached_master->replid,new,sizeof(server.replid)); in slaveTryPartialResynchronization()
1571 if (server.repl_backlog == NULL) createReplicationBacklog(); in slaveTryPartialResynchronization()
1619 if (server.repl_state == REPL_STATE_NONE) { in syncWithMaster()
1635 if (server.repl_state == REPL_STATE_CONNECTING) { in syncWithMaster()
1639 aeDeleteFileEvent(server.el,fd,AE_WRITABLE); in syncWithMaster()
1640 server.repl_state = REPL_STATE_RECEIVE_PONG; in syncWithMaster()
1649 if (server.repl_state == REPL_STATE_RECEIVE_PONG) { in syncWithMaster()
1669 server.repl_state = REPL_STATE_SEND_AUTH; in syncWithMaster()
1673 if (server.repl_state == REPL_STATE_SEND_AUTH) { in syncWithMaster()
1674 if (server.masterauth) { in syncWithMaster()
1675 err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); in syncWithMaster()
1677 server.repl_state = REPL_STATE_RECEIVE_AUTH; in syncWithMaster()
1680 server.repl_state = REPL_STATE_SEND_PORT; in syncWithMaster()
1685 if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { in syncWithMaster()
1693 server.repl_state = REPL_STATE_SEND_PORT; in syncWithMaster()
1698 if (server.repl_state == REPL_STATE_SEND_PORT) { in syncWithMaster()
1699 sds port = sdsfromlonglong(server.slave_announce_port ? in syncWithMaster()
1700 server.slave_announce_port : server.port); in syncWithMaster()
1706 server.repl_state = REPL_STATE_RECEIVE_PORT; in syncWithMaster()
1711 if (server.repl_state == REPL_STATE_RECEIVE_PORT) { in syncWithMaster()
1720 server.repl_state = REPL_STATE_SEND_IP; in syncWithMaster()
1724 if (server.repl_state == REPL_STATE_SEND_IP && in syncWithMaster()
1725 server.slave_announce_ip == NULL) in syncWithMaster()
1727 server.repl_state = REPL_STATE_SEND_CAPA; in syncWithMaster()
1732 if (server.repl_state == REPL_STATE_SEND_IP) { in syncWithMaster()
1734 "ip-address",server.slave_announce_ip, NULL); in syncWithMaster()
1737 server.repl_state = REPL_STATE_RECEIVE_IP; in syncWithMaster()
1742 if (server.repl_state == REPL_STATE_RECEIVE_IP) { in syncWithMaster()
1751 server.repl_state = REPL_STATE_SEND_CAPA; in syncWithMaster()
1760 if (server.repl_state == REPL_STATE_SEND_CAPA) { in syncWithMaster()
1765 server.repl_state = REPL_STATE_RECEIVE_CAPA; in syncWithMaster()
1770 if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { in syncWithMaster()
1779 server.repl_state = REPL_STATE_SEND_PSYNC; in syncWithMaster()
1787 if (server.repl_state == REPL_STATE_SEND_PSYNC) { in syncWithMaster()
1792 server.repl_state = REPL_STATE_RECEIVE_PSYNC; in syncWithMaster()
1797 if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { in syncWithMaster()
1800 server.repl_state); in syncWithMaster()
1833 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { in syncWithMaster()
1843 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); in syncWithMaster()
1854 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) in syncWithMaster()
1863 server.repl_state = REPL_STATE_TRANSFER; in syncWithMaster()
1864 server.repl_transfer_size = -1; in syncWithMaster()
1865 server.repl_transfer_read = 0; in syncWithMaster()
1866 server.repl_transfer_last_fsync_off = 0; in syncWithMaster()
1867 server.repl_transfer_fd = dfd; in syncWithMaster()
1868 server.repl_transfer_lastio = server.unixtime; in syncWithMaster()
1869 server.repl_transfer_tmpfile = zstrdup(tmpfile); in syncWithMaster()
1873 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); in syncWithMaster()
1876 server.repl_transfer_s = -1; in syncWithMaster()
1877 server.repl_state = REPL_STATE_CONNECT; in syncWithMaster()
1890 server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); in connectWithMaster()
1897 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == in connectWithMaster()
1905 server.repl_transfer_lastio = server.unixtime; in connectWithMaster()
1906 server.repl_transfer_s = fd; in connectWithMaster()
1907 server.repl_state = REPL_STATE_CONNECTING; in connectWithMaster()
1916 int fd = server.repl_transfer_s; in undoConnectWithMaster()
1918 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); in undoConnectWithMaster()
1920 server.repl_transfer_s = -1; in undoConnectWithMaster()
1927 serverAssert(server.repl_state == REPL_STATE_TRANSFER); in replicationAbortSyncTransfer()
1929 close(server.repl_transfer_fd); in replicationAbortSyncTransfer()
1930 unlink(server.repl_transfer_tmpfile); in replicationAbortSyncTransfer()
1931 zfree(server.repl_transfer_tmpfile); in replicationAbortSyncTransfer()
1943 if (server.repl_state == REPL_STATE_TRANSFER) { in cancelReplicationHandshake()
1945 server.repl_state = REPL_STATE_CONNECT; in cancelReplicationHandshake()
1946 } else if (server.repl_state == REPL_STATE_CONNECTING || in cancelReplicationHandshake()
1950 server.repl_state = REPL_STATE_CONNECT; in cancelReplicationHandshake()
1959 int was_master = server.masterhost == NULL; in replicationSetMaster()
1961 sdsfree(server.masterhost); in replicationSetMaster()
1962 server.masterhost = sdsnew(ip); in replicationSetMaster()
1963 server.masterport = port; in replicationSetMaster()
1964 if (server.master) { in replicationSetMaster()
1965 freeClient(server.master); in replicationSetMaster()
1976 server.repl_state = REPL_STATE_CONNECT; in replicationSetMaster()
1981 if (server.masterhost == NULL) return; /* Nothing to do. */ in replicationUnsetMaster()
1982 sdsfree(server.masterhost); in replicationUnsetMaster()
1983 server.masterhost = NULL; in replicationUnsetMaster()
1989 if (server.master) freeClient(server.master); in replicationUnsetMaster()
1997 server.repl_state = REPL_STATE_NONE; in replicationUnsetMaster()
2003 server.slaveseldb = -1; in replicationUnsetMaster()
2009 server.repl_no_slaves_since = server.unixtime; in replicationUnsetMaster()
2015 server.master = NULL; in replicationHandleMasterDisconnection()
2016 server.repl_state = REPL_STATE_CONNECT; in replicationHandleMasterDisconnection()
2017 server.repl_down_since = server.unixtime; in replicationHandleMasterDisconnection()
2026 if (server.cluster_enabled) { in replicaofCommand()
2035 if (server.masterhost) { in replicaofCommand()
2058 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) in replicaofCommand()
2059 && server.masterport == port) { in replicaofCommand()
2069 server.masterhost, server.masterport, client); in replicaofCommand()
2079 if (server.masterhost == NULL) { in roleCommand()
2087 addReplyLongLong(c,server.master_repl_offset); in roleCommand()
2089 listRewind(server.slaves,&li); in roleCommand()
2112 addReplyBulkCString(c,server.masterhost); in roleCommand()
2113 addReplyLongLong(c,server.masterport); in roleCommand()
2117 switch(server.repl_state) { in roleCommand()
2127 addReplyLongLong(c,server.master ? server.master->reploff : -1); in roleCommand()
2135 client *c = server.master; in replicationSendAck()
2168 serverAssert(server.master != NULL && server.cached_master == NULL); in replicationCacheMaster()
2178 sdsclear(server.master->querybuf); in replicationCacheMaster()
2179 sdsclear(server.master->pending_querybuf); in replicationCacheMaster()
2180 server.master->read_reploff = server.master->reploff; in replicationCacheMaster()
2190 server.cached_master = server.master; in replicationCacheMaster()
2216 server.master_initial_offset = server.master_repl_offset; in replicationCacheMasterUsingMyself()
2220 memcpy(server.master->replid, server.replid, sizeof(server.replid)); in replicationCacheMasterUsingMyself()
2223 unlinkClient(server.master); in replicationCacheMasterUsingMyself()
2224 server.cached_master = server.master; in replicationCacheMasterUsingMyself()
2225 server.master = NULL; in replicationCacheMasterUsingMyself()
2232 if (server.cached_master == NULL) return; in replicationDiscardCachedMaster()
2235 server.cached_master->flags &= ~CLIENT_MASTER; in replicationDiscardCachedMaster()
2236 freeClient(server.cached_master); in replicationDiscardCachedMaster()
2237 server.cached_master = NULL; in replicationDiscardCachedMaster()
2247 server.master = server.cached_master; in replicationResurrectCachedMaster()
2248 server.cached_master = NULL; in replicationResurrectCachedMaster()
2249 server.master->fd = newfd; in replicationResurrectCachedMaster()
2250 server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); in replicationResurrectCachedMaster()
2251 server.master->authenticated = 1; in replicationResurrectCachedMaster()
2252 server.master->lastinteraction = server.unixtime; in replicationResurrectCachedMaster()
2253 server.repl_state = REPL_STATE_CONNECTED; in replicationResurrectCachedMaster()
2254 server.repl_down_since = 0; in replicationResurrectCachedMaster()
2257 linkClient(server.master); in replicationResurrectCachedMaster()
2258 if (aeCreateFileEvent(server.el, newfd, AE_READABLE, in replicationResurrectCachedMaster()
2259 readQueryFromClient, server.master)) { in replicationResurrectCachedMaster()
2261 freeClientAsync(server.master); /* Close ASAP. */ in replicationResurrectCachedMaster()
2266 if (clientHasPendingReplies(server.master)) { in replicationResurrectCachedMaster()
2267 if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, in replicationResurrectCachedMaster()
2268 sendReplyToClient, server.master)) { in replicationResurrectCachedMaster()
2270 freeClientAsync(server.master); /* Close ASAP. */ in replicationResurrectCachedMaster()
2285 if (!server.repl_min_slaves_to_write || in refreshGoodSlavesCount()
2286 !server.repl_min_slaves_max_lag) return; in refreshGoodSlavesCount()
2288 listRewind(server.slaves,&li); in refreshGoodSlavesCount()
2291 time_t lag = server.unixtime - slave->repl_ack_time; in refreshGoodSlavesCount()
2294 lag <= server.repl_min_slaves_max_lag) good++; in refreshGoodSlavesCount()
2296 server.repl_good_slaves_count = good; in refreshGoodSlavesCount()
2332 server.repl_scriptcache_size = 10000; in replicationScriptCacheInit()
2333 server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); in replicationScriptCacheInit()
2334 server.repl_scriptcache_fifo = listCreate(); in replicationScriptCacheInit()
2349 dictEmpty(server.repl_scriptcache_dict,NULL); in replicationScriptCacheFlush()
2350 listRelease(server.repl_scriptcache_fifo); in replicationScriptCacheFlush()
2351 server.repl_scriptcache_fifo = listCreate(); in replicationScriptCacheFlush()
2361 if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) in replicationScriptCacheAdd()
2363 listNode *ln = listLast(server.repl_scriptcache_fifo); in replicationScriptCacheAdd()
2366 retval = dictDelete(server.repl_scriptcache_dict,oldest); in replicationScriptCacheAdd()
2368 listDelNode(server.repl_scriptcache_fifo,ln); in replicationScriptCacheAdd()
2372 retval = dictAdd(server.repl_scriptcache_dict,key,NULL); in replicationScriptCacheAdd()
2373 listAddNodeHead(server.repl_scriptcache_fifo,key); in replicationScriptCacheAdd()
2380 return dictFind(server.repl_scriptcache_dict,sha1) != NULL; in replicationScriptCacheExists()
2414 server.get_ack_from_slaves = 1; in replicationRequestAckFromSlaves()
2424 listRewind(server.slaves,&li); in replicationCountAcksByOffset()
2441 if (server.masterhost) { in waitCommand()
2464 listAddNodeTail(server.clients_waiting_acks,c); in waitCommand()
2477 listNode *ln = listSearchKey(server.clients_waiting_acks,c); in unblockClientWaitingReplicas()
2479 listDelNode(server.clients_waiting_acks,ln); in unblockClientWaitingReplicas()
2491 listRewind(server.clients_waiting_acks,&li); in processClientsWaitingReplicas()
2522 if (server.masterhost != NULL) { in replicationGetSlaveOffset()
2523 if (server.master) { in replicationGetSlaveOffset()
2524 offset = server.master->reploff; in replicationGetSlaveOffset()
2525 } else if (server.cached_master) { in replicationGetSlaveOffset()
2526 offset = server.cached_master->reploff; in replicationGetSlaveOffset()
2544 if (server.masterhost && in replicationCron()
2545 (server.repl_state == REPL_STATE_CONNECTING || in replicationCron()
2547 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) in replicationCron()
2554 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER && in replicationCron()
2555 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) in replicationCron()
2562 if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED && in replicationCron()
2563 (time(NULL)-server.master->lastinteraction) > server.repl_timeout) in replicationCron()
2566 freeClient(server.master); in replicationCron()
2570 if (server.repl_state == REPL_STATE_CONNECT) { in replicationCron()
2572 server.masterhost, server.masterport); in replicationCron()
2581 if (server.masterhost && server.master && in replicationCron()
2582 !(server.master->flags & CLIENT_PRE_PSYNC)) in replicationCron()
2594 if ((replication_cron_loops % server.repl_ping_slave_period) == 0 && in replicationCron()
2595 listLength(server.slaves)) in replicationCron()
2602 server.cluster_enabled && in replicationCron()
2603 server.cluster->mf_end && in replicationCron()
2608 replicationFeedSlaves(server.slaves, server.slaveseldb, in replicationCron()
2628 listRewind(server.slaves,&li); in replicationCron()
2635 server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)); in replicationCron()
2645 if (listLength(server.slaves)) { in replicationCron()
2649 listRewind(server.slaves,&li); in replicationCron()
2655 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) in replicationCron()
2670 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && in replicationCron()
2671 server.repl_backlog && server.masterhost == NULL) in replicationCron()
2673 time_t idle = server.unixtime - server.repl_no_slaves_since; in replicationCron()
2675 if (idle > server.repl_backlog_time_limit) { in replicationCron()
2697 (int) server.repl_backlog_time_limit); in replicationCron()
2704 if (listLength(server.slaves) == 0 && in replicationCron()
2705 server.aof_state == AOF_OFF && in replicationCron()
2706 listLength(server.repl_scriptcache_fifo) != 0) in replicationCron()
2717 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { in replicationCron()
2724 listRewind(server.slaves,&li); in replicationCron()
2728 idle = server.unixtime - slave->lastinteraction; in replicationCron()
2737 (!server.repl_diskless_sync || in replicationCron()
2738 max_idle > server.repl_diskless_sync_delay)) in replicationCron()