"""
Textual TUI application for corpus database viewer.
Provides an interactive terminal interface for browsing database tables.
"""
from datetime import datetime
from pathlib import Path
from typing import Any
from textual import on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
from textual.screen import ModalScreen
from textual.widgets import (
Button,
DataTable,
Footer,
Header,
Input,
Label,
ListItem,
ListView,
Static,
)
from . import formatters, queries
[docs]
class SchemaModal(ModalScreen[None]):
"""Modal screen for displaying table schema information."""
DEFAULT_CSS = """
SchemaModal {
align: center middle;
}
SchemaModal > Container {
width: 90%;
height: 85%;
border: thick $background 80%;
background: $surface;
}
#schema-content {
width: 100%;
height: 1fr;
overflow-y: auto;
}
.schema-section {
margin: 1;
padding: 1;
border: solid $primary;
}
.schema-title {
text-style: bold;
color: $accent;
}
"""
def __init__(self, schema_data: dict[str, Any], table_name: str) -> None:
"""
Initialize schema modal.
Parameters
----------
schema_data : dict[str, Any]
Schema information from queries.get_table_schema()
table_name : str
Name of the table
"""
super().__init__()
self.schema_data = schema_data
self.table_name = table_name
[docs]
def compose(self) -> ComposeResult:
"""Create child widgets."""
with Container():
yield Static(f"Schema: {self.table_name}", classes="schema-title")
with VerticalScroll(id="schema-content"):
# Columns section
cols_text = "\n[bold]Columns:[/bold]\n"
for col in self.schema_data["columns"]:
pk = " [PRIMARY KEY]" if col["pk"] else ""
notnull = " NOT NULL" if col["notnull"] else ""
dflt = f" DEFAULT {col['dflt_value']}" if col["dflt_value"] else ""
cols_text += f" • {col['name']}: {col['type']}{pk}{notnull}{dflt}\n"
yield Static(cols_text, classes="schema-section")
# Indexes section
if self.schema_data["indexes"]:
idx_text = "\n[bold]Indexes:[/bold]\n"
for idx in self.schema_data["indexes"]:
idx_cols = ", ".join([c["name"] for c in idx["columns"]])
unique = "[UNIQUE] " if idx.get("unique") else ""
idx_text += f" • {unique}{idx['name']} ({idx_cols})\n"
yield Static(idx_text, classes="schema-section")
# CREATE TABLE SQL
if self.schema_data["create_sql"]:
yield Static(
f"\n[bold]CREATE TABLE Statement:[/bold]\n\n{self.schema_data['create_sql']}",
classes="schema-section",
)
yield Button("Close", id="close-schema")
[docs]
@on(Button.Pressed, "#close-schema")
def close_modal(self) -> None:
"""Close the schema modal."""
self.app.pop_screen()
[docs]
class ExportModal(ModalScreen[dict[str, str]]):
"""Modal screen for export options."""
DEFAULT_CSS = """
ExportModal {
align: center middle;
}
ExportModal > Container {
width: 60;
height: auto;
border: thick $background 80%;
background: $surface;
padding: 1 2;
}
ExportModal Input {
margin: 1 0;
}
ExportModal Horizontal {
height: auto;
align: center middle;
margin: 1 0;
}
"""
def __init__(self, default_filename: str) -> None:
"""
Initialize export modal.
Parameters
----------
default_filename : str
Default filename (without extension)
"""
super().__init__()
self.default_filename = default_filename
[docs]
def compose(self) -> ComposeResult:
"""Create child widgets."""
with Container():
yield Label("Export Data")
yield Label("Filename (without extension):")
yield Input(value=self.default_filename, id="filename-input")
with Horizontal():
yield Button("Export CSV", id="export-csv", variant="primary")
yield Button("Export JSON", id="export-json", variant="primary")
yield Button("Cancel", id="export-cancel")
[docs]
@on(Button.Pressed, "#export-csv")
def export_csv(self) -> None:
"""Export as CSV."""
filename_input = self.query_one("#filename-input", Input)
filename = filename_input.value.strip()
if filename:
self.dismiss({"format": "csv", "filename": filename})
[docs]
@on(Button.Pressed, "#export-json")
def export_json(self) -> None:
"""Export as JSON."""
filename_input = self.query_one("#filename-input", Input)
filename = filename_input.value.strip()
if filename:
self.dismiss({"format": "json", "filename": filename})
[docs]
@on(Button.Pressed, "#export-cancel")
def cancel_export(self) -> None:
"""Cancel export."""
self.dismiss(None)
[docs]
class HelpModal(ModalScreen[None]):
"""Modal screen showing keyboard shortcuts."""
DEFAULT_CSS = """
HelpModal {
align: center middle;
}
HelpModal > Container {
width: 70;
height: auto;
border: thick $background 80%;
background: $surface;
padding: 2;
}
"""
[docs]
def compose(self) -> ComposeResult:
"""Create child widgets."""
help_text = """
[bold cyan]Corpus Database Viewer - Keyboard Shortcuts[/bold cyan]
[bold]Navigation:[/bold]
↑/↓ Navigate rows
←/→ Previous/Next page
PageUp/PageDn Jump multiple pages
Home/End First/Last page
[bold]Actions:[/bold]
t Switch table (table selector)
i Show schema information
e Export current view
r Refresh data
[bold]Application:[/bold]
q Quit application
? Show this help screen
[bold]In Table Selector:[/bold]
↑/↓ Navigate tables
Enter Select table
Escape Cancel
Press any key to close this help screen.
"""
with Container():
yield Static(help_text)
yield Button("Close", id="close-help")
[docs]
@on(Button.Pressed, "#close-help")
def close_help(self) -> None:
"""Close help modal."""
self.app.pop_screen()
[docs]
def on_key(self, event) -> None:
"""Close on any key press."""
if event.key != "question_mark": # Don't close on the key that opened it
self.app.pop_screen()
[docs]
class CorpusDBViewerApp(App[None]):
"""
A Textual app for viewing corpus database provenance records.
This interactive TUI provides table browsing, schema viewing, and export
functionality for SQLite databases.
"""
CSS = """
Screen {
layout: horizontal;
}
#sidebar {
width: 30;
height: 100%;
border-right: solid $primary;
background: $boost;
}
#main-content {
width: 1fr;
height: 100%;
}
#table-info {
height: 3;
background: $boost;
padding: 0 1;
}
#data-table-container {
height: 1fr;
}
DataTable {
height: 100%;
}
ListView {
height: 100%;
}
ListItem {
padding: 0 1;
}
.sidebar-title {
text-style: bold;
padding: 1;
background: $primary;
color: $text;
}
"""
BINDINGS = [
Binding("q", "quit", "Quit", show=True),
Binding("question_mark", "show_help", "Help", show=True, key_display="?"),
Binding("t", "switch_table", "Switch Table", show=False),
Binding("i", "show_schema", "Schema", show=True),
Binding("e", "export_data", "Export", show=True),
Binding("r", "refresh", "Refresh", show=False),
Binding("left", "prev_page", "Prev Page", show=False),
Binding("right", "next_page", "Next Page", show=False),
Binding("pageup", "jump_back", "Jump Back", show=False),
Binding("pagedown", "jump_forward", "Jump Forward", show=False),
Binding("home", "first_page", "First Page", show=False),
Binding("end", "last_page", "Last Page", show=False),
]
def __init__(self, db_path: Path, export_dir: Path, page_size: int = 50) -> None:
"""
Initialize the app.
Parameters
----------
db_path : Path
Path to SQLite database
export_dir : Path
Directory for exported files
page_size : int, optional
Number of rows per page, by default 50
"""
super().__init__()
self.db_path = db_path
self.export_dir = export_dir
self.page_size = page_size
self.current_table: str | None = None
self.current_page: int = 1
self.total_pages: int = 1
self.total_rows: int = 0
self.current_data: list[dict[str, Any]] = []
self.tables: list[dict[str, str]] = [] # Store table list for lookup
[docs]
def compose(self) -> ComposeResult:
"""Create child widgets."""
yield Header()
with Horizontal():
# Sidebar with table list
with Vertical(id="sidebar"):
yield Static("Tables", classes="sidebar-title")
yield ListView(id="table-list")
# Main content area
with Vertical(id="main-content"):
yield Static(id="table-info")
with Container(id="data-table-container"):
yield DataTable(id="data-table", zebra_stripes=True)
yield Footer()
[docs]
def on_mount(self) -> None:
"""Initialize the app after mounting."""
# Load tables
try:
self.tables = queries.get_tables_list(self.db_path)
table_list = self.query_one("#table-list", ListView)
if not self.tables:
self.notify("No tables found in database", severity="warning")
return
for table in self.tables:
table_list.append(ListItem(Label(table["name"]), id=table["name"]))
# Select first table automatically
if self.tables:
self.current_table = self.tables[0]["name"]
self.load_table_data()
except Exception as e:
self.notify(f"Error loading tables: {e}", severity="error")
[docs]
def load_table_data(self) -> None:
"""Load data for the current table."""
if not self.current_table:
return
try:
# Get data
data = queries.get_table_data(
self.db_path,
self.current_table,
page=self.current_page,
limit=self.page_size,
)
self.total_rows = data["total"]
self.total_pages = data["total_pages"]
self.current_data = data["rows"]
# Update table info
table_info = self.query_one("#table-info", Static)
table_info.update(
f"Table: [bold]{self.current_table}[/bold] | "
f"Page {self.current_page}/{self.total_pages} | "
f"Total rows: {formatters.format_row_count(self.total_rows)}"
)
# Update data table
data_table = self.query_one("#data-table", DataTable)
data_table.clear(columns=True)
if self.current_data:
# Add columns
columns = list(self.current_data[0].keys())
for col in columns:
data_table.add_column(col, key=col)
# Add rows
for row in self.current_data:
data_table.add_row(*[str(row[col]) for col in columns])
except Exception as e:
self.notify(f"Error loading table data: {e}", severity="error")
[docs]
@on(ListView.Selected, "#table-list")
def on_table_selected(self, event: ListView.Selected) -> None:
"""Handle table selection from sidebar."""
if event.item and event.item.id:
# Use the ListItem's id which we set to the table name
self.current_table = str(event.item.id)
self.current_page = 1
self.load_table_data()
[docs]
def action_switch_table(self) -> None:
"""Focus the table list for switching tables."""
table_list = self.query_one("#table-list", ListView)
table_list.focus()
[docs]
def action_show_schema(self) -> None:
"""Show schema information for current table."""
if not self.current_table:
self.notify("No table selected", severity="warning")
return
try:
schema_data = queries.get_table_schema(self.db_path, self.current_table)
self.push_screen(SchemaModal(schema_data, self.current_table))
except Exception as e:
self.notify(f"Error loading schema: {e}", severity="error")
[docs]
def action_export_data(self) -> None:
"""Export current table data."""
if not self.current_table or not self.current_data:
self.notify("No data to export", severity="warning")
return
# Capture current_table in local variable for type narrowing
current_table = self.current_table
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
default_filename = f"{current_table}_{timestamp}"
def handle_export(result: dict[str, str] | None) -> None:
"""Handle export modal result."""
if result is None:
return
format_type = result["format"]
filename = result["filename"]
# Get all data (not just current page)
try:
all_data = queries.get_table_data(
self.db_path,
current_table,
page=1,
limit=self.total_rows if self.total_rows > 0 else 1000000,
)
output_path = self.export_dir / f"{filename}.{format_type}"
if format_type == "csv":
formatters.export_to_csv(all_data["rows"], output_path)
else: # json
formatters.export_to_json(all_data["rows"], output_path)
self.notify(f"Exported to {output_path}", severity="information")
except Exception as e:
self.notify(f"Export failed: {e}", severity="error")
self.push_screen(ExportModal(default_filename), handle_export)
[docs]
def action_refresh(self) -> None:
"""Refresh current table data."""
if self.current_table:
self.load_table_data()
self.notify("Data refreshed", severity="information")
[docs]
def action_show_help(self) -> None:
"""Show help modal with keyboard shortcuts."""
self.push_screen(HelpModal())
[docs]
def action_prev_page(self) -> None:
"""Go to previous page."""
if self.current_page > 1:
self.current_page -= 1
self.load_table_data()
[docs]
def action_next_page(self) -> None:
"""Go to next page."""
if self.current_page < self.total_pages:
self.current_page += 1
self.load_table_data()
[docs]
def action_jump_back(self) -> None:
"""Jump back 10 pages."""
self.current_page = max(1, self.current_page - 10)
self.load_table_data()
[docs]
def action_jump_forward(self) -> None:
"""Jump forward 10 pages."""
self.current_page = min(self.total_pages, self.current_page + 10)
self.load_table_data()
[docs]
def action_first_page(self) -> None:
"""Go to first page."""
self.current_page = 1
self.load_table_data()
[docs]
def action_last_page(self) -> None:
"""Go to last page."""
self.current_page = self.total_pages
self.load_table_data()