ApplicationProtocolNegotiationHandler
should drain buffer messages on channel close (#11445)
__Motivation__ `ApplicationProtocolNegotiationHandler` buffers messages which are read before SSL handshake complete event is received and drains them when the handler is removed. However, the channel may be closed (or input shutdown) before SSL handshake event is received in which case we may fire channel read after channel closure (from `handlerRemoved()`). __Modification__ Intercept `channelInactive()` and input closed event and drain the buffer. __Result__ If channel is closed before SSL handshake complete event is received, we still maintain the order of message read and channel closure. Co-authored-by: Norman Maurer <norman_maurer@apple.com>
This commit is contained in:
parent
7c57c4be17
commit
f2295628e9
@ -20,8 +20,8 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.handler.codec.DecoderException;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.RecyclableArrayList;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -145,9 +145,19 @@ public abstract class ApplicationProtocolNegotiationHandler extends ChannelInbou
|
||||
}
|
||||
}
|
||||
|
||||
if (evt instanceof ChannelInputShutdownEvent) {
|
||||
fireBufferedMessages();
|
||||
}
|
||||
|
||||
ctx.fireUserEventTriggered(evt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
fireBufferedMessages();
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
private void removeSelfIfPresent(ChannelHandlerContext ctx) {
|
||||
ChannelPipeline pipeline = ctx.pipeline();
|
||||
if (pipeline.context(this) != null) {
|
||||
|
@ -20,16 +20,17 @@ import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.handler.codec.DecoderException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.function.Executable;
|
||||
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static io.netty.handler.ssl.CloseNotifyTest.assertCloseNotify;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
@ -108,6 +109,33 @@ public class ApplicationProtocolNegotiationHandlerTest {
|
||||
|
||||
@Test
|
||||
public void testBufferMessagesUntilHandshakeComplete() throws Exception {
|
||||
testBufferMessagesUntilHandshakeComplete(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferMessagesUntilHandshakeCompleteWithClose() throws Exception {
|
||||
testBufferMessagesUntilHandshakeComplete(
|
||||
new ApplicationProtocolNegotiationHandlerTest.Consumer<ChannelHandlerContext>() {
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx) {
|
||||
ctx.channel().close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferMessagesUntilHandshakeCompleteWithInputShutdown() throws Exception {
|
||||
testBufferMessagesUntilHandshakeComplete(
|
||||
new ApplicationProtocolNegotiationHandlerTest.Consumer<ChannelHandlerContext>() {
|
||||
@Override
|
||||
public void consume(ChannelHandlerContext ctx) {
|
||||
ctx.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void testBufferMessagesUntilHandshakeComplete(final Consumer<ChannelHandlerContext> pipelineConfigurator)
|
||||
throws Exception {
|
||||
final AtomicReference<byte[]> channelReadData = new AtomicReference<byte[]>();
|
||||
final AtomicBoolean channelReadCompleteCalled = new AtomicBoolean(false);
|
||||
ChannelHandler alpnHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) {
|
||||
@ -116,15 +144,18 @@ public class ApplicationProtocolNegotiationHandlerTest {
|
||||
assertEquals(ApplicationProtocolNames.HTTP_1_1, protocol);
|
||||
ctx.pipeline().addLast(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
channelReadData.set((byte[]) msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) {
|
||||
channelReadCompleteCalled.set(true);
|
||||
}
|
||||
});
|
||||
if (pipelineConfigurator != null) {
|
||||
pipelineConfigurator.consume(ctx);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -149,6 +180,10 @@ public class ApplicationProtocolNegotiationHandlerTest {
|
||||
assertArrayEquals(someBytes, channelReadData.get());
|
||||
assertTrue(channelReadCompleteCalled.get());
|
||||
assertNull(channel.readInbound());
|
||||
channel.finishAndReleaseAll();
|
||||
assertTrue(channel.finishAndReleaseAll());
|
||||
}
|
||||
|
||||
private interface Consumer<T> {
|
||||
void consume(T t);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user