Add WebSocketClientHandshaker / WebSocketServerHandshaker close methods that take ChannelHandlerContext as parameter (#11171)

Motivation:

At the moment we only expose close(...) methods that take a Channel as paramater. This can be problematic as the write will start at the end of the pipeline which may contain ChannelOutboundHandler implementations that not expect WebSocketFrame objects. We should better also support to pass in a ChannelHandlerContext as starting point for the write which ensures that the WebSocketFrame objects will be handled correctly from this position of the pipeline.

Modifications:

- Add new close(...) methods that take a ChannelHandlerContext
- Add javadoc sentence to point users to the new methods.

Result:

Be able to "start" the close at the right position in the pipeline.
This commit is contained in:
Norman Maurer 2021-04-21 15:16:10 +02:00 committed by GitHub
parent d342124390
commit 697952e3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 126 additions and 23 deletions

View File

@ -18,7 +18,9 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
@ -491,7 +493,10 @@ public abstract class WebSocketClientHandshaker {
protected abstract WebSocketFrameEncoder newWebSocketEncoder();
/**
* Performs the closing handshake
* Performs the closing handshake.
*
* When called from within a {@link ChannelHandler} you most likely want to use
* {@link #close(ChannelHandlerContext, CloseWebSocketFrame)}.
*
* @param channel
* Channel
@ -506,6 +511,9 @@ public abstract class WebSocketClientHandshaker {
/**
* Performs the closing handshake
*
* When called from within a {@link ChannelHandler} you most likely want to use
* {@link #close(ChannelHandlerContext, CloseWebSocketFrame, ChannelPromise)}.
*
* @param channel
* Channel
* @param frame
@ -515,21 +523,49 @@ public abstract class WebSocketClientHandshaker {
*/
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
ObjectUtil.checkNotNull(channel, "channel");
channel.writeAndFlush(frame, promise);
applyForceCloseTimeout(channel, promise);
return promise;
return close0(channel, channel, frame, promise);
}
private void applyForceCloseTimeout(final Channel channel, ChannelFuture flushFuture) {
/**
* Performs the closing handshake
*
* @param ctx
* the {@link ChannelHandlerContext} to use.
* @param frame
* Closing Frame that was received
*/
public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {
ObjectUtil.checkNotNull(ctx, "ctx");
return close(ctx, frame, ctx.newPromise());
}
/**
* Performs the closing handshake
*
* @param ctx
* the {@link ChannelHandlerContext} to use.
* @param frame
* Closing Frame that was received
* @param promise
* the {@link ChannelPromise} to be notified when the closing handshake is done
*/
public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame, ChannelPromise promise) {
ObjectUtil.checkNotNull(ctx, "ctx");
return close0(ctx, ctx.channel(), frame, promise);
}
private ChannelFuture close0(final ChannelOutboundInvoker invoker, final Channel channel,
CloseWebSocketFrame frame, ChannelPromise promise) {
invoker.writeAndFlush(frame, promise);
final long forceCloseTimeoutMillis = this.forceCloseTimeoutMillis;
final WebSocketClientHandshaker handshaker = this;
if (forceCloseTimeoutMillis <= 0 || !channel.isActive() || forceCloseInit != 0) {
return;
return promise;
}
flushFuture.addListener(new ChannelFutureListener() {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
public void operationComplete(ChannelFuture future) {
// If flush operation failed, there is no reason to expect
// a server to receive CloseFrame. Thus this should be handled
// by the application separately.
@ -540,7 +576,7 @@ public abstract class WebSocketClientHandshaker {
@Override
public void run() {
if (channel.isActive()) {
channel.close();
invoker.close();
forceCloseComplete = true;
}
}
@ -555,6 +591,7 @@ public abstract class WebSocketClientHandshaker {
}
}
});
return promise;
}
/**

View File

@ -112,13 +112,17 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
ReferenceCountUtil.release(msg);
promise.setFailure(new ClosedChannelException());
} else if (msg instanceof CloseWebSocketFrame) {
closeSent = promise.unvoid();
closeSent(promise.unvoid());
ctx.write(msg).addListener(new ChannelPromiseNotifier(false, closeSent));
} else {
ctx.write(msg, promise);
}
}
void closeSent(ChannelPromise promise) {
closeSent = promise;
}
private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
if (closeSent.isDone() || forceCloseTimeoutMillis < 0) {
return;

View File

@ -18,7 +18,9 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
@ -329,12 +331,15 @@ public abstract class WebSocketServerHandshaker {
protected abstract FullHttpResponse newHandshakeResponse(FullHttpRequest req,
HttpHeaders responseHeaders);
/**
* Performs the closing handshake
* Performs the closing handshake.
*
* When called from within a {@link ChannelHandler} you most likely want to use
* {@link #close(ChannelHandlerContext, CloseWebSocketFrame)}.
*
* @param channel
* Channel
* the {@link Channel} to use.
* @param frame
* Closing Frame that was received
* Closing Frame that was received.
*/
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame) {
ObjectUtil.checkNotNull(channel, "channel");
@ -342,18 +347,52 @@ public abstract class WebSocketServerHandshaker {
}
/**
* Performs the closing handshake
* Performs the closing handshake.
*
* When called from within a {@link ChannelHandler} you most likely want to use
* {@link #close(ChannelHandlerContext, CloseWebSocketFrame, ChannelPromise)}.
*
* @param channel
* Channel
* the {@link Channel} to use.
* @param frame
* Closing Frame that was received
* Closing Frame that was received.
* @param promise
* the {@link ChannelPromise} to be notified when the closing handshake is done
*/
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
ObjectUtil.checkNotNull(channel, "channel");
return channel.writeAndFlush(frame, promise).addListener(ChannelFutureListener.CLOSE);
return close0(channel, frame, promise);
}
/**
* Performs the closing handshake.
*
* @param ctx
* the {@link ChannelHandlerContext} to use.
* @param frame
* Closing Frame that was received.
*/
public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {
ObjectUtil.checkNotNull(ctx, "ctx");
return close(ctx, frame, ctx.newPromise());
}
/**
* Performs the closing handshake.
*
* @param ctx
* the {@link ChannelHandlerContext} to use.
* @param frame
* Closing Frame that was received.
* @param promise
* the {@link ChannelPromise} to be notified when the closing handshake is done.
*/
public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame, ChannelPromise promise) {
ObjectUtil.checkNotNull(ctx, "ctx");
return close0(ctx, frame, promise).addListener(ChannelFutureListener.CLOSE);
}
private ChannelFuture close0(ChannelOutboundInvoker invoker, CloseWebSocketFrame frame, ChannelPromise promise) {
return invoker.writeAndFlush(frame, promise).addListener(ChannelFutureListener.CLOSE);
}
/**

View File

@ -19,6 +19,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
@ -199,15 +201,33 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker {
* Echo back the closing frame
*
* @param channel
* Channel
* the {@link Channel} to use.
* @param frame
* Web Socket frame that was received
* Web Socket frame that was received.
* @param promise
* the {@link ChannelPromise} to be notified when the closing handshake is done.
*/
@Override
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
return channel.writeAndFlush(frame, promise);
}
/**
* Echo back the closing frame
*
* @param ctx
* the {@link ChannelHandlerContext} to use.
* @param frame
* Closing Frame that was received.
* @param promise
* the {@link ChannelPromise} to be notified when the closing handshake is done.
*/
@Override
public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame,
ChannelPromise promise) {
return ctx.writeAndFlush(frame, promise);
}
@Override
protected WebSocketFrameDecoder newWebsocketDecoder() {
return new WebSocket00FrameDecoder(decoderConfig());

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
@ -237,7 +238,9 @@ public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler {
WebSocketServerHandshaker handshaker = getHandshaker(ctx.channel());
if (handshaker != null) {
frame.retain();
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
ChannelPromise promise = ctx.newPromise();
closeSent(promise);
handshaker.close(ctx, (CloseWebSocketFrame) frame, promise);
} else {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

View File

@ -111,7 +111,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
// Check for closing frame
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
handshaker.close(ctx, (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) {

View File

@ -101,7 +101,7 @@ public class AutobahnServerHandler extends ChannelInboundHandlerAdapter {
}
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
handshaker.close(ctx, (CloseWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()));
} else if (frame instanceof TextWebSocketFrame ||