"""
Corpus extraction run ledger - observational database for build provenance.
This module provides the CorpusLedger class, which manages SQLite-based tracking
of all syllable extraction runs. The ledger records who ran what extraction tool,
when, with which settings, and what outputs were produced.
Critical Design Principle:
The ledger is **observational only** - it records what happened but does not
influence extraction behavior. Extractors remain pure, deterministic functions.
The ledger just watches and remembers.
Typical Usage:
>>> from build_tools.corpus_db import CorpusLedger
>>> from pathlib import Path
>>>
>>> # Initialize ledger (finds or creates database)
>>> ledger = CorpusLedger()
>>>
>>> # Start a new extraction run
>>> run_id = ledger.start_run(
... extractor_tool="syllable_extractor",
... extractor_version="0.2.0",
... pyphen_lang="en_US",
... min_len=2,
... max_len=8,
... command_line="python -m build_tools.syllable_extractor --file input.txt"
... )
>>>
>>> # Record input sources
>>> ledger.record_input(run_id, Path("data/corpus/english.txt"))
>>>
>>> # ... extraction happens (ledger doesn't participate) ...
>>>
>>> # Record outputs
>>> ledger.record_output(
... run_id,
... output_path=Path("data/raw/en_US/corpus_v1.syllables"),
... syllable_count=5432,
... unique_syllable_count=1234,
... meta_path=Path("data/raw/en_US/corpus_v1.meta")
... )
>>>
>>> # Mark run complete
>>> ledger.complete_run(run_id, exit_code=0, status="completed")
>>>
>>> # Query runs later
>>> runs = ledger.get_runs_by_tool("syllable_extractor")
>>> recent = ledger.get_recent_runs(limit=10)
"""
from __future__ import annotations
import socket
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .schema import SCHEMA_VERSION, get_all_ddl_statements
[docs]
class CorpusLedger:
"""
Manages the corpus extraction run ledger database.
The CorpusLedger provides a simple API for recording extraction runs,
their inputs, outputs, and outcomes. All operations are append-only -
runs are never modified or deleted once recorded.
The database file location is configurable but defaults to:
- data/raw/syllable_extractor.db
Attributes:
db_path: Path to the SQLite database file
_conn: Active database connection (None if not connected)
Example:
>>> ledger = CorpusLedger()
>>> run_id = ledger.start_run(
... extractor_tool="syllable_extractor",
... extractor_version="0.2.0",
... pyphen_lang="en_US"
... )
>>> ledger.complete_run(run_id, exit_code=0, status="completed")
"""
[docs]
def __init__(self, db_path: Path | None = None):
"""
Initialize the corpus ledger.
Creates the database and tables if they don't exist. If the database
already exists, validates the schema version.
Args:
db_path: Path to SQLite database file. If None, defaults to
data/raw/syllable_extractor.db in the project root.
Raises:
sqlite3.Error: If database initialization fails
Example:
>>> # Use default location
>>> ledger = CorpusLedger()
>>>
>>> # Use custom location
>>> ledger = CorpusLedger(Path("_working/test.db"))
"""
if db_path is None:
# Default to data/raw/syllable_extractor.db
# Assumes we're running from project root or tests
project_root = Path.cwd()
db_path = project_root / "data" / "raw" / "syllable_extractor.db"
self.db_path = db_path
self._conn: sqlite3.Connection | None = None
# Ensure parent directory exists
self.db_path.parent.mkdir(parents=True, exist_ok=True)
# Initialize database
self._initialize_db()
def _initialize_db(self) -> None:
"""
Create database tables and indexes if they don't exist.
This method is idempotent - safe to call multiple times. If the
database already exists with the correct schema, no changes are made.
Raises:
sqlite3.Error: If database creation fails
"""
with self._get_connection() as conn:
cursor = conn.cursor()
# Execute all DDL statements
for ddl in get_all_ddl_statements():
cursor.execute(ddl)
# Check if schema version is recorded
cursor.execute(
"SELECT version FROM schema_version WHERE version = ?", (SCHEMA_VERSION,)
)
if cursor.fetchone() is None:
# Record schema version
now = datetime.now(timezone.utc).isoformat()
cursor.execute(
"INSERT INTO schema_version (version, applied_at) VALUES (?, ?)",
(SCHEMA_VERSION, now),
)
conn.commit()
@contextmanager
def _get_connection(self) -> Iterator[sqlite3.Connection]:
"""
Context manager for database connections.
Ensures connections are properly managed and closed. Uses row factory
for dict-like row access.
Yields:
Active SQLite connection
Example:
>>> with ledger._get_connection() as conn:
... cursor = conn.cursor()
... cursor.execute("SELECT * FROM runs")
"""
if self._conn is None:
self._conn = sqlite3.connect(self.db_path)
self._conn.row_factory = sqlite3.Row
try:
yield self._conn
finally:
# Keep connection open for reuse, but ensure changes are committed
self._conn.commit()
[docs]
def close(self) -> None:
"""
Close the database connection.
Should be called when done with the ledger. Using the ledger as a
context manager (with statement) is preferred as it handles cleanup
automatically.
Example:
>>> ledger = CorpusLedger()
>>> # ... use ledger ...
>>> ledger.close()
>>>
>>> # Preferred: use context manager
>>> with CorpusLedger() as ledger:
... ledger.start_run(...)
"""
if self._conn is not None:
self._conn.close()
self._conn = None
[docs]
def __enter__(self) -> "CorpusLedger":
"""Context manager entry."""
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit - ensures connection is closed."""
self.close()
[docs]
def start_run(
self,
extractor_tool: str,
extractor_version: str | None = None,
pyphen_lang: str | None = None,
auto_lang_detected: str | None = None,
min_len: int | None = None,
max_len: int | None = None,
recursive: bool = False,
pattern: str | None = None,
command_line: str | None = None,
notes: str | None = None,
) -> int:
"""
Record the start of a new extraction run.
Creates a new run record with status='running' and returns the run ID.
The caller should use this ID to record inputs, outputs, and eventually
mark the run complete or failed.
Args:
extractor_tool: Name of the extraction tool (e.g., 'syllable_extractor',
'syllable_extractor_nltk', 'syllable_extractor_espeak')
extractor_version: Version string or git SHA of the tool
pyphen_lang: Pyphen language code (NULL for non-pyphen tools)
auto_lang_detected: Auto-detected language code if auto-detection was used
min_len: Minimum syllable length constraint
max_len: Maximum syllable length constraint
recursive: Whether source directory was processed recursively
pattern: File pattern filter (e.g., '*.txt')
command_line: Full command-line invocation for reproducibility
notes: User-provided annotations about this run
Returns:
Unique run ID (integer) for this extraction run
Example:
>>> run_id = ledger.start_run(
... extractor_tool="syllable_extractor",
... extractor_version="0.2.0",
... pyphen_lang="en_US",
... min_len=2,
... max_len=8,
... command_line="python -m build_tools.syllable_extractor --file input.txt",
... notes="Testing new corpus from Project Gutenberg"
... )
>>> print(f"Started run {run_id}")
Started run 42
"""
# Get current timestamp in ISO 8601 format (UTC)
timestamp = datetime.now(timezone.utc).isoformat()
# Get hostname for tracking which machine ran this
hostname = socket.gethostname()
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO runs (
run_timestamp, extractor_tool, extractor_version, hostname,
status, pyphen_lang, auto_lang_detected, min_len, max_len,
recursive, pattern, command_line, notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
timestamp,
extractor_tool,
extractor_version,
hostname,
"running", # Initial status
pyphen_lang,
auto_lang_detected,
min_len,
max_len,
1 if recursive else 0,
pattern,
command_line,
notes,
),
)
conn.commit()
run_id = cursor.lastrowid
# lastrowid is guaranteed to be not None after INSERT with AUTOINCREMENT
assert run_id is not None, "Failed to get run_id from database"
return run_id
[docs]
def record_output(
self,
run_id: int,
output_path: Path,
syllable_count: int | None = None,
unique_syllable_count: int | None = None,
meta_path: Path | None = None,
) -> None:
"""
Record an output file for a run.
Associates an output .syllables file with an extraction run. Multiple
outputs can be recorded for a single run (e.g., batch processing).
Note: Paths are stored in POSIX format (forward slashes) for cross-platform
compatibility.
Args:
run_id: Run ID from start_run()
output_path: Path to generated .syllables file
syllable_count: Total number of syllables (including duplicates)
unique_syllable_count: Number of unique syllables
meta_path: Path to corresponding .meta file (if generated)
Example:
>>> ledger.record_output(
... run_id=42,
... output_path=Path("data/raw/en_US/corpus_v1.syllables"),
... syllable_count=5432,
... unique_syllable_count=1234,
... meta_path=Path("data/raw/en_US/corpus_v1.meta")
... )
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO outputs (
run_id, output_path, syllable_count,
unique_syllable_count, meta_path
) VALUES (?, ?, ?, ?, ?)
""",
(
run_id,
output_path.as_posix(),
syllable_count,
unique_syllable_count,
meta_path.as_posix() if meta_path else None,
),
)
conn.commit()
[docs]
def complete_run(
self,
run_id: int,
exit_code: int,
status: str = "completed",
) -> None:
"""
Mark a run as complete or failed.
Updates the run status and exit code. This should be called when
extraction finishes, whether successfully or with errors.
Args:
run_id: Run ID from start_run()
exit_code: Unix exit code (0 = success, non-zero = failure)
status: Final status - one of 'completed', 'failed', 'interrupted'
Raises:
ValueError: If status is not a valid value
Example:
>>> # Successful run
>>> ledger.complete_run(run_id, exit_code=0, status="completed")
>>>
>>> # Failed run
>>> ledger.complete_run(run_id, exit_code=1, status="failed")
"""
valid_statuses = {"completed", "failed", "interrupted"}
if status not in valid_statuses:
raise ValueError(f"Invalid status '{status}'. Must be one of {valid_statuses}")
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE runs
SET exit_code = ?, status = ?
WHERE id = ?
""",
(exit_code, status, run_id),
)
conn.commit()
[docs]
def get_run(self, run_id: int) -> dict[str, Any] | None:
"""
Get details for a specific run.
Args:
run_id: Run ID to fetch
Returns:
Dictionary with run details, or None if run_id doesn't exist
Example:
>>> run = ledger.get_run(42)
>>> if run:
... print(f"Tool: {run['extractor_tool']}")
... print(f"Status: {run['status']}")
... print(f"Command: {run['command_line']}")
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM runs WHERE id = ?", (run_id,))
row = cursor.fetchone()
return dict(row) if row else None
[docs]
def get_recent_runs(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get the most recent extraction runs.
Args:
limit: Maximum number of runs to return (default: 10)
Returns:
List of run dictionaries, ordered by timestamp descending (newest first)
Example:
>>> recent = ledger.get_recent_runs(limit=5)
>>> for run in recent:
... print(f"{run['run_timestamp']}: {run['extractor_tool']}")
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT * FROM runs
ORDER BY run_timestamp DESC
LIMIT ?
""",
(limit,),
)
return [dict(row) for row in cursor.fetchall()]
[docs]
def get_run_outputs(self, run_id: int) -> list[dict[str, Any]]:
"""
Get all outputs for a run.
Args:
run_id: Run ID to fetch outputs for
Returns:
List of output dictionaries with paths and syllable counts
Example:
>>> outputs = ledger.get_run_outputs(42)
>>> for out in outputs:
... print(f"Output: {out['output_path']}")
... print(f" Unique syllables: {out['unique_syllable_count']}")
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM outputs WHERE run_id = ?",
(run_id,),
)
return [dict(row) for row in cursor.fetchall()]
[docs]
def find_run_by_output(self, output_path: Path) -> dict[str, Any] | None:
"""
Find which run produced a specific output file.
This is the "reverse lookup" - given a .syllables file, find out
how it was created.
Args:
output_path: Path to .syllables file to search for
Returns:
Run dictionary if found, None otherwise
Example:
>>> run = ledger.find_run_by_output(Path("data/raw/en_US/corpus_v1.syllables"))
>>> if run:
... print(f"Created by: {run['command_line']}")
... print(f"On: {run['run_timestamp']}")
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT r.* FROM runs r
JOIN outputs o ON r.id = o.run_id
WHERE o.output_path = ?
""",
(output_path.as_posix(),),
)
row = cursor.fetchone()
return dict(row) if row else None
[docs]
def get_stats(self) -> dict[str, Any]:
"""
Get overall ledger statistics.
Returns summary stats about all recorded runs, useful for understanding
build history at a glance.
Returns:
Dictionary with statistics:
- total_runs: Total number of runs recorded
- completed_runs: Runs with status='completed'
- failed_runs: Runs with status='failed'
- tools_used: Set of unique extractor tools
- languages_used: Set of unique pyphen language codes
Example:
>>> stats = ledger.get_stats()
>>> print(f"Total runs: {stats['total_runs']}")
>>> print(f"Success rate: {stats['completed_runs']/stats['total_runs']*100:.1f}%")
>>> print(f"Tools: {', '.join(stats['tools_used'])}")
"""
with self._get_connection() as conn:
cursor = conn.cursor()
# Total runs
cursor.execute("SELECT COUNT(*) FROM runs")
total_runs = cursor.fetchone()[0]
# Completed runs
cursor.execute("SELECT COUNT(*) FROM runs WHERE status = 'completed'")
completed_runs = cursor.fetchone()[0]
# Failed runs
cursor.execute("SELECT COUNT(*) FROM runs WHERE status = 'failed'")
failed_runs = cursor.fetchone()[0]
# Unique tools
cursor.execute("SELECT DISTINCT extractor_tool FROM runs")
tools_used = {row[0] for row in cursor.fetchall()}
# Unique languages
cursor.execute("SELECT DISTINCT pyphen_lang FROM runs WHERE pyphen_lang IS NOT NULL")
languages_used = {row[0] for row in cursor.fetchall()}
return {
"total_runs": total_runs,
"completed_runs": completed_runs,
"failed_runs": failed_runs,
"tools_used": tools_used,
"languages_used": languages_used,
}