Fixed SpdySessionHandlerTest / Fixed NPE in EmbeddedChannel

- Some tests like SpdySessionHandlerTest accesses outbound buffer
  even before the outbound buffer is initialized by
  AbstractEmbeddedChannel's subclasses, leading to NPE at <init>.
  To fix this problem, subclasses now pass the outbound buffer as
  a constructor parameter to AbstractEmbeddedChannel.
This commit is contained in:
Trustin Lee 2012-06-07 21:33:31 +09:00
parent 994038975a
commit 10f7a31908
4 changed files with 57 additions and 57 deletions

View File

@ -102,7 +102,7 @@ public class SpdySessionHandlerTest {
EmbeddedMessageChannel sessionHandler = new EmbeddedMessageChannel(
new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server));
while (sessionHandler.readInbound() != null) {
while (sessionHandler.readOutbound() != null) {
continue;
}
@ -122,35 +122,35 @@ public class SpdySessionHandlerTest {
// Check if session handler returns INVALID_STREAM if it receives
// a data frame for a Stream-ID that is not open
sessionHandler.writeInbound(new DefaultSpdyDataFrame(localStreamID));
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler returns PROTOCOL_ERROR if it receives
// a data frame for a Stream-ID before receiving a SYN_REPLY frame
sessionHandler.writeInbound(new DefaultSpdyDataFrame(remoteStreamID));
assertRstStream(sessionHandler.readInbound(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readOutbound());
remoteStreamID += 2;
// Check if session handler returns PROTOCOL_ERROR if it receives
// multiple SYN_REPLY frames for the same active Stream-ID
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamID));
Assert.assertNull(sessionHandler.readInbound());
Assert.assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamID));
assertRstStream(sessionHandler.readInbound(), remoteStreamID, SpdyStreamStatus.STREAM_IN_USE);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), remoteStreamID, SpdyStreamStatus.STREAM_IN_USE);
Assert.assertNull(sessionHandler.readOutbound());
remoteStreamID += 2;
// Check if frame codec correctly compresses/uncompresses headers
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readInbound(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.readInbound());
assertSynReply(sessionHandler.readOutbound(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.readOutbound());
SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(localStreamID);
spdyHeadersFrame.addHeader("HEADER","test1");
spdyHeadersFrame.addHeader("HEADER","test2");
sessionHandler.writeInbound(spdyHeadersFrame);
assertHeaders(sessionHandler.readInbound(), localStreamID, spdyHeadersFrame);
Assert.assertNull(sessionHandler.readInbound());
assertHeaders(sessionHandler.readOutbound(), localStreamID, spdyHeadersFrame);
Assert.assertNull(sessionHandler.readOutbound());
localStreamID += 2;
// Check if session handler closed the streams using the number
@ -160,34 +160,34 @@ public class SpdySessionHandlerTest {
spdySynStreamFrame.setLast(true);
spdySynStreamFrame.setUnidirectional(true);
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler drops active streams if it receives
// a RST_STREAM frame for that Stream-ID
sessionHandler.writeInbound(new DefaultSpdyRstStreamFrame(remoteStreamID, 3));
Assert.assertNull(sessionHandler.readInbound());
Assert.assertNull(sessionHandler.readOutbound());
remoteStreamID += 2;
// Check if session handler honors UNIDIRECTIONAL streams
spdySynStreamFrame.setLast(false);
sessionHandler.writeInbound(spdySynStreamFrame);
Assert.assertNull(sessionHandler.readInbound());
Assert.assertNull(sessionHandler.readOutbound());
spdySynStreamFrame.setUnidirectional(false);
// Check if session handler returns PROTOCOL_ERROR if it receives
// multiple SYN_STREAM frames for the same active Stream-ID
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readOutbound());
localStreamID += 2;
// Check if session handler returns PROTOCOL_ERROR if it receives
// a SYN_STREAM frame with an invalid Stream-ID
spdySynStreamFrame.setStreamID(localStreamID - 1);
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID - 1, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), localStreamID - 1, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readOutbound());
spdySynStreamFrame.setStreamID(localStreamID);
// Check if session handler correctly limits the number of
@ -195,62 +195,62 @@ public class SpdySessionHandlerTest {
SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2);
sessionHandler.writeInbound(spdySettingsFrame);
Assert.assertNull(sessionHandler.readInbound());
Assert.assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readOutbound());
spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4);
sessionHandler.writeInbound(spdySettingsFrame);
Assert.assertNull(sessionHandler.readInbound());
Assert.assertNull(sessionHandler.readOutbound());
sessionHandler.writeInbound(spdySynStreamFrame);
assertSynReply(sessionHandler.readInbound(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.readInbound());
assertSynReply(sessionHandler.readOutbound(), localStreamID, false, spdySynStreamFrame);
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler rejects HEADERS for closed streams
int testStreamID = spdyDataFrame.getStreamID();
sessionHandler.writeInbound(spdyDataFrame);
assertDataFrame(sessionHandler.readInbound(), testStreamID, spdyDataFrame.isLast());
Assert.assertNull(sessionHandler.readInbound());
assertDataFrame(sessionHandler.readOutbound(), testStreamID, spdyDataFrame.isLast());
Assert.assertNull(sessionHandler.readOutbound());
spdyHeadersFrame.setStreamID(testStreamID);
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readInbound(), testStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), testStreamID, SpdyStreamStatus.INVALID_STREAM);
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler returns PROTOCOL_ERROR if it receives
// an invalid HEADERS frame
spdyHeadersFrame.setStreamID(localStreamID);
spdyHeadersFrame.setInvalid();
sessionHandler.writeInbound(spdyHeadersFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR);
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler returns identical local PINGs
sessionHandler.writeInbound(localPingFrame);
assertPing(sessionHandler.readInbound(), localPingFrame.getID());
Assert.assertNull(sessionHandler.readInbound());
assertPing(sessionHandler.readOutbound(), localPingFrame.getID());
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler ignores un-initiated remote PINGs
sessionHandler.writeInbound(remotePingFrame);
Assert.assertNull(sessionHandler.readInbound());
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler sends a GOAWAY frame when closing
sessionHandler.writeInbound(closeMessage);
assertGoAway(sessionHandler.readInbound(), localStreamID);
Assert.assertNull(sessionHandler.readInbound());
assertGoAway(sessionHandler.readOutbound(), localStreamID);
Assert.assertNull(sessionHandler.readOutbound());
localStreamID += 2;
// Check if session handler returns REFUSED_STREAM if it receives
// SYN_STREAM frames after sending a GOAWAY frame
spdySynStreamFrame.setStreamID(localStreamID);
sessionHandler.writeInbound(spdySynStreamFrame);
assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readInbound());
assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM);
Assert.assertNull(sessionHandler.readOutbound());
// Check if session handler ignores Data frames after sending
// a GOAWAY frame
spdyDataFrame.setStreamID(localStreamID);
sessionHandler.writeInbound(spdyDataFrame);
Assert.assertNull(sessionHandler.readInbound());
Assert.assertNull(sessionHandler.readOutbound());
sessionHandler.finish();
}

View File

@ -48,16 +48,19 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> lastInboundMessageBuffer = new ArrayDeque<Object>();
private final ChannelBuffer lastInboundByteBuffer = ChannelBuffers.dynamicBuffer();
protected final Object lastOutboundBuffer;
private Throwable lastException;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
AbstractEmbeddedChannel(ChannelHandler... handlers) {
AbstractEmbeddedChannel(Object lastOutboundBuffer, ChannelHandler... handlers) {
super(null, null);
if (handlers == null) {
throw new NullPointerException("handlers");
}
this.lastOutboundBuffer = lastOutboundBuffer;
int nHandlers = 0;
boolean hasBuffer = false;
ChannelPipeline p = pipeline();

View File

@ -8,10 +8,8 @@ import java.util.Queue;
public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
private final Queue<Object> lastOutboundBuffer = new ArrayDeque<Object>();
public EmbeddedMessageChannel(ChannelHandler... handlers) {
super(handlers);
super(new ArrayDeque<Object>(), handlers);
}
@Override
@ -23,12 +21,13 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
return pipeline().inboundMessageBuffer();
}
@SuppressWarnings("unchecked")
public Queue<Object> lastOutboundBuffer() {
return lastOutboundBuffer;
return (Queue<Object>) lastOutboundBuffer;
}
public Object readOutbound() {
return lastOutboundBuffer.poll();
return lastOutboundBuffer().poll();
}
public boolean writeInbound(Object msg) {
@ -58,7 +57,7 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
if (o == null) {
break;
}
lastOutboundBuffer.add(o);
lastOutboundBuffer().add(o);
}
}
}

View File

@ -7,10 +7,8 @@ import io.netty.channel.ChannelType;
public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
private final ChannelBuffer lastOutboundBuffer = ChannelBuffers.dynamicBuffer();
public EmbeddedStreamChannel(ChannelHandler... handlers) {
super(handlers);
super(ChannelBuffers.dynamicBuffer(), handlers);
}
@Override
@ -23,17 +21,17 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
}
public ChannelBuffer lastOutboundBuffer() {
return lastOutboundBuffer;
return (ChannelBuffer) lastOutboundBuffer;
}
public ChannelBuffer readOutbound() {
if (!lastOutboundBuffer.readable()) {
if (!lastOutboundBuffer().readable()) {
return null;
}
try {
return lastOutboundBuffer.readBytes(lastOutboundBuffer.readableBytes());
return lastOutboundBuffer().readBytes(lastOutboundBuffer().readableBytes());
} finally {
lastOutboundBuffer.clear();
lastOutboundBuffer().clear();
}
}
@ -59,9 +57,9 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
@Override
protected void doFlushByteBuffer(ChannelBuffer buf) throws Exception {
if (!lastOutboundBuffer.readable()) {
lastOutboundBuffer.discardReadBytes();
if (!lastOutboundBuffer().readable()) {
lastOutboundBuffer().discardReadBytes();
}
lastOutboundBuffer.writeBytes(buf);
lastOutboundBuffer().writeBytes(buf);
}
}