diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 4cd8013ba0..82fc18657a 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -794,7 +794,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH return; } else { if (buf.isReadable()) { - pendingUnencryptedWrites.addFirst(buf); + pendingUnencryptedWrites.addFirst(buf, promise); + // When we add the buffer/promise pair back we need to be sure we don't complete the promise + // later in finishWrap. We only complete the promise if the buffer is completely consumed. + promise = null; } else { buf.release(); } @@ -1515,9 +1518,13 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH notifyHandshakeFailure(cause); } finally { // Ensure we remove and fail all pending writes in all cases and so release memory quickly. - if (pendingUnencryptedWrites != null) { - pendingUnencryptedWrites.releaseAndFailAll(ctx, cause); - } + releaseAndFailAll(cause); + } + } + + private void releaseAndFailAll(Throwable cause) { + if (pendingUnencryptedWrites != null) { + pendingUnencryptedWrites.releaseAndFailAll(ctx, cause); } } @@ -1701,7 +1708,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH if (promise.isDone()) { return; } - notifyHandshakeFailure(HANDSHAKE_TIMED_OUT); + try { + notifyHandshakeFailure(HANDSHAKE_TIMED_OUT); + } finally { + releaseAndFailAll(HANDSHAKE_TIMED_OUT); + } } }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); diff --git a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java index 1e78a6e47d..0afe89ccf7 100644 --- a/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/ssl/SslHandlerTest.java @@ -30,6 +30,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.embedded.EmbeddedChannel; @@ -57,6 +58,7 @@ import java.io.File; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.concurrent.BlockingQueue; @@ -185,6 +187,22 @@ public class SslHandlerTest { } } + @Test + public void testIncompleteWriteDoesNotCompletePromisePrematurely() throws NoSuchAlgorithmException { + SSLEngine engine = SSLContext.getDefault().createSSLEngine(); + engine.setUseClientMode(false); + + EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine)); + + ChannelPromise promise = ch.newPromise(); + ByteBuf buf = Unpooled.buffer(10).writeZero(10); + ch.writeAndFlush(buf, promise); + assertFalse(promise.isDone()); + assertTrue(ch.finishAndReleaseAll()); + assertTrue(promise.isDone()); + assertThat(promise.cause(), is(instanceOf(SSLException.class))); + } + @Test public void testReleaseSslEngine() throws Exception { assumeTrue(OpenSsl.isAvailable()); diff --git a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java index 414847e894..de9839fa61 100644 --- a/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java +++ b/transport/src/main/java/io/netty/channel/AbstractCoalescingBufferQueue.java @@ -50,10 +50,11 @@ public abstract class AbstractCoalescingBufferQueue { /** * Add a buffer to the front of the queue. */ - public final void addFirst(ByteBuf buf) { + public final void addFirst(ByteBuf buf, ChannelPromise promise) { // Listener would be added here, but since it is null there is no need. The assumption is there is already a // listener at the front of the queue, or there is a buffer at the front of the queue, which was spliced from // buf via remove(). + bufAndListenerPairs.addFirst(new DelegatingChannelPromiseNotifier(promise)); bufAndListenerPairs.addFirst(buf); incrementReadableBytes(buf.readableBytes());