Ingestion architecture
The design for josh-ingester — how Josh pulls public federal data into the substrate SQLite file on a schedule.
This document is the canonical spec. Source modules, the scheduler, the runner, the state tables, and the admin UI's data shape all live downstream of decisions captured here. When this document disagrees with prose elsewhere, this document wins for ingestion concerns.
Storage backend: SQLite + FTS5 + sqlite-vec (with binary quantization + rescore today; vec1 IVFADC+OPQ when it releases). See the Storage stack section of the architecture doc. Migrations are the source of truth for DDL; SQL examples here are illustrative.
Design intent
Ingestion is plumbing. The substrate is the load-bearing layer, so the ingester should be boring, observable, restartable, and OSS-friendly. We optimize for:
- Independence. Each data source is a self-contained module. Adding a source adds one file. No source can break another.
- Resumability. Every stage produces a durable artifact (raw files on disk, parsed records in SQLite) so any stage can be re-run without redoing prior work.
- Observability without an external UI. State lives in SQLite tables that josh-web reads via the josh-core API. We don't ship Airflow or Dagster.
- One-process simplicity at v1. A single josh-ingester container with an async scheduler, no message broker, no Celery. Scale up only when we measure a need.
- Deterministic backfill. Backfill is a CLI command, not an orchestrator concern. Bootstrapping a fresh install is a runbook.
Non-goals at v1: cross-source DAGs, real-time streaming, multi-tenant ingestion, complex data lineage tracking.
Why no workflow scheduler
This mirrors the deploy-story decision in migrations.html. The dependency graph across our 30 v1 sources has three tiers, and the ordering matters mostly at bootstrap time, not at runtime:
- Tier 1 (independent, no FKs to other sources): Federal Register, Legislators, Committees, USC, CFR, CRS, GAO, CBO, Public Laws, SAP, taxonomy.
- Tier 2 (FKs to Tier 1, but only at bootstrap): Bills (→ legislators, committees), Roll call votes, Hearings, LDA filings, Member statements, Committee reports.
- Tier 3 (depends on text from 1+2 being loaded): Citation graph extraction.
After bootstrap, Tier 2 sources run on their own schedules and trust that Tier 1 has been refreshed at least once. No runtime ordering needed. Tier 3 runs as its own scheduled job after all loads complete.
A workflow scheduler (Airflow, Prefect, Dagster) buys us nothing for this shape and costs ~1.5-2 GB RAM, OSS-distribution complexity, and operational surface. We skip it.
Concurrency: shared SQLite file
The substrate is one SQLite file at /data/josh.db (with josh.db-wal and josh.db-shm companions in WAL mode). Both josh-core (read-mostly, agent and API serving) and josh-ingester (batch writes) bind-mount the same host directory, so they share the file directly.
SQLite's WAL mode allows concurrent readers and exactly one writer at a time. The ingester is structured to make this a non-issue:
- One source at a time per ingester process (the scheduler kicks at most one source per slot, with an advisory file lock for belt-and-braces).
- Writes are batched (~500 records per transaction) and committed with BEGIN IMMEDIATE, so the writer claim is acquired up front rather than upgraded mid-transaction.
- PRAGMA busy_timeout = 10000 gives readers and writers up to 10 seconds to acquire their lock before erroring.
- Embedding-worker writes go through the same connection pool with the same discipline.
Production concurrent-write profile is read-heavy from josh-core, write-bursty from the ingester during scheduled ticks. Profile fits SQLite cleanly. Re-evaluate if a multi-tenant deployment ever needs many concurrent user-driven writes.
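A minimal sketch of that write discipline, assuming the synchronous stdlib `sqlite3` module rather than the aiosqlite + SQLAlchemy stack the ingester actually uses; `connect`, `batched`, and `write_batches` are hypothetical helper names.

```python
import sqlite3
from collections.abc import Iterable, Iterator
from itertools import islice

BATCH_SIZE = 500  # ~500 records per transaction


def connect(path: str) -> sqlite3.Connection:
    # isolation_level=None = autocommit mode, so we issue BEGIN/COMMIT ourselves.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode = WAL")    # many readers + one writer
    conn.execute("PRAGMA busy_timeout = 10000")  # wait up to 10 s for a lock
    conn.execute("PRAGMA foreign_keys = ON")     # off by default in SQLite
    return conn


def batched(rows: Iterable[tuple], size: int) -> Iterator[list[tuple]]:
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


def write_batches(conn: sqlite3.Connection, sql: str, rows: Iterable[tuple]) -> int:
    """Commit rows in BATCH_SIZE transactions; return the number of batches committed."""
    committed = 0
    for chunk in batched(rows, BATCH_SIZE):
        conn.execute("BEGIN IMMEDIATE")  # claim the writer lock up front
        try:
            conn.executemany(sql, chunk)
        except BaseException:
            conn.execute("ROLLBACK")
            raise
        conn.execute("COMMIT")
        committed += 1
    return committed
```

A failure rolls back only the batch in flight; earlier batches stay committed.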
High-level layout
```
┌──────────────────┐                 ┌─────────────────────────────┐
│ external         │                 │ josh-ingester               │
│ data sources     │                 │ (single container)          │
│                  │                 │                             │
│ Congress.gov     │                 │  ┌───────────────────────┐  │
│ federalreg.gov   │ ◀── HTTP ────── │  │ Source: bills         │  │
│ govinfo.gov      │     (httpx)     │  │ Source: federal_..    │  │
│ lda.senate.gov   │                 │  │ Source: ...           │  │
│ ...              │                 │  └───────────┬───────────┘  │
└──────────────────┘                 │              ▼              │
                                     │  ┌───────────────────────┐  │
  /data/corpus/ ◀─────────────────── │  │ runner.py             │  │
  (raw downloads)                    │  │ (4 stages)            │  │
                                     │  └───────────┬───────────┘  │
                                     │              ▼              │
                                     │  ┌───────────────────────┐  │
                                     │  │ scheduler.py          │  │
                                     │  │ (APScheduler)         │  │
                                     │  └───────────────────────┘  │
                                     └──────────────┬──────────────┘
                                                    ▼
                                     ┌─────────────────────────────┐
                                     │ SQLite: /data/josh.db (WAL) │
                                     │                             │
                                     │ Substrate tables:           │
                                     │   fr_documents, bills, ...  │
                                     │                             │
                                     │ State tables:               │
                                     │   ingestion_runs            │
                                     │   ingestion_logs            │
                                     │   ingestion_source_state    │
                                     │   ingestion_tasks           │
                                     └─────────────────────────────┘
                                                    ▲
                                                    │ reads via REST
                                     ┌──────────────┴──────────────┐
                                     │ josh-web /admin/ingestion/* │
                                     │ (via josh-core API)         │
                                     └─────────────────────────────┘
```

One container, one process, one async scheduler, many source modules.
The Source protocol
Every source under ingester/sources/ implements the same interface:

```python
from collections.abc import AsyncIterator
from datetime import datetime
from pathlib import Path
from typing import Protocol

from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncConnection

from ingester.http import HttpClient  # assumed home of the shared httpx wrapper (ingester/http.py)


class FetchTask(BaseModel):
    """One unit of work for the fetch stage. Source-defined shape inside `params`."""
    source: str   # 'federal_register'
    key: str      # natural key, e.g. document_number 'fr:2026-08558'
    params: dict  # source-specific (URL, date, doc_id, etc.)


class ParsedRecord(BaseModel):
    """One parsed record ready for the load stage. Source-defined shape inside `payload`."""
    source: str
    table: str     # 'fr_documents', 'bills', etc.
    key: str       # the natural key, used for upsert ON CONFLICT
    payload: dict  # the row to upsert
    children: list["ParsedRecord"] = []  # related rows in other tables


class SourceState(BaseModel):
    """Per-source watermarks. Loaded from ingestion_source_state, written back by the runner."""
    last_run_at: datetime | None
    last_seen_key: str | None  # source-specific cursor
    custom: dict               # source-defined extra state


class LoadStats(BaseModel):
    """Counts recorded by the load stage."""
    inserted: int = 0
    updated: int = 0
    skipped: int = 0
    errored: int = 0


class Source(Protocol):
    name: str              # canonical, lowercase, slug-shaped
    schedule: str | None   # cron expression, or None for manual-only
    depends_on: list[str]  # bootstrap order, used by `josh-ingester bootstrap`

    # discover() and parse() are async generators, so the protocol declares them with
    # plain `def` returning AsyncIterator; implementations write `async def ... yield`.
    def discover(self, state: SourceState, http: HttpClient) -> AsyncIterator[FetchTask]:
        """Yield work units. Cheap. Reads HTTP if needed (e.g., RSS feed), but doesn't fetch big payloads."""
        ...

    async def fetch(self, task: FetchTask, http: HttpClient) -> Path:
        """Download to /data/corpus/{name}/.../filename. Idempotent: skip if exists. Returns the on-disk path."""
        ...

    def parse(self, raw_path: Path) -> AsyncIterator[ParsedRecord]:
        """Read the raw artifact, yield validated records. Pure function of bytes on disk: no network, no DB."""
        ...

    async def load(self, records: AsyncIterator[ParsedRecord], db: AsyncConnection) -> LoadStats:
        """Upsert into SQLite. Transaction-per-batch. Returns counts. Failures here surface as run-level errors."""
        ...
```

A source module is: one Python file, ~200–500 lines, no shared state with other sources.
The four stages, in detail
The runner walks each source through these stages. Each stage's contract is narrow on purpose.
1. Discover
Produces FetchTask objects describing what work to do. Doesn't download body content. Reads cheap things like RSS feeds, sitemaps, list endpoints, or the previous run's watermarks.
Examples:
- Federal Register: poll /documents.json?conditions[publication_date][gte]={last_run} and yield one FetchTask per document_number.
- Bills: read the GovInfo BILLSTATUS RSS feed and yield one FetchTask per file in the latest batches.
- Legislators: yield one FetchTask per YAML file (eight tasks, basically static).
The discover step writes nothing to disk and produces no DB rows. It's pure work-list generation.
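The Federal Register example above could look roughly like this as an async generator. This is a sketch under assumptions: the HTTP client is stubbed as an injected `get_json` callable, tasks are plain dicts shaped like FetchTask, `discover_fr` and `fr_listing_url` are hypothetical names, and pagination of the listing endpoint is omitted.

```python
from collections.abc import AsyncIterator, Awaitable, Callable
from datetime import date
from urllib.parse import urlencode

FR_API = "https://www.federalregister.gov/api/v1/documents.json"


def fr_listing_url(last_run: date) -> str:
    # Everything published on or after the watermark.
    query = {"conditions[publication_date][gte]": last_run.isoformat()}
    return f"{FR_API}?{urlencode(query)}"


async def discover_fr(
    last_run: date, get_json: Callable[[str], Awaitable[dict]]
) -> AsyncIterator[dict]:
    """Yield one FetchTask-shaped dict per document_number; no bodies are fetched."""
    listing = await get_json(fr_listing_url(last_run))
    for doc in listing.get("results", []):
        number = doc["document_number"]
        yield {
            "source": "federal_register",
            "key": f"fr:{number}",
            "params": {"document_number": number},
        }
```

Because the HTTP call is injected, discover logic can be tested against a canned listing without network access.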
2. Fetch
Downloads one task's payload to disk under /data/corpus/{source_name}/.... Idempotent — if the file exists with a sane size, skip. Concurrency happens here: the runner spawns N workers (default 4) that pull from the discover queue.
The on-disk path is part of the source's contract, so parse can find it later. Suggested layout:
```
/data/corpus/
├── federal_register/
│   ├── metadata/{YYYY}/{MM}/{document_number}.json
│   └── bodies/xml/{YYYY}/{MM}/{document_number}.xml
├── bills/
│   ├── billstatus/{congress}/{type}/BILLSTATUS-{c}{t}{n}.xml
│   └── text/{congress}/{type}/BILLS-{c}{t}{n}{version}.xml
├── legislators/
│   └── {filename}.yaml
└── ...
```

Fetch uses a shared HttpClient (httpx async) with retries, polite rate limiting (a per-source token bucket), and a real-browser User-Agent.
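The per-source token bucket could be as small as the sketch below. It is an illustration, not the HttpClient's actual implementation: `TokenBucket` is a hypothetical name, and each fetch worker would `await bucket.acquire()` before issuing a request.

```python
import asyncio
import time


class TokenBucket:
    """Per-source rate limiter: refills `rate` tokens per second, holds at most `capacity`."""

    def __init__(self, rate: float, capacity: int) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full so the first requests go out immediately
        self.updated = time.monotonic()
        self._lock = None              # created lazily inside the running event loop

    async def acquire(self) -> None:
        """Wait until one token is available, then consume it."""
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:         # one waiter refills/consumes at a time
            while True:
                now = time.monotonic()
                # Refill in proportion to elapsed time, capped at capacity.
                self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                await asyncio.sleep((1 - self.tokens) / self.rate)
```

With `rate=2, capacity=5`, a source can burst five requests and then settles to two per second, regardless of how many fetch workers share the bucket.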
3. Parse
Reads bytes off disk, produces ParsedRecord objects validated by Pydantic models. No network, no DB, no side effects — parse is a pure function of bytes-in to records-out. This makes it trivially re-runnable: change a parser bug, re-parse the existing corpus, re-load.
Each source defines its own Pydantic models per target table. The runner doesn't know about table schemas — it just shuttles records to load.
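A sketch of a parse step for Federal Register metadata JSON, to show the "pure function of bytes on disk" property. A stdlib dataclass stands in for the Pydantic `ParsedRecord` so the example runs without dependencies; `parse_fr_metadata` is a hypothetical name, and the payload picks only a few fields (`document_number`, `title`, `publication_date`) from the FR document JSON.

```python
from __future__ import annotations

import json
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class ParsedRecord:
    """Stdlib stand-in for the Pydantic ParsedRecord in the Source protocol."""
    source: str
    table: str
    key: str
    payload: dict
    children: list = field(default_factory=list)


async def parse_fr_metadata(raw_path: Path) -> AsyncIterator[ParsedRecord]:
    """Bytes on disk in, records out: no network, no DB, so re-running is safe."""
    doc = json.loads(raw_path.read_bytes())
    number = doc["document_number"]
    yield ParsedRecord(
        source="federal_register",
        table="fr_documents",
        key=f"fr:{number}",
        payload={
            "document_number": number,
            "title": doc.get("title"),
            "published_at": doc.get("publication_date"),
        },
    )
```

Running it twice over the same file yields identical records, which is what makes "fix the parser, re-parse the corpus, re-load" safe.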
4. Load
Upserts records into SQLite in transactions, batched (e.g., 500 at a time). Uses INSERT … ON CONFLICT(natural_key) DO UPDATE SET … (SQLite supports the same ON CONFLICT upsert syntax as Postgres). Records LoadStats (inserted, updated, skipped, errored) per batch.
Index updates happen as part of each transaction:
- FTS5 full-text index: a separate virtual table per searchable parent table (e.g., fr_documents_fts). Triggers on the parent keep the FTS5 table in sync on insert/update/delete. The trigger feeds title/abstract/action/body into the FTS5 columns; column weights are applied at query time via bm25(fr_documents_fts, w_title, w_abstract, w_action, w_body).
- Vector index (sqlite-vec vec0 virtual table): NOT auto-populated. Embedding is a separate step (see Embedding).
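The trigger pattern can be sketched with FTS5's external-content mode, where the FTS table stores only the index and reads text back from the parent. The column set and the weights (10, 5, 2, 1) are illustrative assumptions, the canonical DDL lives in migrations, and the example needs a Python `sqlite3` build with FTS5 compiled in.

```python
import sqlite3

DDL = """
CREATE TABLE fr_documents (
    document_number TEXT PRIMARY KEY,
    title TEXT, abstract TEXT, action TEXT, body_text TEXT
);
CREATE VIRTUAL TABLE fr_documents_fts USING fts5(
    title, abstract, action, body_text,
    content='fr_documents', content_rowid='rowid'
);
CREATE TRIGGER fr_documents_ai AFTER INSERT ON fr_documents BEGIN
    INSERT INTO fr_documents_fts(rowid, title, abstract, action, body_text)
    VALUES (new.rowid, new.title, new.abstract, new.action, new.body_text);
END;
CREATE TRIGGER fr_documents_ad AFTER DELETE ON fr_documents BEGIN
    INSERT INTO fr_documents_fts(fr_documents_fts, rowid, title, abstract, action, body_text)
    VALUES ('delete', old.rowid, old.title, old.abstract, old.action, old.body_text);
END;
CREATE TRIGGER fr_documents_au AFTER UPDATE ON fr_documents BEGIN
    INSERT INTO fr_documents_fts(fr_documents_fts, rowid, title, abstract, action, body_text)
    VALUES ('delete', old.rowid, old.title, old.abstract, old.action, old.body_text);
    INSERT INTO fr_documents_fts(rowid, title, abstract, action, body_text)
    VALUES (new.rowid, new.title, new.abstract, new.action, new.body_text);
END;
"""

# bm25() is "lower is better", so sort ascending. Weights follow column order:
# title, abstract, action, body_text.
SEARCH = """
SELECT title FROM fr_documents_fts
WHERE fr_documents_fts MATCH ?
ORDER BY bm25(fr_documents_fts, 10.0, 5.0, 2.0, 1.0)
"""


def search(conn: sqlite3.Connection, query: str) -> list[str]:
    return [row[0] for row in conn.execute(SEARCH, (query,))]
```

The update trigger deletes the old index entry before inserting the new one; an external-content table cannot look up the old text itself once the parent row has changed.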
The load stage commits per batch with BEGIN IMMEDIATE. A failure mid-source means earlier batches stay durable; the next run picks up from the last_seen_key watermark.
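A minimal sketch of one load batch that produces LoadStats-style counts, assuming a trimmed two-column fr_documents table and an autocommit `sqlite3` connection. The `WHERE` clause on `DO UPDATE` turns a no-op rewrite into zero changes, which is how "skipped" is distinguished from "updated"; `load_batch` is a hypothetical name.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class LoadStats:
    inserted: int = 0
    updated: int = 0
    skipped: int = 0
    errored: int = 0  # not exercised in this sketch


UPSERT = """
INSERT INTO fr_documents (document_number, title) VALUES (?, ?)
ON CONFLICT(document_number) DO UPDATE SET title = excluded.title
WHERE fr_documents.title IS NOT excluded.title
"""


def load_batch(conn: sqlite3.Connection, rows: list[tuple[str, str]]) -> LoadStats:
    """Upsert one batch inside a single BEGIN IMMEDIATE transaction."""
    stats = LoadStats()
    conn.execute("BEGIN IMMEDIATE")
    try:
        for key, title in rows:
            existed = conn.execute(
                "SELECT 1 FROM fr_documents WHERE document_number = ?", (key,)
            ).fetchone() is not None
            cur = conn.execute(UPSERT, (key, title))
            if not existed:
                stats.inserted += 1
            elif cur.rowcount:  # DO UPDATE fired and changed the row
                stats.updated += 1
            else:               # conflict, but the WHERE clause found nothing to change
                stats.skipped += 1
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
    return stats
```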
Body normalization (HTML/XML/PDF → Markdown)
Every source ships its body content in one of three formats — HTML (CRS, GAO, CBO), XML (Federal Register, USLM for USC and Public Laws, BILLSTATUS, eCFR), or PDF (committee reports sometimes, older hearings, GAO assets, SAPs). The substrate normalizes to a single canonical form: GitHub-flavored Markdown. Markdown is what gets chunked, embedded, indexed in FTS5, and rendered by the agent. The raw bytes stay on disk for re-conversion when libraries improve.
Why Markdown
- Uniform chunker interface. Section-aware chunking walks #/##/### headings: one implementation works across CRS HTML, FR XML, USC USLM, and PDF committee reports once they're all Markdown. No per-source XPath.
- LLM-native. Modern LLMs handle Markdown without prompting tricks. Token cost is meaningfully lower than HTML.
- Renderable everywhere. Agent clients, the eventual agent UI, and CLI output all consume Markdown trivially.
- Diff-friendly. Bill-version comparison, proposed-vs-final rule diffs, etc., work better against Markdown than raw HTML or XML.
Storage shape
Raw bytes live as files on disk under /data/corpus/<source>/bodies/raw/... — HTML, XML, PDF, etc. Big binary-ish payloads stay out of the SQLite file so re-conversion is cheap and the DB stays small.
Normalized Markdown lives in the parent table's body_text column — text values that FTS5 can index directly. Storing markdown in a column rather than on disk avoids trigger-driven file reads (FTS5 indexes columns) and keeps queries simple.
```
/data/corpus/
├── federal_register/
│   └── bodies/
│       └── raw/{YYYY}/{MM}/{document_number}.xml   # original FR XML
├── crs/
│   └── bodies/
│       └── raw/{number}.html                       # original CRS HTML
├── public_laws/
│   └── bodies/
│       └── raw/{congress}/PLAW-{c}publ{n}.xml      # USLM
└── ...
```

Record tables store the path to the raw payload plus the normalized markdown directly:
```sql
-- Applied per record-bearing parent table that has body content
body_raw_path           TEXT,  -- /data/corpus/<source>/bodies/raw/...
body_raw_format         TEXT,  -- 'html' | 'xml' | 'pdf' | 'text'
body_raw_sha256         TEXT,  -- hash of raw bytes (provenance + dedup)
body_text               TEXT,  -- normalized Markdown (FTS5 indexes this)
body_text_sha256        TEXT,  -- hash of normalized output (re-conversion detection)
body_normalized_at      TEXT,  -- when body_text was generated
body_normalizer_version TEXT   -- semver of converter used
```

body_normalizer_version is the key to safe evolution: when we ship a better parser, a migration finds rows whose body_normalizer_version is older than the new version, re-reads body_raw_path, runs the new normalizer, and updates body_text + body_text_sha256 + body_normalized_at + body_normalizer_version in one transaction. No re-fetching from upstream APIs. Compare versions as semver in application code, not with a TEXT `<` in SQL: lexicographically, '1.10.0' sorts before '1.9.0'.
Why not put markdown on disk too? FTS5 virtual tables index columns, not files. If markdown lived only on disk, the FTS5 sync trigger would have to read files — fragile, slow, and complicates the trigger logic. Putting markdown in the column is simpler and aligns with how SQLite expects to be used. The DB-size cost is small (CRS averages ~50 KB markdown × 22K reports ≈ 1 GB; SQLite handles multi-TB DBs fine).
The Normalizer protocol
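```python
from pathlib import Path
from typing import Protocol

from pydantic import BaseModel


class Heading(BaseModel):
    """Node in the extracted heading tree (fields not fixed by this spec)."""


class Artifact(BaseModel):
    """Table or figure preserved separately when conversion is lossy (fields not fixed by this spec)."""


class NormalizedBody(BaseModel):
    markdown: str
    headings: list[Heading]         # extracted heading tree, used by chunker
    artifacts: list[Artifact] = []  # tables, figures preserved separately if lossy
    warnings: list[str] = []        # parser-level warnings


class Normalizer(Protocol):
    name: str            # 'html_v1', 'uslm_v1', 'fr_xml_v1', ...
    version: str         # semver; bump on parser change
    accepts_format: str  # 'html' | 'xml' | 'pdf' | 'text'

    async def to_markdown(
        self, raw_path: Path, source_hint: str | None = None
    ) -> NormalizedBody: ...
```

Normalizers are pure functions of bytes on disk to Markdown. No network. No DB. Trivially testable from fixtures. The Markdown they return is written to body_text by the load stage (or the normalization worker), not back to disk.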
Per-format converters (locked)
| Format | Converter | Library | Notes |
|---|---|---|---|
| HTML | html_v1 | markdownify primary, trafilatura fallback for messy pages | Bulk of CRS, GAO product pages once we get past the bot wall |
| FR XML | fr_xml_v1 | hand-rolled lxml walker | Preserves <HD> headings, <P> paragraphs, <TABLE> as MD tables |
| USLM | uslm_v1 | hand-rolled lxml walker | Preserves <title> / <chapter> / <section> hierarchy. Used for USC and PLAW |
| eCFR DIV-numbered XML | ecfr_xml_v1 | hand-rolled lxml walker | Similar shape to FR XML; preserves DIV1–DIV8 hierarchy |
| BILLSTATUS XML | uslm_v1 for enacted; billres_v1 walker for live | metadata-only; bodies come via PLAW (USLM) or BILLS (billres DTD) | Most BILLSTATUS content is structured fields, not body text |
| PDF | pdf_v1 | docling (OSS, IBM Research) primary, marker fallback; Mistral OCR if quality dominates and self-hosting doesn't | Most expensive; runs via a separate queue (see below) |
| Plain text | text_v1 | passthrough with light whitespace cleanup | RSS bodies, simple notice text |
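The text_v1 row is the simplest converter, so it makes a compact illustration of the Normalizer shape. This sketch returns a plain string where the real protocol returns NormalizedBody, and its version string is a hypothetical starting value; "light whitespace cleanup" is interpreted here as normalizing newlines, stripping trailing spaces, and collapsing runs of blank lines.

```python
from __future__ import annotations

import re
from pathlib import Path


class TextV1:
    """Sketch of text_v1: passthrough with light whitespace cleanup."""
    name = "text_v1"
    version = "1.0.0"  # hypothetical starting version
    accepts_format = "text"

    async def to_markdown(self, raw_path: Path, source_hint: str | None = None) -> str:
        text = raw_path.read_text(encoding="utf-8")
        text = text.replace("\r\n", "\n").replace("\r", "\n")  # normalize newlines
        lines = [line.rstrip() for line in text.split("\n")]   # drop trailing spaces
        text = re.sub(r"\n{3,}", "\n\n", "\n".join(lines))     # at most one blank line
        return text.strip() + "\n"
```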
When normalization happens
- HTML and XML normalize inline during parse. The parser already has the DOM in memory, so conversion is cheap. The output Markdown gets written into body_text as part of the load stage's upsert.
- PDF normalizes via a separate queue. PDF parsing (especially with OCR) is the most expensive content step. Decoupling means PDF-heavy sources don't block their own metadata ingestion. Records load with body_text = NULL plus a queue row; the normalization worker drains the queue and updates body_text later. Same shape as the embedding queue:

```sql
CREATE TABLE ingestion_normalization_queue (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    table_name      TEXT NOT NULL,
    record_id       TEXT NOT NULL,
    raw_path        TEXT NOT NULL,
    raw_format      TEXT NOT NULL,
    enqueued_at     TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
    attempts        INTEGER NOT NULL DEFAULT 0,
    last_attempt_at TEXT,
    last_error      TEXT,
    UNIQUE (table_name, record_id)
);
```

The agent gets metadata search the moment a record loads; full-text search and semantic search come online when normalization (and then embedding) catches up.
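A sketch of the PDF load path: the record row and its queue row are written in the same transaction, so a crash can't leave a record with no pending normalization. It assumes each parent table has an `id` key column (the real tables use per-source natural keys) and uses a hypothetical `committee_reports` table in the usage check; `load_pdf_record` is a hypothetical name.

```python
import sqlite3


def load_pdf_record(conn: sqlite3.Connection, table: str, record_id: str, raw_path: str) -> None:
    """Upsert a PDF-backed record with body_text = NULL and enqueue it, atomically.

    `table` must be a trusted identifier: it is interpolated into SQL.
    """
    conn.execute("BEGIN IMMEDIATE")
    try:
        conn.execute(
            f"INSERT INTO {table} (id, body_raw_path, body_raw_format, body_text) "
            "VALUES (?, ?, 'pdf', NULL) "
            "ON CONFLICT(id) DO UPDATE SET body_raw_path = excluded.body_raw_path",
            (record_id, raw_path),
        )
        # UNIQUE (table_name, record_id) turns a repeat enqueue into a no-op.
        conn.execute(
            "INSERT INTO ingestion_normalization_queue "
            "(table_name, record_id, raw_path, raw_format) VALUES (?, ?, ?, 'pdf') "
            "ON CONFLICT(table_name, record_id) DO NOTHING",
            (table, record_id, raw_path),
        )
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
```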
Re-conversion
When uslm_v2 ships, no re-fetch is needed. A migration scans rows whose body_normalizer_version is older than the new USLM normalizer's version, re-reads body_raw_path, runs the new normalizer, and updates body_text + body_text_sha256 + body_normalized_at + body_normalizer_version. Then it enqueues the affected records into ingestion_embedding_queue so chunks get re-embedded. All in one transaction per record.
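A sketch of that migration loop over one table. Assumptions: the key column is called `id`, the normalizer is passed in as a plain `bytes -> str` callable, versions are simple `MAJOR.MINOR.PATCH` strings (no pre-release tags), and records are re-queued for embedding only when body_text_sha256 actually changes. The test case goes from 1.9.0 to 1.10.0, exactly where a TEXT comparison would skip the row.

```python
from __future__ import annotations

import hashlib
import sqlite3
from collections.abc import Callable
from pathlib import Path


def semver(v: str) -> tuple[int, ...]:
    """'1.10.0' -> (1, 10, 0); compare tuples, not strings."""
    return tuple(int(part) for part in v.split("."))


def reconvert(conn: sqlite3.Connection, table: str, new_version: str,
              normalize: Callable[[bytes], str]) -> int:
    """Re-normalize rows older than new_version; return how many bodies changed.

    `table` must be a trusted identifier: it is interpolated into SQL.
    """
    rows = conn.execute(
        f"SELECT id, body_raw_path, body_normalizer_version, body_text_sha256 "
        f"FROM {table} WHERE body_raw_path IS NOT NULL"
    ).fetchall()
    changed = 0
    for rec_id, raw_path, version, old_sha in rows:
        if version is not None and semver(version) >= semver(new_version):
            continue  # already current
        markdown = normalize(Path(raw_path).read_bytes())  # raw bytes are on disk
        new_sha = hashlib.sha256(markdown.encode("utf-8")).hexdigest()
        conn.execute("BEGIN IMMEDIATE")  # one transaction per record
        try:
            conn.execute(
                f"UPDATE {table} SET body_text = ?, body_text_sha256 = ?, "
                "body_normalized_at = strftime('%Y-%m-%dT%H:%M:%fZ','now'), "
                "body_normalizer_version = ? WHERE id = ?",
                (markdown, new_sha, new_version, rec_id),
            )
            if new_sha != old_sha:  # re-embed only when the Markdown changed
                conn.execute(
                    "INSERT INTO ingestion_embedding_queue (table_name, record_id) "
                    "VALUES (?, ?) ON CONFLICT(table_name, record_id) DO NOTHING",
                    (table, rec_id),
                )
                changed += 1
        except BaseException:
            conn.execute("ROLLBACK")
            raise
        conn.execute("COMMIT")
    return changed
```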
Citation metadata
Every retrieval Josh hands back to an agent must carry enough provenance to cite the source without a second round-trip. This is non-negotiable for an agentic-first product whose outputs end up in regulatory comments, briefings, and reports. The agent must be able to drop a Bluebook-flavored citation directly into a user's document.
Required citation columns
Every record-bearing parent table — every source — carries the following columns:
```sql
source_url      TEXT NOT NULL,  -- canonical deep link (page a human would visit)
source_org      TEXT NOT NULL,  -- issuing body, e.g. 'Congressional Research Service'
published_at    TEXT,           -- source's publication date (ISO-8601)
retrieved_at    TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
raw_sha256      TEXT,           -- content hash of raw bytes (provenance + tamper detection)
citation_string TEXT NOT NULL   -- pre-formatted citation, source-specific
```

Notes on each:

- source_url is the human-facing URL, not the API endpoint: the EveryCRSReport HTML page, not everycrsreport.com/reports.csv; the FederalRegister.gov document page, not /api/v1/documents/{id}.json; the lda.gov filing detail page, not /api/v1/filings/{id}/.
- source_org may be normalized via the existing agencies lookup (FR documents reference EPA, FDA, etc.); for sources where the org is constant or doesn't fit the agency table, store an explicit string ('Congressional Research Service', 'Government Accountability Office', 'Senate of the United States').
- published_at semantics vary per source, so document the exact field per source (FR publication_date, CRS latestPubDate, committee report issue_date, GAO RSS pubDate, etc.). Stored as ISO-8601 TEXT.
- raw_sha256 lets us prove no tampering between fetch and serve. It is computed in the fetch stage as bytes hit disk. Cheap.
- citation_string is the formatted citation an agent can drop into a deliverable as-is. Per-source formats below.
Per-source citation formatters
A small module per source defines citation_for(record) -> str. Bluebook-flavored but pragmatic — Josh isn't a legal product and we shouldn't pretend to be.
| Source | Format example |
|---|---|
| CRS report | Cong. Rsch. Serv., R48481, Wildfire Emergency Spending (Mar. 15, 2025). |
| FR document | 91 Fed. Reg. 12345 (Mar. 4, 2026). |
| Public Law | Pub. L. No. 119-21 (2026). |
| US Code section | 42 U.S.C. § 1396a (2024). |
| GAO report | U.S. Gov't Accountability Off., GAO-25-107521, Title (Apr. 2025). |
| Committee report | H.R. Rep. No. 119-100 (2025). |
| Bill | H.R. 1234, 119th Cong. (2025). |
| Hearing transcript (CHRG) | Hearing Title: Hearing Before the H. Comm. on X, 119th Cong. (2025). |
| Roll call vote | Roll Call Vote No. 362 (House), 119th Cong., 1st Sess. (Mar. 4, 2025). |
| LDA filing | Lobbying Disclosure Act Filing {filing_uuid}, {client} via {registrant} (Q3 2025). |
| SAP | Statement of Administration Policy on H.R. 1234 (Jan. 22, 2025). |
| Congressional Record | 171 Cong. Rec. H1234 (daily ed. Mar. 4, 2025) (statement of Rep. X). |
Formatters live in shared/josh_substrate/src/josh_substrate/citations/formatters/<source>.py. Both josh-ingester (populates citation_string at load time) and josh-core (re-formats when needed) share them.
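Two rows of the table, sketched as formatter helpers. `bluebook_date`, `ordinal`, `fr_citation`, and `bill_citation` are hypothetical names; a real `citation_for(record)` would pull these arguments from the record. Month abbreviations follow the Bluebook convention the table uses (May, June, and July are spelled out; September is "Sept.").

```python
from datetime import date

_MONTHS = ["Jan.", "Feb.", "Mar.", "Apr.", "May", "June",
           "July", "Aug.", "Sept.", "Oct.", "Nov.", "Dec."]


def bluebook_date(d: date) -> str:
    return f"{_MONTHS[d.month - 1]} {d.day}, {d.year}"


def ordinal(n: int) -> str:
    # 11th-13th (and 111th-113th) take "th"; otherwise the last digit decides.
    if 10 <= n % 100 <= 20:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
    return f"{n}{suffix}"


def fr_citation(volume: int, page: int, published: date) -> str:
    # e.g. "91 Fed. Reg. 12345 (Mar. 4, 2026)."
    return f"{volume} Fed. Reg. {page} ({bluebook_date(published)})."


def bill_citation(prefix: str, number: int, congress: int, year: int) -> str:
    # e.g. "H.R. 1234, 119th Cong. (2025)."
    return f"{prefix} {number}, {ordinal(congress)} Cong. ({year})."
```

Computing the ordinal rather than hard-coding "th" keeps the formatter correct from the 121st Congress on.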
Chunk-level locator
Search retrieval returns chunks, not whole records. Every <source>_chunks table carries a locator that lets the citation include the exact place the snippet came from:

```sql
chunk_locator_json TEXT  -- {"heading_path": ["Background", "Statutory Authority"],
                         --  "section": "II.B", "page": 12}
```

The locator is built during chunking by walking the Markdown heading tree (Markdown headings are uniform across all sources per the Body normalization section above). Page numbers come from the PDF normalizer where available; section IDs come from structured XML sources.
Retrieval-API surface
Every search/fetch tool returns a citation block alongside content:

```python
from datetime import date

from pydantic import BaseModel


class ChunkLocator(BaseModel):
    """Mirrors chunk_locator_json."""
    heading_path: list[str] = []
    section: str | None = None
    page: int | None = None


class Citation(BaseModel):
    title: str
    organization: str             # source_org
    url: str                      # source_url
    published_at: date | None
    citation_string: str          # the formatted citation
    locator: ChunkLocator | None  # heading path + page when chunk-scoped


class RetrievalResult(BaseModel):
    id: str             # 'crs:R48481'
    snippet: str        # truncated text shown in tool output
    citation: Citation  # always present
```

Agents drop citation.citation_string directly into the document they're producing. citation.url lets the user click through. citation.locator is what makes "as the GAO report's Recommendations section notes…" possible without a re-fetch.
When each field is populated
| Field | Stage | Populated by |
|---|---|---|
| source_url | parse | per-source parser (constructs from natural key) |
| source_org | parse | per-source parser (often constant per source, sometimes from agency lookup) |
| published_at | parse | per-source parser (extracts from body or list-endpoint metadata) |
| retrieved_at | fetch | runner (column default strftime('now') fires at row insert, i.e. load time, unless set explicitly) |
| raw_sha256 | fetch | runner (hashes payload as it writes to disk) |
| citation_string | parse | per-source citation_for(record) formatter |
| chunk_locator_json | chunking | chunker (builds from Markdown heading walk) |
All citation data is pre-populated at ingestion. Nothing is computed at query time. The retrieval API is a thin SQL projection — fast and predictable.
State tables
These are part of the first migration, alongside the substrate tables. They're read by the admin UI in josh-web via josh-core's API. SQLite-flavored DDL — illustrative only; the canonical version lives in shared/josh_substrate/.../migrations/versions/0001_state_tables.py (to be re-authored on the SQLite foundation).
```sql
-- One row per ingestion run (per source per run)
CREATE TABLE ingestion_runs (
    id               TEXT PRIMARY KEY,  -- UUID generated by the application
    source           TEXT NOT NULL,     -- 'federal_register'
    mode             TEXT NOT NULL,     -- 'incremental' | 'backfill' | 'manual'
    status           TEXT NOT NULL,     -- 'running' | 'success' | 'partial' | 'failed' | 'cancelled'
    started_at       TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
    finished_at      TEXT,

    -- Counts, written incrementally as the run progresses
    discovered_count INTEGER NOT NULL DEFAULT 0,
    fetched_count    INTEGER NOT NULL DEFAULT 0,
    parsed_count     INTEGER NOT NULL DEFAULT 0,
    inserted_count   INTEGER NOT NULL DEFAULT 0,
    updated_count    INTEGER NOT NULL DEFAULT 0,
    skipped_count    INTEGER NOT NULL DEFAULT 0,
    error_count      INTEGER NOT NULL DEFAULT 0,

    -- Run params (for re-runnability)
    config           TEXT NOT NULL DEFAULT '{}',  -- JSON, validated at app layer
    error_summary    TEXT                         -- final error if status='failed'
);
CREATE INDEX ingestion_runs_source_started ON ingestion_runs (source, started_at DESC);
CREATE INDEX ingestion_runs_status ON ingestion_runs (status, started_at DESC)
    WHERE status IN ('running', 'failed');

-- Run logs: one row per significant log event. Capped via rotation.
CREATE TABLE ingestion_logs (
    id        INTEGER PRIMARY KEY AUTOINCREMENT,
    run_id    TEXT NOT NULL REFERENCES ingestion_runs(id) ON DELETE CASCADE,
    logged_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
    level     TEXT NOT NULL,  -- 'debug' | 'info' | 'warn' | 'error'
    message   TEXT NOT NULL,
    context   TEXT            -- JSON
);
CREATE INDEX ingestion_logs_run ON ingestion_logs (run_id, logged_at);
CREATE INDEX ingestion_logs_errors ON ingestion_logs (run_id, logged_at) WHERE level = 'error';

-- Per-source watermarks, persisted across runs
CREATE TABLE ingestion_source_state (
    source          TEXT PRIMARY KEY,
    last_run_at     TEXT,
    last_success_at TEXT,
    last_seen_key   TEXT,                       -- source-specific cursor
    custom          TEXT NOT NULL DEFAULT '{}'  -- JSON
);

-- Per-task progress within a run, used to make resumable backfills
CREATE TABLE ingestion_tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    run_id      TEXT NOT NULL REFERENCES ingestion_runs(id) ON DELETE CASCADE,
    source      TEXT NOT NULL,
    task_key    TEXT NOT NULL,  -- the FetchTask.key
    stage       TEXT NOT NULL,  -- 'discovered' | 'fetched' | 'parsed' | 'loaded' | 'errored'
    started_at  TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
    finished_at TEXT,
    error       TEXT,
    UNIQUE (run_id, task_key)
);
CREATE INDEX ingestion_tasks_run ON ingestion_tasks (run_id, stage);
```

Notes on the SQLite shape:
- Timestamps as ISO-8601 TEXT, not native datetime: SQLite has no timestamp type. We use strftime('%Y-%m-%dT%H:%M:%fZ','now') for UTC millisecond-precision strings; because every value has the same fixed-width format, lexicographic ordering matches chronological ordering.
- UUIDs as TEXT, generated app-side (Python uuid.uuid4()).
- JSON stored as TEXT, validated by Pydantic at the app boundary. SQLite's JSON1 functions (json_extract, etc.) work over TEXT columns.
- Foreign keys require PRAGMA foreign_keys = ON at connection setup; they're off by default in SQLite. Set it in the Alembic env and the app DB session helpers.
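App-side timestamps have to match the SQL-generated format exactly, or lexicographic ordering breaks: Python's `datetime.isoformat()` emits `+00:00` and drops the fraction when microseconds are zero. A minimal helper, with `utc_now_iso` as a hypothetical name:

```python
from __future__ import annotations

from datetime import datetime, timezone


def utc_now_iso(now: datetime | None = None) -> str:
    """Match strftime('%Y-%m-%dT%H:%M:%fZ','now'): UTC, milliseconds, trailing 'Z'.

    Note: astimezone() treats a naive datetime as local time.
    """
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"
```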
The runner writes to these tables transactionally (BEGIN IMMEDIATE). The admin UI in josh-web reads them through josh-core REST endpoints (e.g., GET /admin/ingestion/runs?source=bills&limit=20).
State table retention: keep all ingestion_runs and ingestion_source_state forever (small, useful as audit trail). Rotate ingestion_logs and ingestion_tasks after 30 days (delete-by-date job).
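The delete-by-date job could be a single transaction using SQLite's date modifiers, so the cutoff is computed in the same format as the stored timestamps. A sketch, with `rotate` as a hypothetical name; it keys ingestion_tasks on `started_at`, which is an assumption since the retention rule doesn't say which timestamp to use.

```python
import sqlite3


def rotate(conn: sqlite3.Connection, days: int = 30) -> tuple[int, int]:
    """Delete logs and tasks older than `days`; returns (logs_deleted, tasks_deleted)."""
    modifier = f"-{int(days)} days"  # e.g. '-30 days', a strftime() date modifier
    conn.execute("BEGIN IMMEDIATE")
    try:
        logs = conn.execute(
            "DELETE FROM ingestion_logs "
            "WHERE logged_at < strftime('%Y-%m-%dT%H:%M:%fZ', 'now', ?)", (modifier,)
        ).rowcount
        tasks = conn.execute(
            "DELETE FROM ingestion_tasks "
            "WHERE started_at < strftime('%Y-%m-%dT%H:%M:%fZ', 'now', ?)", (modifier,)
        ).rowcount
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
    return logs, tasks
```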
Concurrency model
Within a source: bounded parallel workers in the fetch stage (HTTP-bound, fine to parallelize), sequential parse, batched load.
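```python
import asyncio
from pathlib import Path

# Inside runner.py (simplified). Source, FetchTask, HttpClient, and AsyncConnection come
# from the Source protocol. create_run, load_source_state, mark_task_loaded, finalize_run,
# update_source_state, and BatchLoader (hypothetical name) are runner/state helpers not shown.
async def run_source(source: Source, mode: str, http: HttpClient, db: AsyncConnection) -> None:
    run_id = await create_run(db, source.name, mode)
    state = await load_source_state(db, source.name)

    # fetch_concurrency is optional on a source; default to 4 workers.
    sem = asyncio.Semaphore(getattr(source, "fetch_concurrency", None) or 4)

    async def fetch_with_limit(task: FetchTask) -> tuple[FetchTask, Path]:
        async with sem:
            return task, await source.fetch(task, http)

    # discover() is an async generator; materialize the work list.
    fetch_coros = [fetch_with_limit(t) async for t in source.discover(state, http)]

    # Accumulator that hands full batches (~500 records) to source.load().
    load_batch = BatchLoader(source, db, batch_size=500)

    for fut in asyncio.as_completed(fetch_coros):
        task, raw_path = await fut
        async for record in source.parse(raw_path):
            await load_batch.add(record)  # commits when the batch fills
        await mark_task_loaded(db, run_id, task.key)

    stats = await load_batch.flush()
    await finalize_run(db, run_id, stats)
    await update_source_state(db, source.name, state)
```

The real implementation will be more careful (error handling, partial failures, structured logging at each stage, and marking a task loaded only once its batch has committed), but the topology is "fetch is concurrent; parse and load are pipelined per task."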
Across sources: each source runs in its own asyncio task, and the scheduler kicks them off independently. Within one ingester process, only one source's load stage writes to SQLite at a time: the runner serializes write transactions across sources. josh-core reads concurrently with the writer (WAL).
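One way to serialize write transactions across sources inside a single process is a shared `asyncio.Lock` that every load stage (and the embedding worker) goes through. A sketch under that assumption; `WriteSerializer` is a hypothetical name, and the lock is created lazily so it binds to the running event loop.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


class WriteSerializer:
    """Process-wide gate: at most one write transaction in flight at a time."""

    def __init__(self) -> None:
        self._lock = None  # created lazily inside the running event loop

    async def run(self, write: Callable[[], Awaitable[T]]) -> T:
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:  # waiters acquire in FIFO order
            return await write()
```

Reads don't need the gate; WAL lets josh-core and the ingester's own reads proceed alongside the single writer.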
Single source running twice at once: prevented by a file-system advisory lock acquired at run start (fcntl.flock on /data/locks/ingest-{source}.lock). If the lock is held, the second invocation logs and exits. (We previously used Postgres's pg_try_advisory_lock; flock is its equivalent now that the substrate is SQLite.)
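A sketch of that guard as a context manager (Unix-only, since it uses `fcntl`). `source_lock` is a hypothetical name; it yields `False` instead of raising so the caller can log and exit. The lock is tied to the open file description, so it is released when the descriptor closes, including when the process dies.

```python
import fcntl
import os
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def source_lock(source: str, lock_dir: Path = Path("/data/locks")):
    """Try a non-blocking exclusive flock on {lock_dir}/ingest-{source}.lock.

    Yields True if this run holds the lock, False if another run already does.
    """
    lock_dir.mkdir(parents=True, exist_ok=True)
    fd = os.open(lock_dir / f"ingest-{source}.lock", os.O_RDWR | os.O_CREAT, 0o644)
    try:
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            yield False
            return
        yield True
    finally:
        os.close(fd)  # closing the descriptor releases the flock
```

Usage: `with source_lock("bills") as held:` then return early with a log line when `held` is false.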
Scheduler
apscheduler.AsyncIOScheduler runs inside the ingester container. About 30 lines of wiring:

```python
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

TZ = "America/New_York"


async def run_ingester(http: HttpClient, db: AsyncConnection) -> None:
    sources = discover_sources()  # walks ingester/sources/ for modules
    scheduler = AsyncIOScheduler(timezone=TZ)

    for source in sources:
        if source.schedule:  # None means manual-only
            scheduler.add_job(
                # Pass the coroutine function itself: AsyncIOScheduler awaits coroutine
                # functions, but a lambda returning a coroutine is run as a plain callable.
                run_source,
                # A CronTrigger instance does not inherit the scheduler's timezone,
                # so set it explicitly; cron fields on sources are in ET.
                CronTrigger.from_crontab(source.schedule, timezone=TZ),
                args=[source],
                kwargs={"mode": "incremental", "http": http, "db": db},
                id=source.name,
                max_instances=1,  # belt-and-braces: scheduler-level guard
                misfire_grace_time=300,
            )

    scheduler.start()
    await asyncio.Event().wait()  # block forever
```

Schedules live as a string on the source itself:

```python
# ingester/sources/federal_register.py
class FederalRegister:
    name = "federal_register"
    # 10:10 ET, weekdays (FR publication time). Day names sidestep APScheduler 3.x's
    # 0=Monday day-of-week numbering, which differs from standard cron.
    schedule = "10 10 * * mon-fri"
    depends_on = []
    ...


# ingester/sources/bills.py
class Bills:
    name = "bills"
    schedule = "15 * * * *"  # hourly at :15, RSS-driven incremental
    depends_on = ["legislators", "committees"]
    ...
```

Schedule changes go through git → Kamal deploy → ingester container restart, which is fine.
Bootstrap vs incremental
Bootstrap (one-time, manual, ordered):

```shell
# Inside the ingester container, or via `kamal app exec --reuse` from the Mac:
josh-ingester bootstrap    # walks all sources, ordered by depends_on, full backfill
josh-ingester backfill federal_register --since=2020-01-01
josh-ingester backfill bills --congresses=119
```

bootstrap is just a topological sort over depends_on followed by backfill for each source.
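The ordering half of bootstrap maps directly onto the stdlib's `graphlib` (Python 3.9+), since depends_on already lists each source's predecessors. A sketch with `bootstrap_order` as a hypothetical name; a dependency cycle surfaces as `graphlib.CycleError` before any backfill starts.

```python
from graphlib import TopologicalSorter


def bootstrap_order(depends_on: dict[str, list[str]]) -> list[str]:
    """Order sources so every dependency is backfilled before its dependents."""
    return list(TopologicalSorter(depends_on).static_order())
```

For example, `{"bills": ["legislators", "committees"], "legislators": [], "committees": []}` yields legislators and committees before bills.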
Incremental (scheduled, automatic):
Triggered by the scheduler at each source's cron expression. Reads last_run_at and last_seen_key from ingestion_source_state, asks the source to discover what's new since then, fetches, parses, loads, updates the watermarks. Same code path as backfill, just with a tighter date window.
Both paths use run_source(source, mode) — only the watermark behavior differs.
Embedding
Vector rows aren't auto-populated; we have to compute them. Embedding is a separate stage that runs after load:
- For each newly loaded record with body text, chunk and embed asynchronously.
- Embedding has its own queue table (ingestion_embedding_queue) and its own worker pool inside the ingester.
- The chunker/embedder uses Snowflake Arctic-Embed-M-v2 (1024-dim, MIT license, runs on CPU). Locked OSS-only: no third-party API. See Architecture, Storage stack.
- Failure to embed is non-fatal: the record is loaded; the embedding row gets retried.
Vector storage uses sqlite-vec's vec0 virtual tables. Today: brute-force scan of binary-quantized vectors with full-precision rescore on top-K. Migration to vec1 (IVFADC+OPQ) is planned when vec1 cuts a first release; that's a CREATE TABLE … vec1(…) swap and a one-time backfill, not a schema redesign at the substrate level.
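To make "binary quantization + full-precision rescore" concrete, here is a pure-Python illustration of the technique. It is not sqlite-vec's API, and `quantize`, `hamming`, and `search` are hypothetical names: vectors are reduced to one sign bit per dimension, a brute-force Hamming scan picks a shortlist of `k * oversample` candidates, and only that shortlist is re-ranked by exact cosine similarity.

```python
import math
from collections.abc import Sequence


def quantize(vec: Sequence[float]) -> int:
    """Binary quantization: one bit per dimension, set when the value is positive."""
    bits = 0
    for i, x in enumerate(vec):
        if x > 0:
            bits |= 1 << i
    return bits


def hamming(a: int, b: int) -> int:
    return bin(a ^ b).count("1")  # popcount of the differing bits


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def search(query: Sequence[float], vectors: list[Sequence[float]],
           k: int = 10, oversample: int = 4) -> list[int]:
    """Hamming scan over bit codes, then exact cosine rescore of the top k*oversample."""
    codes = [quantize(v) for v in vectors]  # in practice computed once at index time
    q = quantize(query)
    shortlist = sorted(range(len(vectors)), key=lambda i: hamming(q, codes[i]))[: k * oversample]
    return sorted(shortlist, key=lambda i: cosine(query, vectors[i]), reverse=True)[:k]
```

The oversample factor trades scan accuracy for rescore cost: the bit codes only need to get the true neighbors into the shortlist, not rank them exactly.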
Full schema for the embedding queue is part of the first migration:
```sql
CREATE TABLE ingestion_embedding_queue (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    table_name      TEXT NOT NULL,  -- 'fr_documents', 'bill_text_versions', ...
    record_id       TEXT NOT NULL,
    enqueued_at     TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now')),
    attempts        INTEGER NOT NULL DEFAULT 0,
    last_attempt_at TEXT,
    last_error      TEXT,
    UNIQUE (table_name, record_id)
);
CREATE INDEX ingestion_embedding_queue_pending
    ON ingestion_embedding_queue (id) WHERE attempts < 5;
```

Why split: embedding is the slowest, most expensive, and most failure-prone stage. Decoupling means a slow or failing embedding worker doesn't block metadata ingestion. The agent can search by metadata and FTS5 full-text the moment a record loads; semantic search comes online when the queue catches up.
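A sketch of one drain step for this queue, assuming a synchronous `sqlite3` connection and an injected `embed(table_name, record_id)` callback that chunks, embeds, and writes vectors. `drain_one` is a hypothetical name, and this version retries a failing row immediately with no backoff. The `attempts < 5` predicate is written as a literal, matching the partial index, because SQLite's planner cannot prove a bound parameter satisfies a partial-index predicate.

```python
import sqlite3
from collections.abc import Callable

NEXT_PENDING = (
    "SELECT id, table_name, record_id FROM ingestion_embedding_queue "
    "WHERE attempts < 5 ORDER BY id LIMIT 1"
)


def drain_one(conn: sqlite3.Connection, embed: Callable[[str, str], None]) -> bool:
    """Process the oldest pending row; return False once nothing is pending."""
    row = conn.execute(NEXT_PENDING).fetchone()
    if row is None:
        return False
    queue_id, table_name, record_id = row
    try:
        embed(table_name, record_id)
    except Exception as exc:
        # Non-fatal: the record stays loaded; bump attempts so it is retried (up to 5).
        conn.execute(
            "UPDATE ingestion_embedding_queue SET attempts = attempts + 1, "
            "last_attempt_at = strftime('%Y-%m-%dT%H:%M:%fZ','now'), last_error = ? "
            "WHERE id = ?",
            (repr(exc), queue_id),
        )
    else:
        conn.execute("DELETE FROM ingestion_embedding_queue WHERE id = ?", (queue_id,))
    return True
```

Rows that exhaust their attempts stay in the table with last_error set, where `josh-ingester embed-status` or the admin UI can surface them.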
CLI shape
josh-ingester is the package; python -m ingester is the entrypoint, exposed via a CLI written with typer.
```sh
# Run one incremental cycle for one source
josh-ingester run <source>

# Full or partial backfill
josh-ingester backfill <source> [--since=YYYY-MM-DD] [--congresses=119,120]

# Bootstrap a fresh install in dependency order
josh-ingester bootstrap

# Status
josh-ingester status                   # last run + state per source
josh-ingester status --source=bills    # one source detail

# Logs
josh-ingester logs <run-id>            # tail
josh-ingester logs --source=bills --tail=100

# Schedule introspection
josh-ingester schedule                 # show declared schedules per source

# Operations on the embedding queue
josh-ingester embed --workers=4        # drain the queue
josh-ingester embed-status

# Sanity / health
josh-ingester health                   # DB reachable? watermarks sane?
```

The CLI is a thin shell over the same code paths the scheduler uses.
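The bootstrap command has to run sources in dependency order. One way to derive that order, sketched here with the stdlib graphlib module, is to group sources into tiers where each tier depends only on earlier ones. The dependency map below is hypothetical, not the real v1 graph.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def bootstrap_tiers(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group sources into tiers; every source depends only on sources in earlier tiers.

    deps maps a source name to the set of source names it depends on.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    tiers: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every source whose dependencies are done
        tiers.append(ready)
        sorter.done(*ready)
    return tiers


# Hypothetical dependency map, for illustration only.
EXAMPLE_DEPS: Dict[str, Set[str]] = {
    "federal_register": set(),
    "legislators": set(),
    "committees": set(),
    "bills": {"legislators", "committees"},
    "committee_reports": {"committees", "bills"},
}
```

Sources within one tier have no edges between them, so they could run concurrently; prepare() fails loudly on a mistaken circular edge instead of producing a partial order.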
Library choices (locked)
| Concern | Choice | Why |
|---|---|---|
| HTTP | httpx (async) | HTTP/2, modern API, mature. aiohttp is fine but httpx API is cleaner. |
| HTTP retries | tenacity | Decorator-based, composable, async-aware. |
| XML parsing | lxml | Fast, robust. xmltodict is convenient but slow at our volumes. Some sources will use both. |
| YAML | pyyaml | For congress-legislators files. |
| Validation | pydantic v2 | Standard. Shared with josh-core. |
| SQLite | aiosqlite (driver) + SQLAlchemy 2.0 (ORM) + Alembic (migrations) | The sqlite+aiosqlite:// URL works with SQLAlchemy async and Alembic. sqlite-vec needs extension loading: aiosqlite proxies sqlite3's enable_load_extension, which only exists when Python's sqlite3 module was built with loadable-extension support. If the deployed Python lacks it, fall back to apsw for the connection wrapper. |
| Vector extension | sqlite-vec (PyPI) | Bundles the compiled extension binary; loaded at connection setup with sqlite_vec.load(conn) after conn.enable_load_extension(True). |
| FTS | SQLite native FTS5 (no dep) | Compiled into most SQLite builds that Python links against, but it is a build option, so verify at startup; tokenizers configured per-table. |
| Scheduler | apscheduler (AsyncIOScheduler) | In-process, no broker, mature. |
| Logging | structlog | Structured JSON to stdout + parallel writes to ingestion_logs via a custom processor. |
| CLI | typer | FastAPI-style ergonomics, Pydantic-friendly. |
| Citation extraction (later) | eyecite | Built for legal citations; covers USC + bill citation patterns. |
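Because FTS5 is a build option rather than a separate dependency, a startup check is cheap insurance. This sketch uses the stdlib sqlite3 module; the fr_documents_fts table name and the porter unicode61 tokenizer choice are illustrative assumptions, and in practice the DDL would live in a migration.

```python
import sqlite3
from typing import List, Tuple


def fts5_available(conn: sqlite3.Connection) -> bool:
    """True if the linked SQLite library was compiled with FTS5."""
    row = conn.execute("SELECT sqlite_compileoption_used('ENABLE_FTS5')").fetchone()
    return bool(row[0])


def create_fts_table(conn: sqlite3.Connection, table: str,
                     columns: List[str], tokenize: str) -> None:
    # Names come from migrations, never from user input, so building DDL with an f-string is acceptable.
    conn.execute(
        f"CREATE VIRTUAL TABLE {table} USING fts5({', '.join(columns)}, tokenize = '{tokenize}')"
    )


def search_titles(conn: sqlite3.Connection, table: str, query: str,
                  limit: int = 10) -> List[Tuple[int, str]]:
    # FTS5's hidden "rank" column is bm25 by default; lower is more relevant, so sort ascending.
    sql = f"SELECT rowid, title FROM {table} WHERE {table} MATCH ? ORDER BY rank LIMIT ?"
    return conn.execute(sql, (query, limit)).fetchall()
```

The porter stemmer lets a query for "standard" match "standards"; a table holding citations or identifiers might prefer plain unicode61 so tokens are not stemmed.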
File structure
```
josh-ingester/
├── Dockerfile
├── pyproject.toml
├── README.md
├── alembic.ini              # script_location = josh_substrate:migrations
├── ingester/
│   ├── __init__.py
│   ├── __main__.py          # CLI entry
│   ├── cli.py               # typer commands
│   ├── runner.py            # the run_source orchestrator
│   ├── scheduler.py         # APScheduler wiring
│   ├── state.py             # ingestion_runs / _logs / _source_state writers
│   ├── http.py              # shared httpx client config + retries
│   ├── concurrency.py       # Semaphore helpers, per-source flock
│   ├── corpus.py            # /data/corpus path conventions
│   ├── embedding.py         # the separate embedding worker
│   ├── protocols.py         # Source, FetchTask, ParsedRecord, LoadStats
│   └── sources/
│       ├── __init__.py
│       ├── _registry.py     # discover_sources()
│       ├── federal_register.py
│       ├── bills.py
│       ├── legislators.py
│       ├── committees.py
│       └── ...              # one per source, lazy-added
└── tests/
    ├── unit/
    └── integration/
```

The alembic question is resolved: migrations live in the shared/josh_substrate/ package, which both josh-core and josh-ingester install. Schema is owned by the substrate package; both services consume the same Pydantic models and reach the same migrations via josh_substrate:migrations in their alembic.ini. See repo-structure.html for the layout.
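A possible shape for protocols.py, assuming each source exposes discover, fetch, and parse while the runner owns loading and reports LoadStats. The field names and method signatures below are hypothetical; only the four type names come from the tree above.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional, Protocol, runtime_checkable


@dataclass(frozen=True)
class FetchTask:
    key: str  # stable identifier for the upstream artifact, e.g. a document number
    url: str


@dataclass(frozen=True)
class ParsedRecord:
    table: str  # destination table, e.g. "fr_documents"
    record_id: str
    fields: Dict[str, Any]


@dataclass
class LoadStats:
    inserted: int = 0
    updated: int = 0
    errored: int = 0


@runtime_checkable
class Source(Protocol):
    name: str      # registry key, e.g. "federal_register"
    schedule: str  # cron string read by the scheduler

    def discover(self, since: Optional[str]) -> Iterable[FetchTask]: ...

    async def fetch(self, task: FetchTask) -> bytes: ...

    def parse(self, task: FetchTask, raw: bytes) -> Iterable[ParsedRecord]: ...
```

Keeping parse synchronous and free of I/O is what makes the "re-parse from /data/corpus/ without re-fetching" recovery path cheap.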
Adding a new source
The full checklist for "I want to add committee_reports ingestion":
- Add the schema. Edit `docs/sources/committee-reports.md` with the field map, source URLs, etc., and add the source to the `docs/josh-data-sources.md` inventory.
- Add a migration. Create a new Alembic migration in `shared/josh_substrate/src/josh_substrate/migrations/versions/` for the `committee_reports` table and its indexes.
- Add the source module. Create `josh-ingester/ingester/sources/committee_reports.py` implementing the `Source` protocol.
- Add to registry. Nothing to do: `_registry.py` picks up modules automatically by walking `sources/`.
- Pick a schedule. Add the cron string to the source class.
- Test locally. Run the ingester directly against a throwaway SQLite file: `SUBSTRATE_DB_PATH=/tmp/josh-dev.db python -m ingester backfill committee_reports --since=2024-01-01`. (Or against a staging server: `kamal app exec --reuse 'josh-ingester backfill committee_reports --since=2024-01-01'`.)
- Deploy. Run `kamal deploy` for the ingester, which is its own Kamal-managed service (see "Deployment shape" below).
- Manually bootstrap historicals. From the machine you deploy from, run `kamal app exec --reuse 'josh-ingester backfill committee_reports'`; Kamal handles the SSH connection to the server.
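The "walk sources/" behaviour of _registry.py can be sketched with pkgutil and importlib. This assumes a hypothetical convention: a source is any class defined in a non-underscore module under sources/ that has string name and schedule attributes. The real registry may use a different marker.

```python
import importlib
import pkgutil
from types import ModuleType
from typing import Dict


def discover_sources(package: ModuleType) -> Dict[str, type]:
    """Import each public module in `package` and collect its source classes by name."""
    found: Dict[str, type] = {}
    for info in pkgutil.iter_modules(package.__path__):
        if info.name.startswith("_"):
            continue  # skip _registry.py and other private helpers
        module = importlib.import_module(f"{package.__name__}.{info.name}")
        for obj in vars(module).values():
            is_source = (
                isinstance(obj, type)
                and obj.__module__ == module.__name__  # defined here, not imported
                and isinstance(getattr(obj, "name", None), str)
                and isinstance(getattr(obj, "schedule", None), str)
            )
            if not is_source:
                continue
            if obj.name in found:
                raise ValueError(f"duplicate source name {obj.name!r}")
            found[obj.name] = obj
    return found
```

Failing on duplicate names at discovery time means a copy-pasted module cannot silently shadow another source's schedule.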
Admin UI integration
josh-web adds pages under /admin/ingestion/:
- /admin/ingestion/ (overview): per-source last-run status, time since last success, error counts
- /admin/ingestion/sources/{name} (source detail): schedule, recent runs, current state
- /admin/ingestion/runs/{run_id} (run detail): timeline, logs, errors
- /admin/ingestion/embedding: embedding queue depth, throughput
These pages call josh-core REST endpoints; josh-core reads the state tables. The ingester does not expose its own HTTP server; it's a worker only. That keeps the ingester simple and avoids two services owning the same admin surface.
josh-core admin endpoints (sketch):
```
GET  /api/admin/ingestion/sources                  list of sources + status summary
GET  /api/admin/ingestion/sources/{name}           one source: state + recent runs
GET  /api/admin/ingestion/runs                     paginated run list
GET  /api/admin/ingestion/runs/{run_id}            run detail incl. counts
GET  /api/admin/ingestion/runs/{run_id}/logs       paginated logs for run
POST /api/admin/ingestion/sources/{name}/run       trigger an incremental run
POST /api/admin/ingestion/sources/{name}/backfill  trigger backfill (admin-only)
GET  /api/admin/ingestion/embedding/queue          queue depth, top errors
```

Triggering runs from the UI: the API would enqueue an instruction in a small ingestion_commands table in the substrate database, which the ingester polls (SQLite has no LISTEN/NOTIFY equivalent, so polling is the mechanism). This is out of v1 scope: at v1 the UI is read-only and manual triggers are CLI-only. Add interactive triggers in v1.x.
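For the overview page, the josh-core handler mostly needs one aggregate query over the run table. The sketch below assumes hypothetical ingestion_runs columns (source, status, started_at, finished_at) with ISO-8601 "Z" timestamps; the actual DDL lives in the josh_substrate migrations.

```python
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional

# Hypothetical columns; the real ingestion_runs schema is owned by the migrations.
SUMMARY_SQL = """
SELECT r.source,
       (SELECT status FROM ingestion_runs
         WHERE source = r.source ORDER BY started_at DESC LIMIT 1) AS last_status,
       MAX(CASE WHEN r.status = 'succeeded' THEN r.finished_at END) AS last_success_at,
       SUM(r.status = 'failed') AS failed_runs
FROM ingestion_runs AS r
GROUP BY r.source
ORDER BY r.source
"""


def _parse_ts(value: str) -> datetime:
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def source_summaries(conn: sqlite3.Connection, now: datetime) -> List[Dict[str, object]]:
    """One row per source: latest status, seconds since last success, failed-run count."""
    out: List[Dict[str, object]] = []
    for source, last_status, last_success_at, failed_runs in conn.execute(SUMMARY_SQL):
        since_success: Optional[float] = None
        if last_success_at is not None:
            since_success = (now - _parse_ts(last_success_at)).total_seconds()
        out.append({
            "source": source,
            "last_status": last_status,
            "seconds_since_success": since_success,
            "failed_runs": failed_runs,
        })
    return out
```

Passing now in explicitly keeps the handler deterministic under test; the endpoint would supply the current UTC time.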
Deployment shape
Ingester is a separate Kamal service (service: josh-ingester in its own config/deploy.yml). It uses the same substrate database as josh-core: the SQLite file at /data/josh.db, on a host volume mounted into both containers, so josh-core reads the state tables the ingester writes.
The /data/corpus/ host directory is a bind mount in the ingester container. When we add a josh-mirror (later — to publish snapshots for OSS users), it serves from the same /data/corpus/.
Open: do we run the ingester as a single replica with the scheduler loop, or as N replicas with the scheduler in one and N-1 doing fetch work? v1 = single replica. APScheduler has multi-instance modes but we don't need them. Scale up only when one container can't keep up.
Failure modes
| Failure | Effect | Recovery |
|---|---|---|
| HTTP 429/503 from upstream | Source's per-task tenacity retries; if exhausted, task marked errored, run continues | Retried on next scheduled run |
| Malformed XML / parse error | Task logged with error context, run continues | Fix parser, re-parse from /data/corpus/ (no re-fetch) |
| FK violation on load | Batch fails; records marked errored, run continues | Investigate; usually means a companion source needs refresh |
| SQLITE_BUSY after busy_timeout | Write transaction aborts, record errored | Reduce the concurrent-write surface (only one writer per process); retry on next batch |
| SQLite file corruption (rare; usually disk failure) | All ops fail with database is malformed | Restore from Litestream — point-in-time recovery |
| Ingester container crashes mid-run | Run row left in running status; on restart, runner does a janitor pass to mark running → failed for stale rows | Clean state recovery |
| Stuck flock (e.g., crashed run before unlock) | New runs of same source blocked | flock auto-releases on process exit; if it doesn't, rm /data/locks/ingest-{source}.lock |
| Disk full on /data/corpus or /data/josh.db | Fetch fails with ENOSPC and the run is marked failed; SQLite writes fail with database or disk is full | Increase the volume, then retry. Disk usage is monitored as part of health |
| Two ingester containers accidentally running | flock prevents double-execution per source; both will compete on the lock and one wins | Kill the duplicate |
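The flock and janitor rows above can be combined: a running row is only stale if no live process holds that source's lock. This sketch is Unix-only (it uses fcntl.flock), follows the /data/locks/ingest-{source}.lock naming from the table, and assumes hypothetical ingestion_runs columns source and status.

```python
import fcntl
import os
import sqlite3
from contextlib import contextmanager
from typing import Iterator


class SourceLocked(RuntimeError):
    """Another open lock file (usually another process) already holds this source's lock."""


@contextmanager
def source_lock(lock_dir: str, source: str) -> Iterator[str]:
    path = os.path.join(lock_dir, f"ingest-{source}.lock")
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        try:
            # Non-blocking exclusive lock: fail fast instead of queueing behind a live run.
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            raise SourceLocked(source) from None
        yield path
    finally:
        # Closing the descriptor releases the lock; the kernel also closes it if the process dies.
        os.close(fd)


def janitor(conn: sqlite3.Connection, lock_dir: str) -> int:
    """Mark 'running' rows failed when no live process holds that source's lock."""
    stale = 0
    sources = [row[0] for row in conn.execute(
        "SELECT DISTINCT source FROM ingestion_runs WHERE status = 'running'")]
    for source in sources:
        try:
            with source_lock(lock_dir, source):
                cur = conn.execute(
                    "UPDATE ingestion_runs SET status = 'failed' "
                    "WHERE source = ? AND status = 'running'", (source,))
                stale += cur.rowcount
        except SourceLocked:
            continue  # a live run owns this source; its row is not stale
    conn.commit()
    return stale
```

Checking the lock rather than marking every running row failed keeps the janitor safe even in the "two containers accidentally running" case.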
Testing strategy
Each source module has:
- Unit tests for the parser — fixed-fixture XML/JSON files, parser output asserted exactly. No network. Fastest.
- Integration tests for the full pipeline: fixture files in tests/fixtures/, a throwaway SQLite file per test (e.g. under pytest's tmp_path), and discover-fetch-parse-load run end-to-end.
- Smoke tests for the live API: opt-in (env flag), one real call to confirm we haven't broken the contract.
CI runs unit + integration. Smoke tests run nightly against the live ingester.
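An example of the exact-output parser test described above. It uses the stdlib xml.etree.ElementTree so it runs without extra dependencies (lxml.etree offers the same fromstring/findtext/findall calls); the XML shape and field names are invented for illustration and do not match the real Federal Register format.

```python
import xml.etree.ElementTree as ET
from typing import Any, Dict

# Invented fixture. In the repo this would be a file under tests/fixtures/ read with Path.read_bytes().
FIXTURE = b"""<document>
  <document_number>2024-01234</document_number>
  <title>  Air Quality Standards  </title>
  <agencies><agency>EPA</agency><agency>DOT</agency></agencies>
</document>"""


def parse_document(xml_bytes: bytes) -> Dict[str, Any]:
    root = ET.fromstring(xml_bytes)
    return {
        "document_number": root.findtext("document_number"),
        "title": (root.findtext("title") or "").strip(),
        "agencies": [a.text for a in root.findall("agencies/agency")],
    }


def test_parse_document_exact() -> None:
    # Assert the whole output, not a subset, so new or renamed fields fail loudly.
    assert parse_document(FIXTURE) == {
        "document_number": "2024-01234",
        "title": "Air Quality Standards",
        "agencies": ["EPA", "DOT"],
    }
```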
Open questions
These don't block writing the framework but should be resolved before each becomes load-bearing:
- Schema ownership between josh-core and josh-ingester: resolved. Option (a), a shared internal package shared/josh_substrate/. Both services install it. See repo-structure.html.
- Embedding provider abstraction: resolved. Locked OSS-only on Snowflake Arctic-Embed-M-v2 (1024-dim, MIT license, runs on CPU). No third-party API. Loaded locally by the embedding worker. See Architecture, Storage stack.
- Backup of /data/corpus. It's on the 100 GB volume. Daily snapshot to Hetzner Storage Box, or rebuild-from-source on disaster? Rebuild is technically free but takes days; snapshots are a few GB/day after compression. Lean: snapshot.
- Rate limit etiquette per source. Shared HTTP client has a global default; should each source override? Probably yes — Congress.gov is 5K/hr (real key); GovInfo is unstated; FR is ~1-2 req/sec. Per-source token bucket inside the shared client.
- Citation extraction lifecycle. Inline at parse time (slow ingestion, eager graph) or batch post-load worker (fast ingestion, lagging graph)? Lean: batch post-load, same shape as embedding queue.
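A minimal asyncio sketch of the per-source token bucket floated above. The source keys, rates, and burst sizes are hypothetical values derived from the limits quoted in the question, and wiring the bucket into the shared httpx client (for example, awaiting bucket_for(source).acquire() before each request) is left out.

```python
import asyncio
import time
from typing import Dict, Tuple


class TokenBucket:
    """Allows `rate` requests/second on average, with bursts of up to `capacity`."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self._tokens = capacity
        self._last = time.monotonic()
        self._lock = asyncio.Lock()  # serializes waiters so they are served in order

    def _refill(self) -> None:
        now = time.monotonic()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                self._refill()
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Sleep just long enough for the missing fraction of a token to accrue.
                await asyncio.sleep((1 - self._tokens) / self.rate)


# Hypothetical per-source settings: (requests per second, burst size).
SOURCE_LIMITS: Dict[str, Tuple[float, float]] = {
    "bills": (5000 / 3600, 5),     # Congress.gov: 5,000 requests/hour
    "federal_register": (1.0, 2),  # FR: ~1-2 requests/second
}
DEFAULT_LIMIT: Tuple[float, float] = (1.0, 1)

_buckets: Dict[str, TokenBucket] = {}


def bucket_for(source: str) -> TokenBucket:
    """Lazily create one bucket per source; call from inside the running event loop."""
    if source not in _buckets:
        rate, burst = SOURCE_LIMITS.get(source, DEFAULT_LIMIT)
        _buckets[source] = TokenBucket(rate, burst)
    return _buckets[source]
```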