From e6ceb91a85011585118236b151f86460f3740edb Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 3 Jun 2012 02:40:58 -0700 Subject: [PATCH] Add AbstractDatagramTest / Port unicast test / Ignore 'Socket closed' --- .../socket/AbstractDatagramTest.java | 98 ++++++-------- .../transport/socket/DatagramUnicastTest.java | 70 ++++++++++ ...st.java => SocketFixedLengthEchoTest.java} | 125 ++++++++---------- .../socket/oio/OioDatagramChannel.java | 6 + 4 files changed, 166 insertions(+), 133 deletions(-) create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java rename testsuite/src/test/java/io/netty/testsuite/transport/socket/{AbstractSocketFixedLengthEchoTest.java => SocketFixedLengthEchoTest.java} (52%) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java index da052f692e..bef131dd64 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractDatagramTest.java @@ -15,77 +15,55 @@ */ package io.netty.testsuite.transport.socket; -import static org.junit.Assert.assertTrue; -import io.netty.bootstrap.ConnectionlessBootstrap; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; -import io.netty.channel.socket.DatagramChannelFactory; -import io.netty.util.internal.ExecutorUtil; +import io.netty.bootstrap.Bootstrap; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import io.netty.testsuite.transport.socket.SocketTestPermutation.Factory; +import io.netty.testsuite.util.TestUtils; +import io.netty.util.SocketAddresses; -import java.net.InetAddress; +import java.lang.reflect.Method; import java.net.InetSocketAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Map.Entry; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.TestName; public abstract class AbstractDatagramTest { - private static ExecutorService executor; + private static final List, Factory>> COMBO = + SocketTestPermutation.datagram(); + @Rule + public final TestName testName = new TestName(); - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } + protected final InternalLogger logger = InternalLoggerFactory.getInstance(getClass()); - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } + protected volatile Bootstrap sb; + protected volatile Bootstrap cb; + protected volatile InetSocketAddress addr; - protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor); - protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor); + protected void run() throws Exception { + int i = 0; + for (Entry, Factory> e: COMBO) { + sb = e.getKey().newInstance(); + cb = e.getValue().newInstance(); + addr = new InetSocketAddress( + SocketAddresses.LOCALHOST, TestUtils.getFreePort()); + sb.localAddress(addr); + cb.localAddress(0).remoteAddress(addr); - @Test - public void testSimpleSend() throws Throwable { - ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor)); - ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor)); - - final CountDownLatch latch = new CountDownLatch(1); - sb.pipeline().addFirst("handler", new SimpleChannelUpstreamHandler() { - - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - super.messageReceived(ctx, e); - Assert.assertEquals(1,((ChannelBuffer)e.getMessage()).readInt()); - - latch.countDown(); + logger.info(String.format( + "Running: %s %d of %d", testName.getMethodName(), ++ i, COMBO.size())); + try { + Method m = getClass().getDeclaredMethod( + testName.getMethodName(), Bootstrap.class, Bootstrap.class); + m.invoke(this, sb, cb); + } finally { + sb.shutdown(); + cb.shutdown(); } - - }); - cb.pipeline().addFirst("handler", new SimpleChannelUpstreamHandler()); - - Channel sc = sb.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); - - Channel cc = cb.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); - assertTrue(cc.write(ChannelBuffers.copyInt(1), sc.getLocalAddress()).awaitUninterruptibly().isSuccess()); - - assertTrue(latch.await(10, TimeUnit.SECONDS)); - sc.close().awaitUninterruptibly(); - cc.close().awaitUninterruptibly(); - sb.releaseExternalResources(); - cb.releaseExternalResources(); - + } } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java new file mode 100644 index 0000000000..fc442373bb --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/DatagramUnicastTest.java @@ -0,0 +1,70 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import static org.junit.Assert.*; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ChannelBuffers; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.DatagramPacket; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +public class DatagramUnicastTest extends AbstractDatagramTest { + + @Test + public void testSimpleSend() throws Throwable { + run(); + } + + public void testSimpleSend(Bootstrap sb, Bootstrap cb) throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + + sb.handler(new ChannelInboundMessageHandlerAdapter() { + @Override + public void messageReceived( + ChannelInboundHandlerContext ctx, + DatagramPacket msg) throws Exception { + Assert.assertEquals(1, msg.data().readInt()); + latch.countDown(); + } + }); + + cb.handler(new ChannelInboundMessageHandlerAdapter() { + @Override + public void messageReceived( + ChannelInboundHandlerContext ctx, + DatagramPacket msg) throws Exception { + // Nothing will be sent. + } + }); + + Channel sc = sb.bind().sync().channel(); + Channel cc = cb.bind().sync().channel(); + + cc.write(new DatagramPacket(ChannelBuffers.copyInt(1), addr)).sync(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + + sc.close().sync(); + cc.close().sync(); + } +} diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketFixedLengthEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java similarity index 52% rename from testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketFixedLengthEchoTest.java rename to testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java index 5281d464ea..ba711cec20 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/AbstractSocketFixedLengthEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketFixedLengthEchoTest.java @@ -16,78 +16,59 @@ package io.netty.testsuite.transport.socket; import static org.junit.Assert.*; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Random; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; - -import io.netty.bootstrap.ClientBootstrap; +import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.ExceptionEvent; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; -import io.netty.util.SocketAddresses; -import io.netty.util.internal.ExecutorUtil; -import org.junit.AfterClass; -import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.Test; -public abstract class AbstractSocketFixedLengthEchoTest { +public class SocketFixedLengthEchoTest extends AbstractSocketTest { private static final Random random = new Random(); static final byte[] data = new byte[1048576]; - private static ExecutorService executor; - static { random.nextBytes(data); } - @BeforeClass - public static void init() { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void destroy() { - ExecutorUtil.terminate(executor); - } - - protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor); - protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor); - @Test public void testFixedLengthEcho() throws Throwable { - ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor)); - ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor)); + run(); + } - EchoHandler sh = new EchoHandler(); - EchoHandler ch = new EchoHandler(); + public void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { + final EchoHandler sh = new EchoHandler(); + final EchoHandler ch = new EchoHandler(); - sb.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); - sb.pipeline().addAfter("decoder", "handler", sh); - cb.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); - cb.pipeline().addAfter("decoder", "handler", ch); + sb.childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel sch) throws Exception { + sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); + sch.pipeline().addAfter("decoder", "handler", sh); + } + }); - Channel sc = sb.bind(new InetSocketAddress(0)); - int port = ((InetSocketAddress) sc.getLocalAddress()).getPort(); + cb.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel sch) throws Exception { + sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024)); + sch.pipeline().addAfter("decoder", "handler", ch); + } + }); - ChannelFuture ccf = cb.connect(new InetSocketAddress(SocketAddresses.LOCALHOST, port)); - assertTrue(ccf.awaitUninterruptibly().isSuccess()); - - Channel cc = ccf.channel(); + Channel sc = sb.bind().sync().channel(); + Channel cc = cb.connect().sync().channel(); for (int i = 0; i < data.length;) { int length = Math.min(random.nextInt(1024 * 3), data.length - i); cc.write(ChannelBuffers.wrappedBuffer(data, i, length)); @@ -124,9 +105,9 @@ public abstract class AbstractSocketFixedLengthEchoTest { } } - sh.channel.close().awaitUninterruptibly(); - ch.channel.close().awaitUninterruptibly(); - sc.close().awaitUninterruptibly(); + sh.channel.close().sync(); + ch.channel.close().sync(); + sc.close().sync(); if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { throw sh.exception.get(); @@ -142,46 +123,44 @@ public abstract class AbstractSocketFixedLengthEchoTest { } } - private static class EchoHandler extends SimpleChannelUpstreamHandler { + private static class EchoHandler extends ChannelInboundMessageHandlerAdapter { volatile Channel channel; final AtomicReference exception = new AtomicReference(); volatile int counter; - EchoHandler() { + @Override + public void channelActive(ChannelInboundHandlerContext ctx) + throws Exception { + channel = ctx.channel(); } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - channel = e.channel(); - } + public void messageReceived( + ChannelInboundHandlerContext ctx, + ChannelBuffer msg) throws Exception { + assertEquals(1024, msg.readableBytes()); - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - ChannelBuffer m = (ChannelBuffer) e.getMessage(); - assertEquals(1024, m.readableBytes()); - - byte[] actual = new byte[m.readableBytes()]; - m.getBytes(0, actual); + byte[] actual = new byte[msg.readableBytes()]; + msg.getBytes(0, actual); int lastIdx = counter; for (int i = 0; i < actual.length; i ++) { assertEquals(data[i + lastIdx], actual[i]); } - if (channel.getParent() != null) { - channel.write(m); + if (channel.parent() != null) { + channel.write(msg); } counter += actual.length; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught( + ChannelInboundHandlerContext ctx, Throwable cause) throws Exception { - if (exception.compareAndSet(null, e.cause())) { - e.channel().close(); + if (exception.compareAndSet(null, cause)) { + ctx.close(); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 435c4c3fdc..60d8b7cc26 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -35,6 +35,7 @@ import java.net.NetworkInterface; import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; +import java.util.Locale; import java.util.Queue; public class OioDatagramChannel extends AbstractOioMessageChannel @@ -164,6 +165,11 @@ public class OioDatagramChannel extends AbstractOioMessageChannel } catch (SocketTimeoutException e) { // Expected return 0; + } catch (SocketException e) { + if (!e.getMessage().toLowerCase(Locale.US).contains("socket closed")) { + throw e; + } + return -1; } }