Replace codec embedder with EmbeddedChannel which can test any handlers

- Added EventExecutor.inEventLoop(Thread) and replaced executor identity
  comparison in DefaultChannelPipeline with it - more elegant IMO
- Removed the test classes that needs rewrite or is of no use
This commit is contained in:
Trustin Lee 2012-06-07 19:39:37 +09:00
parent 8701e24b9a
commit 7bc10f2eba
23 changed files with 424 additions and 1165 deletions

View File

@ -1,307 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.CodecException;
import java.lang.reflect.Array;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.Queue;
/**
* A skeletal {@link CodecEmbedder} implementation.
*/
abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
private static final EventLoop loop = new EmbeddedEventLoop();
private final Queue<Object> productQueue = new LinkedList<Object>();
private final Channel channel = new EmbeddedChannel(productQueue);
/**
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*/
protected AbstractCodecEmbedder(ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
int inboundType = 0; // 0 - unknown, 1 - stream, 2 - message
int outboundType = 0;
int nHandlers = 0;
ChannelPipeline p = channel.pipeline();
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
nHandlers ++;
p.addLast(h);
ChannelHandlerContext ctx = p.context(h);
if (inboundType == 0) {
if (ctx.canHandleInbound()) {
ChannelHandlerContext inCtx = (ChannelHandlerContext) ctx;
if (inCtx.inbound().hasByteBuffer()) {
inboundType = 1;
} else {
inboundType = 2;
}
}
}
if (ctx.canHandleOutbound()) {
ChannelHandlerContext outCtx = (ChannelHandlerContext) ctx;
if (outCtx.outbound().hasByteBuffer()) {
outboundType = 1;
} else {
outboundType = 2;
}
}
}
if (nHandlers == 0) {
throw new IllegalArgumentException("handlers is empty.");
}
if (inboundType == 0 && outboundType == 0) {
throw new IllegalArgumentException("handlers does not provide any buffers.");
}
p.addFirst(StreamToChannelBufferEncoder.INSTANCE);
if (inboundType == 1) {
p.addFirst(ChannelBufferToStreamDecoder.INSTANCE);
}
if (outboundType == 1) {
p.addLast(ChannelBufferToStreamEncoder.INSTANCE);
}
p.addLast(new LastHandler());
loop.register(channel);
}
@Override
public boolean finish() {
channel.pipeline().close().syncUninterruptibly();
return !productQueue.isEmpty();
}
/**
* Returns the virtual {@link Channel} which will be used as a mock
* during encoding and decoding.
*/
protected final Channel channel() {
return channel;
}
/**
* Returns {@code true} if and only if the produce queue is empty and
* therefore {@link #poll()} will return {@code null}.
*/
protected final boolean isEmpty() {
return productQueue.isEmpty();
}
@Override
public final E poll() {
return product(productQueue.poll());
}
@Override
public final E peek() {
return product(productQueue.peek());
}
@SuppressWarnings("unchecked")
private E product(Object p) {
if (p instanceof CodecException) {
throw (CodecException) p;
}
if (p instanceof Throwable) {
throw newCodecException((Throwable) p);
}
return (E) p;
}
protected abstract CodecException newCodecException(Throwable t);
@Override
public final Object[] pollAll() {
final int size = size();
Object[] a = new Object[size];
for (int i = 0; i < size; i ++) {
E product = poll();
if (product == null) {
throw new ConcurrentModificationException();
}
a[i] = product;
}
return a;
}
@Override
@SuppressWarnings("unchecked")
public final <T> T[] pollAll(T[] a) {
if (a == null) {
throw new NullPointerException("a");
}
final int size = size();
// Create a new array if the specified one is too small.
if (a.length < size) {
a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
}
for (int i = 0;; i ++) {
T product = (T) poll();
if (product == null) {
break;
}
a[i] = product;
}
// Put the terminator if necessary.
if (a.length > size) {
a[size] = null;
}
return a;
}
@Override
public final int size() {
return productQueue.size();
}
@Override
public ChannelPipeline pipeline() {
return channel.pipeline();
}
private final class LastHandler extends ChannelInboundHandlerAdapter<Object> {
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(productQueue);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
productQueue.add(cause);
}
}
@ChannelHandler.Sharable
private static final class StreamToChannelBufferEncoder extends ChannelOutboundHandlerAdapter<Byte> {
static final StreamToChannelBufferEncoder INSTANCE = new StreamToChannelBufferEncoder();
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
ChannelBuffer in = ctx.outboundByteBuffer();
if (in.readable()) {
ctx.nextOutboundMessageBuffer().add(in.readBytes(in.readableBytes()));
}
ctx.flush(future);
}
}
@ChannelHandler.Sharable
private static final class ChannelBufferToStreamDecoder extends ChannelInboundHandlerAdapter<Object> {
static final ChannelBufferToStreamDecoder INSTANCE = new ChannelBufferToStreamDecoder();
@Override
public ChannelBufferHolder<Object> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
Queue<Object> in = ctx.inboundMessageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
if (msg instanceof ChannelBuffer) {
ChannelBuffer buf = (ChannelBuffer) msg;
ctx.nextInboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
} else {
ctx.nextInboundMessageBuffer().add(msg);
}
}
ctx.fireInboundBufferUpdated();
}
}
@ChannelHandler.Sharable
private static final class ChannelBufferToStreamEncoder extends ChannelOutboundHandlerAdapter<Object> {
static final ChannelBufferToStreamEncoder INSTANCE = new ChannelBufferToStreamEncoder();
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
Queue<Object> in = ctx.outboundMessageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
if (msg instanceof ChannelBuffer) {
ChannelBuffer buf = (ChannelBuffer) msg;
ctx.nextOutboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes());
} else {
ctx.nextOutboundMessageBuffer().add(msg);
}
}
ctx.flush(future);
}
}
}

View File

@ -1,99 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
import io.netty.channel.ChannelPipeline;
import java.util.Collection;
/**
* A helper that wraps an encoder or a decoder (codec) so that they can be used
* without doing actual I/O in unit tests or higher level codecs. Please refer
* to {@link EncoderEmbedder} and {@link DecoderEmbedder} for more information.
*/
public interface CodecEmbedder<E> {
/**
* Offers an input object to the pipeline of this embedder.
*
* @return {@code true} if and only if there is something to read in the
* product queue (see {@link #poll()} and {@link #peek()})
*/
boolean offer(Object input);
/**
* Signals the pipeline that the encoding or decoding has been finished and
* no more data will be offered.
*
* @return {@code true} if and only if there is something to read in the
* product queue (see {@link #poll()} and {@link #peek()})
*/
boolean finish();
/**
* Consumes an encoded or decoded output from the product queue. The output
* object is generated by the offered input objects.
*
* @return an encoded or decoded object.
* {@code null} if and only if there is no output object left in the
* product queue.
*/
E poll();
/**
* Reads an encoded or decoded output from the head of the product queue.
* The difference from {@link #poll()} is that it does not remove the
* retrieved object from the product queue.
*
* @return an encoded or decoded object.
* {@code null} if and only if there is no output object left in the
* product queue.
*/
E peek();
/**
* Consumes all encoded or decoded output from the product queue. The
* output object is generated by the offered input objects. The behavior
* of this method is identical with {@link Collection#toArray()} except that
* the product queue is cleared.
*
* @return an array of all encoded or decoded objects.
* An empty array is returned if and only if there is no output
* object left in the product queue.
*/
Object[] pollAll();
/**
* Consumes all encoded or decoded output from the product queue. The
* output object is generated by the offered input objects. The behavior
* of this method is identical with {@link Collection#toArray(Object[])}
* except that the product queue is cleared.
*
* @return an array of all encoded or decoded objects.
* An empty array is returned if and only if there is no output
* object left in the product queue.
*/
<T> T[] pollAll(T[] a);
/**
* Returns the number of encoded or decoded output in the product queue.
*/
int size();
/**
* Returns the {@link ChannelPipeline} that handles the input.
*/
ChannelPipeline pipeline();
}

View File

@ -1,80 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.NoSuchBufferException;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.base64.Base64Decoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* A helper that wraps a decoder so that it can be used without doing actual
* I/O in unit tests or higher level codecs. For example, you can decode a
* Base64-encoded {@link ChannelBuffer} with {@link Base64Decoder} and
* {@link StringDecoder} without setting up the {@link ChannelPipeline} and
* other mock objects by yourself:
* <pre>
* {@link ChannelBuffer} base64Data = {@link ChannelBuffers}.copiedBuffer("Zm9vYmFy", CharsetUtil.US_ASCII);
*
* {@link DecoderEmbedder}&lt;String&gt; embedder = new {@link DecoderEmbedder}&lt;String&gt;(
* new {@link Base64Decoder}(), new {@link StringDecoder}());
*
* embedder.offer(base64Data);
*
* String decoded = embedder.poll();
* assert decoded.equals("foobar");
* </pre>
* @apiviz.landmark
* @see EncoderEmbedder
*/
public class DecoderEmbedder<E> extends AbstractCodecEmbedder<E> {
/**
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*/
public DecoderEmbedder(ChannelHandler... handlers) {
super(handlers);
}
@Override
public boolean offer(Object input) {
if (input instanceof ChannelBuffer) {
try {
pipeline().inboundByteBuffer().writeBytes((ChannelBuffer) input);
} catch (NoSuchBufferException e) {
// Throwing and catching this exception is cheap because we do not fill
// stack traces internally (see DefaultChannelPipeline).
pipeline().inboundMessageBuffer().add(input);
}
} else {
pipeline().inboundMessageBuffer().add(input);
}
pipeline().fireInboundBufferUpdated();
return !isEmpty();
}
@Override
protected CodecException newCodecException(Throwable t) {
return new DecoderException(t);
}
}

View File

@ -1,130 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelType;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import java.net.SocketAddress;
import java.util.Queue;
class EmbeddedChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig();
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> productQueue;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
EmbeddedChannel(Queue<Object> productQueue) {
super(null, null, ChannelBufferHolders.messageBuffer());
this.productQueue = productQueue;
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
}
@Override
public ChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return state < 2;
}
@Override
public boolean isActive() {
return state == 1;
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EmbeddedEventLoop;
}
@Override
protected SocketAddress localAddress0() {
return isActive()? localAddress : null;
}
@Override
protected SocketAddress remoteAddress0() {
return isActive()? remoteAddress : null;
}
@Override
protected Runnable doRegister() throws Exception {
state = 1;
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
// NOOP
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
state = 2;
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
Queue<Object> msgBuf = buf.messageBuffer();
if (!msgBuf.isEmpty()) {
productQueue.addAll(msgBuf);
msgBuf.clear();
}
}
@Override
protected Unsafe newUnsafe() {
return new DefaultUnsafe();
}
@Override
protected boolean isFlushPending() {
return false;
}
private class DefaultUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture future) {
future.setSuccess();
}
}
}

View File

@ -1,67 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.base64.Base64Encoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* A helper that wraps an encoder so that it can be used without doing actual
* I/O in unit tests or higher level codecs. For example, you can encode a
* {@link String} into a Base64-encoded {@link ChannelBuffer} with
* {@link Base64Encoder} and {@link StringEncoder} without setting up the
* {@link ChannelPipeline} and other mock objects by yourself:
* <pre>
* String data = "foobar";
*
* {@link EncoderEmbedder}&lt;{@link ChannelBuffer}&gt; embedder = new {@link EncoderEmbedder}&lt;{@link ChannelBuffer}&gt;(
* new {@link Base64Encoder}(), new {@link StringEncoder}());
*
* embedder.offer(data);
*
* {@link ChannelBuffer} encoded = embedder.poll();
* assert encoded.toString({@link CharsetUtil}.US_ASCII).equals("Zm9vYmFy");
* </pre>
* @apiviz.landmark
* @see DecoderEmbedder
*/
public class EncoderEmbedder<E> extends AbstractCodecEmbedder<E> {
/**
* Creates a new embedder whose pipeline is composed of the specified
* handlers.
*/
public EncoderEmbedder(ChannelHandler... handlers) {
super(handlers);
}
@Override
public boolean offer(Object input) {
channel().write(input);
return !isEmpty();
}
@Override
protected CodecException newCodecException(Throwable t) {
return new EncoderException(t);
}
}

View File

@ -20,7 +20,7 @@ import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferIndexFinder;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.channel.embedded.EmbeddedStreamChannel;
import io.netty.util.VoidEnum;
import org.junit.Test;
@ -29,23 +29,23 @@ public class ReplayingDecoderTest {
@Test
public void testLineProtocol() {
DecoderEmbedder<ChannelBuffer> e = new DecoderEmbedder<ChannelBuffer>(new LineDecoder());
EmbeddedStreamChannel ch = new EmbeddedStreamChannel(new LineDecoder());
// Ordinary input
e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A' }));
assertNull(e.poll());
e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'B' }));
assertNull(e.poll());
e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'C' }));
assertNull(e.poll());
e.offer(ChannelBuffers.wrappedBuffer(new byte[] { '\n' }));
assertEquals(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 'B', 'C' }), e.poll());
ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'A' }));
assertNull(ch.readInbound());
ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'B' }));
assertNull(ch.readInbound());
ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'C' }));
assertNull(ch.readInbound());
ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { '\n' }));
assertEquals(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 'B', 'C' }), ch.readInbound());
// Truncated input
e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A' }));
assertNull(e.poll());
e.finish();
assertNull(e.poll());
ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'A' }));
assertNull(ch.readInbound());
ch.close();
assertNull(ch.readInbound());
}
private static final class LineDecoder extends ReplayingDecoder<ChannelBuffer, VoidEnum> {

View File

@ -1,171 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipelineException;
import io.netty.testsuite.util.DummyHandler;
import io.netty.util.SocketAddresses;
import io.netty.util.internal.ExecutorUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.easymock.EasyMock;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* An abstract test class to test socket client bootstraps
*/
public abstract class AbstractSocketClientBootstrapTest {
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
@Test(timeout = 10000)
public void testFailedConnectionAttempt() throws Exception {
ClientBootstrap bootstrap = new ClientBootstrap();
bootstrap.setFactory(newClientSocketChannelFactory(executor));
bootstrap.pipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption("remoteAddress", new InetSocketAddress("255.255.255.255", 1));
ChannelFuture future = bootstrap.connect();
future.awaitUninterruptibly();
assertFalse(future.isSuccess());
assertTrue(future.cause() instanceof IOException);
}
@Test(timeout = 10000)
public void testSuccessfulConnectionAttempt() throws Throwable {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(0));
try {
serverSocket.configureBlocking(false);
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(executor));
bootstrap.pipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption(
"remoteAddress",
new InetSocketAddress(
SocketAddresses.LOCALHOST,
serverSocket.socket().getLocalPort()));
ChannelFuture future = bootstrap.connect();
serverSocket.accept();
future.awaitUninterruptibly();
if (future.cause() != null) {
throw future.cause();
}
assertTrue(future.isSuccess());
future.channel().close().awaitUninterruptibly();
} finally {
try {
serverSocket.close();
} catch (IOException e) {
// Ignore.
}
}
}
@Test(timeout = 10000)
public void testSuccessfulConnectionAttemptWithLocalAddress() throws Throwable {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(0));
try {
serverSocket.configureBlocking(false);
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(executor));
bootstrap.pipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption(
"remoteAddress",
new InetSocketAddress(
SocketAddresses.LOCALHOST,
serverSocket.socket().getLocalPort()));
bootstrap.setOption("localAddress", new InetSocketAddress(0));
ChannelFuture future = bootstrap.connect();
serverSocket.accept();
future.awaitUninterruptibly();
if (future.cause() != null) {
throw future.cause();
}
assertTrue(future.isSuccess());
future.channel().close().awaitUninterruptibly();
} finally {
try {
serverSocket.close();
} catch (IOException e) {
// Ignore.
}
}
}
@Test(expected = ChannelPipelineException.class)
public void testFailedPipelineInitialization() throws Exception {
ClientBootstrap bootstrap = new ClientBootstrap(EasyMock.createMock(ChannelFactory.class));
ChannelPipelineFactory pipelineFactory = EasyMock.createMock(ChannelPipelineFactory.class);
bootstrap.setPipelineFactory(pipelineFactory);
EasyMock.expect(pipelineFactory.pipeline()).andThrow(new ChannelPipelineException());
EasyMock.replay(pipelineFactory);
bootstrap.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, 1));
}
@Test(expected = IllegalStateException.class)
public void shouldHaveRemoteAddressOption() {
new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)).connect();
}
@Test(expected = NullPointerException.class)
public void shouldDisallowNullRemoteAddressParameter1() {
new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)).connect(null);
}
@Test(expected = NullPointerException.class)
public void shouldDisallowNullRemoteAddressParameter2() {
new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)).connect(null, null);
}
}

View File

@ -1,222 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipelineException;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.testsuite.util.DummyHandler;
import io.netty.util.SocketAddresses;
import io.netty.util.internal.ExecutorUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.easymock.EasyMock;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* An abstract test class to test server socket bootstraps
*/
public abstract class AbstractSocketServerBootstrapTest {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractSocketServerBootstrapTest.class);
private static final boolean BUFSIZE_MODIFIABLE;
static {
boolean bufSizeModifiable = true;
Socket s = new Socket();
try {
s.setReceiveBufferSize(1234);
try {
if (s.getReceiveBufferSize() != 1234) {
throw new IllegalStateException();
}
} catch (Exception e) {
bufSizeModifiable = false;
logger.warn(
"Socket.getReceiveBufferSize() does not work: " + e);
}
} catch (Exception e) {
bufSizeModifiable = false;
logger.warn(
"Socket.setReceiveBufferSize() does not work: " + e);
} finally {
BUFSIZE_MODIFIABLE = bufSizeModifiable;
try {
s.close();
} catch (IOException e) {
// Ignore.
}
}
}
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
@Test(timeout = 30000, expected = ChannelException.class)
public void testFailedBindAttempt() throws Exception {
final ServerSocket ss = new ServerSocket(0);
final int boundPort = ss.getLocalPort();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.setFactory(newServerSocketChannelFactory(executor));
bootstrap.setOption("localAddress", new InetSocketAddress(boundPort));
bootstrap.bind().close().awaitUninterruptibly();
} finally {
ss.close();
}
}
@Test(timeout = 30000)
public void testSuccessfulBindAttempt() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap(
newServerSocketChannelFactory(executor));
bootstrap.setParentHandler(new ParentChannelHandler());
bootstrap.setOption("localAddress", new InetSocketAddress(0));
bootstrap.setOption("child.receiveBufferSize", 9753);
bootstrap.setOption("child.sendBufferSize", 8642);
bootstrap.pipeline().addLast("dummy", new DummyHandler());
Channel channel = bootstrap.bind();
ParentChannelHandler pch =
channel.pipeline().get(ParentChannelHandler.class);
Socket socket = null;
try {
socket = new Socket(
SocketAddresses.LOCALHOST,
((InetSocketAddress) channel.getLocalAddress()).getPort());
// Wait until the connection is open in the server side.
while (pch.child == null) {
Thread.yield();
}
SocketChannelConfig cfg = (SocketChannelConfig) pch.child.getConfig();
if (BUFSIZE_MODIFIABLE) {
assertEquals(9753, cfg.getReceiveBufferSize());
assertEquals(8642, cfg.getSendBufferSize());
}
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
// Ignore.
}
}
channel.close().awaitUninterruptibly();
}
// Wait until the child connection is closed in the client side.
// We do not use Channel.close() to make sure it is closed automatically.
while (pch.child.isOpen()) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// Ignore
}
}
// Wait until all child events are fired.
while (pch.result.length() < 2) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// Ignore
}
}
// Confirm the received child events.
assertEquals("12", pch.result.toString());
}
@Test(expected = ChannelPipelineException.class)
public void testFailedPipelineInitialization() throws Exception {
ClientBootstrap bootstrap = new ClientBootstrap(EasyMock.createMock(ChannelFactory.class));
ChannelPipelineFactory pipelineFactory = EasyMock.createMock(ChannelPipelineFactory.class);
bootstrap.setPipelineFactory(pipelineFactory);
EasyMock.expect(pipelineFactory.pipeline()).andThrow(new ChannelPipelineException());
EasyMock.replay(pipelineFactory);
bootstrap.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, 1));
}
@Test(expected = IllegalStateException.class)
public void shouldHaveLocalAddressOption() {
new ServerBootstrap(EasyMock.createMock(ServerChannelFactory.class)).bind();
}
@Test(expected = NullPointerException.class)
public void shouldDisallowNullLocalAddressParameter() {
new ServerBootstrap(EasyMock.createMock(ServerChannelFactory.class)).bind(null);
}
private static class ParentChannelHandler extends SimpleChannelUpstreamHandler {
volatile Channel child;
final StringBuffer result = new StringBuffer();
ParentChannelHandler() {
}
@Override
public void childChannelClosed(ChannelHandlerContext ctx,
ChildChannelStateEvent e) throws Exception {
result.append('2');
}
@Override
public void childChannelOpen(ChannelHandlerContext ctx,
ChildChannelStateEvent e) throws Exception {
child = e.getChildChannel();
result.append('1');
}
}
}

View File

@ -1,31 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.util;
import io.netty.channel.ChannelHandlerContext;
public class DummyHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
ctx.sendUpstream(e);
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
ctx.sendDownstream(e);
}
}

View File

@ -401,22 +401,22 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ChannelBuffer nextInboundByteBuffer() {
return DefaultChannelPipeline.nextInboundByteBuffer(executor(), next);
return DefaultChannelPipeline.nextInboundByteBuffer(next);
}
@Override
public Queue<Object> nextInboundMessageBuffer() {
return DefaultChannelPipeline.nextInboundMessageBuffer(executor(), next);
return DefaultChannelPipeline.nextInboundMessageBuffer(next);
}
@Override
public ChannelBuffer nextOutboundByteBuffer() {
return pipeline.nextOutboundByteBuffer(executor(), prev);
return pipeline.nextOutboundByteBuffer(prev);
}
@Override
public Queue<Object> nextOutboundMessageBuffer() {
return pipeline.nextOutboundMessageBuffer(executor(), prev);
return pipeline.nextOutboundMessageBuffer(prev);
}
@Override

View File

@ -31,7 +31,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
/**
@ -891,7 +890,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a message buffer.");
}
return nextInboundMessageBuffer(SingleThreadEventExecutor.currentEventLoop(), head.next);
return nextInboundMessageBuffer(head.next);
}
@Override
@ -900,17 +899,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a byte buffer.");
}
return nextInboundByteBuffer(SingleThreadEventExecutor.currentEventLoop(), head.next);
return nextInboundByteBuffer(head.next);
}
@Override
public Queue<Object> outboundMessageBuffer() {
return nextOutboundMessageBuffer(SingleThreadEventExecutor.currentEventLoop(), tail);
return nextOutboundMessageBuffer(tail);
}
@Override
public ChannelBuffer outboundByteBuffer() {
return nextOutboundByteBuffer(SingleThreadEventExecutor.currentEventLoop(), tail);
return nextOutboundByteBuffer(tail);
}
static boolean hasNextInboundByteBuffer(DefaultChannelHandlerContext ctx) {
@ -937,13 +936,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
static ChannelBuffer nextInboundByteBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) {
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.inByteBuf != null) {
if (currentExecutor == ctx.executor()) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inByteBuf;
} else {
StreamBridge bridge = ctx.inByteBridge.get();
@ -960,14 +960,15 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
static Queue<Object> nextInboundMessageBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
static Queue<Object> nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) {
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.inMsgBuf != null) {
if (currentExecutor == ctx.executor()) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.inMsgBuf;
} else {
MessageBridge bridge = ctx.inMsgBridge.get();
@ -1010,14 +1011,15 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
ChannelBuffer nextOutboundByteBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.outByteBuf != null) {
if (currentExecutor == ctx.executor()) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.outByteBuf;
} else {
StreamBridge bridge = ctx.outByteBridge.get();
@ -1034,14 +1036,15 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
}
Queue<Object> nextOutboundMessageBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) {
Queue<Object> nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
}
if (ctx.outMsgBuf != null) {
if (currentExecutor == ctx.executor()) {
if (ctx.executor().inEventLoop(currentThread)) {
return ctx.outMsgBuf;
} else {
MessageBridge bridge = ctx.outMsgBridge.get();
@ -1618,7 +1621,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@SuppressWarnings("rawtypes")
private final class HeadHandler implements ChannelOutboundHandler, ChannelOperationHandler {
private final class HeadHandler implements ChannelOutboundHandler {
@Override
public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
switch (channel.type()) {

View File

@ -19,6 +19,7 @@ import java.util.concurrent.ScheduledExecutorService;
public interface EventExecutor extends ScheduledExecutorService {
boolean inEventLoop();
boolean inEventLoop(Thread thread);
Unsafe unsafe();
public interface Unsafe {

View File

@ -204,6 +204,11 @@ public abstract class MultithreadEventExecutor implements EventExecutor {
return SingleThreadEventExecutor.currentEventLoop() != null;
}
@Override
public boolean inEventLoop(Thread thread) {
throw new UnsupportedOperationException();
}
private static EventExecutor currentEventLoop() {
EventExecutor loop = SingleThreadEventExecutor.currentEventLoop();
if (loop == null) {

View File

@ -216,7 +216,12 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
@Override
public boolean inEventLoop() {
return Thread.currentThread() == thread;
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
public void addShutdownHook(final Runnable task) {

View File

@ -0,0 +1,223 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.embedded;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
public abstract class AbstractEmbeddedChannel extends AbstractChannel {
private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
private final ChannelConfig config = new DefaultChannelConfig();
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> lastInboundMessageBuffer = new ArrayDeque<Object>();
private final ChannelBuffer lastInboundByteBuffer = ChannelBuffers.dynamicBuffer();
private Throwable lastException;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
AbstractEmbeddedChannel(ChannelBufferHolder<?> outboundBuffer, ChannelHandler... handlers) {
super(null, null, outboundBuffer);
if (handlers == null) {
throw new NullPointerException("handlers");
}
int nHandlers = 0;
boolean hasBuffer = false;
ChannelPipeline p = pipeline();
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
nHandlers ++;
p.addLast(h);
if (h instanceof ChannelInboundHandler || h instanceof ChannelOutboundHandler) {
hasBuffer = true;
}
}
if (nHandlers == 0) {
throw new IllegalArgumentException("handlers is empty.");
}
if (!hasBuffer) {
throw new IllegalArgumentException("handlers does not provide any buffers.");
}
p.addLast(new LastInboundMessageHandler(), new LastInboundStreamHandler());
loop.register(this);
}
@Override
public ChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return state < 2;
}
@Override
public boolean isActive() {
return state == 1;
}
public Queue<Object> lastInboundMessageBuffer() {
return lastInboundMessageBuffer;
}
public ChannelBuffer lastInboundByteBuffer() {
return lastInboundByteBuffer;
}
public Object readInbound() {
if (lastInboundByteBuffer.readable()) {
try {
return lastInboundByteBuffer.readBytes(lastInboundByteBuffer.readableBytes());
} finally {
lastInboundByteBuffer.clear();
}
}
return lastInboundMessageBuffer.poll();
}
public void checkException() {
Throwable t = lastException;
if (t == null) {
return;
}
lastException = null;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
if (t instanceof Error) {
throw (Error) t;
}
throw new ChannelException(t);
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EmbeddedEventLoop;
}
@Override
protected SocketAddress localAddress0() {
return isActive()? localAddress : null;
}
@Override
protected SocketAddress remoteAddress0() {
return isActive()? remoteAddress : null;
}
@Override
protected Runnable doRegister() throws Exception {
state = 1;
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
// NOOP
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
state = 2;
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
@Override
protected Unsafe newUnsafe() {
return new DefaultUnsafe();
}
@Override
protected boolean isFlushPending() {
return false;
}
private class DefaultUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelFuture future) {
future.setSuccess();
}
}
private final class LastInboundMessageHandler extends ChannelInboundHandlerAdapter<Object> {
@Override
public ChannelBufferHolder<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer(lastInboundMessageBuffer);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
// Do nothing.
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
lastException = cause;
}
}
private final class LastInboundStreamHandler extends ChannelInboundHandlerAdapter<Byte> {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer(lastInboundByteBuffer);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
// No nothing
}
}
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
package io.netty.channel.embedded;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -102,6 +102,11 @@ class EmbeddedEventLoop extends AbstractExecutorService implements
return true;
}
@Override
public boolean inEventLoop(Thread thread) {
return true;
}
@Override
public Unsafe unsafe() {
return this;

View File

@ -0,0 +1,58 @@
package io.netty.channel.embedded;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelType;
import java.util.ArrayDeque;
import java.util.Queue;
public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
private final Queue<Object> lastOutboundBuffer = new ArrayDeque<Object>();
public EmbeddedMessageChannel(ChannelHandler... handlers) {
super(ChannelBufferHolders.messageBuffer(), handlers);
}
@Override
public ChannelType type() {
return ChannelType.MESSAGE;
}
public Queue<Object> inboundBuffer() {
return pipeline().inboundMessageBuffer();
}
public Queue<Object> lastOutboundBuffer() {
return lastOutboundBuffer;
}
public Object readOutbound() {
return lastOutboundBuffer.poll();
}
public boolean writeInbound(Object msg) {
inboundBuffer().add(msg);
pipeline().fireInboundBufferUpdated();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty();
}
public boolean writeOutbound(Object msg) {
write(msg);
checkException();
return !lastOutboundBuffer().isEmpty();
}
@Override
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception {
for (;;) {
Object o = buf.poll();
if (o == null) {
break;
}
lastOutboundBuffer.add(o);
}
}
}

View File

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.embedder;
package io.netty.channel.embedded;
import java.net.SocketAddress;

View File

@ -0,0 +1,61 @@
package io.netty.channel.embedded;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelType;
public class EmbeddedStreamChannel extends AbstractEmbeddedChannel {
private final ChannelBuffer lastOutboundBuffer = ChannelBuffers.dynamicBuffer();
public EmbeddedStreamChannel(ChannelHandler... handlers) {
super(ChannelBufferHolders.messageBuffer(), handlers);
}
@Override
public ChannelType type() {
return ChannelType.STREAM;
}
public ChannelBuffer inboundBuffer() {
return pipeline().inboundByteBuffer();
}
public ChannelBuffer lastOutboundBuffer() {
return lastOutboundBuffer;
}
public ChannelBuffer readOutbound() {
if (!lastOutboundBuffer.readable()) {
return null;
}
try {
return lastOutboundBuffer.readBytes(lastOutboundBuffer.readableBytes());
} finally {
lastOutboundBuffer.clear();
}
}
public boolean writeInbound(ChannelBuffer data) {
inboundBuffer().writeBytes(data);
pipeline().fireInboundBufferUpdated();
checkException();
return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty();
}
public boolean writeOutbound(Object msg) {
write(msg);
checkException();
return lastOutboundBuffer().readable();
}
@Override
protected void doFlushByteBuffer(ChannelBuffer buf) throws Exception {
if (!lastOutboundBuffer.readable()) {
lastOutboundBuffer.discardReadBytes();
}
lastOutboundBuffer.writeBytes(buf);
}
}

View File

@ -15,10 +15,8 @@
*/
/**
* A helper that wraps an encoder or a decoder so that they can be used without
* doing actual I/O in unit tests or higher level codecs.
*
* @apiviz.exclude CodecEmbedder$
* A virtual {@link io.netty.channel.Channel} that helps wrapping a series of handlers to
* unit test the handlers or use them in non-I/O context.
*/
package io.netty.handler.codec.embedder;
package io.netty.channel.embedded;

View File

@ -259,6 +259,11 @@ public class OioEventLoop implements EventLoop {
return SingleThreadEventExecutor.currentEventLoop() != null;
}
@Override
public boolean inEventLoop(Thread thread) {
throw new UnsupportedOperationException();
}
private EventLoop nextChild() {
OioChildEventLoop loop = idleChildren.poll();
if (loop == null) {

View File

@ -52,17 +52,7 @@ public class DefaultChannelPipelineTest {
}
@Sharable
private static class TestHandler extends ChannelHandlerAdapter<Byte, Byte> {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
private static class TestHandler extends ChannelHandlerAdapter {
// Dummy
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(
ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
};
}

View File

@ -23,8 +23,10 @@ import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.DefaultEventExecutor;
import io.netty.channel.EventExecutor;
import io.netty.channel.EventLoop;
@ -326,7 +328,9 @@ public class LocalTransportThreadModelTest {
}
}
private static class ThreadNameAuditor extends ChannelHandlerAdapter<Object, Object> {
private static class ThreadNameAuditor
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@ -373,7 +377,9 @@ public class LocalTransportThreadModelTest {
/**
* Converts integers into a binary stream.
*/
private static class MessageForwarder1 extends ChannelHandlerAdapter<Integer, Byte> {
private static class MessageForwarder1
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Integer>, ChannelOutboundHandler<Byte> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;
@ -456,7 +462,9 @@ public class LocalTransportThreadModelTest {
/**
* Converts a binary stream into integers.
*/
private static class MessageForwarder2 extends ChannelHandlerAdapter<Byte, Integer> {
private static class MessageForwarder2
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Byte>, ChannelOutboundHandler<Integer> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;
@ -531,7 +539,9 @@ public class LocalTransportThreadModelTest {
/**
* Simply forwards the received object to the next handler.
*/
private static class MessageForwarder3 extends ChannelHandlerAdapter<Object, Object> {
private static class MessageForwarder3
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;
@ -607,7 +617,9 @@ public class LocalTransportThreadModelTest {
/**
* Discards all received messages.
*/
private static class MessageDiscarder extends ChannelHandlerAdapter<Object, Object> {
private static class MessageDiscarder
extends ChannelHandlerAdapter
implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
private volatile int inCnt;