mirror of
https://github.com/redis/redis.git
synced 2025-12-18 21:46:08 -05:00
Support delay trimming slots after finishing migrating slots (#14567)
This PR introduces a mechanism that allows a module to temporarily
disable trimming after an ASM migration operation so it can safely
finish ongoing asynchronous jobs that depend on keys in migrating (and
about to be trimmed) slots.
1. **ClusterDisableTrim/ClusterEnableTrim**
We introduce `ClusterDisableTrim/ClusterEnableTrim` Module APIs to allow
module to disable/enable slot migration
```
/* Disable automatic slot trimming. */
int RM_ClusterDisableTrim(RedisModuleCtx *ctx)
/* Enable automatic slot trimming */
int RM_ClusterEnableTrim(RedisModuleCtx *ctx)
```
**Please notice**: Redis will not start any subsequent import or migrate
ASM operations while slot trimming is disabled, so modules must
re-enable trimming immediately after completing their pending work.
The only valid and meaningful time for a module to disable trimming
appears to be after the MIGRATE_COMPLETED event.
2. **REDISMODULE_OPEN_KEY_ACCESS_TRIMMED**
Added REDISMODULE_OPEN_KEY_ACCESS_TRIMMED to RM_OpenKey() so that module
can operate with these keys in the unowned slots after trim is paused.
And now we don't delete the key if it is in trim job when we access it.
And `expireIfNeeded` returns `KEY_VALID` if
`EXPIRE_ALLOW_ACCESS_TRIMMED` is set, otherwise, returns `KEY_TRIMMED`
without deleting key.
3. **REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS**
We also extend RM_GetContextFlags() to include a flag
REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS indicating whether a trimming job
is pending (due to trim pause) or in progress. Modules could
periodically poll this flag to synchronize their internal state, e.g.,
if a trim job was delayed or if the module incorrectly assumed trimming
was still active.
Bugfix: RM_SetClusterFlags could not clear a flag after enabling it first.
---------
Co-authored-by: Ozan Tezcan <ozantezcan@gmail.com>
This commit is contained in:
parent
ddbd96d8ae
commit
33391a7b61
9 changed files with 276 additions and 59 deletions
|
|
@ -1705,15 +1705,20 @@ static void asmStartImportTask(asmTask *task) {
|
|||
/* Notify the cluster implementation to prepare for the import task. */
|
||||
int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots);
|
||||
|
||||
/* We do not start the import task if trim is disabled by module. */
|
||||
int disabled_by_module = server.cluster_module_trim_disablers > 0;
|
||||
|
||||
static int start_blocked_logged = 0;
|
||||
/* Cannot start import task since pause action is performed. Otherwise, we
|
||||
* will break the promise that no writes are performed during the pause. */
|
||||
if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
|
||||
isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
|
||||
trim_in_progress ||
|
||||
impl_ret != C_OK)
|
||||
impl_ret != C_OK ||
|
||||
disabled_by_module)
|
||||
{
|
||||
const char *reason = impl_ret != C_OK ? "cluster is not ready" :
|
||||
const char *reason = disabled_by_module ? "trim is disabled by module" :
|
||||
impl_ret != C_OK ? "cluster is not ready" :
|
||||
trim_in_progress ? "trim in progress for some of the slots" :
|
||||
"server paused";
|
||||
if (start_blocked_logged == 0) {
|
||||
|
|
@ -1842,6 +1847,14 @@ void clusterSyncSlotsCommand(client *c) {
|
|||
return;
|
||||
}
|
||||
|
||||
/* We do not start the migrate task if trim is disabled by module. */
|
||||
int disabled_by_module = server.cluster_module_trim_disablers > 0;
|
||||
if (disabled_by_module) {
|
||||
addReplyError(c, "Trim is disabled by module");
|
||||
slotRangeArrayFree(slots);
|
||||
return;
|
||||
}
|
||||
|
||||
asmTask *task = listLength(asmManager->tasks) == 0 ? NULL :
|
||||
listNodeValue(listFirst(asmManager->tasks));
|
||||
if (task && !strcmp(task->id, task_id) &&
|
||||
|
|
@ -2946,6 +2959,12 @@ void asmTrimSlots(slotRangeArray *slots) {
|
|||
* in propagateNow(), as propagation is not allowed during a write pause. */
|
||||
void asmTrimJobSchedule(slotRangeArray *slots) {
|
||||
listAddNodeTail(asmManager->pending_trim_jobs, slotRangeArrayDup(slots));
|
||||
|
||||
/* If we call this function from beforeSleep, or cluster gossip message
|
||||
* handlers instead of normal command handlers, we can try to process the
|
||||
* trim job immediately. */
|
||||
if (server.execution_nesting == 0)
|
||||
asmTrimJobProcessPending();
|
||||
}
|
||||
|
||||
/* Process any pending trim jobs. */
|
||||
|
|
@ -2959,15 +2978,22 @@ void asmTrimJobProcessPending(void) {
|
|||
|
||||
/* Determine if we can start the trim job:
|
||||
* - require client writes not paused (so key deletions are allowed)
|
||||
* - require replicas not paused (so TRIMSLOTS can be propagated). */
|
||||
* - require replicas not paused (so TRIMSLOTS can be propagated).
|
||||
* - require trim is not disabled via RedisModule_ClusterDisableTrim().
|
||||
*/
|
||||
static int logged = 0;
|
||||
int disabled_by_module = server.cluster_module_trim_disablers > 0;
|
||||
|
||||
if (isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
|
||||
isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
|
||||
isPausedActions(PAUSE_ACTION_REPLICA))
|
||||
isPausedActions(PAUSE_ACTION_REPLICA) ||
|
||||
disabled_by_module)
|
||||
{
|
||||
if (logged == 0) {
|
||||
logged = 1;
|
||||
serverLog(LL_NOTICE, "Trim job will start after the write pause is lifted.");
|
||||
const char *reason = disabled_by_module ? "trim is disabled by module" :
|
||||
"pause action is in effect";
|
||||
serverLog(LL_NOTICE, "Trim job is deferred since %s.", reason);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
@ -3345,18 +3371,23 @@ void asmActiveTrimCycle(void) {
|
|||
return;
|
||||
}
|
||||
|
||||
/* Verify client pause is not in effect so we can delete keys. */
|
||||
/* Verify client pause is not in effect and trim is not disabled by module,
|
||||
* so we can delete keys. */
|
||||
static int blocked = 0;
|
||||
int disabled_by_module = server.cluster_module_trim_disablers > 0;
|
||||
if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
|
||||
isPausedActions(PAUSE_ACTION_CLIENT_WRITE))
|
||||
isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
|
||||
disabled_by_module)
|
||||
{
|
||||
if (blocked == 0) {
|
||||
blocked = 1;
|
||||
serverLog(LL_NOTICE, "Active trim cycle will continue after the write pause is lifted.");
|
||||
const char *reason = disabled_by_module ? "trim is disabled by module" :
|
||||
"pause action is in effect";
|
||||
serverLog(LL_NOTICE, "Active trim cycle is blocked since %s.", reason);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (blocked) serverLog(LL_NOTICE, "Active trim cycle is resumed after the write pause is lifted.");
|
||||
if (blocked) serverLog(LL_NOTICE, "Active trim cycle is unblocked.");
|
||||
blocked = 0;
|
||||
|
||||
/* This works in a similar way to activeExpireCycle, in the sense that
|
||||
|
|
@ -3414,28 +3445,10 @@ void asmActiveTrimCycle(void) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Trim a specific key if trimming is pending or in progress for its slot.
|
||||
* Return 1 if the key was trimmed */
|
||||
int asmActiveTrimDelIfNeeded(redisDb *db, robj *key, kvobj *kv) {
|
||||
/* Check if trimming is in progress. */
|
||||
if (server.allow_access_trimmed ||
|
||||
!asmIsTrimInProgress())
|
||||
{
|
||||
/* Check if the key in a trim job. */
|
||||
int asmIsKeyInTrimJob(sds keyname) {
|
||||
if (!asmIsTrimInProgress() || !isSlotInTrimJob(getKeySlot(keyname)))
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Check if the slot is in a trim job. */
|
||||
sds keyname = key ? key->ptr : kvobjGetKey(kv);
|
||||
if (!isSlotInTrimJob(getKeySlot(keyname)))
|
||||
return 0;
|
||||
|
||||
if (key) {
|
||||
asmActiveTrimDeleteKey(db, key);
|
||||
} else {
|
||||
robj *tmpkey = createStringObject(keyname, sdslen(keyname));
|
||||
asmActiveTrimDeleteKey(db, tmpkey);
|
||||
decrRefCount(tmpkey);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ void asmFinalizeMasterTask(void);
|
|||
int asmIsTrimInProgress(void);
|
||||
int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc);
|
||||
void asmActiveTrimCycle(void);
|
||||
int asmActiveTrimDelIfNeeded(redisDb *db, robj *key, kvobj *kv);
|
||||
int asmIsKeyInTrimJob(sds keyname);
|
||||
int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc);
|
||||
#endif
|
||||
|
||||
|
|
|
|||
22
src/db.c
22
src/db.c
|
|
@ -35,12 +35,14 @@ static_assert(MAX_KEYSIZES_TYPES == OBJ_TYPE_BASIC_MAX, "Must be equal");
|
|||
#define EXPIRE_FORCE_DELETE_EXPIRED 1
|
||||
#define EXPIRE_AVOID_DELETE_EXPIRED 2
|
||||
#define EXPIRE_ALLOW_ACCESS_EXPIRED 4
|
||||
#define EXPIRE_ALLOW_ACCESS_TRIMMED 8
|
||||
|
||||
/* Return values for expireIfNeeded */
|
||||
typedef enum {
|
||||
KEY_VALID = 0, /* Could be volatile and not yet expired, non-volatile, or even non-existing key. */
|
||||
KEY_EXPIRED, /* Logically expired but not yet deleted. */
|
||||
KEY_DELETED /* The key was deleted now. */
|
||||
KEY_DELETED, /* The key was deleted now. */
|
||||
KEY_TRIMMED /* Logically trimmed but not yet deleted. */
|
||||
} keyStatus;
|
||||
|
||||
static keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags);
|
||||
|
|
@ -256,6 +258,8 @@ kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) {
|
|||
expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED;
|
||||
if (flags & LOOKUP_ACCESS_EXPIRED)
|
||||
expire_flags |= EXPIRE_ALLOW_ACCESS_EXPIRED;
|
||||
if (flags & LOOKUP_ACCESS_TRIMMED)
|
||||
expire_flags |= EXPIRE_ALLOW_ACCESS_TRIMMED;
|
||||
if (expireIfNeeded(db, key, val, expire_flags) != KEY_VALID) {
|
||||
/* The key is no longer valid. */
|
||||
val = NULL;
|
||||
|
|
@ -2721,7 +2725,9 @@ int confAllowsExpireDel(void) {
|
|||
*
|
||||
* The return value of the function is KEY_VALID if the key is still valid.
|
||||
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
|
||||
* or returns KEY_DELETED if the key is expired and deleted.
|
||||
* or returns KEY_DELETED if the key is expired and deleted. If the key is in a
|
||||
* trim job due to slot migration, the function returns KEY_TRIMMED, unless
|
||||
* EXPIRE_ALLOW_ACCESS_TRIMMED is set, in which case it returns KEY_VALID.
|
||||
*
|
||||
* You can optionally pass `kv` to save a lookup.
|
||||
*/
|
||||
|
|
@ -2729,9 +2735,15 @@ keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags) {
|
|||
debugAssert(key != NULL || kv != NULL);
|
||||
|
||||
/* NOTE: Keys in slots scheduled for trimming can still exist for a while.
|
||||
* If a module touches one of these keys, we remove it right away and
|
||||
* return KEY_DELETED. */
|
||||
if (asmActiveTrimDelIfNeeded(db, key, kv)) return KEY_DELETED;
|
||||
* We don't delete it here, return KEY_VALID if allowing access to trimmed
|
||||
* keys, and return KEY_TRIMMED otherwise. */
|
||||
sds key_name = key ? key->ptr : kvobjGetKey(kv);
|
||||
if (asmIsKeyInTrimJob(key_name)) {
|
||||
if (server.allow_access_trimmed || (flags & EXPIRE_ALLOW_ACCESS_TRIMMED))
|
||||
return KEY_VALID;
|
||||
|
||||
return KEY_TRIMMED;
|
||||
}
|
||||
|
||||
if ((flags & EXPIRE_ALLOW_ACCESS_EXPIRED) ||
|
||||
(!keyIsExpired(db, key ? key->ptr : NULL, kv)))
|
||||
|
|
|
|||
40
src/module.c
40
src/module.c
|
|
@ -3943,6 +3943,8 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
|
|||
*
|
||||
* * REDISMODULE_CTX_FLAGS_DEBUG_ENABLED: Debug commands are enabled for this
|
||||
* context.
|
||||
* * REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS: Trim is in progress due to slot
|
||||
* migration.
|
||||
*/
|
||||
int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
||||
int flags = 0;
|
||||
|
|
@ -4040,6 +4042,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
|||
flags |= REDISMODULE_CTX_FLAGS_DEBUG_ENABLED;
|
||||
}
|
||||
|
||||
if (asmIsTrimInProgress())
|
||||
flags |= REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS;
|
||||
|
||||
return flags;
|
||||
}
|
||||
|
||||
|
|
@ -4145,6 +4150,7 @@ RedisModuleKey *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
|
|||
flags |= (mode & REDISMODULE_OPEN_KEY_NOEXPIRE? LOOKUP_NOEXPIRE: 0);
|
||||
flags |= (mode & REDISMODULE_OPEN_KEY_NOEFFECTS? LOOKUP_NOEFFECTS: 0);
|
||||
flags |= (mode & REDISMODULE_OPEN_KEY_ACCESS_EXPIRED ? (LOOKUP_ACCESS_EXPIRED) : 0);
|
||||
flags |= (mode & REDISMODULE_OPEN_KEY_ACCESS_TRIMMED ? (LOOKUP_ACCESS_TRIMMED) : 0);
|
||||
|
||||
if (mode & REDISMODULE_WRITE) {
|
||||
kv = lookupKeyWriteWithFlags(ctx->client->db,keyname, flags);
|
||||
|
|
@ -9388,12 +9394,44 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
|
|||
* cluster, but without effect. */
|
||||
void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) {
|
||||
UNUSED(ctx);
|
||||
server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
|
||||
if (flags & REDISMODULE_CLUSTER_FLAG_NO_FAILOVER)
|
||||
server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_FAILOVER;
|
||||
if (flags & REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION)
|
||||
server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_REDIRECTION;
|
||||
}
|
||||
|
||||
/* RM_ClusterDisableTrim allows a module to temporarily prevent slot trimming
|
||||
* after a slot migration. This is useful when the module has asynchronous
|
||||
* operations that rely on keys in migrating slots, which would be trimmed.
|
||||
*
|
||||
* The module must call RM_ClusterEnableTrim once it has completed those
|
||||
* operations to re-enable trimming.
|
||||
*
|
||||
* Trimming uses a reference counter: every call to RM_ClusterDisableTrim
|
||||
* increments the counter, and every RM_ClusterEnableTrim call decrements it.
|
||||
* Trimming remains disabled as long as the counter is greater than zero.
|
||||
*
|
||||
* Disable automatic slot trimming. */
|
||||
int RM_ClusterDisableTrim(RedisModuleCtx *ctx) {
|
||||
UNUSED(ctx);
|
||||
if (server.cluster_module_trim_disablers < INT_MAX) {
|
||||
server.cluster_module_trim_disablers++;
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
/* Enable automatic slot trimming. See also comments on RM_ClusterDisableTrim. */
|
||||
int RM_ClusterEnableTrim(RedisModuleCtx *ctx) {
|
||||
UNUSED(ctx);
|
||||
if (server.cluster_module_trim_disablers > 0) {
|
||||
server.cluster_module_trim_disablers--;
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
/* Returns the cluster slot of a key, similar to the `CLUSTER KEYSLOT` command.
|
||||
* This function works even if cluster mode is not enabled. */
|
||||
unsigned int RM_ClusterKeySlot(RedisModuleString *key) {
|
||||
|
|
@ -15087,6 +15125,8 @@ void moduleRegisterCoreAPI(void) {
|
|||
REGISTER_API(SetDisconnectCallback);
|
||||
REGISTER_API(GetBlockedClientHandle);
|
||||
REGISTER_API(SetClusterFlags);
|
||||
REGISTER_API(ClusterDisableTrim);
|
||||
REGISTER_API(ClusterEnableTrim);
|
||||
REGISTER_API(ClusterKeySlot);
|
||||
REGISTER_API(ClusterKeySlotC);
|
||||
REGISTER_API(ClusterCanonicalKeyNameInSlot);
|
||||
|
|
|
|||
|
|
@ -62,11 +62,14 @@ typedef long long ustime_t;
|
|||
#define REDISMODULE_OPEN_KEY_NOEFFECTS (1<<20)
|
||||
/* Allow access expired key that haven't deleted yet */
|
||||
#define REDISMODULE_OPEN_KEY_ACCESS_EXPIRED (1<<21)
|
||||
/* Allow access trimmed key that haven't deleted yet */
|
||||
#define REDISMODULE_OPEN_KEY_ACCESS_TRIMMED (1<<22)
|
||||
|
||||
|
||||
/* Mask of all REDISMODULE_OPEN_KEY_* values. Any new mode should be added to this list.
|
||||
* Should not be used directly by the module, use RM_GetOpenKeyModesAll instead.
|
||||
* Located here so when we will add new modes we will not forget to update it. */
|
||||
#define _REDISMODULE_OPEN_KEY_ALL REDISMODULE_READ | REDISMODULE_WRITE | REDISMODULE_OPEN_KEY_NOTOUCH | REDISMODULE_OPEN_KEY_NONOTIFY | REDISMODULE_OPEN_KEY_NOSTATS | REDISMODULE_OPEN_KEY_NOEXPIRE | REDISMODULE_OPEN_KEY_NOEFFECTS | REDISMODULE_OPEN_KEY_ACCESS_EXPIRED
|
||||
#define _REDISMODULE_OPEN_KEY_ALL REDISMODULE_READ | REDISMODULE_WRITE | REDISMODULE_OPEN_KEY_NOTOUCH | REDISMODULE_OPEN_KEY_NONOTIFY | REDISMODULE_OPEN_KEY_NOSTATS | REDISMODULE_OPEN_KEY_NOEXPIRE | REDISMODULE_OPEN_KEY_NOEFFECTS | REDISMODULE_OPEN_KEY_ACCESS_EXPIRED | REDISMODULE_OPEN_KEY_ACCESS_TRIMMED
|
||||
|
||||
/* List push and pop */
|
||||
#define REDISMODULE_LIST_HEAD 0
|
||||
|
|
@ -210,11 +213,13 @@ typedef struct RedisModuleStreamID {
|
|||
#define REDISMODULE_CTX_FLAGS_SERVER_STARTUP (1<<24)
|
||||
/* This context can call execute debug commands. */
|
||||
#define REDISMODULE_CTX_FLAGS_DEBUG_ENABLED (1<<25)
|
||||
/* Trim is in progress due to slot migration. */
|
||||
#define REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS (1<<26)
|
||||
|
||||
/* Next context flag, must be updated when adding new flags above!
|
||||
This flag should not be used directly by the module.
|
||||
* Use RedisModule_GetContextFlagsAll instead. */
|
||||
#define _REDISMODULE_CTX_FLAGS_NEXT (1<<26)
|
||||
#define _REDISMODULE_CTX_FLAGS_NEXT (1<<27)
|
||||
|
||||
/* Keyspace changes notification classes. Every class is associated with a
|
||||
* character for configuration purposes.
|
||||
|
|
@ -1334,6 +1339,8 @@ REDISMODULE_API void (*RedisModule_GetRandomBytes)(unsigned char *dst, size_t le
|
|||
REDISMODULE_API void (*RedisModule_GetRandomHexChars)(char *dst, size_t len) REDISMODULE_ATTR;
|
||||
REDISMODULE_API void (*RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) REDISMODULE_ATTR;
|
||||
REDISMODULE_API void (*RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_ClusterDisableTrim)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_ClusterEnableTrim)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||
REDISMODULE_API unsigned int (*RedisModule_ClusterKeySlot)(RedisModuleString *key) REDISMODULE_ATTR;
|
||||
REDISMODULE_API unsigned int (*RedisModule_ClusterKeySlotC)(const char *keystr, size_t keylen) REDISMODULE_ATTR;
|
||||
REDISMODULE_API const char *(*RedisModule_ClusterCanonicalKeyNameInSlot)(unsigned int slot) REDISMODULE_ATTR;
|
||||
|
|
@ -1727,6 +1734,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||
REDISMODULE_GET_API(GetRandomBytes);
|
||||
REDISMODULE_GET_API(GetRandomHexChars);
|
||||
REDISMODULE_GET_API(SetClusterFlags);
|
||||
REDISMODULE_GET_API(ClusterDisableTrim);
|
||||
REDISMODULE_GET_API(ClusterEnableTrim);
|
||||
REDISMODULE_GET_API(ClusterKeySlot);
|
||||
REDISMODULE_GET_API(ClusterKeySlotC);
|
||||
REDISMODULE_GET_API(ClusterCanonicalKeyNameInSlot);
|
||||
|
|
|
|||
|
|
@ -2348,6 +2348,7 @@ void initServerConfig(void) {
|
|||
server.shutdown_flags = 0;
|
||||
server.shutdown_mstime = 0;
|
||||
server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
|
||||
server.cluster_module_trim_disablers = 0;
|
||||
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType);
|
||||
server.next_client_id = 1; /* Client IDs, start from 1 .*/
|
||||
server.page_size = sysconf(_SC_PAGESIZE);
|
||||
|
|
|
|||
|
|
@ -2341,6 +2341,7 @@ struct redisServer {
|
|||
to set in order to suppress certain
|
||||
native Redis Cluster features. Check the
|
||||
REDISMODULE_CLUSTER_FLAG_*. */
|
||||
int cluster_module_trim_disablers; /* Number of module requests to disable trimming */
|
||||
int cluster_allow_reads_when_down; /* Are reads allowed when the cluster
|
||||
is down? */
|
||||
int cluster_config_file_lock_fd; /* cluster config fd, will be flocked. */
|
||||
|
|
@ -3831,6 +3832,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
|
|||
#define LOOKUP_WRITE (1<<3) /* Delete expired keys even in replicas. */
|
||||
#define LOOKUP_NOEXPIRE (1<<4) /* Avoid deleting lazy expired keys. */
|
||||
#define LOOKUP_ACCESS_EXPIRED (1<<5) /* Allow lookup to expired key. */
|
||||
#define LOOKUP_ACCESS_TRIMMED (1<<6) /* Allow lookup to key in slots being trimmed. */
|
||||
#define LOOKUP_NOEFFECTS (LOOKUP_NONOTIFY | LOOKUP_NOSTATS | LOOKUP_NOTOUCH | LOOKUP_NOEXPIRE) /* Avoid any effects from fetching the key */
|
||||
|
||||
static inline kvobj *dictGetKV(const dictEntry *de) {return (kvobj *) dictGetKey(de);}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@ int numClusterTrimEvents = 0;
|
|||
/* Log of last deleted key event. */
|
||||
const char *lastDeletedKeyLog = NULL;
|
||||
|
||||
/* Flag to disable trim. */
|
||||
int disableTrimFlag = 0;
|
||||
|
||||
int replicateModuleCommand = 0; /* Enable or disable module command replication. */
|
||||
RedisModuleString *moduleCommandKeyName = NULL; /* Key name to replicate. */
|
||||
RedisModuleString *moduleCommandKeyVal = NULL; /* Key value to replicate. */
|
||||
|
|
@ -234,6 +237,33 @@ static void testNonFatalScenarios(RedisModuleCtx *ctx, RedisModuleClusterSlotMig
|
|||
testReplicatingUnknownCommand(ctx);
|
||||
}
|
||||
|
||||
int disableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
disableTrimFlag = 1;
|
||||
/* Only disable when MIGRATE_COMPLETED for simulating recommended usage. */
|
||||
// RedisModule_ClusterDisableTrim(ctx)
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int enableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
disableTrimFlag = 0;
|
||||
RedisModule_Assert(RedisModule_ClusterEnableTrim(ctx) == REDISMODULE_OK);
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int trimInProgressCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
uint64_t flags = RedisModule_GetContextFlags(ctx);
|
||||
RedisModule_ReplyWithLongLong(ctx, !!(flags & REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS));
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
void clusterEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
|
||||
REDISMODULE_NOT_USED(ctx);
|
||||
int ret;
|
||||
|
|
@ -260,10 +290,39 @@ void clusterEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub,
|
|||
/* Log the event. */
|
||||
if (numClusterEvents >= MAX_EVENTS) return;
|
||||
clusterEventLog[numClusterEvents++] = clusterAsmInfoToString(info, sub);
|
||||
|
||||
if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED) {
|
||||
/* If users ask to disable trim, we disable trim. */
|
||||
if (disableTrimFlag) {
|
||||
RedisModule_Assert(RedisModule_ClusterDisableTrim(ctx) == REDISMODULE_OK);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int getPendingTrimKeyCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
RedisModule_ReplyWithError(ctx, "ERR wrong number of arguments");
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1],
|
||||
REDISMODULE_READ | REDISMODULE_OPEN_KEY_ACCESS_TRIMMED);
|
||||
if (!key) {
|
||||
RedisModule_ReplyWithNull(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) {
|
||||
RedisModule_ReplyWithError(ctx, "key is not a string");
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
size_t len;
|
||||
const char *value = RedisModule_StringDMA(key, &len, 0);
|
||||
RedisModule_ReplyWithStringBuffer(ctx, value, len);
|
||||
RedisModule_CloseKey(key);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
void clusterTrimEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
|
||||
REDISMODULE_NOT_USED(ctx);
|
||||
|
||||
|
|
@ -473,6 +532,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||
if (RedisModule_CreateCommand(ctx, "asm.keyless_cmd", keylessCmd, "write", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "asm.disable_trim", disableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "asm.enable_trim", enableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "asm.read_pending_trim_key", getPendingTrimKeyCmd, "readonly", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "asm.trim_in_progress", trimInProgressCmd, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "asm.read_keyless_cmd_val", readkeylessCmdVal, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
|
|
|
|||
|
|
@ -197,6 +197,19 @@ proc setup_slot_migration_with_delay {src_node dst_node start_slot end_slot {key
|
|||
return $task_id
|
||||
}
|
||||
|
||||
# Helper function to clear module internal event logs
|
||||
proc clear_module_event_log {} {
|
||||
for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} {
|
||||
R $i asm.clear_event_log
|
||||
}
|
||||
}
|
||||
|
||||
proc reset_default_trim_method {} {
|
||||
for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} {
|
||||
R $i debug asm-trim-method default
|
||||
}
|
||||
}
|
||||
|
||||
start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout 60000 cluster-allow-replica-migration no}} {
|
||||
foreach trim_method {"active" "bg"} {
|
||||
test "Simple slot migration (trim method: $trim_method)" {
|
||||
|
|
@ -1658,7 +1671,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout
|
|||
|
||||
# Trigger a failover with force to simulate unreachable master and
|
||||
# verify unowned keys are trimmed once replica becomes master.
|
||||
failover_and_wait_for_done 4
|
||||
failover_and_wait_for_done 4 force
|
||||
wait_for_log_messages -4 {"*Detected keys in slots that do not belong*Scheduling trim*"} $loglines 1000 10
|
||||
wait_for_condition 1000 10 {
|
||||
[R 1 dbsize] == 0 &&
|
||||
|
|
@ -2223,19 +2236,6 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout
|
|||
set testmodule [file normalize tests/modules/atomicslotmigration.so]
|
||||
|
||||
start_cluster 3 6 [list tags {external:skip cluster modules} config_lines [list loadmodule $testmodule cluster-node-timeout 60000 cluster-allow-replica-migration no]] {
|
||||
# Helper function to clear module internal event logs
|
||||
proc clear_module_event_log {} {
|
||||
for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} {
|
||||
R $i asm.clear_event_log
|
||||
}
|
||||
}
|
||||
|
||||
proc reset_default_trim_method {} {
|
||||
for {set i 0} {$i < $::cluster_master_nodes + $::cluster_replica_nodes} {incr i} {
|
||||
R $i debug asm-trim-method default
|
||||
}
|
||||
}
|
||||
|
||||
test "Module api sanity" {
|
||||
R 0 asm.sanity ;# on master
|
||||
R 3 asm.sanity ;# on replica
|
||||
|
|
@ -2535,17 +2535,17 @@ start_cluster 3 6 [list tags {external:skip cluster modules} config_lines [list
|
|||
# NOTE: only slot 0 has data, so only slot 0 is trimmed
|
||||
if {$trim_method eq "active"} {
|
||||
set trim_event_log [list \
|
||||
"sub: cluster-slot-migration-trim-started, slots:0-0" \
|
||||
"sub: cluster-slot-migration-trim-started, slots:0-100" \
|
||||
"keyspace: key_trimmed, key: $key" \
|
||||
"sub: cluster-slot-migration-trim-completed, slots:0-0" \
|
||||
"sub: cluster-slot-migration-trim-completed, slots:0-100" \
|
||||
]
|
||||
} else {
|
||||
set trim_event_log [list \
|
||||
"sub: cluster-slot-migration-trim-background, slots:0-0" \
|
||||
"sub: cluster-slot-migration-trim-background, slots:0-100" \
|
||||
]
|
||||
}
|
||||
wait_for_condition 500 20 {
|
||||
[list [lindex [R 1 asm.get_cluster_trim_event_log] 1]] eq $trim_event_log &&
|
||||
[R 1 asm.get_cluster_trim_event_log] eq $trim_event_log &&
|
||||
[R 4 asm.get_cluster_trim_event_log] eq $trim_event_log &&
|
||||
[R 7 asm.get_cluster_trim_event_log] eq $trim_event_log
|
||||
} else {
|
||||
|
|
@ -2822,10 +2822,8 @@ start_cluster 3 6 [list tags {external:skip cluster modules} config_lines [list
|
|||
fail "migrate failed"
|
||||
}
|
||||
|
||||
# Try to read the key from the slot being trimmed. It will lazily trim the key.
|
||||
set num_trimmed [CI 0 cluster_slot_migration_active_trim_current_job_trimmed]
|
||||
# We cannot open the key since it is in a slot being trimmed
|
||||
assert_equal {} [R 0 asm.get $key]
|
||||
assert_equal [expr $num_trimmed + 1] [CI 0 cluster_slot_migration_active_trim_current_job_trimmed]
|
||||
|
||||
# cleanup
|
||||
R 0 debug asm-trim-method default
|
||||
|
|
@ -2875,6 +2873,7 @@ start_cluster 2 0 [list tags {external:skip cluster modules} config_lines [list
|
|||
# restart server and verify aof is loaded
|
||||
restart_server 0 yes no yes nosave
|
||||
assert {[scan [regexp -inline {aof_current_size:([\d]*)} [R 0 info persistence]] aof_current_size=%d] > 0}
|
||||
wait_for_cluster_state "ok"
|
||||
|
||||
# verify TRIMSLOTS in AOF is executed synchronously
|
||||
assert_equal 0 [CI 0 cluster_slot_migration_stats_active_trim_completed]
|
||||
|
|
@ -2884,6 +2883,76 @@ start_cluster 2 0 [list tags {external:skip cluster modules} config_lines [list
|
|||
R 0 CLUSTER MIGRATION IMPORT 0 0
|
||||
wait_for_asm_done
|
||||
assert_equal 2000 [R 0 dbsize]
|
||||
R 0 flushall
|
||||
R 1 flushall
|
||||
clear_module_event_log
|
||||
|
||||
}
|
||||
|
||||
test "Test trim is disabled when module requests it" {
|
||||
R 0 asm.disable_trim
|
||||
|
||||
set slot0_key [slot_key 0 mykey]
|
||||
R 0 set $slot0_key "value"
|
||||
set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
|
||||
wait_for_condition 1000 10 {
|
||||
[string match {*completed*} [migration_status 0 $task_id state]]
|
||||
} else {
|
||||
fail "ASM task did not complete"
|
||||
}
|
||||
# since we disable trim, the key should still exist on source,
|
||||
# we can read it with REDISMODULE_OPEN_KEY_ACCESS_TRIMMED flag
|
||||
assert_equal "value" [R 0 asm.read_pending_trim_key $slot0_key]
|
||||
assert_equal 1 [R 0 asm.trim_in_progress]
|
||||
|
||||
# enable trim and verify the key is trimmed
|
||||
R 0 asm.enable_trim
|
||||
wait_for_condition 1000 10 {
|
||||
[R 0 asm.read_pending_trim_key $slot0_key] eq "" &&
|
||||
[R 0 asm.trim_in_progress] == 0
|
||||
} else {
|
||||
fail "Trim did not complete"
|
||||
}
|
||||
wait_for_asm_done
|
||||
R 0 CLUSTER MIGRATION IMPORT 0 0
|
||||
wait_for_asm_done
|
||||
clear_module_event_log
|
||||
}
|
||||
|
||||
test "Can not start new asm task when trim is not allowed" {
|
||||
# start a migration task, wait it completed but not allow to trim slots
|
||||
R 0 asm.disable_trim
|
||||
set task_id [R 1 CLUSTER MIGRATION IMPORT 0 0]
|
||||
wait_for_condition 1000 10 {
|
||||
[string match {*completed*} [migration_status 0 $task_id state]]
|
||||
} else {
|
||||
fail "ASM task did not complete"
|
||||
}
|
||||
# Can not start new migrating task since trim is disabled
|
||||
set task_id [R 1 CLUSTER MIGRATION IMPORT 1 1]
|
||||
wait_for_condition 1000 10 {
|
||||
[string match {*fail*} [migration_status 1 $task_id state]] &&
|
||||
[string match {*Trim is disabled by module*} [migration_status 1 $task_id last_error]]
|
||||
} else {
|
||||
fail "ASM task did not fail"
|
||||
}
|
||||
R 0 asm.enable_trim
|
||||
wait_for_asm_done
|
||||
|
||||
# start a migration task, wait it completed but not allow to trim slots
|
||||
R 0 asm.disable_trim
|
||||
set task_id [R 1 CLUSTER MIGRATION IMPORT 2 2]
|
||||
wait_for_condition 1000 10 {
|
||||
[string match {*completed*} [migration_status 0 $task_id state]]
|
||||
} else {
|
||||
fail "ASM task did not complete"
|
||||
}
|
||||
set logline [count_log_lines 0]
|
||||
# Can not start new importing task since trim is disabled
|
||||
set task_id [R 0 CLUSTER MIGRATION IMPORT 0 1]
|
||||
wait_for_log_messages 0 {"*Can not start import task*trim is disabled by module*"} $logline 1000 10
|
||||
R 0 asm.enable_trim
|
||||
wait_for_asm_done
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue