From b9d277dbcb3f318c14388b4ebfe75072ff6daea9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 11 Feb 2019 09:47:44 +0100 Subject: [PATCH] =?UTF-8?q?Support=20using=20an=20Executor=20to=20offload?= =?UTF-8?q?=20blocking=20/=20long-running=20tasks=20wh=E2=80=A6=20(#8847)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: The SSLEngine does provide a way to signal to the caller that it may need to execute a blocking / long-running task which then can be offloaded to an Executor to ensure the I/O thread is not blocked. Currently how we handle this in SslHandler is not really optimal as while we offload to the Executor we still block the I/O Thread. Modifications: - Correctly support offloading the task to the Executor while suspending processing of SSL in the I/O Thread - Add new methods to SslContext to specify the Executor when creating a SslHandler - Remove @deprecated annotations from SslHandler constructor that takes an Executor - Adjust tests to also run with the Executor to ensure all works as expected. Result: Be able to offload long running tasks to an Executor when using SslHandler. Partly fixes https://github.com/netty/netty/issues/7862 and https://github.com/netty/netty/issues/7020. --- .../handler/ssl/DelegatingSslContext.java | 16 + .../ssl/ReferenceCountedOpenSslContext.java | 12 + .../java/io/netty/handler/ssl/SniHandler.java | 11 +- .../java/io/netty/handler/ssl/SslContext.java | 62 +++- .../java/io/netty/handler/ssl/SslHandler.java | 318 ++++++++++++++---- .../handler/ssl/CipherSuiteCanaryTest.java | 34 +- .../ssl/ConscryptJdkSslEngineInteropTest.java | 9 +- .../handler/ssl/ConscryptSslEngineTest.java | 9 +- .../ssl/JdkConscryptSslEngineInteropTest.java | 9 +- .../ssl/JdkOpenSslEngineInteroptTest.java | 12 +- .../netty/handler/ssl/JdkSslEngineTest.java | 14 +- .../netty/handler/ssl/OpenSslEngineTest.java | 12 +- .../ssl/OpenSslJdkSslEngineInteroptTest.java | 12 +- .../ReferenceCountedOpenSslEngineTest.java | 4 +- .../io/netty/handler/ssl/SSLEngineTest.java | 83 ++++- .../io/netty/handler/ssl/SslHandlerTest.java | 159 ++++++++- .../SocketSslClientRenegotiateTest.java | 121 ++++--- .../socket/SocketSslGreetingTest.java | 104 +++--- ...lDomainSocketSslClientRenegotiateTest.java | 4 +- .../EpollDomainSocketSslGreetingTest.java | 4 +- .../EpollSocketSslClientRenegotiateTest.java | 4 +- .../epoll/EpollSocketSslGreetingTest.java | 4 +- ...eDomainSocketSslClientRenegotiateTest.java | 4 +- .../KQueueDomainSocketSslGreetingTest.java | 4 +- .../KQueueSocketSslClientRenegotiateTest.java | 4 +- .../kqueue/KQueueSocketSslGreetingTest.java | 4 +- 26 files changed, 793 insertions(+), 240 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/ssl/DelegatingSslContext.java b/handler/src/main/java/io/netty/handler/ssl/DelegatingSslContext.java index 76358a8e89..82dad484ad 100644 --- a/handler/src/main/java/io/netty/handler/ssl/DelegatingSslContext.java +++ b/handler/src/main/java/io/netty/handler/ssl/DelegatingSslContext.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBufAllocator; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLSessionContext; import java.util.List; +import java.util.concurrent.Executor; /** * Adapter class which allows to wrap another {@link SslContext} and init {@link SSLEngine} instances. @@ -87,6 +88,21 @@ public abstract class DelegatingSslContext extends SslContext { return handler; } + @Override + protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls, Executor executor) { + SslHandler handler = ctx.newHandler(alloc, startTls, executor); + initHandler(handler); + return handler; + } + + @Override + protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, + boolean startTls, Executor executor) { + SslHandler handler = ctx.newHandler(alloc, peerHost, peerPort, startTls, executor); + initHandler(handler); + return handler; + } + @Override public final SSLSessionContext sessionContext() { return ctx.sessionContext(); diff --git a/handler/src/main/java/io/netty/handler/ssl/ReferenceCountedOpenSslContext.java b/handler/src/main/java/io/netty/handler/ssl/ReferenceCountedOpenSslContext.java index 1836428b17..10026323c8 100644 --- a/handler/src/main/java/io/netty/handler/ssl/ReferenceCountedOpenSslContext.java +++ b/handler/src/main/java/io/netty/handler/ssl/ReferenceCountedOpenSslContext.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -391,6 +392,17 @@ public abstract class ReferenceCountedOpenSslContext extends SslContext implemen return new SslHandler(newEngine0(alloc, peerHost, peerPort, false), startTls); } + @Override + protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls, Executor executor) { + return new SslHandler(newEngine0(alloc, null, -1, false), startTls, executor); + } + + @Override + protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, + boolean startTls, Executor executor) { + return new SslHandler(newEngine0(alloc, peerHost, peerPort, false), executor); + } + SSLEngine newEngine0(ByteBufAllocator alloc, String peerHost, int peerPort, boolean jdkCompatibilityMode) { return new ReferenceCountedOpenSslEngine(this, alloc, peerHost, peerPort, jdkCompatibilityMode, true); } diff --git a/handler/src/main/java/io/netty/handler/ssl/SniHandler.java b/handler/src/main/java/io/netty/handler/ssl/SniHandler.java index 45c588db16..0e5e232116 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SniHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SniHandler.java @@ -17,6 +17,7 @@ package io.netty.handler.ssl; import static java.util.Objects.requireNonNull; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.util.AsyncMapping; @@ -130,7 +131,7 @@ public class SniHandler extends AbstractSniHandler { protected void replaceHandler(ChannelHandlerContext ctx, String hostname, SslContext sslContext) throws Exception { SslHandler sslHandler = null; try { - sslHandler = sslContext.newHandler(ctx.alloc()); + sslHandler = newSslHandler(sslContext, ctx.alloc()); ctx.pipeline().replace(this, SslHandler.class.getName(), sslHandler); sslHandler = null; } finally { @@ -143,6 +144,14 @@ public class SniHandler extends AbstractSniHandler { } } + /** + * Returns a new {@link SslHandler} using the given {@link SslContext} and {@link ByteBufAllocator}. + * Users may override this method to implement custom behavior. + */ + protected SslHandler newSslHandler(SslContext context, ByteBufAllocator allocator) { + return context.newHandler(allocator); + } + private static final class AsyncMappingAdapter implements AsyncMapping { private final Mapping mapping; diff --git a/handler/src/main/java/io/netty/handler/ssl/SslContext.java b/handler/src/main/java/io/netty/handler/ssl/SslContext.java index 6c5a6c6109..dab0ce745d 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslContext.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslContext.java @@ -60,6 +60,7 @@ import java.security.cert.X509Certificate; import java.security.spec.InvalidKeySpecException; import java.security.spec.PKCS8EncodedKeySpec; import java.util.List; +import java.util.concurrent.Executor; /** * A secure socket protocol implementation which acts as a factory for {@link SSLEngine} and {@link SslHandler}. @@ -879,6 +880,22 @@ public abstract class SslContext { */ public abstract SSLSessionContext sessionContext(); + /** + * Create a new SslHandler. + * @see #newHandler(ByteBufAllocator, Executor) + */ + public final SslHandler newHandler(ByteBufAllocator alloc) { + return newHandler(alloc, startTls); + } + + /** + * Create a new SslHandler. + * @see #newHandler(ByteBufAllocator) + */ + protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls) { + return new SslHandler(newEngine(alloc), startTls); + } + /** * Creates a new {@link SslHandler}. *

If {@link SslProvider#OPENSSL_REFCNT} is used then the returned {@link SslHandler} will release the engine @@ -900,18 +917,37 @@ public abstract class SslContext { * SSLEngine javadocs which * limits wrap/unwrap to operate on a single SSL/TLS packet. * @param alloc If supported by the SSLEngine then the SSLEngine will use this to allocate ByteBuf objects. + * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by + * {@link SSLEngine#getDelegatedTask()}. * @return a new {@link SslHandler} */ - public final SslHandler newHandler(ByteBufAllocator alloc) { - return newHandler(alloc, startTls); + public SslHandler newHandler(ByteBufAllocator alloc, Executor delegatedTaskExecutor) { + return newHandler(alloc, startTls, delegatedTaskExecutor); } /** * Create a new SslHandler. - * @see #newHandler(ByteBufAllocator) + * @see #newHandler(ByteBufAllocator, String, int, boolean, Executor) */ - protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls) { - return new SslHandler(newEngine(alloc), startTls); + protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls, Executor executor) { + return new SslHandler(newEngine(alloc), startTls, executor); + } + + /** + * Creates a new {@link SslHandler} + * + * @see #newHandler(ByteBufAllocator, String, int, Executor) + */ + public final SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort) { + return newHandler(alloc, peerHost, peerPort, startTls); + } + + /** + * Create a new SslHandler. + * @see #newHandler(ByteBufAllocator, String, int, boolean, Executor) + */ + protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, boolean startTls) { + return new SslHandler(newEngine(alloc, peerHost, peerPort), startTls); } /** @@ -937,19 +973,19 @@ public abstract class SslContext { * @param alloc If supported by the SSLEngine then the SSLEngine will use this to allocate ByteBuf objects. * @param peerHost the non-authoritative name of the host * @param peerPort the non-authoritative port + * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by + * {@link SSLEngine#getDelegatedTask()}. * * @return a new {@link SslHandler} */ - public final SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort) { - return newHandler(alloc, peerHost, peerPort, startTls); + public SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, + Executor delegatedTaskExecutor) { + return newHandler(alloc, peerHost, peerPort, startTls, delegatedTaskExecutor); } - /** - * Create a new SslHandler. - * @see #newHandler(ByteBufAllocator, String, int, boolean) - */ - protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, boolean startTls) { - return new SslHandler(newEngine(alloc, peerHost, peerPort), startTls); + protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, boolean startTls, + Executor delegatedTaskExecutor) { + return new SslHandler(newEngine(alloc, peerHost, peerPort), startTls, delegatedTaskExecutor); } /** diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index c6e8872afb..599bc45994 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -33,6 +33,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromiseNotifier; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.UnsupportedMessageTypeException; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; @@ -55,10 +56,9 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.DatagramChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -391,6 +391,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH private boolean flushedBeforeHandshake; private boolean readDuringHandshake; private boolean handshakeStarted; + private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites; private Promise handshakePromise = new LazyChannelPromise(); private final LazyChannelPromise sslClosePromise = new LazyChannelPromise(); @@ -403,6 +404,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH private boolean outboundClosed; private boolean closeNotify; + private boolean processTask; private int packetLength; @@ -418,7 +420,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH; /** - * Creates a new instance. + * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}. * * @param engine the {@link SSLEngine} this handler will use */ @@ -427,29 +429,36 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } /** - * Creates a new instance. + * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}. * * @param engine the {@link SSLEngine} this handler will use * @param startTls {@code true} if the first write request shouldn't be * encrypted by the {@link SSLEngine} */ - @SuppressWarnings("deprecation") public SslHandler(SSLEngine engine, boolean startTls) { this(engine, startTls, ImmediateExecutor.INSTANCE); } /** - * @deprecated Use {@link #SslHandler(SSLEngine)} instead. + * Creates a new instance. + * + * @param engine the {@link SSLEngine} this handler will use + * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by + * {@link SSLEngine#getDelegatedTask()}. */ - @Deprecated public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) { this(engine, false, delegatedTaskExecutor); } /** - * @deprecated Use {@link #SslHandler(SSLEngine, boolean)} instead. + * Creates a new instance. + * + * @param engine the {@link SSLEngine} this handler will use + * @param startTls {@code true} if the first write request shouldn't be + * encrypted by the {@link SSLEngine} + * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by + * {@link SSLEngine#getDelegatedTask()}. */ - @Deprecated public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) { requireNonNull(engine, "engine"); requireNonNull(delegatedTaskExecutor, "delegatedTaskExecutor"); @@ -769,6 +778,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH return; } + if (processTask) { + return; + } + try { wrapAndFlush(ctx); } catch (Throwable cause) { @@ -808,7 +821,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH final int wrapDataSize = this.wrapDataSize; // Only continue to loop if the handler was not removed in the meantime. // See https://github.com/netty/netty/issues/5860 - while (!ctx.isRemoved()) { + outer: while (!ctx.isRemoved()) { promise = ctx.newPromise(); buf = wrapDataSize > 0 ? pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) : @@ -845,7 +858,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH switch (result.getHandshakeStatus()) { case NEED_TASK: - runDelegatedTasks(); + if (!runDelegatedTasks(inUnwrap)) { + // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will + // resume once the task completes. + break outer; + } break; case FINISHED: setHandshakeSuccess(); @@ -914,7 +931,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH try { // Only continue to loop if the handler was not removed in the meantime. // See https://github.com/netty/netty/issues/5860 - while (!ctx.isRemoved()) { + outer: while (!ctx.isRemoved()) { if (out == null) { // As this is called for the handshake we have no real idea how big the buffer needs to be. // That said 2048 should give us enough room to include everything like ALPN / NPN data. @@ -936,7 +953,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH setHandshakeSuccess(); return false; case NEED_TASK: - runDelegatedTasks(); + if (!runDelegatedTasks(inUnwrap)) { + // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will + // resume once the task completes. + break outer; + } break; case NEED_UNWRAP: if (inUnwrap) { @@ -1237,6 +1258,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws SSLException { + if (processTask) { + return; + } if (jdkCompatibilityMode) { decodeJdkCompatible(ctx, in); } else { @@ -1246,6 +1270,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + channelReadComplete0(ctx); + } + + private void channelReadComplete0(ChannelHandlerContext ctx) { // Discard bytes of the cumulation buffer if needed. discardSomeReadBytes(); @@ -1360,7 +1388,16 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } break; case NEED_TASK: - runDelegatedTasks(); + if (!runDelegatedTasks(true)) { + // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will + // resume once the task completes. + // + // We break out of the loop only and do NOT return here as we still may need to notify + // about the closure of the SSLEngine. + // + wrapLater = false; + break unwrapLoop; + } break; case FINISHED: setHandshakeSuccess(); @@ -1441,62 +1478,215 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH out.nioBuffer(index, len); } - /** - * Fetches all delegated tasks from the {@link SSLEngine} and runs them via the {@link #delegatedTaskExecutor}. - * If the {@link #delegatedTaskExecutor} is {@link ImmediateExecutor}, just call {@link Runnable#run()} directly - * instead of using {@link Executor#execute(Runnable)}. Otherwise, run the tasks via - * the {@link #delegatedTaskExecutor} and wait until the tasks are finished. - */ - private void runDelegatedTasks() { - if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) { - for (;;) { - Runnable task = engine.getDelegatedTask(); - if (task == null) { - break; - } + private static boolean inEventLoop(Executor executor) { + return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop(); + } - task.run(); - } - } else { - final List tasks = new ArrayList<>(2); - for (;;) { - final Runnable task = engine.getDelegatedTask(); - if (task == null) { - break; - } - - tasks.add(task); - } - - if (tasks.isEmpty()) { + private static void runAllDelegatedTasks(SSLEngine engine) { + for (;;) { + Runnable task = engine.getDelegatedTask(); + if (task == null) { return; } + task.run(); + } + } - final CountDownLatch latch = new CountDownLatch(1); - delegatedTaskExecutor.execute(() -> { - try { - for (Runnable task: tasks) { - task.run(); - } - } catch (Exception e) { - ctx.fireExceptionCaught(e); - } finally { - latch.countDown(); - } - }); - - boolean interrupted = false; - while (latch.getCount() != 0) { - try { - latch.await(); - } catch (InterruptedException e) { - // Interrupt later. - interrupted = true; - } + /** + * Will either run the delegated task directly calling {@link Runnable#run()} and return {@code true} or will + * offload the delegated task using {@link Executor#execute(Runnable)} and return {@code false}. + * + * If the task is offloaded it will take care to resume its work on the {@link EventExecutor} once there are no + * more tasks to process. + */ + private boolean runDelegatedTasks(boolean inUnwrap) { + if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) { + // We should run the task directly in the EventExecutor thread and not offload at all. + for (;;) { + runAllDelegatedTasks(engine); + return true; } + } else { + executeDelegatedTasks(inUnwrap); + return false; + } + } - if (interrupted) { - Thread.currentThread().interrupt(); + private void executeDelegatedTasks(boolean inUnwrap) { + processTask = true; + try { + delegatedTaskExecutor.execute(new SslTasksRunner(inUnwrap)); + } catch (RejectedExecutionException e) { + processTask = false; + throw e; + } + } + + /** + * {@link Runnable} that will be scheduled on the {@code delegatedTaskExecutor} and will take care + * of resume work on the {@link EventExecutor} once the task was executed. + */ + private final class SslTasksRunner implements Runnable { + private final boolean inUnwrap; + + SslTasksRunner(boolean inUnwrap) { + this.inUnwrap = inUnwrap; + } + + // Handle errors which happened during task processing. + private void taskError(Throwable e) { + if (inUnwrap) { + // As the error happened while the task was scheduled as part of unwrap(...) we also need to ensure + // we fire it through the pipeline as inbound error to be consistent with what we do in decode(...). + // + // This will also ensure we fail the handshake future and flush all produced data. + try { + handleUnwrapThrowable(ctx, e); + } catch (Throwable cause) { + safeExceptionCaught(cause); + } + } else { + setHandshakeFailure(ctx, e); + forceFlush(ctx); + } + } + + // Try to call exceptionCaught(...) + private void safeExceptionCaught(Throwable cause) { + try { + exceptionCaught(ctx, wrapIfNeeded(cause)); + } catch (Throwable error) { + ctx.fireExceptionCaught(error); + } + } + + private Throwable wrapIfNeeded(Throwable cause) { + if (!inUnwrap) { + // If we are not in unwrap(...) we can just rethrow without wrapping at all. + return cause; + } + // As the exception would have been triggered by an inbound operation we will need to wrap it in a + // DecoderException to mimic what a decoder would do when decode(...) throws. + return cause instanceof DecoderException ? cause : new DecoderException(cause); + } + + private void tryDecodeAgain() { + try { + channelRead(ctx, Unpooled.EMPTY_BUFFER); + } catch (Throwable cause) { + safeExceptionCaught(cause); + } finally { + // As we called channelRead(...) we also need to call channelReadComplete(...) which + // will ensure we either call ctx.fireChannelReadComplete() or will trigger a ctx.read() if + // more data is needed. + channelReadComplete0(ctx); + } + } + + /** + * Executed after the wrapped {@code task} was executed via {@code delegatedTaskExecutor} to resume work + * on the {@link EventExecutor}. + */ + private void resumeOnEventExecutor() { + assert ctx.executor().inEventLoop(); + + processTask = false; + + try { + HandshakeStatus status = engine.getHandshakeStatus(); + switch (status) { + // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor. + case NEED_TASK: + executeDelegatedTasks(inUnwrap); + + break; + + // The handshake finished, lets notify about the completion of it and resume processing. + case FINISHED: + setHandshakeSuccess(); + + // deliberate fall-through + + // Not handshaking anymore, lets notify about the completion if not done yet and resume processing. + case NOT_HANDSHAKING: + setHandshakeSuccessIfStillHandshaking(); + try { + // Lets call wrap to ensure we produce the alert if there is any pending and also to + // ensure we flush any queued data.. + wrap(ctx, inUnwrap); + } catch (Throwable e) { + taskError(e); + return; + } + + // Flush now as we may have written some data as part of the wrap call. + forceFlush(ctx); + + tryDecodeAgain(); + break; + + // We need more data so lets try to unwrap first and then call decode again which will feed us + // with buffered data (if there is any). + case NEED_UNWRAP: + unwrapNonAppData(ctx); + tryDecodeAgain(); + break; + + // To make progress we need to call SSLEngine.wrap(...) which may produce more output data + // that will be written to the Channel. + case NEED_WRAP: + try { + wrapNonAppData(ctx, inUnwrap); + } catch (Throwable e) { + taskError(e); + return; + } + // Flush now as we may have written some data as part of the wrap call. + forceFlush(ctx); + + // Now try to feed in more data that we have buffered. + tryDecodeAgain(); + break; + default: + // Should never reach here as we handle all cases. + throw new AssertionError(); + } + } catch (Throwable cause) { + safeExceptionCaught(cause); + } + } + + @Override + public void run() { + try { + runAllDelegatedTasks(engine); + + // All tasks were processed. + assert engine.getHandshakeStatus() != HandshakeStatus.NEED_TASK; + + // Jump back on the EventExecutor. + ctx.executor().execute(this::resumeOnEventExecutor); + } catch (final Throwable cause) { + handleException(cause); + } + } + + private void handleException(final Throwable cause) { + if (ctx.executor().inEventLoop()) { + processTask = false; + safeExceptionCaught(cause); + } else { + try { + ctx.executor().execute(() -> { + processTask = false; + safeExceptionCaught(cause); + }); + } catch (RejectedExecutionException ignore) { + processTask = false; + // the context itself will handle the rejected exception when try to schedule the operation so + // ignore the RejectedExecutionException + ctx.fireExceptionCaught(cause); + } } } } diff --git a/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java b/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java index e6b90a8508..3644cdcc55 100644 --- a/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/CipherSuiteCanaryTest.java @@ -18,6 +18,7 @@ package io.netty.handler.ssl; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; @@ -42,6 +43,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; @@ -67,7 +71,7 @@ public class CipherSuiteCanaryTest { private static SelfSignedCertificate CERT; - @Parameters(name = "{index}: serverSslProvider = {0}, clientSslProvider = {1}, rfcCipherName = {2}") + @Parameters(name = "{index}: serverSslProvider = {0}, clientSslProvider = {1}, rfcCipherName = {2}, delegate = {3}") public static Collection parameters() { List dst = new ArrayList<>(); dst.addAll(expand("TLS_DHE_RSA_WITH_AES_128_GCM_SHA256")); // DHE-RSA-AES128-GCM-SHA256 @@ -81,7 +85,7 @@ public class CipherSuiteCanaryTest { } @AfterClass - public static void destory() { + public static void destroy() { GROUP.shutdownGracefully(); CERT.delete(); } @@ -91,11 +95,14 @@ public class CipherSuiteCanaryTest { private final SslProvider clientSslProvider; private final String rfcCipherName; + private final boolean delegate; - public CipherSuiteCanaryTest(SslProvider serverSslProvider, SslProvider clientSslProvider, String rfcCipherName) { + public CipherSuiteCanaryTest(SslProvider serverSslProvider, SslProvider clientSslProvider, + String rfcCipherName, boolean delegate) { this.serverSslProvider = serverSslProvider; this.clientSslProvider = clientSslProvider; this.rfcCipherName = rfcCipherName; + this.delegate = delegate; } private static void assumeCipherAvailable(SslProvider provider, String cipher) throws NoSuchAlgorithmException { @@ -114,6 +121,14 @@ public class CipherSuiteCanaryTest { Assume.assumeTrue("Unsupported cipher: " + cipher, cipherSupported); } + private static SslHandler newSslHandler(SslContext sslCtx, ByteBufAllocator allocator, Executor executor) { + if (executor == null) { + return sslCtx.newHandler(allocator); + } else { + return sslCtx.newHandler(allocator, executor); + } + } + @Test public void testHandshake() throws Exception { // Check if the cipher is supported at all which may not be the case for various JDK versions and OpenSSL API @@ -130,6 +145,8 @@ public class CipherSuiteCanaryTest { .protocols(SslUtils.PROTOCOL_TLS_V1_2) .build(); + final ExecutorService executorService = delegate ? Executors.newCachedThreadPool() : null; + try { final SslContext sslClientContext = SslContextBuilder.forClient() .sslProvider(clientSslProvider) @@ -147,7 +164,7 @@ public class CipherSuiteCanaryTest { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(sslServerContext.newHandler(ch.alloc())); + pipeline.addLast(newSslHandler(sslServerContext, ch.alloc(), executorService)); pipeline.addLast(new SimpleChannelInboundHandler() { @Override @@ -183,7 +200,7 @@ public class CipherSuiteCanaryTest { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(sslClientContext.newHandler(ch.alloc())); + pipeline.addLast(newSslHandler(sslClientContext, ch.alloc(), executorService)); pipeline.addLast(new SimpleChannelInboundHandler() { @Override @@ -230,6 +247,10 @@ public class CipherSuiteCanaryTest { } } finally { ReferenceCountUtil.release(sslServerContext); + + if (executorService != null) { + executorService.shutdown(); + } } } @@ -268,7 +289,8 @@ public class CipherSuiteCanaryTest { continue; } - dst.add(new Object[]{serverSslProvider, clientSslProvider, rfcCipherName}); + dst.add(new Object[]{serverSslProvider, clientSslProvider, rfcCipherName, true}); + dst.add(new Object[]{serverSslProvider, clientSslProvider, rfcCipherName, false}); } } diff --git a/handler/src/test/java/io/netty/handler/ssl/ConscryptJdkSslEngineInteropTest.java b/handler/src/test/java/io/netty/handler/ssl/ConscryptJdkSslEngineInteropTest.java index 17daeb203f..3ad54a9310 100644 --- a/handler/src/test/java/io/netty/handler/ssl/ConscryptJdkSslEngineInteropTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/ConscryptJdkSslEngineInteropTest.java @@ -31,17 +31,18 @@ import static org.junit.Assume.assumeTrue; @RunWith(Parameterized.class) public class ConscryptJdkSslEngineInteropTest extends SSLEngineTest { - @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}") + @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}") public static Collection data() { List params = new ArrayList<>(); for (BufferType type: BufferType.values()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()}); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true }); } return params; } - public ConscryptJdkSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo) { - super(type, combo); + public ConscryptJdkSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) { + super(type, combo, delegate); } @BeforeClass diff --git a/handler/src/test/java/io/netty/handler/ssl/ConscryptSslEngineTest.java b/handler/src/test/java/io/netty/handler/ssl/ConscryptSslEngineTest.java index 2003d40c7a..3f3c8777ad 100644 --- a/handler/src/test/java/io/netty/handler/ssl/ConscryptSslEngineTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/ConscryptSslEngineTest.java @@ -30,17 +30,18 @@ import static org.junit.Assume.assumeTrue; @RunWith(Parameterized.class) public class ConscryptSslEngineTest extends SSLEngineTest { - @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}") + @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}") public static Collection data() { List params = new ArrayList<>(); for (BufferType type: BufferType.values()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()}); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true }); } return params; } - public ConscryptSslEngineTest(BufferType type, ProtocolCipherCombo combo) { - super(type, combo); + public ConscryptSslEngineTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) { + super(type, combo, delegate); } @BeforeClass diff --git a/handler/src/test/java/io/netty/handler/ssl/JdkConscryptSslEngineInteropTest.java b/handler/src/test/java/io/netty/handler/ssl/JdkConscryptSslEngineInteropTest.java index 82d94363f3..b3afb5a6e5 100644 --- a/handler/src/test/java/io/netty/handler/ssl/JdkConscryptSslEngineInteropTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/JdkConscryptSslEngineInteropTest.java @@ -32,17 +32,18 @@ import static org.junit.Assume.assumeTrue; @RunWith(Parameterized.class) public class JdkConscryptSslEngineInteropTest extends SSLEngineTest { - @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}") + @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}") public static Collection data() { List params = new ArrayList<>(); for (BufferType type: BufferType.values()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()}); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true }); } return params; } - public JdkConscryptSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo) { - super(type, combo); + public JdkConscryptSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) { + super(type, combo, delegate); } @BeforeClass diff --git a/handler/src/test/java/io/netty/handler/ssl/JdkOpenSslEngineInteroptTest.java b/handler/src/test/java/io/netty/handler/ssl/JdkOpenSslEngineInteroptTest.java index 31e11c30b7..64add35a19 100644 --- a/handler/src/test/java/io/netty/handler/ssl/JdkOpenSslEngineInteroptTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/JdkOpenSslEngineInteroptTest.java @@ -33,21 +33,23 @@ import static org.junit.Assume.assumeTrue; @RunWith(Parameterized.class) public class JdkOpenSslEngineInteroptTest extends SSLEngineTest { - @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}") + @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}") public static Collection data() { List params = new ArrayList<>(); for (BufferType type: BufferType.values()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()}); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true }); if (PlatformDependent.javaVersion() >= 11 && OpenSsl.isTlsv13Supported()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv13() }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), true }); } } return params; } - public JdkOpenSslEngineInteroptTest(BufferType type, ProtocolCipherCombo protocolCipherCombo) { - super(type, protocolCipherCombo); + public JdkOpenSslEngineInteroptTest(BufferType type, ProtocolCipherCombo protocolCipherCombo, boolean delegate) { + super(type, protocolCipherCombo, delegate); } @BeforeClass diff --git a/handler/src/test/java/io/netty/handler/ssl/JdkSslEngineTest.java b/handler/src/test/java/io/netty/handler/ssl/JdkSslEngineTest.java index ad2ca68b35..bc8786534e 100644 --- a/handler/src/test/java/io/netty/handler/ssl/JdkSslEngineTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/JdkSslEngineTest.java @@ -142,14 +142,17 @@ public class JdkSslEngineTest extends SSLEngineTest { private static final String FALLBACK_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http1_1"; private static final String APPLICATION_LEVEL_PROTOCOL_NOT_COMPATIBLE = "my-protocol-FOO"; - @Parameterized.Parameters(name = "{index}: providerType = {0}, bufferType = {1}, combo = {2}") + @Parameterized.Parameters(name = "{index}: providerType = {0}, bufferType = {1}, combo = {2}, delegate = {3}") public static Collection data() { List params = new ArrayList<>(); for (ProviderType providerType : ProviderType.values()) { for (BufferType bufferType : BufferType.values()) { - params.add(new Object[]{ providerType, bufferType, ProtocolCipherCombo.tlsv12()}); + params.add(new Object[]{ providerType, bufferType, ProtocolCipherCombo.tlsv12(), true }); + params.add(new Object[]{ providerType, bufferType, ProtocolCipherCombo.tlsv12(), false }); + if (PlatformDependent.javaVersion() >= 11) { - params.add(new Object[] { providerType, bufferType, ProtocolCipherCombo.tlsv13() }); + params.add(new Object[] { providerType, bufferType, ProtocolCipherCombo.tlsv13(), true }); + params.add(new Object[] { providerType, bufferType, ProtocolCipherCombo.tlsv13(), false }); } } } @@ -160,8 +163,9 @@ public class JdkSslEngineTest extends SSLEngineTest { private Provider provider; - public JdkSslEngineTest(ProviderType providerType, BufferType bufferType, ProtocolCipherCombo protocolCipherCombo) { - super(bufferType, protocolCipherCombo); + public JdkSslEngineTest(ProviderType providerType, BufferType bufferType, + ProtocolCipherCombo protocolCipherCombo, boolean delegate) { + super(bufferType, protocolCipherCombo, delegate); this.providerType = providerType; } diff --git a/handler/src/test/java/io/netty/handler/ssl/OpenSslEngineTest.java b/handler/src/test/java/io/netty/handler/ssl/OpenSslEngineTest.java index 3a6e1358c6..824a11d8e9 100644 --- a/handler/src/test/java/io/netty/handler/ssl/OpenSslEngineTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/OpenSslEngineTest.java @@ -68,21 +68,23 @@ public class OpenSslEngineTest extends SSLEngineTest { private static final String PREFERRED_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http2"; private static final String FALLBACK_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http1_1"; - @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}") + @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}") public static Collection data() { List params = new ArrayList<>(); for (BufferType type: BufferType.values()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()}); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true }); if (OpenSsl.isTlsv13Supported()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv13() }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), true }); } } return params; } - public OpenSslEngineTest(BufferType type, ProtocolCipherCombo cipherCombo) { - super(type, cipherCombo); + public OpenSslEngineTest(BufferType type, ProtocolCipherCombo cipherCombo, boolean delegate) { + super(type, cipherCombo, delegate); } @BeforeClass diff --git a/handler/src/test/java/io/netty/handler/ssl/OpenSslJdkSslEngineInteroptTest.java b/handler/src/test/java/io/netty/handler/ssl/OpenSslJdkSslEngineInteroptTest.java index f57722cc80..df10fc731a 100644 --- a/handler/src/test/java/io/netty/handler/ssl/OpenSslJdkSslEngineInteroptTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/OpenSslJdkSslEngineInteroptTest.java @@ -35,21 +35,23 @@ import static org.junit.Assume.assumeTrue; @RunWith(Parameterized.class) public class OpenSslJdkSslEngineInteroptTest extends SSLEngineTest { - @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}") + @Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}") public static Collection data() { List params = new ArrayList<>(); for (BufferType type: BufferType.values()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()}); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true }); if (PlatformDependent.javaVersion() >= 11 && OpenSsl.isTlsv13Supported()) { - params.add(new Object[] { type, ProtocolCipherCombo.tlsv13() }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), false }); + params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), true }); } } return params; } - public OpenSslJdkSslEngineInteroptTest(BufferType type, ProtocolCipherCombo combo) { - super(type, combo); + public OpenSslJdkSslEngineInteroptTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) { + super(type, combo, delegate); } @BeforeClass diff --git a/handler/src/test/java/io/netty/handler/ssl/ReferenceCountedOpenSslEngineTest.java b/handler/src/test/java/io/netty/handler/ssl/ReferenceCountedOpenSslEngineTest.java index 588619d3a7..1728a53844 100644 --- a/handler/src/test/java/io/netty/handler/ssl/ReferenceCountedOpenSslEngineTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/ReferenceCountedOpenSslEngineTest.java @@ -23,8 +23,8 @@ import javax.net.ssl.SSLEngine; public class ReferenceCountedOpenSslEngineTest extends OpenSslEngineTest { - public ReferenceCountedOpenSslEngineTest(BufferType type, ProtocolCipherCombo combo) { - super(type, combo); + public ReferenceCountedOpenSslEngineTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) { + super(type, combo, delegate); } @Override diff --git a/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java b/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java index e17f3e30df..1d728777da 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SSLEngineTest.java @@ -95,6 +95,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static io.netty.handler.ssl.SslUtils.PROTOCOL_SSL_V2; @@ -261,10 +263,13 @@ public abstract class SSLEngineTest { private final BufferType type; private final ProtocolCipherCombo protocolCipherCombo; + private final boolean delegate; + private ExecutorService delegatingExecutor; - protected SSLEngineTest(BufferType type, ProtocolCipherCombo protocolCipherCombo) { + protected SSLEngineTest(BufferType type, ProtocolCipherCombo protocolCipherCombo, boolean delegate) { this.type = type; this.protocolCipherCombo = protocolCipherCombo; + this.delegate = delegate; } protected ByteBuffer allocateBuffer(int len) { @@ -450,6 +455,9 @@ public abstract class SSLEngineTest { MockitoAnnotations.initMocks(this); serverLatch = new CountDownLatch(1); clientLatch = new CountDownLatch(1); + if (delegate) { + delegatingExecutor = Executors.newCachedThreadPool(); + } } @After @@ -509,6 +517,10 @@ public abstract class SSLEngineTest { clientGroupShutdownFuture.sync(); } serverException = null; + + if (delegatingExecutor != null) { + delegatingExecutor.shutdown(); + } } @Test @@ -713,7 +725,8 @@ public abstract class SSLEngineTest { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); - SslHandler handler = serverSslCtx.newHandler(ch.alloc()); + SslHandler handler = delegatingExecutor == null ? serverSslCtx.newHandler(ch.alloc()) : + serverSslCtx.newHandler(ch.alloc(), delegatingExecutor); if (serverInitEngine) { mySetupMutualAuthServerInitSslHandler(handler); } @@ -756,7 +769,10 @@ public abstract class SSLEngineTest { protected void initChannel(Channel ch) throws Exception { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); - p.addLast(clientSslCtx.newHandler(ch.alloc())); + + SslHandler handler = delegatingExecutor == null ? clientSslCtx.newHandler(ch.alloc()) : + clientSslCtx.newHandler(ch.alloc(), delegatingExecutor); + p.addLast(handler); p.addLast(new MessageDelegatorChannelHandler(clientReceiver, clientLatch)); p.addLast(new ChannelInboundHandlerAdapter() { @Override @@ -860,7 +876,10 @@ public abstract class SSLEngineTest { protected void initChannel(Channel ch) throws Exception { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); - p.addLast(serverSslCtx.newHandler(ch.alloc())); + + SslHandler handler = delegatingExecutor == null ? serverSslCtx.newHandler(ch.alloc()) : + serverSslCtx.newHandler(ch.alloc(), delegatingExecutor); + p.addLast(handler); p.addLast(new MessageDelegatorChannelHandler(serverReceiver, serverLatch)); p.addLast(new ChannelInboundHandlerAdapter() { @Override @@ -900,7 +919,11 @@ public abstract class SSLEngineTest { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); InetSocketAddress remoteAddress = (InetSocketAddress) serverChannel.localAddress(); - SslHandler sslHandler = clientSslCtx.newHandler(ch.alloc(), expectedHost, 0); + + SslHandler sslHandler = delegatingExecutor == null ? + clientSslCtx.newHandler(ch.alloc(), expectedHost, 0) : + clientSslCtx.newHandler(ch.alloc(), expectedHost, 0, delegatingExecutor); + SSLParameters parameters = sslHandler.engine().getSSLParameters(); if (SslUtils.isValidHostNameForSNI(expectedHost)) { assertEquals(1, parameters.getServerNames().size()); @@ -1065,7 +1088,10 @@ public abstract class SSLEngineTest { protected void initChannel(Channel ch) throws Exception { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); - final SslHandler handler = clientSslCtx.newHandler(ch.alloc()); + final SslHandler handler = delegatingExecutor == null ? + clientSslCtx.newHandler(ch.alloc()) : + clientSslCtx.newHandler(ch.alloc(), delegatingExecutor); + handler.engine().setNeedClientAuth(true); ChannelPipeline p = ch.pipeline(); p.addLast(handler); @@ -1137,7 +1163,7 @@ public abstract class SSLEngineTest { MessageReceiver receiver) throws Exception { List dataCapture = null; try { - assertTrue(sendChannel.writeAndFlush(message).await(5, TimeUnit.SECONDS)); + assertTrue(sendChannel.writeAndFlush(message).await(50, TimeUnit.SECONDS)); receiverLatch.await(5, TimeUnit.SECONDS); message.readerIndex(0); ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); @@ -1260,7 +1286,12 @@ public abstract class SSLEngineTest { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); - p.addLast(serverSslCtx.newHandler(ch.alloc())); + + SslHandler handler = delegatingExecutor == null ? + serverSslCtx.newHandler(ch.alloc()) : + serverSslCtx.newHandler(ch.alloc(), delegatingExecutor); + + p.addLast(handler); p.addLast(new ChannelInboundHandlerAdapter() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { @@ -1313,7 +1344,11 @@ public abstract class SSLEngineTest { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); - SslHandler sslHandler = clientSslCtx.newHandler(ch.alloc()); + + SslHandler sslHandler = delegatingExecutor == null ? + clientSslCtx.newHandler(ch.alloc()) : + clientSslCtx.newHandler(ch.alloc(), delegatingExecutor); + // The renegotiate is not expected to succeed, so we should stop trying in a timely manner so // the unit test can terminate relativley quicly. sslHandler.setHandshakeTimeout(1, TimeUnit.SECONDS); @@ -1393,7 +1428,7 @@ public abstract class SSLEngineTest { } } - protected void handshake(SSLEngine clientEngine, SSLEngine serverEngine) throws SSLException { + protected void handshake(SSLEngine clientEngine, SSLEngine serverEngine) throws Exception { ByteBuffer cTOs = allocateBuffer(clientEngine.getSession().getPacketBufferSize()); ByteBuffer sTOc = allocateBuffer(serverEngine.getSession().getPacketBufferSize()); @@ -1486,14 +1521,18 @@ public abstract class SSLEngineTest { return result.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED; } - private static void runDelegatedTasks(SSLEngineResult result, SSLEngine engine) { + private void runDelegatedTasks(SSLEngineResult result, SSLEngine engine) throws Exception { if (result.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) { for (;;) { Runnable task = engine.getDelegatedTask(); if (task == null) { break; } - task.run(); + if (delegatingExecutor == null) { + task.run(); + } else { + delegatingExecutor.submit(task).get(); + } } } } @@ -1596,7 +1635,12 @@ public abstract class SSLEngineTest { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); - p.addLast(serverSslCtx.newHandler(ch.alloc())); + + SslHandler sslHandler = delegatingExecutor == null ? + serverSslCtx.newHandler(ch.alloc()) : + serverSslCtx.newHandler(ch.alloc(), delegatingExecutor); + + p.addLast(sslHandler); p.addLast(new MessageDelegatorChannelHandler(serverReceiver, serverLatch)); p.addLast(new ChannelInboundHandlerAdapter() { @Override @@ -1621,7 +1665,12 @@ public abstract class SSLEngineTest { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); ChannelPipeline p = ch.pipeline(); - p.addLast(clientSslCtx.newHandler(ch.alloc())); + + SslHandler sslHandler = delegatingExecutor == null ? + clientSslCtx.newHandler(ch.alloc()) : + clientSslCtx.newHandler(ch.alloc(), delegatingExecutor); + + p.addLast(sslHandler); p.addLast(new MessageDelegatorChannelHandler(clientReceiver, clientLatch)); p.addLast(new ChannelInboundHandlerAdapter() { @Override @@ -1672,7 +1721,11 @@ public abstract class SSLEngineTest { protected void initChannel(Channel ch) throws Exception { ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type)); - ch.pipeline().addFirst(serverSslCtx.newHandler(ch.alloc())); + SslHandler sslHandler = delegatingExecutor == null ? + serverSslCtx.newHandler(ch.alloc()) : + serverSslCtx.newHandler(ch.alloc(), delegatingExecutor); + + ch.pipeline().addFirst(sslHandler); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { diff --git a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java index 521e2e7374..e8dcba2c5b 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java @@ -52,8 +52,9 @@ import io.netty.util.AbstractReferenceCounted; import io.netty.util.IllegalReferenceCountException; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.concurrent.Promise; import org.hamcrest.CoreMatchers; import org.junit.Test; @@ -64,6 +65,9 @@ import java.security.NoSuchAlgorithmException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -808,4 +812,157 @@ public class SslHandlerTest { ReferenceCountUtil.release(sslClientCtx); } } + + @Test + public void testHandshakeWithExecutorThatExecuteDirecty() throws Exception { + testHandshakeWithExecutor(command -> command.run()); + } + + @Test + public void testHandshakeWithImmediateExecutor() throws Exception { + testHandshakeWithExecutor(ImmediateExecutor.INSTANCE); + } + + @Test + public void testHandshakeWithImmediateEventExecutor() throws Exception { + testHandshakeWithExecutor(ImmediateEventExecutor.INSTANCE); + } + + @Test + public void testHandshakeWithExecutor() throws Exception { + ExecutorService executorService = Executors.newCachedThreadPool(); + try { + testHandshakeWithExecutor(executorService); + } finally { + executorService.shutdown(); + } + } + + private void testHandshakeWithExecutor(Executor executor) throws Exception { + final SslContext sslClientCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .sslProvider(SslProvider.JDK).build(); + + final SelfSignedCertificate cert = new SelfSignedCertificate(); + final SslContext sslServerCtx = SslContextBuilder.forServer(cert.key(), cert.cert()) + .sslProvider(SslProvider.JDK).build(); + + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + Channel sc = null; + Channel cc = null; + final SslHandler clientSslHandler = sslClientCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, executor); + final SslHandler serverSslHandler = sslServerCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, executor); + + try { + sc = new ServerBootstrap() + .group(group) + .channel(NioServerSocketChannel.class) + .childHandler(serverSslHandler) + .bind(new InetSocketAddress(0)).syncUninterruptibly().channel(); + + ChannelFuture future = new Bootstrap() + .group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(clientSslHandler); + } + }).connect(sc.localAddress()); + cc = future.syncUninterruptibly().channel(); + + assertTrue(clientSslHandler.handshakeFuture().await().isSuccess()); + assertTrue(serverSslHandler.handshakeFuture().await().isSuccess()); + } finally { + if (cc != null) { + cc.close().syncUninterruptibly(); + } + if (sc != null) { + sc.close().syncUninterruptibly(); + } + group.shutdownGracefully(); + ReferenceCountUtil.release(sslClientCtx); + } + } + + @Test + public void testClientHandshakeTimeoutBecauseExecutorNotExecute() throws Exception { + testHandshakeTimeoutBecauseExecutorNotExecute(true); + } + + @Test + public void testServerHandshakeTimeoutBecauseExecutorNotExecute() throws Exception { + testHandshakeTimeoutBecauseExecutorNotExecute(false); + } + + private void testHandshakeTimeoutBecauseExecutorNotExecute(final boolean client) throws Exception { + final SslContext sslClientCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .sslProvider(SslProvider.JDK).build(); + + final SelfSignedCertificate cert = new SelfSignedCertificate(); + final SslContext sslServerCtx = SslContextBuilder.forServer(cert.key(), cert.cert()) + .sslProvider(SslProvider.JDK).build(); + + EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory()); + Channel sc = null; + Channel cc = null; + final SslHandler clientSslHandler = sslClientCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, command -> { + if (!client) { + command.run(); + } + // Do nothing to simulate slow execution. + }); + if (client) { + clientSslHandler.setHandshakeTimeout(100, TimeUnit.MILLISECONDS); + } + final SslHandler serverSslHandler = sslServerCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, command -> { + if (client) { + command.run(); + } + // Do nothing to simulate slow execution. + }); + if (!client) { + serverSslHandler.setHandshakeTimeout(100, TimeUnit.MILLISECONDS); + } + try { + sc = new ServerBootstrap() + .group(group) + .channel(NioServerSocketChannel.class) + .childHandler(serverSslHandler) + .bind(new InetSocketAddress(0)).syncUninterruptibly().channel(); + + ChannelFuture future = new Bootstrap() + .group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(clientSslHandler); + } + }).connect(sc.localAddress()); + cc = future.syncUninterruptibly().channel(); + + if (client) { + Throwable cause = clientSslHandler.handshakeFuture().await().cause(); + assertThat(cause, CoreMatchers.instanceOf(SSLException.class)); + assertThat(cause.getMessage(), containsString("timed out")); + assertFalse(serverSslHandler.handshakeFuture().await().isSuccess()); + } else { + Throwable cause = serverSslHandler.handshakeFuture().await().cause(); + assertThat(cause, CoreMatchers.instanceOf(SSLException.class)); + assertThat(cause.getMessage(), containsString("timed out")); + assertFalse(clientSslHandler.handshakeFuture().await().isSuccess()); + } + } finally { + if (cc != null) { + cc.close().syncUninterruptibly(); + } + if (sc != null) { + sc.close().syncUninterruptibly(); + } + group.shutdownGracefully(); + ReferenceCountUtil.release(sslClientCtx); + } + } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslClientRenegotiateTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslClientRenegotiateTest.java index 6e07fc3c43..dd598adea7 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslClientRenegotiateTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslClientRenegotiateTest.java @@ -18,6 +18,7 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -46,6 +47,9 @@ import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLHandshakeException; @@ -73,7 +77,7 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest { KEY_FILE = ssc.privateKey(); } - @Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}") + @Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}, delegate = {2}") public static Collection data() throws Exception { List serverContexts = new ArrayList<>(); List clientContexts = new ArrayList<>(); @@ -99,7 +103,8 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest { for (SslContext sc: serverContexts) { for (SslContext cc: clientContexts) { for (int i = 0; i < 32; i++) { - params.add(new Object[] { sc, cc}); + params.add(new Object[] { sc, cc, true}); + params.add(new Object[] { sc, cc, false}); } } } @@ -109,6 +114,7 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest { private final SslContext serverCtx; private final SslContext clientCtx; + private final boolean delegate; private final AtomicReference clientException = new AtomicReference<>(); private final AtomicReference serverException = new AtomicReference<>(); @@ -124,9 +130,10 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest { private final TestHandler serverHandler = new TestHandler(serverException); public SocketSslClientRenegotiateTest( - SslContext serverCtx, SslContext clientCtx) { + SslContext serverCtx, SslContext clientCtx, boolean delegate) { this.serverCtx = serverCtx; this.clientCtx = clientCtx; + this.delegate = delegate; } @Test(timeout = 30000) @@ -137,58 +144,74 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest { run(); } + private static SslHandler newSslHandler(SslContext sslCtx, ByteBufAllocator allocator, Executor executor) { + if (executor == null) { + return sslCtx.newHandler(allocator); + } else { + return sslCtx.newHandler(allocator, executor); + } + } + public void testSslRenegotiationRejected(ServerBootstrap sb, Bootstrap cb) throws Throwable { reset(); - sb.childHandler(new ChannelInitializer() { - @Override - @SuppressWarnings("deprecation") - public void initChannel(Channel sch) throws Exception { - serverChannel = sch; - serverSslHandler = serverCtx.newHandler(sch.alloc()); - // As we test renegotiation we should use a protocol that support it. - serverSslHandler.engine().setEnabledProtocols(new String[] { "TLSv1.2" }); - sch.pipeline().addLast("ssl", serverSslHandler); - sch.pipeline().addLast("handler", serverHandler); - } - }); + final ExecutorService executorService = delegate ? Executors.newCachedThreadPool() : null; - cb.handler(new ChannelInitializer() { - @Override - @SuppressWarnings("deprecation") - public void initChannel(Channel sch) throws Exception { - clientChannel = sch; - clientSslHandler = clientCtx.newHandler(sch.alloc()); - // As we test renegotiation we should use a protocol that support it. - clientSslHandler.engine().setEnabledProtocols(new String[] { "TLSv1.2" }); - sch.pipeline().addLast("ssl", clientSslHandler); - sch.pipeline().addLast("handler", clientHandler); - } - }); - - Channel sc = sb.bind().sync().channel(); - cb.connect(sc.localAddress()).sync(); - - Future clientHandshakeFuture = clientSslHandler.handshakeFuture(); - clientHandshakeFuture.sync(); - - String renegotiation = clientSslHandler.engine().getEnabledCipherSuites()[0]; - // Use the first previous enabled ciphersuite and try to renegotiate. - clientSslHandler.engine().setEnabledCipherSuites(new String[] { renegotiation }); - clientSslHandler.renegotiate().await(); - serverChannel.close().awaitUninterruptibly(); - clientChannel.close().awaitUninterruptibly(); - sc.close().awaitUninterruptibly(); try { - if (serverException.get() != null) { - throw serverException.get(); + sb.childHandler(new ChannelInitializer() { + @Override + @SuppressWarnings("deprecation") + public void initChannel(Channel sch) throws Exception { + serverChannel = sch; + serverSslHandler = newSslHandler(serverCtx, sch.alloc(), executorService); + // As we test renegotiation we should use a protocol that support it. + serverSslHandler.engine().setEnabledProtocols(new String[]{"TLSv1.2"}); + sch.pipeline().addLast("ssl", serverSslHandler); + sch.pipeline().addLast("handler", serverHandler); + } + }); + + cb.handler(new ChannelInitializer() { + @Override + @SuppressWarnings("deprecation") + public void initChannel(Channel sch) throws Exception { + clientChannel = sch; + clientSslHandler = newSslHandler(clientCtx, sch.alloc(), executorService); + // As we test renegotiation we should use a protocol that support it. + clientSslHandler.engine().setEnabledProtocols(new String[]{"TLSv1.2"}); + sch.pipeline().addLast("ssl", clientSslHandler); + sch.pipeline().addLast("handler", clientHandler); + } + }); + + Channel sc = sb.bind().sync().channel(); + cb.connect(sc.localAddress()).sync(); + + Future clientHandshakeFuture = clientSslHandler.handshakeFuture(); + clientHandshakeFuture.sync(); + + String renegotiation = clientSslHandler.engine().getEnabledCipherSuites()[0]; + // Use the first previous enabled ciphersuite and try to renegotiate. + clientSslHandler.engine().setEnabledCipherSuites(new String[]{renegotiation}); + clientSslHandler.renegotiate().await(); + serverChannel.close().awaitUninterruptibly(); + clientChannel.close().awaitUninterruptibly(); + sc.close().awaitUninterruptibly(); + try { + if (serverException.get() != null) { + throw serverException.get(); + } + fail(); + } catch (DecoderException e) { + assertTrue(e.getCause() instanceof SSLHandshakeException); + } + if (clientException.get() != null) { + throw clientException.get(); + } + } finally { + if (executorService != null) { + executorService.shutdown(); } - fail(); - } catch (DecoderException e) { - assertTrue(e.getCause() instanceof SSLHandshakeException); - } - if (clientException.get() != null) { - throw clientException.get(); } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java index ba0662d723..53dc1d552b 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketSslGreetingTest.java @@ -18,6 +18,7 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -48,6 +49,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; @@ -74,7 +78,7 @@ public class SocketSslGreetingTest extends AbstractSocketTest { KEY_FILE = ssc.privateKey(); } - @Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}") + @Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}, delegate = {2}") public static Collection data() throws Exception { List serverContexts = new ArrayList<>(); serverContexts.add(SslContextBuilder.forServer(CERT_FILE, KEY_FILE).sslProvider(SslProvider.JDK).build()); @@ -95,7 +99,8 @@ public class SocketSslGreetingTest extends AbstractSocketTest { List params = new ArrayList<>(); for (SslContext sc: serverContexts) { for (SslContext cc: clientContexts) { - params.add(new Object[] { sc, cc }); + params.add(new Object[] { sc, cc, true }); + params.add(new Object[] { sc, cc, false }); } } return params; @@ -103,10 +108,20 @@ public class SocketSslGreetingTest extends AbstractSocketTest { private final SslContext serverCtx; private final SslContext clientCtx; + private final boolean delegate; - public SocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) { + public SocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { this.serverCtx = serverCtx; this.clientCtx = clientCtx; + this.delegate = delegate; + } + + private static SslHandler newSslHandler(SslContext sslCtx, ByteBufAllocator allocator, Executor executor) { + if (executor == null) { + return sslCtx.newHandler(allocator); + } else { + return sslCtx.newHandler(allocator, executor); + } } // Test for https://github.com/netty/netty/pull/2437 @@ -119,46 +134,53 @@ public class SocketSslGreetingTest extends AbstractSocketTest { final ServerHandler sh = new ServerHandler(); final ClientHandler ch = new ClientHandler(); - sb.childHandler(new ChannelInitializer() { - @Override - public void initChannel(Channel sch) throws Exception { - ChannelPipeline p = sch.pipeline(); - p.addLast(serverCtx.newHandler(sch.alloc())); - p.addLast(new LoggingHandler(LOG_LEVEL)); - p.addLast(sh); + final ExecutorService executorService = delegate ? Executors.newCachedThreadPool() : null; + try { + sb.childHandler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) throws Exception { + ChannelPipeline p = sch.pipeline(); + p.addLast(newSslHandler(serverCtx, sch.alloc(), executorService)); + p.addLast(new LoggingHandler(LOG_LEVEL)); + p.addLast(sh); + } + }); + + cb.handler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) throws Exception { + ChannelPipeline p = sch.pipeline(); + p.addLast(newSslHandler(clientCtx, sch.alloc(), executorService)); + p.addLast(new LoggingHandler(LOG_LEVEL)); + p.addLast(ch); + } + }); + + Channel sc = sb.bind().sync().channel(); + Channel cc = cb.connect(sc.localAddress()).sync().channel(); + + ch.latch.await(); + + sh.channel.close().awaitUninterruptibly(); + cc.close().awaitUninterruptibly(); + sc.close().awaitUninterruptibly(); + + if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { + throw sh.exception.get(); } - }); - - cb.handler(new ChannelInitializer() { - @Override - public void initChannel(Channel sch) throws Exception { - ChannelPipeline p = sch.pipeline(); - p.addLast(clientCtx.newHandler(sch.alloc())); - p.addLast(new LoggingHandler(LOG_LEVEL)); - p.addLast(ch); + if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { + throw ch.exception.get(); + } + if (sh.exception.get() != null) { + throw sh.exception.get(); + } + if (ch.exception.get() != null) { + throw ch.exception.get(); + } + } finally { + if (executorService != null) { + executorService.shutdown(); } - }); - - Channel sc = sb.bind().sync().channel(); - Channel cc = cb.connect(sc.localAddress()).sync().channel(); - - ch.latch.await(); - - sh.channel.close().awaitUninterruptibly(); - cc.close().awaitUninterruptibly(); - sc.close().awaitUninterruptibly(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslClientRenegotiateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslClientRenegotiateTest.java index 9037c18adf..3f493d0835 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslClientRenegotiateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslClientRenegotiateTest.java @@ -26,8 +26,8 @@ import java.util.List; public class EpollDomainSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest { - public EpollDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public EpollDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java index a1ed0c5ac2..4683a701ea 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketSslGreetingTest.java @@ -26,8 +26,8 @@ import java.util.List; public class EpollDomainSocketSslGreetingTest extends SocketSslGreetingTest { - public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java index abe74d2ff5..3f69196f12 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslClientRenegotiateTest.java @@ -25,8 +25,8 @@ import java.util.List; public class EpollSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest { - public EpollSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public EpollSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java index 21d86b4fc7..34bf98a150 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketSslGreetingTest.java @@ -25,8 +25,8 @@ import java.util.List; public class EpollSocketSslGreetingTest extends SocketSslGreetingTest { - public EpollSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public EpollSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslClientRenegotiateTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslClientRenegotiateTest.java index 47431fa8a0..a719b7c73e 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslClientRenegotiateTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslClientRenegotiateTest.java @@ -26,8 +26,8 @@ import java.util.List; public class KQueueDomainSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest { - public KQueueDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public KQueueDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslGreetingTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslGreetingTest.java index 492021a52b..8cbaa00efa 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslGreetingTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketSslGreetingTest.java @@ -26,8 +26,8 @@ import java.util.List; public class KQueueDomainSocketSslGreetingTest extends SocketSslGreetingTest { - public KQueueDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public KQueueDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslClientRenegotiateTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslClientRenegotiateTest.java index a804ca98ee..a3ba238181 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslClientRenegotiateTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslClientRenegotiateTest.java @@ -25,8 +25,8 @@ import java.util.List; public class KQueueSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest { - public KQueueSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public KQueueSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslGreetingTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslGreetingTest.java index 9242fc388f..6eecc35a51 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslGreetingTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketSslGreetingTest.java @@ -25,8 +25,8 @@ import java.util.List; public class KQueueSocketSslGreetingTest extends SocketSslGreetingTest { - public KQueueSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) { - super(serverCtx, clientCtx); + public KQueueSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) { + super(serverCtx, clientCtx, delegate); } @Override