"""Session IPC store for dual-patch walker restore metadata.
This service implements Phase 2 of the Patch A/B IPC session-load plan:
- write deterministic session artifacts under runtime-resolved ``sessions_base``
- verify session payload IPC integrity and linked run-state references
- list and load verified (or diagnosable) sessions for API integration
"""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from datetime import UTC, datetime
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from typing import Any
from uuid import uuid4
from pipeworks_ipc.hashing import compute_output_hash, compute_payload_hash
from build_tools.syllable_walk_web.services.session_paths import (
PATCH_KEYS,
resolve_sessions_base,
session_file_path,
)
from build_tools.syllable_walk_web.services.walker_run_state_store import (
RUN_STATE_FILENAME,
verify_run_state,
)
from build_tools.syllable_walk_web.state import PatchState, ServerState
SESSION_SCHEMA_VERSION = 1
SESSION_KIND = "walker_patch_session"
SESSION_RUN_STATE_RELATIVE_PATH = f"ipc/{RUN_STATE_FILENAME}"
IPC_LIBRARY = "pipeworks-ipc"
_SHA256_RE = re.compile(r"^[0-9a-f]{64}$")
[docs]
@dataclass(frozen=True)
class SessionPatchReferenceResult:
"""Outcome of resolving one patch reference for session payload building."""
status: str
reason: str
patch_ref: dict[str, Any] | None
[docs]
@dataclass(frozen=True)
class SessionSaveResult:
"""Outcome of saving one dual-patch session payload."""
status: str
reason: str
session_id: str | None = None
session_path: Path | None = None
patch_a_status: str | None = None
patch_a_reason: str | None = None
patch_b_status: str | None = None
patch_b_reason: str | None = None
ipc_input_hash: str | None = None
ipc_output_hash: str | None = None
root_session_id: str | None = None
parent_session_id: str | None = None
revision: int | None = None
[docs]
@dataclass(frozen=True)
class SessionVerificationResult:
"""Outcome of validating one persisted session artifact."""
status: str
reason: str
session_path: Path
session_id: str | None = None
ipc_input_hash: str | None = None
ipc_output_hash: str | None = None
[docs]
@dataclass(frozen=True)
class SessionLoadResult:
"""Outcome of loading one persisted session artifact."""
status: str
reason: str
session_path: Path
session_id: str | None = None
payload: dict[str, Any] | None = None
ipc_input_hash: str | None = None
ipc_output_hash: str | None = None
[docs]
@dataclass(frozen=True)
class SessionListEntry:
"""List item representing one persisted session artifact."""
session_id: str
created_at_utc: str | None
label: str | None
patch_a_run_id: str | None
patch_b_run_id: str | None
verification_status: str
verification_reason: str
session_path: Path
root_session_id: str | None = None
parent_session_id: str | None = None
revision: int | None = None
def _utc_now_iso() -> str:
"""Return UTC timestamp in ``YYYY-MM-DDTHH:MM:SSZ`` format."""
return datetime.now(UTC).replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")
def _json_dumps_canonical(payload: dict[str, Any]) -> str:
"""Serialize JSON deterministically for stable hashing."""
return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
def _is_sha256_hex(value: Any) -> bool:
"""Return ``True`` when value is a canonical lowercase SHA-256 hash."""
return isinstance(value, str) and _SHA256_RE.match(value) is not None
def _resolve_pipeworks_ipc_version() -> str:
"""Resolve installed ``pipeworks-ipc`` version for metadata fields."""
try:
return version("pipeworks-ipc")
except PackageNotFoundError:
return "unknown"
def _read_json_object(path: Path) -> dict[str, Any] | None:
"""Read JSON object from ``path``; return ``None`` for parse/type errors."""
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
return None
if not isinstance(payload, dict):
return None
return payload
def _new_session_id() -> str:
"""Generate opaque session id suitable for filename usage."""
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
return f"session_{timestamp}_{uuid4().hex[:10]}"
def _normalize_label(label: str | None) -> str | None:
"""Normalize optional user label for session payloads."""
if label is None:
return None
cleaned = str(label).strip()
return cleaned or None
def _default_lineage(*, session_id: str) -> dict[str, Any]:
"""Build default lineage metadata for a first-generation session."""
return {
"root_session_id": session_id,
"parent_session_id": None,
"revision": 0,
}
def _coerce_lineage(raw: Any) -> dict[str, Any] | None:
"""Validate/coerce session lineage metadata when present."""
if not isinstance(raw, dict):
return None
root_session_id = raw.get("root_session_id")
parent_session_id = raw.get("parent_session_id")
revision = raw.get("revision")
if not isinstance(root_session_id, str) or not root_session_id.strip():
return None
if parent_session_id is not None and (
not isinstance(parent_session_id, str) or not parent_session_id.strip()
):
return None
if not isinstance(revision, int) or revision < 0:
return None
return {
"root_session_id": root_session_id.strip(),
"parent_session_id": (
parent_session_id.strip() if isinstance(parent_session_id, str) else None
),
"revision": revision,
}
def _coerce_patch_ref(raw: Any, expected_patch: str) -> dict[str, str] | None:
"""Validate/coerce one persisted patch reference object."""
if raw is None:
return None
if not isinstance(raw, dict):
return None
required = {
"patch",
"run_id",
"manifest_ipc_output_hash",
"run_state_relative_path",
"run_state_ipc_output_hash",
}
if not required.issubset(raw.keys()):
return None
patch = raw.get("patch")
run_id = raw.get("run_id")
manifest_hash = raw.get("manifest_ipc_output_hash")
run_state_rel = raw.get("run_state_relative_path")
run_state_hash = raw.get("run_state_ipc_output_hash")
if patch != expected_patch:
return None
if not isinstance(run_id, str) or not run_id.strip():
return None
if run_state_rel != SESSION_RUN_STATE_RELATIVE_PATH:
return None
if not _is_sha256_hex(manifest_hash) or not _is_sha256_hex(run_state_hash):
return None
return {
"patch": patch,
"run_id": run_id,
"manifest_ipc_output_hash": str(manifest_hash),
"run_state_relative_path": run_state_rel,
"run_state_ipc_output_hash": str(run_state_hash),
}
def _build_patch_reference(
*, patch_key: str, patch_state: PatchState
) -> SessionPatchReferenceResult:
"""Build and verify one patch reference from in-memory patch state."""
if patch_key not in PATCH_KEYS:
return SessionPatchReferenceResult(
status="error",
reason=f"invalid-patch:{patch_key}",
patch_ref=None,
)
run_id = patch_state.run_id
run_dir = patch_state.corpus_dir
manifest_hash = patch_state.manifest_ipc_output_hash
if not isinstance(run_id, str) or not run_id.strip():
return SessionPatchReferenceResult(
status="skipped",
reason=f"patch-{patch_key}-run-id-missing",
patch_ref=None,
)
if not isinstance(run_dir, Path):
return SessionPatchReferenceResult(
status="skipped",
reason=f"patch-{patch_key}-run-dir-missing",
patch_ref=None,
)
if not _is_sha256_hex(manifest_hash):
return SessionPatchReferenceResult(
status="skipped",
reason=f"patch-{patch_key}-manifest-hash-missing",
patch_ref=None,
)
verification = verify_run_state(
run_dir=run_dir,
run_id=run_id,
manifest_ipc_output_hash=str(manifest_hash),
)
if verification.status != "verified":
return SessionPatchReferenceResult(
status=verification.status,
reason=f"patch-{patch_key}-run-state-{verification.reason}",
patch_ref=None,
)
if not _is_sha256_hex(verification.run_state_ipc_output_hash):
return SessionPatchReferenceResult(
status="mismatch",
reason=f"patch-{patch_key}-run-state-output-hash-missing",
patch_ref=None,
)
return SessionPatchReferenceResult(
status="saved",
reason="saved",
patch_ref={
"patch": patch_key,
"run_id": run_id,
"manifest_ipc_output_hash": str(manifest_hash),
"run_state_relative_path": SESSION_RUN_STATE_RELATIVE_PATH,
"run_state_ipc_output_hash": str(verification.run_state_ipc_output_hash),
},
)
def _build_session_input_payload(
*,
session_id: str,
label: str | None,
patch_a: dict[str, Any] | None,
patch_b: dict[str, Any] | None,
lineage: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build canonical session IPC input payload."""
payload = {
"session_id": session_id,
"label": label,
"patch_a": patch_a,
"patch_b": patch_b,
}
if lineage is not None:
payload["lineage"] = lineage
return payload
def _build_session_output_payload(
*,
patch_a: dict[str, Any] | None,
patch_b: dict[str, Any] | None,
) -> dict[str, Any]:
"""Build canonical session IPC output payload."""
return {
"patch_a": patch_a,
"patch_b": patch_b,
}
[docs]
def save_session(
*,
state: ServerState,
label: str | None = None,
session_id: str | None = None,
repair_from_session_id: str | None = None,
) -> SessionSaveResult:
"""Persist one dual-patch session payload under resolved ``sessions_base``."""
normalized_label = _normalize_label(label)
cleaned_session_id = (
session_id.strip() if isinstance(session_id, str) and session_id.strip() else None
)
cleaned_repair_source_id = (
repair_from_session_id.strip()
if isinstance(repair_from_session_id, str) and repair_from_session_id.strip()
else None
)
if cleaned_session_id is not None and cleaned_repair_source_id is not None:
return SessionSaveResult(
status="error",
reason="session-id-and-repair-source-are-mutually-exclusive",
)
lineage: dict[str, Any]
if cleaned_repair_source_id is not None:
parent_path = session_file_path(
session_id=cleaned_repair_source_id,
output_base=state.output_base,
configured_sessions_base=state.sessions_base,
)
parent_payload = _read_json_object(parent_path)
if parent_payload is None:
return SessionSaveResult(
status="missing",
reason="repair-source-session-missing-or-invalid",
)
parent_session_id_raw = parent_payload.get("session_id")
parent_session_id = (
parent_session_id_raw.strip()
if isinstance(parent_session_id_raw, str) and parent_session_id_raw.strip()
else cleaned_repair_source_id
)
parent_lineage = _coerce_lineage(parent_payload.get("lineage"))
if parent_lineage is None:
parent_lineage = _default_lineage(session_id=parent_session_id)
lineage = {
"root_session_id": parent_lineage["root_session_id"],
"parent_session_id": parent_session_id,
"revision": int(parent_lineage["revision"]) + 1,
}
resolved_session_id = _new_session_id()
else:
resolved_session_id = cleaned_session_id or _new_session_id()
path = session_file_path(
session_id=resolved_session_id,
output_base=state.output_base,
configured_sessions_base=state.sessions_base,
)
if path.exists():
return SessionSaveResult(
status="mismatch",
reason="session-id-already-exists",
session_id=resolved_session_id,
)
lineage = _default_lineage(session_id=resolved_session_id)
patch_a_result = _build_patch_reference(patch_key="a", patch_state=state.patch_a)
patch_b_result = _build_patch_reference(patch_key="b", patch_state=state.patch_b)
if patch_a_result.patch_ref is None and patch_b_result.patch_ref is None:
return SessionSaveResult(
status="skipped",
reason="no-verifiable-patches",
session_id=resolved_session_id,
patch_a_status=patch_a_result.status,
patch_a_reason=patch_a_result.reason,
patch_b_status=patch_b_result.status,
patch_b_reason=patch_b_result.reason,
root_session_id=lineage["root_session_id"],
parent_session_id=lineage["parent_session_id"],
revision=lineage["revision"],
)
created_at_utc = _utc_now_iso()
input_payload = _build_session_input_payload(
session_id=resolved_session_id,
label=normalized_label,
patch_a=patch_a_result.patch_ref,
patch_b=patch_b_result.patch_ref,
lineage=lineage,
)
output_payload = _build_session_output_payload(
patch_a=patch_a_result.patch_ref,
patch_b=patch_b_result.patch_ref,
)
ipc_input_hash = compute_payload_hash(input_payload)
ipc_output_hash = compute_output_hash(_json_dumps_canonical(output_payload))
payload = {
"schema_version": SESSION_SCHEMA_VERSION,
"session_kind": SESSION_KIND,
"session_id": resolved_session_id,
"created_at_utc": created_at_utc,
"label": normalized_label,
"lineage": lineage,
"patch_a": patch_a_result.patch_ref,
"patch_b": patch_b_result.patch_ref,
"ipc": {
"version": SESSION_SCHEMA_VERSION,
"library": IPC_LIBRARY,
"library_ref": f"pipeworks-ipc-v{_resolve_pipeworks_ipc_version()}",
"input_hash": ipc_input_hash,
"output_hash": ipc_output_hash,
"input_payload": input_payload,
"output_payload": output_payload,
},
}
path = session_file_path(
session_id=resolved_session_id,
output_base=state.output_base,
configured_sessions_base=state.sessions_base,
)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
return SessionSaveResult(
status="saved",
reason="saved",
session_id=resolved_session_id,
session_path=path,
patch_a_status=patch_a_result.status,
patch_a_reason=patch_a_result.reason,
patch_b_status=patch_b_result.status,
patch_b_reason=patch_b_result.reason,
ipc_input_hash=ipc_input_hash,
ipc_output_hash=ipc_output_hash,
root_session_id=lineage["root_session_id"],
parent_session_id=lineage["parent_session_id"],
revision=lineage["revision"],
)
[docs]
def verify_session(*, session_path: Path, output_base: Path) -> SessionVerificationResult:
"""Verify persisted session payload + linked run-state references."""
if not session_path.exists():
return SessionVerificationResult(
status="missing",
reason="session-missing",
session_path=session_path,
)
payload = _read_json_object(session_path)
if payload is None:
return SessionVerificationResult(
status="error",
reason="session-parse-error",
session_path=session_path,
)
if payload.get("schema_version") != SESSION_SCHEMA_VERSION:
return SessionVerificationResult(
status="mismatch",
reason="session-schema-version-mismatch",
session_path=session_path,
)
if payload.get("session_kind") != SESSION_KIND:
return SessionVerificationResult(
status="mismatch",
reason="session-kind-mismatch",
session_path=session_path,
)
session_id = payload.get("session_id")
if not isinstance(session_id, str) or not session_id.strip():
return SessionVerificationResult(
status="mismatch",
reason="session-id-missing",
session_path=session_path,
)
raw_lineage = payload.get("lineage", None)
if raw_lineage is None:
lineage = _default_lineage(session_id=session_id)
else:
coerced_lineage = _coerce_lineage(raw_lineage)
if coerced_lineage is None:
return SessionVerificationResult(
status="mismatch",
reason="session-lineage-invalid",
session_path=session_path,
session_id=session_id,
)
lineage = coerced_lineage
patch_a = _coerce_patch_ref(payload.get("patch_a"), "a")
patch_b = _coerce_patch_ref(payload.get("patch_b"), "b")
raw_patch_a = payload.get("patch_a")
raw_patch_b = payload.get("patch_b")
if raw_patch_a is not None and patch_a is None:
return SessionVerificationResult(
status="mismatch",
reason="session-patch-a-invalid",
session_path=session_path,
session_id=session_id,
)
if raw_patch_b is not None and patch_b is None:
return SessionVerificationResult(
status="mismatch",
reason="session-patch-b-invalid",
session_path=session_path,
session_id=session_id,
)
if patch_a is None and patch_b is None:
return SessionVerificationResult(
status="mismatch",
reason="session-no-patches",
session_path=session_path,
session_id=session_id,
)
ipc = payload.get("ipc")
if not isinstance(ipc, dict):
return SessionVerificationResult(
status="mismatch",
reason="session-ipc-block-missing",
session_path=session_path,
session_id=session_id,
)
stored_in = ipc.get("input_hash")
stored_out = ipc.get("output_hash")
input_payload = ipc.get("input_payload")
output_payload = ipc.get("output_payload")
if not _is_sha256_hex(stored_in) or not _is_sha256_hex(stored_out):
return SessionVerificationResult(
status="mismatch",
reason="session-ipc-hash-invalid",
session_path=session_path,
session_id=session_id,
)
if not isinstance(input_payload, dict) or not isinstance(output_payload, dict):
return SessionVerificationResult(
status="mismatch",
reason="session-ipc-payload-invalid",
session_path=session_path,
session_id=session_id,
)
normalized_label = _normalize_label(payload.get("label"))
expected_input_payload = _build_session_input_payload(
session_id=session_id,
label=normalized_label,
patch_a=patch_a,
patch_b=patch_b,
lineage=lineage if raw_lineage is not None else None,
)
expected_output_payload = _build_session_output_payload(
patch_a=patch_a,
patch_b=patch_b,
)
if input_payload != expected_input_payload:
return SessionVerificationResult(
status="mismatch",
reason="session-ipc-input-payload-mismatch",
session_path=session_path,
session_id=session_id,
)
if output_payload != expected_output_payload:
return SessionVerificationResult(
status="mismatch",
reason="session-ipc-output-payload-mismatch",
session_path=session_path,
session_id=session_id,
)
expected_input_hash = compute_payload_hash(expected_input_payload)
expected_output_hash = compute_output_hash(_json_dumps_canonical(expected_output_payload))
if stored_in != expected_input_hash:
return SessionVerificationResult(
status="mismatch",
reason="session-input-hash-mismatch",
session_path=session_path,
session_id=session_id,
ipc_input_hash=str(stored_in),
ipc_output_hash=str(stored_out),
)
if stored_out != expected_output_hash:
return SessionVerificationResult(
status="mismatch",
reason="session-output-hash-mismatch",
session_path=session_path,
session_id=session_id,
ipc_input_hash=str(stored_in),
ipc_output_hash=str(stored_out),
)
for patch_ref in (patch_a, patch_b):
if patch_ref is None:
continue
run_dir = output_base / patch_ref["run_id"]
run_state_verification = verify_run_state(
run_dir=run_dir,
run_id=patch_ref["run_id"],
manifest_ipc_output_hash=patch_ref["manifest_ipc_output_hash"],
)
if run_state_verification.status != "verified":
return SessionVerificationResult(
status=run_state_verification.status,
reason=f"session-{patch_ref['patch']}-run-state-{run_state_verification.reason}",
session_path=session_path,
session_id=session_id,
ipc_input_hash=str(stored_in),
ipc_output_hash=str(stored_out),
)
if (
run_state_verification.run_state_ipc_output_hash
!= patch_ref["run_state_ipc_output_hash"]
):
return SessionVerificationResult(
status="mismatch",
reason=f"session-{patch_ref['patch']}-run-state-output-hash-mismatch",
session_path=session_path,
session_id=session_id,
ipc_input_hash=str(stored_in),
ipc_output_hash=str(stored_out),
)
return SessionVerificationResult(
status="verified",
reason="verified",
session_path=session_path,
session_id=session_id,
ipc_input_hash=str(stored_in),
ipc_output_hash=str(stored_out),
)
[docs]
def load_session(
*,
session_id: str,
output_base: Path,
configured_sessions_base: Path | None = None,
) -> SessionLoadResult:
"""Load one session by id when verification succeeds."""
path = session_file_path(
session_id=session_id,
output_base=output_base,
configured_sessions_base=configured_sessions_base,
)
verification = verify_session(session_path=path, output_base=output_base)
if verification.status != "verified":
return SessionLoadResult(
status=verification.status,
reason=verification.reason,
session_path=path,
session_id=verification.session_id,
payload=None,
ipc_input_hash=verification.ipc_input_hash,
ipc_output_hash=verification.ipc_output_hash,
)
payload = _read_json_object(path)
if payload is None:
return SessionLoadResult(
status="error",
reason="session-parse-error",
session_path=path,
session_id=verification.session_id,
payload=None,
ipc_input_hash=verification.ipc_input_hash,
ipc_output_hash=verification.ipc_output_hash,
)
return SessionLoadResult(
status="verified",
reason="verified",
session_path=path,
session_id=verification.session_id,
payload=payload,
ipc_input_hash=verification.ipc_input_hash,
ipc_output_hash=verification.ipc_output_hash,
)
[docs]
def list_sessions(
*,
output_base: Path,
configured_sessions_base: Path | None = None,
) -> list[SessionListEntry]:
"""List persisted sessions in descending ``created_at_utc`` order."""
sessions_base = resolve_sessions_base(
output_base=output_base,
configured_sessions_base=configured_sessions_base,
)
if not sessions_base.exists() or not sessions_base.is_dir():
return []
entries: list[SessionListEntry] = []
for path in sessions_base.glob("*.json"):
payload = _read_json_object(path)
if payload is None:
continue
session_id = payload.get("session_id")
if not isinstance(session_id, str) or not session_id.strip():
continue
verification = verify_session(session_path=path, output_base=output_base)
patch_a = payload.get("patch_a")
patch_b = payload.get("patch_b")
raw_lineage = payload.get("lineage", None)
lineage = _coerce_lineage(raw_lineage) if raw_lineage is not None else None
patch_a_run_id = patch_a.get("run_id") if isinstance(patch_a, dict) else None
patch_b_run_id = patch_b.get("run_id") if isinstance(patch_b, dict) else None
created_at_utc = payload.get("created_at_utc")
label = _normalize_label(payload.get("label"))
entries.append(
SessionListEntry(
session_id=session_id,
created_at_utc=str(created_at_utc) if isinstance(created_at_utc, str) else None,
label=label,
patch_a_run_id=str(patch_a_run_id) if isinstance(patch_a_run_id, str) else None,
patch_b_run_id=str(patch_b_run_id) if isinstance(patch_b_run_id, str) else None,
verification_status=verification.status,
verification_reason=verification.reason,
session_path=path,
root_session_id=(
lineage["root_session_id"]
if isinstance(lineage, dict)
else (session_id if isinstance(session_id, str) else None)
),
parent_session_id=(
lineage["parent_session_id"] if isinstance(lineage, dict) else None
),
revision=(lineage["revision"] if isinstance(lineage, dict) else 0),
)
)
entries.sort(key=lambda item: item.created_at_utc or "", reverse=True)
return entries