Merge branch 'server_reconfig' into 'master'

Zone db listen reconfiguration

See merge request knot/knot-dns!1838
This commit is contained in:
Libor Peltan 2025-12-27 17:42:57 +01:00
commit a59276c107
5 changed files with 107 additions and 23 deletions

View file

@ -241,9 +241,12 @@ int rdb_addr_to_str(struct sockaddr_storage *addr, char *out, size_t out_len, in
*port = sockaddr_port(addr);
sockaddr_port_set(addr, 0);
if (sockaddr_tostr(out, out_len, addr) <= 0 || *port == 0) {
if (sockaddr_tostr(out, out_len, addr) <= 0 || *port < 0) {
return KNOT_EINVAL;
}
if (*port == 0) {
*port = CONF_REDIS_PORT;
}
}
return KNOT_EOK;
@ -355,6 +358,8 @@ redisContext *rdb_connect(conf_t *conf, bool require_master, const char *info)
goto connected;
}
} else {
log_debug("rdb, failed to query, remote %s%s%.0u",
addr_str, (port != 0 ? "@" : ""), port);
redisFree(rdb);
}

View file

@ -39,6 +39,7 @@
#include "contrib/conn_pool.h"
#include "contrib/files.h"
#include "contrib/net.h"
#include "contrib/openbsd/siphash.h"
#include "contrib/openbsd/strlcat.h"
#include "contrib/os.h"
#include "contrib/sockaddr.h"
@ -941,15 +942,15 @@ static int rdb_listener_run(struct dthread *thread)
redisReply *reply = redisCommand(s->rdb_ctx, "XREAD BLOCK %d STREAMS %b %s",
4000, RDB_EVENT_KEY, strlen(RDB_EVENT_KEY), since);
if (reply == NULL) {
if (thread->state & ThreadDead) {
if (dt_is_cancelled(thread)) {
break;
}
if (s->rdb_ctx->err != REDIS_OK) {
if (s->rdb_ctx != NULL && s->rdb_ctx->err != REDIS_OK) {
log_error("rdb, failed to read events (%s)", s->rdb_ctx->errstr);
}
rdb_disconnect(s->rdb_ctx, false);
s->rdb_ctx = NULL;
sleep(2);
sleep(1);
continue;
}
if (reply->type == REDIS_REPLY_NIL) {
@ -1026,7 +1027,6 @@ int server_init(server_t *server, int bg_workers)
int ret = catalog_update_init(&server->catalog_upd);
if (ret != KNOT_EOK) {
dt_stop(server->rdb_events);
dt_delete(&server->rdb_events);
worker_pool_destroy(server->workers);
evsched_deinit(&server->sched);
@ -1163,10 +1163,8 @@ static void server_free_handler(iohandler_t *h)
}
/* Wait for threads to finish */
if (h->unit) {
dt_stop(h->unit);
dt_join(h->unit);
}
dt_stop(h->unit);
dt_join(h->unit);
/* Destroy worker context. */
dt_delete(&h->unit);
@ -1593,10 +1591,11 @@ void server_stop(server_t *server)
#ifdef ENABLE_REDIS
/* Interrupt and stop XREAD BLOCK loop. */
if (server->rdb_ctx != NULL) {
redisSetTimeout(server->rdb_ctx, (struct timeval){ 0, 1 });
}
dt_stop(server->rdb_events);
if (server->rdb_ctx != NULL) {
shutdown(server->rdb_ctx->fd, SHUT_RDWR);
}
dt_join(server->rdb_events);
#endif // ENABLE_REDIS
/* Stop scheduler. */
@ -1605,6 +1604,7 @@ void server_stop(server_t *server)
server->state |= ServerShutting;
/* Stop timer DB syncing thread */
dt_stop(server->timerdb_sync);
dt_join(server->timerdb_sync);
/* Interrupt background workers. */
worker_pool_stop(server->workers);
@ -1689,7 +1689,7 @@ static int reconfigure_timer_db(conf_t *conf, server_t *server)
}
} else if (!should_sync && exists_sync) {
dt_stop(server->timerdb_sync);
dt_delete(&server->timerdb_sync);
dt_join(server->timerdb_sync);
}
return ret;
@ -1714,6 +1714,15 @@ static bool invalid_redis_conn(intptr_t ptr)
return ptr == CONN_POOL_FD_INVALID || !zone_redis_ping((void *)ptr);
}
static uint64_t db_listen_hash(conf_val_t *val)
{
SIPHASH_CTX ctx;
SIPHASH_KEY key = { 0 };
SipHash24_Init(&ctx, &key);
SipHash24_Update(&ctx, val->blob, val->blob_len);
return SipHash24_End(&ctx);
}
static int reconfigure_remote_pool(conf_t *conf, server_t *server)
{
conf_val_t val = conf_get(conf, C_SRV, C_RMT_POOL_LIMIT);
@ -1754,14 +1763,27 @@ static int reconfigure_remote_pool(conf_t *conf, server_t *server)
}
val = conf_get(conf, C_DB, C_ZONE_DB_LISTEN);
if (global_redis_pool == NULL && val.code == KNOT_EOK) {
size_t bg_wrkrs = conf_bg_threads(conf);
conn_pool_t *new_pool = conn_pool_init(bg_wrkrs, REDIS_CONN_POOL_TIMEOUT,
free_redis_conn, invalid_redis_conn);
if (new_pool == NULL) {
return KNOT_ENOMEM;
if (val.code == KNOT_EOK) {
static uint64_t hash = 0;
uint64_t curr_hash = db_listen_hash(&val);
if (global_redis_pool == NULL) {
size_t bg_wrkrs = conf_bg_threads(conf);
conn_pool_t *new_pool = conn_pool_init(bg_wrkrs + 1, REDIS_CONN_POOL_TIMEOUT,
free_redis_conn, invalid_redis_conn);
if (new_pool == NULL) {
return KNOT_ENOMEM;
}
global_redis_pool = new_pool;
}
global_redis_pool = new_pool;
if (hash != curr_hash) {
conn_pool_purge(global_redis_pool);
if (server->rdb_ctx != NULL) {
// Interrupt XREAD BLOCK and reconnect events.
shutdown(server->rdb_ctx->fd, SHUT_RDWR);
}
}
hash = curr_hash;
}
val = conf_get(conf, C_SRV, C_RMT_RETRY_DELAY);
@ -1834,7 +1856,7 @@ int server_reconfigure(conf_t *conf, server_t *server)
/* Reconfigure Timer DB. */
if ((ret = reconfigure_timer_db(conf, server)) != KNOT_EOK) {
log_error("failed to reconfigure Timer DB (%s)",
log_error("failed to reconfigure timer DB (%s)",
knot_strerror(ret));
}

View file

@ -116,6 +116,8 @@ for i in range(5):
up.add("loop", 3600, "AAAA", "1::%d" % i)
up.send()
master.zones_wait(zones, serials6, equal=True)
if i < 3:
slave.zones_wait(zones, serials6, equal=True)
slave.ctl("zone-thaw")
slave.zones_wait(zones, serials6, equal=True)

View file

@ -86,10 +86,9 @@ t.xfr_diff(master, slave1, zones, serial_init)
t.xfr_diff(master, slave2, zones, serial_init)
# Add to DB manually.
slave2.stop()
slave2.db_in(zones, [redis_slave2], 1)
slave2.gen_confile()
slave2.start()
slave2.reload()
slave2.zones_wait(zones) # interesting: remove and see
txn = redis_slave1.cli("knot.upd.begin", ZONE, "1")
redis_slave1.cli("knot.upd.add", ZONE, txn, "test TXT test")

View file

@ -0,0 +1,56 @@
#!/usr/bin/env python3
'''Test for Redis database reconfiguration.'''
import random
from dnstest.test import Test
from dnstest.utils import *
def db_upd(db, zone_name, inst, init=False):
if init:
txn = db.cli("knot.zone.begin", zone_name, str(inst))
r = db.cli("knot.zone.store", zone_name, txn, "@ SOA dns mail 1 36000 600 864000 300")
r = db.cli("knot.zone.commit", zone_name, txn)
else:
txn = db.cli("knot.upd.begin", zone_name, str(inst))
r = db.cli("knot.upd.commit", zone_name, txn)
t = Test()
server = t.server("knot")
ZONE = "example.com."
zones = t.zone(ZONE)
t.link(zones, server)
redis1 = t.backend("redis", tls=random.choice([True, False]))
redis2 = t.backend("redis", tls=random.choice([True, False]))
server.db_in(zones, [redis1], 1)
t.start()
t.sleep(1)
db_upd(redis1, ZONE, 1, True)
db_upd(redis2, ZONE, 1, True)
db_upd(redis1, ZONE, 2, True)
db_upd(redis2, ZONE, 2, True)
serials = server.zones_wait(zones)
db_upd(redis1, ZONE, 1)
db_upd(redis2, ZONE, 1)
db_upd(redis1, ZONE, 2)
db_upd(redis2, ZONE, 2)
serials = server.zones_wait(zones, serials)
target_redis = random.choice([redis1, redis2])
target_inst = random.choice([1, 2])
server.db_in(zones, [target_redis], target_inst)
server.gen_confile()
server.reload()
db_upd(target_redis, ZONE, target_inst)
serials = server.zones_wait(zones, serials)
t.end()