diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 366223f417..d84b53d143 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -641,9 +641,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha cause = t; } finally { final int newSize = out.readableBytes(); - writeCounter += oldSize - newSize; - if (newSize == 0) { - out.discardReadBytes(); + final int writtenBytes = oldSize - newSize; + if (writtenBytes > 0) { + writeCounter += writtenBytes; + if (newSize == 0) { + out.discardReadBytes(); + } } } } else { @@ -723,6 +726,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract boolean isFlushPending(); protected void notifyFlushFutures() { + notifyFlushFutures(0); + } + + protected void notifyFlushFutures(long writtenBytes) { + writeCounter += writtenBytes; + if (flushCheckpoints.isEmpty()) { return; } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index 3fab6e445a..8a16d6366d 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -43,7 +43,6 @@ public abstract class AbstractAioChannel extends AbstractChannel { super(parent, id); } - @Override public InetSocketAddress localAddress() { if (ch == null) { @@ -64,7 +63,6 @@ public abstract class AbstractAioChannel extends AbstractChannel { return ch; } - @Override public boolean isOpen() { return ch == null || ch.isOpen(); @@ -162,6 +160,7 @@ public abstract class AbstractAioChannel extends AbstractChannel { } } } + protected abstract void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture connectFuture); diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java index a78235b71b..42201f7d43 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioCompletionHandler.java @@ -43,7 +43,6 @@ abstract class AioCompletionHandler implements CompletionH completed0(result, channel); } else { channel.eventLoop().execute(new Runnable() { - @Override public void run() { completed0(result, channel); @@ -58,7 +57,6 @@ abstract class AioCompletionHandler implements CompletionH failed0(exc, channel); } else { channel.eventLoop().execute(new Runnable() { - @Override public void run() { failed0(exc, channel); diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index 3f51f9cd6b..3d33d55d5d 100644 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -129,7 +129,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne /** * Trigger a read from the {@link AioSocketChannel} - * */ void read() { ByteBuf byteBuf = pipeline().inboundByteBuffer(); @@ -139,7 +138,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne javaChannel().read(buffer, this, READ_HANDLER); } - private static boolean expandReadBuffer(ByteBuf byteBuf) { if (!byteBuf.writable()) { // FIXME: Magic number @@ -175,51 +173,55 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne @Override protected void doFlushByteBuffer(ByteBuf buf) throws Exception { if (!buf.readable()) { - // Reset reader/writerIndex to 0 if the buffer is empty. - buf.clear(); + return; } - // Only one pending write can be scheduled at one time. Otherwise - // a PendingWriteException will be thrown. So use CAS to not run - // into this if (!flushing) { flushing = true; - ByteBuffer buffer = buf.nioBuffer(); - javaChannel().write(buffer, this, WRITE_HANDLER); + buf.discardReadBytes(); + javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); } } - private static final class WriteHandler extends AioCompletionHandler { @Override protected void completed0(Integer result, AioSocketChannel channel) { ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); - if (result > 0) { - + int writtenBytes = result; + if (writtenBytes > 0) { // Update the readerIndex with the amount of read bytes - buf.readerIndex(buf.readerIndex() + result); - - channel.notifyFlushFutures(); - if (!buf.readable()) { - buf.discardReadBytes(); - } + buf.readerIndex(buf.readerIndex() + writtenBytes); } + boolean empty = !buf.readable(); + + if (empty) { + // Reset reader/writerIndex to 0 if the buffer is empty. + buf.clear(); + } + + channel.notifyFlushFutures(writtenBytes); + // Allow to have the next write pending channel.flushing = false; // Stop flushing if disconnected. if (!channel.isActive()) { + if (!empty) { + channel.notifyFlushFutures(new ClosedChannelException()); + } return; } - try { - // try to flush it again if nothing is left it will return fast here - channel.doFlushByteBuffer(buf); - } catch (Exception e) { - // Should never happen, anyway call failed just in case - failed(e, channel); + if (buf.readable()) { + try { + // try to flush it again if nothing is left it will return fast here + channel.doFlushByteBuffer(buf); + } catch (Exception e) { + // Should never happen, anyway call failed just in case + failed0(e, channel); + } } } @@ -232,7 +234,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne channel.notifyFlushFutures(cause); channel.pipeline().fireExceptionCaught(cause); if (cause instanceof IOException) { - channel.unsafe().close(channel.unsafe().voidFuture()); } else { ByteBuf buf = channel.pipeline().outboundByteBuffer(); @@ -265,13 +266,12 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); read = true; - } else if (localReadAmount < 0) { closed = true; } } catch (Throwable t) { - if (t instanceof AsynchronousCloseException) { + if (t instanceof ClosedChannelException) { channel.closed = true; } @@ -294,7 +294,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne } else { // start the next read channel.read(); - } } }