Implement SWAR indexOf byte search (#10737)

Motivation:

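Make ByteBuf.indexOf faster: the forward search checks one byte per iteration, while reading the buffer one long at a time allows 8 bytes to be tested per read.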

Modification:

Create generic SWAR indexOf that any ByteBuf implementation can use

Result:

Fixes #10731
This commit is contained in:
Francesco Nigro 2021-01-15 15:09:27 +01:00 committed by Norman Maurer
parent aed20979e6
commit 5337d3eeb4
4 changed files with 269 additions and 53 deletions

View File

@ -1200,43 +1200,9 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
if (fromIndex <= toIndex) {
return firstIndexOf(fromIndex, toIndex, value);
} else {
return lastIndexOf(fromIndex, toIndex, value);
return ByteBufUtil.firstIndexOf(this, fromIndex, toIndex, value);
}
}
private int firstIndexOf(int fromIndex, int toIndex, byte value) {
fromIndex = Math.max(fromIndex, 0);
if (fromIndex >= toIndex || capacity() == 0) {
return -1;
}
checkIndex(fromIndex, toIndex - fromIndex);
for (int i = fromIndex; i < toIndex; i ++) {
if (_getByte(i) == value) {
return i;
}
}
return -1;
}
private int lastIndexOf(int fromIndex, int toIndex, byte value) {
fromIndex = Math.min(fromIndex, capacity());
if (fromIndex < 0 || capacity() == 0) {
return -1;
}
checkIndex(toIndex, fromIndex - toIndex);
for (int i = fromIndex - 1; i >= toIndex; i --) {
if (_getByte(i) == value) {
return i;
}
}
return -1;
return ByteBufUtil.lastIndexOf(this, fromIndex, toIndex, value);
}
@Override

View File

@ -388,16 +388,124 @@ public final class ByteBufUtil {
return 0;
}
/**
 * Bit-level helpers that test all 8 bytes of a {@code long} for a given byte value at once
 * (SWAR: SIMD Within A Register).
 */
private static final class SWARByteSearch {
/**
 * Builds the search pattern for {@code byteToFind}: the byte is taken as an unsigned value and multiplied
 * by {@code 0x0101010101010101}, which copies it into each of the 8 byte lanes of the returned {@code long}.
 */
private static long compilePattern(byte byteToFind) {
return (byteToFind & 0xFFL) * 0x101010101010101L;
}
/**
 * Returns the position (0-7) of a byte lane of {@code word} that equals the byte replicated in
 * {@code pattern}, or {@code 8} when no lane matches.
 *
 * @param word the 8 bytes to inspect
 * @param pattern the value produced by {@link #compilePattern(byte)}
 * @param leading {@code true} to count lanes starting from the most significant byte of {@code word},
 *                {@code false} to count them starting from the least significant byte
 */
private static int firstAnyPattern(long word, long pattern, boolean leading) {
// after the XOR a lane is 0x00 exactly where word and pattern hold the same byte
long input = word ^ pattern;
// adding 0x7F to the low 7 bits of a lane sets that lane's top bit iff any of those 7 bits is set;
// the per-lane sum is at most 0xFE, so nothing carries into the neighbouring lane
long tmp = (input & 0x7F7F7F7F7F7F7F7FL) + 0x7F7F7F7F7F7F7F7FL;
// OR-ing input adds the lanes whose own top bit is set, OR-ing 0x7F fills the low 7 bits of every lane:
// a non-zero lane is now 0xFF and a zero lane 0x7F, so the negation leaves 0x80 only in matching lanes
tmp = ~(tmp | input | 0x7F7F7F7F7F7F7F7FL);
// with no match tmp is 0 and both counts are 64, which the shift below turns into 8
final int binaryPosition = leading? Long.numberOfLeadingZeros(tmp) : Long.numberOfTrailingZeros(tmp);
// bit position -> byte position
return binaryPosition >>> 3;
}
}
/**
 * Searches {@code value} in the {@code byteCount} bytes starting at {@code fromIndex}, comparing them one at a
 * time in ascending index order without using a loop.
 *
 * @param buffer the buffer to read from
 * @param fromIndex the index of the first byte to compare
 * @param byteCount how many bytes to compare; expected to be between 1 and 7 (checked only by an assertion)
 * @param value the byte to look for
 * @return the index of the first byte equal to {@code value}, or {@code -1} if none of them matches
 */
private static int unrolledFirstIndexOf(AbstractByteBuf buffer, int fromIndex, int byteCount, byte value) {
    assert byteCount > 0 && byteCount < 8;
    int cursor = fromIndex;
    // 1st byte
    if (value == buffer._getByte(cursor)) {
        return cursor;
    }
    if (byteCount == 1) {
        return -1;
    }
    // 2nd byte
    cursor++;
    if (value == buffer._getByte(cursor)) {
        return cursor;
    }
    if (byteCount == 2) {
        return -1;
    }
    // 3rd byte
    cursor++;
    if (value == buffer._getByte(cursor)) {
        return cursor;
    }
    if (byteCount == 3) {
        return -1;
    }
    // 4th byte
    cursor++;
    if (value == buffer._getByte(cursor)) {
        return cursor;
    }
    if (byteCount == 4) {
        return -1;
    }
    // 5th byte
    cursor++;
    if (value == buffer._getByte(cursor)) {
        return cursor;
    }
    if (byteCount == 5) {
        return -1;
    }
    // 6th byte
    cursor++;
    if (value == buffer._getByte(cursor)) {
        return cursor;
    }
    if (byteCount == 6) {
        return -1;
    }
    // 7th and last byte
    cursor++;
    return value == buffer._getByte(cursor) ? cursor : -1;
}
/**
 * Returns the index of the first occurrence of {@code value} in {@code buffer} within
 * {@code [fromIndex, toIndex)}.
 * <p>
 * The range is validated once through {@code buffer.checkIndex(..)}. On platforms where
 * {@link PlatformDependent#isUnaligned()} is {@code true} the search then uses a SWAR (SIMD Within A Register)
 * technique: the first {@code length % 8} bytes are compared one by one and the rest of the range is read and
 * tested one {@code long} (8 bytes) at a time. Otherwise every byte is compared individually.
 *
 * @param buffer the buffer to search
 * @param fromIndex the inclusive start of the search; negative values are treated as {@code 0}
 * @param toIndex the exclusive end of the search
 * @param value the byte to look for
 * @return the absolute index of the first match, or {@code -1} if the range is empty, the buffer has no
 *         capacity or {@code value} does not occur in the range
 */
static int firstIndexOf(AbstractByteBuf buffer, int fromIndex, int toIndex, byte value) {
    fromIndex = Math.max(fromIndex, 0);
    if (fromIndex >= toIndex || buffer.capacity() == 0) {
        return -1;
    }
    final int length = toIndex - fromIndex;
    buffer.checkIndex(fromIndex, length);
    if (!PlatformDependent.isUnaligned()) {
        return linearFirstIndexOf(buffer, fromIndex, toIndex, value);
    }
    int offset = fromIndex;
    // compare the (length % 8) leading bytes one by one, so that what is left is a whole number of longs
    final int byteCount = length & 7;
    if (byteCount > 0) {
        final int index = unrolledFirstIndexOf(buffer, fromIndex, byteCount, value);
        if (index != -1) {
            return index;
        }
        offset += byteCount;
        if (offset == toIndex) {
            return -1;
        }
    }
    final int longCount = length >>> 3;
    final boolean useLE = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
    final long pattern = SWARByteSearch.compilePattern(value);
    for (int i = 0; i < longCount; i++) {
        // use the faster available getLong
        final long word = useLE? buffer._getLongLE(offset) : buffer._getLong(offset);
        // Which end of the word holds the byte at `offset` depends only on the read used above, not on the
        // buffer's byte order: a little-endian read places it in the least significant byte (count from
        // the trailing end), a big-endian read in the most significant byte (count from the leading end).
        final int index = SWARByteSearch.firstAnyPattern(word, pattern, !useLE);
        if (index < Long.BYTES) {
            return offset + index;
        }
        offset += Long.BYTES;
    }
    return -1;
}
/**
 * Byte-by-byte forward search for {@code value} in {@code [fromIndex, toIndex)}.
 * The indexes are used as they are: this method performs no range validation of its own.
 *
 * @return the index of the first byte equal to {@code value}, or {@code -1} if there is none
 */
private static int linearFirstIndexOf(AbstractByteBuf buffer, int fromIndex, int toIndex, byte value) {
    int cursor = fromIndex;
    while (cursor < toIndex) {
        if (value == buffer._getByte(cursor)) {
            return cursor;
        }
        cursor++;
    }
    return -1;
}
/**
* The default implementation of {@link ByteBuf#indexOf(int, int, byte)}.
* This method is useful when implementing a new buffer type.
*/
public static int indexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
if (fromIndex <= toIndex) {
return firstIndexOf(buffer, fromIndex, toIndex, value);
} else {
return lastIndexOf(buffer, fromIndex, toIndex, value);
}
return buffer.indexOf(fromIndex, toIndex, value);
}
/**
@ -476,23 +584,21 @@ public final class ByteBufUtil {
}
}
private static int firstIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
fromIndex = Math.max(fromIndex, 0);
if (fromIndex >= toIndex || buffer.capacity() == 0) {
return -1;
}
return buffer.forEachByte(fromIndex, toIndex - fromIndex, new ByteProcessor.IndexOfProcessor(value));
}
private static int lastIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
int capacity = buffer.capacity();
static int lastIndexOf(AbstractByteBuf buffer, int fromIndex, int toIndex, byte value) {
assert fromIndex > toIndex;
final int capacity = buffer.capacity();
fromIndex = Math.min(fromIndex, capacity);
if (fromIndex < 0 || capacity == 0) {
return -1;
}
buffer.checkIndex(toIndex, fromIndex - toIndex);
for (int i = fromIndex - 1; i >= toIndex; i--) {
if (buffer._getByte(i) == value) {
return i;
}
}
return buffer.forEachByteDesc(toIndex, fromIndex - toIndex, new ByteProcessor.IndexOfProcessor(value));
return -1;
}
private static CharSequence checkCharSequenceBounds(CharSequence seq, int start, int end) {

View File

@ -2114,6 +2114,36 @@ public abstract class AbstractByteBufTest {
}
}
/**
 * Forward {@code indexOf} over a 16-byte buffer laid out as eleven {@code 0} bytes followed by
 * {@code 1, 2, 3, 4, 1}, searched with range lengths 12, 16 and 11 so that both lengths that are a multiple
 * of 8 and lengths that are not get exercised.
 */
@Test
public void testSWARIndexOf() {
    ByteBuf buffer = newBuffer(16);
    try {
        buffer.clear();
        // Ensure the buffer is completely zero'ed.
        buffer.setZero(0, buffer.capacity());
        // indexes 0..10
        for (int i = 0; i < 11; i++) {
            buffer.writeByte((byte) 0);
        }
        buffer.writeByte((byte) 1); // 11
        buffer.writeByte((byte) 2);
        buffer.writeByte((byte) 3);
        buffer.writeByte((byte) 4);
        buffer.writeByte((byte) 1); // 15
        assertEquals(11, buffer.indexOf(0, 12, (byte) 1));
        assertEquals(12, buffer.indexOf(0, 16, (byte) 2));
        // the end of the range is exclusive: the 1 at index 11 must not be reported
        assertEquals(-1, buffer.indexOf(0, 11, (byte) 1));
        // the 1 at index 11 wins over the one at index 15
        assertEquals(11, buffer.indexOf(0, 16, (byte) 1));
    } finally {
        // release even when an assertion fails, so a failing test does not leak the buffer
        buffer.release();
    }
}
@Test
public void testIndexOf() {
buffer.clear();

View File

@ -0,0 +1,114 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.buffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.internal.SuppressJava6Requirement;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import java.util.SplittableRandom;
import java.util.concurrent.TimeUnit;
/**
 * Benchmarks {@link ByteBuf#indexOf(int, int, byte)} on small buffers.
 * <p>
 * Every trial allocates {@code 2^logPermutations} buffers of {@code size} bytes filled with seeded random
 * content that never equals {@code needleByte}, then stores {@code needleByte} once per buffer at a random
 * position among its last 8 bytes. The benchmark method walks the buffers round-robin.
 */
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(2)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 8, time = 1)
public class ByteBufIndexOfBenchmark extends AbstractMicrobenchmark {

    @Param({ "7", "16", "23", "32" })
    int size;

    @Param({ "4", "11" })
    int logPermutations;

    @Param({ "1" })
    int seed;

    // number of buffers: always a power of two, so that it can be used as a mask in getData()
    int permutations;

    ByteBuf[] data;

    // running counter used to pick the next buffer
    private int cursor;

    @Param({ "0" })
    private byte needleByte;

    @Param({ "true", "false" })
    private boolean direct;

    @Param({ "false", "true" })
    private boolean noUnsafe;

    @Param({ "false", "true" })
    private boolean pooled;

    /**
     * Sets the {@code io.netty.noUnsafe} system property and builds the buffers to search.
     */
    @Setup(Level.Trial)
    @SuppressJava6Requirement(reason = "using SplittableRandom to reliably produce data")
    public void init() {
        System.setProperty("io.netty.noUnsafe", Boolean.toString(noUnsafe));
        final SplittableRandom rng = new SplittableRandom(seed);
        permutations = 1 << logPermutations;
        data = new ByteBuf[permutations];
        final ByteBufAllocator allocator;
        if (pooled) {
            allocator = PooledByteBufAllocator.DEFAULT;
        } else {
            allocator = UnpooledByteBufAllocator.DEFAULT;
        }
        // what gets written whenever the random content happens to be the needle itself
        final int substitute = needleByte != 1 ? 1 : 0;
        for (int slot = 0; slot < permutations; ++slot) {
            final ByteBuf haystack;
            if (direct) {
                haystack = allocator.directBuffer(size, size);
            } else {
                haystack = allocator.heapBuffer(size, size);
            }
            data[slot] = haystack;
            for (int position = 0; position < size; position++) {
                final int candidate = rng.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
                haystack.setByte(position, candidate == needleByte ? substitute : candidate);
            }
            // the only occurrence of the needle lands somewhere in the last 8 bytes
            final int needleIndex = rng.nextInt(Math.max(0, size - 8), size);
            haystack.setByte(needleIndex, needleByte);
        }
    }

    /**
     * Returns the next buffer, cycling through all of them.
     */
    private ByteBuf getData() {
        final int slot = cursor & (permutations - 1);
        cursor++;
        return data[slot];
    }

    @Benchmark
    public int indexOf() {
        return getData().indexOf(0, size, needleByte);
    }

    /**
     * Releases every buffer allocated by {@link #init()}.
     */
    @TearDown
    public void releaseBuffers() {
        for (int slot = 0; slot < data.length; slot++) {
            data[slot].release();
        }
    }
}