Introduce CodecOutputList to reduce overhead of encoder/decoder
Motivation: 99dfc9ea799348430a1c25776ce30a95bc10a1ff introduced some code that will more frequently try to forward messages out of the list of decoded messages to reduce latency and memory footprint. Unfortunally this has the side-effect that RecycleableArrayList.clear() will be called more often and so introduce some overhead as ArrayList will null out the array on each call. Modifications: - Introduce a CodecOutputList which allows to not null out the array until we recycle it and also allows to access internal array with extra range checks. - Add benchmark that add elements to different List implementations and clear them Result: Less overhead when decode / encode messages. Benchmark (elements) Mode Cnt Score Error Units CodecOutputListBenchmark.arrayList 1 thrpt 20 24853764.609 ± 161582.376 ops/s CodecOutputListBenchmark.arrayList 4 thrpt 20 17310636.508 ± 930517.403 ops/s CodecOutputListBenchmark.codecOutList 1 thrpt 20 26670751.661 ± 587812.655 ops/s CodecOutputListBenchmark.codecOutList 4 thrpt 20 25166421.089 ± 166945.599 ops/s CodecOutputListBenchmark.recyclableArrayList 1 thrpt 20 24565992.626 ± 210017.290 ops/s CodecOutputListBenchmark.recyclableArrayList 4 thrpt 20 18477881.775 ± 157003.777 ops/s Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 246.748 sec - in io.netty.handler.codec.CodecOutputListBenchmark
This commit is contained in:
parent
9c34c3f344
commit
0ea4597542
@ -22,7 +22,6 @@ import io.netty.buffer.Unpooled;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||||
import io.netty.util.internal.RecyclableArrayList;
|
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -233,7 +232,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
if (msg instanceof ByteBuf) {
|
if (msg instanceof ByteBuf) {
|
||||||
RecyclableArrayList out = RecyclableArrayList.newInstance();
|
CodecOutputList out = CodecOutputList.newInstance();
|
||||||
try {
|
try {
|
||||||
ByteBuf data = (ByteBuf) msg;
|
ByteBuf data = (ByteBuf) msg;
|
||||||
first = cumulation == null;
|
first = cumulation == null;
|
||||||
@ -273,10 +272,23 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
|
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
|
||||||
*/
|
*/
|
||||||
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
|
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
|
||||||
|
if (msgs instanceof CodecOutputList) {
|
||||||
|
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
|
||||||
|
} else {
|
||||||
for (int i = 0; i < numElements; i++) {
|
for (int i = 0; i < numElements; i++) {
|
||||||
ctx.fireChannelRead(msgs.get(i));
|
ctx.fireChannelRead(msgs.get(i));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
|
||||||
|
*/
|
||||||
|
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
|
||||||
|
for (int i = 0; i < numElements; i ++) {
|
||||||
|
ctx.fireChannelRead(msgs.getUnsafe(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
@ -321,7 +333,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
|
private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) throws Exception {
|
||||||
RecyclableArrayList out = RecyclableArrayList.newInstance();
|
CodecOutputList out = CodecOutputList.newInstance();
|
||||||
try {
|
try {
|
||||||
channelInputClosed(ctx, out);
|
channelInputClosed(ctx, out);
|
||||||
} catch (DecoderException e) {
|
} catch (DecoderException e) {
|
||||||
@ -344,7 +356,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
ctx.fireChannelInactive();
|
ctx.fireChannelInactive();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// recycle in all cases
|
// Recycle in all cases
|
||||||
out.recycle();
|
out.recycle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
174
codec/src/main/java/io/netty/handler/codec/CodecOutputList.java
Normal file
174
codec/src/main/java/io/netty/handler/codec/CodecOutputList.java
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2016 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec;
|
||||||
|
|
||||||
|
import io.netty.util.Recycler;
|
||||||
|
|
||||||
|
import java.util.AbstractList;
|
||||||
|
import java.util.RandomAccess;
|
||||||
|
|
||||||
|
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Special {@link AbstractList} implementation which is used within our codec base classes.
|
||||||
|
*/
|
||||||
|
final class CodecOutputList extends AbstractList<Object> implements RandomAccess {
|
||||||
|
|
||||||
|
private static final Recycler<CodecOutputList> RECYCLER = new Recycler<CodecOutputList>() {
|
||||||
|
@Override
|
||||||
|
protected CodecOutputList newObject(Handle handle) {
|
||||||
|
return new CodecOutputList(handle);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
static CodecOutputList newInstance() {
|
||||||
|
return RECYCLER.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Recycler.Handle handle;
|
||||||
|
private int size;
|
||||||
|
// Size of 16 should be good enough for 99 % of all users.
|
||||||
|
private Object[] array = new Object[16];
|
||||||
|
private boolean insertSinceRecycled;
|
||||||
|
|
||||||
|
private CodecOutputList(Recycler.Handle handle) {
|
||||||
|
this.handle = handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object get(int index) {
|
||||||
|
checkIndex(index);
|
||||||
|
return array[index];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean add(Object element) {
|
||||||
|
checkNotNull(element, "element");
|
||||||
|
try {
|
||||||
|
insert(size, element);
|
||||||
|
} catch (IndexOutOfBoundsException ignore) {
|
||||||
|
// This should happen very infrequently so we just catch the exception and try again.
|
||||||
|
expandArray();
|
||||||
|
insert(size, element);
|
||||||
|
}
|
||||||
|
++ size;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object set(int index, Object element) {
|
||||||
|
checkNotNull(element, "element");
|
||||||
|
checkIndex(index);
|
||||||
|
|
||||||
|
Object old = array[index];
|
||||||
|
insert(index, element);
|
||||||
|
return old;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(int index, Object element) {
|
||||||
|
checkNotNull(element, "element");
|
||||||
|
checkIndex(index);
|
||||||
|
|
||||||
|
if (size == array.length) {
|
||||||
|
expandArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (index != size - 1) {
|
||||||
|
System.arraycopy(array, index, array, index + 1, size - index);
|
||||||
|
}
|
||||||
|
|
||||||
|
insert(index, element);
|
||||||
|
++ size;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object remove(int index) {
|
||||||
|
checkIndex(index);
|
||||||
|
Object old = array[index];
|
||||||
|
|
||||||
|
int len = size - index - 1;
|
||||||
|
if (len > 0) {
|
||||||
|
System.arraycopy(array, index + 1, array, index, len);
|
||||||
|
}
|
||||||
|
array[-- size] = null;
|
||||||
|
|
||||||
|
return old;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
// We only set the size to 0 and not null out the array. Null out the array will explicit requested by
|
||||||
|
// calling recycle()
|
||||||
|
size = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if any elements where added or set. This will be reset once {@link #recycle()} was called.
|
||||||
|
*/
|
||||||
|
boolean insertSinceRecycled() {
|
||||||
|
return insertSinceRecycled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recycle the array which will clear it and null out all entries in the internal storage.
|
||||||
|
*/
|
||||||
|
void recycle() {
|
||||||
|
for (int i = 0 ; i < size; i ++) {
|
||||||
|
array[i] = null;
|
||||||
|
}
|
||||||
|
clear();
|
||||||
|
insertSinceRecycled = false;
|
||||||
|
RECYCLER.recycle(this, handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the element on the given index. This operation will not do any range-checks and so is considered unsafe.
|
||||||
|
*/
|
||||||
|
Object getUnsafe(int index) {
|
||||||
|
return array[index];
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkIndex(int index) {
|
||||||
|
if (index >= size) {
|
||||||
|
throw new IndexOutOfBoundsException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void insert(int index, Object element) {
|
||||||
|
array[index] = element;
|
||||||
|
insertSinceRecycled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expandArray() {
|
||||||
|
// double capacity
|
||||||
|
int newCapacity = array.length << 1;
|
||||||
|
|
||||||
|
if (newCapacity < 0) {
|
||||||
|
throw new OutOfMemoryError();
|
||||||
|
}
|
||||||
|
|
||||||
|
Object[] newArray = new Object[newCapacity];
|
||||||
|
System.arraycopy(array, 0, newArray, 0, array.length);
|
||||||
|
|
||||||
|
array = newArray;
|
||||||
|
}
|
||||||
|
}
|
@ -21,7 +21,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
|
|||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.internal.RecyclableArrayList;
|
|
||||||
import io.netty.util.internal.TypeParameterMatcher;
|
import io.netty.util.internal.TypeParameterMatcher;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -80,7 +79,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAd
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
RecyclableArrayList out = RecyclableArrayList.newInstance();
|
CodecOutputList out = CodecOutputList.newInstance();
|
||||||
try {
|
try {
|
||||||
if (acceptInboundMessage(msg)) {
|
if (acceptInboundMessage(msg)) {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -100,7 +99,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAd
|
|||||||
} finally {
|
} finally {
|
||||||
int size = out.size();
|
int size = out.size();
|
||||||
for (int i = 0; i < size; i ++) {
|
for (int i = 0; i < size; i ++) {
|
||||||
ctx.fireChannelRead(out.get(i));
|
ctx.fireChannelRead(out.getUnsafe(i));
|
||||||
}
|
}
|
||||||
out.recycle();
|
out.recycle();
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,6 @@ import io.netty.channel.ChannelPipeline;
|
|||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.internal.RecyclableArrayList;
|
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
import io.netty.util.internal.TypeParameterMatcher;
|
import io.netty.util.internal.TypeParameterMatcher;
|
||||||
|
|
||||||
@ -79,10 +78,10 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerA
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
RecyclableArrayList out = null;
|
CodecOutputList out = null;
|
||||||
try {
|
try {
|
||||||
if (acceptOutboundMessage(msg)) {
|
if (acceptOutboundMessage(msg)) {
|
||||||
out = RecyclableArrayList.newInstance();
|
out = CodecOutputList.newInstance();
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
I cast = (I) msg;
|
I cast = (I) msg;
|
||||||
try {
|
try {
|
||||||
@ -122,9 +121,9 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerA
|
|||||||
} else {
|
} else {
|
||||||
p = ctx.newPromise();
|
p = ctx.newPromise();
|
||||||
}
|
}
|
||||||
ctx.write(out.get(i), p);
|
ctx.write(out.getUnsafe(i), p);
|
||||||
}
|
}
|
||||||
ctx.write(out.get(sizeMinusOne), promise);
|
ctx.write(out.getUnsafe(sizeMinusOne), promise);
|
||||||
}
|
}
|
||||||
out.recycle();
|
out.recycle();
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,76 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2016 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec;
|
||||||
|
|
||||||
|
import io.netty.microbench.util.AbstractMicrobenchmark;
|
||||||
|
import io.netty.util.internal.RecyclableArrayList;
|
||||||
|
import org.openjdk.jmh.annotations.Benchmark;
|
||||||
|
import org.openjdk.jmh.annotations.Level;
|
||||||
|
import org.openjdk.jmh.annotations.Param;
|
||||||
|
import org.openjdk.jmh.annotations.Scope;
|
||||||
|
import org.openjdk.jmh.annotations.Setup;
|
||||||
|
import org.openjdk.jmh.annotations.State;
|
||||||
|
import org.openjdk.jmh.annotations.TearDown;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@State(Scope.Benchmark)
|
||||||
|
public class CodecOutputListBenchmark extends AbstractMicrobenchmark {
|
||||||
|
|
||||||
|
private static final Object ELEMENT = new Object();
|
||||||
|
private CodecOutputList codecOutputList;
|
||||||
|
private RecyclableArrayList recycleableArrayList;
|
||||||
|
private List<Object> arrayList;
|
||||||
|
|
||||||
|
@Param({ "1", "4" })
|
||||||
|
public int elements;
|
||||||
|
|
||||||
|
@Setup(Level.Invocation)
|
||||||
|
public void setup() {
|
||||||
|
codecOutputList = CodecOutputList.newInstance();
|
||||||
|
recycleableArrayList = RecyclableArrayList.newInstance(16);
|
||||||
|
arrayList = new ArrayList<Object>(16);
|
||||||
|
}
|
||||||
|
|
||||||
|
@TearDown
|
||||||
|
public void destroy() {
|
||||||
|
codecOutputList.recycle();
|
||||||
|
recycleableArrayList.recycle();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public void codecOutList() {
|
||||||
|
benchmarkAddAndClear(codecOutputList, elements);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public void recyclableArrayList() {
|
||||||
|
benchmarkAddAndClear(recycleableArrayList, elements);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Benchmark
|
||||||
|
public void arrayList() {
|
||||||
|
benchmarkAddAndClear(arrayList, elements);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void benchmarkAddAndClear(List<Object> list, int elements) {
|
||||||
|
for (int i = 0; i < elements; i++) {
|
||||||
|
list.add(ELEMENT);
|
||||||
|
}
|
||||||
|
list.clear();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,19 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2016 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Benchmarks for {@link io.netty.handler.codec}.
|
||||||
|
*/
|
||||||
|
package io.netty.handler.codec;
|
Loading…
x
Reference in New Issue
Block a user