diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java index a28be77c2c..047f95c2de 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java @@ -154,6 +154,7 @@ public class Http2FrameCodecTest { assertEquals(expected, inboundData); assertEquals(1, inboundData.refCnt()); expected.release(); + inboundData.release(); assertNull(inboundHandler.readInbound()); inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).streamId(stream.id())); @@ -198,25 +199,25 @@ public class Http2FrameCodecTest { assertNotNull(stream); assertEquals(State.OPEN, stream.state()); - Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(activeEvent); assertEquals(stream.id(), activeEvent.streamId()); Http2HeadersFrame expectedHeaders = new DefaultHttp2HeadersFrame(request, false, 31).streamId(stream.id()); - Http2HeadersFrame actualHeaders = inboundHandler.readInboundMessagesAndEvents(); + Http2HeadersFrame actualHeaders = inboundHandler.readInboundMessageOrUserEvent(); assertEquals(expectedHeaders, actualHeaders); frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.NO_ERROR.code()); Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).streamId(stream.id()); - Http2ResetFrame actualRst = inboundHandler.readInboundMessagesAndEvents(); + Http2ResetFrame actualRst = inboundHandler.readInboundMessageOrUserEvent(); assertEquals(expectedRst, actualRst); - Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(closedEvent); assertEquals(stream.id(), closedEvent.streamId()); - assertNull(inboundHandler.readInboundMessagesAndEvents()); + assertNull(inboundHandler.readInboundMessageOrUserEvent()); } @Test @@ -370,14 +371,14 @@ public class Http2FrameCodecTest { Http2Stream stream = framingCodec.connectionHandler().connection().stream(3); assertNotNull(stream); - Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(activeEvent); assertEquals(stream.id(), activeEvent.streamId()); StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo"); framingCodec.connectionHandler().onError(http2HandlerCtx, streamEx); - Http2HeadersFrame headersFrame = inboundHandler.readInboundMessagesAndEvents(); + Http2HeadersFrame headersFrame = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(headersFrame); try { @@ -387,11 +388,11 @@ public class Http2FrameCodecTest { assertEquals(streamEx, e); } - Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(closedEvent); assertEquals(stream.id(), closedEvent.streamId()); - assertNull(inboundHandler.readInboundMessagesAndEvents()); + assertNull(inboundHandler.readInboundMessageOrUserEvent()); } @Test diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java index 580392faf2..9a7947030e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java @@ -23,8 +23,8 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.PlatformDependent; -import java.util.ArrayDeque; -import java.util.Queue; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.locks.LockSupport; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -33,9 +33,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; * Channel handler that allows to easily access inbound messages. */ public class LastInboundHandler extends ChannelDuplexHandler { - private final Queue inboundMessages = new ArrayDeque(); - private final Queue userEvents = new ArrayDeque(); - private final Queue inboundMessagesAndUserEvents = new ArrayDeque(); + private final List queue = new ArrayList(); private Throwable lastException; private ChannelHandlerContext ctx; private boolean channelActive; @@ -69,14 +67,12 @@ public class LastInboundHandler extends ChannelDuplexHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundMessages.add(msg); - inboundMessagesAndUserEvents.add(msg); + queue.add(msg); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - userEvents.add(evt); - inboundMessagesAndUserEvents.add(evt); + queue.add(new UserEvent(evt)); } @Override @@ -99,11 +95,15 @@ public class LastInboundHandler extends ChannelDuplexHandler { @SuppressWarnings("unchecked") public T readInbound() { - T message = (T) inboundMessages.poll(); - if (message == inboundMessagesAndUserEvents.peek()) { - inboundMessagesAndUserEvents.poll(); + for (int i = 0; i < queue.size(); i++) { + Object o = queue.get(i); + if (!(o instanceof UserEvent)) { + queue.remove(i); + return (T) o; + } } - return message; + + return null; } public T blockingReadInbound() { @@ -116,27 +116,30 @@ public class LastInboundHandler extends ChannelDuplexHandler { @SuppressWarnings("unchecked") public T readUserEvent() { - T message = (T) userEvents.poll(); - if (message == inboundMessagesAndUserEvents.peek()) { - inboundMessagesAndUserEvents.poll(); + for (int i = 0; i < queue.size(); i++) { + Object o = queue.get(i); + if (o instanceof UserEvent) { + queue.remove(i); + return (T) ((UserEvent) o).evt; + } } - return message; + + return null; } /** * Useful to test order of events and messages. */ @SuppressWarnings("unchecked") - public T readInboundMessagesAndEvents() { - T message = (T) inboundMessagesAndUserEvents.poll(); - - if (message == inboundMessages.peek()) { - inboundMessages.poll(); - } else if (message == userEvents.peek()) { - userEvents.poll(); + public T readInboundMessageOrUserEvent() { + if (queue.isEmpty()) { + return null; } - - return message; + Object o = queue.remove(0); + if (o instanceof UserEvent) { + return (T) ((UserEvent) o).evt; + } + return (T) o; } public void writeOutbound(Object... msgs) throws Exception { @@ -153,7 +156,7 @@ public class LastInboundHandler extends ChannelDuplexHandler { public void finishAndReleaseAll() throws Exception { checkException(); Object o; - while ((o = readInboundMessagesAndEvents()) != null) { + while ((o = readInboundMessageOrUserEvent()) != null) { ReferenceCountUtil.release(o); } } @@ -161,4 +164,12 @@ public class LastInboundHandler extends ChannelDuplexHandler { public Channel channel() { return ctx.channel(); } + + private static final class UserEvent { + private final Object evt; + + UserEvent(Object evt) { + this.evt = evt; + } + } }