Handle the case where JDK notifies aync I/O handler immediately

After some debugging, I found that JDK AIO implementation often performs
I/O immediately from the caller thread if the caller thread is the I/O
thread, and notifies the completion handler also immediately.  This
commit handles such a case correctly during reads and writes.

Additionally, this commit also changes SingleThreadEventExecutor to let
it handle unexpected exceptions such as AssertionError in a robus
manner.
This commit is contained in:
Trustin Lee 2012-12-02 20:03:35 +09:00
parent 00c4b944e4
commit 95e8ec1db9
2 changed files with 108 additions and 65 deletions

View File

@ -98,6 +98,9 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
try { try {
SingleThreadEventExecutor.this.run(); SingleThreadEventExecutor.this.run();
success = true; success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
shutdown();
} finally { } finally {
// Check if confirmShutdown() was called at the end of the loop. // Check if confirmShutdown() was called at the end of the loop.
if (success && lastAccessTimeNanos == 0) { if (success && lastAccessTimeNanos == 0) {
@ -122,7 +125,11 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
cleanup(); cleanup();
} finally { } finally {
threadLock.release(); threadLock.release();
assert taskQueue.isEmpty(); if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
} }
} }
} }

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -61,9 +62,12 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
private volatile boolean inputShutdown; private volatile boolean inputShutdown;
private volatile boolean outputShutdown; private volatile boolean outputShutdown;
private boolean flushing; private boolean asyncWriteInProgress;
private boolean inDoFlushByteBuffer;
private boolean asyncReadInProgress;
private boolean inBeginRead;
private final AtomicBoolean readSuspended = new AtomicBoolean(); private final AtomicBoolean readSuspended = new AtomicBoolean();
private final AtomicBoolean readInProgress = new AtomicBoolean();
private final Runnable readTask = new Runnable() { private final Runnable readTask = new Runnable() {
@Override @Override
@ -231,58 +235,95 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception { protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
if (flushing) { if (inDoFlushByteBuffer || asyncWriteInProgress) {
return; return;
} }
flushing = true; inDoFlushByteBuffer = true;
// Ensure the readerIndex of the buffer is 0 before beginning an async write. try {
// Otherwise, JDK can write into a wrong region of the buffer when a handler calls if (buf.readable()) {
// discardReadBytes() later, modifying the readerIndex and the writerIndex unexpectedly. for (;;) {
buf.discardReadBytes(); // Ensure the readerIndex of the buffer is 0 before beginning an async write.
// Otherwise, JDK can write into a wrong region of the buffer when a handler calls
// discardReadBytes() later, modifying the readerIndex and the writerIndex unexpectedly.
buf.discardReadBytes();
if (buf.readable()) { asyncWriteInProgress = true;
if (buf.hasNioBuffers()) { if (buf.hasNioBuffers()) {
ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes()); ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), buf.readableBytes());
javaChannel().write(buffers, 0, buffers.length, config.getReadTimeout(), if (buffers.length == 1) {
TimeUnit.MILLISECONDS, this, GATHERING_WRITE_HANDLER); javaChannel().write(
buffers[0], config.getWriteTimeout(), TimeUnit.MILLISECONDS, this, WRITE_HANDLER);
} else {
javaChannel().write(
buffers, 0, buffers.length, config.getWriteTimeout(), TimeUnit.MILLISECONDS,
this, GATHERING_WRITE_HANDLER);
}
} else {
javaChannel().write(
buf.nioBuffer(), config.getWriteTimeout(), TimeUnit.MILLISECONDS, this, WRITE_HANDLER);
}
if (asyncWriteInProgress) {
// JDK decided to write data (or notify handler) later.
break;
}
// JDK performed the write operation immediately and notified the handler.
// We know this because we set asyncWriteInProgress to false in the handler.
if (!buf.readable()) {
// There's nothing left in the buffer. No need to retry writing.
break;
}
// There's more to write. Continue the loop.
}
} else { } else {
javaChannel().write(buf.nioBuffer(), config.getReadTimeout(), TimeUnit.MILLISECONDS, flushFutureNotifier.notifyFlushFutures();
this, WRITE_HANDLER);
} }
} else { } finally {
flushFutureNotifier.notifyFlushFutures(); inDoFlushByteBuffer = false;
flushing = false;
} }
} }
private void beginRead() { private void beginRead() {
if (readSuspended.get() || inputShutdown) { if (inBeginRead || asyncReadInProgress || inputShutdown || readSuspended.get()) {
return; return;
} }
// prevent ReadPendingException inBeginRead = true;
if (!readInProgress.compareAndSet(false, true)) {
return;
}
ByteBuf byteBuf = pipeline().inboundByteBuffer(); try {
if (!byteBuf.readable()) { for (;;) {
byteBuf.discardReadBytes(); ByteBuf byteBuf = pipeline().inboundByteBuffer();
} if (!byteBuf.readable()) {
byteBuf.discardReadBytes();
}
expandReadBuffer(byteBuf); expandReadBuffer(byteBuf);
if (byteBuf.hasNioBuffers()) { asyncReadInProgress = true;
ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes()); if (byteBuf.hasNioBuffers()) {
javaChannel().read(buffers, 0, buffers.length, config.getWriteTimeout(), ByteBuffer[] buffers = byteBuf.nioBuffers(byteBuf.writerIndex(), byteBuf.writableBytes());
TimeUnit.MILLISECONDS, this, SCATTERING_READ_HANDLER); javaChannel().read(buffers, 0, buffers.length, config.getWriteTimeout(),
} else { TimeUnit.MILLISECONDS, this, SCATTERING_READ_HANDLER);
// Get a ByteBuffer view on the ByteBuf } else {
ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()); // Get a ByteBuffer view on the ByteBuf
javaChannel().read(buffer, config.getWriteTimeout(), TimeUnit.MILLISECONDS, ByteBuffer buffer = byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes());
this, READ_HANDLER); javaChannel().read(buffer, config.getWriteTimeout(), TimeUnit.MILLISECONDS,
this, READ_HANDLER);
}
if (asyncReadInProgress) {
// JDK decided to read data (or notify handler) later.
break;
}
// The read operation has been finished immediately - schedule another read operation.
}
} finally {
inBeginRead = false;
} }
} }
@ -290,6 +331,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
protected void completed0(T result, AioSocketChannel channel) { protected void completed0(T result, AioSocketChannel channel) {
channel.asyncWriteInProgress = false;
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
int writtenBytes = result.intValue(); int writtenBytes = result.intValue();
if (writtenBytes > 0) { if (writtenBytes > 0) {
@ -297,38 +339,33 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
buf.readerIndex(buf.readerIndex() + writtenBytes); buf.readerIndex(buf.readerIndex() + writtenBytes);
} }
boolean empty = !buf.readable(); if (channel.inDoFlushByteBuffer) {
// JDK performed the write operation immediately and notified this handler immediately.
if (empty) { // doFlushByteBuffer() will do subsequent write operations if necessary for us.
// Reset reader/writerIndex to 0 if the buffer is empty. return;
buf.discardReadBytes();
} }
// Notify flush futures only when the handler is called outside of unsafe().flushNow()
// because flushNow() will do that for us.
ChannelFlushFutureNotifier notifier = channel.flushFutureNotifier; ChannelFlushFutureNotifier notifier = channel.flushFutureNotifier;
notifier.increaseWriteCounter(writtenBytes); notifier.increaseWriteCounter(writtenBytes);
notifier.notifyFlushFutures(); notifier.notifyFlushFutures();
// Allow to have the next write pending
channel.flushing = false;
// Stop flushing if disconnected. // Stop flushing if disconnected.
if (!channel.isActive()) { if (!channel.isActive()) {
return; return;
} }
if (buf.readable()) { if (buf.readable()) {
try { channel.unsafe().flushNow();
// Try to flush it again. } else {
channel.doFlushByteBuffer(buf); buf.discardReadBytes();
} catch (Exception e) {
// Should never happen, anyway call failed just in case
failed0(e, channel);
}
} }
} }
@Override @Override
protected void failed0(Throwable cause, AioSocketChannel channel) { protected void failed0(Throwable cause, AioSocketChannel channel) {
channel.asyncWriteInProgress = false;
channel.flushFutureNotifier.notifyFlushFutures(cause); channel.flushFutureNotifier.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause); channel.pipeline().fireExceptionCaught(cause);
@ -341,13 +378,12 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
return; return;
} }
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); if (!channel.inDoFlushByteBuffer) {
if (!buf.readable()) { ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
buf.discardReadBytes(); if (!buf.readable()) {
buf.discardReadBytes();
}
} }
// Allow to have the next write pending
channel.flushing = false;
} }
} }
@ -355,6 +391,8 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
protected void completed0(T result, AioSocketChannel channel) { protected void completed0(T result, AioSocketChannel channel) {
channel.asyncReadInProgress = false;
if (channel.inputShutdown) { if (channel.inputShutdown) {
// Channel has been closed during read. Because the inbound buffer has been deallocated already, // Channel has been closed during read. Because the inbound buffer has been deallocated already,
// there's no way to let a user handler access it unfortunately. // there's no way to let a user handler access it unfortunately.
@ -394,9 +432,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
} }
} finally { } finally {
// see beginRead
channel.readInProgress.set(false);
if (read) { if (read) {
pipeline.fireInboundBufferUpdated(); pipeline.fireInboundBufferUpdated();
} }
@ -412,7 +447,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
} }
} else { } else {
// start the next read // Schedule another read operation.
channel.beginRead(); channel.beginRead();
} }
} }
@ -420,6 +455,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
protected void failed0(Throwable t, AioSocketChannel channel) { protected void failed0(Throwable t, AioSocketChannel channel) {
channel.asyncReadInProgress = false;
if (t instanceof ClosedChannelException) { if (t instanceof ClosedChannelException) {
return; return;
} }
@ -433,7 +469,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
if (t instanceof IOException || t instanceof InterruptedByTimeoutException) { if (t instanceof IOException || t instanceof InterruptedByTimeoutException) {
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
} else { } else {
// start the next read // Schedule another read operation.
channel.beginRead(); channel.beginRead();
} }
} }
@ -455,7 +491,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} }
@Override @Override
public AioSocketChannelConfig config() { public SocketChannelConfig config() {
return config; return config;
} }