substrate · in progress · p1

Substrate cron scheduler

substrate-cron-scheduler · updated 2026-05-09T15:00:00Z · owner rritz


Every source declares a schedule cron string, but those declarations are
inert until a daemon fires them. This spec wires an apscheduler.AsyncIOScheduler
inside the josh-ingester container that walks discover_sources() on boot,
registers one job per source whose schedule is non-null, and invokes the
shared run_source() runner on each tick. Cron-fired runs reuse the runner's
flock-based per-source advisory locks and write their ingestion_runs rows with
triggered_by='scheduler', so cron runs are distinguishable from manual ones.

As an OSS self-hoster, I want my substrate to stay fresh without invoking the CLI by hand so that I can deploy the ingester once and trust it to keep up with daily refreshes.

As an agent operator, I want to see the next scheduled fire time and last-fired status per source so that I can spot stuck or drifting schedules before stale data shows up in retrieval.

As an SRE-of-one, I want cron-fired runs distinguished from manual `josh-ingester run` invocations so that when something fires twice or fails I can tell whether a human or the scheduler did it.

  1. When the `josh-ingester` container starts with the daemon entrypoint, the system shall enumerate all sources via `discover_sources()` and register one APScheduler job per source whose `schedule` attribute is non-null.
  2. When a registered cron fire time arrives for source X, the system shall acquire the per-source flock at `/data/locks/ingest-X.lock`, run the full discover→fetch→parse→load pipeline via `run_source(source, mode='incremental')`, and release the lock on completion or failure.
  3. While source X's advisory lock is already held (e.g., a long backfill is running), if its scheduled fire time arrives, the system shall log the skip (the lock attempt raises `LockHeld`) and shall neither queue nor duplicate the run.
  4. When a scheduled run inserts an `ingestion_runs` row, the system shall set `triggered_by='scheduler'` so cron-fired runs are distinguishable from manual CLI invocations (which default to `triggered_by='manual'`).
  5. When the daemon is restarted, the system shall resume firing on the next scheduled tick without replaying any missed fires (APScheduler `coalesce=True` + `misfire_grace_time` short window).
  6. When `josh-ingester schedule` is run inside the container, the system shall print, for every source with a non-null schedule, the cron string, the timezone, and the computed next-fire time.
  7. When the daemon receives SIGTERM, it shall call `scheduler.shutdown()` and exit cleanly within 30 s without dropping in-flight runs (in-flight runs hold their flock; new fires are blocked until they release).
Kind

sql

DB

substrate

Query

SELECT COUNT(DISTINCT source) FROM ingestion_runs
WHERE triggered_by = 'scheduler'
  AND started_at >= strftime('%Y-%m-%dT%H:%M:%fZ', 'now', '-25 hours');

Compare (machine-checked)

{
  "op": "gte",
  "value": 2
}

Expect (prose)

At least 2 distinct sources had scheduler-triggered ingestion_runs in the last 25 hours.

Verifies that at least two distinct sources (the two shipped ones: CRS reports and legislators+committees) have produced a cron-fired `ingestion_runs` row within the past 25 hours. The 25-hour window covers a daily schedule plus a small misfire allowance. Because both shipped sources fire daily (CRS at 07:30 ET, legislators at 02:00 ET), confirming this determiner after a cold deploy may take up to ~24 hours of waiting once the daemon is up. To validate the wiring sooner, temporarily set a source's `schedule` to a 1-minute cron (`* * * * *`) before deploying, observe one fire, then revert. The `josh-ingester schedule` command (which prints next-fire times) is a cheap sanity check to run before the determiner. A non-null next-fire time for every shipped source confirms the registry side of the wiring without waiting for the first tick. Because the command computes next-fire times from the cron strings alone, it does not by itself prove that the running daemon has registered the jobs.
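The determiner pairs a query with a machine-checked comparison. Below is a minimal sketch of how the two fit together. It assumes an SQLite-backed `ingestion_runs` with at least `source`, `triggered_by` and `started_at` columns, where `started_at` is ISO-8601 UTC text in the format the query's `strftime` call implies. The `evaluate` helper, the `OPS` table, `make_demo_db` and the demo source names are all hypothetical; they are not the project's actual determiner checker.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# The determiner query, verbatim from the spec.
DETERMINER_SQL = """
SELECT COUNT(DISTINCT source) FROM ingestion_runs
WHERE triggered_by = 'scheduler'
  AND started_at >= strftime('%Y-%m-%dT%H:%M:%fZ', 'now', '-25 hours');
"""

# Hypothetical comparison table; this spec only uses "gte".
OPS = {
    "gte": lambda actual, expected: actual >= expected,
    "gt": lambda actual, expected: actual > expected,
    "eq": lambda actual, expected: actual == expected,
}


def iso_utc(dt: datetime) -> str:
    """Format like SQLite's strftime('%Y-%m-%dT%H:%M:%fZ'): milliseconds plus 'Z'."""
    dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


def evaluate(conn: sqlite3.Connection, compare: dict) -> tuple:
    """Run the query and apply the {"op": ..., "value": ...} comparison."""
    (actual,) = conn.execute(DETERMINER_SQL).fetchone()
    return actual, OPS[compare["op"]](actual, compare["value"])


def make_demo_db() -> sqlite3.Connection:
    """In-memory ingestion_runs with illustrative rows (hypothetical source names)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ingestion_runs (source TEXT, triggered_by TEXT, started_at TEXT)")
    now = datetime.now(timezone.utc)
    conn.executemany(
        "INSERT INTO ingestion_runs VALUES (?, ?, ?)",
        [
            ("crs_reports", "scheduler", iso_utc(now - timedelta(hours=3))),
            ("legislators", "scheduler", iso_utc(now - timedelta(hours=20))),
            ("legislators", "manual", iso_utc(now - timedelta(hours=1))),  # manual: not counted
            ("other_source", "scheduler", iso_utc(now - timedelta(hours=30))),  # outside window
        ],
    )
    return conn
```

With the demo rows, `evaluate(make_demo_db(), {"op": "gte", "value": 2})` returns `(2, True)`. The manual run and the 30-hour-old run do not contribute.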

None.

  • Distributed scheduling across multiple ingester replicas. v1 is single-replica; APScheduler runs in-process and the flock is local. If we ever need horizontal scaling, switch to APScheduler's database-backed jobstore + `max_instances=1` and adopt a distributed lock (Redis / DB advisory).
  • Arbitrary user-defined schedules at runtime. The only input is the per-source `schedule` class attribute; schedule changes ship via git → Kamal redeploy.
  • Backfills triggered by the scheduler. Backfills are one-shot human operations via `josh-ingester backfill <source>`; the daemon only fires `mode='incremental'` runs.
  • A separate ingester web/admin endpoint. The `josh-ingester schedule` CLI is the introspection surface in v1; richer admin views land via `josh-core` admin endpoints later.

Library + topology. apscheduler.AsyncIOScheduler running in-process
inside josh-ingester with timezone America/New_York (every source's cron
string is written in Eastern time). Single-replica only: no persistent
jobstore, no distributed coordination. The shape is already sketched in
josh-ingester/josh_ingester/scheduler.py; this spec wires it to a long-running
entry point and the container CMD.
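Registration (acceptance criterion 1) reduces to a filter over the discovered sources. Below is a minimal sketch. `Source` is a hypothetical stand-in for the registry's source classes. The job-spec dict shape, the `ingest:<name>` job id and the `source_name` kwarg are illustrative assumptions. Under APScheduler 3.x, each spec would be handed over roughly as `scheduler.add_job(_run_source_job, CronTrigger.from_crontab(spec["crontab"], timezone=spec["timezone"]), id=spec["id"], kwargs=spec["kwargs"], **spec["options"])`.

```python
from dataclasses import dataclass
from typing import Optional

# Per the plan, every schedule string is interpreted in Eastern time.
SCHEDULER_TIMEZONE = "America/New_York"

# Job options from the plan: coalesce, 5-minute grace, max_instances=1 as a backstop.
JOB_OPTIONS = {"coalesce": True, "misfire_grace_time": 300, "max_instances": 1}


@dataclass(frozen=True)
class Source:
    """Hypothetical stand-in for a source returned by discover_sources()."""

    name: str
    schedule: Optional[str]  # five-field cron string, or None for manual-only sources


def build_job_specs(sources):
    """Return one job spec per source whose schedule is non-null."""
    specs = []
    for source in sources:
        if source.schedule is None:
            continue  # manual-only: never registered with the scheduler
        specs.append(
            {
                "id": f"ingest:{source.name}",  # hypothetical job-id scheme
                "crontab": source.schedule,
                "timezone": SCHEDULER_TIMEZONE,
                "options": dict(JOB_OPTIONS),
                # Forwarded by the trampoline to run_source(...); key names are illustrative.
                "kwargs": {
                    "source_name": source.name,
                    "mode": "incremental",
                    "triggered_by": "scheduler",
                },
            }
        )
    return specs
```

Keeping the job options in one constant makes it easy to check acceptance criteria 3 and 5 against the plan's values.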

Daemon entry. New typer command josh-ingester daemon that calls
scheduler.start_scheduler() (which builds the scheduler from
discover_sources(), starts it, and blocks on the wait() of an asyncio.Event
that the signal handlers can reach). Signal handling: install SIGTERM/SIGINT
handlers that set that event, so the finally clause runs
scheduler.shutdown(wait=True) and the process exits cleanly. The existing
schedule command stays as the introspection command; it gains a
next_fire_time field per source.
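The signal wiring can be sketched with the standard library alone. `FakeScheduler` is a hypothetical stand-in that exposes only the `start()` and `shutdown(wait=...)` calls the plan uses. The optional `stop` parameter is an assumption added so the loop can be exercised without sending a real signal. Whether `shutdown(wait=True)` actually waits for in-flight coroutine jobs depends on the APScheduler executor in use, so acceptance criterion 7 is worth testing against the real library.

```python
import asyncio
import signal


class FakeScheduler:
    """Hypothetical stand-in for AsyncIOScheduler: records start/shutdown calls."""

    def __init__(self):
        self.running = False
        self.shutdown_calls = []

    def start(self):
        self.running = True

    def shutdown(self, wait=True):
        self.running = False
        self.shutdown_calls.append(wait)


async def run_daemon(scheduler, stop=None):
    """Start the scheduler, block until SIGTERM/SIGINT (or `stop` is set), then shut down."""
    if stop is None:
        stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    installed = []
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # The handler only sets the shared event; cleanup happens in `finally`.
            loop.add_signal_handler(sig, stop.set)
            installed.append(sig)
        except (NotImplementedError, RuntimeError):
            pass  # e.g. Windows, or not running on the main thread
    scheduler.start()  # called inside the running loop, as AsyncIOScheduler expects
    try:
        await stop.wait()
    finally:
        scheduler.shutdown(wait=True)
        for sig in installed:
            loop.remove_signal_handler(sig)
```

In the container, the entry point would simply be `asyncio.run(run_daemon(scheduler))`. `kamal`'s stop sends SIGTERM, which sets the event and lets the `finally` clause run.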

Container CMD. The ingester role in the project-root
config/deploy.yml carries cmd: josh-ingester daemon, overriding the
image's default web-role CMD. Manual operations
(josh-ingester run <source>, backfill, etc.) still work via
kamal app exec --reuse --roles=ingester '…' — they reuse the same
container and run alongside the daemon (the per-source flock prevents
conflicts with whatever the daemon is doing).

Per-source advisory locks. No new code: the runner already wraps
_run_locked in source_lock(source.name) from ingester.concurrency.
When the scheduler fires while a manual run is going (or vice versa), the
second invocation's source_lock raises LockHeld, the runner catches it and
returns RunResult(status='skipped'), and the scheduler logs the skip.
APScheduler's max_instances=1 is belt-and-braces on top.
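The skip path can be sketched with `fcntl.flock` in non-blocking mode. This is an illustrative stand-in for `ingester.concurrency.source_lock`, not its actual code. `LockHeld` and `RunResult` are redefined here, and `run_source_sketch` and its `work` callable are hypothetical. The lock path follows the `/data/locks/ingest-X.lock` layout from acceptance criterion 2, with a configurable directory. flock locks belong to an open file description, so a second `open()` of the same file conflicts even inside one process, which is what the usage check relies on.

```python
import fcntl
import os
from contextlib import contextmanager
from dataclasses import dataclass


class LockHeld(Exception):
    """Another open file description already holds this source's lock."""


@dataclass
class RunResult:
    source: str
    status: str  # this sketch only produces 'success' or 'skipped'


@contextmanager
def source_lock(name, lock_dir="/data/locks"):
    """Hold an exclusive, non-blocking flock on <lock_dir>/ingest-<name>.lock."""
    path = os.path.join(lock_dir, f"ingest-{name}.lock")
    fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        try:
            # LOCK_NB: fail immediately instead of queueing behind the current holder.
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError as exc:
            raise LockHeld(name) from exc
        try:
            yield
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)  # released on success or failure
    finally:
        os.close(fd)


def run_source_sketch(name, work, lock_dir="/data/locks"):
    """Hypothetical runner: run `work` under the lock, or report a skip."""
    try:
        with source_lock(name, lock_dir):
            work()
    except LockHeld:
        return RunResult(name, "skipped")  # the scheduler would log this
    return RunResult(name, "success")
```

Failing fast with `LOCK_NB` is what keeps a cron fire from queueing behind a long backfill, as acceptance criterion 3 requires.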

Catch-up suppression. APScheduler coalesce=True (collapse missed
fires into one) + misfire_grace_time=300 (5 min). After a restart, missed
fires within 5 min coalesce; older missed fires are dropped. No catch-up
storm of accumulated fires after a multi-hour outage.
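The two settings interact in a specific order: coalescing picks the latest due fire, and only then is the grace window applied. The function below is a simplified model of how APScheduler 3.x treats fire times that are already due. It is written from our reading of the library's behaviour and is not its code. `runs_to_execute` is a hypothetical name. With no persistent jobstore, a freshly started scheduler computes each job's first fire time from the current time, so these rules mostly matter when a running process stalls (for example, a blocked event loop).

```python
from datetime import datetime, timedelta, timezone


def runs_to_execute(due, now, coalesce=True, misfire_grace_time=300):
    """Simplified model of APScheduler 3.x handling of already-due fire times.

    `due` lists fire times <= now that have not run yet, oldest first.
    Returns the fire times that would actually run.
    """
    if coalesce and due:
        due = due[-1:]  # collapse the backlog into the most recent fire time
    grace = timedelta(seconds=misfire_grace_time)
    # A fire time older than the grace window is logged as missed and skipped.
    return [t for t in due if now - t <= grace]
```

With `coalesce=True`, a backlog of fires three hours and two minutes old yields a single run, for the two-minute-old fire. A lone three-hour-old fire yields nothing.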

triggered_by column. Small Alembic migration adds
triggered_by TEXT NOT NULL DEFAULT 'manual' to ingestion_runs. The
scheduler trampoline (_run_source_job) passes triggered_by='scheduler'
to run_source(...); manual CLI commands leave the default. Threading is
one new keyword on run_source → _run_locked → _run_inner →
state.create_run. The success_determiner queries this column.
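On SQLite, `ADD COLUMN` with `NOT NULL` is accepted only because the column carries a non-null default, so existing rows read back as `'manual'`. Below is a minimal sketch of the column and the keyword threading. It assumes the SQLite substrate that the determiner query implies. In an Alembic migration the equivalent would be `op.add_column("ingestion_runs", sa.Column("triggered_by", sa.Text(), nullable=False, server_default="manual"))`. The `create_run` and `run_source` signatures here are simplified stand-ins, and every table column other than `triggered_by` is assumed.

```python
import sqlite3
from datetime import datetime, timezone

# DDL a migration like 0004 would emit on SQLite. SQLite accepts NOT NULL on
# ADD COLUMN only because a non-null DEFAULT is supplied; existing rows get 'manual'.
ADD_TRIGGERED_BY = (
    "ALTER TABLE ingestion_runs "
    "ADD COLUMN triggered_by TEXT NOT NULL DEFAULT 'manual'"
)


def create_run(conn: sqlite3.Connection, source: str, *, triggered_by: str = "manual") -> int:
    """Simplified stand-in for state.create_run: insert a run row, return its id."""
    started_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    cur = conn.execute(
        "INSERT INTO ingestion_runs (source, started_at, triggered_by) VALUES (?, ?, ?)",
        (source, started_at, triggered_by),
    )
    return cur.lastrowid


def run_source(conn: sqlite3.Connection, source: str, mode: str = "incremental",
               *, triggered_by: str = "manual") -> int:
    """Simplified stand-in for the runner chain; `mode` is unused in this sketch.

    The real chain forwards the keyword unchanged through _run_locked and _run_inner.
    """
    return create_run(conn, source, triggered_by=triggered_by)
```

Manual CLI paths call `run_source(conn, source)` and get the default. Only the scheduler trampoline passes `triggered_by="scheduler"`.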

josh-ingester schedule enhancement. The command currently prints the
cron string + depends_on per source. Add a next_fire_time field computed
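from CronTrigger.from_crontab(source.schedule, timezone=...).get_next_fire_time(None, datetime.now(timezone.utc)),
passing a timezone-aware now rather than a naive datetime.now() so the result
does not depend on the container's local timezone. No daemon attachment is
needed: the trigger object computes the next fire time purely from the cron
string.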
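For the two shipped daily schedules, the next-fire computation is simple enough to sketch without APScheduler. `next_daily_fire` is a hypothetical stand-in that accepts only fixed `M H * * *` strings. It returns the next slot strictly after `now`, in `now`'s timezone, and it ignores DST gaps and folds. The real command should keep using `CronTrigger`. In production, `now` would be something like `datetime.now(ZoneInfo("America/New_York"))`.

```python
from datetime import datetime, timedelta, timezone


def next_daily_fire(crontab: str, now: datetime) -> datetime:
    """Next fire time for a daily 'M H * * *' cron string, in now's timezone.

    Stand-in for CronTrigger.from_crontab(...).get_next_fire_time(None, now);
    rejects anything other than a fixed minute and hour with wildcard day fields.
    """
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    minute, hour, dom, month, dow = crontab.split()
    if (dom, month, dow) != ("*", "*", "*") or not (minute.isdigit() and hour.isdigit()):
        raise ValueError(f"unsupported cron string: {crontab!r}")
    candidate = now.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has already passed
    return candidate
```

For example, at 06:00 the CRS string `30 7 * * *` yields 07:30 the same day, while the legislators string `0 2 * * *` rolls over to 02:00 the next day.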

Files touched.
- shared/josh_substrate/src/josh_substrate/migrations/versions/0004_ingestion_runs_triggered_by.py (new)
- josh-ingester/josh_ingester/state.py (create_run accepts triggered_by)
- josh-ingester/josh_ingester/runner.py (thread triggered_by through)
- josh-ingester/josh_ingester/scheduler.py (trampoline passes triggered_by='scheduler'; SIGTERM handling)
- josh-ingester/josh_ingester/cli.py (new daemon command; schedule adds next_fire_time)
- config/deploy.yml (project root; ingester role's cmd: josh-ingester daemon)

7 of 14 done.

  • t1 Spec fleshed out: why, user_stories, acceptance_criteria, success_determiner, plan, out_of_scope
  • t2 Alembic migration 0004 adds `triggered_by TEXT NOT NULL DEFAULT 'manual'` to `ingestion_runs`
  • t3 `state.create_run()` accepts `triggered_by` kwarg; threaded through `run_source` → `_run_locked`
  • t4 Scheduler trampoline `_run_source_job` passes `triggered_by='scheduler'` to `run_source`
  • t5 New `josh-ingester daemon` CLI command running `start_scheduler()` with SIGTERM/SIGINT handling
  • t6 `josh-ingester schedule` enhanced to print next-fire time per source
  • t7 Ingester role's `cmd:` set to `josh-ingester daemon` in project-root `config/deploy.yml` (post-consolidation; pre-consolidation this lived in `josh-ingester/config/deploy.yml`)
  • t8 Migration applied to production substrate (`alembic upgrade head` via Kamal)
  • t9 Daemon deployed to production via `kamal deploy` (USER-APPROVED)
  • t10 Container running daemon confirmed via `docker ps` + `kamal app exec --reuse 'josh-ingester schedule'` shows next-fire times
  • t11 Cron firing observed for both CRS and Legislators sources via `ingestion_runs` rows with `triggered_by='scheduler'`
  • t12 `crs-reports-ingester.yaml` and `legislators-and-committees-ingester.yaml` confirm t11 done with cron-fired evidence in changelog
  • t13 Step 12 of `https://docs.usejosh.com/operations/add-a-new-source/` apology removed; replaced with confirmation that `schedule` is now picked up automatically
  • t14 Success determiner runs green (`>= 2` distinct sources cron-fired in last 25h)
  • 2026-05-09T15:00:00Z · planned → in_progress · Spec fleshed out and Phase 2 implementation landed locally: migration 0004 (triggered_by column), state/runner/scheduler threading, `josh-ingester daemon` CLI command with SIGTERM handling, `schedule` CLI enhanced with next-fire times, Dockerfile CMD switched from `sleep infinity` to `python -m ingester daemon`. Pending production deploy + observation of first cron-fired runs (t8–t14).

docs/spec/substrate-cron-scheduler.html · generated by bin/build-spec.py