Factor out the PendingWrite class and put it in internal package. Make use of it in SslHandler and ChunkedWriteHandler to reduce GC-pressure

This commit is contained in:
Norman Maurer 2013-07-25 12:36:24 +02:00
parent 6ff87cc20d
commit 2b3ac3d446
3 changed files with 127 additions and 89 deletions

View File

@ -0,0 +1,97 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Promise;
/**
* Some pending write which should be picked up later.
*/
public final class PendingWrite {
private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
@Override
protected PendingWrite newObject(Handle handle) {
return new PendingWrite(handle);
}
};
/**
* Create a new empty {@link RecyclableArrayList} instance
*/
public static PendingWrite newInstance(Object msg, Promise<Void> promise) {
PendingWrite pending = RECYCLER.get();
pending.msg = msg;
pending.promise = promise;
return pending;
}
private final Recycler.Handle handle;
private Object msg;
private Promise<Void> promise;
private PendingWrite(Recycler.Handle handle) {
this.handle = handle;
}
/**
* Clear and recycle this instance.
*/
public boolean recycle() {
msg = null;
promise = null;
return RECYCLER.recycle(this, handle);
}
/**
* Fails the underlying {@link Promise} with the given cause and reycle this instance.
*/
public boolean failAndRecycle(Throwable cause) {
ReferenceCountUtil.release(msg);
if (promise != null) {
promise.setFailure(cause);
}
return recycle();
}
/**
* Mark the underlying {@link Promise} successed and reycle this instance.
*/
public boolean successAndRecycle() {
if (promise != null) {
promise.setSuccess(null);
}
return recycle();
}
public Object msg() {
return msg;
}
public Promise<Void> promise() {
return promise;
}
/**
* Recycle this instance and return the {@link Promise}.
*/
public Promise<Void> recycleAndGet() {
Promise<Void> promise = this.promise;
recycle();
return promise;
}
}

View File

@ -28,13 +28,13 @@ import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.Recycler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateExecutor;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PendingWrite;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.logging.InternalLogger;
@ -408,8 +408,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
if (pendingWrite == null) {
break;
}
ctx.write(pendingWrite.buf, pendingWrite.promise);
pendingWrite.recycle();
ctx.write(pendingWrite.msg(), (ChannelPromise) pendingWrite.recycleAndGet());
}
ctx.flush();
return;
@ -432,12 +431,12 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
if (out == null) {
out = ctx.alloc().buffer();
}
SSLEngineResult result = wrap(engine, pending.buf, out);
ByteBuf buf = (ByteBuf) pending.msg();
SSLEngineResult result = wrap(engine, buf, out);
if (!pending.buf.isReadable()) {
pending.buf.release();
promise = pending.promise;
pending.recycle();
if (!buf.isReadable()) {
buf.release();
promise = (ChannelPromise) pending.recycleAndGet();
pendingUnencryptedWrites.remove();
} else {
promise = null;
@ -1083,48 +1082,4 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
return ctx.executor();
}
}
private static final class PendingWrite {
private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
@Override
protected PendingWrite newObject(Handle handle) {
return new PendingWrite(handle);
}
};
/**
* Create a new empty {@link RecyclableArrayList} instance
*/
public static PendingWrite newInstance(ByteBuf buf, ChannelPromise promise) {
PendingWrite pending = RECYCLER.get();
pending.buf = buf;
pending.promise = promise;
return pending;
}
private final Recycler.Handle handle;
private ByteBuf buf;
private ChannelPromise promise;
private PendingWrite(Recycler.Handle handle) {
this.handle = handle;
}
/**
* Clear and recycle this instance.
*/
boolean recycle() {
buf = null;
promise = null;
return RECYCLER.recycle(this, handle);
}
boolean failAndRecycle(Throwable cause) {
buf.release();
if (promise != null) {
promise.setFailure(cause);
}
return recycle();
}
}
}

View File

@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PendingWrite;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -75,6 +76,8 @@ public class ChunkedWriteHandler
private volatile ChannelHandlerContext ctx;
private final AtomicInteger pendingWrites = new AtomicInteger();
private PendingWrite currentWrite;
private int progress;
public ChunkedWriteHandler() {
this(4);
}
@ -137,7 +140,7 @@ public class ChunkedWriteHandler
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
queue.add(new PendingWrite(msg, promise));
queue.add(PendingWrite.newInstance(msg, promise));
}
@Override
@ -166,7 +169,7 @@ public class ChunkedWriteHandler
if (currentWrite == null) {
break;
}
Object message = currentWrite.msg;
Object message = currentWrite.msg();
if (message instanceof ChunkedInput) {
ChunkedInput<?> in = (ChunkedInput<?>) message;
try {
@ -174,13 +177,13 @@ public class ChunkedWriteHandler
if (cause == null) {
cause = new ClosedChannelException();
}
currentWrite.fail(cause);
currentWrite.failAndRecycle(cause);
} else {
currentWrite.promise.setSuccess();
currentWrite.successAndRecycle();
}
closeInput(in);
} catch (Exception e) {
currentWrite.fail(e);
currentWrite.failAndRecycle(e);
logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
closeInput(in);
}
@ -188,7 +191,7 @@ public class ChunkedWriteHandler
if (cause == null) {
cause = new ClosedChannelException();
}
currentWrite.fail(cause);
currentWrite.failAndRecycle(cause);
}
}
}
@ -210,7 +213,7 @@ public class ChunkedWriteHandler
}
needsFlush = true;
final PendingWrite currentWrite = this.currentWrite;
final Object pendingMessage = currentWrite.msg;
final Object pendingMessage = currentWrite.msg();
if (pendingMessage instanceof ChunkedInput) {
final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
@ -234,7 +237,7 @@ public class ChunkedWriteHandler
ReferenceCountUtil.release(message);
}
currentWrite.fail(t);
currentWrite.failAndRecycle(t);
if (ctx.executor().inEventLoop()) {
ctx.fireExceptionCaught(t);
} else {
@ -271,7 +274,7 @@ public class ChunkedWriteHandler
@Override
public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet();
currentWrite.promise.setSuccess();
currentWrite.successAndRecycle();
closeInput(chunks);
}
});
@ -282,9 +285,9 @@ public class ChunkedWriteHandler
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
currentWrite.failAndRecycle(future.cause());
} else {
currentWrite.progress();
progress((ChannelPromise) currentWrite.promise());
}
}
});
@ -295,9 +298,9 @@ public class ChunkedWriteHandler
pendingWrites.decrementAndGet();
if (!future.isSuccess()) {
closeInput((ChunkedInput<?>) pendingMessage);
currentWrite.fail(future.cause());
currentWrite.failAndRecycle(future.cause());
} else {
currentWrite.progress();
progress((ChannelPromise) currentWrite.promise());
if (isWritable()) {
resumeTransfer();
}
@ -306,7 +309,7 @@ public class ChunkedWriteHandler
});
}
} else {
ctx.write(pendingMessage, currentWrite.promise);
ctx.write(pendingMessage, (ChannelPromise) currentWrite.recycleAndGet());
this.currentWrite = null;
}
@ -320,7 +323,7 @@ public class ChunkedWriteHandler
}
}
static void closeInput(ChunkedInput<?> chunks) {
void closeInput(ChunkedInput<?> chunks) {
try {
chunks.close();
} catch (Throwable t) {
@ -328,30 +331,13 @@ public class ChunkedWriteHandler
logger.warn("Failed to close a chunked input.", t);
}
}
progress = 0;
}
private static final class PendingWrite {
final Object msg;
final ChannelPromise promise;
private long progress;
PendingWrite(Object msg, ChannelPromise promise) {
this.msg = msg;
this.promise = promise;
}
void fail(Throwable cause) {
ReferenceCountUtil.release(msg);
if (promise != null) {
promise.setFailure(cause);
}
}
void progress() {
void progress(ChannelPromise promise) {
progress ++;
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).tryProgress(progress, -1);
}
}
}
}