From 10f7a319088d62d6b883ea8faf149b842de7b0bf Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 7 Jun 2012 21:33:31 +0900 Subject: [PATCH] Fixed SpdySessionHandlerTest / Fixed NPE in EmbeddedChannel - Some tests like SpdySessionHandlerTest accesses outbound buffer even before the outbound buffer is initialized by AbstractEmbeddedChannel's subclasses, leading to NPE at . To fix this problem, subclasses now pass the outbound buffer as a constructor parameter to AbstractEmbeddedChannel. --- .../codec/spdy/SpdySessionHandlerTest.java | 80 +++++++++---------- .../embedded/AbstractEmbeddedChannel.java | 5 +- .../embedded/EmbeddedMessageChannel.java | 11 ++- .../embedded/EmbeddedStreamChannel.java | 18 ++--- 4 files changed, 57 insertions(+), 57 deletions(-) diff --git a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java index 9538da2758..3f24939cd3 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/spdy/SpdySessionHandlerTest.java @@ -102,7 +102,7 @@ public class SpdySessionHandlerTest { EmbeddedMessageChannel sessionHandler = new EmbeddedMessageChannel( new SpdySessionHandler(version, server), new EchoHandler(closeSignal, server)); - while (sessionHandler.readInbound() != null) { + while (sessionHandler.readOutbound() != null) { continue; } @@ -122,35 +122,35 @@ public class SpdySessionHandlerTest { // Check if session handler returns INVALID_STREAM if it receives // a data frame for a Stream-ID that is not open sessionHandler.writeInbound(new DefaultSpdyDataFrame(localStreamID)); - assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.INVALID_STREAM); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.INVALID_STREAM); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler returns PROTOCOL_ERROR if it receives // a data frame for a Stream-ID before receiving a SYN_REPLY frame sessionHandler.writeInbound(new DefaultSpdyDataFrame(remoteStreamID)); - assertRstStream(sessionHandler.readInbound(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), remoteStreamID, SpdyStreamStatus.PROTOCOL_ERROR); + Assert.assertNull(sessionHandler.readOutbound()); remoteStreamID += 2; // Check if session handler returns PROTOCOL_ERROR if it receives // multiple SYN_REPLY frames for the same active Stream-ID sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamID)); - Assert.assertNull(sessionHandler.readInbound()); + Assert.assertNull(sessionHandler.readOutbound()); sessionHandler.writeInbound(new DefaultSpdySynReplyFrame(remoteStreamID)); - assertRstStream(sessionHandler.readInbound(), remoteStreamID, SpdyStreamStatus.STREAM_IN_USE); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), remoteStreamID, SpdyStreamStatus.STREAM_IN_USE); + Assert.assertNull(sessionHandler.readOutbound()); remoteStreamID += 2; // Check if frame codec correctly compresses/uncompresses headers sessionHandler.writeInbound(spdySynStreamFrame); - assertSynReply(sessionHandler.readInbound(), localStreamID, false, spdySynStreamFrame); - Assert.assertNull(sessionHandler.readInbound()); + assertSynReply(sessionHandler.readOutbound(), localStreamID, false, spdySynStreamFrame); + Assert.assertNull(sessionHandler.readOutbound()); SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(localStreamID); spdyHeadersFrame.addHeader("HEADER","test1"); spdyHeadersFrame.addHeader("HEADER","test2"); sessionHandler.writeInbound(spdyHeadersFrame); - assertHeaders(sessionHandler.readInbound(), localStreamID, spdyHeadersFrame); - Assert.assertNull(sessionHandler.readInbound()); + assertHeaders(sessionHandler.readOutbound(), localStreamID, spdyHeadersFrame); + Assert.assertNull(sessionHandler.readOutbound()); localStreamID += 2; // Check if session handler closed the streams using the number @@ -160,34 +160,34 @@ public class SpdySessionHandlerTest { spdySynStreamFrame.setLast(true); spdySynStreamFrame.setUnidirectional(true); sessionHandler.writeInbound(spdySynStreamFrame); - assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler drops active streams if it receives // a RST_STREAM frame for that Stream-ID sessionHandler.writeInbound(new DefaultSpdyRstStreamFrame(remoteStreamID, 3)); - Assert.assertNull(sessionHandler.readInbound()); + Assert.assertNull(sessionHandler.readOutbound()); remoteStreamID += 2; // Check if session handler honors UNIDIRECTIONAL streams spdySynStreamFrame.setLast(false); sessionHandler.writeInbound(spdySynStreamFrame); - Assert.assertNull(sessionHandler.readInbound()); + Assert.assertNull(sessionHandler.readOutbound()); spdySynStreamFrame.setUnidirectional(false); // Check if session handler returns PROTOCOL_ERROR if it receives // multiple SYN_STREAM frames for the same active Stream-ID sessionHandler.writeInbound(spdySynStreamFrame); - assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR); + Assert.assertNull(sessionHandler.readOutbound()); localStreamID += 2; // Check if session handler returns PROTOCOL_ERROR if it receives // a SYN_STREAM frame with an invalid Stream-ID spdySynStreamFrame.setStreamID(localStreamID - 1); sessionHandler.writeInbound(spdySynStreamFrame); - assertRstStream(sessionHandler.readInbound(), localStreamID - 1, SpdyStreamStatus.PROTOCOL_ERROR); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), localStreamID - 1, SpdyStreamStatus.PROTOCOL_ERROR); + Assert.assertNull(sessionHandler.readOutbound()); spdySynStreamFrame.setStreamID(localStreamID); // Check if session handler correctly limits the number of @@ -195,62 +195,62 @@ public class SpdySessionHandlerTest { SpdySettingsFrame spdySettingsFrame = new DefaultSpdySettingsFrame(); spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 2); sessionHandler.writeInbound(spdySettingsFrame); - Assert.assertNull(sessionHandler.readInbound()); + Assert.assertNull(sessionHandler.readOutbound()); sessionHandler.writeInbound(spdySynStreamFrame); - assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM); + Assert.assertNull(sessionHandler.readOutbound()); spdySettingsFrame.setValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS, 4); sessionHandler.writeInbound(spdySettingsFrame); - Assert.assertNull(sessionHandler.readInbound()); + Assert.assertNull(sessionHandler.readOutbound()); sessionHandler.writeInbound(spdySynStreamFrame); - assertSynReply(sessionHandler.readInbound(), localStreamID, false, spdySynStreamFrame); - Assert.assertNull(sessionHandler.readInbound()); + assertSynReply(sessionHandler.readOutbound(), localStreamID, false, spdySynStreamFrame); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler rejects HEADERS for closed streams int testStreamID = spdyDataFrame.getStreamID(); sessionHandler.writeInbound(spdyDataFrame); - assertDataFrame(sessionHandler.readInbound(), testStreamID, spdyDataFrame.isLast()); - Assert.assertNull(sessionHandler.readInbound()); + assertDataFrame(sessionHandler.readOutbound(), testStreamID, spdyDataFrame.isLast()); + Assert.assertNull(sessionHandler.readOutbound()); spdyHeadersFrame.setStreamID(testStreamID); sessionHandler.writeInbound(spdyHeadersFrame); - assertRstStream(sessionHandler.readInbound(), testStreamID, SpdyStreamStatus.INVALID_STREAM); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), testStreamID, SpdyStreamStatus.INVALID_STREAM); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler returns PROTOCOL_ERROR if it receives // an invalid HEADERS frame spdyHeadersFrame.setStreamID(localStreamID); spdyHeadersFrame.setInvalid(); sessionHandler.writeInbound(spdyHeadersFrame); - assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.PROTOCOL_ERROR); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler returns identical local PINGs sessionHandler.writeInbound(localPingFrame); - assertPing(sessionHandler.readInbound(), localPingFrame.getID()); - Assert.assertNull(sessionHandler.readInbound()); + assertPing(sessionHandler.readOutbound(), localPingFrame.getID()); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler ignores un-initiated remote PINGs sessionHandler.writeInbound(remotePingFrame); - Assert.assertNull(sessionHandler.readInbound()); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler sends a GOAWAY frame when closing sessionHandler.writeInbound(closeMessage); - assertGoAway(sessionHandler.readInbound(), localStreamID); - Assert.assertNull(sessionHandler.readInbound()); + assertGoAway(sessionHandler.readOutbound(), localStreamID); + Assert.assertNull(sessionHandler.readOutbound()); localStreamID += 2; // Check if session handler returns REFUSED_STREAM if it receives // SYN_STREAM frames after sending a GOAWAY frame spdySynStreamFrame.setStreamID(localStreamID); sessionHandler.writeInbound(spdySynStreamFrame); - assertRstStream(sessionHandler.readInbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM); - Assert.assertNull(sessionHandler.readInbound()); + assertRstStream(sessionHandler.readOutbound(), localStreamID, SpdyStreamStatus.REFUSED_STREAM); + Assert.assertNull(sessionHandler.readOutbound()); // Check if session handler ignores Data frames after sending // a GOAWAY frame spdyDataFrame.setStreamID(localStreamID); sessionHandler.writeInbound(spdyDataFrame); - Assert.assertNull(sessionHandler.readInbound()); + Assert.assertNull(sessionHandler.readOutbound()); sessionHandler.finish(); } diff --git a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java index 30bfbf94eb..75d4025d5c 100644 --- a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java @@ -48,16 +48,19 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel { private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); private final Queue lastInboundMessageBuffer = new ArrayDeque(); private final ChannelBuffer lastInboundByteBuffer = ChannelBuffers.dynamicBuffer(); + protected final Object lastOutboundBuffer; private Throwable lastException; private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED - AbstractEmbeddedChannel(ChannelHandler... handlers) { + AbstractEmbeddedChannel(Object lastOutboundBuffer, ChannelHandler... handlers) { super(null, null); if (handlers == null) { throw new NullPointerException("handlers"); } + this.lastOutboundBuffer = lastOutboundBuffer; + int nHandlers = 0; boolean hasBuffer = false; ChannelPipeline p = pipeline(); diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java index 44ee5885ee..6bfeee135d 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java @@ -8,10 +8,8 @@ import java.util.Queue; public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { - private final Queue lastOutboundBuffer = new ArrayDeque(); - public EmbeddedMessageChannel(ChannelHandler... handlers) { - super(handlers); + super(new ArrayDeque(), handlers); } @Override @@ -23,12 +21,13 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { return pipeline().inboundMessageBuffer(); } + @SuppressWarnings("unchecked") public Queue lastOutboundBuffer() { - return lastOutboundBuffer; + return (Queue) lastOutboundBuffer; } public Object readOutbound() { - return lastOutboundBuffer.poll(); + return lastOutboundBuffer().poll(); } public boolean writeInbound(Object msg) { @@ -58,7 +57,7 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { if (o == null) { break; } - lastOutboundBuffer.add(o); + lastOutboundBuffer().add(o); } } } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java index 92332b3f28..40f94d3b00 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java @@ -7,10 +7,8 @@ import io.netty.channel.ChannelType; public class EmbeddedStreamChannel extends AbstractEmbeddedChannel { - private final ChannelBuffer lastOutboundBuffer = ChannelBuffers.dynamicBuffer(); - public EmbeddedStreamChannel(ChannelHandler... handlers) { - super(handlers); + super(ChannelBuffers.dynamicBuffer(), handlers); } @Override @@ -23,17 +21,17 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel { } public ChannelBuffer lastOutboundBuffer() { - return lastOutboundBuffer; + return (ChannelBuffer) lastOutboundBuffer; } public ChannelBuffer readOutbound() { - if (!lastOutboundBuffer.readable()) { + if (!lastOutboundBuffer().readable()) { return null; } try { - return lastOutboundBuffer.readBytes(lastOutboundBuffer.readableBytes()); + return lastOutboundBuffer().readBytes(lastOutboundBuffer().readableBytes()); } finally { - lastOutboundBuffer.clear(); + lastOutboundBuffer().clear(); } } @@ -59,9 +57,9 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel { @Override protected void doFlushByteBuffer(ChannelBuffer buf) throws Exception { - if (!lastOutboundBuffer.readable()) { - lastOutboundBuffer.discardReadBytes(); + if (!lastOutboundBuffer().readable()) { + lastOutboundBuffer().discardReadBytes(); } - lastOutboundBuffer.writeBytes(buf); + lastOutboundBuffer().writeBytes(buf); } }