diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index af6e21565e..f23f094168 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -24,12 +24,14 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.ListIterator; +import java.util.Queue; /** @@ -48,6 +50,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit private Component lastAccessed; private int lastAccessedId; private boolean freed; + private Queue suspendedDeallocations; public DefaultCompositeByteBuf(ByteBufAllocator alloc, int maxNumComponents) { super(Integer.MAX_VALUE); @@ -1259,7 +1262,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit return result + ", components=" + components.size() + ')'; } - private static final class Component { + private final class Component { final UnsafeByteBuf buf; final int length; final boolean allocatedBySelf; @@ -1282,11 +1285,15 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit for (buf = this.buf; buf.unwrap() != null; buf = (UnsafeByteBuf) buf.unwrap()) { continue; } - buf.free(); // We should not get a NPE here. If so, it must be a bug. + + if (suspendedDeallocations == null) { + buf.free(); // We should not get a NPE here. If so, it must be a bug. + } else { + suspendedDeallocations.add(buf); + } } } - @Override public CompositeByteBuf readerIndex(int readerIndex) { return (CompositeByteBuf) super.readerIndex(readerIndex); @@ -1536,6 +1543,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit discardReadComponents(); } + @Override public boolean isFreed() { return freed; } @@ -1547,11 +1555,33 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit } freed = true; + resumeIntermediaryDeallocations(); for (Component c: components) { c.freeIfNecessary(); } } + @Override + public void suspendIntermediaryDeallocations() { + if (suspendedDeallocations == null) { + suspendedDeallocations = new ArrayDeque(2); + } + } + + @Override + public void resumeIntermediaryDeallocations() { + if (suspendedDeallocations == null) { + return; + } + + Queue suspendedDeallocations = this.suspendedDeallocations; + this.suspendedDeallocations = null; + + for (UnsafeByteBuf buf: suspendedDeallocations) { + buf.free(); + } + } + @Override public ByteBuf unwrap() { return null; diff --git a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java index 6e35812299..b94b6f047b 100644 --- a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java @@ -258,7 +258,11 @@ public class DuplicatedByteBuf extends AbstractByteBuf { } @Override - public void free() { + public void free() { } - } -} + @Override + public void suspendIntermediaryDeallocations() { } + + @Override + public void resumeIntermediaryDeallocations() { } +} \ No newline at end of file diff --git a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java index 3e1b255400..0cc1584a14 100644 --- a/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBuf.java @@ -259,4 +259,10 @@ public class ReadOnlyByteBuf extends AbstractByteBuf { @Override public void free() { } + + @Override + public void suspendIntermediaryDeallocations() { } + + @Override + public void resumeIntermediaryDeallocations() { } } diff --git a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java index 8c447f4193..4bce3a7ff0 100644 --- a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java @@ -331,4 +331,10 @@ public class SlicedByteBuf extends AbstractByteBuf { @Override public void free() { } + + @Override + public void suspendIntermediaryDeallocations() { } + + @Override + public void resumeIntermediaryDeallocations() { } } diff --git a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java index 244eeae125..c0d92e2881 100644 --- a/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SwappedByteBuf.java @@ -816,4 +816,14 @@ public class SwappedByteBuf implements UnsafeByteBuf { @Override public void free() { } + + @Override + public void suspendIntermediaryDeallocations() { + buf.suspendIntermediaryDeallocations(); + } + + @Override + public void resumeIntermediaryDeallocations() { + buf.resumeIntermediaryDeallocations(); + } } diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java index 1e8e5090cc..5c4822ef71 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledDirectByteBuf.java @@ -26,6 +26,8 @@ import java.nio.ByteOrder; import java.nio.channels.ClosedChannelException; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; +import java.util.ArrayDeque; +import java.util.Queue; /** * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)} @@ -72,6 +74,7 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf { private int capacity; private boolean freed; private boolean doNotFree; + private Queue suspendedDeallocations; /** * Creates a new direct buffer. @@ -137,7 +140,11 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf { if (doNotFree) { doNotFree = false; } else { - freeDirect(oldBuffer); + if (suspendedDeallocations == null) { + freeDirect(oldBuffer); + } else { + suspendedDeallocations.add(oldBuffer); + } } } @@ -514,9 +521,31 @@ public class UnpooledDirectByteBuf extends AbstractByteBuf { return; } + resumeIntermediaryDeallocations(); freeDirect(buffer); } + @Override + public void suspendIntermediaryDeallocations() { + if (suspendedDeallocations == null) { + suspendedDeallocations = new ArrayDeque(2); + } + } + + @Override + public void resumeIntermediaryDeallocations() { + if (suspendedDeallocations == null) { + return; + } + + Queue suspendedDeallocations = this.suspendedDeallocations; + this.suspendedDeallocations = null; + + for (ByteBuffer buf: suspendedDeallocations) { + freeDirect(buf); + } + } + @Override public ByteBuf unwrap() { return null; diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java index 9a28705fde..51a6d5a359 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java @@ -385,6 +385,12 @@ public class UnpooledHeapByteBuf extends AbstractByteBuf { freed = true; } + @Override + public void suspendIntermediaryDeallocations() { } + + @Override + public void resumeIntermediaryDeallocations() { } + @Override public ByteBuf unwrap() { return null; diff --git a/buffer/src/main/java/io/netty/buffer/UnsafeByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnsafeByteBuf.java index 4c81abb7bb..fc28c455bd 100644 --- a/buffer/src/main/java/io/netty/buffer/UnsafeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/UnsafeByteBuf.java @@ -53,6 +53,19 @@ public interface UnsafeByteBuf extends ByteBuf { */ void free(); + /** + * Suspends the intermediary deallocation of the internal memory block of this buffer until asked via + * {@link #resumeIntermediaryDeallocations()}. An intermediary deallocation is usually made when the capacity of + * a buffer changes. + */ + void suspendIntermediaryDeallocations(); + + /** + * Resumes the intermediary deallocation of the internal memory block of this buffer, suspended by + * {@link #suspendIntermediaryDeallocations()}. + */ + void resumeIntermediaryDeallocations(); + /** * Return the underlying buffer instance if this buffer is a wrapper. * diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java index 478ee38b05..7737a381a6 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoderBuffer.java @@ -875,6 +875,16 @@ class ReplayingDecoderBuffer implements UnsafeByteBuf { throw new UnreplayableOperationException(); } + @Override + public void suspendIntermediaryDeallocations() { + throw new UnreplayableOperationException(); + } + + @Override + public void resumeIntermediaryDeallocations() { + throw new UnreplayableOperationException(); + } + @Override public ByteBuf unwrap() { throw new UnreplayableOperationException(); diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 449da1c60c..efc177af3c 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -271,6 +271,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne if (asyncWriteInProgress) { // JDK decided to write data (or notify handler) later. + ((UnsafeByteBuf) buf).suspendIntermediaryDeallocations(); break; } @@ -346,7 +347,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void completed0(T result, AioSocketChannel channel) { channel.asyncWriteInProgress = false; + ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); + ((UnsafeByteBuf) buf).resumeIntermediaryDeallocations(); + int writtenBytes = result.intValue(); if (writtenBytes > 0) { // Update the readerIndex with the amount of read bytes