diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 39ee855656..15f0e2f285 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -53,8 +53,8 @@ public class EmbeddedChannel extends AbstractChannel { private final ChannelConfig config = new DefaultChannelConfig(this); private final SocketAddress localAddress = new EmbeddedSocketAddress(); private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); - private final Queue inboundMessages = new ArrayDeque(); - private final Queue outboundMessages = new ArrayDeque(); + private Queue inboundMessages; + private Queue outboundMessages; private Throwable lastException; private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED @@ -125,6 +125,9 @@ public class EmbeddedChannel extends AbstractChannel { * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}. */ public Queue inboundMessages() { + if (inboundMessages == null) { + inboundMessages = new ArrayDeque(); + } return inboundMessages; } @@ -140,6 +143,9 @@ public class EmbeddedChannel extends AbstractChannel { * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}. */ public Queue outboundMessages() { + if (outboundMessages == null) { + outboundMessages = new ArrayDeque(); + } return outboundMessages; } @@ -155,14 +161,14 @@ public class EmbeddedChannel extends AbstractChannel { * Return received data from this {@link Channel} */ public Object readInbound() { - return inboundMessages.poll(); + return poll(inboundMessages); } /** * Read data froum the outbound. This may return {@code null} if nothing is readable. */ public Object readOutbound() { - return outboundMessages.poll(); + return poll(outboundMessages); } /** @@ -175,7 +181,7 @@ public class EmbeddedChannel extends AbstractChannel { public boolean writeInbound(Object... msgs) { ensureOpen(); if (msgs.length == 0) { - return !inboundMessages.isEmpty(); + return isNotEmpty(inboundMessages); } ChannelPipeline p = pipeline(); @@ -185,7 +191,7 @@ public class EmbeddedChannel extends AbstractChannel { p.fireChannelReadComplete(); runPendingTasks(); checkException(); - return !inboundMessages.isEmpty(); + return isNotEmpty(inboundMessages); } /** @@ -197,7 +203,7 @@ public class EmbeddedChannel extends AbstractChannel { public boolean writeOutbound(Object... msgs) { ensureOpen(); if (msgs.length == 0) { - return !outboundMessages.isEmpty(); + return isNotEmpty(outboundMessages); } RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length); @@ -222,7 +228,7 @@ public class EmbeddedChannel extends AbstractChannel { runPendingTasks(); checkException(); - return !outboundMessages.isEmpty(); + return isNotEmpty(outboundMessages); } finally { futures.recycle(); } @@ -242,7 +248,15 @@ public class EmbeddedChannel extends AbstractChannel { checkException(); - return !inboundMessages.isEmpty() || !outboundMessages.isEmpty(); + return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages); + } + + private static boolean isNotEmpty(Queue queue) { + return queue != null && !queue.isEmpty(); + } + + private static Object poll(Queue queue) { + return queue != null ? queue.poll() : null; } /** @@ -365,7 +379,7 @@ public class EmbeddedChannel extends AbstractChannel { } ReferenceCountUtil.retain(msg); - outboundMessages.add(msg); + outboundMessages().add(msg); in.remove(); } } @@ -380,7 +394,7 @@ public class EmbeddedChannel extends AbstractChannel { private final class LastInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundMessages.add(msg); + inboundMessages().add(msg); } @Override