diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java deleted file mode 100644 index befc7539e4..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.embedder; - -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; -import io.netty.handler.codec.CodecException; - -import java.lang.reflect.Array; -import java.util.ConcurrentModificationException; -import java.util.LinkedList; -import java.util.Queue; - -/** - * A skeletal {@link CodecEmbedder} implementation. - */ -abstract class AbstractCodecEmbedder implements CodecEmbedder { - - private static final EventLoop loop = new EmbeddedEventLoop(); - - private final Queue productQueue = new LinkedList(); - private final Channel channel = new EmbeddedChannel(productQueue); - - /** - * Creates a new embedder whose pipeline is composed of the specified - * handlers. - */ - protected AbstractCodecEmbedder(ChannelHandler... handlers) { - if (handlers == null) { - throw new NullPointerException("handlers"); - } - - int inboundType = 0; // 0 - unknown, 1 - stream, 2 - message - int outboundType = 0; - int nHandlers = 0; - ChannelPipeline p = channel.pipeline(); - for (ChannelHandler h: handlers) { - if (h == null) { - break; - } - nHandlers ++; - - p.addLast(h); - ChannelHandlerContext ctx = p.context(h); - if (inboundType == 0) { - if (ctx.canHandleInbound()) { - ChannelHandlerContext inCtx = (ChannelHandlerContext) ctx; - if (inCtx.inbound().hasByteBuffer()) { - inboundType = 1; - } else { - inboundType = 2; - } - } - } - if (ctx.canHandleOutbound()) { - ChannelHandlerContext outCtx = (ChannelHandlerContext) ctx; - if (outCtx.outbound().hasByteBuffer()) { - outboundType = 1; - } else { - outboundType = 2; - } - } - } - - if (nHandlers == 0) { - throw new IllegalArgumentException("handlers is empty."); - } - - if (inboundType == 0 && outboundType == 0) { - throw new IllegalArgumentException("handlers does not provide any buffers."); - } - - p.addFirst(StreamToChannelBufferEncoder.INSTANCE); - - if (inboundType == 1) { - p.addFirst(ChannelBufferToStreamDecoder.INSTANCE); - } - - if (outboundType == 1) { - p.addLast(ChannelBufferToStreamEncoder.INSTANCE); - } - - p.addLast(new LastHandler()); - - loop.register(channel); - } - - @Override - public boolean finish() { - channel.pipeline().close().syncUninterruptibly(); - return !productQueue.isEmpty(); - } - - /** - * Returns the virtual {@link Channel} which will be used as a mock - * during encoding and decoding. - */ - protected final Channel channel() { - return channel; - } - - /** - * Returns {@code true} if and only if the produce queue is empty and - * therefore {@link #poll()} will return {@code null}. - */ - protected final boolean isEmpty() { - return productQueue.isEmpty(); - } - - @Override - public final E poll() { - return product(productQueue.poll()); - } - - @Override - public final E peek() { - return product(productQueue.peek()); - } - - @SuppressWarnings("unchecked") - private E product(Object p) { - if (p instanceof CodecException) { - throw (CodecException) p; - } - if (p instanceof Throwable) { - throw newCodecException((Throwable) p); - } - return (E) p; - } - - protected abstract CodecException newCodecException(Throwable t); - - @Override - public final Object[] pollAll() { - final int size = size(); - Object[] a = new Object[size]; - for (int i = 0; i < size; i ++) { - E product = poll(); - if (product == null) { - throw new ConcurrentModificationException(); - } - a[i] = product; - } - return a; - } - - @Override - @SuppressWarnings("unchecked") - public final T[] pollAll(T[] a) { - if (a == null) { - throw new NullPointerException("a"); - } - - final int size = size(); - - // Create a new array if the specified one is too small. - if (a.length < size) { - a = (T[]) Array.newInstance(a.getClass().getComponentType(), size); - } - - for (int i = 0;; i ++) { - T product = (T) poll(); - if (product == null) { - break; - } - a[i] = product; - } - - // Put the terminator if necessary. - if (a.length > size) { - a[size] = null; - } - - return a; - } - - @Override - public final int size() { - return productQueue.size(); - } - - @Override - public ChannelPipeline pipeline() { - return channel.pipeline(); - } - - private final class LastHandler extends ChannelInboundHandlerAdapter { - @Override - public ChannelBufferHolder newInboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.messageBuffer(productQueue); - } - - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - productQueue.add(cause); - } - } - - @ChannelHandler.Sharable - private static final class StreamToChannelBufferEncoder extends ChannelOutboundHandlerAdapter { - - static final StreamToChannelBufferEncoder INSTANCE = new StreamToChannelBufferEncoder(); - - @Override - public ChannelBufferHolder newOutboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.byteBuffer(); - } - - @Override - public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - ChannelBuffer in = ctx.outboundByteBuffer(); - if (in.readable()) { - ctx.nextOutboundMessageBuffer().add(in.readBytes(in.readableBytes())); - } - ctx.flush(future); - } - } - - @ChannelHandler.Sharable - private static final class ChannelBufferToStreamDecoder extends ChannelInboundHandlerAdapter { - - static final ChannelBufferToStreamDecoder INSTANCE = new ChannelBufferToStreamDecoder(); - - @Override - public ChannelBufferHolder newInboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.messageBuffer(); - } - - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { - Queue in = ctx.inboundMessageBuffer(); - for (;;) { - Object msg = in.poll(); - if (msg == null) { - break; - } - if (msg instanceof ChannelBuffer) { - ChannelBuffer buf = (ChannelBuffer) msg; - ctx.nextInboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes()); - } else { - ctx.nextInboundMessageBuffer().add(msg); - } - } - ctx.fireInboundBufferUpdated(); - } - } - - @ChannelHandler.Sharable - private static final class ChannelBufferToStreamEncoder extends ChannelOutboundHandlerAdapter { - - static final ChannelBufferToStreamEncoder INSTANCE = new ChannelBufferToStreamEncoder(); - - @Override - public ChannelBufferHolder newOutboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.messageBuffer(); - } - - @Override - public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { - Queue in = ctx.outboundMessageBuffer(); - for (;;) { - Object msg = in.poll(); - if (msg == null) { - break; - } - if (msg instanceof ChannelBuffer) { - ChannelBuffer buf = (ChannelBuffer) msg; - ctx.nextOutboundByteBuffer().writeBytes(buf, buf.readerIndex(), buf.readableBytes()); - } else { - ctx.nextOutboundMessageBuffer().add(msg); - } - } - ctx.flush(future); - } - } -} diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/CodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/CodecEmbedder.java deleted file mode 100644 index 51ef217e6c..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/embedder/CodecEmbedder.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.embedder; - -import io.netty.channel.ChannelPipeline; - -import java.util.Collection; - -/** - * A helper that wraps an encoder or a decoder (codec) so that they can be used - * without doing actual I/O in unit tests or higher level codecs. Please refer - * to {@link EncoderEmbedder} and {@link DecoderEmbedder} for more information. - */ -public interface CodecEmbedder { - /** - * Offers an input object to the pipeline of this embedder. - * - * @return {@code true} if and only if there is something to read in the - * product queue (see {@link #poll()} and {@link #peek()}) - */ - boolean offer(Object input); - - /** - * Signals the pipeline that the encoding or decoding has been finished and - * no more data will be offered. - * - * @return {@code true} if and only if there is something to read in the - * product queue (see {@link #poll()} and {@link #peek()}) - */ - boolean finish(); - - /** - * Consumes an encoded or decoded output from the product queue. The output - * object is generated by the offered input objects. - * - * @return an encoded or decoded object. - * {@code null} if and only if there is no output object left in the - * product queue. - */ - E poll(); - - /** - * Reads an encoded or decoded output from the head of the product queue. - * The difference from {@link #poll()} is that it does not remove the - * retrieved object from the product queue. - * - * @return an encoded or decoded object. - * {@code null} if and only if there is no output object left in the - * product queue. - */ - E peek(); - - /** - * Consumes all encoded or decoded output from the product queue. The - * output object is generated by the offered input objects. The behavior - * of this method is identical with {@link Collection#toArray()} except that - * the product queue is cleared. - * - * @return an array of all encoded or decoded objects. - * An empty array is returned if and only if there is no output - * object left in the product queue. - */ - Object[] pollAll(); - - /** - * Consumes all encoded or decoded output from the product queue. The - * output object is generated by the offered input objects. The behavior - * of this method is identical with {@link Collection#toArray(Object[])} - * except that the product queue is cleared. - * - * @return an array of all encoded or decoded objects. - * An empty array is returned if and only if there is no output - * object left in the product queue. - */ - T[] pollAll(T[] a); - - /** - * Returns the number of encoded or decoded output in the product queue. - */ - int size(); - - /** - * Returns the {@link ChannelPipeline} that handles the input. - */ - ChannelPipeline pipeline(); -} diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java deleted file mode 100644 index 8f655ad83b..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/embedder/DecoderEmbedder.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.embedder; - -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.NoSuchBufferException; -import io.netty.handler.codec.CodecException; -import io.netty.handler.codec.DecoderException; -import io.netty.handler.codec.base64.Base64Decoder; -import io.netty.handler.codec.string.StringDecoder; - -/** - * A helper that wraps a decoder so that it can be used without doing actual - * I/O in unit tests or higher level codecs. For example, you can decode a - * Base64-encoded {@link ChannelBuffer} with {@link Base64Decoder} and - * {@link StringDecoder} without setting up the {@link ChannelPipeline} and - * other mock objects by yourself: - *
- * {@link ChannelBuffer} base64Data = {@link ChannelBuffers}.copiedBuffer("Zm9vYmFy", CharsetUtil.US_ASCII);
- *
- * {@link DecoderEmbedder}<String> embedder = new {@link DecoderEmbedder}<String>(
- *         new {@link Base64Decoder}(), new {@link StringDecoder}());
- *
- * embedder.offer(base64Data);
- *
- * String decoded = embedder.poll();
- * assert decoded.equals("foobar");
- * 
- * @apiviz.landmark - * @see EncoderEmbedder - */ -public class DecoderEmbedder extends AbstractCodecEmbedder { - - /** - * Creates a new embedder whose pipeline is composed of the specified - * handlers. - */ - public DecoderEmbedder(ChannelHandler... handlers) { - super(handlers); - } - - @Override - public boolean offer(Object input) { - if (input instanceof ChannelBuffer) { - try { - pipeline().inboundByteBuffer().writeBytes((ChannelBuffer) input); - } catch (NoSuchBufferException e) { - // Throwing and catching this exception is cheap because we do not fill - // stack traces internally (see DefaultChannelPipeline). - pipeline().inboundMessageBuffer().add(input); - } - } else { - pipeline().inboundMessageBuffer().add(input); - } - - pipeline().fireInboundBufferUpdated(); - return !isEmpty(); - } - - @Override - protected CodecException newCodecException(Throwable t) { - return new DecoderException(t); - } -} diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java deleted file mode 100644 index 61bf01cb12..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.embedder; - -import io.netty.channel.AbstractChannel; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelType; -import io.netty.channel.DefaultChannelConfig; -import io.netty.channel.EventLoop; - -import java.net.SocketAddress; -import java.util.Queue; - -class EmbeddedChannel extends AbstractChannel { - - private final ChannelConfig config = new DefaultChannelConfig(); - private final SocketAddress localAddress = new EmbeddedSocketAddress(); - private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); - private final Queue productQueue; - private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED - - EmbeddedChannel(Queue productQueue) { - super(null, null, ChannelBufferHolders.messageBuffer()); - this.productQueue = productQueue; - } - - @Override - public ChannelType type() { - return ChannelType.MESSAGE; - } - - @Override - public ChannelConfig config() { - return config; - } - - @Override - public boolean isOpen() { - return state < 2; - } - - @Override - public boolean isActive() { - return state == 1; - } - - @Override - protected boolean isCompatible(EventLoop loop) { - return loop instanceof EmbeddedEventLoop; - } - - @Override - protected SocketAddress localAddress0() { - return isActive()? localAddress : null; - } - - @Override - protected SocketAddress remoteAddress0() { - return isActive()? remoteAddress : null; - } - - @Override - protected Runnable doRegister() throws Exception { - state = 1; - return null; - } - - @Override - protected void doBind(SocketAddress localAddress) throws Exception { - // NOOP - } - - @Override - protected void doDisconnect() throws Exception { - doClose(); - } - - @Override - protected void doClose() throws Exception { - state = 2; - } - - @Override - protected void doDeregister() throws Exception { - // NOOP - } - - @Override - protected void doFlush(ChannelBufferHolder buf) throws Exception { - Queue msgBuf = buf.messageBuffer(); - if (!msgBuf.isEmpty()) { - productQueue.addAll(msgBuf); - msgBuf.clear(); - } - } - - @Override - protected Unsafe newUnsafe() { - return new DefaultUnsafe(); - } - - @Override - protected boolean isFlushPending() { - return false; - } - - private class DefaultUnsafe extends AbstractUnsafe { - @Override - public void connect(SocketAddress remoteAddress, - SocketAddress localAddress, ChannelFuture future) { - future.setSuccess(); - } - } -} diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EncoderEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/EncoderEmbedder.java deleted file mode 100644 index f7aae488f3..0000000000 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EncoderEmbedder.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.codec.embedder; - -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.CodecException; -import io.netty.handler.codec.EncoderException; -import io.netty.handler.codec.base64.Base64Encoder; -import io.netty.handler.codec.string.StringEncoder; -import io.netty.util.CharsetUtil; - -/** - * A helper that wraps an encoder so that it can be used without doing actual - * I/O in unit tests or higher level codecs. For example, you can encode a - * {@link String} into a Base64-encoded {@link ChannelBuffer} with - * {@link Base64Encoder} and {@link StringEncoder} without setting up the - * {@link ChannelPipeline} and other mock objects by yourself: - *
- * String data = "foobar";
- *
- * {@link EncoderEmbedder}<{@link ChannelBuffer}> embedder = new {@link EncoderEmbedder}<{@link ChannelBuffer}>(
- *         new {@link Base64Encoder}(), new {@link StringEncoder}());
- *
- * embedder.offer(data);
- *
- * {@link ChannelBuffer} encoded = embedder.poll();
- * assert encoded.toString({@link CharsetUtil}.US_ASCII).equals("Zm9vYmFy");
- * 
- * @apiviz.landmark - * @see DecoderEmbedder - */ -public class EncoderEmbedder extends AbstractCodecEmbedder { - - /** - * Creates a new embedder whose pipeline is composed of the specified - * handlers. - */ - public EncoderEmbedder(ChannelHandler... handlers) { - super(handlers); - } - - @Override - public boolean offer(Object input) { - channel().write(input); - return !isEmpty(); - } - - @Override - protected CodecException newCodecException(Throwable t) { - return new EncoderException(t); - } -} diff --git a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java index 7b751dd87c..71b87eb17e 100644 --- a/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ReplayingDecoderTest.java @@ -20,7 +20,7 @@ import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBufferIndexFinder; import io.netty.buffer.ChannelBuffers; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.embedder.DecoderEmbedder; +import io.netty.channel.embedded.EmbeddedStreamChannel; import io.netty.util.VoidEnum; import org.junit.Test; @@ -29,23 +29,23 @@ public class ReplayingDecoderTest { @Test public void testLineProtocol() { - DecoderEmbedder e = new DecoderEmbedder(new LineDecoder()); + EmbeddedStreamChannel ch = new EmbeddedStreamChannel(new LineDecoder()); // Ordinary input - e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A' })); - assertNull(e.poll()); - e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'B' })); - assertNull(e.poll()); - e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'C' })); - assertNull(e.poll()); - e.offer(ChannelBuffers.wrappedBuffer(new byte[] { '\n' })); - assertEquals(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 'B', 'C' }), e.poll()); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'A' })); + assertNull(ch.readInbound()); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'B' })); + assertNull(ch.readInbound()); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'C' })); + assertNull(ch.readInbound()); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { '\n' })); + assertEquals(ChannelBuffers.wrappedBuffer(new byte[] { 'A', 'B', 'C' }), ch.readInbound()); // Truncated input - e.offer(ChannelBuffers.wrappedBuffer(new byte[] { 'A' })); - assertNull(e.poll()); - e.finish(); - assertNull(e.poll()); + ch.writeInbound(ChannelBuffers.wrappedBuffer(new byte[] { 'A' })); + assertNull(ch.readInbound()); + ch.close(); + assertNull(ch.readInbound()); } private static final class LineDecoder extends ReplayingDecoder { diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketClientBootstrapTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketClientBootstrapTest.java deleted file mode 100644 index 22b18c4083..0000000000 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketClientBootstrapTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.testsuite.transport.socket; - -import static org.junit.Assert.*; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipelineException; -import io.netty.testsuite.util.DummyHandler; -import io.netty.util.SocketAddresses; -import io.netty.util.internal.ExecutorUtil; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.easymock.EasyMock; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - - -/** - * An abstract test class to test socket client bootstraps - */ -public abstract class AbstractSocketClientBootstrapTest { - - private static ExecutorService executor; - - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } - - protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); - - @Test(timeout = 10000) - public void testFailedConnectionAttempt() throws Exception { - ClientBootstrap bootstrap = new ClientBootstrap(); - bootstrap.setFactory(newClientSocketChannelFactory(executor)); - bootstrap.pipeline().addLast("dummy", new DummyHandler()); - bootstrap.setOption("remoteAddress", new InetSocketAddress("255.255.255.255", 1)); - ChannelFuture future = bootstrap.connect(); - future.awaitUninterruptibly(); - assertFalse(future.isSuccess()); - assertTrue(future.cause() instanceof IOException); - } - - @Test(timeout = 10000) - public void testSuccessfulConnectionAttempt() throws Throwable { - ServerSocketChannel serverSocket = ServerSocketChannel.open(); - serverSocket.socket().bind(new InetSocketAddress(0)); - - try { - serverSocket.configureBlocking(false); - - ClientBootstrap bootstrap = - new ClientBootstrap(newClientSocketChannelFactory(executor)); - - bootstrap.pipeline().addLast("dummy", new DummyHandler()); - bootstrap.setOption( - "remoteAddress", - new InetSocketAddress( - SocketAddresses.LOCALHOST, - serverSocket.socket().getLocalPort())); - - ChannelFuture future = bootstrap.connect(); - serverSocket.accept(); - future.awaitUninterruptibly(); - - if (future.cause() != null) { - throw future.cause(); - } - assertTrue(future.isSuccess()); - - future.channel().close().awaitUninterruptibly(); - } finally { - try { - serverSocket.close(); - } catch (IOException e) { - // Ignore. - } - } - } - - @Test(timeout = 10000) - public void testSuccessfulConnectionAttemptWithLocalAddress() throws Throwable { - ServerSocketChannel serverSocket = ServerSocketChannel.open(); - serverSocket.socket().bind(new InetSocketAddress(0)); - - try { - serverSocket.configureBlocking(false); - - ClientBootstrap bootstrap = - new ClientBootstrap(newClientSocketChannelFactory(executor)); - - bootstrap.pipeline().addLast("dummy", new DummyHandler()); - bootstrap.setOption( - "remoteAddress", - new InetSocketAddress( - SocketAddresses.LOCALHOST, - serverSocket.socket().getLocalPort())); - bootstrap.setOption("localAddress", new InetSocketAddress(0)); - - ChannelFuture future = bootstrap.connect(); - serverSocket.accept(); - future.awaitUninterruptibly(); - - if (future.cause() != null) { - throw future.cause(); - } - assertTrue(future.isSuccess()); - - future.channel().close().awaitUninterruptibly(); - } finally { - try { - serverSocket.close(); - } catch (IOException e) { - // Ignore. - } - } - } - - @Test(expected = ChannelPipelineException.class) - public void testFailedPipelineInitialization() throws Exception { - ClientBootstrap bootstrap = new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)); - ChannelPipelineFactory pipelineFactory = EasyMock.createMock(ChannelPipelineFactory.class); - bootstrap.setPipelineFactory(pipelineFactory); - - EasyMock.expect(pipelineFactory.pipeline()).andThrow(new ChannelPipelineException()); - EasyMock.replay(pipelineFactory); - - bootstrap.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, 1)); - } - - @Test(expected = IllegalStateException.class) - public void shouldHaveRemoteAddressOption() { - new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)).connect(); - } - - - @Test(expected = NullPointerException.class) - public void shouldDisallowNullRemoteAddressParameter1() { - new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)).connect(null); - } - - @Test(expected = NullPointerException.class) - public void shouldDisallowNullRemoteAddressParameter2() { - new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)).connect(null, null); - } -} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketServerBootstrapTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketServerBootstrapTest.java deleted file mode 100644 index 0f89733333..0000000000 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketServerBootstrapTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.testsuite.transport.socket; - -import static org.junit.Assert.*; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipelineException; -import io.netty.channel.socket.SocketChannelConfig; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.testsuite.util.DummyHandler; -import io.netty.util.SocketAddresses; -import io.netty.util.internal.ExecutorUtil; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.easymock.EasyMock; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - - -/** - * An abstract test class to test server socket bootstraps - */ -public abstract class AbstractSocketServerBootstrapTest { - - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(AbstractSocketServerBootstrapTest.class); - - private static final boolean BUFSIZE_MODIFIABLE; - - static { - boolean bufSizeModifiable = true; - - Socket s = new Socket(); - try { - s.setReceiveBufferSize(1234); - try { - if (s.getReceiveBufferSize() != 1234) { - throw new IllegalStateException(); - } - } catch (Exception e) { - bufSizeModifiable = false; - logger.warn( - "Socket.getReceiveBufferSize() does not work: " + e); - } - } catch (Exception e) { - bufSizeModifiable = false; - logger.warn( - "Socket.setReceiveBufferSize() does not work: " + e); - } finally { - BUFSIZE_MODIFIABLE = bufSizeModifiable; - try { - s.close(); - } catch (IOException e) { - // Ignore. - } - } - } - - private static ExecutorService executor; - - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } - - protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); - - @Test(timeout = 30000, expected = ChannelException.class) - public void testFailedBindAttempt() throws Exception { - final ServerSocket ss = new ServerSocket(0); - final int boundPort = ss.getLocalPort(); - try { - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.setFactory(newServerSocketChannelFactory(executor)); - bootstrap.setOption("localAddress", new InetSocketAddress(boundPort)); - bootstrap.bind().close().awaitUninterruptibly(); - } finally { - ss.close(); - } - } - - @Test(timeout = 30000) - public void testSuccessfulBindAttempt() throws Exception { - ServerBootstrap bootstrap = new ServerBootstrap( - newServerSocketChannelFactory(executor)); - - bootstrap.setParentHandler(new ParentChannelHandler()); - bootstrap.setOption("localAddress", new InetSocketAddress(0)); - bootstrap.setOption("child.receiveBufferSize", 9753); - bootstrap.setOption("child.sendBufferSize", 8642); - - bootstrap.pipeline().addLast("dummy", new DummyHandler()); - - Channel channel = bootstrap.bind(); - ParentChannelHandler pch = - channel.pipeline().get(ParentChannelHandler.class); - - Socket socket = null; - try { - socket = new Socket( - SocketAddresses.LOCALHOST, - ((InetSocketAddress) channel.getLocalAddress()).getPort()); - - // Wait until the connection is open in the server side. - while (pch.child == null) { - Thread.yield(); - } - - SocketChannelConfig cfg = (SocketChannelConfig) pch.child.getConfig(); - if (BUFSIZE_MODIFIABLE) { - assertEquals(9753, cfg.getReceiveBufferSize()); - assertEquals(8642, cfg.getSendBufferSize()); - } - } finally { - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - // Ignore. - } - } - channel.close().awaitUninterruptibly(); - } - - // Wait until the child connection is closed in the client side. - // We do not use Channel.close() to make sure it is closed automatically. - while (pch.child.isOpen()) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - // Ignore - } - } - - // Wait until all child events are fired. - while (pch.result.length() < 2) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - // Ignore - } - } - - // Confirm the received child events. - assertEquals("12", pch.result.toString()); - } - - @Test(expected = ChannelPipelineException.class) - public void testFailedPipelineInitialization() throws Exception { - ClientBootstrap bootstrap = new ClientBootstrap(EasyMock.createMock(ChannelFactory.class)); - ChannelPipelineFactory pipelineFactory = EasyMock.createMock(ChannelPipelineFactory.class); - bootstrap.setPipelineFactory(pipelineFactory); - - EasyMock.expect(pipelineFactory.pipeline()).andThrow(new ChannelPipelineException()); - EasyMock.replay(pipelineFactory); - - bootstrap.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, 1)); - } - - @Test(expected = IllegalStateException.class) - public void shouldHaveLocalAddressOption() { - new ServerBootstrap(EasyMock.createMock(ServerChannelFactory.class)).bind(); - } - - - @Test(expected = NullPointerException.class) - public void shouldDisallowNullLocalAddressParameter() { - new ServerBootstrap(EasyMock.createMock(ServerChannelFactory.class)).bind(null); - } - - private static class ParentChannelHandler extends SimpleChannelUpstreamHandler { - - volatile Channel child; - final StringBuffer result = new StringBuffer(); - - ParentChannelHandler() { - } - - @Override - public void childChannelClosed(ChannelHandlerContext ctx, - ChildChannelStateEvent e) throws Exception { - result.append('2'); - } - - @Override - public void childChannelOpen(ChannelHandlerContext ctx, - ChildChannelStateEvent e) throws Exception { - child = e.getChildChannel(); - result.append('1'); - } - } -} diff --git a/testsuite/src/test/java/io/netty/testsuite/util/DummyHandler.java b/testsuite/src/test/java/io/netty/testsuite/util/DummyHandler.java deleted file mode 100644 index 5b5452a0fc..0000000000 --- a/testsuite/src/test/java/io/netty/testsuite/util/DummyHandler.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.testsuite.util; - -import io.netty.channel.ChannelHandlerContext; - -public class DummyHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { - - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - ctx.sendUpstream(e); - } - - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - ctx.sendDownstream(e); - } -} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 228227bfce..06874290a8 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -401,22 +401,22 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements @Override public ChannelBuffer nextInboundByteBuffer() { - return DefaultChannelPipeline.nextInboundByteBuffer(executor(), next); + return DefaultChannelPipeline.nextInboundByteBuffer(next); } @Override public Queue nextInboundMessageBuffer() { - return DefaultChannelPipeline.nextInboundMessageBuffer(executor(), next); + return DefaultChannelPipeline.nextInboundMessageBuffer(next); } @Override public ChannelBuffer nextOutboundByteBuffer() { - return pipeline.nextOutboundByteBuffer(executor(), prev); + return pipeline.nextOutboundByteBuffer(prev); } @Override public Queue nextOutboundMessageBuffer() { - return pipeline.nextOutboundMessageBuffer(executor(), prev); + return pipeline.nextOutboundMessageBuffer(prev); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index d371e1b25a..58f2f0d916 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.Future; /** @@ -891,7 +890,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NoSuchBufferException( "The first inbound buffer of this channel must be a message buffer."); } - return nextInboundMessageBuffer(SingleThreadEventExecutor.currentEventLoop(), head.next); + return nextInboundMessageBuffer(head.next); } @Override @@ -900,17 +899,17 @@ public class DefaultChannelPipeline implements ChannelPipeline { throw new NoSuchBufferException( "The first inbound buffer of this channel must be a byte buffer."); } - return nextInboundByteBuffer(SingleThreadEventExecutor.currentEventLoop(), head.next); + return nextInboundByteBuffer(head.next); } @Override public Queue outboundMessageBuffer() { - return nextOutboundMessageBuffer(SingleThreadEventExecutor.currentEventLoop(), tail); + return nextOutboundMessageBuffer(tail); } @Override public ChannelBuffer outboundByteBuffer() { - return nextOutboundByteBuffer(SingleThreadEventExecutor.currentEventLoop(), tail); + return nextOutboundByteBuffer(tail); } static boolean hasNextInboundByteBuffer(DefaultChannelHandlerContext ctx) { @@ -937,13 +936,14 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - static ChannelBuffer nextInboundByteBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) { + static ChannelBuffer nextInboundByteBuffer(DefaultChannelHandlerContext ctx) { + final Thread currentThread = Thread.currentThread(); for (;;) { if (ctx == null) { throw new NoSuchBufferException(); } if (ctx.inByteBuf != null) { - if (currentExecutor == ctx.executor()) { + if (ctx.executor().inEventLoop(currentThread)) { return ctx.inByteBuf; } else { StreamBridge bridge = ctx.inByteBridge.get(); @@ -960,14 +960,15 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - static Queue nextInboundMessageBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) { + static Queue nextInboundMessageBuffer(DefaultChannelHandlerContext ctx) { + final Thread currentThread = Thread.currentThread(); for (;;) { if (ctx == null) { throw new NoSuchBufferException(); } if (ctx.inMsgBuf != null) { - if (currentExecutor == ctx.executor()) { + if (ctx.executor().inEventLoop(currentThread)) { return ctx.inMsgBuf; } else { MessageBridge bridge = ctx.inMsgBridge.get(); @@ -1010,14 +1011,15 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - ChannelBuffer nextOutboundByteBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) { + ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) { + final Thread currentThread = Thread.currentThread(); for (;;) { if (ctx == null) { throw new NoSuchBufferException(); } if (ctx.outByteBuf != null) { - if (currentExecutor == ctx.executor()) { + if (ctx.executor().inEventLoop(currentThread)) { return ctx.outByteBuf; } else { StreamBridge bridge = ctx.outByteBridge.get(); @@ -1034,14 +1036,15 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } - Queue nextOutboundMessageBuffer(Executor currentExecutor, DefaultChannelHandlerContext ctx) { + Queue nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) { + final Thread currentThread = Thread.currentThread(); for (;;) { if (ctx == null) { throw new NoSuchBufferException(); } if (ctx.outMsgBuf != null) { - if (currentExecutor == ctx.executor()) { + if (ctx.executor().inEventLoop(currentThread)) { return ctx.outMsgBuf; } else { MessageBridge bridge = ctx.outMsgBridge.get(); @@ -1618,7 +1621,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @SuppressWarnings("rawtypes") - private final class HeadHandler implements ChannelOutboundHandler, ChannelOperationHandler { + private final class HeadHandler implements ChannelOutboundHandler { @Override public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { switch (channel.type()) { diff --git a/transport/src/main/java/io/netty/channel/EventExecutor.java b/transport/src/main/java/io/netty/channel/EventExecutor.java index 9b5551f944..0e595465a7 100644 --- a/transport/src/main/java/io/netty/channel/EventExecutor.java +++ b/transport/src/main/java/io/netty/channel/EventExecutor.java @@ -19,6 +19,7 @@ import java.util.concurrent.ScheduledExecutorService; public interface EventExecutor extends ScheduledExecutorService { boolean inEventLoop(); + boolean inEventLoop(Thread thread); Unsafe unsafe(); public interface Unsafe { diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java b/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java index b2b20fd18a..d4b2c3f308 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventExecutor.java @@ -204,6 +204,11 @@ public abstract class MultithreadEventExecutor implements EventExecutor { return SingleThreadEventExecutor.currentEventLoop() != null; } + @Override + public boolean inEventLoop(Thread thread) { + throw new UnsupportedOperationException(); + } + private static EventExecutor currentEventLoop() { EventExecutor loop = SingleThreadEventExecutor.currentEventLoop(); if (loop == null) { diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index 73c76f7870..e2194ecc79 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -216,7 +216,12 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService @Override public boolean inEventLoop() { - return Thread.currentThread() == thread; + return inEventLoop(Thread.currentThread()); + } + + @Override + public boolean inEventLoop(Thread thread) { + return thread == this.thread; } public void addShutdownHook(final Runnable task) { diff --git a/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java new file mode 100644 index 0000000000..a6775b32bf --- /dev/null +++ b/transport/src/main/java/io/netty/channel/embedded/AbstractEmbeddedChannel.java @@ -0,0 +1,223 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.embedded; + +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.AbstractChannel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.EventLoop; + +import java.net.SocketAddress; +import java.util.ArrayDeque; +import java.util.Queue; + +public abstract class AbstractEmbeddedChannel extends AbstractChannel { + + private final EmbeddedEventLoop loop = new EmbeddedEventLoop(); + private final ChannelConfig config = new DefaultChannelConfig(); + private final SocketAddress localAddress = new EmbeddedSocketAddress(); + private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); + private final Queue lastInboundMessageBuffer = new ArrayDeque(); + private final ChannelBuffer lastInboundByteBuffer = ChannelBuffers.dynamicBuffer(); + private Throwable lastException; + private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED + + AbstractEmbeddedChannel(ChannelBufferHolder outboundBuffer, ChannelHandler... handlers) { + super(null, null, outboundBuffer); + + if (handlers == null) { + throw new NullPointerException("handlers"); + } + + int nHandlers = 0; + boolean hasBuffer = false; + ChannelPipeline p = pipeline(); + for (ChannelHandler h: handlers) { + if (h == null) { + break; + } + nHandlers ++; + p.addLast(h); + if (h instanceof ChannelInboundHandler || h instanceof ChannelOutboundHandler) { + hasBuffer = true; + } + } + + if (nHandlers == 0) { + throw new IllegalArgumentException("handlers is empty."); + } + + if (!hasBuffer) { + throw new IllegalArgumentException("handlers does not provide any buffers."); + } + + p.addLast(new LastInboundMessageHandler(), new LastInboundStreamHandler()); + loop.register(this); + } + + @Override + public ChannelConfig config() { + return config; + } + + @Override + public boolean isOpen() { + return state < 2; + } + + @Override + public boolean isActive() { + return state == 1; + } + + public Queue lastInboundMessageBuffer() { + return lastInboundMessageBuffer; + } + + public ChannelBuffer lastInboundByteBuffer() { + return lastInboundByteBuffer; + } + + public Object readInbound() { + if (lastInboundByteBuffer.readable()) { + try { + return lastInboundByteBuffer.readBytes(lastInboundByteBuffer.readableBytes()); + } finally { + lastInboundByteBuffer.clear(); + } + } + return lastInboundMessageBuffer.poll(); + } + + public void checkException() { + Throwable t = lastException; + if (t == null) { + return; + } + + lastException = null; + + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + if (t instanceof Error) { + throw (Error) t; + } + + throw new ChannelException(t); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof EmbeddedEventLoop; + } + + @Override + protected SocketAddress localAddress0() { + return isActive()? localAddress : null; + } + + @Override + protected SocketAddress remoteAddress0() { + return isActive()? remoteAddress : null; + } + + @Override + protected Runnable doRegister() throws Exception { + state = 1; + return null; + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + // NOOP + } + + @Override + protected void doDisconnect() throws Exception { + doClose(); + } + + @Override + protected void doClose() throws Exception { + state = 2; + } + + @Override + protected void doDeregister() throws Exception { + // NOOP + } + + @Override + protected Unsafe newUnsafe() { + return new DefaultUnsafe(); + } + + @Override + protected boolean isFlushPending() { + return false; + } + + private class DefaultUnsafe extends AbstractUnsafe { + @Override + public void connect(SocketAddress remoteAddress, + SocketAddress localAddress, ChannelFuture future) { + future.setSuccess(); + } + } + + private final class LastInboundMessageHandler extends ChannelInboundHandlerAdapter { + @Override + public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return ChannelBufferHolders.messageBuffer(lastInboundMessageBuffer); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { + // Do nothing. + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + lastException = cause; + } + } + + private final class LastInboundStreamHandler extends ChannelInboundHandlerAdapter { + @Override + public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return ChannelBufferHolders.byteBuffer(lastInboundByteBuffer); + } + + @Override + public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { + // No nothing + } + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java similarity index 96% rename from codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedEventLoop.java rename to transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index f385f6340f..667a461752 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.embedder; +package io.netty.channel.embedded; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -102,6 +102,11 @@ class EmbeddedEventLoop extends AbstractExecutorService implements return true; } + @Override + public boolean inEventLoop(Thread thread) { + return true; + } + @Override public Unsafe unsafe() { return this; diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java new file mode 100644 index 0000000000..d3e5ca5a0d --- /dev/null +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedMessageChannel.java @@ -0,0 +1,58 @@ +package io.netty.channel.embedded; + +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelType; + +import java.util.ArrayDeque; +import java.util.Queue; + +public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { + + private final Queue lastOutboundBuffer = new ArrayDeque(); + + public EmbeddedMessageChannel(ChannelHandler... handlers) { + super(ChannelBufferHolders.messageBuffer(), handlers); + } + + @Override + public ChannelType type() { + return ChannelType.MESSAGE; + } + + public Queue inboundBuffer() { + return pipeline().inboundMessageBuffer(); + } + + public Queue lastOutboundBuffer() { + return lastOutboundBuffer; + } + + public Object readOutbound() { + return lastOutboundBuffer.poll(); + } + + public boolean writeInbound(Object msg) { + inboundBuffer().add(msg); + pipeline().fireInboundBufferUpdated(); + checkException(); + return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty(); + } + + public boolean writeOutbound(Object msg) { + write(msg); + checkException(); + return !lastOutboundBuffer().isEmpty(); + } + + @Override + protected void doFlushMessageBuffer(Queue buf) throws Exception { + for (;;) { + Object o = buf.poll(); + if (o == null) { + break; + } + lastOutboundBuffer.add(o); + } + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedSocketAddress.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedSocketAddress.java similarity index 95% rename from codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedSocketAddress.java rename to transport/src/main/java/io/netty/channel/embedded/EmbeddedSocketAddress.java index 814da31274..7e5bc93ff2 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedSocketAddress.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedSocketAddress.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.handler.codec.embedder; +package io.netty.channel.embedded; import java.net.SocketAddress; diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java new file mode 100644 index 0000000000..7cad3a69c8 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedStreamChannel.java @@ -0,0 +1,61 @@ +package io.netty.channel.embedded; + +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelType; + +public class EmbeddedStreamChannel extends AbstractEmbeddedChannel { + + private final ChannelBuffer lastOutboundBuffer = ChannelBuffers.dynamicBuffer(); + + public EmbeddedStreamChannel(ChannelHandler... handlers) { + super(ChannelBufferHolders.messageBuffer(), handlers); + } + + @Override + public ChannelType type() { + return ChannelType.STREAM; + } + + public ChannelBuffer inboundBuffer() { + return pipeline().inboundByteBuffer(); + } + + public ChannelBuffer lastOutboundBuffer() { + return lastOutboundBuffer; + } + + public ChannelBuffer readOutbound() { + if (!lastOutboundBuffer.readable()) { + return null; + } + try { + return lastOutboundBuffer.readBytes(lastOutboundBuffer.readableBytes()); + } finally { + lastOutboundBuffer.clear(); + } + } + + public boolean writeInbound(ChannelBuffer data) { + inboundBuffer().writeBytes(data); + pipeline().fireInboundBufferUpdated(); + checkException(); + return lastInboundByteBuffer().readable() || !lastInboundMessageBuffer().isEmpty(); + } + + public boolean writeOutbound(Object msg) { + write(msg); + checkException(); + return lastOutboundBuffer().readable(); + } + + @Override + protected void doFlushByteBuffer(ChannelBuffer buf) throws Exception { + if (!lastOutboundBuffer.readable()) { + lastOutboundBuffer.discardReadBytes(); + } + lastOutboundBuffer.writeBytes(buf); + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/package-info.java b/transport/src/main/java/io/netty/channel/embedded/package-info.java similarity index 74% rename from codec/src/main/java/io/netty/handler/codec/embedder/package-info.java rename to transport/src/main/java/io/netty/channel/embedded/package-info.java index 3681552fa9..b69473f70f 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/package-info.java +++ b/transport/src/main/java/io/netty/channel/embedded/package-info.java @@ -15,10 +15,8 @@ */ /** - * A helper that wraps an encoder or a decoder so that they can be used without - * doing actual I/O in unit tests or higher level codecs. - * - * @apiviz.exclude CodecEmbedder$ + * A virtual {@link io.netty.channel.Channel} that helps wrapping a series of handlers to + * unit test the handlers or use them in non-I/O context. */ -package io.netty.handler.codec.embedder; +package io.netty.channel.embedded; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index 2364b3726f..d8783b0817 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -259,6 +259,11 @@ public class OioEventLoop implements EventLoop { return SingleThreadEventExecutor.currentEventLoop() != null; } + @Override + public boolean inEventLoop(Thread thread) { + throw new UnsupportedOperationException(); + } + private EventLoop nextChild() { OioChildEventLoop loop = idleChildren.poll(); if (loop == null) { diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 86a6478b4f..045e8df1eb 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -52,17 +52,7 @@ public class DefaultChannelPipelineTest { } @Sharable - private static class TestHandler extends ChannelHandlerAdapter { - @Override - public ChannelBufferHolder newInboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.byteBuffer(); - } - - @Override - public ChannelBufferHolder newOutboundBuffer( - ChannelHandlerContext ctx) throws Exception { - return ChannelBufferHolders.byteBuffer(); - } - }; + private static class TestHandler extends ChannelHandlerAdapter { + // Dummy + } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 4d5d386872..330f4530c8 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -23,8 +23,10 @@ import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.DefaultEventExecutor; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; @@ -326,7 +328,9 @@ public class LocalTransportThreadModelTest { } } - private static class ThreadNameAuditor extends ChannelHandlerAdapter { + private static class ThreadNameAuditor + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final AtomicReference exception = new AtomicReference(); @@ -373,7 +377,9 @@ public class LocalTransportThreadModelTest { /** * Converts integers into a binary stream. */ - private static class MessageForwarder1 extends ChannelHandlerAdapter { + private static class MessageForwarder1 + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final AtomicReference exception = new AtomicReference(); private volatile int inCnt; @@ -456,7 +462,9 @@ public class LocalTransportThreadModelTest { /** * Converts a binary stream into integers. */ - private static class MessageForwarder2 extends ChannelHandlerAdapter { + private static class MessageForwarder2 + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final AtomicReference exception = new AtomicReference(); private volatile int inCnt; @@ -531,7 +539,9 @@ public class LocalTransportThreadModelTest { /** * Simply forwards the received object to the next handler. */ - private static class MessageForwarder3 extends ChannelHandlerAdapter { + private static class MessageForwarder3 + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final AtomicReference exception = new AtomicReference(); private volatile int inCnt; @@ -607,7 +617,9 @@ public class LocalTransportThreadModelTest { /** * Discards all received messages. */ - private static class MessageDiscarder extends ChannelHandlerAdapter { + private static class MessageDiscarder + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private final AtomicReference exception = new AtomicReference(); private volatile int inCnt;