diff --git a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java index 8f374b2a57..ef3791f806 100644 --- a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayDecoderTest.java @@ -18,7 +18,7 @@ package io.netty.handler.codec.bytes; import static io.netty.buffer.ChannelBuffers.*; import static org.hamcrest.core.Is.*; import static org.junit.Assert.*; -import io.netty.handler.codec.embedder.DecoderEmbedder; +import io.netty.channel.embedded.EmbeddedMessageChannel; import java.util.Random; @@ -29,32 +29,32 @@ import org.junit.Test; */ public class ByteArrayDecoderTest { - private DecoderEmbedder embedder; + private EmbeddedMessageChannel ch; @Before public void setUp() { - embedder = new DecoderEmbedder(new ByteArrayDecoder()); + ch = new EmbeddedMessageChannel(new ByteArrayDecoder()); } @Test public void testDecode() { byte[] b = new byte[2048]; new Random().nextBytes(b); - embedder.offer(wrappedBuffer(b)); - assertThat(embedder.poll(), is(b)); + ch.writeInbound(wrappedBuffer(b)); + assertThat((byte[]) ch.readInbound(), is(b)); } @Test public void testDecodeEmpty() { byte[] b = new byte[0]; - embedder.offer(wrappedBuffer(b)); - assertThat(embedder.poll(), is(b)); + ch.writeInbound(wrappedBuffer(b)); + assertThat((byte[]) ch.readInbound(), is(b)); } @Test public void testDecodeOtherType() { String str = "Meep!"; - embedder.offer(str); - assertThat(embedder.poll(), is((Object) str)); + ch.writeInbound(str); + assertThat(ch.readInbound(), is((Object) str)); } } diff --git a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java index 85139a82d4..03307e10b8 100644 --- a/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/bytes/ByteArrayEncoderTest.java @@ -20,7 +20,7 @@ import static org.hamcrest.core.Is.*; import static org.hamcrest.core.IsNull.*; import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; -import io.netty.handler.codec.embedder.EncoderEmbedder; +import io.netty.channel.embedded.EmbeddedMessageChannel; import java.util.Random; @@ -31,32 +31,32 @@ import org.junit.Test; */ public class ByteArrayEncoderTest { - private EncoderEmbedder embedder; + private EmbeddedMessageChannel ch; @Before public void setUp() { - embedder = new EncoderEmbedder(new ByteArrayEncoder()); + ch = new EmbeddedMessageChannel(new ByteArrayEncoder()); } @Test public void testEncode() { byte[] b = new byte[2048]; new Random().nextBytes(b); - embedder.offer(b); - assertThat(embedder.poll(), is(wrappedBuffer(b))); + ch.writeOutbound(b); + assertThat((ChannelBuffer) ch.readOutbound(), is(wrappedBuffer(b))); } @Test public void testEncodeEmpty() { byte[] b = new byte[0]; - embedder.offer(b); - assertThat(embedder.poll(), nullValue()); + ch.writeOutbound(b); + assertThat(ch.readOutbound(), nullValue()); } @Test public void testEncodeOtherType() { String str = "Meep!"; - embedder.offer(str); - assertThat(embedder.poll(), is((Object) str)); + ch.writeOutbound(str); + assertThat(ch.readOutbound(), is((Object) str)); } } diff --git a/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java index 2a3febad9b..09c51e8227 100644 --- a/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/frame/DelimiterBasedFrameDecoderTest.java @@ -18,54 +18,53 @@ package io.netty.handler.codec.frame; import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; +import io.netty.channel.embedded.EmbeddedStreamChannel; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.util.CharsetUtil; import org.junit.Assert; import org.junit.Test; public class DelimiterBasedFrameDecoderTest { + @Test public void testFailSlowTooLongFrameRecovery() throws Exception { - DecoderEmbedder embedder = new DecoderEmbedder( + EmbeddedStreamChannel ch = new EmbeddedStreamChannel( new DelimiterBasedFrameDecoder(1, true, false, Delimiters.nulDelimiter())); for (int i = 0; i < 2; i ++) { - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 })); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 })); try { - assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 }))); - embedder.poll(); + assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0 }))); Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); } catch (TooLongFrameException e) { // Expected } - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 0 })); - ChannelBuffer buf = embedder.poll(); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 0 })); + ChannelBuffer buf = (ChannelBuffer) ch.readInbound(); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); } } @Test public void testFailFastTooLongFrameRecovery() throws Exception { - DecoderEmbedder embedder = new DecoderEmbedder( + EmbeddedStreamChannel ch = new EmbeddedStreamChannel( new DelimiterBasedFrameDecoder(1, Delimiters.nulDelimiter())); for (int i = 0; i < 2; i ++) { try { - assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 }))); - embedder.poll(); + assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 }))); Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); } catch (TooLongFrameException e) { // Expected } - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 'A', 0 })); - ChannelBuffer buf = embedder.poll(); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 'A', 0 })); + ChannelBuffer buf = (ChannelBuffer) ch.readInbound(); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); } } diff --git a/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java index c26fcdc335..8b20b44178 100644 --- a/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/frame/LengthFieldBasedFrameDecoderTest.java @@ -18,10 +18,10 @@ package io.netty.handler.codec.frame; import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; +import io.netty.channel.embedded.EmbeddedStreamChannel; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.util.CharsetUtil; import org.junit.Assert; @@ -30,41 +30,39 @@ import org.junit.Test; public class LengthFieldBasedFrameDecoderTest { @Test public void testFailSlowTooLongFrameRecovery() throws Exception { - DecoderEmbedder embedder = new DecoderEmbedder( + EmbeddedStreamChannel ch = new EmbeddedStreamChannel( new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false)); for (int i = 0; i < 2; i ++) { - assertFalse(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); + assertFalse(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); try { - assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 }))); - embedder.poll(); + assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 }))); Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); } catch (TooLongFrameException e) { // Expected } - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' })); - ChannelBuffer buf = embedder.poll(); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' })); + ChannelBuffer buf = (ChannelBuffer) ch.readInbound(); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); } } @Test public void testFailFastTooLongFrameRecovery() throws Exception { - DecoderEmbedder embedder = new DecoderEmbedder( + EmbeddedStreamChannel ch = new EmbeddedStreamChannel( new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4)); for (int i = 0; i < 2; i ++) { try { - assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); - embedder.poll(); + assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); } catch (TooLongFrameException e) { // Expected } - embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' })); - ChannelBuffer buf = embedder.poll(); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' })); + ChannelBuffer buf = (ChannelBuffer) ch.readInbound(); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); } } diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java index 515c8950a7..3e1a5b70b6 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingDecoderTest.java @@ -19,9 +19,9 @@ import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.ChannelHandler; +import io.netty.channel.embedded.EmbeddedStreamChannel; import io.netty.handler.codec.CodecException; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.embedder.DecoderEmbedder; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -42,7 +42,7 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { MarshallerFactory marshallerFactory = createMarshallerFactory(); MarshallingConfiguration configuration = createMarshallingConfig(); - DecoderEmbedder decoder = new DecoderEmbedder(createDecoder(Integer.MAX_VALUE)); + EmbeddedStreamChannel ch = new EmbeddedStreamChannel(createDecoder(Integer.MAX_VALUE)); ByteArrayOutputStream bout = new ByteArrayOutputStream(); Marshaller marshaller = marshallerFactory.createMarshaller(configuration); @@ -53,14 +53,14 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { byte[] testBytes = bout.toByteArray(); - decoder.offer(input(testBytes)); - assertTrue(decoder.finish()); + ch.writeInbound(input(testBytes)); + assertTrue(ch.finish()); - String unmarshalled = (String) decoder.poll(); + String unmarshalled = (String) ch.readInbound(); Assert.assertEquals(testObject, unmarshalled); - Assert.assertNull(decoder.poll()); + Assert.assertNull(ch.readInbound()); } protected ChannelBuffer input(byte[] input) { @@ -72,7 +72,7 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { MarshallerFactory marshallerFactory = createMarshallerFactory(); MarshallingConfiguration configuration = createMarshallingConfig(); - DecoderEmbedder decoder = new DecoderEmbedder(createDecoder(Integer.MAX_VALUE)); + EmbeddedStreamChannel ch = new EmbeddedStreamChannel(createDecoder(Integer.MAX_VALUE)); ByteArrayOutputStream bout = new ByteArrayOutputStream(); Marshaller marshaller = marshallerFactory.createMarshaller(configuration); @@ -86,16 +86,15 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { ChannelBuffer buffer = input(testBytes); ChannelBuffer slice = buffer.readSlice(2); - decoder.offer(slice); - decoder.offer(buffer); - assertTrue(decoder.finish()); + ch.writeInbound(slice); + ch.writeInbound(buffer); + assertTrue(ch.finish()); - - String unmarshalled = (String) decoder.poll(); + String unmarshalled = (String) ch.readInbound(); Assert.assertEquals(testObject, unmarshalled); - Assert.assertNull(decoder.poll()); + Assert.assertNull(ch.readInbound()); } @Test @@ -104,7 +103,7 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { MarshallingConfiguration configuration = createMarshallingConfig(); ChannelHandler mDecoder = createDecoder(4); - DecoderEmbedder decoder = new DecoderEmbedder(mDecoder); + EmbeddedStreamChannel ch = new EmbeddedStreamChannel(mDecoder); ByteArrayOutputStream bout = new ByteArrayOutputStream(); Marshaller marshaller = marshallerFactory.createMarshaller(configuration); @@ -114,10 +113,8 @@ public abstract class AbstractCompatibleMarshallingDecoderTest { marshaller.close(); byte[] testBytes = bout.toByteArray(); - - decoder.offer(input(testBytes)); try { - decoder.poll(); + ch.writeInbound(input(testBytes)); fail(); } catch (CodecException e) { assertEquals(TooLongFrameException.class, e.getClass()); diff --git a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java index e0e7dd37bc..fff310933b 100644 --- a/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/marshalling/AbstractCompatibleMarshallingEncoderTest.java @@ -17,7 +17,7 @@ package io.netty.handler.codec.marshalling; import io.netty.buffer.ChannelBuffer; import io.netty.channel.ChannelHandler; -import io.netty.handler.codec.embedder.EncoderEmbedder; +import io.netty.channel.embedded.EmbeddedStreamChannel; import java.io.IOException; @@ -38,12 +38,12 @@ public abstract class AbstractCompatibleMarshallingEncoderTest { final MarshallerFactory marshallerFactory = createMarshallerFactory(); final MarshallingConfiguration configuration = createMarshallingConfig(); - EncoderEmbedder encoder = new EncoderEmbedder(createEncoder()); + EmbeddedStreamChannel ch = new EmbeddedStreamChannel(createEncoder()); - encoder.offer(testObject); - Assert.assertTrue(encoder.finish()); + ch.writeOutbound(testObject); + Assert.assertTrue(ch.finish()); - ChannelBuffer buffer = encoder.poll(); + ChannelBuffer buffer = ch.readOutbound(); Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration); unmarshaller.start(Marshalling.createByteInput(truncate(buffer).nioBuffer())); @@ -52,7 +52,7 @@ public abstract class AbstractCompatibleMarshallingEncoderTest { Assert.assertEquals(-1, unmarshaller.read()); - Assert.assertNull(encoder.poll()); + Assert.assertNull(ch.readOutbound()); unmarshaller.finish(); unmarshaller.close(); diff --git a/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoderTest.java index b8a9e38191..93e503b979 100644 --- a/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoderTest.java @@ -20,30 +20,30 @@ import static org.hamcrest.core.Is.*; import static org.hamcrest.core.IsNull.*; import static org.junit.Assert.*; import io.netty.buffer.ChannelBuffer; -import io.netty.handler.codec.embedder.DecoderEmbedder; +import io.netty.channel.embedded.EmbeddedStreamChannel; import org.junit.Before; import org.junit.Test; public class ProtobufVarint32FrameDecoderTest { - private DecoderEmbedder embedder; + private EmbeddedStreamChannel ch; @Before public void setUp() { - embedder = new DecoderEmbedder( - new ProtobufVarint32FrameDecoder()); + ch = new EmbeddedStreamChannel(new ProtobufVarint32FrameDecoder()); } @Test public void testTinyDecode() { byte[] b = { 4, 1, 1, 1, 1 }; - embedder.offer(wrappedBuffer(b, 0, 1)); - assertThat(embedder.poll(), is(nullValue())); - embedder.offer(wrappedBuffer(b, 1, 2)); - assertThat(embedder.poll(), is(nullValue())); - embedder.offer(wrappedBuffer(b, 3, b.length - 3)); - assertThat(embedder.poll(), + ch.writeInbound(wrappedBuffer(b, 0, 1)); + assertThat(ch.readInbound(), is(nullValue())); + ch.writeInbound(wrappedBuffer(b, 1, 2)); + assertThat(ch.readInbound(), is(nullValue())); + ch.writeInbound(wrappedBuffer(b, 3, b.length - 3)); + assertThat( + (ChannelBuffer) ch.readInbound(), is(wrappedBuffer(new byte[] { 1, 1, 1, 1 }))); } @@ -55,12 +55,11 @@ public class ProtobufVarint32FrameDecoderTest { } b[0] = -2; b[1] = 15; - embedder.offer(wrappedBuffer(b, 0, 127)); - assertThat(embedder.poll(), is(nullValue())); - embedder.offer(wrappedBuffer(b, 127, 600)); - assertThat(embedder.poll(), is(nullValue())); - embedder.offer(wrappedBuffer(b, 727, b.length - 727)); - assertThat(embedder.poll(), is(wrappedBuffer(b, 2, b.length - 2))); + ch.writeInbound(wrappedBuffer(b, 0, 127)); + assertThat(ch.readInbound(), is(nullValue())); + ch.writeInbound(wrappedBuffer(b, 127, 600)); + assertThat(ch.readInbound(), is(nullValue())); + ch.writeInbound(wrappedBuffer(b, 727, b.length - 727)); + assertThat((ChannelBuffer) ch.readInbound(), is(wrappedBuffer(b, 2, b.length - 2))); } - } diff --git a/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrependerTest.java b/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrependerTest.java index 359a97a516..2e714fc455 100644 --- a/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrependerTest.java +++ b/codec/src/test/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrependerTest.java @@ -18,27 +18,25 @@ package io.netty.handler.codec.protobuf; import static io.netty.buffer.ChannelBuffers.*; import static org.hamcrest.core.Is.*; import static org.junit.Assert.*; -import io.netty.buffer.ChannelBuffer; -import io.netty.handler.codec.embedder.EncoderEmbedder; +import io.netty.channel.embedded.EmbeddedStreamChannel; import org.junit.Before; import org.junit.Test; public class ProtobufVarint32LengthFieldPrependerTest { - private EncoderEmbedder embedder; + private EmbeddedStreamChannel ch; @Before public void setUp() { - embedder = new EncoderEmbedder( - new ProtobufVarint32LengthFieldPrepender()); + ch = new EmbeddedStreamChannel(new ProtobufVarint32LengthFieldPrepender()); } @Test public void testTinyEncode() { byte[] b = { 4, 1, 1, 1, 1 }; - embedder.offer(wrappedBuffer(b, 1, b.length - 1)); - assertThat(embedder.poll(), is(wrappedBuffer(b))); + ch.writeOutbound(wrappedBuffer(b, 1, b.length - 1)); + assertThat(ch.readOutbound(), is(wrappedBuffer(b))); } @Test @@ -49,7 +47,7 @@ public class ProtobufVarint32LengthFieldPrependerTest { } b[0] = -2; b[1] = 15; - embedder.offer(wrappedBuffer(b, 2, b.length - 2)); - assertThat(embedder.poll(), is(wrappedBuffer(b))); + ch.writeOutbound(wrappedBuffer(b, 2, b.length - 2)); + assertThat(ch.readOutbound(), is(wrappedBuffer(b))); } } diff --git a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java index a6775b32bf..60bb9d86ed 100644 --- a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java @@ -31,6 +31,8 @@ import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.ArrayDeque; @@ -38,6 +40,8 @@ import java.util.Queue; public abstract class AbstractEmbeddedChannel extends AbstractChannel { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEmbeddedChannel.class); + private final EmbeddedEventLoop loop = new EmbeddedEventLoop(); private final ChannelConfig config = new DefaultChannelConfig(); private final SocketAddress localAddress = new EmbeddedSocketAddress(); @@ -205,7 +209,13 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - lastException = cause; + if (lastException == null) { + lastException = cause; + } else { + logger.warn( + "More than one exception was raised. " + + "Will report only the first one and log others.", cause); + } } } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java index d3e5ca5a0d..f642d1596f 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java @@ -45,6 +45,13 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { return !lastOutboundBuffer().isEmpty(); } + public boolean finish() { + close(); + checkException(); + return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() || + !lastOutboundBuffer().isEmpty(); + } + @Override protected void doFlushMessageBuffer(Queue buf) throws Exception { for (;;) { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java index 7cad3a69c8..089ad44293 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java @@ -51,6 +51,13 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel { return lastOutboundBuffer().readable(); } + public boolean finish() { + close(); + checkException(); + return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() || + lastOutboundBuffer().readable(); + } + @Override protected void doFlushByteBuffer(ChannelBuffer buf) throws Exception { if (!lastOutboundBuffer.readable()) {