package outbox

import (
	"context"
	"database/sql"
	"testing"
	"time"

	_ "github.com/mattn/go-sqlite3"
)

// dbDBI wraps *sql.DB to implement dbi.DBI.
type dbDBI struct {
	db *sql.DB
}

// Query forwards to (*sql.DB).QueryContext.
func (d *dbDBI) Query(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
	return d.db.QueryContext(ctx, query, args...)
}

// QueryRow forwards to (*sql.DB).QueryRowContext.
func (d *dbDBI) QueryRow(ctx context.Context, query string, args ...any) *sql.Row {
	return d.db.QueryRowContext(ctx, query, args...)
}

// Exec forwards to (*sql.DB).ExecContext.
func (d *dbDBI) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
	return d.db.ExecContext(ctx, query, args...)
}

// setupOutboxDB opens an in-memory SQLite database, creates the outbox and
// outbox_deliveries tables, and registers a cleanup that closes the handle
// when the test finishes. Any setup failure aborts the calling test.
func setupOutboxDB(t *testing.T) *sql.DB {
	t.Helper()

	db, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		// Best effort: a close failure is reported but does not fail the test.
		if err := db.Close(); err != nil {
			t.Logf("closing test database: %v", err)
		}
	})

	// With the sqlite3 driver every new connection to ":memory:" is a
	// separate, empty database. Pin the pool to a single connection so every
	// statement sees the tables created below.
	db.SetMaxOpenConns(1)

	schema := []string{
		`CREATE TABLE outbox ( seq INTEGER PRIMARY KEY AUTOINCREMENT, event_type TEXT NOT NULL, partition_key TEXT NOT NULL, idempotency_key TEXT UNIQUE, payload TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) )`,
		`CREATE TABLE outbox_deliveries ( seq INTEGER NOT NULL REFERENCES outbox(seq), consumer TEXT NOT NULL, partition_key TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', process_after TEXT, lease_until TEXT, completed_at TEXT, attempts INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (seq, consumer) )`,
	}
	for _, stmt := range schema {
		if _, err := db.Exec(stmt); err != nil {
			t.Fatal(err)
		}
	}
	return db
}

// TestEmit checks that a single Emit stores one outbox row and one delivery
// row for each of the two configured consumers.
func TestEmit(t *testing.T) {
	db := setupOutboxDB(t)
	consumers := []string{"email", "webhook"}
	store := NewSqliteStore(&dbDBI{db}, consumers, false)
	ctx := context.Background()

	if err := store.Emit(ctx, "user.created", "user-1", []byte(`{"id":"1"}`), nil); err != nil {
		t.Fatal(err)
	}

	// Check event was inserted.
	var count int
	if err := db.QueryRow("SELECT COUNT(*) FROM outbox").Scan(&count); err != nil {
		t.Fatalf("counting outbox rows: %v", err)
	}
	if count != 1 {
		t.Errorf("expected 1 outbox event, got %d", count)
	}

	// Check deliveries were created for each consumer.
	if err := db.QueryRow("SELECT COUNT(*) FROM outbox_deliveries").Scan(&count); err != nil {
		t.Fatalf("counting outbox_deliveries rows: %v", err)
	}
	if count != 2 {
		t.Errorf("expected 2 deliveries (one per consumer), got %d", count)
	}
}

// TestEmitIdempotency checks that emitting twice with the same idempotency
// key returns no error and leaves exactly one row in the outbox table.
func TestEmitIdempotency(t *testing.T) {
	db := setupOutboxDB(t)
	store := NewSqliteStore(&dbDBI{db}, []string{"c1"}, false)
	ctx := context.Background()

	key := "dedup-key"
	opts := &EmitOptions{IdempotencyKey: &key}

	if err := store.Emit(ctx, "evt", "pk", []byte(`{}`), opts); err != nil {
		t.Fatal(err)
	}

	// Second emit with same idempotency key should be silently skipped.
	if err := store.Emit(ctx, "evt", "pk", []byte(`{}`), opts); err != nil {
		t.Fatal(err)
	}

	var count int
	if err := db.QueryRow("SELECT COUNT(*) FROM outbox").Scan(&count); err != nil {
		t.Fatalf("counting outbox rows: %v", err)
	}
	if count != 1 {
		t.Errorf("expected 1 event (duplicate skipped), got %d", count)
	}
}

// TestClaimAndComplete checks that an emitted event can be claimed with its
// type and payload intact, and that it is no longer returned by Claim once
// it has been completed.
func TestClaimAndComplete(t *testing.T) {
	db := setupOutboxDB(t)
	store := NewSqliteStore(&dbDBI{db}, []string{"c1"}, false)
	ctx := context.Background()

	if err := store.Emit(ctx, "evt", "pk", []byte(`{"x":1}`), nil); err != nil {
		t.Fatal(err)
	}

	// Claim.
	claimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(claimed) != 1 {
		t.Fatalf("expected 1 claimed, got %d", len(claimed))
	}
	if claimed[0].Event.EventType != "evt" {
		t.Errorf("event type = %q, want %q", claimed[0].Event.EventType, "evt")
	}
	if string(claimed[0].Event.Payload) != `{"x":1}` {
		t.Errorf("payload = %q, want %q", claimed[0].Event.Payload, `{"x":1}`)
	}

	// Complete.
	if err := store.Complete(ctx, claimed[0].Task.Seq, "c1"); err != nil {
		t.Fatal(err)
	}

	// Should not be claimable again.
	claimed, err = store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(claimed) != 0 {
		t.Errorf("expected 0 claimed after complete, got %d", len(claimed))
	}
}

// TestNack checks that a delivery nacked without a retry time is returned by
// the very next Claim.
func TestNack(t *testing.T) {
	db := setupOutboxDB(t)
	store := NewSqliteStore(&dbDBI{db}, []string{"c1"}, false)
	ctx := context.Background()

	if err := store.Emit(ctx, "evt", "pk", []byte(`{}`), nil); err != nil {
		t.Fatal(err)
	}

	claimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(claimed) != 1 {
		t.Fatal("expected 1 claimed")
	}

	// Nack without retry delay — should be immediately claimable.
	if err := store.Nack(ctx, claimed[0].Task.Seq, "c1", nil); err != nil {
		t.Fatal(err)
	}

	reclaimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatalf("second claim error: %v", err)
	}
	if len(reclaimed) != 1 {
		t.Errorf("expected 1 reclaimed after nack, got %d", len(reclaimed))
	}
}

// TestNackWithRetryAfter checks that a delivery nacked with a retry time one
// hour ahead is not returned by the next Claim.
func TestNackWithRetryAfter(t *testing.T) {
	db := setupOutboxDB(t)
	store := NewSqliteStore(&dbDBI{db}, []string{"c1"}, false)
	ctx := context.Background()

	if err := store.Emit(ctx, "evt", "pk", []byte(`{}`), nil); err != nil {
		t.Fatal(err)
	}

	claimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatal(err)
	}
	// Guard the index below so a bad claim fails the test instead of panicking.
	if len(claimed) != 1 {
		t.Fatalf("expected 1 claimed, got %d", len(claimed))
	}

	// Nack with retry far in the future.
	future := time.Now().Add(1 * time.Hour)
	if err := store.Nack(ctx, claimed[0].Task.Seq, "c1", &future); err != nil {
		t.Fatal(err)
	}

	// Should not be claimable yet. The error is checked so that a failing
	// Claim cannot masquerade as "nothing to claim".
	reclaimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatalf("second claim error: %v", err)
	}
	if len(reclaimed) != 0 {
		t.Errorf("expected 0 claimed (retry in future), got %d", len(reclaimed))
	}
}

// TestFail checks that a delivery marked as failed is not returned by a
// subsequent Claim.
func TestFail(t *testing.T) {
	db := setupOutboxDB(t)
	store := NewSqliteStore(&dbDBI{db}, []string{"c1"}, false)
	ctx := context.Background()

	if err := store.Emit(ctx, "evt", "pk", []byte(`{}`), nil); err != nil {
		t.Fatal(err)
	}

	claimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatal(err)
	}
	// Guard the index below so a bad claim fails the test instead of panicking.
	if len(claimed) != 1 {
		t.Fatalf("expected 1 claimed, got %d", len(claimed))
	}

	if err := store.Fail(ctx, claimed[0].Task.Seq, "c1"); err != nil {
		t.Fatal(err)
	}

	// Failed task should never be claimed again.
	reclaimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatalf("second claim error: %v", err)
	}
	if len(reclaimed) != 0 {
		t.Errorf("expected 0 claimed after fail, got %d", len(reclaimed))
	}
}

// TestClaimLeaseDuration checks that a delivery claimed with a one-hour lease
// is not handed out again by an immediate second Claim.
func TestClaimLeaseDuration(t *testing.T) {
	db := setupOutboxDB(t)
	store := NewSqliteStore(&dbDBI{db}, []string{"c1"}, false)
	ctx := context.Background()

	if err := store.Emit(ctx, "evt", "pk", []byte(`{}`), nil); err != nil {
		t.Fatal(err)
	}

	// Claim with long lease.
	claimed, err := store.Claim(ctx, "c1", 1*time.Hour, 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(claimed) != 1 {
		t.Fatal("expected 1 claimed")
	}

	// Should not be re-claimable while lease is active.
	reclaimed, err := store.Claim(ctx, "c1", 30*time.Second, 10)
	if err != nil {
		t.Fatalf("second claim error: %v", err)
	}
	if len(reclaimed) != 0 {
		t.Errorf("expected 0 claimed while lease active, got %d", len(reclaimed))
	}
}