diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java b/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java
index 25aa94b7b4..4573fd6c75 100644
--- a/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java
+++ b/example/src/main/java/io/netty/example/securechat/SecureChatClientHandler.java
@@ -17,7 +17,6 @@ package io.netty.example.securechat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
-import io.netty.handler.ssl.SslHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -30,16 +29,6 @@ public class SecureChatClientHandler extends ChannelInboundMessageHandlerAdapter
private static final Logger logger = Logger.getLogger(
SecureChatClientHandler.class.getName());
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // Get the SslHandler from the pipeline
- // which were added in SecureChatPipelineFactory.
- SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
-
- // Begin handshake.
- sslHandler.handshake();
- }
-
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.err.println(msg);
diff --git a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java
index c62820ffdc..a4a2b36357 100644
--- a/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java
+++ b/example/src/main/java/io/netty/example/securechat/SecureChatServerHandler.java
@@ -16,8 +16,6 @@
package io.netty.example.securechat;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
@@ -40,13 +38,18 @@ public class SecureChatServerHandler extends ChannelInboundMessageHandlerAdapter
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // Get the SslHandler in the current pipeline.
- // We added it in SecureChatPipelineFactory.
- final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+ // Once session is secured, send a greeting.
+ ctx.write(
+ "Welcome to " + InetAddress.getLocalHost().getHostName() +
+ " secure chat service!\n");
+ ctx.write(
+ "Your session is protected by " +
+ ctx.pipeline().get(SslHandler.class).getEngine().getSession().getCipherSuite() +
+ " cipher suite.\n");
- // Get notified when SSL handshake is done.
- ChannelFuture handshakeFuture = sslHandler.handshake();
- handshakeFuture.addListener(new Greeter(sslHandler));
+ // Register the channel to the global channel list
+ // so the channel received the messages from others.
+ channels.add(ctx.channel());
}
@Override
@@ -74,33 +77,4 @@ public class SecureChatServerHandler extends ChannelInboundMessageHandlerAdapter
"Unexpected exception from downstream.", cause);
ctx.close();
}
-
- private static final class Greeter implements ChannelFutureListener {
-
- private final SslHandler sslHandler;
-
- Greeter(SslHandler sslHandler) {
- this.sslHandler = sslHandler;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- // Once session is secured, send a greeting.
- future.channel().write(
- "Welcome to " + InetAddress.getLocalHost().getHostName() +
- " secure chat service!\n");
- future.channel().write(
- "Your session is protected by " +
- sslHandler.getEngine().getSession().getCipherSuite() +
- " cipher suite.\n");
-
- // Register the channel to the global channel list
- // so the channel received the messages from others.
- channels.add(future.channel());
- } else {
- future.channel().close();
- }
- }
- }
}
diff --git a/handler/pom.xml b/handler/pom.xml
index 5f39fea800..3aa45af965 100644
--- a/handler/pom.xml
+++ b/handler/pom.xml
@@ -39,11 +39,6 @@
netty-transport
${project.version}
-
- ${project.groupId}
- netty-codec
- ${project.version}
-
diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java
index 1b654782e7..4c4e95ff45 100644
--- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java
+++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java
@@ -16,26 +16,32 @@
package io.netty.handler.ssl;
import io.netty.buffer.ChannelBuffer;
-import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelBufferHolder;
+import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultChannelFuture;
-import io.netty.handler.codec.StreamToStreamCodec;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Queue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
@@ -134,55 +140,26 @@ import javax.net.ssl.SSLException;
* @apiviz.landmark
* @apiviz.uses io.netty.handler.ssl.SslBufferPool
*/
-public class SslHandler extends StreamToStreamCodec {
+public class SslHandler
+ extends ChannelHandlerAdapter
+ implements ChannelInboundHandler, ChannelOutboundHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SslHandler.class);
- private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
"^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
Pattern.CASE_INSENSITIVE);
- private static SslBufferPool defaultBufferPool;
-
- /**
- * Returns the default {@link SslBufferPool} used when no pool is
- * specified in the constructor.
- */
- public static synchronized SslBufferPool getDefaultBufferPool() {
- if (defaultBufferPool == null) {
- defaultBufferPool = new DefaultSslBufferPool();
- }
- return defaultBufferPool;
- }
-
- private ChannelHandlerContext ctx;
+ private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
- private final SslBufferPool bufferPool;
private final Executor delegatedTaskExecutor;
private final boolean startTls;
private boolean sentFirstMessage;
- private volatile boolean enableRenegotiation = true;
-
- final Object handshakeLock = new Object();
-
- private boolean handshaking;
- private volatile boolean handshaken;
- private ChannelFuture handshakeFuture;
-
- private boolean sentCloseNotify;
-
- int ignoreClosedChannelException;
- final Object ignoreClosedChannelExceptionLock = new Object();
- private volatile boolean issueHandshake;
-
- private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();
-
- private int packetLength = -1;
+ private final Queue handshakeFutures = new ArrayDeque();
+ private final SSLEngineInboundCloseFuture sslCloseFuture = new SSLEngineInboundCloseFuture();
/**
* Creates a new instance.
@@ -190,18 +167,7 @@ public class SslHandler extends StreamToStreamCodec {
* @param engine the {@link SSLEngine} this handler will use
*/
public SslHandler(SSLEngine engine) {
- this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE);
- }
-
- /**
- * Creates a new instance.
- *
- * @param engine the {@link SSLEngine} this handler will use
- * @param bufferPool the {@link SslBufferPool} where this handler will
- * acquire the buffers required by the {@link SSLEngine}
- */
- public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
- this(engine, bufferPool, ImmediateExecutor.INSTANCE);
+ this(engine, ImmediateExecutor.INSTANCE);
}
/**
@@ -212,20 +178,7 @@ public class SslHandler extends StreamToStreamCodec {
* encrypted by the {@link SSLEngine}
*/
public SslHandler(SSLEngine engine, boolean startTls) {
- this(engine, getDefaultBufferPool(), startTls);
- }
-
- /**
- * Creates a new instance.
- *
- * @param engine the {@link SSLEngine} this handler will use
- * @param bufferPool the {@link SslBufferPool} where this handler will
- * acquire the buffers required by the {@link SSLEngine}
- * @param startTls {@code true} if the first write request shouldn't be
- * encrypted by the {@link SSLEngine}
- */
- public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
- this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
+ this(engine, startTls, ImmediateExecutor.INSTANCE);
}
/**
@@ -238,23 +191,7 @@ public class SslHandler extends StreamToStreamCodec {
* that {@link SSLEngine#getDelegatedTask()} will return
*/
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
- this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
- }
-
- /**
- * Creates a new instance.
- *
- * @param engine
- * the {@link SSLEngine} this handler will use
- * @param bufferPool
- * the {@link SslBufferPool} where this handler will acquire
- * the buffers required by the {@link SSLEngine}
- * @param delegatedTaskExecutor
- * the {@link Executor} which will execute the delegated task
- * that {@link SSLEngine#getDelegatedTask()} will return
- */
- public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
- this(engine, bufferPool, false, delegatedTaskExecutor);
+ this(engine, false, delegatedTaskExecutor);
}
/**
@@ -270,36 +207,13 @@ public class SslHandler extends StreamToStreamCodec {
* that {@link SSLEngine#getDelegatedTask()} will return
*/
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
- this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
- }
-
- /**
- * Creates a new instance.
- *
- * @param engine
- * the {@link SSLEngine} this handler will use
- * @param bufferPool
- * the {@link SslBufferPool} where this handler will acquire
- * the buffers required by the {@link SSLEngine}
- * @param startTls
- * {@code true} if the first write request shouldn't be encrypted
- * by the {@link SSLEngine}
- * @param delegatedTaskExecutor
- * the {@link Executor} which will execute the delegated task
- * that {@link SSLEngine#getDelegatedTask()} will return
- */
- public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
if (engine == null) {
throw new NullPointerException("engine");
}
- if (bufferPool == null) {
- throw new NullPointerException("bufferPool");
- }
if (delegatedTaskExecutor == null) {
throw new NullPointerException("delegatedTaskExecutor");
}
this.engine = engine;
- this.bufferPool = bufferPool;
this.delegatedTaskExecutor = delegatedTaskExecutor;
this.startTls = startTls;
}
@@ -311,69 +225,47 @@ public class SslHandler extends StreamToStreamCodec {
return engine;
}
+ public ChannelFuture handshake() {
+ return handshake(ctx.newFuture());
+ }
+
/**
* Starts an SSL / TLS handshake for the specified channel.
*
* @return a {@link ChannelFuture} which is notified when the handshake
* succeeds or fails.
*/
- public ChannelFuture handshake() {
- if (handshaken && !isEnableRenegotiation()) {
- throw new IllegalStateException("renegotiation disabled");
- }
+ public ChannelFuture handshake(final ChannelFuture future) {
+ final ChannelHandlerContext ctx = this.ctx;
- Channel channel = ctx.channel();
- Exception exception = null;
-
- synchronized (handshakeLock) {
- if (handshaking) {
- return handshakeFuture;
- } else {
- handshaking = true;
- if (ctx.executor().inEventLoop()) {
- try {
- engine.beginHandshake();
- runDelegatedTasks();
- wrapNonAppData(ctx, channel);
-
- } catch (Exception e) {
- exception = e;
- }
-
- } else {
- ctx.executor().execute(new Runnable() {
-
- @Override
- public void run() {
- Throwable exception = null;
- synchronized (handshakeLock) {
- try {
-
- engine.beginHandshake();
- runDelegatedTasks();
- wrapNonAppData(ctx, ctx.channel());
-
- } catch (Exception e) {
- exception = e;
- }
- }
- if (exception != null) { // Failed to initiate handshake.
- handshakeFuture.setFailure(exception);
- ctx.fireExceptionCaught(exception);
- }
- }
- });
+ ctx.executor().schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (future.isDone()) {
+ return;
}
+ SSLException e = new SSLException("handshake timed out");
+ future.setFailure(e);
+ ctx.fireExceptionCaught(e);
+ ctx.close();
}
- }
+ }, 10, TimeUnit.SECONDS); // FIXME: Magic value
+ ctx.executor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ engine.beginHandshake();
+ handshakeFutures.add(future);
+ flush(ctx, ctx.newFuture());
+ } catch (Exception e) {
+ future.setFailure(e);
+ ctx.fireExceptionCaught(e);
+ }
+ }
+ });
- if (exception != null) { // Failed to initiate handshake.
- handshakeFuture.setFailure(exception);
- ctx.fireExceptionCaught(exception);
- }
-
- return handshakeFuture;
+ return future;
}
/**
@@ -381,45 +273,20 @@ public class SslHandler extends StreamToStreamCodec {
* destroys the underlying {@link SSLEngine}.
*/
public ChannelFuture close() {
- ChannelHandlerContext ctx = this.ctx;
- Channel channel = ctx.channel();
- try {
- engine.closeOutbound();
- return wrapNonAppData(ctx, channel);
- } catch (SSLException e) {
- ctx.fireExceptionCaught(e);
- return channel.newFailedFuture(e);
- }
+ return close(ctx.newFuture());
}
- /**
- * Returns {@code true} if and only if TLS renegotiation is enabled.
- */
- public boolean isEnableRenegotiation() {
- return enableRenegotiation;
- }
+ public ChannelFuture close(final ChannelFuture future) {
+ final ChannelHandlerContext ctx = this.ctx;
+ ctx.executor().execute(new Runnable() {
+ @Override
+ public void run() {
+ engine.closeOutbound();
+ ctx.flush(future);
+ }
+ });
- /**
- * Enables or disables TLS renegotiation.
- */
- public void setEnableRenegotiation(boolean enableRenegotiation) {
- this.enableRenegotiation = enableRenegotiation;
- }
-
-
- /**
- * Enables or disables the automatic handshake once the {@link Channel} is connected. The value will only have affect if its set before the
- * {@link Channel} is connected.
- */
- public void setIssueHandshake(boolean issueHandshake) {
- this.issueHandshake = issueHandshake;
- }
-
- /**
- * Returns true
if the automatic handshake is enabled
- */
- public boolean isIssueHandshake() {
- return issueHandshake;
+ return future;
}
/**
@@ -430,8 +297,18 @@ public class SslHandler extends StreamToStreamCodec {
* For more informations see the apidocs of {@link SSLEngine}
*
*/
- public ChannelFuture getSSLEngineInboundCloseFuture() {
- return sslEngineCloseFuture;
+ public ChannelFuture sslCloseFuture() {
+ return sslCloseFuture;
+ }
+
+ @Override
+ public ChannelBufferHolder newOutboundBuffer(ChannelHandlerContext ctx) throws Exception {
+ return ChannelBufferHolders.byteBuffer();
+ }
+
+ @Override
+ public ChannelBufferHolder newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
+ return ChannelBufferHolders.byteBuffer();
}
@Override
@@ -446,98 +323,93 @@ public class SslHandler extends StreamToStreamCodec {
closeOutboundAndChannel(ctx, future, false);
}
- @Override
- public void encode(ChannelHandlerContext ctx, ChannelBuffer in, ChannelBuffer out) throws Exception {
+ @Override
+ public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
+ final ChannelBuffer in = ctx.outboundByteBuffer();
+ final ChannelBuffer out = ctx.nextOutboundByteBuffer();
+
+ out.discardReadBytes();
// Do not encrypt the first write request if this handler is
// created with startTLS flag turned on.
if (startTls && !sentFirstMessage) {
sentFirstMessage = true;
out.writeBytes(in);
+ ctx.flush(future);
return;
}
- ByteBuffer outNetBuf = bufferPool.acquireBuffer();
- boolean success = true;
- boolean needsUnwrap = false;
+ boolean unwrapLater = false;
+ int bytesProduced = 0;
try {
- ByteBuffer outAppBuf = in.nioBuffer();
-
- while (in.readable()) {
-
- int read;
- int remaining = outAppBuf.remaining();
- SSLEngineResult result = null;
-
- synchronized (handshakeLock) {
- result = engine.wrap(outAppBuf, outNetBuf);
- }
- read = remaining - outAppBuf.remaining();
- in.readerIndex(in.readerIndex() + read);
-
-
-
- if (result.bytesProduced() > 0) {
- outNetBuf.flip();
- out.writeBytes(outNetBuf);
- outNetBuf.clear();
-
-
- } else if (result.getStatus() == Status.CLOSED) {
+ loop: for (;;) {
+ SSLEngineResult result = wrap(engine, in, out);
+ bytesProduced += result.bytesProduced();
+ if (result.getStatus() == Status.CLOSED) {
// SSLEngine has been closed already.
// Any further write attempts should be denied.
- success = false;
+ if (in.readable()) {
+ in.clear();
+ SSLException e = new SSLException("SSLEngine already closed");
+ future.setFailure(e);
+ ctx.fireExceptionCaught(e);
+ }
break;
} else {
- final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
- handleRenegotiation(handshakeStatus);
- switch (handshakeStatus) {
+ switch (result.getHandshakeStatus()) {
case NEED_WRAP:
- if (outAppBuf.hasRemaining()) {
- break;
- } else {
- break;
- }
+ ctx.flush();
+ continue;
case NEED_UNWRAP:
- needsUnwrap = true;
+ if (ctx.inboundByteBuffer().readable()) {
+ unwrapLater = true;
+ }
+ break;
case NEED_TASK:
runDelegatedTasks();
- break;
+ continue;
case FINISHED:
+ setHandshakeSuccess();
+ continue;
case NOT_HANDSHAKING:
- if (handshakeStatus == HandshakeStatus.FINISHED) {
- setHandshakeSuccess(ctx.channel());
- }
- if (result.getStatus() == Status.CLOSED) {
- success = false;
- }
break;
default:
- throw new IllegalStateException("Unknown handshake status: " + handshakeStatus);
+ throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
+ }
+
+ if (result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
+ break loop;
}
}
+ }
+ if (unwrapLater) {
+ inboundBufferUpdated(ctx);
}
} catch (SSLException e) {
- success = false;
- setHandshakeFailure(ctx.channel(), e);
+ setHandshakeFailure(e);
throw e;
} finally {
- bufferPool.releaseBuffer(outNetBuf);
-
- if (!success) {
- // mark all bytes as read
- in.readerIndex(in.readerIndex() + in.readableBytes());
-
- throw new IllegalStateException("SSLEngine already closed");
-
-
+ if (bytesProduced > 0) {
+ in.discardReadBytes();
+ ctx.flush(future);
}
}
+ }
- if (needsUnwrap) {
- unwrap(ctx, ctx.channel(), ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER);
+ private static SSLEngineResult wrap(SSLEngine engine, ChannelBuffer in, ChannelBuffer out) throws SSLException {
+ ByteBuffer in0 = in.nioBuffer();
+ for (;;) {
+ ByteBuffer out0 = out.nioBuffer(out.writerIndex(), out.writableBytes());
+ SSLEngineResult result = engine.wrap(in0, out0);
+ in.skipBytes(result.bytesConsumed());
+ out.writerIndex(out.writerIndex() + result.bytesProduced());
+ if (result.getStatus() == Status.BUFFER_OVERFLOW) {
+ out.ensureWritableBytes(engine.getSession().getPacketBufferSize());
+ } else {
+ return result;
+ }
}
}
@@ -545,462 +417,177 @@ public class SslHandler extends StreamToStreamCodec {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Make sure the handshake future is notified when a connection has
// been closed during handshake.
- synchronized (handshakeLock) {
- if (handshaking) {
- handshakeFuture.setFailure(new ClosedChannelException());
- }
- }
+ setHandshakeFailure(null);
try {
- super.channelInactive(ctx);
+ inboundBufferUpdated(ctx);
} finally {
- unwrap(ctx, ctx.channel(), ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER);
engine.closeOutbound();
- if (!sentCloseNotify && handshaken) {
- try {
- engine.closeInbound();
- } catch (SSLException ex) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to clean up SSLEngine.", ex);
- }
+ try {
+ engine.closeInbound();
+ } catch (SSLException ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to clean up SSLEngine.", ex);
}
}
+ ctx.fireChannelInactive();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (cause instanceof IOException) {
- if (cause instanceof ClosedChannelException) {
- synchronized (ignoreClosedChannelExceptionLock) {
- if (ignoreClosedChannelException > 0) {
- ignoreClosedChannelException --;
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Swallowing an exception raised while " +
- "writing non-app data", cause);
- }
-
- return;
- }
+ if (cause instanceof IOException && engine.isOutboundDone()) {
+ String message = String.valueOf(cause.getMessage()).toLowerCase();
+ if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+ // It is safe to ignore the 'connection reset by peer' or
+ // 'broken pipe' error after sending closure_notify.
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Swallowing a 'connection reset by peer / " +
+ "broken pipe' error occurred while writing " +
+ "'closure_notify'", cause);
}
- } else if (engine.isOutboundDone()) {
- String message = String.valueOf(cause.getMessage()).toLowerCase();
- if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
- // It is safe to ignore the 'connection reset by peer' or
- // 'broken pipe' error after sending closure_notify.
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Swallowing a 'connection reset by peer / " +
- "broken pipe' error occurred while writing " +
- "'closure_notify'", cause);
- }
- // Close the connection explicitly just in case the transport
- // did not close the connection automatically.
- ctx.close(ctx.channel().newFuture());
- return;
+ // Close the connection explicitly just in case the transport
+ // did not close the connection automatically.
+ if (ctx.channel().isActive()) {
+ ctx.close();
}
+ return;
}
}
super.exceptionCaught(ctx, cause);
}
-
-
@Override
- public void decode(ChannelHandlerContext ctx, ChannelBuffer in, ChannelBuffer out) throws Exception {
+ public void inboundBufferUpdated(final ChannelHandlerContext ctx) throws Exception {
+ final ChannelBuffer in = ctx.inboundByteBuffer();
+ final ChannelBuffer out = ctx.nextInboundByteBuffer();
+ out.discardReadBytes();
- // check if the packet lenght was read before
- if (packetLength == -1) {
- if (in.readableBytes() < 5) {
- return;
- }
- // SSLv3 or TLS - Check ContentType
- boolean tls;
- switch (in.getUnsignedByte(in.readerIndex())) {
- case 20: // change_cipher_spec
- case 21: // alert
- case 22: // handshake
- case 23: // application_data
- tls = true;
- break;
- default:
- // SSLv2 or bad data
- tls = false;
- }
-
- if (tls) {
- // SSLv3 or TLS - Check ProtocolVersion
- int majorVersion = in.getUnsignedByte(in.readerIndex() + 1);
- if (majorVersion == 3) {
- // SSLv3 or TLS
- packetLength = (getShort(in, in.readerIndex() + 3) & 0xFFFF) + 5;
- if (packetLength <= 5) {
- // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
- tls = false;
- }
- } else {
- // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
- tls = false;
- }
- }
-
- if (!tls) {
- // SSLv2 or bad data - Check the version
- boolean sslv2 = true;
- int headerLength = (in.getUnsignedByte(
- in.readerIndex()) & 0x80) != 0 ? 2 : 3;
- int majorVersion = in.getUnsignedByte(
- in.readerIndex() + headerLength + 1);
- if (majorVersion == 2 || majorVersion == 3) {
- // SSLv2
- if (headerLength == 2) {
- packetLength = (getShort(in, in.readerIndex()) & 0x7FFF) + 2;
- } else {
- packetLength = (getShort(in, in.readerIndex()) & 0x3FFF) + 3;
- }
- if (packetLength <= headerLength) {
- sslv2 = false;
- }
- } else {
- sslv2 = false;
- }
-
- if (!sslv2) {
- // Bad data - discard the buffer and raise an exception.
- SSLException e = new SSLException(
- "not an SSL/TLS record: " + ChannelBuffers.hexDump(in));
- in.skipBytes(in.readableBytes());
- throw e;
- }
- }
-
- assert packetLength > 0;
- }
-
-
- if (in.readableBytes() < packetLength) {
- // not enough bytes left to read the packet
- // so return here for now
- return;
- }
-
- // We advance the buffer's readerIndex before calling unwrap() because
- // unwrap() can trigger FrameDecoder call decode(), this method, recursively.
- // The recursive call results in decoding the same packet twice if
- // the readerIndex is advanced *after* decode().
- //
- // Here's an example:
- // 1) An SSL packet is received from the wire.
- // 2) SslHandler.decode() deciphers the packet and calls the user code.
- // 3) The user closes the channel in the same thread.
- // 4) The same thread triggers a channelDisconnected() event.
- // 5) FrameDecoder.cleanup() is called, and it calls SslHandler.decode().
- // 6) SslHandler.decode() will feed the same packet with what was
- // deciphered at the step 2 again if the readerIndex was not advanced
- // before calling the user code.
- final int packetOffset = in.readerIndex();
- in.skipBytes(packetLength);
+ boolean wrapLater = false;
+ int bytesProduced = 0;
try {
- unwrap(ctx, ctx.channel(), in, packetOffset, packetLength, out);
- } finally {
- // reset packet length
- packetLength = -1;
- }
- }
-
- /**
- * Reads a big-endian short integer from the buffer. Please note that we do not use
- * {@link ChannelBuffer#getShort(int)} because it might be a little-endian buffer.
- */
- private static short getShort(ChannelBuffer buf, int offset) {
- return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
- }
-
- private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
- ChannelFuture future = null;
- ByteBuffer outNetBuf = bufferPool.acquireBuffer();
-
- SSLEngineResult result;
- try {
- for (;;) {
- synchronized (handshakeLock) {
- result = engine.wrap(EMPTY_BUFFER, outNetBuf);
- }
-
- if (result.bytesProduced() > 0) {
- outNetBuf.flip();
- ChannelBuffer msg = ChannelBuffers.buffer(outNetBuf.remaining());
- // Transfer the bytes to the new ChannelBuffer using some safe method that will also
- // work with "non" heap buffers
- //
- // See https://github.com/netty/netty/issues/329
- msg.writeBytes(outNetBuf);
- outNetBuf.clear();
-
- future = channel.newFuture();
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future)
- throws Exception {
- if (future.cause() instanceof ClosedChannelException) {
- synchronized (ignoreClosedChannelExceptionLock) {
- ignoreClosedChannelException ++;
- }
- }
- }
- });
-
- ctx.write(msg, future);
- }
-
- final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
- handleRenegotiation(handshakeStatus);
- switch (handshakeStatus) {
- case FINISHED:
- setHandshakeSuccess(channel);
- runDelegatedTasks();
- break;
- case NEED_TASK:
- runDelegatedTasks();
- break;
- case NEED_UNWRAP:
- if (!Thread.holdsLock(handshakeLock)) {
- // unwrap shouldn't be called when this method was
- // called by unwrap - unwrap will keep running after
- // this method returns.
- unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER);
- }
- break;
- case NOT_HANDSHAKING:
- case NEED_WRAP:
- break;
- default:
- throw new IllegalStateException(
- "Unexpected handshake status: " + handshakeStatus);
- }
-
- if (result.bytesProduced() == 0) {
- break;
- }
- }
- } catch (SSLException e) {
- setHandshakeFailure(channel, e);
- throw e;
- } finally {
- bufferPool.releaseBuffer(outNetBuf);
- }
-
- if (future == null) {
- future = channel.newSucceededFuture();
- }
-
- return future;
- }
-
- private void unwrap(
- ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int offset, int length, ChannelBuffer out) throws SSLException {
- ByteBuffer inNetBuf = buffer.nioBuffer(offset, length);
- ByteBuffer outAppBuf = bufferPool.acquireBuffer();
-
- try {
- boolean needsWrap = false;
loop:
for (;;) {
- SSLEngineResult result;
- boolean needsHandshake = false;
- synchronized (handshakeLock) {
- if (!handshaken && !handshaking &&
- !engine.getUseClientMode() &&
- !engine.isInboundDone() && !engine.isOutboundDone()) {
- needsHandshake = true;
+ SSLEngineResult result = unwrap(engine, in, out);
+ bytesProduced += result.bytesProduced();
- }
- }
- if (needsHandshake) {
- handshake();
+ switch (result.getStatus()) {
+ case CLOSED:
+ // notify about the CLOSED state of the SSLEngine. See #137
+ sslCloseFuture.setClosed();
+ break;
+ case BUFFER_UNDERFLOW:
+ break loop;
}
- synchronized (handshakeLock) {
- result = engine.unwrap(inNetBuf, outAppBuf);
- }
-
- // notify about the CLOSED state of the SSLEngine. See #137
- if (result.getStatus() == Status.CLOSED) {
- sslEngineCloseFuture.setClosed();
- }
-
- final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
- handleRenegotiation(handshakeStatus);
- switch (handshakeStatus) {
+ switch (result.getHandshakeStatus()) {
case NEED_UNWRAP:
- if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
- break;
- } else {
- break loop;
- }
+ break;
case NEED_WRAP:
- wrapNonAppData(ctx, channel);
+ wrapLater = true;
break;
case NEED_TASK:
runDelegatedTasks();
break;
case FINISHED:
- setHandshakeSuccess(channel);
- needsWrap = true;
- break loop;
+ setHandshakeSuccess();
+ continue;
case NOT_HANDSHAKING:
- needsWrap = true;
- break loop;
+ break;
default:
throw new IllegalStateException(
- "Unknown handshake status: " + handshakeStatus);
+ "Unknown handshake status: " + result.getHandshakeStatus());
}
+ if (result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
+ break loop;
+ }
}
- if (needsWrap) {
- wrapNonAppData(ctx, channel);
- }
-
- outAppBuf.flip();
-
- if (outAppBuf.hasRemaining()) {
- // Transfer the bytes to the new ChannelBuffer using some safe method that will also
- // work with "non" heap buffers
- //
- // See https://github.com/netty/netty/issues/329
- out.writeBytes(outAppBuf);
+ if (wrapLater) {
+ flush(ctx, ctx.newFuture());
}
} catch (SSLException e) {
- setHandshakeFailure(channel, e);
+ setHandshakeFailure(e);
throw e;
} finally {
- bufferPool.releaseBuffer(outAppBuf);
+ if (bytesProduced > 0) {
+ in.discardReadBytes();
+ ctx.fireInboundBufferUpdated();
+ }
}
}
- private void handleRenegotiation(HandshakeStatus handshakeStatus) {
- if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
- handshakeStatus == HandshakeStatus.FINISHED) {
- // Not handshaking
- return;
- }
-
- if (!handshaken) {
- // Not renegotiation
- return;
- }
-
- final boolean renegotiate;
- synchronized (handshakeLock) {
- if (handshaking) {
- // Renegotiation in progress or failed already.
- // i.e. Renegotiation check has been done already below.
- return;
+ private static SSLEngineResult unwrap(SSLEngine engine, ChannelBuffer in, ChannelBuffer out) throws SSLException {
+ ByteBuffer in0 = in.nioBuffer();
+ for (;;) {
+ ByteBuffer out0 = out.nioBuffer(out.writerIndex(), out.writableBytes());
+ SSLEngineResult result = engine.unwrap(in0, out0);
+ in.skipBytes(result.bytesConsumed());
+ out.writerIndex(out.writerIndex() + result.bytesProduced());
+ switch (result.getStatus()) {
+ case BUFFER_OVERFLOW:
+ out.ensureWritableBytes(engine.getSession().getApplicationBufferSize());
+ break;
+ default:
+ return result;
}
-
- if (engine.isInboundDone() || engine.isOutboundDone()) {
- // Not handshaking but closing.
- return;
- }
-
- if (isEnableRenegotiation()) {
- // Continue renegotiation.
- renegotiate = true;
- } else {
- // Do not renegotiate.
- renegotiate = false;
- // Prevent reentrance of this method.
- handshaking = true;
- }
- }
-
- if (renegotiate) {
- // Renegotiate.
- handshake();
- } else {
- // Raise an exception.
- ctx.fireExceptionCaught(new SSLException(
- "renegotiation attempted by peer; " +
- "closing the connection"));
-
- // Close the connection to stop renegotiation.
- ctx.close(ctx.channel().newSucceededFuture());
}
}
private void runDelegatedTasks() {
for (;;) {
- final Runnable task;
- synchronized (handshakeLock) {
- task = engine.getDelegatedTask();
- }
-
+ Runnable task = engine.getDelegatedTask();
if (task == null) {
break;
}
- delegatedTaskExecutor.execute(new Runnable() {
- @Override
- public void run() {
- synchronized (handshakeLock) {
- task.run();
- }
- }
- });
+ delegatedTaskExecutor.execute(task);
}
}
- private void setHandshakeSuccess(Channel channel) {
- synchronized (handshakeLock) {
- handshaking = false;
- handshaken = true;
-
- if (handshakeFuture == null) {
- handshakeFuture = channel.newFuture();
+ private void setHandshakeSuccess() {
+ for (;;) {
+ ChannelFuture f = handshakeFutures.poll();
+ if (f == null) {
+ break;
}
+ f.setSuccess();
}
-
- handshakeFuture.setSuccess();
}
- private void setHandshakeFailure(Channel channel, SSLException cause) {
- synchronized (handshakeLock) {
- if (!handshaking) {
- return;
- }
- handshaking = false;
- handshaken = false;
-
- if (handshakeFuture == null) {
- handshakeFuture = channel.newFuture();
+ private void setHandshakeFailure(Throwable cause) {
+ // Release all resources such as internal buffers that SSLEngine
+ // is managing.
+ engine.closeOutbound();
+ try {
+ engine.closeInbound();
+ } catch (SSLException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "SSLEngine.closeInbound() raised an exception after " +
+ "a handshake failure.", e);
}
- // Release all resources such as internal buffers that SSLEngine
- // is managing.
-
- engine.closeOutbound();
-
- try {
- engine.closeInbound();
- } catch (SSLException e) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "SSLEngine.closeInbound() raised an exception after " +
- "a handshake failure.", e);
- }
-
- }
}
- handshakeFuture.setFailure(cause);
+ for (;;) {
+ ChannelFuture f = handshakeFutures.poll();
+ if (f == null) {
+ break;
+ }
+ if (cause == null) {
+ cause = new ClosedChannelException();
+ }
+ f.setFailure(cause);
+ }
}
private void closeOutboundAndChannel(
- final ChannelHandlerContext context, final ChannelFuture future, boolean disconnect) throws Exception {
- if (!context.channel().isActive()) {
+ final ChannelHandlerContext ctx, final ChannelFuture future, boolean disconnect) throws Exception {
+ if (!ctx.channel().isActive()) {
if (disconnect) {
ctx.disconnect(future);
} else {
@@ -1009,79 +596,56 @@ public class SslHandler extends StreamToStreamCodec {
return;
}
- boolean success = false;
- try {
- try {
- unwrap(context, context.channel(), ChannelBuffers.EMPTY_BUFFER, 0, 0, ChannelBuffers.EMPTY_BUFFER);
- } catch (SSLException ex) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to unwrap before sending a close_notify message", ex);
+ engine.closeOutbound();
+
+ ChannelFuture closeNotifyFuture = ctx.newFuture();
+ flush(ctx, closeNotifyFuture);
+
+ // Force-close the connection if close_notify is not fully sent in time.
+ final ScheduledFuture> timeoutFuture = ctx.executor().schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (future.setSuccess()) {
+ logger.debug("close_notify write attempt timed out. Force-closing the connection.");
+ ctx.close(ctx.newFuture());
}
}
+ }, 3, TimeUnit.SECONDS); // FIXME: Magic value
- if (!engine.isInboundDone()) {
- if (!sentCloseNotify) {
- sentCloseNotify = true;
- engine.closeOutbound();
- try {
- ChannelFuture closeNotifyFuture = wrapNonAppData(context, context.channel());
- closeNotifyFuture.addListener(
- new ClosingChannelFutureListener(context, future));
- success = true;
- } catch (SSLException ex) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to encode a close_notify message", ex);
- }
- }
- }
- } else {
- success = true;
+ // Close the connection if close_notify is sent in time.
+ closeNotifyFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture f)
+ throws Exception {
+ timeoutFuture.cancel(false);
+ ctx.close(future);
}
- } finally {
- if (!success) {
- if (disconnect) {
- ctx.disconnect(future);
- } else {
- ctx.close(future);
- }
- }
- }
- }
- private static final class ClosingChannelFutureListener implements ChannelFutureListener {
-
- private final ChannelHandlerContext context;
- private final ChannelFuture f;
-
- ClosingChannelFutureListener(
- ChannelHandlerContext context, ChannelFuture f) {
- this.context = context;
- this.f = f;
- }
-
- @Override
- public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
- if (!(closeNotifyFuture.cause() instanceof ClosedChannelException)) {
- context.close(f);
- } else {
- f.setSuccess();
- }
- }
+ });
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
- handshakeFuture = ctx.newFuture();
}
-
+ @Override
+ public void afterAdd(ChannelHandlerContext ctx) throws Exception {
+ if (ctx.channel().isActive()) {
+ // channelActvie() event has been fired already, which means this.channelActive() will
+ // not be invoked. We have to initialize here instead.
+ handshake();
+ } else {
+ // channelActive() event has not been fired yet. this.channelOpen() will be invoked
+ // and initialization will occur there.
+ }
+ }
/**
* Calls {@link #handshake()} once the {@link Channel} is connected
*/
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
- if (issueHandshake) {
+ if (!startTls && engine.getUseClientMode()) {
// issue and handshake and add a listener to it which will fire an exception event if an exception was thrown while doing the handshake
handshake().addListener(new ChannelFutureListener() {
@@ -1099,7 +663,7 @@ public class SslHandler extends StreamToStreamCodec {
}
});
} else {
- super.channelActive(ctx);
+ ctx.fireChannelActive();
}
}
diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java
index 56f26f9edd..2ee6392886 100644
--- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java
+++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketSslEchoTest.java
@@ -92,15 +92,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
ChannelFuture hf = cc.pipeline().get(SslHandler.class).handshake();
- hf.awaitUninterruptibly();
- if (!hf.isSuccess()) {
- logger.error("Handshake failed", hf.cause());
- sh.channel.close().awaitUninterruptibly();
- ch.channel.close().awaitUninterruptibly();
- sc.close().awaitUninterruptibly();
- }
-
- assertTrue(hf.isSuccess());
+ hf.sync();
for (int i = 0; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java
index 60d7062fc7..702fd51d19 100644
--- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java
+++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java
@@ -25,6 +25,8 @@ import java.net.SocketAddress;
abstract class AbstractOioChannel extends AbstractChannel {
+ static final int SO_TIMEOUT = 1000;
+
protected AbstractOioChannel(Channel parent, Integer id) {
super(parent, id);
}
diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java
index 495a5631b4..9de4268024 100644
--- a/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java
+++ b/transport/src/main/java/io/netty/channel/socket/oio/OioChildEventLoop.java
@@ -58,7 +58,21 @@ class OioChildEventLoop extends SingleThreadEventLoop {
// Waken up by interruptThread()
}
} else {
- runAllTasks();
+ long startTime = System.nanoTime();
+ for (;;) {
+ final Runnable task = pollTask();
+ if (task == null) {
+ break;
+ }
+
+ task.run();
+
+ // Ensure running tasks doesn't take too much time.
+ if (System.nanoTime() - startTime > AbstractOioChannel.SO_TIMEOUT * 1000000L) {
+ break;
+ }
+ }
+
ch.unsafe().read();
// Handle deregistration
diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java
index 6710b22674..0753d83906 100644
--- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java
+++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java
@@ -69,7 +69,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
boolean success = false;
try {
- socket.setSoTimeout(1000);
+ socket.setSoTimeout(SO_TIMEOUT);
socket.setBroadcast(false);
success = true;
} catch (SocketException e) {
diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java
index 73f07fea08..b2cf3a45e5 100644
--- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java
+++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java
@@ -66,7 +66,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
boolean success = false;
try {
- socket.setSoTimeout(1000);
+ socket.setSoTimeout(SO_TIMEOUT);
success = true;
} catch (IOException e) {
throw new ChannelException(
diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java
index a28b8465f3..14965a89fc 100644
--- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java
+++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java
@@ -62,7 +62,7 @@ public class OioSocketChannel extends AbstractOioStreamChannel
is = socket.getInputStream();
os = socket.getOutputStream();
}
- socket.setSoTimeout(1000);
+ socket.setSoTimeout(SO_TIMEOUT);
success = true;
} catch (Exception e) {
throw new ChannelException("failed to initialize a socket", e);