mirror of
https://github.com/redis/redis.git
synced 2026-03-01 21:10:49 -05:00
## Summary and detailed design for new stream command

## XDELEX

### Syntax

```
XDELEX key [KEEPREF | DELREF | ACKED] IDS numids id [id ...]
```

### Description

The `XDELEX` command extends the Redis Streams `XDEL` command, offering enhanced control over message entry deletion with respect to consumer groups. It accepts an optional `KEEPREF`, `DELREF`, or `ACKED` parameter to modify its behavior:

- **KEEPREF:** Deletes the specified entries from the stream, but preserves existing references to these entries in all consumer groups' PEL. This behavior is similar to `XDEL`.
- **DELREF:** Deletes the specified entries from the stream and also removes all references to these entries from all consumer groups' pending entry lists, effectively cleaning up all traces of the messages.
- **ACKED:** Deletes only entries that were read and acknowledged by all consumer groups.

**Note:** The `IDS` block can appear at any position in the command, consistent with other commands.

### Reply

Array reply, for each `id`:

- `-1`: No such `id` exists in the provided stream `key`.
- `1`: Entry was deleted from the stream.
- `2`: Entry was not deleted because there are still dangling references (ACKED option).

## XACKDEL

### Syntax

```
XACKDEL key group [KEEPREF | DELREF | ACKED] IDS numids id [id ...]
```

### Description

The `XACKDEL` command combines `XACK` and `XDEL` functionalities in Redis Streams. It acknowledges the specified message IDs in the given consumer group and attempts to delete the corresponding stream entries. It accepts an optional `KEEPREF`, `DELREF`, or `ACKED` parameter:

- **KEEPREF:** Acknowledges the messages in the specified consumer group and deletes the entries from the stream, but preserves existing references to these entries in all consumer groups' PEL.
- **DELREF:** Acknowledges the messages in the specified consumer group, deletes the entries from the stream, and also removes all references to these entries from all consumer groups' pending entry lists, effectively cleaning up all traces of the messages.
- **ACKED:** Acknowledges the messages in the specified consumer group and deletes only entries that were read and acknowledged by all consumer groups.

### Reply

Array reply, for each `id`:

- `-1`: No such `id` exists in the provided stream `key`.
- `1`: Entry was acknowledged and deleted from the stream.
- `2`: Entry was acknowledged but not deleted because there are still dangling references (ACKED option).

# Redis Streams Commands Extension

## XTRIM

### Syntax

```
XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count] [KEEPREF | DELREF | ACKED]
```

### Description

The `XTRIM` command trims a stream by removing entries based on specified criteria. It is extended with an optional `KEEPREF`, `DELREF`, or `ACKED` parameter for consumer group handling:

- **KEEPREF:** Trims the stream according to the specified strategy (MAXLEN or MINID) regardless of whether entries are referenced by any consumer groups, but preserves existing references to these entries in all consumer groups' PEL.
- **DELREF:** Trims the stream according to the specified strategy and also removes all references to the trimmed entries from all consumer groups' PEL.
- **ACKED:** Only trims entries that were read and acknowledged by all consumer groups.

### Reply

No change.

## XADD

### Syntax

```
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] [KEEPREF | DELREF | ACKED] <* | id> field value [field value ...]
```

### Description

The `XADD` command appends a new entry to a stream and optionally trims it in the same operation. It is extended with an optional `KEEPREF`, `DELREF`, or `ACKED` parameter that controls the trimming behavior:

- **KEEPREF:** When trimming, removes entries from the stream according to the specified strategy (MAXLEN or MINID), regardless of whether they are referenced by any consumer groups, but preserves existing references to these entries in all consumer groups' PEL.
- **DELREF:** When trimming, removes entries from the stream according to the specified strategy and also removes all references to these entries from all consumer groups' PEL.
- **ACKED:** When trimming, only removes entries that were read and acknowledged by all consumer groups. Note that if the number of referenced entries is larger than MAXLEN, trimming still stops, leaving the stream longer than MAXLEN.

### Reply

No change.

## Key implementation

Since we currently have no simple way to track the association between an entry and consumer groups without iterating over all groups, we introduce two mechanisms to establish this link. This allows us to determine whether an entry has been seen by all consumer groups, and to identify which groups are referencing it. With these links, we can break the association when the entry is either acknowledged or deleted.

1) Added reference tracking between stream messages and consumer groups using `cgroups_ref`.
The `cgroups_ref` is implemented as a rax that maps stream message IDs to lists of consumer groups that reference those messages. Each `streamNACK` stores its corresponding node in this list, so that the group can be removed from the list during `ACK`. In this way, we can determine whether an entry has been seen but not yet acknowledged.

2) Store a cached minimum last_id in the stream structure.
The reason for doing this is that an entry may never have been seen by a consumer group. In this case, we consider the entry not consumed either.
If the "ACKED" option is given, we cannot directly delete this entry either.
When a consumer group updates its last_id, we don’t immediately update the cached minimum last_id. Instead, we check whether the group’s previous last_id was equal to the current minimum, or whether the new last_id is smaller than the current minimum (when using `XGROUP SETID`). If either is true, we mark the cached minimum last_id as invalid, and defer the actual update until the next time it’s needed.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: moticless <moticless@github.com>
Co-authored-by: Ozan Tezcan <ozantezcan@gmail.com>
Co-authored-by: Slavomir Kaslev <slavomir.kaslev@gmail.com>
Co-authored-by: Yuan Wang <yuan.wang@redis.com>
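
The lazily invalidated minimum last_id described under "Key implementation" can be sketched roughly as follows. This is a minimal illustration, not the code from the commit: the types `sid` and `toy_stream`, the fixed-size group array, the `recomputes` counter, and all function names are hypothetical, and the `seen_by_all_groups` check assumes that an entry counts as delivered to every group once its ID is not greater than the minimum last_id.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified types; not the actual Redis structures. */
typedef struct { uint64_t ms, seq; } sid;

#define MAX_GROUPS 8

typedef struct {
    sid group_last_id[MAX_GROUPS]; /* last_id of each consumer group */
    size_t ngroups;                /* number of groups in use */
    sid min_last_id;               /* cached minimum over all groups */
    int min_valid;                 /* 0 means the cache must be recomputed */
    int recomputes;                /* counts full scans, for illustration */
} toy_stream;

/* Compare two IDs: negative, zero, or positive like strcmp. */
static int sid_cmp(sid a, sid b) {
    if (a.ms != b.ms) return a.ms < b.ms ? -1 : 1;
    if (a.seq != b.seq) return a.seq < b.seq ? -1 : 1;
    return 0;
}

/* Update one group's last_id. The cache is only marked invalid when the
 * group held the current minimum, or when the new value drops below it
 * (which can happen when a last_id is moved backwards). */
static void set_group_last_id(toy_stream *s, size_t g, sid new_id) {
    sid old = s->group_last_id[g];
    s->group_last_id[g] = new_id;
    if (!s->min_valid) return;
    if (sid_cmp(old, s->min_last_id) == 0 ||
        sid_cmp(new_id, s->min_last_id) < 0)
        s->min_valid = 0;
}

/* Return the minimum last_id, rescanning all groups only if needed. */
static sid get_min_last_id(toy_stream *s) {
    assert(s->ngroups > 0 && s->ngroups <= MAX_GROUPS);
    if (!s->min_valid) {
        sid m = s->group_last_id[0];
        for (size_t i = 1; i < s->ngroups; i++)
            if (sid_cmp(s->group_last_id[i], m) < 0) m = s->group_last_id[i];
        s->min_last_id = m;
        s->min_valid = 1;
        s->recomputes++;
    }
    return s->min_last_id;
}

/* Assumed rule: an entry was delivered to every group only if its ID
 * is not greater than the minimum last_id. */
static int seen_by_all_groups(toy_stream *s, sid id) {
    return sid_cmp(id, get_min_last_id(s)) <= 0;
}
```

The point of the design is that most last_id updates move a group forward without touching the minimum, so they cost a single comparison; the full scan over groups is paid only on the next read after an update that could actually have changed the minimum.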
||
|---|---|---|
| aclcheck.c | ||
| auth.c | ||
| basics.c | ||
| blockedclient.c | ||
| blockonbackground.c | ||
| blockonkeys.c | ||
| cmdintrospection.c | ||
| commandfilter.c | ||
| crash.c | ||
| datatype.c | ||
| datatype2.c | ||
| defragtest.c | ||
| eventloop.c | ||
| fork.c | ||
| getchannels.c | ||
| getkeys.c | ||
| hash.c | ||
| hooks.c | ||
| infotest.c | ||
| internalsecret.c | ||
| keyspace_events.c | ||
| keyspecs.c | ||
| list.c | ||
| Makefile | ||
| mallocsize.c | ||
| misc.c | ||
| moduleauthtwo.c | ||
| moduleconfigs.c | ||
| moduleconfigstwo.c | ||
| postnotifications.c | ||
| propagate.c | ||
| publish.c | ||
| rdbloadsave.c | ||
| reply.c | ||
| scan.c | ||
| stream.c | ||
| subcommands.c | ||
| test_lazyfree.c | ||
| testrdb.c | ||
| timer.c | ||
| usercall.c | ||
| zset.c | ||