CTEs and concurrency

From: Brecht De Rooms <databrecht(at)gmail(dot)com>
To: pgsql-general(at)postgresql(dot)org
Subject: CTEs and concurrency
Date: 2024-03-04 21:44:10
Message-ID: CACVYcoey0p5urQ07Qg1dVm=4fubh3JLxAdw9HBXGnY9EH4uwEg@mail.gmail.com
Lists: pgsql-general

Dear all,

I am currently running a chaos test on a system (essentially starting nodes
that process something and randomly knocking them out). It appeared to work
fine with regular tests, but I am seeing occasional duplicate key value
violations of a uniqueness constraint on one of the more complex CTE-based
queries. This only happens under concurrency, when nodes restart under
ample load.
I cannot reproduce it by taking the query out and running it manually in
pgAdmin; if I do so, it behaves fine and does exactly what I expect.

The query looks like this (it uses Rust SQLx, which is why the parameters
are unnested):

WITH unnested_inputs AS (
    SELECT * FROM (
        SELECT
            unnest($1::uuid[]) AS event_id,
            unnest($2::varchar[]) AS type,
            unnest($3::int[]) AS version,
            unnest($4::uuid[]) AS causation_id,
            unnest($5::uuid[]) AS correlation_id,
            unnest($6::text[]) AS idempotency_key,
            unnest($7::jsonb[]) AS data,
            unnest($8::jsonb[]) AS metadata,
            unnest($9::text[]) AS subscription_id,
            unnest($10::text[]) AS subscription_instance_identifier,
            unnest($11::bigint[]) AS applied_order_id
    ) AS inputs
),
to_update_subscription_logs AS (
    SELECT sl.id AS subscription_log_id, sl.node_id, sl.status, ui.*
    FROM subscription_log sl
    JOIN unnested_inputs ui
      ON sl.event_id = ui.causation_id
     AND sl.node_id = $12
     AND sl.status = 'assigned'
     AND sl.subscription_id = ui.subscription_id
     AND sl.subscription_instance_identifier = ui.subscription_instance_identifier
    FOR UPDATE NOWAIT -- if something is updating it, we probably shouldn't touch it anymore.
),
updated_logs AS (
    UPDATE subscription_log sl
    SET status = 'processed',
        updated_at = CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
    WHERE sl.id = usl.subscription_log_id
      AND usl.node_id = $12
),
inserted_event_log AS (
    INSERT INTO event_log (
        event_id, type, version, causation_id, correlation_id,
        idempotency_key, data, metadata, created_at
    )
    SELECT
        event_id, type, version, usl.causation_id, correlation_id,
        idempotency_key, data, metadata, CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
),
inserted_output_routing_info AS (
    INSERT INTO output_event_routing (event_id, subscription_id,
        subscription_instance_identifier, applied_order_id)
    SELECT event_id, subscription_id, subscription_instance_identifier,
        applied_order_id
    FROM to_update_subscription_logs usl
)

SELECT * FROM to_update_subscription_logs

The tables look as follows:

CREATE TABLE event_log (
    event_id UUID PRIMARY KEY,
    event_order_id BIGINT REFERENCES event(order_id),
    type varchar NOT NULL,
    version int NOT NULL,
    causation_id UUID,
    correlation_id UUID,
    idempotency_key TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT constraint_event_log_unique_idempotency_key
        UNIQUE(idempotency_key) -- idempotent writes.
);

CREATE TABLE output_event_routing (
    event_id UUID REFERENCES event_log(event_id),
    subscription_id TEXT NOT NULL,
    subscription_instance_identifier TEXT,
    applied_order_id BIGINT,
    CONSTRAINT constraint_output_event_routing_uniqueness
        UNIQUE(subscription_id, subscription_instance_identifier, applied_order_id)
);

CREATE TABLE subscription_log (
    id UUID NOT NULL PRIMARY KEY,
    event_id UUID NOT NULL,
    event_order_id BIGINT NOT NULL,
    event_correlation_id UUID NOT NULL,
    subscription_instance_identifier TEXT NOT NULL,
    subscription_id TEXT NOT NULL REFERENCES subscription(name),
    status processing_status NOT NULL DEFAULT 'enqueued',
    node_id UUID REFERENCES node(id) -- is null until assigned.
);

Since I have been asked to avoid PL/pgSQL, I tried to achieve the following
behaviour with CTEs:
- For the given events, update the subscription log to 'processed' only if
we are still the node processing them and the status is still 'assigned'.
- Only for the events where that update succeeded, continue processing by
inserting into event_log and into output_event_routing (the
inserted_event_log and inserted_output_routing_info CTEs).
The mechanism aims to make sure we don't insert the results of event
processing twice.
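A cut-down, self-contained sketch of that mechanism (lock the entry, mark it 'processed', and insert only for the rows the locking step returned) may make the intent easier to test in isolation. The demo_* temp tables, integer ids and the parameter order of demo_process are hypothetical simplifications, not the real schema; it is meant for a single psql session.

```sql
-- Hypothetical, cut-down tables (not the real schema).
CREATE TEMP TABLE demo_sub_log (
    id       int  PRIMARY KEY,
    event_id int  NOT NULL,
    node_id  int,
    status   text NOT NULL DEFAULT 'enqueued'
);

CREATE TEMP TABLE demo_event_log (
    event_id        int  PRIMARY KEY,
    causation_id    int,
    idempotency_key text NOT NULL UNIQUE  -- stands in for the idempotency constraint
);

-- One log entry, already assigned to node 1.
INSERT INTO demo_sub_log (id, event_id, node_id, status)
VALUES (1, 100, 1, 'assigned');

-- Hypothetical parameters: $1 = node id, $2 = causation event id,
-- $3 = new event id, $4 = idempotency key.
PREPARE demo_process(int, int, int, text) AS
WITH to_update AS (
    -- Lock the entry only if this node still owns it and it is still 'assigned'.
    SELECT sl.id, sl.event_id
    FROM demo_sub_log sl
    WHERE sl.event_id = $2
      AND sl.node_id = $1
      AND sl.status = 'assigned'
    FOR UPDATE NOWAIT
),
updated AS (
    UPDATE demo_sub_log sl
    SET status = 'processed'
    FROM to_update tu
    WHERE sl.id = tu.id
),
inserted AS (
    -- Only rows returned by the locking SELECT lead to an insert.
    INSERT INTO demo_event_log (event_id, causation_id, idempotency_key)
    SELECT $3, tu.event_id, $4
    FROM to_update tu
)
SELECT * FROM to_update;

EXECUTE demo_process(1, 100, 200, 'key-200');  -- claims the entry, inserts one row
EXECUTE demo_process(1, 100, 200, 'key-200');  -- entry is 'processed' now: no rows, no insert
```

Run sequentially, the second call is a no-op because the entry no longer matches status = 'assigned'; the open question is why the concurrent case behaves differently.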

When logging the input values, we can see that the same value sets (exactly
the same) are indeed passed twice, for different nodes. That is to be
expected and is exactly what this logic has to catch: same values, but
another node. What we see is that one node succeeds and the other node fails
due to the uniqueness violation. That is actually fine from a business
perspective, since rolling back has the same effect, albeit with an error
that I didn't expect. However, I would love to understand this: how can one
node succeed, set the status of the log to 'processed' and continue to
insert the values, while the other is apparently also able to continue
inserting (which means that both nodes saw 'assigned' in the SELECT when
they locked the row FOR UPDATE)? Is there something I do not fully
understand about how CTEs work in combination with locks?
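The interleaving described above can be written out as a two-session psql script. The race_* tables are hypothetical, the node_id dimension is left out, and it uses the UPDATE ... RETURNING variant. Per the PostgreSQL documentation for the Read Committed isolation level, a blocked UPDATE re-evaluates its WHERE clause against the row version committed by the other transaction, so session B is expected to claim nothing; that is the behaviour this logic relies on.

```sql
-- Setup, run once. Hypothetical tables, not the real schema.
CREATE TABLE race_sub_log (
    id     int  PRIMARY KEY,
    status text NOT NULL
);
CREATE TABLE race_event_log (
    idempotency_key text PRIMARY KEY  -- plays the role of the unique idempotency key
);
INSERT INTO race_sub_log VALUES (1, 'assigned');

-- Session A
BEGIN;
WITH claimed AS (
    UPDATE race_sub_log
    SET status = 'processed'
    WHERE id = 1 AND status = 'assigned'
    RETURNING id
)
INSERT INTO race_event_log (idempotency_key)
SELECT 'key-' || id FROM claimed;  -- INSERT 0 1; A now holds the row lock

-- Session B, while A is still open: this statement blocks on A's row lock.
BEGIN;
WITH claimed AS (
    UPDATE race_sub_log
    SET status = 'processed'
    WHERE id = 1 AND status = 'assigned'
    RETURNING id
)
INSERT INTO race_event_log (idempotency_key)
SELECT 'key-' || id FROM claimed;

-- Session A
COMMIT;

-- Session B: its UPDATE wakes up, re-checks the WHERE clause against A's
-- committed row (status is now 'processed'), skips it, and the INSERT
-- reports INSERT 0 0.
COMMIT;
```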

Things I tried:
1. Whether I use plain FOR UPDATE, SKIP LOCKED or NOWAIT makes no
difference.
2. I return to_update_subscription_logs at the end to be sure the lock is
held (I'm aware of that CTE behaviour for SELECTs), even though the
following CTEs already use it.

3. Changing it to UPDATE ... RETURNING (which was my original logic):
updated_subscription AS (
    UPDATE subscription_log sl
    SET status = 'processed',
        updated_at = CURRENT_TIMESTAMP
    FROM unnested_inputs ui
    WHERE sl.event_id = ui.causation_id
      AND sl.node_id = $12
      AND sl.status = 'assigned' -- Assuming you're updating from 'assigned'
    RETURNING ui.causation_id
)

then doing a DISTINCT on the causation ID and only allowing inserts for
values whose causation ID was successfully updated to 'processed' yields
exactly the same behaviour: everything goes fine, but once in a while
things go wrong when a node dies and another takes over.

4. With both approaches, I also tried adding more explicit checks in the
joins of the insert CTEs, to make sure the node is still the same etc.
These shouldn't be necessary, and they didn't change anything either.

5. After the error, we can see that the succeeding node successfully set the
status to 'processed' and that it 'owns' the subscription_log entry for that
processed event. This means the other node should not have been able to
get past the lock.
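Approach 3 only shows the UPDATE CTE; a self-contained sketch of the whole statement (claim via UPDATE ... RETURNING, DISTINCT on the causation ID, then insert only for claimed IDs) could look like the following. The returning_* temp tables, the literal arrays standing in for the SQLx parameters, and node id 1 standing in for $12 are hypothetical, not the production query.

```sql
-- Hypothetical, cut-down tables (temp, single session); not the real schema.
CREATE TEMP TABLE returning_sub_log (
    id              int  PRIMARY KEY,
    event_id        int  NOT NULL,
    subscription_id text NOT NULL,
    node_id         int,
    status          text NOT NULL
);
CREATE TEMP TABLE returning_event_log (
    event_id        int  PRIMARY KEY,
    causation_id    int  NOT NULL,
    idempotency_key text NOT NULL UNIQUE
);

-- Event 100 has two log rows (two subscriptions) on node 1;
-- event 101 is assigned to node 2.
INSERT INTO returning_sub_log VALUES
    (1, 100, 'sub-a', 1, 'assigned'),
    (2, 100, 'sub-b', 1, 'assigned'),
    (3, 101, 'sub-a', 2, 'assigned');

WITH unnested_inputs AS (
    -- Literal arrays stand in for the unnested SQLx parameters.
    SELECT unnest(ARRAY[200, 201])             AS event_id,
           unnest(ARRAY[100, 101])             AS causation_id,
           unnest(ARRAY['key-200', 'key-201']) AS idempotency_key
),
updated_subscription AS (
    UPDATE returning_sub_log sl
    SET status = 'processed'
    FROM unnested_inputs ui
    WHERE sl.event_id = ui.causation_id
      AND sl.node_id = 1          -- stands in for $12
      AND sl.status = 'assigned'
    RETURNING ui.causation_id
),
claimed AS (
    -- Event 100 matched two log rows; DISTINCT keeps one row per causation ID.
    SELECT DISTINCT causation_id FROM updated_subscription
)
INSERT INTO returning_event_log (event_id, causation_id, idempotency_key)
SELECT ui.event_id, ui.causation_id, ui.idempotency_key
FROM unnested_inputs ui
JOIN claimed c ON c.causation_id = ui.causation_id;
-- Only event 200 is inserted: event 101 belongs to node 2 and is never claimed.
```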

If anyone could provide some insight into what I'm missing about how CTEs
work, that would be amazing. Sorry that it's a rather complex case; that
makes it hard to come up with something 'smaller' that reproduces it with
the same tests.
