HTTP/2 draft 14 HTTP message flow

Motivation:

HTTP/2 draft 14 came out a couple of weeks ago and we need to keep up
with the spec.

Modifications:
-Revert back to dispatching FullHttpMessage objects instead of individual HttpObjects
-Corrections to HttpObject comparitors to support test cases
-New test cases to support sending headers immediatley
-Bug fixes cleaned up to ensure the message flow is terminated properly

Result:
Netty HTTP/2 to HTTP/1.x translation layer will support the HTTP/2 draft message flow.
This commit is contained in:
Scott Mitchell 2014-08-22 17:22:18 -04:00 committed by nmittler
parent 5b1d50fa7c
commit 892944eba8
23 changed files with 1153 additions and 753 deletions

View File

@ -17,11 +17,13 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.StringUtil;
/**
* Default implementation of {@link FullHttpRequest}.
*/
public class DefaultFullHttpRequest extends DefaultHttpRequest implements FullHttpRequest {
private static final int HASH_CODE_PRIME = 31;
private final ByteBuf content;
private final HttpHeaders trailingHeader;
private final boolean validateHeaders;
@ -34,6 +36,10 @@ public class DefaultFullHttpRequest extends DefaultHttpRequest implements FullHt
this(httpVersion, method, uri, content, true);
}
public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri, boolean validateHeaders) {
this(httpVersion, method, uri, Unpooled.buffer(0), true);
}
public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri,
ByteBuf content, boolean validateHeaders) {
super(httpVersion, method, uri, validateHeaders);
@ -112,15 +118,41 @@ public class DefaultFullHttpRequest extends DefaultHttpRequest implements FullHt
return this;
}
@Override
public FullHttpRequest copy() {
/**
* Copy this object
*
* @param copyContent
* <ul>
* <li>{@code true} if this object's {@link #content()} should be used to copy.</li>
* <li>{@code false} if {@code newContent} should be used instead.</li>
* </ul>
* @param newContent
* <ul>
* <li>if {@code copyContent} is false then this will be used in the copy's content.</li>
* <li>if {@code null} then a default buffer of 0 size will be selected</li>
* </ul>
* @return A copy of this object
*/
private FullHttpRequest copy(boolean copyContent, ByteBuf newContent) {
DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
protocolVersion(), method(), uri(), content().copy(), validateHeaders);
protocolVersion(), method(), uri(),
copyContent ? content().copy() :
newContent == null ? Unpooled.buffer(0) : newContent);
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}
@Override
public FullHttpRequest copy(ByteBuf newContent) {
return copy(false, newContent);
}
@Override
public FullHttpRequest copy() {
return copy(true, null);
}
@Override
public FullHttpRequest duplicate() {
DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(
@ -129,4 +161,38 @@ public class DefaultFullHttpRequest extends DefaultHttpRequest implements FullHt
duplicate.trailingHeaders().set(trailingHeaders());
return duplicate;
}
@Override
public int hashCode() {
int result = 1;
result = HASH_CODE_PRIME * result + content().hashCode();
result = HASH_CODE_PRIME * result + trailingHeaders().hashCode();
result = HASH_CODE_PRIME * result + super.hashCode();
return result;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof DefaultFullHttpRequest)) {
return false;
}
DefaultFullHttpRequest other = (DefaultFullHttpRequest) o;
return super.equals(other) &&
content().equals(other.content()) &&
trailingHeaders().equals(other.trailingHeaders());
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
appendAll(buf);
buf.append(StringUtil.NEWLINE);
appendHeaders(buf, trailingHeaders());
// Remove the last newline.
buf.setLength(buf.length() - StringUtil.NEWLINE.length());
return buf.toString();
}
}

View File

@ -17,13 +17,14 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.StringUtil;
/**
* Default implementation of a {@link FullHttpResponse}.
*/
public class DefaultFullHttpResponse extends DefaultHttpResponse implements FullHttpResponse {
private static final int HASH_CODE_PRIME = 31;
private final ByteBuf content;
private final HttpHeaders trailingHeaders;
private final boolean validateHeaders;
@ -36,6 +37,10 @@ public class DefaultFullHttpResponse extends DefaultHttpResponse implements Full
this(version, status, content, true);
}
public DefaultFullHttpResponse(HttpVersion version, HttpResponseStatus status, boolean validateHeaders) {
this(version, status, Unpooled.buffer(0), validateHeaders);
}
public DefaultFullHttpResponse(HttpVersion version, HttpResponseStatus status,
ByteBuf content, boolean validateHeaders) {
super(version, status, validateHeaders);
@ -108,15 +113,41 @@ public class DefaultFullHttpResponse extends DefaultHttpResponse implements Full
return this;
}
@Override
public FullHttpResponse copy() {
/**
* Copy this object
*
* @param copyContent
* <ul>
* <li>{@code true} if this object's {@link #content()} should be used to copy.</li>
* <li>{@code false} if {@code newContent} should be used instead.</li>
* </ul>
* @param newContent
* <ul>
* <li>if {@code copyContent} is false then this will be used in the copy's content.</li>
* <li>if {@code null} then a default buffer of 0 size will be selected</li>
* </ul>
* @return A copy of this object
*/
private FullHttpResponse copy(boolean copyContent, ByteBuf newContent) {
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(
protocolVersion(), status(), content().copy(), validateHeaders);
protocolVersion(), status(),
copyContent ? content().copy() :
newContent == null ? Unpooled.buffer(0) : newContent);
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}
@Override
public FullHttpResponse copy(ByteBuf newContent) {
return copy(false, newContent);
}
@Override
public FullHttpResponse copy() {
return copy(true, null);
}
@Override
public FullHttpResponse duplicate() {
DefaultFullHttpResponse duplicate = new DefaultFullHttpResponse(protocolVersion(), status(),
@ -125,4 +156,38 @@ public class DefaultFullHttpResponse extends DefaultHttpResponse implements Full
duplicate.trailingHeaders().set(trailingHeaders());
return duplicate;
}
@Override
public int hashCode() {
int result = 1;
result = HASH_CODE_PRIME * result + content().hashCode();
result = HASH_CODE_PRIME * result + trailingHeaders().hashCode();
result = HASH_CODE_PRIME * result + super.hashCode();
return result;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof DefaultFullHttpResponse)) {
return false;
}
DefaultFullHttpResponse other = (DefaultFullHttpResponse) o;
return super.equals(other) &&
content().equals(other.content()) &&
trailingHeaders().equals(other.trailingHeaders());
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
appendAll(buf);
buf.append(StringUtil.NEWLINE);
appendHeaders(buf, trailingHeaders());
// Remove the last newline.
buf.setLength(buf.length() - StringUtil.NEWLINE.length());
return buf.toString();
}
}

View File

@ -104,6 +104,10 @@ public abstract class DefaultHttpMessage extends DefaultHttpObject implements Ht
}
void appendHeaders(StringBuilder buf) {
appendHeaders(buf, headers());
}
void appendHeaders(StringBuilder buf, HttpHeaders headers) {
for (Map.Entry<String, String> e: headers()) {
buf.append(e.getKey());
buf.append(": ");

View File

@ -21,7 +21,7 @@ import io.netty.util.internal.StringUtil;
* The default {@link HttpRequest} implementation.
*/
public class DefaultHttpRequest extends DefaultHttpMessage implements HttpRequest {
private static final int HASH_CODE_PRIME = 31;
private HttpMethod method;
private String uri;
@ -90,9 +90,39 @@ public class DefaultHttpRequest extends DefaultHttpMessage implements HttpReques
return this;
}
@Override
public int hashCode() {
int result = 1;
result = HASH_CODE_PRIME * result + method.hashCode();
result = HASH_CODE_PRIME * result + uri.hashCode();
result = HASH_CODE_PRIME * result + super.hashCode();
return result;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof DefaultHttpRequest)) {
return false;
}
DefaultHttpRequest other = (DefaultHttpRequest) o;
return method().equals(other.method()) &&
uri().equalsIgnoreCase(other.uri()) &&
super.equals(o);
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
appendAll(buf);
// Remove the last newline.
buf.setLength(buf.length() - StringUtil.NEWLINE.length());
return buf.toString();
}
void appendAll(StringBuilder buf) {
buf.append(StringUtil.simpleClassName(this));
buf.append("(decodeResult: ");
buf.append(decoderResult());
@ -105,9 +135,5 @@ public class DefaultHttpRequest extends DefaultHttpMessage implements HttpReques
buf.append(protocolVersion().text());
buf.append(StringUtil.NEWLINE);
appendHeaders(buf);
// Remove the last newline.
buf.setLength(buf.length() - StringUtil.NEWLINE.length());
return buf.toString();
}
}

View File

@ -21,7 +21,7 @@ import io.netty.util.internal.StringUtil;
* The default {@link HttpResponse} implementation.
*/
public class DefaultHttpResponse extends DefaultHttpMessage implements HttpResponse {
private static final int HASH_CODE_PRIME = 31;
private HttpResponseStatus status;
/**
@ -71,10 +71,9 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + status.hashCode();
result = prime * result + super.hashCode();
result = HASH_CODE_PRIME * result + status.hashCode();
result = HASH_CODE_PRIME * result + super.hashCode();
return result;
}
@ -92,6 +91,14 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
appendAll(buf);
// Remove the last newline.
buf.setLength(buf.length() - StringUtil.NEWLINE.length());
return buf.toString();
}
void appendAll(StringBuilder buf) {
buf.append(StringUtil.simpleClassName(this));
buf.append("(decodeResult: ");
buf.append(decoderResult());
@ -102,9 +109,5 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo
buf.append(status());
buf.append(StringUtil.NEWLINE);
appendHeaders(buf);
// Remove the last newline.
buf.setLength(buf.length() - StringUtil.NEWLINE.length());
return buf.toString();
}
}

View File

@ -15,11 +15,23 @@
*/
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
/**
* Combines {@link HttpMessage} and {@link LastHttpContent} into one
* message. So it represent a <i>complete</i> http message.
*/
public interface FullHttpMessage extends HttpMessage, LastHttpContent {
/**
* Create a copy of this {@link FullHttpMessage} with alternative content.
*
* @param newContent The buffer to use instead of this {@link FullHttpMessage}'s content in the copy operation.
* <p>
* NOTE: retain will NOT be called on this buffer. {@code null} results in an empty default choice buffer.
* @return The result of the copy operation
*/
FullHttpMessage copy(ByteBuf newContent);
@Override
FullHttpMessage copy();

View File

@ -15,11 +15,15 @@
*/
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
/**
* Combinate the {@link HttpRequest} and {@link FullHttpMessage}, so the request is a <i>complete</i> HTTP
* request.
* Combinate the {@link HttpRequest} and {@link FullHttpMessage}, so the request is a <i>complete</i> HTTP request.
*/
public interface FullHttpRequest extends HttpRequest, FullHttpMessage {
@Override
FullHttpRequest copy(ByteBuf newContent);
@Override
FullHttpRequest copy();

View File

@ -15,11 +15,15 @@
*/
package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
/**
* Combination of a {@link HttpResponse} and {@link FullHttpMessage}.
* So it represent a <i>complete</i> http response.
* Combination of a {@link HttpResponse} and {@link FullHttpMessage}. So it represent a <i>complete</i> http response.
*/
public interface FullHttpResponse extends HttpResponse, FullHttpMessage {
@Override
FullHttpResponse copy(ByteBuf newContent);
@Override
FullHttpResponse copy();

View File

@ -263,15 +263,41 @@ public class HttpObjectAggregator
super(request, content, trailingHeaders);
}
@Override
public FullHttpRequest copy() {
/**
* Copy this object
*
* @param copyContent
* <ul>
* <li>{@code true} if this object's {@link #content()} should be used to copy.</li>
* <li>{@code false} if {@code newContent} should be used instead.</li>
* </ul>
* @param newContent
* <ul>
* <li>if {@code copyContent} is false then this will be used in the copy's content.</li>
* <li>if {@code null} then a default buffer of 0 size will be selected</li>
* </ul>
* @return A copy of this object
*/
private FullHttpRequest copy(boolean copyContent, ByteBuf newContent) {
DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
protocolVersion(), method(), uri(), content().copy());
protocolVersion(), method(), uri(),
copyContent ? content().copy() :
newContent == null ? Unpooled.buffer(0) : newContent);
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}
@Override
public FullHttpRequest copy(ByteBuf newContent) {
return copy(false, newContent);
}
@Override
public FullHttpRequest copy() {
return copy(true, null);
}
@Override
public FullHttpRequest duplicate() {
DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(
@ -340,15 +366,41 @@ public class HttpObjectAggregator
super(message, content, trailingHeaders);
}
@Override
public FullHttpResponse copy() {
/**
* Copy this object
*
* @param copyContent
* <ul>
* <li>{@code true} if this object's {@link #content()} should be used to copy.</li>
* <li>{@code false} if {@code newContent} should be used instead.</li>
* </ul>
* @param newContent
* <ul>
* <li>if {@code copyContent} is false then this will be used in the copy's content.</li>
* <li>if {@code null} then a default buffer of 0 size will be selected</li>
* </ul>
* @return A copy of this object
*/
private FullHttpResponse copy(boolean copyContent, ByteBuf newContent) {
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(
protocolVersion(), status(), content().copy());
protocolVersion(), status(),
copyContent ? content().copy() :
newContent == null ? Unpooled.buffer(0) : newContent);
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}
@Override
public FullHttpResponse copy(ByteBuf newContent) {
return copy(false, newContent);
}
@Override
public FullHttpResponse copy() {
return copy(true, null);
}
@Override
public FullHttpResponse duplicate() {
DefaultFullHttpResponse duplicate = new DefaultFullHttpResponse(protocolVersion(), status(),

View File

@ -540,6 +540,13 @@ public class HttpResponseStatus implements Comparable<HttpResponseStatus> {
return reasonPhrase;
}
/**
* Determine if the status code is of the informational class (1xx)
*/
public boolean isInformational() {
return code >= 100 && code < 200;
}
@Override
public int hashCode() {
return code();

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.http.multipart;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.DecoderResult;
@ -1203,15 +1204,41 @@ public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
return this;
}
@Override
public FullHttpRequest copy() {
/**
* Copy this object
*
* @param copyContent
* <ul>
* <li>{@code true} if this object's {@link #content()} should be used to copy.</li>
* <li>{@code false} if {@code newContent} should be used instead.</li>
* </ul>
* @param newContent
* <ul>
* <li>if {@code copyContent} is false then this will be used in the copy's content.</li>
* <li>if {@code null} then a default buffer of 0 size will be selected</li>
* </ul>
* @return A copy of this object
*/
private FullHttpRequest copy(boolean copyContent, ByteBuf newContent) {
DefaultFullHttpRequest copy = new DefaultFullHttpRequest(
protocolVersion(), method(), uri(), content().copy());
protocolVersion(), method(), uri(),
copyContent ? content().copy() :
newContent == null ? Unpooled.buffer(0) : newContent);
copy.headers().set(headers());
copy.trailingHeaders().set(trailingHeaders());
return copy;
}
@Override
public FullHttpRequest copy(ByteBuf newContent) {
return copy(false, newContent);
}
@Override
public FullHttpRequest copy() {
return copy(true, null);
}
@Override
public FullHttpRequest duplicate() {
DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(

View File

@ -128,6 +128,24 @@ public final class DefaultHttp2Headers extends Http2Headers {
return new HeaderIterator();
}
@Override
public String forEach(HeaderVisitor visitor) {
if (isEmpty()) {
return null;
}
HeaderEntry e = head.after;
do {
if (visitor.visit(e)) {
e = e.after;
} else {
return e.getKey();
}
} while (e != head);
return null;
}
/**
* Short cut for {@code new DefaultHttp2Headers.Builder()}.
*/

View File

@ -68,17 +68,17 @@ public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2Connect
}
// Consume the Authority extension header if present
value = httpHeaders.get(Http2HttpHeaders.Names.AUTHORITY);
value = httpHeaders.get(Http2ToHttpHeaders.Names.AUTHORITY);
if (value != null) {
http2Headers.authority(value);
httpHeaders.remove(Http2HttpHeaders.Names.AUTHORITY);
httpHeaders.remove(Http2ToHttpHeaders.Names.AUTHORITY);
}
// Consume the Scheme extension header if present
value = httpHeaders.get(Http2HttpHeaders.Names.SCHEME);
value = httpHeaders.get(Http2ToHttpHeaders.Names.SCHEME);
if (value != null) {
http2Headers.scheme(value);
httpHeaders.remove(Http2HttpHeaders.Names.SCHEME);
httpHeaders.remove(Http2ToHttpHeaders.Names.SCHEME);
}
}
@ -114,7 +114,7 @@ public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2Connect
*/
private int getStreamId(HttpHeaders httpHeaders) throws Http2Exception {
int streamId = 0;
String value = httpHeaders.get(Http2HttpHeaders.Names.STREAM_ID);
String value = httpHeaders.get(Http2ToHttpHeaders.Names.STREAM_ID);
if (value == null) {
streamId = nextStreamId();
} else {
@ -124,7 +124,7 @@ public class DelegatingHttp2HttpConnectionHandler extends DelegatingHttp2Connect
throw Http2Exception.format(Http2Error.INTERNAL_ERROR,
"Invalid user-specified stream id value '%s'", value);
}
httpHeaders.remove(Http2HttpHeaders.Names.STREAM_ID);
httpHeaders.remove(Http2ToHttpHeaders.Names.STREAM_ID);
}
return streamId;

View File

@ -25,6 +25,17 @@ public interface Http2FrameObserver extends Http2DataObserver {
/**
* Handles an inbound HEADERS frame.
* <p>
* Only one of the following methods will be called for each HEADERS frame sequence.
* One will be called when the END_HEADERS flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
* <li>{@link #onPushPromiseRead(ChannelHandlerContext, int, int, Http2Headers, int)}</li>
* </ul>
* <p>
* To say it another way; the {@link Http2Headers} will contain all of the headers
* for the current message exchange step (additional queuing is not necessary).
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
@ -37,7 +48,19 @@ public interface Http2FrameObserver extends Http2DataObserver {
boolean endStream) throws Http2Exception;
/**
* Handles an inbound HEADERS frame with priority information specified.
* Handles an inbound HEADERS frame with priority information specified. Only called if END_HEADERS encountered.
*
* <p>
* Only one of the following methods will be called for each HEADERS frame sequence.
* One will be called when the END_HEADERS flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
* <li>{@link #onPushPromiseRead(ChannelHandlerContext, int, int, Http2Headers, int)}</li>
* </ul>
* <p>
* To say it another way; the {@link Http2Headers} will contain all of the headers
* for the current message exchange step (additional queuing is not necessary).
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
@ -109,7 +132,19 @@ public interface Http2FrameObserver extends Http2DataObserver {
void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception;
/**
* Handles an inbound PUSH_PROMISE frame.
* Handles an inbound PUSH_PROMISE frame. Only called if END_HEADERS encountered.
*
* <p>
* Only one of the following methods will be called for each HEADERS frame sequence.
* One will be called when the END_HEADERS flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
* <li>{@link #onPushPromiseRead(ChannelHandlerContext, int, int, Http2Headers, int)}</li>
* </ul>
* <p>
* To say it another way; the {@link Http2Headers} will contain all of the headers
* for the current message exchange step (additional queuing is not necessary).
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the stream the frame was sent on.

View File

@ -70,6 +70,11 @@ public abstract class Http2Headers implements Iterable<Entry<String, String>> {
public Iterator<Entry<String, String>> iterator() {
return entries().iterator();
}
@Override
public String forEach(HeaderVisitor visitor) {
return null;
}
};
/**
@ -180,6 +185,27 @@ public abstract class Http2Headers implements Iterable<Entry<String, String>> {
*/
public abstract int size();
/**
* Allows a means to reduce GC pressure while iterating over a collection
*/
public interface HeaderVisitor {
/**
* @return
* <ul>
* <li>{@code true} if the processor wants to continue the loop and handle the entry.</li>
* <li>{@code false} if the processor wants to stop handling headers and abort the loop.</li>
* </ul>
*/
boolean visit(Map.Entry<String, String> entry);
}
/**
* Iterates over the entries contained within this header object in no guaranteed order
* @return {@code null} if the visitor iterated to or beyond the end of the headers.
* The last-visited header name If the {@link HeaderVisitor#visit(Entry)} returned {@code false}.
*/
public abstract String forEach(HeaderVisitor visitor);
/**
* Gets the {@link PseudoHeaderName#METHOD} header or {@code null} if there is no such header
*/

View File

@ -15,12 +15,14 @@
package io.netty.handler.codec.http2;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http.HttpResponseStatus;
/**
* Provides the constants for the header names used by
* {@link InboundHttp2ToHttpAdapter} and {@link DelegatingHttp2HttpConnectionHandler}
*/
public final class Http2HttpHeaders {
public final class Http2ToHttpHeaders {
private Http2ToHttpHeaders() { }
public static final class Names {
/**
@ -59,4 +61,27 @@ public final class Http2HttpHeaders {
private Names() {
}
}
/**
* Apply HTTP/2 rules while translating status code to {@link HttpResponseStatus}
*
* @param status The status from an HTTP/2 frame
* @return The HTTP/1.x status
* @throws Http2Exception If there is a problem translating from HTTP/2 to HTTP/1.x
*/
public static HttpResponseStatus parseStatus(String status) throws Http2Exception {
HttpResponseStatus result = null;
try {
result = HttpResponseStatus.parseLine(status);
if (result == HttpResponseStatus.SWITCHING_PROTOCOLS) {
throw Http2Exception.protocolError("Invalid HTTP/2 status code '%d'", result.code());
}
} catch (Http2Exception e) {
throw e;
} catch (Exception e) {
throw Http2Exception.protocolError(
"Unrecognized HTTP status code '%s' encountered in translation to HTTP/1.x", status);
}
return result;
}
}

View File

@ -12,17 +12,16 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpHeaders;
@ -30,13 +29,12 @@ import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2Headers.HeaderVisitor;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@ -46,10 +44,11 @@ import java.util.Set;
*/
public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements Http2FrameObserver {
private final long maxContentLength;
private final int maxContentLength;
private final boolean validateHttpHeaders;
private final Http2Connection connection;
private final IntObjectMap<Http2HttpMessageAccumulator> messageMap;
private final ImmediateSendDetector sendDetector;
private final IntObjectMap<FullHttpMessage> messageMap;
private static final Set<String> HEADERS_TO_EXCLUDE;
private static final Map<String, String> HEADER_NAME_TRANSLATIONS_REQUEST;
@ -64,12 +63,12 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
}
HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.PseudoHeaderName.AUTHORITY.value(),
Http2HttpHeaders.Names.AUTHORITY.toString());
Http2ToHttpHeaders.Names.AUTHORITY.toString());
HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.PseudoHeaderName.SCHEME.value(),
Http2HttpHeaders.Names.SCHEME.toString());
Http2ToHttpHeaders.Names.SCHEME.toString());
HEADER_NAME_TRANSLATIONS_REQUEST.putAll(HEADER_NAME_TRANSLATIONS_RESPONSE);
HEADER_NAME_TRANSLATIONS_RESPONSE.put(Http2Headers.PseudoHeaderName.PATH.value(),
Http2HttpHeaders.Names.PATH.toString());
Http2ToHttpHeaders.Names.PATH.toString());
}
/**
@ -81,9 +80,10 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, long maxContentLength)
public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, int maxContentLength)
throws NullPointerException, IllegalArgumentException {
InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection, maxContentLength);
InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection, maxContentLength,
DefaultImmediateSendDetector.getInstance());
connection.addListener(adapter);
return adapter;
}
@ -98,12 +98,10 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, long maxContentLength,
boolean validateHttpHeaders)
throws NullPointerException, IllegalArgumentException {
InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection,
maxContentLength,
validateHttpHeaders);
public static InboundHttp2ToHttpAdapter newInstance(Http2Connection connection, int maxContentLength,
boolean validateHttpHeaders) throws NullPointerException, IllegalArgumentException {
InboundHttp2ToHttpAdapter adapter = new InboundHttp2ToHttpAdapter(connection, maxContentLength,
DefaultImmediateSendDetector.getInstance(), validateHttpHeaders);
connection.addListener(adapter);
return adapter;
}
@ -114,12 +112,14 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
* @param connection The object which will provide connection notification events for the current connection
* @param maxContentLength the maximum length of the message content. If the length of the message content exceeds
* this value, a {@link TooLongFrameException} will be raised.
* @param sendDetector Decides when HTTP messages must be sent before the typically HTTP message events are detected
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
protected InboundHttp2ToHttpAdapter(Http2Connection connection, long maxContentLength) throws NullPointerException,
protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
ImmediateSendDetector sendDetector) throws NullPointerException,
IllegalArgumentException {
this(connection, maxContentLength, true);
this(connection, maxContentLength, sendDetector, true);
}
/**
@ -128,11 +128,13 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
* @param connection The object which will provide connection notification events for the current connection
* @param maxContentLength the maximum length of the message content. If the length of the message content exceeds
* this value, a {@link TooLongFrameException} will be raised.
* @param sendDetector Decides when HTTP messages must be sent before the typically HTTP message events are detected
* @param validateHeaders {@code true} if http headers should be validated
* @throws NullPointerException If {@code connection} is null
* @throws IllegalArgumentException If {@code maxContentLength} is less than or equal to {@code 0}
*/
protected InboundHttp2ToHttpAdapter(Http2Connection connection, long maxContentLength, boolean validateHttpHeaders)
protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
ImmediateSendDetector sendDetector, boolean validateHttpHeaders)
throws NullPointerException, IllegalArgumentException {
if (connection == null) {
throw new NullPointerException("connection");
@ -140,170 +142,172 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength must be a positive integer: " + maxContentLength);
}
if (sendDetector == null) {
throw new NullPointerException("sendDetector");
}
this.maxContentLength = maxContentLength;
this.validateHttpHeaders = validateHttpHeaders;
this.connection = connection;
this.messageMap = new IntObjectHashMap<Http2HttpMessageAccumulator>();
this.sendDetector = sendDetector;
this.messageMap = new IntObjectHashMap<FullHttpMessage>();
}
@Override
public void streamRemoved(Http2Stream stream) {
removeMessage(stream.id());
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
// Padding is already stripped out of data by super class
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
if (msgAccumulator == null) {
throw Http2Exception.protocolError("Data Frame recieved for unknown stream id %d", streamId);
}
try {
msgAccumulator.add(endOfStream ? new DefaultLastHttpContent(data, validateHttpHeaders)
: new DefaultHttpContent(data), ctx);
} catch (TooLongFrameException e) {
removeMessage(streamId);
throw Http2Exception.format(Http2Error.INTERNAL_ERROR,
"Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
}
if (endOfStream) {
msgAccumulator.endOfStream(ctx);
removeMessage(streamId);
}
messageMap.remove(stream.id());
}
/**
* Extracts the common initial header processing and internal tracking
* Set final headers and fire a channel read event
*
* @param ctx The context to fire the event on
* @param msg The message to send
* @param streamId the streamId of the message which is being fired
*/
private void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, int streamId) {
messageMap.remove(streamId);
HttpHeaderUtil.setContentLength(msg, msg.content().readableBytes());
ctx.fireChannelRead(msg);
}
/**
* Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
* is in a valid state for additional headers.
*
* @param ctx The context for which this message has been received. Used to send informational header if detected.
* @param streamId The stream id the {@code headers} apply to
* @param headers The headers to process
* @param endOfStream {@code true} if the {@code streamId} has received the end of stream flag
* @param allowAppend {@code true} if headers will be appended if the stream already exists. if {@code false} and
* the stream already exists this method returns {@code null}.
* @param appendToTrailer {@code true} if a message {@code streamId} already exists then the headers
* should be added to the trailing headers.
* {@code false} then appends will be done to the initial headers.
* @return The object used to track the stream corresponding to {@code streamId}. {@code null} if
* {@code allowAppend} is {@code false} and the stream already exists.
* @throws Http2Exception If the stream id is not in the correct state to process the headers request
*/
private Http2HttpMessageAccumulator processHeadersBegin(int streamId, Http2Headers headers, boolean allowAppend)
throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
try {
if (msgAccumulator == null) {
msgAccumulator = connection.isServer() ? newHttpRequestAccumulator(headers)
: newHttpResponseAccumulator(headers);
} else if (allowAppend) {
if (msgAccumulator.headerConsumed()) {
if (msgAccumulator.trailerConsumed()) {
throw new IllegalStateException("Header received and trailer already consumed");
} else {
msgAccumulator.trailer(new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHttpHeaders));
}
}
msgAccumulator.add(headers);
} else {
return null;
private FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
boolean endOfStream, boolean allowAppend, boolean appendToTrailer) throws Http2Exception {
FullHttpMessage msg = messageMap.get(streamId);
if (msg == null) {
msg = connection.isServer() ? newHttpRequest(streamId, headers, validateHttpHeaders) :
newHttpResponse(streamId, headers, validateHttpHeaders);
} else if (allowAppend) {
try {
addHttp2ToHttpHeaders(streamId, headers, msg, appendToTrailer);
} catch (Http2Exception e) {
messageMap.remove(streamId);
throw e;
}
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_ID, streamId);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError("Headers Frame recieved for stream id %d which is in an invalid state",
streamId);
} catch (Http2Exception e) {
removeMessage(streamId);
throw e;
} else {
msg = null;
}
return msgAccumulator;
if (sendDetector.mustSendImmediately(msg)) {
fireChannelRead(ctx, msg, streamId);
return endOfStream ? null : sendDetector.copyIfNeeded(msg);
}
return msg;
}
/**
* Extracts the common final header processing and internal tracking
* After HTTP/2 headers have been processed by {@link #processHeadersBegin} this method either
* sends the result up the pipeline or retains the message for future processing.
*
* @param ctx The context for which this message has been received
* @param streamId The stream id the {@code msgAccumulator} corresponds to
* @param msgAccumulator The object which represents all data for corresponding to {@code streamId}
* @param streamId The stream id the {@code objAccumulator} corresponds to
* @param msg The object which represents all headers/data for corresponding to {@code streamId}
* @param endOfStream {@code true} if this is the last event for the stream
*/
private void processHeadersEnd(ChannelHandlerContext ctx, int streamId,
Http2HttpMessageAccumulator msgAccumulator, boolean endOfStream) {
private void processHeadersEnd(ChannelHandlerContext ctx, int streamId, FullHttpMessage msg, boolean endOfStream) {
if (endOfStream) {
msgAccumulator.endOfStream(ctx);
removeMessage(streamId);
fireChannelRead(ctx, msg, streamId);
} else {
putMessage(streamId, msgAccumulator);
messageMap.put(streamId, msg);
}
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
FullHttpMessage msg = messageMap.get(streamId);
if (msg == null) {
throw Http2Exception.protocolError("Data Frame recieved for unknown stream id %d", streamId);
}
ByteBuf content = msg.content();
if (content.readableBytes() > maxContentLength - data.readableBytes()) {
throw Http2Exception.format(Http2Error.INTERNAL_ERROR,
"Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
}
// TODO: provide hooks to a HttpContentDecoder type interface
// Preferably provide these hooks in the HTTP2 codec so even non-translation layer use-cases benefit
// (and then data will already be decoded here)
content.writeBytes(data, data.readerIndex(), data.readableBytes());
if (endOfStream) {
fireChannelRead(ctx, msg, streamId);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream) throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(streamId, headers, true);
processHeadersEnd(ctx, streamId, msgAccumulator, endOfStream);
}
/**
* Set/clear all HTTP/1.x headers related to stream dependencies
*
* @param msgAccumulator The object which caches the HTTP/1.x headers
* @param streamDependency The stream id for which the {@code msgAccumulator} is dependent on
* @param weight The dependency weight
* @param exclusive The exlusive HTTP/2 flag
* @throws IllegalStateException If the {@code msgAccumulator} is not in a valid state to change headers
*/
private void setDependencyHeaders(Http2HttpMessageAccumulator msgAccumulator, int streamDependency, short weight,
boolean exclusive) throws IllegalStateException {
if (streamDependency != 0) {
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID, streamDependency);
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_EXCLUSIVE, exclusive);
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_WEIGHT, weight);
} else {
msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID);
msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_EXCLUSIVE);
msgAccumulator.removeHeader(Http2HttpHeaders.Names.STREAM_WEIGHT);
FullHttpMessage msg = processHeadersBegin(ctx, streamId, headers, endOfStream, true, true);
if (msg != null) {
processHeadersEnd(ctx, streamId, msg, endOfStream);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(streamId, headers, true);
try {
setDependencyHeaders(msgAccumulator, streamDependency, weight, exclusive);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError("Headers Frame recieved for stream id %d which is in an invalid state",
streamId);
FullHttpMessage msg = processHeadersBegin(ctx, streamId, headers, endOfStream, true, true);
if (msg != null) {
// TODO: fix me...consume priority tree listener interface
setDependencyHeaders(msg.headers(), streamDependency, weight, exclusive);
processHeadersEnd(ctx, streamId, msg, endOfStream);
}
processHeadersEnd(ctx, streamId, msgAccumulator, endOfStream);
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
if (msgAccumulator == null) {
FullHttpMessage msg = messageMap.get(streamId);
if (msg == null) {
throw Http2Exception.protocolError("Priority Frame recieved for unknown stream id %d", streamId);
}
try {
setDependencyHeaders(msgAccumulator, streamDependency, weight, exclusive);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError("Priority Frame recieved for stream id %d which is in an invalid state",
streamId);
}
// TODO: fix me...consume priority tree listener interface
setDependencyHeaders(msg.headers(), streamDependency, weight, exclusive);
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
Http2HttpMessageAccumulator msgAccumulator = getMessage(streamId);
if (msgAccumulator != null) {
removeMessage(streamId);
FullHttpMessage msg = messageMap.get(streamId);
if (msg != null) {
fireChannelRead(ctx, msg, streamId);
}
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers,
int padding) throws Http2Exception {
// A push promise should not be allowed to add headers to an existing stream
FullHttpMessage msg = processHeadersBegin(ctx, promisedStreamId, headers, false, false, false);
if (msg == null) {
throw Http2Exception.protocolError("Push Promise Frame recieved for pre-existing stream id %d",
promisedStreamId);
}
msg.headers().set(Http2ToHttpHeaders.Names.STREAM_PROMISE_ID, streamId);
processHeadersEnd(ctx, promisedStreamId, msg, false);
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
// NOOP
@ -324,30 +328,6 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
// NOOP
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers,
int padding) throws Http2Exception {
// Do not allow adding of headers to existing Http2HttpMessageAccumulator
// according to spec (http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-6.6) there must
// be a CONTINUATION frame for more headers
Http2HttpMessageAccumulator msgAccumulator = processHeadersBegin(promisedStreamId, headers, false);
if (msgAccumulator == null) {
throw Http2Exception.protocolError("Push Promise Frame recieved for pre-existing stream id %d",
promisedStreamId);
}
try {
msgAccumulator.setHeader(Http2HttpHeaders.Names.STREAM_PROMISE_ID, streamId);
} catch (IllegalStateException e) {
removeMessage(streamId);
throw Http2Exception.protocolError(
"Push Promise Frame recieved for stream id %d which is in an invalid state",
promisedStreamId);
}
processHeadersEnd(ctx, promisedStreamId, msgAccumulator, false);
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
@ -366,278 +346,226 @@ public class InboundHttp2ToHttpAdapter extends Http2ConnectionAdapter implements
// NOOP
}
private Http2HttpMessageAccumulator putMessage(int streamId, Http2HttpMessageAccumulator message) {
return messageMap.put(streamId, message);
}
/**
* Allows messages to be sent up the pipeline before the next phase in the
* HTTP message flow is detected.
*/
private interface ImmediateSendDetector {
/**
* Determine if the response should be sent immediately, or wait for the end of the stream
*
* @param msg The response to test
* @return {@code true} if the message should be sent immediately
* {@code false) if we should wait for the end of the stream
*/
boolean mustSendImmediately(FullHttpMessage msg);
private Http2HttpMessageAccumulator getMessage(int streamId) {
return messageMap.get(streamId);
}
private Http2HttpMessageAccumulator removeMessage(int streamId) {
return messageMap.remove(streamId);
/**
* Determine if a copy must be made after an immediate send happens.
* <p>
* An example of this use case is if a request is received
* with a 'Expect: 100-continue' header. The message will be sent immediately,
* and the data will be queued and sent at the end of the stream.
*
* @param msg The message which has just been sent due to {@link #mustSendImmediatley}
* @return A modified copy of the {@code msg} or {@code null} if a copy is not needed.
*/
FullHttpMessage copyIfNeeded(FullHttpMessage msg);
}
/**
* Translate HTTP/2 headers into an object which can build the corresponding HTTP/1.x response objects
*
* @param http2Headers The HTTP/2 headers corresponding to a new stream
* @return Collector for HTTP/1.x objects
* @throws Http2Exception If any of the HTTP/2 headers to not translate to valid HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
* Default implementation of {@link ImmediateSendDetector}
*/
private Http2HttpMessageAccumulator newHttpResponseAccumulator(Http2Headers http2Headers) throws Http2Exception,
IllegalStateException {
HttpResponseStatus status = null;
try {
status = HttpResponseStatus.parseLine(http2Headers.status());
} catch (Exception e) {
throw Http2Exception.protocolError(
"Unrecognized HTTP status code '%s' encountered in translation to HTTP/1.x",
http2Headers.status());
}
// HTTP/2 does not define a way to carry the version or reason phrase that is included in an HTTP/1.1
// status line.
Http2HttpMessageAccumulator messageAccumulator = new Http2HttpMessageAccumulator(new DefaultHttpResponse(
HttpVersion.HTTP_1_1, status, validateHttpHeaders));
messageAccumulator.add(http2Headers);
return messageAccumulator;
}
private static final class DefaultImmediateSendDetector implements ImmediateSendDetector {
private static DefaultImmediateSendDetector instance;
/**
* Translate HTTP/2 headers into an object which can build the corresponding HTTP/1.x request objects
*
* @param http2Headers The HTTP/2 headers corresponding to a new stream
* @return Collector for HTTP/1.x objects
* @throws Http2Exception If any of the HTTP/2 headers to not translate to valid HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
private Http2HttpMessageAccumulator newHttpRequestAccumulator(Http2Headers http2Headers) throws Http2Exception,
IllegalStateException {
// HTTP/2 does not define a way to carry the version identifier that is
// included in the HTTP/1.1 request line.
Http2HttpMessageAccumulator messageAccumulator = new Http2HttpMessageAccumulator(new DefaultHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.valueOf(http2Headers.method()), http2Headers.path(),
validateHttpHeaders));
messageAccumulator.add(http2Headers);
return messageAccumulator;
}
/**
* Provides a container to collect HTTP/1.x objects until the end of the stream has been reached
*/
private final class Http2HttpMessageAccumulator {
private HttpMessage message;
private LastHttpContent trailer;
private long contentLength;
/**
* Creates a new instance
*
* @param message The HTTP/1.x object which represents the headers
*/
public Http2HttpMessageAccumulator(HttpMessage message) {
if (message == null) {
throw new NullPointerException("message");
}
this.message = message;
this.trailer = null;
this.contentLength = 0;
private DefaultImmediateSendDetector() {
}
/**
* Set a HTTP/1.x header
*
* @param name The name of the header
* @param value The value of the header
* @return The headers object after the set operation
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
public HttpHeaders setHeader(CharSequence name, Object value) throws IllegalStateException {
HttpHeaders headers = currentHeaders();
if (headers == null) {
throw new IllegalStateException("Headers object is null");
public static DefaultImmediateSendDetector getInstance() {
if (instance == null) {
instance = new DefaultImmediateSendDetector();
}
return headers.set(name, value);
return instance;
}
/**
* Removes the header with the specified name.
*
* @param name The name of the header to remove
* @return {@code true} if and only if at least one entry has been removed
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
public boolean removeHeader(CharSequence name) throws IllegalStateException {
HttpHeaders headers = currentHeaders();
if (headers == null) {
throw new IllegalStateException("Headers object is null");
}
return headers.remove(name);
}
/**
* Send the headers that have been accumulated so far, if they have not already been sent
*
* @param ctx The channel context for which to propagate events
* @param setContentLength {@code true} to set the Content-Length header
* @return {@code true} If a non-trailer header was fired to {@code ctx}, {@code false} otherwise
*/
private boolean sendHeaders(ChannelHandlerContext ctx, boolean setContentLength) {
HttpHeaders headers = currentHeaders();
if (headers == null) {
return false;
}
// Transfer-Encoding header is not valid
headers.remove(HttpHeaders.Names.TRANSFER_ENCODING);
headers.remove(HttpHeaders.Names.TRAILER);
// Initial header and trailer header will never need to be sent at the same time
// the behavior is that all header events will be appended to the initial header
// until some data is received, and only then will the trailer be used
if (!headerConsumed()) {
// The Connection and Keep-Alive headers are no longer valid
HttpHeaderUtil.setKeepAlive(message, true);
if (setContentLength) {
HttpHeaderUtil.setContentLength(message, contentLength);
}
ctx.fireChannelRead(message);
message = null;
return true;
} else if (trailerExists()) {
ctx.fireChannelRead(trailer);
trailer = LastHttpContent.EMPTY_LAST_CONTENT;
@Override
public boolean mustSendImmediately(FullHttpMessage msg) {
if (msg instanceof FullHttpResponse) {
return ((FullHttpResponse) msg).status().isInformational();
} else if (msg instanceof FullHttpRequest) {
return ((FullHttpRequest) msg).headers().contains(HttpHeaders.Names.EXPECT);
}
return false;
}
/**
* Called when the end of the stream is encountered
*
* @param ctx The channel context for which to propagate events
*/
public void endOfStream(ChannelHandlerContext ctx) {
if (sendHeaders(ctx, true)) {
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
@Override
public FullHttpMessage copyIfNeeded(FullHttpMessage msg) {
if (msg instanceof FullHttpRequest) {
FullHttpRequest copy = ((FullHttpRequest) msg).copy(null);
copy.headers().remove(HttpHeaders.Names.EXPECT);
return copy;
}
return null;
}
}
/**
* Set/clear all HTTP/1.x headers related to stream dependencies
*
* @param headers The headers to set
* @param streamDependency The stream id for which the {@code objAccumulator} is dependent on
* @param weight The dependency weight
* @param exclusive The exclusive HTTP/2 flag
*/
private static void setDependencyHeaders(HttpHeaders headers, int streamDependency,
short weight, boolean exclusive) {
if (streamDependency != 0) {
headers.set(Http2ToHttpHeaders.Names.STREAM_DEPENDENCY_ID, streamDependency);
headers.set(Http2ToHttpHeaders.Names.STREAM_EXCLUSIVE, exclusive);
headers.set(Http2ToHttpHeaders.Names.STREAM_WEIGHT, weight);
} else {
headers.set(Http2ToHttpHeaders.Names.STREAM_DEPENDENCY_ID);
headers.set(Http2ToHttpHeaders.Names.STREAM_EXCLUSIVE);
headers.set(Http2ToHttpHeaders.Names.STREAM_WEIGHT);
}
}
/**
* Create a new object to contain the response data
*
* @param streamId The stream associated with the response
* @param http2Headers The initial set of HTTP/2 headers to create the response with
* @param validateHttpHeaders Used by http-codec to validate headers
* @return A new response object which represents headers/data
* @throws Http2Exception see {@link #addHttp2ToHttpHeaders(int, Http2Headers, FullHttpMessage, Map)}
*/
private static FullHttpMessage newHttpResponse(int streamId, Http2Headers http2Headers, boolean validateHttpHeaders)
throws Http2Exception {
HttpResponseStatus status = Http2ToHttpHeaders.parseStatus(http2Headers.status());
// HTTP/2 does not define a way to carry the version or reason phrase that is included in an HTTP/1.1
// status line.
FullHttpMessage msg = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, validateHttpHeaders);
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false, HEADER_NAME_TRANSLATIONS_RESPONSE);
return msg;
}
/**
* Create a new object to contain the request data
*
* @param streamId The stream associated with the request
* @param http2Headers The initial set of HTTP/2 headers to create the request with
* @param validateHttpHeaders Used by http-codec to validate headers
* @return A new request object which represents headers/data
* @throws Http2Exception see {@link #addHttp2ToHttpHeaders(int, Http2Headers, FullHttpMessage, Map)}
*/
private static FullHttpMessage newHttpRequest(int streamId, Http2Headers http2Headers, boolean validateHttpHeaders)
throws Http2Exception {
// HTTP/2 does not define a way to carry the version identifier that is
// included in the HTTP/1.1 request line.
FullHttpMessage msg = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.valueOf(http2Headers.method()), http2Headers.path(), validateHttpHeaders);
addHttp2ToHttpHeaders(streamId, http2Headers, msg, false, HEADER_NAME_TRANSLATIONS_REQUEST);
return msg;
}
/**
* Translate and add HTTP/2 headers to HTTP/1.x headers
*
* @param streamId The stream associated with {@code sourceHeaders}
* @param sourceHeaders The HTTP/2 headers to convert
* @param destinationMessage The object which will contain the resulting HTTP/1.x headers
* @param addToTrailer {@code true} to add to trailing headers. {@code false} to add to initial headers.
* @throws Http2Exception see {@link #addHttp2ToHttpHeaders(int, Http2Headers, FullHttpMessage, Map)}
*/
private static void addHttp2ToHttpHeaders(int streamId, Http2Headers sourceHeaders,
FullHttpMessage destinationMessage, boolean addToTrailer) throws Http2Exception {
addHttp2ToHttpHeaders(streamId, sourceHeaders, destinationMessage, addToTrailer,
(destinationMessage instanceof FullHttpRequest) ? HEADER_NAME_TRANSLATIONS_REQUEST
: HEADER_NAME_TRANSLATIONS_RESPONSE);
}
/**
* Translate and add HTTP/2 headers to HTTP/1.x headers
*
* @param streamId The stream associated with {@code sourceHeaders}
* @param sourceHeaders The HTTP/2 headers to convert
* @param destinationMessage The object which will contain the resulting HTTP/1.x headers
* @param addToTrailer {@code true} to add to trailing headers. {@code false} to add to initial headers.
* @param translations A map used to help translate HTTP/2 headers to HTTP/1.x headers
* @throws Http2Exception If not all HTTP/2 headers can be translated to HTTP/1.x
*/
private static void addHttp2ToHttpHeaders(int streamId, Http2Headers sourceHeaders,
FullHttpMessage destinationMessage, boolean addToTrailer, Map<String, String> translations)
throws Http2Exception {
HttpHeaders headers = addToTrailer ? destinationMessage.trailingHeaders() : destinationMessage.headers();
HttpForEachVisitor visitor = new HttpForEachVisitor(headers, translations);
sourceHeaders.forEach(visitor);
if (visitor.exception() != null) {
throw visitor.exception();
}
/**
* Add a HTTP/1.x object which represents part of the message body
*
* @param httpContent The content to add
* @param ctx The channel context for which to propagate events
* @throws TooLongFrameException If the {@code contentLength} is exceeded with the addition of the
* {@code httpContent}
*/
public void add(HttpContent httpContent, ChannelHandlerContext ctx) throws TooLongFrameException {
ByteBuf content = httpContent.content();
if (contentLength > maxContentLength - content.readableBytes()) {
throw new TooLongFrameException("HTTP/2 content length exceeded " + maxContentLength + " bytes.");
}
headers.remove(HttpHeaders.Names.TRANSFER_ENCODING);
headers.remove(HttpHeaders.Names.TRAILER);
if (!addToTrailer) {
headers.set(Http2ToHttpHeaders.Names.STREAM_ID, streamId);
HttpHeaderUtil.setKeepAlive(destinationMessage, true);
}
}
contentLength += content.readableBytes();
sendHeaders(ctx, false);
ctx.fireChannelRead(httpContent.retain());
/**
* A visitor which translates HTTP/2 headers to HTTP/1 headers
*/
private static final class HttpForEachVisitor implements HeaderVisitor {
private Map<String, String> translations;
private HttpHeaders headers;
private Http2Exception e;
/**
* Create a new instance
*
* @param headers The HTTP/1.x headers object to store the results of the translation
* @param translations A map used to help translate HTTP/2 headers to HTTP/1.x headers
*/
public HttpForEachVisitor(HttpHeaders headers, Map<String, String> translations) {
this.translations = translations;
this.headers = headers;
this.e = null;
}
/**
* Extend the current set of HTTP/1.x headers
*
* @param http2Headers The HTTP/2 headers to be added
* @throws Http2Exception If any HTTP/2 headers do not map to HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
public void add(Http2Headers http2Headers) throws Http2Exception, IllegalStateException {
add(http2Headers, InboundHttp2ToHttpAdapter.this.connection.isServer() ? HEADER_NAME_TRANSLATIONS_REQUEST
: HEADER_NAME_TRANSLATIONS_RESPONSE);
}
@Override
public boolean visit(Entry<String, String> entry) {
String translatedName = translations.get(entry.getKey());
if (translatedName != null || !HEADERS_TO_EXCLUDE.contains(entry.getKey())) {
if (translatedName == null) {
translatedName = entry.getKey();
}
/**
* Extend the current set of HTTP/1.x headers
*
* @param http2Headers The HTTP/2 headers to be added
* @param headerTranslations Translation map from HTTP/2 headers to HTTP/1.x headers
* @throws Http2Exception If any HTTP/2 headers do not map to HTTP/1.x headers
* @throws IllegalStateException If this object is not in the correct state to accept additional headers
*/
private void add(Http2Headers http2Headers, Map<String, String> headerTranslations) throws Http2Exception,
IllegalStateException {
HttpHeaders headers = currentHeaders();
if (headers == null) {
throw new IllegalStateException("Headers object is null");
}
// http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-8.1.2.3
// All headers that start with ':' are only valid in HTTP/2 context
Iterator<Entry<String, String>> itr = http2Headers.iterator();
while (itr.hasNext()) {
Entry<String, String> entry = itr.next();
String translatedName = headerTranslations.get(entry.getKey());
if (translatedName != null || !HEADERS_TO_EXCLUDE.contains(entry.getKey())) {
if (translatedName == null) {
translatedName = entry.getKey();
}
if (translatedName.isEmpty() || translatedName.charAt(0) == ':') {
throw Http2Exception.protocolError(
"Unknown HTTP/2 header '%s' encountered in translation to HTTP/1.x",
translatedName);
} else {
headers.add(translatedName, entry.getValue());
}
// http://tools.ietf.org/html/draft-ietf-httpbis-http2-14#section-8.1.2.3
// All headers that start with ':' are only valid in HTTP/2 context
if (translatedName.isEmpty() || translatedName.charAt(0) == ':') {
e = Http2Exception
.protocolError("Unknown HTTP/2 header '%s' encountered in translation to HTTP/1.x",
translatedName);
return false;
} else {
headers.add(translatedName, entry.getValue());
}
}
return true;
}
/**
* Set the current trailer header
* Get any exceptions encountered while translating HTTP/2 headers to HTTP/1.x headers
*
* @param trailer The object which represents the trailing headers
* @return
* <ul>
* <li>{@code null} if no exceptions where encountered</li>
* <li>Otherwise an exception describing what went wrong</li>
* </ul>
*/
public void trailer(LastHttpContent trailer) {
this.trailer = trailer;
}
/**
* Determine if the initial header and continuations have been processed and sent up the pipeline
*
* @return {@code true} if the initial header and continuations have been processed and sent up the pipeline
*/
public boolean headerConsumed() {
return message == null;
}
/**
* Determine if the trailing header and continuations have been processed and sent up the pipeline
*
* @return {@code true} if the trailing header and continuations have been processed and sent up the pipeline
*/
public boolean trailerConsumed() {
return trailer != null && trailer.equals(LastHttpContent.EMPTY_LAST_CONTENT);
}
/**
* Determine if there is a trailing header that has not yet been sent up the pipeline
*
* @return {@code true} if there is a trailing header that has not yet been sent up the pipeline
*/
public boolean trailerExists() {
return trailer != null && !trailerConsumed();
}
/**
* Obtain the current object for accumulating headers
*
* @return The primary (non-trailer) header if it exists, otherwise the trailer header
*/
private HttpHeaders currentHeaders() {
if (headerConsumed()) {
return trailerExists() ? trailer.trailingHeaders() : null;
}
return message.headers();
public Http2Exception exception() {
return e;
}
}
}

View File

@ -124,10 +124,10 @@ public class DelegatingHttp2HttpConnectionHandlerTest {
public void testJustHeadersRequest() throws Exception {
final HttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, GET, "/example");
final HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 5);
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 5);
httpHeaders.set(HttpHeaders.Names.HOST, "http://my-user_name@www.example.org:5555/example");
httpHeaders.set(Http2HttpHeaders.Names.AUTHORITY, "www.example.org:5555");
httpHeaders.set(Http2HttpHeaders.Names.SCHEME, "http");
httpHeaders.set(Http2ToHttpHeaders.Names.AUTHORITY, "www.example.org:5555");
httpHeaders.set(Http2ToHttpHeaders.Names.SCHEME, "http");
httpHeaders.add("foo", "goo");
httpHeaders.add("foo", "goo2");
httpHeaders.add("foo2", "goo2");

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.reset;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
@ -36,16 +37,17 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.NetUtil;
@ -66,25 +68,33 @@ import org.mockito.MockitoAnnotations;
public class InboundHttp2ToHttpAdapterTest {
@Mock
private HttpResponseListener messageObserver;
private HttpResponseListener serverObserver;
@Mock
private Http2FrameObserver clientObserver;
private HttpResponseListener clientObserver;
private Http2FrameWriter frameWriter;
private ServerBootstrap sb;
private Bootstrap cb;
private Channel serverChannel;
private Channel serverConnectedChannel;
private Channel clientChannel;
private CountDownLatch serverLatch;
private long maxContentLength;
private CountDownLatch clientLatch;
private int maxContentLength;
private HttpResponseDelegator serverDelegator;
private HttpResponseDelegator clientDelegator;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
maxContentLength = 1 << 16;
serverLatch = new CountDownLatch(1);
clientDelegator = null;
serverDelegator = null;
serverConnectedChannel = null;
maxContentLength = 1024;
setServerLatch(1);
setClientLatch(1);
frameWriter = new DefaultHttp2FrameWriter();
sb = new ServerBootstrap();
@ -97,10 +107,12 @@ public class InboundHttp2ToHttpAdapterTest {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(true);
p.addLast("reader", new FrameAdapter(InboundHttp2ToHttpAdapter.newInstance(
connection, maxContentLength),
new CountDownLatch(10)));
p.addLast(new HttpResponseDelegator(messageObserver));
p.addLast("reader",
new FrameAdapter(InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength),
new CountDownLatch(10)));
serverDelegator = new HttpResponseDelegator(serverObserver, serverLatch);
p.addLast(serverDelegator);
serverConnectedChannel = ch;
}
});
@ -110,7 +122,12 @@ public class InboundHttp2ToHttpAdapterTest {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("reader", new FrameAdapter(clientObserver, new CountDownLatch(10)));
Http2Connection connection = new DefaultHttp2Connection(false);
p.addLast("reader",
new FrameAdapter(InboundHttp2ToHttpAdapter.newInstance(connection, maxContentLength),
new CountDownLatch(10)));
clientDelegator = new HttpResponseDelegator(clientObserver, clientLatch);
p.addLast(clientDelegator);
}
});
@ -127,315 +144,393 @@ public class InboundHttp2ToHttpAdapterTest {
serverChannel.close().sync();
sb.group().shutdownGracefully();
cb.group().shutdownGracefully();
clientDelegator = null;
serverDelegator = null;
clientChannel = null;
serverChannel = null;
serverConnectedChannel = null;
}
@Test
public void clientRequestSingleHeaderNoDataFrames() throws Exception {
serverLatch = new CountDownLatch(2);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
final HttpMessage request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.SCHEME, "https");
httpHeaders.set(Http2HttpHeaders.Names.AUTHORITY, "example.org");
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(Http2ToHttpHeaders.Names.SCHEME, "https");
httpHeaders.set(Http2ToHttpHeaders.Names.AUTHORITY, "example.org");
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").scheme("https").authority("example.org")
.path("/some/path/resource2").build();
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").scheme("https")
.authority("example.org").path("/some/path/resource2").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, true, newPromise());
ctx().flush();
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
verify(messageObserver).messageReceived(eq(request));
verify(messageObserver).messageReceived(eq(LastHttpContent.EMPTY_LAST_CONTENT));
verify(serverObserver).messageReceived(eq(request));
}
@Test
public void clientRequestOneDataFrame() throws Exception {
serverLatch = new CountDownLatch(2);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "hello world";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
ctx().flush();
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, true,
newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
verify(serverObserver).messageReceived(eq(request));
request.release();
}
@Test
public void clientRequestMultipleDataFrames() throws Exception {
serverLatch = new CountDownLatch(3);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
final String text = "hello world big time data!";
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "hello world";
final String text2 = "hello world2";
final HttpContent content = new DefaultHttpContent(Unpooled.copiedBuffer(text.getBytes()));
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()),
true);
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final int midPoint = text.length() / 2;
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise());
ctx().flush();
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.slice(0, midPoint).retain(), 0,
false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.slice(midPoint, text.length() - midPoint).retain(), 0,
true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(3)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(content2, capturedHttpObjects.get(2));
verify(serverObserver).messageReceived(eq(request));
request.release();
}
@Test
public void clientRequestMultipleEmptyDataFrames() throws Exception {
serverLatch = new CountDownLatch(4);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "";
final HttpContent content = new DefaultHttpContent(Unpooled.copiedBuffer(text.getBytes()));
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, false,
newPromise());
frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, false,
newPromise());
frameWriter.writeData(ctx(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
ctx().flush();
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(content, capturedHttpObjects.get(2));
assertEquals(content2, capturedHttpObjects.get(3));
verify(serverObserver).messageReceived(eq(request));
request.release();
}
@Test
public void clientRequestHeaderContinuation() throws Exception {
serverLatch = new CountDownLatch(2);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set("foo", "goo");
httpHeaders.set("foo2", "goo2");
httpHeaders.add("foo2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo")
.set("foo2", "goo2").add("foo2", "goo3").build();
public void clientRequestMultipleHeaders() throws Exception {
// writeHeaders will implicitly add an END_HEADERS tag each time and so this test does not follow the HTTP
// message flow. We currently accept this message flow and just add the second headers to the trailing headers.
final String text = "";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
HttpHeaders trailingHeaders = request.trailingHeaders();
trailingHeaders.set("FoO", "goo");
trailingHeaders.set("foO2", "goo2");
trailingHeaders.add("fOo2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo").set("foo2", "goo2")
.add("foo2", "goo3").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeHeaders(ctx(), 3, http2Headers2, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
ctx().flush();
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, Unpooled.copiedBuffer(text.getBytes()), 0, true,
newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
verify(serverObserver).messageReceived(eq(request));
request.release();
}
@Test
public void clientRequestTrailingHeaders() throws Exception {
serverLatch = new CountDownLatch(3);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
final String text = "some data";
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
"/some/path/resource2", content, true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, true);
HttpHeaders trailingHeaders = trailer.trailingHeaders();
trailingHeaders.set("foo", "goo");
trailingHeaders.set("foo2", "goo2");
trailingHeaders.add("foo2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo")
.set("foo2", "goo2").add("foo2", "goo3").build();
final String text = "not empty!";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
HttpHeaders trailingHeaders = request.trailingHeaders();
trailingHeaders.set("Foo", "goo");
trailingHeaders.set("fOo2", "goo2");
trailingHeaders.add("foO2", "goo3");
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("GET").path("/some/path/resource2")
.build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().set("foo", "goo").set("foo2", "goo2")
.add("foo2", "goo3").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, false, newPromise());
frameWriter.writeHeaders(ctx(), 3, http2Headers2, 0, true, newPromise());
ctx().flush();
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 3, http2Headers2, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(3)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(trailer, capturedHttpObjects.get(2));
}
@Test
public void clientRequestPushPromise() throws Exception {
serverLatch = new CountDownLatch(4);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource", true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final HttpMessage request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
HttpHeaders httpHeaders2 = request2.headers();
httpHeaders2.set(Http2HttpHeaders.Names.SCHEME, "https");
httpHeaders2.set(Http2HttpHeaders.Names.AUTHORITY, "example.org");
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_PROMISE_ID, 3);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").scheme("https")
.authority("example.org").build();
final String text = "hello 1!**";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
final String text2 = "hello 2!><";
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()),
true);
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writePushPromise(ctx(), 3, 5, http2Headers2, 0, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
frameWriter.writeData(ctx(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise());
ctx().flush();
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(request2, capturedHttpObjects.get(2));
assertEquals(content2, capturedHttpObjects.get(3));
verify(serverObserver).messageReceived(eq(request));
request.release();
}
@Test
public void clientRequestStreamDependency() throws Exception {
serverLatch = new CountDownLatch(4);
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource", true);
setServerLatch(2);
final String text = "hello world big time data!";
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final String text2 = "hello world big time data...number 2!!";
final ByteBuf content2 = Unpooled.copiedBuffer(text2.getBytes());
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource", content, true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2HttpHeaders.Names.STREAM_ID, 3);
final HttpMessage request2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/some/path/resource2", true);
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final FullHttpMessage request2 = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT,
"/some/path/resource2", content2, true);
HttpHeaders httpHeaders2 = request2.headers();
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_DEPENDENCY_ID, 3);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_EXCLUSIVE, true);
httpHeaders2.set(Http2HttpHeaders.Names.STREAM_WEIGHT, 256);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder()
.method("GET").path("/some/path/resource2").build();
final String text = "hello 1!**";
final HttpContent content = new DefaultLastHttpContent(Unpooled.copiedBuffer(text.getBytes()),
true);
final String text2 = "hello 2!><";
final HttpContent content2 = new DefaultLastHttpContent(Unpooled.copiedBuffer(text2.getBytes()),
true);
httpHeaders2.set(Http2ToHttpHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(Http2ToHttpHeaders.Names.STREAM_DEPENDENCY_ID, 3);
httpHeaders2.set(Http2ToHttpHeaders.Names.STREAM_EXCLUSIVE, true);
httpHeaders2.set(Http2ToHttpHeaders.Names.STREAM_WEIGHT, 256);
httpHeaders2.set(HttpHeaders.Names.CONTENT_LENGTH, text2.length());
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("PUT").path("/some/path/resource")
.build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().method("PUT").path("/some/path/resource2")
.build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctx(), 3, http2Headers, 0, false, newPromise());
frameWriter.writeHeaders(ctx(), 5, http2Headers2, 0, false, newPromise());
frameWriter.writePriority(ctx(), 5, 3, (short) 256, true, newPromise());
frameWriter.writeData(ctx(), 3,
Unpooled.copiedBuffer(text.getBytes()), 0, true, newPromise());
frameWriter.writeData(ctx(), 5,
Unpooled.copiedBuffer(text2.getBytes()), 0, true, newPromise());
ctx().flush();
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
frameWriter.writeHeaders(ctxClient(), 5, http2Headers2, 0, false, newPromiseClient());
frameWriter.writePriority(ctxClient(), 5, 3, (short) 256, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 3, content.retain(), 0, true, newPromiseClient());
frameWriter.writeData(ctxClient(), 5, content2.retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(messageObserver, times(4)).messageReceived(httpObjectCaptor.capture());
verify(serverObserver, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(request, capturedHttpObjects.get(0));
assertEquals(content, capturedHttpObjects.get(1));
assertEquals(request2, capturedHttpObjects.get(2));
assertEquals(content2, capturedHttpObjects.get(3));
assertEquals(request2, capturedHttpObjects.get(1));
request.release();
request2.release();
}
@Test
public void serverRequestPushPromise() throws Exception {
setClientLatch(2);
final String text = "hello world big time data!";
final ByteBuf content = Unpooled.copiedBuffer(text.getBytes());
final String text2 = "hello world smaller data?";
final ByteBuf content2 = Unpooled.copiedBuffer(text2.getBytes());
final FullHttpMessage response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
content, true);
HttpHeaders httpHeaders = response.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
final FullHttpMessage response2 = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CREATED,
content2, true);
HttpHeaders httpHeaders2 = response2.headers();
httpHeaders2.set(Http2ToHttpHeaders.Names.SCHEME, "https");
httpHeaders2.set(Http2ToHttpHeaders.Names.AUTHORITY, "example.org");
httpHeaders2.set(Http2ToHttpHeaders.Names.STREAM_ID, 5);
httpHeaders2.set(Http2ToHttpHeaders.Names.STREAM_PROMISE_ID, 3);
httpHeaders2.set(HttpHeaders.Names.CONTENT_LENGTH, text2.length());
final HttpMessage request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/push/test", true);
httpHeaders = request.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2Headers3 = new DefaultHttp2Headers.Builder().method("GET").path("/push/test").build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers3, 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
verify(serverObserver).messageReceived(eq(request));
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().status("200").build();
final Http2Headers http2Headers2 = new DefaultHttp2Headers.Builder().status("201").scheme("https")
.authority("example.org").build();
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2Headers, 0, false, newPromiseServer());
frameWriter.writePushPromise(ctxServer(), 3, 5, http2Headers2, 0, newPromiseServer());
frameWriter.writeData(ctxServer(), 3, content.retain(), 0, true, newPromiseServer());
frameWriter.writeData(ctxServer(), 5, content2.retain(), 0, true, newPromiseServer());
ctxServer().flush();
}
});
awaitResponses();
ArgumentCaptor<HttpObject> httpObjectCaptor = ArgumentCaptor.forClass(HttpObject.class);
verify(clientObserver, times(2)).messageReceived(httpObjectCaptor.capture());
List<HttpObject> capturedHttpObjects = httpObjectCaptor.getAllValues();
assertEquals(response, capturedHttpObjects.get(0));
assertEquals(response2, capturedHttpObjects.get(1));
response.release();
response2.release();
}
@Test
public void serverResponseHeaderInformational() throws Exception {
final FullHttpMessage request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, "/info/test",
true);
HttpHeaders httpHeaders = request.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.EXPECT, HttpHeaders.Values.CONTINUE);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2Headers = new DefaultHttp2Headers.Builder().method("PUT").path("/info/test")
.set(HttpHeaders.Names.EXPECT.toString(), HttpHeaders.Values.CONTINUE).build();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxClient(), 3, http2Headers, 0, false, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
verify(serverObserver).messageReceived(eq(request));
reset(serverObserver);
final FullHttpMessage response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
httpHeaders = response.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2HeadersResponse = new DefaultHttp2Headers.Builder().status("100").build();
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2HeadersResponse, 0, false, newPromiseServer());
ctxServer().flush();
}
});
awaitResponses();
verify(clientObserver).messageReceived(eq(response));
reset(clientObserver);
setServerLatch(1);
final String text = "a big payload";
final ByteBuf payload = Unpooled.copiedBuffer(text.getBytes());
final FullHttpMessage request2 = request.copy(payload);
httpHeaders = request2.headers();
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, text.length());
httpHeaders.remove(HttpHeaders.Names.EXPECT);
request.release();
runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeData(ctxClient(), 3, payload.retain(), 0, true, newPromiseClient());
ctxClient().flush();
}
});
awaitRequests();
verify(serverObserver).messageReceived(eq(request2));
request2.release();
setClientLatch(1);
final FullHttpMessage response2 = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
httpHeaders = response2.headers();
httpHeaders.set(Http2ToHttpHeaders.Names.STREAM_ID, 3);
httpHeaders.set(HttpHeaders.Names.CONTENT_LENGTH, 0);
final Http2Headers http2HeadersResponse2 = new DefaultHttp2Headers.Builder().status("200").build();
runInChannel(serverConnectedChannel, new Http2Runnable() {
@Override
public void run() {
frameWriter.writeHeaders(ctxServer(), 3, http2HeadersResponse2, 0, true, newPromiseServer());
ctxServer().flush();
}
});
awaitResponses();
verify(clientObserver).messageReceived(eq(response2));
}
private void setServerLatch(int count) {
serverLatch = new CountDownLatch(count);
if (serverDelegator != null) {
serverDelegator.latch(serverLatch);
}
}
private void setClientLatch(int count) {
clientLatch = new CountDownLatch(count);
if (clientDelegator != null) {
clientDelegator.latch(clientLatch);
}
}
private void awaitRequests() throws Exception {
serverLatch.await(2, SECONDS);
serverLatch.await(200, SECONDS);
}
private ChannelHandlerContext ctx() {
private void awaitResponses() throws Exception {
clientLatch.await(200, SECONDS);
}
private ChannelHandlerContext ctxClient() {
return clientChannel.pipeline().firstContext();
}
private ChannelPromise newPromise() {
return ctx().newPromise();
private ChannelPromise newPromiseClient() {
return ctxClient().newPromise();
}
private ChannelHandlerContext ctxServer() {
return serverConnectedChannel.pipeline().firstContext();
}
private ChannelPromise newPromiseServer() {
return ctxServer().newPromise();
}
private interface HttpResponseListener {
@ -444,130 +539,127 @@ public class InboundHttp2ToHttpAdapterTest {
private final class HttpResponseDelegator extends SimpleChannelInboundHandler<HttpObject> {
private final HttpResponseListener listener;
private CountDownLatch latch;
public HttpResponseDelegator(HttpResponseListener listener) {
public HttpResponseDelegator(HttpResponseListener listener, CountDownLatch latch) {
super(false);
this.listener = listener;
this.latch = latch;
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
this.listener.messageReceived(msg);
serverLatch.countDown();
this.latch.countDown();
}
public void latch(CountDownLatch latch) {
this.latch = latch;
}
}
private final class FrameAdapter extends ByteToMessageDecoder {
private final Http2FrameObserver observer;
private final DefaultHttp2FrameReader reader;
private final CountDownLatch requestLatch;
private final CountDownLatch latch;
FrameAdapter(Http2FrameObserver observer, CountDownLatch requestLatch) {
FrameAdapter(Http2FrameObserver observer, CountDownLatch latch) {
this.observer = observer;
reader = new DefaultHttp2FrameReader();
this.requestLatch = requestLatch;
this.latch = latch;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
reader.readFrame(ctx, in, new Http2FrameObserver() {
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endOfStream)
throws Http2Exception {
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
observer.onDataRead(ctx, streamId, copy(data), padding, endOfStream);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endStream)
throws Http2Exception {
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream) throws Http2Exception {
observer.onHeadersRead(ctx, streamId, headers, padding, endStream);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int streamDependency, short weight,
boolean exclusive, int padding, boolean endStream)
throws Http2Exception {
observer.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endStream);
requestLatch.countDown();
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)
throws Http2Exception {
observer.onHeadersRead(ctx, streamId, headers, streamDependency, weight, exclusive, padding,
endStream);
latch.countDown();
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId,
int streamDependency, short weight, boolean exclusive)
throws Http2Exception {
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive) throws Http2Exception {
observer.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
throws Http2Exception {
observer.onRstStreamRead(ctx, streamId, errorCode);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
observer.onSettingsAckRead(ctx);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
observer.onSettingsRead(ctx, settings);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
observer.onPingRead(ctx, copy(data));
requestLatch.countDown();
latch.countDown();
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
observer.onPingAckRead(ctx, copy(data));
requestLatch.countDown();
latch.countDown();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId,
int promisedStreamId, Http2Headers headers, int padding)
throws Http2Exception {
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
observer.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId,
long errorCode, ByteBuf debugData) throws Http2Exception {
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception {
observer.onGoAwayRead(ctx, lastStreamId, errorCode, copy(debugData));
requestLatch.countDown();
latch.countDown();
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId,
int windowSizeIncrement) throws Http2Exception {
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
observer.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
requestLatch.countDown();
latch.countDown();
}
@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId,
Http2Flags flags, ByteBuf payload) {
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
ByteBuf payload) {
observer.onUnknownFrame(ctx, frameType, streamId, flags, payload);
requestLatch.countDown();
latch.countDown();
}
});
}

View File

@ -40,7 +40,7 @@ import java.util.HashSet;
import java.util.TimeZone;
public class DefaultTextHeaders implements TextHeaders {
private static final int HASH_CODE_PRIME = 31;
private static final int BUCKET_SIZE = 17;
private static int index(int hash) {
@ -853,7 +853,17 @@ public class DefaultTextHeaders implements TextHeaders {
@Override
public Set<String> names() {
Set<String> names = new LinkedHashSet<String>(size());
return names(false);
}
/**
* Get the set of names for all text headers
* @param caseInsensitive {@code true} if names should be added in a case insensitive
* @return The set of names for all text headers
*/
public Set<String> names(boolean caseInsensitive) {
Set<String> names = caseInsensitive ? new TreeSet<String>(String.CASE_INSENSITIVE_ORDER)
: new LinkedHashSet<String>(size());
HeaderEntry e = head.after;
while (e != head) {
names.add(e.getKey().toString());
@ -880,13 +890,12 @@ public class DefaultTextHeaders implements TextHeaders {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
for (String name : names()) {
result = prime * result + name.hashCode();
for (String name : names(true)) {
result = HASH_CODE_PRIME * result + name.hashCode();
Set<String> values = new TreeSet<String>(getAll(name));
for (String value : values) {
result = prime * result + value.hashCode();
result = HASH_CODE_PRIME * result + value.hashCode();
}
}
return result;
@ -901,8 +910,8 @@ public class DefaultTextHeaders implements TextHeaders {
DefaultTextHeaders other = (DefaultTextHeaders) o;
// First, check that the set of names match.
Set<String> names = names();
if (!names.equals(other.names())) {
Set<String> names = names(true);
if (!names.equals(other.names(true))) {
return false;
}

View File

@ -15,23 +15,22 @@
*/
package io.netty.handler.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import org.junit.Test;
public class DefaultTextHeadersTest {
@Test
public void testEqualsMultipleHeaders() {
DefaultTextHeaders h1 = new DefaultTextHeaders();
h1.set("foo", "goo");
h1.set("Foo", "goo");
h1.set("foo2", "goo2");
DefaultTextHeaders h2 = new DefaultTextHeaders();
h2.set("foo", "goo");
h2.set("foo2", "goo2");
h2.set("FoO", "goo");
h2.set("fOO2", "goo2");
assertTrue(h1.equals(h2));
assertTrue(h2.equals(h1));
@ -42,16 +41,16 @@ public class DefaultTextHeadersTest {
@Test
public void testEqualsDuplicateMultipleHeaders() {
DefaultTextHeaders h1 = new DefaultTextHeaders();
h1.set("foo", "goo");
h1.set("foo2", "goo2");
h1.add("foo2", "goo3");
h1.set("FOO", "goo");
h1.set("Foo2", "goo2");
h1.add("fOo2", "goo3");
h1.add("foo", "goo4");
DefaultTextHeaders h2 = new DefaultTextHeaders();
h2.set("foo", "goo");
h2.set("foo2", "goo2");
h2.add("foo", "goo4");
h2.add("foo2", "goo3");
h2.add("foO2", "goo3");
assertTrue(h1.equals(h2));
assertTrue(h2.equals(h1));
@ -62,7 +61,7 @@ public class DefaultTextHeadersTest {
@Test
public void testNotEqualsDuplicateMultipleHeaders() {
DefaultTextHeaders h1 = new DefaultTextHeaders();
h1.set("foo", "goo");
h1.set("FOO", "goo");
h1.set("foo2", "goo2");
h1.add("foo2", "goo3");
h1.add("foo", "goo4");

View File

@ -54,12 +54,12 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
new Http2FrameLogger(INFO, InternalLoggerFactory.getInstance(Http2ClientInitializer.class));
private final SslContext sslCtx;
private final long maxContentLength;
private final int maxContentLength;
private DelegatingHttp2ConnectionHandler connectionHandler;
private HttpResponseHandler responseHandler;
private Http2SettingsHandler settingsHandler;
public Http2ClientInitializer(SslContext sslCtx, long maxContentLength) {
public Http2ClientInitializer(SslContext sslCtx, int maxContentLength) {
this.sslCtx = sslCtx;
this.maxContentLength = maxContentLength;
}
@ -91,8 +91,6 @@ public class Http2ClientInitializer extends ChannelInitializer<SocketChannel> {
protected void configureEndOfPipeline(ChannelPipeline pipeline) {
pipeline.addLast("Http2SettingsHandler", settingsHandler);
pipeline.addLast("Decompressor", new HttpContentDecompressor());
pipeline.addLast("Aggregator", new HttpObjectAggregator((int) maxContentLength));
pipeline.addLast("HttpResponseHandler", responseHandler);
}

View File

@ -19,7 +19,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.Http2HttpHeaders;
import io.netty.handler.codec.http2.Http2ToHttpHeaders;
import io.netty.util.CharsetUtil;
import java.util.Iterator;
@ -76,7 +76,7 @@ public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpRes
@Override
protected void messageReceived(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
int streamId = Integer.parseInt(msg.headers().get(Http2HttpHeaders.Names.STREAM_ID));
int streamId = Integer.parseInt(msg.headers().get(Http2ToHttpHeaders.Names.STREAM_ID));
ChannelPromise promise = streamidPromiseMap.get(streamId);
if (promise == null) {
System.err.println("Message received for unknown stream id " + streamId);