Reduce memory usage by EmbeddedChannel
Motivation: When using an EmbeddedChannel often it either does inbound or outbound processing which means we only often need one queue. Modifications: Lazy init the inbound and outbound message queues. Result: Less memory usage.
This commit is contained in:
parent
93b76257ad
commit
d43569fce6
@ -57,8 +57,9 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
|
|
||||||
private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
|
private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
|
||||||
private final ChannelConfig config = new DefaultChannelConfig(this);
|
private final ChannelConfig config = new DefaultChannelConfig(this);
|
||||||
private final Queue<Object> inboundMessages = new ArrayDeque<Object>();
|
|
||||||
private final Queue<Object> outboundMessages = new ArrayDeque<Object>();
|
private Queue<Object> inboundMessages;
|
||||||
|
private Queue<Object> outboundMessages;
|
||||||
private Throwable lastException;
|
private Throwable lastException;
|
||||||
private State state;
|
private State state;
|
||||||
|
|
||||||
@ -136,6 +137,9 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
* Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
|
* Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
|
||||||
*/
|
*/
|
||||||
public Queue<Object> inboundMessages() {
|
public Queue<Object> inboundMessages() {
|
||||||
|
if (inboundMessages == null) {
|
||||||
|
inboundMessages = new ArrayDeque<Object>();
|
||||||
|
}
|
||||||
return inboundMessages;
|
return inboundMessages;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -151,6 +155,9 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
* Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
|
* Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
|
||||||
*/
|
*/
|
||||||
public Queue<Object> outboundMessages() {
|
public Queue<Object> outboundMessages() {
|
||||||
|
if (outboundMessages == null) {
|
||||||
|
outboundMessages = new ArrayDeque<Object>();
|
||||||
|
}
|
||||||
return outboundMessages;
|
return outboundMessages;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,7 +174,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public <T> T readInbound() {
|
public <T> T readInbound() {
|
||||||
return (T) inboundMessages.poll();
|
return (T) poll(inboundMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -175,7 +182,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public <T> T readOutbound() {
|
public <T> T readOutbound() {
|
||||||
return (T) outboundMessages.poll();
|
return (T) poll(outboundMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -188,7 +195,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
public boolean writeInbound(Object... msgs) {
|
public boolean writeInbound(Object... msgs) {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
if (msgs.length == 0) {
|
if (msgs.length == 0) {
|
||||||
return !inboundMessages.isEmpty();
|
return isNotEmpty(inboundMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelPipeline p = pipeline();
|
ChannelPipeline p = pipeline();
|
||||||
@ -198,7 +205,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
p.fireChannelReadComplete();
|
p.fireChannelReadComplete();
|
||||||
runPendingTasks();
|
runPendingTasks();
|
||||||
checkException();
|
checkException();
|
||||||
return !inboundMessages.isEmpty();
|
return isNotEmpty(inboundMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -210,7 +217,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
public boolean writeOutbound(Object... msgs) {
|
public boolean writeOutbound(Object... msgs) {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
if (msgs.length == 0) {
|
if (msgs.length == 0) {
|
||||||
return !outboundMessages.isEmpty();
|
return isNotEmpty(outboundMessages);
|
||||||
}
|
}
|
||||||
|
|
||||||
RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
|
RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
|
||||||
@ -235,7 +242,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
|
|
||||||
runPendingTasks();
|
runPendingTasks();
|
||||||
checkException();
|
checkException();
|
||||||
return !outboundMessages.isEmpty();
|
return isNotEmpty(outboundMessages);
|
||||||
} finally {
|
} finally {
|
||||||
futures.recycle();
|
futures.recycle();
|
||||||
}
|
}
|
||||||
@ -255,7 +262,15 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
|
|
||||||
checkException();
|
checkException();
|
||||||
|
|
||||||
return !inboundMessages.isEmpty() || !outboundMessages.isEmpty();
|
return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isNotEmpty(Queue<Object> queue) {
|
||||||
|
return queue != null && !queue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Object poll(Queue<Object> queue) {
|
||||||
|
return queue != null ? queue.poll() : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -378,7 +393,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ReferenceCountUtil.retain(msg);
|
ReferenceCountUtil.retain(msg);
|
||||||
outboundMessages.add(msg);
|
outboundMessages().add(msg);
|
||||||
in.remove();
|
in.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -393,7 +408,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
|||||||
private final class LastInboundHandler extends ChannelHandlerAdapter {
|
private final class LastInboundHandler extends ChannelHandlerAdapter {
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
inboundMessages.add(msg);
|
inboundMessages().add(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user