redis/tests/modules/blockonbackground.c
debing.sun 4581d43230
Fix daylight race condition and some thread leaks (#13191)
fix some issues that come from sanitizer thread report.

1. when the main thread is updating daylight_active, other threads (bio,
module thread) may be writing logs at the same time.
```
WARNING: ThreadSanitizer: data race (pid=661064)
  Read of size 4 at 0x55c9a4d11c70 by thread T2:
    #0 serverLogRaw /home/sundb/data/redis_fork/src/server.c:116 (redis-server+0x8d797) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #1 _serverLog.constprop.2 /home/sundb/data/redis_fork/src/server.c:146 (redis-server+0x2a3b14) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #2 bioProcessBackgroundJobs /home/sundb/data/redis_fork/src/bio.c:329 (redis-server+0x1c24ca) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)

  Previous write of size 4 at 0x55c9a4d11c70 by main thread (mutexes: write M0, write M1, write M2, write M3):
    #0 updateCachedTimeWithUs /home/sundb/data/redis_fork/src/server.c:1102 (redis-server+0x925e7) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #1 updateCachedTimeWithUs /home/sundb/data/redis_fork/src/server.c:1087 (redis-server+0x925e7)
    #2 updateCachedTime /home/sundb/data/redis_fork/src/server.c:1118 (redis-server+0x925e7)
    #3 afterSleep /home/sundb/data/redis_fork/src/server.c:1811 (redis-server+0x925e7)
    #4 aeProcessEvents /home/sundb/data/redis_fork/src/ae.c:389 (redis-server+0x85ae0) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #5 aeProcessEvents /home/sundb/data/redis_fork/src/ae.c:342 (redis-server+0x85ae0)
    #6 aeMain /home/sundb/data/redis_fork/src/ae.c:477 (redis-server+0x85ae0)
    #7 main /home/sundb/data/redis_fork/src/server.c:7211 (redis-server+0x7168c) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
```

2. thread leaks in module tests
```
WARNING: ThreadSanitizer: thread leak (pid=668683)
  Thread T13 (tid=670041, finished) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1036 (libtsan.so.2+0x3d179) (BuildId: 28a9f70061dbb2dfa2cef661d3b23aff4ea13536)
    #1 HelloBlockNoTracking_RedisCommand /home/sundb/data/redis_fork/tests/modules/blockonbackground.c:200 (blockonbackground.so+0x97fd) (BuildId: 9cd187906c57e88cdf896d121d1d96448b37a136)
    #2 HelloBlockNoTracking_RedisCommand /home/sundb/data/redis_fork/tests/modules/blockonbackground.c:169 (blockonbackground.so+0x97fd)
    #3 call /home/sundb/data/redis_fork/src/server.c:3546 (redis-server+0x9b7fb) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #4 processCommand /home/sundb/data/redis_fork/src/server.c:4176 (redis-server+0xa091c) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #5 processCommandAndResetClient /home/sundb/data/redis_fork/src/networking.c:2468 (redis-server+0xd2b8e) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #6 processInputBuffer /home/sundb/data/redis_fork/src/networking.c:2576 (redis-server+0xd2b8e)
    #7 readQueryFromClient /home/sundb/data/redis_fork/src/networking.c:2722 (redis-server+0xd358f) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #8 callHandler /home/sundb/data/redis_fork/src/connhelpers.h:58 (redis-server+0x288a7b) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #9 connSocketEventHandler /home/sundb/data/redis_fork/src/socket.c:277 (redis-server+0x288a7b)
    #10 aeProcessEvents /home/sundb/data/redis_fork/src/ae.c:417 (redis-server+0x85b45) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
    #11 aeProcessEvents /home/sundb/data/redis_fork/src/ae.c:342 (redis-server+0x85b45)
    #12 aeMain /home/sundb/data/redis_fork/src/ae.c:477 (redis-server+0x85b45)
    #13 main /home/sundb/data/redis_fork/src/server.c:7211 (redis-server+0x7168c) (BuildId: dca0b1945ba30010e36129bdb296e488dd2b32d0)
```
2024-04-04 13:49:51 +03:00

333 lines
13 KiB
C

#define _XOPEN_SOURCE 700
#include "redismodule.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>
#define UNUSED(x) (void)(x)
typedef struct {
/* Mutex for protecting RedisModule_BlockedClientMeasureTime*() API from race
* conditions due to timeout callback triggered in the main thread. */
pthread_mutex_t measuretime_mutex;
int measuretime_completed; /* Indicates that time measure has ended and will not continue further */
int myint; /* Used for replying */
} BlockPrivdata;
void blockClientPrivdataInit(RedisModuleBlockedClient *bc) {
BlockPrivdata *block_privdata = RedisModule_Calloc(1, sizeof(*block_privdata));
block_privdata->measuretime_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
RedisModule_BlockClientSetPrivateData(bc, block_privdata);
}
void blockClientMeasureTimeStart(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata) {
pthread_mutex_lock(&block_privdata->measuretime_mutex);
RedisModule_BlockedClientMeasureTimeStart(bc);
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
}
void blockClientMeasureTimeEnd(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata, int completed) {
pthread_mutex_lock(&block_privdata->measuretime_mutex);
if (!block_privdata->measuretime_completed) {
RedisModule_BlockedClientMeasureTimeEnd(bc);
if (completed) block_privdata->measuretime_completed = 1;
}
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
}
/* Reply callback for blocking command BLOCK.DEBUG */
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithLongLong(ctx,block_privdata->myint);
}
/* Timeout callback for blocking command BLOCK.DEBUG */
int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx);
blockClientMeasureTimeEnd(bc, block_privdata, 1);
return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
}
/* Private data freeing callback for BLOCK.DEBUG command. */
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
UNUSED(ctx);
BlockPrivdata *block_privdata = privdata;
pthread_mutex_destroy(&block_privdata->measuretime_mutex);
RedisModule_Free(privdata);
}
/* Private data freeing callback for BLOCK.BLOCK command. */
void HelloBlock_FreeStringData(RedisModuleCtx *ctx, void *privdata) {
RedisModule_FreeString(ctx, (RedisModuleString*)privdata);
}
/* The thread entry point that actually executes the blocking part
* of the command BLOCK.DEBUG. */
void *BlockDebug_ThreadMain(void *arg) {
void **targ = arg;
RedisModuleBlockedClient *bc = targ[0];
long long delay = (unsigned long)targ[1];
long long enable_time_track = (unsigned long)targ[2];
BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc);
if (enable_time_track)
blockClientMeasureTimeStart(bc, block_privdata);
RedisModule_Free(targ);
struct timespec ts;
ts.tv_sec = delay / 1000;
ts.tv_nsec = (delay % 1000) * 1000000;
nanosleep(&ts, NULL);
if (enable_time_track)
blockClientMeasureTimeEnd(bc, block_privdata, 0);
block_privdata->myint = rand();
RedisModule_UnblockClient(bc,block_privdata);
return NULL;
}
/* The thread entry point that actually executes the blocking part
* of the command BLOCK.DOUBLE_DEBUG. */
void *DoubleBlock_ThreadMain(void *arg) {
void **targ = arg;
RedisModuleBlockedClient *bc = targ[0];
long long delay = (unsigned long)targ[1];
BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc);
blockClientMeasureTimeStart(bc, block_privdata);
RedisModule_Free(targ);
struct timespec ts;
ts.tv_sec = delay / 1000;
ts.tv_nsec = (delay % 1000) * 1000000;
nanosleep(&ts, NULL);
blockClientMeasureTimeEnd(bc, block_privdata, 0);
/* call again RedisModule_BlockedClientMeasureTimeStart() and
* RedisModule_BlockedClientMeasureTimeEnd and ensure that the
* total execution time is 2x the delay. */
blockClientMeasureTimeStart(bc, block_privdata);
nanosleep(&ts, NULL);
blockClientMeasureTimeEnd(bc, block_privdata, 0);
block_privdata->myint = rand();
RedisModule_UnblockClient(bc,block_privdata);
return NULL;
}
void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
(void*)bc);
}
/* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
* a random number. Timeout is the command timeout, so that you can test
* what happens when the delay is greater than the timeout. */
int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 3) return RedisModule_WrongArity(ctx);
long long delay;
long long timeout;
if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
}
if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
}
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
blockClientPrivdataInit(bc);
/* Here we set a disconnection handler, however since this module will
* block in sleep() in a thread, there is not much we can do in the
* callback, so this is just to show you the API. */
RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
/* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread:
* the delay and a reference to the blocked client handle. */
void **targ = RedisModule_Alloc(sizeof(void*)*3);
targ[0] = bc;
targ[1] = (void*)(unsigned long) delay;
// pass 1 as flag to enable time tracking
targ[2] = (void*)(unsigned long) 1;
if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
RedisModule_AbortBlock(bc);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
}
pthread_detach(tid);
return REDISMODULE_OK;
}
/* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
* a random number. Timeout is the command timeout, so that you can test
* what happens when the delay is greater than the timeout.
* this command does not track background time so the background time should no appear in stats*/
int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 3) return RedisModule_WrongArity(ctx);
long long delay;
long long timeout;
if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
}
if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
}
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
blockClientPrivdataInit(bc);
/* Here we set a disconnection handler, however since this module will
* block in sleep() in a thread, there is not much we can do in the
* callback, so this is just to show you the API. */
RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
/* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread:
* the delay and a reference to the blocked client handle. */
void **targ = RedisModule_Alloc(sizeof(void*)*3);
targ[0] = bc;
targ[1] = (void*)(unsigned long) delay;
// pass 0 as flag to enable time tracking
targ[2] = (void*)(unsigned long) 0;
if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
RedisModule_AbortBlock(bc);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
}
pthread_detach(tid);
return REDISMODULE_OK;
}
/* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
* then reply with a random number.
* This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart()
* and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) return RedisModule_WrongArity(ctx);
long long delay;
if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
}
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
blockClientPrivdataInit(bc);
/* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread:
* the delay and a reference to the blocked client handle. */
void **targ = RedisModule_Alloc(sizeof(void*)*2);
targ[0] = bc;
targ[1] = (void*)(unsigned long) delay;
if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
RedisModule_AbortBlock(bc);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
}
pthread_detach(tid);
return REDISMODULE_OK;
}
RedisModuleBlockedClient *blocked_client = NULL;
/* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released
* or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is
* registered.
*/
int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (RedisModule_IsBlockedReplyRequest(ctx)) {
RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithString(ctx, r);
} else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {
RedisModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */
blocked_client = NULL;
return RedisModule_ReplyWithSimpleString(ctx, "Timed out");
}
if (argc != 2) return RedisModule_WrongArity(ctx);
long long timeout;
if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx, "ERR invalid timeout");
}
if (blocked_client) {
return RedisModule_ReplyWithError(ctx, "ERR another client already blocked");
}
/* Block client. We use this function as both a reply and optional timeout
* callback and differentiate the different code flows above.
*/
blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand,
timeout > 0 ? Block_RedisCommand : NULL, HelloBlock_FreeStringData, timeout);
return REDISMODULE_OK;
}
/* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.
*/
int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);
return REDISMODULE_OK;
}
/* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.
*/
int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) return RedisModule_WrongArity(ctx);
if (!blocked_client) {
return RedisModule_ReplyWithError(ctx, "ERR No blocked client");
}
RedisModuleString *replystr = argv[1];
RedisModule_RetainString(ctx, replystr);
RedisModule_UnblockClient(blocked_client, replystr);
blocked_client = NULL;
RedisModule_ReplyWithSimpleString(ctx, "OK");
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"block.debug",
HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"block.double_debug",
HelloDoubleBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"block.debug_no_track",
HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "block.block",
Block_RedisCommand, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"block.is_blocked",
IsBlocked_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"block.release",
Release_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}