diff --git a/.pylintrc b/.pylintrc index b5ea55a5e5..a75e65d8d3 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,4 +1,10 @@ -[MASTER] +[IMPORTS] + +deprecated-modules= + dns.resolver, + +[MESSAGES CONTROL] + disable= C0103, # invalid-name C0114, # missing-module-docstring diff --git a/bin/tests/system/dnstap/tests_dnstap.py b/bin/tests/system/dnstap/tests_dnstap.py index 0dbf2aa3b8..2b2185a6f3 100644 --- a/bin/tests/system/dnstap/tests_dnstap.py +++ b/bin/tests/system/dnstap/tests_dnstap.py @@ -15,10 +15,12 @@ import os import re import subprocess +import isctest import pytest +import dns.message + pytest.importorskip("dns", minversion="2.0.0") -import dns.resolver def run_rndc(server, rndc_command): @@ -34,15 +36,13 @@ def run_rndc(server, rndc_command): subprocess.check_output(cmdline, stderr=subprocess.STDOUT, timeout=10) -def test_dnstap_dispatch_socket_addresses(named_port): - # Prepare for querying ns3. - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_port - +def test_dnstap_dispatch_socket_addresses(): # Send some query to ns3 so that it records something in its dnstap file. - ans = resolver.resolve("mail.example.", "A") - assert ans[0].address == "10.0.0.2" + msg = dns.message.make_query("mail.example.", "A") + res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR) + assert res.answer == [ + dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2") + ] # Before continuing, roll dnstap file to ensure it is flushed to disk. run_rndc("10.53.0.3", ["dnstap", "-roll", "1"]) diff --git a/bin/tests/system/isctest/check.py b/bin/tests/system/isctest/check.py index 28eb16d5dd..251e87e4f3 100644 --- a/bin/tests/system/isctest/check.py +++ b/bin/tests/system/isctest/check.py @@ -10,23 +10,14 @@ # information regarding copyright ownership. import shutil -from typing import Any, Optional +from typing import Optional import dns.rcode import dns.message import dns.zone import isctest.log - -# compatiblity with dnspython<2.0.0 -try: - # In dnspython>=2.0.0, dns.rcode.Rcode class is available - # pylint: disable=invalid-name - dns_rcode = dns.rcode.Rcode # type: Any -except AttributeError: - # In dnspython<2.0.0, selected rcodes are available as integers directly - # from dns.rcode - dns_rcode = dns.rcode +from isctest.compat import dns_rcode def rcode(message: dns.message.Message, expected_rcode) -> None: diff --git a/bin/tests/system/isctest/compat.py b/bin/tests/system/isctest/compat.py new file mode 100644 index 0000000000..5580f1f4c5 --- /dev/null +++ b/bin/tests/system/isctest/compat.py @@ -0,0 +1,24 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +from typing import Any + +import dns.rcode + +# compatiblity with dnspython<2.0.0 +try: + # In dnspython>=2.0.0, dns.rcode.Rcode class is available + # pylint: disable=invalid-name + dns_rcode = dns.rcode.Rcode # type: Any +except AttributeError: + # In dnspython<2.0.0, selected rcodes are available as integers directly + # from dns.rcode + dns_rcode = dns.rcode diff --git a/bin/tests/system/isctest/query.py b/bin/tests/system/isctest/query.py index 46fd9b85f9..3e5ed8b78b 100644 --- a/bin/tests/system/isctest/query.py +++ b/bin/tests/system/isctest/query.py @@ -10,34 +10,50 @@ # information regarding copyright ownership. import os -from typing import Optional +import time +from typing import Any, Callable, Optional import dns.query import dns.message +import isctest.log +from isctest.compat import dns_rcode QUERY_TIMEOUT = 10 -def udp( +# pylint: disable=too-many-arguments +def generic_query( + query_func: Callable[..., Any], message: dns.message.Message, ip: str, port: Optional[int] = None, source: Optional[str] = None, timeout: int = QUERY_TIMEOUT, -) -> dns.message.Message: + attempts: int = 10, + expected_rcode: dns_rcode = None, +) -> Any: if port is None: port = int(os.environ["PORT"]) - return dns.query.udp(message, ip, timeout, port=port, source=source) + res = None + for attempt in range(attempts): + try: + isctest.log.debug( + f"{generic_query.__name__}(): ip={ip}, port={port}, source={source}, " + f"timeout={timeout}, attempts left={attempts-attempt}" + ) + res = query_func(message, ip, timeout, port=port, source=source) + if res.rcode() == expected_rcode or expected_rcode is None: + return res + except (dns.exception.Timeout, ConnectionRefusedError) as e: + isctest.log.debug(f"{generic_query.__name__}(): the '{e}' exceptio raised") + time.sleep(1) + raise dns.exception.Timeout -def tcp( - message: dns.message.Message, - ip: str, - port: Optional[int] = None, - source: Optional[str] = None, - timeout: int = QUERY_TIMEOUT, -) -> dns.message.Message: - if port is None: - port = int(os.environ["PORT"]) - return dns.query.tcp(message, ip, timeout, port=port, source=source) +def udp(*args, **kwargs) -> Any: + return generic_query(dns.query.udp, *args, **kwargs) + + +def tcp(*args, **kwargs) -> Any: + return generic_query(dns.query.tcp, *args, **kwargs) diff --git a/bin/tests/system/isctest/run.py b/bin/tests/system/isctest/run.py index e48e1c1b57..b3cb8200c0 100644 --- a/bin/tests/system/isctest/run.py +++ b/bin/tests/system/isctest/run.py @@ -9,11 +9,15 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. +import os import subprocess import time from typing import Optional import isctest.log +from isctest.compat import dns_rcode + +import dns.message def cmd( # pylint: disable=too-many-arguments @@ -70,3 +74,36 @@ def retry_with_timeout(func, timeout, delay=1, msg=None): if msg is None: msg = f"{func.__module__}.{func.__qualname__} timed out after {timeout} s" assert False, msg + + +def get_named_cmdline(cfg_dir, cfg_file="named.conf"): + cfg_dir = os.path.join(os.getcwd(), cfg_dir) + assert os.path.isdir(cfg_dir) + + cfg_file = os.path.join(cfg_dir, cfg_file) + assert os.path.isfile(cfg_file) + + named = os.getenv("NAMED") + assert named is not None + + named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"] + + return named_cmdline + + +def get_custom_named_instance(assumed_ns): + # This test launches and monitors a named instance itself rather than using + # bin/tests/system/start.pl, so manually defining a NamedInstance here is + # necessary for sending RNDC commands to that instance. If this "custom" + # instance listens on 10.53.0.3, use "ns3" as the identifier passed to + # the NamedInstance constructor. + named_ports = isctest.instance.NamedPorts.from_env() + instance = isctest.instance.NamedInstance(assumed_ns, named_ports) + + return instance + + +def assert_custom_named_is_alive(named_proc, resolver_ip): + assert named_proc.poll() is None, "named isn't running" + msg = dns.message.make_query("version.bind", "TXT", "CH") + isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR) diff --git a/bin/tests/system/rpzextra/tests_rpzextra.py b/bin/tests/system/rpzextra/tests_rpzextra.py index ab5da45973..bf2fe94c6b 100644 --- a/bin/tests/system/rpzextra/tests_rpzextra.py +++ b/bin/tests/system/rpzextra/tests_rpzextra.py @@ -11,121 +11,100 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import time import os - import pytest pytest.importorskip("dns", minversion="2.0.0") -import dns.resolver +import isctest +from isctest.compat import dns_rcode + +import dns.message -def wait_for_transfer(ip, port, client_ip, name, rrtype): - resolver = dns.resolver.Resolver() - resolver.nameservers = [ip] - resolver.port = port +@pytest.mark.parametrize( + "qname,source,rcode", + [ + # For 10.53.0.1 source IP: + # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - gooddomain.com is allowed + # - allowed. is allowed + ("baddomain.", "10.53.0.1", dns.rcode.NXDOMAIN), + ("gooddomain.", "10.53.0.1", dns.rcode.NOERROR), + ("allowed.", "10.53.0.1", dns.rcode.NOERROR), + # For 10.53.0.2 source IP: + # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN + # - baddomain.com is allowed + # - gooddomain.com is allowed + ("baddomain.", "10.53.0.2", dns.rcode.NOERROR), + ("gooddomain.", "10.53.0.2", dns.rcode.NOERROR), + ("allowed.", "10.53.0.2", dns.rcode.NXDOMAIN), + # For 10.53.0.3 source IP: + # - gooddomain.com is allowed + # - baddomain.com is allowed + # - allowed. is allowed + ("baddomain.", "10.53.0.3", dns.rcode.NOERROR), + ("gooddomain.", "10.53.0.3", dns.rcode.NOERROR), + ("allowed.", "10.53.0.3", dns.rcode.NOERROR), + # For 10.53.0.4 source IP: + # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - allowed. is allowed + ("baddomain.", "10.53.0.4", dns.rcode.NXDOMAIN), + ("gooddomain.", "10.53.0.4", dns.rcode.NXDOMAIN), + ("allowed.", "10.53.0.4", dns.rcode.NOERROR), + # For 10.53.0.5 (any) source IP: + # - baddomain.com is allowed + # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN + # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN + ("baddomain.", "10.53.0.5", dns.rcode.NOERROR), + ("gooddomain.", "10.53.0.5", dns.rcode.NXDOMAIN), + ("allowed.", "10.53.0.5", dns.rcode.NXDOMAIN), + ], +) +def test_rpz_multiple_views(qname, source, rcode): + # Wait for the rpz-external.local zone transfer + msg = dns.message.make_query("rpz-external.local", "SOA") + isctest.query.tcp( + msg, + ip="10.53.0.3", + source="10.53.0.2", + expected_rcode=dns_rcode.NOERROR, + ) + isctest.query.tcp( + msg, + ip="10.53.0.3", + source="10.53.0.5", + expected_rcode=dns_rcode.NOERROR, + ) - for _ in range(10): - try: - resolver.resolve(name, rrtype, source=client_ip) - except dns.resolver.NoNameservers: - time.sleep(1) - else: - break - else: - raise RuntimeError( - "zone transfer failed: " - f"client {client_ip} got NXDOMAIN for {name} {rrtype} from @{ip}:{port}" - ) + msg = dns.message.make_query(qname, "A") + res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode) + if rcode == dns.rcode.NOERROR: + assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")] -def test_rpz_multiple_views(named_port): - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_port - - wait_for_transfer("10.53.0.3", named_port, "10.53.0.2", "rpz-external.local", "SOA") - wait_for_transfer("10.53.0.3", named_port, "10.53.0.5", "rpz-external.local", "SOA") - - # For 10.53.0.1 source IP: - # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - gooddomain.com is allowed - # - allowed. is allowed - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("baddomain.", "A", source="10.53.0.1") - - ans = resolver.resolve("gooddomain.", "A", source="10.53.0.1") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("allowed.", "A", source="10.53.0.1") - assert ans[0].address == "10.53.0.2" - - # For 10.53.0.2 source IP: - # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN - # - baddomain.com is allowed - # - gooddomain.com is allowed - ans = resolver.resolve("baddomain.", "A", source="10.53.0.2") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("gooddomain.", "A", source="10.53.0.2") - assert ans[0].address == "10.53.0.2" - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("allowed.", "A", source="10.53.0.2") - - # For 10.53.0.3 source IP: - # - gooddomain.com is allowed - # - baddomain.com is allowed - # - allowed. is allowed - ans = resolver.resolve("baddomain.", "A", source="10.53.0.3") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("gooddomain.", "A", source="10.53.0.3") - assert ans[0].address == "10.53.0.2" - - ans = resolver.resolve("allowed.", "A", source="10.53.0.3") - assert ans[0].address == "10.53.0.2" - - # For 10.53.0.4 source IP: - # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - baddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - allowed. is allowed - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("baddomain.", "A", source="10.53.0.4") - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("gooddomain.", "A", source="10.53.0.4") - - ans = resolver.resolve("allowed.", "A", source="10.53.0.4") - assert ans[0].address == "10.53.0.2" - - # For 10.53.0.5 (any) source IP: - # - baddomain.com is allowed - # - gooddomain.com isn't allowed (CNAME .), should return NXDOMAIN - # - allowed.com isn't allowed (CNAME .), should return NXDOMAIN - ans = resolver.resolve("baddomain.", "A", source="10.53.0.5") - assert ans[0].address == "10.53.0.2" - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("gooddomain.", "A", source="10.53.0.5") - - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("allowed.", "A", source="10.53.0.5") - - -def test_rpz_passthru_logging(named_port): - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_port +def test_rpz_passthru_logging(): + resolver_ip = "10.53.0.3" # Should generate a log entry into rpz_passthru.txt - ans = resolver.resolve("allowed.", "A", source="10.53.0.1") - assert ans[0].address == "10.53.0.2" + msg_allowed = dns.message.make_query("allowed.", "A") + res_allowed = isctest.query.udp( + msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR + ) + assert res_allowed.answer == [ + dns.rrset.from_text("allowed.", 300, "IN", "A", "10.53.0.2") + ] # baddomain.com isn't allowed (CNAME .), should return NXDOMAIN # Should generate a log entry into rpz.txt - with pytest.raises(dns.resolver.NXDOMAIN): - resolver.resolve("baddomain.", "A", source="10.53.0.1") + msg_not_allowed = dns.message.make_query("baddomain.", "A") + res_not_allowed = isctest.query.udp( + msg_not_allowed, + resolver_ip, + source="10.53.0.1", + expected_rcode=dns.rcode.NXDOMAIN, + ) + isctest.check.nxdomain(res_not_allowed) rpz_passthru_logfile = os.path.join("ns3", "rpz_passthru.txt") rpz_logfile = os.path.join("ns3", "rpz.txt") diff --git a/bin/tests/system/shutdown/tests_shutdown.py b/bin/tests/system/shutdown/tests_shutdown.py index a993cc9ffa..121feed7b7 100755 --- a/bin/tests/system/shutdown/tests_shutdown.py +++ b/bin/tests/system/shutdown/tests_shutdown.py @@ -23,12 +23,11 @@ import pytest pytest.importorskip("dns", minversion="2.0.0") import dns.exception -import dns.resolver import isctest -def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): +def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries): """Creates a number of A queries to run in parallel in order simulate a slightly more realistic test scenario. @@ -47,8 +46,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): :param named_proc: named process instance :type named_proc: subprocess.Popen - :param resolver: target resolver - :type resolver: dns.resolver.Resolver + :param resolver_ip: target resolver's IP address + :type resolver_ip: str :param instance: the named instance to send RNDC commands to :type instance: isctest.instance.NamedInstance @@ -74,7 +73,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): return -1 # We're going to execute queries in parallel by means of a thread pool. - # dnspython functions block, so we need to circunvent that. + # dnspython functions block, so we need to circumvent that. with ThreadPoolExecutor(n_workers + 1) as executor: # Helper dict, where keys=Future objects and values are tags used # to process results later. @@ -83,7 +82,7 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): # 50% of work will be A queries. # 1 work will be rndc stop. # Remaining work will be rndc status (so we test parallel control - # connections that were crashing named). + # connections that were crashing named). shutdown = True for i in range(n_queries): if i < (n_queries // 2): @@ -101,7 +100,8 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): ) qname = relname + ".test" - futures[executor.submit(resolver.resolve, qname, "A")] = tag + msg = dns.message.make_query(qname, "A") + futures[executor.submit(isctest.query.udp, msg, resolver_ip)] = tag elif shutdown: # We attempt to stop named in the middle shutdown = False if kill_method == "rndc": @@ -125,28 +125,13 @@ def do_work(named_proc, resolver, instance, kill_method, n_workers, n_queries): # named process exited gracefully after SIGTERM signal. if futures[future] == "stop": ret_code = result - - except ( - dns.resolver.NXDOMAIN, - dns.resolver.NoNameservers, - dns.exception.Timeout, - ): + except dns.exception.Timeout: pass if kill_method == "rndc": assert ret_code == 0 -def wait_for_named_loaded(resolver, retries=10): - for _ in range(retries): - try: - resolver.resolve("version.bind", "TXT", "CH") - return True - except (dns.resolver.NoNameservers, dns.exception.Timeout): - time.sleep(1) - return False - - def wait_for_proc_termination(proc, max_timeout=10): for _ in range(max_timeout): if proc.poll() is not None: @@ -171,40 +156,22 @@ def wait_for_proc_termination(proc, max_timeout=10): ["rndc", "sigterm"], ) def test_named_shutdown(kill_method): - # pylint: disable-msg=too-many-locals - cfg_dir = os.path.join(os.getcwd(), "resolver") - assert os.path.isdir(cfg_dir) + resolver_ip = "10.53.0.3" - cfg_file = os.path.join(cfg_dir, "named.conf") - assert os.path.isfile(cfg_file) + cfg_dir = "resolver" - named = os.getenv("NAMED") - assert named is not None + named_cmdline = isctest.run.get_named_cmdline(cfg_dir) + instance = isctest.run.get_custom_named_instance("ns3") - # This test launches and monitors a named instance itself rather than using - # bin/tests/system/start.pl, so manually defining a NamedInstance here is - # necessary for sending RNDC commands to that instance. This "custom" - # instance listens on 10.53.0.3, so use "ns3" as the identifier passed to - # the NamedInstance constructor. - named_ports = isctest.instance.NamedPorts.from_env() - instance = isctest.instance.NamedInstance("ns3", named_ports) - - # We create a resolver instance that will be used to send queries. - resolver = dns.resolver.Resolver() - resolver.nameservers = ["10.53.0.3"] - resolver.port = named_ports.dns - - named_cmdline = [named, "-c", cfg_file, "-d", "99", "-g"] with open(os.path.join(cfg_dir, "named.run"), "ab") as named_log: with subprocess.Popen( named_cmdline, cwd=cfg_dir, stderr=named_log ) as named_proc: try: - assert named_proc.poll() is None, "named isn't running" - assert wait_for_named_loaded(resolver) + isctest.run.assert_custom_named_is_alive(named_proc, resolver_ip) do_work( named_proc, - resolver, + resolver_ip, instance, kill_method, n_workers=12, diff --git a/bin/tests/system/statschannel/generic.py b/bin/tests/system/statschannel/generic.py index f4b3d857e8..611d396925 100644 --- a/bin/tests/system/statschannel/generic.py +++ b/bin/tests/system/statschannel/generic.py @@ -194,7 +194,7 @@ def test_traffic(fetch_traffic, **kwargs): msg = create_msg("short.example.", "TXT") update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg) - ans = isctest.query.udp(msg, statsip) + ans = isctest.query.udp(msg, statsip, attempts=1) isctest.check.noerror(ans) update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans) data = fetch_traffic(statsip, statsport) @@ -203,7 +203,7 @@ def test_traffic(fetch_traffic, **kwargs): msg = create_msg("long.example.", "TXT") update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg) - ans = isctest.query.udp(msg, statsip) + ans = isctest.query.udp(msg, statsip, attempts=1) isctest.check.noerror(ans) update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans) data = fetch_traffic(statsip, statsport) @@ -212,7 +212,7 @@ def test_traffic(fetch_traffic, **kwargs): msg = create_msg("short.example.", "TXT") update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg) - ans = isctest.query.tcp(msg, statsip) + ans = isctest.query.tcp(msg, statsip, attempts=1) isctest.check.noerror(ans) update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans) data = fetch_traffic(statsip, statsport) @@ -221,7 +221,7 @@ def test_traffic(fetch_traffic, **kwargs): msg = create_msg("long.example.", "TXT") update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg) - ans = isctest.query.tcp(msg, statsip) + ans = isctest.query.tcp(msg, statsip, attempts=1) isctest.check.noerror(ans) update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans) data = fetch_traffic(statsip, statsport) diff --git a/bin/tests/system/wildcard/tests_wildcard.py b/bin/tests/system/wildcard/tests_wildcard.py index cc5c1571a2..71a84e567f 100755 --- a/bin/tests/system/wildcard/tests_wildcard.py +++ b/bin/tests/system/wildcard/tests_wildcard.py @@ -47,7 +47,7 @@ try: pytest.importorskip("hypothesis") except ValueError: pytest.importorskip("hypothesis", minversion="4.41.2") -from hypothesis import assume, example, given +from hypothesis import assume, example, given, settings from isctest.hypothesis.strategies import dns_names, dns_rdatatypes_without_meta import isctest.check @@ -63,6 +63,7 @@ IP_ADDR = "10.53.0.1" TIMEOUT = 5 # seconds, just a sanity check +@settings(deadline=None) @given(name=dns_names(suffix=SUFFIX), rdtype=dns_rdatatypes_without_meta) def test_wildcard_rdtype_mismatch( name: dns.name.Name, rdtype: dns.rdatatype.RdataType, named_port: int @@ -90,6 +91,7 @@ def test_wildcard_rdtype_mismatch( isctest.check.empty_answer(response_msg) +@settings(deadline=None) @given(name=dns_names(suffix=SUFFIX, min_labels=len(SUFFIX) + 1)) def test_wildcard_match(name: dns.name.Name, named_port: int) -> None: """Any label with maching rdtype must result in wildcard data in answer.""" @@ -116,6 +118,7 @@ def test_wildcard_match(name: dns.name.Name, named_port: int) -> None: # Force the `*.*.allwild.test.` corner case to be checked. +@settings(deadline=None) @example(name=isctest.name.prepend_label("*", isctest.name.prepend_label("*", SUFFIX))) @given( name=dns_names( @@ -138,6 +141,7 @@ NESTED_SUFFIX = dns.name.from_text("*.*.nestedwild.test.") # Force `*.*.*.nestedwild.test.` to be checked. +@settings(deadline=None) @example(name=isctest.name.prepend_label("*", NESTED_SUFFIX)) @given(name=dns_names(suffix=NESTED_SUFFIX, min_labels=len(NESTED_SUFFIX) + 1)) def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None: @@ -172,6 +176,7 @@ def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None assert response_msg.answer == expected_answer, str(response_msg) +@settings(deadline=None) @given( name=dns_names( suffix=isctest.name.prepend_label("*", NESTED_SUFFIX),