Fix a bug where ChunkedWriteHandler stalls

- Other encoders in the pipeline were swallowing the flush request.
- Do not allocate a new buffer unnecessarily in ChunkedNioFile
This commit is contained in:
Trustin Lee 2012-06-11 11:23:09 +09:00
parent 9dce123938
commit 89444ef4ec
5 changed files with 44 additions and 29 deletions

View File

@ -46,8 +46,9 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
if (out.readableBytes() > oldOutSize) { if (out.readableBytes() > oldOutSize) {
in.discardReadBytes(); in.discardReadBytes();
ctx.flush(future);
} }
ctx.flush(future);
} }
public abstract void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception; public abstract void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception;

View File

@ -29,8 +29,6 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHand
Queue<I> in = ctx.outboundMessageBuffer(); Queue<I> in = ctx.outboundMessageBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer();
boolean notify = false;
int oldOutSize = out.readableBytes();
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -39,7 +37,6 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHand
if (!isEncodable(msg)) { if (!isEncodable(msg)) {
ctx.nextOutboundMessageBuffer().add(msg); ctx.nextOutboundMessageBuffer().add(msg);
notify = true;
continue; continue;
} }
@ -56,10 +53,8 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHand
} }
} }
if (out.readableBytes() > oldOutSize || notify) {
ctx.flush(future); ctx.flush(future);
} }
}
/** /**
* Returns {@code true} if and only if the specified message can be encoded by this encoder. * Returns {@code true} if and only if the specified message can be encoded by this encoder.

View File

@ -26,7 +26,6 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessa
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.outboundMessageBuffer(); Queue<I> in = ctx.outboundMessageBuffer();
boolean notify = false;
for (;;) { for (;;) {
try { try {
Object msg = in.poll(); Object msg = in.poll();
@ -36,7 +35,6 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessa
if (!isEncodable(msg)) { if (!isEncodable(msg)) {
ctx.nextOutboundMessageBuffer().add(msg); ctx.nextOutboundMessageBuffer().add(msg);
notify = true;
continue; continue;
} }
@ -49,9 +47,7 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessa
continue; continue;
} }
if (CodecUtil.unfoldAndAdd(ctx, omsg, false)) { CodecUtil.unfoldAndAdd(ctx, omsg, false);
notify = true;
}
} catch (Throwable t) { } catch (Throwable t) {
if (t instanceof CodecException) { if (t instanceof CodecException) {
ctx.fireExceptionCaught(t); ctx.fireExceptionCaught(t);
@ -61,10 +57,8 @@ public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessa
} }
} }
if (notify) {
ctx.flush(future); ctx.flush(future);
} }
}
/** /**
* Returns {@code true} if and only if the specified message can be encoded by this encoder. * Returns {@code true} if and only if the specified message can be encoded by this encoder.

View File

@ -20,7 +20,6 @@ import io.netty.buffer.ByteBuf;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
/** /**
@ -148,11 +147,9 @@ public class ChunkedNioFile implements ChunkedByteInput {
} }
int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset); int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
byte[] chunkArray = new byte[chunkSize];
ByteBuffer chunk = ByteBuffer.wrap(chunkArray);
int readBytes = 0; int readBytes = 0;
for (;;) { for (;;) {
int localReadBytes = in.read(chunk); int localReadBytes = buffer.writeBytes(in, chunkSize - readBytes);
if (localReadBytes < 0) { if (localReadBytes < 0) {
break; break;
} }
@ -161,8 +158,6 @@ public class ChunkedNioFile implements ChunkedByteInput {
break; break;
} }
} }
chunk.flip();
buffer.writeBytes(chunk);
this.offset += readBytes; this.offset += readBytes;
return true; return true;

View File

@ -71,21 +71,33 @@ public class ChunkedWriteHandler
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class); InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
private static final int MAX_PENDING_WRITES = 4;
private final MessageBuf<Object> queue = MessageBufs.buffer(); private final MessageBuf<Object> queue = MessageBufs.buffer();
private final int maxPendingWrites;
private volatile ChannelHandlerContext ctx; private volatile ChannelHandlerContext ctx;
private final AtomicInteger pendingWrites = new AtomicInteger(); private final AtomicInteger pendingWrites = new AtomicInteger();
private Object currentEvent; private Object currentEvent;
public ChunkedWriteHandler() {
this(4);
}
public ChunkedWriteHandler(int maxPendingWrites) {
if (maxPendingWrites <= 0) {
throw new IllegalArgumentException(
"maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
}
this.maxPendingWrites = maxPendingWrites;
}
@Override @Override
public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { public MessageBuf<Object> newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
return queue; return queue;
} }
private boolean isWritable() { private boolean isWritable() {
return pendingWrites.get() < MAX_PENDING_WRITES; return pendingWrites.get() < maxPendingWrites;
} }
/** /**
@ -136,9 +148,10 @@ public class ChunkedWriteHandler
super.channelInactive(ctx); super.channelInactive(ctx);
} }
private void discard(final ChannelHandlerContext ctx, final Throwable cause) { private void discard(final ChannelHandlerContext ctx, Throwable cause) {
boolean fireExceptionCaught = false; boolean fireExceptionCaught = false;
boolean success = true;
for (;;) { for (;;) {
Object currentEvent = this.currentEvent; Object currentEvent = this.currentEvent;
@ -153,10 +166,27 @@ public class ChunkedWriteHandler
} }
if (currentEvent instanceof ChunkedInput) { if (currentEvent instanceof ChunkedInput) {
closeInput((ChunkedInput<?>) currentEvent); ChunkedInput<?> in = (ChunkedInput<?>) currentEvent;
try {
if (!in.isEndOfInput()) {
success = false;
}
} catch (Exception e) {
success = false;
logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
}
closeInput(in);
} else if (currentEvent instanceof ChannelFuture) { } else if (currentEvent instanceof ChannelFuture) {
ChannelFuture f = (ChannelFuture) currentEvent;
if (!success) {
fireExceptionCaught = true; fireExceptionCaught = true;
((ChannelFuture) currentEvent).setFailure(cause); if (cause == null) {
cause = new ClosedChannelException();
}
f.setFailure(cause);
} else {
f.setSuccess();
}
} }
} }
@ -168,7 +198,7 @@ public class ChunkedWriteHandler
private void doFlush(final ChannelHandlerContext ctx) throws Exception { private void doFlush(final ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel(); Channel channel = ctx.channel();
if (!channel.isActive()) { if (!channel.isActive()) {
discard(ctx, new ClosedChannelException()); discard(ctx, null);
return; return;
} }
while (isWritable()) { while (isWritable()) {