Refactor AIO Transport to allow to use Bootstrap without the ugly hack

This commit is contained in:
Norman Maurer 2012-12-13 19:54:39 +01:00
parent 8945ce17ad
commit c4db51e85d
5 changed files with 43 additions and 50 deletions

View File

@ -20,8 +20,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.aio.AioEventLoopGroup;
import io.netty.channel.socket.aio.AioSocketChannel;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
@ -179,17 +177,6 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap> {
return b; return b;
} }
@Override
public Bootstrap channel(Class<? extends Channel> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
if (channelClass == AioSocketChannel.class) {
return channelFactory(new AioSocketChannelFactory());
}
return super.channel(channelClass);
}
@Override @Override
public String toString() { public String toString() {
if (remoteAddress == null) { if (remoteAddress == null) {
@ -204,16 +191,4 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap> {
return buf.toString(); return buf.toString();
} }
private final class AioSocketChannelFactory implements ChannelFactory {
@Override
public Channel newChannel() {
return new AioSocketChannel((AioEventLoopGroup) group());
}
@Override
public String toString() {
return AioSocketChannel.class.getSimpleName() + ".class";
}
}
} }

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.aio;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
@ -30,8 +29,7 @@ import java.util.concurrent.TimeUnit;
abstract class AbstractAioChannel extends AbstractChannel { abstract class AbstractAioChannel extends AbstractChannel {
protected final AioEventLoopGroup group; protected AsynchronousChannel ch;
private final AsynchronousChannel ch;
/** /**
* The future of the current connection attempt. If not null, subsequent * The future of the current connection attempt. If not null, subsequent
@ -41,10 +39,9 @@ abstract class AbstractAioChannel extends AbstractChannel {
protected ScheduledFuture<?> connectTimeoutFuture; protected ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException; private ConnectException connectTimeoutException;
protected AbstractAioChannel(Channel parent, Integer id, AioEventLoopGroup group, AsynchronousChannel ch) { protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) {
super(parent, id); super(parent, id);
this.ch = ch; this.ch = ch;
this.group = group;
} }
@Override @Override
@ -58,21 +55,22 @@ abstract class AbstractAioChannel extends AbstractChannel {
} }
protected AsynchronousChannel javaChannel() { protected AsynchronousChannel javaChannel() {
if (ch == null) {
throw new IllegalStateException("Try to access Channel before eventLoop was registered");
}
return ch; return ch;
} }
@Override @Override
public boolean isOpen() { public boolean isOpen() {
if (ch == null) {
return true;
}
return ch.isOpen(); return ch.isOpen();
} }
@Override @Override
protected Runnable doRegister() throws Exception { protected Runnable doRegister() throws Exception {
if (eventLoop().parent() != group) {
throw new ChannelException(
getClass().getSimpleName() + " must be registered to the " +
AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor.");
}
return null; return null;
} }

View File

@ -66,8 +66,9 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
} }
public AioServerSocketChannel(AioEventLoopGroup parentGroup, AioEventLoopGroup childGroup) { public AioServerSocketChannel(AioEventLoopGroup parentGroup, AioEventLoopGroup childGroup) {
super(null, null, parentGroup, newSocket(parentGroup.group)); super(null, null, newSocket(parentGroup.group));
this.childGroup = childGroup; this.childGroup = childGroup;
config = new AioServerSocketChannelConfig(javaChannel()); config = new AioServerSocketChannelConfig(javaChannel());
} }
@ -153,7 +154,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
// create the socket add it to the buffer and fire the event // create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add( channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, channel.childGroup, ch)); new AioSocketChannel(channel, null, ch));
channel.pipeline().fireInboundBufferUpdated(); channel.pipeline().fireInboundBufferUpdated();
} }

View File

@ -60,7 +60,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
} }
private final AioSocketChannelConfig config; private AioSocketChannelConfig config;
private volatile boolean inputShutdown; private volatile boolean inputShutdown;
private volatile boolean outputShutdown; private volatile boolean outputShutdown;
@ -78,20 +78,19 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
}; };
public AioSocketChannel(AioEventLoopGroup eventLoop) { public AioSocketChannel() {
this(null, null, eventLoop, newSocket(eventLoop.group)); this(null, null, null);
} }
AioSocketChannel( AioSocketChannel(
AioServerSocketChannel parent, Integer id, AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) {
AioEventLoopGroup eventLoop, AsynchronousSocketChannel ch) { super(parent, id, ch);
super(parent, id, eventLoop, ch); config = new AioSocketChannelConfig(null);
config = new AioSocketChannelConfig(ch);
} }
@Override @Override
public boolean isActive() { public boolean isActive() {
return javaChannel().isOpen() && remoteAddress0() != null; return ch != null && javaChannel().isOpen() && remoteAddress0() != null;
} }
@Override @Override
@ -176,6 +175,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
protected Runnable doRegister() throws Exception { protected Runnable doRegister() throws Exception {
super.doRegister(); super.doRegister();
if (ch == null) {
ch = newSocket(((AioEventLoopGroup) eventLoop().parent()).group);
config.channel = javaChannel();
}
if (remoteAddress() == null) { if (remoteAddress() == null) {
return null; return null;

View File

@ -34,7 +34,7 @@ import static io.netty.channel.ChannelOption.*;
final class AioSocketChannelConfig extends DefaultChannelConfig final class AioSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig { implements SocketChannelConfig {
private final NetworkChannel channel; volatile NetworkChannel channel;
private volatile boolean allowHalfClosure; private volatile boolean allowHalfClosure;
private volatile long readTimeoutInMillis; private volatile long readTimeoutInMillis;
private volatile long writeTimeoutInMillis; private volatile long writeTimeoutInMillis;
@ -43,10 +43,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
* Creates a new instance. * Creates a new instance.
*/ */
AioSocketChannelConfig(NetworkChannel channel) { AioSocketChannelConfig(NetworkChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel; this.channel = channel;
} }
@ -128,6 +124,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
channelNull();
try { try {
return channel.getOption(StandardSocketOptions.SO_RCVBUF); return channel.getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException e) { } catch (IOException e) {
@ -137,6 +134,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public int getSendBufferSize() { public int getSendBufferSize() {
channelNull();
try { try {
return channel.getOption(StandardSocketOptions.SO_SNDBUF); return channel.getOption(StandardSocketOptions.SO_SNDBUF);
} catch (IOException e) { } catch (IOException e) {
@ -146,6 +144,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public int getSoLinger() { public int getSoLinger() {
channelNull();
try { try {
return channel.getOption(StandardSocketOptions.SO_LINGER); return channel.getOption(StandardSocketOptions.SO_LINGER);
} catch (IOException e) { } catch (IOException e) {
@ -155,6 +154,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public int getTrafficClass() { public int getTrafficClass() {
channelNull();
try { try {
return channel.getOption(StandardSocketOptions.IP_TOS); return channel.getOption(StandardSocketOptions.IP_TOS);
} catch (IOException e) { } catch (IOException e) {
@ -164,6 +164,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public boolean isKeepAlive() { public boolean isKeepAlive() {
channelNull();
try { try {
return channel.getOption(StandardSocketOptions.SO_KEEPALIVE); return channel.getOption(StandardSocketOptions.SO_KEEPALIVE);
} catch (IOException e) { } catch (IOException e) {
@ -173,6 +174,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public boolean isReuseAddress() { public boolean isReuseAddress() {
channelNull();
try { try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR); return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) { } catch (IOException e) {
@ -182,6 +184,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public boolean isTcpNoDelay() { public boolean isTcpNoDelay() {
channelNull();
try { try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR); return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) { } catch (IOException e) {
@ -191,6 +194,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setKeepAlive(boolean keepAlive) { public void setKeepAlive(boolean keepAlive) {
channelNull();
try { try {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive); channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive);
} catch (IOException e) { } catch (IOException e) {
@ -206,6 +210,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
channelNull();
try { try {
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) { } catch (IOException e) {
@ -215,6 +220,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setReuseAddress(boolean reuseAddress) { public void setReuseAddress(boolean reuseAddress) {
channelNull();
try { try {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
} catch (IOException e) { } catch (IOException e) {
@ -224,6 +230,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setSendBufferSize(int sendBufferSize) { public void setSendBufferSize(int sendBufferSize) {
channelNull();
try { try {
channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
} catch (IOException e) { } catch (IOException e) {
@ -233,6 +240,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setSoLinger(int soLinger) { public void setSoLinger(int soLinger) {
channelNull();
try { try {
channel.setOption(StandardSocketOptions.SO_LINGER, soLinger); channel.setOption(StandardSocketOptions.SO_LINGER, soLinger);
} catch (IOException e) { } catch (IOException e) {
@ -242,6 +250,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setTcpNoDelay(boolean tcpNoDelay) { public void setTcpNoDelay(boolean tcpNoDelay) {
channelNull();
try { try {
channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
} catch (IOException e) { } catch (IOException e) {
@ -251,6 +260,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setTrafficClass(int trafficClass) { public void setTrafficClass(int trafficClass) {
channelNull();
try { try {
channel.setOption(StandardSocketOptions.IP_TOS, trafficClass); channel.setOption(StandardSocketOptions.IP_TOS, trafficClass);
} catch (IOException e) { } catch (IOException e) {
@ -313,4 +323,10 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
public void setAllowHalfClosure(boolean allowHalfClosure) { public void setAllowHalfClosure(boolean allowHalfClosure) {
this.allowHalfClosure = allowHalfClosure; this.allowHalfClosure = allowHalfClosure;
} }
private void channelNull() {
if (channel == null) {
throw new IllegalStateException("Channel not set while try to change options on it");
}
}
} }