From 8fcc561a14ff463227edd1faaabae3e67273e1da Mon Sep 17 00:00:00 2001 From: Daniel Salzman Date: Mon, 2 Feb 2026 10:58:18 +0100 Subject: [PATCH] test --- .../tests/catalog/generate_reconf/test.py | 235 ++++++++++++++++++ tests-extra/tools/dnstest/server.py | 22 +- 2 files changed, 250 insertions(+), 7 deletions(-) create mode 100644 tests-extra/tests/catalog/generate_reconf/test.py diff --git a/tests-extra/tests/catalog/generate_reconf/test.py b/tests-extra/tests/catalog/generate_reconf/test.py new file mode 100644 index 000000000..69eea5703 --- /dev/null +++ b/tests-extra/tests/catalog/generate_reconf/test.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 + +'''Test of catalog zone reconfiguration.''' + +from dnstest.test import Test +from dnstest.utils import compare, set_err, detail_log +from dnstest.libknot import libknot +import os +import random +import shutil +import time + +def ctl_begin(server): + ctl = libknot.control.KnotCtl() + ctl.connect(os.path.join(server.dir, "knot.sock")) + ctl.send_block(cmd="conf-begin") + resp = ctl.receive_block() + return ctl + +def ctl_end(ctl): + ctl.send_block(cmd="conf-commit") + resp = ctl.receive_block() + ctl.send(libknot.control.KnotCtlType.END) + ctl.close() + +t = Test() + +master = t.server("knot") +slave = t.server("knot") + +zone1 = t.zone_rnd(1) +t.link(zone1, master, slave) +slave.cat_hidden(zone1) + +gen1 = t.zone("gen1.", exists=False) +t.link(gen1, master, slave) +master.cat_generate(gen1) +slave.cat_interpret(gen1) + +t.start() + +master.zones_wait(zone1, use_ctl=True) +serial = slave.zones_wait(gen1, use_ctl=True) + +# Add a new member to an existing generated catalog. + +memb1 = t.zone_rnd(1) +t.link(memb1, master, slave) +master.cat_member(memb1, gen1) +slave.cat_hidden(memb1) + +master.gen_confile() +master.reload() +serial = slave.zones_wait(gen1, serial, use_ctl=True) +slave.zones_wait(memb1, use_ctl=True) + +# Add a second generated catalog along with its member and catalogize existing zone. + +gen2 = t.zone("gen2.", exists=False) +t.link(gen2, master, slave) +master.cat_generate(gen2) +slave.cat_interpret(gen2) + +memb2 = t.zone_rnd(1) +t.link(memb2, master, slave) +master.cat_member(memb2, gen2) # New zone +slave.cat_hidden(memb2) + +master.cat_member(zone1, gen2) # Existing zone + +master.cat_member(memb1, gen1, remove=True) # Decataloged zone +slave.cat_hidden(memb1, remove=True) +slave.zones.pop(memb1[0].name) # Remove a relict of t.link() - explicit configuration + +master.gen_confile() +slave.gen_confile() +slave.reload() +master.reload() + +slave.zones_wait(gen1, use_ctl=True) +slave.zones_wait(gen2, use_ctl=True) +slave.zones_wait(memb2, use_ctl=True) +slave.zones_wait(zone1, use_ctl=True) +t.sleep(2) +resp = slave.dig(memb1[0].name, "SOA") +resp.check_no_rr(rtype="SOA") # REFUSED not reliable as it can return NXDOMAIN due to random zone names + +# Move a member between generated catalogs + +master.cat_member(memb2, gen2, remove=True) +master.cat_member(memb2, gen1) + +master.gen_confile() +master.ctl("zone-xfr-freeze %s" % gen1[0].name) # Ensure the zone is decataloged first. +master.reload() + +t.sleep(2) +resp = slave.dig(memb2[0].name, "SOA") +resp.check_no_rr(rtype="SOA") # REFUSED not reliable as it can return NXDOMAIN due to random zone names + +master.ctl("zone-xfr-thaw %s" % gen1[0].name) +slave.ctl("zone-refresh %s" % gen1[0].name) +slave.zones_wait(memb2, use_ctl=True) + +# Catalog group change - verify the target template module is activated + +resp = slave.dig(memb2[0].name, "SOA", udp=True) +resp.check(noflags="TC") # Module mod-noudp is not active + +ctl = ctl_begin(slave) +ctl.send_block(cmd="conf-set", section="template", identifier="catalog-signed", item="module", data="mod-noudp") +resp = ctl.receive_block() +ctl_end(ctl) + +master.cat_member(memb2, gen1, "catalog-signed") + +master.gen_confile() +master.reload() + +slave.zones_wait(memb2, use_ctl=True) +t.sleep(2) +resp = slave.dig(memb2[0].name, "SOA", dnssec=True, udp=False) +resp.check(rcode="NOERROR") +resp.check_count(1, "RRSIG") # Signing active + +resp = slave.dig(memb2[0].name, "SOA", udp=True) +resp.check(flags="TC") # Module mod-noudp is active + +''' +# testcase 2: adding member zones dynamically/online/offline +zone_add = t.zone("flags.") + t.zone("records.") +if USE_CTL: + shutil.copy(t.data_dir + "generic.zone", os.path.join(master.dir, "generic.zone")) + ctl = ctl_begin() + ctl_add_zone(ctl, "flags.") + ctl_add_zone(ctl, "records.") + ctl_end(ctl) +else: + t.link(zone_add, master, slave) + for z in zone_add: + master.cat_member(z, catz) + slave.cat_hidden(z) + + master.gen_confile() + + add_online = random.choice([True, False]) + if add_online: + master.reload() + else: + master.stop() + t.sleep(1) + master.start() + +slave.zones_wait(zone + zone_add) + +# testcase 3: removing member zone dynamically/online/offline +serial_bef_rem = slave.zone_wait(catz, tsig=True) +master.ctl("-f zone-purge example.com") +if USE_CTL: + ctl = ctl_begin() + ctl.send_block(cmd="conf-unset", section="zone", item="domain", data="example.com.") + resp = ctl.receive_block() + ctl_end(ctl) +else: + master.zones.pop("example.com.") + master.gen_confile() + + add_online = random.choice([True, False]) + if add_online: + master.reload() + else: + master.stop() + t.sleep(1) + master.start() + +slave.zone_wait(catz, serial_bef_rem, tsig=True) +t.sleep(2) # allow the member zone to actually be purged +resp = slave.dig("example.com.", "SOA") +resp.check(rcode="REFUSED") + +#testcase 4: remove/add same member zone while slave offline, with purge +resp0 = slave.dig("records.", "DNSKEY") +resp0.check_count(1, "DNSKEY") +dnskey0 = resp0.resp.answer[0].to_rdataset() +slave.stop() + +if USE_CTL: + ctl = ctl_begin() + ctl.send_block(cmd="conf-unset", section="zone", item="domain", data="records.") + resp = ctl.receive_block() + ctl_end(ctl) + t.sleep(7) + master.ctl("-f zone-purge +orphan records.") + ctl = ctl_begin() + ctl_add_zone(ctl, "records.") + ctl_end(ctl) +else: + temp_rem = master.zones.pop("records.") + master.gen_confile() + master.reload() + t.sleep(7) + master.ctl("-f zone-purge +orphan records.") + master.zones["records."] = temp_rem + master.gen_confile() + master.reload() + +slave.start() +wait_for_zonefile(slave, "records.", 3, 30) +slave.ctl("zone-refresh") +wait_for_zonefile(slave, "records.", 3, 30) +resp1 = slave.dig("records.", "DNSKEY") +resp1.check_count(1, "DNSKEY") +dnskey1 = resp1.resp.answer[0].to_rdataset() +if dnskey0 == dnskey1: + set_err("ZONE NOT PURGED") + +#testcase 5: reload and don't reload a zone depending on the config change +if USE_CTL: + shutil.copy(t.data_dir + "generic.upd", os.path.join(master.dir, "generic.zone")) + + ctl = ctl_begin() + ctl.send_block(cmd="conf-set", section="zone", identifier="flags.", item="zone-max-size", data="10000") + resp = ctl.receive_block() + ctl.send_block(cmd="conf-set", section="zone", identifier="records.", item="comment", data="don't reload") + resp = ctl.receive_block() + ctl_end(ctl) + + t.sleep(2) + resp = master.dig("flags.", "SOA") + compare(resp.soa_serial(), 10, "master zone reloaded") + resp = master.dig("records.", "SOA") + compare(resp.soa_serial(), 1, "master zone not reloaded") + slave.zone_wait(zone_add[0], 1) +''' +t.end() diff --git a/tests-extra/tools/dnstest/server.py b/tests-extra/tools/dnstest/server.py index d87ad870f..2fd12d34e 100644 --- a/tests-extra/tools/dnstest/server.py +++ b/tests-extra/tools/dnstest/server.py @@ -304,16 +304,24 @@ class Server(object): z = self.zones[zone_arg_check(zone).name] z.catalog_role = ZoneCatalogRole.GENERATE - def cat_member(self, zone, catalog, group=None): + def cat_member(self, zone, catalog, group=None, remove=False): z = self.zones[zone_arg_check(zone).name] - c = self.zones[zone_arg_check(catalog).name] - z.catalog_role = ZoneCatalogRole.MEMBER - z.catalog_gen_name = c.name - z.catalog_group = group + if remove: + z.catalog_role = None + z.catalog_gen_name = None + z.catalog_group = None + else: + c = self.zones[zone_arg_check(catalog).name] + z.catalog_role = ZoneCatalogRole.MEMBER + z.catalog_gen_name = c.name + z.catalog_group = group - def cat_hidden(self, zone): + def cat_hidden(self, zone, remove=False): z = self.zones[zone_arg_check(zone).name] - z.catalog_role = ZoneCatalogRole.HIDDEN + if remove: + z.catalog_role = None + else: + z.catalog_role = ZoneCatalogRole.HIDDEN def compile(self): try: