Lazy Initialization of epoll splice queue

Motivation:
AbstractEpollStreamChannel has a queue which collects splice events. Splice is assumed not to be the most common use case of this class and thus the splice queue could be initialized in a lazy fashion to save memory. This becomes more significant when the number of connections grows.

Modifications:
- AbstractEpollStreamChannel.spliceQueue will be initialized in a lazy fashion

Result:
Less memory consumption for most use cases
This commit is contained in:
Scott Mitchell 2015-11-19 22:52:26 -08:00
parent c08c965117
commit 6058067c26

View File

@ -69,7 +69,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private ChannelPromise connectPromise; private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture; private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress; private SocketAddress requestedRemoteAddress;
private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue(); private Queue<SpliceInTask> spliceQueue;
// Lazy init these if we need to splice(...) // Lazy init these if we need to splice(...)
private FileDescriptor pipeIn; private FileDescriptor pipeIn;
@ -172,8 +172,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (!isOpen()) { if (!isOpen()) {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
} else { } else {
SpliceInTask task = new SpliceInChannelTask(ch, len, checkNotNull(promise, "promise")); addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
spliceQueue.add(task);
failSpliceIfClosed(promise); failSpliceIfClosed(promise);
} }
return promise; return promise;
@ -190,6 +189,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
* {@link AbstractEpollStreamChannel}</li> * {@link AbstractEpollStreamChannel}</li>
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li> * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
* <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
* </ul> * </ul>
*/ */
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) { public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
@ -207,6 +207,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
* <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
* {@link AbstractEpollStreamChannel}</li> * {@link AbstractEpollStreamChannel}</li>
* <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li> * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
* <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
* </ul> * </ul>
*/ */
public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len, public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
@ -224,8 +225,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (!isOpen()) { if (!isOpen()) {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
} else { } else {
SpliceInTask task = new SpliceFdTask(ch, offset, len, checkNotNull(promise, "promise")); addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
spliceQueue.add(task);
failSpliceIfClosed(promise); failSpliceIfClosed(promise);
} }
return promise; return promise;
@ -562,6 +562,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
private void clearSpliceQueue() { private void clearSpliceQueue() {
if (spliceQueue == null) {
return;
}
for (;;) { for (;;) {
SpliceInTask task = spliceQueue.poll(); SpliceInTask task = spliceQueue.poll();
if (task == null) { if (task == null) {
@ -796,17 +799,19 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
int messages = 0; int messages = 0;
int totalReadAmount = 0; int totalReadAmount = 0;
do { do {
SpliceInTask spliceTask = spliceQueue.peek(); if (spliceQueue != null) {
if (spliceTask != null) { SpliceInTask spliceTask = spliceQueue.peek();
if (spliceTask.spliceIn(allocHandle)) { if (spliceTask != null) {
// We need to check if it is still active as if not we removed all SpliceTasks in if (spliceTask.spliceIn(allocHandle)) {
// doClose(...) // We need to check if it is still active as if not we removed all SpliceTasks in
if (isActive()) { // doClose(...)
spliceQueue.remove(); if (isActive()) {
spliceQueue.remove();
}
continue;
} else {
break;
} }
continue;
} else {
break;
} }
} }
@ -880,6 +885,27 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
} }
private void addToSpliceQueue(final SpliceInTask task) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
addToSpliceQueue0(task);
} else {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
addToSpliceQueue0(task);
}
});
}
}
private void addToSpliceQueue0(SpliceInTask task) {
if (spliceQueue == null) {
spliceQueue = PlatformDependent.newMpscQueue();
}
spliceQueue.add(task);
}
protected abstract class SpliceInTask extends MpscLinkedQueueNode<SpliceInTask> { protected abstract class SpliceInTask extends MpscLinkedQueueNode<SpliceInTask> {
final ChannelPromise promise; final ChannelPromise promise;
int len; int len;