Remove ChannelHandlerInvoker

Motivation:

We tried to provide the ability for the user to change the semantics of the threading-model by delegate the invoking of the ChannelHandler to the ChannelHandlerInvoker. Unfortunually this not really worked out quite well and resulted in just more complexity and splitting of code that belongs together. We should remove the ChannelHandlerInvoker again and just do the same as in 4.0

Modifications:

Remove ChannelHandlerInvoker again and replace its usage in Http2MultiplexCodec

Result:

Easier code and less bad abstractions.
This commit is contained in:
Norman Maurer 2016-05-17 09:05:55 +02:00
parent a56ef03f24
commit 68cd670eb9
20 changed files with 921 additions and 1590 deletions

View File

@ -24,8 +24,6 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelHandlerInvokerUtil;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
@ -83,6 +81,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
private final List<StreamInfo> streamsToFireChildReadComplete = new ArrayList<StreamInfo>();
private ChannelHandlerContext ctx;
private ChannelHandlerContext http2HandlerCtx;
private volatile Runnable flushTask;
/**
* Construct a new handler whose child channels run in the same event loop as this handler.
@ -176,15 +175,74 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
}
}
// Override this to signal it will never throw an exception.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
// Override this to signal it will never throw an exception.
@Override
public void flush(ChannelHandlerContext ctx) {
ctx.flush();
}
void flushFromStreamChannel() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
flush(ctx);
} else {
Runnable task = flushTask;
if (task == null) {
task = flushTask = new Runnable() {
@Override
public void run() {
flush(ctx);
}
};
}
executor.execute(task);
}
}
void writeFromStreamChannel(final Object msg, final boolean flush) {
final ChannelPromise promise = ctx.newPromise();
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
writeFromStreamChannel0(msg, flush, promise);
} else {
try {
executor.execute(new OneTimeTask() {
@Override
public void run() {
writeFromStreamChannel0(msg, flush, promise);
}
});
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
}
private void writeFromStreamChannel0(Object msg, boolean flush, ChannelPromise promise) {
try {
write(ctx, msg, promise);
} catch (Throwable cause) {
promise.tryFailure(cause);
}
if (flush) {
flush(ctx);
}
}
/**
* Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
* streams.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!(msg instanceof Http2Frame)) {
super.write(ctx, msg, promise);
ctx.write(msg, promise);
return;
}
try {
@ -280,7 +338,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
@Override
public void onStreamClosed(Http2Stream stream) {
final StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
final StreamInfo streamInfo = stream.getProperty(streamInfoKey);
if (streamInfo != null) {
final EventLoop eventLoop = streamInfo.childChannel.eventLoop();
if (eventLoop.inEventLoop()) {
@ -318,8 +376,18 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
return true;
}
});
} catch (Throwable t) {
ctx.invoker().invokeExceptionCaught(ctx, t);
} catch (final Throwable t) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
exceptionCaught(ctx, t);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
exceptionCaught(ctx, t);
}
});
}
}
ctx.fireUserEventTriggered(goAway.duplicate().retain());
}
@ -339,7 +407,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
if (stream == null) {
return;
}
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
if (streamInfo == null) {
return;
}
@ -355,7 +423,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
// Use a user event in order to circumvent read queue.
streamInfo.childChannel.pipeline().fireUserEventTriggered(new DefaultHttp2ResetFrame(errorCode));
}
@ -371,7 +439,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
fireChildReadAndRegister(streamInfo, new DefaultHttp2HeadersFrame(headers, endOfStream, padding));
}
@ -379,7 +447,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
fireChildReadAndRegister(streamInfo, new DefaultHttp2DataFrame(data.retain(), endOfStream, padding));
// We return the bytes in bytesConsumed() once the stream channel consumed the bytes.
return 0;
@ -389,7 +457,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
static final class StreamInfo {
final Http2StreamChannel childChannel;
/**
* {@code true} if stream is in {@link Http2MultiplexCodec#steamsToFireChildReadComplete}.
* {@code true} if stream is in {@link Http2MultiplexCodec#streamsToFireChildReadComplete}.
*/
boolean inStreamsToFireChildReadComplete;
@ -413,9 +481,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
protected void doClose() throws Exception {
if (!onStreamClosedFired) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).setStream(this);
ChannelHandlerInvoker invoker = ctx.invoker();
invoker.invokeWrite(ctx, resetFrame, ctx.newPromise());
invoker.invokeFlush(ctx);
writeFromStreamChannel(resetFrame, true);
}
super.doClose();
}
@ -432,12 +498,13 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
throw new IllegalArgumentException("Stream must be null on the frame");
}
frame.setStream(this);
ctx.invoker().invokeWrite(ctx, frame, ctx.newPromise());
writeFromStreamChannel(msg, false);
}
@Override
protected void doWriteComplete() {
ctx.invoker().invokeFlush(ctx);
flushFromStreamChannel();
}
@Override
@ -464,7 +531,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
try {
http2Handler.connection().local().flowController().consumeBytes(stream, bytes);
} catch (Throwable t) {
ChannelHandlerInvokerUtil.invokeExceptionCaughtNow(ctx, t);
exceptionCaught(ctx, t);
}
}
}

View File

@ -38,6 +38,7 @@ import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
@ -68,7 +69,7 @@ public class Http2MultiplexCodecTest {
@Before
public void setUp() throws Exception {
channel.connect(null);
channel.connect(new InetSocketAddress(0));
channel.pipeline().addLast(serverCodec);
http2HandlerCtx = channel.pipeline().context(serverCodec.connectionHandler());

View File

@ -20,7 +20,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
@ -50,7 +49,7 @@ public abstract class EmbeddedChannelWriteReleaseHandlerContext implements Chann
this.alloc = checkNotNull(alloc, "alloc");
this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler");
this.eventLoop = checkNotNull(channel.eventLoop(), "eventLoop");
eventLoop = checkNotNull(channel.eventLoop(), "eventLoop");
}
protected abstract void handleException(Throwable t);
@ -75,11 +74,6 @@ public abstract class EmbeddedChannelWriteReleaseHandlerContext implements Chann
return eventLoop;
}
@Override
public ChannelHandlerInvoker invoker() {
return eventLoop.asInvoker();
}
@Override
public String name() {
return HANDLER_NAME;

View File

@ -431,11 +431,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return recvHandle;
}
@Override
public final ChannelHandlerInvoker invoker() {
return eventLoop().asInvoker();
}
@Override
public final ChannelOutboundBuffer outboundBuffer() {
return outboundBuffer;

View File

@ -218,11 +218,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();
/**
* Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
*/
ChannelHandlerInvoker invoker();
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.

View File

@ -134,14 +134,6 @@ public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvok
*/
EventExecutor executor();
/**
* Returns the {@link ChannelHandlerInvoker} which is used to trigger an event for the associated
* {@link ChannelHandler}. Note that the methods in {@link ChannelHandlerInvoker} are not intended to be called
* by a user. Use this method only to obtain the reference to the {@link ChannelHandlerInvoker}
* (and not calling its methods) unless you know what you are doing.
*/
ChannelHandlerInvoker invoker();
/**
* The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
* was added to the {@link ChannelPipeline}. This name can also be used to access the registered

View File

@ -1,155 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import java.net.SocketAddress;
/**
* Invokes the event handler methods of {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}.
* A user can specify a {@link ChannelHandlerInvoker} to implement a custom thread model unsupported by the default
* implementation. Note that the methods in this interface are not intended to be called by a user.
*/
public interface ChannelHandlerInvoker {
/**
* Returns the {@link EventExecutor} which is used to execute an arbitrary task.
*/
EventExecutor executor();
/**
* Invokes {@link ChannelInboundHandler#channelRegistered(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelRegistered(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelUnregistered(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelActive(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelInactive(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause);
/**
* Invokes {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is not for
* a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event);
/**
* Invokes {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelRead(ChannelHandlerContext ctx, Object msg);
/**
* Invokes {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)}. This method is not for a user
* but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelReadComplete(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)}. This method is not for
* a user but for the internal {@link ChannelHandlerContext} implementation. To trigger an event, use the methods in
* {@link ChannelHandlerContext} instead.
*/
void invokeChannelWritabilityChanged(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelOutboundHandler#bind(ChannelHandlerContext, SocketAddress, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise);
/**
* Invokes
* {@link ChannelOutboundHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeConnect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#disconnect(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#close(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#read(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeRead(ChannelHandlerContext ctx);
/**
* Invokes {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise);
/**
* Invokes {@link ChannelOutboundHandler#flush(ChannelHandlerContext)}.
* This method is not for a user but for the internal {@link ChannelHandlerContext} implementation.
* To trigger an event, use the methods in {@link ChannelHandlerContext} instead.
*/
void invokeFlush(ChannelHandlerContext ctx);
}

View File

@ -1,251 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
import static io.netty.channel.DefaultChannelPipeline.*;
/**
* A set of helper methods for easier implementation of custom {@link ChannelHandlerInvoker} implementation.
*/
public final class ChannelHandlerInvokerUtil {
public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelRegistered(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelUnregisteredNow(ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelUnregistered(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelInactiveNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelInactive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeExceptionCaughtNow(final ChannelHandlerContext ctx, final Throwable cause) {
try {
ctx.handler().exceptionCaught(ctx, cause);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by a user handler's exceptionCaught() method:", t);
logger.warn(".. and the cause of the exceptionCaught() was:", cause);
}
}
}
public static void invokeUserEventTriggeredNow(final ChannelHandlerContext ctx, final Object event) {
try {
((ChannelInboundHandler) ctx.handler()).userEventTriggered(ctx, event);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
try {
((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelReadCompleteNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelReadComplete(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeChannelWritabilityChangedNow(final ChannelHandlerContext ctx) {
try {
((ChannelInboundHandler) ctx.handler()).channelWritabilityChanged(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeBindNow(
final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).bind(ctx, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeConnectNow(
final ChannelHandlerContext ctx,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).connect(ctx, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeDisconnectNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).disconnect(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeCloseNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).close(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeDeregisterNow(final ChannelHandlerContext ctx, final ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).deregister(ctx, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeReadNow(final ChannelHandlerContext ctx) {
try {
((ChannelOutboundHandler) ctx.handler()).read(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void invokeFlushNow(final ChannelHandlerContext ctx) {
try {
((ChannelOutboundHandler) ctx.handler()).flush(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
}
}
public static boolean validatePromise(
ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) {
if (ctx == null) {
throw new NullPointerException("ctx");
}
if (promise == null) {
throw new NullPointerException("promise");
}
if (promise.isDone()) {
if (promise.isCancelled()) {
return false;
}
throw new IllegalArgumentException("promise already done: " + promise);
}
if (promise.channel() != ctx.channel()) {
throw new IllegalArgumentException(String.format(
"promise.channel does not match: %s (expected: %s)", promise.channel(), ctx.channel()));
}
if (promise.getClass() == DefaultChannelPromise.class) {
return true;
}
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
}
if (promise instanceof AbstractChannel.CloseFuture) {
throw new IllegalArgumentException(
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
}
return true;
}
private static void notifyHandlerException(ChannelHandlerContext ctx, Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaughtNow(ctx, cause);
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
if (!promise.tryFailure(cause) && !(promise instanceof VoidChannelPromise)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to fail the promise because it's done already: {}", promise, cause);
}
}
}
private static boolean inExceptionCaught(Throwable cause) {
do {
StackTraceElement[] trace = cause.getStackTrace();
if (trace != null) {
for (StackTraceElement t : trace) {
if (t == null) {
break;
}
if ("exceptionCaught".equals(t.getMethodName())) {
return true;
}
}
}
cause = cause.getCause();
} while (cause != null);
return false;
}
private ChannelHandlerInvokerUtil() { }
}

View File

@ -219,13 +219,13 @@ public interface ChannelPipeline
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*
* @param name the name of the handler to insert first. {@code null} to let the name auto-generated.
* @param name the name of the handler to insert first
* @param handler the handler to insert first
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
* if the specified name or handler is {@code null}
*/
ChannelPipeline addFirst(String name, ChannelHandler handler);
@ -234,40 +234,26 @@ public interface ChannelPipeline
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* methods
* @param name the name of the handler to insert first. {@code null} to let the name auto-generated.
* @param name the name of the handler to insert first
* @param handler the handler to insert first
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
* if the specified name or handler is {@code null}
*/
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} at the first position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param name the name of the handler to insert first. {@code null} to let the name auto-generated.
* @param handler the handler to insert first
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
* @param name the name of the handler to append. {@code null} to let the name auto-generated.
* @param name the name of the handler to append
* @param handler the handler to append
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
* if the specified name or handler is {@code null}
*/
ChannelPipeline addLast(String name, ChannelHandler handler);
@ -276,36 +262,22 @@ public interface ChannelPipeline
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* methods
* @param name the name of the handler to append. {@code null} to let the name auto-generated.
* @param name the name of the handler to append
* @param handler the handler to append
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
* if the specified name or handler is {@code null}
*/
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
/**
* Appends a {@link ChannelHandler} at the last position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param name the name of the handler to append. {@code null} to let the name auto-generated.
* @param handler the handler to append
*
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified handler is {@code null}
*/
ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
*
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before. {@code null} to let the name auto-generated.
* @param name the name of the handler to insert before
* @param handler the handler to insert before
*
* @throws NoSuchElementException
@ -313,7 +285,7 @@ public interface ChannelPipeline
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
@ -324,7 +296,7 @@ public interface ChannelPipeline
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before. {@code null} to let the name auto-generated.
* @param name the name of the handler to insert before
* @param handler the handler to insert before
*
* @throws NoSuchElementException
@ -332,34 +304,16 @@ public interface ChannelPipeline
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} before an existing handler of this
* pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert before. {@code null} to let the name auto-generated.
* @param handler the handler to insert before
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
*/
ChannelPipeline addBefore(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after. {@code null} to let the name auto-generated.
* @param name the name of the handler to insert after
* @param handler the handler to insert after
*
* @throws NoSuchElementException
@ -367,7 +321,7 @@ public interface ChannelPipeline
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
@ -378,7 +332,7 @@ public interface ChannelPipeline
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after. {@code null} to let the name auto-generated.
* @param name the name of the handler to insert after
* @param handler the handler to insert after
*
* @throws NoSuchElementException
@ -386,28 +340,10 @@ public interface ChannelPipeline
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
* if the specified baseName, name, or handler is {@code null}
*/
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after. {@code null} to let the name auto-generated.
* @param handler the handler to insert after
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
*/
ChannelPipeline addAfter(ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler);
/**
* Inserts {@link ChannelHandler}s at the first position of this pipeline.
*
@ -426,15 +362,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Inserts {@link ChannelHandler}s at the first position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param handlers the handlers to insert first
*
*/
ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers);
/**
* Inserts {@link ChannelHandler}s at the last position of this pipeline.
*
@ -453,15 +380,6 @@ public interface ChannelPipeline
*/
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
/**
* Inserts {@link ChannelHandler}s at the last position of this pipeline.
*
* @param invoker the {@link ChannelHandlerInvoker} which invokes the {@code handler}s event handler methods
* @param handlers the handlers to insert last
*
*/
ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers);
/**
* Removes the specified {@link ChannelHandler} from this pipeline.
*
@ -527,8 +445,7 @@ public interface ChannelPipeline
* Replaces the specified {@link ChannelHandler} with a new handler in this pipeline.
*
* @param oldHandler the {@link ChannelHandler} to be replaced
* @param newName the name under which the replacement should be added.
* {@code null} to use the same name with the handler being replaced.
* @param newName the name under which the replacement should be added
* @param newHandler the {@link ChannelHandler} which is used as replacement
*
* @return itself
@ -539,7 +456,8 @@ public interface ChannelPipeline
* if a handler with the specified new name already exists in this
* pipeline, except for the handler to be replaced
* @throws NullPointerException
* if the specified old handler or new handler is {@code null}
* if the specified old handler, new name, or new handler is
* {@code null}
*/
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
@ -547,8 +465,7 @@ public interface ChannelPipeline
* Replaces the {@link ChannelHandler} of the specified name with a new handler in this pipeline.
*
* @param oldName the name of the {@link ChannelHandler} to be replaced
* @param newName the name under which the replacement should be added.
* {@code null} to use the same name with the handler being replaced.
* @param newName the name under which the replacement should be added
* @param newHandler the {@link ChannelHandler} which is used as replacement
*
* @return the removed handler
@ -559,7 +476,8 @@ public interface ChannelPipeline
* if a handler with the specified new name already exists in this
* pipeline, except for the handler to be replaced
* @throws NullPointerException
* if the specified old handler or new handler is {@code null}
* if the specified old handler, new name, or new handler is
* {@code null}
*/
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
@ -567,8 +485,7 @@ public interface ChannelPipeline
* Replaces the {@link ChannelHandler} of the specified type with a new handler in this pipeline.
*
* @param oldHandlerType the type of the handler to be removed
* @param newName the name under which the replacement should be added.
* {@code null} to use the same name with the handler being replaced.
* @param newName the name under which the replacement should be added
* @param newHandler the {@link ChannelHandler} which is used as replacement
*
* @return the removed handler
@ -580,7 +497,8 @@ public interface ChannelPipeline
* if a handler with the specified new name already exists in this
* pipeline, except for the handler to be replaced
* @throws NullPointerException
* if the specified old handler or new handler is {@code null}
* if the specified old handler, new name, or new handler is
* {@code null}
*/
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);

View File

@ -372,11 +372,6 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
return ctx.executor();
}
@Override
public ChannelHandlerInvoker invoker() {
return ctx.invoker();
}
@Override
public String name() {
return ctx.name();

View File

@ -15,13 +15,15 @@
*/
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
super(pipeline, invoker, name, isInbound(handler), isOutbound(handler));
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}

View File

@ -1,510 +0,0 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
import io.netty.util.internal.SystemPropertyUtil;
import java.net.SocketAddress;
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
import static io.netty.channel.DefaultChannelPipeline.*;
public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
private final EventExecutor executor;
public DefaultChannelHandlerInvoker(EventExecutor executor) {
if (executor == null) {
throw new NullPointerException("executor");
}
this.executor = executor;
}
@Override
public EventExecutor executor() {
return executor;
}
@Override
public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelRegisteredNow(ctx);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeChannelRegisteredNow(ctx);
}
});
}
}
@Override
public void invokeChannelUnregistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelUnregisteredNow(ctx);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeChannelUnregisteredNow(ctx);
}
});
}
}
@Override
public void invokeChannelActive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelActiveNow(ctx);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeChannelActiveNow(ctx);
}
});
}
}
@Override
public void invokeChannelInactive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelInactiveNow(ctx);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeChannelInactiveNow(ctx);
}
});
}
}
@Override
public void invokeExceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
if (cause == null) {
throw new NullPointerException("cause");
}
if (executor.inEventLoop()) {
invokeExceptionCaughtNow(ctx, cause);
} else {
try {
executor.execute(new OneTimeTask() {
@Override
public void run() {
invokeExceptionCaughtNow(ctx, cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
@Override
public void invokeUserEventTriggered(final ChannelHandlerContext ctx, final Object event) {
if (event == null) {
throw new NullPointerException("event");
}
if (executor.inEventLoop()) {
invokeUserEventTriggeredNow(ctx, event);
} else {
safeExecuteInbound(new OneTimeTask() {
@Override
public void run() {
invokeUserEventTriggeredNow(ctx, event);
}
}, event);
}
}
@Override
public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (executor.inEventLoop()) {
invokeChannelReadNow(ctx, msg);
} else {
safeExecuteInbound(new OneTimeTask() {
@Override
public void run() {
invokeChannelReadNow(ctx, msg);
}
}, msg);
}
}
@Override
public void invokeChannelReadComplete(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelReadCompleteNow(ctx);
} else {
Runnable task;
if (ctx instanceof AbstractChannelHandlerContext) {
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
task = dctx.invokeChannelReadCompleteTask;
if (task == null) {
dctx.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
invokeChannelReadCompleteNow(ctx);
}
};
}
} else {
task = new OneTimeTask() {
@Override
public void run() {
invokeChannelReadCompleteNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeChannelWritabilityChanged(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelWritabilityChangedNow(ctx);
} else {
Runnable task;
if (ctx instanceof AbstractChannelHandlerContext) {
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
task = dctx.invokeChannelWritableStateChangedTask;
if (task == null) {
dctx.invokeChannelWritableStateChangedTask = task = new Runnable() {
@Override
public void run() {
invokeChannelWritabilityChangedNow(ctx);
}
};
}
} else {
task = new OneTimeTask() {
@Override
public void run() {
invokeChannelWritabilityChangedNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeBind(
final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (!validatePromise(ctx, promise, false)) {
// promise cancelled
return;
}
if (executor.inEventLoop()) {
invokeBindNow(ctx, localAddress, promise);
} else {
safeExecuteOutbound(new OneTimeTask() {
@Override
public void run() {
invokeBindNow(ctx, localAddress, promise);
}
}, promise);
}
}
@Override
public void invokeConnect(
final ChannelHandlerContext ctx,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (!validatePromise(ctx, promise, false)) {
// promise cancelled
return;
}
if (executor.inEventLoop()) {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
} else {
safeExecuteOutbound(new OneTimeTask() {
@Override
public void run() {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
}, promise);
}
}
@Override
public void invokeDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (!validatePromise(ctx, promise, false)) {
// promise cancelled
return;
}
if (executor.inEventLoop()) {
invokeDisconnectNow(ctx, promise);
} else {
safeExecuteOutbound(new OneTimeTask() {
@Override
public void run() {
invokeDisconnectNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeClose(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (!validatePromise(ctx, promise, false)) {
// promise cancelled
return;
}
if (executor.inEventLoop()) {
invokeCloseNow(ctx, promise);
} else {
safeExecuteOutbound(new OneTimeTask() {
@Override
public void run() {
invokeCloseNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeDeregister(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (!validatePromise(ctx, promise, false)) {
// promise cancelled
return;
}
if (executor.inEventLoop()) {
invokeDeregisterNow(ctx, promise);
} else {
safeExecuteOutbound(new OneTimeTask() {
@Override
public void run() {
invokeDeregisterNow(ctx, promise);
}
}, promise);
}
}
@Override
public void invokeRead(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeReadNow(ctx);
} else {
Runnable task;
if (ctx instanceof AbstractChannelHandlerContext) {
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
task = dctx.invokeReadTask;
if (task == null) {
dctx.invokeReadTask = task = new Runnable() {
@Override
public void run() {
invokeReadNow(ctx);
}
};
}
} else {
task = new OneTimeTask() {
@Override
public void run() {
invokeReadNow(ctx);
}
};
}
executor.execute(task);
}
}
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (!validatePromise(ctx, promise, true)) {
ReferenceCountUtil.release(msg);
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
if (executor.inEventLoop()) {
invokeWriteNow(ctx, msg, promise);
} else {
safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);
}
}
@Override
public void invokeFlush(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeFlushNow(ctx);
} else {
Runnable task;
if (ctx instanceof AbstractChannelHandlerContext) {
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
task = dctx.invokeFlushTask;
if (task == null) {
dctx.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
invokeFlushNow(ctx);
}
};
}
} else {
task = new OneTimeTask() {
@Override
public void run() {
invokeFlushNow(ctx);
}
};
}
executor.execute(task);
}
}
private void safeExecuteInbound(Runnable task, Object msg) {
boolean success = false;
try {
executor.execute(task);
success = true;
} finally {
if (!success) {
ReferenceCountUtil.release(msg);
}
}
}
private void safeExecuteOutbound(Runnable task, ChannelPromise promise) {
try {
executor.execute(task);
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {
try {
executor.execute(task);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
static final class WriteTask extends RecyclableMpscLinkedQueueNode<SingleThreadEventLoop.NonWakeupRunnable>
implements SingleThreadEventLoop.NonWakeupRunnable {
private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
// Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
private ChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size;
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override
protected WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
};
private static WriteTask newInstance(
ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
WriteTask task = RECYCLER.get();
task.ctx = ctx;
task.msg = msg;
task.promise = promise;
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
task.size = ((AbstractChannel) ctx.channel()).estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
buffer.incrementPendingOutboundBytes(task.size);
} else {
task.size = 0;
}
} else {
task.size = 0;
}
return task;
}
private WriteTask(Recycler.Handle<WriteTask> handle) {
super(handle);
}
@Override
public void run() {
try {
ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
invokeWriteNow(ctx, msg, promise);
} finally {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
}
}
@Override
public SingleThreadEventLoop.NonWakeupRunnable value() {
return this;
}
}
}

View File

@ -65,10 +65,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
/**
* @see #findInvoker(EventExecutorGroup)
*/
private Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
private Map<EventExecutorGroup, EventExecutor> childExecutors;
/**
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
@ -86,7 +83,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
*/
private boolean registered;
DefaultChannelPipeline(AbstractChannel channel) {
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
@ -105,6 +102,29 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return touch ? ReferenceCountUtil.touch(msg, next) : msg;
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
@Override
public Channel channel() {
return channel;
@ -112,33 +132,24 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, null, name, handler);
return addFirst(null, name, handler);
}
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
return addFirst(group, null, name, handler);
}
@Override
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
return addFirst(null, invoker, name, handler);
}
private ChannelPipeline addFirst(
EventExecutorGroup group, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
final EventExecutor executor;
final boolean inEventLoop;
synchronized (this) {
if (name == null) {
name = generateName(handler);
} else {
checkDuplicateName(name);
}
checkMultiplicity(handler);
if (group != null) {
invoker = findInvoker(group);
}
newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
executor = executorSafe(invoker);
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
@ -180,35 +191,24 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, null, name, handler);
return addLast(null, name, handler);
}
@Override
public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
return addLast(group, null, name, handler);
}
@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
return addLast(null, invoker, name, handler);
}
private ChannelPipeline addLast(EventExecutorGroup group, ChannelHandlerInvoker invoker,
String name, ChannelHandler handler) {
assertGroupAndInvoker(group, invoker);
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final boolean inEventLoop;
synchronized (this) {
if (name == null) {
name = generateName(handler);
} else {
checkDuplicateName(name);
}
checkMultiplicity(handler);
if (group != null) {
invoker = findInvoker(group);
}
newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
executor = executorSafe(invoker);
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
@ -249,25 +249,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
return addBefore(null, null, baseName, name, handler);
return addBefore(null, baseName, name, handler);
}
@Override
public ChannelPipeline addBefore(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addBefore(group, null, baseName, name, handler);
}
@Override
public ChannelPipeline addBefore(
ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
return addBefore(null, invoker, baseName, name, handler);
}
private ChannelPipeline addBefore(EventExecutorGroup group,
ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
assertGroupAndInvoker(group, invoker);
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
@ -275,13 +262,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
synchronized (this) {
checkMultiplicity(handler);
ctx = getContextOrDie(baseName);
if (group != null) {
invoker = findInvoker(group);
if (name == null) {
name = generateName(handler);
} else {
checkDuplicateName(name);
}
newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
executor = executorSafe(invoker);
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
@ -323,25 +311,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
return addAfter(null, null, baseName, name, handler);
return addAfter(null, baseName, name, handler);
}
@Override
public ChannelPipeline addAfter(
EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
return addAfter(group, null, baseName, name, handler);
}
@Override
public ChannelPipeline addAfter(
ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
return addAfter(null, invoker, baseName, name, handler);
}
private ChannelPipeline addAfter(EventExecutorGroup group,
ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
assertGroupAndInvoker(group, invoker);
EventExecutorGroup group, String baseName, final String name, ChannelHandler handler) {
final EventExecutor executor;
final AbstractChannelHandlerContext newCtx;
final AbstractChannelHandlerContext ctx;
@ -350,13 +325,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
synchronized (this) {
checkMultiplicity(handler);
ctx = getContextOrDie(baseName);
checkDuplicateName(name);
if (group != null) {
invoker = findInvoker(group);
}
newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
executor = executorSafe(invoker);
newCtx = newContext(group, name, handler);
executor = executorSafe(newCtx.executor);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
@ -396,11 +368,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addFirst(ChannelHandler... handlers) {
return addFirst((ChannelHandlerInvoker) null, handlers);
return addFirst(null, handlers);
}
@Override
public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
public ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
@ -417,31 +389,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i];
addFirst(group, null, h);
}
return this;
}
@Override
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
if (handlers.length == 0 || handlers[0] == null) {
return this;
}
int size;
for (size = 1; size < handlers.length; size ++) {
if (handlers[size] == null) {
break;
}
}
for (int i = size - 1; i >= 0; i --) {
ChannelHandler h = handlers[i];
addFirst(invoker, null, h);
addFirst(executor, generateName(h), h);
}
return this;
@ -449,11 +397,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast((ChannelHandlerInvoker) null, handlers);
return addLast(null, handlers);
}
@Override
public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
@ -462,55 +410,12 @@ final class DefaultChannelPipeline implements ChannelPipeline {
if (h == null) {
break;
}
addLast(group, null, h);
addLast(executor, generateName(h), h);
}
return this;
}
@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(invoker, null, h);
}
return this;
}
private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
if (group == null) {
return null;
}
// Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker.
Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
if (childInvokers == null) {
childInvokers = this.childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>(4);
}
// Pick one of the child executors and remember its invoker
// so that the same invoker is used to fire events for the same channel.
ChannelHandlerInvoker invoker = childInvokers.get(group);
if (invoker == null) {
EventExecutor executor = group.next();
if (executor instanceof EventLoop) {
invoker = ((EventLoop) executor).asInvoker();
} else {
invoker = new DefaultChannelHandlerInvoker(executor);
}
childInvokers.put(group, invoker);
}
return invoker;
}
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
@ -562,7 +467,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final EventExecutor executor;
final boolean inEventLoop;
synchronized (this) {
executor = executorSafe(ctx.invoker);
executor = executorSafe(ctx.executor);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
@ -643,15 +548,17 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final boolean inEventLoop;
synchronized (this) {
checkMultiplicity(newHandler);
if (newName == null) {
newName = ctx.name();
} else if (!ctx.name().equals(newName)) {
newName = filterName(newName, newHandler);
newName = generateName(newHandler);
} else {
boolean sameName = ctx.name().equals(newName);
if (!sameName) {
checkDuplicateName(newName);
}
}
newCtx = new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
executor = executorSafe(ctx.invoker);
newCtx = newContext(ctx.executor, newName, newHandler);
executor = executorSafe(ctx.executor);
// If the executor is null it means that the channel was not registered on an eventloop yet.
// In this case we replace the context in the pipeline
@ -1160,7 +1067,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
public ChannelFuture deregister(final ChannelPromise promise) {
return tail.deregister(promise);
}
@ -1215,16 +1122,10 @@ final class DefaultChannelPipeline implements ChannelPipeline {
return voidPromise;
}
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
if (context0(name) == null) {
return name;
}
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
private AbstractChannelHandlerContext context0(String name) {
@ -1310,18 +1211,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
}
private EventExecutor executorSafe(ChannelHandlerInvoker invoker) {
if (invoker == null) {
private EventExecutor executorSafe(EventExecutor eventExecutor) {
if (eventExecutor == null) {
// We check for channel().isRegistered and handlerAdded because even if isRegistered() is false we
// can safely access the invoker() if handlerAdded is true. This is because in this case the Channel
// can safely access the eventLoop() if handlerAdded is true. This is because in this case the Channel
// was previously registered and so we can still access the old EventLoop to dispatch things.
return channel.isRegistered() || registered ? channel.eventLoop() : null;
}
return invoker.executor();
}
private static void assertGroupAndInvoker(EventExecutorGroup group, ChannelHandlerInvoker invoker) {
assert group == null || invoker == null : "either group or invoker must be null";
return eventExecutor;
}
// A special catch-all handler that handles both bytes and messages.
@ -1397,7 +1294,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
private static final String HEAD_NAME = generateName0(HeadContext.class);
private final Unsafe unsafe;
protected final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
@ -1410,14 +1307,13 @@ final class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
@ -1464,6 +1360,11 @@ final class DefaultChannelPipeline implements ChannelPipeline {
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
private abstract static class PendingHandlerCallback extends OneTimeTask {

View File

@ -27,10 +27,4 @@ import io.netty.util.concurrent.EventExecutor;
public interface EventLoop extends EventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
/**
* Creates a new default {@link ChannelHandlerInvoker} implementation that uses this {@link EventLoop} to
* invoke event handler methods.
*/
ChannelHandlerInvoker asInvoker();
}

View File

@ -26,8 +26,6 @@ import java.util.concurrent.ThreadFactory;
*/
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
private final ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(this);
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
@ -46,11 +44,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return (EventLoop) super.next();
}
@Override
public ChannelHandlerInvoker asInvoker() {
return invoker;
}
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));

View File

@ -17,24 +17,18 @@ package io.netty.channel.embedded;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractScheduledEventExecutor;
import io.netty.util.concurrent.Future;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import static io.netty.channel.ChannelHandlerInvokerUtil.*;
final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements ChannelHandlerInvoker, EventLoop {
final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
@ -144,101 +138,4 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements
public boolean inEventLoop(Thread thread) {
return true;
}
@Override
public ChannelHandlerInvoker asInvoker() {
return this;
}
@Override
public EventExecutor executor() {
return this;
}
@Override
public void invokeChannelRegistered(ChannelHandlerContext ctx) {
invokeChannelRegisteredNow(ctx);
}
@Override
public void invokeChannelUnregistered(ChannelHandlerContext ctx) {
invokeChannelUnregisteredNow(ctx);
}
@Override
public void invokeChannelActive(ChannelHandlerContext ctx) {
invokeChannelActiveNow(ctx);
}
@Override
public void invokeChannelInactive(ChannelHandlerContext ctx) {
invokeChannelInactiveNow(ctx);
}
@Override
public void invokeExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
invokeExceptionCaughtNow(ctx, cause);
}
@Override
public void invokeUserEventTriggered(ChannelHandlerContext ctx, Object event) {
invokeUserEventTriggeredNow(ctx, event);
}
@Override
public void invokeChannelRead(ChannelHandlerContext ctx, Object msg) {
invokeChannelReadNow(ctx, msg);
}
@Override
public void invokeChannelReadComplete(ChannelHandlerContext ctx) {
invokeChannelReadCompleteNow(ctx);
}
@Override
public void invokeChannelWritabilityChanged(ChannelHandlerContext ctx) {
invokeChannelWritabilityChangedNow(ctx);
}
@Override
public void invokeBind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
invokeBindNow(ctx, localAddress, promise);
}
@Override
public void invokeConnect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
@Override
public void invokeDisconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeDisconnectNow(ctx, promise);
}
@Override
public void invokeClose(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeCloseNow(ctx, promise);
}
@Override
public void invokeDeregister(ChannelHandlerContext ctx, ChannelPromise promise) {
invokeDeregisterNow(ctx, promise);
}
@Override
public void invokeRead(ChannelHandlerContext ctx) {
invokeReadNow(ctx);
}
@Override
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
invokeWriteNow(ctx, msg, promise);
}
@Override
public void invokeFlush(ChannelHandlerContext ctx) {
invokeFlushNow(ctx);
}
}

View File

@ -32,8 +32,6 @@ public class AbstractChannelTest {
EventLoop eventLoop = createNiceMock(EventLoop.class);
// This allows us to have a single-threaded test
expect(eventLoop.inEventLoop()).andReturn(true).anyTimes();
ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(eventLoop);
expect(eventLoop.asInvoker()).andReturn(invoker).anyTimes();
TestChannel channel = new TestChannel();
ChannelInboundHandler handler = createMock(ChannelInboundHandler.class);
@ -65,8 +63,6 @@ public class AbstractChannelTest {
return null;
}
}).once();
ChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(eventLoop);
expect(eventLoop.asInvoker()).andReturn(invoker).anyTimes();
final TestChannel channel = new TestChannel();
ChannelInboundHandler handler = createMock(ChannelInboundHandler.class);

View File

@ -363,11 +363,11 @@ public class CombinedChannelDuplexHandlerTest {
ChannelPipeline pipeline = ch.pipeline();
ChannelPromise promise = ch.newPromise();
pipeline.connect(null, null, promise);
pipeline.connect(new InetSocketAddress(0), null, promise);
promise.syncUninterruptibly();
promise = ch.newPromise();
pipeline.bind(null, promise);
pipeline.bind(new InetSocketAddress(0), promise);
promise.syncUninterruptibly();
promise = ch.newPromise();

View File

@ -1,67 +0,0 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.*;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DefaultChannelHandlerInvokerTest {
@Mock
private ReferenceCounted msg;
@Mock
private ChannelHandlerContext ctx;
@Mock
private ChannelPromise promise;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void writeWithInvalidPromiseStillReleasesMessage() {
when(promise.isDone()).thenReturn(true);
DefaultChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(ImmediateEventExecutor.INSTANCE);
try {
invoker.invokeWrite(ctx, msg, promise);
} catch (IllegalArgumentException e) {
verify(msg).release();
return;
}
fail();
}
@Test
public void writeWithNullPromiseStillReleasesMessage() {
when(promise.isDone()).thenReturn(true);
DefaultChannelHandlerInvoker invoker = new DefaultChannelHandlerInvoker(ImmediateEventExecutor.INSTANCE);
try {
invoker.invokeWrite(ctx, msg, null);
} catch (NullPointerException e) {
verify(msg).release();
return;
}
fail();
}
}