mirror of
https://gitlab.nic.cz/knot/knot-dns.git
synced 2026-02-03 18:49:28 -05:00
445 lines
13 KiB
Python
445 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import re
|
|
from subprocess import Popen, PIPE, check_call
|
|
from dnstest.utils import *
|
|
import dnstest.config
|
|
import dnstest.params as params
|
|
|
|
class KnotModule(object):
|
|
'''Query module configuration'''
|
|
|
|
MOD_CONF_PREFIX = "mod-"
|
|
MOD_API_PREFIX = "knotd_mod_api_"
|
|
|
|
# Instance counter.
|
|
count = 1
|
|
# Module API name suffix.
|
|
mod_name = None
|
|
# Empty configuration
|
|
empty = False
|
|
|
|
def __init__(self):
|
|
self.conf_id = "id%s" % type(self).count
|
|
type(self).count += 1
|
|
|
|
@property
|
|
def conf_name(self):
|
|
return self.MOD_CONF_PREFIX + self.mod_name
|
|
|
|
@classmethod
|
|
def _check_cmd(cls):
|
|
if params.libtool_bin:
|
|
prefix = [params.libtool_bin, "exec"]
|
|
else:
|
|
prefix = []
|
|
|
|
return prefix + ["objdump", "-t", params.knot_bin]
|
|
|
|
@classmethod
|
|
def check(cls):
|
|
'''Checks the server binary for the module code'''
|
|
|
|
try:
|
|
proc = Popen(cls._check_cmd(), stdout=PIPE, stderr=PIPE,
|
|
universal_newlines=True)
|
|
(out, err) = proc.communicate()
|
|
|
|
if re.search(cls.MOD_API_PREFIX + cls.mod_name, out):
|
|
return
|
|
|
|
raise Skip()
|
|
except:
|
|
raise Skip("Module '%s' not detected" % cls.mod_name)
|
|
|
|
def get_conf_ref(self):
|
|
if self.empty:
|
|
return str(self.conf_name)
|
|
else:
|
|
return "%s/%s" % (self.conf_name, self.conf_id)
|
|
|
|
def get_conf(self, conf=None): pass
|
|
|
|
class ModSynthRecord(KnotModule):
|
|
'''Automatic forward/reverse records module'''
|
|
|
|
mod_name = "synthrecord"
|
|
|
|
def __init__(self, mtype, prefix, ttl, network, origin=None):
|
|
super().__init__()
|
|
self.mtype = mtype
|
|
self.prefix = prefix
|
|
self.ttl = ttl
|
|
self.network = network
|
|
self.origin = origin
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("type", self.mtype)
|
|
if (self.prefix):
|
|
conf.item_str("prefix", self.prefix)
|
|
if (self.ttl):
|
|
conf.item_str("ttl", self.ttl)
|
|
conf.item("network", self.network)
|
|
if (self.origin):
|
|
conf.item_str("origin", self.origin)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModDnstap(KnotModule):
|
|
'''Dnstap module'''
|
|
|
|
mod_name = "dnstap"
|
|
|
|
def __init__(self, sink):
|
|
super().__init__()
|
|
self.sink = sink
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("sink", self.sink)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModRRL(KnotModule):
|
|
'''RRL module'''
|
|
|
|
mod_name = "rrl"
|
|
|
|
def __init__(self, rate_limit, slip=None, table_size=None, whitelist=None,
|
|
instant_limit=None, log_period=0):
|
|
super().__init__()
|
|
self.rate_limit = rate_limit
|
|
self.instant_limit = instant_limit if instant_limit else rate_limit
|
|
self.slip = slip
|
|
self.table_size = table_size
|
|
self.whitelist = whitelist
|
|
self.log_period = log_period
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("rate-limit", self.rate_limit)
|
|
conf.item_str("instant-limit", self.instant_limit)
|
|
if self.slip or self.slip == 0:
|
|
conf.item_str("slip", self.slip)
|
|
if self.table_size:
|
|
conf.item_str("table-size", self.table_size)
|
|
if self.whitelist:
|
|
conf.item_str("whitelist", self.whitelist)
|
|
if self.log_period > 0:
|
|
conf.item_str("log-period", self.log_period)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModDnsproxy(KnotModule):
|
|
'''Dnsproxy module'''
|
|
|
|
mod_name = "dnsproxy"
|
|
|
|
def __init__(self, addr, port=53, nxdomain=False, fallback=True):
|
|
super().__init__()
|
|
self.addr = addr
|
|
self.port = port
|
|
self.fallback = fallback
|
|
self.nxdomain = nxdomain
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin("remote")
|
|
conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.item_str("address", "%s@%s" % (self.addr, self.port))
|
|
conf.end()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("remote", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.item_str("fallback", "on" if self.fallback else "off")
|
|
conf.item_str("catch-nxdomain", "on" if self.nxdomain else "off")
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModWhoami(KnotModule):
|
|
'''Whoami module'''
|
|
|
|
mod_name = "whoami"
|
|
empty = True
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
class ModOnlineSign(KnotModule):
|
|
'''Online-sign module'''
|
|
|
|
mod_name = "onlinesign"
|
|
|
|
def __init__(self, algorithm=None, key_size=None, prop_delay=3600, ksc=[ ],
|
|
ksci=999, ksk_life=9999, ksk_shared=False, cds_publish="rollover",
|
|
cds_digesttype="sha256", single_type_signing=True):
|
|
super().__init__()
|
|
self.algorithm = algorithm
|
|
self.key_size = key_size
|
|
if not algorithm:
|
|
self.empty = True
|
|
self.prop_delay = prop_delay
|
|
self.ksc = ksc
|
|
self.ksci = ksci
|
|
self.ksk_life = ksk_life
|
|
self.ksk_shared = ksk_shared
|
|
self.cds_publish = cds_publish
|
|
self.cds_digesttype = cds_digesttype
|
|
self.single_type_signing = single_type_signing
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
if self.algorithm and len(self.ksc) > 0:
|
|
conf.begin("submission")
|
|
conf.id_item("id", "blahblah")
|
|
parents = ""
|
|
for parent in self.ksc:
|
|
if parents:
|
|
parents += ", "
|
|
parents += parent.name
|
|
conf.item("parent", "[%s]" % parents)
|
|
conf.item_str("check-interval", self.ksci)
|
|
|
|
if self.algorithm:
|
|
conf.begin("policy")
|
|
conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.item_str("algorithm", self.algorithm)
|
|
if self.key_size:
|
|
conf.item_str("zsk-size", self.key_size)
|
|
conf.item_str("propagation-delay", self.prop_delay)
|
|
if len(self.ksc) > 0:
|
|
conf.item("ksk-submission", "blahblah")
|
|
conf.item("ksk-lifetime", self.ksk_life)
|
|
conf.item("ksk-shared", self.ksk_shared)
|
|
conf.item("cds-cdnskey-publish", self.cds_publish)
|
|
conf.item("cds-digest-type", self.cds_digesttype)
|
|
conf.item("single-type-signing", self.single_type_signing)
|
|
conf.end()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("policy", "%s_%s" % (self.conf_name, self.conf_id))
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModStats(KnotModule):
|
|
'''Stats module'''
|
|
|
|
mod_name = "stats"
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
def _bool(self, conf, name, value=True):
|
|
conf.item_str(name, "on" if value else "off")
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
self._bool(conf, "request-protocol", True)
|
|
self._bool(conf, "server-operation", True)
|
|
self._bool(conf, "request-bytes", True)
|
|
self._bool(conf, "response-bytes", True)
|
|
self._bool(conf, "edns-presence", True)
|
|
self._bool(conf, "flag-presence", True)
|
|
self._bool(conf, "response-code", True)
|
|
self._bool(conf, "request-edns-option", True)
|
|
self._bool(conf, "response-edns-option", True)
|
|
self._bool(conf, "reply-nodata", True)
|
|
self._bool(conf, "query-type", True)
|
|
self._bool(conf, "query-size", True)
|
|
self._bool(conf, "reply-size", True)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModCookies(KnotModule):
|
|
'''Cookies module'''
|
|
|
|
mod_name = "cookies"
|
|
|
|
def __init__(self,
|
|
secret_lifetime : int | None = None,
|
|
badcookie_slip : int | None = None,
|
|
secret : list[bytearray] | None = None):
|
|
super().__init__()
|
|
self.secret_lifetime = secret_lifetime
|
|
self.badcookie_slip = badcookie_slip
|
|
self.secret = ['0x'+s.hex() for s in secret] if secret else None
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.badcookie_slip:
|
|
conf.item_str("badcookie-slip", self.badcookie_slip)
|
|
if self.secret_lifetime:
|
|
conf.item_str("secret-lifetime", self.secret_lifetime)
|
|
if self.secret:
|
|
conf.item_list("secret", self.secret)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModQueryacl(KnotModule):
|
|
'''Query ACL module'''
|
|
|
|
mod_name = "queryacl"
|
|
|
|
def __init__(self, address=None, interface=None):
|
|
super().__init__()
|
|
self.address = address
|
|
self.interface = interface
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.address:
|
|
if isinstance(self.address, list):
|
|
conf.item_list("address", self.address)
|
|
else:
|
|
conf.item("address", self.address)
|
|
if self.interface:
|
|
if isinstance(self.interface, list):
|
|
conf.item_list("interface", self.interface)
|
|
else:
|
|
conf.item("interface", self.interface)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModGeoip(KnotModule):
|
|
'''GeoIP module'''
|
|
|
|
mod_name = "geoip"
|
|
|
|
def __init__(self, config_file="net.conf", mode="subnet", geodb_file=None, geodb_key=None):
|
|
super().__init__()
|
|
self.config_file = config_file
|
|
self.mode = mode
|
|
self.geodb_file = geodb_file
|
|
self.geodb_key = geodb_key
|
|
|
|
@classmethod
|
|
def check(self):
|
|
'''Extended module check by libmaxminddb dependency check'''
|
|
super().check()
|
|
|
|
try:
|
|
proc = Popen(self._check_cmd(), stdout=PIPE, stderr=PIPE,
|
|
universal_newlines=True)
|
|
(out, err) = proc.communicate()
|
|
if re.search("MMDB_open", out):
|
|
return
|
|
raise Skip()
|
|
except:
|
|
raise Skip("Library 'maxminddb' not detected")
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
conf.item_str("config-file", self.config_file)
|
|
conf.item("mode", self.mode)
|
|
if self.geodb_file:
|
|
conf.item_str("geodb-file", self.geodb_file)
|
|
if self.geodb_key:
|
|
if isinstance(self.geodb_key, list):
|
|
conf.item_list("geodb-key", self.geodb_key)
|
|
else:
|
|
conf.item("geodb-key", self.geodb_key)
|
|
conf.item("ttl", 1234)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModNoudp(KnotModule):
|
|
'''No UDP module'''
|
|
|
|
mod_name = "noudp"
|
|
|
|
def __init__(self, allow_rate=None, trunc_rate=None):
|
|
super().__init__()
|
|
self.allow_rate = allow_rate
|
|
self.trunc_rate = trunc_rate
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.allow_rate:
|
|
conf.item_str("udp-allow-rate", self.allow_rate)
|
|
if self.trunc_rate:
|
|
conf.item_str("udp-truncate-rate", self.trunc_rate)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModProbe(KnotModule):
|
|
'''Probe module'''
|
|
|
|
mod_name = "probe"
|
|
|
|
def __init__(self, path=None, channels=1, max_rate=1000):
|
|
super().__init__()
|
|
self.path = path
|
|
self.channels = channels
|
|
self.max_rate = max_rate
|
|
|
|
def get_conf(self, conf=None):
|
|
if not conf:
|
|
conf = dnstest.config.KnotConf()
|
|
|
|
conf.begin(self.conf_name)
|
|
conf.id_item("id", self.conf_id)
|
|
if self.path:
|
|
conf.item_str("path", self.path)
|
|
if self.channels:
|
|
conf.item("channels", self.channels)
|
|
if self.max_rate:
|
|
conf.item("max-rate", self.max_rate)
|
|
conf.end()
|
|
|
|
return conf
|
|
|
|
class ModAuthSignal(KnotModule):
|
|
'''AuthSignal module'''
|
|
|
|
mod_name = "authsignal"
|
|
empty = True
|