Lines Matching refs:server
143 server.cluster->currentEpoch = in clusterLoadConfig()
146 server.cluster->lastVoteEpoch = in clusterLoadConfig()
189 serverAssert(server.cluster->myself == NULL); in clusterLoadConfig()
190 myself = server.cluster->myself = n; in clusterLoadConfig()
257 server.cluster->migrating_slots_to[slot] = cn; in clusterLoadConfig()
259 server.cluster->importing_slots_from[slot] = cn; in clusterLoadConfig()
277 if (server.cluster->myself == NULL) goto fmterr; in clusterLoadConfig()
287 if (clusterGetMaxEpoch() > server.cluster->currentEpoch) { in clusterLoadConfig()
288 server.cluster->currentEpoch = clusterGetMaxEpoch(); in clusterLoadConfig()
318 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG; in clusterSaveConfig()
324 (unsigned long long) server.cluster->currentEpoch, in clusterSaveConfig()
325 (unsigned long long) server.cluster->lastVoteEpoch); in clusterSaveConfig()
328 if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644)) in clusterSaveConfig()
340 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG; in clusterSaveConfig()
419 int nofailover = server.cluster_slave_no_failover ? in clusterUpdateMyselfFlags()
432 server.cluster = zmalloc(sizeof(clusterState)); in clusterInit()
433 server.cluster->myself = NULL; in clusterInit()
434 server.cluster->currentEpoch = 0; in clusterInit()
435 server.cluster->state = CLUSTER_FAIL; in clusterInit()
436 server.cluster->size = 1; in clusterInit()
437 server.cluster->todo_before_sleep = 0; in clusterInit()
438 server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); in clusterInit()
439 server.cluster->nodes_black_list = in clusterInit()
441 server.cluster->failover_auth_time = 0; in clusterInit()
442 server.cluster->failover_auth_count = 0; in clusterInit()
443 server.cluster->failover_auth_rank = 0; in clusterInit()
444 server.cluster->failover_auth_epoch = 0; in clusterInit()
445 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; in clusterInit()
446 server.cluster->lastVoteEpoch = 0; in clusterInit()
448 server.cluster->stats_bus_messages_sent[i] = 0; in clusterInit()
449 server.cluster->stats_bus_messages_received[i] = 0; in clusterInit()
451 server.cluster->stats_pfail_nodes = 0; in clusterInit()
452 memset(server.cluster->slots,0, sizeof(server.cluster->slots)); in clusterInit()
457 if (clusterLockConfig(server.cluster_configfile) == C_ERR) in clusterInit()
461 if (clusterLoadConfig(server.cluster_configfile) == C_ERR) { in clusterInit()
464 myself = server.cluster->myself = in clusterInit()
474 server.cfd_count = 0; in clusterInit()
479 if (server.port > (65535-CLUSTER_PORT_INCR)) { in clusterInit()
488 if (listenToPort(server.port+CLUSTER_PORT_INCR, in clusterInit()
489 server.cfd,&server.cfd_count) == C_ERR) in clusterInit()
495 for (j = 0; j < server.cfd_count; j++) { in clusterInit()
496 if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, in clusterInit()
504 server.cluster->slots_to_keys = raxNew(); in clusterInit()
505 memset(server.cluster->slots_keys_count,0, in clusterInit()
506 sizeof(server.cluster->slots_keys_count)); in clusterInit()
510 myself->port = server.port; in clusterInit()
511 myself->cport = server.port+CLUSTER_PORT_INCR; in clusterInit()
512 if (server.cluster_announce_port) in clusterInit()
513 myself->port = server.cluster_announce_port; in clusterInit()
514 if (server.cluster_announce_bus_port) in clusterInit()
515 myself->cport = server.cluster_announce_bus_port; in clusterInit()
517 server.cluster->mf_end = 0; in clusterInit()
551 di = dictGetSafeIterator(server.cluster->nodes); in clusterReset()
564 server.cluster->currentEpoch = 0; in clusterReset()
565 server.cluster->lastVoteEpoch = 0; in clusterReset()
572 dictDelete(server.cluster->nodes,oldname); in clusterReset()
604 aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE); in freeClusterLink()
626 if (server.masterhost == NULL && server.loading) return; in clusterAcceptHandler()
629 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); in clusterAcceptHandler()
633 "Error accepting cluster node: %s", server.neterr); in clusterAcceptHandler()
648 aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link); in clusterAcceptHandler()
769 mstime_t maxtime = server.cluster_node_timeout * in clusterNodeCleanupFailureReports()
875 serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK); in freeClusterNode()
889 retval = dictAdd(server.cluster->nodes, in clusterAddNode()
912 if (server.cluster->importing_slots_from[j] == delnode) in clusterDelNode()
913 server.cluster->importing_slots_from[j] = NULL; in clusterDelNode()
914 if (server.cluster->migrating_slots_to[j] == delnode) in clusterDelNode()
915 server.cluster->migrating_slots_to[j] = NULL; in clusterDelNode()
916 if (server.cluster->slots[j] == delnode) in clusterDelNode()
921 di = dictGetSafeIterator(server.cluster->nodes); in clusterDelNode()
939 de = dictFind(server.cluster->nodes,s); in clusterLookupNode()
955 retval = dictDelete(server.cluster->nodes, s); in clusterRenameNode()
973 di = dictGetSafeIterator(server.cluster->nodes); in clusterGetMaxEpoch()
979 if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch; in clusterGetMaxEpoch()
1018 server.cluster->currentEpoch++; in clusterBumpConfigEpochWithoutConsensus()
1019 myself->configEpoch = server.cluster->currentEpoch; in clusterBumpConfigEpochWithoutConsensus()
1084 server.cluster->currentEpoch++; in clusterHandleConfigEpochCollision()
1085 myself->configEpoch = server.cluster->currentEpoch; in clusterHandleConfigEpochCollision()
1129 di = dictGetSafeIterator(server.cluster->nodes_black_list); in clusterBlacklistCleanup()
1133 if (expire < server.unixtime) in clusterBlacklistCleanup()
1134 dictDelete(server.cluster->nodes_black_list,dictGetKey(de)); in clusterBlacklistCleanup()
1145 if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) { in clusterBlacklistAddNode()
1150 de = dictFind(server.cluster->nodes_black_list,id); in clusterBlacklistAddNode()
1163 retval = dictFind(server.cluster->nodes_black_list,id) != NULL; in clusterBlacklistExists()
1195 int needed_quorum = (server.cluster->size / 2) + 1; in markNodeAsFailingIfNeeded()
1244 (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT)) in clearNodeFailureIfNeeded()
1261 di = dictGetSafeIterator(server.cluster->nodes); in clusterHandshakeInProgress()
1348 if (server.verbosity == LL_DEBUG) { in clusterProcessGossipSection()
1396 if (pongtime <= (server.mstime+500) && in clusterProcessGossipSection()
1556 if (server.cluster->slots[j] == sender) continue; in clusterUpdateSlotsConfigWith()
1562 if (server.cluster->importing_slots_from[j]) continue; in clusterUpdateSlotsConfigWith()
1568 if (server.cluster->slots[j] == NULL || in clusterUpdateSlotsConfigWith()
1569 server.cluster->slots[j]->configEpoch < senderConfigEpoch) in clusterUpdateSlotsConfigWith()
1573 if (server.cluster->slots[j] == myself && in clusterUpdateSlotsConfigWith()
1581 if (server.cluster->slots[j] == curmaster) in clusterUpdateSlotsConfigWith()
1595 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) in clusterUpdateSlotsConfigWith()
1641 server.cluster->stats_bus_messages_received[type]++; in clusterProcessPacket()
1706 if (senderCurrentEpoch > server.cluster->currentEpoch) in clusterProcessPacket()
1707 server.cluster->currentEpoch = senderCurrentEpoch; in clusterProcessPacket()
1719 if (server.cluster->mf_end && in clusterProcessPacket()
1723 server.cluster->mf_master_offset == 0) in clusterProcessPacket()
1725 server.cluster->mf_master_offset = sender->repl_offset; in clusterProcessPacket()
1729 server.cluster->mf_master_offset); in clusterProcessPacket()
1749 server.cluster_announce_ip == NULL) in clusterProcessPacket()
1968 if (server.cluster->slots[j] == sender || in clusterProcessPacket()
1969 server.cluster->slots[j] == NULL) continue; in clusterProcessPacket()
1970 if (server.cluster->slots[j]->configEpoch > in clusterProcessPacket()
1976 sender->name, server.cluster->slots[j]->name); in clusterProcessPacket()
1978 server.cluster->slots[j]); in clusterProcessPacket()
2028 if (dictSize(server.pubsub_channels) || in clusterProcessPacket()
2029 listLength(server.pubsub_patterns)) in clusterProcessPacket()
2051 senderCurrentEpoch >= server.cluster->failover_auth_epoch) in clusterProcessPacket()
2053 server.cluster->failover_auth_count++; in clusterProcessPacket()
2065 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; in clusterProcessPacket()
2066 server.cluster->mf_slave = sender; in clusterProcessPacket()
2135 aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE); in clusterWriteHandler()
2211 aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER, in clusterSendMessage()
2220 server.cluster->stats_bus_messages_sent[type]++; in clusterSendMessage()
2233 di = dictGetSafeIterator(server.cluster->nodes); in clusterBroadcastMessage()
2272 if (server.cluster_announce_ip) { in clusterBuildMessageHdr()
2273 strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN); in clusterBuildMessageHdr()
2278 int announced_port = server.cluster_announce_port ? in clusterBuildMessageHdr()
2279 server.cluster_announce_port : server.port; in clusterBuildMessageHdr()
2280 int announced_cport = server.cluster_announce_bus_port ? in clusterBuildMessageHdr()
2281 server.cluster_announce_bus_port : in clusterBuildMessageHdr()
2282 (server.port + CLUSTER_PORT_INCR); in clusterBuildMessageHdr()
2291 hdr->state = server.cluster->state; in clusterBuildMessageHdr()
2294 hdr->currentEpoch = htonu64(server.cluster->currentEpoch); in clusterBuildMessageHdr()
2301 offset = server.master_repl_offset; in clusterBuildMessageHdr()
2305 if (nodeIsMaster(myself) && server.cluster->mf_end) in clusterBuildMessageHdr()
2360 int freshnodes = dictSize(server.cluster->nodes)-2; in clusterSendPing()
2388 wanted = floor(dictSize(server.cluster->nodes)/10); in clusterSendPing()
2394 int pfail_wanted = server.cluster->stats_pfail_nodes; in clusterSendPing()
2415 dictEntry *de = dictGetRandomKey(server.cluster->nodes); in clusterSendPing()
2451 di = dictGetSafeIterator(server.cluster->nodes); in clusterSendPing()
2498 di = dictGetSafeIterator(server.cluster->nodes); in clusterBroadcastPong()
2673 if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK; in clusterRequestFailoverAuth()
2724 if (requestCurrentEpoch < server.cluster->currentEpoch) { in clusterSendFailoverAuthIfNeeded()
2729 (unsigned long long) server.cluster->currentEpoch); in clusterSendFailoverAuthIfNeeded()
2734 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) { in clusterSendFailoverAuthIfNeeded()
2738 (unsigned long long) server.cluster->currentEpoch); in clusterSendFailoverAuthIfNeeded()
2767 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2) in clusterSendFailoverAuthIfNeeded()
2773 (long long) ((server.cluster_node_timeout*2)- in clusterSendFailoverAuthIfNeeded()
2783 if (server.cluster->slots[j] == NULL || in clusterSendFailoverAuthIfNeeded()
2784 server.cluster->slots[j]->configEpoch <= requestConfigEpoch) in clusterSendFailoverAuthIfNeeded()
2795 (unsigned long long) server.cluster->slots[j]->configEpoch, in clusterSendFailoverAuthIfNeeded()
2801 server.cluster->lastVoteEpoch = server.cluster->currentEpoch; in clusterSendFailoverAuthIfNeeded()
2806 node->name, (unsigned long long) server.cluster->currentEpoch); in clusterSendFailoverAuthIfNeeded()
2863 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000; in clusterLogCantFailover()
2866 if (reason == server.cluster->cant_failover_reason && in clusterLogCantFailover()
2870 server.cluster->cant_failover_reason = reason; in clusterLogCantFailover()
2948 mstime_t auth_age = mstime() - server.cluster->failover_auth_time; in clusterHandleSlaveFailover()
2949 int needed_quorum = (server.cluster->size / 2) + 1; in clusterHandleSlaveFailover()
2950 int manual_failover = server.cluster->mf_end != 0 && in clusterHandleSlaveFailover()
2951 server.cluster->mf_can_start; in clusterHandleSlaveFailover()
2954 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER; in clusterHandleSlaveFailover()
2963 auth_timeout = server.cluster_node_timeout*2; in clusterHandleSlaveFailover()
2977 (server.cluster_slave_no_failover && !manual_failover) || in clusterHandleSlaveFailover()
2982 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; in clusterHandleSlaveFailover()
2988 if (server.repl_state == REPL_STATE_CONNECTED) { in clusterHandleSlaveFailover()
2989 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction) in clusterHandleSlaveFailover()
2992 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000; in clusterHandleSlaveFailover()
2998 if (data_age > server.cluster_node_timeout) in clusterHandleSlaveFailover()
2999 data_age -= server.cluster_node_timeout; in clusterHandleSlaveFailover()
3005 if (server.cluster_slave_validity_factor && in clusterHandleSlaveFailover()
3007 (((mstime_t)server.repl_ping_slave_period * 1000) + in clusterHandleSlaveFailover()
3008 (server.cluster_node_timeout * server.cluster_slave_validity_factor))) in clusterHandleSlaveFailover()
3019 server.cluster->failover_auth_time = mstime() + in clusterHandleSlaveFailover()
3022 server.cluster->failover_auth_count = 0; in clusterHandleSlaveFailover()
3023 server.cluster->failover_auth_sent = 0; in clusterHandleSlaveFailover()
3024 server.cluster->failover_auth_rank = clusterGetSlaveRank(); in clusterHandleSlaveFailover()
3028 server.cluster->failover_auth_time += in clusterHandleSlaveFailover()
3029 server.cluster->failover_auth_rank * 1000; in clusterHandleSlaveFailover()
3031 if (server.cluster->mf_end) { in clusterHandleSlaveFailover()
3032 server.cluster->failover_auth_time = mstime(); in clusterHandleSlaveFailover()
3033 server.cluster->failover_auth_rank = 0; in clusterHandleSlaveFailover()
3039 server.cluster->failover_auth_time - mstime(), in clusterHandleSlaveFailover()
3040 server.cluster->failover_auth_rank, in clusterHandleSlaveFailover()
3054 if (server.cluster->failover_auth_sent == 0 && in clusterHandleSlaveFailover()
3055 server.cluster->mf_end == 0) in clusterHandleSlaveFailover()
3058 if (newrank > server.cluster->failover_auth_rank) { in clusterHandleSlaveFailover()
3060 (newrank - server.cluster->failover_auth_rank) * 1000; in clusterHandleSlaveFailover()
3061 server.cluster->failover_auth_time += added_delay; in clusterHandleSlaveFailover()
3062 server.cluster->failover_auth_rank = newrank; in clusterHandleSlaveFailover()
3070 if (mstime() < server.cluster->failover_auth_time) { in clusterHandleSlaveFailover()
3082 if (server.cluster->failover_auth_sent == 0) { in clusterHandleSlaveFailover()
3083 server.cluster->currentEpoch++; in clusterHandleSlaveFailover()
3084 server.cluster->failover_auth_epoch = server.cluster->currentEpoch; in clusterHandleSlaveFailover()
3086 (unsigned long long) server.cluster->currentEpoch); in clusterHandleSlaveFailover()
3088 server.cluster->failover_auth_sent = 1; in clusterHandleSlaveFailover()
3096 if (server.cluster->failover_auth_count >= needed_quorum) { in clusterHandleSlaveFailover()
3103 if (myself->configEpoch < server.cluster->failover_auth_epoch) { in clusterHandleSlaveFailover()
3104 myself->configEpoch = server.cluster->failover_auth_epoch; in clusterHandleSlaveFailover()
3151 if (server.cluster->state != CLUSTER_OK) return; in clusterHandleSlaveMigration()
3159 if (okslaves <= server.cluster_migration_barrier) return; in clusterHandleSlaveMigration()
3172 di = dictGetSafeIterator(server.cluster->nodes); in clusterHandleSlaveMigration()
3221 !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) in clusterHandleSlaveMigration()
3264 if (server.cluster->mf_end && clientsArePaused()) { in resetManualFailover()
3265 server.clients_pause_end_time = 0; in resetManualFailover()
3268 server.cluster->mf_end = 0; /* No manual failover in progress. */ in resetManualFailover()
3269 server.cluster->mf_can_start = 0; in resetManualFailover()
3270 server.cluster->mf_slave = NULL; in resetManualFailover()
3271 server.cluster->mf_master_offset = 0; in resetManualFailover()
3276 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) { in manualFailoverCheckTimeout()
3286 if (server.cluster->mf_end == 0) return; in clusterHandleManualFailover()
3290 if (server.cluster->mf_can_start) return; in clusterHandleManualFailover()
3292 if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */ in clusterHandleManualFailover()
3294 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) { in clusterHandleManualFailover()
3297 server.cluster->mf_can_start = 1; in clusterHandleManualFailover()
3328 char *curr_ip = server.cluster_announce_ip; in clusterCron()
3344 strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN); in clusterCron()
3356 handshake_timeout = server.cluster_node_timeout; in clusterCron()
3365 di = dictGetSafeIterator(server.cluster->nodes); in clusterCron()
3366 server.cluster->stats_pfail_nodes = 0; in clusterCron()
3375 server.cluster->stats_pfail_nodes++; in clusterCron()
3389 fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, in clusterCron()
3400 node->cport, server.neterr); in clusterCron()
3406 aeCreateFileEvent(server.el,link->fd,AE_READABLE, in clusterCron()
3444 de = dictGetRandomKey(server.cluster->nodes); in clusterCron()
3471 di = dictGetSafeIterator(server.cluster->nodes); in clusterCron()
3504 server.cluster_node_timeout && /* was not already reconnected */ in clusterCron()
3508 now - node->ping_sent > server.cluster_node_timeout/2) in clusterCron()
3520 (now - node->pong_received) > server.cluster_node_timeout/2) in clusterCron()
3528 if (server.cluster->mf_end && in clusterCron()
3530 server.cluster->mf_slave == node && in clusterCron()
3545 if (delay > server.cluster_node_timeout) { in clusterCron()
3562 server.masterhost == NULL && in clusterCron()
3574 if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) in clusterCron()
3585 if (update_state || server.cluster->state == CLUSTER_FAIL) in clusterCron()
3597 if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER) in clusterBeforeSleep()
3601 if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE) in clusterBeforeSleep()
3605 if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) { in clusterBeforeSleep()
3606 int fsync = server.cluster->todo_before_sleep & in clusterBeforeSleep()
3613 server.cluster->todo_before_sleep = 0; in clusterBeforeSleep()
3617 server.cluster->todo_before_sleep |= flags; in clusterDoBeforeSleep()
3650 dictIterator *di = dictGetSafeIterator(server.cluster->nodes); in clusterMastersHaveSlaves()
3706 if (server.cluster->slots[slot]) return C_ERR; in clusterAddSlot()
3708 server.cluster->slots[slot] = n; in clusterAddSlot()
3716 clusterNode *n = server.cluster->slots[slot]; in clusterDelSlot()
3720 server.cluster->slots[slot] = NULL; in clusterDelSlot()
3741 memset(server.cluster->migrating_slots_to,0, in clusterCloseAllSlots()
3742 sizeof(server.cluster->migrating_slots_to)); in clusterCloseAllSlots()
3743 memset(server.cluster->importing_slots_from,0, in clusterCloseAllSlots()
3744 sizeof(server.cluster->importing_slots_from)); in clusterCloseAllSlots()
3765 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE; in clusterUpdateState()
3775 server.cluster->state == CLUSTER_FAIL && in clusterUpdateState()
3783 if (server.cluster_require_full_coverage) { in clusterUpdateState()
3785 if (server.cluster->slots[j] == NULL || in clusterUpdateState()
3786 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL)) in clusterUpdateState()
3803 server.cluster->size = 0; in clusterUpdateState()
3804 di = dictGetSafeIterator(server.cluster->nodes); in clusterUpdateState()
3809 server.cluster->size++; in clusterUpdateState()
3820 int needed_quorum = (server.cluster->size / 2) + 1; in clusterUpdateState()
3829 if (new_state != server.cluster->state) { in clusterUpdateState()
3830 mstime_t rejoin_delay = server.cluster_node_timeout; in clusterUpdateState()
3851 server.cluster->state = new_state; in clusterUpdateState()
3883 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) in verifyClusterConfigWithData()
3891 for (j = 1; j < server.dbnum; j++) { in verifyClusterConfigWithData()
3892 if (dictSize(server.db[j].dict)) return C_ERR; in verifyClusterConfigWithData()
3902 if (server.cluster->slots[j] == myself || in verifyClusterConfigWithData()
3903 server.cluster->importing_slots_from[j] != NULL) continue; in verifyClusterConfigWithData()
3911 if (server.cluster->slots[j] == NULL) { in verifyClusterConfigWithData()
3919 server.cluster->importing_slots_from[j] = server.cluster->slots[j]; in verifyClusterConfigWithData()
4042 if (server.cluster->migrating_slots_to[j]) { in clusterGenNodeDescription()
4044 server.cluster->migrating_slots_to[j]->name); in clusterGenNodeDescription()
4045 } else if (server.cluster->importing_slots_from[j]) { in clusterGenNodeDescription()
4047 server.cluster->importing_slots_from[j]->name); in clusterGenNodeDescription()
4071 di = dictGetSafeIterator(server.cluster->nodes); in clusterGenNodesDescription()
4133 dictIterator *di = dictGetSafeIterator(server.cluster->nodes); in clusterReplyMultiBulkSlots()
4192 if (server.cluster_enabled == 0) { in clusterCommand()
4268 if (dictSize(server.db[0].dict) != 0) { in clusterCommand()
4292 if (del && server.cluster->slots[slot] == NULL) { in clusterCommand()
4296 } else if (!del && server.cluster->slots[slot]) { in clusterCommand()
4314 if (server.cluster->importing_slots_from[j]) in clusterCommand()
4315 server.cluster->importing_slots_from[j] = NULL; in clusterCommand()
4341 if (server.cluster->slots[slot] != myself) { in clusterCommand()
4350 server.cluster->migrating_slots_to[slot] = n; in clusterCommand()
4352 if (server.cluster->slots[slot] == myself) { in clusterCommand()
4362 server.cluster->importing_slots_from[slot] = n; in clusterCommand()
4365 server.cluster->importing_slots_from[slot] = NULL; in clusterCommand()
4366 server.cluster->migrating_slots_to[slot] = NULL; in clusterCommand()
4378 if (server.cluster->slots[slot] == myself && n != myself) { in clusterCommand()
4390 server.cluster->migrating_slots_to[slot]) in clusterCommand()
4391 server.cluster->migrating_slots_to[slot] = NULL; in clusterCommand()
4396 server.cluster->importing_slots_from[slot]) in clusterCommand()
4411 server.cluster->importing_slots_from[slot] = NULL; in clusterCommand()
4437 clusterNode *n = server.cluster->slots[j]; in clusterCommand()
4463 , statestr[server.cluster->state], in clusterCommand()
4468 dictSize(server.cluster->nodes), in clusterCommand()
4469 server.cluster->size, in clusterCommand()
4470 (unsigned long long) server.cluster->currentEpoch, in clusterCommand()
4479 if (server.cluster->stats_bus_messages_sent[i] == 0) continue; in clusterCommand()
4480 tot_msg_sent += server.cluster->stats_bus_messages_sent[i]; in clusterCommand()
4484 server.cluster->stats_bus_messages_sent[i]); in clusterCommand()
4490 if (server.cluster->stats_bus_messages_received[i] == 0) continue; in clusterCommand()
4491 tot_msg_received += server.cluster->stats_bus_messages_received[i]; in clusterCommand()
4495 server.cluster->stats_bus_messages_received[i]); in clusterCommand()
4603 (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) { in clusterCommand()
4683 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; in clusterCommand()
4698 server.cluster->mf_can_start = 1; in clusterCommand()
4720 } else if (dictSize(server.cluster->nodes) > 1) { in clusterCommand()
4731 if (server.cluster->currentEpoch < (uint64_t)epoch) in clusterCommand()
4732 server.cluster->currentEpoch = epoch; in clusterCommand()
4936 server.dirty++; in restoreCommand()
4974 cs = dictFetchValue(server.migrate_cached_sockets,name); in migrateGetSocket()
4977 cs->last_use_time = server.unixtime; in migrateGetSocket()
4982 if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { in migrateGetSocket()
4984 dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); in migrateGetSocket()
4988 dictDelete(server.migrate_cached_sockets,dictGetKey(de)); in migrateGetSocket()
4992 fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, in migrateGetSocket()
4997 server.neterr); in migrateGetSocket()
5000 anetEnableTcpNoDelay(server.neterr,fd); in migrateGetSocket()
5015 cs->last_use_time = server.unixtime; in migrateGetSocket()
5016 dictAdd(server.migrate_cached_sockets,name,cs); in migrateGetSocket()
5028 cs = dictFetchValue(server.migrate_cached_sockets,name); in migrateCloseSocket()
5036 dictDelete(server.migrate_cached_sockets,name); in migrateCloseSocket()
5041 dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); in migrateCloseTimedoutSockets()
5047 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { in migrateCloseTimedoutSockets()
5050 dictDelete(server.migrate_cached_sockets,dictGetKey(de)); in migrateCloseTimedoutSockets()
5194 if (server.cluster_enabled) in migrateCommand()
5288 server.dirty++; in migrateCommand()
5394 if (server.cluster_enabled == 0) { in askingCommand()
5406 if (server.cluster_enabled == 0) { in readonlyCommand()
5461 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) in getNodeByQuery()
5512 n = server.cluster->slots[slot]; in getNodeByQuery()
5531 server.cluster->migrating_slots_to[slot] != NULL) in getNodeByQuery()
5534 } else if (server.cluster->importing_slots_from[slot] != NULL) { in getNodeByQuery()
5557 lookupKeyRead(&server.db[0],thiskey) == NULL) in getNodeByQuery()
5570 if (server.cluster->state != CLUSTER_OK) { in getNodeByQuery()
5588 return server.cluster->migrating_slots_to[slot]; in getNodeByQuery()
5676 if (server.cluster->state == CLUSTER_FAIL) { in clusterRedirectBlockedClientIfNeeded()
5686 clusterNode *node = server.cluster->slots[slot]; in clusterRedirectBlockedClientIfNeeded()
5692 server.cluster->importing_slots_from[slot] == NULL) in clusterRedirectBlockedClientIfNeeded()