diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java index e5bb79e7a2..9c19681e66 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java @@ -44,13 +44,24 @@ import io.netty.util.internal.TypeParameterMatcher; public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdapter { private final TypeParameterMatcher matcher; + private final boolean preferDirect; protected MessageToByteEncoder() { - matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I"); + this(true); } protected MessageToByteEncoder(Class outboundMessageType) { + this(outboundMessageType, true); + } + + protected MessageToByteEncoder(boolean preferDirect) { + matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I"); + this.preferDirect = preferDirect; + } + + protected MessageToByteEncoder(Class outboundMessageType, boolean preferDirect) { matcher = TypeParameterMatcher.get(outboundMessageType); + this.preferDirect = preferDirect; } public boolean acceptOutboundMessage(Object msg) throws Exception { @@ -61,8 +72,8 @@ public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdap public void write(ChannelHandlerContext ctx, MessageList msgs, ChannelPromise promise) throws Exception { MessageList out = MessageList.newInstance(); boolean success = false; + ByteBuf buf = null; try { - ByteBuf buf = null; int size = msgs.size(); for (int i = 0; i < size; i ++) { // handler was removed in the loop so now copy over all remaining messages @@ -78,7 +89,11 @@ public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdap @SuppressWarnings("unchecked") I cast = (I) m; if (buf == null) { - buf = ctx.alloc().buffer(); + if (preferDirect) { + buf = ctx.alloc().ioBuffer(); + } else { + buf = ctx.alloc().heapBuffer(); + } } try { encode(ctx, cast, buf); @@ -98,8 +113,7 @@ public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdap if (buf != null) { if (buf.isReadable()) { out.add(buf); - } else { - buf.release(); + buf = null; } } @@ -110,6 +124,9 @@ public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdap throw new EncoderException(e); } finally { msgs.recycle(); + if (buf != null) { + buf.release(); + } if (success) { ctx.write(out, promise); } else { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index 2a94ab2ef8..758feb0e81 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -263,8 +263,7 @@ public class JZlibEncoder extends ZlibEncoder { } @Override - protected void encode(ChannelHandlerContext ctx, - ByteBuf in, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { if (finished.get()) { return; } @@ -288,7 +287,6 @@ public class JZlibEncoder extends ZlibEncoder { // Configure output. int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12; - ByteBuf compressed = ctx.alloc().heapBuffer(maxOutputLength); z.avail_out = maxOutputLength; z.next_out = out.array(); z.next_out_index = out.arrayOffset() + out.writerIndex(); diff --git a/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java index c45d949658..932f7836d0 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/ZlibEncoder.java @@ -25,6 +25,10 @@ import io.netty.handler.codec.MessageToByteEncoder; */ public abstract class ZlibEncoder extends MessageToByteEncoder { + protected ZlibEncoder() { + super(false); + } + /** * Returns {@code true} if and only if the end of the compressed stream * has been reached. diff --git a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java index a073ec9945..6b54c1d69a 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/SnappyIntegrationTest.java @@ -17,7 +17,6 @@ package io.netty.handler.codec.compression; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; import org.junit.Test; @@ -128,7 +127,7 @@ public class SnappyIntegrationTest { decoder.writeInbound(compressed.retain()); assertFalse(compressed.isReadable()); compressed.release(); - CompositeByteBuf decompressed = Unpooled.compositeBuffer(); + CompositeByteBuf decompressed = compositeBuffer(); for (;;) { Object o = decoder.readInbound(); if (o == null) { @@ -138,6 +137,7 @@ public class SnappyIntegrationTest { decompressed.writerIndex(decompressed.writerIndex() + ((ByteBuf) o).readableBytes()); } assertEquals(in, decompressed); + decompressed.release(); } finally { // Avoids memory leak through AbstractChannel.allChannels encoder.close(); diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java index 2a03e827cb..f6dfa4f219 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java @@ -56,7 +56,9 @@ public abstract class AbstractCompatibleMarshallingEncoderTest { unmarshaller.finish(); unmarshaller.close(); + buffer.release(); } + protected ByteBuf truncate(ByteBuf buf) { return buf; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index c94d1a1074..b16b5bdcc8 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -508,7 +508,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { - notifyHandlerException(t, promise); + notifyOutboundHandlerException(t, promise); } } @@ -547,7 +547,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { - notifyHandlerException(t, promise); + notifyOutboundHandlerException(t, promise); } } @@ -584,7 +584,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements try { ((ChannelOutboundHandler) handler()).disconnect(this, promise); } catch (Throwable t) { - notifyHandlerException(t, promise); + notifyOutboundHandlerException(t, promise); } } @@ -614,7 +614,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements try { ((ChannelOutboundHandler) handler()).close(this, promise); } catch (Throwable t) { - notifyHandlerException(t, promise); + notifyOutboundHandlerException(t, promise); } } @@ -644,7 +644,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements try { ((ChannelOutboundHandler) handler()).deregister(this, promise); } catch (Throwable t) { - notifyHandlerException(t, promise); + notifyOutboundHandlerException(t, promise); } } @@ -715,22 +715,22 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements try { handler.write(this, msgs.cast(), promise); } catch (Throwable t) { - notifyHandlerException(t, promise); + notifyOutboundHandlerException(t, promise); } } - private void notifyHandlerException(Throwable cause, ChannelPromise promise) { + private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { // only try to fail the promise if its not a VoidChannelPromise, as // the VoidChannelPromise would also fire the cause through the pipeline - if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to fail the promise", cause); - } + if (promise instanceof VoidChannelPromise) { return; } - notifyHandlerException(cause); + if (!promise.tryFailure(cause)) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to fail the promise because it's done already: {}", promise, cause); + } + } } private void notifyHandlerException(Throwable cause) {