Create bespoke long/long hashmap and long-valued priority queue for PoolChunk (#10826)

Motivation:
The uncached access to PoolChunk can be made faster, and avoid allocating boxed Longs, if we have a primitive hash map and priority queue implementation for it.

Modification:
Add bespoke primitive implementations of a hash map and a priority queue for PoolChunk.
Remove all the long-boxing caused by the previous implementation.
The hashmap is a linear probing map with a fairly short probe that keeps the search within a couple of cache lines.
The priority queue is the same binary heap algorithm that's described in Algorithms by Sedgewick and Wayne.
The implementation avoids the Long boxing by relying on a long[] array.
This makes the internal-remove method faster, which is an important operation in PoolChunk.

Result:
Roughly 13% performance uplift in buffer allocations that miss cache.
This commit is contained in:
Chris Vest 2020-11-29 11:29:46 +01:00 committed by Norman Maurer
parent 23c0bbb904
commit 86730f53ca
6 changed files with 483 additions and 30 deletions

View File

@ -0,0 +1,129 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
/**
* Internal primitive map implementation that is specifically optimised for the runs availability map use case in {@link
* PoolChunk}.
*/
final class LongLongHashMap {
private static final int MASK_TEMPLATE = ~1;
private int mask;
private long[] array;
private int maxProbe;
private long zeroVal;
private final long emptyVal;
LongLongHashMap(long emptyVal) {
this.emptyVal = emptyVal;
zeroVal = emptyVal;
int initialSize = 32;
array = new long[initialSize];
mask = initialSize - 1;
computeMaskAndProbe();
}
public long put(long key, long value) {
if (key == 0) {
long prev = zeroVal;
zeroVal = value;
return prev;
}
for (;;) {
int index = index(key);
for (int i = 0; i < maxProbe; i++) {
long existing = array[index];
if (existing == key || existing == 0) {
long prev = existing == 0? emptyVal : array[index + 1];
array[index] = key;
array[index + 1] = value;
for (; i < maxProbe; i++) { // Nerf any existing misplaced entries.
index = index + 2 & mask;
if (array[index] == key) {
array[index] = 0;
prev = array[index + 1];
break;
}
}
return prev;
}
index = index + 2 & mask;
}
expand(); // Grow array and re-hash.
}
}
public void remove(long key) {
if (key == 0) {
zeroVal = emptyVal;
return;
}
int index = index(key);
for (int i = 0; i < maxProbe; i++) {
long existing = array[index];
if (existing == key) {
array[index] = 0;
break;
}
index = index + 2 & mask;
}
}
public long get(long key) {
if (key == 0) {
return zeroVal;
}
int index = index(key);
for (int i = 0; i < maxProbe; i++) {
long existing = array[index];
if (existing == key) {
return array[index + 1];
}
index = index + 2 & mask;
}
return emptyVal;
}
private int index(long key) {
// Hash with murmur64, and mask.
key ^= key >>> 33;
key *= 0xff51afd7ed558ccdL;
key ^= key >>> 33;
key *= 0xc4ceb9fe1a85ec53L;
key ^= key >>> 33;
return (int) key & mask;
}
private void expand() {
long[] prev = array;
array = new long[prev.length * 2];
computeMaskAndProbe();
for (int i = 0; i < prev.length; i += 2) {
long key = prev[i];
if (key != 0) {
long val = prev[i + 1];
put(key, val);
}
}
}
private void computeMaskAndProbe() {
int length = array.length;
mask = length - 1 & MASK_TEMPLATE;
maxProbe = (int) Math.log(length);
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import java.util.Arrays;
/**
* Internal primitive priority queue, used by {@link PoolChunk}.
* The implementation is based on the binary heap, as described in Algorithms by Sedgewick and Wayne.
*/
final class LongPriorityQueue {
public static final int NO_VALUE = -1;
private long[] array = new long[9];
private int size;
public void offer(long handle) {
if (handle == NO_VALUE) {
throw new IllegalArgumentException("The NO_VALUE (" + NO_VALUE + ") cannot be added to the queue.");
}
size++;
if (size == array.length) {
// Grow queue capacity.
array = Arrays.copyOf(array, 1 + (array.length - 1) * 2);
}
array[size] = handle;
lift();
}
public void remove(long value) {
for (int i = 1; i <= size; i++) {
if (array[i] == value) {
if (i == size) {
array[i] = 0;
} else {
array[i] = array[size];
sink(i);
}
size--;
return;
}
}
}
public long poll() {
if (size == 0) {
return NO_VALUE;
}
long val = array[1];
array[1] = array[size];
array[size] = 0;
size--;
sink(1);
return val;
}
public boolean isEmpty() {
return size == 0;
}
private void lift() {
int index = size;
int parentIndex;
while (index > 1 && subord(parentIndex = index >> 1, index)) {
swap(index, parentIndex);
index = parentIndex;
}
}
private void sink(int index) {
int child;
while ((child = index << 1) <= size) {
if (child < size && subord(child, child + 1)) {
child++;
}
if (!subord(index, child)) {
break;
}
swap(index, child);
index = child;
}
}
private boolean subord(int a, int b) {
return array[a] > array[b];
}
private void swap(int a, int b) {
long value = array[a];
array[a] = array[b];
array[b] = value;
}
}

View File

@ -191,7 +191,7 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
}
}
// Method must be called inside synchronized(this) { ... } block
// Method must be called inside synchronized(this) { ... } block
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||

View File

@ -15,9 +15,6 @@
*/
package io.netty.buffer;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
@ -133,8 +130,6 @@ import java.util.PriorityQueue;
*
*/
final class PoolChunk<T> implements PoolChunkMetric {
private static final int OFFSET_BIT_LENGTH = 15;
private static final int SIZE_BIT_LENGTH = 15;
private static final int INUSED_BIT_LENGTH = 1;
private static final int SUBPAGE_BIT_LENGTH = 1;
@ -153,12 +148,12 @@ final class PoolChunk<T> implements PoolChunkMetric {
/**
* store the first page and last page of each avail run
*/
private final IntObjectMap<Long> runsAvailMap;
private final LongLongHashMap runsAvailMap;
/**
* manage all avail runs
*/
private final PriorityQueue<Long>[] runsAvail;
private final LongPriorityQueue[] runsAvail;
/**
* manage all subpages in this chunk
@ -197,7 +192,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
freeBytes = chunkSize;
runsAvail = newRunsAvailqueueArray(maxPageIdx);
runsAvailMap = new IntObjectHashMap<Long>();
runsAvailMap = new LongLongHashMap(-1);
subpages = new PoolSubpage[chunkSize >> pageShifts];
//insert initial run, offset = 0, pages = chunkSize / pageSize
@ -223,18 +218,17 @@ final class PoolChunk<T> implements PoolChunkMetric {
cachedNioBuffers = null;
}
@SuppressWarnings("unchecked")
private static PriorityQueue<Long>[] newRunsAvailqueueArray(int size) {
PriorityQueue<Long>[] queueArray = new PriorityQueue[size];
private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
for (int i = 0; i < queueArray.length; i++) {
queueArray[i] = new PriorityQueue<Long>();
queueArray[i] = new LongPriorityQueue();
}
return queueArray;
}
private void insertAvailRun(int runOffset, int pages, Long handle) {
private void insertAvailRun(int runOffset, int pages, long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(pages);
PriorityQueue<Long> queue = runsAvail[pageIdxFloor];
LongPriorityQueue queue = runsAvail[pageIdxFloor];
queue.offer(handle);
//insert first page of run
@ -245,18 +239,18 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
}
private void insertAvailRun0(int runOffset, Long handle) {
Long pre = runsAvailMap.put(runOffset, handle);
assert pre == null;
private void insertAvailRun0(int runOffset, long handle) {
long pre = runsAvailMap.put(runOffset, handle);
assert pre == -1;
}
private void removeAvailRun(Long handle) {
private void removeAvailRun(long handle) {
int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
PriorityQueue<Long> queue = runsAvail[pageIdxFloor];
LongPriorityQueue queue = runsAvail[pageIdxFloor];
removeAvailRun(queue, handle);
}
private void removeAvailRun(PriorityQueue<Long> queue, Long handle) {
private void removeAvailRun(LongPriorityQueue queue, long handle) {
queue.remove(handle);
int runOffset = runOffset(handle);
@ -273,7 +267,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
return runOffset + pages - 1;
}
private Long getAvailRunByOffset(int runOffset) {
private long getAvailRunByOffset(int runOffset) {
return runsAvailMap.get(runOffset);
}
@ -334,10 +328,10 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
//get run with min offset in this queue
PriorityQueue<Long> queue = runsAvail[queueIdx];
Long handle = queue.poll();
LongPriorityQueue queue = runsAvail[queueIdx];
long handle = queue.poll();
assert handle != null && !isUsed(handle);
assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;
removeAvailRun(queue, handle);
@ -380,7 +374,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
return arena.nPSizes - 1;
}
for (int i = pageIdx; i < arena.nPSizes; i++) {
PriorityQueue<Long> queue = runsAvail[i];
LongPriorityQueue queue = runsAvail[i];
if (queue != null && !queue.isEmpty()) {
return i;
}
@ -507,8 +501,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
Long pastRun = getAvailRunByOffset(runOffset - 1);
if (pastRun == null) {
long pastRun = getAvailRunByOffset(runOffset - 1);
if (pastRun == -1) {
return handle;
}
@ -531,8 +525,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
int runOffset = runOffset(handle);
int runPages = runPages(handle);
Long nextRun = getAvailRunByOffset(runOffset + runPages);
if (nextRun == null) {
long nextRun = getAvailRunByOffset(runOffset + runPages);
if (nextRun == -1) {
return handle;
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PrimitiveIterator.OfLong;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
class LongLongHashMapTest {
@Test
public void zeroPutGetAndRemove() {
LongLongHashMap map = new LongLongHashMap(-1);
assertThat(map.put(0, 42)).isEqualTo(-1);
assertThat(map.get(0)).isEqualTo(42);
assertThat(map.put(0, 24)).isEqualTo(42);
assertThat(map.get(0)).isEqualTo(24);
map.remove(0);
assertThat(map.get(0)).isEqualTo(-1);
}
@Test
public void mustHandleCollisions() {
LongLongHashMap map = new LongLongHashMap(-1);
Set<Long> set = new HashSet<Long>();
long v = 1;
for (int i = 0; i < 63; i++) {
assertThat(map.put(v, v)).isEqualTo(-1);
set.add(v);
v <<= 1;
}
for (Long value : set) {
assertThat(map.get(value)).isEqualTo(value);
assertThat(map.put(value, -value)).isEqualTo(value);
assertThat(map.get(value)).isEqualTo(-value);
map.remove(value);
assertThat(map.get(value)).isEqualTo(-1);
}
}
@Test
public void randomOperations() {
int operations = 6000;
ThreadLocalRandom tlr = ThreadLocalRandom.current();
Map<Long, Long> expected = new HashMap<Long, Long>();
LongLongHashMap actual = new LongLongHashMap(-1);
OfLong itr = tlr.longs(0, operations).limit(operations * 50).iterator();
while (itr.hasNext()) {
long value = itr.nextLong();
if (expected.containsKey(value)) {
assertThat(actual.get(value)).isEqualTo(expected.get(value));
if (tlr.nextBoolean()) {
actual.remove(value);
expected.remove(value);
assertThat(actual.get(value)).isEqualTo(-1);
} else {
long v = expected.get(value);
assertThat(actual.put(value, -v)).isEqualTo(expected.put(value, -v));
}
} else {
assertThat(actual.get(value)).isEqualTo(-1);
assertThat(actual.put(value, value)).isEqualTo(-1);
expected.put(value, value);
}
}
}
}

View File

@ -0,0 +1,139 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ListIterator;
import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
class LongPriorityQueueTest {
@Test
public void mustThrowWhenAddingNoValue() {
final LongPriorityQueue pq = new LongPriorityQueue();
assertThrows(IllegalArgumentException.class, new Executable() {
@Override
public void execute() throws Throwable {
pq.offer(LongPriorityQueue.NO_VALUE);
}
});
}
@Test
public void mustReturnValuesInOrder() {
ThreadLocalRandom tlr = ThreadLocalRandom.current();
int initialValues = tlr.nextInt(5, 30);
ArrayList<Long> values = new ArrayList<Long>();
for (int i = 0; i < initialValues; i++) {
values.add(tlr.nextLong(0, Long.MAX_VALUE));
}
LongPriorityQueue pq = new LongPriorityQueue();
assertTrue(pq.isEmpty());
for (Long value : values) {
pq.offer(value);
}
Collections.sort(values);
int valuesToRemove = initialValues / 2;
ListIterator<Long> itr = values.listIterator();
for (int i = 0; i < valuesToRemove; i++) {
assertTrue(itr.hasNext());
assertThat(pq.poll()).isEqualTo(itr.next());
itr.remove();
}
int moreValues = tlr.nextInt(5, 30);
for (int i = 0; i < moreValues; i++) {
long value = tlr.nextLong(0, Long.MAX_VALUE);
pq.offer(value);
values.add(value);
}
Collections.sort(values);
itr = values.listIterator();
while (itr.hasNext()) {
assertThat(pq.poll()).isEqualTo(itr.next());
}
assertTrue(pq.isEmpty());
assertThat(pq.poll()).isEqualTo(LongPriorityQueue.NO_VALUE);
}
@Test
public void internalRemoveOfAllElements() {
ThreadLocalRandom tlr = ThreadLocalRandom.current();
int initialValues = tlr.nextInt(5, 30);
ArrayList<Long> values = new ArrayList<Long>();
LongPriorityQueue pq = new LongPriorityQueue();
for (int i = 0; i < initialValues; i++) {
long value = tlr.nextLong(0, Long.MAX_VALUE);
pq.offer(value);
values.add(value);
}
for (Long value : values) {
pq.remove(value);
}
assertTrue(pq.isEmpty());
assertThat(pq.poll()).isEqualTo(LongPriorityQueue.NO_VALUE);
}
@Test
public void internalRemoveMustPreserveOrder() {
ThreadLocalRandom tlr = ThreadLocalRandom.current();
int initialValues = tlr.nextInt(5, 30);
ArrayList<Long> values = new ArrayList<Long>();
LongPriorityQueue pq = new LongPriorityQueue();
for (int i = 0; i < initialValues; i++) {
long value = tlr.nextLong(0, Long.MAX_VALUE);
pq.offer(value);
values.add(value);
}
long toRemove = values.get(values.size() / 2);
values.remove(toRemove);
pq.remove(toRemove);
Collections.sort(values);
for (Long value : values) {
assertThat(pq.poll()).isEqualTo(value);
}
assertTrue(pq.isEmpty());
assertThat(pq.poll()).isEqualTo(LongPriorityQueue.NO_VALUE);
}
@Test
public void mustSupportDuplicateValues() {
LongPriorityQueue pq = new LongPriorityQueue();
pq.offer(10);
pq.offer(5);
pq.offer(6);
pq.offer(5);
pq.offer(10);
pq.offer(10);
pq.offer(6);
pq.remove(10);
assertThat(pq.poll()).isEqualTo(5);
assertThat(pq.poll()).isEqualTo(5);
assertThat(pq.poll()).isEqualTo(6);
assertThat(pq.poll()).isEqualTo(6);
assertThat(pq.poll()).isEqualTo(10);
assertThat(pq.poll()).isEqualTo(10);
assertTrue(pq.isEmpty());
assertThat(pq.poll()).isEqualTo(LongPriorityQueue.NO_VALUE);
}
}