diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index a30b2abc7c..5b3b2dca02 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -40,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong; * * @see HttpServerCodec */ -public final class HttpClientCodec extends ChannelHandlerAppender { +public final class HttpClientCodec extends ChannelHandlerAppender implements + HttpClientUpgradeHandler.SourceCodec { /** A queue that is used for correlating a request and a response. */ private final Queue queue = new ArrayDeque(); @@ -86,6 +87,16 @@ public final class HttpClientCodec extends ChannelHandlerAppender { this.failOnMissingResponse = failOnMissingResponse; } + /** + * Upgrades to another protocol from HTTP. Removes the {@link Decoder} and {@link Encoder} from + * the pipeline. + */ + @Override + public void upgradeFrom(ChannelHandlerContext ctx) { + ctx.pipeline().remove(Decoder.class); + ctx.pipeline().remove(Encoder.class); + } + private Decoder decoder() { return handlerAt(0); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientUpgradeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientUpgradeHandler.java new file mode 100644 index 0000000000..9cff46e873 --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientUpgradeHandler.java @@ -0,0 +1,234 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.UPGRADE; +import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS; +import static io.netty.util.ReferenceCountUtil.release; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * Client-side handler for handling an HTTP upgrade handshake to another protocol. When the first + * HTTP request is sent, this handler will add all appropriate headers to perform an upgrade to the + * new protocol. If the upgrade fails (i.e. response is not 101 Switching Protocols), this handler + * simply removes itself from the pipeline. If the upgrade is successful, upgrades the pipeline to + * the new protocol. + */ +public class HttpClientUpgradeHandler extends HttpObjectAggregator { + + /** + * User events that are fired to notify about upgrade status. + */ + public enum UpgradeEvent { + /** + * The Upgrade request was sent to the server. + */ + UPGRADE_ISSUED, + + /** + * The Upgrade to the new protocol was successful. + */ + UPGRADE_SUCCESSFUL, + + /** + * The Upgrade was unsuccessful due to the server not issuing + * with a 101 Switching Protocols response. + */ + UPGRADE_REJECTED + } + + /** + * The source codec that is used in the pipeline initially. + */ + public interface SourceCodec { + /** + * Removes this codec (i.e. all associated handlers) from the pipeline. + */ + void upgradeFrom(ChannelHandlerContext ctx); + } + + /** + * A codec that the source can be upgraded to. + */ + public interface UpgradeCodec { + /** + * Returns the name of the protocol supported by this codec, as indicated by the {@link UPGRADE} header. + */ + String protocol(); + + /** + * Sets any protocol-specific headers required to the upgrade request. Returns the names of + * all headers that were added. These headers will be used to populate the CONNECTION header. + */ + Collection setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest); + + /** + * Performs an HTTP protocol upgrade from the source codec. This method is responsible for + * adding all handlers required for the new protocol. + * + * @param ctx the context for the current handler. + * @param upgradeResponse the 101 Switching Protocols response that indicates that the server + * has switched to this protocol. + */ + void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception; + } + + private final SourceCodec sourceCodec; + private final UpgradeCodec upgradeCodec; + private boolean upgradeRequested; + + /** + * Constructs the client upgrade handler. + * + * @param sourceCodec the codec that is being used initially. + * @param upgradeCodec the codec that the client would like to upgrade to. + * @param maxContentLength the maximum length of the aggregated content. + */ + public HttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec, + int maxContentLength) { + super(maxContentLength); + if (sourceCodec == null) { + throw new NullPointerException("sourceCodec"); + } + if (upgradeCodec == null) { + throw new NullPointerException("upgradeCodec"); + } + this.sourceCodec = sourceCodec; + this.upgradeCodec = upgradeCodec; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (!(msg instanceof HttpRequest)) { + super.write(ctx, msg, promise); + return; + } + + if (upgradeRequested) { + promise.setFailure(new IllegalStateException( + "Attempting to write HTTP request with upgrade in progress")); + return; + } + + upgradeRequested = true; + setUpgradeRequestHeaders(ctx, (HttpRequest) msg); + + // Continue writing the request. + super.write(ctx, msg, promise); + + // Notify that the upgrade request was issued. + ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_ISSUED); + // Now we wait for the next HTTP response to see if we switch protocols. + } + + @Override + protected void decode(ChannelHandlerContext ctx, HttpObject msg, List out) + throws Exception { + FullHttpResponse response = null; + try { + if (!upgradeRequested) { + throw new IllegalStateException("Read HTTP response without requesting protocol switch"); + } + + if (msg instanceof FullHttpResponse) { + response = (FullHttpResponse) msg; + // Need to retain since the base class will release after returning from this method. + response.retain(); + out.add(response); + } else { + // Call the base class to handle the aggregation of the full request. + super.decode(ctx, msg, out); + if (out.isEmpty()) { + // The full request hasn't been created yet, still awaiting more data. + return; + } + + assert out.size() == 1; + response = (FullHttpResponse) out.get(0); + } + + if (!SWITCHING_PROTOCOLS.equals(response.getStatus())) { + // The server does not support the requested protocol, just remove this handler + // and continue processing HTTP. + // NOTE: not releasing the response since we're letting it propagate to the + // next handler. + ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED); + removeThisHandler(ctx); + return; + } + + String upgradeHeader = response.headers().get(UPGRADE); + if (upgradeHeader == null) { + throw new IllegalStateException( + "Switching Protocols response missing UPGRADE header"); + } + if (!upgradeCodec.protocol().equalsIgnoreCase(upgradeHeader)) { + throw new IllegalStateException( + "Switching Protocols response with unexpected UPGRADE protocol: " + + upgradeHeader); + } + + // Upgrade to the new protocol. + sourceCodec.upgradeFrom(ctx); + upgradeCodec.upgradeTo(ctx, response); + + // Notify that the upgrade to the new protocol completed successfully. + ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL); + + // We switched protocols, so we're done with the upgrade response. + // Release it and clear it from the output. + response.release(); + out.clear(); + removeThisHandler(ctx); + } catch (Throwable t) { + release(response); + ctx.fireExceptionCaught(t); + removeThisHandler(ctx); + } + } + + private void removeThisHandler(ChannelHandlerContext ctx) { + ctx.pipeline().remove(ctx.name()); + } + + /** + * Adds all upgrade request headers necessary for an upgrade to the supported protocols. + */ + private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) { + // Set the UPGRADE header on the request. + request.headers().set(UPGRADE, upgradeCodec.protocol()); + + // Add all protocol-specific headers to the request. + Set connectionParts = new LinkedHashSet(2); + connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request)); + + // Set the CONNECTION header from the set of all protocol-specific headers that were added. + StringBuilder builder = new StringBuilder(); + for (String part : connectionParts) { + builder.append(part); + builder.append(","); + } + builder.append(UPGRADE); + request.headers().set(CONNECTION, builder.toString()); + } +} diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java index 0b14fa26e1..8a56710b28 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java @@ -16,15 +16,20 @@ package io.netty.handler.codec.http; import io.netty.channel.ChannelHandlerAppender; +import io.netty.channel.ChannelHandlerContext; /** * A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder} - * which enables easier server side HTTP implementation. + * which enables easier server side HTTP implementation. Also supports use with + * a {@link HttpServerUpgradeHandler} to support upgrading to another protocol + * from HTTP. * * @see HttpClientCodec + * @see HttpServerUpgradeHandler */ -public final class HttpServerCodec extends ChannelHandlerAppender { +public final class HttpServerCodec extends ChannelHandlerAppender implements + HttpServerUpgradeHandler.SourceCodec { /** * Creates a new instance with the default decoder options @@ -49,4 +54,14 @@ public final class HttpServerCodec extends ChannelHandlerAppender { super(new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new HttpResponseEncoder()); } + + /** + * Upgrades to another protocol from HTTP. Removes the {@link HttpRequestDecoder} and + * {@link HttpResponseEncoder} from the pipeline. + */ + @Override + public void upgradeFrom(ChannelHandlerContext ctx) { + ctx.pipeline().remove(HttpRequestDecoder.class); + ctx.pipeline().remove(HttpResponseEncoder.class); + } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerUpgradeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerUpgradeHandler.java new file mode 100644 index 0000000000..ef1b0e8f32 --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerUpgradeHandler.java @@ -0,0 +1,376 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaders.Names.UPGRADE; +import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static java.lang.String.CASE_INSENSITIVE_ORDER; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * A server-side handler that receives HTTP requests and optionally performs a protocol switch if + * the requested protocol is supported. Once an upgrade is performed, this handler removes itself + * from the pipeline. + */ +public class HttpServerUpgradeHandler extends HttpObjectAggregator { + + /** + * The source codec that is used in the pipeline initially. + */ + public interface SourceCodec { + /** + * Removes this codec (i.e. all associated handlers) from the pipeline. + */ + void upgradeFrom(ChannelHandlerContext ctx); + } + + /** + * A codec that the source can be upgraded to. + */ + public interface UpgradeCodec { + /** + * Returns the name of the protocol supported by this codec, as indicated by the {@link UPGRADE} header. + */ + String protocol(); + + /** + * Gets all protocol-specific headers required by this protocol for a successful upgrade. + * Any supplied header will be required to appear in the {@link CONNECTION} header as well. + */ + Collection requiredUpgradeHeaders(); + + /** + * Adds any headers to the 101 Switching protocols response that are appropriate for this protocol. + */ + void prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, + FullHttpResponse upgradeResponse); + + /** + * Performs an HTTP protocol upgrade from the source codec. This method is responsible for + * adding all handlers required for the new protocol. + * + * @param ctx the context for the current handler. + * @param upgradeRequest the request that triggered the upgrade to this protocol. The + * upgraded protocol is responsible for sending the response. + * @param upgradeResponse a 101 Switching Protocols response that is populated with the + * {@link CONNECTION} and {@link UPGRADE} headers. The protocol is required to + * send this before sending any other frames back to the client. The headers may + * be augmented as necessary by the protocol before sending. + * @return the future for the writing of the upgrade response. + */ + void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, + FullHttpResponse upgradeResponse); + } + + /** + * User event that is fired to notify about the completion of an HTTP upgrade + * to another protocol. Contains the original upgrade request so that the response + * (if required) can be sent using the new protocol. + */ + public static final class UpgradeEvent implements ReferenceCounted { + private final String protocol; + private final FullHttpRequest upgradeRequest; + + private UpgradeEvent(String protocol, FullHttpRequest upgradeRequest) { + this.protocol = protocol; + this.upgradeRequest = upgradeRequest; + } + + /** + * The protocol that the channel has been upgraded to. + */ + public String protocol() { + return protocol; + } + + /** + * Gets the request that triggered the protocol upgrade. + */ + public FullHttpRequest upgradeRequest() { + return upgradeRequest; + } + + @Override + public int refCnt() { + return upgradeRequest.refCnt(); + } + + @Override + public UpgradeEvent retain() { + upgradeRequest.retain(); + return this; + } + + @Override + public UpgradeEvent retain(int increment) { + upgradeRequest.retain(increment); + return this; + } + + @Override + public UpgradeEvent touch() { + upgradeRequest.touch(); + return this; + } + + @Override + public UpgradeEvent touch(Object hint) { + upgradeRequest.touch(hint); + return this; + } + + @Override + public boolean release() { + return upgradeRequest.release(); + } + + @Override + public boolean release(int decrement) { + return upgradeRequest.release(); + } + + @Override + public String toString() { + return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest + + "]"; + } + } + + private final Map upgradeCodecMap; + private final SourceCodec sourceCodec; + private boolean handlingUpgrade; + + /** + * Constructs the upgrader with the supported codecs. + * + * @param sourceCodec the codec that is being used initially. + * @param upgradeCodecs the codecs (in order of preference) that this server supports + * upgrading to from the source codec. + * @param maxContentLength the maximum length of the aggregated content. + */ + public HttpServerUpgradeHandler(SourceCodec sourceCodec, + Collection upgradeCodecs, int maxContentLength) { + super(maxContentLength); + if (sourceCodec == null) { + throw new NullPointerException("sourceCodec"); + } + if (upgradeCodecs == null) { + throw new NullPointerException("upgradeCodecs"); + } + this.sourceCodec = sourceCodec; + upgradeCodecMap = new LinkedHashMap(upgradeCodecs.size()); + for (UpgradeCodec upgradeCodec : upgradeCodecs) { + String name = upgradeCodec.protocol().toUpperCase(Locale.US); + upgradeCodecMap.put(name, upgradeCodec); + } + } + + @Override + protected void decode(ChannelHandlerContext ctx, HttpObject msg, List out) + throws Exception { + // Determine if we're already handling an upgrade request or just starting a new one. + handlingUpgrade = handlingUpgrade || isUpgradeRequest(msg); + if (!handlingUpgrade) { + // Not handling an upgrade request, just pass it to the next handler. + ReferenceCountUtil.retain(msg); + out.add(msg); + return; + } + + FullHttpRequest fullRequest = null; + if (msg instanceof FullHttpRequest) { + fullRequest = (FullHttpRequest) msg; + ReferenceCountUtil.retain(msg); + out.add(msg); + } else { + // Call the base class to handle the aggregation of the full request. + super.decode(ctx, msg, out); + if (out.isEmpty()) { + // The full request hasn't been created yet, still awaiting more data. + return; + } + + // Finished aggregating the full request, get it from the output list. + assert out.size() == 1; + handlingUpgrade = false; + fullRequest = (FullHttpRequest) out.get(0); + } + + if (upgrade(ctx, fullRequest)) { + // The upgrade was successful, remove the message from the output list + // so that it's not propagated to the next handler. This request will + // be propagated as a user event instead. + out.clear(); + } + + // The upgrade did not succeed, just allow the full request to propagate to the + // next handler. + } + + /** + * Determines whether or not the message is an HTTP upgrade request. + */ + private boolean isUpgradeRequest(HttpObject msg) { + return (msg instanceof HttpRequest) && ((HttpRequest) msg).headers().get(UPGRADE) != null; + } + + /** + * Attempts to upgrade to the protocol(s) identified by the {@link UPGRADE} header (if provided + * in the request). + * + * @param ctx the context for this handler. + * @param request the HTTP request. + * @return {@code true} if the upgrade occurred, otherwise {@code false}. + */ + private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) { + // Select the best protocol based on those requested in the UPGRADE header. + String upgradeHeader = request.headers().get(UPGRADE); + final UpgradeCodec upgradeCodec = selectUpgradeCodec(upgradeHeader); + if (upgradeCodec == null) { + // None of the requested protocols are supported, don't upgrade. + return false; + } + + // Make sure the CONNECTION header is present. + String connectionHeader = request.headers().get(CONNECTION); + if (connectionHeader == null) { + return false; + } + + // Make sure the CONNECTION header contains UPGRADE as well as all protocol-specific headers. + Collection requiredHeaders = upgradeCodec.requiredUpgradeHeaders(); + Set values = splitHeader(connectionHeader); + if (!values.contains(UPGRADE.toString()) || !values.containsAll(requiredHeaders)) { + return false; + } + + // Ensure that all required protocol-specific headers are found in the request. + for (String requiredHeader : requiredHeaders) { + if (!request.headers().contains(requiredHeader)) { + return false; + } + } + + // Create the user event to be fired once the upgrade completes. + final UpgradeEvent event = new UpgradeEvent(upgradeCodec.protocol(), request); + + // Prepare and send the upgrade response. Wait for this write to complete before upgrading, + // since we need the old codec in-place to properly encode the response. + final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeCodec); + upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse); + ctx.writeAndFlush(upgradeResponse).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + try { + if (future.isSuccess()) { + // Perform the upgrade to the new protocol. + sourceCodec.upgradeFrom(ctx); + upgradeCodec.upgradeTo(ctx, request, upgradeResponse); + + // Notify that the upgrade has occurred. Retain the event to offset + // the release() in the finally block. + ctx.fireUserEventTriggered(event.retain()); + + // Remove this handler from the pipeline. + ctx.pipeline().remove(HttpServerUpgradeHandler.this); + } else { + future.channel().close(); + } + } finally { + // Release the event if the upgrade event wasn't fired. + event.release(); + } + } + }); + return true; + } + + /** + * Looks up the most desirable supported upgrade codec from the list of choices in the UPGRADE + * header. If no suitable codec was found, returns {@code null}. + */ + private UpgradeCodec selectUpgradeCodec(String upgradeHeader) { + Set requestedProtocols = splitHeader(upgradeHeader); + + // Retain only the protocols that are in the protocol map. Maintain the original insertion + // order into the protocolMap, so that the first one in the remaining set is the most + // desirable protocol for the server. + Set supportedProtocols = new LinkedHashSet(upgradeCodecMap.keySet()); + supportedProtocols.retainAll(requestedProtocols); + + if (!supportedProtocols.isEmpty()) { + String protocol = supportedProtocols.iterator().next().toUpperCase(Locale.US); + return upgradeCodecMap.get(protocol); + } + return null; + } + + /** + * Creates the 101 Switching Protocols response message. + */ + private FullHttpResponse createUpgradeResponse(UpgradeCodec upgradeCodec) { + DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, SWITCHING_PROTOCOLS); + res.headers().add(CONNECTION, UPGRADE); + res.headers().add(UPGRADE, upgradeCodec.protocol()); + res.headers().add(CONTENT_LENGTH, "0"); + return res; + } + + /** + * Splits a comma-separated header value. The returned set is case-insensitive and contains each + * part with whitespace removed. + */ + private static Set splitHeader(String header) { + StringBuilder builder = new StringBuilder(header.length()); + Set protocols = new TreeSet(CASE_INSENSITIVE_ORDER); + for (int i = 0; i < header.length(); ++i) { + char c = header.charAt(i); + if (Character.isWhitespace(c)) { + // Don't include any whitespace. + continue; + } + if (c == ',') { + // Add the string and reset the builder for the next protocol. + protocols.add(builder.toString()); + builder.setLength(0); + } else { + builder.append(c); + } + } + + // Add the last protocol + if (builder.length() > 0) { + protocols.add(builder.toString()); + } + + return protocols; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java index f8c86f5d01..7587b912ee 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2ConnectionHandler.java @@ -15,7 +15,9 @@ package io.netty.handler.codec.http2; +import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception; @@ -61,14 +63,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode private final Http2Connection connection; private final Http2InboundFlowController inboundFlow; private final Http2OutboundFlowController outboundFlow; + // We prefer ArrayDeque to LinkedList because later will produce more GC. + // This initial capacity is plenty for SETTINGS traffic. + private final ArrayDeque outstandingLocalSettingsQueue = new ArrayDeque(4); private ByteBuf clientPrefaceString; private boolean prefaceSent; private boolean prefaceReceived; private ChannelHandlerContext ctx; private ChannelFutureListener closeListener; - // We prefer ArrayDeque to LinkedList because later will produce more GC. - // This initial capacity is plenty for SETTINGS traffic. - private final ArrayDeque outstandingLocalSettingsQueue = new ArrayDeque(4); protected AbstractHttp2ConnectionHandler(boolean server) { this(server, false); @@ -111,6 +113,45 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode clientPrefaceString = connection.isServer()? connectionPrefaceBuf() : null; } + /** + * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2. Reserves local stream 1 for + * the HTTP/2 response. + */ + public final void onHttpClientUpgrade() throws Http2Exception { + if (connection.isServer()) { + throw protocolError("Client-side HTTP upgrade requested for a server"); + } + if (prefaceSent || prefaceReceived) { + throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received"); + } + + // Create a local stream used for the HTTP cleartext upgrade. + createLocalStream(HTTP_UPGRADE_STREAM_ID, true, CONNECTION_STREAM_ID, + DEFAULT_PRIORITY_WEIGHT, false); + } + + /** + * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2. + * + * @param settings the settings for the remote endpoint. + */ + public final void onHttpServerUpgrade(Http2Settings settings) + throws Http2Exception { + if (!connection.isServer()) { + throw protocolError("Server-side HTTP upgrade requested for a client"); + } + if (prefaceSent || prefaceReceived) { + throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received"); + } + + // Apply the settings but no ACK is necessary. + applyRemoteSettings(settings); + + // Create a stream in the half-closed state. + createRemoteStream(HTTP_UPGRADE_STREAM_ID, true, CONNECTION_STREAM_ID, + DEFAULT_PRIORITY_WEIGHT, false); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // The channel just became active - send the connection preface to the remote @@ -186,6 +227,13 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode return settings; } + /** + * Gets the next stream ID that can be created by the local endpoint. + */ + protected int nextStreamId() { + return connection.local().nextStreamId(); + } + protected ChannelFuture writeData(final ChannelHandlerContext ctx, final ChannelPromise promise, int streamId, final ByteBuf data, int padding, boolean endStream, boolean endSegment, boolean compressed) { @@ -227,14 +275,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode Http2Stream stream = connection.stream(streamId); if (stream == null) { - // Creates a new locally-initiated stream. - stream = connection.local().createStream(streamId, endStream); - - // Allow bi-directional traffic. - inboundFlow.addStream(streamId); - if (!endStream) { - outboundFlow.addStream(streamId, streamDependency, weight, exclusive); - } + // Create a new locally-initiated stream. + stream = createLocalStream(streamId, endStream, streamDependency, weight, exclusive); } else { // An existing stream... if (stream.state() == RESERVED_LOCAL) { @@ -308,10 +350,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode throw protocolError("Sending settings after connection going away."); } - if (settings.hasPushEnabled()) { - if (connection.isServer()) { - throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified"); - } + if (settings.hasPushEnabled() && connection.isServer()) { + throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified"); } return frameWriter.writeSettings(ctx, promise, settings); @@ -569,6 +609,88 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode ChannelFutureListener.CLOSE_ON_FAILURE); } + /** + * Applies settings sent from the local endpoint. + */ + private void applyLocalSettings(Http2Settings settings) throws Http2Exception { + if (settings.hasPushEnabled()) { + if (connection.isServer()) { + throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified"); + } + connection.local().allowPushTo(settings.pushEnabled()); + } + + if (settings.hasAllowCompressedData()) { + connection.local().allowCompressedData(settings.allowCompressedData()); + } + + if (settings.hasMaxConcurrentStreams()) { + connection.remote().maxStreams(settings.maxConcurrentStreams()); + } + + if (settings.hasMaxHeaderTableSize()) { + frameReader.maxHeaderTableSize(settings.maxHeaderTableSize()); + } + + if (settings.hasInitialWindowSize()) { + inboundFlow.initialInboundWindowSize(settings.initialWindowSize()); + } + } + + /** + * Applies settings received from the remote endpoint. + */ + private void applyRemoteSettings(Http2Settings settings) throws Http2Exception { + if (settings.hasPushEnabled()) { + if (!connection.isServer()) { + throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified"); + } + connection.remote().allowPushTo(settings.pushEnabled()); + } + + if (settings.hasAllowCompressedData()) { + connection.remote().allowCompressedData(settings.allowCompressedData()); + } + + if (settings.hasMaxConcurrentStreams()) { + connection.local().maxStreams(settings.maxConcurrentStreams()); + } + + if (settings.hasMaxHeaderTableSize()) { + frameWriter.maxHeaderTableSize(settings.maxHeaderTableSize()); + } + + if (settings.hasInitialWindowSize()) { + outboundFlow.initialOutboundWindowSize(settings.initialWindowSize()); + } + } + + /** + * Creates a new stream initiated by the local endpoint. + */ + private Http2Stream createLocalStream(int streamId, boolean halfClosed, int streamDependency, + short weight, boolean exclusive) throws Http2Exception { + Http2Stream stream = connection.local().createStream(streamId, halfClosed); + inboundFlow.addStream(streamId); + if (!halfClosed) { + outboundFlow.addStream(streamId, streamDependency, weight, exclusive); + } + return stream; + } + + /** + * Creates a new stream initiated by the remote endpoint. + */ + private Http2Stream createRemoteStream(int streamId, boolean halfClosed, int streamDependency, + short weight, boolean exclusive) throws Http2Exception { + Http2Stream stream = connection.remote().createStream(streamId, halfClosed); + outboundFlow.addStream(streamId, streamDependency, weight, exclusive); + if (!halfClosed) { + inboundFlow.addStream(streamId); + } + return stream; + } + /** * Handles all inbound frames from the network. */ @@ -630,14 +752,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode Http2Stream stream = connection.stream(streamId); if (stream == null) { - // Create the new stream. - connection.remote().createStream(streamId, endStream); - - // Allow bi-directional traffic. - outboundFlow.addStream(streamId, streamDependency, weight, exclusive); - if (!endStream) { - inboundFlow.addStream(streamId); - } + createRemoteStream(streamId, endStream, streamDependency, weight, exclusive); } else { if (stream.state() == RESERVED_REMOTE) { // Received headers for a reserved push stream ... open it for push to the @@ -717,25 +832,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode Http2Settings settings = outstandingLocalSettingsQueue.poll(); if (settings != null) { - if (settings.hasPushEnabled()) { - connection.local().allowPushTo(settings.pushEnabled()); - } - - if (settings.hasAllowCompressedData()) { - connection.local().allowCompressedData(settings.allowCompressedData()); - } - - if (settings.hasMaxConcurrentStreams()) { - connection.remote().maxStreams(settings.maxConcurrentStreams()); - } - - if (settings.hasMaxHeaderTableSize()) { - frameReader.maxHeaderTableSize(settings.maxHeaderTableSize()); - } - - if (settings.hasInitialWindowSize()) { - inboundFlow.initialInboundWindowSize(settings.initialWindowSize()); - } + applyLocalSettings(settings); } AbstractHttp2ConnectionHandler.this.onSettingsAckRead(ctx); @@ -744,28 +841,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode @Override public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { - if (settings.hasPushEnabled()) { - if (!connection.isServer()) { - throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified"); - } - connection.remote().allowPushTo(settings.pushEnabled()); - } - - if (settings.hasAllowCompressedData()) { - connection.remote().allowCompressedData(settings.allowCompressedData()); - } - - if (settings.hasMaxConcurrentStreams()) { - connection.local().maxStreams(settings.maxConcurrentStreams()); - } - - if (settings.hasMaxHeaderTableSize()) { - frameWriter.maxHeaderTableSize(settings.maxHeaderTableSize()); - } - - if (settings.hasInitialWindowSize()) { - outboundFlow.initialOutboundWindowSize(settings.initialWindowSize()); - } + applyRemoteSettings(settings); // Acknowledge receipt of the settings. frameWriter.writeSettingsAck(ctx, ctx.newPromise()); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index bd8966a689..03d461fa95 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -235,6 +235,13 @@ public class DefaultHttp2Connection implements Http2Connection { maxStreams = Integer.MAX_VALUE; } + @Override + public int nextStreamId() { + // For manually created client-side streams, 1 is reserved for HTTP upgrade, so + // start at 3. + return nextStreamId > 1? nextStreamId : nextStreamId + 2; + } + @Override public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception { checkNewStreamAllowed(streamId); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java index de95335096..40e4845ea8 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java @@ -24,12 +24,9 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_SHORT; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.PRIORITY_ENTRY_LENGTH; -import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_COMPRESS_DATA; -import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_ENABLE_PUSH; -import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_HEADER_TABLE_SIZE; -import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_INITIAL_WINDOW_SIZE; -import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_CONCURRENT_STREAMS; -import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH; +import static io.netty.handler.codec.http2.Http2CodecUtil.calcSettingsPayloadLength; +import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader; +import static io.netty.handler.codec.http2.Http2CodecUtil.writeSettingsPayload; import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedInt; import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedShort; import static io.netty.util.CharsetUtil.UTF_8; @@ -160,38 +157,10 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, Http2Settings settings) { try { - int numFields = 0; - numFields += settings.hasAllowCompressedData() ? 1 : 0; - numFields += settings.hasMaxHeaderTableSize() ? 1 : 0; - numFields += settings.hasInitialWindowSize() ? 1 : 0; - numFields += settings.hasMaxConcurrentStreams() ? 1 : 0; - numFields += settings.hasPushEnabled() ? 1 : 0; - - int payloadLength = SETTING_ENTRY_LENGTH * numFields; + int payloadLength = calcSettingsPayloadLength(settings); ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payloadLength); writeFrameHeader(frame, payloadLength, Http2FrameType.SETTINGS, Http2Flags.EMPTY, 0); - - if (settings.hasAllowCompressedData()) { - frame.writeByte(SETTINGS_COMPRESS_DATA); - writeUnsignedInt(settings.allowCompressedData() ? 1L : 0L, frame); - } - if (settings.hasMaxHeaderTableSize()) { - frame.writeByte(SETTINGS_HEADER_TABLE_SIZE); - writeUnsignedInt(settings.maxHeaderTableSize(), frame); - } - if (settings.hasInitialWindowSize()) { - frame.writeByte(SETTINGS_INITIAL_WINDOW_SIZE); - writeUnsignedInt(settings.initialWindowSize(), frame); - } - if (settings.hasMaxConcurrentStreams()) { - frame.writeByte(SETTINGS_MAX_CONCURRENT_STREAMS); - writeUnsignedInt(settings.maxConcurrentStreams(), frame); - } - if (settings.hasPushEnabled()) { - // Only write the enable push flag from client endpoints. - frame.writeByte(SETTINGS_ENABLE_PUSH); - writeUnsignedInt(settings.pushEnabled() ? 1L : 0L, frame); - } + writeSettingsPayload(settings, frame); return ctx.writeAndFlush(frame, promise); } catch (RuntimeException e) { return promise.setFailure(e); @@ -372,15 +341,6 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter { } } - private static void writeFrameHeader(ByteBuf out, int payloadLength, Http2FrameType type, - Http2Flags flags, int streamId) { - out.ensureWritable(FRAME_HEADER_LENGTH + payloadLength); - out.writeShort(payloadLength); - out.writeByte(type.typeCode()); - out.writeByte(flags.value()); - out.writeInt(streamId); - } - private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, ChannelPromise promise, int streamId, Http2Headers headers, int padding, boolean endStream, boolean endSegment, boolean hasPriority, int streamDependency, short weight, boolean exclusive) { diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodec.java new file mode 100644 index 0000000000..776ef8053b --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ClientUpgradeCodec.java @@ -0,0 +1,120 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER; +import static io.netty.handler.codec.http2.Http2CodecUtil.calcSettingsPayloadLength; +import static io.netty.handler.codec.http2.Http2CodecUtil.writeSettingsPayload; +import static io.netty.util.CharsetUtil.UTF_8; +import static io.netty.util.ReferenceCountUtil.release; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpClientUpgradeHandler; +import io.netty.handler.codec.http.HttpRequest; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Client-side cleartext upgrade codec from HTTP to HTTP/2. + */ +public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.UpgradeCodec { + + private static final List UPGRADE_HEADERS = Collections.unmodifiableList(Arrays + .asList(HTTP_UPGRADE_SETTINGS_HEADER)); + private final String handlerName; + private final AbstractHttp2ConnectionHandler connectionHandler; + + /** + * Creates the codec using a default name for the connection handler when adding to the + * pipeline. + * + * @param connectionHandler the HTTP/2 connection handler. + */ + public Http2ClientUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) { + this("http2ConnectionHandler", connectionHandler); + } + + /** + * Creates the codec providing an upgrade to the given handler for HTTP/2. + * + * @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline. + * @param connectionHandler the HTTP/2 connection handler. + */ + public Http2ClientUpgradeCodec(String handlerName, + AbstractHttp2ConnectionHandler connectionHandler) { + if (handlerName == null) { + throw new NullPointerException("handlerName"); + } + if (connectionHandler == null) { + throw new NullPointerException("connectionHandler"); + } + this.handlerName = handlerName; + this.connectionHandler = connectionHandler; + } + + @Override + public String protocol() { + return HTTP_UPGRADE_PROTOCOL_NAME; + } + + @Override + public Collection setUpgradeHeaders(ChannelHandlerContext ctx, + HttpRequest upgradeRequest) { + String settingsValue = getSettingsHeaderValue(ctx); + upgradeRequest.headers().set(HTTP_UPGRADE_SETTINGS_HEADER, settingsValue); + return UPGRADE_HEADERS; + } + + @Override + public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) + throws Exception { + // Reserve local stream 1 for the response. + connectionHandler.onHttpClientUpgrade(); + + // Add the handler to the pipeline. + ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler); + } + + /** + * Converts the current settings for the handler to the Base64-encoded representation used in + * the HTTP2-Settings upgrade header. + */ + private String getSettingsHeaderValue(ChannelHandlerContext ctx) { + ByteBuf buf = null; + ByteBuf encodedBuf = null; + try { + // Get the local settings for the handler. + Http2Settings settings = connectionHandler.settings(); + + // Serialize the payload of the SETTINGS frame. + buf = ctx.alloc().buffer(calcSettingsPayloadLength(settings)); + writeSettingsPayload(settings, buf); + + // Base64 encode the payload and then convert to a string for the header. + encodedBuf = Base64.encode(buf, URL_SAFE); + return encodedBuf.toString(UTF_8); + } finally { + release(buf); + release(encodedBuf); + } + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java index b33bbaf341..70b509b203 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java @@ -31,6 +31,10 @@ public final class Http2CodecUtil { private static final byte[] EMPTY_PING = new byte[8]; public static final int CONNECTION_STREAM_ID = 0; + public static final int HTTP_UPGRADE_STREAM_ID = 1; + public static final String HTTP_UPGRADE_SETTINGS_HEADER = "HTTP2-Settings"; + public static final String HTTP_UPGRADE_PROTOCOL_NAME = "h2c-12"; + public static final int MAX_FRAME_PAYLOAD_LENGTH = 16383; public static final int PING_FRAME_PAYLOAD_LENGTH = 8; public static final short MAX_UNSIGNED_BYTE = 0xFF; @@ -126,6 +130,61 @@ public final class Http2CodecUtil { out.writeByte((int) ((value & 0xFF))); } + /** + * Writes an HTTP/2 frame header to the output buffer. + */ + public static void writeFrameHeader(ByteBuf out, int payloadLength, Http2FrameType type, + Http2Flags flags, int streamId) { + out.ensureWritable(FRAME_HEADER_LENGTH + payloadLength); + out.writeShort(payloadLength); + out.writeByte(type.typeCode()); + out.writeByte(flags.value()); + out.writeInt(streamId); + } + + /** + * Calculates the HTTP/2 SETTINGS payload length for the serialized representation + * of the given settings. + */ + public static int calcSettingsPayloadLength(Http2Settings settings) { + int numFields = 0; + numFields += settings.hasAllowCompressedData() ? 1 : 0; + numFields += settings.hasMaxHeaderTableSize() ? 1 : 0; + numFields += settings.hasInitialWindowSize() ? 1 : 0; + numFields += settings.hasMaxConcurrentStreams() ? 1 : 0; + numFields += settings.hasPushEnabled() ? 1 : 0; + + return SETTING_ENTRY_LENGTH * numFields; + } + + /** + * Serializes the settings to the output buffer in the format of an HTTP/2 SETTINGS frame + * payload. + */ + public static void writeSettingsPayload(Http2Settings settings, ByteBuf out) { + if (settings.hasAllowCompressedData()) { + out.writeByte(SETTINGS_COMPRESS_DATA); + writeUnsignedInt(settings.allowCompressedData() ? 1L : 0L, out); + } + if (settings.hasMaxHeaderTableSize()) { + out.writeByte(SETTINGS_HEADER_TABLE_SIZE); + writeUnsignedInt(settings.maxHeaderTableSize(), out); + } + if (settings.hasInitialWindowSize()) { + out.writeByte(SETTINGS_INITIAL_WINDOW_SIZE); + writeUnsignedInt(settings.initialWindowSize(), out); + } + if (settings.hasMaxConcurrentStreams()) { + out.writeByte(SETTINGS_MAX_CONCURRENT_STREAMS); + writeUnsignedInt(settings.maxConcurrentStreams(), out); + } + if (settings.hasPushEnabled()) { + // Only write the enable push flag from client endpoints. + out.writeByte(SETTINGS_ENABLE_PUSH); + writeUnsignedInt(settings.pushEnabled() ? 1L : 0L, out); + } + } + /** * Fails the given promise with the cause and then re-throws the cause. */ diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 15d0754364..c94d56cfa6 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -24,6 +24,11 @@ public interface Http2Connection { */ interface Endpoint { + /** + * Returns the next valid streamId for this endpoint. + */ + int nextStreamId(); + /** * Creates a stream initiated by this endpoint. This could fail for the following reasons: *

diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameAdapter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameAdapter.java new file mode 100644 index 0000000000..8255a2ce17 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameAdapter.java @@ -0,0 +1,91 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * Convenience class that provides no-op implementations for all methods of {@link Http2FrameObserver}. + */ +public class Http2FrameAdapter implements Http2FrameObserver { + + @Override + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream, boolean endOfSegment, boolean compressed) throws Http2Exception { + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int padding, boolean endStream, boolean endSegment) throws Http2Exception { + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, + int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, + boolean endSegment) throws Http2Exception { + } + + @Override + public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, + short weight, boolean exclusive) throws Http2Exception { + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + throws Http2Exception { + } + + @Override + public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { + } + + @Override + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) + throws Http2Exception { + } + + @Override + public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + } + + @Override + public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { + } + + @Override + public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, + Http2Headers headers, int padding) throws Http2Exception { + } + + @Override + public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, + ByteBuf debugData) throws Http2Exception { + } + + @Override + public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) + throws Http2Exception { + } + + @Override + public void onAltSvcRead(ChannelHandlerContext ctx, int streamId, long maxAge, int port, + ByteBuf protocolId, String host, String origin) throws Http2Exception { + } + + @Override + public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception { + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java index 995c02b1c7..6d532236d3 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameLogger.java @@ -58,7 +58,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter { public void logHeaders(Direction direction, int streamId, Http2Headers headers, int padding, boolean endStream, boolean endSegment) { - log(direction, "HEADERS: steramId:%d, headers=%s, padding=%d, endStream=%b, endSegment=%b", + log(direction, "HEADERS: streamId:%d, headers=%s, padding=%d, endStream=%b, endSegment=%b", streamId, headers, padding, endStream, endSegment); } @@ -66,7 +66,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter { int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, boolean endSegment) { log(direction, - "HEADERS: steramId:%d, headers=%s, streamDependency=%d, weight=%d, exclusive=%b, " + "HEADERS: streamId:%d, headers=%s, streamDependency=%d, weight=%d, exclusive=%b, " + "padding=%d, endStream=%b, endSegment=%b", streamId, headers, streamDependency, weight, exclusive, padding, endStream, endSegment); } @@ -109,7 +109,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter { } public void logWindowsUpdate(Direction direction, int streamId, int windowSizeIncrement) { - log(direction, "WINDOW_UPDATE: stream=%d, windowSizeIncrement=%d", streamId, + log(direction, "WINDOW_UPDATE: streamId=%d, windowSizeIncrement=%d", streamId, windowSizeIncrement); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java new file mode 100644 index 0000000000..32577eb1c3 --- /dev/null +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ServerUpgradeCodec.java @@ -0,0 +1,164 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.handler.codec.http2; + +import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME; +import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER; +import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader; +import static io.netty.handler.codec.http2.Http2Flags.EMPTY; +import static io.netty.handler.codec.http2.Http2FrameType.SETTINGS; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.util.CharsetUtil; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Server-side codec for performing a cleartext upgrade from HTTP/1.x to HTTP/2. + */ +public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.UpgradeCodec { + + private static final List REQUIRED_UPGRADE_HEADERS = Collections + .unmodifiableList(Arrays.asList(HTTP_UPGRADE_SETTINGS_HEADER)); + private final String handlerName; + private final AbstractHttp2ConnectionHandler connectionHandler; + private final Http2FrameReader frameReader; + private Http2Settings settings; + + /** + * Creates the codec using a default name for the connection handler when adding to the + * pipeline. + * + * @param connectionHandler the HTTP/2 connection handler. + */ + public Http2ServerUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) { + this("http2ConnectionHandler", connectionHandler); + } + + /** + * Creates the codec providing an upgrade to the given handler for HTTP/2. + * + * @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline. + * @param connectionHandler the HTTP/2 connection handler. + */ + public Http2ServerUpgradeCodec(String handlerName, + AbstractHttp2ConnectionHandler connectionHandler) { + if (handlerName == null) { + throw new NullPointerException("handlerName"); + } + if (connectionHandler == null) { + throw new NullPointerException("connectionHandler"); + } + this.handlerName = handlerName; + this.connectionHandler = connectionHandler; + this.frameReader = new DefaultHttp2FrameReader(); + } + + @Override + public String protocol() { + return HTTP_UPGRADE_PROTOCOL_NAME; + } + + @Override + public Collection requiredUpgradeHeaders() { + return REQUIRED_UPGRADE_HEADERS; + } + + @Override + public void prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, + FullHttpResponse upgradeResponse) { + try { + // Decode the HTTP2-Settings header and set the settings on the handler to make + // sure everything is fine with the request. + String settingsHeader = upgradeRequest.headers().get(HTTP_UPGRADE_SETTINGS_HEADER); + settings = decodeSettingsHeader(ctx, settingsHeader); + connectionHandler.onHttpServerUpgrade(settings); + // Everything looks good, no need to modify the response. + } catch (Throwable e) { + // Send a failed response back to the client. + upgradeResponse.setStatus(BAD_REQUEST); + upgradeResponse.headers().clear(); + } + } + + @Override + public void upgradeTo(final ChannelHandlerContext ctx, FullHttpRequest upgradeRequest, + FullHttpResponse upgradeResponse) { + // Add the HTTP/2 connection handler to the pipeline immediately following the current + // handler. + ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler); + } + + /** + * Decodes the settings header and returns a {@link Http2Settings} object. + */ + private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, String settingsHeader) + throws Http2Exception { + ByteBuf header = Unpooled.wrappedBuffer(settingsHeader.getBytes(CharsetUtil.UTF_8)); + try { + // Decode the SETTINGS payload. + ByteBuf payload = Base64.decode(header, URL_SAFE); + + // Create an HTTP/2 frame for the settings. + ByteBuf frame = createSettingsFrame(ctx, payload); + + // Decode the SETTINGS frame and return the settings object. + return decodeSettings(ctx, frame); + } finally { + header.release(); + } + } + + /** + * Decodes the settings frame and returns the settings. + */ + private Http2Settings decodeSettings(ChannelHandlerContext ctx, ByteBuf frame) throws Http2Exception { + try { + final Http2Settings decodedSettings = new Http2Settings(); + frameReader.readFrame(ctx, frame, new Http2FrameAdapter() { + @Override + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) + throws Http2Exception { + decodedSettings.copy(settings); + } + }); + return decodedSettings; + } finally { + frame.release(); + } + } + + /** + * Creates an HTTP2-Settings header with the given payload. The payload buffer is released. + */ + private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) { + ByteBuf frame = + ctx.alloc().buffer(Http2CodecUtil.FRAME_HEADER_LENGTH + payload.readableBytes()); + writeFrameHeader(frame, payload.readableBytes(), SETTINGS, EMPTY, 0); + frame.writeBytes(payload); + payload.release(); + return frame; + } +} diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java index 7b6788c248..c400685be2 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Settings.java @@ -180,6 +180,22 @@ public class Http2Settings { return this; } + /** + * Overwrites this settings object with the values of the given settings. + * + * @param source the source that will overwrite the current settings. + * @return this object. + */ + public Http2Settings copy(Http2Settings source) { + this.enabled = source.enabled; + this.allowCompressedData = source.allowCompressedData; + this.initialWindowSize = source.initialWindowSize; + this.maxConcurrentStreams = source.maxConcurrentStreams; + this.maxHeaderTableSize = source.maxHeaderTableSize; + this.pushEnabled = source.pushEnabled; + return this; + } + @Override public int hashCode() { final int prime = 31; diff --git a/example/src/main/java/io/netty/example/http2/Http2ExampleUtil.java b/example/src/main/java/io/netty/example/http2/Http2ExampleUtil.java new file mode 100644 index 0000000000..e86224709a --- /dev/null +++ b/example/src/main/java/io/netty/example/http2/Http2ExampleUtil.java @@ -0,0 +1,121 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.netty.example.http2; + +import java.util.Locale; + +/** + * Utility methods used by the example client and server. + */ +public final class Http2ExampleUtil { + + /** + * Response header sent in response to the http->http2 cleartext upgrade request. + */ + public static final String UPGRADE_RESPONSE_HEADER = "Http-To-Http2-Upgrade"; + + public static final String STANDARD_HOST = "localhost"; + public static final int STANDARD_HTTP_PORT = 8080; + public static final int STANDARD_HTTPS_PORT = 8443; + public static final String PORT_ARG = "-port="; + public static final String SSL_ARG = "-ssl="; + public static final String HOST_ARG = "-host="; + + /** + * The configuration for the example client/server endpoints. + */ + public static final class EndpointConfig { + private final boolean ssl; + private final String host; + private final int port; + + public EndpointConfig(boolean ssl, String host, int port) { + this.ssl = ssl; + this.host = host; + this.port = port; + } + + public boolean isSsl() { + return ssl; + } + + public String host() { + return host; + } + + public int port() { + return port; + } + + @Override + public String toString() { + return "EndpointConfig [ssl=" + ssl + ", host=" + host + ", port=" + port + "]"; + } + } + + /** + * Parse the command-line arguments to determine the configuration for the endpoint. + */ + public static EndpointConfig parseEndpointConfig(String[] args) { + boolean ssl = false; + int port = STANDARD_HTTP_PORT; + String host = STANDARD_HOST; + boolean portSpecified = false; + for (String arg : args) { + arg = arg.trim().toLowerCase(Locale.US); + if (arg.startsWith(PORT_ARG)) { + String value = arg.substring(PORT_ARG.length()); + port = Integer.parseInt(value); + portSpecified = true; + } else if (arg.startsWith(SSL_ARG)) { + String value = arg.substring(SSL_ARG.length()); + ssl = Boolean.parseBoolean(value); + if (!portSpecified) { + // Use the appropriate default. + port = ssl ? STANDARD_HTTPS_PORT : STANDARD_HTTP_PORT; + } + } else if (arg.startsWith(HOST_ARG)) { + host = arg.substring(HOST_ARG.length()); + } + } + + // If SSL was selected, verify that NPN is supported. + if (ssl) { + checkForNpnSupport(); + } + + return new EndpointConfig(ssl, host, port); + } + + /** + * Checks for NPN support. If not supported, prints an error message throws an exception. + */ + private static void checkForNpnSupport() { + try { + Class.forName("sun.security.ssl.NextProtoNegoExtension"); + } catch (ClassNotFoundException ignored) { + System.err.println(); + System.err.println("Could not locate Next Protocol Negotiation (NPN) implementation."); + System.err.println("The NPN jar should have been made available when building the examples with maven."); + System.err.println("Please check that your JDK is among those supported by Jetty-NPN:"); + System.err.println("http://wiki.eclipse.org/Jetty/Feature/NPN#Versions"); + System.err.println(); + throw new IllegalStateException("Could not locate NPN implementation. See console err for details."); + } + } + + private Http2ExampleUtil() { + } +} diff --git a/example/src/main/java/io/netty/example/http2/client/Http2Client.java b/example/src/main/java/io/netty/example/http2/client/Http2Client.java index 811beda98d..be07e90c98 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2Client.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2Client.java @@ -14,42 +14,46 @@ */ package io.netty.example.http2.client; -import static java.util.concurrent.TimeUnit.SECONDS; +import static io.netty.example.http2.Http2ExampleUtil.parseEndpointConfig; +import static io.netty.handler.codec.http.HttpHeaders.Names.HOST; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.example.http2.server.Http2Server; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http2.DefaultHttp2Headers; -import io.netty.handler.codec.http2.Http2Exception; -import io.netty.handler.codec.http2.Http2Headers; +import io.netty.example.http2.Http2ExampleUtil.EndpointConfig; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; import java.net.InetSocketAddress; -import java.util.concurrent.BlockingQueue; /** * An HTTP2 client that allows you to send HTTP2 frames to a server. Inbound and outbound frames are - * logged. + * logged. When run from the command-line, sends a single HEADERS frame to the server and gets back + * a "Hello World" response. + *

+ * To client accepts command-line arguments for {@code -host=} + * (default="localhost"), {@code -port=} (default: http=8080, + * https=8443), and {@code -ssl=} (default=false). */ public class Http2Client { - private final String host; - private final int port; - private final Http2ClientConnectionHandler http2ConnectionHandler; + private final EndpointConfig config; + private Http2ClientConnectionHandler http2ConnectionHandler; private Channel channel; private EventLoopGroup workerGroup; - public Http2Client(String host, int port) { - this.host = host; - this.port = port; - http2ConnectionHandler = new Http2ClientConnectionHandler(); + public Http2Client(EndpointConfig config) { + this.config = config; } + /** + * Starts the client and waits for the HTTP/2 upgrade to occur. + */ public void start() throws Exception { if (channel != null) { System.out.println("Already running!"); @@ -62,15 +66,40 @@ public class Http2Client { b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); - b.remoteAddress(new InetSocketAddress(host, port)); - b.handler(new Http2ClientInitializer(http2ConnectionHandler)); + b.remoteAddress(new InetSocketAddress(config.host(), config.port())); + Http2ClientInitializer initializer = new Http2ClientInitializer(config.isSsl()); + b.handler(initializer); // Start the client. channel = b.connect().syncUninterruptibly().channel(); + System.out.println("Connected to [" + config.host() + ':' + config.port() + ']'); + + // Wait for the HTTP/2 upgrade to occur. + http2ConnectionHandler = initializer.connectionHandler(); http2ConnectionHandler.awaitInitialization(); - System.out.println("Connected to [" + host + ':' + port + ']'); } + /** + * Sends the given request to the server. + */ + public void sendRequest(FullHttpRequest request) throws Exception { + ChannelFuture requestFuture = channel.writeAndFlush(request).sync(); + System.out.println("Back from sending headers..."); + if (!requestFuture.isSuccess()) { + throw new RuntimeException(requestFuture.cause()); + } + } + + /** + * Waits for the full response to be received. + */ + public void awaitResponse() throws Exception { + http2ConnectionHandler.awaitResponse(); + } + + /** + * Closes the channel and waits for shutdown to complete. + */ public void stop() { try { // Wait until the connection is closed. @@ -82,52 +111,29 @@ public class Http2Client { } } - public ChannelFuture sendHeaders(int streamId, Http2Headers headers) throws Http2Exception { - return http2ConnectionHandler.writeHeaders(streamId, headers, 0, true, true); - } - - public ChannelFuture send(int streamId, ByteBuf data, int padding, boolean endStream, - boolean endSegment, boolean compressed) throws Http2Exception { - return http2ConnectionHandler.writeData(streamId, data, padding, endStream, endSegment, - compressed); - } - - public Http2Headers headers() { - return DefaultHttp2Headers.newBuilder().authority(host).method(HttpMethod.GET.name()) - .build(); - } - - public BlockingQueue queue() { - return http2ConnectionHandler.queue(); - } - public static void main(String[] args) throws Exception { - Http2Server.checkForNpnSupport(); - int port; - if (args.length > 0) { - port = Integer.parseInt(args[0]); - } else { - port = 8443; - } - - final Http2Client client = new Http2Client("localhost", port); + EndpointConfig config = parseEndpointConfig(args); + System.out.println(config); + final Http2Client client = new Http2Client(config); try { + // Start the client and wait for the HTTP/2 upgrade to complete. client.start(); - System.out.println("Sending headers..."); - ChannelFuture requestFuture = client.sendHeaders(3, client.headers()).sync(); + + // Create a simple GET request with just headers. + FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/whatever"); + request.headers().add(HOST, config.host()); + + System.out.println("Sending request..."); + ChannelFuture requestFuture = client.channel.writeAndFlush(request).sync(); System.out.println("Back from sending headers..."); if (!requestFuture.isSuccess()) { requestFuture.cause().printStackTrace(); + return; } // Waits for the complete response - ChannelFuture responseFuture = client.queue().poll(5, SECONDS); - - if (!responseFuture.isSuccess()) { - responseFuture.cause().printStackTrace(); - } - + client.awaitResponse(); System.out.println("Finished HTTP/2 request"); } catch (Throwable t) { t.printStackTrace(); diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientConnectionHandler.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientConnectionHandler.java index 8f09b738da..2486bbb703 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientConnectionHandler.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientConnectionHandler.java @@ -14,15 +14,17 @@ */ package io.netty.example.http2.client; +import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER; import static io.netty.util.internal.logging.InternalLogLevel.INFO; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpMessage; import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; +import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; import io.netty.handler.codec.http2.Http2Exception; @@ -36,8 +38,7 @@ import io.netty.handler.codec.http2.Http2Settings; import io.netty.util.CharsetUtil; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -47,36 +48,65 @@ import java.util.concurrent.TimeUnit; public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler { private static final Http2FrameLogger logger = new Http2FrameLogger(INFO, InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class)); - private final BlockingQueue queue = new LinkedBlockingQueue(); - + private final ChannelPromise initPromise; + private final ChannelPromise responsePromise; private ByteBuf collectedData; - private ChannelPromise initialized; - public Http2ClientConnectionHandler() { + public Http2ClientConnectionHandler(ChannelPromise initPromise, ChannelPromise responsePromise) { super(new DefaultHttp2Connection(false, false), frameReader(), frameWriter(), new DefaultHttp2InboundFlowController(), new DefaultHttp2OutboundFlowController()); + this.initPromise = initPromise; + this.responsePromise = responsePromise; } - public void awaitInitialization() throws InterruptedException { - initialized.await(5, TimeUnit.SECONDS); + /** + * Wait for this handler to be added after the upgrade to HTTP/2, and for initial preface + * handshake to complete. + */ + public void awaitInitialization() throws Exception { + if (!initPromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Timed out waiting for initialization"); + } + if (!initPromise.isSuccess()) { + throw new RuntimeException(initPromise.cause()); + } } + /** + * Wait for this full response to be received and printed out. + */ + public void awaitResponse() throws Exception { + if (!responsePromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Timed out waiting for completion of the response"); + } + if (!responsePromise.isSuccess()) { + throw new RuntimeException(initPromise.cause()); + } + } + + /** + * Handles conversion of a {@link FullHttpMessage} to HTTP/2 frames. + */ @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - super.handlerAdded(ctx); - initialized = ctx.newPromise(); - } + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof FullHttpMessage) { + FullHttpMessage httpMsg = (FullHttpMessage) msg; + boolean hasData = httpMsg.content().isReadable(); - public ChannelFuture writeData(int streamId, ByteBuf data, int padding, boolean endStream, - boolean endSegment, boolean compressed) throws Http2Exception { - return super.writeData(ctx(), ctx().newPromise(), streamId, data, padding, endStream, - endSegment, compressed); - } - - public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int padding, - boolean endStream, boolean endSegment) throws Http2Exception { - return super.writeHeaders(ctx(), ctx().newPromise(), streamId, headers, padding, endStream, - endSegment); + // Convert and write the headers. + DefaultHttp2Headers.Builder headers = DefaultHttp2Headers.newBuilder(); + for (Map.Entry entry : httpMsg.headers().entries()) { + headers.add(entry.getKey(), entry.getValue()); + } + int streamId = nextStreamId(); + writeHeaders(ctx, promise, streamId, headers.build(), 0, !hasData, false); + if (hasData) { + writeData(ctx, promise, streamId, httpMsg.content(), 0, true, true, false); + } + } else { + super.write(ctx, msg, promise); + } } @Override @@ -107,7 +137,7 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler collectedData.release(); collectedData = null; - queue.add(ctx().channel().newSucceededFuture()); + responsePromise.setSuccess(); } } @@ -120,6 +150,9 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, boolean endSegment) throws Http2Exception { + if (headers.contains(UPGRADE_RESPONSE_HEADER)) { + System.out.println("Received HTTP/2 response to the HTTP->HTTP/2 upgrade request"); + } } @Override @@ -139,8 +172,8 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler @Override public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { - if (!initialized.isDone()) { - initialized.setSuccess(); + if (!initPromise.isDone()) { + initPromise.setSuccess(); } } @@ -178,17 +211,13 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - queue.add(ctx.channel().newFailedFuture(cause)); - cause.printStackTrace(); - - super.exceptionCaught(ctx, cause); - if (!initialized.isDone()) { - initialized.setFailure(cause); + if (!initPromise.isDone()) { + initPromise.setFailure(cause); } - } - - public BlockingQueue queue() { - return queue; + if (!responsePromise.isDone()) { + initPromise.setFailure(cause); + } + super.exceptionCaught(ctx, cause); } private static Http2FrameReader frameReader() { diff --git a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java index 939c22e750..c11de637d7 100644 --- a/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java +++ b/example/src/main/java/io/netty/example/http2/client/Http2ClientInitializer.java @@ -14,11 +14,18 @@ */ package io.netty.example.http2.client; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.example.securechat.SecureChatSslContextFactory; -import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpClientUpgradeHandler; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; import io.netty.handler.ssl.SslHandler; import javax.net.ssl.SSLEngine; @@ -30,14 +37,31 @@ import org.eclipse.jetty.npn.NextProtoNego; */ public class Http2ClientInitializer extends ChannelInitializer { - private final AbstractHttp2ConnectionHandler connectionHandler; + private Http2ClientConnectionHandler connectionHandler; + private final boolean ssl; - public Http2ClientInitializer(AbstractHttp2ConnectionHandler connectionHandler) { - this.connectionHandler = connectionHandler; + public Http2ClientInitializer(boolean ssl) { + this.ssl = ssl; } @Override public void initChannel(SocketChannel ch) throws Exception { + connectionHandler = new Http2ClientConnectionHandler(ch.newPromise(), ch.newPromise()); + if (ssl) { + configureSsl(ch); + } else { + configureClearText(ch); + } + } + + public Http2ClientConnectionHandler connectionHandler() { + return connectionHandler; + } + + /** + * Configure the pipeline for TLS NPN negotiation to HTTP/2. + */ + private void configureSsl(SocketChannel ch) { SSLEngine engine = SecureChatSslContextFactory.getClientContext().createSSLEngine(); engine.setUseClientMode(true); NextProtoNego.put(engine, new Http2ClientProvider()); @@ -48,4 +72,46 @@ public class Http2ClientInitializer extends ChannelInitializer { pipeline.addLast("ssl", new SslHandler(engine)); pipeline.addLast("http2ConnectionHandler", connectionHandler); } + + /** + * Configure the pipeline for a cleartext upgrade from HTTP to HTTP/2. + */ + private void configureClearText(SocketChannel ch) { + HttpClientCodec sourceCodec = new HttpClientCodec(); + Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(connectionHandler); + HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536); + + ch.pipeline().addLast(sourceCodec); + ch.pipeline().addLast(upgradeHandler); + ch.pipeline().addLast(new UpgradeRequestHandler()); + ch.pipeline().addLast(new UserEventLogger()); + } + + /** + * A handler that triggers the cleartext upgrade to HTTP/2 by sending an initial HTTP request. + */ + private static class UpgradeRequestHandler extends ChannelHandlerAdapter { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + DefaultFullHttpRequest upgradeRequest = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + ctx.writeAndFlush(upgradeRequest); + + super.channelActive(ctx); + + // Done with this handler, remove it from the pipeline. + ctx.pipeline().remove(ctx.name()); + } + } + + /** + * Class that logs any User Events triggered on this channel. + */ + private static class UserEventLogger extends ChannelHandlerAdapter { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + System.out.println("User Event Triggered: " + evt); + super.userEventTriggered(ctx, evt); + } + } } diff --git a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java index 959af2b285..fd2d90578a 100644 --- a/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java +++ b/example/src/main/java/io/netty/example/http2/server/HelloWorldHttp2Handler.java @@ -15,26 +15,62 @@ package io.netty.example.http2.server; +import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER; +import static io.netty.util.internal.logging.InternalLogLevel.INFO; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.example.http2.client.Http2ClientConnectionHandler; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2FrameReader; +import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2InboundFrameLogger; +import io.netty.handler.codec.http2.Http2OutboundFrameLogger; import io.netty.handler.codec.http2.Http2Settings; import io.netty.util.CharsetUtil; +import io.netty.util.internal.logging.InternalLoggerFactory; /** * A simple handler that responds with the message "Hello World!". */ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { + private static final Http2FrameLogger logger = new Http2FrameLogger(INFO, + InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class)); static final byte[] RESPONSE_BYTES = "Hello World".getBytes(CharsetUtil.UTF_8); public HelloWorldHttp2Handler() { - super(true); + super(new DefaultHttp2Connection(true, false), new Http2InboundFrameLogger( + new DefaultHttp2FrameReader(), logger), new Http2OutboundFrameLogger( + new DefaultHttp2FrameWriter(), logger), new DefaultHttp2InboundFlowController(), + new DefaultHttp2OutboundFlowController()); } + /** + * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via HTTP/2 + * on stream 1 (the stream specifically reserved for cleartext HTTP upgrade). + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) { + // Write an HTTP/2 response to the upgrade request + Http2Headers headers = + DefaultHttp2Headers.newBuilder().set(UPGRADE_RESPONSE_HEADER, "true").build(); + writeHeaders(ctx, ctx.newPromise(), 1, headers, 0, true, true); + } + super.userEventTriggered(ctx, evt); + } + + /** + * If receive a frame with end-of-stream set, send a pre-canned response. + */ @Override public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream, boolean endOfSegment, boolean compressed) throws Http2Exception { @@ -43,6 +79,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { } } + /** + * If receive a frame with end-of-stream set, send a pre-canned response. + */ @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, io.netty.handler.codec.http2.Http2Headers headers, int padding, boolean endStream, @@ -52,6 +91,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { } } + /** + * If receive a frame with end-of-stream set, send a pre-canned response. + */ @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, io.netty.handler.codec.http2.Http2Headers headers, int streamDependency, short weight, @@ -77,7 +119,8 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { } @Override - public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { + public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) + throws Http2Exception { } @Override @@ -100,7 +143,7 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { @Override public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) - throws Http2Exception { + throws Http2Exception { } @Override @@ -118,6 +161,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { super.exceptionCaught(ctx, cause); } + /** + * Sends a "Hello World" DATA frame to the client. + */ private void sendResponse(ChannelHandlerContext ctx, int streamId) throws Http2Exception { // Send a frame for the response status Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200").build(); diff --git a/example/src/main/java/io/netty/example/http2/server/Http2Server.java b/example/src/main/java/io/netty/example/http2/server/Http2Server.java index 5bbf6c4d09..27484a7cf4 100644 --- a/example/src/main/java/io/netty/example/http2/server/Http2Server.java +++ b/example/src/main/java/io/netty/example/http2/server/Http2Server.java @@ -16,24 +16,28 @@ package io.netty.example.http2.server; +import static io.netty.example.http2.Http2ExampleUtil.parseEndpointConfig; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.example.http2.Http2ExampleUtil.EndpointConfig; /** - * A HTTP/2 Server that responds to requests with a Hello World. + * A HTTP/2 Server that responds to requests with a Hello World. Once started, you can test the + * server with the example client. *

- * Once started, you can test the server with the example client. + * To server accepts command-line arguments for {@code -port=} (default: http=8080, + * https=8443), and {@code -ssl=} (default=false). */ public class Http2Server { - private final int port; + private final EndpointConfig config; - public Http2Server(int port) { - this.port = port; + public Http2Server(EndpointConfig config) { + this.config = config; } public void run() throws Exception { @@ -44,9 +48,9 @@ public class Http2Server { ServerBootstrap b = new ServerBootstrap(); b.option(ChannelOption.SO_BACKLOG, 1024); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) - .childHandler(new Http2ServerInitializer()); + .childHandler(new Http2ServerInitializer(config.isSsl())); - Channel ch = b.bind(port).sync().channel(); + Channel ch = b.bind(config.port()).sync().channel(); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); @@ -55,30 +59,10 @@ public class Http2Server { } public static void main(String[] args) throws Exception { - checkForNpnSupport(); - int port; - if (args.length > 0) { - port = Integer.parseInt(args[0]); - } else { - port = 8443; - } + EndpointConfig config = parseEndpointConfig(args); + System.out.println(config); - System.out.println("HTTP2 server started at port " + port + '.'); - - new Http2Server(port).run(); - } - - public static void checkForNpnSupport() { - try { - Class.forName("sun.security.ssl.NextProtoNegoExtension"); - } catch (ClassNotFoundException ignored) { - System.err.println(); - System.err.println("Could not locate Next Protocol Negotiation (NPN) implementation."); - System.err.println("The NPN jar should have been made available when building the examples with maven."); - System.err.println("Please check that your JDK is among those supported by Jetty-NPN:"); - System.err.println("http://wiki.eclipse.org/Jetty/Feature/NPN#Versions"); - System.err.println(); - throw new IllegalStateException("Could not locate NPN implementation. See console err for details."); - } + System.out.println("HTTP2 server started at port " + config.port() + '.'); + new Http2Server(config).run(); } } diff --git a/example/src/main/java/io/netty/example/http2/server/Http2ServerInitializer.java b/example/src/main/java/io/netty/example/http2/server/Http2ServerInitializer.java index 60c3101514..224b237be6 100644 --- a/example/src/main/java/io/netty/example/http2/server/Http2ServerInitializer.java +++ b/example/src/main/java/io/netty/example/http2/server/Http2ServerInitializer.java @@ -16,21 +16,50 @@ package io.netty.example.http2.server; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerAppender; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.example.securechat.SecureChatSslContextFactory; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpServerUpgradeHandler; +import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; import io.netty.handler.ssl.SslHandler; -import org.eclipse.jetty.npn.NextProtoNego; + +import java.util.Arrays; import javax.net.ssl.SSLEngine; +import org.eclipse.jetty.npn.NextProtoNego; + /** - * Sets up the Netty pipeline + * Sets up the Netty pipeline for the example server. Depending on the endpoint config, sets up the + * pipeline for NPN or cleartext HTTP upgrade to HTTP/2. */ public class Http2ServerInitializer extends ChannelInitializer { + private final boolean ssl; + + public Http2ServerInitializer(boolean ssl) { + this.ssl = ssl; + } + @Override public void initChannel(SocketChannel ch) throws Exception { + if (ssl) { + configureSsl(ch); + } else { + configureClearText(ch); + } + } + + /** + * Configure the pipeline for TLS NPN negotiation to HTTP/2. + */ + private void configureSsl(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); @@ -44,4 +73,50 @@ public class Http2ServerInitializer extends ChannelInitializer { // Negotiates with the browser if HTTP2 or HTTP is going to be used p.addLast("handler", new Http2OrHttpHandler()); } + + /** + * Configure the pipeline for a cleartext upgrade from HTTP to HTTP/2. + */ + private void configureClearText(SocketChannel ch) { + HttpCodec sourceCodec = new HttpCodec(); + HttpServerUpgradeHandler.UpgradeCodec upgradeCodec = + new Http2ServerUpgradeCodec(new HelloWorldHttp2Handler()); + HttpServerUpgradeHandler upgradeHandler = + new HttpServerUpgradeHandler(sourceCodec, Arrays.asList(upgradeCodec), 65536); + + ch.pipeline().addLast(sourceCodec); + ch.pipeline().addLast(upgradeHandler); + ch.pipeline().addLast(new UserEventLogger()); + } + + /** + * Source codec for HTTP cleartext upgrade. + */ + private static final class HttpCodec extends ChannelHandlerAppender implements + HttpServerUpgradeHandler.SourceCodec { + HttpCodec() { + add("httpRequestDecoder", new HttpRequestDecoder()); + add("httpResponseEncoder", new HttpResponseEncoder()); + add("httpRequestAggregator", new HttpObjectAggregator(65536)); + } + + @Override + public void upgradeFrom(ChannelHandlerContext ctx) { + System.out.println("removing HTTP handlers"); + ctx.pipeline().remove("httpRequestAggregator"); + ctx.pipeline().remove("httpResponseEncoder"); + ctx.pipeline().remove("httpRequestDecoder"); + } + } + + /** + * Class that logs any User Events triggered on this channel. + */ + private static class UserEventLogger extends ChannelHandlerAdapter { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + System.out.println("User Event Triggered: " + evt); + super.userEventTriggered(ctx, evt); + } + } }