Use netty direct memory if possible

This commit is contained in:
Andrea Cavalli 2021-10-30 12:39:56 +02:00
parent 98dcb39ce1
commit c506a7e71b
6 changed files with 49 additions and 46 deletions

View File

@ -13,12 +13,5 @@ public record DatabaseOptions(Map<String, String> extraFlags,
boolean useDirectIO,
boolean allowMemoryMapping,
boolean allowNettyDirect,
boolean useNettyDirect,
int maxOpenFiles) {
public DatabaseOptions {
if (useNettyDirect && !allowNettyDirect) {
throw new IllegalArgumentException("If allowNettyDirect is false, you must also set useNettyDirect to false");
}
}
}

View File

@ -7,6 +7,7 @@ import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.StandardAllocationTypes;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLUtils;
@ -43,6 +44,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
private final T db;
private final DatabaseOptions opts;
private final boolean nettyDirect;
private final BufferAllocator alloc;
private final ColumnFamilyHandle cfh;
@ -56,6 +58,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
MeterRegistry meterRegistry) {
this.db = db;
this.opts = databaseOptions;
this.nettyDirect = opts.allowNettyDirect() && alloc.getAllocationType() == StandardAllocationTypes.OFF_HEAP;
this.alloc = alloc;
this.cfh = cfh;
@ -95,7 +98,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
if (opts.allowNettyDirect()) {
if (nettyDirect) {
//todo: implement keyMayExist if existsAlmostCertainly is false.
// Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers
@ -202,7 +205,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
assert key.isAccessible();
assert value.isAccessible();
if (opts.allowNettyDirect()) {
if (nettyDirect) {
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
try (var ignored1 = keyNioBuffer.buffer().receive()) {
assert keyNioBuffer.byteBuffer().isDirect();
@ -275,7 +278,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
if (!cfh.isOwningHandle()) {
throw new IllegalStateException("Column family is closed");
}
if (opts.allowNettyDirect()) {
if (nettyDirect) {
DirectBuffer keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
try {
db.delete(cfh, writeOptions, keyNioBuffer.byteBuffer());

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.disk;
import static io.net5.buffer.Unpooled.wrappedBuffer;
import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
import static java.util.Objects.requireNonNull;
@ -11,6 +12,7 @@ import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.MemoryManager;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.StandardAllocationTypes;
import io.net5.buffer.api.internal.ResourceSupport;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.BadBlock;
@ -129,8 +131,9 @@ public class LLLocalDictionary implements LLDictionary {
private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final UpdateMode updateMode;
private final BufferAllocator alloc;
private final DatabaseOptions databaseOptions;
private final boolean nettyDirect;
private final BufferAllocator alloc;
public LLLocalDictionary(
BufferAllocator allocator,
@ -151,6 +154,7 @@ public class LLLocalDictionary implements LLDictionary {
this.updateMode = updateMode;
this.databaseOptions = databaseOptions;
alloc = allocator;
this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP;
}
@Override
@ -269,7 +273,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
if (range.hasMin()) {
try (var rangeMin = range.getMin().receive()) {
if (databaseOptions.allowNettyDirect()) {
if (nettyDirect) {
var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send());
cloned1 = directBuf.buffer().receive();
direct1 = directBuf.byteBuffer();
@ -281,7 +285,7 @@ public class LLLocalDictionary implements LLDictionary {
}
if (range.hasMax()) {
try (var rangeMax = range.getMax().receive()) {
if (databaseOptions.allowNettyDirect()) {
if (nettyDirect) {
var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMax.send());
cloned2 = directBuf.buffer().receive();
direct2 = directBuf.byteBuffer();
@ -294,7 +298,7 @@ public class LLLocalDictionary implements LLDictionary {
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
try (var rangeMin = range.getMin().receive()) {
if (databaseOptions.allowNettyDirect()) {
if (nettyDirect) {
var directBuf = LLUtils.convertToReadableDirect(alloc, rangeMin.send());
cloned3 = directBuf.buffer().receive();
direct3 = directBuf.byteBuffer();
@ -601,7 +605,7 @@ public class LLLocalDictionary implements LLDictionary {
for (LLEntry entry : entriesWindow) {
var k = entry.getKey();
var v = entry.getValue();
if (databaseOptions.allowNettyDirect()) {
if (nettyDirect) {
batch.put(cfh, k, v);
} else {
try (var key = k.receive()) {
@ -807,7 +811,7 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using(
() -> new LLLocalEntryReactiveRocksIterator(db, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)),
nettyDirect, resolveSnapshot(snapshot)),
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
LLLocalReactiveRocksIterator::close
).transform(LLUtils::handleDiscard),
@ -820,7 +824,7 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)),
nettyDirect, resolveSnapshot(snapshot)),
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
LLLocalGroupedReactiveRocksIterator::close
).transform(LLUtils::handleDiscard),
@ -851,7 +855,7 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using(
() -> new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)),
nettyDirect, resolveSnapshot(snapshot)),
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
LLLocalGroupedReactiveRocksIterator::close
).transform(LLUtils::handleDiscard),
@ -873,7 +877,7 @@ public class LLLocalDictionary implements LLDictionary {
}
ro.setVerifyChecksums(true);
var rocksIteratorTuple = getRocksIterator(alloc,
databaseOptions.allowNettyDirect(), ro, range.send(), db
nettyDirect, ro, range.send(), db
);
try {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
@ -916,7 +920,7 @@ public class LLLocalDictionary implements LLDictionary {
() -> new LLLocalKeyPrefixReactiveRocksIterator(db,
prefixLength,
rangeSend,
databaseOptions.allowNettyDirect(),
nettyDirect,
resolveSnapshot(snapshot),
true
),
@ -949,7 +953,7 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using(
() -> new LLLocalKeyReactiveRocksIterator(db, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)
nettyDirect, resolveSnapshot(snapshot)
),
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
LLLocalReactiveRocksIterator::close
@ -974,7 +978,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(),
minBound = setIterateBound(alloc, nettyDirect,
opts,
IterateBound.LOWER,
range.getMin()
@ -985,7 +989,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(),
maxBound = setIterateBound(alloc, nettyDirect,
opts,
IterateBound.UPPER,
range.getMax()
@ -998,7 +1002,7 @@ public class LLLocalDictionary implements LLDictionary {
SafeCloseable seekTo;
try (RocksIterator it = db.newIterator(opts)) {
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), it, range.getMin());
seekTo = rocksIterSeekTo(alloc, nettyDirect, it, range.getMin());
} else {
seekTo = null;
it.seekToFirst();
@ -1076,7 +1080,7 @@ public class LLLocalDictionary implements LLDictionary {
)) {
for (LLEntry entry : entriesList) {
assert entry.isAccessible();
if (databaseOptions.allowNettyDirect()) {
if (nettyDirect) {
batch.put(cfh, entry.getKey(), entry.getValue());
} else {
batch.put(cfh,
@ -1151,7 +1155,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
minBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
@ -1159,7 +1163,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
maxBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
@ -1167,7 +1171,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin());
} else {
seekTo = null;
rocksIterator.seekToFirst();
@ -1203,7 +1207,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
minBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
@ -1211,7 +1215,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
maxBound = setIterateBound(alloc, nettyDirect, readOpts, IterateBound.UPPER,
range.getMax());
} else {
maxBound = emptyReleasableSlice();
@ -1219,7 +1223,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
seekTo = rocksIterSeekTo(alloc, nettyDirect, rocksIterator, range.getMin());
} else {
seekTo = null;
rocksIterator.seekToFirst();
@ -1415,7 +1419,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
minBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
@ -1423,7 +1427,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
maxBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
@ -1436,7 +1440,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
seekTo = rocksIterSeekTo(alloc, nettyDirect,
rocksIterator, range.getMin());
} else {
seekTo = null;
@ -1481,7 +1485,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
minBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
@ -1489,7 +1493,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
maxBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
@ -1497,7 +1501,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
seekTo = rocksIterSeekTo(alloc, nettyDirect,
rocksIterator, range.getMin());
} else {
seekTo = null;
@ -1543,7 +1547,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
minBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
@ -1551,7 +1555,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
maxBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
@ -1559,7 +1563,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
seekTo = rocksIterSeekTo(alloc, nettyDirect,
rocksIterator, range.getMin());
} else {
seekTo = null;
@ -1712,7 +1716,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
minBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
@ -1720,7 +1724,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts,
maxBound = setIterateBound(alloc, nettyDirect, readOpts,
IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
@ -1728,7 +1732,7 @@ public class LLLocalDictionary implements LLDictionary {
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(),
seekTo = rocksIterSeekTo(alloc, nettyDirect,
rocksIterator, range.getMin());
} else {
seekTo = null;

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.database.disk;
import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.micrometer.core.instrument.MeterRegistry;
@ -82,6 +83,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final Path dbPath;
private final String name;
private final DatabaseOptions databaseOptions;
private final boolean nettyDirect;
private final boolean enableColumnsBug;
private RocksDB db;
@ -99,9 +101,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
DatabaseOptions databaseOptions) throws IOException {
this.name = name;
this.allocator = allocator;
this.nettyDirect = databaseOptions.allowNettyDirect() && allocator.getAllocationType() == OFF_HEAP;
this.meterRegistry = meterRegistry;
if (databaseOptions.allowNettyDirect()) {
if (nettyDirect) {
if (!PlatformDependent.hasUnsafe()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers",
PlatformDependent.getUnsafeUnavailabilityCause()

View File

@ -65,7 +65,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false,
true, canUseNettyDirect, canUseNettyDirect, -1)
true, canUseNettyDirect, -1)
),
conn.getLuceneIndex("testluceneindex1",
1,

View File

@ -37,7 +37,7 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
.zip(
conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1)
new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, -1)
),
conn.getLuceneIndex("testluceneindex1",
1,