#!/usr/bin/env python3 import os import re from subprocess import Popen, PIPE, check_call from dnstest.utils import * import dnstest.config import dnstest.params as params class KnotModule(object): '''Query module configuration''' MOD_CONF_PREFIX = "mod-" MOD_API_PREFIX = "knotd_mod_api_" # Instance counter. count = 1 # Module API name suffix. mod_name = None # Empty configuration empty = False def __init__(self): self.conf_id = "id%s" % type(self).count type(self).count += 1 @property def conf_name(self): return self.MOD_CONF_PREFIX + self.mod_name @classmethod def _check_cmd(cls): if params.libtool_bin: prefix = [params.libtool_bin, "exec"] else: prefix = [] return prefix + ["objdump", "-t", params.knot_bin] @classmethod def check(cls): '''Checks the server binary for the module code''' try: proc = Popen(cls._check_cmd(), stdout=PIPE, stderr=PIPE, universal_newlines=True) (out, err) = proc.communicate() if re.search(cls.MOD_API_PREFIX + cls.mod_name, out): return raise Skip() except: raise Skip("Module '%s' not detected" % cls.mod_name) def get_conf_ref(self): if self.empty: return str(self.conf_name) else: return "%s/%s" % (self.conf_name, self.conf_id) def get_conf(self, conf=None): pass class ModSynthRecord(KnotModule): '''Automatic forward/reverse records module''' mod_name = "synthrecord" def __init__(self, mtype, prefix, ttl, network, origin=None): super().__init__() self.mtype = mtype self.prefix = prefix self.ttl = ttl self.network = network self.origin = origin def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) conf.item_str("type", self.mtype) if (self.prefix): conf.item_str("prefix", self.prefix) if (self.ttl): conf.item_str("ttl", self.ttl) conf.item("network", self.network) if (self.origin): conf.item_str("origin", self.origin) conf.end() return conf class ModDnstap(KnotModule): '''Dnstap module''' mod_name = "dnstap" def __init__(self, sink): super().__init__() self.sink = sink def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) conf.item_str("sink", self.sink) conf.end() return conf class ModRRL(KnotModule): '''RRL module''' mod_name = "rrl" def __init__(self, rate_limit, slip=None, table_size=None, whitelist=None, instant_limit=None, log_period=0): super().__init__() self.rate_limit = rate_limit self.instant_limit = instant_limit if instant_limit else rate_limit self.slip = slip self.table_size = table_size self.whitelist = whitelist self.log_period = log_period def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) conf.item_str("rate-limit", self.rate_limit) conf.item_str("instant-limit", self.instant_limit) if self.slip or self.slip == 0: conf.item_str("slip", self.slip) if self.table_size: conf.item_str("table-size", self.table_size) if self.whitelist: conf.item_str("whitelist", self.whitelist) if self.log_period > 0: conf.item_str("log-period", self.log_period) conf.end() return conf class ModDnsproxy(KnotModule): '''Dnsproxy module''' mod_name = "dnsproxy" def __init__(self, addr, port=53, nxdomain=False, fallback=True): super().__init__() self.addr = addr self.port = port self.fallback = fallback self.nxdomain = nxdomain def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin("remote") conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id)) conf.item_str("address", "%s@%s" % (self.addr, self.port)) conf.end() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) conf.item_str("remote", "%s_%s" % (self.conf_name, self.conf_id)) conf.item_str("fallback", "on" if self.fallback else "off") conf.item_str("catch-nxdomain", "on" if self.nxdomain else "off") conf.end() return conf class ModWhoami(KnotModule): '''Whoami module''' mod_name = "whoami" empty = True def __init__(self): super().__init__() class ModOnlineSign(KnotModule): '''Online-sign module''' mod_name = "onlinesign" def __init__(self, algorithm=None, key_size=None, prop_delay=3600, ksc=[ ], ksci=999, ksk_life=9999, ksk_shared=False, cds_publish="rollover", cds_digesttype="sha256", single_type_signing=True): super().__init__() self.algorithm = algorithm self.key_size = key_size if not algorithm: self.empty = True self.prop_delay = prop_delay self.ksc = ksc self.ksci = ksci self.ksk_life = ksk_life self.ksk_shared = ksk_shared self.cds_publish = cds_publish self.cds_digesttype = cds_digesttype self.single_type_signing = single_type_signing def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() if self.algorithm and len(self.ksc) > 0: conf.begin("submission") conf.id_item("id", "blahblah") parents = "" for parent in self.ksc: if parents: parents += ", " parents += parent.name conf.item("parent", "[%s]" % parents) conf.item_str("check-interval", self.ksci) if self.algorithm: conf.begin("policy") conf.id_item("id", "%s_%s" % (self.conf_name, self.conf_id)) conf.item_str("algorithm", self.algorithm) if self.key_size: conf.item_str("zsk-size", self.key_size) conf.item_str("propagation-delay", self.prop_delay) if len(self.ksc) > 0: conf.item("ksk-submission", "blahblah") conf.item("ksk-lifetime", self.ksk_life) conf.item("ksk-shared", self.ksk_shared) conf.item("cds-cdnskey-publish", self.cds_publish) conf.item("cds-digest-type", self.cds_digesttype) conf.item("single-type-signing", self.single_type_signing) conf.end() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) conf.item_str("policy", "%s_%s" % (self.conf_name, self.conf_id)) conf.end() return conf class ModStats(KnotModule): '''Stats module''' mod_name = "stats" def __init__(self): super().__init__() def _bool(self, conf, name, value=True): conf.item_str(name, "on" if value else "off") def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) self._bool(conf, "request-protocol", True) self._bool(conf, "server-operation", True) self._bool(conf, "request-bytes", True) self._bool(conf, "response-bytes", True) self._bool(conf, "edns-presence", True) self._bool(conf, "flag-presence", True) self._bool(conf, "response-code", True) self._bool(conf, "request-edns-option", True) self._bool(conf, "response-edns-option", True) self._bool(conf, "reply-nodata", True) self._bool(conf, "query-type", True) self._bool(conf, "query-size", True) self._bool(conf, "reply-size", True) conf.end() return conf class ModCookies(KnotModule): '''Cookies module''' mod_name = "cookies" def __init__(self, secret_lifetime : int | None = None, badcookie_slip : int | None = None, secret : list[bytearray] | None = None): super().__init__() self.secret_lifetime = secret_lifetime self.badcookie_slip = badcookie_slip self.secret = ['0x'+s.hex() for s in secret] if secret else None def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) if self.badcookie_slip: conf.item_str("badcookie-slip", self.badcookie_slip) if self.secret_lifetime: conf.item_str("secret-lifetime", self.secret_lifetime) if self.secret: conf.item_list("secret", self.secret) conf.end() return conf class ModQueryacl(KnotModule): '''Query ACL module''' mod_name = "queryacl" def __init__(self, address=None, interface=None): super().__init__() self.address = address self.interface = interface def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) if self.address: if isinstance(self.address, list): conf.item_list("address", self.address) else: conf.item("address", self.address) if self.interface: if isinstance(self.interface, list): conf.item_list("interface", self.interface) else: conf.item("interface", self.interface) conf.end() return conf class ModGeoip(KnotModule): '''GeoIP module''' mod_name = "geoip" def __init__(self, config_file="net.conf", mode="subnet", geodb_file=None, geodb_key=None): super().__init__() self.config_file = config_file self.mode = mode self.geodb_file = geodb_file self.geodb_key = geodb_key @classmethod def check(self): '''Extended module check by libmaxminddb dependency check''' super().check() try: proc = Popen(self._check_cmd(), stdout=PIPE, stderr=PIPE, universal_newlines=True) (out, err) = proc.communicate() if re.search("MMDB_open", out): return raise Skip() except: raise Skip("Library 'maxminddb' not detected") def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) conf.item_str("config-file", self.config_file) conf.item("mode", self.mode) if self.geodb_file: conf.item_str("geodb-file", self.geodb_file) if self.geodb_key: if isinstance(self.geodb_key, list): conf.item_list("geodb-key", self.geodb_key) else: conf.item("geodb-key", self.geodb_key) conf.item("ttl", 1234) conf.end() return conf class ModNoudp(KnotModule): '''No UDP module''' mod_name = "noudp" def __init__(self, allow_rate=None, trunc_rate=None): super().__init__() self.allow_rate = allow_rate self.trunc_rate = trunc_rate def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) if self.allow_rate: conf.item_str("udp-allow-rate", self.allow_rate) if self.trunc_rate: conf.item_str("udp-truncate-rate", self.trunc_rate) conf.end() return conf class ModProbe(KnotModule): '''Probe module''' mod_name = "probe" def __init__(self, path=None, channels=1, max_rate=1000): super().__init__() self.path = path self.channels = channels self.max_rate = max_rate def get_conf(self, conf=None): if not conf: conf = dnstest.config.KnotConf() conf.begin(self.conf_name) conf.id_item("id", self.conf_id) if self.path: conf.item_str("path", self.path) if self.channels: conf.item("channels", self.channels) if self.max_rate: conf.item("max-rate", self.max_rate) conf.end() return conf class ModAuthSignal(KnotModule): '''AuthSignal module''' mod_name = "authsignal" empty = True