Merge branch 'master' into pseudorandom-channel-IDs

This commit is contained in:
Cruz Julian Bishop 2012-08-23 09:59:02 +10:00
commit d6023ef129
2 changed files with 143 additions and 72 deletions

View File

@ -162,22 +162,7 @@ public class SslHandler
private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
private final Executor delegatedTaskExecutor;
private final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier() {
@Override
public synchronized void increaseWriteCounter(long delta) {
super.increaseWriteCounter(delta);
}
@Override
public synchronized void notifyFlushFutures() {
super.notifyFlushFutures();
}
@Override
public synchronized void notifyFlushFutures(Throwable cause) {
super.notifyFlushFutures(cause);
}
};
private final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier();
private final boolean startTls;
private boolean sentFirstMessage;
@ -285,6 +270,7 @@ public class SslHandler
} catch (Exception e) {
future.setFailure(e);
ctx.fireExceptionCaught(e);
ctx.close();
}
}
});
@ -363,14 +349,20 @@ public class SslHandler
return;
}
flushFutureNotifier.addFlushFuture(future, in.readableBytes());
if (ctx.executor() == ctx.channel().eventLoop()) {
flushFutureNotifier.addFlushFuture(future, in.readableBytes());
} else {
synchronized (flushFutureNotifier) {
flushFutureNotifier.addFlushFuture(future, in.readableBytes());
}
}
boolean unwrapLater = false;
int bytesProduced = 0;
int bytesConsumed = 0;
try {
for (;;) {
SSLEngineResult result = wrap(engine, in, out);
bytesProduced += result.bytesProduced();
bytesConsumed += result.bytesConsumed();
if (result.getStatus() == Status.CLOSED) {
// SSLEngine has been closed already.
// Any further write attempts should be denied.
@ -379,6 +371,8 @@ public class SslHandler
SSLException e = new SSLException("SSLEngine already closed");
future.setFailure(e);
ctx.fireExceptionCaught(e);
flush0(ctx, bytesConsumed, e);
bytesConsumed = 0;
}
break;
} else {
@ -417,11 +411,61 @@ public class SslHandler
throw e;
} finally {
in.unsafe().discardSomeReadBytes();
flushFutureNotifier.increaseWriteCounter(bytesProduced);
ctx.flush(ctx.newFuture().addListener(flushFutureNotifier));
flush0(ctx, bytesConsumed);
}
}
private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed) {
ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (ctx.executor() == ctx.channel().eventLoop()) {
notifyFlushFutures(bytesConsumed, future);
} else {
synchronized (flushFutureNotifier) {
notifyFlushFutures(bytesConsumed, future);
}
}
}
private void notifyFlushFutures(final int bytesConsumed, ChannelFuture future) {
if (future.isSuccess()) {
flushFutureNotifier.increaseWriteCounter(bytesConsumed);
flushFutureNotifier.notifyFlushFutures();
} else {
flushFutureNotifier.notifyFlushFutures(future.cause());
}
}
}));
}
private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed, final Throwable cause) {
ChannelFuture flushFuture = ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (ctx.executor() == ctx.channel().eventLoop()) {
notifyFlushFutures(ctx, bytesConsumed, cause, future);
} else {
synchronized (flushFutureNotifier) {
notifyFlushFutures(ctx, bytesConsumed, cause, future);
}
}
}
private void notifyFlushFutures(final ChannelHandlerContext ctx,
final int bytesConsumed, final Throwable cause, ChannelFuture future) {
flushFutureNotifier.increaseWriteCounter(bytesConsumed);
if (future.isSuccess()) {
flushFutureNotifier.notifyFlushFutures(cause);
} else {
flushFutureNotifier.notifyFlushFutures(cause, future.cause());
}
}
}));
safeClose(ctx, flushFuture, ctx.newFuture());
}
private static SSLEngineResult wrap(SSLEngine engine, ByteBuf in, ByteBuf out) throws SSLException {
ByteBuffer in0 = in.nioBuffer();
for (;;) {
@ -614,7 +658,9 @@ public class SslHandler
NotSslRecordException e = new NotSslRecordException(
"not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
in.skipBytes(in.readableBytes());
throw e;
ctx.fireExceptionCaught(e);
setHandshakeFailure(e);
return;
}
}
@ -738,16 +784,19 @@ public class SslHandler
}
}
if (cause == null) {
cause = new ClosedChannelException();
}
for (;;) {
ChannelFuture f = handshakeFutures.poll();
if (f == null) {
break;
}
if (cause == null) {
cause = new ClosedChannelException();
}
f.setFailure(cause);
}
flush0(ctx, 0, cause);
}
private void closeOutboundAndChannel(
@ -765,26 +814,7 @@ public class SslHandler
ChannelFuture closeNotifyFuture = ctx.newFuture();
flush(ctx, closeNotifyFuture);
// Force-close the connection if close_notify is not fully sent in time.
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(ctx.channel() + "close_notify write attempt timed out. Force-closing the connection.");
ctx.close(future);
}
}, 3, TimeUnit.SECONDS); // FIXME: Magic value
// Close the connection if close_notify is sent in time.
closeNotifyFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
if (timeoutFuture.cancel(false)) {
ctx.close(future);
}
}
});
safeClose(ctx, closeNotifyFuture, future);
}
@Override
@ -813,11 +843,11 @@ public class SslHandler
// issue and handshake and add a listener to it which will fire an exception event if
// an exception was thrown while doing the handshake
handshake().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.pipeline().fireExceptionCaught(future.cause());
ctx.close();
} else {
// Send the event upstream after the handshake was completed without an error.
//
@ -832,6 +862,38 @@ public class SslHandler
}
}
private static void safeClose(
final ChannelHandlerContext ctx, ChannelFuture flushFuture,
final ChannelFuture closeFuture) {
if (!ctx.channel().isActive()) {
ctx.close(closeFuture);
return;
}
// Force-close the connection if close_notify is not fully sent in time.
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(
ctx.channel() + " last lssssswrite attempt timed out." +
" Force-closing the connection.");
ctx.close(closeFuture);
}
}, 3, TimeUnit.SECONDS); // FIXME: Magic value
// Close the connection if close_notify is sent in time.
flushFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
timeoutFuture.cancel(false);
if (ctx.channel().isActive()) {
ctx.close(closeFuture);
}
}
});
}
private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
public SSLEngineInboundCloseFuture() {
super(null, true);
@ -861,6 +923,4 @@ public class SslHandler
return false;
}
}
}

View File

@ -16,12 +16,12 @@
package io.netty.channel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
public class ChannelFlushFutureNotifier implements ChannelFutureListener {
public class ChannelFlushFutureNotifier {
private long writeCounter;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
public void addFlushFuture(ChannelFuture future, int pendingDataSize) {
long checkpoint = writeCounter + pendingDataSize;
@ -39,7 +39,34 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener {
}
public void notifyFlushFutures() {
notifyFlushFutures0(null);
}
public void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
public void notifyFlushFutures(Throwable cause1, Throwable cause2) {
notifyFlushFutures0(cause1);
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause2);
}
}
private void notifyFlushFutures0(Throwable cause) {
if (flushCheckpoints.isEmpty()) {
writeCounter = 0;
return;
}
@ -61,7 +88,11 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener {
}
flushCheckpoints.remove();
cp.future().setSuccess();
if (cause == null) {
cp.future().setSuccess();
} else {
cp.future().setFailure(cause);
}
}
// Avoid overflow
@ -76,26 +107,6 @@ public class ChannelFlushFutureNotifier implements ChannelFutureListener {
}
}
public void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
notifyFlushFutures();
} else {
notifyFlushFutures(future.cause());
}
}
abstract static class FlushCheckpoint {
abstract long flushCheckpoint();
abstract void flushCheckpoint(long checkpoint);