From 98a2bb62f5f4bb0b6bfc08e01ced827e06e2ea2d Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 16 Dec 2014 17:40:00 +0900 Subject: [PATCH] Log detailed information about renegotiation and traffic Motivation: We need more information to understand why SocketSslEchoTest fails sporadically in the CI machine. Modifications: - Refactor SocketSslEchoTest so that it is easier to retrieve the information about renegotiation and the current progress Result: We will get more information when the test fails. --- .../transport/socket/SocketSslEchoTest.java | 327 +++++++++++------- 1 file changed, 210 insertions(+), 117 deletions(-) diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index 11aec876cc..5e4435234f 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; @@ -55,7 +56,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.*; @@ -171,6 +172,28 @@ public class SocketSslEchoTest extends AbstractSocketTest { private final boolean useChunkedWriteHandler; private final boolean useCompositeByteBuf; + private final AtomicReference clientException = new AtomicReference(); + private final AtomicReference serverException = new AtomicReference(); + + private final AtomicInteger clientSendCounter = new AtomicInteger(); + private final AtomicInteger clientRecvCounter = new AtomicInteger(); + private final AtomicInteger serverRecvCounter = new AtomicInteger(); + + private final AtomicInteger clientNegoCounter = new AtomicInteger(); + private final AtomicInteger serverNegoCounter = new AtomicInteger(); + + private volatile SocketChannel clientChannel; + private volatile SocketChannel serverChannel; + + private volatile SslHandler clientSslHandler; + private volatile SslHandler serverSslHandler; + + private final EchoClientHandler clientHandler = + new EchoClientHandler(clientRecvCounter, clientNegoCounter, clientException); + + private final EchoServerHandler serverHandler = + new EchoServerHandler(serverRecvCounter, serverNegoCounter, serverException); + public SocketSslEchoTest( SslContext serverCtx, SslContext clientCtx, Renegotiation renegotiation, boolean serverUsesDelegatedTaskExecutor, boolean clientUsesDelegatedTaskExecutor, @@ -197,23 +220,26 @@ public class SocketSslEchoTest extends AbstractSocketTest { public void testSslEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable { final ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool(); - final EchoHandler sh = new EchoHandler(true); - final EchoHandler ch = new EchoHandler(false); + reset(); sb.childHandler(new ChannelInitializer() { @Override @SuppressWarnings("deprecation") public void initChannel(SocketChannel sch) throws Exception { + serverChannel = sch; + if (serverUsesDelegatedTaskExecutor) { SSLEngine sse = serverCtx.newEngine(sch.alloc()); - sch.pipeline().addFirst("ssl", new SslHandler(sse, delegatedTaskExecutor)); + serverSslHandler = new SslHandler(sse, delegatedTaskExecutor); } else { - sch.pipeline().addFirst("ssl", serverCtx.newHandler(sch.alloc())); + serverSslHandler = serverCtx.newHandler(sch.alloc()); } + + sch.pipeline().addLast("ssl", serverSslHandler); if (useChunkedWriteHandler) { sch.pipeline().addLast(new ChunkedWriteHandler()); } - sch.pipeline().addLast("handler", sh); + sch.pipeline().addLast("handler", serverHandler); } }); @@ -221,46 +247,53 @@ public class SocketSslEchoTest extends AbstractSocketTest { @Override @SuppressWarnings("deprecation") public void initChannel(SocketChannel sch) throws Exception { + clientChannel = sch; + if (clientUsesDelegatedTaskExecutor) { SSLEngine cse = clientCtx.newEngine(sch.alloc()); - sch.pipeline().addFirst("ssl", new SslHandler(cse, delegatedTaskExecutor)); + clientSslHandler = new SslHandler(cse, delegatedTaskExecutor); } else { - sch.pipeline().addFirst("ssl", clientCtx.newHandler(sch.alloc())); + clientSslHandler = clientCtx.newHandler(sch.alloc()); } + + sch.pipeline().addLast("ssl", clientSslHandler); if (useChunkedWriteHandler) { sch.pipeline().addLast(new ChunkedWriteHandler()); } - sch.pipeline().addLast("handler", ch); + sch.pipeline().addLast("handler", clientHandler); } }); - Channel sc = sb.bind().sync().channel(); - Channel cc = cb.connect().sync().channel(); - Future hf = ch.sslHandler.handshakeFuture(); - cc.writeAndFlush(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)); - final AtomicBoolean firstByteWriteFutureDone = new AtomicBoolean(); + final Channel sc = sb.bind().sync().channel(); + cb.connect().sync(); - hf.sync(); + final Future clientHandshakeFuture = clientSslHandler.handshakeFuture(); - assertFalse(firstByteWriteFutureDone.get()); + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE)); + clientSendCounter.set(FIRST_MESSAGE_SIZE); + clientHandshakeFuture.sync(); boolean needsRenegotiation = renegotiation.type == RenegotiationType.CLIENT_INITIATED; Future renegoFuture = null; - for (int i = FIRST_MESSAGE_SIZE; i < data.length;) { - int length = Math.min(random.nextInt(1024 * 64), data.length - i); - ByteBuf buf = Unpooled.wrappedBuffer(data, i, length); + while (clientSendCounter.get() < data.length) { + int clientSendCounterVal = clientSendCounter.get(); + int length = Math.min(random.nextInt(1024 * 64), data.length - clientSendCounterVal); + ByteBuf buf = Unpooled.wrappedBuffer(data, clientSendCounterVal, length); if (useCompositeByteBuf) { buf = Unpooled.compositeBuffer().addComponent(buf).writerIndex(buf.writerIndex()); } - ChannelFuture future = cc.writeAndFlush(buf); - future.sync(); - i += length; - if (needsRenegotiation && i >= data.length / 2) { + ChannelFuture future = clientChannel.writeAndFlush(buf); + clientSendCounter.set(clientSendCounterVal += length); + future.sync(); + + + if (needsRenegotiation && clientSendCounterVal >= data.length / 2) { needsRenegotiation = false; - ch.sslHandler.engine().setEnabledCipherSuites(new String[] { renegotiation.cipherSuite }); - renegoFuture = ch.sslHandler.renegotiate(); - assertThat(renegoFuture, is(not(sameInstance(hf)))); + clientSslHandler.engine().setEnabledCipherSuites(new String[] { renegotiation.cipherSuite }); + renegoFuture = clientSslHandler.renegotiate(); + logStats("CLIENT RENEGOTIATES"); + assertThat(renegoFuture, is(not(sameInstance(clientHandshakeFuture)))); assertThat(renegoFuture.isDone(), is(false)); } } @@ -269,16 +302,16 @@ public class SocketSslEchoTest extends AbstractSocketTest { if (renegoFuture != null) { renegoFuture.sync(); } - if (sh.renegoFuture != null) { - sh.renegoFuture.sync(); + if (serverHandler.renegoFuture != null) { + serverHandler.renegoFuture.sync(); } // Ensure all data has been exchanged. - while (ch.counter < data.length) { - if (sh.exception.get() != null) { + while (clientRecvCounter.get() < data.length) { + if (serverException.get() != null) { break; } - if (ch.exception.get() != null) { + if (serverException.get() != null) { break; } @@ -289,11 +322,11 @@ public class SocketSslEchoTest extends AbstractSocketTest { } } - while (sh.counter < data.length) { - if (sh.exception.get() != null) { + while (serverRecvCounter.get() < data.length) { + if (serverException.get() != null) { break; } - if (ch.exception.get() != null) { + if (clientException.get() != null) { break; } @@ -304,107 +337,92 @@ public class SocketSslEchoTest extends AbstractSocketTest { } } - sh.channel.close().awaitUninterruptibly(); - ch.channel.close().awaitUninterruptibly(); + serverChannel.close().awaitUninterruptibly(); + clientChannel.close().awaitUninterruptibly(); sc.close().awaitUninterruptibly(); delegatedTaskExecutor.shutdown(); - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); + if (serverException.get() != null && !(serverException.get() instanceof IOException)) { + throw serverException.get(); } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); + if (clientException.get() != null && !(clientException.get() instanceof IOException)) { + throw clientException.get(); } - if (sh.exception.get() != null) { - throw sh.exception.get(); + if (serverException.get() != null) { + throw serverException.get(); } - if (ch.exception.get() != null) { - throw ch.exception.get(); + if (clientException.get() != null) { + throw clientException.get(); } // When renegotiation is done, both the client and server side should be notified. try { if (renegotiation.type != RenegotiationType.NONE) { - assertThat(sh.sslHandler.engine().getSession().getCipherSuite(), is(renegotiation.cipherSuite)); - assertThat(sh.negoCounter, is(2)); - assertThat(ch.sslHandler.engine().getSession().getCipherSuite(), is(renegotiation.cipherSuite)); - assertThat(ch.negoCounter, is(2)); + assertThat(serverSslHandler.engine().getSession().getCipherSuite(), is(renegotiation.cipherSuite)); + assertThat(serverNegoCounter.get(), is(2)); + assertThat(clientSslHandler.engine().getSession().getCipherSuite(), is(renegotiation.cipherSuite)); + assertThat(clientNegoCounter.get(), is(2)); } else { - assertThat(sh.negoCounter, is(1)); - assertThat(ch.negoCounter, is(1)); + assertThat(serverNegoCounter.get(), is(1)); + assertThat(clientNegoCounter.get(), is(1)); } } catch (Throwable t) { // TODO: Remove this once we fix this test. - logger.warn(sh.channel + - "[S] cipherSuite: " + sh.sslHandler.engine().getSession().getCipherSuite() + - ", negoCounter: " + sh.negoCounter); - logger.warn(ch.channel + - "[C] cipherSuite: " + ch.sslHandler.engine().getSession().getCipherSuite() + - ", negoCounter: " + ch.negoCounter); - TestUtils.dump(StringUtil.simpleClassName(this)); throw t; + } finally { + logStats("STATS"); } } - private class EchoHandler extends SimpleChannelInboundHandler { - volatile Channel channel; - final AtomicReference exception = new AtomicReference(); - volatile int counter; - private final boolean server; - volatile SslHandler sslHandler; - volatile Future renegoFuture; - volatile int negoCounter; + private void reset() { + clientException.set(null); + serverException.set(null); - EchoHandler(boolean server) { - this.server = server; + clientSendCounter.set(0); + clientRecvCounter.set(0); + serverRecvCounter.set(0); + + clientNegoCounter.set(0); + serverNegoCounter.set(0); + + clientChannel = null; + serverChannel = null; + + clientSslHandler = null; + serverSslHandler = null; + } + + void logStats(String message) { + logger.debug( + "{}:\n" + + "\tclient { sent: {}, rcvd: {}, nego: {}, cipher: {} },\n" + + "\tserver { rcvd: {}, nego: {}, cipher: {} }", + message, + clientSendCounter, clientRecvCounter, clientNegoCounter, + clientSslHandler.engine().getSession().getCipherSuite(), + serverRecvCounter, serverNegoCounter, + serverSslHandler.engine().getSession().getCipherSuite()); + } + + @Sharable + private abstract class EchoHandler extends SimpleChannelInboundHandler { + + protected final AtomicInteger recvCounter; + protected final AtomicInteger negoCounter; + protected final AtomicReference exception; + + EchoHandler( + AtomicInteger recvCounter, AtomicInteger negoCounter, + AtomicReference exception) { + + this.recvCounter = recvCounter; + this.negoCounter = negoCounter; + this.exception = exception; } @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - channel = ctx.channel(); - sslHandler = channel.pipeline().get(SslHandler.class); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - byte[] actual = new byte[in.readableBytes()]; - in.readBytes(actual); - - int lastIdx = counter; - for (int i = 0; i < actual.length; i ++) { - assertEquals(data[i + lastIdx], actual[i]); - } - - if (channel.parent() != null) { - ByteBuf buf = Unpooled.wrappedBuffer(actual); - if (useCompositeByteBuf) { - buf = Unpooled.compositeBuffer().addComponent(buf).writerIndex(buf.writerIndex()); - } - channel.write(buf); - } - - counter += actual.length; - - // Perform server-initiated renegotiation if necessary. - if (server && renegotiation.type == RenegotiationType.SERVER_INITIATED && - counter > data.length / 2 && renegoFuture == null) { - - SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); - - Future hf = sslHandler.handshakeFuture(); - assertThat(hf.isDone(), is(true)); - - sslHandler.engine().setEnabledCipherSuites(new String[] { renegotiation.cipherSuite }); - renegoFuture = sslHandler.renegotiate(); - assertThat(renegoFuture, is(not(sameInstance(hf)))); - assertThat(renegoFuture, is(sameInstance(sslHandler.handshakeFuture()))); - assertThat(renegoFuture.isDone(), is(false)); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception { try { ctx.flush(); } finally { @@ -415,23 +433,98 @@ public class SocketSslEchoTest extends AbstractSocketTest { } @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof SslHandshakeCompletionEvent) { assertSame(SslHandshakeCompletionEvent.SUCCESS, evt); - negoCounter ++; + negoCounter.incrementAndGet(); + logStats("HANDSHAKEN"); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (logger.isWarnEnabled()) { - logger.warn( - "Unexpected exception from the " + - (server? "server" : "client") + " side", cause); + logger.warn("Unexpected exception from the client side:", cause); } exception.compareAndSet(null, cause); ctx.close(); } } + + private class EchoClientHandler extends EchoHandler { + + EchoClientHandler( + AtomicInteger recvCounter, AtomicInteger negoCounter, + AtomicReference exception) { + + super(recvCounter, negoCounter, exception); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); + + int lastIdx = recvCounter.get(); + for (int i = 0; i < actual.length; i ++) { + assertEquals(data[i + lastIdx], actual[i]); + } + + recvCounter.addAndGet(actual.length); + } + } + + + private class EchoServerHandler extends EchoHandler { + volatile Future renegoFuture; + + EchoServerHandler( + AtomicInteger recvCounter, AtomicInteger negoCounter, + AtomicReference exception) { + + super(recvCounter, negoCounter, exception); + } + + @Override + public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { + renegoFuture = null; + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + byte[] actual = new byte[in.readableBytes()]; + in.readBytes(actual); + + int lastIdx = recvCounter.get(); + for (int i = 0; i < actual.length; i ++) { + assertEquals(data[i + lastIdx], actual[i]); + } + + ByteBuf buf = Unpooled.wrappedBuffer(actual); + if (useCompositeByteBuf) { + buf = Unpooled.compositeBuffer().addComponent(buf).writerIndex(buf.writerIndex()); + } + ctx.write(buf); + + recvCounter.addAndGet(actual.length); + + // Perform server-initiated renegotiation if necessary. + if (renegotiation.type == RenegotiationType.SERVER_INITIATED && + recvCounter.get() > data.length / 2 && renegoFuture == null) { + + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + + Future hf = sslHandler.handshakeFuture(); + assertThat(hf.isDone(), is(true)); + + sslHandler.engine().setEnabledCipherSuites(new String[] { renegotiation.cipherSuite }); + logStats("SERVER RENEGOTIATES"); + renegoFuture = sslHandler.renegotiate(); + assertThat(renegoFuture, is(not(sameInstance(hf)))); + assertThat(renegoFuture, is(sameInstance(sslHandler.handshakeFuture()))); + assertThat(renegoFuture.isDone(), is(false)); + } + } + } }