Use Re() for creating regular expressions

It's a fairly common pattern to use regular expression in our tests.
Instead of using the fairly verbose re.compile(), import that function
as Re() instead to allow for more brevity in the test syntax.
This commit is contained in:
Nicki Křížek 2025-10-02 11:47:56 +02:00
parent ac998da3f6
commit ac7127d620
12 changed files with 34 additions and 29 deletions

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import re
from re import compile as Re
import pytest
@ -31,7 +31,7 @@ pytestmark = pytest.mark.extra_artifacts(
@pytest.fixture(scope="module")
def transfers_complete(servers):
for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
pattern = re.compile(
pattern = Re(
f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
)
for ns in ["ns2", "ns3", "ns4", "ns5"]:

View file

@ -9,8 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import re
from re import compile as Re
import isctest
@ -59,7 +58,7 @@ def test_reload_fails_log(ns1, templates):
"apply_configuration",
"loop exclusive mode: starting",
"apply_configuration: configure_views",
re.compile(r".*port '9999999' out of range"),
Re(r".*port '9999999' out of range"),
"apply_configuration: detaching views",
"loop exclusive mode: ending",
"reloading configuration failed",

View file

@ -12,7 +12,7 @@
import filecmp
import os
from pathlib import Path
import re
from re import compile as Re
import shutil
import subprocess
import tempfile
@ -53,7 +53,7 @@ else:
XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
FILE_DIR = os.path.abspath(Path(__file__).parent)
ENV_RE = re.compile(b"([^=]+)=(.*)")
ENV_RE = Re(b"([^=]+)=(.*)")
PRIORITY_TESTS = [
# Tests that are scheduled first. Speeds up parallel execution.
"rpz/",
@ -62,9 +62,9 @@ PRIORITY_TESTS = [
"timeouts/",
"upforwd/",
]
PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS))
SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS))
SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py")
# ----------------------- Global requirements ----------------------------

View file

@ -12,6 +12,7 @@
from collections import namedtuple
import os
import re
from re import compile as Re
import struct
import time
@ -475,7 +476,7 @@ def test_offline_ksk_signing(ns2):
os.rename(f"ns2/{KSK}.private.bak", f"ns2/{KSK}.private")
def loadkeys():
pattern = re.compile(f"{zone}/IN.*next key event")
pattern = Re(f"{zone}/IN.*next key event")
with ns2.watch_log_from_here() as watcher:
ns2.rndc(f"loadkeys {zone}", log=False)
watcher.wait_for_line(pattern)

View file

@ -10,7 +10,7 @@
# information regarding copyright ownership.
import os
import re
from re import compile as Re
from dns import edns
@ -67,7 +67,7 @@ def test_tat_queries(ns1, ns6):
msg = isctest.query.create(".", "DNSKEY")
opt = edns.GenericOption(14, b"\xff\xff")
msg.use_edns(edns=True, options=[opt])
pattern = re.compile("trust-anchor-telemetry './IN' from .* 65535")
pattern = Re("trust-anchor-telemetry './IN' from .* 65535")
with ns1.watch_log_from_here() as watcher:
res = isctest.query.tcp(msg, "10.53.0.1")
watcher.wait_for_line(pattern)
@ -79,7 +79,7 @@ def test_tat_queries(ns1, ns6):
opt1 = edns.GenericOption(14, b"\xff\xff")
opt2 = edns.GenericOption(14, b"\xff\xfe")
msg.use_edns(edns=True, options=[opt2, opt1])
pattern = re.compile("trust-anchor-telemetry './IN' from .* 65534")
pattern = Re("trust-anchor-telemetry './IN' from .* 65534")
with ns1.watch_log_from_here() as watcher:
res = isctest.query.tcp(msg, "10.53.0.1")
isctest.check.noerror(res)

View file

@ -10,7 +10,7 @@
# information regarding copyright ownership.
import os
import re
from re import compile as Re
import isctest
@ -47,7 +47,7 @@ def test_staticstub_delegations():
def test_validator_logging(ns4):
# check that validator logging includes the view name with multiple views
pattern = re.compile("view rec: *validat")
pattern = Re("view rec: *validat")
with ns4.watch_log_from_start() as watcher:
msg = isctest.query.create("secure.example", "NS")
isctest.query.tcp(msg, "10.53.0.4")

View file

@ -15,6 +15,7 @@ import glob
import os
from pathlib import Path
import re
from re import compile as Re
import time
from typing import Dict, List, Optional, Tuple, Union
@ -1475,7 +1476,7 @@ def next_key_event_equals(server, zone, next_event):
waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
with server.watch_log_from_start() as watcher:
watcher.wait_for_line(re.compile(waitfor))
watcher.wait_for_line(Re(waitfor))
# WMM: The with code below is extracting the line the watcher was
# waiting for. If WatchLog.wait_for_line()` returned the matched string,

View file

@ -14,6 +14,7 @@ from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVa
import abc
import os
import re
from re import compile as Re
import time
@ -213,7 +214,7 @@ class WatchLog(abc.ABC):
if isinstance(string, Pattern):
patterns.append(string)
elif isinstance(string, str):
pattern = re.compile(re.escape(string))
pattern = Re(re.escape(string))
patterns.append(pattern)
else:
raise WatchLogException(
@ -256,13 +257,14 @@ class WatchLog(abc.ABC):
Recommended use:
```python
from re import compile as Re
import isctest
def test_foo(servers):
with servers["ns1"].watch_log_from_start() as watcher:
watcher.wait_for_line("all zones loaded")
pattern = re.compile(r"next key event in ([0-9]+) seconds")
pattern = Re(r"next key event in ([0-9]+) seconds")
with servers["ns1"].watch_log_from_here() as watcher:
# ... do stuff here ...
match = watcher.wait_for_line(pattern)
@ -321,7 +323,8 @@ class WatchLog(abc.ABC):
>>> # Different values must be returned depending on which line is
>>> # found in the log file.
>>> import tempfile
>>> patterns = [re.compile(r"bar ([0-9])"), "qux"]
>>> from re import compile as Re
>>> patterns = [Re(r"bar ([0-9])"), "qux"]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo bar 3", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
@ -443,7 +446,8 @@ class WatchLog(abc.ABC):
>>> assert ret[1].group(0) == "foo"
>>> import tempfile
>>> bar_pattern = re.compile('bar')
>>> from re import compile as Re
>>> bar_pattern = Re('bar')
>>> patterns = ['foo', bar_pattern]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("bar", file=file, flush=True)

View file

@ -10,7 +10,7 @@
# information regarding copyright ownership.
import os
import re
from re import compile as Re
from typing import Optional
from .. import log
@ -30,7 +30,7 @@ def parse_openssl_config(path: Optional[str]):
return
assert os.path.isfile(path), f"{path} exists, but it's not a file"
regex = re.compile(r"([^=]+)=(.*)")
regex = Re(r"([^=]+)=(.*)")
log.debug(f"parsing openssl config: {path}")
with open(path, "r", encoding="utf-8") as conf:
for line in conf:

View file

@ -12,6 +12,7 @@
from datetime import timedelta
import os
import re
from re import compile as Re
import pytest
@ -103,9 +104,7 @@ def check_no_dnssec_in_journal(server, zone):
]
cmd = isctest.run.cmd(journalprint)
pattern = re.compile(
r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
)
pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE)
match = pattern.search(cmd.out)
assert not match, f"{match.group(1)} record found in journal"

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import re
from re import compile as Re
import isctest
@ -32,7 +32,7 @@ def wait_for_initial_xfrin(ns):
def wait_for_sending_notify(ns1, ns, key_name):
pattern = re.compile(
pattern = Re(
f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)"
)
with ns1.watch_log_from_start() as watcher:

View file

@ -12,6 +12,7 @@
import glob
import os
import re
from re import compile as Re
import shutil
import signal
import time
@ -71,7 +72,7 @@ def test_xferquota(named_port, ns1, ns2):
isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
query_and_compare(axfr_msg)
pattern = re.compile(
pattern = Re(
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
f"Transfer completed: .*\\(serial 2\\)"
)