Remove throws Exception from ChannelHandler methods that handle o… (#11417)

Motivation:

At the moment all methods in `ChannelHandler` declare `throws Exception` as part of their method signature. While this is fine for methods that handle inbound events it is quite confusing for methods that handle outbound events. This comes due the fact that these methods also take a `ChannelPromise` which actually need to be fullfilled to signal back either success or failure. Define `throws...` for these methods is confusing at best. We should just always require the implementation to use the passed in promise to signal back success or failure. Doing so also clears up semantics in general. Due the fact that we can't "forbid" throwing `RuntimeException` we still need to handle this in some way tho. In this case we should just consider it a "bug" and so log it and close the `Channel` in question. The user should never have an exception "escape" their implementation and just use the promise. This also clears up the ownership of the passed in message etc.

As `flush(ChannelHandlerContext)` and `read(ChannelHandlerContext)` don't take a `ChannelPromise` as argument this also means that these methods can never produce an error. This makes kind of sense as these really are just "signals" for the underlying transports to do something. For `RuntimeException` the same rule is used as for other outbound event handling methods, which is logging and closing the `Channel`.

Motifications:

- Remove `throws Exception` from signature
- Adjust code to not throw and just notify the promise directly
- Adjust unit tests

Result:

Much cleaner API and semantics.
This commit is contained in:
Norman Maurer 2021-07-08 10:16:00 +02:00 committed by GitHub
parent 54aa4d9b68
commit 6ac8ef54f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 172 additions and 188 deletions

View File

@ -121,33 +121,33 @@ public class HttpClientUpgradeHandler extends HttpObjectAggregator {
} }
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
ctx.bind(localAddress, promise); ctx.bind(localAddress, promise);
} }
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
ctx.connect(remoteAddress, localAddress, promise); ctx.connect(remoteAddress, localAddress, promise);
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.disconnect(promise); ctx.disconnect(promise);
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.close(promise); ctx.close(promise);
} }
@Override @Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.register(promise); ctx.register(promise);
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.deregister(promise); ctx.deregister(promise);
} }
@ -157,8 +157,7 @@ public class HttpClientUpgradeHandler extends HttpObjectAggregator {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
throws Exception {
if (!(msg instanceof HttpRequest)) { if (!(msg instanceof HttpRequest)) {
ctx.write(msg, promise); ctx.write(msg, promise);
return; return;
@ -182,7 +181,7 @@ public class HttpClientUpgradeHandler extends HttpObjectAggregator {
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
ctx.flush(); ctx.flush();
} }

View File

@ -65,7 +65,7 @@ public class HttpServerKeepAliveHandler implements ChannelHandler {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// modify message on way out to add headers if needed // modify message on way out to add headers if needed
if (msg instanceof HttpResponse) { if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg; final HttpResponse response = (HttpResponse) msg;

View File

@ -215,8 +215,7 @@ public class CorsHandler implements ChannelHandler {
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
throws Exception {
if (config != null && config.isCorsSupportEnabled() && msg instanceof HttpResponse) { if (config != null && config.isCorsSupportEnabled() && msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg; final HttpResponse response = (HttpResponse) msg;
if (setOrigin(response)) { if (setOrigin(response)) {

View File

@ -82,7 +82,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
} }
@Override @Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (closeStatus == null || !ctx.channel().isActive()) { if (closeStatus == null || !ctx.channel().isActive()) {
ctx.close(promise); ctx.close(promise);
} else { } else {
@ -96,7 +96,7 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (closeSent != null) { if (closeSent != null) {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
promise.setFailure(new ClosedChannelException()); promise.setFailure(new ClosedChannelException());

View File

@ -56,7 +56,7 @@ public class WebSocketClientExtensionHandler implements ChannelHandler {
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof HttpRequest && WebSocketExtensionUtil.isWebsocketUpgrade(((HttpRequest) msg).headers())) { if (msg instanceof HttpRequest && WebSocketExtensionUtil.isWebsocketUpgrade(((HttpRequest) msg).headers())) {
HttpRequest request = (HttpRequest) msg; HttpRequest request = (HttpRequest) msg;
String headerValue = request.headers().getAsString(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS); String headerValue = request.headers().getAsString(HttpHeaderNames.SEC_WEBSOCKET_EXTENSIONS);

View File

@ -99,7 +99,7 @@ public class WebSocketServerExtensionHandler implements ChannelHandler {
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof HttpResponse) { if (msg instanceof HttpResponse) {
HttpResponse httpResponse = (HttpResponse) msg; HttpResponse httpResponse = (HttpResponse) msg;
//checking the status is faster than looking at headers //checking the status is faster than looking at headers

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

View File

@ -436,23 +436,23 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
} }
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
ctx.bind(localAddress, promise); ctx.bind(localAddress, promise);
} }
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
ctx.connect(remoteAddress, localAddress, promise); ctx.connect(remoteAddress, localAddress, promise);
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.disconnect(promise); ctx.disconnect(promise);
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
if (decoupleCloseAndGoAway) { if (decoupleCloseAndGoAway) {
ctx.close(promise); ctx.close(promise);
return; return;
@ -508,12 +508,12 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
} }
@Override @Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.register(promise); ctx.register(promise);
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.deregister(promise); ctx.deregister(promise);
} }
@ -523,7 +523,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.write(msg, promise); ctx.write(msg, promise);
} }

View File

@ -342,7 +342,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
ctx.write(msg, promise); ctx.write(msg, promise);
} else { } else {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
throw new UnsupportedMessageTypeException(msg); promise.setFailure(new UnsupportedMessageTypeException(msg));
} }
} }

View File

@ -647,12 +647,17 @@ public class Http2ConnectionRoundtripTest {
newPromise()); newPromise());
clientChannel.pipeline().addFirst(new ChannelHandler() { clientChannel.pipeline().addFirst(new ChannelHandler() {
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
// Ensure we update the window size so we will try to write the rest of the frame while try {
// processing the flush. // Ensure we update the window size so we will try to write the rest of the frame while
http2Client.encoder().flowController().initialWindowSize(8); // processing the flush.
http2Client.encoder().flowController().initialWindowSize(8);
} catch (Http2Exception e) {
promise.setFailure(e);
return;
}
promise.setFailure(new IllegalStateException()); promise.setFailure(new IllegalStateException());
} }
}); });

View File

@ -1278,7 +1278,7 @@ public abstract class Http2MultiplexTest<C extends Http2FrameCodec> {
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
didFlush = true; didFlush = true;
ctx.flush(); ctx.flush();
} }

View File

@ -454,7 +454,7 @@ public class Http2StreamFrameToHttpObjectCodecTest {
EmbeddedChannel ch = new EmbeddedChannel(ctx.newHandler(ByteBufAllocator.DEFAULT), EmbeddedChannel ch = new EmbeddedChannel(ctx.newHandler(ByteBufAllocator.DEFAULT),
new ChannelHandler() { new ChannelHandler() {
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Http2StreamFrame) { if (msg instanceof Http2StreamFrame) {
frames.add((Http2StreamFrame) msg); frames.add((Http2StreamFrame) msg);
ctx.write(Unpooled.EMPTY_BUFFER, promise); ctx.write(Unpooled.EMPTY_BUFFER, promise);

View File

@ -102,7 +102,7 @@ public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
encoder.write(ctx, msg, promise); encoder.write(ctx, msg, promise);
} }

View File

@ -91,29 +91,29 @@ public class DatagramPacketEncoder<M> extends MessageToMessageEncoder<AddressedE
} }
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
encoder.bind(ctx, localAddress, promise); encoder.bind(ctx, localAddress, promise);
} }
@Override @Override
public void connect( public void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception { SocketAddress localAddress, ChannelPromise promise) {
encoder.connect(ctx, remoteAddress, localAddress, promise); encoder.connect(ctx, remoteAddress, localAddress, promise);
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
encoder.disconnect(ctx, promise); encoder.disconnect(ctx, promise);
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
encoder.close(ctx, promise); encoder.close(ctx, promise);
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
encoder.deregister(ctx, promise); encoder.deregister(ctx, promise);
} }
@ -123,7 +123,7 @@ public class DatagramPacketEncoder<M> extends MessageToMessageEncoder<AddressedE
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
encoder.flush(ctx); encoder.flush(ctx);
} }

View File

@ -96,7 +96,7 @@ public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf buf = null; ByteBuf buf = null;
try { try {
if (acceptOutboundMessage(msg)) { if (acceptOutboundMessage(msg)) {
@ -120,9 +120,9 @@ public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter {
ctx.write(msg, promise); ctx.write(msg, promise);
} }
} catch (EncoderException e) { } catch (EncoderException e) {
throw e; promise.setFailure(e);
} catch (Throwable e) { } catch (Throwable e) {
throw new EncoderException(e); promise.setFailure(new EncoderException(e));
} finally { } finally {
if (buf != null) { if (buf != null) {
buf.release(); buf.release();

View File

@ -112,7 +112,7 @@ public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends Cha
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
encoder.write(ctx, msg, promise); encoder.write(ctx, msg, promise);
} }

View File

@ -78,7 +78,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
CodecOutputList out = null; CodecOutputList out = null;
try { try {
if (acceptOutboundMessage(msg)) { if (acceptOutboundMessage(msg)) {
@ -99,9 +99,9 @@ public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter {
ctx.write(msg, promise); ctx.write(msg, promise);
} }
} catch (EncoderException e) { } catch (EncoderException e) {
throw e; promise.setFailure(e);
} catch (Throwable t) { } catch (Throwable t) {
throw new EncoderException(t); promise.setFailure(new EncoderException(t));
} finally { } finally {
if (out != null) { if (out != null) {
try { try {

View File

@ -193,7 +193,7 @@ public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
} }
@Override @Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) {
ChannelFuture f = finishEncode(ctx, ctx.newPromise()); ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener((ChannelFutureListener) f1 -> ctx.close(promise)); f.addListener((ChannelFutureListener) f1 -> ctx.close(promise));

View File

@ -262,7 +262,7 @@ public class JdkZlibEncoder extends ZlibEncoder {
} }
@Override @Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) {
ChannelFuture f = finishEncode(ctx, ctx.newPromise()); ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener((ChannelFutureListener) f1 -> ctx.close(promise)); f.addListener((ChannelFutureListener) f1 -> ctx.close(promise));

View File

@ -297,7 +297,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
} }
@Override @Override
public void flush(final ChannelHandlerContext ctx) throws Exception { public void flush(final ChannelHandlerContext ctx) {
if (buffer != null && buffer.isReadable()) { if (buffer != null && buffer.isReadable()) {
final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false); final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
flushBufferedData(buf); flushBufferedData(buf);
@ -366,7 +366,7 @@ public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
} }
@Override @Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) {
ChannelFuture f = finishEncode(ctx, ctx.newPromise()); ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener((ChannelFutureListener) f1 -> ctx.close(promise)); f.addListener((ChannelFutureListener) f1 -> ctx.close(promise));

View File

@ -33,7 +33,7 @@ public class HAProxyHandler extends ChannelOutboundHandlerAdapter {
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ChannelFuture future = ctx.write(msg, promise); ChannelFuture future = ctx.write(msg, promise);
if (msg instanceof HAProxyMessage) { if (msg instanceof HAProxyMessage) {
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {

View File

@ -286,28 +286,28 @@ public final class HttpProxyHandler extends ProxyHandler {
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
codec.bind(ctx, localAddress, promise); codec.bind(ctx, localAddress, promise);
} }
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
codec.connect(ctx, remoteAddress, localAddress, promise); codec.connect(ctx, remoteAddress, localAddress, promise);
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
codec.disconnect(ctx, promise); codec.disconnect(ctx, promise);
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
codec.close(ctx, promise); codec.close(ctx, promise);
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
codec.deregister(ctx, promise); codec.deregister(ctx, promise);
} }
@ -317,12 +317,12 @@ public final class HttpProxyHandler extends ProxyHandler {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
codec.write(ctx, msg, promise); codec.write(ctx, msg, promise);
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
codec.flush(ctx); codec.flush(ctx);
} }
} }

View File

@ -167,7 +167,7 @@ public abstract class ProxyHandler implements ChannelHandler {
@Override @Override
public final void connect( public final void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
if (destinationAddress != null) { if (destinationAddress != null) {
promise.setFailure(new ConnectionPendingException()); promise.setFailure(new ConnectionPendingException());
@ -394,7 +394,7 @@ public abstract class ProxyHandler implements ChannelHandler {
} }
@Override @Override
public final void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public final void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (finished) { if (finished) {
writePendingWrites(); writePendingWrites();
ctx.write(msg, promise); ctx.write(msg, promise);
@ -404,7 +404,7 @@ public abstract class ProxyHandler implements ChannelHandler {
} }
@Override @Override
public final void flush(ChannelHandlerContext ctx) throws Exception { public final void flush(ChannelHandlerContext ctx) {
if (finished) { if (finished) {
writePendingWrites(); writePendingWrites();
ctx.flush(); ctx.flush();

View File

@ -28,8 +28,8 @@ import java.net.SocketAddress;
/** /**
* {@link ChannelHandler} which will resolve the {@link SocketAddress} that is passed to * {@link ChannelHandler} which will resolve the {@link SocketAddress} that is passed to
* {@link #connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} if it is not already resolved * {@link ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)} if it is not
* and the {@link AddressResolver} supports the type of {@link SocketAddress}. * already resolved and the {@link AddressResolver} supports the type of {@link SocketAddress}.
*/ */
@Sharable @Sharable
public class ResolveAddressHandler implements ChannelHandler { public class ResolveAddressHandler implements ChannelHandler {
@ -42,7 +42,7 @@ public class ResolveAddressHandler implements ChannelHandler {
@Override @Override
public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress, public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) { final SocketAddress localAddress, final ChannelPromise promise) {
AddressResolver<? extends SocketAddress> resolver = resolverGroup.getResolver(ctx.executor()); AddressResolver<? extends SocketAddress> resolver = resolverGroup.getResolver(ctx.executor());
if (resolver.isSupported(remoteAddress) && !resolver.isResolved(remoteAddress)) { if (resolver.isSupported(remoteAddress) && !resolver.isResolved(remoteAddress)) {
resolver.resolve(remoteAddress).addListener((FutureListener<SocketAddress>) future -> { resolver.resolve(remoteAddress).addListener((FutureListener<SocketAddress>) future -> {

View File

@ -36,8 +36,8 @@ import java.util.concurrent.Future;
* in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
* as much as possible. * as much as possible.
* <p> * <p>
* If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next * If a read loop is currently ongoing, {@link ChannelHandler#flush(ChannelHandlerContext)} will not be passed on to
* {@link ChannelHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when * the next {@link ChannelHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
* {@link #channelReadComplete(ChannelHandlerContext)} is triggered. * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
* If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument: * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
* <ul> * <ul>
@ -114,7 +114,7 @@ public class FlushConsolidationHandler implements ChannelHandler {
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
if (readInProgress) { if (readInProgress) {
// If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
// we only need to flush if we reach the explicitFlushAfterFlushes limit. // we only need to flush if we reach the explicitFlushAfterFlushes limit.
@ -155,14 +155,14 @@ public class FlushConsolidationHandler implements ChannelHandler {
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
// Try to flush one last time if flushes are pending before disconnect the channel. // Try to flush one last time if flushes are pending before disconnect the channel.
resetReadAndFlushIfNeeded(ctx); resetReadAndFlushIfNeeded(ctx);
ctx.disconnect(promise); ctx.disconnect(promise);
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
// Try to flush one last time if flushes are pending before close the channel. // Try to flush one last time if flushes are pending before close the channel.
resetReadAndFlushIfNeeded(ctx); resetReadAndFlushIfNeeded(ctx);
ctx.close(promise); ctx.close(promise);

View File

@ -222,7 +222,7 @@ public class LoggingHandler implements ChannelHandler {
} }
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress)); logger.log(internalLevel, format(ctx, "BIND", localAddress));
} }
@ -232,7 +232,7 @@ public class LoggingHandler implements ChannelHandler {
@Override @Override
public void connect( public void connect(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "CONNECT", remoteAddress, localAddress)); logger.log(internalLevel, format(ctx, "CONNECT", remoteAddress, localAddress));
} }
@ -240,7 +240,7 @@ public class LoggingHandler implements ChannelHandler {
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "DISCONNECT")); logger.log(internalLevel, format(ctx, "DISCONNECT"));
} }
@ -248,7 +248,7 @@ public class LoggingHandler implements ChannelHandler {
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "CLOSE")); logger.log(internalLevel, format(ctx, "CLOSE"));
} }
@ -256,7 +256,7 @@ public class LoggingHandler implements ChannelHandler {
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "DEREGISTER")); logger.log(internalLevel, format(ctx, "DEREGISTER"));
} }
@ -280,7 +280,7 @@ public class LoggingHandler implements ChannelHandler {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "WRITE", msg)); logger.log(internalLevel, format(ctx, "WRITE", msg));
} }
@ -296,7 +296,7 @@ public class LoggingHandler implements ChannelHandler {
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "FLUSH")); logger.log(internalLevel, format(ctx, "FLUSH"));
} }

View File

@ -239,7 +239,7 @@ public final class PcapWriteHandler extends ChannelDuplexHandler implements Clos
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!isClosed) { if (!isClosed) {
if (ctx.channel() instanceof SocketChannel) { if (ctx.channel() instanceof SocketChannel) {
handleTCP(ctx, msg, true); handleTCP(ctx, msg, true);

View File

@ -475,7 +475,8 @@ public class SslHandler extends ByteToMessageDecoder {
* Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call. * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
* <p> * <p>
* This value will partition data which is passed to write * This value will partition data which is passed to write
* {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows: * {@link ChannelHandler#write(ChannelHandlerContext, Object, ChannelPromise)}.
* The partitioning will work as follows:
* <ul> * <ul>
* <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li> * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
* <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li> * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
@ -700,35 +701,35 @@ public class SslHandler extends ByteToMessageDecoder {
} }
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
ctx.bind(localAddress, promise); ctx.bind(localAddress, promise);
} }
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
ctx.connect(remoteAddress, localAddress, promise); ctx.connect(remoteAddress, localAddress, promise);
} }
@Override @Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.register(promise); ctx.register(promise);
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.deregister(promise); ctx.deregister(promise);
} }
@Override @Override
public void disconnect(final ChannelHandlerContext ctx, public void disconnect(final ChannelHandlerContext ctx,
final ChannelPromise promise) throws Exception { final ChannelPromise promise) {
closeOutboundAndChannel(ctx, promise, true); closeOutboundAndChannel(ctx, promise, true);
} }
@Override @Override
public void close(final ChannelHandlerContext ctx, public void close(final ChannelHandlerContext ctx,
final ChannelPromise promise) throws Exception { final ChannelPromise promise) {
closeOutboundAndChannel(ctx, promise, false); closeOutboundAndChannel(ctx, promise, false);
} }
@ -746,7 +747,7 @@ public class SslHandler extends ByteToMessageDecoder {
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!(msg instanceof ByteBufConvertible)) { if (!(msg instanceof ByteBufConvertible)) {
UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class); UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
ReferenceCountUtil.safeRelease(msg); ReferenceCountUtil.safeRelease(msg);
@ -760,7 +761,7 @@ public class SslHandler extends ByteToMessageDecoder {
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
// Do not encrypt the first write request if this handler is // Do not encrypt the first write request if this handler is
// created with startTLS flag turned on. // created with startTLS flag turned on.
if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) { if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
@ -781,7 +782,6 @@ public class SslHandler extends ByteToMessageDecoder {
wrapAndFlush(ctx); wrapAndFlush(ctx);
} catch (Throwable cause) { } catch (Throwable cause) {
setHandshakeFailure(ctx, cause); setHandshakeFailure(ctx, cause);
throw cause;
} }
} }
@ -1832,7 +1832,7 @@ public class SslHandler extends ByteToMessageDecoder {
} }
private void closeOutboundAndChannel( private void closeOutboundAndChannel(
final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception { final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) {
setState(STATE_OUTBOUND_CLOSED); setState(STATE_OUTBOUND_CLOSED);
engine.closeOutbound(); engine.closeOutbound();
@ -1868,7 +1868,7 @@ public class SslHandler extends ByteToMessageDecoder {
} }
} }
private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { private void flush(ChannelHandlerContext ctx, ChannelPromise promise) {
if (pendingUnencryptedWrites != null) { if (pendingUnencryptedWrites != null) {
pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise); pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
} else { } else {

View File

@ -114,12 +114,12 @@ public class ChunkedWriteHandler implements ChannelHandler {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
queue.add(new PendingWrite(msg, promise)); queue.add(new PendingWrite(msg, promise));
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
doFlush(ctx); doFlush(ctx);
} }

View File

@ -297,7 +297,7 @@ public class IdleStateHandler implements ChannelHandler {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// Allow writing with void promise if handler is only configured for read timeout events. // Allow writing with void promise if handler is only configured for read timeout events.
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
ctx.write(msg, promise).addListener(writeListener); ctx.write(msg, promise).addListener(writeListener);

View File

@ -104,7 +104,7 @@ public class WriteTimeoutHandler implements ChannelHandler {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (timeoutNanos > 0) { if (timeoutNanos > 0) {
scheduleTimeout(ctx, promise); scheduleTimeout(ctx, promise);
} }

View File

@ -549,8 +549,7 @@ public abstract class AbstractTrafficShapingHandler implements ChannelHandler {
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
throws Exception {
long size = calculateSize(msg); long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano(); long now = TrafficCounter.milliSecondFromNano();
if (size > 0) { if (size > 0) {

View File

@ -648,8 +648,7 @@ public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHa
} }
@Override @Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
throws Exception {
long size = calculateSize(msg); long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano(); long now = TrafficCounter.milliSecondFromNano();
if (size > 0) { if (size > 0) {

View File

@ -182,7 +182,7 @@ public class FlushConsolidationHandlerTest {
return new EmbeddedChannel( return new EmbeddedChannel(
new ChannelHandler() { new ChannelHandler() {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
flushCount.incrementAndGet(); flushCount.incrementAndGet();
ctx.flush(); ctx.flush();
} }

View File

@ -118,7 +118,7 @@ public class SslHandlerTest {
SSLEngine engine = newClientModeSSLEngine(); SSLEngine engine = newClientModeSSLEngine();
SslHandler handler = new SslHandler(engine) { SslHandler handler = new SslHandler(engine) {
@Override @Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise); super.write(ctx, msg, promise);
writeLatch.countDown(); writeLatch.countDown();
} }

View File

@ -616,7 +616,7 @@ public class ChunkedWriteHandlerTest {
EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() { EmbeddedChannel ch = new EmbeddedChannel(new ChannelHandler() {
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
// Calling close so we will drop all queued messages in the ChunkedWriteHandler. // Calling close so we will drop all queued messages in the ChunkedWriteHandler.
ctx.close(); ctx.close();

View File

@ -104,8 +104,7 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
throws Exception {
if (lastWritePromise != null) { if (lastWritePromise != null) {
throw new IllegalStateException(); throw new IllegalStateException();
} }

View File

@ -278,10 +278,9 @@ public interface ChannelHandler {
* @param ctx the {@link ChannelHandlerContext} for which the bind operation is made * @param ctx the {@link ChannelHandlerContext} for which the bind operation is made
* @param localAddress the {@link SocketAddress} to which it should bound * @param localAddress the {@link SocketAddress} to which it should bound
* @param promise the {@link ChannelPromise} to notify once the operation completes * @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { default void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
ctx.bind(localAddress, promise); ctx.bind(localAddress, promise);
} }
@ -292,12 +291,11 @@ public interface ChannelHandler {
* @param remoteAddress the {@link SocketAddress} to which it should connect * @param remoteAddress the {@link SocketAddress} to which it should connect
* @param localAddress the {@link SocketAddress} which is used as source on connect * @param localAddress the {@link SocketAddress} which is used as source on connect
* @param promise the {@link ChannelPromise} to notify once the operation completes * @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void connect( default void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception { SocketAddress localAddress, ChannelPromise promise) {
ctx.connect(remoteAddress, localAddress, promise); ctx.connect(remoteAddress, localAddress, promise);
} }
@ -306,10 +304,9 @@ public interface ChannelHandler {
* *
* @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made * @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes * @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { default void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.disconnect(promise); ctx.disconnect(promise);
} }
@ -318,10 +315,9 @@ public interface ChannelHandler {
* *
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made * @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes * @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { default void close(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.close(promise); ctx.close(promise);
} }
@ -330,10 +326,9 @@ public interface ChannelHandler {
* *
* @param ctx the {@link ChannelHandlerContext} for which the register operation is made * @param ctx the {@link ChannelHandlerContext} for which the register operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes * @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { default void register(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.register(promise); ctx.register(promise);
} }
@ -342,10 +337,9 @@ public interface ChannelHandler {
* *
* @param ctx the {@link ChannelHandlerContext} for which the deregister operation is made * @param ctx the {@link ChannelHandlerContext} for which the deregister operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes * @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { default void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.deregister(promise); ctx.deregister(promise);
} }
@ -360,15 +354,14 @@ public interface ChannelHandler {
/** /**
* Called once a write operation is made. The write operation will write the messages through the * Called once a write operation is made. The write operation will write the messages through the
* {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
* {@link Channel#flush()} is called * {@link Channel#flush()} is called.
* *
* @param ctx the {@link ChannelHandlerContext} for which the write operation is made * @param ctx the {@link ChannelHandlerContext} for which the write operation is made
* @param msg the message to write * @param msg the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes * @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { default void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.write(msg, promise); ctx.write(msg, promise);
} }
@ -377,10 +370,9 @@ public interface ChannelHandler {
* that are pending. * that are pending.
* *
* @param ctx the {@link ChannelHandlerContext} for which the flush operation is made * @param ctx the {@link ChannelHandlerContext} for which the flush operation is made
* @throws Exception thrown if an error occurs
*/ */
@Skip @Skip
default void flush(ChannelHandlerContext ctx) throws Exception { default void flush(ChannelHandlerContext ctx) {
ctx.flush(); ctx.flush();
} }
} }

View File

@ -243,7 +243,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
@Override @Override
public void bind( public void bind(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
SocketAddress localAddress, ChannelPromise promise) throws Exception { SocketAddress localAddress, ChannelPromise promise) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.bind(outboundCtx, localAddress, promise); outboundHandler.bind(outboundCtx, localAddress, promise);
@ -256,7 +256,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
public void connect( public void connect(
ChannelHandlerContext ctx, ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise); outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
@ -266,7 +266,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.disconnect(outboundCtx, promise); outboundHandler.disconnect(outboundCtx, promise);
@ -276,7 +276,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.close(outboundCtx, promise); outboundHandler.close(outboundCtx, promise);
@ -286,7 +286,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
} }
@Override @Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.register(outboundCtx, promise); outboundHandler.register(outboundCtx, promise);
@ -296,7 +296,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.deregister(outboundCtx, promise); outboundHandler.deregister(outboundCtx, promise);
@ -316,7 +316,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.write(outboundCtx, msg, promise); outboundHandler.write(outboundCtx, msg, promise);
@ -326,7 +326,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelHandler, O extends Ch
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
assert ctx == outboundCtx.ctx; assert ctx == outboundCtx.ctx;
if (!outboundCtx.removed) { if (!outboundCtx.removed) {
outboundHandler.flush(outboundCtx); outboundHandler.flush(outboundCtx);

View File

@ -24,7 +24,6 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint; import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.ObjectPool; import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
@ -483,7 +482,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().bind(this, localAddress, promise); handler().bind(this, localAddress, promise);
} catch (Throwable t) { } catch (Throwable t) {
notifyOutboundHandlerException(t, promise); handleOutboundHandlerException(t, false, promise);
} }
} }
@ -523,7 +522,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().connect(this, remoteAddress, localAddress, promise); handler().connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) { } catch (Throwable t) {
notifyOutboundHandlerException(t, promise); handleOutboundHandlerException(t, false, promise);
} }
} }
@ -562,7 +561,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().disconnect(this, promise); handler().disconnect(this, promise);
} catch (Throwable t) { } catch (Throwable t) {
notifyOutboundHandlerException(t, promise); handleOutboundHandlerException(t, false, promise);
} }
} }
@ -595,7 +594,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().close(this, promise); handler().close(this, promise);
} catch (Throwable t) { } catch (Throwable t) {
notifyOutboundHandlerException(t, promise); handleOutboundHandlerException(t, true, promise);
} }
} }
@ -628,7 +627,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().register(this, promise); handler().register(this, promise);
} catch (Throwable t) { } catch (Throwable t) {
notifyOutboundHandlerException(t, promise); handleOutboundHandlerException(t, false, promise);
} }
} }
@ -661,7 +660,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().deregister(this, promise); handler().deregister(this, promise);
} catch (Throwable t) { } catch (Throwable t) {
notifyOutboundHandlerException(t, promise); handleOutboundHandlerException(t, false, promise);
} }
} }
@ -688,20 +687,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().read(this); handler().read(this);
} catch (Throwable t) { } catch (Throwable t) {
invokeExceptionCaughtFromOutbound(t); handleOutboundHandlerException(t, false, null);
}
}
private void invokeExceptionCaughtFromOutbound(Throwable t) {
if ((executionMask & MASK_EXCEPTION_CAUGHT) != 0) {
invokeExceptionCaught(t);
} else {
DefaultChannelHandlerContext ctx = findContextInbound(MASK_EXCEPTION_CAUGHT);
if (ctx == null) {
notifyHandlerRemovedAlready();
return;
}
ctx.invokeExceptionCaught(t);
} }
} }
@ -722,7 +708,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().write(this, m, promise); handler().write(this, m, promise);
} catch (Throwable t) { } catch (Throwable t) {
notifyOutboundHandlerException(t, promise); handleOutboundHandlerException(t, false, promise);
} }
} }
@ -753,7 +739,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
handler().flush(this); handler().flush(this);
} catch (Throwable t) { } catch (Throwable t) {
invokeExceptionCaughtFromOutbound(t); handleOutboundHandlerException(t, false, null);
} }
} }
@ -817,8 +803,24 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
return writeAndFlush(msg, newPromise()); return writeAndFlush(msg, newPromise());
} }
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { private void handleOutboundHandlerException(Throwable cause, boolean closeDidThrow, ChannelPromise promise) {
PromiseNotificationUtil.tryFailure(promise, cause, logger); String msg = handler() + " threw an exception while handling an outbound event. This is most likely a bug";
if (promise != null && !promise.isDone()) {
// This is a "best-effort" approach to at least ensure the promise will be completed somehow.
promise.tryFailure(new IllegalStateException(msg, cause));
}
logger.warn("{}. This is most likely a bug, closing the channel.", msg, cause);
if (closeDidThrow) {
// Close itself did throw, just call close() directly and so have the next handler invoked. If we would
// call close() on the Channel we would risk an infinite-loop.
close();
} else {
// Let's close the channel. Calling close on the Channel ensure we start from the end of the pipeline
// and so give all handlers the chance to do something during close.
channel().close();
}
} }
@Override @Override

View File

@ -397,7 +397,7 @@ public class BootstrapTest {
private ChannelPromise registerPromise; private ChannelPromise registerPromise;
@Override @Override
public void register(ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { public void register(ChannelHandlerContext ctx, final ChannelPromise promise) {
registerPromise = promise; registerPromise = promise;
latch.countDown(); latch.countDown();
ChannelPromise newPromise = ctx.newPromise(); ChannelPromise newPromise = ctx.newPromise();

View File

@ -1941,7 +1941,7 @@ public class DefaultChannelPipelineTest {
final Queue<Object> outboundBuffer = new ArrayDeque<>(); final Queue<Object> outboundBuffer = new ArrayDeque<>();
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
outboundBuffer.add(msg); outboundBuffer.add(msg);
} }

View File

@ -30,51 +30,50 @@ final class LoggingHandler implements ChannelHandler {
private final EnumSet<Event> interest = EnumSet.allOf(Event.class); private final EnumSet<Event> interest = EnumSet.allOf(Event.class);
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
log(Event.WRITE); log(Event.WRITE);
ctx.write(msg, promise); ctx.write(msg, promise);
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
log(Event.FLUSH); log(Event.FLUSH);
ctx.flush(); ctx.flush();
} }
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
throws Exception {
log(Event.BIND, "localAddress=" + localAddress); log(Event.BIND, "localAddress=" + localAddress);
ctx.bind(localAddress, promise); ctx.bind(localAddress, promise);
} }
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception { ChannelPromise promise) {
log(Event.CONNECT, "remoteAddress=" + remoteAddress + " localAddress=" + localAddress); log(Event.CONNECT, "remoteAddress=" + remoteAddress + " localAddress=" + localAddress);
ctx.connect(remoteAddress, localAddress, promise); ctx.connect(remoteAddress, localAddress, promise);
} }
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
log(Event.DISCONNECT); log(Event.DISCONNECT);
ctx.disconnect(promise); ctx.disconnect(promise);
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
log(Event.CLOSE); log(Event.CLOSE);
ctx.close(promise); ctx.close(promise);
} }
@Override @Override
public void register(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void register(ChannelHandlerContext ctx, ChannelPromise promise) {
log(Event.REGISTER); log(Event.REGISTER);
ctx.register(promise); ctx.register(promise);
} }
@Override @Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
log(Event.DEREGISTER); log(Event.DEREGISTER);
ctx.deregister(promise); ctx.deregister(promise);
} }

View File

@ -44,7 +44,7 @@ public class PendingWriteQueueTest {
public void testRemoveAndWrite() { public void testRemoveAndWrite() {
assertWrite(new TestHandler() { assertWrite(new TestHandler() {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
assertFalse(ctx.channel().isWritable(), "Should not be writable anymore"); assertFalse(ctx.channel().isWritable(), "Should not be writable anymore");
ChannelFuture future = queue.removeAndWrite(); ChannelFuture future = queue.removeAndWrite();
@ -58,7 +58,7 @@ public class PendingWriteQueueTest {
public void testRemoveAndWriteAll() { public void testRemoveAndWriteAll() {
assertWrite(new TestHandler() { assertWrite(new TestHandler() {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
assertFalse(ctx.channel().isWritable(), "Should not be writable anymore"); assertFalse(ctx.channel().isWritable(), "Should not be writable anymore");
ChannelFuture future = queue.removeAndWriteAll(); ChannelFuture future = queue.removeAndWriteAll();
@ -73,7 +73,7 @@ public class PendingWriteQueueTest {
assertWriteFails(new TestHandler() { assertWriteFails(new TestHandler() {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
queue.removeAndFail(new TestException()); queue.removeAndFail(new TestException());
super.flush(ctx); super.flush(ctx);
} }
@ -84,7 +84,7 @@ public class PendingWriteQueueTest {
public void testRemoveAndFailAll() { public void testRemoveAndFailAll() {
assertWriteFails(new TestHandler() { assertWriteFails(new TestHandler() {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
queue.removeAndFailAll(new TestException()); queue.removeAndFailAll(new TestException());
super.flush(ctx); super.flush(ctx);
} }
@ -358,7 +358,7 @@ public class PendingWriteQueueTest {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
queue.add(msg, promise); queue.add(msg, promise);
assertFalse(queue.isEmpty()); assertFalse(queue.isEmpty());
assertEquals(++expectedSize, queue.size()); assertEquals(++expectedSize, queue.size());

View File

@ -173,7 +173,7 @@ public class ReentrantChannelTest extends BaseChannelTest {
int flushCount; int flushCount;
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (writeCount < 5) { if (writeCount < 5) {
writeCount++; writeCount++;
ctx.channel().flush(); ctx.channel().flush();
@ -182,7 +182,7 @@ public class ReentrantChannelTest extends BaseChannelTest {
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
if (flushCount < 5) { if (flushCount < 5) {
flushCount++; flushCount++;
ctx.channel().write(createTestBuf(2000)); ctx.channel().write(createTestBuf(2000));
@ -227,7 +227,7 @@ public class ReentrantChannelTest extends BaseChannelTest {
clientChannel.pipeline().addLast(new ChannelHandler() { clientChannel.pipeline().addLast(new ChannelHandler() {
@Override @Override
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
promise.addListener(future -> ctx.channel().close()); promise.addListener(future -> ctx.channel().close());
ctx.write(msg, promise); ctx.write(msg, promise);
ctx.channel().flush(); ctx.channel().flush();
@ -257,14 +257,8 @@ public class ReentrantChannelTest extends BaseChannelTest {
clientChannel.pipeline().addLast(new ChannelHandler() { clientChannel.pipeline().addLast(new ChannelHandler() {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
throw new Exception("intentional failure"); throw new RuntimeException("intentional failure");
}
}, new ChannelHandler() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
} }
}); });

View File

@ -255,7 +255,7 @@ public class EmbeddedChannelTest {
public void testHasNoDisconnectSkipDisconnect() throws InterruptedException { public void testHasNoDisconnectSkipDisconnect() throws InterruptedException {
EmbeddedChannel channel = new EmbeddedChannel(false, new ChannelHandler() { EmbeddedChannel channel = new EmbeddedChannel(false, new ChannelHandler() {
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
promise.tryFailure(new Throwable()); promise.tryFailure(new Throwable());
} }
}); });
@ -347,8 +347,7 @@ public class EmbeddedChannelTest {
public void testWriteLater() { public void testWriteLater() {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() { EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() {
@Override @Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
throws Exception {
ctx.executor().execute(() -> ctx.write(msg, promise)); ctx.executor().execute(() -> ctx.write(msg, promise));
} }
}); });
@ -365,8 +364,7 @@ public class EmbeddedChannelTest {
final int delay = 500; final int delay = 500;
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() { EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() {
@Override @Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
throws Exception {
ctx.executor().schedule(() -> { ctx.executor().schedule(() -> {
ctx.writeAndFlush(msg, promise); ctx.writeAndFlush(msg, promise);
}, delay, TimeUnit.MILLISECONDS); }, delay, TimeUnit.MILLISECONDS);
@ -434,7 +432,7 @@ public class EmbeddedChannelTest {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() { EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() {
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
latch.countDown(); latch.countDown();
} }
}); });
@ -453,13 +451,13 @@ public class EmbeddedChannelTest {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() { EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler() {
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.write(msg, promise); ctx.write(msg, promise);
latch.countDown(); latch.countDown();
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) {
flushCount.incrementAndGet(); flushCount.incrementAndGet();
} }
}); });
@ -572,13 +570,13 @@ public class EmbeddedChannelTest {
private final Queue<Integer> queue = new ArrayDeque<>(); private final Queue<Integer> queue = new ArrayDeque<>();
@Override @Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
queue.add(DISCONNECT); queue.add(DISCONNECT);
promise.setSuccess(); promise.setSuccess();
} }
@Override @Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
queue.add(CLOSE); queue.add(CLOSE);
promise.setSuccess(); promise.setSuccess();
} }