Ported all tests in netty-codec to the new embedder

This commit is contained in:
Trustin Lee 2012-06-07 20:17:26 +09:00
parent 7bc10f2eba
commit 3442ff90e8
11 changed files with 107 additions and 92 deletions

View File

@ -18,7 +18,7 @@ package io.netty.handler.codec.bytes;
import static io.netty.buffer.ChannelBuffers.*; import static io.netty.buffer.ChannelBuffers.*;
import static org.hamcrest.core.Is.*; import static org.hamcrest.core.Is.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.channel.embedded.EmbeddedMessageChannel;
import java.util.Random; import java.util.Random;
@ -29,32 +29,32 @@ import org.junit.Test;
*/ */
public class ByteArrayDecoderTest { public class ByteArrayDecoderTest {
private DecoderEmbedder<byte[]> embedder; private EmbeddedMessageChannel ch;
@Before @Before
public void setUp() { public void setUp() {
embedder = new DecoderEmbedder<byte[]>(new ByteArrayDecoder()); ch = new EmbeddedMessageChannel(new ByteArrayDecoder());
} }
@Test @Test
public void testDecode() { public void testDecode() {
byte[] b = new byte[2048]; byte[] b = new byte[2048];
new Random().nextBytes(b); new Random().nextBytes(b);
embedder.offer(wrappedBuffer(b)); ch.writeInbound(wrappedBuffer(b));
assertThat(embedder.poll(), is(b)); assertThat((byte[]) ch.readInbound(), is(b));
} }
@Test @Test
public void testDecodeEmpty() { public void testDecodeEmpty() {
byte[] b = new byte[0]; byte[] b = new byte[0];
embedder.offer(wrappedBuffer(b)); ch.writeInbound(wrappedBuffer(b));
assertThat(embedder.poll(), is(b)); assertThat((byte[]) ch.readInbound(), is(b));
} }
@Test @Test
public void testDecodeOtherType() { public void testDecodeOtherType() {
String str = "Meep!"; String str = "Meep!";
embedder.offer(str); ch.writeInbound(str);
assertThat(embedder.poll(), is((Object) str)); assertThat(ch.readInbound(), is((Object) str));
} }
} }

View File

@ -20,7 +20,7 @@ import static org.hamcrest.core.Is.*;
import static org.hamcrest.core.IsNull.*; import static org.hamcrest.core.IsNull.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.embedder.EncoderEmbedder; import io.netty.channel.embedded.EmbeddedMessageChannel;
import java.util.Random; import java.util.Random;
@ -31,32 +31,32 @@ import org.junit.Test;
*/ */
public class ByteArrayEncoderTest { public class ByteArrayEncoderTest {
private EncoderEmbedder<ChannelBuffer> embedder; private EmbeddedMessageChannel ch;
@Before @Before
public void setUp() { public void setUp() {
embedder = new EncoderEmbedder<ChannelBuffer>(new ByteArrayEncoder()); ch = new EmbeddedMessageChannel(new ByteArrayEncoder());
} }
@Test @Test
public void testEncode() { public void testEncode() {
byte[] b = new byte[2048]; byte[] b = new byte[2048];
new Random().nextBytes(b); new Random().nextBytes(b);
embedder.offer(b); ch.writeOutbound(b);
assertThat(embedder.poll(), is(wrappedBuffer(b))); assertThat((ChannelBuffer) ch.readOutbound(), is(wrappedBuffer(b)));
} }
@Test @Test
public void testEncodeEmpty() { public void testEncodeEmpty() {
byte[] b = new byte[0]; byte[] b = new byte[0];
embedder.offer(b); ch.writeOutbound(b);
assertThat(embedder.poll(), nullValue()); assertThat(ch.readOutbound(), nullValue());
} }
@Test @Test
public void testEncodeOtherType() { public void testEncodeOtherType() {
String str = "Meep!"; String str = "Meep!";
embedder.offer(str); ch.writeOutbound(str);
assertThat(embedder.poll(), is((Object) str)); assertThat(ch.readOutbound(), is((Object) str));
} }
} }

View File

@ -18,54 +18,53 @@ package io.netty.handler.codec.frame;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class DelimiterBasedFrameDecoderTest { public class DelimiterBasedFrameDecoderTest {
@Test @Test
public void testFailSlowTooLongFrameRecovery() throws Exception { public void testFailSlowTooLongFrameRecovery() throws Exception {
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>( EmbeddedStreamChannel ch = new EmbeddedStreamChannel(
new DelimiterBasedFrameDecoder(1, true, false, Delimiters.nulDelimiter())); new DelimiterBasedFrameDecoder(1, true, false, Delimiters.nulDelimiter()));
for (int i = 0; i < 2; i ++) { for (int i = 0; i < 2; i ++) {
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 })); ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 }));
try { try {
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0 }))); assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0 })));
embedder.poll();
Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); Assert.fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) { } catch (TooLongFrameException e) {
// Expected // Expected
} }
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 0 })); ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 0 }));
ChannelBuffer buf = embedder.poll(); ChannelBuffer buf = (ChannelBuffer) ch.readInbound();
Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
} }
} }
@Test @Test
public void testFailFastTooLongFrameRecovery() throws Exception { public void testFailFastTooLongFrameRecovery() throws Exception {
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>( EmbeddedStreamChannel ch = new EmbeddedStreamChannel(
new DelimiterBasedFrameDecoder(1, Delimiters.nulDelimiter())); new DelimiterBasedFrameDecoder(1, Delimiters.nulDelimiter()));
for (int i = 0; i < 2; i ++) { for (int i = 0; i < 2; i ++) {
try { try {
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 }))); assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 1, 2 })));
embedder.poll();
Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); Assert.fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) { } catch (TooLongFrameException e) {
// Expected // Expected
} }
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 'A', 0 })); ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 'A', 0 }));
ChannelBuffer buf = embedder.poll(); ChannelBuffer buf = (ChannelBuffer) ch.readInbound();
Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
} }
} }

View File

@ -18,10 +18,10 @@ package io.netty.handler.codec.frame;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import org.junit.Assert; import org.junit.Assert;
@ -30,41 +30,39 @@ import org.junit.Test;
public class LengthFieldBasedFrameDecoderTest { public class LengthFieldBasedFrameDecoderTest {
@Test @Test
public void testFailSlowTooLongFrameRecovery() throws Exception { public void testFailSlowTooLongFrameRecovery() throws Exception {
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>( EmbeddedStreamChannel ch = new EmbeddedStreamChannel(
new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false)); new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4, false));
for (int i = 0; i < 2; i ++) { for (int i = 0; i < 2; i ++) {
assertFalse(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); assertFalse(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
try { try {
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 }))); assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0 })));
embedder.poll();
Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); Assert.fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) { } catch (TooLongFrameException e) {
// Expected // Expected
} }
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' })); ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 1, 'A' }));
ChannelBuffer buf = embedder.poll(); ChannelBuffer buf = (ChannelBuffer) ch.readInbound();
Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
} }
} }
@Test @Test
public void testFailFastTooLongFrameRecovery() throws Exception { public void testFailFastTooLongFrameRecovery() throws Exception {
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>( EmbeddedStreamChannel ch = new EmbeddedStreamChannel(
new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4)); new LengthFieldBasedFrameDecoder(5, 0, 4, 0, 4));
for (int i = 0; i < 2; i ++) { for (int i = 0; i < 2; i ++) {
try { try {
assertTrue(embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 }))); assertTrue(ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 2 })));
embedder.poll();
Assert.fail(DecoderException.class.getSimpleName() + " must be raised."); Assert.fail(DecoderException.class.getSimpleName() + " must be raised.");
} catch (TooLongFrameException e) { } catch (TooLongFrameException e) {
// Expected // Expected
} }
embedder.offer(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' })); ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 0, 0, 0, 0, 0, 1, 'A' }));
ChannelBuffer buf = embedder.poll(); ChannelBuffer buf = (ChannelBuffer) ch.readInbound();
Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1)); Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
} }
} }

View File

@ -19,9 +19,9 @@ import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.CodecException; import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -42,7 +42,7 @@ public abstract class AbstractCompatibleMarshallingDecoderTest {
MarshallerFactory marshallerFactory = createMarshallerFactory(); MarshallerFactory marshallerFactory = createMarshallerFactory();
MarshallingConfiguration configuration = createMarshallingConfig(); MarshallingConfiguration configuration = createMarshallingConfig();
DecoderEmbedder<Object> decoder = new DecoderEmbedder<Object>(createDecoder(Integer.MAX_VALUE)); EmbeddedStreamChannel ch = new EmbeddedStreamChannel(createDecoder(Integer.MAX_VALUE));
ByteArrayOutputStream bout = new ByteArrayOutputStream(); ByteArrayOutputStream bout = new ByteArrayOutputStream();
Marshaller marshaller = marshallerFactory.createMarshaller(configuration); Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
@ -53,14 +53,14 @@ public abstract class AbstractCompatibleMarshallingDecoderTest {
byte[] testBytes = bout.toByteArray(); byte[] testBytes = bout.toByteArray();
decoder.offer(input(testBytes)); ch.writeInbound(input(testBytes));
assertTrue(decoder.finish()); assertTrue(ch.finish());
String unmarshalled = (String) decoder.poll(); String unmarshalled = (String) ch.readInbound();
Assert.assertEquals(testObject, unmarshalled); Assert.assertEquals(testObject, unmarshalled);
Assert.assertNull(decoder.poll()); Assert.assertNull(ch.readInbound());
} }
protected ChannelBuffer input(byte[] input) { protected ChannelBuffer input(byte[] input) {
@ -72,7 +72,7 @@ public abstract class AbstractCompatibleMarshallingDecoderTest {
MarshallerFactory marshallerFactory = createMarshallerFactory(); MarshallerFactory marshallerFactory = createMarshallerFactory();
MarshallingConfiguration configuration = createMarshallingConfig(); MarshallingConfiguration configuration = createMarshallingConfig();
DecoderEmbedder<Object> decoder = new DecoderEmbedder<Object>(createDecoder(Integer.MAX_VALUE)); EmbeddedStreamChannel ch = new EmbeddedStreamChannel(createDecoder(Integer.MAX_VALUE));
ByteArrayOutputStream bout = new ByteArrayOutputStream(); ByteArrayOutputStream bout = new ByteArrayOutputStream();
Marshaller marshaller = marshallerFactory.createMarshaller(configuration); Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
@ -86,16 +86,15 @@ public abstract class AbstractCompatibleMarshallingDecoderTest {
ChannelBuffer buffer = input(testBytes); ChannelBuffer buffer = input(testBytes);
ChannelBuffer slice = buffer.readSlice(2); ChannelBuffer slice = buffer.readSlice(2);
decoder.offer(slice); ch.writeInbound(slice);
decoder.offer(buffer); ch.writeInbound(buffer);
assertTrue(decoder.finish()); assertTrue(ch.finish());
String unmarshalled = (String) ch.readInbound();
String unmarshalled = (String) decoder.poll();
Assert.assertEquals(testObject, unmarshalled); Assert.assertEquals(testObject, unmarshalled);
Assert.assertNull(decoder.poll()); Assert.assertNull(ch.readInbound());
} }
@Test @Test
@ -104,7 +103,7 @@ public abstract class AbstractCompatibleMarshallingDecoderTest {
MarshallingConfiguration configuration = createMarshallingConfig(); MarshallingConfiguration configuration = createMarshallingConfig();
ChannelHandler mDecoder = createDecoder(4); ChannelHandler mDecoder = createDecoder(4);
DecoderEmbedder<Object> decoder = new DecoderEmbedder<Object>(mDecoder); EmbeddedStreamChannel ch = new EmbeddedStreamChannel(mDecoder);
ByteArrayOutputStream bout = new ByteArrayOutputStream(); ByteArrayOutputStream bout = new ByteArrayOutputStream();
Marshaller marshaller = marshallerFactory.createMarshaller(configuration); Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
@ -114,10 +113,8 @@ public abstract class AbstractCompatibleMarshallingDecoderTest {
marshaller.close(); marshaller.close();
byte[] testBytes = bout.toByteArray(); byte[] testBytes = bout.toByteArray();
decoder.offer(input(testBytes));
try { try {
decoder.poll(); ch.writeInbound(input(testBytes));
fail(); fail();
} catch (CodecException e) { } catch (CodecException e) {
assertEquals(TooLongFrameException.class, e.getClass()); assertEquals(TooLongFrameException.class, e.getClass());

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec.marshalling;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.embedder.EncoderEmbedder; import io.netty.channel.embedded.EmbeddedStreamChannel;
import java.io.IOException; import java.io.IOException;
@ -38,12 +38,12 @@ public abstract class AbstractCompatibleMarshallingEncoderTest {
final MarshallerFactory marshallerFactory = createMarshallerFactory(); final MarshallerFactory marshallerFactory = createMarshallerFactory();
final MarshallingConfiguration configuration = createMarshallingConfig(); final MarshallingConfiguration configuration = createMarshallingConfig();
EncoderEmbedder<ChannelBuffer> encoder = new EncoderEmbedder<ChannelBuffer>(createEncoder()); EmbeddedStreamChannel ch = new EmbeddedStreamChannel(createEncoder());
encoder.offer(testObject); ch.writeOutbound(testObject);
Assert.assertTrue(encoder.finish()); Assert.assertTrue(ch.finish());
ChannelBuffer buffer = encoder.poll(); ChannelBuffer buffer = ch.readOutbound();
Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration); Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
unmarshaller.start(Marshalling.createByteInput(truncate(buffer).nioBuffer())); unmarshaller.start(Marshalling.createByteInput(truncate(buffer).nioBuffer()));
@ -52,7 +52,7 @@ public abstract class AbstractCompatibleMarshallingEncoderTest {
Assert.assertEquals(-1, unmarshaller.read()); Assert.assertEquals(-1, unmarshaller.read());
Assert.assertNull(encoder.poll()); Assert.assertNull(ch.readOutbound());
unmarshaller.finish(); unmarshaller.finish();
unmarshaller.close(); unmarshaller.close();

View File

@ -20,30 +20,30 @@ import static org.hamcrest.core.Is.*;
import static org.hamcrest.core.IsNull.*; import static org.hamcrest.core.IsNull.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.embedder.DecoderEmbedder; import io.netty.channel.embedded.EmbeddedStreamChannel;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class ProtobufVarint32FrameDecoderTest { public class ProtobufVarint32FrameDecoderTest {
private DecoderEmbedder<ChannelBuffer> embedder; private EmbeddedStreamChannel ch;
@Before @Before
public void setUp() { public void setUp() {
embedder = new DecoderEmbedder<ChannelBuffer>( ch = new EmbeddedStreamChannel(new ProtobufVarint32FrameDecoder());
new ProtobufVarint32FrameDecoder());
} }
@Test @Test
public void testTinyDecode() { public void testTinyDecode() {
byte[] b = { 4, 1, 1, 1, 1 }; byte[] b = { 4, 1, 1, 1, 1 };
embedder.offer(wrappedBuffer(b, 0, 1)); ch.writeInbound(wrappedBuffer(b, 0, 1));
assertThat(embedder.poll(), is(nullValue())); assertThat(ch.readInbound(), is(nullValue()));
embedder.offer(wrappedBuffer(b, 1, 2)); ch.writeInbound(wrappedBuffer(b, 1, 2));
assertThat(embedder.poll(), is(nullValue())); assertThat(ch.readInbound(), is(nullValue()));
embedder.offer(wrappedBuffer(b, 3, b.length - 3)); ch.writeInbound(wrappedBuffer(b, 3, b.length - 3));
assertThat(embedder.poll(), assertThat(
(ChannelBuffer) ch.readInbound(),
is(wrappedBuffer(new byte[] { 1, 1, 1, 1 }))); is(wrappedBuffer(new byte[] { 1, 1, 1, 1 })));
} }
@ -55,12 +55,11 @@ public class ProtobufVarint32FrameDecoderTest {
} }
b[0] = -2; b[0] = -2;
b[1] = 15; b[1] = 15;
embedder.offer(wrappedBuffer(b, 0, 127)); ch.writeInbound(wrappedBuffer(b, 0, 127));
assertThat(embedder.poll(), is(nullValue())); assertThat(ch.readInbound(), is(nullValue()));
embedder.offer(wrappedBuffer(b, 127, 600)); ch.writeInbound(wrappedBuffer(b, 127, 600));
assertThat(embedder.poll(), is(nullValue())); assertThat(ch.readInbound(), is(nullValue()));
embedder.offer(wrappedBuffer(b, 727, b.length - 727)); ch.writeInbound(wrappedBuffer(b, 727, b.length - 727));
assertThat(embedder.poll(), is(wrappedBuffer(b, 2, b.length - 2))); assertThat((ChannelBuffer) ch.readInbound(), is(wrappedBuffer(b, 2, b.length - 2)));
} }
} }

View File

@ -18,27 +18,25 @@ package io.netty.handler.codec.protobuf;
import static io.netty.buffer.ChannelBuffers.*; import static io.netty.buffer.ChannelBuffers.*;
import static org.hamcrest.core.Is.*; import static org.hamcrest.core.Is.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import io.netty.buffer.ChannelBuffer; import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class ProtobufVarint32LengthFieldPrependerTest { public class ProtobufVarint32LengthFieldPrependerTest {
private EncoderEmbedder<ChannelBuffer> embedder; private EmbeddedStreamChannel ch;
@Before @Before
public void setUp() { public void setUp() {
embedder = new EncoderEmbedder<ChannelBuffer>( ch = new EmbeddedStreamChannel(new ProtobufVarint32LengthFieldPrepender());
new ProtobufVarint32LengthFieldPrepender());
} }
@Test @Test
public void testTinyEncode() { public void testTinyEncode() {
byte[] b = { 4, 1, 1, 1, 1 }; byte[] b = { 4, 1, 1, 1, 1 };
embedder.offer(wrappedBuffer(b, 1, b.length - 1)); ch.writeOutbound(wrappedBuffer(b, 1, b.length - 1));
assertThat(embedder.poll(), is(wrappedBuffer(b))); assertThat(ch.readOutbound(), is(wrappedBuffer(b)));
} }
@Test @Test
@ -49,7 +47,7 @@ public class ProtobufVarint32LengthFieldPrependerTest {
} }
b[0] = -2; b[0] = -2;
b[1] = 15; b[1] = 15;
embedder.offer(wrappedBuffer(b, 2, b.length - 2)); ch.writeOutbound(wrappedBuffer(b, 2, b.length - 2));
assertThat(embedder.poll(), is(wrappedBuffer(b))); assertThat(ch.readOutbound(), is(wrappedBuffer(b)));
} }
} }

View File

@ -31,6 +31,8 @@ import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -38,6 +40,8 @@ import java.util.Queue;
public abstract class AbstractEmbeddedChannel extends AbstractChannel { public abstract class AbstractEmbeddedChannel extends AbstractChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEmbeddedChannel.class);
private final EmbeddedEventLoop loop = new EmbeddedEventLoop(); private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
private final ChannelConfig config = new DefaultChannelConfig(); private final ChannelConfig config = new DefaultChannelConfig();
private final SocketAddress localAddress = new EmbeddedSocketAddress(); private final SocketAddress localAddress = new EmbeddedSocketAddress();
@ -205,7 +209,13 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception { throws Exception {
if (lastException == null) {
lastException = cause; lastException = cause;
} else {
logger.warn(
"More than one exception was raised. " +
"Will report only the first one and log others.", cause);
}
} }
} }

View File

@ -45,6 +45,13 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
return !lastOutboundBuffer().isEmpty(); return !lastOutboundBuffer().isEmpty();
} }
public boolean finish() {
close();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() ||
!lastOutboundBuffer().isEmpty();
}
@Override @Override
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception { protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception {
for (;;) { for (;;) {

View File

@ -51,6 +51,13 @@ public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
return lastOutboundBuffer().readable(); return lastOutboundBuffer().readable();
} }
public boolean finish() {
close();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty() ||
lastOutboundBuffer().readable();
}
@Override @Override
protected void doFlushByteBuffer(ChannelBuffer buf) throws Exception { protected void doFlushByteBuffer(ChannelBuffer buf) throws Exception {
if (!lastOutboundBuffer.readable()) { if (!lastOutboundBuffer.readable()) {