CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java

1759 lines
60 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
2022-03-16 13:47:56 +01:00
import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
2021-09-10 12:13:52 +02:00
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
2021-08-29 23:18:03 +02:00
import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
2022-03-16 19:19:26 +01:00
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
2021-08-29 23:18:03 +02:00
import static java.util.Objects.requireNonNull;
2021-10-20 01:51:34 +02:00
import static java.util.Objects.requireNonNullElse;
2021-08-29 23:18:03 +02:00
2021-12-30 17:28:06 +01:00
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
2022-03-16 13:47:56 +01:00
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
2022-03-16 19:19:26 +01:00
import io.netty5.buffer.api.ReadableComponent;
2022-03-16 13:47:56 +01:00
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock;
2022-03-02 12:34:30 +01:00
import it.cavallium.dbengine.database.ColumnUtils;
2021-08-29 23:18:03 +02:00
import it.cavallium.dbengine.database.LLDelta;
2021-01-17 18:31:25 +01:00
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
2021-08-28 22:42:51 +02:00
import it.cavallium.dbengine.database.LLEntry;
2021-01-30 00:24:55 +01:00
import it.cavallium.dbengine.database.LLRange;
2021-01-17 18:31:25 +01:00
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
2021-09-01 00:01:56 +02:00
import it.cavallium.dbengine.database.SafeCloseable;
2021-02-13 01:31:24 +01:00
import it.cavallium.dbengine.database.UpdateMode;
2021-05-08 03:09:00 +02:00
import it.cavallium.dbengine.database.UpdateReturnMode;
2021-11-08 10:49:59 +01:00
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
2021-08-22 21:23:22 +02:00
import it.cavallium.dbengine.database.serialization.SerializationFunction;
2022-03-02 12:34:30 +01:00
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
2020-12-07 22:15:18 +01:00
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
2020-12-07 22:15:18 +01:00
import java.util.Objects;
2021-07-17 11:52:08 +02:00
import java.util.Optional;
2021-03-18 19:53:32 +01:00
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
2020-12-07 22:15:18 +01:00
import java.util.function.Function;
2021-03-18 19:53:32 +01:00
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
2021-12-27 17:34:44 +01:00
import org.apache.logging.log4j.util.Supplier;
2020-12-07 22:15:18 +01:00
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractSlice;
import org.rocksdb.CappedWriteBatch;
2020-12-07 22:15:18 +01:00
import org.rocksdb.ColumnFamilyHandle;
2021-03-20 12:41:11 +01:00
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.DirectSlice;
2020-12-07 22:15:18 +01:00
import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
2021-03-13 19:01:36 +01:00
import org.rocksdb.Slice;
2020-12-07 22:15:18 +01:00
import org.rocksdb.Snapshot;
2021-05-02 19:18:15 +02:00
import org.rocksdb.WriteBatch;
2020-12-07 22:15:18 +01:00
import org.rocksdb.WriteOptions;
2021-01-30 00:24:55 +01:00
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
2021-07-17 11:52:08 +02:00
import reactor.util.function.Tuple2;
2021-04-03 19:09:06 +02:00
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
2020-12-07 22:15:18 +01:00
public class LLLocalDictionary implements LLDictionary {
// Logger of this class; protected, so subclasses can log through it as well.
protected static final Logger logger = LogManager.getLogger(LLLocalDictionary.class);
2021-03-18 19:53:32 +01:00
// Feature flag, currently disabled. NOTE(review): its readers are not in this part of the file.
private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = false;
2020-12-07 22:15:18 +01:00
// The three limits below are handed to CappedWriteBatch by putMulti and updateMulti.
static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
// Window size used by getMulti; putMulti and updateMulti use the smaller of this value
// and CAPPED_WRITE_BATCH_CAP.
static final int MULTI_GET_WINDOW = 16;
// Shared read options returned by resolveSnapshot/getReadOptions when no snapshot is given.
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
// Shared write options used for single put, delete and atomic-update calls.
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
// Shared write options passed to every CappedWriteBatch created in this class.
static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
2022-03-24 21:14:17 +01:00
// When false, isRangeEmpty seeks explicitly to the range minimum (if any) instead of
// calling seekToFirst() and relying on the iterate bounds.
static final boolean PREFER_AUTO_SEEK_BOUND = false;
/**
 * Value passed to {@code ReadOptions.setVerifyChecksums} by isRangeEmpty.
 * It used to be false, then it was changed to true to avoid crashes during iterations
 * on completely corrupted files.
 * NOTE(review): the value below is false, which contradicts the sentence above;
 * confirm which setting is intended.
 */
2022-03-20 14:33:27 +01:00
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
2021-05-03 02:45:29 +02:00
/**
 * Default: true. Use false to debug problems with windowing.
 */
static final boolean USE_WINDOW_IN_SET_RANGE = true;
2021-05-02 19:18:15 +02:00
/**
 * Default: true. Use false to debug problems with write batches.
 * Read by putMulti and updateMulti to choose between a CappedWriteBatch and single writes.
 */
2021-05-03 12:29:15 +02:00
static final boolean USE_WRITE_BATCHES_IN_PUT_MULTI = true;
/**
 * Default: true. Use false to debug problems with write batches.
 */
static final boolean USE_WRITE_BATCHES_IN_SET_RANGE = true;
2021-05-02 19:18:15 +02:00
/**
 * Default: true. Use false to debug problems with capped write batches.
 */
2021-05-03 02:45:29 +02:00
static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = true;
2021-05-03 12:29:15 +02:00
/**
 * Default: true. Use false to debug problems with write batches deletes.
 * NOTE(review): the value below is false although the documented default is true;
 * confirm whether this is a leftover debugging setting.
 */
static final boolean USE_WRITE_BATCH_IN_SET_RANGE_DELETE = false;
2021-03-18 19:53:32 +01:00
// Feature flag, currently enabled. NOTE(review): its readers are not in this part of the file.
static final boolean PARALLEL_EXACT_SIZE = true;
2020-12-07 22:15:18 +01:00
2021-01-30 00:24:55 +01:00
// Zero-length byte array. NOTE(review): presumably the smallest possible key; confirm against its users.
private static final byte[] FIRST_KEY = new byte[]{};
2021-05-03 00:29:26 +02:00
/**
 * Default: true
 */
private static final boolean USE_DIRECT_BUFFER_BOUNDS = true;
/**
 * 1KiB dummy direct buffer, write only, used for debugging purposes
 */
private static final ByteBuffer DUMMY_WRITE_ONLY_BYTE_BUFFER = ByteBuffer.allocateDirect(1024);
2021-10-20 01:51:34 +02:00
// Column wrapper through which every read and write of this class is executed.
private final RocksDBColumn db;
2020-12-07 22:15:18 +01:00
// Column family handle obtained from db in the constructor; passed to write batches.
private final ColumnFamilyHandle cfh;
// Database and column names; returned by the getters and used as meter tags.
private final String databaseName;
2021-06-26 02:35:33 +02:00
private final String columnName;
// Scheduler on which the database calls of this class are published/subscribed.
private final Scheduler dbScheduler;
2020-12-07 22:15:18 +01:00
// Maps an LLSnapshot to the RocksDB Snapshot used to build read options.
private final Function<LLSnapshot, Snapshot> snapshotResolver;
2021-02-13 01:31:24 +01:00
// Update policy; update() and updateAndGetDelta() fail when it is DISALLOW.
private final UpdateMode updateMode;
2021-10-30 12:39:56 +02:00
// True only when the database options allow Netty direct buffers and the allocator is OFF_HEAP.
private final boolean nettyDirect;
// Allocator used to create the buffers returned to callers.
private final BufferAllocator alloc;
2020-12-07 22:15:18 +01:00
2021-12-30 17:28:06 +01:00
// Metrics: one started counter, one ended counter and one timer per operation kind.
private final Counter startedUpdates;
private final Counter endedUpdates;
private final Timer updateTime;
private final Counter startedGet;
private final Counter endedGet;
private final Timer getTime;
private final Counter startedContains;
private final Counter endedContains;
private final Timer containsTime;
private final Counter startedPut;
private final Counter endedPut;
private final Timer putTime;
private final Counter startedRemove;
private final Counter endedRemove;
private final Timer removeTime;
2021-05-03 21:41:51 +02:00
public LLLocalDictionary(
2021-08-29 23:18:03 +02:00
BufferAllocator allocator,
2021-10-20 01:51:34 +02:00
@NotNull RocksDBColumn db,
2020-12-07 22:15:18 +01:00
String databaseName,
2021-06-26 02:35:33 +02:00
String columnName,
Scheduler dbScheduler,
2021-02-13 01:31:24 +01:00
Function<LLSnapshot, Snapshot> snapshotResolver,
2021-06-29 23:31:02 +02:00
UpdateMode updateMode,
DatabaseOptions databaseOptions) {
2021-08-29 23:18:03 +02:00
requireNonNull(db);
2020-12-07 22:15:18 +01:00
this.db = db;
2021-10-20 01:51:34 +02:00
this.cfh = db.getColumnFamilyHandle();
2020-12-07 22:15:18 +01:00
this.databaseName = databaseName;
2021-06-26 02:35:33 +02:00
this.columnName = columnName;
this.dbScheduler = dbScheduler;
2020-12-07 22:15:18 +01:00
this.snapshotResolver = snapshotResolver;
2021-02-13 01:31:24 +01:00
this.updateMode = updateMode;
2021-05-03 21:41:51 +02:00
alloc = allocator;
2021-10-30 12:39:56 +02:00
this.nettyDirect = databaseOptions.allowNettyDirect() && alloc.getAllocationType() == OFF_HEAP;
2021-12-30 17:28:06 +01:00
var meterRegistry = db.getMeterRegistry();
this.startedGet = meterRegistry.counter("db.read.map.get.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedGet = meterRegistry.counter("db.read.map.get.ended.counter", "db.name", databaseName, "db.column", columnName);
this.getTime = Timer
2021-12-30 18:20:56 +01:00
.builder("db.read.map.get.timer")
2021-12-30 17:28:06 +01:00
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
2021-12-30 18:20:56 +01:00
this.startedContains = meterRegistry.counter("db.read.map.contains.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedContains = meterRegistry.counter("db.read.map.contains.ended.counter", "db.name", databaseName, "db.column", columnName);
2021-12-30 17:28:06 +01:00
this.containsTime = Timer
2021-12-30 18:20:56 +01:00
.builder("db.read.map.contains.timer")
2021-12-30 17:28:06 +01:00
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedUpdates = meterRegistry.counter("db.write.map.update.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedUpdates = meterRegistry.counter("db.write.map.update.ended.counter", "db.name", databaseName, "db.column", columnName);
this.updateTime = Timer
.builder("db.write.map.update.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedPut = meterRegistry.counter("db.write.map.put.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedPut = meterRegistry.counter("db.write.map.put.ended.counter", "db.name", databaseName, "db.column", columnName);
this.putTime = Timer
.builder("db.write.map.put.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
this.startedRemove = meterRegistry.counter("db.write.map.remove.started.counter", "db.name", databaseName, "db.column", columnName);
this.endedRemove = meterRegistry.counter("db.write.map.remove.ended.counter", "db.name", databaseName, "db.column", columnName);
this.removeTime = Timer
.builder("db.write.map.remove.timer")
.publishPercentiles(0.2, 0.5, 0.95)
.publishPercentileHistogram()
.tags("db.name", databaseName, "db.column", columnName)
.register(meterRegistry);
2020-12-07 22:15:18 +01:00
}
/**
 * @return the database name given to the constructor
 */
@Override
public String getDatabaseName() {
    return this.databaseName;
}
2021-06-26 02:35:33 +02:00
/**
 * @return the column name given to the constructor
 */
public String getColumnName() {
    return this.columnName;
}
2021-06-19 21:55:20 +02:00
/**
2021-08-16 10:36:54 +02:00
* Please don't modify the returned ReadOptions!
* If you want to modify it, wrap it into a new ReadOptions!
2021-06-19 21:55:20 +02:00
*/
2020-12-07 22:15:18 +01:00
private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
if (snapshot != null) {
return getReadOptions(snapshotResolver.apply(snapshot));
} else {
return EMPTY_READ_OPTIONS;
}
}
2021-06-19 21:55:20 +02:00
/**
2021-08-16 10:36:54 +02:00
* Please don't modify the returned ReadOptions!
* If you want to modify it, wrap it into a new ReadOptions!
2021-06-19 21:55:20 +02:00
*/
2020-12-07 22:15:18 +01:00
private ReadOptions getReadOptions(Snapshot snapshot) {
if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshot);
} else {
return EMPTY_READ_OPTIONS;
}
}
@Override
2021-08-29 23:18:03 +02:00
public BufferAllocator getAllocator() {
return alloc;
}
2021-08-29 23:18:03 +02:00
/**
 * Wraps a callable into a Mono whose subscription runs on {@code dbScheduler}.
 *
 * @param callable work to execute; a null result produces an empty Mono
 * @return a Mono that evaluates the callable on subscription
 */
private <T> @NotNull Mono<T> runOnDb(Callable<@Nullable T> callable) {
    Mono<T> deferredCall = Mono.fromCallable(callable);
    return deferredCall.subscribeOn(dbScheduler);
}
2021-05-12 01:25:59 +02:00
@Override
2022-03-24 21:14:17 +01:00
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
return keyMono
2022-01-26 14:22:54 +01:00
.publishOn(dbScheduler)
.<Send<Buffer>>handle((keySend, sink) -> {
2021-08-29 23:18:03 +02:00
try (var key = keySend.receive()) {
logger.trace(MARKER_ROCKSDB, "Reading {}", () -> toStringSafe(key));
2021-08-29 23:18:03 +02:00
try {
var readOptions = requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
2021-12-30 17:28:06 +01:00
Buffer result;
startedGet.increment();
try {
2022-03-02 18:33:58 +01:00
result = getTime.recordCallable(() -> db.get(readOptions, key));
2021-12-30 17:28:06 +01:00
} finally {
endedGet.increment();
}
logger.trace(MARKER_ROCKSDB, "Read {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
if (result != null) {
sink.next(result.send());
2021-08-29 23:18:03 +02:00
} else {
sink.complete();
2021-08-29 23:18:03 +02:00
}
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to read " + toStringSafe(key) + ": " + ex.getMessage()));
2021-08-29 23:18:03 +02:00
} catch (Exception ex) {
sink.error(ex);
2021-05-12 01:25:59 +02:00
}
}
});
}
2020-12-07 22:15:18 +01:00
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fillCache) {
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called isRangeEmpty in a nonblocking thread";
startedContains.increment();
try {
2022-02-02 23:29:11 +01:00
Boolean isRangeEmpty = containsTime.recordCallable(() -> {
if (range.isSingle()) {
return !containsKey(snapshot, range.getSingleUnsafe());
} else {
// Temporary resources to release after finished
AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null;
2020-12-07 22:15:18 +01:00
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(fillCache);
if (range.hasMin()) {
2022-03-16 19:19:26 +01:00
if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) {
readOpts.setIterateLowerBound(slice1 = new DirectSlice(
((ReadableComponent) range.getMinUnsafe()).readableBuffer(),
range.getMinUnsafe().readableBytes()
));
} else {
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(range.getMinUnsafe())));
}
}
if (range.hasMax()) {
2022-03-16 19:19:26 +01:00
if (nettyDirect && isReadOnlyDirect(range.getMaxUnsafe())) {
readOpts.setIterateUpperBound(slice2 = new DirectSlice(
((ReadableComponent) range.getMaxUnsafe()).readableBuffer(),
range.getMaxUnsafe().readableBytes()
));
} else {
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(range.getMaxUnsafe())));
}
}
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
2022-03-24 21:14:17 +01:00
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2022-03-16 19:19:26 +01:00
if (nettyDirect && isReadOnlyDirect(range.getMinUnsafe())) {
rocksIterator.seek(((ReadableComponent) range.getMinUnsafe()).readableBuffer());
} else {
rocksIterator.seek(LLUtils.toArray(range.getMinUnsafe()));
}
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
2022-02-02 23:29:11 +01:00
return !rocksIterator.isValid();
}
} finally {
if (slice1 != null) {
slice1.close();
}
if (slice2 != null) {
slice2.close();
2021-12-30 17:28:06 +01:00
}
}
}
});
2022-02-02 23:29:11 +01:00
assert isRangeEmpty != null;
sink.next(isRangeEmpty);
2022-01-26 19:03:51 +01:00
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to read range " + LLUtils.toStringSafe(range)
+ ": " + ex.getMessage()));
} finally {
endedContains.increment();
}
} catch (Throwable ex) {
sink.error(ex);
}
});
2021-05-12 01:25:59 +02:00
}
private boolean containsKey(@Nullable LLSnapshot snapshot, Buffer key) throws RocksDBException {
2021-12-30 17:28:06 +01:00
startedContains.increment();
try {
var result = containsTime.recordCallable(() -> {
var unmodifiableReadOpts = resolveSnapshot(snapshot);
return db.exists(unmodifiableReadOpts, key);
});
assert result != null;
return result;
} catch (RocksDBException | RuntimeException e) {
throw e;
} catch (Exception ex) {
throw new RuntimeException(ex);
} finally {
endedContains.increment();
}
2021-01-30 00:24:55 +01:00
}
@Override
public Mono<Send<Buffer>> put(Mono<Send<Buffer>> keyMono, Mono<Send<Buffer>> valueMono,
2021-08-16 10:36:54 +02:00
LLDictionaryResultType resultType) {
// Zip the entry to write to the database
var entryMono = Mono.zip(keyMono, valueMono, Map::entry);
// Obtain the previous value from the database
2022-03-24 21:14:17 +01:00
var previousDataMono = this.getPreviousData(keyMono, resultType);
// Write the new entry to the database
2022-01-26 19:03:51 +01:00
Mono<Send<Buffer>> putMono = entryMono
2022-01-26 14:22:54 +01:00
.publishOn(dbScheduler)
2022-01-26 19:03:51 +01:00
.handle((entry, sink) -> {
try (var key = entry.getKey().receive()) {
try (var value = entry.getValue().receive()) {
2022-01-26 21:18:43 +01:00
put(key, value);
sink.complete();
2022-01-26 21:18:43 +01:00
} catch (RocksDBException ex) {
sink.error(ex);
}
}
});
// Read the previous data, then write the new data, then return the previous data
2022-01-26 19:03:51 +01:00
return Flux.concat(previousDataMono, putMono).singleOrEmpty();
2020-12-07 22:15:18 +01:00
}
2022-01-26 21:18:43 +01:00
/**
 * Writes an entry synchronously, updating the put counters and timer.
 *
 * @param key   key to write; must be accessible, not closed by this method
 * @param value value to write; must be accessible, not closed by this method
 * @throws RocksDBException when the write fails; the message is prefixed with
 *                          "Failed to write: " and the original exception is kept as cause
 */
private void put(Buffer key, Buffer value) throws RocksDBException {
    assert key.isAccessible();
    assert value.isAccessible();
    if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
        var varargs = new Supplier<?>[]{() -> toStringSafe(key), () -> toStringSafe(value)};
        logger.trace(MARKER_ROCKSDB, "Writing {}: {}", varargs);
    }
    startedPut.increment();
    try {
        putTime.recordCallable(() -> {
            db.put(EMPTY_WRITE_OPTIONS, key, value);
            return null;
        });
    } catch (RocksDBException ex) {
        var wrapped = new RocksDBException("Failed to write: " + ex.getMessage());
        // RocksDBException has no cause-accepting constructor, so attach the cause afterwards
        wrapped.initCause(ex);
        throw wrapped;
    } catch (RuntimeException ex) {
        throw ex;
    } catch (Exception ex) {
        throw new RuntimeException("Failed to write", ex);
    } finally {
        endedPut.increment();
    }
}
2021-05-02 19:18:15 +02:00
@Override
public Mono<UpdateMode> getUpdateMode() {
2022-01-26 19:03:51 +01:00
return Mono.just(updateMode);
2021-05-02 19:18:15 +02:00
}
2021-05-08 03:09:00 +02:00
@SuppressWarnings("DuplicatedCode")
2021-02-06 19:21:31 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> update(Mono<Send<Buffer>> keyMono,
2021-11-08 16:33:41 +01:00
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
2022-03-02 18:33:58 +01:00
UpdateReturnMode updateReturnMode) {
2022-01-26 19:03:51 +01:00
return keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (keySend) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
if (updateMode == UpdateMode.DISALLOW) {
sink.error(new UnsupportedOperationException("update() is disallowed"));
return;
}
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
case NOTHING -> UpdateAtomicResultMode.NOTHING;
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
2022-03-02 18:33:58 +01:00
EMPTY_WRITE_OPTIONS, keySend, updater, returnMode));
2022-01-26 19:03:51 +01:00
} finally {
endedUpdates.increment();
}
assert result != null;
var previous = switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
if (previous != null) {
sink.next(previous);
} else {
sink.complete();
}
} catch (Exception ex) {
sink.error(ex);
2021-12-30 17:28:06 +01:00
}
2022-01-26 19:03:51 +01:00
});
2021-10-17 19:52:43 +02:00
}
2021-10-20 01:51:34 +02:00
@SuppressWarnings("DuplicatedCode")
2021-05-08 03:09:00 +02:00
@Override
2021-09-08 00:22:39 +02:00
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
2022-03-02 18:33:58 +01:00
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
2022-01-26 19:03:51 +01:00
return keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (keySend) {
assert !Schedulers.isInNonBlockingThread() : "Called update in a nonblocking thread";
if (updateMode == UpdateMode.DISALLOW) {
sink.error(new UnsupportedOperationException("update() is disallowed"));
return;
}
if (updateMode == UpdateMode.ALLOW && !db.supportsTransactions()) {
sink.error(new UnsupportedOperationException("update() is disallowed because the database doesn't support"
+ "safe atomic operations"));
return;
}
UpdateAtomicResult result;
startedUpdates.increment();
try {
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
2022-03-02 18:33:58 +01:00
EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA));
2022-01-26 19:03:51 +01:00
} finally {
endedUpdates.increment();
}
assert result != null;
sink.next(((UpdateAtomicResultDelta) result).delta());
} catch (Exception ex) {
sink.error(ex);
2021-12-30 17:28:06 +01:00
}
2022-01-26 19:03:51 +01:00
});
}
2020-12-07 22:15:18 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> remove(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType) {
2022-01-26 19:03:51 +01:00
// Obtain the previous value from the database
2022-03-24 21:14:17 +01:00
Mono<Send<Buffer>> previousDataMono = this.getPreviousData(keyMono, resultType);
2022-01-26 19:03:51 +01:00
// Delete the value from the database
Mono<Send<Buffer>> removeMono = keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
logger.trace(MARKER_ROCKSDB, "Deleting {}", () -> toStringSafe(key));
startedRemove.increment();
try {
removeTime.recordCallable(() -> {
db.delete(EMPTY_WRITE_OPTIONS, key);
return null;
});
} finally {
endedRemove.increment();
}
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to delete: " + ex.getMessage()));
} catch (Exception ex) {
sink.error(ex);
}
});
// Read the previous data, then delete the data, then return the previous data
return Flux.concat(previousDataMono, removeMono).singleOrEmpty();
2021-01-30 00:24:55 +01:00
}
2020-12-07 22:15:18 +01:00
2022-03-24 21:14:17 +01:00
private Mono<Send<Buffer>> getPreviousData(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType) {
2021-08-31 15:50:11 +02:00
return switch (resultType) {
2022-01-26 21:30:08 +01:00
case PREVIOUS_VALUE_EXISTENCE -> keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
var contained = containsKey(null, key);
sink.next(LLUtils.booleanToResponseByteBuffer(alloc, contained));
} catch (RocksDBException ex) {
sink.error(ex);
}
});
2022-01-26 19:03:51 +01:00
case PREVIOUS_VALUE -> keyMono
.publishOn(dbScheduler)
.handle((keySend, sink) -> {
try (var key = keySend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread";
2022-03-02 18:33:58 +01:00
var result = db.get(EMPTY_READ_OPTIONS, key);
2022-01-26 19:03:51 +01:00
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
if (result == null) {
sink.complete();
} else {
sink.next(result.send());
}
} catch (Exception ex) {
sink.error(ex);
}
});
2021-08-31 15:50:11 +02:00
case VOID -> Mono.empty();
};
}
@Override
2022-03-24 21:14:17 +01:00
public Flux<Optional<Buffer>> getMulti(@Nullable LLSnapshot snapshot, Flux<Send<Buffer>> keys) {
return keys
.buffer(MULTI_GET_WINDOW)
2022-01-26 19:03:51 +01:00
.publishOn(dbScheduler)
.<ArrayList<Optional<Buffer>>>handle((keysWindow, sink) -> {
2021-11-08 10:49:59 +01:00
List<Buffer> keyBufsWindow = new ArrayList<>(keysWindow.size());
for (Send<Buffer> bufferSend : keysWindow) {
keyBufsWindow.add(bufferSend.receive());
2021-08-29 23:18:03 +02:00
}
2021-11-08 10:49:59 +01:00
try {
2022-01-26 19:03:51 +01:00
assert !Schedulers.isInNonBlockingThread() : "Called getMulti in a nonblocking thread";
2021-11-08 10:49:59 +01:00
var readOptions = Objects.requireNonNullElse(resolveSnapshot(snapshot), EMPTY_READ_OPTIONS);
List<byte[]> results = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow));
2021-11-08 16:33:41 +01:00
var mappedResults = new ArrayList<Optional<Buffer>>(results.size());
2021-11-08 10:49:59 +01:00
for (int i = 0; i < results.size(); i++) {
byte[] val = results.get(i);
Optional<Buffer> valueOpt;
if (val != null) {
// free memory
results.set(i, null);
valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val));
} else {
valueOpt = Optional.empty();
2021-08-29 23:18:03 +02:00
}
2021-11-08 16:33:41 +01:00
mappedResults.add(valueOpt);
2021-11-08 10:49:59 +01:00
}
2022-01-26 19:03:51 +01:00
sink.next(mappedResults);
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to read keys: " + ex.getMessage()));
2021-11-08 10:49:59 +01:00
} finally {
for (Buffer buffer : keyBufsWindow) {
buffer.close();
2021-08-29 23:18:03 +02:00
}
2021-11-08 10:49:59 +01:00
}
})
2022-01-26 19:03:51 +01:00
.flatMapIterable(list -> list);
2021-08-29 23:18:03 +02:00
}
@Override
2022-01-26 19:03:51 +01:00
public Mono<Void> putMulti(Flux<Send<LLEntry>> entries) {
2021-08-29 23:18:03 +02:00
return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
2022-01-26 19:03:51 +01:00
.publishOn(dbScheduler)
.handle((entriesWindowList, sink) -> {
var entriesWindow = new ArrayList<LLEntry>(entriesWindowList.size());
for (Send<LLEntry> entrySend : entriesWindowList) {
entriesWindow.add(entrySend.receive());
}
try {
assert !Schedulers.isInNonBlockingThread() : "Called putMulti in a nonblocking thread";
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
for (LLEntry entry : entriesWindow) {
var k = entry.getKey();
var v = entry.getValue();
if (nettyDirect) {
batch.put(cfh, k, v);
2021-10-17 19:52:43 +02:00
} else {
2022-01-26 19:03:51 +01:00
try (var key = k.receive()) {
try (var value = v.receive()) {
batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value));
2021-08-29 23:18:03 +02:00
}
2021-10-17 19:52:43 +02:00
}
2021-05-02 19:18:15 +02:00
}
2021-08-29 23:18:03 +02:00
}
2022-01-26 19:03:51 +01:00
batch.writeToDbAndClose();
batch.close();
} else {
for (LLEntry entry : entriesWindow) {
db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe());
}
}
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write: " + ex.getMessage()));
} finally {
for (LLEntry llEntry : entriesWindow) {
llEntry.close();
}
}
})
.then();
}
2021-07-17 11:52:08 +02:00
@Override
2021-11-08 10:49:59 +01:00
public <K> Flux<Boolean> updateMulti(Flux<K> keys, Flux<Send<Buffer>> serializedKeys,
2021-11-08 16:33:41 +01:00
KVSerializationFunction<K, @Nullable Send<Buffer>, @Nullable Buffer> updateFunction) {
2021-11-08 10:49:59 +01:00
return Flux.zip(keys, serializedKeys)
2021-07-17 11:52:08 +02:00
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
2021-11-08 10:49:59 +01:00
.flatMapSequential(ew -> this.<List<Boolean>>runOnDb(() -> {
List<Tuple2<K, Buffer>> entriesWindow = new ArrayList<>(ew.size());
for (Tuple2<K, Send<Buffer>> tuple : ew) {
entriesWindow.add(tuple.mapT2(Send::receive));
2021-08-29 23:18:03 +02:00
}
try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called updateMulti in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
List<Buffer> keyBufsWindow = new ArrayList<>(entriesWindow.size());
2021-11-08 10:49:59 +01:00
for (Tuple2<K, Buffer> objects : entriesWindow) {
keyBufsWindow.add(objects.getT2());
2021-08-29 23:18:03 +02:00
}
2021-11-08 10:49:59 +01:00
ArrayList<Tuple3<K, Send<Buffer>, Optional<Send<Buffer>>>> mappedInputs;
2021-10-17 19:52:43 +02:00
{
2021-10-20 01:51:34 +02:00
var readOptions = Objects.requireNonNullElse(resolveSnapshot(null), EMPTY_READ_OPTIONS);
var inputs = db.multiGetAsList(readOptions, LLUtils.toArray(keyBufsWindow));
2021-10-17 19:52:43 +02:00
mappedInputs = new ArrayList<>(inputs.size());
for (int i = 0; i < inputs.size(); i++) {
var val = inputs.get(i);
if (val != null) {
inputs.set(i, null);
mappedInputs.add(Tuples.of(
2021-11-08 10:49:59 +01:00
entriesWindow.get(i).getT1(),
2021-10-17 19:52:43 +02:00
keyBufsWindow.get(i).send(),
Optional.of(fromByteArray(alloc, val).send())
));
} else {
mappedInputs.add(Tuples.of(
2021-11-08 10:49:59 +01:00
entriesWindow.get(i).getT1(),
2021-10-17 19:52:43 +02:00
keyBufsWindow.get(i).send(),
Optional.empty()
));
}
2021-08-29 23:18:03 +02:00
}
}
2021-11-08 16:33:41 +01:00
var updatedValuesToWrite = new ArrayList<Buffer>(mappedInputs.size());
2021-11-08 10:49:59 +01:00
var valueChangedResult = new ArrayList<Boolean>(mappedInputs.size());
2021-08-29 23:18:03 +02:00
try {
2021-10-17 19:52:43 +02:00
for (var mappedInput : mappedInputs) {
2021-11-08 16:33:41 +01:00
var updatedValue = updateFunction.apply(mappedInput.getT1(), mappedInput.getT2());
try {
if (updatedValue != null) {
try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) {
valueChangedResult.add(!LLUtils.equals(t3, updatedValue));
2021-11-08 10:49:59 +01:00
}
2021-11-08 16:33:41 +01:00
updatedValuesToWrite.add(updatedValue);
2021-11-08 10:49:59 +01:00
} else {
try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) {
valueChangedResult.add(!LLUtils.equals(t3, null));
}
updatedValuesToWrite.add(null);
2021-07-17 11:52:08 +02:00
}
2021-11-08 16:33:41 +01:00
} catch (Throwable t) {
if (updatedValue != null) {
updatedValue.close();
}
throw t;
2021-08-29 23:18:03 +02:00
}
}
2021-10-17 19:52:43 +02:00
} finally {
for (var mappedInput : mappedInputs) {
mappedInput.getT3().ifPresent(Send::close);
2021-08-29 23:18:03 +02:00
}
2021-10-17 19:52:43 +02:00
}
2021-07-17 11:52:08 +02:00
2021-10-17 19:52:43 +02:00
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
int i = 0;
2021-11-08 10:49:59 +01:00
for (Tuple2<K, Buffer> entry : entriesWindow) {
2021-11-08 16:33:41 +01:00
try (var valueToWrite = updatedValuesToWrite.get(i)) {
if (valueToWrite == null) {
batch.delete(cfh, entry.getT2().send());
} else {
batch.put(cfh, entry.getT2().send(), valueToWrite.send());
}
2021-07-17 11:52:08 +02:00
}
2021-10-17 19:52:43 +02:00
i++;
}
batch.writeToDbAndClose();
batch.close();
} else {
int i = 0;
2021-11-08 10:49:59 +01:00
for (Tuple2<K, Buffer> entry : entriesWindow) {
2021-12-12 02:17:36 +01:00
db.put(EMPTY_WRITE_OPTIONS, entry.getT2(), updatedValuesToWrite.get(i));
2021-10-17 19:52:43 +02:00
i++;
2021-08-29 23:18:03 +02:00
}
}
2021-10-17 19:52:43 +02:00
return valueChangedResult;
2021-08-29 23:18:03 +02:00
} finally {
2021-11-08 10:49:59 +01:00
for (Tuple2<K, Buffer> tuple : entriesWindow) {
tuple.getT2().close();
2021-08-29 23:18:03 +02:00
}
}
2022-01-26 14:22:54 +01:00
}).flatMapIterable(list -> list), /* Max concurrency is 2 to update data while preparing the next segment */ 2);
2021-07-17 11:52:08 +02:00
}
2020-12-07 22:15:18 +01:00
@Override
2022-03-24 21:14:17 +01:00
public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
2022-01-26 19:56:51 +01:00
return rangeMono.flatMapMany(rangeSend -> {
try (var range = rangeSend.receive()) {
if (range.isSingle()) {
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
2022-03-24 21:14:17 +01:00
return getRangeSingle(snapshot, rangeSingleMono);
2022-01-26 19:56:51 +01:00
} else {
2022-03-24 21:14:17 +01:00
return getRangeMulti(snapshot, rangeMono, reverse);
2022-01-26 19:56:51 +01:00
}
}
});
}
@Override
2021-08-29 23:18:03 +02:00
public Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono,
2022-03-24 21:14:17 +01:00
int prefixLength) {
2022-01-26 19:56:51 +01:00
return rangeMono.flatMapMany(rangeSend -> {
try (var range = rangeSend.receive()) {
if (range.isSingle()) {
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
2022-03-24 21:14:17 +01:00
return getRangeSingle(snapshot, rangeSingleMono).map(List::of);
2022-01-26 19:56:51 +01:00
} else {
return getRangeMultiGrouped(snapshot, rangeMono, prefixLength);
}
}
});
2021-01-30 00:24:55 +01:00
}
2022-03-24 21:14:17 +01:00
private Flux<Send<LLEntry>> getRangeSingle(LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
2021-08-29 23:18:03 +02:00
return Mono
2022-03-24 21:14:17 +01:00
.zip(keyMono, this.get(snapshot, keyMono))
2021-08-29 23:18:03 +02:00
.map(result -> LLEntry.of(result.getT1(), result.getT2()).send())
2022-01-26 14:22:54 +01:00
.flux();
}
2022-03-24 21:14:17 +01:00
private Flux<Send<LLEntry>> getRangeMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
2022-01-26 19:56:51 +01:00
Mono<LLLocalEntryReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
2022-03-24 21:14:17 +01:00
return new LLLocalEntryReactiveRocksIterator(db, rangeSend, nettyDirect, resolvedSnapshot, reverse);
2022-01-26 19:56:51 +01:00
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
}
2021-10-19 00:22:05 +02:00
private Flux<List<Send<LLEntry>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
int prefixLength) {
2022-01-26 19:56:51 +01:00
Mono<LLLocalGroupedEntryReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
return new LLLocalGroupedEntryReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot);
});
return Flux.usingWhen(
iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
2021-01-30 00:24:55 +01:00
}
@Override
2022-03-24 21:14:17 +01:00
public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
2022-01-26 19:56:51 +01:00
return rangeMono.flatMapMany(rangeSend -> {
try (var range = rangeSend.receive()) {
if (range.isSingle()) {
return this.getRangeKeysSingle(snapshot, rangeMono.map(r -> r.receive().getSingle()));
} else {
2022-03-24 21:14:17 +01:00
return this.getRangeKeysMulti(snapshot, rangeMono, reverse);
2022-01-26 19:56:51 +01:00
}
}
});
}
@Override
2021-08-29 23:18:03 +02:00
public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono,
int prefixLength) {
2022-01-26 19:56:51 +01:00
Mono<LLLocalGroupedKeyReactiveRocksIterator> iteratorMono = rangeMono.map(rangeSend -> {
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
return new LLLocalGroupedKeyReactiveRocksIterator(db, prefixLength, rangeSend, nettyDirect, resolvedSnapshot);
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
2021-03-14 13:24:46 +01:00
}
@Override
2021-08-29 23:18:03 +02:00
public Flux<BadBlock> badBlocks(Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> Flux
.<BadBlock>create(sink -> {
2021-08-29 23:18:03 +02:00
var range = rangeSend.receive();
sink.onDispose(range::close);
try (var ro = new ReadOptions(getReadOptions(null))) {
ro.setFillCache(false);
if (!range.isSingle()) {
2022-03-20 14:33:27 +01:00
if (LLUtils.MANUAL_READAHEAD) {
ro.setReadaheadSize(32 * 1024);
}
}
ro.setVerifyChecksums(true);
2022-03-24 21:14:17 +01:00
try (var rocksIteratorTuple = getRocksIterator(nettyDirect, ro, range, db, false)) {
2022-01-26 19:03:51 +01:00
var rocksIterator = rocksIteratorTuple.iterator();
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid() && !sink.isCancelled()) {
try {
rocksIterator.status();
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
} catch (RocksDBException ex) {
2022-03-02 12:34:30 +01:00
sink.next(new BadBlock(databaseName, ColumnUtils.special(columnName), null, ex));
}
2022-01-26 19:03:51 +01:00
rocksIterator.next();
}
}
sink.complete();
} catch (Throwable ex) {
sink.error(ex);
}
})
.subscribeOn(dbScheduler),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
2021-03-14 13:24:46 +01:00
@Override
2021-10-19 00:22:05 +02:00
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono,
int prefixLength) {
2022-01-26 19:56:51 +01:00
Mono<LLLocalKeyPrefixReactiveRocksIterator> iteratorMono = rangeMono.map(range -> {
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
return new LLLocalKeyPrefixReactiveRocksIterator(db, prefixLength, range, nettyDirect, resolvedSnapshot, true);
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler),
iterator -> Mono.fromRunnable(iterator::close)
);
}
2021-08-29 23:18:03 +02:00
private Flux<Send<Buffer>> getRangeKeysSingle(LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
2022-01-26 19:56:51 +01:00
return keyMono
.publishOn(dbScheduler)
.<Send<Buffer>>handle((keySend, sink) -> {
try (var key = keySend.receive()) {
if (containsKey(snapshot, key)) {
sink.next(key.send());
} else {
sink.complete();
}
} catch (Throwable ex) {
sink.error(ex);
}
})
.flux();
2020-12-07 22:15:18 +01:00
}
2022-03-24 21:14:17 +01:00
private Flux<Send<Buffer>> getRangeKeysMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean reverse) {
2022-01-26 19:56:51 +01:00
Mono<LLLocalKeyReactiveRocksIterator> iteratorMono = rangeMono.map(range -> {
ReadOptions resolvedSnapshot = resolveSnapshot(snapshot);
2022-03-24 21:14:17 +01:00
return new LLLocalKeyReactiveRocksIterator(db, range, nettyDirect, resolvedSnapshot, reverse);
2022-01-26 19:56:51 +01:00
});
return Flux.usingWhen(iteratorMono,
iterator -> iterator.flux().subscribeOn(dbScheduler, false),
iterator -> Mono.fromRunnable(iterator::close)
);
}
2020-12-07 22:15:18 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
2022-01-26 21:18:43 +01:00
if (USE_WINDOW_IN_SET_RANGE) {
return rangeMono
.publishOn(dbScheduler)
.<Void>handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
assert EMPTY_READ_OPTIONS.isOwningHandle();
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(nettyDirect, opts, IterateBound.UPPER, range.getMaxUnsafe());
} else {
maxBound = emptyReleasableSlice();
}
2022-01-26 21:18:43 +01:00
assert cfh.isOwningHandle();
assert opts.isOwningHandle();
SafeCloseable seekTo;
try (RocksIterator it = db.newIterator(opts)) {
2022-03-24 21:14:17 +01:00
if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2022-01-26 21:18:43 +01:00
seekTo = rocksIterSeekTo(nettyDirect, it, range.getMinUnsafe());
} else {
seekTo = null;
it.seekToFirst();
2021-06-19 21:55:20 +02:00
}
2022-01-26 21:18:43 +01:00
try {
it.status();
while (it.isValid()) {
db.delete(EMPTY_WRITE_OPTIONS, it.key());
it.next();
it.status();
2021-08-29 23:18:03 +02:00
}
2022-01-26 21:18:43 +01:00
} finally {
if (seekTo != null) {
seekTo.close();
2021-08-29 23:18:03 +02:00
}
}
2022-01-26 21:18:43 +01:00
} finally {
maxBound.close();
}
2022-01-26 21:18:43 +01:00
} finally {
minBound.close();
2021-05-12 01:25:59 +02:00
}
2022-01-26 21:18:43 +01:00
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
if (range.isSingle()) {
batch.delete(cfh, range.getSingle());
} else {
deleteSmallRangeWriteBatch(batch, range.copy().send());
}
batch.writeToDbAndClose();
}
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
if (range.isSingle()) {
batch.delete(cfh, LLUtils.toArray(range.getSingleUnsafe()));
} else {
deleteSmallRangeWriteBatch(batch, range.copy().send());
}
db.write(EMPTY_WRITE_OPTIONS, batch);
batch.clear();
}
}
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to set a range: " + ex.getMessage()));
}
})
.thenMany(entries.window(MULTI_GET_WINDOW))
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
.flatMap(entriesListSend -> this
.<Void>runOnDb(() -> {
List<LLEntry> entriesList = new ArrayList<>(entriesListSend.size());
for (Send<LLEntry> entrySend : entriesListSend) {
entriesList.add(entrySend.receive());
}
try {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
for (LLEntry entry : entriesList) {
assert entry.isAccessible();
db.put(EMPTY_WRITE_OPTIONS, entry.getKeyUnsafe(), entry.getValueUnsafe());
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
for (LLEntry entry : entriesList) {
assert entry.isAccessible();
if (nettyDirect) {
batch.put(cfh, entry.getKey(), entry.getValue());
} else {
2022-01-26 21:18:43 +01:00
batch.put(cfh,
LLUtils.toArray(entry.getKeyUnsafe()),
LLUtils.toArray(entry.getValueUnsafe())
);
2021-05-12 01:25:59 +02:00
}
2021-05-02 19:18:15 +02:00
}
2022-01-26 21:18:43 +01:00
batch.writeToDbAndClose();
}
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
for (LLEntry entry : entriesList) {
assert entry.isAccessible();
batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()),
LLUtils.toArray(entry.getValueUnsafe()));
}
db.write(EMPTY_WRITE_OPTIONS, batch);
batch.clear();
}
}
return null;
} finally {
for (LLEntry entry : entriesList) {
assert entry.isAccessible();
entry.close();
}
}
})
)
)
.then()
.onErrorMap(cause -> new IOException("Failed to write range", cause));
} else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return Mono.error(() -> new UnsupportedOperationException(
"Can't use write batches in setRange without window. Please fix the parameters"));
}
var deleteMono = this
.getRange(null, rangeMono, false)
.publishOn(dbScheduler)
.handle((oldValueSend, sink) -> {
try (var oldValue = oldValueSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, oldValue.getKeyUnsafe());
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
}
2022-01-26 21:18:43 +01:00
})
.then(Mono.<Void>empty());
var putMono = entries
.publishOn(dbScheduler)
.handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) {
if (entry.getKeyUnsafe() != null && entry.getValueUnsafe() != null) {
this.put(entry.getKeyUnsafe(), entry.getValueUnsafe());
}
sink.complete();
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to write range: " + ex.getMessage()));
}
})
.then(Mono.<Void>empty());
return deleteMono.then(putMono);
}
2021-03-14 03:13:19 +01:00
}
//todo: this is broken, check why. (is this still true?)
2021-08-29 23:18:03 +02:00
private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, Send<LLRange> rangeToReceive)
2021-03-20 12:41:11 +01:00
throws RocksDBException {
2021-08-29 23:18:03 +02:00
var range = rangeToReceive.receive();
2021-06-19 21:55:20 +02:00
try (var readOpts = new ReadOptions(getReadOptions(null))) {
2021-05-03 02:57:08 +02:00
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
2021-12-12 02:17:36 +01:00
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
2021-03-20 12:41:11 +01:00
} else {
2021-05-03 02:57:08 +02:00
minBound = emptyReleasableSlice();
2021-03-20 12:41:11 +01:00
}
2021-05-03 02:57:08 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-12-12 02:17:36 +01:00
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
2021-05-03 02:57:08 +02:00
} else {
maxBound = emptyReleasableSlice();
}
2021-10-20 01:51:34 +02:00
try (var rocksIterator = db.newIterator(readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2022-03-24 21:14:17 +01:00
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2021-12-12 02:17:36 +01:00
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
2021-05-03 02:57:08 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-05-03 02:57:08 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
rocksIterator.status();
2021-09-01 00:01:56 +02:00
while (rocksIterator.isValid()) {
2021-09-22 18:33:28 +02:00
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
2021-09-01 00:01:56 +02:00
rocksIterator.next();
rocksIterator.status();
}
} finally {
if (seekTo != null) {
seekTo.close();
}
2021-05-03 02:57:08 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
maxBound.close();
2021-05-03 02:57:08 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
2021-03-20 12:41:11 +01:00
}
2021-08-29 23:18:03 +02:00
} catch (Throwable e) {
range.close();
throw e;
}
}
2021-08-29 23:18:03 +02:00
private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, Send<LLRange> rangeToReceive)
2021-05-02 19:18:15 +02:00
throws RocksDBException {
2021-08-29 23:18:03 +02:00
try (var range = rangeToReceive.receive()) {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
2021-12-12 02:17:36 +01:00
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
2021-05-03 02:57:08 +02:00
} else {
2021-08-29 23:18:03 +02:00
minBound = emptyReleasableSlice();
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-12-12 02:17:36 +01:00
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
2021-05-03 02:57:08 +02:00
} else {
2021-08-29 23:18:03 +02:00
maxBound = emptyReleasableSlice();
2021-05-03 02:57:08 +02:00
}
2021-10-20 01:51:34 +02:00
try (var rocksIterator = db.newIterator(readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2022-03-24 21:14:17 +01:00
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2021-12-12 02:17:36 +01:00
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-08-29 23:18:03 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
2021-08-29 23:18:03 +02:00
rocksIterator.status();
2021-09-01 00:01:56 +02:00
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
rocksIterator.status();
}
} finally {
if (seekTo != null) {
seekTo.close();
}
2021-08-29 23:18:03 +02:00
}
} finally {
maxBound.close();
2021-05-03 02:57:08 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
2021-05-03 02:57:08 +02:00
}
}
2021-05-02 19:18:15 +02:00
}
}
2022-03-24 21:14:17 +01:00
/**
 * Positions the iterator with {@code seekForPrev} on the given key. Useful for reverse iterations.
 *
 * @param allowNettyDirect whether the key may be passed to RocksDB as a direct buffer
 * @param rocksIterator    iterator to position
 * @param key              key to seek
 * @return a closeable that references the key when the direct buffer was used, {@code null} when the key was
 *     copied to a byte array
 */
@Nullable
private static SafeCloseable rocksIterSeekFrom(boolean allowNettyDirect,
    RocksIterator rocksIterator, Buffer key) {
  if (!allowNettyDirect || !isReadOnlyDirect(key)) {
    rocksIterator.seekForPrev(LLUtils.toArray(key));
    return null;
  }
  ByteBuffer directKey = ((ReadableComponent) key).readableBuffer();
  assert directKey.position() == 0;
  rocksIterator.seekForPrev(directKey);
  // This is useful to retain the key buffer in memory and avoid deallocations
  return key::isAccessible;
}
/**
* Useful for forward iterations
*/
2021-09-01 00:01:56 +02:00
@Nullable
2021-12-12 02:17:36 +01:00
private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect,
RocksIterator rocksIterator, Buffer key) {
2022-03-16 19:19:26 +01:00
if (allowNettyDirect && isReadOnlyDirect(key)) {
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
2021-12-13 01:57:37 +01:00
assert keyInternalByteBuffer.position() == 0;
2021-12-12 02:17:36 +01:00
rocksIterator.seek(keyInternalByteBuffer);
2021-12-13 01:57:37 +01:00
// This is useful to retain the key buffer in memory and avoid deallocations
return key::isAccessible;
2021-12-12 02:17:36 +01:00
} else {
rocksIterator.seek(LLUtils.toArray(key));
return null;
}
}
2022-01-26 19:03:51 +01:00
/**
* This method should not modify or move the writerIndex/readerIndex of the key
*/
2021-12-12 02:17:36 +01:00
private static ReleasableSlice setIterateBound(boolean allowNettyDirect,
ReadOptions readOpts, IterateBound boundType, Buffer key) {
requireNonNull(key);
AbstractSlice<?> slice;
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS
2022-03-16 19:19:26 +01:00
&& (isReadOnlyDirect(key))) {
ByteBuffer keyInternalByteBuffer = ((ReadableComponent) key).readableBuffer();
2021-12-13 01:57:37 +01:00
assert keyInternalByteBuffer.position() == 0;
2021-12-12 02:17:36 +01:00
slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes());
assert slice.size() == key.readableBytes();
} else {
slice = new Slice(requireNonNull(LLUtils.toArray(key)));
}
2022-01-26 19:03:51 +01:00
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImplWithRelease(slice);
}
2021-05-02 19:18:15 +02:00
private static ReleasableSlice emptyReleasableSlice() {
var arr = new byte[0];
2021-05-21 00:19:40 +02:00
2022-01-26 19:03:51 +01:00
return new ReleasableSliceImplWithoutRelease(new Slice(arr));
2021-05-02 19:18:15 +02:00
}
2022-01-26 19:03:51 +01:00
/**
 * {@link ReleasableSlice} that only carries the given slice. The record declares no {@code close()} of its
 * own, so the behavior defined by {@link ReleasableSlice} applies (presumably a no-op, as the name suggests;
 * confirm against the interface).
 * <p>
 * This class should not modify or move the writerIndex/readerIndex of the key
 */
public record ReleasableSliceImplWithoutRelease(AbstractSlice<?> slice) implements ReleasableSlice {}
2021-05-21 00:19:40 +02:00
2022-01-26 19:03:51 +01:00
/**
* This class should not modify or move the writerIndex/readerIndex of the key
*/
public record ReleasableSliceImplWithRelease(AbstractSlice<?> slice) implements ReleasableSlice {
2021-05-21 00:19:40 +02:00
@Override
2021-08-29 23:18:03 +02:00
public void close() {
slice.close();
2021-03-20 12:41:11 +01:00
}
}
2021-01-30 00:24:55 +01:00
public Mono<Void> clear() {
return Mono
.<Void>fromCallable(() -> {
2022-01-26 21:18:43 +01:00
assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread";
2021-10-17 19:52:43 +02:00
boolean shouldCompactLater = false;
2021-06-27 16:52:45 +02:00
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
// readOpts.setIgnoreRangeDeletions(true);
readOpts.setFillCache(false);
2022-03-20 14:33:27 +01:00
if (LLUtils.MANUAL_READAHEAD) {
readOpts.setReadaheadSize(32 * 1024); // 32KiB
}
2021-06-27 16:52:45 +02:00
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
2021-09-01 00:01:56 +02:00
alloc,
2021-06-27 16:52:45 +02:00
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
byte[] firstDeletedKey = null;
byte[] lastDeletedKey = null;
2021-10-20 01:51:34 +02:00
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
// If the database supports transactions, delete each key one by one
if (db.supportsTransactions()) {
2021-10-17 19:52:43 +02:00
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
rocksIterator.status();
}
} else {
rocksIterator.seekToLast();
2021-06-27 16:52:45 +02:00
2021-10-17 19:52:43 +02:00
rocksIterator.status();
if (rocksIterator.isValid()) {
firstDeletedKey = FIRST_KEY;
lastDeletedKey = rocksIterator.key();
writeBatch.deleteRange(cfh, FIRST_KEY, rocksIterator.key());
writeBatch.delete(cfh, rocksIterator.key());
shouldCompactLater = true;
}
2021-06-27 16:52:45 +02:00
}
2021-03-14 03:13:19 +01:00
}
2020-12-07 22:15:18 +01:00
2021-06-27 16:52:45 +02:00
writeBatch.writeToDbAndClose();
2020-12-07 22:15:18 +01:00
2021-10-17 19:52:43 +02:00
if (shouldCompactLater) {
// Compact range
2021-10-20 01:51:34 +02:00
db.suggestCompactRange();
if (lastDeletedKey != null) {
db.compactRange(firstDeletedKey, lastDeletedKey, new CompactRangeOptions()
.setAllowWriteStall(false)
.setExclusiveManualCompaction(false)
.setChangeLevel(false)
2021-10-17 19:52:43 +02:00
);
}
2021-06-27 16:52:45 +02:00
}
2020-12-07 22:15:18 +01:00
2021-10-20 01:51:34 +02:00
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true));
2021-06-27 16:52:45 +02:00
db.flushWal(true);
}
return null;
2020-12-07 22:15:18 +01:00
}
2021-01-30 00:24:55 +01:00
})
.onErrorMap(cause -> new IOException("Failed to clear", cause))
.subscribeOn(dbScheduler);
2020-12-07 22:15:18 +01:00
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fast) {
2022-01-26 21:18:43 +01:00
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
2021-10-19 00:22:05 +02:00
try (var range = rangeSend.receive()) {
2022-01-26 21:18:43 +01:00
assert !Schedulers.isInNonBlockingThread() : "Called sizeRange in a nonblocking thread";
2021-10-19 00:22:05 +02:00
if (range.isAll()) {
2022-01-26 21:18:43 +01:00
sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot));
2021-10-19 00:22:05 +02:00
} else {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
if (range.hasMin()) {
2021-12-12 02:17:36 +01:00
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
2021-10-19 00:22:05 +02:00
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-12-12 02:17:36 +01:00
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
2021-10-16 01:49:41 +02:00
} else {
2021-10-19 00:22:05 +02:00
maxBound = emptyReleasableSlice();
}
try {
if (fast) {
readOpts.setIgnoreRangeDeletions(true);
}
2021-10-20 01:51:34 +02:00
try (var rocksIterator = db.newIterator(readOpts)) {
2021-10-19 00:22:05 +02:00
SafeCloseable seekTo;
2022-03-24 21:14:17 +01:00
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2021-12-12 02:17:36 +01:00
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
} else {
2021-10-19 00:22:05 +02:00
seekTo = null;
rocksIterator.seekToFirst();
}
try {
2021-10-19 00:22:05 +02:00
long i = 0;
rocksIterator.status();
while (rocksIterator.isValid()) {
rocksIterator.next();
rocksIterator.status();
i++;
2021-05-12 01:25:59 +02:00
}
2022-01-26 21:18:43 +01:00
sink.next(i);
} finally {
2021-10-19 00:22:05 +02:00
if (seekTo != null) {
seekTo.close();
}
}
}
2021-10-19 00:22:05 +02:00
} finally {
maxBound.close();
2021-10-16 01:49:41 +02:00
}
2021-10-19 00:22:05 +02:00
} finally {
minBound.close();
2021-08-29 23:18:03 +02:00
}
2021-10-19 00:22:05 +02:00
}
}
2022-01-26 21:18:43 +01:00
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to get size of range: " + ex.getMessage()));
2021-10-19 00:22:05 +02:00
}
2022-01-26 21:18:43 +01:00
});
2021-05-12 01:25:59 +02:00
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
2022-01-26 21:18:43 +01:00
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
} else {
maxBound = emptyReleasableSlice();
}
2022-01-26 21:18:43 +01:00
try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
2022-03-24 21:14:17 +01:00
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2022-01-26 21:18:43 +01:00
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
2021-05-03 02:57:08 +02:00
} else {
2022-01-26 21:18:43 +01:00
seekTo = null;
rocksIterator.seekToFirst();
2021-03-18 19:53:32 +01:00
}
2021-08-29 23:18:03 +02:00
try {
2022-01-26 21:18:43 +01:00
rocksIterator.status();
if (rocksIterator.isValid()) {
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
sink.next(LLEntry.of(key.send(), value.send()).send());
2021-05-12 01:25:59 +02:00
}
2021-05-03 02:57:08 +02:00
}
2022-01-26 21:18:43 +01:00
} else {
sink.complete();
}
2021-05-03 02:57:08 +02:00
} finally {
2022-01-26 21:18:43 +01:00
if (seekTo != null) {
seekTo.close();
}
2021-03-14 13:08:03 +01:00
}
2022-01-26 21:18:43 +01:00
} finally {
maxBound.close();
2021-03-14 13:08:03 +01:00
}
2022-01-26 21:18:43 +01:00
} finally {
minBound.close();
}
2022-01-26 21:18:43 +01:00
}
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to get one entry: " + ex.getMessage()));
}
});
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
2022-01-26 21:18:43 +01:00
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
} else {
maxBound = emptyReleasableSlice();
}
2022-01-26 21:18:43 +01:00
try (var rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
2022-03-24 21:14:17 +01:00
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2022-01-26 21:18:43 +01:00
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
2021-05-03 02:57:08 +02:00
} else {
2022-01-26 21:18:43 +01:00
seekTo = null;
rocksIterator.seekToFirst();
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
try {
2022-01-26 21:18:43 +01:00
rocksIterator.status();
if (rocksIterator.isValid()) {
sink.next(LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
2021-05-12 01:25:59 +02:00
} else {
2022-01-26 21:18:43 +01:00
sink.complete();
2021-05-12 01:25:59 +02:00
}
} finally {
2022-01-26 21:18:43 +01:00
if (seekTo != null) {
seekTo.close();
}
2021-05-03 02:57:08 +02:00
}
2022-01-26 21:18:43 +01:00
} finally {
maxBound.close();
}
2022-01-26 21:18:43 +01:00
} finally {
minBound.close();
}
2022-01-26 21:18:43 +01:00
}
} catch (RocksDBException ex) {
sink.error(new RocksDBException("Failed to get one key: " + ex.getMessage()));
}
});
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException {
2021-06-19 21:55:20 +02:00
try (var rocksdbSnapshot = new ReadOptions(resolveSnapshot(snapshot))) {
if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
try {
2021-10-20 01:51:34 +02:00
return db.getLongProperty("rocksdb.estimate-num-keys");
2021-06-19 21:55:20 +02:00
} catch (RocksDBException e) {
2021-09-10 12:13:52 +02:00
logger.error(MARKER_ROCKSDB, "Failed to get RocksDB estimated keys count property", e);
2021-06-19 21:55:20 +02:00
return 0;
}
} else if (PARALLEL_EXACT_SIZE) {
return exactSizeAll(snapshot);
} else {
rocksdbSnapshot.setFillCache(false);
rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
rocksdbSnapshot.setIgnoreRangeDeletions(true);
long count = 0;
2021-10-20 01:51:34 +02:00
try (RocksIterator rocksIterator = db.newIterator(rocksdbSnapshot)) {
rocksIterator.seekToFirst();
rocksIterator.status();
2021-06-19 21:55:20 +02:00
// If it's a fast size of a snapshot, count only up to 100'000 elements
while (rocksIterator.isValid() && count < 100_000) {
2021-06-19 21:55:20 +02:00
count++;
rocksIterator.next();
rocksIterator.status();
2021-06-19 21:55:20 +02:00
}
return count;
2020-12-07 22:15:18 +01:00
}
}
}
}
2021-01-30 00:24:55 +01:00
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread");
}
2021-06-19 21:55:20 +02:00
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
2022-03-20 14:33:27 +01:00
if (LLUtils.MANUAL_READAHEAD) {
readOpts.setReadaheadSize(128 * 1024); // 128KiB
}
2021-06-19 21:55:20 +02:00
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
if (PARALLEL_EXACT_SIZE) {
var commonPool = ForkJoinPool.commonPool();
var futures = IntStream
.range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length)
.mapToObj(idx -> Pair.of(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx],
idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null
: LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1]
))
.map(range -> (Callable<Long>) () -> {
long partialCount = 0;
try (var rangeReadOpts = new ReadOptions(readOpts)) {
Slice sliceBegin;
if (range.getKey() != null) {
sliceBegin = new Slice(range.getKey());
} else {
sliceBegin = null;
}
Slice sliceEnd;
if (range.getValue() != null) {
sliceEnd = new Slice(range.getValue());
} else {
sliceEnd = null;
}
try {
if (sliceBegin != null) {
rangeReadOpts.setIterateLowerBound(sliceBegin);
}
if (sliceBegin != null) {
rangeReadOpts.setIterateUpperBound(sliceEnd);
}
2021-10-20 01:51:34 +02:00
try (RocksIterator rocksIterator = db.newIterator(rangeReadOpts)) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid()) {
2021-06-19 21:55:20 +02:00
partialCount++;
rocksIterator.next();
rocksIterator.status();
2021-06-19 21:55:20 +02:00
}
return partialCount;
}
} finally {
if (sliceBegin != null) {
sliceBegin.close();
}
if (sliceEnd != null) {
sliceEnd.close();
}
2021-03-18 19:53:32 +01:00
}
}
2021-06-19 21:55:20 +02:00
})
.map(commonPool::submit)
.toList();
2021-06-19 21:55:20 +02:00
long count = 0;
for (ForkJoinTask<Long> future : futures) {
count += future.join();
2021-03-18 19:53:32 +01:00
}
return count;
2021-06-19 21:55:20 +02:00
} else {
long count = 0;
2021-10-20 01:51:34 +02:00
try (RocksIterator iter = db.newIterator(readOpts)) {
2021-06-19 21:55:20 +02:00
iter.seekToFirst();
while (iter.isValid()) {
count++;
iter.next();
}
return count;
}
2021-03-18 19:53:32 +01:00
}
2020-12-07 22:15:18 +01:00
}
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<LLEntry>> removeOne(Mono<Send<LLRange>> rangeMono) {
2022-01-26 21:18:43 +01:00
return rangeMono.publishOn(dbScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called removeOne in a nonblocking thread";
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(nettyDirect, readOpts, IterateBound.UPPER, range.getMaxUnsafe());
} else {
maxBound = emptyReleasableSlice();
}
2022-01-26 21:18:43 +01:00
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
SafeCloseable seekTo;
2022-03-24 21:14:17 +01:00
if (!LLLocalDictionary.PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
2022-01-26 21:18:43 +01:00
seekTo = rocksIterSeekTo(nettyDirect, rocksIterator, range.getMinUnsafe());
2021-05-03 02:57:08 +02:00
} else {
2022-01-26 21:18:43 +01:00
seekTo = null;
rocksIterator.seekToFirst();
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
try {
2022-01-26 21:18:43 +01:00
rocksIterator.status();
if (!rocksIterator.isValid()) {
sink.complete();
return;
2021-05-12 01:25:59 +02:00
}
2022-01-26 21:18:43 +01:00
Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
db.delete(EMPTY_WRITE_OPTIONS, key);
sink.next(LLEntry.of(key, value).send());
2021-05-12 01:25:59 +02:00
} finally {
2022-01-26 21:18:43 +01:00
if (seekTo != null) {
seekTo.close();
}
2021-05-03 02:57:08 +02:00
}
2022-01-26 21:18:43 +01:00
} finally {
maxBound.close();
2021-01-30 00:24:55 +01:00
}
2022-01-26 21:18:43 +01:00
} finally {
minBound.close();
}
2022-01-26 21:18:43 +01:00
}
} catch (RocksDBException ex) {
sink.error(ex);
}
});
2020-12-07 22:15:18 +01:00
}
2021-04-03 19:09:06 +02:00
2022-01-26 19:03:51 +01:00
/**
* This method should not modify or move the writerIndex/readerIndex of the buffers inside the range
*/
2021-04-03 19:09:06 +02:00
@NotNull
2022-03-24 21:14:17 +01:00
public static RocksIteratorTuple getRocksIterator(boolean allowNettyDirect,
2021-06-29 23:31:02 +02:00
ReadOptions readOptions,
2021-12-13 01:57:37 +01:00
LLRange range,
2022-03-24 21:14:17 +01:00
RocksDBColumn db,
boolean reverse) {
2022-01-26 19:03:51 +01:00
assert !Schedulers.isInNonBlockingThread() : "Called getRocksIterator in a nonblocking thread";
2021-12-13 01:57:37 +01:00
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe());
} else {
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe());
} else {
sliceMax = emptyReleasableSlice();
}
var rocksIterator = db.newIterator(readOptions);
2022-03-24 21:14:17 +01:00
SafeCloseable seekFromOrTo;
if (reverse) {
if (!PREFER_AUTO_SEEK_BOUND && range.hasMax()) {
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterSeekFrom(allowNettyDirect, rocksIterator, range.getMaxUnsafe()),
() -> ((SafeCloseable) () -> {}));
} else {
seekFromOrTo = () -> {};
rocksIterator.seekToLast();
}
2021-12-13 01:57:37 +01:00
} else {
2022-03-24 21:14:17 +01:00
if (!PREFER_AUTO_SEEK_BOUND && range.hasMin()) {
seekFromOrTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()),
() -> ((SafeCloseable) () -> {}));
} else {
seekFromOrTo = () -> {};
rocksIterator.seekToFirst();
}
2021-04-03 19:09:06 +02:00
}
2022-03-24 21:14:17 +01:00
return new RocksIteratorTuple(List.of(readOptions), rocksIterator, sliceMin, sliceMax, seekFromOrTo);
2021-04-03 19:09:06 +02:00
}
2020-12-07 22:15:18 +01:00
}