Only run one SSL task per delegation (#11462)

Motivation:

We should only run one SSL task per delegation to allow more SSLEngines to make progress in a timely manner

Modifications:

- Only run one task per delegation to the executor
- Only create new SSL task if really needed
- Only schedule if not on the EventExecutor thread

Result:

More fair usage of resources and less allocations
This commit is contained in:
Norman Maurer 2021-07-08 07:56:15 +02:00
parent 04cb23626d
commit 54aa4d9b68
2 changed files with 31 additions and 12 deletions

View File

@ -94,6 +94,10 @@ class Hidden {
"io.netty.handler.ssl.SslHandler", "io.netty.handler.ssl.SslHandler",
"runAllDelegatedTasks" "runAllDelegatedTasks"
); );
builder.allowBlockingCallsInside(
"io.netty.handler.ssl.SslHandler",
"runDelegatedTasks"
);
builder.allowBlockingCallsInside( builder.allowBlockingCallsInside(
"io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback", "io.netty.handler.ssl.ReferenceCountedOpenSslClientContext$ExtendedTrustManagerVerifyCallback",

View File

@ -392,6 +392,9 @@ public class SslHandler extends ByteToMessageDecoder {
private final boolean startTls; private final boolean startTls;
private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites; private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
private Promise<Channel> handshakePromise = new LazyPromise(); private Promise<Channel> handshakePromise = new LazyPromise();
private final Promise<Channel> sslClosePromise = new LazyPromise(); private final Promise<Channel> sslClosePromise = new LazyPromise();
@ -1510,19 +1513,24 @@ public class SslHandler extends ByteToMessageDecoder {
*/ */
private boolean runDelegatedTasks(boolean inUnwrap) { private boolean runDelegatedTasks(boolean inUnwrap) {
if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) { if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
// We should run the task directly in the EventExecutor thread and not offload at all. // We should run the task directly in the EventExecutor thread and not offload at all. As we are on the
// EventLoop we can just run all tasks at once.
runAllDelegatedTasks(engine); runAllDelegatedTasks(engine);
return true; return true;
} else { } else {
executeDelegatedTasks(inUnwrap); executeDelegatedTask(inUnwrap);
return false; return false;
} }
} }
private void executeDelegatedTasks(boolean inUnwrap) { private void executeDelegatedTask(boolean inUnwrap) {
executeDelegatedTask(inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner);
}
private void executeDelegatedTask(SslTasksRunner task) {
setState(STATE_PROCESS_TASK); setState(STATE_PROCESS_TASK);
try { try {
delegatedTaskExecutor.execute(new SslTasksRunner(inUnwrap)); delegatedTaskExecutor.execute(task);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
clearState(STATE_PROCESS_TASK); clearState(STATE_PROCESS_TASK);
throw e; throw e;
@ -1600,9 +1608,10 @@ public class SslHandler extends ByteToMessageDecoder {
try { try {
HandshakeStatus status = engine.getHandshakeStatus(); HandshakeStatus status = engine.getHandshakeStatus();
switch (status) { switch (status) {
// There is another task that needs to be executed and offloaded to the delegatingTaskExecutor. // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor as
// a result of this. Let's reschedule....
case NEED_TASK: case NEED_TASK:
executeDelegatedTasks(inUnwrap); executeDelegatedTask(this);
break; break;
@ -1678,13 +1687,19 @@ public class SslHandler extends ByteToMessageDecoder {
@Override @Override
public void run() { public void run() {
try { try {
runAllDelegatedTasks(engine); Runnable task = engine.getDelegatedTask();
if (task == null) {
// The task was processed in the meantime. Let's just return.
return;
}
task.run();
// All tasks were processed. EventExecutor executor = ctx.executor();
assert engine.getHandshakeStatus() != HandshakeStatus.NEED_TASK; if (executor.inEventLoop()) {
resumeOnEventExecutor();
// Jump back on the EventExecutor. } else {
ctx.executor().execute(this::resumeOnEventExecutor); executor.execute(this::resumeOnEventExecutor);
}
} catch (final Throwable cause) { } catch (final Throwable cause) {
handleException(cause); handleException(cause);
} }