ApplicationProtocolNegotiationHandler should drain buffer messages on channel close (#11445)

__Motivation__

`ApplicationProtocolNegotiationHandler` buffers messages which are read before SSL handshake complete event is received and drains them when the handler is removed. However, the channel may be closed (or input shutdown) before SSL handshake  event is received in which case we may fire channel read after channel closure (from `handlerRemoved()`).

__Modification__

Intercept `channelInactive()` and input closed event and drain the buffer.

__Result__

If channel is closed before SSL handshake complete event is received, we still maintain the order of message read and channel closure.

Co-authored-by: Norman Maurer <norman_maurer@apple.com>
This commit is contained in:
Nitesh Kant 2021-07-06 05:01:17 -07:00 committed by Norman Maurer
parent 2b9f4836be
commit c62eb26b09
2 changed files with 52 additions and 6 deletions

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.handler.codec.DecoderException;
import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.logging.InternalLogger;
@ -143,10 +144,20 @@ public abstract class ApplicationProtocolNegotiationHandler implements ChannelHa
}
}
} else {
if (evt instanceof ChannelInputShutdownEvent) {
fireBufferedMessages();
}
ctx.fireUserEventTriggered(evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
fireBufferedMessages();
ctx.fireChannelInactive();
}
private void removeSelfIfPresent(ChannelHandlerContext ctx) {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {

View File

@ -19,16 +19,17 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.handler.codec.DecoderException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.handler.ssl.CloseNotifyTest.assertCloseNotify;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@ -107,6 +108,33 @@ public class ApplicationProtocolNegotiationHandlerTest {
@Test
public void testBufferMessagesUntilHandshakeComplete() throws Exception {
testBufferMessagesUntilHandshakeComplete(null);
}
@Test
public void testBufferMessagesUntilHandshakeCompleteWithClose() throws Exception {
testBufferMessagesUntilHandshakeComplete(
new ApplicationProtocolNegotiationHandlerTest.Consumer<ChannelHandlerContext>() {
@Override
public void consume(ChannelHandlerContext ctx) {
ctx.channel().close();
}
});
}
@Test
public void testBufferMessagesUntilHandshakeCompleteWithInputShutdown() throws Exception {
testBufferMessagesUntilHandshakeComplete(
new ApplicationProtocolNegotiationHandlerTest.Consumer<ChannelHandlerContext>() {
@Override
public void consume(ChannelHandlerContext ctx) {
ctx.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
}
});
}
private void testBufferMessagesUntilHandshakeComplete(final Consumer<ChannelHandlerContext> pipelineConfigurator)
throws Exception {
final AtomicReference<byte[]> channelReadData = new AtomicReference<byte[]>();
final AtomicBoolean channelReadCompleteCalled = new AtomicBoolean(false);
ChannelHandler alpnHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) {
@ -115,15 +143,18 @@ public class ApplicationProtocolNegotiationHandlerTest {
assertEquals(ApplicationProtocolNames.HTTP_1_1, protocol);
ctx.pipeline().addLast(new ChannelHandler() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
channelReadData.set((byte[]) msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
public void channelReadComplete(ChannelHandlerContext ctx) {
channelReadCompleteCalled.set(true);
}
});
if (pipelineConfigurator != null) {
pipelineConfigurator.consume(ctx);
}
}
};
@ -148,6 +179,10 @@ public class ApplicationProtocolNegotiationHandlerTest {
assertArrayEquals(someBytes, channelReadData.get());
assertTrue(channelReadCompleteCalled.get());
assertNull(channel.readInbound());
channel.finishAndReleaseAll();
assertTrue(channel.finishAndReleaseAll());
}
private interface Consumer<T> {
void consume(T t);
}
}