SslHandler promise completion incorrect if write doesn't immediately

complete

Motivation:
SslHandler removes a Buffer/Promise pair from
AbstractCoalescingBufferQueue when wrapping data. However it is possible
the SSLEngine will not consume the entire buffer. In this case
SslHandler adds the Buffer back to the queue, but doesn't add the
Promise back to the queue. This may result in the promise completing
immediately in finishFlush, and generally not correlating to the
completion of writing the corresponding Buffer

Modifications:
- AbstractCoalescingBufferQueue#addFirst should also support adding the
ChannelPromise
- In the event of a handshake timeout we should immediately fail pending
writes immediately to get a more accurate exception

Result:
Fixes https://github.com/netty/netty/issues/7378.
This commit is contained in:
Scott Mitchell 2017-11-07 07:46:14 -08:00
parent 8618a3351c
commit 8c5eeb581e
3 changed files with 36 additions and 6 deletions

View File

@ -794,7 +794,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
return;
} else {
if (buf.isReadable()) {
pendingUnencryptedWrites.addFirst(buf);
pendingUnencryptedWrites.addFirst(buf, promise);
// When we add the buffer/promise pair back we need to be sure we don't complete the promise
// later in finishWrap. We only complete the promise if the buffer is completely consumed.
promise = null;
} else {
buf.release();
}
@ -1515,9 +1518,13 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
notifyHandshakeFailure(cause);
} finally {
// Ensure we remove and fail all pending writes in all cases and so release memory quickly.
if (pendingUnencryptedWrites != null) {
pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
}
releaseAndFailAll(cause);
}
}
private void releaseAndFailAll(Throwable cause) {
if (pendingUnencryptedWrites != null) {
pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
}
}
@ -1701,7 +1708,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
if (promise.isDone()) {
return;
}
notifyHandshakeFailure(HANDSHAKE_TIMED_OUT);
try {
notifyHandshakeFailure(HANDSHAKE_TIMED_OUT);
} finally {
releaseAndFailAll(HANDSHAKE_TIMED_OUT);
}
}
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);

View File

@ -30,6 +30,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
@ -57,6 +58,7 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.BlockingQueue;
@ -185,6 +187,22 @@ public class SslHandlerTest {
}
}
@Test
public void testIncompleteWriteDoesNotCompletePromisePrematurely() throws NoSuchAlgorithmException {
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setUseClientMode(false);
EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine));
ChannelPromise promise = ch.newPromise();
ByteBuf buf = Unpooled.buffer(10).writeZero(10);
ch.writeAndFlush(buf, promise);
assertFalse(promise.isDone());
assertTrue(ch.finishAndReleaseAll());
assertTrue(promise.isDone());
assertThat(promise.cause(), is(instanceOf(SSLException.class)));
}
@Test
public void testReleaseSslEngine() throws Exception {
assumeTrue(OpenSsl.isAvailable());

View File

@ -50,10 +50,11 @@ public abstract class AbstractCoalescingBufferQueue {
/**
* Add a buffer to the front of the queue.
*/
public final void addFirst(ByteBuf buf) {
public final void addFirst(ByteBuf buf, ChannelPromise promise) {
// Listener would be added here, but since it is null there is no need. The assumption is there is already a
// listener at the front of the queue, or there is a buffer at the front of the queue, which was spliced from
// buf via remove().
bufAndListenerPairs.addFirst(new DelegatingChannelPromiseNotifier(promise));
bufAndListenerPairs.addFirst(buf);
incrementReadableBytes(buf.readableBytes());