Skip to content

Ingestion architecture

The design for josh-ingester — how Josh pulls public federal data into the substrate SQLite file on a schedule.

This document is the canonical spec. Source modules, the scheduler, the runner, the state tables, and the admin UI's data shape all live downstream of decisions captured here. When this document disagrees with prose elsewhere, this document wins for ingestion concerns.

Storage backend: SQLite + FTS5 + sqlite-vec (with binary-quantization+rescore today, vec1 IVFADC+OPQ when it releases). See the Storage stack section of the architecture doc. Migrations are the source of truth for DDL; SQL examples here are illustrative.

Ingestion is plumbing. The substrate is the load-bearing layer, so the ingester should be boring, observable, restartable, and OSS-friendly. We optimize for:

  1. Independence. Each data source is a self-contained module. Adding a source adds one file. No source can break another.
  2. Resumability. Every stage produces a durable artifact (raw files on disk, parsed records in Postgres) so any stage can be re-run without redoing prior work.
  3. Observability without an external UI. State lives in Postgres tables that josh-web reads via josh-core API. We don't ship Airflow or Dagster.
  4. One-process simplicity at v1. A single josh-ingester container with an async scheduler, no message broker, no Celery. Scale up only when we measure a need.
  5. Deterministic backfill. Backfill is a CLI command, not an orchestrator concern. Bootstrapping a fresh install is a runbook.

Non-goals at v1: cross-source DAGs, real-time streaming, multi-tenant ingestion, complex data lineage tracking.

See migrations.html for the deploy story; the parallel decision for ingestion. The dependency graph across our 30 v1 sources is three tiers, mostly bootstrap-time, not runtime:

  • Tier 1 (independent, no FKs to other sources): Federal Register, Legislators, Committees, USC, CFR, CRS, GAO, CBO, Public Laws, SAP, taxonomy.
  • Tier 2 (FKs to Tier 1, but only at bootstrap): Bills (→ legislators, committees), Roll call votes, Hearings, LDA filings, Member statements, Committee reports.
  • Tier 3 (depends on text from 1+2 being loaded): Citation graph extraction.

After bootstrap, Tier 2 sources run on their own schedules and trust that Tier 1 has been refreshed at least once. No runtime ordering needed. Tier 3 runs as its own scheduled job after all loads complete.

A workflow scheduler (Airflow, Prefect, Dagster) buys us nothing for this shape and costs ~1.5-2 GB RAM, OSS-distribution complexity, and operational surface. We skip it.

The substrate is one SQLite file at /data/josh.db (with josh.db-wal and josh.db-shm companions in WAL mode). Both josh-core (read-mostly, agent and API serving) and josh-ingester (batch writes) bind-mount the same host directory, so they share the file directly.

SQLite's WAL mode allows concurrent readers and exactly one writer at a time. The ingester is structured to make this a non-issue:

  • One source at a time per ingester process (the scheduler kicks at most one source per slot, with an advisory file lock for belt-and-braces)
  • Writes batched (~500 records per transaction) and committed with BEGIN IMMEDIATE so the writer claim is acquired up-front rather than upgraded mid-tx
  • PRAGMA busy_timeout = 10000 gives readers/writers up to 10 seconds to acquire their lock before erroring
  • Embedding worker writes go through the same connection pool with the same discipline

Production concurrent-write profile is read-heavy from josh-core, write-bursty from the ingester during scheduled ticks. Profile fits SQLite cleanly. Re-evaluate if a multi-tenant deployment ever needs many concurrent user-driven writes.

┌───────────────────────────┐
┌─────────────────┐ │ josh-ingester │
│ external │ │ (single container) │
│ data sources │ │ │
│ │ │ ┌─────────────────────┐ │
│ Congress.gov │ ◀── HTTP (httpx) ──── │ │ Source: bills │ │
│ federalreg.gov │ │ │ Source: federal_..│ │
│ govinfo.gov │ │ │ Source: ... │ │
│ lda.senate.gov │ │ └─────────┬───────────┘ │
│ ... │ │ │ │
└─────────────────┘ │ ▼ │
│ ┌─────────────────────┐ │
/data/corpus/ ◀──────────────── │ │ runner.py │ │
(raw downloads) │ │ (4 stages) │ │
│ └─────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ scheduler.py │ │
│ │ (APScheduler) │ │
│ └─────────────────────┘ │
│ │
└────────────┬──────────────┘
┌───────────────────────────────────────┐
│ Postgres │
│ │
│ Substrate tables (fr_documents, │
│ bills, ...) │
│ │
│ State tables: │
│ ingestion_runs │
│ ingestion_logs │
│ ingestion_source_state │
│ ingestion_tasks │
└───────────────────────────────────────┘
│ reads via REST
┌──────────────────────┴────────────────┐
│ josh-web /admin/ingestion/* │
│ (reads through josh-core API) │
└───────────────────────────────────────┘

One container, one process, one async scheduler, many source modules.

Every source under ingester/sources/ implements the same interface:

from typing import Protocol, AsyncIterator
from datetime import datetime
from pathlib import Path
class FetchTask(BaseModel):
"""One unit of work for the fetch stage. Source-defined shape inside `params`."""
source: str # 'federal_register'
key: str # natural key, e.g. document_number 'fr:2026-08558'
params: dict # source-specific (URL, date, doc_id, etc.)
class ParsedRecord(BaseModel):
"""One parsed record ready for the load stage. Source-defined shape inside `payload`."""
source: str
table: str # 'fr_documents', 'bills', etc.
key: str # the natural key, used for upsert ON CONFLICT
payload: dict # the row to upsert
children: list["ParsedRecord"] = [] # related rows in other tables
class SourceState(BaseModel):
"""Per-source watermarks. Loaded from ingestion_source_state, written back by the runner."""
last_run_at: datetime | None
last_seen_key: str | None # source-specific cursor
custom: dict # source-defined extra state
class Source(Protocol):
name: str # canonical, lowercase, slug-shaped
schedule: str | None # cron expression, or None for manual-only
depends_on: list[str] # bootstrap order — used by `josh-ingester bootstrap`
async def discover(
self, state: SourceState, http: HttpClient
) -> AsyncIterator[FetchTask]:
"""Yield work units. Cheap. Reads HTTP if needed (e.g., RSS feed),
but doesn't fetch big payloads."""
async def fetch(self, task: FetchTask, http: HttpClient) -> Path:
"""Download to /data/corpus/{name}/.../filename. Idempotent — skip if exists.
Returns the on-disk path."""
async def parse(self, raw_path: Path) -> AsyncIterator[ParsedRecord]:
"""Read the raw artifact, yield validated records. Pure function of bytes
on disk — no network, no DB."""
async def load(
self, records: AsyncIterator[ParsedRecord], db: AsyncConnection
) -> LoadStats:
"""Upsert into Postgres. Transaction-per-batch. Returns counts.
Failures here surface as run-level errors."""

A source module is: one Python file, ~200-500 lines, no shared state with other sources.

The runner walks each source through these stages. Each stage's contract is narrow on purpose.

Produces FetchTask objects describing what work to do. Doesn't download body content. Reads cheap things like RSS feeds, sitemaps, list endpoints, or the previous run's watermarks.

Examples:

  • Federal Register: poll /documents.json?conditions[publication_date][gte]={last_run} and yield one FetchTask per document_number.
  • Bills: read GovInfo BILLSTATUS RSS feed, yield one FetchTask per file in the latest batches.
  • Legislators: yield one FetchTask per YAML file (eight tasks, basically static).

The discover step writes nothing to disk and produces no DB rows. It's pure work-list generation.

Downloads one task's payload to disk under /data/corpus/{source_name}/.... Idempotent — if the file exists with a sane size, skip. Concurrency happens here: the runner spawns N workers (default 4) that pull from the discover queue.

The on-disk path is part of the source's contract, so parse can find it later. Suggested layout:

/data/corpus/
├── federal_register/
│ ├── metadata/{YYYY}/{MM}/{document_number}.json
│ └── bodies/xml/{YYYY}/{MM}/{document_number}.xml
├── bills/
│ ├── billstatus/{congress}/{type}/BILLSTATUS-{c}{t}{n}.xml
│ └── text/{congress}/{type}/BILLS-{c}{t}{n}{version}.xml
├── legislators/
│ └── {filename}.yaml
└── ...

Fetch uses a shared HttpClient (httpx async) with retries, polite rate limiting (per-source token bucket), and a real-browser User-Agent.

Reads bytes off disk, produces ParsedRecord objects validated by Pydantic models. No network, no DB, no side effects — parse is a pure function of bytes-in to records-out. This makes it trivially re-runnable: change a parser bug, re-parse the existing corpus, re-load.

Each source defines its own Pydantic models per target table. The runner doesn't know about table schemas — it just shuttles records to load.

Upserts records into SQLite in transactions, batched (e.g., 500 at a time). Uses INSERT … ON CONFLICT(natural_key) DO UPDATE SET … (SQLite supports the same ON CONFLICT upsert syntax as Postgres). Records LoadStats (inserted, updated, skipped, errored) per batch.

Index updates happen as part of each transaction:

  • FTS5 full-text index is a separate virtual table per searchable parent table (e.g., fr_documents_fts). Triggers on the parent keep the FTS5 table in sync on insert/update/delete. The trigger feeds title/abstract/action/body into the FTS5 columns — column weights are applied at query time via bm25(fr_documents_fts, w_title, w_abstract, w_action, w_body).
  • Vector index (sqlite-vec vec0 virtual table) is NOT auto-populated — embedding is a separate step (see Embedding).

The load stage commits per batch with BEGIN IMMEDIATE. A failure mid-source means earlier batches are durable; the next run picks up from the last last_seen_key watermark.

Body normalization (HTML/XML/PDF → Markdown)

Section titled “Body normalization (HTML/XML/PDF → Markdown)”

Every source ships its body content in one of three formats — HTML (CRS, GAO, CBO), XML (Federal Register, USLM for USC and Public Laws, BILLSTATUS, eCFR), or PDF (committee reports sometimes, older hearings, GAO assets, SAPs). The substrate normalizes to a single canonical form: GitHub-flavored Markdown. Markdown is what gets chunked, embedded, indexed in FTS5, and rendered by the agent. The raw bytes stay on disk for re-conversion when libraries improve.

  • Uniform chunker interface. Section-aware chunking walks # / ## / ### headings — one implementation works across CRS HTML, FR XML, USC USLM, and PDF committee reports once they're all Markdown. No per-source XPath.
  • LLM-native. Modern LLMs handle Markdown without prompting tricks. Token cost is meaningfully lower than HTML.
  • Renderable everywhere. Agent clients, the eventual agent UI, and CLI output all consume Markdown trivially.
  • Diff-friendly. Bill-version comparison, proposed-vs-final rule diffs, etc., work better against Markdown than raw HTML or XML.

Raw bytes live as files on disk under /data/corpus/<source>/bodies/raw/... — HTML, XML, PDF, etc. Big binary-ish payloads stay out of the SQLite file so re-conversion is cheap and the DB stays small.

Normalized Markdown lives in the parent table's body_text column — text values that FTS5 can index directly. Storing markdown in a column rather than on disk avoids trigger-driven file reads (FTS5 indexes columns) and keeps queries simple.

/data/corpus/
├── federal_register/
│ └── bodies/
│ └── raw/{YYYY}/{MM}/{document_number}.xml # original FR XML
├── crs/
│ └── bodies/
│ └── raw/{number}.html # original CRS HTML
├── public_laws/
│ └── bodies/
│ └── raw/{congress}/PLAW-{c}publ{n}.xml # USLM
└── ...

Record tables store the path to the raw payload plus the normalized markdown directly:

-- Applied per record-bearing parent table that has body content
body_raw_path TEXT, -- /data/corpus/<source>/bodies/raw/...
body_raw_format TEXT, -- 'html' | 'xml' | 'pdf' | 'text'
body_raw_sha256 TEXT, -- hash of raw bytes (provenance + dedup)
body_text TEXT, -- normalized Markdown (FTS5 indexes this)
body_text_sha256 TEXT, -- hash of normalized output (re-conversion detection)
body_normalized_at TEXT, -- when body_text was generated
body_normalizer_version TEXT -- semver of converter used

body_normalizer_version is the key to safe evolution: when we ship a better parser, a migration finds rows where body_normalizer_version < {new_version}, re-reads body_raw_path, runs the new normalizer, and updates body_text + body_text_sha256 + body_normalized_at + body_normalizer_version in one transaction. No re-fetching from upstream APIs.

Why not put markdown on disk too? FTS5 virtual tables index columns, not files. If markdown lived only on disk, the FTS5 sync trigger would have to read files — fragile, slow, and complicates the trigger logic. Putting markdown in the column is simpler and aligns with how SQLite expects to be used. The DB-size cost is small (CRS averages ~50 KB markdown × 22K reports ≈ 1 GB; SQLite handles multi-TB DBs fine).

class Normalizer(Protocol):
name: str # 'html_v1', 'uslm_v1', 'fr_xml_v1', ...
version: str # semver — bump on parser change
accepts_format: str # 'html' | 'xml' | 'pdf' | 'text'
async def to_markdown(
self, raw_path: Path, source_hint: str | None = None
) -> NormalizedBody: ...
class NormalizedBody(BaseModel):
markdown: str
headings: list[Heading] # extracted heading tree, used by chunker
artifacts: list[Artifact] = [] # tables, figures preserved separately if lossy
warnings: list[str] = [] # parser-level warnings

Normalizers are pure functions of bytes-on-disk to Markdown-on-disk. No network. No DB. Trivially testable from fixtures.

FormatConverterLibraryNotes
HTMLhtml_v1markdownify primary, trafilatura fallback for messy pagesBulk of CRS, GAO product pages once we get past the bot wall
FR XMLfr_xml_v1hand-rolled lxml walkerPreserves <HD> headings, <P> paragraphs, <TABLE> as MD tables
USLMuslm_v1hand-rolled lxml walkerPreserves <title> / <chapter> / <section> hierarchy. Used for USC and PLAW
eCFR DIV-numbered XMLecfr_xml_v1hand-rolled lxml walkerSimilar shape to FR XML; preserves DIV1–DIV8 hierarchy
BILLSTATUS XMLmetadata-only; bodies via PLAW (USLM) or BILLS (billres DTD)uslm_v1 for enacted; billres_v1 walker for liveMost BILLSTATUS content is structured fields, not body text
PDFpdf_v1docling (OSS, IBM Research) primary, marker fallback. Mistral OCR if quality dominates and self-host doesn'tMost expensive; runs via separate queue (see below)
Plain texttext_v1passthrough with light whitespace cleanupRSS bodies, simple notice text
  • HTML and XML normalize inline during parse. The parser already has the DOM in memory; conversion is cheap. The output Markdown gets written into body_text as part of the load stage's upsert.
  • PDF normalizes via a separate queue. PDF parsing (especially with OCR) is the most expensive content step. Decoupling means PDF-heavy sources don't block their own metadata ingestion. Records load with body_text=NULL and a queue row; the normalization worker drains the queue and updates body_text later. Same shape as the embedding queue:
CREATE TABLE ingestion_normalization_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
record_id TEXT NOT NULL,
raw_path TEXT NOT NULL,
raw_format TEXT NOT NULL,
enqueued_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at TEXT,
last_error TEXT,
UNIQUE (table_name, record_id)
);

The agent gets metadata search the moment a record loads; full-text search and semantic search come online when normalization (and then embedding) catches up.

When uslm_v2 ships, no re-fetch is needed. A migration scans rows with body_normalizer_version < 'uslm_v2', re-reads body_raw_path, runs the new normalizer, and updates body_text + body_text_sha256 + body_normalized_at + body_normalizer_version. Then it enqueues the affected records into ingestion_embedding_queue so chunks get re-embedded. All in one transaction per record.

Every retrieval Josh hands back to an agent must carry enough provenance to cite the source without a second round-trip. This is non-negotiable for an agentic-first product whose outputs end up in regulatory comments, briefings, and reports. The agent must be able to drop a Bluebook-flavored citation directly into a user's document.

Every record-bearing parent table — every source — carries the following columns:

source_url TEXT NOT NULL, -- canonical deep link (page a human would visit)
source_org TEXT NOT NULL, -- issuing body, e.g. 'Congressional Research Service'
published_at TEXT, -- source's publication date (ISO-8601)
retrieved_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
raw_sha256 TEXT, -- content hash of raw bytes — provenance + tamper-detection
citation_string TEXT NOT NULL -- pre-formatted citation, source-specific

Notes on each:

  • source_url is the human-facing URL, not the API endpoint. The EveryCRSReport HTML page, not everycrsreport.com/reports.csv. The FederalRegister.gov document page, not /api/v1/documents/{id}.json. The lda.gov filing detail page, not /api/v1/filings/{id}/.
  • source_org may be normalized via the existing agencies lookup (FR documents reference EPA, FDA, etc.); for sources where the org is constant or doesn't fit the agency table, store an explicit string ('Congressional Research Service', 'Government Accountability Office', 'Senate of the United States').
  • published_at semantics vary per source — document the exact field per source (FR publication_date, CRS latestPubDate, committee report issue_date, GAO RSS pubDate, etc.). ISO-8601 TEXT.
  • raw_sha256 lets us prove no tampering between fetch and serve. Computed in the fetch stage as bytes hit disk. Cheap.
  • citation_string is the formatted citation an agent can drop into a deliverable as-is. Per-source format below.

A small module per source defines citation_for(record) -> str. Bluebook-flavored but pragmatic — Josh isn't a legal product and we shouldn't pretend to be.

SourceFormat example
CRS reportCong. Rsch. Serv., R48481, Wildfire Emergency Spending (Mar. 15, 2025).
FR document91 Fed. Reg. 12345 (Mar. 4, 2026).
Public LawPub. L. No. 119-21 (2026).
US Code section42 U.S.C. § 1396a (2024).
GAO reportU.S. Gov't Accountability Off., GAO-25-107521, Title (Apr. 2025).
Committee reportH.R. Rep. No. 119-100 (2025).
BillH.R. 1234, 119th Cong. (2025).
Hearing transcript (CHRG)Hearing Title: Hearing Before the H. Comm. on X, 119th Cong. (2025).
Roll call voteRoll Call Vote No. 362 (House), 119th Cong., 1st Sess. (Mar. 4, 2025).
LDA filingLobbying Disclosure Act Filing {filing_uuid}, {client} via {registrant} (Q3 2025).
SAPStatement of Administration Policy on H.R. 1234 (Jan. 22, 2025).
Congressional Record171 Cong. Rec. H1234 (daily ed. Mar. 4, 2025) (statement of Rep. X).

Formatters live in shared/josh_substrate/src/josh_substrate/citations/formatters/<source>.py. Both josh-ingester (populates citation_string at load time) and josh-core (re-formats when needed) share them.

Search retrieval returns chunks, not whole records. Every <source>_chunks table carries a locator that lets the citation include the exact place the snippet came from:

chunk_locator_json TEXT -- {"heading_path": ["Background", "Statutory Authority"],
-- "section": "II.B", "page": 12}

The locator is built during chunking by walking the Markdown heading tree (Markdown headings are uniform across all sources per the Body normalization section above). Page numbers come from the PDF normalizer where available; section IDs come from structured XML sources.

Every search/fetch tool returns a citation block alongside content:

class RetrievalResult(BaseModel):
id: str # 'crs:R48481'
snippet: str # truncated text shown in tool output
citation: Citation # always present
class Citation(BaseModel):
title: str
organization: str # source_org
url: str # source_url
published_at: date | None
citation_string: str # the formatted citation
locator: ChunkLocator | None # heading path + page when chunk-scoped

Agents drop citation.citation_string directly into the document they're producing. citation.url lets the user click through. citation.locator is what makes "as the GAO report's Recommendations section notes…" possible without a re-fetch.

FieldStagePopulated by
source_urlparseper-source parser (constructs from natural key)
source_orgparseper-source parser (often constant per source, sometimes from agency lookup)
published_atparseper-source parser (extracts from body or list-endpoint metadata)
retrieved_atfetchrunner (default value strftime('now') at row insert)
raw_sha256fetchrunner (hashes payload as it writes to disk)
citation_stringparseper-source citation_for(record) formatter
chunk_locator_jsonchunkingchunker (builds from Markdown heading walk)

All citation data is pre-populated at ingestion. Nothing is computed at query time. The retrieval API is a thin SQL projection — fast and predictable.

These are part of the first migration, alongside the substrate tables. They're read by the admin UI in josh-web via josh-core's API. SQLite-flavored DDL — illustrative only; the canonical version lives in shared/josh_substrate/.../migrations/versions/0001_state_tables.py (to be re-authored on the SQLite foundation).

-- One row per ingestion run (per source per run)
CREATE TABLE ingestion_runs (
id TEXT PRIMARY KEY, -- UUID generated by the application
source TEXT NOT NULL, -- 'federal_register'
mode TEXT NOT NULL, -- 'incremental' | 'backfill' | 'manual'
status TEXT NOT NULL, -- 'running' | 'success' | 'partial' | 'failed' | 'cancelled'
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
finished_at TEXT,
-- Counts, written incrementally as the run progresses
discovered_count INTEGER NOT NULL DEFAULT 0,
fetched_count INTEGER NOT NULL DEFAULT 0,
parsed_count INTEGER NOT NULL DEFAULT 0,
inserted_count INTEGER NOT NULL DEFAULT 0,
updated_count INTEGER NOT NULL DEFAULT 0,
skipped_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
-- Run params (for re-runnability)
config TEXT NOT NULL DEFAULT '{}', -- JSON, validated at app layer
error_summary TEXT -- final error if status='failed'
);
CREATE INDEX ingestion_runs_source_started ON ingestion_runs (source, started_at DESC);
CREATE INDEX ingestion_runs_status ON ingestion_runs (status, started_at DESC) WHERE status IN ('running', 'failed');
-- Run logs — one row per significant log event. Capped via rotation.
CREATE TABLE ingestion_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL REFERENCES ingestion_runs(id) ON DELETE CASCADE,
logged_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
level TEXT NOT NULL, -- 'debug' | 'info' | 'warn' | 'error'
message TEXT NOT NULL,
context TEXT -- JSON
);
CREATE INDEX ingestion_logs_run ON ingestion_logs (run_id, logged_at);
CREATE INDEX ingestion_logs_errors ON ingestion_logs (run_id, logged_at) WHERE level = 'error';
-- Per-source watermarks, persisted across runs
CREATE TABLE ingestion_source_state (
source TEXT PRIMARY KEY,
last_run_at TEXT,
last_success_at TEXT,
last_seen_key TEXT, -- source-specific cursor
custom TEXT NOT NULL DEFAULT '{}' -- JSON
);
-- Per-task progress within a run — used to make resumable backfills
CREATE TABLE ingestion_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL REFERENCES ingestion_runs(id) ON DELETE CASCADE,
source TEXT NOT NULL,
task_key TEXT NOT NULL, -- the FetchTask.key
stage TEXT NOT NULL, -- 'discovered' | 'fetched' | 'parsed' | 'loaded' | 'errored'
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
finished_at TEXT,
error TEXT,
UNIQUE (run_id, task_key)
);
CREATE INDEX ingestion_tasks_run ON ingestion_tasks (run_id, stage);

Notes on the SQLite shape:

  • Timestamps as ISO-8601 TEXT, not native datetime — SQLite has no timestamp type. We use strftime('%Y-%m-%dT%H:%M:%fZ','now') for UTC ms-precision strings; lexicographic ordering matches chronological ordering.
  • UUIDs as TEXT, generated app-side (Python uuid.uuid4()).
  • JSON stored as TEXT, validated by Pydantic at the app boundary. SQLite's JSON1 functions (json_extract, etc.) work over TEXT columns.
  • Foreign keys require PRAGMA foreign_keys = ON at connection setup — they're off by default in SQLite. Set in the Alembic env and the app DB session helpers.

The runner writes to these tables transactionally (BEGIN IMMEDIATE). The admin UI in josh-web reads them through josh-core REST endpoints (e.g., GET /admin/ingestion/runs?source=bills&limit=20).

State table retention: keep all ingestion_runs and ingestion_source_state forever (small, useful as audit trail). Rotate ingestion_logs and ingestion_tasks after 30 days (delete-by-date job).

Within a source: bounded parallel workers in the fetch stage (HTTP-bound, fine to parallelize), sequential parse, batched load.

# Inside runner.py — simplified
async def run_source(source: Source, mode: str, http: HttpClient, db: AsyncConnection):
run_id = await create_run(db, source.name, mode)
state = await load_source_state(db, source.name)
sem = asyncio.Semaphore(source.fetch_concurrency or 4)
async def fetch_with_limit(task):
async with sem:
return task, await source.fetch(task, http)
tasks = source.discover(state, http)
fetch_coros = [fetch_with_limit(t) async for t in tasks]
for fut in asyncio.as_completed(fetch_coros):
task, raw_path = await fut
async for record in source.parse(raw_path):
await load_batch.add(record) # accumulator commits at batch size
await mark_task_loaded(db, run_id, task.key)
stats = await load_batch.flush()
await finalize_run(db, run_id, stats)
await update_source_state(db, source.name, state)

Real implementation will be more careful (error handling, partial failures, structured logging at each stage), but the topology is "fetch is concurrent, parse and load are pipelined per task."

Across sources: sources run in their own asyncio task; the scheduler kicks them off independently. Within one ingester process, only one source's load stage writes to SQLite at a time — the runner serializes write transactions per source. josh-core reads concurrently with the writer (WAL).

Single source running twice at once: prevented by a file-system advisory lock acquired at run-start (fcntl.flock on /data/locks/ingest-{source}.lock). If the lock is held, the second invocation logs and exits. (We previously used a Postgres pg_try_advisory_lock; flock is the SQLite-equivalent.)

apscheduler.AsyncIOScheduler running inside the ingester container. ~30 lines of wiring:

scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
async def run_ingester():
sources = discover_sources() # walks ingester/sources/ for modules
scheduler = AsyncIOScheduler(timezone="America/New_York")
for source in sources:
if source.schedule: # None means manual-only
scheduler.add_job(
lambda s=source: run_source(s, mode="incremental"),
CronTrigger.from_crontab(source.schedule),
id=source.name,
max_instances=1, # belt-and-braces — scheduler-level guard
misfire_grace_time=300,
)
scheduler.start()
await asyncio.Event().wait() # block forever

Schedules live as a string on the source itself:

ingester/sources/federal_register.py
class FederalRegister:
name = "federal_register"
schedule = "10 14 * * 1-5" # 10am ET = 14:00 UTC, weekdays — FR publication time
depends_on = []
...
# ingester/sources/bills.py
class Bills:
name = "bills"
schedule = "15 * * * *" # hourly at :15 — RSS-driven incremental
depends_on = ["legislators", "committees"]
...

Schedule changes go through git → Kamal deploy → ingester container restart, which is fine.

Bootstrap (one-time, manual, ordered):

Terminal window
# Inside the ingester container, or via `kamal app exec --reuse` from the Mac:
josh-ingester bootstrap # walks all sources, ordered by depends_on, full backfill
josh-ingester backfill federal_register --since=2020-01-01
josh-ingester backfill bills --congresses=119

bootstrap is just a topological sort over depends_on followed by backfill for each source.

Incremental (scheduled, automatic):

Triggered by the scheduler at each source's cron expression. Reads last_run_at and last_seen_key from ingestion_source_state, asks the source to discover what's new since then, fetches, parses, loads, updates the watermarks. Same code path as backfill, just with a tighter date window.

Both paths use run_source(source, mode) — only the watermark behavior differs.

Vector rows aren't auto-populated; we have to compute them. Embedding is a separate stage that runs after load:

  • For each newly loaded record with body text, chunk and embed asynchronously.
  • Embedding has its own queue table (ingestion_embedding_queue) and its own worker pool inside the ingester.
  • The chunker/embedder uses Snowflake Arctic-Embed-M-v2 (1024-dim, MIT license, runs on CPU). Locked OSS-only — no third-party API. See Architecture, Storage stack.
  • Failure to embed is non-fatal — the record is loaded; the embedding row gets retried.

Vector storage uses sqlite-vec's vec0 virtual tables. Today: brute-force scan of binary-quantized vectors with full-precision rescore on top-K. Migration to vec1 (IVFADC+OPQ) is planned when vec1 cuts a first release; that's a CREATE TABLE … vec1(…) swap and a one-time backfill, not a schema redesign at the substrate level.

Full schema for the embedding queue is part of the first migration:

CREATE TABLE ingestion_embedding_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL, -- 'fr_documents', 'bill_text_versions', ...
record_id TEXT NOT NULL,
enqueued_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at TEXT,
last_error TEXT,
UNIQUE (table_name, record_id)
);
CREATE INDEX ingestion_embedding_queue_pending ON ingestion_embedding_queue (id) WHERE attempts < 5;

Why split: embedding is the slowest, most expensive, and most failure-prone stage. Decoupling means a flaky embedding API doesn't block ingestion of metadata. The agent can search by metadata and FTS5 full-text the moment a record loads; semantic search comes online when the queue catches up.

josh-ingester is the package; python -m ingester is the entrypoint, exposed via a CLI written with typer.

Terminal window
# Run one incremental cycle for one source
josh-ingester run <source>
# Full or partial backfill
josh-ingester backfill <source> [--since=YYYY-MM-DD] [--congresses=119,120]
# Bootstrap a fresh install in dependency order
josh-ingester bootstrap
# Status
josh-ingester status # last run + state per source
josh-ingester status --source=bills # one source detail
# Logs
josh-ingester logs <run-id> # tail
josh-ingester logs --source=bills --tail=100
# Schedule introspection
josh-ingester schedule # show declared schedules per source
# Operations on the embedding queue
josh-ingester embed --workers=4 # drain the queue
josh-ingester embed-status
# Sanity / health
josh-ingester health # DB reachable? watermarks sane?

The CLI is a thin shell over the same code paths the scheduler uses.

ConcernChoiceWhy
HTTPhttpx (async)HTTP/2, modern API, mature. aiohttp is fine but httpx API is cleaner.
HTTP retriestenacityDecorator-based, composable, async-aware.
XML parsinglxmlFast, robust. xmltodict is convenient but slow at our volumes. Some sources will use both.
YAMLpyyamlFor congress-legislators files.
Validationpydantic v2Standard. Shared with josh-core.
SQLiteaiosqlite (driver) + SQLAlchemy 2.0 (ORM/migrations)sqlite+aiosqlite:// driver works with SQLAlchemy async + Alembic. Need extension-loading enabled for sqlite-vec; if aiosqlite's standard build doesn't expose enable_load_extension, fall back to apsw for the connection wrapper.
Vector extensionsqlite-vec (PyPI)Bundles the compiled extension binary; loaded at connection setup via db.load_extension('vec0').
FTSSQLite native FTS5 (no dep)Built into the SQLite library shipped with Python; tokenizers configured per-table.
Schedulerapscheduler (AsyncIOScheduler)In-process, no broker, mature.
LoggingstructlogStructured JSON to stdout + parallel writes to ingestion_logs via a custom processor.
CLItyperFastAPI-style ergonomics, Pydantic-friendly.
Citation extraction (later)eyeciteBuilt for legal citations; covers USC + bill citation patterns.
josh-ingester/
├── Dockerfile
├── pyproject.toml
├── README.md
├── alembic.ini # shared migrations? OR josh-core owns migrations
├── ingester/
│ ├── __init__.py
│ ├── __main__.py # CLI entry
│ ├── cli.py # typer commands
│ ├── runner.py # the run_source orchestrator
│ ├── scheduler.py # APScheduler wiring
│ ├── state.py # ingestion_runs / _logs / _source_state writers
│ ├── http.py # shared httpx client config + retries
│ ├── concurrency.py # Semaphore helpers, advisory locks
│ ├── corpus.py # /data/corpus path conventions
│ ├── embedding.py # the separate embedding worker
│ ├── protocols.py # Source, FetchTask, ParsedRecord, LoadStats
│ └── sources/
│ ├── __init__.py
│ ├── _registry.py # discover_sources()
│ ├── federal_register.py
│ ├── bills.py
│ ├── legislators.py
│ ├── committees.py
│ └── ... # one per source, lazy-added
└── tests/
├── unit/
└── integration/

The alembic question is resolved: migrations live in the shared/josh_substrate/ package, which both josh-core and josh-ingester install. Schema is owned by the substrate package; both services consume the same Pydantic models and reach the same migrations via josh_substrate:migrations in their alembic.ini. See repo-structure.html for the layout.

The full checklist for "I want to add committee_reports ingestion":

  1. Add the schema. Edit docs/sources/committee-reports.md with the field map, source URLs, etc. Add to docs/josh-data-sources.md inventory.
  2. Add migration. New Alembic migration in shared/josh_substrate/src/josh_substrate/migrations/versions/ for the committee_reports table + indexes.
  3. Add source module. josh-ingester/ingester/sources/committee_reports.py implementing the Source protocol.
  4. Add to registry. _registry.py picks up modules automatically by walking sources/ — no manual registration needed.
  5. Pick a schedule. Add the cron string to the source class.
  6. Test locally. Run the ingester directly against a throwaway SQLite file: SUBSTRATE_DB_PATH=/tmp/josh-dev.db python -m ingester backfill committee_reports --since=2024-01-01. (Or against a staging server: kamal app exec --reuse 'josh-ingester backfill committee_reports --since=2024-01-01'.)
  7. Deploy. kamal deploy for the ingester (assumes ingester is its own Kamal-managed service; see "Deployment shape" below).
  8. Manual bootstrap of historicals. SSH to server, kamal app exec --reuse 'josh-ingester backfill committee_reports'.

josh-web adds pages under /admin/ingestion/:

  • /admin/ingestion/ — overview: per-source last-run status, time since last success, error counts
  • /admin/ingestion/sources/{name} — source detail: schedule, recent runs, current state
  • /admin/ingestion/runs/{run_id} — run detail: timeline, logs, errors
  • /admin/ingestion/embedding — embedding queue depth, throughput

These pages call josh-core REST endpoints; josh-core reads the state tables. The ingester does not expose its own HTTP server. It's a worker only. Keeps the ingester simple and avoids two services owning the same admin surface.

josh-core admin endpoints (sketch):

GET /api/admin/ingestion/sources list of sources + status summary
GET /api/admin/ingestion/sources/{name} one source: state + recent runs
GET /api/admin/ingestion/runs paginated run list
GET /api/admin/ingestion/runs/{run_id} run detail incl. counts
GET /api/admin/ingestion/runs/{run_id}/logs paginated logs for run
POST /api/admin/ingestion/sources/{name}/run trigger an incremental run
POST /api/admin/ingestion/sources/{name}/backfill trigger backfill (admin-only)
GET /api/admin/ingestion/embedding/queue queue depth, top errors

Triggering runs from the UI: the API enqueues an instruction via a small Postgres ingestion_commands table that the ingester polls (or via LISTEN/NOTIFY for low latency). Out of v1 scope — at v1 the UI is read-only; manual triggers are CLI-only. Add interactive trigger in v1.x.

Ingester is a separate Kamal service (service: josh-ingester in its own config/deploy.yml). Same Postgres accessory as josh-core (or rather: the accessory belongs to josh-core's deploy config; josh-ingester just connects to it via the Kamal Docker network at josh-core-postgres:5432).

The /data/corpus/ host directory is a bind mount in the ingester container. When we add a josh-mirror (later — to publish snapshots for OSS users), it serves from the same /data/corpus/.

Open: do we run the ingester as a single replica with the scheduler loop, or as N replicas with the scheduler in one and N-1 doing fetch work? v1 = single replica. APScheduler has multi-instance modes but we don't need them. Scale up only when one container can't keep up.

FailureEffectRecovery
HTTP 429/503 from upstreamSource's per-task tenacity retries; if exhausted, task marked errored, run continuesRetried on next scheduled run
Malformed XML / parse errorTask logged with error context, run continuesFix parser, re-parse from /data/corpus/ (no re-fetch)
FK violation on loadBatch fails; records marked errored, run continuesInvestigate; usually means a companion source needs refresh
SQLITE_BUSY after busy_timeoutWrite transaction aborts, record erroredReduce concurrent-write surface (only one writer per process); retry on next batch
SQLite file corruption (rare; usually disk failure)All ops fail with database is malformedRestore from Litestream — point-in-time recovery
Ingester container crashes mid-runRun row left in running status; on restart, runner does a janitor pass to mark runningfailed for stale rowsClean state recovery
Stuck flock (e.g., crashed run before unlock)New runs of same source blockedflock auto-releases on process exit; if it doesn't, rm /data/locks/ingest-{source}.lock
Disk full on /data/corpus or /data/josh.dbFetch fails with ENOSPC, run marked failed; SQLite write fails with database or disk is fullIncrease volume, retry. We monitor disk usage as part of health
Two ingester containers accidentally runningflock prevents double-execution per source; both will compete on the lock and one winsKill the duplicate

Each source module has:

  • Unit tests for the parser — fixed-fixture XML/JSON files, parser output asserted exactly. No network. Fastest.
  • Integration tests for the full pipeline — fixture files in tests/fixtures/, ephemeral Postgres (testcontainers or pg_temp schema), discover-fetch-parse-load runs end-to-end.
  • Smoke tests for the live API — opt-in (env flag), hit one real call to confirm we haven't broken contract.

CI runs unit + integration. Smoke tests run nightly against the live ingester.

These don't block writing the framework but should be resolved before each becomes load-bearing:

  1. Schema ownership between josh-core and josh-ingesterresolved. Option (a) — shared internal package shared/josh_substrate/. Both services install it. See repo-structure.html.
  2. Embedding provider abstractionresolved. Locked OSS-only on Snowflake Arctic-Embed-M-v2 (1024-dim, MIT license, runs on CPU). No third-party API. Loaded locally by the embedding worker. See Architecture, Storage stack.
  3. Backup of /data/corpus. It's on the 100 GB volume. Daily snapshot to Hetzner Storage Box, or rebuild-from-source on disaster? Rebuild is technically free but takes days; snapshots are a few GB/day after compression. Lean: snapshot.
  4. Idempotency of upsert under partial-row schema changes. When we add a column, do we want existing rows to update with the default, or stay stale? Likely fine; flag if we hit an edge case.
  5. Rate limit etiquette per source. Shared HTTP client has a global default; should each source override? Probably yes — Congress.gov is 5K/hr (real key); GovInfo is unstated; FR is ~1-2 req/sec. Per-source token bucket inside the shared client.
  6. Citation extraction lifecycle. Inline at parse time (slow ingestion, eager graph) or batch post-load worker (fast ingestion, lagging graph)? Lean: batch post-load, same shape as embedding queue.