diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java index 475ad752ae..6f66119e9e 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/DefaultSctpChannelConfig.java @@ -15,21 +15,25 @@ */ package io.netty.channel.sctp; -import com.sun.nio.sctp.SctpChannel; -import static com.sun.nio.sctp.SctpStandardSocketOptions.*; +import static io.netty.channel.ChannelOption.*; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; import io.netty.channel.DefaultChannelConfig; -import io.netty.channel.socket.nio.NioSocketChannelConfig; -import io.netty.util.internal.ConversionUtil; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.sun.nio.sctp.SctpChannel; +import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams; /** * The default {@link NioSocketChannelConfig} implementation for SCTP. */ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChannelConfig { - private SctpChannel channel; + private final SctpChannel channel; DefaultSctpChannelConfig(SctpChannel channel) { if (channel == null) { @@ -39,30 +43,63 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann } @Override - public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; + public Map, Object> getOptions() { + // TODO: Investigate if other SCTP options such as SCTP_PRIMARY_ADDR can be exposed. + return getOptions(super.getOptions(), SO_RCVBUF, SO_SNDBUF, SCTP_NODELAY, SCTP_INIT_MAXSTREAMS); + } + + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == SCTP_NODELAY) { + return (T) Boolean.valueOf(isSctpNoDelay()); + } + if (option == SCTP_INIT_MAXSTREAMS) { + InitMaxStreams ims = getInitMaxStreams(); + if (ims == null) { + return null; + } + List values = new ArrayList(2); + values.add(ims.maxInStreams()); + values.add(ims.maxOutStreams()); + @SuppressWarnings("unchecked") + T ret = (T) values; + return ret; } - if (key.equals("receiveBufferSize")) { - setReceiveBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("sendBufferSize")) { - setSendBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("sctpNoDelay")) { - setSctpNoDelay(ConversionUtil.toBoolean(value)); - } else if (key.equals("sctpInitMaxStreams")) { - final Integer maxInOutStreams = ConversionUtil.toInt(value); - setInitMaxStreams(InitMaxStreams.create(maxInOutStreams, maxInOutStreams)); + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == SCTP_NODELAY) { + setSctpNoDelay((Boolean) value); + } else if (option == SCTP_INIT_MAXSTREAMS) { + @SuppressWarnings("unchecked") + List values = (List) value; + setInitMaxStreams(InitMaxStreams.create(values.get(0), values.get(1))); } else { - return false; + return super.setOption(option, value); } + return true; } @Override public boolean isSctpNoDelay() { try { - return channel.getOption(SCTP_NODELAY); + return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_NODELAY); } catch (IOException e) { throw new ChannelException(e); } @@ -71,7 +108,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setSctpNoDelay(boolean sctpNoDelay) { try { - channel.setOption(SCTP_NODELAY, sctpNoDelay); + channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_NODELAY, sctpNoDelay); } catch (IOException e) { throw new ChannelException(e); } @@ -80,7 +117,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public int getSendBufferSize() { try { - return channel.getOption(SO_SNDBUF); + return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF); } catch (IOException e) { throw new ChannelException(e); } @@ -89,7 +126,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setSendBufferSize(int sendBufferSize) { try { - channel.setOption(SO_SNDBUF, sendBufferSize); + channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF, sendBufferSize); } catch (IOException e) { throw new ChannelException(e); } @@ -98,7 +135,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public int getReceiveBufferSize() { try { - return channel.getOption(SO_RCVBUF); + return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF); } catch (IOException e) { throw new ChannelException(e); } @@ -107,7 +144,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setReceiveBufferSize(int receiveBufferSize) { try { - channel.setOption(SO_RCVBUF, receiveBufferSize); + channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF, receiveBufferSize); } catch (IOException e) { throw new ChannelException(e); } @@ -116,7 +153,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public InitMaxStreams getInitMaxStreams() { try { - return channel.getOption(SCTP_INIT_MAXSTREAMS); + return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS); } catch (IOException e) { throw new ChannelException(e); } @@ -125,7 +162,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann @Override public void setInitMaxStreams(InitMaxStreams initMaxStreams) { try { - channel.setOption(SCTP_INIT_MAXSTREAMS, initMaxStreams); + channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS, initMaxStreams); } catch (IOException e) { throw new ChannelException(e); } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 27dffbd34c..dfb6efffca 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -90,7 +90,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ScheduledFuture connectTimeoutFuture; private ConnectException connectTimeoutException; - private long writtenAmount; + private long flushedAmount; + private FlushFutureEntry flushFuture; + private FlushFutureEntry lastFlushFuture; + private ClosedChannelException closedChannelException; /** Cache for the string representation of this channel */ private boolean strValActive; @@ -425,6 +428,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (AbstractChannel.this.eventLoop != null) { throw new IllegalStateException("registered to an event loop already"); } + if (!isCompatible(eventLoop)) { + throw new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()); + } + AbstractChannel.this.eventLoop = eventLoop; assert eventLoop().inEventLoop(); @@ -598,7 +605,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha future.setFailure(t); } + if (closedChannelException != null) { + closedChannelException = new ClosedChannelException(); + } + + notifyFlushFutures(closedChannelException); notifyClosureListeners(); + if (wasActive && !isActive()) { pipeline().fireChannelInactive(); } @@ -645,8 +658,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha public void read() { assert eventLoop().inEventLoop(); long readAmount = 0; + boolean closed = false; try { - boolean closed = false; for (;;) { int localReadAmount = doRead(); if (localReadAmount > 0) { @@ -665,41 +678,104 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (readAmount > 0) { pipeline.fireInboundBufferUpdated(); } - - if (closed) { - close(voidFuture()); - } } catch (Throwable t) { pipeline().fireExceptionCaught(t); if (t instanceof IOException) { close(voidFuture()); } + } finally { + if (closed && isOpen()) { + close(voidFuture()); + } } } @Override public void flush(final ChannelFuture future) { - // FIXME: Notify future properly using writtenAmount. if (eventLoop().inEventLoop()) { + // Append flush future to the notification list. + if (future != voidFuture && !future.isDone()) { + FlushFutureEntry newEntry = new FlushFutureEntry(future, flushedAmount + out().size(), null); + if (flushFuture == null) { + flushFuture = lastFlushFuture = newEntry; + } else { + lastFlushFuture.next = newEntry; + lastFlushFuture = newEntry; + } + } + + // Perform outbound I/O. try { - writtenAmount += doFlush(); - } catch (Exception e) { - future.setFailure(e); + flushedAmount += doFlush(); + } catch (Throwable t) { + notifyFlushFutures(t); + pipeline().fireExceptionCaught(t); + if (t instanceof IOException) { + close(voidFuture()); + } + } finally { + // Notify flush futures if necessary. + notifyFlushFutures(); + if (!isActive()) { + close(voidFuture()); + } } } else { eventLoop().execute(new Runnable() { @Override public void run() { - try { - writtenAmount += doFlush(); - } catch (Exception e) { - future.setFailure(e); - } + flush(future); } }); } } + private void notifyFlushFutures() { + FlushFutureEntry e = flushFuture; + if (e == null) { + return; + } + + final long flushedAmount = AbstractChannel.this.flushedAmount; + do { + if (e.expectedFlushedAmount > flushedAmount) { + break; + } + e.future.setSuccess(); + e = e.next; + } while (e != null); + + flushFuture = e; + + // Avoid overflow + if (e == null) { + // Reset the counter if there's nothing in the notification list. + AbstractChannel.this.flushedAmount = 0; + } else if (flushedAmount >= 0x1000000000000000L) { + // Otherwise, reset the counter only when the counter grew pretty large + // so that we can reduce the cost of updating all entries in the notification list. + AbstractChannel.this.flushedAmount = 0; + do { + e.expectedFlushedAmount -= flushedAmount; + e = e.next; + } while (e != null); + } + } + + private void notifyFlushFutures(Throwable cause) { + FlushFutureEntry e = flushFuture; + if (e == null) { + return; + } + + do { + e.future.setFailure(cause); + e = e.next; + } while (e != null); + + flushFuture = null; + } + private boolean ensureOpen(ChannelFuture future) { if (isOpen()) { return true; @@ -719,6 +795,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + private static class FlushFutureEntry { + private final ChannelFuture future; + private long expectedFlushedAmount; + private FlushFutureEntry next; + + FlushFutureEntry(ChannelFuture future, long expectedWrittenAmount, FlushFutureEntry next) { + this.future = future; + expectedFlushedAmount = expectedWrittenAmount; + this.next = next; + } + } + + protected abstract boolean isCompatible(EventLoop loop); + protected abstract java.nio.channels.Channel javaChannel(); protected abstract ChannelBufferHolder firstOut(); diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 8bce3295f0..59d125e68c 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -18,7 +18,6 @@ package io.netty.channel; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannelConfig; import io.netty.util.AttributeMap; import java.net.InetSocketAddress; diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index 0142d963d4..33145f2a3f 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -122,6 +122,22 @@ public final class ChannelBufferHolder { default: throw new Error(); } + } + public int size() { + switch (bypassDirection) { + case 0: + if (hasMessageBuffer()) { + return messageBuffer().size(); + } else { + return byteBuffer().readableBytes(); + } + case 1: + return ctx.nextIn().size(); + case 2: + return ctx.out().size(); + default: + throw new Error(); + } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelConfig.java b/transport/src/main/java/io/netty/channel/ChannelConfig.java index 5e7c8d2938..d58b14fabf 100644 --- a/transport/src/main/java/io/netty/channel/ChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/ChannelConfig.java @@ -16,8 +16,9 @@ package io.netty.channel; import io.netty.channel.socket.SocketChannelConfig; -import io.netty.channel.socket.nio.NioChannelConfig; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; import java.util.Map; /** @@ -50,7 +51,7 @@ import java.util.Map; *

* More options are available in the sub-types of {@link ChannelConfig}. For * example, you can configure the parameters which are specific to a TCP/IP - * socket as explained in {@link SocketChannelConfig} or {@link NioChannelConfig}. + * socket as explained in {@link SocketChannelConfig}. * * @apiviz.has io.netty.channel.ChannelPipelineFactory * @apiviz.composedOf io.netty.channel.ReceiveBufferSizePredictor @@ -59,10 +60,14 @@ import java.util.Map; */ public interface ChannelConfig { + Map, Object> getOptions(); + /** * Sets the configuration properties from the specified {@link Map}. */ - void setOptions(Map options); + boolean setOptions(Map, ?> options); + + T getOption(ChannelOption option); /** * Sets a configuration property with the specified name and value. @@ -84,7 +89,7 @@ public interface ChannelConfig { * * @return {@code true} if and only if the property has been set */ - boolean setOption(String name, Object value); + boolean setOption(ChannelOption option, T value); /** * Returns the connect timeout of the channel in milliseconds. If the @@ -104,4 +109,25 @@ public interface ChannelConfig { * {@code 0} to disable. */ void setConnectTimeoutMillis(int connectTimeoutMillis); + + /** + * Returns the maximum loop count for a write operation until + * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. + * It is similar to what a spin lock is used for in concurrency programming. + * It improves memory utilization and write throughput depending on + * the platform that JVM runs on. The default value is {@code 16}. + */ + int getWriteSpinCount(); + + /** + * Sets the maximum loop count for a write operation until + * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. + * It is similar to what a spin lock is used for in concurrency programming. + * It improves memory utilization and write throughput depending on + * the platform that JVM runs on. The default value is {@code 16}. + * + * @throws IllegalArgumentException + * if the specified value is {@code 0} or less than {@code 0} + */ + void setWriteSpinCount(int writeSpinCount); } diff --git a/transport/src/main/java/io/netty/channel/ChannelFuture.java b/transport/src/main/java/io/netty/channel/ChannelFuture.java index a4a2d52f3f..c984285992 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/ChannelFuture.java @@ -15,10 +15,10 @@ */ package io.netty.channel; -import java.util.concurrent.TimeUnit; - import io.netty.bootstrap.ClientBootstrap; +import java.util.concurrent.TimeUnit; + /** * The result of an asynchronous {@link Channel} I/O operation. *

diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java new file mode 100644 index 0000000000..3c9ef0df08 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -0,0 +1,122 @@ +package io.netty.channel; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class ChannelOption implements Comparable> { + + public static final ChannelOption CONNECT_TIMEOUT_MILLIS = + new ChannelOption("CONNECT_TIMEOUT_MILLIS", Integer.class); + public static final ChannelOption WRITE_SPIN_COUNT = + new ChannelOption("WRITE_SPIN_COUNT", Integer.class); + + public static final ChannelOption SO_BROADCAST = + new ChannelOption("SO_BROADCAST", Boolean.class); + public static final ChannelOption SO_KEEPALIVE = + new ChannelOption("SO_KEEPALIVE", Boolean.class); + public static final ChannelOption SO_SNDBUF = + new ChannelOption("SO_SNDBUF", Integer.class); + public static final ChannelOption SO_RCVBUF = + new ChannelOption("SO_RCVBUF", Integer.class); + public static final ChannelOption SO_REUSEADDR = + new ChannelOption("SO_REUSEADDR", Boolean.class); + public static final ChannelOption SO_LINGER = + new ChannelOption("SO_LINGER", Integer.class); + public static final ChannelOption SO_BACKLOG = + new ChannelOption("SO_BACKLOG", Integer.class); + + public static final ChannelOption IP_TOS = + new ChannelOption("IP_TOS", Integer.class); + public static final ChannelOption IP_MULTICAST_ADDR = + new ChannelOption("IP_MULTICAST_ADDR", InetAddress.class); + public static final ChannelOption IP_MULTICAST_IF = + new ChannelOption("IP_MULTICAST_IF", NetworkInterface.class); + public static final ChannelOption IP_MULTICAST_TTL = + new ChannelOption("IP_MULTICAST_TTL", Integer.class); + public static final ChannelOption IP_MULTICAST_LOOP_DISABLED = + new ChannelOption("IP_MULTICAST_LOOP_DISABLED", Boolean.class); + + public static final ChannelOption TCP_NODELAY = + new ChannelOption("TCP_NODELAY", Boolean.class); + + public static final ChannelOption SCTP_DISABLE_FRAGMENTS = + new ChannelOption("SCTP_DISABLE_FRAGMENTS", Boolean.class); + public static final ChannelOption SCTP_EXPLICIT_COMPLETE = + new ChannelOption("SCTP_EXPLICIT_COMPLETE", Boolean.class); + public static final ChannelOption SCTP_FRAGMENT_INTERLEAVE = + new ChannelOption("SCTP_FRAGMENT_INTERLEAVE", Integer.class); + @SuppressWarnings("unchecked") + public static final ChannelOption> SCTP_INIT_MAXSTREAMS = + new ChannelOption>("SCTP_INIT_MAXSTREAMS", (Class>)(Class) List.class) { + @Override + public void validate(List value) { + super.validate(value); + if (value.size() != 2) { + throw new IllegalArgumentException("value must be a List of 2 Integers: " + value); + } + if (value.get(0) == null) { + throw new NullPointerException("value[0]"); + } + if (value.get(1) == null) { + throw new NullPointerException("value[1]"); + } + } + }; + public static final ChannelOption SCTP_NODELAY = + new ChannelOption("SCTP_NODELAY", Boolean.class); + public static final ChannelOption SCTP_PRIMARY_ADDR = + new ChannelOption("SCTP_PRIMARY_ADDR", SocketAddress.class); + public static final ChannelOption SCTP_SET_PEER_PRIMARY_ADDR = + new ChannelOption("SCTP_SET_PEER_PRIMARY_ADDR", SocketAddress.class); + + private static final ConcurrentMap names = new ConcurrentHashMap(); + + private final String name; + private final Class valueType; + private final String strVal; + + public ChannelOption(String name, Class valueType) { + if (name == null) { + throw new NullPointerException("name"); + } + if (valueType == null) { + throw new NullPointerException("valueType"); + } + + if (names.putIfAbsent(name, Boolean.TRUE) != null) { + throw new IllegalArgumentException("option name already in use: " + name); + } + + this.name = name; + this.valueType = valueType; + strVal = name + '[' + valueType.getSimpleName() + ']'; + } + + public String name() { + return name; + } + + public Class valueType() { + return valueType; + } + + public void validate(T value) { + if (value == null) { + throw new NullPointerException("value"); + } + } + + @Override + public int compareTo(ChannelOption o) { + return name().compareTo(o.name()); + } + + @Override + public String toString() { + return strVal; + } +} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 50d0e6a60f..cfc3d362f7 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -15,9 +15,10 @@ */ package io.netty.channel; +import static io.netty.channel.ChannelOption.*; import io.netty.channel.socket.SocketChannelConfig; -import io.netty.util.internal.ConversionUtil; +import java.util.IdentityHashMap; import java.util.Map; import java.util.Map.Entry; @@ -29,24 +30,76 @@ public class DefaultChannelConfig implements ChannelConfig { private static final int DEFAULT_CONNECT_TIMEOUT = 30000; private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; + private volatile int writeSpinCount = 16; @Override - public void setOptions(Map options) { - for (Entry e: options.entrySet()) { - setOption(e.getKey(), e.getValue()); + public Map, Object> getOptions() { + return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT); + } + + protected Map, Object> getOptions(Map, Object> result, ChannelOption... options) { + if (result == null) { + result = new IdentityHashMap, Object>(); } + for (ChannelOption o: options) { + result.put(o, getOption(o)); + } + return result; } @Override - public boolean setOption(String key, Object value) { - if ("connectTimeoutMillis".equals(key)) { - setConnectTimeoutMillis(ConversionUtil.toInt(value)); + public boolean setOptions(Map, ?> options) { + if (options == null) { + throw new NullPointerException("options"); + } + + boolean setAllOptions = true; + for (Entry, ?> e: options.entrySet()) { + if (!setOption((ChannelOption) e.getKey(), e.getValue())) { + setAllOptions = false; + } + } + + return setAllOptions; + } + + @Override + public T getOption(ChannelOption option) { + if (option == null) { + throw new NullPointerException("option"); + } + + if (option == CONNECT_TIMEOUT_MILLIS) { + return (T) Integer.valueOf(getConnectTimeoutMillis()); + } else if (option == WRITE_SPIN_COUNT) { + return (T) Integer.valueOf(getWriteSpinCount()); + } + + return null; + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == CONNECT_TIMEOUT_MILLIS) { + setConnectTimeoutMillis((Integer) value); + } else if (option == WRITE_SPIN_COUNT) { + setWriteSpinCount((Integer) value); } else { return false; } + return true; } + protected void validate(ChannelOption option, T value) { + if (option == null) { + throw new NullPointerException("option"); + } + option.validate(value); + } + @Override public int getConnectTimeoutMillis() { return connectTimeoutMillis; @@ -60,4 +113,18 @@ public class DefaultChannelConfig implements ChannelConfig { } this.connectTimeoutMillis = connectTimeoutMillis; } + + @Override + public int getWriteSpinCount() { + return writeSpinCount; + } + + @Override + public void setWriteSpinCount(int writeSpinCount) { + if (writeSpinCount <= 0) { + throw new IllegalArgumentException( + "writeSpinCount must be a positive integer."); + } + this.writeSpinCount = writeSpinCount; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java index eeb3c17747..c217980ba6 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java @@ -15,9 +15,10 @@ */ package io.netty.channel.socket; +import static io.netty.channel.ChannelOption.*; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; import io.netty.channel.DefaultChannelConfig; -import io.netty.util.internal.ConversionUtil; import java.io.IOException; import java.net.DatagramSocket; @@ -25,12 +26,12 @@ import java.net.InetAddress; import java.net.MulticastSocket; import java.net.NetworkInterface; import java.net.SocketException; +import java.util.Map; /** * The default {@link DatagramChannelConfig} implementation. */ -public class DefaultDatagramChannelConfig extends DefaultChannelConfig - implements DatagramChannelConfig { +public class DefaultDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig { private final DatagramSocket socket; @@ -45,32 +46,76 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig } @Override - public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + SO_BROADCAST, SO_RCVBUF, SO_SNDBUF, SO_REUSEADDR, IP_MULTICAST_LOOP_DISABLED, + IP_MULTICAST_ADDR, IP_MULTICAST_IF, IP_MULTICAST_TTL, IP_TOS); + } + + @Override + public T getOption(ChannelOption option) { + if (option == SO_BROADCAST) { + return (T) Boolean.valueOf(isBroadcast()); + } + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == IP_MULTICAST_LOOP_DISABLED) { + return (T) Boolean.valueOf(isLoopbackModeDisabled()); + } + if (option == IP_MULTICAST_ADDR) { + @SuppressWarnings("unchecked") + T i = (T) getInterface(); + return i; + } + if (option == IP_MULTICAST_IF) { + @SuppressWarnings("unchecked") + T i = (T) getNetworkInterface(); + return i; + } + if (option == IP_MULTICAST_TTL) { + return (T) Integer.valueOf(getTimeToLive()); + } + if (option == IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); } - if (key.equals("broadcast")) { - setBroadcast(ConversionUtil.toBoolean(value)); - } else if (key.equals("receiveBufferSize")) { - setReceiveBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("sendBufferSize")) { - setSendBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("reuseAddress")) { - setReuseAddress(ConversionUtil.toBoolean(value)); - } else if (key.equals("loopbackModeDisabled")) { - setLoopbackModeDisabled(ConversionUtil.toBoolean(value)); - } else if (key.equals("interface")) { + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_BROADCAST) { + setBroadcast((Boolean) value); + } else if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == IP_MULTICAST_LOOP_DISABLED) { + setLoopbackModeDisabled((Boolean) value); + } else if (option == IP_MULTICAST_ADDR) { setInterface((InetAddress) value); - } else if (key.equals("networkInterface")) { + } else if (option == IP_MULTICAST_IF) { setNetworkInterface((NetworkInterface) value); - } else if (key.equals("timeToLive")) { - setTimeToLive(ConversionUtil.toInt(value)); - } else if (key.equals("trafficClass")) { - setTrafficClass(ConversionUtil.toInt(value)); + } else if (option == IP_MULTICAST_TTL) { + setTimeToLive((Integer) value); + } else if (option == IP_TOS) { + setTrafficClass((Integer) value); } else { - return false; + return super.setOption(option, value); } + return true; } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java index 1e7b8eeb48..b109aacf4d 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultServerSocketChannelConfig.java @@ -15,12 +15,14 @@ */ package io.netty.channel.socket; +import static io.netty.channel.ChannelOption.*; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; import io.netty.channel.DefaultChannelConfig; -import io.netty.util.internal.ConversionUtil; import java.net.ServerSocket; import java.net.SocketException; +import java.util.Map; /** * The default {@link ServerSocketChannelConfig} implementation. @@ -42,20 +44,39 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig } @Override - public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; + public Map, Object> getOptions() { + return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); + } + + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_BACKLOG) { + return (T) Integer.valueOf(getBacklog()); } - if (key.equals("receiveBufferSize")) { - setReceiveBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("reuseAddress")) { - setReuseAddress(ConversionUtil.toBoolean(value)); - } else if (key.equals("backlog")) { - setBacklog(ConversionUtil.toInt(value)); + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_BACKLOG) { + setBacklog((Integer) value); } else { - return false; + return super.setOption(option, value); } + return true; } diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java index 7c2a92ffbd..34a9240cc6 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultSocketChannelConfig.java @@ -15,12 +15,14 @@ */ package io.netty.channel.socket; +import static io.netty.channel.ChannelOption.*; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultChannelConfig; + import java.net.Socket; import java.net.SocketException; - -import io.netty.channel.ChannelException; -import io.netty.channel.DefaultChannelConfig; -import io.netty.util.internal.ConversionUtil; +import java.util.Map; /** * The default {@link SocketChannelConfig} implementation. @@ -41,28 +43,61 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig } @Override - public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; + public Map, Object> getOptions() { + return getOptions( + super.getOptions(), + SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS); + } + + @Override + public T getOption(ChannelOption option) { + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == TCP_NODELAY) { + return (T) Boolean.valueOf(isTcpNoDelay()); + } + if (option == SO_KEEPALIVE) { + return (T) Boolean.valueOf(isKeepAlive()); + } + if (option == SO_REUSEADDR) { + return (T) Boolean.valueOf(isReuseAddress()); + } + if (option == SO_LINGER) { + return (T) Integer.valueOf(getSoLinger()); + } + if (option == IP_TOS) { + return (T) Integer.valueOf(getTrafficClass()); } - if (key.equals("receiveBufferSize")) { - setReceiveBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("sendBufferSize")) { - setSendBufferSize(ConversionUtil.toInt(value)); - } else if (key.equals("tcpNoDelay")) { - setTcpNoDelay(ConversionUtil.toBoolean(value)); - } else if (key.equals("keepAlive")) { - setKeepAlive(ConversionUtil.toBoolean(value)); - } else if (key.equals("reuseAddress")) { - setReuseAddress(ConversionUtil.toBoolean(value)); - } else if (key.equals("soLinger")) { - setSoLinger(ConversionUtil.toInt(value)); - } else if (key.equals("trafficClass")) { - setTrafficClass(ConversionUtil.toInt(value)); + return super.getOption(option); + } + + @Override + public boolean setOption(ChannelOption option, T value) { + validate(option, value); + + if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == TCP_NODELAY) { + setTcpNoDelay((Boolean) value); + } else if (option == SO_KEEPALIVE) { + setKeepAlive((Boolean) value); + } else if (option == SO_REUSEADDR) { + setReuseAddress((Boolean) value); + } else if (option == SO_LINGER) { + setSoLinger((Integer) value); + } else if (option == IP_TOS) { + setTrafficClass((Integer) value); } else { - return false; + return super.setOption(option, value); } + return true; } diff --git a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java index bfe216c100..8a2fb9d805 100644 --- a/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/SocketChannelConfig.java @@ -15,10 +15,10 @@ */ package io.netty.channel.socket; -import java.net.Socket; - import io.netty.channel.ChannelConfig; +import java.net.Socket; + /** * A {@link ChannelConfig} for a {@link SocketChannel}. * diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 11a8304166..641f8c1760 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -17,7 +17,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; -import io.netty.channel.ChannelException; +import io.netty.channel.EventLoop; import java.net.InetSocketAddress; import java.nio.channels.SelectableChannel; @@ -37,6 +37,11 @@ public abstract class AbstractNioChannel extends AbstractChannel { this.ch = ch; } + @Override + public SelectorEventLoop eventLoop() { + return (SelectorEventLoop) super.eventLoop(); + } + @Override protected SelectableChannel javaChannel() { return ch; @@ -78,15 +83,13 @@ public abstract class AbstractNioChannel extends AbstractChannel { } @Override - public abstract NioChannelConfig config(); + protected boolean isCompatible(EventLoop loop) { + return loop instanceof SelectorEventLoop; + } @Override protected void doRegister() throws Exception { - if (!(eventLoop() instanceof SelectorEventLoop)) { - throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName()); - } - - SelectorEventLoop loop = (SelectorEventLoop) eventLoop(); + SelectorEventLoop loop = eventLoop(); selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java index ba06b45222..87d0090bb5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioDatagramChannelConfig.java @@ -17,7 +17,6 @@ package io.netty.channel.socket.nio; import io.netty.channel.ChannelException; import io.netty.channel.socket.DefaultDatagramChannelConfig; -import io.netty.util.internal.ConversionUtil; import io.netty.util.internal.DetectionUtil; import java.lang.reflect.Method; @@ -27,8 +26,7 @@ import java.nio.channels.DatagramChannel; /** * The default {@link NioSocketChannelConfig} implementation. */ -class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig - implements NioDatagramChannelConfig { +class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig { private static final Object IP_MULTICAST_IF; private static final Method GET_OPTION; @@ -71,41 +69,12 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig } private final DatagramChannel channel; - private volatile int writeSpinCount = 16; DefaultNioDatagramChannelConfig(DatagramChannel channel) { super(channel.socket()); this.channel = channel; } - @Override - public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; - } - - if (key.equals("writeSpinCount")) { - setWriteSpinCount(ConversionUtil.toInt(value)); - } else { - return false; - } - return true; - } - - @Override - public int getWriteSpinCount() { - return writeSpinCount; - } - - @Override - public void setWriteSpinCount(int writeSpinCount) { - if (writeSpinCount <= 0) { - throw new IllegalArgumentException( - "writeSpinCount must be a positive integer."); - } - this.writeSpinCount = writeSpinCount; - } - @Override public void setNetworkInterface(NetworkInterface networkInterface) { if (DetectionUtil.javaVersion() < 7) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioSocketChannelConfig.java deleted file mode 100644 index e218b46242..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/DefaultNioSocketChannelConfig.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.channel.socket.DefaultSocketChannelConfig; -import io.netty.util.internal.ConversionUtil; - -import java.net.Socket; - -/** - * The default {@link NioSocketChannelConfig} implementation. - */ -class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig - implements NioSocketChannelConfig { - - private volatile int writeSpinCount = 16; - - DefaultNioSocketChannelConfig(Socket socket) { - super(socket); - } - - @Override - public boolean setOption(String key, Object value) { - if (super.setOption(key, value)) { - return true; - } - - if (key.equals("writeSpinCount")) { - setWriteSpinCount(ConversionUtil.toInt(value)); - } else { - return false; - } - return true; - } - - @Override - public int getWriteSpinCount() { - return writeSpinCount; - } - - @Override - public void setWriteSpinCount(int writeSpinCount) { - if (writeSpinCount <= 0) { - throw new IllegalArgumentException( - "writeSpinCount must be a positive integer."); - } - this.writeSpinCount = writeSpinCount; - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java deleted file mode 100644 index bdc54ffa39..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioChannelConfig.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.channel.ChannelConfig; - -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; - -/** - * Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO. - * - */ -public interface NioChannelConfig extends ChannelConfig { - /** - * Returns the maximum loop count for a write operation until - * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. - * It is similar to what a spin lock is used for in concurrency programming. - * It improves memory utilization and write throughput depending on - * the platform that JVM runs on. The default value is {@code 16}. - */ - int getWriteSpinCount(); - - /** - * Sets the maximum loop count for a write operation until - * {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value. - * It is similar to what a spin lock is used for in concurrency programming. - * It improves memory utilization and write throughput depending on - * the platform that JVM runs on. The default value is {@code 16}. - * - * @throws IllegalArgumentException - * if the specified value is {@code 0} or less than {@code 0} - */ - void setWriteSpinCount(int writeSpinCount); -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java deleted file mode 100644 index 486ba68815..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.channel.ChannelConfig; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.DatagramChannelConfig; - -/** - * A {@link DatagramChannelConfig} for a NIO TCP/IP {@link DatagramChannel}. - * - *

Available options

- * - * In addition to the options provided by {@link ChannelConfig} and - * {@link DatagramChannelConfig}, {@link NioDatagramChannelConfig} allows the - * following options in the option map: - * - * - * - * - * - * - * - * - * - * - * - *
NameAssociated setter method
{@code "writeBufferHighWaterMark"}{@link #setWriteBufferHighWaterMark(int)}
{@code "writeBufferLowWaterMark"}{@link #setWriteBufferLowWaterMark(int)}
{@code "writeSpinCount"}{@link #setWriteSpinCount(int)}
- */ -public interface NioDatagramChannelConfig extends DatagramChannelConfig, NioChannelConfig { - - -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 7e2cc6c1dc..2b2ebffae3 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractServerChannel; import io.netty.channel.ChannelException; +import io.netty.channel.EventLoop; import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.logging.InternalLogger; @@ -73,6 +74,11 @@ public class NioServerSocketChannel extends AbstractServerChannel return config; } + @Override + public SelectorEventLoop eventLoop() { + return (SelectorEventLoop) super.eventLoop(); + } + @Override public boolean isActive() { return javaChannel().socket().isBound(); @@ -109,12 +115,13 @@ public class NioServerSocketChannel extends AbstractServerChannel } @Override - protected void doRegister() throws Exception { - if (!(eventLoop() instanceof SelectorEventLoop)) { - throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName()); - } + protected boolean isCompatible(EventLoop loop) { + return loop instanceof SelectorEventLoop; + } - SelectorEventLoop loop = (SelectorEventLoop) eventLoop(); + @Override + protected void doRegister() throws Exception { + SelectorEventLoop loop = eventLoop(); selectionKey = javaChannel().register( loop.selector, isActive()? SelectionKey.OP_ACCEPT : 0, this); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 796cc467af..a48b2d163b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -21,12 +21,13 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; +import io.netty.channel.socket.DefaultSocketChannelConfig; +import io.netty.channel.socket.SocketChannelConfig; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.net.SocketAddress; -import java.nio.channels.AsynchronousCloseException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; @@ -34,7 +35,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class); - private final NioSocketChannelConfig config; + private final SocketChannelConfig config; private final ChannelBufferHolder out = ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer()); private static SocketChannel newSocket() { @@ -67,11 +68,11 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha throw new ChannelException("Failed to enter non-blocking mode.", e); } - config = new DefaultNioSocketChannelConfig(socket.socket()); + config = new DefaultSocketChannelConfig(socket.socket()); } @Override - public NioSocketChannelConfig config() { + public SocketChannelConfig config() { return config; } @@ -151,6 +152,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha @Override protected void doDeregister() throws Exception { selectionKey().cancel(); + eventLoop().cancelledKeys ++; } @Override @@ -167,7 +169,6 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha return 0; } - boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; final SocketChannel ch = javaChannel(); @@ -181,38 +182,26 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha int localWrittenBytes = 0; int writtenBytes = 0; - try { - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.readBytes(ch, bytesLeft); - if (localWrittenBytes > 0) { - bytesLeft -= localWrittenBytes; - if (bytesLeft <= 0) { - removeOpWrite = true; - break; - } - - writtenBytes += localWrittenBytes; - } else { - addOpWrite = true; + // FIXME: Spinning should be done by AbstractChannel. + for (int i = writeSpinCount; i > 0; i --) { + localWrittenBytes = buf.readBytes(ch, bytesLeft); + if (localWrittenBytes > 0) { + writtenBytes += localWrittenBytes; + bytesLeft -= localWrittenBytes; + if (bytesLeft <= 0) { + removeOpWrite = true; break; } - } - } catch (AsynchronousCloseException e) { - // Doesn't need a user attention - ignore. - } catch (Throwable t) { - if (t instanceof IOException) { - open = false; - selectionKey().cancel(); - ch.close(); + } else { + addOpWrite = true; + break; } } - if (open) { - if (addOpWrite) { - key.interestOps(interestOps | SelectionKey.OP_WRITE); - } else if (removeOpWrite) { - key.interestOps(interestOps & ~SelectionKey.OP_WRITE); - } + if (addOpWrite) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } else if (removeOpWrite) { + key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } return writtenBytes; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelConfig.java deleted file mode 100644 index 2f8da9a026..0000000000 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannelConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.socket.nio; - -import io.netty.channel.ChannelConfig; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.SocketChannelConfig; - -/** - * A {@link SocketChannelConfig} for a NIO TCP/IP {@link SocketChannel}. - * - *

Available options

- * - * In addition to the options provided by {@link ChannelConfig} and - * {@link SocketChannelConfig}, {@link NioSocketChannelConfig} allows the - * following options in the option map: - * - * - * - * - * - * - * - *
NameAssociated setter method
{@code "writeSpinCount"}{@link #setWriteSpinCount(int)}
- */ -public interface NioSocketChannelConfig extends SocketChannelConfig, NioChannelConfig { - // This method does not provide a configuration property by itself. - // It just combined SocketChannelConfig and NioChannelConfig for user's sake. -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java index 4ee5ff209b..e19608fdef 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java @@ -16,6 +16,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; +import io.netty.channel.Channel.Unsafe; import io.netty.channel.ChannelException; import io.netty.channel.EventLoopFactory; import io.netty.channel.SingleThreadEventLoop; @@ -66,8 +67,7 @@ public class SelectorEventLoop extends SingleThreadEventLoop { */ protected final AtomicBoolean wakenUp = new AtomicBoolean(); - // FIXME: It's not being increased by any channel implementations but we have to. - private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation + int cancelledKeys; public SelectorEventLoop() { this(Executors.defaultThreadFactory()); @@ -186,31 +186,30 @@ public class SelectorEventLoop extends SingleThreadEventLoop { private void processSelectedKeys() throws IOException { for (Iterator i = selector.selectedKeys().iterator(); i.hasNext();) { - SelectionKey k = i.next(); - Channel ch = (Channel) k.attachment(); + final SelectionKey k = i.next(); + final Channel ch = (Channel) k.attachment(); + final Unsafe unsafe = ch.unsafe(); boolean removeKey = true; try { - int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { - ch.unsafe().read(); + unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. continue; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { - ch.unsafe().flush(null); + unsafe.flush(unsafe.voidFuture()); } if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { - ch.unsafe().read(); + unsafe.read(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { - ch.unsafe().finishConnect(); + unsafe.finishConnect(); } - } catch (CancelledKeyException ignored) { - ch.unsafe().close(null); + unsafe.close(unsafe.voidFuture()); } finally { if (removeKey) { i.remove();