diff --git a/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java b/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java index 56496fadf4..e36b936a5a 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java @@ -15,6 +15,8 @@ */ package io.netty.buffer; +import io.netty.util.ReferenceCounted; + import java.io.DataInput; import java.io.DataInputStream; import java.io.EOFException; @@ -38,15 +40,23 @@ import java.io.InputStream; * @see ByteBufOutputStream */ public class ByteBufInputStream extends InputStream implements DataInput { - private final ByteBuf buffer; private final int startIndex; private final int endIndex; + private boolean closed; + /** + * To preserve backwards compatibility (which didn't transfer ownership) we support a conditional flag which + * indicates if {@link #buffer} should be released when this {@link InputStream} is closed. + * However in future releases ownership should always be transferred and callers of this class should call + * {@link ReferenceCounted#retain()} if necessary. + */ + private boolean releaseOnClose; /** * Creates a new stream which reads data from the specified {@code buffer} * starting at the current {@code readerIndex} and ending at the current * {@code writerIndex}. + * @param buffer The buffer which provides the content for this {@link InputStream}. */ public ByteBufInputStream(ByteBuf buffer) { this(buffer, buffer.readableBytes()); @@ -56,23 +66,59 @@ public class ByteBufInputStream extends InputStream implements DataInput { * Creates a new stream which reads data from the specified {@code buffer} * starting at the current {@code readerIndex} and ending at * {@code readerIndex + length}. - * + * @param buffer The buffer which provides the content for this {@link InputStream}. + * @param length The length of the buffer to use for this {@link InputStream}. * @throws IndexOutOfBoundsException * if {@code readerIndex + length} is greater than * {@code writerIndex} */ public ByteBufInputStream(ByteBuf buffer, int length) { + this(buffer, length, false); + } + + /** + * Creates a new stream which reads data from the specified {@code buffer} + * starting at the current {@code readerIndex} and ending at the current + * {@code writerIndex}. + * @param buffer The buffer which provides the content for this {@link InputStream}. + * @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will + * be called on {@code buffer}. + */ + public ByteBufInputStream(ByteBuf buffer, boolean releaseOnClose) { + this(buffer, buffer.readableBytes(), releaseOnClose); + } + + /** + * Creates a new stream which reads data from the specified {@code buffer} + * starting at the current {@code readerIndex} and ending at + * {@code readerIndex + length}. + * @param buffer The buffer which provides the content for this {@Link InputStream}. + * @param length The length of the buffer to use for this {@link InputStream}. + * @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will + * be called on {@code buffer}. + * @throws IndexOutOfBoundsException + * if {@code readerIndex + length} is greater than + * {@code writerIndex} + */ + public ByteBufInputStream(ByteBuf buffer, int length, boolean releaseOnClose) { if (buffer == null) { throw new NullPointerException("buffer"); } if (length < 0) { + if (releaseOnClose) { + buffer.release(); + } throw new IllegalArgumentException("length: " + length); } if (length > buffer.readableBytes()) { + if (releaseOnClose) { + buffer.release(); + } throw new IndexOutOfBoundsException("Too many bytes to be read - Needs " + length + ", maximum is " + buffer.readableBytes()); } + this.releaseOnClose = releaseOnClose; this.buffer = buffer; startIndex = buffer.readerIndex(); endIndex = startIndex + length; @@ -86,6 +132,19 @@ public class ByteBufInputStream extends InputStream implements DataInput { return buffer.readerIndex() - startIndex; } + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + // The Closable interface says "If the stream is already closed then invoking this method has no effect." + if (releaseOnClose && !closed) { + closed = true; + buffer.release(); + } + } + } + @Override public int available() throws IOException { return endIndex - buffer.readerIndex(); diff --git a/buffer/src/test/java/io/netty/buffer/ByteBufStreamTest.java b/buffer/src/test/java/io/netty/buffer/ByteBufStreamTest.java index 7f2d180061..d8c096afd9 100644 --- a/buffer/src/test/java/io/netty/buffer/ByteBufStreamTest.java +++ b/buffer/src/test/java/io/netty/buffer/ByteBufStreamTest.java @@ -15,6 +15,8 @@ */ package io.netty.buffer; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import org.junit.Test; import java.io.EOFException; @@ -30,7 +32,7 @@ public class ByteBufStreamTest { @Test public void testAll() throws Exception { - ByteBuf buf = Unpooled.buffer(0, 65536); + ByteBuf buf = ReferenceCountUtil.releaseLater(Unpooled.buffer(0, 65536)); try { new ByteBufOutputStream(null); @@ -40,133 +42,140 @@ public class ByteBufStreamTest { } ByteBufOutputStream out = new ByteBufOutputStream(buf); - assertSame(buf, out.buffer()); - out.writeBoolean(true); - out.writeBoolean(false); - out.writeByte(42); - out.writeByte(224); - out.writeBytes("Hello, World!"); - out.writeChars("Hello, World"); - out.writeChar('!'); - out.writeDouble(42.0); - out.writeFloat(42.0f); - out.writeInt(42); - out.writeLong(42); - out.writeShort(42); - out.writeShort(49152); - out.writeUTF("Hello, World!"); - out.writeBytes("The first line\r\r\n"); - out.write(EMPTY_BYTES); - out.write(new byte[] { 1, 2, 3, 4 }); - out.write(new byte[] { 1, 3, 3, 4 }, 0, 0); - out.close(); + try { + assertSame(buf, out.buffer()); + out.writeBoolean(true); + out.writeBoolean(false); + out.writeByte(42); + out.writeByte(224); + out.writeBytes("Hello, World!"); + out.writeChars("Hello, World"); + out.writeChar('!'); + out.writeDouble(42.0); + out.writeFloat(42.0f); + out.writeInt(42); + out.writeLong(42); + out.writeShort(42); + out.writeShort(49152); + out.writeUTF("Hello, World!"); + out.writeBytes("The first line\r\r\n"); + out.write(EMPTY_BYTES); + out.write(new byte[]{1, 2, 3, 4}); + out.write(new byte[]{1, 3, 3, 4}, 0, 0); + } finally { + out.close(); + } try { - new ByteBufInputStream(null); + new ByteBufInputStream(null, true); fail(); } catch (NullPointerException e) { // Expected } try { - new ByteBufInputStream(null, 0); + new ByteBufInputStream(null, 0, true); fail(); } catch (NullPointerException e) { // Expected } try { - new ByteBufInputStream(buf, -1); + new ByteBufInputStream(buf.retainedSlice(), -1, true); } catch (IllegalArgumentException e) { // Expected } try { - new ByteBufInputStream(buf, buf.capacity() + 1); + new ByteBufInputStream(buf.retainedSlice(), buf.capacity() + 1, true); } catch (IndexOutOfBoundsException e) { // Expected } - ByteBufInputStream in = new ByteBufInputStream(buf); - - assertTrue(in.markSupported()); - in.mark(Integer.MAX_VALUE); - - assertEquals(buf.writerIndex(), in.skip(Long.MAX_VALUE)); - assertFalse(buf.isReadable()); - - in.reset(); - assertEquals(0, buf.readerIndex()); - - assertEquals(4, in.skip(4)); - assertEquals(4, buf.readerIndex()); - in.reset(); - - assertTrue(in.readBoolean()); - assertFalse(in.readBoolean()); - assertEquals(42, in.readByte()); - assertEquals(224, in.readUnsignedByte()); - - byte[] tmp = new byte[13]; - in.readFully(tmp); - assertEquals("Hello, World!", new String(tmp, "ISO-8859-1")); - - assertEquals('H', in.readChar()); - assertEquals('e', in.readChar()); - assertEquals('l', in.readChar()); - assertEquals('l', in.readChar()); - assertEquals('o', in.readChar()); - assertEquals(',', in.readChar()); - assertEquals(' ', in.readChar()); - assertEquals('W', in.readChar()); - assertEquals('o', in.readChar()); - assertEquals('r', in.readChar()); - assertEquals('l', in.readChar()); - assertEquals('d', in.readChar()); - assertEquals('!', in.readChar()); - - assertEquals(42.0, in.readDouble(), 0.0); - assertEquals(42.0f, in.readFloat(), 0.0); - assertEquals(42, in.readInt()); - assertEquals(42, in.readLong()); - assertEquals(42, in.readShort()); - assertEquals(49152, in.readUnsignedShort()); - - assertEquals("Hello, World!", in.readUTF()); - assertEquals("The first line", in.readLine()); - assertEquals("", in.readLine()); - - assertEquals(4, in.read(tmp)); - assertEquals(1, tmp[0]); - assertEquals(2, tmp[1]); - assertEquals(3, tmp[2]); - assertEquals(4, tmp[3]); - - assertEquals(-1, in.read()); - assertEquals(-1, in.read(tmp)); - + ByteBufInputStream in = new ByteBufInputStream(buf, true); try { - in.readByte(); - fail(); - } catch (EOFException e) { - // Expected - } + assertTrue(in.markSupported()); + in.mark(Integer.MAX_VALUE); - try { - in.readFully(tmp, 0, -1); - fail(); - } catch (IndexOutOfBoundsException e) { - // Expected - } + assertEquals(buf.writerIndex(), in.skip(Long.MAX_VALUE)); + assertFalse(buf.isReadable()); - try { + in.reset(); + assertEquals(0, buf.readerIndex()); + + assertEquals(4, in.skip(4)); + assertEquals(4, buf.readerIndex()); + in.reset(); + + assertTrue(in.readBoolean()); + assertFalse(in.readBoolean()); + assertEquals(42, in.readByte()); + assertEquals(224, in.readUnsignedByte()); + + byte[] tmp = new byte[13]; in.readFully(tmp); - fail(); - } catch (EOFException e) { - // Expected - } + assertEquals("Hello, World!", new String(tmp, "ISO-8859-1")); - in.close(); + assertEquals('H', in.readChar()); + assertEquals('e', in.readChar()); + assertEquals('l', in.readChar()); + assertEquals('l', in.readChar()); + assertEquals('o', in.readChar()); + assertEquals(',', in.readChar()); + assertEquals(' ', in.readChar()); + assertEquals('W', in.readChar()); + assertEquals('o', in.readChar()); + assertEquals('r', in.readChar()); + assertEquals('l', in.readChar()); + assertEquals('d', in.readChar()); + assertEquals('!', in.readChar()); + + assertEquals(42.0, in.readDouble(), 0.0); + assertEquals(42.0f, in.readFloat(), 0.0); + assertEquals(42, in.readInt()); + assertEquals(42, in.readLong()); + assertEquals(42, in.readShort()); + assertEquals(49152, in.readUnsignedShort()); + + assertEquals("Hello, World!", in.readUTF()); + assertEquals("The first line", in.readLine()); + assertEquals("", in.readLine()); + + assertEquals(4, in.read(tmp)); + assertEquals(1, tmp[0]); + assertEquals(2, tmp[1]); + assertEquals(3, tmp[2]); + assertEquals(4, tmp[3]); + + assertEquals(-1, in.read()); + assertEquals(-1, in.read(tmp)); + + try { + in.readByte(); + fail(); + } catch (EOFException e) { + // Expected + } + + try { + in.readFully(tmp, 0, -1); + fail(); + } catch (IndexOutOfBoundsException e) { + // Expected + } + + try { + in.readFully(tmp); + fail(); + } catch (EOFException e) { + // Expected + } + } finally { + // Ownership was transferred to the ByteBufOutputStream, before we close we must retain the underlying + // buffer. + buf.retain(); + in.close(); + } assertEquals(buf.readerIndex(), in.readBytes()); } @@ -175,7 +184,7 @@ public class ByteBufStreamTest { public void testReadLine() throws Exception { Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf = Unpooled.buffer(); - ByteBufInputStream in = new ByteBufInputStream(buf); + ByteBufInputStream in = new ByteBufInputStream(buf, true); String s = in.readLine(); assertNull(s); diff --git a/buffer/src/test/java/io/netty/buffer/FixedCompositeByteBufTest.java b/buffer/src/test/java/io/netty/buffer/FixedCompositeByteBufTest.java index 2dafc91a14..e01a6b7cfc 100644 --- a/buffer/src/test/java/io/netty/buffer/FixedCompositeByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/FixedCompositeByteBufTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ReadOnlyBufferException; import java.nio.channels.ScatteringByteChannel; @@ -288,14 +289,21 @@ public class FixedCompositeByteBufTest { ByteBuf composite = unmodifiableBuffer(buf1, buf2, buf3); ByteBuf copy = directBuffer(3); ByteBuf copy2 = buffer(3); - composite.getBytes(0, new ByteBufOutputStream(copy), 3); - composite.getBytes(0, new ByteBufOutputStream(copy2), 3); - assertEquals(0, ByteBufUtil.compare(copy, composite)); - assertEquals(0, ByteBufUtil.compare(copy2, composite)); - assertEquals(0, ByteBufUtil.compare(copy, copy2)); - copy.release(); - copy2.release(); - composite.release(); + OutputStream copyStream = new ByteBufOutputStream(copy); + OutputStream copy2Stream = new ByteBufOutputStream(copy2); + try { + composite.getBytes(0, copyStream, 3); + composite.getBytes(0, copy2Stream, 3); + assertEquals(0, ByteBufUtil.compare(copy, composite)); + assertEquals(0, ByteBufUtil.compare(copy2, composite)); + assertEquals(0, ByteBufUtil.compare(copy, copy2)); + } finally { + copy.release(); + copy2.release(); + copyStream.close(); + copy2Stream.close(); + composite.release(); + } } @Test diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzmaFrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzmaFrameEncoder.java index 62cafc4e10..acc2bfceeb 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/LzmaFrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzmaFrameEncoder.java @@ -175,16 +175,23 @@ public class LzmaFrameEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { final int length = in.readableBytes(); - final InputStream bbIn = new ByteBufInputStream(in); - - final ByteBufOutputStream bbOut = new ByteBufOutputStream(out); - bbOut.writeByte(properties); - bbOut.writeInt(littleEndianDictionarySize); - bbOut.writeLong(Long.reverseBytes(length)); - encoder.code(bbIn, bbOut, -1, -1, null); - - bbIn.close(); - bbOut.close(); + InputStream bbIn = null; + ByteBufOutputStream bbOut = null; + try { + bbIn = new ByteBufInputStream(in); + bbOut = new ByteBufOutputStream(out); + bbOut.writeByte(properties); + bbOut.writeInt(littleEndianDictionarySize); + bbOut.writeLong(Long.reverseBytes(length)); + encoder.code(bbIn, bbOut, -1, -1, null); + } finally { + if (bbIn != null) { + bbIn.close(); + } + if (bbOut != null) { + bbOut.close(); + } + } } @Override diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java index f4fddc5cfa..04e4a915eb 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectDecoder.java @@ -71,14 +71,11 @@ public class ObjectDecoder extends LengthFieldBasedFrameDecoder { return null; } - ObjectInputStream is = new CompactObjectInputStream(new ByteBufInputStream(frame), classResolver); - Object result = is.readObject(); - is.close(); - return result; - } - - @Override - protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { - return buffer.slice(index, length); + ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver); + try { + return ois.readObject(); + } finally { + ois.close(); + } } } diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java index c3bbf2a347..910466762b 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoder.java @@ -42,11 +42,19 @@ public class ObjectEncoder extends MessageToByteEncoder { int startIdx = out.writerIndex(); ByteBufOutputStream bout = new ByteBufOutputStream(out); - bout.write(LENGTH_PLACEHOLDER); - ObjectOutputStream oout = new CompactObjectOutputStream(bout); - oout.writeObject(msg); - oout.flush(); - oout.close(); + ObjectOutputStream oout = null; + try { + bout.write(LENGTH_PLACEHOLDER); + oout = new CompactObjectOutputStream(bout); + oout.writeObject(msg); + oout.flush(); + } finally { + if (oout != null) { + oout.close(); + } else { + bout.close(); + } + } int endIdx = out.writerIndex(); diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoderOutputStream.java b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoderOutputStream.java index 4329592898..769db41f4d 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoderOutputStream.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/ObjectEncoderOutputStream.java @@ -80,16 +80,22 @@ public class ObjectEncoderOutputStream extends OutputStream implements @Override public void writeObject(Object obj) throws IOException { - ByteBufOutputStream bout = new ByteBufOutputStream(Unpooled.buffer(estimatedLength)); - ObjectOutputStream oout = new CompactObjectOutputStream(bout); - oout.writeObject(obj); - oout.flush(); - oout.close(); + ByteBuf buf = Unpooled.buffer(estimatedLength); + try { + ObjectOutputStream oout = new CompactObjectOutputStream(new ByteBufOutputStream(buf)); + try { + oout.writeObject(obj); + oout.flush(); + } finally { + oout.close(); + } - ByteBuf buffer = bout.buffer(); - int objectSize = buffer.readableBytes(); - writeInt(objectSize); - buffer.getBytes(0, this, objectSize); + int objectSize = buf.readableBytes(); + writeInt(objectSize); + buf.getBytes(0, this, objectSize); + } finally { + buf.release(); + } } @Override diff --git a/codec/src/test/java/io/netty/handler/codec/compression/AbstractEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/AbstractEncoderTest.java index 632c9dd5c6..71c210f5de 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/AbstractEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/AbstractEncoderTest.java @@ -120,8 +120,6 @@ public abstract class AbstractEncoderTest extends AbstractCompressionTest { while ((msg = channel.readOutbound()) != null) { compressed.addComponent(true, msg); } - ByteBuf decompressed = decompress(compressed, dataLength); - compressed.release(); - return decompressed; + return decompress(compressed, dataLength); } } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java index 65c87c58dc..2ccb14bc27 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Bzip2EncoderTest.java @@ -35,21 +35,28 @@ public class Bzip2EncoderTest extends AbstractEncoderTest { @Override protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { - InputStream is = new ByteBufInputStream(compressed); - BZip2CompressorInputStream bzip2Is = new BZip2CompressorInputStream(is); - + InputStream is = new ByteBufInputStream(compressed, true); + BZip2CompressorInputStream bzip2Is = null; byte[] decompressed = new byte[originalLength]; - int remaining = originalLength; - while (remaining > 0) { - int read = bzip2Is.read(decompressed, originalLength - remaining, remaining); - if (read > 0) { - remaining -= read; + try { + bzip2Is = new BZip2CompressorInputStream(is); + int remaining = originalLength; + while (remaining > 0) { + int read = bzip2Is.read(decompressed, originalLength - remaining, remaining); + if (read > 0) { + remaining -= read; + } else { + break; + } + } + assertEquals(-1, bzip2Is.read()); + } finally { + if (bzip2Is != null) { + bzip2Is.close(); } else { - break; + is.close(); } } - assertEquals(-1, bzip2Is.read()); - bzip2Is.close(); return Unpooled.wrappedBuffer(decompressed); } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java index 9be2d8eb40..fc07f2e608 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/Lz4FrameEncoderTest.java @@ -34,21 +34,28 @@ public class Lz4FrameEncoderTest extends AbstractEncoderTest { @Override protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { - InputStream is = new ByteBufInputStream(compressed); - LZ4BlockInputStream lz4Is = new LZ4BlockInputStream(is); - + InputStream is = new ByteBufInputStream(compressed, true); + LZ4BlockInputStream lz4Is = null; byte[] decompressed = new byte[originalLength]; - int remaining = originalLength; - while (remaining > 0) { - int read = lz4Is.read(decompressed, originalLength - remaining, remaining); - if (read > 0) { - remaining -= read; + try { + lz4Is = new LZ4BlockInputStream(is); + int remaining = originalLength; + while (remaining > 0) { + int read = lz4Is.read(decompressed, originalLength - remaining, remaining); + if (read > 0) { + remaining -= read; + } else { + break; + } + } + assertEquals(-1, lz4Is.read()); + } finally { + if (lz4Is != null) { + lz4Is.close(); } else { - break; + is.close(); } } - assertEquals(-1, lz4Is.read()); - lz4Is.close(); return Unpooled.wrappedBuffer(decompressed); } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java index be14e04e96..0b598dbc0b 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java @@ -31,6 +31,7 @@ public class LzfEncoderTest extends AbstractEncoderTest { protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { byte[] compressedArray = new byte[compressed.readableBytes()]; compressed.readBytes(compressedArray); + compressed.release(); byte[] decompressed = LZFDecoder.decode(compressedArray); return Unpooled.wrappedBuffer(decompressed); diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java index d0fbd069b5..62c07a8a88 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzmaFrameEncoderTest.java @@ -80,21 +80,28 @@ public class LzmaFrameEncoderTest extends AbstractEncoderTest { @Override protected ByteBuf decompress(ByteBuf compressed, int originalLength) throws Exception { - InputStream is = new ByteBufInputStream(compressed); - LzmaInputStream lzmaIs = new LzmaInputStream(is, new Decoder()); - + InputStream is = new ByteBufInputStream(compressed, true); + LzmaInputStream lzmaIs = null; byte[] decompressed = new byte[originalLength]; - int remaining = originalLength; - while (remaining > 0) { - int read = lzmaIs.read(decompressed, originalLength - remaining, remaining); - if (read > 0) { - remaining -= read; + try { + lzmaIs = new LzmaInputStream(is, new Decoder()); + int remaining = originalLength; + while (remaining > 0) { + int read = lzmaIs.read(decompressed, originalLength - remaining, remaining); + if (read > 0) { + remaining -= read; + } else { + break; + } + } + assertEquals(-1, lzmaIs.read()); + } finally { + if (lzmaIs != null) { + lzmaIs.close(); } else { - break; + is.close(); } } - assertEquals(-1, lzmaIs.read()); - lzmaIs.close(); return Unpooled.wrappedBuffer(decompressed); } diff --git a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java b/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java index 9afe53859e..b9d6dd81f0 100644 --- a/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java +++ b/codec/src/test/java/io/netty/handler/codec/compression/ZlibTest.java @@ -300,16 +300,19 @@ public abstract class ZlibTest { } ByteBuf decoded = Unpooled.buffer(); - GZIPInputStream stream = new GZIPInputStream(new ByteBufInputStream(encoded)); - byte[] buf = new byte[8192]; - for (;;) { - int readBytes = stream.read(buf); - if (readBytes < 0) { - break; + GZIPInputStream stream = new GZIPInputStream(new ByteBufInputStream(encoded, true)); + try { + byte[] buf = new byte[8192]; + for (;;) { + int readBytes = stream.read(buf); + if (readBytes < 0) { + break; + } + decoded.writeBytes(buf, 0, readBytes); } - decoded.writeBytes(buf, 0, readBytes); + } finally { + stream.close(); } - stream.close(); if (data != null) { assertEquals(Unpooled.wrappedBuffer(data), decoded); @@ -317,7 +320,6 @@ public abstract class ZlibTest { assertFalse(decoded.isReadable()); } - encoded.release(); decoded.release(); } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslContext.java b/handler/src/main/java/io/netty/handler/ssl/SslContext.java index 9372ba87b8..4dbf8042f2 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslContext.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslContext.java @@ -1010,13 +1010,24 @@ public abstract class SslContext { CertificateFactory cf = CertificateFactory.getInstance("X.509"); X509Certificate[] x509Certs = new X509Certificate[certs.length]; + int i = 0; try { - for (int i = 0; i < certs.length; i++) { - x509Certs[i] = (X509Certificate) cf.generateCertificate(new ByteBufInputStream(certs[i])); + for (; i < certs.length; i++) { + InputStream is = new ByteBufInputStream(certs[i], true); + try { + x509Certs[i] = (X509Certificate) cf.generateCertificate(is); + } finally { + try { + is.close(); + } catch (IOException e) { + // This is not expected to happen, but re-throw in case it does. + throw new RuntimeException(e); + } + } } } finally { - for (ByteBuf buf: certs) { - buf.release(); + for (; i < certs.length; i++) { + certs[i].release(); } } return x509Certs; diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java index 699d3a1fed..26c085b596 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelIdTest.java @@ -25,8 +25,11 @@ import org.junit.Test; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; @SuppressWarnings("DynamicRegexReplaceableByCompiledPattern") public class DefaultChannelIdTest { @@ -63,11 +66,19 @@ public class DefaultChannelIdTest { ByteBuf buf = Unpooled.buffer(); ObjectOutputStream out = new ObjectOutputStream(new ByteBufOutputStream(buf)); - out.writeObject(a); - out.flush(); + try { + out.writeObject(a); + out.flush(); + } finally { + out.close(); + } - ObjectInputStream in = new ObjectInputStream(new ByteBufInputStream(buf)); - b = (ChannelId) in.readObject(); + ObjectInputStream in = new ObjectInputStream(new ByteBufInputStream(buf, true)); + try { + b = (ChannelId) in.readObject(); + } finally { + in.close(); + } assertThat(a, is(b)); assertThat(a, is(not(sameInstance(b)))); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelIdTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelIdTest.java index 3755507c95..26ca1b9ef4 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelIdTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelIdTest.java @@ -15,17 +15,17 @@ */ package io.netty.channel.embedded; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.junit.Assert; -import org.junit.Test; - +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelId; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; public class EmbeddedChannelIdTest { @@ -34,14 +34,21 @@ public class EmbeddedChannelIdTest { // test that a deserialized instance works the same as a normal instance (issue #2869) ChannelId normalInstance = EmbeddedChannelId.INSTANCE; - ByteBufOutputStream buffer = new ByteBufOutputStream(Unpooled.buffer()); - ObjectOutputStream outStream = new ObjectOutputStream(buffer); - outStream.writeObject(normalInstance); - outStream.close(); + ByteBuf buf = Unpooled.buffer(); + ObjectOutputStream outStream = new ObjectOutputStream(new ByteBufOutputStream(buf)); + try { + outStream.writeObject(normalInstance); + } finally { + outStream.close(); + } - ObjectInputStream inStream = new ObjectInputStream(new ByteBufInputStream(buffer.buffer())); - ChannelId deserializedInstance = (ChannelId) inStream.readObject(); - inStream.close(); + ObjectInputStream inStream = new ObjectInputStream(new ByteBufInputStream(buf, true)); + final ChannelId deserializedInstance; + try { + deserializedInstance = (ChannelId) inStream.readObject(); + } finally { + inStream.close(); + } Assert.assertEquals(normalInstance, deserializedInstance); Assert.assertEquals(normalInstance.hashCode(), deserializedInstance.hashCode());