From 697952e3e65acd8f40f9c5bb57c9a9134f77d397 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 21 Apr 2021 15:16:10 +0200 Subject: [PATCH] Add WebSocketClientHandshaker / WebSocketServerHandshaker close methods that take ChannelHandlerContext as parameter (#11171) Motivation: At the moment we only expose close(...) methods that take a Channel as paramater. This can be problematic as the write will start at the end of the pipeline which may contain ChannelOutboundHandler implementations that not expect WebSocketFrame objects. We should better also support to pass in a ChannelHandlerContext as starting point for the write which ensures that the WebSocketFrame objects will be handled correctly from this position of the pipeline. Modifications: - Add new close(...) methods that take a ChannelHandlerContext - Add javadoc sentence to point users to the new methods. Result: Be able to "start" the close at the right position in the pipeline. --- .../websocketx/WebSocketClientHandshaker.java | 55 ++++++++++++++++--- .../websocketx/WebSocketProtocolHandler.java | 6 +- .../websocketx/WebSocketServerHandshaker.java | 55 ++++++++++++++++--- .../WebSocketServerHandshaker00.java | 24 +++++++- .../WebSocketServerProtocolHandler.java | 5 +- .../WebSocketServerHandler.java | 2 +- .../autobahn/AutobahnServerHandler.java | 2 +- 7 files changed, 126 insertions(+), 23 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java index a90fb79fa9..e59e30a048 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java @@ -18,7 +18,9 @@ package io.netty.handler.codec.http.websocketx; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundInvoker; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; @@ -491,7 +493,10 @@ public abstract class WebSocketClientHandshaker { protected abstract WebSocketFrameEncoder newWebSocketEncoder(); /** - * Performs the closing handshake + * Performs the closing handshake. + * + * When called from within a {@link ChannelHandler} you most likely want to use + * {@link #close(ChannelHandlerContext, CloseWebSocketFrame)}. * * @param channel * Channel @@ -506,6 +511,9 @@ public abstract class WebSocketClientHandshaker { /** * Performs the closing handshake * + * When called from within a {@link ChannelHandler} you most likely want to use + * {@link #close(ChannelHandlerContext, CloseWebSocketFrame, ChannelPromise)}. + * * @param channel * Channel * @param frame @@ -515,21 +523,49 @@ public abstract class WebSocketClientHandshaker { */ public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) { ObjectUtil.checkNotNull(channel, "channel"); - channel.writeAndFlush(frame, promise); - applyForceCloseTimeout(channel, promise); - return promise; + return close0(channel, channel, frame, promise); } - private void applyForceCloseTimeout(final Channel channel, ChannelFuture flushFuture) { + /** + * Performs the closing handshake + * + * @param ctx + * the {@link ChannelHandlerContext} to use. + * @param frame + * Closing Frame that was received + */ + public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) { + ObjectUtil.checkNotNull(ctx, "ctx"); + return close(ctx, frame, ctx.newPromise()); + } + + /** + * Performs the closing handshake + * + * @param ctx + * the {@link ChannelHandlerContext} to use. + * @param frame + * Closing Frame that was received + * @param promise + * the {@link ChannelPromise} to be notified when the closing handshake is done + */ + public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame, ChannelPromise promise) { + ObjectUtil.checkNotNull(ctx, "ctx"); + return close0(ctx, ctx.channel(), frame, promise); + } + + private ChannelFuture close0(final ChannelOutboundInvoker invoker, final Channel channel, + CloseWebSocketFrame frame, ChannelPromise promise) { + invoker.writeAndFlush(frame, promise); final long forceCloseTimeoutMillis = this.forceCloseTimeoutMillis; final WebSocketClientHandshaker handshaker = this; if (forceCloseTimeoutMillis <= 0 || !channel.isActive() || forceCloseInit != 0) { - return; + return promise; } - flushFuture.addListener(new ChannelFutureListener() { + promise.addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { + public void operationComplete(ChannelFuture future) { // If flush operation failed, there is no reason to expect // a server to receive CloseFrame. Thus this should be handled // by the application separately. @@ -540,7 +576,7 @@ public abstract class WebSocketClientHandshaker { @Override public void run() { if (channel.isActive()) { - channel.close(); + invoker.close(); forceCloseComplete = true; } } @@ -555,6 +591,7 @@ public abstract class WebSocketClientHandshaker { } } }); + return promise; } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index 59b6613622..9f767a72cf 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -112,13 +112,17 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder // Check for closing frame if (frame instanceof CloseWebSocketFrame) { - handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + handshaker.close(ctx, (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { diff --git a/testsuite-autobahn/src/main/java/io/netty/testsuite/autobahn/AutobahnServerHandler.java b/testsuite-autobahn/src/main/java/io/netty/testsuite/autobahn/AutobahnServerHandler.java index ac10f32f7d..3aa2cbe70e 100644 --- a/testsuite-autobahn/src/main/java/io/netty/testsuite/autobahn/AutobahnServerHandler.java +++ b/testsuite-autobahn/src/main/java/io/netty/testsuite/autobahn/AutobahnServerHandler.java @@ -101,7 +101,7 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter { } if (frame instanceof CloseWebSocketFrame) { - handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); + handshaker.close(ctx, (CloseWebSocketFrame) frame); } else if (frame instanceof PingWebSocketFrame) { ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content())); } else if (frame instanceof TextWebSocketFrame ||