Adjust usage of ChannelFutureListeners.CLOSE to use the ChannelHandlerContext (#11631)

Motivation:

Usually the outbound operation should start at the "current" ChanneöHandlercontext which was often not the case

Modifications:

Use the ChannelHandlerContext for closing the connection

Result:

Start the operation on the right position of the pipeline
This commit is contained in:
Norman Maurer 2021-08-31 12:49:30 +02:00 committed by GitHub
parent 8cc34e5c4c
commit a3c44f5a99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 42 additions and 42 deletions

View File

@ -87,7 +87,7 @@ public class HttpServerKeepAliveHandler implements ChannelHandler {
boolean shouldClose = msg instanceof LastHttpContent && !shouldKeepAlive(); boolean shouldClose = msg instanceof LastHttpContent && !shouldKeepAlive();
Future<Void> future = ctx.write(msg); Future<Void> future = ctx.write(msg);
if (shouldClose) { if (shouldClose) {
future.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); future.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
return future; return future;
} }

View File

@ -244,7 +244,7 @@ public class CorsHandler implements ChannelHandler {
Future<Void> future = ctx.writeAndFlush(response); Future<Void> future = ctx.writeAndFlush(response);
if (!keepAlive) { if (!keepAlive) {
future.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); future.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }
} }

View File

@ -92,7 +92,7 @@ public class Utf8FrameValidator implements ChannelHandler {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof CorruptedFrameException && ctx.channel().isOpen()) { if (cause instanceof CorruptedFrameException && ctx.channel().isOpen()) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
ctx.fireExceptionCaught(cause); ctx.fireExceptionCaught(cause);
} }

View File

@ -448,7 +448,7 @@ public class WebSocket08FrameDecoder extends ByteToMessageDecoder
} }
closeMessage = new CloseWebSocketFrame(closeStatus, reasonText); closeMessage = new CloseWebSocketFrame(closeStatus, reasonText);
} }
ctx.writeAndFlush(closeMessage).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(closeMessage).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
throw ex; throw ex;
} }

View File

@ -332,7 +332,7 @@ public abstract class WebSocketServerHandshaker {
*/ */
public Future<Void> close(Channel channel, CloseWebSocketFrame frame) { public Future<Void> close(Channel channel, CloseWebSocketFrame frame) {
requireNonNull(channel, "channel"); requireNonNull(channel, "channel");
return close0(channel, channel, frame); return close0(channel, frame);
} }
/** /**
@ -345,11 +345,11 @@ public abstract class WebSocketServerHandshaker {
*/ */
public Future<Void> close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) { public Future<Void> close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {
requireNonNull(ctx, "ctx"); requireNonNull(ctx, "ctx");
return close0(ctx, ctx.channel(), frame); return close0(ctx, frame);
} }
private static Future<Void> close0(ChannelOutboundInvoker invoker, Channel channel, CloseWebSocketFrame frame) { private static Future<Void> close0(ChannelOutboundInvoker invoker, CloseWebSocketFrame frame) {
return invoker.writeAndFlush(frame).addListener(channel, ChannelFutureListeners.CLOSE); return invoker.writeAndFlush(frame).addListener(invoker, ChannelFutureListeners.CLOSE);
} }
/** /**

View File

@ -241,7 +241,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
closeSent(promise); closeSent(promise);
handshaker.close(ctx, (CloseWebSocketFrame) frame).cascadeTo(promise); handshaker.close(ctx, (CloseWebSocketFrame) frame).cascadeTo(promise);
} else { } else {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
return; return;
} }
@ -258,7 +258,7 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
if (cause instanceof WebSocketHandshakeException) { if (cause instanceof WebSocketHandshakeException) {
FullHttpResponse response = new DefaultFullHttpResponse( FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(cause.getMessage().getBytes())); HTTP_1_1, HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(cause.getMessage().getBytes()));
ctx.channel().writeAndFlush(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.channel().writeAndFlush(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} else { } else {
ctx.fireExceptionCaught(cause); ctx.fireExceptionCaught(cause);
ctx.close(); ctx.close();

View File

@ -125,9 +125,9 @@ class WebSocketServerProtocolHandshakeHandler implements ChannelHandler {
} }
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
Future<Void> f = ctx.channel().writeAndFlush(res); Future<Void> f = ctx.writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) { if (!isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); f.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }

View File

@ -238,7 +238,7 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
Future<Void> future = ctx.writeAndFlush(continueResponse).addListener(ctx, listener); Future<Void> future = ctx.writeAndFlush(continueResponse).addListener(ctx, listener);
if (closeAfterWrite) { if (closeAfterWrite) {
future.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); future.addListener(ctx, ChannelFutureListeners.CLOSE);
return; return;
} }
if (handlingOversizedMessage) { if (handlingOversizedMessage) {

View File

@ -67,7 +67,7 @@ public class FileServerHandler extends SimpleChannelInboundHandler<String> {
ctx.writeAndFlush("ERR: " + ctx.writeAndFlush("ERR: " +
cause.getClass().getSimpleName() + ": " + cause.getClass().getSimpleName() + ": " +
cause.getMessage() + '\n') cause.getMessage() + '\n')
.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); .addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }
} }

View File

@ -34,6 +34,6 @@ public class OkResponseHandler extends SimpleChannelInboundHandler<Object> {
final FullHttpResponse response = new DefaultFullHttpResponse( final FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER); HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
response.headers().set("custom-response-header", "Some value"); response.headers().set("custom-response-header", "Some value");
ctx.writeAndFlush(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }

View File

@ -219,7 +219,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
// Decide whether to close the connection or not. // Decide whether to close the connection or not.
if (!keepAlive) { if (!keepAlive) {
// Close the connection when the whole content is written out. // Close the connection when the whole content is written out.
lastContentFuture.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); lastContentFuture.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }
@ -353,7 +353,7 @@ public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<Ful
if (!keepAlive) { if (!keepAlive) {
// Close the connection as soon as the response is sent. // Close the connection as soon as the response is sent.
flushPromise.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); flushPromise.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }

View File

@ -66,7 +66,7 @@ public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<Htt
Future<Void> f = ctx.write(response); Future<Void> f = ctx.write(response);
if (!keepAlive) { if (!keepAlive) {
f.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); f.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }
} }

View File

@ -127,7 +127,7 @@ public class HttpSnoopServerHandler extends SimpleChannelInboundHandler<Object>
if (!writeResponse(trailer, ctx)) { if (!writeResponse(trailer, ctx)) {
// If keep-alive is off, close the connection once the content is fully written. // If keep-alive is off, close the connection once the content is fully written.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }
} }

View File

@ -438,16 +438,16 @@ public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObj
} }
// Write the response. // Write the response.
Future<Void> future = ctx.channel().writeAndFlush(response); Future<Void> future = ctx.writeAndFlush(response);
// Close the connection after the write operation is done if necessary. // Close the connection after the write operation is done if necessary.
if (!keepAlive) { if (!keepAlive) {
future.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); future.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(Level.WARNING, responseContent.toString(), cause); logger.log(Level.WARNING, responseContent.toString(), cause);
ctx.channel().close(); ctx.close();
} }
} }

View File

@ -146,7 +146,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
// Tell the client we're going to close the connection. // Tell the client we're going to close the connection.
res.headers().set(CONNECTION, CLOSE); res.headers().set(CONNECTION, CLOSE);
ctx.writeAndFlush(res).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(res).addListener(ctx, ChannelFutureListeners.CLOSE);
} else { } else {
if (req.protocolVersion().equals(HTTP_1_0)) { if (req.protocolVersion().equals(HTTP_1_0)) {
res.headers().set(CONNECTION, KEEP_ALIVE); res.headers().set(CONNECTION, KEEP_ALIVE);

View File

@ -101,7 +101,7 @@ public class WebSocketIndexPageHandler extends SimpleChannelInboundHandler<FullH
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
// Tell the client we're going to close the connection. // Tell the client we're going to close the connection.
res.headers().set(CONNECTION, CLOSE); res.headers().set(CONNECTION, CLOSE);
ctx.writeAndFlush(res).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(res).addListener(ctx, ChannelFutureListeners.CLOSE);
} else { } else {
if (req.protocolVersion().equals(HTTP_1_0)) { if (req.protocolVersion().equals(HTTP_1_0)) {
res.headers().set(CONNECTION, KEEP_ALIVE); res.headers().set(CONNECTION, KEEP_ALIVE);

View File

@ -70,7 +70,7 @@ public class HelloWorldHttp1Handler extends SimpleChannelInboundHandler<FullHttp
} else { } else {
// Tell the client we're going to close the connection. // Tell the client we're going to close the connection.
response.headers().set(CONNECTION, CLOSE); response.headers().set(CONNECTION, CLOSE);
ctx.write(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.write(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }

View File

@ -58,7 +58,7 @@ public final class FallbackRequestHandler extends SimpleChannelInboundHandler<Ht
response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8"); response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes()); response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
ctx.write(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.write(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
@Override @Override

View File

@ -60,7 +60,7 @@ public final class Http1RequestHandler extends Http2RequestHandler {
} else { } else {
// Tell the client we're going to close the connection. // Tell the client we're going to close the connection.
response.headers().set(CONNECTION, CLOSE); response.headers().set(CONNECTION, CLOSE);
ctx.writeAndFlush(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
}, latency, TimeUnit.MILLISECONDS); }, latency, TimeUnit.MILLISECONDS);
} }

View File

@ -175,7 +175,7 @@ public class StompChatHandler extends SimpleChannelInboundHandler<StompFrame> {
StompFrame receiptFrame = new DefaultStompFrame(StompCommand.RECEIPT); StompFrame receiptFrame = new DefaultStompFrame(StompCommand.RECEIPT);
receiptFrame.headers().set(RECEIPT_ID, receiptId); receiptFrame.headers().set(RECEIPT_ID, receiptId);
ctx.writeAndFlush(receiptFrame).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(receiptFrame).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
private static void sendErrorFrame(String message, String description, ChannelHandlerContext ctx) { private static void sendErrorFrame(String message, String description, ChannelHandlerContext ctx) {
@ -186,7 +186,7 @@ public class StompChatHandler extends SimpleChannelInboundHandler<StompFrame> {
errorFrame.content().writeCharSequence(description, CharsetUtil.UTF_8); errorFrame.content().writeCharSequence(description, CharsetUtil.UTF_8);
} }
ctx.writeAndFlush(errorFrame).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(errorFrame).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
private static StompFrame transformToMessage(StompFrame sendFrame, StompSubscription subscription) { private static StompFrame transformToMessage(StompFrame sendFrame, StompSubscription subscription) {

View File

@ -136,7 +136,7 @@ public final class StompWebSocketClientPageHandler extends SimpleChannelInboundH
ctx.write(response); ctx.write(response);
} else { } else {
response.headers().set(CONNECTION, CLOSE); response.headers().set(CONNECTION, CLOSE);
ctx.write(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.write(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} }
if (autoFlush) { if (autoFlush) {

View File

@ -59,7 +59,7 @@ public class TelnetServerHandler extends SimpleChannelInboundHandler<String> {
// Close the connection after sending 'Have a good day!' // Close the connection after sending 'Have a good day!'
// if the client has sent 'bye'. // if the client has sent 'bye'.
if (close) { if (close) {
future.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); future.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }

View File

@ -273,7 +273,7 @@ abstract class ProxyServer {
ctx.write(Unpooled.copiedBuffer("2\n", CharsetUtil.US_ASCII)); ctx.write(Unpooled.copiedBuffer("2\n", CharsetUtil.US_ASCII));
} else if ("C\n".equals(str)) { } else if ("C\n".equals(str)) {
ctx.write(Unpooled.copiedBuffer("3\n", CharsetUtil.US_ASCII)) ctx.write(Unpooled.copiedBuffer("3\n", CharsetUtil.US_ASCII))
.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); .addListener(ctx, ChannelFutureListeners.CLOSE);
} else { } else {
throw new IllegalStateException("unexpected message: " + str); throw new IllegalStateException("unexpected message: " + str);
} }

View File

@ -65,7 +65,7 @@ public abstract class AbstractRemoteAddressFilter<T extends SocketAddress> imple
} else { } else {
Future<Void> rejectedFuture = channelRejected(ctx, remoteAddress); Future<Void> rejectedFuture = channelRejected(ctx, remoteAddress);
if (rejectedFuture != null && !rejectedFuture.isDone()) { if (rejectedFuture != null && !rejectedFuture.isDone()) {
rejectedFuture.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); rejectedFuture.addListener(ctx, ChannelFutureListeners.CLOSE);
} else { } else {
ctx.close(); ctx.close();
} }

View File

@ -130,9 +130,9 @@ public class AutobahnServerHandler implements ChannelHandler {
} }
// Send the response and close the connection if necessary. // Send the response and close the connection if necessary.
Future<Void> f = ctx.channel().writeAndFlush(res); Future<Void> f = ctx.writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) { if (!isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); f.addListener(ctx, ChannelFutureListeners.CLOSE);
} }
} }

View File

@ -60,7 +60,7 @@ public class HelloWorldHttp1Handler extends SimpleChannelInboundHandler<FullHttp
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes()); response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) { if (!keepAlive) {
ctx.write(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.write(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} else { } else {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE); response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.write(response); ctx.write(response);

View File

@ -51,7 +51,7 @@ public class HttpNativeServerHandler extends SimpleChannelInboundHandler<HttpObj
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes()); response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) { if (!keepAlive) {
ctx.write(response).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.write(response).addListener(ctx, ChannelFutureListeners.CLOSE);
} else { } else {
response.headers().set(CONNECTION, KEEP_ALIVE); response.headers().set(CONNECTION, KEEP_ALIVE);
ctx.write(response); ctx.write(response);

View File

@ -62,7 +62,7 @@ public class CompositeBufferGatheringWriteTest extends AbstractSocketTest {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(newCompositeBuffer(ctx.alloc())) ctx.writeAndFlush(newCompositeBuffer(ctx.alloc()))
.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); .addListener(ctx, ChannelFutureListeners.CLOSE);
} }
}); });
} }
@ -196,7 +196,7 @@ public class CompositeBufferGatheringWriteTest extends AbstractSocketTest {
// Write the remainder of the content // Write the remainder of the content
ctx.writeAndFlush(expectedContent.retainedSlice(expectedContent.readerIndex() + offset, ctx.writeAndFlush(expectedContent.retainedSlice(expectedContent.readerIndex() + offset,
expectedContent.readableBytes() - expectedContent.readerIndex() - offset)) expectedContent.readableBytes() - expectedContent.readerIndex() - offset))
.addListener(ctx.channel(), ChannelFutureListeners.CLOSE); .addListener(ctx, ChannelFutureListeners.CLOSE);
} }
@Override @Override

View File

@ -447,7 +447,7 @@ public class SocketHalfClosedTest extends AbstractSocketTest {
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten); ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
buf.writerIndex(buf.capacity()); buf.writerIndex(buf.capacity());
ctx.writeAndFlush(buf).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); ctx.writeAndFlush(buf).addListener(ctx, ChannelFutureListeners.CLOSE);
serverInitializedLatch.countDown(); serverInitializedLatch.countDown();
} }

View File

@ -228,7 +228,7 @@ public class ReentrantChannelTest extends BaseChannelTest {
@Override @Override
public Future<Void> write(final ChannelHandlerContext ctx, Object msg) { public Future<Void> write(final ChannelHandlerContext ctx, Object msg) {
Future<Void> f = ctx.write(msg).addListener(ctx.channel(), ChannelFutureListeners.CLOSE); Future<Void> f = ctx.write(msg).addListener(ctx, ChannelFutureListeners.CLOSE);
ctx.channel().flush(); ctx.channel().flush();
return f; return f;
} }

View File

@ -80,7 +80,7 @@ public class NioSocketChannelTest extends AbstractNioChannelTest<NioSocketChanne
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Write a large enough data so that it is split into two loops. // Write a large enough data so that it is split into two loops.
futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576)) futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576))
.addListener(ctx.channel(), ChannelFutureListeners.CLOSE)); .addListener(ctx, ChannelFutureListeners.CLOSE));
futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576))); futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576)));
ctx.flush(); ctx.flush();
futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576))); futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576)));