diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index 06b3ba087b..0968f3322e 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -114,9 +114,11 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + assert ctx.executor().inEventLoop(); WriteTimeoutTask task = lastTask; lastTask = null; while (task != null) { + assert task.ctx.executor().inEventLoop(); task.scheduledFuture.cancel(false); WriteTimeoutTask prev = task.prev; task.prev = null; @@ -139,6 +141,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { } private void addWriteTimeoutTask(WriteTimeoutTask task) { + assert task.ctx.executor().inEventLoop(); if (lastTask != null) { lastTask.next = task; task.prev = lastTask; @@ -147,6 +150,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { } private void removeWriteTimeoutTask(WriteTimeoutTask task) { + assert task.ctx.executor().inEventLoop(); if (task == lastTask) { // task is the tail of list assert task.next == null; @@ -214,7 +218,19 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { public void operationComplete(ChannelFuture future) throws Exception { // scheduledFuture has already be set when reaching here scheduledFuture.cancel(false); - removeWriteTimeoutTask(this); + + // Check if its safe to modify the "doubly-linked-list" that we maintain. If its not we will schedule the + // modification so its picked up by the executor.. + if (ctx.executor().inEventLoop()) { + removeWriteTimeoutTask(this); + } else { + // So let's just pass outself to the executor which will then take care of remove this task + // from the doubly-linked list. Schedule ourself is fine as the promise itself is done. + // + // This fixes https://github.com/netty/netty/issues/11053 + assert promise.isDone(); + ctx.executor().execute(this); + } } } } diff --git a/handler/src/test/java/io/netty/handler/timeout/WriteTimeoutHandlerTest.java b/handler/src/test/java/io/netty/handler/timeout/WriteTimeoutHandlerTest.java new file mode 100644 index 0000000000..88378dffa7 --- /dev/null +++ b/handler/src/test/java/io/netty/handler/timeout/WriteTimeoutHandlerTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.timeout; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertTrue; + +public class WriteTimeoutHandlerTest { + + @Test + public void testPromiseUseDifferentExecutor() throws Exception { + EventExecutorGroup group1 = new DefaultEventExecutorGroup(1); + EventExecutorGroup group2 = new DefaultEventExecutorGroup(1); + EmbeddedChannel channel = new EmbeddedChannel(false, false); + try { + channel.pipeline().addLast(group1, new WriteTimeoutHandler(10000)); + final CountDownLatch latch = new CountDownLatch(1); + channel.pipeline().addLast(group2, new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.writeAndFlush("something").addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + latch.countDown(); + } + }); + } + }); + + channel.register(); + latch.await(); + assertTrue(channel.finishAndReleaseAll()); + } finally { + group1.shutdownGracefully(); + group2.shutdownGracefully(); + } + } +}