diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index 5bb0ab3523..08fbe23f70 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -47,6 +47,8 @@ abstract class PoolArena implements PoolArenaMetric { final int chunkSize; final int subpageOverflowMask; final int numSmallSubpagePools; + final int directMemoryCacheAlignment; + final int directMemoryCacheAlignmentMask; private final PoolSubpage[] tinySubpagePools; private final PoolSubpage[] smallSubpagePools; @@ -80,12 +82,15 @@ abstract class PoolArena implements PoolArenaMetric { // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; - protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { + protected PoolArena(PooledByteBufAllocator parent, int pageSize, + int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) { this.parent = parent; this.pageSize = pageSize; this.maxOrder = maxOrder; this.pageShifts = pageShifts; this.chunkSize = chunkSize; + this.directMemoryCacheAlignment = cacheAlignment; + this.directMemoryCacheAlignmentMask = cacheAlignment - 1; subpageOverflowMask = ~(pageSize - 1); tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); for (int i = 0; i < tinySubpagePools.length; i ++) { @@ -329,8 +334,9 @@ abstract class PoolArena implements PoolArenaMetric { if (reqCapacity < 0) { throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)"); } + if (reqCapacity >= chunkSize) { - return reqCapacity; + return directMemoryCacheAlignment == 0 ? 
reqCapacity : alignCapacity(reqCapacity); } if (!isTiny(reqCapacity)) { // >= 512 @@ -348,10 +354,15 @@ abstract class PoolArena implements PoolArenaMetric { if (normalizedCapacity < 0) { normalizedCapacity >>>= 1; } + assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0; return normalizedCapacity; } + if (directMemoryCacheAlignment > 0) { + return alignCapacity(reqCapacity); + } + // Quantum-spaced if ((reqCapacity & 15) == 0) { return reqCapacity; @@ -360,6 +371,11 @@ abstract class PoolArena implements PoolArenaMetric { return (reqCapacity & ~15) + 16; } + int alignCapacity(int reqCapacity) { + int delta = reqCapacity & directMemoryCacheAlignmentMask; + return delta == 0 ? reqCapacity : reqCapacity + directMemoryCacheAlignment - delta; + } + void reallocate(PooledByteBuf buf, int newCapacity, boolean freeOldMemory) { if (newCapacity < 0 || newCapacity > buf.maxCapacity()) { throw new IllegalArgumentException("newCapacity: " + newCapacity); @@ -650,8 +666,10 @@ abstract class PoolArena implements PoolArenaMetric { static final class HeapArena extends PoolArena { - HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { - super(parent, pageSize, maxOrder, pageShifts, chunkSize); + HeapArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, + int pageShifts, int chunkSize, int directMemoryCacheAlignment) { + super(parent, pageSize, maxOrder, pageShifts, chunkSize, + directMemoryCacheAlignment); } @Override @@ -661,12 +679,12 @@ abstract class PoolArena implements PoolArenaMetric { @Override protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { - return new PoolChunk(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize); + return new PoolChunk(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize, 0); } @Override protected PoolChunk newUnpooledChunk(int capacity) { - return new PoolChunk(this, new 
byte[capacity], capacity); + return new PoolChunk(this, new byte[capacity], capacity, 0); } @Override @@ -692,8 +710,10 @@ abstract class PoolArena implements PoolArenaMetric { static final class DirectArena extends PoolArena { - DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { - super(parent, pageSize, maxOrder, pageShifts, chunkSize); + DirectArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, + int pageShifts, int chunkSize, int directMemoryCacheAlignment) { + super(parent, pageSize, maxOrder, pageShifts, chunkSize, + directMemoryCacheAlignment); } @Override @@ -701,16 +721,37 @@ abstract class PoolArena implements PoolArenaMetric { return true; } + /** Bytes to skip from the start of {@code memory} to reach the next cache-alignment boundary. */ + private int offsetCacheLine(ByteBuffer memory) { + int remainder = (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask); + return directMemoryCacheAlignment - remainder; + } + @Override - protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { - return new PoolChunk( - this, allocateDirect(chunkSize), - pageSize, maxOrder, pageShifts, chunkSize); + protected PoolChunk newChunk(int pageSize, int maxOrder, + int pageShifts, int chunkSize) { + if (directMemoryCacheAlignment == 0) { + return new PoolChunk(this, + allocateDirect(chunkSize), pageSize, maxOrder, + pageShifts, chunkSize, 0); + } + final ByteBuffer memory = allocateDirect(chunkSize + + directMemoryCacheAlignment); + return new PoolChunk(this, memory, pageSize, + maxOrder, pageShifts, chunkSize, + offsetCacheLine(memory)); } @Override protected PoolChunk newUnpooledChunk(int capacity) { - return new PoolChunk(this, allocateDirect(capacity), capacity); + if (directMemoryCacheAlignment == 0) { + return new PoolChunk(this, + allocateDirect(capacity), capacity, 0); + } + final ByteBuffer memory = allocateDirect(capacity + + directMemoryCacheAlignment); + return new PoolChunk(this, memory, capacity, + offsetCacheLine(memory)); } private static ByteBuffer allocateDirect(int capacity) { 
diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunk.java b/buffer/src/main/java/io/netty/buffer/PoolChunk.java index dbc690236c..f107fdf8ac 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolChunk.java +++ b/buffer/src/main/java/io/netty/buffer/PoolChunk.java @@ -107,6 +107,7 @@ final class PoolChunk implements PoolChunkMetric { final PoolArena arena; final T memory; final boolean unpooled; + final int offset; private final byte[] memoryMap; private final byte[] depthMap; @@ -131,7 +132,7 @@ final class PoolChunk implements PoolChunkMetric { // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; - PoolChunk(PoolArena arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) { + PoolChunk(PoolArena arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) { unpooled = false; this.arena = arena; this.memory = memory; @@ -139,6 +140,7 @@ final class PoolChunk implements PoolChunkMetric { this.pageShifts = pageShifts; this.maxOrder = maxOrder; this.chunkSize = chunkSize; + this.offset = offset; unusable = (byte) (maxOrder + 1); log2ChunkSize = log2(chunkSize); subpageOverflowMask = ~(pageSize - 1); @@ -165,10 +167,11 @@ final class PoolChunk implements PoolChunkMetric { } /** Creates a special chunk that is not pooled. 
*/ - PoolChunk(PoolArena arena, T memory, int size) { + PoolChunk(PoolArena arena, T memory, int size, int offset) { unpooled = true; this.arena = arena; this.memory = memory; + this.offset = offset; memoryMap = null; depthMap = null; subpages = null; @@ -371,7 +374,7 @@ final class PoolChunk implements PoolChunkMetric { if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); - buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), + buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); @@ -393,8 +396,8 @@ final class PoolChunk implements PoolChunkMetric { buf.init( this, handle, - runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, - arena.parent.threadCache()); + runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset, + reqCapacity, subpage.elemSize, arena.parent.threadCache()); } private byte value(int id) { diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java index 10354e6e6a..aac5f531cf 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java @@ -63,7 +63,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { this.chunk = chunk; handle = 0; memory = chunk.memory; - offset = 0; + offset = chunk.offset; this.length = maxLength = length; tmpNioBuf = null; cache = null; diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 73c8146493..5442ae5ffc 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -43,6 +43,7 @@ public class 
PooledByteBufAllocator extends AbstractByteBufAllocator { private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; private static final int DEFAULT_CACHE_TRIM_INTERVAL; private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS; + private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT; private static final int MIN_PAGE_SIZE = 4096; private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); @@ -108,6 +109,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean( "io.netty.allocator.useCacheForAllThreads", true); + DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt( + "io.netty.allocator.directMemoryCacheAlignment", 0); + if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); @@ -175,12 +179,21 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize, - normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS); + normalCacheSize, DEFAULT_USE_CACHE_FOR_ALL_THREADS, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); + } + + public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, + int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, + int smallCacheSize, int normalCacheSize, + boolean useCacheForAllThreads) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, + tinyCacheSize, smallCacheSize, normalCacheSize, + useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT); } public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, 
int tinyCacheSize, int smallCacheSize, int normalCacheSize, - boolean useCacheForAllThreads) { + boolean useCacheForAllThreads, int directMemoryCacheAlignment) { super(preferDirect); threadCache = new PoolThreadLocalCache(useCacheForAllThreads); this.tinyCacheSize = tinyCacheSize; @@ -195,13 +208,25 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { throw new IllegalArgumentException("nDirectArea: " + nDirectArena + " (expected: >= 0)"); } + if (directMemoryCacheAlignment < 0) { + throw new IllegalArgumentException("directMemoryCacheAlignment: " + + directMemoryCacheAlignment + " (expected: >= 0)"); + } + + if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) { + throw new IllegalArgumentException("directMemoryCacheAlignment: " + + directMemoryCacheAlignment + " (expected: power of two)"); + } + int pageShifts = validateAndCalculatePageShifts(pageSize); if (nHeapArena > 0) { heapArenas = newArenaArray(nHeapArena); List metrics = new ArrayList(heapArenas.length); for (int i = 0; i < heapArenas.length; i ++) { - PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize); + PoolArena.HeapArena arena = new PoolArena.HeapArena(this, + pageSize, maxOrder, pageShifts, chunkSize, + directMemoryCacheAlignment); heapArenas[i] = arena; metrics.add(arena); } @@ -216,7 +241,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { List metrics = new ArrayList(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( - this, pageSize, maxOrder, pageShifts, chunkSize); + this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment); directArenas[i] = arena; metrics.add(arena); } diff --git a/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java b/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java index f27c41ca3e..3fafef9c04 100644 --- 
a/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java +++ b/buffer/src/test/java/io/netty/buffer/PoolArenaTest.java @@ -25,7 +25,7 @@ public class PoolArenaTest { @Test public void testNormalizeCapacity() throws Exception { - PoolArena arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999); + PoolArena arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 0); int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025}; int[] expectedResult = {0, 16, 512, 1024, 1024, 2048}; for (int i = 0; i < reqCapacities.length; i ++) { @@ -33,6 +33,16 @@ public class PoolArenaTest { } } + @Test + public void testNormalizeAlignedCapacity() throws Exception { + PoolArena arena = new PoolArena.DirectArena(null, 0, 0, 9, 999999, 64); + int[] reqCapacities = {0, 15, 510, 1024, 1023, 1025}; + int[] expectedResult = {0, 64, 512, 1024, 1024, 2048}; + for (int i = 0; i < reqCapacities.length; i ++) { + Assert.assertEquals(expectedResult[i], arena.normalizeCapacity(reqCapacities[i])); + } + } + @Test public final void testAllocationCounter() { final PooledByteBufAllocator allocator = new PooledByteBufAllocator( diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java index a51e22d244..e699905e6d 100644 --- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java +++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java @@ -52,6 +52,16 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest { testArenaMetrics0(new PooledByteBufAllocator(true, 2, 2, 8192, 11, 1000, 1000, 1000), 100, 1, 1, 0); } + @Test + public void testArenaMetricsNoCacheAlign() { + testArenaMetrics0(new PooledByteBufAllocator(true, 2, 2, 8192, 11, 0, 0, 0, true, 64), 100, 0, 100, 100); + } + + @Test + public void testArenaMetricsCacheAlign() { + testArenaMetrics0(new PooledByteBufAllocator(true, 2, 2, 8192, 11, 1000, 1000, 1000, true, 64), 100, 1, 1, 0); + } + private 
static void testArenaMetrics0( PooledByteBufAllocator allocator, int num, int expectedActive, int expectedAlloc, int expectedDealloc) { for (int i = 0; i < num; i++) { diff --git a/microbench/src/main/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java b/microbench/src/main/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java index a2e7b6ea2e..5f1b787054 100644 --- a/microbench/src/main/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java @@ -35,7 +35,7 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark { private static final ByteBufAllocator unpooledAllocator = new UnpooledByteBufAllocator(true); private static final ByteBufAllocator pooledAllocator = - new PooledByteBufAllocator(true, 4, 4, 8192, 11, 0, 0, 0); // Disable thread-local cache + new PooledByteBufAllocator(true, 4, 4, 8192, 11, 0, 0, 0, true, 0); // Disable thread-local cache private static final int MAX_LIVE_BUFFERS = 8192; private static final Random rand = new Random(); diff --git a/microbench/src/main/java/io/netty/microbench/buffer/PooledByteBufAllocatorAlignBenchmark.java b/microbench/src/main/java/io/netty/microbench/buffer/PooledByteBufAllocatorAlignBenchmark.java new file mode 100644 index 0000000000..32ffd13254 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/buffer/PooledByteBufAllocatorAlignBenchmark.java @@ -0,0 +1,152 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.buffer; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.microbench.util.AbstractMicrobenchmark; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Thread) +@Warmup(iterations = 5, time = 100, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 5, time = 100, timeUnit = TimeUnit.MILLISECONDS) +@Fork(5) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class PooledByteBufAllocatorAlignBenchmark extends + AbstractMicrobenchmark { + + private static final Random rand = new Random(); + + /** + * Maximum cache line size (a power of 2). + */ + private static final int CACHE_LINE_MAX = 256; + + /** + * Fixed stride (a multiple of CACHE_LINE_MAX) added to the offset on every step, to avoid streaming reads. + */ + private static final int OFFSET_ADD = CACHE_LINE_MAX * 1337; + + /** + * Block of bytes to write/read. 
(Corresponds to int type) + */ + private static final int BLOCK = 4; + + @Param({ "0", "64" }) + private int cacheAlign; + + @Param({ "01024", "04096", "16384", "65536", "1048576" }) + private int size; + + private ByteBuf pooledDirectBuffer; + + private byte[] bytes; + + private int sizeMask; + + private int alignOffset; + + @Setup + public void doSetup() { + PooledByteBufAllocator pooledAllocator = new PooledByteBufAllocator(true, 4, 4, 8192, 11, 0, + 0, 0, true, cacheAlign); + pooledDirectBuffer = pooledAllocator.directBuffer(size + 64); + sizeMask = size - 1; + if (cacheAlign == 0) { + long addr = pooledDirectBuffer.memoryAddress(); + // make sure the address is misaligned: shift by 63 if it starts on a 64-byte boundary + if (addr % 64 == 0) { + alignOffset = 63; + } + int off = 0; + for (int c = 0; c < size; c++) { + off = (off + OFFSET_ADD) & sizeMask; + if ((addr + off + alignOffset) % BLOCK == 0) { + throw new IllegalStateException( + "Misaligned address is not really aligned"); + } + } + } else { + alignOffset = 0; + int off = 0; + long addr = pooledDirectBuffer.memoryAddress(); + for (int c = 0; c < size; c++) { + off = (off + OFFSET_ADD) & sizeMask; + if ((addr + off) % BLOCK != 0) { + throw new IllegalStateException( + "Aligned address is not really aligned"); + } + } + } + bytes = new byte[BLOCK]; + rand.nextBytes(bytes); + } + + @TearDown + public void doTearDown() { + pooledDirectBuffer.release(); + } + + @Benchmark + public void writeRead() { + int off = 0; + int lSize = size; + int lSizeMask = sizeMask; + int lAlignOffset = alignOffset; + for (int i = 0; i < lSize; i++) { + off = (off + OFFSET_ADD) & lSizeMask; + pooledDirectBuffer.setBytes(off + lAlignOffset, bytes); + pooledDirectBuffer.getBytes(off + lAlignOffset, bytes); + } + } + + @Benchmark + public void write() { + int off = 0; + int lSize = size; + int lSizeMask = sizeMask; + int lAlignOffset = alignOffset; + for (int i = 0; i < lSize; i++) { + off = (off + OFFSET_ADD) & lSizeMask; + pooledDirectBuffer.setBytes(off + lAlignOffset, bytes); 
+ } + } + + @Benchmark + public void read() { + int off = 0; + int lSize = size; + int lSizeMask = sizeMask; + int lAlignOffset = alignOffset; + for (int i = 0; i < lSize; i++) { + off = (off + OFFSET_ADD) & lSizeMask; + pooledDirectBuffer.getBytes(off + lAlignOffset, bytes); + } + } +}