borg --cockpit: show TUI based on Textual

This commit is contained in:
Thomas Waldmann 2025-12-07 04:51:48 +01:00
parent 2152e1e3a9
commit 2af8de5800
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
9 changed files with 937 additions and 0 deletions

View file

@ -46,6 +46,7 @@ pyfuse3 = ["pyfuse3 >= 3.1.1"]
nofuse = []
s3 = ["borgstore[s3] ~= 0.3.0"]
sftp = ["borgstore[sftp] ~= 0.3.0"]
cockpit = ["textual>=6.8.0"] # might also work with older versions, untested
[project.urls]
"Homepage" = "https://borgbackup.org/"

View file

@ -337,6 +337,7 @@ class Archiver(
parser.add_argument(
"-V", "--version", action="version", version="%(prog)s " + __version__, help="show version number and exit"
)
parser.add_argument("--cockpit", dest="cockpit", action="store_true", help="Start the Borg TUI")
parser.common_options.add_common_group(parser, "_maincommand", provide_defaults=True)
common_parser = argparse.ArgumentParser(add_help=False, prog=self.prog)
@ -646,6 +647,23 @@ def main(): # pragma: no cover
print(msg, file=sys.stderr)
print(tb, file=sys.stderr)
sys.exit(EXIT_ERROR)
if args.cockpit:
# Cockpit TUI operation
try:
from ..cockpit.app import BorgCockpitApp
except ImportError as err:
print(f"ImportError: {err}", file=sys.stderr)
print("The Borg Cockpit feature has some additional requirements.", file=sys.stderr)
print("Please install them using: pip install 'borgbackup[cockpit]'", file=sys.stderr)
sys.exit(EXIT_ERROR)
app = BorgCockpitApp()
app.borg_args = [arg for arg in sys.argv[1:] if arg != "--cockpit"]
app.run()
sys.exit(EXIT_SUCCESS) # borg subprocess RC was already shown on the TUI
# normal borg CLI operation
try:
with sig_int:
exit_code = archiver.run(args)

View file

@ -0,0 +1,5 @@
"""
Borg Cockpit - Terminal User Interface for BorgBackup.
This module contains the TUI implementation using Textual.
"""

124
src/borg/cockpit/app.py Normal file
View file

@ -0,0 +1,124 @@
"""
Borg Cockpit - Application Entry Point.
"""
import asyncio
import time
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer
from textual.containers import Horizontal, Container
from .theme import theme
class BorgCockpitApp(App):
"""The main TUI Application class for Borg Cockpit."""
from .. import __version__ as BORG_VERSION
TITLE = f"Cockpit for BorgBackup {BORG_VERSION}"
CSS_PATH = "cockpit.tcss"
BINDINGS = [("q", "quit", "Quit"), ("ctrl+c", "quit", "Quit"), ("t", "toggle_translator", "Toggle Translator")]
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
from .widgets import LogoPanel, StatusPanel, StandardLog
yield Header(show_clock=True)
with Container(id="main-grid"):
with Horizontal(id="top-row"):
yield LogoPanel(id="logopanel")
yield StatusPanel(id="status")
yield StandardLog(id="standard-log")
yield Footer()
def get_theme_variable_defaults(self):
# make these variables available to ALL themes
return {
"pulsar-color": "#ffffff",
"pulsar-dim-color": "#000000",
"star-color": "#888888",
"star-bright-color": "#ffffff",
"logo-color": "#00dd00",
}
def on_load(self) -> None:
"""Initialize theme before UI."""
self.register_theme(theme)
self.theme = theme.name
def on_mount(self) -> None:
"""Initialize components."""
from .runner import BorgRunner
self.query_one("#logo").styles.animate("opacity", 1, duration=1)
self.query_one("#slogan").styles.animate("opacity", 1, duration=1)
self.start_time = time.monotonic()
self.process_running = True
args = getattr(self, "borg_args", ["--version"]) # Default to safe command if none passed
self.runner = BorgRunner(args, self.handle_log_event)
self.runner_task = asyncio.create_task(self.runner.start())
# Speed tracking
self.total_lines_processed = 0
self.last_lines_processed = 0
self.speed_timer = self.set_interval(1.0, self.compute_speed)
def compute_speed(self) -> None:
"""Calculate and update speed (lines per second)."""
current_lines = self.total_lines_processed
lines_per_second = float(current_lines - self.last_lines_processed)
self.last_lines_processed = current_lines
status_panel = self.query_one("#status")
status_panel.update_speed(lines_per_second / 1000)
if self.process_running:
status_panel.elapsed_time = time.monotonic() - self.start_time
async def on_unmount(self) -> None:
"""Cleanup resources on app shutdown."""
if hasattr(self, "runner"):
await self.runner.stop()
async def action_quit(self) -> None:
"""Handle quit action."""
if hasattr(self, "speed_timer"):
self.speed_timer.stop()
if hasattr(self, "runner"):
await self.runner.stop()
if hasattr(self, "runner_task"):
await self.runner_task
self.query_one("#logo").styles.animate("opacity", 0, duration=2)
self.query_one("#slogan").styles.animate("opacity", 0, duration=2)
await asyncio.sleep(2) # give the user a chance the see the borg RC
self.exit()
def action_toggle_translator(self) -> None:
"""Toggle the universal translator."""
from .translator import TRANSLATOR
TRANSLATOR.toggle()
# Refresh dynamic UI elements
self.query_one("#status").refresh_ui_labels()
self.query_one("#standard-log").update_title()
self.query_one("#slogan").update_slogan()
def handle_log_event(self, data: dict):
"""Process a event from BorgRunner."""
msg_type = data.get("type", "log")
if msg_type == "stream_line":
self.total_lines_processed += 1
line = data.get("line", "")
widget = self.query_one("#standard-log")
widget.add_line(line)
elif msg_type == "process_finished":
self.process_running = False
rc = data.get("rc", 0)
self.query_one("#status").rc = rc

View file

@ -0,0 +1,201 @@
/* Borg Cockpit Stylesheet */
Screen {
background: $surface;
}
Header {
dock: top;
background: $primary;
color: $secondary;
text-style: bold;
}
Header * {
background: $primary;
color: $secondary;
text-style: bold;
}
.header--clock, .header--title, .header--icon {
background: $primary;
color: $secondary;
text-style: bold;
}
.header--clock {
dock: right;
}
Footer {
background: $background;
color: $primary;
dock: bottom;
}
.footer--key {
background: $background;
color: $primary;
text-style: bold;
}
.footer--description {
background: $background;
color: $primary;
text-style: bold;
}
.footer--highlight {
background: $primary;
color: $secondary;
}
#standard-log-content {
scrollbar-background: $background;
scrollbar-color: $primary;
/* Hide horizontal scrollbar and clip long lines at the right */
overflow-x: hidden;
text-wrap: nowrap;
}
#standard-log {
border: double $primary;
}
#main-grid {
/* Simple vertical stack: top row content-sized, log fills remaining space */
layout: vertical;
/* Fill available area between header and footer */
height: 1fr;
/* Allow shrinking when space is tight */
min-height: 0;
margin: 0 1;
}
#top-row {
border: double $primary;
/* If content grows too large, scroll rather than pushing the log off-screen */
overflow-y: auto;
/* Adjust this if status or logo panel shall get more/less height. */
height: 16;
}
#logopanel {
width: 50%;
/* Stretch to the full height of the top row so the separator spans fully */
height: 100%;
border-right: double $primary;
text-align: center;
layers: base overlay;
/* Make logo panel not influence row height beyond status; clip overflow */
overflow: hidden;
}
Starfield {
layer: base;
width: 100%;
/* Size to content and get clipped by the panel */
height: 100%;
min-height: 0;
}
Pulsar {
layer: overlay;
width: 3;
height: 3;
content-align: center middle;
color: $pulsar-color;
transition: color 4s linear;
}
Slogan {
layer: overlay;
width: auto;
height: 1;
content-align: center middle;
color: #00ff00;
transition: color 1s linear;
opacity: 0;
max-height: 100%;
overflow: hidden;
}
Logo {
layer: overlay;
width: auto;
/* Size to its intrinsic content, clipped by the panel */
height: auto;
opacity: 0;
max-height: 100%;
overflow: hidden;
}
Slogan.dim {
color: #005500;
}
Pulsar.dim {
color: $pulsar-dim-color;
}
#status {
width: 50%;
/* Let height be determined by content so the row can size to content */
height: auto;
/* Prevent internal content from forcing excessive height; allow scrolling */
overflow-y: auto;
}
/* Ensure the log always keeps at least 5 rows visible */
#standard-log {
min-height: 5;
/* Explicitly claim the remaining space in the grid */
height: 1fr;
}
/* Within the log panel (a Vertical container), keep the title to 1 line and let content fill the rest */
#standard-log-title {
height: 1;
}
#standard-log-content {
/* Allow the RichLog to expand within the log panel */
height: 1fr;
}
.panel-title {
background: $primary;
color: $secondary;
padding: 0 1;
text-style: bold;
}
#speed-sparkline {
width: 100%;
height: 4;
margin-bottom: 1;
}
.status {
color: $primary;
}
.errors-ok {
color: $success;
}
.errors-warning {
color: $warning;
}
.rc-ok {
color: $success;
}
.rc-warning {
color: $warning;
}
.rc-error {
color: $error;
}

View file

@ -0,0 +1,74 @@
"""
Borg Runner - Manages Borg subprocess execution and output parsing.
"""
import asyncio
import logging
import os
import sys
from typing import Optional, Callable, List
class BorgRunner:
"""
Manages the execution of the borg subprocess and parses its JSON output.
"""
def __init__(self, command: List[str], log_callback: Callable[[dict], None]):
self.command = command
self.log_callback = log_callback
self.process: Optional[asyncio.subprocess.Process] = None
self.logger = logging.getLogger(__name__)
async def start(self):
"""
Starts the Borg subprocess and processes its output.
"""
if self.process is not None:
self.logger.warning("Borg process already running.")
return
cmd = [sys.executable, "-m", "borg"] + self.command
self.logger.info(f"Starting Borg process: {cmd}")
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
try:
self.process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env
)
async def read_stream(stream, stream_name):
while True:
line = await stream.readline()
if not line:
break
decoded_line = line.decode("utf-8", errors="replace").rstrip()
if decoded_line:
self.log_callback({"type": "stream_line", "stream": stream_name, "line": decoded_line})
# Read both streams concurrently
await asyncio.gather(read_stream(self.process.stdout, "stdout"), read_stream(self.process.stderr, "stderr"))
rc = await self.process.wait()
self.log_callback({"type": "process_finished", "rc": rc})
except Exception as e:
self.logger.error(f"Failed to run Borg process: {e}")
self.log_callback({"type": "process_finished", "rc": -1, "error": str(e)})
finally:
self.process = None
async def stop(self):
"""
Stops the Borg subprocess if it is running.
"""
if self.process and self.process.returncode is None:
self.logger.info("Terminating Borg process...")
try:
self.process.terminate()
await self.process.wait()
except ProcessLookupError:
pass # Process already dead

29
src/borg/cockpit/theme.py Normal file
View file

@ -0,0 +1,29 @@
"""
Borg Theme Definition.
"""
from textual.theme import Theme
theme = Theme(
name="borg",
primary="#00FF00",
secondary="#000000", # text on top of $primary background
error="#FF0000",
warning="#FFA500",
success="#00FF00",
accent="#00FF00", # highlighted interactive elements
foreground="#00FF00", # default text color
background="#000000",
surface="#000000", # bg col of lowest layer
panel="#444444", # bg col of panels, containers, cards, sidebars, modal dialogs, etc.
dark=True,
variables={
"block-cursor-text-style": "none",
"input-selection-background": "#00FF00 35%",
"pulsar-color": "#ffffff",
"pulsar-dim-color": "#000000",
"star-color": "#888888",
"star-bright-color": "#ffffff",
"logo-color": "#00dd00",
},
)

View file

@ -0,0 +1,55 @@
"""
Universal Translator - Converts standard English into Borg Speak.
"""
BORG_DICTIONARY = { # English -> Borg
# UI Strings
"**** You're welcome! ****": "You will be assimilated! ",
"Files: ": "Drones: ",
"Unchanged: ": "Unchanged: ",
"Modified: ": "Modified: ",
"Added: ": "Assimilated: ",
"Other: ": "Other: ",
"Errors: ": "Escaped: ",
"RC: ": "Termination Code: ",
"Log": "Subspace Transmissions",
}
class UniversalTranslator:
"""
Handles translation of log messages.
"""
def __init__(self, enabled: bool = True):
# self.enabled is the opposite of "Translator active" on the TUI,
# because in the source, we translate English to Borg.
self.enabled = enabled # True: English -> Borg
def toggle(self):
"""Toggle translation state."""
self.enabled = not self.enabled
return self.enabled
def translate(self, message: str) -> str:
"""Translate a message if enabled."""
if not self.enabled:
return message
# Full matching first
if message in BORG_DICTIONARY:
return BORG_DICTIONARY[message]
# Substring matching next
for key, value in BORG_DICTIONARY.items():
if key in message:
return message.replace(key, value)
return message
# Global Instance
TRANSLATOR = UniversalTranslator(enabled=False)
# Global translation function
T = TRANSLATOR.translate

430
src/borg/cockpit/widgets.py Normal file
View file

@ -0,0 +1,430 @@
"""
Borg Cockpit - UI Widgets.
"""
import random
import time
from rich.markup import escape
from textual.app import ComposeResult
from textual.reactive import reactive
from textual.widgets import Static, RichLog
from textual.containers import Vertical, Container
from ..helpers import classify_ec
from .translator import T, TRANSLATOR
class StatusPanel(Static):
elapsed_time = reactive(0.0, init=False)
files_count = reactive(0, init=False) # unchanged + modified + added + other + error
unchanged_count = reactive(0, init=False)
modified_count = reactive(0, init=False)
added_count = reactive(0, init=False)
other_count = reactive(0, init=False)
error_count = reactive(0, init=False)
rc = reactive(None, init=False)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.speed_history = [0.0] * SpeedSparkline.HISTORY_SIZE
def compose(self) -> ComposeResult:
with Vertical():
yield SpeedSparkline(self.speed_history, id="speed-sparkline")
yield Static(T("Speed: 0/s"), id="status-speed")
with Vertical(id="statuses"):
yield Static(T("Elapsed: 00d 00:00:00"), classes="status", id="status-elapsed")
yield Static(T("Files: 0"), classes="status", id="status-files")
yield Static(T("Unchanged: 0"), classes="status", id="status-unchanged")
yield Static(T("Modified: 0"), classes="status", id="status-modified")
yield Static(T("Added: 0"), classes="status", id="status-added")
yield Static(T("Other: 0"), classes="status", id="status-other")
yield Static(T("Errors: 0"), classes="status error-ok", id="status-errors")
yield Static(T("RC: RUNNING"), classes="status", id="status-rc")
def update_speed(self, kfiles_per_second: float):
self.speed_history.append(kfiles_per_second)
self.speed_history = self.speed_history[-SpeedSparkline.HISTORY_SIZE :]
# Use our custom update method
self.query_one("#speed-sparkline").update_data(self.speed_history)
self.query_one("#status-speed").update(T(f"Speed: {int(kfiles_per_second * 1000)}/s"))
def watch_error_count(self, count: int) -> None:
sw = self.query_one("#status-errors")
if count == 0:
sw.remove_class("errors-warning")
sw.add_class("errors-ok")
else:
sw.remove_class("errors-ok")
sw.add_class("errors-warning")
sw.update(T(f"Errors: {count}"))
def watch_files_count(self, count: int) -> None:
self.query_one("#status-files").update(T(f"Files: {count}"))
def watch_unchanged_count(self, count: int) -> None:
self.query_one("#status-unchanged").update(T(f"Unchanged: {count}"))
def watch_modified_count(self, count: int) -> None:
self.query_one("#status-modified").update(T(f"Modified: {count}"))
def watch_added_count(self, count: int) -> None:
self.query_one("#status-added").update(T(f"Added: {count}"))
def watch_other_count(self, count: int) -> None:
self.query_one("#status-other").update(T(f"Other: {count}"))
def watch_rc(self, rc: int):
label = self.query_one("#status-rc")
if rc is None:
label.update(T("RC: RUNNING"))
return
label.remove_class("rc-ok")
label.remove_class("rc-warning")
label.remove_class("rc-error")
status = classify_ec(rc)
if status == "success":
label.add_class("rc-ok")
elif status == "warning":
label.add_class("rc-warning")
else: # error, signal
label.add_class("rc-error")
label.update(T(f"RC: {rc}"))
def watch_elapsed_time(self, elapsed: float) -> None:
if TRANSLATOR.enabled:
# There seems to be no official formula for stardates, so we make something up.
# When showing the stardate, it is an absolute time, not relative "elapsed time".
ut = time.time()
sd = (ut - 1735689600) / 60.0 # Minutes since 2025-01-01 00:00.00 UTC
msg = f"Stardate {sd:.1f}"
else:
seconds = int(elapsed)
days, seconds = divmod(seconds, 86400)
h, m, s = seconds // 3600, (seconds % 3600) // 60, seconds % 60
msg = f"Elapsed: {days:02d}d {h:02d}:{m:02d}:{s:02d}"
self.query_one("#status-elapsed").update(msg)
def refresh_ui_labels(self):
"""Update static UI labels with current translation."""
self.watch_elapsed_time(self.elapsed_time)
self.query_one("#status-files").update(T(f"Files: {self.files_count}"))
self.query_one("#status-unchanged").update(T(f"Unchanged: {self.unchanged_count}"))
self.query_one("#status-modified").update(T(f"Modified: {self.modified_count}"))
self.query_one("#status-added").update(T(f"Added: {self.added_count}"))
self.query_one("#status-other").update(T(f"Other: {self.other_count}"))
self.query_one("#status-errors").update(T(f"Errors: {self.error_count}"))
if self.rc is not None:
self.watch_rc(self.rc)
else:
self.query_one("#status-rc").update(T("RC: RUNNING"))
class StandardLog(Vertical):
def compose(self) -> ComposeResult:
yield Static(T("Log"), classes="panel-title", id="standard-log-title")
yield RichLog(id="standard-log-content", highlight=False, markup=True, auto_scroll=True, max_lines=None)
def update_title(self):
self.query_one("#standard-log-title").update(T("Log"))
def add_line(self, line: str):
# TODO: make this more generic, use json output from borg.
# currently, this is only really useful for borg create/extract --list
line = line.rstrip()
if len(line) == 0:
return
markup_tag = None
if len(line) >= 2:
if line[1] == " " and line[0] in "EAMUdcbs+-":
# looks like from borg create/extract --list
status_panel = self.app.query_one("#status")
status_panel.files_count += 1
status = line[0]
if status == "E":
status_panel.error_count += 1
elif status in "U-":
status_panel.unchanged_count += 1
elif status in "M":
status_panel.modified_count += 1
elif status in "A+":
status_panel.added_count += 1
elif status in "dcbs":
status_panel.other_count += 1
markup_tag = {
"E": "red", # Error
"A": "white", # Added regular file (cache miss, slow!)
"M": "white", # Modified regular file (cache hit, but different, slow!)
"U": "green", # Updated regular file (cache hit)
"d": "green", # directory
"c": "green", # char device
"b": "green", # block device
"s": "green", # socket
"-": "white", # excluded
"+": "green", # included
}.get(status)
log_widget = self.query_one("#standard-log-content")
safe_line = escape(line)
if markup_tag:
safe_line = f"[{markup_tag}]{safe_line}[/]"
log_widget.write(safe_line)
class Starfield(Static):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Generate a unique seed for this instance to ensure random
# distribution per session but stable appearance during resize.
self._seed = random.randint(0, 1000000) # nosec B311 - UI-only randomness, not for crypto
def on_mount(self) -> None:
self.call_after_refresh(self._update_art)
def on_resize(self, event) -> None:
self._update_art()
def _update_art(self) -> None:
"""Render starfield."""
w, h = self.size
# Don't try to render if too small
if w < 10 or h < 5:
return
# Use our instance seed to keep stars "static" (same pattern) during resize
random.seed(self._seed)
star_density = 0.1
big_star_chance = 0.1
from .theme import theme
star_color = f"[{theme.variables['star-color']}]"
star_bright_color = f"[{theme.variables['star-bright-color']}]"
# 1. Create canvas (Starfield)
canvas = [[(" ", "")] * w for _ in range(h)]
for y in range(h):
for x in range(w):
if random.random() < star_density: # nosec B311 - visual effect randomness
if random.random() < big_star_chance: # nosec B311 - visual effect randomness
char = "*"
color = star_bright_color
else:
char = random.choice([".", "·"]) # nosec B311 - visual effect randomness
color = star_color
canvas[y][x] = (char, color)
# 2. Render to string
c_reset = "[/]"
final_lines = []
for row in canvas:
line_str = ""
for char, color in row:
if char == " ":
line_str += " "
else:
line_str += f"{color}{escape(char)}{c_reset}"
final_lines.append(line_str)
art_str = "\n".join(final_lines)
self.update(art_str)
class Pulsar(Static):
PULSAR_ART = "\n".join(["", "─*─", ""])
H = 3
W = 3
def on_mount(self) -> None:
self.set_interval(4.0, self.pulse)
self.update_art()
def pulse(self) -> None:
self.toggle_class("dim")
def update_art(self) -> None:
self.update(self.PULSAR_ART)
class Slogan(Static):
SLOGAN = "**** You're welcome! ****"
H = 1
W = len(SLOGAN)
def on_mount(self) -> None:
self.update(self.SLOGAN)
self.set_interval(1.0, self.pulse)
def pulse(self) -> None:
self.toggle_class("dim")
def update_slogan(self):
self.update(T(self.SLOGAN))
class Logo(Static):
BORG_ART = [
"██████╗ ██████╗ ██████╗ ██████╗ ",
"██╔══██╗██╔═══██╗██╔══██╗██╔════╝ ",
"██████╔╝██║ ██║██████╔╝██║ ███╗",
"██╔══██╗██║ ██║██╔══██╗██║ ██║",
"██████╔╝╚██████╔╝██║ ██║╚██████╔╝",
"╚═════╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ",
]
H = len(BORG_ART)
W = max(len(line) for line in BORG_ART)
def on_mount(self) -> None:
from .theme import theme
logo_color = theme.variables["logo-color"]
lines = []
for line in self.BORG_ART:
lines.append(f"[bold {logo_color}]{escape(line)}[/]")
self.update("\n".join(lines))
class LogoPanel(Container):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._seed = random.randint(0, 1000000) # nosec B311 - UI-only randomness, not for crypto
def compose(self) -> ComposeResult:
yield Starfield()
yield Logo(id="logo")
yield Slogan(id="slogan")
yield Pulsar()
def on_resize(self, event) -> None:
w, h = self.size
# Needs enough space to position reasonably
if w > 4 and h > 4:
random.seed(self._seed)
# Exclusion Zone Calculation
# --------------------------
# Logo top-left
logo_y = (h - Logo.H) // 2 - 1
logo_x = (w - Logo.W) // 2
# Slogan top-left
slogan_y = logo_y + Logo.H + 2
slogan_x = (w - Slogan.W) // 2
# Forbidden area
# --------------
# Combined rect over Logo and Slogan
f_y1 = logo_y
f_y2 = slogan_y + Slogan.H
f_x1 = min(logo_x, slogan_x)
f_x2 = max(logo_x + Logo.W, slogan_x + Slogan.W)
# Update Logo and Slogan position
# Note: In the overlay layer, widgets stack vertically.
# Logo is at y=0 (height Logo.H).
# Slogan is at y=Logo.H (height Slogan.H).
# Pulsar is at y=Logo.H+Slogan.H (height Pulsar.H)
# We must subtract these flow positions from the desired absolute positions.
self.query_one(Logo).styles.offset = (logo_x, logo_y)
self.query_one(Slogan).styles.offset = (slogan_x, slogan_y - Logo.H)
# Pulsar: styles.offset moves the top-left corner.
# So if offset is (px, py), it occupies x=[px, px+Pulsar.W), y=[py, py+Pulsar.H).
# Find a valid Pulsar position
for _ in range(20):
# Random position
max_x = max(0, w - Pulsar.W)
max_y = max(0, h - Pulsar.H)
px = random.randint(0, max_x) # nosec B311 - visual placement randomness
py = random.randint(0, max_y) # nosec B311 - visual placement randomness
# Pulsar Rect:
p_x1, p_y1 = px, py
p_x2, p_y2 = px + Pulsar.W, py + Pulsar.H
# Check intersection with forbidden rect
overlap_x = (p_x1 < f_x2) and (p_x2 > f_x1)
overlap_y = (p_y1 < f_y2) and (p_y2 > f_y1)
if overlap_x and overlap_y:
continue # Try again
# No overlap!
offset_x, offset_y = px, py - (Logo.H + Slogan.H)
break
else:
# Fallback if no safe spot found (e.g. screen too small):
# Place top-left or keep last valid. random 0,0 is safe-ish.
offset_x, offset_y = 0, 0 - (Logo.H + Slogan.H)
self.query_one(Pulsar).styles.offset = (offset_x, offset_y)
class SpeedSparkline(Static):
"""
Custom 4-line height sparkline.
"""
HISTORY_SIZE = 99
BLOCKS = [".", " ", "", "", "", "", "", "", ""]
def __init__(self, data: list[float] = None, **kwargs):
super().__init__(**kwargs)
self._data = data or []
def update_data(self, data: list[float]):
self._data = data
self.refresh_chart()
def refresh_chart(self):
if not self._data:
self.update("")
return
width = self.size.width or self.HISTORY_SIZE
# Slice data to width
dataset = self._data[-width:]
if not dataset:
self.update("")
return
max_val = max(dataset) if dataset else 1.0
max_val = max(max_val, 1.0) # Avoid div by zero
# We have 4 lines, each can take 8 levels. Total 32 levels.
# Normalize each data point to 0..32
lines = [[], [], [], []]
for val in dataset:
# Scale to 0-32
scaled = (val / max_val) * 32
# Generate 4 stacked chars
for i in range(4):
# i=0 is top line, i=3 is bottom line
# Thresholds: Top(24), Mid-High(16), Mid-Low(8), Low(0)
threshold = (3 - i) * 8
level = int(scaled - threshold)
level = max(0, min(8, level))
lines[i].append(self.BLOCKS[level])
# Join lines
rows = ["".join(line) for line in lines]
self.update("\n".join(rows))
def on_resize(self, event):
self.refresh_chart()