diff --git a/redis.conf b/redis.conf
index 5d2b27ffb..7229d5c27 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1827,6 +1827,41 @@ aof-timestamp-enabled no
 #
 # cluster-slot-stats-enabled no
 
+# The slot migration write pause timeout controls how long the source node will
+# pause writes during the slot migration handoff phase. The handoff usually
+# finishes in a few milliseconds, depending on traffic and load. When the source
+# node pauses writes to let the destination catch up and take ownership of the
+# slots, this timeout prevents writes from being blocked indefinitely.
+#
+# If the destination node fails to complete the slot ownership takeover within
+# this timeout, the source node will resume accepting writes and assume the
+# migration task has failed. This prevents the source node from being permanently
+# blocked if the destination node becomes unresponsive or fails during migration.
+#
+# If this timeout is set too low, the source may resume writes and assume that
+# the slot migration has failed while the destination is still in the process of
+# draining the replication stream and publishing the configuration update.
+# During this window, writes accepted by the source will not be replicated to
+# the destination; if the destination later publishes the updated config and
+# takes ownership, those writes could be lost. Therefore, avoid setting this
+# timeout too low.
+#
+# This timeout is specified in milliseconds.
+#
+# cluster-slot-migration-write-pause-timeout 10000
+
+# This config controls the maximum acceptable lag in bytes between the source
+# and destination nodes during slot migration before triggering the slot handoff
+# phase. If the remaining replication stream size falls below this threshold,
+# the source node pauses writes and then signals the destination that it can
+# take over slot ownership after draining the remaining replication stream.
+#
+# A smaller value means a potentially shorter write pause, but it may take
+# longer for the destination to catch up. A larger value means the handoff can
+# be triggered earlier, but the write pause may last longer.
+#
+# cluster-slot-migration-handoff-max-lag-bytes 1mb
+
 # In order to setup your cluster make sure to read the documentation
 # available at https://redis.io web site.
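As a worked example of how these two settings interact: the handoff lag threshold decides when the source is close enough to pause writes, while the pause timeout bounds how long that pause may last in the worst case. Assuming the settings follow the usual redis.conf conventions for memory units (not verified against this patch), a latency-sensitive deployment might lower both, accepting a slightly longer catch-up phase in exchange for a shorter pause:

    cluster-slot-migration-write-pause-timeout 5000
    cluster-slot-migration-handoff-max-lag-bytes 512kb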
diff --git a/src/Makefile b/src/Makefile index 99536485c..0589b7141 100644 --- a/src/Makefile +++ b/src/Makefile @@ -382,7 +382,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/acl.c b/src/acl.c index b9f81bcc8..6bd3f0ee4 100644 --- a/src/acl.c +++ b/src/acl.c @@ -3203,7 +3203,7 @@ void addReplyCommandCategories(client *c, struct redisCommand *cmd) { /* When successful, initiates an internal connection, that is able to execute * internal commands (see CMD_INTERNAL). 
*/ static void internalAuth(client *c) { - if (server.cluster == NULL) { + if (!server.cluster_enabled) { addReplyError(c, "Cannot authenticate as an internal connection on non-cluster instances"); return; } diff --git a/src/aof.c b/src/aof.c index 8a9be94b6..94a28775b 100644 --- a/src/aof.c +++ b/src/aof.c @@ -11,6 +11,7 @@ #include "bio.h" #include "rio.h" #include "functions.h" +#include "cluster_asm.h" #include #include @@ -2384,11 +2385,48 @@ werr: return 0; } +int rewriteObject(rio *r, robj *key, robj *o, int dbid, long long expiretime) { + /* Save the key and associated value */ + if (o->type == OBJ_STRING) { + /* Emit a SET command */ + static const char cmd[]="*3\r\n$3\r\nSET\r\n"; + if (rioWrite(r,cmd,sizeof(cmd)-1) == 0) return C_ERR; + /* Key and value */ + if (rioWriteBulkObject(r,key) == 0) return C_ERR; + if (rioWriteBulkObject(r,o) == 0) return C_ERR; + } else if (o->type == OBJ_LIST) { + if (rewriteListObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_SET) { + if (rewriteSetObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_ZSET) { + if (rewriteSortedSetObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_HASH) { + if (rewriteHashObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_STREAM) { + if (rewriteStreamObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_MODULE) { + if (rewriteModuleObject(r,key,o,dbid) == 0) return C_ERR; + } else { + serverPanic("Unknown object type"); + } + + /* Save the expire time */ + if (expiretime != -1) { + static const char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; + if (rioWrite(r,cmd,sizeof(cmd)-1) == 0) return C_ERR; + if (rioWriteBulkObject(r,key) == 0) return C_ERR; + if (rioWriteBulkLongLong(r,expiretime) == 0) return C_ERR; + } + + return C_OK; +} + int rewriteAppendOnlyFileRio(rio *aof) { dictEntry *de; int j; long key_count = 0; long long updated_time = 0; + unsigned long long skipped = 0; kvstoreIterator *kvs_it = NULL; /* Record timestamp at the beginning of rewriting AOF. */ @@ -2420,34 +2458,21 @@ int rewriteAppendOnlyFileRio(rio *aof) { /* Get the expire time */ expiretime = kvobjGetExpire(o); + + /* Skip keys that are being trimmed */ + if (server.cluster_enabled) { + int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it); + if (isSlotInTrimJob(curr_slot)) { + skipped++; + continue; + } + } /* Set on stack string object for key */ robj key; initStaticStringObject(key, kvobjGetKey(o)); - /* Save the key and associated value */ - if (o->type == OBJ_STRING) { - /* Emit a SET command */ - char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - /* Key and value */ - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkObject(aof,o) == 0) goto werr; - } else if (o->type == OBJ_LIST) { - if (rewriteListObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_SET) { - if (rewriteSetObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_STREAM) { - if (rewriteStreamObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr; - } else { - serverPanic("Unknown object type"); - } + if (rewriteObject(aof, &key, o, j, expiretime) == C_ERR) goto werr; /* In fork child process, we can try to release memory back to the * OS and possibly avoid or decrease COW. 
We give the dismiss @@ -2455,14 +2480,6 @@ int rewriteAppendOnlyFileRio(rio *aof) { size_t dump_size = aof->processed_bytes - aof_bytes_before_key; if (server.in_fork_child) dismissObject(o, dump_size); - /* Save the expire time */ - if (expiretime != -1) { - char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; - } - /* Update info every 1 second (approximately). * in order to avoid calling mstime() on each iteration, we will * check the diff every 1024 keys */ @@ -2480,6 +2497,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { } kvstoreIteratorRelease(kvs_it); } + serverLog(LL_NOTICE, "AOF rewrite done, %ld keys saved, %llu keys skipped.", key_count, skipped); return C_OK; werr: diff --git a/src/blocked.c b/src/blocked.c index a1c8702a4..ee5a36514 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -76,7 +76,8 @@ void blockClient(client *c, int btype) { serverAssert(!(c->flags & CLIENT_MASTER && btype != BLOCKED_MODULE && btype != BLOCKED_LAZYFREE && - btype != BLOCKED_POSTPONE)); + btype != BLOCKED_POSTPONE && + btype != BLOCKED_POSTPONE_TRIM)); c->flags |= CLIENT_BLOCKED; c->bstate.btype = btype; @@ -191,7 +192,7 @@ void unblockClient(client *c, int queue_for_reprocessing) { } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); - } else if (c->bstate.btype == BLOCKED_POSTPONE) { + } else if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) { listDelNode(server.postponed_clients,c->postponed_list_node); c->postponed_list_node = NULL; } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { @@ -293,7 +294,7 @@ void disconnectAllBlockedClients(void) { * command processing will start from scratch, and the command will * be either executed or rejected. (unlike LIST blocked clients for * which the command is already in progress in a way. */ - if (c->bstate.btype == BLOCKED_POSTPONE) + if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) continue; if (c->bstate.btype == BLOCKED_LAZYFREE) { @@ -639,15 +640,21 @@ void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numloca /* Postpone client from executing a command. For example the server might be busy * requesting to avoid processing clients commands which will be processed later * when the it is ready to accept them. */ -void blockPostponeClient(client *c) { +void blockPostponeClientWithType(client *c, int btype) { + serverAssert(btype == BLOCKED_POSTPONE || btype == BLOCKED_POSTPONE_TRIM); c->bstate.timeout = 0; - blockClient(c,BLOCKED_POSTPONE); + blockClient(c, btype); listAddNodeTail(server.postponed_clients, c); c->postponed_list_node = listLast(server.postponed_clients); /* Mark this client to execute its command */ c->flags |= CLIENT_PENDING_COMMAND; } +/* Postpone client from executing a command. 
*/ +void blockPostponeClient(client *c) { + blockPostponeClientWithType(c, BLOCKED_POSTPONE); +} + /* Block client due to shutdown command */ void blockClientShutdown(client *c) { blockClient(c, BLOCKED_SHUTDOWN); diff --git a/src/cluster.c b/src/cluster.c index 2f861b5c2..330907b60 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -20,6 +20,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_asm.h" #include "cluster_slot_stats.h" #include @@ -279,7 +280,7 @@ void restoreCommand(client *c) { objectSetLRUOrLFU(kv, lfu_freq, lru_idle, lru_clock, 1000); signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); - + /* If we deleted a key that means REPLACE parameter was passed and the * destination key existed. */ if (deleted) { @@ -1016,6 +1017,11 @@ void clusterCommand(client *c) { addReplyError(c,"Invalid slot"); return; } + + if (!clusterCanAccessKeysInSlot(slot)) { + addReplyLongLong(c, 0); + return; + } addReplyLongLong(c,countKeysInSlot(slot)); } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) { /* CLUSTER GETKEYSINSLOT */ @@ -1031,6 +1037,11 @@ void clusterCommand(client *c) { return; } + if (!clusterCanAccessKeysInSlot(slot)) { + addReplyArrayLen(c, 0); + return; + } + unsigned int keys_in_slot = countKeysInSlot(slot); unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; addReplyArrayLen(c,numkeys); @@ -1588,14 +1599,374 @@ void readonlyCommand(client *c) { addReply(c,shared.ok); } -void replySlotsFlushAndFree(client *c, SlotsFlush *sflush) { - addReplyArrayLen(c, sflush->numRanges); - for (int i = 0 ; i < sflush->numRanges ; i++) { - addReplyArrayLen(c, 2); - addReplyLongLong(c, sflush->ranges[i].first); - addReplyLongLong(c, sflush->ranges[i].last); +/* Remove all the keys in the specified hash slot. + * The number of removed items is returned. */ +unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) { + unsigned int j = 0; + + if (!kvstoreDictSize(server.db->keys, (int) hashslot)) + return 0; + + kvstoreDictIterator *kvs_di = NULL; + dictEntry *de = NULL; + kvs_di = kvstoreGetDictSafeIterator(server.db->keys, (int) hashslot); + while((de = kvstoreDictIteratorNext(kvs_di)) != NULL) { + enterExecutionUnit(1, 0); + sds sdskey = kvobjGetKey(dictGetKV(de)); + robj *key = createStringObject(sdskey, sdslen(sdskey)); + dbDelete(&server.db[0], key); + + signalModifiedKey(NULL, &server.db[0], key); + if (by_command) { + /* Keys are deleted by a command (trimslots), we need to notify the + * keyspace event. Though, we don't need to propagate the DEL + * command, as the command (trimslots) will be propagated. */ + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); + } else { + /* Propagate the DEL command */ + propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); + /* The keys are not actually logically deleted from the database, + * just moved to another node. The modules needs to know that these + * keys are no longer available locally, so just send the keyspace + * notification to the modules, but not to clients. */ + moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); + } + exitExecutionUnit(); + postExecutionUnitOperations(); + decrRefCount(key); + j++; + server.dirty++; } - zfree(sflush); + kvstoreReleaseDictIterator(kvs_di); + return j; +} + +/* Delete the keys in the slot ranges. 
Returns the number of deleted items */ +unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command) { + unsigned int j = 0; + for (int i = 0; i < slots->num_ranges; i++) { + for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) { + j += clusterDelKeysInSlot(slot, by_command); + } + } + return j; +} + +int clusterIsMySlot(int slot) { + return getMyClusterNode() == getNodeBySlot(slot); +} + +void replySlotsFlushAndFree(client *c, slotRangeArray *slots) { + addReplyArrayLen(c, slots->num_ranges); + for (int i = 0 ; i < slots->num_ranges ; i++) { + addReplyArrayLen(c, 2); + addReplyLongLong(c, slots->ranges[i].start); + addReplyLongLong(c, slots->ranges[i].end); + } + slotRangeArrayFree(slots); +} + +/* Checks that slot ranges are well-formed and non-overlapping. */ +int validateSlotRanges(slotRangeArray *slots, sds *err) { + unsigned char used_slots[CLUSTER_SLOTS] = {0}; + + if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) { + *err = sdscatprintf(sdsempty(), "invalid number of slot ranges: %d", slots->num_ranges); + return C_ERR; + } + + for (int i = 0; i < slots->num_ranges; i++) { + if (slots->ranges[i].start >= CLUSTER_SLOTS || + slots->ranges[i].end >= CLUSTER_SLOTS) + { + *err = sdscatprintf(sdsempty(), "slot range is out of range: %d-%d", + slots->ranges[i].start, slots->ranges[i].end); + return C_ERR; + } + + if (slots->ranges[i].start > slots->ranges[i].end) { + *err = sdscatprintf(sdsempty(), "start slot number %d is greater than end slot number %d", + slots->ranges[i].start, slots->ranges[i].end); + return C_ERR; + } + + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + if (used_slots[j]) { + *err = sdscatprintf(sdsempty(), "Slot %d specified multiple times", j); + return C_ERR; + } + used_slots[j]++; + } + } + return C_OK; +} + +/* Create a slot range array with the specified number of ranges. */ +slotRangeArray *slotRangeArrayCreate(int num_ranges) { + slotRangeArray *slots = zcalloc(sizeof(slotRangeArray) + num_ranges * sizeof(slotRange)); + slots->num_ranges = num_ranges; + return slots; +} + +/* Duplicate the slot range array. */ +slotRangeArray *slotRangeArrayDup(slotRangeArray *slots) { + slotRangeArray *dup = slotRangeArrayCreate(slots->num_ranges); + memcpy(dup->ranges, slots->ranges, sizeof(slotRange) * slots->num_ranges); + return dup; +} + +/* Set the slot range at the specified index. */ +void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) { + slots->ranges[idx].start = start; + slots->ranges[idx].end = end; +} + +/* Create a slot range string in the format of: "1000-2000 3000-4000 ..." */ +sds slotRangeArrayToString(slotRangeArray *slots) { + sds s = sdsempty(); + + for (int i = 0; i < slots->num_ranges; i++) { + slotRange *sr = &slots->ranges[i]; + s = sdscatprintf(s, "%d-%d ", sr->start, sr->end); + } + sdssetlen(s, sdslen(s) - 1); + s[sdslen(s)] = '\0'; + + return s; +} + +/* Parse a slot range string in the format "1000-2000 3000-4000 ..." into a slotRangeArray. + * Returns a new slotRangeArray on success, NULL on failure. 
*/ +slotRangeArray *slotRangeArrayFromString(sds data) { + int num_ranges; + long long start, end; + slotRangeArray *slots = NULL; + if (!data || sdslen(data) == 0) return NULL; + + sds *parts = sdssplitlen(data, sdslen(data), " ", 1, &num_ranges); + if (num_ranges <= 0) goto err; + + slots = slotRangeArrayCreate(num_ranges); + + /* Parse each slot range */ + for (int i = 0; i < num_ranges; i++) { + char *dash = strchr(parts[i], '-'); + if (!dash) goto err; + + if (string2ll(parts[i], dash - parts[i], &start) == 0 || + string2ll(dash + 1, sdslen(parts[i]) - (dash - parts[i]) - 1, &end) == 0) + goto err; + slotRangeArraySet(slots, i, start, end); + } + + /* Validate all ranges */ + sds err_msg = NULL; + if (validateSlotRanges(slots, &err_msg) != C_OK) { + if (err_msg) sdsfree(err_msg); + goto err; + } + sdsfreesplitres(parts, num_ranges); + return slots; + +err: + if (slots) slotRangeArrayFree(slots); + sdsfreesplitres(parts, num_ranges); + return NULL; +} + +static int compareSlotRange(const void *a, const void *b) { + const slotRange *sa = a; + const slotRange *sb = b; + if (sa->start < sb->start) return -1; + if (sa->start > sb->start) return 1; + return 0; +} + +/* Compare two slot range arrays, return 1 if equal, 0 otherwise */ +int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) { + if (slots1->num_ranges != slots2->num_ranges) return 0; + + /* Sort slot ranges first */ + qsort(slots1->ranges, slots1->num_ranges, sizeof(slotRange), compareSlotRange); + qsort(slots2->ranges, slots2->num_ranges, sizeof(slotRange), compareSlotRange); + + for (int i = 0; i < slots1->num_ranges; i++) { + if (slots1->ranges[i].start != slots2->ranges[i].start || + slots1->ranges[i].end != slots2->ranges[i].end) { + return 0; + } + } + return 1; +} + +/* Add a slot to the slot range array. + * Usage: + * slotRangeArray *slots = NULL + * slots = slotRangeArrayAppend(slots, 1000); + * slots = slotRangeArrayAppend(slots, 1001); + * slots = slotRangeArrayAppend(slots, 1003); + * slots = slotRangeArrayAppend(slots, 1004); + * slots = slotRangeArrayAppend(slots, 1005); + * + * Result: 1000-1001, 1003-1005 + * Note: `slot` must be greater than the previous slot. + * */ +slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot) { + if (slots == NULL) { + slots = slotRangeArrayCreate(4); + slots->ranges[0].start = slot; + slots->ranges[0].end = slot; + slots->num_ranges = 1; + return slots; + } + + serverAssert(slots->num_ranges >= 0 && slots->num_ranges <= CLUSTER_SLOTS); + serverAssert(slot > slots->ranges[slots->num_ranges - 1].end); + + /* Check if we can extend the last range */ + slotRange *last = &slots->ranges[slots->num_ranges - 1]; + if (slot == last->end + 1) { + last->end = slot; + return slots; + } + + /* Calculate current capacity and reallocate if needed */ + int cap = (int) ((zmalloc_size(slots) - sizeof(slotRangeArray)) / sizeof(slotRange)); + if (slots->num_ranges >= cap) + slots = zrealloc(slots, sizeof(slotRangeArray) + sizeof(slotRange) * cap * 2); + + /* Add new single-slot range */ + slots->ranges[slots->num_ranges].start = slot; + slots->ranges[slots->num_ranges].end = slot; + slots->num_ranges++; + + return slots; +} + +/* Returns 1 if the slot range array contains the given slot, 0 otherwise. */ +int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot) { + for (int i = 0; i < slots->num_ranges; i++) + if (slots->ranges[i].start <= slot && slots->ranges[i].end >= slot) + return 1; + return 0; +} + +/* Free the slot range array. 
*/ +void slotRangeArrayFree(slotRangeArray *slots) { + zfree(slots); +} + +/* Generic version of slotRangeArrayFree(). */ +void slotRangeArrayFreeGeneric(void *slots) { + slotRangeArrayFree(slots); +} + +/* Slot range array iterator */ +slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots) { + slotRangeArrayIter *it = zmalloc(sizeof(*it)); + it->slots = slots; + it->range_index = 0; + it->cur_slot = slots->num_ranges > 0 ? slots->ranges[0].start : -1; + return it; +} + +/* Returns the next slot in the array, or -1 if there are no more slots. */ +int slotRangeArrayNext(slotRangeArrayIter *it) { + if (it->range_index >= it->slots->num_ranges) return -1; + + if (it->cur_slot < it->slots->ranges[it->range_index].end) { + it->cur_slot++; + } else { + it->range_index++; + if (it->range_index < it->slots->num_ranges) + it->cur_slot = it->slots->ranges[it->range_index].start; + else + it->cur_slot = -1; /* finished */ + } + return it->cur_slot; +} + +int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it) { + return it->cur_slot; +} + +void slotRangeArrayIteratorFree(slotRangeArrayIter *it) { + zfree(it); +} + +/* Parse slot ranges from the command arguments. Returns NULL on error. */ +slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { + int start, end, count; + slotRangeArray *slots; + + serverAssert(pos <= argc); + serverAssert((argc - pos) % 2 == 0); + + count = (argc - pos) / 2; + slots = slotRangeArrayCreate(count); + slots->num_ranges = 0; + + for (int j = pos; j < argc; j += 2) { + if ((start = getSlotOrReply(c, c->argv[j])) == -1 || + (end = getSlotOrReply(c, c->argv[j + 1])) == -1) + { + slotRangeArrayFree(slots); + return NULL; + } + slotRangeArraySet(slots, slots->num_ranges, start, end); + slots->num_ranges++; + } + + sds err = NULL; + if (validateSlotRanges(slots, &err) != C_OK) { + addReplyErrorSds(c, err); + slotRangeArrayFree(slots); + return NULL; + } + return slots; +} + +/* Return 1 if the keys in the slot can be accessed, 0 otherwise. */ +int clusterCanAccessKeysInSlot(int slot) { + /* If not in cluster mode, all keys are accessible */ + if (server.cluster_enabled == 0) return 1; + + /* If the slot is being imported under old slot migration approach, we should + * allow to list keys from the slot as previously. */ + if (getImportingSlotSource(slot)) return 1; + + /* If using atomic slot migration, check if the slot belongs to the current + * node or its master, return 1 if so. */ + clusterNode *myself = getMyClusterNode(); + if (clusterNodeIsSlave(myself)) { + clusterNode *master = clusterNodeGetMaster(myself); + if (master && clusterNodeCoversSlot(master, slot)) + return 1; + } else { + if (clusterNodeCoversSlot(myself, slot)) + return 1; + } + return 0; +} + +/* Return the slot ranges that belong to the current node or its master. */ +slotRangeArray *clusterGetLocalSlotRanges(void) { + slotRangeArray *slots = NULL; + + if (!server.cluster_enabled) { + slots = slotRangeArrayCreate(1); + slotRangeArraySet(slots, 0, 0, CLUSTER_SLOTS - 1); + return slots; + } + + clusterNode *master = clusterNodeGetMaster(getMyClusterNode()); + if (master) { + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (clusterNodeCoversSlot(master, i)) + slots = slotRangeArrayAppend(slots, i); + } + } + return slots ? slots : slotRangeArrayCreate(0); } /* Partially flush destination DB in a cluster node, based on the slot range. 
@@ -1635,77 +2006,44 @@ void sflushCommand(client *c) { return; } - /* Verify slot pairs are valid and not overlapping */ - long long j, first, last; - unsigned char slotsToFlushRq[CLUSTER_SLOTS] = {0}; - for (j = 1; j < argc; j += 2) { - /* check if the first slot is valid */ - if (getLongLongFromObject(c->argv[j], &first) != C_OK || first < 0 || first >= CLUSTER_SLOTS) { - addReplyError(c,"Invalid or out of range slot"); - return; - } + /* Parse slot ranges from the command arguments. */ + slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1); + if (!slots) return; - /* check if the last slot is valid */ - if (getLongLongFromObject(c->argv[j+1], &last) != C_OK || last < 0 || last >= CLUSTER_SLOTS) { - addReplyError(c,"Invalid or out of range slot"); - return; - } - - if (first > last) { - addReplyErrorFormat(c,"start slot number %lld is greater than end slot number %lld", first, last); - return; - } - - /* Mark the slots in slotsToFlushRq[] */ - for (int i = first; i <= last; i++) { - if (slotsToFlushRq[i]) { - addReplyErrorFormat(c, "Slot %d specified multiple times", i); - return; + /* Iterate and find the slot ranges that belong to this node. Save them in + * a new slotRangeArray. It is allocated on heap since there is a chance + * that FLUSH SYNC will be running as blocking ASYNC and only later reply + * with slot ranges */ + unsigned char slots_to_flush[CLUSTER_SLOTS] = {0}; /* Requested slots to flush */ + slotRangeArray *myslots = NULL; + for (int i = 0; i < slots->num_ranges; i++) { + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + if (clusterIsMySlot(j)) { + myslots = slotRangeArrayAppend(myslots, j); + slots_to_flush[j] = 1; } - slotsToFlushRq[i] = 1; } } - /* Verify slotsToFlushRq[] covers ALL slots of myNode. */ - clusterNode *myNode = getMyClusterNode(); - /* During iteration trace also the slot range pairs and save in SlotsFlush. - * It is allocated on heap since there is a chance that FLUSH SYNC will be - * running as blocking ASYNC and only later reply with slot ranges */ - int capacity = 32; /* Initial capacity */ - SlotsFlush *sflush = zmalloc(sizeof(SlotsFlush) + sizeof(SlotRange) * capacity); - sflush->numRanges = 0; - int inSlotRange = 0; + /* Verify that all slots of mynode got covered. See sflushCommand() comment. */ + int all_slots_covered = 1; for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (myNode == getNodeBySlot(i)) { - if (!slotsToFlushRq[i]) { - addReplySetLen(c, 0); /* Not all slots of mynode got covered. See sflushCommand() comment. */ - zfree(sflush); - return; - } - - if (!inSlotRange) { /* If start another slot range */ - sflush->ranges[sflush->numRanges].first = i; - inSlotRange = 1; - } - } else { - if (inSlotRange) { /* If end another slot range */ - sflush->ranges[sflush->numRanges++].last = i - 1; - inSlotRange = 0; - /* If reached 'sflush' capacity, double the capacity */ - if (sflush->numRanges >= capacity) { - capacity *= 2; - sflush = zrealloc(sflush, sizeof(SlotsFlush) + sizeof(SlotRange) * capacity); - } - } + if (clusterIsMySlot(i) && !slots_to_flush[i]) { + all_slots_covered = 0; + break; } } + if (myslots == NULL || !all_slots_covered) { + addReplyArrayLen(c, 0); + slotRangeArrayFree(slots); + slotRangeArrayFree(myslots); + return; + } + slotRangeArrayFree(slots); - /* Update last pair if last cluster slot is also end of last range */ - if (inSlotRange) sflush->ranges[sflush->numRanges++].last = CLUSTER_SLOTS - 1; - /* Flush selected slots. 
If not flush as blocking async, then reply immediately */ - if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, sflush) == 0) - replySlotsFlushAndFree(c, sflush); + if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) + replySlotsFlushAndFree(c, myslots); } /* The READWRITE command just clears the READONLY command state. */ diff --git a/src/cluster.h b/src/cluster.h index dc99f8dc9..246cb5948 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -153,6 +153,9 @@ clusterNode *clusterLookupNode(const char *name, int length); const char *clusterGetSecret(size_t *len); unsigned int countKeysInSlot(unsigned int slot); int getSlotOrReply(client *c, robj *o); +int clusterIsMySlot(int slot); +int clusterCanAccessKeysInSlot(int slot); +struct slotRangeArray *clusterGetLocalSlotRanges(void); /* functions with shared implementations */ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code); @@ -160,11 +163,44 @@ int clusterRedirectBlockedClientIfNeeded(client *c); void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code); void migrateCloseTimedoutSockets(void); int patternHashSlot(char *pattern, int length); +int getSlotOrReply(client *c, robj *o); int isValidAuxString(char *s, unsigned int length); void migrateCommand(client *c); void clusterCommand(client *c); ConnectionType *connTypeOfCluster(void); +typedef struct slotRange { + unsigned short start, end; +} slotRange; +typedef struct slotRangeArray { + int num_ranges; + slotRange ranges[]; +} slotRangeArray; +typedef struct slotRangeArrayIter { + slotRangeArray *slots; /* the array we’re iterating */ + int range_index; /* current range index */ + int cur_slot; /* current slot within the range */ +} slotRangeArrayIter; +slotRangeArray *slotRangeArrayCreate(int num_ranges); +slotRangeArray *slotRangeArrayDup(slotRangeArray *slots); +void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end); +sds slotRangeArrayToString(slotRangeArray *slots); +slotRangeArray *slotRangeArrayFromString(sds data); +int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2); +slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot); +int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot); +void slotRangeArrayFree(slotRangeArray *slots); +void slotRangeArrayFreeGeneric(void *slots); +slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots); +int slotRangeArrayNext(slotRangeArrayIter *it); +int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it); +void slotRangeArrayIteratorFree(slotRangeArrayIter *it); +int validateSlotRanges(slotRangeArray *slots, sds *err); +slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos); + +unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command); +unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command); + void clusterGenNodesSlotsInfo(int filter); void clusterFreeNodesSlotsInfo(clusterNode *n); int clusterNodeSlotInfoCount(clusterNode *n); @@ -184,4 +220,136 @@ clusterNode *clusterShardNodeFirst(void *shard); int clusterNodeTcpPort(clusterNode *node); int clusterNodeTlsPort(clusterNode *node); + +/* API for alternative cluster implementations to start and coordinate + * Atomic Slot Migration (ASM). + * + * These two functions drive ASM for alternative cluster implementations. + * - clusterAsmProcess(...) impl -> redis: initiates/advances/cancels ASM operations + * - clusterAsmOnEvent(...) 
redis -> impl: notifies state changes + * + * Generic steps for an alternative implementation: + * - On destination side, implementation calls clusterAsmProcess(ASM_EVENT_IMPORT_START) + * to start an import operation. + * - Redis calls clusterAsmOnEvent() when an ASM event occurs. + * - On the source side, Redis will call clusterAsmOnEvent(ASM_EVENT_HANDOFF_PREP) + * when slots are ready to be handed off and the write pause is needed. + * - Implementation stops the traffic to the slots and calls clusterAsmProcess(ASM_EVENT_HANDOFF) + * - On the destination side, Redis calls clusterAsmOnEvent(ASM_EVENT_TAKEOVER) + * when destination node is ready to take over the slot, waiting for ownership change. + * - Cluster implementation updates the config and calls clusterAsmProcess(ASM_EVENT_DONE) + * to notify Redis that the slots ownership has changed. + * + * Sequence diagram for import: + * - Note: shows only the events that cluster implementation needs to react. + * + * ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ + * │ Destination │ │ Destination │ │ Source │ │ Source │ + * │ Cluster impl │ │ Master │ │ Master │ │ Cluster impl │ + * └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ + * │ │ │ │ + * │ ASM_EVENT_IMPORT_START │ │ │ + * ├─────────────────────────────►│ │ │ + * │ │ CLUSTER SYNCSLOTS │ │ + * │ ├────────────────────────►│ │ + * │ │ │ │ + * │ │ SNAPSHOT(restore cmds) │ │ + * │ │◄────────────────────────┤ │ + * │ │ Repl stream │ │ + * │ │◄────────────────────────┤ │ + * │ │ │ ASM_EVENT_HANDOFF_PREP │ + * │ │ ├────────────────────────────►│ + * │ │ │ ASM_EVENT_HANDOFF │ + * │ │ │◄────────────────────────────┤ + * │ │ Drain repl stream │ │ + * │ │◄────────────────────────┤ │ + * │ ASM_EVENT_TAKEOVER │ │ │ + * │◄─────────────────────────────┤ │ │ + * │ │ │ │ + * │ ASM_EVENT_DONE │ │ │ + * ├─────────────────────────────►│ │ ASM_EVENT_DONE │ + * │ │ │◄────────────────────────────┤ + * │ │ │ │ + */ + +#define ASM_EVENT_IMPORT_START 1 /* Start a new import operation (destination side) */ +#define ASM_EVENT_CANCEL 2 /* Cancel an ongoing import/migrate operation (source and destination side) */ +#define ASM_EVENT_HANDOFF_PREP 3 /* Slot is ready to be handed off to the destination shard (source side) */ +#define ASM_EVENT_HANDOFF 4 /* Notify that the slot can be handed off (source side) */ +#define ASM_EVENT_TAKEOVER 5 /* Ready to take over the slot, waiting for config change (destination side) */ +#define ASM_EVENT_DONE 6 /* Notify that import/migrate is completed, config is updated (source and destination side) */ + +#define ASM_EVENT_IMPORT_PREP 7 /* Import is about to start, the implementation may reject by returning C_ERR */ +#define ASM_EVENT_IMPORT_STARTED 8 /* Import started */ +#define ASM_EVENT_IMPORT_FAILED 9 /* Import failed */ +#define ASM_EVENT_IMPORT_COMPLETED 10 /* Import completed (config updated) */ +#define ASM_EVENT_MIGRATE_PREP 11 /* Migrate is about to start, the implementation may reject by returning C_ERR */ +#define ASM_EVENT_MIGRATE_STARTED 12 /* Migrate started */ +#define ASM_EVENT_MIGRATE_FAILED 13 /* Migrate failed */ +#define ASM_EVENT_MIGRATE_COMPLETED 14 /* Migrate completed (config updated) */ + + +/* Called by cluster implementation to request an ASM operation. (cluster impl --> redis) + * Valid values for 'event': + * ASM_EVENT_IMPORT_START + * ASM_EVENT_CANCEL + * ASM_EVENT_HANDOFF + * ASM_EVENT_DONE + * + * For ASM_EVENT_IMPORT_START, 'task_id' should be a unique string. 
+ * For other events (ASM_EVENT_CANCEL, ASM_EVENT_HANDOFF, ASM_EVENT_DONE),
+ * 'task_id' should match the ID from the corresponding import operation.
+ * Usage:
+ *    char *task_id = zmalloc(CLUSTER_NAMELEN + 1);
+ *    getRandomHexChars(task_id, CLUSTER_NAMELEN);
+ *    task_id[CLUSTER_NAMELEN] = '\0';
+ *
+ *    slotRangeArray *slots = slotRangeArrayCreate(1);
+ *    slotRangeArraySet(slots, 0, 0, 1000);
+ *
+ *    char *err = NULL;
+ *    int ret = clusterAsmProcess(task_id, ASM_EVENT_IMPORT_START, slots, &err);
+ *    zfree(task_id);
+ *    slotRangeArrayFree(slots);
+ *
+ *    if (ret != C_OK) {
+ *        serverLog(LL_WARNING, "%s", err);
+ *        return;
+ *    }
+ *
+ * For ASM_EVENT_CANCEL, if `task_id` is NULL, all tasks will be cancelled.
+ * If the `arg` parameter is provided, it should be a pointer to an int. It will
+ * be set to the number of tasks cancelled.
+ *
+ * Return value:
+ * - Returns C_OK on success, C_ERR on failure and 'err' will be set to the
+ *   error message.
+ *
+ * Memory management:
+ * - There is no ownership transfer of 'task_id', 'err' or `slotRangeArray`.
+ * - `task_id` and `slotRangeArray` should be allocated and freed by the
+ *   caller. Redis internally will make a copy of these.
+ * - `err` is allocated by Redis and should NOT be freed by the caller.
+ **/
+int clusterAsmProcess(const char *task_id, int event, void *arg, char **err);
+
+/* Called when an ASM event occurs to notify the cluster implementation. (redis --> cluster impl)
+ *
+ * `arg` will point to a `slotRangeArray` for the following events:
+ *    ASM_EVENT_IMPORT_PREP
+ *    ASM_EVENT_IMPORT_STARTED
+ *    ASM_EVENT_MIGRATE_PREP
+ *    ASM_EVENT_MIGRATE_STARTED
+ *    ASM_EVENT_HANDOFF_PREP
+ *
+ * Memory management:
+ * - Redis owns the `task_id` and `slotRangeArray`.
+ *
+ * Returns C_OK on success.
+ *
+ * If the cluster implementation returns C_ERR for ASM_EVENT_IMPORT_PREP or
+ * ASM_EVENT_MIGRATE_PREP, the operation will not start.
+ **/
+int clusterAsmOnEvent(const char *task_id, int event, void *arg);
+
 #endif /* __CLUSTER_H */
diff --git a/src/cluster_asm.c b/src/cluster_asm.c
new file mode 100644
index 000000000..9abd85d3f
--- /dev/null
+++ b/src/cluster_asm.c
@@ -0,0 +1,3467 @@
+/*
+ * Copyright (c) 2025-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ */ + +#include "server.h" +#include "cluster.h" +#include "functions.h" +#include "cluster_legacy.h" +#include "cluster_asm.h" +#include "cluster_slot_stats.h" + +#define ASM_IMPORT (1 << 1) +#define ASM_MIGRATE (1 << 2) + +#define ASM_DEBUG_TRIM_DEFAULT 0 +#define ASM_DEBUG_TRIM_NONE 1 +#define ASM_DEBUG_TRIM_BG 2 +#define ASM_DEBUG_TRIM_ACTIVE 3 + +#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */ + +typedef struct asmTask { + sds id; /* Task ID */ + int operation; /* Either ASM_IMPORT or ASM_MIGRATE */ + slotRangeArray *slots; /* List of slot ranges for this migration task */ + int state; /* Current state of the task */ + int dest_state; /* Destination node's main state (approximate) */ + char source[CLUSTER_NAMELEN]; /* Source node name */ + char dest[CLUSTER_NAMELEN]; /* Destination node name */ + clusterNode *source_node; /* Source node */ + connection *main_channel_conn; /* Main channel connection */ + connection *rdb_channel_conn; /* RDB channel connection */ + int rdb_channel_state; /* State of the RDB channel */ + unsigned long long dest_offset; /* Destination offset */ + unsigned long long source_offset; /* Source offset */ + int cross_slot_during_propagating; /* If cross-slot commands are encountered during propagating */ + int stream_eof_during_streaming; /* If STREAM-EOF is received during streaming buffer */ + replDataBuf sync_buffer; /* Buffer for the stream */ + client *main_channel_client; /* Client for the main channel on the source side */ + client *rdb_channel_client; /* Client for the RDB channel on the source side */ + long long retry_count; /* Number of retries for this task */ + mstime_t create_time; /* Task creation time */ + mstime_t start_time; /* Task start time */ + mstime_t end_time; /* Task end time */ + mstime_t paused_time; /* The time when the slot writes were paused */ + mstime_t dest_slots_snapshot_time; /* The time when the destination starts applying the slot snapshot */ + mstime_t dest_accum_applied_time; /* The time when the destination finishes applying the accumulated buffer */ + sds error; /* Error message for this task */ + redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */ +} asmTask; + +struct asmManager { + list *tasks; /* List of asmTask to be processed */ + list *archived_tasks; /* List of archived asmTask */ + list *pending_trim_jobs; /* List of pending trim jobs (due to write pause) */ + list *active_trim_jobs; /* List of active trim jobs */ + slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */ + size_t sync_buffer_peak; /* Peak size of sync buffer */ + asmTask *master_task; /* The task that is currently active on the master */ + + /* Fail point injection for debugging */ + int debug_failed_channel; /* Channel where the task failed */ + int debug_failed_state; /* State where the task failed */ + int debug_trim_method; /* Method to trim the buffer */ + int debug_active_trim_delay; /* Sleep before trimming each key */ + + /* Active trim stats */ + unsigned long long active_trim_started; /* Number of times active trim was started */ + unsigned long long active_trim_completed; /* Number of times active trim was completed */ + unsigned long long active_trim_cancelled; /* Number of times active trim was cancelled */ + unsigned long long active_trim_current_job_keys; /* Total number of keys to trim in the current job */ + unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed 
in the current job */ +}; + +enum asmState { + /* Common state */ + ASM_NONE = 0, + ASM_CONNECTING, + ASM_AUTH_REPLY, + ASM_CANCELED, + ASM_FAILED, + ASM_COMPLETED, + + /* Import state */ + ASM_SEND_HANDSHAKE, + ASM_HANDSHAKE_REPLY, + ASM_SEND_SYNCSLOTS, + ASM_SYNCSLOTS_REPLY, + ASM_INIT_RDBCHANNEL, + ASM_ACCUMULATE_BUF, + ASM_READY_TO_STREAM, + ASM_STREAMING_BUF, + ASM_WAIT_STREAM_EOF, + ASM_TAKEOVER, + + /* Migrate state */ + ASM_WAIT_RDBCHANNEL, + ASM_WAIT_BGSAVE_START, + ASM_SEND_BULK_AND_STREAM, + ASM_SEND_STREAM, + ASM_HANDOFF_PREP, + ASM_HANDOFF, + ASM_STREAM_EOF, + + /* RDB channel state */ + ASM_RDBCHANNEL_REQUEST, + ASM_RDBCHANNEL_REPLY, + ASM_RDBCHANNEL_TRANSFER, +}; + +enum asmChannel { + ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */ + ASM_IMPORT_RDB_CHANNEL, /* RDB channel for the import task */ + ASM_MIGRATE_MAIN_CHANNEL, /* Main channel for the migrate task */ + ASM_MIGRATE_RDB_CHANNEL /* RDB channel for the migrate task */ +}; + +/* Global ASM manager */ +struct asmManager *asmManager = NULL; + +/* replication.c */ +char *sendCommand(connection *conn, ...); +char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens); +char *receiveSynchronousResponse(connection *conn); +ConnectionType *connTypeOfReplication(void); +int startBgsaveForReplication(int mincapa, int req); +void createReplicationBacklogIfNeeded(void); +/* cluster.c */ +void createDumpPayload(rio *payload, robj *o, robj *key, int dbid); +/* cluster_asm.c */ +static void asmStartImportTask(asmTask *task); +static void asmTaskCancel(asmTask *task, const char *reason); +static void asmSyncBufferReadFromConn(connection *conn); +static void propagateTrimSlots(slotRangeArray *slots); +void asmTrimJobSchedule(slotRangeArray *slots); +void asmTrimJobProcessPending(void); +void asmTriggerActiveTrim(slotRangeArray *slots); +void asmActiveTrimEnd(void); +int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); +void asmTrimSlotsIfNotOwned(slotRangeArray *slots); +void asmNotifyStateChange(asmTask *task, int event); + +void asmInit(void) { + asmManager = zcalloc(sizeof(*asmManager)); + asmManager->tasks = listCreate(); + asmManager->archived_tasks = listCreate(); + asmManager->pending_trim_jobs = listCreate(); + asmManager->sync_buffer_peak = 0; + asmManager->master_task = NULL; + asmManager->debug_failed_channel = 0; + asmManager->debug_failed_state = 0; + asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + asmManager->debug_active_trim_delay = 0; + asmManager->active_trim_jobs = listCreate(); + asmManager->active_trim_started = 0; + asmManager->active_trim_completed = 0; + asmManager->active_trim_cancelled = 0; + listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); +} + +char *asmTaskStateToString(int state) { + switch (state) { + case ASM_NONE: return "none"; + case ASM_CONNECTING: return "connecting"; + case ASM_AUTH_REPLY: return "auth-reply"; + case ASM_CANCELED: return "canceled"; + case ASM_FAILED: return "failed"; + case ASM_COMPLETED: return "completed"; + + /* Import state */ + case ASM_SEND_HANDSHAKE: return "send-handshake"; + case ASM_HANDSHAKE_REPLY: return "handshake-reply"; + case ASM_SEND_SYNCSLOTS: return "send-syncslots"; + case ASM_SYNCSLOTS_REPLY: return "syncslots-reply"; + case ASM_INIT_RDBCHANNEL: return "init-rdbchannel"; + case ASM_ACCUMULATE_BUF: return "accumulate-buffer"; + case ASM_READY_TO_STREAM: return "ready-to-stream"; + case ASM_STREAMING_BUF: return "streaming-buffer"; + case ASM_WAIT_STREAM_EOF: return 
"wait-stream-eof"; + case ASM_TAKEOVER: return "takeover"; + + /* Migrate state */ + case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel"; + case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start"; + case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream"; + case ASM_SEND_STREAM: return "send-stream"; + case ASM_HANDOFF_PREP: return "handoff-prep"; + case ASM_HANDOFF: return "handoff"; + case ASM_STREAM_EOF: return "stream-eof"; + + /* RDB channel state */ + case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request"; + case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply"; + case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer"; + + default: return "unknown"; + } + serverAssert(0); /* Unreachable */ +} + +const char *asmChannelToString(int channel) { + switch (channel) { + case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel"; + case ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel"; + case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel"; + case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel"; + default: return "unknown"; + } +} + +int asmDebugSetFailPoint(char * channel, char *state) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + asmManager->debug_failed_channel = 0; + asmManager->debug_failed_state = 0; + if (!channel && !state) return C_ERR; + if (sdslen(channel) == 0 && sdslen(state) == 0) { + serverLog(LL_WARNING, "ASM fail point is cleared"); + return C_OK; + } + + for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) { + if (!strcasecmp(channel, asmChannelToString(i))) { + asmManager->debug_failed_channel = i; + break; + } + } + if (asmManager->debug_failed_channel == 0) return C_ERR; + + for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) { + if (!strcasecmp(state, asmTaskStateToString(i))) { + asmManager->debug_failed_state = i; + break; + } + } + if (asmManager->debug_failed_state == 0) return C_ERR; + + serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state); + return C_OK; +} + +int asmDebugSetTrimMethod(const char *method, int active_trim_delay) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + int prev = asmManager->debug_trim_method; + if (!strcasecmp(method, "default")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + else if (!strcasecmp(method, "none")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE; + else if (!strcasecmp(method, "bg")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG; + else if (!strcasecmp(method, "active")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE; + else return C_ERR; + + /* If we are switching from none to default, delete all the keys in the + * slots we don't own */ + if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != ASM_DEBUG_TRIM_NONE) { + for (int i = 0; i < CLUSTER_SLOTS; i++) + if (!clusterIsMySlot(i)) + clusterDelKeysInSlot(i, 0); + } + asmManager->debug_active_trim_delay = active_trim_delay; + serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay); + return C_OK; +} + +int asmDebugIsFailPointActive(int channel, int state) { + if (!asmManager) return 0; /* ASM manager not initialized */ + if (asmManager->debug_failed_channel == channel && asmManager->debug_failed_state == state) { + serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s", + asmChannelToString(channel), asmTaskStateToString(state)); + return 1; + } + return 0; +} + +sds asmCatInfoString(sds 
info) { + int active_tasks = 0; + + listIter li; + listNode *ln; + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (task->operation == ASM_IMPORT || + (task->operation == ASM_MIGRATE && task->state != ASM_FAILED)) + { + active_tasks++; + } + } + + return sdscatprintf(info ? info : sdsempty(), + "cluster_slot_migration_active_tasks:%d\r\n" + "cluster_slot_migration_active_trim_running:%lu\r\n" + "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n" + "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_started:%llu\r\n" + "cluster_slot_migration_stats_active_trim_completed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n", + active_tasks, + listLength(asmManager->active_trim_jobs), + asmManager->active_trim_current_job_keys, + asmManager->active_trim_current_job_trimmed, + asmManager->active_trim_started, + asmManager->active_trim_completed, + asmManager->active_trim_cancelled); +} + +void asmTaskReset(asmTask *task) { + task->state = ASM_NONE; + task->dest_state = ASM_NONE; + task->rdb_channel_state = ASM_NONE; + task->main_channel_conn = NULL; + task->rdb_channel_conn = NULL; + task->dest_offset = 0; + task->source_offset = 0; + task->stream_eof_during_streaming = 0; + task->cross_slot_during_propagating = 0; + replDataBufInit(&task->sync_buffer); + task->main_channel_client = NULL; + task->rdb_channel_client = NULL; + task->paused_time = 0; + task->dest_slots_snapshot_time = 0; + task->dest_accum_applied_time = 0; + task->pre_snapshot_module_cmds = NULL; +} + +asmTask *asmTaskCreate(const char *task_id) { + asmTask *task = zcalloc(sizeof(*task)); + task->error = sdsempty(); + asmTaskReset(task); + task->slots = NULL; + task->source_node = NULL; + task->retry_count = 0; + task->create_time = server.mstime; + task->start_time = -1; + task->end_time = -1; + if (task_id) { + task->id = sdsnew(task_id); + } else { + task->id = sdsnewlen(NULL, CLUSTER_NAMELEN); + getRandomHexChars(task->id, CLUSTER_NAMELEN); + } + + return task; +} + +void asmTaskFree(asmTask *task) { + replDataBufClear(&task->sync_buffer); + sdsfree(task->id); + slotRangeArrayFree(task->slots); + sdsfree(task->error); + zfree(task); +} + +/* Convert the task state to the corresponding event. */ +int asmTaskStateToEvent(asmTask *task) { + if (task->operation == ASM_IMPORT) { + if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED; + else return ASM_EVENT_IMPORT_STARTED; + } else { + if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED; + else return ASM_EVENT_MIGRATE_STARTED; + } +} + +/* Serialize ASM task information into a string for transmission to replicas. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Where slot_ranges is in the format "1000-2000 3000-4000 ..." 
*/ +sds asmTaskSerialize(asmTask *task) { + sds serialized = sdsempty(); + + /* Add task ID */ + serialized = sdscatprintf(serialized, "%s:", task->id); + + /* Add source node ID (40 chars) */ + serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add destination node ID (40 chars) */ + serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add operation type */ + serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ? + "import" : "migrate"); + + /* Add current state */ + serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state)); + + /* Add slot ranges sds */ + sds slots_str = slotRangeArrayToString(task->slots); + serialized = sdscatprintf(serialized, "%s", slots_str); + sdsfree(slots_str); + + return serialized; +} + +/* Deserialize ASM task information from a string and create a complete asmTask. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Returns a new asmTask on success, NULL on failure. */ +asmTask *asmTaskDeserialize(sds data) { + int count, idx = 0; + asmTask *task = NULL; + if (!data || sdslen(data) == 0) return NULL; + + sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count); + if (count < 6) goto err; + + /* Parse task ID */ + if (sdslen(parts[idx]) == 0) goto err; + task = asmTaskCreate(parts[idx]); + if (!task) goto err; + idx++; + + /* Parse source node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->source, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse destination node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->dest, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse operation type */ + if (!strcasecmp(parts[idx], "import")) { + task->operation = ASM_IMPORT; + } else if (!strcasecmp(parts[idx], "migrate")) { + task->operation = ASM_MIGRATE; + } else { + goto err; + } + idx++; + + /* Parse state */ + task->state = ASM_NONE; /* Default state */ + for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) { + if (!strcasecmp(parts[idx], asmTaskStateToString(state))) { + task->state = state; + break; + } + } + idx++; + + /* Parse slot ranges */ + task->slots = slotRangeArrayFromString(parts[idx]); + if (!task->slots) goto err; + idx++; + + /* Ignore any extra fields for future compatibility */ + + sdsfreesplitres(parts, count); + return task; + +err: + if (task) asmTaskFree(task); + sdsfreesplitres(parts, count); + return NULL; +} + +/* Notify replicas about ASM task information to maintain consistency during + * slot migration. This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command + * to all connected replicas with the serialized task information. */ +void asmNotifyReplicasStateChange(struct asmTask *task) { + if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; + + /* Do not propagate migrate task to replicas, as replicas never migrate data. 
 */
+    if (task->operation == ASM_MIGRATE) return;
+
+    /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */
+    robj *argv[5];
+    argv[0] = createStringObject("CLUSTER", 7);
+    argv[1] = createStringObject("SYNCSLOTS", 9);
+    argv[2] = createStringObject("CONF", 4);
+    argv[3] = createStringObject("ASM-TASK", 8);
+    argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task));
+
+    /* Send the command to all replicas */
+    replicationFeedSlaves(server.slaves, -1, argv, 5);
+
+    /* Clean up command objects */
+    for (int i = 0; i < 5; i++) {
+        decrRefCount(argv[i]);
+    }
+}
+
+/* Dump the active import ASM task information. */
+sds asmDumpActiveImportTask(void) {
+    if (!server.cluster_enabled || !asmManager) return NULL;
+
+    /* For a replica, dump the master's active task. */
+    if (clusterNodeIsSlave(getMyClusterNode()) &&
+        asmManager->master_task &&
+        asmManager->master_task->state != ASM_FAILED &&
+        asmManager->master_task->state != ASM_COMPLETED)
+    {
+        return asmTaskSerialize(asmManager->master_task);
+    }
+
+    /* For a master, dump the first active task. */
+    if (listLength(asmManager->tasks) == 0) return NULL;
+    asmTask *task = listNodeValue(listFirst(asmManager->tasks));
+    if (task->operation == ASM_MIGRATE) return NULL;
+    if (task->state == ASM_NONE || task->state == ASM_FAILED ||
+        task->state == ASM_COMPLETED) return NULL;
+
+    return asmTaskSerialize(task);
+}
+
+size_t asmGetPeakSyncBufferSize(void) {
+    if (!asmManager) return 0;
+    /* Compute peak sync buffer usage. The current task's peak may not be
+     * reflected in asmManager->sync_buffer_peak immediately. */
+    size_t peak = asmManager->sync_buffer_peak;
+    asmTask *task = listFirst(asmManager->tasks) ?
+                    listNodeValue(listFirst(asmManager->tasks)) : NULL;
+    if (task && task->operation == ASM_IMPORT)
+        peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak);
+
+    return peak;
+}
+
+size_t asmGetImportInputBufferSize(void) {
+    if (!asmManager || listLength(asmManager->tasks) == 0) return 0;
+
+    asmTask *task = listNodeValue(listFirst(asmManager->tasks));
+    if (task->operation == ASM_IMPORT)
+        return task->sync_buffer.mem_used;
+
+    return 0;
+}
+
+size_t asmGetMigrateOutputBufferSize(void) {
+    if (!asmManager || listLength(asmManager->tasks) == 0) return 0;
+
+    asmTask *task = listNodeValue(listFirst(asmManager->tasks));
+    if (task->operation == ASM_MIGRATE && task->main_channel_client)
+        return getClientOutputBufferMemoryUsage(task->main_channel_client);
+
+    return 0;
+}
+
+/* Returns the ASM task with the given ID, or NULL if no such task exists. */
+static asmTask *asmLookupTaskAt(list *tasks, const char *id) {
+    listIter li;
+    listNode *ln;
+
+    listRewind(tasks, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        asmTask *task = listNodeValue(ln);
+        if (!strcmp(task->id, id)) return task;
+    }
+    return NULL;
+}
+
+/* Returns the ASM task with the given ID, or NULL if no such task exists. */
+asmTask *asmLookupTaskById(const char *id) {
+    return asmLookupTaskAt(asmManager->tasks, id);
+}
+
+/* Returns the ASM task that is identical to the given slot range array, or NULL
+ * if no such task exists.
+ */
+asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) {
+    listIter li;
+    listNode *ln;
+
+    listRewind(asmManager->tasks, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        asmTask *task = listNodeValue(ln);
+        if (slotRangeArrayIsEqual(task->slots, slots))
+            return task;
+    }
+    return NULL;
+}
+
+/* Returns the slot range array for the given task ID */
+slotRangeArray *asmTaskGetSlotRanges(const char *task_id) {
+    asmTask *task = NULL;
+    if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL;
+
+    return task->slots;
+}
+
+/* Returns 1 if the slot range array overlaps with the given slot range. */
+static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) {
+    for (int i = 0; i < slots->num_ranges; i++) {
+        slotRange *sr = &slots->ranges[i];
+        if (sr->start <= req->end && sr->end >= req->start)
+            return 1;
+    }
+    return 0;
+}
+
+/* Returns 1 if the two slot range arrays overlap, 0 otherwise. */
+static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) {
+    for (int i = 0; i < slots1->num_ranges; i++) {
+        slotRange *sr1 = &slots1->ranges[i];
+        if (slotRangeArrayOverlaps(slots2, sr1)) return 1;
+    }
+    return 0;
+}
+
+/* Returns the ASM task that overlaps with the given slot range, or NULL if
+ * no such task exists. */
+static asmTask *lookupAsmTaskBySlotRange(slotRange *req) {
+    listIter li;
+    listNode *ln;
+
+    listRewind(asmManager->tasks, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        asmTask *task = listNodeValue(ln);
+        if (slotRangeArrayOverlaps(task->slots, req))
+            return task;
+    }
+    return NULL;
+}
+
+/* Validates the given slot ranges for a migration task:
+ * - Ensures the current node is a master.
+ * - Verifies all slots are in a STABLE state.
+ * - Checks that slot ranges are well-formed and non-overlapping.
+ * - Confirms all slots belong to a single source node.
+ * - Confirms no ongoing import task overlaps with the slot ranges.
+ *
+ * Returns the source node if validation succeeds.
+ * Otherwise, returns NULL and sets the 'err' variable. */
+static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) {
+    clusterNode *source = NULL;
+
+    *err = NULL;
+
+    /* Ensure this is a master node */
+    if (!clusterNodeIsMaster(getMyClusterNode())) {
+        *err = sdsnew("slot migration not allowed on replica.");
+        goto out;
+    }
+
+    /* Ensure no manual migration is in progress. */
+    for (int i = 0; i < CLUSTER_SLOTS; i++) {
+        if (getImportingSlotSource(i) != NULL ||
+            getMigratingSlotDest(i) != NULL)
+        {
+            *err = sdsnew("all slot states must be STABLE to start a slot migration task.");
+            goto out;
+        }
+    }
+
+    for (int i = 0; i < slots->num_ranges; i++) {
+        slotRange *sr = &slots->ranges[i];
+
+        /* Ensure no import task overlaps with this slot range. Skip this
+         * check for the current task, which is already running on this
+         * slot range. */
+        asmTask *task = lookupAsmTaskBySlotRange(sr);
+        if (task && task != current && task->operation == ASM_IMPORT) {
+            *err = sdscatprintf(sdsempty(),
+                                "overlapping import exists for slot range: %d-%d",
+                                sr->start, sr->end);
+            goto out;
+        }
+
+        /* Validate if we can start a migration task for this slot range. */
+        for (int j = sr->start; j <= sr->end; j++) {
+            clusterNode *node = getNodeBySlot(j);
+            if (node == NULL) {
+                *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j);
+                goto out;
+            }
+
+            if (!source) {
+                source = node;
+            } else if (source != node) {
+                *err = sdsnew("slots belong to different source nodes");
+                goto out;
+            }
+        }
+    }
+
+out:
+    return *err ? NULL : source;
+}
+
+/* Returns 1 if a task with the specified operation is in progress, 0 otherwise. */
+static int asmTaskInProgress(int operation) {
+    listIter li;
+    listNode *ln;
+
+    if (!asmManager || listLength(asmManager->tasks) == 0) return 0;
+
+    listRewind(asmManager->tasks, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        asmTask *task = listNodeValue(ln);
+        if (task->operation == operation) return 1;
+    }
+    return 0;
+}
+
+/* Returns 1 if a migrate task is in progress, 0 otherwise. */
+int asmMigrateInProgress(void) {
+    return asmTaskInProgress(ASM_MIGRATE);
+}
+
+/* Returns 1 if an import task is in progress, 0 otherwise. */
+int asmImportInProgress(void) {
+    return asmTaskInProgress(ASM_IMPORT);
+}
+
+/* Returns 1 if the task is in a state where it can receive the replication
+ * stream for the slot range, 0 otherwise. */
+inline static int asmCanFeedMigrationClient(asmTask *task) {
+    return task->operation == ASM_MIGRATE &&
+           !task->cross_slot_during_propagating &&
+           (task->state == ASM_SEND_BULK_AND_STREAM ||
+            task->state == ASM_SEND_STREAM ||
+            task->state == ASM_HANDOFF_PREP);
+}
+
+/* Feed the migration client with the replication stream for the slot range. */
+void asmFeedMigrationClient(robj **argv, int argc) {
+    asmTask *task = NULL;
+
+    if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0)
+        return;
+
+    /* Check if there is a migrate task that can receive the replication stream. */
+    task = listNodeValue(listFirst(asmManager->tasks));
+    if (!asmCanFeedMigrationClient(task)) return;
+
+    /* Ensure all arguments are converted to string encoding if necessary,
+     * since getSlotFromCommand expects them to be string-encoded.
+     * Generally the arguments are string-encoded, but we may rewrite
+     * the command arguments to integer encoding. */
+    for (int i = 0; i < argc; i++) {
+        if (!sdsEncodedObject(argv[i])) {
+            serverAssert(argv[i]->encoding == OBJ_ENCODING_INT);
+            robj *old = argv[i];
+            argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr);
+            decrRefCount(old);
+        }
+    }
+
+    /* Check if the command belongs to the slot range. */
+    struct redisCommand *cmd = lookupCommand(argv, argc);
+    serverAssert(cmd);
+
+    int slot = getSlotFromCommand(cmd, argv, argc);
+
+    /* If the command does not have keys, skip it now.
+     * SELECT is not propagated, since we only support a single db in cluster mode.
+     * MULTI/EXEC is not needed, since transaction semantics are unnecessary
+     * before the slot handoff.
+     * FUNCTION subcommands should be executed on all nodes, so we skip them
+     * here; propagating them might even cause errors on execution.
+     *
+     * NOTICE: if some keyless commands should be propagated to the destination,
+     * we should identify them here and send them. */
+    if (slot == GETSLOT_NOKEYS) return;
+
+    /* Generally we reject cross-slot commands before executing, but a module
+     * may replicate such commands, so we check again. To guarantee data
+     * consistency, we cancel the task if we encounter a cross-slot command. */
+    if (slot == GETSLOT_CROSSSLOT) {
+        /* We cannot cancel the task directly here, since it may lead to a recursive
+         * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext()
+         * --> postExecutionUnitOperations() --> propagateNow(). Even worse, this
+         * could result in propagating pending commands to the replication stream
+         * twice. To avoid this, we simply set a flag here and cancel the task in
+         * beforeSleep(). */
+        task->cross_slot_during_propagating = 1;
+        return;
+    }
+
+    /* Check if the slot belongs to the task's slot range.
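+     * A single-key command hashes to exactly one slot, so the slot is modeled
+     * below as the degenerate range [slot, slot] and checked with the generic
+     * overlap helper.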
+     */
+    slotRange sr = {slot, slot};
+    if (!slotRangeArrayOverlaps(task->slots, &sr)) return;
+
+    if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state)))
+        freeClientAsync(task->main_channel_client);
+
+    /* Feed the main channel with the command. */
+    client *c = task->main_channel_client;
+    size_t prev_bytes = getNormalClientPendingReplyBytes(c);
+
+    addReplyArrayLen(c, argc);
+    for (int i = 0; i < argc; i++)
+        addReplyBulk(c, argv[i]);
+
+    /* Update the task's source offset to reflect the bytes sent. */
+    task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes);
+}
+
+asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) {
+    clusterNode *source;
+
+    *err = NULL;
+    /* Validate that the slot ranges are valid and that migration can be
+     * initiated for them. */
+    source = validateImportSlotRanges(slots, err, NULL);
+    if (!source)
+        return NULL;
+
+    if (source == getMyClusterNode()) {
+        *err = sdsnew("this node is already the owner of the slot range");
+        return NULL;
+    }
+
+    /* Only a single task at a time is supported for now. */
+    if (listLength(asmManager->tasks) != 0) {
+        asmTask *current = listNodeValue(listFirst(asmManager->tasks));
+        if (current->state == ASM_FAILED) {
+            /* A new import task can be created only if the current one has
+             * failed; cancel the failed task so a new one can be created. */
+            asmTaskCancel(current, "new import requested");
+        } else {
+            *err = sdsnew("another ASM task is already in progress");
+            return NULL;
+        }
+    }
+    /* There should be no task in progress. */
+    serverAssert(listLength(asmManager->tasks) == 0);
+
+    /* Create a slot migration task */
+    asmTask *task = asmTaskCreate(task_id);
+    task->slots = slotRangeArrayDup(slots);
+    task->state = ASM_NONE;
+    task->operation = ASM_IMPORT;
+    task->source_node = source;
+    memcpy(task->source, source->name, CLUSTER_NAMELEN);
+    memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN);
+
+    listAddNodeTail(asmManager->tasks, task);
+    sds slots_str = slotRangeArrayToString(slots);
+    serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s",
+              task->id, task->source, task->dest, slots_str);
+    sdsfree(slots_str);
+
+    return task;
+}
+
+/* CLUSTER MIGRATION IMPORT <start-slot> <end-slot> [<start-slot> <end-slot> ...]
+ *
+ * Sent by an operator to the destination node to start the migration. */
+static void clusterMigrationCommandImport(client *c) {
+    /* Validate slot range arg count */
+    int remaining = c->argc - 3;
+    if (remaining == 0 || remaining % 2 != 0) {
+        addReplyErrorArity(c);
+        return;
+    }
+
+    slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3);
+    if (!slots) return;
+
+    sds err = NULL;
+    asmTask *task = asmCreateImportTask(NULL, slots, &err);
+    slotRangeArrayFree(slots);
+    if (!task) {
+        addReplyErrorSds(c, err);
+        return;
+    }
+
+    addReplyBulkCString(c, task->id);
+}
+
+/* CLUSTER MIGRATION CANCEL [ID task-id | ALL]
+ * - Reply: Number of cancelled tasks
+ *
+ * Cancels the import task with the given ID, or all import tasks when ALL
+ * is given, in which case multiple tasks may be cancelled.
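+ *
+ * Examples (hypothetical task id):
+ *   CLUSTER MIGRATION CANCEL ID 1a2b3c4d...  -> number of tasks cancelled (0 or 1)
+ *   CLUSTER MIGRATION CANCEL ALL             -> number of tasks cancelled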
+ */
+static void clusterMigrationCommandCancel(client *c) {
+    sds task_id = NULL;
+    int num_cancelled = 0;
+
+    /* Validate the argument count */
+    if (c->argc != 4 && c->argc != 5) {
+        addReplyErrorArity(c);
+        return;
+    }
+
+    if (!strcasecmp(c->argv[3]->ptr, "id")) {
+        if (c->argc != 5) {
+            addReplyErrorArity(c);
+            return;
+        }
+        task_id = c->argv[4]->ptr;
+    } else if (!strcasecmp(c->argv[3]->ptr, "all")) {
+        if (c->argc != 4) {
+            addReplyErrorArity(c);
+            return;
+        }
+    } else {
+        addReplyError(c, "unknown argument");
+        return;
+    }
+
+    num_cancelled = clusterAsmCancel(task_id, "user request");
+    addReplyLongLong(c, num_cancelled);
+}
+
+/* Reply with the status of the task. */
+static void replyTaskStatus(client *c, asmTask *task) {
+    mstime_t p = 0;
+
+    addReplyMapLen(c, 12);
+    addReplyBulkCString(c, "id");
+    addReplyBulkCString(c, task->id);
+    addReplyBulkCString(c, "slots");
+    addReplyBulkSds(c, slotRangeArrayToString(task->slots));
+    addReplyBulkCString(c, "source");
+    addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN);
+    addReplyBulkCString(c, "dest");
+    addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN);
+    addReplyBulkCString(c, "operation");
+    addReplyBulkCString(c, task->operation == ASM_IMPORT ? "import" : "migrate");
+    addReplyBulkCString(c, "state");
+    addReplyBulkCString(c, asmTaskStateToString(task->state));
+    addReplyBulkCString(c, "last_error");
+    addReplyBulkCBuffer(c, task->error, sdslen(task->error));
+    addReplyBulkCString(c, "retries");
+    addReplyLongLong(c, task->retry_count);
+    addReplyBulkCString(c, "create_time");
+    addReplyLongLong(c, task->create_time);
+    addReplyBulkCString(c, "start_time");
+    addReplyLongLong(c, task->start_time);
+    addReplyBulkCString(c, "end_time");
+    addReplyLongLong(c, task->end_time);
+
+    if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED)
+        p = task->end_time - task->paused_time;
+    addReplyBulkCString(c, "write_pause_ms");
+    addReplyLongLong(c, p);
+}
+
+/* CLUSTER MIGRATION STATUS [ID task-id | ALL]
+ * - Reply: Array of atomic slot migration tasks */
+static void clusterMigrationCommandStatus(client *c) {
+    listIter li;
+    listNode *ln;
+
+    if (c->argc != 4 && c->argc != 5) {
+        addReplyErrorArity(c);
+        return;
+    }
+
+    if (!strcasecmp(c->argv[3]->ptr, "id")) {
+        if (c->argc != 5) {
+            addReplyErrorArity(c);
+            return;
+        }
+        sds id = c->argv[4]->ptr;
+        asmTask *task = asmLookupTaskAt(asmManager->tasks, id);
+        if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id);
+        if (!task) {
+            addReplyArrayLen(c, 0);
+            return;
+        }
+
+        addReplyArrayLen(c, 1);
+        replyTaskStatus(c, task);
+    } else if (!strcasecmp(c->argv[3]->ptr, "all")) {
+        if (c->argc != 4) {
+            addReplyErrorArity(c);
+            return;
+        }
+        addReplyArrayLen(c, listLength(asmManager->tasks) +
+                            listLength(asmManager->archived_tasks));
+        listRewind(asmManager->tasks, &li);
+        while ((ln = listNext(&li)) != NULL)
+            replyTaskStatus(c, listNodeValue(ln));
+
+        listRewind(asmManager->archived_tasks, &li);
+        while ((ln = listNext(&li)) != NULL)
+            replyTaskStatus(c, listNodeValue(ln));
+    } else {
+        addReplyError(c, "unknown argument");
+        return;
+    }
+}
+
+/* CLUSTER MIGRATION <IMPORT start-slot end-slot [start-slot end-slot ...] |
+ *                    STATUS [ID task-id | ALL] |
+ *                    CANCEL [ID task-id | ALL]>
+ */
+void clusterMigrationCommand(client *c) {
+    if (c->argc < 4) {
+        addReplyErrorArity(c);
+        return;
+    }
+
+    if (strcasecmp(c->argv[2]->ptr, "import") == 0) {
+        clusterMigrationCommandImport(c);
+    } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) {
+        clusterMigrationCommandStatus(c);
+    } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) {
+        clusterMigrationCommandCancel(c);
+    } else {
+        addReplyError(c, "unknown argument");
+    }
+}
+
+/* Notify the state change to the module and the cluster implementation. */
+void asmNotifyStateChange(asmTask *task, int event) {
+    RedisModuleClusterSlotMigrationInfo info = {
+        .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION,
+        .task_id = task->id,
+        .slots = (RedisModuleSlotRangeArray *) task->slots
+    };
+    memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN);
+    memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN);
+
+    int module_event = -1;
+    if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED;
+    else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED;
+    else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED;
+    else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED;
+    else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED;
+    else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED;
+    serverAssert(module_event != -1);
+
+    moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info);
+    serverLog(LL_DEBUG, "Fired cluster ASM module event, task %s: state=%s",
+              task->id, asmTaskStateToString(task->state));
+
+    if (clusterNodeIsMaster(getMyClusterNode())) {
+        /* Notify the cluster impl only if it is a real active import task. */
+        if (task != asmManager->master_task)
+            clusterAsmOnEvent(task->id, event, task->slots);
+        asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */
+    }
+}
+
+void asmImportSetFailed(asmTask *task) {
+    serverAssert(task->operation == ASM_IMPORT);
+    if (task->state == ASM_FAILED) return;
+
+    /* If we are in the RDB channel transfer state, we need to
+     * close the client that was created for the RDB channel. */
+    if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) {
+        client *c = connGetPrivateData(task->rdb_channel_conn);
+        serverAssert(c->task == task);
+        task->rdb_channel_conn = NULL;
+        c->task = NULL;
+        c->flags &= ~CLIENT_MASTER;
+        freeClientAsync(c);
+    }
+
+    /* If in the wait stream EOF or streaming buffer state, we need to close the
+     * client that was created for the main channel. */
+    if (task->main_channel_conn &&
+        (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF))
+    {
+        client *c = connGetPrivateData(task->main_channel_conn);
+        serverAssert(c->task == task);
+        task->main_channel_conn = NULL;
+        c->task = NULL;
+        c->flags &= ~CLIENT_MASTER;
+        freeClientAsync(c);
+    }
+
+    /* Close the connections */
+    if (task->rdb_channel_conn) connClose(task->rdb_channel_conn);
+    if (task->main_channel_conn) connClose(task->main_channel_conn);
+    task->rdb_channel_conn = NULL;
+    task->main_channel_conn = NULL;
+
+    /* Clear the replication data buffer */
+    asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak);
+    replDataBufClear(&task->sync_buffer);
+
+    /* Mark the task as failed and notify the cluster */
+    task->state = ASM_FAILED;
+    asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED);
+    /* This node may have become a replica; only a master can set up new slot
+     * trimming jobs.
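+     * The trim job scheduled below cleans up any keys that were partially
+     * imported into slots this node does not own (see asmTrimSlotsIfNotOwned
+     * for the same cleanup on the import path).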
+     */
+    if (clusterNodeIsMaster(getMyClusterNode()))
+        asmTrimJobSchedule(task->slots);
+}
+
+void asmMigrateSetFailed(asmTask *task) {
+    serverAssert(task->operation == ASM_MIGRATE);
+    if (task->state == ASM_FAILED) return;
+
+    /* Close the RDB and main channel clients */
+    if (task->rdb_channel_client) {
+        task->rdb_channel_client->task = NULL;
+        freeClientAsync(task->rdb_channel_client);
+        task->rdb_channel_client = NULL;
+    }
+    if (task->main_channel_client) {
+        task->main_channel_client->task = NULL;
+        freeClientAsync(task->main_channel_client);
+        task->main_channel_client = NULL;
+    }
+
+    /* Clearing the sync buffer here is not strictly necessary; we do it so
+     * that asmTaskReset works properly after a migrate task has failed. */
+    replDataBufClear(&task->sync_buffer);
+
+    /* Mark the task as failed and notify the cluster */
+    task->state = ASM_FAILED;
+    asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED);
+}
+
+void asmTaskSetFailed(asmTask *task, const char *fmt, ...) {
+    va_list ap;
+    sds error = sdsempty();
+
+    /* Set the error message */
+    va_start(ap, fmt);
+    error = sdscatvprintf(error, fmt, ap);
+    va_end(ap);
+    error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)",
+                         asmTaskStateToString(task->state),
+                         asmTaskStateToString(task->rdb_channel_state));
+    sdsfree(task->error);
+    task->error = error;
+
+    /* Log the error */
+    sds slots_str = slotRangeArrayToString(task->slots);
+    serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s",
+              task->operation == ASM_IMPORT ? "Import" : "Migrate",
+              task->id, slots_str, task->error);
+    sdsfree(slots_str);
+
+    if (task->operation == ASM_IMPORT)
+        asmImportSetFailed(task);
+    else
+        asmMigrateSetFailed(task);
+}
+
+/* The task is completed or canceled. Update stats and move it to
+ * the archived list. */
+void asmTaskFinalize(asmTask *task) {
+    listNode *ln = listFirst(asmManager->tasks);
+    serverAssert(ln->value == task);
+
+    task->source_node = NULL; /* Must not be accessed after finalize */
+    task->end_time = server.mstime;
+
+    if (task->operation == ASM_IMPORT) {
+        asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak,
+                                           task->sync_buffer.peak);
+        replDataBufClear(&task->sync_buffer); /* Not used anymore, so save memory */
+    }
+
+    /* Move the task to the archived list */
+    listUnlinkNode(asmManager->tasks, ln);
+    listLinkNodeHead(asmManager->archived_tasks, ln);
+}
+
+static void asmTaskCancel(asmTask *task, const char *reason) {
+    if (task->state == ASM_CANCELED) return;
+
+    asmTaskSetFailed(task, "Cancelled due to %s", reason);
+    task->state = ASM_CANCELED;
+    asmTaskFinalize(task);
+}
+
+void asmImportTakeover(asmTask *task) {
+    serverAssert(task->state == ASM_WAIT_STREAM_EOF ||
+                 task->state == ASM_STREAMING_BUF);
+
+    /* Free the main channel connection since it is no longer needed. */
+    serverAssert(task->main_channel_conn != NULL);
+    client *c = connGetPrivateData(task->main_channel_conn);
+    c->task = NULL;
+    c->flags &= ~CLIENT_MASTER;
+    freeClientAsync(c);
+    task->main_channel_conn = NULL;
+
+    task->state = ASM_TAKEOVER;
+    clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, NULL);
+}
+
+void asmCallbackOnFreeClient(client *c) {
+    asmTask *task = c->task;
+    if (!task) return;
+
+    /* If the RDB channel connection is closed, mark the task as failed.
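+     * Note that this callback serves both directions: the conn-based checks
+     * below handle the import (destination) side channels, while the
+     * client-based checks handle the migrate (source) side channels.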
+     */
+    if (c->conn && task->rdb_channel_conn == c->conn) {
+        /* We create the client only when transferring data on the RDB channel */
+        serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER);
+        task->rdb_channel_conn = NULL; /* Will be freed by freeClient */
+        c->flags &= ~CLIENT_MASTER;
+        asmTaskSetFailed(task, "RDB channel - Connection is closed");
+        return;
+    }
+
+    if (c->conn && task->main_channel_conn == c->conn) {
+        /* After or in the process of streaming the buffer to the DB, a client
+         * will be created based on the main channel connection. */
+        serverAssert(task->state == ASM_STREAMING_BUF ||
+                     task->state == ASM_WAIT_STREAM_EOF);
+        task->main_channel_conn = NULL; /* Will be freed by freeClient */
+        c->flags &= ~CLIENT_MASTER;
+        asmTaskSetFailed(task, "Main channel - Connection is closed");
+        return;
+    }
+
+    if (c == task->rdb_channel_client) {
+        /* TODO: Detect whether the bgsave is completed successfully and
+         * update the state properly. */
+        task->rdb_channel_state = ASM_COMPLETED;
+        /* We may not have detected whether the child process has exited yet,
+         * so we can't determine whether the client has completed the slots
+         * snapshot transfer. If the RDB channel is interrupted unexpectedly,
+         * the destination side will also close the main channel.
+         * So here we just reset the task's RDB channel client. */
+        task->rdb_channel_client = NULL;
+        return;
+    }
+
+    /* If the main channel client is closed, we need to mark the task as failed
+     * and clean up the RDB channel client if it exists. */
+    if (c == task->main_channel_client) {
+        task->main_channel_client = NULL;
+        /* The RDB channel client, if any, is cleaned up by asmMigrateSetFailed(). */
+        asmTaskSetFailed(task, "Main and RDB channel clients are disconnected.");
+        return;
+    }
+}
+
+/* Sends an AUTH command to the source node using the internal secret.
+ * Returns an error string if the command fails, or NULL on success. */
+char *asmSendInternalAuth(connection *conn) {
+    size_t len = 0;
+    const char *internal_secret = clusterGetSecret(&len);
+    serverAssert(internal_secret != NULL);
+
+    sds secret = sdsnewlen(internal_secret, len);
+    char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL);
+    sdsfree(secret);
+    return err;
+}
+
+/* Handles the RDB channel sync with the source node.
+ * This function is called when the RDB channel is established
+ * and ready to sync with the source node. */
+void asmRdbChannelSyncWithSource(connection *conn) {
+    asmTask *task = connGetPrivateData(conn);
+    char *err = NULL;
+    sds task_error_msg = NULL;
+
+    /* Check for errors in the socket: after a non blocking connect() we
+     * may find that the socket is in error state. */
+    if (connGetState(conn) != CONN_STATE_CONNECTED)
+        goto error;
+
+    /* Check if the task is in a fail point state */
+    if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) {
+        char buf[1];
+        /* Simulate a failure by shutting down the connection. On some operating
+         * systems (e.g. Linux), the socket's receive buffer is not flushed
+         * immediately, so we issue a dummy read to drain any pending data and
+         * surface the error condition.
+         * We use shutdown() instead of connShutdown() because connTLSShutdown()
+         * would free the connection directly, which is not what we want.
+         */
+        shutdown(conn->fd, SHUT_RDWR);
+        connRead(conn, buf, 1);
+    }
+
+    if (task->rdb_channel_state == ASM_CONNECTING) {
+        connSetReadHandler(conn, asmRdbChannelSyncWithSource);
+        connSetWriteHandler(conn, NULL);
+
+        /* Send AUTH command to the source node using internal auth */
+        err = asmSendInternalAuth(conn);
+        if (err) goto write_error;
+        task->rdb_channel_state = ASM_AUTH_REPLY;
+        return;
+    }
+
+    if (task->rdb_channel_state == ASM_AUTH_REPLY) {
+        err = receiveSynchronousResponse(conn);
+        /* The source node did not reply */
+        if (err == NULL) goto no_response_error;
+
+        /* Check `+OK` reply */
+        if (!strcmp(err, "+OK")) {
+            sdsfree(err);
+            err = NULL;
+            task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST;
+            serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue...");
+        } else {
+            task_error_msg = sdscatprintf(sdsempty(),
+                                          "Error reply to AUTH from source: %s", err);
+            sdsfree(err);
+            goto error;
+        }
+    }
+
+    if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) {
+        err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL);
+        if (err) goto write_error;
+        task->rdb_channel_state = ASM_RDBCHANNEL_REPLY;
+        return;
+    }
+
+    if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) {
+        err = receiveSynchronousResponse(conn);
+        /* The source node did not reply */
+        if (err == NULL) goto no_response_error;
+
+        /* Ignore '\n' sent from the source node to keep the connection alive. */
+        if (sdslen(err) == 0) {
+            serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later");
+            sdsfree(err);
+            return;
+        }
+
+        /* Check `+SLOTSSNAPSHOT` reply */
+        if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) {
+            sdsfree(err);
+            err = NULL;
+            task->state = ASM_ACCUMULATE_BUF;
+            /* The main channel buffers pending commands. */
+            connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn);
+
+            task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER;
+            client *c = createClient(conn);
+            c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING);
+            c->querybuf = sdsempty();
+            c->authenticated = 1;
+            c->user = NULL;
+            c->task = task;
+            serverLog(LL_NOTICE,
+                      "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue...");
+        } else {
+            task_error_msg = sdscatprintf(sdsempty(),
+                                          "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err);
+            sdsfree(err);
+            goto error;
+        }
+        return;
+    }
+    return;
+
+no_response_error:
+    task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake");
+    /* Fall through to regular error handling */
+
+error:
+    asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s",
+                     task_error_msg ? task_error_msg : connGetLastError(conn));
+    sdsfree(task_error_msg);
+    return;
+
+write_error: /* Handle sendCommand() errors.
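+              * sendCommand() returns an error message that the caller owns, so
+              * it is copied into the task error and freed here.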
+              */
+    task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err);
+    sdsfree(err);
+    goto error;
+}
+
+char *asmSendSlotRangesSync(connection *conn, asmTask *task) {
+    /* Prepare the CLUSTER SYNCSLOTS SYNC command */
+    serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS);
+    int argc = task->slots->num_ranges * 2 + 4;
+    char **args = zcalloc(sizeof(char*) * argc);
+    size_t *lens = zcalloc(sizeof(size_t) * argc);
+
+    args[0] = "CLUSTER";
+    args[1] = "SYNCSLOTS";
+    args[2] = "SYNC";
+    args[3] = task->id;
+    lens[0] = strlen("CLUSTER");
+    lens[1] = strlen("SYNCSLOTS");
+    lens[2] = strlen("SYNC");
+    lens[3] = sdslen(task->id);
+
+    int i = 4;
+    for (int j = 0; j < task->slots->num_ranges; j++) {
+        slotRange *sr = &task->slots->ranges[j];
+        args[i] = sdscatprintf(sdsempty(), "%d", sr->start);
+        lens[i] = sdslen(args[i]);
+        args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end);
+        lens[i+1] = sdslen(args[i+1]);
+        i += 2;
+    }
+    serverAssert(i == argc);
+
+    /* Send the command to the source node */
+    char *err = sendCommandArgv(conn, argc, args, lens);
+
+    /* Free allocated memory */
+    for (int j = 4; j < argc; j++) {
+        sdsfree(args[j]);
+    }
+    zfree(args);
+    zfree(lens);
+
+    return err;
+}
+
+void asmSyncWithSource(connection *conn) {
+    asmTask *task = connGetPrivateData(conn);
+    char *err = NULL;
+
+    /* Some task errors are not network issues; we record them explicitly. */
+    sds task_error_msg = NULL;
+
+    /* Check for errors in the socket: after a non blocking connect() we
+     * may find that the socket is in error state. */
+    if (connGetState(conn) != CONN_STATE_CONNECTED)
+        goto error;
+
+    /* Check if the fail point is active for this channel and state */
+    if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) {
+        char buf[1];
+        shutdown(conn->fd, SHUT_RDWR);
+        connRead(conn, buf, 1);
+    }
+
+    if (task->state == ASM_CONNECTING) {
+        connSetReadHandler(conn, asmSyncWithSource);
+        connSetWriteHandler(conn, NULL);
+        /* Send AUTH command to the source node using internal auth */
+        err = asmSendInternalAuth(conn);
+        if (err) goto write_error;
+        task->state = ASM_AUTH_REPLY;
+        return;
+    }
+
+    if (task->state == ASM_AUTH_REPLY) {
+        err = receiveSynchronousResponse(conn);
+        /* The source node did not reply */
+        if (err == NULL) goto no_response_error;
+
+        /* Check `+OK` reply */
+        if (!strcmp(err, "+OK")) {
+            sdsfree(err);
+            err = NULL;
+            task->state = ASM_SEND_HANDSHAKE;
+            serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue...");
+        } else {
+            task_error_msg = sdscatprintf(sdsempty(),
+                                          "Error reply to AUTH from the source: %s", err);
+            sdsfree(err);
+            goto error;
+        }
+    }
+
+    if (task->state == ASM_SEND_HANDSHAKE) {
+        sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN);
+        err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL);
+        sdsfree(node_id);
+        if (err) goto write_error;
+        task->state = ASM_HANDSHAKE_REPLY;
+        return;
+    }
+
+    if (task->state == ASM_HANDSHAKE_REPLY) {
+        err = receiveSynchronousResponse(conn);
+        /* The source node did not reply */
+        if (err == NULL) goto no_response_error;
+
+        /* Check `+OK` reply */
+        if (!strcmp(err, "+OK")) {
+            sdsfree(err);
+            err = NULL;
+            task->state = ASM_SEND_SYNCSLOTS;
+            serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue...");
+        } else {
+            task_error_msg = sdscatprintf(sdsempty(),
+                                          "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err);
+            sdsfree(err);
+            goto error;
+        }
+    }
+
+    if (task->state == ASM_SEND_SYNCSLOTS) {
+        err = asmSendSlotRangesSync(conn, task);
+        if (err) goto write_error;
+
+        task->state = ASM_SYNCSLOTS_REPLY;
+        return;
+    }
+
+    if (task->state == ASM_SYNCSLOTS_REPLY) {
+        err = receiveSynchronousResponse(conn);
+        /* The source node did not reply */
+        if (err == NULL) goto no_response_error;
+
+        /* Check `+RDBCHANNELSYNCSLOTS` reply */
+        if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) {
+            sdsfree(err);
+            err = NULL;
+            task->state = ASM_INIT_RDBCHANNEL;
+            serverLog(LL_NOTICE,
+                      "Source node replied to RDBCHANNELSYNCSLOTS, syncslots can continue...");
+        } else {
+            task_error_msg = sdscatprintf(sdsempty(),
+                                          "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err);
+            sdsfree(err);
+            goto error;
+        }
+    }
+
+    if (task->state == ASM_INIT_RDBCHANNEL) {
+        /* Create the RDB channel connection */
+        char *ip = clusterNodeIp(task->source_node);
+        int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) :
+                                            clusterNodeTcpPort(task->source_node);
+        task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication());
+        if (connConnect(task->rdb_channel_conn, ip, port,
+                        server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR)
+        {
+            serverLog(LL_WARNING, "Unable to connect to the source node: %s",
+                      connGetLastError(task->rdb_channel_conn));
+            goto error;
+        }
+        task->rdb_channel_state = ASM_CONNECTING;
+        connSetPrivateData(task->rdb_channel_conn, task);
+        serverLog(LL_NOTICE,
+                  "RDB channel connection to source node %.40s established, waiting for AUTH reply...",
+                  task->source);
+
+        /* The main channel waits for the next event */
+        connSetReadHandler(conn, NULL);
+        return;
+    }
+    return;
+
+no_response_error:
+    serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake");
+    /* Fall through to regular error handling */
+
+error:
+    asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s",
+                     task_error_msg ? task_error_msg : connGetLastError(conn));
+    sdsfree(task_error_msg);
+    return;
+
+write_error: /* Handle sendCommand() errors. */
+    serverLog(LL_WARNING, "Failed to send command to source node: %s", err);
+    sdsfree(err);
+    goto error;
+}
+
+int asmImportSendACK(asmTask *task) {
+    serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF);
+    serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset);
+
+    char offset[64];
+    ull2string(offset, sizeof(offset), task->dest_offset);
+
+    char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK",
+                            asmTaskStateToString(task->state), offset, NULL);
+    if (err) {
+        asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err);
+        sdsfree(err);
+        return C_ERR;
+    }
+    return C_OK;
+}
+
+/* Called when the RDB channel begins sending the snapshot.
+ * From this point on, the main channel also starts sending the incremental
+ * stream. */
+void asmSlotSnapshotAndStreamStart(struct asmTask *task) {
+    if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return;
+
+    if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) {
+        shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR);
+        return;
+    }
+    task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM;
+
+    task->state = ASM_SEND_BULK_AND_STREAM;
+    task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER;
+
+    /* From the source node's perspective, the destination node begins to accumulate
+     * the buffer while the RDB channel starts applying the slot snapshot data.
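+     * The source-side state then advances, per the handlers in this file:
+     * ASM_SEND_BULK_AND_STREAM -> ASM_SEND_STREAM (snapshot delivered) ->
+     * ASM_HANDOFF_PREP (replication lag below the handoff threshold).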
+     */
+    task->dest_state = ASM_ACCUMULATE_BUF;
+    task->dest_slots_snapshot_time = server.mstime;
+}
+
+/* Called when the RDB channel has succeeded in sending the snapshot. */
+void asmSlotSnapshotSucceed(struct asmTask *task) {
+    if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
+
+    /* The destination starts sending ACKs to keep the main channel alive after
+     * receiving the snapshot, so here we need to update the last interaction
+     * time to avoid a false timeout. */
+    task->main_channel_client->lastinteraction = server.unixtime;
+
+    task->state = ASM_SEND_STREAM;
+    task->rdb_channel_state = ASM_COMPLETED;
+}
+
+/* Called when the RDB channel fails to send the snapshot. */
+void asmSlotSnapshotFailed(struct asmTask *task) {
+    if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
+
+    asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot");
+}
+
+/* CLUSTER SYNCSLOTS SNAPSHOT-EOF
+ *
+ * This command is sent by the source node to the destination node to indicate
+ * that the slots snapshot has ended. */
+void clusterSyncSlotsSnapshotEOF(client *c) {
+    /* This client is the RDB channel connection. */
+    asmTask *task = c->task;
+    if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER ||
+        c->conn != task->rdb_channel_conn)
+    {
+        /* Unexpected SNAPSHOT-EOF command */
+        serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: "
+                              "rdb_channel_state=%s",
+                  asmTaskStateToString(task ? task->rdb_channel_state : ASM_NONE));
+        freeClientAsync(c);
+        return;
+    }
+
+    /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */
+    if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) {
+        freeClientAsync(c); /* Simulate a failure */
+        return;
+    }
+
+    /* Clear the RDB channel connection */
+    task->rdb_channel_conn = NULL;
+    task->rdb_channel_state = ASM_COMPLETED;
+    serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task.");
+
+    /* Free the RDB channel connection. */
+    c->task = NULL;
+    c->flags &= ~CLIENT_MASTER;
+    freeClientAsync(c);
+
+    /* We will start streaming the buffer to the DB, but we must not start here:
+     * we are in the context of executing a command, and Redis would generate a
+     * big MULTI-EXEC including all the commands in the buffer. Just update the
+     * state here and do the work in beforeSleep(). */
+    task->state = ASM_READY_TO_STREAM;
+    connSetReadHandler(task->main_channel_conn, NULL);
+}
+
+/* CLUSTER SYNCSLOTS STREAM-EOF
+ *
+ * This command is sent by the source node to the destination node to indicate
+ * that the slot sync stream has ended and the slots can be handed off. */
+void clusterSyncSlotsStreamEOF(client *c) {
+    asmTask *task = c->task;
+
+    if (!task || task->operation != ASM_IMPORT) {
+        serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command");
+        freeClientAsync(c);
+        return;
+    }
+
+    if (task->state == ASM_STREAMING_BUF) {
+        /* We are still streaming the buffer to the DB; mark the EOF as received
+         * so we can take over once streaming finishes. We can't take over here,
+         * since asmImportTakeover may release the context that streaming the
+         * buffer still depends on.
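+         * The flag is expected to be checked once the buffered stream has been
+         * fully applied, at which point the takeover proceeds.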
+         */
+        task->stream_eof_during_streaming = 1;
+        serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer");
+        return;
+    }
+
+    if (task->state != ASM_WAIT_STREAM_EOF) {
+        serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s",
+                  asmTaskStateToString(task->state));
+        freeClientAsync(c);
+        return;
+    }
+    serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF");
+
+    /* STREAM-EOF received; the source is ready to hand off, so take over now. */
+    asmImportTakeover(task);
+}
+
+/* Start the import task. */
+static void asmStartImportTask(asmTask *task) {
+    if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return;
+    sds slots_str = slotRangeArrayToString(task->slots);
+
+    /* Sanity check: Clean up any keys that exist in slots not owned by this node.
+     * This handles cases where users previously migrated slots using the legacy
+     * method but left behind orphaned keys, or the cluster may have missed some
+     * cleanup during previous operations, which could interfere with the ASM
+     * import process. */
+    asmTrimSlotsIfNotOwned(task->slots);
+
+    /* Check if there is any trim job in progress for the slot ranges.
+     * We can't start the import task since the trim job will modify the data. */
+    int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots);
+
+    /* Notify the cluster implementation to prepare for the import task. */
+    int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots);
+
+    static int start_blocked_logged = 0;
+    /* Cannot start the import task while a pause action is in effect. Otherwise,
+     * we would break the promise that no writes are performed during the pause. */
+    if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
+        isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
+        trim_in_progress ||
+        impl_ret != C_OK)
+    {
+        const char *reason = impl_ret != C_OK ? "cluster is not ready" :
+                             trim_in_progress ? "trim in progress for some of the slots" :
+                             "server paused";
+        if (start_blocked_logged == 0) {
+            serverLog(LL_WARNING, "Cannot start import task %s for slots: %s due to %s",
+                      task->id, slots_str, reason);
+            start_blocked_logged = 1;
+        }
+        sdsfree(slots_str);
+        return;
+    }
+    start_blocked_logged = 0; /* Reset the log flag */
+
+    /* Detect if the cluster topology has changed. We should cancel the task if
+     * we cannot schedule it, and update the source node if needed. */
+    sds err = NULL;
+    clusterNode *source = validateImportSlotRanges(task->slots, &err, task);
+    if (!source) {
+        asmTaskCancel(task, err);
+        sdsfree(slots_str);
+        sdsfree(err);
+        return;
+    }
+    /* If I am now the owner of the slot range, cancel the import task. */
+    if (source == getMyClusterNode()) {
+        asmTaskCancel(task, "slots owned by myself now");
+        sdsfree(slots_str);
+        return;
+    }
+    /* Change the source node if needed. */
+    if (source != task->source_node) {
+        task->source_node = source;
+        memcpy(task->source, source->name, CLUSTER_NAMELEN);
+        serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, "
+                             "new_source=%.40s", task->id, slots_str, source->name);
+    }
+
+    serverLog(LL_NOTICE, "Import task %s starting: src=%.40s, dest=%.40s, slots=%s",
+              task->id, task->source, task->dest, slots_str);
+    sdsfree(slots_str);
+
+    task->state = ASM_CONNECTING;
+    task->start_time = server.mstime;
+    asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED);
+
+    task->main_channel_conn = connCreate(server.el, connTypeOfReplication());
+    char *ip = clusterNodeIp(task->source_node);
+    int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) :
+                                        clusterNodeTcpPort(task->source_node);
+    if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr,
+                    asmSyncWithSource) == C_ERR)
+    {
+        asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s",
+                         connGetLastError(task->main_channel_conn));
+        return;
+    }
+    connSetPrivateData(task->main_channel_conn, task);
+}
+
+void clusterSyncSlotsCommand(client *c) {
+    /* Only internal clients are allowed to execute this command, to avoid
+     * potential attacks: some state changes are not well protected, so
+     * external clients could damage the slot migration state. */
+    if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) {
+        addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients");
+        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
+        return;
+    }
+
+    /* On a replica, only allow the master client to execute the CONF subcommand. */
+    if (!clusterNodeIsMaster(getMyClusterNode())) {
+        if (!(c->flags & CLIENT_MASTER)) {
+            /* Not the master client, reject all subcommands and close the connection. */
+            addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master");
+            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
+            return;
+        } else {
+            /* Only allow the CONF subcommand on a replica. */
+            if (strcasecmp(c->argv[2]->ptr, "conf")) return;
+        }
+    }
+
+    if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) {
+        /* CLUSTER SYNCSLOTS SYNC <task-id> <start-slot> <end-slot> [<start-slot> <end-slot> ...] */
+        if (c->argc % 2 == 1) {
+            addReplyErrorArity(c);
+            return;
+        }
+
+        slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4);
+        if (!slots) return;
+
+        /* Validate that the slot ranges are valid and that migration can be
+         * initiated for them. */
+        sds err = NULL;
+        clusterNode *source = validateImportSlotRanges(slots, &err, NULL);
+        if (!source) {
+            addReplyErrorSds(c, err);
+            slotRangeArrayFree(slots);
+            return;
+        }
+
+        /* Check if the source node is the same as the current node. */
+        if (source != getMyClusterNode()) {
+            addReplyError(c, "This node is not the owner of the slots");
+            slotRangeArrayFree(slots);
+            return;
+        }
+
+        sds task_id = c->argv[3]->ptr;
+        /* Notify the cluster implementation to prepare for the migrate task. */
+        if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK) {
+            addReplyError(c, "Cluster is not ready right now, please retry later");
+            slotRangeArrayFree(slots);
+            return;
+        }
+
+        asmTask *task = listLength(asmManager->tasks) == 0 ? NULL :
+                        listNodeValue(listFirst(asmManager->tasks));
+        if (task && !strcmp(task->id, task_id) &&
+            task->operation == ASM_MIGRATE && task->state == ASM_FAILED &&
+            slotRangeArrayIsEqual(slots, task->slots) &&
+            memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0)
+        {
+            /* Reuse the failed task */
+            asmTaskReset(task);
+            slotRangeArrayFree(task->slots); /* Will be set again later */
+            task->retry_count++;
+        } else if (task) {
+            if (task->state == ASM_FAILED) {
+                /* A new migrate task can be created only if the current one has
+                 * failed; cancel the failed task so a new one can be created.
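+                 * (A retry of the same failed task id from the same destination
+                 * is handled above by reusing the task and incrementing
+                 * retry_count instead.)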
+                 */
+                asmTaskCancel(task, "new migration requested");
+                task = NULL;
+            } else {
+                addReplyError(c, "Another ASM task is already in progress");
+                slotRangeArrayFree(slots);
+                return;
+            }
+        }
+
+        /* Create the migrate slots task and add it to the list;
+         * otherwise reuse the existing one. */
+        if (task == NULL) {
+            task = asmTaskCreate(task_id);
+            task->start_time = server.mstime; /* Start immediately */
+            serverAssert(listLength(asmManager->tasks) == 0);
+            listAddNodeTail(asmManager->tasks, task);
+        }
+
+        task->slots = slots;
+        task->operation = ASM_MIGRATE;
+        memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN);
+        if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN);
+
+        task->main_channel_client = c;
+        c->task = task;
+
+        /* We mark the main channel client as a replica, so this client is limited
+         * by the client output buffer settings for replicas. The replstate has
+         * no real significance here; it just prevents the client from going
+         * online. */
+        c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING);
+        c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL;
+        if (server.repl_disable_tcp_nodelay)
+            connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */
+        listAddNodeTail(server.slaves, c);
+        createReplicationBacklogIfNeeded();
+
+        /* Wait for the RDB channel to be ready */
+        task->state = ASM_WAIT_RDBCHANNEL;
+
+        sds slots_str = slotRangeArrayToString(slots);
+        serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s",
+                  task->id, task->source, task->dest, slots_str);
+        sdsfree(slots_str);
+
+        asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED);
+
+        /* Keep the client in the main thread to avoid data races between the
+         * connWrite call below and the client's event handler in IO threads. */
+        if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);
+
+        /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. */
+        if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22)
+            freeClientAsync(c);
+    } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) {
+        /* CLUSTER SYNCSLOTS RDBCHANNEL <task-id> */
+        sds task_id = c->argv[3]->ptr;
+        if (sdslen(task_id) != CLUSTER_NAMELEN) {
+            addReplyError(c, "Invalid task id");
+            return;
+        }
+
+        if (listLength(asmManager->tasks) == 0) {
+            addReplyError(c, "No slot migration task in progress");
+            return;
+        }
+
+        asmTask *task = listNodeValue(listFirst(asmManager->tasks));
+        if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL ||
+            strcmp(task->id, task_id) != 0)
+        {
+            addReplyError(c, "Another migration task is already in progress");
+            return;
+        }
+
+        if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) {
+            /* Close the main channel client before the RDB channel client connects */
+            if (task->main_channel_client)
+                freeClient(task->main_channel_client);
+        }
+
+        /* The main channel client must be present when setting the RDB channel client */
+        if (task->main_channel_client == NULL) {
+            /* The main channel connection may have been closed.
+             */
+            addReplyError(c, "Main channel connection is not established");
+            return;
+        }
+
+        /* Mark the client as a slave to generate the slots snapshot */
+        c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING);
+        c->slave_capa |= SLAVE_CAPA_EOF;
+        c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL);
+        c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
+        c->repldbfd = -1;
+        if (server.repl_disable_tcp_nodelay)
+            connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */
+        listAddNodeTail(server.slaves, c);
+
+        /* Wait for bgsave to start for the slots sync */
+        task->state = ASM_WAIT_BGSAVE_START;
+        task->rdb_channel_state = ASM_WAIT_BGSAVE_START;
+        task->rdb_channel_client = c;
+        c->task = task;
+
+        /* Keep the client in the main thread to avoid data races between the
+         * connWrite call in startBgsaveForReplication and the client's event
+         * handler in IO threads. */
+        if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);
+
+        if (!hasActiveChildProcess()) {
+            startBgsaveForReplication(c->slave_capa, c->slave_req);
+        } else {
+            serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed");
+        }
+    } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) {
+        /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */
+        clusterSyncSlotsSnapshotEOF(c);
+    } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) {
+        /* CLUSTER SYNCSLOTS STREAM-EOF */
+        clusterSyncSlotsStreamEOF(c);
+    } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) {
+        /* CLUSTER SYNCSLOTS ACK <dest-state> <applied-offset> */
+        long long offset;
+        int dest_state;
+
+        if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) {
+            dest_state = ASM_STREAMING_BUF;
+        } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) {
+            dest_state = ASM_WAIT_STREAM_EOF;
+        } else {
+            return; /* Not supported now. */
+        }
+
+        if (getLongLongFromObject(c->argv[4], &offset) != C_OK)
+            return;
+
+        if (c->task && c->task->operation == ASM_MIGRATE) {
+            /* Update the state and the ACKed offset from the destination. */
+            asmTask *task = c->task;
+            task->dest_state = dest_state;
+            if (task->dest_offset > (unsigned long long) offset) {
+                serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, "
+                                      "but offset %lld is less than the current dest offset %lld",
+                          asmTaskStateToString(dest_state), offset, task->dest_offset);
+                return;
+            }
+            task->dest_offset = offset;
+            serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, "
+                                "updated dest offset to %lld, source offset: %lld",
+                      asmTaskStateToString(dest_state), task->dest_offset, task->source_offset);
+
+            /* Record the time when the destination finishes applying the accumulated buffer */
+            if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0)
+                task->dest_accum_applied_time = server.mstime;
+
+            /* Pause writes if needed */
+            if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) {
+                /* Pause writes on the main channel if the lag is less than the threshold. */
+                if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) {
+                    if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP)))
+                        return; /* Do not enter the handoff prep state, for testing the buffer drain timeout. */
+
+                    serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, "
+                                         "pausing writes for slot handoff",
+                              task->source_offset - task->dest_offset,
+                              server.asm_handoff_max_lag_bytes);
+                    task->state = ASM_HANDOFF_PREP;
+                    clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots);
+                }
+            }
+        }
+    } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) {
+        /* CLUSTER SYNCSLOTS FAIL <task-id> */
+        return; /* This is a no-op, just to handle the command syntax. */
+    } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) {
+        /* CLUSTER SYNCSLOTS CONF