Fixing bugs in HTTP/2 pipeline exception handling

Motivation:

HTTP/2 codec does not properly test exception passed to
exceptionCaught() for instanceof Http2Exception (since the exception
will always be wrapped in a PipelineException), so it will never
properly handle Http2Exceptions in the pipeline.

Also if any streams are present, the connection close logic will execute
twice when a pipeline exception. This is because the exception logic
calls ctx.close() which then triggers the handleInActive() logic to
execute.  This clears all of the remaining streams and then attempts to
run the closeListener logic (which has already been run).

Modifications:

Changed exceptionCaught logic to properly extract Http2Exception from
the PipelineException.  Also added logic to the closeListener so that is
only run once.

Changed Http2CodecUtil.toHttp2Exception() to avoid NPE when creating
an exception with cause.getMessage().

Refactored Http2ConnectionHandler to more cleanly separate inbound and
outbound flows (Http2ConnectionDecoder/Http2ConnectionEncoder).

Added a test for verifying that a pipeline exception closes the
connection.

Result:

Exception handling logic is tidied up.
This commit is contained in:
nmittler 2014-09-25 15:38:55 -07:00
parent 741ea7766c
commit 2b7f344a01
32 changed files with 1576 additions and 1267 deletions

View File

@ -29,8 +29,7 @@ import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap;
@ -77,10 +76,8 @@ public class DefaultHttp2Connection implements Http2Connection {
* the policy to be used for removal of closed stream.
*/
public DefaultHttp2Connection(boolean server, Http2StreamRemovalPolicy removalPolicy) {
if (removalPolicy == null) {
throw new NullPointerException("removalPolicy");
}
this.removalPolicy = removalPolicy;
this.removalPolicy = checkNotNull(removalPolicy, "removalPolicy");
localEndpoint = new DefaultEndpoint(server);
remoteEndpoint = new DefaultEndpoint(!server);
@ -165,17 +162,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return remote().createStream(streamId, halfClosed);
}
@Override
public void close(Http2Stream stream, ChannelFuture future, ChannelFutureListener closeListener) {
stream.close();
// If this connection is closing and there are no longer any
// active streams, close after the current operation completes.
if (closeListener != null && numActiveStreams() == 0) {
future.addListener(closeListener);
}
}
private void removeStream(DefaultStream stream) {
// Notify the listeners of the event first.
for (Listener listener : listeners) {

View File

@ -15,8 +15,6 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
@ -24,14 +22,10 @@ import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.Collection;
import java.util.List;
/**
@ -42,89 +36,54 @@ import java.util.List;
* <p>
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController}
*/
public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private final Http2FrameListener internalFrameListener = new FrameReadListener();
protected final Http2OutboundConnectionAdapter outbound;
private final Http2FrameListener listener;
private final Http2Connection connection;
private final Http2LifecycleManager lifecycleManager;
private final Http2ConnectionEncoder encoder;
private final Http2FrameReader frameReader;
protected final Http2Connection connection;
private final Http2InboundFlowController inboundFlow;
private ByteBuf clientPrefaceString;
private boolean prefaceSent;
private final Http2FrameListener listener;
private boolean prefaceReceived;
public Http2InboundConnectionHandler(Http2Connection connection, Http2FrameListener listener,
Http2FrameReader frameReader, Http2InboundFlowController inboundFlow,
Http2OutboundConnectionAdapter outbound) {
if (connection == null) {
throw new NullPointerException("connection");
}
if (frameReader == null) {
throw new NullPointerException("frameReader");
}
if (listener == null) {
throw new NullPointerException("listener");
}
if (inboundFlow == null) {
throw new NullPointerException("inboundFlow");
}
if (outbound == null) {
throw new NullPointerException("outbound");
}
this.connection = connection;
this.frameReader = frameReader;
this.listener = listener;
this.outbound = outbound;
this.inboundFlow = inboundFlow;
// Set the expected client preface string. Only servers should receive this.
clientPrefaceString = connection.isServer() ? connectionPrefaceBuf() : null;
public DefaultHttp2ConnectionDecoder(Http2Connection connection, Http2FrameReader frameReader,
Http2InboundFlowController inboundFlow, Http2ConnectionEncoder encoder,
Http2LifecycleManager lifecycleManager, Http2FrameListener listener) {
this.connection = checkNotNull(connection, "connection");
this.frameReader = checkNotNull(frameReader, "frameReader");
this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
this.encoder = checkNotNull(encoder, "encoder");
this.inboundFlow = checkNotNull(inboundFlow, "inboundFlow");
this.listener = checkNotNull(listener, "listener");
}
/**
* Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
* Reserves local stream 1 for the HTTP/2 response.
*/
public void onHttpClientUpgrade() throws Http2Exception {
if (connection.isServer()) {
throw protocolError("Client-side HTTP upgrade requested for a server");
}
if (prefaceSent || prefaceReceived) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Create a local stream used for the HTTP cleartext upgrade.
connection.createLocalStream(HTTP_UPGRADE_STREAM_ID, true);
public Http2Connection connection() {
return connection;
}
/**
* Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
* @param settings the settings for the remote endpoint.
*/
public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
if (!connection.isServer()) {
throw protocolError("Server-side HTTP upgrade requested for a client");
}
if (prefaceSent || prefaceReceived) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Apply the settings but no ACK is necessary.
applyRemoteSettings(settings);
// Create a stream in the half-closed state.
connection.createRemoteStream(HTTP_UPGRADE_STREAM_ID, true);
public Http2FrameListener listener() {
return listener;
}
/**
* Gets the local settings for this endpoint of the HTTP/2 connection.
*/
public Http2Settings settings() {
public Http2LifecycleManager lifecycleManager() {
return lifecycleManager;
}
public boolean prefaceReceived() {
return prefaceReceived;
}
@Override
public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Http2Exception {
frameReader.readFrame(ctx, in, internalFrameListener);
}
@Override
public Http2Settings localSettings() {
Http2Settings settings = new Http2Settings();
final Http2FrameReader.Configuration config = frameReader.configuration();
final Http2HeaderTable headerTable = config.headerTable();
final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
Http2FrameReader.Configuration config = frameReader.configuration();
Http2HeaderTable headerTable = config.headerTable();
Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
settings.initialWindowSize(inboundFlow.initialInboundWindowSize());
settings.maxConcurrentStreams(connection.remote().maxStreams());
settings.headerTableSize(headerTable.maxHeaderTableSize());
@ -137,212 +96,49 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
return settings;
}
/**
* Closes all closable resources and frees any allocated resources.
* <p>
* This does NOT close the {@link Http2OutboundConnectionAdapter} reference in this class
*/
public void close() {
frameReader.close();
if (clientPrefaceString != null) {
clientPrefaceString.release();
clientPrefaceString = null;
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the connection preface to the remote
// endpoint.
sendPreface(ctx);
super.channelActive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// This handler was just added to the context. In case it was handled after
// the connection became active, send the connection preface now.
sendPreface(ctx);
}
@Override
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
close();
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// Avoid NotYetConnectedException
if (!ctx.channel().isActive()) {
ctx.close(promise);
return;
}
outbound.sendGoAway(ctx, promise, null);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture future = ctx.newSucceededFuture();
final Collection<Http2Stream> streams = connection.activeStreams();
for (Http2Stream s : streams.toArray(new Http2Stream[streams.size()])) {
connection.close(s, future, outbound.closeListener());
}
super.channelInactive(ctx);
}
/**
* Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Http2Exception) {
onHttp2Exception(ctx, (Http2Exception) cause);
}
super.exceptionCaught(ctx, cause);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
// Read the remaining of the client preface string if we haven't already.
// If this is a client endpoint, always returns true.
if (!readClientPrefaceString(ctx, in)) {
// Still processing the client preface.
return;
}
frameReader.readFrame(ctx, in, internalFrameListener);
} catch (Http2Exception e) {
onHttp2Exception(ctx, e);
} catch (Throwable e) {
onHttp2Exception(ctx, new Http2Exception(Http2Error.INTERNAL_ERROR, e.getMessage(), e));
}
}
/**
* Processes the given exception. Depending on the type of exception, delegates to either
* {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or
* {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}.
*/
protected final void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
if (e instanceof Http2StreamException) {
onStreamError(ctx, (Http2StreamException) e);
} else {
onConnectionError(ctx, e);
}
}
/**
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until all streams are
* closed before shutting down the connection.
*/
protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
outbound.sendGoAway(ctx, ctx.newPromise(), cause);
}
/**
* Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream.
*/
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
outbound.writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise(), true);
}
/**
* Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
*/
private void sendPreface(final ChannelHandlerContext ctx) {
if (prefaceSent || !ctx.channel().isActive()) {
return;
}
prefaceSent = true;
if (!connection.isServer()) {
// Clients must send the preface string as the first bytes on the connection.
ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
// Both client and server must send their initial settings.
outbound.writeSettings(ctx, settings(), ctx.newPromise()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
/**
* Applies settings received from the remote endpoint.
*/
private void applyRemoteSettings(Http2Settings settings) throws Http2Exception {
public void localSettings(Http2Settings settings) throws Http2Exception {
Boolean pushEnabled = settings.pushEnabled();
final Http2FrameWriter.Configuration config = outbound.configuration();
final Http2HeaderTable headerTable = config.headerTable();
final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
Http2FrameReader.Configuration config = frameReader.configuration();
Http2HeaderTable inboundHeaderTable = config.headerTable();
Http2FrameSizePolicy inboundFrameSizePolicy = config.frameSizePolicy();
if (pushEnabled != null) {
if (!connection.isServer()) {
throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified");
if (connection.isServer()) {
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
}
connection.remote().allowPushTo(pushEnabled);
connection.local().allowPushTo(pushEnabled);
}
Long maxConcurrentStreams = settings.maxConcurrentStreams();
if (maxConcurrentStreams != null) {
int value = (int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE);
connection.local().maxStreams(value);
connection.remote().maxStreams(value);
}
Long headerTableSize = settings.headerTableSize();
if (headerTableSize != null) {
headerTable.maxHeaderTableSize((int) Math.min(headerTableSize.intValue(), Integer.MAX_VALUE));
inboundHeaderTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE));
}
Integer maxHeaderListSize = settings.maxHeaderListSize();
if (maxHeaderListSize != null) {
headerTable.maxHeaderListSize(maxHeaderListSize);
inboundHeaderTable.maxHeaderListSize(maxHeaderListSize);
}
Integer maxFrameSize = settings.maxFrameSize();
if (maxFrameSize != null) {
frameSizePolicy.maxFrameSize(maxFrameSize);
inboundFrameSizePolicy.maxFrameSize(maxFrameSize);
}
Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) {
outbound.initialOutboundWindowSize(initialWindowSize);
inboundFlow.initialInboundWindowSize(initialWindowSize);
}
}
/**
* Decodes the client connection preface string from the input buffer.
*
* @return {@code true} if processing of the client preface string is complete. Since client preface strings can
* only be received by servers, returns true immediately for client endpoints.
*/
private boolean readClientPrefaceString(ChannelHandlerContext ctx, ByteBuf in) {
if (clientPrefaceString == null) {
return true;
}
int prefaceRemaining = clientPrefaceString.readableBytes();
int bytesRead = Math.min(in.readableBytes(), prefaceRemaining);
// Read the portion of the input up to the length of the preface, if reached.
ByteBuf sourceSlice = in.readSlice(bytesRead);
// Read the same number of bytes from the preface buffer.
ByteBuf prefaceSlice = clientPrefaceString.readSlice(bytesRead);
// If the input so far doesn't match the preface, break the connection.
if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) {
ctx.close();
return false;
}
if (!clientPrefaceString.isReadable()) {
// Entire preface has been read.
clientPrefaceString.release();
clientPrefaceString = null;
return true;
}
return false;
@Override
public void close() {
frameReader.close();
}
/**
@ -372,7 +168,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
listener.onDataRead(ctx, streamId, data, padding, endOfStream);
if (endOfStream) {
closeRemoteSide(stream, ctx.newSucceededFuture());
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
}
}
@ -385,25 +181,6 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
}
}
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a hook to close the
* channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel. If {@code null}, ignored.
*/
private void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_REMOTE:
case OPEN:
stream.closeRemoteSide();
break;
default:
connection.close(stream, future, outbound.closeListener());
break;
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception {
@ -436,13 +213,14 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
}
}
listener.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endStream);
listener.onHeadersRead(ctx, streamId, headers,
streamDependency, weight, exclusive, padding, endStream);
stream.setPriority(streamDependency, weight, exclusive);
// If the headers completes this stream, close it.
if (endStream) {
closeRemoteSide(stream, ctx.newSucceededFuture());
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
}
}
@ -479,7 +257,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
listener.onRstStreamRead(ctx, streamId, errorCode);
connection.close(stream, ctx.newSucceededFuture(), outbound.closeListener());
lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
}
@Override
@ -487,7 +265,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
verifyPrefaceReceived();
// Apply oldest outstanding local settings here. This is a synchronization point
// between endpoints.
Http2Settings settings = outbound.pollSettings();
Http2Settings settings = encoder.pollSentSettings();
if (settings != null) {
applyLocalSettings(settings);
@ -540,10 +318,10 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
applyRemoteSettings(settings);
encoder.remoteSettings(settings);
// Acknowledge receipt of the settings.
outbound.writeSettingsAck(ctx, ctx.newPromise());
encoder.writeSettingsAck(ctx, ctx.newPromise());
ctx.flush();
// We've received at least one non-ack settings frame from the remote endpoint.
@ -558,7 +336,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
// Send an ack back to the remote client.
// Need to retain the buffer here since it will be released after the write completes.
outbound.writePing(ctx, true, data.retain(), ctx.newPromise());
encoder.writePing(ctx, true, data.retain(), ctx.newPromise());
ctx.flush();
listener.onPingRead(ctx, data);
@ -613,7 +391,7 @@ public class Http2InboundConnectionHandler extends ByteToMessageDecoder {
}
// Update the outbound flow controller.
outbound.updateOutboundWindowSize(streamId, windowSizeIncrement);
encoder.updateOutboundWindowSize(streamId, windowSizeIncrement);
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
}

View File

@ -15,48 +15,78 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.io.Closeable;
import java.util.ArrayDeque;
/**
* Provides the ability to write HTTP/2 frames
* <p>
* This class provides write methods which turn java calls into HTTP/2 frames
* <p>
* This interface enforces outbound flow control functionality through {@link Http2OutboundFlowController}
* Default implementation of {@link Http2ConnectionEncoder}.
*/
public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2OutboundFlowController, Closeable {
public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
private final Http2FrameWriter frameWriter;
private final Http2Connection connection;
private final Http2OutboundFlowController outboundFlow;
private final Http2LifecycleManager lifecycleManager;
// We prefer ArrayDeque to LinkedList because later will produce more GC.
// This initial capacity is plenty for SETTINGS traffic.
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
private ChannelFutureListener closeListener;
public Http2OutboundConnectionAdapter(Http2Connection connection, Http2FrameWriter frameWriter) {
this(connection, frameWriter, new DefaultHttp2OutboundFlowController(connection, frameWriter));
public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter,
Http2OutboundFlowController outboundFlow, Http2LifecycleManager lifecycleManager) {
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
this.connection = checkNotNull(connection, "connection");
this.outboundFlow = checkNotNull(outboundFlow, "outboundFlow");
this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
}
public Http2OutboundConnectionAdapter(Http2Connection connection, Http2FrameWriter frameWriter,
Http2OutboundFlowController outboundFlow) {
this.frameWriter = frameWriter;
this.connection = connection;
this.outboundFlow = outboundFlow;
@Override
public void remoteSettings(Http2Settings settings) throws Http2Exception {
Boolean pushEnabled = settings.pushEnabled();
Http2FrameWriter.Configuration config = configuration();
Http2HeaderTable outboundHeaderTable = config.headerTable();
Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
if (pushEnabled != null) {
if (!connection.isServer()) {
throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified");
}
connection.remote().allowPushTo(pushEnabled);
}
Long maxConcurrentStreams = settings.maxConcurrentStreams();
if (maxConcurrentStreams != null) {
connection.local().maxStreams((int) Math.min(maxConcurrentStreams, Integer.MAX_VALUE));
}
Long headerTableSize = settings.headerTableSize();
if (headerTableSize != null) {
outboundHeaderTable.maxHeaderTableSize((int) Math.min(headerTableSize, Integer.MAX_VALUE));
}
Integer maxHeaderListSize = settings.maxHeaderListSize();
if (maxHeaderListSize != null) {
outboundHeaderTable.maxHeaderListSize(maxHeaderListSize);
}
Integer maxFrameSize = settings.maxFrameSize();
if (maxFrameSize != null) {
outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
}
Integer initialWindowSize = settings.initialWindowSize();
if (initialWindowSize != null) {
initialOutboundWindowSize(initialWindowSize);
}
}
@Override
@ -79,11 +109,11 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The write failed, handle the error.
onHttp2Exception(ctx, toHttp2Exception(future.cause()));
lifecycleManager.onHttp2Exception(ctx, toHttp2Exception(future.cause()));
} else if (endStream) {
// Close the local side of the stream if this is the last frame
Http2Stream stream = connection.stream(streamId);
closeLocalSide(stream, ctx.newPromise());
lifecycleManager.closeLocalSide(stream, ctx.newPromise());
}
}
});
@ -139,7 +169,7 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou
// If the headers are the end of the stream, close it now.
if (endStream) {
closeLocalSide(stream, promise);
lifecycleManager.closeLocalSide(stream, promise);
}
return future;
@ -171,7 +201,8 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou
@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
return writeRstStream(ctx, streamId, errorCode, promise, false);
// Delegate to the lifecycle manager for proper updating of connection state.
return lifecycleManager.writeRstStream(ctx, streamId, errorCode, promise);
}
/**
@ -201,7 +232,7 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou
if (stream != null) {
stream.terminateSent();
connection.close(stream, promise, closeListener);
lifecycleManager.closeStream(stream, promise);
}
return future;
@ -274,42 +305,10 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou
}
}
/**
* Sends a GO_AWAY frame to the remote endpoint. Waits until all streams are closed before shutting down the
* connection.
* @param ctx the handler context
* @param promise the promise used to create the close listener.
* @param cause connection error that caused this GO_AWAY, or {@code null} if normal termination.
*/
public void sendGoAway(ChannelHandlerContext ctx, ChannelPromise promise, Http2Exception cause) {
ChannelFuture future = null;
ChannelPromise closePromise = promise;
if (!connection.isGoAway()) {
int errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
ByteBuf debugData = toByteBuf(ctx, cause);
future = writeGoAway(ctx, connection.remote().lastStreamCreated(), errorCode, debugData, promise);
ctx.flush();
closePromise = null;
}
closeListener = getOrCreateCloseListener(ctx, closePromise);
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (cause != null || connection.numActiveStreams() == 0) {
if (future == null) {
future = ctx.newSucceededFuture();
}
future.addListener(closeListener);
}
}
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
connection.remote().goAwayReceived(lastStreamId);
return frameWriter.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
return lifecycleManager.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
}
@Override
@ -324,85 +323,13 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou
return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
}
/**
* Processes the given exception. Depending on the type of exception, delegates to either
* {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or
* {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}.
*/
protected final void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
if (e instanceof Http2StreamException) {
onStreamError(ctx, (Http2StreamException) e);
} else {
onConnectionError(ctx, e);
}
}
/**
* Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream.
*/
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise(), true);
}
/**
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until all streams are
* closed before shutting down the connection.
*/
protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
sendGoAway(ctx, ctx.newPromise(), cause);
}
/**
* If not already created, creates a new listener for the given promise which, when complete, closes the connection
* and frees any resources.
*/
private ChannelFutureListener getOrCreateCloseListener(final ChannelHandlerContext ctx, ChannelPromise promise) {
final ChannelPromise closePromise = promise == null ? ctx.newPromise() : promise;
if (closeListener == null) {
// If no promise was provided, create a new one.
closeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ctx.close(closePromise);
close();
}
};
} else {
closePromise.setSuccess();
}
return closeListener;
}
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a hook to close the
* channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel. If {@code null}, ignored.
*/
private void closeLocalSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
case OPEN:
stream.closeLocalSide();
break;
default:
connection.close(stream, future, closeListener);
break;
}
}
@Override
public void close() {
frameWriter.close();
}
/**
* Get the {@link Http2Settings} object on the top of the queue that has been sent but not ACKed.
* This may return {@code null}.
*/
public Http2Settings pollSettings() {
@Override
public Http2Settings pollSentSettings() {
return outstandingLocalSettingsQueue.poll();
}
@ -411,14 +338,6 @@ public class Http2OutboundConnectionAdapter implements Http2FrameWriter, Http2Ou
return frameWriter.configuration();
}
/**
* Get the close listener associated with this object
* @return
*/
public ChannelFutureListener closeListener() {
return closeListener;
}
@Override
public void initialOutboundWindowSize(int newWindowSize) throws Http2Exception {
outboundFlow.initialOutboundWindowSize(newWindowSize);

View File

@ -39,6 +39,7 @@ import static io.netty.handler.codec.http2.Http2FrameTypes.PUSH_PROMISE;
import static io.netty.handler.codec.http2.Http2FrameTypes.RST_STREAM;
import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
import static io.netty.handler.codec.http2.Http2FrameTypes.WINDOW_UPDATE;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelFuture;
@ -183,9 +184,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
ChannelPromise promise) {
try {
if (settings == null) {
throw new NullPointerException("settings");
}
checkNotNull(settings, "settings");
int payloadLength = SETTING_ENTRY_LENGTH * settings.size();
ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payloadLength);
writeFrameHeader(frame, payloadLength, SETTINGS, new Http2Flags(), 0);

View File

@ -19,6 +19,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Exception.flowControlError;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -43,14 +44,8 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2InboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
if (connection == null) {
throw new NullPointerException("connection");
}
if (frameWriter == null) {
throw new NullPointerException("frameWriter");
}
this.connection = connection;
this.frameWriter = frameWriter;
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
// Add a flow state for the connection.
connection.connectionStream().inboundFlow(new InboundFlowState(CONNECTION_STREAM_ID));

View File

@ -15,6 +15,15 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
@ -28,15 +37,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
* Basic implementation of {@link Http2OutboundFlowController}.
*/
@ -60,14 +60,8 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
private ChannelHandlerContext ctx;
public DefaultHttp2OutboundFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
if (connection == null) {
throw new NullPointerException("connection");
}
if (frameWriter == null) {
throw new NullPointerException("frameWriter");
}
this.connection = connection;
this.frameWriter = frameWriter;
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
// Add a flow state for the connection.
connection.connectionStream().outboundFlow(new OutboundFlowState(connection.connectionStream()));
@ -161,15 +155,9 @@ public class DefaultHttp2OutboundFlowController implements Http2OutboundFlowCont
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
if (ctx == null) {
throw new NullPointerException("ctx");
}
if (promise == null) {
throw new NullPointerException("promise");
}
if (data == null) {
throw new NullPointerException("data");
}
checkNotNull(ctx, "ctx");
checkNotNull(promise, "promise");
checkNotNull(data, "data");
if (this.ctx != null && this.ctx != ctx) {
throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
}

View File

@ -14,6 +14,15 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedInt;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedShort;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.ReferenceCountUtil.release;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.base64.Base64;
@ -26,11 +35,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static io.netty.handler.codec.base64.Base64Dialect.*;
import static io.netty.handler.codec.http2.Http2CodecUtil.*;
import static io.netty.util.CharsetUtil.*;
import static io.netty.util.ReferenceCountUtil.*;
/**
* Client-side cleartext upgrade codec from HTTP to HTTP/2.
*/
@ -39,7 +43,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
private static final List<String> UPGRADE_HEADERS = Collections.singletonList(HTTP_UPGRADE_SETTINGS_HEADER);
private final String handlerName;
private final Http2InboundConnectionHandler connectionHandler;
private final Http2ConnectionHandler connectionHandler;
/**
* Creates the codec using a default name for the connection handler when adding to the
@ -47,7 +51,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
*
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ClientUpgradeCodec(Http2InboundConnectionHandler connectionHandler) {
public Http2ClientUpgradeCodec(Http2ConnectionHandler connectionHandler) {
this("http2ConnectionHandler", connectionHandler);
}
@ -58,15 +62,9 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ClientUpgradeCodec(String handlerName,
Http2InboundConnectionHandler connectionHandler) {
if (handlerName == null) {
throw new NullPointerException("handlerName");
}
if (connectionHandler == null) {
throw new NullPointerException("connectionHandler");
}
this.handlerName = handlerName;
this.connectionHandler = connectionHandler;
Http2ConnectionHandler connectionHandler) {
this.handlerName = checkNotNull(handlerName, "handlerName");
this.connectionHandler = checkNotNull(connectionHandler, "connectionHandler");
}
@Override
@ -101,7 +99,7 @@ public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.Upgrade
ByteBuf encodedBuf = null;
try {
// Get the local settings for the handler.
Http2Settings settings = connectionHandler.settings();
Http2Settings settings = connectionHandler.decoder().localSettings();
// Serialize the payload of the SETTINGS frame.
int payloadLength = SETTING_ENTRY_LENGTH * settings.size();

View File

@ -16,8 +16,8 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.format;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
@ -105,10 +105,7 @@ public final class Http2CodecUtil {
@Override
public void setAction(Action action) {
if (action == null) {
throw new NullPointerException("action");
}
this.action = action;
this.action = checkNotNull(action, "action");
}
@Override
@ -135,11 +132,27 @@ public final class Http2CodecUtil {
* Converts the given cause to a {@link Http2Exception} if it isn't already.
*/
public static Http2Exception toHttp2Exception(Throwable cause) {
if (cause instanceof Http2Exception) {
return (Http2Exception) cause;
// Look for an embedded Http2Exception.
Http2Exception httpException = getEmbeddedHttp2Exception(cause);
if (httpException != null) {
return httpException;
}
String msg = cause != null ? cause.getMessage() : "Failed writing the data frame.";
return format(INTERNAL_ERROR, msg);
return new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
}
/**
* Iteratively looks through the causaility chain for the given exception and returns the first
* {@link Http2Exception} or {@code null} if none.
*/
public static Http2Exception getEmbeddedHttp2Exception(Throwable cause) {
while (cause != null) {
if (cause instanceof Http2Exception) {
return (Http2Exception) cause;
}
cause = cause.getCause();
}
return null;
}
/**

View File

@ -15,9 +15,6 @@
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.Collection;
/**
@ -272,12 +269,4 @@ public interface Http2Connection {
* Indicates whether or not either endpoint has received a GOAWAY.
*/
boolean isGoAway();
/**
* Closes the given stream and adds a hook to close the channel after the given future completes.
* @param stream the stream to be closed.
* @param future the future after which to close the channel. If {@code null}, ignored.
* @param closeListener the listener to add to the {@code future} if notification is expected
*/
void close(Http2Stream stream, ChannelFuture future, ChannelFutureListener closeListener);
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.io.Closeable;
import java.util.List;
/**
* Handler for inbound traffic on behalf of {@link Http2ConnectionHandler}.
*/
public interface Http2ConnectionDecoder extends Closeable {
/**
* Called by the {@link Http2ConnectionHandler} to decode the next frame from the input buffer.
*/
void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Http2Exception;
/**
* Gets the local settings for this endpoint of the HTTP/2 connection.
*/
Http2Settings localSettings();
/**
* Sets the local settings for this endpoint of the HTTP/2 connection.
*/
void localSettings(Http2Settings settings) throws Http2Exception;
/**
* Indicates whether or not the first initial {@code SETTINGS} frame was received from the remote endpoint.
*/
boolean prefaceReceived();
@Override
void close();
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
/**
* Handler for outbound traffic on behalf of {@link Http2ConectionHandler}.
*/
public interface Http2ConnectionEncoder extends Http2FrameWriter, Http2OutboundFlowController {
/**
* Gets the local settings on the top of the queue that has been sent but not ACKed. This may
* return {@code null}.
*/
Http2Settings pollSentSettings();
/**
* Sets the settings for the remote endpoint of the HTTP/2 connection.
*/
void remoteSettings(Http2Settings settings) throws Http2Exception;
}

View File

@ -14,114 +14,266 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.Collection;
import java.util.List;
/**
* This class handles writing HTTP/2 frames, delegating responses to a {@link Http2FrameListener},
* and can be inserted into a Netty pipeline.
* Provides the default implementation for processing inbound frame events
* and delegates to a {@link Http2FrameListener}
* <p>
* This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
* <p>
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController}
*/
public class Http2ConnectionHandler extends Http2InboundConnectionHandler implements Http2FrameWriter {
public class Http2ConnectionHandler extends ByteToMessageDecoder {
private final Http2LifecycleManager lifecycleManager;
private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder;
private final Http2Connection connection;
private ByteBuf clientPrefaceString;
private boolean prefaceSent;
public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
this(new DefaultHttp2Connection(server), listener);
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener) {
this(connection, listener, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter());
this(connection, new DefaultHttp2FrameReader(), new DefaultHttp2FrameWriter(), listener);
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener,
Http2FrameReader frameReader, Http2FrameWriter frameWriter) {
this(connection, listener, frameReader, new DefaultHttp2InboundFlowController(connection, frameWriter),
new Http2OutboundConnectionAdapter(connection, frameWriter));
public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameListener listener) {
this(connection, frameReader, frameWriter, new DefaultHttp2InboundFlowController(
connection, frameWriter), new DefaultHttp2OutboundFlowController(connection,
frameWriter), listener);
}
public Http2ConnectionHandler(Http2Connection connection, Http2FrameListener listener,
Http2FrameReader frameReader, Http2InboundFlowController inboundFlow,
Http2OutboundConnectionAdapter outbound) {
super(connection, listener, frameReader, inboundFlow, outbound);
public Http2ConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow, Http2FrameListener listener) {
checkNotNull(frameWriter, "frameWriter");
checkNotNull(inboundFlow, "inboundFlow");
checkNotNull(outboundFlow, "outboundFlow");
checkNotNull(listener, "listener");
this.connection = checkNotNull(connection, "connection");
this.lifecycleManager = new Http2LifecycleManager(connection, frameWriter);
this.encoder =
new DefaultHttp2ConnectionEncoder(connection, frameWriter, outboundFlow,
lifecycleManager);
this.decoder =
new DefaultHttp2ConnectionDecoder(connection, frameReader, inboundFlow, encoder,
lifecycleManager, listener);
clientPrefaceString = clientPrefaceString(connection);
}
public Http2ConnectionHandler(Http2Connection connection, Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder, Http2LifecycleManager lifecycleManager) {
this.connection = checkNotNull(connection, "connection");
this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
this.encoder = checkNotNull(encoder, "encoder");
this.decoder = checkNotNull(decoder, "decoder");
clientPrefaceString = clientPrefaceString(connection);
}
public Http2Connection connection() {
return connection;
}
public Http2LifecycleManager lifecycleManager() {
return lifecycleManager;
}
public Http2ConnectionDecoder decoder() {
return decoder;
}
public Http2ConnectionEncoder encoder() {
return encoder;
}
/**
* Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
* Reserves local stream 1 for the HTTP/2 response.
*/
public void onHttpClientUpgrade() throws Http2Exception {
if (connection.isServer()) {
throw protocolError("Client-side HTTP upgrade requested for a server");
}
if (prefaceSent || decoder.prefaceReceived()) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Create a local stream used for the HTTP cleartext upgrade.
connection.createLocalStream(HTTP_UPGRADE_STREAM_ID, true);
}
/**
* Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
* @param settings the settings for the remote endpoint.
*/
public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
if (!connection.isServer()) {
throw protocolError("Server-side HTTP upgrade requested for a client");
}
if (prefaceSent || decoder.prefaceReceived()) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Apply the settings but no ACK is necessary.
encoder.remoteSettings(settings);
// Create a stream in the half-closed state.
connection.createRemoteStream(HTTP_UPGRADE_STREAM_ID, true);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream, ChannelPromise promise) {
return outbound.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the connection preface to the remote
// endpoint.
sendPreface(ctx);
super.channelActive(ctx);
}
@Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
ChannelPromise promise) {
return outbound.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// This handler was just added to the context. In case it was handled after
// the connection became active, send the connection preface now.
sendPreface(ctx);
}
@Override
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive, ChannelPromise promise) {
return outbound.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
dispose();
}
@Override
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
return outbound.writeRstStream(ctx, streamId, errorCode, promise);
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
lifecycleManager.close(ctx, promise);
}
@Override
public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) {
return outbound.writeSettings(ctx, settings, promise);
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture future = ctx.newSucceededFuture();
final Collection<Http2Stream> streams = connection.activeStreams();
for (Http2Stream s : streams.toArray(new Http2Stream[streams.size()])) {
lifecycleManager.closeStream(s, future);
}
super.channelInactive(ctx);
}
/**
* Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Http2Exception ex = getEmbeddedHttp2Exception(cause);
if (ex != null) {
lifecycleManager.onHttp2Exception(ctx, ex);
} else {
super.exceptionCaught(ctx, cause);
}
}
@Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
return outbound.writeSettingsAck(ctx, promise);
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
// Read the remaining of the client preface string if we haven't already.
// If this is a client endpoint, always returns true.
if (!readClientPrefaceString(ctx, in)) {
// Still processing the client preface.
return;
}
decoder.decodeFrame(ctx, in, out);
} catch (Http2Exception e) {
lifecycleManager.onHttp2Exception(ctx, e);
} catch (Throwable e) {
lifecycleManager.onHttp2Exception(ctx, new Http2Exception(Http2Error.INTERNAL_ERROR, e.getMessage(), e));
}
}
@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
return outbound.writePing(ctx, ack, data, promise);
/**
* Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
*/
private void sendPreface(final ChannelHandlerContext ctx) {
if (prefaceSent || !ctx.channel().isActive()) {
return;
}
prefaceSent = true;
if (!connection.isServer()) {
// Clients must send the preface string as the first bytes on the connection.
ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
// Both client and server must send their initial settings.
encoder.writeSettings(ctx, decoder.localSettings(), ctx.newPromise()).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE);
}
@Override
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding, ChannelPromise promise) {
return outbound.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
/**
* Disposes of all resources.
*/
private void dispose() {
encoder.close();
decoder.close();
if (clientPrefaceString != null) {
clientPrefaceString.release();
clientPrefaceString = null;
}
}
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
return outbound.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
/**
* Decodes the client connection preface string from the input buffer.
*
* @return {@code true} if processing of the client preface string is complete. Since client preface strings can
* only be received by servers, returns true immediately for client endpoints.
*/
private boolean readClientPrefaceString(ChannelHandlerContext ctx, ByteBuf in) throws Http2Exception {
if (clientPrefaceString == null) {
return true;
}
int prefaceRemaining = clientPrefaceString.readableBytes();
int bytesRead = Math.min(in.readableBytes(), prefaceRemaining);
// Read the portion of the input up to the length of the preface, if reached.
ByteBuf sourceSlice = in.readSlice(bytesRead);
// Read the same number of bytes from the preface buffer.
ByteBuf prefaceSlice = clientPrefaceString.readSlice(bytesRead);
// If the input so far doesn't match the preface, break the connection.
if (bytesRead == 0 || !prefaceSlice.equals(sourceSlice)) {
throw protocolError("HTTP/2 client preface string missing or corrupt.");
}
if (!clientPrefaceString.isReadable()) {
// Entire preface has been read.
clientPrefaceString.release();
clientPrefaceString = null;
return true;
}
return false;
}
@Override
public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
ChannelPromise promise) {
return outbound.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
}
@Override
public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload, ChannelPromise promise) {
return outbound.writeFrame(ctx, frameType, streamId, flags, payload, promise);
}
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endStream, ChannelPromise promise) {
return outbound.writeData(ctx, streamId, data, padding, endStream, promise);
}
@Override
public void close() {
outbound.close();
super.close();
}
@Override
public Configuration configuration() {
return outbound.configuration();
/**
* Returns the client preface string if this is a client connection, otherwise returns {@code null}.
*/
private static ByteBuf clientPrefaceString(Http2Connection connection) {
return connection.isServer() ? connectionPrefaceBuf() : null;
}
}

View File

@ -14,6 +14,7 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -24,10 +25,7 @@ public class Http2FrameListenerDecorator implements Http2FrameListener {
protected final Http2FrameListener listener;
public Http2FrameListenerDecorator(Http2FrameListener listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
this.listener = listener;
this.listener = checkNotNull(listener, "listener");
}
@Override

View File

@ -15,6 +15,7 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerAdapter;
@ -40,14 +41,8 @@ public class Http2FrameLogger extends ChannelHandlerAdapter {
}
public Http2FrameLogger(InternalLogLevel level, InternalLogger logger) {
if (level == null) {
throw new NullPointerException("level");
}
if (logger == null) {
throw new NullPointerException("logger");
}
this.level = level;
this.logger = logger;
this.level = checkNotNull(level, "level");
this.logger = checkNotNull(logger, "logger");
}
public void logData(Direction direction, int streamId, ByteBuf data, int padding,

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2FrameLogger.Direction.INBOUND;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -28,14 +29,8 @@ public class Http2InboundFrameLogger implements Http2FrameReader {
private final Http2FrameLogger logger;
public Http2InboundFrameLogger(Http2FrameReader reader, Http2FrameLogger logger) {
if (reader == null) {
throw new NullPointerException("reader");
}
if (logger == null) {
throw new NullPointerException("logger");
}
this.reader = reader;
this.logger = logger;
this.reader = checkNotNull(reader, "reader");
this.logger = checkNotNull(logger, "logger");
}
@Override

View File

@ -0,0 +1,211 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
/**
* Manager for the life cycle of the HTTP/2 connection. Handles graceful shutdown of the channel,
* closing only after all of the streams have closed.
*/
public class Http2LifecycleManager {
private final Http2Connection connection;
private final Http2FrameWriter frameWriter;
private ChannelFutureListener closeListener;
public Http2LifecycleManager(Http2Connection connection, Http2FrameWriter frameWriter) {
this.connection = checkNotNull(connection, "connection");
this.frameWriter = checkNotNull(frameWriter, "frameWriter");
}
/**
* Handles the close processing on behalf of the {@link DelegatingHttp2ConnectionHandler}.
*/
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// Avoid NotYetConnectedException
if (!ctx.channel().isActive()) {
ctx.close(promise);
return;
}
ChannelFuture future = writeGoAway(ctx, null);
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
if (connection.numActiveStreams() == 0) {
future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else {
closeListener = new ClosingChannelFutureListener(ctx, promise);
}
}
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
*/
public void closeLocalSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
case OPEN:
stream.closeLocalSide();
break;
default:
closeStream(stream, future);
break;
}
}
/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes.
*
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
*/
public void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_REMOTE:
case OPEN:
stream.closeRemoteSide();
break;
default:
closeStream(stream, future);
break;
}
}
/**
* Closes the given stream and adds a hook to close the channel after the given future
* completes.
*
* @param stream the stream to be closed.
* @param future the future after which to close the channel.
*/
public void closeStream(Http2Stream stream, ChannelFuture future) {
stream.close();
// If this connection is closing and there are no longer any
// active streams, close after the current operation completes.
if (closeListener != null && connection.numActiveStreams() == 0) {
future.addListener(closeListener);
}
}
/**
* Processes the given exception. Depending on the type of exception, delegates to either
* {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} or
* {@link #onStreamError(ChannelHandlerContext, Http2StreamException)}.
*/
public void onHttp2Exception(ChannelHandlerContext ctx, Http2Exception e) {
if (e instanceof Http2StreamException) {
onStreamError(ctx, (Http2StreamException) e);
} else {
onConnectionError(ctx, e);
}
}
/**
* Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint and waits until
* all streams are closed before shutting down the connection.
*/
private void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
writeGoAway(ctx, cause).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
}
/**
* Handler for a stream error. Sends a RST_STREAM frame to the remote endpoint and closes the stream.
*/
private void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
writeRstStream(ctx, cause.streamId(), cause.error().code(), ctx.newPromise());
}
/**
* Writes a RST_STREAM frame to the remote endpoint and updates the connection state appropriately.
*/
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
Http2Stream stream = connection.stream(streamId);
ChannelFuture future = frameWriter.writeRstStream(ctx, streamId, errorCode, promise);
ctx.flush();
if (stream != null) {
stream.terminateSent();
closeStream(stream, promise);
}
return future;
}
/**
* Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state appropriately.
*/
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
if (connection.isGoAway()) {
debugData.release();
return ctx.newSucceededFuture();
}
ChannelFuture future = frameWriter.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
ctx.flush();
connection.remote().goAwayReceived(lastStreamId);
return future;
}
/**
* Sends a GO_AWAY frame appropriate for the given exception.
*/
private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) {
if (connection.isGoAway()) {
return ctx.newSucceededFuture();
}
// The connection isn't alredy going away, send the GO_AWAY frame now to start
// the process.
int errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
ByteBuf debugData = Http2CodecUtil.toByteBuf(ctx, cause);
int lastKnownStream = connection.remote().lastStreamCreated();
return writeGoAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise());
}
/**
* Closes the channel when the future completes.
*/
private static final class ClosingChannelFutureListener implements ChannelFutureListener {
private final ChannelHandlerContext ctx;
private final ChannelPromise promise;
ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
this.promise = promise;
}
@Override
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
ctx.close(promise);
}
}
}

View File

@ -150,7 +150,7 @@ public abstract class Http2OrHttpChooser extends ByteToMessageDecoder {
/**
* Create the {@link io.netty.channel.ChannelHandler} that is responsible for handling the http
* responses when the when the {@link SelectedProtocol} was {@link SelectedProtocol#HTTP_2}. The
* returned class should be a subclass of {@link Http2ConnectionHandler}.
* returned class should be a subclass of {@link DelegatingHttp2ConnectionHandler}.
*/
protected abstract ChannelHandler createHttp2RequestHandler();
}

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2FrameLogger.Direction.OUTBOUND;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
@ -30,14 +31,8 @@ public class Http2OutboundFrameLogger implements Http2FrameWriter {
private final Http2FrameLogger logger;
public Http2OutboundFrameLogger(Http2FrameWriter writer, Http2FrameLogger logger) {
if (writer == null) {
throw new NullPointerException("writer");
}
if (logger == null) {
throw new NullPointerException("logger");
}
this.writer = writer;
this.logger = logger;
this.writer = checkNotNull(writer, "writer");
this.logger = checkNotNull(logger, "logger");
}
@Override

View File

@ -21,6 +21,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader;
import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@ -43,7 +44,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
Collections.singletonList(HTTP_UPGRADE_SETTINGS_HEADER);
private final String handlerName;
private final Http2InboundConnectionHandler connectionHandler;
private final Http2ConnectionHandler connectionHandler;
private final Http2FrameReader frameReader;
private Http2Settings settings;
@ -53,7 +54,7 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
*
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ServerUpgradeCodec(Http2InboundConnectionHandler connectionHandler) {
public Http2ServerUpgradeCodec(Http2ConnectionHandler connectionHandler) {
this("http2ConnectionHandler", connectionHandler);
}
@ -63,16 +64,9 @@ public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.Upgrade
* @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline.
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ServerUpgradeCodec(String handlerName,
Http2InboundConnectionHandler connectionHandler) {
if (handlerName == null) {
throw new NullPointerException("handlerName");
}
if (connectionHandler == null) {
throw new NullPointerException("connectionHandler");
}
this.handlerName = handlerName;
this.connectionHandler = connectionHandler;
public Http2ServerUpgradeCodec(String handlerName, Http2ConnectionHandler connectionHandler) {
this.handlerName = checkNotNull(handlerName, "handlerName");
this.connectionHandler = checkNotNull(connectionHandler, "connectionHandler");
frameReader = new DefaultHttp2FrameReader();
}

View File

@ -23,6 +23,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_CONCURREN
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_HEADER_LIST_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.isMaxFrameSizeValid;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.util.collection.IntObjectHashMap;
/**
@ -177,9 +178,7 @@ public final class Http2Settings extends IntObjectHashMap<Long> {
}
private void verifyStandardSetting(int key, Long value) {
if (value == null) {
throw new NullPointerException("value");
}
checkNotNull(value, "value");
switch (key) {
case SETTINGS_HEADER_TABLE_SIZE:
if (value < 0L || value > MAX_UNSIGNED_INT) {

View File

@ -22,7 +22,7 @@ import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpHeaders;
/**
* Light weight wrapper around {@link Http2ConnectionHandler} to provide HTTP/1.x objects to HTTP/2 frames
* Light weight wrapper around {@link DelegatingHttp2ConnectionHandler} to provide HTTP/1.x objects to HTTP/2 frames
* <p>
* See {@link InboundHttp2ToHttpAdapter} to get translation from HTTP/2 frames to HTTP/1.x objects
*/
@ -35,15 +35,15 @@ public class Http2ToHttpConnectionHandler extends Http2ConnectionHandler {
super(connection, listener);
}
public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameListener listener,
Http2FrameReader frameReader, Http2FrameWriter frameWriter) {
super(connection, listener, frameReader, frameWriter);
public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2FrameListener listener) {
super(connection, frameReader, frameWriter, listener);
}
public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameListener listener,
Http2FrameReader frameReader, Http2InboundFlowController inboundFlow,
Http2OutboundConnectionAdapter outbound) {
super(connection, listener, frameReader, inboundFlow, outbound);
public Http2ToHttpConnectionHandler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter, Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow, Http2FrameListener listener) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, listener);
}
/**
@ -57,7 +57,7 @@ public class Http2ToHttpConnectionHandler extends Http2ConnectionHandler {
int streamId = 0;
String value = httpHeaders.get(HttpUtil.ExtensionHeaderNames.STREAM_ID.text());
if (value == null) {
streamId = connection.local().nextStreamId();
streamId = connection().local().nextStreamId();
} else {
try {
streamId = Integer.parseInt(value);
@ -97,10 +97,10 @@ public class Http2ToHttpConnectionHandler extends Http2ConnectionHandler {
ChannelPromise headerPromise = ctx.newPromise();
ChannelPromise dataPromise = ctx.newPromise();
promiseAggregator.add(headerPromise, dataPromise);
writeHeaders(ctx, streamId, http2Headers, 0, false, headerPromise);
writeData(ctx, streamId, httpMsg.content(), 0, true, dataPromise);
encoder().writeHeaders(ctx, streamId, http2Headers, 0, false, headerPromise);
encoder().writeData(ctx, streamId, httpMsg.content(), 0, true, dataPromise);
} else {
writeHeaders(ctx, streamId, http2Headers, 0, true, promise);
encoder().writeHeaders(ctx, streamId, http2Headers, 0, true, promise);
}
} else {
ctx.write(msg, promise);

View File

@ -14,6 +14,7 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.TooLongFrameException;
@ -124,9 +125,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
*/
protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
boolean validateHttpHeaders) {
if (connection == null) {
throw new NullPointerException("connection");
}
checkNotNull(connection, "connection");
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
}

View File

@ -0,0 +1,364 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests for {@link DefaultHttp2ConnectionDecoder}.
*/
public class DefaultHttp2ConnectionDecoderTest {
private static final int STREAM_ID = 1;
private static final int PUSH_STREAM_ID = 2;
private DefaultHttp2ConnectionDecoder decoder;
@Mock
private Http2Connection connection;
@Mock
private Http2Connection.Endpoint remote;
@Mock
private Http2Connection.Endpoint local;
@Mock
private Http2InboundFlowController inboundFlow;
@Mock
private ChannelHandlerContext ctx;
@Mock
private Channel channel;
private ChannelPromise promise;
@Mock
private ChannelFuture future;
@Mock
private Http2Stream stream;
@Mock
private Http2Stream pushStream;
@Mock
private Http2FrameListener listener;
@Mock
private Http2FrameReader reader;
@Mock
private Http2ConnectionEncoder encoder;
@Mock
private Http2LifecycleManager lifecycleManager;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
promise = new DefaultChannelPromise(channel);
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
when(stream.state()).thenReturn(OPEN);
when(pushStream.id()).thenReturn(PUSH_STREAM_ID);
when(connection.activeStreams()).thenReturn(Collections.singletonList(stream));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote);
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return local.createStream((Integer) args[0], (Boolean) args[1]);
}
}).when(connection).createLocalStream(anyInt(), anyBoolean());
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return remote.createStream((Integer) args[0], (Boolean) args[1]);
}
}).when(connection).createRemoteStream(anyInt(), anyBoolean());
when(local.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
decoder =
new DefaultHttp2ConnectionDecoder(connection, reader, inboundFlow, encoder,
lifecycleManager, listener);
// Simulate receiving the initial settings from the remote endpoint.
decode().onSettingsRead(ctx, new Http2Settings());
verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings()));
assertTrue(decoder.prefaceReceived());
verify(encoder).writeSettingsAck(eq(ctx), eq(promise));
// Simulate receiving the SETTINGS ACK for the initial settings.
decode().onSettingsAckRead(ctx);
}
@Test
public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
@Test
public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
} finally {
data.release();
}
}
@Test
public void headersReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote, never()).createStream(eq(STREAM_ID), eq(false));
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean());
verify(remote, never()).createStream(anyInt(), anyBoolean());
}
@Test
public void headersReadForUnknownStreamShouldCreateStream() throws Exception {
when(remote.createStream(eq(5), eq(false))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote).createStream(eq(5), eq(false));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
}
@Test
public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception {
when(remote.createStream(eq(5), eq(true))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true);
verify(remote).createStream(eq(5), eq(true));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
}
@Test
public void headersReadForPromisedStreamShouldHalfOpenStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_REMOTE);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(stream).openForPush();
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
}
@Test
public void headersReadForPromisedStreamShouldCloseStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_REMOTE);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true);
verify(stream).openForPush();
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
}
@Test
public void pushPromiseReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
verify(remote, never()).reservePushStream(anyInt(), any(Http2Stream.class));
verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt());
}
@Test
public void pushPromiseReadShouldSucceed() throws Exception {
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
verify(remote).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
verify(listener).onPushPromiseRead(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EmptyHttp2Headers.INSTANCE), eq(0));
}
@Test
public void priorityReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(listener, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean());
}
@Test
public void priorityReadShouldSucceed() throws Exception {
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
}
@Test
public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(encoder, never()).updateOutboundWindowSize(anyInt(), anyInt());
verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@Test(expected = Http2Exception.class)
public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(protocolError(""));
decode().onWindowUpdateRead(ctx, 5, 10);
}
@Test
public void windowUpdateReadShouldSucceed() throws Exception {
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(encoder).updateOutboundWindowSize(eq(STREAM_ID), eq(10));
verify(listener).onWindowUpdateRead(eq(ctx), eq(STREAM_ID), eq(10));
}
@Test
public void rstStreamReadAfterGoAwayShouldSucceed() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
verify(lifecycleManager).closeStream(eq(stream), eq(future));
verify(listener).onRstStreamRead(eq(ctx), anyInt(), anyLong());
}
@Test(expected = Http2Exception.class)
public void rstStreamReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(protocolError(""));
decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code());
}
@Test
public void rstStreamReadShouldCloseStream() throws Exception {
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
verify(lifecycleManager).closeStream(eq(stream), eq(future));
verify(listener).onRstStreamRead(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()));
}
@Test
public void pingReadWithAckShouldNotifylistener() throws Exception {
decode().onPingAckRead(ctx, emptyPingBuf());
verify(listener).onPingAckRead(eq(ctx), eq(emptyPingBuf()));
}
@Test
public void pingReadShouldReplyWithAck() throws Exception {
decode().onPingRead(ctx, emptyPingBuf());
verify(encoder).writePing(eq(ctx), eq(true), eq(emptyPingBuf()), eq(promise));
verify(listener, never()).onPingAckRead(eq(ctx), any(ByteBuf.class));
}
@Test
public void settingsReadWithAckShouldNotifylistener() throws Exception {
decode().onSettingsAckRead(ctx);
// Take into account the time this was called during setup().
verify(listener, times(2)).onSettingsAckRead(eq(ctx));
}
@Test
public void settingsReadShouldSetValues() throws Exception {
when(connection.isServer()).thenReturn(true);
Http2Settings settings = new Http2Settings();
settings.pushEnabled(true);
settings.initialWindowSize(123);
settings.maxConcurrentStreams(456);
settings.headerTableSize(789);
decode().onSettingsRead(ctx, settings);
verify(encoder).remoteSettings(settings);
verify(listener).onSettingsRead(eq(ctx), eq(settings));
}
@Test
public void goAwayShouldReadShouldUpdateConnectionState() throws Exception {
decode().onGoAwayRead(ctx, 1, 2L, EMPTY_BUFFER);
verify(local).goAwayReceived(1);
verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(2L), eq(EMPTY_BUFFER));
}
private static ByteBuf dummyData() {
// The buffer is purposely 8 bytes so it will even work for a ping frame.
return wrappedBuffer("abcdefgh".getBytes(UTF_8));
}
/**
* Calls the decode method on the handler and gets back the captured internal listener
*/
private Http2FrameListener decode() throws Exception {
ArgumentCaptor<Http2FrameListener> internallistener = ArgumentCaptor.forClass(Http2FrameListener.class);
doNothing().when(reader).readFrame(eq(ctx), any(ByteBuf.class), internallistener.capture());
decoder.decodeFrame(ctx, EMPTY_BUFFER, Collections.emptyList());
return internallistener.getValue();
}
}

View File

@ -0,0 +1,320 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests for {@link DefaultHttp2ConnectionEncoder}
*/
public class DefaultHttp2ConnectionEncoderTest {
private static final int STREAM_ID = 1;
private static final int PUSH_STREAM_ID = 2;
private DefaultHttp2ConnectionEncoder encoder;
@Mock
private Http2Connection connection;
@Mock
private Http2Connection.Endpoint remote;
@Mock
private Http2Connection.Endpoint local;
@Mock
private Http2OutboundFlowController outboundFlow;
@Mock
private ChannelHandlerContext ctx;
@Mock
private Channel channel;
private ChannelPromise promise;
@Mock
private ChannelFuture future;
@Mock
private Http2Stream stream;
@Mock
private Http2Stream pushStream;
@Mock
private Http2FrameListener listener;
@Mock
private Http2FrameWriter writer;
@Mock
private Http2LifecycleManager lifecycleManager;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
promise = new DefaultChannelPromise(channel);
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
when(stream.state()).thenReturn(OPEN);
when(pushStream.id()).thenReturn(PUSH_STREAM_ID);
when(connection.activeStreams()).thenReturn(Collections.singletonList(stream));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote);
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return local.createStream((Integer) args[0], (Boolean) args[1]);
}
}).when(connection).createLocalStream(anyInt(), anyBoolean());
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
return remote.createStream((Integer) args[0], (Boolean) args[1]);
}
}).when(connection).createRemoteStream(anyInt(), anyBoolean());
when(local.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future);
when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise)))
.thenReturn(future);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
encoder = new DefaultHttp2ConnectionEncoder(connection, writer, outboundFlow, lifecycleManager);
}
@Test
public void dataWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
final ByteBuf data = dummyData();
try {
ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
} finally {
while (data.refCnt() > 0) {
data.release();
}
}
}
@Test
public void dataWriteShouldSucceed() throws Exception {
final ByteBuf data = dummyData();
try {
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(false), eq(promise));
} finally {
data.release();
}
}
@Test
public void dataWriteShouldHalfCloseStream() throws Exception {
reset(future);
final ByteBuf data = dummyData();
try {
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write completed successfully.
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
verify(future).addListener(captor.capture());
when(future.isSuccess()).thenReturn(true);
captor.getValue().operationComplete(future);
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
} finally {
data.release();
}
}
@Test
public void headersWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = encoder.writeHeaders(
ctx, 5, EmptyHttp2Headers.INSTANCE, 0, (short) 255, false, 0, false, promise);
verify(local, never()).createStream(anyInt(), anyBoolean());
verify(writer, never()).writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean(),
eq(promise));
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void headersWriteForUnknownStreamShouldCreateStream() throws Exception {
when(local.createStream(eq(5), eq(false))).thenReturn(stream);
encoder.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(local).createStream(eq(5), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void headersWriteShouldCreateHalfClosedStream() throws Exception {
when(local.createStream(eq(5), eq(true))).thenReturn(stream);
encoder.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(local).createStream(eq(5), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test
public void headersWriteShouldOpenStreamForPush() throws Exception {
when(stream.state()).thenReturn(RESERVED_LOCAL);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(stream).openForPush();
verify(stream, never()).closeLocalSide();
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void headersWriteShouldClosePushStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(stream).openForPush();
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test
public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future =
encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID,
EmptyHttp2Headers.INSTANCE, 0, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void pushPromiseWriteShouldReserveStream() throws Exception {
encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, promise);
verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(promise));
}
@Test
public void priorityWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void priorityWriteShouldSetPriorityForStream() throws Exception {
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
}
@Test
public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception {
encoder.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise);
verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise));
}
@Test
public void rstStreamWriteShouldCloseStream() throws Exception {
encoder.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise);
verify(lifecycleManager).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise));
}
@Test
public void pingWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = encoder.writePing(ctx, false, emptyPingBuf(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void pingWriteShouldSucceed() throws Exception {
encoder.writePing(ctx, false, emptyPingBuf(), promise);
verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise));
}
@Test
public void settingsWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = encoder.writeSettings(ctx, new Http2Settings(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void settingsWriteShouldNotUpdateSettings() throws Exception {
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(100);
settings.pushEnabled(false);
settings.maxConcurrentStreams(1000);
settings.headerTableSize(2000);
encoder.writeSettings(ctx, settings, promise);
verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise));
}
private static ByteBuf dummyData() {
// The buffer is purposely 8 bytes so it will even work for a ping frame.
return wrappedBuffer("abcdefgh".getBytes(UTF_8));
}
}

View File

@ -15,43 +15,22 @@
package io.netty.handler.codec.http2;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.buffer.Unpooled.copiedBuffer;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
@ -62,7 +41,7 @@ import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
@ -73,7 +52,6 @@ import org.mockito.stubbing.Answer;
*/
public class Http2ConnectionHandlerTest {
private static final int STREAM_ID = 1;
private static final int PUSH_STREAM_ID = 2;
private Http2ConnectionHandler handler;
@ -86,12 +64,6 @@ public class Http2ConnectionHandlerTest {
@Mock
private Http2Connection.Endpoint local;
@Mock
private Http2InboundFlowController inboundFlow;
@Mock
private Http2OutboundFlowController outboundFlow;
@Mock
private ChannelHandlerContext ctx;
@ -107,34 +79,13 @@ public class Http2ConnectionHandlerTest {
private Http2Stream stream;
@Mock
private Http2Stream pushStream;
private Http2LifecycleManager lifecycleManager;
@Mock
private Http2FrameListener listener;
private Http2ConnectionDecoder decoder;
@Mock
private Http2FrameReader reader;
@Mock
private Http2FrameWriter writer;
@Mock
private Http2HeaderTable readerTable;
@Mock
private Http2HeaderTable writerTable;
@Mock
private Http2FrameSizePolicy readerFrameSizePolicy;
@Mock
private Http2FrameSizePolicy writerFrameSizePolicy;
@Mock
private Http2FrameReader.Configuration readerConfiguration;
@Mock
private Http2FrameWriter.Configuration writerConfiguration;
private Http2ConnectionEncoder encoder;
@Before
public void setup() throws Exception {
@ -143,21 +94,7 @@ public class Http2ConnectionHandlerTest {
promise = new DefaultChannelPromise(channel);
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
when(stream.state()).thenReturn(OPEN);
when(pushStream.id()).thenReturn(PUSH_STREAM_ID);
when(connection.activeStreams()).thenReturn(Collections.singletonList(stream));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.requireStream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
((Http2Stream) invocation.getArguments()[0]).close();
return null;
}
}).when(connection).close(any(Http2Stream.class), any(ChannelFuture.class), any(ChannelFutureListener.class));
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock invocation) throws Throwable {
@ -172,59 +109,18 @@ public class Http2ConnectionHandlerTest {
return remote.createStream((Integer) args[0], (Boolean) args[1]);
}
}).when(connection).createRemoteStream(anyInt(), anyBoolean());
when(local.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(remote.createStream(eq(STREAM_ID), anyBoolean())).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))).thenReturn(future);
when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), eq(promise)))
.thenReturn(future);
mockContext();
when(encoder.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
handler = newConnectionHandler();
// Simulate activation of the handler to force writing the initial settings.
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(10);
settings.pushEnabled(true);
settings.maxConcurrentStreams(100);
settings.headerTableSize(200);
settings.maxFrameSize(DEFAULT_MAX_FRAME_SIZE);
settings.maxHeaderListSize(Integer.MAX_VALUE);
when(inboundFlow.initialInboundWindowSize()).thenReturn(10);
when(local.allowPushTo()).thenReturn(true);
when(remote.maxStreams()).thenReturn(100);
when(reader.configuration()).thenReturn(readerConfiguration);
when(writer.configuration()).thenReturn(writerConfiguration);
when(readerConfiguration.frameSizePolicy()).thenReturn(readerFrameSizePolicy);
when(writerConfiguration.frameSizePolicy()).thenReturn(writerFrameSizePolicy);
when(readerFrameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE);
when(writerFrameSizePolicy.maxFrameSize()).thenReturn(DEFAULT_MAX_FRAME_SIZE);
when(readerConfiguration.headerTable()).thenReturn(readerTable);
when(writerConfiguration.headerTable()).thenReturn(writerTable);
when(readerTable.maxHeaderTableSize()).thenReturn(200);
when(readerTable.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
when(writerTable.maxHeaderListSize()).thenReturn(Integer.MAX_VALUE);
handler.handlerAdded(ctx);
verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise));
// Simulate receiving the initial settings from the remote endpoint.
decode().onSettingsRead(ctx, new Http2Settings());
verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings()));
verify(writer).writeSettingsAck(eq(ctx), eq(promise));
// Simulate receiving the SETTINGS ACK for the initial settings.
decode().onSettingsAckRead(ctx);
// Re-mock the context so no calls are registered.
mockContext();
handler.handlerAdded(ctx);
handler = newHandler();
}
private Http2ConnectionHandler newConnectionHandler() {
return new Http2ConnectionHandler(connection, listener, reader, inboundFlow,
new Http2OutboundConnectionAdapter(connection, writer, outboundFlow));
private Http2ConnectionHandler newHandler() {
return new Http2ConnectionHandler(connection, decoder, encoder, lifecycleManager);
}
@After
@ -235,7 +131,6 @@ public class Http2ConnectionHandlerTest {
@Test
public void clientShouldSendClientPrefaceStringWhenActive() throws Exception {
when(connection.isServer()).thenReturn(false);
handler = newConnectionHandler();
handler.channelActive(ctx);
verify(ctx).write(eq(connectionPrefaceBuf()));
}
@ -243,497 +138,42 @@ public class Http2ConnectionHandlerTest {
@Test
public void serverShouldNotSendClientPrefaceStringWhenActive() throws Exception {
when(connection.isServer()).thenReturn(true);
handler = newConnectionHandler();
handler.channelActive(ctx);
verify(ctx, never()).write(eq(connectionPrefaceBuf()));
}
@Test
public void serverReceivingInvalidClientPrefaceStringShouldCloseConnection() throws Exception {
public void serverReceivingInvalidClientPrefaceStringShouldHandleException() throws Exception {
when(connection.isServer()).thenReturn(true);
handler = newConnectionHandler();
handler = newHandler();
handler.channelRead(ctx, copiedBuffer("BAD_PREFACE", UTF_8));
verify(ctx).close();
verify(lifecycleManager).onHttp2Exception(eq(ctx), any(Http2Exception.class));
}
@Test
public void serverReceivingValidClientPrefaceStringShouldContinueReadingFrames() throws Exception {
reset(listener);
when(connection.isServer()).thenReturn(true);
handler = newConnectionHandler();
handler.channelRead(ctx, connectionPrefaceBuf());
verify(ctx, never()).close();
decode().onSettingsRead(ctx, new Http2Settings());
verify(listener).onSettingsRead(eq(ctx), eq(new Http2Settings()));
verify(decoder).decodeFrame(eq(ctx), any(ByteBuf.class), Matchers.<List<Object>>any());
}
@Test
public void closeShouldSendGoAway() throws Exception {
public void closeShouldCallLifecycleManager() throws Exception {
handler.close(ctx, promise);
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) NO_ERROR.code()), eq(EMPTY_BUFFER), eq(promise));
verify(remote).goAwayReceived(0);
verify(lifecycleManager).close(eq(ctx), eq(promise));
}
@Test
public void channelInactiveShouldCloseStreams() throws Exception {
handler.channelInactive(ctx);
verify(stream).close();
verify(lifecycleManager).closeStream(eq(stream), eq(future));
}
@Test
public void streamErrorShouldCloseStream() throws Exception {
Http2Exception e = new Http2StreamException(STREAM_ID, PROTOCOL_ERROR);
handler.exceptionCaught(ctx, e);
verify(stream).close();
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise));
}
@Test
public void connectionErrorShouldSendGoAway() throws Exception {
public void http2ExceptionShouldCallLifecycleManager() throws Exception {
Http2Exception e = new Http2Exception(PROTOCOL_ERROR);
when(remote.lastStreamCreated()).thenReturn(STREAM_ID);
handler.exceptionCaught(ctx, e);
verify(remote).goAwayReceived(STREAM_ID);
verify(writer).writeGoAway(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(EMPTY_BUFFER),
eq(promise));
}
@Test
public void dataReadAfterGoAwayShouldApplyFlowControl() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally {
data.release();
}
}
@Test
public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(inboundFlow).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
verify(stream).closeRemoteSide();
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
} finally {
data.release();
}
}
@Test
public void headersReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote, never()).createStream(eq(STREAM_ID), eq(false));
// Verify that the event was absorbed and not propagated to the oberver.
verify(listener, never()).onHeadersRead(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean());
verify(remote, never()).createStream(anyInt(), anyBoolean());
}
@Test
public void headersReadForUnknownStreamShouldCreateStream() throws Exception {
when(remote.createStream(eq(5), eq(false))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false);
verify(remote).createStream(eq(5), eq(false));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
}
@Test
public void headersReadForUnknownStreamShouldCreateHalfClosedStream() throws Exception {
when(remote.createStream(eq(5), eq(true))).thenReturn(stream);
decode().onHeadersRead(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true);
verify(remote).createStream(eq(5), eq(true));
verify(listener).onHeadersRead(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
}
@Test
public void headersReadForPromisedStreamShouldHalfOpenStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_REMOTE);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false);
verify(stream).openForPush();
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false));
}
@Test
public void headersReadForPromisedStreamShouldCloseStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_REMOTE);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true);
verify(stream).openForPush();
verify(stream).close();
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
}
@Test
public void pushPromiseReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
verify(remote, never()).reservePushStream(anyInt(), any(Http2Stream.class));
verify(listener, never()).onPushPromiseRead(eq(ctx), anyInt(), anyInt(), any(Http2Headers.class), anyInt());
}
@Test
public void pushPromiseReadShouldSucceed() throws Exception {
decode().onPushPromiseRead(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0);
verify(remote).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
verify(listener).onPushPromiseRead(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EmptyHttp2Headers.INSTANCE), eq(0));
}
@Test
public void priorityReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(listener, never()).onPriorityRead(eq(ctx), anyInt(), anyInt(), anyShort(), anyBoolean());
}
@Test
public void priorityReadShouldSucceed() throws Exception {
decode().onPriorityRead(ctx, STREAM_ID, 0, (short) 255, true);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(listener).onPriorityRead(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true));
}
@Test
public void windowUpdateReadAfterGoAwayShouldBeIgnored() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(outboundFlow, never()).updateOutboundWindowSize(anyInt(), anyInt());
verify(listener, never()).onWindowUpdateRead(eq(ctx), anyInt(), anyInt());
}
@Test(expected = Http2Exception.class)
public void windowUpdateReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(protocolError(""));
decode().onWindowUpdateRead(ctx, 5, 10);
}
@Test
public void windowUpdateReadShouldSucceed() throws Exception {
decode().onWindowUpdateRead(ctx, STREAM_ID, 10);
verify(outboundFlow).updateOutboundWindowSize(eq(STREAM_ID), eq(10));
verify(listener).onWindowUpdateRead(eq(ctx), eq(STREAM_ID), eq(10));
}
@Test
public void rstStreamReadAfterGoAwayShouldSucceed() throws Exception {
when(remote.isGoAwayReceived()).thenReturn(true);
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
verify(stream).close();
verify(listener).onRstStreamRead(eq(ctx), anyInt(), anyLong());
}
@Test(expected = Http2Exception.class)
public void rstStreamReadForUnknownStreamShouldThrow() throws Exception {
when(connection.requireStream(5)).thenThrow(protocolError(""));
decode().onRstStreamRead(ctx, 5, PROTOCOL_ERROR.code());
}
@Test
public void rstStreamReadShouldCloseStream() throws Exception {
decode().onRstStreamRead(ctx, STREAM_ID, PROTOCOL_ERROR.code());
verify(stream).close();
verify(listener).onRstStreamRead(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()));
}
@Test
public void pingReadWithAckShouldNotifylistener() throws Exception {
decode().onPingAckRead(ctx, emptyPingBuf());
verify(listener).onPingAckRead(eq(ctx), eq(emptyPingBuf()));
}
@Test
public void pingReadShouldReplyWithAck() throws Exception {
decode().onPingRead(ctx, emptyPingBuf());
verify(writer).writePing(eq(ctx), eq(true), eq(emptyPingBuf()), eq(promise));
verify(listener, never()).onPingAckRead(eq(ctx), any(ByteBuf.class));
}
@Test
public void settingsReadWithAckShouldNotifylistener() throws Exception {
decode().onSettingsAckRead(ctx);
// Take into account the time this was called during setup().
verify(listener, times(2)).onSettingsAckRead(eq(ctx));
}
@Test(expected = Http2Exception.class)
public void clientSettingsReadWithPushShouldThrow() throws Exception {
when(connection.isServer()).thenReturn(false);
Http2Settings settings = new Http2Settings();
settings.pushEnabled(true);
decode().onSettingsRead(ctx, settings);
}
@Test
public void settingsReadShouldSetValues() throws Exception {
when(connection.isServer()).thenReturn(true);
Http2Settings settings = new Http2Settings();
settings.pushEnabled(true);
settings.initialWindowSize(123);
settings.maxConcurrentStreams(456);
settings.headerTableSize(789);
decode().onSettingsRead(ctx, settings);
verify(remote).allowPushTo(true);
verify(outboundFlow).initialOutboundWindowSize(123);
verify(local).maxStreams(456);
verify(writerTable).maxHeaderTableSize(789);
// Take into account the time this was called during setup().
verify(writer, times(2)).writeSettingsAck(eq(ctx), eq(promise));
verify(listener).onSettingsRead(eq(ctx), eq(settings));
}
@Test
public void goAwayShouldReadShouldUpdateConnectionState() throws Exception {
decode().onGoAwayRead(ctx, 1, 2L, EMPTY_BUFFER);
verify(local).goAwayReceived(1);
verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(2L), eq(EMPTY_BUFFER));
}
@Test
public void dataWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
final ByteBuf data = dummyData();
try {
ChannelFuture future = handler.writeData(ctx, STREAM_ID, data, 0, false, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
} finally {
while (data.refCnt() > 0) {
data.release();
}
}
}
@Test
public void dataWriteShouldSucceed() throws Exception {
final ByteBuf data = dummyData();
try {
handler.writeData(ctx, STREAM_ID, data, 0, false, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(false), eq(promise));
} finally {
data.release();
}
}
@Test
public void dataWriteShouldHalfCloseStream() throws Exception {
reset(future);
final ByteBuf data = dummyData();
try {
handler.writeData(ctx, STREAM_ID, data, 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write completed successfully.
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
verify(future).addListener(captor.capture());
when(future.isSuccess()).thenReturn(true);
captor.getValue().operationComplete(future);
verify(stream).closeLocalSide();
} finally {
data.release();
}
}
@Test
public void dataWriteWithFailureShouldHandleException() throws Exception {
reset(future);
final String msg = "fake exception";
final ByteBuf exceptionData = Unpooled.copiedBuffer(msg.getBytes(UTF_8));
final ByteBuf data = dummyData();
List<ByteBuf> goAwayDataCapture = null;
try {
handler.writeData(ctx, STREAM_ID, data, 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(data), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write failed.
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
verify(future).addListener(captor.capture());
when(future.isSuccess()).thenReturn(false);
when(future.cause()).thenReturn(new RuntimeException(msg));
captor.getValue().operationComplete(future);
final ArgumentCaptor<ByteBuf> bufferCaptor = ArgumentCaptor.forClass(ByteBuf.class);
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) INTERNAL_ERROR.code()), bufferCaptor.capture(),
eq(promise));
goAwayDataCapture = bufferCaptor.getAllValues();
assertEquals(exceptionData, goAwayDataCapture.get(0));
verify(remote).goAwayReceived(0);
} finally {
data.release();
exceptionData.release();
if (goAwayDataCapture != null) {
for (int i = 0; i < goAwayDataCapture.size(); ++i) {
goAwayDataCapture.get(i).release();
}
}
}
}
@Test
public void headersWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeHeaders(
ctx, 5, EmptyHttp2Headers.INSTANCE, 0, (short) 255, false, 0, false, promise);
verify(local, never()).createStream(anyInt(), anyBoolean());
verify(writer, never()).writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyBoolean(),
eq(promise));
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void headersWriteForUnknownStreamShouldCreateStream() throws Exception {
when(local.createStream(eq(5), eq(false))).thenReturn(stream);
handler.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(local).createStream(eq(5), eq(false));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void headersWriteShouldCreateHalfClosedStream() throws Exception {
when(local.createStream(eq(5), eq(true))).thenReturn(stream);
handler.writeHeaders(ctx, 5, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(local).createStream(eq(5), eq(true));
verify(writer).writeHeaders(eq(ctx), eq(5), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test
public void headersWriteShouldOpenStreamForPush() throws Exception {
when(stream.state()).thenReturn(RESERVED_LOCAL);
handler.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(stream).openForPush();
verify(stream, never()).closeLocalSide();
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void headersWriteShouldClosePushStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL);
handler.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(stream).openForPush();
verify(stream).closeLocalSide();
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise));
}
@Test
public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future =
handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID,
EmptyHttp2Headers.INSTANCE, 0, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void pushPromiseWriteShouldReserveStream() throws Exception {
handler.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, promise);
verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(promise));
}
@Test
public void priorityWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void priorityWriteShouldSetPriorityForStream() throws Exception {
handler.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
}
@Test
public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception {
handler.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise);
verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise));
}
@Test
public void rstStreamWriteShouldCloseStream() throws Exception {
handler.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise);
verify(stream).close();
verify(writer).writeRstStream(eq(ctx), eq(STREAM_ID), eq((long) PROTOCOL_ERROR.code()), eq(promise));
}
@Test
public void pingWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writePing(ctx, false, emptyPingBuf(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void pingWriteShouldSucceed() throws Exception {
handler.writePing(ctx, false, emptyPingBuf(), promise);
verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise));
}
@Test
public void settingsWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);
ChannelFuture future = handler.writeSettings(ctx, new Http2Settings(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
}
@Test
public void settingsWriteShouldNotUpdateSettings() throws Exception {
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(100);
settings.pushEnabled(false);
settings.maxConcurrentStreams(1000);
settings.headerTableSize(2000);
handler.writeSettings(ctx, settings, promise);
verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise));
// Verify that application of local settings must not be done when it is dispatched.
verify(inboundFlow, never()).initialInboundWindowSize(eq(100));
verify(local, never()).allowPushTo(eq(false));
verify(remote, never()).maxStreams(eq(1000));
verify(readerTable, never()).maxHeaderTableSize(eq(2000));
// Verify that settings values are applied on the reception of SETTINGS ACK
decode().onSettingsAckRead(ctx);
verify(inboundFlow).initialInboundWindowSize(eq(100));
verify(local).allowPushTo(eq(false));
verify(remote).maxStreams(eq(1000));
verify(readerTable).maxHeaderTableSize(eq(2000));
}
private static ByteBuf dummyData() {
// The buffer is purposely 8 bytes so it will even work for a ping frame.
return wrappedBuffer("abcdefgh".getBytes(UTF_8));
}
private void mockContext() {
reset(ctx);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
}
/**
* Calls the decode method on the handler and gets back the captured internal listener
*/
private Http2FrameListener decode() throws Exception {
ArgumentCaptor<Http2FrameListener> internallistener = ArgumentCaptor.forClass(Http2FrameListener.class);
doNothing().when(reader).readFrame(eq(ctx), any(ByteBuf.class), internallistener.capture());
handler.decode(ctx, EMPTY_BUFFER, Collections.emptyList());
return internallistener.getValue();
verify(lifecycleManager).onHttp2Exception(eq(ctx), eq(e));
}
}

View File

@ -22,6 +22,7 @@ import static io.netty.util.CharsetUtil.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@ -35,6 +36,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
@ -139,6 +142,84 @@ public class Http2ConnectionRoundtripTest {
clientGroup.sync();
}
@Test
public void http2ExceptionInPipelineShouldCloseConnection() throws Exception {
// Create a latch to track when the close occurs.
final CountDownLatch closeLatch = new CountDownLatch(1);
clientChannel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
closeLatch.countDown();
}
});
// Create a single stream by sending a HEADERS frame to the server.
final Http2Headers headers = dummyHeaders();
requestLatch(new CountDownLatch(1));
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
}
});
// Wait for the server to create the stream.
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
// Add a handler that will immediately throw an exception.
clientChannel.pipeline().addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
throw Http2Exception.protocolError("Fake Exception");
}
});
// Wait for the close to occur.
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
assertFalse(clientChannel.isOpen());
}
@Test
public void nonHttp2ExceptionInPipelineShouldNotCloseConnection() throws Exception {
// Create a latch to track when the close occurs.
final CountDownLatch closeLatch = new CountDownLatch(1);
clientChannel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
closeLatch.countDown();
}
});
// Create a single stream by sending a HEADERS frame to the server.
final Http2Headers headers = dummyHeaders();
requestLatch(new CountDownLatch(1));
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
}
});
// Wait for the server to create the stream.
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
// Add a handler that will immediately throw an exception.
clientChannel.pipeline().addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
throw new RuntimeException("Fake Exception");
}
});
// The close should NOT occur.
assertFalse(closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientChannel.isOpen());
}
@Test
public void flowControlProperlyChunksLargeMessage() throws Exception {
final Http2Headers headers = dummyHeaders();
@ -169,8 +250,9 @@ public class Http2ConnectionRoundtripTest {
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
http2Client.writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false, newPromise());
http2Client.writeData(ctx(), 3, data.retain(), 0, true, newPromise());
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
false, newPromise());
http2Client.encoder().writeData(ctx(), 3, data.retain(), 0, true, newPromise());
}
});
@ -222,10 +304,12 @@ public class Http2ConnectionRoundtripTest {
@Override
public void run() {
for (int i = 0, nextStream = 3; i < NUM_STREAMS; ++i, nextStream += 2) {
http2Client.writeHeaders(ctx(), nextStream, headers, 0, (short) 16, false, 0, false,
http2Client.encoder().writeHeaders(ctx(), nextStream, headers, 0,
(short) 16, false, 0, false, newPromise());
http2Client.encoder().writePing(ctx(), false, pingData.slice().retain(),
newPromise());
http2Client.writePing(ctx(), false, pingData.slice().retain(), newPromise());
http2Client.writeData(ctx(), nextStream, data.slice().retain(), 0, true, newPromise());
http2Client.encoder().writeData(ctx(), nextStream, data.slice().retain(),
0, true, newPromise());
}
}
});

View File

@ -14,6 +14,7 @@
*/
package io.netty.handler.codec;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.util.internal.PlatformDependent;
import java.util.Arrays;
@ -289,12 +290,6 @@ public class DefaultBinaryHeaders implements BinaryHeaders {
return Utils.toStringUtf8(this);
}
static <T> void checkNotNull(T value, String name) {
if (value == null) {
throw new NullPointerException(name);
}
}
private static final class AsciiStringHeaderEntry implements Map.Entry<AsciiString, AsciiString> {
private final Entry<CharSequence, CharSequence> entry;

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
@ -596,12 +597,6 @@ public class DefaultTextHeaders implements TextHeaders {
return true;
}
private static <T> void checkNotNull(T value, String name) {
if (value == null) {
throw new NullPointerException(name);
}
}
private static final class StringHeaderEntry implements Entry<String, String> {
private final Entry<CharSequence, CharSequence> entry;
private String name;

View File

@ -14,6 +14,8 @@
*/
package io.netty.handler.codec;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -747,13 +749,6 @@ public class HeaderMap implements Iterable<Entry<CharSequence, CharSequence>> {
return nameConverter.convertName(checkNotNull(name, "name"));
}
private static <T> T checkNotNull(T value, String name) {
if (value == null) {
throw new NullPointerException(name);
}
return value;
}
private static int hashCode(CharSequence name) {
return AsciiString.caseInsensitiveHashCode(name);
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.util.internal;
/**
* A grab-bag of useful utility methods.
*/
public final class ObjectUtil {
private ObjectUtil() {
}
/**
* Checks that the given argument is not null. If it is, throws {@link NullPointerException}.
* Otherwise, returns the argument.
*/
public static <T> T checkNotNull(T arg, String text) {
if (arg == null) {
throw new NullPointerException(text);
}
return arg;
}
}

View File

@ -29,7 +29,7 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundConnectionAdapter;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.codec.http2.Http2Connection;
@ -38,8 +38,8 @@ import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
import io.netty.handler.codec.http2.Http2ToHttpConnectionHandler;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -66,11 +66,12 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
final Http2Connection connection = new DefaultHttp2Connection(false);
final Http2FrameWriter frameWriter = frameWriter();
connectionHandler = new Http2ToHttpConnectionHandler(connection,
new DelegatingDecompressorFrameListener(connection,
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)),
frameReader(),
frameWriter,
new DefaultHttp2InboundFlowController(connection, frameWriter),
new Http2OutboundConnectionAdapter(connection, frameWriter));
new DefaultHttp2OutboundFlowController(connection, frameWriter),
new DelegatingDecompressorFrameListener(connection,
InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength)));
responseHandler = new HttpResponseHandler();
settingsHandler = new Http2SettingsHandler(ch.newPromise());
if (sslCtx != null) {

View File

@ -27,16 +27,15 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundConnectionAdapter;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -51,18 +50,14 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
static final ByteBuf RESPONSE_BYTES = unreleasableBuffer(copiedBuffer("Hello World", CharsetUtil.UTF_8));
public HelloWorldHttp2Handler() {
this(new DefaultHttp2Connection(true), new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), logger));
this(new DefaultHttp2Connection(true), new Http2InboundFrameLogger(
new DefaultHttp2FrameReader(), logger), new Http2OutboundFrameLogger(
new DefaultHttp2FrameWriter(), logger));
}
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter) {
this(connection, frameWriter, new Http2OutboundConnectionAdapter(connection, frameWriter));
}
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameWriter frameWriter,
Http2OutboundConnectionAdapter outbound) {
super(connection, new SimpleHttp2FrameListener(outbound),
new Http2InboundFrameLogger(new DefaultHttp2FrameReader(), logger),
new DefaultHttp2InboundFlowController(connection, frameWriter), outbound);
private HelloWorldHttp2Handler(Http2Connection connection, Http2FrameReader frameReader,
Http2FrameWriter frameWriter) {
super(connection, frameReader, frameWriter, new SimpleHttp2FrameListener(frameWriter));
}
/**
@ -76,7 +71,7 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
Http2Headers headers =
new DefaultHttp2Headers().status(new AsciiString("200"))
.set(new AsciiString(UPGRADE_RESPONSE_HEADER), new AsciiString("true"));
writeHeaders(ctx, 1, headers, 0, true, ctx.newPromise());
encoder().writeHeaders(ctx, 1, headers, 0, true, ctx.newPromise());
}
super.userEventTriggered(ctx, evt);
}
@ -88,10 +83,10 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
}
private static class SimpleHttp2FrameListener extends Http2FrameAdapter {
private Http2OutboundConnectionAdapter outbound;
private Http2FrameWriter frameWriter;
public SimpleHttp2FrameListener(Http2OutboundConnectionAdapter outbound) {
this.outbound = outbound;
public SimpleHttp2FrameListener(Http2FrameWriter frameWriter) {
this.frameWriter = frameWriter;
}
/**
@ -123,8 +118,8 @@ public class HelloWorldHttp2Handler extends Http2ConnectionHandler {
private void sendResponse(ChannelHandlerContext ctx, int streamId, ByteBuf payload) {
// Send a frame for the response status
Http2Headers headers = new DefaultHttp2Headers().status(new AsciiString("200"));
outbound.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
outbound.writeData(ctx, streamId, payload, 0, true, ctx.newPromise());
frameWriter.writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
frameWriter.writeData(ctx, streamId, payload, 0, true, ctx.newPromise());
}
};
}