Source code for build_tools.syllable_walk_web.api.walker_session

"""Session-focused walker API helpers and handlers.

This module isolates session save/list/load behavior from ``api/walker.py``.
The extraction is mechanical: endpoint behavior and payload contracts are
preserved, while dependencies are passed as callables where needed so the
legacy wrapper functions in ``walker.py`` remain the patch/test authority.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Callable, cast

from build_tools.syllable_walk_web.api.walker_types import (
    ErrorResponse,
    ErrorWithLockResponse,
    RestorePatchArtifactsResult,
    SessionListEntry,
    SessionLoadPatchResult,
    SessionLoadResponse,
    SessionSaveResponse,
    SessionsResponse,
)
from build_tools.syllable_walk_web.state import PatchState, ServerState

EnforceActiveLockFn = Callable[[dict[str, Any], ServerState], dict[str, Any] | None]
CoerceLockHolderFn = Callable[[dict[str, Any]], tuple[str | None, str | None]]
LockConflictErrorFn = Callable[..., dict[str, Any]]
LoadCorpusFn = Callable[[dict[str, Any], ServerState], dict[str, Any]]
RestorePatchArtifactsFn = Callable[..., RestorePatchArtifactsResult]
ReadJsonObjectFn = Callable[[Path], dict[str, Any] | None]


def _build_restore_result(
    *,
    status: str,
    reason: str,
    restored: bool,
    restored_kinds: list[str],
    run_state_ipc_input_hash: str | None,
    run_state_ipc_output_hash: str | None,
) -> RestorePatchArtifactsResult:
    """Build a run-state restore result payload with consistent shape."""

    return {
        "status": status,
        "reason": reason,
        "restored": restored,
        "restored_kinds": restored_kinds,
        "run_state_ipc_input_hash": run_state_ipc_input_hash,
        "run_state_ipc_output_hash": run_state_ipc_output_hash,
    }


def _patch_load_failure_result(
    *,
    verification_status: str,
    verification_reason: str,
    run_id: str | None = None,
) -> SessionLoadPatchResult:
    """Build one deterministic failed patch-load result block."""

    return {
        "loaded": False,
        "restored": False,
        "verification_status": verification_status,
        "verification_reason": verification_reason,
        "run_id": run_id,
    }


[docs] def handle_save_session( body: dict[str, Any], state: ServerState, *, enforce_active_session_lock_fn: EnforceActiveLockFn, ) -> SessionSaveResponse | ErrorResponse: """Handle POST ``/api/walker/save-session`` with injected lock enforcement.""" lock_error = enforce_active_session_lock_fn(body, state) if lock_error is not None: return cast(ErrorWithLockResponse, lock_error) from build_tools.syllable_walk_web.services.session_paths import resolve_sessions_base from build_tools.syllable_walk_web.services.walker_session_store import save_session label = body.get("label") session_id = body.get("session_id") repair_from_session_id = body.get("repair_from_session_id") if label is not None and not isinstance(label, str): return {"error": "label must be a string or null."} if session_id is not None and not isinstance(session_id, str): return {"error": "session_id must be a string or null."} if repair_from_session_id is not None and not isinstance(repair_from_session_id, str): return {"error": "repair_from_session_id must be a string or null."} try: result = save_session( state=state, label=label, session_id=session_id, repair_from_session_id=repair_from_session_id, ) except Exception as e: return {"error": f"Session save failed: {e}"} return { "status": result.status, "reason": result.reason, "session_id": result.session_id, "session_path": str(result.session_path) if isinstance(result.session_path, Path) else None, "sessions_base": str( resolve_sessions_base( output_base=state.output_base, configured_sessions_base=state.sessions_base, ) ), "patch_a": { "status": result.patch_a_status, "reason": result.patch_a_reason, }, "patch_b": { "status": result.patch_b_status, "reason": result.patch_b_reason, }, "ipc_input_hash": result.ipc_input_hash, "ipc_output_hash": result.ipc_output_hash, "root_session_id": result.root_session_id, "parent_session_id": result.parent_session_id, "revision": result.revision, }
[docs] def handle_sessions(state: ServerState) -> SessionsResponse | ErrorResponse: """Handle GET ``/api/walker/sessions``.""" from build_tools.syllable_walk_web.services.walker_session_lock import get_session_lock_info from build_tools.syllable_walk_web.services.walker_session_store import list_sessions try: entries = list_sessions( output_base=state.output_base, configured_sessions_base=state.sessions_base, ) except Exception as e: return {"error": f"Session listing failed: {e}"} serialized_sessions: list[SessionListEntry] = [] for entry in entries: lock_info = get_session_lock_info( state=state, session_id=entry.session_id, ) serialized_sessions.append( { "session_id": entry.session_id, "created_at_utc": entry.created_at_utc, "label": entry.label, "patch_a_run_id": entry.patch_a_run_id, "patch_b_run_id": entry.patch_b_run_id, "verification_status": entry.verification_status, "verification_reason": entry.verification_reason, "session_path": str(entry.session_path), "root_session_id": entry.root_session_id, "parent_session_id": entry.parent_session_id, "revision": entry.revision, "lock_status": lock_info.get("status"), "lock": lock_info.get("lock"), } ) return {"sessions": serialized_sessions}
[docs] def read_json_object(path: Path) -> dict[str, Any] | None: """Read one JSON object from ``path``. Returns ``None`` on IO, decode, parse, or type failures. """ try: payload = json.loads(path.read_text(encoding="utf-8")) except (OSError, UnicodeDecodeError, json.JSONDecodeError): return None if not isinstance(payload, dict): return None return payload
[docs] def restore_patch_artifacts_from_run_state( *, patch_key: str, patch: PatchState, read_json_object_fn: ReadJsonObjectFn = read_json_object, ) -> RestorePatchArtifactsResult: """Restore patch artifacts from verified run-state sidecars.""" if not isinstance(patch.run_id, str) or not patch.run_id.strip(): return _build_restore_result( status="skipped", reason="run-state-context-missing:run-id", restored=False, restored_kinds=[], run_state_ipc_input_hash=None, run_state_ipc_output_hash=None, ) if not isinstance(patch.corpus_dir, Path): return _build_restore_result( status="skipped", reason="run-state-context-missing:run-dir", restored=False, restored_kinds=[], run_state_ipc_input_hash=None, run_state_ipc_output_hash=None, ) from build_tools.syllable_walk_web.services.walker_run_state_store import load_run_state run_state_result = load_run_state( run_dir=patch.corpus_dir, run_id=patch.run_id, manifest_ipc_output_hash=patch.manifest_ipc_output_hash, ) if run_state_result.status != "verified" or not isinstance(run_state_result.payload, dict): return _build_restore_result( status=run_state_result.status, reason=run_state_result.reason, restored=False, restored_kinds=[], run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) raw_sidecars = run_state_result.payload.get("sidecars") if not isinstance(raw_sidecars, dict): return _build_restore_result( status="mismatch", reason="run-state-sidecars-missing", restored=False, restored_kinds=[], run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) run_dir_resolved = patch.corpus_dir.resolve() restored_kinds: list[str] = [] for artifact_kind in ("walks", "candidates", "selections", "package"): slot = f"patch_{patch_key}_{artifact_kind}" ref = raw_sidecars.get(slot) if ref is None: continue if not isinstance(ref, dict): return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-ref-invalid:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) relative_path = ref.get("relative_path") if not isinstance(relative_path, str) or not relative_path: return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-relative-path-invalid:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) sidecar_path = (patch.corpus_dir / relative_path).resolve() if not str(sidecar_path).startswith(str(run_dir_resolved)): return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-path-outside-run-dir:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) if not sidecar_path.exists(): return _build_restore_result( status="missing", reason=f"run-state-sidecar-missing:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) sidecar_payload = read_json_object_fn(sidecar_path) if sidecar_payload is None: return _build_restore_result( status="error", reason=f"run-state-sidecar-parse-error:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) payload_block = sidecar_payload.get("payload") if not isinstance(payload_block, dict): return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-payload-invalid:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) if artifact_kind == "walks": walks = payload_block.get("walks") if not isinstance(walks, list): return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-walks-invalid:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) patch.walks = walks restored_kinds.append("walks") continue if artifact_kind == "candidates": candidates = payload_block.get("candidates") if not isinstance(candidates, list): return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-candidates-invalid:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) patch.candidates = candidates restored_kinds.append("candidates") continue if artifact_kind == "selections": selected_names = payload_block.get("selected_names") if not isinstance(selected_names, list): return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-selections-invalid:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) patch.selected_names = selected_names restored_kinds.append("selections") continue package_payload = payload_block.get("package") if not isinstance(package_payload, dict): return _build_restore_result( status="mismatch", reason=f"run-state-sidecar-package-invalid:{slot}", restored=False, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, ) restored_kinds.append("package") return _build_restore_result( status="verified", reason="run-state-restored", restored=len(restored_kinds) > 0, restored_kinds=restored_kinds, run_state_ipc_input_hash=run_state_result.run_state_ipc_input_hash, run_state_ipc_output_hash=run_state_result.run_state_ipc_output_hash, )
[docs] def is_stale_session_recoverable(*, status: str, reason: str | None) -> bool: """Return ``True`` for mismatch states safe to recover from raw payload.""" if status != "mismatch" or not isinstance(reason, str): return False return reason.endswith("run-state-output-hash-mismatch")
[docs] def handle_load_session( body: dict[str, Any], state: ServerState, *, coerce_lock_holder_id_fn: CoerceLockHolderFn, lock_conflict_error_fn: LockConflictErrorFn, handle_load_corpus_fn: LoadCorpusFn, restore_patch_artifacts_from_run_state_fn: RestorePatchArtifactsFn, read_json_object_fn: ReadJsonObjectFn, ) -> SessionLoadResponse | ErrorResponse | ErrorWithLockResponse: """Handle POST ``/api/walker/load-session`` with injected dependencies.""" from build_tools.syllable_walk_web.services.walker_session_store import load_session raw_session_id = body.get("session_id") if not isinstance(raw_session_id, str) or not raw_session_id.strip(): return {"error": "Missing or invalid session_id."} session_id = raw_session_id.strip() lock_holder_id, lock_holder_error = coerce_lock_holder_id_fn(body) if lock_holder_error is not None: return {"error": lock_holder_error} force_lock = bool(body.get("force_lock", False)) lock_result: dict[str, Any] | None = None if isinstance(lock_holder_id, str): from build_tools.syllable_walk_web.services.walker_session_lock import acquire_session_lock lock_result = acquire_session_lock( state=state, session_id=session_id, holder_id=lock_holder_id, force=force_lock, ) lock_status = lock_result.get("status") if lock_status == "locked": return cast( ErrorWithLockResponse, lock_conflict_error_fn( active_session_id=session_id, lock_payload=( lock_result.get("lock") if isinstance(lock_result.get("lock"), dict) else None ), ), ) if lock_status not in {"acquired", "held", "taken_over"}: return { "error": f"Failed to acquire session lock: {lock_result.get('reason', 'unknown')}" } try: result = load_session( session_id=session_id, output_base=state.output_base, configured_sessions_base=state.sessions_base, ) except Exception as e: return {"error": f"Session load failed: {e}"} payload: dict[str, Any] | None = result.payload if isinstance(result.payload, dict) else None recovered_from_stale_session = False if payload is None and is_stale_session_recoverable(status=result.status, reason=result.reason): candidate_path = getattr(result, "session_path", None) if isinstance(candidate_path, Path): recovered_payload = read_json_object_fn(candidate_path) if isinstance(recovered_payload, dict): payload = recovered_payload recovered_from_stale_session = True if payload is None: if isinstance(lock_holder_id, str): from build_tools.syllable_walk_web.services.walker_session_lock import ( release_session_lock, ) release_session_lock( state=state, session_id=session_id, holder_id=lock_holder_id, ) return { "status": result.status, "reason": result.reason, "session_id": result.session_id or session_id, "ipc_input_hash": result.ipc_input_hash, "ipc_output_hash": result.ipc_output_hash, "patch_a": { "loaded": False, "restored": False, "verification_status": result.status, "verification_reason": result.reason, }, "patch_b": { "loaded": False, "restored": False, "verification_status": result.status, "verification_reason": result.reason, }, } patch_load_results: dict[str, SessionLoadPatchResult] = {} for patch_key in ("a", "b"): patch_ref = payload.get(f"patch_{patch_key}") if patch_ref is None: patch_load_results[patch_key] = _patch_load_failure_result( verification_status="missing", verification_reason=f"session-patch-{patch_key}-absent", ) continue if not isinstance(patch_ref, dict): patch_load_results[patch_key] = _patch_load_failure_result( verification_status="mismatch", verification_reason=f"session-patch-{patch_key}-invalid", ) continue run_id = patch_ref.get("run_id") if not isinstance(run_id, str) or not run_id.strip(): patch_load_results[patch_key] = _patch_load_failure_result( verification_status="mismatch", verification_reason=f"session-patch-{patch_key}-run-id-missing", ) continue load_result = handle_load_corpus_fn( { "patch": patch_key, "run_id": run_id, "_internal_session_load": True, "lock_holder_id": lock_holder_id, }, state, ) if "error" in load_result: patch_load_results[patch_key] = _patch_load_failure_result( verification_status="error", verification_reason=str(load_result["error"]), run_id=run_id, ) continue patch_state = state.patch_a if patch_key == "a" else state.patch_b restore_result = restore_patch_artifacts_from_run_state_fn( patch_key=patch_key, patch=patch_state, ) verification_status = "verified" verification_reason = "session-load-started" restored = False if restore_result["status"] == "verified": verification_reason = str(restore_result["reason"]) restored = bool(restore_result["restored"]) elif restore_result["status"] == "skipped": verification_reason = str(restore_result["reason"]) else: verification_status = str(restore_result["status"]) verification_reason = str(restore_result["reason"]) patch_load_results[patch_key] = { "loaded": True, "restored": restored, "restored_kinds": list(restore_result["restored_kinds"]), "verification_status": verification_status, "verification_reason": verification_reason, "run_id": run_id, "status": load_result.get("status"), "source": load_result.get("source"), "syllable_count": load_result.get("syllable_count"), "run_state_ipc_input_hash": restore_result["run_state_ipc_input_hash"], "run_state_ipc_output_hash": restore_result["run_state_ipc_output_hash"], } loaded_session_id = ( payload.get("session_id", session_id) if isinstance(payload.get("session_id", session_id), str) else session_id ) state.active_session_id = loaded_session_id state.active_session_lock_holder_id = lock_holder_id lock_status_value = ( str(lock_result.get("status")) if isinstance(lock_result, dict) and isinstance(lock_result.get("status"), str) else "unlocked" ) lock_reason_value = ( str(lock_result.get("reason")) if isinstance(lock_result, dict) and isinstance(lock_result.get("reason"), str) else "no-lock-holder" ) return { "status": result.status if recovered_from_stale_session else "verified", "reason": result.reason if recovered_from_stale_session else "verified", "session_id": loaded_session_id, "ipc_input_hash": result.ipc_input_hash, "ipc_output_hash": result.ipc_output_hash, "recovered_from_stale_session": recovered_from_stale_session, "session_lock": { "status": lock_status_value, "reason": lock_reason_value, "lock": ( lock_result.get("lock") if isinstance(lock_result, dict) and isinstance(lock_result.get("lock"), dict) else None ), }, "patch_a": patch_load_results["a"], "patch_b": patch_load_results["b"], }