From 4eb42125a7e007a1359e2b4de2076bc890d52dc0 Mon Sep 17 00:00:00 2001 From: norman Date: Tue, 5 Jun 2012 11:21:44 +0200 Subject: [PATCH] Make sure we can't deadlock even if the ChannelPipeline modification is executed by the EventExecutor --- .../netty/channel/DefaultChannelPipeline.java | 465 +++++++++++------- 1 file changed, 292 insertions(+), 173 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index a8cda8984f..41ccfd8166 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.Future; /** * The default {@link ChannelPipeline} implementation. It is usually created @@ -54,6 +55,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { final Map childExecutors = new IdentityHashMap(); + public DefaultChannelPipeline(Channel channel) { if (channel == null) { throw new NullPointerException("channel"); @@ -80,34 +82,44 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addFirst(EventExecutor executor, final String name, final ChannelHandler handler) { - checkDuplicateName(name); - final DefaultChannelHandlerContext nextCtx = head.next; - final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler); + public ChannelPipeline addFirst(EventExecutor executor, final String name, final ChannelHandler handler) { + try { + Future future; - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addFirst0(name, nextCtx, newCtx); - } else { - try { - newCtx.executor().submit(new Runnable() { + synchronized (this) { + checkDuplicateName(name); + final DefaultChannelHandlerContext nextCtx = head.next; + final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, executor, head, nextCtx, name, handler); + + if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { + addFirst0(name, nextCtx, newCtx); + return this; + } + future = newCtx.executor().submit(new Runnable() { @Override public void run() { - checkDuplicateName(name); - addFirst0(name, nextCtx, newCtx); + synchronized (DefaultChannelPipeline.this) { + checkDuplicateName(name); + addFirst0(name, nextCtx, newCtx); + } + } - }).get(); - } catch (RuntimeException e) { - throw e; - } catch (Error e) { - throw e; - } catch (Throwable t) { - throw new ChannelPipelineException(t); + }); } + // Call Future.get() outside of the synchronized block to prevent dead-lock + future.get(); + return this; + + } catch (RuntimeException e) { + throw e; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new ChannelPipelineException(t); } - return this; } + private void addFirst0(final String name, DefaultChannelHandlerContext nextCtx, DefaultChannelHandlerContext newCtx) { callBeforeAdd(newCtx); @@ -126,32 +138,41 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addLast(EventExecutor executor, final String name, final ChannelHandler handler) { - checkDuplicateName(name); - final DefaultChannelHandlerContext oldTail = tail; - final DefaultChannelHandlerContext newTail = - new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler); + public ChannelPipeline addLast(EventExecutor executor, final String name, final ChannelHandler handler) { + try { + Future future; + synchronized (this) { + checkDuplicateName(name); - if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) { - addLast0(name, oldTail, newTail); - } else { - try { - newTail.executor().submit(new Runnable() { - @Override - public void run() { - checkDuplicateName(name); - addLast0(name, oldTail, newTail); - } - }).get(); - } catch (RuntimeException e) { - throw e; - } catch (Error e) { - throw e; - } catch (Throwable t) { - throw new ChannelPipelineException(t); + final DefaultChannelHandlerContext oldTail = tail; + final DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(this, executor, oldTail, null, name, handler); + + if (!newTail.channel().isRegistered() || newTail.executor().inEventLoop()) { + addLast0(name, oldTail, newTail); + return this; + } else { + future = newTail.executor().submit(new Runnable() { + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + checkDuplicateName(name); + addLast0(name, oldTail, newTail); + } + } + }); + } } + // Call Future.get() outside of synchronized block to prevent dead-lock + future.get(); + } catch (RuntimeException e) { + throw e; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new ChannelPipelineException(t); } + return this; } @@ -171,31 +192,45 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addBefore(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) { - final DefaultChannelHandlerContext ctx = getContextOrDie(baseName); - checkDuplicateName(name); - final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler); + public ChannelPipeline addBefore(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) { + try { + Future future; + + synchronized (this) { + final DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + checkDuplicateName(name); + final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, executor, ctx.prev, ctx, name, handler); - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addBefore0(name, ctx, newCtx); - } else { - try { - newCtx.executor().submit(new Runnable() { - @Override - public void run() { - checkDuplicateName(name); - addBefore0(name, ctx, newCtx); - } - }).get(); - } catch (RuntimeException e) { - throw e; - } catch (Error e) { - throw e; - } catch (Throwable t) { - throw new ChannelPipelineException(t); + if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { + addBefore0(name, ctx, newCtx); + return this; + } else { + future = newCtx.executor().submit(new Runnable() { + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + checkDuplicateName(name); + addBefore0(name, ctx, newCtx); + } + + } + }); + + } } + + // Call Future.get() outside of the synchronized to prevent dead-lock + future.get(); + + + } catch (RuntimeException e) { + throw e; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new ChannelPipelineException(t); } + return this; } @@ -215,36 +250,45 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelPipeline addAfter(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) { - final DefaultChannelHandlerContext ctx = getContextOrDie(baseName); - if (ctx == tail) { - addLast(name, handler); - } else { - checkDuplicateName(name); - final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler); + public ChannelPipeline addAfter(EventExecutor executor, String baseName, final String name, final ChannelHandler handler) { + + try { + Future future; + + synchronized (this) { + final DefaultChannelHandlerContext ctx = getContextOrDie(baseName); + if (ctx == tail) { + return addLast(name, handler); + } + checkDuplicateName(name); + final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, executor, ctx, ctx.next, name, handler); - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - addAfter0(name, ctx, newCtx); - } else { - try { - newCtx.executor().submit(new Runnable() { + if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { + addAfter0(name, ctx, newCtx); + return this; + } else { + future = newCtx.executor().submit(new Runnable() { @Override public void run() { - checkDuplicateName(name); - addAfter0(name, ctx, newCtx); + synchronized (DefaultChannelPipeline.this) { + checkDuplicateName(name); + addAfter0(name, ctx, newCtx); + } } - }).get(); - } catch (RuntimeException e) { - throw e; - } catch (Error e) { - throw e; - } catch (Throwable t) { - throw new ChannelPipelineException(t); + }); } } + future.get(); + return this; + + } catch (RuntimeException e) { + throw e; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new ChannelPipelineException(t); } - return this; + } private void addAfter0(final String name, DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) { @@ -320,48 +364,81 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized void remove(ChannelHandler handler) { + public void remove(ChannelHandler handler) { remove(getContextOrDie(handler)); } @Override - public synchronized ChannelHandler remove(String name) { + public ChannelHandler remove(String name) { return remove(getContextOrDie(name)).handler(); } @Override - public synchronized T remove(Class handlerType) { + public T remove(Class handlerType) { return (T) remove(getContextOrDie(handlerType)).handler(); } private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) { - if (head == tail) { - return null; - } else if (ctx == head) { - throw new Error(); // Should never happen. - } else if (ctx == tail) { - removeLast(); - } else { - if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) { - remove0(ctx); - } else { - try { - ctx.executor().submit(new Runnable() { - @Override - public void run() { - remove0(ctx); - } - }).get(); - } catch (RuntimeException e) { - throw e; - } catch (Error e) { - throw e; - } catch (Throwable t) { - throw new ChannelPipelineException(t); + try { + DefaultChannelHandlerContext context; + Future future; + synchronized (this) { + if (head == tail) { + return null; + } else if (ctx == head) { + throw new Error(); // Should never happen. + } else if (ctx == tail) { + if (head == tail) { + throw new NoSuchElementException(); + } + + final DefaultChannelHandlerContext oldTail = tail; + if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) { + removeLast0(oldTail); + return oldTail; + } else { + future = oldTail.executor().submit(new Runnable() { + @Override + public void run() { + synchronized (oldTail) { + removeLast0(oldTail); + } + } + }); + context = oldTail; + } + + } else { + if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) { + remove0(ctx); + return ctx; + } else { + future = ctx.executor().submit(new Runnable() { + + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + remove0(ctx); + } + } + + }); + context = ctx; + } } } + + // call the Future.get() outside of the synchronization block to prevent from dead-lock + future.get(); + return context; + } catch (RuntimeException e) { + throw e; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new ChannelPipelineException(t); } - return ctx; + } private void remove0(DefaultChannelHandlerContext ctx) { @@ -377,7 +454,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelHandler removeFirst() { + public ChannelHandler removeFirst() { if (head == tail) { throw new NoSuchElementException(); } @@ -385,31 +462,42 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized ChannelHandler removeLast() { - if (head == tail) { - throw new NoSuchElementException(); + public ChannelHandler removeLast() { + try { + Future future; + final DefaultChannelHandlerContext oldTail; + synchronized (this) { + if (head == tail) { + throw new NoSuchElementException(); + } + oldTail = tail; + if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) { + removeLast0(oldTail); + return oldTail.handler(); + } else { + future = oldTail.executor().submit(new Runnable() { + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + removeLast0(oldTail); + } + } + }); + + } + } + // call Future.get() outside of the synchronized block to prevent deadlock + future.get(); + return oldTail.handler(); + } catch (RuntimeException e) { + throw e; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new ChannelPipelineException(t); } - final DefaultChannelHandlerContext oldTail = tail; - if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) { - removeLast0(oldTail); - } else { - try { - oldTail.executor().submit(new Runnable() { - @Override - public void run() { - removeLast0(oldTail); - } - }).get(); - } catch (RuntimeException e) { - throw e; - } catch (Error e) { - throw e; - } catch (Throwable t) { - throw new ChannelPipelineException(t); - } - } - return oldTail.handler(); + } private void removeLast0(DefaultChannelHandlerContext oldTail) { @@ -423,60 +511,91 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { + public void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { replace(getContextOrDie(oldHandler), newName, newHandler); } @Override - public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { + public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { return replace(getContextOrDie(oldName), newName, newHandler); } @Override @SuppressWarnings("unchecked") - public synchronized T replace( + public T replace( Class oldHandlerType, String newName, ChannelHandler newHandler) { return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler); } private ChannelHandler replace(final DefaultChannelHandlerContext ctx, final String newName, final ChannelHandler newHandler) { - if (ctx == head) { - throw new IllegalArgumentException(); - } else if (ctx == tail) { - removeLast(); - addLast(newName, newHandler); - } else { - boolean sameName = ctx.name().equals(newName); - if (!sameName) { - checkDuplicateName(newName); - } + try { + Future future; + synchronized (this) { + if (ctx == head) { + throw new IllegalArgumentException(); + } else if (ctx == tail) { + if (head == tail) { + throw new NoSuchElementException(); + } + final DefaultChannelHandlerContext oldTail = tail; + final DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(this, null, oldTail, null, newName, newHandler); + + if (!oldTail.channel().isRegistered() || oldTail.executor().inEventLoop()) { + removeLast0(oldTail); + checkDuplicateName(newName); + addLast0(newName, tail, newTail); + return ctx.handler(); - DefaultChannelHandlerContext prev = ctx.prev; - DefaultChannelHandlerContext next = ctx.next; - final DefaultChannelHandlerContext newCtx = - new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler); + } else { + future = oldTail.executor().submit(new Runnable() { + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + removeLast0(oldTail); + checkDuplicateName(newName); + addLast0(newName, tail, newTail); + } + } + }); + + } + + } else { + boolean sameName = ctx.name().equals(newName); + if (!sameName) { + checkDuplicateName(newName); + } - if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - replace0(ctx, newName, newCtx); - } else { - try { - newCtx.executor().submit(new Runnable() { - @Override - public void run() { - replace0(ctx, newName, newCtx); - } - }).get(); - } catch (RuntimeException e) { - throw e; - } catch (Error e) { - throw e; - } catch (Throwable t) { - throw new ChannelPipelineException(t); + DefaultChannelHandlerContext prev = ctx.prev; + DefaultChannelHandlerContext next = ctx.next; + + final DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, ctx.executor, prev, next, newName, newHandler); + + if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { + replace0(ctx, newName, newCtx); + return ctx.handler(); + } else { + future = newCtx.executor().submit(new Runnable() { + @Override + public void run() { + synchronized (DefaultChannelPipeline.this) { + replace0(ctx, newName, newCtx); + } + } + }); + } } } - } + future.get(); + return ctx.handler(); - return ctx.handler(); + } catch (RuntimeException e) { + throw e; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new ChannelPipelineException(t); + } } private void replace0(DefaultChannelHandlerContext ctx, String newName, DefaultChannelHandlerContext newCtx) {