mirror of
https://gitlab.nic.cz/knot/knot-dns.git
synced 2026-02-03 18:49:28 -05:00
90 lines
3.5 KiB
Python
90 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
'''Test for the probe module'''
|
|
|
|
import dns.rdatatype
|
|
|
|
from dnstest.utils import *
|
|
from dnstest.test import Test
|
|
from dnstest.module import ModProbe
|
|
from dnstest.libknot import libknot
|
|
|
|
class TestItem(object):
|
|
def __init__(self, qname: str, qtype: str, udp: bool, dnssec: bool, rcode: str,
|
|
ede: int = libknot.probe.KnotProbeData.EDE_NONE, aa: bool = False,
|
|
nsid: bool = False, xdp: bool = False):
|
|
self.qname = qname
|
|
self.qtype = qtype
|
|
self.udp = udp
|
|
self.dnssec = dnssec
|
|
self.rcode = rcode
|
|
self.ede = ede
|
|
self.aa = aa
|
|
self.nsid = nsid
|
|
self.xdp = xdp
|
|
|
|
def check(self, data, server):
|
|
compare(data.ip, 6 if ":" in server.addr else 4, "IP version")
|
|
ref_proto = libknot.probe.KnotProbeDataProto.UDP if self.udp else \
|
|
libknot.probe.KnotProbeDataProto.TCP
|
|
compare(data.proto, ref_proto.value, "proto")
|
|
compare(data.addr_str(data.local_addr), server.addr, "local address")
|
|
if self.xdp and server.xdp_port is not None and server.xdp_port > 0:
|
|
compare(data.local_port, server.xdp_port, "local XDP port")
|
|
else:
|
|
compare(data.local_port, server.port, "local port")
|
|
|
|
compare(data.qname_str(), str(dns.name.from_text(self.qname)), "qname")
|
|
compare(data.query_type, dns.rdatatype.from_text(self.qtype), "qtype")
|
|
compare(data.query_class, 1, "qclass")
|
|
|
|
ref_edns = self.dnssec or self.nsid
|
|
compare(bool(data.edns_present), ref_edns, "EDNS")
|
|
if ref_edns:
|
|
compare(data.edns_version, 0, "EDNS version")
|
|
compare(data.edns_options, 8 if self.nsid else 0, "NSID")
|
|
compare(bool(data.edns_flag_do), self.dnssec, "DO")
|
|
|
|
ref_rcode = dns.rcode.from_text(self.rcode)
|
|
compare(data.reply_rcode, ref_rcode, "rcode")
|
|
compare(data.reply_ede, self.ede, "EDE")
|
|
|
|
compare(data.reply_hdr.id, data.query_hdr.id, "header ID match")
|
|
compare(bool(data.reply_hdr.flag_qr), True, "reply QR")
|
|
compare(bool(data.query_hdr.flag_qr), False, "query QR")
|
|
compare(data.reply_hdr.opcode, 0, "reply OPCODE")
|
|
compare(data.query_hdr.opcode, 0, "query OPCODE")
|
|
compare(bool(data.reply_hdr.flag_aa), self.aa, "reply AA")
|
|
compare(data.reply_hdr.rcode, ref_rcode, "reply RCODE")
|
|
compare(data.reply_hdr.questions, 1, "reply questions")
|
|
compare(data.query_hdr.questions, 1, "query questions")
|
|
|
|
tests = [TestItem("ns1.example.", "A", udp=True, dnssec=False, rcode="NOERROR", aa=True),
|
|
TestItem("not-exists.example.", "A", udp=False, dnssec=True, rcode="NXDOMAIN", aa=True),
|
|
TestItem("example.", "SOA", udp=True, dnssec=True, rcode="NOERROR", aa=True, nsid=True),
|
|
TestItem(".", "SOA", udp=True, dnssec=False, rcode="REFUSED", xdp=True),
|
|
TestItem(".", "SOA", udp=True, dnssec=True, rcode="REFUSED", ede=20)]
|
|
|
|
t = Test(stress=False, tsig=False)
|
|
|
|
ModProbe.check()
|
|
|
|
server = t.server("knot")
|
|
zone = t.zone("example.")
|
|
t.link(zone, server)
|
|
server.add_module(None, ModProbe(t.out_dir, channels=1))
|
|
|
|
probe = libknot.probe.KnotProbe(t.out_dir, idx=1)
|
|
data = libknot.probe.KnotProbeDataArray()
|
|
|
|
t.start()
|
|
|
|
t.sleep(1) # Not zone_wait() as it would generate probe data!
|
|
|
|
for item in tests:
|
|
resp = server.dig(item.qname, item.qtype, udp=item.udp, dnssec=item.dnssec, nsid=item.nsid, xdp=item.xdp)
|
|
probe.consume(data)
|
|
compare(data.used, 1, "data array occupation")
|
|
item.check(data[0], server)
|
|
|
|
t.end()
|