From 758b7dc793dd771df81dd58d5932302f3c96be78 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 7 Jun 2012 15:40:00 +0200 Subject: [PATCH] Make sure calling ExecutionHandler.releaseExternalResource() does not lead to a dead-lock when calling from a ChannelEventRunnable. See #200 --- .../ChannelDownstreamEventRunnable.java | 11 +- .../execution/ChannelEventRunnable.java | 19 ++- .../ChannelUpstreamEventRunnable.java | 7 +- .../handler/execution/ExecutionHandler.java | 4 +- .../execution/ExecutionHandlerTest.java | 136 ++++++++++++++++++ 5 files changed, 167 insertions(+), 10 deletions(-) create mode 100644 src/test/java/org/jboss/netty/handler/execution/ExecutionHandlerTest.java diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java b/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java index 6b0f94cd53..59515c070f 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelDownstreamEventRunnable.java @@ -16,6 +16,8 @@ package org.jboss.netty.handler.execution; +import java.util.concurrent.Executor; + import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; @@ -24,14 +26,15 @@ import org.jboss.netty.channel.ChannelHandlerContext; */ public class ChannelDownstreamEventRunnable extends ChannelEventRunnable { - public ChannelDownstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) { - super(ctx, e); + public ChannelDownstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) { + super(ctx, e, executor); } /** * Send the {@link ChannelEvent} downstream */ - public void run() { - ctx.sendDownstream(e); + @Override + protected void doRun() { + ctx.sendDownstream(e); } } diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java index 8361ca4667..84235f631c 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java @@ -15,23 +15,28 @@ */ package org.jboss.netty.handler.execution; +import java.util.concurrent.Executor; + import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.util.EstimatableObjectWrapper; +import org.jboss.netty.util.internal.DeadLockProofWorker; public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper { protected final ChannelHandlerContext ctx; protected final ChannelEvent e; int estimatedSize; + private Executor executor; /** * Creates a {@link Runnable} which sends the specified {@link ChannelEvent} * upstream via the specified {@link ChannelHandlerContext}. */ - public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) { + public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) { this.ctx = ctx; this.e = e; + this.executor = executor; } /** @@ -52,4 +57,16 @@ public abstract class ChannelEventRunnable implements Runnable, EstimatableObjec public Object unwrap() { return e; } + + public final void run() { + try { + DeadLockProofWorker.PARENT.set(executor); + doRun(); + } finally { + DeadLockProofWorker.PARENT.remove(); + } + + } + + protected abstract void doRun(); } diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnable.java b/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnable.java index ba8433cdb1..caac49aa31 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnable.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelUpstreamEventRunnable.java @@ -32,15 +32,16 @@ public class ChannelUpstreamEventRunnable extends ChannelEventRunnable { * Creates a {@link Runnable} which sends the specified {@link ChannelEvent} * upstream via the specified {@link ChannelHandlerContext}. */ - public ChannelUpstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) { - super(ctx, e); + public ChannelUpstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) { + super(ctx, e, executor); } /** * Sends the event upstream. */ - public void run() { + @Override + protected void doRun() { ctx.sendUpstream(e); } } diff --git a/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java b/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java index d65a05e4ef..dde16c540d 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java +++ b/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java @@ -169,7 +169,7 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre public void handleUpstream( ChannelHandlerContext context, ChannelEvent e) throws Exception { if (handleUpstream) { - executor.execute(new ChannelUpstreamEventRunnable(context, e)); + executor.execute(new ChannelUpstreamEventRunnable(context, e, executor)); } else { context.sendUpstream(e); } @@ -180,7 +180,7 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre // check if the read was suspend if (!handleReadSuspend(ctx, e)) { if (handleDownstream) { - executor.execute(new ChannelDownstreamEventRunnable(ctx, e)); + executor.execute(new ChannelDownstreamEventRunnable(ctx, e, executor)); } else { ctx.sendDownstream(e); } diff --git a/src/test/java/org/jboss/netty/handler/execution/ExecutionHandlerTest.java b/src/test/java/org/jboss/netty/handler/execution/ExecutionHandlerTest.java new file mode 100644 index 0000000000..525af9f587 --- /dev/null +++ b/src/test/java/org/jboss/netty/handler/execution/ExecutionHandlerTest.java @@ -0,0 +1,136 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.execution; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.assertTrue; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.DefaultChannelFuture; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +public class ExecutionHandlerTest { + + @Test + public void testReleaseExternalResourceViaUpstreamEvent() throws Exception { + + Channel channel = createMock(Channel.class); + expect(channel.isOpen()).andReturn(true).anyTimes(); + ChannelEvent event = createMock(ChannelEvent.class); + expect(event.getChannel()).andReturn(channel).anyTimes(); + expect(event.getFuture()).andReturn(new DefaultChannelFuture(channel,false)).anyTimes(); + replay(channel, event); + + final CountDownLatch latch = new CountDownLatch(1); + + OrderedMemoryAwareThreadPoolExecutor executor = new OrderedMemoryAwareThreadPoolExecutor(10, 0L, 0L); + final ExecutionHandler handler = new ExecutionHandler(executor, true, true); + handler.handleUpstream(new TestChannelHandlerContext(channel, handler, latch), event); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testReleaseExternalResourceViaDownstreamEvent() throws Exception { + Channel channel = createMock(Channel.class); + expect(channel.getCloseFuture()).andReturn(new DefaultChannelFuture(channel, false)); + ChannelEvent event = createMock(ChannelEvent.class); + expect(event.getChannel()).andReturn(channel).anyTimes(); + expect(event.getFuture()).andReturn(new DefaultChannelFuture(channel,false)).anyTimes(); + + + replay(channel, event); + + final CountDownLatch latch = new CountDownLatch(1); + + OrderedDownstreamThreadPoolExecutor executor = new OrderedDownstreamThreadPoolExecutor(10); + final ExecutionHandler handler = new ExecutionHandler(executor, true, true); + handler.handleDownstream(new TestChannelHandlerContext(channel, handler, latch), event); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + private static final class TestChannelHandlerContext implements ChannelHandlerContext { + + private final CountDownLatch latch; + private final ExecutionHandler handler; + private Channel channel; + + public TestChannelHandlerContext(Channel channel, ExecutionHandler handler, CountDownLatch latch) { + this.latch = latch; + this.handler = handler; + this.channel = channel; + } + + + public Channel getChannel() { + return channel; + } + + public ChannelPipeline getPipeline() { + return null; + } + + public String getName() { + return handler.getClass().getName(); + } + + public ChannelHandler getHandler() { + return handler; + } + + public boolean canHandleUpstream() { + return true; + } + + public boolean canHandleDownstream() { + return true; + } + + public void sendUpstream(ChannelEvent e) { + try { + handler.releaseExternalResources(); + } catch (IllegalStateException ex) { + latch.countDown(); + } + } + + public void sendDownstream(ChannelEvent e) { + try { + handler.releaseExternalResources(); + } catch (IllegalStateException ex) { + latch.countDown(); + } + } + + public Object getAttachment() { + return null; + } + + public void setAttachment(Object attachment) { + + } + + } +}