Allow SingleOutboundMessageHandler.beginFlush() to reject the flush request by returning false / Replace PartialFlushException with IncompleteFlushException which is more correct.

This commit is contained in:
Trustin Lee 2013-03-12 15:20:46 +09:00
parent 83cdbeca1d
commit 397830d238
5 changed files with 42 additions and 25 deletions

View File

@ -20,7 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundByteHandlerAdapter; import io.netty.channel.ChannelOutboundByteHandlerAdapter;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.channel.PartialFlushException; import io.netty.channel.IncompleteFlushException;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
@ -69,7 +69,7 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
cause = new EncoderException(t); cause = new EncoderException(t);
} }
if (encoded) { if (encoded) {
cause = new PartialFlushException("Unable to encoded all bytes", cause); cause = new IncompleteFlushException("Unable to encoded all bytes", cause);
} }
in.discardSomeReadBytes(); in.discardSomeReadBytes();
promise.setFailure(cause); promise.setFailure(cause);

View File

@ -93,7 +93,11 @@ public final class ChannelHandlerUtil {
int processed = 0; int processed = 0;
try { try {
handler.beginFlush(ctx); if (!handler.beginFlush(ctx)) {
throw new IncompleteFlushException(
"beginFlush(..) rejected the flush request by returning false. " +
"none of " + inSize + " message(s) fulshed.");
}
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -116,14 +120,18 @@ public final class ChannelHandlerUtil {
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
PartialFlushException pfe; IncompleteFlushException pfe;
String msg = processed + " out of " + inSize + " message(s) flushed"; if (t instanceof IncompleteFlushException) {
if (t instanceof Signal) { pfe = (IncompleteFlushException) t;
Signal abort = (Signal) t;
abort.expect(ABORT);
pfe = new PartialFlushException("aborted: " + msg);
} else { } else {
pfe = new PartialFlushException(msg, t); String msg = processed + " out of " + inSize + " message(s) flushed";
if (t instanceof Signal) {
Signal abort = (Signal) t;
abort.expect(ABORT);
pfe = new IncompleteFlushException("aborted: " + msg);
} else {
pfe = new IncompleteFlushException(msg, t);
}
} }
fail(ctx, promise, closeOnFailedFlush, pfe); fail(ctx, promise, closeOnFailedFlush, pfe);
} }
@ -284,8 +292,11 @@ public final class ChannelHandlerUtil {
* was called. * was called.
* *
* @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to
*
* @return {@code true} to accept the flush request. {@code false} to reject the flush request and
* to fail the promise associated with the flush request with {@link IncompleteFlushException}.
*/ */
void beginFlush(ChannelHandlerContext ctx) throws Exception; boolean beginFlush(ChannelHandlerContext ctx) throws Exception;
/** /**
* Is called once a message is being flushed. * Is called once a message is being flushed.

View File

@ -15,9 +15,10 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketAddress; import java.net.SocketAddress;
import io.netty.util.concurrent.EventExecutor;
/** /**
* Interface which is shared by others which need to execute outbound logic. * Interface which is shared by others which need to execute outbound logic.
*/ */
@ -104,9 +105,9 @@ interface ChannelOutboundInvoker {
* an error. * an error.
* <p> * <p>
* Be aware that the flush could be only partially successful. In such cases the {@link ChannelFuture} will be * Be aware that the flush could be only partially successful. In such cases the {@link ChannelFuture} will be
* failed with an {@link PartialFlushException}. So if you are interested to know if it was partial successful you * failed with an {@link IncompleteFlushException}. So if you are interested to know if it was partial successful you
* need to check if the returned {@link ChannelFuture#cause()} returns an instance of * need to check if the returned {@link ChannelFuture#cause()} returns an instance of
* {@link PartialFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or * {@link IncompleteFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or
* {@link #flush()} to flush the rest of the data or just close the connection via {@link #close(ChannelPromise)} or * {@link #flush()} to flush the rest of the data or just close the connection via {@link #close(ChannelPromise)} or
* {@link #close()} if it is not possible to recover. * {@link #close()} if it is not possible to recover.
* <p> * <p>
@ -124,7 +125,7 @@ interface ChannelOutboundInvoker {
* If you want to write a {@link FileRegion} use {@link #sendFile(FileRegion)}. * If you want to write a {@link FileRegion} use {@link #sendFile(FileRegion)}.
* <p> * <p>
* Be aware that the write could be only partially successful as the message may need to get encoded before write it * Be aware that the write could be only partially successful as the message may need to get encoded before write it
* to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link PartialFlushException}. * to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link IncompleteFlushException}.
* In such cases you may want to call {@link #flush(ChannelPromise)} or {@link #flush()} to flush the rest of the * In such cases you may want to call {@link #flush(ChannelPromise)} or {@link #flush()} to flush the rest of the
* data or just close the connection via {@link #close(ChannelPromise)} or {@link #close()} if it is not possible * data or just close the connection via {@link #close(ChannelPromise)} or {@link #close()} if it is not possible
* to recover. * to recover.
@ -254,9 +255,9 @@ interface ChannelOutboundInvoker {
* an error. * an error.
* <p> * <p>
* Be aware that the flush could be only partially successful. In such cases the {@link ChannelFuture} will be * Be aware that the flush could be only partially successful. In such cases the {@link ChannelFuture} will be
* failed with an {@link PartialFlushException}. So if you are interested to know if it was partial successful you * failed with an {@link IncompleteFlushException}. So if you are interested to know if it was partial successful you
* need to check if the returned {@link ChannelFuture#cause()} returns an instance of * need to check if the returned {@link ChannelFuture#cause()} returns an instance of
* {@link PartialFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or * {@link IncompleteFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or
* {@link #flush()} to flush the rest of the data or just close the connection via {@link #close(ChannelPromise)} or * {@link #flush()} to flush the rest of the data or just close the connection via {@link #close(ChannelPromise)} or
* {@link #close()} if it is not possible to recover. * {@link #close()} if it is not possible to recover.
* *
@ -277,7 +278,7 @@ interface ChannelOutboundInvoker {
* If you want to write a {@link FileRegion} use {@link #sendFile(FileRegion)}. * If you want to write a {@link FileRegion} use {@link #sendFile(FileRegion)}.
* <p> * <p>
* Be aware that the write could be only partially successful as the message may need to get encoded before write it * Be aware that the write could be only partially successful as the message may need to get encoded before write it
* to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link PartialFlushException}. * to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link IncompleteFlushException}.
* In such cases you may want to call {@link #flush(ChannelPromise)} or {@link #flush()} to flush the rest of the * In such cases you may want to call {@link #flush(ChannelPromise)} or {@link #flush()} to flush the rest of the
* data or just close the connection via {@link #close(ChannelPromise)} or {@link #close()} if it is not possible * data or just close the connection via {@link #close(ChannelPromise)} or {@link #close()} if it is not possible
* to recover. * to recover.

View File

@ -76,7 +76,9 @@ public abstract class ChannelOutboundMessageHandlerAdapter<I>
} }
@Override @Override
public void beginFlush(ChannelHandlerContext ctx) throws Exception { } public boolean beginFlush(ChannelHandlerContext ctx) throws Exception {
return true;
}
@Override @Override
public void endFlush(ChannelHandlerContext ctx) throws Exception { } public void endFlush(ChannelHandlerContext ctx) throws Exception { }

View File

@ -16,22 +16,25 @@
package io.netty.channel; package io.netty.channel;
/** /**
* Special {@link RuntimeException} which will be used by {@link ChannelOutboundInvoker#flush(ChannelPromise)}, * Special {@link ChannelException} which will be used by {@link ChannelOutboundInvoker#flush(ChannelPromise)},
* {@link ChannelOutboundInvoker#flush()}, {@link ChannelOutboundInvoker#write(Object)} and * {@link ChannelOutboundInvoker#flush()}, {@link ChannelOutboundInvoker#write(Object)} and
* {@link ChannelOutboundInvoker#write(Object, ChannelPromise)} if the operation was only partial successful. * {@link ChannelOutboundInvoker#write(Object, ChannelPromise)} if the operation was only partial successful.
*/ */
public class PartialFlushException extends RuntimeException { public class IncompleteFlushException extends ChannelException {
private static final long serialVersionUID = 990261865971015004L;
public PartialFlushException(String message) { private static final long serialVersionUID = -9049491093800487565L;
public IncompleteFlushException() { }
public IncompleteFlushException(String message) {
super(message); super(message);
} }
public PartialFlushException(String message, Throwable cause) { public IncompleteFlushException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }
public PartialFlushException(Throwable cause) { public IncompleteFlushException(Throwable cause) {
super(cause); super(cause);
} }
} }