Don't reuse ChannelPromise in WebSocketProtocolHandler (#10248)
Motivation: We cant reuse the ChannelPromise as it will cause an error when trying to ful-fill it multiple times. Modifications: - Use a new promise and chain it with the old one - Add unit test Result: Fixes https://github.com/netty/netty/issues/10240
This commit is contained in:
parent
40448db5bb
commit
f00160bca3
@ -21,6 +21,7 @@ import io.netty.channel.ChannelFutureListener;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelOutboundHandler;
|
import io.netty.channel.ChannelOutboundHandler;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import io.netty.channel.ChannelPromiseNotifier;
|
||||||
import io.netty.handler.codec.MessageToMessageDecoder;
|
import io.netty.handler.codec.MessageToMessageDecoder;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.concurrent.ScheduledFuture;
|
import io.netty.util.concurrent.ScheduledFuture;
|
||||||
@ -110,14 +111,13 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
|
|||||||
if (closeSent != null) {
|
if (closeSent != null) {
|
||||||
ReferenceCountUtil.release(msg);
|
ReferenceCountUtil.release(msg);
|
||||||
promise.setFailure(new ClosedChannelException());
|
promise.setFailure(new ClosedChannelException());
|
||||||
return;
|
} else if (msg instanceof CloseWebSocketFrame) {
|
||||||
}
|
closeSent = promise.unvoid();
|
||||||
if (msg instanceof CloseWebSocketFrame) {
|
ctx.write(msg).addListener(new ChannelPromiseNotifier(false, closeSent));
|
||||||
promise = promise.unvoid();
|
} else {
|
||||||
closeSent = promise;
|
|
||||||
}
|
|
||||||
ctx.write(msg, promise);
|
ctx.write(msg, promise);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
|
private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
|
||||||
if (closeSent.isDone() || forceCloseTimeoutMillis < 0) {
|
if (closeSent.isDone() || forceCloseTimeoutMillis < 0) {
|
||||||
|
@ -18,10 +18,18 @@ package io.netty.handler.codec.http.websocketx;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.handler.flow.FlowControlHandler;
|
import io.netty.handler.flow.FlowControlHandler;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static io.netty.util.CharsetUtil.UTF_8;
|
import static io.netty.util.CharsetUtil.UTF_8;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
@ -139,6 +147,33 @@ public class WebSocketProtocolHandlerTest {
|
|||||||
assertFalse(channel.finish());
|
assertFalse(channel.finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeout() throws Exception {
|
||||||
|
final AtomicReference<ChannelPromise> ref = new AtomicReference<ChannelPromise>();
|
||||||
|
WebSocketProtocolHandler handler = new WebSocketProtocolHandler(
|
||||||
|
false, WebSocketCloseStatus.NORMAL_CLOSURE, 1) { };
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
|
||||||
|
@Override
|
||||||
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||||
|
ref.set(promise);
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
}
|
||||||
|
}, handler);
|
||||||
|
|
||||||
|
ChannelFuture future = channel.writeAndFlush(new CloseWebSocketFrame());
|
||||||
|
ChannelHandlerContext ctx = channel.pipeline().context(WebSocketProtocolHandler.class);
|
||||||
|
handler.close(ctx, ctx.newPromise());
|
||||||
|
|
||||||
|
do {
|
||||||
|
Thread.sleep(10);
|
||||||
|
channel.runPendingTasks();
|
||||||
|
} while (!future.isDone());
|
||||||
|
|
||||||
|
assertThat(future.cause(), Matchers.instanceOf(WebSocketHandshakeException.class));
|
||||||
|
assertFalse(ref.get().isDone());
|
||||||
|
assertFalse(channel.finish());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asserts that a message was propagated inbound through the channel.
|
* Asserts that a message was propagated inbound through the channel.
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user