Reduce memory usage by EmbeddedChannel

Motivation:

When using an EmbeddedChannel often it either does inbound or outbound processing which means we only often need one queue.

Modifications:

Lazy init the inbound and outbound message queues.

Result:

Less memory usage.
This commit is contained in:
Norman Maurer 2015-07-07 10:27:49 +02:00
parent 513f4054df
commit d295321bb4

View File

@ -53,8 +53,8 @@ public class EmbeddedChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig(this); private final ChannelConfig config = new DefaultChannelConfig(this);
private final SocketAddress localAddress = new EmbeddedSocketAddress(); private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> inboundMessages = new ArrayDeque<Object>(); private Queue<Object> inboundMessages;
private final Queue<Object> outboundMessages = new ArrayDeque<Object>(); private Queue<Object> outboundMessages;
private Throwable lastException; private Throwable lastException;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
@ -125,6 +125,9 @@ public class EmbeddedChannel extends AbstractChannel {
* Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}. * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
*/ */
public Queue<Object> inboundMessages() { public Queue<Object> inboundMessages() {
if (inboundMessages == null) {
inboundMessages = new ArrayDeque<Object>();
}
return inboundMessages; return inboundMessages;
} }
@ -140,6 +143,9 @@ public class EmbeddedChannel extends AbstractChannel {
* Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}. * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
*/ */
public Queue<Object> outboundMessages() { public Queue<Object> outboundMessages() {
if (outboundMessages == null) {
outboundMessages = new ArrayDeque<Object>();
}
return outboundMessages; return outboundMessages;
} }
@ -155,14 +161,14 @@ public class EmbeddedChannel extends AbstractChannel {
* Return received data from this {@link Channel} * Return received data from this {@link Channel}
*/ */
public Object readInbound() { public Object readInbound() {
return inboundMessages.poll(); return poll(inboundMessages);
} }
/** /**
* Read data froum the outbound. This may return {@code null} if nothing is readable. * Read data froum the outbound. This may return {@code null} if nothing is readable.
*/ */
public Object readOutbound() { public Object readOutbound() {
return outboundMessages.poll(); return poll(outboundMessages);
} }
/** /**
@ -175,7 +181,7 @@ public class EmbeddedChannel extends AbstractChannel {
public boolean writeInbound(Object... msgs) { public boolean writeInbound(Object... msgs) {
ensureOpen(); ensureOpen();
if (msgs.length == 0) { if (msgs.length == 0) {
return !inboundMessages.isEmpty(); return isNotEmpty(inboundMessages);
} }
ChannelPipeline p = pipeline(); ChannelPipeline p = pipeline();
@ -185,7 +191,7 @@ public class EmbeddedChannel extends AbstractChannel {
p.fireChannelReadComplete(); p.fireChannelReadComplete();
runPendingTasks(); runPendingTasks();
checkException(); checkException();
return !inboundMessages.isEmpty(); return isNotEmpty(inboundMessages);
} }
/** /**
@ -197,7 +203,7 @@ public class EmbeddedChannel extends AbstractChannel {
public boolean writeOutbound(Object... msgs) { public boolean writeOutbound(Object... msgs) {
ensureOpen(); ensureOpen();
if (msgs.length == 0) { if (msgs.length == 0) {
return !outboundMessages.isEmpty(); return isNotEmpty(outboundMessages);
} }
RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length); RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
@ -222,7 +228,7 @@ public class EmbeddedChannel extends AbstractChannel {
runPendingTasks(); runPendingTasks();
checkException(); checkException();
return !outboundMessages.isEmpty(); return isNotEmpty(outboundMessages);
} finally { } finally {
futures.recycle(); futures.recycle();
} }
@ -242,7 +248,15 @@ public class EmbeddedChannel extends AbstractChannel {
checkException(); checkException();
return !inboundMessages.isEmpty() || !outboundMessages.isEmpty(); return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
}
private static boolean isNotEmpty(Queue<Object> queue) {
return queue != null && !queue.isEmpty();
}
private static Object poll(Queue<Object> queue) {
return queue != null ? queue.poll() : null;
} }
/** /**
@ -365,7 +379,7 @@ public class EmbeddedChannel extends AbstractChannel {
} }
ReferenceCountUtil.retain(msg); ReferenceCountUtil.retain(msg);
outboundMessages.add(msg); outboundMessages().add(msg);
in.remove(); in.remove();
} }
} }
@ -380,7 +394,7 @@ public class EmbeddedChannel extends AbstractChannel {
private final class LastInboundHandler extends ChannelInboundHandlerAdapter { private final class LastInboundHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
inboundMessages.add(msg); inboundMessages().add(msg);
} }
@Override @Override