package tasks import ( "context" "database/sql" "fmt" "time" "atlas9.dev/c/core/dbi" ) const sqliteTimeFormat = "2006-01-02 15:04:05" // SqliteProducer routes task names to per-table queues. type SqliteProducer struct { db dbi.DBI tables map[string]string } func NewSqliteProducer(db dbi.DBI, tables map[string]string) *SqliteProducer { return &SqliteProducer{db: db, tables: tables} } func (p *SqliteProducer) Push(ctx context.Context, name string, payload []byte) error { table, ok := p.tables[name] if !ok { return fmt.Errorf("unknown task: %s", name) } _, err := p.db.Exec(ctx, "INSERT INTO "+table+" (payload) VALUES ($1)", string(payload)) if err != nil { return fmt.Errorf("inserting task: %w", err) } return nil } // SqliteConsumer claims and manages tasks in a named table. type SqliteConsumer struct { db dbi.DBI table string } func NewSqliteConsumer(db dbi.DBI, table string) *SqliteConsumer { return &SqliteConsumer{db: db, table: table} } func (s *SqliteConsumer) Claim(ctx context.Context, leaseDuration time.Duration, limit int) ([]Task, error) { var results []Task // Reclaim expired leases _, err := s.db.Exec(ctx, ` UPDATE `+s.table+` SET status = 'pending', lease_until = NULL WHERE status = 'processing' AND lease_until < datetime('now') `) if err != nil { return nil, fmt.Errorf("reclaiming expired leases: %w", err) } leaseUntil := time.Now().Add(leaseDuration).UTC().Format(sqliteTimeFormat) for range limit { // Claim one pending task res, err := s.db.Exec(ctx, ` UPDATE `+s.table+` SET status = 'processing', lease_until = $1, attempts = attempts + 1 WHERE id = ( SELECT id FROM `+s.table+` WHERE status = 'pending' AND run_after <= datetime('now') ORDER BY id LIMIT 1 ) `, leaseUntil) if err != nil { return nil, fmt.Errorf("claiming task: %w", err) } n, err := res.RowsAffected() if err != nil { return nil, fmt.Errorf("checking rows affected: %w", err) } if n == 0 { break } // Fetch the claimed task var task Task var payloadStr string var statusStr string var leaseStr sql.NullString var runAfterStr, createdStr string row := s.db.QueryRow(ctx, ` SELECT id, payload, status, attempts, run_after, lease_until, created_at FROM `+s.table+` WHERE status = 'processing' AND lease_until = $1 ORDER BY id LIMIT 1 `, leaseUntil) err = row.Scan( &task.ID, &payloadStr, &statusStr, &task.Attempts, &runAfterStr, &leaseStr, &createdStr, ) if err != nil { return nil, fmt.Errorf("fetching claimed task: %w", err) } task.Payload = []byte(payloadStr) task.Status = Status(statusStr) task.RunAfter, _ = time.Parse(sqliteTimeFormat, runAfterStr) task.CreatedAt, _ = time.Parse(sqliteTimeFormat, createdStr) if leaseStr.Valid { t, _ := time.Parse(sqliteTimeFormat, leaseStr.String) task.LeaseUntil = &t } results = append(results, task) } return results, nil } func (s *SqliteConsumer) Complete(ctx context.Context, id int64) error { _, err := s.db.Exec(ctx, ` UPDATE `+s.table+` SET status = 'completed', lease_until = NULL WHERE id = $1 `, id) if err != nil { return fmt.Errorf("completing task: %w", err) } return nil } func (s *SqliteConsumer) Retry(ctx context.Context, id int64, runAfter time.Time) error { _, err := s.db.Exec(ctx, ` UPDATE `+s.table+` SET status = 'pending', lease_until = NULL, run_after = $1 WHERE id = $2 `, runAfter.UTC().Format(sqliteTimeFormat), id) if err != nil { return fmt.Errorf("retrying task: %w", err) } return nil } func (s *SqliteConsumer) Fail(ctx context.Context, id int64) error { _, err := s.db.Exec(ctx, ` UPDATE `+s.table+` SET status = 'failed', lease_until = NULL WHERE id = $1 `, id) if err != nil { return fmt.Errorf("failing task: %w", err) } return nil }