diff --git a/evcache-core/build.gradle b/evcache-core/build.gradle index 63874384..b67317a4 100644 --- a/evcache-core/build.gradle +++ b/evcache-core/build.gradle @@ -43,6 +43,7 @@ dependencies { api group:"joda-time", name:"joda-time", version:"latest.release" api group:"javax.annotation", name:"javax.annotation-api", version:"latest.release" api group:"com.github.fzakaria", name:"ascii85", version:"latest.release" + api group:"com.github.luben", name:"zstd-jni", version:"latest.release" testImplementation group:"org.testng", name:"testng", version:"7.5" testImplementation group:"com.beust", name:"jcommander", version:"1.72" diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java index 95c7a86e..33850ce4 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheSerializingTranscoder.java @@ -22,8 +22,9 @@ package com.netflix.evcache; +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdInputStream; import com.netflix.evcache.metrics.EVCacheMetricsFactory; -import com.netflix.evcache.pool.ServerGroup; import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Tag; import com.netflix.spectator.api.Timer; @@ -31,14 +32,17 @@ import net.spy.memcached.transcoders.BaseSerializingTranscoder; import net.spy.memcached.transcoders.Transcoder; import net.spy.memcached.transcoders.TranscoderUtils; -import net.spy.memcached.util.StringUtils; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.time.Duration; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -65,8 +69,16 @@ public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder impl static final String COMPRESSION = "COMPRESSION_METRIC"; + public enum CompressionAlgorithm { GZIP, ZSTD } + + public static final int DEFAULT_ZSTD_COMPRESSION_LEVEL = 3; + + private static final int ZSTD_MAGIC = 0xFD2FB528; + private final TranscoderUtils tu = new TranscoderUtils(true); private Timer timer; + private CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.GZIP; + private int zstdLevel = DEFAULT_ZSTD_COMPRESSION_LEVEL; /** * Get a serializing transcoder with the default max data size. @@ -82,6 +94,14 @@ public EVCacheSerializingTranscoder(int max) { super(max); } + public void setCompressionAlgorithm(CompressionAlgorithm algo) { + this.compressionAlgorithm = algo; + } + + public void setCompressionLevel(int level) { + this.zstdLevel = level; + } + @Override public boolean asyncDecode(CachedData d) { if ((d.getFlags() & COMPRESSED) != 0 || (d.getFlags() & SERIALIZED) != 0) { @@ -179,29 +199,90 @@ public CachedData encode(Object o) { } assert b != null; if (b.length > compressionThreshold) { + int originalLength = b.length; byte[] compressed = compress(b); - if (compressed.length < b.length) { + if (compressed.length < originalLength) { getLogger().trace("Compressed %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); + o.getClass().getName(), originalLength, compressed.length); b = compressed; flags |= COMPRESSED; } else { getLogger().debug("Compression increased the size of %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); + o.getClass().getName(), originalLength, compressed.length); } - long compression_ratio = Math.round((double) compressed.length / b.length * 100); + long compression_ratio = Math.round((double) compressed.length / originalLength * 100); updateTimerWithCompressionRatio(compression_ratio); } return new CachedData(flags, b, getMaxSize()); } + @Override + protected byte[] compress(byte[] in) { + if (in == null) throw new NullPointerException("Can't compress null"); + switch (compressionAlgorithm) { + case ZSTD: + return Zstd.compress(in, zstdLevel); + case GZIP: + return super.compress(in); + default: + throw new IllegalArgumentException("Unsupported compression algorithm: " + compressionAlgorithm); + } + } + + @Override + protected byte[] decompress(byte[] in) { + if (in == null || in.length == 0) return in; + if (isZstdCompressed(in)) return decompressZstd(in); + return super.decompress(in); + } + + private boolean isZstdCompressed(byte[] data) { + if (data == null || data.length < 4) return false; + int magic = ByteBuffer.wrap(data, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt(); + return magic == ZSTD_MAGIC; + } + + private byte[] decompressZstd(byte[] in) { + long originalSize = Zstd.decompressedSize(in); + if (originalSize > Integer.MAX_VALUE) { + getLogger().warn("Zstd decompressed size exceeds int range: " + originalSize); + return null; + } + if (originalSize > 0) { + // Fast path: frame carries a content-size header (compress() above always does). + return Zstd.decompress(in, (int) originalSize); + } + // Slow path: declared size is 0, unknown (-1), or invalid (-2) — stream-decode and let + // ZstdInputStream surface any frame errors. + ZstdInputStream zis = null; + try { + zis = new ZstdInputStream(new ByteArrayInputStream(in)); + return readAll(zis); + } catch (IOException e) { + getLogger().error("Error reading Zstd input stream", e); + return null; + } finally { + try { if (zis != null) zis.close(); } catch (IOException ignored) {} + } + } + + private static byte[] readAll(InputStream in) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buf = new byte[8192]; + int n; + while ((n = in.read(buf)) != -1) { + out.write(buf, 0, n); + } + return out.toByteArray(); + } + private void updateTimerWithCompressionRatio(long ratio_percentage) { if(timer == null) { final List tagList = new ArrayList(1); - tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, "gzip")); + tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, compressionAlgorithm.name().toLowerCase())); timer = EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList, Duration.ofMillis(100)); - }; + } timer.record(ratio_percentage, TimeUnit.MILLISECONDS); } diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheTranscoder.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheTranscoder.java index 97be808b..2fabc0ff 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheTranscoder.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheTranscoder.java @@ -1,5 +1,6 @@ package com.netflix.evcache; +import com.netflix.archaius.api.PropertyRepository; import com.netflix.evcache.util.EVCacheConfig; import net.spy.memcached.CachedData; @@ -17,16 +18,15 @@ public EVCacheTranscoder(int max) { public EVCacheTranscoder(int max, int compressionThreshold) { super(max); setCompressionThreshold(compressionThreshold); - } - - @Override - public boolean asyncDecode(CachedData d) { - return super.asyncDecode(d); - } - - @Override - public Object decode(CachedData d) { - return super.decode(d); + PropertyRepository config = EVCacheConfig.getInstance().getPropertyRepository(); + CompressionAlgorithm algo = CompressionAlgorithm.valueOf( + config.get("default.evcache.compression.algorithm", String.class) + .orElse("GZIP").get().toUpperCase()); + setCompressionAlgorithm(algo); + if (algo == CompressionAlgorithm.ZSTD) { + setCompressionLevel(config.get("default.evcache.compression.zstd.level", Integer.class) + .orElse(DEFAULT_ZSTD_COMPRESSION_LEVEL).get()); + } } @Override diff --git a/evcache-core/src/test/java/com/netflix/evcache/EVCacheSerializingTranscoderTest.java b/evcache-core/src/test/java/com/netflix/evcache/EVCacheSerializingTranscoderTest.java new file mode 100644 index 00000000..2bbfc58c --- /dev/null +++ b/evcache-core/src/test/java/com/netflix/evcache/EVCacheSerializingTranscoderTest.java @@ -0,0 +1,227 @@ +package com.netflix.evcache; + +import com.netflix.archaius.DefaultPropertyFactory; +import com.netflix.archaius.api.PropertyRepository; +import com.netflix.archaius.config.DefaultSettableConfig; +import com.netflix.evcache.util.EVCacheConfig; +import net.spy.memcached.CachedData; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + +public class EVCacheSerializingTranscoderTest { + + @Test + public void testEnumValues() { + assertEquals(EVCacheSerializingTranscoder.CompressionAlgorithm.valueOf("GZIP"), + EVCacheSerializingTranscoder.CompressionAlgorithm.GZIP); + assertEquals(EVCacheSerializingTranscoder.CompressionAlgorithm.valueOf("ZSTD"), + EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + } + + @Test + public void testDefaultZstdLevelConstant() { + assertEquals(EVCacheSerializingTranscoder.DEFAULT_ZSTD_COMPRESSION_LEVEL, 3); + } + + @Test + public void testDefaultConstructorUsesGzip() { + EVCacheSerializingTranscoder t = new EVCacheSerializingTranscoder(); + t.setCompressionThreshold(0); + CachedData encoded = t.encode("hello world hello world hello world hello world hello world"); + assertTrue((encoded.getFlags() & EVCacheSerializingTranscoder.COMPRESSED) != 0, + "COMPRESSED flag must be set"); + byte[] data = encoded.getData(); + assertEquals(data[0], (byte) 0x1f, "Default constructor must use gzip"); + assertEquals(data[1], (byte) 0x8b, "Default constructor must use gzip"); + } + + @Test + public void testSetCompressionAlgorithmProducesZstd() { + EVCacheSerializingTranscoder t = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + t.setCompressionAlgorithm(EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + t.setCompressionThreshold(0); + CachedData encoded = t.encode("hello world hello world hello world hello world hello world"); + assertTrue((encoded.getFlags() & EVCacheSerializingTranscoder.COMPRESSED) != 0, + "COMPRESSED flag must be set"); + byte[] data = encoded.getData(); + assertEquals(data[0], (byte) 0x28, "setCompressionAlgorithm(ZSTD) must produce zstd magic byte 0"); + assertEquals(data[1], (byte) 0xB5, "setCompressionAlgorithm(ZSTD) must produce zstd magic byte 1"); + } + + @Test + public void testSetCompressionLevelRoundTrip() { + EVCacheSerializingTranscoder t = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + t.setCompressionAlgorithm(EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + t.setCompressionLevel(5); + t.setCompressionThreshold(1); + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = t.encode(original); + String decoded = (String) t.decode(encoded); + assertEquals(decoded, original, "Round-trip must succeed with custom zstd level 5"); + } + + @Test + public void testGzipEncodeSetsGzipMagicBytes() { + EVCacheSerializingTranscoder t = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + t.setCompressionThreshold(0); + CachedData encoded = t.encode("hello world hello world hello world hello world hello world"); + assertTrue((encoded.getFlags() & EVCacheSerializingTranscoder.COMPRESSED) != 0, + "COMPRESSED flag must be set"); + byte[] data = encoded.getData(); + assertEquals(data[0], (byte) 0x1f, "Expected gzip magic byte 0"); + assertEquals(data[1], (byte) 0x8b, "Expected gzip magic byte 1"); + } + + @Test + public void testZstdEncodeSetsZstdMagicBytes() { + EVCacheSerializingTranscoder t = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + t.setCompressionAlgorithm(EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + t.setCompressionThreshold(0); + CachedData encoded = t.encode("hello world hello world hello world hello world hello world"); + assertTrue((encoded.getFlags() & EVCacheSerializingTranscoder.COMPRESSED) != 0, + "COMPRESSED flag must be set"); + byte[] data = encoded.getData(); + // Zstd magic is 0xFD2FB528 in little-endian: bytes 0x28 0xB5 0x2F 0xFD + assertEquals(data[0], (byte) 0x28, "Expected zstd magic byte 0"); + assertEquals(data[1], (byte) 0xB5, "Expected zstd magic byte 1"); + assertEquals(data[2], (byte) 0x2F, "Expected zstd magic byte 2"); + assertEquals(data[3], (byte) 0xFD, "Expected zstd magic byte 3"); + } + + @Test + public void testGzipRoundTrip() { + EVCacheSerializingTranscoder transcoder = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + transcoder.setCompressionThreshold(1); + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = transcoder.encode(original); + String decoded = (String) transcoder.decode(encoded); + assertEquals(decoded, original); + } + + @Test + public void testZstdRoundTrip() { + EVCacheSerializingTranscoder transcoder = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + transcoder.setCompressionAlgorithm(EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + transcoder.setCompressionThreshold(1); + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = transcoder.encode(original); + String decoded = (String) transcoder.decode(encoded); + assertEquals(decoded, original); + } + + @Test + public void testGzipTranscoderDecodesZstdData() { + // zstd transcoder writes, gzip transcoder reads → cross-decode via magic-byte detection + EVCacheSerializingTranscoder writer = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + writer.setCompressionAlgorithm(EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + writer.setCompressionThreshold(1); + EVCacheSerializingTranscoder reader = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = writer.encode(original); + String decoded = (String) reader.decode(encoded); + assertEquals(decoded, original); + } + + @Test + public void testZstdTranscoderDecodesGzipData() { + // gzip transcoder writes, zstd transcoder reads → cross-decode via magic-byte detection + EVCacheSerializingTranscoder writer = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + writer.setCompressionThreshold(1); + EVCacheSerializingTranscoder reader = new EVCacheSerializingTranscoder(CachedData.MAX_SIZE); + reader.setCompressionAlgorithm(EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = writer.encode(original); + String decoded = (String) reader.decode(encoded); + assertEquals(decoded, original); + } + + @Test + public void testEVCacheTranscoderDefaultsToGzip() { + EVCacheTranscoder transcoder = new EVCacheTranscoder(); + transcoder.setCompressionThreshold(0); + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = transcoder.encode(original); + assertTrue((encoded.getFlags() & EVCacheSerializingTranscoder.COMPRESSED) != 0, + "COMPRESSED flag must be set"); + byte[] data = encoded.getData(); + assertEquals(data[0], (byte) 0x1f, "EVCacheTranscoder must default to gzip"); + assertEquals(data[1], (byte) 0x8b, "EVCacheTranscoder must default to gzip"); + String decoded = (String) transcoder.decode(encoded); + assertEquals(decoded, original); + } + + @Test + public void testEVCacheTranscoderExplicitAlgorithm() { + EVCacheTranscoder transcoder = new EVCacheTranscoder(CachedData.MAX_SIZE, 1); + transcoder.setCompressionAlgorithm(EVCacheSerializingTranscoder.CompressionAlgorithm.ZSTD); + transcoder.setCompressionLevel(EVCacheSerializingTranscoder.DEFAULT_ZSTD_COMPRESSION_LEVEL); + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = transcoder.encode(original); + String decoded = (String) transcoder.decode(encoded); + assertEquals(decoded, original); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testInvalidAlgorithmEnumThrows() { + EVCacheSerializingTranscoder.CompressionAlgorithm.valueOf("INVALID"); + } + + @Test + public void testFPAlgorithmGzip() { + DefaultSettableConfig testConfig = new DefaultSettableConfig(); + testConfig.setProperty("default.evcache.compression.algorithm", "GZIP"); + PropertyRepository savedRepo = EVCacheConfig.getInstance().getPropertyRepository(); + EVCacheConfig.setPropertyRepository(new DefaultPropertyFactory(testConfig)); + try { + EVCacheTranscoder transcoder = new EVCacheTranscoder(CachedData.MAX_SIZE, 1); + CachedData encoded = transcoder.encode("hello world hello world hello world hello world hello world"); + assertTrue((encoded.getFlags() & EVCacheSerializingTranscoder.COMPRESSED) != 0, + "COMPRESSED flag must be set"); + byte[] data = encoded.getData(); + assertEquals(data[0], (byte) 0x1f, "FP GZIP must produce gzip magic byte 0"); + assertEquals(data[1], (byte) 0x8b, "FP GZIP must produce gzip magic byte 1"); + } finally { + EVCacheConfig.setPropertyRepository(savedRepo); + } + } + + @Test + public void testFPAlgorithmZstd() { + DefaultSettableConfig testConfig = new DefaultSettableConfig(); + testConfig.setProperty("default.evcache.compression.algorithm", "ZSTD"); + PropertyRepository savedRepo = EVCacheConfig.getInstance().getPropertyRepository(); + EVCacheConfig.setPropertyRepository(new DefaultPropertyFactory(testConfig)); + try { + EVCacheTranscoder transcoder = new EVCacheTranscoder(CachedData.MAX_SIZE, 1); + CachedData encoded = transcoder.encode("hello world hello world hello world hello world hello world"); + assertTrue((encoded.getFlags() & EVCacheSerializingTranscoder.COMPRESSED) != 0, + "COMPRESSED flag must be set"); + byte[] data = encoded.getData(); + assertEquals(data[0], (byte) 0x28, "FP ZSTD must produce zstd magic byte 0"); + assertEquals(data[1], (byte) 0xB5, "FP ZSTD must produce zstd magic byte 1"); + } finally { + EVCacheConfig.setPropertyRepository(savedRepo); + } + } + + @Test + public void testFPZstdLevel() { + DefaultSettableConfig testConfig = new DefaultSettableConfig(); + testConfig.setProperty("default.evcache.compression.algorithm", "ZSTD"); + testConfig.setProperty("default.evcache.compression.zstd.level", 1); + PropertyRepository savedRepo = EVCacheConfig.getInstance().getPropertyRepository(); + EVCacheConfig.setPropertyRepository(new DefaultPropertyFactory(testConfig)); + try { + EVCacheTranscoder transcoder = new EVCacheTranscoder(CachedData.MAX_SIZE, 1); + String original = "hello world hello world hello world hello world hello world"; + CachedData encoded = transcoder.encode(original); + String decoded = (String) transcoder.decode(encoded); + assertEquals(decoded, original, "FP zstd level 1 round-trip must succeed"); + } finally { + EVCacheConfig.setPropertyRepository(savedRepo); + } + } +} diff --git a/evcache-core/src/test/java/test-suite.xml b/evcache-core/src/test/java/test-suite.xml index f031a615..194ea07e 100644 --- a/evcache-core/src/test/java/test-suite.xml +++ b/evcache-core/src/test/java/test-suite.xml @@ -10,6 +10,11 @@ + + + + +