Initial breakdown of struct client

This commit is contained in:
Yangbo Long 2026-05-03 22:30:15 -04:00
parent 2432f55278
commit ca6071a57a
17 changed files with 715 additions and 636 deletions

View file

@ -1976,7 +1976,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
/* Check for pattern violations. */
dictIterator di;
dictEntry *de;
dictInitIterator(&di, c->pubsub_patterns);
dictInitIterator(&di, c->pubsub_data->pubsub_patterns);
while (!kill && ((de = dictNext(&di)) != NULL)) {
o = dictGetKey(de);
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 1);
@ -1987,7 +1987,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
/* Check for channel violations. */
if (!kill) {
/* Check for global channels violation. */
dictInitIterator(&di, c->pubsub_channels);
dictInitIterator(&di, c->pubsub_data->pubsub_channels);
while (!kill && ((de = dictNext(&di)) != NULL)) {
o = dictGetKey(de);
@ -1998,7 +1998,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
}
if (!kill) {
/* Check for shard channels violation. */
dictInitIterator(&di, c->pubsubshard_channels);
dictInitIterator(&di, c->pubsub_data->pubsubshard_channels);
while (!kill && ((de = dictNext(&di)) != NULL)) {
o = dictGetKey(de);
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);

View file

@ -1471,7 +1471,8 @@ struct client *createAOFClient(void) {
/* We set the fake client as a slave waiting for the synchronization
* so that Redis will not try to send replies to this client. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
initClientReplicationData(c);
c->repl_data->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
return c;
}

View file

@ -59,13 +59,17 @@ static void moduleUnblockClientOnKey(client *c, robj *key);
static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key);
void initClientBlockingState(client *c) {
c->bstate.btype = BLOCKED_NONE;
c->bstate.timeout = 0;
c->bstate.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bstate.numreplicas = 0;
c->bstate.reploffset = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.async_rm_call_handle = NULL;
if (c->bstate) return;
c->bstate = zcalloc(sizeof(blockingState));
c->bstate->btype = BLOCKED_NONE;
c->bstate->keys = dictCreate(&objectKeyHeapPointerValueDictType);
}
void freeClientBlockingState(client *c) {
if (!c->bstate) return;
dictRelease(c->bstate->keys);
zfree(c->bstate);
c->bstate = NULL;
}
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
@ -80,7 +84,8 @@ void blockClient(client *c, int btype) {
btype != BLOCKED_POSTPONE_TRIM));
c->flags |= CLIENT_BLOCKED;
c->bstate.btype = btype;
initClientBlockingState(c);
c->bstate->btype = btype;
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
@ -182,21 +187,21 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
if (c->bstate->btype == BLOCKED_LIST ||
c->bstate->btype == BLOCKED_ZSET ||
c->bstate->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate->btype == BLOCKED_WAIT || c->bstate->btype == BLOCKED_WAITAOF) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
} else if (c->bstate->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c);
} else if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM) {
} else if (c->bstate->btype == BLOCKED_POSTPONE || c->bstate->btype == BLOCKED_POSTPONE_TRIM) {
listDelNode(server.postponed_clients,c->postponed_list_node);
c->postponed_list_node = NULL;
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
} else if (c->bstate->btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else if (c->bstate.btype == BLOCKED_LAZYFREE) {
} else if (c->bstate->btype == BLOCKED_LAZYFREE) {
/* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
@ -206,10 +211,10 @@ void unblockClient(client *c, int queue_for_reprocessing) {
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[c->bstate.btype]--;
server.blocked_clients_by_type[c->bstate->btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
c->bstate->btype = BLOCKED_NONE;
c->bstate->unblock_on_nokey = 0;
removeClientFromTimeoutTable(c);
if (queue_for_reprocessing) queueClientForReprocessing(c);
}
@ -217,15 +222,15 @@ void unblockClient(client *c, int queue_for_reprocessing) {
/* Check if the specified client can be safely timed out using
* unblockClientOnTimeout(). */
int blockedClientMayTimeout(client *c) {
if (c->bstate.btype == BLOCKED_MODULE) {
if (c->bstate->btype == BLOCKED_MODULE) {
return moduleBlockedClientMayTimeout(c);
}
if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM ||
c->bstate.btype == BLOCKED_WAIT ||
c->bstate.btype == BLOCKED_WAITAOF)
if (c->bstate->btype == BLOCKED_LIST ||
c->bstate->btype == BLOCKED_ZSET ||
c->bstate->btype == BLOCKED_STREAM ||
c->bstate->btype == BLOCKED_WAIT ||
c->bstate->btype == BLOCKED_WAITAOF)
{
return 1;
}
@ -236,24 +241,24 @@ int blockedClientMayTimeout(client *c) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LAZYFREE) {
if (c->bstate->btype == BLOCKED_LAZYFREE) {
/* SFLUSH: reply with empty array, FLUSH*: reply with OK */
if (c->cmd && c->cmd->proc == sflushCommand)
addReplyArrayLen(c, 0);
else
addReply(c, shared.ok); /* No reason lazy-free to fail */
} else if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
} else if (c->bstate->btype == BLOCKED_LIST ||
c->bstate->btype == BLOCKED_ZSET ||
c->bstate->btype == BLOCKED_STREAM) {
addReplyNullArray(c);
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bstate->reploffset));
} else if (c->bstate->btype == BLOCKED_WAITAOF) {
addReplyArrayLen(c,2);
addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
addReplyLongLong(c,server.fsynced_reploff >= c->bstate->reploffset);
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate->reploffset));
} else if (c->bstate->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
@ -269,7 +274,7 @@ void replyToClientsBlockedOnShutdown(void) {
listRewind(server.clients, &li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
if (c->flags & CLIENT_BLOCKED && c->bstate->btype == BLOCKED_SHUTDOWN) {
c->duration = 0;
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
unblockClient(c, 1);
@ -297,10 +302,10 @@ void disconnectAllBlockedClients(void) {
* command processing will start from scratch, and the command will
* be either executed or rejected. (unlike LIST blocked clients for
* which the command is already in progress in a way. */
if (c->bstate.btype == BLOCKED_POSTPONE || c->bstate.btype == BLOCKED_POSTPONE_TRIM)
if (c->bstate->btype == BLOCKED_POSTPONE || c->bstate->btype == BLOCKED_POSTPONE_TRIM)
continue;
if (c->bstate.btype == BLOCKED_LAZYFREE) {
if (c->bstate->btype == BLOCKED_LAZYFREE) {
/* SFLUSH: reply with empty array, FLUSH*: reply with OK */
if (c->cmd && c->cmd->proc == sflushCommand)
addReplyArrayLen(c, 0);
@ -398,15 +403,16 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;
initClientBlockingState(c);
if (!(c->flags & CLIENT_REEXECUTING_COMMAND)) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
c->bstate->timeout = timeout;
}
for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
if (!(client_blocked_entry = dictAddRaw(c->bstate->keys,keys[j],NULL))) {
continue;
}
incrRefCount(keys[j]);
@ -423,7 +429,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
l = dictGetVal(db_blocked_existing_entry);
}
listAddNodeTail(l,c);
dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));
dictSetVal(c->bstate->keys,client_blocked_entry,listLast(l));
/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
@ -437,7 +443,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
}
}
}
c->bstate.unblock_on_nokey = unblock_on_nokey;
c->bstate->unblock_on_nokey = unblock_on_nokey;
/* Currently we assume key blocking will require reprocessing the command.
* However in case of modules, they have a different way to handle the reprocessing
* which does not require setting the pending command flag */
@ -452,16 +458,16 @@ static void unblockClientWaitingData(client *c) {
dictEntry *de;
dictIterator di;
if (dictSize(c->bstate.keys) == 0)
if (dictSize(c->bstate->keys) == 0)
return;
dictInitIterator(&di, c->bstate.keys);
dictInitIterator(&di, c->bstate->keys);
/* The client may wait for multiple keys, so unblock it for every key. */
while((de = dictNext(&di)) != NULL) {
releaseBlockedEntry(c, de, 0);
}
dictResetIterator(&di);
dictEmpty(c->bstate.keys, NULL);
dictEmpty(c->bstate->keys, NULL);
}
static blocking_type getBlockedTypeByType(int type) {
@ -563,7 +569,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
if (listLength(l) == 0) {
dictDelete(c->db->blocking_keys, key);
dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
} else if (c->bstate.unblock_on_nokey) {
} else if (c->bstate->unblock_on_nokey) {
unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey,key);
/* it is not possible to have a client blocked on nokey with no matching entry */
serverAssertWithInfo(c,key,unblock_on_nokey_entry != NULL);
@ -573,7 +579,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
}
}
if (remove_key)
dictDelete(c->bstate.keys, key);
dictDelete(c->bstate->keys, key);
}
void signalKeyAsReady(redisDb *db, robj *key, int type) {
@ -612,11 +618,11 @@ static void handleClientsBlockedOnKey(readyList *rl) {
* module is trying to accomplish right now.
* 3. In case of XREADGROUP call we will want to unblock on any change in object type
* or in case the key was deleted, since the group is no longer valid. */
if ((o != NULL && (receiver->bstate.btype == getBlockedTypeByType(o->type))) ||
(o != NULL && (receiver->bstate.btype == BLOCKED_MODULE)) ||
(receiver->bstate.unblock_on_nokey))
if ((o != NULL && (receiver->bstate->btype == getBlockedTypeByType(o->type))) ||
(o != NULL && (receiver->bstate->btype == BLOCKED_MODULE)) ||
(receiver->bstate->unblock_on_nokey))
{
if (receiver->bstate.btype != BLOCKED_MODULE)
if (receiver->bstate->btype != BLOCKED_MODULE)
unblockClientOnKey(receiver, rl->key);
else
moduleUnblockClientOnKey(receiver, rl->key);
@ -627,19 +633,21 @@ static void handleClientsBlockedOnKey(readyList *rl) {
/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
initClientBlockingState(c);
c->bstate->timeout = timeout;
c->bstate->reploffset = offset;
c->bstate->numreplicas = numreplicas;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
}
/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
initClientBlockingState(c);
c->bstate->timeout = timeout;
c->bstate->reploffset = offset;
c->bstate->numreplicas = numreplicas;
c->bstate->numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAITAOF);
}
@ -649,7 +657,8 @@ void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numloca
* when the it is ready to accept them. */
void blockPostponeClientWithType(client *c, int btype) {
serverAssert(btype == BLOCKED_POSTPONE || btype == BLOCKED_POSTPONE_TRIM);
c->bstate.timeout = 0;
initClientBlockingState(c);
c->bstate->timeout = 0;
blockClient(c, btype);
listAddNodeTail(server.postponed_clients, c);
c->postponed_list_node = listLast(server.postponed_clients);
@ -674,14 +683,14 @@ void blockClientShutdown(client *c) {
static void unblockClientOnKey(client *c, robj *key) {
dictEntry *de;
de = dictFind(c->bstate.keys, key);
de = dictFind(c->bstate->keys, key);
releaseBlockedEntry(c, de, 1);
/* Only in case of blocking API calls, we might be blocked on several keys.
however we should force unblock the entire blocking keys */
serverAssert(c->bstate.btype == BLOCKED_STREAM ||
c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET);
serverAssert(c->bstate->btype == BLOCKED_STREAM ||
c->bstate->btype == BLOCKED_LIST ||
c->bstate->btype == BLOCKED_ZSET);
/* We need to unblock the client before calling processCommandAndResetClient
* because it checks the CLIENT_BLOCKED flag */
@ -745,7 +754,7 @@ static void moduleUnblockClientOnKey(client *c, robj *key) {
* command with timeout reply. */
void unblockClientOnTimeout(client *c) {
/* The client has been unlocked (in the moduleUnblocked list), return ASAP. */
if (c->bstate.btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;
if (c->bstate->btype == BLOCKED_MODULE && isModuleClientUnblocked(c)) return;
replyToBlockedClientTimedOut(c);
if (c->flags & CLIENT_PENDING_COMMAND)

View file

@ -1220,7 +1220,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!(c->flags & CLIENT_MULTI)) return myself;
ms = &c->mstate;
ms = c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
@ -1420,7 +1420,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* node is a slave and the request is about a hash slot our master
* is serving, we can reply without redirection. */
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
(c->cmd->proc == execCommand && (c->mstate->cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
!is_write_command &&
clusterNodeIsSlave(myself) &&
@ -1495,10 +1495,10 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
int clusterRedirectBlockedClientIfNeeded(client *c) {
clusterNode *myself = getMyClusterNode();
if (c->flags & CLIENT_BLOCKED &&
(c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM ||
c->bstate.btype == BLOCKED_MODULE))
(c->bstate->btype == BLOCKED_LIST ||
c->bstate->btype == BLOCKED_ZSET ||
c->bstate->btype == BLOCKED_STREAM ||
c->bstate->btype == BLOCKED_MODULE))
{
dictEntry *de;
dictIterator di;
@ -1514,11 +1514,11 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
/* If the client is blocked on module, but not on a specific key,
* don't unblock it (except for the CLUSTER_FAIL case above). */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
if (c->bstate->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
return 0;
/* All keys must belong to the same slot, so check first key only. */
dictInitIterator(&di, c->bstate.keys);
dictInitIterator(&di, c->bstate->keys);
if ((de = dictNext(&di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));

View file

@ -1696,7 +1696,7 @@ void asmSlotSnapshotAndStreamStart(struct asmTask *task) {
shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR);
return;
}
task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM;
task->main_channel_client->repl_data->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM;
task->state = ASM_SEND_BULK_AND_STREAM;
task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER;
@ -2020,7 +2020,7 @@ void clusterSyncSlotsCommand(client *c) {
* by the client output buffer settings for replicas. The replstate has
* no real significance, just to prevent it from going online. */
c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING);
c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL;
c->repl_data->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL;
if (server.repl_disable_tcp_nodelay)
connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */
listAddNodeTail(server.slaves, c);
@ -2079,10 +2079,10 @@ void clusterSyncSlotsCommand(client *c) {
/* Mark the client as a slave to generate slots snapshot */
c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING);
c->slave_capa |= SLAVE_CAPA_EOF;
c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL);
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->repldbfd = -1;
c->repl_data->slave_capa |= SLAVE_CAPA_EOF;
c->repl_data->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL);
c->repl_data->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->repl_data->repldbfd = -1;
if (server.repl_disable_tcp_nodelay)
connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */
listAddNodeTail(server.slaves, c);
@ -2099,7 +2099,7 @@ void clusterSyncSlotsCommand(client *c) {
if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa, c->slave_req);
startBgsaveForReplication(c->repl_data->slave_capa, c->repl_data->slave_req);
} else {
serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed");
}
@ -3008,7 +3008,7 @@ static void propagateTrimSlots(slotRangeArray *slots) {
void asmUnblockMasterAfterTrim(void) {
if (server.master &&
server.master->flags & CLIENT_BLOCKED &&
server.master->bstate.btype == BLOCKED_POSTPONE_TRIM)
server.master->bstate->btype == BLOCKED_POSTPONE_TRIM)
{
unblockClient(server.master, 1);
serverLog(LL_NOTICE, "Unblocking master client after active trim is completed");

View file

@ -1245,10 +1245,11 @@ void flushAllDataAndResetRDB(int flags) {
/* Block client for blocking ASYNC FLUSH operation (FLUSH*, SFLUSH). */
void blockClientForAsyncFlush(client *c) {
initClientBlockingState(c);
/* measure bg job till completion as elapsed time of flush command */
elapsedStart(&c->bstate.lazyfreeStartTime);
elapsedStart(&c->bstate->lazyfreeStartTime);
c->bstate.timeout = 0;
c->bstate->timeout = 0;
/* We still need to perform cleanup operations for the command, including
* updating the replication offset, so mark this command as pending to
* avoid command from being reset during unblock. */
@ -1299,7 +1300,7 @@ void unblockClientForAsyncFlush(uint64_t client_id, struct slotRangeArray *slots
server.current_client = c;
/* Don't update blocked_us since command was processed in bg by lazy_free thread */
updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0);
updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate->lazyfreeStartTime), 0);
/* Only SFLUSH command pass user data pointer. */
if (slots)

View file

@ -52,28 +52,28 @@ void updateClientDataFromIOThread(client *c) {
serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
c->running_tid == IOTHREAD_MAIN_THREAD_ID);
if (c->io_repl_ack_time > c->repl_ack_time) {
if (c->repl_data->io_repl_ack_time > c->repl_data->repl_ack_time) {
serverAssert(c->flags & CLIENT_SLAVE);
c->repl_ack_time = c->io_repl_ack_time;
c->repl_data->repl_ack_time = c->repl_data->io_repl_ack_time;
}
if (c->io_lastinteraction > c->lastinteraction) {
serverAssert(c->flags & CLIENT_MASTER);
c->lastinteraction = c->io_lastinteraction;
}
if (c->io_read_reploff > c->read_reploff) {
if (c->repl_data->io_read_reploff > c->repl_data->read_reploff) {
serverAssert(c->flags & CLIENT_MASTER);
c->read_reploff = c->io_read_reploff;
c->repl_data->read_reploff = c->repl_data->io_read_reploff;
}
/* Update replication buffer referenced node if IO thread has sent some data. */
if (c->flags & CLIENT_SLAVE && c->ref_repl_buf_node != NULL &&
(c->io_curr_repl_node != c->ref_repl_buf_node ||
c->io_curr_block_pos != c->ref_block_pos))
if (c->flags & CLIENT_SLAVE && c->repl_data->ref_repl_buf_node != NULL &&
(c->repl_data->io_curr_repl_node != c->repl_data->ref_repl_buf_node ||
c->repl_data->io_curr_block_pos != c->repl_data->ref_block_pos))
{
((replBufBlock*)listNodeValue(c->ref_repl_buf_node))->refcount--;
((replBufBlock*)listNodeValue(c->io_curr_repl_node))->refcount++;
c->ref_block_pos = c->io_curr_block_pos;
c->ref_repl_buf_node = c->io_curr_repl_node;
((replBufBlock*)listNodeValue(c->repl_data->ref_repl_buf_node))->refcount--;
((replBufBlock*)listNodeValue(c->repl_data->io_curr_repl_node))->refcount++;
c->repl_data->ref_block_pos = c->repl_data->io_curr_block_pos;
c->repl_data->ref_repl_buf_node = c->repl_data->io_curr_repl_node;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
}
@ -136,16 +136,16 @@ void enqueuePendingClienstToIOThreads(client *c) {
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
}
if (c->flags & CLIENT_SLAVE) {
serverAssert(c->ref_repl_buf_node != NULL);
serverAssert(c->repl_data->ref_repl_buf_node != NULL);
c->io_repl_ack_time = c->repl_ack_time;
c->io_curr_repl_node = c->ref_repl_buf_node;
c->io_curr_block_pos = c->ref_block_pos;
c->io_bound_repl_node = listLast(server.repl_buffer_blocks);
c->io_bound_block_pos = ((replBufBlock*)listNodeValue(c->io_bound_repl_node))->used;
c->repl_data->io_repl_ack_time = c->repl_data->repl_ack_time;
c->repl_data->io_curr_repl_node = c->repl_data->ref_repl_buf_node;
c->repl_data->io_curr_block_pos = c->repl_data->ref_block_pos;
c->repl_data->io_bound_repl_node = listLast(server.repl_buffer_blocks);
c->repl_data->io_bound_block_pos = ((replBufBlock*)listNodeValue(c->repl_data->io_bound_repl_node))->used;
}
if (c->flags & CLIENT_MASTER) {
c->io_read_reploff = c->read_reploff;
c->repl_data->io_read_reploff = c->repl_data->read_reploff;
c->io_lastinteraction = c->lastinteraction;
}
@ -265,10 +265,10 @@ int isClientMustHandledByMainThread(client *c) {
* to prevent race conditions with main thread when it feeds the replication
* buffer. */
if (c->flags & CLIENT_SLAVE &&
(c->replstate == SLAVE_STATE_ONLINE ||
c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
c->repl_start_cmd_stream_on_ack == 0 &&
c->ref_repl_buf_node != NULL)
(c->repl_data->replstate == SLAVE_STATE_ONLINE ||
c->repl_data->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
c->repl_data->repl_start_cmd_stream_on_ack == 0 &&
c->repl_data->ref_repl_buf_node != NULL)
{
return 0;
}

View file

@ -647,6 +647,18 @@ void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) {
* Helpers for modules API implementation
* -------------------------------------------------------------------------- */
void initClientModuleData(client *c) {
if (c->module_data) return;
c->module_data = zcalloc(sizeof(ClientModuleData));
}
void freeClientModuleData(client *c) {
if (!c->module_data) return;
zfree(c->module_data->module_blocked_client);
zfree(c->module_data);
c->module_data = NULL;
}
client *moduleAllocTempClient(void) {
client *c = NULL;
@ -687,11 +699,11 @@ void moduleReleaseTempClient(client *c) {
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = NULL;
if (c->bstate.async_rm_call_handle) {
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
if (c->bstate && c->bstate->async_rm_call_handle) {
RedisModuleAsyncRMCallPromise *promise = c->bstate->async_rm_call_handle;
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
freeRedisModuleAsyncRMCallPromise(promise);
c->bstate.async_rm_call_handle = NULL;
c->bstate->async_rm_call_handle = NULL;
}
moduleTempClients[moduleTempClientCount++] = c;
}
@ -894,7 +906,7 @@ static CallReply *moduleParseReply(client *c, RedisModuleCtx *ctx) {
void moduleCallCommandUnblockedHandler(client *c) {
RedisModuleCtx ctx;
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
RedisModuleAsyncRMCallPromise *promise = c->bstate->async_rm_call_handle;
serverAssert(promise);
RedisModule *module = promise->module;
if (!promise->on_unblocked) {
@ -7132,7 +7144,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
.ctx = (ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) ? ctx : NULL,
};
reply = callReplyCreatePromise(promise);
c->bstate.async_rm_call_handle = promise;
c->bstate->async_rm_call_handle = promise;
if (!(call_flags & CMD_CALL_PROPAGATE_AOF)) {
/* No need for AOF propagation, set the relevant flags of the client */
c->flags |= CLIENT_MODULE_PREVENT_AOF_PROP;
@ -8256,7 +8268,7 @@ void RM_LatencyAddSample(const char *event, mstime_t latency) {
/* Returns 1 if the client already in the moduleUnblocked list, 0 otherwise. */
int isModuleClientUnblocked(client *c) {
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
return bc->unblocked == 1;
}
@ -8274,7 +8286,7 @@ int isModuleClientUnblocked(client *c) {
* The structure RedisModuleBlockedClient will be always deallocated when
* running the list of clients blocked by a module that need to be unblocked. */
void unblockClientFromModule(client *c) {
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
/* Call the disconnection callback if any. Note that
* bc->disconnect_callback is set to NULL if the client gets disconnected
@ -8339,8 +8351,9 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
int islua = scriptIsRunning();
int ismulti = server.in_exec;
c->bstate.module_blocked_handle = zcalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
initClientBlockingState(c);
c->bstate->module_blocked_handle = zcalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
ctx->module->blocked_clients++;
/* We need to handle the invalid operation of calling modules blocking
@ -8369,7 +8382,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
if (timeout_ms) {
mstime_t now = mstime();
if (timeout_ms > LLONG_MAX - now) {
c->bstate.module_blocked_handle = NULL;
c->bstate->module_blocked_handle = NULL;
addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */
return bc;
}
@ -8377,21 +8390,21 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
}
if (islua || ismulti) {
c->bstate.module_blocked_handle = NULL;
c->bstate->module_blocked_handle = NULL;
addReplyError(c, islua ?
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else if (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) {
c->bstate.module_blocked_handle = NULL;
c->bstate->module_blocked_handle = NULL;
addReplyError(c, "Blocking module command called from a Reply callback context");
} else if (!auth_reply_callback && clientHasModuleAuthInProgress(c)) {
c->bstate.module_blocked_handle = NULL;
c->bstate->module_blocked_handle = NULL;
addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth");
} else {
if (keys) {
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
} else {
c->bstate.timeout = timeout;
c->bstate->timeout = timeout;
blockClient(c,BLOCKED_MODULE);
}
}
@ -8488,7 +8501,7 @@ void moduleUnregisterAuthCBs(RedisModule *module) {
/* Search for & attempt next module auth callback after skipping the ones already attempted.
* Returns the result of the module auth callback. */
int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) {
int handle_next_callback = c->module_auth_ctx == NULL;
int handle_next_callback = (!c->module_data || c->module_data->module_auth_ctx == NULL);
RedisModuleAuthCtx *cur_auth_ctx = NULL;
listNode *ln;
listIter li;
@ -8498,7 +8511,7 @@ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) {
cur_auth_ctx = listNodeValue(ln);
/* Skip over the previously attempted auth contexts. */
if (!handle_next_callback) {
handle_next_callback = cur_auth_ctx == c->module_auth_ctx;
handle_next_callback = cur_auth_ctx == c->module_data->module_auth_ctx;
continue;
}
/* Remove the module auth complete flag before we attempt the next cb. */
@ -8507,7 +8520,8 @@ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) {
moduleCreateContext(&ctx, cur_auth_ctx->module, REDISMODULE_CTX_NONE);
ctx.client = c;
*err = NULL;
c->module_auth_ctx = cur_auth_ctx;
initClientModuleData(c);
c->module_data->module_auth_ctx = cur_auth_ctx;
result = cur_auth_ctx->auth_cb(&ctx, username, password, err);
moduleFreeContext(&ctx);
if (result == REDISMODULE_AUTH_HANDLED) break;
@ -8523,8 +8537,8 @@ int attemptNextAuthCb(client *c, robj *username, robj *password, robj **err) {
* return the result of the reply callback. */
int attemptBlockedAuthReplyCallback(client *c, robj *username, robj *password, robj **err) {
int result = REDISMODULE_AUTH_NOT_HANDLED;
if (!c->module_blocked_client) return result;
RedisModuleBlockedClient *bc = (RedisModuleBlockedClient *) c->module_blocked_client;
if (!c->module_data || !c->module_data->module_blocked_client) return result;
RedisModuleBlockedClient *bc = (RedisModuleBlockedClient *) c->module_data->module_blocked_client;
bc->client = c;
if (bc->auth_reply_cb) {
RedisModuleCtx ctx;
@ -8537,7 +8551,7 @@ int attemptBlockedAuthReplyCallback(client *c, robj *username, robj *password, r
moduleFreeContext(&ctx);
}
moduleInvokeFreePrivDataCallback(c, bc);
c->module_blocked_client = NULL;
c->module_data->module_blocked_client = NULL;
c->lastcmd->microseconds += bc->background_duration;
bc->module->blocked_clients--;
zfree(bc);
@ -8565,7 +8579,7 @@ int checkModuleAuthentication(client *c, robj *username, robj *password, robj **
serverAssert(result == REDISMODULE_AUTH_HANDLED);
return AUTH_BLOCKED;
}
c->module_auth_ctx = NULL;
c->module_data->module_auth_ctx = NULL;
if (result == REDISMODULE_AUTH_NOT_HANDLED) {
c->flags &= ~CLIENT_MODULE_AUTH_HAS_RESULT;
return AUTH_NOT_HANDLED;
@ -8586,7 +8600,7 @@ int checkModuleAuthentication(client *c, robj *username, robj *password, robj **
* This function returns 1 if client was served (and should be unblocked) */
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
int served = 0;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
/* Protect against re-processing: don't serve clients that are already
* in the unblocking list for any reason (including RM_UnblockClient()
@ -8782,14 +8796,14 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
/* This API is used by the Redis core to unblock a client that was blocked
* by a module. */
void moduleUnblockClient(client *c) {
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
moduleUnblockClientByHandle(bc,NULL);
}
/* Return true if the client 'c' was blocked by a module using
* RM_BlockClientOnKeys(). */
int moduleClientIsBlockedOnKeys(client *c) {
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
return bc->blocked_on_keys;
}
@ -8904,7 +8918,7 @@ void moduleHandleBlockedClients(void) {
/* Hold onto the blocked client if module auth is in progress. The reply callback is invoked
* when the client is reprocessed. */
if (c && clientHasModuleAuthInProgress(c)) {
c->module_blocked_client = bc;
c->module_data->module_blocked_client = bc;
} else {
/* Free privdata if any. */
moduleInvokeFreePrivDataCallback(c, bc);
@ -8969,10 +8983,10 @@ void moduleHandleBlockedClients(void) {
* moduleBlockedClientTimedOut().
*/
int moduleBlockedClientMayTimeout(client *c) {
if (c->bstate.btype != BLOCKED_MODULE)
if (c->bstate->btype != BLOCKED_MODULE)
return 1;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
return (bc && bc->timeout_callback != NULL);
}
@ -8985,7 +8999,7 @@ int moduleBlockedClientMayTimeout(client *c) {
* of the client synchronously. This ensures that we can reply to the client before
* resetClient() is called. */
void moduleBlockedClientTimedOut(client *c) {
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate->module_blocked_handle;
RedisModuleCtx ctx;
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT);
@ -10395,16 +10409,16 @@ static void eventLoopHandleOneShotEvents(void) {
* A client's user can be changed through the AUTH command, module
* authentication, and when a client is freed. */
void moduleNotifyUserChanged(client *c) {
if (c->auth_callback) {
c->auth_callback(c->id, c->auth_callback_privdata);
if (!c->module_data || !c->module_data->auth_callback) return;
/* The callback will fire exactly once, even if the user remains
* the same. It is expected to completely clean up the state
* so all references are cleared here. */
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
}
c->module_data->auth_callback(c->id, c->module_data->auth_callback_privdata);
/* The callback will fire exactly once, even if the user remains
* the same. It is expected to completely clean up the state
* so all references are cleared here. */
c->module_data->auth_callback = NULL;
c->module_data->auth_callback_privdata = NULL;
c->module_data->auth_module = NULL;
}
void revokeClientAuthentication(client *c) {
@ -10427,9 +10441,9 @@ static void moduleFreeAuthenticatedClients(RedisModule *module) {
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (!c->auth_module) continue;
if (!c->module_data || !c->module_data->auth_module) continue;
RedisModule *auth_module = (RedisModule *) c->auth_module;
RedisModule *auth_module = (RedisModule *) c->module_data->auth_module;
if (auth_module == module) {
revokeClientAuthentication(c);
}
@ -10792,9 +10806,10 @@ static int authenticateClientWithUser(RedisModuleCtx *ctx, user *user, RedisModu
}
if (callback) {
ctx->client->auth_callback = callback;
ctx->client->auth_callback_privdata = privdata;
ctx->client->auth_module = ctx->module;
initClientModuleData(ctx->client);
ctx->client->module_data->auth_callback = callback;
ctx->client->module_data->auth_callback_privdata = privdata;
ctx->client->module_data->auth_module = ctx->module;
}
if (client_id) {

View file

@ -14,21 +14,20 @@
/* Client state initialization for MULTI/EXEC */
void initClientMultiState(client *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
c->mstate.cmd_flags = 0;
c->mstate.cmd_inv_flags = 0;
c->mstate.argv_len_sums = 0;
c->mstate.alloc_count = 0;
c->mstate.executing_cmd = -1;
if (c->mstate) return;
c->mstate = zcalloc(sizeof(multiState));
c->mstate->executing_cmd = -1;
}
/* Release all the resources associated with MULTI/EXEC state */
void freeClientMultiState(client *c) {
for (int i = 0; i < c->mstate.count; i++) {
freePendingCommand(c, c->mstate.commands[i]);
if (!c->mstate) return;
for (int i = 0; i < c->mstate->count; i++) {
freePendingCommand(c, c->mstate->commands[i]);
}
zfree(c->mstate.commands);
zfree(c->mstate->commands);
zfree(c->mstate);
c->mstate = NULL;
}
/* Add a new command into the MULTI commands queue */
@ -39,15 +38,16 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
* aborted. */
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
return;
if (c->mstate.count == 0) {
if (!c->mstate) initClientMultiState(c);
if (c->mstate->count == 0) {
/* If a client is using multi/exec, assuming it is used to execute at least
* two commands. Hence, creating by default size of 2. */
c->mstate.commands = zmalloc(sizeof(pendingCommand*)*2);
c->mstate.alloc_count = 2;
c->mstate->commands = zmalloc(sizeof(pendingCommand*)*2);
c->mstate->alloc_count = 2;
}
if (c->mstate.count == c->mstate.alloc_count) {
c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
c->mstate.commands = zrealloc(c->mstate.commands, sizeof(pendingCommand*)*(c->mstate.alloc_count));
if (c->mstate->count == c->mstate->alloc_count) {
c->mstate->alloc_count = c->mstate->alloc_count < INT_MAX/2 ? c->mstate->alloc_count*2 : INT_MAX;
c->mstate->commands = zrealloc(c->mstate->commands, sizeof(pendingCommand*)*(c->mstate->alloc_count));
}
/* Move the pending command into the multi-state.
@ -55,13 +55,13 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
* later, but set the value to NULL to indicate it has been moved out and should not be freed. */
pendingCommand *pcmd = popPendingCommandFromHead(&c->pending_cmds);
c->current_pending_cmd = NULL;
pendingCommand **mc = c->mstate.commands + c->mstate.count;
pendingCommand **mc = c->mstate->commands + c->mstate->count;
*mc = pcmd;
c->mstate.count++;
c->mstate.cmd_flags |= cmd_flags;
c->mstate.cmd_inv_flags |= ~cmd_flags;
c->mstate.argv_len_sums += (*mc)->argv_len_sum;
c->mstate->count++;
c->mstate->cmd_flags |= cmd_flags;
c->mstate->cmd_inv_flags |= ~cmd_flags;
c->mstate->argv_len_sums += (*mc)->argv_len_sum;
c->all_argv_len_sum -= (*mc)->argv_len_sum;
(*mc)->argv_len_sum = 0; /* This is no longer tracked through all_argv_len_sum, so we don't want */
@ -93,6 +93,7 @@ void multiCommand(client *c) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
if (!c->mstate) initClientMultiState(c);
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
@ -177,17 +178,17 @@ void execCommand(client *c) {
* Otherwise, we get inconsistencies and all_argv_len_sum doesn't go back to exactly 0 when the client is finished */
orig_all_argv_len_sum = c->all_argv_len_sum;
c->all_argv_len_sum = c->mstate.argv_len_sums;
c->all_argv_len_sum = c->mstate->argv_len_sums;
/* Skip ACL check for the AOF client while server loading. */
int skip_acl_check = server.loading && c->id == CLIENT_ID_AOF;
addReplyArrayLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j]->argc;
c->argv = c->mstate.commands[j]->argv;
c->argv_len = c->mstate.commands[j]->argv_len;
c->cmd = c->realcmd = c->mstate.commands[j]->cmd;
addReplyArrayLen(c,c->mstate->count);
for (j = 0; j < c->mstate->count; j++) {
c->argc = c->mstate->commands[j]->argc;
c->argv = c->mstate->commands[j]->argv;
c->argv_len = c->mstate->commands[j]->argv_len;
c->cmd = c->realcmd = c->mstate->commands[j]->cmd;
/* ACL permissions are also checked at the time of execution in case
* they were changed after the commands were queued. */
@ -220,7 +221,7 @@ void execCommand(client *c) {
"This command is no longer allowed for the "
"following reason: %s", reason);
} else {
c->mstate.executing_cmd = j;
c->mstate->executing_cmd = j;
if (c->id == CLIENT_ID_AOF)
call(c,CMD_CALL_NONE);
else
@ -230,10 +231,10 @@ void execCommand(client *c) {
}
/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j]->argc = c->argc;
c->mstate.commands[j]->argv = c->argv;
c->mstate.commands[j]->argv_len = c->argv_len;
c->mstate.commands[j]->cmd = c->cmd;
c->mstate->commands[j]->argc = c->argc;
c->mstate->commands[j]->argv = c->argv;
c->mstate->commands[j]->argv_len = c->argv_len;
c->mstate->commands[j]->cmd = c->cmd;
}
// restore old DENY_BLOCKING value
@ -500,10 +501,12 @@ void unwatchCommand(client *c) {
}
size_t multiStateMemOverhead(client *c) {
size_t mem = c->mstate.argv_len_sums;
/* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
/* Add watched keys overhead. Note: this doesn't take into account the watched keys themselves,
* because they aren't managed per-client. */
size_t mem = listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
if (!c->mstate) return mem;
mem += c->mstate->argv_len_sums;
/* Reserved memory for queued multi commands. */
mem += c->mstate.alloc_count * sizeof(pendingCommand);
mem += c->mstate->alloc_count * sizeof(pendingCommand);
return mem;
}

View file

@ -155,12 +155,7 @@ client *createClient(connection *conn) {
c->buf_peak_last_reset_time = server.unixtime;
c->buf_encoded = 0;
c->last_header = NULL;
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->io_curr_repl_node = NULL;
c->io_curr_block_pos = 0;
c->io_bound_repl_node = NULL;
c->io_bound_block_pos = 0;
c->repl_data = NULL;
c->qb_pos = 0;
c->querybuf = NULL;
c->querybuf_peak = 0;
@ -193,51 +188,26 @@ client *createClient(connection *conn) {
c->io_lastinteraction = 0;
c->duration = 0;
clientSetDefaultAuth(c);
c->replstate = REPL_STATE_NONE;
c->repl_start_cmd_stream_on_ack = 0;
c->reploff = 0;
c->reploff_next = 0;
c->read_reploff = 0;
c->io_read_reploff = 0;
c->repl_applied = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->io_repl_ack_time = 0;
c->repl_aof_off = 0;
c->repl_last_partial_write = 0;
c->slave_listening_port = 0;
c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->main_ch_client_id = 0;
c->reply = listCreate();
c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
initClientBlockingState(c);
c->woff = 0;
c->bstate = NULL;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_patterns = dictCreate(&objectKeyPointerValueDictType);
c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_data = NULL;
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
c->io_thread_client_list_node = NULL;
c->postponed_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
c->io_last_client_cron = 0;
c->io_last_repl_cron = 0;
c->last_memory_usage = 0;
c->last_memory_type = CLIENT_TYPE_NORMAL;
c->module_blocked_client = NULL;
c->module_auth_ctx = NULL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
c->module_data = NULL;
listInitNode(&c->clients_pending_write_node, c);
listInitNode(&c->pending_ref_reply_node, c);
c->mem_usage_bucket = NULL;
@ -245,7 +215,7 @@ client *createClient(connection *conn) {
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes_curr_cmd = 0;
if (conn) linkClient(c);
initClientMultiState(c);
c->mstate = NULL;
c->net_input_bytes = 0;
c->net_output_bytes = 0;
c->commands_processed = 0;
@ -288,9 +258,9 @@ void putClientInPendingWriteQueue(client *c) {
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
(!c->repl_data ||
c->repl_data->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM ||
(c->repl_data->replstate == SLAVE_STATE_ONLINE && !c->repl_data->repl_start_cmd_stream_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
@ -1489,10 +1459,10 @@ void copyReplicaOutputBuffer(client *dst, client *src) {
serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
serverAssert(src->running_tid == IOTHREAD_MAIN_THREAD_ID &&
dst->running_tid == IOTHREAD_MAIN_THREAD_ID);
if (src->ref_repl_buf_node == NULL) return;
dst->ref_repl_buf_node = src->ref_repl_buf_node;
dst->ref_block_pos = src->ref_block_pos;
((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++;
if (src->repl_data->ref_repl_buf_node == NULL) return;
dst->repl_data->ref_repl_buf_node = src->repl_data->ref_repl_buf_node;
dst->repl_data->ref_block_pos = src->repl_data->ref_block_pos;
((replBufBlock *)listNodeValue(dst->repl_data->ref_repl_buf_node))->refcount++;
}
static inline int _clientHasPendingRepliesNonSlave(client *c) {
@ -1503,18 +1473,18 @@ static inline int _clientHasPendingRepliesSlave(client *c) {
/* Replicas use global shared replication buffer instead of
* private output buffer. */
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
if (c->ref_repl_buf_node == NULL) return 0;
if (c->repl_data->ref_repl_buf_node == NULL) return 0;
/* If the last replication buffer block content is totally sent,
* we have nothing to send. */
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) {
listNode *ln = listLast(server.repl_buffer_blocks);
replBufBlock *tail = listNodeValue(ln);
if (ln == c->ref_repl_buf_node &&
c->ref_block_pos == tail->used) return 0;
if (ln == c->repl_data->ref_repl_buf_node &&
c->repl_data->ref_block_pos == tail->used) return 0;
} else {
if (c->io_bound_repl_node == c->io_curr_repl_node &&
c->io_bound_block_pos == c->io_curr_block_pos) return 0;
if (c->repl_data->io_bound_repl_node == c->repl_data->io_curr_repl_node &&
c->repl_data->io_bound_block_pos == c->repl_data->io_curr_block_pos) return 0;
}
return 1;
}
@ -1827,7 +1797,7 @@ int anyOtherSlaveWaitRdb(client *except_me) {
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave != except_me &&
slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
slave->repl_data->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
{
return 1;
}
@ -1859,7 +1829,7 @@ void unlinkClient(client *c) {
/* Check if this is a replica waiting for diskless replication (rdb pipe),
* in which case it needs to be cleaned from that list */
if (c->flags & CLIENT_SLAVE &&
c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
c->repl_data->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_pipe_conns)
{
int i;
@ -1955,10 +1925,12 @@ void clearClientConnectionState(client *c) {
moduleNotifyUserChanged(c);
discardTransaction(c);
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
unmarkClientAsPubSub(c);
if (c->pubsub_data) {
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
unmarkClientAsPubSub(c);
}
if (c->name) {
decrRefCount(c->name);
@ -2086,8 +2058,8 @@ void freeClient(client *c) {
/* Notify module system that this client auth status changed. */
moduleNotifyUserChanged(c);
/* Free the RedisModuleBlockedClient held onto for reprocessing if not already freed. */
zfree(c->module_blocked_client);
/* Free module data (including RedisModuleBlockedClient held onto for reprocessing). */
freeClientModuleData(c);
/* If this client was scheduled for async freeing we need to remove it
* from the queue. Note that we need to do this here, because later
@ -2131,20 +2103,20 @@ void freeClient(client *c) {
/* If there is any in-flight command, we don't record their duration. */
c->duration = 0;
if (c->flags & CLIENT_BLOCKED) unblockClient(c, 1);
dictRelease(c->bstate.keys);
freeClientBlockingState(c);
/* UNWATCH all the keys */
unwatchAllKeys(c);
listRelease(c->watched_keys);
/* Unsubscribe from all the pubsub channels */
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
unmarkClientAsPubSub(c);
dictRelease(c->pubsub_channels);
dictRelease(c->pubsub_patterns);
dictRelease(c->pubsubshard_channels);
if (c->pubsub_data) {
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
unmarkClientAsPubSub(c);
}
freeClientPubSubData(c);
/* Free data structures. */
releaseAllBufReferences(c); /* Release all references to string objects in encoded buffers before freeing */
@ -2189,16 +2161,18 @@ void freeClient(client *c) {
* should not remove directly since that means RDB is important for users
* to keep data safe and we may delay configured 'save' for full sync. */
if (server.saveparamslen == 0 &&
c->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
c->repl_data->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.child_type == CHILD_TYPE_RDB &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK &&
anyOtherSlaveWaitRdb(c) == 0)
{
killRDBChild();
}
if (c->replstate == SLAVE_STATE_SEND_BULK) {
if (c->repldbfd != -1) close(c->repldbfd);
if (c->replpreamble) sdsfree(c->replpreamble);
if (c->repl_data->replstate == SLAVE_STATE_SEND_BULK) {
if (c->repl_data->repldbfd != -1) close(c->repl_data->repldbfd);
c->repl_data->repldbfd = -1;
if (c->repl_data->replpreamble) sdsfree(c->repl_data->replpreamble);
c->repl_data->replpreamble = NULL;
}
list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
@ -2211,7 +2185,7 @@ void freeClient(client *c) {
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
if (c->replstate == SLAVE_STATE_ONLINE)
if (c->repl_data->replstate == SLAVE_STATE_ONLINE)
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE,
NULL);
@ -2235,7 +2209,7 @@ void freeClient(client *c) {
serverAssert(c->all_argv_len_sum == 0);
sdsfree(c->peerid);
sdsfree(c->sockname);
sdsfree(c->slave_addr);
freeClientReplicationData(c);
sdsfree(c->node_id);
zfree(c);
}
@ -2626,40 +2600,40 @@ static inline int _writeToClientSlave(client *c, ssize_t *nwritten) {
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
replBufBlock *o = listNodeValue(c->io_curr_repl_node);
replBufBlock *o = listNodeValue(c->repl_data->io_curr_repl_node);
/* The IO thread must not send data beyond the bound position. */
size_t pos = c->io_curr_repl_node == c->io_bound_repl_node ?
c->io_bound_block_pos : o->used;
if (pos > c->io_curr_block_pos) {
*nwritten = connWrite(c->conn, o->buf+c->io_curr_block_pos,
pos-c->io_curr_block_pos);
size_t pos = c->repl_data->io_curr_repl_node == c->repl_data->io_bound_repl_node ?
c->repl_data->io_bound_block_pos : o->used;
if (pos > c->repl_data->io_curr_block_pos) {
*nwritten = connWrite(c->conn, o->buf+c->repl_data->io_curr_block_pos,
pos-c->repl_data->io_curr_block_pos);
if (*nwritten <= 0) return C_ERR;
c->io_curr_block_pos += *nwritten;
c->repl_data->io_curr_block_pos += *nwritten;
}
/* If we fully sent the object and there are more nodes to send, go to the next one. */
if (c->io_curr_block_pos == pos && c->io_curr_repl_node != c->io_bound_repl_node) {
c->io_curr_repl_node = listNextNode(c->io_curr_repl_node);
c->io_curr_block_pos = 0;
if (c->repl_data->io_curr_block_pos == pos && c->repl_data->io_curr_repl_node != c->repl_data->io_bound_repl_node) {
c->repl_data->io_curr_repl_node = listNextNode(c->repl_data->io_curr_repl_node);
c->repl_data->io_curr_block_pos = 0;
}
return C_OK;
}
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);
replBufBlock *o = listNodeValue(c->repl_data->ref_repl_buf_node);
serverAssert(o->used >= c->repl_data->ref_block_pos);
/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
*nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
o->used-c->ref_block_pos);
if (o->used > c->repl_data->ref_block_pos) {
*nwritten = connWrite(c->conn, o->buf+c->repl_data->ref_block_pos,
o->used-c->repl_data->ref_block_pos);
if (*nwritten <= 0) return C_ERR;
c->ref_block_pos += *nwritten;
c->repl_data->ref_block_pos += *nwritten;
}
/* If we fully sent the object on head, go to the next one. */
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
listNode *next = listNextNode(c->repl_data->ref_repl_buf_node);
if (next && c->repl_data->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;
c->ref_block_pos = 0;
c->repl_data->ref_repl_buf_node = next;
c->repl_data->ref_block_pos = 0;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
return C_OK;
@ -2986,15 +2960,15 @@ int processInlineBuffer(client *c, pendingCommand *pcmd) {
* RDB file. */
if (querylen == 0 && clientTypeIsSlave(c)) {
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
c->repl_ack_time = server.unixtime;
c->repl_data->repl_ack_time = server.unixtime;
else
/* If this is a replica client running in an IO thread we cache the
* last ack time in a different member variable in order to avoid
* contention with main thread. f.e see refreshGoodSlavesCount()
* Note c->repl_ack_time will still be updated in
* updateClientDataFromIOThread with the value of c->io_repl_ack_time
* Note c->repl_data->repl_ack_time will still be updated in
* updateClientDataFromIOThread with the value of c->repl_data->io_repl_ack_time
* when the client moves from IO to main thread. */
c->io_repl_ack_time = server.unixtime;
c->repl_data->io_repl_ack_time = server.unixtime;
}
/* Masters should never send us inline protocol to run actual
@ -3342,11 +3316,11 @@ void commandProcessed(client *c) {
prepareForNextCommand(c, 1);
long long prev_offset = c->reploff;
long long prev_offset = c->repl_data->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
serverAssert(c->reploff_next > 0);
c->reploff = c->reploff_next;
serverAssert(c->repl_data->reploff_next > 0);
c->repl_data->reploff = c->repl_data->reploff_next;
}
/* If the client is a master we need to compute the difference
@ -3356,10 +3330,10 @@ void commandProcessed(client *c) {
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
long long applied = c->repl_data->reploff - prev_offset;
if (applied) {
replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied);
c->repl_applied += applied;
replicationFeedStreamFromMasterStream(c->querybuf+c->repl_data->repl_applied,applied);
c->repl_data->repl_applied += applied;
/* Update the atomic slot migration task's applied bytes. */
if (c->flags & CLIENT_ASM_IMPORTING)
@ -3600,10 +3574,14 @@ int processInputBuffer(client *c) {
if (unlikely(pcmd->read_error || (pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE)))
break;
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
else
pcmd->reploff = c->io_read_reploff - sdslen(c->querybuf) + c->qb_pos;
if (c->flags & CLIENT_MASTER) {
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
pcmd->reploff = c->repl_data->read_reploff - sdslen(c->querybuf) + c->qb_pos;
else
pcmd->reploff = c->repl_data->io_read_reploff - sdslen(c->querybuf) + c->qb_pos;
} else {
pcmd->reploff = 0;
}
preprocessCommand(c, pcmd);
pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED;
@ -3629,7 +3607,7 @@ int processInputBuffer(client *c) {
c->argv = curcmd->argv;
c->argv_len = curcmd->argv_len;
c->net_input_bytes_curr_cmd += curcmd->input_bytes;
c->reploff_next = curcmd->reploff;
if (c->flags & CLIENT_MASTER) c->repl_data->reploff_next = curcmd->reploff;
c->slot = curcmd->slot;
c->lookedcmd = curcmd->cmd;
c->read_error = curcmd->read_error;
@ -3694,11 +3672,11 @@ int processInputBuffer(client *c) {
* In these scenarios, qb_pos points to the part of the current command
* or the beginning of next command, and the current command is not applied yet,
* so the repl_applied is not equal to qb_pos. */
if (c->repl_applied) {
sdsrange(c->querybuf,c->repl_applied,-1);
serverAssert(c->qb_pos >= (size_t)c->repl_applied);
c->qb_pos -= c->repl_applied;
c->repl_applied = 0;
if ((c->flags & CLIENT_MASTER) && c->repl_data->repl_applied) {
sdsrange(c->querybuf,c->repl_data->repl_applied,-1);
serverAssert(c->qb_pos >= (size_t)c->repl_data->repl_applied);
c->qb_pos -= c->repl_data->repl_applied;
c->repl_data->repl_applied = 0;
}
} else if (c->qb_pos) {
/* Trim to pos */
@ -3829,10 +3807,10 @@ void readQueryFromClient(connection *conn) {
if (c->flags & CLIENT_MASTER) {
if (c->running_tid == IOTHREAD_MAIN_THREAD_ID) {
c->read_reploff += nread;
c->repl_data->read_reploff += nread;
} else {
/* Same comment as for c->io_lastinteraction */
c->io_read_reploff += nread;
c->repl_data->io_read_reploff += nread;
}
atomicIncr(server.stat_net_repl_input_bytes, nread);
} else {
@ -3845,8 +3823,8 @@ void readQueryFromClient(connection *conn) {
* so they are also considered a part of the query buffer in a broader sense.
*
* For unauthenticated clients, the query buffer cannot exceed 1MB at most. */
(c->mstate.argv_len_sums + sdslen(c->querybuf) > server.client_max_querybuf_len ||
(c->mstate.argv_len_sums + sdslen(c->querybuf) > 1024*1024 && authRequired(c))))
((c->mstate ? c->mstate->argv_len_sums : 0) + sdslen(c->querybuf) > server.client_max_querybuf_len ||
((c->mstate ? c->mstate->argv_len_sums : 0) + sdslen(c->querybuf) > 1024*1024 && authRequired(c))))
{
c->read_error = CLIENT_READ_REACHED_MAX_QUERYBUF;
freeClientAsync(c);
@ -3989,9 +3967,9 @@ sds catClientInfoString(sds s, client *client) {
size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
size_t used_blocks_of_repl_buf = 0;
if (client->ref_repl_buf_node) {
if (client->repl_data && client->repl_data->ref_repl_buf_node) {
replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
replBufBlock *cur = listNodeValue(client->repl_data->ref_repl_buf_node);
used_blocks_of_repl_buf = last->id - cur->id + 1;
}
@ -4005,15 +3983,15 @@ sds catClientInfoString(sds s, client *client) {
" idle=%I", (long long)(server.unixtime - client->lastinteraction),
" flags=%s", flags,
" db=%i", client->db->id,
" sub=%i", (int) dictSize(client->pubsub_channels),
" psub=%i", (int) dictSize(client->pubsub_patterns),
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
" sub=%i", client->pubsub_data ? (int) dictSize(client->pubsub_data->pubsub_channels) : 0,
" psub=%i", client->pubsub_data ? (int) dictSize(client->pubsub_data->pubsub_patterns) : 0,
" ssub=%i", client->pubsub_data ? (int) dictSize(client->pubsub_data->pubsubshard_channels) : 0,
" multi=%i", (client->flags & CLIENT_MULTI && client->mstate) ? client->mstate->count : -1,
" watch=%i", (int) listLength(client->watched_keys),
" qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0,
" qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0,
" argv-mem=%U", (unsigned long long) client->all_argv_len_sum,
" multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums,
" multi-mem=%U", client->mstate ? (unsigned long long) client->mstate->argv_len_sums : 0,
" rbs=%U", (unsigned long long) client->buf_usable_size,
" rbp=%U", (unsigned long long) client->buf_peak,
" obl=%U", (unsigned long long) client->bufpos,
@ -4023,7 +4001,7 @@ sds catClientInfoString(sds s, client *client) {
" events=%s", events,
" cmd=%s", client->lastcmd ? client->lastcmd->fullname : "NULL",
" user=%s", client->user ? client->user->name : "(superuser)",
" redir=%I", (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1,
" redir=%I", (client->flags & CLIENT_TRACKING && client->pubsub_data) ? (long long) client->pubsub_data->client_tracking_redirection : -1,
" resp=%i", client->resp,
" lib-name=%s", client->lib_name ? (char*)client->lib_name->ptr : "",
" lib-ver=%s", client->lib_ver ? (char*)client->lib_ver->ptr : "",
@ -4666,7 +4644,7 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
/* CLIENT GETREDIR */
if (c->flags & CLIENT_TRACKING) {
addReplyLongLong(c,c->client_tracking_redirection);
addReplyLongLong(c,c->pubsub_data ? c->pubsub_data->client_tracking_redirection : 0);
} else {
addReplyLongLong(c,-1);
}
@ -4712,17 +4690,17 @@ NULL
/* Redirect */
addReplyBulkCString(c,"redirect");
if (c->flags & CLIENT_TRACKING) {
addReplyLongLong(c,c->client_tracking_redirection);
addReplyLongLong(c,c->pubsub_data ? c->pubsub_data->client_tracking_redirection : 0);
} else {
addReplyLongLong(c,-1);
}
/* Prefixes */
addReplyBulkCString(c,"prefixes");
if (c->client_tracking_prefixes) {
addReplyArrayLen(c,raxSize(c->client_tracking_prefixes));
if (c->pubsub_data && c->pubsub_data->client_tracking_prefixes) {
addReplyArrayLen(c,raxSize(c->pubsub_data->client_tracking_prefixes));
raxIterator ri;
raxStart(&ri,c->client_tracking_prefixes);
raxStart(&ri,c->pubsub_data->client_tracking_prefixes);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
addReplyBulkCBuffer(c,ri.key,ri.key_len);
@ -4931,7 +4909,7 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
* to update, so we skip that code. */
pendingCommand *pcmd = NULL;
int is_mstate = 0;
if (c->mstate.executing_cmd < 0) {
if (!(c->flags & CLIENT_MULTI) || c->mstate->executing_cmd < 0) {
is_mstate = 0;
if (c->pending_cmds.ready_len > 0) {
pcmd = c->pending_cmds.head;
@ -4939,8 +4917,8 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
}
} else {
is_mstate = 1;
serverAssert(c->mstate.executing_cmd < c->mstate.count);
pcmd = c->mstate.commands[c->mstate.executing_cmd];
serverAssert(c->mstate->executing_cmd < c->mstate->count);
pcmd = c->mstate->commands[c->mstate->executing_cmd];
}
if (pcmd) {
@ -5052,9 +5030,9 @@ size_t getClientOutputBufferMemoryUsage(client *c) {
size_t repl_buf_size = 0;
size_t repl_node_num = 0;
size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock);
if (c->ref_repl_buf_node) {
if (c->repl_data && c->repl_data->ref_repl_buf_node) {
replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
replBufBlock *cur = listNodeValue(c->ref_repl_buf_node);
replBufBlock *cur = listNodeValue(c->repl_data->ref_repl_buf_node);
repl_buf_size = last->repl_offset + last->size - cur->repl_offset;
repl_node_num = last->id - cur->id + 1;
}
@ -5095,8 +5073,8 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
mem += pubsubMemOverhead(c);
/* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */
if (c->client_tracking_prefixes)
mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*));
if (c->pubsub_data && c->pubsub_data->client_tracking_prefixes)
mem += c->pubsub_data->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*));
return mem;
}
@ -5286,10 +5264,10 @@ void flushSlavesOutputBuffers(void) {
*
* 3. Obviously if the slave is not ONLINE.
*/
if ((slave->replstate == SLAVE_STATE_ONLINE || slave->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
if ((slave->repl_data->replstate == SLAVE_STATE_ONLINE || slave->repl_data->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
!(slave->flags & CLIENT_CLOSE_ASAP) &&
can_receive_writes &&
!slave->repl_start_cmd_stream_on_ack &&
!slave->repl_data->repl_start_cmd_stream_on_ack &&
clientHasPendingReplies(slave))
{
writeToClient(slave,0);

View file

@ -90,6 +90,25 @@ pubsubtype pubSubShardType = {
* message. However if the caller sets 'msg' as NULL, it will be able
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void initClientPubSubData(client *c) {
if (c->pubsub_data) return;
c->pubsub_data = zcalloc(sizeof(ClientPubSubData));
c->pubsub_data->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_data->pubsub_patterns = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_data->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
}
void freeClientPubSubData(client *c) {
if (!c->pubsub_data) return;
dictRelease(c->pubsub_data->pubsub_channels);
dictRelease(c->pubsub_data->pubsub_patterns);
dictRelease(c->pubsub_data->pubsubshard_channels);
if (c->pubsub_data->client_tracking_prefixes)
raxFree(c->pubsub_data->client_tracking_prefixes);
zfree(c->pubsub_data);
c->pubsub_data = NULL;
}
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
@ -204,20 +223,22 @@ int serverPubsubShardSubscriptionCount(void) {
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns);
if (!c->pubsub_data) return 0;
return dictSize(c->pubsub_data->pubsub_channels) + dictSize(c->pubsub_data->pubsub_patterns);
}
/* Return the number of shard level channels a client is subscribed to. */
int clientShardSubscriptionsCount(client *c) {
return dictSize(c->pubsubshard_channels);
if (!c->pubsub_data) return 0;
return dictSize(c->pubsub_data->pubsubshard_channels);
}
dict* getClientPubSubChannels(client *c) {
return c->pubsub_channels;
return c->pubsub_data ? c->pubsub_data->pubsub_channels : NULL;
}
dict* getClientPubSubShardChannels(client *c) {
return c->pubsubshard_channels;
return c->pubsub_data ? c->pubsub_data->pubsubshard_channels : NULL;
}
/* Return the number of pubsub + pubsub shard level channels
@ -335,7 +356,7 @@ void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
dictInitIterator(&iter, clients);
while ((entry = dictNext(&iter)) != NULL) {
client *c = dictGetKey(entry);
int retval = dictDelete(c->pubsubshard_channels, channel);
int retval = dictDelete(c->pubsub_data->pubsubshard_channels, channel);
serverAssertWithInfo(c,channel,retval == DICT_OK);
addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
/* If the client has no other pubsub subscription,
@ -356,7 +377,7 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
dict *clients;
int retval = 0;
if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) {
if (dictAdd(c->pubsub_data->pubsub_patterns, pattern, NULL) == DICT_OK) {
retval = 1;
incrRefCount(pattern);
/* Add the client to the pattern -> list of clients hash table */
@ -383,7 +404,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
int retval = 0;
incrRefCount(pattern); /* Protect the object. May be the same we remove */
if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) {
if (dictDelete(c->pubsub_data->pubsub_patterns, pattern) == DICT_OK) {
retval = 1;
/* Remove the client from the pattern -> clients list hash table */
de = dictFind(server.pubsub_patterns,pattern);
@ -446,11 +467,11 @@ int pubsubUnsubscribeShardAllChannels(client *c, int notify) {
int pubsubUnsubscribeAllPatterns(client *c, int notify) {
int count = 0;
if (dictSize(c->pubsub_patterns) > 0) {
if (c->pubsub_data && dictSize(c->pubsub_data->pubsub_patterns) > 0) {
dictIterator di;
dictEntry *de;
dictInitSafeIterator(&di, c->pubsub_patterns);
dictInitSafeIterator(&di, c->pubsub_data->pubsub_patterns);
while ((de = dictNext(&di)) != NULL) {
robj *pattern = dictGetKey(de);
count += pubsubUnsubscribePattern(c, pattern, notify);
@ -552,6 +573,7 @@ void subscribeCommand(client *c) {
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
initClientPubSubData(c);
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j],pubSubType);
markClientAsPubSub(c);
@ -587,6 +609,7 @@ void psubscribeCommand(client *c) {
return;
}
initClientPubSubData(c);
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
markClientAsPubSub(c);
@ -734,6 +757,7 @@ void ssubscribeCommand(client *c) {
return;
}
initClientPubSubData(c);
for (int j = 1; j < c->argc; j++) {
pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
}
@ -755,12 +779,13 @@ void sunsubscribeCommand(client *c) {
}
size_t pubsubMemOverhead(client *c) {
if (!c->pubsub_data) return 0;
/* PubSub patterns */
size_t mem = dictMemUsage(c->pubsub_patterns);
size_t mem = dictMemUsage(c->pubsub_data->pubsub_patterns);
/* Global PubSub channels */
mem += dictMemUsage(c->pubsub_channels);
mem += dictMemUsage(c->pubsub_data->pubsub_channels);
/* Sharded PubSub channels */
mem += dictMemUsage(c->pubsubshard_channels);
mem += dictMemUsage(c->pubsub_data->pubsubshard_channels);
return mem;
}

View file

@ -4433,9 +4433,9 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
if (slave->repl_data->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
/* Check slave has the exact requirements */
if (slave->slave_req != req)
if (slave->repl_data->slave_req != req)
continue;
replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
conns[numconns++] = slave->conn;
@ -4518,8 +4518,8 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (slave->repl_data->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
slave->repl_data->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}

File diff suppressed because it is too large Load diff

View file

@ -4445,19 +4445,19 @@ int processCommand(client *c) {
const uint64_t cmd_flags = getCommandFlags(c);
int is_read_command = (cmd_flags & CMD_READONLY) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY));
(c->cmd->proc == execCommand && (c->flags & CLIENT_MULTI) && (c->mstate->cmd_flags & CMD_READONLY));
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
(c->cmd->proc == execCommand && (c->flags & CLIENT_MULTI) && (c->mstate->cmd_flags & CMD_WRITE));
int is_denyoom_command = (cmd_flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
(c->cmd->proc == execCommand && (c->flags & CLIENT_MULTI) && (c->mstate->cmd_flags & CMD_DENYOOM));
int is_denystale_command = !(cmd_flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
(c->cmd->proc == execCommand && (c->flags & CLIENT_MULTI) && (c->mstate->cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(cmd_flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
(c->cmd->proc == execCommand && (c->flags & CLIENT_MULTI) && (c->mstate->cmd_inv_flags & CMD_LOADING));
int is_may_replicate_command = (cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
(c->cmd->proc == execCommand && (c->flags & CLIENT_MULTI) && (c->mstate->cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
int is_deny_async_loading_command = (cmd_flags & CMD_NO_ASYNC_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_NO_ASYNC_LOADING));
(c->cmd->proc == execCommand && (c->flags & CLIENT_MULTI) && (c->mstate->cmd_flags & CMD_NO_ASYNC_LOADING));
int obey_client = mustObeyClient(c);
if (authRequired(c)) {
@ -4746,7 +4746,7 @@ int areCommandKeysInSameSlot(client *c, int *hashslot) {
if (c->cmd->proc == execCommand) {
if (!(c->flags & CLIENT_MULTI)) return 1;
else ms = &c->mstate;
else ms = c->mstate;
}
/* If client is in multi-exec, we need to check the slot of all keys
@ -4934,7 +4934,7 @@ int isReadyToShutdown(void) {
client *replica = listNodeValue(ln);
/* Don't count migration destination replicas. */
if (replica->flags & CLIENT_ASM_MIGRATING) continue;
if (replica->repl_ack_off != server.master_repl_offset) return 0;
if (replica->repl_data->repl_ack_off != server.master_repl_offset) return 0;
}
return 1;
}
@ -4992,16 +4992,16 @@ int finishShutdown(void) {
paused = 1;
}
if (replica->repl_ack_off != server.master_repl_offset) {
if (replica->repl_data->repl_ack_off != server.master_repl_offset) {
num_lagging_replicas++;
long lag = replica->replstate == SLAVE_STATE_ONLINE ?
time(NULL) - replica->repl_ack_time : 0;
long lag = replica->repl_data->replstate == SLAVE_STATE_ONLINE ?
time(NULL) - replica->repl_data->repl_ack_time : 0;
serverLog(LL_NOTICE,
"Lagging replica %s reported offset %lld behind master, lag=%ld, state=%s.",
replicationGetSlaveName(replica),
server.master_repl_offset - replica->repl_ack_off,
server.master_repl_offset - replica->repl_data->repl_ack_off,
lag,
replstateToString(replica->replstate));
replstateToString(replica->repl_data->replstate));
}
if (paused) resumeIOThread(replica->tid);
@ -6727,11 +6727,11 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
server.unixtime - server.repl_down_since : 0 ;
if (server.master) {
slave_repl_offset = server.master->reploff;
slave_read_repl_offset = server.master->read_reploff;
slave_repl_offset = server.master->repl_data->reploff;
slave_read_repl_offset = server.master->repl_data->read_reploff;
} else if (server.cached_master) {
slave_repl_offset = server.cached_master->reploff;
slave_read_repl_offset = server.cached_master->read_reploff;
slave_repl_offset = server.cached_master->repl_data->reploff;
slave_read_repl_offset = server.cached_master->repl_data->read_reploff;
}
info = sdscatprintf(info, FMTARGS(
@ -6800,7 +6800,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = listNodeValue(ln);
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_addr;
char ip[NET_IP_STR_LEN], *slaveip = slave->repl_data->slave_addr;
int port;
long lag = 0;
@ -6821,16 +6821,16 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
continue;
slaveip = ip;
}
const char *state = replstateToString(slave->replstate);
const char *state = replstateToString(slave->repl_data->replstate);
if (state[0] == '\0') continue;
if (slave->replstate == SLAVE_STATE_ONLINE)
lag = time(NULL) - slave->repl_ack_time;
if (slave->repl_data->replstate == SLAVE_STATE_ONLINE)
lag = time(NULL) - slave->repl_data->repl_ack_time;
info = sdscatprintf(info,
"slave%d:ip=%s,port=%d,state=%s,"
"offset=%lld,lag=%ld,io-thread=%d\r\n",
slaveid,slaveip,slave->slave_listening_port,state,
slave->repl_ack_off, lag, slave->tid);
slaveid,slaveip,slave->repl_data->slave_listening_port,state,
slave->repl_data->repl_ack_off, lag, slave->tid);
slaveid++;
}
}

View file

@ -1081,7 +1081,7 @@ struct RedisModuleDigest {
} while(0)
/* Macro to check if the client is in the middle of module based authentication. */
#define clientHasModuleAuthInProgress(c) ((c)->module_auth_ctx != NULL)
#define clientHasModuleAuthInProgress(c) ((c)->module_data && (c)->module_data->module_auth_ctx != NULL)
/* The string name for an object's type as listed above
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
@ -1457,6 +1457,83 @@ typedef struct {
} clientReqResInfo;
#endif
typedef struct ClientReplicationData {
int replstate; /* Replication state if this is a slave. */
int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long io_read_reploff; /* Copy of read_reploff but only used when
* master client is in IO thread so we don't
* have contention with IO thread. */
long long reploff; /* Applied replication offset if this is a master. */
long long reploff_next; /* Next value to set for reploff when a command finishes executing */
long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long io_repl_ack_time; /* Replication ack time, if this is a replica in
* IO thread. Keeps track of repl_ack_time while
* replica is in IO thread to avoid data races
* with main. repl_ack_time is updated with this
* value when replica returns to main thread. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
uint64_t main_ch_client_id; /* The client id of this replica's main channel */
listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
* see the definition of replBufBlock. */
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
listNode *io_curr_repl_node; /* Current node we are sending repl data from in
* IO thread. */
size_t io_curr_block_pos; /* Current position we are sending repl data from
* in IO thread. */
listNode *io_bound_repl_node;/* Bound node we are sending repl data from in
* IO thread. */
size_t io_bound_block_pos; /* Bound position we are sending repl data from
* in IO thread. */
} ClientReplicationData;
typedef struct ClientPubSubData {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
dict *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */
dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
/* If this client is in tracking mode and this field is non zero,
* invalidation messages for keys fetched by this client will be sent to
* the specified client ID. */
uint64_t client_tracking_redirection;
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
} ClientPubSubData;
typedef struct ClientModuleData {
void *module_blocked_client; /* Pointer to the RedisModuleBlockedClient associated with this
* client. This is set in case of module authentication before the
* unblocked client is reprocessed to handle reply callbacks. */
void *module_auth_ctx; /* Ongoing / attempted module based auth callback's ctx.
* This is only tracked within the context of the command attempting
* authentication. If not NULL, it means module auth is in progress. */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
void *auth_callback_privdata; /* Private data that is passed when the auth
* changed callback is executed. Opaque for
* Redis Core. */
void *auth_module; /* The module that owns the callback, which is used
* to disconnect the client if the module is
* unloaded for cleanup. Opaque for Redis Core. */
} ClientModuleData;
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
uint64_t flags; /* Client flags: CLIENT_* macros. */
@ -1522,72 +1599,19 @@ typedef struct client {
mstime_t io_last_repl_cron; /* Timestamp of last invocation of replication
* cron if client is running in IO thread. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long io_read_reploff; /* Copy of read_reploff but only used when
* master client is in IO thread so we don't
* have contention with IO thread. */
long long reploff; /* Applied replication offset if this is a master. */
long long reploff_next; /* Next value to set for reploff when a command finishes executing */
long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long io_repl_ack_time; /* Replication ack time, if this is a replica in
* IO thread. Keeps track of repl_ack_time while
* replica is in IO thread to avoid data races
* with main. repl_ack_time is updated with this
* value when replica returns to main thread. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
uint64_t main_ch_client_id; /* The client id of this replica's main channel */
multiState mstate; /* MULTI/EXEC state */
blockingState bstate; /* blocking state */
ClientReplicationData *repl_data; /* Replication state, lazily allocated. NULL for regular clients. */
multiState *mstate; /* MULTI/EXEC state, lazily allocated. NULL if not in transaction. */
blockingState *bstate; /* Blocking state, lazily allocated. NULL for non-blocked clients. */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
dict *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */
dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
ClientPubSubData *pubsub_data; /* Pub/sub state, lazily allocated. NULL if never subscribed. */
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *io_thread_client_list_node; /* list node in io thread client list */
listNode *postponed_list_node; /* list node within the postponed list */
void *module_blocked_client; /* Pointer to the RedisModuleBlockedClient associated with this
* client. This is set in case of module authentication before the
* unblocked client is reprocessed to handle reply callbacks. */
void *module_auth_ctx; /* Ongoing / attempted module based auth callback's ctx.
* This is only tracked within the context of the command attempting
* authentication. If not NULL, it means module auth is in progress. */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
void *auth_callback_privdata; /* Private data that is passed when the auth
* changed callback is executed. Opaque for
* Redis Core. */
void *auth_module; /* The module that owns the callback, which is used
* to disconnect the client if the module is
* unloaded for cleanup. Opaque for Redis Core.*/
ClientModuleData *module_data; /* Module state, lazily allocated. NULL if no module interaction. */
/* If this client is in tracking mode and this field is non zero,
* invalidation messages for keys fetched by this client will be sent to
* the specified client ID. */
uint64_t client_tracking_redirection;
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
/* In updateClientMemoryUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
@ -1599,19 +1623,6 @@ typedef struct client {
listNode *mem_usage_bucket_node;
clientMemUsageBucket *mem_usage_bucket;
listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
* see the definition of replBufBlock. */
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
listNode *io_curr_repl_node; /* Current node we are sending repl data from in
* IO thread. */
size_t io_curr_block_pos; /* Current position we are sending repl data from
* in IO thread. */
listNode *io_bound_repl_node;/* Bound node we are sending repl data from in
* IO thread. */
size_t io_bound_block_pos; /* Bound position we are sending repl data from
* in IO thread. */
/* list node in clients_pending_write list */
listNode clients_pending_write_node;
/* list node in clients_with_pending_ref_reply list */
@ -1647,6 +1658,16 @@ typedef struct client {
size_t stat_avg_pipeline_length_cnt; /* Count of pipeline length samples */
} client;
void initClientReplicationData(client *c);
void freeClientReplicationData(client *c);
void initClientBlockingState(client *c);
void freeClientBlockingState(client *c);
void initClientPubSubData(client *c);
void freeClientPubSubData(client *c);
void initClientModuleData(client *c);
void freeClientModuleData(client *c);
void initClientMultiState(client *c);
typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) {
uint8_t id; /* The unique ID assigned, if IO_THREADS_MAX_NUM is more
* than 256, we should also promote the data type. */
@ -3361,7 +3382,6 @@ void listTypeTryConversionAppend(robj *o, robj **argv, int start, int end, befor
/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(client *c);
void initClientMultiState(client *c);
void freeClientMultiState(client *c);
void queueMultiCommand(client *c, uint64_t cmd_flags);
size_t multiStateMemOverhead(client *c);
@ -4133,7 +4153,6 @@ typedef struct luaScript {
/* Blocked clients API */
void processUnblockedClients(void);
void initClientBlockingState(client *c);
void blockClient(client *c, int btype);
void unblockClient(client *c, int queue_for_reprocessing);
void unblockClientOnTimeout(client *c);

View file

@ -19,8 +19,8 @@
* Otherwise 0 is returned and no operation is performed. */
int checkBlockedClientTimeout(client *c, mstime_t now) {
if (c->flags & CLIENT_BLOCKED &&
c->bstate.timeout != 0
&& c->bstate.timeout < now)
c->bstate->timeout != 0
&& c->bstate->timeout < now)
{
/* Handle blocking operation specific timeout. */
unblockClientOnTimeout(c);
@ -94,8 +94,8 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, client **cptr) {
* to handle blocked clients timeouts. The client is not added to the list
* if its timeout is zero (block forever). */
void addClientToTimeoutTable(client *c) {
if (c->bstate.timeout == 0) return;
uint64_t timeout = c->bstate.timeout;
if (c->bstate->timeout == 0) return;
uint64_t timeout = c->bstate->timeout;
unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,c);
if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
@ -107,7 +107,7 @@ void addClientToTimeoutTable(client *c) {
void removeClientFromTimeoutTable(client *c) {
if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
c->flags &= ~CLIENT_IN_TO_TABLE;
uint64_t timeout = c->bstate.timeout;
uint64_t timeout = c->bstate->timeout;
unsigned char buf[CLIENT_ST_KEYLEN];
encodeTimeoutKey(buf,timeout,c);
raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);

View file

@ -47,9 +47,9 @@ typedef struct bcastState {
void disableTracking(client *c) {
/* If this client is in broadcasting mode, we need to unsubscribe it
* from all the prefixes it is registered to. */
if (c->flags & CLIENT_TRACKING_BCAST) {
if (c->flags & CLIENT_TRACKING_BCAST && c->pubsub_data) {
raxIterator ri;
raxStart(&ri,c->client_tracking_prefixes);
raxStart(&ri,c->pubsub_data->client_tracking_prefixes);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
void *result;
@ -67,8 +67,8 @@ void disableTracking(client *c) {
}
}
raxStop(&ri);
raxFree(c->client_tracking_prefixes);
c->client_tracking_prefixes = NULL;
raxFree(c->pubsub_data->client_tracking_prefixes);
c->pubsub_data->client_tracking_prefixes = NULL;
}
/* Clear flags and adjust the count. */
@ -94,9 +94,9 @@ static int stringCheckPrefix(unsigned char *s1, size_t s1_len, unsigned char *s2
int checkPrefixCollisionsOrReply(client *c, robj **prefixes, size_t numprefix) {
for (size_t i = 0; i < numprefix; i++) {
/* Check input list has no overlap with existing prefixes. */
if (c->client_tracking_prefixes) {
if (c->pubsub_data && c->pubsub_data->client_tracking_prefixes) {
raxIterator ri;
raxStart(&ri,c->client_tracking_prefixes);
raxStart(&ri,c->pubsub_data->client_tracking_prefixes);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
if (stringCheckPrefix(ri.key,ri.key_len,
@ -148,9 +148,9 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
bs = result;
}
if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
if (c->client_tracking_prefixes == NULL)
c->client_tracking_prefixes = raxNew();
raxInsert(c->client_tracking_prefixes,
if (c->pubsub_data->client_tracking_prefixes == NULL)
c->pubsub_data->client_tracking_prefixes = raxNew();
raxInsert(c->pubsub_data->client_tracking_prefixes,
(unsigned char*)prefix,plen,NULL,NULL);
}
}
@ -163,12 +163,13 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
initClientPubSubData(c);
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
c->flags |= CLIENT_TRACKING;
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT|
CLIENT_TRACKING_NOLOOP);
c->client_tracking_redirection = redirect_to;
c->pubsub_data->client_tracking_redirection = redirect_to;
/* This may be the first client we ever enable. Create the tracking
* table if it does not exist. */
@ -260,8 +261,8 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
c->flags |= CLIENT_PUSHING;
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (c->pubsub_data && c->pubsub_data->client_tracking_redirection) {
client *redir = lookupClientByID(c->pubsub_data->client_tracking_redirection);
if (!redir) {
c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
/* We need to signal to the original connection that we
@ -270,7 +271,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
addReplyLongLong(c,c->pubsub_data->client_tracking_redirection);
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;