Support delegating task when using ReferenceCountedOpenSslEngine. (#8859)
Motivation: SSLEngine API has a notion of tasks that may be expensive and offload these to another thread. We did not support this when using our native implementation but can now for various operations during the handshake. Modifications: - Support offloading tasks during the handshake when using our native SSLEngine implementation - Correctly handle the case when NEED_TASK is returned and nothing was consumed / produced yet Result: Be able to offload long running tasks from the EventLoop when using SslHandler with our native SSLEngine.
This commit is contained in:
parent
751cbf0f26
commit
b2dc54c8c6
@ -81,7 +81,8 @@ public abstract class ReferenceCountedOpenSslContext extends SslContext implemen
|
|||||||
AccessController.doPrivileged((PrivilegedAction<Integer>) () -> Math.max(1,
|
AccessController.doPrivileged((PrivilegedAction<Integer>) () -> Math.max(1,
|
||||||
SystemPropertyUtil.getInt("io.netty.handler.ssl.openssl.bioNonApplicationBufferSize",
|
SystemPropertyUtil.getInt("io.netty.handler.ssl.openssl.bioNonApplicationBufferSize",
|
||||||
2048)));
|
2048)));
|
||||||
|
private static final boolean USE_TASKS =
|
||||||
|
SystemPropertyUtil.getBoolean("io.netty.handler.ssl.openssl.useTasks", false);
|
||||||
private static final Integer DH_KEY_LENGTH;
|
private static final Integer DH_KEY_LENGTH;
|
||||||
private static final ResourceLeakDetector<ReferenceCountedOpenSslContext> leakDetector =
|
private static final ResourceLeakDetector<ReferenceCountedOpenSslContext> leakDetector =
|
||||||
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ReferenceCountedOpenSslContext.class);
|
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ReferenceCountedOpenSslContext.class);
|
||||||
@ -333,6 +334,8 @@ public abstract class ReferenceCountedOpenSslContext extends SslContext implemen
|
|||||||
if (enableOcsp) {
|
if (enableOcsp) {
|
||||||
SSLContext.enableOcsp(ctx, isClient());
|
SSLContext.enableOcsp(ctx, isClient());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSLContext.setUseTasks(ctx, USE_TASKS);
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (!success) {
|
if (!success) {
|
||||||
|
@ -76,6 +76,7 @@ import static java.lang.Integer.MAX_VALUE;
|
|||||||
import static java.lang.Math.min;
|
import static java.lang.Math.min;
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
|
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
|
||||||
|
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
|
||||||
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
|
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
|
||||||
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_WRAP;
|
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_WRAP;
|
||||||
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
|
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
|
||||||
@ -169,6 +170,7 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
private boolean receivedShutdown;
|
private boolean receivedShutdown;
|
||||||
private volatile int destroyed;
|
private volatile int destroyed;
|
||||||
private volatile String applicationProtocol;
|
private volatile String applicationProtocol;
|
||||||
|
private volatile boolean needTask;
|
||||||
|
|
||||||
// Reference Counting
|
// Reference Counting
|
||||||
private final ResourceLeakTracker<ReferenceCountedOpenSslEngine> leak;
|
private final ResourceLeakTracker<ReferenceCountedOpenSslEngine> leak;
|
||||||
@ -749,6 +751,10 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
// we may have freed up space by flushing above.
|
// we may have freed up space by flushing above.
|
||||||
bytesProduced = bioLengthBefore - SSL.bioLengthByteBuffer(networkBIO);
|
bytesProduced = bioLengthBefore - SSL.bioLengthByteBuffer(networkBIO);
|
||||||
|
|
||||||
|
if (status == NEED_TASK) {
|
||||||
|
return newResult(status, 0, bytesProduced);
|
||||||
|
}
|
||||||
|
|
||||||
if (bytesProduced > 0) {
|
if (bytesProduced > 0) {
|
||||||
// If we have filled up the dst buffer and we have not finished the handshake we should try to
|
// If we have filled up the dst buffer and we have not finished the handshake we should try to
|
||||||
// wrap again. Otherwise we should only try to wrap again if there is still data pending in
|
// wrap again. Otherwise we should only try to wrap again if there is still data pending in
|
||||||
@ -879,6 +885,8 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
// to write encrypted data to. This is an OVERFLOW condition.
|
// to write encrypted data to. This is an OVERFLOW condition.
|
||||||
// [1] https://www.openssl.org/docs/manmaster/ssl/SSL_write.html
|
// [1] https://www.openssl.org/docs/manmaster/ssl/SSL_write.html
|
||||||
return newResult(BUFFER_OVERFLOW, status, bytesConsumed, bytesProduced);
|
return newResult(BUFFER_OVERFLOW, status, bytesConsumed, bytesProduced);
|
||||||
|
} else if (sslError == SSL.SSL_ERROR_WANT_X509_LOOKUP) {
|
||||||
|
return newResult(NEED_TASK, bytesConsumed, bytesProduced);
|
||||||
} else {
|
} else {
|
||||||
// Everything else is considered as error
|
// Everything else is considered as error
|
||||||
throw shutdownWithError("SSL_write", sslError);
|
throw shutdownWithError("SSL_write", sslError);
|
||||||
@ -919,6 +927,10 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
}
|
}
|
||||||
return new SSLEngineResult(CLOSED, hs, bytesConsumed, bytesProduced);
|
return new SSLEngineResult(CLOSED, hs, bytesConsumed, bytesProduced);
|
||||||
}
|
}
|
||||||
|
if (hs == NEED_TASK) {
|
||||||
|
// Set needTask to true so getHandshakeStatus() will return the correct value.
|
||||||
|
needTask = true;
|
||||||
|
}
|
||||||
return new SSLEngineResult(status, hs, bytesConsumed, bytesProduced);
|
return new SSLEngineResult(status, hs, bytesConsumed, bytesProduced);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1014,6 +1026,11 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
}
|
}
|
||||||
|
|
||||||
status = handshake();
|
status = handshake();
|
||||||
|
|
||||||
|
if (status == NEED_TASK) {
|
||||||
|
return newResult(status, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
if (status == NEED_WRAP) {
|
if (status == NEED_WRAP) {
|
||||||
return NEED_WRAP_OK;
|
return NEED_WRAP_OK;
|
||||||
}
|
}
|
||||||
@ -1155,6 +1172,9 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
}
|
}
|
||||||
return newResultMayFinishHandshake(isInboundDone() ? CLOSED : OK, status,
|
return newResultMayFinishHandshake(isInboundDone() ? CLOSED : OK, status,
|
||||||
bytesConsumed, bytesProduced);
|
bytesConsumed, bytesProduced);
|
||||||
|
} else if (sslError == SSL.SSL_ERROR_WANT_X509_LOOKUP) {
|
||||||
|
return newResult(isInboundDone() ? CLOSED : OK,
|
||||||
|
NEED_TASK, bytesConsumed, bytesProduced);
|
||||||
} else {
|
} else {
|
||||||
return sslReadErrorResult(sslError, SSL.getLastErrorNumber(), bytesConsumed,
|
return sslReadErrorResult(sslError, SSL.getLastErrorNumber(), bytesConsumed,
|
||||||
bytesProduced);
|
bytesProduced);
|
||||||
@ -1287,11 +1307,30 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final Runnable getDelegatedTask() {
|
public final synchronized Runnable getDelegatedTask() {
|
||||||
// Currently, we do not delegate SSL computation tasks
|
if (isDestroyed()) {
|
||||||
// TODO: in the future, possibly create tasks to do encrypt / decrypt async
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
final Runnable task = SSL.getTask(ssl);
|
||||||
|
if (task == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
if (isDestroyed()) {
|
||||||
|
// The engine was destroyed in the meantime, just return.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
task.run();
|
||||||
|
} finally {
|
||||||
|
// The task was run, reset needTask to false so getHandshakeStatus() returns the correct value.
|
||||||
|
needTask = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final synchronized void closeInbound() throws SSLException {
|
public final synchronized void closeInbound() throws SSLException {
|
||||||
@ -1611,7 +1650,10 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
throw RENEGOTIATION_UNSUPPORTED;
|
throw RENEGOTIATION_UNSUPPORTED;
|
||||||
case NOT_STARTED:
|
case NOT_STARTED:
|
||||||
handshakeState = HandshakeState.STARTED_EXPLICITLY;
|
handshakeState = HandshakeState.STARTED_EXPLICITLY;
|
||||||
handshake();
|
if (handshake() == NEED_TASK) {
|
||||||
|
// Set needTask to true so getHandshakeStatus() will return the correct value.
|
||||||
|
needTask = true;
|
||||||
|
}
|
||||||
calculateMaxWrapOverhead();
|
calculateMaxWrapOverhead();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
@ -1681,10 +1723,18 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
int sslError = SSL.getError(ssl, code);
|
int sslError = SSL.getError(ssl, code);
|
||||||
if (sslError == SSL.SSL_ERROR_WANT_READ || sslError == SSL.SSL_ERROR_WANT_WRITE) {
|
if (sslError == SSL.SSL_ERROR_WANT_READ || sslError == SSL.SSL_ERROR_WANT_WRITE) {
|
||||||
return pendingStatus(SSL.bioLengthNonApplication(networkBIO));
|
return pendingStatus(SSL.bioLengthNonApplication(networkBIO));
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
if (sslError == SSL.SSL_ERROR_WANT_X509_LOOKUP) {
|
||||||
|
return NEED_TASK;
|
||||||
|
}
|
||||||
|
|
||||||
// Everything else is considered as error
|
// Everything else is considered as error
|
||||||
throw shutdownWithError("SSL_do_handshake", sslError);
|
throw shutdownWithError("SSL_do_handshake", sslError);
|
||||||
}
|
}
|
||||||
|
// We have produced more data as part of the handshake if this is the case the user should call wrap(...)
|
||||||
|
if (SSL.bioLengthNonApplication(networkBIO) > 0) {
|
||||||
|
return NEED_WRAP;
|
||||||
}
|
}
|
||||||
// if SSL_do_handshake returns > 0 or sslError == SSL.SSL_ERROR_NAME it means the handshake was finished.
|
// if SSL_do_handshake returns > 0 or sslError == SSL.SSL_ERROR_NAME it means the handshake was finished.
|
||||||
session.handshakeFinished();
|
session.handshakeFinished();
|
||||||
@ -1704,12 +1754,26 @@ public class ReferenceCountedOpenSslEngine extends SSLEngine implements Referenc
|
|||||||
@Override
|
@Override
|
||||||
public final synchronized SSLEngineResult.HandshakeStatus getHandshakeStatus() {
|
public final synchronized SSLEngineResult.HandshakeStatus getHandshakeStatus() {
|
||||||
// Check if we are in the initial handshake phase or shutdown phase
|
// Check if we are in the initial handshake phase or shutdown phase
|
||||||
return needPendingStatus() ? pendingStatus(SSL.bioLengthNonApplication(networkBIO)) : NOT_HANDSHAKING;
|
if (needPendingStatus()) {
|
||||||
|
if (needTask) {
|
||||||
|
// There is a task outstanding
|
||||||
|
return NEED_TASK;
|
||||||
|
}
|
||||||
|
return pendingStatus(SSL.bioLengthNonApplication(networkBIO));
|
||||||
|
}
|
||||||
|
return NOT_HANDSHAKING;
|
||||||
}
|
}
|
||||||
|
|
||||||
private SSLEngineResult.HandshakeStatus getHandshakeStatus(int pending) {
|
private SSLEngineResult.HandshakeStatus getHandshakeStatus(int pending) {
|
||||||
// Check if we are in the initial handshake phase or shutdown phase
|
// Check if we are in the initial handshake phase or shutdown phase
|
||||||
return needPendingStatus() ? pendingStatus(pending) : NOT_HANDSHAKING;
|
if (needPendingStatus()) {
|
||||||
|
if (needTask) {
|
||||||
|
// There is a task outstanding
|
||||||
|
return NEED_TASK;
|
||||||
|
}
|
||||||
|
return pendingStatus(pending);
|
||||||
|
}
|
||||||
|
return NOT_HANDSHAKING;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean needPendingStatus() {
|
private boolean needPendingStatus() {
|
||||||
|
@ -1505,10 +1505,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
private boolean runDelegatedTasks(boolean inUnwrap) {
|
private boolean runDelegatedTasks(boolean inUnwrap) {
|
||||||
if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
|
if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
|
||||||
// We should run the task directly in the EventExecutor thread and not offload at all.
|
// We should run the task directly in the EventExecutor thread and not offload at all.
|
||||||
for (;;) {
|
|
||||||
runAllDelegatedTasks(engine);
|
runAllDelegatedTasks(engine);
|
||||||
return true;
|
return true;
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
executeDelegatedTasks(inUnwrap);
|
executeDelegatedTasks(inUnwrap);
|
||||||
return false;
|
return false;
|
||||||
|
2
pom.xml
2
pom.xml
@ -268,7 +268,7 @@
|
|||||||
<!-- Fedora-"like" systems. This is currently only used for the netty-tcnative dependency -->
|
<!-- Fedora-"like" systems. This is currently only used for the netty-tcnative dependency -->
|
||||||
<os.detection.classifierWithLikes>fedora</os.detection.classifierWithLikes>
|
<os.detection.classifierWithLikes>fedora</os.detection.classifierWithLikes>
|
||||||
<tcnative.artifactId>netty-tcnative</tcnative.artifactId>
|
<tcnative.artifactId>netty-tcnative</tcnative.artifactId>
|
||||||
<tcnative.version>2.0.20.Final</tcnative.version>
|
<tcnative.version>2.0.22.Final</tcnative.version>
|
||||||
<tcnative.classifier>${os.detected.classifier}</tcnative.classifier>
|
<tcnative.classifier>${os.detected.classifier}</tcnative.classifier>
|
||||||
<conscrypt.groupId>org.conscrypt</conscrypt.groupId>
|
<conscrypt.groupId>org.conscrypt</conscrypt.groupId>
|
||||||
<conscrypt.artifactId>conscrypt-openjdk-uber</conscrypt.artifactId>
|
<conscrypt.artifactId>conscrypt-openjdk-uber</conscrypt.artifactId>
|
||||||
|
Loading…
Reference in New Issue
Block a user