Fix a regression in SslHandler where delegated tasks run in a different executor makes the session hang

- Fixes #2098
This commit is contained in:
Trustin Lee 2014-01-09 15:13:31 +09:00
parent fc9a794613
commit 8198f23dbb
2 changed files with 98 additions and 18 deletions

View File

@ -49,7 +49,9 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@ -431,7 +433,7 @@ public class SslHandler extends FrameDecoder
handshaking = true;
try {
engine.beginHandshake();
runDelegatedTasks();
runDelegatedTasks(false);
handshakeFuture = this.handshakeFuture = future(channel);
if (handshakeTimeoutInMillis > 0) {
handshakeTimeout = timer.newTimeout(new TimerTask() {
@ -1037,7 +1039,7 @@ public class SslHandler extends FrameDecoder
needsUnwrap = true;
break loop;
case NEED_TASK:
runDelegatedTasks();
runDelegatedTasks(false);
break;
case FINISHED:
case NOT_HANDSHAKING:
@ -1182,10 +1184,10 @@ public class SslHandler extends FrameDecoder
switch (handshakeStatus) {
case FINISHED:
setHandshakeSuccess(channel);
runDelegatedTasks();
runDelegatedTasks(false);
break;
case NEED_TASK:
runDelegatedTasks();
runDelegatedTasks(false);
break;
case NEED_UNWRAP:
if (!Thread.holdsLock(handshakeLock)) {
@ -1294,7 +1296,7 @@ public class SslHandler extends FrameDecoder
wrapNonAppData(ctx, channel);
break;
case NEED_TASK:
runDelegatedTasks();
runDelegatedTasks(true);
break;
case FINISHED:
setHandshakeSuccess(channel);
@ -1393,21 +1395,64 @@ public class SslHandler extends FrameDecoder
}
}
private void runDelegatedTasks() {
for (;;) {
final Runnable task;
synchronized (handshakeLock) {
task = engine.getDelegatedTask();
/**
* Fetches all delegated tasks from the {@link SSLEngine} and runs them via the {@link #delegatedTaskExecutor}.
* If the {@link #delegatedTaskExecutor} is {@link ImmediateExecutor}, just call {@link Runnable#run()} directly
* instead of using {@link Executor#execute(Runnable)}. Otherwise, run the tasks via
* the {@link #delegatedTaskExecutor} and continue unwrapping so that the handshake completes.
*/
private void runDelegatedTasks(final boolean unwrapLater) {
if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) {
for (;;) {
final Runnable task;
synchronized (handshakeLock) {
task = engine.getDelegatedTask();
}
if (task == null) {
break;
}
delegatedTaskExecutor.execute(task);
}
} else {
final List<Runnable> tasks = new ArrayList<Runnable>(2);
for (;;) {
final Runnable task;
synchronized (handshakeLock) {
task = engine.getDelegatedTask();
}
if (task == null) {
break;
}
tasks.add(task);
}
if (task == null) {
break;
if (tasks.isEmpty()) {
return;
}
delegatedTaskExecutor.execute(new Runnable() {
public void run() {
synchronized (handshakeLock) {
task.run();
try {
for (Runnable task: tasks) {
task.run();
}
if (unwrapLater) {
ctx.getPipeline().execute(new Runnable() {
public void run() {
try {
unwrap(ctx, ctx.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
} catch (Exception e) {
fireExceptionCaught(ctx, e);
}
}
});
}
} catch (Exception e) {
fireExceptionCaught(ctx, e);
}
}
});
@ -1664,7 +1709,7 @@ public class SslHandler extends FrameDecoder
}
private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
public SSLEngineInboundCloseFuture() {
SSLEngineInboundCloseFuture() {
super(null, true);
}

View File

@ -66,7 +66,29 @@ public abstract class AbstractSocketSslEchoTest {
}
@Test
public void testSslEcho() throws Throwable {
public void testSslEcho1() throws Throwable {
testSslEcho(false, false);
}
@Test
public void testSslEcho2() throws Throwable {
testSslEcho(false, true);
}
@Test
public void testSslEcho3() throws Throwable {
testSslEcho(true, false);
}
@Test
public void testSslEcho4() throws Throwable {
testSslEcho(true, true);
}
private void testSslEcho(
boolean serverUsesDelegatedTaskExecutor, boolean clientUsesDelegatedTaskExecutor) throws Throwable {
ExecutorService delegatedTaskExecutor = Executors.newCachedThreadPool();
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
@ -82,10 +104,22 @@ public abstract class AbstractSocketSslEchoTest {
sb.setOption("receiveBufferSize", 1048576);
sb.setOption("receiveBufferSize", 1048576);
sb.getPipeline().addFirst("ssl", new SslHandler(sse));
// Configure the server pipeline.
if (serverUsesDelegatedTaskExecutor) {
sb.getPipeline().addFirst("ssl", new SslHandler(sse, delegatedTaskExecutor));
} else {
sb.getPipeline().addFirst("ssl", new SslHandler(sse));
}
sb.getPipeline().addLast("handler", sh);
cb.getPipeline().addFirst("ssl", new SslHandler(cse));
// Configure the client pipeline.
if (clientUsesDelegatedTaskExecutor) {
cb.getPipeline().addFirst("ssl", new SslHandler(cse, delegatedTaskExecutor));
} else {
cb.getPipeline().addFirst("ssl", new SslHandler(cse));
}
cb.getPipeline().addLast("handler", ch);
ExecutorService eventExecutor = null;
if (isExecutorRequired()) {
eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 0, 0);
@ -159,6 +193,7 @@ public abstract class AbstractSocketSslEchoTest {
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
delegatedTaskExecutor.shutdown();
if (eventExecutor != null) {
eventExecutor.shutdown();