Allow modify the config even before the channel is registered to the eventLoop

This commit is contained in:
Norman Maurer 2012-12-14 15:10:10 +01:00
parent d8c569a71b
commit 42f6a27235
5 changed files with 146 additions and 113 deletions

View File

@ -287,7 +287,6 @@
<!-- Used for NIO UDP multicast --> <!-- Used for NIO UDP multicast -->
<ignore>java.nio.channels.DatagramChannel</ignore> <ignore>java.nio.channels.DatagramChannel</ignore>
<ignore>java.nio.channels.MembershipKey</ignore> <ignore>java.nio.channels.MembershipKey</ignore>
<ignore>java.net.StandardSocketOptions</ignore>
<ignore>java.net.StandardProtocolFamily</ignore> <ignore>java.net.StandardProtocolFamily</ignore>
<!-- Used for NIO. 2 --> <!-- Used for NIO. 2 -->
@ -297,6 +296,8 @@
<ignore>java.nio.channels.AsynchronousChannelGroup</ignore> <ignore>java.nio.channels.AsynchronousChannelGroup</ignore>
<ignore>java.nio.channels.NetworkChannel</ignore> <ignore>java.nio.channels.NetworkChannel</ignore>
<ignore>java.nio.channels.InterruptedByTimeoutException</ignore> <ignore>java.nio.channels.InterruptedByTimeoutException</ignore>
<ignore>java.net.StandardSocketOptions</ignore>
<ignore>java.net.SocketOption</ignore>
</ignores> </ignores>
</configuration> </configuration>
<executions> <executions>

View File

@ -143,7 +143,7 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
if (ch == null) { if (ch == null) {
AsynchronousServerSocketChannel channel = newSocket(((AioEventLoopGroup) eventLoop().parent()).group); AsynchronousServerSocketChannel channel = newSocket(((AioEventLoopGroup) eventLoop().parent()).group);
ch = channel; ch = channel;
config.channel = channel; config.active(channel);
} }
return task; return task;
} }

View File

@ -23,9 +23,12 @@ import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.util.NetworkConstants; import io.netty.util.NetworkConstants;
import java.io.IOException; import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* The Async {@link ServerSocketChannelConfig} implementation. * The Async {@link ServerSocketChannelConfig} implementation.
@ -33,16 +36,20 @@ import java.util.Map;
final class AioServerSocketChannelConfig extends DefaultChannelConfig final class AioServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig { implements ServerSocketChannelConfig {
volatile AsynchronousServerSocketChannel channel; private final AtomicReference<AsynchronousServerSocketChannel> channel
= new AtomicReference<AsynchronousServerSocketChannel>();
private volatile int backlog = NetworkConstants.SOMAXCONN; private volatile int backlog = NetworkConstants.SOMAXCONN;
private Map<SocketOption<?>, Object> options = new ConcurrentHashMap<SocketOption<?>, Object>();
AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) { private static final int DEFAULT_SND_BUF_SIZE = 32 * 1024;
this.channel = channel; private static final boolean DEFAULT_SO_REUSEADDR = false;
}
AioServerSocketChannelConfig() { AioServerSocketChannelConfig() {
} }
AioServerSocketChannelConfig(AsynchronousServerSocketChannel channel) {
this.channel.set(channel);
}
@Override @Override
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
@ -83,38 +90,22 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public boolean isReuseAddress() { public boolean isReuseAddress() {
try { return (Boolean) getOption(StandardSocketOptions.SO_REUSEADDR, DEFAULT_SO_REUSEADDR);
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public void setReuseAddress(boolean reuseAddress) { public void setReuseAddress(boolean reuseAddress) {
try { setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
try { return (Integer) getOption(StandardSocketOptions.SO_RCVBUF, DEFAULT_SND_BUF_SIZE);
return channel.getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
try { setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
@ -134,4 +125,57 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
} }
this.backlog = backlog; this.backlog = backlog;
} }
private Object getOption(SocketOption option, Object defaultValue) {
if (channel.get() == null) {
Object value = options.get(option);
if (value == null) {
return defaultValue;
} else {
return value;
}
}
try {
return channel.get().getOption(option);
} catch (IOException e) {
throw new ChannelException(e);
}
}
private void setOption(SocketOption option, Object defaultValue) {
if (channel.get() == null) {
options.put(option, defaultValue);
return;
}
try {
channel.get().setOption(option, defaultValue);
} catch (IOException e) {
throw new ChannelException(e);
}
}
void active(AsynchronousServerSocketChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (this.channel.compareAndSet(null, channel)) {
propagateOptions();
}
}
private void propagateOptions() {
for (SocketOption option: options.keySet()) {
Object value = options.remove(option);
if (value != null) {
try {
channel.get().setOption(option, value);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}
// not needed anymore
options = null;
}
} }

View File

@ -177,7 +177,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
super.doRegister(); super.doRegister();
if (ch == null) { if (ch == null) {
ch = newSocket(((AioEventLoopGroup) eventLoop().parent()).group); ch = newSocket(((AioEventLoopGroup) eventLoop().parent()).group);
config.channel = javaChannel(); config.active(javaChannel());
} }
if (remoteAddress() == null) { if (remoteAddress() == null) {

View File

@ -21,10 +21,13 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import java.io.IOException; import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.NetworkChannel; import java.nio.channels.NetworkChannel;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.channel.ChannelOption.*; import static io.netty.channel.ChannelOption.*;
@ -34,16 +37,24 @@ import static io.netty.channel.ChannelOption.*;
final class AioSocketChannelConfig extends DefaultChannelConfig final class AioSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig { implements SocketChannelConfig {
volatile NetworkChannel channel; private final AtomicReference<NetworkChannel> channel = new AtomicReference<NetworkChannel>();
private volatile boolean allowHalfClosure; private volatile boolean allowHalfClosure;
private volatile long readTimeoutInMillis; private volatile long readTimeoutInMillis;
private volatile long writeTimeoutInMillis; private volatile long writeTimeoutInMillis;
private Map<SocketOption<?>, Object> options = new ConcurrentHashMap<SocketOption<?>, Object>();
private static final int DEFAULT_RCV_BUF_SIZE = 32 * 1024;
private static final int DEFAULT_SND_BUF_SIZE = 32 * 1024;
private static final int DEFAULT_SO_LINGER = -1;
private static final boolean DEFAULT_SO_KEEP_ALIVE = false;
private static final int DEFAULT_IP_TOS = 0;
private static final boolean DEFAULT_SO_REUSEADDR = false;
private static final boolean DEFAULT_TCP_NODELAY = false;
AioSocketChannelConfig() {
}
/**
* Creates a new instance.
*/
AioSocketChannelConfig(NetworkChannel channel) { AioSocketChannelConfig(NetworkChannel channel) {
this.channel = channel; this.channel.set(channel);
} }
@Override @Override
@ -124,82 +135,42 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public int getReceiveBufferSize() { public int getReceiveBufferSize() {
channelNull(); return (Integer) getOption(StandardSocketOptions.SO_RCVBUF, DEFAULT_RCV_BUF_SIZE);
try {
return channel.getOption(StandardSocketOptions.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public int getSendBufferSize() { public int getSendBufferSize() {
channelNull(); return (Integer) getOption(StandardSocketOptions.SO_SNDBUF, DEFAULT_SND_BUF_SIZE);
try {
return channel.getOption(StandardSocketOptions.SO_SNDBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public int getSoLinger() { public int getSoLinger() {
channelNull(); return (Integer) getOption(StandardSocketOptions.SO_LINGER, DEFAULT_SO_LINGER);
try {
return channel.getOption(StandardSocketOptions.SO_LINGER);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public int getTrafficClass() { public int getTrafficClass() {
channelNull(); return (Integer) getOption(StandardSocketOptions.IP_TOS, DEFAULT_IP_TOS);
try {
return channel.getOption(StandardSocketOptions.IP_TOS);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public boolean isKeepAlive() { public boolean isKeepAlive() {
channelNull(); return (Boolean) getOption(StandardSocketOptions.SO_KEEPALIVE, DEFAULT_SO_KEEP_ALIVE);
try {
return channel.getOption(StandardSocketOptions.SO_KEEPALIVE);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public boolean isReuseAddress() { public boolean isReuseAddress() {
channelNull(); return (Boolean) getOption(StandardSocketOptions.SO_REUSEADDR, DEFAULT_SO_REUSEADDR);
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public boolean isTcpNoDelay() { public boolean isTcpNoDelay() {
channelNull(); return (Boolean) getOption(StandardSocketOptions.TCP_NODELAY, DEFAULT_TCP_NODELAY);
try {
return channel.getOption(StandardSocketOptions.SO_REUSEADDR);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public void setKeepAlive(boolean keepAlive) { public void setKeepAlive(boolean keepAlive) {
channelNull(); setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive);
try {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, keepAlive);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
@ -210,59 +181,58 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public void setReceiveBufferSize(int receiveBufferSize) { public void setReceiveBufferSize(int receiveBufferSize) {
channelNull(); setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
try {
channel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public void setReuseAddress(boolean reuseAddress) { public void setReuseAddress(boolean reuseAddress) {
channelNull(); setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
try {
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public void setSendBufferSize(int sendBufferSize) { public void setSendBufferSize(int sendBufferSize) {
channelNull(); setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
try {
channel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public void setSoLinger(int soLinger) { public void setSoLinger(int soLinger) {
channelNull(); setOption(StandardSocketOptions.SO_LINGER, soLinger);
try {
channel.setOption(StandardSocketOptions.SO_LINGER, soLinger);
} catch (IOException e) {
throw new ChannelException(e);
}
} }
@Override @Override
public void setTcpNoDelay(boolean tcpNoDelay) { public void setTcpNoDelay(boolean tcpNoDelay) {
channelNull(); setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay);
}
@Override
public void setTrafficClass(int trafficClass) {
setOption(StandardSocketOptions.IP_TOS, trafficClass);
}
private Object getOption(SocketOption option, Object defaultValue) {
if (channel.get() == null) {
Object value = options.get(option);
if (value == null) {
return defaultValue;
} else {
return value;
}
}
try { try {
channel.setOption(StandardSocketOptions.TCP_NODELAY, tcpNoDelay); return channel.get().getOption(option);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
@Override private void setOption(SocketOption option, Object defaultValue) {
public void setTrafficClass(int trafficClass) { if (channel.get() == null) {
channelNull(); options.put(option, defaultValue);
return;
}
try { try {
channel.setOption(StandardSocketOptions.IP_TOS, trafficClass); channel.get().setOption(option, defaultValue);
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException(e); throw new ChannelException(e);
} }
@ -324,9 +294,27 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
this.allowHalfClosure = allowHalfClosure; this.allowHalfClosure = allowHalfClosure;
} }
private void channelNull() { void active(NetworkChannel channel) {
if (channel == null) { if (channel == null) {
throw new IllegalStateException("Channel not set while try to change options on it"); throw new NullPointerException("channel");
} }
if (this.channel.compareAndSet(null, channel)) {
propagateOptions();
}
}
private void propagateOptions() {
for (SocketOption option: options.keySet()) {
Object value = options.remove(option);
if (value != null) {
try {
channel.get().setOption(option, value);
} catch (IOException e) {
throw new ChannelException(e);
}
}
}
// not needed anymore
options = null;
} }
} }