From 50fafdc3d32a67817050f011a39dd3e1bfbaac40 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 8 Jun 2012 01:22:35 +0900 Subject: [PATCH] Rewrite SslHandler / Reduce the chance of OIO-OIO dead lock - SslHandler always begins handshake unless startTls is true - Removed issueHandshake property - If a user wants to start handshake later, he/she has to add SslHandler later. - Removed enableRenegotiation property - JDK upgrade fixes the security vulnerability - no need to complicate our code - Some property name changes - getSSLEngineInboundCloseFuture() -> sslCloseFuture() - Updated securechat example - Added timeout for handshake and close_notify for better security - However, it's currently hard-coded. Will make it a property later. --- .../securechat/SecureChatClientHandler.java | 11 - .../securechat/SecureChatServerHandler.java | 48 +- handler/pom.xml | 5 - .../java/io/netty/handler/ssl/SslHandler.java | 952 +++++------------- .../transport/socket/SocketSslEchoTest.java | 10 +- .../socket/oio/AbstractOioChannel.java | 2 + .../channel/socket/oio/OioChildEventLoop.java | 16 +- .../socket/oio/OioDatagramChannel.java | 2 +- .../socket/oio/OioServerSocketChannel.java | 2 +- .../channel/socket/oio/OioSocketChannel.java | 2 +- 10 files changed, 290 insertions(+), 760 deletions(-) diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java b/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java index 25aa94b7b4..4573fd6c75 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java @@ -17,7 +17,6 @@ package io.netty.example.securechat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; -import io.netty.handler.ssl.SslHandler; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,16 +29,6 @@ public class SecureChatClientHandler extends ChannelInboundMessageHandlerAdapter private static final Logger logger = Logger.getLogger( SecureChatClientHandler.class.getName()); - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - // Get the SslHandler from the pipeline - // which were added in SecureChatPipelineFactory. - SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); - - // Begin handshake. - sslHandler.handshake(); - } - @Override public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.err.println(msg); diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java index c62820ffdc..a4a2b36357 100644 --- a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java +++ b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java @@ -16,8 +16,6 @@ package io.netty.example.securechat; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.group.ChannelGroup; @@ -40,13 +38,18 @@ public class SecureChatServerHandler extends ChannelInboundMessageHandlerAdapter @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - // Get the SslHandler in the current pipeline. - // We added it in SecureChatPipelineFactory. - final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + // Once session is secured, send a greeting. + ctx.write( + "Welcome to " + InetAddress.getLocalHost().getHostName() + + " secure chat service!\n"); + ctx.write( + "Your session is protected by " + + ctx.pipeline().get(SslHandler.class).getEngine().getSession().getCipherSuite() + + " cipher suite.\n"); - // Get notified when SSL handshake is done. - ChannelFuture handshakeFuture = sslHandler.handshake(); - handshakeFuture.addListener(new Greeter(sslHandler)); + // Register the channel to the global channel list + // so the channel received the messages from others. + channels.add(ctx.channel()); } @Override @@ -74,33 +77,4 @@ public class SecureChatServerHandler extends ChannelInboundMessageHandlerAdapter "Unexpected exception from downstream.", cause); ctx.close(); } - - private static final class Greeter implements ChannelFutureListener { - - private final SslHandler sslHandler; - - Greeter(SslHandler sslHandler) { - this.sslHandler = sslHandler; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // Once session is secured, send a greeting. - future.channel().write( - "Welcome to " + InetAddress.getLocalHost().getHostName() + - " secure chat service!\n"); - future.channel().write( - "Your session is protected by " + - sslHandler.getEngine().getSession().getCipherSuite() + - " cipher suite.\n"); - - // Register the channel to the global channel list - // so the channel received the messages from others. - channels.add(future.channel()); - } else { - future.channel().close(); - } - } - } } diff --git a/handler/pom.xml b/handler/pom.xml index 5f39fea800..3aa45af965 100644 --- a/handler/pom.xml +++ b/handler/pom.xml @@ -39,11 +39,6 @@ netty-transport ${project.version} - - ${project.groupId} - netty-codec - ${project.version} - diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 1b654782e7..4c4e95ff45 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -16,26 +16,32 @@ package io.netty.handler.ssl; import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultChannelFuture; -import io.netty.handler.codec.StreamToStreamCodec; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; @@ -134,55 +140,26 @@ import javax.net.ssl.SSLException; * @apiviz.landmark * @apiviz.uses io.netty.handler.ssl.SslBufferPool */ -public class SslHandler extends StreamToStreamCodec { +public class SslHandler + extends ChannelHandlerAdapter + implements ChannelInboundHandler, ChannelOutboundHandler { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SslHandler.class); - private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); - private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile( "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$", Pattern.CASE_INSENSITIVE); - private static SslBufferPool defaultBufferPool; - - /** - * Returns the default {@link SslBufferPool} used when no pool is - * specified in the constructor. - */ - public static synchronized SslBufferPool getDefaultBufferPool() { - if (defaultBufferPool == null) { - defaultBufferPool = new DefaultSslBufferPool(); - } - return defaultBufferPool; - } - - private ChannelHandlerContext ctx; + private volatile ChannelHandlerContext ctx; private final SSLEngine engine; - private final SslBufferPool bufferPool; private final Executor delegatedTaskExecutor; private final boolean startTls; private boolean sentFirstMessage; - private volatile boolean enableRenegotiation = true; - - final Object handshakeLock = new Object(); - - private boolean handshaking; - private volatile boolean handshaken; - private ChannelFuture handshakeFuture; - - private boolean sentCloseNotify; - - int ignoreClosedChannelException; - final Object ignoreClosedChannelExceptionLock = new Object(); - private volatile boolean issueHandshake; - - private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture(); - - private int packetLength = -1; + private final Queue handshakeFutures = new ArrayDeque(); + private final SSLEngineInboundCloseFuture sslCloseFuture = new SSLEngineInboundCloseFuture(); /** * Creates a new instance. @@ -190,18 +167,7 @@ public class SslHandler extends StreamToStreamCodec { * @param engine the {@link SSLEngine} this handler will use */ public SslHandler(SSLEngine engine) { - this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE); - } - - /** - * Creates a new instance. - * - * @param engine the {@link SSLEngine} this handler will use - * @param bufferPool the {@link SslBufferPool} where this handler will - * acquire the buffers required by the {@link SSLEngine} - */ - public SslHandler(SSLEngine engine, SslBufferPool bufferPool) { - this(engine, bufferPool, ImmediateExecutor.INSTANCE); + this(engine, ImmediateExecutor.INSTANCE); } /** @@ -212,20 +178,7 @@ public class SslHandler extends StreamToStreamCodec { * encrypted by the {@link SSLEngine} */ public SslHandler(SSLEngine engine, boolean startTls) { - this(engine, getDefaultBufferPool(), startTls); - } - - /** - * Creates a new instance. - * - * @param engine the {@link SSLEngine} this handler will use - * @param bufferPool the {@link SslBufferPool} where this handler will - * acquire the buffers required by the {@link SSLEngine} - * @param startTls {@code true} if the first write request shouldn't be - * encrypted by the {@link SSLEngine} - */ - public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) { - this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE); + this(engine, startTls, ImmediateExecutor.INSTANCE); } /** @@ -238,23 +191,7 @@ public class SslHandler extends StreamToStreamCodec { * that {@link SSLEngine#getDelegatedTask()} will return */ public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) { - this(engine, getDefaultBufferPool(), delegatedTaskExecutor); - } - - /** - * Creates a new instance. - * - * @param engine - * the {@link SSLEngine} this handler will use - * @param bufferPool - * the {@link SslBufferPool} where this handler will acquire - * the buffers required by the {@link SSLEngine} - * @param delegatedTaskExecutor - * the {@link Executor} which will execute the delegated task - * that {@link SSLEngine#getDelegatedTask()} will return - */ - public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) { - this(engine, bufferPool, false, delegatedTaskExecutor); + this(engine, false, delegatedTaskExecutor); } /** @@ -270,36 +207,13 @@ public class SslHandler extends StreamToStreamCodec { * that {@link SSLEngine#getDelegatedTask()} will return */ public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) { - this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor); - } - - /** - * Creates a new instance. - * - * @param engine - * the {@link SSLEngine} this handler will use - * @param bufferPool - * the {@link SslBufferPool} where this handler will acquire - * the buffers required by the {@link SSLEngine} - * @param startTls - * {@code true} if the first write request shouldn't be encrypted - * by the {@link SSLEngine} - * @param delegatedTaskExecutor - * the {@link Executor} which will execute the delegated task - * that {@link SSLEngine#getDelegatedTask()} will return - */ - public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) { if (engine == null) { throw new NullPointerException("engine"); } - if (bufferPool == null) { - throw new NullPointerException("bufferPool"); - } if (delegatedTaskExecutor == null) { throw new NullPointerException("delegatedTaskExecutor"); } this.engine = engine; - this.bufferPool = bufferPool; this.delegatedTaskExecutor = delegatedTaskExecutor; this.startTls = startTls; } @@ -311,69 +225,47 @@ public class SslHandler extends StreamToStreamCodec { return engine; } + public ChannelFuture handshake() { + return handshake(ctx.newFuture()); + } + /** * Starts an SSL / TLS handshake for the specified channel. * * @return a {@link ChannelFuture} which is notified when the handshake * succeeds or fails. */ - public ChannelFuture handshake() { - if (handshaken && !isEnableRenegotiation()) { - throw new IllegalStateException("renegotiation disabled"); - } + public ChannelFuture handshake(final ChannelFuture future) { + final ChannelHandlerContext ctx = this.ctx; - Channel channel = ctx.channel(); - Exception exception = null; - - synchronized (handshakeLock) { - if (handshaking) { - return handshakeFuture; - } else { - handshaking = true; - if (ctx.executor().inEventLoop()) { - try { - engine.beginHandshake(); - runDelegatedTasks(); - wrapNonAppData(ctx, channel); - - } catch (Exception e) { - exception = e; - } - - } else { - ctx.executor().execute(new Runnable() { - - @Override - public void run() { - Throwable exception = null; - synchronized (handshakeLock) { - try { - - engine.beginHandshake(); - runDelegatedTasks(); - wrapNonAppData(ctx, ctx.channel()); - - } catch (Exception e) { - exception = e; - } - } - if (exception != null) { // Failed to initiate handshake. - handshakeFuture.setFailure(exception); - ctx.fireExceptionCaught(exception); - } - } - }); + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + if (future.isDone()) { + return; } + SSLException e = new SSLException("handshake timed out"); + future.setFailure(e); + ctx.fireExceptionCaught(e); + ctx.close(); } - } + }, 10, TimeUnit.SECONDS); // FIXME: Magic value + ctx.executor().execute(new Runnable() { + @Override + public void run() { + try { + engine.beginHandshake(); + handshakeFutures.add(future); + flush(ctx, ctx.newFuture()); + } catch (Exception e) { + future.setFailure(e); + ctx.fireExceptionCaught(e); + } + } + }); - if (exception != null) { // Failed to initiate handshake. - handshakeFuture.setFailure(exception); - ctx.fireExceptionCaught(exception); - } - - return handshakeFuture; + return future; } /** @@ -381,45 +273,20 @@ public class SslHandler extends StreamToStreamCodec { * destroys the underlying {@link SSLEngine}. */ public ChannelFuture close() { - ChannelHandlerContext ctx = this.ctx; - Channel channel = ctx.channel(); - try { - engine.closeOutbound(); - return wrapNonAppData(ctx, channel); - } catch (SSLException e) { - ctx.fireExceptionCaught(e); - return channel.newFailedFuture(e); - } + return close(ctx.newFuture()); } - /** - * Returns {@code true} if and only if TLS renegotiation is enabled. - */ - public boolean isEnableRenegotiation() { - return enableRenegotiation; - } + public ChannelFuture close(final ChannelFuture future) { + final ChannelHandlerContext ctx = this.ctx; + ctx.executor().execute(new Runnable() { + @Override + public void run() { + engine.closeOutbound(); + ctx.flush(future); + } + }); - /** - * Enables or disables TLS renegotiation. - */ - public void setEnableRenegotiation(boolean enableRenegotiation) { - this.enableRenegotiation = enableRenegotiation; - } - - - /** - * Enables or disables the automatic handshake once the {@link Channel} is connected. The value will only have affect if its set before the - * {@link Channel} is connected. - */ - public void setIssueHandshake(boolean issueHandshake) { - this.issueHandshake = issueHandshake; - } - - /** - * Returns true if the automatic handshake is enabled - */ - public boolean isIssueHandshake() { - return issueHandshake; + return future; } /** @@ -430,8 +297,18 @@ public class SslHandler extends StreamToStreamCodec { * For more informations see the apidocs of {@link SSLEngine} * */ - public ChannelFuture getSSLEngineInboundCloseFuture() { - return sslEngineCloseFuture; + public ChannelFuture sslCloseFuture() { + return sslCloseFuture; + } + + @Override + public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + return ChannelBufferHolders.byteBuffer(); + } + + @Override + public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return ChannelBufferHolders.byteBuffer(); } @Override @@ -446,98 +323,93 @@ public class SslHandler extends StreamToStreamCodec { closeOutboundAndChannel(ctx, future, false); } - @Override - public void encode(ChannelHandlerContext ctx, ChannelBuffer in, ChannelBuffer out) throws Exception { + @Override + public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception { + final ChannelBuffer in = ctx.outboundByteBuffer(); + final ChannelBuffer out = ctx.nextOutboundByteBuffer(); + + out.discardReadBytes(); // Do not encrypt the first write request if this handler is // created with startTLS flag turned on. if (startTls && !sentFirstMessage) { sentFirstMessage = true; out.writeBytes(in); + ctx.flush(future); return; } - ByteBuffer outNetBuf = bufferPool.acquireBuffer(); - boolean success = true; - boolean needsUnwrap = false; + boolean unwrapLater = false; + int bytesProduced = 0; try { - ByteBuffer outAppBuf = in.nioBuffer(); - - while (in.readable()) { - - int read; - int remaining = outAppBuf.remaining(); - SSLEngineResult result = null; - - synchronized (handshakeLock) { - result = engine.wrap(outAppBuf, outNetBuf); - } - read = remaining - outAppBuf.remaining(); - in.readerIndex(in.readerIndex() + read); - - - - if (result.bytesProduced() > 0) { - outNetBuf.flip(); - out.writeBytes(outNetBuf); - outNetBuf.clear(); - - - } else if (result.getStatus() == Status.CLOSED) { + loop: for (;;) { + SSLEngineResult result = wrap(engine, in, out); + bytesProduced += result.bytesProduced(); + if (result.getStatus() == Status.CLOSED) { // SSLEngine has been closed already. // Any further write attempts should be denied. - success = false; + if (in.readable()) { + in.clear(); + SSLException e = new SSLException("SSLEngine already closed"); + future.setFailure(e); + ctx.fireExceptionCaught(e); + } break; } else { - final HandshakeStatus handshakeStatus = result.getHandshakeStatus(); - handleRenegotiation(handshakeStatus); - switch (handshakeStatus) { + switch (result.getHandshakeStatus()) { case NEED_WRAP: - if (outAppBuf.hasRemaining()) { - break; - } else { - break; - } + ctx.flush(); + continue; case NEED_UNWRAP: - needsUnwrap = true; + if (ctx.inboundByteBuffer().readable()) { + unwrapLater = true; + } + break; case NEED_TASK: runDelegatedTasks(); - break; + continue; case FINISHED: + setHandshakeSuccess(); + continue; case NOT_HANDSHAKING: - if (handshakeStatus == HandshakeStatus.FINISHED) { - setHandshakeSuccess(ctx.channel()); - } - if (result.getStatus() == Status.CLOSED) { - success = false; - } break; default: - throw new IllegalStateException("Unknown handshake status: " + handshakeStatus); + throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus()); + } + + if (result.bytesConsumed() == 0 && result.bytesProduced() == 0) { + break loop; } } + } + if (unwrapLater) { + inboundBufferUpdated(ctx); } } catch (SSLException e) { - success = false; - setHandshakeFailure(ctx.channel(), e); + setHandshakeFailure(e); throw e; } finally { - bufferPool.releaseBuffer(outNetBuf); - - if (!success) { - // mark all bytes as read - in.readerIndex(in.readerIndex() + in.readableBytes()); - - throw new IllegalStateException("SSLEngine already closed"); - - + if (bytesProduced > 0) { + in.discardReadBytes(); + ctx.flush(future); } } + } - if (needsUnwrap) { - unwrap(ctx, ctx.channel(), ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER); + private static SSLEngineResult wrap(SSLEngine engine, ChannelBuffer in, ChannelBuffer out) throws SSLException { + ByteBuffer in0 = in.nioBuffer(); + for (;;) { + ByteBuffer out0 = out.nioBuffer(out.writerIndex(), out.writableBytes()); + SSLEngineResult result = engine.wrap(in0, out0); + in.skipBytes(result.bytesConsumed()); + out.writerIndex(out.writerIndex() + result.bytesProduced()); + if (result.getStatus() == Status.BUFFER_OVERFLOW) { + out.ensureWritableBytes(engine.getSession().getPacketBufferSize()); + } else { + return result; + } } } @@ -545,462 +417,177 @@ public class SslHandler extends StreamToStreamCodec { public void channelInactive(ChannelHandlerContext ctx) throws Exception { // Make sure the handshake future is notified when a connection has // been closed during handshake. - synchronized (handshakeLock) { - if (handshaking) { - handshakeFuture.setFailure(new ClosedChannelException()); - } - } + setHandshakeFailure(null); try { - super.channelInactive(ctx); + inboundBufferUpdated(ctx); } finally { - unwrap(ctx, ctx.channel(), ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER); engine.closeOutbound(); - if (!sentCloseNotify && handshaken) { - try { - engine.closeInbound(); - } catch (SSLException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to clean up SSLEngine.", ex); - } + try { + engine.closeInbound(); + } catch (SSLException ex) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to clean up SSLEngine.", ex); } } + ctx.fireChannelInactive(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (cause instanceof IOException) { - if (cause instanceof ClosedChannelException) { - synchronized (ignoreClosedChannelExceptionLock) { - if (ignoreClosedChannelException > 0) { - ignoreClosedChannelException --; - if (logger.isDebugEnabled()) { - logger.debug( - "Swallowing an exception raised while " + - "writing non-app data", cause); - } - - return; - } + if (cause instanceof IOException && engine.isOutboundDone()) { + String message = String.valueOf(cause.getMessage()).toLowerCase(); + if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) { + // It is safe to ignore the 'connection reset by peer' or + // 'broken pipe' error after sending closure_notify. + if (logger.isDebugEnabled()) { + logger.debug( + "Swallowing a 'connection reset by peer / " + + "broken pipe' error occurred while writing " + + "'closure_notify'", cause); } - } else if (engine.isOutboundDone()) { - String message = String.valueOf(cause.getMessage()).toLowerCase(); - if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) { - // It is safe to ignore the 'connection reset by peer' or - // 'broken pipe' error after sending closure_notify. - if (logger.isDebugEnabled()) { - logger.debug( - "Swallowing a 'connection reset by peer / " + - "broken pipe' error occurred while writing " + - "'closure_notify'", cause); - } - // Close the connection explicitly just in case the transport - // did not close the connection automatically. - ctx.close(ctx.channel().newFuture()); - return; + // Close the connection explicitly just in case the transport + // did not close the connection automatically. + if (ctx.channel().isActive()) { + ctx.close(); } + return; } } super.exceptionCaught(ctx, cause); } - - @Override - public void decode(ChannelHandlerContext ctx, ChannelBuffer in, ChannelBuffer out) throws Exception { + public void inboundBufferUpdated(final ChannelHandlerContext ctx) throws Exception { + final ChannelBuffer in = ctx.inboundByteBuffer(); + final ChannelBuffer out = ctx.nextInboundByteBuffer(); + out.discardReadBytes(); - // check if the packet lenght was read before - if (packetLength == -1) { - if (in.readableBytes() < 5) { - return; - } - // SSLv3 or TLS - Check ContentType - boolean tls; - switch (in.getUnsignedByte(in.readerIndex())) { - case 20: // change_cipher_spec - case 21: // alert - case 22: // handshake - case 23: // application_data - tls = true; - break; - default: - // SSLv2 or bad data - tls = false; - } - - if (tls) { - // SSLv3 or TLS - Check ProtocolVersion - int majorVersion = in.getUnsignedByte(in.readerIndex() + 1); - if (majorVersion == 3) { - // SSLv3 or TLS - packetLength = (getShort(in, in.readerIndex() + 3) & 0xFFFF) + 5; - if (packetLength <= 5) { - // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data) - tls = false; - } - } else { - // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data) - tls = false; - } - } - - if (!tls) { - // SSLv2 or bad data - Check the version - boolean sslv2 = true; - int headerLength = (in.getUnsignedByte( - in.readerIndex()) & 0x80) != 0 ? 2 : 3; - int majorVersion = in.getUnsignedByte( - in.readerIndex() + headerLength + 1); - if (majorVersion == 2 || majorVersion == 3) { - // SSLv2 - if (headerLength == 2) { - packetLength = (getShort(in, in.readerIndex()) & 0x7FFF) + 2; - } else { - packetLength = (getShort(in, in.readerIndex()) & 0x3FFF) + 3; - } - if (packetLength <= headerLength) { - sslv2 = false; - } - } else { - sslv2 = false; - } - - if (!sslv2) { - // Bad data - discard the buffer and raise an exception. - SSLException e = new SSLException( - "not an SSL/TLS record: " + ChannelBuffers.hexDump(in)); - in.skipBytes(in.readableBytes()); - throw e; - } - } - - assert packetLength > 0; - } - - - if (in.readableBytes() < packetLength) { - // not enough bytes left to read the packet - // so return here for now - return; - } - - // We advance the buffer's readerIndex before calling unwrap() because - // unwrap() can trigger FrameDecoder call decode(), this method, recursively. - // The recursive call results in decoding the same packet twice if - // the readerIndex is advanced *after* decode(). - // - // Here's an example: - // 1) An SSL packet is received from the wire. - // 2) SslHandler.decode() deciphers the packet and calls the user code. - // 3) The user closes the channel in the same thread. - // 4) The same thread triggers a channelDisconnected() event. - // 5) FrameDecoder.cleanup() is called, and it calls SslHandler.decode(). - // 6) SslHandler.decode() will feed the same packet with what was - // deciphered at the step 2 again if the readerIndex was not advanced - // before calling the user code. - final int packetOffset = in.readerIndex(); - in.skipBytes(packetLength); + boolean wrapLater = false; + int bytesProduced = 0; try { - unwrap(ctx, ctx.channel(), in, packetOffset, packetLength, out); - } finally { - // reset packet length - packetLength = -1; - } - } - - /** - * Reads a big-endian short integer from the buffer. Please note that we do not use - * {@link ChannelBuffer#getShort(int)} because it might be a little-endian buffer. - */ - private static short getShort(ChannelBuffer buf, int offset) { - return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF); - } - - private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException { - ChannelFuture future = null; - ByteBuffer outNetBuf = bufferPool.acquireBuffer(); - - SSLEngineResult result; - try { - for (;;) { - synchronized (handshakeLock) { - result = engine.wrap(EMPTY_BUFFER, outNetBuf); - } - - if (result.bytesProduced() > 0) { - outNetBuf.flip(); - ChannelBuffer msg = ChannelBuffers.buffer(outNetBuf.remaining()); - // Transfer the bytes to the new ChannelBuffer using some safe method that will also - // work with "non" heap buffers - // - // See https://github.com/netty/netty/issues/329 - msg.writeBytes(outNetBuf); - outNetBuf.clear(); - - future = channel.newFuture(); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) - throws Exception { - if (future.cause() instanceof ClosedChannelException) { - synchronized (ignoreClosedChannelExceptionLock) { - ignoreClosedChannelException ++; - } - } - } - }); - - ctx.write(msg, future); - } - - final HandshakeStatus handshakeStatus = result.getHandshakeStatus(); - handleRenegotiation(handshakeStatus); - switch (handshakeStatus) { - case FINISHED: - setHandshakeSuccess(channel); - runDelegatedTasks(); - break; - case NEED_TASK: - runDelegatedTasks(); - break; - case NEED_UNWRAP: - if (!Thread.holdsLock(handshakeLock)) { - // unwrap shouldn't be called when this method was - // called by unwrap - unwrap will keep running after - // this method returns. - unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER); - } - break; - case NOT_HANDSHAKING: - case NEED_WRAP: - break; - default: - throw new IllegalStateException( - "Unexpected handshake status: " + handshakeStatus); - } - - if (result.bytesProduced() == 0) { - break; - } - } - } catch (SSLException e) { - setHandshakeFailure(channel, e); - throw e; - } finally { - bufferPool.releaseBuffer(outNetBuf); - } - - if (future == null) { - future = channel.newSucceededFuture(); - } - - return future; - } - - private void unwrap( - ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int offset, int length, ChannelBuffer out) throws SSLException { - ByteBuffer inNetBuf = buffer.nioBuffer(offset, length); - ByteBuffer outAppBuf = bufferPool.acquireBuffer(); - - try { - boolean needsWrap = false; loop: for (;;) { - SSLEngineResult result; - boolean needsHandshake = false; - synchronized (handshakeLock) { - if (!handshaken && !handshaking && - !engine.getUseClientMode() && - !engine.isInboundDone() && !engine.isOutboundDone()) { - needsHandshake = true; + SSLEngineResult result = unwrap(engine, in, out); + bytesProduced += result.bytesProduced(); - } - } - if (needsHandshake) { - handshake(); + switch (result.getStatus()) { + case CLOSED: + // notify about the CLOSED state of the SSLEngine. See #137 + sslCloseFuture.setClosed(); + break; + case BUFFER_UNDERFLOW: + break loop; } - synchronized (handshakeLock) { - result = engine.unwrap(inNetBuf, outAppBuf); - } - - // notify about the CLOSED state of the SSLEngine. See #137 - if (result.getStatus() == Status.CLOSED) { - sslEngineCloseFuture.setClosed(); - } - - final HandshakeStatus handshakeStatus = result.getHandshakeStatus(); - handleRenegotiation(handshakeStatus); - switch (handshakeStatus) { + switch (result.getHandshakeStatus()) { case NEED_UNWRAP: - if (inNetBuf.hasRemaining() && !engine.isInboundDone()) { - break; - } else { - break loop; - } + break; case NEED_WRAP: - wrapNonAppData(ctx, channel); + wrapLater = true; break; case NEED_TASK: runDelegatedTasks(); break; case FINISHED: - setHandshakeSuccess(channel); - needsWrap = true; - break loop; + setHandshakeSuccess(); + continue; case NOT_HANDSHAKING: - needsWrap = true; - break loop; + break; default: throw new IllegalStateException( - "Unknown handshake status: " + handshakeStatus); + "Unknown handshake status: " + result.getHandshakeStatus()); } + if (result.bytesConsumed() == 0 && result.bytesProduced() == 0) { + break loop; + } } - if (needsWrap) { - wrapNonAppData(ctx, channel); - } - - outAppBuf.flip(); - - if (outAppBuf.hasRemaining()) { - // Transfer the bytes to the new ChannelBuffer using some safe method that will also - // work with "non" heap buffers - // - // See https://github.com/netty/netty/issues/329 - out.writeBytes(outAppBuf); + if (wrapLater) { + flush(ctx, ctx.newFuture()); } } catch (SSLException e) { - setHandshakeFailure(channel, e); + setHandshakeFailure(e); throw e; } finally { - bufferPool.releaseBuffer(outAppBuf); + if (bytesProduced > 0) { + in.discardReadBytes(); + ctx.fireInboundBufferUpdated(); + } } } - private void handleRenegotiation(HandshakeStatus handshakeStatus) { - if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING || - handshakeStatus == HandshakeStatus.FINISHED) { - // Not handshaking - return; - } - - if (!handshaken) { - // Not renegotiation - return; - } - - final boolean renegotiate; - synchronized (handshakeLock) { - if (handshaking) { - // Renegotiation in progress or failed already. - // i.e. Renegotiation check has been done already below. - return; + private static SSLEngineResult unwrap(SSLEngine engine, ChannelBuffer in, ChannelBuffer out) throws SSLException { + ByteBuffer in0 = in.nioBuffer(); + for (;;) { + ByteBuffer out0 = out.nioBuffer(out.writerIndex(), out.writableBytes()); + SSLEngineResult result = engine.unwrap(in0, out0); + in.skipBytes(result.bytesConsumed()); + out.writerIndex(out.writerIndex() + result.bytesProduced()); + switch (result.getStatus()) { + case BUFFER_OVERFLOW: + out.ensureWritableBytes(engine.getSession().getApplicationBufferSize()); + break; + default: + return result; } - - if (engine.isInboundDone() || engine.isOutboundDone()) { - // Not handshaking but closing. - return; - } - - if (isEnableRenegotiation()) { - // Continue renegotiation. - renegotiate = true; - } else { - // Do not renegotiate. - renegotiate = false; - // Prevent reentrance of this method. - handshaking = true; - } - } - - if (renegotiate) { - // Renegotiate. - handshake(); - } else { - // Raise an exception. - ctx.fireExceptionCaught(new SSLException( - "renegotiation attempted by peer; " + - "closing the connection")); - - // Close the connection to stop renegotiation. - ctx.close(ctx.channel().newSucceededFuture()); } } private void runDelegatedTasks() { for (;;) { - final Runnable task; - synchronized (handshakeLock) { - task = engine.getDelegatedTask(); - } - + Runnable task = engine.getDelegatedTask(); if (task == null) { break; } - delegatedTaskExecutor.execute(new Runnable() { - @Override - public void run() { - synchronized (handshakeLock) { - task.run(); - } - } - }); + delegatedTaskExecutor.execute(task); } } - private void setHandshakeSuccess(Channel channel) { - synchronized (handshakeLock) { - handshaking = false; - handshaken = true; - - if (handshakeFuture == null) { - handshakeFuture = channel.newFuture(); + private void setHandshakeSuccess() { + for (;;) { + ChannelFuture f = handshakeFutures.poll(); + if (f == null) { + break; } + f.setSuccess(); } - - handshakeFuture.setSuccess(); } - private void setHandshakeFailure(Channel channel, SSLException cause) { - synchronized (handshakeLock) { - if (!handshaking) { - return; - } - handshaking = false; - handshaken = false; - - if (handshakeFuture == null) { - handshakeFuture = channel.newFuture(); + private void setHandshakeFailure(Throwable cause) { + // Release all resources such as internal buffers that SSLEngine + // is managing. + engine.closeOutbound(); + try { + engine.closeInbound(); + } catch (SSLException e) { + if (logger.isDebugEnabled()) { + logger.debug( + "SSLEngine.closeInbound() raised an exception after " + + "a handshake failure.", e); } - // Release all resources such as internal buffers that SSLEngine - // is managing. - - engine.closeOutbound(); - - try { - engine.closeInbound(); - } catch (SSLException e) { - if (logger.isDebugEnabled()) { - logger.debug( - "SSLEngine.closeInbound() raised an exception after " + - "a handshake failure.", e); - } - - } } - handshakeFuture.setFailure(cause); + for (;;) { + ChannelFuture f = handshakeFutures.poll(); + if (f == null) { + break; + } + if (cause == null) { + cause = new ClosedChannelException(); + } + f.setFailure(cause); + } } private void closeOutboundAndChannel( - final ChannelHandlerContext context, final ChannelFuture future, boolean disconnect) throws Exception { - if (!context.channel().isActive()) { + final ChannelHandlerContext ctx, final ChannelFuture future, boolean disconnect) throws Exception { + if (!ctx.channel().isActive()) { if (disconnect) { ctx.disconnect(future); } else { @@ -1009,79 +596,56 @@ public class SslHandler extends StreamToStreamCodec { return; } - boolean success = false; - try { - try { - unwrap(context, context.channel(), ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER); - } catch (SSLException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to unwrap before sending a close_notify message", ex); + engine.closeOutbound(); + + ChannelFuture closeNotifyFuture = ctx.newFuture(); + flush(ctx, closeNotifyFuture); + + // Force-close the connection if close_notify is not fully sent in time. + final ScheduledFuture timeoutFuture = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + if (future.setSuccess()) { + logger.debug("close_notify write attempt timed out. Force-closing the connection."); + ctx.close(ctx.newFuture()); } } + }, 3, TimeUnit.SECONDS); // FIXME: Magic value - if (!engine.isInboundDone()) { - if (!sentCloseNotify) { - sentCloseNotify = true; - engine.closeOutbound(); - try { - ChannelFuture closeNotifyFuture = wrapNonAppData(context, context.channel()); - closeNotifyFuture.addListener( - new ClosingChannelFutureListener(context, future)); - success = true; - } catch (SSLException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to encode a close_notify message", ex); - } - } - } - } else { - success = true; + // Close the connection if close_notify is sent in time. + closeNotifyFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) + throws Exception { + timeoutFuture.cancel(false); + ctx.close(future); } - } finally { - if (!success) { - if (disconnect) { - ctx.disconnect(future); - } else { - ctx.close(future); - } - } - } - } - private static final class ClosingChannelFutureListener implements ChannelFutureListener { - - private final ChannelHandlerContext context; - private final ChannelFuture f; - - ClosingChannelFutureListener( - ChannelHandlerContext context, ChannelFuture f) { - this.context = context; - this.f = f; - } - - @Override - public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception { - if (!(closeNotifyFuture.cause() instanceof ClosedChannelException)) { - context.close(f); - } else { - f.setSuccess(); - } - } + }); } @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; - handshakeFuture = ctx.newFuture(); } - + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + if (ctx.channel().isActive()) { + // channelActvie() event has been fired already, which means this.channelActive() will + // not be invoked. We have to initialize here instead. + handshake(); + } else { + // channelActive() event has not been fired yet. this.channelOpen() will be invoked + // and initialization will occur there. + } + } /** * Calls {@link #handshake()} once the {@link Channel} is connected */ @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { - if (issueHandshake) { + if (!startTls && engine.getUseClientMode()) { // issue and handshake and add a listener to it which will fire an exception event if an exception was thrown while doing the handshake handshake().addListener(new ChannelFutureListener() { @@ -1099,7 +663,7 @@ public class SslHandler extends StreamToStreamCodec { } }); } else { - super.channelActive(ctx); + ctx.fireChannelActive(); } } diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java index 56f26f9edd..2ee6392886 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java @@ -92,15 +92,7 @@ public class SocketSslEchoTest extends AbstractSocketTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); ChannelFuture hf = cc.pipeline().get(SslHandler.class).handshake(); - hf.awaitUninterruptibly(); - if (!hf.isSuccess()) { - logger.error("Handshake failed", hf.cause()); - sh.channel.close().awaitUninterruptibly(); - ch.channel.close().awaitUninterruptibly(); - sc.close().awaitUninterruptibly(); - } - - assertTrue(hf.isSuccess()); + hf.sync(); for (int i = 0; i < data.length;) { int length = Math.min(random.nextInt(1024 * 64), data.length - i); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index 60d7062fc7..702fd51d19 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -25,6 +25,8 @@ import java.net.SocketAddress; abstract class AbstractOioChannel extends AbstractChannel { + static final int SO_TIMEOUT = 1000; + protected AbstractOioChannel(Channel parent, Integer id) { super(parent, id); } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java index 495a5631b4..9de4268024 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java @@ -58,7 +58,21 @@ class OioChildEventLoop extends SingleThreadEventLoop { // Waken up by interruptThread() } } else { - runAllTasks(); + long startTime = System.nanoTime(); + for (;;) { + final Runnable task = pollTask(); + if (task == null) { + break; + } + + task.run(); + + // Ensure running tasks doesn't take too much time. + if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) { + break; + } + } + ch.unsafe().read(); // Handle deregistration diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 6710b22674..0753d83906 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -69,7 +69,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel boolean success = false; try { - socket.setSoTimeout(1000); + socket.setSoTimeout(SO_TIMEOUT); socket.setBroadcast(false); success = true; } catch (SocketException e) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 73f07fea08..b2cf3a45e5 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -66,7 +66,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel boolean success = false; try { - socket.setSoTimeout(1000); + socket.setSoTimeout(SO_TIMEOUT); success = true; } catch (IOException e) { throw new ChannelException( diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index a28b8465f3..14965a89fc 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -62,7 +62,7 @@ public class OioSocketChannel extends AbstractOioStreamChannel is = socket.getInputStream(); os = socket.getOutputStream(); } - socket.setSoTimeout(1000); + socket.setSoTimeout(SO_TIMEOUT); success = true; } catch (Exception e) { throw new ChannelException("failed to initialize a socket", e);