HTTP/2 Codec Read/Write Restructure
Motivation: The HTTP/2 codec has some duplication and the read/write interfaces are not cleanly exposed to users of the codec. Modifications: -Restructure the AbstractHttp2ConnectionHandler class to be able to extend write behavior before the outbound flow control gets the data -Add Http2InboundConnectionHandler and Http2OutboundConnectionHandler interfaces and restructure external codec interface around these concepts Result: HTTP/2 codec provides a cleaner external interface which is easy to extend for read/write events.
This commit is contained in:
parent
2d6d1fa139
commit
95cec357ee
File diff suppressed because it is too large
Load Diff
@ -29,6 +29,8 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
@ -153,6 +155,27 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
return localEndpoint.isGoAwayReceived() || remoteEndpoint.isGoAwayReceived();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream createLocalStream(int streamId, boolean halfClosed) throws Http2Exception {
|
||||
return local().createStream(streamId, halfClosed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream createRemoteStream(int streamId, boolean halfClosed) throws Http2Exception {
|
||||
return remote().createStream(streamId, halfClosed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(Http2Stream stream, ChannelFuture future, ChannelFutureListener closeListener) {
|
||||
stream.close();
|
||||
|
||||
// If this connection is closing and there are no longer any
|
||||
// active streams, close after the current operation completes.
|
||||
if (closeListener != null && numActiveStreams() == 0) {
|
||||
future.addListener(closeListener);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeStream(DefaultStream stream) {
|
||||
// Notify the listeners of the event first.
|
||||
for (Listener listener : listeners) {
|
||||
|
@ -23,6 +23,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_FRAME_SIZ
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.isMaxFrameSizeValid;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.readUnsignedInt;
|
||||
import static io.netty.handler.codec.http2.Http2Error.FRAME_SIZE_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
|
||||
import static io.netty.handler.codec.http2.Http2FrameTypes.CONTINUATION;
|
||||
import static io.netty.handler.codec.http2.Http2FrameTypes.DATA;
|
||||
@ -37,12 +38,12 @@ import static io.netty.handler.codec.http2.Http2FrameTypes.WINDOW_UPDATE;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http2.Http2FrameReader.Configuration;
|
||||
|
||||
/**
|
||||
* A {@link Http2FrameReader} that supports all frame types defined by the HTTP/2 specification.
|
||||
*/
|
||||
public class DefaultHttp2FrameReader implements Http2FrameReader {
|
||||
|
||||
public class DefaultHttp2FrameReader implements Http2FrameReader, Http2FrameSizePolicy, Configuration {
|
||||
private enum State {
|
||||
FRAME_HEADER,
|
||||
FRAME_PAYLOAD,
|
||||
@ -57,7 +58,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
|
||||
private Http2Flags flags;
|
||||
private int payloadLength;
|
||||
private HeadersContinuation headersContinuation;
|
||||
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
||||
private int maxFrameSize;
|
||||
|
||||
public DefaultHttp2FrameReader() {
|
||||
this(new DefaultHttp2HeadersDecoder());
|
||||
@ -65,22 +66,28 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
|
||||
|
||||
public DefaultHttp2FrameReader(Http2HeadersDecoder headersDecoder) {
|
||||
this.headersDecoder = headersDecoder;
|
||||
maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderTableSize(long max) {
|
||||
headersDecoder.maxHeaderTableSize((int) Math.min(max, Integer.MAX_VALUE));
|
||||
public Http2HeaderTable headerTable() {
|
||||
return headersDecoder.configuration().headerTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maxHeaderTableSize() {
|
||||
return headersDecoder.maxHeaderTableSize();
|
||||
public Configuration configuration() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxFrameSize(int max) {
|
||||
public Http2FrameSizePolicy frameSizePolicy() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxFrameSize(int max) throws Http2Exception {
|
||||
if (!isMaxFrameSizeValid(max)) {
|
||||
throw new IllegalArgumentException("maxFrameSize is invalid: " + max);
|
||||
Http2Exception.format(FRAME_SIZE_ERROR, "Invalid MAX_FRAME_SIZE specified in sent settings: %d", max);
|
||||
}
|
||||
maxFrameSize = max;
|
||||
}
|
||||
@ -90,16 +97,6 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
|
||||
return maxFrameSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderListSize(int max) {
|
||||
headersDecoder.maxHeaderListSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderListSize() {
|
||||
return headersDecoder.maxHeaderListSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (headersContinuation != null) {
|
||||
@ -643,7 +640,7 @@ public class DefaultHttp2FrameReader implements Http2FrameReader {
|
||||
return;
|
||||
}
|
||||
if (headerBlock.isWritable(fragment.readableBytes())) {
|
||||
// The buffer can hold the requeste bytes, just write it directly.
|
||||
// The buffer can hold the requested bytes, just write it directly.
|
||||
headerBlock.writeBytes(fragment);
|
||||
} else {
|
||||
// Allocate a new buffer that is big enough to hold the entire header block so far.
|
||||
|
@ -28,6 +28,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.isMaxFrameSizeValid;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedInt;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedShort;
|
||||
import static io.netty.handler.codec.http2.Http2Error.FRAME_SIZE_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2FrameTypes.CONTINUATION;
|
||||
import static io.netty.handler.codec.http2.Http2FrameTypes.DATA;
|
||||
import static io.netty.handler.codec.http2.Http2FrameTypes.GO_AWAY;
|
||||
@ -43,15 +44,15 @@ import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
|
||||
/**
|
||||
* A {@link Http2FrameWriter} that supports all frame types defined by the HTTP/2 specification.
|
||||
*/
|
||||
public class DefaultHttp2FrameWriter implements Http2FrameWriter {
|
||||
|
||||
public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSizePolicy, Configuration {
|
||||
private final Http2HeadersEncoder headersEncoder;
|
||||
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
||||
private int maxFrameSize;
|
||||
|
||||
public DefaultHttp2FrameWriter() {
|
||||
this(new DefaultHttp2HeadersEncoder());
|
||||
@ -59,22 +60,28 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
|
||||
|
||||
public DefaultHttp2FrameWriter(Http2HeadersEncoder headersEncoder) {
|
||||
this.headersEncoder = headersEncoder;
|
||||
maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderTableSize(long max) throws Http2Exception {
|
||||
headersEncoder.maxHeaderTableSize((int) Math.min(max, Integer.MAX_VALUE));
|
||||
public Configuration configuration() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maxHeaderTableSize() {
|
||||
return headersEncoder.maxHeaderTableSize();
|
||||
public Http2HeaderTable headerTable() {
|
||||
return headersEncoder.configuration().headerTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxFrameSize(int max) {
|
||||
public Http2FrameSizePolicy frameSizePolicy() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxFrameSize(int max) throws Http2Exception {
|
||||
if (!isMaxFrameSizeValid(max)) {
|
||||
throw new IllegalArgumentException("maxFrameSize is invalid: " + max);
|
||||
Http2Exception.format(FRAME_SIZE_ERROR, "Invalid MAX_FRAME_SIZE specified in sent settings: %d", max);
|
||||
}
|
||||
maxFrameSize = max;
|
||||
}
|
||||
@ -85,19 +92,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderListSize(int max) {
|
||||
headersEncoder.maxHeaderListSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderListSize() {
|
||||
return headersEncoder.maxHeaderListSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// Nothing to do.
|
||||
}
|
||||
public void close() { }
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
|
||||
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
/**
|
||||
* Provides common functionality for {@link Http2HeaderTable}
|
||||
*/
|
||||
class DefaultHttp2HeaderTableListSize {
|
||||
private int maxHeaderListSize = Integer.MAX_VALUE;
|
||||
|
||||
public void maxHeaderListSize(int max) throws Http2Exception {
|
||||
if (max < 0) {
|
||||
throw Http2Exception.protocolError("Header List Size must be non-negative but was %d", max);
|
||||
}
|
||||
maxHeaderListSize = max;
|
||||
}
|
||||
|
||||
public int maxHeaderListSize() {
|
||||
return maxHeaderListSize;
|
||||
}
|
||||
}
|
@ -29,10 +29,9 @@ import java.io.InputStream;
|
||||
import com.twitter.hpack.Decoder;
|
||||
import com.twitter.hpack.HeaderListener;
|
||||
|
||||
public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
|
||||
|
||||
public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder, Http2HeadersDecoder.Configuration {
|
||||
private final Decoder decoder;
|
||||
private int maxHeaderListSize = Integer.MAX_VALUE;
|
||||
private final Http2HeaderTable headerTable;
|
||||
|
||||
public DefaultHttp2HeadersDecoder() {
|
||||
this(DEFAULT_MAX_HEADER_SIZE, DEFAULT_HEADER_TABLE_SIZE);
|
||||
@ -40,29 +39,17 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
|
||||
|
||||
public DefaultHttp2HeadersDecoder(int maxHeaderSize, int maxHeaderTableSize) {
|
||||
decoder = new Decoder(maxHeaderSize, maxHeaderTableSize);
|
||||
headerTable = new Http2HeaderTableDecoder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderTableSize(int size) {
|
||||
decoder.setMaxHeaderTableSize(size);
|
||||
public Http2HeaderTable headerTable() {
|
||||
return headerTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderTableSize() {
|
||||
return decoder.getMaxHeaderTableSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderListSize(int max) {
|
||||
if (max < 0) {
|
||||
throw new IllegalArgumentException("maxHeaderListSize must be >= 0: " + max);
|
||||
}
|
||||
maxHeaderListSize = max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderListSize() {
|
||||
return maxHeaderListSize;
|
||||
public Configuration configuration() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -83,9 +70,9 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
|
||||
// TODO: what's the right thing to do here?
|
||||
}
|
||||
|
||||
if (headers.size() > maxHeaderListSize) {
|
||||
if (headers.size() > headerTable.maxHeaderListSize()) {
|
||||
throw protocolError("Number of headers (%d) exceeds maxHeaderListSize (%d)",
|
||||
headers.size(), maxHeaderListSize);
|
||||
headers.size(), headerTable.maxHeaderListSize());
|
||||
}
|
||||
|
||||
return headers;
|
||||
@ -93,8 +80,7 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
|
||||
throw new Http2Exception(COMPRESSION_ERROR, e.getMessage());
|
||||
} catch (Throwable e) {
|
||||
// Default handler for any other types of errors that may have occurred. For example,
|
||||
// the the Header builder throws IllegalArgumentException if the key or value was
|
||||
// invalid
|
||||
// the the Header builder throws IllegalArgumentException if the key or value was invalid
|
||||
// for any reason (e.g. the key was an invalid pseudo-header).
|
||||
throw new Http2Exception(Http2Error.PROTOCOL_ERROR, e.getMessage(), e);
|
||||
} finally {
|
||||
@ -105,4 +91,26 @@ public class DefaultHttp2HeadersDecoder implements Http2HeadersDecoder {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link Http2HeaderTable} implementation to support {@link Http2HeadersDecoder}
|
||||
*/
|
||||
private final class Http2HeaderTableDecoder extends DefaultHttp2HeaderTableListSize implements Http2HeaderTable {
|
||||
@Override
|
||||
public void maxHeaderTableSize(int max) throws Http2Exception {
|
||||
if (max < 0) {
|
||||
throw Http2Exception.protocolError("Header Table Size must be non-negative but was %d", max);
|
||||
}
|
||||
try {
|
||||
decoder.setMaxHeaderTableSize(max);
|
||||
} catch (Throwable t) {
|
||||
throw Http2Exception.format(Http2Error.PROTOCOL_ERROR, t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderTableSize() {
|
||||
return decoder.getMaxHeaderTableSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,11 +31,11 @@ import java.util.TreeSet;
|
||||
|
||||
import com.twitter.hpack.Encoder;
|
||||
|
||||
public class DefaultHttp2HeadersEncoder implements Http2HeadersEncoder {
|
||||
public class DefaultHttp2HeadersEncoder implements Http2HeadersEncoder, Http2HeadersEncoder.Configuration {
|
||||
private final Encoder encoder;
|
||||
private final ByteArrayOutputStream tableSizeChangeOutput = new ByteArrayOutputStream();
|
||||
private final Set<String> sensitiveHeaders = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
|
||||
private int maxHeaderListSize = Integer.MAX_VALUE;
|
||||
private final Http2HeaderTable headerTable;
|
||||
|
||||
public DefaultHttp2HeadersEncoder() {
|
||||
this(DEFAULT_HEADER_TABLE_SIZE, Collections.<String>emptySet());
|
||||
@ -44,15 +44,16 @@ public class DefaultHttp2HeadersEncoder implements Http2HeadersEncoder {
|
||||
public DefaultHttp2HeadersEncoder(int maxHeaderTableSize, Set<String> sensitiveHeaders) {
|
||||
encoder = new Encoder(maxHeaderTableSize);
|
||||
this.sensitiveHeaders.addAll(sensitiveHeaders);
|
||||
headerTable = new Http2HeaderTableEncoder();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeHeaders(Http2Headers headers, ByteBuf buffer) throws Http2Exception {
|
||||
final OutputStream stream = new ByteBufOutputStream(buffer);
|
||||
try {
|
||||
if (headers.size() > maxHeaderListSize) {
|
||||
if (headers.size() > headerTable.maxHeaderListSize()) {
|
||||
throw protocolError("Number of headers (%d) exceeds maxHeaderListSize (%d)",
|
||||
headers.size(), maxHeaderListSize);
|
||||
headers.size(), headerTable.maxHeaderListSize());
|
||||
}
|
||||
|
||||
// If there was a change in the table size, serialize the output from the encoder
|
||||
@ -92,35 +93,42 @@ public class DefaultHttp2HeadersEncoder implements Http2HeadersEncoder {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderTableSize(int size) throws Http2Exception {
|
||||
try {
|
||||
// No headers should be emitted. If they are, we throw.
|
||||
encoder.setMaxHeaderTableSize(tableSizeChangeOutput, size);
|
||||
} catch (IOException e) {
|
||||
throw new Http2Exception(Http2Error.COMPRESSION_ERROR, e.getMessage(), e);
|
||||
}
|
||||
public Http2HeaderTable headerTable() {
|
||||
return headerTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderTableSize() {
|
||||
return encoder.getMaxHeaderTableSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderListSize(int max) {
|
||||
if (max < 0) {
|
||||
throw new IllegalArgumentException("maxHeaderListSize must be positive: " + max);
|
||||
}
|
||||
maxHeaderListSize = max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderListSize() {
|
||||
return maxHeaderListSize;
|
||||
public Configuration configuration() {
|
||||
return this;
|
||||
}
|
||||
|
||||
private void encodeHeader(AsciiString key, AsciiString value, OutputStream stream) throws IOException {
|
||||
boolean sensitive = sensitiveHeaders.contains(key);
|
||||
encoder.encodeHeader(stream, key.array(), value.array(), sensitive);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link Http2HeaderTable} implementation to support {@link Http2HeadersEncoder}
|
||||
*/
|
||||
private final class Http2HeaderTableEncoder extends DefaultHttp2HeaderTableListSize implements Http2HeaderTable {
|
||||
@Override
|
||||
public void maxHeaderTableSize(int max) throws Http2Exception {
|
||||
if (max < 0) {
|
||||
throw Http2Exception.protocolError("Header Table Size must be non-negative but was %d", max);
|
||||
}
|
||||
try {
|
||||
// No headers should be emitted. If they are, we throw.
|
||||
encoder.setMaxHeaderTableSize(tableSizeChangeOutput, max);
|
||||
} catch (IOException e) {
|
||||
throw new Http2Exception(Http2Error.COMPRESSION_ERROR, e.getMessage(), e);
|
||||
} catch (Throwable t) {
|
||||
throw new Http2Exception(Http2Error.PROTOCOL_ERROR, t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderTableSize() {
|
||||
return encoder.getMaxHeaderTableSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,26 +41,16 @@ import static java.lang.Math.min;
|
||||
* Basic implementation of {@link Http2OutboundFlowController}.
|
||||
*/
|
||||
public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowController {
|
||||
|
||||
/**
|
||||
* A comparators that sorts priority nodes in ascending order by the amount of priority data available for its
|
||||
* subtree.
|
||||
*/
|
||||
private static final Comparator<Http2Stream> DATA_WEIGHT = new Comparator<Http2Stream>() {
|
||||
private static final int MAX_DATA_THRESHOLD = Integer.MAX_VALUE / 256;
|
||||
|
||||
@Override
|
||||
public int compare(Http2Stream o1, Http2Stream o2) {
|
||||
int o1Data = state(o1).priorityBytes();
|
||||
int o2Data = state(o2).priorityBytes();
|
||||
if (o1Data > MAX_DATA_THRESHOLD || o2Data > MAX_DATA_THRESHOLD) {
|
||||
// Corner case to make sure we don't overflow an integer with
|
||||
// the multiply.
|
||||
return o1Data - o2Data;
|
||||
}
|
||||
|
||||
// Scale the data by the weight.
|
||||
return o1Data * o1.weight() - o2Data * o2.weight();
|
||||
final long result = ((long) state(o1).priorityBytes()) * o1.weight() -
|
||||
((long) state(o2).priorityBytes()) * o2.weight();
|
||||
return result > 0 ? 1 : (result < 0 ? -1 : 0);
|
||||
}
|
||||
};
|
||||
|
||||
@ -621,9 +611,10 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
|
||||
void write() throws Http2Exception {
|
||||
// Using a do/while loop because if the buffer is empty we still need to call
|
||||
// the writer once to send the empty frame.
|
||||
final Http2FrameSizePolicy frameSizePolicy = frameWriter.configuration().frameSizePolicy();
|
||||
do {
|
||||
int bytesToWrite = size();
|
||||
int frameBytes = Math.min(bytesToWrite, frameWriter.maxFrameSize());
|
||||
int frameBytes = Math.min(bytesToWrite, frameSizePolicy.maxFrameSize());
|
||||
if (frameBytes == bytesToWrite) {
|
||||
// All the bytes fit into a single HTTP/2 frame, just send it all.
|
||||
connectionState().incrementStreamWindow(-bytesToWrite);
|
||||
|
@ -1,172 +0,0 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
|
||||
/**
|
||||
* Helper class that facilitates use of {@link AbstractHttp2ConnectionHandler} in compositional
|
||||
* models, rather than having to subclass it directly.
|
||||
* <p>
|
||||
* Exposes all {@code writeXXX} methods as public and delegates all frame read events to a provided
|
||||
* {@link Http2FrameListener}.
|
||||
* <p>
|
||||
* The {@link #channelActive} and {@link #handlerAdded} should called when appropriate to ensure
|
||||
* that the initial SETTINGS frame is sent to the remote endpoint.
|
||||
*/
|
||||
public class DelegatingHttp2ConnectionHandler extends AbstractHttp2ConnectionHandler {
|
||||
private final Http2FrameListener listener;
|
||||
|
||||
public DelegatingHttp2ConnectionHandler(boolean server, Http2FrameListener listener) {
|
||||
super(server);
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public DelegatingHttp2ConnectionHandler(Http2Connection connection,
|
||||
Http2FrameReader frameReader, Http2FrameWriter frameWriter,
|
||||
Http2InboundFlowController inboundFlow, Http2OutboundFlowController outboundFlow,
|
||||
Http2FrameListener listener) {
|
||||
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow);
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public DelegatingHttp2ConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
|
||||
super(connection);
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
|
||||
int padding, boolean endStream, ChannelPromise promise) {
|
||||
return super.writeData(ctx, streamId, data, padding, endStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
|
||||
Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) {
|
||||
return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId,
|
||||
Http2Headers headers, int streamDependency, short weight, boolean exclusive,
|
||||
int padding, boolean endStream, ChannelPromise promise) {
|
||||
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
|
||||
exclusive, padding, endStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId,
|
||||
int streamDependency, short weight, boolean exclusive, ChannelPromise promise) {
|
||||
return super.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
|
||||
ChannelPromise promise) {
|
||||
return super.writeRstStream(ctx, streamId, errorCode, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeSettings(ChannelHandlerContext ctx,
|
||||
Http2Settings settings, ChannelPromise promise) {
|
||||
return super.writeSettings(ctx, settings, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePing(ChannelHandlerContext ctx, ByteBuf data, ChannelPromise promise) {
|
||||
return super.writePing(ctx, data, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId,
|
||||
int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) {
|
||||
return super.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
|
||||
boolean endOfStream) throws Http2Exception {
|
||||
listener.onDataRead(ctx, streamId, data, padding, endOfStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
|
||||
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)
|
||||
throws Http2Exception {
|
||||
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive,
|
||||
padding, endStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
|
||||
short weight, boolean exclusive) throws Http2Exception {
|
||||
listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
|
||||
throws Http2Exception {
|
||||
listener.onRstStreamRead(ctx, streamId, errorCode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
|
||||
listener.onSettingsAckRead(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
|
||||
listener.onSettingsRead(ctx, settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
|
||||
listener.onPingRead(ctx, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
|
||||
listener.onPingAckRead(ctx, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
|
||||
Http2Headers headers, int padding) throws Http2Exception {
|
||||
listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
|
||||
throws Http2Exception {
|
||||
listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
|
||||
throws Http2Exception {
|
||||
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
|
||||
ByteBuf payload) {
|
||||
listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
|
||||
}
|
||||
}
|
@ -39,7 +39,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
|
||||
private static final List<String> UPGRADE_HEADERS = Collections.singletonList(HTTP_UPGRADE_SETTINGS_HEADER);
|
||||
|
||||
private final String handlerName;
|
||||
private final AbstractHttp2ConnectionHandler connectionHandler;
|
||||
private final Http2InboundConnectionHandler connectionHandler;
|
||||
|
||||
/**
|
||||
* Creates the codec using a default name for the connection handler when adding to the
|
||||
@ -47,7 +47,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
|
||||
*
|
||||
* @param connectionHandler the HTTP/2 connection handler.
|
||||
*/
|
||||
public Http2ClientUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) {
|
||||
public Http2ClientUpgradeCodec(Http2InboundConnectionHandler connectionHandler) {
|
||||
this("http2ConnectionHandler", connectionHandler);
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
|
||||
* @param connectionHandler the HTTP/2 connection handler.
|
||||
*/
|
||||
public Http2ClientUpgradeCodec(String handlerName,
|
||||
AbstractHttp2ConnectionHandler connectionHandler) {
|
||||
Http2InboundConnectionHandler connectionHandler) {
|
||||
if (handlerName == null) {
|
||||
throw new NullPointerException("handlerName");
|
||||
}
|
||||
|
@ -15,6 +15,9 @@
|
||||
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
@ -250,13 +253,31 @@ public interface Http2Connection {
|
||||
*/
|
||||
Endpoint local();
|
||||
|
||||
/**
|
||||
* Creates a new stream initiated by the local endpoint. See {@link Endpoint#createStream(int, boolean)}.
|
||||
*/
|
||||
Http2Stream createLocalStream(int streamId, boolean halfClosed) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets a view of this connection from the remote {@link Endpoint}.
|
||||
*/
|
||||
Endpoint remote();
|
||||
|
||||
/**
|
||||
* Creates a new stream initiated by the remote endpoint. See {@link Endpoint#createStream(int, boolean)}.
|
||||
*/
|
||||
Http2Stream createRemoteStream(int streamId, boolean halfClosed) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Indicates whether or not either endpoint has received a GOAWAY.
|
||||
*/
|
||||
boolean isGoAway();
|
||||
|
||||
/**
|
||||
* Closes the given stream and adds a hook to close the channel after the given future completes.
|
||||
* @param stream the stream to be closed.
|
||||
* @param future the future after which to close the channel. If {@code null}, ignored.
|
||||
* @param closeListener the listener to add to the {@code future} if notification is expected
|
||||
*/
|
||||
void close(Http2Stream stream, ChannelFuture future, ChannelFutureListener closeListener);
|
||||
}
|
||||
|
@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
|
||||
/**
|
||||
* This class handles writing HTTP/2 frames, delegating responses to a {@link Http2FrameListener},
|
||||
* and can be inserted into a Netty pipeline.
|
||||
*/
|
||||
public class Http2ConnectionHandler extends Http2InboundConnectionHandler implements Http2FrameWriter {
|
||||
public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
|
||||
this(new DefaultHttp2Connection(server), listener);
|
||||
}
|
||||
|
||||
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
|
||||
this(connection, listener, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter());
|
||||
}
|
||||
|
||||
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener,
|
||||
Http2FrameReader frameReader, Http2FrameWriter frameWriter) {
|
||||
this(connection, listener, frameReader, new DefaultHttp2InboundFlowController(connection, frameWriter),
|
||||
new Http2OutboundConnectionAdapter(connection, frameWriter));
|
||||
}
|
||||
|
||||
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener,
|
||||
Http2FrameReader frameReader, Http2InboundFlowController inboundFlow,
|
||||
Http2OutboundConnectionAdapter outbound) {
|
||||
super(connection, listener, frameReader, inboundFlow, outbound);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
|
||||
boolean endStream, ChannelPromise promise) {
|
||||
return outbound.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
|
||||
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
|
||||
ChannelPromise promise) {
|
||||
return outbound.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
|
||||
boolean exclusive, ChannelPromise promise) {
|
||||
return outbound.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
|
||||
ChannelPromise promise) {
|
||||
return outbound.writeRstStream(ctx, streamId, errorCode, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) {
|
||||
return outbound.writeSettings(ctx, settings, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||
return outbound.writeSettingsAck(ctx, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
|
||||
return outbound.writePing(ctx, ack, data, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
|
||||
Http2Headers headers, int padding, ChannelPromise promise) {
|
||||
return outbound.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
|
||||
ChannelPromise promise) {
|
||||
return outbound.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
|
||||
ChannelPromise promise) {
|
||||
return outbound.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
|
||||
ByteBuf payload, ChannelPromise promise) {
|
||||
return outbound.writeFrame(ctx, frameType, streamId, flags, payload, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
|
||||
boolean endStream, ChannelPromise promise) {
|
||||
return outbound.writeData(ctx, streamId, data, padding, endStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
outbound.close();
|
||||
super.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration configuration() {
|
||||
return outbound.configuration();
|
||||
}
|
||||
}
|
@ -20,10 +20,9 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
|
||||
/**
|
||||
* Interface that defines an object capable of writing HTTP/2 data frames.
|
||||
* Interface that defines an object capable of producing HTTP/2 data frames.
|
||||
*/
|
||||
public interface Http2DataWriter {
|
||||
|
||||
/**
|
||||
* Writes a {@code DATA} frame to the remote endpoint.
|
||||
*
|
||||
|
@ -25,6 +25,20 @@ import java.io.Closeable;
|
||||
* {@link Http2FrameListener} when frames are complete.
|
||||
*/
|
||||
public interface Http2FrameReader extends Closeable {
|
||||
/**
|
||||
* Configuration specific to {@link Http2FrameReader}
|
||||
*/
|
||||
public interface Configuration {
|
||||
/**
|
||||
* Get the {@link Http2HeaderTable} for this {@link Http2FrameReader}
|
||||
*/
|
||||
Http2HeaderTable headerTable();
|
||||
|
||||
/**
|
||||
* Get the {@link Http2FrameSizePolicy} for this {@link Http2FrameReader}
|
||||
*/
|
||||
Http2FrameSizePolicy frameSizePolicy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to read the next frame from the input buffer. If enough data is available to fully
|
||||
@ -34,34 +48,9 @@ public interface Http2FrameReader extends Closeable {
|
||||
throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Sets the maximum size of the HPACK header table used for decoding HTTP/2 headers.
|
||||
* Get the configuration related elements for this {@link Http2FrameReader}
|
||||
*/
|
||||
void maxHeaderTableSize(long max);
|
||||
|
||||
/**
|
||||
* Gets the maximum size of the HPACK header table used for decoding HTTP/2 headers.
|
||||
*/
|
||||
long maxHeaderTableSize();
|
||||
|
||||
/**
|
||||
* Sets the maximum allowed frame size. Attempts to read frames longer than this maximum will fail.
|
||||
*/
|
||||
void maxFrameSize(int max);
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed frame size.
|
||||
*/
|
||||
int maxFrameSize();
|
||||
|
||||
/**
|
||||
* Sets the maximum allowed header elements.
|
||||
*/
|
||||
void maxHeaderListSize(int max);
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed header elements.
|
||||
*/
|
||||
int maxHeaderListSize();
|
||||
Configuration configuration();
|
||||
|
||||
/**
|
||||
* Closes this reader and frees any allocated resources.
|
||||
|
@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
public interface Http2FrameSizePolicy {
|
||||
/**
|
||||
* Sets the maximum allowed frame size. Attempts to write frames longer than this maximum will fail.
|
||||
*/
|
||||
void maxFrameSize(int max) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed frame size.
|
||||
*/
|
||||
int maxFrameSize();
|
||||
}
|
@ -23,11 +23,25 @@ import io.netty.channel.ChannelPromise;
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
* A writer responsible for marshalling HTTP/2 frames to the channel. All of the write methods in
|
||||
* A writer responsible for marshaling HTTP/2 frames to the channel. All of the write methods in
|
||||
* this interface write to the context, but DO NOT FLUSH. To perform a flush, you must separately
|
||||
* call {@link ChannelHandlerContext#flush()}.
|
||||
*/
|
||||
public interface Http2FrameWriter extends Http2DataWriter, Closeable {
|
||||
/**
|
||||
* Configuration specific to {@link Http2FrameWriter}
|
||||
*/
|
||||
public interface Configuration {
|
||||
/**
|
||||
* Get the {@link Http2HeaderTable} for this {@link Http2FrameWriter}
|
||||
*/
|
||||
Http2HeaderTable headerTable();
|
||||
|
||||
/**
|
||||
* Get the {@link Http2FrameSizePolicy} for this {@link Http2FrameWriter}
|
||||
*/
|
||||
Http2FrameSizePolicy frameSizePolicy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a HEADERS frame to the remote endpoint.
|
||||
@ -176,39 +190,14 @@ public interface Http2FrameWriter extends Http2DataWriter, Closeable {
|
||||
ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
|
||||
Http2Flags flags, ByteBuf payload, ChannelPromise promise);
|
||||
|
||||
/**
|
||||
* Get the configuration related elements for this {@link Http2FrameWriter}
|
||||
*/
|
||||
Configuration configuration();
|
||||
|
||||
/**
|
||||
* Closes this writer and frees any allocated resources.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Sets the maximum size of the HPACK header table used for decoding HTTP/2 headers.
|
||||
*/
|
||||
void maxHeaderTableSize(long max) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets the maximum size of the HPACK header table used for decoding HTTP/2 headers.
|
||||
*/
|
||||
long maxHeaderTableSize();
|
||||
|
||||
/**
|
||||
* Sets the maximum allowed frame size. Attempts to write frames longer than this maximum will fail.
|
||||
*/
|
||||
void maxFrameSize(int max);
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed frame size.
|
||||
*/
|
||||
int maxFrameSize();
|
||||
|
||||
/**
|
||||
* Sets the maximum allowed header elements.
|
||||
*/
|
||||
void maxHeaderListSize(int max);
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed header elements.
|
||||
*/
|
||||
int maxHeaderListSize();
|
||||
}
|
||||
|
@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
/**
|
||||
* Extracts a common interface for encoding and processing HPACK header constraints
|
||||
*/
|
||||
public interface Http2HeaderTable {
|
||||
/**
|
||||
* Sets the maximum size of the HPACK header table used for decoding HTTP/2 headers.
|
||||
*/
|
||||
void maxHeaderTableSize(int max) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets the maximum size of the HPACK header table used for decoding HTTP/2 headers.
|
||||
*/
|
||||
int maxHeaderTableSize();
|
||||
|
||||
/**
|
||||
* Sets the maximum allowed header elements.
|
||||
*/
|
||||
void maxHeaderListSize(int max) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed header elements.
|
||||
*/
|
||||
int maxHeaderListSize();
|
||||
}
|
@ -21,6 +21,15 @@ import io.netty.buffer.ByteBuf;
|
||||
* Decodes HPACK-encoded headers blocks into {@link Http2Headers}.
|
||||
*/
|
||||
public interface Http2HeadersDecoder {
|
||||
/**
|
||||
* Configuration related elements for the {@link Http2HeadersDecoder} interface
|
||||
*/
|
||||
public interface Configuration {
|
||||
/**
|
||||
* Access the Http2HeaderTable for this {@link Http2HeadersDecoder}
|
||||
*/
|
||||
Http2HeaderTable headerTable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes the given headers block and returns the headers.
|
||||
@ -28,22 +37,7 @@ public interface Http2HeadersDecoder {
|
||||
Http2Headers decodeHeaders(ByteBuf headerBlock) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Sets the new max header table size for this decoder.
|
||||
* Get the {@link Configuration} for this {@link Http2HeadersDecoder}
|
||||
*/
|
||||
void maxHeaderTableSize(int size);
|
||||
|
||||
/**
|
||||
* Gets the maximum header table size for this decoder.
|
||||
*/
|
||||
int maxHeaderTableSize();
|
||||
|
||||
/**
|
||||
* Sets the maximum allowed header elements.
|
||||
*/
|
||||
void maxHeaderListSize(int max);
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed header elements.
|
||||
*/
|
||||
int maxHeaderListSize();
|
||||
Configuration configuration();
|
||||
}
|
||||
|
@ -21,6 +21,15 @@ import io.netty.buffer.ByteBuf;
|
||||
* Encodes {@link Http2Headers} into HPACK-encoded headers blocks.
|
||||
*/
|
||||
public interface Http2HeadersEncoder {
|
||||
/**
|
||||
* Configuration related elements for the {@link Http2HeadersEncoder} interface
|
||||
*/
|
||||
public interface Configuration {
|
||||
/**
|
||||
* Access the Http2HeaderTable for this {@link Http2HeadersEncoder}
|
||||
*/
|
||||
Http2HeaderTable headerTable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes the given headers and writes the output headers block to the given output buffer.
|
||||
@ -31,22 +40,7 @@ public interface Http2HeadersEncoder {
|
||||
void encodeHeaders(Http2Headers headers, ByteBuf buffer) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Updates the maximum header table size for this encoder.
|
||||
* Get the {@link Configuration} for this {@link Http2HeadersEncoder}
|
||||
*/
|
||||
void maxHeaderTableSize(int size) throws Http2Exception;
|
||||
|
||||
/**
|
||||
* Gets the current maximum value for the header table size.
|
||||
*/
|
||||
int maxHeaderTableSize();
|
||||
|
||||
/**
|
||||
* Sets the maximum allowed header elements.
|
||||
*/
|
||||
void maxHeaderListSize(int max);
|
||||
|
||||
/**
|
||||
* Gets the maximum allowed header elements.
|
||||
*/
|
||||
int maxHeaderListSize();
|
||||
Configuration configuration();
|
||||
}
|
||||
|
@ -0,0 +1,663 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
|
||||
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Provides the default implementation for processing inbound frame events
|
||||
* and delegates to a {@link Http2FrameListener}
|
||||
* <p>
|
||||
* This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
|
||||
* <p>
|
||||
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController}
|
||||
*/
|
||||
public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
|
||||
private final Http2FrameListener internalFrameListener = new FrameReadListener();
|
||||
protected final Http2OutboundConnectionAdapter outbound;
|
||||
private final Http2FrameListener listener;
|
||||
private final Http2FrameReader frameReader;
|
||||
protected final Http2Connection connection;
|
||||
private final Http2InboundFlowController inboundFlow;
|
||||
private ByteBuf clientPrefaceString;
|
||||
private boolean prefaceSent;
|
||||
private boolean prefaceReceived;
|
||||
|
||||
public Http2InboundConnectionHandler(Http2Connection connection, Http2FrameListener listener,
|
||||
Http2FrameReader frameReader, Http2InboundFlowController inboundFlow,
|
||||
Http2OutboundConnectionAdapter outbound) {
|
||||
if (connection == null) {
|
||||
throw new NullPointerException("connection");
|
||||
}
|
||||
if (frameReader == null) {
|
||||
throw new NullPointerException("frameReader");
|
||||
}
|
||||
if (listener == null) {
|
||||
throw new NullPointerException("listener");
|
||||
}
|
||||
if (inboundFlow == null) {
|
||||
throw new NullPointerException("inboundFlow");
|
||||
}
|
||||
if (outbound == null) {
|
||||
throw new NullPointerException("outbound");
|
||||
}
|
||||
|
||||
this.connection = connection;
|
||||
this.frameReader = frameReader;
|
||||
this.listener = listener;
|
||||
this.outbound = outbound;
|
||||
this.inboundFlow = inboundFlow;
|
||||
|
||||
// Set the expected client preface string. Only servers should receive this.
|
||||
clientPrefaceString = connection.isServer() ? connectionPrefaceBuf() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
|
||||
* Reserves local stream 1 for the HTTP/2 response.
|
||||
*/
|
||||
public void onHttpClientUpgrade() throws Http2Exception {
|
||||
if (connection.isServer()) {
|
||||
throw protocolError("Client-side HTTP upgrade requested for a server");
|
||||
}
|
||||
if (prefaceSent || prefaceReceived) {
|
||||
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
|
||||
}
|
||||
|
||||
// Create a local stream used for the HTTP cleartext upgrade.
|
||||
connection.createLocalStream(HTTP_UPGRADE_STREAM_ID, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
|
||||
* @param settings the settings for the remote endpoint.
|
||||
*/
|
||||
public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
|
||||
if (!connection.isServer()) {
|
||||
throw protocolError("Server-side HTTP upgrade requested for a client");
|
||||
}
|
||||
if (prefaceSent || prefaceReceived) {
|
||||
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
|
||||
}
|
||||
|
||||
// Apply the settings but no ACK is necessary.
|
||||
applyRemoteSettings(settings);
|
||||
|
||||
// Create a stream in the half-closed state.
|
||||
connection.createRemoteStream(HTTP_UPGRADE_STREAM_ID, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the local settings for this endpoint of the HTTP/2 connection.
|
||||
*/
|
||||
public Http2Settings settings() {
|
||||
Http2Settings settings = new Http2Settings();
|
||||
final Http2FrameReader.Configuration config = frameReader.configuration();
|
||||
final Http2HeaderTable headerTable = config.headerTable();
|
||||
final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
|
||||
settings.initialWindowSize(inboundFlow.initialInboundWindowSize());
|
||||
settings.maxConcurrentStreams(connection.remote().maxStreams());
|
||||
settings.headerTableSize(headerTable.maxHeaderTableSize());
|
||||
settings.maxFrameSize(frameSizePolicy.maxFrameSize());
|
||||
settings.maxHeaderListSize(headerTable.maxHeaderListSize());
|
||||
if (!connection.isServer()) {
|
||||
// Only set the pushEnabled flag if this is a client endpoint.
|
||||
settings.pushEnabled(connection.local().allowPushTo());
|
||||
}
|
||||
return settings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes all closable resources and frees any allocated resources.
|
||||
* <p>
|
||||
* This does NOT close the {@link Http2OutboundConnectionAdapter} reference in this class
|
||||
*/
|
||||
public void close() {
|
||||
frameReader.close();
|
||||
if (clientPrefaceString != null) {
|
||||
clientPrefaceString.release();
|
||||
clientPrefaceString = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
// The channel just became active - send the connection preface to the remote
|
||||
// endpoint.
|
||||
sendPreface(ctx);
|
||||
super.channelActive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
// This handler was just added to the context. In case it was handled after
|
||||
// the connection became active, send the connection preface now.
|
||||
sendPreface(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
// Avoid NotYetConnectedException
|
||||
if (!ctx.channel().isActive()) {
|
||||
ctx.close(promise);
|
||||
return;
|
||||
}
|
||||
|
||||
outbound.sendGoAway(ctx, promise, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
ChannelFuture future = ctx.newSucceededFuture();
|
||||
final Collection<Http2Stream> streams = connection.activeStreams();
|
||||
for (Http2Stream s : streams.toArray(new Http2Stream[streams.size()])) {
|
||||
connection.close(s, future, outbound.closeListener());
|
||||
}
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
if (cause instanceof Http2Exception) {
|
||||
onHttp2Exception(ctx, (Http2Exception) cause);
|
||||
}
|
||||
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
try {
|
||||
// Read the remaining of the client preface string if we haven't already.
|
||||
// If this is a client endpoint, always returns true.
|
||||
if (!readClientPrefaceString(ctx, in)) {
|
||||
// Still processing the client preface.
|
||||
return;
|
||||
}
|
||||
|
||||
frameReader.readFrame(ctx, in, internalFrameListener);
|
||||
} catch (Http2Exception e) {
|
||||
onHttp2Exception(ctx, e);
|
||||
} catch (Throwable e) {
|
||||
onHttp2Exception(ctx, new Http2Exception(Http2Error.INTERNAL_ERROR, e.getMessage(), e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes the given exception. Depending on the type of exception, delegates to either
|
||||
* {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or
|
||||
* {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}.
|
||||
*/
|
||||
protected final void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
|
||||
if (e instanceof Http2StreamException) {
|
||||
onStreamError(ctx, (Http2StreamException) e);
|
||||
} else {
|
||||
onConnectionError(ctx, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until all streams are
|
||||
* closed before shutting down the connection.
|
||||
*/
|
||||
protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
|
||||
outbound.sendGoAway(ctx, ctx.newPromise(), cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream.
|
||||
*/
|
||||
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
|
||||
outbound.writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise(), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
|
||||
*/
|
||||
private void sendPreface(final ChannelHandlerContext ctx) {
|
||||
if (prefaceSent || !ctx.channel().isActive()) {
|
||||
return;
|
||||
}
|
||||
|
||||
prefaceSent = true;
|
||||
|
||||
if (!connection.isServer()) {
|
||||
// Clients must send the preface string as the first bytes on the connection.
|
||||
ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
}
|
||||
|
||||
// Both client and server must send their initial settings.
|
||||
outbound.writeSettings(ctx, settings(), ctx.newPromise()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies settings received from the remote endpoint.
|
||||
*/
|
||||
private void applyRemoteSettings(Http2Settings settings) throws Http2Exception {
|
||||
Boolean pushEnabled = settings.pushEnabled();
|
||||
final Http2FrameWriter.Configuration config = outbound.configuration();
|
||||
final Http2HeaderTable headerTable = config.headerTable();
|
||||
final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
|
||||
if (pushEnabled != null) {
|
||||
if (!connection.isServer()) {
|
||||
throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified");
|
||||
}
|
||||
connection.remote().allowPushTo(pushEnabled);
|
||||
}
|
||||
|
||||
Long maxConcurrentStreams = settings.maxConcurrentStreams();
|
||||
if (maxConcurrentStreams != null) {
|
||||
int value = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE);
|
||||
connection.local().maxStreams(value);
|
||||
}
|
||||
|
||||
Long headerTableSize = settings.headerTableSize();
|
||||
if (headerTableSize != null) {
|
||||
headerTable.maxHeaderTableSize((int) Math.min(headerTableSize.intValue(), Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
Integer maxHeaderListSize = settings.maxHeaderListSize();
|
||||
if (maxHeaderListSize != null) {
|
||||
headerTable.maxHeaderListSize(maxHeaderListSize);
|
||||
}
|
||||
|
||||
Integer maxFrameSize = settings.maxFrameSize();
|
||||
if (maxFrameSize != null) {
|
||||
frameSizePolicy.maxFrameSize(maxFrameSize);
|
||||
}
|
||||
|
||||
Integer initialWindowSize = settings.initialWindowSize();
|
||||
if (initialWindowSize != null) {
|
||||
outbound.initialOutboundWindowSize(initialWindowSize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes the client connection preface string from the input buffer.
|
||||
*
|
||||
* @return {@code true} if processing of the client preface string is complete. Since client preface strings can
|
||||
* only be received by servers, returns true immediately for client endpoints.
|
||||
*/
|
||||
private boolean readClientPrefaceString(ChannelHandlerContext ctx, ByteBuf in) {
|
||||
if (clientPrefaceString == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
int prefaceRemaining = clientPrefaceString.readableBytes();
|
||||
int bytesRead = Math.min(in.readableBytes(), prefaceRemaining);
|
||||
|
||||
// Read the portion of the input up to the length of the preface, if reached.
|
||||
ByteBuf sourceSlice = in.readSlice(bytesRead);
|
||||
|
||||
// Read the same number of bytes from the preface buffer.
|
||||
ByteBuf prefaceSlice = clientPrefaceString.readSlice(bytesRead);
|
||||
|
||||
// If the input so far doesn't match the preface, break the connection.
|
||||
if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) {
|
||||
ctx.close();
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!clientPrefaceString.isReadable()) {
|
||||
// Entire preface has been read.
|
||||
clientPrefaceString.release();
|
||||
clientPrefaceString = null;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles all inbound frames from the network.
|
||||
*/
|
||||
private final class FrameReadListener implements Http2FrameListener {
|
||||
|
||||
@Override
|
||||
public void onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
|
||||
boolean endOfStream) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
// Check if we received a data frame for a stream which is half-closed
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
stream.verifyState(STREAM_CLOSED, OPEN, HALF_CLOSED_LOCAL);
|
||||
|
||||
// Apply flow control.
|
||||
inboundFlow.onDataRead(ctx, streamId, data, padding, endOfStream);
|
||||
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (shouldIgnoreFrame(stream)) {
|
||||
// Ignore this frame.
|
||||
return;
|
||||
}
|
||||
|
||||
listener.onDataRead(ctx, streamId, data, padding, endOfStream);
|
||||
|
||||
if (endOfStream) {
|
||||
closeRemoteSide(stream, ctx.newSucceededFuture());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the HTTP/2 connection preface has been received from the remote endpoint.
|
||||
*/
|
||||
private void verifyPrefaceReceived() throws Http2Exception {
|
||||
if (!prefaceReceived) {
|
||||
throw protocolError("Received non-SETTINGS as first frame.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a hook to close the
|
||||
* channel after the given future completes.
|
||||
*
|
||||
* @param stream the stream to be half closed.
|
||||
* @param future If closing, the future after which to close the channel. If {@code null}, ignored.
|
||||
*/
|
||||
private void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
|
||||
switch (stream.state()) {
|
||||
case HALF_CLOSED_REMOTE:
|
||||
case OPEN:
|
||||
stream.closeRemoteSide();
|
||||
break;
|
||||
default:
|
||||
connection.close(stream, future, outbound.closeListener());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
|
||||
boolean endStream) throws Http2Exception {
|
||||
onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
|
||||
short weight, boolean exclusive, int padding, boolean endStream) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (connection.remote().isGoAwayReceived() || stream != null && shouldIgnoreFrame(stream)) {
|
||||
// Ignore this frame.
|
||||
return;
|
||||
}
|
||||
|
||||
if (stream == null) {
|
||||
stream = connection.createRemoteStream(streamId, endStream);
|
||||
} else {
|
||||
if (stream.state() == RESERVED_REMOTE) {
|
||||
// Received headers for a reserved push stream ... open it for push to the local endpoint.
|
||||
stream.verifyState(PROTOCOL_ERROR, RESERVED_REMOTE);
|
||||
stream.openForPush();
|
||||
} else {
|
||||
// Receiving headers on an existing stream. Make sure the stream is in an allowed state.
|
||||
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_LOCAL);
|
||||
}
|
||||
}
|
||||
|
||||
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
|
||||
|
||||
stream.setPriority(streamDependency, weight, exclusive);
|
||||
|
||||
// If the headers completes this stream, close it.
|
||||
if (endStream) {
|
||||
closeRemoteSide(stream, ctx.newSucceededFuture());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
|
||||
boolean exclusive) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (stream.state() == CLOSED || shouldIgnoreFrame(stream)) {
|
||||
// Ignore frames for any stream created after we sent a go-away.
|
||||
return;
|
||||
}
|
||||
|
||||
listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
|
||||
|
||||
stream.setPriority(streamDependency, weight, exclusive);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (stream.state() == CLOSED) {
|
||||
// RstStream frames must be ignored for closed streams.
|
||||
return;
|
||||
}
|
||||
|
||||
stream.terminateReceived();
|
||||
|
||||
listener.onRstStreamRead(ctx, streamId, errorCode);
|
||||
|
||||
connection.close(stream, ctx.newSucceededFuture(), outbound.closeListener());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
// Apply oldest outstanding local settings here. This is a synchronization point
|
||||
// between endpoints.
|
||||
Http2Settings settings = outbound.pollSettings();
|
||||
|
||||
if (settings != null) {
|
||||
applyLocalSettings(settings);
|
||||
}
|
||||
|
||||
listener.onSettingsAckRead(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies settings sent from the local endpoint.
|
||||
*/
|
||||
private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
|
||||
Boolean pushEnabled = settings.pushEnabled();
|
||||
final Http2FrameReader.Configuration config = frameReader.configuration();
|
||||
final Http2HeaderTable headerTable = config.headerTable();
|
||||
final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
|
||||
if (pushEnabled != null) {
|
||||
if (connection.isServer()) {
|
||||
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
|
||||
}
|
||||
connection.local().allowPushTo(pushEnabled);
|
||||
}
|
||||
|
||||
Long maxConcurrentStreams = settings.maxConcurrentStreams();
|
||||
if (maxConcurrentStreams != null) {
|
||||
int value = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE);
|
||||
connection.remote().maxStreams(value);
|
||||
}
|
||||
|
||||
Long headerTableSize = settings.headerTableSize();
|
||||
if (headerTableSize != null) {
|
||||
headerTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
Integer maxHeaderListSize = settings.maxHeaderListSize();
|
||||
if (maxHeaderListSize != null) {
|
||||
headerTable.maxHeaderListSize(maxHeaderListSize);
|
||||
}
|
||||
|
||||
Integer maxFrameSize = settings.maxFrameSize();
|
||||
if (maxFrameSize != null) {
|
||||
frameSizePolicy.maxFrameSize(maxFrameSize);
|
||||
}
|
||||
|
||||
Integer initialWindowSize = settings.initialWindowSize();
|
||||
if (initialWindowSize != null) {
|
||||
inboundFlow.initialInboundWindowSize(initialWindowSize);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
|
||||
applyRemoteSettings(settings);
|
||||
|
||||
// Acknowledge receipt of the settings.
|
||||
outbound.writeSettingsAck(ctx, ctx.newPromise());
|
||||
ctx.flush();
|
||||
|
||||
// We've received at least one non-ack settings frame from the remote endpoint.
|
||||
prefaceReceived = true;
|
||||
|
||||
listener.onSettingsRead(ctx, settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
// Send an ack back to the remote client.
|
||||
// Need to retain the buffer here since it will be released after the write completes.
|
||||
outbound.writePing(ctx, true, data.retain(), ctx.newPromise());
|
||||
ctx.flush();
|
||||
|
||||
listener.onPingRead(ctx, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
listener.onPingAckRead(ctx, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
|
||||
Http2Headers headers, int padding) throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
Http2Stream parentStream = connection.requireStream(streamId);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(parentStream);
|
||||
if (shouldIgnoreFrame(parentStream)) {
|
||||
// Ignore frames for any stream created after we sent a go-away.
|
||||
return;
|
||||
}
|
||||
|
||||
// Reserve the push stream based with a priority based on the current stream's priority.
|
||||
connection.remote().reservePushStream(promisedStreamId, parentStream);
|
||||
|
||||
listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
|
||||
throws Http2Exception {
|
||||
// Don't allow any more connections to be created.
|
||||
connection.local().goAwayReceived(lastStreamId);
|
||||
|
||||
listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
|
||||
throws Http2Exception {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (stream.state() == CLOSED || shouldIgnoreFrame(stream)) {
|
||||
// Ignore frames for any stream created after we sent a go-away.
|
||||
return;
|
||||
}
|
||||
|
||||
// Update the outbound flow controller.
|
||||
outbound.updateOutboundWindowSize(streamId, windowSizeIncrement);
|
||||
|
||||
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
|
||||
ByteBuf payload) {
|
||||
listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether or not frames for the given stream should be ignored based on the state of the
|
||||
* stream/connection.
|
||||
*/
|
||||
private boolean shouldIgnoreFrame(Http2Stream stream) {
|
||||
if (connection.remote().isGoAwayReceived() && connection.remote().lastStreamCreated() <= stream.id()) {
|
||||
// Frames from streams created after we sent a go-away should be ignored.
|
||||
// Frames for the connection stream ID (i.e. 0) will always be allowed.
|
||||
return true;
|
||||
}
|
||||
|
||||
// Also ignore inbound frames after we sent a RST_STREAM frame.
|
||||
return stream.isTerminateSent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws an
|
||||
* exception.
|
||||
*/
|
||||
private void verifyGoAwayNotReceived() throws Http2Exception {
|
||||
if (connection.local().isGoAwayReceived()) {
|
||||
throw protocolError("Received frames after receiving GO_AWAY");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that a RST_STREAM frame was not previously received for the given stream. If it was, throws an
|
||||
* exception.
|
||||
*/
|
||||
private void verifyRstStreamNotReceived(Http2Stream stream) throws Http2Exception {
|
||||
if (stream != null && stream.isTerminateReceived()) {
|
||||
throw new Http2StreamException(stream.id(), STREAM_CLOSED,
|
||||
"Frame received after receiving RST_STREAM for stream: " + stream.id());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
* back the listener.
|
||||
*/
|
||||
public class Http2InboundFrameLogger implements Http2FrameReader {
|
||||
|
||||
private final Http2FrameReader reader;
|
||||
private final Http2FrameLogger logger;
|
||||
|
||||
@ -145,32 +144,7 @@ public class Http2InboundFrameLogger implements Http2FrameReader {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderTableSize(long max) {
|
||||
reader.maxHeaderTableSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maxHeaderTableSize() {
|
||||
return reader.maxHeaderTableSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxFrameSize(int max) {
|
||||
reader.maxFrameSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxFrameSize() {
|
||||
return reader.maxFrameSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderListSize(int max) {
|
||||
reader.maxHeaderListSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderListSize() {
|
||||
return reader.maxHeaderListSize();
|
||||
public Configuration configuration() {
|
||||
return reader.configuration();
|
||||
}
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ public abstract class Http2OrHttpChooser extends ByteToMessageDecoder {
|
||||
/**
|
||||
* Create the {@link io.netty.channel.ChannelHandler} that is responsible for handling the http
|
||||
* responses when the when the {@link SelectedProtocol} was {@link SelectedProtocol#HTTP_2}. The
|
||||
* returned class should be a subclass of {@link AbstractHttp2ConnectionHandler}.
|
||||
* returned class should be a subclass of {@link Http2ConnectionHandler}.
|
||||
*/
|
||||
protected abstract ChannelHandler createHttp2RequestHandler();
|
||||
}
|
||||
|
@ -0,0 +1,436 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception;
|
||||
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
|
||||
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
/**
|
||||
* Provides the ability to write HTTP/2 frames
|
||||
* <p>
|
||||
* This class provides write methods which turn java calls into HTTP/2 frames
|
||||
* <p>
|
||||
* This interface enforces outbound flow control functionality through {@link Http2OutboundFlowController}
|
||||
*/
|
||||
public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2OutboundFlowController, Closeable {
|
||||
private final Http2FrameWriter frameWriter;
|
||||
private final Http2Connection connection;
|
||||
private final Http2OutboundFlowController outboundFlow;
|
||||
// We prefer ArrayDeque to LinkedList because later will produce more GC.
|
||||
// This initial capacity is plenty for SETTINGS traffic.
|
||||
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
|
||||
private ChannelFutureListener closeListener;
|
||||
|
||||
public Http2OutboundConnectionAdapter(Http2Connection connection, Http2FrameWriter frameWriter) {
|
||||
this(connection, frameWriter, new DefaultHttp2OutboundFlowController(connection, frameWriter));
|
||||
}
|
||||
|
||||
public Http2OutboundConnectionAdapter(Http2Connection connection, Http2FrameWriter frameWriter,
|
||||
Http2OutboundFlowController outboundFlow) {
|
||||
this.frameWriter = frameWriter;
|
||||
this.connection = connection;
|
||||
this.outboundFlow = outboundFlow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
|
||||
final boolean endStream, ChannelPromise promise) {
|
||||
boolean release = true;
|
||||
try {
|
||||
if (connection.isGoAway()) {
|
||||
throw protocolError("Sending data after connection going away.");
|
||||
}
|
||||
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
|
||||
|
||||
// Hand control of the frame to the flow controller.
|
||||
ChannelFuture future = outboundFlow.writeData(ctx, streamId, data, padding, endStream, promise);
|
||||
release = false;
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
// The write failed, handle the error.
|
||||
onHttp2Exception(ctx, toHttp2Exception(future.cause()));
|
||||
} else if (endStream) {
|
||||
// Close the local side of the stream if this is the last frame
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
closeLocalSide(stream, ctx.newPromise());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return future;
|
||||
} catch (Http2Exception e) {
|
||||
if (release) {
|
||||
data.release();
|
||||
}
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
|
||||
boolean endStream, ChannelPromise promise) {
|
||||
return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
|
||||
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
|
||||
ChannelPromise promise) {
|
||||
try {
|
||||
if (connection.isGoAway()) {
|
||||
throw protocolError("Sending headers after connection going away.");
|
||||
}
|
||||
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
if (stream == null) {
|
||||
// Create a new locally-initiated stream.
|
||||
stream = connection.createLocalStream(streamId, endStream);
|
||||
} else {
|
||||
// An existing stream...
|
||||
if (stream.state() == RESERVED_LOCAL) {
|
||||
// Sending headers on a reserved push stream ... open it for push to the remote
|
||||
// endpoint.
|
||||
stream.openForPush();
|
||||
} else {
|
||||
// The stream already exists, make sure it's in an allowed state.
|
||||
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
|
||||
|
||||
// Update the priority for this stream only if we'll be sending more data.
|
||||
if (!endStream) {
|
||||
stream.setPriority(streamDependency, weight, exclusive);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight,
|
||||
exclusive, padding, endStream, promise);
|
||||
ctx.flush();
|
||||
|
||||
// If the headers are the end of the stream, close it now.
|
||||
if (endStream) {
|
||||
closeLocalSide(stream, promise);
|
||||
}
|
||||
|
||||
return future;
|
||||
} catch (Http2Exception e) {
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
|
||||
boolean exclusive, ChannelPromise promise) {
|
||||
try {
|
||||
if (connection.isGoAway()) {
|
||||
throw protocolError("Sending priority after connection going away.");
|
||||
}
|
||||
|
||||
// Update the priority on this stream.
|
||||
connection.requireStream(streamId).setPriority(streamDependency, weight, exclusive);
|
||||
|
||||
ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive,
|
||||
promise);
|
||||
ctx.flush();
|
||||
return future;
|
||||
} catch (Http2Exception e) {
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
|
||||
ChannelPromise promise) {
|
||||
return writeRstStream(ctx, streamId, errorCode, promise, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a RST_STREAM frame to the remote endpoint.
|
||||
* @param ctx the context to use for writing.
|
||||
* @param streamId the stream for which to send the frame.
|
||||
* @param errorCode the error code indicating the nature of the failure.
|
||||
* @param promise the promise for the write.
|
||||
* @param writeIfNoStream
|
||||
* <ul>
|
||||
* <li>{@code true} will force a write of a RST_STREAM even if the stream object does not exist locally.</li>
|
||||
* <li>{@code false} will only send a RST_STREAM only if the stream is known about locally</li>
|
||||
* </ul>
|
||||
* @return the future for the write.
|
||||
*/
|
||||
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
|
||||
ChannelPromise promise, boolean writeIfNoStream) {
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
if (stream == null && !writeIfNoStream) {
|
||||
// The stream may already have been closed ... ignore.
|
||||
promise.setSuccess();
|
||||
return promise;
|
||||
}
|
||||
|
||||
ChannelFuture future = frameWriter.writeRstStream(ctx, streamId, errorCode, promise);
|
||||
ctx.flush();
|
||||
|
||||
if (stream != null) {
|
||||
stream.terminateSent();
|
||||
connection.close(stream, promise, closeListener);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) {
|
||||
outstandingLocalSettingsQueue.add(settings);
|
||||
try {
|
||||
if (connection.isGoAway()) {
|
||||
throw protocolError("Sending settings after connection going away.");
|
||||
}
|
||||
|
||||
Boolean pushEnabled = settings.pushEnabled();
|
||||
if (pushEnabled != null && connection.isServer()) {
|
||||
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
|
||||
}
|
||||
|
||||
ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise);
|
||||
ctx.flush();
|
||||
return future;
|
||||
} catch (Http2Exception e) {
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||
return frameWriter.writeSettingsAck(ctx, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
|
||||
boolean release = true;
|
||||
try {
|
||||
if (connection.isGoAway()) {
|
||||
throw protocolError("Sending ping after connection going away.");
|
||||
}
|
||||
|
||||
frameWriter.writePing(ctx, ack, data, promise);
|
||||
release = false;
|
||||
ctx.flush();
|
||||
return promise;
|
||||
} catch (Http2Exception e) {
|
||||
if (release) {
|
||||
data.release();
|
||||
}
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
|
||||
Http2Headers headers, int padding, ChannelPromise promise) {
|
||||
try {
|
||||
if (connection.isGoAway()) {
|
||||
throw protocolError("Sending push promise after connection going away.");
|
||||
}
|
||||
|
||||
// Reserve the promised stream.
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
connection.local().reservePushStream(promisedStreamId, stream);
|
||||
|
||||
// Write the frame.
|
||||
frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
|
||||
ctx.flush();
|
||||
return promise;
|
||||
} catch (Http2Exception e) {
|
||||
return promise.setFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a GO_AWAY frame to the remote endpoint. Waits until all streams are closed before shutting down the
|
||||
* connection.
|
||||
* @param ctx the handler context
|
||||
* @param promise the promise used to create the close listener.
|
||||
* @param cause connection error that caused this GO_AWAY, or {@code null} if normal termination.
|
||||
*/
|
||||
public void sendGoAway(ChannelHandlerContext ctx, ChannelPromise promise, Http2Exception cause) {
|
||||
ChannelFuture future = null;
|
||||
ChannelPromise closePromise = promise;
|
||||
if (!connection.isGoAway()) {
|
||||
int errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
|
||||
ByteBuf debugData = toByteBuf(ctx, cause);
|
||||
|
||||
future = writeGoAway(ctx, connection.remote().lastStreamCreated(), errorCode, debugData, promise);
|
||||
ctx.flush();
|
||||
closePromise = null;
|
||||
}
|
||||
|
||||
closeListener = getOrCreateCloseListener(ctx, closePromise);
|
||||
|
||||
// If there are no active streams, close immediately after the send is complete.
|
||||
// Otherwise wait until all streams are inactive.
|
||||
if (cause != null || connection.numActiveStreams() == 0) {
|
||||
if (future == null) {
|
||||
future = ctx.newSucceededFuture();
|
||||
}
|
||||
future.addListener(closeListener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
|
||||
ChannelPromise promise) {
|
||||
connection.remote().goAwayReceived(lastStreamId);
|
||||
return frameWriter.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
|
||||
ChannelPromise promise) {
|
||||
return frameWriter.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
|
||||
ByteBuf payload, ChannelPromise promise) {
|
||||
return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes the given exception. Depending on the type of exception, delegates to either
|
||||
* {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or
|
||||
* {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}.
|
||||
*/
|
||||
protected final void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
|
||||
if (e instanceof Http2StreamException) {
|
||||
onStreamError(ctx, (Http2StreamException) e);
|
||||
} else {
|
||||
onConnectionError(ctx, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream.
|
||||
*/
|
||||
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
|
||||
writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise(), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until all streams are
|
||||
* closed before shutting down the connection.
|
||||
*/
|
||||
protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
|
||||
sendGoAway(ctx, ctx.newPromise(), cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* If not already created, creates a new listener for the given promise which, when complete, closes the connection
|
||||
* and frees any resources.
|
||||
*/
|
||||
private ChannelFutureListener getOrCreateCloseListener(final ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||
final ChannelPromise closePromise = promise == null ? ctx.newPromise() : promise;
|
||||
if (closeListener == null) {
|
||||
// If no promise was provided, create a new one.
|
||||
closeListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
ctx.close(closePromise);
|
||||
close();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
closePromise.setSuccess();
|
||||
}
|
||||
|
||||
return closeListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a hook to close the
|
||||
* channel after the given future completes.
|
||||
*
|
||||
* @param stream the stream to be half closed.
|
||||
* @param future If closing, the future after which to close the channel. If {@code null}, ignored.
|
||||
*/
|
||||
private void closeLocalSide(Http2Stream stream, ChannelFuture future) {
|
||||
switch (stream.state()) {
|
||||
case HALF_CLOSED_LOCAL:
|
||||
case OPEN:
|
||||
stream.closeLocalSide();
|
||||
break;
|
||||
default:
|
||||
connection.close(stream, future, closeListener);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
frameWriter.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link Http2Settings} object on the top of the queue that has been sent but not ACKed.
|
||||
* This may return {@code null}.
|
||||
*/
|
||||
public Http2Settings pollSettings() {
|
||||
return outstandingLocalSettingsQueue.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration configuration() {
|
||||
return frameWriter.configuration();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the close listener associated with this object
|
||||
* @return
|
||||
*/
|
||||
public ChannelFutureListener closeListener() {
|
||||
return closeListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception {
|
||||
outboundFlow.initialOutboundWindowSize(newWindowSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int initialOutboundWindowSize() {
|
||||
return outboundFlow.initialOutboundWindowSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateOutboundWindowSize(int streamId, int deltaWindowSize) throws Http2Exception {
|
||||
outboundFlow.updateOutboundWindowSize(streamId, deltaWindowSize);
|
||||
}
|
||||
}
|
@ -26,7 +26,6 @@ import io.netty.channel.ChannelPromise;
|
||||
* writer.
|
||||
*/
|
||||
public class Http2OutboundFrameLogger implements Http2FrameWriter {
|
||||
|
||||
private final Http2FrameWriter writer;
|
||||
private final Http2FrameLogger logger;
|
||||
|
||||
@ -132,32 +131,7 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderTableSize(long max) throws Http2Exception {
|
||||
writer.maxHeaderTableSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maxHeaderTableSize() {
|
||||
return writer.maxHeaderTableSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxFrameSize(int max) {
|
||||
writer.maxFrameSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxFrameSize() {
|
||||
return writer.maxFrameSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void maxHeaderListSize(int max) {
|
||||
writer.maxHeaderListSize(max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int maxHeaderListSize() {
|
||||
return writer.maxHeaderListSize();
|
||||
public Configuration configuration() {
|
||||
return writer.configuration();
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
|
||||
Collections.singletonList(HTTP_UPGRADE_SETTINGS_HEADER);
|
||||
|
||||
private final String handlerName;
|
||||
private final AbstractHttp2ConnectionHandler connectionHandler;
|
||||
private final Http2InboundConnectionHandler connectionHandler;
|
||||
private final Http2FrameReader frameReader;
|
||||
private Http2Settings settings;
|
||||
|
||||
@ -53,7 +53,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
|
||||
*
|
||||
* @param connectionHandler the HTTP/2 connection handler.
|
||||
*/
|
||||
public Http2ServerUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) {
|
||||
public Http2ServerUpgradeCodec(Http2InboundConnectionHandler connectionHandler) {
|
||||
this("http2ConnectionHandler", connectionHandler);
|
||||
}
|
||||
|
||||
@ -64,7 +64,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
|
||||
* @param connectionHandler the HTTP/2 connection handler.
|
||||
*/
|
||||
public Http2ServerUpgradeCodec(String handlerName,
|
||||
AbstractHttp2ConnectionHandler connectionHandler) {
|
||||
Http2InboundConnectionHandler connectionHandler) {
|
||||
if (handlerName == null) {
|
||||
throw new NullPointerException("handlerName");
|
||||
}
|
||||
|
@ -22,22 +22,28 @@ import io.netty.handler.codec.http.FullHttpMessage;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
|
||||
/**
|
||||
* Light weight wrapper around {@link DelegatingHttp2ConnectionHandler} to provide HTTP/1.x object to HTTP/2 encoding
|
||||
* Light weight wrapper around {@link Http2ConnectionHandler} to provide HTTP/1.x objects to HTTP/2 frames
|
||||
* <p>
|
||||
* See {@link InboundHttp2ToHttpAdapter} to get translation from HTTP/2 frames to HTTP/1.x objects
|
||||
*/
|
||||
public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2ConnectionHandler {
|
||||
|
||||
public DelegatingHttp2HttpConnectionHandler(boolean server, Http2FrameListener listener) {
|
||||
public class Http2ToHttpConnectionHandler extends Http2ConnectionHandler {
|
||||
public Http2ToHttpConnectionHandler(boolean server, Http2FrameListener listener) {
|
||||
super(server, listener);
|
||||
}
|
||||
|
||||
public DelegatingHttp2HttpConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
|
||||
Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow,
|
||||
Http2OutboundFlowController outboundFlow, Http2FrameListener listener) {
|
||||
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, listener);
|
||||
public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
|
||||
super(connection, listener);
|
||||
}
|
||||
|
||||
public DelegatingHttp2HttpConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
|
||||
super(connection, listener);
|
||||
public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameListener listener,
|
||||
Http2FrameReader frameReader, Http2FrameWriter frameWriter) {
|
||||
super(connection, listener, frameReader, frameWriter);
|
||||
}
|
||||
|
||||
public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameListener listener,
|
||||
Http2FrameReader frameReader, Http2InboundFlowController inboundFlow,
|
||||
Http2OutboundConnectionAdapter outbound) {
|
||||
super(connection, listener, frameReader, inboundFlow, outbound);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -51,13 +57,13 @@ public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2Connect
|
||||
int streamId = 0;
|
||||
String value = httpHeaders.get(HttpUtil.ExtensionHeaderNames.STREAM_ID.text());
|
||||
if (value == null) {
|
||||
streamId = nextStreamId();
|
||||
streamId = connection.local().nextStreamId();
|
||||
} else {
|
||||
try {
|
||||
streamId = Integer.parseInt(value);
|
||||
} catch (NumberFormatException e) {
|
||||
throw Http2Exception.format(Http2Error.INTERNAL_ERROR,
|
||||
"Invalid user-specified stream id value '%s'", value);
|
||||
throw Http2Exception.format(Http2Error.INTERNAL_ERROR, "Invalid user-specified stream id value '%s'",
|
||||
value);
|
||||
}
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ public class DefaultHttp2HeadersEncoderTest {
|
||||
@Test(expected = Http2Exception.class)
|
||||
public void headersExceedMaxSetSizeShouldFail() throws Http2Exception {
|
||||
Http2Headers headers = headers();
|
||||
encoder.maxHeaderListSize(2);
|
||||
encoder.headerTable().maxHeaderListSize(2);
|
||||
encoder.encodeHeaders(headers, Unpooled.buffer());
|
||||
}
|
||||
|
||||
|
@ -32,6 +32,7 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController.OutboundFlowState;
|
||||
import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
|
||||
@ -63,6 +64,12 @@ public class DefaultHttp2OutboundFlowControllerTest {
|
||||
@Mock
|
||||
private Http2FrameWriter frameWriter;
|
||||
|
||||
@Mock
|
||||
private Http2FrameSizePolicy frameWriterSizePolicy;
|
||||
|
||||
@Mock
|
||||
private Configuration frameWriterConfiguration;
|
||||
|
||||
@Mock
|
||||
private ChannelHandlerContext ctx;
|
||||
|
||||
@ -134,7 +141,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
|
||||
|
||||
@Test
|
||||
public void frameLargerThanMaxFrameSizeShouldBeSplit() throws Http2Exception {
|
||||
when(frameWriter.maxFrameSize()).thenReturn(3);
|
||||
when(frameWriterSizePolicy.maxFrameSize()).thenReturn(3);
|
||||
|
||||
final ByteBuf data = dummyData(5, 0);
|
||||
try {
|
||||
@ -157,7 +164,7 @@ public class DefaultHttp2OutboundFlowControllerTest {
|
||||
|
||||
@Test
|
||||
public void frameShouldSplitForMaxFrameSize() throws Http2Exception {
|
||||
when(frameWriter.maxFrameSize()).thenReturn(5);
|
||||
when(frameWriterSizePolicy.maxFrameSize()).thenReturn(5);
|
||||
final ByteBuf data = dummyData(10, 0);
|
||||
try {
|
||||
ByteBuf slice1 = data.slice(0, 5);
|
||||
@ -1223,7 +1230,9 @@ public class DefaultHttp2OutboundFlowControllerTest {
|
||||
|
||||
private void resetFrameWriter() {
|
||||
Mockito.reset(frameWriter);
|
||||
when(frameWriter.maxFrameSize()).thenReturn(Integer.MAX_VALUE);
|
||||
when(frameWriter.configuration()).thenReturn(frameWriterConfiguration);
|
||||
when(frameWriterConfiguration.frameSizePolicy()).thenReturn(frameWriterSizePolicy);
|
||||
when(frameWriterSizePolicy.maxFrameSize()).thenReturn(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
private int window(int streamId) {
|
||||
|
@ -66,9 +66,9 @@ import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/**
|
||||
* Testing the {@link DelegatingHttp2HttpConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames
|
||||
* Testing the {@link Http2ToHttpConnectionHandler} for {@link FullHttpRequest} objects into HTTP/2 frames
|
||||
*/
|
||||
public class DelegatingHttp2HttpConnectionHandlerTest {
|
||||
public class DefaultHttp2ToHttpConnectionHandlerTest {
|
||||
private static final int CONNECTION_SETUP_READ_COUNT = 2;
|
||||
|
||||
@Mock
|
||||
@ -100,7 +100,7 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
serverFrameCountDown = new Http2TestUtil.FrameCountDown(serverListener, requestLatch);
|
||||
p.addLast(new DelegatingHttp2ConnectionHandler(true, serverFrameCountDown));
|
||||
p.addLast(new Http2ToHttpConnectionHandler(true, serverFrameCountDown));
|
||||
p.addLast(ignoreSettingsHandler());
|
||||
}
|
||||
});
|
||||
@ -111,7 +111,7 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
p.addLast(new DelegatingHttp2HttpConnectionHandler(false, clientListener));
|
||||
p.addLast(new Http2ToHttpConnectionHandler(false, clientListener));
|
||||
p.addLast(ignoreSettingsHandler());
|
||||
}
|
||||
});
|
@ -39,6 +39,7 @@ import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyShort;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
@ -64,15 +65,17 @@ import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/**
|
||||
* Tests for {@link DelegatingHttp2ConnectionHandlerTest} and its base class {@link AbstractHttp2ConnectionHandler}.
|
||||
* Tests for {@link Http2ConnectionHandler}
|
||||
*/
|
||||
public class DelegatingHttp2ConnectionHandlerTest {
|
||||
public class Http2ConnectionHandlerTest {
|
||||
private static final int STREAM_ID = 1;
|
||||
private static final int PUSH_STREAM_ID = 2;
|
||||
|
||||
private DelegatingHttp2ConnectionHandler handler;
|
||||
private Http2ConnectionHandler handler;
|
||||
|
||||
@Mock
|
||||
private Http2Connection connection;
|
||||
@ -115,6 +118,24 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
@Mock
|
||||
private Http2FrameWriter writer;
|
||||
|
||||
@Mock
|
||||
private Http2HeaderTable readerTable;
|
||||
|
||||
@Mock
|
||||
private Http2HeaderTable writerTable;
|
||||
|
||||
@Mock
|
||||
private Http2FrameSizePolicy readerFrameSizePolicy;
|
||||
|
||||
@Mock
|
||||
private Http2FrameSizePolicy writerFrameSizePolicy;
|
||||
|
||||
@Mock
|
||||
private Http2FrameReader.Configuration readerConfiguration;
|
||||
|
||||
@Mock
|
||||
private Http2FrameWriter.Configuration writerConfiguration;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
@ -130,6 +151,27 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
|
||||
when(connection.local()).thenReturn(local);
|
||||
when(connection.remote()).thenReturn(remote);
|
||||
doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
((Http2Stream) invocation.getArguments()[0]).close();
|
||||
return null;
|
||||
}
|
||||
}).when(connection).close(any(Http2Stream.class), any(ChannelFuture.class), any(ChannelFutureListener.class));
|
||||
doAnswer(new Answer<Http2Stream>() {
|
||||
@Override
|
||||
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
|
||||
Object[] args = invocation.getArguments();
|
||||
return local.createStream((Integer) args[0], (Boolean) args[1]);
|
||||
}
|
||||
}).when(connection).createLocalStream(anyInt(), anyBoolean());
|
||||
doAnswer(new Answer<Http2Stream>() {
|
||||
@Override
|
||||
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
|
||||
Object[] args = invocation.getArguments();
|
||||
return remote.createStream((Integer) args[0], (Boolean) args[1]);
|
||||
}
|
||||
}).when(connection).createRemoteStream(anyInt(), anyBoolean());
|
||||
when(local.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
|
||||
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
|
||||
when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
|
||||
@ -140,7 +182,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
.thenReturn(future);
|
||||
mockContext();
|
||||
|
||||
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
|
||||
handler = newConnectionHandler();
|
||||
|
||||
// Simulate activation of the handler to force writing the initial settings.
|
||||
Http2Settings settings = new Http2Settings();
|
||||
@ -153,11 +195,17 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
when(inboundFlow.initialInboundWindowSize()).thenReturn(10);
|
||||
when(local.allowPushTo()).thenReturn(true);
|
||||
when(remote.maxStreams()).thenReturn(100);
|
||||
when(reader.maxHeaderTableSize()).thenReturn(200L);
|
||||
when(reader.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE);
|
||||
when(writer.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE);
|
||||
when(reader.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
|
||||
when(writer.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
|
||||
when(reader.configuration()).thenReturn(readerConfiguration);
|
||||
when(writer.configuration()).thenReturn(writerConfiguration);
|
||||
when(readerConfiguration.frameSizePolicy()).thenReturn(readerFrameSizePolicy);
|
||||
when(writerConfiguration.frameSizePolicy()).thenReturn(writerFrameSizePolicy);
|
||||
when(readerFrameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE);
|
||||
when(writerFrameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE);
|
||||
when(readerConfiguration.headerTable()).thenReturn(readerTable);
|
||||
when(writerConfiguration.headerTable()).thenReturn(writerTable);
|
||||
when(readerTable.maxHeaderTableSize()).thenReturn(200);
|
||||
when(readerTable.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
|
||||
when(writerTable.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
|
||||
handler.handlerAdded(ctx);
|
||||
verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise));
|
||||
|
||||
@ -174,6 +222,11 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
handler.handlerAdded(ctx);
|
||||
}
|
||||
|
||||
private Http2ConnectionHandler newConnectionHandler() {
|
||||
return new Http2ConnectionHandler(connection, listener, reader, inboundFlow,
|
||||
new Http2OutboundConnectionAdapter(connection, writer, outboundFlow));
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
handler.handlerRemoved(ctx);
|
||||
@ -182,7 +235,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
@Test
|
||||
public void clientShouldSendClientPrefaceStringWhenActive() throws Exception {
|
||||
when(connection.isServer()).thenReturn(false);
|
||||
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
|
||||
handler = newConnectionHandler();
|
||||
handler.channelActive(ctx);
|
||||
verify(ctx).write(eq(connectionPrefaceBuf()));
|
||||
}
|
||||
@ -190,7 +243,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
@Test
|
||||
public void serverShouldNotSendClientPrefaceStringWhenActive() throws Exception {
|
||||
when(connection.isServer()).thenReturn(true);
|
||||
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
|
||||
handler = newConnectionHandler();
|
||||
handler.channelActive(ctx);
|
||||
verify(ctx, never()).write(eq(connectionPrefaceBuf()));
|
||||
}
|
||||
@ -198,7 +251,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
@Test
|
||||
public void serverReceivingInvalidClientPrefaceStringShouldCloseConnection() throws Exception {
|
||||
when(connection.isServer()).thenReturn(true);
|
||||
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
|
||||
handler = newConnectionHandler();
|
||||
handler.channelRead(ctx, copiedBuffer("BAD_PREFACE", UTF_8));
|
||||
verify(ctx).close();
|
||||
}
|
||||
@ -207,7 +260,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
public void serverReceivingValidClientPrefaceStringShouldContinueReadingFrames() throws Exception {
|
||||
reset(listener);
|
||||
when(connection.isServer()).thenReturn(true);
|
||||
handler = new DelegatingHttp2ConnectionHandler(connection, reader, writer, inboundFlow, outboundFlow, listener);
|
||||
handler = newConnectionHandler();
|
||||
handler.channelRead(ctx, connectionPrefaceBuf());
|
||||
verify(ctx, never()).close();
|
||||
decode().onSettingsRead(ctx, new Http2Settings());
|
||||
@ -434,7 +487,7 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
verify(remote).allowPushTo(true);
|
||||
verify(outboundFlow).initialOutboundWindowSize(123);
|
||||
verify(local).maxStreams(456);
|
||||
verify(writer).maxHeaderTableSize(789L);
|
||||
verify(writerTable).maxHeaderTableSize(789);
|
||||
// Take into account the time this was called during setup().
|
||||
verify(writer, times(2)).writeSettingsAck(eq(ctx), eq(promise));
|
||||
verify(listener).onSettingsRead(eq(ctx), eq(settings));
|
||||
@ -621,13 +674,13 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
@Test
|
||||
public void pingWriteAfterGoAwayShouldFail() throws Exception {
|
||||
when(connection.isGoAway()).thenReturn(true);
|
||||
ChannelFuture future = handler.writePing(ctx, emptyPingBuf(), promise);
|
||||
ChannelFuture future = handler.writePing(ctx, false, emptyPingBuf(), promise);
|
||||
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pingWriteShouldSucceed() throws Exception {
|
||||
handler.writePing(ctx, emptyPingBuf(), promise);
|
||||
handler.writePing(ctx, false, emptyPingBuf(), promise);
|
||||
verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise));
|
||||
}
|
||||
|
||||
@ -651,13 +704,13 @@ public class DelegatingHttp2ConnectionHandlerTest {
|
||||
verify(inboundFlow, never()).initialInboundWindowSize(eq(100));
|
||||
verify(local, never()).allowPushTo(eq(false));
|
||||
verify(remote, never()).maxStreams(eq(1000));
|
||||
verify(reader, never()).maxHeaderTableSize(eq(2000L));
|
||||
verify(readerTable, never()).maxHeaderTableSize(eq(2000));
|
||||
// Verify that settings values are applied on the reception of SETTINGS ACK
|
||||
decode().onSettingsAckRead(ctx);
|
||||
verify(inboundFlow).initialInboundWindowSize(eq(100));
|
||||
verify(local).allowPushTo(eq(false));
|
||||
verify(remote).maxStreams(eq(1000));
|
||||
verify(reader).maxHeaderTableSize(eq(2000L));
|
||||
verify(readerTable).maxHeaderTableSize(eq(2000));
|
||||
}
|
||||
|
||||
private static ByteBuf dummyData() {
|
@ -77,7 +77,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
@Mock
|
||||
private Http2FrameListener serverListener;
|
||||
|
||||
private DelegatingHttp2ConnectionHandler http2Client;
|
||||
private Http2ConnectionHandler http2Client;
|
||||
private ServerBootstrap sb;
|
||||
private Bootstrap cb;
|
||||
private Channel serverChannel;
|
||||
@ -102,7 +102,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
serverFrameCountDown = new Http2TestUtil.FrameCountDown(serverListener, requestLatch, dataLatch);
|
||||
p.addLast(new DelegatingHttp2ConnectionHandler(true, serverFrameCountDown));
|
||||
p.addLast(new Http2ConnectionHandler(true, serverFrameCountDown));
|
||||
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
|
||||
}
|
||||
});
|
||||
@ -113,7 +113,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ChannelPipeline p = ch.pipeline();
|
||||
p.addLast(new DelegatingHttp2ConnectionHandler(false, clientListener));
|
||||
p.addLast(new Http2ConnectionHandler(false, clientListener));
|
||||
p.addLast(Http2CodecUtil.ignoreSettingsHandler());
|
||||
}
|
||||
});
|
||||
@ -124,7 +124,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
|
||||
assertTrue(ccf.awaitUninterruptibly().isSuccess());
|
||||
clientChannel = ccf.channel();
|
||||
http2Client = clientChannel.pipeline().get(DelegatingHttp2ConnectionHandler.class);
|
||||
http2Client = clientChannel.pipeline().get(Http2ConnectionHandler.class);
|
||||
}
|
||||
|
||||
@After
|
||||
@ -223,7 +223,7 @@ public class Http2ConnectionRoundtripTest {
|
||||
for (int i = 0, nextStream = 3; i < NUM_STREAMS; ++i, nextStream += 2) {
|
||||
http2Client.writeHeaders(ctx(), nextStream, headers, 0, (short) 16, false, 0, false,
|
||||
newPromise());
|
||||
http2Client.writePing(ctx(), pingData.slice().retain(), newPromise());
|
||||
http2Client.writePing(ctx(), false, pingData.slice().retain(), newPromise());
|
||||
http2Client.writeData(ctx(), nextStream, data.slice().retain(), 0, true, newPromise());
|
||||
}
|
||||
}
|
||||
|
@ -77,10 +77,10 @@ public class Http2HeaderBlockIOTest {
|
||||
|
||||
@Test
|
||||
public void setMaxHeaderSizeShouldBeSuccessful() throws Http2Exception {
|
||||
encoder.maxHeaderTableSize(10);
|
||||
encoder.headerTable().maxHeaderTableSize(10);
|
||||
Http2Headers in = headers();
|
||||
assertRoundtripSuccessful(in);
|
||||
assertEquals(10, decoder.maxHeaderTableSize());
|
||||
assertEquals(10, decoder.headerTable().maxHeaderTableSize());
|
||||
}
|
||||
|
||||
private void assertRoundtripSuccessful(Http2Headers in) throws Http2Exception {
|
||||
|
@ -29,10 +29,8 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
|
||||
import io.netty.handler.codec.http2.Http2OutboundConnectionAdapter;
|
||||
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
|
||||
import io.netty.handler.codec.http2.DelegatingHttp2ConnectionHandler;
|
||||
import io.netty.handler.codec.http2.DelegatingHttp2HttpConnectionHandler;
|
||||
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
|
||||
import io.netty.handler.codec.http2.Http2Connection;
|
||||
import io.netty.handler.codec.http2.Http2FrameLogger;
|
||||
@ -41,6 +39,7 @@ import io.netty.handler.codec.http2.Http2FrameWriter;
|
||||
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
|
||||
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
|
||||
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
|
||||
import io.netty.handler.codec.http2.Http2ToHttpConnectionHandler;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
@ -53,7 +52,7 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
private final SslContext sslCtx;
|
||||
private final int maxContentLength;
|
||||
private DelegatingHttp2ConnectionHandler connectionHandler;
|
||||
private Http2ToHttpConnectionHandler connectionHandler;
|
||||
private HttpResponseHandler responseHandler;
|
||||
private Http2SettingsHandler settingsHandler;
|
||||
|
||||
@ -66,12 +65,12 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
|
||||
public void initChannel(SocketChannel ch) throws Exception {
|
||||
final Http2Connection connection = new DefaultHttp2Connection(false);
|
||||
final Http2FrameWriter frameWriter = frameWriter();
|
||||
connectionHandler = new DelegatingHttp2HttpConnectionHandler(connection,
|
||||
frameReader(), frameWriter,
|
||||
new DefaultHttp2InboundFlowController(connection, frameWriter),
|
||||
new DefaultHttp2OutboundFlowController(connection, frameWriter),
|
||||
new DelegatingDecompressorFrameListener(connection,
|
||||
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)));
|
||||
connectionHandler = new Http2ToHttpConnectionHandler(connection,
|
||||
new DelegatingDecompressorFrameListener(connection,
|
||||
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)),
|
||||
frameReader(),
|
||||
new DefaultHttp2InboundFlowController(connection, frameWriter),
|
||||
new Http2OutboundConnectionAdapter(connection, frameWriter));
|
||||
responseHandler = new HttpResponseHandler();
|
||||
settingsHandler = new Http2SettingsHandler(ch.newPromise());
|
||||
if (sslCtx != null) {
|
||||
|
@ -23,19 +23,20 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.AsciiString;
|
||||
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
|
||||
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2Connection;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2Headers;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
|
||||
import io.netty.handler.codec.http2.Http2Connection;
|
||||
import io.netty.handler.codec.http2.Http2ConnectionHandler;
|
||||
import io.netty.handler.codec.http2.Http2Exception;
|
||||
import io.netty.handler.codec.http2.Http2FrameAdapter;
|
||||
import io.netty.handler.codec.http2.Http2FrameLogger;
|
||||
import io.netty.handler.codec.http2.Http2FrameWriter;
|
||||
import io.netty.handler.codec.http2.Http2Headers;
|
||||
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
|
||||
import io.netty.handler.codec.http2.Http2OutboundConnectionAdapter;
|
||||
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
@ -43,7 +44,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
/**
|
||||
* A simple handler that responds with the message "Hello World!".
|
||||
*/
|
||||
public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
|
||||
public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
|
||||
|
||||
private static final Http2FrameLogger logger = new Http2FrameLogger(INFO,
|
||||
InternalLoggerFactory.getInstance(HelloWorldHttp2Handler.class));
|
||||
@ -54,10 +55,14 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
|
||||
}
|
||||
|
||||
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) {
|
||||
super(connection, new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger),
|
||||
frameWriter,
|
||||
new DefaultHttp2InboundFlowController(connection, frameWriter),
|
||||
new DefaultHttp2OutboundFlowController(connection, frameWriter));
|
||||
this(connection, frameWriter, new Http2OutboundConnectionAdapter(connection, frameWriter));
|
||||
}
|
||||
|
||||
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter,
|
||||
Http2OutboundConnectionAdapter outbound) {
|
||||
super(connection, new SimpleHttp2FrameListener(outbound),
|
||||
new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger),
|
||||
new DefaultHttp2InboundFlowController(connection, frameWriter), outbound);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -76,43 +81,50 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
|
||||
/**
|
||||
* If receive a frame with end-of-stream set, send a pre-canned response.
|
||||
*/
|
||||
@Override
|
||||
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
|
||||
boolean endOfStream) throws Http2Exception {
|
||||
if (endOfStream) {
|
||||
sendResponse(ctx(), streamId, data.retain());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If receive a frame with end-of-stream set, send a pre-canned response.
|
||||
*/
|
||||
@Override
|
||||
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
|
||||
Http2Headers headers, int streamDependency, short weight,
|
||||
boolean exclusive, int padding, boolean endStream) throws Http2Exception {
|
||||
if (endStream) {
|
||||
sendResponse(ctx(), streamId, RESPONSE_BYTES.duplicate());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a "Hello World" DATA frame to the client.
|
||||
*/
|
||||
private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) {
|
||||
// Send a frame for the response status
|
||||
Http2Headers headers = new DefaultHttp2Headers().status(new AsciiString("200"));
|
||||
writeHeaders(ctx(), streamId, headers, 0, false, ctx().newPromise());
|
||||
private static class SimpleHttp2FrameListener extends Http2FrameAdapter {
|
||||
private Http2OutboundConnectionAdapter outbound;
|
||||
|
||||
writeData(ctx(), streamId, payload, 0, true, ctx().newPromise());
|
||||
}
|
||||
public SimpleHttp2FrameListener(Http2OutboundConnectionAdapter outbound) {
|
||||
this.outbound = outbound;
|
||||
}
|
||||
|
||||
/**
|
||||
* If receive a frame with end-of-stream set, send a pre-canned response.
|
||||
*/
|
||||
@Override
|
||||
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
|
||||
boolean endOfStream) throws Http2Exception {
|
||||
if (endOfStream) {
|
||||
sendResponse(ctx, streamId, data.retain());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If receive a frame with end-of-stream set, send a pre-canned response.
|
||||
*/
|
||||
@Override
|
||||
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
|
||||
Http2Headers headers, int streamDependency, short weight,
|
||||
boolean exclusive, int padding, boolean endStream) throws Http2Exception {
|
||||
if (endStream) {
|
||||
sendResponse(ctx, streamId, RESPONSE_BYTES.duplicate());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a "Hello World" DATA frame to the client.
|
||||
*/
|
||||
private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) {
|
||||
// Send a frame for the response status
|
||||
Http2Headers headers = new DefaultHttp2Headers().status(new AsciiString("200"));
|
||||
outbound.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
|
||||
outbound.writeData(ctx, streamId, payload, 0, true, ctx.newPromise());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user