HTTP/2 to support asynchronous SETTINGS ACK (#9069)
Motivation: The HTTP/2 codec will synchronously respond to a SETTINGS frame with a SETTINGS ACK before the application sees the SETTINGS frame. The application may need to adjust its state depending upon what is in the SETTINGS frame before applying the remote settings and responding with an ACK (e.g. to adjust for max concurrent streams). In order to accomplish this the HTTP/2 codec should allow for the application to opt-in to sending the SETTINGS ACK. Modifications: - DefaultHttp2ConnectionDecoder should support a mode where SETTINGS frames can be queued instead of immediately applying and ACKing. - DefaultHttp2ConnectionEncoder should attempt to poll from the queue (if it exists) to apply the earliest received but not yet ACKed SETTINGS frame. - AbstractHttp2ConnectionHandlerBuilder (and sub classes) should support a new option to enable the application to opt-in to managing SETTINGS ACK. Result: HTTP/2 allows for asynchronous SETTINGS ACK managed by the application.
This commit is contained in:
parent
ddfe888173
commit
29661fdc96
@ -22,6 +22,7 @@ import io.netty.util.internal.UnstableApi;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_INITIAL_HUFFMAN_DECODE_CAPACITY;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
|
||||
import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
|
||||
import static io.netty.util.internal.ObjectUtil.checkPositive;
|
||||
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
@ -104,6 +105,8 @@ public abstract class AbstractHttp2ConnectionHandlerBuilder<T extends Http2Conne
|
||||
private Boolean encoderEnforceMaxConcurrentStreams;
|
||||
private Boolean encoderIgnoreMaxHeaderListSize;
|
||||
private int initialHuffmanDecodeCapacity = DEFAULT_INITIAL_HUFFMAN_DECODE_CAPACITY;
|
||||
private Http2PromisedRequestVerifier promisedRequestVerifier = ALWAYS_VERIFY;
|
||||
private boolean autoAckSettingsFrame = true;
|
||||
|
||||
/**
|
||||
* Sets the {@link Http2Settings} to use for the initial connection settings exchange.
|
||||
@ -362,6 +365,42 @@ public abstract class AbstractHttp2ConnectionHandlerBuilder<T extends Http2Conne
|
||||
return self();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link Http2PromisedRequestVerifier} to use.
|
||||
* @return this.
|
||||
*/
|
||||
protected B promisedRequestVerifier(Http2PromisedRequestVerifier promisedRequestVerifier) {
|
||||
enforceNonCodecConstraints("promisedRequestVerifier");
|
||||
this.promisedRequestVerifier = requireNonNull(promisedRequestVerifier, "promisedRequestVerifier");
|
||||
return self();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link Http2PromisedRequestVerifier} to use.
|
||||
* @return the {@link Http2PromisedRequestVerifier} to use.
|
||||
*/
|
||||
protected Http2PromisedRequestVerifier promisedRequestVerifier() {
|
||||
return promisedRequestVerifier;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if settings frame should automatically be acknowledged and applied.
|
||||
* @return this.
|
||||
*/
|
||||
protected B autoAckSettingsFrame(boolean autoAckSettings) {
|
||||
enforceNonCodecConstraints("autoAckSettingsFrame");
|
||||
this.autoAckSettingsFrame = autoAckSettings;
|
||||
return self();
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the SETTINGS frames should be automatically acknowledged and applied.
|
||||
* @return {@code true} if the SETTINGS frames should be automatically acknowledged and applied.
|
||||
*/
|
||||
protected boolean isAutoAckSettingsFrame() {
|
||||
return autoAckSettingsFrame;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link Http2ConnectionHandler}.
|
||||
*/
|
||||
@ -407,7 +446,8 @@ public abstract class AbstractHttp2ConnectionHandlerBuilder<T extends Http2Conne
|
||||
encoder = new StreamBufferingEncoder(encoder);
|
||||
}
|
||||
|
||||
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, reader);
|
||||
DefaultHttp2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, reader,
|
||||
promisedRequestVerifier(), isAutoAckSettingsFrame());
|
||||
return buildFromCodec(decoder, encoder);
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,8 @@ import static java.util.Objects.requireNonNull;
|
||||
* A decorator around another {@link Http2ConnectionEncoder} instance.
|
||||
*/
|
||||
@UnstableApi
|
||||
public class DecoratingHttp2ConnectionEncoder extends DecoratingHttp2FrameWriter implements Http2ConnectionEncoder {
|
||||
public class DecoratingHttp2ConnectionEncoder extends DecoratingHttp2FrameWriter implements Http2ConnectionEncoder,
|
||||
Http2SettingsReceivedConsumer {
|
||||
private final Http2ConnectionEncoder delegate;
|
||||
|
||||
public DecoratingHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
|
||||
@ -59,4 +60,13 @@ public class DecoratingHttp2ConnectionEncoder extends DecoratingHttp2FrameWriter
|
||||
public void remoteSettings(Http2Settings settings) throws Http2Exception {
|
||||
delegate.remoteSettings(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumeReceivedSettings(Http2Settings settings) {
|
||||
if (delegate instanceof Http2SettingsReceivedConsumer) {
|
||||
((Http2SettingsReceivedConsumer) delegate).consumeReceivedSettings(settings);
|
||||
}
|
||||
throw new IllegalStateException("delegate " + delegate + " is not an instance of " +
|
||||
Http2SettingsReceivedConsumer.class);
|
||||
}
|
||||
}
|
||||
|
@ -57,6 +57,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
private final Http2FrameReader frameReader;
|
||||
private Http2FrameListener listener;
|
||||
private final Http2PromisedRequestVerifier requestVerifier;
|
||||
private final Http2SettingsReceivedConsumer settingsReceivedConsumer;
|
||||
|
||||
public DefaultHttp2ConnectionDecoder(Http2Connection connection,
|
||||
Http2ConnectionEncoder encoder,
|
||||
@ -68,6 +69,35 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
Http2ConnectionEncoder encoder,
|
||||
Http2FrameReader frameReader,
|
||||
Http2PromisedRequestVerifier requestVerifier) {
|
||||
this(connection, encoder, frameReader, requestVerifier, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance.
|
||||
* @param connection The {@link Http2Connection} associated with this decoder.
|
||||
* @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
|
||||
* @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
|
||||
* h2 semantics on top of the frames.
|
||||
* @param requestVerifier Determines if push promised streams are valid.
|
||||
* @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
|
||||
* The {@code Http2ConnectionEncoder} is expected to be an instance of {@link Http2SettingsReceivedConsumer} and
|
||||
* will apply the earliest received but not yet ACKed SETTINGS when writing the SETTINGS ACKs.
|
||||
* {@code true} to enable automatically applying and sending settings acknowledge frame.
|
||||
*/
|
||||
public DefaultHttp2ConnectionDecoder(Http2Connection connection,
|
||||
Http2ConnectionEncoder encoder,
|
||||
Http2FrameReader frameReader,
|
||||
Http2PromisedRequestVerifier requestVerifier,
|
||||
boolean autoAckSettings) {
|
||||
if (autoAckSettings) {
|
||||
settingsReceivedConsumer = null;
|
||||
} else {
|
||||
if (!(encoder instanceof Http2SettingsReceivedConsumer)) {
|
||||
throw new IllegalArgumentException("disabling autoAckSettings requires the encoder to be a " +
|
||||
Http2SettingsReceivedConsumer.class);
|
||||
}
|
||||
settingsReceivedConsumer = (Http2SettingsReceivedConsumer) encoder;
|
||||
}
|
||||
this.connection = requireNonNull(connection, "connection");
|
||||
this.frameReader = requireNonNull(frameReader, "frameReader");
|
||||
this.encoder = requireNonNull(encoder, "encoder");
|
||||
@ -401,13 +431,17 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
|
||||
// Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
|
||||
// remote peer applies these settings before any subsequent frames that we may send which depend upon these
|
||||
// new settings. See https://github.com/netty/netty/issues/6520.
|
||||
encoder.writeSettingsAck(ctx, ctx.newPromise());
|
||||
public void onSettingsRead(final ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
|
||||
if (settingsReceivedConsumer == null) {
|
||||
// Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
|
||||
// remote peer applies these settings before any subsequent frames that we may send which depend upon
|
||||
// these new settings. See https://github.com/netty/netty/issues/6520.
|
||||
encoder.writeSettingsAck(ctx, ctx.newPromise());
|
||||
|
||||
encoder.remoteSettings(settings);
|
||||
encoder.remoteSettings(settings);
|
||||
} else {
|
||||
settingsReceivedConsumer.consumeReceivedSettings(settings);
|
||||
}
|
||||
|
||||
listener.onSettingsRead(ctx, settings);
|
||||
}
|
||||
|
@ -21,12 +21,15 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.CoalescingBufferQueue;
|
||||
import io.netty.handler.codec.http.HttpStatusClass;
|
||||
import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
|
||||
import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
|
||||
@ -38,13 +41,14 @@ import static java.util.Objects.requireNonNull;
|
||||
* Default implementation of {@link Http2ConnectionEncoder}.
|
||||
*/
|
||||
@UnstableApi
|
||||
public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
|
||||
private final Http2FrameWriter frameWriter;
|
||||
private final Http2Connection connection;
|
||||
private Http2LifecycleManager lifecycleManager;
|
||||
// We prefer ArrayDeque to LinkedList because later will produce more GC.
|
||||
// This initial capacity is plenty for SETTINGS traffic.
|
||||
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<>(4);
|
||||
private final Queue<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<>(4);
|
||||
private Queue<Http2Settings> outstandingRemoteSettingsQueue;
|
||||
|
||||
public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
|
||||
this.connection = requireNonNull(connection, "connection");
|
||||
@ -274,7 +278,32 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||
return frameWriter.writeSettingsAck(ctx, promise);
|
||||
if (outstandingRemoteSettingsQueue == null) {
|
||||
return frameWriter.writeSettingsAck(ctx, promise);
|
||||
}
|
||||
Http2Settings settings = outstandingRemoteSettingsQueue.poll();
|
||||
if (settings == null) {
|
||||
return promise.setFailure(new Http2Exception(INTERNAL_ERROR, "attempted to write a SETTINGS ACK with no " +
|
||||
" pending SETTINGS"));
|
||||
}
|
||||
SimpleChannelPromiseAggregator aggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(),
|
||||
ctx.executor());
|
||||
// Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
|
||||
// remote peer applies these settings before any subsequent frames that we may send which depend upon
|
||||
// these new settings. See https://github.com/netty/netty/issues/6520.
|
||||
frameWriter.writeSettingsAck(ctx, aggregator);
|
||||
|
||||
// We create a "new promise" to make sure that status from both the write and the application are taken into
|
||||
// account independently.
|
||||
ChannelPromise applySettingsPromise = aggregator.newPromise();
|
||||
try {
|
||||
remoteSettings(settings);
|
||||
applySettingsPromise.setSuccess();
|
||||
} catch (Throwable e) {
|
||||
applySettingsPromise.setFailure(e);
|
||||
lifecycleManager.onError(ctx, true, e);
|
||||
}
|
||||
return aggregator.doneAllocatingPromises();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -367,6 +396,14 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
return stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumeReceivedSettings(Http2Settings settings) {
|
||||
if (outstandingRemoteSettingsQueue == null) {
|
||||
outstandingRemoteSettingsQueue = new ArrayDeque<Http2Settings>(2);
|
||||
}
|
||||
outstandingRemoteSettingsQueue.add(settings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
|
||||
* only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
|
||||
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
/**
|
||||
* The default {@link Http2SettingsAckFrame} implementation.
|
||||
*/
|
||||
final class DefaultHttp2SettingsAckFrame implements Http2SettingsAckFrame {
|
||||
@Override
|
||||
public String name() {
|
||||
return "SETTINGS(ACK)";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return StringUtil.simpleClassName(this);
|
||||
}
|
||||
}
|
@ -43,6 +43,20 @@ public class DefaultHttp2SettingsFrame implements Http2SettingsFrame {
|
||||
return "SETTINGS";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof Http2SettingsFrame)) {
|
||||
return false;
|
||||
}
|
||||
Http2SettingsFrame other = (Http2SettingsFrame) o;
|
||||
return settings.equals(other.settings());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return settings.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return StringUtil.simpleClassName(this) + "(settings=" + settings + ')';
|
||||
|
@ -88,25 +88,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
||||
}
|
||||
}
|
||||
|
||||
Http2ConnectionHandler(boolean server, Http2FrameWriter frameWriter, Http2FrameLogger frameLogger,
|
||||
Http2Settings initialSettings) {
|
||||
this.initialSettings = requireNonNull(initialSettings, "initialSettings");
|
||||
|
||||
Http2Connection connection = new DefaultHttp2Connection(server);
|
||||
|
||||
Long maxHeaderListSize = initialSettings.maxHeaderListSize();
|
||||
Http2FrameReader frameReader = new DefaultHttp2FrameReader(maxHeaderListSize == null ?
|
||||
new DefaultHttp2HeadersDecoder(true) :
|
||||
new DefaultHttp2HeadersDecoder(true, maxHeaderListSize));
|
||||
|
||||
if (frameLogger != null) {
|
||||
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
|
||||
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
|
||||
}
|
||||
encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
|
||||
decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
|
||||
* the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
|
||||
|
@ -296,6 +296,10 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
||||
encoder().writePing(ctx, frame.ack(), frame.content(), promise);
|
||||
} else if (msg instanceof Http2SettingsFrame) {
|
||||
encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
|
||||
} else if (msg instanceof Http2SettingsAckFrame) {
|
||||
// In the event of manual SETTINGS ACK is is assumed the encoder will apply the earliest received but not
|
||||
// yet ACKed settings.
|
||||
encoder().writeSettingsAck(ctx, promise);
|
||||
} else if (msg instanceof Http2GoAwayFrame) {
|
||||
writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
|
||||
} else if (msg instanceof Http2UnknownFrame) {
|
||||
@ -574,7 +578,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
||||
|
||||
@Override
|
||||
public void onSettingsAckRead(ChannelHandlerContext ctx) {
|
||||
// TODO: Maybe handle me
|
||||
onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -162,7 +162,8 @@ public class Http2FrameCodecBuilder extends
|
||||
if (encoderEnforceMaxConcurrentStreams()) {
|
||||
encoder = new StreamBufferingEncoder(encoder);
|
||||
}
|
||||
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
|
||||
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader,
|
||||
promisedRequestVerifier(), isAutoAckSettingsFrame());
|
||||
|
||||
return build(decoder, encoder, initialSettings());
|
||||
}
|
||||
|
@ -1111,7 +1111,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
}
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
String msgStr = msg.toString();
|
||||
ReferenceCountUtil.release(msg);
|
||||
promise.setFailure(new IllegalArgumentException(
|
||||
|
@ -165,6 +165,11 @@ public class Http2MultiplexCodecBuilder
|
||||
return super.initialHuffmanDecodeCapacity(initialHuffmanDecodeCapacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2MultiplexCodecBuilder autoAckSettingsFrame(boolean autoAckSettings) {
|
||||
return super.autoAckSettingsFrame(autoAckSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2MultiplexCodec build() {
|
||||
Http2FrameWriter frameWriter = this.frameWriter;
|
||||
@ -185,7 +190,8 @@ public class Http2MultiplexCodecBuilder
|
||||
if (encoderEnforceMaxConcurrentStreams()) {
|
||||
encoder = new StreamBufferingEncoder(encoder);
|
||||
}
|
||||
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader);
|
||||
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader,
|
||||
promisedRequestVerifier(), isAutoAckSettingsFrame());
|
||||
|
||||
return build(decoder, encoder, initialSettings());
|
||||
}
|
||||
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
/**
|
||||
* An ack for a previously received {@link Http2SettingsFrame}.
|
||||
* <p>
|
||||
* The <a href="https://tools.ietf.org/html/rfc7540#section-6.5">HTTP/2 protocol</a> enforces that ACKs are applied in
|
||||
* order, so this ACK will apply to the earliest received and not yet ACKed {@link Http2SettingsFrame} frame.
|
||||
*/
|
||||
public interface Http2SettingsAckFrame extends Http2Frame {
|
||||
Http2SettingsAckFrame INSTANCE = new DefaultHttp2SettingsAckFrame();
|
||||
|
||||
@Override
|
||||
String name();
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
/**
|
||||
* Provides a Consumer like interface to consume remote settings received but not yet ACKed.
|
||||
*/
|
||||
public interface Http2SettingsReceivedConsumer {
|
||||
/**
|
||||
* Consume the most recently received but not yet ACKed settings.
|
||||
*/
|
||||
void consumeReceivedSettings(Http2Settings settings);
|
||||
}
|
@ -60,7 +60,6 @@ import static io.netty.handler.codec.http2.Http2TestUtil.anyChannelPromise;
|
||||
import static io.netty.handler.codec.http2.Http2TestUtil.anyHttp2Settings;
|
||||
import static io.netty.handler.codec.http2.Http2TestUtil.assertEqualsAndRelease;
|
||||
import static io.netty.handler.codec.http2.Http2TestUtil.bb;
|
||||
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
@ -153,6 +152,8 @@ public class Http2FrameCodecTest {
|
||||
|
||||
Http2SettingsFrame settingsFrame = inboundHandler.readInbound();
|
||||
assertNotNull(settingsFrame);
|
||||
Http2SettingsAckFrame settingsAckFrame = inboundHandler.readInbound();
|
||||
assertNotNull(settingsAckFrame);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -45,12 +45,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static io.netty.util.ReferenceCountUtil.release;
|
||||
import static io.netty.handler.codec.http2.Http2TestUtil.anyChannelPromise;
|
||||
import static io.netty.handler.codec.http2.Http2TestUtil.anyHttp2Settings;
|
||||
import static io.netty.handler.codec.http2.Http2TestUtil.assertEqualsAndRelease;
|
||||
import static io.netty.handler.codec.http2.Http2TestUtil.bb;
|
||||
|
||||
import static io.netty.util.ReferenceCountUtil.release;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
@ -106,6 +105,8 @@ public class Http2MultiplexCodecTest {
|
||||
|
||||
Http2SettingsFrame settingsFrame = parentChannel.readInbound();
|
||||
assertNotNull(settingsFrame);
|
||||
Http2SettingsAckFrame settingsAckFrame = parentChannel.readInbound();
|
||||
assertNotNull(settingsAckFrame);
|
||||
|
||||
// Handshake
|
||||
verify(frameWriter).writeSettings(eqMultiplexCodecCtx(),
|
||||
|
@ -0,0 +1,142 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.NetUtil;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
public class Http2MultiplexCodecTransportTest {
|
||||
private EventLoopGroup eventLoopGroup;
|
||||
private Channel clientChannel;
|
||||
private Channel serverChannel;
|
||||
private Channel serverConnectedChannel;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
eventLoopGroup = new NioEventLoopGroup();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
if (clientChannel != null) {
|
||||
clientChannel.close();
|
||||
}
|
||||
if (serverChannel != null) {
|
||||
serverChannel.close();
|
||||
}
|
||||
if (serverConnectedChannel != null) {
|
||||
serverConnectedChannel.close();
|
||||
}
|
||||
eventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void asyncSettingsAck() throws InterruptedException {
|
||||
// The client expects 2 settings frames. One from the connection setup and one from this test.
|
||||
final CountDownLatch serverAckOneLatch = new CountDownLatch(1);
|
||||
final CountDownLatch serverAckAllLatch = new CountDownLatch(2);
|
||||
final CountDownLatch clientSettingsLatch = new CountDownLatch(2);
|
||||
final CountDownLatch serverConnectedChannelLatch = new CountDownLatch(1);
|
||||
final AtomicReference<Channel> serverConnectedChannelRef = new AtomicReference<Channel>();
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
sb.group(eventLoopGroup);
|
||||
sb.channel(NioServerSocketChannel.class);
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) {
|
||||
ch.pipeline().addLast(Http2MultiplexCodecBuilder.forServer(new HttpInboundHandler()).build());
|
||||
ch.pipeline().addLast(new ChannelHandler() {
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
serverConnectedChannelRef.set(ctx.channel());
|
||||
serverConnectedChannelLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
if (msg instanceof Http2SettingsAckFrame) {
|
||||
serverAckOneLatch.countDown();
|
||||
serverAckAllLatch.countDown();
|
||||
}
|
||||
ReferenceCountUtil.release(msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
serverChannel = sb.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).awaitUninterruptibly().channel();
|
||||
|
||||
Bootstrap bs = new Bootstrap();
|
||||
bs.group(eventLoopGroup);
|
||||
bs.channel(NioSocketChannel.class);
|
||||
bs.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) {
|
||||
ch.pipeline().addLast(Http2MultiplexCodecBuilder
|
||||
.forClient(new HttpInboundHandler()).autoAckSettingsFrame(false).build());
|
||||
ch.pipeline().addLast(new ChannelHandler() {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
if (msg instanceof Http2SettingsFrame) {
|
||||
clientSettingsLatch.countDown();
|
||||
}
|
||||
ReferenceCountUtil.release(msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
clientChannel = bs.connect(serverChannel.localAddress()).awaitUninterruptibly().channel();
|
||||
serverConnectedChannelLatch.await();
|
||||
serverConnectedChannel = serverConnectedChannelRef.get();
|
||||
|
||||
serverConnectedChannel.writeAndFlush(new DefaultHttp2SettingsFrame(new Http2Settings()
|
||||
.maxConcurrentStreams(10))).sync();
|
||||
|
||||
clientSettingsLatch.await();
|
||||
|
||||
// We expect a timeout here because we want to asynchronously generate the SETTINGS ACK below.
|
||||
assertFalse(serverAckOneLatch.await(300, MILLISECONDS));
|
||||
|
||||
// We expect 2 settings frames, the initial settings frame during connection establishment and the setting frame
|
||||
// written in this test. We should ack both of these settings frames.
|
||||
clientChannel.writeAndFlush(Http2SettingsAckFrame.INSTANCE).sync();
|
||||
clientChannel.writeAndFlush(Http2SettingsAckFrame.INSTANCE).sync();
|
||||
|
||||
serverAckAllLatch.await();
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
private static final class HttpInboundHandler implements ChannelHandler { }
|
||||
}
|
Loading…
Reference in New Issue
Block a user