Adding a general handler for upgrading protocols

Adding full implementation for cleartext upgrade from HTTP to HTTP/2.
This commit is contained in:
nmittler 2014-05-13 10:26:42 -07:00
parent 856c89dd70
commit 086dc9140a
21 changed files with 1707 additions and 246 deletions

View File

@ -40,7 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
* *
* @see HttpServerCodec * @see HttpServerCodec
*/ */
public final class HttpClientCodec extends ChannelHandlerAppender { public final class HttpClientCodec extends ChannelHandlerAppender implements
HttpClientUpgradeHandler.SourceCodec {
/** A queue that is used for correlating a request and a response. */ /** A queue that is used for correlating a request and a response. */
private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>(); private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
@ -86,6 +87,16 @@ public final class HttpClientCodec extends ChannelHandlerAppender {
this.failOnMissingResponse = failOnMissingResponse; this.failOnMissingResponse = failOnMissingResponse;
} }
/**
* Upgrades to another protocol from HTTP. Removes the {@link Decoder} and {@link Encoder} from
* the pipeline.
*/
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
ctx.pipeline().remove(Decoder.class);
ctx.pipeline().remove(Encoder.class);
}
private Decoder decoder() { private Decoder decoder() {
return handlerAt(0); return handlerAt(0);
} }

View File

@ -0,0 +1,234 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.UPGRADE;
import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static io.netty.util.ReferenceCountUtil.release;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
* Client-side handler for handling an HTTP upgrade handshake to another protocol. When the first
* HTTP request is sent, this handler will add all appropriate headers to perform an upgrade to the
* new protocol. If the upgrade fails (i.e. response is not 101 Switching Protocols), this handler
* simply removes itself from the pipeline. If the upgrade is successful, upgrades the pipeline to
* the new protocol.
*/
public class HttpClientUpgradeHandler extends HttpObjectAggregator {
/**
* User events that are fired to notify about upgrade status.
*/
public enum UpgradeEvent {
/**
* The Upgrade request was sent to the server.
*/
UPGRADE_ISSUED,
/**
* The Upgrade to the new protocol was successful.
*/
UPGRADE_SUCCESSFUL,
/**
* The Upgrade was unsuccessful due to the server not issuing
* with a 101 Switching Protocols response.
*/
UPGRADE_REJECTED
}
/**
* The source codec that is used in the pipeline initially.
*/
public interface SourceCodec {
/**
* Removes this codec (i.e. all associated handlers) from the pipeline.
*/
void upgradeFrom(ChannelHandlerContext ctx);
}
/**
* A codec that the source can be upgraded to.
*/
public interface UpgradeCodec {
/**
* Returns the name of the protocol supported by this codec, as indicated by the {@link UPGRADE} header.
*/
String protocol();
/**
* Sets any protocol-specific headers required to the upgrade request. Returns the names of
* all headers that were added. These headers will be used to populate the CONNECTION header.
*/
Collection<String> setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest);
/**
* Performs an HTTP protocol upgrade from the source codec. This method is responsible for
* adding all handlers required for the new protocol.
*
* @param ctx the context for the current handler.
* @param upgradeResponse the 101 Switching Protocols response that indicates that the server
* has switched to this protocol.
*/
void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception;
}
private final SourceCodec sourceCodec;
private final UpgradeCodec upgradeCodec;
private boolean upgradeRequested;
/**
* Constructs the client upgrade handler.
*
* @param sourceCodec the codec that is being used initially.
* @param upgradeCodec the codec that the client would like to upgrade to.
* @param maxContentLength the maximum length of the aggregated content.
*/
public HttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec,
int maxContentLength) {
super(maxContentLength);
if (sourceCodec == null) {
throw new NullPointerException("sourceCodec");
}
if (upgradeCodec == null) {
throw new NullPointerException("upgradeCodec");
}
this.sourceCodec = sourceCodec;
this.upgradeCodec = upgradeCodec;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (!(msg instanceof HttpRequest)) {
super.write(ctx, msg, promise);
return;
}
if (upgradeRequested) {
promise.setFailure(new IllegalStateException(
"Attempting to write HTTP request with upgrade in progress"));
return;
}
upgradeRequested = true;
setUpgradeRequestHeaders(ctx, (HttpRequest) msg);
// Continue writing the request.
super.write(ctx, msg, promise);
// Notify that the upgrade request was issued.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_ISSUED);
// Now we wait for the next HTTP response to see if we switch protocols.
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
FullHttpResponse response = null;
try {
if (!upgradeRequested) {
throw new IllegalStateException("Read HTTP response without requesting protocol switch");
}
if (msg instanceof FullHttpResponse) {
response = (FullHttpResponse) msg;
// Need to retain since the base class will release after returning from this method.
response.retain();
out.add(response);
} else {
// Call the base class to handle the aggregation of the full request.
super.decode(ctx, msg, out);
if (out.isEmpty()) {
// The full request hasn't been created yet, still awaiting more data.
return;
}
assert out.size() == 1;
response = (FullHttpResponse) out.get(0);
}
if (!SWITCHING_PROTOCOLS.equals(response.getStatus())) {
// The server does not support the requested protocol, just remove this handler
// and continue processing HTTP.
// NOTE: not releasing the response since we're letting it propagate to the
// next handler.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED);
removeThisHandler(ctx);
return;
}
String upgradeHeader = response.headers().get(UPGRADE);
if (upgradeHeader == null) {
throw new IllegalStateException(
"Switching Protocols response missing UPGRADE header");
}
if (!upgradeCodec.protocol().equalsIgnoreCase(upgradeHeader)) {
throw new IllegalStateException(
"Switching Protocols response with unexpected UPGRADE protocol: "
+ upgradeHeader);
}
// Upgrade to the new protocol.
sourceCodec.upgradeFrom(ctx);
upgradeCodec.upgradeTo(ctx, response);
// Notify that the upgrade to the new protocol completed successfully.
ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL);
// We switched protocols, so we're done with the upgrade response.
// Release it and clear it from the output.
response.release();
out.clear();
removeThisHandler(ctx);
} catch (Throwable t) {
release(response);
ctx.fireExceptionCaught(t);
removeThisHandler(ctx);
}
}
private void removeThisHandler(ChannelHandlerContext ctx) {
ctx.pipeline().remove(ctx.name());
}
/**
* Adds all upgrade request headers necessary for an upgrade to the supported protocols.
*/
private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
// Set the UPGRADE header on the request.
request.headers().set(UPGRADE, upgradeCodec.protocol());
// Add all protocol-specific headers to the request.
Set<String> connectionParts = new LinkedHashSet<String>(2);
connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));
// Set the CONNECTION header from the set of all protocol-specific headers that were added.
StringBuilder builder = new StringBuilder();
for (String part : connectionParts) {
builder.append(part);
builder.append(",");
}
builder.append(UPGRADE);
request.headers().set(CONNECTION, builder.toString());
}
}

View File

@ -16,15 +16,20 @@
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.channel.ChannelHandlerAppender; import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
/** /**
* A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder} * A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder}
* which enables easier server side HTTP implementation. * which enables easier server side HTTP implementation. Also supports use with
* a {@link HttpServerUpgradeHandler} to support upgrading to another protocol
* from HTTP.
* *
* @see HttpClientCodec * @see HttpClientCodec
* @see HttpServerUpgradeHandler
*/ */
public final class HttpServerCodec extends ChannelHandlerAppender { public final class HttpServerCodec extends ChannelHandlerAppender implements
HttpServerUpgradeHandler.SourceCodec {
/** /**
* Creates a new instance with the default decoder options * Creates a new instance with the default decoder options
@ -49,4 +54,14 @@ public final class HttpServerCodec extends ChannelHandlerAppender {
super(new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), super(new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders),
new HttpResponseEncoder()); new HttpResponseEncoder());
} }
/**
* Upgrades to another protocol from HTTP. Removes the {@link HttpRequestDecoder} and
* {@link HttpResponseEncoder} from the pipeline.
*/
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
ctx.pipeline().remove(HttpRequestDecoder.class);
ctx.pipeline().remove(HttpResponseEncoder.class);
}
} }

View File

@ -0,0 +1,376 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaders.Names.UPGRADE;
import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
* A server-side handler that receives HTTP requests and optionally performs a protocol switch if
* the requested protocol is supported. Once an upgrade is performed, this handler removes itself
* from the pipeline.
*/
public class HttpServerUpgradeHandler extends HttpObjectAggregator {
/**
* The source codec that is used in the pipeline initially.
*/
public interface SourceCodec {
/**
* Removes this codec (i.e. all associated handlers) from the pipeline.
*/
void upgradeFrom(ChannelHandlerContext ctx);
}
/**
* A codec that the source can be upgraded to.
*/
public interface UpgradeCodec {
/**
* Returns the name of the protocol supported by this codec, as indicated by the {@link UPGRADE} header.
*/
String protocol();
/**
* Gets all protocol-specific headers required by this protocol for a successful upgrade.
* Any supplied header will be required to appear in the {@link CONNECTION} header as well.
*/
Collection<String> requiredUpgradeHeaders();
/**
* Adds any headers to the 101 Switching protocols response that are appropriate for this protocol.
*/
void prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse);
/**
* Performs an HTTP protocol upgrade from the source codec. This method is responsible for
* adding all handlers required for the new protocol.
*
* @param ctx the context for the current handler.
* @param upgradeRequest the request that triggered the upgrade to this protocol. The
* upgraded protocol is responsible for sending the response.
* @param upgradeResponse a 101 Switching Protocols response that is populated with the
* {@link CONNECTION} and {@link UPGRADE} headers. The protocol is required to
* send this before sending any other frames back to the client. The headers may
* be augmented as necessary by the protocol before sending.
* @return the future for the writing of the upgrade response.
*/
void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse);
}
/**
* User event that is fired to notify about the completion of an HTTP upgrade
* to another protocol. Contains the original upgrade request so that the response
* (if required) can be sent using the new protocol.
*/
public static final class UpgradeEvent implements ReferenceCounted {
private final String protocol;
private final FullHttpRequest upgradeRequest;
private UpgradeEvent(String protocol, FullHttpRequest upgradeRequest) {
this.protocol = protocol;
this.upgradeRequest = upgradeRequest;
}
/**
* The protocol that the channel has been upgraded to.
*/
public String protocol() {
return protocol;
}
/**
* Gets the request that triggered the protocol upgrade.
*/
public FullHttpRequest upgradeRequest() {
return upgradeRequest;
}
@Override
public int refCnt() {
return upgradeRequest.refCnt();
}
@Override
public UpgradeEvent retain() {
upgradeRequest.retain();
return this;
}
@Override
public UpgradeEvent retain(int increment) {
upgradeRequest.retain(increment);
return this;
}
@Override
public UpgradeEvent touch() {
upgradeRequest.touch();
return this;
}
@Override
public UpgradeEvent touch(Object hint) {
upgradeRequest.touch(hint);
return this;
}
@Override
public boolean release() {
return upgradeRequest.release();
}
@Override
public boolean release(int decrement) {
return upgradeRequest.release();
}
@Override
public String toString() {
return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest
+ "]";
}
}
private final Map<String, UpgradeCodec> upgradeCodecMap;
private final SourceCodec sourceCodec;
private boolean handlingUpgrade;
/**
* Constructs the upgrader with the supported codecs.
*
* @param sourceCodec the codec that is being used initially.
* @param upgradeCodecs the codecs (in order of preference) that this server supports
* upgrading to from the source codec.
* @param maxContentLength the maximum length of the aggregated content.
*/
public HttpServerUpgradeHandler(SourceCodec sourceCodec,
Collection<UpgradeCodec> upgradeCodecs, int maxContentLength) {
super(maxContentLength);
if (sourceCodec == null) {
throw new NullPointerException("sourceCodec");
}
if (upgradeCodecs == null) {
throw new NullPointerException("upgradeCodecs");
}
this.sourceCodec = sourceCodec;
upgradeCodecMap = new LinkedHashMap<String, UpgradeCodec>(upgradeCodecs.size());
for (UpgradeCodec upgradeCodec : upgradeCodecs) {
String name = upgradeCodec.protocol().toUpperCase(Locale.US);
upgradeCodecMap.put(name, upgradeCodec);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
// Determine if we're already handling an upgrade request or just starting a new one.
handlingUpgrade = handlingUpgrade || isUpgradeRequest(msg);
if (!handlingUpgrade) {
// Not handling an upgrade request, just pass it to the next handler.
ReferenceCountUtil.retain(msg);
out.add(msg);
return;
}
FullHttpRequest fullRequest = null;
if (msg instanceof FullHttpRequest) {
fullRequest = (FullHttpRequest) msg;
ReferenceCountUtil.retain(msg);
out.add(msg);
} else {
// Call the base class to handle the aggregation of the full request.
super.decode(ctx, msg, out);
if (out.isEmpty()) {
// The full request hasn't been created yet, still awaiting more data.
return;
}
// Finished aggregating the full request, get it from the output list.
assert out.size() == 1;
handlingUpgrade = false;
fullRequest = (FullHttpRequest) out.get(0);
}
if (upgrade(ctx, fullRequest)) {
// The upgrade was successful, remove the message from the output list
// so that it's not propagated to the next handler. This request will
// be propagated as a user event instead.
out.clear();
}
// The upgrade did not succeed, just allow the full request to propagate to the
// next handler.
}
/**
* Determines whether or not the message is an HTTP upgrade request.
*/
private boolean isUpgradeRequest(HttpObject msg) {
return (msg instanceof HttpRequest) && ((HttpRequest) msg).headers().get(UPGRADE) != null;
}
/**
* Attempts to upgrade to the protocol(s) identified by the {@link UPGRADE} header (if provided
* in the request).
*
* @param ctx the context for this handler.
* @param request the HTTP request.
* @return {@code true} if the upgrade occurred, otherwise {@code false}.
*/
private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
// Select the best protocol based on those requested in the UPGRADE header.
String upgradeHeader = request.headers().get(UPGRADE);
final UpgradeCodec upgradeCodec = selectUpgradeCodec(upgradeHeader);
if (upgradeCodec == null) {
// None of the requested protocols are supported, don't upgrade.
return false;
}
// Make sure the CONNECTION header is present.
String connectionHeader = request.headers().get(CONNECTION);
if (connectionHeader == null) {
return false;
}
// Make sure the CONNECTION header contains UPGRADE as well as all protocol-specific headers.
Collection<String> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
Set<String> values = splitHeader(connectionHeader);
if (!values.contains(UPGRADE.toString()) || !values.containsAll(requiredHeaders)) {
return false;
}
// Ensure that all required protocol-specific headers are found in the request.
for (String requiredHeader : requiredHeaders) {
if (!request.headers().contains(requiredHeader)) {
return false;
}
}
// Create the user event to be fired once the upgrade completes.
final UpgradeEvent event = new UpgradeEvent(upgradeCodec.protocol(), request);
// Prepare and send the upgrade response. Wait for this write to complete before upgrading,
// since we need the old codec in-place to properly encode the response.
final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeCodec);
upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse);
ctx.writeAndFlush(upgradeResponse).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (future.isSuccess()) {
// Perform the upgrade to the new protocol.
sourceCodec.upgradeFrom(ctx);
upgradeCodec.upgradeTo(ctx, request, upgradeResponse);
// Notify that the upgrade has occurred. Retain the event to offset
// the release() in the finally block.
ctx.fireUserEventTriggered(event.retain());
// Remove this handler from the pipeline.
ctx.pipeline().remove(HttpServerUpgradeHandler.this);
} else {
future.channel().close();
}
} finally {
// Release the event if the upgrade event wasn't fired.
event.release();
}
}
});
return true;
}
/**
* Looks up the most desirable supported upgrade codec from the list of choices in the UPGRADE
* header. If no suitable codec was found, returns {@code null}.
*/
private UpgradeCodec selectUpgradeCodec(String upgradeHeader) {
Set<String> requestedProtocols = splitHeader(upgradeHeader);
// Retain only the protocols that are in the protocol map. Maintain the original insertion
// order into the protocolMap, so that the first one in the remaining set is the most
// desirable protocol for the server.
Set<String> supportedProtocols = new LinkedHashSet<String>(upgradeCodecMap.keySet());
supportedProtocols.retainAll(requestedProtocols);
if (!supportedProtocols.isEmpty()) {
String protocol = supportedProtocols.iterator().next().toUpperCase(Locale.US);
return upgradeCodecMap.get(protocol);
}
return null;
}
/**
* Creates the 101 Switching Protocols response message.
*/
private FullHttpResponse createUpgradeResponse(UpgradeCodec upgradeCodec) {
DefaultFullHttpResponse res = new DefaultFullHttpResponse(HTTP_1_1, SWITCHING_PROTOCOLS);
res.headers().add(CONNECTION, UPGRADE);
res.headers().add(UPGRADE, upgradeCodec.protocol());
res.headers().add(CONTENT_LENGTH, "0");
return res;
}
/**
* Splits a comma-separated header value. The returned set is case-insensitive and contains each
* part with whitespace removed.
*/
private static Set<String> splitHeader(String header) {
StringBuilder builder = new StringBuilder(header.length());
Set<String> protocols = new TreeSet<String>(CASE_INSENSITIVE_ORDER);
for (int i = 0; i < header.length(); ++i) {
char c = header.charAt(i);
if (Character.isWhitespace(c)) {
// Don't include any whitespace.
continue;
}
if (c == ',') {
// Add the string and reset the builder for the next protocol.
protocols.add(builder.toString());
builder.setLength(0);
} else {
builder.append(c);
}
}
// Add the last protocol
if (builder.length() > 0) {
protocols.add(builder.toString());
}
return protocols;
}
}

View File

@ -15,7 +15,9 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf; import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception; import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception;
@ -61,14 +63,14 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
private final Http2Connection connection; private final Http2Connection connection;
private final Http2InboundFlowController inboundFlow; private final Http2InboundFlowController inboundFlow;
private final Http2OutboundFlowController outboundFlow; private final Http2OutboundFlowController outboundFlow;
// We prefer ArrayDeque to LinkedList because later will produce more GC.
// This initial capacity is plenty for SETTINGS traffic.
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
private ByteBuf clientPrefaceString; private ByteBuf clientPrefaceString;
private boolean prefaceSent; private boolean prefaceSent;
private boolean prefaceReceived; private boolean prefaceReceived;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private ChannelFutureListener closeListener; private ChannelFutureListener closeListener;
// We prefer ArrayDeque to LinkedList because later will produce more GC.
// This initial capacity is plenty for SETTINGS traffic.
private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
protected AbstractHttp2ConnectionHandler(boolean server) { protected AbstractHttp2ConnectionHandler(boolean server) {
this(server, false); this(server, false);
@ -111,6 +113,45 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
clientPrefaceString = connection.isServer()? connectionPrefaceBuf() : null; clientPrefaceString = connection.isServer()? connectionPrefaceBuf() : null;
} }
/**
* Handles the client-side (cleartext) upgrade from HTTP to HTTP/2. Reserves local stream 1 for
* the HTTP/2 response.
*/
public final void onHttpClientUpgrade() throws Http2Exception {
if (connection.isServer()) {
throw protocolError("Client-side HTTP upgrade requested for a server");
}
if (prefaceSent || prefaceReceived) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Create a local stream used for the HTTP cleartext upgrade.
createLocalStream(HTTP_UPGRADE_STREAM_ID, true, CONNECTION_STREAM_ID,
DEFAULT_PRIORITY_WEIGHT, false);
}
/**
* Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
*
* @param settings the settings for the remote endpoint.
*/
public final void onHttpServerUpgrade(Http2Settings settings)
throws Http2Exception {
if (!connection.isServer()) {
throw protocolError("Server-side HTTP upgrade requested for a client");
}
if (prefaceSent || prefaceReceived) {
throw protocolError("HTTP upgrade must occur before HTTP/2 preface is sent or received");
}
// Apply the settings but no ACK is necessary.
applyRemoteSettings(settings);
// Create a stream in the half-closed state.
createRemoteStream(HTTP_UPGRADE_STREAM_ID, true, CONNECTION_STREAM_ID,
DEFAULT_PRIORITY_WEIGHT, false);
}
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
// The channel just became active - send the connection preface to the remote // The channel just became active - send the connection preface to the remote
@ -186,6 +227,13 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
return settings; return settings;
} }
/**
* Gets the next stream ID that can be created by the local endpoint.
*/
protected int nextStreamId() {
return connection.local().nextStreamId();
}
protected ChannelFuture writeData(final ChannelHandlerContext ctx, protected ChannelFuture writeData(final ChannelHandlerContext ctx,
final ChannelPromise promise, int streamId, final ByteBuf data, int padding, final ChannelPromise promise, int streamId, final ByteBuf data, int padding,
boolean endStream, boolean endSegment, boolean compressed) { boolean endStream, boolean endSegment, boolean compressed) {
@ -227,14 +275,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
Http2Stream stream = connection.stream(streamId); Http2Stream stream = connection.stream(streamId);
if (stream == null) { if (stream == null) {
// Creates a new locally-initiated stream. // Create a new locally-initiated stream.
stream = connection.local().createStream(streamId, endStream); stream = createLocalStream(streamId, endStream, streamDependency, weight, exclusive);
// Allow bi-directional traffic.
inboundFlow.addStream(streamId);
if (!endStream) {
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
}
} else { } else {
// An existing stream... // An existing stream...
if (stream.state() == RESERVED_LOCAL) { if (stream.state() == RESERVED_LOCAL) {
@ -308,10 +350,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
throw protocolError("Sending settings after connection going away."); throw protocolError("Sending settings after connection going away.");
} }
if (settings.hasPushEnabled()) { if (settings.hasPushEnabled() && connection.isServer()) {
if (connection.isServer()) { throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
}
} }
return frameWriter.writeSettings(ctx, promise, settings); return frameWriter.writeSettings(ctx, promise, settings);
@ -569,6 +609,88 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
ChannelFutureListener.CLOSE_ON_FAILURE); ChannelFutureListener.CLOSE_ON_FAILURE);
} }
/**
* Applies settings sent from the local endpoint.
*/
private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
if (settings.hasPushEnabled()) {
if (connection.isServer()) {
throw protocolError("Server sending SETTINGS frame with ENABLE_PUSH specified");
}
connection.local().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.local().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.remote().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameReader.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
inboundFlow.initialInboundWindowSize(settings.initialWindowSize());
}
}
/**
* Applies settings received from the remote endpoint.
*/
private void applyRemoteSettings(Http2Settings settings) throws Http2Exception {
if (settings.hasPushEnabled()) {
if (!connection.isServer()) {
throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified");
}
connection.remote().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.remote().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.local().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameWriter.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
outboundFlow.initialOutboundWindowSize(settings.initialWindowSize());
}
}
/**
* Creates a new stream initiated by the local endpoint.
*/
private Http2Stream createLocalStream(int streamId, boolean halfClosed, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
Http2Stream stream = connection.local().createStream(streamId, halfClosed);
inboundFlow.addStream(streamId);
if (!halfClosed) {
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
}
return stream;
}
/**
* Creates a new stream initiated by the remote endpoint.
*/
private Http2Stream createRemoteStream(int streamId, boolean halfClosed, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
Http2Stream stream = connection.remote().createStream(streamId, halfClosed);
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
if (!halfClosed) {
inboundFlow.addStream(streamId);
}
return stream;
}
/** /**
* Handles all inbound frames from the network. * Handles all inbound frames from the network.
*/ */
@ -630,14 +752,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
Http2Stream stream = connection.stream(streamId); Http2Stream stream = connection.stream(streamId);
if (stream == null) { if (stream == null) {
// Create the new stream. createRemoteStream(streamId, endStream, streamDependency, weight, exclusive);
connection.remote().createStream(streamId, endStream);
// Allow bi-directional traffic.
outboundFlow.addStream(streamId, streamDependency, weight, exclusive);
if (!endStream) {
inboundFlow.addStream(streamId);
}
} else { } else {
if (stream.state() == RESERVED_REMOTE) { if (stream.state() == RESERVED_REMOTE) {
// Received headers for a reserved push stream ... open it for push to the // Received headers for a reserved push stream ... open it for push to the
@ -717,25 +832,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
Http2Settings settings = outstandingLocalSettingsQueue.poll(); Http2Settings settings = outstandingLocalSettingsQueue.poll();
if (settings != null) { if (settings != null) {
if (settings.hasPushEnabled()) { applyLocalSettings(settings);
connection.local().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.local().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.remote().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameReader.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
inboundFlow.initialInboundWindowSize(settings.initialWindowSize());
}
} }
AbstractHttp2ConnectionHandler.this.onSettingsAckRead(ctx); AbstractHttp2ConnectionHandler.this.onSettingsAckRead(ctx);
@ -744,28 +841,7 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
@Override @Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception { throws Http2Exception {
if (settings.hasPushEnabled()) { applyRemoteSettings(settings);
if (!connection.isServer()) {
throw protocolError("Client received SETTINGS frame with ENABLE_PUSH specified");
}
connection.remote().allowPushTo(settings.pushEnabled());
}
if (settings.hasAllowCompressedData()) {
connection.remote().allowCompressedData(settings.allowCompressedData());
}
if (settings.hasMaxConcurrentStreams()) {
connection.local().maxStreams(settings.maxConcurrentStreams());
}
if (settings.hasMaxHeaderTableSize()) {
frameWriter.maxHeaderTableSize(settings.maxHeaderTableSize());
}
if (settings.hasInitialWindowSize()) {
outboundFlow.initialOutboundWindowSize(settings.initialWindowSize());
}
// Acknowledge receipt of the settings. // Acknowledge receipt of the settings.
frameWriter.writeSettingsAck(ctx, ctx.newPromise()); frameWriter.writeSettingsAck(ctx, ctx.newPromise());

View File

@ -235,6 +235,13 @@ public class DefaultHttp2Connection implements Http2Connection {
maxStreams = Integer.MAX_VALUE; maxStreams = Integer.MAX_VALUE;
} }
@Override
public int nextStreamId() {
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so
// start at 3.
return nextStreamId > 1? nextStreamId : nextStreamId + 2;
}
@Override @Override
public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception { public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
checkNewStreamAllowed(streamId); checkNewStreamAllowed(streamId);

View File

@ -24,12 +24,9 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_SHORT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.PRIORITY_ENTRY_LENGTH; import static io.netty.handler.codec.http2.Http2CodecUtil.PRIORITY_ENTRY_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_COMPRESS_DATA; import static io.netty.handler.codec.http2.Http2CodecUtil.calcSettingsPayloadLength;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_ENABLE_PUSH; import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_HEADER_TABLE_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.writeSettingsPayload;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_INITIAL_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_MAX_CONCURRENT_STREAMS;
import static io.netty.handler.codec.http2.Http2CodecUtil.SETTING_ENTRY_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedInt; import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedInt;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedShort; import static io.netty.handler.codec.http2.Http2CodecUtil.writeUnsignedShort;
import static io.netty.util.CharsetUtil.UTF_8; import static io.netty.util.CharsetUtil.UTF_8;
@ -160,38 +157,10 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise, public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
Http2Settings settings) { Http2Settings settings) {
try { try {
int numFields = 0; int payloadLength = calcSettingsPayloadLength(settings);
numFields += settings.hasAllowCompressedData() ? 1 : 0;
numFields += settings.hasMaxHeaderTableSize() ? 1 : 0;
numFields += settings.hasInitialWindowSize() ? 1 : 0;
numFields += settings.hasMaxConcurrentStreams() ? 1 : 0;
numFields += settings.hasPushEnabled() ? 1 : 0;
int payloadLength = SETTING_ENTRY_LENGTH * numFields;
ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payloadLength); ByteBuf frame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + payloadLength);
writeFrameHeader(frame, payloadLength, Http2FrameType.SETTINGS, Http2Flags.EMPTY, 0); writeFrameHeader(frame, payloadLength, Http2FrameType.SETTINGS, Http2Flags.EMPTY, 0);
writeSettingsPayload(settings, frame);
if (settings.hasAllowCompressedData()) {
frame.writeByte(SETTINGS_COMPRESS_DATA);
writeUnsignedInt(settings.allowCompressedData() ? 1L : 0L, frame);
}
if (settings.hasMaxHeaderTableSize()) {
frame.writeByte(SETTINGS_HEADER_TABLE_SIZE);
writeUnsignedInt(settings.maxHeaderTableSize(), frame);
}
if (settings.hasInitialWindowSize()) {
frame.writeByte(SETTINGS_INITIAL_WINDOW_SIZE);
writeUnsignedInt(settings.initialWindowSize(), frame);
}
if (settings.hasMaxConcurrentStreams()) {
frame.writeByte(SETTINGS_MAX_CONCURRENT_STREAMS);
writeUnsignedInt(settings.maxConcurrentStreams(), frame);
}
if (settings.hasPushEnabled()) {
// Only write the enable push flag from client endpoints.
frame.writeByte(SETTINGS_ENABLE_PUSH);
writeUnsignedInt(settings.pushEnabled() ? 1L : 0L, frame);
}
return ctx.writeAndFlush(frame, promise); return ctx.writeAndFlush(frame, promise);
} catch (RuntimeException e) { } catch (RuntimeException e) {
return promise.setFailure(e); return promise.setFailure(e);
@ -372,15 +341,6 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter {
} }
} }
private static void writeFrameHeader(ByteBuf out, int payloadLength, Http2FrameType type,
Http2Flags flags, int streamId) {
out.ensureWritable(FRAME_HEADER_LENGTH + payloadLength);
out.writeShort(payloadLength);
out.writeByte(type.typeCode());
out.writeByte(flags.value());
out.writeInt(streamId);
}
private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, ChannelPromise promise, private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, ChannelPromise promise,
int streamId, Http2Headers headers, int padding, boolean endStream, boolean endSegment, int streamId, Http2Headers headers, int padding, boolean endStream, boolean endSegment,
boolean hasPriority, int streamDependency, short weight, boolean exclusive) { boolean hasPriority, int streamDependency, short weight, boolean exclusive) {

View File

@ -0,0 +1,120 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER;
import static io.netty.handler.codec.http2.Http2CodecUtil.calcSettingsPayloadLength;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeSettingsPayload;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.ReferenceCountUtil.release;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpRequest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Client-side cleartext upgrade codec from HTTP to HTTP/2.
*/
public class Http2ClientUpgradeCodec implements HttpClientUpgradeHandler.UpgradeCodec {
private static final List<String> UPGRADE_HEADERS = Collections.unmodifiableList(Arrays
.asList(HTTP_UPGRADE_SETTINGS_HEADER));
private final String handlerName;
private final AbstractHttp2ConnectionHandler connectionHandler;
/**
* Creates the codec using a default name for the connection handler when adding to the
* pipeline.
*
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ClientUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) {
this("http2ConnectionHandler", connectionHandler);
}
/**
* Creates the codec providing an upgrade to the given handler for HTTP/2.
*
* @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline.
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ClientUpgradeCodec(String handlerName,
AbstractHttp2ConnectionHandler connectionHandler) {
if (handlerName == null) {
throw new NullPointerException("handlerName");
}
if (connectionHandler == null) {
throw new NullPointerException("connectionHandler");
}
this.handlerName = handlerName;
this.connectionHandler = connectionHandler;
}
@Override
public String protocol() {
return HTTP_UPGRADE_PROTOCOL_NAME;
}
@Override
public Collection<String> setUpgradeHeaders(ChannelHandlerContext ctx,
HttpRequest upgradeRequest) {
String settingsValue = getSettingsHeaderValue(ctx);
upgradeRequest.headers().set(HTTP_UPGRADE_SETTINGS_HEADER, settingsValue);
return UPGRADE_HEADERS;
}
@Override
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse)
throws Exception {
// Reserve local stream 1 for the response.
connectionHandler.onHttpClientUpgrade();
// Add the handler to the pipeline.
ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler);
}
/**
* Converts the current settings for the handler to the Base64-encoded representation used in
* the HTTP2-Settings upgrade header.
*/
private String getSettingsHeaderValue(ChannelHandlerContext ctx) {
ByteBuf buf = null;
ByteBuf encodedBuf = null;
try {
// Get the local settings for the handler.
Http2Settings settings = connectionHandler.settings();
// Serialize the payload of the SETTINGS frame.
buf = ctx.alloc().buffer(calcSettingsPayloadLength(settings));
writeSettingsPayload(settings, buf);
// Base64 encode the payload and then convert to a string for the header.
encodedBuf = Base64.encode(buf, URL_SAFE);
return encodedBuf.toString(UTF_8);
} finally {
release(buf);
release(encodedBuf);
}
}
}

View File

@ -31,6 +31,10 @@ public final class Http2CodecUtil {
private static final byte[] EMPTY_PING = new byte[8]; private static final byte[] EMPTY_PING = new byte[8];
public static final int CONNECTION_STREAM_ID = 0; public static final int CONNECTION_STREAM_ID = 0;
public static final int HTTP_UPGRADE_STREAM_ID = 1;
public static final String HTTP_UPGRADE_SETTINGS_HEADER = "HTTP2-Settings";
public static final String HTTP_UPGRADE_PROTOCOL_NAME = "h2c-12";
public static final int MAX_FRAME_PAYLOAD_LENGTH = 16383; public static final int MAX_FRAME_PAYLOAD_LENGTH = 16383;
public static final int PING_FRAME_PAYLOAD_LENGTH = 8; public static final int PING_FRAME_PAYLOAD_LENGTH = 8;
public static final short MAX_UNSIGNED_BYTE = 0xFF; public static final short MAX_UNSIGNED_BYTE = 0xFF;
@ -126,6 +130,61 @@ public final class Http2CodecUtil {
out.writeByte((int) ((value & 0xFF))); out.writeByte((int) ((value & 0xFF)));
} }
/**
* Writes an HTTP/2 frame header to the output buffer.
*/
public static void writeFrameHeader(ByteBuf out, int payloadLength, Http2FrameType type,
Http2Flags flags, int streamId) {
out.ensureWritable(FRAME_HEADER_LENGTH + payloadLength);
out.writeShort(payloadLength);
out.writeByte(type.typeCode());
out.writeByte(flags.value());
out.writeInt(streamId);
}
/**
* Calculates the HTTP/2 SETTINGS payload length for the serialized representation
* of the given settings.
*/
public static int calcSettingsPayloadLength(Http2Settings settings) {
int numFields = 0;
numFields += settings.hasAllowCompressedData() ? 1 : 0;
numFields += settings.hasMaxHeaderTableSize() ? 1 : 0;
numFields += settings.hasInitialWindowSize() ? 1 : 0;
numFields += settings.hasMaxConcurrentStreams() ? 1 : 0;
numFields += settings.hasPushEnabled() ? 1 : 0;
return SETTING_ENTRY_LENGTH * numFields;
}
/**
* Serializes the settings to the output buffer in the format of an HTTP/2 SETTINGS frame
* payload.
*/
public static void writeSettingsPayload(Http2Settings settings, ByteBuf out) {
if (settings.hasAllowCompressedData()) {
out.writeByte(SETTINGS_COMPRESS_DATA);
writeUnsignedInt(settings.allowCompressedData() ? 1L : 0L, out);
}
if (settings.hasMaxHeaderTableSize()) {
out.writeByte(SETTINGS_HEADER_TABLE_SIZE);
writeUnsignedInt(settings.maxHeaderTableSize(), out);
}
if (settings.hasInitialWindowSize()) {
out.writeByte(SETTINGS_INITIAL_WINDOW_SIZE);
writeUnsignedInt(settings.initialWindowSize(), out);
}
if (settings.hasMaxConcurrentStreams()) {
out.writeByte(SETTINGS_MAX_CONCURRENT_STREAMS);
writeUnsignedInt(settings.maxConcurrentStreams(), out);
}
if (settings.hasPushEnabled()) {
// Only write the enable push flag from client endpoints.
out.writeByte(SETTINGS_ENABLE_PUSH);
writeUnsignedInt(settings.pushEnabled() ? 1L : 0L, out);
}
}
/** /**
* Fails the given promise with the cause and then re-throws the cause. * Fails the given promise with the cause and then re-throws the cause.
*/ */

View File

@ -24,6 +24,11 @@ public interface Http2Connection {
*/ */
interface Endpoint { interface Endpoint {
/**
* Returns the next valid streamId for this endpoint.
*/
int nextStreamId();
/** /**
* Creates a stream initiated by this endpoint. This could fail for the following reasons: * Creates a stream initiated by this endpoint. This could fail for the following reasons:
* <p/> * <p/>

View File

@ -0,0 +1,91 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* Convenience class that provides no-op implementations for all methods of {@link Http2FrameObserver}.
*/
public class Http2FrameAdapter implements Http2FrameObserver {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment, boolean compressed) throws Http2Exception {
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endStream, boolean endSegment) throws Http2Exception {
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
boolean endSegment) throws Http2Exception {
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData) throws Http2Exception {
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
}
@Override
public void onAltSvcRead(ChannelHandlerContext ctx, int streamId, long maxAge, int port,
ByteBuf protocolId, String host, String origin) throws Http2Exception {
}
@Override
public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
}
}

View File

@ -58,7 +58,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter {
public void logHeaders(Direction direction, int streamId, Http2Headers headers, int padding, public void logHeaders(Direction direction, int streamId, Http2Headers headers, int padding,
boolean endStream, boolean endSegment) { boolean endStream, boolean endSegment) {
log(direction, "HEADERS: steramId:%d, headers=%s, padding=%d, endStream=%b, endSegment=%b", log(direction, "HEADERS: streamId:%d, headers=%s, padding=%d, endStream=%b, endSegment=%b",
streamId, headers, padding, endStream, endSegment); streamId, headers, padding, endStream, endSegment);
} }
@ -66,7 +66,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter {
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
boolean endSegment) { boolean endSegment) {
log(direction, log(direction,
"HEADERS: steramId:%d, headers=%s, streamDependency=%d, weight=%d, exclusive=%b, " "HEADERS: streamId:%d, headers=%s, streamDependency=%d, weight=%d, exclusive=%b, "
+ "padding=%d, endStream=%b, endSegment=%b", streamId, headers, + "padding=%d, endStream=%b, endSegment=%b", streamId, headers,
streamDependency, weight, exclusive, padding, endStream, endSegment); streamDependency, weight, exclusive, padding, endStream, endSegment);
} }
@ -109,7 +109,7 @@ public class Http2FrameLogger extends ChannelHandlerAdapter {
} }
public void logWindowsUpdate(Direction direction, int streamId, int windowSizeIncrement) { public void logWindowsUpdate(Direction direction, int streamId, int windowSizeIncrement) {
log(direction, "WINDOW_UPDATE: stream=%d, windowSizeIncrement=%d", streamId, log(direction, "WINDOW_UPDATE: streamId=%d, windowSizeIncrement=%d", streamId,
windowSizeIncrement); windowSizeIncrement);
} }

View File

@ -0,0 +1,164 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.base64.Base64Dialect.URL_SAFE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME;
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_SETTINGS_HEADER;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeader;
import static io.netty.handler.codec.http2.Http2Flags.EMPTY;
import static io.netty.handler.codec.http2.Http2FrameType.SETTINGS;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.util.CharsetUtil;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Server-side codec for performing a cleartext upgrade from HTTP/1.x to HTTP/2.
*/
public class Http2ServerUpgradeCodec implements HttpServerUpgradeHandler.UpgradeCodec {
private static final List<String> REQUIRED_UPGRADE_HEADERS = Collections
.unmodifiableList(Arrays.asList(HTTP_UPGRADE_SETTINGS_HEADER));
private final String handlerName;
private final AbstractHttp2ConnectionHandler connectionHandler;
private final Http2FrameReader frameReader;
private Http2Settings settings;
/**
* Creates the codec using a default name for the connection handler when adding to the
* pipeline.
*
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ServerUpgradeCodec(AbstractHttp2ConnectionHandler connectionHandler) {
this("http2ConnectionHandler", connectionHandler);
}
/**
* Creates the codec providing an upgrade to the given handler for HTTP/2.
*
* @param handlerName the name of the HTTP/2 connection handler to be used in the pipeline.
* @param connectionHandler the HTTP/2 connection handler.
*/
public Http2ServerUpgradeCodec(String handlerName,
AbstractHttp2ConnectionHandler connectionHandler) {
if (handlerName == null) {
throw new NullPointerException("handlerName");
}
if (connectionHandler == null) {
throw new NullPointerException("connectionHandler");
}
this.handlerName = handlerName;
this.connectionHandler = connectionHandler;
this.frameReader = new DefaultHttp2FrameReader();
}
@Override
public String protocol() {
return HTTP_UPGRADE_PROTOCOL_NAME;
}
@Override
public Collection<String> requiredUpgradeHeaders() {
return REQUIRED_UPGRADE_HEADERS;
}
@Override
public void prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse) {
try {
// Decode the HTTP2-Settings header and set the settings on the handler to make
// sure everything is fine with the request.
String settingsHeader = upgradeRequest.headers().get(HTTP_UPGRADE_SETTINGS_HEADER);
settings = decodeSettingsHeader(ctx, settingsHeader);
connectionHandler.onHttpServerUpgrade(settings);
// Everything looks good, no need to modify the response.
} catch (Throwable e) {
// Send a failed response back to the client.
upgradeResponse.setStatus(BAD_REQUEST);
upgradeResponse.headers().clear();
}
}
@Override
public void upgradeTo(final ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
FullHttpResponse upgradeResponse) {
// Add the HTTP/2 connection handler to the pipeline immediately following the current
// handler.
ctx.pipeline().addAfter(ctx.name(), handlerName, connectionHandler);
}
/**
* Decodes the settings header and returns a {@link Http2Settings} object.
*/
private Http2Settings decodeSettingsHeader(ChannelHandlerContext ctx, String settingsHeader)
throws Http2Exception {
ByteBuf header = Unpooled.wrappedBuffer(settingsHeader.getBytes(CharsetUtil.UTF_8));
try {
// Decode the SETTINGS payload.
ByteBuf payload = Base64.decode(header, URL_SAFE);
// Create an HTTP/2 frame for the settings.
ByteBuf frame = createSettingsFrame(ctx, payload);
// Decode the SETTINGS frame and return the settings object.
return decodeSettings(ctx, frame);
} finally {
header.release();
}
}
/**
* Decodes the settings frame and returns the settings.
*/
private Http2Settings decodeSettings(ChannelHandlerContext ctx, ByteBuf frame) throws Http2Exception {
try {
final Http2Settings decodedSettings = new Http2Settings();
frameReader.readFrame(ctx, frame, new Http2FrameAdapter() {
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
decodedSettings.copy(settings);
}
});
return decodedSettings;
} finally {
frame.release();
}
}
/**
* Creates an HTTP2-Settings header with the given payload. The payload buffer is released.
*/
private static ByteBuf createSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload) {
ByteBuf frame =
ctx.alloc().buffer(Http2CodecUtil.FRAME_HEADER_LENGTH + payload.readableBytes());
writeFrameHeader(frame, payload.readableBytes(), SETTINGS, EMPTY, 0);
frame.writeBytes(payload);
payload.release();
return frame;
}
}

View File

@ -180,6 +180,22 @@ public class Http2Settings {
return this; return this;
} }
/**
* Overwrites this settings object with the values of the given settings.
*
* @param source the source that will overwrite the current settings.
* @return this object.
*/
public Http2Settings copy(Http2Settings source) {
this.enabled = source.enabled;
this.allowCompressedData = source.allowCompressedData;
this.initialWindowSize = source.initialWindowSize;
this.maxConcurrentStreams = source.maxConcurrentStreams;
this.maxHeaderTableSize = source.maxHeaderTableSize;
this.pushEnabled = source.pushEnabled;
return this;
}
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31; final int prime = 31;

View File

@ -0,0 +1,121 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.example.http2;
import java.util.Locale;
/**
* Utility methods used by the example client and server.
*/
public final class Http2ExampleUtil {
/**
* Response header sent in response to the http->http2 cleartext upgrade request.
*/
public static final String UPGRADE_RESPONSE_HEADER = "Http-To-Http2-Upgrade";
public static final String STANDARD_HOST = "localhost";
public static final int STANDARD_HTTP_PORT = 8080;
public static final int STANDARD_HTTPS_PORT = 8443;
public static final String PORT_ARG = "-port=";
public static final String SSL_ARG = "-ssl=";
public static final String HOST_ARG = "-host=";
/**
* The configuration for the example client/server endpoints.
*/
public static final class EndpointConfig {
private final boolean ssl;
private final String host;
private final int port;
public EndpointConfig(boolean ssl, String host, int port) {
this.ssl = ssl;
this.host = host;
this.port = port;
}
public boolean isSsl() {
return ssl;
}
public String host() {
return host;
}
public int port() {
return port;
}
@Override
public String toString() {
return "EndpointConfig [ssl=" + ssl + ", host=" + host + ", port=" + port + "]";
}
}
/**
* Parse the command-line arguments to determine the configuration for the endpoint.
*/
public static EndpointConfig parseEndpointConfig(String[] args) {
boolean ssl = false;
int port = STANDARD_HTTP_PORT;
String host = STANDARD_HOST;
boolean portSpecified = false;
for (String arg : args) {
arg = arg.trim().toLowerCase(Locale.US);
if (arg.startsWith(PORT_ARG)) {
String value = arg.substring(PORT_ARG.length());
port = Integer.parseInt(value);
portSpecified = true;
} else if (arg.startsWith(SSL_ARG)) {
String value = arg.substring(SSL_ARG.length());
ssl = Boolean.parseBoolean(value);
if (!portSpecified) {
// Use the appropriate default.
port = ssl ? STANDARD_HTTPS_PORT : STANDARD_HTTP_PORT;
}
} else if (arg.startsWith(HOST_ARG)) {
host = arg.substring(HOST_ARG.length());
}
}
// If SSL was selected, verify that NPN is supported.
if (ssl) {
checkForNpnSupport();
}
return new EndpointConfig(ssl, host, port);
}
/**
* Checks for NPN support. If not supported, prints an error message throws an exception.
*/
private static void checkForNpnSupport() {
try {
Class.forName("sun.security.ssl.NextProtoNegoExtension");
} catch (ClassNotFoundException ignored) {
System.err.println();
System.err.println("Could not locate Next Protocol Negotiation (NPN) implementation.");
System.err.println("The NPN jar should have been made available when building the examples with maven.");
System.err.println("Please check that your JDK is among those supported by Jetty-NPN:");
System.err.println("http://wiki.eclipse.org/Jetty/Feature/NPN#Versions");
System.err.println();
throw new IllegalStateException("Could not locate NPN implementation. See console err for details.");
}
}
private Http2ExampleUtil() {
}
}

View File

@ -14,42 +14,46 @@
*/ */
package io.netty.example.http2.client; package io.netty.example.http2.client;
import static java.util.concurrent.TimeUnit.SECONDS; import static io.netty.example.http2.Http2ExampleUtil.parseEndpointConfig;
import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.http2.server.Http2Server; import io.netty.example.http2.Http2ExampleUtil.EndpointConfig;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
/** /**
* An HTTP2 client that allows you to send HTTP2 frames to a server. Inbound and outbound frames are * An HTTP2 client that allows you to send HTTP2 frames to a server. Inbound and outbound frames are
* logged. * logged. When run from the command-line, sends a single HEADERS frame to the server and gets back
* a "Hello World" response.
* <p>
* To client accepts command-line arguments for {@code -host=<host/ip>}
* <i>(default="localhost")</i>, {@code -port=<port number>} <i>(default: http=8080,
* https=8443)</i>, and {@code -ssl=<true/false>} <i>(default=false)</i>.
*/ */
public class Http2Client { public class Http2Client {
private final String host; private final EndpointConfig config;
private final int port; private Http2ClientConnectionHandler http2ConnectionHandler;
private final Http2ClientConnectionHandler http2ConnectionHandler;
private Channel channel; private Channel channel;
private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
public Http2Client(String host, int port) { public Http2Client(EndpointConfig config) {
this.host = host; this.config = config;
this.port = port;
http2ConnectionHandler = new Http2ClientConnectionHandler();
} }
/**
* Starts the client and waits for the HTTP/2 upgrade to occur.
*/
public void start() throws Exception { public void start() throws Exception {
if (channel != null) { if (channel != null) {
System.out.println("Already running!"); System.out.println("Already running!");
@ -62,15 +66,40 @@ public class Http2Client {
b.group(workerGroup); b.group(workerGroup);
b.channel(NioSocketChannel.class); b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true); b.option(ChannelOption.SO_KEEPALIVE, true);
b.remoteAddress(new InetSocketAddress(host, port)); b.remoteAddress(new InetSocketAddress(config.host(), config.port()));
b.handler(new Http2ClientInitializer(http2ConnectionHandler)); Http2ClientInitializer initializer = new Http2ClientInitializer(config.isSsl());
b.handler(initializer);
// Start the client. // Start the client.
channel = b.connect().syncUninterruptibly().channel(); channel = b.connect().syncUninterruptibly().channel();
System.out.println("Connected to [" + config.host() + ':' + config.port() + ']');
// Wait for the HTTP/2 upgrade to occur.
http2ConnectionHandler = initializer.connectionHandler();
http2ConnectionHandler.awaitInitialization(); http2ConnectionHandler.awaitInitialization();
System.out.println("Connected to [" + host + ':' + port + ']');
} }
/**
* Sends the given request to the server.
*/
public void sendRequest(FullHttpRequest request) throws Exception {
ChannelFuture requestFuture = channel.writeAndFlush(request).sync();
System.out.println("Back from sending headers...");
if (!requestFuture.isSuccess()) {
throw new RuntimeException(requestFuture.cause());
}
}
/**
* Waits for the full response to be received.
*/
public void awaitResponse() throws Exception {
http2ConnectionHandler.awaitResponse();
}
/**
* Closes the channel and waits for shutdown to complete.
*/
public void stop() { public void stop() {
try { try {
// Wait until the connection is closed. // Wait until the connection is closed.
@ -82,52 +111,29 @@ public class Http2Client {
} }
} }
public ChannelFuture sendHeaders(int streamId, Http2Headers headers) throws Http2Exception {
return http2ConnectionHandler.writeHeaders(streamId, headers, 0, true, true);
}
public ChannelFuture send(int streamId, ByteBuf data, int padding, boolean endStream,
boolean endSegment, boolean compressed) throws Http2Exception {
return http2ConnectionHandler.writeData(streamId, data, padding, endStream, endSegment,
compressed);
}
public Http2Headers headers() {
return DefaultHttp2Headers.newBuilder().authority(host).method(HttpMethod.GET.name())
.build();
}
public BlockingQueue<ChannelFuture> queue() {
return http2ConnectionHandler.queue();
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Http2Server.checkForNpnSupport(); EndpointConfig config = parseEndpointConfig(args);
int port; System.out.println(config);
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8443;
}
final Http2Client client = new Http2Client("localhost", port);
final Http2Client client = new Http2Client(config);
try { try {
// Start the client and wait for the HTTP/2 upgrade to complete.
client.start(); client.start();
System.out.println("Sending headers...");
ChannelFuture requestFuture = client.sendHeaders(3, client.headers()).sync(); // Create a simple GET request with just headers.
FullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/whatever");
request.headers().add(HOST, config.host());
System.out.println("Sending request...");
ChannelFuture requestFuture = client.channel.writeAndFlush(request).sync();
System.out.println("Back from sending headers..."); System.out.println("Back from sending headers...");
if (!requestFuture.isSuccess()) { if (!requestFuture.isSuccess()) {
requestFuture.cause().printStackTrace(); requestFuture.cause().printStackTrace();
return;
} }
// Waits for the complete response // Waits for the complete response
ChannelFuture responseFuture = client.queue().poll(5, SECONDS); client.awaitResponse();
if (!responseFuture.isSuccess()) {
responseFuture.cause().printStackTrace();
}
System.out.println("Finished HTTP/2 request"); System.out.println("Finished HTTP/2 request");
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace(); t.printStackTrace();

View File

@ -14,15 +14,17 @@
*/ */
package io.netty.example.http2.client; package io.netty.example.http2.client;
import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER;
import static io.netty.util.internal.logging.InternalLogLevel.INFO; import static io.netty.util.internal.logging.InternalLogLevel.INFO;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
@ -36,8 +38,7 @@ import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.BlockingQueue; import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -47,36 +48,65 @@ import java.util.concurrent.TimeUnit;
public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler { public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler {
private static final Http2FrameLogger logger = new Http2FrameLogger(INFO, private static final Http2FrameLogger logger = new Http2FrameLogger(INFO,
InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class)); InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class));
private final BlockingQueue<ChannelFuture> queue = new LinkedBlockingQueue<ChannelFuture>(); private final ChannelPromise initPromise;
private final ChannelPromise responsePromise;
private ByteBuf collectedData; private ByteBuf collectedData;
private ChannelPromise initialized;
public Http2ClientConnectionHandler() { public Http2ClientConnectionHandler(ChannelPromise initPromise, ChannelPromise responsePromise) {
super(new DefaultHttp2Connection(false, false), frameReader(), frameWriter(), super(new DefaultHttp2Connection(false, false), frameReader(), frameWriter(),
new DefaultHttp2InboundFlowController(), new DefaultHttp2OutboundFlowController()); new DefaultHttp2InboundFlowController(), new DefaultHttp2OutboundFlowController());
this.initPromise = initPromise;
this.responsePromise = responsePromise;
} }
public void awaitInitialization() throws InterruptedException { /**
initialized.await(5, TimeUnit.SECONDS); * Wait for this handler to be added after the upgrade to HTTP/2, and for initial preface
* handshake to complete.
*/
public void awaitInitialization() throws Exception {
if (!initPromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for initialization");
}
if (!initPromise.isSuccess()) {
throw new RuntimeException(initPromise.cause());
}
} }
/**
* Wait for this full response to be received and printed out.
*/
public void awaitResponse() throws Exception {
if (!responsePromise.awaitUninterruptibly(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for completion of the response");
}
if (!responsePromise.isSuccess()) {
throw new RuntimeException(initPromise.cause());
}
}
/**
* Handles conversion of a {@link FullHttpMessage} to HTTP/2 frames.
*/
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.handlerAdded(ctx); throws Exception {
initialized = ctx.newPromise(); if (msg instanceof FullHttpMessage) {
} FullHttpMessage httpMsg = (FullHttpMessage) msg;
boolean hasData = httpMsg.content().isReadable();
public ChannelFuture writeData(int streamId, ByteBuf data, int padding, boolean endStream, // Convert and write the headers.
boolean endSegment, boolean compressed) throws Http2Exception { DefaultHttp2Headers.Builder headers = DefaultHttp2Headers.newBuilder();
return super.writeData(ctx(), ctx().newPromise(), streamId, data, padding, endStream, for (Map.Entry<String, String> entry : httpMsg.headers().entries()) {
endSegment, compressed); headers.add(entry.getKey(), entry.getValue());
} }
int streamId = nextStreamId();
public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int padding, writeHeaders(ctx, promise, streamId, headers.build(), 0, !hasData, false);
boolean endStream, boolean endSegment) throws Http2Exception { if (hasData) {
return super.writeHeaders(ctx(), ctx().newPromise(), streamId, headers, padding, endStream, writeData(ctx, promise, streamId, httpMsg.content(), 0, true, true, false);
endSegment); }
} else {
super.write(ctx, msg, promise);
}
} }
@Override @Override
@ -107,7 +137,7 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
collectedData.release(); collectedData.release();
collectedData = null; collectedData = null;
queue.add(ctx().channel().newSucceededFuture()); responsePromise.setSuccess();
} }
} }
@ -120,6 +150,9 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
boolean endSegment) throws Http2Exception { boolean endSegment) throws Http2Exception {
if (headers.contains(UPGRADE_RESPONSE_HEADER)) {
System.out.println("Received HTTP/2 response to the HTTP->HTTP/2 upgrade request");
}
} }
@Override @Override
@ -139,8 +172,8 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
@Override @Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception { throws Http2Exception {
if (!initialized.isDone()) { if (!initPromise.isDone()) {
initialized.setSuccess(); initPromise.setSuccess();
} }
} }
@ -178,17 +211,13 @@ public class Http2ClientConnectionHandler extends AbstractHttp2ConnectionHandler
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
queue.add(ctx.channel().newFailedFuture(cause)); if (!initPromise.isDone()) {
cause.printStackTrace(); initPromise.setFailure(cause);
super.exceptionCaught(ctx, cause);
if (!initialized.isDone()) {
initialized.setFailure(cause);
} }
} if (!responsePromise.isDone()) {
initPromise.setFailure(cause);
public BlockingQueue<ChannelFuture> queue() { }
return queue; super.exceptionCaught(ctx, cause);
} }
private static Http2FrameReader frameReader() { private static Http2FrameReader frameReader() {

View File

@ -14,11 +14,18 @@
*/ */
package io.netty.example.http2.client; package io.netty.example.http2.client;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.example.securechat.SecureChatSslContextFactory; import io.netty.example.securechat.SecureChatSslContextFactory;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
@ -30,14 +37,31 @@ import org.eclipse.jetty.npn.NextProtoNego;
*/ */
public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> { public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
private final AbstractHttp2ConnectionHandler connectionHandler; private Http2ClientConnectionHandler connectionHandler;
private final boolean ssl;
public Http2ClientInitializer(AbstractHttp2ConnectionHandler connectionHandler) { public Http2ClientInitializer(boolean ssl) {
this.connectionHandler = connectionHandler; this.ssl = ssl;
} }
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
connectionHandler = new Http2ClientConnectionHandler(ch.newPromise(), ch.newPromise());
if (ssl) {
configureSsl(ch);
} else {
configureClearText(ch);
}
}
public Http2ClientConnectionHandler connectionHandler() {
return connectionHandler;
}
/**
* Configure the pipeline for TLS NPN negotiation to HTTP/2.
*/
private void configureSsl(SocketChannel ch) {
SSLEngine engine = SecureChatSslContextFactory.getClientContext().createSSLEngine(); SSLEngine engine = SecureChatSslContextFactory.getClientContext().createSSLEngine();
engine.setUseClientMode(true); engine.setUseClientMode(true);
NextProtoNego.put(engine, new Http2ClientProvider()); NextProtoNego.put(engine, new Http2ClientProvider());
@ -48,4 +72,46 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
pipeline.addLast("ssl", new SslHandler(engine)); pipeline.addLast("ssl", new SslHandler(engine));
pipeline.addLast("http2ConnectionHandler", connectionHandler); pipeline.addLast("http2ConnectionHandler", connectionHandler);
} }
/**
* Configure the pipeline for a cleartext upgrade from HTTP to HTTP/2.
*/
private void configureClearText(SocketChannel ch) {
HttpClientCodec sourceCodec = new HttpClientCodec();
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(connectionHandler);
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(sourceCodec, upgradeCodec, 65536);
ch.pipeline().addLast(sourceCodec);
ch.pipeline().addLast(upgradeHandler);
ch.pipeline().addLast(new UpgradeRequestHandler());
ch.pipeline().addLast(new UserEventLogger());
}
/**
* A handler that triggers the cleartext upgrade to HTTP/2 by sending an initial HTTP request.
*/
private static class UpgradeRequestHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DefaultFullHttpRequest upgradeRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
ctx.writeAndFlush(upgradeRequest);
super.channelActive(ctx);
// Done with this handler, remove it from the pipeline.
ctx.pipeline().remove(ctx.name());
}
}
/**
* Class that logs any User Events triggered on this channel.
*/
private static class UserEventLogger extends ChannelHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("User Event Triggered: " + evt);
super.userEventTriggered(ctx, evt);
}
}
} }

View File

@ -15,26 +15,62 @@
package io.netty.example.http2.server; package io.netty.example.http2.server;
import static io.netty.example.http2.Http2ExampleUtil.UPGRADE_RESPONSE_HEADER;
import static io.netty.util.internal.logging.InternalLogLevel.INFO;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.example.http2.client.Http2ClientConnectionHandler;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLoggerFactory;
/** /**
* A simple handler that responds with the message "Hello World!". * A simple handler that responds with the message "Hello World!".
*/ */
public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler { public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
private static final Http2FrameLogger logger = new Http2FrameLogger(INFO,
InternalLoggerFactory.getInstance(Http2ClientConnectionHandler.class));
static final byte[] RESPONSE_BYTES = "Hello World".getBytes(CharsetUtil.UTF_8); static final byte[] RESPONSE_BYTES = "Hello World".getBytes(CharsetUtil.UTF_8);
public HelloWorldHttp2Handler() { public HelloWorldHttp2Handler() {
super(true); super(new DefaultHttp2Connection(true, false), new Http2InboundFrameLogger(
new DefaultHttp2FrameReader(), logger), new Http2OutboundFrameLogger(
new DefaultHttp2FrameWriter(), logger), new DefaultHttp2InboundFlowController(),
new DefaultHttp2OutboundFlowController());
} }
/**
* Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via HTTP/2
* on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) {
// Write an HTTP/2 response to the upgrade request
Http2Headers headers =
DefaultHttp2Headers.newBuilder().set(UPGRADE_RESPONSE_HEADER, "true").build();
writeHeaders(ctx, ctx.newPromise(), 1, headers, 0, true, true);
}
super.userEventTriggered(ctx, evt);
}
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment, boolean compressed) throws Http2Exception { boolean endOfStream, boolean endOfSegment, boolean compressed) throws Http2Exception {
@ -43,6 +79,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
} }
} }
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override @Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
io.netty.handler.codec.http2.Http2Headers headers, int padding, boolean endStream, io.netty.handler.codec.http2.Http2Headers headers, int padding, boolean endStream,
@ -52,6 +91,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
} }
} }
/**
* If receive a frame with end-of-stream set, send a pre-canned response.
*/
@Override @Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
io.netty.handler.codec.http2.Http2Headers headers, int streamDependency, short weight, io.netty.handler.codec.http2.Http2Headers headers, int streamDependency, short weight,
@ -77,7 +119,8 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
} }
@Override @Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception { public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
} }
@Override @Override
@ -100,7 +143,7 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
@Override @Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception { throws Http2Exception {
} }
@Override @Override
@ -118,6 +161,9 @@ public class HelloWorldHttp2Handler extends AbstractHttp2ConnectionHandler {
super.exceptionCaught(ctx, cause); super.exceptionCaught(ctx, cause);
} }
/**
* Sends a "Hello World" DATA frame to the client.
*/
private void sendResponse(ChannelHandlerContext ctx, int streamId) throws Http2Exception { private void sendResponse(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
// Send a frame for the response status // Send a frame for the response status
Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200").build(); Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200").build();

View File

@ -16,24 +16,28 @@
package io.netty.example.http2.server; package io.netty.example.http2.server;
import static io.netty.example.http2.Http2ExampleUtil.parseEndpointConfig;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.http2.Http2ExampleUtil.EndpointConfig;
/** /**
* A HTTP/2 Server that responds to requests with a Hello World. * A HTTP/2 Server that responds to requests with a Hello World. Once started, you can test the
* server with the example client.
* <p> * <p>
* Once started, you can test the server with the example client. * To server accepts command-line arguments for {@code -port=<port number>} <i>(default: http=8080,
* https=8443)</i>, and {@code -ssl=<true/false>} <i>(default=false)</i>.
*/ */
public class Http2Server { public class Http2Server {
private final int port; private final EndpointConfig config;
public Http2Server(int port) { public Http2Server(EndpointConfig config) {
this.port = port; this.config = config;
} }
public void run() throws Exception { public void run() throws Exception {
@ -44,9 +48,9 @@ public class Http2Server {
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024); b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new Http2ServerInitializer()); .childHandler(new Http2ServerInitializer(config.isSsl()));
Channel ch = b.bind(port).sync().channel(); Channel ch = b.bind(config.port()).sync().channel();
ch.closeFuture().sync(); ch.closeFuture().sync();
} finally { } finally {
bossGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
@ -55,30 +59,10 @@ public class Http2Server {
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
checkForNpnSupport(); EndpointConfig config = parseEndpointConfig(args);
int port; System.out.println(config);
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8443;
}
System.out.println("HTTP2 server started at port " + port + '.'); System.out.println("HTTP2 server started at port " + config.port() + '.');
new Http2Server(config).run();
new Http2Server(port).run();
}
public static void checkForNpnSupport() {
try {
Class.forName("sun.security.ssl.NextProtoNegoExtension");
} catch (ClassNotFoundException ignored) {
System.err.println();
System.err.println("Could not locate Next Protocol Negotiation (NPN) implementation.");
System.err.println("The NPN jar should have been made available when building the examples with maven.");
System.err.println("Please check that your JDK is among those supported by Jetty-NPN:");
System.err.println("http://wiki.eclipse.org/Jetty/Feature/NPN#Versions");
System.err.println();
throw new IllegalStateException("Could not locate NPN implementation. See console err for details.");
}
} }
} }

View File

@ -16,21 +16,50 @@
package io.netty.example.http2.server; package io.netty.example.http2.server;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.example.securechat.SecureChatSslContextFactory; import io.netty.example.securechat.SecureChatSslContextFactory;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import org.eclipse.jetty.npn.NextProtoNego;
import java.util.Arrays;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.npn.NextProtoNego;
/** /**
* Sets up the Netty pipeline * Sets up the Netty pipeline for the example server. Depending on the endpoint config, sets up the
* pipeline for NPN or cleartext HTTP upgrade to HTTP/2.
*/ */
public class Http2ServerInitializer extends ChannelInitializer<SocketChannel> { public class Http2ServerInitializer extends ChannelInitializer<SocketChannel> {
private final boolean ssl;
public Http2ServerInitializer(boolean ssl) {
this.ssl = ssl;
}
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
if (ssl) {
configureSsl(ch);
} else {
configureClearText(ch);
}
}
/**
* Configure the pipeline for TLS NPN negotiation to HTTP/2.
*/
private void configureSsl(SocketChannel ch) {
ChannelPipeline p = ch.pipeline(); ChannelPipeline p = ch.pipeline();
SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
@ -44,4 +73,50 @@ public class Http2ServerInitializer extends ChannelInitializer<SocketChannel> {
// Negotiates with the browser if HTTP2 or HTTP is going to be used // Negotiates with the browser if HTTP2 or HTTP is going to be used
p.addLast("handler", new Http2OrHttpHandler()); p.addLast("handler", new Http2OrHttpHandler());
} }
/**
* Configure the pipeline for a cleartext upgrade from HTTP to HTTP/2.
*/
private void configureClearText(SocketChannel ch) {
HttpCodec sourceCodec = new HttpCodec();
HttpServerUpgradeHandler.UpgradeCodec upgradeCodec =
new Http2ServerUpgradeCodec(new HelloWorldHttp2Handler());
HttpServerUpgradeHandler upgradeHandler =
new HttpServerUpgradeHandler(sourceCodec, Arrays.asList(upgradeCodec), 65536);
ch.pipeline().addLast(sourceCodec);
ch.pipeline().addLast(upgradeHandler);
ch.pipeline().addLast(new UserEventLogger());
}
/**
* Source codec for HTTP cleartext upgrade.
*/
private static final class HttpCodec extends ChannelHandlerAppender implements
HttpServerUpgradeHandler.SourceCodec {
HttpCodec() {
add("httpRequestDecoder", new HttpRequestDecoder());
add("httpResponseEncoder", new HttpResponseEncoder());
add("httpRequestAggregator", new HttpObjectAggregator(65536));
}
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
System.out.println("removing HTTP handlers");
ctx.pipeline().remove("httpRequestAggregator");
ctx.pipeline().remove("httpResponseEncoder");
ctx.pipeline().remove("httpRequestDecoder");
}
}
/**
* Class that logs any User Events triggered on this channel.
*/
private static class UserEventLogger extends ChannelHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("User Event Triggered: " + evt);
super.userEventTriggered(ctx, evt);
}
}
} }