diff --git a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java index 739684c..4199642 100644 --- a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java @@ -13,12 +13,5 @@ public record DatabaseOptions(Map extraFlags, boolean useDirectIO, boolean allowMemoryMapping, boolean allowNettyDirect, - boolean useNettyDirect, int maxOpenFiles) { - - public DatabaseOptions { - if (useNettyDirect && !allowNettyDirect) { - throw new IllegalArgumentException("If allowNettyDirect is false, you must also set useNettyDirect to false"); - } - } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 6d186a0..be54afd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -7,6 +7,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; +import io.net5.buffer.api.StandardAllocationTypes; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.LLUtils; @@ -43,6 +44,7 @@ public sealed abstract class AbstractRocksDBColumn implements private final T db; private final DatabaseOptions opts; + private final boolean nettyDirect; private final BufferAllocator alloc; private final ColumnFamilyHandle cfh; @@ -56,6 +58,7 @@ public sealed abstract class AbstractRocksDBColumn implements MeterRegistry meterRegistry) { this.db = db; this.opts = databaseOptions; + this.nettyDirect = opts.allowNettyDirect() && alloc.getAllocationType() == StandardAllocationTypes.OFF_HEAP; this.alloc = alloc; this.cfh = cfh; @@ -95,7 +98,7 @@ public sealed abstract class AbstractRocksDBColumn implements if (!cfh.isOwningHandle()) { throw new IllegalStateException("Column family is closed"); } - if (opts.allowNettyDirect()) { + if (nettyDirect) { //todo: implement keyMayExist if existsAlmostCertainly is false. // Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers @@ -202,7 +205,7 @@ public sealed abstract class AbstractRocksDBColumn implements } assert key.isAccessible(); assert value.isAccessible(); - if (opts.allowNettyDirect()) { + if (nettyDirect) { var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); try (var ignored1 = keyNioBuffer.buffer().receive()) { assert keyNioBuffer.byteBuffer().isDirect(); @@ -275,7 +278,7 @@ public sealed abstract class AbstractRocksDBColumn implements if (!cfh.isOwningHandle()) { throw new IllegalStateException("Column family is closed"); } - if (opts.allowNettyDirect()) { + if (nettyDirect) { DirectBuffer keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send()); try { db.delete(cfh, writeOptions, keyNioBuffer.byteBuffer()); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 5d9f284..fd99551 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import static io.net5.buffer.Unpooled.wrappedBuffer; +import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.fromByteArray; import static java.util.Objects.requireNonNull; @@ -11,6 +12,7 @@ import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.MemoryManager; import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; +import io.net5.buffer.api.StandardAllocationTypes; import io.net5.buffer.api.internal.ResourceSupport; import io.net5.util.internal.PlatformDependent; import it.cavallium.dbengine.client.BadBlock; @@ -129,8 +131,9 @@ public class LLLocalDictionary implements LLDictionary { private final Scheduler dbScheduler; private final Function snapshotResolver; private final UpdateMode updateMode; - private final BufferAllocator alloc; private final DatabaseOptions databaseOptions; + private final boolean nettyDirect; + private final BufferAllocator alloc; public LLLocalDictionary( BufferAllocator allocator, @@ -151,6 +154,7 @@ public class LLLocalDictionary implements LLDictionary { this.updateMode = updateMode; this.databaseOptions = databaseOptions; alloc = allocator; + this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP; } @Override @@ -269,7 +273,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); if (range.hasMin()) { try (var rangeMin = range.getMin().receive()) { - if (databaseOptions.allowNettyDirect()) { + if (nettyDirect) { var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send()); cloned1 = directBuf.buffer().receive(); direct1 = directBuf.byteBuffer(); @@ -281,7 +285,7 @@ public class LLLocalDictionary implements LLDictionary { } if (range.hasMax()) { try (var rangeMax = range.getMax().receive()) { - if (databaseOptions.allowNettyDirect()) { + if (nettyDirect) { var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMax.send()); cloned2 = directBuf.buffer().receive(); direct2 = directBuf.byteBuffer(); @@ -294,7 +298,7 @@ public class LLLocalDictionary implements LLDictionary { try (RocksIterator rocksIterator = db.newIterator(readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { try (var rangeMin = range.getMin().receive()) { - if (databaseOptions.allowNettyDirect()) { + if (nettyDirect) { var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send()); cloned3 = directBuf.buffer().receive(); direct3 = directBuf.byteBuffer(); @@ -601,7 +605,7 @@ public class LLLocalDictionary implements LLDictionary { for (LLEntry entry : entriesWindow) { var k = entry.getKey(); var v = entry.getValue(); - if (databaseOptions.allowNettyDirect()) { + if (nettyDirect) { batch.put(cfh, k, v); } else { try (var key = k.receive()) { @@ -807,7 +811,7 @@ public class LLLocalDictionary implements LLDictionary { return Flux.usingWhen(rangeMono, rangeSend -> Flux.using( () -> new LLLocalEntryReactiveRocksIterator(db, rangeSend, - databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)), + nettyDirect, resolveSnapshot(snapshot)), iterator -> iterator.flux().subscribeOn(dbScheduler, false), LLLocalReactiveRocksIterator::close ).transform(LLUtils::handleDiscard), @@ -820,7 +824,7 @@ public class LLLocalDictionary implements LLDictionary { return Flux.usingWhen(rangeMono, rangeSend -> Flux.using( () -> new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, - databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)), + nettyDirect, resolveSnapshot(snapshot)), iterator -> iterator.flux().subscribeOn(dbScheduler, false), LLLocalGroupedReactiveRocksIterator::close ).transform(LLUtils::handleDiscard), @@ -851,7 +855,7 @@ public class LLLocalDictionary implements LLDictionary { return Flux.usingWhen(rangeMono, rangeSend -> Flux.using( () -> new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, - databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)), + nettyDirect, resolveSnapshot(snapshot)), iterator -> iterator.flux().subscribeOn(dbScheduler, false), LLLocalGroupedReactiveRocksIterator::close ).transform(LLUtils::handleDiscard), @@ -873,7 +877,7 @@ public class LLLocalDictionary implements LLDictionary { } ro.setVerifyChecksums(true); var rocksIteratorTuple = getRocksIterator(alloc, - databaseOptions.allowNettyDirect(), ro, range.send(), db + nettyDirect, ro, range.send(), db ); try { try (var rocksIterator = rocksIteratorTuple.getT1()) { @@ -916,7 +920,7 @@ public class LLLocalDictionary implements LLDictionary { () -> new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, rangeSend, - databaseOptions.allowNettyDirect(), + nettyDirect, resolveSnapshot(snapshot), true ), @@ -949,7 +953,7 @@ public class LLLocalDictionary implements LLDictionary { return Flux.usingWhen(rangeMono, rangeSend -> Flux.using( () -> new LLLocalKeyReactiveRocksIterator(db, rangeSend, - databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot) + nettyDirect, resolveSnapshot(snapshot) ), iterator -> iterator.flux().subscribeOn(dbScheduler, false), LLLocalReactiveRocksIterator::close @@ -974,7 +978,7 @@ public class LLLocalDictionary implements LLDictionary { try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), + minBound = setIterateBound(alloc, nettyDirect, opts, IterateBound.LOWER, range.getMin() @@ -985,7 +989,7 @@ public class LLLocalDictionary implements LLDictionary { try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), + maxBound = setIterateBound(alloc, nettyDirect, opts, IterateBound.UPPER, range.getMax() @@ -998,7 +1002,7 @@ public class LLLocalDictionary implements LLDictionary { SafeCloseable seekTo; try (RocksIterator it = db.newIterator(opts)) { if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), it, range.getMin()); + seekTo = rocksIterSeekTo(alloc, nettyDirect, it, range.getMin()); } else { seekTo = null; it.seekToFirst(); @@ -1076,7 +1080,7 @@ public class LLLocalDictionary implements LLDictionary { )) { for (LLEntry entry : entriesList) { assert entry.isAccessible(); - if (databaseOptions.allowNettyDirect()) { + if (nettyDirect) { batch.put(cfh, entry.getKey(), entry.getValue()); } else { batch.put(cfh, @@ -1151,7 +1155,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + minBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); @@ -1159,7 +1163,7 @@ public class LLLocalDictionary implements LLDictionary { try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); @@ -1167,7 +1171,7 @@ public class LLLocalDictionary implements LLDictionary { try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1203,7 +1207,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + minBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); @@ -1211,7 +1215,7 @@ public class LLLocalDictionary implements LLDictionary { try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, + maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); @@ -1219,7 +1223,7 @@ public class LLLocalDictionary implements LLDictionary { try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin()); + seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); } else { seekTo = null; rocksIterator.seekToFirst(); @@ -1415,7 +1419,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + minBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); @@ -1423,7 +1427,7 @@ public class LLLocalDictionary implements LLDictionary { try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); @@ -1436,7 +1440,7 @@ public class LLLocalDictionary implements LLDictionary { try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); } else { seekTo = null; @@ -1481,7 +1485,7 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + minBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); @@ -1489,7 +1493,7 @@ public class LLLocalDictionary implements LLDictionary { try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); @@ -1497,7 +1501,7 @@ public class LLLocalDictionary implements LLDictionary { try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); } else { seekTo = null; @@ -1543,7 +1547,7 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + minBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); @@ -1551,7 +1555,7 @@ public class LLLocalDictionary implements LLDictionary { try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); @@ -1559,7 +1563,7 @@ public class LLLocalDictionary implements LLDictionary { try (var rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); } else { seekTo = null; @@ -1712,7 +1716,7 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(getReadOptions(null))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + minBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.LOWER, range.getMin()); } else { minBound = emptyReleasableSlice(); @@ -1720,7 +1724,7 @@ public class LLLocalDictionary implements LLDictionary { try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, + maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER, range.getMax()); } else { maxBound = emptyReleasableSlice(); @@ -1728,7 +1732,7 @@ public class LLLocalDictionary implements LLDictionary { try (RocksIterator rocksIterator = db.newIterator(readOpts)) { SafeCloseable seekTo; if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), + seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin()); } else { seekTo = null; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 2ef7c65..b74a825 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.database.disk; +import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import io.micrometer.core.instrument.MeterRegistry; @@ -82,6 +83,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final Path dbPath; private final String name; private final DatabaseOptions databaseOptions; + private final boolean nettyDirect; private final boolean enableColumnsBug; private RocksDB db; @@ -99,9 +101,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { DatabaseOptions databaseOptions) throws IOException { this.name = name; this.allocator = allocator; + this.nettyDirect = databaseOptions.allowNettyDirect() && allocator.getAllocationType() == OFF_HEAP; this.meterRegistry = meterRegistry; - if (databaseOptions.allowNettyDirect()) { + if (nettyDirect) { if (!PlatformDependent.hasUnsafe()) { throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", PlatformDependent.getUnsafeUnavailabilityCause() diff --git a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java index 4bbf55b..a53e03f 100644 --- a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java @@ -65,7 +65,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { conn.getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), new DatabaseOptions(Map.of(), true, false, true, false, - true, canUseNettyDirect, canUseNettyDirect, -1) + true, canUseNettyDirect, -1) ), conn.getLuceneIndex("testluceneindex1", 1, diff --git a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java index 73aabcd..a3b5cc5 100644 --- a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java @@ -37,7 +37,7 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator { .zip( conn.getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), - new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1) + new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, -1) ), conn.getLuceneIndex("testluceneindex1", 1,