From 2ff22ff4c3246befd19b073aa3ddca48733c47cf Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 16 Jun 2012 21:16:54 +0200 Subject: [PATCH] Add javadocs and also handle writes correctly. See #396 --- .../socket/nio2/AbstractAsyncChannel.java | 2 +- .../socket/nio2/AsyncSocketchannel.java | 25 ++++++++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java index 01a85958bb..7ac6884a85 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio2/AbstractAsyncChannel.java @@ -72,7 +72,7 @@ public abstract class AbstractAsyncChannel extends AbstractChannel { @Override protected void doDeregister() throws Exception { - throw new UnsupportedOperationException("Deregistration is not supported by AbstractAsyncChannel"); + // NOOP } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java index 98946f9882..60da25504a 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio2/AsyncSocketchannel.java @@ -173,13 +173,17 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { @Override protected boolean isFlushPending() { - // TODO: Fix me - return true; + return false; } + @Override protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { + // Only one pending write can be scheduled at one time. Otherwise + // a PendingWriteException will be thrown. So use CAS to not run + // into this if (flushing.compareAndSet(false, true)) { - javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER); + ByteBuffer buffer = (ByteBuffer)buf.nioBuffer(); + javaChannel().write(buffer, this, WRITE_HANDLER); } return false; } @@ -190,13 +194,21 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { @Override public void completed(Integer result, AsyncSocketchannel channel) { ByteBuf buf = channel.pipeline().outboundByteBuffer(); - if (!buf.readable()) { - buf.discardReadBytes(); - } if (result > 0) { + if (result < buf.readableBytes()) { + // Update the readerIndex with the amount of read bytes + buf.readerIndex(buf.readerIndex() + result); + } else { + // not enough space in the buffer anymore so discard everything that + // was read already + buf.discardReadBytes(); + + } channel.notifyFlushFutures(); } + + // Allow to have the next write pending channel.flushing.set(false); } @@ -212,6 +224,7 @@ public class AsyncSocketchannel extends AbstractAsyncChannel { if (cause instanceof IOException) { channel.close(channel.unsafe().voidFuture()); } + // Allow to have the next write pending channel.flushing.set(false); } }