From 699240b1d9ef64bfb52b8636965213ac5586de9f Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Mon, 26 Aug 2024 08:02:54 +0200 Subject: [PATCH v1] Support POSITION with nondeterministic collations This allows using text position search functions with nondeterministic collations. These functions are - position, strpos - replace - split_part - string_to_array - string_to_table which all use common internal infrastructure. There was previously no internal implementation of this, so it was met with a not-supported error. This adds the internal implementation and removes the error. Unlike with deterministic collations, the search cannot use any byte-by-byte optimized techniques but has to go substring by substring. We also need to consider that the found match could have a different length than the needle and that there could be substrings of different length matching at a position. In most cases, we need to find the longest such substring (greedy semantics). --- src/backend/utils/adt/varlena.c | 105 +++++++++++--- .../regress/expected/collate.icu.utf8.out | 136 +++++++++++++++--- src/test/regress/sql/collate.icu.utf8.sql | 32 ++++- 3 files changed, 236 insertions(+), 37 deletions(-) diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index 7c6391a2760..5e72cdcdd27 100644 --- a/src/backend/utils/adt/varlena.c +++ b/src/backend/utils/adt/varlena.c @@ -54,7 +54,9 @@ typedef struct varlena VarString; */ typedef struct { + pg_locale_t locale; /* collation used for substring matching */ bool is_multibyte_char_in_char; /* need to check char boundaries? */ + bool greedy; /* find longest possible substring? */ char *str1; /* haystack string */ char *str2; /* needle string */ @@ -65,7 +67,13 @@ typedef struct int skiptablemask; /* mask for ANDing with skiptable subscripts */ int skiptable[256]; /* skip distance for given mismatched char */ + /* + * Note that with nondeterministic collations, the length of the last + * match is not necessarily equal to the length of the "needle" passed in. + */ char *last_match; /* pointer to last match in 'str1' */ + int last_match_len; /* length of last match */ + int last_match_len_tmp; /* same but for internal use */ /* * Sometimes we need to convert the byte position of a match to a @@ -1178,15 +1186,21 @@ text_position(text *t1, text *t2, Oid collid) TextPositionState state; int result; + check_collation_set(collid); + /* Empty needle always matches at position 1 */ if (VARSIZE_ANY_EXHDR(t2) < 1) return 1; /* Otherwise, can't match if haystack is shorter than needle */ - if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2)) + if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2) && + pg_newlocale_from_collation(collid)->deterministic) return 0; text_position_setup(t1, t2, collid, &state); + /* don't need greedy mode here */ + state.greedy = false; + if (!text_position_next(&state)) result = 0; else @@ -1217,18 +1231,17 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state) { int len1 = VARSIZE_ANY_EXHDR(t1); int len2 = VARSIZE_ANY_EXHDR(t2); - pg_locale_t mylocale; check_collation_set(collid); - mylocale = pg_newlocale_from_collation(collid); + state->locale = pg_newlocale_from_collation(collid); - if (!pg_locale_deterministic(mylocale)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("nondeterministic collations are not supported for substring searches"))); + /* + * Most callers need greedy mode, but some might want to unset this to + * optimize. + */ + state->greedy = true; - Assert(len1 > 0); Assert(len2 > 0); /* @@ -1264,8 +1277,11 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state) * point in wasting cycles initializing the table. We also choose not to * use B-M-H for needles of length 1, since the skip table can't possibly * save anything in that case. + * + * (With nondeterministic collations, the search was already + * multibyte-aware, so we don't need this.) */ - if (len1 >= len2 && len2 > 1) + if (len1 >= len2 && len2 > 1 && state->locale->deterministic) { int searchlength = len1 - len2; int skiptablemask; @@ -1343,7 +1359,7 @@ text_position_next(TextPositionState *state) /* Start from the point right after the previous match. */ if (state->last_match) - start_ptr = state->last_match + needle_len; + start_ptr = state->last_match + state->last_match_len; else start_ptr = state->str1; @@ -1359,7 +1375,7 @@ text_position_next(TextPositionState *state) * multi-byte character, we need to verify that the match was at a * character boundary, not in the middle of a multi-byte character. */ - if (state->is_multibyte_char_in_char) + if (state->is_multibyte_char_in_char && state->locale->deterministic) { /* Walk one character at a time, until we reach the match. */ @@ -1387,6 +1403,7 @@ text_position_next(TextPositionState *state) } state->last_match = matchptr; + state->last_match_len = state->last_match_len_tmp; return true; } @@ -1408,7 +1425,63 @@ text_position_next_internal(char *start_ptr, TextPositionState *state) Assert(start_ptr >= haystack && start_ptr <= haystack_end); - if (needle_len == 1) + state->last_match_len_tmp = needle_len; + + if (!state->locale->deterministic) + { + /* + * With a nondeterministic collation, we have to use an unoptimized + * route. We walk through the haystack and see if at each position + * there is a substring of the remaining string that is equal to the + * needle under the given collation. + * + * Note, the found substring could have a different length than the + * needle, including being empty. Callers that want to skip over the + * found string need to read the length of the found substring from + * last_match_len rather than just using the length of their needle. + * + * Most callers will require "greedy" semantics, meaning that we need + * to find the longest such substring, not the shortest. For callers + * don't don't need greedy semantics, we can finish on the first + * match. + */ + const char *result_hptr = NULL; + + hptr = start_ptr; + while (hptr < haystack_end) + { + /* + * First check the common case that there is a match in the + * haystack of exactly the length of the needle. + */ + if (!state->greedy && + haystack_end - hptr >= needle_len && + pg_strncoll(hptr, needle_len, needle, needle_len, state->locale) == 0) + return (char *) hptr; + + /* + * Else check if any of the possible substrings starting at hptr + * are equal to the needle. + */ + for (const char *test_end = hptr; test_end < haystack_end; test_end += pg_mblen(test_end)) + { + if (pg_strncoll(hptr, (test_end - hptr), needle, needle_len, state->locale) == 0) + { + state->last_match_len_tmp = (test_end - hptr); + result_hptr = hptr; + if (!state->greedy) + break; + } + } + if (result_hptr) + break; + + hptr += pg_mblen(hptr); + } + + return (char *) result_hptr; + } + else if (needle_len == 1) { /* No point in using B-M-H for a one-character needle */ char nchar = *needle; @@ -4022,7 +4095,7 @@ replace_text(PG_FUNCTION_ARGS) appendStringInfoText(&str, to_sub_text); - start_ptr = curr_ptr + from_sub_text_len; + start_ptr = curr_ptr + state.last_match_len; found = text_position_next(&state); if (found) @@ -4412,7 +4485,7 @@ split_part(PG_FUNCTION_ARGS) /* special case of last field does not require an extra pass */ if (fldnum == -1) { - start_ptr = text_position_get_match_ptr(&state) + fldsep_len; + start_ptr = text_position_get_match_ptr(&state) + state.last_match_len; end_ptr = VARDATA_ANY(inputstring) + inputstring_len; text_position_cleanup(&state); PG_RETURN_TEXT_P(cstring_to_text_with_len(start_ptr, @@ -4442,7 +4515,7 @@ split_part(PG_FUNCTION_ARGS) while (found && --fldnum > 0) { /* identify bounds of next field */ - start_ptr = end_ptr + fldsep_len; + start_ptr = end_ptr + state.last_match_len; found = text_position_next(&state); if (found) end_ptr = text_position_get_match_ptr(&state); @@ -4658,7 +4731,7 @@ split_text(FunctionCallInfo fcinfo, SplitTextOutputData *tstate) if (!found) break; - start_ptr = end_ptr + fldsep_len; + start_ptr = end_ptr + state.last_match_len; } text_position_cleanup(&state); diff --git a/src/test/regress/expected/collate.icu.utf8.out b/src/test/regress/expected/collate.icu.utf8.out index 31345295c11..319e3eea476 100644 --- a/src/test/regress/expected/collate.icu.utf8.out +++ b/src/test/regress/expected/collate.icu.utf8.out @@ -1276,26 +1276,96 @@ CREATE COLLATION ctest_nondet (provider = icu, locale = '', deterministic = fals NOTICE: using standard form "und" for ICU locale "" CREATE TABLE test6 (a int, b text); -- same string in different normal forms -INSERT INTO test6 VALUES (1, U&'\00E4bc'); -INSERT INTO test6 VALUES (2, U&'\0061\0308bc'); +INSERT INTO test6 VALUES (1, U&'zy\00E4bc'); +INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc'); SELECT * FROM test6; - a | b ----+----- - 1 | äbc - 2 | äbc + a | b +---+------- + 1 | zyäbc + 2 | zyäbc (2 rows) -SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det; - a | b ----+----- - 1 | äbc +SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det; + a | b +---+------- + 1 | zyäbc (1 row) -SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet; - a | b ----+----- - 1 | äbc - 2 | äbc +SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet; + a | b +---+------- + 1 | zyäbc + 2 | zyäbc +(2 rows) + +SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6; + strpos +-------- + 4 + 5 +(2 rows) + +SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6; + strpos +-------- + 4 + 5 +(2 rows) + +SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6; + replace +--------- + zyXc + zyäbc +(2 rows) + +SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6; + replace +--------- + zyXc + zyXc +(2 rows) + +SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6; + a | split_part +---+------------ + 1 | c + 2 | +(2 rows) + +SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6; + a | split_part +---+------------ + 1 | c + 2 | c +(2 rows) + +SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6; + a | split_part +---+------------ + 1 | c + 2 | zyäbc +(2 rows) + +SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6; + a | split_part +---+------------ + 1 | c + 2 | c +(2 rows) + +SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6; + a | string_to_array +---+----------------- + 1 | {zy,c} + 2 | {zyäbc} +(2 rows) + +SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6; + a | string_to_array +---+----------------- + 1 | {zy,c} + 2 | {zy,c} (2 rows) -- same with arrays @@ -1601,7 +1671,11 @@ CREATE UNIQUE INDEX ON test3ci (x); -- error ERROR: could not create unique index "test3ci_x_idx" DETAIL: Key (x)=(abc) is duplicated. SELECT string_to_array('ABC,DEF,GHI' COLLATE case_insensitive, ',', 'abc'); -ERROR: nondeterministic collations are not supported for substring searches + string_to_array +----------------- + {NULL,DEF,GHI} +(1 row) + SELECT string_to_array('ABCDEFGHI' COLLATE case_insensitive, NULL, 'b'); string_to_array ------------------------ @@ -1719,7 +1793,11 @@ CREATE UNIQUE INDEX ON test3bpci (x); -- error ERROR: could not create unique index "test3bpci_x_idx" DETAIL: Key (x)=(abc) is duplicated. SELECT string_to_array('ABC,DEF,GHI'::char(11) COLLATE case_insensitive, ',', 'abc'); -ERROR: nondeterministic collations are not supported for substring searches + string_to_array +----------------- + {NULL,DEF,GHI} +(1 row) + SELECT string_to_array('ABCDEFGHI'::char(9) COLLATE case_insensitive, NULL, 'b'); string_to_array ------------------------ @@ -1840,6 +1918,30 @@ SELECT * FROM test4 WHERE b = 'Cote' COLLATE case_insensitive; 1 | cote (1 row) +CREATE TABLE test4nfd (a int, b text); +INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté'); +UPDATE test4nfd SET b = normalize(b, nfd); +-- This shows why replace should be greedy. Otherwise, in the NFD +-- case, the match would stop before the decomposed accents, which +-- would leave the accents in the results. +SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4; + a | b | replace +---+------+--------- + 1 | cote | mate + 2 | côte | mate + 3 | coté | maté + 4 | côté | maté +(4 rows) + +SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd; + a | b | replace +---+------+--------- + 1 | cote | mate + 2 | côte | mate + 3 | coté | maté + 4 | côté | maté +(4 rows) + -- foreign keys (should use collation of primary key) -- PK is case-sensitive, FK is case-insensitive CREATE TABLE test10pk (x text COLLATE case_sensitive PRIMARY KEY); diff --git a/src/test/regress/sql/collate.icu.utf8.sql b/src/test/regress/sql/collate.icu.utf8.sql index 80f28a97d78..ffbb3897af0 100644 --- a/src/test/regress/sql/collate.icu.utf8.sql +++ b/src/test/regress/sql/collate.icu.utf8.sql @@ -516,11 +516,25 @@ CREATE COLLATION ctest_nondet (provider = icu, locale = '', deterministic = fals CREATE TABLE test6 (a int, b text); -- same string in different normal forms -INSERT INTO test6 VALUES (1, U&'\00E4bc'); -INSERT INTO test6 VALUES (2, U&'\0061\0308bc'); +INSERT INTO test6 VALUES (1, U&'zy\00E4bc'); +INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc'); SELECT * FROM test6; -SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det; -SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet; +SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det; +SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet; + +SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6; +SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6; + +SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6; +SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6; + +SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6; +SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6; +SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6; +SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6; + +SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6; +SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6; -- same with arrays CREATE TABLE test6a (a int, b text[]); @@ -687,6 +701,16 @@ CREATE TABLE test4 (a int, b text); SELECT * FROM test4 WHERE b = 'Cote' COLLATE ignore_accents; -- still case-sensitive SELECT * FROM test4 WHERE b = 'Cote' COLLATE case_insensitive; +CREATE TABLE test4nfd (a int, b text); +INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté'); +UPDATE test4nfd SET b = normalize(b, nfd); + +-- This shows why replace should be greedy. Otherwise, in the NFD +-- case, the match would stop before the decomposed accents, which +-- would leave the accents in the results. +SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4; +SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd; + -- foreign keys (should use collation of primary key) -- PK is case-sensitive, FK is case-insensitive base-commit: 8daa62a10c911c851f7e9ec5ef7b90cfd4b73212 -- 2.46.0