From c4db51e85d9e4bf9f3c72d81ea8386fe9dfdeecf Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 13 Dec 2012 19:54:39 +0100 Subject: [PATCH] Refactor AIO Transport to allow to use Bootstrap without the ugly hack --- .../java/io/netty/bootstrap/Bootstrap.java | 25 ------------------ .../socket/aio/AbstractAioChannel.java | 18 ++++++------- .../socket/aio/AioServerSocketChannel.java | 5 ++-- .../channel/socket/aio/AioSocketChannel.java | 19 ++++++++------ .../socket/aio/AioSocketChannelConfig.java | 26 +++++++++++++++---- 5 files changed, 43 insertions(+), 50 deletions(-) diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index d21e2d7831..160c5455d9 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -20,8 +20,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; -import io.netty.channel.socket.aio.AioEventLoopGroup; -import io.netty.channel.socket.aio.AioSocketChannel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.AttributeKey; @@ -179,17 +177,6 @@ public class Bootstrap extends AbstractBootstrap { return b; } - @Override - public Bootstrap channel(Class channelClass) { - if (channelClass == null) { - throw new NullPointerException("channelClass"); - } - if (channelClass == AioSocketChannel.class) { - return channelFactory(new AioSocketChannelFactory()); - } - return super.channel(channelClass); - } - @Override public String toString() { if (remoteAddress == null) { @@ -204,16 +191,4 @@ public class Bootstrap extends AbstractBootstrap { return buf.toString(); } - - private final class AioSocketChannelFactory implements ChannelFactory { - @Override - public Channel newChannel() { - return new AioSocketChannel((AioEventLoopGroup) group()); - } - - @Override - public String toString() { - return AioSocketChannel.class.getSimpleName() + ".class"; - } - } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index c7655019d5..bd651f78e9 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -17,7 +17,6 @@ package io.netty.channel.socket.aio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; -import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; @@ -30,8 +29,7 @@ import java.util.concurrent.TimeUnit; abstract class AbstractAioChannel extends AbstractChannel { - protected final AioEventLoopGroup group; - private final AsynchronousChannel ch; + protected AsynchronousChannel ch; /** * The future of the current connection attempt. If not null, subsequent @@ -41,10 +39,9 @@ abstract class AbstractAioChannel extends AbstractChannel { protected ScheduledFuture connectTimeoutFuture; private ConnectException connectTimeoutException; - protected AbstractAioChannel(Channel parent, Integer id, AioEventLoopGroup group, AsynchronousChannel ch) { + protected AbstractAioChannel(Channel parent, Integer id, AsynchronousChannel ch) { super(parent, id); this.ch = ch; - this.group = group; } @Override @@ -58,21 +55,22 @@ abstract class AbstractAioChannel extends AbstractChannel { } protected AsynchronousChannel javaChannel() { + if (ch == null) { + throw new IllegalStateException("Try to access Channel before eventLoop was registered"); + } return ch; } @Override public boolean isOpen() { + if (ch == null) { + return true; + } return ch.isOpen(); } @Override protected Runnable doRegister() throws Exception { - if (eventLoop().parent() != group) { - throw new ChannelException( - getClass().getSimpleName() + " must be registered to the " + - AioEventLoopGroup.class.getSimpleName() + " which was specified in the constructor."); - } return null; } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 4867179e9c..d51c915d1d 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -66,8 +66,9 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server } public AioServerSocketChannel(AioEventLoopGroup parentGroup, AioEventLoopGroup childGroup) { - super(null, null, parentGroup, newSocket(parentGroup.group)); + super(null, null, newSocket(parentGroup.group)); this.childGroup = childGroup; + config = new AioServerSocketChannelConfig(javaChannel()); } @@ -153,7 +154,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server // create the socket add it to the buffer and fire the event channel.pipeline().inboundMessageBuffer().add( - new AioSocketChannel(channel, null, channel.childGroup, ch)); + new AioSocketChannel(channel, null, ch)); channel.pipeline().fireInboundBufferUpdated(); } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index cd68cef3fa..2c08caf097 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -60,7 +60,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } - private final AioSocketChannelConfig config; + private AioSocketChannelConfig config; private volatile boolean inputShutdown; private volatile boolean outputShutdown; @@ -78,20 +78,19 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } }; - public AioSocketChannel(AioEventLoopGroup eventLoop) { - this(null, null, eventLoop, newSocket(eventLoop.group)); + public AioSocketChannel() { + this(null, null, null); } AioSocketChannel( - AioServerSocketChannel parent, Integer id, - AioEventLoopGroup eventLoop, AsynchronousSocketChannel ch) { - super(parent, id, eventLoop, ch); - config = new AioSocketChannelConfig(ch); + AioServerSocketChannel parent, Integer id, AsynchronousSocketChannel ch) { + super(parent, id, ch); + config = new AioSocketChannelConfig(null); } @Override public boolean isActive() { - return javaChannel().isOpen() && remoteAddress0() != null; + return ch != null && javaChannel().isOpen() && remoteAddress0() != null; } @Override @@ -176,6 +175,10 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected Runnable doRegister() throws Exception { super.doRegister(); + if (ch == null) { + ch = newSocket(((AioEventLoopGroup) eventLoop().parent()).group); + config.channel = javaChannel(); + } if (remoteAddress() == null) { return null; diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java index 509b6fbf06..a72d71387c 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -34,7 +34,7 @@ import static io.netty.channel.ChannelOption.*; final class AioSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { - private final NetworkChannel channel; + volatile NetworkChannel channel; private volatile boolean allowHalfClosure; private volatile long readTimeoutInMillis; private volatile long writeTimeoutInMillis; @@ -43,10 +43,6 @@ final class AioSocketChannelConfig extends DefaultChannelConfig * Creates a new instance. */ AioSocketChannelConfig(NetworkChannel channel) { - if (channel == null) { - throw new NullPointerException("channel"); - } - this.channel = channel; } @@ -128,6 +124,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getReceiveBufferSize() { + channelNull(); try { return channel.getOption(StandardSocketOptions.SO_RCVBUF); } catch (IOException e) { @@ -137,6 +134,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getSendBufferSize() { + channelNull(); try { return channel.getOption(StandardSocketOptions.SO_SNDBUF); } catch (IOException e) { @@ -146,6 +144,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getSoLinger() { + channelNull(); try { return channel.getOption(StandardSocketOptions.SO_LINGER); } catch (IOException e) { @@ -155,6 +154,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getTrafficClass() { + channelNull(); try { return channel.getOption(StandardSocketOptions.IP_TOS); } catch (IOException e) { @@ -164,6 +164,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isKeepAlive() { + channelNull(); try { return channel.getOption(StandardSocketOptions.SO_KEEPALIVE); } catch (IOException e) { @@ -173,6 +174,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isReuseAddress() { + channelNull(); try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -182,6 +184,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isTcpNoDelay() { + channelNull(); try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -191,6 +194,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setKeepAlive(boolean keepAlive) { + channelNull(); try { channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive); } catch (IOException e) { @@ -206,6 +210,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setReceiveBufferSize(int receiveBufferSize) { + channelNull(); try { channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); } catch (IOException e) { @@ -215,6 +220,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setReuseAddress(boolean reuseAddress) { + channelNull(); try { channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); } catch (IOException e) { @@ -224,6 +230,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setSendBufferSize(int sendBufferSize) { + channelNull(); try { channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); } catch (IOException e) { @@ -233,6 +240,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setSoLinger(int soLinger) { + channelNull(); try { channel.setOption(StandardSocketOptions.SO_LINGER, soLinger); } catch (IOException e) { @@ -242,6 +250,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setTcpNoDelay(boolean tcpNoDelay) { + channelNull(); try { channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); } catch (IOException e) { @@ -251,6 +260,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setTrafficClass(int trafficClass) { + channelNull(); try { channel.setOption(StandardSocketOptions.IP_TOS, trafficClass); } catch (IOException e) { @@ -313,4 +323,10 @@ final class AioSocketChannelConfig extends DefaultChannelConfig public void setAllowHalfClosure(boolean allowHalfClosure) { this.allowHalfClosure = allowHalfClosure; } + + private void channelNull() { + if (channel == null) { + throw new IllegalStateException("Channel not set while try to change options on it"); + } + } }