From b8eb13b5bd4114a9860cfa83f8240ab09db588b4 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Tue, 22 Dec 2020 01:06:26 -0600
Subject: [PATCH 06/20] pg_dump: zstd compression

document any change in search for .gz?
docs
Maybe compress_io should be split so all the library-specific stuff are in
separate files, like compress_{zlib/zstd}.c
---
 configure                             | 123 ++++++-
 configure.ac                          |  22 ++
 src/bin/pg_dump/compress_io.c         | 480 ++++++++++++++++++++++++++
 src/bin/pg_dump/pg_backup.h           |  14 +
 src/bin/pg_dump/pg_backup_archiver.h  |   4 +
 src/bin/pg_dump/pg_backup_directory.c |   8 +-
 src/bin/pg_dump/pg_dump.c             |  39 +++
 src/include/pg_config.h.in            |   3 +
 src/tools/msvc/Solution.pm            |   1 +
 9 files changed, 686 insertions(+), 8 deletions(-)

diff --git a/configure b/configure
index 11a4284e5b..fe739879af 100755
--- a/configure
+++ b/configure
@@ -698,6 +698,7 @@ with_gnu_ld
 LD
 LDFLAGS_SL
 LDFLAGS_EX
+with_zstd
 with_zlib
 with_system_tzdata
 with_libxslt
@@ -798,6 +799,7 @@ infodir
 docdir
 oldincludedir
 includedir
+runstatedir
 localstatedir
 sharedstatedir
 sysconfdir
@@ -866,6 +868,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -935,6 +938,7 @@ datadir='${datarootdir}'
 sysconfdir='${prefix}/etc'
 sharedstatedir='${prefix}/com'
 localstatedir='${prefix}/var'
+runstatedir='${localstatedir}/run'
 includedir='${prefix}/include'
 oldincludedir='/usr/include'
 docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@@ -1187,6 +1191,15 @@ do
   | -silent | --silent | --silen | --sile | --sil)
     silent=yes ;;
 
+  -runstatedir | --runstatedir | --runstatedi | --runstated \
+  | --runstate | --runstat | --runsta | --runst | --runs \
+  | --run | --ru | --r)
+    ac_prev=runstatedir ;;
+  -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
+  | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
+  | --run=* | --ru=* | --r=*)
+    runstatedir=$ac_optarg ;;
+
   -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
     ac_prev=sbindir ;;
   -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@@ -1324,7 +1337,7 @@ fi
 for ac_var in	exec_prefix prefix bindir sbindir libexecdir datarootdir \
 		datadir sysconfdir sharedstatedir localstatedir includedir \
 		oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
-		libdir localedir mandir
+		libdir localedir mandir runstatedir
 do
   eval ac_val=\$$ac_var
   # Remove trailing slashes.
@@ -1477,6 +1490,7 @@ Fine tuning of the installation directories:
   --sysconfdir=DIR        read-only single-machine data [PREFIX/etc]
   --sharedstatedir=DIR    modifiable architecture-independent data [PREFIX/com]
   --localstatedir=DIR     modifiable single-machine data [PREFIX/var]
+  --runstatedir=DIR       modifiable per-process data [LOCALSTATEDIR/run]
   --libdir=DIR            object code libraries [EPREFIX/lib]
   --includedir=DIR        C header files [PREFIX/include]
   --oldincludedir=DIR     C header files for non-gcc [/usr/include]
@@ -1570,6 +1584,7 @@ Optional Packages:
   --with-system-tzdata=DIR
                           use system time zone data in DIR
   --without-zlib          do not use Zlib
+  --with-zstd             use Zstd compression library
   --with-gnu-ld           assume the C compiler uses GNU ld [default=no]
 
 Some influential environment variables:
@@ -8601,6 +8616,35 @@ fi
 
 
 
+#
+# Zstd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      :
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
 #
 # Assignments
 #
@@ -12092,6 +12136,59 @@ fi
 
 fi
 
+if test "$with_zstd" = yes; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressStream2 in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compressStream2 in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compressStream2+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compressStream2 ();
+int
+main ()
+{
+return ZSTD_compressStream2 ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compressStream2=yes
+else
+  ac_cv_lib_zstd_ZSTD_compressStream2=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compressStream2" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compressStream2" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compressStream2" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+fi
+
 if test "$enable_spinlocks" = yes; then
 
 $as_echo "#define HAVE_SPINLOCKS 1" >>confdefs.h
@@ -13295,6 +13392,20 @@ Use --without-zlib to disable zlib support." "$LINENO" 5
 fi
 
 
+fi
+
+if test "$with_zstd" = yes; then
+  ac_fn_c_check_header_mongrel "$LINENO" "zstd.h" "ac_cv_header_zstd_h" "$ac_includes_default"
+if test "x$ac_cv_header_zstd_h" = xyes; then :
+
+else
+  as_fn_error $? "zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support." "$LINENO" 5
+fi
+
+
 fi
 
 if test "$with_gssapi" = yes ; then
@@ -14689,7 +14800,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14735,7 +14846,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14759,7 +14870,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14804,7 +14915,7 @@ else
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
@@ -14828,7 +14939,7 @@ rm -f core conftest.err conftest.$ac_objext conftest.$ac_ext
     We can't simply define LARGE_OFF_T to be 9223372036854775807,
     since some C++ compilers masquerading as C compilers
     incorrectly reject 9223372036854775807.  */
-#define LARGE_OFF_T (((off_t) 1 << 62) - 1 + ((off_t) 1 << 62))
+#define LARGE_OFF_T ((((off_t) 1 << 31) << 31) - 1 + (((off_t) 1 << 31) << 31))
   int off_t_is_large[(LARGE_OFF_T % 2147483629 == 721
 		       && LARGE_OFF_T % 2147483647 == 1)
 		      ? 1 : -1];
diff --git a/configure.ac b/configure.ac
index fc523c6aeb..744836ea7f 100644
--- a/configure.ac
+++ b/configure.ac
@@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+              [use Zstd compression library])
+AC_SUBST(with_zstd)
+
 #
 # Assignments
 #
@@ -1186,6 +1193,14 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_compressStream2, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1400,6 +1415,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index d66d6f60f5..285f554c1a 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -65,6 +65,18 @@ compresslibs[] = {
 	{ COMPR_ALG_LIBZ, "libz", ".gz", Z_DEFAULT_COMPRESSION },
 	{ COMPR_ALG_LIBZ, "zlib", ".gz", Z_DEFAULT_COMPRESSION }, /* Alternate name */
 
+#ifdef HAVE_LIBZSTD
+	/*
+	 * ZSTD doesen't have a #define for it, but 0 means "the current default".
+	 * Note that ZSTD_CLEVEL_DEFAULT is currently defined to 3.
+	 *
+	 * Block size should be ZSTD_DStreamOutSize(), but needs to be
+	 * constant, so use ZSTD_BLOCKSIZE_MAX (128kB)
+	 */
+	{ COMPR_ALG_ZSTD, "zst",  ".zst", 0 },
+	{ COMPR_ALG_ZSTD, "zstd", ".zst", 0 }, /* Alternate name */
+#endif /* HAVE_LIBZSTD */
+
 	{ 0, NULL, } /* sentinel */
 };
 
@@ -84,6 +96,18 @@ struct CompressorState
 	char	   *zlibOut;
 	size_t		zlibOutSize;
 #endif
+
+#ifdef HAVE_LIBZSTD
+	union {
+		struct {
+			ZSTD_outBuffer output;
+			ZSTD_inBuffer input;
+			// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
+			ZSTD_CStream *cstream;
+		} zstd;
+	} u;
+#endif
+
 };
 
 /* Routines that support zlib compressed data I/O */
@@ -97,6 +121,15 @@ static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
 #endif
 
+#ifdef HAVE_LIBZSTD
+static ZSTD_CStream *ZstdCStreamParams(Compress *compress);
+static void InitCompressorZstd(CompressorState *cs, Compress *compress);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const char *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF);
+#endif
+
 /* Routines that support uncompressed data I/O */
 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
 static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs,
@@ -125,6 +158,13 @@ AllocateCompressor(Compress *compression, WriteFunc writeF)
 		InitCompressorZlib(cs, compression);
 		break;
 #endif
+
+#ifdef HAVE_LIBZSTD
+	case COMPR_ALG_ZSTD:
+		InitCompressorZstd(cs, compression);
+		break;
+#endif
+
 	case COMPR_ALG_NONE:
 		/* Do nothing */
 		break;
@@ -153,6 +193,13 @@ ReadDataFromArchive(ArchiveHandle *AH, ReadFunc readF)
 		ReadDataFromArchiveZlib(AH, readF);
 		break;
 #endif
+
+#ifdef HAVE_LIBZSTD
+	case COMPR_ALG_ZSTD:
+		ReadDataFromArchiveZstd(AH, readF);
+		break;
+#endif
+
 	default:
 		/* Should not happen */
 		fatal("requested compression not available in this installation");
@@ -173,6 +220,12 @@ WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
 			WriteDataToArchiveZlib(AH, cs, data, dLen);
 			break;
 #endif
+
+#ifdef HAVE_LIBZSTD
+		case COMPR_ALG_ZSTD:
+			WriteDataToArchiveZstd(AH, cs, data, dLen);
+			break;
+#endif
 		case COMPR_ALG_NONE:
 			WriteDataToArchiveNone(AH, cs, data, dLen);
 			break;
@@ -193,11 +246,202 @@ EndCompressor(ArchiveHandle *AH, CompressorState *cs)
 	if (cs->comprAlg == COMPR_ALG_LIBZ)
 		EndCompressorZlib(AH, cs);
 #endif
+
+#ifdef HAVE_LIBZSTD
+	if (cs->comprAlg == COMPR_ALG_ZSTD)
+		EndCompressorZstd(AH, cs);
+#endif
+
 	free(cs);
 }
 
 /* Private routines, specific to each compression method. */
 
+#ifdef HAVE_LIBZSTD
+
+static void ZSTD_CCtx_setParam_or_die(ZSTD_CStream *cstream,
+		ZSTD_cParameter param, int value)
+
+{
+	size_t res;
+	res = ZSTD_CCtx_setParameter(cstream, param, value);
+	if (ZSTD_isError(res))
+		fatal("could not set compression parameter: %s",
+				ZSTD_getErrorName(res));
+}
+
+/* Return a compression stream with parameters set per argument */
+static ZSTD_CStream*
+ZstdCStreamParams(Compress *compress)
+{
+	ZSTD_CStream *cstream;
+	cstream = ZSTD_createCStream();
+	if (cstream == NULL)
+		fatal("could not initialize compression library");
+
+	if (compress->level != 0) // XXX: ZSTD_CLEVEL_DEFAULT
+	{
+		size_t res;
+		res = ZSTD_CCtx_setParameter(cstream,
+				ZSTD_c_compressionLevel, compress->level);
+		if (ZSTD_isError(res))
+			fatal("could not set compression level: %s",
+					ZSTD_getErrorName(res));
+	}
+
+	if (compress->zstd.longdistance) // XXX: ternary
+		ZSTD_CCtx_setParam_or_die(cstream,
+				ZSTD_c_enableLongDistanceMatching,
+				compress->zstd.longdistance);
+
+	if (compress->zstd.checksum)
+		ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_checksumFlag,
+				compress->zstd.checksum);
+
+// not supported in my library ?
+	if (compress->zstd.threads)
+		ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers,
+				compress->zstd.threads);
+
+#if 0
+	/* Still marked as experimental */
+	if (compress->zstd.rsyncable)
+		ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_rsyncable, 1);
+#endif
+
+	return cstream;
+}
+
+static void
+InitCompressorZstd(CompressorState *cs, Compress *compress)
+{
+	cs->u.zstd.cstream = ZstdCStreamParams(compress);
+	/* XXX: initialize safely like the corresponding zlib "paranoia" */
+	cs->u.zstd.output.size = ZSTD_CStreamOutSize();
+	cs->u.zstd.output.dst = pg_malloc(cs->u.zstd.output.size);
+	cs->u.zstd.output.pos = 0;
+}
+
+static void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZSTD_outBuffer	*output = &cs->u.zstd.output;
+
+	for (;;)
+	{
+		size_t res;
+
+		res = ZSTD_compressStream2(cs->u.zstd.cstream, output,
+				&cs->u.zstd.input, ZSTD_e_end);
+
+		if (output->pos > 0)
+			cs->writeF(AH, output->dst, output->pos);
+
+		if (res == 0)
+			break;
+
+		if (ZSTD_isError(res))
+			fatal("could not close compression stream: %s",
+					ZSTD_getErrorName(res));
+	}
+
+	// XXX: retval
+	ZSTD_freeCStream(cs->u.zstd.cstream);
+}
+
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const char *data, size_t dLen)
+{
+	ZSTD_inBuffer	*input = &cs->u.zstd.input;
+	ZSTD_outBuffer	*output = &cs->u.zstd.output;
+
+	input->src = (void *) unconstify(char *, data);
+	input->size = dLen;
+	input->pos = 0;
+
+	while (input->pos != input->size)
+	{
+		size_t		res;
+
+		res = ZSTD_compressStream2(cs->u.zstd.cstream, output,
+				input, ZSTD_e_continue);
+
+		if (output->pos == output->size ||
+				input->pos != input->size)
+		{
+			/*
+			 * Extra paranoia: avoid zero-length chunks, since a zero length
+			 * chunk is the EOF marker in the custom format. This should never
+			 * happen but...
+			 */
+			if (output->pos > 0)
+				cs->writeF(AH, output->dst, output->pos);
+
+			output->pos = 0;
+		}
+
+		if (ZSTD_isError(res))
+			fatal("could not compress data: %s", ZSTD_getErrorName(res));
+	}
+}
+
+/* Read data from a compressed zstd archive */
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, ReadFunc readF)
+{
+	ZSTD_DStream	*dstream;
+	ZSTD_outBuffer	output;
+	ZSTD_inBuffer	input;
+	size_t			res;
+	size_t			input_size;
+
+	dstream = ZSTD_createDStream();
+	if (dstream == NULL)
+		fatal("could not initialize compression library");
+
+	input_size = ZSTD_DStreamInSize();
+	input.src = pg_malloc(input_size);
+
+	output.size = ZSTD_DStreamOutSize();
+	output.dst = pg_malloc(output.size);
+
+	/* read compressed data */
+	for (;;)
+	{
+		size_t			cnt;
+
+		input.size = input_size; // XXX: the buffer can grow, we shouldn't keep resetting it to the original value..
+		cnt = readF(AH, (char **)unconstify(void **, &input.src), &input.size);
+		input.pos = 0;
+		input.size = cnt;
+
+		if (cnt == 0)
+			break;
+
+		while (input.pos < input.size)
+		{
+			/* decompress */
+			output.pos = 0;
+			res = ZSTD_decompressStream(dstream, &output, &input);
+
+			if (ZSTD_isError(res))
+				fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			/* write to output handle */
+			((char *)output.dst)[output.pos] = '\0';
+			ahwrite(output.dst, 1, output.pos, AH);
+			// if (res == 0)
+				// break;
+		}
+	}
+
+	pg_free(unconstify(void *, input.src));
+	pg_free(output.dst);
+}
+
+#endif		/* HAVE_LIBZSTD */
+
 #ifdef HAVE_LIBZ
 /*
  * Functions for zlib compressed output.
@@ -411,6 +655,19 @@ struct cfp
 #ifdef HAVE_LIBZ
 	gzFile		compressedfp;
 #endif
+
+#ifdef HAVE_LIBZSTD // XXX: this should be a union with a CompressionAlgorithm alg?
+	/* This is a normal file to which we read/write compressed data */
+	struct {
+		FILE			*fp;
+		// XXX: use one separate ZSTD_CStream per thread: disable on windows ?
+		ZSTD_CStream	*cstream;
+		ZSTD_DStream	*dstream;
+		ZSTD_outBuffer	output;
+		ZSTD_inBuffer	input;
+	} zstd;
+#endif
+
 };
 
 static int	hasSuffix(const char *filename);
@@ -525,6 +782,31 @@ cfopen(const char *path, const char *mode, Compress *compression)
 		return fp;
 #endif
 
+#ifdef HAVE_LIBZSTD
+	case COMPR_ALG_ZSTD:
+		fp->zstd.fp = fopen(path, mode);
+		if (fp->zstd.fp == NULL)
+		{
+			free_keep_errno(fp);
+			fp = NULL;
+		}
+		else if (mode[0] == 'w' || mode[0] == 'a' ||
+			strchr(mode, '+') != NULL)
+		{
+			fp->zstd.output.size = ZSTD_CStreamOutSize();
+			fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size);
+			fp->zstd.cstream = ZstdCStreamParams(compression);
+		}
+		else if (strchr(mode, 'r'))
+		{
+			fp->zstd.input.src = pg_malloc0(ZSTD_DStreamInSize());
+			fp->zstd.dstream = ZSTD_createDStream();
+			if (fp->zstd.dstream == NULL)
+				fatal("could not initialize compression library");
+		} // XXX else: bad mode
+		return fp;
+#endif
+
 	case COMPR_ALG_NONE:
 		fp->uncompressedfp = fopen(path, mode);
 		if (fp->uncompressedfp == NULL)
@@ -576,6 +858,31 @@ cfdopen(int fd, const char *mode, Compress *compression)
 		return fp;
 #endif
 
+#ifdef HAVE_LIBZSTD
+	case COMPR_ALG_ZSTD:
+		fp->zstd.fp = fdopen(fd, mode);
+		if (fp->zstd.fp == NULL)
+		{
+			free_keep_errno(fp);
+			fp = NULL;
+		}
+		else if (mode[0] == 'w' || mode[0] == 'a' ||
+			strchr(mode, '+') != NULL)
+		{
+			fp->zstd.output.size = ZSTD_CStreamOutSize();
+			fp->zstd.output.dst = pg_malloc0(fp->zstd.output.size);
+			fp->zstd.cstream = ZstdCStreamParams(compression);
+		}
+		else if (strchr(mode, 'r'))
+		{
+			fp->zstd.input.src = pg_malloc0(ZSTD_DStreamInSize());
+			fp->zstd.dstream = ZSTD_createDStream();
+			if (fp->zstd.dstream == NULL)
+				fatal("could not initialize compression library");
+		} // XXX else: bad mode
+		return fp;
+#endif
+
 	case COMPR_ALG_NONE:
 		fp->uncompressedfp = fdopen(fd, mode);
 		if (fp->uncompressedfp == NULL)
@@ -617,6 +924,68 @@ cfread(void *ptr, int size, cfp *fp)
 	}
 #endif
 
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+		size_t			input_size = ZSTD_DStreamInSize();
+		/* input_size is the allocated size */
+		size_t			res, cnt;
+
+		output->size = size;
+		output->dst = ptr;
+		output->pos = 0;
+
+		for (;;)
+		{
+			Assert(input->pos <= input->size);
+			Assert(input->size <= input_size);
+
+			/* If the input is completely consumed, start back at the beginning */
+			if (input->pos == input->size)
+			{
+				/* input->size is size produced by "fread" */
+				input->size = 0;
+				/* input->pos is position consumed by decompress */
+				input->pos = 0;
+			}
+
+			/* read compressed data if we must produce more input */
+			if (input->pos == input->size)
+			{
+				cnt = fread(unconstify(void *, input->src), 1, input_size, fp->zstd.fp);
+				input->size = cnt;
+
+				/* If we have no input to consume, we're done */
+				if (cnt == 0)
+					break;
+			}
+
+			Assert(cnt >= 0);
+			Assert(input->size <= input_size);
+
+			/* Now consume as much as possible */
+			for ( ; input->pos < input->size; )
+			{
+				/* decompress */
+				res = ZSTD_decompressStream(fp->zstd.dstream, output, input);
+				if (res == 0)
+					break; /* End of frame */
+				if (output->pos == output->size)
+					break; /* No more room for output */
+				if (ZSTD_isError(res))
+					fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+			}
+
+			if (output->pos == output->size)
+				break; /* We read all the data that fits */
+		}
+
+		return output->pos;
+	}
+#endif
+
 	ret = fread(ptr, 1, size, fp->uncompressedfp);
 	if (ret != size && !feof(fp->uncompressedfp))
 		READ_ERROR_EXIT(fp->uncompressedfp);
@@ -630,6 +999,35 @@ cfwrite(const void *ptr, int size, cfp *fp)
 	if (fp->compressedfp)
 		return gzwrite(fp->compressedfp, ptr, size);
 #endif
+
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		size_t      res, cnt;
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+
+		input->src = ptr;
+		input->size = size;
+		input->pos = 0;
+
+		/* Consume all input, and flush later */
+		while (input->pos != input->size)
+		{
+			output->pos = 0;
+			res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_continue);
+			if (ZSTD_isError(res))
+				fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+			cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+			if (cnt != output->pos)
+				fatal("could not write data: %s", strerror(errno));
+		}
+
+		return size;
+	}
+#endif
+
 	return fwrite(ptr, 1, size, fp->uncompressedfp);
 }
 
@@ -652,6 +1050,21 @@ cfgetc(cfp *fp)
 		return ret;
 	}
 #endif
+
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		if (cfread(&ret, 1, fp) != 1)
+		{
+			if (feof(fp->zstd.fp))
+				fatal("could not read from input file: end of file");
+			else
+				fatal("could not read from input file: %s", strerror(errno));
+		}
+		return ret;
+	}
+#endif
+
 	ret = fgetc(fp->uncompressedfp);
 	if (ret == EOF)
 		READ_ERROR_EXIT(fp->uncompressedfp);
@@ -665,6 +1078,31 @@ cfgets(cfp *fp, char *buf, int len)
 	if (fp->compressedfp)
 		return gzgets(fp->compressedfp, buf, len);
 #endif
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		/*
+		 * Read one byte at a time until newline or EOF.
+		 * This is only used to read the list of blobs, and the I/O is
+		 * buffered anyway.
+		 */
+		int i, res;
+		for (i = 0; i < len - 1; ++i)
+		{
+			res = cfread(&buf[i], 1, fp);
+			if (res != 1)
+				break;
+			if (buf[i] == '\n')
+			{
+				++i;
+				break;
+			}
+		}
+		buf[i] = '\0';
+		return i > 0 ? buf : 0;
+	}
+#endif
+
 	return fgets(buf, len, fp->uncompressedfp);
 }
 
@@ -688,6 +1126,44 @@ cfclose(cfp *fp)
 	}
 #endif
 
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+	{
+		ZSTD_outBuffer	*output = &fp->zstd.output;
+		ZSTD_inBuffer	*input = &fp->zstd.input;
+		size_t res, cnt;
+
+		if (fp->zstd.cstream)
+		{
+			for (;;)
+			{
+				output->pos = 0;
+				res = ZSTD_compressStream2(fp->zstd.cstream, output, input, ZSTD_e_end);
+				if (ZSTD_isError(res))
+					fatal("could not compress data: %s", ZSTD_getErrorName(res));
+				cnt = fwrite(output->dst, 1, output->pos, fp->zstd.fp);
+				if (cnt != output->pos)
+					fatal("could not write data: %s", strerror(errno));
+				if (res == 0)
+					break;
+			}
+
+			ZSTD_freeCStream(fp->zstd.cstream);
+			pg_free(fp->zstd.output.dst);
+		}
+
+		if (fp->zstd.dstream)
+		{
+			ZSTD_freeDStream(fp->zstd.dstream);
+			pg_free(unconstify(void *, fp->zstd.input.src));
+		}
+
+		result = fclose(fp->zstd.fp);
+		fp->zstd.fp = NULL;
+		return result;
+	}
+#endif
+
 	result = fclose(fp->uncompressedfp);
 	fp->uncompressedfp = NULL;
 	free_keep_errno(fp);
@@ -702,6 +1178,10 @@ cfeof(cfp *fp)
 		return gzeof(fp->compressedfp);
 #endif
 
+#ifdef HAVE_LIBZSTD
+	if (fp->zstd.fp)
+		return feof(fp->zstd.fp);
+#endif
 	return feof(fp->uncompressedfp);
 }
 
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index f2390b7937..19ff6248d5 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -77,6 +77,7 @@ typedef enum
 		COMPR_ALG_DEFAULT = -1,
 		COMPR_ALG_NONE,
 		COMPR_ALG_LIBZ,
+		COMPR_ALG_ZSTD,
 } CompressionAlgorithm;
 /* Should be called "method" or "library" ? */
 
@@ -88,6 +89,19 @@ typedef struct Compress {
 	 * are all integer, though.
 	*/
 	bool		level_set;
+
+	/*
+	 * This could be a union across all compress algorithms, but
+	 * keeping as separate structs allows checking that options are
+	 * not specified for a different algorithm than selected.
+	 */
+
+	struct {
+		bool		longdistance;
+		bool		checksum;
+		bool		rsyncable;
+		int		threads;
+	} zstd;
 } Compress;
 
 
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 9f511b49b9..da2eb53277 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -56,6 +56,10 @@ typedef struct _z_stream
 typedef z_stream *z_streamp;
 #endif
 
+#ifdef HAVE_LIBZSTD
+#include <zstd.h>
+#endif	/* HAVE_LIBZSTD */
+
 /* Data block types */
 #define BLK_DATA 1
 #define BLK_BLOBS 3
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 75c1bf22e4..b8efeb8ca7 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -393,8 +393,12 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!cfp)
 		fatal("could not open input file \"%s\": %m", filename);
 
-	buf = pg_malloc(ZLIB_OUT_SIZE);
-	buflen = ZLIB_OUT_SIZE;
+	/*
+	 * zstd prefers a 128kB buffer.  The allocation cannot happen in
+	 * cfread, since the "cfp" is an opaque type.
+	 */
+	buf = pg_malloc(128*1024);
+	buflen = 128*1024;
 
 	while ((cnt = cfread(buf, buflen, cfp)))
 	{
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 75985fd4d3..7c2f7a9ca3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -356,6 +356,12 @@ parse_compression(const char *optarg, Compress *compress)
 				compress->level = atoi(1+eq);
 				compress->level_set = true;
 			}
+			else if (strncmp(optarg, "zstdlong", len) == 0)
+				compress->zstd.longdistance = atoi(1+eq);
+			else if (strncmp(optarg, "checksum", len) == 0)
+				compress->zstd.checksum = atoi(1+eq);
+			else if (strncmp(optarg, "threads", len) == 0)
+				compress->zstd.threads = atoi(1+eq);
 			else
 			{
 				pg_log_error("unknown compression setting: %s", optarg);
@@ -367,11 +373,31 @@ parse_compression(const char *optarg, Compress *compress)
 				break;
 		}
 
+		/* XXX: zstd will check its own compression level later */
+		if (compress->alg != COMPR_ALG_ZSTD)
+		{
+			Compress nullopts = {0};
+
+			if (compress->level < 0 || compress->level > 9)
+			{
+				pg_log_error("compression level must be in range 0..9");
+				exit_nicely(1);
+			}
+
+// XXX: needs to set default alg first
+			if (memcmp(&compress->zstd, &nullopts.zstd, sizeof(nullopts.zstd)) != 0)
+			{
+				pg_log_error("compression option not supported with this algorithm");
+				exit_nicely(1);
+			}
+		}
+
 		if (!compress->level_set)
 		{ // XXX
 			const int default_compress_level[] = {
 				0,			/* COMPR_ALG_NONE */
 				Z_DEFAULT_COMPRESSION,	/* COMPR_ALG_ZLIB */
+				0, // XXX: ZSTD_CLEVEL_DEFAULT,	/* COMPR_ALG_ZSTD */
 			};
 
 			compress->level = default_compress_level[compress->alg];
@@ -764,6 +790,11 @@ main(int argc, char **argv)
 			compress.alg = COMPR_ALG_LIBZ;
 			compress.level = Z_DEFAULT_COMPRESSION;
 #endif
+
+#ifdef HAVE_LIBZSTD
+			compress.alg = COMPR_ALG_ZSTD; // Set default for testing purposes
+			compress.level = ZSTD_CLEVEL_DEFAULT;
+#endif
 		}
 		else
 		{
@@ -780,6 +811,14 @@ main(int argc, char **argv)
 	}
 #endif
 
+#ifndef HAVE_LIBZSTD
+	if (compress.alg == COMPR_ALG_ZSTD)
+	{
+		pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
+		compress.alg = 0;
+	}
+#endif
+
 	/*
 	 * If emitting an archive format, we always want to emit a DATABASE item,
 	 * in case --create is specified at pg_restore time.
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838e53..da35415c72 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -346,6 +346,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm
index 22d6abd367..a101366b4c 100644
--- a/src/tools/msvc/Solution.pm
+++ b/src/tools/msvc/Solution.pm
@@ -307,6 +307,7 @@ sub GenerateFiles
 		HAVE_LIBXML2                                => undef,
 		HAVE_LIBXSLT                                => undef,
 		HAVE_LIBZ                   => $self->{options}->{zlib} ? 1 : undef,
+		HAVE_LIBZSTD                => $self->{options}->{zstd} ? 1 : undef,
 		HAVE_LINK                   => undef,
 		HAVE_LOCALE_T               => 1,
 		HAVE_LONG_INT_64            => undef,
-- 
2.17.0

