diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 9ac4a48..5f1f65a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -40,6 +40,7 @@ import org.rocksdb.CompactRangeOptions; import org.rocksdb.DirectSlice; import org.rocksdb.FlushOptions; import org.rocksdb.Holder; +import org.rocksdb.KeyMayExist; import org.rocksdb.KeyMayExist.KeyMayExistEnum; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; @@ -59,8 +60,6 @@ public sealed abstract class AbstractRocksDBColumn implements * Default: true */ private static final boolean USE_DIRECT_BUFFER_BOUNDS = true; - private static final boolean WORKAROUND_MAY_EXIST_FAKE_ZERO - = parseBoolean(getProperty("it.cavallium.dbengine.workaround_may_exist_fake_zero", "true")); private static final byte[] NO_DATA = new byte[0]; protected static final UpdateAtomicResult RESULT_NOTHING = new UpdateAtomicResultNothing(); @@ -99,6 +98,7 @@ public sealed abstract class AbstractRocksDBColumn implements private final Timer updateReplacedTime; private final Timer updateRemovedTime; private final Timer updateUnchangedTime; + private final DBColumnKeyMayExistGetter keyMayExistGetter; public AbstractRocksDBColumn(T db, boolean nettyDirect, @@ -245,6 +245,7 @@ public sealed abstract class AbstractRocksDBColumn implements .publishPercentileHistogram() .tags("db.name", databaseName, "db.column", columnName, "update.type", "unchanged") .register(meterRegistry); + this.keyMayExistGetter = new DBColumnKeyMayExistGetter(); } /** @@ -340,151 +341,7 @@ public sealed abstract class AbstractRocksDBColumn implements try { ensureOpen(); ensureOwned(readOptions); - keyBufferSize.record(key.readableBytes()); - int readAttemptsCount = 0; - try { - if (nettyDirect) { - // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer; - boolean mustCloseKey; - { - if (!LLUtils.isReadOnlyDirect(key)) { - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - mustCloseKey = true; - var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); - key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); - key = directKey; - } else { - mustCloseKey = false; - } - keyNioBuffer = ((ReadableComponent) key).readableBuffer(); - assert keyNioBuffer.isDirect(); - assert keyNioBuffer.limit() == key.readableBytes(); - } - - try { - // Create a direct result buffer because RocksDB works only with direct buffers - var resultBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); - try { - assert resultBuffer.readerOffset() == 0; - assert resultBuffer.writerOffset() == 0; - var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); - - var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), - resultWritable.clear()); - KeyMayExistEnum keyMayExistState = keyMayExist.exists; - int keyMayExistValueLength = keyMayExist.valueLength; - // At the beginning, size reflects the expected size, then it becomes the real data size - int size = keyMayExistState == kExistsWithValue ? keyMayExistValueLength : -1; - boolean isKExistsWithoutValue = false; - switch (keyMayExistState) { - case kNotExist: { - readValueNotFoundWithBloomBufferSize.record(0); - resultBuffer.close(); - return null; - } - // todo: kExistsWithValue is not reliable (read below), - // in some cases it should be treated as kExistsWithoutValue - case kExistsWithValue: - case kExistsWithoutValue: { - if (keyMayExistState == kExistsWithoutValue) { - isKExistsWithoutValue = true; - } else if (WORKAROUND_MAY_EXIST_FAKE_ZERO) { - // todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken, - // and sometimes it returns an empty array, as if it exists - if (size == 0 || resultWritable.limit() == 0) { - isKExistsWithoutValue = true; - } - } - if (isKExistsWithoutValue) { - assert keyMayExistValueLength == 0; - resultWritable.clear(); - readAttemptsCount++; - // real data size - size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); - if (size == RocksDB.NOT_FOUND) { - resultBuffer.close(); - readValueNotFoundWithMayExistBloomBufferSize.record(0); - return null; - } - } - } - default: { - // real data size - assert size >= 0; - if (size <= resultWritable.limit()) { - if (isKExistsWithoutValue) { - readValueFoundWithBloomUncachedBufferSize.record(size); - } else { - readValueFoundWithBloomCacheBufferSize.record(size); - } - assert size == resultWritable.limit(); - return resultBuffer.writerOffset(resultWritable.limit()); - } else { - resultBuffer.ensureWritable(size); - resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); - assert resultBuffer.readerOffset() == 0; - assert resultBuffer.writerOffset() == 0; - - readAttemptsCount++; - size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); - if (size == RocksDB.NOT_FOUND) { - readValueNotFoundWithMayExistBloomBufferSize.record(0); - resultBuffer.close(); - return null; - } - assert size == resultWritable.limit(); - if (isKExistsWithoutValue) { - readValueFoundWithBloomUncachedBufferSize.record(size); - } else { - readValueFoundWithBloomCacheBufferSize.record(size); - } - return resultBuffer.writerOffset(resultWritable.limit()); - } - } - } - } catch (Throwable t) { - resultBuffer.close(); - throw t; - } - } finally { - if (mustCloseKey) { - key.close(); - } - } - } else { - byte[] keyArray = LLUtils.toArray(key); - requireNonNull(keyArray); - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, readOptions, keyArray, data)) { - // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it - // returns an empty array, as if it exists - if (data.getValue() != null && (!WORKAROUND_MAY_EXIST_FAKE_ZERO || data.getValue().length > 0)) { - readValueFoundWithBloomCacheBufferSize.record(data.getValue().length); - return LLUtils.fromByteArray(alloc, data.getValue()); - } else { - readAttemptsCount++; - byte[] result = db.get(cfh, readOptions, keyArray); - if (result == null) { - if (data.getValue() != null) { - readValueNotFoundWithBloomBufferSize.record(0); - } else { - readValueNotFoundWithMayExistBloomBufferSize.record(0); - } - return null; - } else { - readValueFoundWithBloomUncachedBufferSize.record(0); - return LLUtils.fromByteArray(alloc, result); - } - } - } else { - readValueNotFoundWithBloomBufferSize.record(0); - return null; - } - } - } finally { - readAttempts.record(readAttemptsCount); - } + return keyMayExistGetter.get(readOptions, key); } finally { closeLock.unlockRead(closeReadLock); } @@ -975,4 +832,61 @@ public sealed abstract class AbstractRocksDBColumn implements public Counter getEndedIterNext() { return endedIterNext; } + + private class DBColumnKeyMayExistGetter extends KeyMayExistGetter { + + public DBColumnKeyMayExistGetter() { + super(alloc, nettyDirect); + } + + @Override + protected KeyMayExist keyMayExist(ReadOptions readOptions, ByteBuffer key, ByteBuffer value) { + return db.keyMayExist(cfh, readOptions, key, value); + } + + @Override + protected boolean keyMayExist(ReadOptions readOptions, byte[] key, @Nullable Holder valueHolder) { + return db.keyMayExist(cfh, readOptions, key, valueHolder); + } + + @Override + protected int get(ReadOptions readOptions, ByteBuffer key, ByteBuffer value) throws RocksDBException { + return db.get(cfh, readOptions, key, value); + } + + @Override + protected byte[] get(ReadOptions readOptions, byte[] key) throws RocksDBException, IllegalArgumentException { + return db.get(cfh, readOptions, key); + } + + @Override + protected void recordReadValueNotFoundWithMayExistBloomBufferSize(int value) { + readValueNotFoundWithMayExistBloomBufferSize.record(value); + } + + @Override + protected void recordReadValueFoundWithBloomUncachedBufferSize(int value) { + readValueFoundWithBloomUncachedBufferSize.record(value); + } + + @Override + protected void recordReadValueFoundWithBloomCacheBufferSize(int value) { + readValueFoundWithBloomCacheBufferSize.record(value); + } + + @Override + protected void recordReadAttempts(int value) { + readAttempts.record(value); + } + + @Override + protected void recordReadValueNotFoundWithBloomBufferSize(int value) { + readValueNotFoundWithBloomBufferSize.record(value); + } + + @Override + protected void recordKeyBufferSize(int value) { + keyBufferSize.record(value); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/KeyMayExistGetter.java b/src/main/java/it/cavallium/dbengine/database/disk/KeyMayExistGetter.java new file mode 100644 index 0000000..b90fc07 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/KeyMayExistGetter.java @@ -0,0 +1,271 @@ +package it.cavallium.dbengine.database.disk; + +import static it.cavallium.dbengine.database.LLUtils.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES; +import static java.lang.Boolean.parseBoolean; +import static java.lang.System.getProperty; +import static java.util.Objects.requireNonNull; +import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; +import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; +import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kNotExist; + +import io.netty5.buffer.api.Buffer; +import io.netty5.buffer.api.BufferAllocator; +import io.netty5.buffer.api.DefaultBufferAllocators; +import io.netty5.buffer.api.ReadableComponent; +import io.netty5.buffer.api.WritableComponent; +import it.cavallium.dbengine.database.LLUtils; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HexFormat; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.Holder; +import org.rocksdb.KeyMayExist; +import org.rocksdb.KeyMayExist.KeyMayExistEnum; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +public abstract class KeyMayExistGetter { + + private static final Logger LOG = LogManager.getLogger(KeyMayExistGetter.class); + + private static final boolean WORKAROUND_MAY_EXIST_FAKE_ZERO = parseBoolean(getProperty( + "it.cavallium.dbengine.workaround_may_exist_fake_zero", + "true" + )); + private static final boolean STRICT_MAYEXIST_NO_VALUE = parseBoolean(getProperty( + "it.cavallium.dbengine.mayexist.strict_no_value", + "false" + )); + private static final BufferAllocator OFF_HEAP_ALLOCATOR = DefaultBufferAllocators.offHeapAllocator(); + + private final BufferAllocator bufferAllocator; + private final boolean nettyDirect; + + public KeyMayExistGetter(BufferAllocator bufferAllocator, boolean nettyDirect) { + this.bufferAllocator = bufferAllocator; + this.nettyDirect = nettyDirect; + } + + public final @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + recordKeyBufferSize(key.readableBytes()); + if (nettyDirect) { + return getDirect(readOptions, key); + } else { + return getHeap(readOptions, key); + } + } + + private Buffer getDirect(ReadOptions readOptions, Buffer key) throws RocksDBException { + int readAttemptsCount = 0; + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer; + boolean mustCloseKey; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = OFF_HEAP_ALLOCATOR.allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); + } + + try { + // Create a direct result buffer because RocksDB works only with direct buffers + var resultBuffer = bufferAllocator.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); + try { + assert resultBuffer.readerOffset() == 0; + assert resultBuffer.writerOffset() == 0; + var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); + + var keyMayExist = keyMayExist(readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + if (STRICT_MAYEXIST_NO_VALUE && keyMayExist.exists != kExistsWithValue && keyMayExist.valueLength != 0) { + // Create a direct result buffer because RocksDB works only with direct buffers + try (var realResultBuffer = bufferAllocator.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) { + var resultWritableF = resultWritable; + var realResultWritable = ((WritableComponent) realResultBuffer).writableBuffer(); + var realSize = get(readOptions, keyNioBuffer.rewind(), realResultWritable); + var hexFormat = HexFormat.ofDelimiter(" "); + LOG.error( + "KeyMayExist is {}, but value length is non-zero: {}! Disk value size is {}\nBytes from bloom cache:\n{}\nBytes from db:\n{}", + () -> keyMayExist.exists, + () -> keyMayExist.valueLength, + () -> realSize, + () -> { + resultBuffer.writerOffset(resultWritableF.limit()); + return hexFormat.formatHex(LLUtils.toArray(resultBuffer)); + }, + () -> { + realResultBuffer.writerOffset(realResultWritable.limit()); + return hexFormat.formatHex(LLUtils.toArray(realResultBuffer)); + } + ); + var sliceKME = LLUtils.toArray(resultBuffer.copy(0, Math.min(resultWritableF.limit(), realSize))); + var sliceDB = LLUtils.toArray(realResultBuffer.copy(0, Math.min(realResultWritable.limit(), realSize))); + throw new RocksDBException( + "KeyMayExist is " + keyMayExist.exists + ", but value length is non-zero: " + keyMayExist.valueLength + + "! Disk value size is " + realSize + ". The bloom cache partial value is " + + (Arrays.equals(sliceKME, sliceDB) ? "correct" : "corrupted")); + } + } + KeyMayExistEnum keyMayExistState = keyMayExist.exists; + int keyMayExistValueLength = keyMayExist.valueLength; + // At the beginning, size reflects the expected size, then it becomes the real data size + //noinspection SwitchStatementWithTooFewBranches + int size = switch (keyMayExistState) { + case kExistsWithValue -> keyMayExistValueLength; + default -> -1; + }; + boolean isKExistsWithoutValue = false; + switch (keyMayExistState) { + case kNotExist: { + recordReadValueNotFoundWithBloomBufferSize(0); + resultBuffer.close(); + return null; + } + // todo: kExistsWithValue is not reliable (read below), + // in some cases it should be treated as kExistsWithoutValue + case kExistsWithValue: + case kExistsWithoutValue: { + if (keyMayExistState == kExistsWithoutValue) { + isKExistsWithoutValue = true; + } else if (WORKAROUND_MAY_EXIST_FAKE_ZERO) { + // todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken, + // and sometimes it returns an empty array, as if it exists + if (size == 0 || resultWritable.limit() == 0) { + isKExistsWithoutValue = true; + } + } + if (isKExistsWithoutValue) { + assert + !STRICT_MAYEXIST_NO_VALUE || keyMayExistValueLength == 0 : + "keyMayExist value length is " + keyMayExistValueLength + " instead of 0"; + resultWritable.clear(); + readAttemptsCount++; + // real data size + size = get(readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + if (keyMayExistState == kExistsWithValue && size != keyMayExistValueLength) { + throw new IllegalStateException("Bloom filter data is corrupted." + + " Bloom value size=" + keyMayExistState + ", Real value size=" + size); + } + if (size == RocksDB.NOT_FOUND) { + resultBuffer.close(); + recordReadValueNotFoundWithMayExistBloomBufferSize(0); + return null; + } + } + } + default: { + // real data size + assert size >= 0; + if (size <= resultWritable.limit()) { + if (isKExistsWithoutValue) { + recordReadValueFoundWithBloomUncachedBufferSize(size); + } else { + recordReadValueFoundWithBloomCacheBufferSize(size); + } + assert size == resultWritable.limit(); + return resultBuffer.writerOffset(resultWritable.limit()); + } else { + resultBuffer.ensureWritable(size); + resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); + assert resultBuffer.readerOffset() == 0; + assert resultBuffer.writerOffset() == 0; + + readAttemptsCount++; + size = get(readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + if (size == RocksDB.NOT_FOUND) { + recordReadValueNotFoundWithMayExistBloomBufferSize(0); + resultBuffer.close(); + return null; + } + assert size == resultWritable.limit(); + if (isKExistsWithoutValue) { + recordReadValueFoundWithBloomUncachedBufferSize(size); + } else { + recordReadValueFoundWithBloomCacheBufferSize(size); + } + return resultBuffer.writerOffset(resultWritable.limit()); + } + } + } + } catch (Throwable t) { + resultBuffer.close(); + throw t; + } + } finally { + if (mustCloseKey) { + key.close(); + } + recordReadAttempts(readAttemptsCount); + } + } + + private Buffer getHeap(ReadOptions readOptions, Buffer key) throws RocksDBException { + int readAttemptsCount = 0; + try { + byte[] keyArray = LLUtils.toArray(key); + requireNonNull(keyArray); + Holder data = new Holder<>(); + if (keyMayExist(readOptions, keyArray, data)) { + // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it + // returns an empty array, as if it exists + if (data.getValue() != null && (!WORKAROUND_MAY_EXIST_FAKE_ZERO || data.getValue().length > 0)) { + recordReadValueFoundWithBloomCacheBufferSize(data.getValue().length); + return LLUtils.fromByteArray(bufferAllocator, data.getValue()); + } else { + readAttemptsCount++; + byte[] result = get(readOptions, keyArray); + if (result == null) { + if (data.getValue() != null) { + recordReadValueNotFoundWithBloomBufferSize(0); + } else { + recordReadValueNotFoundWithMayExistBloomBufferSize(0); + } + return null; + } else { + recordReadValueFoundWithBloomUncachedBufferSize(0); + return LLUtils.fromByteArray(bufferAllocator, result); + } + } + } else { + recordReadValueNotFoundWithBloomBufferSize(0); + return null; + } + } finally { + recordReadAttempts(readAttemptsCount); + } + } + + protected abstract KeyMayExist keyMayExist(final ReadOptions readOptions, final ByteBuffer key, final ByteBuffer value); + + protected abstract boolean keyMayExist(final ReadOptions readOptions, + final byte[] key, + @Nullable final Holder valueHolder); + + protected abstract int get(final ReadOptions opt, final ByteBuffer key, final ByteBuffer value) throws RocksDBException; + + protected abstract byte[] get(final ReadOptions opt, final byte[] key) throws RocksDBException, IllegalArgumentException; + + protected abstract void recordReadValueNotFoundWithMayExistBloomBufferSize(int value); + + protected abstract void recordReadValueFoundWithBloomUncachedBufferSize(int value); + + protected abstract void recordReadValueFoundWithBloomCacheBufferSize(int value); + + protected abstract void recordReadAttempts(int value); + + protected abstract void recordReadValueNotFoundWithBloomBufferSize(int value); + + protected abstract void recordKeyBufferSize(int value); +} diff --git a/src/test/java/it/cavallium/dbengine/TestGetter.java b/src/test/java/it/cavallium/dbengine/TestGetter.java new file mode 100644 index 0000000..c0a7fdc --- /dev/null +++ b/src/test/java/it/cavallium/dbengine/TestGetter.java @@ -0,0 +1,100 @@ +package it.cavallium.dbengine; + +import static java.util.Map.entry; + +import io.netty5.buffer.api.BufferAllocator; +import it.cavallium.dbengine.database.disk.KeyMayExistGetter; +import it.unimi.dsi.fastutil.bytes.ByteList; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Test; +import org.rocksdb.Holder; +import org.rocksdb.KeyMayExist; +import org.rocksdb.KeyMayExist.KeyMayExistEnum; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDBException; +//todo: +public class TestGetter { + + private static Map db = Map.ofEntries( + entry(bytes("cached-partially"), bytes("123456789")), + entry(bytes("cached-totally"), bytes("ciaov")), + entry(bytes("cached-without-value"), bytes("ciaov")), + entry(bytes("ciao3"), bytes("ciaov")) + ); + + record KeyMayExistResult(KeyMayExist.KeyMayExistEnum keyMayExistEnum, int size, ByteList cachedValue) {} + + private static Map cache = Map.ofEntries( + entry(bytes("cached-partially"), new KeyMayExistResult(KeyMayExistEnum.kExistsWithoutValue, 9, bytes("12345678"))), + entry(bytes("cached-totally"), new KeyMayExistResult(KeyMayExistEnum.kExistsWithValue, 5, bytes("ciaov"))), + entry(bytes("cached-without-value"), new KeyMayExistResult(KeyMayExistEnum.kExistsWithoutValue, 0, bytes("ciaov"))) + ); + + private static ByteList bytes(String text) { + return ByteList.of(text.getBytes(StandardCharsets.UTF_8)); + } + + private static String text(ByteList bytes) { + return new String(bytes.toByteArray(), StandardCharsets.UTF_8); + } + + public KeyMayExistGetter getter = new KeyMayExistGetter(BufferAllocator.offHeapUnpooled(), true) { + @Override + protected KeyMayExist keyMayExist(ReadOptions readOptions, ByteBuffer key, ByteBuffer value) { + return null; + } + + @Override + protected boolean keyMayExist(ReadOptions readOptions, byte[] key, @Nullable Holder valueHolder) { + throw new UnsupportedOperationException(); + } + + @Override + protected int get(ReadOptions opt, ByteBuffer key, ByteBuffer value) throws RocksDBException { + return 0; + } + + @Override + protected byte[] get(ReadOptions opt, byte[] key) throws RocksDBException, IllegalArgumentException { + throw new UnsupportedOperationException(); + } + + @Override + protected void recordReadValueNotFoundWithMayExistBloomBufferSize(int value) { + + } + + @Override + protected void recordReadValueFoundWithBloomUncachedBufferSize(int value) { + + } + + @Override + protected void recordReadValueFoundWithBloomCacheBufferSize(int value) { + + } + + @Override + protected void recordReadAttempts(int value) { + + } + + @Override + protected void recordReadValueNotFoundWithBloomBufferSize(int value) { + + } + + @Override + protected void recordKeyBufferSize(int value) { + + } + }; + + @Test + public void testSimpleGet() { + + } +}