From 57a459d970ab6482267e9c7f270e751403a524e9 Mon Sep 17 00:00:00 2001 From: Andrei Pozolotin Date: Sun, 12 May 2013 14:48:22 -0500 Subject: [PATCH] updated udt and connection test --- pom.xml | 2 +- testsuite/.gitignore | 6 + .../udt/UDTClientServerConnectionTest.java | 539 ++++++++++-------- .../testsuite/transport/udt/package-info.java | 21 + 4 files changed, 332 insertions(+), 236 deletions(-) create mode 100644 testsuite/.gitignore create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/udt/package-info.java diff --git a/pom.xml b/pom.xml index 796680a619..1879a4cdce 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,7 @@ com.barchart.udt barchart-udt-bundle - 2.2.6 + 2.3.0-SNAPSHOT diff --git a/testsuite/.gitignore b/testsuite/.gitignore new file mode 100644 index 0000000000..24514f4dba --- /dev/null +++ b/testsuite/.gitignore @@ -0,0 +1,6 @@ + +# +# UDT native libraries extract location. +# + +/lib diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java index 2b35d2b4f4..314f0f2b78 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/udt/UDTClientServerConnectionTest.java @@ -15,10 +15,9 @@ */ package io.netty.testsuite.transport.udt; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.BufType; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; @@ -33,164 +32,251 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; -import io.netty.testsuite.transport.udt.UDTClientServerConnectionTest.Server.TestThreadFactory; import io.netty.util.CharsetUtil; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Verify UDT connect/disconnect life cycle. + */ public class UDTClientServerConnectionTest { - @Test - public void test() throws InterruptedException { - // first start server + static class Client implements Runnable { - int port = 1234; - Server server = new Server(port); - Thread serverTread = new Thread(server); - serverTread.start(); + static final Logger log = LoggerFactory.getLogger(Client.class); - Thread.sleep(1000); + final String host; + final int port; - // start a client - final String host = "localhost"; + volatile Channel channel; + volatile boolean isRunning; + volatile boolean isShutdown; - TestClient client = new TestClient(host, port); - Thread clientThread = new Thread(client); - clientThread.start(); - - Thread.sleep(1000); - - // check number of connections - assertTrue(server.connectedClients() == 1); - - // close client - client.shutdown(); - - Thread.sleep(1000); - - // check connections again - assertTrue(server.connectedClients() == 0); - } - - static class Server implements Runnable { - private final ChannelGroup channels = new DefaultChannelGroup( - "all channels"); - private Channel serverChannel; - - private static final Logger log = Logger.getLogger(Server.class - .getName()); - - private final int port; - private ServerBootstrap b; - private boolean running; - private boolean shutdown; - - public Server(final int port) { + Client(final String host, final int port) { + this.host = host; this.port = port; } - public void shutdown() { - log.info("shutting down server..."); - running = false; - while (!shutdown) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - //ignore - } - } - } - + @Override public void run() { - running = true; - b = new ServerBootstrap(); - final ThreadFactory acceptFactory = new TestThreadFactory("accept"); - final ThreadFactory connectFactory = new TestThreadFactory( - "connect"); - final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, - acceptFactory, NioUdtProvider.BYTE_PROVIDER); + final Bootstrap boot = new Bootstrap(); + final ThreadFactory clientFactory = new ThreadFactory("client"); final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - connectFactory, NioUdtProvider.BYTE_PROVIDER); + clientFactory, NioUdtProvider.BYTE_PROVIDER); try { - // Configure the server. - b.group(acceptGroup, connectGroup) - .channelFactory(NioUdtProvider.BYTE_ACCEPTOR) - .childHandler(new ChannelInitializer() { + boot.group(connectGroup) + .channelFactory(NioUdtProvider.BYTE_CONNECTOR) + .handler(new ChannelInitializer() { @Override - protected void initChannel(UdtChannel ch) + protected void initChannel(final UdtChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); + final ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder( CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder( - BufType.BYTE)); - - pipeline.addLast("handler", new ServerHandler()); - channels.add(ch); - } - - @Override - public void channelInactive( - ChannelHandlerContext ctx) throws Exception { - log.log(Level.INFO, - "channel inactive, removing from channelgroup"); - channels.remove(ctx.channel()); + CharsetUtil.UTF_8)); + pipeline.addLast("handler", new ClientHandler()); } }); - // Start the server. - serverChannel = b.bind(port).sync().channel(); - waitForShutdownCommand(); - log.info("closing server channel..."); - serverChannel.close().sync(); - log.info("closing all accepted gateway channels..."); - channels.close().sync(); - log.info("channels closed"); - } catch (Exception e) { - log.log(Level.SEVERE, "GATEWAY SERVER DIED!", e); + channel = boot.connect(host, port).sync().channel(); + isRunning = true; + log.info("Client ready."); + waitForRunning(false); + log.info("Client closing..."); + channel.close().sync(); + isShutdown = true; + log.info("Client is done."); + } catch (final Throwable e) { + log.error("Client failed.", e); } finally { - acceptGroup.shutdownGracefully(); connectGroup.shutdownGracefully(); } - shutdown = true; } - public int connectedClients() { - return channels.size(); + void shutdown() { + isRunning = false; } - private void waitForShutdownCommand() { - while (running) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); + void waitForActive(final boolean isActive) throws Exception { + for (int k = 0; k < WAIT_COUNT; k++) { + Thread.sleep(WAIT_SLEEP); + final ClientHandler handler = channel.pipeline().get( + ClientHandler.class); + if (handler != null && isActive == handler.isActive) { + return; } } } - static class TestThreadFactory implements ThreadFactory { - - private static final AtomicInteger counter = new AtomicInteger(); - - private final String name; - - public TestThreadFactory(final String name) { - this.name = name; + void waitForRunning(final boolean isRunning) throws Exception { + for (int k = 0; k < WAIT_COUNT; k++) { + if (isRunning == this.isRunning) { + return; + } + Thread.sleep(WAIT_SLEEP); } + } - @Override - public Thread newThread(final Runnable runnable) { - return new Thread(runnable, name + '-' - + counter.getAndIncrement()); + private void waitForShutdown() throws Exception { + for (int k = 0; k < WAIT_COUNT; k++) { + if (isShutdown) { + return; + } + Thread.sleep(WAIT_SLEEP); + } + } + } + + static class ClientHandler extends + ChannelInboundMessageHandlerAdapter { + + static final Logger log = LoggerFactory.getLogger(ClientHandler.class); + + volatile boolean isActive; + + ClientHandler() { + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) + throws Exception { + isActive = true; + log.info("Client active {}", ctx.channel()); + super.channelActive(ctx); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) + throws Exception { + isActive = false; + log.info("Client inactive {}", ctx.channel()); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, + final Throwable cause) throws Exception { + log.warn("Client unexpected exception from downstream.", cause); + ctx.close(); + } + + @Override + public void messageReceived(final ChannelHandlerContext ctx, + final String msg) throws Exception { + log.info("Client received: " + msg); + } + } + + static class Server implements Runnable { + + static final Logger log = LoggerFactory.getLogger(Server.class); + + final ChannelGroup group = new DefaultChannelGroup("server group"); + + final String host; + final int port; + + volatile Channel channel; + volatile boolean isRunning; + volatile boolean isShutdown; + + Server(final String host, final int port) { + this.host = host; + this.port = port; + } + + @Override + public void run() { + final ServerBootstrap boot = new ServerBootstrap(); + final ThreadFactory acceptFactory = new ThreadFactory("accept"); + final ThreadFactory serverFactory = new ThreadFactory("server"); + final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, + acceptFactory, NioUdtProvider.BYTE_PROVIDER); + final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, + serverFactory, NioUdtProvider.BYTE_PROVIDER); + try { + boot.group(acceptGroup, connectGroup) + .channelFactory(NioUdtProvider.BYTE_ACCEPTOR) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(final UdtChannel ch) + throws Exception { + final ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("framer", + new DelimiterBasedFrameDecoder(8192, + Delimiters.lineDelimiter())); + pipeline.addLast("decoder", new StringDecoder( + CharsetUtil.UTF_8)); + pipeline.addLast("encoder", new StringEncoder( + CharsetUtil.UTF_8)); + pipeline.addLast("handler", new ServerHandler( + group)); + } + }); + channel = boot.bind(port).sync().channel(); + isRunning = true; + log.info("Server ready."); + waitForRunning(false); + log.info("Server closing acceptor..."); + channel.close().sync(); + log.info("Server closing connectors..."); + group.close().sync(); + isShutdown = true; + log.info("Server is done."); + } catch (final Throwable e) { + log.error("Server failure.", e); + } finally { + acceptGroup.shutdownGracefully(); + connectGroup.shutdownGracefully(); + } + } + + void shutdown() { + isRunning = false; + } + + void waitForActive(final boolean isActive) throws Exception { + for (int k = 0; k < WAIT_COUNT; k++) { + Thread.sleep(WAIT_SLEEP); + if (isActive) { + for (final Channel channel : group) { + final ServerHandler handler = channel.pipeline().get( + ServerHandler.class); + if (handler != null && handler.isActive) { + return; + } + } + } else { + if (group.size() == 0) { + return; + } + } + } + } + + void waitForRunning(final boolean isRunning) throws Exception { + for (int k = 0; k < WAIT_COUNT; k++) { + if (isRunning == this.isRunning) { + return; + } + Thread.sleep(WAIT_SLEEP); + } + } + + void waitForShutdown() throws Exception { + for (int k = 0; k < WAIT_COUNT; k++) { + if (isShutdown) { + return; + } + Thread.sleep(WAIT_SLEEP); } } } @@ -198,140 +284,123 @@ public class UDTClientServerConnectionTest { static class ServerHandler extends ChannelInboundMessageHandlerAdapter { - private boolean isClosed; - private static final Logger log = Logger.getLogger(ServerHandler.class - .getName()); + static final Logger log = LoggerFactory.getLogger(ServerHandler.class); + + final ChannelGroup group; + + volatile boolean isActive; + + ServerHandler(final ChannelGroup group) { + this.group = group; + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) + throws Exception { + group.add(ctx.channel()); + isActive = true; + log.info("Server active : {}", ctx.channel()); + super.channelActive(ctx); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) + throws Exception { + group.remove(ctx.channel()); + isActive = false; + log.info("Server inactive: {}", ctx.channel()); + super.channelInactive(ctx); + } @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { - log.log(Level.WARNING, - "close the connection when an exception is raised", cause); + log.warn("Server close on exception.", cause); ctx.close(); } @Override - public void messageReceived(ChannelHandlerContext ctx, String message) - throws Exception { - log.log(Level.INFO, "received: " + message); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - log.log(Level.INFO, "channel inactive"); - isClosed = true; - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) - throws Exception { - log.log(Level.INFO, "channel unregistered"); - } - - public boolean isClosed() { - return isClosed; + public void messageReceived(final ChannelHandlerContext ctx, + final String message) throws Exception { + log.info("Server received: " + message); } } - static class TestClient implements Runnable { + static class ThreadFactory implements java.util.concurrent.ThreadFactory { - private static final Logger log = Logger.getLogger(TestClient.class - .getName()); + static final AtomicInteger counter = new AtomicInteger(); - private final String host; - private final int port; - private Channel channel; - private boolean running; + final String name; - public TestClient(final String host, final int port) { - this.host = host; - this.port = port; - } - - public void run() { - running = true; - // Configure the client. - final Bootstrap boot = new Bootstrap(); - final ThreadFactory connectFactory = new TestThreadFactory( - "connect"); - final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, - connectFactory, NioUdtProvider.BYTE_PROVIDER); - try { - boot.group(connectGroup) - .channelFactory(NioUdtProvider.BYTE_CONNECTOR) - .handler(new ChannelInitializer() { - - @Override - protected void initChannel(UdtChannel ch) - throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - - // On top of the SSL handler, add the text line - // codec. - pipeline.addLast("framer", - new DelimiterBasedFrameDecoder(8192, - Delimiters.lineDelimiter())); - pipeline.addLast("decoder", new StringDecoder()); - pipeline.addLast("encoder", new StringEncoder( - BufType.BYTE)); - - // and then business logic. - pipeline.addLast("handler", new TestClientHandler()); - } - }); - // Start the connection attempt. - channel = boot.connect(host, port).sync().channel(); - waitForShutdownCommand(); - channel.disconnect().sync(); - channel.close().sync(); // close the channel and wait until done - - // channel.closeFuture().sync(); //wait for the channel to close - - } catch (Exception e) { - e.printStackTrace(); - } finally { - // Shut down the event loop to terminate all threads. - connectGroup.shutdownGracefully(); - } - log.info("test client done"); - } - - private void waitForShutdownCommand() { - while (running) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - public void shutdown() { - running = false; - } - } - - static class TestClientHandler extends - ChannelInboundMessageHandlerAdapter { - - TestClientHandler() { - } - - private static final Logger logger = Logger - .getLogger(TestClientHandler.class.getName()); - - @Override - public void messageReceived(ChannelHandlerContext ctx, String msg) - throws Exception { - logger.info("client received: " + msg); + ThreadFactory(final String name) { + this.name = name; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - logger.log(Level.WARNING, "Unexpected exception from downstream.", - cause); - ctx.close(); + public Thread newThread(final Runnable runnable) { + return new Thread(runnable, name + '-' + counter.getAndIncrement()); } } + + static final Logger log = LoggerFactory + .getLogger(UDTClientServerConnectionTest.class); + + /** + * Maximum wait time is 5 seconds. + *

+ * wait-time = {@link #WAIT_COUNT} * {@value #WAIT_SLEEP} + */ + static final int WAIT_COUNT = 50; + static final int WAIT_SLEEP = 100; + + /** + * Verify UDT client/server connect and disconnect. + */ + @Test + public void connection() throws Exception { + + final String host = "localhost"; + final int port = 1234; + + log.info("Starting server."); + final Server server = new Server(host, port); + final Thread serverTread = new Thread(server, "server-*"); + serverTread.start(); + server.waitForRunning(true); + assertTrue(server.isRunning); + + log.info("Starting client."); + final Client client = new Client(host, port); + final Thread clientThread = new Thread(client, "client-*"); + clientThread.start(); + client.waitForRunning(true); + assertTrue(client.isRunning); + + log.info("Wait till connection is active."); + client.waitForActive(true); + server.waitForActive(true); + + log.info("Verify connection is active."); + assertEquals("group must have one", 1, server.group.size()); + + log.info("Stopping client."); + client.shutdown(); + client.waitForShutdown(); + assertTrue(client.isShutdown); + + log.info("Wait till connection is inactive."); + client.waitForActive(false); + server.waitForActive(false); + + log.info("Verify connection is inactive."); + assertEquals("group must be empty", 0, server.group.size()); + + log.info("Stopping server."); + server.shutdown(); + server.waitForShutdown(); + assertTrue(server.isShutdown); + + log.info("Finished server."); + } + } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/udt/package-info.java b/testsuite/src/test/java/io/netty/testsuite/transport/udt/package-info.java new file mode 100644 index 0000000000..3345a22a72 --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/udt/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Test suite classes for UDT. + */ +package io.netty.testsuite.transport.udt; +