From e32a9655ea7892c3f89abbdd4a401bc8172ef507 Mon Sep 17 00:00:00 2001
From: usernamedt <usernamedt@protonmail.ch>
Date: Thu, 30 Dec 2021 15:59:01 +0500
Subject: [PATCH 2/4] Add ZSTD support

---
 src/Makefile.global.in        |   1 +
 src/backend/Makefile          |   4 +
 src/common/z_stream.c         | 228 ++++++++++++++++++++++++++++++++++
 src/interfaces/libpq/Makefile |   5 +
 src/tools/msvc/Solution.pm    |   3 +-
 5 files changed, 240 insertions(+), 1 deletion(-)

diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 1b3dc97e97d..da3abeddc94 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -197,6 +197,7 @@ with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
 with_lz4    = @with_lz4@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index f90e078b6ed..d7041c9aede 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -55,6 +55,10 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
 ifeq ($(with_lz4),yes)
 LIBS += -llz4
 endif
diff --git a/src/common/z_stream.c b/src/common/z_stream.c
index b75f194a1cf..98f4e4d1d9e 100644
--- a/src/common/z_stream.c
+++ b/src/common/z_stream.c
@@ -89,6 +89,231 @@ struct ZStream
 	bool		not_flushed;
 };
 
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+/*
+ * Maximum allowed back-reference distance, expressed as power of 2.
+ * This setting controls max compressor/decompressor window size.
+ * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536
+ */
+#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */
+
+
+typedef struct ZS_ZSTD_CStream
+{
+	ZSTD_CStream *stream;
+	char const *error;			/* error message */
+}			ZS_ZSTD_CStream;
+
+typedef struct ZS_ZSTD_DStream
+{
+	ZSTD_DStream *stream;
+	char const *error;			/* error message */
+}			ZS_ZSTD_DStream;
+
+static void *
+zstd_create_compressor(int level)
+{
+	size_t		rc;
+	ZS_ZSTD_CStream *c_stream = (ZS_ZSTD_CStream *) malloc(sizeof(ZS_ZSTD_CStream));
+
+	c_stream->stream = ZSTD_createCStream();
+	rc = ZSTD_initCStream(c_stream->stream, level);
+	if (ZSTD_isError(rc))
+	{
+		ZSTD_freeCStream(c_stream->stream);
+		free(c_stream);
+		return NULL;
+	}
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	c_stream->error = NULL;
+	return c_stream;
+}
+
+static void *
+zstd_create_decompressor()
+{
+	size_t		rc;
+	ZS_ZSTD_DStream *d_stream = (ZS_ZSTD_DStream *) malloc(sizeof(ZS_ZSTD_DStream));
+
+	d_stream->stream = ZSTD_createDStream();
+	rc = ZSTD_initDStream(d_stream->stream);
+	if (ZSTD_isError(rc))
+	{
+		ZSTD_freeDStream(d_stream->stream);
+		free(d_stream);
+		return NULL;
+	}
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	d_stream->error = NULL;
+	return d_stream;
+}
+
+static ssize_t
+zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+	size_t		rc;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	rc = ZSTD_decompressStream(ds->stream, &out, &in);
+
+	*src_processed = in.pos;
+	*dst_processed = out.pos;
+	if (ZSTD_isError(rc))
+	{
+		ds->error = ZSTD_getErrorName(rc);
+		return ZS_DECOMPRESS_ERROR;
+	}
+
+	if (rc == 0)
+	{
+		return ZS_STREAM_END;
+	}
+
+	if (out.pos == out.size)
+	{
+		/*
+		 * if `output.pos == output.size`, there might be some data left
+		 * within internal buffers
+		 */
+		return ZS_DATA_PENDING;
+	}
+	return ZS_OK;
+}
+
+static ssize_t
+zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	if (in.pos < src_size)		/* Has something to compress in input buffer */
+	{
+		size_t		rc = ZSTD_compressStream(cs->stream, &out, &in);
+
+		*dst_processed = out.pos;
+		*src_processed = in.pos;
+		if (ZSTD_isError(rc))
+		{
+			cs->error = ZSTD_getErrorName(rc);
+			return ZS_COMPRESS_ERROR;
+		}
+	}
+
+	if (in.pos == src_size)		/* All data is compressed: flush internal zstd
+								 * buffer */
+	{
+		size_t		tx_not_flushed = ZSTD_flushStream(cs->stream, &out);
+
+		*dst_processed = out.pos;
+		if (tx_not_flushed > 0)
+		{
+			return ZS_DATA_PENDING;
+		}
+	}
+
+	return ZS_OK;
+}
+
+static ssize_t
+zstd_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	size_t		tx_not_flushed;
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+	ZSTD_outBuffer output;
+
+	output.dst = dst;
+	output.pos = 0;
+	output.size = dst_size;
+
+	do
+	{
+		tx_not_flushed = ZSTD_endStream(cs->stream, &output);
+	} while ((tx_not_flushed > 0) && (output.pos < output.size));
+
+	*dst_processed = output.pos;
+
+	if (tx_not_flushed > 0)
+	{
+		return ZS_DATA_PENDING;
+	}
+	return ZS_OK;
+}
+
+static void
+zstd_free_compressor(void *c_stream)
+{
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+
+	if (cs != NULL)
+	{
+		ZSTD_freeCStream(cs->stream);
+		free(cs);
+	}
+}
+
+static void
+zstd_free_decompressor(void *d_stream)
+{
+	ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream;
+
+	if (ds != NULL)
+	{
+		ZSTD_freeDStream(ds->stream);
+		free(ds);
+	}
+}
+
+static char const *
+zstd_compress_error(void *c_stream)
+{
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+
+	return cs->error;
+}
+
+static char const *
+zstd_decompress_error(void *d_stream)
+{
+	ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream;
+
+	return ds->error;
+}
+
+static char const *
+zstd_name(void)
+{
+	return "zstd";
+}
+
+#endif
+
 #if HAVE_LIBZ
 
 #include <stdlib.h>
@@ -424,6 +649,9 @@ no_compression_name(void)
  */
 static ZAlgorithm const zs_algorithms[] =
 {
+#if HAVE_LIBZSTD
+	{zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error, zstd_end},
+#endif
 #if HAVE_LIBZ
 	{zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error, zlib_end},
 #endif
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 27875cbee0f..9b1db2c0dc5 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -35,6 +35,11 @@ LIBS += -llz4
 SHLIB_LINK += -llz4
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
 ifeq ($(with_zlib),yes)
 LIBS += -lz
 SHLIB_LINK += -lz
diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm
index c2acb58df0e..f35b29f8ebd 100644
--- a/src/tools/msvc/Solution.pm
+++ b/src/tools/msvc/Solution.pm
@@ -293,7 +293,8 @@ sub GenerateFiles
 		HAVE_LIBXML2                                => undef,
 		HAVE_LIBXSLT                                => undef,
 		HAVE_LIBZ                   => $self->{options}->{zlib} ? 1 : undef,
-		HAVE_LIBZSTD                => undef,
+		HAVE_LIBZSTD                => $self->{options}->{zstd} ? 1 : undef,
+		HAVE_LINK                   => undef,
 		HAVE_LOCALE_T               => 1,
 		HAVE_LONG_INT_64            => undef,
 		HAVE_LONG_LONG_INT_64       => 1,
-- 
2.25.1

