netty5/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java
Scott Mitchell 2fd42cfc6b HTTP/2 Max Header List Size Bug
Motivation:
If the HPACK Decoder detects that SETTINGS_MAX_HEADER_LIST_SIZE has been violated it aborts immediately and sends a RST_STREAM frame for what ever stream caused the issue. Because HPACK is stateful this means that the HPACK state may become out of sync between peers, and the issue won't be detected until the next headers frame. We should make a best effort to keep processing to keep the HPACK state in sync with our peer, or completely close the connection.
If the HPACK Encoder is configured to verify SETTINGS_MAX_HEADER_LIST_SIZE it checks the limit and encodes at the same time. This may result in modifying the HPACK local state but not sending the headers to the peer if SETTINGS_MAX_HEADER_LIST_SIZE is violated. This will also lead to an inconsistency in HPACK state that will be flagged at some later time.

Modifications:
- HPACK Decoder now has 2 levels of limits related to SETTINGS_MAX_HEADER_LIST_SIZE. The first will attempt to keep processing data and send a RST_STREAM after all data is processed. The second will send a GO_AWAY and close the entire connection.
- When the HPACK Encoder enforces SETTINGS_MAX_HEADER_LIST_SIZE it should not modify the HPACK state until the size has been checked.
- https://tools.ietf.org/html/rfc7540#section-6.5.2 states that the initial value of SETTINGS_MAX_HEADER_LIST_SIZE is "unlimited". We currently use 8k as a limit. We should honor the specifications default value so we don't unintentionally close a connection before the remote peer is aware of the local settings.
- Remove unnecessary object allocation in DefaultHttp2HeadersDecoder and DefaultHttp2HeadersEncoder.

Result:
Fixes https://github.com/netty/netty/issues/6209.
2017-01-19 10:42:43 -08:00

506 lines
21 KiB
Java

/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.util.internal.UnstableApi;
import java.util.ArrayDeque;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Integer.MAX_VALUE;
import static java.lang.Math.min;
/**
* Default implementation of {@link Http2ConnectionEncoder}.
*/
@UnstableApi
public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
private final Http2FrameWriter frameWriter;
private final Http2Connection connection;
private Http2LifecycleManager lifecycleManager;
// We prefer ArrayDeque to LinkedList because later will produce more GC.
// This initial capacity is plenty for SETTINGS traffic.
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
if (connection.remote().flowController() == null) {
connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
}
}
@Override
public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
}
@Override
public Http2FrameWriter frameWriter() {
return frameWriter;
}
@Override
public Http2Connection connection() {
return connection;
}
@Override
public final Http2RemoteFlowController flowController() {
return connection().remote().flowController();
}
@Override
public void remoteSettings(Http2Settings settings) throws Http2Exception {
Boolean pushEnabled = settings.pushEnabled();
Http2FrameWriter.Configuration config = configuration();
Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
if (pushEnabled != null) {
if (!connection.isServer() && pushEnabled) {
throw connectionError(PROTOCOL_ERROR,
"Client received a value of ENABLE_PUSH specified to other than 0");
}
connection.remote().allowPushTo(pushEnabled);
}
Long maxConcurrentStreams = settings.maxConcurrentStreams();
if (maxConcurrentStreams != null) {
// TODO(scott): define an extension setting so we can communicate/enforce the maxStreams limit locally.
connection.local().maxStreams((int) min(maxConcurrentStreams, MAX_VALUE), MAX_VALUE);
}
Long headerTableSize = settings.headerTableSize();
if (headerTableSize != null) {
outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
}
Long maxHeaderListSize = settings.maxHeaderListSize();
if (maxHeaderListSize != null) {
outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
}
Integer maxFrameSize = settings.maxFrameSize();
if (maxFrameSize != null) {
outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
}
Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) {
flowController().initialWindowSize(initialWindowSize);
}
}
@Override
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
final boolean endOfStream, ChannelPromise promise) {
final Http2Stream stream;
try {
stream = requireStream(streamId);
// Verify that the stream is in the appropriate state for sending DATA frames.
switch (stream.state()) {
case OPEN:
case HALF_CLOSED_REMOTE:
// Allowed sending DATA frames in these states.
break;
default:
throw new IllegalStateException(String.format(
"Stream %d in unexpected state: %s", stream.id(), stream.state()));
}
} catch (Throwable e) {
data.release();
return promise.setFailure(e);
}
// Hand control of the frame to the flow controller.
flowController().addFlowControlled(stream,
new FlowControlledData(stream, data, padding, endOfStream, promise));
return promise;
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream, ChannelPromise promise) {
return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise);
}
@Override
public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
final Http2Headers headers, final int streamDependency, final short weight,
final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
try {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
stream = connection.local().createStream(streamId, endOfStream);
} else {
switch (stream.state()) {
case RESERVED_LOCAL:
stream.open(endOfStream);
break;
case OPEN:
case HALF_CLOSED_REMOTE:
// Allowed sending headers in these states.
break;
default:
throw new IllegalStateException(String.format(
"Stream %d in unexpected state: %s", stream.id(), stream.state()));
}
}
// Trailing headers must go through flow control if there are other frames queued in flow control
// for this stream.
Http2RemoteFlowController flowController = flowController();
if (!endOfStream || !flowController.hasFlowControlled(stream)) {
if (endOfStream) {
final Http2Stream finalStream = stream;
final ChannelFutureListener closeStreamLocalListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lifecycleManager.closeStreamLocal(finalStream, future);
}
};
promise = promise.unvoid().addListener(closeStreamLocalListener);
}
ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
weight, exclusive, padding, endOfStream, promise);
// Synchronously set the headersSent flag to ensure that we do not subsequently write
// other headers containing pseudo-header fields.
stream.headersSent();
return future;
} else {
// Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
flowController.addFlowControlled(stream,
new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding,
endOfStream, promise));
return promise;
}
} catch (Throwable t) {
lifecycleManager.onError(ctx, t);
promise.tryFailure(t);
return promise;
}
}
@Override
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive, ChannelPromise promise) {
try {
// Update the priority on this stream.
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
stream = connection.local().createIdleStream(streamId);
}
// The set priority operation must be done before sending the frame. The parent may not yet exist
// and the priority tree may also be modified before sending.
stream.setPriority(streamDependency, weight, exclusive);
} catch (ClosedStreamCreationException ignored) {
// It is possible that either the stream for this frame or the parent stream is closed.
// In this case we should ignore the exception and allow the frame to be sent.
} catch (Throwable t) {
return promise.setFailure(t);
}
return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
}
@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
// Delegate to the lifecycle manager for proper updating of connection state.
return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
}
@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
ChannelPromise promise) {
outstandingLocalSettingsQueue.add(settings);
try {
Boolean pushEnabled = settings.pushEnabled();
if (pushEnabled != null && connection.isServer()) {
throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
}
} catch (Throwable e) {
return promise.setFailure(e);
}
return frameWriter.writeSettings(ctx, settings, promise);
}
@Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
return frameWriter.writeSettingsAck(ctx, promise);
}
@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
return frameWriter.writePing(ctx, ack, data, promise);
}
@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding, ChannelPromise promise) {
try {
if (connection.goAwayReceived()) {
throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
}
Http2Stream stream = requireStream(streamId);
// Reserve the promised stream.
connection.local().reservePushStream(promisedStreamId, stream);
ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
promise);
stream.pushPromiseSent();
return future;
} catch (Throwable t) {
lifecycleManager.onError(ctx, t);
promise.tryFailure(t);
return promise;
}
}
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
}
@Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
ChannelPromise promise) {
return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
" objects to control window sizes"));
}
@Override
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload, ChannelPromise promise) {
return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
}
@Override
public void close() {
frameWriter.close();
}
@Override
public Http2Settings pollSentSettings() {
return outstandingLocalSettingsQueue.poll();
}
@Override
public Configuration configuration() {
return frameWriter.configuration();
}
private Http2Stream requireStream(int streamId) {
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
final String message;
if (connection.streamMayHaveExisted(streamId)) {
message = "Stream no longer exists: " + streamId;
} else {
message = "Stream does not exist: " + streamId;
}
throw new IllegalArgumentException(message);
}
return stream;
}
/**
* Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
* only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
* {@link #size} calculation deterministic thereby greatly simplifying the implementation.
* <p>
* If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
* the passed promise is not completed until last frame write.
* </p>
*/
private final class FlowControlledData extends FlowControlledBase {
private final CoalescingBufferQueue queue;
private int dataSize;
FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
ChannelPromise promise) {
super(stream, padding, endOfStream, promise);
queue = new CoalescingBufferQueue(promise.channel());
queue.add(buf, promise);
dataSize = queue.readableBytes();
}
@Override
public int size() {
return dataSize + padding;
}
@Override
public void error(ChannelHandlerContext ctx, Throwable cause) {
queue.releaseAndFailAll(cause);
// Don't update dataSize because we need to ensure the size() method returns a consistent size even after
// error so we don't invalidate flow control when returning bytes to flow control.
lifecycleManager.onError(ctx, cause);
}
@Override
public void write(ChannelHandlerContext ctx, int allowedBytes) {
int queuedData = queue.readableBytes();
if (!endOfStream) {
if (queuedData == 0) {
// There's no need to write any data frames because there are only empty data frames in the queue
// and it is not end of stream yet. Just complete their promises by writing an empty buffer.
ChannelPromise writePromise = ctx.newPromise().addListener(this);
queue.remove(0, writePromise).release();
ctx.write(Unpooled.EMPTY_BUFFER, writePromise);
return;
}
if (allowedBytes == 0) {
return;
}
}
// Determine how much data to write.
int writeableData = min(queuedData, allowedBytes);
ChannelPromise writePromise = ctx.newPromise().addListener(this);
ByteBuf toWrite = queue.remove(writeableData, writePromise);
dataSize = queue.readableBytes();
// Determine how much padding to write.
int writeablePadding = min(allowedBytes - writeableData, padding);
padding -= writeablePadding;
// Write the frame(s).
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
endOfStream && size() == 0, writePromise);
}
@Override
public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
FlowControlledData nextData;
if (FlowControlledData.class != next.getClass() ||
MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
return false;
}
nextData.queue.copyTo(queue);
dataSize = queue.readableBytes();
// Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
padding = Math.max(padding, nextData.padding);
endOfStream = nextData.endOfStream;
return true;
}
}
/**
* Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
* flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
* blocked on flow-control a HEADER frame must wait until this frame has been written.
*/
private final class FlowControlledHeaders extends FlowControlledBase {
private final Http2Headers headers;
private final int streamDependency;
private final short weight;
private final boolean exclusive;
FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) {
super(stream, padding, endOfStream, promise);
this.headers = headers;
this.streamDependency = streamDependency;
this.weight = weight;
this.exclusive = exclusive;
}
@Override
public int size() {
return 0;
}
@Override
public void error(ChannelHandlerContext ctx, Throwable cause) {
if (ctx != null) {
lifecycleManager.onError(ctx, cause);
}
promise.tryFailure(cause);
}
@Override
public void write(ChannelHandlerContext ctx, int allowedBytes) throws Http2Exception {
if (promise.isVoid()) {
promise = ctx.newPromise();
}
promise.addListener(this);
frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
stream.headersSent();
}
@Override
public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
return false;
}
}
/**
* Common base type for payloads to deliver via flow-control.
*/
public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
ChannelFutureListener {
protected final Http2Stream stream;
protected ChannelPromise promise;
protected boolean endOfStream;
protected int padding;
protected FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
final ChannelPromise promise) {
if (padding < 0) {
throw new IllegalArgumentException("padding must be >= 0");
}
this.padding = padding;
this.endOfStream = endOfStream;
this.stream = stream;
this.promise = promise;
}
@Override
public void writeComplete() {
if (endOfStream) {
lifecycleManager.closeStreamLocal(stream, promise);
}
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
error(flowController().channelHandlerContext(), future.cause());
}
}
}
}