mirror of
https://github.com/isc-projects/bind9.git
synced 2026-03-11 02:30:44 -04:00
The artifact lists in clean.sh and extra_artifacts might be slightly different. The list was updated for each test to reflect the current state.
484 lines
13 KiB
Python
Executable file
484 lines
13 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
#
|
|
# See the COPYRIGHT file distributed with this work for additional
|
|
# information regarding copyright ownership.
|
|
|
|
|
|
from typing import NamedTuple, Tuple
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
import isctest
|
|
import pytest
|
|
|
|
pytest.importorskip("dns", minversion="2.0.0")
|
|
import dns.exception
|
|
import dns.message
|
|
import dns.name
|
|
import dns.rcode
|
|
import dns.rdataclass
|
|
import dns.rdatatype
|
|
|
|
|
|
pytestmark = [
|
|
pytest.mark.skipif(
|
|
sys.version_info < (3, 7), reason="Python >= 3.7 required [GL #3001]"
|
|
),
|
|
pytest.mark.extra_artifacts(
|
|
[
|
|
"*.out",
|
|
"ns*/*.db",
|
|
"ns*/*.db.infile",
|
|
"ns*/*.db.signed",
|
|
"ns*/*.jnl",
|
|
"ns*/*.jbk",
|
|
"ns*/*.keyname",
|
|
"ns*/dsset-*",
|
|
"ns*/K*",
|
|
"ns*/keygen.out*",
|
|
"ns*/settime.out*",
|
|
"ns*/signer.out*",
|
|
"ns*/trusted.conf",
|
|
"ns*/zones",
|
|
]
|
|
),
|
|
]
|
|
|
|
|
|
def has_signed_apex_nsec(zone, response):
|
|
has_nsec = False
|
|
has_rrsig = False
|
|
|
|
ttl = 300
|
|
nextname = "a."
|
|
labelcount = zone.count(".") # zone is specified as FQDN
|
|
types = "NS SOA RRSIG NSEC DNSKEY"
|
|
match = f"{zone} {ttl} IN NSEC {nextname}{zone} {types}"
|
|
sig = f"{zone} {ttl} IN RRSIG NSEC 13 {labelcount} 300"
|
|
|
|
for rr in response.answer:
|
|
if match in rr.to_text():
|
|
has_nsec = True
|
|
if sig in rr.to_text():
|
|
has_rrsig = True
|
|
|
|
if not has_nsec:
|
|
print("error: missing apex NSEC record in response")
|
|
if not has_rrsig:
|
|
print("error: missing NSEC signature in response")
|
|
|
|
return has_nsec and has_rrsig
|
|
|
|
|
|
def do_query(server, qname, qtype, tcp=False):
|
|
msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
|
|
query_func = isctest.query.tcp if tcp else isctest.query.udp
|
|
response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
|
|
return response
|
|
|
|
|
|
def verify_zone(zone, transfer):
|
|
verify = os.getenv("VERIFY")
|
|
assert verify is not None
|
|
|
|
filename = f"{zone}out"
|
|
with open(filename, "w", encoding="utf-8") as file:
|
|
for rr in transfer.answer:
|
|
file.write(rr.to_text())
|
|
file.write("\n")
|
|
|
|
# dnssec-verify command with default arguments.
|
|
verify_cmd = [verify, "-z", "-o", zone, filename]
|
|
|
|
verifier = isctest.run.cmd(verify_cmd)
|
|
|
|
if verifier.returncode != 0:
|
|
print(f"error: dnssec-verify {zone} failed")
|
|
sys.stderr.buffer.write(verifier.stderr)
|
|
|
|
return verifier.returncode == 0
|
|
|
|
|
|
def read_statefile(server, zone):
|
|
count = 0
|
|
keyid = 0
|
|
state = {}
|
|
|
|
response = do_query(server, zone, "DS", tcp=True)
|
|
# fetch key id from response.
|
|
for rr in response.answer:
|
|
if rr.match(
|
|
dns.name.from_text(zone),
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.DS,
|
|
dns.rdatatype.NONE,
|
|
):
|
|
if count == 0:
|
|
keyid = list(dict(rr.items).items())[0][0].key_tag
|
|
count += 1
|
|
|
|
assert (
|
|
count == 1
|
|
), f"expected a single DS in response for {zone} from {server.ip}, got {count}"
|
|
|
|
filename = f"ns9/K{zone}+013+{keyid:05d}.state"
|
|
print(f"read state file {filename}")
|
|
|
|
try:
|
|
with open(filename, "r", encoding="utf-8") as file:
|
|
for line in file:
|
|
if line.startswith(";"):
|
|
continue
|
|
key, val = line.strip().split(":", 1)
|
|
state[key.strip()] = val.strip()
|
|
except FileNotFoundError:
|
|
# file may not be written just yet.
|
|
return {}
|
|
|
|
return state
|
|
|
|
|
|
def zone_check(server, zone):
|
|
fqdn = f"{zone}."
|
|
|
|
# check zone is fully signed.
|
|
response = do_query(server, fqdn, "NSEC")
|
|
assert has_signed_apex_nsec(fqdn, response)
|
|
|
|
# check if zone if DNSSEC valid.
|
|
transfer = do_query(server, fqdn, "AXFR", tcp=True)
|
|
assert verify_zone(fqdn, transfer)
|
|
|
|
|
|
def keystate_check(server, zone, key):
|
|
fqdn = f"{zone}."
|
|
val = 0
|
|
deny = False
|
|
|
|
search = key
|
|
if key.startswith("!"):
|
|
deny = True
|
|
search = key[1:]
|
|
|
|
for _ in range(10):
|
|
state = read_statefile(server, fqdn)
|
|
try:
|
|
val = state[search]
|
|
except KeyError:
|
|
pass
|
|
|
|
if not deny and val != 0:
|
|
break
|
|
if deny and val == 0:
|
|
break
|
|
|
|
time.sleep(1)
|
|
|
|
if deny:
|
|
assert val == 0
|
|
else:
|
|
assert val != 0
|
|
|
|
|
|
def rekey(zone):
|
|
rndc = os.getenv("RNDC")
|
|
assert rndc is not None
|
|
|
|
port = os.getenv("CONTROLPORT")
|
|
assert port is not None
|
|
|
|
# rndc loadkeys.
|
|
rndc_cmd = [
|
|
rndc,
|
|
"-c",
|
|
"../_common/rndc.conf",
|
|
"-p",
|
|
port,
|
|
"-s",
|
|
"10.53.0.9",
|
|
"loadkeys",
|
|
zone,
|
|
]
|
|
controller = isctest.run.cmd(rndc_cmd)
|
|
|
|
if controller.returncode != 0:
|
|
print(f"error: rndc loadkeys {zone} failed")
|
|
sys.stderr.buffer.write(controller.stderr)
|
|
|
|
assert controller.returncode == 0
|
|
|
|
|
|
class CheckDSTest(NamedTuple):
|
|
zone: str
|
|
logs_to_wait_for: Tuple[str]
|
|
expected_parent_state: str
|
|
|
|
|
|
parental_agents_tests = [
|
|
# Using a reference to parental-agents.
|
|
CheckDSTest(
|
|
zone="reference.explicit.dspublish.ns2",
|
|
logs_to_wait_for=("DS response from 10.53.0.8",),
|
|
expected_parent_state="DSPublish",
|
|
),
|
|
# Using a resolver as parental-agent (ns3).
|
|
CheckDSTest(
|
|
zone="resolver.explicit.dspublish.ns2",
|
|
logs_to_wait_for=("DS response from 10.53.0.3",),
|
|
expected_parent_state="DSPublish",
|
|
),
|
|
# Using a resolver as parental-agent (ns3).
|
|
CheckDSTest(
|
|
zone="resolver.explicit.dsremoved.ns5",
|
|
logs_to_wait_for=("empty DS response from 10.53.0.3",),
|
|
expected_parent_state="DSRemoved",
|
|
),
|
|
]
|
|
|
|
no_ent_tests = [
|
|
CheckDSTest(
|
|
zone="no-ent.ns2",
|
|
logs_to_wait_for=("DS response from 10.53.0.2",),
|
|
expected_parent_state="DSPublish",
|
|
),
|
|
CheckDSTest(
|
|
zone="no-ent.ns5",
|
|
logs_to_wait_for=("DS response from 10.53.0.5",),
|
|
expected_parent_state="DSRemoved",
|
|
),
|
|
]
|
|
|
|
|
|
def dspublished_tests(checkds, addr):
|
|
return [
|
|
#
|
|
# 1.1.1: DS is correctly published in parent.
|
|
# parental-agents: ns2
|
|
#
|
|
# The simple case.
|
|
CheckDSTest(
|
|
zone=f"good.{checkds}.dspublish.ns2",
|
|
logs_to_wait_for=(f"DS response from {addr}",),
|
|
expected_parent_state="DSPublish",
|
|
),
|
|
#
|
|
# 1.1.2: DS is not published in parent.
|
|
# parental-agents: ns5
|
|
#
|
|
CheckDSTest(
|
|
zone=f"not-yet.{checkds}.dspublish.ns5",
|
|
logs_to_wait_for=("empty DS response from 10.53.0.5",),
|
|
expected_parent_state="!DSPublish",
|
|
),
|
|
#
|
|
# 1.1.3: The parental agent is badly configured.
|
|
# parental-agents: ns6
|
|
#
|
|
CheckDSTest(
|
|
zone=f"bad.{checkds}.dspublish.ns6",
|
|
logs_to_wait_for=(
|
|
(
|
|
"bad DS response from 10.53.0.6"
|
|
if checkds == "explicit"
|
|
else "error during parental-agents processing"
|
|
),
|
|
),
|
|
expected_parent_state="!DSPublish",
|
|
),
|
|
#
|
|
# 1.1.4: DS is published, but has bogus signature.
|
|
#
|
|
# TBD
|
|
#
|
|
# 1.2.1: DS is correctly published in all parents.
|
|
# parental-agents: ns2, ns4
|
|
#
|
|
CheckDSTest(
|
|
zone=f"good.{checkds}.dspublish.ns2-4",
|
|
logs_to_wait_for=(f"DS response from {addr}", "DS response from 10.53.0.4"),
|
|
expected_parent_state="DSPublish",
|
|
),
|
|
#
|
|
# 1.2.2: DS is not published in some parents.
|
|
# parental-agents: ns2, ns4, ns5
|
|
#
|
|
CheckDSTest(
|
|
zone=f"incomplete.{checkds}.dspublish.ns2-4-5",
|
|
logs_to_wait_for=(
|
|
f"DS response from {addr}",
|
|
"DS response from 10.53.0.4",
|
|
"empty DS response from 10.53.0.5",
|
|
),
|
|
expected_parent_state="!DSPublish",
|
|
),
|
|
#
|
|
# 1.2.3: One parental agent is badly configured.
|
|
# parental-agents: ns2, ns4, ns6
|
|
#
|
|
CheckDSTest(
|
|
zone=f"bad.{checkds}.dspublish.ns2-4-6",
|
|
logs_to_wait_for=(
|
|
f"DS response from {addr}",
|
|
"DS response from 10.53.0.4",
|
|
"bad DS response from 10.53.0.6",
|
|
),
|
|
expected_parent_state="!DSPublish",
|
|
),
|
|
#
|
|
# 1.2.4: DS is completely published, bogus signature.
|
|
#
|
|
# TBD
|
|
# TBD: Check with TSIG
|
|
# TBD: Check with TLS
|
|
]
|
|
|
|
|
|
def dswithdrawn_tests(checkds, addr):
|
|
return [
|
|
#
|
|
# 2.1.1: DS correctly withdrawn from the parent.
|
|
# parental-agents: ns5
|
|
#
|
|
# The simple case.
|
|
CheckDSTest(
|
|
zone=f"good.{checkds}.dsremoved.ns5",
|
|
logs_to_wait_for=(f"empty DS response from {addr}",),
|
|
expected_parent_state="DSRemoved",
|
|
),
|
|
#
|
|
# 2.1.2: DS is published in the parent.
|
|
# parental-agents: ns2
|
|
#
|
|
CheckDSTest(
|
|
zone=f"still-there.{checkds}.dsremoved.ns2",
|
|
logs_to_wait_for=("DS response from 10.53.0.2",),
|
|
expected_parent_state="!DSRemoved",
|
|
),
|
|
#
|
|
# 2.1.3: The parental agent is badly configured.
|
|
# parental-agents: ns6
|
|
#
|
|
CheckDSTest(
|
|
zone=f"bad.{checkds}.dsremoved.ns6",
|
|
logs_to_wait_for=(
|
|
(
|
|
"bad DS response from 10.53.0.6"
|
|
if checkds == "explicit"
|
|
else "error during parental-agents processing"
|
|
),
|
|
),
|
|
expected_parent_state="!DSRemoved",
|
|
),
|
|
#
|
|
# 2.1.4: DS is withdrawn, but has bogus signature.
|
|
#
|
|
# TBD
|
|
#
|
|
# 2.2.1: DS is correctly withdrawn from all parents.
|
|
# parental-agents: ns5, ns7
|
|
#
|
|
CheckDSTest(
|
|
zone=f"good.{checkds}.dsremoved.ns5-7",
|
|
logs_to_wait_for=(
|
|
f"empty DS response from {addr}",
|
|
"empty DS response from 10.53.0.7",
|
|
),
|
|
expected_parent_state="DSRemoved",
|
|
),
|
|
#
|
|
# 2.2.2: DS is not withdrawn from some parents.
|
|
# parental-agents: ns2, ns5, ns7
|
|
#
|
|
CheckDSTest(
|
|
zone=f"incomplete.{checkds}.dsremoved.ns2-5-7",
|
|
logs_to_wait_for=(
|
|
"DS response from 10.53.0.2",
|
|
f"empty DS response from {addr}",
|
|
"empty DS response from 10.53.0.7",
|
|
),
|
|
expected_parent_state="!DSRemoved",
|
|
),
|
|
#
|
|
# 2.2.3: One parental agent is badly configured.
|
|
# parental-agents: ns5, ns6, ns7
|
|
#
|
|
CheckDSTest(
|
|
zone=f"bad.{checkds}.dsremoved.ns5-6-7",
|
|
logs_to_wait_for=(
|
|
f"empty DS response from {addr}",
|
|
"empty DS response from 10.53.0.7",
|
|
"bad DS response from 10.53.0.6",
|
|
),
|
|
expected_parent_state="!DSRemoved",
|
|
),
|
|
#
|
|
# 2.2.4:: DS is removed completely, bogus signature.
|
|
#
|
|
# TBD
|
|
]
|
|
|
|
|
|
checkds_no_tests = [
|
|
CheckDSTest(
|
|
zone="good.no.dspublish.ns2",
|
|
logs_to_wait_for=(),
|
|
expected_parent_state="!DSPublish",
|
|
),
|
|
CheckDSTest(
|
|
zone="good.no.dspublish.ns2-4",
|
|
logs_to_wait_for=(),
|
|
expected_parent_state="!DSPublish",
|
|
),
|
|
CheckDSTest(
|
|
zone="good.no.dsremoved.ns5",
|
|
logs_to_wait_for=(),
|
|
expected_parent_state="!DSRemoved",
|
|
),
|
|
CheckDSTest(
|
|
zone="good.no.dsremoved.ns5-7",
|
|
logs_to_wait_for=(),
|
|
expected_parent_state="!DSRemoved",
|
|
),
|
|
]
|
|
|
|
|
|
checkds_tests = (
|
|
parental_agents_tests
|
|
+ no_ent_tests
|
|
+ dspublished_tests("explicit", "10.53.0.8")
|
|
+ dspublished_tests("yes", "10.53.0.2")
|
|
+ dswithdrawn_tests("explicit", "10.53.0.10")
|
|
+ dswithdrawn_tests("yes", "10.53.0.5")
|
|
+ checkds_no_tests
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
|
|
def test_checkds(servers, params):
|
|
# Wait until the provided zone is signed and then verify its DNSSEC data.
|
|
zone_check(servers["ns9"], params.zone)
|
|
|
|
# Wait up to 10 seconds until all the expected log lines are found in the
|
|
# log file for the provided server. Rekey every second if necessary.
|
|
time_remaining = 10
|
|
for log_string in params.logs_to_wait_for:
|
|
line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
|
|
while line not in servers["ns9"].log:
|
|
rekey(params.zone)
|
|
time_remaining -= 1
|
|
assert time_remaining, f'Timed out waiting for "{log_string}" to be logged'
|
|
time.sleep(1)
|
|
|
|
# Check whether key states on the parent server provided match
|
|
# expectations.
|
|
keystate_check(servers["ns2"], params.zone, params.expected_parent_state)
|