mirror of
https://gitlab.nic.cz/knot/knot-dns.git
synced 2026-02-03 18:49:28 -05:00
156 lines
4.1 KiB
Python
156 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
''' Check 'dnstap' query module functionality. '''
|
|
|
|
import os
|
|
import re
|
|
import dns
|
|
from dnstest.test import Test
|
|
from dnstest.module import ModDnstap
|
|
from dnstest.utils import *
|
|
|
|
# --- Simple DNSTAP file parser ---
|
|
|
|
def read_byte(f, left):
|
|
if left[0] < 1:
|
|
return 0
|
|
left[0] -= 1
|
|
return f.read(1)[0]
|
|
|
|
def read_be32(f, left):
|
|
if left[0] < 4:
|
|
return 0
|
|
left[0] -= 4
|
|
be = f.read(4)
|
|
return (be[0] << 24) + (be[1] << 16) + (be[2] << 8) + be[3]
|
|
|
|
def pb_varint(f, left):
|
|
x = 0
|
|
shift = 0
|
|
while True:
|
|
b = read_byte(f, left)
|
|
x += ((b & 0x7f) << shift)
|
|
if (b & 0x80) == 0:
|
|
return x
|
|
shift += 7
|
|
|
|
def pb_skip2field(f, left, field):
|
|
while left[0] > 0:
|
|
b = read_byte(f, left)
|
|
if (b >> 3) == field:
|
|
return ((b & 0x2) != 0)
|
|
if (b & 0x1) != 0:
|
|
valen = 4 if (b & 0x7) == 5 else 8
|
|
left[0] -= valen
|
|
_ = f.read(valen)
|
|
else:
|
|
val = pb_varint(f, left)
|
|
if (b & 0x2) != 0: # val is length
|
|
left[0] -= val
|
|
_ = f.read(val)
|
|
|
|
def sd_file(f, msg, field, subfield):
|
|
left = [ 2**62 ]
|
|
|
|
# skip intro frame
|
|
zero = read_be32(f, left)
|
|
if zero != 0:
|
|
return None
|
|
intro = read_be32(f, left)
|
|
_ = f.read(intro)
|
|
|
|
# skip to n-th msg
|
|
for imsg in range(msg):
|
|
skip = read_be32(f, left)
|
|
_ = f.read(skip)
|
|
|
|
left[0] = read_be32(f, left)
|
|
# now continue with protobuf format
|
|
|
|
haslen = pb_skip2field(f, left, field)
|
|
if subfield is not None:
|
|
if not haslen:
|
|
return None
|
|
left_prev = left[0]
|
|
left[0] = pb_varint(f, left)
|
|
if left_prev < left[0]:
|
|
return None
|
|
haslen = pb_skip2field(f, left, subfield)
|
|
|
|
val = pb_varint(f, left)
|
|
if haslen:
|
|
return f.read(val) if left[0] >= val else None
|
|
else:
|
|
return val
|
|
|
|
def simple_dnstap(file, msg, field, subfield=None):
|
|
with open(file, "rb") as f:
|
|
return sd_file(f, msg, field, subfield)
|
|
|
|
# --- Simple DNSTAP file parser end ---
|
|
|
|
def sink_contains(sink, qname):
|
|
try:
|
|
for msg in range(1000): # in practice, while(!exception)
|
|
query = simple_dnstap(sink, msg, 14, 10)
|
|
if query is None or query == 0:
|
|
continue
|
|
qr = dns.message.from_wire(query)
|
|
if str(qr.question[0].name) == str(qname):
|
|
return True
|
|
except Exception as e:
|
|
return False
|
|
return False
|
|
|
|
def sink_contains_proto(sink, proto):
|
|
try:
|
|
for msg in range(1000): # in practice, while(!exception)
|
|
prot = simple_dnstap(sink, msg, 14, 3)
|
|
if prot == proto:
|
|
return True
|
|
except Exception as e:
|
|
return False
|
|
return False
|
|
|
|
t = Test(quic=True, stress=False)
|
|
|
|
ModDnstap.check()
|
|
|
|
# Initialize server configuration
|
|
knot = t.server("knot")
|
|
slave = t.server("knot")
|
|
zone = t.zone("flags.")
|
|
t.link(zone, knot, slave)
|
|
|
|
# Configure 'dnstap' module for all queries (default).
|
|
dflt_sink = t.out_dir + "/all.tap"
|
|
knot.add_module(None, ModDnstap(dflt_sink))
|
|
# Configure 'dnstap' module for flags zone only.
|
|
flags_sink = t.out_dir + "/flags.tap"
|
|
knot.add_module(zone, ModDnstap(flags_sink))
|
|
|
|
t.start()
|
|
|
|
slave.zone_wait(zone)
|
|
|
|
dflt_qname = "dnstap_default_test" + ".example."
|
|
resp = knot.dig(dflt_qname, "NS")
|
|
flags_qname = "dnstap_flags_test" + ".flags."
|
|
resp = knot.dig(flags_qname, "NS")
|
|
|
|
knot.stop()
|
|
|
|
# Check if dnstap sinks exist.
|
|
isset(os.path.isfile(dflt_sink), "default sink")
|
|
isset(os.path.isfile(flags_sink), "zone sink")
|
|
|
|
# Check sink contents.
|
|
isset(sink_contains(dflt_sink, flags_qname), "qname '%s' in '%s'" % (flags_qname, dflt_sink))
|
|
isset(sink_contains(dflt_sink, dflt_qname), "qname '%s' in '%s'" % (dflt_qname, dflt_sink))
|
|
isset(sink_contains(flags_sink, flags_qname), "qname '%s' in '%s'" % (flags_qname, flags_sink))
|
|
isset(not sink_contains(flags_sink, dflt_qname), "qname '%s' in '%s'" % (dflt_qname, flags_sink))
|
|
|
|
isset(sink_contains_proto(dflt_sink, 7), "QUIC in '%s'" % dflt_sink)
|
|
isset(sink_contains_proto(flags_sink, 7), "QUIC in '%s'" % flags_sink)
|
|
|
|
t.end()
|