From: | Alex Burkhart <burkharta77(at)gmail(dot)com> |
---|---|
To: | pgsql-general(at)lists(dot)postgresql(dot)org |
Subject: | Design of a reliable task processing queue |
Date: | 2025-01-18 10:44:07 |
Message-ID: | CA+vxVFNLn7TOOmE+DRfOJ2z-fpaHYveRg9mq+tYZYG_pckBMfA@mail.gmail.com |
Lists: | pgsql-general |
Hey team,
I'm looking for help organizing locks and transactions for a reliable task
queue.
REQUIREMENTS
1. Pending actions are persisted to a database. A record of each action
remains once it is done.
2. Application workers pick actions one by one. At any given time, each
action can be assigned to at most one worker (transaction).
3. If multiple actions have the same "lock_id", only one of them is
processed at a time. It has to be the action with the smallest id.
MY ATTEMPT
I got an approximate version working with the following setup.
=== BEGIN DATABASE QUERY ===
DROP TABLE IF EXISTS actions;
CREATE TABLE actions (
    id      SERIAL PRIMARY KEY,
    lock_id BIGINT,
    action  VARCHAR(255),
    done    BOOLEAN DEFAULT false
);
-- Sample data for testing.
INSERT INTO actions (lock_id, action) VALUES
    (26, 'Create instance 26'),
    (8, 'Update instance 8'),
    (26, 'Update instance 26'),
    (8, 'Delete instance 8');
=== END DATABASE QUERY ===
I use a Go client to simulate workers using the "actions" table.
=== BEGIN GO CLIENT ===
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	pool, err := pgxpool.New(ctx, "postgres://postgres:password@localhost/database")
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	tx, err := pool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	// Rolls the transaction back unless Commit has already succeeded.
	defer tx.Rollback(ctx)
	fmt.Println("BEGIN")

	var (
		id     int32
		lockid int64
		action string
	)
	// Lock the first pending row that no other worker has locked.
	if err := tx.QueryRow(
		ctx,
		`SELECT id, lock_id, action
		FROM actions
		WHERE done = false
		ORDER BY id
		LIMIT 1
		FOR NO KEY UPDATE
		SKIP LOCKED`,
	).Scan(&id, &lockid, &action); err != nil {
		panic(err)
	}

	// Wait until no other transaction is processing the same lock_id.
	fmt.Println("LOCKING", lockid, "...")
	if _, err := tx.Exec(ctx, fmt.Sprintf("SELECT pg_advisory_xact_lock(%d)", lockid)); err != nil {
		panic(err)
	}

	fmt.Println("EXECUTING", action)
	select {
	case <-ctx.Done():
		// Interrupted: the deferred Rollback ends the transaction,
		// which releases both locks.
	case <-time.After(10 * time.Second):
		// Pretend the work took 10 seconds, then mark the action done.
		if _, err := tx.Exec(ctx, "UPDATE actions SET done = true WHERE id = $1", id); err != nil {
			panic(err)
		}
		if err := tx.Commit(ctx); err != nil {
			panic(err)
		}
		fmt.Println("DONE")
	}
	fmt.Println("UNLOCKED")
}
=== END GO CLIENT ===
This code generates a transaction like this.
=== BEGIN TRANSACTION ===
BEGIN;
-- Lock one row in "actions" table.
SELECT id, lock_id, action
FROM actions
WHERE done = false
ORDER BY id
LIMIT 1
FOR NO KEY UPDATE
SKIP LOCKED;
-- Block other transactions that process the same lock_id.
SELECT pg_advisory_xact_lock(%lock_id);
-- Work on the action... mark it done at the end.
UPDATE actions SET done = true WHERE id = %id;
COMMIT;
=== END TRANSACTION ===
This almost does the job. When I run the Go client concurrently, the workers
pick actions one by one, and only one transaction with the same "lock_id" is
processed at a time.
However, I'm worried about the gap between taking the row lock and taking
the "lock_id" advisory lock. It leaves room for the smallest-id part of
requirement 3 to be violated: a worker holding a later row could acquire
the advisory lock before the worker holding the earlier row does.
QUESTION
Is there a way to improve this attempt and close the gap? Or is there a
completely different strategy? I was brainstorming how to lock all rows
where a column has the same value, or how to use an ARRAY, but I'm
struggling to put together a reliable solution.
Thank you,
Alex