From e5070b726613cc6c1297ce2cc2dd5ab377cf0fbc Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 18 Nov 2016 11:39:21 +0000 Subject: [PATCH] [#6023] Ensure LastInboundHandler correctly handle different events / data Motivation: LastInboundHandler maintains 2 queues which may contain the same data and tries to match these up when you read elements out of it. Because of this it can happen that you remove an element only out of one queue and so double free stuff later. Modifications: Just use one "queue" to store things. Result: Not possible to only remove things from one queue and so get into trouble later when release everything that sits in the handler. --- .../codec/http2/Http2FrameCodecTest.java | 19 +++--- .../codec/http2/LastInboundHandler.java | 65 +++++++++++-------- 2 files changed, 48 insertions(+), 36 deletions(-) diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java index a28be77c2c..047f95c2de 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java @@ -154,6 +154,7 @@ public class Http2FrameCodecTest { assertEquals(expected, inboundData); assertEquals(1, inboundData.refCnt()); expected.release(); + inboundData.release(); assertNull(inboundHandler.readInbound()); inboundHandler.writeOutbound(new DefaultHttp2HeadersFrame(response, false).streamId(stream.id())); @@ -198,25 +199,25 @@ public class Http2FrameCodecTest { assertNotNull(stream); assertEquals(State.OPEN, stream.state()); - Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(activeEvent); assertEquals(stream.id(), activeEvent.streamId()); Http2HeadersFrame expectedHeaders = new DefaultHttp2HeadersFrame(request, false, 31).streamId(stream.id()); - Http2HeadersFrame actualHeaders = inboundHandler.readInboundMessagesAndEvents(); + Http2HeadersFrame actualHeaders = inboundHandler.readInboundMessageOrUserEvent(); assertEquals(expectedHeaders, actualHeaders); frameListener.onRstStreamRead(http2HandlerCtx, 3, Http2Error.NO_ERROR.code()); Http2ResetFrame expectedRst = new DefaultHttp2ResetFrame(Http2Error.NO_ERROR).streamId(stream.id()); - Http2ResetFrame actualRst = inboundHandler.readInboundMessagesAndEvents(); + Http2ResetFrame actualRst = inboundHandler.readInboundMessageOrUserEvent(); assertEquals(expectedRst, actualRst); - Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(closedEvent); assertEquals(stream.id(), closedEvent.streamId()); - assertNull(inboundHandler.readInboundMessagesAndEvents()); + assertNull(inboundHandler.readInboundMessageOrUserEvent()); } @Test @@ -370,14 +371,14 @@ public class Http2FrameCodecTest { Http2Stream stream = framingCodec.connectionHandler().connection().stream(3); assertNotNull(stream); - Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamActiveEvent activeEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(activeEvent); assertEquals(stream.id(), activeEvent.streamId()); StreamException streamEx = new StreamException(3, Http2Error.INTERNAL_ERROR, "foo"); framingCodec.connectionHandler().onError(http2HandlerCtx, streamEx); - Http2HeadersFrame headersFrame = inboundHandler.readInboundMessagesAndEvents(); + Http2HeadersFrame headersFrame = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(headersFrame); try { @@ -387,11 +388,11 @@ public class Http2FrameCodecTest { assertEquals(streamEx, e); } - Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessagesAndEvents(); + Http2StreamClosedEvent closedEvent = inboundHandler.readInboundMessageOrUserEvent(); assertNotNull(closedEvent); assertEquals(stream.id(), closedEvent.streamId()); - assertNull(inboundHandler.readInboundMessagesAndEvents()); + assertNull(inboundHandler.readInboundMessageOrUserEvent()); } @Test diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java index 580392faf2..9a7947030e 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/LastInboundHandler.java @@ -23,8 +23,8 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.PlatformDependent; -import java.util.ArrayDeque; -import java.util.Queue; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.locks.LockSupport; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -33,9 +33,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; * Channel handler that allows to easily access inbound messages. */ public class LastInboundHandler extends ChannelDuplexHandler { - private final Queue inboundMessages = new ArrayDeque(); - private final Queue userEvents = new ArrayDeque(); - private final Queue inboundMessagesAndUserEvents = new ArrayDeque(); + private final List queue = new ArrayList(); private Throwable lastException; private ChannelHandlerContext ctx; private boolean channelActive; @@ -69,14 +67,12 @@ public class LastInboundHandler extends ChannelDuplexHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundMessages.add(msg); - inboundMessagesAndUserEvents.add(msg); + queue.add(msg); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - userEvents.add(evt); - inboundMessagesAndUserEvents.add(evt); + queue.add(new UserEvent(evt)); } @Override @@ -99,11 +95,15 @@ public class LastInboundHandler extends ChannelDuplexHandler { @SuppressWarnings("unchecked") public T readInbound() { - T message = (T) inboundMessages.poll(); - if (message == inboundMessagesAndUserEvents.peek()) { - inboundMessagesAndUserEvents.poll(); + for (int i = 0; i < queue.size(); i++) { + Object o = queue.get(i); + if (!(o instanceof UserEvent)) { + queue.remove(i); + return (T) o; + } } - return message; + + return null; } public T blockingReadInbound() { @@ -116,27 +116,30 @@ public class LastInboundHandler extends ChannelDuplexHandler { @SuppressWarnings("unchecked") public T readUserEvent() { - T message = (T) userEvents.poll(); - if (message == inboundMessagesAndUserEvents.peek()) { - inboundMessagesAndUserEvents.poll(); + for (int i = 0; i < queue.size(); i++) { + Object o = queue.get(i); + if (o instanceof UserEvent) { + queue.remove(i); + return (T) ((UserEvent) o).evt; + } } - return message; + + return null; } /** * Useful to test order of events and messages. */ @SuppressWarnings("unchecked") - public T readInboundMessagesAndEvents() { - T message = (T) inboundMessagesAndUserEvents.poll(); - - if (message == inboundMessages.peek()) { - inboundMessages.poll(); - } else if (message == userEvents.peek()) { - userEvents.poll(); + public T readInboundMessageOrUserEvent() { + if (queue.isEmpty()) { + return null; } - - return message; + Object o = queue.remove(0); + if (o instanceof UserEvent) { + return (T) ((UserEvent) o).evt; + } + return (T) o; } public void writeOutbound(Object... msgs) throws Exception { @@ -153,7 +156,7 @@ public class LastInboundHandler extends ChannelDuplexHandler { public void finishAndReleaseAll() throws Exception { checkException(); Object o; - while ((o = readInboundMessagesAndEvents()) != null) { + while ((o = readInboundMessageOrUserEvent()) != null) { ReferenceCountUtil.release(o); } } @@ -161,4 +164,12 @@ public class LastInboundHandler extends ChannelDuplexHandler { public Channel channel() { return ctx.channel(); } + + private static final class UserEvent { + private final Object evt; + + UserEvent(Object evt) { + this.evt = evt; + } + } }