Cleaning up code for determining whether the HTTP/2 connection header
and initial settings frame were sent. Motivation: The current logic for marking the connection header and initial settings frame is a bit complicated. Modifications: - Http2FrameEncoder, Http2ConnectionHandler: removed sent listeners and set the sent flag immediately. Result: Just a code cleanup ... behavior is the same.
This commit is contained in:
parent
58423a448a
commit
a992e7fb48
@ -25,7 +25,6 @@ import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.
|
||||
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.OPEN;
|
||||
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_LOCAL;
|
||||
import static io.netty.handler.codec.http2.draft10.connection.Http2Stream.State.RESERVED_REMOTE;
|
||||
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerAdapter;
|
||||
@ -110,6 +109,21 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
|
||||
this.outboundFlow = outboundFlow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
// The channel just became active - send the initial settings frame to the remote
|
||||
// endpoint.
|
||||
sendInitialSettings(ctx);
|
||||
super.handlerAdded(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
// This handler was just added to the context. In case it was handled after
|
||||
// the connection became active, send the initial settings frame now.
|
||||
sendInitialSettings(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
// Avoid NotYetConnectedException
|
||||
@ -145,11 +159,7 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object inMsg) throws Exception {
|
||||
try {
|
||||
if (inMsg == CONNECTION_PREFACE) {
|
||||
// The connection preface has been received from the remote endpoint, we're
|
||||
// beginning an HTTP2 connection. Send the initial settings to the remote endpoint.
|
||||
sendInitialSettings(ctx);
|
||||
} else if (inMsg instanceof Http2DataFrame) {
|
||||
if (inMsg instanceof Http2DataFrame) {
|
||||
handleInboundData(ctx, (Http2DataFrame) inMsg);
|
||||
} else if (inMsg instanceof Http2HeadersFrame) {
|
||||
handleInboundHeaders(ctx, (Http2HeadersFrame) inMsg);
|
||||
@ -181,12 +191,6 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
||||
throws Exception {
|
||||
try {
|
||||
if (!initialSettingsReceived) {
|
||||
throw protocolError(
|
||||
"Attempting to send frame (%s) before initial settings received", msg
|
||||
.getClass().getName());
|
||||
}
|
||||
|
||||
if (msg instanceof Http2DataFrame) {
|
||||
handleOutboundData(ctx, (Http2DataFrame) msg, promise);
|
||||
} else if (msg instanceof Http2HeadersFrame) {
|
||||
@ -649,25 +653,25 @@ public class Http2ConnectionHandler extends ChannelHandlerAdapter {
|
||||
/**
|
||||
* Sends the initial settings frame upon establishment of the connection, if not already sent.
|
||||
*/
|
||||
private void sendInitialSettings(ChannelHandlerContext ctx) throws Http2Exception {
|
||||
if (initialSettingsSent) {
|
||||
throw protocolError("Already sent initial settings.");
|
||||
}
|
||||
private void sendInitialSettings(final ChannelHandlerContext ctx) throws Http2Exception {
|
||||
if (!initialSettingsSent && ctx.channel().isActive()) {
|
||||
initialSettingsSent = true;
|
||||
|
||||
// Create and send the frame to the remote endpoint.
|
||||
DefaultHttp2SettingsFrame frame =
|
||||
new DefaultHttp2SettingsFrame.Builder()
|
||||
.setInitialWindowSize(inboundFlow.getInitialInboundWindowSize())
|
||||
.setMaxConcurrentStreams(connection.remote().getMaxStreams())
|
||||
.setPushEnabled(connection.local().isPushToAllowed()).build();
|
||||
|
||||
ctx.writeAndFlush(frame).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
initialSettingsSent = true;
|
||||
// Create and send the frame to the remote endpoint.
|
||||
DefaultHttp2SettingsFrame frame =
|
||||
new DefaultHttp2SettingsFrame.Builder()
|
||||
.setInitialWindowSize(inboundFlow.getInitialInboundWindowSize())
|
||||
.setMaxConcurrentStreams(connection.remote().getMaxStreams())
|
||||
.setPushEnabled(connection.local().isPushToAllowed()).build();
|
||||
ctx.writeAndFlush(frame).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess() && ctx.channel().isOpen()) {
|
||||
// The write failed, close the connection.
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ package io.netty.handler.codec.http2.draft10.frame.decoder;
|
||||
|
||||
import static io.netty.handler.codec.http2.draft10.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.draft10.Http2Exception.format;
|
||||
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
|
||||
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.FRAME_HEADER_LENGTH;
|
||||
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.FRAME_LENGTH_MASK;
|
||||
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.connectionPrefaceBuf;
|
||||
@ -125,10 +124,6 @@ public class Http2FrameDecoder extends ByteToMessageDecoder {
|
||||
return;
|
||||
}
|
||||
|
||||
// Fire the connection preface to notify the connection handler that it should send the
|
||||
// initial settings frame.
|
||||
ctx.fireChannelRead(CONNECTION_PREFACE);
|
||||
|
||||
// Start processing the first header.
|
||||
state = State.FRAME_HEADER;
|
||||
}
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
package io.netty.handler.codec.http2.draft10.frame.encoder;
|
||||
|
||||
import static io.netty.handler.codec.http2.draft10.Http2Exception.protocolError;
|
||||
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.connectionPrefaceBuf;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
@ -34,7 +33,6 @@ import io.netty.handler.codec.http2.draft10.frame.Http2Frame;
|
||||
public class Http2FrameEncoder extends MessageToByteEncoder<Http2Frame> {
|
||||
|
||||
private final Http2FrameMarshaller frameMarshaller;
|
||||
private ChannelFutureListener prefaceWriteListener;
|
||||
private boolean prefaceWritten;
|
||||
|
||||
public Http2FrameEncoder() {
|
||||
@ -68,12 +66,6 @@ public class Http2FrameEncoder extends MessageToByteEncoder<Http2Frame> {
|
||||
protected void encode(ChannelHandlerContext ctx, Http2Frame frame, ByteBuf out)
|
||||
throws Exception {
|
||||
try {
|
||||
if (!prefaceWritten) {
|
||||
throw protocolError(
|
||||
"Attempting to send frame before connection preface written: %s", frame
|
||||
.getClass().getName());
|
||||
}
|
||||
|
||||
frameMarshaller.marshall(frame, out, ctx.alloc());
|
||||
} catch (Throwable t) {
|
||||
ctx.fireExceptionCaught(t);
|
||||
@ -84,20 +76,17 @@ public class Http2FrameEncoder extends MessageToByteEncoder<Http2Frame> {
|
||||
* Sends the HTTP2 connection preface to the remote endpoint, if not already sent.
|
||||
*/
|
||||
private void sendPreface(final ChannelHandlerContext ctx) {
|
||||
if (!prefaceWritten && prefaceWriteListener == null && ctx.channel().isActive()) {
|
||||
prefaceWriteListener = new ChannelFutureListener() {
|
||||
if (!prefaceWritten && ctx.channel().isActive()) {
|
||||
prefaceWritten = true;
|
||||
ctx.writeAndFlush(connectionPrefaceBuf()).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
prefaceWritten = true;
|
||||
prefaceWriteListener = null;
|
||||
} else if (ctx.channel().isOpen()) {
|
||||
if (!future.isSuccess() && ctx.channel().isOpen()) {
|
||||
// The write failed, close the connection.
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
};
|
||||
ctx.writeAndFlush(connectionPrefaceBuf()).addListener(prefaceWriteListener);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
package io.netty.handler.codec.http2.draft10.frame;
|
||||
|
||||
import static io.netty.handler.codec.http2.draft10.frame.Http2FrameCodecUtil.CONNECTION_PREFACE;
|
||||
import static io.netty.util.CharsetUtil.UTF_8;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
@ -114,8 +113,8 @@ public class Http2PrefaceTest {
|
||||
});
|
||||
|
||||
// Wait a bit and verify that the connection was closed.
|
||||
assertTrue(serverHandler.awaitClose());
|
||||
assertTrue(clientHandler.awaitClose());
|
||||
serverHandler.awaitClose();
|
||||
clientHandler.awaitClose();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -127,15 +126,12 @@ public class Http2PrefaceTest {
|
||||
p.addLast("codec", new Http2FrameCodec());
|
||||
p.addLast("wrongFrameGenerator", new ChannelHandlerAdapter() {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg == CONNECTION_PREFACE) {
|
||||
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
|
||||
Http2PingFrame frame =
|
||||
new DefaultHttp2PingFrame.Builder().setData(buf).build();
|
||||
ctx.writeAndFlush(frame);
|
||||
} else {
|
||||
super.channelRead(ctx, msg);
|
||||
}
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
ByteBuf buf = Unpooled.copiedBuffer("01234567", UTF_8);
|
||||
Http2PingFrame frame =
|
||||
new DefaultHttp2PingFrame.Builder().setData(buf).build();
|
||||
ctx.writeAndFlush(frame);
|
||||
super.channelActive(ctx);
|
||||
}
|
||||
});
|
||||
p.addLast("handler", clientHandler);
|
||||
@ -143,8 +139,8 @@ public class Http2PrefaceTest {
|
||||
});
|
||||
|
||||
// Wait a bit and verify that the connection was closed.
|
||||
assertTrue(serverHandler.awaitClose());
|
||||
assertTrue(clientHandler.awaitClose());
|
||||
serverHandler.awaitClose();
|
||||
clientHandler.awaitClose();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -181,29 +177,23 @@ public class Http2PrefaceTest {
|
||||
private static class CaptureHandler extends ChannelHandlerAdapter {
|
||||
final DefaultPromise<Http2SettingsFrame> settings;
|
||||
final DefaultPromise<Http2SettingsFrame> settingsAck;
|
||||
final DefaultPromise<Void> initFuture;
|
||||
Channel channel;
|
||||
final DefaultPromise<Void> closeFuture;
|
||||
//Channel channel;
|
||||
|
||||
CaptureHandler(EventExecutor executor) {
|
||||
settings = new DefaultPromise<Http2SettingsFrame>(executor);
|
||||
settingsAck = new DefaultPromise<Http2SettingsFrame>(executor);
|
||||
initFuture = new DefaultPromise<Void>(executor);
|
||||
closeFuture = new DefaultPromise<Void>(executor);
|
||||
}
|
||||
|
||||
public boolean awaitClose() throws Exception {
|
||||
initFuture.await();
|
||||
for (int i = 0; channel.isOpen() && i < 5; ++i) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
return !channel.isOpen();
|
||||
public void awaitClose() throws Exception {
|
||||
closeFuture.await(5000);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
channel = ctx.channel();
|
||||
initFuture.setSuccess(null);
|
||||
|
||||
super.channelActive(ctx);
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
closeFuture.setSuccess(null);
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user