mirror of
https://github.com/redis/redis.git
synced 2026-02-03 20:39:54 -05:00
Handle primary/replica clients in IO threads (#14335)
# Problem While introducing Async IO threads(https://github.com/redis/redis/pull/13695) primary and replica clients were left to be handled inside main thread due to data race and synchronization issues. This PR solves this issue with the additional hope it increases performance of replication. # Overview ## Moving the clients to IO threads Since clients first participate in a handshake and an RDB replication phases it was decided they are moved to IO-thread after RDB replication is done. For primary client this was trivial as the master client is created only after RDB sync (+ some additional checks one can see in `isClientMustHandledByMainThread`). Replica clients though are moved to IO threads immediately after connection (as are all clients) so currently in `unstable` replication happens while this client is in IO-thread. In this PR it was moved to main thread after receiving the first `REPLCONF` message from the replica, but it is a bit hacky and we can remove it. I didn't find issues between the two versions. ## Primary client (replica node) We have few issues here: - during `serverCron` a `replicationCron` is ran which periodically sends `REPLCONF ACK` message to the master, also checks for timed-out master. In order to prevent data races we utilize`IOThreadClientsCron`. The client is periodically sent to main thread and during `processClientsFromIOThread` it's checked if it needs to run the replication cron behaviour. - data races with main thread - specifically `lastinteraction` and `read_reploff` members of the primary client that are written to in `readQueryFromClient` could be accessed at the same time from main thread during execution of `INFO REPLICATION`(`genRedisInfoString`). To solve this the members were duplicated so if the client is in IO-thread it writes to the duplicates and they are synced with the original variables each time the client is send to main thread ( that means `INFO REPLICATION` could potentially return stale values). - During `freeClient` the primary client is fetched to main thread but when caching it(`replicationCacheMaster`) the thread id will remain the id of the IO thread it was from. This creates problems when resurrecting the master client. Here the call to `unbindClientFromIOThreadEventLoop` in `freeClient` was rewritten to call `keepClientInMainThread` which automatically fixes the problem. - During `exitScriptTimedoutMode` the master is queued for reprocessing (specifically process any pending commands ASAP after it's unblocked). We do that by putting it in the `server.unblocked_clients` list, which are processed in the next `beforeSleep` cycle in main thread. Since this will create a contention between main and IO thread, we just skip this queueing in `unblocked_clients` and just queue the client to main thread - the `processClientsFromIOThread` will process the pending commands just as main would have. ## Replica clients (primary node) We move the client after RDB replication is done and after replication backlog is fed with its first message. We do that so that the client's reference to the first replication backlog node is initialized before it's read from IO-thread, hence no contention with main thread on it. ### Shared replication buffer Currently in unstable the replication buffer is shared amongst clients. This is done via clients holding references to the nodes inside the buffer. A node from the buffer can be trimmed once each replica client has read it and send its contents. The reference is `client->ref_repl_buf_node`. The replication buffer is written to by main thread in `feedReplicationBuffer` and the refcounting is intrusive - it's inside the replication-buffer nodes themselves. Since the replica client changes the refcount (decreases the refcount of the node it has just read, and increases the refcount of the next node it starts to read) during `writeToClient` we have a data race with main thread when it feeds the replication buffer. Moreover, main thread also updates the `used` size of the node - how much it has written to it, compared to its capacity which the replica client relies on to know how much to read. Obviously replica being in IO-thread creates another data race here. To mitigate these issues a few new variables were added to the client's struct: - `io_curr_repl_node` - starting node this replica is reading from inside IO-thread - `io_bound_repl_node` - the last node in the replication buffer the replica sees before being send to IO-thread. These values are only allowed to be updated in main thread. The client keeps track of how much it has read into the buffer via the old `ref_repl_buf_node`. Generally while in IO-thread the replica client will now keep refcount of the `io_curr_repl_node` until it's processed all the nodes up to `io_bound_repl_node` - at that point its returned to main thread which can safely update the refcounts. The `io_bound_repl_node` reference is there so the replica knows when to stop reading from the repl buffer - imagine that replica reads from the last node of the replication buffer while main thread feeds data to it - we will create a data race on the `used` value (`_writeToClientSlave`(IO-thread) vs `feedReplicationBuffer`(main)). That's why this value is updated just before the replica is being send to IO thread. *NOTE*, this means that when replicas are handled by IO threads they will hold more than one node at a time (i.e `io_curr_repl_node` up to `io_bound_repl_node`) meaning trimming will happen a bit less frequently. Tests show no significant problems with that. (tnx to @ShooterIT for the `io_curr_repl_node` and `io_bound_repl_node` mechanism as my initial implementation had similar semantics but was way less clear) Example of how this works: * Replication buffer state at time N: | node 0| ... | node M, used_size K | * replica caches `io_curr_repl_node`=0, `io_bound_repl_node`=M and `io_bound_block_pos`=K * replica moves to IO thread and processes all the data it sees * Replication buffer state at time N + 1: | node 0| ... | node M, used_size Full | |node M + 1| |node M + 2, used_size L|, where Full > M * replica moves to main thread at time N + 1, at this point following happens - refcount to node 0 (io_curr_repl_node) is decreased - `ref_repl_buf_node` becomes node M(io_bound_repl_node) (we still have size-K bytes to process from there) - refcount to node M is increased (now all nodes from 0 up to M-1 including can be trimmed unless some other replica holds reference to them) - And just before the replica is send back to IO thread the following are updated: - `io_bound_repl_node` ref becomes node M+2 - `io_bound_block_pos` becomes L Note that replica client is only moved to main if it has processed all the data it knows about (i.e up to `io_bound_repl_node` + `io_bound_block_pos`) ### Replica clients kept in main as much as possible During implementation an issue arose - how fast is the replica client able to get knowledge about new data from the replication buffer and how fast can it trim it. In order for that to happen ASAP whenever a replica is moved to main it remains there until the replication buffer is fed new data. At that point its put in the pending write queue and special cased in handleClientsWithPendingWrites so that its send to IO thread ASAP to write the new data to replica. Also since each time the replica writes its whole repl data it knows about that means after it's send to main thread `processClientsFromIOThread` is able to immediately update the refcounts and trim whatever it can. ### ACK messages from primary Slave clients need to periodically read `REPLCONF ACK` messages from client. Since replica can remain in main thread indefinitely if no DB change occurs, a new atomic `pending_read` was added during `readQueryFromClient`. If a replica client has a pending read it's returned back to IO-thread in order to process the read even if there is no pending repl data to write. ### Replicas during shutdown During shutdown the main thread pauses write actions and periodically checks if all replicas have reached the same replication offset as the primary node. During `finishShutdown` that may or may not be the case. Either way a client data may be read from the replicas and even we may try to write any pending data to them inside `flushSlavesOutputBuffers`. In order to prevent races all the replicas from IO threads are moved to main via `fetchClientFromIOThread`. A cancel of the shutdown should be ok, since the mechanism employed by `handleClientsWithPendingWrites` should return the client back to IO thread when needed. ## Notes While adding new tests timing issues with Tsan tests were found and fixed. Also there is a data race issue caught by Tsan on the `last_error` member of the `client` struct. It happens when both IO-thread and main thread make a syscall using a `client` instance - this can happen only for primary and replica clients since their data can be accessed by commands send from other clients. Specific example is the `INFO REPLICATION` command. Although other such races were fixed, as described above, this once is insignificant and it was decided to be ignored in `tsan.sup`. --------- Co-authored-by: Yuan Wang <wangyuancode@163.com> Co-authored-by: Yuan Wang <yuan.wang@redis.com>
This commit is contained in:
parent
b9c00b27f8
commit
e3c38aab66
19 changed files with 886 additions and 86 deletions
14
.github/workflows/daily.yml
vendored
14
.github/workflows/daily.yml
vendored
|
|
@ -322,7 +322,7 @@ jobs:
|
|||
run: sudo apt-get install tcl8.6 tclx
|
||||
- name: test
|
||||
if: true && !contains(github.event.inputs.skiptests, 'redis')
|
||||
run: ./runtest --config io-threads 4 --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
|
||||
run: ./runtest --config io-threads 4 --accurate --verbose --tags "network iothreads psync2 repl failover" --dump-logs ${{github.event.inputs.test_args}}
|
||||
- name: cluster tests
|
||||
if: true && !contains(github.event.inputs.skiptests, 'cluster')
|
||||
run: ./runtest-cluster --config io-threads 4 ${{github.event.inputs.cluster_test_args}}
|
||||
|
|
@ -433,7 +433,10 @@ jobs:
|
|||
sudo apt-get install tcl8.6 tclx valgrind g++ -y
|
||||
- name: test
|
||||
if: true && !contains(github.event.inputs.skiptests, 'redis')
|
||||
run: ./runtest --valgrind --no-latency --verbose --clients 1 --timeout 2400 --dump-logs ${{github.event.inputs.test_args}}
|
||||
# Note that valgrind's overhead doesn't pair well with io-threads so we
|
||||
# explicitly disable tests tagged with 'iothreads' - these are tests that
|
||||
# always run with io-threads enabled.
|
||||
run: ./runtest --valgrind --no-latency --verbose --clients 1 --timeout 2400 --tags -iothreads --dump-logs ${{github.event.inputs.test_args}}
|
||||
|
||||
test-valgrind-misc:
|
||||
runs-on: ubuntu-latest
|
||||
|
|
@ -495,7 +498,7 @@ jobs:
|
|||
sudo apt-get install tcl8.6 tclx valgrind g++ -y
|
||||
- name: test
|
||||
if: true && !contains(github.event.inputs.skiptests, 'redis')
|
||||
run: ./runtest --valgrind --no-latency --verbose --clients 1 --timeout 2400 --dump-logs ${{github.event.inputs.test_args}}
|
||||
run: ./runtest --valgrind --tags -iothreads --no-latency --verbose --clients 1 --timeout 2400 --dump-logs ${{github.event.inputs.test_args}}
|
||||
|
||||
test-valgrind-no-malloc-usable-size-misc:
|
||||
runs-on: ubuntu-latest
|
||||
|
|
@ -580,7 +583,7 @@ jobs:
|
|||
!contains(github.event.inputs.skipjobs, 'sanitizer')
|
||||
timeout-minutes: 14400
|
||||
env:
|
||||
CC: clang # MSan work only with clang
|
||||
CC: clang # MSan works only with clang
|
||||
steps:
|
||||
- name: prep
|
||||
if: github.event_name == 'workflow_dispatch'
|
||||
|
|
@ -665,6 +668,7 @@ jobs:
|
|||
!contains(github.event.inputs.skipjobs, 'sanitizer')
|
||||
timeout-minutes: 14400
|
||||
strategy:
|
||||
fail-fast: false # let gcc and clang both run until the end even if one of them fails
|
||||
matrix:
|
||||
compiler: [ gcc, clang ]
|
||||
env:
|
||||
|
|
@ -696,7 +700,7 @@ jobs:
|
|||
run: ./runtest --tsan --clients 1 --config io-threads 4 --accurate --verbose --dump-logs ${{github.event.inputs.test_args}}
|
||||
- name: sentinel tests
|
||||
if: true && !contains(github.event.inputs.skiptests, 'sentinel')
|
||||
run: ./runtest-sentinel --config io-threads 2 ${{github.event.inputs.cluster_test_args}}
|
||||
run: ./runtest-sentinel --tsan ${{github.event.inputs.cluster_test_args}}
|
||||
- name: cluster tests
|
||||
if: true && !contains(github.event.inputs.skiptests, 'cluster')
|
||||
run: ./runtest-cluster --config io-threads 2 ${{github.event.inputs.cluster_test_args}}
|
||||
|
|
|
|||
163
src/iothread.c
163
src/iothread.c
|
|
@ -42,6 +42,67 @@ static inline void sendPendingClientsToMainThreadIfNeeded(IOThread *t, int check
|
|||
}
|
||||
}
|
||||
|
||||
/* When moving a client from IO thread to main thread we may need to update
|
||||
* some of its variables as they are duplicated to avoid contention with main
|
||||
* thread.
|
||||
* For now this is valid only for master or slave clients. */
|
||||
void updateClientDataFromIOThread(client *c) {
|
||||
if (!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_SLAVE)) return;
|
||||
|
||||
serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
|
||||
c->running_tid == IOTHREAD_MAIN_THREAD_ID);
|
||||
|
||||
if (c->io_repl_ack_time > c->repl_ack_time) {
|
||||
serverAssert(c->flags & CLIENT_SLAVE);
|
||||
c->repl_ack_time = c->io_repl_ack_time;
|
||||
}
|
||||
if (c->io_lastinteraction > c->lastinteraction) {
|
||||
serverAssert(c->flags & CLIENT_MASTER);
|
||||
c->lastinteraction = c->io_lastinteraction;
|
||||
}
|
||||
if (c->io_read_reploff > c->read_reploff) {
|
||||
serverAssert(c->flags & CLIENT_MASTER);
|
||||
c->read_reploff = c->io_read_reploff;
|
||||
}
|
||||
|
||||
/* Update replication buffer referenced node if IO thread has sent some data. */
|
||||
if (c->flags & CLIENT_SLAVE && c->ref_repl_buf_node != NULL &&
|
||||
(c->io_curr_repl_node != c->ref_repl_buf_node ||
|
||||
c->io_curr_block_pos != c->ref_block_pos))
|
||||
{
|
||||
((replBufBlock*)listNodeValue(c->ref_repl_buf_node))->refcount--;
|
||||
((replBufBlock*)listNodeValue(c->io_curr_repl_node))->refcount++;
|
||||
c->ref_block_pos = c->io_curr_block_pos;
|
||||
c->ref_repl_buf_node = c->io_curr_repl_node;
|
||||
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
|
||||
}
|
||||
}
|
||||
|
||||
/* Check to see if the client needs any cron jobs run for them. Return 1 if the
|
||||
* client should be terminated */
|
||||
int runClientCronFromIOThread(client *c) {
|
||||
if (c->flags & CLIENT_MASTER &&
|
||||
c->io_last_repl_cron + 1000 <= server.mstime)
|
||||
{
|
||||
c->io_last_repl_cron = server.mstime;
|
||||
if (replicationCronRunMasterClient()) return 1;
|
||||
}
|
||||
|
||||
/* Run client cron task for the client per second or it is marked as pending cron. */
|
||||
if (c->io_last_client_cron + 1000 <= server.mstime ||
|
||||
c->io_flags & CLIENT_IO_PENDING_CRON)
|
||||
{
|
||||
c->io_last_client_cron = server.mstime;
|
||||
if (clientsCronRunClient(c)) return 1;
|
||||
} else {
|
||||
/* Update the client in the mem usage if clientsCronRunClient is not
|
||||
* being called, since that function already performs the update. */
|
||||
updateClientMemUsageAndBucket(c);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* When IO threads read a complete query of clients or want to free clients, it
|
||||
* should remove it from its clients list and put the client in the list to main
|
||||
* thread, we will send these clients to main thread in IOThreadBeforeSleep. */
|
||||
|
|
@ -66,6 +127,32 @@ void enqueuePendingClientsToMainThread(client *c, int unbind) {
|
|||
}
|
||||
}
|
||||
|
||||
void enqueuePendingClienstToIOThreads(client *c) {
|
||||
serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
|
||||
c->running_tid == IOTHREAD_MAIN_THREAD_ID);
|
||||
|
||||
if (c->flags & CLIENT_PENDING_WRITE) {
|
||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
|
||||
}
|
||||
if (c->flags & CLIENT_SLAVE) {
|
||||
serverAssert(c->ref_repl_buf_node != NULL);
|
||||
|
||||
c->io_repl_ack_time = c->repl_ack_time;
|
||||
c->io_curr_repl_node = c->ref_repl_buf_node;
|
||||
c->io_curr_block_pos = c->ref_block_pos;
|
||||
c->io_bound_repl_node = listLast(server.repl_buffer_blocks);
|
||||
c->io_bound_block_pos = ((replBufBlock*)listNodeValue(c->io_bound_repl_node))->used;
|
||||
}
|
||||
if (c->flags & CLIENT_MASTER) {
|
||||
c->io_read_reploff = c->read_reploff;
|
||||
c->io_lastinteraction = c->lastinteraction;
|
||||
}
|
||||
|
||||
c->running_tid = c->tid;
|
||||
listAddNodeHead(mainThreadPendingClientsToIOThreads[c->tid], c);
|
||||
}
|
||||
|
||||
/* Unbind connection of client from io thread event loop, write and read handlers
|
||||
* also be removed, ensures that we can operate the client safely. */
|
||||
void unbindClientFromIOThreadEventLoop(client *c) {
|
||||
|
|
@ -88,11 +175,12 @@ void keepClientInMainThread(client *c) {
|
|||
server.io_threads_clients_num[c->tid]--;
|
||||
/* Unbind connection of client from io thread event loop. */
|
||||
unbindClientFromIOThreadEventLoop(c);
|
||||
/* Update the client's data in case it was just fetched from IO thread */
|
||||
updateClientDataFromIOThread(c);
|
||||
/* Let main thread to run it, rebind event loop and read handler */
|
||||
connRebindEventLoop(c->conn, server.el);
|
||||
connSetReadHandler(c->conn, readQueryFromClient);
|
||||
c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
|
||||
c->running_tid = IOTHREAD_MAIN_THREAD_ID;
|
||||
c->tid = IOTHREAD_MAIN_THREAD_ID;
|
||||
freeClientDeferredObjects(c, 1); /* Free deferred objects. */
|
||||
freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */
|
||||
|
|
@ -132,22 +220,26 @@ void fetchClientFromIOThread(client *c) {
|
|||
/* Unbind connection of client from io thread event loop. */
|
||||
connUnbindEventLoop(c->conn);
|
||||
/* Now main thread can process it. */
|
||||
c->running_tid = IOTHREAD_MAIN_THREAD_ID;
|
||||
resumeIOThread(c->tid);
|
||||
freeClientDeferredObjects(c, 1); /* Free deferred objects. */
|
||||
freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */
|
||||
tryUnlinkClientFromPendingRefReply(c, 0);
|
||||
|
||||
/* Keep the client in main thread. */
|
||||
c->running_tid = IOTHREAD_MAIN_THREAD_ID;
|
||||
keepClientInMainThread(c);
|
||||
}
|
||||
|
||||
/* For some clients, we must handle them in the main thread, since there is
|
||||
* data race to be processed in IO threads.
|
||||
*
|
||||
* - Close ASAP, we must free the client in main thread.
|
||||
* - Replica, pubsub, monitor, blocked, tracking clients, main thread may
|
||||
* - Pubsub, monitor, blocked, tracking clients, main thread may
|
||||
* directly write them a reply when conditions are met.
|
||||
* - Script command with debug may operate connection directly. */
|
||||
* - Script command with debug may operate connection directly.
|
||||
* - Master/Replica are only handled by IO thread when RDB replication is
|
||||
* completed. Note we need to check them after checking for other flags
|
||||
* that may overlap with CLIENT_MASTER/SLAVE - CLOSE_ASAP, MONITOR,
|
||||
* (UN)BLOCKED, TRACKING. */
|
||||
int isClientMustHandledByMainThread(client *c) {
|
||||
if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_MASTER | CLIENT_SLAVE |
|
||||
if (c->flags & (CLIENT_CLOSE_ASAP |
|
||||
CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_BLOCKED |
|
||||
CLIENT_UNBLOCKED | CLIENT_TRACKING | CLIENT_LUA_DEBUG |
|
||||
CLIENT_LUA_DEBUG_SYNC | CLIENT_ASM_MIGRATING |
|
||||
|
|
@ -155,6 +247,34 @@ int isClientMustHandledByMainThread(client *c) {
|
|||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* If RDB replication is done it's safe to move the master client to an IO thread.
|
||||
* Note that we keep the master client in main thread during failover so as
|
||||
* not to slow down the failover process by waiting the master replication
|
||||
* cron in IO thread. */
|
||||
if (c->flags & CLIENT_MASTER &&
|
||||
server.repl_state == REPL_STATE_CONNECTED &&
|
||||
server.repl_rdb_ch_state == REPL_RDB_CH_STATE_NONE &&
|
||||
server.failover_state == NO_FAILOVER)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* If RDB replication is done for this slave it's safe to move it to an IO thread
|
||||
* Note that we also check if the ref_repl_buf_node is initialized in order
|
||||
* to prevent race conditions with main thread when it feeds the replication
|
||||
* buffer. */
|
||||
if (c->flags & CLIENT_SLAVE &&
|
||||
(c->replstate == SLAVE_STATE_ONLINE ||
|
||||
c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
|
||||
c->repl_start_cmd_stream_on_ack == 0 &&
|
||||
c->ref_repl_buf_node != NULL)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (c->flags & (CLIENT_MASTER | CLIENT_SLAVE)) return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -175,7 +295,6 @@ void assignClientToIOThread(client *c) {
|
|||
/* Assign the client to the IO thread. */
|
||||
server.io_threads_clients_num[c->tid]--;
|
||||
c->tid = min_id;
|
||||
c->running_tid = min_id;
|
||||
server.io_threads_clients_num[min_id]++;
|
||||
|
||||
/* The client running in IO thread needs to have deferred objects array. */
|
||||
|
|
@ -186,7 +305,8 @@ void assignClientToIOThread(client *c) {
|
|||
* to IO thread in beforeSleep. */
|
||||
connUnbindEventLoop(c->conn);
|
||||
c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
|
||||
listAddNodeTail(mainThreadPendingClientsToIOThreads[c->tid], c);
|
||||
|
||||
enqueuePendingClienstToIOThreads(c);
|
||||
}
|
||||
|
||||
/* If updating maxclients config, we not only resize the event loop of main thread
|
||||
|
|
@ -480,17 +600,12 @@ int processClientsFromIOThread(IOThread *t) {
|
|||
continue;
|
||||
}
|
||||
|
||||
/* Run cron task for the client per second or it is marked as pending cron. */
|
||||
if (c->last_cron_check_time + 1000 <= server.mstime ||
|
||||
c->io_flags & CLIENT_IO_PENDING_CRON)
|
||||
{
|
||||
c->last_cron_check_time = server.mstime;
|
||||
if (clientsCronRunClient(c)) continue;
|
||||
} else {
|
||||
/* Update the client in the mem usage if clientsCronRunClient is not
|
||||
* being called, since that function already performs the update. */
|
||||
updateClientMemUsageAndBucket(c);
|
||||
}
|
||||
/* Update some client's members while we are in main thread so we avoid
|
||||
* data races. */
|
||||
updateClientDataFromIOThread(c);
|
||||
|
||||
/* Check if we need to run a cron job for the client */
|
||||
if (runClientCronFromIOThread(c)) continue;
|
||||
|
||||
/* Process the pending command and input buffer. */
|
||||
if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
|
||||
|
|
@ -515,6 +630,10 @@ int processClientsFromIOThread(IOThread *t) {
|
|||
continue;
|
||||
}
|
||||
|
||||
/* Handle replica clients in putReplicasInPendingClientsToIOThreads in
|
||||
* beforeSleep */
|
||||
if (c->flags & CLIENT_SLAVE) continue;
|
||||
|
||||
/* Remove this client from pending write clients queue of main thread,
|
||||
* And some clients may do not have reply if CLIENT REPLY OFF/SKIP. */
|
||||
if (c->flags & CLIENT_PENDING_WRITE) {
|
||||
|
|
@ -524,7 +643,7 @@ int processClientsFromIOThread(IOThread *t) {
|
|||
c->running_tid = c->tid;
|
||||
listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node);
|
||||
node = NULL;
|
||||
|
||||
|
||||
/* If there are several clients to process, let io thread handle them ASAP. */
|
||||
sendPendingClientsToIOThreadIfNeeded(t, 1);
|
||||
}
|
||||
|
|
|
|||
140
src/networking.c
140
src/networking.c
|
|
@ -157,6 +157,10 @@ client *createClient(connection *conn) {
|
|||
c->last_header = NULL;
|
||||
c->ref_repl_buf_node = NULL;
|
||||
c->ref_block_pos = 0;
|
||||
c->io_curr_repl_node = NULL;
|
||||
c->io_curr_block_pos = 0;
|
||||
c->io_bound_repl_node = NULL;
|
||||
c->io_bound_block_pos = 0;
|
||||
c->qb_pos = 0;
|
||||
c->querybuf = NULL;
|
||||
c->querybuf_peak = 0;
|
||||
|
|
@ -186,6 +190,7 @@ client *createClient(connection *conn) {
|
|||
c->slot = -1;
|
||||
c->cluster_compatibility_check_slot = -2;
|
||||
c->ctime = c->lastinteraction = server.unixtime;
|
||||
c->io_lastinteraction = 0;
|
||||
c->duration = 0;
|
||||
clientSetDefaultAuth(c);
|
||||
c->replstate = REPL_STATE_NONE;
|
||||
|
|
@ -193,9 +198,11 @@ client *createClient(connection *conn) {
|
|||
c->reploff = 0;
|
||||
c->reploff_next = 0;
|
||||
c->read_reploff = 0;
|
||||
c->io_read_reploff = 0;
|
||||
c->repl_applied = 0;
|
||||
c->repl_ack_off = 0;
|
||||
c->repl_ack_time = 0;
|
||||
c->io_repl_ack_time = 0;
|
||||
c->repl_aof_off = 0;
|
||||
c->repl_last_partial_write = 0;
|
||||
c->slave_listening_port = 0;
|
||||
|
|
@ -222,7 +229,8 @@ client *createClient(connection *conn) {
|
|||
c->postponed_list_node = NULL;
|
||||
c->client_tracking_redirection = 0;
|
||||
c->client_tracking_prefixes = NULL;
|
||||
c->last_cron_check_time = 0;
|
||||
c->io_last_client_cron = 0;
|
||||
c->io_last_repl_cron = 0;
|
||||
c->last_memory_usage = 0;
|
||||
c->last_memory_type = CLIENT_TYPE_NORMAL;
|
||||
c->module_blocked_client = NULL;
|
||||
|
|
@ -243,6 +251,7 @@ client *createClient(connection *conn) {
|
|||
c->commands_processed = 0;
|
||||
c->task = NULL;
|
||||
c->node_id = NULL;
|
||||
atomicSet(c->pending_read, 0);
|
||||
return c;
|
||||
}
|
||||
|
||||
|
|
@ -317,7 +326,7 @@ static inline int _prepareClientToWrite(client *c) {
|
|||
* If the client runs in an IO thread, we should not put the client in the
|
||||
* pending write queue. Instead, we will install the write handler to the
|
||||
* corresponding IO thread’s event loop and let it handle the reply. */
|
||||
if (!clientHasPendingReplies(c) && likely(c->running_tid == IOTHREAD_MAIN_THREAD_ID))
|
||||
if (likely(c->running_tid == IOTHREAD_MAIN_THREAD_ID) && !clientHasPendingReplies(c))
|
||||
putClientInPendingWriteQueue(c);
|
||||
|
||||
/* Authorize the caller to queue in the output buffer of this client. */
|
||||
|
|
@ -1459,8 +1468,9 @@ void deferredAfterErrorReply(client *c, list *errors) {
|
|||
/* Logically copy 'src' replica client buffers info to 'dst' replica.
|
||||
* Basically increase referenced buffer block node reference count. */
|
||||
void copyReplicaOutputBuffer(client *dst, client *src) {
|
||||
serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
|
||||
|
||||
serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
|
||||
serverAssert(src->running_tid == IOTHREAD_MAIN_THREAD_ID &&
|
||||
dst->running_tid == IOTHREAD_MAIN_THREAD_ID);
|
||||
if (src->ref_repl_buf_node == NULL) return;
|
||||
dst->ref_repl_buf_node = src->ref_repl_buf_node;
|
||||
dst->ref_block_pos = src->ref_block_pos;
|
||||
|
|
@ -1479,10 +1489,15 @@ static inline int _clientHasPendingRepliesSlave(client *c) {
|
|||
|
||||
/* If the last replication buffer block content is totally sent,
|
||||
* we have nothing to send. */
|
||||
listNode *ln = listLast(server.repl_buffer_blocks);
|
||||
replBufBlock *tail = listNodeValue(ln);
|
||||
if (ln == c->ref_repl_buf_node &&
|
||||
c->ref_block_pos == tail->used) return 0;
|
||||
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) {
|
||||
listNode *ln = listLast(server.repl_buffer_blocks);
|
||||
replBufBlock *tail = listNodeValue(ln);
|
||||
if (ln == c->ref_repl_buf_node &&
|
||||
c->ref_block_pos == tail->used) return 0;
|
||||
} else {
|
||||
if (c->io_bound_repl_node == c->io_curr_repl_node &&
|
||||
c->io_bound_block_pos == c->io_curr_block_pos) return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
|
@ -2036,7 +2051,7 @@ void freeClient(client *c) {
|
|||
|
||||
/* We need to unbind connection of client from io thread event loop first. */
|
||||
if (c->tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
unbindClientFromIOThreadEventLoop(c);
|
||||
keepClientInMainThread(c);
|
||||
}
|
||||
|
||||
/* Update the number of clients in the IO thread. */
|
||||
|
|
@ -2076,6 +2091,7 @@ void freeClient(client *c) {
|
|||
serverLog(LL_NOTICE,"Connection with master lost.");
|
||||
if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
|
||||
c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
|
||||
c->io_flags &= ~CLIENT_IO_CLOSE_ASAP;
|
||||
replicationCacheMaster(c);
|
||||
return;
|
||||
}
|
||||
|
|
@ -2591,6 +2607,26 @@ static inline int _writeToClientNonSlave(client *c, ssize_t *nwritten) {
|
|||
static inline int _writeToClientSlave(client *c, ssize_t *nwritten) {
|
||||
*nwritten = 0;
|
||||
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
|
||||
|
||||
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
replBufBlock *o = listNodeValue(c->io_curr_repl_node);
|
||||
/* The IO thread must not send data beyond the bound position. */
|
||||
size_t pos = c->io_curr_repl_node == c->io_bound_repl_node ?
|
||||
c->io_bound_block_pos : o->used;
|
||||
if (pos > c->io_curr_block_pos) {
|
||||
*nwritten = connWrite(c->conn, o->buf+c->io_curr_block_pos,
|
||||
pos-c->io_curr_block_pos);
|
||||
if (*nwritten <= 0) return C_ERR;
|
||||
c->io_curr_block_pos += *nwritten;
|
||||
}
|
||||
/* If we fully sent the object and there are more nodes to send, go to the next one. */
|
||||
if (c->io_curr_block_pos == pos && c->io_curr_repl_node != c->io_bound_repl_node) {
|
||||
c->io_curr_repl_node = listNextNode(c->io_curr_repl_node);
|
||||
c->io_curr_block_pos = 0;
|
||||
}
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
|
||||
serverAssert(o->used >= c->ref_block_pos);
|
||||
/* Send current block if it is not fully sent. */
|
||||
|
|
@ -2704,6 +2740,14 @@ int writeToClient(client *c, int handler_installed) {
|
|||
/* Remove client from pending referenced reply clients list. */
|
||||
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
|
||||
tryUnlinkClientFromPendingRefReply(c, 1);
|
||||
|
||||
/* If replica client has sent all the replication data it knows about
|
||||
* we send it to main thread so it can pick up new repl data ASAP.
|
||||
* Note, that we keep it in IO thread in case we have a pending ACK read. */
|
||||
if (c->flags & CLIENT_SLAVE && c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
if (!replicaFromIOThreadHasPendingRead(c))
|
||||
enqueuePendingClientsToMainThread(c, 0);
|
||||
}
|
||||
}
|
||||
/* Update client's memory usage after writing.
|
||||
* Since this isn't thread safe we do this conditionally. */
|
||||
|
|
@ -2731,6 +2775,11 @@ int handleClientsWithPendingWrites(void) {
|
|||
listRewind(server.clients_pending_write,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *c = listNodeValue(ln);
|
||||
|
||||
/* We handle IO thread replicas in putReplicasInPendingClientsToIOThreads */
|
||||
if (c->flags & CLIENT_SLAVE && c->tid != IOTHREAD_MAIN_THREAD_ID)
|
||||
continue;
|
||||
|
||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||
listUnlinkNode(server.clients_pending_write,ln);
|
||||
|
||||
|
|
@ -2744,6 +2793,7 @@ int handleClientsWithPendingWrites(void) {
|
|||
/* Let IO thread handle the client if possible. */
|
||||
if (server.io_threads_num > 1 &&
|
||||
!(c->flags & CLIENT_CLOSE_AFTER_REPLY) &&
|
||||
c->tid == IOTHREAD_MAIN_THREAD_ID &&
|
||||
!isClientMustHandledByMainThread(c))
|
||||
{
|
||||
assignClientToIOThread(c);
|
||||
|
|
@ -2793,7 +2843,8 @@ static inline void resetClientInternal(client *c, int num_pcmds_to_free) {
|
|||
c->cur_script = NULL;
|
||||
c->slot = -1;
|
||||
c->cluster_compatibility_check_slot = -2;
|
||||
c->flags &= ~CLIENT_EXECUTING_COMMAND;
|
||||
if (c->flags & CLIENT_EXECUTING_COMMAND)
|
||||
c->flags &= ~CLIENT_EXECUTING_COMMAND;
|
||||
|
||||
/* Make sure the duration has been recorded to some command. */
|
||||
serverAssert(c->duration == 0);
|
||||
|
|
@ -2807,19 +2858,27 @@ static inline void resetClientInternal(client *c, int num_pcmds_to_free) {
|
|||
|
||||
/* We clear the ASKING flag as well if we are not inside a MULTI, and
|
||||
* if what we just executed is not the ASKING command itself. */
|
||||
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
|
||||
if (c->flags & CLIENT_ASKING && !(c->flags & CLIENT_MULTI) &&
|
||||
prevcmd != askingCommand)
|
||||
{
|
||||
c->flags &= ~CLIENT_ASKING;
|
||||
}
|
||||
|
||||
/* We do the same for the CACHING command as well. It also affects
|
||||
* the next command or transaction executed, in a way very similar
|
||||
* to ASKING. */
|
||||
if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
|
||||
if (c->flags & CLIENT_TRACKING_CACHING && !(c->flags & CLIENT_MULTI) &&
|
||||
prevcmd != clientCommand)
|
||||
{
|
||||
c->flags &= ~CLIENT_TRACKING_CACHING;
|
||||
}
|
||||
|
||||
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
|
||||
* to the next command will be sent, but set the flag if the command
|
||||
* we just processed was "CLIENT REPLY SKIP". */
|
||||
c->flags &= ~CLIENT_REPLY_SKIP;
|
||||
if (c->flags & CLIENT_REPLY_SKIP)
|
||||
c->flags &= ~CLIENT_REPLY_SKIP;
|
||||
|
||||
if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
|
||||
c->flags |= CLIENT_REPLY_SKIP;
|
||||
c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
|
||||
|
|
@ -2908,8 +2967,18 @@ int processInlineBuffer(client *c, pendingCommand *pcmd) {
|
|||
/* Newline from slaves can be used to refresh the last ACK time.
|
||||
* This is useful for a slave to ping back while loading a big
|
||||
* RDB file. */
|
||||
if (querylen == 0 && clientTypeIsSlave(c))
|
||||
c->repl_ack_time = server.unixtime;
|
||||
if (querylen == 0 && clientTypeIsSlave(c)) {
|
||||
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
|
||||
c->repl_ack_time = server.unixtime;
|
||||
else
|
||||
/* If this is a replica client running in an IO thread we cache the
|
||||
* last ack time in a different member variable in order to avoid
|
||||
* contention with main thread. f.e see refreshGoodSlavesCount()
|
||||
* Note c->repl_ack_time will still be updated in
|
||||
* updateClientDataFromIOThread with the value of c->io_repl_ack_time
|
||||
* when the client moves from IO to main thread. */
|
||||
c->io_repl_ack_time = server.unixtime;
|
||||
}
|
||||
|
||||
/* Masters should never send us inline protocol to run actual
|
||||
* commands. If this happens, it is likely due to a bug in Redis where
|
||||
|
|
@ -3505,7 +3574,11 @@ int processInputBuffer(client *c) {
|
|||
if (unlikely(pcmd->read_error || (pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE)))
|
||||
break;
|
||||
|
||||
pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
|
||||
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
|
||||
pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
|
||||
else
|
||||
pcmd->reploff = c->io_read_reploff - sdslen(c->querybuf) + c->qb_pos;
|
||||
|
||||
preprocessCommand(c, pcmd);
|
||||
pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED;
|
||||
resetClientQbufState(c);
|
||||
|
|
@ -3611,7 +3684,14 @@ void readQueryFromClient(connection *conn) {
|
|||
client *c = connGetPrivateData(conn);
|
||||
int nread, big_arg = 0;
|
||||
size_t qblen, readlen;
|
||||
if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) return;
|
||||
|
||||
if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) {
|
||||
atomicSetWithSync(c->pending_read, 1);
|
||||
return;
|
||||
} else if (server.io_threads_num > 1) {
|
||||
atomicSetWithSync(c->pending_read, 0);
|
||||
}
|
||||
|
||||
c->read_error = 0;
|
||||
|
||||
/* Update the number of reads of io threads on server */
|
||||
|
|
@ -3702,9 +3782,21 @@ void readQueryFromClient(connection *conn) {
|
|||
qblen = sdslen(c->querybuf);
|
||||
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
|
||||
|
||||
c->lastinteraction = server.unixtime;
|
||||
if (!(c->flags & CLIENT_MASTER) || c->running_tid == IOTHREAD_MAIN_THREAD_ID)
|
||||
c->lastinteraction = server.unixtime;
|
||||
else
|
||||
/* Avoid contention with genRedisInfoString as it can access master
|
||||
* client's data. If this is a master running in IO thread the value of
|
||||
* c->lastinteraction will be updated during processClientsFromIOThread */
|
||||
c->io_lastinteraction = server.unixtime;
|
||||
|
||||
if (c->flags & CLIENT_MASTER) {
|
||||
c->read_reploff += nread;
|
||||
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) {
|
||||
c->read_reploff += nread;
|
||||
} else {
|
||||
/* Same comment as for c->io_lastinteraction */
|
||||
c->io_read_reploff += nread;
|
||||
}
|
||||
atomicIncr(server.stat_net_repl_input_bytes, nread);
|
||||
} else {
|
||||
atomicIncr(server.stat_net_input_bytes, nread);
|
||||
|
|
@ -5127,6 +5219,16 @@ void flushSlavesOutputBuffers(void) {
|
|||
listRewind(server.slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = listNodeValue(ln);
|
||||
|
||||
/* Fetch the replica clients that are currently running in IO thread. */
|
||||
if (slave->running_tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
fetchClientFromIOThread(slave);
|
||||
/* If the slave doesn't have any pending replies nothing to do
|
||||
* anyways. */
|
||||
if (!clientHasPendingReplies(slave)) continue;
|
||||
putClientInPendingWriteQueue(slave);
|
||||
}
|
||||
|
||||
int can_receive_writes = connHasWriteHandler(slave->conn) ||
|
||||
(slave->flags & CLIENT_PENDING_WRITE);
|
||||
|
||||
|
|
|
|||
|
|
@ -98,6 +98,86 @@ unsigned long replicationLogicalReplicaCount(void) {
|
|||
return count;
|
||||
}
|
||||
|
||||
int replicaFromIOThreadHasPendingRead(client *c) {
|
||||
serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID);
|
||||
|
||||
int pending_read;
|
||||
atomicGetWithSync(c->pending_read, pending_read);
|
||||
return pending_read;
|
||||
}
|
||||
|
||||
/* Send replicas to their respective IO threads if it has pending reads or
|
||||
* writes. Otherwise it remains in main thread so it can check for new data in
|
||||
* the replication buffer ASAP. */
|
||||
void putReplicasInPendingClientsToIOThreads(void) {
|
||||
if (server.io_threads_num <= 1) return;
|
||||
|
||||
serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
|
||||
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(server.slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *replica = listNodeValue(ln);
|
||||
|
||||
/* We only care about replicas that need to run on IO thread but are
|
||||
* currently in main */
|
||||
if (replica->tid == IOTHREAD_MAIN_THREAD_ID ||
|
||||
replica->running_tid != IOTHREAD_MAIN_THREAD_ID)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Skip the replica if it's scheduled for close */
|
||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||
|
||||
/* The call to clientHasPendingReplies may seem redundant but in the
|
||||
* case of replica being in IO thread we can have the following case:
|
||||
* replica gets back to main thread after sending the repl buffer it
|
||||
* knows about. In the mean time main thread has accumulated new repl
|
||||
* data. In that case the replica's client wouldn't have been put in
|
||||
* the pending write queue but will still have new repl data it needs to
|
||||
* send, so we make sure to check for that and send it back to IO thread
|
||||
* if so. On the other hand if replica gets back to main thread before
|
||||
* any new repl data has accumulated then after a new cmd is propagated
|
||||
* the replica will be put in the pending write queue as usual so we
|
||||
* need to check for that also.
|
||||
* In addition, if the replica client has pending read events, we should
|
||||
* also send them to the IO thread. */
|
||||
if (replica->flags & CLIENT_PENDING_WRITE ||
|
||||
clientHasPendingReplies(replica) ||
|
||||
replicaFromIOThreadHasPendingRead(replica))
|
||||
{
|
||||
enqueuePendingClienstToIOThreads(replica);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Run some cron tasks for a connected master client. Return 1 when the client
|
||||
* is freed, 0 otherwise. */
|
||||
int replicationCronRunMasterClient(void) {
|
||||
if (!server.masterhost || !server.master) return 0;
|
||||
|
||||
if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) return 0;
|
||||
|
||||
/* Timed out master when we are an already connected slave? */
|
||||
if (server.repl_state == REPL_STATE_CONNECTED &&
|
||||
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
|
||||
{
|
||||
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
|
||||
freeClient(server.master);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Send ACK to master from time to time.
|
||||
* Note that we do not send periodic acks to masters that don't
|
||||
* support PSYNC and replication offsets. */
|
||||
if (!(server.master->flags & CLIENT_PRE_PSYNC))
|
||||
replicationSendAck();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
ConnectionType *connTypeOfReplication(void) {
|
||||
if (server.tls_replication) {
|
||||
return connectionTypeTls();
|
||||
|
|
@ -375,6 +455,8 @@ void incrementalTrimReplicationBacklog(size_t max_blocks) {
|
|||
|
||||
/* Free replication buffer blocks that are referenced by this client. */
|
||||
void freeReplicaReferencedReplBuffer(client *replica) {
|
||||
serverAssert(replica->running_tid == IOTHREAD_MAIN_THREAD_ID);
|
||||
|
||||
if (replica->ref_repl_buf_node != NULL) {
|
||||
/* Decrease the start buffer node reference count. */
|
||||
replBufBlock *o = listNodeValue(replica->ref_repl_buf_node);
|
||||
|
|
@ -706,6 +788,8 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
|
|||
/* Feed the slave 'c' with the replication backlog starting from the
|
||||
* specified 'offset' up to the end of the backlog. */
|
||||
long long addReplyReplicationBacklog(client *c, long long offset) {
|
||||
serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID);
|
||||
|
||||
long long skip;
|
||||
|
||||
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
|
||||
|
|
@ -4361,6 +4445,7 @@ void replicationSendAck(void) {
|
|||
*/
|
||||
void replicationCacheMaster(client *c) {
|
||||
serverAssert(server.master != NULL && server.cached_master == NULL);
|
||||
serverAssert(server.master->tid == IOTHREAD_MAIN_THREAD_ID);
|
||||
serverLog(LL_NOTICE,"Caching the disconnected master state.");
|
||||
|
||||
/* Unlink the client from the server structures. */
|
||||
|
|
@ -4456,6 +4541,8 @@ void replicationDiscardCachedMaster(void) {
|
|||
* so the stream of data that we'll receive will start from where this
|
||||
* master left. */
|
||||
void replicationResurrectCachedMaster(connection *conn) {
|
||||
serverAssert(server.cached_master->tid == IOTHREAD_MAIN_THREAD_ID);
|
||||
|
||||
server.master = server.cached_master;
|
||||
server.cached_master = NULL;
|
||||
server.master->conn = conn;
|
||||
|
|
@ -4800,14 +4887,6 @@ void replicationCron(void) {
|
|||
cancelReplicationHandshake(1);
|
||||
}
|
||||
|
||||
/* Timed out master when we are an already connected slave? */
|
||||
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
|
||||
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
|
||||
{
|
||||
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
|
||||
freeClient(server.master);
|
||||
}
|
||||
|
||||
/* Check if we should connect to a MASTER */
|
||||
if (server.repl_state == REPL_STATE_CONNECT) {
|
||||
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
|
||||
|
|
@ -4815,12 +4894,7 @@ void replicationCron(void) {
|
|||
connectWithMaster();
|
||||
}
|
||||
|
||||
/* Send ACK to master from time to time.
|
||||
* Note that we do not send periodic acks to masters that don't
|
||||
* support PSYNC and replication offsets. */
|
||||
if (server.masterhost && server.master &&
|
||||
!(server.master->flags & CLIENT_PRE_PSYNC))
|
||||
replicationSendAck();
|
||||
replicationCronRunMasterClient();
|
||||
|
||||
/* If we have attached slaves, PING them from time to time.
|
||||
* So slaves can implement an explicit timeout to masters, and will
|
||||
|
|
|
|||
17
src/script.c
17
src/script.c
|
|
@ -38,7 +38,22 @@ static void exitScriptTimedoutMode(scriptRunCtx *run_ctx) {
|
|||
run_ctx->flags &= ~SCRIPT_TIMEDOUT;
|
||||
blockingOperationEnds();
|
||||
/* if we are a replica and we have an active master, set it for continue processing */
|
||||
if (server.masterhost && server.master) queueClientForReprocessing(server.master);
|
||||
if (server.masterhost && server.master) {
|
||||
/* Master running in IO thread needs to be sent to main thread so that
|
||||
* it can process any pending commands ASAP without waiting for the next
|
||||
* read.
|
||||
* We don't queue the client for reprocessing in this case as it will
|
||||
* create contention with main thread when it deals with unblocked
|
||||
* clients - see comment above queueClientForReprocessing. */
|
||||
if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
pauseIOThread(server.master->tid);
|
||||
enqueuePendingClientsToMainThread(server.master, 0);
|
||||
resumeIOThread(server.master->tid);
|
||||
return;
|
||||
}
|
||||
|
||||
queueClientForReprocessing(server.master);
|
||||
}
|
||||
}
|
||||
|
||||
static void enterScriptTimedoutMode(scriptRunCtx *run_ctx) {
|
||||
|
|
|
|||
24
src/server.c
24
src/server.c
|
|
@ -1983,6 +1983,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||
/* Handle writes with pending output buffers. */
|
||||
handleClientsWithPendingWrites();
|
||||
|
||||
/* Check if IO thread replicas have any pending read or writes and send them
|
||||
* back to their threads if so. */
|
||||
putReplicasInPendingClientsToIOThreads();
|
||||
|
||||
/* Let io thread to handle its pending clients. */
|
||||
sendPendingClientsToIOThreads();
|
||||
|
||||
|
|
@ -4883,6 +4887,15 @@ int finishShutdown(void) {
|
|||
/* Don't count migration destination replicas. */
|
||||
if (replica->flags & CLIENT_ASM_MIGRATING) continue;
|
||||
num_replicas++;
|
||||
|
||||
/* We pause the IO thread this replica is running on so we avoid data
|
||||
* races. */
|
||||
int paused = 0;
|
||||
if (replica->running_tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
pauseIOThread(replica->tid);
|
||||
paused = 1;
|
||||
}
|
||||
|
||||
if (replica->repl_ack_off != server.master_repl_offset) {
|
||||
num_lagging_replicas++;
|
||||
long lag = replica->replstate == SLAVE_STATE_ONLINE ?
|
||||
|
|
@ -4894,6 +4907,8 @@ int finishShutdown(void) {
|
|||
lag,
|
||||
replstateToString(replica->replstate));
|
||||
}
|
||||
|
||||
if (paused) resumeIOThread(replica->tid);
|
||||
}
|
||||
if (num_replicas > 0) {
|
||||
serverLog(LL_NOTICE,
|
||||
|
|
@ -6520,10 +6535,11 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
|
|||
server.repl_down_since ?
|
||||
(intmax_t)(server.unixtime-server.repl_down_since) : -1);
|
||||
} else {
|
||||
info = sdscatprintf(info,
|
||||
info = sdscatprintf(info, FMTARGS(
|
||||
"master_link_up_since_seconds:%jd\r\n",
|
||||
server.repl_up_since ? /* defensive code, should never be 0 when connected */
|
||||
(intmax_t)(server.unixtime-server.repl_up_since) : -1);
|
||||
(intmax_t)(server.unixtime-server.repl_up_since) : -1,
|
||||
"master_client_io_thread:%d\r\n", server.master->tid));
|
||||
}
|
||||
info = sdscatprintf(info, "total_disconnect_time_sec:%jd\r\n", (intmax_t)server.repl_total_disconnect_time+(current_disconnect_time));
|
||||
|
||||
|
|
@ -6582,9 +6598,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
|
|||
|
||||
info = sdscatprintf(info,
|
||||
"slave%d:ip=%s,port=%d,state=%s,"
|
||||
"offset=%lld,lag=%ld\r\n",
|
||||
"offset=%lld,lag=%ld,io-thread=%d\r\n",
|
||||
slaveid,slaveip,slave->slave_listening_port,state,
|
||||
slave->repl_ack_off, lag);
|
||||
slave->repl_ack_off, lag, slave->tid);
|
||||
slaveid++;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
58
src/server.h
58
src/server.h
|
|
@ -1124,7 +1124,27 @@ typedef struct clientReplyBlock {
|
|||
* node, it should first increase the next node's refcount, and when we trim
|
||||
* the replication buffer nodes, we remove node always from the head node which
|
||||
* refcount is 0. If the refcount of the head node is not 0, we must stop
|
||||
* trimming and never iterate the next node. */
|
||||
* trimming and never iterate the next node.
|
||||
*
|
||||
* For replicas in IO threads we don't update the refcount while sending the
|
||||
* repl data, but only when the client is sent back to main. This avoids data
|
||||
* races. In order to achieve this, the replicas keep track of following:
|
||||
* - io_curr_repl_node - the current node we've reached.
|
||||
* - io_bound_repl_node - the last node in the replication buffer as seen by
|
||||
* the replica client before it was sent to IO thread
|
||||
*
|
||||
* When the client is sent to IO thread for the first time io_curr_repl_node is
|
||||
* initialized with ref_repl_buf_node.
|
||||
* When the client is sent back to main it can decrement ref_repl_buf_node's
|
||||
* refcount and increment it for io_curr_repl_node, since all the nodes
|
||||
* in-between are already sent and the client doesn't hold reference to them.
|
||||
*
|
||||
* `io_bound_repl_node` is needed because IO thread needs to know when to stop
|
||||
* sending data. If it was reading directly from the replication buffer,
|
||||
* there will be a data race, because main thread may be writing to it during
|
||||
* `feedReplicationBuffer`. `io_bound_repl_node` is cached in the client
|
||||
* together with its used size just before sending the client to IO thread
|
||||
* in `enqueuePendingClienstToIOThreads`. */
|
||||
|
||||
/* Similar with 'clientReplyBlock', it is used for shared buffers between
|
||||
* all replica clients and replication backlog. */
|
||||
|
|
@ -1132,7 +1152,8 @@ typedef struct replBufBlock {
|
|||
int refcount; /* Number of replicas or repl backlog using. */
|
||||
long long id; /* The unique incremental number. */
|
||||
long long repl_offset; /* Start replication offset of the block. */
|
||||
size_t size, used;
|
||||
size_t size; /* Capacity of the buf in bytes */
|
||||
size_t used; /* Count of written bytes */
|
||||
char buf[];
|
||||
} replBufBlock;
|
||||
|
||||
|
|
@ -1452,8 +1473,15 @@ typedef struct client {
|
|||
* any positive number means we found a slot and no violation yet. */
|
||||
dictEntry *cur_script; /* Cached pointer to the dictEntry of the script being executed. */
|
||||
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
||||
time_t io_lastinteraction; /* Time of the last interaction as seen from
|
||||
* IO thread. When the client is moved to main
|
||||
* it updates its `lastinteraction` value from
|
||||
* this. */
|
||||
time_t obuf_soft_limit_reached_time;
|
||||
mstime_t last_cron_check_time; /* The last client check time in cron */
|
||||
mstime_t io_last_client_cron; /* Timestamp of last invocation of client
|
||||
* cron if client is running in IO thread */
|
||||
mstime_t io_last_repl_cron; /* Timestamp of last invocation of replication
|
||||
* cron if client is running in IO thread. */
|
||||
int authenticated; /* Needed when the default user requires auth. */
|
||||
int replstate; /* Replication state if this is a slave. */
|
||||
int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
|
||||
|
|
@ -1462,12 +1490,20 @@ typedef struct client {
|
|||
off_t repldbsize; /* Replication DB file size. */
|
||||
sds replpreamble; /* Replication DB preamble. */
|
||||
long long read_reploff; /* Read replication offset if this is a master. */
|
||||
long long io_read_reploff; /* Copy of read_reploff but only used when
|
||||
* master client is in IO thread so we don't
|
||||
* have contention with IO thread. */
|
||||
long long reploff; /* Applied replication offset if this is a master. */
|
||||
long long reploff_next; /* Next value to set for reploff when a command finishes executing */
|
||||
long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
|
||||
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
|
||||
long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */
|
||||
long long repl_ack_time;/* Replication ack time, if this is a slave. */
|
||||
long long io_repl_ack_time; /* Replication ack time, if this is a replica in
|
||||
* IO thread. Keeps track of repl_ack_time while
|
||||
* replica is in IO thread to avoid data races
|
||||
* with main. repl_ack_time is updated with this
|
||||
* value when replica returns to main thread. */
|
||||
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
|
||||
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
|
||||
copying this slave output buffer
|
||||
|
|
@ -1528,6 +1564,14 @@ typedef struct client {
|
|||
* see the definition of replBufBlock. */
|
||||
size_t ref_block_pos; /* Access position of referenced buffer block,
|
||||
* i.e. the next offset to send. */
|
||||
listNode *io_curr_repl_node; /* Current node we are sending repl data from in
|
||||
* IO thread. */
|
||||
size_t io_curr_block_pos; /* Current position we are sending repl data from
|
||||
* in IO thread. */
|
||||
listNode *io_bound_repl_node;/* Bound node we are sending repl data from in
|
||||
* IO thread. */
|
||||
size_t io_bound_block_pos; /* Bound position we are sending repl data from
|
||||
* in IO thread. */
|
||||
|
||||
/* list node in clients_pending_write list */
|
||||
listNode clients_pending_write_node;
|
||||
|
|
@ -1554,6 +1598,9 @@ typedef struct client {
|
|||
unsigned long long commands_processed; /* Total count of commands this client executed. */
|
||||
struct asmTask *task; /* Atomic slot migration task */
|
||||
char *node_id; /* Node ID to connect to for atomic slot migration */
|
||||
|
||||
redisAtomic int pending_read; /* Flag indicating an IO thread client residing
|
||||
* in main thread has received a read event. */
|
||||
} client;
|
||||
|
||||
typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) {
|
||||
|
|
@ -3170,7 +3217,7 @@ void resumeIOThreadsRange(int start, int end);
|
|||
int resizeAllIOThreadsEventLoops(size_t newsize);
|
||||
int sendPendingClientsToIOThreads(void);
|
||||
void enqueuePendingClientsToMainThread(client *c, int unbind);
|
||||
void putInPendingClienstForIOThreads(client *c);
|
||||
void enqueuePendingClienstToIOThreads(client *c);
|
||||
void handleClientReadError(client *c);
|
||||
void unbindClientFromIOThreadEventLoop(client *c);
|
||||
int processClientsOfAllIOThreads(void);
|
||||
|
|
@ -3317,6 +3364,9 @@ void replDataBufInit(replDataBuf *buf);
|
|||
void replDataBufClear(replDataBuf *buf);
|
||||
void replDataBufReadFromConn(connection *conn, replDataBuf *buf, void (*error_handler)(connection *conn));
|
||||
int replDataBufStreamToDb(replDataBuf *buf, replDataBufToDbCtx *ctx);
|
||||
int replicaFromIOThreadHasPendingRead(client *c);
|
||||
void putReplicasInPendingClientsToIOThreads(void);
|
||||
int replicationCronRunMasterClient(void);
|
||||
|
||||
/* Generic persistence functions */
|
||||
void startLoadingFile(size_t size, char* filename, int rdbflags);
|
||||
|
|
|
|||
13
src/tsan.sup
13
src/tsan.sup
|
|
@ -6,3 +6,16 @@ signal:printCrashReport
|
|||
# TODO Investigate this race in jemalloc probably related to
|
||||
# https://github.com/jemalloc/jemalloc/issues/2621
|
||||
race:malloc_mutex_trylock_final
|
||||
|
||||
# A race can happen on conn->last_errno if replica client is reading/writing
|
||||
# data in IO thread and main thread is calling connAddrPeerName for some reason
|
||||
# (f.e genRedisInfoString/roleCommand...).
|
||||
# Not worth the additional code for synchronization as:
|
||||
# - errno is thread-safe according to POSIX std
|
||||
# - we don't support systems that allow word tearing, i.e last_errno value would
|
||||
# be a correct value at the end - either the errno from main or from IO thread
|
||||
# - even if we fix the data race on last_errno we still have the problem of it
|
||||
# being set to either errno unless we pause the IO thread during main-thread's
|
||||
# execution which would incur too big of a cost.
|
||||
# - the race happens rarely
|
||||
race:connSocketAddr
|
||||
|
|
|
|||
|
|
@ -35,7 +35,8 @@ proc get_one_of_my_replica {id} {
|
|||
|
||||
# To avoid -LOADING reply, wait until replica syncs with master.
|
||||
wait_for_condition 1000 50 {
|
||||
[RI $replica_id_num master_link_status] eq {up}
|
||||
[RI $replica_id_num master_link_status] eq {up} &&
|
||||
[R $replica_id_num dbsize] eq [R $id dbsize]
|
||||
} else {
|
||||
fail "Replica did not sync in time."
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ set ::dirs {} ; # We remove all the temp dirs at exit
|
|||
set ::run_matching {} ; # If non empty, only tests matching pattern are run.
|
||||
set ::stop_on_failure 0
|
||||
set ::loop 0
|
||||
set ::tsan 0
|
||||
|
||||
if {[catch {cd tmp}]} {
|
||||
puts "tmp directory not found."
|
||||
|
|
@ -310,6 +311,8 @@ proc parse_options {} {
|
|||
set ::log_req_res 1
|
||||
} elseif {$opt eq {--force-resp3}} {
|
||||
set ::force_resp3 1
|
||||
} elseif {$opt eq {--tsan}} {
|
||||
set ::tsan 1
|
||||
} elseif {$opt eq "--help"} {
|
||||
puts "--single <pattern> Only runs tests specified by pattern."
|
||||
puts "--dont-clean Keep log files on exit."
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ start_server {overrides {save {}}} {
|
|||
resume_process [srv -1 pid]
|
||||
|
||||
# Execute the failover
|
||||
$node_0 failover to $node_1_host $node_1_port
|
||||
assert_equal "OK" [$node_0 failover to $node_1_host $node_1_port]
|
||||
|
||||
# Wait for failover to end
|
||||
wait_for_condition 50 100 {
|
||||
|
|
@ -180,10 +180,19 @@ start_server {overrides {save {}}} {
|
|||
|
||||
assert_equal [count_log_message -2 "time out exceeded, failing over."] 1
|
||||
|
||||
# We should accept both psyncs, although this is the condition we might not
|
||||
# since we didn't catch up.
|
||||
assert_equal [expr [s 0 sync_partial_ok] - $initial_psyncs] 2
|
||||
assert_equal [expr [s 0 sync_full] - $initial_syncs] 0
|
||||
# We should accept both psyncs, although this is the condition we might
|
||||
# not meet since we didn't catch up. This happens often if TSan is
|
||||
# enabled as it slows down the execution time significantly.
|
||||
set psyncs [expr [s 0 sync_partial_ok] - $initial_psyncs]
|
||||
set full_syncs [expr [s 0 sync_full] - $initial_syncs]
|
||||
if {$::tsan} {
|
||||
assert_lessthan_equal $psyncs 2
|
||||
assert_morethan_equal $full_syncs 0
|
||||
assert_equal [expr $psyncs + $full_syncs] 2
|
||||
} else {
|
||||
assert_equal $psyncs 2
|
||||
assert_equal $full_syncs 0
|
||||
}
|
||||
assert_digests_match $node_0 $node_1 $node_2
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -164,8 +164,12 @@ start_server {} {
|
|||
assert {[status $replica sync_partial_ok] == 1}
|
||||
|
||||
set digest [$master debug digest]
|
||||
assert {$digest eq [$replica debug digest]}
|
||||
assert {$digest eq [$sub_replica debug digest]}
|
||||
wait_for_condition 10 100 {
|
||||
$digest eq [$replica debug digest] &&
|
||||
$digest eq [$sub_replica debug digest]
|
||||
} else {
|
||||
fail "Replica and sub-replica didn't sync after master restart in time..."
|
||||
}
|
||||
}
|
||||
|
||||
test "PSYNC2: Full resync after Master restart when too many key expired" {
|
||||
|
|
|
|||
|
|
@ -62,6 +62,16 @@ start_server {} {
|
|||
test "All replicas share one global replication buffer rdbchannel=$rdbchannel" {
|
||||
set before_used [s used_memory]
|
||||
populate 1024 "" 1024 ; # Write extra 1M data
|
||||
|
||||
# In case we are running with IO-threads we need to give a few cycles
|
||||
# for IO-threads to start sending the cmd stream. If we don't do that
|
||||
# the checks related to the repl_buf_mem will be incorrect as the buffer
|
||||
# will still be full with the above 1Mb data.
|
||||
set iothreads [s io_threads_active]
|
||||
if {$iothreads && $rdbchannel == "yes"} {
|
||||
after 1000
|
||||
}
|
||||
|
||||
# New data uses 1M memory, but all replicas use only one
|
||||
# replication buffer, so all replicas output memory is not
|
||||
# more than double of replication buffer.
|
||||
|
|
|
|||
331
tests/integration/replication-iothreads.tcl
Normal file
331
tests/integration/replication-iothreads.tcl
Normal file
|
|
@ -0,0 +1,331 @@
|
|||
#
|
||||
# Copyright (c) 2025-Present, Redis Ltd.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
# GNU Affero General Public License v3 (AGPLv3).
|
||||
#
|
||||
|
||||
# Tests for master and slave clients in IO threads
|
||||
|
||||
# Helper function to get master client IO thread from INFO replication
|
||||
proc get_master_client_io_thread {r} {
|
||||
return [status $r master_client_io_thread]
|
||||
}
|
||||
|
||||
# Helper function to get slave client IO thread from INFO replication
|
||||
proc get_slave_client_io_thread {r slave_idx} {
|
||||
set info [$r info replication]
|
||||
set lines [split $info "\r\n"]
|
||||
|
||||
foreach line $lines {
|
||||
if {[string match "slave${slave_idx}:*" $line]} {
|
||||
# Parse the slave line to extract io-thread value
|
||||
set parts [split $line ","]
|
||||
foreach part $parts {
|
||||
if {[string match "*io-thread=*" $part]} {
|
||||
set kv [split $part "="]
|
||||
assert_equal [llength $kv] 2
|
||||
return [lindex $kv 1]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
start_server {overrides {io-threads 4 save ""} tags {"iothreads repl network external:skip"}} {
|
||||
start_server {overrides {io-threads 4 save ""}} {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
set slave [srv -1 client]
|
||||
|
||||
test {Setup slave} {
|
||||
$slave slaveof $master_host $master_port
|
||||
wait_for_condition 1000 100 {
|
||||
[s -1 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
}
|
||||
|
||||
test {Master client moves to IO thread after sync complete} {
|
||||
# Check master client thread assignment (master client is on slave side)
|
||||
wait_for_condition 100 100 {
|
||||
[get_master_client_io_thread $slave] > 0
|
||||
} else {
|
||||
fail "Master client was not assigned to IO thread"
|
||||
}
|
||||
}
|
||||
|
||||
test {Slave client assignment to IO threads} {
|
||||
# Verify slave is connected and online
|
||||
wait_replica_online $master 0
|
||||
|
||||
# Slave client is connected - force a write so that it's assigned to an
|
||||
# IO thread.
|
||||
assert_equal "OK" [$master set x x]
|
||||
|
||||
# Check slave client thread assignment
|
||||
wait_for_condition 50 100 {
|
||||
[get_slave_client_io_thread $master 0] > 0
|
||||
} else {
|
||||
fail "Slave client was not assigned to IO thread"
|
||||
}
|
||||
}
|
||||
|
||||
test {WAIT command works with master/slave in IO threads} {
|
||||
# Test basic WAIT functionality
|
||||
$master set wait_test_key1 value1
|
||||
$master set wait_test_key2 value2
|
||||
$master incr wait_counter
|
||||
|
||||
assert {[$master wait 1 2000] == 1}
|
||||
|
||||
# Verify data reached slave
|
||||
wait_for_condition 10 100 {
|
||||
[$slave get wait_test_key1] eq "value1" &&
|
||||
[$slave get wait_test_key2] eq "value2" &&
|
||||
[$slave get wait_counter] eq "1"
|
||||
} else {
|
||||
fail "commands not propagated to IO thread slave in time"
|
||||
}
|
||||
}
|
||||
|
||||
test {Replication data integrity with IO threads} {
|
||||
# Generate significant replication traffic
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
$master set bulk_key_$i [string repeat "data" 10]
|
||||
$master lpush bulk_list element_$i
|
||||
$master zadd bulk_zset $i member_$i
|
||||
if {$i % 20 == 0} {
|
||||
# Periodically verify WAIT works
|
||||
assert {[$master wait 1 2000] == 1}
|
||||
}
|
||||
}
|
||||
|
||||
# Final verification
|
||||
wait_for_condition 50 100 {
|
||||
[$slave get bulk_key_99] eq [string repeat "data" 10] &&
|
||||
[$slave llen bulk_list] eq 100 &&
|
||||
[$slave zcard bulk_zset] eq 100
|
||||
} else {
|
||||
fail "Replication data integrity failed"
|
||||
}
|
||||
}
|
||||
|
||||
test {WAIT timeout behavior with slave in IO thread} {
|
||||
set slave_pid [srv -1 pid]
|
||||
|
||||
# Pause slave to test timeout
|
||||
pause_process $slave_pid
|
||||
|
||||
# Should timeout and return 0 acks
|
||||
$master set timeout_test_key timeout_value
|
||||
set start_time [clock milliseconds]
|
||||
assert {[$master wait 1 2000] == 0}
|
||||
set elapsed [expr {[clock milliseconds] - $start_time}]
|
||||
assert_range $elapsed 2000 2500
|
||||
|
||||
# Resume and verify recovery
|
||||
resume_process $slave_pid
|
||||
|
||||
assert {[$master wait 1 2000] == 1}
|
||||
|
||||
# Verify data reached slave after resume
|
||||
wait_for_condition 10 100 {
|
||||
[$slave get timeout_test_key] eq "timeout_value"
|
||||
} else {
|
||||
fail "commands not propagated to IO thread slave in time"
|
||||
}
|
||||
}
|
||||
|
||||
test {Network interruption recovery with IO threads} {
|
||||
# Generate traffic before interruption
|
||||
for {set i 0} {$i < 50} {incr i} {
|
||||
$master set pre_interrupt_$i value_$i
|
||||
}
|
||||
|
||||
# Simulate network interruption
|
||||
pause_process $slave_pid
|
||||
|
||||
# Continue writing during interruption
|
||||
for {set i 0} {$i < 50} {incr i} {
|
||||
$master set during_interrupt_$i value_$i
|
||||
}
|
||||
|
||||
# WAIT should timeout
|
||||
assert {[$master wait 1 2000] == 0}
|
||||
|
||||
# Resume slave and verify recovery
|
||||
resume_process $slave_pid
|
||||
|
||||
# Verify WAIT works again
|
||||
assert {[$master wait 1 2000] == 1}
|
||||
|
||||
# Wait for reconnection and catch up
|
||||
wait_for_condition 100 100 {
|
||||
[$slave get during_interrupt_49] eq "value_49"
|
||||
} else {
|
||||
fail "Slave didn't catch up after network recovery"
|
||||
}
|
||||
|
||||
$master set post_recovery_test recovery_value
|
||||
wait_for_condition 10 100 {
|
||||
[$slave get post_recovery_test] eq "recovery_value"
|
||||
} else {
|
||||
fail "Slave didn't receive 'set post_recovery_test' command"
|
||||
}
|
||||
|
||||
# Check thread assignments after recovery
|
||||
wait_for_condition 100 100 {
|
||||
[get_master_client_io_thread $slave] > 0
|
||||
} else {
|
||||
fail "Slave client not assigned to IO thread after recovery"
|
||||
}
|
||||
}
|
||||
|
||||
test {Replication reconnection cycles with IO threads} {
|
||||
# Test multiple disconnect/reconnect cycles
|
||||
for {set cycle 0} {$cycle < 3} {incr cycle} {
|
||||
# Generate traffic
|
||||
for {set i 0} {$i < 20} {incr i} {
|
||||
$master set cycle_${cycle}_key_$i value_$i
|
||||
}
|
||||
|
||||
assert {[$master wait 1 2000] == 1}
|
||||
|
||||
# Record thread assignments during cycle
|
||||
set master_thread [get_master_client_io_thread $slave]
|
||||
set slave_thread [get_slave_client_io_thread $master 0]
|
||||
puts "Cycle $cycle - Master thread: $master_thread, Slave thread: $slave_thread"
|
||||
|
||||
# Disconnect and reconnect (except last cycle)
|
||||
if {$cycle < 2} {
|
||||
$slave replicaof no one
|
||||
after 100
|
||||
$slave replicaof $master_host $master_port
|
||||
wait_for_sync $slave
|
||||
}
|
||||
}
|
||||
|
||||
# Verify final state
|
||||
wait_for_condition 10 100 {
|
||||
[$slave get cycle_2_key_19] eq "value_19"
|
||||
} else {
|
||||
fail "last command not propagated to IO thread slave in time"
|
||||
}
|
||||
}
|
||||
|
||||
test {INFO replication shows correct thread information} {
|
||||
# Test INFO replication output format
|
||||
set info [$master info replication]
|
||||
|
||||
# Should show master role
|
||||
assert_match "*role:master*" $info
|
||||
|
||||
# Should have slave thread information
|
||||
assert_match "*slave0:*io-thread=*" $info
|
||||
|
||||
# Test we can parse the thread ID
|
||||
set slave_thread [get_slave_client_io_thread $master 0]
|
||||
assert_morethan $slave_thread 0
|
||||
|
||||
# Test master client thread info
|
||||
set slave_info [$slave info replication]
|
||||
assert_match "*role:slave*" $slave_info
|
||||
assert_match "*master_client_io_thread:*" $slave_info
|
||||
|
||||
set master_thread [get_master_client_io_thread $slave]
|
||||
assert_morethan $master_thread 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {overrides {io-threads 4 save ""} tags {"iothreads repl network external:skip"}} {
|
||||
start_server {overrides {io-threads 4 save ""}} {
|
||||
start_server {overrides {io-threads 4 save ""}} {
|
||||
start_server {overrides {io-threads 4 save ""}} {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
set slave1 [srv -1 client]
|
||||
set slave2 [srv -2 client]
|
||||
set slave3 [srv -3 client]
|
||||
|
||||
test {Multiple slaves across IO threads} {
|
||||
# Setup replication for all slaves
|
||||
$slave1 replicaof $master_host $master_port
|
||||
$slave2 replicaof $master_host $master_port
|
||||
$slave3 replicaof $master_host $master_port
|
||||
|
||||
# Wait for all slaves to be online
|
||||
wait_replica_online $master 0
|
||||
wait_replica_online $master 1
|
||||
wait_replica_online $master 2
|
||||
|
||||
set iterations 5
|
||||
while {[incr iterations -1] >= 0} {
|
||||
# Slave clients are connected - force a write so that they are assigned
|
||||
# to IO threads.
|
||||
assert_equal "OK" [$master set x x]
|
||||
|
||||
wait_for_condition 10 100 {
|
||||
([get_slave_client_io_thread $master 0] > 0) &&
|
||||
([get_slave_client_io_thread $master 1] > 0) &&
|
||||
([get_slave_client_io_thread $master 2] > 0)
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
if {$iterations < 0} {
|
||||
fail "Replicas failed to be assigned to IO threads in time"
|
||||
}
|
||||
|
||||
# Test concurrent replication to all slaves
|
||||
for {set i 0} {$i < 200} {incr i} {
|
||||
$master set multi_key_$i value_$i
|
||||
if {$i % 50 == 0} {
|
||||
assert {[$master wait 3 2000] == 3}
|
||||
}
|
||||
}
|
||||
|
||||
# Final verification all slaves got data
|
||||
wait_for_condition 50 100 {
|
||||
[$slave1 get multi_key_199] eq "value_199" &&
|
||||
[$slave2 get multi_key_199] eq "value_199" &&
|
||||
[$slave3 get multi_key_199] eq "value_199"
|
||||
} else {
|
||||
fail "Multi-slave replication failed"
|
||||
}
|
||||
}
|
||||
|
||||
test {WAIT with multiple slaves in IO threads} {
|
||||
# Test various WAIT scenarios
|
||||
$master set wait_multi_test1 value1
|
||||
assert {[$master wait 3 2000] == 3}
|
||||
|
||||
$master set wait_multi_test2 value2
|
||||
assert {[$master wait 2 2000] >= 2}
|
||||
|
||||
$master set wait_multi_test3 value3
|
||||
assert {[$master wait 1 2000] >= 1}
|
||||
|
||||
# Verify all slaves have the data
|
||||
wait_for_condition 10 100 {
|
||||
[$slave1 get wait_multi_test3] eq "value3" &&
|
||||
[$slave2 get wait_multi_test3] eq "value3" &&
|
||||
[$slave3 get wait_multi_test3] eq "value3"
|
||||
} else {
|
||||
fail "commands not propagated to io thread slaves in time"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1247,7 +1247,7 @@ test {Kill rdb child process if its dumping RDB is not useful} {
|
|||
# Slave2 disconnect with master
|
||||
$slave2 slaveof no one
|
||||
# Should kill child
|
||||
wait_for_condition 100 10 {
|
||||
wait_for_condition 1000 10 {
|
||||
[s 0 rdb_bgsave_in_progress] eq 0
|
||||
} else {
|
||||
fail "can't kill rdb child"
|
||||
|
|
|
|||
|
|
@ -55,8 +55,13 @@ test "Master reboot in very short time" {
|
|||
kill_instance redis $master_id
|
||||
reboot_instance redis $master_id
|
||||
|
||||
set max_tries 1000
|
||||
if {$::tsan} {
|
||||
set max_tries 5000
|
||||
}
|
||||
|
||||
foreach_sentinel_id id {
|
||||
wait_for_condition 1000 100 {
|
||||
wait_for_condition $max_tries 100 {
|
||||
[lindex [S $id SENTINEL GET-MASTER-ADDR-BY-NAME mymaster] 1] != $old_port
|
||||
} else {
|
||||
fail "At least one Sentinel did not receive failover info"
|
||||
|
|
@ -104,4 +109,4 @@ test "The old master eventually gets reconfigured as a slave" {
|
|||
} else {
|
||||
fail "Old master not reconfigured as slave of new master"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -126,6 +126,10 @@ proc assert_refcount_morethan {key ref} {
|
|||
# max retries and delay between retries. Otherwise the 'elsescript' is
|
||||
# executed.
|
||||
proc wait_for_condition {maxtries delay e _else_ elsescript} {
|
||||
if {$_else_ ne "else"} {
|
||||
error "$_else_ must be equal to \"else\""
|
||||
}
|
||||
|
||||
while {[incr maxtries -1] >= 0} {
|
||||
set errcode [catch {uplevel 1 [list expr $e]} result]
|
||||
if {$errcode == 0} {
|
||||
|
|
@ -136,7 +140,7 @@ proc wait_for_condition {maxtries delay e _else_ elsescript} {
|
|||
after $delay
|
||||
}
|
||||
if {$maxtries == -1} {
|
||||
set errcode [catch [uplevel 1 $elsescript] result]
|
||||
set errcode [catch {uplevel 1 $elsescript} result]
|
||||
return -code $errcode $result
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -129,7 +129,14 @@ proc waitForBgrewriteaof r {
|
|||
}
|
||||
|
||||
proc wait_for_sync r {
|
||||
wait_for_condition 50 100 {
|
||||
set maxtries 50
|
||||
# tsan adds significant overhead to the execution time, so we increase the
|
||||
# wait time here JIC
|
||||
if {$::tsan} {
|
||||
set maxtries 100
|
||||
}
|
||||
|
||||
wait_for_condition $maxtries 100 {
|
||||
[status $r master_link_status] eq "up"
|
||||
} else {
|
||||
fail "replica didn't sync in time"
|
||||
|
|
@ -137,6 +144,12 @@ proc wait_for_sync r {
|
|||
}
|
||||
|
||||
proc wait_replica_online {r {replica_id 0} {maxtries 50} {delay 100}} {
|
||||
# tsan adds significant overhead to the execution time, so we increase the
|
||||
# wait time here JIC
|
||||
if {$::tsan} {
|
||||
set maxtries [expr {$maxtries * 2}]
|
||||
}
|
||||
|
||||
wait_for_condition $maxtries $delay {
|
||||
[string match "*slave$replica_id:*,state=online*" [$r info replication]]
|
||||
} else {
|
||||
|
|
@ -145,7 +158,13 @@ proc wait_replica_online {r {replica_id 0} {maxtries 50} {delay 100}} {
|
|||
}
|
||||
|
||||
proc wait_for_ofs_sync {r1 r2} {
|
||||
wait_for_condition 50 100 {
|
||||
set maxtries 50
|
||||
# tsan adds significant overhead to the execution time, so we increase the
|
||||
# wait time here JIC
|
||||
if {$::tsan} {
|
||||
set maxtries 100
|
||||
}
|
||||
wait_for_condition $maxtries 100 {
|
||||
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
|
||||
} else {
|
||||
fail "replica offset didn't match in time"
|
||||
|
|
@ -722,7 +741,23 @@ proc resume_process {pid} {
|
|||
puts [get_proc_job $pid]
|
||||
fail "process was not stopped"
|
||||
}
|
||||
exec kill -SIGCONT $pid
|
||||
|
||||
set max_attempts 10
|
||||
set attempt 0
|
||||
while {($attempt < $max_attempts) && [string match "T*" [exec ps -o state= -p $pid]]} {
|
||||
exec kill -SIGCONT $pid
|
||||
|
||||
incr attempt
|
||||
after 100
|
||||
}
|
||||
|
||||
wait_for_condition 50 1000 {
|
||||
[string match "R*" [exec ps -o state= -p $pid]] ||
|
||||
[string match "S*" [exec ps -o state= -p $pid]]
|
||||
} else {
|
||||
puts [exec ps j $pid]
|
||||
fail "process was not resumed"
|
||||
}
|
||||
}
|
||||
|
||||
proc cmdrstat {cmd r} {
|
||||
|
|
|
|||
|
|
@ -596,8 +596,13 @@ tags "modules external:skip" {
|
|||
|
||||
assert_equal [$master get k1] 1
|
||||
assert_equal [$master ttl k1] -1
|
||||
assert_equal [$replica get k1] 1
|
||||
assert_equal [$replica ttl k1] -1
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[$replica get k1] eq 1 &&
|
||||
[$replica ttl k1] eq -1
|
||||
} else {
|
||||
fail "failed RM_Call of expired key propagation"
|
||||
}
|
||||
}
|
||||
|
||||
test {module notification on set} {
|
||||
|
|
|
|||
Loading…
Reference in a new issue