mirror of
https://gitlab.nic.cz/knot/knot-dns.git
synced 2026-02-03 18:49:28 -05:00
374 lines
11 KiB
Python
374 lines
11 KiB
Python
"""Libknot server control interface wrapper."""
|
|
|
|
import ctypes
|
|
import enum
|
|
import warnings
|
|
import libknot
|
|
|
|
|
|
def load_lib(path: str = None) -> None:
|
|
"""Compatibility wrapper."""
|
|
|
|
libknot.Knot(path)
|
|
warnings.warn("libknot.control.load_lib() is deprecated, use libknot.Knot() instead", \
|
|
category=Warning, stacklevel=2)
|
|
|
|
|
|
class KnotCtlType(enum.IntEnum):
|
|
"""Libknot server control data unit types."""
|
|
|
|
END = 0
|
|
DATA = 1
|
|
EXTRA = 2
|
|
BLOCK = 3
|
|
|
|
|
|
class KnotCtlDataIdx(enum.IntEnum):
|
|
"""Libknot server control data unit indices."""
|
|
|
|
COMMAND = 0
|
|
FLAGS = 1
|
|
ERROR = 2
|
|
SECTION = 3
|
|
ITEM = 4
|
|
ID = 5
|
|
ZONE = 6
|
|
OWNER = 7
|
|
TTL = 8
|
|
TYPE = 9
|
|
DATA = 10
|
|
FILTER = 11
|
|
|
|
|
|
class KnotCtlData(object):
|
|
"""Libknot server control data unit."""
|
|
|
|
DataArray = ctypes.c_char_p * len(KnotCtlDataIdx)
|
|
|
|
def __init__(self) -> None:
|
|
self.data = self.DataArray()
|
|
|
|
def __str__(self) -> str:
|
|
"""Returns data unit in text form."""
|
|
|
|
string = str()
|
|
|
|
for idx in KnotCtlDataIdx:
|
|
if self.data[idx]:
|
|
if string:
|
|
string += ", "
|
|
string += "%s = '%s'" % (idx.name, self.data[idx].decode())
|
|
|
|
return string
|
|
|
|
def __getitem__(self, index: KnotCtlDataIdx) -> str:
|
|
"""Data unit item getter."""
|
|
|
|
value = self.data[index]
|
|
return value.decode() if value else str()
|
|
|
|
def __setitem__(self, index: KnotCtlDataIdx, value: str) -> None:
|
|
"""Data unit item setter."""
|
|
|
|
self.data[index] = ctypes.c_char_p(value.encode()) if value else ctypes.c_char_p()
|
|
|
|
|
|
class KnotCtlError(Exception):
|
|
"""Libknot server control error."""
|
|
|
|
def __init__(self, message: str, data: KnotCtlData = None) -> None:
|
|
super().__init__()
|
|
self.message = message
|
|
self.data = data
|
|
|
|
def __str__(self) -> str:
|
|
out = "%s" % self.message
|
|
if self.data:
|
|
out += " (%s)" % self.data
|
|
return out
|
|
|
|
|
|
class KnotCtlErrorConnect(KnotCtlError):
|
|
"""Control connection error."""
|
|
|
|
|
|
class KnotCtlErrorSend(KnotCtlError):
|
|
"""Control data send error."""
|
|
|
|
|
|
class KnotCtlErrorReceive(KnotCtlError):
|
|
"""Control data receive error."""
|
|
|
|
|
|
class KnotCtlErrorRemote(KnotCtlError):
|
|
"""Control error on the remote (server) side."""
|
|
|
|
|
|
class KnotCtl(object):
|
|
"""Libknot server control interface."""
|
|
|
|
ALLOC = None
|
|
FREE = None
|
|
SET_TIMEOUT = None
|
|
CONNECT = None
|
|
CLOSE = None
|
|
SEND = None
|
|
RECEIVE = None
|
|
|
|
def __init__(self) -> None:
|
|
"""Initializes a control interface instance."""
|
|
|
|
if not KnotCtl.ALLOC:
|
|
libknot.Knot()
|
|
|
|
KnotCtl.ALLOC = libknot.Knot.LIBKNOT.knot_ctl_alloc
|
|
KnotCtl.ALLOC.restype = ctypes.c_void_p
|
|
|
|
KnotCtl.FREE = libknot.Knot.LIBKNOT.knot_ctl_free
|
|
KnotCtl.FREE.argtypes = [ctypes.c_void_p]
|
|
|
|
KnotCtl.SET_TIMEOUT = libknot.Knot.LIBKNOT.knot_ctl_set_timeout
|
|
KnotCtl.SET_TIMEOUT.argtypes = [ctypes.c_void_p, ctypes.c_int]
|
|
|
|
KnotCtl.CONNECT = libknot.Knot.LIBKNOT.knot_ctl_connect
|
|
KnotCtl.CONNECT.restype = ctypes.c_int
|
|
KnotCtl.CONNECT.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
|
|
|
|
KnotCtl.CLOSE = libknot.Knot.LIBKNOT.knot_ctl_close
|
|
KnotCtl.CLOSE.argtypes = [ctypes.c_void_p]
|
|
|
|
KnotCtl.SEND = libknot.Knot.LIBKNOT.knot_ctl_send
|
|
KnotCtl.SEND.restype = ctypes.c_int
|
|
KnotCtl.SEND.argtypes = [ctypes.c_void_p, ctypes.c_uint, ctypes.c_void_p]
|
|
|
|
KnotCtl.RECEIVE = libknot.Knot.LIBKNOT.knot_ctl_receive
|
|
KnotCtl.RECEIVE.restype = ctypes.c_int
|
|
KnotCtl.RECEIVE.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
|
|
|
|
self.obj = KnotCtl.ALLOC()
|
|
|
|
def __del__(self) -> None:
|
|
"""Deallocates control interface instance."""
|
|
|
|
KnotCtl.FREE(self.obj)
|
|
|
|
def set_timeout(self, timeout: int) -> None:
|
|
"""Sets control socket operations timeout in seconds."""
|
|
|
|
KnotCtl.SET_TIMEOUT(self.obj, timeout * 1000)
|
|
|
|
def connect(self, path: str) -> None:
|
|
"""Connect to a specified control UNIX socket."""
|
|
|
|
ret = KnotCtl.CONNECT(self.obj, path.encode())
|
|
if ret != 0:
|
|
err = libknot.Knot.STRERROR(ret)
|
|
raise KnotCtlErrorConnect(err.decode())
|
|
|
|
def close(self) -> None:
|
|
"""Disconnects from the current control socket."""
|
|
|
|
KnotCtl.CLOSE(self.obj)
|
|
|
|
def send(self, data_type: KnotCtlType, data: KnotCtlData = None) -> None:
|
|
"""Sends a data unit to the connected control socket."""
|
|
|
|
ret = KnotCtl.SEND(self.obj, data_type,
|
|
data.data if data else ctypes.c_char_p())
|
|
if ret != 0:
|
|
err = libknot.Knot.STRERROR(ret)
|
|
raise KnotCtlErrorSend(err.decode())
|
|
|
|
def receive(self, data: KnotCtlData = None) -> KnotCtlType:
|
|
"""Receives a data unit from the connected control socket."""
|
|
|
|
data_type = ctypes.c_uint()
|
|
ret = KnotCtl.RECEIVE(self.obj, ctypes.byref(data_type),
|
|
data.data if data else ctypes.c_char_p())
|
|
if ret != 0:
|
|
err = libknot.Knot.STRERROR(ret)
|
|
raise KnotCtlErrorReceive(err.decode())
|
|
return KnotCtlType(data_type.value)
|
|
|
|
def send_block(self, cmd: str, section: str = None, item: str = None,
|
|
identifier: str = None, zone: str = None, owner: str = None,
|
|
ttl: str = None, rtype: str = None, data: str = None,
|
|
flags: str = None, filters: str = None) -> None:
|
|
"""Sends a control query block."""
|
|
|
|
query = KnotCtlData()
|
|
query[KnotCtlDataIdx.COMMAND] = cmd
|
|
query[KnotCtlDataIdx.SECTION] = section
|
|
query[KnotCtlDataIdx.ITEM] = item
|
|
query[KnotCtlDataIdx.ID] = identifier
|
|
query[KnotCtlDataIdx.ZONE] = zone
|
|
query[KnotCtlDataIdx.OWNER] = owner
|
|
query[KnotCtlDataIdx.TTL] = ttl
|
|
query[KnotCtlDataIdx.TYPE] = rtype
|
|
query[KnotCtlDataIdx.DATA] = data
|
|
query[KnotCtlDataIdx.FLAGS] = flags
|
|
query[KnotCtlDataIdx.FILTER] = filters
|
|
|
|
self.send(KnotCtlType.DATA, query)
|
|
self.send(KnotCtlType.BLOCK)
|
|
|
|
def _receive_conf(self, out, reply):
|
|
|
|
section = reply[KnotCtlDataIdx.SECTION]
|
|
ident = reply[KnotCtlDataIdx.ID]
|
|
item = reply[KnotCtlDataIdx.ITEM]
|
|
data = reply[KnotCtlDataIdx.DATA]
|
|
|
|
# Add the section if not exists.
|
|
if section not in out:
|
|
out[section] = dict()
|
|
|
|
# Add the identifier if not exists.
|
|
if ident and ident not in out[section]:
|
|
out[section][ident] = dict()
|
|
|
|
# Return if no item/value.
|
|
if not item:
|
|
return
|
|
|
|
item_level = out[section][ident] if ident else out[section]
|
|
|
|
# Treat alone identifier item differently.
|
|
if item in ["id", "domain", "target"]:
|
|
if data not in out[section]:
|
|
out[section][data] = dict()
|
|
else:
|
|
if item not in item_level:
|
|
item_level[item] = list()
|
|
|
|
if data:
|
|
item_level[item].append(data)
|
|
|
|
def _receive_zone_status(self, out, reply):
|
|
|
|
zone = reply[KnotCtlDataIdx.ZONE]
|
|
rtype = reply[KnotCtlDataIdx.TYPE]
|
|
data = reply[KnotCtlDataIdx.DATA]
|
|
|
|
# Add the zone if not exists.
|
|
if zone not in out:
|
|
out[zone] = dict()
|
|
|
|
out[zone][rtype] = data
|
|
|
|
def _receive_zone(self, out, reply):
|
|
|
|
zone = reply[KnotCtlDataIdx.ZONE]
|
|
owner = reply[KnotCtlDataIdx.OWNER]
|
|
ttl = reply[KnotCtlDataIdx.TTL]
|
|
rtype = reply[KnotCtlDataIdx.TYPE]
|
|
data = reply[KnotCtlDataIdx.DATA]
|
|
|
|
# Add the zone if not exists.
|
|
if zone not in out:
|
|
out[zone] = dict()
|
|
|
|
if owner not in out[zone]:
|
|
out[zone][owner] = dict()
|
|
|
|
if rtype not in out[zone][owner]:
|
|
out[zone][owner][rtype] = dict()
|
|
|
|
# Add the key/value.
|
|
out[zone][owner][rtype]["ttl"] = ttl
|
|
|
|
if not "data" in out[zone][owner][rtype]:
|
|
out[zone][owner][rtype]["data"] = [data]
|
|
else:
|
|
out[zone][owner][rtype]["data"].append(data)
|
|
|
|
def _receive_stats(self, out, reply):
|
|
|
|
zone = reply[KnotCtlDataIdx.ZONE]
|
|
section = reply[KnotCtlDataIdx.SECTION]
|
|
item = reply[KnotCtlDataIdx.ITEM]
|
|
idx = reply[KnotCtlDataIdx.ID]
|
|
data = int(reply[KnotCtlDataIdx.DATA])
|
|
|
|
# Add the zone if not exists.
|
|
if zone:
|
|
if "zone" not in out:
|
|
out["zone"] = dict()
|
|
|
|
if zone not in out["zone"]:
|
|
out["zone"][zone] = dict()
|
|
|
|
section_level = out["zone"][zone] if zone else out
|
|
|
|
if section not in section_level:
|
|
section_level[section] = dict()
|
|
|
|
if idx:
|
|
if item not in section_level[section]:
|
|
section_level[section][item] = dict()
|
|
|
|
section_level[section][item][idx] = data
|
|
else:
|
|
section_level[section][item] = data
|
|
|
|
def receive_stats(self) -> dict:
|
|
"""Receives statistics answer and returns it as a structured dictionary."""
|
|
|
|
out = dict()
|
|
err_reply = None
|
|
|
|
while True:
|
|
reply = KnotCtlData()
|
|
reply_type = self.receive(reply)
|
|
|
|
# Stop if not data type.
|
|
if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]:
|
|
break
|
|
|
|
# Check for an error.
|
|
if reply[KnotCtlDataIdx.ERROR]:
|
|
err_reply = reply
|
|
continue
|
|
|
|
self._receive_stats(out, reply)
|
|
|
|
if err_reply:
|
|
raise KnotCtlErrorRemote(err_reply[KnotCtlDataIdx.ERROR], err_reply)
|
|
|
|
return out
|
|
|
|
def receive_block(self) -> dict:
|
|
"""Receives a control answer and returns it as a structured dictionary."""
|
|
|
|
out = dict()
|
|
err_reply = None
|
|
|
|
while True:
|
|
reply = KnotCtlData()
|
|
reply_type = self.receive(reply)
|
|
|
|
# Stop if not data type.
|
|
if reply_type not in [KnotCtlType.DATA, KnotCtlType.EXTRA]:
|
|
break
|
|
|
|
# Check for an error.
|
|
if reply[KnotCtlDataIdx.ERROR]:
|
|
err_reply = reply
|
|
continue
|
|
|
|
# Check for config data.
|
|
if reply[KnotCtlDataIdx.SECTION]:
|
|
self._receive_conf(out, reply)
|
|
# Check for zone data.
|
|
elif reply[KnotCtlDataIdx.ZONE]:
|
|
if reply[KnotCtlDataIdx.OWNER]:
|
|
self._receive_zone(out, reply)
|
|
else:
|
|
self._receive_zone_status(out, reply)
|
|
else:
|
|
continue
|
|
|
|
if err_reply:
|
|
raise KnotCtlErrorRemote(err_reply[KnotCtlDataIdx.ERROR], err_reply)
|
|
|
|
return out
|