[#1294] Make sure ByteBuf is released once written to channel
This commit is contained in:
parent
9a5f45a0c1
commit
9c4bfa44d9
@ -221,7 +221,8 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<HttpObj
|
||||
}
|
||||
|
||||
private void decode(ByteBuf in, ByteBuf out) {
|
||||
decoder.writeInbound(in);
|
||||
// call retain as it will be release after is written
|
||||
decoder.writeInbound(in.retain());
|
||||
fetchDecoderOutput(out);
|
||||
}
|
||||
|
||||
|
@ -247,7 +247,8 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpReque
|
||||
}
|
||||
|
||||
private void encode(ByteBuf in, ByteBuf out) {
|
||||
encoder.writeOutbound(in);
|
||||
// call retain here as it will call release after its written to the channel
|
||||
encoder.writeOutbound(in.retain());
|
||||
fetchEncoderOutput(out);
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ public class JZlibTest {
|
||||
EmbeddedByteChannel chEncoder =
|
||||
new EmbeddedByteChannel(new JZlibEncoder(ZlibWrapper.ZLIB));
|
||||
|
||||
chEncoder.writeOutbound(data);
|
||||
chEncoder.writeOutbound(data.copy());
|
||||
assertTrue(chEncoder.finish());
|
||||
|
||||
byte[] deflatedData = chEncoder.readOutbound().array();
|
||||
@ -61,7 +61,7 @@ public class JZlibTest {
|
||||
EmbeddedByteChannel chEncoder =
|
||||
new EmbeddedByteChannel(new JZlibEncoder(ZlibWrapper.NONE));
|
||||
|
||||
chEncoder.writeOutbound(data);
|
||||
chEncoder.writeOutbound(data.copy());
|
||||
assertTrue(chEncoder.finish());
|
||||
|
||||
byte[] deflatedData = chEncoder.readOutbound().array();
|
||||
@ -90,7 +90,7 @@ public class JZlibTest {
|
||||
EmbeddedByteChannel chEncoder =
|
||||
new EmbeddedByteChannel(new JZlibEncoder(ZlibWrapper.GZIP));
|
||||
|
||||
chEncoder.writeOutbound(data);
|
||||
chEncoder.writeOutbound(data.copy());
|
||||
assertTrue(chEncoder.finish());
|
||||
|
||||
byte[] deflatedData = chEncoder.readOutbound().array();
|
||||
|
@ -73,9 +73,9 @@ public class SnappyFramedEncoderTest {
|
||||
'n', 'e', 't', 't', 'y'
|
||||
});
|
||||
|
||||
channel.writeOutbound(in);
|
||||
channel.writeOutbound(in.copy());
|
||||
in.readerIndex(0); // rewind the buffer to write the same data
|
||||
channel.writeOutbound(in);
|
||||
channel.writeOutbound(in.copy());
|
||||
assertTrue(channel.finish());
|
||||
|
||||
ByteBuf expected = Unpooled.wrappedBuffer(new byte[] {
|
||||
@ -84,6 +84,7 @@ public class SnappyFramedEncoderTest {
|
||||
0x01, 0x09, 0x00, 0x00, 0x2d, -0x5a, -0x7e, -0x5e, 'n', 'e', 't', 't', 'y',
|
||||
});
|
||||
assertEquals(expected, channel.readOutbound());
|
||||
in.release();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.testsuite.transport.socket;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class SocketBufReleaseTest extends AbstractSocketTest {
|
||||
|
||||
@Test
|
||||
public void testBufRelease() throws Throwable {
|
||||
run();
|
||||
}
|
||||
|
||||
public void testBufRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||
BufWriterHandler serverHandler = new BufWriterHandler();
|
||||
BufWriterHandler clientHandler = new BufWriterHandler();
|
||||
|
||||
sb.childHandler(serverHandler);
|
||||
cb.handler(clientHandler);
|
||||
|
||||
Channel sc = sb.bind().sync().channel();
|
||||
Channel cc = cb.connect().sync().channel();
|
||||
|
||||
sc.close().sync();
|
||||
cc.close().sync();
|
||||
serverHandler.check();
|
||||
clientHandler.check();
|
||||
}
|
||||
|
||||
private static class BufWriterHandler extends ChannelInboundMessageHandlerAdapter<Object> {
|
||||
|
||||
private final Random random = new Random();
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
private ByteBuf buf;
|
||||
@Override
|
||||
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
|
||||
byte[] data = new byte[1024];
|
||||
random.nextBytes(data);
|
||||
|
||||
buf = ctx.alloc().buffer();
|
||||
buf.writeBytes(data);
|
||||
|
||||
ctx.channel().write(buf).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
// Discard
|
||||
}
|
||||
|
||||
public void check() throws InterruptedException {
|
||||
latch.await();
|
||||
assertEquals(0, buf.refCnt());
|
||||
}
|
||||
}
|
||||
}
|
@ -149,7 +149,7 @@ public class SocketFixedLengthEchoTest extends AbstractSocketTest {
|
||||
}
|
||||
|
||||
if (channel.parent() != null) {
|
||||
channel.write(msg);
|
||||
channel.write(msg.retain());
|
||||
}
|
||||
|
||||
counter += actual.length;
|
||||
|
@ -1413,7 +1413,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
outboundMessageBuffer().add(message);
|
||||
} else {
|
||||
ByteBuf buf = (ByteBuf) message;
|
||||
outboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
|
||||
try {
|
||||
outboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
|
||||
} finally {
|
||||
buf.release();
|
||||
}
|
||||
}
|
||||
invokeFlush0(promise);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user