package main

import (
	"context"
	"database/sql"
	"log/slog"
	"time"

	"atlas9.dev/c/core/dbi"
	"atlas9.dev/c/core/outbox"
)

// OutboxWorker repeatedly claims outbox tasks for a fixed set of consumers and
// hands each claimed event to Handler, recording the outcome via the
// outbox.Consumer returned by Outbox.
type OutboxWorker struct {
	// DB is the database handle passed to dbi.ReadWrite for every claim,
	// nack, fail and complete operation.
	DB *sql.DB
	// Outbox builds the outbox.Consumer used inside a dbi.ReadWrite callback.
	Outbox func(dbi.DBI) outbox.Consumer
	// Consumers lists the consumer names to poll, in order, on every cycle.
	Consumers []string
	// Handler processes one claimed event. A non-nil error causes the task to
	// be nacked (or failed once MaxAttempts is reached); nil completes it.
	Handler func(ctx context.Context, event outbox.Event, task outbox.Task) error
	// Interval is how long Run waits after a cycle that claimed nothing.
	// Values <= 0 fall back to a one-second default so Run never spins.
	Interval time.Duration
	// LeaseDuration is passed through to Claim.
	LeaseDuration time.Duration
	// MaxAttempts, when > 0, is the attempt count at which a failing task is
	// marked failed instead of being scheduled for retry.
	MaxAttempts int
	// BatchSize is the batch size passed to Claim; values <= 0 are treated
	// as 1.
	BatchSize int
}

// Run polls all consumers until ctx is cancelled. When a cycle claims at least
// one task the next cycle starts immediately; otherwise Run waits Interval
// (or until ctx is done) before polling again.
func (w *OutboxWorker) Run(ctx context.Context) {
	for {
		if ctx.Err() != nil {
			return
		}
		if w.poll(ctx) {
			continue
		}

		// A non-positive interval would make the timer fire immediately and
		// turn this loop into a busy loop against the database.
		interval := w.Interval
		if interval <= 0 {
			interval = time.Second
		}

		timer := time.NewTimer(interval)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return
		}
	}
}

// backoff returns the retry delay for a task that has been attempted the given
// number of times: an exponentially growing delay capped at five minutes.
//
// NOTE(review): the source line for this function was garbled (the text
// between "1<" and "> 5*time.Minute" was lost). The shift by attempts and the
// time.Second base unit are reconstructed — confirm against the original.
func backoff(attempts int) time.Duration {
	const (
		base     = time.Second
		maxDelay = 5 * time.Minute
		// Largest shift for which (1<<shift)*base still fits in an int64.
		maxShift = 30
	)

	// A negative shift count panics at runtime; treat it as "no attempts yet".
	if attempts < 0 {
		attempts = 0
	}
	// Large shifts overflow to zero or a negative duration, which would slip
	// past the cap below and produce an immediate retry.
	if attempts > maxShift {
		return maxDelay
	}

	d := time.Duration(1<<attempts) * base
	if d > maxDelay {
		d = maxDelay
	}
	return d
}

// poll runs one polling cycle over every configured consumer and reports
// whether any consumer claimed at least one task.
func (w *OutboxWorker) poll(ctx context.Context) bool {
	processed := false
	for _, consumer := range w.Consumers {
		if w.pollConsumer(ctx, consumer) {
			processed = true
		}
	}
	return processed
}

// pollConsumer claims up to BatchSize tasks for consumer and runs Handler on
// each. It returns true when at least one task was claimed, and false when the
// claim failed or returned nothing.
//
// For each claimed task:
//   - Handler success calls Complete;
//   - Handler failure calls Fail once MaxAttempts is reached (when
//     MaxAttempts > 0), otherwise Nack with a retry time of now + backoff.
//
// Errors from these bookkeeping calls are logged and do not stop the batch.
// If ctx is cancelled mid-batch, the remaining claimed tasks are left
// untouched rather than being handed to Handler with a dead context.
func (w *OutboxWorker) pollConsumer(ctx context.Context, consumer string) bool {
	batchSize := w.BatchSize
	if batchSize <= 0 {
		batchSize = 1
	}

	claimed, err := dbi.ReadWrite(ctx, w.DB, func(tx dbi.DBI) ([]outbox.Claimed, error) {
		return w.Outbox(tx).Claim(ctx, consumer, w.LeaseDuration, batchSize)
	})
	if err != nil {
		slog.Error("claiming tasks", "consumer", consumer, "err", err)
		return false
	}
	if len(claimed) == 0 {
		return false
	}

	for _, c := range claimed {
		// Once the context is done neither the handler nor the follow-up
		// database writes can be expected to succeed, so stop here.
		if ctx.Err() != nil {
			return true
		}

		if err := w.Handler(ctx, c.Event, c.Task); err != nil {
			slog.Error("processing event",
				"type", c.Event.EventType,
				"consumer", c.Task.Consumer,
				"attempts", c.Task.Attempts,
				"err", err)

			_, nackErr := dbi.ReadWrite(ctx, w.DB, func(tx dbi.DBI) (struct{}, error) {
				store := w.Outbox(tx)
				if w.MaxAttempts > 0 && c.Task.Attempts >= w.MaxAttempts {
					return struct{}{}, store.Fail(ctx, c.Event.Seq, consumer)
				}
				retryAfter := time.Now().Add(backoff(c.Task.Attempts))
				return struct{}{}, store.Nack(ctx, c.Event.Seq, consumer, &retryAfter)
			})
			if nackErr != nil {
				slog.Error("nacking task", "consumer", consumer, "seq", c.Event.Seq, "err", nackErr)
			}
			continue
		}

		_, err := dbi.ReadWrite(ctx, w.DB, func(tx dbi.DBI) (struct{}, error) {
			return struct{}{}, w.Outbox(tx).Complete(ctx, c.Event.Seq, consumer)
		})
		if err != nil {
			slog.Error("completing task", "consumer", consumer, "seq", c.Event.Seq, "err", err)
		}
	}
	return true
}