CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java

2411 lines
82 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
2021-09-04 02:19:10 +02:00
import static io.netty5.buffer.Unpooled.wrappedBuffer;
2021-08-29 23:18:03 +02:00
import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
import static java.util.Objects.requireNonNull;
2021-09-04 02:19:10 +02:00
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import io.netty5.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.BadBlock;
2021-07-01 21:19:52 +02:00
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Column;
2021-07-17 11:52:08 +02:00
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
2021-08-29 23:18:03 +02:00
import it.cavallium.dbengine.database.LLDelta;
2021-01-17 18:31:25 +01:00
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
2021-08-28 22:42:51 +02:00
import it.cavallium.dbengine.database.LLEntry;
2021-01-30 00:24:55 +01:00
import it.cavallium.dbengine.database.LLRange;
2021-01-17 18:31:25 +01:00
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
2021-07-17 11:52:08 +02:00
import it.cavallium.dbengine.database.RepeatedElementList;
2021-09-01 00:01:56 +02:00
import it.cavallium.dbengine.database.SafeCloseable;
2021-02-13 01:31:24 +01:00
import it.cavallium.dbengine.database.UpdateMode;
2021-05-08 03:09:00 +02:00
import it.cavallium.dbengine.database.UpdateReturnMode;
2021-08-22 21:23:22 +02:00
import it.cavallium.dbengine.database.serialization.BiSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
2021-02-06 19:21:31 +01:00
import it.unimi.dsi.fastutil.ints.IntArrayList;
2020-12-07 22:15:18 +01:00
import java.io.IOException;
import java.nio.ByteBuffer;
2021-07-17 11:52:08 +02:00
import java.time.Duration;
import java.util.ArrayList;
2021-05-02 19:18:15 +02:00
import java.util.Collection;
import java.util.List;
2020-12-07 22:15:18 +01:00
import java.util.Objects;
2021-07-17 11:52:08 +02:00
import java.util.Optional;
2021-03-18 19:53:32 +01:00
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
2021-02-13 00:18:57 +01:00
import java.util.concurrent.locks.StampedLock;
2020-12-07 22:15:18 +01:00
import java.util.function.Function;
2021-03-18 19:53:32 +01:00
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
2020-12-07 22:15:18 +01:00
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractSlice;
import org.rocksdb.CappedWriteBatch;
2020-12-07 22:15:18 +01:00
import org.rocksdb.ColumnFamilyHandle;
2021-03-20 12:41:11 +01:00
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.DirectSlice;
2020-12-07 22:15:18 +01:00
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
2021-03-13 19:01:36 +01:00
import org.rocksdb.Slice;
2020-12-07 22:15:18 +01:00
import org.rocksdb.Snapshot;
2021-05-02 19:18:15 +02:00
import org.rocksdb.WriteBatch;
2020-12-07 22:15:18 +01:00
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
2021-02-06 19:21:31 +01:00
import org.warp.commonutils.locks.Striped;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
2021-01-30 00:24:55 +01:00
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
2021-07-17 11:52:08 +02:00
import reactor.util.function.Tuple2;
2021-04-03 19:09:06 +02:00
import reactor.util.function.Tuple3;
2021-09-01 00:01:56 +02:00
import reactor.util.function.Tuple4;
2021-04-03 19:09:06 +02:00
import reactor.util.function.Tuples;
2020-12-07 22:15:18 +01:00
@NotAtomic
public class LLLocalDictionary implements LLDictionary {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalDictionary.class);
2021-03-18 19:53:32 +01:00
private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = false;
2020-12-07 22:15:18 +01:00
static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final int MULTI_GET_WINDOW = 16;
2021-07-17 11:52:08 +02:00
static final Duration MULTI_GET_WINDOW_TIMEOUT = Duration.ofSeconds(1);
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
2021-03-14 19:38:20 +01:00
static final boolean PREFER_SEEK_TO_FIRST = false;
/**
2021-08-16 10:36:54 +02:00
* It used to be false,
* now it's true to avoid crashes during iterations on completely corrupted files
*/
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true;
2021-05-03 02:45:29 +02:00
/**
* Default: true. Use false to debug problems with windowing.
*/
static final boolean USE_WINDOW_IN_SET_RANGE = true;
2021-05-02 19:18:15 +02:00
/**
* Default: true. Use false to debug problems with write batches.
*/
2021-05-03 12:29:15 +02:00
static final boolean USE_WRITE_BATCHES_IN_PUT_MULTI = true;
/**
* Default: true. Use false to debug problems with write batches.
*/
static final boolean USE_WRITE_BATCHES_IN_SET_RANGE = true;
2021-05-02 19:18:15 +02:00
/**
* Default: true. Use false to debug problems with capped write batches.
*/
2021-05-03 02:45:29 +02:00
static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = true;
2021-05-03 12:29:15 +02:00
/**
* Default: true. Use false to debug problems with write batches deletes.
*/
static final boolean USE_WRITE_BATCH_IN_SET_RANGE_DELETE = false;
2021-03-18 19:53:32 +01:00
static final boolean PARALLEL_EXACT_SIZE = true;
2020-12-07 22:15:18 +01:00
2021-02-15 00:15:42 +01:00
private static final int STRIPES = 512;
2021-01-30 00:24:55 +01:00
private static final byte[] FIRST_KEY = new byte[]{};
2020-12-07 22:15:18 +01:00
private static final byte[] NO_DATA = new byte[0];
private static final boolean ASSERTIONS_ENABLED;
2021-05-03 00:29:26 +02:00
/**
* Default: true
*/
private static final boolean USE_DIRECT_BUFFER_BOUNDS = true;
2021-05-05 00:07:18 +02:00
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
/**
* 1KiB dummy buffer, write only, used for debugging purposes
*/
private static final ByteBuffer DUMMY_WRITE_ONLY_BYTE_BUFFER = ByteBuffer.allocateDirect(1024);
static {
boolean assertionsEnabled = false;
//noinspection AssertWithSideEffects
assert (assertionsEnabled = true);
//noinspection ConstantConditions
ASSERTIONS_ENABLED = assertionsEnabled;
}
2020-12-07 22:15:18 +01:00
private final RocksDB db;
private final ColumnFamilyHandle cfh;
private final String databaseName;
2021-06-26 02:35:33 +02:00
private final String columnName;
2020-12-07 22:15:18 +01:00
private final Function<LLSnapshot, Snapshot> snapshotResolver;
2021-02-13 00:18:57 +01:00
private final Striped<StampedLock> itemsLock = Striped.readWriteStampedLock(STRIPES);
2021-02-13 01:31:24 +01:00
private final UpdateMode updateMode;
2021-08-29 23:18:03 +02:00
private final BufferAllocator alloc;
2021-05-28 16:04:59 +02:00
private final String getRangeMultiDebugName;
private final String getRangeKeysMultiDebugName;
2021-06-29 23:31:02 +02:00
private final DatabaseOptions databaseOptions;
2020-12-07 22:15:18 +01:00
2021-05-03 21:41:51 +02:00
public LLLocalDictionary(
2021-08-29 23:18:03 +02:00
BufferAllocator allocator,
2021-05-03 21:41:51 +02:00
@NotNull RocksDB db,
2020-12-07 22:15:18 +01:00
@NotNull ColumnFamilyHandle columnFamilyHandle,
String databaseName,
2021-06-26 02:35:33 +02:00
String columnName,
2021-02-13 01:31:24 +01:00
Function<LLSnapshot, Snapshot> snapshotResolver,
2021-06-29 23:31:02 +02:00
UpdateMode updateMode,
DatabaseOptions databaseOptions) {
2021-08-29 23:18:03 +02:00
requireNonNull(db);
2020-12-07 22:15:18 +01:00
this.db = db;
2021-08-29 23:18:03 +02:00
requireNonNull(columnFamilyHandle);
2020-12-07 22:15:18 +01:00
this.cfh = columnFamilyHandle;
this.databaseName = databaseName;
2021-06-26 02:35:33 +02:00
this.columnName = columnName;
2020-12-07 22:15:18 +01:00
this.snapshotResolver = snapshotResolver;
2021-02-13 01:31:24 +01:00
this.updateMode = updateMode;
2021-06-26 02:35:33 +02:00
this.getRangeMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeMulti";
this.getRangeKeysMultiDebugName = databaseName + "(" + columnName + ")" + "::getRangeKeysMulti";
2021-06-29 23:31:02 +02:00
this.databaseOptions = databaseOptions;
2021-05-03 21:41:51 +02:00
alloc = allocator;
2020-12-07 22:15:18 +01:00
}
@Override
public String getDatabaseName() {
return databaseName;
}
2021-06-26 02:35:33 +02:00
public String getColumnName() {
return columnName;
}
2021-06-19 21:55:20 +02:00
/**
2021-08-16 10:36:54 +02:00
* Please don't modify the returned ReadOptions!
* If you want to modify it, wrap it into a new ReadOptions!
2021-06-19 21:55:20 +02:00
*/
2020-12-07 22:15:18 +01:00
private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
if (snapshot != null) {
return getReadOptions(snapshotResolver.apply(snapshot));
} else {
return EMPTY_READ_OPTIONS;
}
}
2021-06-19 21:55:20 +02:00
/**
2021-08-16 10:36:54 +02:00
* Please don't modify the returned ReadOptions!
* If you want to modify it, wrap it into a new ReadOptions!
2021-06-19 21:55:20 +02:00
*/
2020-12-07 22:15:18 +01:00
private ReadOptions getReadOptions(Snapshot snapshot) {
if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshot);
} else {
return EMPTY_READ_OPTIONS;
}
}
2021-08-29 23:18:03 +02:00
private int getLockIndex(Buffer key) {
2021-05-08 03:09:00 +02:00
return Math.abs(LLUtils.hashCode(key) % STRIPES);
2021-02-06 19:21:31 +01:00
}
2021-08-29 23:18:03 +02:00
private IntArrayList getLockIndices(List<Buffer> keys) {
2021-02-06 19:21:31 +01:00
var list = new IntArrayList(keys.size());
2021-08-29 23:18:03 +02:00
for (Buffer key : keys) {
2021-02-06 19:21:31 +01:00
list.add(getLockIndex(key));
}
return list;
}
2021-08-28 22:42:51 +02:00
private IntArrayList getLockIndicesEntries(List<LLEntry> keys) {
2021-02-06 19:21:31 +01:00
var list = new IntArrayList(keys.size());
2021-08-28 22:42:51 +02:00
for (LLEntry key : keys) {
2021-08-29 23:18:03 +02:00
list.add(getLockIndex(key.getKeyUnsafe()));
2021-02-06 19:21:31 +01:00
}
return list;
}
2021-08-29 23:18:03 +02:00
private <X> IntArrayList getLockIndicesWithExtra(List<Tuple2<Buffer, X>> entries) {
2021-07-17 11:52:08 +02:00
var list = new IntArrayList(entries.size());
2021-08-29 23:18:03 +02:00
for (Tuple2<Buffer, X> key : entries) {
2021-07-17 11:52:08 +02:00
list.add(getLockIndex(key.getT1()));
}
return list;
}
2020-12-07 22:15:18 +01:00
@Override
2021-08-29 23:18:03 +02:00
public BufferAllocator getAllocator() {
return alloc;
}
2021-08-29 23:18:03 +02:00
private <T> @NotNull Mono<T> runOnDb(Callable<@Nullable T> callable) {
return Mono.fromCallable(callable);
}
2021-05-12 01:25:59 +02:00
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot,
Mono<Send<Buffer>> keyMono,
2021-08-16 10:36:54 +02:00
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
2021-08-29 23:18:03 +02:00
keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
try {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
2021-08-29 23:18:03 +02:00
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace("Reading {}", LLUtils.toStringSafe(key));
}
return dbGet(cfh, resolveSnapshot(snapshot), key.send(), existsAlmostCertainly);
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
}
} catch (Exception ex) {
throw new IOException("Failed to read " + LLUtils.toStringSafe(key), ex);
2021-05-12 01:25:59 +02:00
}
}
2021-08-29 23:18:03 +02:00
}).onErrorMap(cause -> new IOException("Failed to read", cause)),
keySend -> Mono.fromRunnable(keySend::close)
);
}
2021-08-29 23:18:03 +02:00
@Nullable
private Send<Buffer> dbGet(ColumnFamilyHandle cfh,
2021-05-03 21:41:51 +02:00
@Nullable ReadOptions readOptions,
2021-08-29 23:18:03 +02:00
Send<Buffer> keySend,
2021-05-03 21:41:51 +02:00
boolean existsAlmostCertainly) throws RocksDBException {
2021-08-29 23:18:03 +02:00
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbGet in a nonblocking thread");
}
2021-09-01 00:01:56 +02:00
if (databaseOptions.allowNettyDirect()) {
2021-05-02 19:18:15 +02:00
2021-05-03 21:41:51 +02:00
//todo: implement keyMayExist if existsAlmostCertainly is false.
// Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers
2021-05-03 21:41:51 +02:00
// Create the key nio buffer to pass to RocksDB
2021-09-01 00:01:56 +02:00
var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
2021-05-03 21:41:51 +02:00
// Create a direct result buffer because RocksDB works only with direct buffers
2021-08-29 23:18:03 +02:00
try (Buffer resultBuf = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) {
2021-05-03 21:41:51 +02:00
int valueSize;
int assertionReadData = -1;
ByteBuffer resultNioBuf;
do {
// Create the result nio buffer to pass to RocksDB
2021-09-01 00:01:56 +02:00
resultNioBuf = LLUtils.obtainDirect(resultBuf);
assert keyNioBuffer.byteBuffer().isDirect();
2021-08-28 22:42:51 +02:00
assert resultNioBuf.isDirect();
2021-05-03 21:41:51 +02:00
valueSize = db.get(cfh,
Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS),
2021-09-01 00:01:56 +02:00
keyNioBuffer.byteBuffer().position(0),
2021-05-03 21:41:51 +02:00
resultNioBuf
);
if (valueSize != RocksDB.NOT_FOUND) {
2021-08-28 22:42:51 +02:00
if (ASSERTIONS_ENABLED) {
2021-07-18 19:37:24 +02:00
// todo: check if position is equal to data that have been read
// todo: check if limit is equal to value size or data that have been read
assert valueSize <= 0 || resultNioBuf.limit() > 0;
2021-08-16 10:36:54 +02:00
// If the locking is enabled the data is safe, so since we are appending data
// to the end, we need to check if it has been appended correctly or it
// has been overwritten.
// We must not do this check otherwise because if there is no locking the data
// can be overwritten with a smaller value the next time.
2021-07-18 19:37:24 +02:00
if (updateMode == UpdateMode.ALLOW) {
// Check if read data is larger than previously read data.
2021-08-16 10:36:54 +02:00
// If it's smaller or equals it means that RocksDB is overwriting
// the beginning of the result buffer.
2021-07-18 19:37:24 +02:00
assert resultNioBuf.limit() > assertionReadData;
//noinspection ConstantConditions
2021-07-18 19:37:24 +02:00
if (ASSERTIONS_ENABLED) {
assertionReadData = resultNioBuf.limit();
}
2021-05-03 21:41:51 +02:00
}
2021-07-18 19:37:24 +02:00
// Check if read data is not bigger than the total value size.
2021-08-16 10:36:54 +02:00
// If it's bigger it means that RocksDB is writing the start
// of the result into the result buffer more than once.
2021-07-18 19:37:24 +02:00
assert resultNioBuf.limit() <= valueSize;
}
2021-05-03 21:41:51 +02:00
if (valueSize <= resultNioBuf.limit()) {
// Return the result ready to be read
2021-08-29 23:18:03 +02:00
return resultBuf.readerOffset(0).writerOffset(valueSize).send();
2021-05-03 21:41:51 +02:00
} else {
// If the locking is enabled the data is safe, so we can append the next read data.
// Otherwise we need to re-read everything.
if (updateMode == UpdateMode.ALLOW) {
// Update the resultBuf writerIndex with the new position
2021-08-29 23:18:03 +02:00
resultBuf.writerOffset(resultNioBuf.limit());
2021-05-03 21:41:51 +02:00
}
//noinspection UnusedAssignment
resultNioBuf = null;
}
// Rewind the keyNioBuf position, making it readable again for the next loop iteration
2021-09-01 00:01:56 +02:00
keyNioBuffer.byteBuffer().rewind();
2021-05-03 21:41:51 +02:00
if (resultBuf.capacity() < valueSize) {
2021-08-16 10:36:54 +02:00
// Expand the resultBuf size if the result is bigger than the current result
// buffer size
2021-08-29 23:18:03 +02:00
resultBuf.ensureWritable(valueSize);
}
}
2021-05-03 21:41:51 +02:00
// Repeat if the result has been found but it's still not finished
} while (valueSize != RocksDB.NOT_FOUND);
// If the value is not found return null
return null;
2021-09-01 00:01:56 +02:00
} finally {
keyNioBuffer.buffer().close();
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
2021-05-03 21:41:51 +02:00
}
} else {
2021-09-02 17:15:40 +02:00
ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS);
try {
2021-08-29 23:18:03 +02:00
byte[] keyArray = LLUtils.toArray(key);
requireNonNull(keyArray);
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
if (existsAlmostCertainly || db.keyMayExist(cfh, validReadOptions, keyArray, data)) {
if (!existsAlmostCertainly && data.getValue() != null) {
return LLUtils.fromByteArray(alloc, data.getValue()).send();
2021-05-03 21:41:51 +02:00
} else {
2021-08-29 23:18:03 +02:00
byte[] result = db.get(cfh, validReadOptions, keyArray);
if (result == null) {
return null;
} else {
return LLUtils.fromByteArray(alloc, result).send();
}
2021-05-02 19:18:15 +02:00
}
2021-08-29 23:18:03 +02:00
} else {
return null;
2021-05-02 19:18:15 +02:00
}
2021-09-02 17:15:40 +02:00
} finally {
if (!(validReadOptions instanceof UnreleasableReadOptions)) {
validReadOptions.close();
}
2021-05-03 21:41:51 +02:00
}
}
}
}
@SuppressWarnings("SameParameterValue")
2021-08-16 10:36:54 +02:00
private void dbPut(ColumnFamilyHandle cfh,
@Nullable WriteOptions writeOptions,
2021-08-29 23:18:03 +02:00
Send<Buffer> keyToReceive,
Send<Buffer> valueToReceive) throws RocksDBException {
2021-08-31 15:50:11 +02:00
WriteOptions validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS);
try {
2021-08-29 23:18:03 +02:00
try (var key = keyToReceive.receive()) {
try (var value = valueToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbPut in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
if (databaseOptions.allowNettyDirect()) {
2021-09-01 00:01:56 +02:00
var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
try (var ignored1 = keyNioBuffer.buffer().receive()) {
assert keyNioBuffer.byteBuffer().isDirect();
var valueNioBuffer = LLUtils.convertToDirect(alloc, value.send());
try (var ignored2 = valueNioBuffer.buffer().receive()) {
assert valueNioBuffer.byteBuffer().isDirect();
db.put(cfh, validWriteOptions, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer());
} finally {
PlatformDependent.freeDirectBuffer(valueNioBuffer.byteBuffer());
}
} finally {
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
2021-08-29 23:18:03 +02:00
}
} else {
db.put(cfh, validWriteOptions, LLUtils.toArray(key), LLUtils.toArray(value));
}
}
2021-05-03 21:41:51 +02:00
}
2021-08-31 15:50:11 +02:00
} finally {
if (writeOptions != null && !(writeOptions instanceof UnreleasableWriteOptions)) {
writeOptions.close();
}
}
2020-12-07 22:15:18 +01:00
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> {
try (var range = rangeSend.receive()) {
if (range.isSingle()) {
2021-09-02 17:15:40 +02:00
return this.containsKey(snapshot, Mono.fromCallable(range::getSingle));
2021-08-29 23:18:03 +02:00
} else {
2021-09-02 17:15:40 +02:00
return this.containsRange(snapshot, rangeMono);
2021-08-29 23:18:03 +02:00
}
}
},
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
).map(isContained -> !isContained);
2020-12-07 22:15:18 +01:00
}
2021-08-29 23:18:03 +02:00
public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> runOnDb(() -> {
// Temporary resources to release after finished
Buffer cloned1 = null;
Buffer cloned2 = null;
Buffer cloned3 = null;
2021-09-01 00:01:56 +02:00
ByteBuffer direct1 = null;
ByteBuffer direct2 = null;
ByteBuffer direct3 = null;
AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null;
2021-08-29 23:18:03 +02:00
try {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called containsRange in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
try (var rangeMin = range.getMin().receive()) {
if (databaseOptions.allowNettyDirect()) {
2021-09-01 00:01:56 +02:00
var directBuf = LLUtils.convertToDirect(alloc, rangeMin.send());
cloned1 = directBuf.buffer().receive();
direct1 = directBuf.byteBuffer();
readOpts.setIterateLowerBound(slice1 = new DirectSlice(directBuf.byteBuffer()));
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(rangeMin)));
2021-08-29 23:18:03 +02:00
}
}
}
if (range.hasMax()) {
try (var rangeMax = range.getMax().receive()) {
if (databaseOptions.allowNettyDirect()) {
2021-09-01 00:01:56 +02:00
var directBuf = LLUtils.convertToDirect(alloc, rangeMax.send());
cloned2 = directBuf.buffer().receive();
direct2 = directBuf.byteBuffer();
readOpts.setIterateUpperBound(slice2 = new DirectSlice(directBuf.byteBuffer()));
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(rangeMax)));
2021-08-29 23:18:03 +02:00
}
}
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
try (var rangeMin = range.getMin().receive()) {
if (databaseOptions.allowNettyDirect()) {
2021-09-01 00:01:56 +02:00
var directBuf = LLUtils.convertToDirect(alloc, rangeMin.send());
cloned3 = directBuf.buffer().receive();
direct3 = directBuf.byteBuffer();
rocksIterator.seek(directBuf.byteBuffer());
2021-08-29 23:18:03 +02:00
} else {
rocksIterator.seek(LLUtils.toArray(rangeMin));
}
}
} else {
rocksIterator.seekToFirst();
}
rocksIterator.status();
return rocksIterator.isValid();
2021-06-19 21:55:20 +02:00
}
}
2020-12-07 22:15:18 +01:00
}
2021-08-29 23:18:03 +02:00
} finally {
if (cloned1 != null) cloned1.close();
if (cloned2 != null) cloned2.close();
if (cloned3 != null) cloned3.close();
2021-09-01 00:01:56 +02:00
if (direct1 != null) PlatformDependent.freeDirectBuffer(direct1);
if (direct2 != null) PlatformDependent.freeDirectBuffer(direct2);
if (direct3 != null) PlatformDependent.freeDirectBuffer(direct3);
if (slice1 != null) slice1.close();
if (slice2 != null) slice2.close();
}
2021-08-29 23:18:03 +02:00
}).onErrorMap(cause -> new IOException("Failed to read range", cause)),
rangeSend -> Mono.fromRunnable(rangeSend::close));
2021-05-12 01:25:59 +02:00
}
2021-08-29 23:18:03 +02:00
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
return Mono.usingWhen(keyMono,
2021-08-29 23:18:03 +02:00
keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called containsKey in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
2021-08-29 23:18:03 +02:00
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
2021-05-12 01:25:59 +02:00
}
2021-08-29 23:18:03 +02:00
try {
int size = RocksDB.NOT_FOUND;
byte[] keyBytes = LLUtils.toArray(key);
Holder<byte[]> data = new Holder<>();
2021-08-31 15:50:11 +02:00
var unmodifiableReadOpts = resolveSnapshot(snapshot);
try {
2021-08-29 23:18:03 +02:00
if (db.keyMayExist(cfh, unmodifiableReadOpts, keyBytes, data)) {
if (data.getValue() != null) {
size = data.getValue().length;
} else {
size = db.get(cfh, unmodifiableReadOpts, keyBytes, NO_DATA);
}
}
2021-08-31 15:50:11 +02:00
} finally {
if (unmodifiableReadOpts != null && !(unmodifiableReadOpts instanceof UnreleasableReadOptions)) {
unmodifiableReadOpts.close();
}
2021-08-29 23:18:03 +02:00
}
return size != RocksDB.NOT_FOUND;
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
}
}
2021-08-29 23:18:03 +02:00
}).onErrorMap(cause -> new IOException("Failed to read", cause)),
keySend -> Mono.fromRunnable(keySend::close)
);
2021-01-30 00:24:55 +01:00
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> put(Mono<Send<Buffer>> keyMono,
Mono<Send<Buffer>> valueMono,
2021-08-16 10:36:54 +02:00
LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
2021-08-29 23:18:03 +02:00
keySend -> this
.getPreviousData(keyMono, resultType)
.concatWith(Mono.usingWhen(valueMono,
2021-08-29 23:18:03 +02:00
valueSend -> this.<Send<Buffer>>runOnDb(() -> {
try (var key = keySend.receive()) {
try (var value = valueSend.receive()) {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.writeLock();
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace("Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(value));
}
dbPut(cfh, null, key.send(), value.send());
return null;
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockWrite(stamp);
}
}
}
}
}),
2021-08-29 23:18:03 +02:00
value -> Mono.fromRunnable(value::close)
).onErrorMap(cause -> new IOException("Failed to write", cause)))
.singleOrEmpty(),
2021-08-29 23:18:03 +02:00
keySend -> Mono.fromRunnable(keySend::close)
);
2020-12-07 22:15:18 +01:00
}
2021-05-02 19:18:15 +02:00
@Override
public Mono<UpdateMode> getUpdateMode() {
return Mono.fromSupplier(() -> updateMode);
}
2021-05-08 03:09:00 +02:00
// Remember to change also updateAndGetDelta() if you are modifying this function
@SuppressWarnings("DuplicatedCode")
2021-02-06 19:21:31 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> update(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
2021-05-08 03:09:00 +02:00
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
2021-08-29 23:18:03 +02:00
keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
2021-05-08 03:09:00 +02:00
2021-08-29 23:18:03 +02:00
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
2021-05-08 03:09:00 +02:00
}
2021-08-29 23:18:03 +02:00
try {
if (logger.isTraceEnabled()) {
logger.trace("Reading {}", LLUtils.toStringSafe(key));
}
while (true) {
@Nullable Buffer prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
if (existsAlmostCertainly
|| db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
if (prevDataBytes != null) {
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
} else {
prevData = null;
}
2021-05-08 03:09:00 +02:00
} else {
var obtainedPrevData = dbGet(cfh, null, key.copy().send(), existsAlmostCertainly);
2021-08-29 23:18:03 +02:00
if (obtainedPrevData == null) {
prevData = null;
} else {
prevData = obtainedPrevData.receive();
}
2021-05-08 03:09:00 +02:00
}
} else {
2021-08-29 23:18:03 +02:00
prevData = null;
}
try {
2021-08-29 23:18:03 +02:00
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var newDataToReceive = updater.apply(
prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
2021-08-29 23:18:03 +02:00
newData = null;
2021-05-12 01:25:59 +02:00
}
}
2021-08-29 23:18:03 +02:00
}
try {
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
2021-05-08 03:09:00 +02:00
2021-08-29 23:18:03 +02:00
stamp = lock.writeLock();
continue;
}
}
if (logger.isTraceEnabled()) {
logger.trace("Deleting {}", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
&& (prevData == null || !LLUtils.equals(prevData, newData))) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
continue;
}
}
if (logger.isTraceEnabled()) {
logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
}
Buffer dataToPut;
if (updateReturnMode == UpdateReturnMode.GET_NEW_VALUE) {
dataToPut = newData.copy();
} else {
dataToPut = newData;
}
try {
dbPut(cfh, null, key.send(), dataToPut.send());
} finally {
if (dataToPut != newData) {
dataToPut.close();
}
2021-05-12 01:25:59 +02:00
}
2021-05-08 03:09:00 +02:00
}
2021-08-29 23:18:03 +02:00
return switch (updateReturnMode) {
case GET_NEW_VALUE -> newData != null ? newData.send() : null;
case GET_OLD_VALUE -> prevData != null ? prevData.send() : null;
case NOTHING -> null;
//noinspection UnnecessaryDefault
default -> throw new IllegalArgumentException();
};
} finally {
if (newData != null) {
newData.close();
2021-05-08 03:09:00 +02:00
}
}
} finally {
2021-08-29 23:18:03 +02:00
if (prevData != null) {
prevData.close();
2021-05-08 03:09:00 +02:00
}
}
}
2021-08-29 23:18:03 +02:00
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlock(stamp);
}
}
}
2021-08-29 23:18:03 +02:00
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close)
);
2021-05-08 03:09:00 +02:00
}
// Remember to change also update() if you are modifying this function
@SuppressWarnings("DuplicatedCode")
@Override
2021-08-29 23:18:03 +02:00
public Mono<LLDelta> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
2021-03-18 16:19:41 +01:00
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
2021-08-29 23:18:03 +02:00
keySend -> this.runOnDb(() -> {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
2021-08-29 23:18:03 +02:00
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
2021-08-29 23:18:03 +02:00
try {
if (logger.isTraceEnabled()) {
logger.trace("Reading {}", LLUtils.toStringSafe(key));
}
while (true) {
@Nullable Buffer prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
if (existsAlmostCertainly
|| db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
if (prevDataBytes != null) {
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
} else {
prevData = null;
}
} else {
2021-09-02 17:15:40 +02:00
var obtainedPrevData = dbGet(cfh, null, key.copy().send(), existsAlmostCertainly);
2021-08-29 23:18:03 +02:00
if (obtainedPrevData == null) {
prevData = null;
} else {
prevData = obtainedPrevData.receive();
}
}
} else {
2021-08-29 23:18:03 +02:00
prevData = null;
}
try {
2021-08-29 23:18:03 +02:00
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var newDataToReceive = updater.apply(
prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send())) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
2021-08-29 23:18:03 +02:00
newData = null;
2021-02-13 01:31:24 +01:00
}
}
2021-08-29 23:18:03 +02:00
}
try {
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
2021-02-13 02:16:24 +01:00
2021-08-29 23:18:03 +02:00
stamp = lock.writeLock();
continue;
}
}
if (logger.isTraceEnabled()) {
logger.trace("Deleting {}", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
&& (prevData == null || !LLUtils.equals(prevData, newData))) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
continue;
}
2021-02-13 01:31:24 +01:00
}
2021-08-29 23:18:03 +02:00
if (logger.isTraceEnabled()) {
logger.trace("Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
}
2021-09-02 17:15:40 +02:00
assert key.isAccessible();
assert newData.isAccessible();
2021-08-29 23:18:03 +02:00
dbPut(cfh, null, key.send(), newData.copy().send());
2021-02-13 00:18:57 +01:00
}
2021-08-29 23:18:03 +02:00
return LLDelta.of(
prevData != null ? prevData.send() : null,
newData != null ? newData.send() : null
);
} finally {
if (newData != null) {
newData.close();
}
2021-02-06 19:21:31 +01:00
}
} finally {
2021-08-29 23:18:03 +02:00
if (prevData != null) {
prevData.close();
}
2021-02-06 19:21:31 +01:00
}
}
2021-08-29 23:18:03 +02:00
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlock(stamp);
}
}
}
2021-08-29 23:18:03 +02:00
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close)
);
}
2021-08-29 23:18:03 +02:00
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, Send<Buffer> keyToReceive)
throws RocksDBException {
2021-08-29 23:18:03 +02:00
try (var key = keyToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbDelete in a nonblocking thread");
}
2021-08-16 10:36:54 +02:00
var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS);
2021-08-29 23:18:03 +02:00
if (databaseOptions.allowNettyDirect()) {
2021-09-01 00:01:56 +02:00
var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
try {
db.delete(cfh, validWriteOptions, keyNioBuffer.byteBuffer());
} finally {
keyNioBuffer.buffer().close();
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
2021-05-03 21:41:51 +02:00
}
} else {
2021-08-16 10:36:54 +02:00
db.delete(cfh, validWriteOptions, LLUtils.toArray(key));
}
}
2021-02-06 19:21:31 +01:00
}
2020-12-07 22:15:18 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> remove(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono,
2021-08-29 23:18:03 +02:00
keySend -> this
.getPreviousData(keyMono, resultType)
.concatWith(this
2021-08-29 23:18:03 +02:00
.<Send<Buffer>>runOnDb(() -> {
try (var key = keySend.receive()) {
StampedLock lock;
long stamp;
2021-07-17 11:52:08 +02:00
if (updateMode == UpdateMode.ALLOW) {
2021-08-29 23:18:03 +02:00
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.writeLock();
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace("Deleting {}", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
return null;
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockWrite(stamp);
}
2021-07-17 11:52:08 +02:00
}
}
})
2021-08-29 23:18:03 +02:00
.onErrorMap(cause -> new IOException("Failed to delete", cause))
)
.singleOrEmpty(),
2021-08-29 23:18:03 +02:00
keySend -> Mono.fromRunnable(keySend::close)
);
2021-01-30 00:24:55 +01:00
}
2020-12-07 22:15:18 +01:00
2021-08-29 23:18:03 +02:00
private Mono<Send<Buffer>> getPreviousData(Mono<Send<Buffer>> keyMono, LLDictionaryResultType resultType) {
2021-08-31 15:50:11 +02:00
return switch (resultType) {
case PREVIOUS_VALUE_EXISTENCE -> this
.containsKey(null, keyMono)
.single()
.map((Boolean bool) -> LLUtils.booleanToResponseByteBuffer(alloc, bool));
case PREVIOUS_VALUE -> Mono.usingWhen(
keyMono,
keySend -> this
.runOnDb(() -> {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread");
}
2021-08-31 15:50:11 +02:00
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace("Reading {}", LLUtils.toArray(key));
}
2021-08-31 15:50:11 +02:00
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) {
if (data.getValue() != null) {
return LLUtils.fromByteArray(alloc, data.getValue()).send();
} else {
2021-08-31 15:50:11 +02:00
return dbGet(cfh, null, key.send(), true);
}
2021-08-31 15:50:11 +02:00
} else {
return null;
}
2021-08-31 15:50:11 +02:00
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
}
}
})
.onErrorMap(cause -> new IOException("Failed to read ", cause)),
keySend -> Mono.fromRunnable(keySend::close));
case VOID -> Mono.empty();
};
}
@Override
2021-08-29 23:18:03 +02:00
public <K> Flux<Tuple3<K, Send<Buffer>, Optional<Send<Buffer>>>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Tuple2<K, Send<Buffer>>> keys,
boolean existsAlmostCertainly) {
return keys
.transform(normal -> new BufferTimeOutPublisher<>(normal, MULTI_GET_WINDOW, MULTI_GET_WINDOW_TIMEOUT))
2021-07-17 11:52:08 +02:00
.doOnDiscard(Tuple2.class, discardedEntry -> {
//noinspection unchecked
2021-08-29 23:18:03 +02:00
var entry = (Tuple2<K, Buffer>) discardedEntry;
entry.getT2().close();
2021-07-17 11:52:08 +02:00
})
.doOnDiscard(Tuple3.class, discardedEntry -> {
//noinspection unchecked
2021-08-29 23:18:03 +02:00
var entry = (Tuple3<K, Buffer, Buffer>) discardedEntry;
entry.getT2().close();
entry.getT3().close();
2021-07-17 11:52:08 +02:00
})
.flatMapSequential(keysWindow -> {
2021-08-29 23:18:03 +02:00
List<Send<Buffer>> keyBufsWindowSend = new ArrayList<>(keysWindow.size());
for (Tuple2<K, Send<Buffer>> objects : keysWindow) {
keyBufsWindowSend.add(objects.getT2());
}
return runOnDb(() -> {
List<Buffer> keyBufsWindow = new ArrayList<>(keyBufsWindowSend.size());
for (Send<Buffer> bufferSend : keyBufsWindowSend) {
keyBufsWindow.add(bufferSend.receive());
}
try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getMulti in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
locks = itemsLock.bulkGetAt(getLockIndices(keyBufsWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.readLock());
}
} else {
locks = null;
stamps = null;
}
try {
var columnFamilyHandles = new RepeatedElementList<>(cfh, keysWindow.size());
List<byte[]> results = db.multiGetAsList(resolveSnapshot(snapshot),
columnFamilyHandles, LLUtils.toArray(keyBufsWindow));
var mappedResults = new ArrayList<Tuple3<K, Send<Buffer>, Optional<Send<Buffer>>>>(results.size());
for (int i = 0; i < results.size(); i++) {
byte[] val = results.get(i);
Optional<Buffer> valueOpt;
if (val != null) {
results.set(i, null);
valueOpt = Optional.of(LLUtils.fromByteArray(alloc, val));
} else {
valueOpt = Optional.empty();
}
mappedResults.add(Tuples.of(keysWindow.get(i).getT1(),
keyBufsWindow.get(i).send(),
valueOpt.map(Resource::send)
));
}
return mappedResults;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockRead(stamps.get(index));
index++;
}
}
}
} finally {
for (Buffer buffer : keyBufsWindow) {
buffer.close();
}
}
})
.flatMapIterable(list -> list)
.onErrorMap(cause -> new IOException("Failed to read keys", cause))
.doAfterTerminate(() -> keyBufsWindowSend.forEach(Send::close));
}, 2) // Max concurrency is 2 to read data while preparing the next segment
.doOnDiscard(LLEntry.class, ResourceSupport::close)
.doOnDiscard(Tuple3.class, discardedEntry -> {
if (discardedEntry.getT2() instanceof Buffer bb) {
bb.close();
}
if (discardedEntry.getT2() instanceof Optional opt) {
if (opt.isPresent() && opt.get() instanceof Buffer bb) {
bb.close();
}
2021-07-17 11:52:08 +02:00
}
2021-08-29 23:18:03 +02:00
});
}
@Override
public Flux<Send<LLEntry>> putMulti(Flux<Send<LLEntry>> entries, boolean getOldValues) {
return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> this
.<List<Send<LLEntry>>>runOnDb(() -> {
var entriesWindow = new ArrayList<LLEntry>(ew.size());
for (Send<LLEntry> entrySend : ew) {
entriesWindow.add(entrySend.receive());
}
try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called putMulti in a nonblocking thread");
}
2021-07-31 18:00:53 +02:00
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
2021-08-29 23:18:03 +02:00
locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow));
2021-07-31 18:00:53 +02:00
stamps = new ArrayList<>();
for (var lock : locks) {
2021-08-29 23:18:03 +02:00
stamps.add(lock.writeLock());
2021-07-31 18:00:53 +02:00
}
} else {
locks = null;
stamps = null;
}
try {
2021-08-29 23:18:03 +02:00
ArrayList<Send<LLEntry>> oldValues;
if (getOldValues) {
oldValues = new ArrayList<>(entriesWindow.size());
try (var readOptions = resolveSnapshot(null)) {
for (LLEntry entry : entriesWindow) {
try (var key = entry.getKey().receive()) {
Send<Buffer> oldValue = dbGet(cfh, readOptions, key.copy().send(), false);
if (oldValue != null) {
oldValues.add(LLEntry.of(key.send(), oldValue).send());
}
}
}
}
} else {
oldValues = null;
}
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
2021-09-01 00:01:56 +02:00
alloc,
2021-08-29 23:18:03 +02:00
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
for (LLEntry entry : entriesWindow) {
var k = entry.getKey();
var v = entry.getValue();
2021-09-02 17:15:40 +02:00
if (databaseOptions.allowNettyDirect()) {
batch.put(cfh, k, v);
} else {
try (var key = k.receive()) {
try (var value = v.receive()) {
batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value));
}
}
}
2021-08-29 23:18:03 +02:00
}
batch.writeToDbAndClose();
batch.close();
} else {
for (LLEntry entry : entriesWindow) {
2021-09-01 00:01:56 +02:00
var k = LLUtils.convertToDirect(alloc, entry.getKey());
try {
var v = LLUtils.convertToDirect(alloc, entry.getValue());
try {
db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer());
} finally {
v.buffer().close();
PlatformDependent.freeDirectBuffer(v.byteBuffer());
2021-08-29 23:18:03 +02:00
}
2021-09-01 00:01:56 +02:00
} finally {
k.buffer().close();
PlatformDependent.freeDirectBuffer(k.byteBuffer());
2021-08-29 23:18:03 +02:00
}
}
2021-07-31 18:00:53 +02:00
}
2021-08-29 23:18:03 +02:00
return oldValues;
2021-07-31 18:00:53 +02:00
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
2021-08-29 23:18:03 +02:00
lock.unlockWrite(stamps.get(index));
2021-07-31 18:00:53 +02:00
index++;
2021-07-17 11:52:08 +02:00
}
2021-07-31 18:00:53 +02:00
}
}
2021-08-29 23:18:03 +02:00
} finally {
for (LLEntry llEntry : entriesWindow) {
llEntry.close();
2021-05-02 19:18:15 +02:00
}
2021-08-29 23:18:03 +02:00
}
}), 2) // Max concurrency is 2 to read data while preparing the next segment
.flatMapIterable(oldValuesList -> oldValuesList)
2021-08-22 23:50:50 +02:00
.transform(LLUtils::handleDiscard);
}
2021-07-17 11:52:08 +02:00
@Override
2021-08-29 23:18:03 +02:00
public <X> Flux<ExtraKeyOperationResult<Send<Buffer>, X>> updateMulti(Flux<Tuple2<Send<Buffer>, X>> entries,
BiSerializationFunction<Send<Buffer>, X, Send<Buffer>> updateFunction) {
2021-07-17 11:52:08 +02:00
return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
2021-08-29 23:18:03 +02:00
.flatMapSequential(ew -> this.<Iterable<ExtraKeyOperationResult<Send<Buffer>, X>>>runOnDb(() -> {
List<Tuple2<Buffer, X>> entriesWindow = new ArrayList<>(ew.size());
for (Tuple2<Send<Buffer>, X> tuple : ew) {
entriesWindow.add(tuple.mapT1(Send::receive));
}
try {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called updateMulti in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
List<Buffer> keyBufsWindow = new ArrayList<>(entriesWindow.size());
for (Tuple2<Buffer, X> objects : entriesWindow) {
keyBufsWindow.add(objects.getT1());
}
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
locks = itemsLock.bulkGetAt(getLockIndicesWithExtra(entriesWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.writeLock());
}
} else {
locks = null;
stamps = null;
}
try {
var columnFamilyHandles = new RepeatedElementList<>(cfh, entriesWindow.size());
ArrayList<Tuple3<Send<Buffer>, X, Optional<Send<Buffer>>>> mappedInputs;
{
var inputs = db.multiGetAsList(resolveSnapshot(null), columnFamilyHandles,
LLUtils.toArray(keyBufsWindow));
mappedInputs = new ArrayList<>(inputs.size());
for (int i = 0; i < inputs.size(); i++) {
var val = inputs.get(i);
if (val != null) {
inputs.set(i, null);
mappedInputs.add(Tuples.of(
keyBufsWindow.get(i).send(),
entriesWindow.get(i).getT2(),
Optional.of(fromByteArray(alloc, val).send())
));
} else {
mappedInputs.add(Tuples.of(
keyBufsWindow.get(i).send(),
entriesWindow.get(i).getT2(),
Optional.empty()
));
2021-07-17 11:52:08 +02:00
}
2021-08-29 23:18:03 +02:00
}
}
var updatedValuesToWrite = new ArrayList<Send<Buffer>>(mappedInputs.size());
var valueChangedResult = new ArrayList<ExtraKeyOperationResult<Send<Buffer>, X>>(mappedInputs.size());
try {
for (var mappedInput : mappedInputs) {
try (var updatedValue = updateFunction
.apply(mappedInput.getT1(), mappedInput.getT2()).receive()) {
try (var t3 = mappedInput.getT3().map(Send::receive).orElse(null)) {
valueChangedResult.add(new ExtraKeyOperationResult<>(mappedInput.getT1(),
mappedInput.getT2(), !LLUtils.equals(t3, updatedValue)));
}
updatedValuesToWrite.add(updatedValue.send());
}
}
} finally {
for (var mappedInput : mappedInputs) {
mappedInput.getT3().ifPresent(Send::close);
}
}
2021-07-17 11:52:08 +02:00
2021-08-29 23:18:03 +02:00
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
2021-09-01 00:01:56 +02:00
alloc,
2021-08-29 23:18:03 +02:00
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
int i = 0;
for (Tuple2<Buffer, X> entry : entriesWindow) {
var valueToWrite = updatedValuesToWrite.get(i);
if (valueToWrite == null) {
batch.delete(cfh, entry.getT1().send());
} else {
batch.put(cfh, entry.getT1().send(), valueToWrite);
2021-07-17 11:52:08 +02:00
}
2021-08-29 23:18:03 +02:00
i++;
2021-07-17 11:52:08 +02:00
}
2021-08-29 23:18:03 +02:00
batch.writeToDbAndClose();
batch.close();
} else {
int i = 0;
for (Tuple2<Buffer, X> entry : entriesWindow) {
2021-09-01 00:01:56 +02:00
var k = LLUtils.convertToDirect(alloc, entry.getT1().send());
try {
var v = LLUtils.convertToDirect(alloc, updatedValuesToWrite.get(i));
try {
db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer());
} finally {
v.buffer().close();
PlatformDependent.freeDirectBuffer(v.byteBuffer());
}
} finally {
k.buffer().close();
PlatformDependent.freeDirectBuffer(k.byteBuffer());
2021-08-29 23:18:03 +02:00
}
i++;
}
}
return valueChangedResult;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockWrite(stamps.get(index));
index++;
}
}
}
} finally {
for (Tuple2<Buffer, X> tuple : entriesWindow) {
tuple.getT1().close();
}
}
}).flatMapIterable(list -> list), /* Max concurrency is 2 to update data while preparing the next segment */ 2)
2021-07-17 11:52:08 +02:00
.doOnDiscard(Tuple2.class, entry -> {
2021-08-29 23:18:03 +02:00
if (entry.getT1() instanceof Buffer bb) {
bb.close();
2021-07-17 11:52:08 +02:00
}
2021-08-29 23:18:03 +02:00
if (entry.getT2() instanceof Buffer bb) {
bb.close();
2021-07-17 11:52:08 +02:00
}
})
.doOnDiscard(ExtraKeyOperationResult.class, entry -> {
2021-08-29 23:18:03 +02:00
if (entry.key() instanceof Buffer bb) {
bb.close();
2021-07-17 11:52:08 +02:00
}
2021-08-29 23:18:03 +02:00
if (entry.extra() instanceof Buffer bb) {
bb.close();
2021-07-17 11:52:08 +02:00
}
})
.doOnDiscard(Collection.class, obj -> {
//noinspection unchecked
var castedEntries = (Collection<ExtraKeyOperationResult<Object, Object>>) obj;
for (var entry : castedEntries) {
2021-08-29 23:18:03 +02:00
if (entry.key() instanceof Buffer bb) {
bb.close();
2021-07-17 11:52:08 +02:00
}
2021-08-29 23:18:03 +02:00
if (entry.extra() instanceof Buffer bb) {
bb.close();
2021-07-17 11:52:08 +02:00
}
}
});
}
2020-12-07 22:15:18 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Flux<Send<LLEntry>> getRange(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono,
2021-03-18 16:19:41 +01:00
boolean existsAlmostCertainly) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> {
try (var range = rangeSend.receive()) {
if (range.isSingle()) {
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly);
} else {
return getRangeMulti(snapshot, rangeMono);
}
}
},
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
@Override
2021-08-29 23:18:03 +02:00
public Flux<List<Send<LLEntry>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono,
2021-03-18 16:19:41 +01:00
int prefixLength, boolean existsAlmostCertainly) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> {
try (var range = rangeSend.receive()) {
if (range.isSingle()) {
var rangeSingleMono = rangeMono.map(r -> r.receive().getSingle());
return getRangeSingle(snapshot, rangeSingleMono, existsAlmostCertainly).map(List::of);
} else {
return getRangeMultiGrouped(snapshot, rangeMono, prefixLength);
}
}
},
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
2021-01-30 00:24:55 +01:00
}
2021-08-29 23:18:03 +02:00
private Flux<Send<LLEntry>> getRangeSingle(LLSnapshot snapshot,
Mono<Send<Buffer>> keyMono,
boolean existsAlmostCertainly) {
2021-08-29 23:18:03 +02:00
return Mono
.zip(keyMono, this.get(snapshot, keyMono, existsAlmostCertainly))
.map(result -> LLEntry.of(result.getT1(), result.getT2()).send())
.flux()
.transform(LLUtils::handleDiscard);
}
2021-08-29 23:18:03 +02:00
private Flux<Send<LLEntry>> getRangeMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> Flux.using(
() -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, rangeSend,
2021-08-27 02:49:51 +02:00
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeMultiDebugName),
LLLocalReactiveRocksIterator::flux,
2021-08-27 02:49:51 +02:00
LLLocalReactiveRocksIterator::release
).transform(LLUtils::handleDiscard),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
2021-08-29 23:18:03 +02:00
private Flux<List<Send<LLEntry>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, int prefixLength) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> Flux.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend,
2021-08-27 02:49:51 +02:00
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeMultiGrouped"),
LLLocalGroupedReactiveRocksIterator::flux,
2021-08-27 02:49:51 +02:00
LLLocalGroupedReactiveRocksIterator::release
).transform(LLUtils::handleDiscard),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
2021-01-30 00:24:55 +01:00
}
@Override
2021-08-29 23:18:03 +02:00
public Flux<Send<Buffer>> getRangeKeys(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> {
try (var range = rangeSend.receive()) {
if (range.isSingle()) {
return this.getRangeKeysSingle(snapshot, rangeMono.map(r -> r.receive().getSingle()));
} else {
return this.getRangeKeysMulti(snapshot, rangeMono);
}
}
},
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
@Override
2021-08-29 23:18:03 +02:00
public Flux<List<Send<Buffer>>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot,
Mono<Send<LLRange>> rangeMono,
int prefixLength) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> Flux.using(
() -> new LLLocalGroupedKeyReactiveRocksIterator(db, alloc, cfh, prefixLength, rangeSend,
2021-08-27 02:49:51 +02:00
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), "getRangeKeysGrouped"),
LLLocalGroupedReactiveRocksIterator::flux,
2021-08-27 02:49:51 +02:00
LLLocalGroupedReactiveRocksIterator::release
).transform(LLUtils::handleDiscard),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
2021-03-14 13:24:46 +01:00
}
@Override
2021-08-29 23:18:03 +02:00
public Flux<BadBlock> badBlocks(Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> Flux
.create(sink -> {
2021-08-29 23:18:03 +02:00
var range = rangeSend.receive();
sink.onDispose(range::close);
try (var ro = new ReadOptions(getReadOptions(null))) {
ro.setFillCache(false);
if (!range.isSingle()) {
ro.setReadaheadSize(32 * 1024);
}
ro.setVerifyChecksums(true);
2021-09-01 00:01:56 +02:00
var rocksIteratorTuple = getRocksIterator(alloc,
databaseOptions.allowNettyDirect(), ro, range.send(), db, cfh);
try {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid() && !sink.isCancelled()) {
try {
rocksIterator.status();
rocksIterator.key(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
rocksIterator.value(DUMMY_WRITE_ONLY_BYTE_BUFFER);
rocksIterator.status();
} catch (RocksDBException ex) {
sink.next(new BadBlock(databaseName, Column.special(columnName), null, ex));
}
rocksIterator.next();
}
}
} finally {
2021-08-29 23:18:03 +02:00
rocksIteratorTuple.getT2().close();
rocksIteratorTuple.getT3().close();
2021-09-01 00:01:56 +02:00
rocksIteratorTuple.getT4().close();
}
sink.complete();
} catch (Throwable ex) {
sink.error(ex);
}
}),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
2021-03-14 13:24:46 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Flux<Send<Buffer>> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, int prefixLength) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> Flux
.using(
() -> new LLLocalKeyPrefixReactiveRocksIterator(db,
alloc,
cfh,
prefixLength,
2021-08-29 23:18:03 +02:00
rangeSend,
databaseOptions.allowNettyDirect(),
resolveSnapshot(snapshot),
true,
"getRangeKeysGrouped"
),
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release
),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
2021-08-29 23:18:03 +02:00
private Flux<Send<Buffer>> getRangeKeysSingle(LLSnapshot snapshot, Mono<Send<Buffer>> keyMono) {
return Flux.usingWhen(keyMono,
2021-08-29 23:18:03 +02:00
keySend -> this
.containsKey(snapshot, keyMono)
.<Send<Buffer>>handle((contains, sink) -> {
if (contains) {
2021-08-29 23:18:03 +02:00
sink.next(keySend);
} else {
sink.complete();
}
})
2021-08-29 23:18:03 +02:00
.flux()
.doOnDiscard(Buffer.class, Buffer::close),
keySend -> Mono.fromRunnable(keySend::close)
);
2020-12-07 22:15:18 +01:00
}
2021-08-29 23:18:03 +02:00
private Flux<Send<Buffer>> getRangeKeysMulti(LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Flux.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> Flux.using(
() -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, rangeSend,
2021-08-27 02:49:51 +02:00
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysMultiDebugName),
LLLocalReactiveRocksIterator::flux,
2021-08-27 02:49:51 +02:00
LLLocalReactiveRocksIterator::release
).transform(LLUtils::handleDiscard),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
2020-12-07 22:15:18 +01:00
@Override
2021-08-29 23:18:03 +02:00
public Mono<Void> setRange(Mono<Send<LLRange>> rangeMono, Flux<Send<LLEntry>> entries) {
return Mono.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> {
if (USE_WINDOW_IN_SET_RANGE) {
2021-08-29 23:18:03 +02:00
return this
.<Void>runOnDb(() -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called setRange in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
assert EMPTY_READ_OPTIONS.isOwningHandle();
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(),
opts,
2021-08-29 23:18:03 +02:00
IterateBound.LOWER,
range.getMin()
);
} else {
2021-08-29 23:18:03 +02:00
minBound = emptyReleasableSlice();
}
2021-08-29 23:18:03 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(),
2021-08-29 23:18:03 +02:00
opts,
IterateBound.UPPER,
range.getMax()
);
} else {
2021-08-29 23:18:03 +02:00
maxBound = emptyReleasableSlice();
}
2021-08-29 23:18:03 +02:00
assert cfh.isOwningHandle();
assert opts.isOwningHandle();
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2021-08-29 23:18:03 +02:00
try (RocksIterator it = db.newIterator(cfh, opts)) {
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), it, range.getMin());
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-08-29 23:18:03 +02:00
it.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
2021-08-29 23:18:03 +02:00
it.status();
2021-09-01 00:01:56 +02:00
while (it.isValid()) {
db.delete(cfh, it.key());
it.next();
it.status();
}
} finally {
if (seekTo != null) {
seekTo.close();
}
2021-08-29 23:18:03 +02:00
}
} finally {
maxBound.close();
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
}
2021-06-19 21:55:20 +02:00
}
2021-08-29 23:18:03 +02:00
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
2021-09-01 00:01:56 +02:00
alloc,
2021-08-29 23:18:03 +02:00
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
if (range.isSingle()) {
batch.delete(cfh, range.getSingle());
} else {
deleteSmallRangeWriteBatch(batch, range.copy().send());
}
batch.writeToDbAndClose();
}
2021-08-29 23:18:03 +02:00
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
if (range.isSingle()) {
batch.delete(cfh, LLUtils.toArray(range.getSingleUnsafe()));
} else {
deleteSmallRangeWriteBatch(batch, range.copy().send());
}
db.write(EMPTY_WRITE_OPTIONS, batch);
batch.clear();
}
}
2021-08-29 23:18:03 +02:00
return null;
2021-05-12 01:25:59 +02:00
}
})
2021-08-29 01:15:51 +02:00
.thenMany(entries.window(MULTI_GET_WINDOW))
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
2021-08-29 23:18:03 +02:00
.flatMap(entriesListSend -> this
.<Void>runOnDb(() -> {
List<LLEntry> entriesList = new ArrayList<>(entriesListSend.size());
for (Send<LLEntry> entrySend : entriesListSend) {
entriesList.add(entrySend.receive());
}
try {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
2021-08-28 22:42:51 +02:00
for (LLEntry entry : entriesList) {
2021-08-29 23:18:03 +02:00
assert entry.isAccessible();
2021-09-01 00:01:56 +02:00
var k = LLUtils.convertToDirect(alloc, entry.getKey());
try {
var v = LLUtils.convertToDirect(alloc, entry.getValue());
try {
db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer());
} finally {
v.buffer().close();
PlatformDependent.freeDirectBuffer(v.byteBuffer());
2021-08-29 23:18:03 +02:00
}
2021-09-01 00:01:56 +02:00
} finally {
k.buffer().close();
PlatformDependent.freeDirectBuffer(k.byteBuffer());
2021-08-29 23:18:03 +02:00
}
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
2021-09-01 00:01:56 +02:00
alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
2021-08-28 22:42:51 +02:00
for (LLEntry entry : entriesList) {
2021-08-29 23:18:03 +02:00
assert entry.isAccessible();
2021-09-02 17:15:40 +02:00
if (databaseOptions.allowNettyDirect()) {
batch.put(cfh, entry.getKey(), entry.getValue());
} else {
batch.put(cfh,
LLUtils.toArray(entry.getKeyUnsafe()),
LLUtils.toArray(entry.getValueUnsafe())
);
}
}
batch.writeToDbAndClose();
}
} else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
2021-08-28 22:42:51 +02:00
for (LLEntry entry : entriesList) {
2021-08-29 23:18:03 +02:00
assert entry.isAccessible();
batch.put(cfh, LLUtils.toArray(entry.getKeyUnsafe()),
LLUtils.toArray(entry.getValueUnsafe()));
}
db.write(EMPTY_WRITE_OPTIONS, batch);
batch.clear();
}
2021-05-12 01:25:59 +02:00
}
return null;
} finally {
2021-08-28 22:42:51 +02:00
for (LLEntry entry : entriesList) {
2021-08-29 23:18:03 +02:00
assert entry.isAccessible();
entry.close();
2021-05-12 01:25:59 +02:00
}
2021-05-02 19:18:15 +02:00
}
})
)
)
.then()
.onErrorMap(cause -> new IOException("Failed to write range", cause));
} else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) {
return Mono.fromCallable(() -> {
throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params");
});
}
return this
2021-08-29 23:18:03 +02:00
.getRange(null, rangeMono, false)
.flatMap(oldValueSend -> this.<Void>runOnDb(() -> {
try (var oldValue = oldValueSend.receive()) {
dbDelete(cfh, EMPTY_WRITE_OPTIONS, oldValue.getKey());
return null;
}
}))
.then(entries
2021-08-29 23:18:03 +02:00
.flatMap(entrySend -> Mono.using(
entrySend::receive,
entry -> this
.put(LLUtils.lazyRetain(entry::getKey), LLUtils.lazyRetain(entry::getValue),
LLDictionaryResultType.VOID)
.doOnNext(Send::close),
ResourceSupport::close
))
.then(Mono.<Void>empty())
)
.onErrorMap(cause -> new IOException("Failed to write range", cause));
}
},
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
2021-03-14 03:13:19 +01:00
}
//todo: this is broken, check why. (is this still true?)
2021-08-29 23:18:03 +02:00
private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, Send<LLRange> rangeToReceive)
2021-03-20 12:41:11 +01:00
throws RocksDBException {
2021-08-29 23:18:03 +02:00
var range = rangeToReceive.receive();
2021-06-19 21:55:20 +02:00
try (var readOpts = new ReadOptions(getReadOptions(null))) {
2021-05-03 02:57:08 +02:00
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
2021-03-20 12:41:11 +01:00
} else {
2021-05-03 02:57:08 +02:00
minBound = emptyReleasableSlice();
2021-03-20 12:41:11 +01:00
}
2021-05-03 02:57:08 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, range.getMax());
2021-05-03 02:57:08 +02:00
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2021-05-03 02:57:08 +02:00
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
2021-05-03 02:57:08 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-05-03 02:57:08 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
rocksIterator.status();
2021-09-01 00:01:56 +02:00
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
rocksIterator.next();
rocksIterator.status();
}
} finally {
if (seekTo != null) {
seekTo.close();
}
2021-05-03 02:57:08 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
maxBound.close();
2021-05-03 02:57:08 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
2021-03-20 12:41:11 +01:00
}
2021-08-29 23:18:03 +02:00
} catch (Throwable e) {
range.close();
throw e;
}
}
2021-08-29 23:18:03 +02:00
private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, Send<LLRange> rangeToReceive)
2021-05-02 19:18:15 +02:00
throws RocksDBException {
2021-08-29 23:18:03 +02:00
try (var range = rangeToReceive.receive()) {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
2021-05-03 02:57:08 +02:00
} else {
2021-08-29 23:18:03 +02:00
minBound = emptyReleasableSlice();
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
2021-08-29 23:18:03 +02:00
range.getMax());
2021-05-03 02:57:08 +02:00
} else {
2021-08-29 23:18:03 +02:00
maxBound = emptyReleasableSlice();
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2021-08-29 23:18:03 +02:00
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-08-29 23:18:03 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
2021-08-29 23:18:03 +02:00
rocksIterator.status();
2021-09-01 00:01:56 +02:00
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
rocksIterator.status();
}
} finally {
if (seekTo != null) {
seekTo.close();
}
2021-08-29 23:18:03 +02:00
}
} finally {
maxBound.close();
2021-05-03 02:57:08 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
2021-05-03 02:57:08 +02:00
}
}
2021-05-02 19:18:15 +02:00
}
}
2021-09-01 00:01:56 +02:00
@Nullable
private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect, RocksIterator rocksIterator,
2021-08-29 23:18:03 +02:00
Send<Buffer> bufferToReceive) {
try (var buffer = bufferToReceive.receive()) {
if (allowNettyDirect) {
2021-09-01 00:01:56 +02:00
var direct = LLUtils.convertToDirect(alloc, buffer.send());
assert direct.byteBuffer().isDirect();
rocksIterator.seek(direct.byteBuffer());
return () -> {
direct.buffer().close();
PlatformDependent.freeDirectBuffer(direct.byteBuffer());
};
2021-05-03 21:41:51 +02:00
} else {
rocksIterator.seek(LLUtils.toArray(buffer));
2021-09-01 00:01:56 +02:00
return null;
2021-05-03 21:41:51 +02:00
}
}
}
2021-09-01 00:01:56 +02:00
private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect, ReadOptions readOpts,
2021-08-29 23:18:03 +02:00
IterateBound boundType, Send<Buffer> bufferToReceive) {
var buffer = bufferToReceive.receive();
try {
2021-08-29 23:18:03 +02:00
requireNonNull(buffer);
AbstractSlice<?> slice;
2021-08-29 23:18:03 +02:00
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS) {
2021-09-01 00:01:56 +02:00
var direct = LLUtils.convertToDirect(alloc, buffer.send());
buffer = direct.buffer().receive();
assert direct.byteBuffer().isDirect();
slice = new DirectSlice(direct.byteBuffer(), buffer.readableBytes());
2021-05-03 00:29:26 +02:00
assert slice.size() == buffer.readableBytes();
assert slice.compare(new Slice(LLUtils.toArray(buffer))) == 0;
2021-05-03 21:41:51 +02:00
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
2021-09-01 00:01:56 +02:00
return new ReleasableSliceImpl(slice, buffer, direct.byteBuffer());
2021-05-03 00:29:26 +02:00
} else {
2021-09-01 00:01:56 +02:00
try {
2021-08-29 23:18:03 +02:00
slice = new Slice(requireNonNull(LLUtils.toArray(buffer)));
2021-09-01 00:01:56 +02:00
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, null, null);
} finally {
buffer.close();
2021-08-29 23:18:03 +02:00
}
}
2021-08-29 23:18:03 +02:00
} catch (Throwable e) {
buffer.close();
throw e;
}
}
2021-05-02 19:18:15 +02:00
private static ReleasableSlice emptyReleasableSlice() {
var arr = new byte[0];
2021-05-21 00:19:40 +02:00
return new SimpleSliceWithoutRelease(new Slice(arr), null, arr);
2021-05-02 19:18:15 +02:00
}
2021-08-29 23:18:03 +02:00
public static record SimpleSliceWithoutRelease(AbstractSlice<?> slice, @Nullable Buffer byteBuf,
2021-05-21 00:19:40 +02:00
@Nullable Object additionalData) implements ReleasableSlice {}
2021-08-29 23:18:03 +02:00
public static record ReleasableSliceImpl(AbstractSlice<?> slice, @Nullable Buffer byteBuf,
2021-05-21 00:19:40 +02:00
@Nullable Object additionalData) implements ReleasableSlice {
2021-05-21 00:19:40 +02:00
@Override
2021-08-29 23:18:03 +02:00
public void close() {
slice.clear();
2021-08-29 23:18:03 +02:00
slice.close();
if (byteBuf != null) {
2021-08-29 23:18:03 +02:00
byteBuf.close();
}
2021-09-01 00:01:56 +02:00
if (additionalData instanceof ByteBuffer bb && bb.isDirect()) {
PlatformDependent.freeDirectBuffer(bb);
}
2021-03-20 12:41:11 +01:00
}
}
2021-01-30 00:24:55 +01:00
public Mono<Void> clear() {
return Mono
.<Void>fromCallable(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called clear in a nonblocking thread");
}
2021-06-27 16:52:45 +02:00
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
// readOpts.setIgnoreRangeDeletions(true);
readOpts.setFillCache(false);
readOpts.setReadaheadSize(32 * 1024); // 32KiB
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
2021-09-01 00:01:56 +02:00
alloc,
2021-06-27 16:52:45 +02:00
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
byte[] firstDeletedKey = null;
byte[] lastDeletedKey = null;
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
rocksIterator.seekToLast();
rocksIterator.status();
if (rocksIterator.isValid()) {
firstDeletedKey = FIRST_KEY;
lastDeletedKey = rocksIterator.key();
writeBatch.deleteRange(cfh, FIRST_KEY, rocksIterator.key());
writeBatch.delete(cfh, rocksIterator.key());
}
2021-03-14 03:13:19 +01:00
}
2020-12-07 22:15:18 +01:00
2021-06-27 16:52:45 +02:00
writeBatch.writeToDbAndClose();
2020-12-07 22:15:18 +01:00
2021-03-20 12:41:11 +01:00
2021-06-27 16:52:45 +02:00
// Compact range
db.suggestCompactRange(cfh);
if (firstDeletedKey != null && lastDeletedKey != null) {
db.compactRange(cfh,
firstDeletedKey,
lastDeletedKey,
new CompactRangeOptions()
.setAllowWriteStall(false)
.setExclusiveManualCompaction(false)
.setChangeLevel(false)
);
}
2020-12-07 22:15:18 +01:00
2021-06-27 16:52:45 +02:00
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
db.flushWal(true);
}
return null;
2020-12-07 22:15:18 +01:00
}
2021-01-30 00:24:55 +01:00
})
.onErrorMap(cause -> new IOException("Failed to clear", cause));
2020-12-07 22:15:18 +01:00
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono, boolean fast) {
return Mono.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called sizeRange in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
if (range.isAll()) {
return this
.runOnDb(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
.onErrorMap(IOException::new);
} else {
return runOnDb(() -> {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
2021-08-29 23:18:03 +02:00
range.getMin());
} else {
2021-08-29 23:18:03 +02:00
minBound = emptyReleasableSlice();
}
try {
2021-08-29 23:18:03 +02:00
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
2021-08-29 23:18:03 +02:00
range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
2021-08-29 23:18:03 +02:00
try {
if (fast) {
readOpts.setIgnoreRangeDeletions(true);
}
2021-08-29 23:18:03 +02:00
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2021-08-29 23:18:03 +02:00
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator,
2021-08-29 23:18:03 +02:00
range.getMin());
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-08-29 23:18:03 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
long i = 0;
2021-08-29 23:18:03 +02:00
rocksIterator.status();
2021-09-01 00:01:56 +02:00
while (rocksIterator.isValid()) {
rocksIterator.next();
rocksIterator.status();
i++;
}
return i;
} finally {
if (seekTo != null) {
seekTo.close();
}
2021-08-29 23:18:03 +02:00
}
}
2021-08-29 23:18:03 +02:00
} finally {
maxBound.close();
2021-05-12 01:25:59 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
}
}
2021-08-29 23:18:03 +02:00
}).onErrorMap(cause -> new IOException("Failed to get size of range " + range, cause));
}
}
},
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
2021-05-12 01:25:59 +02:00
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<LLEntry>> getOne(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> runOnDb(() -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getOne in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
2021-08-29 23:18:03 +02:00
range.getMin());
2021-05-03 02:57:08 +02:00
} else {
2021-08-29 23:18:03 +02:00
minBound = emptyReleasableSlice();
2021-03-18 19:53:32 +01:00
}
2021-08-29 23:18:03 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
2021-08-29 23:18:03 +02:00
range.getMax());
2021-05-12 01:25:59 +02:00
} else {
2021-08-29 23:18:03 +02:00
maxBound = emptyReleasableSlice();
}
2021-08-29 23:18:03 +02:00
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2021-08-29 23:18:03 +02:00
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-08-29 23:18:03 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
rocksIterator.status();
if (rocksIterator.isValid()) {
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive()) {
return LLEntry.of(key.send(), value.send()).send();
}
2021-08-29 23:18:03 +02:00
}
2021-09-01 00:01:56 +02:00
} else {
return null;
}
} finally {
if (seekTo != null) {
seekTo.close();
2021-05-12 01:25:59 +02:00
}
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
} finally {
maxBound.close();
}
2021-05-03 02:57:08 +02:00
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
2021-03-14 13:08:03 +01:00
}
}
}
}),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<Buffer>> getOneKey(@Nullable LLSnapshot snapshot, Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> runOnDb(() -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getOneKey in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
2021-08-29 23:18:03 +02:00
range.getMin());
2021-05-03 02:57:08 +02:00
} else {
2021-08-29 23:18:03 +02:00
minBound = emptyReleasableSlice();
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
2021-08-29 23:18:03 +02:00
range.getMax());
2021-05-12 01:25:59 +02:00
} else {
2021-08-29 23:18:03 +02:00
maxBound = emptyReleasableSlice();
2021-05-12 01:25:59 +02:00
}
2021-08-29 23:18:03 +02:00
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2021-08-29 23:18:03 +02:00
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-08-29 23:18:03 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
rocksIterator.status();
if (rocksIterator.isValid()) {
return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
} else {
return null;
}
} finally {
if (seekTo != null) {
seekTo.close();
}
2021-08-29 23:18:03 +02:00
}
} finally {
maxBound.close();
2021-05-12 01:25:59 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
2021-05-03 02:57:08 +02:00
}
}
}
}),
2021-08-29 23:18:03 +02:00
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) throws RocksDBException {
2021-06-19 21:55:20 +02:00
try (var rocksdbSnapshot = new ReadOptions(resolveSnapshot(snapshot))) {
if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
try {
return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
} catch (RocksDBException e) {
e.printStackTrace();
return 0;
}
} else if (PARALLEL_EXACT_SIZE) {
return exactSizeAll(snapshot);
} else {
rocksdbSnapshot.setFillCache(false);
rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
rocksdbSnapshot.setIgnoreRangeDeletions(true);
long count = 0;
try (RocksIterator rocksIterator = db.newIterator(cfh, rocksdbSnapshot)) {
rocksIterator.seekToFirst();
rocksIterator.status();
2021-06-19 21:55:20 +02:00
// If it's a fast size of a snapshot, count only up to 100'000 elements
while (rocksIterator.isValid() && count < 100_000) {
2021-06-19 21:55:20 +02:00
count++;
rocksIterator.next();
rocksIterator.status();
2021-06-19 21:55:20 +02:00
}
return count;
2020-12-07 22:15:18 +01:00
}
}
}
}
2021-01-30 00:24:55 +01:00
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread");
}
2021-06-19 21:55:20 +02:00
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setReadaheadSize(32 * 1024); // 32KiB
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
if (PARALLEL_EXACT_SIZE) {
var commonPool = ForkJoinPool.commonPool();
var futures = IntStream
.range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length)
.mapToObj(idx -> Pair.of(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx],
idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null
: LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1]
))
.map(range -> (Callable<Long>) () -> {
long partialCount = 0;
try (var rangeReadOpts = new ReadOptions(readOpts)) {
Slice sliceBegin;
if (range.getKey() != null) {
sliceBegin = new Slice(range.getKey());
} else {
sliceBegin = null;
}
Slice sliceEnd;
if (range.getValue() != null) {
sliceEnd = new Slice(range.getValue());
} else {
sliceEnd = null;
}
try {
if (sliceBegin != null) {
rangeReadOpts.setIterateLowerBound(sliceBegin);
}
if (sliceBegin != null) {
rangeReadOpts.setIterateUpperBound(sliceEnd);
}
try (RocksIterator rocksIterator = db.newIterator(cfh, rangeReadOpts)) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid()) {
2021-06-19 21:55:20 +02:00
partialCount++;
rocksIterator.next();
rocksIterator.status();
2021-06-19 21:55:20 +02:00
}
return partialCount;
}
} finally {
if (sliceBegin != null) {
sliceBegin.close();
}
if (sliceEnd != null) {
sliceEnd.close();
}
2021-03-18 19:53:32 +01:00
}
}
2021-06-19 21:55:20 +02:00
})
.map(commonPool::submit)
.collect(Collectors.toList());
long count = 0;
for (ForkJoinTask<Long> future : futures) {
count += future.join();
2021-03-18 19:53:32 +01:00
}
return count;
2021-06-19 21:55:20 +02:00
} else {
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
while (iter.isValid()) {
count++;
iter.next();
}
return count;
}
2021-03-18 19:53:32 +01:00
}
2020-12-07 22:15:18 +01:00
}
}
@Override
2021-08-29 23:18:03 +02:00
public Mono<Send<LLEntry>> removeOne(Mono<Send<LLRange>> rangeMono) {
return Mono.usingWhen(rangeMono,
2021-08-29 23:18:03 +02:00
rangeSend -> runOnDb(() -> {
try (var range = rangeSend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called removeOne in a nonblocking thread");
}
2021-08-29 23:18:03 +02:00
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
2021-08-29 23:18:03 +02:00
range.getMin());
2021-05-03 02:57:08 +02:00
} else {
2021-08-29 23:18:03 +02:00
minBound = emptyReleasableSlice();
2021-05-03 02:57:08 +02:00
}
2021-08-29 23:18:03 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
2021-08-29 23:18:03 +02:00
range.getMax());
2021-05-12 01:25:59 +02:00
} else {
2021-08-29 23:18:03 +02:00
maxBound = emptyReleasableSlice();
2021-05-12 01:25:59 +02:00
}
2021-08-29 23:18:03 +02:00
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
2021-08-29 23:18:03 +02:00
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
2021-08-29 23:18:03 +02:00
} else {
2021-09-01 00:01:56 +02:00
seekTo = null;
2021-08-29 23:18:03 +02:00
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
try {
rocksIterator.status();
if (!rocksIterator.isValid()) {
return null;
}
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive()) {
dbDelete(cfh, null, key.copy().send());
return LLEntry.of(key.send(), value.send()).send();
}
}
} finally {
if (seekTo != null) {
seekTo.close();
2021-08-29 23:18:03 +02:00
}
}
} finally {
maxBound.close();
2021-05-12 01:25:59 +02:00
}
} finally {
2021-08-29 23:18:03 +02:00
minBound.close();
2021-05-03 02:57:08 +02:00
}
2021-01-30 00:24:55 +01:00
}
}
2021-08-29 23:18:03 +02:00
}).onErrorMap(cause -> new IOException("Failed to delete", cause)),
rangeSend -> Mono.fromRunnable(rangeSend::close)
);
2020-12-07 22:15:18 +01:00
}
2021-04-03 19:09:06 +02:00
@NotNull
2021-09-01 00:01:56 +02:00
public static Tuple4<RocksIterator, ReleasableSlice, ReleasableSlice, SafeCloseable> getRocksIterator(BufferAllocator alloc,
boolean allowNettyDirect,
2021-06-29 23:31:02 +02:00
ReadOptions readOptions,
2021-08-29 23:18:03 +02:00
Send<LLRange> rangeToReceive,
2021-04-03 19:09:06 +02:00
RocksDB db,
ColumnFamilyHandle cfh) {
2021-08-29 23:18:03 +02:00
try (var range = rangeToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread");
}
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
2021-09-01 00:01:56 +02:00
sliceMin = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.LOWER, range.getMin());
} else {
2021-05-02 19:18:15 +02:00
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
2021-09-01 00:01:56 +02:00
sliceMax = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.UPPER, range.getMax());
} else {
2021-05-02 19:18:15 +02:00
sliceMax = emptyReleasableSlice();
}
var rocksIterator = db.newIterator(cfh, readOptions);
2021-09-01 00:01:56 +02:00
SafeCloseable seekTo;
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-09-01 00:01:56 +02:00
seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect, rocksIterator, range.getMin()),
() -> ((SafeCloseable) () -> {})
);
} else {
2021-09-01 00:01:56 +02:00
seekTo = () -> {};
rocksIterator.seekToFirst();
}
2021-09-01 00:01:56 +02:00
return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo);
2021-04-03 19:09:06 +02:00
}
}
2020-12-07 22:15:18 +01:00
}