diff --git a/src/borg/archiver/benchmark_cmd.py b/src/borg/archiver/benchmark_cmd.py index 2818435f1..8e290ad73 100644 --- a/src/borg/archiver/benchmark_cmd.py +++ b/src/borg/archiver/benchmark_cmd.py @@ -134,23 +134,33 @@ class BenchmarkMixIn: key_96 = os.urandom(12) import io - from ..chunkers import get_chunker + from ..chunkers import get_chunker # noqa print("Chunkers =======================================================") size = "1GB" - def chunkit(chunker_name, *args, **kwargs): + def chunkit(ch): with io.BytesIO(random_10M) as data_file: - ch = get_chunker(chunker_name, *args, **kwargs) for _ in ch.chunkify(fd=data_file): pass - for spec, func in [ - ("buzhash,19,23,21,4095", lambda: chunkit("buzhash", 19, 23, 21, 4095, sparse=False)), - ("buzhash64,19,23,21,4095", lambda: chunkit("buzhash64", 19, 23, 21, 4095, sparse=False)), - ("fixed,1048576", lambda: chunkit("fixed", 1048576, sparse=False)), + for spec, setup, func, vars in [ + ( + "buzhash,19,23,21,4095", + "ch = get_chunker('buzhash', 19, 23, 21, 4095, sparse=False)", + "chunkit(ch)", + locals(), + ), + # note: the buzhash64 chunker creation is rather slow, so we must keep it in setup + ( + "buzhash64,19,23,21,4095", + "ch = get_chunker('buzhash64', 19, 23, 21, 4095, sparse=False)", + "chunkit(ch)", + locals(), + ), + ("fixed,1048576", "ch = get_chunker('fixed', 1048576, sparse=False)", "chunkit(ch)", locals()), ]: - print(f"{spec:<24} {size:<10} {timeit(func, number=100):.3f}s") + print(f"{spec:<24} {size:<10} {timeit(func, setup, number=100, globals=vars):.3f}s") from ..checksums import crc32, xxh64 diff --git a/src/borg/chunkers/__init__.py b/src/borg/chunkers/__init__.py index 463be44c8..5712c2ee8 100644 --- a/src/borg/chunkers/__init__.py +++ b/src/borg/chunkers/__init__.py @@ -13,7 +13,9 @@ def get_chunker(algo, *params, **kw): # key.chunk_seed only has 32bits seed = key.chunk_seed if key is not None else 0 # for buzhash64, we want a much longer key, so we derive it from the id key - bh64_key = key.derive_key(salt=b"", domain=b"buzhash64", size=32, from_id_key=True) if key is not None else b"" + bh64_key = ( + key.derive_key(salt=b"", domain=b"buzhash64", size=32, from_id_key=True) if key is not None else b"\0" * 32 + ) if algo == "buzhash": return Chunker(seed, *params, sparse=sparse) if algo == "buzhash64": diff --git a/src/borg/chunkers/buzhash64.pyi b/src/borg/chunkers/buzhash64.pyi index 3414bd609..7ff85b0f8 100644 --- a/src/borg/chunkers/buzhash64.pyi +++ b/src/borg/chunkers/buzhash64.pyi @@ -6,6 +6,7 @@ API_VERSION: str def buzhash64(data: bytes, key: bytes) -> int: ... def buzhash64_update(sum: int, remove: int, add: int, len: int, key: bytes) -> int: ... +def buzhash64_get_table(key: bytes) -> List[int]: ... class ChunkerBuzHash64: def __init__( diff --git a/src/borg/chunkers/buzhash64.pyx b/src/borg/chunkers/buzhash64.pyx index 0199406fe..a93f15a25 100644 --- a/src/borg/chunkers/buzhash64.pyx +++ b/src/borg/chunkers/buzhash64.pyx @@ -4,13 +4,14 @@ API_VERSION = '1.2_01' import cython import time -from hashlib import sha256 from cpython.bytes cimport PyBytes_AsString from libc.stdint cimport uint8_t, uint64_t from libc.stdlib cimport malloc, free from libc.string cimport memcpy, memmove +from ..crypto.low_level import CSPRNG + from ..constants import CH_DATA, CH_ALLOC, CH_HOLE, zeros from .reader import FileReader, Chunk @@ -40,14 +41,31 @@ cdef extern from *: @cython.boundscheck(False) # Deactivate bounds checking @cython.wraparound(False) # Deactivate negative indexing. cdef uint64_t* buzhash64_init_table(bytes key): - """Initialize the buzhash table using the given key.""" - cdef int i + """ + Generate a balanced pseudo-random table deterministically from a 256-bit key. + Balanced means that for each bit position 0..63, exactly 50% of the table values have the bit set to 1. + """ + # Create deterministic random number generator + rng = CSPRNG(key) + + cdef int i, j, bit_pos cdef uint64_t* table = malloc(2048) # 256 * sizeof(uint64_t) + + # Initialize all values to 0 for i in range(256): - # deterministically generate a pseudo-random 64-bit unsigned integer for table entry i involving the key: - v = f"{i:02x}".encode() + key - d64 = sha256(v).digest()[:8] - table[i] = int.from_bytes(d64, byteorder='little') + table[i] = 0 + + # For each bit position, deterministically assign exactly 128 positions to have that bit set + for bit_pos in range(64): + # Create a list of indices and shuffle deterministically + indices = list(range(256)) + rng.shuffle(indices) + + # Set the bit at bit_pos for the first 128 shuffled indices + for i in range(128): + j = indices[i] + table[j] |= (1ULL << bit_pos) + return table @@ -289,3 +307,14 @@ def buzhash64_update(uint64_t sum, unsigned char remove, unsigned char add, size sum = _buzhash64_update(sum, remove, add, len, table) free(table) return sum + + +def buzhash64_get_table(bytes key): + """Get the buzhash table generated from .""" + cdef uint64_t *table + cdef int i + table = buzhash64_init_table(key) + try: + return [table[i] for i in range(256)] + finally: + free(table) diff --git a/src/borg/crypto/low_level.pyx b/src/borg/crypto/low_level.pyx index 2a42ef513..4fd7f2f49 100644 --- a/src/borg/crypto/low_level.pyx +++ b/src/borg/crypto/low_level.pyx @@ -40,6 +40,10 @@ from math import ceil from cpython cimport PyMem_Malloc, PyMem_Free from cpython.buffer cimport PyBUF_SIMPLE, PyObject_GetBuffer, PyBuffer_Release +from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AsString +from libc.stdlib cimport malloc, free +from libc.stdint cimport uint8_t, uint32_t, uint64_t +from libc.string cimport memset, memcpy API_VERSION = '1.3_01' @@ -714,3 +718,161 @@ def blake2b_256(key, data): def blake2b_128(data): return hashlib.blake2b(data, digest_size=16).digest() + + +cdef class CSPRNG: + """ + Cryptographically Secure Pseudo-Random Number Generator based on AES-CTR mode. + + This class provides methods for generating random bytes and shuffling lists + using a deterministic algorithm seeded with a 256-bit key. + + The implementation uses AES-256 in CTR mode, which is a well-established + method for creating a CSPRNG. + """ + cdef EVP_CIPHER_CTX *ctx + cdef uint8_t key[32] + cdef uint8_t iv[16] + cdef uint8_t zeros[4096] # Static buffer for zeros + cdef uint8_t buffer[4096] # Static buffer for random bytes + cdef size_t buffer_size + cdef size_t buffer_pos + + def __cinit__(self, bytes seed_key): + """ + Initialize the CSPRNG with a 256-bit key. + + :param seed_key: A 32-byte key used as the seed for the CSPRNG + """ + if len(seed_key) != 32: + raise ValueError("Seed key must be 32 bytes (256 bits)") + + # Initialize context + self.ctx = EVP_CIPHER_CTX_new() + if self.ctx == NULL: + raise MemoryError("Failed to allocate cipher context") + + self.key = seed_key[:32] + + # Initialize to zeros + memset(self.iv, 0, 16) + memset(self.zeros, 0, 4096) + + self.buffer_size = 4096 + self.buffer_pos = self.buffer_size # Force refill on first use + + # Initialize the cipher + if not EVP_EncryptInit_ex(self.ctx, EVP_aes_256_ctr(), NULL, self.key, self.iv): + EVP_CIPHER_CTX_free(self.ctx) + raise CryptoError("Failed to initialize AES-CTR cipher") + + def __dealloc__(self): + """Free resources when the object is deallocated.""" + if self.ctx != NULL: + EVP_CIPHER_CTX_free(self.ctx) + self.ctx = NULL + + cdef _refill_buffer(self): + """Refill the internal buffer with random bytes.""" + cdef int outlen = 0 + + # Encrypt zeros to get random bytes + if not EVP_EncryptUpdate(self.ctx, self.buffer, &outlen, self.zeros, self.buffer_size): + raise CryptoError("Failed to generate random bytes") + if outlen != self.buffer_size: + raise CryptoError("Unexpected length of random bytes") + + self.buffer_pos = 0 + + def random_bytes(self, size_t n): + """ + Generate n random bytes. + + :param n: Number of bytes to generate + :return: a bytes object containing the random bytes + """ + # Directly create a Python bytes object of the required size + cdef object py_bytes = PyBytes_FromStringAndSize(NULL, n) + cdef uint8_t *result = PyBytes_AsString(py_bytes) + cdef size_t remaining + cdef size_t pos + cdef size_t to_copy + cdef size_t available + + remaining = n + pos = 0 + + while remaining > 0: + if self.buffer_pos >= self.buffer_size: + self._refill_buffer() + + # Calculate how many bytes we can copy + available = self.buffer_size - self.buffer_pos + to_copy = remaining if remaining < available else available + + # Copy bytes from buffer to result + memcpy(result + pos, &self.buffer[self.buffer_pos], to_copy) + + self.buffer_pos += to_copy + pos += to_copy + remaining -= to_copy + + return py_bytes + + def random_int(self, n): + """ + Generate a random integer in the range [0, n). + + :param n: Upper bound (exclusive) + :return: Random integer + """ + if n <= 0: + raise ValueError("Upper bound must be positive") + if n == 1: + return 0 + + # Calculate the number of bits and bytes needed + bits_needed = 0 + temp = n - 1 + while temp > 0: + bits_needed += 1 + temp >>= 1 + bytes_needed = (bits_needed + 7) // 8 + + # Generate random bytes + mask = (1 << bits_needed) - 1 + max_attempts = 1000 # Prevent infinite loop + + # Rejection sampling to avoid bias + attempts = 0 + while attempts < max_attempts: + attempts += 1 + random_data = self.random_bytes(bytes_needed) + result = int.from_bytes(random_data, byteorder='big') + + # Apply mask to get the right number of bits + result &= mask + if result < n: + return result + + # If we reach here, we've made too many attempts + # Fall back to a slightly biased but guaranteed-to-terminate method + random_data = self.random_bytes(bytes_needed) + result = int.from_bytes(random_data, byteorder='big') + return result % n + + def shuffle(self, list items): + """ + Shuffle a list in-place using the Fisher-Yates algorithm. + + :param items: List to shuffle + """ + cdef size_t n = len(items) + cdef size_t i, j + + for i in range(n - 1, 0, -1): + # Generate random index j such that 0 <= j <= i + j = self.random_int(i + 1) + + # Swap items[i] and items[j] + items[i], items[j] = items[j], items[i] diff --git a/src/borg/testsuite/chunkers/buzhash64_self_test.py b/src/borg/testsuite/chunkers/buzhash64_self_test.py index 41198477d..03b5a8bfa 100644 --- a/src/borg/testsuite/chunkers/buzhash64_self_test.py +++ b/src/borg/testsuite/chunkers/buzhash64_self_test.py @@ -6,63 +6,69 @@ from io import BytesIO from ...chunkers import get_chunker from ...chunkers.buzhash64 import buzhash64, buzhash64_update, ChunkerBuzHash64 from ...constants import * # NOQA +from ...helpers import hex_to_bin from .. import BaseTestCase from . import cf +# from os.urandom(32) +key0 = hex_to_bin("ad9f89095817f0566337dc9ee292fcd59b70f054a8200151f1df5f21704824da") +key1 = hex_to_bin("f1088c7e9e6ae83557ad1558ff36c44a369ea719d1081c29684f52ffccb72cb8") +key2 = hex_to_bin("57174a65fde67fe127b18430525b50a58406f1bd6cc629535208c7832e181067") + class ChunkerBuzHash64TestCase(BaseTestCase): def test_chunkify64(self): data = b"0" * int(1.5 * (1 << CHUNK_MAX_EXP)) + b"Y" - parts = cf(ChunkerBuzHash64(b"0", 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data))) + parts = cf(ChunkerBuzHash64(key0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data))) self.assert_equal(len(parts), 2) self.assert_equal(b"".join(parts), data) - self.assert_equal(cf(ChunkerBuzHash64(b"0", 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b""))), []) + self.assert_equal(cf(ChunkerBuzHash64(key0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b""))), []) self.assert_equal( - cf(ChunkerBuzHash64(b"0", 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), - [b"fo", b"obarbo", b"ob", b"azfo", b"obarbo", b"ob", b"azfo", b"obarbo", b"obaz"], + cf(ChunkerBuzHash64(key0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), + [b"foobarb", b"ooba", b"zf", b"oobarb", b"ooba", b"zf", b"oobarb", b"oobaz"], ) self.assert_equal( - cf(ChunkerBuzHash64(b"1", 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), - [b"fooba", b"rboobaz", b"fooba", b"rboobaz", b"fooba", b"rboobaz"], + cf(ChunkerBuzHash64(key1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), + [b"fo", b"oba", b"rb", b"oob", b"azf", b"ooba", b"rb", b"oob", b"azf", b"ooba", b"rb", b"oobaz"], ) self.assert_equal( - cf(ChunkerBuzHash64(b"2", 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), + cf(ChunkerBuzHash64(key2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b"foobarboobaz" * 3))), + [b"foobar", b"booba", b"zfoobar", b"booba", b"zfoobar", b"boobaz"], + ) + self.assert_equal( + cf(ChunkerBuzHash64(key0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), + [b"foobarbo", b"obaz", b"foobarbo", b"obaz", b"foobarbo", b"obaz"], + ) + self.assert_equal( + cf(ChunkerBuzHash64(key1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), + [b"foobarboob", b"azfoobarboob", b"azfoobarboobaz"], + ) + self.assert_equal( + cf(ChunkerBuzHash64(key2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foob", b"arboobazfoob", b"arboobazfoob", b"arboobaz"], ) self.assert_equal( - cf(ChunkerBuzHash64(b"0", 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), - [b"foobarb", b"oobaz", b"foobarb", b"oobaz", b"foobarb", b"oobaz"], - ) - self.assert_equal( - cf(ChunkerBuzHash64(b"1", 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), - [b"foobarbo", b"obazfo", b"obarbo", b"obazfo", b"obarbo", b"obaz"], - ) - self.assert_equal( - cf(ChunkerBuzHash64(b"2", 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), - [b"foobarboobaz", b"foobarboobaz", b"foobarboobaz"], - ) - self.assert_equal( - cf(ChunkerBuzHash64(b"0", 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), - [b"foobarbo", b"obazfoobarb", b"oobazfoo", b"barboobaz"], - ) - self.assert_equal( - cf(ChunkerBuzHash64(b"1", 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), + cf(ChunkerBuzHash64(key0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), [b"foobarbo", b"obazfoobarbo", b"obazfoobarbo", b"obaz"], ) self.assert_equal( - cf(ChunkerBuzHash64(b"2", 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), - [b"foobarboobaz", b"foobarboobaz", b"foobarboobaz"], + cf(ChunkerBuzHash64(key1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), + [b"foobarboob", b"azfoobarboob", b"azfoobarboobaz"], + ) + self.assert_equal( + cf(ChunkerBuzHash64(key2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b"foobarboobaz" * 3))), + [b"foobarboobazfoob", b"arboobazfoob", b"arboobaz"], ) def test_buzhash64(self): - self.assert_equal(buzhash64(b"abcdefghijklmnop", b"0"), 13095190927899934478) - self.assert_equal(buzhash64(b"abcdefghijklmnop", b"1"), 10129419249308136910) - expected = buzhash64(b"abcdefghijklmnop", b"1") - previous = buzhash64(b"Xabcdefghijklmno", b"1") - this = buzhash64_update(previous, ord("X"), ord("p"), 16, b"1") + self.assert_equal(buzhash64(b"abcdefghijklmnop", key0), 17414563089559790077) + self.assert_equal(buzhash64(b"abcdefghijklmnop", key1), 1397285894609271345) + expected = buzhash64(b"abcdefghijklmnop", key0) + previous = buzhash64(b"Xabcdefghijklmno", key0) + this = buzhash64_update(previous, ord("X"), ord("p"), 16, key0) self.assert_equal(this, expected) # Test with more than 63 bytes to make sure our barrel_shift macro works correctly - self.assert_equal(buzhash64(b"abcdefghijklmnopqrstuvwxyz" * 4, b"0"), 9064183923498167899) + self.assert_equal(buzhash64(b"abcdefghijklmnopqrstuvwxyz" * 4, key0), 17683050804041322250) def test_small_reads64(self): class SmallReadFile: diff --git a/src/borg/testsuite/chunkers/buzhash64_test.py b/src/borg/testsuite/chunkers/buzhash64_test.py index 7a0019732..1aafdbf70 100644 --- a/src/borg/testsuite/chunkers/buzhash64_test.py +++ b/src/borg/testsuite/chunkers/buzhash64_test.py @@ -4,10 +4,16 @@ import os from . import cf from ...chunkers import ChunkerBuzHash64 +from ...chunkers.buzhash64 import buzhash64_get_table from ...constants import * # NOQA from ...helpers import hex_to_bin +# from os.urandom(32) +key0 = hex_to_bin("ad9f89095817f0566337dc9ee292fcd59b70f054a8200151f1df5f21704824da") +key1 = hex_to_bin("f1088c7e9e6ae83557ad1558ff36c44a369ea719d1081c29684f52ffccb72cb8") + + def H(data): return sha256(data).digest() @@ -30,7 +36,7 @@ def test_chunkpoints64_unchanged(): if minexp >= maxexp: continue for maskbits in (4, 7, 10, 12): - for key in (b"first_key", b"second_key"): + for key in (key0, key1): fh = BytesIO(data) chunker = ChunkerBuzHash64(key, minexp, maxexp, maskbits, winsize) chunks = [H(c) for c in cf(chunker.chunkify(fh, -1))] @@ -39,13 +45,14 @@ def test_chunkpoints64_unchanged(): # The "correct" hash below matches the existing chunker behavior. # Future chunker optimisations must not change this, or existing repos will bloat. overall_hash = H(b"".join(runs)) - assert overall_hash == hex_to_bin("ab98713d28c5a544eeb8b6a2b5ba6405847bd6924d45fb7e267d173892ad0cdc") + print(overall_hash.hex()) + assert overall_hash == hex_to_bin("676676133fb3621ada0f6cc1b18002c3e37016c9469217d18f8e382fadaf23fd") def test_buzhash64_chunksize_distribution(): data = os.urandom(1048576) min_exp, max_exp, mask = 10, 16, 14 # chunk size target 16kiB, clip at 1kiB and 64kiB - chunker = ChunkerBuzHash64(b"", min_exp, max_exp, mask, 4095) + chunker = ChunkerBuzHash64(key0, min_exp, max_exp, mask, 4095) f = BytesIO(data) chunks = cf(chunker.chunkify(f)) del chunks[-1] # get rid of the last chunk, it can be smaller than 2**min_exp @@ -67,3 +74,27 @@ def test_buzhash64_chunksize_distribution(): # most chunks should be cut due to buzhash triggering, not due to clipping at min/max size: assert min_count < 10 assert max_count < 10 + + +def test_buzhash64_table(): + # Test that the function returns a list of 256 integers + table0 = buzhash64_get_table(key0) + assert len(table0) == 256 + + # Test that all elements are integers + for value in table0: + assert isinstance(value, int) + + # Test that the function is deterministic (same key produces same table) + table0_again = buzhash64_get_table(key0) + assert table0 == table0_again + + # Test that different keys produce different tables + table1 = buzhash64_get_table(key1) + assert table0 != table1 + + # Test that the table has balanced bit distribution + # For each bit position 0..63, exactly 50% of the table values should have the bit set to 1 + for bit_pos in range(64): + bit_count = sum(1 for value in table0 if value & (1 << bit_pos)) + assert bit_count == 128 # 50% of 256 = 128 diff --git a/src/borg/testsuite/crypto/csprng_test.py b/src/borg/testsuite/crypto/csprng_test.py new file mode 100644 index 000000000..110f30c10 --- /dev/null +++ b/src/borg/testsuite/crypto/csprng_test.py @@ -0,0 +1,183 @@ +import pytest + +from ...crypto.low_level import CSPRNG + + +# Test keys (32 bytes each) +key1 = bytes.fromhex("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") +key2 = bytes.fromhex("fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210") + + +def test_deterministic_output(): + """Test that the same key produces the same random sequence.""" + # Create two CSPRNGs with the same key + rng1 = CSPRNG(key1) + rng2 = CSPRNG(key1) + + # Generate random bytes from both + bytes1 = rng1.random_bytes(100) + bytes2 = rng2.random_bytes(100) + + # They should be identical + assert bytes1 == bytes2 + + # Different keys should produce different outputs + rng3 = CSPRNG(key2) + bytes3 = rng3.random_bytes(100) + assert bytes1 != bytes3 + + +def test_random_bytes(): + """Test the random_bytes method.""" + rng = CSPRNG(key1) + + # Test different sizes + for size in [1, 10, 100, 1000, 10000]: + random_data = rng.random_bytes(size) + + # Check type + assert isinstance(random_data, bytes) + + # Check length + assert len(random_data) == size + + +def test_random_int(): + """Test the random_int method.""" + rng = CSPRNG(key1) + + # Test different ranges + for upper_bound in [2, 10, 100, 1000, 1000000, 1000000000, 1000000000000]: + # Generate multiple random integers + for _ in range(10): + random_int = rng.random_int(upper_bound) + + # Check range + assert 0 <= random_int < upper_bound + + # Check type + assert isinstance(random_int, int) + + +def test_random_int_edge_cases(): + """Test the random_int method with edge cases.""" + rng = CSPRNG(key1) + + # Test error case: upper_bound <= 0 + with pytest.raises(ValueError): + rng.random_int(-1) + + with pytest.raises(ValueError): + rng.random_int(0) + + # Test with upper bound 1 + assert rng.random_int(1) == 0 + + # Test with upper bound 2 + for _ in range(10): + result = rng.random_int(2) + assert 0 <= result < 2 + + # Test with upper bound that is a power of 2 + power_of_2 = 256 + for _ in range(10): + result = rng.random_int(power_of_2) + assert 0 <= result < power_of_2 + + # Test with upper bound that is one less than a power of 2 + almost_power_of_2 = 255 + for _ in range(10): + result = rng.random_int(almost_power_of_2) + assert 0 <= result < almost_power_of_2 + + # Test with upper bound that is one more than a power of 2 + just_over_power_of_2 = 257 + for _ in range(10): + result = rng.random_int(just_over_power_of_2) + assert 0 <= result < just_over_power_of_2 + + # Test with a large upper bound + large_bound = 1000000000 + for _ in range(10): + result = rng.random_int(large_bound) + assert 0 <= result < large_bound + + +def test_shuffle(): + """Test the shuffle method.""" + rng1 = CSPRNG(key1) + rng2 = CSPRNG(key1) + + # Create two identical lists + list1 = list(range(100)) + list2 = list(range(100)) + + # Shuffle both lists with the same key + rng1.shuffle(list1) + rng2.shuffle(list2) + + # They should be identical after shuffling + assert list1 == list2 + + # The shuffled list should be a permutation of the original + assert sorted(list1) == list(range(100)) + + # Different keys should produce different shuffles + rng3 = CSPRNG(key2) + list3 = list(range(100)) + rng3.shuffle(list3) + assert list1 != list3 + + # Getting another shuffled list by an already used RNG should produce a different shuffle + list4 = list(range(100)) + rng1.shuffle(list4) + assert list1 != list4 + + +def test_statistical_properties(): + """Test basic statistical properties of the random output.""" + rng = CSPRNG(key1) + + # Generate a large number of random bytes + data = rng.random_bytes(10000) + + # Count occurrences of each byte value + counts = [0] * 256 + for byte in data: + counts[byte] += 1 + + # Check that each byte value appears with roughly equal frequency + # For 10000 bytes, each value should appear about 39 times (10000/256) + # We allow a generous margin of error (±50%) + for count in counts: + assert 19 <= count <= 59, "Byte distribution is not uniform" + + # Test bit distribution + bits_set = 0 + for byte in data: + bits_set += bin(byte).count("1") + + # For random data, approximately 50% of bits should be set + # 10000 bytes = 80000 bits, so about 40000 should be set + # Allow ±5% margin + assert 38000 <= bits_set <= 42000, "Bit distribution is not uniform" + + +def test_large_shuffle(): + """Test shuffling a large list.""" + rng = CSPRNG(key1) + + # Create a large list + large_list = list(range(10000)) + + # Make a copy for comparison + original = large_list.copy() + + # Shuffle the list + rng.shuffle(large_list) + + # The shuffled list should be different from the original + assert large_list != original + + # The shuffled list should be a permutation of the original + assert sorted(large_list) == original