Not wakeup the EventLoop for writes as they will not cause a flush anyway

This commit is contained in:
Norman Maurer 2014-01-30 20:02:15 +01:00
parent f94b563bcd
commit 64c3f58279
3 changed files with 102 additions and 32 deletions

View File

@ -694,11 +694,16 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
} }
} }
if (!addTaskWakesUp) { if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop); wakeup(inEventLoop);
} }
} }
@SuppressWarnings("unused")
protected boolean wakesUpForTask(Runnable task) {
return true;
}
protected static void reject() { protected static void reject() {
throw new RejectedExecutionException("event executor terminated"); throw new RejectedExecutionException("event executor terminated");
} }

View File

@ -709,7 +709,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
buffer.incrementPendingOutboundBytes(size); buffer.incrementPendingOutboundBytes(size);
} }
} }
safeExecute(executor, WriteTask.newInstance(next, msg, size, flush, promise), promise, msg); Runnable task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, size, promise);
} else {
task = WriteTask.newInstance(next, msg, size, promise);
}
safeExecute(executor, task, promise, msg);
} }
} }
@ -862,12 +868,54 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
} }
static final class WriteTask implements Runnable { abstract static class AbstractWriteTask implements Runnable {
private final Recycler.Handle handle;
private DefaultChannelHandlerContext ctx; private DefaultChannelHandlerContext ctx;
private Object msg; private Object msg;
private ChannelPromise promise; private ChannelPromise promise;
private int size; private int size;
private boolean flush;
private AbstractWriteTask(Recycler.Handle handle) {
this.handle = handle;
}
protected static void init(AbstractWriteTask task, DefaultChannelHandlerContext ctx,
Object msg, int size, ChannelPromise promise) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = size;
}
@Override
public final void run() {
try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
recycle(handle);
}
}
protected void write(DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}
protected abstract void recycle(Recycler.Handle handle);
}
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() { private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override @Override
@ -877,44 +925,51 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}; };
private static WriteTask newInstance( private static WriteTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, int size, boolean flush, ChannelPromise promise) { DefaultChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteTask task = RECYCLER.get(); WriteTask task = RECYCLER.get();
task.ctx = ctx; init(task, ctx, msg, size, promise);
task.msg = msg;
task.promise = promise;
task.size = size;
task.flush = flush;
return task; return task;
} }
private final Recycler.Handle handle;
private WriteTask(Recycler.Handle handle) { private WriteTask(Recycler.Handle handle) {
this.handle = handle; super(handle);
} }
@Override @Override
public void run() { protected void recycle(Recycler.Handle handle) {
try { RECYCLER.recycle(this, handle);
if (size > 0) { }
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer(); }
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
ctx.invokeWrite(msg, promise);
if (flush) {
ctx.invokeFlush();
}
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
RECYCLER.recycle(this, handle); static final class WriteAndFlushTask extends AbstractWriteTask {
private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
@Override
protected WriteAndFlushTask newObject(Handle handle) {
return new WriteAndFlushTask(handle);
} }
};
private static WriteAndFlushTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteAndFlushTask task = RECYCLER.get();
init(task, ctx, msg, size, promise);
return task;
}
private WriteAndFlushTask(Recycler.Handle handle) {
super(handle);
}
@Override
public void write(DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
ctx.invokeFlush();
}
@Override
protected void recycle(Recycler.Handle handle) {
RECYCLER.recycle(this, handle);
} }
} }
} }

View File

@ -60,4 +60,14 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
channel.unsafe().register(this, promise); channel.unsafe().register(this, promise);
return promise; return promise;
} }
@Override
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
}
/**
* Marker interface for {@linkRunnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
*/
interface NonWakeupRunnable extends Runnable { }
} }