Allow to config read/write timeout for the AIO transport. See #509

This commit is contained in:
norman 2012-08-14 08:06:54 +02:00
parent b5aa2108ec
commit 061252e4b4
4 changed files with 91 additions and 9 deletions

View File

@ -283,6 +283,7 @@
<ignore>java.nio.channels.AsynchronousServerSocketChannel</ignore> <ignore>java.nio.channels.AsynchronousServerSocketChannel</ignore>
<ignore>java.nio.channels.AsynchronousChannelGroup</ignore> <ignore>java.nio.channels.AsynchronousChannelGroup</ignore>
<ignore>java.nio.channels.NetworkChannel</ignore> <ignore>java.nio.channels.NetworkChannel</ignore>
<ignore>java.nio.channels.InterruptedByTimeoutException</ignore>
</ignores> </ignores>
</configuration> </configuration>
<executions> <executions>

View File

@ -94,6 +94,11 @@ public class ChannelOption<T> extends UniqueName {
public static final ChannelOption<SocketAddress> SCTP_SET_PEER_PRIMARY_ADDR = public static final ChannelOption<SocketAddress> SCTP_SET_PEER_PRIMARY_ADDR =
new ChannelOption<SocketAddress>("SCTP_SET_PEER_PRIMARY_ADDR"); new ChannelOption<SocketAddress>("SCTP_SET_PEER_PRIMARY_ADDR");
public static final ChannelOption<Long> AIO_READ_TIMEOUT =
new ChannelOption<Long>("AIO_READ_TIMEOUT");
public static final ChannelOption<Long> AIO_WRITE_TIMEOUT =
new ChannelOption<Long>("AIO_WRITE_TIMEOUT");
public ChannelOption(String name) { public ChannelOption(String name) {
super(names, name); super(names, name);
} }

View File

@ -15,7 +15,6 @@
*/ */
package io.netty.channel.socket.aio; package io.netty.channel.socket.aio;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBufType; import io.netty.buffer.ChannelBufType;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
@ -32,6 +31,8 @@ import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -185,10 +186,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (buf.readable()) { if (buf.readable()) {
if (buf.hasNioBuffers()) { if (buf.hasNioBuffers()) {
ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes()); ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes());
javaChannel().write(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this, javaChannel().write(buffers, 0, buffers.length, config.getReadTimeout(),
GATHERING_WRITE_HANDLER); TimeUnit.MILLISECONDS, AioSocketChannel.this, GATHERING_WRITE_HANDLER);
} else { } else {
javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); javaChannel().write(buf.nioBuffer(), config.getReadTimeout(), TimeUnit.MILLISECONDS,
this, WRITE_HANDLER);
} }
} else { } else {
notifyFlushFutures(); notifyFlushFutures();
@ -215,12 +217,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (byteBuf.hasNioBuffers()) { if (byteBuf.hasNioBuffers()) {
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()); ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffers, 0, buffers.length, 0L, SECONDS, AioSocketChannel.this, javaChannel().read(buffers, 0, buffers.length, config.getWriteTimeout(),
SCATTERING_READ_HANDLER); TimeUnit.MILLISECONDS, AioSocketChannel.this, SCATTERING_READ_HANDLER);
} else { } else {
// Get a ByteBuffer view on the ByteBuf // Get a ByteBuffer view on the ByteBuf
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
javaChannel().read(buffer, AioSocketChannel.this, READ_HANDLER); javaChannel().read(buffer, config.getWriteTimeout(), TimeUnit.MILLISECONDS,
AioSocketChannel.this, READ_HANDLER);
} }
} }
@ -268,6 +271,15 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.notifyFlushFutures(cause); channel.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause); channel.pipeline().fireExceptionCaught(cause);
// Check if the exception was raised because of an InterruptedByTimeoutException which means that the
// write timeout was hit. In that case we should close the channel as it may be unusable anyway.
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (cause instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture());
return;
}
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
if (!buf.readable()) { if (!buf.readable()) {
buf.discardReadBytes(); buf.discardReadBytes();
@ -343,7 +355,11 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.pipeline().fireExceptionCaught(t); channel.pipeline().fireExceptionCaught(t);
if (t instanceof IOException) { // Check if the exception was raised because of an InterruptedByTimeoutException which means that the
// write timeout was hit. In that case we should close the channel as it may be unusable anyway.
//
// See http://openjdk.java.net/projects/nio/javadoc/java/nio/channels/AsynchronousSocketChannel.html
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
} else { } else {
// start the next read // start the next read

View File

@ -23,6 +23,7 @@ import io.netty.channel.socket.SocketChannelConfig;
import java.io.IOException; import java.io.IOException;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
import java.nio.channels.InterruptedByTimeoutException;
import java.nio.channels.NetworkChannel; import java.nio.channels.NetworkChannel;
import java.util.Map; import java.util.Map;
@ -33,6 +34,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig { implements SocketChannelConfig {
private final NetworkChannel channel; private final NetworkChannel channel;
private volatile long readTimeoutInMillis;
private volatile long writeTimeoutInMillis;
/** /**
* Creates a new instance. * Creates a new instance.
@ -49,7 +52,8 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions( return getOptions(
super.getOptions(), super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS); SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
AIO_READ_TIMEOUT, AIO_WRITE_TIMEOUT);
} }
@Override @Override
@ -75,6 +79,12 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
if (option == IP_TOS) { if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass()); return (T) Integer.valueOf(getTrafficClass());
} }
if (option == AIO_READ_TIMEOUT) {
return (T) Long.valueOf(getReadTimeout());
}
if (option == AIO_WRITE_TIMEOUT) {
return (T) Long.valueOf(getWriteTimeout());
}
return super.getOption(option); return super.getOption(option);
} }
@ -97,6 +107,10 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
setSoLinger((Integer) value); setSoLinger((Integer) value);
} else if (option == IP_TOS) { } else if (option == IP_TOS) {
setTrafficClass((Integer) value); setTrafficClass((Integer) value);
} else if (option == AIO_READ_TIMEOUT) {
setReadTimeout((Long) value);
} else if (option == AIO_WRITE_TIMEOUT) {
setWriteTimeout((Long) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -235,4 +249,50 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
throw new ChannelException(e); throw new ChannelException(e);
} }
} }
/**
* Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
* Once such an exception was detected it will get propagated to the handlers first. After that the channel
* will get closed as it may be in an unknown state.
*
* To disable it just use <code>0</code>.
*/
public void setReadTimeout(long readTimeoutInMillis) {
if (readTimeoutInMillis < 0) {
throw new IllegalArgumentException("readTimeoutInMillis: " + readTimeoutInMillis);
}
this.readTimeoutInMillis = readTimeoutInMillis;
}
/**
* Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
* Once such an exception was detected it will get propagated to the handlers first. After that the channel
* will get closed as it may be in an unknown state.
*
* To disable it just use <code>0</code>.
*/
public void setWriteTimeout(long writeTimeoutInMillis) {
if (writeTimeoutInMillis < 0) {
throw new IllegalArgumentException("writeTimeoutInMillis: " + writeTimeoutInMillis);
}
this.writeTimeoutInMillis = writeTimeoutInMillis;
}
/**
* Return the read timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
*
* The default is <code>0</code>
*/
public long getReadTimeout() {
return readTimeoutInMillis;
}
/**
* Return the write timeout in milliseconds after which a {@link InterruptedByTimeoutException} will get thrown.
*
* The default is <code>0</code>
*/
public long getWriteTimeout() {
return writeTimeoutInMillis;
}
} }