package outbox

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	"atlas9.dev/c/core/dbi"
)

// sqliteTimeFormat is the layout used for every timestamp this store writes
// to or parses from SQLite. It is the same "YYYY-MM-DD HH:MM:SS" shape that
// SQLite's datetime('now') produces, so stored values compare correctly as
// plain strings inside the queries below.
const sqliteTimeFormat = "2006-01-02 15:04:05"

// SqliteStore implements Emitter and Consumer on top of the outbox and
// outbox_deliveries tables of a SQLite database.
type SqliteStore struct {
	db        dbi.DBI
	consumers []string // every emitted event gets one delivery row per entry
	fifo      bool     // when true, a partition with a delivery in 'processing' is not claimable
}

var _ Emitter = (*SqliteStore)(nil)
var _ Consumer = (*SqliteStore)(nil)

// NewSqliteStore returns a store that uses db, fans every emitted event out
// to the given consumers, and, when fifo is true, refuses to claim a delivery
// while another delivery of the same consumer and partition is in flight.
func NewSqliteStore(db dbi.DBI, consumers []string, fifo bool) *SqliteStore {
	return &SqliteStore{db: db, consumers: consumers, fifo: fifo}
}

// Emit inserts one event into outbox and one pending delivery per configured
// consumer into outbox_deliveries.
//
// opts may be nil. opts.ProcessAfter, when set, is stored (in UTC) as the
// process_after of every delivery. opts.IdempotencyKey, when set, is stored
// with the event; because the insert is INSERT OR IGNORE, an insert that
// affects no rows is treated as a duplicate and Emit returns nil without
// creating deliveries.
//
// NOTE(review): the event insert and the delivery inserts are separate
// statements. Unless db is already a transaction, a failure part-way through
// leaves the event with only some of its deliveries, and a retry with the
// same idempotency key will be skipped as a duplicate — confirm callers wrap
// Emit in a transaction.
func (s *SqliteStore) Emit(ctx context.Context, eventType, partitionKey string, payload []byte, opts *EmitOptions) error {
	var processAfterStr *string
	var idempotencyKey *string
	if opts != nil {
		if opts.ProcessAfter != nil {
			v := opts.ProcessAfter.UTC().Format(sqliteTimeFormat)
			processAfterStr = &v
		}
		idempotencyKey = opts.IdempotencyKey
	}

	res, err := s.db.Exec(ctx, `
		INSERT OR IGNORE INTO outbox (event_type, partition_key, idempotency_key, payload)
		VALUES ($1, $2, $3, $4)
	`, eventType, partitionKey, idempotencyKey, payload)
	if err != nil {
		return fmt.Errorf("inserting outbox event: %w", err)
	}

	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("checking rows affected: %w", err)
	}
	if n == 0 {
		return nil // duplicate idempotency key, silently skip
	}

	seq, err := res.LastInsertId()
	if err != nil {
		return fmt.Errorf("getting outbox seq: %w", err)
	}

	for _, consumer := range s.consumers {
		_, err := s.db.Exec(ctx, `
			INSERT INTO outbox_deliveries (seq, consumer, partition_key, process_after)
			VALUES ($1, $2, $3, $4)
		`, seq, consumer, partitionKey, processAfterStr)
		if err != nil {
			return fmt.Errorf("inserting task for %s: %w", consumer, err)
		}
	}
	return nil
}

// Claim leases up to limit pending deliveries for consumer and returns each
// one together with its event.
//
// Expired leases (of any consumer) are first returned to 'pending'. Then, one
// delivery at a time, the oldest pending delivery of a randomly chosen
// eligible partition is switched to 'processing', its lease_until is set to
// now+leaseDuration and its attempts counter is incremented. A delivery is
// eligible when its process_after is NULL or not in the future; in FIFO mode
// the partition must also have no delivery currently in 'processing' for this
// consumer.
//
// Claiming stops early when nothing eligible is left. On error the deliveries
// claimed so far are returned alongside the error. A delivery whose row was
// updated but whose data could not be read back stays in 'processing' until
// its lease expires.
//
// The claim uses UPDATE ... RETURNING, which requires SQLite 3.35 or newer.
func (s *SqliteStore) Claim(ctx context.Context, consumer string, leaseDuration time.Duration, limit int) ([]Claimed, error) {
	if err := s.reclaimExpired(ctx); err != nil {
		return nil, err
	}

	leaseUntil := time.Now().Add(leaseDuration).UTC().Format(sqliteTimeFormat)

	// In FIFO mode a partition is skipped while any of its deliveries for
	// this consumer is still being processed.
	inflightGuard := ""
	if s.fifo {
		inflightGuard = `
				AND NOT EXISTS (
					SELECT 1 FROM outbox_deliveries inflight
					WHERE inflight.consumer = d.consumer
						AND inflight.partition_key = d.partition_key
						AND inflight.status = 'processing'
				)`
	}

	// RETURNING hands back exactly the row this statement updated. Looking
	// the row up afterwards by (consumer, status, lease_until) is ambiguous:
	// every delivery claimed within the same second shares that triple.
	claimQuery := `
		UPDATE outbox_deliveries
		SET status = 'processing', lease_until = $1, attempts = attempts + 1
		WHERE rowid = (
			SELECT d.rowid FROM outbox_deliveries d
			WHERE d.consumer = $2
				AND d.status = 'pending'
				AND (d.process_after IS NULL OR d.process_after <= datetime('now'))` + inflightGuard + `
				AND d.seq = (
					SELECT MIN(d2.seq) FROM outbox_deliveries d2
					WHERE d2.consumer = d.consumer
						AND d2.partition_key = d.partition_key
						AND d2.status = 'pending'
				)
			ORDER BY RANDOM()
			LIMIT 1
		)
		RETURNING seq, consumer, partition_key, status, process_after, lease_until, completed_at, attempts`

	var results []Claimed
	for range limit {
		claimed, ok, err := s.claimNext(ctx, claimQuery, leaseUntil, consumer)
		if err != nil {
			return results, err
		}
		if !ok {
			break
		}
		results = append(results, claimed)
	}
	return results, nil
}

// claimNext runs claimQuery once and loads the event of the delivery it
// claimed. It reports ok=false with a nil error when no delivery was eligible.
func (s *SqliteStore) claimNext(ctx context.Context, claimQuery, leaseUntil, consumer string) (Claimed, bool, error) {
	var task Task
	var status string
	var processAfterStr, leaseStr, completedStr sql.NullString

	row := s.db.QueryRow(ctx, claimQuery, leaseUntil, consumer)
	err := row.Scan(&task.Seq, &task.Consumer, &task.PartitionKey, &status,
		&processAfterStr, &leaseStr, &completedStr, &task.Attempts)
	if errors.Is(err, sql.ErrNoRows) {
		// The UPDATE matched nothing: no eligible delivery is left.
		return Claimed{}, false, nil
	}
	if err != nil {
		return Claimed{}, false, fmt.Errorf("claiming task: %w", err)
	}
	task.Status = Status(status)

	if task.ProcessAfter, err = parseNullableSqliteTime("process_after", processAfterStr); err != nil {
		return Claimed{}, false, err
	}
	if task.LeaseUntil, err = parseNullableSqliteTime("lease_until", leaseStr); err != nil {
		return Claimed{}, false, err
	}
	if task.CompletedAt, err = parseNullableSqliteTime("completed_at", completedStr); err != nil {
		return Claimed{}, false, err
	}

	var event Event
	var createdStr string
	var idempotencyKey sql.NullString

	row = s.db.QueryRow(ctx, `
		SELECT seq, event_type, partition_key, idempotency_key, payload, created_at
		FROM outbox WHERE seq = $1
	`, task.Seq)
	err = row.Scan(&event.Seq, &event.EventType, &event.PartitionKey,
		&idempotencyKey, &event.Payload, &createdStr)
	if err != nil {
		return Claimed{}, false, fmt.Errorf("fetching event: %w", err)
	}

	event.CreatedAt, err = time.Parse(sqliteTimeFormat, createdStr)
	if err != nil {
		return Claimed{}, false, fmt.Errorf("parsing created_at: %w", err)
	}
	if idempotencyKey.Valid {
		event.IdempotencyKey = &idempotencyKey.String
	}

	return Claimed{Event: event, Task: task}, true, nil
}

// parseNullableSqliteTime converts a nullable timestamp column read as text
// into a *time.Time. A NULL column yields a nil pointer; a value that does
// not match sqliteTimeFormat yields an error naming the column.
func parseNullableSqliteTime(column string, v sql.NullString) (*time.Time, error) {
	if !v.Valid {
		return nil, nil
	}
	t, err := time.Parse(sqliteTimeFormat, v.String)
	if err != nil {
		return nil, fmt.Errorf("parsing %s: %w", column, err)
	}
	return &t, nil
}

// Complete marks the delivery identified by seq and consumer as 'completed'
// and sets completed_at to the current time. It does not report whether a
// matching row existed.
func (s *SqliteStore) Complete(ctx context.Context, seq int64, consumer string) error {
	_, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'completed', completed_at = datetime('now')
		WHERE seq = $1 AND consumer = $2
	`, seq, consumer)
	if err != nil {
		return fmt.Errorf("completing delivery: %w", err)
	}
	return nil
}

// Nack returns the delivery identified by seq and consumer to 'pending' and
// clears its lease. process_after is overwritten with retryAfter (in UTC), or
// with NULL when retryAfter is nil, which makes the delivery claimable again
// immediately. The attempts counter is left untouched.
func (s *SqliteStore) Nack(ctx context.Context, seq int64, consumer string, retryAfter *time.Time) error {
	var processAfterStr *string
	if retryAfter != nil {
		v := retryAfter.UTC().Format(sqliteTimeFormat)
		processAfterStr = &v
	}

	_, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'pending', lease_until = NULL, process_after = $1
		WHERE seq = $2 AND consumer = $3
	`, processAfterStr, seq, consumer)
	if err != nil {
		return fmt.Errorf("nacking delivery: %w", err)
	}
	return nil
}

// Fail marks the delivery identified by seq and consumer as 'failed' and
// clears its lease. Claim only selects 'pending' rows, so a failed delivery
// is not handed out again by this store.
func (s *SqliteStore) Fail(ctx context.Context, seq int64, consumer string) error {
	_, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'failed', lease_until = NULL
		WHERE seq = $1 AND consumer = $2
	`, seq, consumer)
	if err != nil {
		return fmt.Errorf("failing delivery: %w", err)
	}
	return nil
}

// reclaimExpired puts every delivery (of any consumer) that is still
// 'processing' but whose lease_until lies in the past back to 'pending' and
// clears its lease, so it can be claimed again.
func (s *SqliteStore) reclaimExpired(ctx context.Context) error {
	_, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'pending', lease_until = NULL
		WHERE status = 'processing' AND lease_until < datetime('now')
	`)
	if err != nil {
		return fmt.Errorf("reclaiming expired deliveries: %w", err)
	}
	return nil
}