# Outbox TODO

## Current State

Two-table architecture (event log + tasks) with SQLite and PostgreSQL store implementations in `core/outbox/`. Worker in `apps/demo/outbox_worker.go`. Interfaces: `Emitter` (producers) and `Consumer` (workers) in `core/outbox/outbox.go`.

Features implemented: emit with fan-out, batch claim with fair scheduling, FIFO/non-FIFO modes, ack/nack/fail with exponential backoff, delayed execution (ProcessAfter), idempotency keys, lease-based crash recovery.

## PostgreSQL Migration

Create a PostgreSQL-specific migration alongside the existing SQLite one in `apps/demo/migrations/`. Key differences from SQLite:

- `BIGSERIAL` instead of `INTEGER PRIMARY KEY AUTOINCREMENT`
- `TIMESTAMPTZ` instead of `TEXT` for time columns
- `BYTEA` instead of `TEXT` for payload
- Native `now()` instead of `datetime('now')`
- Partial indexes stay the same as SQLite

## LISTEN/NOTIFY

Add push-based wakeup to the PostgreSQL store so workers don't poll an empty queue.

- Add a trigger on the `outbox_deliveries` table (or pending partition) that calls `pg_notify('outbox_' || NEW.consumer, '')` on INSERT
- Add a `Listen(ctx, consumer string) <-chan struct{}` method or similar to `PostgresStore`. This needs a dedicated connection (e.g. a `*sql.Conn` or a driver-level listener, not a `*sql.DB`, which is itself a pool) since LISTEN is session-scoped
- Worker uses the channel to wake up immediately when new work arrives, with polling as a fallback safety net
- Consider: should this be on the `Consumer` interface or separate? Probably separate: SQLite doesn't need it, and it requires a raw connection rather than `dbi.DBI`

## Status Partitioning (PostgreSQL)

Partition `outbox_deliveries` by status to keep the pending working set small.

```sql
CREATE TABLE outbox_deliveries (...)
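-- Sketch only; the column list is elided above. Note that on a partitioned
-- table, any PRIMARY KEY or UNIQUE constraint must include the partition key,
-- so `status` would have to become part of the table's primary key here.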
PARTITION BY LIST (status);

CREATE TABLE deliveries_pending PARTITION OF outbox_deliveries
    FOR VALUES IN ('pending');
CREATE TABLE deliveries_processing PARTITION OF outbox_deliveries
    FOR VALUES IN ('processing');
CREATE TABLE deliveries_completed PARTITION OF outbox_deliveries
    FOR VALUES IN ('completed');
CREATE TABLE deliveries_failed PARTITION OF outbox_deliveries
    FOR VALUES IN ('failed');
```

- No `ENABLE ROW MOVEMENT` clause is needed (that's Oracle syntax): PostgreSQL moves rows between partitions automatically on status UPDATEs, implemented under the hood as a delete plus insert
- Sub-partition completed deliveries by time for instant cleanup via `DROP TABLE`
- This is PostgreSQL-only; SQLite doesn't support declarative partitioning
- The Go code doesn't change; this is purely schema-level

## `synchronous_commit` Per-Transaction

For PostgreSQL job queue operations, relax durability in exchange for throughput:

```sql
SET LOCAL synchronous_commit = off;
```

- This is per-transaction, not per-database
- Only for job queue operations (claim, complete, nack), not for business data writes
- Could be a flag on `PostgresStore` or handled by the caller via `dbi.WithTx` options
- Consider: does `dbi.DBI` need to support `SET LOCAL`? Currently it only has Query/QueryRow/Exec

## Event Type Filtering on Claim

Consumers that only care about certain event types. Currently all consumers get all events. Options:

- Filter at emit time (only create tasks for matching consumers); requires a subscription/routing table
- Filter at claim time (add `AND event_type IN (...)` to the claim query); simpler but wastes task rows
- Emit-time filtering is more efficient but couples the emitter to consumer configuration

## Dead Letter / Failed Task Inspection

Query and manage failed tasks:

- List failed tasks by consumer, event type, partition key
- Retry a failed task (reset to pending)
- View failure history (attempt count, last error)
- Consider: should error messages be stored on the task?
  Currently there's no error column.

## Metrics / Observability

Queue health monitoring:

- Queue depth per consumer (pending count)
- Processing latency (time from emit to complete)
- Failure rate per consumer
- In-flight count
- Could be a `Stats(ctx, consumer) (*QueueStats, error)` method on `Consumer`, or separate

## Graceful Shutdown

The worker currently just checks `ctx.Err()` at the top of the loop. In-flight work is abandoned (the lease expires and the task gets reclaimed).

- On context cancellation, finish processing the current batch before stopping
- Don't claim new work after the cancellation signal
- Log a clean shutdown message
- Consider a drain timeout: if processing takes too long, give up

## Concurrent Processing

The worker processes a batch sequentially. For I/O-bound handlers (HTTP calls, email sends), concurrent processing would improve throughput:

- Add a `Concurrency int` field to `OutboxWorker`
- Use a semaphore or worker pool to process batch items concurrently
- Each goroutine independently nacks/fails/completes its own task
- With FIFO mode, concurrent processing within a batch is safe because each claimed task is from a different partition (the claim query ensures this)