package outbox

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	"atlas9.dev/c/core/dbi"
)

// PostgresStore implements Emitter and Consumer on top of the outbox and
// outbox_deliveries tables, reached through a dbi.DBI handle.
type PostgresStore struct {
	db dbi.DBI
	// consumers lists the consumer names that receive one delivery row for
	// every emitted event.
	consumers []string
	// fifo selects the claim query that additionally skips partitions which
	// already have a delivery in the 'processing' state.
	fifo bool
}

var _ Emitter = (*PostgresStore)(nil)
var _ Consumer = (*PostgresStore)(nil)

// NewPostgresStore returns a PostgresStore that uses db for all statements,
// fans every emitted event out to consumers, and uses the FIFO claim query
// when fifo is true.
func NewPostgresStore(db dbi.DBI, consumers []string, fifo bool) *PostgresStore {
	return &PostgresStore{db: db, consumers: consumers, fifo: fifo}
}

// Emit inserts an event into the outbox table and then inserts one
// outbox_deliveries row per configured consumer.
//
// opts may be nil. When it is set, opts.IdempotencyKey is stored with the
// event and opts.ProcessAfter is stored on every delivery row. If the event
// insert returns no row (the ON CONFLICT (idempotency_key) DO NOTHING path),
// Emit returns nil without creating any deliveries.
//
// NOTE(review): the event insert and the delivery inserts are separate
// statements; they are atomic only if s.db is already scoped to a
// transaction — confirm with the caller.
func (s *PostgresStore) Emit(ctx context.Context, eventType, partitionKey string, payload []byte, opts *EmitOptions) error {
	var (
		processAfter   *time.Time
		idempotencyKey *string
	)
	if opts != nil {
		processAfter = opts.ProcessAfter
		idempotencyKey = opts.IdempotencyKey
	}

	var seq int64
	err := s.db.QueryRow(ctx, `
		INSERT INTO outbox (event_type, partition_key, idempotency_key, payload)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT (idempotency_key) DO NOTHING
		RETURNING seq
	`, eventType, partitionKey, idempotencyKey, payload).Scan(&seq)
	// errors.Is (rather than ==) so a wrapped no-rows error is still treated
	// as the duplicate case instead of being reported as a failure.
	if errors.Is(err, sql.ErrNoRows) {
		return nil // duplicate idempotency key, silently skip
	}
	if err != nil {
		return fmt.Errorf("inserting outbox event: %w", err)
	}

	for _, consumer := range s.consumers {
		_, err := s.db.Exec(ctx, `
			INSERT INTO outbox_deliveries (seq, consumer, partition_key, process_after)
			VALUES ($1, $2, $3, $4)
		`, seq, consumer, partitionKey, processAfter)
		if err != nil {
			return fmt.Errorf("inserting task for %s: %w", consumer, err)
		}
	}
	return nil
}

// Claim leases up to limit pending deliveries for consumer and returns each
// one together with its event.
//
// It first resets deliveries whose lease has expired back to 'pending'. It
// then marks the selected deliveries as 'processing', sets lease_until to the
// current time plus leaseDuration, and increments attempts. Only the lowest
// pending seq of each partition key (for this consumer) is eligible, its
// process_after must be NULL or in the past, candidates are picked in random
// order, and rows locked by other transactions are skipped. In FIFO mode a
// partition that already has a delivery in 'processing' is skipped as well.
//
// Claim returns (nil, nil) when nothing was claimed. If loading an event
// fails after deliveries were claimed, the pairs loaded so far are returned
// together with the error.
func (s *PostgresStore) Claim(ctx context.Context, consumer string, leaseDuration time.Duration, limit int) ([]Claimed, error) {
	// Surface reclaim failures instead of discarding them; the error is
	// already wrapped by reclaimExpired.
	if err := s.reclaimExpired(ctx); err != nil {
		return nil, err
	}

	leaseUntil := time.Now().Add(leaseDuration).UTC()

	claimQuery := `
		UPDATE outbox_deliveries
		SET status = 'processing', lease_until = $1, attempts = attempts + 1
		WHERE (seq, consumer) IN (
			SELECT d.seq, d.consumer
			FROM outbox_deliveries d
			WHERE d.consumer = $2
			  AND d.status = 'pending'
			  AND (d.process_after IS NULL OR d.process_after <= now())
			  AND d.seq = (
				SELECT MIN(d2.seq)
				FROM outbox_deliveries d2
				WHERE d2.consumer = d.consumer
				  AND d2.partition_key = d.partition_key
				  AND d2.status = 'pending'
			  )
			ORDER BY RANDOM()
			LIMIT $3
			FOR UPDATE SKIP LOCKED
		)
		RETURNING seq, consumer, partition_key, status, process_after, lease_until, completed_at, attempts`
	if s.fifo {
		// Same as above, plus a NOT EXISTS guard that excludes partitions with
		// a delivery currently in 'processing'.
		claimQuery = `
		UPDATE outbox_deliveries
		SET status = 'processing', lease_until = $1, attempts = attempts + 1
		WHERE (seq, consumer) IN (
			SELECT d.seq, d.consumer
			FROM outbox_deliveries d
			WHERE d.consumer = $2
			  AND d.status = 'pending'
			  AND (d.process_after IS NULL OR d.process_after <= now())
			  AND NOT EXISTS (
				SELECT 1
				FROM outbox_deliveries inflight
				WHERE inflight.consumer = d.consumer
				  AND inflight.partition_key = d.partition_key
				  AND inflight.status = 'processing'
			  )
			  AND d.seq = (
				SELECT MIN(d2.seq)
				FROM outbox_deliveries d2
				WHERE d2.consumer = d.consumer
				  AND d2.partition_key = d.partition_key
				  AND d2.status = 'pending'
			  )
			ORDER BY RANDOM()
			LIMIT $3
			FOR UPDATE SKIP LOCKED
		)
		RETURNING seq, consumer, partition_key, status, process_after, lease_until, completed_at, attempts`
	}

	rows, err := s.db.Query(ctx, claimQuery, leaseUntil, consumer, limit)
	if err != nil {
		return nil, fmt.Errorf("claiming tasks: %w", err)
	}
	defer rows.Close()

	var tasks []Task
	for rows.Next() {
		var (
			task   Task
			status string
		)
		if err := rows.Scan(&task.Seq, &task.Consumer, &task.PartitionKey, &status,
			&task.ProcessAfter, &task.LeaseUntil, &task.CompletedAt, &task.Attempts); err != nil {
			return nil, fmt.Errorf("scanning claimed task: %w", err)
		}
		task.Status = Status(status)
		tasks = append(tasks, task)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating claimed tasks: %w", err)
	}

	if len(tasks) == 0 {
		return nil, nil
	}

	// Fetch events for claimed tasks
	results := make([]Claimed, 0, len(tasks))
	for _, task := range tasks {
		var event Event
		err := s.db.QueryRow(ctx, `
			SELECT seq, event_type, partition_key, idempotency_key, payload, created_at
			FROM outbox
			WHERE seq = $1
		`, task.Seq).Scan(&event.Seq, &event.EventType, &event.PartitionKey,
			&event.IdempotencyKey, &event.Payload, &event.CreatedAt)
		if err != nil {
			return results, fmt.Errorf("fetching event: %w", err)
		}
		results = append(results, Claimed{Event: event, Task: task})
	}
	return results, nil
}

// Complete marks the delivery identified by (seq, consumer) as 'completed'
// and sets completed_at to the database's current time.
func (s *PostgresStore) Complete(ctx context.Context, seq int64, consumer string) error {
	if _, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'completed', completed_at = now()
		WHERE seq = $1 AND consumer = $2
	`, seq, consumer); err != nil {
		return fmt.Errorf("completing delivery: %w", err)
	}
	return nil
}

// Nack returns the delivery identified by (seq, consumer) to 'pending',
// clears its lease, and sets process_after to retryAfter (NULL when
// retryAfter is nil).
func (s *PostgresStore) Nack(ctx context.Context, seq int64, consumer string, retryAfter *time.Time) error {
	if _, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'pending', lease_until = NULL, process_after = $3
		WHERE seq = $1 AND consumer = $2
	`, seq, consumer, retryAfter); err != nil {
		return fmt.Errorf("nacking delivery: %w", err)
	}
	return nil
}

// Fail marks the delivery identified by (seq, consumer) as 'failed' and
// clears its lease.
func (s *PostgresStore) Fail(ctx context.Context, seq int64, consumer string) error {
	if _, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'failed', lease_until = NULL
		WHERE seq = $1 AND consumer = $2
	`, seq, consumer); err != nil {
		return fmt.Errorf("failing delivery: %w", err)
	}
	return nil
}

// reclaimExpired resets every 'processing' delivery whose lease_until is
// earlier than the database's current time back to 'pending' and clears its
// lease. It is not scoped to a single consumer.
func (s *PostgresStore) reclaimExpired(ctx context.Context) error {
	if _, err := s.db.Exec(ctx, `
		UPDATE outbox_deliveries
		SET status = 'pending', lease_until = NULL
		WHERE status = 'processing' AND lease_until < now()
	`); err != nil {
		return fmt.Errorf("reclaiming expired deliveries: %w", err)
	}
	return nil
}