Make PendingWrites recyclable to reduce GC pressure
This commit is contained in:
parent
7f86550ef8
commit
fa4e15e198
@ -28,6 +28,7 @@ import io.netty.channel.ChannelOutboundHandler;
|
|||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
|
import io.netty.util.Recycler;
|
||||||
import io.netty.util.concurrent.DefaultPromise;
|
import io.netty.util.concurrent.DefaultPromise;
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
@ -354,7 +355,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
if (write == null) {
|
if (write == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
write.fail(new ChannelException("Pending write on removal of SslHandler"));
|
write.failAndRecycle(new ChannelException("Pending write on removal of SslHandler"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,7 +394,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
pendingUnencryptedWrites.add(new PendingWrite((ByteBuf) msg, promise));
|
pendingUnencryptedWrites.add(PendingWrite.newInstance((ByteBuf) msg, promise));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -408,12 +409,13 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
ctx.write(pendingWrite.buf, pendingWrite.promise);
|
ctx.write(pendingWrite.buf, pendingWrite.promise);
|
||||||
|
pendingWrite.recycle();
|
||||||
}
|
}
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (pendingUnencryptedWrites.isEmpty()) {
|
if (pendingUnencryptedWrites.isEmpty()) {
|
||||||
pendingUnencryptedWrites.add(new PendingWrite(Unpooled.EMPTY_BUFFER, null));
|
pendingUnencryptedWrites.add(PendingWrite.newInstance(Unpooled.EMPTY_BUFFER, null));
|
||||||
}
|
}
|
||||||
flush0(ctx);
|
flush0(ctx);
|
||||||
}
|
}
|
||||||
@ -435,6 +437,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
if (!pending.buf.isReadable()) {
|
if (!pending.buf.isReadable()) {
|
||||||
pending.buf.release();
|
pending.buf.release();
|
||||||
promise = pending.promise;
|
promise = pending.promise;
|
||||||
|
pending.recycle();
|
||||||
pendingUnencryptedWrites.remove();
|
pendingUnencryptedWrites.remove();
|
||||||
} else {
|
} else {
|
||||||
promise = null;
|
promise = null;
|
||||||
@ -448,7 +451,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
if (w == null) {
|
if (w == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
w.fail(SSLENGINE_CLOSED);
|
w.failAndRecycle(SSLENGINE_CLOSED);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
@ -934,7 +937,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
if (write == null) {
|
if (write == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
write.fail(cause);
|
write.failAndRecycle(cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1082,23 +1085,46 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static final class PendingWrite {
|
private static final class PendingWrite {
|
||||||
final ByteBuf buf;
|
private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
|
||||||
final ChannelPromise promise;
|
@Override
|
||||||
|
protected PendingWrite newObject(Handle handle) {
|
||||||
|
return new PendingWrite(handle);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
PendingWrite(ByteBuf buf, ChannelPromise promise) {
|
/**
|
||||||
this.buf = buf;
|
* Create a new empty {@link RecyclableArrayList} instance
|
||||||
this.promise = promise;
|
*/
|
||||||
|
public static PendingWrite newInstance(ByteBuf buf, ChannelPromise promise) {
|
||||||
|
PendingWrite pending = RECYCLER.get();
|
||||||
|
pending.buf = buf;
|
||||||
|
pending.promise = promise;
|
||||||
|
return pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
PendingWrite(ByteBuf buf) {
|
private final Recycler.Handle handle;
|
||||||
this(buf, null);
|
private ByteBuf buf;
|
||||||
|
private ChannelPromise promise;
|
||||||
|
|
||||||
|
private PendingWrite(Recycler.Handle handle) {
|
||||||
|
this.handle = handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
void fail(Throwable cause) {
|
/**
|
||||||
|
* Clear and recycle this instance.
|
||||||
|
*/
|
||||||
|
boolean recycle() {
|
||||||
|
buf = null;
|
||||||
|
promise = null;
|
||||||
|
return RECYCLER.recycle(this, handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean failAndRecycle(Throwable cause) {
|
||||||
buf.release();
|
buf.release();
|
||||||
if (promise != null) {
|
if (promise != null) {
|
||||||
promise.setFailure(cause);
|
promise.setFailure(cause);
|
||||||
}
|
}
|
||||||
|
return recycle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user