Fix test failures and reported leaks

This commit is contained in:
Trustin Lee 2013-06-13 15:18:11 +09:00
parent 01d9f10af6
commit 427d9c4bf2
6 changed files with 44 additions and 23 deletions

View File

@ -44,13 +44,24 @@ import io.netty.util.internal.TypeParameterMatcher;
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean preferDirect;
protected MessageToByteEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this(true);
}
protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
this(outboundMessageType, true);
}
protected MessageToByteEncoder(boolean preferDirect) {
matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}
protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
matcher = TypeParameterMatcher.get(outboundMessageType);
this.preferDirect = preferDirect;
}
public boolean acceptOutboundMessage(Object msg) throws Exception {
@ -61,8 +72,8 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdap
public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception {
MessageList<Object> out = MessageList.newInstance();
boolean success = false;
ByteBuf buf = null;
try {
ByteBuf buf = null;
int size = msgs.size();
for (int i = 0; i < size; i ++) {
// handler was removed in the loop so now copy over all remaining messages
@ -78,7 +89,11 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdap
@SuppressWarnings("unchecked")
I cast = (I) m;
if (buf == null) {
buf = ctx.alloc().buffer();
if (preferDirect) {
buf = ctx.alloc().ioBuffer();
} else {
buf = ctx.alloc().heapBuffer();
}
}
try {
encode(ctx, cast, buf);
@ -98,8 +113,7 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdap
if (buf != null) {
if (buf.isReadable()) {
out.add(buf);
} else {
buf.release();
buf = null;
}
}
@ -110,6 +124,9 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdap
throw new EncoderException(e);
} finally {
msgs.recycle();
if (buf != null) {
buf.release();
}
if (success) {
ctx.write(out, promise);
} else {

View File

@ -263,8 +263,7 @@ public class JZlibEncoder extends ZlibEncoder {
}
@Override
protected void encode(ChannelHandlerContext ctx,
ByteBuf in, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
if (finished.get()) {
return;
}
@ -288,7 +287,6 @@ public class JZlibEncoder extends ZlibEncoder {
// Configure output.
int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12;
ByteBuf compressed = ctx.alloc().heapBuffer(maxOutputLength);
z.avail_out = maxOutputLength;
z.next_out = out.array();
z.next_out_index = out.arrayOffset() + out.writerIndex();

View File

@ -25,6 +25,10 @@ import io.netty.handler.codec.MessageToByteEncoder;
*/
public abstract class ZlibEncoder extends MessageToByteEncoder<ByteBuf> {
protected ZlibEncoder() {
super(false);
}
/**
* Returns {@code true} if and only if the end of the compressed stream
* has been reached.

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.compression;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import org.junit.Test;
@ -128,7 +127,7 @@ public class SnappyIntegrationTest {
decoder.writeInbound(compressed.retain());
assertFalse(compressed.isReadable());
compressed.release();
CompositeByteBuf decompressed = Unpooled.compositeBuffer();
CompositeByteBuf decompressed = compositeBuffer();
for (;;) {
Object o = decoder.readInbound();
if (o == null) {
@ -138,6 +137,7 @@ public class SnappyIntegrationTest {
decompressed.writerIndex(decompressed.writerIndex() + ((ByteBuf) o).readableBytes());
}
assertEquals(in, decompressed);
decompressed.release();
} finally {
// Avoids memory leak through AbstractChannel.allChannels
encoder.close();

View File

@ -56,7 +56,9 @@ public abstract class AbstractCompatibleMarshallingEncoderTest {
unmarshaller.finish();
unmarshaller.close();
buffer.release();
}
protected ByteBuf truncate(ByteBuf buf) {
return buf;
}

View File

@ -508,7 +508,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyHandlerException(t, promise);
notifyOutboundHandlerException(t, promise);
}
}
@ -547,7 +547,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyHandlerException(t, promise);
notifyOutboundHandlerException(t, promise);
}
}
@ -584,7 +584,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
((ChannelOutboundHandler) handler()).disconnect(this, promise);
} catch (Throwable t) {
notifyHandlerException(t, promise);
notifyOutboundHandlerException(t, promise);
}
}
@ -614,7 +614,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
((ChannelOutboundHandler) handler()).close(this, promise);
} catch (Throwable t) {
notifyHandlerException(t, promise);
notifyOutboundHandlerException(t, promise);
}
}
@ -644,7 +644,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
((ChannelOutboundHandler) handler()).deregister(this, promise);
} catch (Throwable t) {
notifyHandlerException(t, promise);
notifyOutboundHandlerException(t, promise);
}
}
@ -715,22 +715,22 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
try {
handler.write(this, msgs.cast(), promise);
} catch (Throwable t) {
notifyHandlerException(t, promise);
notifyOutboundHandlerException(t, promise);
}
}
private void notifyHandlerException(Throwable cause, ChannelPromise promise) {
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
// only try to fail the promise if its not a VoidChannelPromise, as
// the VoidChannelPromise would also fire the cause through the pipeline
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to fail the promise", cause);
}
if (promise instanceof VoidChannelPromise) {
return;
}
notifyHandlerException(cause);
if (!promise.tryFailure(cause)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to fail the promise because it's done already: {}", promise, cause);
}
}
}
private void notifyHandlerException(Throwable cause) {