diff --git a/src/main/java/it/cavallium/dbengine/database/disk/DatabaseOptions.java b/src/main/java/it/cavallium/dbengine/database/disk/DatabaseOptions.java index 383b001..1503be9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/DatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/DatabaseOptions.java @@ -11,4 +11,6 @@ public record DatabaseOptions(Map extraFlags, boolean lowMemory, boolean inMemory, boolean useDirectIO, - boolean allowMemoryMapping) {} + boolean allowMemoryMapping, + boolean allowNettyDirect, + boolean useNettyDirect) {} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 21a0b23..1eb03f5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -138,6 +138,7 @@ public class LLLocalDictionary implements LLDictionary { private final ByteBufAllocator alloc; private final String getRangeMultiDebugName; private final String getRangeKeysMultiDebugName; + private final DatabaseOptions databaseOptions; public LLLocalDictionary( ByteBufAllocator allocator, @@ -147,7 +148,8 @@ public class LLLocalDictionary implements LLDictionary { String columnName, Scheduler dbScheduler, Function snapshotResolver, - UpdateMode updateMode) { + UpdateMode updateMode, + DatabaseOptions databaseOptions) { Objects.requireNonNull(db); this.db = db; Objects.requireNonNull(columnFamilyHandle); @@ -159,6 +161,7 @@ public class LLLocalDictionary implements LLDictionary { this.updateMode = updateMode; this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti"; this.getRangeKeysMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysMulti"; + this.databaseOptions = databaseOptions; alloc = allocator; } @@ -258,7 +261,7 @@ public class LLLocalDictionary implements LLDictionary { ByteBuf key, boolean existsAlmostCertainly) throws RocksDBException { try { - if (key.isDirect()) { + if (databaseOptions.allowNettyDirect() && key.isDirect()) { //todo: implement keyMayExist if existsAlmostCertainly is false. // Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers @@ -366,7 +369,7 @@ public class LLLocalDictionary implements LLDictionary { private void dbPut(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key, ByteBuf value) throws RocksDBException { try { - if (key.isDirect() && value.isDirect()) { + if (databaseOptions.allowNettyDirect() && key.isDirect() && value.isDirect()) { if (!key.isDirect()) { throw new RocksDBException("Key buffer must be direct"); } @@ -416,7 +419,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setFillCache(false); if (range.hasMin()) { - if (range.getMin().isDirect()) { + if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) { readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), "This range must use direct buffers" ))); @@ -425,7 +428,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (range.hasMax()) { - if (range.getMax().isDirect()) { + if (databaseOptions.allowNettyDirect() && range.getMax().isDirect()) { readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()), "This range must use direct buffers" ))); @@ -435,7 +438,7 @@ public class LLLocalDictionary implements LLDictionary { } try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - if (range.getMin().isDirect()) { + if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) { rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), "This range must use direct buffers" )); @@ -809,7 +812,7 @@ public class LLLocalDictionary implements LLDictionary { private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key) throws RocksDBException { try { - if (key.isDirect()) { + if (databaseOptions.allowNettyDirect() && key.isDirect()) { if (!key.isDirect()) { throw new IllegalArgumentException("Key must be a direct buffer"); } @@ -983,7 +986,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .subscribeOn(dbScheduler) - .flatMapMany(entries -> Flux.fromIterable(entries)) + .flatMapMany(Flux::fromIterable) .onErrorMap(cause -> new IOException("Failed to read keys " + Arrays.deepToString(keysWindow.toArray(ByteBuf[]::new)), cause)) .doAfterTerminate(() -> keysWindow.forEach(ReferenceCounted::release)) @@ -1001,23 +1004,7 @@ public class LLLocalDictionary implements LLDictionary { public Flux> putMulti(Flux> entries, boolean getOldValues) { return entries .window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) - .doOnDiscard(Entry.class, entry -> { - if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) { - //noinspection unchecked - var castedEntry = (Entry) entry; - castedEntry.getKey().release(); - castedEntry.getValue().release(); - } - }) .flatMap(Flux::collectList) - .doOnDiscard(Entry.class, entry -> { - if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) { - //noinspection unchecked - var castedEntry = (Entry) entry; - castedEntry.getKey().release(); - castedEntry.getValue().release(); - } - }) .map(Collections::unmodifiableList) .flatMap(ew -> Mono .using( @@ -1089,6 +1076,14 @@ public class LLLocalDictionary implements LLDictionary { } ) ) + .doOnDiscard(Entry.class, entry -> { + if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) { + //noinspection unchecked + var castedEntry = (Entry) entry; + castedEntry.getKey().release(); + castedEntry.getValue().release(); + } + }) .doOnDiscard(Collection.class, obj -> { //noinspection unchecked var castedEntries = (Collection>) obj; @@ -1161,6 +1156,7 @@ public class LLLocalDictionary implements LLDictionary { alloc, cfh, range.retain(), + databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiDebugName ), @@ -1191,6 +1187,7 @@ public class LLLocalDictionary implements LLDictionary { cfh, prefixLength, range.retain(), + databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeMultiGrouped" ), @@ -1233,6 +1230,7 @@ public class LLLocalDictionary implements LLDictionary { cfh, prefixLength, range.retain(), + databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeKeysGrouped" ), @@ -1257,7 +1255,7 @@ public class LLLocalDictionary implements LLDictionary { ro.setReadaheadSize(32 * 1024); } ro.setVerifyChecksums(true); - var rocksIteratorTuple = getRocksIterator(ro, range.retain(), db, cfh); + var rocksIteratorTuple = getRocksIterator(databaseOptions.allowNettyDirect(), ro, range.retain(), db, cfh); try { try (var rocksIterator = rocksIteratorTuple.getT1()) { rocksIterator.seekToFirst(); @@ -1299,6 +1297,7 @@ public class LLLocalDictionary implements LLDictionary { cfh, prefixLength, range.retain(), + databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), true, "getRangeKeysGrouped" @@ -1343,6 +1342,7 @@ public class LLLocalDictionary implements LLDictionary { alloc, cfh, range.retain(), + databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysMultiDebugName ), @@ -1368,20 +1368,28 @@ public class LLLocalDictionary implements LLDictionary { try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(opts, IterateBound.LOWER, range.getMin().retain()); + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + opts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(opts, IterateBound.UPPER, range.getMax().retain()); + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + opts, + IterateBound.UPPER, + range.getMax().retain() + ); } else { maxBound = emptyReleasableSlice(); } try (RocksIterator it = db.newIterator(cfh, opts)) { if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(it, range.getMin().retain()); + rocksIterSeekTo(databaseOptions.allowNettyDirect(), it, range.getMin().retain()); } else { it.seekToFirst(); } @@ -1521,20 +1529,28 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.UPPER, + range.getMax().retain() + ); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(cfh, readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { rocksIterator.seekToFirst(); } @@ -1561,20 +1577,28 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.UPPER, + range.getMax().retain() + ); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(cfh, readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { rocksIterator.seekToFirst(); } @@ -1595,9 +1619,9 @@ public class LLLocalDictionary implements LLDictionary { } } - private static void rocksIterSeekTo(RocksIterator rocksIterator, ByteBuf buffer) { + private static void rocksIterSeekTo(boolean allowNettyDirect, RocksIterator rocksIterator, ByteBuf buffer) { try { - if (buffer.isDirect()) { + if (allowNettyDirect && buffer.isDirect()) { ByteBuffer nioBuffer = LLUtils.toDirect(buffer); assert nioBuffer.isDirect(); rocksIterator.seek(nioBuffer); @@ -1611,11 +1635,11 @@ public class LLLocalDictionary implements LLDictionary { } } - private static ReleasableSlice setIterateBound(ReadOptions readOpts, IterateBound boundType, ByteBuf buffer) { + private static ReleasableSlice setIterateBound(boolean allowNettyDirect, ReadOptions readOpts, IterateBound boundType, ByteBuf buffer) { try { Objects.requireNonNull(buffer); AbstractSlice slice; - if (LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS && buffer.isDirect()) { + if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS && buffer.isDirect()) { ByteBuffer nioBuffer = LLUtils.toDirect(buffer); assert nioBuffer.isDirect(); slice = new DirectSlice(nioBuffer, buffer.readableBytes()); @@ -1737,14 +1761,22 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.UPPER, + range.getMax().retain() + ); } else { maxBound = emptyReleasableSlice(); } @@ -1755,7 +1787,10 @@ public class LLLocalDictionary implements LLDictionary { } try (var rocksIterator = db.newIterator(cfh, readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + rocksIterSeekTo(databaseOptions.allowNettyDirect(), + rocksIterator, + range.getMin().retain() + ); } else { rocksIterator.seekToFirst(); } @@ -1796,20 +1831,28 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.UPPER, + range.getMax().retain() + ); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(cfh, readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { rocksIterator.seekToFirst(); } @@ -1853,20 +1896,28 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.UPPER, + range.getMax().retain() + ); } else { maxBound = emptyReleasableSlice(); } try (var rocksIterator = db.newIterator(cfh, readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { rocksIterator.seekToFirst(); } @@ -2010,20 +2061,28 @@ public class LLLocalDictionary implements LLDictionary { try (var readOpts = new ReadOptions(getReadOptions(null))) { ReleasableSlice minBound; if (range.hasMin()) { - minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain()); + minBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.LOWER, + range.getMin().retain() + ); } else { minBound = emptyReleasableSlice(); } try { ReleasableSlice maxBound; if (range.hasMax()) { - maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain()); + maxBound = setIterateBound(databaseOptions.allowNettyDirect(), + readOpts, + IterateBound.UPPER, + range.getMax().retain() + ); } else { maxBound = emptyReleasableSlice(); } try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain()); } else { rocksIterator.seekToFirst(); } @@ -2053,7 +2112,8 @@ public class LLLocalDictionary implements LLDictionary { } @NotNull - public static Tuple3 getRocksIterator(ReadOptions readOptions, + public static Tuple3 getRocksIterator(boolean allowNettyDirect, + ReadOptions readOptions, LLRange range, RocksDB db, ColumnFamilyHandle cfh) { @@ -2061,18 +2121,18 @@ public class LLLocalDictionary implements LLDictionary { ReleasableSlice sliceMin; ReleasableSlice sliceMax; if (range.hasMin()) { - sliceMin = setIterateBound(readOptions, IterateBound.LOWER, range.getMin().retain()); + sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMin().retain()); } else { sliceMin = emptyReleasableSlice(); } if (range.hasMax()) { - sliceMax = setIterateBound(readOptions, IterateBound.UPPER, range.getMax().retain()); + sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMax().retain()); } else { sliceMax = emptyReleasableSlice(); } var rocksIterator = db.newIterator(cfh, readOptions); if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterSeekTo(rocksIterator, range.getMin().retain()); + rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMin().retain()); } else { rocksIterator.seekToFirst(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java index 1dfa6de..b8e014a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalEntryReactiveRocksIterator.java @@ -15,9 +15,10 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera ByteBufAllocator alloc, ColumnFamilyHandle cfh, LLRange range, + boolean allowNettyDirect, ReadOptions readOptions, String debugName) { - super(db, alloc, cfh, range, readOptions, true, debugName); + super(db, alloc, cfh, range, allowNettyDirect, readOptions, true, debugName); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java index 0f63943..259f416 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java @@ -15,9 +15,10 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends public LLLocalGroupedEntryReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh, int prefixLength, LLRange range, + boolean allowNettyDirect, ReadOptions readOptions, String debugName) { - super(db, alloc, cfh, prefixLength, range, readOptions, false, true); + super(db, alloc, cfh, prefixLength, range, allowNettyDirect, readOptions, false, true); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java index f961ee4..17111d9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java @@ -14,9 +14,10 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti ColumnFamilyHandle cfh, int prefixLength, LLRange range, + boolean allowNettyDirect, ReadOptions readOptions, String debugName) { - super(db, alloc, cfh, prefixLength, range, readOptions, true, false); + super(db, alloc, cfh, prefixLength, range, allowNettyDirect, readOptions, true, false); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 05def19..4d5da06 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -24,6 +24,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { private final ColumnFamilyHandle cfh; private final int prefixLength; private final LLRange range; + private final boolean allowNettyDirect; private final ReadOptions readOptions; private final boolean canFillCache; private final boolean readValues; @@ -31,6 +32,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { public LLLocalGroupedReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh, int prefixLength, LLRange range, + boolean allowNettyDirect, ReadOptions readOptions, boolean canFillCache, boolean readValues) { @@ -39,6 +41,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { this.cfh = cfh; this.prefixLength = prefixLength; this.range = range; + this.allowNettyDirect = allowNettyDirect; this.readOptions = readOptions; this.canFillCache = canFillCache; this.readValues = readValues; @@ -50,7 +53,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { .generate(() -> { var readOptions = new ReadOptions(this.readOptions); readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax()); - return LLLocalDictionary.getRocksIterator(readOptions, range.retain(), db, cfh); + return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh); }, (tuple, sink) -> { range.retain(); try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index d4b82a6..9e667c7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -21,6 +21,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { private final ColumnFamilyHandle cfh; private final int prefixLength; private final LLRange range; + private final boolean allowNettyDirect; private final ReadOptions readOptions; private final boolean canFillCache; private final String debugName; @@ -28,6 +29,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { public LLLocalKeyPrefixReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh, int prefixLength, LLRange range, + boolean allowNettyDirect, ReadOptions readOptions, boolean canFillCache, String debugName) { @@ -36,6 +38,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { this.cfh = cfh; this.prefixLength = prefixLength; this.range = range; + this.allowNettyDirect = allowNettyDirect; this.readOptions = readOptions; this.canFillCache = canFillCache; this.debugName = debugName; @@ -50,7 +53,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setFillCache(canFillCache); } - return LLLocalDictionary.getRocksIterator(readOptions, range.retain(), db, cfh); + return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh); }, (tuple, sink) -> { range.retain(); try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java index e732d90..8a6fac4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyReactiveRocksIterator.java @@ -13,9 +13,10 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato ByteBufAllocator alloc, ColumnFamilyHandle cfh, LLRange range, + boolean allowNettyDirect, ReadOptions readOptions, String debugName) { - super(db, alloc, cfh, range, readOptions, false, debugName); + super(db, alloc, cfh, range, allowNettyDirect, readOptions, false, debugName); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 685a782..d4f474b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -474,7 +474,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { Column.toString(columnName), dbScheduler, (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), - updateMode + updateMode, + databaseOptions )) .subscribeOn(dbScheduler); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index d605f10..15d2b81 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -29,6 +29,7 @@ public abstract class LLLocalReactiveRocksIterator { private final ByteBufAllocator alloc; private final ColumnFamilyHandle cfh; private final LLRange range; + private final boolean allowNettyDirect; private final ReadOptions readOptions; private final boolean readValues; private final String debugName; @@ -37,6 +38,7 @@ public abstract class LLLocalReactiveRocksIterator { ByteBufAllocator alloc, ColumnFamilyHandle cfh, LLRange range, + boolean allowNettyDirect, ReadOptions readOptions, boolean readValues, String debugName) { @@ -44,6 +46,7 @@ public abstract class LLLocalReactiveRocksIterator { this.alloc = alloc; this.cfh = cfh; this.range = range; + this.allowNettyDirect = allowNettyDirect; this.readOptions = readOptions; this.readValues = readValues; this.debugName = debugName; @@ -57,7 +60,7 @@ public abstract class LLLocalReactiveRocksIterator { readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setFillCache(false); } - return getRocksIterator(readOptions, range.retain(), db, cfh); + return getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh); }, (tuple, sink) -> { range.retain(); try { diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 09aa748..003897d 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -58,7 +58,7 @@ public class DbTestUtils { .then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath).connect()) .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), - new DatabaseOptions(Map.of(), true, false, true, false, true) + new DatabaseOptions(Map.of(), true, false, true, false, true, true, true) )), action, db -> db.close().then(Mono.fromCallable(() -> { diff --git a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java b/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java index 212f272..9b8dcc9 100644 --- a/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java +++ b/src/test/java/it/cavallium/dbengine/OldDatabaseTests.java @@ -135,7 +135,7 @@ public class OldDatabaseTests { .then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect()) .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), - new DatabaseOptions(Map.of(), true, false, true, false, true) + new DatabaseOptions(Map.of(), true, false, true, false, true, true, true) )); }