HTTP/2 CompressorHttp2ConnectionEncoder bug

Motivation:
The CompressorHttp2ConnectionEncoder is attempting to attach a property to streams before the exist.

Modifications:
- Allow the super class to create the streams before attempting to attach a property to the stream.

Result:
CompressorHttp2ConnectionEncoder is able to set the property and access the compressor.
This commit is contained in:
Scott Mitchell 2015-07-17 09:29:51 -07:00
parent 84f599639e
commit ef195348b5
3 changed files with 78 additions and 85 deletions

View File

@ -147,17 +147,43 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
@Override @Override
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endStream, ChannelPromise promise) { boolean endStream, ChannelPromise promise) {
initCompressor(streamId, headers, endStream); try {
return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise); // Determine if compression is required and sanitize the headers.
EmbeddedChannel compressor = newCompressor(headers, endStream);
// Write the headers and create the stream object.
ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
// After the stream object has been created, then attach the compressor as a property for data compression.
bindCompressorToStream(compressor, streamId);
return future;
} catch (Throwable e) {
promise.tryFailure(e);
}
return promise;
} }
@Override @Override
public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers, public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
final int streamDependency, final short weight, final boolean exclusive, final int padding, final int streamDependency, final short weight, final boolean exclusive, final int padding,
final boolean endOfStream, final ChannelPromise promise) { final boolean endOfStream, final ChannelPromise promise) {
initCompressor(streamId, headers, endOfStream); try {
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream, // Determine if compression is required and sanitize the headers.
promise); EmbeddedChannel compressor = newCompressor(headers, endOfStream);
// Write the headers and create the stream object.
ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
// After the stream object has been created, then attach the compressor as a property for data compression.
bindCompressorToStream(compressor, streamId);
return future;
} catch (Throwable e) {
promise.tryFailure(e);
}
return promise;
} }
/** /**
@ -205,48 +231,50 @@ public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionE
* Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
* modify the {@code content-encoding} header contained in {@code headers}. * modify the {@code content-encoding} header contained in {@code headers}.
* *
* @param streamId The identifier for the headers inside {@code headers}
* @param headers Object representing headers which are to be written * @param headers Object representing headers which are to be written
* @param endOfStream Indicates if the stream has ended * @param endOfStream Indicates if the stream has ended
* @return The channel used to compress data.
* @throws Http2Exception if any problems occur during initialization.
*/ */
private void initCompressor(int streamId, Http2Headers headers, boolean endOfStream) { private EmbeddedChannel newCompressor(Http2Headers headers, boolean endOfStream) throws Http2Exception {
final Http2Stream stream = connection().stream(streamId); if (endOfStream) {
if (stream == null) { return null;
return;
} }
EmbeddedChannel compressor = stream.getProperty(propertyKey); ByteString encoding = headers.get(CONTENT_ENCODING);
if (compressor == null) { if (encoding == null) {
if (!endOfStream) { encoding = IDENTITY;
ByteString encoding = headers.get(CONTENT_ENCODING);
if (encoding == null) {
encoding = IDENTITY;
}
try {
compressor = newContentCompressor(encoding);
if (compressor != null) {
stream.setProperty(propertyKey, compressor);
ByteString targetContentEncoding = getTargetContentEncoding(encoding);
if (IDENTITY.equals(targetContentEncoding)) {
headers.remove(CONTENT_ENCODING);
} else {
headers.set(CONTENT_ENCODING, targetContentEncoding);
}
}
} catch (Throwable ignored) {
// Ignore
}
}
} else if (endOfStream) {
cleanup(stream, compressor);
} }
final EmbeddedChannel compressor = newContentCompressor(encoding);
if (compressor != null) { if (compressor != null) {
ByteString targetContentEncoding = getTargetContentEncoding(encoding);
if (IDENTITY.equals(targetContentEncoding)) {
headers.remove(CONTENT_ENCODING);
} else {
headers.set(CONTENT_ENCODING, targetContentEncoding);
}
// The content length will be for the decompressed data. Since we will compress the data // The content length will be for the decompressed data. Since we will compress the data
// this content-length will not be correct. Instead of queuing messages or delaying sending // this content-length will not be correct. Instead of queuing messages or delaying sending
// header frames...just remove the content-length header // header frames...just remove the content-length header
headers.remove(CONTENT_LENGTH); headers.remove(CONTENT_LENGTH);
} }
return compressor;
}
/**
* Called after the super class has written the headers and created any associated stream objects.
* @param compressor The compressor associated with the stream identified by {@code streamId}.
* @param streamId The stream id for which the headers were written.
*/
private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
if (compressor != null) {
Http2Stream stream = connection().stream(streamId);
if (stream != null) {
stream.setProperty(propertyKey, compressor);
}
}
} }
/** /**

View File

@ -43,8 +43,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.handler.codec.http2.Http2TestUtil.FrameAdapter;
import io.netty.handler.codec.http2.Http2TestUtil.FrameCountDown;
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable; import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
@ -83,9 +81,7 @@ public class DataCompressionHttp2Test {
private Bootstrap cb; private Bootstrap cb;
private Channel serverChannel; private Channel serverChannel;
private Channel clientChannel; private Channel clientChannel;
private CountDownLatch serverLatch; private CountDownLatch serverCloseLatch;
private CountDownLatch clientLatch;
private CountDownLatch clientSettingsAckLatch;
private Http2Connection serverConnection; private Http2Connection serverConnection;
private Http2Connection clientConnection; private Http2Connection clientConnection;
private Http2ConnectionHandler clientHandler; private Http2ConnectionHandler clientHandler;
@ -114,14 +110,10 @@ public class DataCompressionHttp2Test {
@Test @Test
public void justHeadersNoData() throws Exception { public void justHeadersNoData() throws Exception {
bootstrapEnv(1, 1, 0, 1); bootstrapEnv(0);
final Http2Headers headers = new DefaultHttp2Headers().method(GET).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(GET).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() throws Http2Exception { public void run() throws Http2Exception {
@ -138,15 +130,11 @@ public class DataCompressionHttp2Test {
public void gzipEncodingSingleEmptyMessage() throws Exception { public void gzipEncodingSingleEmptyMessage() throws Exception {
final String text = ""; final String text = "";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes()); final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
bootstrapEnv(1, 1, data.readableBytes(), 1); bootstrapEnv(data.readableBytes());
try { try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() throws Http2Exception { public void run() throws Http2Exception {
@ -156,7 +144,6 @@ public class DataCompressionHttp2Test {
} }
}); });
awaitServer(); awaitServer();
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream));
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name())); assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
} finally { } finally {
data.release(); data.release();
@ -167,15 +154,11 @@ public class DataCompressionHttp2Test {
public void gzipEncodingSingleMessage() throws Exception { public void gzipEncodingSingleMessage() throws Exception {
final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc"; final String text = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbccccccccccccccccccccccc";
final ByteBuf data = Unpooled.copiedBuffer(text.getBytes()); final ByteBuf data = Unpooled.copiedBuffer(text.getBytes());
bootstrapEnv(1, 1, data.readableBytes(), 1); bootstrapEnv(data.readableBytes());
try { try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() throws Http2Exception { public void run() throws Http2Exception {
@ -185,7 +168,6 @@ public class DataCompressionHttp2Test {
} }
}); });
awaitServer(); awaitServer();
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream));
assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name())); assertEquals(text, serverOut.toString(CharsetUtil.UTF_8.name()));
} finally { } finally {
data.release(); data.release();
@ -198,16 +180,12 @@ public class DataCompressionHttp2Test {
final String text2 = "dddddddddddddddddddeeeeeeeeeeeeeeeeeeeffffffffffffffffffff"; final String text2 = "dddddddddddddddddddeeeeeeeeeeeeeeeeeeeffffffffffffffffffff";
final ByteBuf data1 = Unpooled.copiedBuffer(text1.getBytes()); final ByteBuf data1 = Unpooled.copiedBuffer(text1.getBytes());
final ByteBuf data2 = Unpooled.copiedBuffer(text2.getBytes()); final ByteBuf data2 = Unpooled.copiedBuffer(text2.getBytes());
bootstrapEnv(1, 1, data1.readableBytes() + data2.readableBytes(), 1); bootstrapEnv(data1.readableBytes() + data2.readableBytes());
try { try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
// Required because the decompressor intercepts the onXXXRead events before runInChannel(clientChannel, new Http2Runnable() {
// our {@link Http2TestUtil$FrameAdapter} does.
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() throws Http2Exception { public void run() throws Http2Exception {
clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient()); clientEncoder.writeHeaders(ctxClient(), 3, headers, 0, false, newPromiseClient());
@ -217,9 +195,7 @@ public class DataCompressionHttp2Test {
} }
}); });
awaitServer(); awaitServer();
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream)); assertEquals(text1 + text2, serverOut.toString(CharsetUtil.UTF_8.name()));
assertEquals(text1 + text2,
serverOut.toString(CharsetUtil.UTF_8.name()));
} finally { } finally {
data1.release(); data1.release();
data2.release(); data2.release();
@ -231,16 +207,12 @@ public class DataCompressionHttp2Test {
final int BUFFER_SIZE = 1 << 12; final int BUFFER_SIZE = 1 << 12;
final byte[] bytes = new byte[BUFFER_SIZE]; final byte[] bytes = new byte[BUFFER_SIZE];
new Random().nextBytes(bytes); new Random().nextBytes(bytes);
bootstrapEnv(1, 1, BUFFER_SIZE, 1); bootstrapEnv(BUFFER_SIZE);
final ByteBuf data = Unpooled.wrappedBuffer(bytes); final ByteBuf data = Unpooled.wrappedBuffer(bytes);
try { try {
final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH) final Http2Headers headers = new DefaultHttp2Headers().method(POST).path(PATH)
.set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE); .set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.DEFLATE);
// Required because the decompressor intercepts the onXXXRead events before
// our {@link Http2TestUtil$FrameAdapter} does.
Http2Stream stream = FrameAdapter.getOrCreateStream(serverConnection, 3, false);
FrameAdapter.getOrCreateStream(clientConnection, 3, false);
runInChannel(clientChannel, new Http2Runnable() { runInChannel(clientChannel, new Http2Runnable() {
@Override @Override
public void run() throws Http2Exception { public void run() throws Http2Exception {
@ -250,7 +222,6 @@ public class DataCompressionHttp2Test {
} }
}); });
awaitServer(); awaitServer();
assertEquals(0, serverConnection.local().flowController().unconsumedBytes(stream));
assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8), assertEquals(data.resetReaderIndex().toString(CharsetUtil.UTF_8),
serverOut.toString(CharsetUtil.UTF_8.name())); serverOut.toString(CharsetUtil.UTF_8.name()));
} finally { } finally {
@ -258,12 +229,9 @@ public class DataCompressionHttp2Test {
} }
} }
private void bootstrapEnv(int serverHalfClosedCount, int clientSettingsAckLatchCount, private void bootstrapEnv(int serverOutSize) throws Exception {
int serverOutSize, int clientCount) throws Exception {
serverOut = new ByteArrayOutputStream(serverOutSize); serverOut = new ByteArrayOutputStream(serverOutSize);
serverLatch = new CountDownLatch(serverHalfClosedCount); serverCloseLatch = new CountDownLatch(1);
clientLatch = new CountDownLatch(clientCount);
clientSettingsAckLatch = new CountDownLatch(clientSettingsAckLatchCount);
sb = new ServerBootstrap(); sb = new ServerBootstrap();
cb = new Bootstrap(); cb = new Bootstrap();
@ -275,12 +243,12 @@ public class DataCompressionHttp2Test {
@Override @Override
public void onStreamActive(Http2Stream stream) { public void onStreamActive(Http2Stream stream) {
if (stream.state() == State.HALF_CLOSED_LOCAL || stream.state() == State.HALF_CLOSED_REMOTE) { if (stream.state() == State.HALF_CLOSED_LOCAL || stream.state() == State.HALF_CLOSED_REMOTE) {
serverLatch.countDown(); serverCloseLatch.countDown();
} }
} }
@Override @Override
public void onStreamHalfClosed(Http2Stream stream) { public void onStreamHalfClosed(Http2Stream stream) {
serverLatch.countDown(); serverCloseLatch.countDown();
} }
}); });
@ -322,15 +290,13 @@ public class DataCompressionHttp2Test {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline(); ChannelPipeline p = ch.pipeline();
FrameCountDown clientFrameCountDown = new FrameCountDown(clientListener,
clientSettingsAckLatch, clientLatch);
clientEncoder = new CompressorHttp2ConnectionEncoder( clientEncoder = new CompressorHttp2ConnectionEncoder(
new DefaultHttp2ConnectionEncoder(clientConnection, new DefaultHttp2FrameWriter())); new DefaultHttp2ConnectionEncoder(clientConnection, new DefaultHttp2FrameWriter()));
Http2ConnectionDecoder decoder = Http2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(clientConnection, clientEncoder, new DefaultHttp2ConnectionDecoder(clientConnection, clientEncoder,
new DefaultHttp2FrameReader(), new DefaultHttp2FrameReader(),
new DelegatingDecompressorFrameListener(clientConnection, new DelegatingDecompressorFrameListener(clientConnection,
clientFrameCountDown)); clientListener));
clientHandler = new Http2ConnectionHandler(decoder, clientEncoder); clientHandler = new Http2ConnectionHandler(decoder, clientEncoder);
p.addLast(clientHandler); p.addLast(clientHandler);
} }
@ -346,8 +312,7 @@ public class DataCompressionHttp2Test {
} }
private void awaitServer() throws Exception { private void awaitServer() throws Exception {
assertTrue(clientSettingsAckLatch.await(5, SECONDS)); assertTrue(serverCloseLatch.await(5, SECONDS));
assertTrue(serverLatch.await(5, SECONDS));
serverOut.flush(); serverOut.flush();
} }

View File

@ -114,7 +114,7 @@ final class Http2TestUtil {
this.latch = latch; this.latch = latch;
} }
public Http2Stream getOrCreateStream(int streamId, boolean halfClosed) throws Http2Exception { private Http2Stream getOrCreateStream(int streamId, boolean halfClosed) throws Http2Exception {
return getOrCreateStream(connection, streamId, halfClosed); return getOrCreateStream(connection, streamId, halfClosed);
} }