Fix writes. See #396

This commit is contained in:
Norman Maurer 2012-06-17 13:12:15 +02:00
parent 22bc8fecca
commit 7412c371f2

View File

@ -145,8 +145,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
expandReadBuffer(byteBuf);
// Get a ByteBuffer view on the ByteBuf and clear it before try to read
ByteBuffer buffer = byteBuf.nioBuffer();
buffer.clear();
ByteBuffer buffer = (ByteBuffer) byteBuf.nioBuffer(byteBuf.writerIndex(), byteBuf.writableBytes()).clear();
javaChannel().read(buffer, this, READ_HANDLER);
}
@ -192,7 +191,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
// a PendingWriteException will be thrown. So use CAS to not run
// into this
if (flushing.compareAndSet(false, true)) {
ByteBuffer buffer = (ByteBuffer)buf.nioBuffer();
ByteBuffer buffer = (ByteBuffer) buf.nioBuffer();
javaChannel().write(buffer, this, WRITE_HANDLER);
}
return false;
@ -206,22 +205,13 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
ByteBuf buf = channel.pipeline().outboundByteBuffer();
if (result > 0) {
if (result < buf.readableBytes()) {
// Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + result);
} else {
if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
} else {
// not enough space in the buffer anymore so discard everything that
// was read already
buf.discardReadBytes();
}
}
// Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + result);
channel.notifyFlushFutures();
if (!buf.readable()) {
buf.discardReadBytes();
}
}
// Allow to have the next write pending
@ -230,15 +220,16 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
public void failed(Throwable cause, AioSocketChannel channel) {
ByteBuf buf = channel.pipeline().outboundByteBuffer();
if (!buf.readable()) {
buf.discardReadBytes();
}
channel.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause);
if (cause instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
channel.unsafe().close(channel.unsafe().voidFuture());
} else {
ByteBuf buf = channel.pipeline().outboundByteBuffer();
if (!buf.readable()) {
buf.discardReadBytes();
}
}
// Allow to have the next write pending
channel.flushing.set(false);
@ -279,14 +270,14 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
}
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
channel.unsafe().close(channel.unsafe().voidFuture());
}
} finally {
if (read) {
pipeline.fireInboundBufferUpdated();
}
if (closed && channel.isOpen()) {
channel.close(channel.unsafe().voidFuture());
channel.unsafe().close(channel.unsafe().voidFuture());
} else {
// start the next read
channel.read();
@ -298,7 +289,7 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
public void failed(Throwable t, AioSocketChannel channel) {
channel.pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
channel.close(channel.unsafe().voidFuture());
channel.unsafe().close(channel.unsafe().voidFuture());
} else {
// start the next read
channel.read();