diff --git a/redis.conf b/redis.conf index 22dea17ad..9859a7972 100644 --- a/redis.conf +++ b/redis.conf @@ -1177,6 +1177,8 @@ acllog-max-len 128 # allkeys-lru -> Evict any key using approximated LRU. # volatile-lfu -> Evict using approximated LFU, only keys with an expire set. # allkeys-lfu -> Evict any key using approximated LFU. +# volatile-lrm -> Evict using approximated LRM, only keys with an expire set. +# allkeys-lrm -> Evict any key using approximated LRM. # volatile-random -> Remove a random key having an expire set. # allkeys-random -> Remove a random key, any key. # volatile-ttl -> Remove the key with the nearest expire time (minor TTL) @@ -1184,10 +1186,17 @@ acllog-max-len 128 # # LRU means Least Recently Used # LFU means Least Frequently Used +# LRM means Least Recently Modified (only write operations update the timestamp) # -# Both LRU, LFU and volatile-ttl are implemented using approximated +# LRU, LFU, LRM and volatile-ttl are implemented using approximated # randomized algorithms. # +# LRU vs LRM: Both use similar eviction logic based on access time, but: +# - LRU updates the timestamp on both read (GET) and write (SET) operations +# - LRM only updates the timestamp on write (SET, INCR, etc.) operations +# This makes LRM useful when you want to evict keys that haven't been updated +# recently, regardless of how often they are read. +# # Note: with any of the above policies, when there are no suitable keys for # eviction, Redis will return an error on write operations that require # more memory. These are usually commands that create new keys, add data or diff --git a/src/bitops.c b/src/bitops.c index abb59bb9d..7a3d9f934 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -875,7 +875,7 @@ void setbitCommand(client *c) { byteval &= ~(1 << bit); byteval |= ((on & 0x1) << bit); ((uint8_t*)o->ptr)[byte] = byteval; - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); server.dirty++; @@ -1447,7 +1447,7 @@ void bitopCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STRING,"set",targetkey,c->db->id); server.dirty++; } else if (dbDelete(c->db,targetkey)) { - signalModifiedKey(c,c->db,targetkey); + keyModified(c,c->db,targetkey,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",targetkey,c->db->id); server.dirty++; } @@ -1951,7 +1951,7 @@ void bitfieldGeneric(client *c, int flags) { updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, strOldSize, strOldSize + strGrowSize); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id); server.dirty += changes; } diff --git a/src/cluster.c b/src/cluster.c index c2cb0a6df..895a40b23 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -257,7 +257,7 @@ void restoreCommand(client *c) { if (deleted) { robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del; rewriteClientCommandVector(c, 2, aux, key); - signalModifiedKey(c,c->db,key); + keyModified(c,c->db,key,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); server.dirty++; } @@ -287,7 +287,7 @@ void restoreCommand(client *c) { } } objectSetLRUOrLFU(kv, lfu_freq, lru_idle, lru_clock, 1000); - signalModifiedKey(c,c->db,key); + keyModified(c,c->db,key,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); /* If we deleted a key that means REPLACE parameter was passed and the @@ -660,7 +660,7 @@ void migrateCommand(client *c) { if (!copy) { /* No COPY option: remove the local key, signal the change. */ dbDelete(c->db,keyArray[j]); - signalModifiedKey(c,c->db,keyArray[j]); + keyModified(c,c->db,keyArray[j],NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyArray[j],c->db->id); server.dirty++; @@ -1681,7 +1681,7 @@ unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) { robj *key = createStringObject(sdskey, sdslen(sdskey)); dbDelete(&server.db[0], key); - signalModifiedKey(NULL, &server.db[0], key); + keyModified(NULL, &server.db[0], key, NULL, 1); if (by_command) { /* Keys are deleted by a command (trimslots), we need to notify the * keyspace event. Though, we don't need to propagate the DEL diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 0638890df..d29969701 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -3416,7 +3416,7 @@ void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); dbDelete(db, keyobj); - signalModifiedKey(NULL, db, keyobj); + keyModified(NULL, db, keyobj, NULL, 1); /* The keys are not actually logically deleted from the database, just moved * to another node. The modules need to know that these keys are no longer * available locally, so just send the keyspace notification to the modules, diff --git a/src/config.c b/src/config.c index 3f1e57a9a..26b77aabc 100644 --- a/src/config.c +++ b/src/config.c @@ -40,9 +40,11 @@ configEnum maxmemory_policy_enum[] = { {"volatile-lfu", MAXMEMORY_VOLATILE_LFU}, {"volatile-random",MAXMEMORY_VOLATILE_RANDOM}, {"volatile-ttl",MAXMEMORY_VOLATILE_TTL}, + {"volatile-lrm",MAXMEMORY_VOLATILE_LRM}, {"allkeys-lru",MAXMEMORY_ALLKEYS_LRU}, {"allkeys-lfu",MAXMEMORY_ALLKEYS_LFU}, {"allkeys-random",MAXMEMORY_ALLKEYS_RANDOM}, + {"allkeys-lrm",MAXMEMORY_ALLKEYS_LRM}, {"noeviction",MAXMEMORY_NO_EVICTION}, {NULL, 0} }; diff --git a/src/db.c b/src/db.c index 154439987..9d6203659 100644 --- a/src/db.c +++ b/src/db.c @@ -56,6 +56,15 @@ void updateLFU(robj *val) { val->lru = (LFUGetTimeInMinutes()<<8) | counter; } +/* Update LRM when an object is modified. */ +void updateLRM(robj *o) { + if (o->refcount == OBJ_SHARED_REFCOUNT) + return; + if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) { + o->lru = LRU_CLOCK(); + } +} + /* * Update histogram of keys-sizes * @@ -278,7 +287,8 @@ kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) { if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); - } else { + } else if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LRM)) { + /* LRM policy should NOT update timestamp on reads. */ val->lru = LRU_CLOCK(); } } @@ -682,8 +692,8 @@ void setKeyByLink(client *c, redisDb *db, robj *key, robj **valref, int flags, d dbAddByLink(db, key, valref, link); } - if (!(flags & SETKEY_NO_SIGNAL)) - signalModifiedKey(c,db,key); + /* Signal key modification and update LRM timestamp. */ + keyModified(c,db,key,*valref,!(flags & SETKEY_NO_SIGNAL)); } /* During atomic slot migration, keys that are being imported are in an @@ -1016,16 +1026,27 @@ long long dbTotalServerKeyCount(void) { * Hooks for key space changes. * * Every time a key in the database is modified the function - * signalModifiedKey() is called. + * keyModified() is called. * * Every time a DB is flushed the function signalFlushDb() is called. *----------------------------------------------------------------------------*/ -/* Note that the 'c' argument may be NULL if the key was modified out of - * a context of a client. */ -void signalModifiedKey(client *c, redisDb *db, robj *key) { - touchWatchedKey(db,key); - trackingInvalidateKey(c,key,1); +/* Called when a key is modified to update LRM timestamp + * and optionally signal watchers/tracking clients. + * + * Arguments: + * - c: client (may be NULL if the key was modified out of a context of a client) + * - db: database containing the key + * - key: the key that was modified + * - val: the value object (if NULL, LRM won't be updated, e.g., for deleted keys) + * - signal: if true, trigger WATCH and client-side tracking invalidation + */ +void keyModified(client *c, redisDb *db, robj *key, robj *val, int signal) { + if (val) updateLRM(val); + if (signal) { + touchWatchedKey(db,key); + trackingInvalidateKey(c,key,1); + } } void signalFlushedDb(int dbid, int async, slotRangeArray *slots) { @@ -1246,7 +1267,7 @@ void delGenericCommand(client *c, int lazy) { int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); if (deleted) { - signalModifiedKey(c,c->db,c->argv[j]); + keyModified(c,c->db,c->argv[j],NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del",c->argv[j],c->db->id); server.dirty++; @@ -1346,7 +1367,7 @@ void delexCommand(client *c) { if (deleted) { rewriteClientCommandVector(c, 2, shared.del, key); - signalModifiedKey(c, c->db, key); + keyModified(c, c->db, key, NULL, 1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); server.dirty++; } @@ -2059,8 +2080,8 @@ void renameGenericCommand(client *c, int nx) { if (minHashExpireTime != EB_EXPIRE_TIME_INVALID) estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime); - signalModifiedKey(c,c->db,c->argv[1]); - signalModifiedKey(c,c->db,c->argv[2]); + keyModified(c,c->db,c->argv[1],NULL,1); + keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */ notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", c->argv[1],c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to", @@ -2152,8 +2173,8 @@ void moveCommand(client *c) { if (hashExpireTime != EB_EXPIRE_TIME_INVALID) estoreAdd(dst->subexpires, slot, kv, hashExpireTime); - signalModifiedKey(c,src,c->argv[1]); - signalModifiedKey(c,dst,c->argv[1]); + keyModified(c,src,c->argv[1],NULL,1); + keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */ notifyKeyspaceEvent(NOTIFY_GENERIC, "move_from",c->argv[1],src->id); notifyKeyspaceEvent(NOTIFY_GENERIC, @@ -2269,8 +2290,8 @@ void copyCommand(client *c) { if (minHashExpire != EB_EXPIRE_TIME_INVALID) estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire); - /* OK! key copied */ - signalModifiedKey(c,dst,c->argv[2]); + /* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */ + keyModified(c,dst,c->argv[2],NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id); /* `delete` implies the destination key was overwritten */ @@ -2588,7 +2609,7 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo * we are freeing removing the key, but we can't account for * that otherwise we would never exit the loop. * - * Same for CSC invalidation messages generated by signalModifiedKey. + * Same for CSC invalidation messages generated by keyModified. * * AOF and Output buffer memory will be freed eventually so * we only care about memory used by the key space. @@ -2605,7 +2626,7 @@ static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, lo if (key_mem_freed) *key_mem_freed -= (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory(); notifyKeyspaceEvent(notify_type, notify_name,keyobj, db->id); - signalModifiedKey(NULL, db, keyobj); + keyModified(NULL, db, keyobj, NULL, 1); propagateDeletion(db, keyobj, lazy_flag); if (notify_type == NOTIFY_EXPIRED) diff --git a/src/debug.c b/src/debug.c index cc4a7e45c..523180064 100644 --- a/src/debug.c +++ b/src/debug.c @@ -785,7 +785,7 @@ NULL memcpy(val->ptr, buf, valsize<=buflen? valsize: buflen); } dbAdd(c->db, key, &val); - signalModifiedKey(c,c->db,key); + keyModified(c,c->db,key,NULL,1); decrRefCount(key); } addReply(c,shared.ok); diff --git a/src/evict.c b/src/evict.c index 5509bdf03..e287edec6 100644 --- a/src/evict.c +++ b/src/evict.c @@ -149,7 +149,7 @@ int evictionPoolPopulate(redisDb *db, kvstore *samplekvs, struct evictionPoolEnt /* Calculate the idle time according to the policy. This is called * idle just because the code initially handled LRU, but is in fact * just a score where a higher score means better candidate. */ - if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) { + if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LRM)) { idle = estimateObjectIdleTime(kv); } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { /* When we use an LRU policy, we sort the keys by idle time @@ -572,7 +572,7 @@ int performEvictions(void) { redisDb *db; dictEntry *de; - if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || + if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_LRM) || server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { struct evictionPoolEntry *pool = EvictionPoolLRU; diff --git a/src/expire.c b/src/expire.c index 4e69f0813..af0a103ec 100644 --- a/src/expire.c +++ b/src/expire.c @@ -814,7 +814,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { /* Replicate/AOF this as an explicit DEL or UNLINK. */ aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; rewriteClientCommandVector(c,2,aux,key); - signalModifiedKey(c,c->db,key); + keyModified(c,c->db,key,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); addReply(c, shared.cone); return; @@ -834,7 +834,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { decrRefCount(when_obj); } - signalModifiedKey(c,c->db,key); + keyModified(c,c->db,key,kv,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); server.dirty++; return; @@ -908,9 +908,10 @@ void pexpiretimeCommand(client *c) { /* PERSIST key */ void persistCommand(client *c) { - if (lookupKeyWrite(c->db,c->argv[1])) { + kvobj *kv; + if ((kv = lookupKeyWrite(c->db,c->argv[1]))) { if (removeExpire(c->db,c->argv[1])) { - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id); addReply(c,shared.cone); server.dirty++; diff --git a/src/geo.c b/src/geo.c index 1f9e5d20f..7166f3b93 100644 --- a/src/geo.c +++ b/src/geo.c @@ -699,7 +699,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) { if (storekey) { /* store key is not NULL, try to delete it and return 0. */ if (dbDelete(c->db, storekey)) { - signalModifiedKey(c, c->db, storekey); + keyModified(c, c->db, storekey, NULL, 1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id); server.dirty++; } @@ -833,7 +833,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) { c->db->id); server.dirty += returned_items; } else if (dbDelete(c->db,storekey)) { - signalModifiedKey(c,c->db,storekey); + keyModified(c,c->db,storekey,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id); server.dirty++; } diff --git a/src/hyperloglog.c b/src/hyperloglog.c index 0e0ea1d03..05a515295 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -1655,7 +1655,7 @@ void pfaddCommand(client *c) { updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, stringObjectAllocSize(kv)); if (updated) { HLL_INVALIDATE_CACHE(hdr); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id); server.dirty += updated; } @@ -1750,7 +1750,7 @@ void pfcountCommand(client *c) { /* This is considered a read-only command even if the cached value * may be modified and given that the HLL is a Redis string * we need to propagate the change. */ - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],o,1); server.dirty++; } addReplyLongLong(c,card); @@ -1836,7 +1836,7 @@ void pfmergeCommand(client *c) { if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, stringObjectAllocSize(kv)); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); /* We generate a PFADD event for PFMERGE for semantical simplicity * since in theory this is a mass-add of elements. */ notifyKeyspaceEvent(NOTIFY_STRING,"pfadd",c->argv[1],c->db->id); diff --git a/src/module.c b/src/module.c index 8284648f6..9b8fdea42 100644 --- a/src/module.c +++ b/src/module.c @@ -396,7 +396,7 @@ typedef struct RedisModuleConfigIterator { #define REDISMODULE_ARGV_DRY_RUN (1<<10) #define REDISMODULE_ARGV_ALLOW_BLOCK (1<<11) -/* Determine whether Redis should signalModifiedKey implicitly. +/* Determine whether Redis should signal modified key implicitly. * In case 'ctx' has no 'module' member (and therefore no module->options), * we assume default behavior, that is, Redis signals. * (see RM_GetThreadSafeContext) */ @@ -2536,7 +2536,8 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { * RM_SetModuleOptions(). */ int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) { - signalModifiedKey(ctx->client,ctx->client->db,keyname); + kvobj *kv = lookupKeyReadWithFlags(ctx->client->db, keyname, LOOKUP_NOTOUCH); + keyModified(ctx->client,ctx->client->db,keyname,kv,1); return REDISMODULE_OK; } @@ -4189,7 +4190,7 @@ int RM_GetOpenKeyModesAll(void) { static void moduleCloseKey(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) - signalModifiedKey(key->ctx->client,key->db,key->key); + keyModified(key->ctx->client,key->db,key->key,key->kv,1); if (key->kv) { if (key->iter) moduleFreeKeyIterator(key); switch (key->kv->type) { diff --git a/src/server.h b/src/server.h index 21d4786b9..b891f7947 100644 --- a/src/server.h +++ b/src/server.h @@ -672,8 +672,9 @@ typedef enum { #define MAXMEMORY_FLAG_LRU (1<<0) #define MAXMEMORY_FLAG_LFU (1<<1) #define MAXMEMORY_FLAG_ALLKEYS (1<<2) +#define MAXMEMORY_FLAG_LRM (1<<3) #define MAXMEMORY_FLAG_NO_SHARED_INTEGERS \ - (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) + (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_LRM) #define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU) #define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU) @@ -683,6 +684,8 @@ typedef enum { #define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS) #define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS) #define MAXMEMORY_NO_EVICTION (7<<8) +#define MAXMEMORY_VOLATILE_LRM ((8<<8)|MAXMEMORY_FLAG_LRM) +#define MAXMEMORY_ALLKEYS_LRM ((9<<8)|MAXMEMORY_FLAG_LRM|MAXMEMORY_FLAG_ALLKEYS) /* Units */ #define UNIT_SECONDS 0 @@ -3870,7 +3873,7 @@ void discardTempDb(redisDb *tempDb); int selectDb(client *c, int id); -void signalModifiedKey(client *c, redisDb *db, robj *key); +void keyModified(client *c, redisDb *db, robj *key, robj *val, int signal); void signalFlushedDb(int dbid, int async, struct slotRangeArray *slots); void scanGenericCommand(client *c, robj *o, unsigned long long cursor); int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor); diff --git a/src/sort.c b/src/sort.c index 774dcd783..d397b5ed5 100644 --- a/src/sort.c +++ b/src/sort.c @@ -632,7 +632,7 @@ void sortCommandGeneric(client *c, int readonly) { /* Ownership of sobj transferred to the db. No need to free it. */ } else { if (dbDelete(c->db, storekey)) { - signalModifiedKey(c, c->db, storekey); + keyModified(c, c->db, storekey, NULL, 1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id); server.dirty++; } diff --git a/src/t_hash.c b/src/t_hash.c index d4b068e14..c868ec8a8 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -781,10 +781,10 @@ GetFieldRes hashTypeGetValue(redisDb *db, kvobj *o, sds field, unsigned char **v if (!(hfeFlags & HFE_LAZY_NO_NOTIFICATION)) notifyKeyspaceEvent(NOTIFY_GENERIC, "del", keyObj, db->id); dbDelete(db,keyObj); + o = NULL; res = GETF_EXPIRED_HASH; } - if (!(hfeFlags & HFE_LAZY_NO_SIGNAL)) - signalModifiedKey(NULL, db, keyObj); + keyModified(NULL, db, keyObj, o, !(hfeFlags & HFE_LAZY_NO_SIGNAL)); decrRefCount(keyObj); return res; } @@ -1919,6 +1919,7 @@ uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int update robj *key = createStringObject(keystr, sdslen(keystr)); notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", key, db->id); int slot; + int deleted = 0; if (updateSubexpires) { slot = getKeySlot(keystr); @@ -1929,12 +1930,13 @@ uint64_t hashTypeActiveExpire(redisDb *db, kvobj *o, uint32_t *quota, int update notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, db->id); dbDelete(db, key); noExpireLeftRes = 0; + deleted = 1; } else { if ((updateSubexpires) && (info.nextExpireTime != EB_EXPIRE_TIME_INVALID)) estoreAdd(db->subexpires, slot, o, info.nextExpireTime); } - signalModifiedKey(NULL, db, key); + keyModified(NULL, db, key, deleted ? NULL : o, 1); decrRefCount(key); } @@ -2101,7 +2103,7 @@ void hsetnxCommand(client *c) { hashTypeTryConversion(c->db, kv, c->argv, 2, 3); hashTypeSet(c->db, kv, c->argv[2]->ptr, c->argv[3]->ptr, HASH_SET_COPY); addReply(c, shared.cone); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1], kv, 1); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); hlen = hashTypeLength(kv, 0); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, hlen - 1, hlen); @@ -2138,7 +2140,7 @@ void hsetCommand(client *c) { /* HMSET */ addReply(c, shared.ok); } - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); unsigned long l = hashTypeLength(kv, 0); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_HASH, l - created, l); if (server.memory_tracking_per_slot) @@ -2467,7 +2469,7 @@ out: updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); /* Emit keyspace notifications based on field expiry, mutation, or key deletion */ if (fields_set || expired) { - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], o, 1); if (expired) notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); if (fields_set) { @@ -2536,7 +2538,7 @@ void hincrbyCommand(client *c) { if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); addReplyLongLong(c,value); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1], o, 1); notifyKeyspaceEvent(NOTIFY_HASH,"hincrby",c->argv[1],c->db->id); server.dirty++; } @@ -2594,7 +2596,7 @@ void hincrbyfloatCommand(client *c) { if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); addReplyBulkCBuffer(c,buf,len); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_HASH,"hincrbyfloat",c->argv[1],c->db->id); server.dirty++; @@ -2737,7 +2739,7 @@ void hgetdelCommand(client *c) { if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], o, 1); if (expired) notifyKeyspaceEvent(NOTIFY_HASH, "hexpired", c->argv[1], c->db->id); @@ -2845,7 +2847,7 @@ void hgetexCommand(client *c) { return; server.dirty += deleted + updated; - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], o, 1); /* This command will never be propagated as it is. It will be propagated as * HDELs when fields are lazily expired or deleted, if the new timestamp is @@ -2948,7 +2950,7 @@ void hdelCommand(client *c) { updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, hashTypeAllocSize(o)); if (deleted) { int64_t newLen = -1; /* The value -1 indicates that the key is deleted. */ - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c, c->db, c->argv[1], keyremoved ? NULL : o, 1); notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id); if (keyremoved) { notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); @@ -3844,7 +3846,7 @@ static void hexpireGenericCommand(client *c, long long basetime, int unit) { if (deleted + updated > 0) { server.dirty += deleted + updated; - signalModifiedKey(c, c->db, keyArg); + keyModified(c, c->db, keyArg, hashObj, 1); notifyKeyspaceEvent(NOTIFY_HASH, deleted ? "hdel" : "hexpire", keyArg, c->db->id); } @@ -4069,7 +4071,7 @@ void hpersistCommand(client *c) { * has been successfully deleted. */ if (changed) { notifyKeyspaceEvent(NOTIFY_HASH, "hpersist", c->argv[1], c->db->id); - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], hashObj, 1); server.dirty++; } } diff --git a/src/t_list.c b/src/t_list.c index 9d9d3239c..9aa8fbb23 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -513,7 +513,7 @@ void pushGenericCommand(client *c, int where, int xx) { addReplyLongLong(c, llen); char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],lobj,1); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen - (c->argc - 2), llen); if (server.memory_tracking_per_slot) @@ -587,7 +587,7 @@ void linsertCommand(client *c) { updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, listTypeAllocSize(subject)); if (inserted) { - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],subject,1); notifyKeyspaceEvent(NOTIFY_LIST,"linsert", c->argv[1],c->db->id); server.dirty++; @@ -659,7 +659,7 @@ void lsetCommand(client *c) { * above, so here we just need to try the conversion for shrinking. */ listTypeTryConversion(o,LIST_CONV_SHRINKING,NULL,NULL); addReply(c,shared.ok); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id); server.dirty++; } else { @@ -778,7 +778,7 @@ void addListRangeReply(client *c, robj *o, long start, long end, int reverse) { /* A housekeeping helper for list elements popping tasks. * - * If 'signal' is 0, skip calling signalModifiedKey(). + * If 'signal' is 0, skip calling keyModified(). * * 'deleted' is an optional output argument to get an indication * if the key got deleted by this function. */ @@ -801,7 +801,8 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, s updateSlotAllocSize(c->db, getKeySlot(key->ptr), oldsize, listTypeAllocSize(o)); if (deleted) *deleted = 0; } - if (signal) signalModifiedKey(c, c->db, key); + if (signal) + keyModified(c, c->db, key, llen ? o : NULL, 1); server.dirty += count; } @@ -978,7 +979,7 @@ void ltrimCommand(client *c) { listTypeTryConversion(o,LIST_CONV_SHRINKING,NULL,NULL); } updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_LIST, llen, llenNew); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c, c->db, c->argv[1], (llenNew > 0) ? o : NULL, 1); server.dirty += (ltrim + rtrim); addReply(c,shared.ok); } @@ -1150,7 +1151,7 @@ void lremCommand(client *c) { } else { listTypeTryConversion(subject,LIST_CONV_SHRINKING,NULL,NULL); } - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c, c->db, c->argv[1], ll ? subject : NULL, 1); } addReplyLongLong(c,removed); @@ -1170,7 +1171,7 @@ void lmoveHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value, listTypePush(dstobj,value,where); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(dstkey->ptr), oldsize, listTypeAllocSize(dstobj)); - signalModifiedKey(c,c->db,dstkey); + keyModified(c,c->db,dstkey,dstobj,1); notifyKeyspaceEvent(NOTIFY_LIST, where == LIST_HEAD ? "lpush" : "rpush", diff --git a/src/t_set.c b/src/t_set.c index bddafa126..96875e6e6 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -638,7 +638,7 @@ void saddCommand(client *c) { if (added) { unsigned long size = setTypeSize(set); updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_SET, size - added, size); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],set,1); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id); } server.dirty += added; @@ -674,7 +674,7 @@ void sremCommand(client *c) { if (deleted) { int64_t newSize = oldSize - deleted; - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c, c->db, c->argv[1], keyremoved ? NULL : set, 1); notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id); if (keyremoved) { notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1], @@ -741,7 +741,7 @@ void smoveCommand(client *c) { dbAdd(c->db, c->argv[2], &dstset); } - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c, c->db, c->argv[1], (srcNewLen > 0) ? srcset : NULL, 1); server.dirty++; if (server.memory_tracking_per_slot) @@ -751,7 +751,7 @@ void smoveCommand(client *c) { unsigned long dstLen = setTypeSize(dstset); updateKeysizesHist(c->db, getKeySlot(c->argv[2]->ptr), OBJ_SET, dstLen - 1, dstLen); server.dirty++; - signalModifiedKey(c,c->db,c->argv[2]); + keyModified(c,c->db,c->argv[2],dstset,1); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id); } if (server.memory_tracking_per_slot) @@ -858,7 +858,7 @@ void spopWithCountCommand(client *c) { /* Propagate this command as a DEL or UNLINK operation */ robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del; rewriteClientCommandVector(c, 2, aux, c->argv[1]); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],NULL,1); return; } @@ -1020,6 +1020,7 @@ void spopWithCountCommand(client *c) { if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, setTypeAllocSize(set)); dbReplaceValue(c->db, c->argv[1], &newset, 0); + set = newset; } /* Replicate/AOF the remaining elements as an SREM operation */ @@ -1037,7 +1038,7 @@ void spopWithCountCommand(client *c) { * we propagated the command as a set of SREMs operations using * the alsoPropagate() API. */ preventCommandPropagation(c); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],set,1); } void spopCommand(client *c) { @@ -1080,13 +1081,15 @@ void spopCommand(client *c) { decrRefCount(ele); /* Delete the kv if it's empty */ + int deleted = 0; if (setTypeSize(kv) == 0) { + deleted = 1; dbDelete(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id); } /* Set has been modified */ - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c, c->db, c->argv[1], deleted ? NULL : kv, 1); server.dirty++; } @@ -1408,7 +1411,7 @@ void sinterGenericCommand(client *c, robj **setkeys, zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { - signalModifiedKey(c,c->db,dstkey); + keyModified(c,c->db,dstkey,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id); server.dirty++; } @@ -1534,7 +1537,7 @@ void sinterGenericCommand(client *c, robj **setkeys, addReply(c,shared.czero); if (dbDelete(c->db,dstkey)) { server.dirty++; - signalModifiedKey(c,c->db,dstkey); + keyModified(c,c->db,dstkey,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id); } decrRefCount(dstset); @@ -1815,7 +1818,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, addReply(c,shared.czero); if (dbDelete(c->db,dstkey)) { server.dirty++; - signalModifiedKey(c,c->db,dstkey); + keyModified(c,c->db,dstkey,NULL,1); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id); } decrRefCount(dstset); diff --git a/src/t_stream.c b/src/t_stream.c index 63dace333..54ca36a95 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2442,7 +2442,7 @@ void xaddCommand(client *c) { if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),old_alloc,s->alloc_size); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); /* Let's rewrite the ID argument with the one actually generated for * AOF/replication propagation. */ @@ -2804,6 +2804,7 @@ void xreadCommand(client *c) { consumer->name); } consumer->seen_time = commandTimeSnapshot(); + keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ } else if (s->length) { /* For consumers without a group, we serve synchronously if we can * actually provide at least one item from the stream. */ @@ -2849,7 +2850,10 @@ void xreadCommand(client *c) { consumer, flags, &spi, &propCount); if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) updateSlotAllocSize(c->db,getKeySlot(c->argv[streams_arg+i]->ptr),old_alloc,s->alloc_size); - if (propCount) server.dirty++; + if (propCount) { + server.dirty++; + keyModified(c,c->db,c->argv[streams_arg+i],o,0); /* only update LRM */ + } } } @@ -3340,7 +3344,7 @@ NULL o = createStreamObject(); dbAdd(c->db, c->argv[2], &o); s = o->ptr; - signalModifiedKey(c,c->db,c->argv[2]); + keyModified(c,c->db,c->argv[2],o,1); } if (entries_read != SCG_INVALID_ENTRIES_READ && (uint64_t)entries_read > s->entries_added) { @@ -3356,6 +3360,7 @@ NULL server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create", c->argv[2],c->db->id); + keyModified(c,c->db,c->argv[2],o,0); } else { addReplyError(c,"-BUSYGROUP Consumer Group name already exists"); } @@ -3376,6 +3381,7 @@ NULL addReply(c,shared.ok); server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id); + keyModified(c,c->db,c->argv[2],o,0); } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { if (cg) { old_alloc = s->alloc_size; @@ -3387,6 +3393,7 @@ NULL server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", c->argv[2],c->db->id); + keyModified(c,c->db,c->argv[2],o,0); /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM); } else { @@ -3396,6 +3403,7 @@ NULL old_alloc = s->alloc_size; streamConsumer *created = streamCreateConsumer(s,cg,c->argv[4]->ptr,c->argv[2], c->db->id,SCC_DEFAULT); + keyModified(c,c->db,c->argv[2],o,0); if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db,getKeySlot(c->argv[2]->ptr),old_alloc,s->alloc_size); addReplyLongLong(c,created ? 1 : 0); @@ -3413,6 +3421,7 @@ NULL server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer", c->argv[2],c->db->id); + keyModified(c,c->db,c->argv[2],o,0); } addReplyLongLong(c,pending); } else { @@ -3493,6 +3502,7 @@ void xsetidCommand(client *c) { addReply(c,shared.ok); server.dirty++; notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id); + keyModified(c,c->db,c->argv[1],kv,0); } /* XACK ... @@ -3549,6 +3559,7 @@ void xackCommand(client *c) { streamDestroyNACK(kv->ptr, nack, buf); acknowledged++; server.dirty++; + keyModified(c,c->db,c->argv[1],kv,0); } } if (server.memory_tracking_per_slot && old_alloc != s->alloc_size) @@ -3604,7 +3615,7 @@ void xackdelCommand(client *c) { s = kv->ptr; size_t old_alloc = s->alloc_size; int first_entry = 0; - int deleted = 0; + int deleted = 0, dirty = server.dirty; addReplyArrayLen(c, args.numids); for (int j = 0; j < args.numids; j++) { int res = XACKDEL_NO_ID; @@ -3666,8 +3677,11 @@ void xackdelCommand(client *c) { } /* Propagate the write. */ - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); + } else if (server.dirty > dirty) { + /* Only ACK succeeded without deleting elements, just update LRM without signaling */ + keyModified(c,c->db,c->argv[1],kv,0); } cleanup: @@ -4137,6 +4151,7 @@ void xclaimCommand(client *c) { } setDeferredArrayLen(c,arraylenptr,arraylen); preventCommandPropagation(c); + keyModified(c,c->db,c->argv[1],o,0); cleanup: if (ids != static_ids) zfree(ids); } @@ -4339,6 +4354,8 @@ void xautoclaimCommand(client *c) { zfree(deleted_ids); preventCommandPropagation(c); + /* Update LRM but don't signal. */ + keyModified(c,c->db,c->argv[1],o,0); } /* XDEL [ ... ] @@ -4398,7 +4415,7 @@ void xdelCommand(client *c) { /* Propagate the write if needed. */ if (deleted) { - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); server.dirty += deleted; } @@ -4502,7 +4519,7 @@ void xdelexCommand(client *c) { } /* Propagate the write. */ - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); notifyKeyspaceEvent(NOTIFY_STREAM,"xdel",c->argv[1],c->db->id); server.dirty += deleted; } @@ -4570,7 +4587,7 @@ void xtrimCommand(client *c) { } /* Propagate the write. */ - signalModifiedKey(c, c->db,c->argv[1]); + keyModified(c, c->db,c->argv[1], kv, 1); server.dirty += deleted; } addReplyLongLong(c,deleted); diff --git a/src/t_string.c b/src/t_string.c index b582eb70d..901db75f3 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -499,7 +499,7 @@ void getexCommand(client *c) { serverAssert(deleted); robj *aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; rewriteClientCommandVector(c,2,aux,c->argv[1]); - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], NULL, 1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); server.dirty++; } else if (args.expire) { @@ -509,12 +509,12 @@ void getexCommand(client *c) { robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds); rewriteClientCommandVector(c,3,shared.pexpireat,c->argv[1],milliseconds_obj); decrRefCount(milliseconds_obj); - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], o, 1); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",c->argv[1],c->db->id); server.dirty++; } else if (args.flags & OBJ_PERSIST) { if (removeExpire(c->db, c->argv[1])) { - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], o, 1); rewriteClientCommandVector(c, 2, shared.persist, c->argv[1]); notifyKeyspaceEvent(NOTIFY_GENERIC,"persist",c->argv[1],c->db->id); server.dirty++; @@ -527,7 +527,7 @@ void getdelCommand(client *c) { if (dbSyncDelete(c->db, c->argv[1])) { /* Propagate as DEL command */ rewriteClientCommandVector(c,2,shared.del,c->argv[1]); - signalModifiedKey(c, c->db, c->argv[1]); + keyModified(c, c->db, c->argv[1], NULL, 1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id); server.dirty++; } @@ -606,7 +606,7 @@ void setrangeCommand(client *c) { if (server.memory_tracking_per_slot) updateSlotAllocSize(c->db, getKeySlot(c->argv[1]->ptr), oldsize, stringObjectAllocSize(kv)); memcpy((char*)kv->ptr+offset,value,value_len); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],kv,1); notifyKeyspaceEvent(NOTIFY_STRING, "setrange",c->argv[1],c->db->id); server.dirty++; @@ -827,7 +827,7 @@ void incrDecrCommand(client *c, long long incr) { } } addReplyLongLongFromStr(c,new); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],new,1); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; } @@ -879,7 +879,7 @@ void incrbyfloatCommand(client *c) { dbReplaceValueWithLink(c->db, c->argv[1], &new, link); else dbAddByLink(c->db, c->argv[1], &new, &link); - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],new,1); notifyKeyspaceEvent(NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id); server.dirty++; addReplyBulk(c,new); @@ -903,7 +903,7 @@ void appendCommand(client *c) { if (o == NULL) { /* Create the key */ c->argv[2] = tryObjectEncoding(c->argv[2]); - dbAddByLink(c->db, c->argv[1], &c->argv[2], &link); + o = dbAddByLink(c->db, c->argv[1], &c->argv[2], &link); incrRefCount(c->argv[2]); totlen = stringObjectLen(c->argv[2]); } else { @@ -928,7 +928,7 @@ void appendCommand(client *c) { int64_t oldlen = totlen - append_len; updateKeysizesHist(c->db, getKeySlot(c->argv[1]->ptr), OBJ_STRING, oldlen, totlen); } - signalModifiedKey(c,c->db,c->argv[1]); + keyModified(c,c->db,c->argv[1],o,1); notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id); server.dirty++; diff --git a/src/t_zset.c b/src/t_zset.c index 988d5c059..5c4f66ad1 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1923,7 +1923,7 @@ reply_to_client: cleanup: zfree(scores); if (added || updated) { - signalModifiedKey(c,c->db,key); + keyModified(c,c->db,key,zobj,1); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } @@ -1971,7 +1971,7 @@ void zremCommand(client *c) { } updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen); - signalModifiedKey(c,c->db,key); + keyModified(c, c->db, key, keyremoved ? NULL : zobj, 1); server.dirty += deleted; } addReplyLongLong(c,deleted); @@ -2092,7 +2092,7 @@ void zremrangeGenericCommand(client *c, zrange_type rangetype) { updateSlotAllocSize(c->db, getKeySlot(key->ptr), oldsize, zsetAllocSize(zobj)); if (deleted) { int64_t oldlen, newlen; - signalModifiedKey(c,c->db,key); + keyModified(c,c->db,key,NULL,1); notifyKeyspaceEvent(NOTIFY_ZSET,notify_type,key,c->db->id); if (keyremoved) { notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); @@ -2969,7 +2969,7 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in } else { addReply(c, shared.czero); if (dbDelete(c->db, dstkey)) { - signalModifiedKey(c, c->db, dstkey); + keyModified(c, c->db, dstkey, NULL, 1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", dstkey, c->db->id); server.dirty++; } @@ -3178,7 +3178,7 @@ static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t res } else { addReply(handler->client, shared.czero); if (dbDelete(handler->client->db, handler->dstkey)) { - signalModifiedKey(handler->client, handler->client->db, handler->dstkey); + keyModified(handler->client, handler->client->db, handler->dstkey, NULL, 1); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", handler->dstkey, handler->client->db->id); server.dirty++; } @@ -4174,7 +4174,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey newlen = -1; } updateKeysizesHist(c->db, getKeySlot(key->ptr), OBJ_ZSET, oldlen, newlen); - signalModifiedKey(c,c->db,key); + keyModified(c, c->db, key, (newlen > 0) ? zobj : NULL, 1); if (c->cmd->proc == zmpopCommand) { /* Always replicate it as ZPOP[MIN|MAX] with COUNT option instead of ZMPOP. */ diff --git a/src/tracking.c b/src/tracking.c index a50b51232..256b8b6b1 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -352,7 +352,7 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { raxStop(&ri); } -/* This function is called from signalModifiedKey() or other places in Redis +/* This function is called from keyModified() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such caching * slot. diff --git a/tests/modules/misc.c b/tests/modules/misc.c index c678d71f4..dbf0fecee 100644 --- a/tests/modules/misc.c +++ b/tests/modules/misc.c @@ -551,6 +551,16 @@ int only_reply_ok(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; } +/* Test command for RM_SignalModifiedKey. */ +int test_signalmodifiedkey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) return RedisModule_WrongArity(ctx); + + /* Manually signal that the key was modified */ + RedisModule_SignalModifiedKey(ctx, argv[1]); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -625,6 +635,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModuleCommand *parent = RedisModule_GetCommand(ctx, "test.no_cluster_cmd"); if (RedisModule_CreateSubcommand(parent, "set", only_reply_ok, "no-cluster", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "test.signalmodifiedkey", test_signalmodifiedkey, "write", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 62f5965a6..409709285 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -163,7 +163,7 @@ start_server {tags {"maxmemory" "external:skip"}} { start_server {tags {"maxmemory external:skip"}} { foreach policy { - allkeys-random allkeys-lru allkeys-lfu volatile-lru volatile-lfu volatile-random volatile-ttl + allkeys-random allkeys-lru allkeys-lfu allkeys-lrm volatile-lru volatile-lfu volatile-random volatile-ttl volatile-lrm } { test "maxmemory - is the memory limit honoured? (policy $policy)" { # make sure to start with a blank instance @@ -195,7 +195,7 @@ start_server {tags {"maxmemory external:skip"}} { } foreach policy { - allkeys-random allkeys-lru volatile-lru volatile-random volatile-ttl + allkeys-random allkeys-lru allkeys-lrm volatile-lru volatile-random volatile-ttl volatile-lrm } { test "maxmemory - only allkeys-* should remove non-volatile keys ($policy)" { # make sure to start with a blank instance @@ -237,7 +237,7 @@ start_server {tags {"maxmemory external:skip"}} { } foreach policy { - volatile-lru volatile-lfu volatile-random volatile-ttl + volatile-lru volatile-lfu volatile-random volatile-ttl volatile-lrm } { test "maxmemory - policy $policy should only remove volatile keys." { # make sure to start with a blank instance @@ -602,3 +602,92 @@ start_server {tags {"maxmemory" "external:skip"}} { assert {[r object freq foo] == 5} } } + +# LRM eviction policy tests +start_server {tags {"maxmemory" "external:skip"}} { + test {LRM: Basic write updates idle time} { + r flushdb + r config set maxmemory-policy allkeys-lrm + + r set foo a + after 2000 + + # Read the key should NOT update LRM + r get foo + assert_morethan_equal [r object idletime foo] 1 + + # LRM should be updated (idletime should be smaller) + r set foo b + assert_lessthan_equal [r object idletime foo] 1 + } {} {slow} + + test {LRM: RENAME updates destination key LRM} { + r flushdb + r set src value + after 2000 + r rename src dst + assert_lessthan [r object idletime dst] 1 + } {} {slow} + + test {LRM: XREADGROUP updates stream LRM} { + r flushdb + r xadd mystream * field value + r xgroup create mystream mygroup 0 + after 2000 + r xreadgroup GROUP mygroup consumer1 STREAMS mystream > + assert_lessthan [r object idletime mystream] 1 + } {} {slow} + + test {LRM: Keys with only read operations should be removed first} { + r flushdb + r config set maxmemory 0 + r config set maxmemory-policy allkeys-lrm + r config set maxmemory-samples 64 ;# Ensure eviction sampling can pick all keys + + # Create keys and populate them + # We'll create two groups of keys: + # - read-only keys: will only be read after creation + # - write keys: will be continuously written to + for {set j 0} {$j < 25} {incr j} { + r set "read:$j" [string repeat x 20000] + r set "write:$j" [string repeat x 20000] + } + + after 1000 + + # Perform read and write operations on keys + for {set j 0} {$j < 25} {incr j} { + r get "read:$j" + r set "write:$j" [string repeat y 20000] + } + + # Set memory limit to force eviction + set used [s used_memory] + set limit [expr {$used - 200*1024}] + r config set maxmemory $limit + + # Add more keys to trigger eviction + for {set j 0} {$j < 10} {incr j} { + r set "trigger:$j" [string repeat z 20000] + } + + # Count how many keys from each group survived + set read_survived 0 + set write_survived 0 + for {set j 0} {$j < 25} {incr j} { + if {[r exists "read:$j"]} { + incr read_survived + } + if {[r exists "write:$j"]} { + incr write_survived + } + } + + # If read-only keys haven't been fully evicted, write keys must not be evicted at all. */ + if {$read_survived > 0} { + assert {$write_survived == 25} + } else { + assert {$write_survived > $read_survived} + } + } +} diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index 7cb5b908f..b51fffb31 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -148,6 +148,36 @@ start_server {overrides {save {900 1}} tags {"modules external:skip"}} { $rd_trk close } + test {RM_SignalModifiedKey - tracking invalidation} { + set rd_trk [redis_client] + $rd_trk HELLO 3 + $rd_trk CLIENT TRACKING on + r SET mykey{t} abc + + # Track the key by reading it + $rd_trk GET mykey{t} + + # # Modify the key using module command that calls RM_SignalModifiedKey + r test.signalmodifiedkey mykey{t} + + # # Should receive invalidation message + assert_equal {invalidate mykey{t}} [$rd_trk read] + assert_equal "PONG" [$rd_trk ping] + $rd_trk close + } + + test {RM_SignalModifiedKey - update LRM timestamp} { + set old_policy [config_get_set maxmemory-policy allkeys-lrm] + r SET mykey{t} abc + after 2000 + assert_morethan_equal [r object idletime mykey{t}] 1 + + # LRM should be updated. + r test.signalmodifiedkey mykey{t} + assert_lessthan [r object idletime mykey{t}] 2 + r config set maxmemory-policy $old_policy + } {OK} {slow} + test {publish to self inside rm_call} { r hello 3 r subscribe foo