diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 5cc2b99cf3..dc05768767 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -28,14 +28,14 @@ import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.InternalThreadLocalMap; +import io.netty.util.internal.OneTimeTask; +import io.netty.util.internal.PlatformDependent; import java.net.SocketAddress; import java.nio.channels.AlreadyConnectedException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; -import java.util.ArrayDeque; -import java.util.Collections; import java.util.Queue; /** @@ -50,7 +50,8 @@ public class LocalChannel extends AbstractChannel { private static final int MAX_READER_STACK_DEPTH = 8; private final ChannelConfig config = new DefaultChannelConfig(this); - private final Queue inboundBuffer = new ArrayDeque(); + // To futher optimize this we could write our own SPSC queue. + private final Queue inboundBuffer = PlatformDependent.newMpscQueue(); private final Runnable readTask = new Runnable() { @Override public void run() { @@ -65,7 +66,6 @@ public class LocalChannel extends AbstractChannel { pipeline.fireChannelReadComplete(); } }; - private final Runnable shutdownHook = new Runnable() { @Override public void run() { @@ -288,29 +288,25 @@ public class LocalChannel extends AbstractChannel { final ChannelPipeline peerPipeline = peer.pipeline(); final EventLoop peerLoop = peer.eventLoop(); - if (peerLoop == eventLoop()) { - for (;;) { - Object msg = in.current(); - if (msg == null) { - break; - } - peer.inboundBuffer.add(msg); - ReferenceCountUtil.retain(msg); - in.remove(); + for (;;) { + Object msg = in.current(); + if (msg == null) { + break; } + try { + peer.inboundBuffer.add(ReferenceCountUtil.retain(msg)); + in.remove(); + } catch (Throwable cause) { + in.remove(cause); + } + } + + if (peerLoop == eventLoop()) { finishPeerRead(peer, peerPipeline); } else { - // Use a copy because the original msgs will be recycled by AbstractChannel. - final Object[] msgsCopy = new Object[in.size()]; - for (int i = 0; i < msgsCopy.length; i ++) { - msgsCopy[i] = ReferenceCountUtil.retain(in.current()); - in.remove(); - } - - peerLoop.execute(new Runnable() { + peerLoop.execute(new OneTimeTask() { @Override public void run() { - Collections.addAll(peer.inboundBuffer, msgsCopy); finishPeerRead(peer, peerPipeline); } });