Fix data corruption in the AIO transport

This commit is contained in:
Trustin Lee 2012-07-07 21:29:03 +09:00
parent 2bc26fbc70
commit 613834f326
4 changed files with 40 additions and 35 deletions

View File

@ -641,11 +641,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
cause = t; cause = t;
} finally { } finally {
final int newSize = out.readableBytes(); final int newSize = out.readableBytes();
writeCounter += oldSize - newSize; final int writtenBytes = oldSize - newSize;
if (writtenBytes > 0) {
writeCounter += writtenBytes;
if (newSize == 0) { if (newSize == 0) {
out.discardReadBytes(); out.discardReadBytes();
} }
} }
}
} else { } else {
MessageBuf<Object> out = ctx.outboundMessageBuffer(); MessageBuf<Object> out = ctx.outboundMessageBuffer();
int oldSize = out.size(); int oldSize = out.size();
@ -723,6 +726,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract boolean isFlushPending(); protected abstract boolean isFlushPending();
protected void notifyFlushFutures() { protected void notifyFlushFutures() {
notifyFlushFutures(0);
}
protected void notifyFlushFutures(long writtenBytes) {
writeCounter += writtenBytes;
if (flushCheckpoints.isEmpty()) { if (flushCheckpoints.isEmpty()) {
return; return;
} }

View File

@ -43,7 +43,6 @@ public abstract class AbstractAioChannel extends AbstractChannel {
super(parent, id); super(parent, id);
} }
@Override @Override
public InetSocketAddress localAddress() { public InetSocketAddress localAddress() {
if (ch == null) { if (ch == null) {
@ -64,7 +63,6 @@ public abstract class AbstractAioChannel extends AbstractChannel {
return ch; return ch;
} }
@Override @Override
public boolean isOpen() { public boolean isOpen() {
return ch == null || ch.isOpen(); return ch == null || ch.isOpen();
@ -162,6 +160,7 @@ public abstract class AbstractAioChannel extends AbstractChannel {
} }
} }
} }
protected abstract void doConnect(SocketAddress remoteAddress, protected abstract void doConnect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture connectFuture); SocketAddress localAddress, ChannelFuture connectFuture);

View File

@ -43,7 +43,6 @@ abstract class AioCompletionHandler<V, A extends Channel> implements CompletionH
completed0(result, channel); completed0(result, channel);
} else { } else {
channel.eventLoop().execute(new Runnable() { channel.eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
completed0(result, channel); completed0(result, channel);
@ -58,7 +57,6 @@ abstract class AioCompletionHandler<V, A extends Channel> implements CompletionH
failed0(exc, channel); failed0(exc, channel);
} else { } else {
channel.eventLoop().execute(new Runnable() { channel.eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
failed0(exc, channel); failed0(exc, channel);

View File

@ -129,7 +129,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
/** /**
* Trigger a read from the {@link AioSocketChannel} * Trigger a read from the {@link AioSocketChannel}
*
*/ */
void read() { void read() {
ByteBuf byteBuf = pipeline().inboundByteBuffer(); ByteBuf byteBuf = pipeline().inboundByteBuffer();
@ -139,7 +138,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
javaChannel().read(buffer, this, READ_HANDLER); javaChannel().read(buffer, this, READ_HANDLER);
} }
private static boolean expandReadBuffer(ByteBuf byteBuf) { private static boolean expandReadBuffer(ByteBuf byteBuf) {
if (!byteBuf.writable()) { if (!byteBuf.writable()) {
// FIXME: Magic number // FIXME: Magic number
@ -175,51 +173,55 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override @Override
protected void doFlushByteBuffer(ByteBuf buf) throws Exception { protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
if (!buf.readable()) { if (!buf.readable()) {
// Reset reader/writerIndex to 0 if the buffer is empty. return;
buf.clear();
} }
// Only one pending write can be scheduled at one time. Otherwise
// a PendingWriteException will be thrown. So use CAS to not run
// into this
if (!flushing) { if (!flushing) {
flushing = true; flushing = true;
ByteBuffer buffer = buf.nioBuffer(); buf.discardReadBytes();
javaChannel().write(buffer, this, WRITE_HANDLER); javaChannel().write(buf.nioBuffer(), this, WRITE_HANDLER);
} }
} }
private static final class WriteHandler extends AioCompletionHandler<Integer, AioSocketChannel> { private static final class WriteHandler extends AioCompletionHandler<Integer, AioSocketChannel> {
@Override @Override
protected void completed0(Integer result, AioSocketChannel channel) { protected void completed0(Integer result, AioSocketChannel channel) {
ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer(); ByteBuf buf = channel.unsafe().directOutboundContext().outboundByteBuffer();
if (result > 0) { int writtenBytes = result;
if (writtenBytes > 0) {
// Update the readerIndex with the amount of read bytes // Update the readerIndex with the amount of read bytes
buf.readerIndex(buf.readerIndex() + result); buf.readerIndex(buf.readerIndex() + writtenBytes);
}
channel.notifyFlushFutures(); boolean empty = !buf.readable();
if (!buf.readable()) {
buf.discardReadBytes(); if (empty) {
} // Reset reader/writerIndex to 0 if the buffer is empty.
buf.clear();
} }
channel.notifyFlushFutures(writtenBytes);
// Allow to have the next write pending // Allow to have the next write pending
channel.flushing = false; channel.flushing = false;
// Stop flushing if disconnected. // Stop flushing if disconnected.
if (!channel.isActive()) { if (!channel.isActive()) {
if (!empty) {
channel.notifyFlushFutures(new ClosedChannelException());
}
return; return;
} }
if (buf.readable()) {
try { try {
// try to flush it again if nothing is left it will return fast here // try to flush it again if nothing is left it will return fast here
channel.doFlushByteBuffer(buf); channel.doFlushByteBuffer(buf);
} catch (Exception e) { } catch (Exception e) {
// Should never happen, anyway call failed just in case // Should never happen, anyway call failed just in case
failed(e, channel); failed0(e, channel);
}
} }
} }
@ -232,7 +234,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
channel.notifyFlushFutures(cause); channel.notifyFlushFutures(cause);
channel.pipeline().fireExceptionCaught(cause); channel.pipeline().fireExceptionCaught(cause);
if (cause instanceof IOException) { if (cause instanceof IOException) {
channel.unsafe().close(channel.unsafe().voidFuture()); channel.unsafe().close(channel.unsafe().voidFuture());
} else { } else {
ByteBuf buf = channel.pipeline().outboundByteBuffer(); ByteBuf buf = channel.pipeline().outboundByteBuffer();
@ -265,13 +266,12 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
read = true; read = true;
} else if (localReadAmount < 0) { } else if (localReadAmount < 0) {
closed = true; closed = true;
} }
} catch (Throwable t) { } catch (Throwable t) {
if (t instanceof AsynchronousCloseException) { if (t instanceof ClosedChannelException) {
channel.closed = true; channel.closed = true;
} }
@ -294,7 +294,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
} else { } else {
// start the next read // start the next read
channel.read(); channel.read();
} }
} }
} }