From 6cb6282699b2ae24d36c35af142497a90df6042f Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Thu, 19 Nov 2015 22:52:26 -0800 Subject: [PATCH] Lazy Initialization of epoll splice queue Motivation: AbstractEpollStreamChannel has a queue which collects splice events. Splice is assumed not to be the most common use case of this class and thus the splice queue could be initialized in a lazy fashion to save memory. This becomes more significant when the number of connections grows. Modifications: - AbstractEpollStreamChannel.spliceQueue will be initialized in a lazy fashion Result: Less memory consumption for most use cases --- .../epoll/AbstractEpollStreamChannel.java | 56 ++++++++++++++----- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 8e292c0f83..2530ad0aa1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -69,7 +69,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { private ChannelPromise connectPromise; private ScheduledFuture connectTimeoutFuture; private SocketAddress requestedRemoteAddress; - private final Queue spliceQueue = PlatformDependent.newMpscQueue(); + private Queue spliceQueue; // Lazy init these if we need to splice(...) private FileDescriptor pipeIn; @@ -172,8 +172,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (!isOpen()) { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } else { - SpliceInTask task = new SpliceInChannelTask(ch, len, checkNotNull(promise, "promise")); - spliceQueue.add(task); + addToSpliceQueue(new SpliceInChannelTask(ch, len, promise)); failSpliceIfClosed(promise); } return promise; @@ -190,6 +189,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { *
  • {@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this * {@link AbstractEpollStreamChannel}
  • *
  • the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified
  • + *
  • this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.
  • * */ public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) { @@ -207,6 +207,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { *
  • {@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this * {@link AbstractEpollStreamChannel}
  • *
  • the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified
  • + *
  • this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.
  • * */ public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len, @@ -224,8 +225,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (!isOpen()) { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } else { - SpliceInTask task = new SpliceFdTask(ch, offset, len, checkNotNull(promise, "promise")); - spliceQueue.add(task); + addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise)); failSpliceIfClosed(promise); } return promise; @@ -562,6 +562,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } private void clearSpliceQueue() { + if (spliceQueue == null) { + return; + } for (;;) { SpliceInTask task = spliceQueue.poll(); if (task == null) { @@ -790,17 +793,19 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { boolean close = false; try { do { - SpliceInTask spliceTask = spliceQueue.peek(); - if (spliceTask != null) { - if (spliceTask.spliceIn(allocHandle)) { - // We need to check if it is still active as if not we removed all SpliceTasks in - // doClose(...) - if (isActive()) { - spliceQueue.remove(); + if (spliceQueue != null) { + SpliceInTask spliceTask = spliceQueue.peek(); + if (spliceTask != null) { + if (spliceTask.spliceIn(allocHandle)) { + // We need to check if it is still active as if not we removed all SpliceTasks in + // doClose(...) + if (isActive()) { + spliceQueue.remove(); + } + continue; + } else { + break; } - continue; - } else { - break; } } @@ -845,6 +850,27 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } + private void addToSpliceQueue(final SpliceInTask task) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + addToSpliceQueue0(task); + } else { + eventLoop.execute(new OneTimeTask() { + @Override + public void run() { + addToSpliceQueue0(task); + } + }); + } + } + + private void addToSpliceQueue0(SpliceInTask task) { + if (spliceQueue == null) { + spliceQueue = PlatformDependent.newMpscQueue(); + } + spliceQueue.add(task); + } + protected abstract class SpliceInTask extends MpscLinkedQueueNode { final ChannelPromise promise; int len;