From 2d9735817c3a93a4741dc4fd53f9886daf00c160 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 25 Apr 2014 15:27:21 +0900 Subject: [PATCH] Synchronized between 4.1 and master (part 3) Motivation: 4 and 5 were diverged long time ago and we recently reverted some of the early commits in master. We must make sure 4.1 and master are not very different now. Modification: Fix found differences Result: 4.1 and master got closer. --- README.md | 1 + .../MultithreadEventExecutorGroup.java | 1 + .../netty/handler/logging/LoggingHandler.java | 13 ++++++- .../io/netty/bootstrap/AbstractBootstrap.java | 2 +- .../io/netty/channel/group/ChannelGroup.java | 20 ++++++++++ .../channel/group/DefaultChannelGroup.java | 28 ++++++++++++++ .../netty/channel/AbstractEventLoopTest.java | 16 ++++++-- .../channel/SingleThreadEventLoopTest.java | 2 +- .../netty/channel/local/LocalChannelTest.java | 37 ++++++++++++++++++- .../local/LocalTransportThreadModelTest3.java | 11 ++++++ 10 files changed, 121 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index c6318fb9a5..6feeddc0fc 100644 --- a/README.md +++ b/README.md @@ -23,3 +23,4 @@ Note that this is build-time requirement. JDK 5 (for 3.x) or 6 (for 4.0+) is en ## Branches to look [The 'master' branch](https://github.com/netty/netty/tree/master) is where the development of the latest major version lives on. The development of all other versions takes place in each branch whose name is identical to `.`. For example, the development of 3.9 and 4.0 resides in [the branch '3.9'](https://github.com/netty/netty/tree/3.9) and [the branch '4.0'](https://github.com/netty/netty/tree/4.0) respectively. + diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index 9ad7805207..1b7a0303bb 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -91,6 +91,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { + // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } diff --git a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java index c41372a3bf..f867181a41 100644 --- a/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java +++ b/handler/src/main/java/io/netty/handler/logging/LoggingHandler.java @@ -244,8 +244,9 @@ public class LoggingHandler extends ChannelHandlerAdapter { } @Override - public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, - ChannelPromise promise) throws Exception { + public void connect( + ChannelHandlerContext ctx, + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "CONNECT", remoteAddress, localAddress)); } @@ -268,6 +269,14 @@ public class LoggingHandler extends ChannelHandlerAdapter { ctx.close(promise); } + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + if (logger.isEnabled(internalLevel)) { + logger.log(internalLevel, format(ctx, "DEREGISTER")); + } + ctx.deregister(promise); + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (logger.isEnabled(internalLevel)) { diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 5766d56ca4..4810d2f1a1 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -202,7 +202,7 @@ public abstract class AbstractBootstrap, C ext throw new IllegalStateException("group not set"); } if (channelFactory == null) { - throw new IllegalStateException("factory not set"); + throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; } diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java index 039ec231c3..a6ce2ffce5 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java @@ -23,6 +23,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; +import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.GlobalEventExecutor; @@ -205,4 +206,23 @@ public interface ChannelGroup extends Set, Comparable { * the operation is done for all channels */ ChannelGroupFuture close(ChannelMatcher matcher); + + /** + * Deregister all {@link Channel}s in this group from their {@link EventLoop}. + * Please note that this operation is asynchronous as {@link Channel#deregister()} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ + @Deprecated + ChannelGroupFuture deregister(); + + /** + * Deregister all {@link Channel}s in this group from their {@link EventLoop} that match the given + * {@link ChannelMatcher}. Please note that this operation is asynchronous as {@link Channel#deregister()} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ + ChannelGroupFuture deregister(ChannelMatcher matcher); } diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index fd54b25d87..d3c20d7645 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -189,6 +189,11 @@ public class DefaultChannelGroup extends AbstractSet implements Channel return disconnect(ChannelMatchers.all()); } + @Override + public ChannelGroupFuture deregister() { + return deregister(ChannelMatchers.all()); + } + @Override public ChannelGroupFuture write(Object message) { return write(message, ChannelMatchers.all()); @@ -282,6 +287,29 @@ public class DefaultChannelGroup extends AbstractSet implements Channel return new DefaultChannelGroupFuture(this, futures, executor); } + @Override + public ChannelGroupFuture deregister(ChannelMatcher matcher) { + if (matcher == null) { + throw new NullPointerException("matcher"); + } + + Map futures = + new LinkedHashMap(size()); + + for (Channel c: serverChannels.values()) { + if (matcher.matches(c)) { + futures.put(c, c.deregister()); + } + } + for (Channel c: nonServerChannels.values()) { + if (matcher.matches(c)) { + futures.put(c, c.deregister()); + } + } + + return new DefaultChannelGroupFuture(this, futures, executor); + } + @Override public ChannelGroup flush(ChannelMatcher matcher) { for (Channel c: nonServerChannels.values()) { diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index 32c7c461c0..24b3d7606c 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -15,14 +15,21 @@ */ package io.netty.channel; +import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import org.junit.Test; + +import static org.junit.Assert.*; public abstract class AbstractEventLoopTest { /** * Test for https://github.com/netty/netty/issues/803 */ - /* @Test public void testReregister() { EventLoopGroup group = newEventLoopGroup(); @@ -46,19 +53,20 @@ public abstract class AbstractEventLoopTest { EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor(); EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor(); + future.channel().deregister().awaitUninterruptibly(); Channel channel = group2.register(future.channel()).awaitUninterruptibly().channel(); EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor(); assertNotSame(executor1, executorNew); assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor()); } - private static final class TestChannelHandler extends ChannelDuplexHandler { } + private static final class TestChannelHandler extends ChannelHandlerAdapter { } - private static final class TestChannelHandler2 extends ChannelDuplexHandler { + private static final class TestChannelHandler2 extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } } -*/ + protected abstract EventLoopGroup newEventLoopGroup(); protected abstract Class newChannel(); } diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index bc16d20462..281cedc154 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -249,7 +249,7 @@ public class SingleThreadEventLoopTest { testScheduleTaskWithFixedDelay(loopB); } - private static void testScheduleTaskWithFixedDelay(EventExecutor loopA) throws InterruptedException { + private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws InterruptedException { final Queue timestamps = new LinkedBlockingQueue(); ScheduledFuture f = loopA.scheduleWithFixedDelay(new Runnable() { @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index b17aed9b5d..376e00432b 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -19,8 +19,8 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.DefaultEventLoopGroup; @@ -248,8 +248,41 @@ public class LocalChannelTest { } } - static class TestHandler extends ChannelHandlerAdapter { + @Test + public void testReRegister() { + EventLoopGroup group1 = new LocalEventLoopGroup(); + EventLoopGroup group2 = new LocalEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + cb.group(group1) + .channel(LocalChannel.class) + .handler(new TestHandler()); + + sb.group(group2) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new TestHandler()); + } + }); + + // Start server + final Channel sc = sb.bind(addr).syncUninterruptibly().channel(); + + // Connect to the server + final Channel cc = cb.connect(addr).syncUninterruptibly().channel(); + + cc.deregister().syncUninterruptibly(); + // Change event loop group. + group2.register(cc).syncUninterruptibly(); + cc.close().syncUninterruptibly(); + sc.close().syncUninterruptibly(); + } + + static class TestHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info(String.format("Received mesage: %s", msg)); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index b8081a160f..19960ab343 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -49,6 +49,7 @@ public class LocalTransportThreadModelTest3 { MESSAGE_RECEIVED_LAST, INACTIVE, ACTIVE, + UNREGISTERED, REGISTERED, MESSAGE_RECEIVED, WRITE, @@ -197,9 +198,14 @@ public class LocalTransportThreadModelTest3 { ch.close().sync(); + while (events.peekLast() != EventType.UNREGISTERED) { + Thread.sleep(10); + } + expectedEvents.addFirst(EventType.ACTIVE); expectedEvents.addFirst(EventType.REGISTERED); expectedEvents.addLast(EventType.INACTIVE); + expectedEvents.addLast(EventType.UNREGISTERED); for (;;) { EventType event = events.poll(); @@ -286,6 +292,11 @@ public class LocalTransportThreadModelTest3 { events.add(EventType.ACTIVE); } + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + events.add(EventType.UNREGISTERED); + } + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { events.add(EventType.REGISTERED);