Fixed a problem where SslHandler does not notify close futures when partial SSL frame is received.
This commit is contained in:
parent
9252f449bd
commit
e323b221d7
@ -24,13 +24,14 @@ package org.jboss.netty.handler.ssl;
|
|||||||
|
|
||||||
import static org.jboss.netty.channel.Channels.*;
|
import static org.jboss.netty.channel.Channels.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import javax.net.ssl.SSLEngine;
|
import javax.net.ssl.SSLEngine;
|
||||||
import javax.net.ssl.SSLEngineResult;
|
import javax.net.ssl.SSLEngineResult;
|
||||||
@ -49,6 +50,7 @@ import org.jboss.netty.channel.ChannelPipeline;
|
|||||||
import org.jboss.netty.channel.ChannelStateEvent;
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
import org.jboss.netty.channel.Channels;
|
import org.jboss.netty.channel.Channels;
|
||||||
import org.jboss.netty.channel.DownstreamMessageEvent;
|
import org.jboss.netty.channel.DownstreamMessageEvent;
|
||||||
|
import org.jboss.netty.channel.ExceptionEvent;
|
||||||
import org.jboss.netty.channel.MessageEvent;
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||||
import org.jboss.netty.logging.InternalLogger;
|
import org.jboss.netty.logging.InternalLogger;
|
||||||
@ -107,6 +109,9 @@ public class SslHandler extends FrameDecoder {
|
|||||||
|
|
||||||
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
|
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
|
||||||
|
|
||||||
|
private static final Pattern CONNECTION_RESET =
|
||||||
|
Pattern.compile("^.*Connection\\s*reset.*$", Pattern.CASE_INSENSITIVE);
|
||||||
|
|
||||||
private static SslBufferPool defaultBufferPool;
|
private static SslBufferPool defaultBufferPool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -133,7 +138,6 @@ public class SslHandler extends FrameDecoder {
|
|||||||
|
|
||||||
private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
|
private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
|
||||||
private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
|
private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
|
||||||
final Queue<ChannelFuture> closeFutures = new ConcurrentLinkedQueue<ChannelFuture>();
|
|
||||||
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
|
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
|
||||||
private final Queue<MessageEvent> pendingEncryptedWrites = new LinkedList<MessageEvent>();
|
private final Queue<MessageEvent> pendingEncryptedWrites = new LinkedList<MessageEvent>();
|
||||||
|
|
||||||
@ -358,17 +362,6 @@ public class SslHandler extends FrameDecoder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify all close futures which were not notified yet.
|
|
||||||
synchronized (closeFutures) {
|
|
||||||
for (;;) {
|
|
||||||
ChannelFuture future = closeFutures.poll();
|
|
||||||
if (future == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
future.setSuccess();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
super.channelDisconnected(ctx, e);
|
super.channelDisconnected(ctx, e);
|
||||||
} finally {
|
} finally {
|
||||||
@ -384,9 +377,33 @@ public class SslHandler extends FrameDecoder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
Throwable cause = e.getCause();
|
||||||
|
if (cause instanceof IOException && engine.isOutboundDone()) {
|
||||||
|
String message = String.valueOf(cause.getMessage()).toLowerCase();
|
||||||
|
if (CONNECTION_RESET.matcher(message).matches()) {
|
||||||
|
// It is safe to ignore the 'connection reset by peer' error
|
||||||
|
// after sending closure_notify.
|
||||||
|
logger.debug(
|
||||||
|
"Ignoring a 'connection reset by peer' error",
|
||||||
|
cause);
|
||||||
|
|
||||||
|
// Close the connection explicitly just in case the transport
|
||||||
|
// did not close the connection automatically.
|
||||||
|
Channels.close(ctx, succeededFuture(e.getChannel()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.sendUpstream(e);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Object decode(
|
protected Object decode(
|
||||||
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||||
if (buffer.readableBytes() < 2) {
|
if (buffer.readableBytes() < 2) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -414,17 +431,6 @@ public class SslHandler extends FrameDecoder {
|
|||||||
buffer.skipBytes(packetLength);
|
buffer.skipBytes(packetLength);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (frame == null && engine.isInboundDone()) {
|
|
||||||
synchronized (closeFutures) {
|
|
||||||
for (;;) {
|
|
||||||
ChannelFuture future = closeFutures.poll();
|
|
||||||
if (future == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Channels.close(ctx, future);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return frame;
|
return frame;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -772,15 +778,12 @@ public class SslHandler extends FrameDecoder {
|
|||||||
if (!engine.isInboundDone()) {
|
if (!engine.isInboundDone()) {
|
||||||
if (sentCloseNotify.compareAndSet(false, true)) {
|
if (sentCloseNotify.compareAndSet(false, true)) {
|
||||||
engine.closeOutbound();
|
engine.closeOutbound();
|
||||||
synchronized (closeFutures) {
|
|
||||||
ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
|
ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
|
||||||
closeNotifyFuture.addListener(new ChannelFutureListener() {
|
closeNotifyFuture.addListener(new ChannelFutureListener() {
|
||||||
public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
|
public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
|
||||||
boolean offered = closeFutures.offer(e.getFuture());
|
Channels.close(context, e.getFuture());
|
||||||
assert offered;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user