diff --git a/bin/tests/system/cipher-suites/tests_cipher_suites.py b/bin/tests/system/cipher-suites/tests_cipher_suites.py index edc3c09598..86adc29da1 100644 --- a/bin/tests/system/cipher-suites/tests_cipher_suites.py +++ b/bin/tests/system/cipher-suites/tests_cipher_suites.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re +from re import compile as Re import pytest @@ -31,7 +31,7 @@ pytestmark = pytest.mark.extra_artifacts( @pytest.fixture(scope="module") def transfers_complete(servers): for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]: - pattern = re.compile( + pattern = Re( f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed" ) for ns in ["ns2", "ns3", "ns4", "ns5"]: diff --git a/bin/tests/system/configloading/tests_configloading.py b/bin/tests/system/configloading/tests_configloading.py index de3d25aa71..ec017a8bfe 100644 --- a/bin/tests/system/configloading/tests_configloading.py +++ b/bin/tests/system/configloading/tests_configloading.py @@ -9,8 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re - +from re import compile as Re import isctest @@ -59,7 +58,7 @@ def test_reload_fails_log(ns1, templates): "apply_configuration", "loop exclusive mode: starting", "apply_configuration: configure_views", - re.compile(r".*port '9999999' out of range"), + Re(r".*port '9999999' out of range"), "apply_configuration: detaching views", "loop exclusive mode: ending", "reloading configuration failed", diff --git a/bin/tests/system/conftest.py b/bin/tests/system/conftest.py index bbb2c28385..9eff2ee8b2 100644 --- a/bin/tests/system/conftest.py +++ b/bin/tests/system/conftest.py @@ -12,7 +12,7 @@ import filecmp import os from pathlib import Path -import re +from re import compile as Re import shutil import subprocess import tempfile @@ -53,7 +53,7 @@ else: XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "") FILE_DIR = os.path.abspath(Path(__file__).parent) -ENV_RE = re.compile(b"([^=]+)=(.*)") +ENV_RE = Re(b"([^=]+)=(.*)") PRIORITY_TESTS = [ # Tests that are scheduled first. Speeds up parallel execution. "rpz/", @@ -62,9 +62,9 @@ PRIORITY_TESTS = [ "timeouts/", "upforwd/", ] -PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS)) -SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") -SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py") +PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS)) +SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") +SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py") # ----------------------- Global requirements ---------------------------- diff --git a/bin/tests/system/dnssec/tests_signing.py b/bin/tests/system/dnssec/tests_signing.py index b0295e5840..2a1abd531d 100644 --- a/bin/tests/system/dnssec/tests_signing.py +++ b/bin/tests/system/dnssec/tests_signing.py @@ -12,6 +12,7 @@ from collections import namedtuple import os import re +from re import compile as Re import struct import time @@ -475,7 +476,7 @@ def test_offline_ksk_signing(ns2): os.rename(f"ns2/{KSK}.private.bak", f"ns2/{KSK}.private") def loadkeys(): - pattern = re.compile(f"{zone}/IN.*next key event") + pattern = Re(f"{zone}/IN.*next key event") with ns2.watch_log_from_here() as watcher: ns2.rndc(f"loadkeys {zone}", log=False) watcher.wait_for_line(pattern) diff --git a/bin/tests/system/dnssec/tests_tat.py b/bin/tests/system/dnssec/tests_tat.py index 609f110c59..e37644f3a9 100644 --- a/bin/tests/system/dnssec/tests_tat.py +++ b/bin/tests/system/dnssec/tests_tat.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re from dns import edns @@ -67,7 +67,7 @@ def test_tat_queries(ns1, ns6): msg = isctest.query.create(".", "DNSKEY") opt = edns.GenericOption(14, b"\xff\xff") msg.use_edns(edns=True, options=[opt]) - pattern = re.compile("trust-anchor-telemetry './IN' from .* 65535") + pattern = Re("trust-anchor-telemetry './IN' from .* 65535") with ns1.watch_log_from_here() as watcher: res = isctest.query.tcp(msg, "10.53.0.1") watcher.wait_for_line(pattern) @@ -79,7 +79,7 @@ def test_tat_queries(ns1, ns6): opt1 = edns.GenericOption(14, b"\xff\xff") opt2 = edns.GenericOption(14, b"\xff\xfe") msg.use_edns(edns=True, options=[opt2, opt1]) - pattern = re.compile("trust-anchor-telemetry './IN' from .* 65534") + pattern = Re("trust-anchor-telemetry './IN' from .* 65534") with ns1.watch_log_from_here() as watcher: res = isctest.query.tcp(msg, "10.53.0.1") isctest.check.noerror(res) diff --git a/bin/tests/system/dnssec/tests_validation_multiview.py b/bin/tests/system/dnssec/tests_validation_multiview.py index 085f388b2c..015e458349 100644 --- a/bin/tests/system/dnssec/tests_validation_multiview.py +++ b/bin/tests/system/dnssec/tests_validation_multiview.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re import isctest @@ -47,7 +47,7 @@ def test_staticstub_delegations(): def test_validator_logging(ns4): # check that validator logging includes the view name with multiple views - pattern = re.compile("view rec: *validat") + pattern = Re("view rec: *validat") with ns4.watch_log_from_start() as watcher: msg = isctest.query.create("secure.example", "NS") isctest.query.tcp(msg, "10.53.0.4") diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 2e1b2c36fb..f46ceabe73 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -15,6 +15,7 @@ import glob import os from pathlib import Path import re +from re import compile as Re import time from typing import Dict, List, Optional, Tuple, Union @@ -1475,7 +1476,7 @@ def next_key_event_equals(server, zone, next_event): waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds" with server.watch_log_from_start() as watcher: - watcher.wait_for_line(re.compile(waitfor)) + watcher.wait_for_line(Re(waitfor)) # WMM: The with code below is extracting the line the watcher was # waiting for. If WatchLog.wait_for_line()` returned the matched string, diff --git a/bin/tests/system/isctest/log/watchlog.py b/bin/tests/system/isctest/log/watchlog.py index 6d85e9817d..d2d8521ebd 100644 --- a/bin/tests/system/isctest/log/watchlog.py +++ b/bin/tests/system/isctest/log/watchlog.py @@ -14,6 +14,7 @@ from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVa import abc import os import re +from re import compile as Re import time @@ -213,7 +214,7 @@ class WatchLog(abc.ABC): if isinstance(string, Pattern): patterns.append(string) elif isinstance(string, str): - pattern = re.compile(re.escape(string)) + pattern = Re(re.escape(string)) patterns.append(pattern) else: raise WatchLogException( @@ -256,13 +257,14 @@ class WatchLog(abc.ABC): Recommended use: ```python + from re import compile as Re import isctest def test_foo(servers): with servers["ns1"].watch_log_from_start() as watcher: watcher.wait_for_line("all zones loaded") - pattern = re.compile(r"next key event in ([0-9]+) seconds") + pattern = Re(r"next key event in ([0-9]+) seconds") with servers["ns1"].watch_log_from_here() as watcher: # ... do stuff here ... match = watcher.wait_for_line(pattern) @@ -321,7 +323,8 @@ class WatchLog(abc.ABC): >>> # Different values must be returned depending on which line is >>> # found in the log file. >>> import tempfile - >>> patterns = [re.compile(r"bar ([0-9])"), "qux"] + >>> from re import compile as Re + >>> patterns = [Re(r"bar ([0-9])"), "qux"] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("foo bar 3", file=file, flush=True) ... with WatchLogFromStart(file.name) as watcher: @@ -443,7 +446,8 @@ class WatchLog(abc.ABC): >>> assert ret[1].group(0) == "foo" >>> import tempfile - >>> bar_pattern = re.compile('bar') + >>> from re import compile as Re + >>> bar_pattern = Re('bar') >>> patterns = ['foo', bar_pattern] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("bar", file=file, flush=True) diff --git a/bin/tests/system/isctest/vars/openssl.py b/bin/tests/system/isctest/vars/openssl.py index 19d62d1811..3d1829e723 100644 --- a/bin/tests/system/isctest/vars/openssl.py +++ b/bin/tests/system/isctest/vars/openssl.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re from typing import Optional from .. import log @@ -30,7 +30,7 @@ def parse_openssl_config(path: Optional[str]): return assert os.path.isfile(path), f"{path} exists, but it's not a file" - regex = re.compile(r"([^=]+)=(.*)") + regex = Re(r"([^=]+)=(.*)") log.debug(f"parsing openssl config: {path}") with open(path, "r", encoding="utf-8") as conf: for line in conf: diff --git a/bin/tests/system/multisigner/tests_multisigner.py b/bin/tests/system/multisigner/tests_multisigner.py index c356595afe..900cc7e7b4 100644 --- a/bin/tests/system/multisigner/tests_multisigner.py +++ b/bin/tests/system/multisigner/tests_multisigner.py @@ -12,6 +12,7 @@ from datetime import timedelta import os import re +from re import compile as Re import pytest @@ -103,9 +104,7 @@ def check_no_dnssec_in_journal(server, zone): ] cmd = isctest.run.cmd(journalprint) - pattern = re.compile( - r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE - ) + pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE) match = pattern.search(cmd.out) assert not match, f"{match.group(1)} record found in journal" diff --git a/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py b/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py index 612be36b1d..54cf2fcaf2 100644 --- a/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py +++ b/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re +from re import compile as Re import isctest @@ -32,7 +32,7 @@ def wait_for_initial_xfrin(ns): def wait_for_sending_notify(ns1, ns, key_name): - pattern = re.compile( + pattern = Re( f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)" ) with ns1.watch_log_from_start() as watcher: diff --git a/bin/tests/system/xferquota/tests_xferquota.py b/bin/tests/system/xferquota/tests_xferquota.py index a4edc5998f..a0d042d135 100644 --- a/bin/tests/system/xferquota/tests_xferquota.py +++ b/bin/tests/system/xferquota/tests_xferquota.py @@ -12,6 +12,7 @@ import glob import os import re +from re import compile as Re import shutil import signal import time @@ -71,7 +72,7 @@ def test_xferquota(named_port, ns1, ns2): isctest.check.rrsets_equal(ns1response.answer, ns2response.answer) query_and_compare(axfr_msg) - pattern = re.compile( + pattern = Re( f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: " f"Transfer completed: .*\\(serial 2\\)" )