BUG #14972: row duplicate on first SELECT from CTE (by JOIN/FOR UPDATE) from which UPDATE performed recently

From: dsuchka(at)gmail(dot)com
To: pgsql-bugs(at)postgresql(dot)org
Cc: dsuchka(at)gmail(dot)com
Subject: BUG #14972: row duplicate on first SELECT from CTE (by JOIN/FOR UPDATE) from which UPDATE performed recently
Date: 2017-12-13 19:12:16
Message-ID: 20171213191216.20144.83388@wrigleys.postgresql.org
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-bugs

The following bug has been logged on the website:

Bug reference: 14972
Logged by: Evgeniy Kozlov
Email address: dsuchka(at)gmail(dot)com
PostgreSQL version: 9.5.5
Operating system: gentoo, debian
Description:

Since ON CONFLICT does not work with partitions, I have designed an
aggregation appender by hand using UPDATE (for existing rows) + INSERT (for
new ones). Unexpectedly, I got a strange result for the count of updated (really
joined) rows when running that function concurrently on 9.5.5 and 9.5.7 (9.5.2
works correctly).
The value obtained exceeds the expected result by 1.

This happens only if the CTE with the SELECT/JOIN is performed with the FOR UPDATE
option, or if no UPDATE is actually performed.

Here is the result of a concurrent invocation (the first 2 notices are the select
results from the same CTE, and they differ):

psql:/tmp/go.sql:276: NOTICE:
*** joined data (first time):
["(1011,0,1,1)", "(1011,0,1,1)", "(1012,0,2,2)", "(1013,1,0,3)",
"(1014,1,1,4)", "(1015,1,2,5)", "(1016,2,0,6)", "(1017,2,1,7)",
"(1018,2,2,8)", "(1019,3,0,9)", "(1020,3,1,10)"]
psql:/tmp/go.sql:276: NOTICE:
*** joined data (next time):
["(1011,0,1,1)", "(1012,0,2,2)", "(1013,1,0,3)", "(1014,1,1,4)",
"(1015,1,2,5)", "(1016,2,0,6)", "(1017,2,1,7)", "(1018,2,2,8)",
"(1019,3,0,9)", "(1020,3,1,10)"]
psql:/tmp/go.sql:276: NOTICE:
*** input data:
["(0,1,1)", "(0,2,2)", "(1,0,3)", "(1,1,4)", "(1,2,5)", "(2,0,6)",
"(2,1,7)", "(2,2,8)", "(3,0,9)", "(3,1,10)"]
psql:/tmp/go.sql:276: NOTICE:
*** overall agg data:
["(1011,0,1,2981)", "(1012,0,2,5962)", "(1013,1,0,8943)",
"(1014,1,1,11924)", "(1015,1,2,14905)", "(1016,2,0,17886)",
"(1017,2,1,20867)", "(1018,2,2,23848)", "(1019,3,0,26829)",
"(1020,3,1,29810)"]

file /tmp/go.sql:
-- Send append_agg a batch of ten items built from i = 1..10:
-- keys are (a, b) = (i / 3, i % 3) and the increment is x = i.
SELECT test_agg.append_agg(
    to_jsonb(
        ARRAY(
            SELECT jsonb_build_object('a', n.val / 3, 'b', n.val % 3, 'x', n.val)
            FROM generate_series(1, 10) AS n(val)
        )
    )
);
... repeat 100+ times
\i /tmp/go.sql -- yes, run it recursively, without tail recursion (so the
number of open file descriptors grows)
(end of go.sql)

Code (this is a simplified test version):
-- Namespace holding the aggregation test objects.
CREATE SCHEMA IF NOT EXISTS test_agg;

-- Aggregate table: one row per (key_a, key_b) pair.
-- test_agg.append_agg adds incoming increments to "value".
CREATE TABLE test_agg.some_agg (
    id    bigserial,          -- surrogate key, filled from the implicit sequence
    key_a integer NOT NULL,   -- first half of the natural key
    key_b integer NOT NULL,   -- second half of the natural key
    value integer NOT NULL,   -- accumulated total for the key pair

    PRIMARY KEY (id),
    UNIQUE (key_a, key_b)
);

-- Merges a batch of increments into test_agg.some_agg by hand (UPDATE for keys
-- that already exist, INSERT for the rest), retrying when the INSERT hits a
-- unique_violation.
--
-- Request: [ {"a": <int4>, "b": <int4>, "x": <int4>}, ... ]
-- Returns: {"i": <row count of the final INSERT>,
--           "u": <number of joined rows, summed over all loop passes>}
--
-- Side effects: creates and drops the temp table _tt_input, so the (a, b)
-- pairs in one request must be unique (it gets a primary key on them).
--
-- The "--/*" and "--*/" marker lines inside the WITH query are comment
-- toggles: deleting the leading "--" from a pair turns the enclosed lines into
-- a block comment. They are left exactly as in the original test case.
CREATE OR REPLACE FUNCTION test_agg.append_agg(request jsonb)
RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
    input_count integer;        -- rows in the request, counted once before the loop
    updated_count integer := 0; -- running sum of joined_count over all passes
    inserted_count integer;     -- ROW_COUNT of the successful INSERT
    joined_count integer;       -- count(*) of the "joined" CTE in the current pass
    joined_count_data jsonb;    -- {"count": ..., "data": ...} from the first read of "joined"
    joined_data text;           -- rows seen by the second read of "joined"
    input_data text;            -- snapshot of _tt_input, used for diagnostics
    agg_data text;              -- snapshot of test_agg.some_agg, used for diagnostics
BEGIN
    -- convert the input data (json -> table)
    CREATE TEMP TABLE _tt_input AS
    SELECT (i->>'a')::integer AS a,
           (i->>'b')::integer AS b,
           (i->>'x')::integer AS x
    FROM jsonb_array_elements(request) AS i
    ;
    ALTER TABLE _tt_input ADD PRIMARY KEY (a, b);
    SELECT count(*) INTO input_count FROM _tt_input;

    <<append_loop>>
    LOOP
        -- update existing agg rows
        WITH
        -- input rows whose key already exists in some_agg
        joined AS (
            SELECT agg.id, src.*
            FROM test_agg.some_agg AS agg
            JOIN _tt_input AS src
            ON (agg.key_a = src.a) AND (agg.key_b = src.b)
--/*
            ORDER BY agg.key_a, agg.key_b
            FOR UPDATE
--*/
        ),
--/*
        updated AS (
            UPDATE test_agg.some_agg AS agg
            SET value = agg.value + src.x
            FROM joined AS src
            WHERE (agg.key_a = src.a) AND (agg.key_b = src.b)
        ),
--*/
        -- remove the already-applied rows so that only new keys remain for the INSERT
        cleaned AS (
            DELETE FROM _tt_input
            WHERE (a, b) IN (SELECT x.a, x.b FROM joined AS x)
        )

        -- "joined" is read twice on purpose (count + data, then data again) so
        -- that the two reads can be compared in the diagnostics below.
        SELECT
            (SELECT jsonb_build_object('count', foo.c, 'data', foo.d)
             FROM (SELECT count(*), jsonb_agg((x.*)::text) FROM joined AS x) AS foo(c, d)),
            (SELECT jsonb_agg(row(x.*)::text) FROM joined AS x)::text,
            (SELECT jsonb_agg(row(x.*)::text) FROM _tt_input AS x)::text,
            (SELECT jsonb_agg(row(x.*)::text) FROM test_agg.some_agg AS x)::text
        INTO
            joined_count_data,
            joined_data,
            input_data,
            agg_data
        ; -- end WITH
        joined_count := (joined_count_data->>'count')::integer;
        updated_count := updated_count + joined_count;

        -- More joined rows than request rows should be impossible because
        -- (key_a, key_b) is unique; dump everything when it happens.
        IF (joined_count > input_count) THEN
            RAISE NOTICE E'\n *** joined data (first time):\n%',
                joined_count_data->>'data';
            RAISE NOTICE E'\n *** joined data (next time):\n%',
                joined_data;
            RAISE NOTICE E'\n *** input data:\n%', input_data;
            RAISE NOTICE E'\n *** overall agg data:\n%', agg_data;
            -- PERFORM, not a bare SELECT: PL/pgSQL raises "query has no
            -- destination for result data" for a SELECT without INTO.
            PERFORM pg_sleep(10);
        END IF;

        -- try to insert new ones
        BEGIN
            INSERT INTO test_agg.some_agg(key_a, key_b, value)
            SELECT a, b, x FROM _tt_input;
            GET DIAGNOSTICS inserted_count := ROW_COUNT;
            EXIT append_loop;
        EXCEPTION
            WHEN unique_violation THEN
                -- Deliberately ignored: go round the loop again so the
                -- conflicting keys take the UPDATE path (presumably they were
                -- inserted by a concurrent session in the meantime).
                NULL;
        END;
    END LOOP append_loop;

    DROP TABLE _tt_input;

    RETURN jsonb_build_object('i', inserted_count, 'u', updated_count);
END
$$;

Responses

Browse pgsql-bugs by date

  From Date Subject
Next Message David G. Johnston 2017-12-13 20:14:52 Re: BUG #14972: row duplicate on first SELECT from CTE (by JOIN/FOR UPDATE) from which UPDATE performed recently
Previous Message Tom Lane 2017-12-13 17:36:41 Re: Build error on Windows 10 of version 9.5.10 using Visual Studio 2015