Update ChunkedWriteHandler to the latest revision at branch 3

This commit is contained in:
Trustin Lee 2012-05-30 18:47:55 -07:00
parent 7b434e225f
commit 57d3d0cbb5
2 changed files with 17 additions and 36 deletions

View File

@ -1,14 +0,0 @@
package io.netty.array;
public class ObjectArray<E> extends AbstractArray<E> {
public ObjectArray(E[] array, int offset, int length) {
super(array, offset, length);
}
@Override
public E[] array() {
return (E[]) super.array();
}
}

View File

@ -94,7 +94,6 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
}
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (!(e instanceof MessageEvent)) {
@ -105,17 +104,15 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
boolean offered = queue.offer((MessageEvent) e);
assert offered;
final Channel channel = ctx.channel();
if (channel.isWritable()) {
final Channel channel = ctx.getChannel();
// call flush if the channel is writable or not connected. flush(..) will take care of the rest
if (channel.isWritable() || !channel.isConnected()) {
this.ctx = ctx;
flush(ctx, false);
} else if (!channel.isConnected()) {
this.ctx = ctx;
discard(ctx, false);
}
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
@ -128,7 +125,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
case OPEN:
if (!Boolean.TRUE.equals(cse.getValue())) {
// Fail all pending writes
discard(ctx, true);
flush(ctx, true);
}
break;
}
@ -138,7 +135,6 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
private void discard(ChannelHandlerContext ctx, boolean fireNow) {
ClosedChannelException cause = null;
boolean fireExceptionCaught = false;
for (;;) {
MessageEvent currentEvent = this.currentEvent;
@ -164,15 +160,16 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
cause = new ClosedChannelException();
}
currentEvent.getFuture().setFailure(cause);
fireExceptionCaught = true;
currentEvent = null;
}
if (fireExceptionCaught) {
if (cause != null) {
if (fireNow) {
fireExceptionCaught(ctx, cause);
Channels.fireExceptionCaught(ctx.getChannel(), cause);
} else {
fireExceptionCaughtLater(ctx, cause);
Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
}
}
}
@ -187,6 +184,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
if (!channel.isConnected()) {
discard(ctx, fireNow);
return;
}
while (channel.isWritable()) {
@ -236,8 +234,8 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
if (suspend) {
// ChunkedInput.nextChunk() returned null and it has
// not reached at the end of input. Let's wait until
// more chunks arrive. Nothing to write or notify.
// not reached at the end of input. Let's wait until
// more chunks arrive. Nothing to write or notify.
break;
} else {
ChannelFuture writeFuture;
@ -250,6 +248,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
//
// See https://github.com/netty/netty/issues/303
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
closeInput(chunks);
@ -280,16 +279,17 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
if (!channel.isConnected()) {
discard(ctx, fireNow);
break;
return;
}
}
} finally {
// mark the flush as done
flush.set(false);
}
}
if (acquired && !channel.isConnected() || channel.isWritable() && !queue.isEmpty()) {
if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
flush(ctx, fireNow);
}
}
@ -304,20 +304,16 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
}
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// nothing to do
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// nothing to do
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// try to flush again a last time.
//
@ -326,7 +322,6 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
}
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the
// ChannelFuture and the registered FutureListener. See #304