Source code for build_tools.corpus_db.ledger

"""
Corpus extraction run ledger - observational database for build provenance.

This module provides the CorpusLedger class, which manages SQLite-based tracking
of all syllable extraction runs. The ledger records who ran what extraction tool,
when, with which settings, and what outputs were produced.

Critical Design Principle:
    The ledger is **observational only** - it records what happened but does not
    influence extraction behavior. Extractors remain pure, deterministic functions.
    The ledger just watches and remembers.

Typical Usage:
    >>> from build_tools.corpus_db import CorpusLedger
    >>> from pathlib import Path
    >>>
    >>> # Initialize ledger (finds or creates database)
    >>> ledger = CorpusLedger()
    >>>
    >>> # Start a new extraction run
    >>> run_id = ledger.start_run(
    ...     extractor_tool="syllable_extractor",
    ...     extractor_version="0.2.0",
    ...     pyphen_lang="en_US",
    ...     min_len=2,
    ...     max_len=8,
    ...     command_line="python -m build_tools.syllable_extractor --file input.txt"
    ... )
    >>>
    >>> # Record input sources
    >>> ledger.record_input(run_id, Path("data/corpus/english.txt"))
    >>>
    >>> # ... extraction happens (ledger doesn't participate) ...
    >>>
    >>> # Record outputs
    >>> ledger.record_output(
    ...     run_id,
    ...     output_path=Path("data/raw/en_US/corpus_v1.syllables"),
    ...     syllable_count=5432,
    ...     unique_syllable_count=1234,
    ...     meta_path=Path("data/raw/en_US/corpus_v1.meta")
    ... )
    >>>
    >>> # Mark run complete
    >>> ledger.complete_run(run_id, exit_code=0, status="completed")
    >>>
    >>> # Query runs later
    >>> runs = ledger.get_runs_by_tool("syllable_extractor")
    >>> recent = ledger.get_recent_runs(limit=10)
"""

import socket
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterator, Optional

from .schema import SCHEMA_VERSION, get_all_ddl_statements


[docs] class CorpusLedger: """ Manages the corpus extraction run ledger database. The CorpusLedger provides a simple API for recording extraction runs, their inputs, outputs, and outcomes. All operations are append-only - runs are never modified or deleted once recorded. The database file location is configurable but defaults to: - data/raw/syllable_extractor.db Attributes: db_path: Path to the SQLite database file _conn: Active database connection (None if not connected) Example: >>> ledger = CorpusLedger() >>> run_id = ledger.start_run( ... extractor_tool="syllable_extractor", ... extractor_version="0.2.0", ... pyphen_lang="en_US" ... ) >>> ledger.complete_run(run_id, exit_code=0, status="completed") """
[docs] def __init__(self, db_path: Optional[Path] = None): """ Initialize the corpus ledger. Creates the database and tables if they don't exist. If the database already exists, validates the schema version. Args: db_path: Path to SQLite database file. If None, defaults to data/raw/syllable_extractor.db in the project root. Raises: sqlite3.Error: If database initialization fails Example: >>> # Use default location >>> ledger = CorpusLedger() >>> >>> # Use custom location >>> ledger = CorpusLedger(Path("_working/test.db")) """ if db_path is None: # Default to data/raw/syllable_extractor.db # Assumes we're running from project root or tests project_root = Path.cwd() db_path = project_root / "data" / "raw" / "syllable_extractor.db" self.db_path = db_path self._conn: Optional[sqlite3.Connection] = None # Ensure parent directory exists self.db_path.parent.mkdir(parents=True, exist_ok=True) # Initialize database self._initialize_db()
def _initialize_db(self) -> None: """ Create database tables and indexes if they don't exist. This method is idempotent - safe to call multiple times. If the database already exists with the correct schema, no changes are made. Raises: sqlite3.Error: If database creation fails """ with self._get_connection() as conn: cursor = conn.cursor() # Execute all DDL statements for ddl in get_all_ddl_statements(): cursor.execute(ddl) # Check if schema version is recorded cursor.execute( "SELECT version FROM schema_version WHERE version = ?", (SCHEMA_VERSION,) ) if cursor.fetchone() is None: # Record schema version now = datetime.now(timezone.utc).isoformat() cursor.execute( "INSERT INTO schema_version (version, applied_at) VALUES (?, ?)", (SCHEMA_VERSION, now), ) conn.commit() @contextmanager def _get_connection(self) -> Iterator[sqlite3.Connection]: """ Context manager for database connections. Ensures connections are properly managed and closed. Uses row factory for dict-like row access. Yields: Active SQLite connection Example: >>> with ledger._get_connection() as conn: ... cursor = conn.cursor() ... cursor.execute("SELECT * FROM runs") """ if self._conn is None: self._conn = sqlite3.connect(self.db_path) self._conn.row_factory = sqlite3.Row try: yield self._conn finally: # Keep connection open for reuse, but ensure changes are committed self._conn.commit()
[docs] def close(self) -> None: """ Close the database connection. Should be called when done with the ledger. Using the ledger as a context manager (with statement) is preferred as it handles cleanup automatically. Example: >>> ledger = CorpusLedger() >>> # ... use ledger ... >>> ledger.close() >>> >>> # Preferred: use context manager >>> with CorpusLedger() as ledger: ... ledger.start_run(...) """ if self._conn is not None: self._conn.close() self._conn = None
[docs] def __enter__(self) -> "CorpusLedger": """Context manager entry.""" return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Context manager exit - ensures connection is closed.""" self.close()
[docs] def start_run( self, extractor_tool: str, extractor_version: Optional[str] = None, pyphen_lang: Optional[str] = None, auto_lang_detected: Optional[str] = None, min_len: Optional[int] = None, max_len: Optional[int] = None, recursive: bool = False, pattern: Optional[str] = None, command_line: Optional[str] = None, notes: Optional[str] = None, ) -> int: """ Record the start of a new extraction run. Creates a new run record with status='running' and returns the run ID. The caller should use this ID to record inputs, outputs, and eventually mark the run complete or failed. Args: extractor_tool: Name of the extraction tool (e.g., 'syllable_extractor', 'syllable_extractor_nltk', 'syllable_extractor_espeak') extractor_version: Version string or git SHA of the tool pyphen_lang: Pyphen language code (NULL for non-pyphen tools) auto_lang_detected: Auto-detected language code if auto-detection was used min_len: Minimum syllable length constraint max_len: Maximum syllable length constraint recursive: Whether source directory was processed recursively pattern: File pattern filter (e.g., '*.txt') command_line: Full command-line invocation for reproducibility notes: User-provided annotations about this run Returns: Unique run ID (integer) for this extraction run Example: >>> run_id = ledger.start_run( ... extractor_tool="syllable_extractor", ... extractor_version="0.2.0", ... pyphen_lang="en_US", ... min_len=2, ... max_len=8, ... command_line="python -m build_tools.syllable_extractor --file input.txt", ... notes="Testing new corpus from Project Gutenberg" ... ) >>> print(f"Started run {run_id}") Started run 42 """ # Get current timestamp in ISO 8601 format (UTC) timestamp = datetime.now(timezone.utc).isoformat() # Get hostname for tracking which machine ran this hostname = socket.gethostname() with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """ INSERT INTO runs ( run_timestamp, extractor_tool, extractor_version, hostname, status, pyphen_lang, auto_lang_detected, min_len, max_len, recursive, pattern, command_line, notes ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( timestamp, extractor_tool, extractor_version, hostname, "running", # Initial status pyphen_lang, auto_lang_detected, min_len, max_len, 1 if recursive else 0, pattern, command_line, notes, ), ) conn.commit() run_id = cursor.lastrowid # lastrowid is guaranteed to be not None after INSERT with AUTOINCREMENT assert run_id is not None, "Failed to get run_id from database" return run_id
[docs] def record_input( self, run_id: int, source_path: Path, file_count: Optional[int] = None, ) -> None: """ Record an input source for a run. Associates an input file or directory with an extraction run. Multiple inputs can be recorded for a single run. Note: Paths are stored in POSIX format (forward slashes) for cross-platform compatibility. Args: run_id: Run ID from start_run() source_path: Path to input file or directory file_count: Number of files if source_path is a directory (None for single file) Example: >>> run_id = ledger.start_run("syllable_extractor", "0.2.0") >>> ledger.record_input(run_id, Path("data/corpus/english.txt")) >>> ledger.record_input(run_id, Path("data/corpus/german/"), file_count=42) """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """ INSERT INTO inputs (run_id, source_path, file_count) VALUES (?, ?, ?) """, (run_id, source_path.as_posix(), file_count), ) conn.commit()
[docs] def record_output( self, run_id: int, output_path: Path, syllable_count: Optional[int] = None, unique_syllable_count: Optional[int] = None, meta_path: Optional[Path] = None, ) -> None: """ Record an output file for a run. Associates an output .syllables file with an extraction run. Multiple outputs can be recorded for a single run (e.g., batch processing). Note: Paths are stored in POSIX format (forward slashes) for cross-platform compatibility. Args: run_id: Run ID from start_run() output_path: Path to generated .syllables file syllable_count: Total number of syllables (including duplicates) unique_syllable_count: Number of unique syllables meta_path: Path to corresponding .meta file (if generated) Example: >>> ledger.record_output( ... run_id=42, ... output_path=Path("data/raw/en_US/corpus_v1.syllables"), ... syllable_count=5432, ... unique_syllable_count=1234, ... meta_path=Path("data/raw/en_US/corpus_v1.meta") ... ) """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """ INSERT INTO outputs ( run_id, output_path, syllable_count, unique_syllable_count, meta_path ) VALUES (?, ?, ?, ?, ?) """, ( run_id, output_path.as_posix(), syllable_count, unique_syllable_count, meta_path.as_posix() if meta_path else None, ), ) conn.commit()
[docs] def complete_run( self, run_id: int, exit_code: int, status: str = "completed", ) -> None: """ Mark a run as complete or failed. Updates the run status and exit code. This should be called when extraction finishes, whether successfully or with errors. Args: run_id: Run ID from start_run() exit_code: Unix exit code (0 = success, non-zero = failure) status: Final status - one of 'completed', 'failed', 'interrupted' Raises: ValueError: If status is not a valid value Example: >>> # Successful run >>> ledger.complete_run(run_id, exit_code=0, status="completed") >>> >>> # Failed run >>> ledger.complete_run(run_id, exit_code=1, status="failed") """ valid_statuses = {"completed", "failed", "interrupted"} if status not in valid_statuses: raise ValueError(f"Invalid status '{status}'. Must be one of {valid_statuses}") with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """ UPDATE runs SET exit_code = ?, status = ? WHERE id = ? """, (exit_code, status, run_id), ) conn.commit()
[docs] def get_run(self, run_id: int) -> Optional[dict[str, Any]]: """ Get details for a specific run. Args: run_id: Run ID to fetch Returns: Dictionary with run details, or None if run_id doesn't exist Example: >>> run = ledger.get_run(42) >>> if run: ... print(f"Tool: {run['extractor_tool']}") ... print(f"Status: {run['status']}") ... print(f"Command: {run['command_line']}") """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM runs WHERE id = ?", (run_id,)) row = cursor.fetchone() return dict(row) if row else None
[docs] def get_runs_by_tool(self, extractor_tool: str) -> list[dict[str, Any]]: """ Get all runs for a specific extractor tool. Args: extractor_tool: Tool name to filter by (e.g., 'syllable_extractor') Returns: List of run dictionaries, ordered by timestamp descending (newest first) Example: >>> runs = ledger.get_runs_by_tool("syllable_extractor") >>> for run in runs: ... print(f"Run {run['id']}: {run['pyphen_lang']} ({run['status']})") """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """ SELECT * FROM runs WHERE extractor_tool = ? ORDER BY run_timestamp DESC """, (extractor_tool,), ) return [dict(row) for row in cursor.fetchall()]
[docs] def get_recent_runs(self, limit: int = 10) -> list[dict[str, Any]]: """ Get the most recent extraction runs. Args: limit: Maximum number of runs to return (default: 10) Returns: List of run dictionaries, ordered by timestamp descending (newest first) Example: >>> recent = ledger.get_recent_runs(limit=5) >>> for run in recent: ... print(f"{run['run_timestamp']}: {run['extractor_tool']}") """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """ SELECT * FROM runs ORDER BY run_timestamp DESC LIMIT ? """, (limit,), ) return [dict(row) for row in cursor.fetchall()]
[docs] def get_run_inputs(self, run_id: int) -> list[dict[str, Any]]: """ Get all input sources for a run. Args: run_id: Run ID to fetch inputs for Returns: List of input dictionaries with source_path and file_count Example: >>> inputs = ledger.get_run_inputs(42) >>> for inp in inputs: ... print(f"Source: {inp['source_path']}") ... if inp['file_count']: ... print(f" Files: {inp['file_count']}") """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM inputs WHERE run_id = ?", (run_id,), ) return [dict(row) for row in cursor.fetchall()]
[docs] def get_run_outputs(self, run_id: int) -> list[dict[str, Any]]: """ Get all outputs for a run. Args: run_id: Run ID to fetch outputs for Returns: List of output dictionaries with paths and syllable counts Example: >>> outputs = ledger.get_run_outputs(42) >>> for out in outputs: ... print(f"Output: {out['output_path']}") ... print(f" Unique syllables: {out['unique_syllable_count']}") """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT * FROM outputs WHERE run_id = ?", (run_id,), ) return [dict(row) for row in cursor.fetchall()]
[docs] def find_run_by_output(self, output_path: Path) -> Optional[dict[str, Any]]: """ Find which run produced a specific output file. This is the "reverse lookup" - given a .syllables file, find out how it was created. Args: output_path: Path to .syllables file to search for Returns: Run dictionary if found, None otherwise Example: >>> run = ledger.find_run_by_output(Path("data/raw/en_US/corpus_v1.syllables")) >>> if run: ... print(f"Created by: {run['command_line']}") ... print(f"On: {run['run_timestamp']}") """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """ SELECT r.* FROM runs r JOIN outputs o ON r.id = o.run_id WHERE o.output_path = ? """, (output_path.as_posix(),), ) row = cursor.fetchone() return dict(row) if row else None
[docs] def get_stats(self) -> dict[str, Any]: """ Get overall ledger statistics. Returns summary stats about all recorded runs, useful for understanding build history at a glance. Returns: Dictionary with statistics: - total_runs: Total number of runs recorded - completed_runs: Runs with status='completed' - failed_runs: Runs with status='failed' - tools_used: Set of unique extractor tools - languages_used: Set of unique pyphen language codes Example: >>> stats = ledger.get_stats() >>> print(f"Total runs: {stats['total_runs']}") >>> print(f"Success rate: {stats['completed_runs']/stats['total_runs']*100:.1f}%") >>> print(f"Tools: {', '.join(stats['tools_used'])}") """ with self._get_connection() as conn: cursor = conn.cursor() # Total runs cursor.execute("SELECT COUNT(*) FROM runs") total_runs = cursor.fetchone()[0] # Completed runs cursor.execute("SELECT COUNT(*) FROM runs WHERE status = 'completed'") completed_runs = cursor.fetchone()[0] # Failed runs cursor.execute("SELECT COUNT(*) FROM runs WHERE status = 'failed'") failed_runs = cursor.fetchone()[0] # Unique tools cursor.execute("SELECT DISTINCT extractor_tool FROM runs") tools_used = {row[0] for row in cursor.fetchall()} # Unique languages cursor.execute("SELECT DISTINCT pyphen_lang FROM runs WHERE pyphen_lang IS NOT NULL") languages_used = {row[0] for row in cursor.fetchall()} return { "total_runs": total_runs, "completed_runs": completed_runs, "failed_runs": failed_runs, "tools_used": tools_used, "languages_used": languages_used, }