diff --git a/src/Makefile b/src/Makefile index 4b86468b6..14dc57d34 100644 --- a/src/Makefile +++ b/src/Makefile @@ -375,7 +375,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut8.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/config.c b/src/config.c index a5ddad6b7..673cf60c1 100644 --- a/src/config.c +++ b/src/config.c @@ -3168,6 +3168,7 @@ standardConfig static_configs[] = { createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ + createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */ createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/config.h b/src/config.h index 2427a7364..513c80275 100644 --- a/src/config.h +++ b/src/config.h @@ -105,10 +105,10 @@ /* Test for __builtin_prefetch() * Supported in LLVM since 2.9: https://releases.llvm.org/2.9/docs/ReleaseNotes.html - * Supported in GCC since 3.1 but we use 4.9 given it's too old: https://gcc.gnu.org/gcc-3.1/changes.html. */ + * Supported in GCC since 3.1 but we use 4.8 given it's too old: https://gcc.gnu.org/gcc-3.1/changes.html. */ #if defined(__clang__) && (__clang_major__ > 2 || (__clang_major__ == 2 && __clang_minor__ >= 9)) #define HAS_BUILTIN_PREFETCH 1 -#elif defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9)) +#elif defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) #define HAS_BUILTIN_PREFETCH 1 #else #define HAS_BUILTIN_PREFETCH 0 diff --git a/src/db.c b/src/db.c index 32184f580..d44a7f2d3 100644 --- a/src/db.c +++ b/src/db.c @@ -363,6 +363,24 @@ int getKeySlot(sds key) { return calculateKeySlot(key); } +/* Return the slot of the key in the command. IO threads use this function + * to calculate slot to reduce main-thread load */ +int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) { + int slot = -1; + if (!cmd || !server.cluster_enabled) return slot; + + /* Get the keys from the command */ + getKeysResult result = GETKEYS_RESULT_INIT; + int numkeys = getKeysFromCommand(cmd, argv, argc, &result); + if (numkeys > 0) { + /* Get the slot of the first key */ + robj *first = argv[result.keys[0].pos]; + slot = keyHashSlot(first->ptr, (int)sdslen(first->ptr)); + } + getKeysFreeResult(&result); + return slot; +} + /* This is a special version of dbAdd() that is used only when loading * keys from the RDB file: the key is passed as an SDS string that is * copied by the function and freed by the caller. diff --git a/src/dict.c b/src/dict.c index 25fcde711..2eb77d52e 100644 --- a/src/dict.c +++ b/src/dict.c @@ -66,7 +66,6 @@ static void _dictShrinkIfNeeded(dict *d); static void _dictRehashStepIfNeeded(dict *d, uint64_t visitedIdx); static signed char _dictNextExp(unsigned long size); static int _dictInit(dict *d, dictType *type); -static dictEntry *dictGetNext(const dictEntry *de); static dictEntryLink dictGetNextLink(dictEntry *de); static void dictSetNext(dictEntry *de, dictEntry *next); static int dictDefaultCompare(dictCmpCache *cache, const void *key1, const void *key2); @@ -499,6 +498,12 @@ int dictAdd(dict *d, void *key, void *val) return DICT_OK; } +int dictCompareKeys(dict *d, const void *key1, const void *key2) { + dictCmpCache cache = {0}; + keyCmpFunc cmpFunc = dictGetCmpFunc(d); + return cmpFunc(&cache, key1, key2); +} + /* Low level add or find: * This function adds the entry but instead of setting a value returns the * dictEntry structure to the user, that will make sure to fill the value @@ -1007,6 +1012,10 @@ double dictIncrDoubleVal(dictEntry *de, double val) { return de->v.d += val; } +int dictEntryIsKey(const dictEntry *de) { + return entryIsKey(de); +} + void *dictGetKey(const dictEntry *de) { /* if entryIsKey() */ if ((uintptr_t)de & ENTRY_PTR_IS_ODD_KEY) return (void *) de; @@ -1044,7 +1053,7 @@ double *dictGetDoubleValPtr(dictEntry *de) { /* Returns the 'next' field of the entry or NULL if the entry doesn't have a * 'next' field. */ -static dictEntry *dictGetNext(const dictEntry *de) { +dictEntry *dictGetNext(const dictEntry *de) { if (entryIsKey(de)) return NULL; /* there's no next */ if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next; return de->next; diff --git a/src/dict.h b/src/dict.h index 288d95b97..10cf29b46 100644 --- a/src/dict.h +++ b/src/dict.h @@ -181,11 +181,6 @@ typedef struct { if ((d)->type->keyDestructor) \ (d)->type->keyDestructor((d), dictGetKey(entry)) -#define dictCompareKeys(d, key1, key2) \ - (((d)->type->keyCompare) ? \ - (d)->type->keyCompare((d), key1, key2) : \ - (key1) == (key2)) - #define dictMetadata(d) (&(d)->metadata) #define dictMetadataSize(d) ((d)->type->dictMetadataBytes \ ? (d)->type->dictMetadataBytes(d) : 0) @@ -234,6 +229,8 @@ dictEntry * dictFind(dict *d, const void *key); int dictShrinkIfNeeded(dict *d); int dictExpandIfNeeded(dict *d); void *dictGetKey(const dictEntry *de); +int dictEntryIsKey(const dictEntry *de); +int dictCompareKeys(dict *d, const void *key1, const void *key2); size_t dictMemUsage(const dict *d); size_t dictEntryMemUsage(int noValueDict); dictIterator *dictGetIterator(dict *d); @@ -242,6 +239,7 @@ void dictInitIterator(dictIterator *iter, dict *d); void dictInitSafeIterator(dictIterator *iter, dict *d); void dictResetIterator(dictIterator *iter); dictEntry *dictNext(dictIterator *iter); +dictEntry *dictGetNext(const dictEntry *de); void dictReleaseIterator(dictIterator *iter); dictEntry *dictGetRandomKey(dict *d); dictEntry *dictGetFairRandomKey(dict *d); diff --git a/src/fmtargs.h b/src/fmtargs.h index e52d3b99c..f32f6ea8b 100644 --- a/src/fmtargs.h +++ b/src/fmtargs.h @@ -44,9 +44,9 @@ /* Everything below this line is automatically generated by * generate-fmtargs.py. Do not manually edit. */ -#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N +#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, N, ...) N -#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 +#define RSEQ_N() 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 #define COMPACT_FMT_2(fmt, value) fmt #define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__) @@ -108,6 +108,26 @@ #define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__) #define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__) #define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__) +#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__) +#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__) +#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__) +#define COMPACT_FMT_128(fmt, value, ...) fmt COMPACT_FMT_126(__VA_ARGS__) +#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__) +#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__) +#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__) +#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__) +#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__) +#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__) +#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__) +#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__) +#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__) +#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__) +#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__) +#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__) +#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__) +#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__) +#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__) +#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__) #define COMPACT_VALUES_2(fmt, value) value #define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__) @@ -169,5 +189,25 @@ #define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__) #define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__) #define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__) +#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__) +#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__) +#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__) +#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__) +#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__) +#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__) +#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__) +#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__) +#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__) +#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__) +#define COMPACT_VALUES_142(fmt, value, ...) value, COMPACT_VALUES_140(__VA_ARGS__) +#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__) +#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__) +#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__) +#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__) +#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__) +#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__) +#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__) +#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__) +#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__) #endif diff --git a/src/iothread.c b/src/iothread.c index a17031791..89e218546 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -343,6 +343,29 @@ int sendPendingClientsToIOThreads(void) { return processed; } +/* Prefetch the commands from the IO thread. The return value is the number + * of clients that have been prefetched. */ +int prefetchIOThreadCommands(IOThread *t) { + int len = listLength(mainThreadProcessingClients[t->id]); + int to_prefetch = determinePrefetchCount(len); + if (to_prefetch == 0) return 0; + + int clients = 0; + listIter li; + listNode *ln; + listRewind(mainThreadProcessingClients[t->id], &li); + while((ln = listNext(&li)) && clients++ < to_prefetch) { + client *c = listNodeValue(ln); + /* A single command may contain multiple keys. If the batch is full, + * we stop adding clients to it. */ + if (addCommandToBatch(c) == C_ERR) break; + } + + /* Prefetch the commands in the batch. */ + prefetchCommands(); + return clients; +} + extern int ProcessingEventsWhileBlocked; /* Send the pending clients to the IO thread if the number of pending clients @@ -391,8 +414,19 @@ int processClientsFromIOThread(IOThread *t) { size_t processed = listLength(mainThreadProcessingClients[t->id]); if (processed == 0) return 0; + int prefetch_clients = 0; + /* We may call processClientsFromIOThread reentrantly, so we need to + * reset the prefetching batch, besides, users may change the config + * of prefetch batch size, so we need to reset the prefetching batch. */ + resetCommandsBatch(); + listNode *node = NULL; while (listLength(mainThreadProcessingClients[t->id])) { + /* Prefetch the commands if no clients in the batch. */ + if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t); + /* Reset the prefetching batch if we have processed all clients. */ + if (--prefetch_clients <= 0) resetCommandsBatch(); + /* Each time we pop up only the first client to process to guarantee * reentrancy safety. */ if (node) zfree(node); @@ -651,6 +685,8 @@ void initThreadedIO(void) { exit(1); } + prefetchCommandsBatchInit(); + /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { IOThread *t = &IOThreads[i]; diff --git a/src/kvstore.c b/src/kvstore.c index 8aded921f..e5f2930c5 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -83,7 +83,7 @@ typedef struct { /**********************************/ /* Get the dictionary pointer based on dict-index. */ -static dict *kvstoreGetDict(kvstore *kvs, int didx) { +dict *kvstoreGetDict(kvstore *kvs, int didx) { return kvs->dicts[didx]; } diff --git a/src/kvstore.h b/src/kvstore.h index 7ef619d3c..ce7aa895e 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -97,6 +97,7 @@ dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **exis dictEntryLink kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, int *table_index); void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntryLink plink, int table_index); int kvstoreDictDelete(kvstore *kvs, int didx, const void *key); +dict *kvstoreGetDict(kvstore *kvs, int didx); kvstoreDictMetadata *kvstoreGetDictMetadata(kvstore *kvs, int didx); kvstoreMetadata *kvstoreGetMetadata(kvstore *kvs); diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c new file mode 100644 index 000000000..8f3f77ef2 --- /dev/null +++ b/src/memory_prefetch.c @@ -0,0 +1,399 @@ +/* + * This file utilizes prefetching keys and data for multiple commands in a batch, + * to improve performance by amortizing memory access costs across multiple operations. + * + * Copyright (c) 2025-Present, Redis Ltd. and contributors. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + +#include "memory_prefetch.h" +#include "server.h" +#include "dict.h" + +typedef enum { HT_IDX_FIRST = 0, HT_IDX_SECOND = 1, HT_IDX_INVALID = -1 } HashTableIndex; + +typedef enum { + PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */ + PREFETCH_ENTRY, /* prefetch entries associated with the given key's hash */ + PREFETCH_KVOBJ, /* prefetch the kv object of the entry found in the previous step */ + PREFETCH_VALDATA, /* prefetch the value data of the kv object found in the previous step */ + PREFETCH_DONE /* Indicates that prefetching for this key is complete */ +} PrefetchState; + + +/************************************ State machine diagram for the prefetch operation. ******************************** + │ + start + │ + ┌────────▼─────────┐ + ┌─────────►│ PREFETCH_BUCKET ├────►────────┐ + │ └────────┬─────────┘ no more tables -> done + | bucket|found | + │ | │ + entry not found - goto next table ┌────────▼────────┐ │ + └────◄─────┤ PREFETCH_ENTRY | ▼ + ┌────────────►└────────┬────────┘ │ + | Entry│found │ + │ | │ + | ┌───────▼────────┐ │ + │ | PREFETCH_KVOBJ | ▼ + │ └───────┬────────┘ │ + kvobj not found - goto next entry | | + │ ┌───────────▼────────────┐ │ + └──────◄───│ PREFETCH_VALDATA │ ▼ + └───────────┬────────────┘ │ + | │ + ┌───────-─▼─────────────┐ │ + │ PREFETCH_DONE │◄────────┘ + └───────────────────────┘ +**********************************************************************************************************************/ + +typedef void *(*GetValueDataFunc)(const void *val); + +typedef struct KeyPrefetchInfo { + PrefetchState state; /* Current state of the prefetch operation */ + HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ + uint64_t bucket_idx; /* Index of the bucket in the current hash table */ + uint64_t key_hash; /* Hash value of the key being prefetched */ + dictEntry *current_entry; /* Pointer to the current entry being processed */ + kvobj *current_kv; /* Pointer to the kv object being prefetched */ +} KeyPrefetchInfo; + +/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */ +typedef struct PrefetchCommandsBatch { + size_t cur_idx; /* Index of the current key being processed */ + size_t key_count; /* Number of keys in the current batch */ + size_t client_count; /* Number of clients in the current batch */ + size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ + void **keys; /* Array of keys to prefetch in the current batch */ + client **clients; /* Array of clients in the current batch */ + dict **keys_dicts; /* Main dict for each key */ + dict **current_dicts; /* Points to dict to prefetch from */ + KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */ + GetValueDataFunc get_value_data_func; /* Function to get the value data */ +} PrefetchCommandsBatch; + +static PrefetchCommandsBatch *batch = NULL; + +void freePrefetchCommandsBatch(void) { + if (batch == NULL) { + return; + } + + zfree(batch->clients); + zfree(batch->keys); + zfree(batch->keys_dicts); + zfree(batch->prefetch_info); + zfree(batch); + batch = NULL; +} + +void prefetchCommandsBatchInit(void) { + serverAssert(!batch); + + /* To avoid prefetching small batches, we set the max size to twice + * the configured size, so if not exceeding twice the limit, we can + * prefetch all of it. See also `determinePrefetchCount` */ + size_t max_prefetch_size = server.prefetch_batch_max_size * 2; + + if (max_prefetch_size == 0) { + return; + } + + batch = zcalloc(sizeof(PrefetchCommandsBatch)); + batch->max_prefetch_size = max_prefetch_size; + batch->clients = zcalloc(max_prefetch_size * sizeof(client *)); + batch->keys = zcalloc(max_prefetch_size * sizeof(void *)); + batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo)); +} + +void onMaxBatchSizeChange(void) { + if (batch && batch->client_count > 0) { + /* We need to process the current batch before updating the size */ + return; + } + + freePrefetchCommandsBatch(); + prefetchCommandsBatchInit(); +} + +/* Prefetch the given pointer and move to the next key in the batch. */ +static inline void prefetchAndMoveToNextKey(void *addr) { + redis_prefetch_read(addr); + /* While the prefetch is in progress, we can continue to the next key */ + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; +} + +static inline void markKeyAsdone(KeyPrefetchInfo *info) { + info->state = PREFETCH_DONE; + server.stat_total_prefetch_entries++; +} + +/* Returns the next KeyPrefetchInfo structure that needs to be processed. */ +static KeyPrefetchInfo *getNextPrefetchInfo(void) { + size_t start_idx = batch->cur_idx; + do { + KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; + if (info->state != PREFETCH_DONE) return info; + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; + } while (batch->cur_idx != start_idx); + return NULL; +} + +static void initBatchInfo(dict **dicts, GetValueDataFunc func) { + batch->current_dicts = dicts; + batch->get_value_data_func = func; + + /* Initialize the prefetch info */ + for (size_t i = 0; i < batch->key_count; i++) { + KeyPrefetchInfo *info = &batch->prefetch_info[i]; + if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) { + info->state = PREFETCH_DONE; + continue; + } + info->ht_idx = HT_IDX_INVALID; + info->current_entry = NULL; + info->current_kv = NULL; + info->state = PREFETCH_BUCKET; + info->key_hash = dictGetHash(batch->current_dicts[i], batch->keys[i]); + } +} + +/* Prefetch the bucket of the next hash table index. + * If no tables are left, move to the PREFETCH_DONE state. */ +static void prefetchBucket(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + + /* Determine which hash table to use */ + if (info->ht_idx == HT_IDX_INVALID) { + info->ht_idx = HT_IDX_FIRST; + } else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) { + info->ht_idx = HT_IDX_SECOND; + } else { + /* No more tables left - mark as done. */ + markKeyAsdone(info); + return; + } + + /* Prefetch the bucket */ + info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]); + prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + info->current_entry = NULL; + info->state = PREFETCH_ENTRY; +} + +/* Prefetch the entry in the bucket and move to the PREFETCH_KVOBJ state. + * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ +static void prefetchEntry(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + + if (info->current_entry) { + /* We already found an entry in the bucket - move to the next entry */ + info->current_entry = dictGetNext(info->current_entry); + } else { + /* Go to the first entry in the bucket */ + info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; + } + + if (info->current_entry) { + prefetchAndMoveToNextKey(info->current_entry); + info->current_kv = NULL; + info->state = PREFETCH_KVOBJ; + } else { + /* No entry found in the bucket - try the bucket in the next table */ + info->state = PREFETCH_BUCKET; + } +} + +/* Prefetch the kv object in the dict entry, and to the PREFETCH_VALDATA state. */ +static inline void prefetchKVOject(KeyPrefetchInfo *info) { + kvobj *kv = dictGetKey(info->current_entry); + int is_kv = dictEntryIsKey(info->current_entry); + + info->current_kv = kv; + info->state = PREFETCH_VALDATA; + /* If the entry is a pointer of kv object, we don't need to prefetch it */ + if (!is_kv) prefetchAndMoveToNextKey(kv); +} + +/* Prefetch the value data of the kv object found in dict entry. */ +static void prefetchValueData(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + kvobj *kv = info->current_kv; + + /* 1. If this is the last element, we assume a hit and don't compare the keys + * 2. This kv object is the target of the lookup. */ + if ((!dictGetNext(info->current_entry) && !dictIsRehashing(batch->current_dicts[i])) || + dictCompareKeys(batch->current_dicts[i], batch->keys[i], kv)) + { + if (batch->get_value_data_func) { + void *value_data = batch->get_value_data_func(kv); + if (value_data) prefetchAndMoveToNextKey(value_data); + } + markKeyAsdone(info); + } else { + /* Not found in the current entry, move to the next entry */ + info->state = PREFETCH_ENTRY; + } +} + +/* Prefetch dictionary data for an array of keys. + * + * This function takes an array of dictionaries and keys, attempting to bring + * data closer to the L1 cache that might be needed for dictionary operations + * on those keys. + * + * The dictFind algorithm: + * 1. Evaluate the hash of the key + * 2. Access the index in the first table + * 3. Walk the entries linked list until the key is found + * If the key hasn't been found and the dictionary is in the middle of rehashing, + * access the index on the second table and repeat step 3 + * + * dictPrefetch executes the same algorithm as dictFind, but one step at a time + * for each key. Instead of waiting for data to be read from memory, it prefetches + * the data and then moves on to execute the next prefetch for another key. + * + * dicts - An array of dictionaries to prefetch data from. + * get_val_data_func - A callback function that dictPrefetch can invoke + * to bring the key's value data closer to the L1 cache as well. + */ +static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { + initBatchInfo(dicts, get_val_data_func); + KeyPrefetchInfo *info; + while ((info = getNextPrefetchInfo())) { + switch (info->state) { + case PREFETCH_BUCKET: prefetchBucket(info); break; + case PREFETCH_ENTRY: prefetchEntry(info); break; + case PREFETCH_KVOBJ: prefetchKVOject(info); break; + case PREFETCH_VALDATA: prefetchValueData(info); break; + default: serverPanic("Unknown prefetch state %d", info->state); + } + } +} + +/* Helper function to get the value pointer of a kv object. */ +static void *getObjectValuePtr(const void *value) { + kvobj *kv = (kvobj *)value; + return (kv->type == OBJ_STRING && kv->encoding == OBJ_ENCODING_RAW) ? kv->ptr : NULL; +} + +void resetCommandsBatch(void) { + if (batch == NULL) { + /* Handle the case where prefetching becomes enabled from disabled. */ + if (server.prefetch_batch_max_size) prefetchCommandsBatchInit(); + return; + } + + batch->cur_idx = 0; + batch->key_count = 0; + batch->client_count = 0; + + /* Handle the case where the max prefetch size has been changed. */ + if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size * 2) { + onMaxBatchSizeChange(); + } +} + +/* Prefetching in very small batches tends to be ineffective because the technique + * relies on a small gap—typically a few CPU cycles—between issuing the prefetch + * and performing the actual memory access. If the batch is too small, this delay + * cannot be effectively inserted, and the prefetching yields little to no benefit. + * + * To avoid wasting effort, when the remaining data is small (less than twice the + * maximum batch size), we simply prefetch all of it at once. Otherwise, we only + * prefetch a limited portion, capped at the configured maximum. */ +int determinePrefetchCount(int len) { + if (!batch) return 0; + + /* The batch max size is double of the configured size. */ + int config_size = batch->max_prefetch_size / 2; + return len < server.prefetch_batch_max_size ? len : config_size; +} + +/* Prefetch command-related data: + * 1. Prefetch the command arguments allocated by the I/O thread to bring them + * closer to the L1 cache. + * 2. Prefetch the keys and values for all commands in the current batch from + * the main dictionaries. */ +void prefetchCommands(void) { + if (!batch) return; + + /* Prefetch argv's for all clients */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + /* Skip prefetching first argv (cmd name) it was already looked up by + * the I/O thread, and the main thread will not touch argv[0]. */ + for (int j = 1; j < c->argc; j++) { + redis_prefetch_read(c->argv[j]); + } + } + + /* Prefetch the argv->ptr if required */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + for (int j = 1; j < c->argc; j++) { + if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { + redis_prefetch_read(c->argv[j]->ptr); + } + } + } + + /* Get the keys ptrs - we do it here after the key obj was prefetched. */ + for (size_t i = 0; i < batch->key_count; i++) { + batch->keys[i] = ((robj *)batch->keys[i])->ptr; + } + + /* Prefetch dict keys for all commands. + * Prefetching is beneficial only if there are more than one key. */ + if (batch->key_count > 1) { + server.stat_total_prefetch_batches++; + /* Prefetch keys from the main dict */ + dictPrefetch(batch->keys_dicts, getObjectValuePtr); + } +} + +/* Adds the client's command to the current batch. + * + * Returns C_OK if the command was added successfully, C_ERR otherwise. */ +int addCommandToBatch(client *c) { + if (unlikely(!batch)) return C_ERR; + + /* If the batch is full, process it. + * We also check the client count to handle cases where + * no keys exist for the clients' commands. */ + if (batch->client_count == batch->max_prefetch_size || + batch->key_count == batch->max_prefetch_size) + { + return C_ERR; + } + + batch->clients[batch->client_count++] = c; + + if (likely(c->iolookedcmd)) { + /* Get command's keys positions */ + getKeysResult result = GETKEYS_RESULT_INIT; + int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result); + for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + batch->keys_dicts[batch->key_count] = + kvstoreGetDict(c->db->keys, c->slot > 0 ? c->slot : 0); + batch->key_count++; + } + getKeysFreeResult(&result); + } + + return C_OK; +} diff --git a/src/memory_prefetch.h b/src/memory_prefetch.h new file mode 100644 index 000000000..e2977f10f --- /dev/null +++ b/src/memory_prefetch.h @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2025-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + +#ifndef MEMORY_PREFETCH_H +#define MEMORY_PREFETCH_H + +struct client; + +void prefetchCommandsBatchInit(void); +int determinePrefetchCount(int len); +int addCommandToBatch(struct client *c); +void resetCommandsBatch(void); +void prefetchCommands(void); + +#endif /* MEMORY_PREFETCH_H */ diff --git a/src/networking.c b/src/networking.c index 028eae68b..30e4789f8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2877,6 +2877,7 @@ int processInputBuffer(client *c) { if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; c->iolookedcmd = lookupCommand(c->argv, c->argc); + c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); enqueuePendingClientsToMainThread(c, 0); break; } diff --git a/src/server.c b/src/server.c index cbb522023..8fa30ffab 100644 --- a/src/server.c +++ b/src/server.c @@ -2745,6 +2745,8 @@ void resetServerStats(void) { server.stat_reply_buffer_shrinks = 0; server.stat_reply_buffer_expands = 0; server.stat_cluster_incompatible_ops = 0; + server.stat_total_prefetch_batches = 0; + server.stat_total_prefetch_entries = 0; memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM); server.el_cmd_cnt_max = 0; lazyfreeResetStats(); @@ -6223,6 +6225,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "total_writes_processed:%lld\r\n", stat_total_writes_processed, "io_threaded_reads_processed:%lld\r\n", stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", stat_io_writes_processed, + "io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches, + "io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries, "client_query_buffer_limit_disconnections:%lld\r\n", stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index d631cc19c..a434dad76 100644 --- a/src/server.h +++ b/src/server.h @@ -63,6 +63,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "rax.h" /* Radix tree */ #include "connection.h" /* Connection abstraction */ #include "eventnotifier.h" /* Event notification */ +#include "memory_prefetch.h" #define REDISMODULE_CORE 1 typedef struct redisObject robj; @@ -1838,6 +1839,7 @@ struct redisServer { int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */ int io_threads_do_reads; /* Read and parse from IO threads? */ int io_threads_active; /* Is IO threads currently active? */ + int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ @@ -1913,6 +1915,8 @@ struct redisServer { redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ long long stat_cluster_incompatible_ops; /* Number of operations that are incompatible with cluster mode */ + long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */ + long long stat_total_prefetch_batches; /* Total number of prefetched batches */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { @@ -3695,6 +3699,7 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index); int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result); keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys); int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc); int doesCommandHaveKeys(struct redisCommand *cmd); int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags); diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 79d6e399d..1b8e50291 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -1,3 +1,17 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2025-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of (a) the Redis Source Available License 2.0 +# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the +# GNU Affero General Public License v3 (AGPLv3). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + source tests/support/cli.tcl test {CONFIG SET port number} { @@ -170,3 +184,143 @@ start_server {config "minimal.conf" tags {"external:skip"}} { } } } + +start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes} io-threads 2}} { + set server_pid [s process_id] + # Since each thread may perform memory prefetch independently, this test is + # only run when the number of IO threads is 2 to ensure deterministic results. + if {[r config get io-threads] eq "io-threads 2"} { + test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { + # Create 16 (prefetch batch size) +1 clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [redis_deferring_client] + } + + # set a key that will be later be prefetch + r set a 0 + + # Get the client ID of rd4 + $rd4 client id + set rd4_id [$rd4 read] + + # Create a batch of commands by suspending the server for a while + # before responding to the first command + pause_process $server_pid + + # The first client will kill the fourth client + $rd0 client kill id $rd4_id + + # Send set commands for all clients except the first + for {set i 1} {$i < 16} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Resume the server + resume_process $server_pid + + # Read the results + assert_equal {1} [$rd0 read] + catch {$rd4 read} err + assert_match {I/O error reading reply} $err + + # verify the prefetch stats are as expected + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower + set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher + + # Verify the final state + $rd15 get a + assert_equal {OK} [$rd15 read] + assert_equal {15} [$rd15 read] + } + + test {prefetch works as expected when changing the batch size while executing the commands batch} { + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [redis_deferring_client] + } + + # Create a batch of commands by suspending the server for a while + # before responding to the first command + pause_process $server_pid + + # Send set commands for all clients the 5th client will change the prefetch batch size + for {set i 0} {$i < 16} {incr i} { + if {$i == 4} { + [set rd$i] config set prefetch-batch-max-size 1 + } + [set rd$i] set a $i + [set rd$i] flush + } + # Resume the server + resume_process $server_pid + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + [set rd$i] close + } + + # assert the configured prefetch batch size was changed + assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} + } + + proc do_prefetch_batch {server_pid batch_size} { + # Create clients + for {set i 0} {$i < $batch_size} {incr i} { + set rd$i [redis_deferring_client] + } + + # Suspend the server to batch the commands + pause_process $server_pid + + # Send commands from all clients + for {set i 0} {$i < $batch_size} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Resume the server to process the batch + resume_process $server_pid + + # Verify responses + for {set i 0} {$i < $batch_size} {incr i} { + assert_equal {OK} [[set rd$i] read] + [set rd$i] close + } + } + + test {no prefetch when the batch size is set to 0} { + # set the batch size to 0 + r config set prefetch-batch-max-size 0 + # save the current value of prefetch entries + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + + do_prefetch_batch $server_pid 16 + + # assert the prefetch entries did not change + set info [r info stats] + set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_equal $prefetch_entries $new_prefetch_entries + } + + test {Prefetch can resume working when the configuration option is set to a non-zero value} { + # save the current value of prefetch entries + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + # set the batch size to 0 + r config set prefetch-batch-max-size 16 + + do_prefetch_batch $server_pid 16 + + # assert the prefetch entries did not change + set info [r info stats] + set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + # With slower machines, the number of prefetch entries can be lower + assert_range $new_prefetch_entries [expr {$prefetch_entries + 2}] [expr {$prefetch_entries + 16}] + } + } +} diff --git a/utils/generate-fmtargs.py b/utils/generate-fmtargs.py index e16cc368f..1eced02c6 100755 --- a/utils/generate-fmtargs.py +++ b/utils/generate-fmtargs.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # Outputs the generated part of src/fmtargs.h -MAX_ARGS = 120 +MAX_ARGS = 160 import os print("/* Everything below this line is automatically generated by")