From 4cd7e625556f86eb25a4a4cfae20ed0d34aaf5ed Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 18 Jul 2013 23:26:45 +0900 Subject: [PATCH] Make ChannelOutboundBuffer recycled --- .../io/netty/channel/AbstractChannel.java | 5 +- .../netty/channel/ChannelOutboundBuffer.java | 47 ++++++++++++++++--- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index c1d854b87a..84933a71d5 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -51,7 +51,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final long hashCode = ThreadLocalRandom.current().nextLong(); private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; - private final ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(this); private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null); private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true); private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); @@ -62,6 +61,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private volatile EventLoop eventLoop; private volatile boolean registered; + private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(this); private boolean inFlush0; /** Cache for the string representation of this channel */ @@ -517,8 +517,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } // fail all queued messages + ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer; outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.failUnflushed(CLOSED_CHANNEL_EXCEPTION); + outboundBuffer.recycle(); + AbstractChannel.this.outboundBuffer = null; if (wasActive && !isActive()) { invokeLater(new Runnable() { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 4c6004554c..10f8d478d2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -20,6 +20,8 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -34,7 +36,21 @@ public final class ChannelOutboundBuffer { private static final int MIN_INITIAL_CAPACITY = 8; - private final AbstractChannel channel; + private static final Recycler RECYCLER = new Recycler() { + @Override + protected ChannelOutboundBuffer newObject(Handle handle) { + return new ChannelOutboundBuffer(handle); + } + }; + + static ChannelOutboundBuffer newInstance(AbstractChannel channel) { + ChannelOutboundBuffer buffer = RECYCLER.get(); + buffer.channel = channel; + return buffer; + } + + private final Handle handle; + private AbstractChannel channel; // Flushed messages are stored in a circulas queue. private Object[] flushed; @@ -63,12 +79,11 @@ public final class ChannelOutboundBuffer { @SuppressWarnings({ "unused", "FieldMayBeFinal" }) private volatile int writable = 1; - ChannelOutboundBuffer(AbstractChannel channel) { - this(channel, MIN_INITIAL_CAPACITY << 1); + private ChannelOutboundBuffer(Handle handle) { + this(handle, MIN_INITIAL_CAPACITY << 1); } - @SuppressWarnings("unchecked") - ChannelOutboundBuffer(AbstractChannel channel, int initialCapacity) { + private ChannelOutboundBuffer(Handle handle, int initialCapacity) { if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity + " (expected: >= 0)"); } @@ -89,7 +104,7 @@ public final class ChannelOutboundBuffer { initialCapacity = MIN_INITIAL_CAPACITY; } - this.channel = channel; + this.handle = handle; flushed = new Object[initialCapacity]; flushedPromises = new ChannelPromise[initialCapacity]; @@ -103,6 +118,20 @@ public final class ChannelOutboundBuffer { unflushedTotals = new long[initialCapacity]; } + void recycle() { + if (head != tail) { + throw new IllegalStateException(); + } + if (unflushedCount != 0) { + throw new IllegalStateException(); + } + if (pendingOutboundBytes != 0) { + throw new IllegalStateException(); + } + + RECYCLER.recycle(this, handle); + } + void addMessage(Object msg, ChannelPromise promise) { Object[] unflushed = this.unflushed; int unflushedCount = this.unflushedCount; @@ -276,6 +305,7 @@ public final class ChannelOutboundBuffer { flushedPromises[head] = null; decrementPendingOutboundBytes(flushedTotals[head]); + flushedTotals[head] = 0; this.head = head + 1 & flushed.length - 1; return true; @@ -296,6 +326,7 @@ public final class ChannelOutboundBuffer { flushedPromises[head] = null; decrementPendingOutboundBytes(flushedTotals[head]); + flushedTotals[head] = 0; this.head = head + 1 & flushed.length - 1; return true; @@ -412,10 +443,14 @@ public final class ChannelOutboundBuffer { try { for (int i = 0; i < unflushedCount; i++) { safeRelease(unflushed[i]); + unflushed[i] = null; safeFail(unflushedPromises[i], cause); + unflushedPromises[i] = null; decrementPendingOutboundBytes(unflushedTotals[i]); + unflushedTotals[i] = 0; } } finally { + this.unflushedCount = 0; inFail = false; } }