diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml
index 8a03854a0..0e478bb77 100644
--- a/.github/workflows/daily.yml
+++ b/.github/workflows/daily.yml
@@ -322,7 +322,7 @@ jobs:
         run: sudo apt-get install tcl8.6 tclx
       - name: test
         if: true && !contains(github.event.inputs.skiptests, 'redis')
-        run: ./runtest --config io-threads 4 --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
+        run: ./runtest --config io-threads 4 --accurate --verbose --tags "network iothreads psync2 repl failover" --dump-logs ${{github.event.inputs.test_args}}
       - name: cluster tests
         if: true && !contains(github.event.inputs.skiptests, 'cluster')
         run: ./runtest-cluster --config io-threads 4 ${{github.event.inputs.cluster_test_args}}
@@ -433,7 +433,10 @@ jobs:
           sudo apt-get install tcl8.6 tclx valgrind g++ -y
       - name: test
         if: true && !contains(github.event.inputs.skiptests, 'redis')
-        run: ./runtest --valgrind --no-latency --verbose --clients 1 --timeout 2400 --dump-logs ${{github.event.inputs.test_args}}
+        # Note that valgrind's overhead doesn't pair well with io-threads, so we
+        # explicitly disable tests tagged with 'iothreads' - these are tests that
+        # always run with io-threads enabled.
+        run: ./runtest --valgrind --no-latency --verbose --clients 1 --timeout 2400 --tags -iothreads --dump-logs ${{github.event.inputs.test_args}}

   test-valgrind-misc:
     runs-on: ubuntu-latest
@@ -495,7 +498,7 @@ jobs:
           sudo apt-get install tcl8.6 tclx valgrind g++ -y
       - name: test
         if: true && !contains(github.event.inputs.skiptests, 'redis')
-        run: ./runtest --valgrind --no-latency --verbose --clients 1 --timeout 2400 --dump-logs ${{github.event.inputs.test_args}}
+        run: ./runtest --valgrind --tags -iothreads --no-latency --verbose --clients 1 --timeout 2400 --dump-logs ${{github.event.inputs.test_args}}

   test-valgrind-no-malloc-usable-size-misc:
     runs-on: ubuntu-latest
@@ -580,7 +583,7 @@ jobs:
         !contains(github.event.inputs.skipjobs, 'sanitizer')
     timeout-minutes: 14400
     env:
-      CC: clang # MSan work only with clang
+      CC: clang # MSan works only with clang
     steps:
     - name: prep
       if: github.event_name == 'workflow_dispatch'
@@ -665,6 +668,7 @@ jobs:
         !contains(github.event.inputs.skipjobs, 'sanitizer')
     timeout-minutes: 14400
     strategy:
+      fail-fast: false # let gcc and clang both run until the end even if one of them fails
      matrix:
        compiler: [ gcc, clang ]
     env:
@@ -696,7 +700,7 @@ jobs:
         run: ./runtest --tsan --clients 1 --config io-threads 4 --accurate --verbose --dump-logs ${{github.event.inputs.test_args}}
       - name: sentinel tests
         if: true && !contains(github.event.inputs.skiptests, 'sentinel')
-        run: ./runtest-sentinel --config io-threads 2 ${{github.event.inputs.cluster_test_args}}
+        run: ./runtest-sentinel --tsan ${{github.event.inputs.cluster_test_args}}
       - name: cluster tests
         if: true && !contains(github.event.inputs.skiptests, 'cluster')
         run: ./runtest-cluster --config io-threads 2 ${{github.event.inputs.cluster_test_args}}
diff --git a/src/iothread.c b/src/iothread.c
index b04256f26..54a9cae1e 100644
--- a/src/iothread.c
+++ b/src/iothread.c
@@ -42,6 +42,67 @@ static inline void sendPendingClientsToMainThreadIfNeeded(IOThread *t, int check) {
     }
 }

+/* When moving a client from an IO thread to the main thread we may need to
+ * update some of its variables, as they are duplicated to avoid contention
+ * with the main thread.
+ * For now this is valid only for master or slave clients.
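+ *
+ * As a rough sketch (not an exhaustive list), the io_* shadow fields merged
+ * back into their canonical counterparts below are:
+ *
+ *   io_repl_ack_time   -> repl_ack_time    (replica clients)
+ *   io_lastinteraction -> lastinteraction  (master clients)
+ *   io_read_reploff    -> read_reploff     (master clients)
+ *
+ * While the IO thread owns the client it writes only the io_* copy, so the
+ * main thread never races with it on the canonical field.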
+ */
+void updateClientDataFromIOThread(client *c) {
+    if (!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_SLAVE)) return;
+
+    serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
+                 c->running_tid == IOTHREAD_MAIN_THREAD_ID);
+
+    if (c->io_repl_ack_time > c->repl_ack_time) {
+        serverAssert(c->flags & CLIENT_SLAVE);
+        c->repl_ack_time = c->io_repl_ack_time;
+    }
+    if (c->io_lastinteraction > c->lastinteraction) {
+        serverAssert(c->flags & CLIENT_MASTER);
+        c->lastinteraction = c->io_lastinteraction;
+    }
+    if (c->io_read_reploff > c->read_reploff) {
+        serverAssert(c->flags & CLIENT_MASTER);
+        c->read_reploff = c->io_read_reploff;
+    }
+
+    /* Update the referenced replication buffer node if the IO thread has sent
+     * some data. */
+    if (c->flags & CLIENT_SLAVE && c->ref_repl_buf_node != NULL &&
+        (c->io_curr_repl_node != c->ref_repl_buf_node ||
+         c->io_curr_block_pos != c->ref_block_pos))
+    {
+        ((replBufBlock*)listNodeValue(c->ref_repl_buf_node))->refcount--;
+        ((replBufBlock*)listNodeValue(c->io_curr_repl_node))->refcount++;
+        c->ref_block_pos = c->io_curr_block_pos;
+        c->ref_repl_buf_node = c->io_curr_repl_node;
+        incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+    }
+}
+
+/* Check to see if the client needs any cron jobs run for it. Return 1 if the
+ * client should be terminated. */
+int runClientCronFromIOThread(client *c) {
+    if (c->flags & CLIENT_MASTER &&
+        c->io_last_repl_cron + 1000 <= server.mstime)
+    {
+        c->io_last_repl_cron = server.mstime;
+        if (replicationCronRunMasterClient()) return 1;
+    }
+
+    /* Run the client cron task once per second, or when the client is marked
+     * as pending cron. */
+    if (c->io_last_client_cron + 1000 <= server.mstime ||
+        c->io_flags & CLIENT_IO_PENDING_CRON)
+    {
+        c->io_last_client_cron = server.mstime;
+        if (clientsCronRunClient(c)) return 1;
+    } else {
+        /* Update the client's memory usage if clientsCronRunClient is not
+         * being called, since that function already performs the update. */
+        updateClientMemUsageAndBucket(c);
+    }
+
+    return 0;
+}
+
 /* When IO threads read a complete query of clients or want to free clients, it
  * should remove it from its clients list and put the client in the list to main
  * thread, we will send these clients to main thread in IOThreadBeforeSleep. */
@@ -66,6 +127,32 @@ void enqueuePendingClientsToMainThread(client *c, int unbind) {
     }
 }

+void enqueuePendingClienstToIOThreads(client *c) {
+    serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
+                 c->running_tid == IOTHREAD_MAIN_THREAD_ID);
+
+    if (c->flags & CLIENT_PENDING_WRITE) {
+        c->flags &= ~CLIENT_PENDING_WRITE;
+        listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
+    }
+    if (c->flags & CLIENT_SLAVE) {
+        serverAssert(c->ref_repl_buf_node != NULL);
+
+        c->io_repl_ack_time = c->repl_ack_time;
+        c->io_curr_repl_node = c->ref_repl_buf_node;
+        c->io_curr_block_pos = c->ref_block_pos;
+        c->io_bound_repl_node = listLast(server.repl_buffer_blocks);
+        c->io_bound_block_pos = ((replBufBlock*)listNodeValue(c->io_bound_repl_node))->used;
+    }
+    if (c->flags & CLIENT_MASTER) {
+        c->io_read_reploff = c->read_reploff;
+        c->io_lastinteraction = c->lastinteraction;
+    }
+
+    c->running_tid = c->tid;
+    listAddNodeHead(mainThreadPendingClientsToIOThreads[c->tid], c);
+}
+
 /* Unbind connection of client from io thread event loop, write and read handlers
  * also be removed, ensures that we can operate the client safely. */
 void unbindClientFromIOThreadEventLoop(client *c) {
@@ -88,11 +175,12 @@ void keepClientInMainThread(client *c) {
     server.io_threads_clients_num[c->tid]--;
     /* Unbind connection of client from io thread event loop. */
     unbindClientFromIOThreadEventLoop(c);
+    /* Update the client's data in case it was just fetched from an IO thread. */
+    updateClientDataFromIOThread(c);
     /* Let main thread to run it, rebind event loop and read handler */
     connRebindEventLoop(c->conn, server.el);
     connSetReadHandler(c->conn, readQueryFromClient);
     c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
-    c->running_tid = IOTHREAD_MAIN_THREAD_ID;
     c->tid = IOTHREAD_MAIN_THREAD_ID;
     freeClientDeferredObjects(c, 1); /* Free deferred objects. */
     freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */
@@ -132,22 +220,26 @@ void fetchClientFromIOThread(client *c) {
     /* Unbind connection of client from io thread event loop. */
     connUnbindEventLoop(c->conn);
     /* Now main thread can process it. */
-    c->running_tid = IOTHREAD_MAIN_THREAD_ID;
     resumeIOThread(c->tid);
-    freeClientDeferredObjects(c, 1); /* Free deferred objects. */
-    freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */
-    tryUnlinkClientFromPendingRefReply(c, 0);
+
+    /* Keep the client in main thread. */
+    c->running_tid = IOTHREAD_MAIN_THREAD_ID;
+    keepClientInMainThread(c);
 }

 /* For some clients, we must handle them in the main thread, since there is
  * data race to be processed in IO threads.
  *
  * - Close ASAP, we must free the client in main thread.
- * - Replica, pubsub, monitor, blocked, tracking clients, main thread may
+ * - Pubsub, monitor, blocked, tracking clients, main thread may
  *   directly write them a reply when conditions are met.
- * - Script command with debug may operate connection directly. */
+ * - Script command with debug may operate connection directly.
+ * - Master/Replica clients are handled by an IO thread only once RDB
+ *   replication has completed. Note that we must check them after checking
+ *   the other flags that may overlap with CLIENT_MASTER/SLAVE - CLOSE_ASAP,
+ *   MONITOR, (UN)BLOCKED, TRACKING. */
 int isClientMustHandledByMainThread(client *c) {
-    if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_MASTER | CLIENT_SLAVE |
+    if (c->flags & (CLIENT_CLOSE_ASAP |
                     CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_BLOCKED |
                     CLIENT_UNBLOCKED | CLIENT_TRACKING | CLIENT_LUA_DEBUG |
                     CLIENT_LUA_DEBUG_SYNC | CLIENT_ASM_MIGRATING |
@@ -155,6 +247,34 @@ int isClientMustHandledByMainThread(client *c) {
    {
        return 1;
    }
+
+    /* If RDB replication is done it's safe to move the master client to an IO
+     * thread. Note that we keep the master client in the main thread during a
+     * failover, so as not to slow down the failover process by waiting for the
+     * master replication cron in the IO thread. */
+    if (c->flags & CLIENT_MASTER &&
+        server.repl_state == REPL_STATE_CONNECTED &&
+        server.repl_rdb_ch_state == REPL_RDB_CH_STATE_NONE &&
+        server.failover_state == NO_FAILOVER)
+    {
+        return 0;
+    }
+
+    /* If RDB replication is done for this slave it's safe to move it to an IO
+     * thread. Note that we also check that ref_repl_buf_node is initialized,
+     * in order to prevent race conditions with the main thread when it feeds
+     * the replication buffer.
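+     * (A sketch of the reasoning: a NULL ref_repl_buf_node means the main
+     * thread has not yet attached this replica to the shared replication
+     * buffer, so the io_curr_*/io_bound_* cursors could not be initialized
+     * safely in enqueuePendingClienstToIOThreads.)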
+     */
+    if (c->flags & CLIENT_SLAVE &&
+        (c->replstate == SLAVE_STATE_ONLINE ||
+         c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
+        c->repl_start_cmd_stream_on_ack == 0 &&
+        c->ref_repl_buf_node != NULL)
+    {
+        return 0;
+    }
+
+    if (c->flags & (CLIENT_MASTER | CLIENT_SLAVE)) return 1;
+
     return 0;
 }

@@ -175,7 +295,6 @@ void assignClientToIOThread(client *c) {
     /* Assign the client to the IO thread. */
     server.io_threads_clients_num[c->tid]--;
     c->tid = min_id;
-    c->running_tid = min_id;
     server.io_threads_clients_num[min_id]++;

     /* The client running in IO thread needs to have deferred objects array. */
@@ -186,7 +305,8 @@ void assignClientToIOThread(client *c) {
      * to IO thread in beforeSleep. */
     connUnbindEventLoop(c->conn);
     c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
-    listAddNodeTail(mainThreadPendingClientsToIOThreads[c->tid], c);
+
+    enqueuePendingClienstToIOThreads(c);
 }

 /* If updating maxclients config, we not only resize the event loop of main thread
@@ -480,17 +600,12 @@ int processClientsFromIOThread(IOThread *t) {
             continue;
         }

-        /* Run cron task for the client per second or it is marked as pending cron. */
-        if (c->last_cron_check_time + 1000 <= server.mstime ||
-            c->io_flags & CLIENT_IO_PENDING_CRON)
-        {
-            c->last_cron_check_time = server.mstime;
-            if (clientsCronRunClient(c)) continue;
-        } else {
-            /* Update the client in the mem usage if clientsCronRunClient is not
-             * being called, since that function already performs the update. */
-            updateClientMemUsageAndBucket(c);
-        }
+        /* Update some of the client's members while we are in the main thread,
+         * so that we avoid data races. */
+        updateClientDataFromIOThread(c);
+
+        /* Check if we need to run a cron job for the client. */
+        if (runClientCronFromIOThread(c)) continue;

         /* Process the pending command and input buffer. */
         if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
@@ -515,6 +630,10 @@ int processClientsFromIOThread(IOThread *t) {
             continue;
         }

+        /* Replica clients are handled in putReplicasInPendingClientsToIOThreads
+         * in beforeSleep. */
+        if (c->flags & CLIENT_SLAVE) continue;
+
         /* Remove this client from pending write clients queue of main thread,
          * And some clients may do not have reply if CLIENT REPLY OFF/SKIP. */
         if (c->flags & CLIENT_PENDING_WRITE) {
@@ -524,7 +643,7 @@ int processClientsFromIOThread(IOThread *t) {
         c->running_tid = c->tid;
         listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node);
         node = NULL;
-
+        /* If there are several clients to process, let io thread handle them ASAP. */
         sendPendingClientsToIOThreadIfNeeded(t, 1);
     }
diff --git a/src/networking.c b/src/networking.c
index ff43728cf..62d6caf64 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -157,6 +157,10 @@ client *createClient(connection *conn) {
     c->last_header = NULL;
     c->ref_repl_buf_node = NULL;
     c->ref_block_pos = 0;
+    c->io_curr_repl_node = NULL;
+    c->io_curr_block_pos = 0;
+    c->io_bound_repl_node = NULL;
+    c->io_bound_block_pos = 0;
     c->qb_pos = 0;
     c->querybuf = NULL;
     c->querybuf_peak = 0;
@@ -186,6 +190,7 @@ client *createClient(connection *conn) {
     c->slot = -1;
     c->cluster_compatibility_check_slot = -2;
     c->ctime = c->lastinteraction = server.unixtime;
+    c->io_lastinteraction = 0;
     c->duration = 0;
     clientSetDefaultAuth(c);
     c->replstate = REPL_STATE_NONE;
@@ -193,9 +198,11 @@ client *createClient(connection *conn) {
     c->reploff = 0;
     c->reploff_next = 0;
     c->read_reploff = 0;
+    c->io_read_reploff = 0;
     c->repl_applied = 0;
     c->repl_ack_off = 0;
     c->repl_ack_time = 0;
+    c->io_repl_ack_time = 0;
     c->repl_aof_off = 0;
     c->repl_last_partial_write = 0;
     c->slave_listening_port = 0;
@@ -222,7 +229,8 @@ client *createClient(connection *conn) {
     c->postponed_list_node = NULL;
     c->client_tracking_redirection = 0;
     c->client_tracking_prefixes = NULL;
-    c->last_cron_check_time = 0;
+    c->io_last_client_cron = 0;
+    c->io_last_repl_cron = 0;
     c->last_memory_usage = 0;
     c->last_memory_type = CLIENT_TYPE_NORMAL;
     c->module_blocked_client = NULL;
@@ -243,6 +251,7 @@ client *createClient(connection *conn) {
     c->commands_processed = 0;
     c->task = NULL;
     c->node_id = NULL;
+    atomicSet(c->pending_read, 0);
     return c;
 }

@@ -317,7 +326,7 @@ static inline int _prepareClientToWrite(client *c) {
      * If the client runs in an IO thread, we should not put the client in the
      * pending write queue. Instead, we will install the write handler to the
      * corresponding IO thread’s event loop and let it handle the reply. */
-    if (!clientHasPendingReplies(c) && likely(c->running_tid == IOTHREAD_MAIN_THREAD_ID))
+    if (likely(c->running_tid == IOTHREAD_MAIN_THREAD_ID) && !clientHasPendingReplies(c))
         putClientInPendingWriteQueue(c);

     /* Authorize the caller to queue in the output buffer of this client. */
@@ -1459,8 +1468,9 @@ void deferredAfterErrorReply(client *c, list *errors) {
 /* Logically copy 'src' replica client buffers info to 'dst' replica.
  * Basically increase referenced buffer block node reference count. */
 void copyReplicaOutputBuffer(client *dst, client *src) {
-    serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
-
+    serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
+    serverAssert(src->running_tid == IOTHREAD_MAIN_THREAD_ID &&
+                 dst->running_tid == IOTHREAD_MAIN_THREAD_ID);
     if (src->ref_repl_buf_node == NULL) return;
     dst->ref_repl_buf_node = src->ref_repl_buf_node;
     dst->ref_block_pos = src->ref_block_pos;
@@ -1479,10 +1489,15 @@ static inline int _clientHasPendingRepliesSlave(client *c) {

     /* If the last replication buffer block content is totally sent,
      * we have nothing to send.
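+     * For a replica currently running in an IO thread we instead compare the
+     * io_curr_* cursor with the io_bound_* snapshot taken at handoff, since
+     * server.repl_buffer_blocks may only be inspected by the main thread.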
      */
-    listNode *ln = listLast(server.repl_buffer_blocks);
-    replBufBlock *tail = listNodeValue(ln);
-    if (ln == c->ref_repl_buf_node &&
-        c->ref_block_pos == tail->used) return 0;
+    if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) {
+        listNode *ln = listLast(server.repl_buffer_blocks);
+        replBufBlock *tail = listNodeValue(ln);
+        if (ln == c->ref_repl_buf_node &&
+            c->ref_block_pos == tail->used) return 0;
+    } else {
+        if (c->io_bound_repl_node == c->io_curr_repl_node &&
+            c->io_bound_block_pos == c->io_curr_block_pos) return 0;
+    }
     return 1;
 }
@@ -2036,7 +2051,7 @@ void freeClient(client *c) {

     /* We need to unbind connection of client from io thread event loop first. */
     if (c->tid != IOTHREAD_MAIN_THREAD_ID) {
-        unbindClientFromIOThreadEventLoop(c);
+        keepClientInMainThread(c);
     }

     /* Update the number of clients in the IO thread. */
@@ -2076,6 +2091,7 @@ void freeClient(client *c) {
         serverLog(LL_NOTICE,"Connection with master lost.");
         if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
             c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
+            c->io_flags &= ~CLIENT_IO_CLOSE_ASAP;
             replicationCacheMaster(c);
             return;
         }
@@ -2591,6 +2607,26 @@ static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten) {
 static inline int _writeToClientSlave(client *c, ssize_t *nwritten) {
     *nwritten = 0;
     serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
+
+    if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
+        replBufBlock *o = listNodeValue(c->io_curr_repl_node);
+        /* The IO thread must not send data beyond the bound position. */
+        size_t pos = c->io_curr_repl_node == c->io_bound_repl_node ?
+                     c->io_bound_block_pos : o->used;
+        if (pos > c->io_curr_block_pos) {
+            *nwritten = connWrite(c->conn, o->buf+c->io_curr_block_pos,
+                                  pos-c->io_curr_block_pos);
+            if (*nwritten <= 0) return C_ERR;
+            c->io_curr_block_pos += *nwritten;
+        }
+        /* If we fully sent the object and there are more nodes to send, go to the next one. */
+        if (c->io_curr_block_pos == pos && c->io_curr_repl_node != c->io_bound_repl_node) {
+            c->io_curr_repl_node = listNextNode(c->io_curr_repl_node);
+            c->io_curr_block_pos = 0;
+        }
+        return C_OK;
+    }
+
     replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
     serverAssert(o->used >= c->ref_block_pos);
     /* Send current block if it is not fully sent. */
@@ -2704,6 +2740,14 @@ int writeToClient(client *c, int handler_installed) {
         /* Remove client from pending referenced reply clients list. */
         if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
             tryUnlinkClientFromPendingRefReply(c, 1);
+
+        /* If a replica client has sent all the replication data it knows
+         * about, we send it to the main thread so it can pick up new repl
+         * data ASAP. Note that we keep it in the IO thread in case we have a
+         * pending ACK read. */
+        if (c->flags & CLIENT_SLAVE && c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
+            if (!replicaFromIOThreadHasPendingRead(c))
+                enqueuePendingClientsToMainThread(c, 0);
+        }
     }
     /* Update client's memory usage after writing.
      * Since this isn't thread safe we do this conditionally. */
@@ -2731,6 +2775,11 @@ int handleClientsWithPendingWrites(void) {
     listRewind(server.clients_pending_write,&li);
     while((ln = listNext(&li))) {
         client *c = listNodeValue(ln);
+
+        /* We handle IO thread replicas in putReplicasInPendingClientsToIOThreads. */
+        if (c->flags & CLIENT_SLAVE && c->tid != IOTHREAD_MAIN_THREAD_ID)
+            continue;
+
         c->flags &= ~CLIENT_PENDING_WRITE;
         listUnlinkNode(server.clients_pending_write,ln);

@@ -2744,6 +2793,7 @@ int handleClientsWithPendingWrites(void) {
         /* Let IO thread handle the client if possible. */
         if (server.io_threads_num > 1 &&
             !(c->flags & CLIENT_CLOSE_AFTER_REPLY) &&
+            c->tid == IOTHREAD_MAIN_THREAD_ID &&
             !isClientMustHandledByMainThread(c))
         {
            assignClientToIOThread(c);
@@ -2793,7 +2843,8 @@ static inline void resetClientInternal(client *c, int num_pcmds_to_free) {
     c->cur_script = NULL;
     c->slot = -1;
     c->cluster_compatibility_check_slot = -2;
-    c->flags &= ~CLIENT_EXECUTING_COMMAND;
+    if (c->flags & CLIENT_EXECUTING_COMMAND)
+        c->flags &= ~CLIENT_EXECUTING_COMMAND;

     /* Make sure the duration has been recorded to some command. */
     serverAssert(c->duration == 0);
@@ -2807,19 +2858,27 @@ static inline void resetClientInternal(client *c, int num_pcmds_to_free) {

     /* We clear the ASKING flag as well if we are not inside a MULTI, and
      * if what we just executed is not the ASKING command itself. */
-    if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
+    if (c->flags & CLIENT_ASKING && !(c->flags & CLIENT_MULTI) &&
+        prevcmd != askingCommand)
+    {
         c->flags &= ~CLIENT_ASKING;
+    }

     /* We do the same for the CACHING command as well. It also affects
      * the next command or transaction executed, in a way very similar
      * to ASKING. */
-    if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
+    if (c->flags & CLIENT_TRACKING_CACHING && !(c->flags & CLIENT_MULTI) &&
+        prevcmd != clientCommand)
+    {
         c->flags &= ~CLIENT_TRACKING_CACHING;
+    }

     /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
      * to the next command will be sent, but set the flag if the command
      * we just processed was "CLIENT REPLY SKIP". */
-    c->flags &= ~CLIENT_REPLY_SKIP;
+    if (c->flags & CLIENT_REPLY_SKIP)
+        c->flags &= ~CLIENT_REPLY_SKIP;
+
     if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
         c->flags |= CLIENT_REPLY_SKIP;
         c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
@@ -2908,8 +2967,18 @@ int processInlineBuffer(client *c, pendingCommand *pcmd) {
     /* Newline from slaves can be used to refresh the last ACK time.
      * This is useful for a slave to ping back while loading a big
      * RDB file. */
-    if (querylen == 0 && clientTypeIsSlave(c))
-        c->repl_ack_time = server.unixtime;
+    if (querylen == 0 && clientTypeIsSlave(c)) {
+        if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
+            c->repl_ack_time = server.unixtime;
+        else
+            /* If this is a replica client running in an IO thread, we cache
+             * the last ack time in a different member variable in order to
+             * avoid contention with the main thread, e.g. see
+             * refreshGoodSlavesCount().
+             * Note that c->repl_ack_time will still be updated in
+             * updateClientDataFromIOThread with the value of
+             * c->io_repl_ack_time when the client moves from IO to main
+             * thread. */
+            c->io_repl_ack_time = server.unixtime;
+    }

     /* Masters should never send us inline protocol to run actual
      * commands. If this happens, it is likely due to a bug in Redis where
@@ -3505,7 +3574,11 @@ int processInputBuffer(client *c) {
         if (unlikely(pcmd->read_error || (pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE)))
             break;

-        pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
+        if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
+            pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
+        else
+            pcmd->reploff = c->io_read_reploff - sdslen(c->querybuf) + c->qb_pos;
+
         preprocessCommand(c, pcmd);
         pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED;
         resetClientQbufState(c);
@@ -3611,7 +3684,14 @@ void readQueryFromClient(connection *conn) {
     client *c = connGetPrivateData(conn);
     int nread, big_arg = 0;
     size_t qblen, readlen;
-    if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) return;
+
+    if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) {
+        atomicSetWithSync(c->pending_read, 1);
+        return;
+    } else if (server.io_threads_num > 1) {
+        atomicSetWithSync(c->pending_read, 0);
+    }
+
     c->read_error = 0;

     /* Update the number of reads of io threads on server */
@@ -3702,9 +3782,21 @@ void readQueryFromClient(connection *conn) {
     qblen = sdslen(c->querybuf);
     if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
-    c->lastinteraction = server.unixtime;
+    if (!(c->flags & CLIENT_MASTER) || c->running_tid == IOTHREAD_MAIN_THREAD_ID)
+        c->lastinteraction = server.unixtime;
+    else
+        /* Avoid contention with genRedisInfoString, as it can access the
+         * master client's data. If this is a master running in an IO thread,
+         * c->lastinteraction will be updated during processClientsFromIOThread. */
+        c->io_lastinteraction = server.unixtime;
+
     if (c->flags & CLIENT_MASTER) {
-        c->read_reploff += nread;
+        if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) {
+            c->read_reploff += nread;
+        } else {
+            /* Same comment as for c->io_lastinteraction. */
+            c->io_read_reploff += nread;
+        }
         atomicIncr(server.stat_net_repl_input_bytes, nread);
     } else {
         atomicIncr(server.stat_net_input_bytes, nread);
@@ -5127,6 +5219,16 @@ void flushSlavesOutputBuffers(void) {
     listRewind(server.slaves,&li);
     while((ln = listNext(&li))) {
         client *slave = listNodeValue(ln);
+
+        /* Fetch the replica clients that are currently running in IO threads. */
+        if (slave->running_tid != IOTHREAD_MAIN_THREAD_ID) {
+            fetchClientFromIOThread(slave);
+            /* If the slave doesn't have any pending replies there is nothing
+             * to do anyway. */
+            if (!clientHasPendingReplies(slave)) continue;
+            putClientInPendingWriteQueue(slave);
+        }
+
         int can_receive_writes = connHasWriteHandler(slave->conn) ||
                                  (slave->flags & CLIENT_PENDING_WRITE);
diff --git a/src/replication.c b/src/replication.c
index d4ea0aa6d..309d6c4f7 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -98,6 +98,86 @@ unsigned long replicationLogicalReplicaCount(void) {
     return count;
 }

+int replicaFromIOThreadHasPendingRead(client *c) {
+    serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID);
+
+    int pending_read;
+    atomicGetWithSync(c->pending_read, pending_read);
+    return pending_read;
+}
+
+/* Send replicas to their respective IO threads if they have pending reads or
+ * writes. Otherwise they remain in the main thread so they can check for new
+ * data in the replication buffer ASAP.
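+ *
+ * In short, for every replica whose assigned thread is an IO thread but which
+ * is currently parked in the main thread, the handoff condition below boils
+ * down to:
+ *
+ *   pending write || pending replies || pending read
+ *       -> enqueuePendingClienstToIOThreads(replica)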
+ */
+void putReplicasInPendingClientsToIOThreads(void) {
+    if (server.io_threads_num <= 1) return;
+
+    serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
+
+    listIter li;
+    listNode *ln;
+    listRewind(server.slaves,&li);
+    while((ln = listNext(&li))) {
+        client *replica = listNodeValue(ln);
+
+        /* We only care about replicas that need to run on an IO thread but
+         * are currently in the main thread. */
+        if (replica->tid == IOTHREAD_MAIN_THREAD_ID ||
+            replica->running_tid != IOTHREAD_MAIN_THREAD_ID)
+        {
+            continue;
+        }
+
+        /* Skip the replica if it's scheduled for close. */
+        if (replica->flags & CLIENT_CLOSE_ASAP) continue;

+        /* The call to clientHasPendingReplies may seem redundant, but when a
+         * replica runs in an IO thread we can have the following case: the
+         * replica gets back to the main thread after sending the repl buffer
+         * it knows about, while in the meantime the main thread has
+         * accumulated new repl data. In that case the replica's client
+         * wouldn't have been put in the pending write queue but will still
+         * have new repl data to send, so we make sure to check for that and
+         * send it back to the IO thread if so. On the other hand, if the
+         * replica gets back to the main thread before any new repl data has
+         * accumulated, then after a new command is propagated the replica
+         * will be put in the pending write queue as usual, so we need to
+         * check for that as well.
+         * In addition, if the replica client has pending read events, we
+         * should also send it to the IO thread. */
+        if (replica->flags & CLIENT_PENDING_WRITE ||
+            clientHasPendingReplies(replica) ||
+            replicaFromIOThreadHasPendingRead(replica))
+        {
+            enqueuePendingClienstToIOThreads(replica);
+        }
+    }
+}
+
+/* Run some cron tasks for a connected master client. Return 1 when the client
+ * is freed, 0 otherwise. */
+int replicationCronRunMasterClient(void) {
+    if (!server.masterhost || !server.master) return 0;
+
+    if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) return 0;
+
+    /* Timed out master when we are an already connected slave? */
+    if (server.repl_state == REPL_STATE_CONNECTED &&
+        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
+    {
+        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
+        freeClient(server.master);
+        return 1;
+    }
+
+    /* Send ACK to master from time to time.
+     * Note that we do not send periodic acks to masters that don't
+     * support PSYNC and replication offsets. */
+    if (!(server.master->flags & CLIENT_PRE_PSYNC))
+        replicationSendAck();
+
+    return 0;
+}
+
 ConnectionType *connTypeOfReplication(void) {
     if (server.tls_replication) {
         return connectionTypeTls();
@@ -375,6 +455,8 @@ void incrementalTrimReplicationBacklog(size_t max_blocks) {

 /* Free replication buffer blocks that are referenced by this client. */
 void freeReplicaReferencedReplBuffer(client *replica) {
+    serverAssert(replica->running_tid == IOTHREAD_MAIN_THREAD_ID);
+
     if (replica->ref_repl_buf_node != NULL) {
         /* Decrease the start buffer node reference count. */
         replBufBlock *o = listNodeValue(replica->ref_repl_buf_node);
@@ -706,6 +788,8 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,

 /* Feed the slave 'c' with the replication backlog starting from the
  * specified 'offset' up to the end of the backlog.
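+ *
+ * This runs on the main thread only: the backlog blocks are shared with
+ * replicas that may be served from IO threads, hence the running_tid
+ * assertion below.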
  */
 long long addReplyReplicationBacklog(client *c, long long offset) {
+    serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID);
+
     long long skip;

     serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
@@ -4361,6 +4445,7 @@ void replicationSendAck(void) {
  */
 void replicationCacheMaster(client *c) {
     serverAssert(server.master != NULL && server.cached_master == NULL);
+    serverAssert(server.master->tid == IOTHREAD_MAIN_THREAD_ID);
     serverLog(LL_NOTICE,"Caching the disconnected master state.");

     /* Unlink the client from the server structures. */
@@ -4456,6 +4541,8 @@ void replicationDiscardCachedMaster(void) {
  * so the stream of data that we'll receive will start from where this
  * master left. */
 void replicationResurrectCachedMaster(connection *conn) {
+    serverAssert(server.cached_master->tid == IOTHREAD_MAIN_THREAD_ID);
+
     server.master = server.cached_master;
     server.cached_master = NULL;
     server.master->conn = conn;
@@ -4800,14 +4887,6 @@ void replicationCron(void) {
         cancelReplicationHandshake(1);
     }

-    /* Timed out master when we are an already connected slave? */
-    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
-        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
-    {
-        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
-        freeClient(server.master);
-    }
-
     /* Check if we should connect to a MASTER */
     if (server.repl_state == REPL_STATE_CONNECT) {
         serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
@@ -4815,12 +4894,7 @@ void replicationCron(void) {
         connectWithMaster();
     }

-    /* Send ACK to master from time to time.
-     * Note that we do not send periodic acks to masters that don't
-     * support PSYNC and replication offsets. */
-    if (server.masterhost && server.master &&
-        !(server.master->flags & CLIENT_PRE_PSYNC))
-        replicationSendAck();
+    replicationCronRunMasterClient();

     /* If we have attached slaves, PING them from time to time.
      * So slaves can implement an explicit timeout to masters, and will
diff --git a/src/script.c b/src/script.c
index fb815241a..f222b89e6 100644
--- a/src/script.c
+++ b/src/script.c
@@ -38,7 +38,22 @@ static void exitScriptTimedoutMode(scriptRunCtx *run_ctx) {
     run_ctx->flags &= ~SCRIPT_TIMEDOUT;
     blockingOperationEnds();
     /* if we are a replica and we have an active master, set it for continue processing */
-    if (server.masterhost && server.master) queueClientForReprocessing(server.master);
+    if (server.masterhost && server.master) {
+        /* A master running in an IO thread needs to be sent to the main thread
+         * so that it can process any pending commands ASAP, without waiting
+         * for the next read.
+         * We don't queue the client for reprocessing in this case, as that
+         * would create contention with the main thread when it deals with
+         * unblocked clients - see the comment above queueClientForReprocessing. */
+        if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) {
+            pauseIOThread(server.master->tid);
+            enqueuePendingClientsToMainThread(server.master, 0);
+            resumeIOThread(server.master->tid);
+            return;
+        }
+
+        queueClientForReprocessing(server.master);
+    }
 }

 static void enterScriptTimedoutMode(scriptRunCtx *run_ctx) {
diff --git a/src/server.c b/src/server.c
index b07ca0c58..a0d0af867 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1983,6 +1983,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     /* Handle writes with pending output buffers. */
     handleClientsWithPendingWrites();

+    /* Check if IO thread replicas have any pending reads or writes, and send
+     * them back to their threads if so.
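+     * Ordering matters here: this runs after handleClientsWithPendingWrites,
+     * which may queue fresh replica writes, and before
+     * sendPendingClientsToIOThreads, which performs the actual handoff.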
+     */
+    putReplicasInPendingClientsToIOThreads();
+
     /* Let io thread to handle its pending clients. */
     sendPendingClientsToIOThreads();
@@ -4883,6 +4887,15 @@ int finishShutdown(void) {
             /* Don't count migration destination replicas. */
             if (replica->flags & CLIENT_ASM_MIGRATING) continue;
             num_replicas++;
+
+            /* We pause the IO thread this replica is running on so that we
+             * avoid data races. */
+            int paused = 0;
+            if (replica->running_tid != IOTHREAD_MAIN_THREAD_ID) {
+                pauseIOThread(replica->tid);
+                paused = 1;
+            }
+
             if (replica->repl_ack_off != server.master_repl_offset) {
                 num_lagging_replicas++;
                 long lag = replica->replstate == SLAVE_STATE_ONLINE ?
@@ -4894,6 +4907,8 @@ int finishShutdown(void) {
                           lag, replstateToString(replica->replstate));
             }
+
+            if (paused) resumeIOThread(replica->tid);
         }
         if (num_replicas > 0) {
             serverLog(LL_NOTICE,
@@ -6520,10 +6535,11 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
                 server.repl_down_since ?
                 (intmax_t)(server.unixtime-server.repl_down_since) : -1);
         } else {
-            info = sdscatprintf(info,
+            info = sdscatprintf(info, FMTARGS(
                 "master_link_up_since_seconds:%jd\r\n",
                 server.repl_up_since ? /* defensive code, should never be 0 when connected */
-                (intmax_t)(server.unixtime-server.repl_up_since) : -1);
+                (intmax_t)(server.unixtime-server.repl_up_since) : -1,
+                "master_client_io_thread:%d\r\n", server.master->tid));
         }
         info = sdscatprintf(info, "total_disconnect_time_sec:%jd\r\n",
             (intmax_t)server.repl_total_disconnect_time+(current_disconnect_time));
@@ -6582,9 +6598,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
                 info = sdscatprintf(info,
                     "slave%d:ip=%s,port=%d,state=%s,"
-                    "offset=%lld,lag=%ld\r\n",
+                    "offset=%lld,lag=%ld,io-thread=%d\r\n",
                     slaveid,slaveip,slave->slave_listening_port,state,
-                    slave->repl_ack_off, lag);
+                    slave->repl_ack_off, lag, slave->tid);
                 slaveid++;
             }
         }
diff --git a/src/server.h b/src/server.h
index dc6fcd2d2..1e0173dfa 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1124,7 +1124,27 @@ typedef struct clientReplyBlock {
  * node, it should first increase the next node's refcount, and when we trim
  * the replication buffer nodes, we remove node always from the head node which
  * refcount is 0. If the refcount of the head node is not 0, we must stop
- * trimming and never iterate the next node. */
+ * trimming and never iterate the next node.
+ *
+ * For replicas in IO threads we don't update the refcount while sending the
+ * repl data, but only when the client is sent back to main. This avoids data
+ * races. In order to achieve this, the replicas keep track of the following:
+ * - io_curr_repl_node - the current node we've reached.
+ * - io_bound_repl_node - the last node in the replication buffer as seen by
+ *   the replica client before it was sent to the IO thread.
+ *
+ * When the client is sent to an IO thread for the first time, io_curr_repl_node
+ * is initialized with ref_repl_buf_node.
+ * When the client is sent back to main it can decrement ref_repl_buf_node's
+ * refcount and increment it for io_curr_repl_node, since all the nodes
+ * in-between are already sent and the client doesn't hold a reference to them.
+ *
+ * `io_bound_repl_node` is needed because the IO thread needs to know when to
+ * stop sending data. If it were reading directly from the replication buffer,
+ * there would be a data race, because the main thread may be writing to it
+ * during `feedReplicationBuffer`.
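+ *
+ * A sketch of the send window while a replica is owned by an IO thread:
+ *
+ *   ref_repl_buf_node ... io_curr_repl_node ... io_bound_repl_node
+ *     (refcounted)          (send cursor)        (frozen end)
+ *
+ * The IO thread only advances io_curr_* towards io_bound_*; everything past
+ * the bound still belongs to the main thread.
+ *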
+ * `io_bound_repl_node` is cached in the client together with its used size
+ * just before sending the client to the IO thread in
+ * `enqueuePendingClienstToIOThreads`. */

 /* Similar with 'clientReplyBlock', it is used for shared buffers between
  * all replica clients and replication backlog. */
@@ -1132,7 +1152,8 @@ typedef struct replBufBlock {
     int refcount;           /* Number of replicas or repl backlog using. */
     long long id;           /* The unique incremental number. */
     long long repl_offset;  /* Start replication offset of the block. */
-    size_t size, used;
+    size_t size;            /* Capacity of the buf in bytes. */
+    size_t used;            /* Count of written bytes. */
     char buf[];
 } replBufBlock;

@@ -1452,8 +1473,15 @@ typedef struct client {
                                  * any positive number means we found a slot and no violation yet. */
     dictEntry *cur_script;     /* Cached pointer to the dictEntry of the script being executed. */
     time_t lastinteraction; /* Time of the last interaction, used for timeout */
+    time_t io_lastinteraction;  /* Time of the last interaction as seen from
+                                 * the IO thread. When the client is moved to
+                                 * main it updates its `lastinteraction` value
+                                 * from this. */
     time_t obuf_soft_limit_reached_time;
-    mstime_t last_cron_check_time; /* The last client check time in cron */
+    mstime_t io_last_client_cron;  /* Timestamp of the last invocation of the
+                                    * client cron if the client is running in
+                                    * an IO thread. */
+    mstime_t io_last_repl_cron;    /* Timestamp of the last invocation of the
+                                    * replication cron if the client is running
+                                    * in an IO thread. */
     int authenticated;      /* Needed when the default user requires auth. */
     int replstate;          /* Replication state if this is a slave. */
     int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
@@ -1462,12 +1490,20 @@ typedef struct client {
     off_t repldbsize;       /* Replication DB file size. */
     sds replpreamble;       /* Replication DB preamble. */
     long long read_reploff; /* Read replication offset if this is a master. */
+    long long io_read_reploff;  /* Copy of read_reploff, but only used when the
+                                 * master client is in an IO thread, so that we
+                                 * don't have contention with the IO thread. */
     long long reploff;      /* Applied replication offset if this is a master. */
     long long reploff_next; /* Next value to set for reploff when a command finishes executing */
     long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
     long long repl_ack_off; /* Replication ack offset, if this is a slave. */
     long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */
     long long repl_ack_time;/* Replication ack time, if this is a slave. */
+    long long io_repl_ack_time; /* Replication ack time, if this is a replica
+                                 * in an IO thread. Keeps track of repl_ack_time
+                                 * while the replica is in the IO thread, to
+                                 * avoid data races with main. repl_ack_time is
+                                 * updated with this value when the replica
+                                 * returns to the main thread. */
     long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
     long long psync_initial_offset;  /* FULLRESYNC reply offset other slaves
                                         copying this slave output buffer
@@ -1528,6 +1564,14 @@ typedef struct client {
                                  * see the definition of replBufBlock. */
     size_t ref_block_pos;       /* Access position of referenced buffer block,
                                  * i.e. the next offset to send. */
+    listNode *io_curr_repl_node; /* Current node we are sending repl data from
+                                  * in the IO thread. */
+    size_t io_curr_block_pos;    /* Current position we are sending repl data
+                                  * from in the IO thread. */
+    listNode *io_bound_repl_node;/* Bound node we are sending repl data from
+                                  * in the IO thread. */
+    size_t io_bound_block_pos;   /* Bound position we are sending repl data
+                                  * from in the IO thread. */

     /* list node in clients_pending_write list */
     listNode clients_pending_write_node;
@@ -1554,6 +1598,9 @@ typedef struct client {
     unsigned long long commands_processed; /* Total count of commands this client executed. */
     struct asmTask *task; /* Atomic slot migration task */
     char *node_id; /* Node ID to connect to for atomic slot migration */
+
+    redisAtomic int pending_read; /* Flag indicating that an IO thread client
+                                   * residing in the main thread has received a
+                                   * read event. */
 } client;

 typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) {
@@ -3170,7 +3217,7 @@ void resumeIOThreadsRange(int start, int end);
 int resizeAllIOThreadsEventLoops(size_t newsize);
 int sendPendingClientsToIOThreads(void);
 void enqueuePendingClientsToMainThread(client *c, int unbind);
-void putInPendingClienstForIOThreads(client *c);
+void enqueuePendingClienstToIOThreads(client *c);
 void handleClientReadError(client *c);
 void unbindClientFromIOThreadEventLoop(client *c);
 int processClientsOfAllIOThreads(void);
@@ -3317,6 +3364,9 @@ void replDataBufInit(replDataBuf *buf);
 void replDataBufClear(replDataBuf *buf);
 void replDataBufReadFromConn(connection *conn, replDataBuf *buf, void (*error_handler)(connection *conn));
 int replDataBufStreamToDb(replDataBuf *buf, replDataBufToDbCtx *ctx);
+int replicaFromIOThreadHasPendingRead(client *c);
+void putReplicasInPendingClientsToIOThreads(void);
+int replicationCronRunMasterClient(void);

 /* Generic persistence functions */
 void startLoadingFile(size_t size, char* filename, int rdbflags);
diff --git a/src/tsan.sup b/src/tsan.sup
index 786d1d3df..904ea6b33 100644
--- a/src/tsan.sup
+++ b/src/tsan.sup
@@ -6,3 +6,16 @@ signal:printCrashReport
 # TODO Investigate this race in jemalloc probably related to
 # https://github.com/jemalloc/jemalloc/issues/2621
 race:malloc_mutex_trylock_final
+
+# A race can happen on conn->last_errno if a replica client is reading/writing
+# data in an IO thread and the main thread is calling connAddrPeerName for some
+# reason (e.g. genRedisInfoString/roleCommand...).
+# Not worth the additional code for synchronization as:
+# - errno is thread-safe according to the POSIX standard
+# - we don't support systems that allow word tearing, i.e. the last_errno value
+#   will still be a valid value at the end - either the errno from the main
+#   thread or from the IO thread
+# - even if we fix the data race on last_errno, it can still end up holding
+#   either thread's errno, unless we pause the IO thread during the main
+#   thread's execution, which would incur too big a cost
+# - the race happens rarely
+race:connSocketAddr
diff --git a/tests/cluster/tests/14-consistency-check.tcl b/tests/cluster/tests/14-consistency-check.tcl
index e6f7723fc..53c67c93c 100644
--- a/tests/cluster/tests/14-consistency-check.tcl
+++ b/tests/cluster/tests/14-consistency-check.tcl
@@ -35,7 +35,8 @@ proc get_one_of_my_replica {id} {
     # To avoid -LOADING reply, wait until replica syncs with master.
     wait_for_condition 1000 50 {
-        [RI $replica_id_num master_link_status] eq {up}
+        [RI $replica_id_num master_link_status] eq {up} &&
+        [R $replica_id_num dbsize] eq [R $id dbsize]
     } else {
         fail "Replica did not sync in time."
     }
diff --git a/tests/instances.tcl b/tests/instances.tcl
index 05b8507a1..d93b36b33 100644
--- a/tests/instances.tcl
+++ b/tests/instances.tcl
@@ -40,6 +40,7 @@ set ::dirs {} ; # We remove all the temp dirs at exit
 set ::run_matching {} ; # If non empty, only tests matching pattern are run.
 set ::stop_on_failure 0
 set ::loop 0
+set ::tsan 0

 if {[catch {cd tmp}]} {
     puts "tmp directory not found."
@@ -310,6 +311,8 @@ proc parse_options {} {
             set ::log_req_res 1
         } elseif {$opt eq {--force-resp3}} {
             set ::force_resp3 1
+        } elseif {$opt eq {--tsan}} {
+            set ::tsan 1
         } elseif {$opt eq "--help"} {
             puts "--single Only runs tests specified by pattern."
             puts "--dont-clean Keep log files on exit."
diff --git a/tests/integration/failover.tcl b/tests/integration/failover.tcl
index bd33f84ab..c2df86c67 100644
--- a/tests/integration/failover.tcl
+++ b/tests/integration/failover.tcl
@@ -81,7 +81,7 @@ start_server {overrides {save {}}} {
         resume_process [srv -1 pid]

         # Execute the failover
-        $node_0 failover to $node_1_host $node_1_port
+        assert_equal "OK" [$node_0 failover to $node_1_host $node_1_port]

         # Wait for failover to end
         wait_for_condition 50 100 {
@@ -180,10 +180,19 @@ start_server {overrides {save {}}} {

         assert_equal [count_log_message -2 "time out exceeded, failing over."] 1

-        # We should accept both psyncs, although this is the condition we might not
-        # since we didn't catch up.
-        assert_equal [expr [s 0 sync_partial_ok] - $initial_psyncs] 2
-        assert_equal [expr [s 0 sync_full] - $initial_syncs] 0
+        # We should accept both psyncs, although we might not meet this
+        # condition since we didn't catch up. This happens often when TSan is
+        # enabled, as it slows down execution significantly.
+        set psyncs [expr [s 0 sync_partial_ok] - $initial_psyncs]
+        set full_syncs [expr [s 0 sync_full] - $initial_syncs]
+        if {$::tsan} {
+            assert_lessthan_equal $psyncs 2
+            assert_morethan_equal $full_syncs 0
+            assert_equal [expr $psyncs + $full_syncs] 2
+        } else {
+            assert_equal $psyncs 2
+            assert_equal $full_syncs 0
+        }

         assert_digests_match $node_0 $node_1 $node_2
     }
diff --git a/tests/integration/psync2-master-restart.tcl b/tests/integration/psync2-master-restart.tcl
index b0d394389..5971e74e1 100644
--- a/tests/integration/psync2-master-restart.tcl
+++ b/tests/integration/psync2-master-restart.tcl
@@ -164,8 +164,12 @@ start_server {} {
     assert {[status $replica sync_partial_ok] == 1}

     set digest [$master debug digest]
-    assert {$digest eq [$replica debug digest]}
-    assert {$digest eq [$sub_replica debug digest]}
+    wait_for_condition 10 100 {
+        $digest eq [$replica debug digest] &&
+        $digest eq [$sub_replica debug digest]
+    } else {
+        fail "Replica and sub-replica didn't sync in time after master restart..."
+    }
 }

 test "PSYNC2: Full resync after Master restart when too many key expired" {
diff --git a/tests/integration/replication-buffer.tcl b/tests/integration/replication-buffer.tcl
index 2b8ae6806..11e604c75 100644
--- a/tests/integration/replication-buffer.tcl
+++ b/tests/integration/replication-buffer.tcl
@@ -62,6 +62,16 @@ start_server {} {
     test "All replicas share one global replication buffer rdbchannel=$rdbchannel" {
         set before_used [s used_memory]
         populate 1024 "" 1024 ; # Write extra 1M data
+
+        # If we are running with IO threads we need to give them a few cycles
+        # to start sending the command stream. If we don't, the checks below
+        # related to repl_buf_mem will be incorrect, as the buffer will still
+        # be full with the 1MB of data written above.
+        set iothreads [s io_threads_active]
+        if {$iothreads && $rdbchannel == "yes"} {
+            after 1000
+        }
+
         # New data uses 1M memory, but all replicas use only one
         # replication buffer, so all replicas output memory is not
         # more than double of replication buffer.
diff --git a/tests/integration/replication-iothreads.tcl b/tests/integration/replication-iothreads.tcl
new file mode 100644
index 000000000..12d084526
--- /dev/null
+++ b/tests/integration/replication-iothreads.tcl
@@ -0,0 +1,331 @@
+#
+# Copyright (c) 2025-Present, Redis Ltd.
+# All rights reserved.
+#
+# Licensed under your choice of (a) the Redis Source Available License 2.0
+# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+# GNU Affero General Public License v3 (AGPLv3).
+#
+
+# Tests for master and slave clients in IO threads
+
+# Helper function to get master client IO thread from INFO replication
+proc get_master_client_io_thread {r} {
+    return [status $r master_client_io_thread]
+}
+
+# Helper function to get slave client IO thread from INFO replication
+proc get_slave_client_io_thread {r slave_idx} {
+    set info [$r info replication]
+    set lines [split $info "\r\n"]
+
+    foreach line $lines {
+        if {[string match "slave${slave_idx}:*" $line]} {
+            # Parse the slave line to extract io-thread value
+            set parts [split $line ","]
+            foreach part $parts {
+                if {[string match "*io-thread=*" $part]} {
+                    set kv [split $part "="]
+                    assert_equal [llength $kv] 2
+                    return [lindex $kv 1]
+                }
+            }
+        }
+    }
+    return -1
+}
+
+start_server {overrides {io-threads 4 save ""} tags {"iothreads repl network external:skip"}} {
+start_server {overrides {io-threads 4 save ""}} {
+    set master [srv 0 client]
+    set master_host [srv 0 host]
+    set master_port [srv 0 port]
+    set slave [srv -1 client]
+
+    test {Setup slave} {
+        $slave slaveof $master_host $master_port
+        wait_for_condition 1000 100 {
+            [s -1 master_link_status] eq {up}
+        } else {
+            fail "Replication not started."
+        }
+    }
+
+    test {Master client moves to IO thread after sync complete} {
+        # Check master client thread assignment (master client is on slave side)
+        wait_for_condition 100 100 {
+            [get_master_client_io_thread $slave] > 0
+        } else {
+            fail "Master client was not assigned to IO thread"
+        }
+    }
+
+    test {Slave client assignment to IO threads} {
+        # Verify slave is connected and online
+        wait_replica_online $master 0
+
+        # Slave client is connected - force a write so that it's assigned to an
+        # IO thread.
+        assert_equal "OK" [$master set x x]
+
+        # Check slave client thread assignment
+        wait_for_condition 50 100 {
+            [get_slave_client_io_thread $master 0] > 0
+        } else {
+            fail "Slave client was not assigned to IO thread"
+        }
+    }
+
+    test {WAIT command works with master/slave in IO threads} {
+        # Test basic WAIT functionality
+        $master set wait_test_key1 value1
+        $master set wait_test_key2 value2
+        $master incr wait_counter
+
+        assert {[$master wait 1 2000] == 1}
+
+        # Verify data reached slave
+        wait_for_condition 10 100 {
+            [$slave get wait_test_key1] eq "value1" &&
+            [$slave get wait_test_key2] eq "value2" &&
+            [$slave get wait_counter] eq "1"
+        } else {
+            fail "commands not propagated to IO thread slave in time"
+        }
+    }
+
+    test {Replication data integrity with IO threads} {
+        # Generate significant replication traffic
+        for {set i 0} {$i < 100} {incr i} {
+            $master set bulk_key_$i [string repeat "data" 10]
+            $master lpush bulk_list element_$i
+            $master zadd bulk_zset $i member_$i
+            if {$i % 20 == 0} {
+                # Periodically verify WAIT works
+                assert {[$master wait 1 2000] == 1}
+            }
+        }
+
+        # Final verification
+        wait_for_condition 50 100 {
+            [$slave get bulk_key_99] eq [string repeat "data" 10] &&
+            [$slave llen bulk_list] eq 100 &&
+            [$slave zcard bulk_zset] eq 100
+        } else {
+            fail "Replication data integrity failed"
+        }
+    }
+
+    test {WAIT timeout behavior with slave in IO thread} {
+        set slave_pid [srv -1 pid]
+
+        # Pause slave to test timeout
+        pause_process $slave_pid
+
+        # Should timeout and return 0 acks
+        $master set timeout_test_key timeout_value
+        set start_time [clock milliseconds]
+        assert {[$master wait 1 2000] == 0}
+        set elapsed [expr {[clock milliseconds] - $start_time}]
+        assert_range $elapsed 2000 2500
+
+        # Resume and verify recovery
+        resume_process $slave_pid
+
+        assert {[$master wait 1 2000] == 1}
+
+        # Verify data reached slave after resume
+        wait_for_condition 10 100 {
+            [$slave get timeout_test_key] eq "timeout_value"
+        } else {
+            fail "commands not propagated to IO thread slave in time"
+        }
+    }
+
+    test {Network interruption recovery with IO threads} {
+        # Generate traffic before interruption
+        for {set i 0} {$i < 50} {incr i} {
+            $master set pre_interrupt_$i value_$i
+        }
+
+        # Simulate network interruption
+        pause_process $slave_pid
+
+        # Continue writing during interruption
+        for {set i 0} {$i < 50} {incr i} {
+            $master set during_interrupt_$i value_$i
+        }
+
+        # WAIT should timeout
+        assert {[$master wait 1 2000] == 0}
+
+        # Resume slave and verify recovery
+        resume_process $slave_pid
+
+        # Verify WAIT works again
+        assert {[$master wait 1 2000] == 1}
+
+        # Wait for reconnection and catch up
+        wait_for_condition 100 100 {
+            [$slave get during_interrupt_49] eq "value_49"
+        } else {
+            fail "Slave didn't catch up after network recovery"
+        }
+
+        $master set post_recovery_test recovery_value
+        wait_for_condition 10 100 {
+            [$slave get post_recovery_test] eq "recovery_value"
+        } else {
+            fail "Slave didn't receive 'set post_recovery_test' command"
+        }
+
+        # Check thread assignments after recovery
+        wait_for_condition 100 100 {
+            [get_master_client_io_thread $slave] > 0
+        } else {
+            fail "Master client not assigned to IO thread after recovery"
+        }
+    }
+
+    test {Replication reconnection cycles with IO threads} {
+        # Test multiple disconnect/reconnect cycles
+        for {set cycle 0} {$cycle < 3} {incr cycle} {
+            # Generate traffic
+            for {set i 0} {$i < 20} {incr i} {
+                $master set cycle_${cycle}_key_$i value_$i
+            }
+
+            assert {[$master wait 1 2000] == 1}
+
+            # Record thread assignments during cycle
+            set master_thread [get_master_client_io_thread $slave]
+            set slave_thread [get_slave_client_io_thread $master 0]
+            puts "Cycle $cycle - Master thread: $master_thread, Slave thread: $slave_thread"
+
+            # Disconnect and reconnect (except last cycle)
+            if {$cycle < 2} {
+                $slave replicaof no one
+                after 100
+                $slave replicaof $master_host $master_port
+                wait_for_sync $slave
+            }
+        }
+
+        # Verify final state
+        wait_for_condition 10 100 {
+            [$slave get cycle_2_key_19] eq "value_19"
+        } else {
+            fail "last command not propagated to IO thread slave in time"
+        }
+    }
+
+    test {INFO replication shows correct thread information} {
+        # Test INFO replication output format
+        set info [$master info replication]
+
+        # Should show master role
+        assert_match "*role:master*" $info
+
+        # Should have slave thread information
+        assert_match "*slave0:*io-thread=*" $info
+
+        # Test we can parse the thread ID
+        set slave_thread [get_slave_client_io_thread $master 0]
+        assert_morethan $slave_thread 0
+
+        # Test master client thread info
+        set slave_info [$slave info replication]
+        assert_match "*role:slave*" $slave_info
+        assert_match "*master_client_io_thread:*" $slave_info
+
+        set master_thread [get_master_client_io_thread $slave]
+        assert_morethan $master_thread 0
+    }
+}
+}
+
+start_server {overrides {io-threads 4 save ""} tags {"iothreads repl network external:skip"}} {
+start_server {overrides {io-threads 4 save ""}} {
+start_server {overrides {io-threads 4 save ""}} {
+start_server {overrides {io-threads 4 save ""}} {
+    set master [srv 0 client]
+    set master_host [srv 0 host]
+    set master_port [srv 0 port]
+    set slave1 [srv -1 client]
+    set slave2 [srv -2 client]
+    set slave3 [srv -3 client]
+
+    test {Multiple slaves across IO threads} {
+        # Setup replication for all slaves
+        $slave1 replicaof $master_host $master_port
+        $slave2 replicaof $master_host $master_port
+        $slave3 replicaof $master_host $master_port
+
+        # Wait for all slaves to be online
+        wait_replica_online $master 0
+        wait_replica_online $master 1
+        wait_replica_online $master 2
+
+        set iterations 5
+        while {[incr iterations -1] >= 0} {
+            # Slave clients are connected - force a write so that they are
+            # assigned to IO threads.
+            assert_equal "OK" [$master set x x]
+
+            wait_for_condition 10 100 {
+                ([get_slave_client_io_thread $master 0] > 0) &&
+                ([get_slave_client_io_thread $master 1] > 0) &&
+                ([get_slave_client_io_thread $master 2] > 0)
+            } else {
+                continue
+            }
+
+            break
+        }
+        if {$iterations < 0} {
+            fail "Replicas failed to be assigned to IO threads in time"
+        }
+
+        # Test concurrent replication to all slaves
+        for {set i 0} {$i < 200} {incr i} {
+            $master set multi_key_$i value_$i
+            if {$i % 50 == 0} {
+                assert {[$master wait 3 2000] == 3}
+            }
+        }
+
+        # Final verification all slaves got data
+        wait_for_condition 50 100 {
+            [$slave1 get multi_key_199] eq "value_199" &&
+            [$slave2 get multi_key_199] eq "value_199" &&
+            [$slave3 get multi_key_199] eq "value_199"
+        } else {
+            fail "Multi-slave replication failed"
+        }
+    }
+
+    test {WAIT with multiple slaves in IO threads} {
+        # Test various WAIT scenarios
+        $master set wait_multi_test1 value1
+        assert {[$master wait 3 2000] == 3}
+
+        $master set wait_multi_test2 value2
+        assert {[$master wait 2 2000] >= 2}
+
+        $master set wait_multi_test3 value3
+        assert {[$master wait 1 2000] >= 1}
+
+        # Verify all slaves have the data
+        wait_for_condition 10 100 {
+            [$slave1 get wait_multi_test3] eq "value3" &&
+            [$slave2 get wait_multi_test3] eq "value3" &&
+            [$slave3 get wait_multi_test3] eq "value3"
+        } else {
+            fail "commands not propagated to io thread slaves in time"
+        }
+    }
+}
+}
+}
+}
diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl
index 6f1d30854..1805a3d52 100644
--- a/tests/integration/replication.tcl
+++ b/tests/integration/replication.tcl
@@ -1247,7 +1247,7 @@ test {Kill rdb child process if its dumping RDB is not useful} {
         # Slave2 disconnect with master
         $slave2 slaveof no one
         # Should kill child
-        wait_for_condition 100 10 {
+        wait_for_condition 1000 10 {
            [s 0 rdb_bgsave_in_progress] eq 0
        } else {
            fail "can't kill rdb child"
diff --git a/tests/sentinel/tests/12-master-reboot.tcl b/tests/sentinel/tests/12-master-reboot.tcl
index 5779316a3..3d0d82821 100644
--- a/tests/sentinel/tests/12-master-reboot.tcl
+++ b/tests/sentinel/tests/12-master-reboot.tcl
@@ -55,8 +55,13 @@ test "Master reboot in very short time" {
     kill_instance redis $master_id
     reboot_instance redis $master_id

+    set max_tries 1000
+    if {$::tsan} {
+        set max_tries 5000
+    }
+
     foreach_sentinel_id id {
-        wait_for_condition 1000 100 {
+        wait_for_condition $max_tries 100 {
            [lindex [S $id SENTINEL GET-MASTER-ADDR-BY-NAME mymaster] 1] != $old_port
        } else {
            fail "At least one Sentinel did not receive failover info"
        }
@@ -104,4 +109,4 @@ test "The old master eventually gets reconfigured as a slave" {
    } else {
        fail "Old master not reconfigured as slave of new master"
    }
-}
\ No newline at end of file
+}
diff --git a/tests/support/test.tcl b/tests/support/test.tcl
index d85f31e0b..60fd0a966 100644
--- a/tests/support/test.tcl
+++ b/tests/support/test.tcl
@@ -126,6 +126,10 @@ proc assert_refcount_morethan {key ref} {
 # max retries and delay between retries. Otherwise the 'elsescript' is
 # executed.
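+#
+# A usage sketch (the condition and message are illustrative, mirroring the
+# callers in util.tcl):
+#
+#   wait_for_condition 50 100 {
+#       [status $r master_link_status] eq "up"
+#   } else {
+#       fail "replica didn't sync in time"
+#   }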
 proc wait_for_condition {maxtries delay e _else_ elsescript} {
+    if {$_else_ ne "else"} {
+        error "$_else_ must be equal to \"else\""
+    }
+
     while {[incr maxtries -1] >= 0} {
         set errcode [catch {uplevel 1 [list expr $e]} result]
         if {$errcode == 0} {
@@ -136,7 +140,7 @@ proc wait_for_condition {maxtries delay e _else_ elsescript} {
         after $delay
     }
     if {$maxtries == -1} {
-        set errcode [catch [uplevel 1 $elsescript] result]
+        set errcode [catch {uplevel 1 $elsescript} result]
         return -code $errcode $result
     }
 }
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index f7be470df..5d06c8cd9 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -129,7 +129,14 @@ proc waitForBgrewriteaof r {
 }

 proc wait_for_sync r {
-    wait_for_condition 50 100 {
+    set maxtries 50
+    # TSan adds significant overhead to the execution time, so we increase the
+    # wait time here just in case.
+    if {$::tsan} {
+        set maxtries 100
+    }
+
+    wait_for_condition $maxtries 100 {
         [status $r master_link_status] eq "up"
     } else {
         fail "replica didn't sync in time"
@@ -137,6 +144,12 @@ proc wait_for_sync r {
 }

 proc wait_replica_online {r {replica_id 0} {maxtries 50} {delay 100}} {
+    # TSan adds significant overhead to the execution time, so we increase the
+    # wait time here just in case.
+    if {$::tsan} {
+        set maxtries [expr {$maxtries * 2}]
+    }
+
     wait_for_condition $maxtries $delay {
         [string match "*slave$replica_id:*,state=online*" [$r info replication]]
     } else {
@@ -145,7 +158,13 @@ proc wait_replica_online {r {replica_id 0} {maxtries 50} {delay 100}} {
 }

 proc wait_for_ofs_sync {r1 r2} {
-    wait_for_condition 50 100 {
+    set maxtries 50
+    # TSan adds significant overhead to the execution time, so we increase the
+    # wait time here just in case.
+    if {$::tsan} {
+        set maxtries 100
+    }
+    wait_for_condition $maxtries 100 {
         [status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
     } else {
         fail "replica offset didn't match in time"
@@ -722,7 +741,23 @@ proc resume_process {pid} {
         puts [get_proc_job $pid]
         fail "process was not stopped"
     }
-    exec kill -SIGCONT $pid
+
+    set max_attempts 10
+    set attempt 0
+    while {($attempt < $max_attempts) && [string match "T*" [exec ps -o state= -p $pid]]} {
+        exec kill -SIGCONT $pid
+
+        incr attempt
+        after 100
+    }
+
+    wait_for_condition 50 1000 {
+        [string match "R*" [exec ps -o state= -p $pid]] ||
+        [string match "S*" [exec ps -o state= -p $pid]]
+    } else {
+        puts [exec ps j $pid]
+        fail "process was not resumed"
+    }
 }

 proc cmdrstat {cmd r} {
diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl
index eed61bf66..6fd315579 100644
--- a/tests/unit/moduleapi/propagate.tcl
+++ b/tests/unit/moduleapi/propagate.tcl
@@ -596,8 +596,13 @@ tags "modules external:skip" {
             assert_equal [$master get k1] 1
             assert_equal [$master ttl k1] -1
-            assert_equal [$replica get k1] 1
-            assert_equal [$replica ttl k1] -1
+
+            wait_for_condition 50 100 {
+                [$replica get k1] eq 1 &&
+                [$replica ttl k1] eq -1
+            } else {
+                fail "expired key was not propagated to the replica via RM_Call"
+            }
         }

         test {module notification on set} {