From 26af14449d9b235f33f0b48c9741f874a6b6ac20 Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikoshia@oss.nttdata.com>
Date: Tue, 23 Jan 2024 00:04:25 +0900
Subject: [PATCH v2 2/2] Add new COPY option REJECT_LIMIT

REJECT_LIMIT specifies the maximum tolerable number of malformed rows.
If the input data contains more malformed rows than this value, the
entire COPY fails. This option requires ON_ERROR to be set to a value
other than stop.
---
 src/backend/commands/copy.c         | 16 ++++++++++++++++
 src/backend/commands/copyfrom.c     |  5 +++++
 src/include/commands/copy.h         |  1 +
 src/test/regress/expected/copy2.out | 17 +++++++++++++++++
 src/test/regress/sql/copy2.sql      | 29 +++++++++++++++++++++++++++++
 5 files changed, 68 insertions(+)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cc0786c6f4..ca5263d588 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -615,6 +615,22 @@ ProcessCopyOptions(ParseState *pstate,
 			on_error_specified = true;
 			opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from);
 		}
+		else if (strcmp(defel->defname, "reject_limit") == 0)
+		{
+			int64	reject_limit = defGetInt64(defel);
+
+			if (!opts_out->on_error)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("REJECT_LIMIT requires ON_ERROR to be set to other than stop")));
+			if (opts_out->reject_limit > 0)
+				errorConflictingDefElem(defel, pstate);
+			if (reject_limit <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("REJECT_LIMIT must be greater than zero")));
+			opts_out->reject_limit = reject_limit;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8ab3777664..a5902f3887 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1017,6 +1017,11 @@ CopyFrom(CopyFromState cstate)
 			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
 											 ++skipped);
 
+			if (cstate->opts.reject_limit > 0 && skipped > cstate->opts.reject_limit)
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("exceeded the number specified by REJECT LIMIT \"%d\"",
+								cstate->opts.reject_limit)));
 			continue;
 		}
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index b3da3cb0be..8f8dab9524 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -73,6 +73,7 @@ typedef struct CopyFormatOptions
 	bool	   *force_null_flags;	/* per-column CSV FN flags */
 	bool		convert_selectively;	/* do selective binary conversion? */
 	CopyOnErrorChoice on_error; /* what to do when error happened */
+	int			reject_limit;	/* tolerable number of malformed rows */
 	List	   *convert_select; /* list of column names (can be NIL) */
 } CopyFormatOptions;
 
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 25c401ce34..4f12a4f2cb 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -751,6 +751,23 @@ CONTEXT:  COPY check_ign_err, line 1: "1	{1}"
 COPY check_ign_err FROM STDIN WITH (on_error ignore);
 ERROR:  extra data after last expected column
 CONTEXT:  COPY check_ign_err, line 1: "1	{1}	3	abc"
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
+ERROR:  exceeded the number specified by REJECT LIMIT "3"
+CONTEXT:  COPY check_ign_err, line 5, column n: ""
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
+NOTICE:  4 rows were skipped due to data type incompatibility
+-- test reject_limit without on_error option: should fail
+COPY check_ign_err FROM STDIN WITH (reject_limit 3);
+ERROR:  REJECT_LIMIT requires ON_ERROR to be set to other than stop
+-- test reject_limit specified string value: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit foo);
+ERROR:  reject_limit requires a numeric value
+-- test reject_limit specified less than or equal to 0: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit -3);
+ERROR:  REJECT_LIMIT must be greater than zero
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0);
+ERROR:  REJECT_LIMIT must be greater than zero
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index b5e549e856..0b3fd59ba0 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -534,6 +534,35 @@ COPY check_ign_err FROM STDIN WITH (on_error ignore);
 1	{1}	3	abc
 \.
 
+-- tests for reject_limit option
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 3);
+6	{6}	6
+a	{7}	7
+7	{7}	7777777777
+8	{a, 8}	8
+
+9	{9}	9
+\.
+
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 4);
+6	{6}	6
+a	{7}	7
+7	{7}	7777777777
+8	{a, 8}	8
+
+9	{9}	9
+\.
+
+-- test reject_limit without on_error option: should fail
+COPY check_ign_err FROM STDIN WITH (reject_limit 3);
+
+-- test reject_limit specified string value: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit foo);
+
+-- test reject_limit specified less than or equal to 0: should fail
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit -3);
+COPY check_ign_err FROM STDIN WITH (on_error ignore, reject_limit 0);
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
-- 
2.39.2

