Implement flush-future properly / Make channel options type-safe

- AbstractChannel keeps the expected number of written bytes so that
  the ChannelFuture of a flush() operation is notified on right timing.
  - Added ChannelBufferHolder.size() to make this possible
- Added AbstractChannel.isCompatible() so that only compatible EventLoop
  is accepted by a channel on registration
- Added ChannelOption to make channel options type-safe
- Moved writeSpinCount property to ChannelConfig and removed Nio*Config
- Miscellaneous cleanup

introducing
ChannelOption
This commit is contained in:
Trustin Lee 2012-05-13 00:40:28 +09:00
parent 95f05ae215
commit 08137e2c49
21 changed files with 625 additions and 398 deletions

View File

@ -15,21 +15,25 @@
*/
package io.netty.channel.sctp;
import com.sun.nio.sctp.SctpChannel;
import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.socket.nio.NioSocketChannelConfig;
import io.netty.util.internal.ConversionUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams;
/**
* The default {@link NioSocketChannelConfig} implementation for SCTP.
*/
class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChannelConfig {
private SctpChannel channel;
private final SctpChannel channel;
DefaultSctpChannelConfig(SctpChannel channel) {
if (channel == null) {
@ -39,30 +43,63 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
public Map<ChannelOption<?>, Object> getOptions() {
// TODO: Investigate if other SCTP options such as SCTP_PRIMARY_ADDR can be exposed.
return getOptions(super.getOptions(), SO_RCVBUF, SO_SNDBUF, SCTP_NODELAY, SCTP_INIT_MAXSTREAMS);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == SCTP_NODELAY) {
return (T) Boolean.valueOf(isSctpNoDelay());
}
if (option == SCTP_INIT_MAXSTREAMS) {
InitMaxStreams ims = getInitMaxStreams();
if (ims == null) {
return null;
}
List<Integer> values = new ArrayList<Integer>(2);
values.add(ims.maxInStreams());
values.add(ims.maxOutStreams());
@SuppressWarnings("unchecked")
T ret = (T) values;
return ret;
}
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("sendBufferSize")) {
setSendBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("sctpNoDelay")) {
setSctpNoDelay(ConversionUtil.toBoolean(value));
} else if (key.equals("sctpInitMaxStreams")) {
final Integer maxInOutStreams = ConversionUtil.toInt(value);
setInitMaxStreams(InitMaxStreams.create(maxInOutStreams, maxInOutStreams));
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == SCTP_NODELAY) {
setSctpNoDelay((Boolean) value);
} else if (option == SCTP_INIT_MAXSTREAMS) {
@SuppressWarnings("unchecked")
List<Integer> values = (List<Integer>) value;
setInitMaxStreams(InitMaxStreams.create(values.get(0), values.get(1)));
} else {
return false;
return super.setOption(option, value);
}
return true;
}
@Override
public boolean isSctpNoDelay() {
try {
return channel.getOption(SCTP_NODELAY);
return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_NODELAY);
} catch (IOException e) {
throw new ChannelException(e);
}
@ -71,7 +108,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override
public void setSctpNoDelay(boolean sctpNoDelay) {
try {
channel.setOption(SCTP_NODELAY, sctpNoDelay);
channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_NODELAY, sctpNoDelay);
} catch (IOException e) {
throw new ChannelException(e);
}
@ -80,7 +117,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override
public int getSendBufferSize() {
try {
return channel.getOption(SO_SNDBUF);
return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
@ -89,7 +126,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override
public void setSendBufferSize(int sendBufferSize) {
try {
channel.setOption(SO_SNDBUF, sendBufferSize);
channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_SNDBUF, sendBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
@ -98,7 +135,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override
public int getReceiveBufferSize() {
try {
return channel.getOption(SO_RCVBUF);
return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF);
} catch (IOException e) {
throw new ChannelException(e);
}
@ -107,7 +144,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override
public void setReceiveBufferSize(int receiveBufferSize) {
try {
channel.setOption(SO_RCVBUF, receiveBufferSize);
channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SO_RCVBUF, receiveBufferSize);
} catch (IOException e) {
throw new ChannelException(e);
}
@ -116,7 +153,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override
public InitMaxStreams getInitMaxStreams() {
try {
return channel.getOption(SCTP_INIT_MAXSTREAMS);
return channel.getOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS);
} catch (IOException e) {
throw new ChannelException(e);
}
@ -125,7 +162,7 @@ class DefaultSctpChannelConfig extends DefaultChannelConfig implements SctpChann
@Override
public void setInitMaxStreams(InitMaxStreams initMaxStreams) {
try {
channel.setOption(SCTP_INIT_MAXSTREAMS, initMaxStreams);
channel.setOption(com.sun.nio.sctp.SctpStandardSocketOptions.SCTP_INIT_MAXSTREAMS, initMaxStreams);
} catch (IOException e) {
throw new ChannelException(e);
}

View File

@ -90,7 +90,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
private long writtenAmount;
private long flushedAmount;
private FlushFutureEntry flushFuture;
private FlushFutureEntry lastFlushFuture;
private ClosedChannelException closedChannelException;
/** Cache for the string representation of this channel */
private boolean strValActive;
@ -425,6 +428,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (AbstractChannel.this.eventLoop != null) {
throw new IllegalStateException("registered to an event loop already");
}
if (!isCompatible(eventLoop)) {
throw new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName());
}
AbstractChannel.this.eventLoop = eventLoop;
assert eventLoop().inEventLoop();
@ -598,7 +605,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
future.setFailure(t);
}
if (closedChannelException != null) {
closedChannelException = new ClosedChannelException();
}
notifyFlushFutures(closedChannelException);
notifyClosureListeners();
if (wasActive && !isActive()) {
pipeline().fireChannelInactive();
}
@ -645,8 +658,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
public void read() {
assert eventLoop().inEventLoop();
long readAmount = 0;
boolean closed = false;
try {
boolean closed = false;
for (;;) {
int localReadAmount = doRead();
if (localReadAmount > 0) {
@ -665,41 +678,104 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (readAmount > 0) {
pipeline.fireInboundBufferUpdated();
}
if (closed) {
close(voidFuture());
}
} catch (Throwable t) {
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
if (closed && isOpen()) {
close(voidFuture());
}
}
}
@Override
public void flush(final ChannelFuture future) {
// FIXME: Notify future properly using writtenAmount.
if (eventLoop().inEventLoop()) {
// Append flush future to the notification list.
if (future != voidFuture && !future.isDone()) {
FlushFutureEntry newEntry = new FlushFutureEntry(future, flushedAmount + out().size(), null);
if (flushFuture == null) {
flushFuture = lastFlushFuture = newEntry;
} else {
lastFlushFuture.next = newEntry;
lastFlushFuture = newEntry;
}
}
// Perform outbound I/O.
try {
writtenAmount += doFlush();
} catch (Exception e) {
future.setFailure(e);
flushedAmount += doFlush();
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
// Notify flush futures if necessary.
notifyFlushFutures();
if (!isActive()) {
close(voidFuture());
}
}
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
writtenAmount += doFlush();
} catch (Exception e) {
future.setFailure(e);
}
flush(future);
}
});
}
}
private void notifyFlushFutures() {
FlushFutureEntry e = flushFuture;
if (e == null) {
return;
}
final long flushedAmount = AbstractChannel.this.flushedAmount;
do {
if (e.expectedFlushedAmount > flushedAmount) {
break;
}
e.future.setSuccess();
e = e.next;
} while (e != null);
flushFuture = e;
// Avoid overflow
if (e == null) {
// Reset the counter if there's nothing in the notification list.
AbstractChannel.this.flushedAmount = 0;
} else if (flushedAmount >= 0x1000000000000000L) {
// Otherwise, reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
AbstractChannel.this.flushedAmount = 0;
do {
e.expectedFlushedAmount -= flushedAmount;
e = e.next;
} while (e != null);
}
}
private void notifyFlushFutures(Throwable cause) {
FlushFutureEntry e = flushFuture;
if (e == null) {
return;
}
do {
e.future.setFailure(cause);
e = e.next;
} while (e != null);
flushFuture = null;
}
private boolean ensureOpen(ChannelFuture future) {
if (isOpen()) {
return true;
@ -719,6 +795,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private static class FlushFutureEntry {
private final ChannelFuture future;
private long expectedFlushedAmount;
private FlushFutureEntry next;
FlushFutureEntry(ChannelFuture future, long expectedWrittenAmount, FlushFutureEntry next) {
this.future = future;
expectedFlushedAmount = expectedWrittenAmount;
this.next = next;
}
}
protected abstract boolean isCompatible(EventLoop loop);
protected abstract java.nio.channels.Channel javaChannel();
protected abstract ChannelBufferHolder<Object> firstOut();

View File

@ -18,7 +18,6 @@ package io.netty.channel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannelConfig;
import io.netty.util.AttributeMap;
import java.net.InetSocketAddress;

View File

@ -122,6 +122,22 @@ public final class ChannelBufferHolder<E> {
default:
throw new Error();
}
}
public int size() {
switch (bypassDirection) {
case 0:
if (hasMessageBuffer()) {
return messageBuffer().size();
} else {
return byteBuffer().readableBytes();
}
case 1:
return ctx.nextIn().size();
case 2:
return ctx.out().size();
default:
throw new Error();
}
}
}

View File

@ -16,8 +16,9 @@
package io.netty.channel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.socket.nio.NioChannelConfig;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
/**
@ -50,7 +51,7 @@ import java.util.Map;
* <p>
* More options are available in the sub-types of {@link ChannelConfig}. For
* example, you can configure the parameters which are specific to a TCP/IP
* socket as explained in {@link SocketChannelConfig} or {@link NioChannelConfig}.
* socket as explained in {@link SocketChannelConfig}.
*
* @apiviz.has io.netty.channel.ChannelPipelineFactory
* @apiviz.composedOf io.netty.channel.ReceiveBufferSizePredictor
@ -59,10 +60,14 @@ import java.util.Map;
*/
public interface ChannelConfig {
Map<ChannelOption<?>, Object> getOptions();
/**
* Sets the configuration properties from the specified {@link Map}.
*/
void setOptions(Map<String, Object> options);
boolean setOptions(Map<ChannelOption<?>, ?> options);
<T> T getOption(ChannelOption<T> option);
/**
* Sets a configuration property with the specified name and value.
@ -84,7 +89,7 @@ public interface ChannelConfig {
*
* @return {@code true} if and only if the property has been set
*/
boolean setOption(String name, Object value);
<T> boolean setOption(ChannelOption<T> option, T value);
/**
* Returns the connect timeout of the channel in milliseconds. If the
@ -104,4 +109,25 @@ public interface ChannelConfig {
* {@code 0} to disable.
*/
void setConnectTimeoutMillis(int connectTimeoutMillis);
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
}

View File

@ -15,10 +15,10 @@
*/
package io.netty.channel;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ClientBootstrap;
import java.util.concurrent.TimeUnit;
/**
* The result of an asynchronous {@link Channel} I/O operation.
* <p>

View File

@ -0,0 +1,122 @@
package io.netty.channel;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ChannelOption<T> implements Comparable<ChannelOption<T>> {
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS =
new ChannelOption<Integer>("CONNECT_TIMEOUT_MILLIS", Integer.class);
public static final ChannelOption<Integer> WRITE_SPIN_COUNT =
new ChannelOption<Integer>("WRITE_SPIN_COUNT", Integer.class);
public static final ChannelOption<Boolean> SO_BROADCAST =
new ChannelOption<Boolean>("SO_BROADCAST", Boolean.class);
public static final ChannelOption<Boolean> SO_KEEPALIVE =
new ChannelOption<Boolean>("SO_KEEPALIVE", Boolean.class);
public static final ChannelOption<Integer> SO_SNDBUF =
new ChannelOption<Integer>("SO_SNDBUF", Integer.class);
public static final ChannelOption<Integer> SO_RCVBUF =
new ChannelOption<Integer>("SO_RCVBUF", Integer.class);
public static final ChannelOption<Boolean> SO_REUSEADDR =
new ChannelOption<Boolean>("SO_REUSEADDR", Boolean.class);
public static final ChannelOption<Integer> SO_LINGER =
new ChannelOption<Integer>("SO_LINGER", Integer.class);
public static final ChannelOption<Integer> SO_BACKLOG =
new ChannelOption<Integer>("SO_BACKLOG", Integer.class);
public static final ChannelOption<Integer> IP_TOS =
new ChannelOption<Integer>("IP_TOS", Integer.class);
public static final ChannelOption<InetAddress> IP_MULTICAST_ADDR =
new ChannelOption<InetAddress>("IP_MULTICAST_ADDR", InetAddress.class);
public static final ChannelOption<NetworkInterface> IP_MULTICAST_IF =
new ChannelOption<NetworkInterface>("IP_MULTICAST_IF", NetworkInterface.class);
public static final ChannelOption<Integer> IP_MULTICAST_TTL =
new ChannelOption<Integer>("IP_MULTICAST_TTL", Integer.class);
public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED =
new ChannelOption<Boolean>("IP_MULTICAST_LOOP_DISABLED", Boolean.class);
public static final ChannelOption<Boolean> TCP_NODELAY =
new ChannelOption<Boolean>("TCP_NODELAY", Boolean.class);
public static final ChannelOption<Boolean> SCTP_DISABLE_FRAGMENTS =
new ChannelOption<Boolean>("SCTP_DISABLE_FRAGMENTS", Boolean.class);
public static final ChannelOption<Boolean> SCTP_EXPLICIT_COMPLETE =
new ChannelOption<Boolean>("SCTP_EXPLICIT_COMPLETE", Boolean.class);
public static final ChannelOption<Integer> SCTP_FRAGMENT_INTERLEAVE =
new ChannelOption<Integer>("SCTP_FRAGMENT_INTERLEAVE", Integer.class);
@SuppressWarnings("unchecked")
public static final ChannelOption<List<Integer>> SCTP_INIT_MAXSTREAMS =
new ChannelOption<List<Integer>>("SCTP_INIT_MAXSTREAMS", (Class<List<Integer>>)(Class<?>) List.class) {
@Override
public void validate(List<Integer> value) {
super.validate(value);
if (value.size() != 2) {
throw new IllegalArgumentException("value must be a List of 2 Integers: " + value);
}
if (value.get(0) == null) {
throw new NullPointerException("value[0]");
}
if (value.get(1) == null) {
throw new NullPointerException("value[1]");
}
}
};
public static final ChannelOption<Boolean> SCTP_NODELAY =
new ChannelOption<Boolean>("SCTP_NODELAY", Boolean.class);
public static final ChannelOption<SocketAddress> SCTP_PRIMARY_ADDR =
new ChannelOption<SocketAddress>("SCTP_PRIMARY_ADDR", SocketAddress.class);
public static final ChannelOption<SocketAddress> SCTP_SET_PEER_PRIMARY_ADDR =
new ChannelOption<SocketAddress>("SCTP_SET_PEER_PRIMARY_ADDR", SocketAddress.class);
private static final ConcurrentMap<String, Boolean> names = new ConcurrentHashMap<String, Boolean>();
private final String name;
private final Class<T> valueType;
private final String strVal;
public ChannelOption(String name, Class<T> valueType) {
if (name == null) {
throw new NullPointerException("name");
}
if (valueType == null) {
throw new NullPointerException("valueType");
}
if (names.putIfAbsent(name, Boolean.TRUE) != null) {
throw new IllegalArgumentException("option name already in use: " + name);
}
this.name = name;
this.valueType = valueType;
strVal = name + '[' + valueType.getSimpleName() + ']';
}
public String name() {
return name;
}
public Class<T> valueType() {
return valueType;
}
public void validate(T value) {
if (value == null) {
throw new NullPointerException("value");
}
}
@Override
public int compareTo(ChannelOption<T> o) {
return name().compareTo(o.name());
}
@Override
public String toString() {
return strVal;
}
}

View File

@ -15,9 +15,10 @@
*/
package io.netty.channel;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.internal.ConversionUtil;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Map.Entry;
@ -29,24 +30,76 @@ public class DefaultChannelConfig implements ChannelConfig {
private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
private volatile int writeSpinCount = 16;
@Override
public void setOptions(Map<String, Object> options) {
for (Entry<String, Object> e: options.entrySet()) {
setOption(e.getKey(), e.getValue());
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(null, CONNECT_TIMEOUT_MILLIS, WRITE_SPIN_COUNT);
}
protected Map<ChannelOption<?>, Object> getOptions(Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
if (result == null) {
result = new IdentityHashMap<ChannelOption<?>, Object>();
}
for (ChannelOption<?> o: options) {
result.put(o, getOption(o));
}
return result;
}
@Override
public boolean setOption(String key, Object value) {
if ("connectTimeoutMillis".equals(key)) {
setConnectTimeoutMillis(ConversionUtil.toInt(value));
public boolean setOptions(Map<ChannelOption<?>, ?> options) {
if (options == null) {
throw new NullPointerException("options");
}
boolean setAllOptions = true;
for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
setAllOptions = false;
}
}
return setAllOptions;
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == null) {
throw new NullPointerException("option");
}
if (option == CONNECT_TIMEOUT_MILLIS) {
return (T) Integer.valueOf(getConnectTimeoutMillis());
} else if (option == WRITE_SPIN_COUNT) {
return (T) Integer.valueOf(getWriteSpinCount());
}
return null;
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == CONNECT_TIMEOUT_MILLIS) {
setConnectTimeoutMillis((Integer) value);
} else if (option == WRITE_SPIN_COUNT) {
setWriteSpinCount((Integer) value);
} else {
return false;
}
return true;
}
protected <T> void validate(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
option.validate(value);
}
@Override
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
@ -60,4 +113,18 @@ public class DefaultChannelConfig implements ChannelConfig {
}
this.connectTimeoutMillis = connectTimeoutMillis;
}
@Override
public int getWriteSpinCount() {
return writeSpinCount;
}
@Override
public void setWriteSpinCount(int writeSpinCount) {
if (writeSpinCount <= 0) {
throw new IllegalArgumentException(
"writeSpinCount must be a positive integer.");
}
this.writeSpinCount = writeSpinCount;
}
}

View File

@ -15,9 +15,10 @@
*/
package io.netty.channel.socket;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.util.internal.ConversionUtil;
import java.io.IOException;
import java.net.DatagramSocket;
@ -25,12 +26,12 @@ import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Map;
/**
* The default {@link DatagramChannelConfig} implementation.
*/
public class DefaultDatagramChannelConfig extends DefaultChannelConfig
implements DatagramChannelConfig {
public class DefaultDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig {
private final DatagramSocket socket;
@ -45,32 +46,76 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
SO_BROADCAST, SO_RCVBUF, SO_SNDBUF, SO_REUSEADDR, IP_MULTICAST_LOOP_DISABLED,
IP_MULTICAST_ADDR, IP_MULTICAST_IF, IP_MULTICAST_TTL, IP_TOS);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_BROADCAST) {
return (T) Boolean.valueOf(isBroadcast());
}
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == IP_MULTICAST_LOOP_DISABLED) {
return (T) Boolean.valueOf(isLoopbackModeDisabled());
}
if (option == IP_MULTICAST_ADDR) {
@SuppressWarnings("unchecked")
T i = (T) getInterface();
return i;
}
if (option == IP_MULTICAST_IF) {
@SuppressWarnings("unchecked")
T i = (T) getNetworkInterface();
return i;
}
if (option == IP_MULTICAST_TTL) {
return (T) Integer.valueOf(getTimeToLive());
}
if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
if (key.equals("broadcast")) {
setBroadcast(ConversionUtil.toBoolean(value));
} else if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("sendBufferSize")) {
setSendBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("loopbackModeDisabled")) {
setLoopbackModeDisabled(ConversionUtil.toBoolean(value));
} else if (key.equals("interface")) {
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_BROADCAST) {
setBroadcast((Boolean) value);
} else if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == IP_MULTICAST_LOOP_DISABLED) {
setLoopbackModeDisabled((Boolean) value);
} else if (option == IP_MULTICAST_ADDR) {
setInterface((InetAddress) value);
} else if (key.equals("networkInterface")) {
} else if (option == IP_MULTICAST_IF) {
setNetworkInterface((NetworkInterface) value);
} else if (key.equals("timeToLive")) {
setTimeToLive(ConversionUtil.toInt(value));
} else if (key.equals("trafficClass")) {
setTrafficClass(ConversionUtil.toInt(value));
} else if (option == IP_MULTICAST_TTL) {
setTimeToLive((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else {
return false;
return super.setOption(option, value);
}
return true;
}

View File

@ -15,12 +15,14 @@
*/
package io.netty.channel.socket;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.util.internal.ConversionUtil;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.Map;
/**
* The default {@link ServerSocketChannelConfig} implementation.
@ -42,20 +44,39 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_BACKLOG) {
return (T) Integer.valueOf(getBacklog());
}
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("backlog")) {
setBacklog(ConversionUtil.toInt(value));
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_BACKLOG) {
setBacklog((Integer) value);
} else {
return false;
return super.setOption(option, value);
}
return true;
}

View File

@ -15,12 +15,14 @@
*/
package io.netty.channel.socket;
import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import java.net.Socket;
import java.net.SocketException;
import io.netty.channel.ChannelException;
import io.netty.channel.DefaultChannelConfig;
import io.netty.util.internal.ConversionUtil;
import java.util.Map;
/**
* The default {@link SocketChannelConfig} implementation.
@ -41,28 +43,61 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == TCP_NODELAY) {
return (T) Boolean.valueOf(isTcpNoDelay());
}
if (option == SO_KEEPALIVE) {
return (T) Boolean.valueOf(isKeepAlive());
}
if (option == SO_REUSEADDR) {
return (T) Boolean.valueOf(isReuseAddress());
}
if (option == SO_LINGER) {
return (T) Integer.valueOf(getSoLinger());
}
if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
if (key.equals("receiveBufferSize")) {
setReceiveBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("sendBufferSize")) {
setSendBufferSize(ConversionUtil.toInt(value));
} else if (key.equals("tcpNoDelay")) {
setTcpNoDelay(ConversionUtil.toBoolean(value));
} else if (key.equals("keepAlive")) {
setKeepAlive(ConversionUtil.toBoolean(value));
} else if (key.equals("reuseAddress")) {
setReuseAddress(ConversionUtil.toBoolean(value));
} else if (key.equals("soLinger")) {
setSoLinger(ConversionUtil.toInt(value));
} else if (key.equals("trafficClass")) {
setTrafficClass(ConversionUtil.toInt(value));
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == TCP_NODELAY) {
setTcpNoDelay((Boolean) value);
} else if (option == SO_KEEPALIVE) {
setKeepAlive((Boolean) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_LINGER) {
setSoLinger((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else {
return false;
return super.setOption(option, value);
}
return true;
}

View File

@ -15,10 +15,10 @@
*/
package io.netty.channel.socket;
import java.net.Socket;
import io.netty.channel.ChannelConfig;
import java.net.Socket;
/**
* A {@link ChannelConfig} for a {@link SocketChannel}.
*

View File

@ -17,7 +17,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
@ -37,6 +37,11 @@ public abstract class AbstractNioChannel extends AbstractChannel {
this.ch = ch;
}
@Override
public SelectorEventLoop eventLoop() {
return (SelectorEventLoop) super.eventLoop();
}
@Override
protected SelectableChannel javaChannel() {
return ch;
@ -78,15 +83,13 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
@Override
public abstract NioChannelConfig config();
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SelectorEventLoop;
}
@Override
protected void doRegister() throws Exception {
if (!(eventLoop() instanceof SelectorEventLoop)) {
throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName());
}
SelectorEventLoop loop = (SelectorEventLoop) eventLoop();
SelectorEventLoop loop = eventLoop();
selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this);
}
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.nio;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultDatagramChannelConfig;
import io.netty.util.internal.ConversionUtil;
import io.netty.util.internal.DetectionUtil;
import java.lang.reflect.Method;
@ -27,8 +26,7 @@ import java.nio.channels.DatagramChannel;
/**
* The default {@link NioSocketChannelConfig} implementation.
*/
class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
implements NioDatagramChannelConfig {
class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig {
private static final Object IP_MULTICAST_IF;
private static final Method GET_OPTION;
@ -71,41 +69,12 @@ class DefaultNioDatagramChannelConfig extends DefaultDatagramChannelConfig
}
private final DatagramChannel channel;
private volatile int writeSpinCount = 16;
DefaultNioDatagramChannelConfig(DatagramChannel channel) {
super(channel.socket());
this.channel = channel;
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if (key.equals("writeSpinCount")) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else {
return false;
}
return true;
}
@Override
public int getWriteSpinCount() {
return writeSpinCount;
}
@Override
public void setWriteSpinCount(int writeSpinCount) {
if (writeSpinCount <= 0) {
throw new IllegalArgumentException(
"writeSpinCount must be a positive integer.");
}
this.writeSpinCount = writeSpinCount;
}
@Override
public void setNetworkInterface(NetworkInterface networkInterface) {
if (DetectionUtil.javaVersion() < 7) {

View File

@ -1,62 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.util.internal.ConversionUtil;
import java.net.Socket;
/**
* The default {@link NioSocketChannelConfig} implementation.
*/
class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
implements NioSocketChannelConfig {
private volatile int writeSpinCount = 16;
DefaultNioSocketChannelConfig(Socket socket) {
super(socket);
}
@Override
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true;
}
if (key.equals("writeSpinCount")) {
setWriteSpinCount(ConversionUtil.toInt(value));
} else {
return false;
}
return true;
}
@Override
public int getWriteSpinCount() {
return writeSpinCount;
}
@Override
public void setWriteSpinCount(int writeSpinCount) {
if (writeSpinCount <= 0) {
throw new IllegalArgumentException(
"writeSpinCount must be a positive integer.");
}
this.writeSpinCount = writeSpinCount;
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.ChannelConfig;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
* Special {@link ChannelConfig} sub-type which offers extra methods which are useful for NIO.
*
*/
public interface NioChannelConfig extends ChannelConfig {
/**
* Returns the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();
/**
* Sets the maximum loop count for a write operation until
* {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on
* the platform that JVM runs on. The default value is {@code 16}.
*
* @throws IllegalArgumentException
* if the specified value is {@code 0} or less than {@code 0}
*/
void setWriteSpinCount(int writeSpinCount);
}

View File

@ -1,46 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.ChannelConfig;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
/**
* A {@link DatagramChannelConfig} for a NIO TCP/IP {@link DatagramChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link ChannelConfig} and
* {@link DatagramChannelConfig}, {@link NioDatagramChannelConfig} allows the
* following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "writeBufferHighWaterMark"}</td><td>{@link #setWriteBufferHighWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeBufferLowWaterMark"}</td><td>{@link #setWriteBufferLowWaterMark(int)}</td>
* </tr><tr>
* <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td>
* </tr><tr>
* </table>
*/
public interface NioDatagramChannelConfig extends DatagramChannelConfig, NioChannelConfig {
}

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.logging.InternalLogger;
@ -73,6 +74,11 @@ public class NioServerSocketChannel extends AbstractServerChannel
return config;
}
@Override
public SelectorEventLoop eventLoop() {
return (SelectorEventLoop) super.eventLoop();
}
@Override
public boolean isActive() {
return javaChannel().socket().isBound();
@ -109,12 +115,13 @@ public class NioServerSocketChannel extends AbstractServerChannel
}
@Override
protected void doRegister() throws Exception {
if (!(eventLoop() instanceof SelectorEventLoop)) {
throw new ChannelException("unsupported event loop: " + eventLoop().getClass().getName());
}
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SelectorEventLoop;
}
SelectorEventLoop loop = (SelectorEventLoop) eventLoop();
@Override
protected void doRegister() throws Exception {
SelectorEventLoop loop = eventLoop();
selectionKey = javaChannel().register(
loop.selector, isActive()? SelectionKey.OP_ACCEPT : 0, this);
}

View File

@ -21,12 +21,13 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
@ -34,7 +35,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
private final NioSocketChannelConfig config;
private final SocketChannelConfig config;
private final ChannelBufferHolder<?> out = ChannelBufferHolders.byteBuffer(ChannelBuffers.dynamicBuffer());
private static SocketChannel newSocket() {
@ -67,11 +68,11 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
config = new DefaultNioSocketChannelConfig(socket.socket());
config = new DefaultSocketChannelConfig(socket.socket());
}
@Override
public NioSocketChannelConfig config() {
public SocketChannelConfig config() {
return config;
}
@ -151,6 +152,7 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
@Override
protected void doDeregister() throws Exception {
selectionKey().cancel();
eventLoop().cancelledKeys ++;
}
@Override
@ -167,7 +169,6 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
return 0;
}
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
final SocketChannel ch = javaChannel();
@ -181,38 +182,26 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
int localWrittenBytes = 0;
int writtenBytes = 0;
try {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.readBytes(ch, bytesLeft);
if (localWrittenBytes > 0) {
bytesLeft -= localWrittenBytes;
if (bytesLeft <= 0) {
removeOpWrite = true;
break;
}
writtenBytes += localWrittenBytes;
} else {
addOpWrite = true;
// FIXME: Spinning should be done by AbstractChannel.
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.readBytes(ch, bytesLeft);
if (localWrittenBytes > 0) {
writtenBytes += localWrittenBytes;
bytesLeft -= localWrittenBytes;
if (bytesLeft <= 0) {
removeOpWrite = true;
break;
}
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
if (t instanceof IOException) {
open = false;
selectionKey().cancel();
ch.close();
} else {
addOpWrite = true;
break;
}
}
if (open) {
if (addOpWrite) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
} else if (removeOpWrite) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
if (addOpWrite) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
} else if (removeOpWrite) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
return writtenBytes;

View File

@ -1,42 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.nio;
import io.netty.channel.ChannelConfig;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
/**
* A {@link SocketChannelConfig} for a NIO TCP/IP {@link SocketChannel}.
*
* <h3>Available options</h3>
*
* In addition to the options provided by {@link ChannelConfig} and
* {@link SocketChannelConfig}, {@link NioSocketChannelConfig} allows the
* following options in the option map:
*
* <table border="1" cellspacing="0" cellpadding="6">
* <tr>
* <th>Name</th><th>Associated setter method</th>
* </tr><tr>
* <td>{@code "writeSpinCount"}</td><td>{@link #setWriteSpinCount(int)}</td>
* </tr>
* </table>
*/
public interface NioSocketChannelConfig extends SocketChannelConfig, NioChannelConfig {
// This method does not provide a configuration property by itself.
// It just combined SocketChannelConfig and NioChannelConfig for user's sake.
}

View File

@ -16,6 +16,7 @@
package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.Channel.Unsafe;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopFactory;
import io.netty.channel.SingleThreadEventLoop;
@ -66,8 +67,7 @@ public class SelectorEventLoop extends SingleThreadEventLoop {
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean();
// FIXME: It's not being increased by any channel implementations but we have to.
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
int cancelledKeys;
public SelectorEventLoop() {
this(Executors.defaultThreadFactory());
@ -186,31 +186,30 @@ public class SelectorEventLoop extends SingleThreadEventLoop {
private void processSelectedKeys() throws IOException {
for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
SelectionKey k = i.next();
Channel ch = (Channel) k.attachment();
final SelectionKey k = i.next();
final Channel ch = (Channel) k.attachment();
final Unsafe unsafe = ch.unsafe();
boolean removeKey = true;
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
ch.unsafe().read();
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
continue;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().flush(null);
unsafe.flush(unsafe.voidFuture());
}
if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
ch.unsafe().read();
unsafe.read();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
ch.unsafe().finishConnect();
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
ch.unsafe().close(null);
unsafe.close(unsafe.voidFuture());
} finally {
if (removeKey) {
i.remove();