diff --git a/src/t_stream.c b/src/t_stream.c index 67205a62f..63dace333 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -700,7 +700,9 @@ typedef struct { int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ int delete_strategy; /* DELETE_STRATEGY_* */ int approx_trim; /* If 1 only delete whole radix tree nodes, so - * the trim argument is not applied verbatim. */ + * the trim argument is not applied verbatim. + * Note: This flag is ignored when delete_strategy is non-KEEPREF. + * Individual entries may still be processed for consumer groups. */ long long limit; /* Maximum amount of entries to trim. If 0, no limitation * on the amount of trimming work is enforced. */ /* TRIM_STRATEGY_MAXLEN options */ @@ -808,8 +810,11 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { } /* If we cannot remove a whole element, and approx is true, - * stop here. */ - if (approx) break; + * stop here. However, for non-KEEPREF strategies, if the node was + * eligible for removal but we couldn't remove it (because we need + * to check consumer group references), we should continue to process + * entries within this node. */ + if (approx && delete_strategy == DELETE_STRATEGY_KEEPREF) break; /* Now we have to trim entries from within 'lp' */ size_t oldsize = lpBytes(lp); diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 9d888589d..1aeeca81d 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -884,6 +884,61 @@ start_server { assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 1] == 0} assert {[r XTRIM mystream MAXLEN ~ 0 LIMIT 2] == 2} } + + test {XTRIM with approx and ACKED deletes entries correctly} { + # This test verifies that when using approx trim (~) with ACKED strategy, + # if the first node cannot be removed (has unacked messages), we should + # continue to check subsequent nodes that might be eligible for removal. + r DEL mystream + set origin_max_entries [config_get_set stream-node-max-entries 2] + + # Create 5 entries in 3 nodes (2 entries per node) + r XADD mystream 1-0 f v + r XADD mystream 2-0 f v + r XADD mystream 3-0 f v + r XADD mystream 4-0 f v + r XADD mystream 5-0 f v + + # Create a consumer group and read all messages + r XGROUP CREATE mystream mygroup 0 + r XREADGROUP GROUP mygroup consumer1 STREAMS mystream > + + # Acknowledge messages: 1-0, 2-0 (first node), and 4-0 (second node) + r XACK mystream mygroup 1-0 2-0 4-0 + + # XTRIM MINID ~ 6-0 ACKED should remove: + # Total 3 entries removed (1-0, 2-0, 4-0), 2 unacked entries remain (3-0, 5-0) + assert_equal 3 [r XTRIM mystream MINID ~ 6-0 ACKED] + assert_equal 2 [r XLEN mystream] + assert_equal {{3-0 {f v}} {5-0 {f v}}} [r XRANGE mystream - +] + + r config set stream-node-max-entries $origin_max_entries + } + + test {XTRIM with approx and DELREF deletes entries correctly} { + # Similar test but with DELREF strategy + r DEL mystream + set origin_max_entries [config_get_set stream-node-max-entries 2] + + # Create 4 entries in 2 nodes + r XADD mystream 1-0 f v + r XADD mystream 2-0 f v + r XADD mystream 3-0 f v + r XADD mystream 4-0 f v + + # Create a consumer group and read all messages + r XGROUP CREATE mystream mygroup 0 + r XREADGROUP GROUP mygroup consumer1 STREAMS mystream > + + # With XTRIM MINID ~ 5-0 DELREF, all eligible nodes should be trimmed + # and PEL entries should be cleaned up + assert_equal 4 [r XTRIM mystream MINID ~ 5-0 DELREF] + assert_equal 0 [r XLEN mystream] + # PEL should be empty after DELREF + assert_equal {0 {} {} {}} [r XPENDING mystream mygroup] + + r config set stream-node-max-entries $origin_max_entries + } } start_server {tags {"stream needs:debug"} overrides {appendonly yes}} {