build_tools.pipeline_tui.services.pipeline
Pipeline execution service for Pipeline TUI.
This module provides subprocess-based execution of the syllable extraction pipeline stages (extraction, normalization, annotation) with progress monitoring, cancellation support, and log capture.
Design Principles:
- Non-blocking execution via asyncio subprocess
- Cancellation support via process termination
- Real-time log capture and progress updates
- Clean error handling with informative messages
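The first and third principles can be illustrated with a minimal sketch that uses only the standard library. The names `run_and_capture` and `demo` are hypothetical and are not part of this module; the child command is a stand-in for a real pipeline stage.

```python
import asyncio
import sys
from typing import Callable, Optional


async def run_and_capture(
    args: list[str],
    on_log: Optional[Callable[[str], None]] = None,
) -> tuple[int, list[str]]:
    """Run a command without blocking the event loop and collect its output lines."""
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # merge stderr into stdout for simplicity
    )
    assert process.stdout is not None
    lines: list[str] = []
    # Read line by line so a UI can show output while the process is still running.
    async for raw in process.stdout:
        line = raw.decode(errors="replace").rstrip()
        lines.append(line)
        if on_log is not None:
            on_log(line)
    return_code = await process.wait()
    return return_code, lines


def demo() -> tuple[int, list[str]]:
    # Stand-in child process; a real stage would invoke a pipeline command instead.
    cmd = [sys.executable, "-c", "print('step 1'); print('step 2')"]
    return asyncio.run(run_and_capture(cmd))
```

Because each line is awaited from the stream, other coroutines (for example a TUI event loop) keep running while the child process works.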
Usage Example:
>>> from pathlib import Path
>>> from build_tools.pipeline_tui.services.pipeline import PipelineExecutor
>>> from build_tools.pipeline_tui.core.state import ExtractionConfig, ExtractorType
>>>
>>> config = ExtractionConfig(
...     extractor_type=ExtractorType.PYPHEN,
...     source_path=Path("/data/corpus"),
...     output_dir=Path("_working/output"),
...     language="en_US",
... )
>>>
>>> executor = PipelineExecutor()
>>> result = await executor.run_pipeline(
...     config=config,
...     run_normalize=True,
...     run_annotate=True,
...     on_progress=lambda stage, pct, msg: print(f"{stage}: {pct}% - {msg}"),
... )
Attributes
ProgressCallback
Classes
StageResult | Result from executing a single pipeline stage.
PipelineResult | Result from executing the full pipeline.
PipelineExecutor | Executes pipeline stages as subprocesses with progress monitoring.
Module Contents
- build_tools.pipeline_tui.services.pipeline.ProgressCallback
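The page gives no definition for `ProgressCallback`. Judging from the usage example (`lambda stage, pct, msg: ...`), it is probably a callable taking a stage name, a percentage, and a message. The alias below is an assumption for illustration, including the `int` type for the percentage, and `make_recorder` is a hypothetical helper.

```python
from typing import Callable

# Assumed shape: (stage name, percent complete, human-readable message) -> None.
ProgressCallback = Callable[[str, int, str], None]


def make_recorder(sink: list[str]) -> ProgressCallback:
    """Return a callback that appends formatted progress lines to ``sink``."""

    def record(stage: str, percent: int, message: str) -> None:
        sink.append(f"{stage}: {percent}% - {message}")

    return record
```

A recorder like this is handy in tests, where printing progress is less useful than asserting on it.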
- class build_tools.pipeline_tui.services.pipeline.StageResult
Result from executing a single pipeline stage.
- stage
Name of the stage (extraction, normalization, annotation)
- success
Whether the stage completed successfully
- output_path
Path to the output (run directory or file)
- return_code
Process return code
- stdout
Captured standard output
- stderr
Captured standard error
- duration_seconds
How long the stage took
- error_message
Error message if stage failed
- output_path: pathlib.Path | None = None
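As a rough illustration of how these attributes might be consumed, the sketch below defines a stand-in dataclass with the documented field names. Only the `output_path` annotation and default come from this page; the remaining types and defaults are assumptions, and `describe` is a hypothetical helper.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class StageResult:
    """Stand-in mirroring the documented attributes; most types are assumed."""

    stage: str
    success: bool
    output_path: Optional[Path] = None
    return_code: int = 0
    stdout: str = ""
    stderr: str = ""
    duration_seconds: float = 0.0
    error_message: Optional[str] = None


def describe(result: StageResult) -> str:
    """Build a one-line summary suitable for a log panel."""
    if result.success:
        return f"{result.stage}: ok in {result.duration_seconds:.1f}s"
    # Prefer the explicit error message, then fall back to captured stderr.
    detail = result.error_message or result.stderr.strip() or "no details"
    return f"{result.stage}: failed (exit {result.return_code}): {detail}"
```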
- class build_tools.pipeline_tui.services.pipeline.PipelineResult
Result from executing the full pipeline.
- success
Whether all stages completed successfully
- stages
List of individual stage results
- run_directory
Path to the output run directory
- cancelled
Whether the pipeline was cancelled
- total_duration_seconds
Total pipeline duration
- stages: list[StageResult] = []
- run_directory: pathlib.Path | None = None
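A caller usually wants to turn a result into a single status line. The sketch below uses simplified stand-in dataclasses with the documented field names. It assumes the empty-list default shown for `stages` is created per instance (`default_factory`), and `first_failure` and `status_line` are hypothetical helpers.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional


@dataclass
class StageResult:
    """Reduced stand-in with only the fields this sketch needs."""

    stage: str
    success: bool
    error_message: Optional[str] = None


@dataclass
class PipelineResult:
    """Stand-in mirroring the documented attributes; types are assumed."""

    success: bool
    # A mutable default must be built per instance, hence default_factory.
    stages: list[StageResult] = field(default_factory=list)
    run_directory: Optional[Path] = None
    cancelled: bool = False
    total_duration_seconds: float = 0.0


def first_failure(result: PipelineResult) -> Optional[StageResult]:
    """Return the first stage that did not succeed, if any."""
    return next((s for s in result.stages if not s.success), None)


def status_line(result: PipelineResult) -> str:
    """Summarise a pipeline result, checking cancellation before failure."""
    if result.cancelled:
        return "cancelled"
    if result.success:
        return f"ok: {result.run_directory}"
    failed = first_failure(result)
    if failed is None:
        return "failed"
    return f"failed at {failed.stage}: {failed.error_message}"
```

Checking `cancelled` first avoids reporting a user-initiated stop as an error.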
- class build_tools.pipeline_tui.services.pipeline.PipelineExecutor
Executes pipeline stages as subprocesses with progress monitoring.
This class manages the execution of extraction, normalization, and annotation stages as separate Python subprocesses. It provides:
- Real-time stdout/stderr capture
- Progress updates via callbacks
- Cancellation support
- Clean error handling
- _current_process
Currently running subprocess (for cancellation)
- _cancelled
Flag indicating if cancellation was requested
Example
>>> executor = PipelineExecutor()
>>> result = await executor.run_pipeline(config, on_progress=callback)
>>> if result.success:
...     print(f"Output: {result.run_directory}")
Initialize the pipeline executor.
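The `_current_process` and `_cancelled` attributes suggest a cancellation pattern along the following lines. This is a sketch under assumptions, not the class's actual implementation: `CancellableRunner`, its `run` and `cancel` methods, and `demo` are hypothetical names.

```python
import asyncio
import sys
from typing import Optional


class CancellableRunner:
    """Hypothetical stand-in showing the two documented attributes in use."""

    def __init__(self) -> None:
        self._current_process: Optional[asyncio.subprocess.Process] = None
        self._cancelled = False

    async def run(self, args: list[str]) -> Optional[int]:
        """Run a command; return its exit code, or None if it was cancelled."""
        self._cancelled = False
        process = await asyncio.create_subprocess_exec(*args)
        self._current_process = process
        if self._cancelled:
            # cancel() arrived while the process was still being spawned.
            process.terminate()
        try:
            return_code = await process.wait()
        finally:
            self._current_process = None
        return None if self._cancelled else return_code

    def cancel(self) -> None:
        """Request cancellation and terminate the running process, if any."""
        self._cancelled = True
        process = self._current_process
        if process is not None and process.returncode is None:
            process.terminate()


async def demo() -> Optional[int]:
    runner = CancellableRunner()
    # Stand-in child that would sleep for 30 seconds unless terminated.
    cmd = [sys.executable, "-c", "import time; time.sleep(30)"]
    task = asyncio.create_task(runner.run(cmd))
    await asyncio.sleep(0.5)  # give the child time to start
    runner.cancel()
    return await task
```

Keeping a reference to the running process is what makes termination possible from another coroutine, and the flag lets the caller distinguish a cancelled run from a genuine failure.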
- async run_pipeline(config, run_normalize=True, run_annotate=True, on_progress=None, on_log=None)
Execute the full pipeline with configured stages.
Runs extraction, then optionally normalization and annotation. Progress is reported via callbacks for UI updates.
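The stage ordering described here can be sketched as follows. The names `select_stages`, `run_stages`, and `demo` are hypothetical. The sketch also assumes that a failed stage stops the pipeline, that the two flags are independent, and that progress is reported as a coarse per-stage percentage; none of this is confirmed by this page.

```python
import asyncio
from typing import Awaitable, Callable, Optional

StageRunner = Callable[[], Awaitable[bool]]
ProgressCallback = Callable[[str, int, str], None]  # assumed signature


def select_stages(run_normalize: bool, run_annotate: bool) -> list[str]:
    """Extraction always runs; the other stages are opt-in (flags assumed independent)."""
    names = ["extraction"]
    if run_normalize:
        names.append("normalization")
    if run_annotate:
        names.append("annotation")
    return names


async def run_stages(
    stages: list[tuple[str, StageRunner]],
    on_progress: Optional[ProgressCallback] = None,
) -> tuple[bool, list[str]]:
    """Run stages in order; stop at the first failure (assumed behaviour)."""
    completed: list[str] = []
    total = len(stages)
    for index, (name, runner) in enumerate(stages):
        if on_progress is not None:
            on_progress(name, int(100 * index / total), "starting")
        if not await runner():
            return False, completed
        completed.append(name)
    if on_progress is not None and stages:
        on_progress(stages[-1][0], 100, "done")
    return True, completed


async def demo() -> tuple[bool, list[str], list[tuple[str, int]]]:
    async def ok() -> bool:
        return True

    async def fail() -> bool:
        return False

    # Stand-in runners; real stages would launch subprocesses.
    runners = {"extraction": ok, "normalization": fail, "annotation": ok}
    events: list[tuple[str, int]] = []
    plan = [(name, runners[name]) for name in select_stages(True, True)]
    success, completed = await run_stages(
        plan, lambda stage, pct, msg: events.append((stage, pct))
    )
    return success, completed, events
```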
- Parameters:
config (build_tools.pipeline_tui.core.state.ExtractionConfig) – Extraction configuration specifying source, output, etc.
run_normalize (bool) – Whether to run normalization after extraction
run_annotate (bool) – Whether to run annotation after normalization
on_progress (ProgressCallback | None) – Callback for progress updates (stage, percent, message)
on_log (Callable[[str], None] | None) – Callback for log messages
- Returns:
PipelineResult with success status and stage results
- Raises:
ValueError – If config is invalid
- Return type: