"""
Corpus extraction run ledger - observational database for build provenance.
This module provides the CorpusLedger class, which manages SQLite-based tracking
of all syllable extraction runs. The ledger records who ran what extraction tool,
when, with which settings, and what outputs were produced.
Critical Design Principle:
The ledger is **observational only** - it records what happened but does not
influence extraction behavior. Extractors remain pure, deterministic functions.
The ledger just watches and remembers.
Typical Usage:
>>> from build_tools.corpus_db import CorpusLedger
>>> from pathlib import Path
>>>
>>> # Initialize ledger (finds or creates database)
>>> ledger = CorpusLedger()
>>>
>>> # Start a new extraction run
>>> run_id = ledger.start_run(
... extractor_tool="syllable_extractor",
... extractor_version="0.2.0",
... pyphen_lang="en_US",
... min_len=2,
... max_len=8,
... command_line="python -m build_tools.syllable_extractor --file input.txt"
... )
>>>
>>> # Record input sources
>>> ledger.record_input(run_id, Path("data/corpus/english.txt"))
>>>
>>> # ... extraction happens (ledger doesn't participate) ...
>>>
>>> # Record outputs
>>> ledger.record_output(
... run_id,
... output_path=Path("data/raw/en_US/corpus_v1.syllables"),
... syllable_count=5432,
... unique_syllable_count=1234,
... meta_path=Path("data/raw/en_US/corpus_v1.meta")
... )
>>>
>>> # Mark run complete
>>> ledger.complete_run(run_id, exit_code=0, status="completed")
>>>
>>> # Query runs later
>>> runs = ledger.get_runs_by_tool("syllable_extractor")
>>> recent = ledger.get_recent_runs(limit=10)
"""
import socket
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterator, Optional
from .schema import SCHEMA_VERSION, get_all_ddl_statements
[docs]
class CorpusLedger:
    """
    Manage the corpus extraction run ledger database.

    The CorpusLedger provides a simple API for recording extraction runs,
    their outputs, and outcomes. Rows are only ever inserted; the single
    in-place change is ``complete_run()``, which sets the final status and
    exit code of a run. Nothing in this class deletes a row.

    The database file location is configurable but defaults to:
        - data/raw/syllable_extractor.db (relative to the current directory)

    A single SQLite connection is opened lazily and reused until ``close()``
    is called (or the ``with`` block around the ledger exits).

    Attributes:
        db_path: Path to the SQLite database file
        _conn: Active database connection (None if not connected)

    Example:
        >>> ledger = CorpusLedger()
        >>> run_id = ledger.start_run(
        ...     extractor_tool="syllable_extractor",
        ...     extractor_version="0.2.0",
        ...     pyphen_lang="en_US"
        ... )
        >>> ledger.complete_run(run_id, exit_code=0, status="completed")
    """

    def __init__(self, db_path: Optional[Path] = None):
        """
        Initialize the corpus ledger.

        Creates the parent directory and the database tables if they don't
        exist, and records the current schema version if it has not been
        recorded yet.

        Args:
            db_path: Path to SQLite database file (a plain string path is
                accepted too). If None, defaults to
                data/raw/syllable_extractor.db under the current working
                directory, which is assumed to be the project root.

        Raises:
            sqlite3.Error: If database initialization fails

        Example:
            >>> # Use default location
            >>> ledger = CorpusLedger()
            >>>
            >>> # Use custom location
            >>> ledger = CorpusLedger(Path("_working/test.db"))
        """
        if db_path is None:
            # Assumes we're running from project root or tests
            db_path = Path.cwd() / "data" / "raw" / "syllable_extractor.db"

        # Coerce so that string paths work with the ``.parent`` access below.
        self.db_path = Path(db_path)
        self._conn: Optional[sqlite3.Connection] = None

        # Ensure parent directory exists
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            self._initialize_db()
        except Exception:
            # Don't leak the half-initialized connection if setup fails.
            self.close()
            raise

    def _initialize_db(self) -> None:
        """
        Create database tables and indexes, and record the schema version.

        Executes every statement returned by ``get_all_ddl_statements()`` and
        then inserts a ``schema_version`` row for ``SCHEMA_VERSION`` unless
        one is already present.

        NOTE(review): calling this repeatedly is only safe if the DDL
        statements themselves are idempotent (e.g. ``IF NOT EXISTS``) -
        confirm against the schema module.

        Raises:
            sqlite3.Error: If database creation fails
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()

            for ddl in get_all_ddl_statements():
                cursor.execute(ddl)

            cursor.execute(
                "SELECT version FROM schema_version WHERE version = ?",
                (SCHEMA_VERSION,),
            )
            if cursor.fetchone() is None:
                applied_at = datetime.now(timezone.utc).isoformat()
                cursor.execute(
                    "INSERT INTO schema_version (version, applied_at) VALUES (?, ?)",
                    (SCHEMA_VERSION, applied_at),
                )

    @contextmanager
    def _get_connection(self) -> Iterator[sqlite3.Connection]:
        """
        Context manager yielding the shared database connection.

        Opens the connection on first use (with ``sqlite3.Row`` as row factory
        for dict-like row access) and keeps it open for reuse. When the
        ``with`` body finishes normally the pending transaction is committed;
        when it raises, the transaction is rolled back and the exception
        propagates, so a failed operation never leaves partial writes behind.

        Yields:
            Active SQLite connection

        Example:
            >>> with ledger._get_connection() as conn:
            ...     cursor = conn.cursor()
            ...     cursor.execute("SELECT * FROM runs")
        """
        if self._conn is None:
            self._conn = sqlite3.connect(self.db_path)
            self._conn.row_factory = sqlite3.Row

        conn = self._conn
        # sqlite3.Connection used as a context manager commits on success and
        # rolls back on error; it does not close the connection.
        with conn:
            yield conn

    def close(self) -> None:
        """
        Close the database connection.

        Should be called when done with the ledger. Using the ledger as a
        context manager (with statement) is preferred as it handles cleanup
        automatically. Calling this on an already-closed ledger is a no-op.

        Example:
            >>> ledger = CorpusLedger()
            >>> # ... use ledger ...
            >>> ledger.close()
            >>>
            >>> # Preferred: use context manager
            >>> with CorpusLedger() as ledger:
            ...     ledger.start_run(...)
        """
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "CorpusLedger":
        """Context manager entry - returns the ledger itself."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Any,
    ) -> None:
        """Context manager exit - ensures connection is closed."""
        self.close()

    def start_run(
        self,
        extractor_tool: str,
        extractor_version: Optional[str] = None,
        pyphen_lang: Optional[str] = None,
        auto_lang_detected: Optional[str] = None,
        min_len: Optional[int] = None,
        max_len: Optional[int] = None,
        recursive: bool = False,
        pattern: Optional[str] = None,
        command_line: Optional[str] = None,
        notes: Optional[str] = None,
    ) -> int:
        """
        Record the start of a new extraction run.

        Creates a new run record with status='running' and returns the run ID.
        The caller should use this ID to record outputs and eventually mark
        the run complete or failed. The row is stamped with the current UTC
        time (ISO 8601) and this machine's hostname.

        Args:
            extractor_tool: Name of the extraction tool (e.g., 'syllable_extractor',
                'syllable_extractor_nltk', 'syllable_extractor_espeak')
            extractor_version: Version string or git SHA of the tool
            pyphen_lang: Pyphen language code (NULL for non-pyphen tools)
            auto_lang_detected: Auto-detected language code if auto-detection was used
            min_len: Minimum syllable length constraint
            max_len: Maximum syllable length constraint
            recursive: Whether source directory was processed recursively
                (stored as 1 or 0)
            pattern: File pattern filter (e.g., '*.txt')
            command_line: Full command-line invocation for reproducibility
            notes: User-provided annotations about this run

        Returns:
            Unique run ID (integer) for this extraction run

        Raises:
            RuntimeError: If the database did not report an ID for the new row
            sqlite3.Error: If the insert fails

        Example:
            >>> run_id = ledger.start_run(
            ...     extractor_tool="syllable_extractor",
            ...     extractor_version="0.2.0",
            ...     pyphen_lang="en_US",
            ...     min_len=2,
            ...     max_len=8,
            ...     command_line="python -m build_tools.syllable_extractor --file input.txt",
            ...     notes="Testing new corpus from Project Gutenberg"
            ... )
            >>> print(f"Started run {run_id}")
            Started run 42
        """
        timestamp = datetime.now(timezone.utc).isoformat()
        # Hostname tracks which machine ran this
        hostname = socket.gethostname()

        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO runs (
                    run_timestamp, extractor_tool, extractor_version, hostname,
                    status, pyphen_lang, auto_lang_detected, min_len, max_len,
                    recursive, pattern, command_line, notes
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    timestamp,
                    extractor_tool,
                    extractor_version,
                    hostname,
                    "running",  # Initial status
                    pyphen_lang,
                    auto_lang_detected,
                    min_len,
                    max_len,
                    1 if recursive else 0,
                    pattern,
                    command_line,
                    notes,
                ),
            )
            run_id = cursor.lastrowid

        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if run_id is None:
            raise RuntimeError("Failed to get run_id from database")
        return run_id

    def record_output(
        self,
        run_id: int,
        output_path: Path,
        syllable_count: Optional[int] = None,
        unique_syllable_count: Optional[int] = None,
        meta_path: Optional[Path] = None,
    ) -> None:
        """
        Record an output file for a run.

        Associates an output .syllables file with an extraction run. Multiple
        outputs can be recorded for a single run (e.g., batch processing).

        Note: Paths are stored in POSIX format (forward slashes) for cross-platform
        compatibility.

        Args:
            run_id: Run ID from start_run()
            output_path: Path to generated .syllables file
            syllable_count: Total number of syllables (including duplicates)
            unique_syllable_count: Number of unique syllables
            meta_path: Path to corresponding .meta file (if generated)

        Example:
            >>> ledger.record_output(
            ...     run_id=42,
            ...     output_path=Path("data/raw/en_US/corpus_v1.syllables"),
            ...     syllable_count=5432,
            ...     unique_syllable_count=1234,
            ...     meta_path=Path("data/raw/en_US/corpus_v1.meta")
            ... )
        """
        stored_meta = meta_path.as_posix() if meta_path else None

        with self._get_connection() as conn:
            conn.cursor().execute(
                """
                INSERT INTO outputs (
                    run_id, output_path, syllable_count,
                    unique_syllable_count, meta_path
                ) VALUES (?, ?, ?, ?, ?)
                """,
                (
                    run_id,
                    output_path.as_posix(),
                    syllable_count,
                    unique_syllable_count,
                    stored_meta,
                ),
            )

    def complete_run(
        self,
        run_id: int,
        exit_code: int,
        status: str = "completed",
    ) -> None:
        """
        Mark a run as complete or failed.

        Updates the run status and exit code. This should be called when
        extraction finishes, whether successfully or with errors. If no run
        with ``run_id`` exists, the update matches no rows and nothing changes.

        Args:
            run_id: Run ID from start_run()
            exit_code: Unix exit code (0 = success, non-zero = failure)
            status: Final status - one of 'completed', 'failed', 'interrupted'

        Raises:
            ValueError: If status is not a valid value

        Example:
            >>> # Successful run
            >>> ledger.complete_run(run_id, exit_code=0, status="completed")
            >>>
            >>> # Failed run
            >>> ledger.complete_run(run_id, exit_code=1, status="failed")
        """
        valid_statuses = {"completed", "failed", "interrupted"}
        if status not in valid_statuses:
            raise ValueError(f"Invalid status '{status}'. Must be one of {valid_statuses}")

        with self._get_connection() as conn:
            conn.cursor().execute(
                """
                UPDATE runs
                SET exit_code = ?, status = ?
                WHERE id = ?
                """,
                (exit_code, status, run_id),
            )

    def get_run(self, run_id: int) -> Optional[dict[str, Any]]:
        """
        Get details for a specific run.

        Args:
            run_id: Run ID to fetch

        Returns:
            Dictionary with run details, or None if run_id doesn't exist

        Example:
            >>> run = ledger.get_run(42)
            >>> if run:
            ...     print(f"Tool: {run['extractor_tool']}")
            ...     print(f"Status: {run['status']}")
            ...     print(f"Command: {run['command_line']}")
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM runs WHERE id = ?", (run_id,))
            row = cursor.fetchone()
        return dict(row) if row else None

    def get_recent_runs(self, limit: int = 10) -> list[dict[str, Any]]:
        """
        Get the most recent extraction runs.

        Args:
            limit: Maximum number of runs to return (default: 10)

        Returns:
            List of run dictionaries, ordered by timestamp descending (newest
            first). Runs sharing a timestamp are ordered by ID descending so
            the result is deterministic.

        Example:
            >>> recent = ledger.get_recent_runs(limit=5)
            >>> for run in recent:
            ...     print(f"{run['run_timestamp']}: {run['extractor_tool']}")
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT * FROM runs
                ORDER BY run_timestamp DESC, id DESC
                LIMIT ?
                """,
                (limit,),
            )
            rows = cursor.fetchall()
        return [dict(row) for row in rows]

    def get_run_outputs(self, run_id: int) -> list[dict[str, Any]]:
        """
        Get all outputs for a run.

        Args:
            run_id: Run ID to fetch outputs for

        Returns:
            List of output dictionaries with paths and syllable counts
            (empty if the run has no recorded outputs)

        Example:
            >>> outputs = ledger.get_run_outputs(42)
            >>> for out in outputs:
            ...     print(f"Output: {out['output_path']}")
            ...     print(f"  Unique syllables: {out['unique_syllable_count']}")
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM outputs WHERE run_id = ?",
                (run_id,),
            )
            rows = cursor.fetchall()
        return [dict(row) for row in rows]

    def find_run_by_output(self, output_path: Path) -> Optional[dict[str, Any]]:
        """
        Find which run produced a specific output file.

        This is the "reverse lookup" - given a .syllables file, find out
        how it was created. The path is compared in POSIX form, exactly as
        ``record_output()`` stores it. When several runs recorded the same
        output path, the most recent run is returned, since that is the run
        that last wrote the file.

        Args:
            output_path: Path to .syllables file to search for

        Returns:
            Run dictionary if found, None otherwise

        Example:
            >>> run = ledger.find_run_by_output(Path("data/raw/en_US/corpus_v1.syllables"))
            >>> if run:
            ...     print(f"Created by: {run['command_line']}")
            ...     print(f"On: {run['run_timestamp']}")
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT r.* FROM runs r
                JOIN outputs o ON r.id = o.run_id
                WHERE o.output_path = ?
                ORDER BY r.run_timestamp DESC, r.id DESC
                LIMIT 1
                """,
                (output_path.as_posix(),),
            )
            row = cursor.fetchone()
        return dict(row) if row else None

    def get_stats(self) -> dict[str, Any]:
        """
        Get overall ledger statistics.

        Returns summary stats about all recorded runs, useful for understanding
        build history at a glance.

        Returns:
            Dictionary with statistics:
                - total_runs: Total number of runs recorded
                - completed_runs: Runs with status='completed'
                - failed_runs: Runs with status='failed'
                - tools_used: Set of unique extractor tools
                - languages_used: Set of unique pyphen language codes
                  (NULL languages are excluded)

        Example:
            >>> stats = ledger.get_stats()
            >>> print(f"Total runs: {stats['total_runs']}")
            >>> if stats['total_runs']:
            ...     rate = stats['completed_runs'] / stats['total_runs'] * 100
            ...     print(f"Success rate: {rate:.1f}%")
            >>> print(f"Tools: {', '.join(stats['tools_used'])}")
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()

            # COUNT(expr) only counts non-NULL values, so each CASE counts the
            # rows with that status; all three come from one table scan.
            cursor.execute(
                """
                SELECT
                    COUNT(*),
                    COUNT(CASE WHEN status = 'completed' THEN 1 END),
                    COUNT(CASE WHEN status = 'failed' THEN 1 END)
                FROM runs
                """
            )
            total_runs, completed_runs, failed_runs = cursor.fetchone()

            cursor.execute("SELECT DISTINCT extractor_tool FROM runs")
            tools_used = {row[0] for row in cursor.fetchall()}

            cursor.execute("SELECT DISTINCT pyphen_lang FROM runs WHERE pyphen_lang IS NOT NULL")
            languages_used = {row[0] for row in cursor.fetchall()}

        return {
            "total_runs": total_runs,
            "completed_runs": completed_runs,
            "failed_runs": failed_runs,
            "tools_used": tools_used,
            "languages_used": languages_used,
        }