"""Ingestion orchestrator — walks content directories, classifies, and upserts.

Run:
    docker compose exec api-dev python -m orrery_search.ingest
"""

import sys
from pathlib import Path

from sqlalchemy import select

from orrery_search.db import async_session
from orrery_search.ingest.mdx_parser import strip_mdx
from orrery_search.models.document import Document
from orrery_search.services.search_text import build_search_text

CONTENT_DIR = Path("/data/content")

# Commit the outer transaction after this many pages; periodic commits bound
# how much already-processed work a late failure can discard.
_COMMIT_EVERY = 50

# First path component under content/docs/ -> content_type label.
# Any section not listed here is classified as a generic "page".
_CONTENT_TYPE_BY_SECTION = {
    "getting-started": "getting_started",
    "guides": "guide",
    "workflow": "workflow",
    "reference": "reference",
    "architecture": "architecture",
    "performance": "performance",
}


def _resolve_paths() -> Path:
    """Return the docs content directory, preferring the Docker mount.

    When ``CONTENT_DIR`` does not exist (e.g. running outside Docker), walk up
    to ten parent directories of this file looking for a ``docs/src`` folder
    and return ``<that directory>/docs/src/content/docs``. If no ancestor
    matches, the path is built from the last directory inspected and may not
    exist; ``ingest`` checks for that.
    """
    if CONTENT_DIR.exists():
        return CONTENT_DIR

    project_root = Path(__file__).resolve()
    for _ in range(10):
        project_root = project_root.parent
        if (project_root / "docs" / "src").exists():
            break
    return project_root / "docs" / "src" / "content" / "docs"


def _classify_content_type(rel_path: str) -> str:
    """Classify content type from the relative path within content/docs/.

    Only the first path component is consulted, e.g. ``guides/setup.mdx`` ->
    ``"guide"``. Unknown sections (and top-level files) map to ``"page"``.
    """
    top_level = rel_path.split("/", 1)[0]
    return _CONTENT_TYPE_BY_SECTION.get(top_level, "page")


def _mdx_path_to_url(rel_path: str) -> str:
    """Convert a relative .mdx path to its Starlight page URL.

    ``guides/setup.mdx`` -> ``/guides/setup/``; ``guides/index.mdx`` ->
    ``/guides/``; the root ``index.mdx`` -> ``/``.
    """
    stem = rel_path.removesuffix(".mdx")
    if stem == "index":
        # The root index page is served at the site root, not at "/index/".
        return "/"
    return f"/{stem.removesuffix('/index')}/"


def _mdx_path_to_section(rel_path: str) -> str:
    """Extract the section (all parent directories) from a relative path.

    ``guides/advanced/setup.mdx`` -> ``guides/advanced``; a top-level file
    yields ``""``.
    """
    parts = Path(rel_path).parts
    if len(parts) > 1:
        return "/".join(parts[:-1])
    return ""


def _mdx_path_to_slug(rel_path: str) -> str:
    """Convert a relative .mdx path to the unique slug used for dedup.

    ``guides/setup.mdx`` -> ``guides/setup``; ``guides/index.mdx`` ->
    ``guides``. The root ``index.mdx`` keeps the slug ``index`` so rows that
    were ingested before continue to match.
    """
    return rel_path.removesuffix(".mdx").removesuffix("/index")


def _is_draft(frontmatter) -> bool:
    """Return True when the page's frontmatter sets ``draft`` to true.

    Accepts either a boolean or a string value, because the value type
    depends on how ``strip_mdx`` parses frontmatter (not visible here).
    """
    value = frontmatter.get("draft", False)
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return value is True


def _collect_mdx_pages(content_dir: Path) -> list[dict]:
    """Walk *content_dir* recursively and parse every ``.mdx`` file.

    Files that cannot be read or decoded as UTF-8 are reported on stderr and
    skipped. So are draft pages: Starlight leaves drafts out of production
    builds, so indexing them would produce search hits that lead nowhere.
    Draft rows ingested by earlier runs are not removed here.

    Returns one dict per page with the keys ``content_type``, ``slug``,
    ``title``, ``section``, ``description``, ``body``, ``url`` and
    ``word_count``.
    """
    pages = []
    for mdx_path in sorted(content_dir.rglob("*.mdx")):
        try:
            raw = mdx_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as exc:
            print(f" SKIP {mdx_path}: {exc}", file=sys.stderr)
            continue

        frontmatter, body = strip_mdx(raw)
        if _is_draft(frontmatter):
            print(f" SKIP {mdx_path}: draft", file=sys.stderr)
            continue

        # as_posix() keeps "/" separators on every OS; the helpers split on "/".
        rel_path = mdx_path.relative_to(content_dir).as_posix()
        pages.append({
            "content_type": _classify_content_type(rel_path),
            "slug": _mdx_path_to_slug(rel_path),
            "title": frontmatter.get(
                "title", mdx_path.stem.replace("-", " ").title()
            ),
            "section": _mdx_path_to_section(rel_path),
            "description": frontmatter.get("description"),
            "body": body,
            "url": _mdx_path_to_url(rel_path),
            "word_count": len(body.split()),
        })
    return pages


async def _upsert_page(db, page_data: dict) -> bool:
    """Insert or update the ``Document`` row for one page.

    The select-and-write runs inside ``db.begin_nested()`` (a SAVEPOINT), so
    a failure rolls back only this page's changes. The function returns only
    after the savepoint has been flushed and released. Callers can therefore
    count the page as done only on a normal return.

    Returns True when a new row was added, False when an existing row with
    the same slug was updated. Any exception propagates to the caller.
    """
    fields = {
        "content_type": page_data["content_type"],
        "title": page_data["title"],
        "section": page_data["section"],
        "description": page_data["description"],
        "body": page_data["body"],
        "search_text": build_search_text(
            title=page_data["title"],
            section=page_data["section"],
            content_type=page_data["content_type"],
            description=page_data["description"],
            body=page_data["body"],
        ),
        "url": page_data["url"],
        "word_count": page_data["word_count"],
    }
    async with db.begin_nested():
        result = await db.execute(
            select(Document).where(Document.slug == page_data["slug"])
        )
        existing = result.scalar_one_or_none()
        if existing is None:
            db.add(Document(slug=page_data["slug"], **fields))
            return True
        for name, value in fields.items():
            setattr(existing, name, value)
        return False


async def ingest() -> None:
    """Main ingestion: read docs content, upsert into document table.

    Exits the process with status 1 if the content directory is missing.
    Per-page failures are reported on stderr and counted, and do not stop
    the run. A failed commit does propagate, because it means a whole batch
    of already-processed pages was lost.
    """
    content_dir = _resolve_paths()
    print(f"Content dir: {content_dir}", file=sys.stderr)
    if not content_dir.exists():
        print(f"Content directory not found: {content_dir}", file=sys.stderr)
        sys.exit(1)

    pages = _collect_mdx_pages(content_dir)
    print(f"Found {len(pages)} published pages", file=sys.stderr)

    inserted = 0
    updated = 0
    errors = 0
    async with async_session() as db:
        for i, page_data in enumerate(pages, start=1):
            try:
                was_inserted = await _upsert_page(db, page_data)
            except Exception as exc:  # best-effort: one bad page must not abort the run
                print(
                    f" ERROR on {page_data['slug']}: {exc}",
                    file=sys.stderr,
                )
                errors += 1
            else:
                # Counted only after the savepoint was released successfully.
                if was_inserted:
                    inserted += 1
                else:
                    updated += 1

            if i % _COMMIT_EVERY == 0:
                await db.commit()
                print(
                    f" progress: {i}/{len(pages)}",
                    file=sys.stderr,
                )

        await db.commit()

    print(
        f"Ingestion complete: {inserted} inserted, {updated} updated, "
        f"{errors} errors ({inserted + updated} total)",
        file=sys.stderr,
    )