build_tools.pipeline_tui.services.pipeline

Pipeline execution service for Pipeline TUI.

This module provides subprocess-based execution of the syllable extraction pipeline stages (extraction, normalization, annotation) with progress monitoring, cancellation support, and log capture.

Design Principles:

  • Non-blocking execution via asyncio subprocess

  • Cancellation support via process termination

  • Real-time log capture and progress updates

  • Clean error handling with informative messages
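The first three principles can be illustrated with a minimal sketch built only on the standard library. This is not the module's implementation: `run_and_capture` and its `on_log` parameter are hypothetical names chosen for illustration, and merging stderr into stdout is a simplification made here.

```python
import asyncio
import sys


async def run_and_capture(args, on_log=None):
    """Run a command without blocking the event loop and stream its output.

    Hypothetical helper for illustration; not part of the documented API.
    """
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # simplification: merge stderr into stdout
    )
    lines = []
    # Iterating the stream yields one line at a time as the child writes it,
    # so each log line can be forwarded while the process is still running.
    async for raw in proc.stdout:
        line = raw.decode(errors="replace").rstrip()
        lines.append(line)
        if on_log is not None:
            on_log(line)
    return_code = await proc.wait()
    return return_code, lines
```

Because the coroutine awaits on both the output stream and process exit, other tasks (such as a UI event loop) keep running while the child process works.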

Usage Example:

>>> from pathlib import Path
>>> from build_tools.pipeline_tui.services.pipeline import PipelineExecutor
>>> from build_tools.pipeline_tui.core.state import ExtractionConfig, ExtractorType
>>>
>>> config = ExtractionConfig(
...     extractor_type=ExtractorType.PYPHEN,
...     source_path=Path("/data/corpus"),
...     output_dir=Path("_working/output"),
...     language="en_US",
... )
>>>
>>> # Run inside a coroutine (or an asyncio-aware REPL), since run_pipeline is awaited.
>>> executor = PipelineExecutor()
>>> result = await executor.run_pipeline(
...     config=config,
...     run_normalize=True,
...     run_annotate=True,
...     on_progress=lambda stage, pct, msg: print(f"{stage}: {pct}% - {msg}"),
... )

Attributes

ProgressCallback

Classes

StageResult

Result from executing a single pipeline stage.

PipelineResult

Result from executing the full pipeline.

PipelineExecutor

Executes pipeline stages as subprocesses with progress monitoring.

Module Contents

build_tools.pipeline_tui.services.pipeline.ProgressCallback
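The page gives no definition for ProgressCallback. Based on the on_progress parameter described further down (stage, percent, message), a plausible shape is sketched here. The exact alias, and whether percent is an int or a float, are assumptions; `make_recorder` is a hypothetical helper for collecting updates, for example in tests.

```python
from typing import Callable, List, Tuple

# Assumed shape: (stage name, percent complete, message) -> None.
ProgressCallback = Callable[[str, int, str], None]


def make_recorder() -> Tuple[List[Tuple[str, int, str]], ProgressCallback]:
    """Return a list and a callback that appends every update to it."""
    events: List[Tuple[str, int, str]] = []

    def record(stage: str, percent: int, message: str) -> None:
        events.append((stage, percent, message))

    return events, record
```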
class build_tools.pipeline_tui.services.pipeline.StageResult

Result from executing a single pipeline stage.

stage

Name of the stage (extraction, normalization, annotation)

success

Whether the stage completed successfully

output_path

Path to the output (run directory or file)

return_code

Process return code

stdout

Captured standard output

stderr

Captured standard error

duration_seconds

How long the stage took

error_message

Error message if stage failed

stage: str
success: bool
output_path: pathlib.Path | None = None
return_code: int = 0
stdout: str = ''
stderr: str = ''
duration_seconds: float = 0.0
error_message: str = ''
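As a sketch of how these fields might fit together, the following stand-in mirrors the documented names and defaults. It assumes StageResult is a dataclass, which the page does not state. `stage_result_from_exit` is a hypothetical helper, not part of the module, showing one way to derive success and error_message from a finished process.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class StageResult:
    """Stand-in mirroring the documented fields and defaults (assumed dataclass)."""

    stage: str
    success: bool
    output_path: Optional[Path] = None
    return_code: int = 0
    stdout: str = ""
    stderr: str = ""
    duration_seconds: float = 0.0
    error_message: str = ""


def stage_result_from_exit(stage, return_code, stdout="", stderr="", duration_seconds=0.0):
    """Hypothetical helper: build a StageResult from a finished process."""
    success = return_code == 0
    error_message = ""
    if not success:
        # Use the last non-empty stderr line as a short summary, if there is one.
        tail = [line for line in stderr.splitlines() if line.strip()]
        error_message = tail[-1] if tail else f"{stage} exited with code {return_code}"
    return StageResult(
        stage=stage,
        success=success,
        return_code=return_code,
        stdout=stdout,
        stderr=stderr,
        duration_seconds=duration_seconds,
        error_message=error_message,
    )
```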
class build_tools.pipeline_tui.services.pipeline.PipelineResult

Result from executing the full pipeline.

success

Whether all stages completed successfully

stages

List of individual stage results

run_directory

Path to the output run directory

cancelled

Whether the pipeline was cancelled

total_duration_seconds

Total pipeline duration

success: bool
stages: list[StageResult] = []
run_directory: pathlib.Path | None = None
cancelled: bool = False
total_duration_seconds: float = 0.0
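The rendered default `stages = []` would be rejected by a dataclass, which requires a default factory for mutable defaults. The sketch below assumes the class is a dataclass and uses `field(default_factory=list)` accordingly. The trimmed StageResult stand-in and the `summarize` helper are hypothetical, and the aggregation rule (success only when not cancelled and every stage succeeded) is an assumption drawn from the field descriptions.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class StageResult:
    """Trimmed stand-in with only the fields this sketch needs."""

    stage: str
    success: bool
    duration_seconds: float = 0.0


@dataclass
class PipelineResult:
    """Stand-in mirroring the documented fields (assumed dataclass)."""

    success: bool
    stages: List[StageResult] = field(default_factory=list)  # fresh list per instance
    run_directory: Optional[Path] = None
    cancelled: bool = False
    total_duration_seconds: float = 0.0


def summarize(stages, cancelled=False):
    """Hypothetical helper: aggregate stage results into a PipelineResult."""
    stages = list(stages)
    return PipelineResult(
        # Assumed rule: at least one stage ran, none failed, and no cancellation.
        success=(not cancelled) and bool(stages) and all(s.success for s in stages),
        stages=stages,
        cancelled=cancelled,
        total_duration_seconds=sum(s.duration_seconds for s in stages),
    )
```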
class build_tools.pipeline_tui.services.pipeline.PipelineExecutor

Executes pipeline stages as subprocesses with progress monitoring.

This class manages the execution of extraction, normalization, and annotation stages as separate Python subprocesses. It provides:

  • Real-time stdout/stderr capture

  • Progress updates via callbacks

  • Cancellation support

  • Clean error handling

_current_process

Currently running subprocess (for cancellation)

_cancelled

Flag indicating if cancellation was requested

Example

>>> executor = PipelineExecutor()
>>> result = await executor.run_pipeline(config, on_progress=callback)
>>> if result.success:
...     print(f"Output: {result.run_directory}")

Initialize the pipeline executor.

async run_pipeline(config, run_normalize=True, run_annotate=True, on_progress=None, on_log=None)

Execute the full pipeline with configured stages.

Runs extraction, then optionally normalization and annotation. Progress is reported via callbacks for UI updates.

Parameters:
  • config (build_tools.pipeline_tui.core.state.ExtractionConfig) – Extraction configuration specifying source, output, etc.

  • run_normalize (bool) – Whether to run normalization after extraction

  • run_annotate (bool) – Whether to run annotation after normalization

  • on_progress (ProgressCallback | None) – Callback for progress updates (stage, percent, message)

  • on_log (Callable[[str], None] | None) – Callback for log messages

Returns:

PipelineResult with success status and stage results

Raises:

ValueError – If config is invalid

Return type:

PipelineResult
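The stage ordering described for run_pipeline can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: `run_stages` and `stage_runners` are hypothetical names, stopping at the first failed stage is assumed, the two flags are treated independently, and the percent values are a simple per-stage estimate.

```python
import asyncio


async def run_stages(stage_runners, run_normalize=True, run_annotate=True, on_progress=None):
    """Run the selected stages in order, stopping at the first failure.

    stage_runners maps a stage name to an async callable returning True on
    success (hypothetical stand-ins for the real subprocess stages).
    """
    selected = ["extraction"]  # extraction always runs
    if run_normalize:
        selected.append("normalization")
    if run_annotate:
        selected.append("annotation")

    completed = []
    for index, name in enumerate(selected):
        if on_progress is not None:
            # Coarse estimate: fraction of stages finished before this one starts.
            on_progress(name, int(100 * index / len(selected)), f"starting {name}")
        ok = await stage_runners[name]()
        completed.append((name, ok))
        if not ok:
            return False, completed  # assumed behavior: later stages are skipped
    if on_progress is not None:
        on_progress(selected[-1], 100, "pipeline complete")
    return True, completed
```

Passing the stages in as callables keeps the sequencing logic testable without spawning real subprocesses.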

async cancel()

Cancel the currently running pipeline.

Terminates the current subprocess if one is running.
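One common way to terminate an asyncio subprocess is sketched below. Whether cancel() follows this pattern is not stated on this page. `terminate_process` and `grace_seconds` are hypothetical names, and the escalation from terminate() to kill() after a grace period is an assumption added for robustness.

```python
import asyncio
import sys


async def terminate_process(proc, grace_seconds=5.0):
    """Ask a subprocess to stop, then force-kill it if it does not exit in time."""
    if proc.returncode is not None:
        return proc.returncode  # already exited, nothing to do
    proc.terminate()  # polite request (SIGTERM on POSIX)
    try:
        return await asyncio.wait_for(proc.wait(), timeout=grace_seconds)
    except asyncio.TimeoutError:
        proc.kill()  # the process ignored the request, so force it
        return await proc.wait()


async def demo():
    """Start a long-running child process and cancel it immediately."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)"
    )
    return await terminate_process(proc)
```

Awaiting proc.wait() after termination reaps the child, so no zombie process is left behind.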