Only run one SSL task per delegation (#11462)

Motivation:

We should only run one SSL task per delegation to allow more SSLEngines to make progress in a timely manner

Modifications:

- Only run one task per delegation to the executor
- Only create new SSL task if really needed
- Only schedule if not on the EventExecutor thread

Result:

More fair usage of resources and less allocations
This commit is contained in:
Norman Maurer 2021-07-08 07:56:15 +02:00 committed by GitHub
parent ede7a604f1
commit 7e39b96402
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 18 deletions

View File

@ -84,7 +84,10 @@ class Hidden {
"io.netty.handler.ssl.SslHandler", "io.netty.handler.ssl.SslHandler",
"runAllDelegatedTasks" "runAllDelegatedTasks"
); );
builder.allowBlockingCallsInside(
"io.netty.handler.ssl.SslHandler",
"runDelegatedTasks"
);
builder.allowBlockingCallsInside( builder.allowBlockingCallsInside(
"io.netty.util.concurrent.GlobalEventExecutor", "io.netty.util.concurrent.GlobalEventExecutor",
"takeTask"); "takeTask");

View File

@ -394,6 +394,9 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
private final boolean startTls; private final boolean startTls;
private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites; private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
private Promise<Channel> handshakePromise = new LazyChannelPromise(); private Promise<Channel> handshakePromise = new LazyChannelPromise();
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise(); private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
@ -1525,19 +1528,24 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
*/ */
private boolean runDelegatedTasks(boolean inUnwrap) { private boolean runDelegatedTasks(boolean inUnwrap) {
if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) { if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
// We should run the task directly in the EventExecutor thread and not offload at all. // We should run the task directly in the EventExecutor thread and not offload at all. As we are on the
// EventLoop we can just run all tasks at once.
runAllDelegatedTasks(engine); runAllDelegatedTasks(engine);
return true; return true;
} else { } else {
executeDelegatedTasks(inUnwrap); executeDelegatedTask(inUnwrap);
return false; return false;
} }
} }
private void executeDelegatedTasks(boolean inUnwrap) { private void executeDelegatedTask(boolean inUnwrap) {
executeDelegatedTask(inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner);
}
private void executeDelegatedTask(SslTasksRunner task) {
setState(STATE_PROCESS_TASK); setState(STATE_PROCESS_TASK);
try { try {
delegatedTaskExecutor.execute(new SslTasksRunner(inUnwrap)); delegatedTaskExecutor.execute(task);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
clearState(STATE_PROCESS_TASK); clearState(STATE_PROCESS_TASK);
throw e; throw e;
@ -1615,9 +1623,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
try { try {
HandshakeStatus status = engine.getHandshakeStatus(); HandshakeStatus status = engine.getHandshakeStatus();
switch (status) { switch (status) {
// There is another task that needs to be executed and offloaded to the delegatingTaskExecutor. // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor as
// a result of this. Let's reschedule....
case NEED_TASK: case NEED_TASK:
executeDelegatedTasks(inUnwrap); executeDelegatedTask(this);
break; break;
@ -1693,18 +1702,25 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
@Override @Override
public void run() { public void run() {
try { try {
runAllDelegatedTasks(engine); Runnable task = engine.getDelegatedTask();
if (task == null) {
// The task was processed in the meantime. Let's just return.
return;
}
task.run();
// All tasks were processed. EventExecutor executor = ctx.executor();
assert engine.getHandshakeStatus() != HandshakeStatus.NEED_TASK; if (executor.inEventLoop()) {
resumeOnEventExecutor();
// Jump back on the EventExecutor. } else {
ctx.executor().execute(new Runnable() { // Jump back on the EventExecutor.
@Override executor.execute(new Runnable() {
public void run() { @Override
resumeOnEventExecutor(); public void run() {
} resumeOnEventExecutor();
}); }
});
}
} catch (final Throwable cause) { } catch (final Throwable cause) {
handleException(cause); handleException(cause);
} }