diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java index 478b3c7fd7..4fc684c71a 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java @@ -20,7 +20,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundByteHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.FileRegion; -import io.netty.channel.PartialFlushException; +import io.netty.channel.IncompleteFlushException; import java.io.EOFException; import java.io.IOException; @@ -69,7 +69,7 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte cause = new EncoderException(t); } if (encoded) { - cause = new PartialFlushException("Unable to encoded all bytes", cause); + cause = new IncompleteFlushException("Unable to encoded all bytes", cause); } in.discardSomeReadBytes(); promise.setFailure(cause); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java index ca68e6cc8c..596265abc4 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerUtil.java @@ -93,7 +93,11 @@ public final class ChannelHandlerUtil { int processed = 0; try { - handler.beginFlush(ctx); + if (!handler.beginFlush(ctx)) { + throw new IncompleteFlushException( + "beginFlush(..) rejected the flush request by returning false. " + + "none of " + inSize + " message(s) fulshed."); + } for (;;) { Object msg = in.poll(); if (msg == null) { @@ -116,14 +120,18 @@ public final class ChannelHandlerUtil { } } } catch (Throwable t) { - PartialFlushException pfe; - String msg = processed + " out of " + inSize + " message(s) flushed"; - if (t instanceof Signal) { - Signal abort = (Signal) t; - abort.expect(ABORT); - pfe = new PartialFlushException("aborted: " + msg); + IncompleteFlushException pfe; + if (t instanceof IncompleteFlushException) { + pfe = (IncompleteFlushException) t; } else { - pfe = new PartialFlushException(msg, t); + String msg = processed + " out of " + inSize + " message(s) flushed"; + if (t instanceof Signal) { + Signal abort = (Signal) t; + abort.expect(ABORT); + pfe = new IncompleteFlushException("aborted: " + msg); + } else { + pfe = new IncompleteFlushException(msg, t); + } } fail(ctx, promise, closeOnFailedFlush, pfe); } @@ -284,8 +292,11 @@ public final class ChannelHandlerUtil { * was called. * * @param ctx the {@link ChannelHandlerContext} which this {@link ChannelHandler} belongs to + * + * @return {@code true} to accept the flush request. {@code false} to reject the flush request and + * to fail the promise associated with the flush request with {@link IncompleteFlushException}. */ - void beginFlush(ChannelHandlerContext ctx) throws Exception; + boolean beginFlush(ChannelHandlerContext ctx) throws Exception; /** * Is called once a message is being flushed. diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java index 2444d0c58f..b7a388a3d9 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundInvoker.java @@ -15,9 +15,10 @@ */ package io.netty.channel; +import io.netty.util.concurrent.EventExecutor; + import java.net.ConnectException; import java.net.SocketAddress; -import io.netty.util.concurrent.EventExecutor; /** * Interface which is shared by others which need to execute outbound logic. */ @@ -104,9 +105,9 @@ interface ChannelOutboundInvoker { * an error. *

* Be aware that the flush could be only partially successful. In such cases the {@link ChannelFuture} will be - * failed with an {@link PartialFlushException}. So if you are interested to know if it was partial successful you + * failed with an {@link IncompleteFlushException}. So if you are interested to know if it was partial successful you * need to check if the returned {@link ChannelFuture#cause()} returns an instance of - * {@link PartialFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or + * {@link IncompleteFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or * {@link #flush()} to flush the rest of the data or just close the connection via {@link #close(ChannelPromise)} or * {@link #close()} if it is not possible to recover. *

@@ -124,7 +125,7 @@ interface ChannelOutboundInvoker { * If you want to write a {@link FileRegion} use {@link #sendFile(FileRegion)}. *

* Be aware that the write could be only partially successful as the message may need to get encoded before write it - * to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link PartialFlushException}. + * to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link IncompleteFlushException}. * In such cases you may want to call {@link #flush(ChannelPromise)} or {@link #flush()} to flush the rest of the * data or just close the connection via {@link #close(ChannelPromise)} or {@link #close()} if it is not possible * to recover. @@ -254,9 +255,9 @@ interface ChannelOutboundInvoker { * an error. *

* Be aware that the flush could be only partially successful. In such cases the {@link ChannelFuture} will be - * failed with an {@link PartialFlushException}. So if you are interested to know if it was partial successful you + * failed with an {@link IncompleteFlushException}. So if you are interested to know if it was partial successful you * need to check if the returned {@link ChannelFuture#cause()} returns an instance of - * {@link PartialFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or + * {@link IncompleteFlushException}. In such cases you may want to call {@link #flush(ChannelPromise)} or * {@link #flush()} to flush the rest of the data or just close the connection via {@link #close(ChannelPromise)} or * {@link #close()} if it is not possible to recover. * @@ -277,7 +278,7 @@ interface ChannelOutboundInvoker { * If you want to write a {@link FileRegion} use {@link #sendFile(FileRegion)}. *

* Be aware that the write could be only partially successful as the message may need to get encoded before write it - * to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link PartialFlushException}. + * to the remote peer. In such cases the {@link ChannelFuture} will be failed with a {@link IncompleteFlushException}. * In such cases you may want to call {@link #flush(ChannelPromise)} or {@link #flush()} to flush the rest of the * data or just close the connection via {@link #close(ChannelPromise)} or {@link #close()} if it is not possible * to recover. diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java index ed090219d8..5bdd85243e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundMessageHandlerAdapter.java @@ -76,7 +76,9 @@ public abstract class ChannelOutboundMessageHandlerAdapter } @Override - public void beginFlush(ChannelHandlerContext ctx) throws Exception { } + public boolean beginFlush(ChannelHandlerContext ctx) throws Exception { + return true; + } @Override public void endFlush(ChannelHandlerContext ctx) throws Exception { } diff --git a/transport/src/main/java/io/netty/channel/PartialFlushException.java b/transport/src/main/java/io/netty/channel/IncompleteFlushException.java similarity index 70% rename from transport/src/main/java/io/netty/channel/PartialFlushException.java rename to transport/src/main/java/io/netty/channel/IncompleteFlushException.java index 84db69e669..31bd5bcca1 100644 --- a/transport/src/main/java/io/netty/channel/PartialFlushException.java +++ b/transport/src/main/java/io/netty/channel/IncompleteFlushException.java @@ -16,22 +16,25 @@ package io.netty.channel; /** - * Special {@link RuntimeException} which will be used by {@link ChannelOutboundInvoker#flush(ChannelPromise)}, + * Special {@link ChannelException} which will be used by {@link ChannelOutboundInvoker#flush(ChannelPromise)}, * {@link ChannelOutboundInvoker#flush()}, {@link ChannelOutboundInvoker#write(Object)} and * {@link ChannelOutboundInvoker#write(Object, ChannelPromise)} if the operation was only partial successful. */ -public class PartialFlushException extends RuntimeException { - private static final long serialVersionUID = 990261865971015004L; +public class IncompleteFlushException extends ChannelException { - public PartialFlushException(String message) { + private static final long serialVersionUID = -9049491093800487565L; + + public IncompleteFlushException() { } + + public IncompleteFlushException(String message) { super(message); } - public PartialFlushException(String message, Throwable cause) { + public IncompleteFlushException(String message, Throwable cause) { super(message, cause); } - public PartialFlushException(Throwable cause) { + public IncompleteFlushException(Throwable cause) { super(cause); } }