Fix min_cgroup_last_id cache not updated when destroying consumer group (#14552)
Some checks are pending
CI / test-ubuntu-latest (push) Waiting to run
CI / test-sanitizer-address (push) Waiting to run
CI / build-debian-old (push) Waiting to run
CI / build-macos-latest (push) Waiting to run
CI / build-32bit (push) Waiting to run
CI / build-libc-malloc (push) Waiting to run
CI / build-centos-jemalloc (push) Waiting to run
CI / build-old-chain-jemalloc (push) Waiting to run
Codecov / code-coverage (push) Waiting to run
External Server Tests / test-external-standalone (push) Waiting to run
External Server Tests / test-external-cluster (push) Waiting to run
External Server Tests / test-external-nodebug (push) Waiting to run
Spellcheck / Spellcheck (push) Waiting to run

## Problem

When destroying a consumer group with `XGROUP DESTROY`, the cached
`min_cgroup_last_id` was not being invalidated. This caused incorrect
behavior when using `XDELEX` with the `ACKED` option, as the cache still
referenced the destroyed group's `last_id`.

## Solution

Invalidate the `min_cgroup_last_id` cache when the destroyed group's
`last_id` equals the cached minimum. The cache will be recalculated on
the next call to `streamEntryIsReferenced()`.

---------

Co-authored-by: guybe7 <guy.benoish@redislabs.com>
This commit is contained in:
debing.sun 2025-11-21 22:37:17 +08:00 committed by GitHub
parent 1102415f46
commit bb6389e823
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 34 additions and 0 deletions

View file

@ -3130,6 +3130,7 @@ static void streamFreeCG(stream *s, streamCG *cg) {
/* Destroy a consumer group and clean up all associated references. */
void streamDestroyCG(stream *s, streamCG *cg) {
/* Remove all references from the cgroups_ref. */
raxIterator it;
raxStart(&it, cg->pel);
raxSeek(&it, "^", NULL, 0);
@ -3139,6 +3140,12 @@ void streamDestroyCG(stream *s, streamCG *cg) {
}
raxStop(&it);
/* If we're destroying the group with the minimum last_id, the cached
* minimum is no longer valid and needs to be recalculated from the
* remaining groups. */
if (s->min_cgroup_last_id_valid && streamCompareID(&s->min_cgroup_last_id, &cg->last_id) == 0)
s->min_cgroup_last_id_valid = 0;
streamFreeCG(s, cg);
}

View file

@ -713,6 +713,33 @@ start_server {
assert_equal 0 [r XLEN mystream]
}
test {XGROUP DESTROY correctly manage min_cgroup_last_id cache} {
r DEL mystream
# Add some entries
r XADD mystream 1-0 f1 v1
r XADD mystream 2-0 f2 v2
r XADD mystream 3-0 f3 v3
r XADD mystream 4-0 f4 v4
r XADD mystream 5-0 f5 v5
# Create two consumer groups
r XGROUP CREATE mystream group1 1-0 ;# min_cgroup_last_id is 1-0 now
r XGROUP CREATE mystream group2 3-0
# Entry 1-0 should be deletable (1-0 <= min_cgroup_last_id and not in any PEL)
assert_equal {1} [r XDELEX mystream ACKED IDS 1 1-0]
# Entry 2-0 should be referenced (2-0 > 1-0, not yet consumed by all consume groups)
assert_equal {2} [r XDELEX mystream ACKED IDS 1 2-0]
# Destroy group1
# min_cgroup_last_id is 3-0 now
r XGROUP DESTROY mystream group1
# Entry 2-0 should now be deletable (2-0 < 3-0 and not in any PEL)
assert_equal {1} [r XDELEX mystream ACKED IDS 1 2-0]
}
test {RENAME can unblock XREADGROUP with data} {
r del mystream{t}
r XGROUP CREATE mystream{t} mygroup $ MKSTREAM