Eliminate most of the synchonization stuff in ChunkedWriteHandler as its not needed anymore with the new thread-model

This commit is contained in:
norman 2012-06-05 14:13:56 +02:00
parent caa35c9772
commit 12069d3bf4

View File

@ -30,11 +30,10 @@ import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.QueueFactory;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -77,11 +76,10 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
private static final int MAX_PENDING_WRITES = 4;
private final Queue<Object> queue = QueueFactory.createQueue();
private final Queue<Object> queue = new LinkedList<Object>();
private volatile ChannelHandlerContext ctx;
private final AtomicInteger pendingWrites = new AtomicInteger();
private final AtomicBoolean flush = new AtomicBoolean(false);
private Object currentEvent;
@Override
@ -105,17 +103,33 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
* Continues to fetch the chunks from the input.
*/
public void resumeTransfer() {
ChannelHandlerContext ctx = this.ctx;
final ChannelHandlerContext ctx = this.ctx;
if (ctx == null) {
return;
}
try {
doFlush(ctx, false);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected exception while sending chunks.", e);
if (ctx.executor().inEventLoop()) {
try {
doFlush(ctx);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected exception while sending chunks.", e);
}
}
} else {
// let the transfer resume on the next event loop round
ctx.executor().execute(new Runnable() {
@Override
public void run() {
try {
doFlush(ctx);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected exception while sending chunks.", e);
}
}
}
});
}
}
@ -123,13 +137,13 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
public void flush(ChannelOutboundHandlerContext<Object> ctx, ChannelFuture future) throws Exception {
queue.add(future);
if (isWritable() || !ctx.channel().isActive()) {
doFlush(ctx, false);
doFlush(ctx);
}
}
@Override
public void channelInactive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
doFlush(ctx, true);
doFlush(ctx);
super.channelInactive(ctx);
}
@ -158,141 +172,121 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
}
if (fireExceptionCaught) {
if (ctx.executor().inEventLoop()) {
ctx.fireExceptionCaught(cause);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.fireExceptionCaught(cause);
}
});
}
ctx.fireExceptionCaught(cause);
}
}
private void doFlush(final ChannelHandlerContext ctx, boolean fireNow) throws Exception {
boolean acquired = false;
private void doFlush(final ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// use CAS to see if the have flush already running, if so we don't need to take further actions
if (acquired = flush.compareAndSet(false, true)) {
try {
if (!channel.isActive()) {
discard(ctx, new ClosedChannelException());
return;
}
while (isWritable()) {
if (currentEvent == null) {
currentEvent = queue.poll();
}
if (!channel.isActive()) {
discard(ctx, new ClosedChannelException());
return;
}
while (isWritable()) {
if (currentEvent == null) {
currentEvent = queue.poll();
}
if (currentEvent == null) {
break;
}
if (currentEvent == null) {
break;
}
final Object currentEvent = this.currentEvent;
if (currentEvent instanceof ChannelFuture) {
this.currentEvent = null;
ctx.flush((ChannelFuture) currentEvent);
} else if (currentEvent instanceof ChunkedInput) {
final ChunkedInput chunks = (ChunkedInput) currentEvent;
Object chunk;
boolean endOfInput;
boolean suspend;
try {
chunk = chunks.nextChunk();
endOfInput = chunks.isEndOfInput();
if (chunk == null) {
chunk = ChannelBuffers.EMPTY_BUFFER;
// No need to suspend when reached at the end.
suspend = !endOfInput;
} else {
suspend = false;
}
} catch (final Throwable t) {
this.currentEvent = null;
if (ctx.executor().inEventLoop()) {
ctx.fireExceptionCaught(t);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.fireExceptionCaught(t);
}
});
}
closeInput(chunks);
break;
}
if (suspend) {
// ChunkedInput.nextChunk() returned null and it has
// not reached at the end of input. Let's wait until
// more chunks arrive. Nothing to write or notify.
break;
}
pendingWrites.incrementAndGet();
ctx.nextOutboundMessageBuffer().add(chunk);
ChannelFuture f = ctx.flush();
if (endOfInput) {
this.currentEvent = null;
// Register a listener which will close the input once the write is complete. This is needed because the Chunk may have
// some resource bound that can not be closed before its not written
//
// See https://github.com/netty/netty/issues/303
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
closeInput(chunks);
}
});
} else if (isWritable()) {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput) currentEvent);
}
}
});
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput) currentEvent);
} else if (isWritable()) {
resumeTransfer();
}
}
});
}
final Object currentEvent = this.currentEvent;
if (currentEvent instanceof ChannelFuture) {
this.currentEvent = null;
ctx.flush((ChannelFuture) currentEvent);
} else if (currentEvent instanceof ChunkedInput) {
final ChunkedInput chunks = (ChunkedInput) currentEvent;
Object chunk;
boolean endOfInput;
boolean suspend;
try {
chunk = chunks.nextChunk();
endOfInput = chunks.isEndOfInput();
if (chunk == null) {
chunk = ChannelBuffers.EMPTY_BUFFER;
// No need to suspend when reached at the end.
suspend = !endOfInput;
} else {
ctx.nextOutboundMessageBuffer().add(currentEvent);
this.currentEvent = null;
suspend = false;
}
} catch (final Throwable t) {
this.currentEvent = null;
if (ctx.executor().inEventLoop()) {
ctx.fireExceptionCaught(t);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.fireExceptionCaught(t);
}
});
}
if (!channel.isActive()) {
discard(ctx, new ClosedChannelException());
return;
}
closeInput(chunks);
break;
}
} finally {
// mark the flush as done
flush.set(false);
if (suspend) {
// ChunkedInput.nextChunk() returned null and it has
// not reached at the end of input. Let's wait until
// more chunks arrive. Nothing to write or notify.
break;
}
pendingWrites.incrementAndGet();
ctx.nextOutboundMessageBuffer().add(chunk);
ChannelFuture f = ctx.flush();
if (endOfInput) {
this.currentEvent = null;
// Register a listener which will close the input once the write is complete. This is needed because the Chunk may have
// some resource bound that can not be closed before its not written
//
// See https://github.com/netty/netty/issues/303
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
closeInput(chunks);
}
});
} else if (isWritable()) {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput) currentEvent);
}
}
});
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput) currentEvent);
} else if (isWritable()) {
resumeTransfer();
}
}
});
}
} else {
ctx.nextOutboundMessageBuffer().add(currentEvent);
this.currentEvent = null;
}
if (!channel.isActive()) {
discard(ctx, new ClosedChannelException());
return;
}
}
if (acquired && (!channel.isActive() || isWritable() && !queue.isEmpty())) {
doFlush(ctx, fireNow);
}
}
static void closeInput(ChunkedInput chunks) {
@ -310,7 +304,7 @@ public class ChunkedWriteHandler extends ChannelHandlerAdapter<Object, Object> {
// try to flush again a last time.
//
// See #304
doFlush(ctx, false);
doFlush(ctx);
}
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events