mirror of
https://github.com/isc-projects/bind9.git
synced 2026-02-28 12:31:29 -05:00
Ensure all "import dns.*" statements are always placed after
pytest.importorskip('dns') calls, in order to allow the latter to
fulfill their purpose. Explicitly import all dnspython modules used by
each dnspython-based test to avoid relying on nested imports. Replace
function-scoped imports with global imports to reduce code duplication.
(cherry picked from commit 49312d6bb2)
95 lines
3.9 KiB
Python
95 lines
3.9 KiB
Python
#!/usr/bin/python3
|
|
|
|
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
#
|
|
# See the COPYRIGHT file distributed with this work for additional
|
|
# information regarding copyright ownership.
|
|
|
|
import selectors
|
|
import struct
|
|
import subprocess
|
|
import time
|
|
|
|
import pytest
|
|
|
|
pytest.importorskip('dns')
|
|
import dns.exception
|
|
import dns.message
|
|
import dns.name
|
|
import dns.rdataclass
|
|
import dns.rdatatype
|
|
|
|
|
|
def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
    """Check that an example/SOA query sent over TLS via gnutls-cli works.

    The query is written in DNS-over-TCP framing (two-byte big-endian
    length followed by the message) to the standard input of gnutls-cli,
    and the response is read back from its standard output.

    Parameters (pytest fixtures):
        gnutls_cli_executable: the gnutls-cli program to run; used as the
            first element of the argument vector.
        named_tlsport: the integer port passed to gnutls-cli via --port.

    Side effects: writes gnutls-cli.err, gnutls-cli.out.bin and (when a
    response was parsed) gnutls-cli.out.txt in the current directory;
    gnutls-cli itself is told to log to gnutls-cli.log.
    """
    # Prepare the example/SOA query which will be sent over TLS.
    query = dns.message.make_query('example.', dns.rdatatype.SOA)
    query_wire = query.to_wire()
    query_with_length = struct.pack('>H', len(query_wire)) + query_wire

    # Run gnutls-cli.
    gnutls_cli_args = [gnutls_cli_executable, '--no-ca-verification', '-V',
                       '--no-ocsp', '--alpn=dot', '--logfile=gnutls-cli.log',
                       '--port=%d' % named_tlsport, '10.53.0.1']
    with open('gnutls-cli.err', 'wb') as gnutls_cli_stderr, \
         subprocess.Popen(gnutls_cli_args, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=gnutls_cli_stderr,
                          bufsize=0) as gnutls_cli:
        # Send the example/SOA query to the standard input of gnutls-cli.  Do
        # not close standard input yet because that causes gnutls-cli to close
        # the TLS connection immediately, preventing the response from being
        # read.
        gnutls_cli.stdin.write(query_with_length)
        gnutls_cli.stdin.flush()

        # Keep reading data from the standard output of gnutls-cli until a full
        # DNS message is received or a timeout is exceeded or gnutls-cli exits.
        # Popen.communicate() cannot be used here because: a) it closes
        # standard input after sending data to the process (see above why this
        # is a problem), b) gnutls-cli is not DNS-aware, so it does not exit
        # upon receiving a DNS response.
        gnutls_cli_output = b''
        response = b''
        # Use a monotonic clock so that wall-clock adjustments cannot shorten
        # or extend the timeout.
        deadline = time.monotonic() + 10
        # The context manager closes the selector once reading is done.
        with selectors.DefaultSelector() as selector:
            selector.register(gnutls_cli.stdout, selectors.EVENT_READ)
            while not response:
                # Enforce the deadline explicitly: select() alone would not
                # stop the loop if data kept arriving without ever forming a
                # parseable message.
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                if not selector.select(timeout=remaining):
                    break
                chunk = gnutls_cli.stdout.read(512)
                if not chunk:
                    # End of file: gnutls-cli closed its standard output
                    # (typically because it exited, with any exit status).
                    # Stopping here, rather than checking Popen.poll() up
                    # front, also ensures that output written just before
                    # the process exited is still consumed.
                    break
                gnutls_cli_output += chunk
                try:
                    # Ignore TCP length, just try to parse a DNS message from
                    # the rest of the data received.
                    response = dns.message.from_wire(gnutls_cli_output[2:])
                except dns.exception.FormError:
                    # Not a complete message yet; wait for more data.
                    continue

        # At this point either a DNS response was received or a timeout fired
        # or gnutls-cli exited prematurely.  Close the standard input of
        # gnutls-cli.  Terminate it if that does not cause it to shut down
        # gracefully.
        gnutls_cli.stdin.close()
        try:
            gnutls_cli.wait(5)
        except subprocess.TimeoutExpired:
            gnutls_cli.kill()

    # Store the response received for diagnostic purposes.
    with open('gnutls-cli.out.bin', 'wb') as response_bin:
        response_bin.write(gnutls_cli_output)
    if response:
        with open('gnutls-cli.out.txt', 'w', encoding='utf-8') as response_txt:
            response_txt.write(response.to_text())

    # Check whether a response was received and whether it is sane.
    assert response
    assert query.id == response.id
    assert len(response.answer) == 1
    assert response.answer[0].match(dns.name.from_text('example.'),
                                    dns.rdataclass.IN, dns.rdatatype.SOA,
                                    dns.rdatatype.NONE)
|