diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index efb016242d..b45c375b52 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -37,14 +38,14 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server private static final AcceptHandler ACCEPT_HANDLER = new AcceptHandler(); private static final InternalLogger logger = InternalLoggerFactory.getInstance(AioServerSocketChannel.class); - private volatile AioServerSocketChannelConfig config; + + private final AioServerSocketChannelConfig config = new AioServerSocketChannelConfig(); private boolean closed; public AioServerSocketChannel() { super(null, null); } - @Override protected AsynchronousServerSocketChannel javaChannel() { return (AsynchronousServerSocketChannel) super.javaChannel(); @@ -116,7 +117,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected Runnable doRegister() throws Exception { ch = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - config = new AioServerSocketChannelConfig(javaChannel()); + config.setChannel(javaChannel()); return null; } @@ -150,10 +151,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server } @Override - public AioServerSocketChannelConfig config() { - if (config == null) { - throw new IllegalStateException("Channel not registered yet"); - } + public ServerSocketChannelConfig config() { return config; } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java index 6ceb899d3f..cd74cddc17 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannelConfig.java @@ -30,20 +30,29 @@ import java.util.Map; /** * The Async {@link ServerSocketChannelConfig} implementation. */ -public class AioServerSocketChannelConfig extends DefaultChannelConfig +final class AioServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { - private final AsynchronousServerSocketChannel channel; + private volatile AsynchronousServerSocketChannel channel; + private volatile Integer receiveBufferSize; + private volatile Boolean reuseAddress; private volatile int backlog = NetworkConstants.SOMAXCONN; - /** - * Creates a new instance. - */ - public AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) { + void setChannel(AsynchronousServerSocketChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } + if (this.channel != null) { + throw new IllegalStateException(); + } this.channel = channel; + + if (receiveBufferSize != null) { + setReceiveBufferSize(receiveBufferSize); + } + if (reuseAddress != null) { + setReuseAddress(reuseAddress); + } } @Override @@ -85,6 +94,14 @@ public class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public boolean isReuseAddress() { + AsynchronousServerSocketChannel channel = this.channel; + if (channel == null) { + if (reuseAddress == null) { + return false; + } else { + return reuseAddress; + } + } try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -94,6 +111,10 @@ public class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public void setReuseAddress(boolean reuseAddress) { + AsynchronousServerSocketChannel channel = this.channel; + if (channel == null) { + this.reuseAddress = reuseAddress; + } try { channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); } catch (IOException e) { @@ -103,6 +124,14 @@ public class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public int getReceiveBufferSize() { + AsynchronousServerSocketChannel channel = this.channel; + if (channel == null) { + if (receiveBufferSize == null) { + return 0; + } else { + return receiveBufferSize; + } + } try { return channel.getOption(StandardSocketOptions.SO_RCVBUF); } catch (IOException e) { @@ -112,6 +141,12 @@ public class AioServerSocketChannelConfig extends DefaultChannelConfig @Override public void setReceiveBufferSize(int receiveBufferSize) { + AsynchronousServerSocketChannel channel = this.channel; + if (channel == null) { + this.receiveBufferSize = receiveBufferSize; + return; + } + try { channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); } catch (IOException e) { diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 26b9b4c700..ce4e80bc7d 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -30,6 +30,7 @@ import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; +import java.nio.channels.NetworkChannel; public class AioSocketChannel extends AbstractAioChannel implements SocketChannel { @@ -40,9 +41,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne private static final CompletionHandler READ_HANDLER = new ReadHandler(); private static final CompletionHandler WRITE_HANDLER = new WriteHandler(); + private final AioSocketChannelConfig config = new AioSocketChannelConfig(); private boolean closed; private boolean flushing; - private volatile AioSocketChannelConfig config; public AioSocketChannel() { this(null, null, null); @@ -52,7 +53,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne super(parent, id); ch = channel; if (ch != null) { - config = new AioSocketChannelConfig(javaChannel()); + config.setChannel(channel); } } @@ -112,7 +113,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne protected Runnable doRegister() throws Exception { if (ch == null) { ch = AsynchronousSocketChannel.open(AsynchronousChannelGroup.withThreadPool(eventLoop())); - config = new AioSocketChannelConfig(javaChannel()); + config.setChannel((NetworkChannel) ch); return null; } else if (remoteAddress() != null) { return new Runnable() { @@ -139,7 +140,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } - private boolean expandReadBuffer(ByteBuf byteBuf) { + private static boolean expandReadBuffer(ByteBuf byteBuf) { if (!byteBuf.writable()) { // FIXME: Magic number byteBuf.ensureWritableBytes(4096); diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java index 45228e0804..5cb4f6057f 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannelConfig.java @@ -29,19 +29,51 @@ import java.util.Map; /** * The default {@link SocketChannelConfig} implementation. */ -public class AioSocketChannelConfig extends DefaultChannelConfig +final class AioSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { - private final NetworkChannel channel; + private volatile NetworkChannel channel; + private volatile Integer receiveBufferSize; + private volatile Integer sendBufferSize; + private volatile Boolean tcpNoDelay; + private volatile Boolean keepAlive; + private volatile Boolean reuseAddress; + private volatile Integer soLinger; + private volatile Integer trafficClass; /** * Creates a new instance. */ - public AioSocketChannelConfig(NetworkChannel channel) { + void setChannel(NetworkChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } + if (this.channel != null) { + throw new IllegalStateException(); + } this.channel = channel; + + if (receiveBufferSize != null) { + setReceiveBufferSize(receiveBufferSize); + } + if (sendBufferSize != null) { + setSendBufferSize(sendBufferSize); + } + if (reuseAddress != null) { + setReuseAddress(reuseAddress); + } + if (tcpNoDelay != null) { + setTcpNoDelay(tcpNoDelay); + } + if (keepAlive != null) { + setKeepAlive(keepAlive); + } + if (soLinger != null) { + setSoLinger(soLinger); + } + if (trafficClass != null) { + setTrafficClass(trafficClass); + } } @Override @@ -105,6 +137,15 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getReceiveBufferSize() { + NetworkChannel channel = this.channel; + if (channel == null) { + if (receiveBufferSize == null) { + return 0; + } else { + return receiveBufferSize; + } + } + try { return channel.getOption(StandardSocketOptions.SO_RCVBUF); } catch (IOException e) { @@ -114,6 +155,15 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getSendBufferSize() { + NetworkChannel channel = this.channel; + if (channel == null) { + if (sendBufferSize == null) { + return 0; + } else { + return sendBufferSize; + } + } + try { return channel.getOption(StandardSocketOptions.SO_SNDBUF); } catch (IOException e) { @@ -123,6 +173,15 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getSoLinger() { + NetworkChannel channel = this.channel; + if (channel == null) { + if (soLinger == null) { + return 1; + } else { + return soLinger; + } + } + try { return channel.getOption(StandardSocketOptions.SO_LINGER); } catch (IOException e) { @@ -132,6 +191,15 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public int getTrafficClass() { + NetworkChannel channel = this.channel; + if (channel == null) { + if (trafficClass == null) { + return 0; + } else { + return trafficClass; + } + } + try { return channel.getOption(StandardSocketOptions.IP_TOS); } catch (IOException e) { @@ -141,6 +209,15 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isKeepAlive() { + NetworkChannel channel = this.channel; + if (channel == null) { + if (keepAlive == null) { + return false; + } else { + return keepAlive; + } + } + try { return channel.getOption(StandardSocketOptions.SO_KEEPALIVE); } catch (IOException e) { @@ -150,6 +227,15 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isReuseAddress() { + NetworkChannel channel = this.channel; + if (channel == null) { + if (reuseAddress == null) { + return false; + } else { + return reuseAddress; + } + } + try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -159,6 +245,15 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public boolean isTcpNoDelay() { + NetworkChannel channel = this.channel; + if (channel == null) { + if (tcpNoDelay == null) { + return false; + } else { + return tcpNoDelay; + } + } + try { return channel.getOption(StandardSocketOptions.SO_REUSEADDR); } catch (IOException e) { @@ -168,6 +263,12 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setKeepAlive(boolean keepAlive) { + NetworkChannel channel = this.channel; + if (channel == null) { + this.keepAlive = keepAlive; + return; + } + try { channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive); } catch (IOException e) { @@ -183,6 +284,12 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setReceiveBufferSize(int receiveBufferSize) { + NetworkChannel channel = this.channel; + if (channel == null) { + this.receiveBufferSize = receiveBufferSize; + return; + } + try { channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); } catch (IOException e) { @@ -192,6 +299,12 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setReuseAddress(boolean reuseAddress) { + NetworkChannel channel = this.channel; + if (channel == null) { + this.reuseAddress = reuseAddress; + return; + } + try { channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress); } catch (IOException e) { @@ -201,6 +314,12 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setSendBufferSize(int sendBufferSize) { + NetworkChannel channel = this.channel; + if (channel == null) { + this.sendBufferSize = sendBufferSize; + return; + } + try { channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); } catch (IOException e) { @@ -210,6 +329,12 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setSoLinger(int soLinger) { + NetworkChannel channel = this.channel; + if (channel == null) { + this.soLinger = soLinger; + return; + } + try { channel.setOption(StandardSocketOptions.SO_LINGER, soLinger); } catch (IOException e) { @@ -219,6 +344,12 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setTcpNoDelay(boolean tcpNoDelay) { + NetworkChannel channel = this.channel; + if (channel == null) { + this.tcpNoDelay = tcpNoDelay; + return; + } + try { channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); } catch (IOException e) { @@ -228,6 +359,11 @@ public class AioSocketChannelConfig extends DefaultChannelConfig @Override public void setTrafficClass(int trafficClass) { + NetworkChannel channel = this.channel; + if (channel == null) { + this.trafficClass = trafficClass; + } + try { channel.setOption(StandardSocketOptions.IP_TOS, trafficClass); } catch (IOException e) {