Source code for build_tools.corpus_sqlite_builder.converter

"""
JSON to SQLite conversion logic for corpus data.

This module handles converting large annotated JSON files into optimized
SQLite databases for efficient querying.
"""

import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

from .schema import CORPUS_SCHEMA_VERSION, create_database, insert_metadata


[docs] def find_annotated_json(data_dir: Path) -> Path | None: """ Find the annotated JSON file in a corpus data directory. Args: data_dir: Path to the data directory (e.g., _working/output/.../data/) Returns: Path to the annotated JSON file, or None if not found Looks for files matching the pattern: *_syllables_annotated.json Supports both pyphen and nltk prefixes. """ if not data_dir.exists() or not data_dir.is_dir(): return None # Look for annotated JSON files json_files = list(data_dir.glob("*_syllables_annotated.json")) if len(json_files) == 0: return None elif len(json_files) == 1: return json_files[0] else: # Multiple annotated JSON files found - this shouldn't happen # Return the first one but this indicates a potential issue return json_files[0]
[docs] def convert_json_to_sqlite(corpus_dir: Path, force: bool = False, batch_size: int = 10000) -> Path: """ Convert an annotated JSON file to a SQLite database. This function discovers the annotated JSON file in the corpus directory, creates a SQLite database with the appropriate schema, and efficiently converts all syllable data using batched transactions. Args: corpus_dir: Path to corpus directory (e.g., _working/output/20260110_115453_pyphen/) force: If True, overwrite existing database. If False, raise error if exists. batch_size: Number of records to insert per transaction (default: 10000) Returns: Path to the created corpus.db file Raises: FileNotFoundError: If corpus_dir doesn't exist or no annotated JSON found FileExistsError: If corpus.db exists and force=False ValueError: If JSON structure is invalid json.JSONDecodeError: If JSON is malformed """ # Validate corpus directory exists if not corpus_dir.exists() or not corpus_dir.is_dir(): raise FileNotFoundError(f"Corpus directory not found: {corpus_dir}") # Find the annotated JSON file data_dir = corpus_dir / "data" json_path = find_annotated_json(data_dir) if json_path is None: raise FileNotFoundError( f"No annotated JSON file found in {data_dir}. " "Looking for: *_syllables_annotated.json" ) # Set up database path db_path = data_dir / "corpus.db" # Check if database already exists if db_path.exists() and not force: raise FileExistsError( f"Database already exists: {db_path}\n" "Use --force to overwrite, or run a new extraction instead." ) # Load and validate JSON data print(f"Loading JSON from: {json_path.name}") with open(json_path, "r", encoding="utf-8") as f: data = json.load(f) if not isinstance(data, list): raise ValueError("JSON must contain a list of syllable records") if len(data) == 0: raise ValueError("JSON contains no syllable records") # Validate structure of first record _validate_record_structure(data[0]) print(f"Converting {len(data):,} syllables to SQLite...") # Remove existing database if force=True if db_path.exists(): db_path.unlink() # Create new database conn = create_database(db_path) try: # Insert syllables in batches _insert_syllables_batched(conn, data, batch_size) # Insert metadata metadata = { "schema_version": str(CORPUS_SCHEMA_VERSION), "source_tool": "corpus_sqlite_builder", "source_version": "0.2.0", # TODO: Get from package version "generated_at": datetime.now(timezone.utc).isoformat(), "total_syllables": str(len(data)), "source_json_path": str(json_path.name), } insert_metadata(conn, metadata) # Optimize database for query performance # VACUUM: Reclaims unused space, defragments the database file # ANALYZE: Updates query planner statistics for optimal query plans # These operations improve both file size and query performance print("Optimizing database...") conn.execute("VACUUM;") conn.execute("ANALYZE;") conn.commit() # Verify data integrity cursor = conn.cursor() row_count = cursor.execute("SELECT COUNT(*) FROM syllables").fetchone()[0] if row_count != len(data): raise ValueError( f"Data integrity check failed: expected {len(data)} rows, " f"found {row_count} rows" ) print(f"✓ Successfully created: {db_path.name}") print(f" Syllables: {row_count:,}") print(f" Size: {db_path.stat().st_size / (1024 * 1024):.1f} MB") return db_path except Exception: conn.close() # Clean up partial database on error if db_path.exists(): db_path.unlink() raise finally: conn.close()
def _validate_record_structure(record: dict) -> None: """ Validate that a syllable record has the expected structure. Args: record: Dictionary representing a syllable record Raises: ValueError: If record structure is invalid """ required_fields = ["syllable", "frequency", "features"] for field in required_fields: if field not in record: raise ValueError(f"Record missing required field: {field}") features = record["features"] if not isinstance(features, dict): raise ValueError("Record 'features' must be a dictionary") required_features = [ "starts_with_vowel", "starts_with_cluster", "starts_with_heavy_cluster", "contains_plosive", "contains_fricative", "contains_liquid", "contains_nasal", "short_vowel", "long_vowel", "ends_with_vowel", "ends_with_nasal", "ends_with_stop", ] for feature in required_features: if feature not in features: raise ValueError(f"Record features missing required field: {feature}") def _insert_syllables_batched(conn: sqlite3.Connection, data: list[dict], batch_size: int) -> None: """ Insert syllable records in batches for efficient bulk loading. Batching improves performance by reducing transaction overhead. Instead of committing after each row (30,000+ transactions), we commit after every batch_size rows (3-4 transactions for typical corpora). This provides: - 10-20x faster insertion than row-by-row commits - Controlled memory usage (only batch_size rows in memory at once) - Progress reporting without excessive overhead Args: conn: SQLite database connection data: List of syllable records from JSON batch_size: Number of records per transaction (default: 10,000) """ cursor = conn.cursor() batch = [] total = len(data) inserted = 0 for record in data: features = record["features"] # Convert boolean features to integers (SQLite doesn't have boolean type) row = ( record["syllable"], record["frequency"], int(features["starts_with_vowel"]), int(features["starts_with_cluster"]), int(features["starts_with_heavy_cluster"]), int(features["contains_plosive"]), int(features["contains_fricative"]), int(features["contains_liquid"]), int(features["contains_nasal"]), int(features["short_vowel"]), int(features["long_vowel"]), int(features["ends_with_vowel"]), int(features["ends_with_nasal"]), int(features["ends_with_stop"]), ) batch.append(row) # Insert batch when it reaches batch_size if len(batch) >= batch_size: cursor.executemany( """ INSERT INTO syllables ( syllable, frequency, starts_with_vowel, starts_with_cluster, starts_with_heavy_cluster, contains_plosive, contains_fricative, contains_liquid, contains_nasal, short_vowel, long_vowel, ends_with_vowel, ends_with_nasal, ends_with_stop ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, batch, ) conn.commit() inserted += len(batch) print(f" Progress: {inserted:,} / {total:,} ({inserted/total*100:.1f}%)") batch = [] # Insert remaining records if batch: cursor.executemany( """ INSERT INTO syllables ( syllable, frequency, starts_with_vowel, starts_with_cluster, starts_with_heavy_cluster, contains_plosive, contains_fricative, contains_liquid, contains_nasal, short_vowel, long_vowel, ends_with_vowel, ends_with_nasal, ends_with_stop ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, batch, ) conn.commit() inserted += len(batch) print(f" Progress: {inserted:,} / {total:,} (100.0%)")