bind9/bin/tests/system/statschannel/generic.py
Aram Sargsyan 4e94ff2541 Fix a statschannel system test zone loadtime issue
The check_loaded() function compares the zone's loadtime value and
an expected loadtime value, which is based on the zone file's mtime
extracted from the filesystem.

For the secondary zones there may be cases, when the zone file isn't
ready yet before the zone transfer is complete and the zone file is
dumped to the disk, so a so zero value mtime is retrieved.

In such cases wait one second and retry until timeout. Also modify
the affected check to allow a possible difference of the same amount
of seconds as the chosen timeout value.
2023-12-18 08:46:31 +00:00

229 lines
6.8 KiB
Python

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from datetime import datetime, timedelta
from collections import defaultdict
from time import sleep
import os
import dns.message
import dns.query
import dns.rcode
import isctest
# ISO datetime format without msec
fmt = "%Y-%m-%dT%H:%M:%SZ"
# The constants were taken from BIND 9 source code (lib/dns/zone.c)
max_refresh = timedelta(seconds=2419200) # 4 weeks
max_expires = timedelta(seconds=14515200) # 24 weeks
dayzero = datetime.utcfromtimestamp(0).replace(microsecond=0)
# Wait for the secondary zone files to appear to extract their mtime
max_secondary_zone_waittime_sec = 5
# Generic helper functions
def check_expires(expires, min_time, max_time):
assert expires >= min_time
assert expires <= max_time
def check_refresh(refresh, min_time, max_time):
assert refresh >= min_time
assert refresh <= max_time
def check_loaded(loaded, expected, now):
# Sanity check the zone timers values
assert (loaded - expected).total_seconds() < max_secondary_zone_waittime_sec
assert loaded <= now
def check_zone_timers(loaded, expires, refresh, loaded_exp):
now = datetime.utcnow().replace(microsecond=0)
# Sanity checks the zone timers values
if expires is not None:
check_expires(expires, now, now + max_expires)
if refresh is not None:
check_refresh(refresh, now, now + max_refresh)
check_loaded(loaded, loaded_exp, now)
#
# The output is gibberish, but at least make sure it does not crash.
#
def check_manykeys(name, zone=None):
# pylint: disable=unused-argument
assert name == "manykeys"
def zone_mtime(zonedir, name):
try:
si = os.stat(os.path.join(zonedir, "{}.db".format(name)))
except FileNotFoundError:
return dayzero
mtime = datetime.utcfromtimestamp(si.st_mtime).replace(microsecond=0)
return mtime
def test_zone_timers_primary(fetch_zones, load_timers, **kwargs):
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
zonedir = kwargs["zonedir"]
zones = fetch_zones(statsip, statsport)
for zone in zones:
(name, loaded, expires, refresh) = load_timers(zone, True)
mtime = zone_mtime(zonedir, name)
check_zone_timers(loaded, expires, refresh, mtime)
def test_zone_timers_secondary(fetch_zones, load_timers, **kwargs):
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
zonedir = kwargs["zonedir"]
# If any one of the zone files isn't ready, then retry until timeout.
tries = max_secondary_zone_waittime_sec
while tries >= 0:
zones = fetch_zones(statsip, statsport)
again = False
for zone in zones:
(name, loaded, expires, refresh) = load_timers(zone, False)
mtime = zone_mtime(zonedir, name)
if (mtime != dayzero) or (tries == 0):
# mtime was either retrieved successfully or no tries were
# left, run the check anyway.
check_zone_timers(loaded, expires, refresh, mtime)
else:
tries = tries - 1
again = True
break
if again:
sleep(1)
else:
break
def test_zone_with_many_keys(fetch_zones, load_zone, **kwargs):
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
zones = fetch_zones(statsip, statsport)
for zone in zones:
name = load_zone(zone)
if name == "manykeys":
check_manykeys(name)
def create_msg(qname, qtype):
msg = dns.message.make_query(
qname, qtype, want_dnssec=True, use_edns=0, payload=4096
)
return msg
def create_expected(data):
expected = {
"dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
"dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
"dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
"dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
"dns-udp-requests-sizes-received-ipv4": defaultdict(int),
"dns-udp-requests-sizes-received-ipv6": defaultdict(int),
"dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
"dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
}
for k, v in data.items():
for kk, vv in v.items():
expected[k][kk] += vv
return expected
def update_expected(expected, key, msg):
msg_len = len(msg.to_wire())
bucket_num = (msg_len // 16) * 16
bucket = "{}-{}".format(bucket_num, bucket_num + 15)
expected[key][bucket] += 1
def check_traffic(data, expected):
def ordered(obj):
if isinstance(obj, dict):
return sorted((k, ordered(v)) for k, v in obj.items())
if isinstance(obj, list):
return sorted(ordered(x) for x in obj)
return obj
ordered_data = ordered(data)
ordered_expected = ordered(expected)
assert len(ordered_data) == 8
assert len(ordered_expected) == 8
assert len(data) == len(ordered_data)
assert len(expected) == len(ordered_expected)
assert ordered_data == ordered_expected
def test_traffic(fetch_traffic, **kwargs):
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
data = fetch_traffic(statsip, statsport)
exp = create_expected(data)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = isctest.query.udp(msg, statsip)
isctest.check.noerror(ans)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = isctest.query.udp(msg, statsip)
isctest.check.noerror(ans)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = isctest.query.tcp(msg, statsip)
isctest.check.noerror(ans)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = isctest.query.tcp(msg, statsip)
isctest.check.noerror(ans)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)