"""Run directory discovery for the syllable-walk web pipeline history.
History discovery is manifest-driven: a run is discoverable only when
``manifest.json`` exists and is parseable. This keeps the run directory itself as
the single source of truth and avoids legacy text-file parsing heuristics.
"""
from __future__ import annotations
import json
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
[docs]
@dataclass
class RunInfo:
"""Metadata about one manifest-backed pipeline run directory.
Attributes:
path: Absolute path to the run directory
run_id: Canonical run identifier (matches directory name)
extractor_type: Type of extractor ("nltk" or "pyphen")
timestamp: Run timestamp in YYYYMMDD_HHMMSS format
display_name: Human-readable display name
corpus_db_path: Path to corpus.db artifact if present and exists
annotated_json_path: Path to annotated JSON artifact if present and exists
syllable_count: Number of unique syllables from manifest metrics
selections: Dict mapping name class to selection file path
"""
path: Path
run_id: str
extractor_type: str
timestamp: str
display_name: str
corpus_db_path: Path | None
annotated_json_path: Path | None
syllable_count: int
source_path: str | None = None
files_processed: int | None = None
processing_time: str | None = None
output_tree_lines: list[str] = field(default_factory=list)
selections: dict[str, Path] = field(default_factory=dict)
status: str = "unknown"
created_at_utc: str | None = None
completed_at_utc: str | None = None
stage_statuses: dict[str, str] = field(default_factory=dict)
ipc_input_hash: str | None = None
ipc_output_hash: str | None = None
[docs]
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization.
Returns:
Dictionary with all run metadata
"""
return {
"path": str(self.path),
"run_id": self.run_id,
"extractor_type": self.extractor_type,
"timestamp": self.timestamp,
"display_name": self.display_name,
"corpus_db_path": str(self.corpus_db_path) if self.corpus_db_path else None,
"annotated_json_path": (
str(self.annotated_json_path) if self.annotated_json_path else None
),
"syllable_count": self.syllable_count,
"source_path": self.source_path,
"files_processed": self.files_processed,
"processing_time": self.processing_time,
"output_tree_lines": self.output_tree_lines,
"selections": {k: str(v) for k, v in self.selections.items()},
"selection_count": len(self.selections),
"status": self.status,
"created_at_utc": self.created_at_utc,
"completed_at_utc": self.completed_at_utc,
"stage_statuses": self.stage_statuses,
"ipc_input_hash": self.ipc_input_hash,
"ipc_output_hash": self.ipc_output_hash,
}
_TIMESTAMP_RUN_RE = re.compile(r"^(\d{8}_\d{6})_(.+)$")
def _load_manifest(run_dir: Path) -> dict[str, Any] | None:
"""Load ``manifest.json`` for one run directory.
Returns ``None`` when the manifest file is missing or malformed.
"""
manifest_path = run_dir / "manifest.json"
if not manifest_path.exists():
return None
try:
raw = manifest_path.read_text(encoding="utf-8")
payload = json.loads(raw)
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
return None
return payload if isinstance(payload, dict) else None
def _looks_like_run_directory_name(name: str) -> bool:
"""Return True when folder name follows ``YYYYMMDD_HHMMSS_<extractor>``."""
return _TIMESTAMP_RUN_RE.match(name) is not None
def _manifest_has_required_keys(manifest: dict[str, Any]) -> bool:
"""Validate required manifest structure used by the History API."""
required = (
"manifest_version",
"run_id",
"status",
"extractor",
"config",
"metrics",
"stages",
"artifacts",
)
if any(key not in manifest for key in required):
return False
if not isinstance(manifest.get("config"), dict):
return False
if not isinstance(manifest.get("metrics"), dict):
return False
if not isinstance(manifest.get("stages"), list):
return False
if not isinstance(manifest.get("artifacts"), list):
return False
return True
def _parse_timestamp(timestamp_str: str) -> datetime | None:
"""Parse timestamp string to datetime.
Args:
timestamp_str: Timestamp in YYYYMMDD_HHMMSS format
Returns:
Datetime object or None if parsing fails
"""
try:
return datetime.strptime(timestamp_str, "%Y%m%d_%H%M%S")
except ValueError:
return None
def _parse_iso_utc(timestamp: str | None) -> datetime | None:
"""Parse ``YYYY-MM-DDTHH:MM:SSZ`` timestamp into aware UTC datetime."""
if not timestamp or not isinstance(timestamp, str):
return None
try:
parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
return None
return parsed.replace(tzinfo=UTC)
def _format_processing_time(manifest: dict[str, Any]) -> str | None:
"""Build a history-friendly duration string from manifest timestamps.
Prefers run-level ``created_at_utc``/``completed_at_utc`` and falls back to
summing stage durations when a complete run-level interval is unavailable.
"""
created = _parse_iso_utc(manifest.get("created_at_utc"))
completed = _parse_iso_utc(manifest.get("completed_at_utc"))
if created and completed:
seconds = max((completed - created).total_seconds(), 0.0)
return f"{seconds:.2f}s"
durations = [
stage.get("duration_seconds")
for stage in manifest.get("stages", [])
if isinstance(stage, dict)
]
numeric = [value for value in durations if isinstance(value, (int, float))]
if numeric:
return f"{sum(float(v) for v in numeric):.2f}s"
return None
def _extract_artifact_paths(manifest: dict[str, Any]) -> tuple[str | None, str | None]:
"""Extract canonical corpus DB and annotated JSON artifact paths."""
corpus_rel: str | None = None
annotated_rel: str | None = None
for artifact in manifest.get("artifacts", []):
if not isinstance(artifact, dict):
continue
rel_path = artifact.get("path")
if not isinstance(rel_path, str):
continue
if rel_path == "data/corpus.db":
corpus_rel = rel_path
if rel_path.startswith("data/") and rel_path.endswith("_syllables_annotated.json"):
if annotated_rel is None:
annotated_rel = rel_path
return corpus_rel, annotated_rel
def _build_stage_statuses(manifest: dict[str, Any]) -> dict[str, str]:
"""Return stage status map keyed by stage name."""
stage_statuses: dict[str, str] = {}
for stage in manifest.get("stages", []):
if not isinstance(stage, dict):
continue
name = stage.get("name")
status = stage.get("status")
if isinstance(name, str) and isinstance(status, str):
stage_statuses[name] = status
return stage_statuses
def _format_display_name(
folder_name: str, extractor_type: str, syllable_count: int, selection_count: int
) -> str:
"""Format a human-readable display name for a run.
Uses the actual folder name for clarity, with syllable and selection counts.
Args:
folder_name: The actual directory name (e.g., "20260121_084017_nltk")
extractor_type: Extractor type (nltk, pyphen)
syllable_count: Number of syllables
selection_count: Number of selection files
Returns:
Formatted display name showing folder name and counts
"""
sel_info = f", {selection_count} selections" if selection_count > 0 else ""
return f"{folder_name} ({syllable_count:,} syllables{sel_info})"
def _discover_selections(run_dir: Path, extractor_type: str) -> dict[str, Path]:
"""Discover selection files in a run directory.
Args:
run_dir: Path to run directory
extractor_type: Extractor type for filename prefix
Returns:
Dict mapping name class (e.g., "first_name") to file path
"""
selections_dir = run_dir / "selections"
if not selections_dir.exists():
return {}
selections = {}
prefix = f"{extractor_type}_"
# Selection files follow the naming convention:
# {extractor}_{name_class}_{N}syl.json
# e.g. "nltk_first_name_2syl.json".
for json_file in selections_dir.glob(f"{prefix}*_*.json"):
filename = json_file.stem # e.g. "nltk_first_name_2syl"
if filename.endswith("_meta"):
continue
# Strip the extractor prefix to isolate the name class + syllable
# count portion (e.g. "first_name_2syl").
name_part = filename[len(prefix) :]
# rsplit("_", 1) splits from the right to handle compound name
# classes like "first_name" — splitting from the left would break
# on the underscore within the class name.
parts = name_part.rsplit("_", 1) # ["first_name", "2syl"]
if len(parts) == 2 and parts[1].endswith("syl"):
name_class = parts[0] # e.g., "first_name"
selections[name_class] = json_file
return selections
def _build_output_tree_lines(
run_name: str,
artifacts: list[dict[str, Any]],
syllable_count: int,
) -> list[str]:
"""Build a deterministic compact tree from manifest artifact paths."""
lines: list[str] = [f"{run_name}/"]
normalized_paths: list[str] = []
for artifact in artifacts:
if not isinstance(artifact, dict):
continue
rel_path = artifact.get("path")
if isinstance(rel_path, str):
normalized_paths.append(rel_path)
unique_paths = sorted(set(normalized_paths))
for idx, rel_path in enumerate(unique_paths):
connector = "└── " if idx == len(unique_paths) - 1 else "├── "
note = ""
if rel_path == "data/corpus.db":
note = f" {syllable_count:,} syllables"
elif rel_path.startswith("data/") and rel_path.endswith("_syllables_annotated.json"):
note = " annotated data"
lines.append(f"{connector}{rel_path}{note}")
return lines
[docs]
def discover_runs(base_path: Path | None = None) -> list[RunInfo]:
"""Discover all pipeline run directories.
Scans _working/output/ (or specified base path) for directories matching
the pattern YYYYMMDD_HHMMSS_{extractor}. Returns metadata for all valid
runs found, sorted by timestamp (newest first).
Args:
base_path: Directory to scan. Default: _working/output/
Returns:
List of RunInfo objects, sorted by timestamp (newest first)
Examples:
>>> runs = discover_runs()
>>> len(runs)
2
>>> runs[0].extractor_type
'nltk'
"""
if base_path is None:
base_path = Path("_working/output")
if not base_path.exists():
return []
runs = []
for run_dir in base_path.iterdir():
if not run_dir.is_dir():
continue
dir_name = run_dir.name
if not _looks_like_run_directory_name(dir_name):
continue
manifest = _load_manifest(run_dir)
if manifest is None or not _manifest_has_required_keys(manifest):
continue
run_id = manifest.get("run_id")
if not isinstance(run_id, str) or run_id != dir_name:
continue
extractor_type = manifest.get("extractor")
if not isinstance(extractor_type, str) or not extractor_type:
continue
timestamp_match = _TIMESTAMP_RUN_RE.match(run_id)
if timestamp_match is None:
continue
timestamp = timestamp_match.group(1)
metrics = manifest.get("metrics", {})
config = manifest.get("config", {})
syllable_count = metrics.get("syllable_count_unique")
if not isinstance(syllable_count, int) or syllable_count < 0:
syllable_count = 0
source_path = config.get("source_path")
if not isinstance(source_path, str):
source_path = None
files_processed = metrics.get("files_processed")
if not isinstance(files_processed, int):
files_processed = None
processing_time = _format_processing_time(manifest)
selections = _discover_selections(run_dir, extractor_type)
artifacts = manifest.get("artifacts", [])
output_tree_lines = _build_output_tree_lines(run_id, artifacts, syllable_count)
corpus_rel, annotated_rel = _extract_artifact_paths(manifest)
corpus_db_path = (run_dir / corpus_rel) if corpus_rel else None
if corpus_db_path and not corpus_db_path.exists():
corpus_db_path = None
annotated_json_path = (run_dir / annotated_rel) if annotated_rel else None
if annotated_json_path and not annotated_json_path.exists():
annotated_json_path = None
display_name = _format_display_name(
dir_name, extractor_type, syllable_count, len(selections)
)
ipc = manifest.get("ipc", {})
ipc_input_hash = ipc.get("input_hash") if isinstance(ipc, dict) else None
ipc_output_hash = ipc.get("output_hash") if isinstance(ipc, dict) else None
if not isinstance(ipc_input_hash, str):
ipc_input_hash = None
if not isinstance(ipc_output_hash, str):
ipc_output_hash = None
runs.append(
RunInfo(
path=run_dir.resolve(),
run_id=run_id,
extractor_type=extractor_type,
timestamp=timestamp,
display_name=display_name,
corpus_db_path=corpus_db_path.resolve() if corpus_db_path else None,
annotated_json_path=(
annotated_json_path.resolve() if annotated_json_path else None
),
syllable_count=syllable_count,
source_path=source_path,
files_processed=files_processed,
processing_time=processing_time,
output_tree_lines=output_tree_lines,
selections=selections,
status=str(manifest.get("status", "unknown")),
created_at_utc=(
manifest.get("created_at_utc")
if isinstance(manifest.get("created_at_utc"), str)
else None
),
completed_at_utc=(
manifest.get("completed_at_utc")
if isinstance(manifest.get("completed_at_utc"), str)
else None
),
stage_statuses=_build_stage_statuses(manifest),
ipc_input_hash=ipc_input_hash,
ipc_output_hash=ipc_output_hash,
)
)
# Deterministic ordering:
# 1) timestamp descending (newest first)
# 2) folder name ascending when timestamps match
runs.sort(key=lambda r: r.path.name)
runs.sort(key=lambda r: r.timestamp, reverse=True)
return runs
[docs]
def get_selection_data(selection_path: Path) -> dict:
"""Load selection data from a JSON file.
Args:
selection_path: Path to selection JSON file
Returns:
Dictionary with metadata and selections list
Raises:
FileNotFoundError: If file doesn't exist
json.JSONDecodeError: If file is not valid JSON
"""
with open(selection_path, encoding="utf-8") as f:
data: dict = json.load(f)
return data
[docs]
def get_run_by_id(run_id: str, base_path: Path | None = None) -> RunInfo | None:
"""Get a specific run by its directory name.
Args:
run_id: Run directory name (e.g., "20260121_084017_nltk")
base_path: Base path to search. Default: _working/output/
Returns:
RunInfo for the run, or None if not found
"""
runs = discover_runs(base_path)
for run in runs:
if run.run_id == run_id:
return run
return None