mirror of
https://github.com/redis/redis.git
synced 2026-02-03 20:39:54 -05:00
New eviction policies - least recently modified (#14624)
### Summary This PR introduces two new maxmemory eviction policies: `volatile-lrm` and `allkeys-lrm`. LRM (Least Recently Modified) is similar to LRU but only updates the timestamp on write operations, not read operations. This makes it useful for evicting keys that haven't been modified recently, regardless of how frequently they are read. ### Core Implementation The LRM implementation reuses the existing LRU infrastructure but with a key difference in when timestamps are updated: - **LRU**: Updates timestamp on both read and write operations - **LRM**: Updates timestamp only on write operations via `updateLRM()` ### Key changes: Add `keyModified()` to accept an optional `robj *val` parameter and call `updateLRM()` when a value is provided. Since `keyModified()` serves as the unified entry point for all key modifications, placing the LRM update here ensures timestamps are consistently updated across all write operations --------- Co-authored-by: oranagra <oran@redislabs.com> Co-authored-by: Yuan Wang <yuan.wang@redis.com>
This commit is contained in:
parent
9ca860be9e
commit
0cb1ee0dc1
24 changed files with 295 additions and 104 deletions
11
redis.conf
11
redis.conf
|
|
@ -1177,6 +1177,8 @@ acllog-max-len 128
|
|||
# allkeys-lru -> Evict any key using approximated LRU.
|
||||
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
|
||||
# allkeys-lfu -> Evict any key using approximated LFU.
|
||||
# volatile-lrm -> Evict using approximated LRM, only keys with an expire set.
|
||||
# allkeys-lrm -> Evict any key using approximated LRM.
|
||||
# volatile-random -> Remove a random key having an expire set.
|
||||
# allkeys-random -> Remove a random key, any key.
|
||||
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
|
||||
|
|
@ -1184,10 +1186,17 @@ acllog-max-len 128
|
|||
#
|
||||
# LRU means Least Recently Used
|
||||
# LFU means Least Frequently Used
|
||||
# LRM means Least Recently Modified (only write operations update the timestamp)
|
||||
#
|
||||
# Both LRU, LFU and volatile-ttl are implemented using approximated
|
||||
# LRU, LFU, LRM and volatile-ttl are implemented using approximated
|
||||
# randomized algorithms.
|
||||
#
|
||||
# LRU vs LRM: Both use similar eviction logic based on access time, but:
|
||||
# - LRU updates the timestamp on both read (GET) and write (SET) operations
|
||||
# - LRM only updates the timestamp on write (SET, INCR, etc.) operations
|
||||
# This makes LRM useful when you want to evict keys that haven't been updated
|
||||
# recently, regardless of how often they are read.
|
||||
#
|
||||
# Note: with any of the above policies, when there are no suitable keys for
|
||||
# eviction, Redis will return an error on write operations that require
|
||||
# more memory. These are usually commands that create new keys, add data or
|
||||
|
|
|
|||
|
|
@ -875,7 +875,7 @@ void setbitCommand(client *c) {
|
|||
byteval &= ~(1 << bit);
|
||||
byteval |= ((on & 0x1) << bit);
|
||||
((uint8_t*)o->ptr)[byte] = byteval;
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],o,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
|
||||
|
|
@ -1447,7 +1447,7 @@ void bitopCommand(client *c) {
|
|||
notifyKeyspaceEvent(NOTIFY_STRING,"set",targetkey,c->db->id);
|
||||
server.dirty++;
|
||||
} else if (dbDelete(c->db,targetkey)) {
|
||||
signalModifiedKey(c,c->db,targetkey);
|
||||
keyModified(c,c->db,targetkey,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",targetkey,c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -1951,7 +1951,7 @@ void bitfieldGeneric(client *c, int flags) {
|
|||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING,
|
||||
strOldSize, strOldSize + strGrowSize);
|
||||
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],o,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
|
||||
server.dirty += changes;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -257,7 +257,7 @@ void restoreCommand(client *c) {
|
|||
if (deleted) {
|
||||
robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
|
||||
rewriteClientCommandVector(c, 2, aux, key);
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c,c->db,key,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -287,7 +287,7 @@ void restoreCommand(client *c) {
|
|||
}
|
||||
}
|
||||
objectSetLRUOrLFU(kv, lfu_freq, lru_idle, lru_clock, 1000);
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c,c->db,key,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
|
||||
|
||||
/* If we deleted a key that means REPLACE parameter was passed and the
|
||||
|
|
@ -660,7 +660,7 @@ void migrateCommand(client *c) {
|
|||
if (!copy) {
|
||||
/* No COPY option: remove the local key, signal the change. */
|
||||
dbDelete(c->db,keyArray[j]);
|
||||
signalModifiedKey(c,c->db,keyArray[j]);
|
||||
keyModified(c,c->db,keyArray[j],NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyArray[j],c->db->id);
|
||||
server.dirty++;
|
||||
|
||||
|
|
@ -1681,7 +1681,7 @@ unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) {
|
|||
robj *key = createStringObject(sdskey, sdslen(sdskey));
|
||||
dbDelete(&server.db[0], key);
|
||||
|
||||
signalModifiedKey(NULL, &server.db[0], key);
|
||||
keyModified(NULL, &server.db[0], key, NULL, 1);
|
||||
if (by_command) {
|
||||
/* Keys are deleted by a command (trimslots), we need to notify the
|
||||
* keyspace event. Though, we don't need to propagate the DEL
|
||||
|
|
|
|||
|
|
@ -3416,7 +3416,7 @@ void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) {
|
|||
if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr));
|
||||
|
||||
dbDelete(db, keyobj);
|
||||
signalModifiedKey(NULL, db, keyobj);
|
||||
keyModified(NULL, db, keyobj, NULL, 1);
|
||||
/* The keys are not actually logically deleted from the database, just moved
|
||||
* to another node. The modules need to know that these keys are no longer
|
||||
* available locally, so just send the keyspace notification to the modules,
|
||||
|
|
|
|||
|
|
@ -40,9 +40,11 @@ configEnum maxmemory_policy_enum[] = {
|
|||
{"volatile-lfu", MAXMEMORY_VOLATILE_LFU},
|
||||
{"volatile-random",MAXMEMORY_VOLATILE_RANDOM},
|
||||
{"volatile-ttl",MAXMEMORY_VOLATILE_TTL},
|
||||
{"volatile-lrm",MAXMEMORY_VOLATILE_LRM},
|
||||
{"allkeys-lru",MAXMEMORY_ALLKEYS_LRU},
|
||||
{"allkeys-lfu",MAXMEMORY_ALLKEYS_LFU},
|
||||
{"allkeys-random",MAXMEMORY_ALLKEYS_RANDOM},
|
||||
{"allkeys-lrm",MAXMEMORY_ALLKEYS_LRM},
|
||||
{"noeviction",MAXMEMORY_NO_EVICTION},
|
||||
{NULL, 0}
|
||||
};
|
||||
|
|
|
|||
59
src/db.c
59
src/db.c
|
|
@ -56,6 +56,15 @@ void updateLFU(robj *val) {
|
|||
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
|
||||
}
|
||||
|
||||
/* Update LRM when an object is modified. */
|
||||
void updateLRM(robj *o) {
|
||||
if (o->refcount == OBJ_SHARED_REFCOUNT)
|
||||
return;
|
||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) {
|
||||
o->lru = LRU_CLOCK();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Update histogram of keys-sizes
|
||||
*
|
||||
|
|
@ -278,7 +287,8 @@ kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) {
|
|||
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
|
||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||
updateLFU(val);
|
||||
} else {
|
||||
} else if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LRM)) {
|
||||
/* LRM policy should NOT update timestamp on reads. */
|
||||
val->lru = LRU_CLOCK();
|
||||
}
|
||||
}
|
||||
|
|
@ -682,8 +692,8 @@ void setKeyByLink(client *c, redisDb *db, robj *key, robj **valref, int flags, d
|
|||
dbAddByLink(db, key, valref, link);
|
||||
}
|
||||
|
||||
if (!(flags & SETKEY_NO_SIGNAL))
|
||||
signalModifiedKey(c,db,key);
|
||||
/* Signal key modification and update LRM timestamp. */
|
||||
keyModified(c,db,key,*valref,!(flags & SETKEY_NO_SIGNAL));
|
||||
}
|
||||
|
||||
/* During atomic slot migration, keys that are being imported are in an
|
||||
|
|
@ -1016,16 +1026,27 @@ long long dbTotalServerKeyCount(void) {
|
|||
* Hooks for key space changes.
|
||||
*
|
||||
* Every time a key in the database is modified the function
|
||||
* signalModifiedKey() is called.
|
||||
* keyModified() is called.
|
||||
*
|
||||
* Every time a DB is flushed the function signalFlushDb() is called.
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
/* Note that the 'c' argument may be NULL if the key was modified out of
|
||||
* a context of a client. */
|
||||
void signalModifiedKey(client *c, redisDb *db, robj *key) {
|
||||
touchWatchedKey(db,key);
|
||||
trackingInvalidateKey(c,key,1);
|
||||
/* Called when a key is modified to update LRM timestamp
|
||||
* and optionally signal watchers/tracking clients.
|
||||
*
|
||||
* Arguments:
|
||||
* - c: client (may be NULL if the key was modified out of a context of a client)
|
||||
* - db: database containing the key
|
||||
* - key: the key that was modified
|
||||
* - val: the value object (if NULL, LRM won't be updated, e.g., for deleted keys)
|
||||
* - signal: if true, trigger WATCH and client-side tracking invalidation
|
||||
*/
|
||||
void keyModified(client *c, redisDb *db, robj *key, robj *val, int signal) {
|
||||
if (val) updateLRM(val);
|
||||
if (signal) {
|
||||
touchWatchedKey(db,key);
|
||||
trackingInvalidateKey(c,key,1);
|
||||
}
|
||||
}
|
||||
|
||||
void signalFlushedDb(int dbid, int async, slotRangeArray *slots) {
|
||||
|
|
@ -1246,7 +1267,7 @@ void delGenericCommand(client *c, int lazy) {
|
|||
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
|
||||
dbSyncDelete(c->db,c->argv[j]);
|
||||
if (deleted) {
|
||||
signalModifiedKey(c,c->db,c->argv[j]);
|
||||
keyModified(c,c->db,c->argv[j],NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,
|
||||
"del",c->argv[j],c->db->id);
|
||||
server.dirty++;
|
||||
|
|
@ -1346,7 +1367,7 @@ void delexCommand(client *c) {
|
|||
|
||||
if (deleted) {
|
||||
rewriteClientCommandVector(c, 2, shared.del, key);
|
||||
signalModifiedKey(c, c->db, key);
|
||||
keyModified(c, c->db, key, NULL, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -2059,8 +2080,8 @@ void renameGenericCommand(client *c, int nx) {
|
|||
if (minHashExpireTime != EB_EXPIRE_TIME_INVALID)
|
||||
estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime);
|
||||
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
signalModifiedKey(c,c->db,c->argv[2]);
|
||||
keyModified(c,c->db,c->argv[1],NULL,1);
|
||||
keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
|
||||
c->argv[1],c->db->id);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
|
||||
|
|
@ -2152,8 +2173,8 @@ void moveCommand(client *c) {
|
|||
if (hashExpireTime != EB_EXPIRE_TIME_INVALID)
|
||||
estoreAdd(dst->subexpires, slot, kv, hashExpireTime);
|
||||
|
||||
signalModifiedKey(c,src,c->argv[1]);
|
||||
signalModifiedKey(c,dst,c->argv[1]);
|
||||
keyModified(c,src,c->argv[1],NULL,1);
|
||||
keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,
|
||||
"move_from",c->argv[1],src->id);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,
|
||||
|
|
@ -2269,8 +2290,8 @@ void copyCommand(client *c) {
|
|||
if (minHashExpire != EB_EXPIRE_TIME_INVALID)
|
||||
estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire);
|
||||
|
||||
/* OK! key copied */
|
||||
signalModifiedKey(c,dst,c->argv[2]);
|
||||
/* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */
|
||||
keyModified(c,dst,c->argv[2],NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
|
||||
|
||||
/* `delete` implies the destination key was overwritten */
|
||||
|
|
@ -2588,7 +2609,7 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo
|
|||
* we are freeing removing the key, but we can't account for
|
||||
* that otherwise we would never exit the loop.
|
||||
*
|
||||
* Same for CSC invalidation messages generated by signalModifiedKey.
|
||||
* Same for CSC invalidation messages generated by keyModified.
|
||||
*
|
||||
* AOF and Output buffer memory will be freed eventually so
|
||||
* we only care about memory used by the key space.
|
||||
|
|
@ -2605,7 +2626,7 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo
|
|||
if (key_mem_freed) *key_mem_freed -= (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory();
|
||||
|
||||
notifyKeyspaceEvent(notify_type, notify_name,keyobj, db->id);
|
||||
signalModifiedKey(NULL, db, keyobj);
|
||||
keyModified(NULL, db, keyobj, NULL, 1);
|
||||
propagateDeletion(db, keyobj, lazy_flag);
|
||||
|
||||
if (notify_type == NOTIFY_EXPIRED)
|
||||
|
|
|
|||
|
|
@ -785,7 +785,7 @@ NULL
|
|||
memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen);
|
||||
}
|
||||
dbAdd(c->db, key, &val);
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c,c->db,key,NULL,1);
|
||||
decrRefCount(key);
|
||||
}
|
||||
addReply(c,shared.ok);
|
||||
|
|
|
|||
|
|
@ -149,7 +149,7 @@ int evictionPoolPopulate(redisDb *db, kvstore *samplekvs, struct evictionPoolEnt
|
|||
/* Calculate the idle time according to the policy. This is called
|
||||
* idle just because the code initially handled LRU, but is in fact
|
||||
* just a score where a higher score means better candidate. */
|
||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
|
||||
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LRM)) {
|
||||
idle = estimateObjectIdleTime(kv);
|
||||
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||
/* When we use an LRU policy, we sort the keys by idle time
|
||||
|
|
@ -572,7 +572,7 @@ int performEvictions(void) {
|
|||
redisDb *db;
|
||||
dictEntry *de;
|
||||
|
||||
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
|
||||
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_LRM) ||
|
||||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
|
||||
{
|
||||
struct evictionPoolEntry *pool = EvictionPoolLRU;
|
||||
|
|
|
|||
|
|
@ -814,7 +814,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
|
|||
/* Replicate/AOF this as an explicit DEL or UNLINK. */
|
||||
aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
|
||||
rewriteClientCommandVector(c,2,aux,key);
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c,c->db,key,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
|
||||
addReply(c, shared.cone);
|
||||
return;
|
||||
|
|
@ -834,7 +834,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
|
|||
decrRefCount(when_obj);
|
||||
}
|
||||
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c,c->db,key,kv,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
|
||||
server.dirty++;
|
||||
return;
|
||||
|
|
@ -908,9 +908,10 @@ void pexpiretimeCommand(client *c) {
|
|||
|
||||
/* PERSIST key */
|
||||
void persistCommand(client *c) {
|
||||
if (lookupKeyWrite(c->db,c->argv[1])) {
|
||||
kvobj *kv;
|
||||
if ((kv = lookupKeyWrite(c->db,c->argv[1]))) {
|
||||
if (removeExpire(c->db,c->argv[1])) {
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id);
|
||||
addReply(c,shared.cone);
|
||||
server.dirty++;
|
||||
|
|
|
|||
|
|
@ -699,7 +699,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
|
|||
if (storekey) {
|
||||
/* store key is not NULL, try to delete it and return 0. */
|
||||
if (dbDelete(c->db, storekey)) {
|
||||
signalModifiedKey(c, c->db, storekey);
|
||||
keyModified(c, c->db, storekey, NULL, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -833,7 +833,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
|
|||
c->db->id);
|
||||
server.dirty += returned_items;
|
||||
} else if (dbDelete(c->db,storekey)) {
|
||||
signalModifiedKey(c,c->db,storekey);
|
||||
keyModified(c,c->db,storekey,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1655,7 +1655,7 @@ void pfaddCommand(client *c) {
|
|||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, stringObjectAllocSize(kv));
|
||||
if (updated) {
|
||||
HLL_INVALIDATE_CACHE(hdr);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
|
||||
server.dirty += updated;
|
||||
}
|
||||
|
|
@ -1750,7 +1750,7 @@ void pfcountCommand(client *c) {
|
|||
/* This is considered a read-only command even if the cached value
|
||||
* may be modified and given that the HLL is a Redis string
|
||||
* we need to propagate the change. */
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],o,1);
|
||||
server.dirty++;
|
||||
}
|
||||
addReplyLongLong(c,card);
|
||||
|
|
@ -1836,7 +1836,7 @@ void pfmergeCommand(client *c) {
|
|||
|
||||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, stringObjectAllocSize(kv));
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
/* We generate a PFADD event for PFMERGE for semantical simplicity
|
||||
* since in theory this is a mass-add of elements. */
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
|
||||
|
|
|
|||
|
|
@ -396,7 +396,7 @@ typedef struct RedisModuleConfigIterator {
|
|||
#define REDISMODULE_ARGV_DRY_RUN (1<<10)
|
||||
#define REDISMODULE_ARGV_ALLOW_BLOCK (1<<11)
|
||||
|
||||
/* Determine whether Redis should signalModifiedKey implicitly.
|
||||
/* Determine whether Redis should signal modified key implicitly.
|
||||
* In case 'ctx' has no 'module' member (and therefore no module->options),
|
||||
* we assume default behavior, that is, Redis signals.
|
||||
* (see RM_GetThreadSafeContext) */
|
||||
|
|
@ -2536,7 +2536,8 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
|
|||
* RM_SetModuleOptions().
|
||||
*/
|
||||
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
|
||||
signalModifiedKey(ctx->client,ctx->client->db,keyname);
|
||||
kvobj *kv = lookupKeyReadWithFlags(ctx->client->db, keyname, LOOKUP_NOTOUCH);
|
||||
keyModified(ctx->client,ctx->client->db,keyname,kv,1);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
|
|
@ -4189,7 +4190,7 @@ int RM_GetOpenKeyModesAll(void) {
|
|||
static void moduleCloseKey(RedisModuleKey *key) {
|
||||
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
|
||||
if ((key->mode & REDISMODULE_WRITE) && signal)
|
||||
signalModifiedKey(key->ctx->client,key->db,key->key);
|
||||
keyModified(key->ctx->client,key->db,key->key,key->kv,1);
|
||||
if (key->kv) {
|
||||
if (key->iter) moduleFreeKeyIterator(key);
|
||||
switch (key->kv->type) {
|
||||
|
|
|
|||
|
|
@ -672,8 +672,9 @@ typedef enum {
|
|||
#define MAXMEMORY_FLAG_LRU (1<<0)
|
||||
#define MAXMEMORY_FLAG_LFU (1<<1)
|
||||
#define MAXMEMORY_FLAG_ALLKEYS (1<<2)
|
||||
#define MAXMEMORY_FLAG_LRM (1<<3)
|
||||
#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS \
|
||||
(MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU)
|
||||
(MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_LRM)
|
||||
|
||||
#define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU)
|
||||
#define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU)
|
||||
|
|
@ -683,6 +684,8 @@ typedef enum {
|
|||
#define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)
|
||||
#define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)
|
||||
#define MAXMEMORY_NO_EVICTION (7<<8)
|
||||
#define MAXMEMORY_VOLATILE_LRM ((8<<8)|MAXMEMORY_FLAG_LRM)
|
||||
#define MAXMEMORY_ALLKEYS_LRM ((9<<8)|MAXMEMORY_FLAG_LRM|MAXMEMORY_FLAG_ALLKEYS)
|
||||
|
||||
/* Units */
|
||||
#define UNIT_SECONDS 0
|
||||
|
|
@ -3870,7 +3873,7 @@ void discardTempDb(redisDb *tempDb);
|
|||
|
||||
|
||||
int selectDb(client *c, int id);
|
||||
void signalModifiedKey(client *c, redisDb *db, robj *key);
|
||||
void keyModified(client *c, redisDb *db, robj *key, robj *val, int signal);
|
||||
void signalFlushedDb(int dbid, int async, struct slotRangeArray *slots);
|
||||
void scanGenericCommand(client *c, robj *o, unsigned long long cursor);
|
||||
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor);
|
||||
|
|
|
|||
|
|
@ -632,7 +632,7 @@ void sortCommandGeneric(client *c, int readonly) {
|
|||
/* Ownership of sobj transferred to the db. No need to free it. */
|
||||
} else {
|
||||
if (dbDelete(c->db, storekey)) {
|
||||
signalModifiedKey(c, c->db, storekey);
|
||||
keyModified(c, c->db, storekey, NULL, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
|
|||
28
src/t_hash.c
28
src/t_hash.c
|
|
@ -781,10 +781,10 @@ GetFieldRes hashTypeGetValue(redisDb *db, kvobj *o, sds field, unsigned char **v
|
|||
if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION))
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id);
|
||||
dbDelete(db,keyObj);
|
||||
o = NULL;
|
||||
res = GETF_EXPIRED_HASH;
|
||||
}
|
||||
if (!(hfeFlags & HFE_LAZY_NO_SIGNAL))
|
||||
signalModifiedKey(NULL, db, keyObj);
|
||||
keyModified(NULL, db, keyObj, o, !(hfeFlags & HFE_LAZY_NO_SIGNAL));
|
||||
decrRefCount(keyObj);
|
||||
return res;
|
||||
}
|
||||
|
|
@ -1919,6 +1919,7 @@ uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int update
|
|||
robj *key = createStringObject(keystr, sdslen(keystr));
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", key, db->id);
|
||||
int slot;
|
||||
int deleted = 0;
|
||||
|
||||
if (updateSubexpires) {
|
||||
slot = getKeySlot(keystr);
|
||||
|
|
@ -1929,12 +1930,13 @@ uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int update
|
|||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id);
|
||||
dbDelete(db, key);
|
||||
noExpireLeftRes = 0;
|
||||
deleted = 1;
|
||||
} else {
|
||||
if ((updateSubexpires) && (info.nextExpireTime != EB_EXPIRE_TIME_INVALID))
|
||||
estoreAdd(db->subexpires, slot, o, info.nextExpireTime);
|
||||
}
|
||||
|
||||
signalModifiedKey(NULL, db, key);
|
||||
keyModified(NULL, db, key, deleted ? NULL : o, 1);
|
||||
decrRefCount(key);
|
||||
}
|
||||
|
||||
|
|
@ -2101,7 +2103,7 @@ void hsetnxCommand(client *c) {
|
|||
hashTypeTryConversion(c->db, kv, c->argv, 2, 3);
|
||||
hashTypeSet(c->db, kv, c->argv[2]->ptr, c->argv[3]->ptr, HASH_SET_COPY);
|
||||
addReply(c, shared.cone);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1], kv, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
|
||||
hlen = hashTypeLength(kv, 0);
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, hlen - 1, hlen);
|
||||
|
|
@ -2138,7 +2140,7 @@ void hsetCommand(client *c) {
|
|||
/* HMSET */
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
unsigned long l = hashTypeLength(kv, 0);
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l - created, l);
|
||||
if (server.memory_tracking_per_slot)
|
||||
|
|
@ -2467,7 +2469,7 @@ out:
|
|||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o));
|
||||
/* Emit keyspace notifications based on field expiry, mutation, or key deletion */
|
||||
if (fields_set || expired) {
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], o, 1);
|
||||
if (expired)
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id);
|
||||
if (fields_set) {
|
||||
|
|
@ -2536,7 +2538,7 @@ void hincrbyCommand(client *c) {
|
|||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o));
|
||||
addReplyLongLong(c,value);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1], o, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -2594,7 +2596,7 @@ void hincrbyfloatCommand(client *c) {
|
|||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o));
|
||||
addReplyBulkCBuffer(c,buf,len);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],o,1);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
|
||||
|
|
@ -2737,7 +2739,7 @@ void hgetdelCommand(client *c) {
|
|||
|
||||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o));
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], o, 1);
|
||||
|
||||
if (expired)
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id);
|
||||
|
|
@ -2845,7 +2847,7 @@ void hgetexCommand(client *c) {
|
|||
return;
|
||||
|
||||
server.dirty += deleted + updated;
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], o, 1);
|
||||
|
||||
/* This command will never be propagated as it is. It will be propagated as
|
||||
* HDELs when fields are lazily expired or deleted, if the new timestamp is
|
||||
|
|
@ -2948,7 +2950,7 @@ void hdelCommand(client *c) {
|
|||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o));
|
||||
if (deleted) {
|
||||
int64_t newLen = -1; /* The value -1 indicates that the key is deleted. */
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], keyremoved ? NULL : o, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id);
|
||||
if (keyremoved) {
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
|
||||
|
|
@ -3844,7 +3846,7 @@ static void hexpireGenericCommand(client *c, long long basetime, int unit) {
|
|||
|
||||
if (deleted + updated > 0) {
|
||||
server.dirty += deleted + updated;
|
||||
signalModifiedKey(c, c->db, keyArg);
|
||||
keyModified(c, c->db, keyArg, hashObj, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, deleted ? "hdel" : "hexpire",
|
||||
keyArg, c->db->id);
|
||||
}
|
||||
|
|
@ -4069,7 +4071,7 @@ void hpersistCommand(client *c) {
|
|||
* has been successfully deleted. */
|
||||
if (changed) {
|
||||
notifyKeyspaceEvent(NOTIFY_HASH, "hpersist", c->argv[1], c->db->id);
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], hashObj, 1);
|
||||
server.dirty++;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
17
src/t_list.c
17
src/t_list.c
|
|
@ -513,7 +513,7 @@ void pushGenericCommand(client *c, int where, int xx) {
|
|||
addReplyLongLong(c, llen);
|
||||
|
||||
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],lobj,1);
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen - (c->argc - 2), llen);
|
||||
if (server.memory_tracking_per_slot)
|
||||
|
|
@ -587,7 +587,7 @@ void linsertCommand(client *c) {
|
|||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, listTypeAllocSize(subject));
|
||||
|
||||
if (inserted) {
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],subject,1);
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
|
||||
c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
|
|
@ -659,7 +659,7 @@ void lsetCommand(client *c) {
|
|||
* above, so here we just need to try the conversion for shrinking. */
|
||||
listTypeTryConversion(o,LIST_CONV_SHRINKING,NULL,NULL);
|
||||
addReply(c,shared.ok);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],o,1);
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
} else {
|
||||
|
|
@ -778,7 +778,7 @@ void addListRangeReply(client *c, robj *o, long start, long end, int reverse) {
|
|||
|
||||
/* A housekeeping helper for list elements popping tasks.
|
||||
*
|
||||
* If 'signal' is 0, skip calling signalModifiedKey().
|
||||
* If 'signal' is 0, skip calling keyModified().
|
||||
*
|
||||
* 'deleted' is an optional output argument to get an indication
|
||||
* if the key got deleted by this function. */
|
||||
|
|
@ -801,7 +801,8 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, s
|
|||
updateSlotAllocSize(c->db, getKeySlot(key->ptr), oldsize, listTypeAllocSize(o));
|
||||
if (deleted) *deleted = 0;
|
||||
}
|
||||
if (signal) signalModifiedKey(c, c->db, key);
|
||||
if (signal)
|
||||
keyModified(c, c->db, key, llen ? o : NULL, 1);
|
||||
server.dirty += count;
|
||||
}
|
||||
|
||||
|
|
@ -978,7 +979,7 @@ void ltrimCommand(client *c) {
|
|||
listTypeTryConversion(o,LIST_CONV_SHRINKING,NULL,NULL);
|
||||
}
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen, llenNew);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], (llenNew > 0) ? o : NULL, 1);
|
||||
server.dirty += (ltrim + rtrim);
|
||||
addReply(c,shared.ok);
|
||||
}
|
||||
|
|
@ -1150,7 +1151,7 @@ void lremCommand(client *c) {
|
|||
} else {
|
||||
listTypeTryConversion(subject,LIST_CONV_SHRINKING,NULL,NULL);
|
||||
}
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], ll ? subject : NULL, 1);
|
||||
}
|
||||
|
||||
addReplyLongLong(c,removed);
|
||||
|
|
@ -1170,7 +1171,7 @@ void lmoveHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value,
|
|||
listTypePush(dstobj,value,where);
|
||||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db, getKeySlot(dstkey->ptr), oldsize, listTypeAllocSize(dstobj));
|
||||
signalModifiedKey(c,c->db,dstkey);
|
||||
keyModified(c,c->db,dstkey,dstobj,1);
|
||||
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,
|
||||
where == LIST_HEAD ? "lpush" : "rpush",
|
||||
|
|
|
|||
23
src/t_set.c
23
src/t_set.c
|
|
@ -638,7 +638,7 @@ void saddCommand(client *c) {
|
|||
if (added) {
|
||||
unsigned long size = setTypeSize(set);
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size - added, size);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],set,1);
|
||||
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
|
||||
}
|
||||
server.dirty += added;
|
||||
|
|
@ -674,7 +674,7 @@ void sremCommand(client *c) {
|
|||
if (deleted) {
|
||||
int64_t newSize = oldSize - deleted;
|
||||
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], keyremoved ? NULL : set, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
|
||||
if (keyremoved) {
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
|
||||
|
|
@ -741,7 +741,7 @@ void smoveCommand(client *c) {
|
|||
dbAdd(c->db, c->argv[2], &dstset);
|
||||
}
|
||||
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], (srcNewLen > 0) ? srcset : NULL, 1);
|
||||
server.dirty++;
|
||||
|
||||
if (server.memory_tracking_per_slot)
|
||||
|
|
@ -751,7 +751,7 @@ void smoveCommand(client *c) {
|
|||
unsigned long dstLen = setTypeSize(dstset);
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_SET, dstLen - 1, dstLen);
|
||||
server.dirty++;
|
||||
signalModifiedKey(c,c->db,c->argv[2]);
|
||||
keyModified(c,c->db,c->argv[2],dstset,1);
|
||||
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id);
|
||||
}
|
||||
if (server.memory_tracking_per_slot)
|
||||
|
|
@ -858,7 +858,7 @@ void spopWithCountCommand(client *c) {
|
|||
/* Propagate this command as a DEL or UNLINK operation */
|
||||
robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
|
||||
rewriteClientCommandVector(c, 2, aux, c->argv[1]);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],NULL,1);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -1020,6 +1020,7 @@ void spopWithCountCommand(client *c) {
|
|||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, setTypeAllocSize(set));
|
||||
dbReplaceValue(c->db, c->argv[1], &newset, 0);
|
||||
set = newset;
|
||||
}
|
||||
|
||||
/* Replicate/AOF the remaining elements as an SREM operation */
|
||||
|
|
@ -1037,7 +1038,7 @@ void spopWithCountCommand(client *c) {
|
|||
* we propagated the command as a set of SREMs operations using
|
||||
* the alsoPropagate() API. */
|
||||
preventCommandPropagation(c);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],set,1);
|
||||
}
|
||||
|
||||
void spopCommand(client *c) {
|
||||
|
|
@ -1080,13 +1081,15 @@ void spopCommand(client *c) {
|
|||
decrRefCount(ele);
|
||||
|
||||
/* Delete the kv if it's empty */
|
||||
int deleted = 0;
|
||||
if (setTypeSize(kv) == 0) {
|
||||
deleted = 1;
|
||||
dbDelete(c->db,c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
|
||||
}
|
||||
|
||||
/* Set has been modified */
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], deleted ? NULL : kv, 1);
|
||||
server.dirty++;
|
||||
}
|
||||
|
||||
|
|
@ -1408,7 +1411,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
|
|||
zfree(sets);
|
||||
if (dstkey) {
|
||||
if (dbDelete(c->db,dstkey)) {
|
||||
signalModifiedKey(c,c->db,dstkey);
|
||||
keyModified(c,c->db,dstkey,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -1534,7 +1537,7 @@ void sinterGenericCommand(client *c, robj **setkeys,
|
|||
addReply(c,shared.czero);
|
||||
if (dbDelete(c->db,dstkey)) {
|
||||
server.dirty++;
|
||||
signalModifiedKey(c,c->db,dstkey);
|
||||
keyModified(c,c->db,dstkey,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
|
||||
}
|
||||
decrRefCount(dstset);
|
||||
|
|
@ -1815,7 +1818,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
|
|||
addReply(c,shared.czero);
|
||||
if (dbDelete(c->db,dstkey)) {
|
||||
server.dirty++;
|
||||
signalModifiedKey(c,c->db,dstkey);
|
||||
keyModified(c,c->db,dstkey,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
|
||||
}
|
||||
decrRefCount(dstset);
|
||||
|
|
|
|||
|
|
@ -2442,7 +2442,7 @@ void xaddCommand(client *c) {
|
|||
if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
|
||||
updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size);
|
||||
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
|
||||
/* Let's rewrite the ID argument with the one actually generated for
|
||||
* AOF/replication propagation. */
|
||||
|
|
@ -2804,6 +2804,7 @@ void xreadCommand(client *c) {
|
|||
consumer->name);
|
||||
}
|
||||
consumer->seen_time = commandTimeSnapshot();
|
||||
keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */
|
||||
} else if (s->length) {
|
||||
/* For consumers without a group, we serve synchronously if we can
|
||||
* actually provide at least one item from the stream. */
|
||||
|
|
@ -2849,7 +2850,10 @@ void xreadCommand(client *c) {
|
|||
consumer, flags, &spi, &propCount);
|
||||
if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
|
||||
updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),old_alloc,s->alloc_size);
|
||||
if (propCount) server.dirty++;
|
||||
if (propCount) {
|
||||
server.dirty++;
|
||||
keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -3340,7 +3344,7 @@ NULL
|
|||
o = createStreamObject();
|
||||
dbAdd(c->db, c->argv[2], &o);
|
||||
s = o->ptr;
|
||||
signalModifiedKey(c,c->db,c->argv[2]);
|
||||
keyModified(c,c->db,c->argv[2],o,1);
|
||||
}
|
||||
|
||||
if (entries_read != SCG_INVALID_ENTRIES_READ && (uint64_t)entries_read > s->entries_added) {
|
||||
|
|
@ -3356,6 +3360,7 @@ NULL
|
|||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
|
||||
c->argv[2],c->db->id);
|
||||
keyModified(c,c->db,c->argv[2],o,0);
|
||||
} else {
|
||||
addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
|
||||
}
|
||||
|
|
@ -3376,6 +3381,7 @@ NULL
|
|||
addReply(c,shared.ok);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
|
||||
keyModified(c,c->db,c->argv[2],o,0);
|
||||
} else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
|
||||
if (cg) {
|
||||
old_alloc = s->alloc_size;
|
||||
|
|
@ -3387,6 +3393,7 @@ NULL
|
|||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
|
||||
c->argv[2],c->db->id);
|
||||
keyModified(c,c->db,c->argv[2],o,0);
|
||||
/* We want to unblock any XREADGROUP consumers with -NOGROUP. */
|
||||
signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM);
|
||||
} else {
|
||||
|
|
@ -3396,6 +3403,7 @@ NULL
|
|||
old_alloc = s->alloc_size;
|
||||
streamConsumer *created = streamCreateConsumer(s,cg,c->argv[4]->ptr,c->argv[2],
|
||||
c->db->id,SCC_DEFAULT);
|
||||
keyModified(c,c->db,c->argv[2],o,0);
|
||||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size);
|
||||
addReplyLongLong(c,created ? 1 : 0);
|
||||
|
|
@ -3413,6 +3421,7 @@ NULL
|
|||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
|
||||
c->argv[2],c->db->id);
|
||||
keyModified(c,c->db,c->argv[2],o,0);
|
||||
}
|
||||
addReplyLongLong(c,pending);
|
||||
} else {
|
||||
|
|
@ -3493,6 +3502,7 @@ void xsetidCommand(client *c) {
|
|||
addReply(c,shared.ok);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
|
||||
keyModified(c,c->db,c->argv[1],kv,0);
|
||||
}
|
||||
|
||||
/* XACK <key> <group> <id> <id> ... <id>
|
||||
|
|
@ -3549,6 +3559,7 @@ void xackCommand(client *c) {
|
|||
streamDestroyNACK(kv->ptr, nack, buf);
|
||||
acknowledged++;
|
||||
server.dirty++;
|
||||
keyModified(c,c->db,c->argv[1],kv,0);
|
||||
}
|
||||
}
|
||||
if (server.memory_tracking_per_slot && old_alloc != s->alloc_size)
|
||||
|
|
@ -3604,7 +3615,7 @@ void xackdelCommand(client *c) {
|
|||
s = kv->ptr;
|
||||
size_t old_alloc = s->alloc_size;
|
||||
int first_entry = 0;
|
||||
int deleted = 0;
|
||||
int deleted = 0, dirty = server.dirty;
|
||||
addReplyArrayLen(c, args.numids);
|
||||
for (int j = 0; j < args.numids; j++) {
|
||||
int res = XACKDEL_NO_ID;
|
||||
|
|
@ -3666,8 +3677,11 @@ void xackdelCommand(client *c) {
|
|||
}
|
||||
|
||||
/* Propagate the write. */
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
|
||||
} else if (server.dirty > dirty) {
|
||||
/* Only ACK succeeded without deleting elements, just update LRM without signaling */
|
||||
keyModified(c,c->db,c->argv[1],kv,0);
|
||||
}
|
||||
|
||||
cleanup:
|
||||
|
|
@ -4137,6 +4151,7 @@ void xclaimCommand(client *c) {
|
|||
}
|
||||
setDeferredArrayLen(c,arraylenptr,arraylen);
|
||||
preventCommandPropagation(c);
|
||||
keyModified(c,c->db,c->argv[1],o,0);
|
||||
cleanup:
|
||||
if (ids != static_ids) zfree(ids);
|
||||
}
|
||||
|
|
@ -4339,6 +4354,8 @@ void xautoclaimCommand(client *c) {
|
|||
zfree(deleted_ids);
|
||||
|
||||
preventCommandPropagation(c);
|
||||
/* Update LRM but don't signal. */
|
||||
keyModified(c,c->db,c->argv[1],o,0);
|
||||
}
|
||||
|
||||
/* XDEL <key> [<ID1> <ID2> ... <IDN>]
|
||||
|
|
@ -4398,7 +4415,7 @@ void xdelCommand(client *c) {
|
|||
|
||||
/* Propagate the write if needed. */
|
||||
if (deleted) {
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
|
||||
server.dirty += deleted;
|
||||
}
|
||||
|
|
@ -4502,7 +4519,7 @@ void xdelexCommand(client *c) {
|
|||
}
|
||||
|
||||
/* Propagate the write. */
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id);
|
||||
server.dirty += deleted;
|
||||
}
|
||||
|
|
@ -4570,7 +4587,7 @@ void xtrimCommand(client *c) {
|
|||
}
|
||||
|
||||
/* Propagate the write. */
|
||||
signalModifiedKey(c, c->db,c->argv[1]);
|
||||
keyModified(c, c->db,c->argv[1], kv, 1);
|
||||
server.dirty += deleted;
|
||||
}
|
||||
addReplyLongLong(c,deleted);
|
||||
|
|
|
|||
|
|
@ -499,7 +499,7 @@ void getexCommand(client *c) {
|
|||
serverAssert(deleted);
|
||||
robj *aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
|
||||
rewriteClientCommandVector(c,2,aux,c->argv[1]);
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], NULL, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
|
||||
server.dirty++;
|
||||
} else if (args.expire) {
|
||||
|
|
@ -509,12 +509,12 @@ void getexCommand(client *c) {
|
|||
robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds);
|
||||
rewriteClientCommandVector(c,3,shared.pexpireat,c->argv[1],milliseconds_obj);
|
||||
decrRefCount(milliseconds_obj);
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], o, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
} else if (args.flags & OBJ_PERSIST) {
|
||||
if (removeExpire(c->db, c->argv[1])) {
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], o, 1);
|
||||
rewriteClientCommandVector(c, 2, shared.persist, c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
|
|
@ -527,7 +527,7 @@ void getdelCommand(client *c) {
|
|||
if (dbSyncDelete(c->db, c->argv[1])) {
|
||||
/* Propagate as DEL command */
|
||||
rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
|
||||
signalModifiedKey(c, c->db, c->argv[1]);
|
||||
keyModified(c, c->db, c->argv[1], NULL, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -606,7 +606,7 @@ void setrangeCommand(client *c) {
|
|||
if (server.memory_tracking_per_slot)
|
||||
updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, stringObjectAllocSize(kv));
|
||||
memcpy((char*)kv->ptr+offset,value,value_len);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],kv,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,
|
||||
"setrange",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
|
|
@ -827,7 +827,7 @@ void incrDecrCommand(client *c, long long incr) {
|
|||
}
|
||||
}
|
||||
addReplyLongLongFromStr(c,new);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],new,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -879,7 +879,7 @@ void incrbyfloatCommand(client *c) {
|
|||
dbReplaceValueWithLink(c->db, c->argv[1], &new, link);
|
||||
else
|
||||
dbAddByLink(c->db, c->argv[1], &new, &link);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],new,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
addReplyBulk(c,new);
|
||||
|
|
@ -903,7 +903,7 @@ void appendCommand(client *c) {
|
|||
if (o == NULL) {
|
||||
/* Create the key */
|
||||
c->argv[2] = tryObjectEncoding(c->argv[2]);
|
||||
dbAddByLink(c->db, c->argv[1], &c->argv[2], &link);
|
||||
o = dbAddByLink(c->db, c->argv[1], &c->argv[2], &link);
|
||||
incrRefCount(c->argv[2]);
|
||||
totlen = stringObjectLen(c->argv[2]);
|
||||
} else {
|
||||
|
|
@ -928,7 +928,7 @@ void appendCommand(client *c) {
|
|||
int64_t oldlen = totlen - append_len;
|
||||
updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldlen, totlen);
|
||||
}
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
keyModified(c,c->db,c->argv[1],o,1);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
|
||||
|
|
|
|||
12
src/t_zset.c
12
src/t_zset.c
|
|
@ -1923,7 +1923,7 @@ reply_to_client:
|
|||
cleanup:
|
||||
zfree(scores);
|
||||
if (added || updated) {
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c,c->db,key,zobj,1);
|
||||
notifyKeyspaceEvent(NOTIFY_ZSET,
|
||||
incr ? "zincr" : "zadd", key, c->db->id);
|
||||
}
|
||||
|
|
@ -1971,7 +1971,7 @@ void zremCommand(client *c) {
|
|||
}
|
||||
|
||||
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen);
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c, c->db, key, keyremoved ? NULL : zobj, 1);
|
||||
server.dirty += deleted;
|
||||
}
|
||||
addReplyLongLong(c,deleted);
|
||||
|
|
@ -2092,7 +2092,7 @@ void zremrangeGenericCommand(client *c, zrange_type rangetype) {
|
|||
updateSlotAllocSize(c->db, getKeySlot(key->ptr), oldsize, zsetAllocSize(zobj));
|
||||
if (deleted) {
|
||||
int64_t oldlen, newlen;
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c,c->db,key,NULL,1);
|
||||
notifyKeyspaceEvent(NOTIFY_ZSET,notify_type,key,c->db->id);
|
||||
if (keyremoved) {
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
|
||||
|
|
@ -2969,7 +2969,7 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in
|
|||
} else {
|
||||
addReply(c, shared.czero);
|
||||
if (dbDelete(c->db, dstkey)) {
|
||||
signalModifiedKey(c, c->db, dstkey);
|
||||
keyModified(c, c->db, dstkey, NULL, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", dstkey, c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -3178,7 +3178,7 @@ static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t res
|
|||
} else {
|
||||
addReply(handler->client, shared.czero);
|
||||
if (dbDelete(handler->client->db, handler->dstkey)) {
|
||||
signalModifiedKey(handler->client, handler->client->db, handler->dstkey);
|
||||
keyModified(handler->client, handler->client->db, handler->dstkey, NULL, 1);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", handler->dstkey, handler->client->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
|
@ -4174,7 +4174,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
|||
newlen = -1;
|
||||
}
|
||||
updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen);
|
||||
signalModifiedKey(c,c->db,key);
|
||||
keyModified(c, c->db, key, (newlen > 0) ? zobj : NULL, 1);
|
||||
|
||||
if (c->cmd->proc == zmpopCommand) {
|
||||
/* Always replicate it as ZPOP[MIN|MAX] with COUNT option instead of ZMPOP. */
|
||||
|
|
|
|||
|
|
@ -352,7 +352,7 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
|
|||
raxStop(&ri);
|
||||
}
|
||||
|
||||
/* This function is called from signalModifiedKey() or other places in Redis
|
||||
/* This function is called from keyModified() or other places in Redis
|
||||
* when a key changes value. In the context of keys tracking, our task here is
|
||||
* to send a notification to every client that may have keys about such caching
|
||||
* slot.
|
||||
|
|
|
|||
|
|
@ -551,6 +551,16 @@ int only_reply_ok(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Test command for RM_SignalModifiedKey. */
|
||||
int test_signalmodifiedkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) return RedisModule_WrongArity(ctx);
|
||||
|
||||
/* Manually signal that the key was modified */
|
||||
RedisModule_SignalModifiedKey(ctx, argv[1]);
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
|
@ -625,6 +635,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||
RedisModuleCommand *parent = RedisModule_GetCommand(ctx, "test.no_cluster_cmd");
|
||||
if (RedisModule_CreateSubcommand(parent, "set", only_reply_ok, "no-cluster", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
if (RedisModule_CreateCommand(ctx, "test.signalmodifiedkey", test_signalmodifiedkey, "write", 1, 1, 1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ start_server {tags {"maxmemory" "external:skip"}} {
|
|||
start_server {tags {"maxmemory external:skip"}} {
|
||||
|
||||
foreach policy {
|
||||
allkeys-random allkeys-lru allkeys-lfu volatile-lru volatile-lfu volatile-random volatile-ttl
|
||||
allkeys-random allkeys-lru allkeys-lfu allkeys-lrm volatile-lru volatile-lfu volatile-random volatile-ttl volatile-lrm
|
||||
} {
|
||||
test "maxmemory - is the memory limit honoured? (policy $policy)" {
|
||||
# make sure to start with a blank instance
|
||||
|
|
@ -195,7 +195,7 @@ start_server {tags {"maxmemory external:skip"}} {
|
|||
}
|
||||
|
||||
foreach policy {
|
||||
allkeys-random allkeys-lru volatile-lru volatile-random volatile-ttl
|
||||
allkeys-random allkeys-lru allkeys-lrm volatile-lru volatile-random volatile-ttl volatile-lrm
|
||||
} {
|
||||
test "maxmemory - only allkeys-* should remove non-volatile keys ($policy)" {
|
||||
# make sure to start with a blank instance
|
||||
|
|
@ -237,7 +237,7 @@ start_server {tags {"maxmemory external:skip"}} {
|
|||
}
|
||||
|
||||
foreach policy {
|
||||
volatile-lru volatile-lfu volatile-random volatile-ttl
|
||||
volatile-lru volatile-lfu volatile-random volatile-ttl volatile-lrm
|
||||
} {
|
||||
test "maxmemory - policy $policy should only remove volatile keys." {
|
||||
# make sure to start with a blank instance
|
||||
|
|
@ -602,3 +602,92 @@ start_server {tags {"maxmemory" "external:skip"}} {
|
|||
assert {[r object freq foo] == 5}
|
||||
}
|
||||
}
|
||||
|
||||
# LRM eviction policy tests
|
||||
start_server {tags {"maxmemory" "external:skip"}} {
|
||||
test {LRM: Basic write updates idle time} {
|
||||
r flushdb
|
||||
r config set maxmemory-policy allkeys-lrm
|
||||
|
||||
r set foo a
|
||||
after 2000
|
||||
|
||||
# Read the key should NOT update LRM
|
||||
r get foo
|
||||
assert_morethan_equal [r object idletime foo] 1
|
||||
|
||||
# LRM should be updated (idletime should be smaller)
|
||||
r set foo b
|
||||
assert_lessthan_equal [r object idletime foo] 1
|
||||
} {} {slow}
|
||||
|
||||
test {LRM: RENAME updates destination key LRM} {
|
||||
r flushdb
|
||||
r set src value
|
||||
after 2000
|
||||
r rename src dst
|
||||
assert_lessthan [r object idletime dst] 1
|
||||
} {} {slow}
|
||||
|
||||
test {LRM: XREADGROUP updates stream LRM} {
|
||||
r flushdb
|
||||
r xadd mystream * field value
|
||||
r xgroup create mystream mygroup 0
|
||||
after 2000
|
||||
r xreadgroup GROUP mygroup consumer1 STREAMS mystream >
|
||||
assert_lessthan [r object idletime mystream] 1
|
||||
} {} {slow}
|
||||
|
||||
test {LRM: Keys with only read operations should be removed first} {
|
||||
r flushdb
|
||||
r config set maxmemory 0
|
||||
r config set maxmemory-policy allkeys-lrm
|
||||
r config set maxmemory-samples 64 ;# Ensure eviction sampling can pick all keys
|
||||
|
||||
# Create keys and populate them
|
||||
# We'll create two groups of keys:
|
||||
# - read-only keys: will only be read after creation
|
||||
# - write keys: will be continuously written to
|
||||
for {set j 0} {$j < 25} {incr j} {
|
||||
r set "read:$j" [string repeat x 20000]
|
||||
r set "write:$j" [string repeat x 20000]
|
||||
}
|
||||
|
||||
after 1000
|
||||
|
||||
# Perform read and write operations on keys
|
||||
for {set j 0} {$j < 25} {incr j} {
|
||||
r get "read:$j"
|
||||
r set "write:$j" [string repeat y 20000]
|
||||
}
|
||||
|
||||
# Set memory limit to force eviction
|
||||
set used [s used_memory]
|
||||
set limit [expr {$used - 200*1024}]
|
||||
r config set maxmemory $limit
|
||||
|
||||
# Add more keys to trigger eviction
|
||||
for {set j 0} {$j < 10} {incr j} {
|
||||
r set "trigger:$j" [string repeat z 20000]
|
||||
}
|
||||
|
||||
# Count how many keys from each group survived
|
||||
set read_survived 0
|
||||
set write_survived 0
|
||||
for {set j 0} {$j < 25} {incr j} {
|
||||
if {[r exists "read:$j"]} {
|
||||
incr read_survived
|
||||
}
|
||||
if {[r exists "write:$j"]} {
|
||||
incr write_survived
|
||||
}
|
||||
}
|
||||
|
||||
# If read-only keys haven't been fully evicted, write keys must not be evicted at all. */
|
||||
if {$read_survived > 0} {
|
||||
assert {$write_survived == 25}
|
||||
} else {
|
||||
assert {$write_survived > $read_survived}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -148,6 +148,36 @@ start_server {overrides {save {900 1}} tags {"modules external:skip"}} {
|
|||
$rd_trk close
|
||||
}
|
||||
|
||||
test {RM_SignalModifiedKey - tracking invalidation} {
|
||||
set rd_trk [redis_client]
|
||||
$rd_trk HELLO 3
|
||||
$rd_trk CLIENT TRACKING on
|
||||
r SET mykey{t} abc
|
||||
|
||||
# Track the key by reading it
|
||||
$rd_trk GET mykey{t}
|
||||
|
||||
# # Modify the key using module command that calls RM_SignalModifiedKey
|
||||
r test.signalmodifiedkey mykey{t}
|
||||
|
||||
# # Should receive invalidation message
|
||||
assert_equal {invalidate mykey{t}} [$rd_trk read]
|
||||
assert_equal "PONG" [$rd_trk ping]
|
||||
$rd_trk close
|
||||
}
|
||||
|
||||
test {RM_SignalModifiedKey - update LRM timestamp} {
|
||||
set old_policy [config_get_set maxmemory-policy allkeys-lrm]
|
||||
r SET mykey{t} abc
|
||||
after 2000
|
||||
assert_morethan_equal [r object idletime mykey{t}] 1
|
||||
|
||||
# LRM should be updated.
|
||||
r test.signalmodifiedkey mykey{t}
|
||||
assert_lessthan [r object idletime mykey{t}] 2
|
||||
r config set maxmemory-policy $old_policy
|
||||
} {OK} {slow}
|
||||
|
||||
test {publish to self inside rm_call} {
|
||||
r hello 3
|
||||
r subscribe foo
|
||||
|
|
|
|||
Loading…
Reference in a new issue