Merge pull request #2496 from nmittler/http2

Adding a general handler for upgrading protocols
This commit is contained in:
Nathan Mittler 2014-05-21 09:58:21 -07:00
commit 2fb7af0163
21 changed files with 1699 additions and 247 deletions

View File

@ -40,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
*
* @see HttpServerCodec
*/
public final class HttpClientCodec extends ChannelHandlerAppender {
public final class HttpClientCodec extends ChannelHandlerAppender implements
HttpClientUpgradeHandler.SourceCodec {
/** A queue that is used for correlating a request and a response. */
private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
@ -86,6 +87,16 @@ public final class HttpClientCodec extends ChannelHandlerAppender {
this.failOnMissingResponse = failOnMissingResponse;
}
/**
* Upgrades to another protocol from HTTP. Removes the {@link Decoder} and {@link Encoder} from
* the pipeline.
*/
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
ctx.pipeline().remove(Decoder.class);
ctx.pipeline().remove(Encoder.class);
}
private Decoder decoder() {
return handlerAt(0);
}

View File

@ -0,0 +1,234 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.UPGRADE;
import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static io.netty.util.ReferenceCountUtil.release;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* Client-side handler for handling an HTTP upgrade handshake to another protocol. When the first
* HTTP request is sent, this handler will add all appropriate headers to perform an upgrade to the
* new protocol. If the upgrade fails (i.e. response is not 101 Switching Protocols), this handler
* simply removes itself from the pipeline. If the upgrade is successful, upgrades the pipeline to
* the new protocol.
*/
public class HttpClientUpgradeHandler extends HttpObjectAggregator {
/**
* User events that are fired to notify about upgrade status.
*/
public enum UpgradeEvent {
/**
* The Upgrade request was sent to the server.
*/
UPGRADE_ISSUED,
/**
* The Upgrade to the new protocol was successful.
*/
UPGRADE_SUCCESSFUL,
/**
* The Upgrade was unsuccessful due to the server not issuing
* with a 101 Switching Protocols response.
*/
UPGRADE_REJECTED
}
/**
* The source codec that is used in the pipeline initially.
*/
public interface SourceCodec {
/**
* Removes this codec (i.e. all associated handlers) from the pipeline.
*/
void upgradeFrom(ChannelHandlerContext ctx);
}
/**
* A codec that the source can be upgraded to.
*/
public interface UpgradeCodec {
/**
* Returns the name of the protocol supported by this codec, as indicated by the {@link UPGRADE} header.
*/
String protocol();
/**
* Sets any protocol-specific headers required to the upgrade request. Returns the names of
* all headers that were added. These headers will be used to populate the CONNECTION header.
*/
Collection<String> setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest);
/**
* Performs an HTTP protocol upgrade from the source codec. This method is responsible for
* adding all handlers required for the new protocol.
*
* @param ctx the context for the current handler.
* @param upgradeResponse the 101 Switching Protocols response that indicates that the server
* has switched to this protocol.
*/
void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception;
}
private final SourceCodec sourceCodec;
private final UpgradeCodec upgradeCodec;
private boolean upgradeRequested;
/**
* Constructs the client upgrade handler.
*
* @param sourceCodec the codec that is being used initially.
* @param upgradeCodec the codec that the client would like to upgrade to.
* @param maxContentLength the maximum length of the aggregated content.
*/
public HttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec,
int maxContentLength) {
super(maxContentLength);
if (sourceCodec == null) {
throw new NullPointerException("sourceCodec");
}
if (upgradeCodec == null) {
throw new NullPointerException("upgradeCodec");
}
this.sourceCodec = sourceCodec;
this.upgradeCodec = upgradeCodec;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (!(msg instanceof HttpRequest)) {
super.write(ctx, msg, promise);
return;
}
if (upgradeRequested) {
promise.setFailure(new IllegalStateException(
"Attempting to write HTTP request with upgrade in progress"));
return;
}
upgradeRequested = true;
setUpgradeRequestHeaders(ctx, (HttpRequest) msg);
// Continue writing the request.
super.write(ctx, msg, promise);
// Notify that the upgrade request was issued.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_ISSUED);
// Now we wait for the next HTTP response to see if we switch protocols.
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
FullHttpResponse response = null;
try {
if (!upgradeRequested) {
throw new IllegalStateException("Read HTTP response without requesting protocol switch");
}
if (msg instanceof FullHttpResponse) {
response = (FullHttpResponse) msg;
// Need to retain since the base class will release after returning from this method.
response.retain();
out.add(response);
} else {
// Call the base class to handle the aggregation of the full request.
super.decode(ctx, msg, out);
if (out.isEmpty()) {
// The full request hasn't been created yet, still awaiting more data.
return;
}
assert out.size() == 1;
response = (FullHttpResponse) out.get(0);
}
if (!SWITCHING_PROTOCOLS.equals(response.getStatus())) {
// The server does not support the requested protocol, just remove this handler
// and continue processing HTTP.
// NOTE: not releasing the response since we're letting it propagate to the
// next handler.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED);
removeThisHandler(ctx);
return;
}
String upgradeHeader = response.headers().get(UPGRADE);
if (upgradeHeader == null) {
throw new IllegalStateException(
"Switching Protocols response missing UPGRADE header");
}
if (!upgradeCodec.protocol().equalsIgnoreCase(upgradeHeader)) {
throw new IllegalStateException(
"Switching Protocols response with unexpected UPGRADE protocol: "
+ upgradeHeader);
}
// Upgrade to the new protocol.
sourceCodec.upgradeFrom(ctx);
upgradeCodec.upgradeTo(ctx, response);
// Notify that the upgrade to the new protocol completed successfully.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL);
// We switched protocols, so we're done with the upgrade response.
// Release it and clear it from the output.
response.release();
out.clear();
removeThisHandler(ctx);
} catch (Throwable t) {
release(response);
ctx.fireExceptionCaught(t);
removeThisHandler(ctx);
}
}
private void removeThisHandler(ChannelHandlerContext ctx) {
ctx.pipeline().remove(ctx.name());
}
/**
* Adds all upgrade request headers necessary for an upgrade to the supported protocols.
*/
private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
// Set the UPGRADE header on the request.
request.headers().set(UPGRADE, upgradeCodec.protocol());
// Add all protocol-specific headers to the request.
Set<String> connectionParts = new LinkedHashSet<String>(2);
connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));
// Set the CONNECTION header from the set of all protocol-specific headers that were added.
StringBuilder builder = new StringBuilder();
for (String part : connectionParts) {
builder.append(part);
builder.append(",");
}
builder.append(UPGRADE);
request.headers().set(CONNECTION, builder.toString());
}
}

View File

@ -16,15 +16,20 @@
package io.netty.handler.codec.http;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
/**
* A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder}
* which enables easier server side HTTP implementation.
* which enables easier server side HTTP implementation. Also supports use with
* a {@link HttpServerUpgradeHandler} to support upgrading to another protocol
* from HTTP.
*
* @see HttpClientCodec
* @see HttpServerUpgradeHandler
*/
public final class HttpServerCodec extends ChannelHandlerAppender {
public final class HttpServerCodec extends ChannelHandlerAppender implements
HttpServerUpgradeHandler.SourceCodec {
/**
* Creates a new instance with the default decoder options
@ -49,4 +54,14 @@ public final class HttpServerCodec extends ChannelHandlerAppender {
super(new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders),
new HttpResponseEncoder());
}
/**
* Upgrades to another protocol from HTTP. Removes the {@link HttpRequestDecoder} and
* {@link HttpResponseEncoder} from the pipeline.
*/
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
ctx.pipeline().remove(HttpRequestDecoder.class);
ctx.pipeline().remove(HttpResponseEncoder.class);
}
}

View File

@ -0,0 +1,376 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaders.Names.UPGRADE;
import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
* A server-side handler that receives HTTP requests and optionally performs a protocol switch if
* the requested protocol is supported. Once an upgrade is performed, this handler removes itself
* from the pipeline.
*/
public class HttpServerUpgradeHandler extends HttpObjectAggregator {
/**
* The source codec that is used in the pipeline initially.
*/
public interface SourceCodec {
/**
* Removes this codec (i.e. all associated handlers) from the pipeline.
*/
void upgradeFrom(ChannelHandlerContext ctx);
}
/**
* A codec that the source can be upgraded to.
*/
public interface UpgradeCodec {
/**
* Returns the name of the protocol supported by this codec, as indicated by the {@link UPGRADE} header.
*/
String protocol();
/**
* Gets all protocol-specific headers required by this protocol for a successful upgrade.
* Any supplied header will be required to appear in the {@link CONNECTION} header as well.
*/
Collection<String> requiredUpgradeHeaders();
/**
* Adds any headers to the 101 Switching protocols response that are appropriate for this protocol.
*/
void prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse);
/**
* Performs an HTTP protocol upgrade from the source codec. This method is responsible for
* adding all handlers required for the new protocol.
*
* @param ctx the context for the current handler.
* @param upgradeRequest the request that triggered the upgrade to this protocol. The
* upgraded protocol is responsible for sending the response.
* @param upgradeResponse a 101 Switching Protocols response that is populated with the
* {@link CONNECTION} and {@link UPGRADE} headers. The protocol is required to
* send this before sending any other frames back to the client. The headers may
* be augmented as necessary by the protocol before sending.
* @return the future for the writing of the upgrade response.
*/
void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse);
}
/**
* User event that is fired to notify about the completion of an HTTP upgrade
* to another protocol. Contains the original upgrade request so that the response
* (if required) can be sent using the new protocol.
*/
public static final class UpgradeEvent implements ReferenceCounted {
private final String protocol;
private final FullHttpRequest upgradeRequest;
private UpgradeEvent(String protocol, FullHttpRequest upgradeRequest) {
this.protocol = protocol;
this.upgradeRequest = upgradeRequest;
}
/**
* The protocol that the channel has been upgraded to.
*/
public String protocol() {
return protocol;
}
/**
* Gets the request that triggered the protocol upgrade.
*/
public FullHttpRequest upgradeRequest() {
return upgradeRequest;
}
@Override
public int refCnt() {
return upgradeRequest.refCnt();
}
@Override
public UpgradeEvent retain() {
upgradeRequest.retain();
return this;
}
@Override
public UpgradeEvent retain(int increment) {
upgradeRequest.retain(increment);
return this;
}
@Override
public UpgradeEvent touch() {
upgradeRequest.touch();
return this;
}
@Override
public UpgradeEvent touch(Object hint) {
upgradeRequest.touch(hint);
return this;
}
@Override
public boolean release() {
return upgradeRequest.release();
}
@Override
public boolean release(int decrement) {
return upgradeRequest.release();
}
@Override
public String toString() {
return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest
+ "]";
}
}
private final Map<String, UpgradeCodec> upgradeCodecMap;
private final SourceCodec sourceCodec;
private boolean handlingUpgrade;
/**
* Constructs the upgrader with the supported codecs.
*
* @param sourceCodec the codec that is being used initially.
* @param upgradeCodecs the codecs (in order of preference) that this server supports
* upgrading to from the source codec.
* @param maxContentLength the maximum length of the aggregated content.
*/
public HttpServerUpgradeHandler(SourceCodec sourceCodec,
Collection<UpgradeCodec> upgradeCodecs, int maxContentLength) {
super(maxContentLength);
if (sourceCodec == null) {
throw new NullPointerException("sourceCodec");
}
if (upgradeCodecs == null) {
throw new NullPointerException("upgradeCodecs");
}
this.sourceCodec = sourceCodec;
upgradeCodecMap = new LinkedHashMap<String, UpgradeCodec>(upgradeCodecs.size());
for (UpgradeCodec upgradeCodec : upgradeCodecs) {
String name = upgradeCodec.protocol().toUpperCase(Locale.US);
upgradeCodecMap.put(name, upgradeCodec);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
// Determine if we're already handling an upgrade request or just starting a new one.
handlingUpgrade = handlingUpgrade || isUpgradeRequest(msg);
if (!handlingUpgrade) {
// Not handling an upgrade request, just pass it to the next handler.
ReferenceCountUtil.retain(msg);
out.add(msg);
return;
}
FullHttpRequest fullRequest = null;
if (msg instanceof FullHttpRequest) {
fullRequest = (FullHttpRequest) msg;
ReferenceCountUtil.retain(msg);
out.add(msg);
} else {
// Call the base class to handle the aggregation of the full request.
super.decode(ctx, msg, out);
if (out.isEmpty()) {
// The full request hasn't been created yet, still awaiting more data.
return;
}
// Finished aggregating the full request, get it from the output list.
assert out.size() == 1;
handlingUpgrade = false;
fullRequest = (FullHttpRequest) out.get(0);
}
if (upgrade(ctx, fullRequest)) {
// The upgrade was successful, remove the message from the output list
// so that it's not propagated to the next handler. This request will
// be propagated as a user event instead.
out.clear();
}
// The upgrade did not succeed, just allow the full request to propagate to the
// next handler.
}
/**
* Determines whether or not the message is an HTTP upgrade request.
*/
private boolean isUpgradeRequest(HttpObject msg) {
return (msg instanceof HttpRequest) && ((HttpRequest) msg).headers().get(UPGRADE) != null;
}
/**
* Attempts to upgrade to the protocol(s) identified by the {@link UPGRADE} header (if provided
* in the request).
*
* @param ctx the context for this handler.
* @param request the HTTP request.
* @return {@code true} if the upgrade occurred, otherwise {@code false}.
*/
private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
// Select the best protocol based on those requested in the UPGRADE header.
String upgradeHeader = request.headers().get(UPGRADE);
final UpgradeCodec upgradeCodec = selectUpgradeCodec(upgradeHeader);
if (upgradeCodec == null) {
// None of the requested protocols are supported, don't upgrade.
return false;
}
// Make sure the CONNECTION header is present.
String connectionHeader = request.headers().get(CONNECTION);
if (connectionHeader == null) {
return false;
}
// Make sure the CONNECTION header contains UPGRADE as well as all protocol-specific headers.
Collection<String> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
Set<String> values = splitHeader(connectionHeader);
if (!values.contains(UPGRADE.toString()) || !values.containsAll(requiredHeaders)) {
return false;
}
// Ensure that all required protocol-specific headers are found in the request.
for (String requiredHeader : requiredHeaders) {
if (!request.headers().contains(requiredHeader)) {
return false;
}
}
// Create the user event to be fired once the upgrade completes.
final UpgradeEvent event = new UpgradeEvent(upgradeCodec.protocol(), request);
// Prepare and send the upgrade response. Wait for this write to complete before upgrading,
// since we need the old codec in-place to properly encode the response.
final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeCodec);
upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse);
ctx.writeAndFlush(upgradeResponse).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (future.isSuccess()) {
// Perform the upgrade to the new protocol.
sourceCodec.upgradeFrom(ctx);
upgradeCodec.upgradeTo(ctx, request, upgradeResponse);
// Notify that the upgrade has occurred. Retain the event to offset
// the release() in the finally block.
ctx.fireUserEventTriggered(event.retain());
// Remove this handler from the pipeline.
ctx.pipeline().remove(HttpServerUpgradeHandler.this);
} else {
future.channel().close();
}
} finally {
// Release the event if the upgrade event wasn't fired.
event.release();
}
}
});
return true;
}
/**
* Looks up the most desirable supported upgrade codec from the list of choices in the UPGRADE
* header. If no suitable codec was found, returns {@code null}.
*/
private UpgradeCodec selectUpgradeCodec(String upgradeHeader) {
Set<String> requestedProtocols = splitHeader(upgradeHeader);
// Retain only the protocols that are in the protocol map. Maintain the original insertion
// order into the protocolMap, so that the first one in the remaining set is the most
// desirable protocol for the server.
Set<String> supportedProtocols = new LinkedHashSet<String>(upgradeCodecMap.keySet());
supportedProtocols.retainAll(requestedProtocols);
if (!supportedProtocols.isEmpty()) {
String protocol = supportedProtocols.iterator().next().toUpperCase(Locale.US);
return upgradeCodecMap.get(protocol);
}
return null;
}
/**
* Creates the 101 Switching Protocols response message.
*/
private FullHttpResponse createUpgradeResponse(UpgradeCodec upgradeCodec) {
DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, SWITCHING_PROTOCOLS);
res.headers().add(CONNECTION, UPGRADE);
res.headers().add(UPGRADE, upgradeCodec.protocol());
res.headers().add(CONTENT_LENGTH, "0");
return res;
}
/**
* Splits a comma-separated header value. The returned set is case-insensitive and contains each
* part with whitespace removed.
*/
private static Set<String> splitHeader(String header) {
StringBuilder builder = new StringBuilder(header.length());
Set<String> protocols = new TreeSet<String>(CASE_INSENSITIVE_ORDER);
for (int i = 0; i < header.length(); ++i) {
char c = header.charAt(i);
if (Character.isWhitespace(c)) {
// Don't include any whitespace.
continue;
}
if (c == ',') {
// Add the string and reset the builder for the next protocol.
protocols.add(builder.toString());
builder.setLength(0);
} else {
builder.append(c);
}
}
// Add the last protocol
if (builder.length() > 0) {
protocols.add(builder.toString());
}
return protocols;
}
}

View File

@ -15,7 +15,9 @@
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception;
@ -61,14 +63,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
private final Http2Connection connection;
private final Http2InboundFlowController inboundFlow;
private final Http2OutboundFlowController outboundFlow;
// We prefer ArrayDeque to LinkedList because later will produce more GC.
// This initial capacity is plenty for SETTINGS traffic.
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
private ByteBuf clientPrefaceString;
private boolean prefaceSent;
private boolean prefaceReceived;
private ChannelHandlerContext ctx;
private ChannelFutureListener closeListener;
// We prefer ArrayDeque to LinkedList because later will produce more GC.
// This initial capacity is plenty for SETTINGS traffic.
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
protected AbstractHttp2ConnectionHandler(boolean server) {
this(server, false);
@ -111,6 +113,45 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
clientPrefaceString = connection.isServer()? connectionPrefaceBuf() : null;
}
/**
* Handles the client-side (cleartext) upgrade from HTTP to HTTP/2. Reserves local stream 1 for
* the HTTP/2 response.
*/
public final void onHttpClientUpgrade() throws Http2Exception {
if (connection.isServer()) {
throw protocolError("Client-side HTTP upgrade requested for a server");
}
if (prefaceSent || prefaceReceived) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Create a local stream used for the HTTP cleartext upgrade.
createLocalStream(HTTP_UPGRADE_STREAM_ID, true, CONNECTION_STREAM_ID,
DEFAULT_PRIORITY_WEIGHT, false);
}
/**
* Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
*
* @param settings the settings for the remote endpoint.
*/
public final void onHttpServerUpgrade(Http2Settings settings)
throws Http2Exception {
if (!connection.isServer()) {
throw protocolError("Server-side HTTP upgrade requested for a client");
}
if (prefaceSent || prefaceReceived) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Apply the settings but no ACK is necessary.
applyRemoteSettings(settings);
// Create a stream in the half-closed state.
createRemoteStream(HTTP_UPGRADE_STREAM_ID, true, CONNECTION_STREAM_ID,
DEFAULT_PRIORITY_WEIGHT, false);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the connection preface to the remote
@ -186,6 +227,13 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return settings;
}
/**
* Gets the next stream ID that can be created by the local endpoint.
*/
protected int nextStreamId() {
return connection.local().nextStreamId();
}
protected ChannelFuture writeData(final ChannelHandlerContext ctx,
final ChannelPromise promise, int streamId, final ByteBuf data, int padding,
boolean endStream, boolean endSegment, boolean compressed) {
@ -227,14 +275,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
// Creates a new locally-initiated stream.
stream = connection.local().createStream(streamId, endStream);
// Allow bi-directional traffic.
inboundFlow.addStream(streamId);
if (!endStream) {
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
}
// Create a new locally-initiated stream.
stream = createLocalStream(streamId, endStream, streamDependency, weight, exclusive);
} else {
// An existing stream...
if (stream.state() == RESERVED_LOCAL) {
@ -308,11 +350,9 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
throw protocolError("Sending settings after connection going away.");
}
if (settings.hasPushEnabled()) {
if (connection.isServer()) {
if (settings.hasPushEnabled() && connection.isServer()) {
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
}
}
return frameWriter.writeSettings(ctx, promise, settings);
} catch (Http2Exception e) {
@ -569,6 +609,88 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
ChannelFutureListener.CLOSE_ON_FAILURE);
}
/**
* Applies settings sent from the local endpoint.
*/
private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
if (settings.hasPushEnabled()) {
if (connection.isServer()) {
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
}
connection.local().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.local().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.remote().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameReader.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
inboundFlow.initialInboundWindowSize(settings.initialWindowSize());
}
}
/**
* Applies settings received from the remote endpoint.
*/
private void applyRemoteSettings(Http2Settings settings) throws Http2Exception {
if (settings.hasPushEnabled()) {
if (!connection.isServer()) {
throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified");
}
connection.remote().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.remote().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.local().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameWriter.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
outboundFlow.initialOutboundWindowSize(settings.initialWindowSize());
}
}
/**
* Creates a new stream initiated by the local endpoint.
*/
private Http2Stream createLocalStream(int streamId, boolean halfClosed, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
Http2Stream stream = connection.local().createStream(streamId, halfClosed);
inboundFlow.addStream(streamId);
if (!halfClosed) {
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
}
return stream;
}
/**
* Creates a new stream initiated by the remote endpoint.
*/
private Http2Stream createRemoteStream(int streamId, boolean halfClosed, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
Http2Stream stream = connection.remote().createStream(streamId, halfClosed);
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
if (!halfClosed) {
inboundFlow.addStream(streamId);
}
return stream;
}
/**
* Handles all inbound frames from the network.
*/
@ -630,14 +752,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
Http2Stream stream = connection.stream(streamId);
if (stream == null) {
// Create the new stream.
connection.remote().createStream(streamId, endStream);
// Allow bi-directional traffic.
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
if (!endStream) {
inboundFlow.addStream(streamId);
}
createRemoteStream(streamId, endStream, streamDependency, weight, exclusive);
} else {
if (stream.state() == RESERVED_REMOTE) {
// Received headers for a reserved push stream ... open it for push to the
@ -717,25 +832,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
Http2Settings settings = outstandingLocalSettingsQueue.poll();
if (settings != null) {
if (settings.hasPushEnabled()) {
connection.local().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.local().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.remote().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameReader.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
inboundFlow.initialInboundWindowSize(settings.initialWindowSize());
}
applyLocalSettings(settings);
}
AbstractHttp2ConnectionHandler.this.onSettingsAckRead(ctx);
@ -744,28 +841,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
if (settings.hasPushEnabled()) {
if (!connection.isServer()) {
throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified");
}
connection.remote().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.remote().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.local().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameWriter.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
outboundFlow.initialOutboundWindowSize(settings.initialWindowSize());
}
applyRemoteSettings(settings);
// Acknowledge receipt of the settings.
frameWriter.writeSettingsAck(ctx, ctx.newPromise());

View File

@ -235,6 +235,13 @@ public class DefaultHttp2Connection implements Http2Connection {
maxStreams = Integer.MAX_VALUE;
}
@Override
public int nextStreamId() {
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so
// start at 3.
return nextStreamId > 1? nextStreamId : nextStreamId + 2;
}
@Override
public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
checkNewStreamAllowed(streamId);

View File

@ -24,12 +24,9 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_SHORT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.PRIORITY_ENTRY_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_COMPRESS_DATA;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_ENABLE_PUSH;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_HEADER_TABLE_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_INITIAL_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_CONCURRENT_STREAMS;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.calcSettingsPayloadLength;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeSettingsPayload;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedInt;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedShort;
import static io.netty.util.CharsetUtil.UTF_8;
@ -160,38 +157,10 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Settings settings) {
try {
int numFields = 0;
numFields += settings.hasAllowCompressedData() ? 1 : 0;
numFields += settings.hasMaxHeaderTableSize() ? 1 : 0;
numFields += settings.hasInitialWindowSize() ? 1 : 0;
numFields += settings.hasMaxConcurrentStreams() ? 1 : 0;
numFields += settings.hasPushEnabled() ? 1 : 0;
int payloadLength = SETTING_ENTRY_LENGTH * numFields;
int payloadLength = calcSettingsPayloadLength(settings);
ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payloadLength);
writeFrameHeader(frame, payloadLength, Http2FrameType.SETTINGS, Http2Flags.EMPTY, 0);
if (settings.hasAllowCompressedData()) {
frame.writeByte(SETTINGS_COMPRESS_DATA);
writeUnsignedInt(settings.allowCompressedData() ? 1L : 0L, frame);
}
if (settings.hasMaxHeaderTableSize()) {
frame.writeByte(SETTINGS_HEADER_TABLE_SIZE);
writeUnsignedInt(settings.maxHeaderTableSize(), frame);
}
if (settings.hasInitialWindowSize()) {
frame.writeByte(SETTINGS_INITIAL_WINDOW_SIZE);
writeUnsignedInt(settings.initialWindowSize(), frame);
}
if (settings.hasMaxConcurrentStreams()) {
frame.writeByte(SETTINGS_MAX_CONCURRENT_STREAMS);
writeUnsignedInt(settings.maxConcurrentStreams(), frame);
}
if (settings.hasPushEnabled()) {
// Only write the enable push flag from client endpoints.
frame.writeByte(SETTINGS_ENABLE_PUSH);
writeUnsignedInt(settings.pushEnabled() ? 1L : 0L, frame);
}
writeSettingsPayload(settings, frame);
return ctx.writeAndFlush(frame, promise);
} catch (RuntimeException e) {
return promise.setFailure(e);
@ -372,15 +341,6 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
}
}
private static void writeFrameHeader(ByteBuf out, int payloadLength, Http2FrameType type,
Http2Flags flags, int streamId) {
out.ensureWritable(FRAME_HEADER_LENGTH + payloadLength);
out.writeShort(payloadLength);
out.writeByte(type.typeCode());
out.writeByte(flags.value());
out.writeInt(streamId);
}
private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int padding, boolean endStream, boolean endSegment,
boolean hasPriority, int streamDependency, short weight, boolean exclusive) {

View File

@ -0,0 +1,120 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER;
import static io.netty.handler.codec.http2.Http2CodecUtil.calcSettingsPayloadLength;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeSettingsPayload;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.ReferenceCountUtil.release;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpRequest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Client-side cleartext upgrade codec from HTTP to HTTP/2.
*/
public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.UpgradeCodec {
private static final List<String> UPGRADE_HEADERS = Collections.unmodifiableList(Arrays
.asList(HTTP_UPGRADE_SETTINGS_HEADER));
private final String handlerName;
private final AbstractHttp2ConnectionHandler connectionHandler;
/**
* Creates the codec using a default name for the connection handler when adding to the
* pipeline.
*
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ClientUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) {
this("http2ConnectionHandler", connectionHandler);
}
/**
* Creates the codec providing an upgrade to the given handler for HTTP/2.
*
* @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline.
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ClientUpgradeCodec(String handlerName,
AbstractHttp2ConnectionHandler connectionHandler) {
if (handlerName == null) {
throw new NullPointerException("handlerName");
}
if (connectionHandler == null) {
throw new NullPointerException("connectionHandler");
}
this.handlerName = handlerName;
this.connectionHandler = connectionHandler;
}
@Override
public String protocol() {
return HTTP_UPGRADE_PROTOCOL_NAME;
}
@Override
public Collection<String> setUpgradeHeaders(ChannelHandlerContext ctx,
HttpRequest upgradeRequest) {
String settingsValue = getSettingsHeaderValue(ctx);
upgradeRequest.headers().set(HTTP_UPGRADE_SETTINGS_HEADER, settingsValue);
return UPGRADE_HEADERS;
}
@Override
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse)
throws Exception {
// Reserve local stream 1 for the response.
connectionHandler.onHttpClientUpgrade();
// Add the handler to the pipeline.
ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler);
}
/**
* Converts the current settings for the handler to the Base64-encoded representation used in
* the HTTP2-Settings upgrade header.
*/
private String getSettingsHeaderValue(ChannelHandlerContext ctx) {
ByteBuf buf = null;
ByteBuf encodedBuf = null;
try {
// Get the local settings for the handler.
Http2Settings settings = connectionHandler.settings();
// Serialize the payload of the SETTINGS frame.
buf = ctx.alloc().buffer(calcSettingsPayloadLength(settings));
writeSettingsPayload(settings, buf);
// Base64 encode the payload and then convert to a string for the header.
encodedBuf = Base64.encode(buf, URL_SAFE);
return encodedBuf.toString(UTF_8);
} finally {
release(buf);
release(encodedBuf);
}
}
}

View File

@ -31,6 +31,10 @@ public final class Http2CodecUtil {
private static final byte[] EMPTY_PING = new byte[8];
public static final int CONNECTION_STREAM_ID = 0;
public static final int HTTP_UPGRADE_STREAM_ID = 1;
public static final String HTTP_UPGRADE_SETTINGS_HEADER = "HTTP2-Settings";
public static final String HTTP_UPGRADE_PROTOCOL_NAME = "h2c-12";
public static final int MAX_FRAME_PAYLOAD_LENGTH = 16383;
public static final int PING_FRAME_PAYLOAD_LENGTH = 8;
public static final short MAX_UNSIGNED_BYTE = 0xFF;
@ -126,6 +130,61 @@ public final class Http2CodecUtil {
out.writeByte((int) ((value & 0xFF)));
}
/**
* Writes an HTTP/2 frame header to the output buffer.
*/
public static void writeFrameHeader(ByteBuf out, int payloadLength, Http2FrameType type,
Http2Flags flags, int streamId) {
out.ensureWritable(FRAME_HEADER_LENGTH + payloadLength);
out.writeShort(payloadLength);
out.writeByte(type.typeCode());
out.writeByte(flags.value());
out.writeInt(streamId);
}
/**
* Calculates the HTTP/2 SETTINGS payload length for the serialized representation
* of the given settings.
*/
public static int calcSettingsPayloadLength(Http2Settings settings) {
int numFields = 0;
numFields += settings.hasAllowCompressedData() ? 1 : 0;
numFields += settings.hasMaxHeaderTableSize() ? 1 : 0;
numFields += settings.hasInitialWindowSize() ? 1 : 0;
numFields += settings.hasMaxConcurrentStreams() ? 1 : 0;
numFields += settings.hasPushEnabled() ? 1 : 0;
return SETTING_ENTRY_LENGTH * numFields;
}
/**
* Serializes the settings to the output buffer in the format of an HTTP/2 SETTINGS frame
* payload.
*/
public static void writeSettingsPayload(Http2Settings settings, ByteBuf out) {
if (settings.hasAllowCompressedData()) {
out.writeByte(SETTINGS_COMPRESS_DATA);
writeUnsignedInt(settings.allowCompressedData() ? 1L : 0L, out);
}
if (settings.hasMaxHeaderTableSize()) {
out.writeByte(SETTINGS_HEADER_TABLE_SIZE);
writeUnsignedInt(settings.maxHeaderTableSize(), out);
}
if (settings.hasInitialWindowSize()) {
out.writeByte(SETTINGS_INITIAL_WINDOW_SIZE);
writeUnsignedInt(settings.initialWindowSize(), out);
}
if (settings.hasMaxConcurrentStreams()) {
out.writeByte(SETTINGS_MAX_CONCURRENT_STREAMS);
writeUnsignedInt(settings.maxConcurrentStreams(), out);
}
if (settings.hasPushEnabled()) {
// Only write the enable push flag from client endpoints.
out.writeByte(SETTINGS_ENABLE_PUSH);
writeUnsignedInt(settings.pushEnabled() ? 1L : 0L, out);
}
}
/**
* Fails the given promise with the cause and then re-throws the cause.
*/

View File

@ -24,6 +24,11 @@ public interface Http2Connection {
*/
interface Endpoint {
/**
* Returns the next valid streamId for this endpoint.
*/
int nextStreamId();
/**
* Creates a stream initiated by this endpoint. This could fail for the following reasons:
* <p/>

View File

@ -0,0 +1,91 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* Convenience class that provides no-op implementations for all methods of {@link Http2FrameObserver}.
*/
public class Http2FrameAdapter implements Http2FrameObserver {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment, boolean compressed) throws Http2Exception {
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream, boolean endSegment) throws Http2Exception {
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
boolean endSegment) throws Http2Exception {
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData) throws Http2Exception {
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
}
@Override
public void onAltSvcRead(ChannelHandlerContext ctx, int streamId, long maxAge, int port,
ByteBuf protocolId, String host, String origin) throws Http2Exception {
}
@Override
public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
}
}

View File

@ -58,7 +58,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter {
public void logHeaders(Direction direction, int streamId, Http2Headers headers, int padding,
boolean endStream, boolean endSegment) {
log(direction, "HEADERS: steramId:%d, headers=%s, padding=%d, endStream=%b, endSegment=%b",
log(direction, "HEADERS: streamId:%d, headers=%s, padding=%d, endStream=%b, endSegment=%b",
streamId, headers, padding, endStream, endSegment);
}
@ -66,7 +66,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter {
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
boolean endSegment) {
log(direction,
"HEADERS: steramId:%d, headers=%s, streamDependency=%d, weight=%d, exclusive=%b, "
"HEADERS: streamId:%d, headers=%s, streamDependency=%d, weight=%d, exclusive=%b, "
+ "padding=%d, endStream=%b, endSegment=%b", streamId, headers,
streamDependency, weight, exclusive, padding, endStream, endSegment);
}
@ -109,7 +109,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter {
}
public void logWindowsUpdate(Direction direction, int streamId, int windowSizeIncrement) {
log(direction, "WINDOW_UPDATE: stream=%d, windowSizeIncrement=%d", streamId,
log(direction, "WINDOW_UPDATE: streamId=%d, windowSizeIncrement=%d", streamId,
windowSizeIncrement);
}

View File

@ -0,0 +1,164 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader;
import static io.netty.handler.codec.http2.Http2Flags.EMPTY;
import static io.netty.handler.codec.http2.Http2FrameType.SETTINGS;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.util.CharsetUtil;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Server-side codec for performing a cleartext upgrade from HTTP/1.x to HTTP/2.
*/
public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.UpgradeCodec {
private static final List<String> REQUIRED_UPGRADE_HEADERS = Collections
.unmodifiableList(Arrays.asList(HTTP_UPGRADE_SETTINGS_HEADER));
private final String handlerName;
private final AbstractHttp2ConnectionHandler connectionHandler;
private final Http2FrameReader frameReader;
private Http2Settings settings;
/**
* Creates the codec using a default name for the connection handler when adding to the
* pipeline.
*
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ServerUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) {
this("http2ConnectionHandler", connectionHandler);
}
/**
* Creates the codec providing an upgrade to the given handler for HTTP/2.
*
* @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline.
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ServerUpgradeCodec(String handlerName,
AbstractHttp2ConnectionHandler connectionHandler) {
if (handlerName == null) {
throw new NullPointerException("handlerName");
}
if (connectionHandler == null) {
throw new NullPointerException("connectionHandler");
}
this.handlerName = handlerName;
this.connectionHandler = connectionHandler;
this.frameReader = new DefaultHttp2FrameReader();
}
@Override
public String protocol() {
return HTTP_UPGRADE_PROTOCOL_NAME;
}
@Override
public Collection<String> requiredUpgradeHeaders() {
return REQUIRED_UPGRADE_HEADERS;
}
@Override
public void prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse) {
try {
// Decode the HTTP2-Settings header and set the settings on the handler to make
// sure everything is fine with the request.
String settingsHeader = upgradeRequest.headers().get(HTTP_UPGRADE_SETTINGS_HEADER);
settings = decodeSettingsHeader(ctx, settingsHeader);
connectionHandler.onHttpServerUpgrade(settings);
// Everything looks good, no need to modify the response.
} catch (Throwable e) {
// Send a failed response back to the client.
upgradeResponse.setStatus(BAD_REQUEST);
upgradeResponse.headers().clear();
}
}
@Override
public void upgradeTo(final ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse) {
// Add the HTTP/2 connection handler to the pipeline immediately following the current
// handler.
ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler);
}
/**
* Decodes the settings header and returns a {@link Http2Settings} object.
*/
private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, String settingsHeader)
throws Http2Exception {
ByteBuf header = Unpooled.wrappedBuffer(settingsHeader.getBytes(CharsetUtil.UTF_8));
try {
// Decode the SETTINGS payload.
ByteBuf payload = Base64.decode(header, URL_SAFE);
// Create an HTTP/2 frame for the settings.
ByteBuf frame = createSettingsFrame(ctx, payload);
// Decode the SETTINGS frame and return the settings object.
return decodeSettings(ctx, frame);
} finally {
header.release();
}
}
/**
* Decodes the settings frame and returns the settings.
*/
private Http2Settings decodeSettings(ChannelHandlerContext ctx, ByteBuf frame) throws Http2Exception {
try {
final Http2Settings decodedSettings = new Http2Settings();
frameReader.readFrame(ctx, frame, new Http2FrameAdapter() {
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
decodedSettings.copy(settings);
}
});
return decodedSettings;
} finally {
frame.release();
}
}
/**
* Creates an HTTP2-Settings header with the given payload. The payload buffer is released.
*/
private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) {
ByteBuf frame =
ctx.alloc().buffer(Http2CodecUtil.FRAME_HEADER_LENGTH + payload.readableBytes());
writeFrameHeader(frame, payload.readableBytes(), SETTINGS, EMPTY, 0);
frame.writeBytes(payload);
payload.release();
return frame;
}
}

View File

@ -180,6 +180,22 @@ public class Http2Settings {
return this;
}
/**
* Overwrites this settings object with the values of the given settings.
*
* @param source the source that will overwrite the current settings.
* @return this object.
*/
public Http2Settings copy(Http2Settings source) {
this.enabled = source.enabled;
this.allowCompressedData = source.allowCompressedData;
this.initialWindowSize = source.initialWindowSize;
this.maxConcurrentStreams = source.maxConcurrentStreams;
this.maxHeaderTableSize = source.maxHeaderTableSize;
this.pushEnabled = source.pushEnabled;
return this;
}
@Override
public int hashCode() {
final int prime = 31;

View File

@ -0,0 +1,99 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.example.http2;
import java.util.Locale;
/**
* Utility methods used by the example client and server.
*/
public final class Http2ExampleUtil {
/**
* Response header sent in response to the http->http2 cleartext upgrade request.
*/
public static final String UPGRADE_RESPONSE_HEADER = "Http-To-Http2-Upgrade";
public static final String STANDARD_HOST = "localhost";
public static final int STANDARD_HTTP_PORT = 8080;
public static final int STANDARD_HTTPS_PORT = 8443;
public static final String PORT_ARG = "-port=";
public static final String SSL_ARG = "-ssl=";
public static final String HOST_ARG = "-host=";
/**
* The configuration for the example client/server endpoints.
*/
public static final class EndpointConfig {
private final boolean ssl;
private final String host;
private final int port;
public EndpointConfig(boolean ssl, String host, int port) {
this.ssl = ssl;
this.host = host;
this.port = port;
}
public boolean isSsl() {
return ssl;
}
public String host() {
return host;
}
public int port() {
return port;
}
@Override
public String toString() {
return "EndpointConfig [ssl=" + ssl + ", host=" + host + ", port=" + port + "]";
}
}
/**
* Parse the command-line arguments to determine the configuration for the endpoint.
*/
public static EndpointConfig parseEndpointConfig(String[] args) {
boolean ssl = false;
int port = STANDARD_HTTP_PORT;
String host = STANDARD_HOST;
boolean portSpecified = false;
for (String arg : args) {
arg = arg.trim().toLowerCase(Locale.US);
if (arg.startsWith(PORT_ARG)) {
String value = arg.substring(PORT_ARG.length());
port = Integer.parseInt(value);
portSpecified = true;
} else if (arg.startsWith(SSL_ARG)) {
String value = arg.substring(SSL_ARG.length());
ssl = Boolean.parseBoolean(value);
if (!portSpecified) {
// Use the appropriate default.
port = ssl ? STANDARD_HTTPS_PORT : STANDARD_HTTP_PORT;
}
} else if (arg.startsWith(HOST_ARG)) {
host = arg.substring(HOST_ARG.length());
}
}
return new EndpointConfig(ssl, host, port);
}
private Http2ExampleUtil() {
}
}

View File

@ -14,54 +14,62 @@
*/
package io.netty.example.http2.client;
import static io.netty.example.http2.Http2ExampleUtil.parseEndpointConfig;
import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.example.http2.Http2ExampleUtil.EndpointConfig;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http2.Http2OrHttpChooser.SelectedProtocol;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import javax.net.ssl.SSLException;
import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import static java.util.concurrent.TimeUnit.*;
import javax.net.ssl.SSLException;
/**
* An HTTP2 client that allows you to send HTTP2 frames to a server. Inbound and outbound frames are
* logged.
* logged. When run from the command-line, sends a single HEADERS frame to the server and gets back
* a "Hello World" response.
* <p>
* To client accepts command-line arguments for {@code -host=<host/ip>}
* <i>(default="localhost")</i>, {@code -port=<port number>} <i>(default: http=8080,
* https=8443)</i>, and {@code -ssl=<true/false>} <i>(default=false)</i>.
*/
public class Http2Client {
private final SslContext sslCtx;
private final String host;
private final int port;
private final Http2ClientConnectionHandler http2ConnectionHandler;
private final EndpointConfig config;
private Http2ClientConnectionHandler http2ConnectionHandler;
private Channel channel;
private EventLoopGroup workerGroup;
public Http2Client(String host, int port) throws SSLException {
public Http2Client(EndpointConfig config) throws SSLException {
this.config = config;
if (config.isSsl()) {
sslCtx = SslContext.newClientContext(
null, InsecureTrustManagerFactory.INSTANCE, null,
SslContext.newApplicationProtocolSelector(
SelectedProtocol.HTTP_2.protocolName(),
SelectedProtocol.HTTP_1_1.protocolName()),
0, 0);
this.host = host;
this.port = port;
http2ConnectionHandler = new Http2ClientConnectionHandler();
} else {
sslCtx = null;
}
}
/**
* Starts the client and waits for the HTTP/2 upgrade to occur.
*/
public void start() throws Exception {
if (channel != null) {
System.out.println("Already running!");
@ -74,15 +82,40 @@ public class Http2Client {
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.remoteAddress(new InetSocketAddress(host, port));
b.handler(new Http2ClientInitializer(sslCtx, http2ConnectionHandler));
b.remoteAddress(new InetSocketAddress(config.host(), config.port()));
Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx);
b.handler(initializer);
// Start the client.
channel = b.connect().syncUninterruptibly().channel();
System.out.println("Connected to [" + config.host() + ':' + config.port() + ']');
// Wait for the HTTP/2 upgrade to occur.
http2ConnectionHandler = initializer.connectionHandler();
http2ConnectionHandler.awaitInitialization();
System.out.println("Connected to [" + host + ':' + port + ']');
}
/**
* Sends the given request to the server.
*/
public void sendRequest(FullHttpRequest request) throws Exception {
ChannelFuture requestFuture = channel.writeAndFlush(request).sync();
System.out.println("Back from sending headers...");
if (!requestFuture.isSuccess()) {
throw new RuntimeException(requestFuture.cause());
}
}
/**
* Waits for the full response to be received.
*/
public void awaitResponse() throws Exception {
http2ConnectionHandler.awaitResponse();
}
/**
* Closes the channel and waits for shutdown to complete.
*/
public void stop() {
try {
// Wait until the connection is closed.
@ -94,51 +127,29 @@ public class Http2Client {
}
}
public ChannelFuture sendHeaders(int streamId, Http2Headers headers) throws Http2Exception {
return http2ConnectionHandler.writeHeaders(streamId, headers, 0, true, true);
}
public ChannelFuture send(int streamId, ByteBuf data, int padding, boolean endStream,
boolean endSegment, boolean compressed) throws Http2Exception {
return http2ConnectionHandler.writeData(streamId, data, padding, endStream, endSegment,
compressed);
}
public Http2Headers headers() {
return DefaultHttp2Headers.newBuilder().authority(host).method(HttpMethod.GET.name())
.build();
}
public BlockingQueue<ChannelFuture> queue() {
return http2ConnectionHandler.queue();
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8443;
}
final Http2Client client = new Http2Client("localhost", port);
EndpointConfig config = parseEndpointConfig(args);
System.out.println(config);
final Http2Client client = new Http2Client(config);
try {
// Start the client and wait for the HTTP/2 upgrade to complete.
client.start();
System.out.println("Sending headers...");
ChannelFuture requestFuture = client.sendHeaders(3, client.headers()).sync();
// Create a simple GET request with just headers.
FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/whatever");
request.headers().add(HOST, config.host());
System.out.println("Sending request...");
ChannelFuture requestFuture = client.channel.writeAndFlush(request).sync();
System.out.println("Back from sending headers...");
if (!requestFuture.isSuccess()) {
requestFuture.cause().printStackTrace();
return;
}
// Waits for the complete response
ChannelFuture responseFuture = client.queue().poll(5, SECONDS);
if (!responseFuture.isSuccess()) {
responseFuture.cause().printStackTrace();
}
client.awaitResponse();
System.out.println("Finished HTTP/2 request");
} catch (Throwable t) {
t.printStackTrace();

View File

@ -14,15 +14,17 @@
*/
package io.netty.example.http2.client;
import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER;
import static io.netty.util.internal.logging.InternalLogLevel.INFO;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Exception;
@ -36,8 +38,7 @@ import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@ -47,36 +48,65 @@ import java.util.concurrent.TimeUnit;
public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler {
private static final Http2FrameLogger logger = new Http2FrameLogger(INFO,
InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class));
private final BlockingQueue<ChannelFuture> queue = new LinkedBlockingQueue<ChannelFuture>();
private final ChannelPromise initPromise;
private final ChannelPromise responsePromise;
private ByteBuf collectedData;
private ChannelPromise initialized;
public Http2ClientConnectionHandler() {
public Http2ClientConnectionHandler(ChannelPromise initPromise, ChannelPromise responsePromise) {
super(new DefaultHttp2Connection(false, false), frameReader(), frameWriter(),
new DefaultHttp2InboundFlowController(), new DefaultHttp2OutboundFlowController());
this.initPromise = initPromise;
this.responsePromise = responsePromise;
}
public void awaitInitialization() throws InterruptedException {
initialized.await(5, TimeUnit.SECONDS);
/**
* Wait for this handler to be added after the upgrade to HTTP/2, and for initial preface
* handshake to complete.
*/
public void awaitInitialization() throws Exception {
if (!initPromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for initialization");
}
if (!initPromise.isSuccess()) {
throw new RuntimeException(initPromise.cause());
}
}
/**
* Wait for this full response to be received and printed out.
*/
public void awaitResponse() throws Exception {
if (!responsePromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for completion of the response");
}
if (!responsePromise.isSuccess()) {
throw new RuntimeException(initPromise.cause());
}
}
/**
* Handles conversion of a {@link FullHttpMessage} to HTTP/2 frames.
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
initialized = ctx.newPromise();
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof FullHttpMessage) {
FullHttpMessage httpMsg = (FullHttpMessage) msg;
boolean hasData = httpMsg.content().isReadable();
public ChannelFuture writeData(int streamId, ByteBuf data, int padding, boolean endStream,
boolean endSegment, boolean compressed) throws Http2Exception {
return super.writeData(ctx(), ctx().newPromise(), streamId, data, padding, endStream,
endSegment, compressed);
// Convert and write the headers.
DefaultHttp2Headers.Builder headers = DefaultHttp2Headers.newBuilder();
for (Map.Entry<String, String> entry : httpMsg.headers().entries()) {
headers.add(entry.getKey(), entry.getValue());
}
int streamId = nextStreamId();
writeHeaders(ctx, promise, streamId, headers.build(), 0, !hasData, false);
if (hasData) {
writeData(ctx, promise, streamId, httpMsg.content(), 0, true, true, false);
}
} else {
super.write(ctx, msg, promise);
}
public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int padding,
boolean endStream, boolean endSegment) throws Http2Exception {
return super.writeHeaders(ctx(), ctx().newPromise(), streamId, headers, padding, endStream,
endSegment);
}
@Override
@ -107,7 +137,7 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
collectedData.release();
collectedData = null;
queue.add(ctx().channel().newSucceededFuture());
responsePromise.setSuccess();
}
}
@ -120,6 +150,9 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
boolean endSegment) throws Http2Exception {
if (headers.contains(UPGRADE_RESPONSE_HEADER)) {
System.out.println("Received HTTP/2 response to the HTTP->HTTP/2 upgrade request");
}
}
@Override
@ -139,8 +172,8 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
if (!initialized.isDone()) {
initialized.setSuccess();
if (!initPromise.isDone()) {
initPromise.setSuccess();
}
}
@ -178,17 +211,13 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
queue.add(ctx.channel().newFailedFuture(cause));
cause.printStackTrace();
if (!initPromise.isDone()) {
initPromise.setFailure(cause);
}
if (!responsePromise.isDone()) {
initPromise.setFailure(cause);
}
super.exceptionCaught(ctx, cause);
if (!initialized.isDone()) {
initialized.setFailure(cause);
}
}
public BlockingQueue<ChannelFuture> queue() {
return queue;
}
private static Http2FrameReader frameReader() {

View File

@ -14,9 +14,16 @@
*/
package io.netty.example.http2.client;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.ssl.SslContext;
/**
@ -25,15 +32,72 @@ import io.netty.handler.ssl.SslContext;
public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
private final AbstractHttp2ConnectionHandler connectionHandler;
private Http2ClientConnectionHandler connectionHandler;
public Http2ClientInitializer(SslContext sslCtx, AbstractHttp2ConnectionHandler connectionHandler) {
public Http2ClientInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
this.connectionHandler = connectionHandler;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
connectionHandler = new Http2ClientConnectionHandler(ch.newPromise(), ch.newPromise());
if (sslCtx != null) {
configureSsl(ch);
} else {
configureClearText(ch);
}
}
public Http2ClientConnectionHandler connectionHandler() {
return connectionHandler;
}
/**
* Configure the pipeline for TLS NPN negotiation to HTTP/2.
*/
private void configureSsl(SocketChannel ch) {
ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), connectionHandler);
}
/**
* Configure the pipeline for a cleartext upgrade from HTTP to HTTP/2.
*/
private void configureClearText(SocketChannel ch) {
HttpClientCodec sourceCodec = new HttpClientCodec();
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(connectionHandler);
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536);
ch.pipeline().addLast(sourceCodec);
ch.pipeline().addLast(upgradeHandler);
ch.pipeline().addLast(new UpgradeRequestHandler());
ch.pipeline().addLast(new UserEventLogger());
}
/**
* A handler that triggers the cleartext upgrade to HTTP/2 by sending an initial HTTP request.
*/
private static class UpgradeRequestHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DefaultFullHttpRequest upgradeRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
ctx.writeAndFlush(upgradeRequest);
super.channelActive(ctx);
// Done with this handler, remove it from the pipeline.
ctx.pipeline().remove(ctx.name());
}
}
/**
* Class that logs any User Events triggered on this channel.
*/
private static class UserEventLogger extends ChannelHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("User Event Triggered: " + evt);
super.userEventTriggered(ctx, evt);
}
}
}

View File

@ -15,26 +15,62 @@
package io.netty.example.http2.server;
import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER;
import static io.netty.util.internal.logging.InternalLogLevel.INFO;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.example.http2.client.Http2ClientConnectionHandler;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
/**
* A simple handler that responds with the message "Hello World!".
*/
public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
private static final Http2FrameLogger logger = new Http2FrameLogger(INFO,
InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class));
static final byte[] RESPONSE_BYTES = "Hello World".getBytes(CharsetUtil.UTF_8);
public HelloWorldHttp2Handler() {
super(true);
super(new DefaultHttp2Connection(true, false), new Http2InboundFrameLogger(
new DefaultHttp2FrameReader(), logger), new Http2OutboundFrameLogger(
new DefaultHttp2FrameWriter(), logger), new DefaultHttp2InboundFlowController(),
new DefaultHttp2OutboundFlowController());
}
/**
* Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via HTTP/2
* on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) {
// Write an HTTP/2 response to the upgrade request
Http2Headers headers =
DefaultHttp2Headers.newBuilder().set(UPGRADE_RESPONSE_HEADER, "true").build();
writeHeaders(ctx, ctx.newPromise(), 1, headers, 0, true, true);
}
super.userEventTriggered(ctx, evt);
}
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment, boolean compressed) throws Http2Exception {
@ -43,6 +79,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
}
}
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
io.netty.handler.codec.http2.Http2Headers headers, int padding, boolean endStream,
@ -52,6 +91,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
}
}
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
io.netty.handler.codec.http2.Http2Headers headers, int streamDependency, short weight,
@ -77,7 +119,8 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
}
@Override
@ -118,6 +161,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
super.exceptionCaught(ctx, cause);
}
/**
* Sends a "Hello World" DATA frame to the client.
*/
private void sendResponse(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
// Send a frame for the response status
Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200").build();

View File

@ -16,12 +16,14 @@
package io.netty.example.http2.server;
import static io.netty.example.http2.Http2ExampleUtil.parseEndpointConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.http2.Http2ExampleUtil.EndpointConfig;
import io.netty.handler.codec.http2.Http2OrHttpChooser.SelectedProtocol;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
@ -29,18 +31,18 @@ import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.util.Arrays;
/**
* A HTTP/2 Server that responds to requests with a Hello World.
* A HTTP/2 Server that responds to requests with a Hello World. Once started, you can test the
* server with the example client.
* <p>
* Once started, you can test the server with the example client.
* To server accepts command-line arguments for {@code -port=<port number>} <i>(default: http=8080,
* https=8443)</i>, and {@code -ssl=<true/false>} <i>(default=false)</i>.
*/
public class Http2Server {
private final SslContext sslCtx;
private final int port;
private final EndpointConfig config;
public Http2Server(SslContext sslCtx, int port) {
this.sslCtx = sslCtx;
this.port = port;
public Http2Server(EndpointConfig config) {
this.config = config;
}
public void run() throws Exception {
@ -50,10 +52,23 @@ public class Http2Server {
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
// If SSL was selected, configure the SSL context.
SslContext sslCtx = null;
if (config.isSsl()) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(
ssc.certificate(), ssc.privateKey(), null, null,
Arrays.asList(
SelectedProtocol.HTTP_2.protocolName(),
SelectedProtocol.HTTP_1_1.protocolName()),
0, 0);
}
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new Http2ServerInitializer(sslCtx));
Channel ch = b.bind(port).sync().channel();
Channel ch = b.bind(config.port()).sync().channel();
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
@ -62,24 +77,10 @@ public class Http2Server {
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8443;
}
EndpointConfig config = parseEndpointConfig(args);
System.out.println(config);
System.out.println("HTTP2 server started at port " + port + '.');
// Configure SSL context.
SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslCtx = SslContext.newServerContext(
ssc.certificate(), ssc.privateKey(), null, null,
Arrays.asList(
SelectedProtocol.HTTP_2.protocolName(),
SelectedProtocol.HTTP_1_1.protocolName()),
0, 0);
new Http2Server(sslCtx, port).run();
System.out.println("HTTP2 server started at port " + config.port() + '.');
new Http2Server(config).run();
}
}

View File

@ -16,15 +16,26 @@
package io.netty.example.http2.server;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.ssl.SslContext;
import java.util.Arrays;
/**
* Sets up the Netty pipeline
* Sets up the Netty pipeline for the example server. Depending on the endpoint config, sets up the
* pipeline for NPN or cleartext HTTP upgrade to HTTP/2.
*/
public class Http2ServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public Http2ServerInitializer(SslContext sslCtx) {
@ -33,6 +44,63 @@ public class Http2ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
if (sslCtx != null) {
configureSsl(ch);
} else {
configureClearText(ch);
}
}
/**
* Configure the pipeline for TLS NPN negotiation to HTTP/2.
*/
private void configureSsl(SocketChannel ch) {
ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()), new Http2OrHttpHandler());
}
/**
* Configure the pipeline for a cleartext upgrade from HTTP to HTTP/2.
*/
private void configureClearText(SocketChannel ch) {
HttpCodec sourceCodec = new HttpCodec();
HttpServerUpgradeHandler.UpgradeCodec upgradeCodec =
new Http2ServerUpgradeCodec(new HelloWorldHttp2Handler());
HttpServerUpgradeHandler upgradeHandler =
new HttpServerUpgradeHandler(sourceCodec, Arrays.asList(upgradeCodec), 65536);
ch.pipeline().addLast(sourceCodec);
ch.pipeline().addLast(upgradeHandler);
ch.pipeline().addLast(new UserEventLogger());
}
/**
* Source codec for HTTP cleartext upgrade.
*/
private static final class HttpCodec extends ChannelHandlerAppender implements
HttpServerUpgradeHandler.SourceCodec {
HttpCodec() {
add("httpRequestDecoder", new HttpRequestDecoder());
add("httpResponseEncoder", new HttpResponseEncoder());
add("httpRequestAggregator", new HttpObjectAggregator(65536));
}
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
System.out.println("removing HTTP handlers");
ctx.pipeline().remove("httpRequestAggregator");
ctx.pipeline().remove("httpResponseEncoder");
ctx.pipeline().remove("httpRequestDecoder");
}
}
/**
* Class that logs any User Events triggered on this channel.
*/
private static class UserEventLogger extends ChannelHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("User Event Triggered: " + evt);
super.userEventTriggered(ctx, evt);
}
}
}