[#160] No response to write if server is using SslHandler and client is not

- Make SslHandler close the connection on SSLException or NotSslRecordException
This commit is contained in:
Trustin Lee 2012-08-22 13:38:09 +09:00
parent 73720c422d
commit 00188a2923
2 changed files with 141 additions and 71 deletions

View File

@ -162,22 +162,7 @@ public class SslHandler
private volatile ChannelHandlerContext ctx; private volatile ChannelHandlerContext ctx;
private final SSLEngine engine; private final SSLEngine engine;
private final Executor delegatedTaskExecutor; private final Executor delegatedTaskExecutor;
private final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier() { private final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier();
@Override
public synchronized void increaseWriteCounter(long delta) {
super.increaseWriteCounter(delta);
}
@Override
public synchronized void notifyFlushFutures() {
super.notifyFlushFutures();
}
@Override
public synchronized void notifyFlushFutures(Throwable cause) {
super.notifyFlushFutures(cause);
}
};
private final boolean startTls; private final boolean startTls;
private boolean sentFirstMessage; private boolean sentFirstMessage;
@ -285,6 +270,7 @@ public class SslHandler
} catch (Exception e) { } catch (Exception e) {
future.setFailure(e); future.setFailure(e);
ctx.fireExceptionCaught(e); ctx.fireExceptionCaught(e);
ctx.close();
} }
} }
}); });
@ -363,14 +349,20 @@ public class SslHandler
return; return;
} }
if (ctx.executor() == ctx.channel().eventLoop()) {
flushFutureNotifier.addFlushFuture(future, in.readableBytes()); flushFutureNotifier.addFlushFuture(future, in.readableBytes());
} else {
synchronized (flushFutureNotifier) {
flushFutureNotifier.addFlushFuture(future, in.readableBytes());
}
}
boolean unwrapLater = false; boolean unwrapLater = false;
int bytesProduced = 0; int bytesConsumed = 0;
try { try {
for (;;) { for (;;) {
SSLEngineResult result = wrap(engine, in, out); SSLEngineResult result = wrap(engine, in, out);
bytesProduced += result.bytesProduced(); bytesConsumed += result.bytesConsumed();
if (result.getStatus() == Status.CLOSED) { if (result.getStatus() == Status.CLOSED) {
// SSLEngine has been closed already. // SSLEngine has been closed already.
// Any further write attempts should be denied. // Any further write attempts should be denied.
@ -379,6 +371,8 @@ public class SslHandler
SSLException e = new SSLException("SSLEngine already closed"); SSLException e = new SSLException("SSLEngine already closed");
future.setFailure(e); future.setFailure(e);
ctx.fireExceptionCaught(e); ctx.fireExceptionCaught(e);
flush0(ctx, bytesConsumed, e);
bytesConsumed = 0;
} }
break; break;
} else { } else {
@ -417,11 +411,61 @@ public class SslHandler
throw e; throw e;
} finally { } finally {
in.unsafe().discardSomeReadBytes(); in.unsafe().discardSomeReadBytes();
flushFutureNotifier.increaseWriteCounter(bytesProduced); flush0(ctx, bytesConsumed);
ctx.flush(ctx.newFuture().addListener(flushFutureNotifier));
} }
} }
private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed) {
ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (ctx.executor() == ctx.channel().eventLoop()) {
notifyFlushFutures(bytesConsumed, future);
} else {
synchronized (flushFutureNotifier) {
notifyFlushFutures(bytesConsumed, future);
}
}
}
private void notifyFlushFutures(final int bytesConsumed, ChannelFuture future) {
if (future.isSuccess()) {
flushFutureNotifier.increaseWriteCounter(bytesConsumed);
flushFutureNotifier.notifyFlushFutures();
} else {
flushFutureNotifier.notifyFlushFutures(future.cause());
}
}
}));
}
private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed, final Throwable cause) {
ChannelFuture flushFuture = ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (ctx.executor() == ctx.channel().eventLoop()) {
notifyFlushFutures(ctx, bytesConsumed, cause, future);
} else {
synchronized (flushFutureNotifier) {
notifyFlushFutures(ctx, bytesConsumed, cause, future);
}
}
}
private void notifyFlushFutures(final ChannelHandlerContext ctx,
final int bytesConsumed, final Throwable cause, ChannelFuture future) {
flushFutureNotifier.increaseWriteCounter(bytesConsumed);
if (future.isSuccess()) {
flushFutureNotifier.notifyFlushFutures(cause);
} else {
flushFutureNotifier.notifyFlushFutures(cause, future.cause());
}
}
}));
safeClose(ctx, flushFuture, ctx.newFuture());
}
private static SSLEngineResult wrap(SSLEngine engine, ByteBuf in, ByteBuf out) throws SSLException { private static SSLEngineResult wrap(SSLEngine engine, ByteBuf in, ByteBuf out) throws SSLException {
ByteBuffer in0 = in.nioBuffer(); ByteBuffer in0 = in.nioBuffer();
for (;;) { for (;;) {
@ -614,6 +658,7 @@ public class SslHandler
NotSslRecordException e = new NotSslRecordException( NotSslRecordException e = new NotSslRecordException(
"not an SSL/TLS record: " + ByteBufUtil.hexDump(in)); "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
in.skipBytes(in.readableBytes()); in.skipBytes(in.readableBytes());
setHandshakeFailure(e);
throw e; throw e;
} }
} }
@ -738,16 +783,19 @@ public class SslHandler
} }
} }
if (cause == null) {
cause = new ClosedChannelException();
}
for (;;) { for (;;) {
ChannelFuture f = handshakeFutures.poll(); ChannelFuture f = handshakeFutures.poll();
if (f == null) { if (f == null) {
break; break;
} }
if (cause == null) {
cause = new ClosedChannelException();
}
f.setFailure(cause); f.setFailure(cause);
} }
flush0(ctx, 0, cause);
} }
private void closeOutboundAndChannel( private void closeOutboundAndChannel(
@ -765,26 +813,7 @@ public class SslHandler
ChannelFuture closeNotifyFuture = ctx.newFuture(); ChannelFuture closeNotifyFuture = ctx.newFuture();
flush(ctx, closeNotifyFuture); flush(ctx, closeNotifyFuture);
safeClose(ctx, closeNotifyFuture, future);
// Force-close the connection if close_notify is not fully sent in time.
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(ctx.channel() + "close_notify write attempt timed out. Force-closing the connection.");
ctx.close(future);
}
}, 3, TimeUnit.SECONDS); // FIXME: Magic value
// Close the connection if close_notify is sent in time.
closeNotifyFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
if (timeoutFuture.cancel(false)) {
ctx.close(future);
}
}
});
} }
@Override @Override
@ -813,11 +842,11 @@ public class SslHandler
// issue and handshake and add a listener to it which will fire an exception event if // issue and handshake and add a listener to it which will fire an exception event if
// an exception was thrown while doing the handshake // an exception was thrown while doing the handshake
handshake().addListener(new ChannelFutureListener() { handshake().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
ctx.pipeline().fireExceptionCaught(future.cause()); ctx.pipeline().fireExceptionCaught(future.cause());
ctx.close();
} else { } else {
// Send the event upstream after the handshake was completed without an error. // Send the event upstream after the handshake was completed without an error.
// //
@ -832,6 +861,38 @@ public class SslHandler
} }
} }
private static void safeClose(
final ChannelHandlerContext ctx, ChannelFuture flushFuture,
final ChannelFuture closeFuture) {
if (!ctx.channel().isActive()) {
ctx.close(closeFuture);
return;
}
// Force-close the connection if close_notify is not fully sent in time.
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(
ctx.channel() + " last lssssswrite attempt timed out." +
" Force-closing the connection.");
ctx.close(closeFuture);
}
}, 3, TimeUnit.SECONDS); // FIXME: Magic value
// Close the connection if close_notify is sent in time.
flushFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
timeoutFuture.cancel(false);
if (ctx.channel().isActive()) {
ctx.close(closeFuture);
}
}
});
}
private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture { private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
public SSLEngineInboundCloseFuture() { public SSLEngineInboundCloseFuture() {
super(null, true); super(null, true);
@ -861,6 +922,4 @@ public class SslHandler
return false; return false;
} }
} }
} }

View File

@ -16,12 +16,12 @@
package io.netty.channel; package io.netty.channel;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque; import java.util.Queue;
public class ChannelFlushFutureNotifier implements ChannelFutureListener { public class ChannelFlushFutureNotifier {
private long writeCounter; private long writeCounter;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>(); private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
public void addFlushFuture(ChannelFuture future, int pendingDataSize) { public void addFlushFuture(ChannelFuture future, int pendingDataSize) {
long checkpoint = writeCounter + pendingDataSize; long checkpoint = writeCounter + pendingDataSize;
@ -39,7 +39,34 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener {
} }
public void notifyFlushFutures() { public void notifyFlushFutures() {
notifyFlushFutures0(null);
}
public void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
public void notifyFlushFutures(Throwable cause1, Throwable cause2) {
notifyFlushFutures0(cause1);
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause2);
}
}
private void notifyFlushFutures0(Throwable cause) {
if (flushCheckpoints.isEmpty()) { if (flushCheckpoints.isEmpty()) {
writeCounter = 0;
return; return;
} }
@ -61,7 +88,11 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener {
} }
flushCheckpoints.remove(); flushCheckpoints.remove();
if (cause == null) {
cp.future().setSuccess(); cp.future().setSuccess();
} else {
cp.future().setFailure(cause);
}
} }
// Avoid overflow // Avoid overflow
@ -76,26 +107,6 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener {
} }
} }
public void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
notifyFlushFutures();
} else {
notifyFlushFutures(future.cause());
}
}
abstract static class FlushCheckpoint { abstract static class FlushCheckpoint {
abstract long flushCheckpoint(); abstract long flushCheckpoint();
abstract void flushCheckpoint(long checkpoint); abstract void flushCheckpoint(long checkpoint);