diff --git a/Makefile b/Makefile index ff1b45a1a..407ed08ce 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ endif endif endif -CFLAGS = -O2 -Wall -Wextra -g -ffast-math $(SAN) +CFLAGS = -O2 -Wall -Wextra -g $(SAN) -std=c11 LDFLAGS = -lm $(SAN) # Detect OS @@ -26,7 +26,7 @@ uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') # Shared library compile flags for linux / osx ifeq ($(uname_S),Linux) - SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2 + SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c11 -O2 SHOBJ_LDFLAGS ?= -shared ifneq (,$(findstring armv,$(uname_M))) SHOBJ_LDFLAGS += -latomic @@ -35,7 +35,7 @@ ifneq (,$(findstring aarch64,$(uname_M))) SHOBJ_LDFLAGS += -latomic endif else - SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -Ofast -ffast-math + SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c11 -O3 SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup endif diff --git a/vset.c b/vset.c index fd5ea573a..c83a4a485 100644 --- a/vset.c +++ b/vset.c @@ -57,6 +57,43 @@ * our read commands running in the main thread don't need to use * hnsw_search() or other HNSW functions using the visited epochs slots * we are safe. + * + * 4. There is a race from the moment we create a thread, passing the + * vector set object, to the moment the thread can actually lock the + * object with the in_use_lock mutex: as the thread starts, in the meanwhile + * a DEL/expire could trigger and remove the object. For this reason + * we use an atomic counter that protects our object for this small + * time in vectorSetWaitAllBackgroundClients(). This prevents removal + * of objects that are about to be taken by threads. + * + * Note that other competing solutions could be used to fix the problem + * but have their own set of issues, however they are worth documenting here + * and evaluating in the future: + * + * A. Using a condition variable we could "wait" for the thread to + * acquire the lock. 
However this means waiting before returning + * to the event loop, and would make the command execution slower. + * B. We could use again an atomic variable, like we did, but this time + * as a refcount for the object, with a vsetAcquire() vsetRelease(). + * In this case, the command could retain the object in the main thread + * before starting the thread, and the thread, after the work is done, + * could release it. This way sometimes the object would be freed by + * the thread, and while right now it can be safe to do the kind of resource + * deallocation that vectorSetReleaseObject() does, given that the + * Redis Modules API is not always thread safe, this solution may not + * be future-proof. However it is worth evaluating it better in the + * future. + * C. We could use the "B" solution but instead of freeing the object + * in the thread, in this specific case we could just put it into a + * list and defer it for later freeing (for instance in the reply + * callback), so that the object is always freed in the main thread. + * This would require a list of objects to free. + * + * However the current solution's only disadvantage is the potential busy + * loop, but this busy loop in practical terms will almost never do + * much: to trigger it, a number of circumstances must happen: deleting + * Vector Set keys while using them, hitting the small window needed to + * start the thread and read-lock the mutex. */ #define _DEFAULT_SOURCE @@ -72,6 +109,7 @@ #include #include #include +#include #include "hnsw.h" // We inline directly the expression implementation here so that building @@ -107,6 +145,8 @@ struct vsetObject { uint64_t id; // Unique ID used by threaded VADD to know the // object is still the same. uint64_t numattribs; // Number of nodes associated with an attribute. + atomic_int thread_creation_pending; // Number of threads that are currently + // pending to lock the object. 
}; /* Each node has two associated values: the associated string (the item @@ -200,6 +240,7 @@ struct vsetObject *createVectorSetObject(unsigned int dim, uint32_t quant_type, o->proj_matrix = NULL; o->proj_input_size = 0; o->numattribs = 0; + o->thread_creation_pending = 0; RedisModule_Assert(pthread_rwlock_init(&o->in_use_lock,NULL) == 0); return o; } @@ -223,8 +264,19 @@ void vectorSetReleaseObject(struct vsetObject *o) { /* Wait for all the threads performing operations on this * index to terminate their work (locking for write will - * wait for all the other threads). */ -void vectorSetWaitAllBackgroundClients(struct vsetObject *vset) { + * wait for all the other threads). + * + * If 'for_del' is set to 1, we also wait for all the pending threads + * that still didn't acquire the lock to finish their work. This + * is useful only if we are going to call this function to delete + * the object, and not if we just want to modify it. */ +void vectorSetWaitAllBackgroundClients(struct vsetObject *vset, int for_del) { + if (for_del) { + // If we are going to destroy the object, after this call, let's + // wait for threads that are being created and still didn't have + // a chance to acquire the lock. + while (vset->thread_creation_pending > 0); + } RedisModule_Assert(pthread_rwlock_wrlock(&vset->in_use_lock) == 0); pthread_rwlock_unlock(&vset->in_use_lock); } @@ -252,7 +304,7 @@ int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange /* Wait for clients in the background: background VSIM * operations touch the nodes attributes we are going * to touch. */ - vectorSetWaitAllBackgroundClients(o); + vectorSetWaitAllBackgroundClients(o,0); struct vsetNodeVal *nv = node->value; /* Pass NULL as value-free function. We want to reuse @@ -403,6 +455,11 @@ void *VADD_thread(void *arg) { float *vec = targ[3]; int ef = (uint64_t)targ[6]; + /* Lock the object and signal that we are no longer pending + * the lock acquisition. 
*/ + RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0); + vset->thread_creation_pending--; + /* Look for candidates... */ InsertContext *ic = hnsw_prepare_insert(vset->hnsw, vec, NULL, 0, 0, ef); targ[5] = ic; // Pass the context to the reply callback. @@ -680,10 +737,6 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { * way, or use a thread to do candidate neighbors selection and only * later, in the reply callback, actually add the element. */ if (cas) { - /* Make sure the key does not get deleted during the background - * operation. See VSIM implementation for more information. */ - RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0); - RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,VADD_CASReply,NULL,NULL,0); pthread_t tid; void **targ = RedisModule_Alloc(sizeof(void*)*8); @@ -698,8 +751,9 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModule_RetainString(ctx,val); if (attrib) RedisModule_RetainString(ctx,attrib); RedisModule_BlockedClientMeasureTimeStart(bc); + vset->thread_creation_pending++; if (pthread_create(&tid,NULL,VADD_thread,targ) != 0) { - pthread_rwlock_unlock(&vset->in_use_lock); + vset->thread_creation_pending--; RedisModule_AbortBlock(bc); RedisModule_Free(targ); RedisModule_FreeString(ctx,val); @@ -823,15 +877,20 @@ void *VSIM_thread(void *arg) { RedisModule_Free(targ[4]); RedisModule_Free(targ); + /* Lock the object and signal that we are no longer pending + * the lock acquisition. */ + RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0); + vset->thread_creation_pending--; + // Accumulate reply in a thread safe context: no contention. RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); // Run the query. VSIM_execute(ctx, vset, vec, count, epsilon, withscores, ef, filter_expr, filter_ef, ground_truth); + pthread_rwlock_unlock(&vset->in_use_lock); // Cleanup. 
RedisModule_FreeThreadSafeContext(ctx); - pthread_rwlock_unlock(&vset->in_use_lock); RedisModule_BlockedClientMeasureTimeEnd(bc); RedisModule_UnblockClient(bc,NULL); return NULL; @@ -1022,26 +1081,12 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { } if (threaded_request) { - /* Spawn the thread serving the request: - * Acquire the lock here so that the object will not be - * destroyed while we work with it in the thread. - * - * This lock should never block, since: - * 1. If we are in the main thread, the key exists (we looked it up) - * and so there is no deletion in progress. - * 2. If the write lock is taken while destroying the object, another - * command or operation (expire?) from the main thread acquired - * it to delete the object, so *it* will block if there are still - * operations in progress on this key. - * - * Note: even if we create one thread per request, the underlying + /* Note: even if we create one thread per request, the underlying * HNSW library has a fixed number of slots for the threads, as it's * defined in HNSW_MAX_THREADS (beware that if you increase it, * every node will use more memory). This means that while this request * is threaded, and will NOT block Redis, it may end waiting for a * free slot if all the HNSW_MAX_THREADS slots are used. 
*/ - RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0); - RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); pthread_t tid; void **targ = RedisModule_Alloc(sizeof(void*)*10); @@ -1057,8 +1102,9 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { targ[8] = (void*)(unsigned long)filter_ef; targ[9] = (void*)(unsigned long)ground_truth; RedisModule_BlockedClientMeasureTimeStart(bc); + vset->thread_creation_pending++; if (pthread_create(&tid,NULL,VSIM_thread,targ) != 0) { - pthread_rwlock_unlock(&vset->in_use_lock); + vset->thread_creation_pending--; RedisModule_AbortBlock(bc); RedisModule_Free(targ[4]); RedisModule_Free(targ); @@ -1259,7 +1305,7 @@ int VSETATTR_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int arg /* Background VSIM operations use the node attributes, so * wait for background operations before messing with them. */ - vectorSetWaitAllBackgroundClients(vset); + vectorSetWaitAllBackgroundClients(vset,0); /* Set or delete the attribute based on the fact it's an empty * string or not. */ @@ -1817,7 +1863,7 @@ size_t VectorSetMemUsage(const void *value) { void VectorSetFree(void *value) { struct vsetObject *vset = value; - vectorSetWaitAllBackgroundClients(vset); + vectorSetWaitAllBackgroundClients(vset,1); vectorSetReleaseObject(value); }