SslHandler leak
Motivation: SslHandler only supports ByteBuf objects, but will not release objects of other types. SslHandler will also not release objects if its internal state is not correctly setup. Modifications: - Release non-ByteBuf objects in write - Release all objects if the SslHandler queue is not setup Result: Less leaks in SslHandler.
This commit is contained in:
parent
35b0cd58fb
commit
570d96d8c2
@ -34,6 +34,7 @@ import io.netty.channel.ChannelPromise;
|
|||||||
import io.netty.channel.ChannelPromiseNotifier;
|
import io.netty.channel.ChannelPromiseNotifier;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
import io.netty.handler.codec.UnsupportedMessageTypeException;
|
import io.netty.handler.codec.UnsupportedMessageTypeException;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.concurrent.DefaultPromise;
|
import io.netty.util.concurrent.DefaultPromise;
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
@ -656,6 +657,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
pendingUnencryptedWrites.releaseAndFailAll(ctx,
|
pendingUnencryptedWrites.releaseAndFailAll(ctx,
|
||||||
new ChannelException("Pending write on removal of SslHandler"));
|
new ChannelException("Pending write on removal of SslHandler"));
|
||||||
}
|
}
|
||||||
|
pendingUnencryptedWrites = null;
|
||||||
if (engine instanceof ReferenceCounted) {
|
if (engine instanceof ReferenceCounted) {
|
||||||
((ReferenceCounted) engine).release();
|
((ReferenceCounted) engine).release();
|
||||||
}
|
}
|
||||||
@ -698,14 +700,23 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
ctx.read();
|
ctx.read();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static IllegalStateException newPendingWritesNullException() {
|
||||||
|
return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
if (!(msg instanceof ByteBuf)) {
|
if (!(msg instanceof ByteBuf)) {
|
||||||
promise.setFailure(new UnsupportedMessageTypeException(msg, ByteBuf.class));
|
UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
|
||||||
return;
|
ReferenceCountUtil.safeRelease(msg);
|
||||||
}
|
promise.setFailure(exception);
|
||||||
|
} else if (pendingUnencryptedWrites == null) {
|
||||||
|
ReferenceCountUtil.safeRelease(msg);
|
||||||
|
promise.setFailure(newPendingWritesNullException());
|
||||||
|
} else {
|
||||||
pendingUnencryptedWrites.add((ByteBuf) msg, promise);
|
pendingUnencryptedWrites.add((ByteBuf) msg, promise);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush(ChannelHandlerContext ctx) throws Exception {
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||||
@ -1504,9 +1515,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
notifyHandshakeFailure(cause);
|
notifyHandshakeFailure(cause);
|
||||||
} finally {
|
} finally {
|
||||||
// Ensure we remove and fail all pending writes in all cases and so release memory quickly.
|
// Ensure we remove and fail all pending writes in all cases and so release memory quickly.
|
||||||
|
if (pendingUnencryptedWrites != null) {
|
||||||
pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
|
pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void notifyHandshakeFailure(Throwable cause) {
|
private void notifyHandshakeFailure(Throwable cause) {
|
||||||
if (handshakePromise.tryFailure(cause)) {
|
if (handshakePromise.tryFailure(cause)) {
|
||||||
@ -1558,7 +1571,11 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||||
|
if (pendingUnencryptedWrites != null) {
|
||||||
pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
|
pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
|
||||||
|
} else {
|
||||||
|
promise.setFailure(newPendingWritesNullException());
|
||||||
|
}
|
||||||
flush(ctx);
|
flush(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,6 +42,7 @@ import io.netty.handler.codec.UnsupportedMessageTypeException;
|
|||||||
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
||||||
import io.netty.handler.ssl.util.SelfSignedCertificate;
|
import io.netty.handler.ssl.util.SelfSignedCertificate;
|
||||||
import io.netty.handler.ssl.util.SimpleTrustManagerFactory;
|
import io.netty.handler.ssl.util.SimpleTrustManagerFactory;
|
||||||
|
import io.netty.util.AbstractReferenceCounted;
|
||||||
import io.netty.util.IllegalReferenceCountException;
|
import io.netty.util.IllegalReferenceCountException;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
@ -143,6 +144,33 @@ public class SslHandlerTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonByteBufWriteIsReleased() throws Exception {
|
||||||
|
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
|
||||||
|
engine.setUseClientMode(false);
|
||||||
|
|
||||||
|
EmbeddedChannel ch = new EmbeddedChannel(new SslHandler(engine));
|
||||||
|
|
||||||
|
AbstractReferenceCounted referenceCounted = new AbstractReferenceCounted() {
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted touch(Object hint) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void deallocate() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
ch.write(referenceCounted).get();
|
||||||
|
fail();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
assertThat(e.getCause(), is(instanceOf(UnsupportedMessageTypeException.class)));
|
||||||
|
}
|
||||||
|
assertEquals(0, referenceCounted.refCnt());
|
||||||
|
assertTrue(ch.finishAndReleaseAll());
|
||||||
|
}
|
||||||
|
|
||||||
@Test(expected = UnsupportedMessageTypeException.class)
|
@Test(expected = UnsupportedMessageTypeException.class)
|
||||||
public void testNonByteBufNotPassThrough() throws Exception {
|
public void testNonByteBufNotPassThrough() throws Exception {
|
||||||
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
|
SSLEngine engine = SSLContext.getDefault().createSSLEngine();
|
||||||
|
Loading…
Reference in New Issue
Block a user