Allow to allign allocated Buffers

Motivation:

64-byte alignment is recommended by the Intel performance guide (https://software.intel.com/en-us/articles/practical-intel-avx-optimization-on-2nd-generation-intel-core-processors) for data-structures over 64 bytes.
Requiring padding to a multiple of 64 bytes allows for using SIMD instructions consistently in loops without additional conditional checks. This should allow for simpler and more efficient code.

Modification:

At the moment cache alignment must be setup manually. But probably it might be taken from the system. The original code was introduced by @normanmaurer https://github.com/netty/netty/pull/4726/files

Result:

Buffer alignment works better than miss-align cache.
This commit is contained in:
Kiril Menshikov 2017-01-29 23:26:40 +02:00 committed by Norman Maurer
parent 48f6541cb3
commit 66b9be3a46
8 changed files with 264 additions and 25 deletions

View File

@ -47,6 +47,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
@ -80,12 +82,15 @@ abstract class PoolArena<T> implements PoolArenaMetric {
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
this.directMemoryCacheAlignment = cacheAlignment;
this.directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
@ -329,8 +334,9 @@ abstract class PoolArena<T> implements PoolArenaMetric {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
if (reqCapacity >= chunkSize) {
return reqCapacity;
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
if (!isTiny(reqCapacity)) { // >= 512
@ -348,10 +354,15 @@ abstract class PoolArena<T> implements PoolArenaMetric {
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// Quantum-spaced
if ((reqCapacity & 15) == 0) {
return reqCapacity;
@ -360,6 +371,11 @@ abstract class PoolArena<T> implements PoolArenaMetric {
return (reqCapacity & ~15) + 16;
}
int alignCapacity(int reqCapacity) {
int delta = reqCapacity & directMemoryCacheAlignmentMask;
return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta;
}
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
@ -650,8 +666,10 @@ abstract class PoolArena<T> implements PoolArenaMetric {
static final class HeapArena extends PoolArena<byte[]> {
HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
@Override
@ -661,12 +679,12 @@ abstract class PoolArena<T> implements PoolArenaMetric {
@Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize, 0);
}
@Override
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
return new PoolChunk<byte[]>(this, new byte[capacity], capacity);
return new PoolChunk<byte[]>(this, new byte[capacity], capacity, 0);
}
@Override
@ -692,8 +710,10 @@ abstract class PoolArena<T> implements PoolArenaMetric {
static final class DirectArena extends PoolArena<ByteBuffer> {
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize);
DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder,
int pageShifts, int chunkSize, int directMemoryCacheAlignment) {
super(parent, pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
}
@Override
@ -701,16 +721,35 @@ abstract class PoolArena<T> implements PoolArenaMetric {
return true;
}
private int offsetCacheLine(ByteBuffer memory) {
return (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask);
}
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>(
this, allocateDirect(chunkSize),
pageSize, maxOrder, pageShifts, chunkSize);
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<ByteBuffer>(this,
allocateDirect(chunkSize), pageSize, maxOrder,
pageShifts, chunkSize, 0);
}
final ByteBuffer memory = allocateDirect(chunkSize
+ directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, memory, pageSize,
maxOrder, pageShifts, chunkSize,
offsetCacheLine(memory));
}
@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
return new PoolChunk<ByteBuffer>(this, allocateDirect(capacity), capacity);
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<ByteBuffer>(this,
allocateDirect(capacity), capacity, 0);
}
final ByteBuffer memory = allocateDirect(capacity
+ directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, memory, capacity,
offsetCacheLine(memory));
}
private static ByteBuffer allocateDirect(int capacity) {

View File

@ -107,6 +107,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
final PoolArena<T> arena;
final T memory;
final boolean unpooled;
final int offset;
private final byte[] memoryMap;
private final byte[] depthMap;
@ -131,7 +132,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
unpooled = false;
this.arena = arena;
this.memory = memory;
@ -139,6 +140,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
this.pageShifts = pageShifts;
this.maxOrder = maxOrder;
this.chunkSize = chunkSize;
this.offset = offset;
unusable = (byte) (maxOrder + 1);
log2ChunkSize = log2(chunkSize);
subpageOverflowMask = ~(pageSize - 1);
@ -165,10 +167,11 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
/** Creates a special chunk that is not pooled. */
PoolChunk(PoolArena<T> arena, T memory, int size) {
PoolChunk(PoolArena<T> arena, T memory, int size, int offset) {
unpooled = true;
this.arena = arena;
this.memory = memory;
this.offset = offset;
memoryMap = null;
depthMap = null;
subpages = null;
@ -371,7 +374,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
@ -393,8 +396,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
buf.init(
this, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
arena.parent.threadCache());
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
private byte value(int id) {

View File

@ -63,7 +63,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
this.chunk = chunk;
handle = 0;
memory = chunk.memory;
offset = 0;
offset = chunk.offset;
this.length = maxLength = length;
tmpNioBuf = null;
cache = null;

View File

@ -43,6 +43,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
@ -108,6 +109,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", true);
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
"io.netty.allocator.directMemoryCacheAlignment", 0);
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
@ -175,12 +179,21 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS);
normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
tinyCacheSize, smallCacheSize, normalCacheSize,
useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
@ -195,13 +208,25 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)");
}
if (directMemoryCacheAlignment < 0) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: >= 0)");
}
if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}
int pageShifts = validateAndCalculatePageShifts(pageSize);
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
@ -216,7 +241,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize);
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}

View File

@ -25,7 +25,7 @@ public class PoolArenaTest {
@Test
public void testNormalizeCapacity() throws Exception {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999);
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 0);
int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025};
int[] expectedResult = {0, 16, 512, 1024, 1024, 2048};
for (int i = 0; i < reqCapacities.length; i ++) {
@ -33,6 +33,16 @@ public class PoolArenaTest {
}
}
@Test
public void testNormalizeAlignedCapacity() throws Exception {
PoolArena<ByteBuffer> arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 64);
int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025};
int[] expectedResult = {0, 64, 512, 1024, 1024, 2048};
for (int i = 0; i < reqCapacities.length; i ++) {
Assert.assertEquals(expectedResult[i], arena.normalizeCapacity(reqCapacities[i]));
}
}
@Test
public final void testAllocationCounter() {
final PooledByteBufAllocator allocator = new PooledByteBufAllocator(

View File

@ -52,6 +52,16 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest {
testArenaMetrics0(new PooledByteBufAllocator(true, 2, 2, 8192, 11, 1000, 1000, 1000), 100, 1, 1, 0);
}
@Test
public void testArenaMetricsNoCacheAlign() {
testArenaMetrics0(new PooledByteBufAllocator(true, 2, 2, 8192, 11, 0, 0, 0, true, 64), 100, 0, 100, 100);
}
@Test
public void testArenaMetricsCacheAlign() {
testArenaMetrics0(new PooledByteBufAllocator(true, 2, 2, 8192, 11, 1000, 1000, 1000, true, 64), 100, 1, 1, 0);
}
private static void testArenaMetrics0(
PooledByteBufAllocator allocator, int num, int expectedActive, int expectedAlloc, int expectedDealloc) {
for (int i = 0; i < num; i++) {

View File

@ -35,7 +35,7 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark {
private static final ByteBufAllocator unpooledAllocator = new UnpooledByteBufAllocator(true);
private static final ByteBufAllocator pooledAllocator =
new PooledByteBufAllocator(true, 4, 4, 8192, 11, 0, 0, 0); // Disable thread-local cache
new PooledByteBufAllocator(true, 4, 4, 8192, 11, 0, 0, 0, true, 0); // Disable thread-local cache
private static final int MAX_LIVE_BUFFERS = 8192;
private static final Random rand = new Random();

View File

@ -0,0 +1,152 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.buffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.microbench.util.AbstractMicrobenchmark;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Thread)
@Warmup(iterations = 5, time = 100, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 5, time = 100, timeUnit = TimeUnit.MILLISECONDS)
@Fork(5)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class PooledByteBufAllocatorAlignBenchmark extends
AbstractMicrobenchmark {
private static final Random rand = new Random();
/**
* Cache line power of 2.
*/
private static final int CACHE_LINE_MAX = 256;
/**
* PRNG to walk the chunk randomly to avoid streaming reads.
*/
private static final int OFFSET_ADD = CACHE_LINE_MAX * 1337;
/**
* Block of bytes to write/read. (Corresponds to int type)
*/
private static final int BLOCK = 4;
@Param({ "0", "64" })
private int cacheAlign;
@Param({ "01024", "04096", "16384", "65536", "1048576" })
private int size;
private ByteBuf pooledDirectBuffer;
private byte[] bytes;
private int sizeMask;
private int alignOffset;
@Setup
public void doSetup() {
PooledByteBufAllocator pooledAllocator = new PooledByteBufAllocator(true, 4, 4, 8192, 11, 0,
0, 0, true, cacheAlign);
pooledDirectBuffer = pooledAllocator.directBuffer(size + 64);
sizeMask = size - 1;
if (cacheAlign == 0) {
long addr = pooledDirectBuffer.memoryAddress();
// make sure address is miss-aligned
if (addr % 64 == 0) {
alignOffset = 63;
}
int off = 0;
for (int c = 0; c < size; c++) {
off = (off + OFFSET_ADD) & sizeMask;
if ((addr + off + alignOffset) % BLOCK == 0) {
throw new IllegalStateException(
"Misaligned address is not really aligned");
}
}
} else {
alignOffset = 0;
int off = 0;
long addr = pooledDirectBuffer.memoryAddress();
for (int c = 0; c < size; c++) {
off = (off + OFFSET_ADD) & sizeMask;
if ((addr + off) % BLOCK != 0) {
throw new IllegalStateException(
"Aligned address is not really aligned");
}
}
}
bytes = new byte[BLOCK];
rand.nextBytes(bytes);
}
@TearDown
public void doTearDown() {
pooledDirectBuffer.release();
}
@Benchmark
public void writeRead() {
int off = 0;
int lSize = size;
int lSizeMask = sizeMask;
int lAlignOffset = alignOffset;
for (int i = 0; i < lSize; i++) {
off = (off + OFFSET_ADD) & lSizeMask;
pooledDirectBuffer.setBytes(off + lAlignOffset, bytes);
pooledDirectBuffer.getBytes(off + lAlignOffset, bytes);
}
}
@Benchmark
public void write() {
int off = 0;
int lSize = size;
int lSizeMask = sizeMask;
int lAlignOffset = alignOffset;
for (int i = 0; i < lSize; i++) {
off = (off + OFFSET_ADD) & lSizeMask;
pooledDirectBuffer.setBytes(off + lAlignOffset, bytes);
}
}
@Benchmark
public void read() {
int off = 0;
int lSize = size;
int lSizeMask = sizeMask;
int lAlignOffset = alignOffset;
for (int i = 0; i < lSize; i++) {
off = (off + OFFSET_ADD) & lSizeMask;
pooledDirectBuffer.getBytes(off + lAlignOffset, bytes);
}
}
}