From 6a2425b846ec04c40da3bf7482f4c464334cb2b2 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 7 Mar 2016 10:05:22 -0800 Subject: [PATCH] HTTP/2 SimpleChannelPromiseAggregator don't fail fast Motivation: Http2Codec.SimpleChannelPromiseAggregator currently fails fast if as soon as a tryFailure or setFailure method is called. This can lead to write operations which pass the result of SimpleChannelPromiseAggregator.newPromise to multiple channel.write calls throwing exceptions due to the promise being already done. This behavior is not expected by most of the Netty codecs (SslHandler) and can also create unexpected leaks in the http2 codec (DefaultHttp2FrameWriter). Modifications: - Http2Codec.SimpleChannelPromiseAggregator shouldn't complete the promise until doneAllocatingPromises is called - Usages of Http2Codec.SimpleChannelPromiseAggregator should be adjusted to handle the change in behavior - What were leaks in DefaultHttp2FrameWriter should be fixed to catch any other cases where ctx.write may throw Result: SimpleChannelPromiseAggregator won't generate promises which are done when newPromise is called. --- .../codec/http2/DefaultHttp2FrameWriter.java | 36 ++++---- .../handler/codec/http2/Http2CodecUtil.java | 87 +++++++++++-------- .../http2/HttpToHttp2ConnectionHandler.java | 3 +- 3 files changed, 70 insertions(+), 56 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java index 6ff98f1b3d..98ea24fd76 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java @@ -166,8 +166,6 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize ctx.write(ZERO_BUFFER.slice(0, framePaddingBytes), promiseAggregator.newPromise()); } } while (!lastFrame); - - return promiseAggregator.doneAllocatingPromises(); } catch (Throwable t) { if (needToReleaseHeaders) { header.release(); @@ -175,8 +173,9 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize if (needToReleaseData) { data.release(); } - return promiseAggregator.setFailure(t); + promiseAggregator.setFailure(t); } + return promiseAggregator.doneAllocatingPromises(); } @Override @@ -275,14 +274,13 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize // Write the debug data. releaseData = false; ctx.write(data, promiseAggregator.newPromise()); - - return promiseAggregator.doneAllocatingPromises(); } catch (Throwable t) { if (releaseData) { data.release(); } - return promiseAggregator.setFailure(t); + promiseAggregator.setFailure(t); } + return promiseAggregator.doneAllocatingPromises(); } @Override @@ -306,7 +304,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize int nonFragmentLength = INT_FIELD_LENGTH + padding + flags.getPaddingPresenceFieldLength(); int maxFragmentLength = maxFrameSize - nonFragmentLength; ByteBuf fragment = - headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength)).retain(); + headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength)); flags.endOfHeaders(!headerBlock.isReadable()); @@ -320,7 +318,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize ctx.write(buf, promiseAggregator.newPromise()); // Write the first fragment. - ctx.write(fragment, promiseAggregator.newPromise()); + ctx.write(fragment.retain(), promiseAggregator.newPromise()); if (padding > 0) { // Write out the padding, if any. ctx.write(ZERO_BUFFER.slice(0, padding), promiseAggregator.newPromise()); @@ -329,15 +327,14 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize if (!flags.endOfHeaders()) { writeContinuationFrames(ctx, streamId, headerBlock, padding, promiseAggregator); } - - return promiseAggregator.doneAllocatingPromises(); } catch (Throwable t) { - return promiseAggregator.setFailure(t); + promiseAggregator.setFailure(t); } finally { if (headerBlock != null) { headerBlock.release(); } } + return promiseAggregator.doneAllocatingPromises(); } @Override @@ -359,13 +356,13 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize releaseData = false; ctx.write(debugData, promiseAggregator.newPromise()); - return promiseAggregator.doneAllocatingPromises(); } catch (Throwable t) { if (releaseData) { debugData.release(); } - return promiseAggregator.setFailure(t); + promiseAggregator.setFailure(t); } + return promiseAggregator.doneAllocatingPromises(); } @Override @@ -398,13 +395,13 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize releaseData = false; ctx.write(payload, promiseAggregator.newPromise()); - return promiseAggregator.doneAllocatingPromises(); } catch (Throwable t) { if (releaseData) { payload.release(); } - return promiseAggregator.setFailure(t); + promiseAggregator.setFailure(t); } + return promiseAggregator.doneAllocatingPromises(); } private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, @@ -432,7 +429,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize int nonFragmentBytes = padding + flags.getNumPriorityBytes() + flags.getPaddingPresenceFieldLength(); int maxFragmentLength = maxFrameSize - nonFragmentBytes; ByteBuf fragment = - headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength)).retain(); + headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength)); // Set the end of headers flag for the first frame. flags.endOfHeaders(!headerBlock.isReadable()); @@ -452,7 +449,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize ctx.write(buf, promiseAggregator.newPromise()); // Write the first fragment. - ctx.write(fragment, promiseAggregator.newPromise()); + ctx.write(fragment.retain(), promiseAggregator.newPromise()); if (padding > 0) { // Write out the padding, if any. ctx.write(ZERO_BUFFER.slice(0, padding), promiseAggregator.newPromise()); @@ -461,15 +458,14 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize if (!flags.endOfHeaders()) { writeContinuationFrames(ctx, streamId, headerBlock, padding, promiseAggregator); } - - return promiseAggregator.doneAllocatingPromises(); } catch (Throwable t) { - return promiseAggregator.setFailure(t); + promiseAggregator.setFailure(t); } finally { if (headerBlock != null) { headerBlock.release(); } } + return promiseAggregator.doneAllocatingPromises(); } /** diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java index 6ed55026f8..1b489dc2d4 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2CodecUtil.java @@ -207,16 +207,16 @@ public final class Http2CodecUtil { * Provides the ability to associate the outcome of multiple {@link ChannelPromise} * objects into a single {@link ChannelPromise} object. */ - static class SimpleChannelPromiseAggregator extends DefaultChannelPromise { + static final class SimpleChannelPromiseAggregator extends DefaultChannelPromise { private final ChannelPromise promise; private int expectedCount; - private int successfulCount; - private int failureCount; + private int doneCount; + private Throwable lastFailure; private boolean doneAllocating; SimpleChannelPromiseAggregator(ChannelPromise promise, Channel c, EventExecutor e) { super(c, e); - assert promise != null; + assert promise != null && !promise.isDone(); this.promise = promise; } @@ -226,9 +226,7 @@ public final class Http2CodecUtil { * {@code null} if {@link #doneAllocatingPromises()} was previously called. */ public ChannelPromise newPromise() { - if (doneAllocating) { - throw new IllegalStateException("Done allocating. No more promises can be allocated."); - } + assert !doneAllocating : "Done allocating. No more promises can be allocated."; ++expectedCount; return this; } @@ -241,9 +239,8 @@ public final class Http2CodecUtil { public ChannelPromise doneAllocatingPromises() { if (!doneAllocating) { doneAllocating = true; - if (successfulCount == expectedCount) { - promise.setSuccess(); - return super.setSuccess(null); + if (doneCount == expectedCount || expectedCount == 0) { + return setPromise(); } } return this; @@ -252,10 +249,10 @@ public final class Http2CodecUtil { @Override public boolean tryFailure(Throwable cause) { if (allowFailure()) { - ++failureCount; - if (failureCount == 1) { - promise.tryFailure(cause); - return super.tryFailure(cause); + ++doneCount; + lastFailure = cause; + if (allPromisesDone()) { + return tryPromise(); } // TODO: We break the interface a bit here. // Multiple failure events can be processed without issue because this is an aggregation. @@ -273,30 +270,21 @@ public final class Http2CodecUtil { @Override public ChannelPromise setFailure(Throwable cause) { if (allowFailure()) { - ++failureCount; - if (failureCount == 1) { - promise.setFailure(cause); - return super.setFailure(cause); + ++doneCount; + lastFailure = cause; + if (allPromisesDone()) { + return setPromise(); } } return this; } - private boolean allowFailure() { - return awaitingPromises() || expectedCount == 0; - } - - private boolean awaitingPromises() { - return successfulCount + failureCount < expectedCount; - } - @Override public ChannelPromise setSuccess(Void result) { if (awaitingPromises()) { - ++successfulCount; - if (successfulCount == expectedCount && doneAllocating) { - promise.setSuccess(result); - return super.setSuccess(result); + ++doneCount; + if (allPromisesDone()) { + setPromise(); } } return this; @@ -305,10 +293,9 @@ public final class Http2CodecUtil { @Override public boolean trySuccess(Void result) { if (awaitingPromises()) { - ++successfulCount; - if (successfulCount == expectedCount && doneAllocating) { - promise.trySuccess(result); - return super.trySuccess(result); + ++doneCount; + if (allPromisesDone()) { + return tryPromise(); } // TODO: We break the interface a bit here. // Multiple success events can be processed without issue because this is an aggregation. @@ -316,6 +303,38 @@ public final class Http2CodecUtil { } return false; } + + private boolean allowFailure() { + return awaitingPromises() || expectedCount == 0; + } + + private boolean awaitingPromises() { + return doneCount < expectedCount; + } + + private boolean allPromisesDone() { + return doneCount == expectedCount && doneAllocating; + } + + private ChannelPromise setPromise() { + if (lastFailure == null) { + promise.setSuccess(); + return super.setSuccess(null); + } else { + promise.setFailure(lastFailure); + return super.setFailure(lastFailure); + } + } + + private boolean tryPromise() { + if (lastFailure == null) { + promise.trySuccess(); + return super.trySuccess(null); + } else { + promise.tryFailure(lastFailure); + return super.tryFailure(lastFailure); + } + } } private Http2CodecUtil() { } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java index 34d9f02363..52ca74dfcd 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/HttpToHttp2ConnectionHandler.java @@ -105,14 +105,13 @@ public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler { encoder.writeHeaders(ctx, currentStreamId, trailers, 0, true, promiseAggregator.newPromise()); } } - - promiseAggregator.doneAllocatingPromises(); } catch (Throwable t) { promiseAggregator.setFailure(t); } finally { if (release) { ReferenceCountUtil.release(msg); } + promiseAggregator.doneAllocatingPromises(); } } }