Introduce CodecOutputList to reduce overhead of encoder/decoder

Motivation:

99dfc9ea79 introduced some code that will more frequently try to forward messages out of the list of decoded messages to reduce latency and memory footprint. Unfortunally this has the side-effect that RecycleableArrayList.clear() will be called more often and so introduce some overhead as ArrayList will null out the array on each call.

Modifications:

- Introduce a CodecOutputList which allows to not null out the array until we recycle it and also allows to access internal array with extra range checks.
- Add benchmark that add elements to different List implementations and clear them

Result:

Less overhead when decode / encode messages.

Benchmark                                     (elements)   Mode  Cnt         Score        Error  Units
CodecOutputListBenchmark.arrayList                     1  thrpt   20  24853764.609 ± 161582.376  ops/s
CodecOutputListBenchmark.arrayList                     4  thrpt   20  17310636.508 ± 930517.403  ops/s
CodecOutputListBenchmark.codecOutList                  1  thrpt   20  26670751.661 ± 587812.655  ops/s
CodecOutputListBenchmark.codecOutList                  4  thrpt   20  25166421.089 ± 166945.599  ops/s
CodecOutputListBenchmark.recyclableArrayList           1  thrpt   20  24565992.626 ± 210017.290  ops/s
CodecOutputListBenchmark.recyclableArrayList           4  thrpt   20  18477881.775 ± 157003.777  ops/s

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 246.748 sec - in io.netty.handler.codec.CodecOutputListBenchmark
This commit is contained in:
Norman Maurer 2016-05-03 16:43:47 +02:00
parent a425a8551d
commit e59d0e9efb
7 changed files with 307 additions and 37 deletions

View File

@ -22,7 +22,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.StringUtil;
import java.util.List;
@ -233,7 +232,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
RecyclableArrayList out = RecyclableArrayList.newInstance();
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
@ -273,8 +272,21 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
for (int i = 0; i < numElements; i++) {
ctx.fireChannelRead(msgs.get(i));
}
}
}
/**
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.get(i));
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
@ -321,7 +333,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
}
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
RecyclableArrayList out = RecyclableArrayList.newInstance();
CodecOutputList out = CodecOutputList.newInstance();
try {
channelInputClosed(ctx, out);
} catch (DecoderException e) {
@ -344,7 +356,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
ctx.fireChannelInactive();
}
} finally {
// recycle in all cases
// Recycle in all cases
out.recycle();
}
}

View File

@ -0,0 +1,174 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.util.Recycler;
import java.util.AbstractList;
import java.util.RandomAccess;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* Special {@link AbstractList} implementation which is used within our codec base classes.
*/
final class CodecOutputList extends AbstractList<Object> implements RandomAccess {
private static final Recycler<CodecOutputList> RECYCLER = new Recycler<CodecOutputList>() {
@Override
protected CodecOutputList newObject(Handle<CodecOutputList> handle) {
return new CodecOutputList(handle);
}
};
static CodecOutputList newInstance() {
return RECYCLER.get();
}
private final Recycler.Handle<CodecOutputList> handle;
private int size;
// Size of 16 should be good enough for 99 % of all users.
private Object[] array = new Object[16];
private boolean insertSinceRecycled;
private CodecOutputList(Recycler.Handle<CodecOutputList> handle) {
this.handle = handle;
}
@Override
public Object get(int index) {
checkIndex(index);
return array[index];
}
@Override
public int size() {
return size;
}
@Override
public boolean add(Object element) {
checkNotNull(element, "element");
try {
insert(size, element);
} catch (IndexOutOfBoundsException ignore) {
// This should happen very infrequently so we just catch the exception and try again.
expandArray();
insert(size, element);
}
++ size;
return true;
}
@Override
public Object set(int index, Object element) {
checkNotNull(element, "element");
checkIndex(index);
Object old = array[index];
insert(index, element);
return old;
}
@Override
public void add(int index, Object element) {
checkNotNull(element, "element");
checkIndex(index);
if (size == array.length) {
expandArray();
}
if (index != size - 1) {
System.arraycopy(array, index, array, index + 1, size - index);
}
insert(index, element);
++ size;
}
@Override
public Object remove(int index) {
checkIndex(index);
Object old = array[index];
int len = size - index - 1;
if (len > 0) {
System.arraycopy(array, index + 1, array, index, len);
}
array[-- size] = null;
return old;
}
@Override
public void clear() {
// We only set the size to 0 and not null out the array. Null out the array will explicit requested by
// calling recycle()
size = 0;
}
/**
* Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called.
*/
boolean insertSinceRecycled() {
return insertSinceRecycled;
}
/**
* Recycle the array which will clear it and null out all entries in the internal storage.
*/
void recycle() {
for (int i = 0 ; i < size; i ++) {
array[i] = null;
}
clear();
insertSinceRecycled = false;
handle.recycle(this);
}
/**
* Returns the element on the given index. This operation will not do any range-checks and so is considered unsafe.
*/
Object getUnsafe(int index) {
return array[index];
}
private void checkIndex(int index) {
if (index >= size) {
throw new IndexOutOfBoundsException();
}
}
private void insert(int index, Object element) {
array[index] = element;
insertSinceRecycled = true;
}
private void expandArray() {
// double capacity
int newCapacity = array.length << 1;
if (newCapacity < 0) {
throw new OutOfMemoryError();
}
Object[] newArray = new Object[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
array = newArray;
}
}

View File

@ -22,8 +22,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.StringUtil;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -62,6 +60,7 @@ public class DatagramPacketEncoder<M> extends MessageToMessageEncoder<AddressedE
@Override
public boolean acceptOutboundMessage(Object msg) throws Exception {
if (super.acceptOutboundMessage(msg)) {
@SuppressWarnings("rawtypes")
AddressedEnvelope envelope = (AddressedEnvelope) msg;
return encoder.acceptOutboundMessage(envelope.content())
&& envelope.sender() instanceof InetSocketAddress
@ -73,28 +72,20 @@ public class DatagramPacketEncoder<M> extends MessageToMessageEncoder<AddressedE
@Override
protected void encode(
ChannelHandlerContext ctx, AddressedEnvelope<M, InetSocketAddress> msg, List<Object> out) throws Exception {
RecyclableArrayList buffers = null;
try {
buffers = RecyclableArrayList.newInstance();
encoder.encode(ctx, msg.content(), buffers);
if (buffers.size() != 1) {
throw new EncoderException(
StringUtil.simpleClassName(encoder) + " must produce only one message.");
}
Object content = buffers.get(0);
if (content instanceof ByteBuf) {
out.add(new DatagramPacket(((ByteBuf) content).retain(), msg.recipient(), msg.sender()));
} else {
throw new EncoderException(
StringUtil.simpleClassName(encoder) + " must produce only ByteBuf.");
}
} finally {
if (buffers != null) {
for (Object o : buffers) {
ReferenceCountUtil.release(o);
}
buffers.recycle();
}
assert out.isEmpty();
encoder.encode(ctx, msg.content(), out);
if (out.size() != 1) {
throw new EncoderException(
StringUtil.simpleClassName(encoder) + " must produce only one message.");
}
Object content = out.get(0);
if (content instanceof ByteBuf) {
// Replace the ByteBuf with a DatagramPacket.
out.set(0, new DatagramPacket((ByteBuf) content, msg.recipient(), msg.sender()));
} else {
throw new EncoderException(
StringUtil.simpleClassName(encoder) + " must produce only ByteBuf.");
}
}

View File

@ -21,7 +21,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.TypeParameterMatcher;
import java.util.List;
@ -80,7 +79,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAd
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RecyclableArrayList out = RecyclableArrayList.newInstance();
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
@ -100,7 +99,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAd
} finally {
int size = out.size();
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
ctx.fireChannelRead(out.getUnsafe(i));
}
out.recycle();
}

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.RecyclableArrayList;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.TypeParameterMatcher;
@ -79,10 +78,10 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerA
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
RecyclableArrayList out = null;
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = RecyclableArrayList.newInstance();
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
@ -122,9 +121,9 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerA
} else {
p = ctx.newPromise();
}
ctx.write(out.get(i), p);
ctx.write(out.getUnsafe(i), p);
}
ctx.write(out.get(sizeMinusOne), promise);
ctx.write(out.getUnsafe(sizeMinusOne), promise);
}
out.recycle();
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.internal.RecyclableArrayList;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.ArrayList;
import java.util.List;
@State(Scope.Benchmark)
public class CodecOutputListBenchmark extends AbstractMicrobenchmark {
private static final Object ELEMENT = new Object();
private CodecOutputList codecOutputList;
private RecyclableArrayList recycleableArrayList;
private List<Object> arrayList;
@Param({ "1", "4" })
public int elements;
@Setup(Level.Invocation)
public void setup() {
codecOutputList = CodecOutputList.newInstance();
recycleableArrayList = RecyclableArrayList.newInstance(16);
arrayList = new ArrayList<Object>(16);
}
@TearDown
public void destroy() {
codecOutputList.recycle();
recycleableArrayList.recycle();
}
@Benchmark
public void codecOutList() {
benchmarkAddAndClear(codecOutputList, elements);
}
@Benchmark
public void recyclableArrayList() {
benchmarkAddAndClear(recycleableArrayList, elements);
}
@Benchmark
public void arrayList() {
benchmarkAddAndClear(arrayList, elements);
}
private static void benchmarkAddAndClear(List<Object> list, int elements) {
for (int i = 0; i < elements; i++) {
list.add(ELEMENT);
}
list.clear();
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Benchmarks for {@link io.netty.handler.codec}.
*/
package io.netty.handler.codec;