HTTP/2 SimpleChannelPromiseAggregator don't fail fast

Motivation:
Http2Codec.SimpleChannelPromiseAggregator currently fails fast if as soon as a tryFailure or setFailure method is called. This can lead to write operations which pass the result of SimpleChannelPromiseAggregator.newPromise to multiple channel.write calls throwing exceptions due to the promise being already done. This behavior is not expected by most of the Netty codecs (SslHandler) and can also create unexpected leaks in the http2 codec (DefaultHttp2FrameWriter).

Modifications:
- Http2Codec.SimpleChannelPromiseAggregator shouldn't complete the promise until doneAllocatingPromises is called
- Usages of Http2Codec.SimpleChannelPromiseAggregator should be adjusted to handle the change in behavior
- What were leaks in DefaultHttp2FrameWriter should be fixed to catch any other cases where ctx.write may throw

Result:
SimpleChannelPromiseAggregator won't generate promises which are done when newPromise is called.
This commit is contained in:
Scott Mitchell 2016-03-07 10:05:22 -08:00
parent 45849b2fa8
commit 6a2425b846
3 changed files with 70 additions and 56 deletions

View File

@ -166,8 +166,6 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
ctx.write(ZERO_BUFFER.slice(0, framePaddingBytes), promiseAggregator.newPromise());
}
} while (!lastFrame);
return promiseAggregator.doneAllocatingPromises();
} catch (Throwable t) {
if (needToReleaseHeaders) {
header.release();
@ -175,8 +173,9 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
if (needToReleaseData) {
data.release();
}
return promiseAggregator.setFailure(t);
promiseAggregator.setFailure(t);
}
return promiseAggregator.doneAllocatingPromises();
}
@Override
@ -275,14 +274,13 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
// Write the debug data.
releaseData = false;
ctx.write(data, promiseAggregator.newPromise());
return promiseAggregator.doneAllocatingPromises();
} catch (Throwable t) {
if (releaseData) {
data.release();
}
return promiseAggregator.setFailure(t);
promiseAggregator.setFailure(t);
}
return promiseAggregator.doneAllocatingPromises();
}
@Override
@ -306,7 +304,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
int nonFragmentLength = INT_FIELD_LENGTH + padding + flags.getPaddingPresenceFieldLength();
int maxFragmentLength = maxFrameSize - nonFragmentLength;
ByteBuf fragment =
headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength)).retain();
headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength));
flags.endOfHeaders(!headerBlock.isReadable());
@ -320,7 +318,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
ctx.write(buf, promiseAggregator.newPromise());
// Write the first fragment.
ctx.write(fragment, promiseAggregator.newPromise());
ctx.write(fragment.retain(), promiseAggregator.newPromise());
if (padding > 0) { // Write out the padding, if any.
ctx.write(ZERO_BUFFER.slice(0, padding), promiseAggregator.newPromise());
@ -329,15 +327,14 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
if (!flags.endOfHeaders()) {
writeContinuationFrames(ctx, streamId, headerBlock, padding, promiseAggregator);
}
return promiseAggregator.doneAllocatingPromises();
} catch (Throwable t) {
return promiseAggregator.setFailure(t);
promiseAggregator.setFailure(t);
} finally {
if (headerBlock != null) {
headerBlock.release();
}
}
return promiseAggregator.doneAllocatingPromises();
}
@Override
@ -359,13 +356,13 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
releaseData = false;
ctx.write(debugData, promiseAggregator.newPromise());
return promiseAggregator.doneAllocatingPromises();
} catch (Throwable t) {
if (releaseData) {
debugData.release();
}
return promiseAggregator.setFailure(t);
promiseAggregator.setFailure(t);
}
return promiseAggregator.doneAllocatingPromises();
}
@Override
@ -398,13 +395,13 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
releaseData = false;
ctx.write(payload, promiseAggregator.newPromise());
return promiseAggregator.doneAllocatingPromises();
} catch (Throwable t) {
if (releaseData) {
payload.release();
}
return promiseAggregator.setFailure(t);
promiseAggregator.setFailure(t);
}
return promiseAggregator.doneAllocatingPromises();
}
private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx,
@ -432,7 +429,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
int nonFragmentBytes = padding + flags.getNumPriorityBytes() + flags.getPaddingPresenceFieldLength();
int maxFragmentLength = maxFrameSize - nonFragmentBytes;
ByteBuf fragment =
headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength)).retain();
headerBlock.readSlice(min(headerBlock.readableBytes(), maxFragmentLength));
// Set the end of headers flag for the first frame.
flags.endOfHeaders(!headerBlock.isReadable());
@ -452,7 +449,7 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
ctx.write(buf, promiseAggregator.newPromise());
// Write the first fragment.
ctx.write(fragment, promiseAggregator.newPromise());
ctx.write(fragment.retain(), promiseAggregator.newPromise());
if (padding > 0) { // Write out the padding, if any.
ctx.write(ZERO_BUFFER.slice(0, padding), promiseAggregator.newPromise());
@ -461,15 +458,14 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
if (!flags.endOfHeaders()) {
writeContinuationFrames(ctx, streamId, headerBlock, padding, promiseAggregator);
}
return promiseAggregator.doneAllocatingPromises();
} catch (Throwable t) {
return promiseAggregator.setFailure(t);
promiseAggregator.setFailure(t);
} finally {
if (headerBlock != null) {
headerBlock.release();
}
}
return promiseAggregator.doneAllocatingPromises();
}
/**

View File

@ -207,16 +207,16 @@ public final class Http2CodecUtil {
* Provides the ability to associate the outcome of multiple {@link ChannelPromise}
* objects into a single {@link ChannelPromise} object.
*/
static class SimpleChannelPromiseAggregator extends DefaultChannelPromise {
static final class SimpleChannelPromiseAggregator extends DefaultChannelPromise {
private final ChannelPromise promise;
private int expectedCount;
private int successfulCount;
private int failureCount;
private int doneCount;
private Throwable lastFailure;
private boolean doneAllocating;
SimpleChannelPromiseAggregator(ChannelPromise promise, Channel c, EventExecutor e) {
super(c, e);
assert promise != null;
assert promise != null && !promise.isDone();
this.promise = promise;
}
@ -226,9 +226,7 @@ public final class Http2CodecUtil {
* {@code null} if {@link #doneAllocatingPromises()} was previously called.
*/
public ChannelPromise newPromise() {
if (doneAllocating) {
throw new IllegalStateException("Done allocating. No more promises can be allocated.");
}
assert !doneAllocating : "Done allocating. No more promises can be allocated.";
++expectedCount;
return this;
}
@ -241,9 +239,8 @@ public final class Http2CodecUtil {
public ChannelPromise doneAllocatingPromises() {
if (!doneAllocating) {
doneAllocating = true;
if (successfulCount == expectedCount) {
promise.setSuccess();
return super.setSuccess(null);
if (doneCount == expectedCount || expectedCount == 0) {
return setPromise();
}
}
return this;
@ -252,10 +249,10 @@ public final class Http2CodecUtil {
@Override
public boolean tryFailure(Throwable cause) {
if (allowFailure()) {
++failureCount;
if (failureCount == 1) {
promise.tryFailure(cause);
return super.tryFailure(cause);
++doneCount;
lastFailure = cause;
if (allPromisesDone()) {
return tryPromise();
}
// TODO: We break the interface a bit here.
// Multiple failure events can be processed without issue because this is an aggregation.
@ -273,30 +270,21 @@ public final class Http2CodecUtil {
@Override
public ChannelPromise setFailure(Throwable cause) {
if (allowFailure()) {
++failureCount;
if (failureCount == 1) {
promise.setFailure(cause);
return super.setFailure(cause);
++doneCount;
lastFailure = cause;
if (allPromisesDone()) {
return setPromise();
}
}
return this;
}
private boolean allowFailure() {
return awaitingPromises() || expectedCount == 0;
}
private boolean awaitingPromises() {
return successfulCount + failureCount < expectedCount;
}
@Override
public ChannelPromise setSuccess(Void result) {
if (awaitingPromises()) {
++successfulCount;
if (successfulCount == expectedCount && doneAllocating) {
promise.setSuccess(result);
return super.setSuccess(result);
++doneCount;
if (allPromisesDone()) {
setPromise();
}
}
return this;
@ -305,10 +293,9 @@ public final class Http2CodecUtil {
@Override
public boolean trySuccess(Void result) {
if (awaitingPromises()) {
++successfulCount;
if (successfulCount == expectedCount && doneAllocating) {
promise.trySuccess(result);
return super.trySuccess(result);
++doneCount;
if (allPromisesDone()) {
return tryPromise();
}
// TODO: We break the interface a bit here.
// Multiple success events can be processed without issue because this is an aggregation.
@ -316,6 +303,38 @@ public final class Http2CodecUtil {
}
return false;
}
private boolean allowFailure() {
return awaitingPromises() || expectedCount == 0;
}
private boolean awaitingPromises() {
return doneCount < expectedCount;
}
private boolean allPromisesDone() {
return doneCount == expectedCount && doneAllocating;
}
private ChannelPromise setPromise() {
if (lastFailure == null) {
promise.setSuccess();
return super.setSuccess(null);
} else {
promise.setFailure(lastFailure);
return super.setFailure(lastFailure);
}
}
private boolean tryPromise() {
if (lastFailure == null) {
promise.trySuccess();
return super.trySuccess(null);
} else {
promise.tryFailure(lastFailure);
return super.tryFailure(lastFailure);
}
}
}
private Http2CodecUtil() { }

View File

@ -105,14 +105,13 @@ public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler {
encoder.writeHeaders(ctx, currentStreamId, trailers, 0, true, promiseAggregator.newPromise());
}
}
promiseAggregator.doneAllocatingPromises();
} catch (Throwable t) {
promiseAggregator.setFailure(t);
} finally {
if (release) {
ReferenceCountUtil.release(msg);
}
promiseAggregator.doneAllocatingPromises();
}
}
}