Some clean. See #396

This commit is contained in:
Norman Maurer 2012-06-16 22:29:25 +02:00
parent 314ac37732
commit 70c4f59c45

View File

@ -45,7 +45,7 @@ public class AioSocketChannel extends AbstractAioChannel {
super.channelActive(ctx); super.channelActive(ctx);
// once the channel is active, the first read is scheduled // once the channel is active, the first read is scheduled
AioSocketChannel.read((AioSocketChannel)ctx.channel()); ((AioSocketChannel)ctx.channel()).read();
} finally { } finally {
ctx.pipeline().remove(this); ctx.pipeline().remove(this);
@ -136,14 +136,14 @@ public class AioSocketChannel extends AbstractAioChannel {
* Trigger a read from the {@link AioSocketChannel} * Trigger a read from the {@link AioSocketChannel}
* *
*/ */
private static void read(AioSocketChannel channel) { void read() {
ByteBuf byteBuf = channel.pipeline().inboundByteBuffer(); ByteBuf byteBuf = pipeline().inboundByteBuffer();
expandReadBuffer(byteBuf); expandReadBuffer(byteBuf);
// Get a ByteBuffer view on the ByteBuf and clear it before try to read // Get a ByteBuffer view on the ByteBuf and clear it before try to read
ByteBuffer buffer = byteBuf.nioBuffer(); ByteBuffer buffer = byteBuf.nioBuffer();
buffer.clear(); buffer.clear();
channel.javaChannel().read(buffer, channel, READ_HANDLER); javaChannel().read(buffer, this, READ_HANDLER);
} }
@ -178,6 +178,12 @@ public class AioSocketChannel extends AbstractAioChannel {
@Override @Override
protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception { protected boolean doFlushByteBuffer(ByteBuf buf) throws Exception {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
return true;
}
// Only one pending write can be scheduled at one time. Otherwise // Only one pending write can be scheduled at one time. Otherwise
// a PendingWriteException will be thrown. So use CAS to not run // a PendingWriteException will be thrown. So use CAS to not run
// into this // into this
@ -199,10 +205,16 @@ public class AioSocketChannel extends AbstractAioChannel {
if (result < buf.readableBytes()) { if (result < buf.readableBytes()) {
// Update the readerIndex with the amount of read bytes // Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + result); buf.readerIndex(buf.readerIndex() + result);
} else {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
} else { } else {
// not enough space in the buffer anymore so discard everything that // not enough space in the buffer anymore so discard everything that
// was read already // was read already
buf.discardReadBytes(); buf.discardReadBytes();
}
} }
channel.notifyFlushFutures(); channel.notifyFlushFutures();
@ -273,7 +285,7 @@ public class AioSocketChannel extends AbstractAioChannel {
channel.close(channel.unsafe().voidFuture()); channel.close(channel.unsafe().voidFuture());
} else { } else {
// start the next read // start the next read
AioSocketChannel.read(channel); channel.read();
} }
} }
} }
@ -285,7 +297,7 @@ public class AioSocketChannel extends AbstractAioChannel {
channel.close(channel.unsafe().voidFuture()); channel.close(channel.unsafe().voidFuture());
} else { } else {
// start the next read // start the next read
AioSocketChannel.read(channel); channel.read();
} }
} }
} }
@ -297,7 +309,7 @@ public class AioSocketChannel extends AbstractAioChannel {
((AsyncUnsafe) channel.unsafe()).connectSuccess(); ((AsyncUnsafe) channel.unsafe()).connectSuccess();
// start reading from channel // start reading from channel
AioSocketChannel.read(channel); channel.read();
} }
@Override @Override