knot-dns/tests-extra/tests/modules/probe/test.py
Libor Peltan 356350c473 tests-extra: use XDP when possible, including TCP ...
...on loopback interface, so exclusivity must be ensured with the help
of a lock file
2023-11-07 16:59:42 +01:00

90 lines
3.5 KiB
Python

#!/usr/bin/env python3
'''Test for the probe module'''
import dns.rdatatype
from dnstest.utils import *
from dnstest.test import Test
from dnstest.module import ModProbe
from dnstest.libknot import libknot
class TestItem:
    '''A single probe scenario: the query to send and what the probe must report.'''

    def __init__(self, qname: str, qtype: str, udp: bool, dnssec: bool, rcode: str,
                 ede: int = libknot.probe.KnotProbeData.EDE_NONE, aa: bool = False,
                 nsid: bool = False, xdp: bool = False):
        '''Remember the query parameters and the expected reply properties.

        qname, qtype -- question to ask (textual name and type mnemonic).
        udp          -- True to expect UDP in the probe record, False for TCP.
        dnssec       -- whether the query carries the DO flag.
        rcode        -- expected reply RCODE as text (e.g. "NOERROR").
        ede          -- expected value of the probe record's reply_ede field.
        aa           -- expected AA flag of the reply.
        nsid         -- whether the query requests NSID.
        xdp          -- whether the query is sent with the xdp option.
        '''
        # Query description
        self.qname = qname
        self.qtype = qtype
        self.udp = udp
        self.dnssec = dnssec
        self.nsid = nsid
        self.xdp = xdp
        # Expected reply properties
        self.rcode = rcode
        self.ede = ede
        self.aa = aa

    def check(self, data, server) -> None:
        '''Verify one probe record against this scenario.

        data   -- probe record to inspect (an element of the consumed data array).
        server -- server under test; supplies addr, port and xdp_port.

        Each field is checked through compare() with its own label, so a
        mismatch is reported per field.
        '''
        # --- Transport ------------------------------------------------------
        # A colon in the textual server address is taken to mean IPv6.
        if ":" in server.addr:
            ip_version = 6
        else:
            ip_version = 4
        compare(data.ip, ip_version, "IP version")

        protocols = libknot.probe.KnotProbeDataProto
        if self.udp:
            expected_proto = protocols.UDP
        else:
            expected_proto = protocols.TCP
        compare(data.proto, expected_proto.value, "proto")

        compare(data.addr_str(data.local_addr), server.addr, "local address")

        # The XDP port is only expected when the scenario asked for XDP and
        # the server actually has a positive XDP port configured.
        xdp_in_use = self.xdp and server.xdp_port is not None and server.xdp_port > 0
        if xdp_in_use:
            expected_port, port_label = server.xdp_port, "local XDP port"
        else:
            expected_port, port_label = server.port, "local port"
        compare(data.local_port, expected_port, port_label)

        # --- Question -------------------------------------------------------
        expected_qname = str(dns.name.from_text(self.qname))
        compare(data.qname_str(), expected_qname, "qname")
        expected_qtype = dns.rdatatype.from_text(self.qtype)
        compare(data.query_type, expected_qtype, "qtype")
        compare(data.query_class, 1, "qclass")  # 1 is the DNS class IN

        # --- EDNS -----------------------------------------------------------
        # Either DNSSEC or NSID makes the scenario expect EDNS in the record.
        expect_edns = self.dnssec or self.nsid
        compare(bool(data.edns_present), expect_edns, "EDNS")
        if expect_edns:
            compare(data.edns_version, 0, "EDNS version")
            # NOTE(review): 8 presumably is bit (1 << 3) of an option bitmap,
            # NSID being option code 3 -- confirm against the libknot probe API.
            if self.nsid:
                expected_options = 8
            else:
                expected_options = 0
            compare(data.edns_options, expected_options, "NSID")
            compare(bool(data.edns_flag_do), self.dnssec, "DO")

        # --- Reply summary ----------------------------------------------------
        expected_rcode = dns.rcode.from_text(self.rcode)
        compare(data.reply_rcode, expected_rcode, "rcode")
        compare(data.reply_ede, self.ede, "EDE")

        # --- Raw DNS headers of the reply and of the query ------------------
        reply_hdr = data.reply_hdr
        query_hdr = data.query_hdr
        compare(reply_hdr.id, query_hdr.id, "header ID match")
        compare(bool(reply_hdr.flag_qr), True, "reply QR")
        compare(bool(query_hdr.flag_qr), False, "query QR")
        compare(reply_hdr.opcode, 0, "reply OPCODE")
        compare(query_hdr.opcode, 0, "query OPCODE")
        compare(bool(reply_hdr.flag_aa), self.aa, "reply AA")
        compare(reply_hdr.rcode, expected_rcode, "reply RCODE")
        compare(reply_hdr.questions, 1, "reply questions")
        compare(query_hdr.questions, 1, "query questions")
# Probe scenarios; each one is queried once and must yield exactly one record.
tests = [
    # Plain UDP query without EDNS, authoritative positive answer expected.
    TestItem(qname="ns1.example.", qtype="A", udp=True, dnssec=False,
             rcode="NOERROR", aa=True),
    # TCP query with the DO flag, authoritative NXDOMAIN expected.
    TestItem(qname="not-exists.example.", qtype="A", udp=False, dnssec=True,
             rcode="NXDOMAIN", aa=True),
    # UDP query with both the DO flag and an NSID request.
    TestItem(qname="example.", qtype="SOA", udp=True, dnssec=True,
             rcode="NOERROR", aa=True, nsid=True),
    # Root SOA query sent with the xdp option, REFUSED expected, no EDNS.
    TestItem(qname=".", qtype="SOA", udp=True, dnssec=False,
             rcode="REFUSED", xdp=True),
    # Same refused query with the DO flag: EDE 20 is expected in the reply
    # (code 20 is "Not Authoritative" in RFC 8914).
    TestItem(qname=".", qtype="SOA", udp=True, dnssec=True,
             rcode="REFUSED", ede=20),
]
t = Test(stress=False, tsig=False)

# NOTE(review): presumably verifies that the probe module is usable in this
# build before anything else is set up -- confirm in dnstest.module.
ModProbe.check()

# One Knot server hosting the single zone "example.".
server = t.server("knot")
example_zone = t.zone("example.")
t.link(example_zone, server)

# Load the probe module globally (no zone given) with one channel writing
# into the test output directory.
probe_module = ModProbe(t.out_dir, channels=1)
server.add_module(None, probe_module)

# Reader side: attach to channel index 1 in the same directory and prepare
# the array that receives consumed records.
probe_reader = libknot.probe.KnotProbe(t.out_dir, idx=1)
records = libknot.probe.KnotProbeDataArray()

t.start()

t.sleep(1) # Not zone_wait() as it would generate probe data!

for case in tests:
    dig_options = dict(udp=case.udp, dnssec=case.dnssec, nsid=case.nsid, xdp=case.xdp)
    # The reply itself is not inspected here; only the probe output is.
    resp = server.dig(case.qname, case.qtype, **dig_options)

    # Exactly one probe record is expected for the single query just sent.
    probe_reader.consume(records)
    compare(records.used, 1, "data array occupation")
    first_record = records[0]
    case.check(first_record, server)

t.end()