Streamline CompositeByteBuf internals (#8437)

Motivation:

CompositeByteBuf is a powerful and versatile abstraction, allowing for
manipulation of large data without copying bytes. There is still a
non-negligible cost to reading/writing however relative to "singular"
ByteBufs, and this can be mostly eliminated with some rework of the
internals.

My use case is message modification/transformation while zero-copy
proxying. For example replacing a string within a large message with one
of a different length

Modifications:

- No longer slice added buffers and unwrap added slices
   - Components store target buf offset relative to position in
composite buf
   - Less allocations, object footprint, pointer indirection, offset
arithmetic
- Use Component[] rather than ArrayList<Component>
   - Avoid pointer indirection and duplicate bounds check, more
efficient backing array growth
   - Facilitates optimization when doing bulk-inserts - inserting n
ByteBufs behind m is now O(m + n) instead of O(mn)
- Avoid unnecessary casting and method call indirection via superclass
- Eliminate some duplicate range/ref checks via non-checking versions of
toComponentIndex and findComponent
- Add simple fast-path for toComponentIndex(0); add racy cache of
last-accessed Component to findComponent(int)
- Override forEachByte0(...) and forEachByteDesc0(...) methods
- Make use of RecyclableArrayList in nioBuffers(int, int) (in line with
FasterCompositeByteBuf impl)
- Modify addComponents0(boolean,int,Iterable) to use the Iterable
directly rather than copy to an array first (and possibly to an
ArrayList before that)
- Optimize addComponents0(boolean,int,ByteBuf[],int) to not perform
repeated array insertions and avoid second loop for offset updates
- Simplify other logic in various places, in particular the general
pattern used where a sub-range is iterated over
- Add benchmarks to demonstrate some improvements

While refactoring I also came across a couple of clear bugs. They are
fixed in these changes but I will open another PR with unit tests and
fixes to the current version.

Result:

Much faster creation, manipulation, and access; many fewer allocations
and smaller footprint. Benchmark results to follow.
This commit is contained in:
Nick Hill 2018-11-03 02:37:07 -07:00 committed by Norman Maurer
parent 6563f23a9b
commit 10539f4dc7
6 changed files with 972 additions and 419 deletions

View File

@ -46,7 +46,7 @@ public abstract class AbstractByteBuf extends ByteBuf {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractByteBuf.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractByteBuf.class);
private static final String LEGACY_PROP_CHECK_ACCESSIBLE = "io.netty.buffer.bytebuf.checkAccessible"; private static final String LEGACY_PROP_CHECK_ACCESSIBLE = "io.netty.buffer.bytebuf.checkAccessible";
private static final String PROP_CHECK_ACCESSIBLE = "io.netty.buffer.checkAccessible"; private static final String PROP_CHECK_ACCESSIBLE = "io.netty.buffer.checkAccessible";
private static final boolean checkAccessible; static final boolean checkAccessible; // accessed from CompositeByteBuf
private static final String PROP_CHECK_BOUNDS = "io.netty.buffer.checkBounds"; private static final String PROP_CHECK_BOUNDS = "io.netty.buffer.checkBounds";
private static final boolean checkBounds; private static final boolean checkBounds;
@ -332,12 +332,12 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override @Override
public ByteBuf order(ByteOrder endianness) { public ByteBuf order(ByteOrder endianness) {
if (endianness == null) {
throw new NullPointerException("endianness");
}
if (endianness == order()) { if (endianness == order()) {
return this; return this;
} }
if (endianness == null) {
throw new NullPointerException("endianness");
}
return newSwappedByteBuf(); return newSwappedByteBuf();
} }
@ -1293,7 +1293,7 @@ public abstract class AbstractByteBuf extends ByteBuf {
} }
} }
private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception { int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
for (; start < end; ++start) { for (; start < end; ++start) {
if (!processor.process(_getByte(start))) { if (!processor.process(_getByte(start))) {
return start; return start;
@ -1325,7 +1325,7 @@ public abstract class AbstractByteBuf extends ByteBuf {
} }
} }
private int forEachByteDesc0(int rStart, final int rEnd, ByteProcessor processor) throws Exception { int forEachByteDesc0(int rStart, final int rEnd, ByteProcessor processor) throws Exception {
for (; rStart >= rEnd; --rStart) { for (; rStart >= rEnd; --rStart) {
if (!processor.process(_getByte(rStart))) { if (!processor.process(_getByte(rStart))) {
return rStart; return rStart;

File diff suppressed because it is too large Load Diff

View File

@ -319,7 +319,7 @@ public final class Unpooled {
for (int i = 0; i < buffers.length; i++) { for (int i = 0; i < buffers.length; i++) {
ByteBuf buf = buffers[i]; ByteBuf buf = buffers[i];
if (buf.isReadable()) { if (buf.isReadable()) {
return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i, buffers.length); return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i);
} }
buf.release(); buf.release();
} }

View File

@ -0,0 +1,118 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import io.netty.microbench.util.AbstractMicrobenchmark;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
public class CompositeByteBufRandomAccessBenchmark extends AbstractMicrobenchmark {
public enum ByteBufType {
SMALL_CHUNKS {
@Override
ByteBuf newBuffer(int length) {
return newBufferSmallChunks(length);
}
},
LARGE_CHUNKS {
@Override
ByteBuf newBuffer(int length) {
return newBufferLargeChunks(length);
}
};
abstract ByteBuf newBuffer(int length);
}
@Param({ "64", "10240", "1024000" }) // ({ "64", "1024", "10240", "102400", "1024000" })
public int size;
@Param
public ByteBufType bufferType;
private ByteBuf buffer;
private Random random;
@Setup
public void setup() {
buffer = bufferType.newBuffer(size);
random = new Random(0L);
}
@TearDown
public void teardown() {
buffer.release();
}
@Benchmark
public long setGetLong() {
int i = random.nextInt(size - 8);
return buffer.setLong(i, 1).getLong(i);
}
@Benchmark
public ByteBuf setLong() {
int i = random.nextInt(size - 8);
return buffer.setLong(i, 1);
}
private static ByteBuf newBufferSmallChunks(int length) {
List<ByteBuf> buffers = new ArrayList<ByteBuf>(((length + 1) / 45) * 19);
for (int i = 0; i < length + 45; i += 45) {
for (int j = 1; j <= 9; j++) {
buffers.add(EMPTY_BUFFER);
buffers.add(wrappedBuffer(new byte[j]));
}
buffers.add(EMPTY_BUFFER);
}
ByteBuf buffer = wrappedBuffer(Integer.MAX_VALUE, buffers.toArray(new ByteBuf[0]));
// Truncate to the requested capacity.
return buffer.capacity(length).writerIndex(0);
}
private static ByteBuf newBufferLargeChunks(int length) {
List<ByteBuf> buffers = new ArrayList<ByteBuf>((length + 1) / 512);
for (int i = 0; i < length + 1536; i += 1536) {
buffers.add(wrappedBuffer(new byte[512]));
buffers.add(EMPTY_BUFFER);
buffers.add(wrappedBuffer(new byte[1024]));
}
ByteBuf buffer = wrappedBuffer(Integer.MAX_VALUE, buffers.toArray(new ByteBuf[0]));
// Truncate to the requested capacity.
return buffer.capacity(length).writerIndex(0);
}
}

View File

@ -0,0 +1,132 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.ByteProcessor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
public class CompositeByteBufSequentialBenchmark extends AbstractMicrobenchmark {
public enum ByteBufType {
SMALL_CHUNKS {
@Override
ByteBuf newBuffer(int length) {
return newBufferSmallChunks(length);
}
},
LARGE_CHUNKS {
@Override
ByteBuf newBuffer(int length) {
return newBufferLargeChunks(length);
}
};
abstract ByteBuf newBuffer(int length);
}
@Param({ "8", "64", "1024", "10240", "102400", "1024000" })
public int size;
@Param
public ByteBufType bufferType;
private ByteBuf buffer;
@Setup
public void setup() {
buffer = bufferType.newBuffer(size);
}
@TearDown
public void teardown() {
buffer.release();
}
private static final ByteProcessor TEST_PROCESSOR = new ByteProcessor() {
@Override
public boolean process(byte value) throws Exception {
return value == 'b'; // false
}
};
@Benchmark
public int forEachByte() {
buffer.setIndex(0, buffer.capacity());
buffer.forEachByte(TEST_PROCESSOR);
return buffer.forEachByteDesc(TEST_PROCESSOR);
}
@Benchmark
public int sequentialWriteAndRead() {
buffer.clear();
for (int i = 0, l = buffer.writableBytes(); i < l; i++) {
buffer.writeByte('a');
}
for (int i = 0, l = buffer.readableBytes(); i < l; i++) {
if (buffer.readByte() == 'b') {
return -1;
}
}
return 1;
}
private static ByteBuf newBufferSmallChunks(int length) {
List<ByteBuf> buffers = new ArrayList<ByteBuf>(((length + 1) / 45) * 19);
for (int i = 0; i < length + 45; i += 45) {
for (int j = 1; j <= 9; j++) {
buffers.add(EMPTY_BUFFER);
buffers.add(wrappedBuffer(new byte[j]));
}
buffers.add(EMPTY_BUFFER);
}
ByteBuf buffer = wrappedBuffer(Integer.MAX_VALUE, buffers.toArray(new ByteBuf[0]));
// Truncate to the requested capacity.
return buffer.capacity(length).writerIndex(0);
}
private static ByteBuf newBufferLargeChunks(int length) {
List<ByteBuf> buffers = new ArrayList<ByteBuf>((length + 1) / 512);
for (int i = 0; i < length + 1536; i += 1536) {
buffers.add(wrappedBuffer(new byte[512]));
buffers.add(EMPTY_BUFFER);
buffers.add(wrappedBuffer(new byte[1024]));
}
ByteBuf buffer = wrappedBuffer(Integer.MAX_VALUE, buffers.toArray(new ByteBuf[0]));
// Truncate to the requested capacity.
return buffer.capacity(length).writerIndex(0);
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import io.netty.microbench.util.AbstractMicrobenchmark;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 12, time = 1, timeUnit = TimeUnit.SECONDS)
public class CompositeByteBufWriteOutBenchmark extends AbstractMicrobenchmark {
public enum ByteBufType {
SMALL_CHUNKS {
@Override
ByteBuf[] sourceBuffers(int length) {
return makeSmallChunks(length);
}
},
LARGE_CHUNKS {
@Override
ByteBuf[] sourceBuffers(int length) {
return makeLargeChunks(length);
}
};
abstract ByteBuf[] sourceBuffers(int length);
}
@Override
protected String[] jvmArgs() {
// Ensure we minimize the GC overhead by sizing the heap big enough.
return new String[] { "-XX:MaxDirectMemorySize=2g", "-Xmx4g", "-Xms4g", "-Xmn3g" };
}
@Param({ "64", "1024", "10240", "102400", "1024000" })
public int size;
@Param
public ByteBufType bufferType;
private ByteBuf targetBuffer;
private ByteBuf[] sourceBufs;
@Setup
public void setup() {
targetBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(size + 2048);
sourceBufs = bufferType.sourceBuffers(size);
}
@TearDown
public void teardown() {
targetBuffer.release();
}
@Benchmark
public int writeCBB() {
ByteBuf cbb = Unpooled.wrappedBuffer(Integer.MAX_VALUE, sourceBufs); // CompositeByteBuf
return targetBuffer.clear().writeBytes(cbb).readableBytes();
}
@Benchmark
public int writeFCBB() {
ByteBuf cbb = Unpooled.wrappedUnmodifiableBuffer(sourceBufs); // FastCompositeByteBuf
return targetBuffer.clear().writeBytes(cbb).readableBytes();
}
private static ByteBuf[] makeSmallChunks(int length) {
List<ByteBuf> buffers = new ArrayList<ByteBuf>(((length + 1) / 48) * 9);
for (int i = 0; i < length + 48; i += 48) {
for (int j = 4; j <= 12; j++) {
buffers.add(wrappedBuffer(new byte[j]));
}
}
return buffers.toArray(new ByteBuf[0]);
}
private static ByteBuf[] makeLargeChunks(int length) {
List<ByteBuf> buffers = new ArrayList<ByteBuf>((length + 1) / 768);
for (int i = 0; i < length + 1536; i += 1536) {
buffers.add(wrappedBuffer(new byte[512]));
buffers.add(wrappedBuffer(new byte[1024]));
}
return buffers.toArray(new ByteBuf[0]);
}
}