diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 6cd5f5063c..6bb2eb7d9d 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -57,8 +57,9 @@ public class EmbeddedChannel extends AbstractChannel { private final EmbeddedEventLoop loop = new EmbeddedEventLoop(); private final ChannelConfig config = new DefaultChannelConfig(this); - private final Queue inboundMessages = new ArrayDeque(); - private final Queue outboundMessages = new ArrayDeque(); + + private Queue inboundMessages; + private Queue outboundMessages; private Throwable lastException; private State state; @@ -136,6 +137,9 @@ public class EmbeddedChannel extends AbstractChannel { * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}. */ public Queue inboundMessages() { + if (inboundMessages == null) { + inboundMessages = new ArrayDeque(); + } return inboundMessages; } @@ -151,6 +155,9 @@ public class EmbeddedChannel extends AbstractChannel { * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}. */ public Queue outboundMessages() { + if (outboundMessages == null) { + outboundMessages = new ArrayDeque(); + } return outboundMessages; } @@ -167,7 +174,7 @@ public class EmbeddedChannel extends AbstractChannel { */ @SuppressWarnings("unchecked") public T readInbound() { - return (T) inboundMessages.poll(); + return (T) poll(inboundMessages); } /** @@ -175,7 +182,7 @@ public class EmbeddedChannel extends AbstractChannel { */ @SuppressWarnings("unchecked") public T readOutbound() { - return (T) outboundMessages.poll(); + return (T) poll(outboundMessages); } /** @@ -188,7 +195,7 @@ public class EmbeddedChannel extends AbstractChannel { public boolean writeInbound(Object... msgs) { ensureOpen(); if (msgs.length == 0) { - return !inboundMessages.isEmpty(); + return isNotEmpty(inboundMessages); } ChannelPipeline p = pipeline(); @@ -198,7 +205,7 @@ public class EmbeddedChannel extends AbstractChannel { p.fireChannelReadComplete(); runPendingTasks(); checkException(); - return !inboundMessages.isEmpty(); + return isNotEmpty(inboundMessages); } /** @@ -210,7 +217,7 @@ public class EmbeddedChannel extends AbstractChannel { public boolean writeOutbound(Object... msgs) { ensureOpen(); if (msgs.length == 0) { - return !outboundMessages.isEmpty(); + return isNotEmpty(outboundMessages); } RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length); @@ -235,7 +242,7 @@ public class EmbeddedChannel extends AbstractChannel { runPendingTasks(); checkException(); - return !outboundMessages.isEmpty(); + return isNotEmpty(outboundMessages); } finally { futures.recycle(); } @@ -255,7 +262,15 @@ public class EmbeddedChannel extends AbstractChannel { checkException(); - return !inboundMessages.isEmpty() || !outboundMessages.isEmpty(); + return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages); + } + + private static boolean isNotEmpty(Queue queue) { + return queue != null && !queue.isEmpty(); + } + + private static Object poll(Queue queue) { + return queue != null ? queue.poll() : null; } /** @@ -378,7 +393,7 @@ public class EmbeddedChannel extends AbstractChannel { } ReferenceCountUtil.retain(msg); - outboundMessages.add(msg); + outboundMessages().add(msg); in.remove(); } } @@ -393,7 +408,7 @@ public class EmbeddedChannel extends AbstractChannel { private final class LastInboundHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundMessages.add(msg); + inboundMessages().add(msg); } @Override