mirror of
https://github.com/isc-projects/bind9.git
synced 2026-03-11 02:30:44 -04:00
chg: test: Replace dns.resolver module in system tests
Closes #4634 Merge branch '4634-drop-dns.resolver-module-from-system-tests' into 'main' See merge request isc-projects/bind9!9150
This commit is contained in:
commit
91bc0b439a
10 changed files with 213 additions and 188 deletions
|
|
@ -1,4 +1,10 @@
|
|||
[MASTER]
|
||||
[IMPORTS]
|
||||
|
||||
deprecated-modules=
|
||||
dns.resolver,
|
||||
|
||||
[MESSAGES CONTROL]
|
||||
|
||||
disable=
|
||||
C0103, # invalid-name
|
||||
C0114, # missing-module-docstring
|
||||
|
|
|
|||
|
|
@ -15,10 +15,12 @@ import os
|
|||
import re
|
||||
import subprocess
|
||||
|
||||
import isctest
|
||||
import pytest
|
||||
|
||||
import dns.message
|
||||
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.resolver
|
||||
|
||||
|
||||
def run_rndc(server, rndc_command):
|
||||
|
|
@ -34,15 +36,13 @@ def run_rndc(server, rndc_command):
|
|||
subprocess.check_output(cmdline, stderr=subprocess.STDOUT, timeout=10)
|
||||
|
||||
|
||||
def test_dnstap_dispatch_socket_addresses(named_port):
|
||||
# Prepare for querying ns3.
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = ["10.53.0.3"]
|
||||
resolver.port = named_port
|
||||
|
||||
def test_dnstap_dispatch_socket_addresses():
|
||||
# Send some query to ns3 so that it records something in its dnstap file.
|
||||
ans = resolver.resolve("mail.example.", "A")
|
||||
assert ans[0].address == "10.0.0.2"
|
||||
msg = dns.message.make_query("mail.example.", "A")
|
||||
res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
|
||||
assert res.answer == [
|
||||
dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")
|
||||
]
|
||||
|
||||
# Before continuing, roll dnstap file to ensure it is flushed to disk.
|
||||
run_rndc("10.53.0.3", ["dnstap", "-roll", "1"])
|
||||
|
|
|
|||
|
|
@ -10,23 +10,14 @@
|
|||
# information regarding copyright ownership.
|
||||
|
||||
import shutil
|
||||
from typing import Any, Optional
|
||||
from typing import Optional
|
||||
|
||||
import dns.rcode
|
||||
import dns.message
|
||||
import dns.zone
|
||||
|
||||
import isctest.log
|
||||
|
||||
# compatiblity with dnspython<2.0.0
|
||||
try:
|
||||
# In dnspython>=2.0.0, dns.rcode.Rcode class is available
|
||||
# pylint: disable=invalid-name
|
||||
dns_rcode = dns.rcode.Rcode # type: Any
|
||||
except AttributeError:
|
||||
# In dnspython<2.0.0, selected rcodes are available as integers directly
|
||||
# from dns.rcode
|
||||
dns_rcode = dns.rcode
|
||||
from isctest.compat import dns_rcode
|
||||
|
||||
|
||||
def rcode(message: dns.message.Message, expected_rcode) -> None:
|
||||
|
|
|
|||
24
bin/tests/system/isctest/compat.py
Normal file
24
bin/tests/system/isctest/compat.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||
#
|
||||
# SPDX-License-Identifier: MPL-2.0
|
||||
#
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
#
|
||||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
from typing import Any
|
||||
|
||||
import dns.rcode
|
||||
|
||||
# compatiblity with dnspython<2.0.0
|
||||
try:
|
||||
# In dnspython>=2.0.0, dns.rcode.Rcode class is available
|
||||
# pylint: disable=invalid-name
|
||||
dns_rcode = dns.rcode.Rcode # type: Any
|
||||
except AttributeError:
|
||||
# In dnspython<2.0.0, selected rcodes are available as integers directly
|
||||
# from dns.rcode
|
||||
dns_rcode = dns.rcode
|
||||
|
|
@ -10,34 +10,50 @@
|
|||
# information regarding copyright ownership.
|
||||
|
||||
import os
|
||||
from typing import Optional
|
||||
import time
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
import dns.query
|
||||
import dns.message
|
||||
|
||||
import isctest.log
|
||||
from isctest.compat import dns_rcode
|
||||
|
||||
QUERY_TIMEOUT = 10
|
||||
|
||||
|
||||
def udp(
|
||||
# pylint: disable=too-many-arguments
|
||||
def generic_query(
|
||||
query_func: Callable[..., Any],
|
||||
message: dns.message.Message,
|
||||
ip: str,
|
||||
port: Optional[int] = None,
|
||||
source: Optional[str] = None,
|
||||
timeout: int = QUERY_TIMEOUT,
|
||||
) -> dns.message.Message:
|
||||
attempts: int = 10,
|
||||
expected_rcode: dns_rcode = None,
|
||||
) -> Any:
|
||||
if port is None:
|
||||
port = int(os.environ["PORT"])
|
||||
return dns.query.udp(message, ip, timeout, port=port, source=source)
|
||||
res = None
|
||||
for attempt in range(attempts):
|
||||
try:
|
||||
isctest.log.debug(
|
||||
f"{generic_query.__name__}(): ip={ip}, port={port}, source={source}, "
|
||||
f"timeout={timeout}, attempts left={attempts-attempt}"
|
||||
)
|
||||
res = query_func(message, ip, timeout, port=port, source=source)
|
||||
if res.rcode() == expected_rcode or expected_rcode is None:
|
||||
return res
|
||||
except (dns.exception.Timeout, ConnectionRefusedError) as e:
|
||||
isctest.log.debug(f"{generic_query.__name__}(): the '{e}' exceptio raised")
|
||||
time.sleep(1)
|
||||
raise dns.exception.Timeout
|
||||
|
||||
|
||||
def tcp(
|
||||
message: dns.message.Message,
|
||||
ip: str,
|
||||
port: Optional[int] = None,
|
||||
source: Optional[str] = None,
|
||||
timeout: int = QUERY_TIMEOUT,
|
||||
) -> dns.message.Message:
|
||||
if port is None:
|
||||
port = int(os.environ["PORT"])
|
||||
return dns.query.tcp(message, ip, timeout, port=port, source=source)
|
||||
def udp(*args, **kwargs) -> Any:
|
||||
return generic_query(dns.query.udp, *args, **kwargs)
|
||||
|
||||
|
||||
def tcp(*args, **kwargs) -> Any:
|
||||
return generic_query(dns.query.tcp, *args, **kwargs)
|
||||
|
|
|
|||
|
|
@ -9,11 +9,15 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import isctest.log
|
||||
from isctest.compat import dns_rcode
|
||||
|
||||
import dns.message
|
||||
|
||||
|
||||
def cmd( # pylint: disable=too-many-arguments
|
||||
|
|
@ -70,3 +74,36 @@ def retry_with_timeout(func, timeout, delay=1, msg=None):
|
|||
if msg is None:
|
||||
msg = f"{func.__module__}.{func.__qualname__} timed out after {timeout} s"
|
||||
assert False, msg
|
||||
|
||||
|
||||
def get_named_cmdline(cfg_dir, cfg_file="named.conf"):
|
||||
cfg_dir = os.path.join(os.getcwd(), cfg_dir)
|
||||
assert os.path.isdir(cfg_dir)
|
||||
|
||||
cfg_file = os.path.join(cfg_dir, cfg_file)
|
||||
assert os.path.isfile(cfg_file)
|
||||
|
||||
named = os.getenv("NAMED")
|
||||
assert named is not None
|
||||
|
||||
named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"]
|
||||
|
||||
return named_cmdline
|
||||
|
||||
|
||||
def get_custom_named_instance(assumed_ns):
|
||||
# This test launches and monitors a named instance itself rather than using
|
||||
# bin/tests/system/start.pl, so manually defining a NamedInstance here is
|
||||
# necessary for sending RNDC commands to that instance. If this "custom"
|
||||
# instance listens on 10.53.0.3, use "ns3" as the identifier passed to
|
||||
# the NamedInstance constructor.
|
||||
named_ports = isctest.instance.NamedPorts.from_env()
|
||||
instance = isctest.instance.NamedInstance(assumed_ns, named_ports)
|
||||
|
||||
return instance
|
||||
|
||||
|
||||
def assert_custom_named_is_alive(named_proc, resolver_ip):
|
||||
assert named_proc.poll() is None, "named isn't running"
|
||||
msg = dns.message.make_query("version.bind", "TXT", "CH")
|
||||
isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR)
|
||||
|
|
|
|||
|
|
@ -11,121 +11,100 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import time
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.resolver
|
||||
import isctest
|
||||
from isctest.compat import dns_rcode
|
||||
|
||||
import dns.message
|
||||
|
||||
|
||||
def wait_for_transfer(ip, port, client_ip, name, rrtype):
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = [ip]
|
||||
resolver.port = port
|
||||
@pytest.mark.parametrize(
|
||||
"qname,source,rcode",
|
||||
[
|
||||
# For 10.53.0.1 source IP:
|
||||
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - gooddomain.com is allowed
|
||||
# - allowed. is allowed
|
||||
("baddomain.", "10.53.0.1", dns.rcode.NXDOMAIN),
|
||||
("gooddomain.", "10.53.0.1", dns.rcode.NOERROR),
|
||||
("allowed.", "10.53.0.1", dns.rcode.NOERROR),
|
||||
# For 10.53.0.2 source IP:
|
||||
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - baddomain.com is allowed
|
||||
# - gooddomain.com is allowed
|
||||
("baddomain.", "10.53.0.2", dns.rcode.NOERROR),
|
||||
("gooddomain.", "10.53.0.2", dns.rcode.NOERROR),
|
||||
("allowed.", "10.53.0.2", dns.rcode.NXDOMAIN),
|
||||
# For 10.53.0.3 source IP:
|
||||
# - gooddomain.com is allowed
|
||||
# - baddomain.com is allowed
|
||||
# - allowed. is allowed
|
||||
("baddomain.", "10.53.0.3", dns.rcode.NOERROR),
|
||||
("gooddomain.", "10.53.0.3", dns.rcode.NOERROR),
|
||||
("allowed.", "10.53.0.3", dns.rcode.NOERROR),
|
||||
# For 10.53.0.4 source IP:
|
||||
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - allowed. is allowed
|
||||
("baddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
|
||||
("gooddomain.", "10.53.0.4", dns.rcode.NXDOMAIN),
|
||||
("allowed.", "10.53.0.4", dns.rcode.NOERROR),
|
||||
# For 10.53.0.5 (any) source IP:
|
||||
# - baddomain.com is allowed
|
||||
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
("baddomain.", "10.53.0.5", dns.rcode.NOERROR),
|
||||
("gooddomain.", "10.53.0.5", dns.rcode.NXDOMAIN),
|
||||
("allowed.", "10.53.0.5", dns.rcode.NXDOMAIN),
|
||||
],
|
||||
)
|
||||
def test_rpz_multiple_views(qname, source, rcode):
|
||||
# Wait for the rpz-external.local zone transfer
|
||||
msg = dns.message.make_query("rpz-external.local", "SOA")
|
||||
isctest.query.tcp(
|
||||
msg,
|
||||
ip="10.53.0.3",
|
||||
source="10.53.0.2",
|
||||
expected_rcode=dns_rcode.NOERROR,
|
||||
)
|
||||
isctest.query.tcp(
|
||||
msg,
|
||||
ip="10.53.0.3",
|
||||
source="10.53.0.5",
|
||||
expected_rcode=dns_rcode.NOERROR,
|
||||
)
|
||||
|
||||
for _ in range(10):
|
||||
try:
|
||||
resolver.resolve(name, rrtype, source=client_ip)
|
||||
except dns.resolver.NoNameservers:
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError(
|
||||
"zone transfer failed: "
|
||||
f"client {client_ip} got NXDOMAIN for {name} {rrtype} from @{ip}:{port}"
|
||||
)
|
||||
msg = dns.message.make_query(qname, "A")
|
||||
res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode)
|
||||
if rcode == dns.rcode.NOERROR:
|
||||
assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
|
||||
|
||||
|
||||
def test_rpz_multiple_views(named_port):
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = ["10.53.0.3"]
|
||||
resolver.port = named_port
|
||||
|
||||
wait_for_transfer("10.53.0.3", named_port, "10.53.0.2", "rpz-external.local", "SOA")
|
||||
wait_for_transfer("10.53.0.3", named_port, "10.53.0.5", "rpz-external.local", "SOA")
|
||||
|
||||
# For 10.53.0.1 source IP:
|
||||
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - gooddomain.com is allowed
|
||||
# - allowed. is allowed
|
||||
with pytest.raises(dns.resolver.NXDOMAIN):
|
||||
resolver.resolve("baddomain.", "A", source="10.53.0.1")
|
||||
|
||||
ans = resolver.resolve("gooddomain.", "A", source="10.53.0.1")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
# For 10.53.0.2 source IP:
|
||||
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - baddomain.com is allowed
|
||||
# - gooddomain.com is allowed
|
||||
ans = resolver.resolve("baddomain.", "A", source="10.53.0.2")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
ans = resolver.resolve("gooddomain.", "A", source="10.53.0.2")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
with pytest.raises(dns.resolver.NXDOMAIN):
|
||||
resolver.resolve("allowed.", "A", source="10.53.0.2")
|
||||
|
||||
# For 10.53.0.3 source IP:
|
||||
# - gooddomain.com is allowed
|
||||
# - baddomain.com is allowed
|
||||
# - allowed. is allowed
|
||||
ans = resolver.resolve("baddomain.", "A", source="10.53.0.3")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
ans = resolver.resolve("gooddomain.", "A", source="10.53.0.3")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
ans = resolver.resolve("allowed.", "A", source="10.53.0.3")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
# For 10.53.0.4 source IP:
|
||||
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - allowed. is allowed
|
||||
with pytest.raises(dns.resolver.NXDOMAIN):
|
||||
resolver.resolve("baddomain.", "A", source="10.53.0.4")
|
||||
|
||||
with pytest.raises(dns.resolver.NXDOMAIN):
|
||||
resolver.resolve("gooddomain.", "A", source="10.53.0.4")
|
||||
|
||||
ans = resolver.resolve("allowed.", "A", source="10.53.0.4")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
# For 10.53.0.5 (any) source IP:
|
||||
# - baddomain.com is allowed
|
||||
# - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# - allowed.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
ans = resolver.resolve("baddomain.", "A", source="10.53.0.5")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
|
||||
with pytest.raises(dns.resolver.NXDOMAIN):
|
||||
resolver.resolve("gooddomain.", "A", source="10.53.0.5")
|
||||
|
||||
with pytest.raises(dns.resolver.NXDOMAIN):
|
||||
resolver.resolve("allowed.", "A", source="10.53.0.5")
|
||||
|
||||
|
||||
def test_rpz_passthru_logging(named_port):
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = ["10.53.0.3"]
|
||||
resolver.port = named_port
|
||||
def test_rpz_passthru_logging():
|
||||
resolver_ip = "10.53.0.3"
|
||||
|
||||
# Should generate a log entry into rpz_passthru.txt
|
||||
ans = resolver.resolve("allowed.", "A", source="10.53.0.1")
|
||||
assert ans[0].address == "10.53.0.2"
|
||||
msg_allowed = dns.message.make_query("allowed.", "A")
|
||||
res_allowed = isctest.query.udp(
|
||||
msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR
|
||||
)
|
||||
assert res_allowed.answer == [
|
||||
dns.rrset.from_text("allowed.", 300, "IN", "A", "10.53.0.2")
|
||||
]
|
||||
|
||||
# baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# Should generate a log entry into rpz.txt
|
||||
with pytest.raises(dns.resolver.NXDOMAIN):
|
||||
resolver.resolve("baddomain.", "A", source="10.53.0.1")
|
||||
msg_not_allowed = dns.message.make_query("baddomain.", "A")
|
||||
res_not_allowed = isctest.query.udp(
|
||||
msg_not_allowed,
|
||||
resolver_ip,
|
||||
source="10.53.0.1",
|
||||
expected_rcode=dns.rcode.NXDOMAIN,
|
||||
)
|
||||
isctest.check.nxdomain(res_not_allowed)
|
||||
|
||||
rpz_passthru_logfile = os.path.join("ns3", "rpz_passthru.txt")
|
||||
rpz_logfile = os.path.join("ns3", "rpz.txt")
|
||||
|
|
|
|||
|
|
@ -23,12 +23,11 @@ import pytest
|
|||
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.exception
|
||||
import dns.resolver
|
||||
|
||||
import isctest
|
||||
|
||||
|
||||
def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
|
||||
def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries):
|
||||
"""Creates a number of A queries to run in parallel
|
||||
in order simulate a slightly more realistic test scenario.
|
||||
|
||||
|
|
@ -47,8 +46,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
|
|||
:param named_proc: named process instance
|
||||
:type named_proc: subprocess.Popen
|
||||
|
||||
:param resolver: target resolver
|
||||
:type resolver: dns.resolver.Resolver
|
||||
:param resolver_ip: target resolver's IP address
|
||||
:type resolver_ip: str
|
||||
|
||||
:param instance: the named instance to send RNDC commands to
|
||||
:type instance: isctest.instance.NamedInstance
|
||||
|
|
@ -74,7 +73,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
|
|||
return -1
|
||||
|
||||
# We're going to execute queries in parallel by means of a thread pool.
|
||||
# dnspython functions block, so we need to circunvent that.
|
||||
# dnspython functions block, so we need to circumvent that.
|
||||
with ThreadPoolExecutor(n_workers + 1) as executor:
|
||||
# Helper dict, where keys=Future objects and values are tags used
|
||||
# to process results later.
|
||||
|
|
@ -83,7 +82,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
|
|||
# 50% of work will be A queries.
|
||||
# 1 work will be rndc stop.
|
||||
# Remaining work will be rndc status (so we test parallel control
|
||||
# connections that were crashing named).
|
||||
# connections that were crashing named).
|
||||
shutdown = True
|
||||
for i in range(n_queries):
|
||||
if i < (n_queries // 2):
|
||||
|
|
@ -101,7 +100,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
|
|||
)
|
||||
|
||||
qname = relname + ".test"
|
||||
futures[executor.submit(resolver.resolve, qname, "A")] = tag
|
||||
msg = dns.message.make_query(qname, "A")
|
||||
futures[executor.submit(isctest.query.udp, msg, resolver_ip)] = tag
|
||||
elif shutdown: # We attempt to stop named in the middle
|
||||
shutdown = False
|
||||
if kill_method == "rndc":
|
||||
|
|
@ -125,28 +125,13 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries):
|
|||
# named process exited gracefully after SIGTERM signal.
|
||||
if futures[future] == "stop":
|
||||
ret_code = result
|
||||
|
||||
except (
|
||||
dns.resolver.NXDOMAIN,
|
||||
dns.resolver.NoNameservers,
|
||||
dns.exception.Timeout,
|
||||
):
|
||||
except dns.exception.Timeout:
|
||||
pass
|
||||
|
||||
if kill_method == "rndc":
|
||||
assert ret_code == 0
|
||||
|
||||
|
||||
def wait_for_named_loaded(resolver, retries=10):
|
||||
for _ in range(retries):
|
||||
try:
|
||||
resolver.resolve("version.bind", "TXT", "CH")
|
||||
return True
|
||||
except (dns.resolver.NoNameservers, dns.exception.Timeout):
|
||||
time.sleep(1)
|
||||
return False
|
||||
|
||||
|
||||
def wait_for_proc_termination(proc, max_timeout=10):
|
||||
for _ in range(max_timeout):
|
||||
if proc.poll() is not None:
|
||||
|
|
@ -171,40 +156,22 @@ def wait_for_proc_termination(proc, max_timeout=10):
|
|||
["rndc", "sigterm"],
|
||||
)
|
||||
def test_named_shutdown(kill_method):
|
||||
# pylint: disable-msg=too-many-locals
|
||||
cfg_dir = os.path.join(os.getcwd(), "resolver")
|
||||
assert os.path.isdir(cfg_dir)
|
||||
resolver_ip = "10.53.0.3"
|
||||
|
||||
cfg_file = os.path.join(cfg_dir, "named.conf")
|
||||
assert os.path.isfile(cfg_file)
|
||||
cfg_dir = "resolver"
|
||||
|
||||
named = os.getenv("NAMED")
|
||||
assert named is not None
|
||||
named_cmdline = isctest.run.get_named_cmdline(cfg_dir)
|
||||
instance = isctest.run.get_custom_named_instance("ns3")
|
||||
|
||||
# This test launches and monitors a named instance itself rather than using
|
||||
# bin/tests/system/start.pl, so manually defining a NamedInstance here is
|
||||
# necessary for sending RNDC commands to that instance. This "custom"
|
||||
# instance listens on 10.53.0.3, so use "ns3" as the identifier passed to
|
||||
# the NamedInstance constructor.
|
||||
named_ports = isctest.instance.NamedPorts.from_env()
|
||||
instance = isctest.instance.NamedInstance("ns3", named_ports)
|
||||
|
||||
# We create a resolver instance that will be used to send queries.
|
||||
resolver = dns.resolver.Resolver()
|
||||
resolver.nameservers = ["10.53.0.3"]
|
||||
resolver.port = named_ports.dns
|
||||
|
||||
named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"]
|
||||
with open(os.path.join(cfg_dir, "named.run"), "ab") as named_log:
|
||||
with subprocess.Popen(
|
||||
named_cmdline, cwd=cfg_dir, stderr=named_log
|
||||
) as named_proc:
|
||||
try:
|
||||
assert named_proc.poll() is None, "named isn't running"
|
||||
assert wait_for_named_loaded(resolver)
|
||||
isctest.run.assert_custom_named_is_alive(named_proc, resolver_ip)
|
||||
do_work(
|
||||
named_proc,
|
||||
resolver,
|
||||
resolver_ip,
|
||||
instance,
|
||||
kill_method,
|
||||
n_workers=12,
|
||||
|
|
|
|||
|
|
@ -194,7 +194,7 @@ def test_traffic(fetch_traffic, **kwargs):
|
|||
|
||||
msg = create_msg("short.example.", "TXT")
|
||||
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
|
||||
ans = isctest.query.udp(msg, statsip)
|
||||
ans = isctest.query.udp(msg, statsip, attempts=1)
|
||||
isctest.check.noerror(ans)
|
||||
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
|
||||
data = fetch_traffic(statsip, statsport)
|
||||
|
|
@ -203,7 +203,7 @@ def test_traffic(fetch_traffic, **kwargs):
|
|||
|
||||
msg = create_msg("long.example.", "TXT")
|
||||
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
|
||||
ans = isctest.query.udp(msg, statsip)
|
||||
ans = isctest.query.udp(msg, statsip, attempts=1)
|
||||
isctest.check.noerror(ans)
|
||||
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
|
||||
data = fetch_traffic(statsip, statsport)
|
||||
|
|
@ -212,7 +212,7 @@ def test_traffic(fetch_traffic, **kwargs):
|
|||
|
||||
msg = create_msg("short.example.", "TXT")
|
||||
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
|
||||
ans = isctest.query.tcp(msg, statsip)
|
||||
ans = isctest.query.tcp(msg, statsip, attempts=1)
|
||||
isctest.check.noerror(ans)
|
||||
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
|
||||
data = fetch_traffic(statsip, statsport)
|
||||
|
|
@ -221,7 +221,7 @@ def test_traffic(fetch_traffic, **kwargs):
|
|||
|
||||
msg = create_msg("long.example.", "TXT")
|
||||
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
|
||||
ans = isctest.query.tcp(msg, statsip)
|
||||
ans = isctest.query.tcp(msg, statsip, attempts=1)
|
||||
isctest.check.noerror(ans)
|
||||
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
|
||||
data = fetch_traffic(statsip, statsport)
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ try:
|
|||
pytest.importorskip("hypothesis")
|
||||
except ValueError:
|
||||
pytest.importorskip("hypothesis", minversion="4.41.2")
|
||||
from hypothesis import assume, example, given
|
||||
from hypothesis import assume, example, given, settings
|
||||
|
||||
from isctest.hypothesis.strategies import dns_names, dns_rdatatypes_without_meta
|
||||
import isctest.check
|
||||
|
|
@ -63,6 +63,7 @@ IP_ADDR = "10.53.0.1"
|
|||
TIMEOUT = 5 # seconds, just a sanity check
|
||||
|
||||
|
||||
@settings(deadline=None)
|
||||
@given(name=dns_names(suffix=SUFFIX), rdtype=dns_rdatatypes_without_meta)
|
||||
def test_wildcard_rdtype_mismatch(
|
||||
name: dns.name.Name, rdtype: dns.rdatatype.RdataType, named_port: int
|
||||
|
|
@ -90,6 +91,7 @@ def test_wildcard_rdtype_mismatch(
|
|||
isctest.check.empty_answer(response_msg)
|
||||
|
||||
|
||||
@settings(deadline=None)
|
||||
@given(name=dns_names(suffix=SUFFIX, min_labels=len(SUFFIX) + 1))
|
||||
def test_wildcard_match(name: dns.name.Name, named_port: int) -> None:
|
||||
"""Any label with maching rdtype must result in wildcard data in answer."""
|
||||
|
|
@ -116,6 +118,7 @@ def test_wildcard_match(name: dns.name.Name, named_port: int) -> None:
|
|||
|
||||
|
||||
# Force the `*.*.allwild.test.` corner case to be checked.
|
||||
@settings(deadline=None)
|
||||
@example(name=isctest.name.prepend_label("*", isctest.name.prepend_label("*", SUFFIX)))
|
||||
@given(
|
||||
name=dns_names(
|
||||
|
|
@ -138,6 +141,7 @@ NESTED_SUFFIX = dns.name.from_text("*.*.nestedwild.test.")
|
|||
|
||||
|
||||
# Force `*.*.*.nestedwild.test.` to be checked.
|
||||
@settings(deadline=None)
|
||||
@example(name=isctest.name.prepend_label("*", NESTED_SUFFIX))
|
||||
@given(name=dns_names(suffix=NESTED_SUFFIX, min_labels=len(NESTED_SUFFIX) + 1))
|
||||
def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None:
|
||||
|
|
@ -172,6 +176,7 @@ def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None
|
|||
assert response_msg.answer == expected_answer, str(response_msg)
|
||||
|
||||
|
||||
@settings(deadline=None)
|
||||
@given(
|
||||
name=dns_names(
|
||||
suffix=isctest.name.prepend_label("*", NESTED_SUFFIX),
|
||||
|
|
|
|||
Loading…
Reference in a new issue