diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index b23fc769e4..8dc970b61d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -69,7 +69,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { private ChannelPromise connectPromise; private ScheduledFuture connectTimeoutFuture; private SocketAddress requestedRemoteAddress; - private final Queue spliceQueue = PlatformDependent.newMpscQueue(); + private Queue spliceQueue; // Lazy init these if we need to splice(...) private FileDescriptor pipeIn; @@ -172,8 +172,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (!isOpen()) { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } else { - SpliceInTask task = new SpliceInChannelTask(ch, len, checkNotNull(promise, "promise")); - spliceQueue.add(task); + addToSpliceQueue(new SpliceInChannelTask(ch, len, promise)); failSpliceIfClosed(promise); } return promise; @@ -190,6 +189,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { *
  • {@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this * {@link AbstractEpollStreamChannel}
  • *
  • the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified
  • + *
  • this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.
  • * */ public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) { @@ -207,6 +207,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { *
  • {@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this * {@link AbstractEpollStreamChannel}
  • *
  • the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified
  • + *
  • this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.
  • * */ public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len, @@ -224,8 +225,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (!isOpen()) { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } else { - SpliceInTask task = new SpliceFdTask(ch, offset, len, checkNotNull(promise, "promise")); - spliceQueue.add(task); + addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise)); failSpliceIfClosed(promise); } return promise; @@ -562,6 +562,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } private void clearSpliceQueue() { + if (spliceQueue == null) { + return; + } for (;;) { SpliceInTask task = spliceQueue.poll(); if (task == null) { @@ -796,17 +799,19 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { int messages = 0; int totalReadAmount = 0; do { - SpliceInTask spliceTask = spliceQueue.peek(); - if (spliceTask != null) { - if (spliceTask.spliceIn(allocHandle)) { - // We need to check if it is still active as if not we removed all SpliceTasks in - // doClose(...) - if (isActive()) { - spliceQueue.remove(); + if (spliceQueue != null) { + SpliceInTask spliceTask = spliceQueue.peek(); + if (spliceTask != null) { + if (spliceTask.spliceIn(allocHandle)) { + // We need to check if it is still active as if not we removed all SpliceTasks in + // doClose(...) + if (isActive()) { + spliceQueue.remove(); + } + continue; + } else { + break; } - continue; - } else { - break; } } @@ -880,6 +885,27 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } + private void addToSpliceQueue(final SpliceInTask task) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + addToSpliceQueue0(task); + } else { + eventLoop.execute(new OneTimeTask() { + @Override + public void run() { + addToSpliceQueue0(task); + } + }); + } + } + + private void addToSpliceQueue0(SpliceInTask task) { + if (spliceQueue == null) { + spliceQueue = PlatformDependent.newMpscQueue(); + } + spliceQueue.add(task); + } + protected abstract class SpliceInTask extends MpscLinkedQueueNode { final ChannelPromise promise; int len;