diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 948d2489a0..6d67729c14 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -162,22 +162,7 @@ public class SslHandler private volatile ChannelHandlerContext ctx; private final SSLEngine engine; private final Executor delegatedTaskExecutor; - private final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier() { - @Override - public synchronized void increaseWriteCounter(long delta) { - super.increaseWriteCounter(delta); - } - - @Override - public synchronized void notifyFlushFutures() { - super.notifyFlushFutures(); - } - - @Override - public synchronized void notifyFlushFutures(Throwable cause) { - super.notifyFlushFutures(cause); - } - }; + private final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier(); private final boolean startTls; private boolean sentFirstMessage; @@ -285,6 +270,7 @@ public class SslHandler } catch (Exception e) { future.setFailure(e); ctx.fireExceptionCaught(e); + ctx.close(); } } }); @@ -363,14 +349,20 @@ public class SslHandler return; } - flushFutureNotifier.addFlushFuture(future, in.readableBytes()); + if (ctx.executor() == ctx.channel().eventLoop()) { + flushFutureNotifier.addFlushFuture(future, in.readableBytes()); + } else { + synchronized (flushFutureNotifier) { + flushFutureNotifier.addFlushFuture(future, in.readableBytes()); + } + } boolean unwrapLater = false; - int bytesProduced = 0; + int bytesConsumed = 0; try { for (;;) { SSLEngineResult result = wrap(engine, in, out); - bytesProduced += result.bytesProduced(); + bytesConsumed += result.bytesConsumed(); if (result.getStatus() == Status.CLOSED) { // SSLEngine has been closed already. // Any further write attempts should be denied. @@ -379,6 +371,8 @@ public class SslHandler SSLException e = new SSLException("SSLEngine already closed"); future.setFailure(e); ctx.fireExceptionCaught(e); + flush0(ctx, bytesConsumed, e); + bytesConsumed = 0; } break; } else { @@ -417,11 +411,61 @@ public class SslHandler throw e; } finally { in.unsafe().discardSomeReadBytes(); - flushFutureNotifier.increaseWriteCounter(bytesProduced); - ctx.flush(ctx.newFuture().addListener(flushFutureNotifier)); + flush0(ctx, bytesConsumed); } } + private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed) { + ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (ctx.executor() == ctx.channel().eventLoop()) { + notifyFlushFutures(bytesConsumed, future); + } else { + synchronized (flushFutureNotifier) { + notifyFlushFutures(bytesConsumed, future); + } + } + } + + private void notifyFlushFutures(final int bytesConsumed, ChannelFuture future) { + if (future.isSuccess()) { + flushFutureNotifier.increaseWriteCounter(bytesConsumed); + flushFutureNotifier.notifyFlushFutures(); + } else { + flushFutureNotifier.notifyFlushFutures(future.cause()); + } + } + })); + } + + private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed, final Throwable cause) { + ChannelFuture flushFuture = ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (ctx.executor() == ctx.channel().eventLoop()) { + notifyFlushFutures(ctx, bytesConsumed, cause, future); + } else { + synchronized (flushFutureNotifier) { + notifyFlushFutures(ctx, bytesConsumed, cause, future); + } + } + } + + private void notifyFlushFutures(final ChannelHandlerContext ctx, + final int bytesConsumed, final Throwable cause, ChannelFuture future) { + flushFutureNotifier.increaseWriteCounter(bytesConsumed); + if (future.isSuccess()) { + flushFutureNotifier.notifyFlushFutures(cause); + } else { + flushFutureNotifier.notifyFlushFutures(cause, future.cause()); + } + } + })); + + safeClose(ctx, flushFuture, ctx.newFuture()); + } + private static SSLEngineResult wrap(SSLEngine engine, ByteBuf in, ByteBuf out) throws SSLException { ByteBuffer in0 = in.nioBuffer(); for (;;) { @@ -614,6 +658,7 @@ public class SslHandler NotSslRecordException e = new NotSslRecordException( "not an SSL/TLS record: " + ByteBufUtil.hexDump(in)); in.skipBytes(in.readableBytes()); + setHandshakeFailure(e); throw e; } } @@ -738,16 +783,19 @@ public class SslHandler } } + if (cause == null) { + cause = new ClosedChannelException(); + } + for (;;) { ChannelFuture f = handshakeFutures.poll(); if (f == null) { break; } - if (cause == null) { - cause = new ClosedChannelException(); - } f.setFailure(cause); } + + flush0(ctx, 0, cause); } private void closeOutboundAndChannel( @@ -765,26 +813,7 @@ public class SslHandler ChannelFuture closeNotifyFuture = ctx.newFuture(); flush(ctx, closeNotifyFuture); - - // Force-close the connection if close_notify is not fully sent in time. - final ScheduledFuture timeoutFuture = ctx.executor().schedule(new Runnable() { - @Override - public void run() { - logger.warn(ctx.channel() + "close_notify write attempt timed out. Force-closing the connection."); - ctx.close(future); - } - }, 3, TimeUnit.SECONDS); // FIXME: Magic value - - // Close the connection if close_notify is sent in time. - closeNotifyFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) - throws Exception { - if (timeoutFuture.cancel(false)) { - ctx.close(future); - } - } - }); + safeClose(ctx, closeNotifyFuture, future); } @Override @@ -813,11 +842,11 @@ public class SslHandler // issue and handshake and add a listener to it which will fire an exception event if // an exception was thrown while doing the handshake handshake().addListener(new ChannelFutureListener() { - @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { ctx.pipeline().fireExceptionCaught(future.cause()); + ctx.close(); } else { // Send the event upstream after the handshake was completed without an error. // @@ -832,6 +861,38 @@ public class SslHandler } } + private static void safeClose( + final ChannelHandlerContext ctx, ChannelFuture flushFuture, + final ChannelFuture closeFuture) { + if (!ctx.channel().isActive()) { + ctx.close(closeFuture); + return; + } + + // Force-close the connection if close_notify is not fully sent in time. + final ScheduledFuture timeoutFuture = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + logger.warn( + ctx.channel() + " last lssssswrite attempt timed out." + + " Force-closing the connection."); + ctx.close(closeFuture); + } + }, 3, TimeUnit.SECONDS); // FIXME: Magic value + + // Close the connection if close_notify is sent in time. + flushFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) + throws Exception { + timeoutFuture.cancel(false); + if (ctx.channel().isActive()) { + ctx.close(closeFuture); + } + } + }); + } + private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture { public SSLEngineInboundCloseFuture() { super(null, true); @@ -861,6 +922,4 @@ public class SslHandler return false; } } - - } diff --git a/transport/src/main/java/io/netty/channel/ChannelFlushFutureNotifier.java b/transport/src/main/java/io/netty/channel/ChannelFlushFutureNotifier.java index efebf3f6be..16be01f62a 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFlushFutureNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelFlushFutureNotifier.java @@ -16,12 +16,12 @@ package io.netty.channel; import java.util.ArrayDeque; -import java.util.Deque; +import java.util.Queue; -public class ChannelFlushFutureNotifier implements ChannelFutureListener { +public class ChannelFlushFutureNotifier { private long writeCounter; - private final Deque flushCheckpoints = new ArrayDeque(); + private final Queue flushCheckpoints = new ArrayDeque(); public void addFlushFuture(ChannelFuture future, int pendingDataSize) { long checkpoint = writeCounter + pendingDataSize; @@ -39,7 +39,34 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener { } public void notifyFlushFutures() { + notifyFlushFutures0(null); + } + + public void notifyFlushFutures(Throwable cause) { + notifyFlushFutures(); + for (;;) { + FlushCheckpoint cp = flushCheckpoints.poll(); + if (cp == null) { + break; + } + cp.future().setFailure(cause); + } + } + + public void notifyFlushFutures(Throwable cause1, Throwable cause2) { + notifyFlushFutures0(cause1); + for (;;) { + FlushCheckpoint cp = flushCheckpoints.poll(); + if (cp == null) { + break; + } + cp.future().setFailure(cause2); + } + } + + private void notifyFlushFutures0(Throwable cause) { if (flushCheckpoints.isEmpty()) { + writeCounter = 0; return; } @@ -61,7 +88,11 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener { } flushCheckpoints.remove(); - cp.future().setSuccess(); + if (cause == null) { + cp.future().setSuccess(); + } else { + cp.future().setFailure(cause); + } } // Avoid overflow @@ -76,26 +107,6 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener { } } - public void notifyFlushFutures(Throwable cause) { - notifyFlushFutures(); - for (;;) { - FlushCheckpoint cp = flushCheckpoints.poll(); - if (cp == null) { - break; - } - cp.future().setFailure(cause); - } - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - notifyFlushFutures(); - } else { - notifyFlushFutures(future.cause()); - } - } - abstract static class FlushCheckpoint { abstract long flushCheckpoint(); abstract void flushCheckpoint(long checkpoint);