Revert "Protect ChannelHandler from reentrancee issues (#9358)"
This reverts commit 48634f14665c864a47c6552f59fbce8d009780be.
This commit is contained in:
parent
745814b382
commit
4a8476af67
@ -73,8 +73,6 @@ public class WebSocketProtocolHandlerTest {
|
||||
// When
|
||||
channel.read();
|
||||
|
||||
channel.runPendingTasks();
|
||||
|
||||
// Then - pong frame was written to the outbound
|
||||
PongWebSocketFrame response1 = channel.readOutbound();
|
||||
assertEquals(text1, response1.content().toString(UTF_8));
|
||||
|
@ -417,8 +417,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
||||
Http2StreamChannel childChannel = newOutboundStream(handler);
|
||||
assertTrue(childChannel.isActive());
|
||||
|
||||
parentChannel.runPendingTasks();
|
||||
|
||||
childChannel.close();
|
||||
verify(frameWriter).writeRstStream(eqCodecCtx(),
|
||||
eqStreamId(childChannel), eq(Http2Error.CANCEL.code()), anyChannelPromise());
|
||||
@ -450,7 +448,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
||||
ctx.fireChannelActive();
|
||||
}
|
||||
});
|
||||
parentChannel.runPendingTasks();
|
||||
|
||||
assertFalse(childChannel.isActive());
|
||||
|
||||
@ -529,8 +526,6 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
|
||||
Http2Headers headers = new DefaultHttp2Headers().scheme("https").method("GET").path("/foo.txt");
|
||||
childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers));
|
||||
|
||||
parentChannel.runPendingTasks();
|
||||
|
||||
// Read from the child channel
|
||||
frameInboundWriter.writeInboundHeaders(childChannel.stream().id(), headers, 0, false);
|
||||
|
||||
|
@ -357,18 +357,18 @@ public class SocketHalfClosedTest extends AbstractSocketTest {
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
ByteBuf buf = ctx.alloc().buffer(expectedBytes);
|
||||
buf.writerIndex(buf.writerIndex() + expectedBytes);
|
||||
ctx.writeAndFlush(buf.retainedDuplicate()).addListener((ChannelFutureListener) f -> {
|
||||
// We wait here to ensure that we write before we have a chance to process the outbound
|
||||
// shutdown event.
|
||||
followerCloseLatch.await();
|
||||
ctx.writeAndFlush(buf.retainedDuplicate());
|
||||
|
||||
// This write should fail, but we should still be allowed to read the peer's data
|
||||
ctx.writeAndFlush(buf).addListener((ChannelFutureListener) future -> {
|
||||
if (future.cause() == null) {
|
||||
causeRef.set(new IllegalStateException("second write should have failed!"));
|
||||
doneLatch.countDown();
|
||||
}
|
||||
});
|
||||
// We wait here to ensure that we write before we have a chance to process the outbound
|
||||
// shutdown event.
|
||||
followerCloseLatch.await();
|
||||
|
||||
// This write should fail, but we should still be allowed to read the peer's data
|
||||
ctx.writeAndFlush(buf).addListener((ChannelFutureListener) future -> {
|
||||
if (future.cause() == null) {
|
||||
causeRef.set(new IllegalStateException("second write should have failed!"));
|
||||
doneLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -25,9 +25,9 @@ import io.netty.util.ResourceLeakHint;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.internal.ObjectPool;
|
||||
import io.netty.util.internal.PromiseNotificationUtil;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.ThrowableUtil;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -68,10 +68,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
DefaultChannelHandlerContext prev;
|
||||
private int handlerState = INIT;
|
||||
|
||||
// Keeps track of processing different events
|
||||
private short outboundOperations;
|
||||
private short inboundOperations;
|
||||
|
||||
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name,
|
||||
ChannelHandler handler) {
|
||||
this.name = requireNonNull(name, "name");
|
||||
@ -118,54 +114,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
return name;
|
||||
}
|
||||
|
||||
private boolean isProcessInboundDirectly() {
|
||||
assert inboundOperations >= 0;
|
||||
return inboundOperations == 0;
|
||||
}
|
||||
|
||||
private boolean isProcessOutboundDirectly() {
|
||||
assert outboundOperations >= 0;
|
||||
return outboundOperations == 0;
|
||||
}
|
||||
|
||||
private void incrementOutboundOperations() {
|
||||
assert outboundOperations >= 0;
|
||||
outboundOperations++;
|
||||
}
|
||||
|
||||
private void decrementOutboundOperations() {
|
||||
assert outboundOperations > 0;
|
||||
outboundOperations--;
|
||||
}
|
||||
|
||||
private void incrementInboundOperations() {
|
||||
assert inboundOperations >= 0;
|
||||
inboundOperations++;
|
||||
}
|
||||
|
||||
private void decrementInboundOperations() {
|
||||
assert inboundOperations > 0;
|
||||
inboundOperations--;
|
||||
}
|
||||
|
||||
private static void executeInboundReentrance(DefaultChannelHandlerContext context, Runnable task) {
|
||||
context.incrementInboundOperations();
|
||||
try {
|
||||
context.executor().execute(task);
|
||||
} catch (Throwable cause) {
|
||||
context.decrementInboundOperations();
|
||||
throw cause;
|
||||
}
|
||||
}
|
||||
|
||||
private static void executeOutboundReentrance(
|
||||
DefaultChannelHandlerContext context, Runnable task, ChannelPromise promise, Object msg) {
|
||||
context.incrementOutboundOperations();
|
||||
if (!safeExecute(context.executor(), task, promise, msg)) {
|
||||
context.decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelRegistered() {
|
||||
EventExecutor executor = executor();
|
||||
@ -178,26 +126,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeChannelRegistered() {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_REGISTERED);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeChannelRegistered();
|
||||
} else {
|
||||
executeInboundReentrance(context, context::invokeChannelRegistered0);
|
||||
}
|
||||
findContextInbound(MASK_CHANNEL_REGISTERED).invokeChannelRegistered();
|
||||
}
|
||||
|
||||
void invokeChannelRegistered() {
|
||||
incrementInboundOperations();
|
||||
invokeChannelRegistered0();
|
||||
}
|
||||
|
||||
private void invokeChannelRegistered0() {
|
||||
try {
|
||||
handler().channelRegistered(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,26 +149,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeChannelUnregistered() {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_UNREGISTERED);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeChannelUnregistered();
|
||||
} else {
|
||||
executeInboundReentrance(context, context::invokeChannelUnregistered0);
|
||||
}
|
||||
findContextInbound(MASK_CHANNEL_UNREGISTERED).invokeChannelUnregistered();
|
||||
}
|
||||
|
||||
void invokeChannelUnregistered() {
|
||||
incrementInboundOperations();
|
||||
invokeChannelUnregistered0();
|
||||
}
|
||||
|
||||
private void invokeChannelUnregistered0() {
|
||||
try {
|
||||
handler().channelUnregistered(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -248,26 +172,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeChannelActive() {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_ACTIVE);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeChannelActive();
|
||||
} else {
|
||||
executeInboundReentrance(context, context::invokeChannelActive0);
|
||||
}
|
||||
findContextInbound(MASK_CHANNEL_ACTIVE).invokeChannelActive();
|
||||
}
|
||||
|
||||
void invokeChannelActive() {
|
||||
incrementInboundOperations();
|
||||
invokeChannelActive0();
|
||||
}
|
||||
|
||||
private void invokeChannelActive0() {
|
||||
try {
|
||||
handler().channelActive(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -283,26 +195,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeChannelInactive() {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_INACTIVE);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeChannelInactive();
|
||||
} else {
|
||||
executeInboundReentrance(context, context::invokeChannelInactive0);
|
||||
}
|
||||
findContextInbound(MASK_CHANNEL_INACTIVE).invokeChannelInactive();
|
||||
}
|
||||
|
||||
void invokeChannelInactive() {
|
||||
incrementInboundOperations();
|
||||
invokeChannelInactive0();
|
||||
}
|
||||
|
||||
private void invokeChannelInactive0() {
|
||||
try {
|
||||
handler().channelInactive(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -326,20 +226,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeExceptionCaught(Throwable cause) {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeExceptionCaught(cause);
|
||||
} else {
|
||||
executeInboundReentrance(context, () -> context.invokeExceptionCaught0(cause));
|
||||
}
|
||||
findContextInbound(MASK_EXCEPTION_CAUGHT).invokeExceptionCaught(cause);
|
||||
}
|
||||
|
||||
void invokeExceptionCaught(final Throwable cause) {
|
||||
incrementInboundOperations();
|
||||
invokeExceptionCaught0(cause);
|
||||
}
|
||||
|
||||
private void invokeExceptionCaught0(final Throwable cause) {
|
||||
try {
|
||||
handler().exceptionCaught(this, cause);
|
||||
} catch (Throwable error) {
|
||||
@ -355,8 +245,6 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
"was thrown by a user handler's exceptionCaught() " +
|
||||
"method while handling the following exception:", error, cause);
|
||||
}
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -373,26 +261,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeUserEventTriggered(Object event) {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_USER_EVENT_TRIGGERED);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeUserEventTriggered(event);
|
||||
} else {
|
||||
executeInboundReentrance(context, () -> context.invokeUserEventTriggered0(event));
|
||||
}
|
||||
findContextInbound(MASK_USER_EVENT_TRIGGERED).invokeUserEventTriggered(event);
|
||||
}
|
||||
|
||||
void invokeUserEventTriggered(Object event) {
|
||||
incrementInboundOperations();
|
||||
invokeUserEventTriggered0(event);
|
||||
}
|
||||
|
||||
private void invokeUserEventTriggered0(Object event) {
|
||||
try {
|
||||
handler().userEventTriggered(this, event);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -414,27 +290,15 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeChannelRead(Object msg) {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeChannelRead(msg);
|
||||
} else {
|
||||
executeInboundReentrance(context, () -> context.invokeChannelRead0(msg));
|
||||
}
|
||||
findContextInbound(MASK_CHANNEL_READ).invokeChannelRead(msg);
|
||||
}
|
||||
|
||||
void invokeChannelRead(Object msg) {
|
||||
incrementInboundOperations();
|
||||
invokeChannelRead0(msg);
|
||||
}
|
||||
|
||||
private void invokeChannelRead0(Object msg) {
|
||||
final Object m = pipeline.touch(requireNonNull(msg, "msg"), this);
|
||||
try {
|
||||
handler().channelRead(this, m);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -451,26 +315,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeChannelReadComplete() {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeChannelReadComplete();
|
||||
} else {
|
||||
executeInboundReentrance(context, context::invokeChannelReadComplete0);
|
||||
}
|
||||
findContextInbound(MASK_CHANNEL_READ_COMPLETE).invokeChannelReadComplete();
|
||||
}
|
||||
|
||||
void invokeChannelReadComplete() {
|
||||
incrementInboundOperations();
|
||||
invokeChannelReadComplete0();
|
||||
}
|
||||
|
||||
private void invokeChannelReadComplete0() {
|
||||
try {
|
||||
handler().channelReadComplete(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -487,26 +339,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeChannelWritabilityChanged() {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeChannelWritabilityChanged();
|
||||
} else {
|
||||
executeInboundReentrance(context, context::invokeChannelWritabilityChanged0);
|
||||
}
|
||||
findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED).invokeChannelWritabilityChanged();
|
||||
}
|
||||
|
||||
void invokeChannelWritabilityChanged() {
|
||||
incrementInboundOperations();
|
||||
invokeChannelWritabilityChanged0();
|
||||
}
|
||||
|
||||
private void invokeChannelWritabilityChanged0() {
|
||||
try {
|
||||
handler().channelWritabilityChanged(this);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
decrementInboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -563,26 +403,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeBind(SocketAddress localAddress, ChannelPromise promise) {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_BIND);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeBind(localAddress, promise);
|
||||
} else {
|
||||
executeOutboundReentrance(context, () -> context.invokeBind0(localAddress, promise), promise, null);
|
||||
}
|
||||
findContextOutbound(MASK_BIND).invokeBind(localAddress, promise);
|
||||
}
|
||||
|
||||
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
|
||||
incrementOutboundOperations();
|
||||
invokeBind0(localAddress, promise);
|
||||
}
|
||||
|
||||
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
|
||||
try {
|
||||
handler().bind(this, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -610,27 +438,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_CONNECT);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeConnect(remoteAddress, localAddress, promise);
|
||||
} else {
|
||||
executeOutboundReentrance(context, () -> context.invokeConnect0(remoteAddress, localAddress, promise),
|
||||
promise, null);
|
||||
}
|
||||
findContextOutbound(MASK_CONNECT).invokeConnect(remoteAddress, localAddress, promise);
|
||||
}
|
||||
|
||||
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
incrementOutboundOperations();
|
||||
invokeConnect0(remoteAddress, localAddress, promise);
|
||||
}
|
||||
|
||||
private void invokeConnect0(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
try {
|
||||
handler().connect(this, remoteAddress, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -657,26 +472,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeDisconnect(ChannelPromise promise) {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_DISCONNECT);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeDisconnect(promise);
|
||||
} else {
|
||||
executeOutboundReentrance(context, () -> context.invokeDisconnect0(promise), promise, null);
|
||||
}
|
||||
findContextOutbound(MASK_DISCONNECT).invokeDisconnect(promise);
|
||||
}
|
||||
|
||||
private void invokeDisconnect(ChannelPromise promise) {
|
||||
incrementOutboundOperations();
|
||||
invokeDisconnect0(promise);
|
||||
}
|
||||
|
||||
private void invokeDisconnect0(ChannelPromise promise) {
|
||||
try {
|
||||
handler().disconnect(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -697,26 +500,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeClose(ChannelPromise promise) {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_CLOSE);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeClose(promise);
|
||||
} else {
|
||||
executeOutboundReentrance(context, () -> context.invokeClose0(promise), promise, null);
|
||||
}
|
||||
findContextOutbound(MASK_CLOSE).invokeClose(promise);
|
||||
}
|
||||
|
||||
private void invokeClose(ChannelPromise promise) {
|
||||
incrementOutboundOperations();
|
||||
invokeClose0(promise);
|
||||
}
|
||||
|
||||
private void invokeClose0(ChannelPromise promise) {
|
||||
try {
|
||||
handler().close(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -737,26 +528,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeRegister(ChannelPromise promise) {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_REGISTER);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeRegister(promise);
|
||||
} else {
|
||||
executeOutboundReentrance(context, () -> context.invokeRegister0(promise), promise, null);
|
||||
}
|
||||
findContextOutbound(MASK_REGISTER).invokeRegister(promise);
|
||||
}
|
||||
|
||||
private void invokeRegister(ChannelPromise promise) {
|
||||
incrementOutboundOperations();
|
||||
invokeRegister0(promise);
|
||||
}
|
||||
|
||||
private void invokeRegister0(ChannelPromise promise) {
|
||||
try {
|
||||
handler().register(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -777,26 +556,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeDeregister(ChannelPromise promise) {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_DEREGISTER);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeDeregister(promise);
|
||||
} else {
|
||||
executeOutboundReentrance(context, () -> context.invokeDeregister0(promise), promise, null);
|
||||
}
|
||||
findContextOutbound(MASK_DEREGISTER).invokeDeregister(promise);
|
||||
}
|
||||
|
||||
private void invokeDeregister(ChannelPromise promise) {
|
||||
incrementOutboundOperations();
|
||||
invokeDeregister0(promise);
|
||||
}
|
||||
|
||||
private void invokeDeregister0(ChannelPromise promise) {
|
||||
try {
|
||||
handler().deregister(this, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -813,26 +580,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeRead() {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_READ);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeRead();
|
||||
} else {
|
||||
executeOutboundReentrance(context, context::invokeRead0, null, null);
|
||||
}
|
||||
findContextOutbound(MASK_READ).invokeRead();
|
||||
}
|
||||
|
||||
private void invokeRead() {
|
||||
incrementOutboundOperations();
|
||||
invokeRead0();
|
||||
}
|
||||
|
||||
private void invokeRead0() {
|
||||
try {
|
||||
handler().read(this);
|
||||
} catch (Throwable t) {
|
||||
invokeExceptionCaughtFromOutbound(t);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -840,12 +595,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
if ((executionMask & MASK_EXCEPTION_CAUGHT) != 0) {
|
||||
notifyHandlerException(t);
|
||||
} else {
|
||||
DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT);
|
||||
if (context.isProcessInboundDirectly()) {
|
||||
context.invokeExceptionCaught(t);
|
||||
} else {
|
||||
executeInboundReentrance(context, () -> context.invokeExceptionCaught0(t));
|
||||
}
|
||||
findContextInbound(MASK_EXCEPTION_CAUGHT).notifyHandlerException(t);
|
||||
}
|
||||
}
|
||||
|
||||
@ -862,18 +612,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void invokeWrite(Object msg, ChannelPromise promise) {
|
||||
incrementOutboundOperations();
|
||||
invokeWrite0(msg, promise);
|
||||
}
|
||||
|
||||
private void invokeWrite0(Object msg, ChannelPromise promise) {
|
||||
final Object m = pipeline.touch(msg, this);
|
||||
try {
|
||||
handler().write(this, m, promise);
|
||||
} catch (Throwable t) {
|
||||
notifyOutboundHandlerException(t, promise);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -891,26 +634,14 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
|
||||
private void findAndInvokeFlush() {
|
||||
DefaultChannelHandlerContext context = findContextOutbound(MASK_FLUSH);
|
||||
if (context.isProcessOutboundDirectly()) {
|
||||
context.invokeFlush();
|
||||
} else {
|
||||
executeOutboundReentrance(context, context::invokeFlush0, null, null);
|
||||
}
|
||||
findContextOutbound(MASK_FLUSH).invokeFlush();
|
||||
}
|
||||
|
||||
private void invokeFlush() {
|
||||
incrementOutboundOperations();
|
||||
invokeFlush0();
|
||||
}
|
||||
|
||||
private void invokeFlush0() {
|
||||
try {
|
||||
handler().flush(this);
|
||||
} catch (Throwable t) {
|
||||
invokeExceptionCaughtFromOutbound(t);
|
||||
} finally {
|
||||
decrementOutboundOperations();
|
||||
}
|
||||
}
|
||||
|
||||
@ -920,6 +651,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
|
||||
invokeWrite(msg, promise);
|
||||
invokeFlush();
|
||||
}
|
||||
|
||||
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
||||
requireNonNull(msg, "msg");
|
||||
try {
|
||||
@ -938,19 +674,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
final DefaultChannelHandlerContext next = findContextOutbound(flush ?
|
||||
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
|
||||
if (flush) {
|
||||
if (next.isProcessOutboundDirectly()) {
|
||||
next.invokeWrite(msg, promise);
|
||||
next.invokeFlush();
|
||||
} else {
|
||||
executeOutboundReentrance(next, () -> next.invokeWrite0(msg, promise), promise, msg);
|
||||
executeOutboundReentrance(next, next::invokeFlush0, null, null);
|
||||
}
|
||||
next.invokeWriteAndFlush(msg, promise);
|
||||
} else {
|
||||
if (next.isProcessOutboundDirectly()) {
|
||||
next.invokeWrite(msg, promise);
|
||||
} else {
|
||||
executeOutboundReentrance(next, () -> next.invokeWrite0(msg, promise), promise, msg);
|
||||
}
|
||||
next.invokeWrite(msg, promise);
|
||||
}
|
||||
} else {
|
||||
final AbstractWriteTask task;
|
||||
@ -989,6 +715,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
invokeExceptionCaught(cause);
|
||||
}
|
||||
|
||||
@ -1071,7 +798,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
DefaultChannelHandlerContext ctx = this;
|
||||
do {
|
||||
ctx = ctx.next;
|
||||
} while ((ctx.executionMask & mask) == 0 && ctx.isProcessInboundDirectly());
|
||||
} while ((ctx.executionMask & mask) == 0);
|
||||
return ctx;
|
||||
}
|
||||
|
||||
@ -1079,7 +806,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
DefaultChannelHandlerContext ctx = this;
|
||||
do {
|
||||
ctx = ctx.prev;
|
||||
} while ((ctx.executionMask & mask) == 0 && ctx.isProcessOutboundDirectly());
|
||||
} while ((ctx.executionMask & mask) == 0);
|
||||
return ctx;
|
||||
}
|
||||
|
||||
@ -1144,9 +871,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
|
||||
return true;
|
||||
} catch (Throwable cause) {
|
||||
try {
|
||||
if (promise != null) {
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
promise.setFailure(cause);
|
||||
} finally {
|
||||
if (msg != null) {
|
||||
ReferenceCountUtil.release(msg);
|
||||
|
@ -69,6 +69,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
private final VoidChannelPromise voidPromise;
|
||||
private final boolean touch = ResourceLeakDetector.isEnabled();
|
||||
private final List<DefaultChannelHandlerContext> handlers = new ArrayList<>(4);
|
||||
|
||||
private volatile MessageSizeEstimator.Handle estimatorHandle;
|
||||
|
||||
public DefaultChannelPipeline(Channel channel) {
|
||||
|
@ -49,12 +49,10 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -1372,87 +1370,6 @@ public class DefaultChannelPipelineTest {
|
||||
channel2.close().syncUninterruptibly();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReentranceInbound() throws Exception {
|
||||
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(newLocalChannel());
|
||||
|
||||
BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
|
||||
pipeline.addLast(new ChannelHandler() {
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
ctx.fireChannelRead(1);
|
||||
ctx.fireChannelRead(2);
|
||||
}
|
||||
});
|
||||
pipeline.addLast(new ChannelHandler() {
|
||||
boolean called;
|
||||
@Override
|
||||
public void read(ChannelHandlerContext ctx) {
|
||||
if (!called) {
|
||||
called = true;
|
||||
ctx.fireChannelRead(3);
|
||||
}
|
||||
ctx.read();
|
||||
}
|
||||
});
|
||||
pipeline.addLast(new ChannelHandler() {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
ctx.read();
|
||||
queue.add((Integer) msg);
|
||||
}
|
||||
});
|
||||
|
||||
pipeline.fireChannelActive();
|
||||
|
||||
assertEquals(1, (int) queue.take());
|
||||
assertEquals(3, (int) queue.take());
|
||||
assertEquals(2, (int) queue.take());
|
||||
pipeline.close().syncUninterruptibly();
|
||||
assertNull(queue.poll());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReentranceOutbound() throws Exception {
|
||||
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(newLocalChannel());
|
||||
|
||||
BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
|
||||
pipeline.addLast(new ChannelHandler() {
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||
ctx.fireUserEventTriggered("");
|
||||
queue.add((Integer) msg);
|
||||
}
|
||||
});
|
||||
pipeline.addLast(new ChannelHandler() {
|
||||
boolean called;
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
if (!called) {
|
||||
called = true;
|
||||
ctx.write(3);
|
||||
}
|
||||
ctx.fireUserEventTriggered(evt);
|
||||
}
|
||||
});
|
||||
pipeline.addLast(new ChannelHandler() {
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
ctx.write(1);
|
||||
ctx.write(2);
|
||||
}
|
||||
});
|
||||
|
||||
pipeline.fireChannelActive();
|
||||
|
||||
assertEquals(1, (int) queue.take());
|
||||
assertEquals(3, (int) queue.take());
|
||||
assertEquals(2, (int) queue.take());
|
||||
pipeline.close().syncUninterruptibly();
|
||||
assertNull(queue.poll());
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
|
||||
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);
|
||||
|
@ -21,7 +21,6 @@ import io.netty.channel.LoggingHandler.Event;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
@ -136,39 +135,22 @@ public class ReentrantChannelTest extends BaseChannelTest {
|
||||
assertLog(
|
||||
// Case 1:
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITABILITY: writable=true\n",
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITABILITY: writable=true\n",
|
||||
// Case 2:
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITABILITY: writable=true\n" +
|
||||
"FLUSH\n",
|
||||
// Case 3:
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITABILITY: writable=true\n",
|
||||
// Case 4:
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITABILITY: writable=false\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITABILITY: writable=true\n" +
|
||||
"WRITABILITY: writable=true\n");
|
||||
"FLUSH\n" +
|
||||
"WRITABILITY: writable=true\n" +
|
||||
"WRITABILITY: writable=true\n");
|
||||
}
|
||||
|
||||
@Ignore("The whole test is questionable so ignore for now")
|
||||
@Test
|
||||
public void testWriteFlushPingPong() throws Exception {
|
||||
|
||||
@ -204,41 +186,26 @@ public class ReentrantChannelTest extends BaseChannelTest {
|
||||
ctx.channel().write(createTestBuf(2000));
|
||||
}
|
||||
ctx.flush();
|
||||
if (flushCount == 5) {
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
clientChannel.write(createTestBuf(2000));
|
||||
clientChannel.closeFuture().syncUninterruptibly();
|
||||
clientChannel.writeAndFlush(createTestBuf(2000));
|
||||
clientChannel.close().sync();
|
||||
|
||||
assertLog(
|
||||
// Case 1:
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"CLOSE\n",
|
||||
// Case 2:
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"CLOSE\n");
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"WRITE\n" +
|
||||
"FLUSH\n" +
|
||||
"CLOSE\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
x
Reference in New Issue
Block a user