From ac7127d620614ccf54237e42c299fc238da759d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicki=20K=C5=99=C3=AD=C5=BEek?= Date: Thu, 2 Oct 2025 11:47:56 +0200 Subject: [PATCH] Use Re() for creating regular expressions It's a fairly common pattern to use regular expression in our tests. Instead of using the fairly verbose re.compile(), import that function as Re() instead to allow for more brevity in the test syntax. --- .../system/cipher-suites/tests_cipher_suites.py | 4 ++-- .../system/configloading/tests_configloading.py | 5 ++--- bin/tests/system/conftest.py | 10 +++++----- bin/tests/system/dnssec/tests_signing.py | 3 ++- bin/tests/system/dnssec/tests_tat.py | 6 +++--- .../system/dnssec/tests_validation_multiview.py | 4 ++-- bin/tests/system/isctest/kasp.py | 3 ++- bin/tests/system/isctest/log/watchlog.py | 12 ++++++++---- bin/tests/system/isctest/vars/openssl.py | 4 ++-- bin/tests/system/multisigner/tests_multisigner.py | 5 ++--- .../xfer-servers-list/tests_xfer_servers_list.py | 4 ++-- bin/tests/system/xferquota/tests_xferquota.py | 3 ++- 12 files changed, 34 insertions(+), 29 deletions(-) diff --git a/bin/tests/system/cipher-suites/tests_cipher_suites.py b/bin/tests/system/cipher-suites/tests_cipher_suites.py index edc3c09598..86adc29da1 100644 --- a/bin/tests/system/cipher-suites/tests_cipher_suites.py +++ b/bin/tests/system/cipher-suites/tests_cipher_suites.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re +from re import compile as Re import pytest @@ -31,7 +31,7 @@ pytestmark = pytest.mark.extra_artifacts( @pytest.fixture(scope="module") def transfers_complete(servers): for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]: - pattern = re.compile( + pattern = Re( f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed" ) for ns in ["ns2", "ns3", "ns4", "ns5"]: diff --git a/bin/tests/system/configloading/tests_configloading.py b/bin/tests/system/configloading/tests_configloading.py index de3d25aa71..ec017a8bfe 100644 --- a/bin/tests/system/configloading/tests_configloading.py +++ b/bin/tests/system/configloading/tests_configloading.py @@ -9,8 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re - +from re import compile as Re import isctest @@ -59,7 +58,7 @@ def test_reload_fails_log(ns1, templates): "apply_configuration", "loop exclusive mode: starting", "apply_configuration: configure_views", - re.compile(r".*port '9999999' out of range"), + Re(r".*port '9999999' out of range"), "apply_configuration: detaching views", "loop exclusive mode: ending", "reloading configuration failed", diff --git a/bin/tests/system/conftest.py b/bin/tests/system/conftest.py index bbb2c28385..9eff2ee8b2 100644 --- a/bin/tests/system/conftest.py +++ b/bin/tests/system/conftest.py @@ -12,7 +12,7 @@ import filecmp import os from pathlib import Path -import re +from re import compile as Re import shutil import subprocess import tempfile @@ -53,7 +53,7 @@ else: XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "") FILE_DIR = os.path.abspath(Path(__file__).parent) -ENV_RE = re.compile(b"([^=]+)=(.*)") +ENV_RE = Re(b"([^=]+)=(.*)") PRIORITY_TESTS = [ # Tests that are scheduled first. Speeds up parallel execution. "rpz/", @@ -62,9 +62,9 @@ PRIORITY_TESTS = [ "timeouts/", "upforwd/", ] -PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS)) -SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") -SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py") +PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS)) +SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") +SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py") # ----------------------- Global requirements ---------------------------- diff --git a/bin/tests/system/dnssec/tests_signing.py b/bin/tests/system/dnssec/tests_signing.py index b0295e5840..2a1abd531d 100644 --- a/bin/tests/system/dnssec/tests_signing.py +++ b/bin/tests/system/dnssec/tests_signing.py @@ -12,6 +12,7 @@ from collections import namedtuple import os import re +from re import compile as Re import struct import time @@ -475,7 +476,7 @@ def test_offline_ksk_signing(ns2): os.rename(f"ns2/{KSK}.private.bak", f"ns2/{KSK}.private") def loadkeys(): - pattern = re.compile(f"{zone}/IN.*next key event") + pattern = Re(f"{zone}/IN.*next key event") with ns2.watch_log_from_here() as watcher: ns2.rndc(f"loadkeys {zone}", log=False) watcher.wait_for_line(pattern) diff --git a/bin/tests/system/dnssec/tests_tat.py b/bin/tests/system/dnssec/tests_tat.py index 609f110c59..e37644f3a9 100644 --- a/bin/tests/system/dnssec/tests_tat.py +++ b/bin/tests/system/dnssec/tests_tat.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re from dns import edns @@ -67,7 +67,7 @@ def test_tat_queries(ns1, ns6): msg = isctest.query.create(".", "DNSKEY") opt = edns.GenericOption(14, b"\xff\xff") msg.use_edns(edns=True, options=[opt]) - pattern = re.compile("trust-anchor-telemetry './IN' from .* 65535") + pattern = Re("trust-anchor-telemetry './IN' from .* 65535") with ns1.watch_log_from_here() as watcher: res = isctest.query.tcp(msg, "10.53.0.1") watcher.wait_for_line(pattern) @@ -79,7 +79,7 @@ def test_tat_queries(ns1, ns6): opt1 = edns.GenericOption(14, b"\xff\xff") opt2 = edns.GenericOption(14, b"\xff\xfe") msg.use_edns(edns=True, options=[opt2, opt1]) - pattern = re.compile("trust-anchor-telemetry './IN' from .* 65534") + pattern = Re("trust-anchor-telemetry './IN' from .* 65534") with ns1.watch_log_from_here() as watcher: res = isctest.query.tcp(msg, "10.53.0.1") isctest.check.noerror(res) diff --git a/bin/tests/system/dnssec/tests_validation_multiview.py b/bin/tests/system/dnssec/tests_validation_multiview.py index 085f388b2c..015e458349 100644 --- a/bin/tests/system/dnssec/tests_validation_multiview.py +++ b/bin/tests/system/dnssec/tests_validation_multiview.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re import isctest @@ -47,7 +47,7 @@ def test_staticstub_delegations(): def test_validator_logging(ns4): # check that validator logging includes the view name with multiple views - pattern = re.compile("view rec: *validat") + pattern = Re("view rec: *validat") with ns4.watch_log_from_start() as watcher: msg = isctest.query.create("secure.example", "NS") isctest.query.tcp(msg, "10.53.0.4") diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 2e1b2c36fb..f46ceabe73 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -15,6 +15,7 @@ import glob import os from pathlib import Path import re +from re import compile as Re import time from typing import Dict, List, Optional, Tuple, Union @@ -1475,7 +1476,7 @@ def next_key_event_equals(server, zone, next_event): waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds" with server.watch_log_from_start() as watcher: - watcher.wait_for_line(re.compile(waitfor)) + watcher.wait_for_line(Re(waitfor)) # WMM: The with code below is extracting the line the watcher was # waiting for. If WatchLog.wait_for_line()` returned the matched string, diff --git a/bin/tests/system/isctest/log/watchlog.py b/bin/tests/system/isctest/log/watchlog.py index 6d85e9817d..d2d8521ebd 100644 --- a/bin/tests/system/isctest/log/watchlog.py +++ b/bin/tests/system/isctest/log/watchlog.py @@ -14,6 +14,7 @@ from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVa import abc import os import re +from re import compile as Re import time @@ -213,7 +214,7 @@ class WatchLog(abc.ABC): if isinstance(string, Pattern): patterns.append(string) elif isinstance(string, str): - pattern = re.compile(re.escape(string)) + pattern = Re(re.escape(string)) patterns.append(pattern) else: raise WatchLogException( @@ -256,13 +257,14 @@ class WatchLog(abc.ABC): Recommended use: ```python + from re import compile as Re import isctest def test_foo(servers): with servers["ns1"].watch_log_from_start() as watcher: watcher.wait_for_line("all zones loaded") - pattern = re.compile(r"next key event in ([0-9]+) seconds") + pattern = Re(r"next key event in ([0-9]+) seconds") with servers["ns1"].watch_log_from_here() as watcher: # ... do stuff here ... match = watcher.wait_for_line(pattern) @@ -321,7 +323,8 @@ class WatchLog(abc.ABC): >>> # Different values must be returned depending on which line is >>> # found in the log file. >>> import tempfile - >>> patterns = [re.compile(r"bar ([0-9])"), "qux"] + >>> from re import compile as Re + >>> patterns = [Re(r"bar ([0-9])"), "qux"] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("foo bar 3", file=file, flush=True) ... with WatchLogFromStart(file.name) as watcher: @@ -443,7 +446,8 @@ class WatchLog(abc.ABC): >>> assert ret[1].group(0) == "foo" >>> import tempfile - >>> bar_pattern = re.compile('bar') + >>> from re import compile as Re + >>> bar_pattern = Re('bar') >>> patterns = ['foo', bar_pattern] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("bar", file=file, flush=True) diff --git a/bin/tests/system/isctest/vars/openssl.py b/bin/tests/system/isctest/vars/openssl.py index 19d62d1811..3d1829e723 100644 --- a/bin/tests/system/isctest/vars/openssl.py +++ b/bin/tests/system/isctest/vars/openssl.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re from typing import Optional from .. import log @@ -30,7 +30,7 @@ def parse_openssl_config(path: Optional[str]): return assert os.path.isfile(path), f"{path} exists, but it's not a file" - regex = re.compile(r"([^=]+)=(.*)") + regex = Re(r"([^=]+)=(.*)") log.debug(f"parsing openssl config: {path}") with open(path, "r", encoding="utf-8") as conf: for line in conf: diff --git a/bin/tests/system/multisigner/tests_multisigner.py b/bin/tests/system/multisigner/tests_multisigner.py index c356595afe..900cc7e7b4 100644 --- a/bin/tests/system/multisigner/tests_multisigner.py +++ b/bin/tests/system/multisigner/tests_multisigner.py @@ -12,6 +12,7 @@ from datetime import timedelta import os import re +from re import compile as Re import pytest @@ -103,9 +104,7 @@ def check_no_dnssec_in_journal(server, zone): ] cmd = isctest.run.cmd(journalprint) - pattern = re.compile( - r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE - ) + pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE) match = pattern.search(cmd.out) assert not match, f"{match.group(1)} record found in journal" diff --git a/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py b/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py index 612be36b1d..54cf2fcaf2 100644 --- a/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py +++ b/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re +from re import compile as Re import isctest @@ -32,7 +32,7 @@ def wait_for_initial_xfrin(ns): def wait_for_sending_notify(ns1, ns, key_name): - pattern = re.compile( + pattern = Re( f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)" ) with ns1.watch_log_from_start() as watcher: diff --git a/bin/tests/system/xferquota/tests_xferquota.py b/bin/tests/system/xferquota/tests_xferquota.py index a4edc5998f..a0d042d135 100644 --- a/bin/tests/system/xferquota/tests_xferquota.py +++ b/bin/tests/system/xferquota/tests_xferquota.py @@ -12,6 +12,7 @@ import glob import os import re +from re import compile as Re import shutil import signal import time @@ -71,7 +72,7 @@ def test_xferquota(named_port, ns1, ns2): isctest.check.rrsets_equal(ns1response.answer, ns2response.answer) query_and_compare(axfr_msg) - pattern = re.compile( + pattern = Re( f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: " f"Transfer completed: .*\\(serial 2\\)" )