From 2bc4e0299d4d23cf8a0cb32b5cbf82e6ecb2b75a Mon Sep 17 00:00:00 2001
From: Ozan Tezcan
Date: Wed, 22 Oct 2025 15:56:20 +0300
Subject: [PATCH] Add Atomic Slot Migration (ASM) support (#14414)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Overview

This PR is a joint effort with @ShooterIT. I’m just opening it on behalf of both of us.

This PR introduces Atomic Slot Migration (ASM) for Redis Cluster, a new mechanism for safely and efficiently migrating hash slots between nodes.

Redis Cluster distributes data across nodes using 16384 hash slots, each owned by a specific node. Sometimes slots need to be moved, for example to rebalance after adding or removing nodes, or to mitigate a hot shard that’s overloaded.

Before ASM, slot migration was non-atomic and client-dependent, relying on CLUSTER SETSLOT, GETKEYSINSLOT, MIGRATE commands, and client-side handling of ASK/ASKING replies. This process was complex, error-prone, slow and could leave clusters in inconsistent states after failures. Clients had to implement redirect logic, multi-key commands could fail mid-migration, and errors often resulted in orphaned keys or required manual cleanup.

Several related discussions can be found in the issue list, some examples: https://github.com/redis/redis/issues/14300 , https://github.com/redis/redis/issues/4937 , https://github.com/redis/redis/issues/10370 , https://github.com/redis/redis/issues/4333 , https://github.com/redis/redis/issues/13122 , https://github.com/redis/redis/issues/11312

Atomic Slot Migration (ASM) makes slot rebalancing safe, transparent, and reliable, addressing many of the limitations of the legacy migration method. Instead of moving keys one by one, ASM replicates the entire slot’s data plus live updates to the target node, then performs a single atomic handoff.
Clients keep working without handling ASK/ASKING replies, multi-key operations remain consistent, failures don’t leave partial states, and replicas stay in sync. The migration process also completes significantly faster. Operators gain new commands (CLUSTER MIGRATION IMPORT, STATUS, CANCEL) for monitoring and control, while modules can hook into migration events for deeper integration.

### The problems of the legacy method in detail

Operators and developers ran into multiple issues with the legacy method. Some of them in detail:

1. **Redirects and Client Complexity:** While a slot was being migrated, some keys were already moved while others were not. Clients had to handle `-ASK` redirections and send `ASKING` before reissuing requests to the target node. Not all client libraries implemented this correctly, leading to failed commands or subtle bugs. Even when implemented, it increased latency and broke naive pipelines.
2. **Multi-Key Operations Became Unreliable:** Commands like `MGET key1 key2` could fail with `TRYAGAIN` if part of the slot was already migrated. This made application logic unpredictable during resharding.
3. **Risk of failure:** Keys were moved one by one (with the MIGRATE command). If the source crashed, or the destination ran out of memory, the system could be left in an inconsistent state: some keys moved, others lost, slots partially migrated. Manual intervention was often needed, sometimes resulting in data loss.
4. **Replica and Failover Issues:** Replicas weren’t aware of migrations in progress. If a failover occurred mid-migration, manual intervention was required to clean up or resume the process safely.
5. **Operational Overhead:** Operators had to coordinate multiple commands (CLUSTER SETSLOT, MIGRATE, GETKEYSINSLOT, etc.) with little visibility into progress or errors, making rebalancing slow and error-prone.
6. **Poor performance:** Key-by-key migration was inherently slow and inefficient for large slot ranges.
7. **Large keys:** Large keys could fail to migrate or cause latency spikes on the destination node.

### How Atomic Slot Migration Fixes This

Atomic Slot Migration (ASM) eliminates all of these issues:

1. **Clients:** Clients no longer need to handle ASK/ASKING; the migration is fully transparent.
2. **Atomic ownership transfer:** The entire slot’s data (snapshot + live updates) is replicated and handed off in a single atomic step.
3. **Performance:** ASM completes migrations significantly faster by streaming slot data in parallel (snapshot + incremental updates) and eliminating key-by-key operations.
4. **Consistency guarantees:** Multi-key operations and pipelines continue to work reliably throughout migration.
5. **Resilience:** Failures no longer leave orphaned keys or partial states; migration tasks can be retried or safely cancelled.
6. **Replica awareness:** Replicas remain consistent during migration, and failovers will no longer leave partially imported keys.
7. **Operator visibility:** New CLUSTER MIGRATION subcommands (IMPORT, STATUS, CANCEL) provide clear observability and management for operators.
### ASM Diagram and Migration Steps ``` ┌─────────────┐ ┌────────────┐ ┌───────────┐ ┌───────────┐ ┌───────┐ │ │ │Destination │ │Destination│ │ Source │ │Source │ │ Operator │ │ master │ │ replica │ │ master │ │ Fork │ │ │ │ │ │ │ │ │ │ │ └──────┬──────┘ └─────┬──────┘ └─────┬─────┘ └─────┬─────┘ └───┬───┘ │ │ │ │ │ │ │ │ │ │ │CLUSTER MIGRATION IMPORT │ │ │ │ │ ..│ │ │ │ ├───────────────────────────►│ │ │ │ │ │ │ │ │ │ Reply with │ │ │ │ │◄───────────────────────────┤ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ CLUSTER SYNCSLOTS│SYNC │ │ │ CLUSTER MIGRATION STATUS │ .│ │ Monitor │ ID ├────────────────────────────────────►│ │ task ┌─►├───────────────────────────►│ │ │ │ state │ │ │ │ │ │ till │ │ Reply status │ Negotiation with multiple channels │ │ completed └─ │◄───────────────────────────┤ (i.e rdbchannel repl) │ │ │ │◄───────────────────────────────────►│ │ │ │ │ │ Fork │ │ │ │ ├──────────►│ ─┐ │ │ │ │ │ │ Slot snapshot as RESTORE commands │ │ │ │◄────────────────────────────────────────────────┤ │ │ Propagate │ │ │ │ ┌─────────────┐ ├─────────────────►│ │ │ │ │ │ │ │ │ │ │ Snapshot │ Client │ │ │ │ │ │ delivery │ │ │ Replication stream for slot range │ │ │ duration └──────┬──────┘ │◄────────────────────────────────────┤ │ │ │ │ Propagate │ │ │ │ │ ├─────────────────►│ │ │ │ │ │ │ │ │ │ │ SET key value1 │ │ │ │ │ ├─────────────────────────────────────────────────────────────────►│ │ │ │ +OK │ │ │ │ ─┘ │◄─────────────────────────────────────────────────────────────────┤ │ │ │ │ │ │ │ │ Drain repl stream │ ──┐ │ │ │◄────────────────────────────────────┤ │ │ │ SET key value2 │ │ │ │ │ ├─────────────────────────────────────────────────────────────────►│ │Write │ │ │ │ │ │pause │ │ │ │ │ │ │ │ │ Publish new config via cluster bus │ │ │ │ +MOVED ├────────────────────────────────────►│ ──┘ │ │◄─────────────────────────────────────────────────────────────────┤ ──┐ │ │ │ │ │ │ │ │ │ │ │ │Trim │ │ │ │ │ ──┘ │ │ SET key value2 │ │ │ │ ├───────────────────────────►│ │ │ │ │ +OK │ │ │ │ 
│◄───────────────────────────┤ │ │ │ │ │ │ │ │ │ │ │ │ │ ```

### New commands introduced

There are two new commands:

1. A command to start, monitor and cancel the migration operation: `CLUSTER MIGRATION `
2. An internal command to manage slot transfer between source and destination: `CLUSTER SYNCSLOTS `

For more details, please refer to the [New Commands](#new-commands) section. Internal command messaging is mostly omitted in the diagram for simplicity.

### Steps

1. Slot migration begins when the operator sends `CLUSTER MIGRATION IMPORT ...` to the destination master. The process is initiated from the destination node, similar to REPLICAOF. This approach allows us to reuse the same logic and share code with the new replication mechanism (see https://github.com/redis/redis/pull/13732). The command can include multiple slot ranges. The destination node creates one migration task per source node, regardless of how many slot ranges are specified. Upon successfully creating the task, the destination node replies to the IMPORT command with the assigned task ID. The operator can then monitor progress using `CLUSTER MIGRATION STATUS ID `. When the task’s state field changes to `completed`, the migration has finished successfully. Please see the [New Commands](#new-commands) section for a sample of the output.

2. After creating the migration task, the destination node requests replication of the slots using the internal command `CLUSTER SYNCSLOTS`.

3. Once the source node accepts the request, the destination node establishes a second, separate connection (similar to rdbchannel replication) so that snapshot data and incremental changes can be transmitted in parallel.

4. The source node forks and starts delivering snapshot content (as per-key RESTORE commands) over one connection and incremental changes over the other. The destination master starts applying commands from the snapshot connection and accumulates incremental changes.
Applied commands are also propagated to the destination replicas via the replication backlog. Note: Only commands of related slots are delivered to the destination node. This is done by writing them to the migration client’s output buffer, which serves as the replication stream for the migration operation.

5. Once the source node finishes delivering the snapshot and determines that the destination node has caught up (the remaining replication stream to consume has dropped below a configured limit), it pauses write traffic for the entire server. After pausing the writes, the source node forwards any remaining write commands to the destination node.

6. Once the destination consumes all the writes, it bumps the cluster config epoch and changes the configuration. The new config is published via the cluster bus.

7. When the source node receives the new configuration, it can redirect clients and it begins trimming the migrated slots, while also resuming write traffic on the server.

### Internal slots synchronization state machine

![asm state machine](https://github.com/user-attachments/assets/b7db353c-969e-4bde-b77f-c6abe5aa13d3)

1. The destination node performs authentication using the cluster secret introduced in #13763, and transmits its node ID information.

2. The destination node sends `CLUSTER SYNCSLOTS SYNC ` to initiate a slot synchronization request and establish the main channel. The source node responds with `+RDBCHANNELSYNCSLOTS`, indicating that the destination node should establish an RDB channel.

3. The destination node then sends `CLUSTER SYNCSLOTS RDBCHANNEL ` to establish the RDB channel, using the same task-id as in the previous step to associate the two connections as part of the same ASM task. The source node replies with `+SLOTSSNAPSHOT` and forks a child process to transfer the slot snapshot.

4. The destination node applies the slot snapshot data received over the RDB channel, while proxying the command stream to replicas.
At the same time, the main channel continues to read and buffer incremental commands in memory.

5. Once the source node finishes sending the slot snapshot, it notifies the destination node using the `CLUSTER SYNCSLOTS SNAPSHOT-EOF` command. The destination node then starts streaming the buffered commands while continuing to read and buffer incremental commands sent from the source.

6. The destination node periodically sends `CLUSTER SYNCSLOTS ACK ` to inform the source of the applied data offset. When the offset gap meets the threshold, the source node pauses write operations. After all buffered data has been drained, it sends `CLUSTER SYNCSLOTS STREAM-EOF` to the destination node to hand off slots.

7. Finally, the destination node takes over slot ownership, updates the slot configuration and bumps the epoch, then broadcasts the updates via the cluster bus. Once the source node detects the updated slot configuration, the slot migration process is complete.

### Error handling

- If the connection between the source and destination is lost (due to disconnection, output buffer overflow, OOM, or timeout), the destination node automatically restarts the migration from the beginning. The destination node will retry the operation until it is explicitly cancelled using the CLUSTER MIGRATION CANCEL command.
- If a replica connection drops during migration, it can later resume with PSYNC, since the imported slot data is also written to the replication backlog.
- During the write pause phase, the source node sets a timeout. If the destination node fails to drain the remaining replication data and update the config during that time, the source node assumes the destination has failed and automatically resumes normal writes for the migrating slots.
- On any error, the destination node triggers a trim operation to discard any partially imported slot data.
- If a node crashes during an import, unowned keys are deleted on startup.
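The two source-side decisions described above (when to pause writes for the handoff, and when to give up on a write pause) can be sketched in C. This is only an illustration under stated assumptions, not the code in this PR: the struct and function names are hypothetical, and whether the lag comparison is strict is also an assumption. The thresholds correspond to `cluster-slot-migration-handoff-max-lag-bytes` and `cluster-slot-migration-write-pause-timeout`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical view of the source node's progress for one ASM task.
 * These names are illustrative and do not appear in the Redis source. */
typedef struct {
    uint64_t sent_offset;   /* bytes of the slot-range stream written so far */
    uint64_t acked_offset;  /* last offset reported by CLUSTER SYNCSLOTS ACK */
    bool snapshot_done;     /* the fork child finished delivering the snapshot */
} sketch_source_progress;

/* Returns true once the source may pause writes and start the handoff:
 * the snapshot is fully delivered and the unacknowledged part of the
 * stream is within max_lag_bytes (assumed to be a non-strict comparison). */
static bool sketch_can_start_handoff(const sketch_source_progress *p,
                                     uint64_t max_lag_bytes) {
    assert(p->acked_offset <= p->sent_offset);
    if (!p->snapshot_done) return false;
    return (p->sent_offset - p->acked_offset) <= max_lag_bytes;
}

/* Returns true when the write pause has lasted at least timeout_ms, in
 * which case the source treats the handoff as failed and resumes writes. */
static bool sketch_write_pause_expired(uint64_t pause_start_ms,
                                       uint64_t now_ms,
                                       uint64_t timeout_ms) {
    return (now_ms - pause_start_ms) >= timeout_ms;
}
```

A caller would evaluate `sketch_can_start_handoff()` each time an ACK arrives, and `sketch_write_pause_expired()` periodically while writes are paused.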
### Slot Snapshot Format Considerations

When the source node forks to deliver slot content, in theory, there are several possible formats for transmitting the snapshot data:

**Mini RDB**: A compact RDB file containing only the keys from the migrating slots. This format is efficient for transmission, but it cannot be easily forwarded to destination-side replicas.

**AOF format**: The source node can generate commands in AOF form (e.g., SET x y, HSET h f v) and stream them. Individual commands are easily appended to the replication stream and propagated to replicas. Large keys can also be split into multiple commands (incrementally reconstructing the value), similar to the AOF rewrite process.

**RESTORE commands**: Each key is serialized and sent as a `RESTORE` command. These can be appended directly to the destination’s replication stream, though very large keys may make serialization and transmission less efficient.

We chose the `RESTORE` command as the default approach for the following reasons:
- It can be easily propagated to replicas.
- It is more efficient than AOF for most cases, and some module keys do not support the AOF format.
- For large **non-module** keys that are not strings, ASM automatically switches to the AOF-based key encoding as an optimization when the key’s cardinality exceeds 512. This approach allows the key to be transferred in chunks rather than as a single large payload, reducing memory pressure and improving migration efficiency. In future versions, the RESTORE command may be enhanced to handle large keys more efficiently.

Some details:
- For RESTORE commands, Redis compresses keys by default. We disable compression while delivering RESTORE commands, as compression comes with a performance hit. Without compression, replication is several times faster.
- For string keys, we still prefer the AOF format (e.g. SET commands), as it is currently more efficient than RESTORE, especially for big keys.
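The selection rules in this section can be condensed into a small decision function. The sketch below is an illustration only: the enum and function names are hypothetical, and the coarse three-way key classification is an assumption made for brevity. The 512 threshold and the special handling of string and module keys come from the description above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical coarse key classification, for illustration only. */
typedef enum {
    SKETCH_KEY_STRING,      /* plain string value */
    SKETCH_KEY_COLLECTION,  /* non-module, non-string key (has a cardinality) */
    SKETCH_KEY_MODULE       /* module-defined data type */
} sketch_key_type;

typedef enum {
    SKETCH_ENC_RESTORE,       /* one serialized RESTORE command per key */
    SKETCH_ENC_AOF_COMMANDS   /* AOF-style commands, possibly several per key */
} sketch_encoding;

/* Cardinality above which large non-module keys are sent in chunks. */
#define SKETCH_AOF_CARDINALITY_THRESHOLD 512

static sketch_encoding sketch_pick_encoding(sketch_key_type type,
                                            size_t cardinality) {
    assert(type == SKETCH_KEY_STRING || type == SKETCH_KEY_COLLECTION ||
           type == SKETCH_KEY_MODULE);
    /* Strings are sent as SET commands, which the PR describes as cheaper
     * than RESTORE for this type. */
    if (type == SKETCH_KEY_STRING) return SKETCH_ENC_AOF_COMMANDS;
    /* Chunked AOF delivery currently applies only to non-module keys. */
    if (type == SKETCH_KEY_MODULE) return SKETCH_ENC_RESTORE;
    /* Large collections are streamed as multiple commands. */
    if (cardinality > SKETCH_AOF_CARDINALITY_THRESHOLD)
        return SKETCH_ENC_AOF_COMMANDS;
    return SKETCH_ENC_RESTORE;
}
```

Under these assumptions, a hash with exactly 512 fields would still travel as a single RESTORE payload, while one with 513 fields would be sent as AOF-style commands.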
### Trimming the keys

When a migration completes successfully, the source node deletes the migrated keys from its local database. Since the migrated slots may contain a large number of keys, this trimming process must be efficient and non-blocking.

In cluster mode, Redis maintains per-slot data structures for keys, expires, and subexpires. This organization makes it possible to efficiently detach all data associated with a given slot in a single step. During trimming, these slot-specific data structures are handed off to a background I/O (BIO) thread for asynchronous cleanup, similar to how FLUSHALL or FLUSHDB operate. This mechanism is referred to as background trimming, and it is the preferred and default method for ASM, ensuring that the main thread remains unblocked.

However, unlike Redis itself, some modules may not maintain per-slot data structures and therefore cannot drop the related slots' data in a single operation. To support these cases, Redis introduces active trimming, where key deletion occurs in the main thread instead. This is not a blocking operation: trimming runs incrementally in the main thread, periodically removing keys during the cron loop. Each deletion triggers a keyspace notification so that modules can react to individual key removals. While active trim is less efficient, it ensures backward compatibility for modules during the transition period.

Before starting the trim, Redis checks whether any module is subscribed to the newly added `REDISMODULE_NOTIFY_KEY_TRIMMED` keyspace event. If such subscribers exist, active trimming is used; otherwise, background trimming is triggered. Going forward, modules are expected to adopt background trimming to take advantage of its performance and scalability benefits, and active trimming will be phased out once modules migrate to the new model.
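The choice between the two trimming methods can be pictured as a small predicate. This is a sketch under assumptions, not the actual implementation: the struct, enum and function names are hypothetical. It also folds in the client tracking condition that this section describes in the following paragraph.

```c
#include <assert.h>

typedef enum { SKETCH_TRIM_BACKGROUND, SKETCH_TRIM_ACTIVE } sketch_trim_method;

/* Hypothetical inputs to the decision, for illustration only. */
typedef struct {
    int key_trimmed_subscribers; /* modules subscribed to REDISMODULE_NOTIFY_KEY_TRIMMED */
    int tracking_clients;        /* clients that enabled client tracking */
} sketch_trim_inputs;

static sketch_trim_method sketch_pick_trim_method(const sketch_trim_inputs *in) {
    assert(in->key_trimmed_subscribers >= 0 && in->tracking_clients >= 0);
    /* Modules that want per-key notifications need keys deleted one by one. */
    if (in->key_trimmed_subscribers > 0) return SKETCH_TRIM_ACTIVE;
    /* Tracking clients cannot be told that only some slots were flushed. */
    if (in->tracking_clients > 0) return SKETCH_TRIM_ACTIVE;
    /* Default: detach the per-slot structures and free them in a BIO thread. */
    return SKETCH_TRIM_BACKGROUND;
}
```

With no subscribers and no tracking clients the sketch selects background trimming, which matches the default described above.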
Redis also prefers active trimming if any client is using the client tracking feature (see [client-side caching](https://redis.io/docs/latest/develop/reference/client-side-caching/)). In the current client tracking protocol, when a database is flushed (e.g., via the FLUSHDB command), a null value is sent to tracking clients to indicate that they should invalidate all locally cached keys. However, there is currently no mechanism to signal that only specific slots have been flushed. Iterating over all keys in the slots to be trimmed would be a blocking operation. To avoid this, if any client is using the client tracking feature, Redis automatically switches to active trimming mode. In the future, the client tracking protocol can be extended to support slot-based invalidation, allowing background trimming to be used in these cases as well.

Finally, trimming may also be triggered after a migration failure. In such cases, the operation ensures that any partially imported or inconsistent slot data is cleaned up, maintaining cluster consistency and preventing stale keys from remaining in the source or destination nodes.

Note about active trim: Subsequent migrations can complete while a prior trim is still running. In that case, the new migration’s trim job is queued and will start automatically after the current trim finishes. This does not affect slot ownership or client traffic; it only serializes the cleanup.

### Replica handling

- During an import, new keys are propagated to the destination-side replica. The replica checks slot ownership before replying to commands like SCAN, KEYS and DBSIZE, so that these unowned keys are not included in the reply. Also, when an import operation begins, the master now propagates an internal command through the replication stream, allowing replicas to recognize that an ASM operation is in progress. This is done by the internal `CLUSTER SYNCSLOTS CONF ASM-TASK` command in the replication stream.
This enables replicas to trigger the relevant module events so that modules can adapt their behavior, for example by filtering out unowned keys from read-only requests during ASM operations. To support full sync with RDB delivery scenarios, a new AUX field is also added to the RDB: `cluster-asm-task`. Its value is a string in the format of `task_id:source_node:dest_node:operation:state:slot_ranges`.
- After a successful migration or on a failed import, the master will trim the keys. In that case, the master propagates a new command to the replica: `TRIMSLOTS RANGES ... `. The replica starts trimming once this command is received.

### Propagating data outside the keyspace

When the destination node is newly added to the cluster, certain data outside the keyspace may need to be propagated first. A common example is functions. Previously, redis-cli handled this by transferring functions when a new node was added. With ASM, Redis now automatically dumps and sends functions to the destination node using the `FUNCTION RESTORE ..REPLACE` command. This is done purely for convenience, to simplify setup.

Additionally, modules may also need to propagate their own data outside the keyspace. To support this, a new API has been introduced: `RM_ClusterPropagateForSlotMigration()`. See the [Module Support](#module-support) section for implementation details.

### Limitations

1. Single migration at a time: Only one ASM migration operation is allowed at a time. This limitation simplifies the current design but can be lifted in the future.
2. Large key handling: For large keys, ASM switches to AOF encoding to deliver key data in chunks. This mechanism currently applies only to non-module keys. In the future, the RESTORE command may be extended to support chunked delivery, providing a unified solution for all key types. See [Slot Snapshot Format Considerations](#slot-snapshot-format-considerations) for details.
3. There are several cases that may cause an Atomic Slot Migration (ASM) to be aborted (it can be retried afterwards):
   - FLUSHALL / FLUSHDB: These commands introduce complexity during ASM. For example, if executed on the migrating node, they must be propagated only for the migrating slots. However, when combined with active trimming, their execution may need to be deferred until it is safe to proceed, adding further complexity to the process.
   - FAILOVER: The replica cannot resume the migration process. Migration should start from the beginning.
   - Module propagates a cross-slot command during ASM via RM_Replicate(): If this occurs on the migrating node, Redis cannot split the command to propagate only the relevant slots to the ASM destination. To keep the logic simple and consistent, ASM is cancelled in this case. Modules should avoid propagating cross-slot commands during migration.
   - CLIENT PAUSE: The import task cannot progress during a write pause, as doing so would violate the guarantee that no writes occur during migration. To keep things simple, the ASM task is aborted when CLIENT PAUSE is active.
   - Manual Slot Configuration Changes: If slot configuration is modified manually during ASM (for example, when legacy migration methods are mixed with ASM), the process is aborted. Note: This situation is highly unexpected; users should not combine ASM with legacy migration methods.
4. When active trimming is enabled, a node must not re-import the same slots while trimming for those slots is still in progress. Otherwise, it can’t distinguish newly imported keys from pre-existing ones, and the trim cron might delete the incoming keys by mistake. In this state, the node rejects the IMPORT operation for those slots until trimming completes. If the master has finished trimming but a replica is still trimming, the master may still start the import operation for those slots.
So, the replica checks whether the master is sending commands for those slots; if so, it blocks the master’s client connection until trimming finishes. This is a corner case, but we believe the behavior is reasonable for now. In the worst case, the master may drop the replica (e.g., buffer overrun), triggering a new full sync.

# API Changes

## New Commands

### Public commands

1. **Syntax:** `CLUSTER MIGRATION IMPORT [ ]...`

   **Args:** Slot ranges

   **Reply:**
   - String task ID
   - -ERR on failure (e.g. invalid slot range)

   **Description:** Executes on the destination master. Accepts multiple slot ranges and triggers atomic migration for the specified ranges. Returns a task ID that can be used to monitor the status of the task. In the CLUSTER MIGRATION STATUS output, the “state” field will be `completed` on a successful operation.

2. **Syntax:** `CLUSTER MIGRATION CANCEL [ID | ALL]`

   **Args:** Task ID or ALL

   **Reply:** Number of cancelled tasks

   **Description:** Cancels an ongoing migration task by its ID, or cancels all tasks if ALL is specified. Note: Cancelling a task on the source node does not stop the migration on the destination node, which will continue retrying until it is also cancelled there.

3. **Syntax:** `CLUSTER MIGRATION STATUS [ID | ALL]`

   **Args:** Task ID or ALL
   - **ID:** If provided, returns the status of the specified migration task.
   - **ALL:** Lists the status of all migration tasks.

   **Reply:**
   - A list of migration task details (both ongoing and completed ones).
   - Empty list if the given task ID does not exist.

   **Description:** Displays the status of all current and completed atomic slot migration tasks. If a specific task ID is provided, it returns detailed information for that task only.
   **Sample output:**

   ```
   127.0.0.1:5001> cluster migration status all
   1)  1) "id"
       2) "24cf41718b20f7f05901743dffc40bc9b15db339"
       3) "slots"
       4) "0-1000"
       5) "source"
       6) "1098d90d9ba2d1f12965442daf501ef0b6667bec"
       7) "dest"
       8) "b3b5b426e7ea6166d1548b2a26e1d5adeb1213ac"
       9) "operation"
      10) "migrate"
      11) "state"
      12) "completed"
      13) "last_error"
      14) ""
      15) "retries"
      16) "0"
      17) "create_time"
      18) "1759694528449"
      19) "start_time"
      20) "1759694528449"
      21) "end_time"
      22) "1759694528464"
      23) "write_pause_ms"
      24) "10"
   ```

### Internal commands

1. **Syntax:** `CLUSTER SYNCSLOTS ...`

   **Args:** Internal messaging operations

   **Reply:** +OK or -ERR on failure (e.g. invalid slot range)

   **Description:** Used for internal communication between source and destination nodes, e.g. handshaking, establishing multiple channels, triggering handoff.

2. **Syntax:** `TRIMSLOTS RANGES ...`

   **Args:** Slot ranges to trim

   **Reply:** +OK

   **Description:** The master propagates it to the replica so that the replica can trim unowned keys after a successful migration or on a failed import.

## New configs

- `cluster-slot-migration-max-archived-tasks`: Redis keeps the last n migration tasks in memory so that they can be listed in the `CLUSTER MIGRATION STATUS ALL` output. This config controls the maximum number of archived ASM tasks. Default value: 32, used as a hidden config
- `cluster-slot-migration-handoff-max-lag-bytes`: After the slot snapshot is completed, if the remaining replication stream size falls below this threshold, the source node pauses writes to hand off slot ownership. A higher value may trigger the handoff earlier but can lead to a longer write pause, since more data remains to be replicated. A lower value can result in a shorter write pause, but it may be harder to reach the threshold if there is a steady flow of incoming writes. Default value: 1MB
- `cluster-slot-migration-write-pause-timeout`: The maximum duration (in milliseconds) that the source node pauses writes during ASM handoff.
After pausing writes, if the destination node fails to take over the slots within this timeout (for example, due to a cluster configuration update failure), the source node assumes the migration has failed and resumes writes to prevent indefinite blocking. Default value: 10000 milliseconds (10 seconds)
- `cluster-slot-migration-sync-buffer-drain-timeout`: Timeout in milliseconds for the sync buffer to be drained during ASM. After the destination applies the accumulated buffer, the source continues sending commands for the migrating slots. The destination keeps applying them, but if the gap remains above the acceptable limit (see `cluster-slot-migration-handoff-max-lag-bytes`), synchronization may never finish. A timeout check is required to handle this case. The timeout is calculated as **the maximum of two values**:
  - A configurable timeout (`cluster-slot-migration-sync-buffer-drain-timeout`) to avoid false positives.
  - A dynamic timeout based on the time that the destination took to apply the slot snapshot and the accumulated buffer during slot snapshot delivery. The destination should be able to drain the remaining sync buffer in less time than this. We multiply it by 2 to be more conservative.

  Default value: 60000 milliseconds, used as a hidden config

## New flags in CLIENT LIST

- The client responsible for importing slots is marked with the `o` flag.
- The client responsible for migrating slots is marked with the `g` flag.

## New INFO fields

- `mem_cluster_slot_migration_output_buffer`: Memory usage of the migration client’s output buffer. Redis writes incoming changes to this buffer during the migration process.
- `mem_cluster_slot_migration_input_buffer`: Memory usage of the accumulated replication stream buffer on the importing node.
- `mem_cluster_slot_migration_input_buffer_peak`: Peak size of the accumulated replication stream buffer on the importing node.

## New CLUSTER INFO fields

- `cluster_slot_migration_active_tasks`: Number of in-progress ASM tasks. Currently, it will be 1 or 0.
- `cluster_slot_migration_active_trim_running`: Number of active trim jobs in progress and scheduled.
- `cluster_slot_migration_active_trim_current_job_keys`: Number of keys scheduled for deletion in the current trim job.
- `cluster_slot_migration_active_trim_current_job_trimmed`: Number of keys already deleted in the current trim job.
- `cluster_slot_migration_stats_active_trim_started`: Total number of trim jobs that have started since the process began.
- `cluster_slot_migration_stats_active_trim_completed`: Total number of trim jobs completed since the process began.
- `cluster_slot_migration_stats_active_trim_cancelled`: Total number of trim jobs cancelled since the process began.

## Changes in RDB format

A new aux field is added to RDB: `cluster-asm-task`. When an import operation begins, the master now propagates an internal command through the replication stream, allowing replicas to recognize that an ASM operation is in progress. This enables replicas to trigger the relevant module events so that modules can adapt their behavior, for example by filtering out unowned keys from read-only requests during ASM operations. To support RDB delivery scenarios, a new field is added to the RDB. See [replica handling](#replica-handling).

## Bug fix

- Fix memory leak when processing forgetting node type message
- Fix data race of writing reply to replica client directly when enabling multi-threading

We don't plan to backport them to old versions, since they are very rare cases.
## Keys visibility

When performing atomic slot migration, during key importing on the destination node or key trimming on the source/destination, these keys will be filtered out in the following commands:
- KEYS
- SCAN
- RANDOMKEY
- CLUSTER GETKEYSINSLOT
- DBSIZE
- CLUSTER COUNTKEYSINSLOT

The only command that will reflect the increasing number of keys is:
- INFO KEYSPACE

## Module Support

**NOTE:** Please read the [trimming](#trimming-the-keys) section to see how ASM decides on the trimming method when modules are in use.

### New notification:

```c
#define REDISMODULE_NOTIFY_KEY_TRIMMED (1<<17)
```

When a key is deleted by the active trim operation, this notification will be sent to subscribed modules. Also, ASM will automatically choose the trimming method depending on whether there are any subscribers to this new event. Please see further details here: [trimming](#trimming-the-keys)

### New struct in the API:

```c
typedef struct RedisModuleSlotRange {
    uint16_t start;
    uint16_t end;
} RedisModuleSlotRange;

typedef struct RedisModuleSlotRangeArray {
    int32_t num_ranges;
    RedisModuleSlotRange ranges[];
} RedisModuleSlotRangeArray;
```

### New Events

#### 1. REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION (RedisModuleEvent_ClusterSlotMigration)

These events notify modules about different stages of Atomic Slot Migration (ASM) operations, such as when an import or migration starts, fails, or completes. Modules can use these notifications to track cluster slot movements or perform custom logic during ASM transitions.
```c #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED 0 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED 1 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED 2 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED 3 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED 4 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED 5 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE 6 ``` Parameter to these events: ```c typedef struct RedisModuleClusterSlotMigrationInfo { uint64_t version; /* Not used since this structure is never passed from the module to the core right now. Here for future compatibility. */ char source_node_id[REDISMODULE_NODE_ID_LEN + 1]; char destination_node_id[REDISMODULE_NODE_ID_LEN + 1]; const char *task_id; RedisModuleSlotRangeArray* slots; } RedisModuleClusterSlotMigrationInfoV1; #define RedisModuleClusterSlotMigrationInfo RedisModuleClusterSlotMigrationInfoV1 ``` #### 2. REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM (RedisModuleEvent_ClusterSlotMigrationTrim) These events inform modules about the lifecycle of ASM key trimming operations. Modules can use them to detect when trimming starts, completes, or is performed asynchronously in the background. ```c #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED 0 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED 1 #define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND 2 ``` Parameter to these events: ```c typedef struct RedisModuleClusterSlotMigrationTrimInfo { uint64_t version; /* Not used since this structure is never passed from the module to the core right now. Here for future compatibility. 
*/
    RedisModuleSlotRangeArray* slots;
} RedisModuleClusterSlotMigrationTrimInfoV1;

#define RedisModuleClusterSlotMigrationTrimInfo RedisModuleClusterSlotMigrationTrimInfoV1
```

### New functions

```c
/* Returns 1 if keys in the specified slot can be accessed by this node, 0 otherwise.
 *
 * This function returns 1 in the following cases:
 * - The slot is owned by this node or by its master if this node is a replica
 * - The slot is being imported under the old slot migration approach (CLUSTER SETSLOT IMPORTING ..)
 * - Not in cluster mode (all slots are accessible)
 *
 * Returns 0 for:
 * - Invalid slot numbers (< 0 or >= 16384)
 * - Slots owned by other nodes
 */
int RM_ClusterCanAccessKeysInSlot(int slot);

/* Propagate commands along with slot migration.
 *
 * This function allows modules to add commands that will be sent to the
 * destination node before the actual slot migration begins. It should only be
 * called during the REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event.
 *
 * This function can be called multiple times within the same event to
 * replicate multiple commands. All commands will be sent before the
 * actual slot data migration begins.
 *
 * Note: This function is only available in the fork child process just before
 * slot snapshot delivery begins.
 *
 * On success REDISMODULE_OK is returned, otherwise
 * REDISMODULE_ERR is returned and errno is set to the following values:
 *
 * * EINVAL: function arguments or format specifiers are invalid.
 * * EBADF: not called in the correct context, e.g. not called in the REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event.
 * * ENOENT: command does not exist.
 * * ENOTSUP: command is cross-slot.
 * * ERANGE: command contains keys that are not within the migrating slot range.
 */
int RM_ClusterPropagateForSlotMigration(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...);

/* Returns the locally owned slot ranges for the node.
 *
 * An optional `ctx` can be provided to enable auto-memory management.
 * If cluster mode is disabled, the array will include all slots (0–16383).
 * If the node is a replica, the slot ranges of its master are returned.
 *
 * The returned array must be freed with RM_ClusterFreeSlotRanges().
 */
RedisModuleSlotRangeArray *RM_ClusterGetLocalSlotRanges(RedisModuleCtx *ctx);

/* Frees a slot range array returned by RM_ClusterGetLocalSlotRanges().
 * Pass the `ctx` pointer only if the array was created with a context. */
void RM_ClusterFreeSlotRanges(RedisModuleCtx *ctx, RedisModuleSlotRangeArray *slots);
```

## ASM API for alternative cluster implementations

Following https://github.com/redis/redis/pull/12742, Redis cluster code was restructured to support alternative cluster implementations. Redis uses cluster_legacy.c implementation by default. This PR adds a generic ASM API so alternative implementations can initiate and coordinate Atomic Slot Migration (ASM) while Redis executes the data movement and emits state changes.

Documentation is in `cluster.h`. There are two new functions:

```c
/* Called by cluster implementation to request an ASM operation. (cluster impl --> redis) */
int clusterAsmProcess(const char *task_id, int event, void *arg, char **err);

/* Called when an ASM event occurs to notify the cluster implementation. (redis --> cluster impl) */
int clusterAsmOnEvent(const char *task_id, int event, void *arg);
```

```c
/* API for alternative cluster implementations to start and coordinate
 * Atomic Slot Migration (ASM).
 *
 * These two functions drive ASM for alternative cluster implementations.
 * - clusterAsmProcess(...) impl -> redis: initiates/advances/cancels ASM operations
 * - clusterAsmOnEvent(...) redis -> impl: notifies state changes
 *
 * Generic steps for an alternative implementation:
 * - On destination side, implementation calls clusterAsmProcess(ASM_EVENT_IMPORT_START)
 *   to start an import operation.
 * - Redis calls clusterAsmOnEvent() when an ASM event occurs.
 * - On the source side, Redis will call clusterAsmOnEvent(ASM_EVENT_HANDOFF_PREP)
 *   when slots are ready to be handed off and the write pause is needed.
 * - Implementation stops the traffic to the slots and calls clusterAsmProcess(ASM_EVENT_HANDOFF)
 * - On the destination side, Redis calls clusterAsmOnEvent(ASM_EVENT_TAKEOVER)
 *   when destination node is ready to take over the slot, waiting for ownership change.
 * - Cluster implementation updates the config and calls clusterAsmProcess(ASM_EVENT_DONE)
 *   to notify Redis that the slots ownership has changed.
 *
 * Sequence diagram for import:
 *   - Note: shows only the events that cluster implementation needs to react.
 *
 * ┌───────────────┐              ┌───────────────┐         ┌───────────────┐             ┌───────────────┐
 * │  Destination  │              │  Destination  │         │    Source     │             │    Source     │
 * │ Cluster impl  │              │    Master     │         │    Master     │             │ Cluster impl  │
 * └───────┬───────┘              └───────┬───────┘         └───────┬───────┘             └───────┬───────┘
 *         │                              │                         │                             │
 *         │    ASM_EVENT_IMPORT_START    │                         │                             │
 *         ├─────────────────────────────►│                         │                             │
 *         │                              │    CLUSTER SYNCSLOTS    │                             │
 *         │                              ├────────────────────────►│                             │
 *         │                              │                         │                             │
 *         │                              │ SNAPSHOT(restore cmds)  │                             │
 *         │                              │◄────────────────────────┤                             │
 *         │                              │       Repl stream       │                             │
 *         │                              │◄────────────────────────┤                             │
 *         │                              │                         │   ASM_EVENT_HANDOFF_PREP    │
 *         │                              │                         ├────────────────────────────►│
 *         │                              │                         │      ASM_EVENT_HANDOFF      │
 *         │                              │                         │◄────────────────────────────┤
 *         │                              │    Drain repl stream    │                             │
 *         │                              │◄────────────────────────┤                             │
 *         │      ASM_EVENT_TAKEOVER      │                         │                             │
 *         │◄─────────────────────────────┤                         │                             │
 *         │                              │                         │                             │
 *         │        ASM_EVENT_DONE        │                         │                             │
 *         ├─────────────────────────────►│                         │       ASM_EVENT_DONE        │
 *         │                              │                         │◄────────────────────────────┤
 *         │                              │                         │                             │
 */

#define ASM_EVENT_IMPORT_START 1 /* Start a new import operation (destination side) */
#define ASM_EVENT_CANCEL 2 /* Cancel an ongoing import/migrate operation (source and destination side) */
#define ASM_EVENT_HANDOFF_PREP 3 /* Slot is ready to be handed off to the destination shard (source side) */
#define ASM_EVENT_HANDOFF 4 /* Notify that the slot can be handed off (source side) */
#define ASM_EVENT_TAKEOVER 5 /* Ready to take over the slot, waiting for config change (destination side) */
#define ASM_EVENT_DONE 6 /* Notify that import/migrate is completed, config is updated (source and destination side) */

#define ASM_EVENT_IMPORT_PREP 7 /* Import is about to start, the implementation may reject by returning C_ERR */
#define ASM_EVENT_IMPORT_STARTED 8 /* Import started */
#define ASM_EVENT_IMPORT_FAILED 9 /* Import failed */
#define ASM_EVENT_IMPORT_COMPLETED 10 /* Import completed (config updated) */
#define ASM_EVENT_MIGRATE_PREP 11 /* Migrate is about to start, the implementation may reject by returning C_ERR */
#define ASM_EVENT_MIGRATE_STARTED 12 /* Migrate started */
#define ASM_EVENT_MIGRATE_FAILED 13 /* Migrate failed */
#define ASM_EVENT_MIGRATE_COMPLETED 14 /* Migrate completed (config updated) */
```

------

Co-authored-by: Yuan Wang

---------

Co-authored-by: Yuan Wang
---
 redis.conf | 35 +
 src/Makefile | 2 +-
 src/acl.c | 2 +-
 src/aof.c | 80 +-
 src/blocked.c | 17 +-
 src/cluster.c | 476 ++-
 src/cluster.h | 168 +
 src/cluster_asm.c | 3467 ++++++++++++++++++
 src/cluster_asm.h | 56 +
 src/cluster_legacy.c | 191 +-
 src/commands.def | 143 +
 src/commands/cluster-migration.json | 141 +
 src/commands/cluster-syncslots.json | 117 +
 src/commands/trimslots.json | 48 +
 src/config.c | 4 +
 src/db.c | 155 +-
 src/debug.c | 20 +
 src/estore.c | 20 +
 src/estore.h | 2 +
 src/evict.c | 33 +-
 src/expire.c | 25 +-
 src/functions.c | 28 +-
 src/functions.h | 2 +
 src/iothread.c | 7 +-
 src/kvstore.c | 91 +-
 src/kvstore.h | 7 +-
 src/lazyfree.c | 9 +-
 src/module.c | 192 +-
 src/multi.c | 9 +-
 src/networking.c | 67 +-
 src/object.c | 7 +
 src/rdb.c | 39 +-
 src/redismodule.h | 72 +-
 src/replication.c | 381 +-
 src/rio.c | 1 +
 src/server.c | 46 +-
 src/server.h | 61 +-
 src/t_hash.c | 18 +-
 tests/cluster/cluster.tcl | 20 +-
 tests/modules/Makefile | 3 +-
 tests/modules/atomicslotmigration.c | 523 +++
tests/support/cluster_util.tcl | 23 +- tests/test_helper.tcl | 4 + tests/unit/cluster/atomic-slot-migration.tcl | 2863 +++++++++++++++ tests/unit/cluster/slot-stats.tcl | 46 + 45 files changed, 9333 insertions(+), 388 deletions(-) create mode 100644 src/cluster_asm.c create mode 100644 src/cluster_asm.h create mode 100644 src/commands/cluster-migration.json create mode 100644 src/commands/cluster-syncslots.json create mode 100644 src/commands/trimslots.json create mode 100644 tests/modules/atomicslotmigration.c create mode 100644 tests/unit/cluster/atomic-slot-migration.tcl diff --git a/redis.conf b/redis.conf index 5d2b27ffb..7229d5c27 100644 --- a/redis.conf +++ b/redis.conf @@ -1827,6 +1827,41 @@ aof-timestamp-enabled no # # cluster-slot-stats-enabled no +# Slot migration write pause timeout controls how long the source node will +# pause write operations during slot migration handoff phase. This usually +# finishes in a few milliseconds, depending on traffic and load. When the source +# node pauses writes to allow the destination to catch up and take the ownership +# of the slots, this timeout prevents writes from being blocked indefinitely. +# +# If the destination node fails to complete the slot ownership takeover within +# this timeout, the source node will resume accepting writes and assume the +# migration task is failed. This prevents the source node from being permanently +# blocked if the destination node becomes unresponsive or fails during migration. +# +# If this timeout is set too low, the source may resume writes and assume that +# the slot migration has failed while the destination is still in the process of +# draining the replication stream and publishing the configuration update. +# During this window, writes accepted by the source will not be replicated to +# the destination; if the destination later publishes the updated config and +# takes ownership, those writes could be lost. Therefore, avoid setting this +# timeout too low. 
+# +# This timeout is specified in milliseconds. +# +# cluster-slot-migration-write-pause-timeout 10000 + +# This config controls the maximum acceptable lag in bytes between source and +# destination nodes during slot migration before triggering the slot handoff +# phase. If the remaining replication stream size falls below this threshold, +# the source node pauses writes and then signals destination that it can take +# over the slot ownership after draining the remaining replication stream. +# +# A smaller value means potentially shorter write pause duration, but it may +# take longer for the destination to catch up. A larger value means handoff can +# be triggered earlier, but the write pause may potentially be longer. +# +# cluster-slot-migration-handoff-max-lag-bytes 1mb + # In order to setup your cluster make sure to read the documentation # available at https://redis.io web site. diff --git a/src/Makefile b/src/Makefile index 99536485c..0589b7141 100644 --- a/src/Makefile +++ b/src/Makefile @@ -382,7 +382,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o 
lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o fwtree.o estore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_asm.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/acl.c b/src/acl.c index b9f81bcc8..6bd3f0ee4 100644 --- a/src/acl.c +++ b/src/acl.c @@ -3203,7 +3203,7 @@ void addReplyCommandCategories(client *c, struct redisCommand *cmd) { /* When successful, initiates an internal connection, that is able to execute * internal commands (see CMD_INTERNAL). 
*/ static void internalAuth(client *c) { - if (server.cluster == NULL) { + if (!server.cluster_enabled) { addReplyError(c, "Cannot authenticate as an internal connection on non-cluster instances"); return; } diff --git a/src/aof.c b/src/aof.c index 8a9be94b6..94a28775b 100644 --- a/src/aof.c +++ b/src/aof.c @@ -11,6 +11,7 @@ #include "bio.h" #include "rio.h" #include "functions.h" +#include "cluster_asm.h" #include #include @@ -2384,11 +2385,48 @@ werr: return 0; } +int rewriteObject(rio *r, robj *key, robj *o, int dbid, long long expiretime) { + /* Save the key and associated value */ + if (o->type == OBJ_STRING) { + /* Emit a SET command */ + static const char cmd[]="*3\r\n$3\r\nSET\r\n"; + if (rioWrite(r,cmd,sizeof(cmd)-1) == 0) return C_ERR; + /* Key and value */ + if (rioWriteBulkObject(r,key) == 0) return C_ERR; + if (rioWriteBulkObject(r,o) == 0) return C_ERR; + } else if (o->type == OBJ_LIST) { + if (rewriteListObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_SET) { + if (rewriteSetObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_ZSET) { + if (rewriteSortedSetObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_HASH) { + if (rewriteHashObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_STREAM) { + if (rewriteStreamObject(r,key,o) == 0) return C_ERR; + } else if (o->type == OBJ_MODULE) { + if (rewriteModuleObject(r,key,o,dbid) == 0) return C_ERR; + } else { + serverPanic("Unknown object type"); + } + + /* Save the expire time */ + if (expiretime != -1) { + static const char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; + if (rioWrite(r,cmd,sizeof(cmd)-1) == 0) return C_ERR; + if (rioWriteBulkObject(r,key) == 0) return C_ERR; + if (rioWriteBulkLongLong(r,expiretime) == 0) return C_ERR; + } + + return C_OK; +} + int rewriteAppendOnlyFileRio(rio *aof) { dictEntry *de; int j; long key_count = 0; long long updated_time = 0; + unsigned long long skipped = 0; kvstoreIterator *kvs_it = NULL; /* Record timestamp at the 
beginning of rewriting AOF. */ @@ -2420,34 +2458,21 @@ int rewriteAppendOnlyFileRio(rio *aof) { /* Get the expire time */ expiretime = kvobjGetExpire(o); + + /* Skip keys that are being trimmed */ + if (server.cluster_enabled) { + int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it); + if (isSlotInTrimJob(curr_slot)) { + skipped++; + continue; + } + } /* Set on stack string object for key */ robj key; initStaticStringObject(key, kvobjGetKey(o)); - /* Save the key and associated value */ - if (o->type == OBJ_STRING) { - /* Emit a SET command */ - char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - /* Key and value */ - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkObject(aof,o) == 0) goto werr; - } else if (o->type == OBJ_LIST) { - if (rewriteListObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_SET) { - if (rewriteSetObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_STREAM) { - if (rewriteStreamObject(aof,&key,o) == 0) goto werr; - } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr; - } else { - serverPanic("Unknown object type"); - } + if (rewriteObject(aof, &key, o, j, expiretime) == C_ERR) goto werr; /* In fork child process, we can try to release memory back to the * OS and possibly avoid or decrease COW. 
We give the dismiss @@ -2455,14 +2480,6 @@ int rewriteAppendOnlyFileRio(rio *aof) { size_t dump_size = aof->processed_bytes - aof_bytes_before_key; if (server.in_fork_child) dismissObject(o, dump_size); - /* Save the expire time */ - if (expiretime != -1) { - char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; - } - /* Update info every 1 second (approximately). * in order to avoid calling mstime() on each iteration, we will * check the diff every 1024 keys */ @@ -2480,6 +2497,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { } kvstoreIteratorRelease(kvs_it); } + serverLog(LL_NOTICE, "AOF rewrite done, %ld keys saved, %llu keys skipped.", key_count, skipped); return C_OK; werr: diff --git a/src/blocked.c b/src/blocked.c index a1c8702a4..ee5a36514 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -76,7 +76,8 @@ void blockClient(client *c, int btype) { serverAssert(!(c->flags & CLIENT_MASTER && btype != BLOCKED_MODULE && btype != BLOCKED_LAZYFREE && - btype != BLOCKED_POSTPONE)); + btype != BLOCKED_POSTPONE && + btype != BLOCKED_POSTPONE_TRIM)); c->flags |= CLIENT_BLOCKED; c->bstate.btype = btype; @@ -191,7 +192,7 @@ void unblockClient(client *c, int queue_for_reprocessing) { } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); - } else if (c->bstate.btype == BLOCKED_POSTPONE) { + } else if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) { listDelNode(server.postponed_clients,c->postponed_list_node); c->postponed_list_node = NULL; } else if (c->bstate.btype == BLOCKED_SHUTDOWN) { @@ -293,7 +294,7 @@ void disconnectAllBlockedClients(void) { * command processing will start from scratch, and the command will * be either executed or rejected. 
(unlike LIST blocked clients for * which the command is already in progress in a way. */ - if (c->bstate.btype == BLOCKED_POSTPONE) + if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) continue; if (c->bstate.btype == BLOCKED_LAZYFREE) { @@ -639,15 +640,21 @@ void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numloca /* Postpone client from executing a command. For example the server might be busy * requesting to avoid processing clients commands which will be processed later * when the it is ready to accept them. */ -void blockPostponeClient(client *c) { +void blockPostponeClientWithType(client *c, int btype) { + serverAssert(btype == BLOCKED_POSTPONE || btype == BLOCKED_POSTPONE_TRIM); c->bstate.timeout = 0; - blockClient(c,BLOCKED_POSTPONE); + blockClient(c, btype); listAddNodeTail(server.postponed_clients, c); c->postponed_list_node = listLast(server.postponed_clients); /* Mark this client to execute its command */ c->flags |= CLIENT_PENDING_COMMAND; } +/* Postpone client from executing a command. */ +void blockPostponeClient(client *c) { + blockPostponeClientWithType(c, BLOCKED_POSTPONE); +} + /* Block client due to shutdown command */ void blockClientShutdown(client *c) { blockClient(c, BLOCKED_SHUTDOWN); diff --git a/src/cluster.c b/src/cluster.c index 2f861b5c2..330907b60 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -20,6 +20,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_asm.h" #include "cluster_slot_stats.h" #include @@ -279,7 +280,7 @@ void restoreCommand(client *c) { objectSetLRUOrLFU(kv, lfu_freq, lru_idle, lru_clock, 1000); signalModifiedKey(c,c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); - + /* If we deleted a key that means REPLACE parameter was passed and the * destination key existed. 
*/ if (deleted) { @@ -1016,6 +1017,11 @@ void clusterCommand(client *c) { addReplyError(c,"Invalid slot"); return; } + + if (!clusterCanAccessKeysInSlot(slot)) { + addReplyLongLong(c, 0); + return; + } addReplyLongLong(c,countKeysInSlot(slot)); } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) { /* CLUSTER GETKEYSINSLOT */ @@ -1031,6 +1037,11 @@ void clusterCommand(client *c) { return; } + if (!clusterCanAccessKeysInSlot(slot)) { + addReplyArrayLen(c, 0); + return; + } + unsigned int keys_in_slot = countKeysInSlot(slot); unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; addReplyArrayLen(c,numkeys); @@ -1588,14 +1599,374 @@ void readonlyCommand(client *c) { addReply(c,shared.ok); } -void replySlotsFlushAndFree(client *c, SlotsFlush *sflush) { - addReplyArrayLen(c, sflush->numRanges); - for (int i = 0 ; i < sflush->numRanges ; i++) { - addReplyArrayLen(c, 2); - addReplyLongLong(c, sflush->ranges[i].first); - addReplyLongLong(c, sflush->ranges[i].last); +/* Remove all the keys in the specified hash slot. + * The number of removed items is returned. */ +unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) { + unsigned int j = 0; + + if (!kvstoreDictSize(server.db->keys, (int) hashslot)) + return 0; + + kvstoreDictIterator *kvs_di = NULL; + dictEntry *de = NULL; + kvs_di = kvstoreGetDictSafeIterator(server.db->keys, (int) hashslot); + while((de = kvstoreDictIteratorNext(kvs_di)) != NULL) { + enterExecutionUnit(1, 0); + sds sdskey = kvobjGetKey(dictGetKV(de)); + robj *key = createStringObject(sdskey, sdslen(sdskey)); + dbDelete(&server.db[0], key); + + signalModifiedKey(NULL, &server.db[0], key); + if (by_command) { + /* Keys are deleted by a command (trimslots), we need to notify the + * keyspace event. Though, we don't need to propagate the DEL + * command, as the command (trimslots) will be propagated. 
*/ + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); + } else { + /* Propagate the DEL command */ + propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); + /* The keys are not actually logically deleted from the database, + * just moved to another node. The modules needs to know that these + * keys are no longer available locally, so just send the keyspace + * notification to the modules, but not to clients. */ + moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); + } + exitExecutionUnit(); + postExecutionUnitOperations(); + decrRefCount(key); + j++; + server.dirty++; } - zfree(sflush); + kvstoreReleaseDictIterator(kvs_di); + return j; +} + +/* Delete the keys in the slot ranges. Returns the number of deleted items */ +unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command) { + unsigned int j = 0; + for (int i = 0; i < slots->num_ranges; i++) { + for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) { + j += clusterDelKeysInSlot(slot, by_command); + } + } + return j; +} + +int clusterIsMySlot(int slot) { + return getMyClusterNode() == getNodeBySlot(slot); +} + +void replySlotsFlushAndFree(client *c, slotRangeArray *slots) { + addReplyArrayLen(c, slots->num_ranges); + for (int i = 0 ; i < slots->num_ranges ; i++) { + addReplyArrayLen(c, 2); + addReplyLongLong(c, slots->ranges[i].start); + addReplyLongLong(c, slots->ranges[i].end); + } + slotRangeArrayFree(slots); +} + +/* Checks that slot ranges are well-formed and non-overlapping. 
*/ +int validateSlotRanges(slotRangeArray *slots, sds *err) { + unsigned char used_slots[CLUSTER_SLOTS] = {0}; + + if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) { + *err = sdscatprintf(sdsempty(), "invalid number of slot ranges: %d", slots->num_ranges); + return C_ERR; + } + + for (int i = 0; i < slots->num_ranges; i++) { + if (slots->ranges[i].start >= CLUSTER_SLOTS || + slots->ranges[i].end >= CLUSTER_SLOTS) + { + *err = sdscatprintf(sdsempty(), "slot range is out of range: %d-%d", + slots->ranges[i].start, slots->ranges[i].end); + return C_ERR; + } + + if (slots->ranges[i].start > slots->ranges[i].end) { + *err = sdscatprintf(sdsempty(), "start slot number %d is greater than end slot number %d", + slots->ranges[i].start, slots->ranges[i].end); + return C_ERR; + } + + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + if (used_slots[j]) { + *err = sdscatprintf(sdsempty(), "Slot %d specified multiple times", j); + return C_ERR; + } + used_slots[j]++; + } + } + return C_OK; +} + +/* Create a slot range array with the specified number of ranges. */ +slotRangeArray *slotRangeArrayCreate(int num_ranges) { + slotRangeArray *slots = zcalloc(sizeof(slotRangeArray) + num_ranges * sizeof(slotRange)); + slots->num_ranges = num_ranges; + return slots; +} + +/* Duplicate the slot range array. */ +slotRangeArray *slotRangeArrayDup(slotRangeArray *slots) { + slotRangeArray *dup = slotRangeArrayCreate(slots->num_ranges); + memcpy(dup->ranges, slots->ranges, sizeof(slotRange) * slots->num_ranges); + return dup; +} + +/* Set the slot range at the specified index. */ +void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) { + slots->ranges[idx].start = start; + slots->ranges[idx].end = end; +} + +/* Create a slot range string in the format of: "1000-2000 3000-4000 ..." 
*/ +sds slotRangeArrayToString(slotRangeArray *slots) { + sds s = sdsempty(); + + for (int i = 0; i < slots->num_ranges; i++) { + slotRange *sr = &slots->ranges[i]; + s = sdscatprintf(s, "%d-%d ", sr->start, sr->end); + } + sdssetlen(s, sdslen(s) - 1); + s[sdslen(s)] = '\0'; + + return s; +} + +/* Parse a slot range string in the format "1000-2000 3000-4000 ..." into a slotRangeArray. + * Returns a new slotRangeArray on success, NULL on failure. */ +slotRangeArray *slotRangeArrayFromString(sds data) { + int num_ranges; + long long start, end; + slotRangeArray *slots = NULL; + if (!data || sdslen(data) == 0) return NULL; + + sds *parts = sdssplitlen(data, sdslen(data), " ", 1, &num_ranges); + if (num_ranges <= 0) goto err; + + slots = slotRangeArrayCreate(num_ranges); + + /* Parse each slot range */ + for (int i = 0; i < num_ranges; i++) { + char *dash = strchr(parts[i], '-'); + if (!dash) goto err; + + if (string2ll(parts[i], dash - parts[i], &start) == 0 || + string2ll(dash + 1, sdslen(parts[i]) - (dash - parts[i]) - 1, &end) == 0) + goto err; + slotRangeArraySet(slots, i, start, end); + } + + /* Validate all ranges */ + sds err_msg = NULL; + if (validateSlotRanges(slots, &err_msg) != C_OK) { + if (err_msg) sdsfree(err_msg); + goto err; + } + sdsfreesplitres(parts, num_ranges); + return slots; + +err: + if (slots) slotRangeArrayFree(slots); + sdsfreesplitres(parts, num_ranges); + return NULL; +} + +static int compareSlotRange(const void *a, const void *b) { + const slotRange *sa = a; + const slotRange *sb = b; + if (sa->start < sb->start) return -1; + if (sa->start > sb->start) return 1; + return 0; +} + +/* Compare two slot range arrays, return 1 if equal, 0 otherwise */ +int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) { + if (slots1->num_ranges != slots2->num_ranges) return 0; + + /* Sort slot ranges first */ + qsort(slots1->ranges, slots1->num_ranges, sizeof(slotRange), compareSlotRange); + qsort(slots2->ranges, slots2->num_ranges, 
sizeof(slotRange), compareSlotRange); + + for (int i = 0; i < slots1->num_ranges; i++) { + if (slots1->ranges[i].start != slots2->ranges[i].start || + slots1->ranges[i].end != slots2->ranges[i].end) { + return 0; + } + } + return 1; +} + +/* Add a slot to the slot range array. + * Usage: + * slotRangeArray *slots = NULL + * slots = slotRangeArrayAppend(slots, 1000); + * slots = slotRangeArrayAppend(slots, 1001); + * slots = slotRangeArrayAppend(slots, 1003); + * slots = slotRangeArrayAppend(slots, 1004); + * slots = slotRangeArrayAppend(slots, 1005); + * + * Result: 1000-1001, 1003-1005 + * Note: `slot` must be greater than the previous slot. + * */ +slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot) { + if (slots == NULL) { + slots = slotRangeArrayCreate(4); + slots->ranges[0].start = slot; + slots->ranges[0].end = slot; + slots->num_ranges = 1; + return slots; + } + + serverAssert(slots->num_ranges >= 0 && slots->num_ranges <= CLUSTER_SLOTS); + serverAssert(slot > slots->ranges[slots->num_ranges - 1].end); + + /* Check if we can extend the last range */ + slotRange *last = &slots->ranges[slots->num_ranges - 1]; + if (slot == last->end + 1) { + last->end = slot; + return slots; + } + + /* Calculate current capacity and reallocate if needed */ + int cap = (int) ((zmalloc_size(slots) - sizeof(slotRangeArray)) / sizeof(slotRange)); + if (slots->num_ranges >= cap) + slots = zrealloc(slots, sizeof(slotRangeArray) + sizeof(slotRange) * cap * 2); + + /* Add new single-slot range */ + slots->ranges[slots->num_ranges].start = slot; + slots->ranges[slots->num_ranges].end = slot; + slots->num_ranges++; + + return slots; +} + +/* Returns 1 if the slot range array contains the given slot, 0 otherwise. */ +int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot) { + for (int i = 0; i < slots->num_ranges; i++) + if (slots->ranges[i].start <= slot && slots->ranges[i].end >= slot) + return 1; + return 0; +} + +/* Free the slot range array. 
*/ +void slotRangeArrayFree(slotRangeArray *slots) { + zfree(slots); +} + +/* Generic version of slotRangeArrayFree(). */ +void slotRangeArrayFreeGeneric(void *slots) { + slotRangeArrayFree(slots); +} + +/* Slot range array iterator */ +slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots) { + slotRangeArrayIter *it = zmalloc(sizeof(*it)); + it->slots = slots; + it->range_index = 0; + it->cur_slot = slots->num_ranges > 0 ? slots->ranges[0].start : -1; + return it; +} + +/* Returns the next slot in the array, or -1 if there are no more slots. */ +int slotRangeArrayNext(slotRangeArrayIter *it) { + if (it->range_index >= it->slots->num_ranges) return -1; + + if (it->cur_slot < it->slots->ranges[it->range_index].end) { + it->cur_slot++; + } else { + it->range_index++; + if (it->range_index < it->slots->num_ranges) + it->cur_slot = it->slots->ranges[it->range_index].start; + else + it->cur_slot = -1; /* finished */ + } + return it->cur_slot; +} + +int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it) { + return it->cur_slot; +} + +void slotRangeArrayIteratorFree(slotRangeArrayIter *it) { + zfree(it); +} + +/* Parse slot ranges from the command arguments. Returns NULL on error. 
*/ +slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { + int start, end, count; + slotRangeArray *slots; + + serverAssert(pos <= argc); + serverAssert((argc - pos) % 2 == 0); + + count = (argc - pos) / 2; + slots = slotRangeArrayCreate(count); + slots->num_ranges = 0; + + for (int j = pos; j < argc; j += 2) { + if ((start = getSlotOrReply(c, c->argv[j])) == -1 || + (end = getSlotOrReply(c, c->argv[j + 1])) == -1) + { + slotRangeArrayFree(slots); + return NULL; + } + slotRangeArraySet(slots, slots->num_ranges, start, end); + slots->num_ranges++; + } + + sds err = NULL; + if (validateSlotRanges(slots, &err) != C_OK) { + addReplyErrorSds(c, err); + slotRangeArrayFree(slots); + return NULL; + } + return slots; +} + +/* Return 1 if the keys in the slot can be accessed, 0 otherwise. */ +int clusterCanAccessKeysInSlot(int slot) { + /* If not in cluster mode, all keys are accessible */ + if (server.cluster_enabled == 0) return 1; + + /* If the slot is being imported under old slot migration approach, we should + * allow to list keys from the slot as previously. */ + if (getImportingSlotSource(slot)) return 1; + + /* If using atomic slot migration, check if the slot belongs to the current + * node or its master, return 1 if so. */ + clusterNode *myself = getMyClusterNode(); + if (clusterNodeIsSlave(myself)) { + clusterNode *master = clusterNodeGetMaster(myself); + if (master && clusterNodeCoversSlot(master, slot)) + return 1; + } else { + if (clusterNodeCoversSlot(myself, slot)) + return 1; + } + return 0; +} + +/* Return the slot ranges that belong to the current node or its master. 
*/ +slotRangeArray *clusterGetLocalSlotRanges(void) { + slotRangeArray *slots = NULL; + + if (!server.cluster_enabled) { + slots = slotRangeArrayCreate(1); + slotRangeArraySet(slots, 0, 0, CLUSTER_SLOTS - 1); + return slots; + } + + clusterNode *master = clusterNodeGetMaster(getMyClusterNode()); + if (master) { + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (clusterNodeCoversSlot(master, i)) + slots = slotRangeArrayAppend(slots, i); + } + } + return slots ? slots : slotRangeArrayCreate(0); } /* Partially flush destination DB in a cluster node, based on the slot range. @@ -1635,77 +2006,44 @@ void sflushCommand(client *c) { return; } - /* Verify slot pairs are valid and not overlapping */ - long long j, first, last; - unsigned char slotsToFlushRq[CLUSTER_SLOTS] = {0}; - for (j = 1; j < argc; j += 2) { - /* check if the first slot is valid */ - if (getLongLongFromObject(c->argv[j], &first) != C_OK || first < 0 || first >= CLUSTER_SLOTS) { - addReplyError(c,"Invalid or out of range slot"); - return; - } + /* Parse slot ranges from the command arguments. */ + slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1); + if (!slots) return; - /* check if the last slot is valid */ - if (getLongLongFromObject(c->argv[j+1], &last) != C_OK || last < 0 || last >= CLUSTER_SLOTS) { - addReplyError(c,"Invalid or out of range slot"); - return; - } - - if (first > last) { - addReplyErrorFormat(c,"start slot number %lld is greater than end slot number %lld", first, last); - return; - } - - /* Mark the slots in slotsToFlushRq[] */ - for (int i = first; i <= last; i++) { - if (slotsToFlushRq[i]) { - addReplyErrorFormat(c, "Slot %d specified multiple times", i); - return; + /* Iterate and find the slot ranges that belong to this node. Save them in + * a new slotRangeArray. 
It is allocated on heap since there is a chance + * that FLUSH SYNC will be running as blocking ASYNC and only later reply + * with slot ranges */ + unsigned char slots_to_flush[CLUSTER_SLOTS] = {0}; /* Requested slots to flush */ + slotRangeArray *myslots = NULL; + for (int i = 0; i < slots->num_ranges; i++) { + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + if (clusterIsMySlot(j)) { + myslots = slotRangeArrayAppend(myslots, j); + slots_to_flush[j] = 1; } - slotsToFlushRq[i] = 1; } } - /* Verify slotsToFlushRq[] covers ALL slots of myNode. */ - clusterNode *myNode = getMyClusterNode(); - /* During iteration trace also the slot range pairs and save in SlotsFlush. - * It is allocated on heap since there is a chance that FLUSH SYNC will be - * running as blocking ASYNC and only later reply with slot ranges */ - int capacity = 32; /* Initial capacity */ - SlotsFlush *sflush = zmalloc(sizeof(SlotsFlush) + sizeof(SlotRange) * capacity); - sflush->numRanges = 0; - int inSlotRange = 0; + /* Verify that all slots of mynode got covered. See sflushCommand() comment. */ + int all_slots_covered = 1; for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (myNode == getNodeBySlot(i)) { - if (!slotsToFlushRq[i]) { - addReplySetLen(c, 0); /* Not all slots of mynode got covered. See sflushCommand() comment. 
*/ - zfree(sflush); - return; - } - - if (!inSlotRange) { /* If start another slot range */ - sflush->ranges[sflush->numRanges].first = i; - inSlotRange = 1; - } - } else { - if (inSlotRange) { /* If end another slot range */ - sflush->ranges[sflush->numRanges++].last = i - 1; - inSlotRange = 0; - /* If reached 'sflush' capacity, double the capacity */ - if (sflush->numRanges >= capacity) { - capacity *= 2; - sflush = zrealloc(sflush, sizeof(SlotsFlush) + sizeof(SlotRange) * capacity); - } - } + if (clusterIsMySlot(i) && !slots_to_flush[i]) { + all_slots_covered = 0; + break; } } + if (myslots == NULL || !all_slots_covered) { + addReplyArrayLen(c, 0); + slotRangeArrayFree(slots); + slotRangeArrayFree(myslots); + return; + } + slotRangeArrayFree(slots); - /* Update last pair if last cluster slot is also end of last range */ - if (inSlotRange) sflush->ranges[sflush->numRanges++].last = CLUSTER_SLOTS - 1; - /* Flush selected slots. If not flush as blocking async, then reply immediately */ - if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, sflush) == 0) - replySlotsFlushAndFree(c, sflush); + if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) + replySlotsFlushAndFree(c, myslots); } /* The READWRITE command just clears the READONLY command state. 
*/ diff --git a/src/cluster.h b/src/cluster.h index dc99f8dc9..246cb5948 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -153,6 +153,9 @@ clusterNode *clusterLookupNode(const char *name, int length); const char *clusterGetSecret(size_t *len); unsigned int countKeysInSlot(unsigned int slot); int getSlotOrReply(client *c, robj *o); +int clusterIsMySlot(int slot); +int clusterCanAccessKeysInSlot(int slot); +struct slotRangeArray *clusterGetLocalSlotRanges(void); /* functions with shared implementations */ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code); @@ -160,11 +163,44 @@ int clusterRedirectBlockedClientIfNeeded(client *c); void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code); void migrateCloseTimedoutSockets(void); int patternHashSlot(char *pattern, int length); +int getSlotOrReply(client *c, robj *o); int isValidAuxString(char *s, unsigned int length); void migrateCommand(client *c); void clusterCommand(client *c); ConnectionType *connTypeOfCluster(void); +typedef struct slotRange { + unsigned short start, end; +} slotRange; +typedef struct slotRangeArray { + int num_ranges; + slotRange ranges[]; +} slotRangeArray; +typedef struct slotRangeArrayIter { + slotRangeArray *slots; /* the array we’re iterating */ + int range_index; /* current range index */ + int cur_slot; /* current slot within the range */ +} slotRangeArrayIter; +slotRangeArray *slotRangeArrayCreate(int num_ranges); +slotRangeArray *slotRangeArrayDup(slotRangeArray *slots); +void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end); +sds slotRangeArrayToString(slotRangeArray *slots); +slotRangeArray *slotRangeArrayFromString(sds data); +int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2); +slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot); +int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot); +void 
slotRangeArrayFree(slotRangeArray *slots); +void slotRangeArrayFreeGeneric(void *slots); +slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots); +int slotRangeArrayNext(slotRangeArrayIter *it); +int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it); +void slotRangeArrayIteratorFree(slotRangeArrayIter *it); +int validateSlotRanges(slotRangeArray *slots, sds *err); +slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos); + +unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command); +unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command); + void clusterGenNodesSlotsInfo(int filter); void clusterFreeNodesSlotsInfo(clusterNode *n); int clusterNodeSlotInfoCount(clusterNode *n); @@ -184,4 +220,136 @@ clusterNode *clusterShardNodeFirst(void *shard); int clusterNodeTcpPort(clusterNode *node); int clusterNodeTlsPort(clusterNode *node); + +/* API for alternative cluster implementations to start and coordinate + * Atomic Slot Migration (ASM). + * + * These two functions drive ASM for alternative cluster implementations. + * - clusterAsmProcess(...) impl -> redis: initiates/advances/cancels ASM operations + * - clusterAsmOnEvent(...) redis -> impl: notifies state changes + * + * Generic steps for an alternative implementation: + * - On destination side, implementation calls clusterAsmProcess(ASM_EVENT_IMPORT_START) + * to start an import operation. + * - Redis calls clusterAsmOnEvent() when an ASM event occurs. + * - On the source side, Redis will call clusterAsmOnEvent(ASM_EVENT_HANDOFF_PREP) + * when slots are ready to be handed off and the write pause is needed. + * - Implementation stops the traffic to the slots and calls clusterAsmProcess(ASM_EVENT_HANDOFF) + * - On the destination side, Redis calls clusterAsmOnEvent(ASM_EVENT_TAKEOVER) + * when destination node is ready to take over the slot, waiting for ownership change. 
+ * - Cluster implementation updates the config and calls clusterAsmProcess(ASM_EVENT_DONE) + * to notify Redis that the slots ownership has changed. + * + * Sequence diagram for import: + * - Note: shows only the events that cluster implementation needs to react. + * + * ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ + * │ Destination │ │ Destination │ │ Source │ │ Source │ + * │ Cluster impl │ │ Master │ │ Master │ │ Cluster impl │ + * └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ + * │ │ │ │ + * │ ASM_EVENT_IMPORT_START │ │ │ + * ├─────────────────────────────►│ │ │ + * │ │ CLUSTER SYNCSLOTS │ │ + * │ ├────────────────────────►│ │ + * │ │ │ │ + * │ │ SNAPSHOT(restore cmds) │ │ + * │ │◄────────────────────────┤ │ + * │ │ Repl stream │ │ + * │ │◄────────────────────────┤ │ + * │ │ │ ASM_EVENT_HANDOFF_PREP │ + * │ │ ├────────────────────────────►│ + * │ │ │ ASM_EVENT_HANDOFF │ + * │ │ │◄────────────────────────────┤ + * │ │ Drain repl stream │ │ + * │ │◄────────────────────────┤ │ + * │ ASM_EVENT_TAKEOVER │ │ │ + * │◄─────────────────────────────┤ │ │ + * │ │ │ │ + * │ ASM_EVENT_DONE │ │ │ + * ├─────────────────────────────►│ │ ASM_EVENT_DONE │ + * │ │ │◄────────────────────────────┤ + * │ │ │ │ + */ + +#define ASM_EVENT_IMPORT_START 1 /* Start a new import operation (destination side) */ +#define ASM_EVENT_CANCEL 2 /* Cancel an ongoing import/migrate operation (source and destination side) */ +#define ASM_EVENT_HANDOFF_PREP 3 /* Slot is ready to be handed off to the destination shard (source side) */ +#define ASM_EVENT_HANDOFF 4 /* Notify that the slot can be handed off (source side) */ +#define ASM_EVENT_TAKEOVER 5 /* Ready to take over the slot, waiting for config change (destination side) */ +#define ASM_EVENT_DONE 6 /* Notify that import/migrate is completed, config is updated (source and destination side) */ + +#define ASM_EVENT_IMPORT_PREP 7 /* Import is about to start, the implementation may reject by 
returning C_ERR */ +#define ASM_EVENT_IMPORT_STARTED 8 /* Import started */ +#define ASM_EVENT_IMPORT_FAILED 9 /* Import failed */ +#define ASM_EVENT_IMPORT_COMPLETED 10 /* Import completed (config updated) */ +#define ASM_EVENT_MIGRATE_PREP 11 /* Migrate is about to start, the implementation may reject by returning C_ERR */ +#define ASM_EVENT_MIGRATE_STARTED 12 /* Migrate started */ +#define ASM_EVENT_MIGRATE_FAILED 13 /* Migrate failed */ +#define ASM_EVENT_MIGRATE_COMPLETED 14 /* Migrate completed (config updated) */ + + +/* Called by cluster implementation to request an ASM operation. (cluster impl --> redis) + * Valid values for 'event': + * ASM_EVENT_IMPORT_START + * ASM_EVENT_CANCEL + * ASM_EVENT_HANDOFF + * ASM_EVENT_DONE + * + * For ASM_EVENT_IMPORT_START, 'task_id' should be a unique string. + * For other events (ASM_EVENT_CANCEL, ASM_EVENT_HANDOFF, ASM_EVENT_DONE), + * 'task_id' should match the ID from the corresponding import operation. + * Usage: + * char *task_id = zmalloc(CLUSTER_NAMELEN + 1); + * getRandomHexChars(task_id, CLUSTER_NAMELEN); + * task_id[CLUSTER_NAMELEN] = '\0'; + * + * slotRangeArray *slots = slotRangeArrayCreate(1); + * slotRangeArraySet(slots, 0, 0, 1000); + * + * char *err = NULL; + * int ret = clusterAsmProcess(task_id, ASM_EVENT_IMPORT_START, slots, &err); + * zfree(task_id); + * slotRangeArrayFree(slots); + * + * if (ret != C_OK) { + * serverLog(LL_WARNING, "%s", err); + * return; + * } + * + * For ASM_EVENT_CANCEL, if `task_id` is NULL, all tasks will be cancelled. + * If `arg` parameter is provided, it should be a pointer to an int. It will be + * set to the number of tasks cancelled. + * + * Return value: + * - Returns C_OK on success, C_ERR on failure and 'err' will be set to the + * error message. + * + * Memory management: + * - There is no ownership transfer of 'task_id', 'err' or `slotRangeArray`. + * - `task_id` and `slotRangeArray` should be allocated and freed by the + * caller. Redis internally will make a copy of these.
+ * - `err` is allocated by Redis and should NOT be freed by the caller. + **/ +int clusterAsmProcess(const char *task_id, int event, void *arg, char **err); + +/* Called when an ASM event occurs to notify the cluster implementation. (redis --> cluster impl) + * + * `arg` will point to a `slotRangeArray` for the following events: + * ASM_EVENT_IMPORT_PREP + * ASM_EVENT_IMPORT_STARTED + * ASM_EVENT_MIGRATE_PREP + * ASM_EVENT_MIGRATE_STARTED + * ASM_EVENT_HANDOFF_PREP + * + * Memory management: + * - Redis owns the `task_id` and `slotRangeArray`. + * + * Returns C_OK on success. + * + * If the cluster implementation returns C_ERR for ASM_EVENT_IMPORT_PREP or + * ASM_EVENT_MIGRATE_PREP, operation will not start. + **/ +int clusterAsmOnEvent(const char *task_id, int event, void *arg); + #endif /* __CLUSTER_H */ diff --git a/src/cluster_asm.c b/src/cluster_asm.c new file mode 100644 index 000000000..9abd85d3f --- /dev/null +++ b/src/cluster_asm.c @@ -0,0 +1,3467 @@ +/* + * Copyright (c) 2025-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). 
+ */ + +#include "server.h" +#include "cluster.h" +#include "functions.h" +#include "cluster_legacy.h" +#include "cluster_asm.h" +#include "cluster_slot_stats.h" + +#define ASM_IMPORT (1 << 1) +#define ASM_MIGRATE (1 << 2) + +#define ASM_DEBUG_TRIM_DEFAULT 0 +#define ASM_DEBUG_TRIM_NONE 1 +#define ASM_DEBUG_TRIM_BG 2 +#define ASM_DEBUG_TRIM_ACTIVE 3 + +#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */ + +typedef struct asmTask { + sds id; /* Task ID */ + int operation; /* Either ASM_IMPORT or ASM_MIGRATE */ + slotRangeArray *slots; /* List of slot ranges for this migration task */ + int state; /* Current state of the task */ + int dest_state; /* Destination node's main state (approximate) */ + char source[CLUSTER_NAMELEN]; /* Source node name */ + char dest[CLUSTER_NAMELEN]; /* Destination node name */ + clusterNode *source_node; /* Source node */ + connection *main_channel_conn; /* Main channel connection */ + connection *rdb_channel_conn; /* RDB channel connection */ + int rdb_channel_state; /* State of the RDB channel */ + unsigned long long dest_offset; /* Destination offset */ + unsigned long long source_offset; /* Source offset */ + int cross_slot_during_propagating; /* If cross-slot commands are encountered during propagating */ + int stream_eof_during_streaming; /* If STREAM-EOF is received during streaming buffer */ + replDataBuf sync_buffer; /* Buffer for the stream */ + client *main_channel_client; /* Client for the main channel on the source side */ + client *rdb_channel_client; /* Client for the RDB channel on the source side */ + long long retry_count; /* Number of retries for this task */ + mstime_t create_time; /* Task creation time */ + mstime_t start_time; /* Task start time */ + mstime_t end_time; /* Task end time */ + mstime_t paused_time; /* The time when the slot writes were paused */ + mstime_t dest_slots_snapshot_time; /* The time when the destination starts applying the slot snapshot */ + 
mstime_t dest_accum_applied_time; /* The time when the destination finishes applying the accumulated buffer */ + sds error; /* Error message for this task */ + redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */ +} asmTask; + +struct asmManager { + list *tasks; /* List of asmTask to be processed */ + list *archived_tasks; /* List of archived asmTask */ + list *pending_trim_jobs; /* List of pending trim jobs (due to write pause) */ + list *active_trim_jobs; /* List of active trim jobs */ + slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */ + size_t sync_buffer_peak; /* Peak size of sync buffer */ + asmTask *master_task; /* The task that is currently active on the master */ + + /* Fail point injection for debugging */ + int debug_failed_channel; /* Channel where the task failed */ + int debug_failed_state; /* State where the task failed */ + int debug_trim_method; /* Method to trim the buffer */ + int debug_active_trim_delay; /* Sleep before trimming each key */ + + /* Active trim stats */ + unsigned long long active_trim_started; /* Number of times active trim was started */ + unsigned long long active_trim_completed; /* Number of times active trim was completed */ + unsigned long long active_trim_cancelled; /* Number of times active trim was cancelled */ + unsigned long long active_trim_current_job_keys; /* Total number of keys to trim in the current job */ + unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed in the current job */ +}; + +enum asmState { + /* Common state */ + ASM_NONE = 0, + ASM_CONNECTING, + ASM_AUTH_REPLY, + ASM_CANCELED, + ASM_FAILED, + ASM_COMPLETED, + + /* Import state */ + ASM_SEND_HANDSHAKE, + ASM_HANDSHAKE_REPLY, + ASM_SEND_SYNCSLOTS, + ASM_SYNCSLOTS_REPLY, + ASM_INIT_RDBCHANNEL, + ASM_ACCUMULATE_BUF, + ASM_READY_TO_STREAM, + ASM_STREAMING_BUF, + ASM_WAIT_STREAM_EOF, + ASM_TAKEOVER, + + /* Migrate state */ + 
ASM_WAIT_RDBCHANNEL, + ASM_WAIT_BGSAVE_START, + ASM_SEND_BULK_AND_STREAM, + ASM_SEND_STREAM, + ASM_HANDOFF_PREP, + ASM_HANDOFF, + ASM_STREAM_EOF, + + /* RDB channel state */ + ASM_RDBCHANNEL_REQUEST, + ASM_RDBCHANNEL_REPLY, + ASM_RDBCHANNEL_TRANSFER, +}; + +enum asmChannel { + ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */ + ASM_IMPORT_RDB_CHANNEL, /* RDB channel for the import task */ + ASM_MIGRATE_MAIN_CHANNEL, /* Main channel for the migrate task */ + ASM_MIGRATE_RDB_CHANNEL /* RDB channel for the migrate task */ +}; + +/* Global ASM manager */ +struct asmManager *asmManager = NULL; + +/* replication.c */ +char *sendCommand(connection *conn, ...); +char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens); +char *receiveSynchronousResponse(connection *conn); +ConnectionType *connTypeOfReplication(void); +int startBgsaveForReplication(int mincapa, int req); +void createReplicationBacklogIfNeeded(void); +/* cluster.c */ +void createDumpPayload(rio *payload, robj *o, robj *key, int dbid); +/* cluster_asm.c */ +static void asmStartImportTask(asmTask *task); +static void asmTaskCancel(asmTask *task, const char *reason); +static void asmSyncBufferReadFromConn(connection *conn); +static void propagateTrimSlots(slotRangeArray *slots); +void asmTrimJobSchedule(slotRangeArray *slots); +void asmTrimJobProcessPending(void); +void asmTriggerActiveTrim(slotRangeArray *slots); +void asmActiveTrimEnd(void); +int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); +void asmTrimSlotsIfNotOwned(slotRangeArray *slots); +void asmNotifyStateChange(asmTask *task, int event); + +void asmInit(void) { + asmManager = zcalloc(sizeof(*asmManager)); + asmManager->tasks = listCreate(); + asmManager->archived_tasks = listCreate(); + asmManager->pending_trim_jobs = listCreate(); + asmManager->sync_buffer_peak = 0; + asmManager->master_task = NULL; + asmManager->debug_failed_channel = 0; + asmManager->debug_failed_state = 0; + 
asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + asmManager->debug_active_trim_delay = 0; + asmManager->active_trim_jobs = listCreate(); + asmManager->active_trim_started = 0; + asmManager->active_trim_completed = 0; + asmManager->active_trim_cancelled = 0; + listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); +} + +char *asmTaskStateToString(int state) { + switch (state) { + case ASM_NONE: return "none"; + case ASM_CONNECTING: return "connecting"; + case ASM_AUTH_REPLY: return "auth-reply"; + case ASM_CANCELED: return "canceled"; + case ASM_FAILED: return "failed"; + case ASM_COMPLETED: return "completed"; + + /* Import state */ + case ASM_SEND_HANDSHAKE: return "send-handshake"; + case ASM_HANDSHAKE_REPLY: return "handshake-reply"; + case ASM_SEND_SYNCSLOTS: return "send-syncslots"; + case ASM_SYNCSLOTS_REPLY: return "syncslots-reply"; + case ASM_INIT_RDBCHANNEL: return "init-rdbchannel"; + case ASM_ACCUMULATE_BUF: return "accumulate-buffer"; + case ASM_READY_TO_STREAM: return "ready-to-stream"; + case ASM_STREAMING_BUF: return "streaming-buffer"; + case ASM_WAIT_STREAM_EOF: return "wait-stream-eof"; + case ASM_TAKEOVER: return "takeover"; + + /* Migrate state */ + case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel"; + case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start"; + case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream"; + case ASM_SEND_STREAM: return "send-stream"; + case ASM_HANDOFF_PREP: return "handoff-prep"; + case ASM_HANDOFF: return "handoff"; + case ASM_STREAM_EOF: return "stream-eof"; + + /* RDB channel state */ + case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request"; + case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply"; + case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer"; + + default: return "unknown"; + } + serverAssert(0); /* Unreachable */ +} + +const char *asmChannelToString(int channel) { + switch (channel) { + case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel"; + case 
ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel"; + case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel"; + case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel"; + default: return "unknown"; + } +} + +int asmDebugSetFailPoint(char * channel, char *state) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + asmManager->debug_failed_channel = 0; + asmManager->debug_failed_state = 0; + if (!channel && !state) return C_ERR; + if (sdslen(channel) == 0 && sdslen(state) == 0) { + serverLog(LL_WARNING, "ASM fail point is cleared"); + return C_OK; + } + + for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) { + if (!strcasecmp(channel, asmChannelToString(i))) { + asmManager->debug_failed_channel = i; + break; + } + } + if (asmManager->debug_failed_channel == 0) return C_ERR; + + for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) { + if (!strcasecmp(state, asmTaskStateToString(i))) { + asmManager->debug_failed_state = i; + break; + } + } + if (asmManager->debug_failed_state == 0) return C_ERR; + + serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state); + return C_OK; +} + +int asmDebugSetTrimMethod(const char *method, int active_trim_delay) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + int prev = asmManager->debug_trim_method; + if (!strcasecmp(method, "default")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + else if (!strcasecmp(method, "none")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE; + else if (!strcasecmp(method, "bg")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG; + else if (!strcasecmp(method, "active")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE; + else return C_ERR; + + /* If we are switching from none to default, delete all the keys in the + * slots we don't own */ + if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != 
ASM_DEBUG_TRIM_NONE) { + for (int i = 0; i < CLUSTER_SLOTS; i++) + if (!clusterIsMySlot(i)) + clusterDelKeysInSlot(i, 0); + } + asmManager->debug_active_trim_delay = active_trim_delay; + serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay); + return C_OK; +} + +int asmDebugIsFailPointActive(int channel, int state) { + if (!asmManager) return 0; /* ASM manager not initialized */ + if (asmManager->debug_failed_channel == channel && asmManager->debug_failed_state == state) { + serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s", + asmChannelToString(channel), asmTaskStateToString(state)); + return 1; + } + return 0; +} + +sds asmCatInfoString(sds info) { + int active_tasks = 0; + + listIter li; + listNode *ln; + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (task->operation == ASM_IMPORT || + (task->operation == ASM_MIGRATE && task->state != ASM_FAILED)) + { + active_tasks++; + } + } + + return sdscatprintf(info ? 
info : sdsempty(), + "cluster_slot_migration_active_tasks:%d\r\n" + "cluster_slot_migration_active_trim_running:%lu\r\n" + "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n" + "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_started:%llu\r\n" + "cluster_slot_migration_stats_active_trim_completed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n", + active_tasks, + listLength(asmManager->active_trim_jobs), + asmManager->active_trim_current_job_keys, + asmManager->active_trim_current_job_trimmed, + asmManager->active_trim_started, + asmManager->active_trim_completed, + asmManager->active_trim_cancelled); +} + +void asmTaskReset(asmTask *task) { + task->state = ASM_NONE; + task->dest_state = ASM_NONE; + task->rdb_channel_state = ASM_NONE; + task->main_channel_conn = NULL; + task->rdb_channel_conn = NULL; + task->dest_offset = 0; + task->source_offset = 0; + task->stream_eof_during_streaming = 0; + task->cross_slot_during_propagating = 0; + replDataBufInit(&task->sync_buffer); + task->main_channel_client = NULL; + task->rdb_channel_client = NULL; + task->paused_time = 0; + task->dest_slots_snapshot_time = 0; + task->dest_accum_applied_time = 0; + task->pre_snapshot_module_cmds = NULL; +} + +asmTask *asmTaskCreate(const char *task_id) { + asmTask *task = zcalloc(sizeof(*task)); + task->error = sdsempty(); + asmTaskReset(task); + task->slots = NULL; + task->source_node = NULL; + task->retry_count = 0; + task->create_time = server.mstime; + task->start_time = -1; + task->end_time = -1; + if (task_id) { + task->id = sdsnew(task_id); + } else { + task->id = sdsnewlen(NULL, CLUSTER_NAMELEN); + getRandomHexChars(task->id, CLUSTER_NAMELEN); + } + + return task; +} + +void asmTaskFree(asmTask *task) { + replDataBufClear(&task->sync_buffer); + sdsfree(task->id); + slotRangeArrayFree(task->slots); + sdsfree(task->error); + zfree(task); +} + +/* Convert the task state to the 
corresponding event. */ +int asmTaskStateToEvent(asmTask *task) { + if (task->operation == ASM_IMPORT) { + if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED; + else return ASM_EVENT_IMPORT_STARTED; + } else { + if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED; + else return ASM_EVENT_MIGRATE_STARTED; + } +} + +/* Serialize ASM task information into a string for transmission to replicas. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Where slot_ranges is in the format "1000-2000 3000-4000 ..." */ +sds asmTaskSerialize(asmTask *task) { + sds serialized = sdsempty(); + + /* Add task ID */ + serialized = sdscatprintf(serialized, "%s:", task->id); + + /* Add source node ID (40 chars) */ + serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add destination node ID (40 chars) */ + serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add operation type */ + serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ? + "import" : "migrate"); + + /* Add current state */ + serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state)); + + /* Add slot ranges sds */ + sds slots_str = slotRangeArrayToString(task->slots); + serialized = sdscatprintf(serialized, "%s", slots_str); + sdsfree(slots_str); + + return serialized; +} + +/* Deserialize ASM task information from a string and create a complete asmTask. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Returns a new asmTask on success, NULL on failure. 
*/ +asmTask *asmTaskDeserialize(sds data) { + int count, idx = 0; + asmTask *task = NULL; + if (!data || sdslen(data) == 0) return NULL; + + sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count); + if (count < 6) goto err; + + /* Parse task ID */ + if (sdslen(parts[idx]) == 0) goto err; + task = asmTaskCreate(parts[idx]); + if (!task) goto err; + idx++; + + /* Parse source node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->source, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse destination node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->dest, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse operation type */ + if (!strcasecmp(parts[idx], "import")) { + task->operation = ASM_IMPORT; + } else if (!strcasecmp(parts[idx], "migrate")) { + task->operation = ASM_MIGRATE; + } else { + goto err; + } + idx++; + + /* Parse state */ + task->state = ASM_NONE; /* Default state */ + for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) { + if (!strcasecmp(parts[idx], asmTaskStateToString(state))) { + task->state = state; + break; + } + } + idx++; + + /* Parse slot ranges */ + task->slots = slotRangeArrayFromString(parts[idx]); + if (!task->slots) goto err; + idx++; + + /* Ignore any extra fields for future compatibility */ + + sdsfreesplitres(parts, count); + return task; + +err: + if (task) asmTaskFree(task); + sdsfreesplitres(parts, count); + return NULL; +} + +/* Notify replicas about ASM task information to maintain consistency during + * slot migration. This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command + * to all connected replicas with the serialized task information. */ +void asmNotifyReplicasStateChange(struct asmTask *task) { + if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; + + /* Do not propagate migrate task to replicas, as replicas never migrate data. 
*/ + if (task->operation == ASM_MIGRATE) return; + + /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */ + robj *argv[5]; + argv[0] = createStringObject("CLUSTER", 7); + argv[1] = createStringObject("SYNCSLOTS", 9); + argv[2] = createStringObject("CONF", 4); + argv[3] = createStringObject("ASM-TASK", 8); + argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task)); + + /* Send the command to all replicas */ + replicationFeedSlaves(server.slaves, -1, argv, 5); + + /* Clean up command objects */ + for (int i = 0; i < 5; i++) { + decrRefCount(argv[i]); + } +} + +/* Dump the active import ASM task information. */ +sds asmDumpActiveImportTask(void) { + if (!server.cluster_enabled) return NULL; + + /* For replica, dump the master active task. */ + if (clusterNodeIsSlave(getMyClusterNode()) && + asmManager->master_task && + asmManager->master_task->state != ASM_FAILED && + asmManager->master_task->state != ASM_COMPLETED) + { + return asmTaskSerialize(asmManager->master_task); + } + + /* For master, dump the first active task. */ + if (!asmManager || listLength(asmManager->tasks) == 0) return NULL; + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation == ASM_MIGRATE) return NULL; + if (task->state == ASM_NONE || task->state == ASM_FAILED || + task->state == ASM_COMPLETED) return NULL; + + return asmTaskSerialize(task); +} + +size_t asmGetPeakSyncBufferSize(void) { + if (!asmManager) return 0; + /* Compute peak sync buffer usage. The current task's peak may not + * reflect in asmManager->sync_buffer_peak immediately. */ + size_t peak = asmManager->sync_buffer_peak; + asmTask *task = listFirst(asmManager->tasks) ? 
+ listNodeValue(listFirst(asmManager->tasks)) : NULL; + if (task && task->operation == ASM_IMPORT) + peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak); + + return peak; +} + +size_t asmGetImportInputBufferSize(void) { + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation == ASM_IMPORT) + return task->sync_buffer.mem_used; + + return 0; +} + +size_t asmGetMigrateOutputBufferSize(void) { + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation == ASM_MIGRATE && task->main_channel_client) + return getClientOutputBufferMemoryUsage(task->main_channel_client); + + return 0; +} + +/* Returns the ASM task with the given ID, or NULL if no such task exists. */ +static asmTask *asmLookupTaskAt(list *tasks, const char *id) { + listIter li; + listNode *ln; + + listRewind(tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (!strcmp(task->id, id)) return task; + } + return NULL; +} + +/* Returns the ASM task with the given ID, or NULL if no such task exists. */ +asmTask *asmLookupTaskById(const char *id) { + return asmLookupTaskAt(asmManager->tasks, id); +} + +/* Returns the ASM task that is identical to the given slot range array, or NULL + * if no such task exists. 
*/ +asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) { + listIter li; + listNode *ln; + + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (slotRangeArrayIsEqual(task->slots, slots)) + return task; + } + return NULL; +} + +/* Returns the slot range array for the given task ID */ +slotRangeArray *asmTaskGetSlotRanges(const char *task_id) { + asmTask *task = NULL; + if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL; + + return task->slots; +} + +/* Returns 1 if the slot range array overlaps with the given slot range. */ +static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) { + for (int i = 0; i < slots->num_ranges; i++) { + slotRange *sr = &slots->ranges[i]; + if (sr->start <= req->end && sr->end >= req->start) + return 1; + } + return 0; +} + +/* Returns 1 if the two slot range arrays overlap, 0 otherwise. */ +static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) { + for (int i = 0; i < slots1->num_ranges; i++) { + slotRange *sr1 = &slots1->ranges[i]; + if (slotRangeArrayOverlaps(slots2, sr1)) return 1; + } + return 0; +} + +/* Returns the ASM task that overlaps with the given slot range, or NULL if + * no such task exists. */ +static asmTask *lookupAsmTaskBySlotRange(slotRange *req) { + listIter li; + listNode *ln; + + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (slotRangeArrayOverlaps(task->slots, req)) + return task; + } + return NULL; +} + +/* Validates the given slot ranges for a migration task: + * - Ensures the current node is a master. + * - Verifies all slots are in a STABLE state. + * - Checks that slot ranges are well-formed and non-overlapping. + * - Confirms all slots belong to a single source node. + * - Confirms no ongoing import task that overlaps with the slot ranges. + * + * Returns the source node if validation succeeds. 
+ * Otherwise, returns NULL and sets 'err' variable. */ +static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) { + clusterNode *source = NULL; + + *err = NULL; + + /* Ensure this is a master node */ + if (!clusterNodeIsMaster(getMyClusterNode())) { + *err = sdsnew("slot migration not allowed on replica."); + goto out; + } + + /* Ensure no manual migration is in progress. */ + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (getImportingSlotSource(i) != NULL || + getMigratingSlotDest(i) != NULL) + { + *err = sdsnew("all slot states must be STABLE to start a slot migration task."); + goto out; + } + } + + for (int i = 0; i < slots->num_ranges; i++) { + slotRange *sr = &slots->ranges[i]; + + /* Ensure no import task overlaps with this slot range. + * Skip check current task that is running for this slot range. */ + asmTask *task = lookupAsmTaskBySlotRange(sr); + if (task && task != current && task->operation == ASM_IMPORT) { + *err = sdscatprintf(sdsempty(), + "overlapping import exists for slot range: %d-%d", + sr->start, sr->end); + goto out; + } + + /* Validate if we can start migration task for this slot range. */ + for (int j = sr->start; j <= sr->end; j++) { + clusterNode *node = getNodeBySlot(j); + if (node == NULL) { + *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j); + goto out; + } + + if (!source) { + source = node; + } else if (source != node) { + *err = sdsnew("slots belong to different source nodes"); + goto out; + } + } + } + +out: + return *err ? NULL : source; +} + +/* Returns 1 if a task with the specified operation is in progress, 0 otherwise. 
*/ +static int asmTaskInProgress(int operation) { + listIter li; + listNode *ln; + + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (task->operation == operation) return 1; + } + return 0; +} + +/* Returns 1 if a migrate task is in progress, 0 otherwise. */ +int asmMigrateInProgress(void) { + return asmTaskInProgress(ASM_MIGRATE); +} + +/* Returns 1 if an import task is in progress, 0 otherwise. */ +int asmImportInProgress(void) { + return asmTaskInProgress(ASM_IMPORT); +} + +/* Returns 1 if the task is in a state where it can receive replication stream +* for the slot range, 0 otherwise. */ +inline static int asmCanFeedMigrationClient(asmTask *task) { + return task->operation == ASM_MIGRATE && + !task->cross_slot_during_propagating && + (task->state == ASM_SEND_BULK_AND_STREAM || + task->state == ASM_SEND_STREAM || + task->state == ASM_HANDOFF_PREP); +} + +/* Feed the migration client with the replication stream for the slot range. */ +void asmFeedMigrationClient(robj **argv, int argc) { + asmTask *task = NULL; + + if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0) + return; + + /* Check if there is a migrate task that can receive replication stream. */ + task = listNodeValue(listFirst(asmManager->tasks)); + if (!asmCanFeedMigrationClient(task)) return; + + /* Ensure all arguments are converted to string encoding if necessary, + * since getSlotFromCommand expects them to be string-encoded. + * Generally the arguments are string-encoded, but we may rewrite + * the command arguments to integer encoding. */ + for (int i = 0; i < argc; i++) { + if (!sdsEncodedObject(argv[i])) { + serverAssert(argv[i]->encoding == OBJ_ENCODING_INT); + robj *old = argv[i]; + argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr); + decrRefCount(old); + } + } + + /* Check if the command belongs to the slot range. 
*/ + struct redisCommand *cmd = lookupCommand(argv, argc); + serverAssert(cmd); + + int slot = getSlotFromCommand(cmd, argv, argc); + + /* If the command does not have keys, skip it now. + * SELECT is not propagated, since we only support a single db in cluster mode. + * MULTI/EXEC is not needed, since transaction semantics are unnecessary + * before the slot handoff. + * FUNCTION subcommands should be executed on all nodes, so here we skip it, + * and even propagating them may cause an error when executing. + * + * NOTICE: if some keyless commands should be propagated to the destination, + * we should identify them here and send. */ + if (slot == GETSLOT_NOKEYS) return; + + /* Generally we reject cross-slot commands before executing, but module may + * replicate this kind of command, so we check again. To guarantee data + * consistency, we cancel the task if we encounter a cross-slot command. */ + if (slot == GETSLOT_CROSSSLOT) { + /* We cannot cancel the task directly here, since it may lead to a recursive + * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext() + * --> postExecutionUnitOperations() --> propagateNow(). Even worse, this + * could result in propagating pending commands to the replication stream twice. + * To avoid this, we simply set a flag here, cancel the task in beforeSleep. */ + task->cross_slot_during_propagating = 1; + return; + } + + /* Check if the slot belongs to the task's slot range. */ + slotRange sr = {slot, slot}; + if (!slotRangeArrayOverlaps(task->slots, &sr)) return; + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) + freeClientAsync(task->main_channel_client); + + /* Feed main channel with the command. */ + client *c = task->main_channel_client; + size_t prev_bytes = getNormalClientPendingReplyBytes(c); + + addReplyArrayLen(c, argc); + for (int i = 0; i < argc; i++) + addReplyBulk(c, argv[i]); + + /* Update the task's source offset to reflect the bytes sent. 
*/ + task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes); +} + +asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) { + clusterNode *source; + + *err = NULL; + /* Validate that the slot ranges are valid and that migration can be + * initiated for them. */ + source = validateImportSlotRanges(slots, err, NULL); + if (!source) + return NULL; + + if (source == getMyClusterNode()) { + *err = sdsnew("this node is already the owner of the slot range"); + return NULL; + } + + /* Only support a single task at a time now. */ + if (listLength(asmManager->tasks) != 0) { + asmTask *current = listNodeValue(listFirst(asmManager->tasks)); + if (current->state == ASM_FAILED) { + /* We can create a new import task only if the current one is failed, + * cancel the failed task to create a new one. */ + asmTaskCancel(current, "new import requested"); + } else { + *err = sdsnew("another ASM task is already in progress"); + return NULL; + } + } + /* There should be no task in progress. */ + serverAssert(listLength(asmManager->tasks) == 0); + + /* Create a slot migration task */ + asmTask *task = asmTaskCreate(task_id); + task->slots = slotRangeArrayDup(slots); + task->state = ASM_NONE; + task->operation = ASM_IMPORT; + task->source_node = source; + memcpy(task->source, source->name, CLUSTER_NAMELEN); + memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN); + + listAddNodeTail(asmManager->tasks, task); + sds slots_str = slotRangeArrayToString(slots); + serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s", + task->id, task->source, task->dest, slots_str); + sdsfree(slots_str); + + return task; +} + +/* CLUSTER MIGRATION IMPORT + * + * Sent by operator to the destination node to start the migration. 
*/ +static void clusterMigrationCommandImport(client *c) { + /* Validate slot range arg count */ + int remaining = c->argc - 3; + if (remaining == 0 || remaining % 2 != 0) { + addReplyErrorArity(c); + return; + } + + slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3); + if (!slots) return; + + sds err = NULL; + asmTask *task = asmCreateImportTask(NULL, slots, &err); + slotRangeArrayFree(slots); + if (!task) { + addReplyErrorSds(c, err); + return; + } + + addReplyBulkCString(c, task->id); +} + +/* CLUSTER MIGRATION CANCEL [ID <task-id> | ALL] + * - Reply: Number of cancelled tasks + * + * Cancels the task with the given ID, or every task when ALL is given. + * Multiple tasks may be cancelled. */ +static void clusterMigrationCommandCancel(client *c) { + sds task_id = NULL; + int num_cancelled = 0; + + /* Validate argument count */ + if (c->argc != 4 && c->argc != 5) { + addReplyErrorArity(c); + return; + } + + if (!strcasecmp(c->argv[3]->ptr, "id")) { + if (c->argc != 5) { + addReplyErrorArity(c); + return; + } + task_id = c->argv[4]->ptr; + } else if (!strcasecmp(c->argv[3]->ptr, "all")) { + if (c->argc != 4) { + addReplyErrorArity(c); + return; + } + } else { + addReplyError(c, "unknown argument"); + return; + } + + num_cancelled = clusterAsmCancel(task_id, "user request"); + addReplyLongLong(c, num_cancelled); +} + +/* Reply with the status of the task. */ +static void replyTaskStatus(client *c, asmTask *task) { + mstime_t p = 0; + + addReplyMapLen(c, 12); + addReplyBulkCString(c, "id"); + addReplyBulkCString(c, task->id); + addReplyBulkCString(c, "slots"); + addReplyBulkSds(c, slotRangeArrayToString(task->slots)); + addReplyBulkCString(c, "source"); + addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN); + addReplyBulkCString(c, "dest"); + addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN); + addReplyBulkCString(c, "operation"); + addReplyBulkCString(c, task->operation == ASM_IMPORT ? 
"import" : "migrate"); + addReplyBulkCString(c, "state"); + addReplyBulkCString(c, asmTaskStateToString(task->state)); + addReplyBulkCString(c, "last_error"); + addReplyBulkCBuffer(c, task->error, sdslen(task->error)); + addReplyBulkCString(c, "retries"); + addReplyLongLong(c, task->retry_count); + addReplyBulkCString(c, "create_time"); + addReplyLongLong(c, task->create_time); + addReplyBulkCString(c, "start_time"); + addReplyLongLong(c, task->start_time); + addReplyBulkCString(c, "end_time"); + addReplyLongLong(c, task->end_time); + + if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED) + p = task->end_time - task->paused_time; + addReplyBulkCString(c, "write_pause_ms"); + addReplyLongLong(c, p); +} + +/* CLUSTER MIGRATION STATUS [ID | ALL] + * - Reply: Array of atomic slot migration tasks */ +static void clusterMigrationCommandStatus(client *c) { + listIter li; + listNode *ln; + + if (c->argc != 4 && c->argc != 5) { + addReplyErrorArity(c); + return; + } + + if (!strcasecmp(c->argv[3]->ptr, "id")) { + if (c->argc != 5) { + addReplyErrorArity(c); + return; + } + sds id = c->argv[4]->ptr; + asmTask *task = asmLookupTaskAt(asmManager->tasks, id); + if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id); + if (!task) { + addReplyArrayLen(c, 0); + return; + } + + addReplyArrayLen(c, 1); + replyTaskStatus(c, task); + } else if (!strcasecmp(c->argv[3]->ptr, "all")) { + if (c->argc != 4) { + addReplyErrorArity(c); + return; + } + addReplyArrayLen(c, listLength(asmManager->tasks) + + listLength(asmManager->archived_tasks)); + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) + replyTaskStatus(c, listNodeValue(ln)); + + listRewind(asmManager->archived_tasks, &li); + while ((ln = listNext(&li)) != NULL) + replyTaskStatus(c, listNodeValue(ln)); + } else { + addReplyError(c, "unknown argument"); + return; + } +} + +/* CLUSTER MIGRATION + * | + * STATUS [ID | ALL] | + * CANCEL [ID | ALL]> +*/ +void 
clusterMigrationCommand(client *c) { + if (c->argc < 4) { + addReplyErrorArity(c); + return; + } + + if (strcasecmp(c->argv[2]->ptr, "import") == 0) { + clusterMigrationCommandImport(c); + } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) { + clusterMigrationCommandStatus(c); + } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) { + clusterMigrationCommandCancel(c); + } else { + addReplyError(c, "unknown argument"); + } +} + +/* Notify the state change to the module and the cluster implementation. */ +void asmNotifyStateChange(asmTask *task, int event) { + RedisModuleClusterSlotMigrationInfo info = { + .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION, + .task_id = task->id, + .slots = (RedisModuleSlotRangeArray *) task->slots + }; + memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN); + memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN); + + int module_event = -1; + if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED; + else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED; + else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED; + else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED; + else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED; + else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED; + serverAssert(module_event != -1); + + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info); + serverLog(LL_DEBUG, "Fire cluster asm module event, task %s: state=%s", + task->id, asmTaskStateToString(task->state)); + + if (clusterNodeIsMaster(getMyClusterNode())) { + /* Notify the cluster impl only if it is a real active 
import task. */ + if (task != asmManager->master_task) + clusterAsmOnEvent(task->id, event, task->slots); + asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */ + } +} + +void asmImportSetFailed(asmTask *task) { + serverAssert(task->operation == ASM_IMPORT); + if (task->state == ASM_FAILED) return; + + /* If we are in the RDB channel transfer state, we need to + * close the client that was created for the RDB channel. */ + if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) { + client *c = connGetPrivateData(task->rdb_channel_conn); + serverAssert(c->task == task); + task->rdb_channel_conn = NULL; + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + } + + /* If in the wait stream EOF or streaming buffer state, we need to close the + * client that was created for the main channel. */ + if (task->main_channel_conn && + (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF)) + { + client *c = connGetPrivateData(task->main_channel_conn); + serverAssert(c->task == task); + task->main_channel_conn = NULL; + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + } + + /* Close the connections */ + if (task->rdb_channel_conn) connClose(task->rdb_channel_conn); + if (task->main_channel_conn) connClose(task->main_channel_conn); + task->rdb_channel_conn = NULL; + task->main_channel_conn = NULL; + + /* Clear the replication data buffer */ + asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak); + replDataBufClear(&task->sync_buffer); + + /* Mark the task as failed and notify the cluster */ + task->state = ASM_FAILED; + asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED); + /* This node may have become a replica; only a master can set up new slot trimming jobs. 
*/ + if (clusterNodeIsMaster(getMyClusterNode())) + asmTrimJobSchedule(task->slots); +} + +void asmMigrateSetFailed(asmTask *task) { + serverAssert(task->operation == ASM_MIGRATE); + if (task->state == ASM_FAILED) return; + + /* Close the RDB and main channel clients */ + if (task->rdb_channel_client) { + task->rdb_channel_client->task = NULL; + freeClientAsync(task->rdb_channel_client); + task->rdb_channel_client = NULL; + } + if (task->main_channel_client) { + task->main_channel_client->task = NULL; + freeClientAsync(task->main_channel_client); + task->main_channel_client = NULL; + } + + /* Clearing the sync buffer is not strictly necessary here; we do it + * so that asmTaskReset works properly after a migrate task has failed. */ + replDataBufClear(&task->sync_buffer); + + /* Mark the task as failed and notify the cluster */ + task->state = ASM_FAILED; + asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED); +} + +void asmTaskSetFailed(asmTask *task, const char *fmt, ...) { + va_list ap; + sds error = sdsempty(); + + /* Set the error message */ + va_start(ap, fmt); + error = sdscatvprintf(error, fmt, ap); + va_end(ap); + error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)", + asmTaskStateToString(task->state), + asmTaskStateToString(task->rdb_channel_state)); + sdsfree(task->error); + task->error = error; + + /* Log the error */ + sds slots_str = slotRangeArrayToString(task->slots); + serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s", + task->operation == ASM_IMPORT ? "Import" : "Migrate", + task->id, slots_str, task->error); + sdsfree(slots_str); + + if (task->operation == ASM_IMPORT) + asmImportSetFailed(task); + else + asmMigrateSetFailed(task); +} + +/* The task is completed or canceled. Update stats and move it to + * the archived list. 
*/ +void asmTaskFinalize(asmTask *task) { + listNode *ln = listFirst(asmManager->tasks); + serverAssert(ln->value == task); + + task->source_node = NULL; /* Should never access it */ + task->end_time = server.mstime; + + if (task->operation == ASM_IMPORT) { + asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, + task->sync_buffer.peak); + replDataBufClear(&task->sync_buffer); /* Not used, so save memory */ + } + + /* Move the task to the archived list */ + listUnlinkNode(asmManager->tasks, ln); + listLinkNodeHead(asmManager->archived_tasks, ln); +} + +static void asmTaskCancel(asmTask *task, const char *reason) { + if (task->state == ASM_CANCELED) return; + + asmTaskSetFailed(task, "Cancelled due to %s", reason); + task->state = ASM_CANCELED; + asmTaskFinalize(task); +} + +void asmImportTakeover(asmTask *task) { + serverAssert(task->state == ASM_WAIT_STREAM_EOF || + task->state == ASM_STREAMING_BUF); + + /* Free the main channel connection since it is no longer needed. */ + serverAssert(task->main_channel_conn != NULL); + client *c = connGetPrivateData(task->main_channel_conn); + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + task->main_channel_conn = NULL; + + task->state = ASM_TAKEOVER; + clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, NULL); +} + +void asmCallbackOnFreeClient(client *c) { + asmTask *task = c->task; + if (!task) return; + + /* If the RDB channel connection is closed, mark the task as failed. 
*/ + if (c->conn && task->rdb_channel_conn == c->conn) { + /* We create the client only when transferring data on the RDB channel */ + serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER); + task->rdb_channel_conn = NULL; /* Will be freed by freeClient */ + c->flags &= ~CLIENT_MASTER; + asmTaskSetFailed(task, "RDB channel - Connection is closed"); + return; + } + + if (c->conn && task->main_channel_conn == c->conn) { + /* After or in the process of streaming buffer to DB, a client will be + * created based on the main channel connection. */ + serverAssert(task->state == ASM_STREAMING_BUF || + task->state == ASM_WAIT_STREAM_EOF); + task->main_channel_conn = NULL; /* Will be freed by freeClient */ + c->flags &= ~CLIENT_MASTER; + asmTaskSetFailed(task, "Main channel - Connection is closed"); + return; + } + + if (c == task->rdb_channel_client) { + /* TODO: Detect whether the bgsave is completed successfully and + * update the state properly. */ + task->rdb_channel_state = ASM_COMPLETED; + /* We may not have detected whether the child process has exited yet, + * so we can't determine whether the client has completed the slots + * snapshot transfer. If the RDB channel is interrupted unexpectedly, + * the destination side will also close the main channel. + * So here we just reset the RDB channel client of task. */ + task->rdb_channel_client = NULL; + return; + } + + /* If the main channel client is closed, we need to mark the task as failed + * and clean up the RDB channel client if it exists. */ + if (c == task->main_channel_client) { + task->main_channel_client = NULL; + /* The rdb channel client will be cleaned up */ + asmTaskSetFailed(task, "Main and RDB channel clients are disconnected."); + return; + } +} + +/* Sends an AUTH command to the source node using the internal secret. + * Returns an error string if the command fails, or NULL on success. 
*/ +char *asmSendInternalAuth(connection *conn) { + size_t len = 0; + const char *internal_secret = clusterGetSecret(&len); + serverAssert(internal_secret != NULL); + + sds secret = sdsnewlen(internal_secret, len); + char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL); + sdsfree(secret); + return err; +} + +/* Handles the RDB channel sync with the source node. + * This function is called when the RDB channel is established + * and ready to sync with the source node. */ +void asmRdbChannelSyncWithSource(connection *conn) { + asmTask *task = connGetPrivateData(conn); + char *err = NULL; + sds task_error_msg = NULL; + + /* Check for errors in the socket: after a non-blocking connect() we + * may find that the socket is in error state. */ + if (connGetState(conn) != CONN_STATE_CONNECTED) + goto error; + + /* Check if the task is in a fail point state */ + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { + char buf[1]; + /* Simulate a failure by shutting down the connection. On some operating + * systems (e.g. Linux), the socket's receive buffer is not flushed + * immediately, so we issue a dummy read to drain any pending data and + * surface the error condition. + * We use shutdown() instead of connShutdown() because connTLSShutdown() + * frees the connection directly, which is not what we want. 
*/ + shutdown(conn->fd, SHUT_RDWR); + connRead(conn, buf, 1); + } + + if (task->rdb_channel_state == ASM_CONNECTING) { + connSetReadHandler(conn, asmRdbChannelSyncWithSource); + connSetWriteHandler(conn, NULL); + + /* Send AUTH command to source node using internal auth */ + err = asmSendInternalAuth(conn); + if (err) goto write_error; + task->rdb_channel_state = ASM_AUTH_REPLY; + return; + } + + if (task->rdb_channel_state == ASM_AUTH_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+OK` reply */ + if (!strcmp(err, "+OK")) { + sdsfree(err); + err = NULL; + task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST; + serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to AUTH from source: %s", err); + sdsfree(err); + goto error; + } + } + + if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) { + err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL); + if (err) goto write_error; + task->rdb_channel_state = ASM_RDBCHANNEL_REPLY; + return; + } + + if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Ignore '\n' sent from the source node to keep the connection alive. */ + if (sdslen(err) == 0) { + serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later"); + sdsfree(err); + return; + } + + /* Check `+SLOTSSNAPSHOT` reply */ + if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) { + sdsfree(err); + err = NULL; + task->state = ASM_ACCUMULATE_BUF; + /* The main channel buffers pending commands. 
*/ + connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn); + + task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; + client *c = createClient(conn); + c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING); + c->querybuf = sdsempty(); + c->authenticated = 1; + c->user = NULL; + c->task = task; + serverLog(LL_NOTICE, + "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err); + sdsfree(err); + goto error; + } + return; + } + return; + +no_response_error: + task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake"); + /* Fall through to regular error handling */ + +error: + asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s", + task_error_msg ? task_error_msg : connGetLastError(conn)); + sdsfree(task_error_msg); + return; + +write_error: /* Handle sendCommand() errors. 
*/ + task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err); + sdsfree(err); + goto error; +} + +char *asmSendSlotRangesSync(connection *conn, asmTask *task) { + /* Prepare CLUSTER SYNCSLOTS SYNC command */ + serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS); + int argc = task->slots->num_ranges * 2 + 4; + char **args = zcalloc(sizeof(char*) * argc); + size_t *lens = zcalloc(sizeof(size_t) * argc); + + args[0] = "CLUSTER"; + args[1] = "SYNCSLOTS"; + args[2] = "SYNC"; + args[3] = task->id; + lens[0] = strlen("CLUSTER"); + lens[1] = strlen("SYNCSLOTS"); + lens[2] = strlen("SYNC"); + lens[3] = sdslen(task->id); + + int i = 4; + for (int j = 0; j < task->slots->num_ranges; j++) { + slotRange *sr = &task->slots->ranges[j]; + args[i] = sdscatprintf(sdsempty(), "%d", sr->start); + lens[i] = sdslen(args[i]); + args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end); + lens[i+1] = sdslen(args[i+1]); + i += 2; + } + serverAssert(i == argc); + + /* Send command to source node */ + char *err = sendCommandArgv(conn, argc, args, lens); + + /* Free allocated memory */ + for (int j = 4; j < argc; j++) { + sdsfree(args[j]); + } + zfree(args); + zfree(lens); + + return err; +} + +void asmSyncWithSource(connection *conn) { + asmTask *task = connGetPrivateData(conn); + char *err = NULL; + + /* Some task errors are not network issues, we record them explicitly. */ + sds task_error_msg = NULL; + + /* Check for errors in the socket: after a non blocking connect() we + * may find that the socket is in error state. 
*/ + if (connGetState(conn) != CONN_STATE_CONNECTED) + goto error; + + /* Check if the fail point is active for this channel and state */ + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) { + char buf[1]; + shutdown(conn->fd, SHUT_RDWR); + connRead(conn, buf, 1); + } + + if (task->state == ASM_CONNECTING) { + connSetReadHandler(conn, asmSyncWithSource); + connSetWriteHandler(conn, NULL); + /* Send AUTH command to source node using internal auth */ + err = asmSendInternalAuth(conn); + if (err) goto write_error; + task->state = ASM_AUTH_REPLY; + return; + } + + if (task->state == ASM_AUTH_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+OK` reply */ + if (!strcmp(err, "+OK")) { + sdsfree(err); + err = NULL; + task->state = ASM_SEND_HANDSHAKE; + serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to AUTH from the source: %s", err); + sdsfree(err); + goto error; + } + } + + if (task->state == ASM_SEND_HANDSHAKE) { + sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); + err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL); + sdsfree(node_id); + if (err) goto write_error; + task->state = ASM_HANDSHAKE_REPLY; + return; + } + + if (task->state == ASM_HANDSHAKE_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+OK` reply */ + if (!strcmp(err, "+OK")) { + sdsfree(err); + err = NULL; + task->state = ASM_SEND_SYNCSLOTS; + serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err); + sdsfree(err); + goto error; + } + } + + if 
(task->state == ASM_SEND_SYNCSLOTS) { + err = asmSendSlotRangesSync(conn, task); + if (err) goto write_error; + + task->state = ASM_SYNCSLOTS_REPLY; + return; + } + + if (task->state == ASM_SYNCSLOTS_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+RDBCHANNELSYNCSLOTS` reply */ + if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) { + sdsfree(err); + err = NULL; + task->state = ASM_INIT_RDBCHANNEL; + serverLog(LL_NOTICE, + "Source node replied to RDBCHANNELSYNCSLOTS, syncslots can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err); + sdsfree(err); + goto error; + } + } + + if (task->state == ASM_INIT_RDBCHANNEL) { + /* Create RDB channel connection */ + char *ip = clusterNodeIp(task->source_node); + int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) : + clusterNodeTcpPort(task->source_node); + task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication()); + if (connConnect(task->rdb_channel_conn, ip, port, + server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR) + { + serverLog(LL_WARNING, "Unable to connect to the source node: %s", + connGetLastError(task->rdb_channel_conn)); + goto error; + } + task->rdb_channel_state = ASM_CONNECTING; + connSetPrivateData(task->rdb_channel_conn, task); + serverLog(LL_NOTICE, + "RDB channel connection to source node %.40s established, waiting for AUTH reply...", + task->source); + + /* Main channel waits for the new event */ + connSetReadHandler(conn, NULL); + return; + } + return; + +no_response_error: + serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake"); + /* Fall through to regular error handling */ + +error: + asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s", + task_error_msg ? 
task_error_msg : connGetLastError(conn)); + sdsfree(task_error_msg); + return; + +write_error: /* Handle sendCommand() errors. */ + serverLog(LL_WARNING, "Failed to send command to source node: %s", err); + sdsfree(err); + goto error; +} + +int asmImportSendACK(asmTask *task) { + serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF); + serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset); + + char offset[64]; + ull2string(offset, sizeof(offset), task->dest_offset); + + char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK", + asmTaskStateToString(task->state), offset, NULL); + if (err) { + asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err); + sdsfree(err); + return C_ERR; + } + return C_OK; +} + +/* Called when the RDB channel begins sending the snapshot. + * From this point on, the main channel also starts sending incremental streams. */ +void asmSlotSnapshotAndStreamStart(struct asmTask *task) { + if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return; + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) { + shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR); + return; + } + task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM; + + task->state = ASM_SEND_BULK_AND_STREAM; + task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; + + /* From the source node's perspective, the destination node begins to accumulate + * the buffer while the RDB channel starts applying the slot snapshot data. */ + task->dest_state = ASM_ACCUMULATE_BUF; + task->dest_slots_snapshot_time = server.mstime; +} + +/* Called when the RDB channel has succeeded in sending the snapshot. 
*/ +void asmSlotSnapshotSucceed(struct asmTask *task) { + if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; + + /* The destination starts sending ACKs to keep the main channel alive after + * receiving the snapshot, so here we need to update the last interaction + * time to avoid false timeout. */ + task->main_channel_client->lastinteraction = server.unixtime; + + task->state = ASM_SEND_STREAM; + task->rdb_channel_state = ASM_COMPLETED; +} + +/* Called when the RDB channel fails to send the snapshot. */ +void asmSlotSnapshotFailed(struct asmTask *task) { + if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; + + asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot"); +} + +/* CLUSTER SYNCSLOTS SNAPSHOT-EOF + * + * This command is sent by the source node to the destination node to indicate + * that the slots snapshot has ended. */ +void clusterSyncSlotsSnapshotEOF(client *c) { + /* This client is RDB channel connection. */ + asmTask *task = c->task; + if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER || + c->conn != task->rdb_channel_conn) + { + /* Unexpected SNAPSHOT-EOF command */ + serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: " + "rdb_channel_state=%s", + asmTaskStateToString(task ? task->rdb_channel_state : ASM_NONE)); + freeClientAsync(c); + return; + } + + /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */ + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { + freeClientAsync(c); /* Simulate a failure */ + return; + } + + /* Clear the RDB channel connection */ + task->rdb_channel_conn = NULL; + task->rdb_channel_state = ASM_COMPLETED; + serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task."); + + /* Free the RDB channel connection. 
*/ + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + + /* Will start streaming the buffer to DB, don't start here since now + * we are in the context of executing command, otherwise, redis will + * generate a big MULTI-EXEC including all the commands in the buffer. + * just update the state here, and do it in beforeSleep(). */ + task->state = ASM_READY_TO_STREAM; + connSetReadHandler(task->main_channel_conn, NULL); +} + +/* CLUSTER SYNCSLOTS STREAM-EOF + * + * This command is sent by the source node to the destination node to indicate + * that the slot sync stream has ended and the slots can be handed off. */ +void clusterSyncSlotsStreamEOF(client *c) { + asmTask *task = c->task; + + if (!task || task->operation != ASM_IMPORT) { + serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command"); + freeClientAsync(c); + return; + } + + if (task->state == ASM_STREAMING_BUF) { + /* We are still streaming the buffer to DB, mark the EOF received, and we + * can take over after streaming is EOF. Since we may release the context + * in asmImportTakeover, this breaks the context for streaming buffer. */ + task->stream_eof_during_streaming = 1; + serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer"); + return; + } + + if (task->state != ASM_WAIT_STREAM_EOF) { + serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s", + asmTaskStateToString(task->state)); + freeClientAsync(c); + return; + } + serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF"); + + /* STREAM-EOF received, the source is ready to handoff, takeover now. */ + asmImportTakeover(task); +} + +/* Start the import task. */ +static void asmStartImportTask(asmTask *task) { + if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return; + sds slots_str = slotRangeArrayToString(task->slots); + + /* Sanity check: Clean up any keys that exist in slots not owned by this node. 
+ * This handles cases where users previously migrated slots using legacy method + * but left behind orphaned keys, or maybe cluster missed cleaning up during + * previous operations, which could interfere with the ASM import process. */ + asmTrimSlotsIfNotOwned(task->slots); + + /* Check if there is any trim job in progress for the slot ranges. + * We can't start the import task since the trim job will modify the data.*/ + int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots); + + /* Notify the cluster implementation to prepare for the import task. */ + int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots); + + static int start_blocked_logged = 0; + /* Cannot start import task since pause action is performed. Otherwise, we + * will break the promise that no writes are performed during the pause. */ + if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) || + isPausedActions(PAUSE_ACTION_CLIENT_WRITE) || + trim_in_progress || + impl_ret != C_OK) + { + const char *reason = impl_ret != C_OK ? "cluster is not ready" : + trim_in_progress ? "trim in progress for some of the slots" : + "server paused"; + if (start_blocked_logged == 0) { + serverLog(LL_WARNING, "Can not start import task %s for slots: %s due to %s", + task->id, slots_str, reason); + start_blocked_logged = 1; + } + sdsfree(slots_str); + return; + } + start_blocked_logged = 0; /* Reset the log flag */ + + /* Detect if the cluster topology is changed. We should cancel the task if + * we can not schedule it, and update the source node if needed. */ + sds err = NULL; + clusterNode *source = validateImportSlotRanges(task->slots, &err, task); + if (!source) { + asmTaskCancel(task, err); + sdsfree(slots_str); + sdsfree(err); + return; + } + /* Now I'm the owner of the slot range, cancel the import task. */ + if (source == getMyClusterNode()) { + asmTaskCancel(task, "slots owned by myself now"); + sdsfree(slots_str); + return; + } + /* Change the source node if needed. 
*/ + if (source != task->source_node) { + task->source_node = source; + memcpy(task->source, source->name, CLUSTER_NAMELEN); + serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, " + "new_source=%.40s", task->id, slots_str, source->name); + } + + serverLog(LL_NOTICE, "Import task %s starting: src=%.40s, dest=%.40s, slots=%s", + task->id, task->source, task->dest, slots_str); + sdsfree(slots_str); + + task->state = ASM_CONNECTING; + task->start_time = server.mstime; + asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED); + + task->main_channel_conn = connCreate(server.el, connTypeOfReplication()); + char *ip = clusterNodeIp(task->source_node); + int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) : + clusterNodeTcpPort(task->source_node); + if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr, + asmSyncWithSource) == C_ERR) + { + asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s", + connGetLastError(task->main_channel_conn)); + return; + } + connSetPrivateData(task->main_channel_conn, task); +} + +void clusterSyncSlotsCommand(client *c) { + /* Only internal clients are allowed to execute this command to avoid + * potential attack, since some state changes are not well protected, + * external clients may damage the slot migration state. */ + if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) { + addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients"); + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + return; + } + + /* On replica, only allow master client to execute CONF subcommand. */ + if (!clusterNodeIsMaster(getMyClusterNode())) { + if (!(c->flags & CLIENT_MASTER)) { + /* Not master client, reject all subcommands and close the connection. */ + addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master"); + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + return; + } else { + /* Only allow CONF subcommand on replica. 
*/ + if (strcasecmp(c->argv[2]->ptr, "conf")) return; + } + } + + if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) { + /* CLUSTER SYNCSLOTS SYNC [ ] */ + if (c->argc % 2 == 1) { + addReplyErrorArity(c); + return; + } + + slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4); + if (!slots) return; + + /* Validate that the slot ranges are valid and that migration can be + * initiated for them. */ + sds err = NULL; + clusterNode *source = validateImportSlotRanges(slots, &err, NULL); + if (!source) { + addReplyErrorSds(c, err); + slotRangeArrayFree(slots); + return; + } + + /* Check if the source node is the same as the current node. */ + if (source != getMyClusterNode()) { + addReplyError(c, "This node is not the owner of the slots"); + slotRangeArrayFree(slots); + return; + } + + sds task_id = c->argv[3]->ptr; + /* Notify the cluster implementation to prepare for the migrate task. */ + if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK) { + addReplyError(c, "Cluster is not ready right now, please retry later"); + slotRangeArrayFree(slots); + return; + } + + asmTask *task = listLength(asmManager->tasks) == 0 ? NULL : + listNodeValue(listFirst(asmManager->tasks)); + if (task && !strcmp(task->id, task_id) && + task->operation == ASM_MIGRATE && task->state == ASM_FAILED && + slotRangeArrayIsEqual(slots, task->slots) && + memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0) + { + /* Reuse the failed task */ + asmTaskReset(task); + slotRangeArrayFree(task->slots); /* Will be set again later */ + task->retry_count++; + } else if (task) { + if (task->state == ASM_FAILED) { + /* We can create a new migrate task only if the current one is + * failed, cancel the failed task to create a new one. 
*/ + asmTaskCancel(task, "new migration requested"); + task = NULL; + } else { + addReplyError(c, "Another ASM task is already in progress"); + slotRangeArrayFree(slots); + return; + } + } + + /* Create the migrate slots task and add it to the list, + * otherwise reuse the existing one */ + if (task == NULL) { + task = asmTaskCreate(task_id); + task->start_time = server.mstime; /* Start immediately */ + serverAssert(listLength(asmManager->tasks) == 0); + listAddNodeTail(asmManager->tasks, task); + } + + task->slots = slots; + task->operation = ASM_MIGRATE; + memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); + if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN); + + task->main_channel_client = c; + c->task = task; + + /* We mark the main channel client as a replica, so this client is limited + * by the client output buffer settings for replicas. The replstate has + * no real significance, just to prevent it from going online. */ + c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING); + c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL; + if (server.repl_disable_tcp_nodelay) + connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ + listAddNodeTail(server.slaves, c); + createReplicationBacklogIfNeeded(); + + /* Wait for RDB channel to be ready */ + task->state = ASM_WAIT_RDBCHANNEL; + + sds slots_str = slotRangeArrayToString(slots); + serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s", + task->id, task->source, task->dest, slots_str); + sdsfree(slots_str); + + asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED); + + /* Keep the client in the main thread to avoid data races between the + * connWrite call below and the client's event handler in IO threads. */ + if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); + + /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. 
*/ + if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22) + freeClientAsync(c); + } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) { + /* CLUSTER SYNCSLOTS RDBCHANNEL */ + sds task_id = c->argv[3]->ptr; + if (sdslen(task_id) != CLUSTER_NAMELEN) { + addReplyError(c, "Invalid task id"); + return; + } + + if (listLength(asmManager->tasks) == 0) { + addReplyError(c, "No slot migration task in progress"); + return; + } + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL || + strcmp(task->id, task_id) != 0) + { + addReplyError(c, "Another migration task is already in progress"); + return; + } + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) { + /* Close the main channel client before rdb channel client connects */ + if (task->main_channel_client) + freeClient(task->main_channel_client); + } + + /* The main channel client must be present when setting RDB channel client */ + if (task->main_channel_client == NULL) { + /* Maybe the main channel connection is closed. */ + addReplyError(c, "Main channel connection is not established"); + return; + } + + /* Mark the client as a slave to generate slots snapshot */ + c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING); + c->slave_capa |= SLAVE_CAPA_EOF; + c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL); + c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; + c->repldbfd = -1; + if (server.repl_disable_tcp_nodelay) + connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. 
*/ + listAddNodeTail(server.slaves, c); + + /* Wait for bgsave to start for slots sync */ + task->state = ASM_WAIT_BGSAVE_START; + task->rdb_channel_state = ASM_WAIT_BGSAVE_START; + task->rdb_channel_client = c; + c->task = task; + + /* Keep the client in the main thread to avoid data races between the + * connWrite call in startBgsaveForReplication and the client's event + * handler in IO threads. */ + if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); + + if (!hasActiveChildProcess()) { + startBgsaveForReplication(c->slave_capa, c->slave_req); + } else { + serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed"); + } + } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) { + /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */ + clusterSyncSlotsSnapshotEOF(c); + } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) { + /* CLUSTER SYNCSLOTS STREAM-EOF */ + clusterSyncSlotsStreamEOF(c); + } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) { + /* CLUSTER SYNCSLOTS ACK */ + long long offset; + int dest_state; + + if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) { + dest_state = ASM_STREAMING_BUF; + } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) { + dest_state = ASM_WAIT_STREAM_EOF; + } else { + return; /* Not support now. */ + } + + if ((getLongLongFromObject(c->argv[4], &offset) != C_OK)) + return; + + if (c->task && c->task->operation == ASM_MIGRATE) { + /* Update the state and ACKed offset from destination. 
*/ + asmTask *task = c->task; + task->dest_state = dest_state; + if (task->dest_offset > (unsigned long long) offset) { + serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " + "but offset %lld is less than the current dest offset %lld", + asmTaskStateToString(dest_state), offset, task->dest_offset); + return; + } + task->dest_offset = offset; + serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " + "updated dest offset to %lld, source offset: %lld", + asmTaskStateToString(dest_state), task->dest_offset, task->source_offset); + + /* Record the time when the destination finishes applying the accumulated buffer */ + if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0) + task->dest_accum_applied_time = server.mstime; + + /* Pause write if needed */ + if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) { + /* Pause writes on the main channel if the lag is less than the threshold. */ + if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) { + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP))) + return; /* Do not enter handoff prep state for testing buffer drain timeout. */ + + serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, " + "pausing writes for slot handoff", + task->source_offset - task->dest_offset, + server.asm_handoff_max_lag_bytes); + task->state = ASM_HANDOFF_PREP; + clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots); + } + } + } + } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) { + /* CLUSTER SYNCSLOTS FAIL */ + return; /* This is a no-op, just to handle the command syntax. */ + } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) { + /* CLUSTER SYNCSLOTS CONF