HTTP/2 Connection Listener Unchecked Exceptions

Motivation:
The DefaultHttp2Connection is not checking for RuntimeExceptions when invoking Http2Connection.Listener methods. This is a problem for a few reasons: 1. The state of DefaultHttp2Connection will be corrupted if a listener throws a RuntimeException. 2. If the first listener throws then no other listeners will be notified, which may further corrupt state that is updated as a result of listeners being notified.

Modifications:
- Document that RuntimeExceptions are not supported for Http2Connection.Listener methods, and will be logged as an error.
- Update DefaultHttp2Connection to handle and exception for each listener that is notified, and be sure that 1 listener throwing an exception does not prevent others from being notified.

Result:
More robust DefaultHttp2Connection.
This commit is contained in:
Scott Mitchell 2015-04-09 12:30:16 -07:00
parent d14afe88a4
commit 9c4af6af7e
4 changed files with 262 additions and 26 deletions

View File

@ -40,6 +40,8 @@ import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.PrimitiveCollections;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
@ -55,6 +57,7 @@ import java.util.Set;
* Simple implementation of {@link Http2Connection}.
*/
public class DefaultHttp2Connection implements Http2Connection {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
// Fields accessed by inner classes
final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
final ConnectionStream connectionStream = new ConnectionStream();
@ -164,8 +167,12 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) {
localEndpoint.lastKnownStream(lastKnownStream);
for (Listener listener : listeners) {
listener.onGoAwayReceived(lastKnownStream, errorCode, debugData);
for (int i = 0; i < listeners.size(); ++i) {
try {
listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onGoAwayReceived.", e);
}
}
try {
@ -191,8 +198,12 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public void goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) {
remoteEndpoint.lastKnownStream(lastKnownStream);
for (Listener listener : listeners) {
listener.onGoAwaySent(lastKnownStream, errorCode, debugData);
for (int i = 0; i < listeners.size(); ++i) {
try {
listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onGoAwaySent.", e);
}
}
try {
@ -226,7 +237,11 @@ public class DefaultHttp2Connection implements Http2Connection {
streamMap.remove(stream.id());
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).onStreamRemoved(stream);
try {
listeners.get(i).onStreamRemoved(stream);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onStreamRemoved.", e);
}
}
}
}
@ -498,7 +513,11 @@ public class DefaultHttp2Connection implements Http2Connection {
private void notifyHalfClosed(Http2Stream stream) {
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).onStreamHalfClosed(stream);
try {
listeners.get(i).onStreamHalfClosed(stream);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onStreamHalfClosed.", e);
}
}
}
@ -535,7 +554,11 @@ public class DefaultHttp2Connection implements Http2Connection {
final short oldWeight = this.weight;
this.weight = weight;
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).onWeightChanged(this, oldWeight);
try {
listeners.get(i).onWeightChanged(this, oldWeight);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onWeightChanged.", e);
}
}
}
}
@ -703,7 +726,11 @@ public class DefaultHttp2Connection implements Http2Connection {
* @param l The listener to notify
*/
public void notifyListener(Listener l) {
l.onPriorityTreeParentChanged(stream, oldParent);
try {
l.onPriorityTreeParentChanged(stream, oldParent);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onPriorityTreeParentChanged.", e);
}
}
}
@ -722,7 +749,11 @@ public class DefaultHttp2Connection implements Http2Connection {
private void notifyParentChanging(Http2Stream stream, Http2Stream newParent) {
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).onPriorityTreeParentChanging(stream, newParent);
try {
listeners.get(i).onPriorityTreeParentChanging(stream, newParent);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onPriorityTreeParentChanging.", e);
}
}
}
@ -869,7 +900,11 @@ public class DefaultHttp2Connection implements Http2Connection {
// Notify the listeners of the event.
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).onStreamAdded(stream);
try {
listeners.get(i).onStreamAdded(stream);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onStreamAdded.", e);
}
}
notifyParentChanged(events);
@ -971,6 +1006,9 @@ public class DefaultHttp2Connection implements Http2Connection {
interface Event {
/**
* Trigger the original intention of this event. Expect to modify {@link #streams}.
* <p>
* If a {@link RuntimeException} object is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
void process();
}
@ -1033,7 +1071,11 @@ public class DefaultHttp2Connection implements Http2Connection {
if (event == null) {
break;
}
event.process();
try {
event.process();
} catch (RuntimeException e) {
logger.error("Caught RuntimeException while processing pending ActiveStreams$Event.", e);
}
}
}
}
@ -1045,7 +1087,11 @@ public class DefaultHttp2Connection implements Http2Connection {
stream.createdBy().numActiveStreams++;
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).onStreamActive(stream);
try {
listeners.get(i).onStreamActive(stream);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onStreamActive.", e);
}
}
}
}
@ -1057,7 +1103,11 @@ public class DefaultHttp2Connection implements Http2Connection {
stream.createdBy().numActiveStreams--;
for (int i = 0; i < listeners.size(); i++) {
listeners.get(i).onStreamClosed(stream);
try {
listeners.get(i).onStreamClosed(stream);
} catch (RuntimeException e) {
logger.error("Caught RuntimeException from listener onStreamClosed.", e);
}
}
} finally {
// Mark this stream for removal.

View File

@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf;
* Manager for the state of an HTTP/2 connection with the remote end-point.
*/
public interface Http2Connection {
/**
* Listener for life-cycle events for streams in this connection.
*/
@ -29,23 +28,35 @@ public interface Http2Connection {
/**
* Notifies the listener that the given stream was added to the connection. This stream may
* not yet be active (i.e. {@code OPEN} or {@code HALF CLOSED}).
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
void onStreamAdded(Http2Stream stream);
/**
* Notifies the listener that the given stream was made active (i.e. {@code OPEN} or {@code HALF CLOSED}).
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
void onStreamActive(Http2Stream stream);
/**
* Notifies the listener that the given stream is now {@code HALF CLOSED}. The stream can be
* inspected to determine which side is {@code CLOSED}.
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
void onStreamHalfClosed(Http2Stream stream);
/**
* Notifies the listener that the given stream is now {@code CLOSED} in both directions and will no longer
* be accessible via {@link #forEachActiveStream(Http2StreamVisitor)}.
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
void onStreamClosed(Http2Stream stream);
@ -53,6 +64,9 @@ public interface Http2Connection {
* Notifies the listener that the given stream has now been removed from the connection and
* will no longer be returned via {@link Http2Connection#stream(int)}. The connection may
* maintain inactive streams for some time before removing them.
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
void onStreamRemoved(Http2Stream stream);
@ -61,6 +75,9 @@ public interface Http2Connection {
* in a top down order relative to the priority tree. This method will also be invoked after all tree
* structure changes have been made and the tree is in steady state relative to the priority change
* which caused the tree structure to change.
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
* @param stream The stream which had a parent change (new parent and children will be steady state)
* @param oldParent The old parent which {@code stream} used to be a child of (may be {@code null})
*/
@ -70,13 +87,19 @@ public interface Http2Connection {
* Notifies the listener that a parent dependency is about to change
* This is called while the tree is being restructured and so the tree
* structure is not necessarily steady state.
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
* @param stream The stream which the parent is about to change to {@code newParent}
* @param newParent The stream which will be the parent of {@code stream}
*/
void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent);
/**
* Notifies the listener that the weight has changed for {@code stream}
* Notifies the listener that the weight has changed for {@code stream}.
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
* @param stream The stream which the weight has changed
* @param oldWeight The old weight for {@code stream}
*/
@ -84,7 +107,9 @@ public interface Http2Connection {
/**
* Called when a {@code GOAWAY} frame was sent for the connection.
*
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
* @param lastStreamId the last known stream of the remote endpoint.
* @param errorCode the error code, if abnormal closure.
* @param debugData application-defined debug data.
@ -97,7 +122,9 @@ public interface Http2Connection {
* but is added here in order to simplify application logic for handling {@code GOAWAY} in a uniform way. An
* application should generally not handle both events, but if it does this method is called second, after
* notifying the {@link Http2FrameListener}.
*
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
* @param lastStreamId the last known stream of the remote endpoint.
* @param errorCode the error code, if abnormal closure.
* @param debugData application-defined debug data.

View File

@ -50,11 +50,11 @@ import java.util.List;
* {@link Http2LocalFlowController}
*/
public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder;
private ChannelFutureListener closeListener;
private BaseDecoder byteDecoder;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
public Http2ConnectionHandler(boolean server, Http2FrameListener listener) {
this(new DefaultHttp2Connection(server), listener);

View File

@ -21,28 +21,35 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.internal.PlatformDependent;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import javax.xml.ws.Holder;
import java.util.Arrays;
import java.util.List;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Tests for {@link DefaultHttp2Connection}.
@ -55,6 +62,9 @@ public class DefaultHttp2ConnectionTest {
@Mock
private Http2Connection.Listener clientListener;
@Mock
private Http2Connection.Listener clientListener2;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
@ -848,6 +858,155 @@ public class DefaultHttp2ConnectionTest {
assertEquals(p.numChildren() * DEFAULT_PRIORITY_WEIGHT, p.totalChildWeights());
}
/**
* We force {@link #clientListener} methods to all throw a {@link RuntimeException} and verify the following:
* <ol>
* <li>all listener methods are called for both {@link #clientListener} and {@link #clientListener2}</li>
* <li>{@link #clientListener2} is notified after {@link #clientListener}</li>
* <li>{@link #clientListener2} methods are all still called despite {@link #clientListener}'s
* method throwing a {@link RuntimeException}</li>
* </ol>
*/
@Test
public void listenerThrowShouldNotPreventOtherListenersFromBeingNotified() throws Http2Exception {
final boolean[] calledArray = new boolean[128];
// The following setup will ensure that clienListener throws exceptions, and marks a value in an array
// such that clientListener2 will verify that is is set or fail the test.
int methodIndex = 0;
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onStreamAdded(any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onStreamAdded(any(Http2Stream.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onStreamActive(any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onStreamActive(any(Http2Stream.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onStreamHalfClosed(any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onStreamHalfClosed(any(Http2Stream.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onStreamClosed(any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onStreamClosed(any(Http2Stream.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onStreamRemoved(any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onStreamRemoved(any(Http2Stream.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onPriorityTreeParentChanged(any(Http2Stream.class), any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onPriorityTreeParentChanged(any(Http2Stream.class), any(Http2Stream.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onPriorityTreeParentChanging(any(Http2Stream.class), any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onPriorityTreeParentChanging(any(Http2Stream.class), any(Http2Stream.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onWeightChanged(any(Http2Stream.class), anyShort());
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onWeightChanged(any(Http2Stream.class), anyShort());
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onGoAwaySent(anyInt(), anyLong(), any(ByteBuf.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onGoAwaySent(anyInt(), anyLong(), any(ByteBuf.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onGoAwayReceived(anyInt(), anyLong(), any(ByteBuf.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onGoAwayReceived(anyInt(), anyLong(), any(ByteBuf.class));
doAnswer(new ListenerExceptionThrower(calledArray, methodIndex))
.when(clientListener).onStreamAdded(any(Http2Stream.class));
doAnswer(new ListenerVerifyCallAnswer(calledArray, methodIndex++))
.when(clientListener2).onStreamAdded(any(Http2Stream.class));
// Now we add clienListener2 and exercise all listener functionality
try {
client.addListener(clientListener2);
Http2Stream stream = client.local().createStream(3);
verify(clientListener).onStreamAdded(any(Http2Stream.class));
verify(clientListener2).onStreamAdded(any(Http2Stream.class));
stream.open(false);
verify(clientListener).onStreamActive(any(Http2Stream.class));
verify(clientListener2).onStreamActive(any(Http2Stream.class));
stream.setPriority(0, (short) (stream.weight() + 1), true);
verify(clientListener).onWeightChanged(any(Http2Stream.class), anyShort());
verify(clientListener2).onWeightChanged(any(Http2Stream.class), anyShort());
verify(clientListener).onPriorityTreeParentChanged(any(Http2Stream.class),
any(Http2Stream.class));
verify(clientListener2).onPriorityTreeParentChanged(any(Http2Stream.class),
any(Http2Stream.class));
verify(clientListener).onPriorityTreeParentChanging(any(Http2Stream.class),
any(Http2Stream.class));
verify(clientListener2).onPriorityTreeParentChanging(any(Http2Stream.class),
any(Http2Stream.class));
stream.closeLocalSide();
verify(clientListener).onStreamHalfClosed(any(Http2Stream.class));
verify(clientListener2).onStreamHalfClosed(any(Http2Stream.class));
stream.close();
verify(clientListener).onStreamClosed(any(Http2Stream.class));
verify(clientListener2).onStreamClosed(any(Http2Stream.class));
verify(clientListener).onStreamRemoved(any(Http2Stream.class));
verify(clientListener2).onStreamRemoved(any(Http2Stream.class));
client.goAwaySent(client.connectionStream().id(), Http2Error.INTERNAL_ERROR.code(), Unpooled.EMPTY_BUFFER);
verify(clientListener).onGoAwaySent(anyInt(), anyLong(), any(ByteBuf.class));
verify(clientListener2).onGoAwaySent(anyInt(), anyLong(), any(ByteBuf.class));
client.goAwayReceived(client.connectionStream().id(),
Http2Error.INTERNAL_ERROR.code(), Unpooled.EMPTY_BUFFER);
verify(clientListener).onGoAwayReceived(anyInt(), anyLong(), any(ByteBuf.class));
verify(clientListener2).onGoAwayReceived(anyInt(), anyLong(), any(ByteBuf.class));
} finally {
client.removeListener(clientListener2);
}
}
private static final class ListenerExceptionThrower implements Answer<Void> {
private static final RuntimeException FAKE_EXCEPTION = new RuntimeException("Fake Exception");
private final boolean[] array;
private final int index;
public ListenerExceptionThrower(boolean[] array, int index) {
this.array = array;
this.index = index;
}
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
array[index] = true;
throw FAKE_EXCEPTION;
}
}
private static final class ListenerVerifyCallAnswer implements Answer<Void> {
private final boolean[] array;
private final int index;
public ListenerVerifyCallAnswer(boolean[] array, int index) {
this.array = array;
this.index = index;
}
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
assertTrue(array[index]);
return null;
}
}
private void verifyParentChanging(List<Http2Stream> expectedArg1, List<Http2Stream> expectedArg2) {
assertSame(expectedArg1.size(), expectedArg2.size());
ArgumentCaptor<Http2Stream> arg1Captor = ArgumentCaptor.forClass(Http2Stream.class);
@ -912,18 +1071,18 @@ public class DefaultHttp2ConnectionTest {
}
private Http2Stream child(Http2Stream parent, final int id) {
try {
final Holder<Http2Stream> streamHolder = new Holder<Http2Stream>();
final AtomicReference<Http2Stream> streamReference = new AtomicReference<Http2Stream>();
parent.forEachChild(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
if (stream.id() == id) {
streamHolder.value = stream;
streamReference.set(stream);
return false;
}
return true;
}
});
return streamHolder.value;
return streamReference.get();
} catch (Http2Exception e) {
PlatformDependent.throwException(e);
return null;