Include error code and message in GOAWAY events.

Motivation:

The Connection.Listener GOAWAY event handler currently provides no additional information, requiring applications to hack in other ways to get at the error code and debug message.

Modifications:

Modified the Connection.Listener interface to pass on the error code and message that triggered the GOAWAY.

Result:

Application can now use Connection.Listener for all GOAWAY processing.
This commit is contained in:
nmittler 2015-03-20 10:04:38 -07:00
parent 44eeb5f6b4
commit 9737cc6cc9
17 changed files with 124 additions and 98 deletions

View File

@ -40,7 +40,7 @@ import io.netty.handler.codec.compression.ZlibWrapper;
public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder { public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override @Override
public void streamRemoved(Http2Stream stream) { public void onStreamRemoved(Http2Stream stream) {
final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class); final EmbeddedChannel compressor = stream.getProperty(CompressorHttp2ConnectionEncoder.class);
if (compressor != null) { if (compressor != null) {
cleanup(stream, compressor); cleanup(stream, compressor);

View File

@ -33,6 +33,8 @@ import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action; import io.netty.handler.codec.http2.Http2StreamRemovalPolicy.Action;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
@ -150,19 +152,20 @@ public class DefaultHttp2Connection implements Http2Connection {
return remoteEndpoint; return remoteEndpoint;
} }
@Override
public boolean isGoAway() {
return goAwaySent() || goAwayReceived();
}
@Override @Override
public boolean goAwayReceived() { public boolean goAwayReceived() {
return localEndpoint.lastKnownStream >= 0; return localEndpoint.lastKnownStream >= 0;
} }
@Override @Override
public void goAwayReceived(int lastKnownStream) { public void goAwayReceived(int lastKnownStream, long errorCode, ByteBuf debugData) {
boolean alreadyNotified = goAwayReceived();
localEndpoint.lastKnownStream(lastKnownStream); localEndpoint.lastKnownStream(lastKnownStream);
if (!alreadyNotified) {
for (Listener listener : listeners) {
listener.onGoAwayReceived(lastKnownStream, errorCode, debugData);
}
}
} }
@Override @Override
@ -171,14 +174,20 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
@Override @Override
public void goAwaySent(int lastKnownStream) { public void goAwaySent(int lastKnownStream, long errorCode, ByteBuf debugData) {
boolean alreadyNotified = goAwaySent();
remoteEndpoint.lastKnownStream(lastKnownStream); remoteEndpoint.lastKnownStream(lastKnownStream);
if (!alreadyNotified) {
for (Listener listener : listeners) {
listener.onGoAwaySent(lastKnownStream, errorCode, debugData);
}
}
} }
private void removeStream(DefaultStream stream) { private void removeStream(DefaultStream stream) {
// Notify the listeners of the event first. // Notify the listeners of the event first.
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.streamRemoved(stream); listener.onStreamRemoved(stream);
} }
// Remove it from the map and priority tree. // Remove it from the map and priority tree.
@ -353,7 +362,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// Notify the listeners. // Notify the listeners.
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.streamActive(this); listener.onStreamActive(this);
} }
} }
return this; return this;
@ -373,7 +382,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// Notify the listeners. // Notify the listeners.
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.streamClosed(this); listener.onStreamClosed(this);
} }
} finally { } finally {
// Mark this stream for removal. // Mark this stream for removal.
@ -417,7 +426,7 @@ public class DefaultHttp2Connection implements Http2Connection {
private void notifyHalfClosed(Http2Stream stream) { private void notifyHalfClosed(Http2Stream stream) {
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.streamHalfClosed(stream); listener.onStreamHalfClosed(stream);
} }
} }
@ -604,7 +613,7 @@ public class DefaultHttp2Connection implements Http2Connection {
* @param l The listener to notify * @param l The listener to notify
*/ */
public void notifyListener(Listener l) { public void notifyListener(Listener l) {
l.priorityTreeParentChanged(stream, oldParent); l.onPriorityTreeParentChanged(stream, oldParent);
} }
} }
@ -623,7 +632,7 @@ public class DefaultHttp2Connection implements Http2Connection {
private void notifyParentChanging(Http2Stream stream, Http2Stream newParent) { private void notifyParentChanging(Http2Stream stream, Http2Stream newParent) {
for (Listener l : listeners) { for (Listener l : listeners) {
l.priorityTreeParentChanging(stream, newParent); l.onPriorityTreeParentChanging(stream, newParent);
} }
} }
@ -759,7 +768,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// Notify the listeners of the event. // Notify the listeners of the event.
for (Listener listener : listeners) { for (Listener listener : listeners) {
listener.streamAdded(stream); listener.onStreamAdded(stream);
} }
notifyParentChanged(events); notifyParentChanged(events);
@ -804,17 +813,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
private void lastKnownStream(int lastKnownStream) { private void lastKnownStream(int lastKnownStream) {
boolean alreadyNotified = isGoAway();
this.lastKnownStream = lastKnownStream; this.lastKnownStream = lastKnownStream;
if (!alreadyNotified) {
notifyGoingAway();
}
}
private void notifyGoingAway() {
for (Listener listener : listeners) {
listener.goingAway();
}
} }
@Override @Override
@ -833,7 +832,7 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
private void checkNewStreamAllowed(int streamId) throws Http2Exception { private void checkNewStreamAllowed(int streamId) throws Http2Exception {
if (isGoAway()) { if (goAwaySent() || goAwayReceived()) {
throw connectionError(PROTOCOL_ERROR, "Cannot create a stream since the connection is going away"); throw connectionError(PROTOCOL_ERROR, "Cannot create a stream since the connection is going away");
} }
if (streamId < 0) { if (streamId < 0) {

View File

@ -169,7 +169,7 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
throws Http2Exception { throws Http2Exception {
// Don't allow any more connections to be created. // Don't allow any more connections to be created.
connection.goAwayReceived(lastStreamId); connection.goAwayReceived(lastStreamId, errorCode, debugData);
listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData); listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
} }

View File

@ -112,7 +112,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
final boolean endOfStream, ChannelPromise promise) { final boolean endOfStream, ChannelPromise promise) {
final Http2Stream stream; final Http2Stream stream;
try { try {
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
throw new IllegalStateException("Sending data after connection going away."); throw new IllegalStateException("Sending data after connection going away.");
} }
@ -151,7 +151,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
final boolean exclusive, final int padding, final boolean endOfStream, final boolean exclusive, final int padding, final boolean endOfStream,
final ChannelPromise promise) { final ChannelPromise promise) {
try { try {
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away."); throw connectionError(PROTOCOL_ERROR, "Sending headers after connection going away.");
} }
Http2Stream stream = connection.stream(streamId); Http2Stream stream = connection.stream(streamId);
@ -190,7 +190,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
boolean exclusive, ChannelPromise promise) { boolean exclusive, ChannelPromise promise) {
try { try {
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
throw connectionError(PROTOCOL_ERROR, "Sending priority after connection going away."); throw connectionError(PROTOCOL_ERROR, "Sending priority after connection going away.");
} }
@ -227,7 +227,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
ChannelPromise promise) { ChannelPromise promise) {
outstandingLocalSettingsQueue.add(settings); outstandingLocalSettingsQueue.add(settings);
try { try {
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
throw connectionError(PROTOCOL_ERROR, "Sending settings after connection going away."); throw connectionError(PROTOCOL_ERROR, "Sending settings after connection going away.");
} }
@ -254,7 +254,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override @Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data,
ChannelPromise promise) { ChannelPromise promise) {
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
data.release(); data.release();
return promise.setFailure(connectionError(PROTOCOL_ERROR, "Sending ping after connection going away.")); return promise.setFailure(connectionError(PROTOCOL_ERROR, "Sending ping after connection going away."));
} }
@ -268,7 +268,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding, ChannelPromise promise) { Http2Headers headers, int padding, ChannelPromise promise) {
try { try {
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
throw connectionError(PROTOCOL_ERROR, "Sending push promise after connection going away."); throw connectionError(PROTOCOL_ERROR, "Sending push promise after connection going away.");
} }

View File

@ -64,12 +64,12 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
// Register for notification of new streams. // Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@Override @Override
public void streamAdded(Http2Stream stream) { public void onStreamAdded(Http2Stream stream) {
stream.setProperty(FlowState.class, new FlowState(stream, 0)); stream.setProperty(FlowState.class, new FlowState(stream, 0));
} }
@Override @Override
public void streamActive(Http2Stream stream) { public void onStreamActive(Http2Stream stream) {
// Need to be sure the stream's initial window is adjusted for SETTINGS // Need to be sure the stream's initial window is adjusted for SETTINGS
// frames which may have been exchanged while it was in IDLE // frames which may have been exchanged while it was in IDLE
state(stream).window(initialWindowSize); state(stream).window(initialWindowSize);

View File

@ -48,27 +48,27 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Register for notification of new streams. // Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() { connection.addListener(new Http2ConnectionAdapter() {
@Override @Override
public void streamAdded(Http2Stream stream) { public void onStreamAdded(Http2Stream stream) {
// Just add a new flow state to the stream. // Just add a new flow state to the stream.
stream.setProperty(FlowState.class, new FlowState(stream, 0)); stream.setProperty(FlowState.class, new FlowState(stream, 0));
} }
@Override @Override
public void streamActive(Http2Stream stream) { public void onStreamActive(Http2Stream stream) {
// Need to be sure the stream's initial window is adjusted for SETTINGS // Need to be sure the stream's initial window is adjusted for SETTINGS
// frames which may have been exchanged while it was in IDLE // frames which may have been exchanged while it was in IDLE
state(stream).window(initialWindowSize); state(stream).window(initialWindowSize);
} }
@Override @Override
public void streamClosed(Http2Stream stream) { public void onStreamClosed(Http2Stream stream) {
// Any pending frames can never be written, cancel and // Any pending frames can never be written, cancel and
// write errors for any pending frames. // write errors for any pending frames.
state(stream).cancel(); state(stream).cancel();
} }
@Override @Override
public void streamHalfClosed(Http2Stream stream) { public void onStreamHalfClosed(Http2Stream stream) {
if (State.HALF_CLOSED_LOCAL.equals(stream.state())) { if (State.HALF_CLOSED_LOCAL.equals(stream.state())) {
/** /**
* When this method is called there should not be any * When this method is called there should not be any
@ -86,7 +86,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
@Override @Override
public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent(); Http2Stream parent = stream.parent();
if (parent != null) { if (parent != null) {
int delta = state(stream).streamableBytesForTree(); int delta = state(stream).streamableBytesForTree();
@ -97,7 +97,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
@Override @Override
public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) { public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
Http2Stream parent = stream.parent(); Http2Stream parent = stream.parent();
if (parent != null) { if (parent != null) {
int delta = -state(stream).streamableBytesForTree(); int delta = -state(stream).streamableBytesForTree();

View File

@ -40,7 +40,7 @@ import io.netty.handler.codec.compression.ZlibWrapper;
public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator { public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecorator {
private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() { private static final Http2ConnectionAdapter CLEAN_UP_LISTENER = new Http2ConnectionAdapter() {
@Override @Override
public void streamRemoved(Http2Stream stream) { public void onStreamRemoved(Http2Stream stream) {
final Http2Decompressor decompressor = decompressor(stream); final Http2Decompressor decompressor = decompressor(stream);
if (decompressor != null) { if (decompressor != null) {
cleanup(stream, decompressor); cleanup(stream, decompressor);

View File

@ -15,6 +15,8 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import java.util.Collection; import java.util.Collection;
/** /**
@ -30,31 +32,31 @@ public interface Http2Connection {
* Notifies the listener that the given stream was added to the connection. This stream may * Notifies the listener that the given stream was added to the connection. This stream may
* not yet be active (i.e. {@code OPEN} or {@code HALF CLOSED}). * not yet be active (i.e. {@code OPEN} or {@code HALF CLOSED}).
*/ */
void streamAdded(Http2Stream stream); void onStreamAdded(Http2Stream stream);
/** /**
* Notifies the listener that the given stream was made active (i.e. {@code OPEN} or {@code HALF CLOSED}). * Notifies the listener that the given stream was made active (i.e. {@code OPEN} or {@code HALF CLOSED}).
*/ */
void streamActive(Http2Stream stream); void onStreamActive(Http2Stream stream);
/** /**
* Notifies the listener that the given stream is now {@code HALF CLOSED}. The stream can be * Notifies the listener that the given stream is now {@code HALF CLOSED}. The stream can be
* inspected to determine which side is {@code CLOSED}. * inspected to determine which side is {@code CLOSED}.
*/ */
void streamHalfClosed(Http2Stream stream); void onStreamHalfClosed(Http2Stream stream);
/** /**
* Notifies the listener that the given stream is now {@code CLOSED} in both directions and will no longer * Notifies the listener that the given stream is now {@code CLOSED} in both directions and will no longer
* be returned by {@link #activeStreams()}. * be returned by {@link #activeStreams()}.
*/ */
void streamClosed(Http2Stream stream); void onStreamClosed(Http2Stream stream);
/** /**
* Notifies the listener that the given stream has now been removed from the connection and * Notifies the listener that the given stream has now been removed from the connection and
* will no longer be returned via {@link Http2Connection#stream(int)}. The connection may * will no longer be returned via {@link Http2Connection#stream(int)}. The connection may
* maintain inactive streams for some time before removing them. * maintain inactive streams for some time before removing them.
*/ */
void streamRemoved(Http2Stream stream); void onStreamRemoved(Http2Stream stream);
/** /**
* Notifies the listener that a priority tree parent change has occurred. This method will be invoked * Notifies the listener that a priority tree parent change has occurred. This method will be invoked
@ -64,7 +66,7 @@ public interface Http2Connection {
* @param stream The stream which had a parent change (new parent and children will be steady state) * @param stream The stream which had a parent change (new parent and children will be steady state)
* @param oldParent The old parent which {@code stream} used to be a child of (may be {@code null}) * @param oldParent The old parent which {@code stream} used to be a child of (may be {@code null})
*/ */
void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent); void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent);
/** /**
* Notifies the listener that a parent dependency is about to change * Notifies the listener that a parent dependency is about to change
@ -73,7 +75,7 @@ public interface Http2Connection {
* @param stream The stream which the parent is about to change to {@code newParent} * @param stream The stream which the parent is about to change to {@code newParent}
* @param newParent The stream which will be the parent of {@code stream} * @param newParent The stream which will be the parent of {@code stream}
*/ */
void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent); void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent);
/** /**
* Notifies the listener that the weight has changed for {@code stream} * Notifies the listener that the weight has changed for {@code stream}
@ -83,9 +85,26 @@ public interface Http2Connection {
void onWeightChanged(Http2Stream stream, short oldWeight); void onWeightChanged(Http2Stream stream, short oldWeight);
/** /**
* Called when a GO_AWAY frame has either been sent or received for the connection. * Called when a {@code GOAWAY} frame was sent for the connection.
*
* @param lastStreamId the last known stream of the remote endpoint.
* @param errorCode the error code, if abnormal closure.
* @param debugData application-defined debug data.
*/ */
void goingAway(); void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData);
/**
* Called when a {@code GOAWAY} was received from the remote endpoint. This event handler duplicates {@link
* Http2FrameListener#onGoAwayRead(io.netty.channel.ChannelHandlerContext, int, long, io.netty.buffer.ByteBuf)}
* but is added here in order to simplify application logic for handling {@code GOAWAY} in a uniform way. An
* application should generally not handle both events, but if it does this method is called first, before
* notifying the {@link Http2FrameListener}.
*
* @param lastStreamId the last known stream of the remote endpoint.
* @param errorCode the error code, if abnormal closure.
* @param debugData application-defined debug data.
*/
void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData);
} }
/** /**
@ -269,7 +288,7 @@ public interface Http2Connection {
/** /**
* Indicates that a {@code GOAWAY} was received from the remote endpoint and sets the last known stream. * Indicates that a {@code GOAWAY} was received from the remote endpoint and sets the last known stream.
*/ */
void goAwayReceived(int lastKnownStream); void goAwayReceived(int lastKnownStream, long errorCode, ByteBuf message);
/** /**
* Indicates whether or not a {@code GOAWAY} was sent to the remote endpoint. * Indicates whether or not a {@code GOAWAY} was sent to the remote endpoint.
@ -279,10 +298,5 @@ public interface Http2Connection {
/** /**
* Indicates that a {@code GOAWAY} was sent to the remote endpoint and sets the last known stream. * Indicates that a {@code GOAWAY} was sent to the remote endpoint and sets the last known stream.
*/ */
void goAwaySent(int lastKnownStream); void goAwaySent(int lastKnownStream, long errorCode, ByteBuf message);
/**
* Indicates whether or not either endpoint has received a GOAWAY.
*/
boolean isGoAway();
} }

View File

@ -14,41 +14,47 @@
*/ */
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
/** /**
* Provides empty implementations of all {@link Http2Connection.Listener} methods. * Provides empty implementations of all {@link Http2Connection.Listener} methods.
*/ */
public class Http2ConnectionAdapter implements Http2Connection.Listener { public class Http2ConnectionAdapter implements Http2Connection.Listener {
@Override @Override
public void streamAdded(Http2Stream stream) { public void onStreamAdded(Http2Stream stream) {
} }
@Override @Override
public void streamActive(Http2Stream stream) { public void onStreamActive(Http2Stream stream) {
} }
@Override @Override
public void streamHalfClosed(Http2Stream stream) { public void onStreamHalfClosed(Http2Stream stream) {
} }
@Override @Override
public void streamClosed(Http2Stream stream) { public void onStreamClosed(Http2Stream stream) {
} }
@Override @Override
public void streamRemoved(Http2Stream stream) { public void onStreamRemoved(Http2Stream stream) {
} }
@Override @Override
public void goingAway() { public void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
} }
@Override @Override
public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
} }
@Override @Override
public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) { public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
}
@Override
public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
} }
@Override @Override

View File

@ -546,15 +546,16 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) { ChannelPromise promise) {
Http2Connection connection = connection(); Http2Connection connection = connection();
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
debugData.release(); debugData.release();
return ctx.newSucceededFuture(); return ctx.newSucceededFuture();
} }
connection.goAwaySent(lastStreamId, errorCode, debugData);
ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise); ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
ctx.flush(); ctx.flush();
connection.goAwaySent(lastStreamId);
return future; return future;
} }
@ -563,7 +564,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
*/ */
private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) { private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) {
Http2Connection connection = connection(); Http2Connection connection = connection();
if (connection.isGoAway()) { if (connection.goAwayReceived() || connection.goAwaySent()) {
return ctx.newSucceededFuture(); return ctx.newSucceededFuture();
} }

View File

@ -84,31 +84,31 @@ public class Http2EventAdapter implements Http2Connection.Listener, Http2FrameLi
} }
@Override @Override
public void streamAdded(Http2Stream stream) { public void onStreamAdded(Http2Stream stream) {
} }
@Override @Override
public void streamActive(Http2Stream stream) { public void onStreamActive(Http2Stream stream) {
} }
@Override @Override
public void streamHalfClosed(Http2Stream stream) { public void onStreamHalfClosed(Http2Stream stream) {
} }
@Override @Override
public void streamClosed(Http2Stream stream) { public void onStreamClosed(Http2Stream stream) {
} }
@Override @Override
public void streamRemoved(Http2Stream stream) { public void onStreamRemoved(Http2Stream stream) {
} }
@Override @Override
public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
} }
@Override @Override
public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) { public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
} }
@Override @Override
@ -116,6 +116,10 @@ public class Http2EventAdapter implements Http2Connection.Listener, Http2FrameLi
} }
@Override @Override
public void goingAway() { public void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
}
@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
} }
} }

View File

@ -155,7 +155,7 @@ public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
} }
@Override @Override
public void streamRemoved(Http2Stream stream) { public void onStreamRemoved(Http2Stream stream) {
removeMessage(stream.id()); removeMessage(stream.id());
} }

View File

@ -166,7 +166,7 @@ public final class InboundHttp2ToHttpPriorityAdapter extends InboundHttp2ToHttpA
} }
@Override @Override
public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) { public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
Http2Stream parent = stream.parent(); Http2Stream parent = stream.parent();
FullHttpMessage msg = messageMap.get(stream.id()); FullHttpMessage msg = messageMap.get(stream.id());
if (msg == null) { if (msg == null) {

View File

@ -271,7 +271,7 @@ public class DataCompressionHttp2Test {
serverConnection.addListener(new Http2ConnectionAdapter() { serverConnection.addListener(new Http2ConnectionAdapter() {
@Override @Override
public void streamHalfClosed(Http2Stream stream) { public void onStreamHalfClosed(Http2Stream stream) {
serverLatch.countDown(); serverLatch.countDown();
} }
}); });

View File

@ -491,7 +491,7 @@ public class DefaultHttp2ConnectionDecoderTest {
@Test @Test
public void goAwayShouldReadShouldUpdateConnectionState() throws Exception { public void goAwayShouldReadShouldUpdateConnectionState() throws Exception {
decode().onGoAwayRead(ctx, 1, 2L, EMPTY_BUFFER); decode().onGoAwayRead(ctx, 1, 2L, EMPTY_BUFFER);
verify(connection).goAwayReceived(1); verify(connection).goAwayReceived(eq(1), eq(2L), eq(EMPTY_BUFFER));
verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(2L), eq(EMPTY_BUFFER)); verify(listener).onGoAwayRead(eq(ctx), eq(1), eq(2L), eq(EMPTY_BUFFER));
} }

View File

@ -201,7 +201,7 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test @Test
public void dataWriteAfterGoAwayShouldFail() throws Exception { public void dataWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true); when(connection.goAwayReceived()).thenReturn(true);
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
try { try {
ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); ChannelFuture future = encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
@ -304,7 +304,7 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test @Test
public void headersWriteAfterGoAwayShouldFail() throws Exception { public void headersWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true); when(connection.goAwayReceived()).thenReturn(true);
ChannelFuture future = encoder.writeHeaders( ChannelFuture future = encoder.writeHeaders(
ctx, 5, EmptyHttp2Headers.INSTANCE, 0, (short) 255, false, 0, false, promise); ctx, 5, EmptyHttp2Headers.INSTANCE, 0, (short) 255, false, 0, false, promise);
verify(local, never()).createStream(anyInt()); verify(local, never()).createStream(anyInt());
@ -345,7 +345,7 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test @Test
public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception { public void pushPromiseWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true); when(connection.goAwayReceived()).thenReturn(true);
ChannelFuture future = ChannelFuture future =
encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID,
EmptyHttp2Headers.INSTANCE, 0, promise); EmptyHttp2Headers.INSTANCE, 0, promise);
@ -362,7 +362,7 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test @Test
public void priorityWriteAfterGoAwayShouldFail() throws Exception { public void priorityWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true); when(connection.goAwayReceived()).thenReturn(true);
ChannelFuture future = encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); ChannelFuture future = encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
} }
@ -426,7 +426,7 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test @Test
public void pingWriteAfterGoAwayShouldFail() throws Exception { public void pingWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true); when(connection.goAwayReceived()).thenReturn(true);
ChannelFuture future = encoder.writePing(ctx, false, emptyPingBuf(), promise); ChannelFuture future = encoder.writePing(ctx, false, emptyPingBuf(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
} }
@ -439,7 +439,7 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test @Test
public void settingsWriteAfterGoAwayShouldFail() throws Exception { public void settingsWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true); when(connection.goAwayReceived()).thenReturn(true);
ChannelFuture future = encoder.writeSettings(ctx, new Http2Settings(), promise); ChannelFuture future = encoder.writeSettings(ctx, new Http2Settings(), promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
} }

View File

@ -30,6 +30,8 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2Connection.Endpoint; import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.handler.codec.http2.Http2Stream.State;
@ -176,7 +178,7 @@ public class DefaultHttp2ConnectionTest {
@Test(expected = Http2Exception.class) @Test(expected = Http2Exception.class)
public void goAwayReceivedShouldDisallowCreation() throws Http2Exception { public void goAwayReceivedShouldDisallowCreation() throws Http2Exception {
server.goAwayReceived(0); server.goAwayReceived(0, 1L, Unpooled.EMPTY_BUFFER);
server.remote().createStream(3).open(true); server.remote().createStream(3).open(true);
} }
@ -321,9 +323,9 @@ public class DefaultHttp2ConnectionTest {
streamD.setPriority(streamD.parent().id(), newWeight, false); streamD.setPriority(streamD.parent().id(), newWeight, false);
verify(clientListener).onWeightChanged(eq(streamD), eq(oldWeight)); verify(clientListener).onWeightChanged(eq(streamD), eq(oldWeight));
assertEquals(streamD.weight(), newWeight); assertEquals(streamD.weight(), newWeight);
verify(clientListener, never()).priorityTreeParentChanging(any(Http2Stream.class), verify(clientListener, never()).onPriorityTreeParentChanging(any(Http2Stream.class),
any(Http2Stream.class)); any(Http2Stream.class));
verify(clientListener, never()).priorityTreeParentChanged(any(Http2Stream.class), verify(clientListener, never()).onPriorityTreeParentChanged(any(Http2Stream.class),
any(Http2Stream.class)); any(Http2Stream.class));
} }
@ -552,7 +554,7 @@ public class DefaultHttp2ConnectionTest {
assertSame(expectedArg1.size(), expectedArg2.size()); assertSame(expectedArg1.size(), expectedArg2.size());
ArgumentCaptor<Http2Stream> arg1Captor = ArgumentCaptor.forClass(Http2Stream.class); ArgumentCaptor<Http2Stream> arg1Captor = ArgumentCaptor.forClass(Http2Stream.class);
ArgumentCaptor<Http2Stream> arg2Captor = ArgumentCaptor.forClass(Http2Stream.class); ArgumentCaptor<Http2Stream> arg2Captor = ArgumentCaptor.forClass(Http2Stream.class);
verify(clientListener, times(expectedArg1.size())).priorityTreeParentChanging(arg1Captor.capture(), verify(clientListener, times(expectedArg1.size())).onPriorityTreeParentChanging(arg1Captor.capture(),
arg2Captor.capture()); arg2Captor.capture());
List<Http2Stream> capturedArg1 = arg1Captor.getAllValues(); List<Http2Stream> capturedArg1 = arg1Captor.getAllValues();
List<Http2Stream> capturedArg2 = arg2Captor.getAllValues(); List<Http2Stream> capturedArg2 = arg2Captor.getAllValues();
@ -568,7 +570,7 @@ public class DefaultHttp2ConnectionTest {
assertSame(expectedArg1.size(), expectedArg2.size()); assertSame(expectedArg1.size(), expectedArg2.size());
ArgumentCaptor<Http2Stream> arg1Captor = ArgumentCaptor.forClass(Http2Stream.class); ArgumentCaptor<Http2Stream> arg1Captor = ArgumentCaptor.forClass(Http2Stream.class);
ArgumentCaptor<Http2Stream> arg2Captor = ArgumentCaptor.forClass(Http2Stream.class); ArgumentCaptor<Http2Stream> arg2Captor = ArgumentCaptor.forClass(Http2Stream.class);
verify(clientListener, times(expectedArg1.size())).priorityTreeParentChanged(arg1Captor.capture(), verify(clientListener, times(expectedArg1.size())).onPriorityTreeParentChanged(arg1Captor.capture(),
arg2Captor.capture()); arg2Captor.capture());
List<Http2Stream> capturedArg1 = arg1Captor.getAllValues(); List<Http2Stream> capturedArg1 = arg1Captor.getAllValues();
List<Http2Stream> capturedArg2 = arg2Captor.getAllValues(); List<Http2Stream> capturedArg2 = arg2Captor.getAllValues();
@ -597,10 +599,10 @@ public class DefaultHttp2ConnectionTest {
} }
private void verifyParentChanging(Http2Stream stream, Http2Stream newParent) { private void verifyParentChanging(Http2Stream stream, Http2Stream newParent) {
verify(clientListener).priorityTreeParentChanging(streamEq(stream), streamEq(newParent)); verify(clientListener).onPriorityTreeParentChanging(streamEq(stream), streamEq(newParent));
} }
private void verifyParentChanged(Http2Stream stream, Http2Stream oldParent) { private void verifyParentChanged(Http2Stream stream, Http2Stream oldParent) {
verify(clientListener).priorityTreeParentChanged(streamEq(stream), streamEq(oldParent)); verify(clientListener).onPriorityTreeParentChanged(streamEq(stream), streamEq(oldParent));
} }
} }