diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 932f24222f..60dd8b5418 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -76,7 +76,12 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne private boolean readInProgress; private boolean inDoBeginRead; private boolean readAgain; - private boolean writeInProgress; + + private static final int NO_WRITE_IN_PROGRESS = 0; + private static final int WRITE_IN_PROGRESS = 1; + private static final int WRITE_FAILED = -2; + + private int writeInProgress; private boolean inDoFlushByteBuffer; /** @@ -246,7 +251,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void doFlushByteBuffer(ByteBuf buf) throws Exception { - if (inDoFlushByteBuffer || writeInProgress) { + if (inDoFlushByteBuffer || writeInProgress != NO_WRITE_IN_PROGRESS) { return; } @@ -263,7 +268,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne // discardReadBytes() later, modifying the readerIndex and the writerIndex unexpectedly. buf.discardReadBytes(); - writeInProgress = true; + writeInProgress = WRITE_IN_PROGRESS; if (buf.nioBufferCount() == 1) { javaChannel().write( buf.nioBuffer(), config.getWriteTimeout(), TimeUnit.MILLISECONDS, this, WRITE_HANDLER); @@ -279,7 +284,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } } - if (writeInProgress) { + if (writeInProgress != NO_WRITE_IN_PROGRESS) { + if (writeInProgress == WRITE_FAILED) { + // failed because of an exception so reset state and break out of the loop now + // See #1242 + writeInProgress = NO_WRITE_IN_PROGRESS; + break; + } // JDK decided to write data (or notify handler) later. buf.suspendIntermediaryDeallocations(); break; @@ -368,7 +379,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void completed0(T result, AioSocketChannel channel) { - channel.writeInProgress = false; + channel.writeInProgress = NO_WRITE_IN_PROGRESS; ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); if (buf.refCnt() == 0) { @@ -407,7 +418,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void failed0(Throwable cause, AioSocketChannel channel) { - channel.writeInProgress = false; + channel.writeInProgress = WRITE_FAILED; channel.flushFutureNotifier.notifyFlushFutures(cause); // Check if the exception was raised because of an InterruptedByTimeoutException which means that the