Synchronized between 4.1 and master again

Motivation:
4 and 5 were diverged long time ago and we recently reverted some of the
early commits in master.  We must make sure 4.1 and master are not very
different now.

Modification:
Remove ChannelHandlerInvoker.writeAndFlush(...) and the related implementations.

Result:
4.1 and master got closer.
This commit is contained in:
Norman Maurer 2014-04-24 21:28:24 +02:00
parent a2f1f21612
commit 8c3eaf3b56
5 changed files with 30 additions and 129 deletions

View File

@ -146,14 +146,6 @@ public interface ChannelHandlerInvoker {
*/ */
void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise); void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)} and
* {@link ChannelOutboundHandler#flush(ChannelHandlerContext)} sequentially.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise);
/** /**
* Invokes {@link ChannelOutboundHandler#flush(ChannelHandlerContext)}. * Invokes {@link ChannelOutboundHandler#flush(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation. * This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.

View File

@ -171,11 +171,6 @@ public final class ChannelHandlerInvokerUtil {
} }
} }
public static void invokeWriteAndFlushNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
invokeFlushNow(ctx);
}
public static boolean validatePromise( public static boolean validatePromise(
ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) { ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) {
if (ctx == null) { if (ctx == null) {

View File

@ -306,7 +306,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
DefaultChannelHandlerContext next = findContextOutbound(); DefaultChannelHandlerContext next = findContextOutbound();
ReferenceCountUtil.touch(msg, next); ReferenceCountUtil.touch(msg, next);
next.invoker().invokeWriteAndFlush(next, msg, promise); ChannelHandlerInvoker invoker = next.invoker();
invoker.invokeWrite(next, msg, promise);
invoker.invokeFlush(next);
return promise; return promise;
} }

View File

@ -326,22 +326,14 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
if (msg == null) { if (msg == null) {
throw new NullPointerException("msg"); throw new NullPointerException("msg");
} }
if (!validatePromise(ctx, promise, true)) { if (!validatePromise(ctx, promise, true)) {
// promise cancelled // promise cancelled
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
return; return;
} }
invokeWrite(ctx, msg, false, promise);
}
private void invokeWrite(ChannelHandlerContext ctx, Object msg, boolean flush, ChannelPromise promise) {
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
invokeWriteNow(ctx, msg, promise); invokeWriteNow(ctx, msg, promise);
if (flush) {
invokeFlushNow(ctx);
}
} else { } else {
AbstractChannel channel = (AbstractChannel) ctx.channel(); AbstractChannel channel = (AbstractChannel) ctx.channel();
int size = channel.estimatorHandle().size(msg); int size = channel.estimatorHandle().size(msg);
@ -352,13 +344,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
buffer.incrementPendingOutboundBytes(size); buffer.incrementPendingOutboundBytes(size);
} }
} }
Runnable task; safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
if (flush) {
task = WriteAndFlushTask.newInstance(ctx, msg, size, promise);
} else {
task = WriteTask.newInstance(ctx, msg, size, promise);
}
safeExecuteOutbound(task, promise, msg);
} }
} }
@ -381,21 +367,6 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
} }
} }
@Override
public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (!validatePromise(ctx, promise, true)) {
// promise cancelled
ReferenceCountUtil.release(msg);
return;
}
invokeWrite(ctx, msg, true, promise);
}
private void safeExecuteInbound(Runnable task, Object msg) { private void safeExecuteInbound(Runnable task, Object msg) {
boolean success = false; boolean success = false;
try { try {
@ -427,56 +398,12 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
} }
} }
abstract static class AbstractWriteTask<T> extends OneTimeTask { static final class WriteTask extends OneTimeTask implements SingleThreadEventLoop.NonWakeupRunnable {
private final Recycler.Handle<T> handle;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private Object msg; private Object msg;
private ChannelPromise promise; private ChannelPromise promise;
private int size; private int size;
protected AbstractWriteTask(Recycler.Handle<T> handle) {
this.handle = handle;
}
protected static void init(
AbstractWriteTask<?> task, ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = size;
}
@Override
public final void run() {
try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
write(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
recycle(handle);
}
}
protected void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
}
protected abstract void recycle(Recycler.Handle<T> handle);
}
static final class WriteTask
extends AbstractWriteTask<WriteTask> implements SingleThreadEventLoop.NonWakeupRunnable {
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() { private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override @Override
protected WriteTask newObject(Handle<WriteTask> handle) { protected WriteTask newObject(Handle<WriteTask> handle) {
@ -484,51 +411,41 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
} }
}; };
static WriteTask newInstance(ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) { private static WriteTask newInstance(
ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteTask task = RECYCLER.get(); WriteTask task = RECYCLER.get();
init(task, ctx, msg, size, promise); task.ctx = ctx;
task.msg = msg;
task.promise = promise;
task.size = size;
return task; return task;
} }
private final Recycler.Handle<WriteTask> handle;
private WriteTask(Recycler.Handle<WriteTask> handle) { private WriteTask(Recycler.Handle<WriteTask> handle) {
super(handle); this.handle = handle;
} }
@Override @Override
protected void recycle(Recycler.Handle<WriteTask> handle) { public void run() {
RECYCLER.recycle(this, handle); try {
} if (size > 0) {
} ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
invokeWriteNow(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
static final class WriteAndFlushTask extends AbstractWriteTask<WriteAndFlushTask> { RECYCLER.recycle(this, handle);
private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
@Override
protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
return new WriteAndFlushTask(handle);
} }
};
static WriteAndFlushTask newInstance(
ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteAndFlushTask task = RECYCLER.get();
init(task, ctx, msg, size, promise);
return task;
}
private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
super(handle);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
invokeFlushNow(ctx);
}
@Override
protected void recycle(Recycler.Handle<WriteAndFlushTask> handle) {
RECYCLER.recycle(this, handle);
} }
} }
} }

View File

@ -203,11 +203,6 @@ final class EmbeddedEventLoop extends AbstractEventLoop implements ChannelHandle
invokeWriteNow(ctx, msg, promise); invokeWriteNow(ctx, msg, promise);
} }
@Override
public void invokeWriteAndFlush(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteAndFlushNow(ctx, msg, promise);
}
@Override @Override
public void invokeFlush(ChannelHandlerContext ctx) { public void invokeFlush(ChannelHandlerContext ctx) {
invokeFlushNow(ctx); invokeFlushNow(ctx);