Epoll: Don't wake event loop when splicing (#9354)

Motivation

I noticed this while looking at something else.
AbstractEpollStreamChannel::spliceQueue is an MPSC queue but only
accessed from the event loop. So it could be just changed to e.g. an
ArrayDeque. This PR instead reverts to using is as an MPSC queue to
avoid dispatching a task to the EL, as appears was the original
intention.

Modification

Change AbstractEpollStreamChannel::spliceQueue to be volatile and lazily
initialized via double-checked locking. Add tasks directly to the queue
from the public methods rather than possibly waking the EL just to
enqueue.

An alternative is just to change PlatformDependent.newMpscQueue() to new
ArrayDeque() and be done with it :)

Result

Less disruptive channel/fd-splicing.
This commit is contained in:
Nick Hill 2019-07-12 09:06:26 -07:00 committed by Norman Maurer
parent e9fec0a710
commit a6abff75a6

View File

@ -66,9 +66,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
// meantime. // meantime.
((AbstractEpollUnsafe) unsafe()).flush0(); ((AbstractEpollUnsafe) unsafe()).flush0();
}; };
private Queue<SpliceInTask> spliceQueue;
// Lazy init these if we need to splice(...) // Lazy init these if we need to splice(...)
private volatile Queue<SpliceInTask> spliceQueue;
private FileDescriptor pipeIn; private FileDescriptor pipeIn;
private FileDescriptor pipeOut; private FileDescriptor pipeOut;
@ -646,13 +646,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
} }
private void clearSpliceQueue() { private void clearSpliceQueue() {
if (spliceQueue == null) { Queue<SpliceInTask> sQueue = spliceQueue;
if (sQueue == null) {
return; return;
} }
ClosedChannelException exception = null; ClosedChannelException exception = null;
for (;;) { for (;;) {
SpliceInTask task = spliceQueue.poll(); SpliceInTask task = sQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
@ -725,15 +726,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
boolean close = false; boolean close = false;
try { try {
Queue<SpliceInTask> sQueue = null;
do { do {
if (spliceQueue != null) { if (sQueue != null || (sQueue = spliceQueue) != null) {
SpliceInTask spliceTask = spliceQueue.peek(); SpliceInTask spliceTask = sQueue.peek();
if (spliceTask != null) { if (spliceTask != null) {
if (spliceTask.spliceIn(allocHandle)) { if (spliceTask.spliceIn(allocHandle)) {
// We need to check if it is still active as if not we removed all SpliceTasks in // We need to check if it is still active as if not we removed all SpliceTasks in
// doClose(...) // doClose(...)
if (isActive()) { if (isActive()) {
spliceQueue.remove(); sQueue.remove();
} }
continue; continue;
} else { } else {
@ -795,19 +797,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
} }
private void addToSpliceQueue(final SpliceInTask task) { private void addToSpliceQueue(final SpliceInTask task) {
EventLoop eventLoop = eventLoop(); Queue<SpliceInTask> sQueue = spliceQueue;
if (eventLoop.inEventLoop()) { if (sQueue == null) {
addToSpliceQueue0(task); synchronized (this) {
} else { sQueue = spliceQueue;
eventLoop.execute(() -> addToSpliceQueue0(task)); if (sQueue == null) {
spliceQueue = sQueue = PlatformDependent.newMpscQueue();
}
}
} }
} sQueue.add(task);
private void addToSpliceQueue0(SpliceInTask task) {
if (spliceQueue == null) {
spliceQueue = PlatformDependent.newMpscQueue();
}
spliceQueue.add(task);
} }
protected abstract class SpliceInTask { protected abstract class SpliceInTask {