mirror of
https://github.com/isc-projects/bind9.git
synced 2026-02-28 20:41:18 -05:00
153 lines
4.2 KiB
Python
153 lines
4.2 KiB
Python
############################################################################
|
|
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
#
|
|
# See the COPYRIGHT file distributed with this work for additional
|
|
# information regarding copyright ownership.
|
|
############################################################################
|
|
|
|
import os
|
|
import os.path
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
|
|
import dns.message
|
|
import dns.query
|
|
import dns.rcode
|
|
|
|
# ISO datetime format without msec
|
|
fmt = '%Y-%m-%dT%H:%M:%SZ'
|
|
|
|
# The constants were taken from BIND 9 source code (lib/dns/zone.c)
|
|
max_refresh = timedelta(seconds=2419200) # 4 weeks
|
|
max_expires = timedelta(seconds=14515200) # 24 weeks
|
|
now = datetime.utcnow().replace(microsecond=0)
|
|
dayzero = datetime.utcfromtimestamp(0).replace(microsecond=0)
|
|
|
|
|
|
TIMEOUT = 10
|
|
|
|
|
|
# Generic helper functions
|
|
def check_expires(expires, min_time, max_time):
|
|
assert expires >= min_time
|
|
assert expires <= max_time
|
|
|
|
|
|
def check_refresh(refresh, min_time, max_time):
|
|
assert refresh >= min_time
|
|
assert refresh <= max_time
|
|
|
|
|
|
def check_loaded(loaded, expected):
|
|
# Sanity check the zone timers values
|
|
assert loaded == expected
|
|
assert loaded < now
|
|
|
|
|
|
def check_zone_timers(loaded, expires, refresh, loaded_exp):
|
|
# Sanity checks the zone timers values
|
|
if expires is not None:
|
|
check_expires(expires, now, now + max_expires)
|
|
if refresh is not None:
|
|
check_refresh(refresh, now, now + max_refresh)
|
|
check_loaded(loaded, loaded_exp)
|
|
|
|
|
|
#
|
|
# The output is gibberish, but at least make sure it does not crash.
|
|
#
|
|
def check_manykeys(name, zone=None):
|
|
# pylint: disable=unused-argument
|
|
assert name == "manykeys"
|
|
|
|
|
|
def zone_mtime(zonedir, name):
|
|
|
|
try:
|
|
si = os.stat(os.path.join(zonedir, "{}.db".format(name)))
|
|
except FileNotFoundError:
|
|
return dayzero
|
|
|
|
mtime = datetime.utcfromtimestamp(si.st_mtime).replace(microsecond=0)
|
|
|
|
return mtime
|
|
|
|
|
|
def zone_keyid(nameserver, zone, key):
|
|
with open('{}/{}.{}.id'.format(nameserver, zone, key)) as f:
|
|
keyid = f.read().strip()
|
|
print('{}-{} ID: {}'.format(zone, key, keyid))
|
|
return keyid
|
|
|
|
|
|
def create_msg(qname, qtype):
|
|
msg = dns.message.make_query(qname, qtype, want_dnssec=True,
|
|
use_edns=0, payload=4096)
|
|
|
|
return msg
|
|
|
|
|
|
def udp_query(ip, port, msg):
|
|
|
|
ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
|
|
assert ans.rcode() == dns.rcode.NOERROR
|
|
|
|
return ans
|
|
|
|
|
|
def tcp_query(ip, port, msg):
|
|
|
|
ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
|
|
assert ans.rcode() == dns.rcode.NOERROR
|
|
|
|
return ans
|
|
|
|
|
|
def create_expected(data):
|
|
expected = {"dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
|
|
"dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
|
|
"dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
|
|
"dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
|
|
"dns-udp-requests-sizes-received-ipv4": defaultdict(int),
|
|
"dns-udp-requests-sizes-received-ipv6": defaultdict(int),
|
|
"dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
|
|
"dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
|
|
}
|
|
|
|
for k, v in data.items():
|
|
for kk, vv in v.items():
|
|
expected[k][kk] += vv
|
|
|
|
return expected
|
|
|
|
|
|
def update_expected(expected, key, msg):
|
|
msg_len = len(msg.to_wire())
|
|
bucket_num = (msg_len // 16) * 16
|
|
bucket = "{}-{}".format(bucket_num, bucket_num + 15)
|
|
|
|
expected[key][bucket] += 1
|
|
|
|
|
|
def check_traffic(data, expected):
|
|
def ordered(obj):
|
|
if isinstance(obj, dict):
|
|
return sorted((k, ordered(v)) for k, v in obj.items())
|
|
if isinstance(obj, list):
|
|
return sorted(ordered(x) for x in obj)
|
|
return obj
|
|
|
|
ordered_data = ordered(data)
|
|
ordered_expected = ordered(expected)
|
|
|
|
assert len(ordered_data) == 8
|
|
assert len(ordered_expected) == 8
|
|
assert len(data) == len(ordered_data)
|
|
assert len(expected) == len(ordered_expected)
|
|
|
|
assert ordered_data == ordered_expected
|