From c149f4bcc0c0d02aa1abcd5e39c155a9e598822e Mon Sep 17 00:00:00 2001 From: bgallagher Date: Fri, 23 Aug 2013 14:52:52 -0400 Subject: [PATCH] Remove support from deregister a Channel from a EventLoop manually --- .../http/upload/HttpUploadServerHandler.java | 2 +- .../example/uptime/UptimeClientHandler.java | 6 +- .../netty/handler/logging/LoggingHandler.java | 18 --- .../java/io/netty/handler/ssl/SslHandler.java | 7 +- .../io/netty/channel/AbstractChannel.java | 45 ++---- .../main/java/io/netty/channel/Channel.java | 7 - .../netty/channel/ChannelDuplexHandler.java | 11 -- .../netty/channel/ChannelHandlerContext.java | 4 - .../netty/channel/ChannelInboundHandler.java | 12 +- .../channel/ChannelInboundHandlerAdapter.java | 13 +- .../netty/channel/ChannelInboundInvoker.java | 10 -- .../netty/channel/ChannelOutboundHandler.java | 9 -- .../ChannelOutboundHandlerAdapter.java | 11 -- .../netty/channel/ChannelOutboundInvoker.java | 31 ---- .../io/netty/channel/ChannelPipeline.java | 6 - .../channel/CombinedChannelDuplexHandler.java | 10 -- .../channel/DefaultChannelHandlerContext.java | 58 ------- .../netty/channel/DefaultChannelPipeline.java | 30 +--- .../io/netty/channel/group/ChannelGroup.java | 21 --- .../channel/group/DefaultChannelGroup.java | 27 ---- .../io/netty/channel/local/LocalChannel.java | 5 +- .../netty/channel/AbstractEventLoopTest.java | 12 +- .../io/netty/channel/BaseChannelTest.java | 14 ++ .../java/io/netty/channel/LoggingHandler.java | 17 +-- .../netty/channel/ReentrantChannelTest.java | 143 ++++++++++++++++++ .../local/LocalTransportThreadModelTest3.java | 22 +-- 26 files changed, 188 insertions(+), 363 deletions(-) diff --git a/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java b/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java index 878c5271f5..8856b9762e 100644 --- a/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java +++ b/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java @@ -88,7 +88,7 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler { } @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { println("Disconnected from: " + ctx.channel().remoteAddress()); - } - @Override - public void channelUnregistered(final ChannelHandlerContext ctx) - throws Exception { println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + 's'); final EventLoop loop = ctx.channel().eventLoop(); diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index 154f1074b1..a33f637232 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -200,15 +200,6 @@ public class LoggingHandler extends ChannelDuplexHandler { super.channelRegistered(ctx); } - @Override - public void channelUnregistered(ChannelHandlerContext ctx) - throws Exception { - if (logger.isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, "UNREGISTERED")); - } - super.channelUnregistered(ctx); - } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -282,15 +273,6 @@ public class LoggingHandler extends ChannelDuplexHandler { super.close(ctx, promise); } - @Override - public void deregister(ChannelHandlerContext ctx, - ChannelPromise promise) throws Exception { - if (logger.isEnabled(internalLevel)) { - logger.log(internalLevel, format(ctx, "DEREGISTER()")); - } - super.deregister(ctx, promise); - } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logMessage(ctx, "RECEIVED", msg); diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 0f5ecae84b..6427d543d9 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -44,6 +44,7 @@ import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; + import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -58,6 +59,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; + /** * Adds SSL * · TLS and StartTLS support to a {@link Channel}. Please refer @@ -370,11 +372,6 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH ctx.connect(remoteAddress, localAddress, promise); } - @Override - public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - ctx.deregister(promise); - } - @Override public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 5876fc3b1a..6871ad3349 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -30,6 +30,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.NotYetConnectedException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * A skeletal {@link Channel} implementation. @@ -66,6 +67,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private boolean strValActive; private String strVal; + private static final AtomicReferenceFieldUpdater EVENT_LOOP_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(AbstractChannel.class, EventLoop.class, "eventLoop"); + /** * Creates a new instance. * @@ -177,11 +181,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline.close(); } - @Override - public ChannelFuture deregister() { - return pipeline.deregister(); - } - @Override public Channel flush() { pipeline.flush(); @@ -213,11 +212,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline.close(promise); } - @Override - public ChannelFuture deregister(ChannelPromise promise) { - return pipeline.deregister(promise); - } - @Override public Channel read() { pipeline.read(); @@ -395,17 +389,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (eventLoop == null) { throw new NullPointerException("eventLoop"); } - if (isRegistered()) { - promise.setFailure(new IllegalStateException("registered to an event loop already")); - return; - } + if (!isCompatible(eventLoop)) { - promise.setFailure( - new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); + promise.setFailure(new IllegalStateException("incompatible event loop type: " + + eventLoop.getClass().getName())); return; } - AbstractChannel.this.eventLoop = eventLoop; + if (!AbstractChannel.EVENT_LOOP_UPDATER.compareAndSet(AbstractChannel.this, null, eventLoop)) { + return; + } if (eventLoop.inEventLoop()) { register0(promise); @@ -560,7 +553,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha }); } - deregister(voidPromise()); + deregister(); } } @@ -573,10 +566,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - @Override - public final void deregister(final ChannelPromise promise) { + private void deregister() { if (!registered) { - promise.setSuccess(); return; } @@ -587,18 +578,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } finally { if (registered) { registered = false; - promise.setSuccess(); - invokeLater(new Runnable() { - @Override - public void run() { - pipeline.fireChannelUnregistered(); - } - }); - } else { - // Some transports like local and AIO does not allow the deregistration of - // an open channel. Their doDeregister() calls close(). Consequently, - // close() calls deregister() again - no need to fire channelUnregistered. - promise.setSuccess(); } } } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 39b8417762..2a9d34f3cc 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -222,13 +222,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr */ void closeForcibly(); - /** - * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the - * {@link ChannelPromise} once the operation was complete. - */ - @Deprecated - void deregister(ChannelPromise promise); - /** * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the * {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing. diff --git a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java index 6b6d1841db..afea6180c2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelDuplexHandler.java @@ -73,17 +73,6 @@ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implement ctx.close(future); } - /** - * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward - * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. - */ - @Override - public void deregister(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { - ctx.deregister(future); - } - /** * Calls {@link ChannelHandlerContext#read()} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 4c24ff1cc3..2d7ca803b1 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -160,10 +160,6 @@ public interface ChannelHandlerContext @Override ChannelHandlerContext fireChannelRegistered(); - @Override - @Deprecated - ChannelHandlerContext fireChannelUnregistered(); - @Override ChannelHandlerContext fireChannelActive(); diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java index 4cccc84405..a4e1b97cfa 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandler.java @@ -16,8 +16,8 @@ package io.netty.channel; /** - * {@link ChannelHandler} which adds callbacks for state changes. This allows the user - * to hook in to state changes easily. + * {@link ChannelHandler} which adds callbacks for state changes. This allows the user to hook in to state changes + * easily. */ public interface ChannelInboundHandler extends ChannelHandler { @@ -26,14 +26,6 @@ public interface ChannelInboundHandler extends ChannelHandler { */ void channelRegistered(ChannelHandlerContext ctx) throws Exception; - /** - * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop} - * - * @deprecated use {@link #channelInactive(ChannelHandlerContext)} - */ - @Deprecated - void channelUnregistered(ChannelHandlerContext ctx) throws Exception; - /** * The {@link Channel} of the {@link ChannelHandlerContext} is now active */ diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index 3a06d12170..580cc82f3e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -16,7 +16,7 @@ package io.netty.channel; /** - * Abstract base class for {@link ChannelInboundHandler} implementations which provide + * Abstract base class for {@link ChannelInboundHandler} implementations that provides * implementations of all of their methods. * *

@@ -42,17 +42,6 @@ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implemen ctx.fireChannelRegistered(); } - /** - * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward - * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. - */ - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - ctx.fireChannelUnregistered(); - } - /** * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java index 64086f348e..0771762140 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundInvoker.java @@ -30,16 +30,6 @@ interface ChannelInboundInvoker { */ ChannelInboundInvoker fireChannelRegistered(); - /** - * A {@link Channel} was unregistered from its {@link EventLoop}. - * - * This will result in having the {@link ChannelInboundHandler#channelUnregistered(ChannelHandlerContext)} method - * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. - */ - @Deprecated - ChannelInboundInvoker fireChannelUnregistered(); - /** * A {@link Channel} is active now, which means it is connected. * diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java index 92a3943ad0..a02f780bca 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandler.java @@ -62,15 +62,6 @@ public interface ChannelOutboundHandler extends ChannelHandler { */ void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; - /** - * Called once a deregister operation is made from the current registered {@link EventLoop}. - * - * @param ctx the {@link ChannelHandlerContext} for which the close operation is made - * @param promise the {@link ChannelPromise} to notify once the operation completes - * @throws Exception thrown if an error accour - */ - void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; - /** * Intercepts {@link ChannelHandlerContext#read()}. */ diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index c0a6897123..2ac728ac77 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -71,17 +71,6 @@ public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter impleme ctx.close(promise); } - /** - * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward - * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. - * - * Sub-classes may override this method to change behavior. - */ - @Override - public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - ctx.deregister(promise); - } - /** * Calls {@link ChannelHandlerContext#read()} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java index 64b5ec64cc..f4de88cde6 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -15,8 +15,6 @@ */ package io.netty.channel; -import io.netty.util.concurrent.EventExecutor; - import java.net.ConnectException; import java.net.SocketAddress; @@ -88,20 +86,6 @@ interface ChannelOutboundInvoker { */ ChannelFuture close(); - /** - * Request to deregister this ChannelOutboundInvoker from the previous assigned {@link EventExecutor} and notify the - * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of - * an error. - *

- * This will result in having the - * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)} - * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. - * - */ - @Deprecated - ChannelFuture deregister(); - /** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. @@ -175,21 +159,6 @@ interface ChannelOutboundInvoker { */ ChannelFuture close(ChannelPromise promise); - /** - * Request to deregister this ChannelOutboundInvoker from the previous assigned {@link EventExecutor} and notify the - * {@link ChannelFuture} once the operation completes, either because the operation was successful or because of - * an error. - * - * The given {@link ChannelPromise} will be notified. - *

- * This will result in having the - * {@link ChannelOutboundHandler#deregister(ChannelHandlerContext, ChannelPromise)} - * method called of the next {@link ChannelOutboundHandler} contained in the {@link ChannelPipeline} of the - * {@link Channel}. - */ - @Deprecated - ChannelFuture deregister(ChannelPromise promise); - /** * Request to Read data from the {@link Channel} into the first inbound buffer, triggers an * {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} event if data was diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index f8e07bc9ee..7090f5dced 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -138,7 +138,6 @@ import java.util.NoSuchElementException; *

  • {@link ChannelHandlerContext#fireUserEventTriggered(Object)}
  • *
  • {@link ChannelHandlerContext#fireChannelWritabilityChanged()}
  • *
  • {@link ChannelHandlerContext#fireChannelInactive()}
  • - *
  • {@link ChannelHandlerContext#fireChannelUnregistered()}
  • * * *
  • Outbound event propagation methods: @@ -150,7 +149,6 @@ import java.util.NoSuchElementException; *
  • {@link ChannelHandlerContext#read()}
  • *
  • {@link ChannelHandlerContext#disconnect(ChannelPromise)}
  • *
  • {@link ChannelHandlerContext#close(ChannelPromise)}
  • - *
  • {@link ChannelHandlerContext#deregister(ChannelPromise)}
  • * * * @@ -597,10 +595,6 @@ public interface ChannelPipeline @Override ChannelPipeline fireChannelRegistered(); - @Override - @Deprecated - ChannelPipeline fireChannelUnregistered(); - @Override ChannelPipeline fireChannelActive(); diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java index 7ba94a1208..4e9e925c68 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java @@ -118,11 +118,6 @@ public class CombinedChannelDuplexHandler, Comparable { * the operation is done for all channels */ ChannelGroupFuture close(ChannelMatcher matcher); - - /** - * Deregister all {@link Channel}s in this group from their {@link EventLoop}. - * Please note that this operation is asynchronous as {@link Channel#deregister()} is. - * - * @return the {@link ChannelGroupFuture} instance that notifies when - * the operation is done for all channels - */ - @Deprecated - ChannelGroupFuture deregister(); - - /** - * Deregister all {@link Channel}s in this group from their {@link EventLoop} that match the given - * {@link ChannelMatcher}. Please note that this operation is asynchronous as {@link Channel#deregister()} is. - * - * @return the {@link ChannelGroupFuture} instance that notifies when - * the operation is done for all channels - */ - @Deprecated - ChannelGroupFuture deregister(ChannelMatcher matcher); } diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index 2a83eaa26c..c4666af20f 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -170,10 +170,6 @@ public class DefaultChannelGroup extends AbstractSet implements Channel public ChannelGroupFuture disconnect() { return disconnect(ChannelMatchers.all()); } - @Override - public ChannelGroupFuture deregister() { - return deregister(ChannelMatchers.all()); - } @Override public ChannelGroupFuture write(Object message) { @@ -268,29 +264,6 @@ public class DefaultChannelGroup extends AbstractSet implements Channel return new DefaultChannelGroupFuture(this, futures, executor); } - @Override - public ChannelGroupFuture deregister(ChannelMatcher matcher) { - if (matcher == null) { - throw new NullPointerException("matcher"); - } - - Map futures = - new LinkedHashMap(size()); - - for (Channel c: serverChannels) { - if (matcher.matches(c)) { - futures.put(c, c.deregister()); - } - } - for (Channel c: nonServerChannels) { - if (matcher.matches(c)) { - futures.put(c, c.deregister()); - } - } - - return new DefaultChannelGroupFuture(this, futures, executor); - } - @Override public ChannelGroup flush(ChannelMatcher matcher) { for (Channel c: nonServerChannels) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index a52937ad38..0fa5af4fbe 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -154,7 +154,7 @@ public class LocalChannel extends AbstractChannel { if (peer != null) { state = 2; - peer.remoteAddress = parent().localAddress(); + peer.remoteAddress = parent() == null ? null : parent().localAddress(); peer.state = 2; // Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true. @@ -219,9 +219,6 @@ public class LocalChannel extends AbstractChannel { @Override protected void doDeregister() throws Exception { - if (isOpen()) { - unsafe().close(unsafe().voidPromise()); - } ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook); } diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index e87934e0e3..32c7c461c0 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -15,21 +15,14 @@ */ package io.netty.channel; -import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.SocketChannel; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.Test; - -import static org.junit.Assert.*; public abstract class AbstractEventLoopTest { /** * Test for https://github.com/netty/netty/issues/803 */ + /* @Test public void testReregister() { EventLoopGroup group = newEventLoopGroup(); @@ -53,7 +46,6 @@ public abstract class AbstractEventLoopTest { EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor(); EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor(); - future.channel().deregister().awaitUninterruptibly(); Channel channel = group2.register(future.channel()).awaitUninterruptibly().channel(); EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor(); assertNotSame(executor1, executorNew); @@ -66,7 +58,7 @@ public abstract class AbstractEventLoopTest { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } } - +*/ protected abstract EventLoopGroup newEventLoopGroup(); protected abstract Class newChannel(); } diff --git a/transport/src/test/java/io/netty/channel/BaseChannelTest.java b/transport/src/test/java/io/netty/channel/BaseChannelTest.java index 907ed2def4..b3aaf3d1f4 100644 --- a/transport/src/test/java/io/netty/channel/BaseChannelTest.java +++ b/transport/src/test/java/io/netty/channel/BaseChannelTest.java @@ -25,6 +25,8 @@ import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; +import java.io.UnsupportedEncodingException; + class BaseChannelTest { private final LoggingHandler loggingHandler; @@ -64,6 +66,18 @@ class BaseChannelTest { return buf; } + static Object createTestBuf(String string) throws UnsupportedEncodingException { + byte[] buf = string.getBytes("US-ASCII"); + return createTestBuf(buf); + } + + static Object createTestBuf(byte[] buf) { + ByteBuf ret = createTestBuf(buf.length); + ret.clear(); + ret.writeBytes(buf); + return ret; + } + void assertLog(String expected) { String actual = this.loggingHandler.getLog(); assertEquals(expected, actual); diff --git a/transport/src/test/java/io/netty/channel/LoggingHandler.java b/transport/src/test/java/io/netty/channel/LoggingHandler.java index 1fc72d8f40..c58050439a 100644 --- a/transport/src/test/java/io/netty/channel/LoggingHandler.java +++ b/transport/src/test/java/io/netty/channel/LoggingHandler.java @@ -21,9 +21,8 @@ import java.util.EnumSet; final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHandler { - static enum Event { WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, DEREGISTER, READ, WRITABILITY, - HANDLER_ADDED, HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, UNREGISTERED, ACTIVE, INACTIVE, - USER }; + static enum Event { WRITE, FLUSH, BIND, CONNECT, DISCONNECT, CLOSE, READ, WRITABILITY, HANDLER_ADDED, + HANDLER_REMOVED, EXCEPTION, READ_COMPLETE, REGISTERED, ACTIVE, INACTIVE, USER }; private StringBuilder log = new StringBuilder(); @@ -67,12 +66,6 @@ final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHand ctx.close(promise); } - @Override - public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - log(Event.DEREGISTER); - ctx.deregister(promise); - } - @Override public void read(ChannelHandlerContext ctx) throws Exception { log(Event.READ); @@ -106,12 +99,6 @@ final class LoggingHandler implements ChannelInboundHandler, ChannelOutboundHand ctx.fireChannelRegistered(); } - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - log(Event.UNREGISTERED); - ctx.fireChannelUnregistered(); - } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log(Event.ACTIVE); diff --git a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java index eb35ef8ee4..67cd5fbe17 100644 --- a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java +++ b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java @@ -15,11 +15,17 @@ */ package io.netty.channel; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.LoggingHandler.Event; import io.netty.channel.local.LocalAddress; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +import java.nio.channels.ClosedChannelException; import org.junit.Test; @@ -91,4 +97,141 @@ public class ReentrantChannelTest extends BaseChannelTest { "WRITABILITY: writable=true\n"); } + @Test + public void testWriteFlushPingPong() throws Exception { + + LocalAddress addr = new LocalAddress("testWriteFlushPingPong"); + + ServerBootstrap sb = getLocalServerBootstrap(); + sb.bind(addr).sync().channel(); + + Bootstrap cb = getLocalClientBootstrap(); + + setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION); + + Channel clientChannel = cb.connect(addr).sync().channel(); + + clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() { + + int writeCount; + int flushCount; + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (writeCount < 5) { + writeCount++; + ctx.channel().flush(); + } + super.write(ctx, msg, promise); + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + if (flushCount < 5) { + flushCount++; + ctx.channel().write(createTestBuf(2000)); + } + super.flush(ctx); + } + }); + + clientChannel.writeAndFlush(createTestBuf(2000)); + clientChannel.close().sync(); + + assertLog( + "WRITE\n" + + "FLUSH\n" + + "WRITE\n" + + "FLUSH\n" + + "WRITE\n" + + "FLUSH\n" + + "WRITE\n" + + "FLUSH\n" + + "WRITE\n" + + "FLUSH\n" + + "WRITE\n" + + "FLUSH\n" + + "CLOSE\n"); + } + + @Test + public void testCloseInFlush() throws Exception { + + LocalAddress addr = new LocalAddress("testCloseInFlush"); + + ServerBootstrap sb = getLocalServerBootstrap(); + sb.bind(addr).sync().channel(); + + Bootstrap cb = getLocalClientBootstrap(); + + setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION); + + Channel clientChannel = cb.connect(addr).sync().channel(); + + clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() { + + @Override + public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + promise.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + ctx.channel().close(); + } + }); + super.write(ctx, msg, promise); + ctx.channel().flush(); + } + }); + + clientChannel.write(createTestBuf(2000)).sync(); + clientChannel.closeFuture().sync(); + + assertLog( + "WRITE\n" + + "FLUSH\n" + + "CLOSE\n"); + } + + @Test + public void testFlushFailure() throws Exception { + + LocalAddress addr = new LocalAddress("testFlushFailure"); + + ServerBootstrap sb = getLocalServerBootstrap(); + sb.bind(addr).sync().channel(); + + Bootstrap cb = getLocalClientBootstrap(); + + setInterest(Event.WRITE, Event.FLUSH, Event.CLOSE, Event.EXCEPTION); + + Channel clientChannel = cb.connect(addr).sync().channel(); + + clientChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() { + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + throw new Exception("intentional failure"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + }); + + try { + clientChannel.writeAndFlush(createTestBuf(2000)).sync(); + fail(); + } catch (Throwable cce) { + // FIXME: shouldn't this contain the "intentional failure" exception? + assertEquals(ClosedChannelException.class, cce.getClass()); + } + + clientChannel.closeFuture().sync(); + + assertLog( + "WRITE\n" + + "CLOSE\n"); + } + } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index bfb3b4bd67..7e9292a878 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -28,11 +28,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutorGroup; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; import java.util.Deque; import java.util.LinkedList; @@ -41,6 +36,12 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + public class LocalTransportThreadModelTest3 { enum EventType { @@ -49,7 +50,6 @@ public class LocalTransportThreadModelTest3 { MESSAGE_RECEIVED_LAST, INACTIVE, ACTIVE, - UNREGISTERED, REGISTERED, MESSAGE_RECEIVED, WRITE, @@ -198,14 +198,9 @@ public class LocalTransportThreadModelTest3 { ch.close().sync(); - while (events.peekLast() != EventType.UNREGISTERED) { - Thread.sleep(10); - } - expectedEvents.addFirst(EventType.ACTIVE); expectedEvents.addFirst(EventType.REGISTERED); expectedEvents.addLast(EventType.INACTIVE); - expectedEvents.addLast(EventType.UNREGISTERED); for (;;) { EventType event = events.poll(); @@ -292,11 +287,6 @@ public class LocalTransportThreadModelTest3 { events.add(EventType.ACTIVE); } - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - events.add(EventType.UNREGISTERED); - } - @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { events.add(EventType.REGISTERED);