Fix XTRIM/XADD with approx not deletes entries for DELREF/ACKED strategies (#14623)
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run

This bug was introduced by #14130 and found by guybe7 

When using XTRIM/XADD with approx mode (~) and DELREF/ACKED delete
strategies, if a node was eligible for removal but couldn't be removed
directly (because consumer group references need to be checked), the
code would incorrectly break out of the loop instead of continuing to
process entries within the node. This fix allows the per-entry deletion
logic to execute for eligible nodes when using non-KEEPREF strategies.
This commit is contained in:
debing.sun 2026-01-05 21:17:36 +08:00 committed by GitHub
parent 4eda670de9
commit 9ca860be9e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 63 additions and 3 deletions

View file

@ -700,7 +700,9 @@ typedef struct {
int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
int delete_strategy; /* DELETE_STRATEGY_* */
int approx_trim; /* If 1 only delete whole radix tree nodes, so
* the trim argument is not applied verbatim. */
* the trim argument is not applied verbatim.
* Note: This flag is ignored when delete_strategy is non-KEEPREF.
* Individual entries may still be processed for consumer groups. */
long long limit; /* Maximum amount of entries to trim. If 0, no limitation
* on the amount of trimming work is enforced. */
/* TRIM_STRATEGY_MAXLEN options */
@ -808,8 +810,11 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}
/* If we cannot remove a whole element, and approx is true,
* stop here. */
if (approx) break;
* stop here. However, for non-KEEPREF strategies, if the node was
* eligible for removal but we couldn't remove it (because we need
* to check consumer group references), we should continue to process
* entries within this node. */
if (approx && delete_strategy == DELETE_STRATEGY_KEEPREF) break;
/* Now we have to trim entries from within 'lp' */
size_t oldsize = lpBytes(lp);

View file

@ -884,6 +884,61 @@ start_server {
assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 1] == 0}
assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 2] == 2}
}
test {XTRIM with approx and ACKED deletes entries correctly} {
# This test verifies that when using approx trim (~) with ACKED strategy,
# if the first node cannot be removed (has unacked messages), we should
# continue to check subsequent nodes that might be eligible for removal.
r DEL mystream
set origin_max_entries [config_get_set stream-node-max-entries 2]
# Create 5 entries in 3 nodes (2 entries per node)
r XADD mystream 1-0 f v
r XADD mystream 2-0 f v
r XADD mystream 3-0 f v
r XADD mystream 4-0 f v
r XADD mystream 5-0 f v
# Create a consumer group and read all messages
r XGROUP CREATE mystream mygroup 0
r XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
# Acknowledge messages: 1-0, 2-0 (first node), and 4-0 (second node)
r XACK mystream mygroup 1-0 2-0 4-0
# XTRIM MINID ~ 6-0 ACKED should remove:
# Total 3 entries removed (1-0, 2-0, 4-0), 2 unacked entries remain (3-0, 5-0)
assert_equal 3 [r XTRIM mystream MINID ~ 6-0 ACKED]
assert_equal 2 [r XLEN mystream]
assert_equal {{3-0 {f v}} {5-0 {f v}}} [r XRANGE mystream - +]
r config set stream-node-max-entries $origin_max_entries
}
test {XTRIM with approx and DELREF deletes entries correctly} {
# Similar test but with DELREF strategy
r DEL mystream
set origin_max_entries [config_get_set stream-node-max-entries 2]
# Create 4 entries in 2 nodes
r XADD mystream 1-0 f v
r XADD mystream 2-0 f v
r XADD mystream 3-0 f v
r XADD mystream 4-0 f v
# Create a consumer group and read all messages
r XGROUP CREATE mystream mygroup 0
r XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
# With XTRIM MINID ~ 5-0 DELREF, all eligible nodes should be trimmed
# and PEL entries should be cleaned up
assert_equal 4 [r XTRIM mystream MINID ~ 5-0 DELREF]
assert_equal 0 [r XLEN mystream]
# PEL should be empty after DELREF
assert_equal {0 {} {} {}} [r XPENDING mystream mygroup]
r config set stream-node-max-entries $origin_max_entries
}
}
start_server {tags {"stream needs:debug"} overrides {appendonly yes}} {