Source code for build_tools.syllable_walk_web.api.walker_cache_lock

"""Cache rebuild and lock endpoint helpers for walker API.

This module isolates operational handlers that manage:
- profile reach cache rebuild requests
- session lock heartbeat/release requests

The extraction is mechanical and behavior-preserving. ``api/walker.py`` keeps
public wrapper names and delegates into these functions.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Callable, cast

from build_tools.syllable_walk_web.api.walker_types import (
    ErrorResponse,
    ErrorWithLockResponse,
    RebuildReachCacheResponse,
    SessionLockStatusResponse,
)
from build_tools.syllable_walk_web.state import PatchState, ServerState

EnforceActiveLockFn = Callable[[dict[str, Any], ServerState], dict[str, Any] | None]
ResolvePatchStateFn = Callable[[dict[str, Any], ServerState], tuple[str, PatchState] | None]
CoerceLockHolderFn = Callable[[dict[str, Any]], tuple[str | None, str | None]]
IsSha256HexFn = Callable[[Any], bool]


def _coerce_lock_request(
    body: dict[str, Any],
    *,
    coerce_lock_holder_id_fn: CoerceLockHolderFn,
) -> tuple[str | None, str | None, ErrorResponse | None]:
    """Validate lock request body and return session/holder identifiers."""

    raw_session_id = body.get("session_id")
    if not isinstance(raw_session_id, str) or not raw_session_id.strip():
        return None, None, {"error": "Missing or invalid session_id."}
    holder_id, holder_error = coerce_lock_holder_id_fn(body)
    if holder_error is not None or holder_id is None:
        return None, None, {"error": holder_error or "Missing lock_holder_id."}
    return raw_session_id.strip(), holder_id, None


def _lock_payload(result: dict[str, Any]) -> dict[str, Any] | None:
    """Extract normalized lock metadata block from lock service result."""

    lock = result.get("lock")
    return lock if isinstance(lock, dict) else None


def _lock_error_response(
    *,
    result: dict[str, Any],
    default_message: str,
) -> ErrorWithLockResponse:
    """Build consistent lock error payload for heartbeat/release endpoints."""

    return {
        "error": result.get("reason", default_message),
        "lock_status": str(result.get("status", "error")),
        "lock": _lock_payload(result),
    }


[docs] def handle_rebuild_reach_cache( body: dict[str, Any], state: ServerState, *, enforce_active_session_lock_fn: EnforceActiveLockFn, resolve_patch_state_fn: ResolvePatchStateFn, is_sha256_hex_fn: IsSha256HexFn, ) -> RebuildReachCacheResponse | ErrorResponse | ErrorWithLockResponse: """Handle ``POST /api/walker/rebuild-reach-cache``. Recomputes profile reaches for one loaded patch and rewrites run-local cache IPC artifact. """ lock_error = enforce_active_session_lock_fn(body, state) if lock_error is not None: return cast(ErrorWithLockResponse, lock_error) resolved = resolve_patch_state_fn(body, state) if resolved is None: return {"error": "Invalid patch. Must be 'a' or 'b'."} patch_key, patch = resolved requested_run_id = body.get("run_id") if requested_run_id is not None and ( not isinstance(requested_run_id, str) or not requested_run_id.strip() ): return {"error": "run_id must be a non-empty string when provided."} if isinstance(requested_run_id, str) and patch.run_id and requested_run_id != patch.run_id: return {"error": f"run_id mismatch for patch {patch_key.upper()}."} if not patch.walker_ready or patch.walker is None: return {"error": f"Walker not ready for patch {patch_key.upper()}. Load a corpus first."} if not isinstance(patch.corpus_dir, Path): return {"error": f"Run directory missing for patch {patch_key.upper()}."} if not isinstance(patch.run_id, str) or not patch.run_id.strip(): return {"error": f"run_id missing for patch {patch_key.upper()}."} from build_tools.syllable_walk.reach import compute_all_reaches from build_tools.syllable_walk_web.services.profile_reaches_cache import ( read_cached_profile_reach_hashes, write_cached_profile_reaches, ) try: profile_reaches = compute_all_reaches(patch.walker) except Exception as e: return {"error": f"Failed to compute profile reaches: {e}"} wrote = write_cached_profile_reaches( run_dir=patch.corpus_dir, run_id=patch.run_id, walker=patch.walker, profile_reaches=profile_reaches, ) if not wrote: return {"error": "Failed to write reach cache artifact."} cache_input_hash, cache_output_hash = read_cached_profile_reach_hashes(patch.corpus_dir) patch.profile_reaches = profile_reaches patch.reach_cache_status = "hit" patch.reach_cache_ipc_input_hash = cache_input_hash patch.reach_cache_ipc_output_hash = cache_output_hash if is_sha256_hex_fn(cache_input_hash) and is_sha256_hex_fn(cache_output_hash): patch.reach_cache_ipc_verification_status = "verified" patch.reach_cache_ipc_verification_reason = "cache-rebuilt" else: patch.reach_cache_ipc_verification_status = "error" patch.reach_cache_ipc_verification_reason = "cache-rebuilt-hashes-missing" return { "patch": patch_key, "run_id": patch.run_id, "status": "rebuilt", "ipc_input_hash": patch.reach_cache_ipc_input_hash, "ipc_output_hash": patch.reach_cache_ipc_output_hash, "verification_status": patch.reach_cache_ipc_verification_status, "verification_reason": patch.reach_cache_ipc_verification_reason, }
[docs] def handle_session_lock_heartbeat( body: dict[str, Any], state: ServerState, *, coerce_lock_holder_id_fn: CoerceLockHolderFn, ) -> SessionLockStatusResponse | ErrorResponse | ErrorWithLockResponse: """Handle ``POST /api/walker/session-lock/heartbeat``.""" session_id, holder_id, request_error = _coerce_lock_request( body, coerce_lock_holder_id_fn=coerce_lock_holder_id_fn, ) if request_error is not None: return request_error assert session_id is not None assert holder_id is not None from build_tools.syllable_walk_web.services.walker_session_lock import heartbeat_session_lock result = heartbeat_session_lock( state=state, session_id=session_id, holder_id=holder_id, ) status = result.get("status") if status in {"locked", "error"}: return _lock_error_response( result=result, default_message="Session lock heartbeat failed.", ) if status == "missing": return { "status": "missing", "reason": result.get("reason", "Session lock not found."), "lock": None, } return { "status": "held", "reason": result.get("reason", "session lock refreshed"), "lock": _lock_payload(result), }
[docs] def handle_session_lock_release( body: dict[str, Any], state: ServerState, *, coerce_lock_holder_id_fn: CoerceLockHolderFn, ) -> SessionLockStatusResponse | ErrorResponse | ErrorWithLockResponse: """Handle ``POST /api/walker/session-lock/release``.""" session_id, holder_id, request_error = _coerce_lock_request( body, coerce_lock_holder_id_fn=coerce_lock_holder_id_fn, ) if request_error is not None: return request_error assert session_id is not None assert holder_id is not None from build_tools.syllable_walk_web.services.walker_session_lock import release_session_lock result = release_session_lock( state=state, session_id=session_id, holder_id=holder_id, ) status = result.get("status") if status in {"locked", "error"}: return _lock_error_response( result=result, default_message="Session lock release failed.", ) safe_status = str(status) if isinstance(status, str) else "released" return { "status": safe_status, "reason": result.get("reason", "session lock released"), "lock": _lock_payload(result), }