Ensure the same ByteBufAllocator is used in the EmbeddedChannel when compress / decompress. Related to [#5294]
Motivation: The user may specify to use a different allocator then the default. In this case we need to ensure it is shared when creating the EmbeddedChannel inside of a ChannelHandler Modifications: Use the config of the "original" Channel in the EmbeddedChannel and so share the same allocator etc. Result: Same type of buffers are used.
This commit is contained in:
parent
ec271cd174
commit
5b1ee83639
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package io.netty.handler.codec.http;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.handler.codec.compression.ZlibCodecFactory;
|
||||
import io.netty.handler.codec.compression.ZlibWrapper;
|
||||
@ -32,6 +33,7 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
private final int compressionLevel;
|
||||
private final int windowBits;
|
||||
private final int memLevel;
|
||||
private ChannelHandlerContext ctx;
|
||||
|
||||
/**
|
||||
* Creates a new handler with the default compression level (<tt>6</tt>),
|
||||
@ -92,6 +94,11 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
this.memLevel = memLevel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception {
|
||||
String contentEncoding = headers.headers().get(HttpHeaders.Names.CONTENT_ENCODING);
|
||||
@ -119,7 +126,8 @@ public class HttpContentCompressor extends HttpContentEncoder {
|
||||
|
||||
return new Result(
|
||||
targetContentEncoding,
|
||||
new EmbeddedChannel(ZlibCodecFactory.newZlibEncoder(
|
||||
new EmbeddedChannel(ctx.channel().metadata().hasDisconnect(),
|
||||
ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(
|
||||
wrapper, compressionLevel, windowBits, memLevel)));
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,7 @@ import java.util.List;
|
||||
*/
|
||||
public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObject> {
|
||||
|
||||
protected ChannelHandlerContext ctx;
|
||||
private EmbeddedChannel decoder;
|
||||
private boolean continueResponse;
|
||||
|
||||
@ -197,6 +198,12 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
super.handlerAdded(ctx);
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
if (decoder != null) {
|
||||
// Clean-up the previous decoder if not cleaned up correctly.
|
||||
|
@ -48,7 +48,8 @@ public class HttpContentDecompressor extends HttpContentDecoder {
|
||||
@Override
|
||||
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
|
||||
if ("gzip".equalsIgnoreCase(contentEncoding) || "x-gzip".equalsIgnoreCase(contentEncoding)) {
|
||||
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
|
||||
return new EmbeddedChannel(ctx.channel().metadata().hasDisconnect(),
|
||||
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
|
||||
}
|
||||
if ("deflate".equalsIgnoreCase(contentEncoding) || "x-deflate".equalsIgnoreCase(contentEncoding)) {
|
||||
ZlibWrapper wrapper;
|
||||
@ -58,7 +59,8 @@ public class HttpContentDecompressor extends HttpContentDecoder {
|
||||
wrapper = ZlibWrapper.ZLIB_OR_NONE;
|
||||
}
|
||||
// To be strict, 'deflate' means ZLIB, but some servers were not implemented correctly.
|
||||
return new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(wrapper));
|
||||
return new EmbeddedChannel(ctx.channel().metadata().hasDisconnect(),
|
||||
ctx.channel().config(), ZlibCodecFactory.newZlibDecoder(wrapper));
|
||||
}
|
||||
|
||||
// 'identity' or unsupported
|
||||
|
@ -88,10 +88,34 @@ public class EmbeddedChannel extends AbstractChannel {
|
||||
*/
|
||||
public EmbeddedChannel(boolean hasDisconnect, final ChannelHandler... handlers) {
|
||||
super(null);
|
||||
ObjectUtil.checkNotNull(handlers, "handlers");
|
||||
metadata = hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
|
||||
metadata = metadata(hasDisconnect);
|
||||
config = new DefaultChannelConfig(this);
|
||||
setup(handlers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance with the channel ID set to the given ID and the pipeline
|
||||
* initialized with the specified handlers.
|
||||
*
|
||||
* @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
|
||||
* to {@link #close()}, {@link false} otherwise.
|
||||
* @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
|
||||
* @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
|
||||
*/
|
||||
public EmbeddedChannel(boolean hasDisconnect, final ChannelConfig config,
|
||||
final ChannelHandler... handlers) {
|
||||
super(null);
|
||||
metadata = metadata(hasDisconnect);
|
||||
this.config = ObjectUtil.checkNotNull(config, "config");
|
||||
setup(handlers);
|
||||
}
|
||||
|
||||
private static ChannelMetadata metadata(boolean hasDisconnect) {
|
||||
return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
|
||||
}
|
||||
|
||||
private void setup(final ChannelHandler... handlers) {
|
||||
ObjectUtil.checkNotNull(handlers, "handlers");
|
||||
ChannelPipeline p = pipeline();
|
||||
p.addLast(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user