SslHandler consolidate state to save memory (#11160)

Motivation:
SslHandler has many independent boolean member variables. They can be
collapsed into a single variable to save memory.

Modifications:
- SslHandler boolean state consolidated into a single short variable.

Result:
Savings of 8 bytes per SslHandler (which is per connection) observed on
OpenJDK.
This commit is contained in:
Scott Mitchell 2021-04-15 08:18:54 -07:00 committed by GitHub
parent c93419ba57
commit 16b40d8a37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -167,14 +167,29 @@ import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
* <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker. * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
*/ */
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler { public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SslHandler.class); InternalLoggerFactory.getInstance(SslHandler.class);
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile( private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
"^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$"); "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile( private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
"^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE); "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
private static final int STATE_SENT_FIRST_MESSAGE = 1;
private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
/**
* Set by wrap*() methods when something is produced.
* {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
*/
private static final int STATE_NEEDS_FLUSH = 1 << 4;
private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
private static final int STATE_CLOSE_NOTIFY = 1 << 6;
private static final int STATE_PROCESS_TASK = 1 << 7;
/**
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
*/
private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
/** /**
* <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
@ -380,32 +395,13 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
private final ByteBuffer[] singleBuffer = new ByteBuffer[1]; private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
private final boolean startTls; private final boolean startTls;
private boolean sentFirstMessage;
private boolean flushedBeforeHandshake;
private boolean readDuringHandshake;
private boolean handshakeStarted;
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites; private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
private Promise<Channel> handshakePromise = new LazyChannelPromise(); private Promise<Channel> handshakePromise = new LazyChannelPromise();
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise(); private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
/**
* Set by wrap*() methods when something is produced.
* {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
*/
private boolean needsFlush;
private boolean outboundClosed;
private boolean closeNotify;
private boolean processTask;
private int packetLength; private int packetLength;
private short state;
/**
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
*/
private boolean firedChannelRead;
private volatile long handshakeTimeoutMillis = 10000; private volatile long handshakeTimeoutMillis = 10000;
private volatile long closeNotifyFlushTimeoutMillis = 3000; private volatile long closeNotifyFlushTimeoutMillis = 3000;
@ -664,7 +660,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
private void closeOutbound0(ChannelPromise promise) { private void closeOutbound0(ChannelPromise promise) {
outboundClosed = true; setState(STATE_OUTBOUND_CLOSED);
engine.closeOutbound(); engine.closeOutbound();
try { try {
flush(ctx, promise); flush(ctx, promise);
@ -748,7 +744,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
@Override @Override
public void read(ChannelHandlerContext ctx) throws Exception { public void read(ChannelHandlerContext ctx) throws Exception {
if (!handshakePromise.isDone()) { if (!handshakePromise.isDone()) {
readDuringHandshake = true; setState(STATE_READ_DURING_HANDSHAKE);
} }
ctx.read(); ctx.read();
@ -776,8 +772,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) throws Exception {
// Do not encrypt the first write request if this handler is // Do not encrypt the first write request if this handler is
// created with startTLS flag turned on. // created with startTLS flag turned on.
if (startTls && !sentFirstMessage) { if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
sentFirstMessage = true; setState(STATE_SENT_FIRST_MESSAGE);
pendingUnencryptedWrites.writeAndRemoveAll(ctx); pendingUnencryptedWrites.writeAndRemoveAll(ctx);
forceFlush(ctx); forceFlush(ctx);
// Explicit start handshake processing once we send the first message. This will also ensure // Explicit start handshake processing once we send the first message. This will also ensure
@ -786,7 +782,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
return; return;
} }
if (processTask) { if (isStateSet(STATE_PROCESS_TASK)) {
return; return;
} }
@ -807,7 +803,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise()); pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
} }
if (!handshakePromise.isDone()) { if (!handshakePromise.isDone()) {
flushedBeforeHandshake = true; setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
} }
try { try {
wrap(ctx, false); wrap(ctx, false);
@ -917,7 +913,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
out.release(); out.release();
} }
if (inUnwrap) { if (inUnwrap) {
needsFlush = true; setState(STATE_NEEDS_FLUSH);
} }
} }
} }
@ -953,7 +949,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
}); });
if (inUnwrap) { if (inUnwrap) {
needsFlush = true; setState(STATE_NEEDS_FLUSH);
} }
out = null; out = null;
} }
@ -1083,7 +1079,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
ClosedChannelException exception = new ClosedChannelException(); ClosedChannelException exception = new ClosedChannelException();
// Make sure to release SSLEngine, // Make sure to release SSLEngine,
// and notify the handshake future if the connection has been closed during handshake. // and notify the handshake future if the connection has been closed during handshake.
setHandshakeFailure(ctx, exception, !outboundClosed, handshakeStarted, false); setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
false);
// Ensure we always notify the sslClosePromise as well // Ensure we always notify the sslClosePromise as well
notifyClosePromise(exception); notifyClosePromise(exception);
@ -1293,7 +1290,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
if (processTask) { if (isStateSet(STATE_PROCESS_TASK)) {
return; return;
} }
if (jdkCompatibilityMode) { if (jdkCompatibilityMode) {
@ -1315,13 +1312,14 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
flushIfNeeded(ctx); flushIfNeeded(ctx);
readIfNeeded(ctx); readIfNeeded(ctx);
firedChannelRead = false; clearState(STATE_FIRE_CHANNEL_READ);
ctx.fireChannelReadComplete(); ctx.fireChannelReadComplete();
} }
private void readIfNeeded(ChannelHandlerContext ctx) { private void readIfNeeded(ChannelHandlerContext ctx) {
// If handshake is not finished yet, we need more data. // If handshake is not finished yet, we need more data.
if (!ctx.channel().config().isAutoRead() && (!firedChannelRead || !handshakePromise.isDone())) { if (!ctx.channel().config().isAutoRead() &&
(!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
// No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
// yet, which means we need to trigger the read to ensure we not encounter any stalls. // yet, which means we need to trigger the read to ensure we not encounter any stalls.
ctx.read(); ctx.read();
@ -1329,7 +1327,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
private void flushIfNeeded(ChannelHandlerContext ctx) { private void flushIfNeeded(ChannelHandlerContext ctx) {
if (needsFlush) { if (isStateSet(STATE_NEEDS_FLUSH)) {
forceFlush(ctx); forceFlush(ctx);
} }
} }
@ -1372,7 +1370,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
overflowReadableBytes = readableBytes; overflowReadableBytes = readableBytes;
int bufferSize = engine.getSession().getApplicationBufferSize() - readableBytes; int bufferSize = engine.getSession().getApplicationBufferSize() - readableBytes;
if (readableBytes > 0) { if (readableBytes > 0) {
firedChannelRead = true; setState(STATE_FIRE_CHANNEL_READ);
ctx.fireChannelRead(decodeOut); ctx.fireChannelRead(decodeOut);
// This buffer was handled, null it out. // This buffer was handled, null it out.
@ -1480,12 +1478,12 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
} }
if (flushedBeforeHandshake && handshakePromise.isDone()) { if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
// We need to call wrap(...) in case there was a flush done before the handshake completed to ensure // We need to call wrap(...) in case there was a flush done before the handshake completed to ensure
// we do not stale. // we do not stale.
// //
// See https://github.com/netty/netty/pull/2437 // See https://github.com/netty/netty/pull/2437
flushedBeforeHandshake = false; clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
wrapLater = true; wrapLater = true;
} }
@ -1495,7 +1493,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} finally { } finally {
if (decodeOut != null) { if (decodeOut != null) {
if (decodeOut.isReadable()) { if (decodeOut.isReadable()) {
firedChannelRead = true; setState(STATE_FIRE_CHANNEL_READ);
ctx.fireChannelRead(decodeOut); ctx.fireChannelRead(decodeOut);
} else { } else {
@ -1548,11 +1546,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
private void executeDelegatedTasks(boolean inUnwrap) { private void executeDelegatedTasks(boolean inUnwrap) {
processTask = true; setState(STATE_PROCESS_TASK);
try { try {
delegatedTaskExecutor.execute(new SslTasksRunner(inUnwrap)); delegatedTaskExecutor.execute(new SslTasksRunner(inUnwrap));
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
processTask = false; clearState(STATE_PROCESS_TASK);
throw e; throw e;
} }
} }
@ -1624,9 +1622,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
*/ */
private void resumeOnEventExecutor() { private void resumeOnEventExecutor() {
assert ctx.executor().inEventLoop(); assert ctx.executor().inEventLoop();
clearState(STATE_PROCESS_TASK);
processTask = false;
try { try {
HandshakeStatus status = engine.getHandshakeStatus(); HandshakeStatus status = engine.getHandshakeStatus();
switch (status) { switch (status) {
@ -1731,19 +1727,19 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
private void handleException(final Throwable cause) { private void handleException(final Throwable cause) {
if (ctx.executor().inEventLoop()) { if (ctx.executor().inEventLoop()) {
processTask = false; clearState(STATE_PROCESS_TASK);
safeExceptionCaught(cause); safeExceptionCaught(cause);
} else { } else {
try { try {
ctx.executor().execute(new Runnable() { ctx.executor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
processTask = false; clearState(STATE_PROCESS_TASK);
safeExceptionCaught(cause); safeExceptionCaught(cause);
} }
}); });
} catch (RejectedExecutionException ignore) { } catch (RejectedExecutionException ignore) {
processTask = false; clearState(STATE_PROCESS_TASK);
// the context itself will handle the rejected exception when try to schedule the operation so // the context itself will handle the rejected exception when try to schedule the operation so
// ignore the RejectedExecutionException // ignore the RejectedExecutionException
ctx.fireExceptionCaught(cause); ctx.fireExceptionCaught(cause);
@ -1769,8 +1765,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
* was fired. {@code false} otherwise. * was fired. {@code false} otherwise.
*/ */
private boolean setHandshakeSuccess() { private boolean setHandshakeSuccess() {
if (readDuringHandshake && !ctx.channel().config().isAutoRead()) { if (isStateSet(STATE_READ_DURING_HANDSHAKE) && !ctx.channel().config().isAutoRead()) {
readDuringHandshake = false; clearState(STATE_READ_DURING_HANDSHAKE);
ctx.read(); ctx.read();
} }
// Our control flow may invoke this method multiple times for a single FINISHED event. For example // Our control flow may invoke this method multiple times for a single FINISHED event. For example
@ -1805,9 +1801,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound, private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
boolean notify, boolean alwaysFlushAndClose) { boolean notify, boolean alwaysFlushAndClose) {
try { try {
// Release all resources such as internal buffers that SSLEngine // Release all resources such as internal buffers that SSLEngine is managing.
// is managing. setState(STATE_OUTBOUND_CLOSED);
outboundClosed = true;
engine.closeOutbound(); engine.closeOutbound();
if (closeInbound) { if (closeInbound) {
@ -1871,7 +1866,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
private void closeOutboundAndChannel( private void closeOutboundAndChannel(
final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception { final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
outboundClosed = true; setState(STATE_OUTBOUND_CLOSED);
engine.closeOutbound(); engine.closeOutbound();
if (!ctx.channel().isActive()) { if (!ctx.channel().isActive()) {
@ -1887,8 +1882,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
try { try {
flush(ctx, closeNotifyPromise); flush(ctx, closeNotifyPromise);
} finally { } finally {
if (!closeNotify) { if (!isStateSet(STATE_CLOSE_NOTIFY)) {
closeNotify = true; setState(STATE_CLOSE_NOTIFY);
// It's important that we do not pass the original ChannelPromise to safeClose(...) as when flush(....) // It's important that we do not pass the original ChannelPromise to safeClose(...) as when flush(....)
// throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
// to fail the promise because of this. This will then fail as it was already completed by // to fail the promise because of this. This will then fail as it was already completed by
@ -1936,14 +1931,16 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
// If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to // If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to
// flush pending data in the outbound buffer later in channelActive(). // flush pending data in the outbound buffer later in channelActive().
final ChannelOutboundBuffer outboundBuffer; final ChannelOutboundBuffer outboundBuffer;
needsFlush |= fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null || if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
outboundBuffer.totalPendingWriteBytes() > 0); outboundBuffer.totalPendingWriteBytes() > 0)) {
setState(STATE_NEEDS_FLUSH);
}
} }
} }
private void startHandshakeProcessing(boolean flushAtEnd) { private void startHandshakeProcessing(boolean flushAtEnd) {
if (!handshakeStarted) { if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
handshakeStarted = true; setState(STATE_HANDSHAKE_STARTED);
if (engine.getUseClientMode()) { if (engine.getUseClientMode()) {
// Begin the initial handshake. // Begin the initial handshake.
// channelActive() event has been fired already, which means this.channelActive() will // channelActive() event has been fired already, which means this.channelActive() will
@ -1951,7 +1948,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
handshake(flushAtEnd); handshake(flushAtEnd);
} }
applyHandshakeTimeout(); applyHandshakeTimeout();
} else if (needsFlush) { } else if (isStateSet(STATE_NEEDS_FLUSH)) {
forceFlush(ctx); forceFlush(ctx);
} }
} }
@ -2079,7 +2076,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
private void forceFlush(ChannelHandlerContext ctx) { private void forceFlush(ChannelHandlerContext ctx) {
needsFlush = false; clearState(STATE_NEEDS_FLUSH);
ctx.flush(); ctx.flush();
} }
@ -2205,6 +2202,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents); return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
} }
private boolean isStateSet(int bit) {
return (state & bit) == bit;
}
private void setState(int bit) {
state |= bit;
}
private void clearState(int bit) {
state &= ~bit;
}
/** /**
* Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
* goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written