diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 709030fd36..199bab2121 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -1882,7 +1882,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH return composite; } return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation : - composeIntoComposite(alloc, cumulation, next); + copyAndCompose(alloc, cumulation, next); } @Override diff --git a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java index c15e9d5e39..552793ee92 100644 --- a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java @@ -17,16 +17,16 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayDeque; +import static io.netty.util.ReferenceCountUtil.safeRelease; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; +import static io.netty.util.internal.PlatformDependent.throwException; @UnstableApi public abstract class AbstractCoalescingBufferQueue { @@ -177,10 +177,10 @@ public abstract class AbstractCoalescingBufferQueue { entryBuffer = null; } } catch (Throwable cause) { - ReferenceCountUtil.safeRelease(entryBuffer); - ReferenceCountUtil.safeRelease(toReturn); + safeRelease(entryBuffer); + safeRelease(toReturn); aggregatePromise.setFailure(cause); - PlatformDependent.throwException(cause); + throwException(cause); } decrementReadableBytes(originalBytes - bytes); return toReturn; @@ -276,11 +276,33 @@ public abstract class AbstractCoalescingBufferQueue { composite.addComponent(true, next); } catch (Throwable cause) { composite.release(); - PlatformDependent.throwException(cause); + safeRelease(next); + throwException(cause); } return composite; } + /** + * Compose {@code cumulation} and {@code next} into a new {@link ByteBufAllocator#ioBuffer()}. + * @param alloc The allocator to use to allocate the new buffer. + * @param cumulation The current cumulation. + * @param next The next buffer. + * @return The result of {@code cumulation + next}. + */ + protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) { + ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes()); + try { + newCumulation.writeBytes(cumulation).writeBytes(next); + } catch (Throwable cause) { + newCumulation.release(); + safeRelease(next); + throwException(cause); + } + cumulation.release(); + next.release(); + return newCumulation; + } + /** * Calculate the first {@link ByteBuf} which will be used in subsequent calls to * {@link #compose(ByteBufAllocator, ByteBuf, ByteBuf)}. @@ -313,7 +335,7 @@ public abstract class AbstractCoalescingBufferQueue { } try { if (entry instanceof ByteBuf) { - ReferenceCountUtil.safeRelease(entry); + safeRelease(entry); } else { ((ChannelFutureListener) entry).operationComplete(future); }