WebSocket client handshaker to support "force close" after timeout (#8896)
Motivation: RFC 6455 defines that, generally, a WebSocket client should not close a TCP connection as far as a server is the one who's responsible for doing that. In practice tho', it's not always possible to control the server. Server's misbehavior may lead to connections being leaked (if the server does not comply with the RFC). RFC 6455 #7.1.1 says > In abnormal cases (such as not having received a TCP Close from the server after a reasonable amount of time) a client MAY initiate the TCP Close. Modifications: * WebSocket client handshaker additional param `forceCloseAfterMillis` * Use 10 seconds as default Result: WebSocket client handshaker to comply with RFC. Fixes #8883.
This commit is contained in:
parent
135c33b478
commit
a57f73e1fe
@ -42,6 +42,9 @@ import io.netty.util.internal.ThrowableUtil;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
/**
|
||||
* Base class for web socket client handshake implementations
|
||||
@ -52,6 +55,7 @@ public abstract class WebSocketClientHandshaker {
|
||||
|
||||
private static final String HTTP_SCHEME_PREFIX = HttpScheme.HTTP + "://";
|
||||
private static final String HTTPS_SCHEME_PREFIX = HttpScheme.HTTPS + "://";
|
||||
protected static final int DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS = 10000;
|
||||
|
||||
private final URI uri;
|
||||
|
||||
@ -59,6 +63,15 @@ public abstract class WebSocketClientHandshaker {
|
||||
|
||||
private volatile boolean handshakeComplete;
|
||||
|
||||
private volatile long forceCloseTimeoutMillis = DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS;
|
||||
|
||||
private volatile int forceCloseInit;
|
||||
|
||||
private static final AtomicIntegerFieldUpdater<WebSocketClientHandshaker> FORCE_CLOSE_INIT_UPDATER =
|
||||
AtomicIntegerFieldUpdater.newUpdater(WebSocketClientHandshaker.class, "forceCloseInit");
|
||||
|
||||
private volatile boolean forceCloseComplete;
|
||||
|
||||
private final String expectedSubprotocol;
|
||||
|
||||
private volatile String actualSubprotocol;
|
||||
@ -84,11 +97,35 @@ public abstract class WebSocketClientHandshaker {
|
||||
*/
|
||||
protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,
|
||||
HttpHeaders customHeaders, int maxFramePayloadLength) {
|
||||
this(uri, version, subprotocol, customHeaders, maxFramePayloadLength, DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Base constructor
|
||||
*
|
||||
* @param uri
|
||||
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
|
||||
* sent to this URL.
|
||||
* @param version
|
||||
* Version of web socket specification to use to connect to the server
|
||||
* @param subprotocol
|
||||
* Sub protocol request sent to the server.
|
||||
* @param customHeaders
|
||||
* Map of custom headers to add to the client request
|
||||
* @param maxFramePayloadLength
|
||||
* Maximum length of a frame's payload
|
||||
* @param forceCloseTimeoutMillis
|
||||
* Close the connection if it was not closed by the server after timeout specified
|
||||
*/
|
||||
protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,
|
||||
HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
long forceCloseTimeoutMillis) {
|
||||
this.uri = uri;
|
||||
this.version = version;
|
||||
expectedSubprotocol = subprotocol;
|
||||
this.customHeaders = customHeaders;
|
||||
this.maxFramePayloadLength = maxFramePayloadLength;
|
||||
this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -142,6 +179,29 @@ public abstract class WebSocketClientHandshaker {
|
||||
this.actualSubprotocol = actualSubprotocol;
|
||||
}
|
||||
|
||||
public long forceCloseTimeoutMillis() {
|
||||
return forceCloseTimeoutMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flag to indicate if the closing handshake was initiated because of timeout.
|
||||
* For testing only.
|
||||
*/
|
||||
protected boolean isForceCloseComplete() {
|
||||
return forceCloseComplete;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets timeout to close the connection if it was not closed by the server.
|
||||
*
|
||||
* @param forceCloseTimeoutMillis
|
||||
* Close the connection if it was not closed by the server after timeout specified
|
||||
*/
|
||||
public WebSocketClientHandshaker setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {
|
||||
this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Begins the opening handshake
|
||||
*
|
||||
@ -414,7 +474,38 @@ public abstract class WebSocketClientHandshaker {
|
||||
*/
|
||||
public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
|
||||
requireNonNull(channel, "channel");
|
||||
return channel.writeAndFlush(frame, promise);
|
||||
|
||||
channel.writeAndFlush(frame, promise);
|
||||
applyForceCloseTimeout(channel, promise);
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void applyForceCloseTimeout(final Channel channel, ChannelFuture flushFuture) {
|
||||
final long forceCloseTimeoutMillis = this.forceCloseTimeoutMillis;
|
||||
final WebSocketClientHandshaker handshaker = this;
|
||||
if (forceCloseTimeoutMillis <= 0 || !channel.isActive() || forceCloseInit != 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
flushFuture.addListener(future -> {
|
||||
// If flush operation failed, there is no reason to expect
|
||||
// a server to receive CloseFrame. Thus this should be handled
|
||||
// by the application separately.
|
||||
// Also, close might be called twice from different threads.
|
||||
if (future.isSuccess() && channel.isActive() &&
|
||||
FORCE_CLOSE_INIT_UPDATER.compareAndSet(handshaker, 0, 1)) {
|
||||
final Future<?> forceCloseFuture = channel.eventLoop().schedule(() -> {
|
||||
if (channel.isActive()) {
|
||||
channel.close();
|
||||
forceCloseComplete = true;
|
||||
}
|
||||
}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
|
||||
channel.closeFuture().addListener(ignore -> {
|
||||
forceCloseFuture.cancel(false);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -49,7 +49,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
|
||||
private ByteBuf expectedChallengeResponseBytes;
|
||||
|
||||
/**
|
||||
* Constructor specifying the destination web socket location and version to initiate
|
||||
* Creates a new instance with the specified destination WebSocket location and version to initiate.
|
||||
*
|
||||
* @param webSocketURL
|
||||
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
|
||||
@ -65,7 +65,31 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
|
||||
*/
|
||||
public WebSocketClientHandshaker00(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
HttpHeaders customHeaders, int maxFramePayloadLength) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength);
|
||||
this(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength,
|
||||
DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified destination WebSocket location and version to initiate.
|
||||
*
|
||||
* @param webSocketURL
|
||||
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
|
||||
* sent to this URL.
|
||||
* @param version
|
||||
* Version of web socket specification to use to connect to the server
|
||||
* @param subprotocol
|
||||
* Sub protocol request sent to the server.
|
||||
* @param customHeaders
|
||||
* Map of custom headers to add to the client request
|
||||
* @param maxFramePayloadLength
|
||||
* Maximum length of a frame's payload
|
||||
* @param forceCloseTimeoutMillis
|
||||
* Close the connection if it was not closed by the server after timeout specified
|
||||
*/
|
||||
public WebSocketClientHandshaker00(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
long forceCloseTimeoutMillis) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength, forceCloseTimeoutMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -244,4 +268,11 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker {
|
||||
protected WebSocketFrameEncoder newWebSocketEncoder() {
|
||||
return new WebSocket00FrameEncoder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public WebSocketClientHandshaker00 setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {
|
||||
super.setForceCloseTimeoutMillis(forceCloseTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -94,10 +94,43 @@ public class WebSocketClientHandshaker07 extends WebSocketClientHandshaker {
|
||||
* When set to true, frames which are not masked properly according to the standard will still be
|
||||
* accepted.
|
||||
*/
|
||||
public WebSocketClientHandshaker07(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch) {
|
||||
this(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength, performMasking,
|
||||
allowMaskMismatch, DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param webSocketURL
|
||||
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
|
||||
* sent to this URL.
|
||||
* @param version
|
||||
* Version of web socket specification to use to connect to the server
|
||||
* @param subprotocol
|
||||
* Sub protocol request sent to the server.
|
||||
* @param allowExtensions
|
||||
* Allow extensions to be used in the reserved bits of the web socket frame
|
||||
* @param customHeaders
|
||||
* Map of custom headers to add to the client request
|
||||
* @param maxFramePayloadLength
|
||||
* Maximum length of a frame's payload
|
||||
* @param performMasking
|
||||
* Whether to mask all written websocket frames. This must be set to true in order to be fully compatible
|
||||
* with the websocket specifications. Client applications that communicate with a non-standard server
|
||||
* which doesn't require masking might set this to false to achieve a higher performance.
|
||||
* @param allowMaskMismatch
|
||||
* When set to true, frames which are not masked properly according to the standard will still be
|
||||
* accepted
|
||||
* @param forceCloseTimeoutMillis
|
||||
* Close the connection if it was not closed by the server after timeout specified.
|
||||
*/
|
||||
public WebSocketClientHandshaker07(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength);
|
||||
boolean performMasking, boolean allowMaskMismatch, long forceCloseTimeoutMillis) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength, forceCloseTimeoutMillis);
|
||||
this.allowExtensions = allowExtensions;
|
||||
this.performMasking = performMasking;
|
||||
this.allowMaskMismatch = allowMaskMismatch;
|
||||
@ -216,4 +249,11 @@ public class WebSocketClientHandshaker07 extends WebSocketClientHandshaker {
|
||||
protected WebSocketFrameEncoder newWebSocketEncoder() {
|
||||
return new WebSocket07FrameEncoder(performMasking);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WebSocketClientHandshaker07 setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {
|
||||
super.setForceCloseTimeoutMillis(forceCloseTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -68,7 +68,8 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
|
||||
*/
|
||||
public WebSocketClientHandshaker08(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength) {
|
||||
this(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength, true, false);
|
||||
this(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength, true,
|
||||
false, DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -93,12 +94,45 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
|
||||
* which doesn't require masking might set this to false to achieve a higher performance.
|
||||
* @param allowMaskMismatch
|
||||
* When set to true, frames which are not masked properly according to the standard will still be
|
||||
* accepted.
|
||||
* accepted
|
||||
*/
|
||||
public WebSocketClientHandshaker08(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch) {
|
||||
this(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength, performMasking,
|
||||
allowMaskMismatch, DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param webSocketURL
|
||||
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
|
||||
* sent to this URL.
|
||||
* @param version
|
||||
* Version of web socket specification to use to connect to the server
|
||||
* @param subprotocol
|
||||
* Sub protocol request sent to the server.
|
||||
* @param allowExtensions
|
||||
* Allow extensions to be used in the reserved bits of the web socket frame
|
||||
* @param customHeaders
|
||||
* Map of custom headers to add to the client request
|
||||
* @param maxFramePayloadLength
|
||||
* Maximum length of a frame's payload
|
||||
* @param performMasking
|
||||
* Whether to mask all written websocket frames. This must be set to true in order to be fully compatible
|
||||
* with the websocket specifications. Client applications that communicate with a non-standard server
|
||||
* which doesn't require masking might set this to false to achieve a higher performance.
|
||||
* @param allowMaskMismatch
|
||||
* When set to true, frames which are not masked properly according to the standard will still be
|
||||
* accepted
|
||||
* @param forceCloseTimeoutMillis
|
||||
* Close the connection if it was not closed by the server after timeout specified.
|
||||
*/
|
||||
public WebSocketClientHandshaker08(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength);
|
||||
boolean performMasking, boolean allowMaskMismatch, long forceCloseTimeoutMillis) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength, forceCloseTimeoutMillis);
|
||||
this.allowExtensions = allowExtensions;
|
||||
this.performMasking = performMasking;
|
||||
this.allowMaskMismatch = allowMaskMismatch;
|
||||
@ -217,4 +251,11 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker {
|
||||
protected WebSocketFrameEncoder newWebSocketEncoder() {
|
||||
return new WebSocket08FrameEncoder(performMasking);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WebSocketClientHandshaker08 setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {
|
||||
super.setForceCloseTimeoutMillis(forceCloseTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -68,7 +68,8 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
|
||||
*/
|
||||
public WebSocketClientHandshaker13(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength) {
|
||||
this(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength, true, false);
|
||||
this(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength,
|
||||
true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -98,7 +99,41 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
|
||||
public WebSocketClientHandshaker13(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength);
|
||||
this(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength,
|
||||
performMasking, allowMaskMismatch, DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*
|
||||
* @param webSocketURL
|
||||
* URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
|
||||
* sent to this URL.
|
||||
* @param version
|
||||
* Version of web socket specification to use to connect to the server
|
||||
* @param subprotocol
|
||||
* Sub protocol request sent to the server.
|
||||
* @param allowExtensions
|
||||
* Allow extensions to be used in the reserved bits of the web socket frame
|
||||
* @param customHeaders
|
||||
* Map of custom headers to add to the client request
|
||||
* @param maxFramePayloadLength
|
||||
* Maximum length of a frame's payload
|
||||
* @param performMasking
|
||||
* Whether to mask all written websocket frames. This must be set to true in order to be fully compatible
|
||||
* with the websocket specifications. Client applications that communicate with a non-standard server
|
||||
* which doesn't require masking might set this to false to achieve a higher performance.
|
||||
* @param allowMaskMismatch
|
||||
* When set to true, frames which are not masked properly according to the standard will still be
|
||||
* accepted
|
||||
* @param forceCloseTimeoutMillis
|
||||
* Close the connection if it was not closed by the server after timeout specified.
|
||||
*/
|
||||
public WebSocketClientHandshaker13(URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch,
|
||||
long forceCloseTimeoutMillis) {
|
||||
super(webSocketURL, version, subprotocol, customHeaders, maxFramePayloadLength, forceCloseTimeoutMillis);
|
||||
this.allowExtensions = allowExtensions;
|
||||
this.performMasking = performMasking;
|
||||
this.allowMaskMismatch = allowMaskMismatch;
|
||||
@ -217,4 +252,11 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker {
|
||||
protected WebSocketFrameEncoder newWebSocketEncoder() {
|
||||
return new WebSocket13FrameEncoder(performMasking);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WebSocketClientHandshaker13 setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {
|
||||
super.setForceCloseTimeoutMillis(forceCloseTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -107,24 +107,59 @@ public final class WebSocketClientHandshakerFactory {
|
||||
URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch) {
|
||||
return newHandshaker(webSocketURL, version, subprotocol, allowExtensions, customHeaders,
|
||||
maxFramePayloadLength, true, false, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new handshaker.
|
||||
*
|
||||
* @param webSocketURL
|
||||
* URL for web socket communications. e.g "ws://myhost.com/mypath".
|
||||
* Subsequent web socket frames will be sent to this URL.
|
||||
* @param version
|
||||
* Version of web socket specification to use to connect to the server
|
||||
* @param subprotocol
|
||||
* Sub protocol request sent to the server. Null if no sub-protocol support is required.
|
||||
* @param allowExtensions
|
||||
* Allow extensions to be used in the reserved bits of the web socket frame
|
||||
* @param customHeaders
|
||||
* Custom HTTP headers to send during the handshake
|
||||
* @param maxFramePayloadLength
|
||||
* Maximum allowable frame payload length. Setting this value to your application's
|
||||
* requirement may reduce denial of service attacks using long data frames.
|
||||
* @param performMasking
|
||||
* Whether to mask all written websocket frames. This must be set to true in order to be fully compatible
|
||||
* with the websocket specifications. Client applications that communicate with a non-standard server
|
||||
* which doesn't require masking might set this to false to achieve a higher performance.
|
||||
* @param allowMaskMismatch
|
||||
* When set to true, frames which are not masked properly according to the standard will still be
|
||||
* accepted.
|
||||
* @param forceCloseTimeoutMillis
|
||||
* Close the connection if it was not closed by the server after timeout specified
|
||||
*/
|
||||
public static WebSocketClientHandshaker newHandshaker(
|
||||
URI webSocketURL, WebSocketVersion version, String subprotocol,
|
||||
boolean allowExtensions, HttpHeaders customHeaders, int maxFramePayloadLength,
|
||||
boolean performMasking, boolean allowMaskMismatch, long forceCloseTimeoutMillis) {
|
||||
if (version == V13) {
|
||||
return new WebSocketClientHandshaker13(
|
||||
webSocketURL, V13, subprotocol, allowExtensions, customHeaders,
|
||||
maxFramePayloadLength, performMasking, allowMaskMismatch);
|
||||
maxFramePayloadLength, performMasking, allowMaskMismatch, forceCloseTimeoutMillis);
|
||||
}
|
||||
if (version == V08) {
|
||||
return new WebSocketClientHandshaker08(
|
||||
webSocketURL, V08, subprotocol, allowExtensions, customHeaders,
|
||||
maxFramePayloadLength, performMasking, allowMaskMismatch);
|
||||
maxFramePayloadLength, performMasking, allowMaskMismatch, forceCloseTimeoutMillis);
|
||||
}
|
||||
if (version == V07) {
|
||||
return new WebSocketClientHandshaker07(
|
||||
webSocketURL, V07, subprotocol, allowExtensions, customHeaders,
|
||||
maxFramePayloadLength, performMasking, allowMaskMismatch);
|
||||
maxFramePayloadLength, performMasking, allowMaskMismatch, forceCloseTimeoutMillis);
|
||||
}
|
||||
if (version == V00) {
|
||||
return new WebSocketClientHandshaker00(
|
||||
webSocketURL, V00, subprotocol, customHeaders, maxFramePayloadLength);
|
||||
webSocketURL, V00, subprotocol, customHeaders, maxFramePayloadLength, forceCloseTimeoutMillis);
|
||||
}
|
||||
|
||||
throw new WebSocketHandshakeException("Protocol version " + version + " not supported.");
|
||||
|
@ -217,7 +217,7 @@ public abstract class WebSocketClientHandshakerTest {
|
||||
String url = "ws://localhost:9999/ws";
|
||||
final WebSocketClientHandshaker shaker = newHandshaker(URI.create(url));
|
||||
final WebSocketClientHandshaker handshaker = new WebSocketClientHandshaker(
|
||||
shaker.uri(), shaker.version(), null, EmptyHttpHeaders.INSTANCE, Integer.MAX_VALUE) {
|
||||
shaker.uri(), shaker.version(), null, EmptyHttpHeaders.INSTANCE, Integer.MAX_VALUE, -1) {
|
||||
@Override
|
||||
protected FullHttpRequest newHandshakeRequest() {
|
||||
return shaker.newHandshakeRequest();
|
||||
|
@ -17,10 +17,13 @@ package io.netty.handler.codec.http.websocketx;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.handler.codec.http.EmptyHttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpClientCodec;
|
||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||
import io.netty.handler.codec.http.HttpServerCodec;
|
||||
@ -30,6 +33,7 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@ -39,6 +43,23 @@ public class WebSocketHandshakeHandOverTest {
|
||||
private WebSocketServerProtocolHandler.HandshakeComplete serverHandshakeComplete;
|
||||
private boolean clientReceivedHandshake;
|
||||
private boolean clientReceivedMessage;
|
||||
private boolean serverReceivedCloseHandshake;
|
||||
private boolean clientForceClosed;
|
||||
|
||||
private final class CloseNoOpServerProtocolHandler extends WebSocketServerProtocolHandler {
|
||||
CloseNoOpServerProtocolHandler(String websocketPath) {
|
||||
super(websocketPath, null, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> out) throws Exception {
|
||||
if (frame instanceof CloseWebSocketFrame) {
|
||||
serverReceivedCloseHandshake = true;
|
||||
return;
|
||||
}
|
||||
super.decode(ctx, frame, out);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
@ -46,6 +67,8 @@ public class WebSocketHandshakeHandOverTest {
|
||||
serverHandshakeComplete = null;
|
||||
clientReceivedHandshake = false;
|
||||
clientReceivedMessage = false;
|
||||
serverReceivedCloseHandshake = false;
|
||||
clientForceClosed = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -95,6 +118,64 @@ public class WebSocketHandshakeHandOverTest {
|
||||
assertTrue(clientReceivedMessage);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testClientHandshakerForceClose() throws Exception {
|
||||
final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
|
||||
new URI("ws://localhost:1234/test"), WebSocketVersion.V13, null, true,
|
||||
EmptyHttpHeaders.INSTANCE, Integer.MAX_VALUE, true, false, 20);
|
||||
|
||||
EmbeddedChannel serverChannel = createServerChannel(
|
||||
new CloseNoOpServerProtocolHandler("/test"),
|
||||
new SimpleChannelInboundHandler<Object>() {
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
}
|
||||
});
|
||||
|
||||
EmbeddedChannel clientChannel = createClientChannel(handshaker, new SimpleChannelInboundHandler<Object>() {
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
if (evt == ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
|
||||
ctx.channel().closeFuture().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
clientForceClosed = true;
|
||||
}
|
||||
});
|
||||
handshaker.close(ctx.channel(), new CloseWebSocketFrame());
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
}
|
||||
});
|
||||
|
||||
// Transfer the handshake from the client to the server
|
||||
transferAllDataWithMerge(clientChannel, serverChannel);
|
||||
// Transfer the handshake from the server to client
|
||||
transferAllDataWithMerge(serverChannel, clientChannel);
|
||||
|
||||
// Transfer closing handshake
|
||||
transferAllDataWithMerge(clientChannel, serverChannel);
|
||||
assertTrue(serverReceivedCloseHandshake);
|
||||
// Should not be closed yet as we disabled closing the connection on the server
|
||||
assertFalse(clientForceClosed);
|
||||
|
||||
while (!clientForceClosed) {
|
||||
Thread.sleep(10);
|
||||
// We need to run all pending tasks as the force close timeout is scheduled on the EventLoop.
|
||||
clientChannel.runPendingTasks();
|
||||
}
|
||||
|
||||
// clientForceClosed would be set to TRUE after any close,
|
||||
// so check here that force close timeout was actually fired
|
||||
assertTrue(handshaker.isForceCloseComplete());
|
||||
|
||||
// Both should be empty
|
||||
assertFalse(serverChannel.finishAndReleaseAll());
|
||||
assertFalse(clientChannel.finishAndReleaseAll());
|
||||
}
|
||||
|
||||
/**
|
||||
* Transfers all pending data from the source channel into the destination channel.<br>
|
||||
* Merges all data into a single buffer before transmission into the destination.
|
||||
@ -137,6 +218,16 @@ public class WebSocketHandshakeHandOverTest {
|
||||
handler);
|
||||
}
|
||||
|
||||
private static EmbeddedChannel createClientChannel(WebSocketClientHandshaker handshaker,
|
||||
ChannelHandler handler) throws Exception {
|
||||
return new EmbeddedChannel(
|
||||
new HttpClientCodec(),
|
||||
new HttpObjectAggregator(8192),
|
||||
// Note that we're switching off close frames handling on purpose to test forced close on timeout.
|
||||
new WebSocketClientProtocolHandler(handshaker, false, false),
|
||||
handler);
|
||||
}
|
||||
|
||||
private static EmbeddedChannel createServerChannel(ChannelHandler handler) {
|
||||
return new EmbeddedChannel(
|
||||
new HttpServerCodec(),
|
||||
@ -144,4 +235,14 @@ public class WebSocketHandshakeHandOverTest {
|
||||
new WebSocketServerProtocolHandler("/test", "test-proto-1, test-proto-2", false),
|
||||
handler);
|
||||
}
|
||||
|
||||
private static EmbeddedChannel createServerChannel(WebSocketServerProtocolHandler webSocketHandler,
|
||||
ChannelHandler handler) {
|
||||
return new EmbeddedChannel(
|
||||
new HttpServerCodec(),
|
||||
new HttpObjectAggregator(8192),
|
||||
webSocketHandler,
|
||||
handler);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user