Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions evcache-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
api group:"joda-time", name:"joda-time", version:"latest.release"
api group:"javax.annotation", name:"javax.annotation-api", version:"latest.release"
api group:"com.github.fzakaria", name:"ascii85", version:"latest.release"
api group:"com.github.luben", name:"zstd-jni", version:"latest.release"

testImplementation group:"org.testng", name:"testng", version:"7.5"
testImplementation group:"com.beust", name:"jcommander", version:"1.72"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,27 @@

package com.netflix.evcache;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStream;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.BaseSerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.transcoders.TranscoderUtils;
import net.spy.memcached.util.StringUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;


Expand All @@ -65,8 +69,16 @@ public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder impl

static final String COMPRESSION = "COMPRESSION_METRIC";

public enum CompressionAlgorithm { GZIP, ZSTD }

public static final int DEFAULT_ZSTD_COMPRESSION_LEVEL = 3;

private static final int ZSTD_MAGIC = 0xFD2FB528;

private final TranscoderUtils tu = new TranscoderUtils(true);
private Timer timer;
private CompressionAlgorithm compressionAlgorithm = CompressionAlgorithm.GZIP;
private int zstdLevel = DEFAULT_ZSTD_COMPRESSION_LEVEL;

/**
* Get a serializing transcoder with the default max data size.
Expand All @@ -82,6 +94,14 @@ public EVCacheSerializingTranscoder(int max) {
super(max);
}

public void setCompressionAlgorithm(CompressionAlgorithm algo) {
this.compressionAlgorithm = algo;
}

public void setCompressionLevel(int level) {
this.zstdLevel = level;
}

@Override
public boolean asyncDecode(CachedData d) {
if ((d.getFlags() & COMPRESSED) != 0 || (d.getFlags() & SERIALIZED) != 0) {
Expand Down Expand Up @@ -179,29 +199,90 @@ public CachedData encode(Object o) {
}
assert b != null;
if (b.length > compressionThreshold) {
int originalLength = b.length;
byte[] compressed = compress(b);
if (compressed.length < b.length) {
if (compressed.length < originalLength) {
getLogger().trace("Compressed %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
o.getClass().getName(), originalLength, compressed.length);
b = compressed;
flags |= COMPRESSED;
} else {
getLogger().debug("Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
o.getClass().getName(), originalLength, compressed.length);
}

long compression_ratio = Math.round((double) compressed.length / b.length * 100);
long compression_ratio = Math.round((double) compressed.length / originalLength * 100);
updateTimerWithCompressionRatio(compression_ratio);
}
return new CachedData(flags, b, getMaxSize());
}

@Override
protected byte[] compress(byte[] in) {
if (in == null) throw new NullPointerException("Can't compress null");
switch (compressionAlgorithm) {
case ZSTD:
return Zstd.compress(in, zstdLevel);
case GZIP:
return super.compress(in);
default:
throw new IllegalArgumentException("Unsupported compression algorithm: " + compressionAlgorithm);
}
}

@Override
protected byte[] decompress(byte[] in) {
if (in == null || in.length == 0) return in;
if (isZstdCompressed(in)) return decompressZstd(in);
return super.decompress(in);
}

private boolean isZstdCompressed(byte[] data) {
if (data == null || data.length < 4) return false;
int magic = ByteBuffer.wrap(data, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
return magic == ZSTD_MAGIC;
}

private byte[] decompressZstd(byte[] in) {
long originalSize = Zstd.decompressedSize(in);
if (originalSize > Integer.MAX_VALUE) {
getLogger().warn("Zstd decompressed size exceeds int range: " + originalSize);
return null;
}
if (originalSize > 0) {
// Fast path: frame carries a content-size header (compress() above always does).
return Zstd.decompress(in, (int) originalSize);
}
// Slow path: declared size is 0, unknown (-1), or invalid (-2) — stream-decode and let
// ZstdInputStream surface any frame errors.
ZstdInputStream zis = null;
try {
zis = new ZstdInputStream(new ByteArrayInputStream(in));
return readAll(zis);
} catch (IOException e) {
getLogger().error("Error reading Zstd input stream", e);
return null;
} finally {
try { if (zis != null) zis.close(); } catch (IOException ignored) {}
}
}

private static byte[] readAll(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[8192];
int n;
while ((n = in.read(buf)) != -1) {
out.write(buf, 0, n);
}
return out.toByteArray();
}

private void updateTimerWithCompressionRatio(long ratio_percentage) {
if(timer == null) {
final List<Tag> tagList = new ArrayList<Tag>(1);
tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, "gzip"));
tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, compressionAlgorithm.name().toLowerCase()));
timer = EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList, Duration.ofMillis(100));
};
}

timer.record(ratio_percentage, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.evcache;

import com.netflix.archaius.api.PropertyRepository;
import com.netflix.evcache.util.EVCacheConfig;

import net.spy.memcached.CachedData;
Expand All @@ -17,16 +18,15 @@ public EVCacheTranscoder(int max) {
public EVCacheTranscoder(int max, int compressionThreshold) {
super(max);
setCompressionThreshold(compressionThreshold);
}

@Override
public boolean asyncDecode(CachedData d) {
return super.asyncDecode(d);
}

@Override
public Object decode(CachedData d) {
return super.decode(d);
PropertyRepository config = EVCacheConfig.getInstance().getPropertyRepository();
CompressionAlgorithm algo = CompressionAlgorithm.valueOf(
config.get("default.evcache.compression.algorithm", String.class)
.orElse("GZIP").get().toUpperCase());
setCompressionAlgorithm(algo);
if (algo == CompressionAlgorithm.ZSTD) {
setCompressionLevel(config.get("default.evcache.compression.zstd.level", Integer.class)
.orElse(DEFAULT_ZSTD_COMPRESSION_LEVEL).get());
}
}

@Override
Expand Down
Loading