Fail all queued writes if the ChunkedWriteHandler is removed from the ChannelPipeline. See #304
This commit is contained in:
parent
769275e751
commit
33ff0421e2
|
@ -17,6 +17,7 @@ package org.jboss.netty.handler.stream;
|
||||||
|
|
||||||
import static org.jboss.netty.channel.Channels.*;
|
import static org.jboss.netty.channel.Channels.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
|
||||||
|
@ -32,6 +33,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
||||||
import org.jboss.netty.channel.ChannelStateEvent;
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||||
import org.jboss.netty.channel.Channels;
|
import org.jboss.netty.channel.Channels;
|
||||||
|
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
|
||||||
import org.jboss.netty.channel.MessageEvent;
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
import org.jboss.netty.logging.InternalLogger;
|
import org.jboss.netty.logging.InternalLogger;
|
||||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||||
|
@ -70,7 +72,7 @@ import org.jboss.netty.util.internal.QueueFactory;
|
||||||
* @apiviz.landmark
|
* @apiviz.landmark
|
||||||
* @apiviz.has org.jboss.netty.handler.stream.ChunkedInput oneway - - reads from
|
* @apiviz.has org.jboss.netty.handler.stream.ChunkedInput oneway - - reads from
|
||||||
*/
|
*/
|
||||||
public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
|
public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler {
|
||||||
|
|
||||||
private static final InternalLogger logger =
|
private static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
|
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
|
||||||
|
@ -80,13 +82,6 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||||
private volatile ChannelHandlerContext ctx;
|
private volatile ChannelHandlerContext ctx;
|
||||||
private MessageEvent currentEvent;
|
private MessageEvent currentEvent;
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new instance.
|
|
||||||
*/
|
|
||||||
public ChunkedWriteHandler() {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Continues to fetch the chunks from the input.
|
* Continues to fetch the chunks from the input.
|
||||||
*/
|
*/
|
||||||
|
@ -299,4 +294,62 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// nothing to do
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// nothing to do
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// try to flush again a last time.
|
||||||
|
//
|
||||||
|
// See #304
|
||||||
|
flush(ctx, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
|
||||||
|
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the
|
||||||
|
// ChannelFuture and the registered FutureListener. See #304
|
||||||
|
//
|
||||||
|
Throwable cause = null;
|
||||||
|
boolean fireExceptionCaught = false;
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
MessageEvent currentEvent = this.currentEvent;
|
||||||
|
|
||||||
|
if (this.currentEvent == null) {
|
||||||
|
currentEvent = queue.poll();
|
||||||
|
} else {
|
||||||
|
this.currentEvent = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (currentEvent == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
Object m = currentEvent.getMessage();
|
||||||
|
if (m instanceof ChunkedInput) {
|
||||||
|
closeInput((ChunkedInput) m);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create exception
|
||||||
|
if (cause == null) {
|
||||||
|
cause = new IOException("Unable to flush event, discarding");
|
||||||
|
}
|
||||||
|
currentEvent.getFuture().setFailure(cause);
|
||||||
|
fireExceptionCaught = true;
|
||||||
|
|
||||||
|
currentEvent = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fireExceptionCaught) {
|
||||||
|
Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user