/* * Copyright 2014 The Netty Project * * The Netty Project licenses this file to you under the Apache License, version 2.0 (the * "License"); you may not use this file except in compliance with the License. You may obtain a * copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the License for the specific language governing permissions and limitations under * the License. */ package io.netty.handler.codec.http2; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.CoalescingBufferQueue; import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; import io.netty.util.internal.UnstableApi; import java.util.ArrayDeque; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Integer.MAX_VALUE; import static java.lang.Math.min; /** * Default implementation of {@link Http2ConnectionEncoder}. */ @UnstableApi public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { private final Http2FrameWriter frameWriter; private final Http2Connection connection; private Http2LifecycleManager lifecycleManager; // We prefer ArrayDeque to LinkedList because later will produce more GC. // This initial capacity is plenty for SETTINGS traffic. private final ArrayDeque outstandingLocalSettingsQueue = new ArrayDeque(4); public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) { this.connection = checkNotNull(connection, "connection"); this.frameWriter = checkNotNull(frameWriter, "frameWriter"); if (connection.remote().flowController() == null) { connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection)); } } @Override public void lifecycleManager(Http2LifecycleManager lifecycleManager) { this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager"); } @Override public Http2FrameWriter frameWriter() { return frameWriter; } @Override public Http2Connection connection() { return connection; } @Override public final Http2RemoteFlowController flowController() { return connection().remote().flowController(); } @Override public void remoteSettings(Http2Settings settings) throws Http2Exception { Boolean pushEnabled = settings.pushEnabled(); Http2FrameWriter.Configuration config = configuration(); Http2HeaderTable outboundHeaderTable = config.headerTable(); Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy(); if (pushEnabled != null) { if (!connection.isServer() && pushEnabled) { throw connectionError(PROTOCOL_ERROR, "Client received a value of ENABLE_PUSH specified to other than 0"); } connection.remote().allowPushTo(pushEnabled); } Long maxConcurrentStreams = settings.maxConcurrentStreams(); if (maxConcurrentStreams != null) { // TODO(scott): define an extension setting so we can communicate/enforce the maxStreams limit locally. connection.local().maxStreams((int) min(maxConcurrentStreams, MAX_VALUE), MAX_VALUE); } Long headerTableSize = settings.headerTableSize(); if (headerTableSize != null) { outboundHeaderTable.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE)); } Integer maxHeaderListSize = settings.maxHeaderListSize(); if (maxHeaderListSize != null) { outboundHeaderTable.maxHeaderListSize(maxHeaderListSize); } Integer maxFrameSize = settings.maxFrameSize(); if (maxFrameSize != null) { outboundFrameSizePolicy.maxFrameSize(maxFrameSize); } Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { flowController().initialWindowSize(initialWindowSize); } } @Override public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding, final boolean endOfStream, ChannelPromise promise) { final Http2Stream stream; try { stream = requireStream(streamId); // Verify that the stream is in the appropriate state for sending DATA frames. switch (stream.state()) { case OPEN: case HALF_CLOSED_REMOTE: // Allowed sending DATA frames in these states. break; default: throw new IllegalStateException(String.format( "Stream %d in unexpected state: %s", stream.id(), stream.state())); } } catch (Throwable e) { data.release(); return promise.setFailure(e); } // Hand control of the frame to the flow controller. flowController().addFlowControlled(stream, new FlowControlledData(stream, data, padding, endOfStream, promise)); return promise; } @Override public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise); } @Override public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers, final int streamDependency, final short weight, final boolean exclusive, final int padding, final boolean endOfStream, final ChannelPromise promise) { try { Http2Stream stream = connection.stream(streamId); if (stream == null) { stream = connection.local().createStream(streamId, endOfStream); } else { switch (stream.state()) { case RESERVED_LOCAL: stream.open(endOfStream); break; case OPEN: case HALF_CLOSED_REMOTE: // Allowed sending headers in these states. break; default: throw new IllegalStateException(String.format( "Stream %d in unexpected state: %s", stream.id(), stream.state())); } } // Trailing headers must go through flow control if there are other frames queued in flow control // for this stream. Http2RemoteFlowController flowController = flowController(); if (!endOfStream || !flowController.hasFlowControlled(stream)) { ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream, promise); if (endOfStream) { final Http2Stream finalStream = stream; future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { lifecycleManager.closeStreamLocal(finalStream, promise); } }); } return future; } else { // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames. flowController.addFlowControlled(stream, new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding, endOfStream, promise)); return promise; } } catch (Http2NoMoreStreamIdsException e) { lifecycleManager.onError(ctx, e); return promise.setFailure(e); } catch (Throwable e) { return promise.setFailure(e); } } @Override public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { try { // Update the priority on this stream. Http2Stream stream = connection.stream(streamId); if (stream == null) { stream = connection.local().createIdleStream(streamId); } // The set priority operation must be done before sending the frame. The parent may not yet exist // and the priority tree may also be modified before sending. stream.setPriority(streamDependency, weight, exclusive); } catch (ClosedStreamCreationException ignored) { // It is possible that either the stream for this frame or the parent stream is closed. // In this case we should ignore the exception and allow the frame to be sent. } catch (Throwable t) { return promise.setFailure(t); } return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); } @Override public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) { // Delegate to the lifecycle manager for proper updating of connection state. return lifecycleManager.resetStream(ctx, streamId, errorCode, promise); } @Override public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) { outstandingLocalSettingsQueue.add(settings); try { Boolean pushEnabled = settings.pushEnabled(); if (pushEnabled != null && connection.isServer()) { throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified"); } } catch (Throwable e) { return promise.setFailure(e); } return frameWriter.writeSettings(ctx, settings, promise); } @Override public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { return frameWriter.writeSettingsAck(ctx, promise); } @Override public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) { return frameWriter.writePing(ctx, ack, data, promise); } @Override public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { try { if (connection.goAwayReceived()) { throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received."); } Http2Stream stream = requireStream(streamId); // Reserve the promised stream. connection.local().reservePushStream(promisedStreamId, stream); } catch (Throwable e) { return promise.setFailure(e); } return frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); } @Override public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) { return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise); } @Override public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" + " objects to control window sizes")); } @Override public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload, ChannelPromise promise) { return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise); } @Override public void close() { frameWriter.close(); } @Override public Http2Settings pollSentSettings() { return outstandingLocalSettingsQueue.poll(); } @Override public Configuration configuration() { return frameWriter.configuration(); } private Http2Stream requireStream(int streamId) { Http2Stream stream = connection.stream(streamId); if (stream == null) { final String message; if (connection.streamMayHaveExisted(streamId)) { message = "Stream no longer exists: " + streamId; } else { message = "Stream does not exist: " + streamId; } throw new IllegalArgumentException(message); } return stream; } /** * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the * {@link #size} calculation deterministic thereby greatly simplifying the implementation. *

* If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that * the passed promise is not completed until last frame write. *

*/ private final class FlowControlledData extends FlowControlledBase { private final CoalescingBufferQueue queue; private int dataSize; public FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream, ChannelPromise promise) { super(stream, padding, endOfStream, promise); queue = new CoalescingBufferQueue(promise.channel()); queue.add(buf, promise); dataSize = queue.readableBytes(); } @Override public int size() { return dataSize + padding; } @Override public void error(ChannelHandlerContext ctx, Throwable cause) { queue.releaseAndFailAll(cause); // Don't update dataSize because we need to ensure the size() method returns a consistent size even after // error so we don't invalidate flow control when returning bytes to flow control. lifecycleManager.onError(ctx, cause); } @Override public void write(ChannelHandlerContext ctx, int allowedBytes) { int queuedData = queue.readableBytes(); if (!endOfStream && (queuedData == 0 || allowedBytes == 0)) { // Nothing to write and we don't have to force a write because of EOS. return; } // Determine how much data to write. int writeableData = min(queuedData, allowedBytes); ChannelPromise writePromise = ctx.newPromise().addListener(this); ByteBuf toWrite = queue.remove(writeableData, writePromise); dataSize = queue.readableBytes(); // Determine how much padding to write. int writeablePadding = min(allowedBytes - writeableData, padding); padding -= writeablePadding; // Write the frame(s). frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding, endOfStream && size() == 0, writePromise); } @Override public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) { FlowControlledData nextData; if (FlowControlledData.class != next.getClass() || MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) { return false; } nextData.queue.copyTo(queue); dataSize = queue.readableBytes(); // Given that we're merging data into a frame it doesn't really make sense to accumulate padding. padding = Math.max(padding, nextData.padding); endOfStream = nextData.endOfStream; return true; } } /** * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is * blocked on flow-control a HEADER frame must wait until this frame has been written. */ private final class FlowControlledHeaders extends FlowControlledBase { private final Http2Headers headers; private final int streamDependency; private final short weight; private final boolean exclusive; public FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) { super(stream, padding, endOfStream, promise); this.headers = headers; this.streamDependency = streamDependency; this.weight = weight; this.exclusive = exclusive; } @Override public int size() { return 0; } @Override public void error(ChannelHandlerContext ctx, Throwable cause) { if (ctx != null) { lifecycleManager.onError(ctx, cause); } promise.tryFailure(cause); } @Override public void write(ChannelHandlerContext ctx, int allowedBytes) { if (promise.isVoid()) { promise = ctx.newPromise(); } promise.addListener(this); frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, padding, endOfStream, promise); } @Override public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) { return false; } } /** * Common base type for payloads to deliver via flow-control. */ public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled, ChannelFutureListener { protected final Http2Stream stream; protected ChannelPromise promise; protected boolean endOfStream; protected int padding; public FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream, final ChannelPromise promise) { if (padding < 0) { throw new IllegalArgumentException("padding must be >= 0"); } this.padding = padding; this.endOfStream = endOfStream; this.stream = stream; this.promise = promise; } @Override public void writeComplete() { if (endOfStream) { lifecycleManager.closeStreamLocal(stream, promise); } } @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { error(flowController().channelHandlerContext(), future.cause()); } } } }