tests-extra: add SoftHSM backend support

This commit is contained in:
Jan Hák 2025-11-21 13:28:01 +01:00 committed by Daniel Salzman
parent 4149efc18d
commit f752c64094
8 changed files with 207 additions and 45 deletions

View file

@ -13,6 +13,7 @@ ldnsutils
lsof
gawk
objdump
softhsm2
(valgrind)
(gdb)

View file

@ -33,3 +33,21 @@
...
fun:allocate_stack
}
{
libgnutls/p11_load_module
Memcheck:Leak
match-leak-kinds: definite,reachable
fun:*alloc
...
fun:gnutls_pkcs11_add_provider
}
{
libstdc++/exit
Memcheck:Leak
match-leak-kinds: reachable
fun:malloc
...
fun:exit
}

View file

@ -5,19 +5,17 @@ Check of multi-keystore operation.
"""
import os
import random
import shutil
from dnstest.utils import *
from dnstest.keys import Keymgr
from dnstest.keystore import KeystorePEM
from dnstest.test import Test
def check_key_count(server, keystore, expected):
ksdir = os.path.join(server.keydir, keystore)
try:
files = len([name for name in os.listdir(ksdir)])
files = len([name for name in os.listdir(keystore.config())])
except FileNotFoundError:
files = 0
compare(files, expected, "privkey count in %s" % ksdir)
compare(files, expected, "privkey count in %s" % keystore.id)
t = Test()
@ -25,17 +23,20 @@ server = t.server("knot")
zone = t.zone("catalog.") # has zero TTL => faster key rollovers
t.link(zone, server)
keys1 = KeystorePEM("keys1")
keys2 = KeystorePEM("keys2")
server.dnssec(zone).enable = True
server.dnssec(zone).propagation_delay = 1
server.dnssec(zone).keystore = [ "keys1", "keys2" ]
server.dnssec(zone).keystore = [ keys1, keys2 ]
t.start()
serial = server.zone_wait(zone)
check_key_count(server, "keys1", 2)
check_key_count(server, "keys2", 0)
check_key_count(server, keys1, 2)
check_key_count(server, keys2, 0)
server.dnssec(zone).keystore = [ "keys2", "keys1" ]
server.dnssec(zone).keystore = [ keys2, keys1 ]
server.gen_confile()
server.reload()
server.ctl("zone-key-rollover %s zsk" % zone[0].name)
@ -43,17 +44,17 @@ server.ctl("zone-key-rollover %s zsk" % zone[0].name)
serial += 2 # wait for three increments which is whole ZSK rollover
serial = server.zone_wait(zone, serial)
check_key_count(server, "keys1", 1)
check_key_count(server, "keys2", 1)
check_key_count(server, keys1, 1)
check_key_count(server, keys2, 1)
backup_dir = os.path.join(server.dir, "backup1")
server.ctl("zone-backup +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
shutil.rmtree(os.path.join(server.keydir, "keys1"))
shutil.rmtree(os.path.join(server.keydir, "keys2"))
keys1.clear()
keys2.clear()
server.ctl("zone-restore +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
check_key_count(server, "keys1", 0)
check_key_count(server, "keys2", 2) # restore puts all keys to first configured keystore no matter where they were at backup
check_key_count(server, keys1, 0)
check_key_count(server, keys2, 2) # restore puts all keys to first configured keystore no matter where they were at backup
server.ctl("zone-sign %s" % zone[0].name, wait=True) # check that signing still works after restore
serial = server.zone_wait(zone, serial)
@ -61,53 +62,55 @@ serial = server.zone_wait(zone, serial)
server.flush(zone[0], wait=True)
server.zone_verify(zone[0])
server.dnssec(zone).keystore = [ "keys0ksk", "keys1", "keys2" ]
keys0ksk = KeystorePEM("keys0ksk", ksk_only=True)
server.dnssec(zone).keystore = [ keys0ksk, keys1, keys2 ]
server.gen_confile()
server.reload()
server.ctl("zone-key-rollover %s ksk" % zone[0].name)
serial += 1 # wait for two increments
serial = server.zone_wait(zone, serial)
check_key_count(server, "keys0ksk", 1)
check_key_count(server, "keys1", 0)
check_key_count(server, "keys2", 2)
check_key_count(server, keys0ksk, 1)
check_key_count(server, keys1, 0)
check_key_count(server, keys2, 2)
server.ctl("zone-ksk-submitted %s" % zone[0].name)
serial = server.zone_wait(zone, serial)
check_key_count(server, "keys0ksk", 1)
check_key_count(server, "keys1", 0)
check_key_count(server, "keys2", 1)
check_key_count(server, keys0ksk, 1)
check_key_count(server, keys1, 0)
check_key_count(server, keys2, 1)
server.ctl("zone-key-rollover %s zsk" % zone[0].name)
serial += 2 # wait for three increments which is whole ZSK rollover
serial = server.zone_wait(zone, serial)
check_key_count(server, "keys0ksk", 1)
check_key_count(server, "keys1", 1)
check_key_count(server, "keys2", 0)
check_key_count(server, keys0ksk, 1)
check_key_count(server, keys1, 1)
check_key_count(server, keys2, 0)
Keymgr.run_check(server.confile, zone[0].name, "generate", "ksk=yes")
check_key_count(server, "keys0ksk", 2)
check_key_count(server, "keys1", 1)
check_key_count(server, keys0ksk, 2)
check_key_count(server, keys1, 1)
Keymgr.run_check(server.confile, zone[0].name, "generate", "ksk=no")
check_key_count(server, "keys0ksk", 2)
check_key_count(server, "keys1", 2)
check_key_count(server, keys0ksk, 2)
check_key_count(server, keys1, 2)
Keymgr.run_check(server.confile, zone[0].name, "import-bind", os.path.join(t.data_dir, "Kcatalog.+013+07147.key"))
check_key_count(server, "keys0ksk", 2)
check_key_count(server, "keys1", 3)
check_key_count(server, keys0ksk, 2)
check_key_count(server, keys1, 3)
Keymgr.run_check(server.confile, zone[0].name, "import-bind", os.path.join(t.data_dir, "Kcatalog.+013+18635.key"))
check_key_count(server, "keys0ksk", 3)
check_key_count(server, "keys1", 3)
check_key_count(server, keys0ksk, 3)
check_key_count(server, keys1, 3)
Keymgr.run_check(server.confile, zone[0].name, "import-pem", os.path.join(t.data_dir, "8329a00d5dceefdcbbf7b8a3cdf61fe944c51d6f.pem"), "ksk=yes")
check_key_count(server, "keys0ksk", 4)
check_key_count(server, "keys1", 3)
check_key_count(server, keys0ksk, 4)
check_key_count(server, keys1, 3)
Keymgr.run_check(server.confile, zone[0].name, "import-pem", os.path.join(t.data_dir, "894d4240398f459f59f4a99cd4c5b658c9a62d54.pem"), "ksk=no")
check_key_count(server, "keys0ksk", 4)
check_key_count(server, "keys1", 4)
check_key_count(server, "keys2", 0)
check_key_count(server, keys0ksk, 4)
check_key_count(server, keys1, 4)
check_key_count(server, keys2, 0)
t.end()

View file

@ -0,0 +1,56 @@
#!/usr/bin/env python3
"""
Backup and restore of SoftHSM keystores.
"""
from dnstest.utils import *
from dnstest.keystore import KeystoreSoftHSM
from dnstest.test import Test
t = Test()
knot1 = t.server("knot")
knot2 = t.server("knot")
zone = t.zone("example.com")
t.link(zone, knot1)
t.link(zone, knot2)
keys1 = KeystoreSoftHSM("keys1")
keys1.link(knot1)
keys2 = KeystoreSoftHSM("keys2")
keys2.link(knot2)
knot1.dnssec(zone).enable = True
knot1.dnssec(zone).keystore = [ keys1 ]
t.start()
# Wait for signed zone
knot1.zone_wait(zone)
resp = knot1.dig(zone[0].name, "DNSKEY")
resp.check_count(2, "DNSKEY")
# Wait for unsigned zone
serial = knot2.zone_wait(zone)
resp = knot2.dig(zone[0].name, "DNSKEY")
resp.check_count(0, "DNSKEY")
backup_dir = os.path.join(knot1.dir, "backup")
knot1.ctl("zone-backup +keysonly +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
keys2.init(keys1) # Synchronize tokens directory between SoftHSMs
knot2.ctl("zone-restore +keysonly +backupdir %s %s" % (backup_dir, zone[0].name), wait=True)
# Enable signing with initial keys from the backup
knot2.dnssec(zone).enable = True
knot2.dnssec(zone).keystore = [ keys2 ]
knot2.gen_confile()
knot2.reload()
# Check the keysets match
knot2.zone_wait(zone, serial)
resp = knot2.dig(zone[0].name, "DNSKEY")
resp.cmp(knot1)
t.end()

View file

@ -3,6 +3,7 @@ __all__ = [
"context",
"inquirer",
"keys",
"keystore",
"knsupdate",
"libknot",
"module",

View file

@ -0,0 +1,75 @@
#!/usr/bin/env python3
import os
import shutil
from subprocess import Popen
from dnstest.context import Context
class Keystore(object):
def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None):
self.id = id
self.ksk_only = ksk_only
self.key_label = key_label
class KeystorePEM(Keystore):
def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None):
super().__init__(id, ksk_only, key_label)
def config(self):
return os.path.join(Context().test.out_dir, f"{self.backend()}-{self.id}")
def backend(self):
return "pem"
def clear(self):
shutil.rmtree(self.config())
class KeystoreSoftHSM(Keystore):
so_pin = "12345"
def __init__(self, id: str, ksk_only: bool = None, key_label: bool = None,
token: str = None, passwd: str = None, so_path: str = None):
super().__init__(id, ksk_only, key_label)
self.so_path = so_path if so_path else "/usr/lib/x86_64-linux-gnu/softhsm/libsofthsm2.so"
self.passwd = passwd if passwd else "1234"
self.token = token if token else "knot"
self.dir = os.path.join(Context().test.out_dir, f"{self.backend()}-{self.id}")
self.init()
def config(self):
return f"pkcs11:token={self.token};pin-value={self.passwd} {self.so_path}"
def backend(self):
return "pkcs11"
def config_file(self):
return os.path.join(self.dir, "softhsm.conf")
def clear(self):
shutil.rmtree(os.path.join(self.dir, "tokens"))
def init(self, keystore=None):
if not os.path.isdir(self.dir):
os.makedirs(os.path.join(self.dir, "tokens"))
with open(self.config_file(), "w") as conf_file:
config = (
f"directories.tokendir = {self.dir}/tokens/\n"
"objectstore.backend = file\n"
"log.level = INFO\n"
)
conf_file.write(config)
if keystore:
self.clear()
shutil.copytree(os.path.join(keystore.dir, "tokens"), os.path.join(self.dir, "tokens"))
else:
init_process = Popen(
['softhsm2-util', '--init-token', '--free', f'--label={self.token}',
f'--pin={self.passwd}', f'--so-pin={self.so_pin}', f'--module={self.so_path}'],
stdout=open(os.path.join(self.dir, "stdout"), mode='a'),
stderr=open(os.path.join(self.dir, "stderr"), mode='a'),
env=dict(os.environ, SOFTHSM2_CONF=self.config_file()))
init_process.wait()
def link(self, server):
server.softhsm_conf = self.config_file()

View file

@ -49,7 +49,7 @@ xdp = False
valgrind_bin = get_binary("KNOT_TEST_VALGRIND", "valgrind")
# KNOT_TEST_VALGRIND_FLAGS - valgrind flags.
valgrind_flags = get_param("KNOT_TEST_VALGRIND_FLAGS",
"--leak-check=full --show-leak-kinds=all --track-origins=yes --vgdb=yes --verbose --num-callers=20 --trace-children=yes --trace-children-skip=/usr/*sh,/bin/*sh")
"--leak-check=full --show-leak-kinds=all --track-origins=yes --vgdb=yes --verbose --num-callers=20 --trace-children=yes --trace-children-skip=/usr/*sh,/bin/*sh --gen-suppressions=all")
# KNOT_TEST_GDB - gdb binary.
gdb_bin = get_binary("KNOT_TEST_GDB", "gdb")
# KNOT_TEST_VGDB - vgdb binary.

View file

@ -192,6 +192,7 @@ class Server(object):
self.valgrind_log = None
self.session_log = None
self.confile = None
self.softhsm_conf = str()
self.redis_backends = list()
@ -353,7 +354,9 @@ class Server(object):
self.start_params,
stdout=open(self.fout, mode=mode),
stderr=open(self.ferr, mode=mode),
env=dict(os.environ, SSLKEYLOGFILE=self.session_log))
env=dict(os.environ,
SSLKEYLOGFILE=self.session_log,
SOFTHSM2_CONF=self.softhsm_conf))
if self.valgrind:
time.sleep(Server.START_WAIT_VALGRIND)
@ -1663,10 +1666,13 @@ class Knot(Server):
s.begin("keystore")
have_keystore = True
for ks in z.dnssec.keystore:
s.id_item("id", ks)
s.item("config", ks)
if ks.endswith("ksk"):
s.item("ksk-only", "on")
s.id_item("id", ks.id)
s.item_type("config", ks.config())
s.item_type("backend", ks.backend())
if ks.ksk_only:
s.item_type("ksk-only", ks.ksk_only)
if ks.key_label:
s.item_type("key-label", ks.key_label)
if have_keystore:
s.end()
@ -1677,9 +1683,11 @@ class Knot(Server):
s.begin("policy")
s.id_item("id", z.name)
for ci, val in self.conf["policy"][z.name].items():
if ci not in [ "enable", "shared_policy_with" ]:
if ci not in [ "enable", "shared_policy_with", "keystore" ]:
s.item_type(ci.replace("_", "-"), val)
if len(z.dnssec.keystore or []) > 0:
s.item_list("keystore", [ v.id for v in z.dnssec.keystore ])
if zone in self.conf["dnskey-sync"]:
s.item("dnskey-sync", zone)
if zone in self.conf["submission"]: