bind9/bin/tests/system/nsec3-answer/tests_nsec3.py
Petr Špaček b0c7f8b598 Fix compatibility with Python < 3.10
Dataclass kw_only argument was added only in Python 3.10 but EL9 image
has only 3.9.21.
2025-07-30 14:35:32 +00:00

410 lines
15 KiB
Python
Executable file

#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from dataclasses import dataclass
import os
from pathlib import Path
from typing import Optional, Set, Tuple
import pytest
pytest.importorskip("dns", minversion="2.5.0")
import dns.dnssec
import dns.message
import dns.name
import dns.query
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rdtypes.ANY.RRSIG
import dns.rdtypes.ANY.NSEC3
import dns.rrset
from isctest.hypothesis.strategies import dns_names, sampled_from
import isctest
import isctest.name
from hypothesis import assume, given
# Origin used for the zone under test and for hashed NSEC3 owner names: the root.
SUFFIX = dns.name.from_text(".")
# Server addresses; the parametrized test ids label AUTH as "ns1" and RESOLVER as "ns2".
AUTH = "10.53.0.1"
RESOLVER = "10.53.0.2"
# Timeout handed to isctest.query.tcp() for every query (presumably seconds).
TIMEOUT = 5
# Zone contents parsed once at import time from the ns1 zone file template.
# NOTE: requires the "srcdir" environment variable; a missing variable raises
# KeyError while the module is being imported.
ZONE = isctest.name.ZoneAnalyzer.read_path(
    Path(os.environ["srcdir"]) / "nsec3-answer/ns1/root.db.in", origin=SUFFIX
)
def do_test_query(
    qname: dns.name.Name, qtype: dns.rdatatype.RdataType, server: str, named_port: int
) -> Tuple[dns.message.QueryMessage, "NSEC3Checker"]:
    """
    Query 'server' over TCP and wrap the reply in an NSEC3Checker.

    The query for (qname, qtype) is built with use_edns=True and
    want_dnssec=True and sent to 'server' on port 'named_port'. The reply is
    validated with isctest.check.is_response_to() and its RCODE must be either
    NOERROR or NXDOMAIN.

    Returns a tuple (response, NSEC3Checker built from that response).
    """
    request = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
    reply = isctest.query.tcp(request, server, named_port, timeout=TIMEOUT)
    isctest.check.is_response_to(reply, request)

    acceptable_rcodes = (dns.rcode.NOERROR, dns.rcode.NXDOMAIN)
    assert reply.rcode() in acceptable_rcodes

    checker = NSEC3Checker(reply)
    return reply, checker
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(
    qname=sampled_from(
        sorted(ZONE.reachable - ZONE.get_names_with_type(dns.rdatatype.CNAME))
    )
)
def test_nodata(server: str, qname: dns.name.Name, named_port: int) -> None:
    """
    NODATA for an existing name, no wildcards involved.

    'qname' is drawn from reachable names which do not own a CNAME, and the
    query asks for a type (HINFO) whose RRset is expected not to exist.
    """
    absent_type = dns.rdatatype.HINFO
    _, checker = do_test_query(qname, absent_type, server, named_port)
    check_nodata(qname, checker)
@pytest.mark.parametrize("server", [pytest.param(AUTH, id="ns1")])
@given(
    qname=dns_names(
        suffix=(ZONE.delegations - ZONE.get_names_with_type(dns.rdatatype.DS))
    )
)
def test_nodata_ds(server: str, qname: dns.name.Name, named_port: int) -> None:
    """
    Auth sends proof of nonexistence with referral without DS RR.

    Opt-out is not supported.
    """
    response, checker = do_test_query(qname, dns.rdatatype.HINFO, server, named_port)

    # first NS RRset in the AUTHORITY section, if there is any
    ns_rrset = next(
        (rrset for rrset in response.authority if rrset.rdtype == dns.rdatatype.NS),
        None,
    )
    assert ns_rrset is not None, "NS RRset missing in delegation answer"

    # DS RR does not exist, so the response must prove it with an NSEC3
    # matching the owner name of the NS RRset
    check_nodata(ns_rrset.name, checker)
def check_nodata(name: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Verify a NODATA proof for 'name'.

    The response must have RCODE NOERROR, must contain an NSEC3 RR matching
    'name', and must not contain any NSEC3 RR beyond the ones used as proof.
    """
    rcode = nsec3check.response.rcode()
    assert rcode is dns.rcode.NOERROR

    nsec3check.prove_name_exists(name)
    nsec3check.check_extraneous_rrs()
def assume_nx_and_no_delegation(qname: dns.name.Name) -> None:
    """
    Tell hypothesis to discard 'qname' unless it is a plain non-existent name.

    The example is rejected when 'qname' exists in the zone, or when it is
    equal to or below a reachable delegation or a reachable DNAME.
    """
    assume(qname not in ZONE.all_existing_names)

    # name must not be under a delegation or DNAME:
    # it would not work with resolver ns2
    at_or_below = (dns.name.NameRelation.EQUAL, dns.name.NameRelation.SUBDOMAIN)
    cut_points = ZONE.reachable_delegations.union(ZONE.reachable_dnames)
    hidden_by_cut = isctest.name.is_related_to_any(qname, at_or_below, cut_points)
    assume(not hidden_by_cut)
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=SUFFIX))
def test_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """A real NXDOMAIN, no wildcards involved"""
    assume_nx_and_no_delegation(qname)
    # skip names which would be answered by wildcard synthesis
    assume(ZONE.source_of_synthesis(qname) not in ZONE.reachable_wildcards)

    _, checker = do_test_query(qname, dns.rdatatype.A, server, named_port)
    check_nxdomain(qname, checker)
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=sampled_from(sorted(ZONE.get_names_with_type(dns.rdatatype.CNAME))))
def test_cname_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """CNAME which terminates by NXDOMAIN, no wildcards involved"""
    response, checker = do_test_query(qname, dns.rdatatype.A, server, named_port)

    # name at the end of the CNAME chain found in the response
    target = response.resolve_chaining().canonical_name
    assume_nx_and_no_delegation(target)
    # skip targets which would be answered by wildcard synthesis
    assume(ZONE.source_of_synthesis(target) not in ZONE.reachable_wildcards)

    check_nxdomain(target, checker)
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.get_names_with_type(dns.rdatatype.DNAME)))
def test_dname_nxdomain(server: str, qname: dns.name.Name, named_port: int) -> None:
    """DNAME which terminates by NXDOMAIN, no wildcards involved"""
    assume(qname not in ZONE.reachable)
    response, checker = do_test_query(qname, dns.rdatatype.A, server, named_port)

    # name at the end of the DNAME/CNAME chain found in the response
    target = response.resolve_chaining().canonical_name
    assume_nx_and_no_delegation(target)
    # skip targets which would be answered by wildcard synthesis
    assume(ZONE.source_of_synthesis(target) not in ZONE.reachable_wildcards)

    check_nxdomain(target, checker)
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.ents))
def test_ents(server: str, qname: dns.name.Name, named_port: int) -> None:
    """ENT can have a wildcard under it"""
    assume_nx_and_no_delegation(qname)
    _, checker = do_test_query(qname, dns.rdatatype.A, server, named_port)

    # does qname match a wildcard under ENT?
    matches_wildcard = ZONE.source_of_synthesis(qname) in ZONE.reachable_wildcards
    verify = check_wildcard_synthesis if matches_wildcard else check_nxdomain
    verify(qname, checker)
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.reachable_wildcard_parents))
def test_wildcard_synthesis(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Non-existent name matching a reachable wildcard, queried for type A"""
    assume(qname not in ZONE.all_existing_names)
    # keep only names whose source of synthesis is a reachable wildcard
    assume(ZONE.source_of_synthesis(qname) in ZONE.reachable_wildcards)

    _, checker = do_test_query(qname, dns.rdatatype.A, server, named_port)
    check_wildcard_synthesis(qname, checker)
@pytest.mark.parametrize(
    "server", [pytest.param(AUTH, id="ns1"), pytest.param(RESOLVER, id="ns2")]
)
@given(qname=dns_names(suffix=ZONE.reachable_wildcard_parents))
def test_wildcard_nodata(server: str, qname: dns.name.Name, named_port: int) -> None:
    """Non-existent name matching a reachable wildcard, queried for type AAAA"""
    assume(qname not in ZONE.all_existing_names)
    # keep only names whose source of synthesis is a reachable wildcard
    assume(ZONE.source_of_synthesis(qname) in ZONE.reachable_wildcards)

    _, checker = do_test_query(qname, dns.rdatatype.AAAA, server, named_port)
    check_wildcard_nodata(qname, checker)
def check_wildcard_nodata(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Verify a NODATA answer produced via a wildcard.

    Expects RCODE NOERROR, an NSEC3 matching the closest encloser of 'qname',
    an NSEC3 covering the next-closer name, an NSEC3 matching the wildcard
    owner, and no other NSEC3 RRs in the response.
    """
    assert nsec3check.response.rcode() is dns.rcode.NOERROR

    closest_encloser, next_closer = ZONE.closest_encloser(qname)
    nsec3check.prove_name_exists(closest_encloser)
    nsec3check.prove_name_does_not_exist(next_closer)

    # expecting proof that wildcard owner does not have rdatatype requested
    nsec3check.prove_name_exists(ZONE.source_of_synthesis(qname))

    nsec3check.check_extraneous_rrs()
def check_nxdomain(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """
    Verify an NXDOMAIN proof for 'qname'.

    Expects RCODE NXDOMAIN, an NSEC3 matching the closest encloser, an NSEC3
    covering the next-closer name, an NSEC3 covering the wildcard at the
    source of synthesis, and no other NSEC3 RRs in the response.
    """
    assert nsec3check.response.rcode() is dns.rcode.NXDOMAIN

    closest_encloser, next_closer = ZONE.closest_encloser(qname)
    nsec3check.prove_name_exists(closest_encloser)
    nsec3check.prove_name_does_not_exist(next_closer)

    # the wildcard which could have synthesized an answer must not exist either
    nsec3check.prove_name_does_not_exist(ZONE.source_of_synthesis(qname))

    nsec3check.check_extraneous_rrs()
def check_wildcard_synthesis(qname: dns.name.Name, nsec3check: "NSEC3Checker") -> None:
    """Expect wildcard response with a signed A RRset"""
    response = nsec3check.response
    assert response.rcode() is dns.rcode.NOERROR

    signatures = response.get_rrset(
        section="ANSWER",
        name=qname,
        rdclass=dns.rdataclass.IN,
        rdtype=dns.rdatatype.RRSIG,
        covers=dns.rdatatype.A,
    )
    assert signatures is not None
    assert len(signatures) == 1
    rrsig = signatures[0]
    assert isinstance(rrsig, dns.rdtypes.ANY.RRSIG.RRSIG)

    # RRSIG labels field (RFC 4034 section 3.1.3) counts neither the root
    # label nor the leftmost '*' label. Adding one for the root gives the
    # number of labels in the wildcard's parent name.
    parent_label_count = rrsig.labels + 1
    assert parent_label_count < len(qname)

    # 1. The RRSIG comes from the wildcard '*.something', which proves that
    #    node 'something' exists (it has a child, so it exists by definition,
    #    although it might be an ENT). The closest encloser is therefore
    #    expected to be 'something'.
    # 2. For a legitimate wildcard synthesis neither QNAME nor any node
    #    between QNAME and the closest encloser may exist. Thanks to DNS node
    #    existence rules it is enough to deny the next-closer name, i.e.
    #    <one_label_under>.<closest_encloser>: that denies the whole subtree
    #    down to QNAME.
    closest_encloser, next_closer = ZONE.closest_encloser(qname)

    # closest encloser itself is proven to exist by the RRSIG
    _, expected_encloser = qname.split(parent_label_count)
    assert closest_encloser == expected_encloser

    _, expected_next_closer = qname.split(parent_label_count + 1)
    assert next_closer == expected_next_closer

    nsec3check.prove_name_does_not_exist(next_closer)
    nsec3check.check_extraneous_rrs()
# kw_only is deliberately not used here: it needs Python >= 3.10.
@dataclass(frozen=True)
class NSEC3Params:
    """NSEC3 parameter values shared by all NSEC3 RRs in one DNS response"""

    # hash algorithm number
    algorithm: int

    # NSEC3 flags field
    flags: int

    # number of additional hash iterations
    iterations: int

    # salt mixed into the hash; None when no value was recorded
    salt: Optional[bytes]
class NSEC3Checker:
    """
    Collect the NSEC3 RRsets of one DNS response and verify proofs against them.

    The constructor validates the NSEC3 RRsets found in the AUTHORITY section.
    The prove_*() methods then look up the NSEC3 RR needed for a particular
    proof and remember its owner name, so that check_extraneous_rrs() can
    detect NSEC3 RRs which were not needed by any proof.
    """

    def __init__(self, response: dns.message.Message):
        """
        Validate and store NSEC3 RRsets from 'response'.

        Asserts that:
        - ANSWER and ADDITIONAL sections contain no NSEC3 RRsets,
        - every NSEC3 owner name appears at most once in AUTHORITY,
        - every NSEC3 RRset has exactly one RR,
        - no NSEC3 type bitmap lists the NSEC3 type itself,
        - algorithm, flags, iterations and salt are the same in all NSEC3 RRs,
        - at least one NSEC3 RRset is present.
        """
        for rrset in response.answer:
            assert not rrset.match(
                dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ), f"unexpected NSEC3 RR in ANSWER section:\n{response}"
        for rrset in response.additional:
            assert not rrset.match(
                dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ), f"unexpected NSEC3 RR in ADDITIONAL section:\n{response}"

        # values taken from the first NSEC3 RR; all later RRs must agree
        attrs_seen = {
            "algorithm": None,
            "flags": None,
            "iterations": None,
            "salt": None,
        }
        first = True
        owners_seen = set()
        self.rrsets = []
        for rrset in response.authority:
            if not rrset.match(
                dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ):
                continue
            assert (
                rrset.name not in owners_seen
            ), f"duplicate NSEC3 owner {rrset.name}:\n{response}"
            owners_seen.add(rrset.name)

            assert len(rrset) == 1
            rr = rrset[0]
            assert isinstance(rr, dns.rdtypes.ANY.NSEC3.NSEC3)
            assert (
                "NSEC3"
                not in dns.rdtypes.ANY.NSEC3.Bitmap(rr.windows).to_text().split()
            ), f"NSEC3 RRset with NSEC3 in type bitmap:\n{response}"

            # NSEC3 parameters MUST be consistent across all NSEC3 RRs:
            # RFC 5155 section 7.2, last paragraph
            for attr_name, value_seen in attrs_seen.items():
                current = getattr(rr, attr_name)
                if first:
                    attrs_seen[attr_name] = current
                else:
                    assert (
                        current == value_seen
                    ), f"inconsistent {attr_name}\n{response}"
            first = False
            self.rrsets.append(rrset)

        # algorithm stays None only if the loop above found no NSEC3 RRset
        assert attrs_seen["algorithm"] is not None, f"no NSEC3 found\n{response}"
        self.params: NSEC3Params = NSEC3Params(**attrs_seen)
        self.response: dns.message.Message = response
        # owner names of all NSEC3 RRsets in the response
        self.owners_present: Set[dns.name.Name] = owners_seen
        # owner names of NSEC3 RRsets consumed by prove_*() calls so far
        self.owners_used: Set[dns.name.Name] = set()

    @staticmethod
    def nsec3_covers(rrset: dns.rrset.RRset, hashed_name: dns.name.Name) -> bool:
        """
        Test if 'hashed_name' is covered by an NSEC3 record in 'rrset', i.e. the name does not exist.

        'rrset' must hold exactly one NSEC3 RR with flags == 0 (opt-out is not
        supported by the test logic). Returns True when 'hashed_name' lies
        strictly between the NSEC3 owner name and its 'next' name, taking
        wrap-around at the end of the chain into account.
        """
        prev_name = rrset.name
        assert len(rrset) == 1
        nsec3 = rrset[0]
        assert isinstance(nsec3, dns.rdtypes.ANY.NSEC3.NSEC3)
        assert nsec3.flags == 0, "opt-out not supported by test logic"
        next_name = nsec3.next_name(SUFFIX)

        # Single name case.
        if prev_name == next_name:
            return prev_name != hashed_name

        # Standard case.
        if prev_name < next_name:
            if prev_name < hashed_name < next_name:
                return True

        # The cover wraps.
        if next_name < prev_name:
            # Case 1: The covered name is at the end of the chain.
            if hashed_name > prev_name:
                return True
            # Case 2: The covered name is at the start of the chain.
            if hashed_name < next_name:
                return True

        return False

    def hash_name(self, name: dns.name.Name) -> dns.name.Name:
        """
        Return the NSEC3 owner name corresponding to 'name'.

        The hash is computed with the salt, iterations and algorithm recorded
        from the response, and the result is placed directly under SUFFIX.
        """
        nhash = dns.dnssec.nsec3_hash(
            name,
            salt=self.params.salt,
            iterations=self.params.iterations,
            algorithm=self.params.algorithm,
        )
        return dns.name.from_text(nhash, SUFFIX)

    def prove_name_does_not_exist(self, name: dns.name.Name) -> dns.rrset.RRset:
        """
        Hash of a given name must fall between an NSEC3 owner and 'next' name

        Returns the first stored NSEC3 RRset covering the hash of 'name' and
        records its owner as used. Raises AssertionError when no stored NSEC3
        RRset covers it.
        """
        hashed_name = self.hash_name(name)
        for rrset in self.rrsets:
            name_is_covered = self.nsec3_covers(rrset, hashed_name)
            if name_is_covered:
                self.owners_used.add(rrset.name)
                return rrset

        # Explicit raise instead of 'assert False': an assert statement is
        # removed under 'python -O' and the method would then return None.
        raise AssertionError(
            f"Expected covering NSEC3 for {name} (hash={hashed_name}) not found:\n{self.response}"
        )

    def prove_name_exists(self, owner: dns.name.Name) -> dns.rrset.RRset:
        """
        Check response has NSEC3 RR matching given owner name, i.e. the name exists.

        Returns the stored NSEC3 RRset whose owner equals the hash of 'owner'
        and records that owner as used. Raises AssertionError when there is no
        such RRset.
        """
        nsec3_owner = self.hash_name(owner)
        for rrset in self.rrsets:
            if rrset.match(
                nsec3_owner, dns.rdataclass.IN, dns.rdatatype.NSEC3, dns.rdatatype.NONE
            ):
                self.owners_used.add(rrset.name)
                return rrset

        # Explicit raise instead of 'assert False': an assert statement is
        # removed under 'python -O' and the method would then return None.
        raise AssertionError(
            f"Expected matching NSEC3 for {owner} (hash={nsec3_owner}) not found:\n{self.response}"
        )

    def check_extraneous_rrs(self) -> None:
        """Check that all NSEC3 RRs present in the message were actually needed for proofs"""
        assert (
            self.owners_used == self.owners_present
        ), f"extraneous NSEC3 RRs detected\n{self.response}"