diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index d84b53d143..34c8e8e731 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -437,7 +437,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha future.setFailure(t); pipeline.fireExceptionCaught(t); - closeFuture().setSuccess(); + closeFuture.setClosed(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index 21ae6f7a9b..80c6fa5daa 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.aio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; @@ -27,8 +28,9 @@ import java.nio.channels.AsynchronousChannel; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -public abstract class AbstractAioChannel extends AbstractChannel { +abstract class AbstractAioChannel extends AbstractChannel { + protected final AioEventLoop eventLoop; private final AsynchronousChannel ch; /** @@ -39,9 +41,10 @@ public abstract class AbstractAioChannel extends AbstractChannel { protected ScheduledFuture connectTimeoutFuture; private ConnectException connectTimeoutException; - protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) { + protected AbstractAioChannel(Channel parent, Integer id, AioEventLoop eventLoop, AsynchronousChannel ch) { super(parent, id); this.ch = ch; + this.eventLoop = eventLoop; } @Override @@ -63,6 +66,16 @@ public abstract class AbstractAioChannel extends AbstractChannel { return ch.isOpen(); } + @Override + protected Runnable doRegister() throws Exception { + if (((AioChildEventLoop) eventLoop()).parent != eventLoop) { + throw new ChannelException( + getClass().getSimpleName() + " must be registered to the " + + AioEventLoop.class.getSimpleName() + " which was specified in the constructor."); + } + return null; + } + @Override protected void doDeregister() throws Exception { // NOOP diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java index ee61b1e4ee..fcadb4306f 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioChildEventLoop.java @@ -21,8 +21,11 @@ import java.util.concurrent.ThreadFactory; final class AioChildEventLoop extends SingleThreadEventLoop { - AioChildEventLoop(ThreadFactory threadFactory) { + final AioEventLoop parent; + + AioChildEventLoop(AioEventLoop parent, ThreadFactory threadFactory) { super(threadFactory); + this.parent = parent; } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java index 844109bcf3..bcce2c4365 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioEventLoop.java @@ -99,6 +99,6 @@ public class AioEventLoop extends MultithreadEventLoop { @Override protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { - return new AioChildEventLoop(threadFactory); + return new AioChildEventLoop(this, threadFactory); } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 53d33b2237..5be38f2f46 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -51,7 +51,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server } public AioServerSocketChannel(AioEventLoop eventLoop) { - super(null, null, newSocket(eventLoop.group)); + super(null, null, eventLoop, newSocket(eventLoop.group)); config = new AioServerSocketChannelConfig(javaChannel()); } @@ -117,7 +117,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected Runnable doRegister() throws Exception { - return null; + return super.doRegister(); } private static final class AcceptHandler @@ -129,7 +129,8 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server channel.javaChannel().accept(channel, this); // create the socket add it to the buffer and fire the event - channel.pipeline().inboundMessageBuffer().add(new AioSocketChannel(channel, null, ch)); + channel.pipeline().inboundMessageBuffer().add( + new AioSocketChannel(channel, null, channel.eventLoop, ch)); channel.pipeline().fireInboundBufferUpdated(); } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 83c9807e38..bbd0ad0b69 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -53,11 +53,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne private boolean flushing; public AioSocketChannel(AioEventLoop eventLoop) { - this(null, null, newSocket(eventLoop.group)); + this(null, null, eventLoop, newSocket(eventLoop.group)); } - AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) { - super(parent, id, ch); + AioSocketChannel( + AioServerSocketChannel parent, Integer id, + AioEventLoop eventLoop, AsynchronousSocketChannel ch) { + super(parent, id, eventLoop, ch); config = new AioSocketChannelConfig(ch); } @@ -110,6 +112,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected Runnable doRegister() throws Exception { + super.doRegister(); + if (remoteAddress() == null) { return null; }