Require a user specify the same AioEventLoop
.. both when create an AIO channel and registering it - Also fixed a bug in AbstractChannel where is does not handle registration failure correctly.
This commit is contained in:
parent
d233be7041
commit
701cda2819
@ -437,7 +437,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
|
||||
future.setFailure(t);
|
||||
pipeline.fireExceptionCaught(t);
|
||||
closeFuture().setSuccess();
|
||||
closeFuture.setClosed();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.socket.aio;
|
||||
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.EventLoop;
|
||||
|
||||
@ -27,8 +28,9 @@ import java.nio.channels.AsynchronousChannel;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class AbstractAioChannel extends AbstractChannel {
|
||||
abstract class AbstractAioChannel extends AbstractChannel {
|
||||
|
||||
protected final AioEventLoop eventLoop;
|
||||
private final AsynchronousChannel ch;
|
||||
|
||||
/**
|
||||
@ -39,9 +41,10 @@ public abstract class AbstractAioChannel extends AbstractChannel {
|
||||
protected ScheduledFuture<?> connectTimeoutFuture;
|
||||
private ConnectException connectTimeoutException;
|
||||
|
||||
protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) {
|
||||
protected AbstractAioChannel(Channel parent, Integer id, AioEventLoop eventLoop, AsynchronousChannel ch) {
|
||||
super(parent, id);
|
||||
this.ch = ch;
|
||||
this.eventLoop = eventLoop;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -63,6 +66,16 @@ public abstract class AbstractAioChannel extends AbstractChannel {
|
||||
return ch.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable doRegister() throws Exception {
|
||||
if (((AioChildEventLoop) eventLoop()).parent != eventLoop) {
|
||||
throw new ChannelException(
|
||||
getClass().getSimpleName() + " must be registered to the " +
|
||||
AioEventLoop.class.getSimpleName() + " which was specified in the constructor.");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDeregister() throws Exception {
|
||||
// NOOP
|
||||
|
@ -21,8 +21,11 @@ import java.util.concurrent.ThreadFactory;
|
||||
|
||||
final class AioChildEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
AioChildEventLoop(ThreadFactory threadFactory) {
|
||||
final AioEventLoop parent;
|
||||
|
||||
AioChildEventLoop(AioEventLoop parent, ThreadFactory threadFactory) {
|
||||
super(threadFactory);
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -99,6 +99,6 @@ public class AioEventLoop extends MultithreadEventLoop {
|
||||
|
||||
@Override
|
||||
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
|
||||
return new AioChildEventLoop(threadFactory);
|
||||
return new AioChildEventLoop(this, threadFactory);
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
||||
}
|
||||
|
||||
public AioServerSocketChannel(AioEventLoop eventLoop) {
|
||||
super(null, null, newSocket(eventLoop.group));
|
||||
super(null, null, eventLoop, newSocket(eventLoop.group));
|
||||
config = new AioServerSocketChannelConfig(javaChannel());
|
||||
}
|
||||
|
||||
@ -117,7 +117,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
||||
|
||||
@Override
|
||||
protected Runnable doRegister() throws Exception {
|
||||
return null;
|
||||
return super.doRegister();
|
||||
}
|
||||
|
||||
private static final class AcceptHandler
|
||||
@ -129,7 +129,8 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
|
||||
channel.javaChannel().accept(channel, this);
|
||||
|
||||
// create the socket add it to the buffer and fire the event
|
||||
channel.pipeline().inboundMessageBuffer().add(new AioSocketChannel(channel, null, ch));
|
||||
channel.pipeline().inboundMessageBuffer().add(
|
||||
new AioSocketChannel(channel, null, channel.eventLoop, ch));
|
||||
channel.pipeline().fireInboundBufferUpdated();
|
||||
}
|
||||
|
||||
|
@ -53,11 +53,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
private boolean flushing;
|
||||
|
||||
public AioSocketChannel(AioEventLoop eventLoop) {
|
||||
this(null, null, newSocket(eventLoop.group));
|
||||
this(null, null, eventLoop, newSocket(eventLoop.group));
|
||||
}
|
||||
|
||||
AioSocketChannel(AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) {
|
||||
super(parent, id, ch);
|
||||
AioSocketChannel(
|
||||
AioServerSocketChannel parent, Integer id,
|
||||
AioEventLoop eventLoop, AsynchronousSocketChannel ch) {
|
||||
super(parent, id, eventLoop, ch);
|
||||
config = new AioSocketChannelConfig(ch);
|
||||
}
|
||||
|
||||
@ -110,6 +112,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
|
||||
|
||||
@Override
|
||||
protected Runnable doRegister() throws Exception {
|
||||
super.doRegister();
|
||||
|
||||
if (remoteAddress() == null) {
|
||||
return null;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user