Support using an Executor to offload blocking / long-running tasks wh… (#8847)
Motivation: The SSLEngine does provide a way to signal to the caller that it may need to execute a blocking / long-running task which then can be offloaded to an Executor to ensure the I/O thread is not blocked. Currently how we handle this in SslHandler is not really optimal as while we offload to the Executor we still block the I/O Thread. Modifications: - Correctly support offloading the task to the Executor while suspending processing of SSL in the I/O Thread - Add new methods to SslContext to specify the Executor when creating a SslHandler - Remove @deprecated annotations from SslHandler constructor that takes an Executor - Adjust tests to also run with the Executor to ensure all works as expected. Result: Be able to offload long running tasks to an Executor when using SslHandler. Partly fixes https://github.com/netty/netty/issues/7862 and https://github.com/netty/netty/issues/7020.
This commit is contained in:
parent
a2ccc287c3
commit
b9d277dbcb
@ -22,6 +22,7 @@ import io.netty.buffer.ByteBufAllocator;
|
|||||||
import javax.net.ssl.SSLEngine;
|
import javax.net.ssl.SSLEngine;
|
||||||
import javax.net.ssl.SSLSessionContext;
|
import javax.net.ssl.SSLSessionContext;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adapter class which allows to wrap another {@link SslContext} and init {@link SSLEngine} instances.
|
* Adapter class which allows to wrap another {@link SslContext} and init {@link SSLEngine} instances.
|
||||||
@ -87,6 +88,21 @@ public abstract class DelegatingSslContext extends SslContext {
|
|||||||
return handler;
|
return handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls, Executor executor) {
|
||||||
|
SslHandler handler = ctx.newHandler(alloc, startTls, executor);
|
||||||
|
initHandler(handler);
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort,
|
||||||
|
boolean startTls, Executor executor) {
|
||||||
|
SslHandler handler = ctx.newHandler(alloc, peerHost, peerPort, startTls, executor);
|
||||||
|
initHandler(handler);
|
||||||
|
return handler;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final SSLSessionContext sessionContext() {
|
public final SSLSessionContext sessionContext() {
|
||||||
return ctx.sessionContext();
|
return ctx.sessionContext();
|
||||||
|
@ -44,6 +44,7 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
@ -391,6 +392,17 @@ public abstract class ReferenceCountedOpenSslContext extends SslContext implemen
|
|||||||
return new SslHandler(newEngine0(alloc, peerHost, peerPort, false), startTls);
|
return new SslHandler(newEngine0(alloc, peerHost, peerPort, false), startTls);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls, Executor executor) {
|
||||||
|
return new SslHandler(newEngine0(alloc, null, -1, false), startTls, executor);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort,
|
||||||
|
boolean startTls, Executor executor) {
|
||||||
|
return new SslHandler(newEngine0(alloc, peerHost, peerPort, false), executor);
|
||||||
|
}
|
||||||
|
|
||||||
SSLEngine newEngine0(ByteBufAllocator alloc, String peerHost, int peerPort, boolean jdkCompatibilityMode) {
|
SSLEngine newEngine0(ByteBufAllocator alloc, String peerHost, int peerPort, boolean jdkCompatibilityMode) {
|
||||||
return new ReferenceCountedOpenSslEngine(this, alloc, peerHost, peerPort, jdkCompatibilityMode, true);
|
return new ReferenceCountedOpenSslEngine(this, alloc, peerHost, peerPort, jdkCompatibilityMode, true);
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ package io.netty.handler.ssl;
|
|||||||
|
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.DecoderException;
|
import io.netty.handler.codec.DecoderException;
|
||||||
import io.netty.util.AsyncMapping;
|
import io.netty.util.AsyncMapping;
|
||||||
@ -130,7 +131,7 @@ public class SniHandler extends AbstractSniHandler<SslContext> {
|
|||||||
protected void replaceHandler(ChannelHandlerContext ctx, String hostname, SslContext sslContext) throws Exception {
|
protected void replaceHandler(ChannelHandlerContext ctx, String hostname, SslContext sslContext) throws Exception {
|
||||||
SslHandler sslHandler = null;
|
SslHandler sslHandler = null;
|
||||||
try {
|
try {
|
||||||
sslHandler = sslContext.newHandler(ctx.alloc());
|
sslHandler = newSslHandler(sslContext, ctx.alloc());
|
||||||
ctx.pipeline().replace(this, SslHandler.class.getName(), sslHandler);
|
ctx.pipeline().replace(this, SslHandler.class.getName(), sslHandler);
|
||||||
sslHandler = null;
|
sslHandler = null;
|
||||||
} finally {
|
} finally {
|
||||||
@ -143,6 +144,14 @@ public class SniHandler extends AbstractSniHandler<SslContext> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new {@link SslHandler} using the given {@link SslContext} and {@link ByteBufAllocator}.
|
||||||
|
* Users may override this method to implement custom behavior.
|
||||||
|
*/
|
||||||
|
protected SslHandler newSslHandler(SslContext context, ByteBufAllocator allocator) {
|
||||||
|
return context.newHandler(allocator);
|
||||||
|
}
|
||||||
|
|
||||||
private static final class AsyncMappingAdapter implements AsyncMapping<String, SslContext> {
|
private static final class AsyncMappingAdapter implements AsyncMapping<String, SslContext> {
|
||||||
private final Mapping<? super String, ? extends SslContext> mapping;
|
private final Mapping<? super String, ? extends SslContext> mapping;
|
||||||
|
|
||||||
|
@ -60,6 +60,7 @@ import java.security.cert.X509Certificate;
|
|||||||
import java.security.spec.InvalidKeySpecException;
|
import java.security.spec.InvalidKeySpecException;
|
||||||
import java.security.spec.PKCS8EncodedKeySpec;
|
import java.security.spec.PKCS8EncodedKeySpec;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A secure socket protocol implementation which acts as a factory for {@link SSLEngine} and {@link SslHandler}.
|
* A secure socket protocol implementation which acts as a factory for {@link SSLEngine} and {@link SslHandler}.
|
||||||
@ -879,6 +880,22 @@ public abstract class SslContext {
|
|||||||
*/
|
*/
|
||||||
public abstract SSLSessionContext sessionContext();
|
public abstract SSLSessionContext sessionContext();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new SslHandler.
|
||||||
|
* @see #newHandler(ByteBufAllocator, Executor)
|
||||||
|
*/
|
||||||
|
public final SslHandler newHandler(ByteBufAllocator alloc) {
|
||||||
|
return newHandler(alloc, startTls);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new SslHandler.
|
||||||
|
* @see #newHandler(ByteBufAllocator)
|
||||||
|
*/
|
||||||
|
protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls) {
|
||||||
|
return new SslHandler(newEngine(alloc), startTls);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new {@link SslHandler}.
|
* Creates a new {@link SslHandler}.
|
||||||
* <p>If {@link SslProvider#OPENSSL_REFCNT} is used then the returned {@link SslHandler} will release the engine
|
* <p>If {@link SslProvider#OPENSSL_REFCNT} is used then the returned {@link SslHandler} will release the engine
|
||||||
@ -900,18 +917,37 @@ public abstract class SslContext {
|
|||||||
* <a href="https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html">SSLEngine javadocs</a> which
|
* <a href="https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html">SSLEngine javadocs</a> which
|
||||||
* limits wrap/unwrap to operate on a single SSL/TLS packet.
|
* limits wrap/unwrap to operate on a single SSL/TLS packet.
|
||||||
* @param alloc If supported by the SSLEngine then the SSLEngine will use this to allocate ByteBuf objects.
|
* @param alloc If supported by the SSLEngine then the SSLEngine will use this to allocate ByteBuf objects.
|
||||||
|
* @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
|
||||||
|
* {@link SSLEngine#getDelegatedTask()}.
|
||||||
* @return a new {@link SslHandler}
|
* @return a new {@link SslHandler}
|
||||||
*/
|
*/
|
||||||
public final SslHandler newHandler(ByteBufAllocator alloc) {
|
public SslHandler newHandler(ByteBufAllocator alloc, Executor delegatedTaskExecutor) {
|
||||||
return newHandler(alloc, startTls);
|
return newHandler(alloc, startTls, delegatedTaskExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new SslHandler.
|
* Create a new SslHandler.
|
||||||
* @see #newHandler(ByteBufAllocator)
|
* @see #newHandler(ByteBufAllocator, String, int, boolean, Executor)
|
||||||
*/
|
*/
|
||||||
protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls) {
|
protected SslHandler newHandler(ByteBufAllocator alloc, boolean startTls, Executor executor) {
|
||||||
return new SslHandler(newEngine(alloc), startTls);
|
return new SslHandler(newEngine(alloc), startTls, executor);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link SslHandler}
|
||||||
|
*
|
||||||
|
* @see #newHandler(ByteBufAllocator, String, int, Executor)
|
||||||
|
*/
|
||||||
|
public final SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort) {
|
||||||
|
return newHandler(alloc, peerHost, peerPort, startTls);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new SslHandler.
|
||||||
|
* @see #newHandler(ByteBufAllocator, String, int, boolean, Executor)
|
||||||
|
*/
|
||||||
|
protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, boolean startTls) {
|
||||||
|
return new SslHandler(newEngine(alloc, peerHost, peerPort), startTls);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -937,19 +973,19 @@ public abstract class SslContext {
|
|||||||
* @param alloc If supported by the SSLEngine then the SSLEngine will use this to allocate ByteBuf objects.
|
* @param alloc If supported by the SSLEngine then the SSLEngine will use this to allocate ByteBuf objects.
|
||||||
* @param peerHost the non-authoritative name of the host
|
* @param peerHost the non-authoritative name of the host
|
||||||
* @param peerPort the non-authoritative port
|
* @param peerPort the non-authoritative port
|
||||||
|
* @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
|
||||||
|
* {@link SSLEngine#getDelegatedTask()}.
|
||||||
*
|
*
|
||||||
* @return a new {@link SslHandler}
|
* @return a new {@link SslHandler}
|
||||||
*/
|
*/
|
||||||
public final SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort) {
|
public SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort,
|
||||||
return newHandler(alloc, peerHost, peerPort, startTls);
|
Executor delegatedTaskExecutor) {
|
||||||
|
return newHandler(alloc, peerHost, peerPort, startTls, delegatedTaskExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, boolean startTls,
|
||||||
* Create a new SslHandler.
|
Executor delegatedTaskExecutor) {
|
||||||
* @see #newHandler(ByteBufAllocator, String, int, boolean)
|
return new SslHandler(newEngine(alloc, peerHost, peerPort), startTls, delegatedTaskExecutor);
|
||||||
*/
|
|
||||||
protected SslHandler newHandler(ByteBufAllocator alloc, String peerHost, int peerPort, boolean startTls) {
|
|
||||||
return new SslHandler(newEngine(alloc, peerHost, peerPort), startTls);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,6 +33,7 @@ import io.netty.channel.ChannelPipeline;
|
|||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.channel.ChannelPromiseNotifier;
|
import io.netty.channel.ChannelPromiseNotifier;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
|
import io.netty.handler.codec.DecoderException;
|
||||||
import io.netty.handler.codec.UnsupportedMessageTypeException;
|
import io.netty.handler.codec.UnsupportedMessageTypeException;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
@ -55,10 +56,9 @@ import java.nio.ByteBuffer;
|
|||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.DatagramChannel;
|
import java.nio.channels.DatagramChannel;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
@ -391,6 +391,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
private boolean flushedBeforeHandshake;
|
private boolean flushedBeforeHandshake;
|
||||||
private boolean readDuringHandshake;
|
private boolean readDuringHandshake;
|
||||||
private boolean handshakeStarted;
|
private boolean handshakeStarted;
|
||||||
|
|
||||||
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
|
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
|
||||||
private Promise<Channel> handshakePromise = new LazyChannelPromise();
|
private Promise<Channel> handshakePromise = new LazyChannelPromise();
|
||||||
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
|
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
|
||||||
@ -403,6 +404,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
|
|
||||||
private boolean outboundClosed;
|
private boolean outboundClosed;
|
||||||
private boolean closeNotify;
|
private boolean closeNotify;
|
||||||
|
private boolean processTask;
|
||||||
|
|
||||||
private int packetLength;
|
private int packetLength;
|
||||||
|
|
||||||
@ -418,7 +420,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
|
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance.
|
* Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
|
||||||
*
|
*
|
||||||
* @param engine the {@link SSLEngine} this handler will use
|
* @param engine the {@link SSLEngine} this handler will use
|
||||||
*/
|
*/
|
||||||
@ -427,29 +429,36 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance.
|
* Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
|
||||||
*
|
*
|
||||||
* @param engine the {@link SSLEngine} this handler will use
|
* @param engine the {@link SSLEngine} this handler will use
|
||||||
* @param startTls {@code true} if the first write request shouldn't be
|
* @param startTls {@code true} if the first write request shouldn't be
|
||||||
* encrypted by the {@link SSLEngine}
|
* encrypted by the {@link SSLEngine}
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public SslHandler(SSLEngine engine, boolean startTls) {
|
public SslHandler(SSLEngine engine, boolean startTls) {
|
||||||
this(engine, startTls, ImmediateExecutor.INSTANCE);
|
this(engine, startTls, ImmediateExecutor.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Use {@link #SslHandler(SSLEngine)} instead.
|
* Creates a new instance.
|
||||||
|
*
|
||||||
|
* @param engine the {@link SSLEngine} this handler will use
|
||||||
|
* @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
|
||||||
|
* {@link SSLEngine#getDelegatedTask()}.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
|
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
|
||||||
this(engine, false, delegatedTaskExecutor);
|
this(engine, false, delegatedTaskExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Use {@link #SslHandler(SSLEngine, boolean)} instead.
|
* Creates a new instance.
|
||||||
|
*
|
||||||
|
* @param engine the {@link SSLEngine} this handler will use
|
||||||
|
* @param startTls {@code true} if the first write request shouldn't be
|
||||||
|
* encrypted by the {@link SSLEngine}
|
||||||
|
* @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
|
||||||
|
* {@link SSLEngine#getDelegatedTask()}.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
|
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
|
||||||
requireNonNull(engine, "engine");
|
requireNonNull(engine, "engine");
|
||||||
requireNonNull(delegatedTaskExecutor, "delegatedTaskExecutor");
|
requireNonNull(delegatedTaskExecutor, "delegatedTaskExecutor");
|
||||||
@ -769,6 +778,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (processTask) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
wrapAndFlush(ctx);
|
wrapAndFlush(ctx);
|
||||||
} catch (Throwable cause) {
|
} catch (Throwable cause) {
|
||||||
@ -808,7 +821,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
final int wrapDataSize = this.wrapDataSize;
|
final int wrapDataSize = this.wrapDataSize;
|
||||||
// Only continue to loop if the handler was not removed in the meantime.
|
// Only continue to loop if the handler was not removed in the meantime.
|
||||||
// See https://github.com/netty/netty/issues/5860
|
// See https://github.com/netty/netty/issues/5860
|
||||||
while (!ctx.isRemoved()) {
|
outer: while (!ctx.isRemoved()) {
|
||||||
promise = ctx.newPromise();
|
promise = ctx.newPromise();
|
||||||
buf = wrapDataSize > 0 ?
|
buf = wrapDataSize > 0 ?
|
||||||
pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
|
pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
|
||||||
@ -845,7 +858,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
|
|
||||||
switch (result.getHandshakeStatus()) {
|
switch (result.getHandshakeStatus()) {
|
||||||
case NEED_TASK:
|
case NEED_TASK:
|
||||||
runDelegatedTasks();
|
if (!runDelegatedTasks(inUnwrap)) {
|
||||||
|
// We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
|
||||||
|
// resume once the task completes.
|
||||||
|
break outer;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case FINISHED:
|
case FINISHED:
|
||||||
setHandshakeSuccess();
|
setHandshakeSuccess();
|
||||||
@ -914,7 +931,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
try {
|
try {
|
||||||
// Only continue to loop if the handler was not removed in the meantime.
|
// Only continue to loop if the handler was not removed in the meantime.
|
||||||
// See https://github.com/netty/netty/issues/5860
|
// See https://github.com/netty/netty/issues/5860
|
||||||
while (!ctx.isRemoved()) {
|
outer: while (!ctx.isRemoved()) {
|
||||||
if (out == null) {
|
if (out == null) {
|
||||||
// As this is called for the handshake we have no real idea how big the buffer needs to be.
|
// As this is called for the handshake we have no real idea how big the buffer needs to be.
|
||||||
// That said 2048 should give us enough room to include everything like ALPN / NPN data.
|
// That said 2048 should give us enough room to include everything like ALPN / NPN data.
|
||||||
@ -936,7 +953,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
setHandshakeSuccess();
|
setHandshakeSuccess();
|
||||||
return false;
|
return false;
|
||||||
case NEED_TASK:
|
case NEED_TASK:
|
||||||
runDelegatedTasks();
|
if (!runDelegatedTasks(inUnwrap)) {
|
||||||
|
// We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
|
||||||
|
// resume once the task completes.
|
||||||
|
break outer;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case NEED_UNWRAP:
|
case NEED_UNWRAP:
|
||||||
if (inUnwrap) {
|
if (inUnwrap) {
|
||||||
@ -1237,6 +1258,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
|
||||||
|
if (processTask) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (jdkCompatibilityMode) {
|
if (jdkCompatibilityMode) {
|
||||||
decodeJdkCompatible(ctx, in);
|
decodeJdkCompatible(ctx, in);
|
||||||
} else {
|
} else {
|
||||||
@ -1246,6 +1270,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
channelReadComplete0(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void channelReadComplete0(ChannelHandlerContext ctx) {
|
||||||
// Discard bytes of the cumulation buffer if needed.
|
// Discard bytes of the cumulation buffer if needed.
|
||||||
discardSomeReadBytes();
|
discardSomeReadBytes();
|
||||||
|
|
||||||
@ -1360,7 +1388,16 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case NEED_TASK:
|
case NEED_TASK:
|
||||||
runDelegatedTasks();
|
if (!runDelegatedTasks(true)) {
|
||||||
|
// We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
|
||||||
|
// resume once the task completes.
|
||||||
|
//
|
||||||
|
// We break out of the loop only and do NOT return here as we still may need to notify
|
||||||
|
// about the closure of the SSLEngine.
|
||||||
|
//
|
||||||
|
wrapLater = false;
|
||||||
|
break unwrapLoop;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case FINISHED:
|
case FINISHED:
|
||||||
setHandshakeSuccess();
|
setHandshakeSuccess();
|
||||||
@ -1441,62 +1478,215 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
out.nioBuffer(index, len);
|
out.nioBuffer(index, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private static boolean inEventLoop(Executor executor) {
|
||||||
* Fetches all delegated tasks from the {@link SSLEngine} and runs them via the {@link #delegatedTaskExecutor}.
|
return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
|
||||||
* If the {@link #delegatedTaskExecutor} is {@link ImmediateExecutor}, just call {@link Runnable#run()} directly
|
}
|
||||||
* instead of using {@link Executor#execute(Runnable)}. Otherwise, run the tasks via
|
|
||||||
* the {@link #delegatedTaskExecutor} and wait until the tasks are finished.
|
|
||||||
*/
|
|
||||||
private void runDelegatedTasks() {
|
|
||||||
if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) {
|
|
||||||
for (;;) {
|
|
||||||
Runnable task = engine.getDelegatedTask();
|
|
||||||
if (task == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
task.run();
|
private static void runAllDelegatedTasks(SSLEngine engine) {
|
||||||
}
|
for (;;) {
|
||||||
} else {
|
Runnable task = engine.getDelegatedTask();
|
||||||
final List<Runnable> tasks = new ArrayList<>(2);
|
if (task == null) {
|
||||||
for (;;) {
|
|
||||||
final Runnable task = engine.getDelegatedTask();
|
|
||||||
if (task == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
tasks.add(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tasks.isEmpty()) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
/**
|
||||||
delegatedTaskExecutor.execute(() -> {
|
* Will either run the delegated task directly calling {@link Runnable#run()} and return {@code true} or will
|
||||||
try {
|
* offload the delegated task using {@link Executor#execute(Runnable)} and return {@code false}.
|
||||||
for (Runnable task: tasks) {
|
*
|
||||||
task.run();
|
* If the task is offloaded it will take care to resume its work on the {@link EventExecutor} once there are no
|
||||||
}
|
* more tasks to process.
|
||||||
} catch (Exception e) {
|
*/
|
||||||
ctx.fireExceptionCaught(e);
|
private boolean runDelegatedTasks(boolean inUnwrap) {
|
||||||
} finally {
|
if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
|
||||||
latch.countDown();
|
// We should run the task directly in the EventExecutor thread and not offload at all.
|
||||||
}
|
for (;;) {
|
||||||
});
|
runAllDelegatedTasks(engine);
|
||||||
|
return true;
|
||||||
boolean interrupted = false;
|
|
||||||
while (latch.getCount() != 0) {
|
|
||||||
try {
|
|
||||||
latch.await();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// Interrupt later.
|
|
||||||
interrupted = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
executeDelegatedTasks(inUnwrap);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (interrupted) {
|
private void executeDelegatedTasks(boolean inUnwrap) {
|
||||||
Thread.currentThread().interrupt();
|
processTask = true;
|
||||||
|
try {
|
||||||
|
delegatedTaskExecutor.execute(new SslTasksRunner(inUnwrap));
|
||||||
|
} catch (RejectedExecutionException e) {
|
||||||
|
processTask = false;
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link Runnable} that will be scheduled on the {@code delegatedTaskExecutor} and will take care
|
||||||
|
* of resume work on the {@link EventExecutor} once the task was executed.
|
||||||
|
*/
|
||||||
|
private final class SslTasksRunner implements Runnable {
|
||||||
|
private final boolean inUnwrap;
|
||||||
|
|
||||||
|
SslTasksRunner(boolean inUnwrap) {
|
||||||
|
this.inUnwrap = inUnwrap;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle errors which happened during task processing.
|
||||||
|
private void taskError(Throwable e) {
|
||||||
|
if (inUnwrap) {
|
||||||
|
// As the error happened while the task was scheduled as part of unwrap(...) we also need to ensure
|
||||||
|
// we fire it through the pipeline as inbound error to be consistent with what we do in decode(...).
|
||||||
|
//
|
||||||
|
// This will also ensure we fail the handshake future and flush all produced data.
|
||||||
|
try {
|
||||||
|
handleUnwrapThrowable(ctx, e);
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
safeExceptionCaught(cause);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
setHandshakeFailure(ctx, e);
|
||||||
|
forceFlush(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to call exceptionCaught(...)
|
||||||
|
private void safeExceptionCaught(Throwable cause) {
|
||||||
|
try {
|
||||||
|
exceptionCaught(ctx, wrapIfNeeded(cause));
|
||||||
|
} catch (Throwable error) {
|
||||||
|
ctx.fireExceptionCaught(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Throwable wrapIfNeeded(Throwable cause) {
|
||||||
|
if (!inUnwrap) {
|
||||||
|
// If we are not in unwrap(...) we can just rethrow without wrapping at all.
|
||||||
|
return cause;
|
||||||
|
}
|
||||||
|
// As the exception would have been triggered by an inbound operation we will need to wrap it in a
|
||||||
|
// DecoderException to mimic what a decoder would do when decode(...) throws.
|
||||||
|
return cause instanceof DecoderException ? cause : new DecoderException(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryDecodeAgain() {
|
||||||
|
try {
|
||||||
|
channelRead(ctx, Unpooled.EMPTY_BUFFER);
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
safeExceptionCaught(cause);
|
||||||
|
} finally {
|
||||||
|
// As we called channelRead(...) we also need to call channelReadComplete(...) which
|
||||||
|
// will ensure we either call ctx.fireChannelReadComplete() or will trigger a ctx.read() if
|
||||||
|
// more data is needed.
|
||||||
|
channelReadComplete0(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executed after the wrapped {@code task} was executed via {@code delegatedTaskExecutor} to resume work
|
||||||
|
* on the {@link EventExecutor}.
|
||||||
|
*/
|
||||||
|
private void resumeOnEventExecutor() {
|
||||||
|
assert ctx.executor().inEventLoop();
|
||||||
|
|
||||||
|
processTask = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
HandshakeStatus status = engine.getHandshakeStatus();
|
||||||
|
switch (status) {
|
||||||
|
// There is another task that needs to be executed and offloaded to the delegatingTaskExecutor.
|
||||||
|
case NEED_TASK:
|
||||||
|
executeDelegatedTasks(inUnwrap);
|
||||||
|
|
||||||
|
break;
|
||||||
|
|
||||||
|
// The handshake finished, lets notify about the completion of it and resume processing.
|
||||||
|
case FINISHED:
|
||||||
|
setHandshakeSuccess();
|
||||||
|
|
||||||
|
// deliberate fall-through
|
||||||
|
|
||||||
|
// Not handshaking anymore, lets notify about the completion if not done yet and resume processing.
|
||||||
|
case NOT_HANDSHAKING:
|
||||||
|
setHandshakeSuccessIfStillHandshaking();
|
||||||
|
try {
|
||||||
|
// Lets call wrap to ensure we produce the alert if there is any pending and also to
|
||||||
|
// ensure we flush any queued data..
|
||||||
|
wrap(ctx, inUnwrap);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
taskError(e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush now as we may have written some data as part of the wrap call.
|
||||||
|
forceFlush(ctx);
|
||||||
|
|
||||||
|
tryDecodeAgain();
|
||||||
|
break;
|
||||||
|
|
||||||
|
// We need more data so lets try to unwrap first and then call decode again which will feed us
|
||||||
|
// with buffered data (if there is any).
|
||||||
|
case NEED_UNWRAP:
|
||||||
|
unwrapNonAppData(ctx);
|
||||||
|
tryDecodeAgain();
|
||||||
|
break;
|
||||||
|
|
||||||
|
// To make progress we need to call SSLEngine.wrap(...) which may produce more output data
|
||||||
|
// that will be written to the Channel.
|
||||||
|
case NEED_WRAP:
|
||||||
|
try {
|
||||||
|
wrapNonAppData(ctx, inUnwrap);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
taskError(e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Flush now as we may have written some data as part of the wrap call.
|
||||||
|
forceFlush(ctx);
|
||||||
|
|
||||||
|
// Now try to feed in more data that we have buffered.
|
||||||
|
tryDecodeAgain();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// Should never reach here as we handle all cases.
|
||||||
|
throw new AssertionError();
|
||||||
|
}
|
||||||
|
} catch (Throwable cause) {
|
||||||
|
safeExceptionCaught(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
runAllDelegatedTasks(engine);
|
||||||
|
|
||||||
|
// All tasks were processed.
|
||||||
|
assert engine.getHandshakeStatus() != HandshakeStatus.NEED_TASK;
|
||||||
|
|
||||||
|
// Jump back on the EventExecutor.
|
||||||
|
ctx.executor().execute(this::resumeOnEventExecutor);
|
||||||
|
} catch (final Throwable cause) {
|
||||||
|
handleException(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleException(final Throwable cause) {
|
||||||
|
if (ctx.executor().inEventLoop()) {
|
||||||
|
processTask = false;
|
||||||
|
safeExceptionCaught(cause);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
ctx.executor().execute(() -> {
|
||||||
|
processTask = false;
|
||||||
|
safeExceptionCaught(cause);
|
||||||
|
});
|
||||||
|
} catch (RejectedExecutionException ignore) {
|
||||||
|
processTask = false;
|
||||||
|
// the context itself will handle the rejected exception when try to schedule the operation so
|
||||||
|
// ignore the RejectedExecutionException
|
||||||
|
ctx.fireExceptionCaught(cause);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ package io.netty.handler.ssl;
|
|||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
@ -42,6 +43,9 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
@ -67,7 +71,7 @@ public class CipherSuiteCanaryTest {
|
|||||||
|
|
||||||
private static SelfSignedCertificate CERT;
|
private static SelfSignedCertificate CERT;
|
||||||
|
|
||||||
@Parameters(name = "{index}: serverSslProvider = {0}, clientSslProvider = {1}, rfcCipherName = {2}")
|
@Parameters(name = "{index}: serverSslProvider = {0}, clientSslProvider = {1}, rfcCipherName = {2}, delegate = {3}")
|
||||||
public static Collection<Object[]> parameters() {
|
public static Collection<Object[]> parameters() {
|
||||||
List<Object[]> dst = new ArrayList<>();
|
List<Object[]> dst = new ArrayList<>();
|
||||||
dst.addAll(expand("TLS_DHE_RSA_WITH_AES_128_GCM_SHA256")); // DHE-RSA-AES128-GCM-SHA256
|
dst.addAll(expand("TLS_DHE_RSA_WITH_AES_128_GCM_SHA256")); // DHE-RSA-AES128-GCM-SHA256
|
||||||
@ -81,7 +85,7 @@ public class CipherSuiteCanaryTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void destory() {
|
public static void destroy() {
|
||||||
GROUP.shutdownGracefully();
|
GROUP.shutdownGracefully();
|
||||||
CERT.delete();
|
CERT.delete();
|
||||||
}
|
}
|
||||||
@ -91,11 +95,14 @@ public class CipherSuiteCanaryTest {
|
|||||||
private final SslProvider clientSslProvider;
|
private final SslProvider clientSslProvider;
|
||||||
|
|
||||||
private final String rfcCipherName;
|
private final String rfcCipherName;
|
||||||
|
private final boolean delegate;
|
||||||
|
|
||||||
public CipherSuiteCanaryTest(SslProvider serverSslProvider, SslProvider clientSslProvider, String rfcCipherName) {
|
public CipherSuiteCanaryTest(SslProvider serverSslProvider, SslProvider clientSslProvider,
|
||||||
|
String rfcCipherName, boolean delegate) {
|
||||||
this.serverSslProvider = serverSslProvider;
|
this.serverSslProvider = serverSslProvider;
|
||||||
this.clientSslProvider = clientSslProvider;
|
this.clientSslProvider = clientSslProvider;
|
||||||
this.rfcCipherName = rfcCipherName;
|
this.rfcCipherName = rfcCipherName;
|
||||||
|
this.delegate = delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assumeCipherAvailable(SslProvider provider, String cipher) throws NoSuchAlgorithmException {
|
private static void assumeCipherAvailable(SslProvider provider, String cipher) throws NoSuchAlgorithmException {
|
||||||
@ -114,6 +121,14 @@ public class CipherSuiteCanaryTest {
|
|||||||
Assume.assumeTrue("Unsupported cipher: " + cipher, cipherSupported);
|
Assume.assumeTrue("Unsupported cipher: " + cipher, cipherSupported);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static SslHandler newSslHandler(SslContext sslCtx, ByteBufAllocator allocator, Executor executor) {
|
||||||
|
if (executor == null) {
|
||||||
|
return sslCtx.newHandler(allocator);
|
||||||
|
} else {
|
||||||
|
return sslCtx.newHandler(allocator, executor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHandshake() throws Exception {
|
public void testHandshake() throws Exception {
|
||||||
// Check if the cipher is supported at all which may not be the case for various JDK versions and OpenSSL API
|
// Check if the cipher is supported at all which may not be the case for various JDK versions and OpenSSL API
|
||||||
@ -130,6 +145,8 @@ public class CipherSuiteCanaryTest {
|
|||||||
.protocols(SslUtils.PROTOCOL_TLS_V1_2)
|
.protocols(SslUtils.PROTOCOL_TLS_V1_2)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
final ExecutorService executorService = delegate ? Executors.newCachedThreadPool() : null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final SslContext sslClientContext = SslContextBuilder.forClient()
|
final SslContext sslClientContext = SslContextBuilder.forClient()
|
||||||
.sslProvider(clientSslProvider)
|
.sslProvider(clientSslProvider)
|
||||||
@ -147,7 +164,7 @@ public class CipherSuiteCanaryTest {
|
|||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
pipeline.addLast(sslServerContext.newHandler(ch.alloc()));
|
pipeline.addLast(newSslHandler(sslServerContext, ch.alloc(), executorService));
|
||||||
|
|
||||||
pipeline.addLast(new SimpleChannelInboundHandler<Object>() {
|
pipeline.addLast(new SimpleChannelInboundHandler<Object>() {
|
||||||
@Override
|
@Override
|
||||||
@ -183,7 +200,7 @@ public class CipherSuiteCanaryTest {
|
|||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
pipeline.addLast(sslClientContext.newHandler(ch.alloc()));
|
pipeline.addLast(newSslHandler(sslClientContext, ch.alloc(), executorService));
|
||||||
|
|
||||||
pipeline.addLast(new SimpleChannelInboundHandler<Object>() {
|
pipeline.addLast(new SimpleChannelInboundHandler<Object>() {
|
||||||
@Override
|
@Override
|
||||||
@ -230,6 +247,10 @@ public class CipherSuiteCanaryTest {
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
ReferenceCountUtil.release(sslServerContext);
|
ReferenceCountUtil.release(sslServerContext);
|
||||||
|
|
||||||
|
if (executorService != null) {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -268,7 +289,8 @@ public class CipherSuiteCanaryTest {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
dst.add(new Object[]{serverSslProvider, clientSslProvider, rfcCipherName});
|
dst.add(new Object[]{serverSslProvider, clientSslProvider, rfcCipherName, true});
|
||||||
|
dst.add(new Object[]{serverSslProvider, clientSslProvider, rfcCipherName, false});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,17 +31,18 @@ import static org.junit.Assume.assumeTrue;
|
|||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class ConscryptJdkSslEngineInteropTest extends SSLEngineTest {
|
public class ConscryptJdkSslEngineInteropTest extends SSLEngineTest {
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}")
|
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (BufferType type: BufferType.values()) {
|
for (BufferType type: BufferType.values()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()});
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true });
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConscryptJdkSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo) {
|
public ConscryptJdkSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) {
|
||||||
super(type, combo);
|
super(type, combo, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -30,17 +30,18 @@ import static org.junit.Assume.assumeTrue;
|
|||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class ConscryptSslEngineTest extends SSLEngineTest {
|
public class ConscryptSslEngineTest extends SSLEngineTest {
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}")
|
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (BufferType type: BufferType.values()) {
|
for (BufferType type: BufferType.values()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()});
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true });
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConscryptSslEngineTest(BufferType type, ProtocolCipherCombo combo) {
|
public ConscryptSslEngineTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) {
|
||||||
super(type, combo);
|
super(type, combo, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -32,17 +32,18 @@ import static org.junit.Assume.assumeTrue;
|
|||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class JdkConscryptSslEngineInteropTest extends SSLEngineTest {
|
public class JdkConscryptSslEngineInteropTest extends SSLEngineTest {
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}")
|
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (BufferType type: BufferType.values()) {
|
for (BufferType type: BufferType.values()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()});
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true });
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JdkConscryptSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo) {
|
public JdkConscryptSslEngineInteropTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) {
|
||||||
super(type, combo);
|
super(type, combo, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -33,21 +33,23 @@ import static org.junit.Assume.assumeTrue;
|
|||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class JdkOpenSslEngineInteroptTest extends SSLEngineTest {
|
public class JdkOpenSslEngineInteroptTest extends SSLEngineTest {
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}")
|
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (BufferType type: BufferType.values()) {
|
for (BufferType type: BufferType.values()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()});
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true });
|
||||||
|
|
||||||
if (PlatformDependent.javaVersion() >= 11 && OpenSsl.isTlsv13Supported()) {
|
if (PlatformDependent.javaVersion() >= 11 && OpenSsl.isTlsv13Supported()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13() });
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), true });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
public JdkOpenSslEngineInteroptTest(BufferType type, ProtocolCipherCombo protocolCipherCombo) {
|
public JdkOpenSslEngineInteroptTest(BufferType type, ProtocolCipherCombo protocolCipherCombo, boolean delegate) {
|
||||||
super(type, protocolCipherCombo);
|
super(type, protocolCipherCombo, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -142,14 +142,17 @@ public class JdkSslEngineTest extends SSLEngineTest {
|
|||||||
private static final String FALLBACK_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http1_1";
|
private static final String FALLBACK_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http1_1";
|
||||||
private static final String APPLICATION_LEVEL_PROTOCOL_NOT_COMPATIBLE = "my-protocol-FOO";
|
private static final String APPLICATION_LEVEL_PROTOCOL_NOT_COMPATIBLE = "my-protocol-FOO";
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: providerType = {0}, bufferType = {1}, combo = {2}")
|
@Parameterized.Parameters(name = "{index}: providerType = {0}, bufferType = {1}, combo = {2}, delegate = {3}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (ProviderType providerType : ProviderType.values()) {
|
for (ProviderType providerType : ProviderType.values()) {
|
||||||
for (BufferType bufferType : BufferType.values()) {
|
for (BufferType bufferType : BufferType.values()) {
|
||||||
params.add(new Object[]{ providerType, bufferType, ProtocolCipherCombo.tlsv12()});
|
params.add(new Object[]{ providerType, bufferType, ProtocolCipherCombo.tlsv12(), true });
|
||||||
|
params.add(new Object[]{ providerType, bufferType, ProtocolCipherCombo.tlsv12(), false });
|
||||||
|
|
||||||
if (PlatformDependent.javaVersion() >= 11) {
|
if (PlatformDependent.javaVersion() >= 11) {
|
||||||
params.add(new Object[] { providerType, bufferType, ProtocolCipherCombo.tlsv13() });
|
params.add(new Object[] { providerType, bufferType, ProtocolCipherCombo.tlsv13(), true });
|
||||||
|
params.add(new Object[] { providerType, bufferType, ProtocolCipherCombo.tlsv13(), false });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -160,8 +163,9 @@ public class JdkSslEngineTest extends SSLEngineTest {
|
|||||||
|
|
||||||
private Provider provider;
|
private Provider provider;
|
||||||
|
|
||||||
public JdkSslEngineTest(ProviderType providerType, BufferType bufferType, ProtocolCipherCombo protocolCipherCombo) {
|
public JdkSslEngineTest(ProviderType providerType, BufferType bufferType,
|
||||||
super(bufferType, protocolCipherCombo);
|
ProtocolCipherCombo protocolCipherCombo, boolean delegate) {
|
||||||
|
super(bufferType, protocolCipherCombo, delegate);
|
||||||
this.providerType = providerType;
|
this.providerType = providerType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,21 +68,23 @@ public class OpenSslEngineTest extends SSLEngineTest {
|
|||||||
private static final String PREFERRED_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http2";
|
private static final String PREFERRED_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http2";
|
||||||
private static final String FALLBACK_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http1_1";
|
private static final String FALLBACK_APPLICATION_LEVEL_PROTOCOL = "my-protocol-http1_1";
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}")
|
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (BufferType type: BufferType.values()) {
|
for (BufferType type: BufferType.values()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()});
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true });
|
||||||
|
|
||||||
if (OpenSsl.isTlsv13Supported()) {
|
if (OpenSsl.isTlsv13Supported()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13() });
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), true });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
public OpenSslEngineTest(BufferType type, ProtocolCipherCombo cipherCombo) {
|
public OpenSslEngineTest(BufferType type, ProtocolCipherCombo cipherCombo, boolean delegate) {
|
||||||
super(type, cipherCombo);
|
super(type, cipherCombo, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -35,21 +35,23 @@ import static org.junit.Assume.assumeTrue;
|
|||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class OpenSslJdkSslEngineInteroptTest extends SSLEngineTest {
|
public class OpenSslJdkSslEngineInteroptTest extends SSLEngineTest {
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}")
|
@Parameterized.Parameters(name = "{index}: bufferType = {0}, combo = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() {
|
public static Collection<Object[]> data() {
|
||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (BufferType type: BufferType.values()) {
|
for (BufferType type: BufferType.values()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12()});
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv12(), true });
|
||||||
|
|
||||||
if (PlatformDependent.javaVersion() >= 11 && OpenSsl.isTlsv13Supported()) {
|
if (PlatformDependent.javaVersion() >= 11 && OpenSsl.isTlsv13Supported()) {
|
||||||
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13() });
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), false });
|
||||||
|
params.add(new Object[] { type, ProtocolCipherCombo.tlsv13(), true });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
public OpenSslJdkSslEngineInteroptTest(BufferType type, ProtocolCipherCombo combo) {
|
public OpenSslJdkSslEngineInteroptTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) {
|
||||||
super(type, combo);
|
super(type, combo, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -23,8 +23,8 @@ import javax.net.ssl.SSLEngine;
|
|||||||
|
|
||||||
public class ReferenceCountedOpenSslEngineTest extends OpenSslEngineTest {
|
public class ReferenceCountedOpenSslEngineTest extends OpenSslEngineTest {
|
||||||
|
|
||||||
public ReferenceCountedOpenSslEngineTest(BufferType type, ProtocolCipherCombo combo) {
|
public ReferenceCountedOpenSslEngineTest(BufferType type, ProtocolCipherCombo combo, boolean delegate) {
|
||||||
super(type, combo);
|
super(type, combo, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,6 +95,8 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static io.netty.handler.ssl.SslUtils.PROTOCOL_SSL_V2;
|
import static io.netty.handler.ssl.SslUtils.PROTOCOL_SSL_V2;
|
||||||
@ -261,10 +263,13 @@ public abstract class SSLEngineTest {
|
|||||||
|
|
||||||
private final BufferType type;
|
private final BufferType type;
|
||||||
private final ProtocolCipherCombo protocolCipherCombo;
|
private final ProtocolCipherCombo protocolCipherCombo;
|
||||||
|
private final boolean delegate;
|
||||||
|
private ExecutorService delegatingExecutor;
|
||||||
|
|
||||||
protected SSLEngineTest(BufferType type, ProtocolCipherCombo protocolCipherCombo) {
|
protected SSLEngineTest(BufferType type, ProtocolCipherCombo protocolCipherCombo, boolean delegate) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
this.protocolCipherCombo = protocolCipherCombo;
|
this.protocolCipherCombo = protocolCipherCombo;
|
||||||
|
this.delegate = delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ByteBuffer allocateBuffer(int len) {
|
protected ByteBuffer allocateBuffer(int len) {
|
||||||
@ -450,6 +455,9 @@ public abstract class SSLEngineTest {
|
|||||||
MockitoAnnotations.initMocks(this);
|
MockitoAnnotations.initMocks(this);
|
||||||
serverLatch = new CountDownLatch(1);
|
serverLatch = new CountDownLatch(1);
|
||||||
clientLatch = new CountDownLatch(1);
|
clientLatch = new CountDownLatch(1);
|
||||||
|
if (delegate) {
|
||||||
|
delegatingExecutor = Executors.newCachedThreadPool();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -509,6 +517,10 @@ public abstract class SSLEngineTest {
|
|||||||
clientGroupShutdownFuture.sync();
|
clientGroupShutdownFuture.sync();
|
||||||
}
|
}
|
||||||
serverException = null;
|
serverException = null;
|
||||||
|
|
||||||
|
if (delegatingExecutor != null) {
|
||||||
|
delegatingExecutor.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -713,7 +725,8 @@ public abstract class SSLEngineTest {
|
|||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
|
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
SslHandler handler = serverSslCtx.newHandler(ch.alloc());
|
SslHandler handler = delegatingExecutor == null ? serverSslCtx.newHandler(ch.alloc()) :
|
||||||
|
serverSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
if (serverInitEngine) {
|
if (serverInitEngine) {
|
||||||
mySetupMutualAuthServerInitSslHandler(handler);
|
mySetupMutualAuthServerInitSslHandler(handler);
|
||||||
}
|
}
|
||||||
@ -756,7 +769,10 @@ public abstract class SSLEngineTest {
|
|||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.addLast(clientSslCtx.newHandler(ch.alloc()));
|
|
||||||
|
SslHandler handler = delegatingExecutor == null ? clientSslCtx.newHandler(ch.alloc()) :
|
||||||
|
clientSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
p.addLast(handler);
|
||||||
p.addLast(new MessageDelegatorChannelHandler(clientReceiver, clientLatch));
|
p.addLast(new MessageDelegatorChannelHandler(clientReceiver, clientLatch));
|
||||||
p.addLast(new ChannelInboundHandlerAdapter() {
|
p.addLast(new ChannelInboundHandlerAdapter() {
|
||||||
@Override
|
@Override
|
||||||
@ -860,7 +876,10 @@ public abstract class SSLEngineTest {
|
|||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.addLast(serverSslCtx.newHandler(ch.alloc()));
|
|
||||||
|
SslHandler handler = delegatingExecutor == null ? serverSslCtx.newHandler(ch.alloc()) :
|
||||||
|
serverSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
p.addLast(handler);
|
||||||
p.addLast(new MessageDelegatorChannelHandler(serverReceiver, serverLatch));
|
p.addLast(new MessageDelegatorChannelHandler(serverReceiver, serverLatch));
|
||||||
p.addLast(new ChannelInboundHandlerAdapter() {
|
p.addLast(new ChannelInboundHandlerAdapter() {
|
||||||
@Override
|
@Override
|
||||||
@ -900,7 +919,11 @@ public abstract class SSLEngineTest {
|
|||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
InetSocketAddress remoteAddress = (InetSocketAddress) serverChannel.localAddress();
|
InetSocketAddress remoteAddress = (InetSocketAddress) serverChannel.localAddress();
|
||||||
SslHandler sslHandler = clientSslCtx.newHandler(ch.alloc(), expectedHost, 0);
|
|
||||||
|
SslHandler sslHandler = delegatingExecutor == null ?
|
||||||
|
clientSslCtx.newHandler(ch.alloc(), expectedHost, 0) :
|
||||||
|
clientSslCtx.newHandler(ch.alloc(), expectedHost, 0, delegatingExecutor);
|
||||||
|
|
||||||
SSLParameters parameters = sslHandler.engine().getSSLParameters();
|
SSLParameters parameters = sslHandler.engine().getSSLParameters();
|
||||||
if (SslUtils.isValidHostNameForSNI(expectedHost)) {
|
if (SslUtils.isValidHostNameForSNI(expectedHost)) {
|
||||||
assertEquals(1, parameters.getServerNames().size());
|
assertEquals(1, parameters.getServerNames().size());
|
||||||
@ -1065,7 +1088,10 @@ public abstract class SSLEngineTest {
|
|||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
|
|
||||||
final SslHandler handler = clientSslCtx.newHandler(ch.alloc());
|
final SslHandler handler = delegatingExecutor == null ?
|
||||||
|
clientSslCtx.newHandler(ch.alloc()) :
|
||||||
|
clientSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
|
||||||
handler.engine().setNeedClientAuth(true);
|
handler.engine().setNeedClientAuth(true);
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.addLast(handler);
|
p.addLast(handler);
|
||||||
@ -1137,7 +1163,7 @@ public abstract class SSLEngineTest {
|
|||||||
MessageReceiver receiver) throws Exception {
|
MessageReceiver receiver) throws Exception {
|
||||||
List<ByteBuf> dataCapture = null;
|
List<ByteBuf> dataCapture = null;
|
||||||
try {
|
try {
|
||||||
assertTrue(sendChannel.writeAndFlush(message).await(5, TimeUnit.SECONDS));
|
assertTrue(sendChannel.writeAndFlush(message).await(50, TimeUnit.SECONDS));
|
||||||
receiverLatch.await(5, TimeUnit.SECONDS);
|
receiverLatch.await(5, TimeUnit.SECONDS);
|
||||||
message.readerIndex(0);
|
message.readerIndex(0);
|
||||||
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
|
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
|
||||||
@ -1260,7 +1286,12 @@ public abstract class SSLEngineTest {
|
|||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
|
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.addLast(serverSslCtx.newHandler(ch.alloc()));
|
|
||||||
|
SslHandler handler = delegatingExecutor == null ?
|
||||||
|
serverSslCtx.newHandler(ch.alloc()) :
|
||||||
|
serverSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
|
||||||
|
p.addLast(handler);
|
||||||
p.addLast(new ChannelInboundHandlerAdapter() {
|
p.addLast(new ChannelInboundHandlerAdapter() {
|
||||||
@Override
|
@Override
|
||||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||||
@ -1313,7 +1344,11 @@ public abstract class SSLEngineTest {
|
|||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
|
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
SslHandler sslHandler = clientSslCtx.newHandler(ch.alloc());
|
|
||||||
|
SslHandler sslHandler = delegatingExecutor == null ?
|
||||||
|
clientSslCtx.newHandler(ch.alloc()) :
|
||||||
|
clientSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
|
||||||
// The renegotiate is not expected to succeed, so we should stop trying in a timely manner so
|
// The renegotiate is not expected to succeed, so we should stop trying in a timely manner so
|
||||||
// the unit test can terminate relativley quicly.
|
// the unit test can terminate relativley quicly.
|
||||||
sslHandler.setHandshakeTimeout(1, TimeUnit.SECONDS);
|
sslHandler.setHandshakeTimeout(1, TimeUnit.SECONDS);
|
||||||
@ -1393,7 +1428,7 @@ public abstract class SSLEngineTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void handshake(SSLEngine clientEngine, SSLEngine serverEngine) throws SSLException {
|
protected void handshake(SSLEngine clientEngine, SSLEngine serverEngine) throws Exception {
|
||||||
ByteBuffer cTOs = allocateBuffer(clientEngine.getSession().getPacketBufferSize());
|
ByteBuffer cTOs = allocateBuffer(clientEngine.getSession().getPacketBufferSize());
|
||||||
ByteBuffer sTOc = allocateBuffer(serverEngine.getSession().getPacketBufferSize());
|
ByteBuffer sTOc = allocateBuffer(serverEngine.getSession().getPacketBufferSize());
|
||||||
|
|
||||||
@ -1486,14 +1521,18 @@ public abstract class SSLEngineTest {
|
|||||||
return result.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED;
|
return result.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void runDelegatedTasks(SSLEngineResult result, SSLEngine engine) {
|
private void runDelegatedTasks(SSLEngineResult result, SSLEngine engine) throws Exception {
|
||||||
if (result.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) {
|
if (result.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Runnable task = engine.getDelegatedTask();
|
Runnable task = engine.getDelegatedTask();
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
task.run();
|
if (delegatingExecutor == null) {
|
||||||
|
task.run();
|
||||||
|
} else {
|
||||||
|
delegatingExecutor.submit(task).get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1596,7 +1635,12 @@ public abstract class SSLEngineTest {
|
|||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
|
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.addLast(serverSslCtx.newHandler(ch.alloc()));
|
|
||||||
|
SslHandler sslHandler = delegatingExecutor == null ?
|
||||||
|
serverSslCtx.newHandler(ch.alloc()) :
|
||||||
|
serverSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
|
||||||
|
p.addLast(sslHandler);
|
||||||
p.addLast(new MessageDelegatorChannelHandler(serverReceiver, serverLatch));
|
p.addLast(new MessageDelegatorChannelHandler(serverReceiver, serverLatch));
|
||||||
p.addLast(new ChannelInboundHandlerAdapter() {
|
p.addLast(new ChannelInboundHandlerAdapter() {
|
||||||
@Override
|
@Override
|
||||||
@ -1621,7 +1665,12 @@ public abstract class SSLEngineTest {
|
|||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
|
|
||||||
ChannelPipeline p = ch.pipeline();
|
ChannelPipeline p = ch.pipeline();
|
||||||
p.addLast(clientSslCtx.newHandler(ch.alloc()));
|
|
||||||
|
SslHandler sslHandler = delegatingExecutor == null ?
|
||||||
|
clientSslCtx.newHandler(ch.alloc()) :
|
||||||
|
clientSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
|
||||||
|
p.addLast(sslHandler);
|
||||||
p.addLast(new MessageDelegatorChannelHandler(clientReceiver, clientLatch));
|
p.addLast(new MessageDelegatorChannelHandler(clientReceiver, clientLatch));
|
||||||
p.addLast(new ChannelInboundHandlerAdapter() {
|
p.addLast(new ChannelInboundHandlerAdapter() {
|
||||||
@Override
|
@Override
|
||||||
@ -1672,7 +1721,11 @@ public abstract class SSLEngineTest {
|
|||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
ch.config().setAllocator(new TestByteBufAllocator(ch.config().getAllocator(), type));
|
||||||
|
|
||||||
ch.pipeline().addFirst(serverSslCtx.newHandler(ch.alloc()));
|
SslHandler sslHandler = delegatingExecutor == null ?
|
||||||
|
serverSslCtx.newHandler(ch.alloc()) :
|
||||||
|
serverSslCtx.newHandler(ch.alloc(), delegatingExecutor);
|
||||||
|
|
||||||
|
ch.pipeline().addFirst(sslHandler);
|
||||||
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
|
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
|
||||||
@Override
|
@Override
|
||||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
|
@ -52,8 +52,9 @@ import io.netty.util.AbstractReferenceCounted;
|
|||||||
import io.netty.util.IllegalReferenceCountException;
|
import io.netty.util.IllegalReferenceCountException;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.concurrent.Future;
|
|
||||||
import io.netty.util.concurrent.FutureListener;
|
import io.netty.util.concurrent.FutureListener;
|
||||||
|
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||||
|
import io.netty.util.concurrent.ImmediateExecutor;
|
||||||
import io.netty.util.concurrent.Promise;
|
import io.netty.util.concurrent.Promise;
|
||||||
import org.hamcrest.CoreMatchers;
|
import org.hamcrest.CoreMatchers;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -64,6 +65,9 @@ import java.security.NoSuchAlgorithmException;
|
|||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@ -808,4 +812,157 @@ public class SslHandlerTest {
|
|||||||
ReferenceCountUtil.release(sslClientCtx);
|
ReferenceCountUtil.release(sslClientCtx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandshakeWithExecutorThatExecuteDirecty() throws Exception {
|
||||||
|
testHandshakeWithExecutor(command -> command.run());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandshakeWithImmediateExecutor() throws Exception {
|
||||||
|
testHandshakeWithExecutor(ImmediateExecutor.INSTANCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandshakeWithImmediateEventExecutor() throws Exception {
|
||||||
|
testHandshakeWithExecutor(ImmediateEventExecutor.INSTANCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHandshakeWithExecutor() throws Exception {
|
||||||
|
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||||
|
try {
|
||||||
|
testHandshakeWithExecutor(executorService);
|
||||||
|
} finally {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testHandshakeWithExecutor(Executor executor) throws Exception {
|
||||||
|
final SslContext sslClientCtx = SslContextBuilder.forClient()
|
||||||
|
.trustManager(InsecureTrustManagerFactory.INSTANCE)
|
||||||
|
.sslProvider(SslProvider.JDK).build();
|
||||||
|
|
||||||
|
final SelfSignedCertificate cert = new SelfSignedCertificate();
|
||||||
|
final SslContext sslServerCtx = SslContextBuilder.forServer(cert.key(), cert.cert())
|
||||||
|
.sslProvider(SslProvider.JDK).build();
|
||||||
|
|
||||||
|
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
|
||||||
|
Channel sc = null;
|
||||||
|
Channel cc = null;
|
||||||
|
final SslHandler clientSslHandler = sslClientCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, executor);
|
||||||
|
final SslHandler serverSslHandler = sslServerCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, executor);
|
||||||
|
|
||||||
|
try {
|
||||||
|
sc = new ServerBootstrap()
|
||||||
|
.group(group)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.childHandler(serverSslHandler)
|
||||||
|
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
ChannelFuture future = new Bootstrap()
|
||||||
|
.group(group)
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.handler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) {
|
||||||
|
ch.pipeline().addLast(clientSslHandler);
|
||||||
|
}
|
||||||
|
}).connect(sc.localAddress());
|
||||||
|
cc = future.syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
assertTrue(clientSslHandler.handshakeFuture().await().isSuccess());
|
||||||
|
assertTrue(serverSslHandler.handshakeFuture().await().isSuccess());
|
||||||
|
} finally {
|
||||||
|
if (cc != null) {
|
||||||
|
cc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
if (sc != null) {
|
||||||
|
sc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
group.shutdownGracefully();
|
||||||
|
ReferenceCountUtil.release(sslClientCtx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClientHandshakeTimeoutBecauseExecutorNotExecute() throws Exception {
|
||||||
|
testHandshakeTimeoutBecauseExecutorNotExecute(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testServerHandshakeTimeoutBecauseExecutorNotExecute() throws Exception {
|
||||||
|
testHandshakeTimeoutBecauseExecutorNotExecute(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testHandshakeTimeoutBecauseExecutorNotExecute(final boolean client) throws Exception {
|
||||||
|
final SslContext sslClientCtx = SslContextBuilder.forClient()
|
||||||
|
.trustManager(InsecureTrustManagerFactory.INSTANCE)
|
||||||
|
.sslProvider(SslProvider.JDK).build();
|
||||||
|
|
||||||
|
final SelfSignedCertificate cert = new SelfSignedCertificate();
|
||||||
|
final SslContext sslServerCtx = SslContextBuilder.forServer(cert.key(), cert.cert())
|
||||||
|
.sslProvider(SslProvider.JDK).build();
|
||||||
|
|
||||||
|
EventLoopGroup group = new MultithreadEventLoopGroup(NioHandler.newFactory());
|
||||||
|
Channel sc = null;
|
||||||
|
Channel cc = null;
|
||||||
|
final SslHandler clientSslHandler = sslClientCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, command -> {
|
||||||
|
if (!client) {
|
||||||
|
command.run();
|
||||||
|
}
|
||||||
|
// Do nothing to simulate slow execution.
|
||||||
|
});
|
||||||
|
if (client) {
|
||||||
|
clientSslHandler.setHandshakeTimeout(100, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
final SslHandler serverSslHandler = sslServerCtx.newHandler(UnpooledByteBufAllocator.DEFAULT, command -> {
|
||||||
|
if (client) {
|
||||||
|
command.run();
|
||||||
|
}
|
||||||
|
// Do nothing to simulate slow execution.
|
||||||
|
});
|
||||||
|
if (!client) {
|
||||||
|
serverSslHandler.setHandshakeTimeout(100, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
sc = new ServerBootstrap()
|
||||||
|
.group(group)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.childHandler(serverSslHandler)
|
||||||
|
.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
ChannelFuture future = new Bootstrap()
|
||||||
|
.group(group)
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.handler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(Channel ch) {
|
||||||
|
ch.pipeline().addLast(clientSslHandler);
|
||||||
|
}
|
||||||
|
}).connect(sc.localAddress());
|
||||||
|
cc = future.syncUninterruptibly().channel();
|
||||||
|
|
||||||
|
if (client) {
|
||||||
|
Throwable cause = clientSslHandler.handshakeFuture().await().cause();
|
||||||
|
assertThat(cause, CoreMatchers.<Throwable>instanceOf(SSLException.class));
|
||||||
|
assertThat(cause.getMessage(), containsString("timed out"));
|
||||||
|
assertFalse(serverSslHandler.handshakeFuture().await().isSuccess());
|
||||||
|
} else {
|
||||||
|
Throwable cause = serverSslHandler.handshakeFuture().await().cause();
|
||||||
|
assertThat(cause, CoreMatchers.<Throwable>instanceOf(SSLException.class));
|
||||||
|
assertThat(cause.getMessage(), containsString("timed out"));
|
||||||
|
assertFalse(clientSslHandler.handshakeFuture().await().isSuccess());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cc != null) {
|
||||||
|
cc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
if (sc != null) {
|
||||||
|
sc.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
group.shutdownGracefully();
|
||||||
|
ReferenceCountUtil.release(sslClientCtx);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ package io.netty.testsuite.transport.socket;
|
|||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandler.Sharable;
|
import io.netty.channel.ChannelHandler.Sharable;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
@ -46,6 +47,9 @@ import java.security.cert.CertificateException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import javax.net.ssl.SSLHandshakeException;
|
import javax.net.ssl.SSLHandshakeException;
|
||||||
@ -73,7 +77,7 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest {
|
|||||||
KEY_FILE = ssc.privateKey();
|
KEY_FILE = ssc.privateKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}")
|
@Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() throws Exception {
|
public static Collection<Object[]> data() throws Exception {
|
||||||
List<SslContext> serverContexts = new ArrayList<>();
|
List<SslContext> serverContexts = new ArrayList<>();
|
||||||
List<SslContext> clientContexts = new ArrayList<>();
|
List<SslContext> clientContexts = new ArrayList<>();
|
||||||
@ -99,7 +103,8 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest {
|
|||||||
for (SslContext sc: serverContexts) {
|
for (SslContext sc: serverContexts) {
|
||||||
for (SslContext cc: clientContexts) {
|
for (SslContext cc: clientContexts) {
|
||||||
for (int i = 0; i < 32; i++) {
|
for (int i = 0; i < 32; i++) {
|
||||||
params.add(new Object[] { sc, cc});
|
params.add(new Object[] { sc, cc, true});
|
||||||
|
params.add(new Object[] { sc, cc, false});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -109,6 +114,7 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest {
|
|||||||
|
|
||||||
private final SslContext serverCtx;
|
private final SslContext serverCtx;
|
||||||
private final SslContext clientCtx;
|
private final SslContext clientCtx;
|
||||||
|
private final boolean delegate;
|
||||||
|
|
||||||
private final AtomicReference<Throwable> clientException = new AtomicReference<>();
|
private final AtomicReference<Throwable> clientException = new AtomicReference<>();
|
||||||
private final AtomicReference<Throwable> serverException = new AtomicReference<>();
|
private final AtomicReference<Throwable> serverException = new AtomicReference<>();
|
||||||
@ -124,9 +130,10 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest {
|
|||||||
private final TestHandler serverHandler = new TestHandler(serverException);
|
private final TestHandler serverHandler = new TestHandler(serverException);
|
||||||
|
|
||||||
public SocketSslClientRenegotiateTest(
|
public SocketSslClientRenegotiateTest(
|
||||||
SslContext serverCtx, SslContext clientCtx) {
|
SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
this.serverCtx = serverCtx;
|
this.serverCtx = serverCtx;
|
||||||
this.clientCtx = clientCtx;
|
this.clientCtx = clientCtx;
|
||||||
|
this.delegate = delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
@ -137,58 +144,74 @@ public class SocketSslClientRenegotiateTest extends AbstractSocketTest {
|
|||||||
run();
|
run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static SslHandler newSslHandler(SslContext sslCtx, ByteBufAllocator allocator, Executor executor) {
|
||||||
|
if (executor == null) {
|
||||||
|
return sslCtx.newHandler(allocator);
|
||||||
|
} else {
|
||||||
|
return sslCtx.newHandler(allocator, executor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testSslRenegotiationRejected(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
public void testSslRenegotiationRejected(ServerBootstrap sb, Bootstrap cb) throws Throwable {
|
||||||
reset();
|
reset();
|
||||||
|
|
||||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
final ExecutorService executorService = delegate ? Executors.newCachedThreadPool() : null;
|
||||||
@Override
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void initChannel(Channel sch) throws Exception {
|
|
||||||
serverChannel = sch;
|
|
||||||
serverSslHandler = serverCtx.newHandler(sch.alloc());
|
|
||||||
// As we test renegotiation we should use a protocol that support it.
|
|
||||||
serverSslHandler.engine().setEnabledProtocols(new String[] { "TLSv1.2" });
|
|
||||||
sch.pipeline().addLast("ssl", serverSslHandler);
|
|
||||||
sch.pipeline().addLast("handler", serverHandler);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
cb.handler(new ChannelInitializer<Channel>() {
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void initChannel(Channel sch) throws Exception {
|
|
||||||
clientChannel = sch;
|
|
||||||
clientSslHandler = clientCtx.newHandler(sch.alloc());
|
|
||||||
// As we test renegotiation we should use a protocol that support it.
|
|
||||||
clientSslHandler.engine().setEnabledProtocols(new String[] { "TLSv1.2" });
|
|
||||||
sch.pipeline().addLast("ssl", clientSslHandler);
|
|
||||||
sch.pipeline().addLast("handler", clientHandler);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Channel sc = sb.bind().sync().channel();
|
|
||||||
cb.connect(sc.localAddress()).sync();
|
|
||||||
|
|
||||||
Future<Channel> clientHandshakeFuture = clientSslHandler.handshakeFuture();
|
|
||||||
clientHandshakeFuture.sync();
|
|
||||||
|
|
||||||
String renegotiation = clientSslHandler.engine().getEnabledCipherSuites()[0];
|
|
||||||
// Use the first previous enabled ciphersuite and try to renegotiate.
|
|
||||||
clientSslHandler.engine().setEnabledCipherSuites(new String[] { renegotiation });
|
|
||||||
clientSslHandler.renegotiate().await();
|
|
||||||
serverChannel.close().awaitUninterruptibly();
|
|
||||||
clientChannel.close().awaitUninterruptibly();
|
|
||||||
sc.close().awaitUninterruptibly();
|
|
||||||
try {
|
try {
|
||||||
if (serverException.get() != null) {
|
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||||
throw serverException.get();
|
@Override
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public void initChannel(Channel sch) throws Exception {
|
||||||
|
serverChannel = sch;
|
||||||
|
serverSslHandler = newSslHandler(serverCtx, sch.alloc(), executorService);
|
||||||
|
// As we test renegotiation we should use a protocol that support it.
|
||||||
|
serverSslHandler.engine().setEnabledProtocols(new String[]{"TLSv1.2"});
|
||||||
|
sch.pipeline().addLast("ssl", serverSslHandler);
|
||||||
|
sch.pipeline().addLast("handler", serverHandler);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
cb.handler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public void initChannel(Channel sch) throws Exception {
|
||||||
|
clientChannel = sch;
|
||||||
|
clientSslHandler = newSslHandler(clientCtx, sch.alloc(), executorService);
|
||||||
|
// As we test renegotiation we should use a protocol that support it.
|
||||||
|
clientSslHandler.engine().setEnabledProtocols(new String[]{"TLSv1.2"});
|
||||||
|
sch.pipeline().addLast("ssl", clientSslHandler);
|
||||||
|
sch.pipeline().addLast("handler", clientHandler);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Channel sc = sb.bind().sync().channel();
|
||||||
|
cb.connect(sc.localAddress()).sync();
|
||||||
|
|
||||||
|
Future<Channel> clientHandshakeFuture = clientSslHandler.handshakeFuture();
|
||||||
|
clientHandshakeFuture.sync();
|
||||||
|
|
||||||
|
String renegotiation = clientSslHandler.engine().getEnabledCipherSuites()[0];
|
||||||
|
// Use the first previous enabled ciphersuite and try to renegotiate.
|
||||||
|
clientSslHandler.engine().setEnabledCipherSuites(new String[]{renegotiation});
|
||||||
|
clientSslHandler.renegotiate().await();
|
||||||
|
serverChannel.close().awaitUninterruptibly();
|
||||||
|
clientChannel.close().awaitUninterruptibly();
|
||||||
|
sc.close().awaitUninterruptibly();
|
||||||
|
try {
|
||||||
|
if (serverException.get() != null) {
|
||||||
|
throw serverException.get();
|
||||||
|
}
|
||||||
|
fail();
|
||||||
|
} catch (DecoderException e) {
|
||||||
|
assertTrue(e.getCause() instanceof SSLHandshakeException);
|
||||||
|
}
|
||||||
|
if (clientException.get() != null) {
|
||||||
|
throw clientException.get();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (executorService != null) {
|
||||||
|
executorService.shutdown();
|
||||||
}
|
}
|
||||||
fail();
|
|
||||||
} catch (DecoderException e) {
|
|
||||||
assertTrue(e.getCause() instanceof SSLHandshakeException);
|
|
||||||
}
|
|
||||||
if (clientException.get() != null) {
|
|
||||||
throw clientException.get();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ package io.netty.testsuite.transport.socket;
|
|||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
@ -48,6 +49,9 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
@ -74,7 +78,7 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
|
|||||||
KEY_FILE = ssc.privateKey();
|
KEY_FILE = ssc.privateKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}")
|
@Parameters(name = "{index}: serverEngine = {0}, clientEngine = {1}, delegate = {2}")
|
||||||
public static Collection<Object[]> data() throws Exception {
|
public static Collection<Object[]> data() throws Exception {
|
||||||
List<SslContext> serverContexts = new ArrayList<>();
|
List<SslContext> serverContexts = new ArrayList<>();
|
||||||
serverContexts.add(SslContextBuilder.forServer(CERT_FILE, KEY_FILE).sslProvider(SslProvider.JDK).build());
|
serverContexts.add(SslContextBuilder.forServer(CERT_FILE, KEY_FILE).sslProvider(SslProvider.JDK).build());
|
||||||
@ -95,7 +99,8 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
|
|||||||
List<Object[]> params = new ArrayList<>();
|
List<Object[]> params = new ArrayList<>();
|
||||||
for (SslContext sc: serverContexts) {
|
for (SslContext sc: serverContexts) {
|
||||||
for (SslContext cc: clientContexts) {
|
for (SslContext cc: clientContexts) {
|
||||||
params.add(new Object[] { sc, cc });
|
params.add(new Object[] { sc, cc, true });
|
||||||
|
params.add(new Object[] { sc, cc, false });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return params;
|
return params;
|
||||||
@ -103,10 +108,20 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
|
|||||||
|
|
||||||
private final SslContext serverCtx;
|
private final SslContext serverCtx;
|
||||||
private final SslContext clientCtx;
|
private final SslContext clientCtx;
|
||||||
|
private final boolean delegate;
|
||||||
|
|
||||||
public SocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) {
|
public SocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
this.serverCtx = serverCtx;
|
this.serverCtx = serverCtx;
|
||||||
this.clientCtx = clientCtx;
|
this.clientCtx = clientCtx;
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SslHandler newSslHandler(SslContext sslCtx, ByteBufAllocator allocator, Executor executor) {
|
||||||
|
if (executor == null) {
|
||||||
|
return sslCtx.newHandler(allocator);
|
||||||
|
} else {
|
||||||
|
return sslCtx.newHandler(allocator, executor);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test for https://github.com/netty/netty/pull/2437
|
// Test for https://github.com/netty/netty/pull/2437
|
||||||
@ -119,46 +134,53 @@ public class SocketSslGreetingTest extends AbstractSocketTest {
|
|||||||
final ServerHandler sh = new ServerHandler();
|
final ServerHandler sh = new ServerHandler();
|
||||||
final ClientHandler ch = new ClientHandler();
|
final ClientHandler ch = new ClientHandler();
|
||||||
|
|
||||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
final ExecutorService executorService = delegate ? Executors.newCachedThreadPool() : null;
|
||||||
@Override
|
try {
|
||||||
public void initChannel(Channel sch) throws Exception {
|
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||||
ChannelPipeline p = sch.pipeline();
|
@Override
|
||||||
p.addLast(serverCtx.newHandler(sch.alloc()));
|
public void initChannel(Channel sch) throws Exception {
|
||||||
p.addLast(new LoggingHandler(LOG_LEVEL));
|
ChannelPipeline p = sch.pipeline();
|
||||||
p.addLast(sh);
|
p.addLast(newSslHandler(serverCtx, sch.alloc(), executorService));
|
||||||
|
p.addLast(new LoggingHandler(LOG_LEVEL));
|
||||||
|
p.addLast(sh);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
cb.handler(new ChannelInitializer<Channel>() {
|
||||||
|
@Override
|
||||||
|
public void initChannel(Channel sch) throws Exception {
|
||||||
|
ChannelPipeline p = sch.pipeline();
|
||||||
|
p.addLast(newSslHandler(clientCtx, sch.alloc(), executorService));
|
||||||
|
p.addLast(new LoggingHandler(LOG_LEVEL));
|
||||||
|
p.addLast(ch);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Channel sc = sb.bind().sync().channel();
|
||||||
|
Channel cc = cb.connect(sc.localAddress()).sync().channel();
|
||||||
|
|
||||||
|
ch.latch.await();
|
||||||
|
|
||||||
|
sh.channel.close().awaitUninterruptibly();
|
||||||
|
cc.close().awaitUninterruptibly();
|
||||||
|
sc.close().awaitUninterruptibly();
|
||||||
|
|
||||||
|
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
|
||||||
|
throw sh.exception.get();
|
||||||
}
|
}
|
||||||
});
|
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
|
||||||
|
throw ch.exception.get();
|
||||||
cb.handler(new ChannelInitializer<Channel>() {
|
}
|
||||||
@Override
|
if (sh.exception.get() != null) {
|
||||||
public void initChannel(Channel sch) throws Exception {
|
throw sh.exception.get();
|
||||||
ChannelPipeline p = sch.pipeline();
|
}
|
||||||
p.addLast(clientCtx.newHandler(sch.alloc()));
|
if (ch.exception.get() != null) {
|
||||||
p.addLast(new LoggingHandler(LOG_LEVEL));
|
throw ch.exception.get();
|
||||||
p.addLast(ch);
|
}
|
||||||
|
} finally {
|
||||||
|
if (executorService != null) {
|
||||||
|
executorService.shutdown();
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
Channel sc = sb.bind().sync().channel();
|
|
||||||
Channel cc = cb.connect(sc.localAddress()).sync().channel();
|
|
||||||
|
|
||||||
ch.latch.await();
|
|
||||||
|
|
||||||
sh.channel.close().awaitUninterruptibly();
|
|
||||||
cc.close().awaitUninterruptibly();
|
|
||||||
sc.close().awaitUninterruptibly();
|
|
||||||
|
|
||||||
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
|
|
||||||
throw sh.exception.get();
|
|
||||||
}
|
|
||||||
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
|
|
||||||
throw ch.exception.get();
|
|
||||||
}
|
|
||||||
if (sh.exception.get() != null) {
|
|
||||||
throw sh.exception.get();
|
|
||||||
}
|
|
||||||
if (ch.exception.get() != null) {
|
|
||||||
throw ch.exception.get();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,8 +26,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class EpollDomainSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
public class EpollDomainSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
||||||
|
|
||||||
public EpollDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) {
|
public EpollDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -26,8 +26,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class EpollDomainSocketSslGreetingTest extends SocketSslGreetingTest {
|
public class EpollDomainSocketSslGreetingTest extends SocketSslGreetingTest {
|
||||||
|
|
||||||
public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) {
|
public EpollDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -25,8 +25,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class EpollSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
public class EpollSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
||||||
|
|
||||||
public EpollSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) {
|
public EpollSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -25,8 +25,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class EpollSocketSslGreetingTest extends SocketSslGreetingTest {
|
public class EpollSocketSslGreetingTest extends SocketSslGreetingTest {
|
||||||
|
|
||||||
public EpollSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) {
|
public EpollSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -26,8 +26,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class KQueueDomainSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
public class KQueueDomainSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
||||||
|
|
||||||
public KQueueDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) {
|
public KQueueDomainSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -26,8 +26,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class KQueueDomainSocketSslGreetingTest extends SocketSslGreetingTest {
|
public class KQueueDomainSocketSslGreetingTest extends SocketSslGreetingTest {
|
||||||
|
|
||||||
public KQueueDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) {
|
public KQueueDomainSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -25,8 +25,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class KQueueSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
public class KQueueSocketSslClientRenegotiateTest extends SocketSslClientRenegotiateTest {
|
||||||
|
|
||||||
public KQueueSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx) {
|
public KQueueSocketSslClientRenegotiateTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -25,8 +25,8 @@ import java.util.List;
|
|||||||
|
|
||||||
public class KQueueSocketSslGreetingTest extends SocketSslGreetingTest {
|
public class KQueueSocketSslGreetingTest extends SocketSslGreetingTest {
|
||||||
|
|
||||||
public KQueueSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx) {
|
public KQueueSocketSslGreetingTest(SslContext serverCtx, SslContext clientCtx, boolean delegate) {
|
||||||
super(serverCtx, clientCtx);
|
super(serverCtx, clientCtx, delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user