This commit is contained in:
Andrea Cavalli 2021-05-03 02:45:29 +02:00
parent 0d3157ec3c
commit 4aa18fcd60
2 changed files with 75 additions and 20 deletions

View File

@ -74,6 +74,10 @@ public class LLLocalDictionary implements LLDictionary {
static final boolean PREFER_SEEK_TO_FIRST = false;
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
/**
* Default: true. Use false to debug problems with windowing.
*/
static final boolean USE_WINDOW_IN_SET_RANGE = true;
/**
* Default: true. Use false to debug problems with write batches.
*/
@ -81,7 +85,7 @@ public class LLLocalDictionary implements LLDictionary {
/**
* Default: true. Use false to debug problems with capped write batches.
*/
static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = false;
static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = true;
static final boolean PARALLEL_EXACT_SIZE = true;
private static final int STRIPES = 512;
@ -996,21 +1000,29 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return entries
.window(MULTI_GET_WINDOW)
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.flatMap(entriesList -> Mono
if (USE_WINDOW_IN_SET_RANGE) {
return Mono
.<Void>fromCallable(() -> {
try {
if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
var opts = new ReadOptions(EMPTY_READ_OPTIONS);
if (range.hasMin()) {
setIterateBound(opts, IterateBound.LOWER, range.getMin().retain());
}
if (range.hasMax()) {
setIterateBound(opts, IterateBound.UPPER, range.getMax().retain());
}
try (RocksIterator it = db.newIterator(cfh, opts)) {
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(it, range.getMin().retain());
} else {
it.seekToFirst();
}
while (it.isValid()) {
db.delete(cfh, it.key());
it.next();
}
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
@ -1022,9 +1034,6 @@ public class LLLocalDictionary implements LLDictionary {
} else {
deleteSmallRangeWriteBatch(batch, range.retain());
}
for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
}
batch.writeToDbAndClose();
}
} else {
@ -1037,6 +1046,47 @@ public class LLLocalDictionary implements LLDictionary {
db.write(EMPTY_WRITE_OPTIONS, batch);
batch.clear();
}
}
return null;
})
.subscribeOn(dbScheduler)
.thenMany(entries
.window(MULTI_GET_WINDOW)
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
)
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.flatMap(entriesList -> Mono
.<Void>fromCallable(() -> {
try {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer());
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
}
batch.writeToDbAndClose();
}
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
batch.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue()));
@ -1066,6 +1116,11 @@ public class LLLocalDictionary implements LLDictionary {
.onErrorMap(cause -> new IOException("Failed to write range", cause))
.doFinally(s -> range.release());
} else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params");
});
}
return Flux
.defer(() -> this.getRange(null, range.retain(), false))
.flatMap(oldValue -> Mono

View File

@ -20,7 +20,7 @@ public class CappedWriteBatch extends WriteBatch {
/**
* Default: true, Use false to debug problems with direct buffers
*/
private static final boolean USE_FAST_DIRECT_BUFFERS = false;
private static final boolean USE_FAST_DIRECT_BUFFERS = true;
private final RocksDB db;
private final int cap;
private final WriteOptions writeOptions;