From 1fb0d7a1d4af8961edb56e8d05400a118e9a584c Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Mon, 30 Sep 2024 23:05:26 +0900 Subject: [PATCH v6 2/3] file_fdw: Add on_error and log_verbosity options to file_fdw. In v17, the on_error and log_verbosity options were introduced for the COPY command. This commit extends support for these options to file_fdw. Setting on_error = 'ignore' for a file_fdw foreign table allows users to query it without errors, even when the input file contains malformed rows, by skipping the problematic rows. Both on_error and log_verbosity options apply to SELECT and ANALYZE operations on file_fdw foreign tables. Author: Atsushi Torikoshi Reviewed-by: Fujii Masao Discussion: https://postgr.es/m/ab59dad10490ea3734cf022b16c24cfd@oss.nttdata.com --- contrib/file_fdw/expected/file_fdw.out | 19 +++++++ contrib/file_fdw/file_fdw.c | 72 +++++++++++++++++++++++--- contrib/file_fdw/sql/file_fdw.sql | 7 +++ doc/src/sgml/file-fdw.sgml | 23 ++++++++ 4 files changed, 113 insertions(+), 8 deletions(-) diff --git a/contrib/file_fdw/expected/file_fdw.out b/contrib/file_fdw/expected/file_fdw.out index 86c148a86b..593fdc782e 100644 --- a/contrib/file_fdw/expected/file_fdw.out +++ b/contrib/file_fdw/expected/file_fdw.out @@ -206,6 +206,25 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a; SELECT * FROM agg_bad; -- ERROR ERROR: invalid input syntax for type real: "aaa" CONTEXT: COPY agg_bad, line 3, column b: "aaa" +-- on_error and log_verbosity tests +ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore'); +SELECT * FROM agg_bad; +NOTICE: 1 row was skipped due to data type incompatibility + a | b +-----+-------- + 100 | 99.097 + 42 | 324.78 +(2 rows) + +ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent'); +SELECT * FROM agg_bad; + a | b +-----+-------- + 100 | 99.097 + 42 | 324.78 +(2 rows) + +ANALYZE agg_bad; -- misc query tests \t on SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv'); diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index d16821f8e1..1e28c20797 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -22,6 +22,7 @@ #include "catalog/pg_authid.h" #include "catalog/pg_foreign_table.h" #include "commands/copy.h" +#include "commands/copyfrom_internal.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" @@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = { {"null", ForeignTableRelationId}, {"default", ForeignTableRelationId}, {"encoding", ForeignTableRelationId}, + {"on_error", ForeignTableRelationId}, + {"log_verbosity", ForeignTableRelationId}, {"force_not_null", AttributeRelationId}, {"force_null", AttributeRelationId}, @@ -725,12 +728,12 @@ fileIterateForeignScan(ForeignScanState *node) ExprContext *econtext; MemoryContext oldcontext; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; - bool found; + CopyFromState cstate = festate->cstate; ErrorContextCallback errcallback; /* Set up callback to identify error line number. */ errcallback.callback = CopyFromErrorCallback; - errcallback.arg = (void *) festate->cstate; + errcallback.arg = (void *) cstate; errcallback.previous = error_context_stack; error_context_stack = &errcallback; @@ -751,10 +754,27 @@ fileIterateForeignScan(ForeignScanState *node) * switch in case we are doing that. */ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - found = NextCopyFrom(festate->cstate, econtext, - slot->tts_values, slot->tts_isnull); - if (found) + +retry: + if (NextCopyFrom(cstate, econtext, slot->tts_values, slot->tts_isnull)) + { + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE && + cstate->escontext->error_occurred) + { + /* + * Soft error occurred, skip this tuple and just make + * ErrorSaveContext ready for the next NextCopyFrom. Since we + * don't set details_wanted and error_data is not to be filled, + * just resetting error_occurred is enough. + */ + cstate->escontext->error_occurred = false; + + /* Repeat NextCopyFrom() until no soft error occurs */ + goto retry; + } + ExecStoreVirtualTuple(slot); + } /* Switch back to original memory context */ MemoryContextSwitchTo(oldcontext); @@ -796,8 +816,19 @@ fileEndForeignScan(ForeignScanState *node) FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state; /* if festate is NULL, we are in EXPLAIN; nothing to do */ - if (festate) - EndCopyFrom(festate->cstate); + if (!festate) + return; + + if (festate->cstate->opts.on_error == COPY_ON_ERROR_IGNORE && + festate->cstate->num_errors > 0 && + festate->cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT) + ereport(NOTICE, + errmsg_plural("%llu row was skipped due to data type incompatibility", + "%llu rows were skipped due to data type incompatibility", + (unsigned long long) festate->cstate->num_errors, + (unsigned long long) festate->cstate->num_errors)); + + EndCopyFrom(festate->cstate); } /* @@ -1113,7 +1144,8 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel, * which must have at least targrows entries. * The actual number of rows selected is returned as the function result. * We also count the total number of rows in the file and return it into - * *totalrows. Note that *totaldeadrows is always set to 0. + * *totalrows. Rows skipped due to on_error = 'ignore' are not included + * in this count. Note that *totaldeadrows is always set to 0. * * Note that the returned list of rows is not always in order by physical * position in the file. Therefore, correlation estimates derived later @@ -1191,6 +1223,21 @@ file_acquire_sample_rows(Relation onerel, int elevel, if (!found) break; + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE && + cstate->escontext->error_occurred) + { + /* + * Soft error occurred, skip this tuple and just make + * ErrorSaveContext ready for the next NextCopyFrom. Since we + * don't set details_wanted and error_data is not to be filled, + * just resetting error_occurred is enough. + */ + cstate->escontext->error_occurred = false; + + /* Repeat NextCopyFrom() until no soft error occurs */ + continue; + } + /* * The first targrows sample rows are simply copied into the * reservoir. Then we start replacing tuples in the sample until we @@ -1236,6 +1283,15 @@ file_acquire_sample_rows(Relation onerel, int elevel, /* Clean up. */ MemoryContextDelete(tupcontext); + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE && + cstate->num_errors > 0 && + cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT) + ereport(NOTICE, + errmsg_plural("%llu row was skipped due to data type incompatibility", + "%llu rows were skipped due to data type incompatibility", + (unsigned long long) cstate->num_errors, + (unsigned long long) cstate->num_errors)); + EndCopyFrom(cstate); pfree(values); diff --git a/contrib/file_fdw/sql/file_fdw.sql b/contrib/file_fdw/sql/file_fdw.sql index f0548e14e1..edd77c5cd2 100644 --- a/contrib/file_fdw/sql/file_fdw.sql +++ b/contrib/file_fdw/sql/file_fdw.sql @@ -150,6 +150,13 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a; -- error context report tests SELECT * FROM agg_bad; -- ERROR +-- on_error and log_verbosity tests +ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore'); +SELECT * FROM agg_bad; +ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent'); +SELECT * FROM agg_bad; +ANALYZE agg_bad; + -- misc query tests \t on SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv'); diff --git a/doc/src/sgml/file-fdw.sgml b/doc/src/sgml/file-fdw.sgml index f2f2af9a59..bb3579b077 100644 --- a/doc/src/sgml/file-fdw.sgml +++ b/doc/src/sgml/file-fdw.sgml @@ -126,6 +126,29 @@ + + on_error + + + + Specifies how to behave when encountering an error converting a column's + input value into its data type, + the same as COPY's ON_ERROR option. + + + + + + log_verbosity + + + + Specifies the amount of messages emitted by file_fdw, + the same as COPY's LOG_VERBOSITY option. + + + + -- 2.45.2