Configurable direct buffers

This commit is contained in:
Andrea Cavalli 2021-06-29 23:31:02 +02:00
parent 2df2c00e36
commit 3758d06853
12 changed files with 141 additions and 65 deletions

View File

@ -11,4 +11,6 @@ public record DatabaseOptions(Map<String, String> extraFlags,
boolean lowMemory,
boolean inMemory,
boolean useDirectIO,
boolean allowMemoryMapping) {}
boolean allowMemoryMapping,
boolean allowNettyDirect,
boolean useNettyDirect) {}

View File

@ -138,6 +138,7 @@ public class LLLocalDictionary implements LLDictionary {
private final ByteBufAllocator alloc;
private final String getRangeMultiDebugName;
private final String getRangeKeysMultiDebugName;
private final DatabaseOptions databaseOptions;
public LLLocalDictionary(
ByteBufAllocator allocator,
@ -147,7 +148,8 @@ public class LLLocalDictionary implements LLDictionary {
String columnName,
Scheduler dbScheduler,
Function<LLSnapshot, Snapshot> snapshotResolver,
UpdateMode updateMode) {
UpdateMode updateMode,
DatabaseOptions databaseOptions) {
Objects.requireNonNull(db);
this.db = db;
Objects.requireNonNull(columnFamilyHandle);
@ -159,6 +161,7 @@ public class LLLocalDictionary implements LLDictionary {
this.updateMode = updateMode;
this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti";
this.getRangeKeysMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysMulti";
this.databaseOptions = databaseOptions;
alloc = allocator;
}
@ -258,7 +261,7 @@ public class LLLocalDictionary implements LLDictionary {
ByteBuf key,
boolean existsAlmostCertainly) throws RocksDBException {
try {
if (key.isDirect()) {
if (databaseOptions.allowNettyDirect() && key.isDirect()) {
//todo: implement keyMayExist if existsAlmostCertainly is false.
// Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers
@ -366,7 +369,7 @@ public class LLLocalDictionary implements LLDictionary {
private void dbPut(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key, ByteBuf value)
throws RocksDBException {
try {
if (key.isDirect() && value.isDirect()) {
if (databaseOptions.allowNettyDirect() && key.isDirect() && value.isDirect()) {
if (!key.isDirect()) {
throw new RocksDBException("Key buffer must be direct");
}
@ -416,7 +419,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
if (range.getMin().isDirect()) {
if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) {
readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()),
"This range must use direct buffers"
)));
@ -425,7 +428,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (range.hasMax()) {
if (range.getMax().isDirect()) {
if (databaseOptions.allowNettyDirect() && range.getMax().isDirect()) {
readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()),
"This range must use direct buffers"
)));
@ -435,7 +438,7 @@ public class LLLocalDictionary implements LLDictionary {
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
if (range.getMin().isDirect()) {
if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) {
rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()),
"This range must use direct buffers"
));
@ -809,7 +812,7 @@ public class LLLocalDictionary implements LLDictionary {
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key)
throws RocksDBException {
try {
if (key.isDirect()) {
if (databaseOptions.allowNettyDirect() && key.isDirect()) {
if (!key.isDirect()) {
throw new IllegalArgumentException("Key must be a direct buffer");
}
@ -983,7 +986,7 @@ public class LLLocalDictionary implements LLDictionary {
}
})
.subscribeOn(dbScheduler)
.flatMapMany(entries -> Flux.fromIterable(entries))
.flatMapMany(Flux::fromIterable)
.onErrorMap(cause -> new IOException("Failed to read keys "
+ Arrays.deepToString(keysWindow.toArray(ByteBuf[]::new)), cause))
.doAfterTerminate(() -> keysWindow.forEach(ReferenceCounted::release))
@ -1001,23 +1004,7 @@ public class LLLocalDictionary implements LLDictionary {
public Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues) {
return entries
.window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.doOnDiscard(Entry.class, entry -> {
if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
}
})
.flatMap(Flux::collectList)
.doOnDiscard(Entry.class, entry -> {
if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
}
})
.map(Collections::unmodifiableList)
.flatMap(ew -> Mono
.using(
@ -1089,6 +1076,14 @@ public class LLLocalDictionary implements LLDictionary {
}
)
)
.doOnDiscard(Entry.class, entry -> {
if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
}
})
.doOnDiscard(Collection.class, obj -> {
//noinspection unchecked
var castedEntries = (Collection<Entry<ByteBuf, ByteBuf>>) obj;
@ -1161,6 +1156,7 @@ public class LLLocalDictionary implements LLDictionary {
alloc,
cfh,
range.retain(),
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
getRangeMultiDebugName
),
@ -1191,6 +1187,7 @@ public class LLLocalDictionary implements LLDictionary {
cfh,
prefixLength,
range.retain(),
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
"getRangeMultiGrouped"
),
@ -1233,6 +1230,7 @@ public class LLLocalDictionary implements LLDictionary {
cfh,
prefixLength,
range.retain(),
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
"getRangeKeysGrouped"
),
@ -1257,7 +1255,7 @@ public class LLLocalDictionary implements LLDictionary {
ro.setReadaheadSize(32 * 1024);
}
ro.setVerifyChecksums(true);
var rocksIteratorTuple = getRocksIterator(ro, range.retain(), db, cfh);
var rocksIteratorTuple = getRocksIterator(databaseOptions.allowNettyDirect(), ro, range.retain(), db, cfh);
try {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
rocksIterator.seekToFirst();
@ -1299,6 +1297,7 @@ public class LLLocalDictionary implements LLDictionary {
cfh,
prefixLength,
range.retain(),
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
true,
"getRangeKeysGrouped"
@ -1343,6 +1342,7 @@ public class LLLocalDictionary implements LLDictionary {
alloc,
cfh,
range.retain(),
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
getRangeKeysMultiDebugName
),
@ -1368,20 +1368,28 @@ public class LLLocalDictionary implements LLDictionary {
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(opts, IterateBound.LOWER, range.getMin().retain());
minBound = setIterateBound(databaseOptions.allowNettyDirect(),
opts,
IterateBound.LOWER,
range.getMin().retain()
);
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(opts, IterateBound.UPPER, range.getMax().retain());
maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
opts,
IterateBound.UPPER,
range.getMax().retain()
);
} else {
maxBound = emptyReleasableSlice();
}
try (RocksIterator it = db.newIterator(cfh, opts)) {
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(it, range.getMin().retain());
rocksIterSeekTo(databaseOptions.allowNettyDirect(), it, range.getMin().retain());
} else {
it.seekToFirst();
}
@ -1521,20 +1529,28 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
minBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.LOWER,
range.getMin().retain()
);
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.UPPER,
range.getMax().retain()
);
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
@ -1561,20 +1577,28 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
minBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.LOWER,
range.getMin().retain()
);
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.UPPER,
range.getMax().retain()
);
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
@ -1595,9 +1619,9 @@ public class LLLocalDictionary implements LLDictionary {
}
}
private static void rocksIterSeekTo(RocksIterator rocksIterator, ByteBuf buffer) {
private static void rocksIterSeekTo(boolean allowNettyDirect, RocksIterator rocksIterator, ByteBuf buffer) {
try {
if (buffer.isDirect()) {
if (allowNettyDirect && buffer.isDirect()) {
ByteBuffer nioBuffer = LLUtils.toDirect(buffer);
assert nioBuffer.isDirect();
rocksIterator.seek(nioBuffer);
@ -1611,11 +1635,11 @@ public class LLLocalDictionary implements LLDictionary {
}
}
private static ReleasableSlice setIterateBound(ReadOptions readOpts, IterateBound boundType, ByteBuf buffer) {
private static ReleasableSlice setIterateBound(boolean allowNettyDirect, ReadOptions readOpts, IterateBound boundType, ByteBuf buffer) {
try {
Objects.requireNonNull(buffer);
AbstractSlice<?> slice;
if (LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS && buffer.isDirect()) {
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS && buffer.isDirect()) {
ByteBuffer nioBuffer = LLUtils.toDirect(buffer);
assert nioBuffer.isDirect();
slice = new DirectSlice(nioBuffer, buffer.readableBytes());
@ -1737,14 +1761,22 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
minBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.LOWER,
range.getMin().retain()
);
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.UPPER,
range.getMax().retain()
);
} else {
maxBound = emptyReleasableSlice();
}
@ -1755,7 +1787,10 @@ public class LLLocalDictionary implements LLDictionary {
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(databaseOptions.allowNettyDirect(),
rocksIterator,
range.getMin().retain()
);
} else {
rocksIterator.seekToFirst();
}
@ -1796,20 +1831,28 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
minBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.LOWER,
range.getMin().retain()
);
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.UPPER,
range.getMax().retain()
);
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
@ -1853,20 +1896,28 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
minBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.LOWER,
range.getMin().retain()
);
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.UPPER,
range.getMax().retain()
);
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
@ -2010,20 +2061,28 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
minBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.LOWER,
range.getMin().retain()
);
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
readOpts,
IterateBound.UPPER,
range.getMax().retain()
);
} else {
maxBound = emptyReleasableSlice();
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
@ -2053,7 +2112,8 @@ public class LLLocalDictionary implements LLDictionary {
}
@NotNull
public static Tuple3<RocksIterator, ReleasableSlice, ReleasableSlice> getRocksIterator(ReadOptions readOptions,
public static Tuple3<RocksIterator, ReleasableSlice, ReleasableSlice> getRocksIterator(boolean allowNettyDirect,
ReadOptions readOptions,
LLRange range,
RocksDB db,
ColumnFamilyHandle cfh) {
@ -2061,18 +2121,18 @@ public class LLLocalDictionary implements LLDictionary {
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
sliceMin = setIterateBound(readOptions, IterateBound.LOWER, range.getMin().retain());
sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMin().retain());
} else {
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
sliceMax = setIterateBound(readOptions, IterateBound.UPPER, range.getMax().retain());
sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMax().retain());
} else {
sliceMax = emptyReleasableSlice();
}
var rocksIterator = db.newIterator(cfh, readOptions);
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}

View File

@ -15,9 +15,10 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera
ByteBufAllocator alloc,
ColumnFamilyHandle cfh,
LLRange range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, range, readOptions, true, debugName);
super(db, alloc, cfh, range, allowNettyDirect, readOptions, true, debugName);
}
@Override

View File

@ -15,9 +15,10 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends
public LLLocalGroupedEntryReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, prefixLength, range, readOptions, false, true);
super(db, alloc, cfh, prefixLength, range, allowNettyDirect, readOptions, false, true);
}
@Override

View File

@ -14,9 +14,10 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, prefixLength, range, readOptions, true, false);
super(db, alloc, cfh, prefixLength, range, allowNettyDirect, readOptions, true, false);
}
@Override

View File

@ -24,6 +24,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
private final ColumnFamilyHandle cfh;
private final int prefixLength;
private final LLRange range;
private final boolean allowNettyDirect;
private final ReadOptions readOptions;
private final boolean canFillCache;
private final boolean readValues;
@ -31,6 +32,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
public LLLocalGroupedReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
boolean allowNettyDirect,
ReadOptions readOptions,
boolean canFillCache,
boolean readValues) {
@ -39,6 +41,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range;
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.canFillCache = canFillCache;
this.readValues = readValues;
@ -50,7 +53,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax());
return LLLocalDictionary.getRocksIterator(readOptions, range.retain(), db, cfh);
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {

View File

@ -21,6 +21,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
private final ColumnFamilyHandle cfh;
private final int prefixLength;
private final LLRange range;
private final boolean allowNettyDirect;
private final ReadOptions readOptions;
private final boolean canFillCache;
private final String debugName;
@ -28,6 +29,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
public LLLocalKeyPrefixReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
boolean allowNettyDirect,
ReadOptions readOptions,
boolean canFillCache,
String debugName) {
@ -36,6 +38,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range;
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.canFillCache = canFillCache;
this.debugName = debugName;
@ -50,7 +53,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(canFillCache);
}
return LLLocalDictionary.getRocksIterator(readOptions, range.retain(), db, cfh);
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {

View File

@ -13,9 +13,10 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato
ByteBufAllocator alloc,
ColumnFamilyHandle cfh,
LLRange range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, range, readOptions, false, debugName);
super(db, alloc, cfh, range, allowNettyDirect, readOptions, false, debugName);
}
@Override

View File

@ -474,7 +474,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
Column.toString(columnName),
dbScheduler,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
updateMode
updateMode,
databaseOptions
))
.subscribeOn(dbScheduler);
}

View File

@ -29,6 +29,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
private final ByteBufAllocator alloc;
private final ColumnFamilyHandle cfh;
private final LLRange range;
private final boolean allowNettyDirect;
private final ReadOptions readOptions;
private final boolean readValues;
private final String debugName;
@ -37,6 +38,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
ByteBufAllocator alloc,
ColumnFamilyHandle cfh,
LLRange range,
boolean allowNettyDirect,
ReadOptions readOptions,
boolean readValues,
String debugName) {
@ -44,6 +46,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
this.alloc = alloc;
this.cfh = cfh;
this.range = range;
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.readValues = readValues;
this.debugName = debugName;
@ -57,7 +60,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(false);
}
return getRocksIterator(readOptions, range.retain(), db, cfh);
return getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {

View File

@ -58,7 +58,7 @@ public class DbTestUtils {
.then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true)
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true)
)),
action,
db -> db.close().then(Mono.fromCallable(() -> {

View File

@ -135,7 +135,7 @@ public class OldDatabaseTests {
.then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap")),
new DatabaseOptions(Map.of(), true, false, true, false, true)
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true)
));
}