From 68248e7aa220586cbf075d3cb6eb56a6461ae81f Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 4 Oct 2022 16:04:16 -0700
Subject: [PATCH v1 1/2] Add a varint implementation

Author:
Reviewed-by:
Discussion: https://postgr.es/m/20191210015054.5otdfuftxrqb5gum@alap3.anarazel.de
Backpatch:
---
 src/include/common/varint.h |  92 +++++++++++++++
 src/common/Makefile         |   1 +
 src/common/meson.build      |   1 +
 src/common/varint.c         | 228 ++++++++++++++++++++++++++++++++++++
 4 files changed, 322 insertions(+)
 create mode 100644 src/include/common/varint.h
 create mode 100644 src/common/varint.c

diff --git a/src/include/common/varint.h b/src/include/common/varint.h
new file mode 100644
index 00000000000..43d9ade7808
--- /dev/null
+++ b/src/include/common/varint.h
@@ -0,0 +1,92 @@
+#ifndef PG_VARINT_H
+#define PG_VARINT_H
+
+/*
+ * Variable length integer encoding.
+ *
+ * Represent the length of the varint, in bytes - 1, as the number of
+ * "leading" 0 bits in the first byte. I.e. the length is stored in
+ * unary.  If less than 9 bytes of data are needed, the leading 0 bits
+ * are followed by a 1 bit, followed by the data. When encoding 64bit
+ * integers, for 8 bytes of input data, there is however no high 1 bit
+ * in the second output byte, as that would be unnecessary (and
+ * increase the size to 10 bytes of output bytes). If, hypothetically,
+ * larger numbers were supported, that'd not be the case, obviously.
+ *
+ * The reason to encode the number of bytes as zero bits, rathter than
+ * ones, is that more architectures have instructions for efficiently
+ * finding the first bit set, than finding the first bit unset.
+ *
+ * In contrast to encoding the length as e.g. the high-bit in each following
+ * bytes, this allows for faster decoding (and encoding, although the benefit
+ * is smaller), because no length related branches are needed after processing
+ * the first byte and no 7 bit/byte -> 8 bit/byte conversion is needed. The
+ * mask / shift overhead is similar.
+ *
+ *
+ * I.e. the encoding of a unsiged 64bit integer is as follows
+ * [0 {#bytes - 1}][1 as separator][leading 0 bits][data]
+ *
+ * E.g.
+ * - 7 (input 0b0000'0111) is encoded as 0b1000'0111
+ * - 145 (input 0b1001'0001) is 0b0100'0000''1001'0001 (as the separator does
+ *   not fit into the one byte of the fixed width encoding)
+ * - 4141 (input 0b0001'0000''0010'1101) is 0b0101'0000''0010'1101 (the
+ *   leading 0, followed by a 1 indicating that two bytes are needed)
+ *
+ *
+ * Naively encoding negative two's complement integers this way would
+ * lead to such numbers always being stored as the maximum varint
+ * length, due to the leading sign bits.  Instead the sign bit is
+ * encoded in the least significant bit. To avoid different limits
+ * than native 64bit integers - in particular around the lowest
+ * possible value - the data portion is bit flipped instead of
+ * negated.
+ *
+ * The reason for the sign bit to be trailing is that that makes it
+ * trivial to reuse code of the unsigned case, whereas a leading bit
+ * would make that harder.
+ *
+ * XXX: One problem with storing negative numbers this way is that it
+ * makes varints not compare lexicographically. Is it worth the code
+ * duplication to allow for that? Given the variable width of
+ * integers, lexicographic sorting doesn't seem that relevant.
+ *
+ * Therefore the encoding of a signed 64bit integer is as follows:
+ * [0 {#bytes - 1}][1][leading 0 bits][data][sign bit]
+ *
+ * E.g. the encoding of
+ * -  7 (input 0b0000'0111) is encoded as 0b1000'1110
+ * - -7 (input 0b[1*]''1111'1000) is encoded as 0b1001'1101
+ */
+
+/*
+ * TODO:
+ * - check how well this works on big endian
+ * - consider encoding as little endian, more common
+ * - proper header separation
+ */
+
+
+/*
+ * The max value fitting in a 1 byte varint. One bit is needed for the length
+ * indicator.
+ */
+#define PG_VARINT_UINT64_MAX_1BYTE_VAL ((1L << (BITS_PER_BYTE - 1)) - 1)
+
+/*
+ * The max value fitting into a varint below 8 bytes has to take into account
+ * the number of bits for the length indicator for 8 bytes (7 bits), and
+ * additionally a bit for the separator (1 bit).
+ */
+#define PG_VARINT_UINT64_MAX_8BYTE_VAL ((1L << (64 - (7 + 1))) - 1)
+
+
+extern int pg_varint_encode_uint64(uint64 uval, uint8 *buf);
+extern int pg_varint_encode_int64(int64 val, uint8 *buf);
+
+extern uint64 pg_varint_decode_uint64(const uint8 *buf, int *consumed);
+extern int64 pg_varint_decode_int64(const uint8 *buf, int *consumed);
+
+
+#endif /* PG_VARINT_H */
diff --git a/src/common/Makefile b/src/common/Makefile
index e9af7346c9c..dd98b468fe4 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -78,6 +78,7 @@ OBJS_COMMON = \
 	stringinfo.o \
 	unicode_norm.o \
 	username.o \
+	varint.o \
 	wait_error.o \
 	wchar.o
 
diff --git a/src/common/meson.build b/src/common/meson.build
index 23842e1ffef..d1710569f1d 100644
--- a/src/common/meson.build
+++ b/src/common/meson.build
@@ -28,6 +28,7 @@ common_sources = files(
   'stringinfo.c',
   'unicode_norm.c',
   'username.c',
+  'varint.c',
   'wait_error.c',
   'wchar.c',
 )
diff --git a/src/common/varint.c b/src/common/varint.c
new file mode 100644
index 00000000000..9f41966773a
--- /dev/null
+++ b/src/common/varint.c
@@ -0,0 +1,228 @@
+#include "c.h"
+
+#include "common/varint.h"
+#include "port/pg_bswap.h"
+
+/*
+ * Encode unsigned 64bit integer as a variable length integer in buf, return
+ * encoded length.
+ */
+int
+pg_varint_encode_uint64(uint64 uval, uint8 *buf)
+{
+	if (uval <= PG_VARINT_UINT64_MAX_1BYTE_VAL)
+	{
+		buf[0] = (uint8) uval | 1u << 7;
+
+		return 1;
+	}
+	else if (uval <= PG_VARINT_UINT64_MAX_8BYTE_VAL)
+	{
+		int clz = __builtin_clzl(uval);
+
+		int bits_of_input_data;
+		int bytes_of_output_data;
+
+		uint64 output_uval = 0;
+		unsigned bytes_of_input_data;
+
+		bits_of_input_data =
+			(sizeof(uval) * BITS_PER_BYTE - clz); /* bits of actual data */
+
+		bytes_of_input_data =
+			(bits_of_input_data + (BITS_PER_BYTE - 1)) / BITS_PER_BYTE;
+
+		bytes_of_output_data = bytes_of_input_data;
+
+		/*
+		 * Check whether another output byte is needed to account for the
+		 * overhead of length/separator bits. Avoiding a separate division is
+		 * cheaper.
+		 *
+		 * XXX: I'm sure there's a neater way to do this.
+		 */
+		if ((bits_of_input_data + /* data */
+			 (bytes_of_input_data - 1) + /* length indicator in unary */
+			 1) /* 1 separator */
+			> (bytes_of_input_data * BITS_PER_BYTE) /* available space */
+			)
+			bytes_of_output_data++;
+
+		/* mask in value at correct position */
+		output_uval = uval << (64 - bytes_of_output_data * BITS_PER_BYTE);
+
+		/* set length bits, by setting the terminating bit to 1 */
+		output_uval |= (uint64)(1 << (BITS_PER_BYTE - (bytes_of_output_data)))
+			<< (64 - BITS_PER_BYTE); /* and shift into the most significant byte */
+
+		/* convert to big endian */
+		output_uval = pg_hton64(output_uval);
+
+		/* output */
+		memcpy(&buf[0], &output_uval, sizeof(uint64));
+
+		return bytes_of_output_data;
+	}
+	else
+	{
+		uint64 uval_be;
+
+		/*
+		 * Set length bits, no separator for 9 bytes (detectable via max
+		 * length).
+		 */
+		buf[0] = 0;
+
+		uval_be = pg_hton64(uval);
+		memcpy(&buf[1], &uval_be, sizeof(uint64));
+
+		return 9;
+	}
+}
+
+/*
+ * Encode signed 64bit integer as a variable length integer in buf, return
+ * encoded length.
+ *
+ * See also pg_varint_encode_uint64().
+ */
+int
+pg_varint_encode_int64(int64 val, uint8 *buf)
+{
+	uint64 uval;
+	bool neg;
+
+	if (val < 0)
+	{
+		neg = true;
+
+		/* reinterpret number as uint64 */
+		memcpy(&uval, &val, sizeof(int64));
+
+		/* store bit flipped value */
+		uval = ~uval;
+	}
+	else
+	{
+		neg = false;
+
+		uval = (uint64) val;
+	}
+
+	/* make space for sign bit */
+	uval <<= 1;
+	uval |= (uint8) neg;
+
+	return pg_varint_encode_uint64(uval, buf);
+}
+
+/*
+ * Decode buf into unsigned 64bit integer.
+ *
+ * Note that this function, for efficiency, reads 8 bytes, even if the
+ * variable integer is less than 8 bytes long. The buffer has to be
+ * allocated sufficiently large to account for that fact. The maximum
+ * amount of memory read is 9 bytes.
+ *
+ * FIXME: Need to length of buffer "used"!
+ */
+uint64
+pg_varint_decode_uint64(const uint8 *buf, int *consumed)
+{
+	uint8 first = buf[0];
+
+	if (first & (1 << (BITS_PER_BYTE - 1)))
+	{
+		/*
+		 * Fast path for common case of small integers.
+		 */
+		// XXX: Should we put this path into a static inline function in the
+		// header? It's pretty common...
+		*consumed = 1;
+		return first ^ (1 << 7);
+	}
+	else if (first != 0)
+	{
+		/*
+		 * Separate path for cases of 1-8 bytes - that case is different
+		 * enough from the 9 byte case that it's not worth sharing code.
+		 */
+		int clz =__builtin_clz(first);
+		int bytes = 8 - (32 - clz) + 1;
+		uint64 ret;
+
+		*consumed = bytes;
+
+		/*
+		 * Note that we explicitly read "too much" here - but we never look at
+		 * the additional data, if the length bit doesn't indicate that's
+		 * ok. A load that varies on length would require substantial, often
+		 * unpredictable, branching.
+		 */
+		memcpy(&ret, buf, sizeof(uint64));
+
+		/* move data into the appropriate place */
+		ret <<= BITS_PER_BYTE * (BITS_PER_BYTE - bytes);
+
+		/* restore native endianess*/
+		ret = pg_ntoh64(ret);
+
+		/* mask out length indicator bits & separator bit */
+		ret ^= 1L << (BITS_PER_BYTE * (bytes - 1) + (BITS_PER_BYTE - bytes));
+
+		return ret;
+	}
+	else
+	{
+		/*
+		 * 9 byte varint encoding of an 8byte integer. All the
+		 * data is following the width indicating byte, so
+		 * there's no length / separator bit to unset or such.
+		 */
+		uint64 ret;
+
+		*consumed = 9;
+
+		memcpy(&ret, &buf[1], sizeof(uint64));
+
+		/* restore native endianess*/
+		ret = pg_ntoh64(ret);
+
+		return ret;
+	}
+
+	pg_unreachable();
+	*consumed = 0;
+	return 0;
+}
+
+/*
+ * Decode buf into signed 64bit integer.
+ *
+ * See pg_varint_decode_uint64 for caveats.
+ */
+int64
+pg_varint_decode_int64(const uint8 *buf, int *consumed)
+{
+	uint64 uval = pg_varint_decode_uint64(buf, consumed);
+	bool neg;
+
+	/* determine sign */
+	neg = uval & 1;
+
+	/* remove sign bit */
+	uval >>= 1;
+
+	if (neg)
+	{
+		int64 val;
+
+		uval = ~uval;
+
+		memcpy(&val, &uval, sizeof(uint64));
+
+		return val;
+	}
+	else
+		return uval;
+}
-- 
2.37.3.542.gdd3f6c4cae

