Synchronized between 4.1 and master (part 3)

Motivation:

4 and 5 were diverged long time ago and we recently reverted some of the
early commits in master.  We must make sure 4.1 and master are not very
different now.

Modification:

Fix found differences

Result:

4.1 and master got closer.
This commit is contained in:
Trustin Lee 2014-04-25 15:27:21 +09:00
parent 28742b91a5
commit 2d9735817c
10 changed files with 121 additions and 10 deletions

View File

@ -23,3 +23,4 @@ Note that this is build-time requirement. JDK 5 (for 3.x) or 6 (for 4.0+) is en
## Branches to look
[The 'master' branch](https://github.com/netty/netty/tree/master) is where the development of the latest major version lives on. The development of all other versions takes place in each branch whose name is identical to `<majorVersion>.<minorVersion>`. For example, the development of 3.9 and 4.0 resides in [the branch '3.9'](https://github.com/netty/netty/tree/3.9) and [the branch '4.0'](https://github.com/netty/netty/tree/4.0) respectively.

View File

@ -91,6 +91,7 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}

View File

@ -244,8 +244,9 @@ public class LoggingHandler extends ChannelHandlerAdapter {
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "CONNECT", remoteAddress, localAddress));
}
@ -268,6 +269,14 @@ public class LoggingHandler extends ChannelHandlerAdapter {
ctx.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "DEREGISTER"));
}
ctx.deregister(promise);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (logger.isEnabled(internalLevel)) {

View File

@ -202,7 +202,7 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C ext
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("factory not set");
throw new IllegalStateException("channel or channelFactory not set");
}
return (B) this;
}

View File

@ -23,6 +23,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
@ -205,4 +206,23 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
* the operation is done for all channels
*/
ChannelGroupFuture close(ChannelMatcher matcher);
/**
* Deregister all {@link Channel}s in this group from their {@link EventLoop}.
* Please note that this operation is asynchronous as {@link Channel#deregister()} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
@Deprecated
ChannelGroupFuture deregister();
/**
* Deregister all {@link Channel}s in this group from their {@link EventLoop} that match the given
* {@link ChannelMatcher}. Please note that this operation is asynchronous as {@link Channel#deregister()} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture deregister(ChannelMatcher matcher);
}

View File

@ -189,6 +189,11 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
return disconnect(ChannelMatchers.all());
}
@Override
public ChannelGroupFuture deregister() {
return deregister(ChannelMatchers.all());
}
@Override
public ChannelGroupFuture write(Object message) {
return write(message, ChannelMatchers.all());
@ -282,6 +287,29 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroupFuture deregister(ChannelMatcher matcher) {
if (matcher == null) {
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.deregister());
}
}
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.deregister());
}
}
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroup flush(ChannelMatcher matcher) {
for (Channel c: nonServerChannels.values()) {

View File

@ -15,14 +15,21 @@
*/
package io.netty.channel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.Test;
import static org.junit.Assert.*;
public abstract class AbstractEventLoopTest {
/**
* Test for https://github.com/netty/netty/issues/803
*/
/*
@Test
public void testReregister() {
EventLoopGroup group = newEventLoopGroup();
@ -46,19 +53,20 @@ public abstract class AbstractEventLoopTest {
EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor();
EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor();
future.channel().deregister().awaitUninterruptibly();
Channel channel = group2.register(future.channel()).awaitUninterruptibly().channel();
EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor();
assertNotSame(executor1, executorNew);
assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor());
}
private static final class TestChannelHandler extends ChannelDuplexHandler { }
private static final class TestChannelHandler extends ChannelHandlerAdapter { }
private static final class TestChannelHandler2 extends ChannelDuplexHandler {
private static final class TestChannelHandler2 extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { }
}
*/
protected abstract EventLoopGroup newEventLoopGroup();
protected abstract Class<? extends ServerSocketChannel> newChannel();
}

View File

@ -249,7 +249,7 @@ public class SingleThreadEventLoopTest {
testScheduleTaskWithFixedDelay(loopB);
}
private static void testScheduleTaskWithFixedDelay(EventExecutor loopA) throws InterruptedException {
private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws InterruptedException {
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
ScheduledFuture<?> f = loopA.scheduleWithFixedDelay(new Runnable() {
@Override

View File

@ -19,8 +19,8 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
@ -248,8 +248,41 @@ public class LocalChannelTest {
}
}
static class TestHandler extends ChannelHandlerAdapter {
@Test
public void testReRegister() {
EventLoopGroup group1 = new LocalEventLoopGroup();
EventLoopGroup group2 = new LocalEventLoopGroup();
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
cb.group(group1)
.channel(LocalChannel.class)
.handler(new TestHandler());
sb.group(group2)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new TestHandler());
}
});
// Start server
final Channel sc = sb.bind(addr).syncUninterruptibly().channel();
// Connect to the server
final Channel cc = cb.connect(addr).syncUninterruptibly().channel();
cc.deregister().syncUninterruptibly();
// Change event loop group.
group2.register(cc).syncUninterruptibly();
cc.close().syncUninterruptibly();
sc.close().syncUninterruptibly();
}
static class TestHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info(String.format("Received mesage: %s", msg));

View File

@ -49,6 +49,7 @@ public class LocalTransportThreadModelTest3 {
MESSAGE_RECEIVED_LAST,
INACTIVE,
ACTIVE,
UNREGISTERED,
REGISTERED,
MESSAGE_RECEIVED,
WRITE,
@ -197,9 +198,14 @@ public class LocalTransportThreadModelTest3 {
ch.close().sync();
while (events.peekLast() != EventType.UNREGISTERED) {
Thread.sleep(10);
}
expectedEvents.addFirst(EventType.ACTIVE);
expectedEvents.addFirst(EventType.REGISTERED);
expectedEvents.addLast(EventType.INACTIVE);
expectedEvents.addLast(EventType.UNREGISTERED);
for (;;) {
EventType event = events.poll();
@ -286,6 +292,11 @@ public class LocalTransportThreadModelTest3 {
events.add(EventType.ACTIVE);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.UNREGISTERED);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
events.add(EventType.REGISTERED);