Add zstd http content compression support (#11470)
Motivation: netty needs to support zstd content-encoding http content compression Modification: Add ZstdOptions, and modify HttpContentCompressor and CompressorHttp2ConnectionEncoder to support zstd compression Result: netty supports zstd http content compression Signed-off-by: xingrufei <xingrufei@sogou-inc.com>
This commit is contained in:
parent
cc92b6c1e6
commit
154a3e0cab
@ -26,6 +26,8 @@ import io.netty.handler.codec.compression.CompressionOptions;
|
||||
import io.netty.handler.codec.compression.DeflateOptions;
|
||||
import io.netty.handler.codec.compression.GzipOptions;
|
||||
import io.netty.handler.codec.compression.StandardCompressionOptions;
|
||||
import io.netty.handler.codec.compression.ZstdEncoder;
|
||||
import io.netty.handler.codec.compression.ZstdOptions;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
|
||||
/**
|
||||
@ -41,6 +43,7 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
private final BrotliOptions brotliOptions;
|
||||
private final GzipOptions gzipOptions;
|
||||
private final DeflateOptions deflateOptions;
|
||||
private final ZstdOptions zstdOptions;
|
||||
|
||||
private final int compressionLevel;
|
||||
private final int windowBits;
|
||||
@ -126,6 +129,7 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
this.brotliOptions = null;
|
||||
this.gzipOptions = null;
|
||||
this.deflateOptions = null;
|
||||
this.zstdOptions = null;
|
||||
supportsCompressionOptions = false;
|
||||
}
|
||||
|
||||
@ -156,10 +160,12 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
BrotliOptions brotliOptions = null;
|
||||
GzipOptions gzipOptions = null;
|
||||
DeflateOptions deflateOptions = null;
|
||||
ZstdOptions zstdOptions = null;
|
||||
if (compressionOptions == null || compressionOptions.length == 0) {
|
||||
brotliOptions = StandardCompressionOptions.brotli();
|
||||
gzipOptions = StandardCompressionOptions.gzip();
|
||||
deflateOptions = StandardCompressionOptions.deflate();
|
||||
zstdOptions = StandardCompressionOptions.zstd();
|
||||
} else {
|
||||
ObjectUtil.deepCheckNotNull("compressionOptionsIterable", compressionOptions);
|
||||
for (CompressionOptions compressionOption : compressionOptions) {
|
||||
@ -169,6 +175,8 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
gzipOptions = (GzipOptions) compressionOption;
|
||||
} else if (compressionOption instanceof DeflateOptions) {
|
||||
deflateOptions = (DeflateOptions) compressionOption;
|
||||
} else if (compressionOption instanceof ZstdOptions) {
|
||||
zstdOptions = (ZstdOptions) compressionOption;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
|
||||
": " + compressionOption);
|
||||
@ -183,10 +191,14 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
if (deflateOptions == null) {
|
||||
deflateOptions = StandardCompressionOptions.deflate();
|
||||
}
|
||||
if (zstdOptions == null) {
|
||||
zstdOptions = StandardCompressionOptions.zstd();
|
||||
}
|
||||
}
|
||||
this.brotliOptions = brotliOptions;
|
||||
this.gzipOptions = gzipOptions;
|
||||
this.deflateOptions = deflateOptions;
|
||||
this.zstdOptions = zstdOptions;
|
||||
this.compressionLevel = -1;
|
||||
this.windowBits = -1;
|
||||
this.memLevel = -1;
|
||||
@ -239,6 +251,12 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
|
||||
ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters())));
|
||||
}
|
||||
if (targetContentEncoding.equals("zstd")) {
|
||||
return new Result(targetContentEncoding,
|
||||
new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
|
||||
ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
|
||||
zstdOptions.blockSize(), zstdOptions.maxEncodeSize())));
|
||||
}
|
||||
throw new Error();
|
||||
} else {
|
||||
ZlibWrapper wrapper = determineWrapper(acceptEncoding);
|
||||
@ -270,6 +288,7 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
protected String determineEncoding(String acceptEncoding) {
|
||||
float starQ = -1.0f;
|
||||
float brQ = -1.0f;
|
||||
float zstdQ = -1.0f;
|
||||
float gzipQ = -1.0f;
|
||||
float deflateQ = -1.0f;
|
||||
for (String encoding : acceptEncoding.split(",")) {
|
||||
@ -287,15 +306,19 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
starQ = q;
|
||||
} else if (encoding.contains("br") && q > brQ) {
|
||||
brQ = q;
|
||||
} else if (encoding.contains("zstd") && q > zstdQ) {
|
||||
zstdQ = q;
|
||||
} else if (encoding.contains("gzip") && q > gzipQ) {
|
||||
gzipQ = q;
|
||||
} else if (encoding.contains("deflate") && q > deflateQ) {
|
||||
deflateQ = q;
|
||||
}
|
||||
}
|
||||
if (brQ > 0.0f || gzipQ > 0.0f || deflateQ > 0.0f) {
|
||||
if (brQ != -1.0f && brQ >= gzipQ) {
|
||||
if (brQ > 0.0f || zstdQ > 0.0f || gzipQ > 0.0f || deflateQ > 0.0f) {
|
||||
if (brQ != -1.0f && brQ >= zstdQ) {
|
||||
return Brotli.isAvailable() ? "br" : null;
|
||||
} else if (zstdQ != -1.0f && zstdQ >= gzipQ) {
|
||||
return "zstd";
|
||||
} else if (gzipQ != -1.0f && gzipQ >= deflateQ) {
|
||||
return "gzip";
|
||||
} else if (deflateQ != -1.0f) {
|
||||
@ -306,6 +329,9 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
if (brQ == -1.0f) {
|
||||
return Brotli.isAvailable() ? "br" : null;
|
||||
}
|
||||
if (zstdQ == -1.0f) {
|
||||
return "zstd";
|
||||
}
|
||||
if (gzipQ == -1.0f) {
|
||||
return "gzip";
|
||||
}
|
||||
|
@ -23,12 +23,13 @@ import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class BrotliContentCompressorTest {
|
||||
class HttpContentCompressorOptionsTest {
|
||||
|
||||
@Test
|
||||
void testGetTargetContentEncoding() {
|
||||
void testGetBrTargetContentEncoding() {
|
||||
HttpContentCompressor compressor = new HttpContentCompressor();
|
||||
|
||||
String[] tests = {
|
||||
@ -49,6 +50,27 @@ class BrotliContentCompressorTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetZstdTargetContentEncoding() {
|
||||
HttpContentCompressor compressor = new HttpContentCompressor();
|
||||
|
||||
String[] tests = {
|
||||
// Accept-Encoding -> Content-Encoding
|
||||
"", null,
|
||||
"*;q=0.0", null,
|
||||
"zstd", "zstd",
|
||||
"compress, zstd;q=0.5", "zstd",
|
||||
"zstd; q=0.5, identity", "zstd",
|
||||
"zstd; q=0, deflate", "zstd",
|
||||
};
|
||||
for (int i = 0; i < tests.length; i += 2) {
|
||||
String acceptEncoding = tests[i];
|
||||
String contentEncoding = tests[i + 1];
|
||||
String targetEncoding = compressor.determineEncoding(acceptEncoding);
|
||||
assertEquals(contentEncoding, targetEncoding);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAcceptEncodingHttpRequest() {
|
||||
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor(null));
|
||||
@ -81,7 +103,7 @@ class BrotliContentCompressorTest {
|
||||
|
||||
private static FullHttpRequest newRequest() {
|
||||
FullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
|
||||
req.headers().set(HttpHeaderNames.ACCEPT_ENCODING, "br, gzip, deflate");
|
||||
req.headers().set(HttpHeaderNames.ACCEPT_ENCODING, "br, zstd, gzip, deflate");
|
||||
return req;
|
||||
}
|
||||
}
|
@ -29,6 +29,8 @@ import io.netty.handler.codec.compression.CompressionOptions;
|
||||
import io.netty.handler.codec.compression.DeflateOptions;
|
||||
import io.netty.handler.codec.compression.GzipOptions;
|
||||
import io.netty.handler.codec.compression.StandardCompressionOptions;
|
||||
import io.netty.handler.codec.compression.ZstdEncoder;
|
||||
import io.netty.handler.codec.compression.ZstdOptions;
|
||||
import io.netty.util.concurrent.PromiseCombiner;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
@ -41,6 +43,7 @@ import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
|
||||
import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
|
||||
|
||||
/**
|
||||
* A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
|
||||
@ -63,6 +66,7 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
private BrotliOptions brotliOptions;
|
||||
private GzipOptions gzipCompressionOptions;
|
||||
private DeflateOptions deflateOptions;
|
||||
private ZstdOptions zstdOptions;
|
||||
|
||||
/**
|
||||
* Create a new {@link CompressorHttp2ConnectionEncoder} instance
|
||||
@ -115,6 +119,8 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
gzipCompressionOptions = (GzipOptions) compressionOptions;
|
||||
} else if (compressionOptions instanceof DeflateOptions) {
|
||||
deflateOptions = (DeflateOptions) compressionOptions;
|
||||
} else if (compressionOptions instanceof ZstdOptions) {
|
||||
zstdOptions = (ZstdOptions) compressionOptions;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
|
||||
": " + compressionOptions);
|
||||
@ -256,6 +262,11 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
|
||||
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
|
||||
ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
|
||||
}
|
||||
if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
|
||||
return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
|
||||
ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
|
||||
zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
|
||||
}
|
||||
// 'identity' or unsupported
|
||||
return null;
|
||||
}
|
||||
|
@ -257,6 +257,48 @@ public class DataCompressionHttp2Test {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void zstdEncodingSingleEmptyMessage() throws Exception {
|
||||
final String text = "";
|
||||
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
|
||||
bootstrapEnv(data.readableBytes());
|
||||
try {
|
||||
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
|
||||
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.ZSTD);
|
||||
|
||||
runInChannel(clientChannel, () -> {
|
||||
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
|
||||
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
|
||||
clientHandler.flush(ctxClient());
|
||||
});
|
||||
awaitServer();
|
||||
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
|
||||
} finally {
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void zstdEncodingSingleMessage() throws Exception {
|
||||
final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
|
||||
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes(CharsetUtil.UTF_8.name()));
|
||||
bootstrapEnv(data.readableBytes());
|
||||
try {
|
||||
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
|
||||
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.ZSTD);
|
||||
|
||||
runInChannel(clientChannel, () -> {
|
||||
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
|
||||
clientEncoder.writeData(ctxClient(), 3, data.retain(), 0, true, newPromiseClient());
|
||||
clientHandler.flush(ctxClient());
|
||||
});
|
||||
awaitServer();
|
||||
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
|
||||
} finally {
|
||||
data.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deflateEncodingWriteLargeMessage() throws Exception {
|
||||
final int BUFFER_SIZE = 1 << 12;
|
||||
|
@ -41,6 +41,29 @@ public final class StandardCompressionOptions {
|
||||
return new BrotliOptions(parameters);
|
||||
}
|
||||
|
||||
/**
|
||||
* Default implementation of {@link ZstdOptions} with{compressionLevel(int)} set to
|
||||
* {@link ZstdConstants#DEFAULT_COMPRESSION_LEVEL},{@link ZstdConstants#DEFAULT_BLOCK_SIZE},
|
||||
* {@link ZstdConstants#MAX_BLOCK_SIZE}
|
||||
*/
|
||||
public static ZstdOptions zstd() {
|
||||
return ZstdOptions.DEFAULT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link ZstdOptions}
|
||||
*
|
||||
* @param blockSize
|
||||
* is used to calculate the compressionLevel
|
||||
* @param maxEncodeSize
|
||||
* specifies the size of the largest compressed object
|
||||
* @param compressionLevel
|
||||
* specifies the level of the compression
|
||||
*/
|
||||
public static ZstdOptions zstd(int compressionLevel, int blockSize, int maxEncodeSize) {
|
||||
return new ZstdOptions(compressionLevel, blockSize, maxEncodeSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see GzipOptions#DEFAULT
|
||||
*/
|
||||
|
@ -17,12 +17,20 @@ package io.netty.handler.codec.compression;
|
||||
|
||||
final class ZstdConstants {
|
||||
|
||||
static final int DEFAULT_COMPRESSION_LEVEL = 10;
|
||||
/**
|
||||
* Default compression level
|
||||
*/
|
||||
static final int DEFAULT_COMPRESSION_LEVEL = 3;
|
||||
|
||||
/**
|
||||
* Max compression level
|
||||
*/
|
||||
static final int MAX_COMPRESSION_LEVEL = 22;
|
||||
|
||||
/**
|
||||
* Max block size
|
||||
*/
|
||||
static final int MAX_BLOCK_SIZE = 1 << DEFAULT_COMPRESSION_LEVEL + 0x0F; // 32 M
|
||||
static final int MAX_BLOCK_SIZE = 1 << (DEFAULT_COMPRESSION_LEVEL + 7) + 0x0F; // 32 M
|
||||
/**
|
||||
* Default block size
|
||||
*/
|
||||
|
@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.DEFAULT_COMPRESSION_LEVEL;
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.DEFAULT_BLOCK_SIZE;
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.MAX_BLOCK_SIZE;
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.MAX_COMPRESSION_LEVEL;
|
||||
|
||||
/**
|
||||
* Compresses a {@link ByteBuf} using the Zstandard algorithm.
|
||||
@ -80,7 +81,7 @@ public final class ZstdEncoder extends MessageToByteEncoder<ByteBuf> {
|
||||
*/
|
||||
public ZstdEncoder(int compressionLevel, int blockSize, int maxEncodeSize) {
|
||||
super(true);
|
||||
this.compressionLevel = ObjectUtil.checkPositive(compressionLevel, "compressionLevel");
|
||||
this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, MAX_COMPRESSION_LEVEL, "compressionLevel");
|
||||
this.blockSize = ObjectUtil.checkPositive(blockSize, "blockSize");
|
||||
this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
|
||||
}
|
||||
|
@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.compression;
|
||||
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.DEFAULT_COMPRESSION_LEVEL;
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.MAX_COMPRESSION_LEVEL;
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.DEFAULT_BLOCK_SIZE;
|
||||
import static io.netty.handler.codec.compression.ZstdConstants.MAX_BLOCK_SIZE;
|
||||
|
||||
/**
|
||||
* {@link ZstdOptions} holds compressionLevel for
|
||||
* Zstd compression.
|
||||
*/
|
||||
public class ZstdOptions implements CompressionOptions {
|
||||
|
||||
private final int blockSize;
|
||||
private final int compressionLevel;
|
||||
private final int maxEncodeSize;
|
||||
|
||||
/**
|
||||
* Default implementation of {@link ZstdOptions} with{compressionLevel(int)} set to
|
||||
* {@link ZstdConstants#DEFAULT_COMPRESSION_LEVEL},{@link ZstdConstants#DEFAULT_BLOCK_SIZE},
|
||||
* {@link ZstdConstants#MAX_BLOCK_SIZE}
|
||||
*/
|
||||
static final ZstdOptions DEFAULT = new ZstdOptions(DEFAULT_COMPRESSION_LEVEL, DEFAULT_BLOCK_SIZE, MAX_BLOCK_SIZE);
|
||||
|
||||
/**
|
||||
* Create a new {@link ZstdOptions}
|
||||
*
|
||||
* @param blockSize
|
||||
* is used to calculate the compressionLevel
|
||||
* @param maxEncodeSize
|
||||
* specifies the size of the largest compressed object
|
||||
* @param compressionLevel
|
||||
* specifies the level of the compression
|
||||
*/
|
||||
ZstdOptions(int compressionLevel, int blockSize, int maxEncodeSize) {
|
||||
this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, MAX_COMPRESSION_LEVEL, "compressionLevel");
|
||||
this.blockSize = ObjectUtil.checkPositive(blockSize, "blockSize");
|
||||
this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
|
||||
}
|
||||
|
||||
public int compressionLevel() {
|
||||
return compressionLevel;
|
||||
}
|
||||
|
||||
public int blockSize() {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public int maxEncodeSize() {
|
||||
return maxEncodeSize;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user