package outbox

import (
	"context"
	"database/sql"
	"log/slog"
	"time"

	"atlas9.dev/c/core/dbi"
)

// OutboxWorker repeatedly claims outbox tasks for a set of consumers and
// passes each claimed task to Handler, recording the outcome in the outbox
// store.
type OutboxWorker struct {
	// DB is the database handle handed to dbi.ReadWrite for every store
	// operation.
	DB *sql.DB
	// Outbox builds the store used for claim/complete/nack/fail calls from
	// the dbi.DBI supplied by dbi.ReadWrite.
	Outbox func(dbi.DBI) Consumer
	// Consumers lists the consumer names polled, in order, on every cycle.
	Consumers []string
	// Handler is invoked once per claimed task. A non-nil error causes the
	// task to be nacked (or failed once MaxAttempts is reached); a nil error
	// causes it to be completed.
	Handler func(ctx context.Context, event Event, task Task) error
	// Interval is how long Run waits before polling again after a cycle in
	// which no consumer had any claimed task.
	Interval time.Duration
	// LeaseDuration is passed through to Claim.
	LeaseDuration time.Duration
	// MaxAttempts, when greater than zero, is the attempt count at which a
	// task whose handler returned an error is marked failed instead of being
	// nacked for retry. Zero or negative means tasks are always nacked.
	MaxAttempts int
	// BatchSize is the batch size passed to Claim. Values <= 0 are treated
	// as 1.
	BatchSize int
}

// Run polls all consumers in a loop until ctx is cancelled. After a cycle in
// which at least one consumer had claimed tasks it polls again immediately;
// otherwise it waits for Interval (or for cancellation) first.
func (w *OutboxWorker) Run(ctx context.Context) {
	for {
		if ctx.Err() != nil {
			return
		}
		if w.poll(ctx) {
			// Work was found; there may be more, so skip the idle wait.
			continue
		}
		select {
		case <-time.After(w.Interval):
		case <-ctx.Done():
			return
		}
	}
}

// backoff returns the retry delay for a task that has been attempted the
// given number of times: 2^attempts seconds, capped at five minutes.
//
// Negative attempt counts are treated as zero. Large counts return the cap
// directly rather than being shifted, because 1<<attempts multiplied by
// time.Second overflows int64 and would otherwise yield a zero or negative
// delay that slips under the cap.
func backoff(attempts int) time.Duration {
	const ceiling = 5 * time.Minute
	// 2^9 s = 512 s already exceeds the 300 s ceiling, so any larger
	// exponent can return the ceiling without computing the shift.
	const maxShift = 9

	if attempts >= maxShift {
		return ceiling
	}
	if attempts < 0 {
		attempts = 0
	}
	d := time.Duration(1<<uint(attempts)) * time.Second
	if d > ceiling {
		d = ceiling
	}
	return d
}

// poll runs one polling pass over every configured consumer and reports
// whether any of them had claimed tasks.
func (w *OutboxWorker) poll(ctx context.Context) bool {
	processed := false
	for _, consumer := range w.Consumers {
		if w.pollConsumer(ctx, consumer) {
			processed = true
		}
	}
	return processed
}

// pollConsumer claims up to BatchSize tasks for consumer and runs Handler on
// each one. It returns false when the claim fails or yields nothing, and true
// when at least one task was claimed, regardless of whether the handler or
// the follow-up store call succeeded.
//
// Store errors after the claim are logged and not returned; the worker moves
// on to the next task.
func (w *OutboxWorker) pollConsumer(ctx context.Context, consumer string) bool {
	batchSize := w.BatchSize
	if batchSize <= 0 {
		batchSize = 1
	}

	var claimed []Claimed
	err := dbi.ReadWrite(ctx, w.DB, func(tx dbi.DBI) error {
		var e error
		claimed, e = w.Outbox(tx).Claim(ctx, consumer, w.LeaseDuration, batchSize)
		return e
	})
	if err != nil {
		slog.Error("claiming tasks", "consumer", consumer, "err", err)
		return false
	}
	if len(claimed) == 0 {
		return false
	}

	for _, c := range claimed {
		if err := w.Handler(ctx, c.Event, c.Task); err != nil {
			slog.Error("processing event",
				"type", c.Event.EventType,
				"consumer", c.Task.Consumer,
				"attempts", c.Task.Attempts,
				"err", err)

			nackErr := dbi.ReadWrite(ctx, w.DB, func(tx dbi.DBI) error {
				store := w.Outbox(tx)
				// Out of attempts: mark the task failed instead of
				// scheduling another retry.
				if w.MaxAttempts > 0 && c.Task.Attempts >= w.MaxAttempts {
					return store.Fail(ctx, c.Event.Seq, consumer)
				}
				retryAfter := time.Now().Add(backoff(c.Task.Attempts))
				return store.Nack(ctx, c.Event.Seq, consumer, &retryAfter)
			})
			if nackErr != nil {
				slog.Error("nacking task", "err", nackErr)
			}
			continue
		}

		err := dbi.ReadWrite(ctx, w.DB, func(tx dbi.DBI) error {
			return w.Outbox(tx).Complete(ctx, c.Event.Seq, consumer)
		})
		if err != nil {
			slog.Error("completing task", "err", err)
		}
	}
	return true
}