CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java

1727 lines
54 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.ReferenceCounted;
2021-01-17 18:31:25 +01:00
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
2021-01-30 00:24:55 +01:00
import it.cavallium.dbengine.database.LLRange;
2021-01-17 18:31:25 +01:00
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
2021-02-13 01:31:24 +01:00
import it.cavallium.dbengine.database.UpdateMode;
2021-02-06 19:21:31 +01:00
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
2020-12-07 22:15:18 +01:00
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
2020-12-07 22:15:18 +01:00
import java.util.Arrays;
2021-05-02 19:18:15 +02:00
import java.util.Collection;
import java.util.List;
2020-12-07 22:15:18 +01:00
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
2021-03-18 19:53:32 +01:00
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
2021-02-13 00:18:57 +01:00
import java.util.concurrent.locks.StampedLock;
2020-12-07 22:15:18 +01:00
import java.util.function.Function;
2021-03-18 19:53:32 +01:00
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.AllArgsConstructor;
import lombok.Data;
2021-03-18 19:53:32 +01:00
import org.apache.commons.lang3.tuple.Pair;
2020-12-07 22:15:18 +01:00
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractSlice;
import org.rocksdb.CappedWriteBatch;
2020-12-07 22:15:18 +01:00
import org.rocksdb.ColumnFamilyHandle;
2021-03-20 12:41:11 +01:00
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.DirectSlice;
2020-12-07 22:15:18 +01:00
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
2021-03-13 19:01:36 +01:00
import org.rocksdb.Slice;
2020-12-07 22:15:18 +01:00
import org.rocksdb.Snapshot;
2021-05-02 19:18:15 +02:00
import org.rocksdb.WriteBatch;
2020-12-07 22:15:18 +01:00
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
2021-02-06 19:21:31 +01:00
import org.warp.commonutils.locks.Striped;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
2021-01-30 00:24:55 +01:00
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
2021-04-03 19:09:06 +02:00
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import static io.netty.buffer.Unpooled.*;
2020-12-07 22:15:18 +01:00
@NotAtomic
public class LLLocalDictionary implements LLDictionary {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalDictionary.class);
// When true, old-snapshot sizes are approximated with the current fast size instead of an exact scan.
private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = false;
static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
// Number of keys fetched per multiGet / putMulti window
static final int MULTI_GET_WINDOW = 500;
// Shared default options. NOTE(review): these are mutable RocksDB native objects shared by every
// read/write without a snapshot — they must never be mutated or closed by callers.
static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions();
// Low-priority writes, used for bulk write batches
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
static final boolean PREFER_SEEK_TO_FIRST = false;
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
/**
 * Default: true. Use false to debug problems with windowing.
 */
static final boolean USE_WINDOW_IN_SET_RANGE = true;
/**
 * Default: true. Use false to debug problems with write batches.
 */
static final boolean USE_WRITE_BATCHES_IN_PUT_MULTI = true;
/**
 * Default: true. Use false to debug problems with write batches.
 */
static final boolean USE_WRITE_BATCHES_IN_SET_RANGE = true;
/**
 * Default: true. Use false to debug problems with capped write batches.
 */
static final boolean USE_CAPPED_WRITE_BATCH_IN_SET_RANGE = true;
/**
 * Default: true. Use false to debug problems with write batches deletes.
 */
static final boolean USE_WRITE_BATCH_IN_SET_RANGE_DELETE = false;
// When true, exact range sizes are computed in parallel with ForkJoin
static final boolean PARALLEL_EXACT_SIZE = true;
// Number of stripes of the per-key lock used when updateMode == ALLOW
private static final int STRIPES = 512;
private static final byte[] FIRST_KEY = new byte[]{};
// Sentinel target for db.get() when only existence (not the value) is needed
private static final byte[] NO_DATA = new byte[0];
private static final boolean ASSERTIONS_ENABLED;
/**
 * Default: true
 */
private static final boolean USE_DIRECT_BUFFER_BOUNDS = true;
static {
	// Detect whether -ea is active: the assert assigns as a side effect only when assertions run
	boolean assertionsEnabled = false;
	//noinspection AssertWithSideEffects
	assert (assertionsEnabled = true);
	//noinspection ConstantConditions
	ASSERTIONS_ENABLED = assertionsEnabled;
}
private final RocksDB db;
private final ColumnFamilyHandle cfh;
private final String databaseName;
// Scheduler on which all blocking RocksDB calls are subscribed
private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
// Striped read/write locks guarding per-key atomicity, used only when updateMode == ALLOW
private final Striped<StampedLock> itemsLock = Striped.readWriteStampedLock(STRIPES);
private final UpdateMode updateMode;
private final ByteBufAllocator alloc;
2020-12-07 22:15:18 +01:00
2021-05-03 21:41:51 +02:00
public LLLocalDictionary(
		ByteBufAllocator allocator,
		@NotNull RocksDB db,
		@NotNull ColumnFamilyHandle columnFamilyHandle,
		String databaseName,
		Scheduler dbScheduler,
		Function<LLSnapshot, Snapshot> snapshotResolver,
		UpdateMode updateMode) {
	// requireNonNull returns its argument, so the null checks fold into the assignments
	this.db = Objects.requireNonNull(db);
	this.cfh = Objects.requireNonNull(columnFamilyHandle);
	this.databaseName = databaseName;
	this.dbScheduler = dbScheduler;
	this.snapshotResolver = snapshotResolver;
	this.updateMode = updateMode;
	this.alloc = allocator;
}
@Override
public String getDatabaseName() {
	// Name provided at construction; used in error messages and routing
	return databaseName;
}
/**
 * Returns read options bound to the given logical snapshot, or the shared
 * {@link #EMPTY_READ_OPTIONS} when no snapshot is requested.
 * NOTE(review): the returned object may be the shared static instance —
 * callers must never mutate or close it.
 */
private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
	if (snapshot != null) {
		return getReadOptions(snapshotResolver.apply(snapshot));
	} else {
		return EMPTY_READ_OPTIONS;
	}
}
/**
 * Wraps a RocksDB snapshot in fresh read options, or returns the shared
 * {@link #EMPTY_READ_OPTIONS} when the snapshot is null.
 * NOTE(review): the ReadOptions allocated here is a native object that is
 * never closed by callers — potential native-memory leak; verify ownership.
 */
private ReadOptions getReadOptions(Snapshot snapshot) {
	if (snapshot != null) {
		return new ReadOptions().setSnapshot(snapshot);
	} else {
		return EMPTY_READ_OPTIONS;
	}
}
private int getLockIndex(ByteBuf key) {
return Math.abs(key.hashCode() % STRIPES);
2021-02-06 19:21:31 +01:00
}
private IntArrayList getLockIndices(List<ByteBuf> keys) {
2021-02-06 19:21:31 +01:00
var list = new IntArrayList(keys.size());
for (ByteBuf key : keys) {
2021-02-06 19:21:31 +01:00
list.add(getLockIndex(key));
}
return list;
}
private IntArrayList getLockIndicesEntries(List<Entry<ByteBuf, ByteBuf>> keys) {
2021-02-06 19:21:31 +01:00
var list = new IntArrayList(keys.size());
for (Entry<ByteBuf, ByteBuf> key : keys) {
2021-02-06 19:21:31 +01:00
list.add(getLockIndex(key.getKey()));
}
return list;
}
2020-12-07 22:15:18 +01:00
@Override
public ByteBufAllocator getAllocator() {
	// Allocator injected at construction; used for direct result buffers
	return alloc;
}
@Override
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
	// Point lookup. A striped read-lock is taken only when concurrent update() calls are
	// possible (UpdateMode.ALLOW); otherwise reads are lock-free. The key is retained for the
	// inner dbGet call and released once the Mono terminates.
	return Mono
			.fromCallable(() -> {
				StampedLock lock;
				long stamp;
				if (updateMode == UpdateMode.ALLOW) {
					lock = itemsLock.getAt(getLockIndex(key));
					stamp = lock.readLock();
				} else {
					lock = null;
					stamp = 0;
				}
				try {
					if (logger.isTraceEnabled()) {
						logger.trace("Reading {}", LLUtils.toString(key));
					}
					// dbGet consumes its own retained copy of the key
					return dbGet(cfh, resolveSnapshot(snapshot), key.retain(), existsAlmostCertainly);
				} finally {
					if (updateMode == UpdateMode.ALLOW) {
						lock.unlockRead(stamp);
					}
				}
			})
			.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
			.subscribeOn(dbScheduler)
			.doFinally(s -> key.release());
}
2021-05-03 21:41:51 +02:00
private ByteBuf dbGet(ColumnFamilyHandle cfh,
@Nullable ReadOptions readOptions,
ByteBuf key,
boolean existsAlmostCertainly) throws RocksDBException {
try {
2021-05-03 21:41:51 +02:00
if (key.isDirect()) {
2021-05-02 19:18:15 +02:00
2021-05-03 21:41:51 +02:00
//todo: implement keyMayExist if existsAlmostCertainly is false.
// Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers
2021-05-03 21:41:51 +02:00
// Create the key nio buffer to pass to RocksDB
if (!key.isDirect()) {
throw new RocksDBException("Key buffer must be direct");
}
ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
assert keyNioBuffer.isDirect();
// Create a direct result buffer because RocksDB works only with direct buffers
ByteBuf resultBuf = alloc.directBuffer();
try {
int valueSize;
int assertionReadData = -1;
ByteBuffer resultNioBuf;
do {
// Create the result nio buffer to pass to RocksDB
resultNioBuf = resultBuf.nioBuffer(0, resultBuf.capacity());
assert keyNioBuffer.isDirect();
assert resultNioBuf.isDirect();
valueSize = db.get(cfh,
Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS),
keyNioBuffer,
resultNioBuf
);
if (valueSize != RocksDB.NOT_FOUND) {
// todo: check if position is equal to data that have been read
// todo: check if limit is equal to value size or data that have been read
assert valueSize <= 0 || resultNioBuf.limit() > 0;
2021-05-03 21:41:51 +02:00
// If the locking is enabled the data is safe, so since we are appending data to the end,
// we need to check if it has been appended correctly or it it has been overwritten.
// We must not do this check otherwise because if there is no locking the data can be
// overwritten with a smaller value the next time.
2021-05-02 19:18:15 +02:00
if (updateMode == UpdateMode.ALLOW) {
2021-05-03 21:41:51 +02:00
// Check if read data is larger than previously read data.
// If it's smaller or equals it means that RocksDB is overwriting the beginning of the result buffer.
assert resultNioBuf.limit() > assertionReadData;
if (ASSERTIONS_ENABLED) {
assertionReadData = resultNioBuf.limit();
}
}
// Check if read data is not bigger than the total value size.
// If it's bigger it means that RocksDB is writing the start of the result into the result
// buffer more than once.
assert resultNioBuf.limit() <= valueSize;
if (valueSize <= resultNioBuf.limit()) {
// Return the result ready to be read
return resultBuf.setIndex(0, valueSize).retain();
} else {
// If the locking is enabled the data is safe, so we can append the next read data.
// Otherwise we need to re-read everything.
if (updateMode == UpdateMode.ALLOW) {
// Update the resultBuf writerIndex with the new position
resultBuf.writerIndex(resultNioBuf.limit());
}
//noinspection UnusedAssignment
resultNioBuf = null;
}
// Rewind the keyNioBuf position, making it readable again for the next loop iteration
keyNioBuffer.rewind();
if (resultBuf.capacity() < valueSize) {
// Expand the resultBuf size if the result is bigger than the current result buffer size
resultBuf.capacity(valueSize);
}
}
2021-05-03 21:41:51 +02:00
// Repeat if the result has been found but it's still not finished
} while (valueSize != RocksDB.NOT_FOUND);
// If the value is not found return null
return null;
} finally {
resultBuf.release();
}
} else {
byte[] keyArray = LLUtils.toArray(key);
Objects.requireNonNull(keyArray);
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
if (existsAlmostCertainly || db.keyMayExist(cfh,
Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS),
keyArray,
data
)) {
if (!existsAlmostCertainly && data.getValue() != null) {
return wrappedBuffer(data.getValue());
} else {
byte[] result = db.get(cfh, Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS), keyArray);
if (result == null) {
return null;
} else {
return wrappedBuffer(result);
2021-05-02 19:18:15 +02:00
}
}
2021-05-03 21:41:51 +02:00
} else {
return null;
}
}
} finally {
key.release();
}
}
private void dbPut(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key, ByteBuf value)
throws RocksDBException {
try {
2021-05-03 21:41:51 +02:00
if (key.isDirect() && value.isDirect()) {
if (!key.isDirect()) {
throw new RocksDBException("Key buffer must be direct");
}
if (!value.isDirect()) {
throw new RocksDBException("Value buffer must be direct");
}
var keyNioBuffer = LLUtils.toDirect(key);
assert keyNioBuffer.isDirect();
2021-05-03 21:41:51 +02:00
var valueNioBuffer = LLUtils.toDirect(value);
assert valueNioBuffer.isDirect();
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer);
} else {
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key), LLUtils.toArray(value));
}
} finally {
key.release();
value.release();
}
2020-12-07 22:15:18 +01:00
}
@Override
2021-01-30 00:26:58 +01:00
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
2021-05-02 19:18:15 +02:00
return Mono
.defer(() -> {
if (range.isSingle()) {
return containsKey(snapshot, range.getSingle().retain());
} else {
return containsRange(snapshot, range.retain());
}
})
.map(isContained -> !isContained)
.doFinally(s -> range.release());
2020-12-07 22:15:18 +01:00
}
2021-01-30 00:24:55 +01:00
public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono
.fromCallable(() -> {
2021-03-13 19:01:36 +01:00
var readOpts = resolveSnapshot(snapshot);
2021-03-14 19:38:20 +01:00
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
2021-03-14 13:08:03 +01:00
readOpts.setFillCache(false);
2021-03-13 19:01:36 +01:00
if (range.hasMin()) {
2021-05-03 21:41:51 +02:00
if (range.getMin().isDirect()) {
readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()),
"This range must use direct buffers"
)));
} else {
readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin())));
}
2021-03-13 19:01:36 +01:00
}
if (range.hasMax()) {
2021-05-03 21:41:51 +02:00
if (range.getMax().isDirect()) {
readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()),
"This range must use direct buffers"
)));
} else {
readOpts.setIterateUpperBound(new Slice(LLUtils.toArray(range.getMax())));
}
2021-03-13 19:01:36 +01:00
}
2021-03-18 19:53:32 +01:00
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-05-03 21:41:51 +02:00
if (range.getMin().isDirect()) {
rocksIterator.seek(Objects.requireNonNull(LLUtils.toDirect(range.getMin()),
"This range must use direct buffers"
));
} else {
rocksIterator.seek(LLUtils.toArray(range.getMin()));
}
2021-03-18 19:53:32 +01:00
} else {
rocksIterator.seekToFirst();
}
return rocksIterator.isValid();
2021-01-30 00:24:55 +01:00
}
})
2021-03-04 22:01:50 +01:00
.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
2020-12-07 22:15:18 +01:00
}
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, ByteBuf key) {
2021-01-30 00:24:55 +01:00
return Mono
.fromCallable(() -> {
2021-02-13 01:31:24 +01:00
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
2021-02-13 02:16:24 +01:00
stamp = lock.readLock();
2021-02-13 01:31:24 +01:00
} else {
lock = null;
stamp = 0;
}
2021-02-06 19:21:31 +01:00
try {
int size = RocksDB.NOT_FOUND;
byte[] keyBytes = LLUtils.toArray(key);
2021-02-06 19:21:31 +01:00
Holder<byte[]> data = new Holder<>();
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), keyBytes, data)) {
2021-02-06 19:21:31 +01:00
if (data.getValue() != null) {
size = data.getValue().length;
} else {
size = db.get(cfh, resolveSnapshot(snapshot), keyBytes, NO_DATA);
2021-02-06 19:21:31 +01:00
}
2020-12-07 22:15:18 +01:00
}
2021-02-06 19:21:31 +01:00
return size != RocksDB.NOT_FOUND;
} finally {
2021-02-13 01:31:24 +01:00
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
2020-12-07 22:15:18 +01:00
}
2021-01-30 00:24:55 +01:00
})
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler)
.doFinally(s -> key.release());
2021-01-30 00:24:55 +01:00
}
@Override
public Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) {
	// Optionally reads the previous value first (depending on resultType), then writes under the
	// striped write-lock. singleOrEmpty() forwards the previous value when one was requested.
	return Mono
			.defer(() -> getPreviousData(key.retain(), resultType))
			.concatWith(Mono
					.<ByteBuf>fromCallable(() -> {
						StampedLock lock;
						long stamp;
						if (updateMode == UpdateMode.ALLOW) {
							lock = itemsLock.getAt(getLockIndex(key));
							stamp = lock.writeLock();
						} else {
							lock = null;
							stamp = 0;
						}
						try {
							if (logger.isTraceEnabled()) {
								logger.trace("Writing {}: {}", LLUtils.toString(key), LLUtils.toString(value));
							}
							// dbPut consumes its own retained copies of key and value
							dbPut(cfh, null, key.retain(), value.retain());
							return null;
						} finally {
							if (updateMode == UpdateMode.ALLOW) {
								lock.unlockWrite(stamp);
							}
						}
					})
					.subscribeOn(dbScheduler)
					.onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause))
			)
			.singleOrEmpty()
			.doFinally(s -> {
				key.release();
				value.release();
			});
}
2021-05-02 19:18:15 +02:00
@Override
public Mono<UpdateMode> getUpdateMode() {
	// updateMode is final; fromSupplier defers the read to subscription time by convention
	return Mono.fromSupplier(() -> updateMode);
}
2021-02-06 19:21:31 +01:00
/**
 * Atomically reads, transforms and writes back one value.
 * <p>
 * Takes a striped read-lock, computes the new value, then tries to upgrade to a
 * write-lock with {@code tryConvertToWriteLock}; on failure it re-acquires as a write
 * lock and retries the whole read-compute cycle ({@code continue} in the while loop),
 * because the value may have changed in between. Returns true when the stored value
 * actually changed (write or delete happened).
 */
@Override
public Mono<Boolean> update(ByteBuf key,
		Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
		boolean existsAlmostCertainly) {
	return Mono
			.fromCallable(() -> {
				if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed");
				StampedLock lock;
				long stamp;
				if (updateMode == UpdateMode.ALLOW) {
					lock = itemsLock.getAt(getLockIndex(key));
					stamp = lock.readLock();
				} else {
					lock = null;
					stamp = 0;
				}
				try {
					if (logger.isTraceEnabled()) {
						logger.trace("Reading {}", LLUtils.toString(key));
					}
					while (true) {
						boolean changed = false;
						@Nullable ByteBuf prevData;
						// keyMayExist is skipped when the caller asserts the key exists
						var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
						if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
							if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
								byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
								if (prevDataBytes != null) {
									prevData = wrappedBuffer(prevDataBytes);
								} else {
									prevData = null;
								}
							} else {
								prevData = dbGet(cfh, null, key.retain(), existsAlmostCertainly);
							}
						} else {
							prevData = null;
						}
						try {
							@Nullable ByteBuf newData;
							// Hand the updater a slice so it cannot corrupt prevData's indices
							ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice();
							try {
								newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain());
								// The updater must not leave the buffer partially read
								assert prevDataToSendToUpdater == null
										|| prevDataToSendToUpdater.readerIndex() == 0
										|| !prevDataToSendToUpdater.isReadable();
							} finally {
								if (prevDataToSendToUpdater != null) {
									prevDataToSendToUpdater.release();
								}
							}
							try {
								if (prevData != null && newData == null) {
									// Updater returned null for an existing value: delete it
									//noinspection DuplicatedCode
									if (updateMode == UpdateMode.ALLOW) {
										var ws = lock.tryConvertToWriteLock(stamp);
										if (ws != 0) {
											stamp = ws;
										} else {
											// Upgrade failed: re-lock for writing and re-read (value may have changed)
											lock.unlockRead(stamp);
											stamp = lock.writeLock();
											continue;
										}
									}
									if (logger.isTraceEnabled()) {
										logger.trace("Deleting {}", LLUtils.toString(key));
									}
									changed = true;
									dbDelete(cfh, null, key.retain());
								} else if (newData != null
										&& (prevData == null || !LLUtils.equals(prevData, newData))) {
									// New value differs from the stored one: write it
									//noinspection DuplicatedCode
									if (updateMode == UpdateMode.ALLOW) {
										var ws = lock.tryConvertToWriteLock(stamp);
										if (ws != 0) {
											stamp = ws;
										} else {
											// Upgrade failed: re-lock for writing and re-read (value may have changed)
											lock.unlockRead(stamp);
											stamp = lock.writeLock();
											continue;
										}
									}
									if (logger.isTraceEnabled()) {
										logger.trace("Writing {}: {}", LLUtils.toString(key), LLUtils.toString(newData));
									}
									changed = true;
									dbPut(cfh, null, key.retain(), newData.retain());
								}
								return changed;
							} finally {
								if (newData != null) {
									newData.release();
								}
							}
						} finally {
							if (prevData != null) {
								prevData.release();
							}
						}
					}
				} finally {
					if (updateMode == UpdateMode.ALLOW) {
						// unlock() releases whichever mode (read or upgraded write) the stamp holds
						lock.unlock(stamp);
					}
				}
			})
			.onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toString(key), cause))
			.subscribeOn(dbScheduler)
			.doFinally(s -> key.release());
}
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key)
throws RocksDBException {
try {
2021-05-03 21:41:51 +02:00
if (key.isDirect()) {
if (!key.isDirect()) {
throw new IllegalArgumentException("Key must be a direct buffer");
}
var keyNioBuffer = LLUtils.toDirect(key);
db.delete(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer);
} else {
db.delete(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key));
}
} finally {
key.release();
}
2021-02-06 19:21:31 +01:00
}
2020-12-07 22:15:18 +01:00
@Override
public Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType) {
	// Optionally reads the previous value first (depending on resultType), then deletes under
	// the striped write-lock. singleOrEmpty() forwards the previous value when one was requested.
	return Mono
			.defer(() -> getPreviousData(key.retain(), resultType))
			.concatWith(Mono
					.fromCallable(() -> {
						StampedLock lock;
						long stamp;
						if (updateMode == UpdateMode.ALLOW) {
							lock = itemsLock.getAt(getLockIndex(key));
							stamp = lock.writeLock();
						} else {
							lock = null;
							stamp = 0;
						}
						try {
							if (logger.isTraceEnabled()) {
								logger.trace("Deleting {}", LLUtils.toString(key));
							}
							// dbDelete consumes its own retained copy of the key
							dbDelete(cfh, null, key.retain());
							return null;
						} finally {
							if (updateMode == UpdateMode.ALLOW) {
								lock.unlockWrite(stamp);
							}
						}
					})
					.onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toString(key), cause))
					.subscribeOn(dbScheduler)
					.then(Mono.empty())
			).singleOrEmpty()
			.doFinally(s -> key.release());
}
/**
 * Fetches whatever put/remove should emit as "previous data" for the given result type:
 * an existence flag, the previous value, or nothing (VOID).
 */
private Mono<ByteBuf> getPreviousData(ByteBuf key, LLDictionaryResultType resultType) {
	return Mono
			.defer(() -> {
				switch (resultType) {
					case PREVIOUS_VALUE_EXISTENCE:
						// Only existence is needed: cheap containsKey, encoded as a 1-byte buffer
						return this
								.containsKey(null, key.retain())
								.single()
								.map(LLUtils::booleanToResponseByteBuffer)
								.doFinally(s -> {
									assert key.refCnt() > 0;
								});
					case PREVIOUS_VALUE:
						return Mono
								.fromCallable(() -> {
									StampedLock lock;
									long stamp;
									if (updateMode == UpdateMode.ALLOW) {
										lock = itemsLock.getAt(getLockIndex(key));
										stamp = lock.readLock();
									} else {
										lock = null;
										stamp = 0;
									}
									try {
										if (logger.isTraceEnabled()) {
											logger.trace("Reading {}", LLUtils.toArray(key));
										}
										// keyMayExist may hand back the value inline, avoiding a full get
										var data = new Holder<byte[]>();
										if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) {
											if (data.getValue() != null) {
												return wrappedBuffer(data.getValue());
											} else {
												try {
													return dbGet(cfh, null, key.retain(), true);
												} finally {
													assert key.refCnt() > 0;
												}
											}
										} else {
											return null;
										}
									} finally {
										if (updateMode == UpdateMode.ALLOW) {
											lock.unlockRead(stamp);
										}
									}
								})
								.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
								.subscribeOn(dbScheduler);
					case VOID:
						return Mono.empty();
					default:
						return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
				}
			})
			.doFinally(s -> key.release());
}
2020-12-07 22:15:18 +01:00
2021-01-30 00:24:55 +01:00
/**
 * Batched lookup: keys are windowed in groups of {@link #MULTI_GET_WINDOW} and resolved
 * with one multiGetAsList per window. Missing keys are silently skipped; found keys are
 * emitted as (retained key, wrapped value) entries. Discarded entries are released to
 * avoid leaking refcounted buffers on cancellation.
 */
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot,
		Flux<ByteBuf> keys,
		boolean existsAlmostCertainly) {
	return keys
			.window(MULTI_GET_WINDOW)
			.flatMap(keysWindowFlux -> keysWindowFlux
					.collectList()
					.doOnDiscard(Entry.class, discardedEntry -> {
						//noinspection unchecked
						var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
						entry.getKey().release();
						entry.getValue().release();
					})
					.flatMapMany(keysWindow -> Mono
							.fromCallable(() -> {
								// Bulk-acquire the read locks of every key in the window
								Iterable<StampedLock> locks;
								ArrayList<Long> stamps;
								if (updateMode == UpdateMode.ALLOW) {
									locks = itemsLock.bulkGetAt(getLockIndices(keysWindow));
									stamps = new ArrayList<>();
									for (var lock : locks) {
										stamps.add(lock.readLock());
									}
								} else {
									locks = null;
									stamps = null;
								}
								try {
									// multiGetAsList needs one column-family handle per key
									var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
									Arrays.fill(handlesArray, cfh);
									var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length);
									var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, LLUtils.toArray(keysWindow));
									var mappedResults = new ArrayList<Entry<ByteBuf, ByteBuf>>(results.size());
									for (int i = 0; i < results.size(); i++) {
										var val = results.get(i);
										if (val != null) {
											// Null out the slot so the byte[] can be collected early
											results.set(i, null);
											mappedResults.add(Map.entry(keysWindow.get(i).retain(), wrappedBuffer(val)));
										}
									}
									return mappedResults;
								} finally {
									if (updateMode == UpdateMode.ALLOW) {
										int index = 0;
										for (var lock : locks) {
											lock.unlockRead(stamps.get(index));
											index++;
										}
									}
								}
							})
							.subscribeOn(dbScheduler)
							.flatMapMany(Flux::fromIterable)
							.onErrorMap(cause -> new IOException("Failed to read keys "
									+ Arrays.deepToString(keysWindow.toArray(ByteBuf[]::new)), cause))
							.doFinally(s -> keysWindow.forEach(ReferenceCounted::release))
					)
			)
			.doOnDiscard(Entry.class, discardedEntry -> {
				//noinspection unchecked
				var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
				entry.getKey().release();
				entry.getValue().release();
			});
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues) {
return entries
.window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
2021-05-02 19:18:15 +02:00
.doOnDiscard(Entry.class, entry -> {
2021-05-03 18:07:18 +02:00
if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
}
2021-05-02 19:18:15 +02:00
})
.flatMap(Flux::collectList)
2021-05-02 19:18:15 +02:00
.doOnDiscard(Entry.class, entry -> {
2021-05-03 18:07:18 +02:00
if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
}
2021-05-02 19:18:15 +02:00
})
.flatMap(ew -> Mono
.using(
() -> ew,
entriesWindow -> Mono
.<Entry<ByteBuf, ByteBuf>>fromCallable(() -> {
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
2021-05-02 19:18:15 +02:00
locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
2021-05-02 19:18:15 +02:00
stamps.add(lock.writeLock());
}
2021-05-02 19:18:15 +02:00
} else {
locks = null;
stamps = null;
}
2021-05-02 19:18:15 +02:00
try {
2021-05-03 12:29:15 +02:00
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
2021-05-02 19:18:15 +02:00
var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) {
batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
}
batch.writeToDbAndClose();
batch.close();
} else {
for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) {
2021-05-03 12:44:22 +02:00
db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer());
2021-05-02 19:18:15 +02:00
}
}
return null;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockWrite(stamps.get(index));
index++;
}
}
}
})
// Prepend everything to get previous elements
.transformDeferred(transformer -> {
if (getOldValues) {
return this
.getMulti(null, Flux
.fromIterable(entriesWindow)
.map(Entry::getKey)
.map(ByteBuf::retain), false)
.publishOn(dbScheduler)
.then(transformer);
} else {
return transformer;
}
}),
entriesWindow -> {
for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) {
entry.getKey().release();
entry.getValue().release();
2021-05-02 19:18:15 +02:00
}
}
)
)
.doOnDiscard(Collection.class, obj -> {
//noinspection unchecked
var castedEntries = (Collection<Entry<ByteBuf, ByteBuf>>) obj;
for (Entry<ByteBuf, ByteBuf> entry : castedEntries) {
entry.getKey().release();
entry.getValue().release();
}
});
}
2021-01-30 00:33:36 +01:00
/**
 * Enqueues one entry into the given write batch on the database scheduler.
 * The batch takes ownership of the entry's buffers.
 */
@NotNull
private Mono<Void> putEntryToWriteBatch(Entry<ByteBuf, ByteBuf> newEntry, CappedWriteBatch writeBatch) {
	return Mono
			.<Void>fromCallable(() -> {
				writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
				return null;
			})
			.subscribeOn(dbScheduler);
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
2021-03-18 16:19:41 +01:00
LLRange range,
boolean existsAlmostCertainly) {
2021-05-02 19:18:15 +02:00
return Flux
.defer(() -> {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly);
} else {
return getRangeMulti(snapshot, range.retain());
}
})
.doFinally(s -> range.release());
}
@Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
2021-03-18 16:19:41 +01:00
int prefixLength, boolean existsAlmostCertainly) {
2021-05-02 19:18:15 +02:00
return Flux
.defer(() -> {
if (range.isSingle()) {
return getRangeSingle(snapshot, range.getMin().retain(), existsAlmostCertainly).map(List::of);
} else {
return getRangeMultiGrouped(snapshot, range.retain(), prefixLength);
}
})
.doFinally(s -> range.release());
2021-01-30 00:24:55 +01:00
}
private Flux<Entry<ByteBuf, ByteBuf>> getRangeSingle(LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
2021-05-02 19:18:15 +02:00
return Mono
.defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly))
.map(value -> Map.entry(key.retain(), value))
.flux()
.doFinally(s -> key.release());
}
private Flux<Entry<ByteBuf, ByteBuf>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
2021-05-02 19:18:15 +02:00
return Flux
.using(
() -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)),
LLLocalReactiveRocksIterator::flux,
LLLocalReactiveRocksIterator::release
)
.doOnDiscard(Entry.class, entry -> {
//noinspection unchecked
var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
castedEntry.getKey().release();
castedEntry.getValue().release();
})
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
}
private Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
2021-05-02 19:18:15 +02:00
return Flux
.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db,
alloc,
cfh,
prefixLength,
range.retain(),
resolveSnapshot(snapshot),
"getRangeMultiGrouped"
),
LLLocalGroupedReactiveRocksIterator::flux,
LLLocalGroupedReactiveRocksIterator::release
)
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
2021-01-30 00:24:55 +01:00
}
@Override
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
2021-05-02 19:18:15 +02:00
return Flux
.defer(() -> {
if (range.isSingle()) {
return this.getRangeKeysSingle(snapshot, range.getMin().retain());
} else {
return this.getRangeKeysMulti(snapshot, range.retain());
}
})
.doFinally(s -> range.release());
}
@Override
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
2021-05-02 19:18:15 +02:00
return Flux
.using(
2021-05-03 00:29:26 +02:00
() -> new LLLocalGroupedKeyReactiveRocksIterator(db,
alloc,
cfh,
prefixLength,
range.retain(),
resolveSnapshot(snapshot),
"getRangeKeysGrouped"
),
LLLocalGroupedReactiveRocksIterator::flux,
LLLocalGroupedReactiveRocksIterator::release
)
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
2021-03-14 13:24:46 +01:00
}
@Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
2021-05-02 19:18:15 +02:00
return Flux
.using(
() -> new LLLocalKeyPrefixReactiveRocksIterator(db,
alloc,
cfh,
prefixLength,
range.retain(),
resolveSnapshot(snapshot),
true,
"getRangeKeysGrouped"
),
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release
)
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
}
private Flux<ByteBuf> getRangeKeysSingle(LLSnapshot snapshot, ByteBuf key) {
2021-05-02 19:18:15 +02:00
return Mono
.defer(() -> this.containsKey(snapshot, key.retain()))
.flux()
2021-05-02 19:18:15 +02:00
.<ByteBuf>handle((contains, sink) -> {
if (contains) {
sink.next(key.retain());
} else {
sink.complete();
}
})
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.doFinally(s -> key.release());
2020-12-07 22:15:18 +01:00
}
private Flux<ByteBuf> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
2021-05-02 19:18:15 +02:00
return Flux
.using(
() -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)),
LLLocalReactiveRocksIterator::flux,
LLLocalReactiveRocksIterator::release
)
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
}
2020-12-07 22:15:18 +01:00
	/**
	 * Replaces the content of {@code range}: every key currently in the range is
	 * deleted, then {@code entries} are written in its place.
	 *
	 * <p>NOTE(review): delete and re-insert are separate operations, so concurrent
	 * readers can observe the range empty in between — confirm callers tolerate this.
	 *
	 * @param range   range to overwrite; released when the returned Mono terminates
	 *                (except on the unsupported-parameters path — see NOTE below)
	 * @param entries replacement entries; key/value buffers are released after use
	 * @return a Mono that completes once the whole range has been rewritten
	 */
	@Override
	public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) {
		if (USE_WINDOW_IN_SET_RANGE) {
			return Mono
					.<Void>fromCallable(() -> {
						// Phase 1: remove the existing content of the range.
						if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
							// Delete key-by-key through a bounded iterator, without a write batch
							var opts = new ReadOptions(EMPTY_READ_OPTIONS);
							ReleasableSlice minBound;
							if (range.hasMin()) {
								minBound = setIterateBound(opts, IterateBound.LOWER, range.getMin().retain());
							} else {
								minBound = emptyReleasableSlice();
							}
							try {
								ReleasableSlice maxBound;
								if (range.hasMax()) {
									maxBound = setIterateBound(opts, IterateBound.UPPER, range.getMax().retain());
								} else {
									maxBound = emptyReleasableSlice();
								}
								try (RocksIterator it = db.newIterator(cfh, opts)) {
									if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
										rocksIterSeekTo(it, range.getMin().retain());
									} else {
										it.seekToFirst();
									}
									while (it.isValid()) {
										db.delete(cfh, it.key());
										it.next();
									}
								} finally {
									maxBound.release();
								}
							} finally {
								minBound.release();
							}
						} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
							// Delete through a capped write batch (bounds native memory usage)
							try (var batch = new CappedWriteBatch(db,
									CAPPED_WRITE_BATCH_CAP,
									RESERVED_WRITE_BATCH_SIZE,
									MAX_WRITE_BATCH_SIZE,
									BATCH_WRITE_OPTIONS
							)) {
								if (range.isSingle()) {
									batch.delete(cfh, range.getSingle().retain());
								} else {
									deleteSmallRangeWriteBatch(batch, range.retain());
								}
								batch.writeToDbAndClose();
							}
						} else {
							// Delete through a plain RocksDB write batch
							try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
								if (range.isSingle()) {
									batch.delete(cfh, LLUtils.toArray(range.getSingle()));
								} else {
									deleteSmallRangeWriteBatch(batch, range.retain());
								}
								db.write(EMPTY_WRITE_OPTIONS, batch);
								batch.clear();
							}
						}
						return null;
					})
					.subscribeOn(dbScheduler)
					// Phase 2: write the new entries in windows of MULTI_GET_WINDOW elements
					.thenMany(entries
							.window(MULTI_GET_WINDOW)
					)
					.flatMap(keysWindowFlux -> keysWindowFlux
							.collectList()
							.doOnDiscard(Entry.class, discardedEntry -> {
								// Release buffers of entries dropped by cancellation
								//noinspection unchecked
								var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
								entry.getKey().release();
								entry.getValue().release();
							})
							.flatMap(entriesList -> Mono
									.<Void>fromCallable(() -> {
										try {
											if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
												// Direct puts, one per entry
												for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
													db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer());
												}
											} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
												try (var batch = new CappedWriteBatch(db,
														CAPPED_WRITE_BATCH_CAP,
														RESERVED_WRITE_BATCH_SIZE,
														MAX_WRITE_BATCH_SIZE,
														BATCH_WRITE_OPTIONS
												)) {
													for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
														// retain: presumably the capped batch takes ownership of the
														// buffers it receives — verify CappedWriteBatch's contract
														batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
													}
													batch.writeToDbAndClose();
												}
											} else {
												try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
													for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
														batch.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue()));
													}
													db.write(EMPTY_WRITE_OPTIONS, batch);
													batch.clear();
												}
											}
											return null;
										} finally {
											// This window owns one reference per buffer: release it
											for (Entry<ByteBuf, ByteBuf> entry : entriesList) {
												entry.getKey().release();
												entry.getValue().release();
											}
										}
									})
									.subscribeOn(dbScheduler)
							)
					)
					.then()
					.onErrorMap(cause -> new IOException("Failed to write range", cause))
					.doFinally(s -> range.release());
		} else {
			if (USE_WRITE_BATCHES_IN_SET_RANGE) {
				// NOTE(review): this early return has no doFinally, so `range` is never
				// released on this misconfiguration path — likely a buffer leak.
				return Mono.fromCallable(() -> {
					throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix params");
				});
			}
			// Non-windowed path: stream-delete the old values, then put the new ones
			return Flux
					.defer(() -> this.getRange(null, range.retain(), false))
					.flatMap(oldValue -> Mono
							.<Void>fromCallable(() -> {
								try {
									// retain: presumably dbDelete consumes (releases) the buffer
									// it is given — verify against dbDelete's implementation
									dbDelete(cfh, EMPTY_WRITE_OPTIONS, oldValue.getKey().retain());
									return null;
								} finally {
									oldValue.getKey().release();
									oldValue.getValue().release();
								}
							})
							.subscribeOn(dbScheduler)
					)
					.then(entries
							.flatMap(entry -> this.put(entry.getKey(), entry.getValue(), LLDictionaryResultType.VOID))
							.doOnNext(ReferenceCounted::release)
							.then(Mono.<Void>empty())
					)
					.onErrorMap(cause -> new IOException("Failed to write range", cause))
					.doFinally(s -> range.release());
		}
	}
2021-05-03 12:29:15 +02:00
	//todo: this is broken, check why
	// Queues into the capped batch a delete for every key currently in `range`.
	// NOTE(review): unlike the WriteBatch overload below, this variant hands
	// writeBatch.delete a ByteBuf obtained from LLUtils.readDirectNioBuffer; if
	// CappedWriteBatch#delete does not take ownership and release that buffer, it
	// leaks — possibly the cause of the todo above. Verify the ownership contract.
	private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range)
			throws RocksDBException {
		try {
			// One-shot scan: don't pollute the block cache
			var readOpts = getReadOptions(null);
			readOpts.setFillCache(false);
			ReleasableSlice minBound;
			if (range.hasMin()) {
				minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
			} else {
				minBound = emptyReleasableSlice();
			}
			try {
				ReleasableSlice maxBound;
				if (range.hasMax()) {
					maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
				} else {
					maxBound = emptyReleasableSlice();
				}
				try (var rocksIterator = db.newIterator(cfh, readOpts)) {
					if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
						rocksIterSeekTo(rocksIterator, range.getMin().retain());
					} else {
						rocksIterator.seekToFirst();
					}
					while (rocksIterator.isValid()) {
						writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
						rocksIterator.next();
					}
				} finally {
					maxBound.release();
				}
			} finally {
				minBound.release();
			}
		} finally {
			range.release();
		}
	}
2021-05-02 19:18:15 +02:00
private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, LLRange range)
throws RocksDBException {
2021-05-03 02:57:08 +02:00
try {
var readOpts = getReadOptions(null);
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
2021-05-03 12:44:22 +02:00
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
2021-05-02 19:18:15 +02:00
} else {
2021-05-03 02:57:08 +02:00
minBound = emptyReleasableSlice();
}
2021-05-03 02:57:08 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
2021-05-03 12:44:22 +02:00
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
2021-05-03 02:57:08 +02:00
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
2021-05-03 12:44:22 +02:00
rocksIterSeekTo(rocksIterator, range.getMin().retain());
2021-05-03 02:57:08 +02:00
} else {
rocksIterator.seekToFirst();
}
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
}
} finally {
maxBound.release();
}
} finally {
minBound.release();
}
2021-05-02 19:18:15 +02:00
} finally {
range.release();
}
}
private static void rocksIterSeekTo(RocksIterator rocksIterator, ByteBuf buffer) {
try {
2021-05-03 21:41:51 +02:00
if (buffer.isDirect()) {
ByteBuffer nioBuffer = LLUtils.toDirect(buffer);
assert nioBuffer.isDirect();
rocksIterator.seek(nioBuffer);
} else if (buffer.hasArray() && buffer.array().length == buffer.readableBytes()) {
rocksIterator.seek(buffer.array());
} else {
rocksIterator.seek(LLUtils.toArray(buffer));
}
} finally {
buffer.release();
}
}
private static ReleasableSlice setIterateBound(ReadOptions readOpts, IterateBound boundType, ByteBuf buffer) {
try {
2021-05-03 21:41:51 +02:00
Objects.requireNonNull(buffer);
AbstractSlice<?> slice;
2021-05-03 21:41:51 +02:00
if (LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS && buffer.isDirect()) {
ByteBuffer nioBuffer = LLUtils.toDirect(buffer);
2021-05-03 00:29:26 +02:00
assert nioBuffer.isDirect();
slice = new DirectSlice(nioBuffer, buffer.readableBytes());
assert slice.size() == buffer.readableBytes();
assert slice.compare(new Slice(LLUtils.toArray(buffer))) == 0;
2021-05-03 21:41:51 +02:00
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSlice(slice, buffer.retain(), nioBuffer);
2021-05-03 00:29:26 +02:00
} else {
2021-05-03 21:41:51 +02:00
slice = new Slice(Objects.requireNonNull(LLUtils.toArray(buffer)));
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSlice(slice, null, null);
}
} finally {
buffer.release();
}
}
2021-05-02 19:18:15 +02:00
private static ReleasableSlice emptyReleasableSlice() {
var arr = new byte[0];
return new ReleasableSlice(new Slice(arr), null, arr) {
@Override
public void release() {
}
};
}
	/**
	 * Pairs a RocksDB {@link AbstractSlice} with the reference-counted buffer
	 * (and any auxiliary object) that backs its native memory, so all of them can
	 * be released together once the slice is no longer used as an iteration
	 * bound. Lombok generates the constructor and accessors.
	 */
	@Data
	@AllArgsConstructor
	public static class ReleasableSlice {
		// The native slice installed into ReadOptions as a bound
		AbstractSlice<?> slice;
		// Retained buffer backing the slice's memory; null when the slice owns a heap copy
		@Nullable ByteBuf byteBuf;
		// Extra object kept reachable while the slice is alive (e.g. a direct ByteBuffer)
		private @Nullable Object additionalData;

		public void release() {
			// Invalidate the native slice before dropping the backing buffer
			slice.clear();
			if (byteBuf != null) {
				byteBuf.release();
			}
		}
	}
2021-01-30 00:24:55 +01:00
	/**
	 * Deletes every key in this column family, then triggers a manual compaction
	 * over the deleted span and flushes memtables and WAL.
	 *
	 * @return a Mono that completes when the column family is empty and flushed
	 */
	public Mono<Void> clear() {
		return Mono
				.<Void>fromCallable(() -> {
					var readOpts = getReadOptions(null);
					readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
					// readOpts.setIgnoreRangeDeletions(true);
					readOpts.setFillCache(false);
					//readOpts.setReadaheadSize(2 * 1024 * 1024);
					try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
							CAPPED_WRITE_BATCH_CAP,
							RESERVED_WRITE_BATCH_SIZE,
							MAX_WRITE_BATCH_SIZE,
							BATCH_WRITE_OPTIONS
					)) {
						byte[] firstDeletedKey = null;
						byte[] lastDeletedKey = null;
						try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
							// Only the last key is needed: deleteRange below covers [FIRST_KEY, lastKey)
							iter.seekToLast();
							if (iter.isValid()) {
								firstDeletedKey = FIRST_KEY;
								lastDeletedKey = iter.key();
								// deleteRange's upper bound is exclusive, so the last key
								// is deleted explicitly right after
								writeBatch.deleteRange(cfh, FIRST_KEY, iter.key());
								writeBatch.delete(cfh, iter.key());
							}
						}
						writeBatch.writeToDbAndClose();
						// Compact range
						db.suggestCompactRange(cfh);
						if (firstDeletedKey != null && lastDeletedKey != null) {
							db.compactRange(cfh,
									firstDeletedKey,
									lastDeletedKey,
									new CompactRangeOptions()
											.setAllowWriteStall(false)
											.setExclusiveManualCompaction(false)
											.setChangeLevel(false)
							);
						}
						db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
						db.flushWal(true);
					}
					return null;
				})
				.onErrorMap(cause -> new IOException("Failed to clear", cause))
				.subscribeOn(dbScheduler);
	}
	/**
	 * Counts the entries in {@code range}.
	 *
	 * @param snapshot optional snapshot to read from
	 * @param range    range to measure; released when the returned Mono terminates
	 * @param fast     if true, allow an approximate answer (estimated property for
	 *                 full-keyspace counts; ignore range deletions for partial ranges)
	 * @return the number of entries found
	 */
	@Override
	public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) {
		Mono<Long> result;
		if (range.isAll()) {
			// Whole keyspace: delegate to the specialized full-count helpers
			result = Mono
					.fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
					.onErrorMap(IOException::new)
					.subscribeOn(dbScheduler);
		} else {
			result = Mono
					.fromCallable(() -> {
						// Partial range: iterate within [min, max) bounds and count
						var readOpts = resolveSnapshot(snapshot);
						readOpts.setFillCache(false);
						readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
						ReleasableSlice minBound;
						if (range.hasMin()) {
							minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
						} else {
							minBound = emptyReleasableSlice();
						}
						try {
							ReleasableSlice maxBound;
							if (range.hasMax()) {
								maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
							} else {
								maxBound = emptyReleasableSlice();
							}
							try {
								if (fast) {
									// Fast mode may over-count entries hidden by range tombstones
									readOpts.setIgnoreRangeDeletions(true);
								}
								try (var rocksIterator = db.newIterator(cfh, readOpts)) {
									if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
										rocksIterSeekTo(rocksIterator, range.getMin().retain());
									} else {
										rocksIterator.seekToFirst();
									}
									long i = 0;
									while (rocksIterator.isValid()) {
										rocksIterator.next();
										i++;
									}
									return i;
								}
							} finally {
								maxBound.release();
							}
						} finally {
							minBound.release();
						}
					})
					.onErrorMap(cause -> new IOException("Failed to get size of range "
							+ range.toString(), cause))
					.subscribeOn(dbScheduler);
		}
		return result.doFinally(s -> range.release());
	}
	/**
	 * Returns the first entry of {@code range}, or empty if the range contains
	 * none. Key and value buffers are copied out of the iterator and handed to
	 * the subscriber retained; the range is released on termination.
	 */
	@Override
	public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, LLRange range) {
		return Mono
				.fromCallable(() -> {
					var readOpts = resolveSnapshot(snapshot);
					ReleasableSlice minBound;
					if (range.hasMin()) {
						minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
					} else {
						minBound = emptyReleasableSlice();
					}
					try {
						ReleasableSlice maxBound;
						if (range.hasMax()) {
							maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
						} else {
							maxBound = emptyReleasableSlice();
						}
						try (var rocksIterator = db.newIterator(cfh, readOpts)) {
							if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
								rocksIterSeekTo(rocksIterator, range.getMin().retain());
							} else {
								rocksIterator.seekToFirst();
							}
							if (rocksIterator.isValid()) {
								// Copy key/value out of the iterator's native memory; the
								// entry holds its own retained references
								ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
								try {
									ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
									try {
										return Map.entry(key.retain(), value.retain());
									} finally {
										value.release();
									}
								} finally {
									key.release();
								}
							} else {
								return null;
							}
						} finally {
							maxBound.release();
						}
					} finally {
						minBound.release();
					}
				})
				.subscribeOn(dbScheduler)
				.doFinally(s -> range.release());
	}
	/**
	 * Returns the first key of {@code range}, or empty if the range contains
	 * none. The returned buffer is owned by the subscriber; the range is
	 * released on termination.
	 */
	@Override
	public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) {
		return Mono
				.fromCallable(() -> {
					var readOpts = resolveSnapshot(snapshot);
					ReleasableSlice minBound;
					if (range.hasMin()) {
						minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
					} else {
						minBound = emptyReleasableSlice();
					}
					try {
						ReleasableSlice maxBound;
						if (range.hasMax()) {
							maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
						} else {
							maxBound = emptyReleasableSlice();
						}
						try (var rocksIterator = db.newIterator(cfh, readOpts)) {
							if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
								rocksIterSeekTo(rocksIterator, range.getMin().retain());
							} else {
								rocksIterator.seekToFirst();
							}
							ByteBuf key;
							if (rocksIterator.isValid()) {
								// Copy the key out of the iterator's native memory
								key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
								return key;
							} else {
								return null;
							}
						} finally {
							maxBound.release();
						}
					} finally {
						minBound.release();
					}
				})
				.subscribeOn(dbScheduler)
				.doFinally(s -> range.release());
	}
2021-01-30 00:24:55 +01:00
	/**
	 * Best-effort key count for the whole column family.
	 * Without a snapshot (or when configured to) it returns RocksDB's
	 * "estimate-num-keys" property; with a snapshot it either delegates to the
	 * exact parallel count or performs a sequential scan capped at 100'000 keys.
	 *
	 * @param snapshot optional snapshot; selects the counting strategy
	 * @return an approximate (or capped) number of keys; 0 if the property read fails
	 */
	private long fastSizeAll(@Nullable LLSnapshot snapshot) {
		var rocksdbSnapshot = resolveSnapshot(snapshot);
		if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
			try {
				return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
			} catch (RocksDBException e) {
				// NOTE(review): errors are swallowed and reported as 0, so callers cannot
				// distinguish "empty" from "failed"; consider a proper logger instead of
				// printStackTrace
				e.printStackTrace();
				return 0;
			}
		} else if (PARALLEL_EXACT_SIZE) {
			return exactSizeAll(snapshot);
		} else {
			rocksdbSnapshot.setFillCache(false);
			rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
			// Tombstoned ranges may be counted: acceptable for an estimate
			rocksdbSnapshot.setIgnoreRangeDeletions(true);
			long count = 0;
			try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
				iter.seekToFirst();
				// If it's a fast size of a snapshot, count only up to 100'000 elements
				while (iter.isValid() && count < 100_000) {
					count++;
					iter.next();
				}
				return count;
			}
		}
	}
2021-01-30 00:24:55 +01:00
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
2021-03-13 19:01:36 +01:00
var readOpts = resolveSnapshot(snapshot);
readOpts.setFillCache(false);
2021-03-22 20:02:19 +01:00
//readOpts.setReadaheadSize(2 * 1024 * 1024);
2021-03-14 19:38:20 +01:00
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
2021-03-14 03:13:19 +01:00
2021-03-18 19:53:32 +01:00
if (PARALLEL_EXACT_SIZE) {
var commonPool = ForkJoinPool.commonPool();
var futures = IntStream
.range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length)
.mapToObj(idx -> Pair.of(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx],
idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null
2021-03-18 19:53:32 +01:00
: LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1]
))
.map(range -> (Callable<Long>) () -> {
long partialCount = 0;
var rangeReadOpts = new ReadOptions(readOpts);
Slice sliceBegin;
if (range.getKey() != null) {
sliceBegin = new Slice(range.getKey());
} else {
sliceBegin = null;
}
Slice sliceEnd;
if (range.getValue() != null) {
sliceEnd = new Slice(range.getValue());
} else {
sliceEnd = null;
}
try {
if (sliceBegin != null) {
rangeReadOpts.setIterateLowerBound(sliceBegin);
}
if (sliceBegin != null) {
rangeReadOpts.setIterateUpperBound(sliceEnd);
}
try (RocksIterator iter = db.newIterator(cfh, rangeReadOpts)) {
iter.seekToFirst();
while (iter.isValid()) {
partialCount++;
iter.next();
}
return partialCount;
}
} finally {
if (sliceBegin != null) {
sliceBegin.close();
}
if (sliceEnd != null) {
sliceEnd.close();
}
}
})
.map(commonPool::submit)
.collect(Collectors.toList());
long count = 0;
for (ForkJoinTask<Long> future : futures) {
count += future.join();
2020-12-07 22:15:18 +01:00
}
return count;
2021-03-18 19:53:32 +01:00
} else {
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
while (iter.isValid()) {
count++;
iter.next();
}
return count;
}
2020-12-07 22:15:18 +01:00
}
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range) {
2021-01-30 00:24:55 +01:00
return Mono
.fromCallable(() -> {
2021-03-13 19:01:36 +01:00
var readOpts = getReadOptions(null);
ReleasableSlice minBound;
2021-03-13 19:01:36 +01:00
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
} else {
2021-05-02 19:18:15 +02:00
minBound = emptyReleasableSlice();
2021-03-13 19:01:36 +01:00
}
2021-05-03 02:57:08 +02:00
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
maxBound = setIterateBound(readOpts, IterateBound.UPPER, range.getMax().retain());
2021-03-14 03:13:19 +01:00
} else {
2021-05-03 02:57:08 +02:00
maxBound = emptyReleasableSlice();
2021-03-14 03:13:19 +01:00
}
2021-05-03 02:57:08 +02:00
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterSeekTo(rocksIterator, range.getMin().retain());
} else {
rocksIterator.seekToFirst();
}
if (!rocksIterator.isValid()) {
return null;
}
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
dbDelete(cfh, null, key);
return Map.entry(key, value);
} finally {
maxBound.release();
2021-01-30 00:24:55 +01:00
}
} finally {
minBound.release();
2021-01-30 00:24:55 +01:00
}
})
2021-03-04 22:01:50 +01:00
.onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause))
.subscribeOn(dbScheduler)
.doFinally(s -> range.release());
2020-12-07 22:15:18 +01:00
}
2021-04-03 19:09:06 +02:00
	/**
	 * Creates a RocksDB iterator bounded to {@code range} and pre-positioned on
	 * its first key. The caller must close the returned iterator and release both
	 * returned bound slices; the range itself is always released before returning.
	 *
	 * @return a tuple of (iterator, lower-bound slice, upper-bound slice)
	 */
	@NotNull
	public static Tuple3<RocksIterator, ReleasableSlice, ReleasableSlice> getRocksIterator(ReadOptions readOptions,
			LLRange range,
			RocksDB db,
			ColumnFamilyHandle cfh) {
		try {
			ReleasableSlice sliceMin;
			ReleasableSlice sliceMax;
			if (range.hasMin()) {
				sliceMin = setIterateBound(readOptions, IterateBound.LOWER, range.getMin().retain());
			} else {
				sliceMin = emptyReleasableSlice();
			}
			if (range.hasMax()) {
				sliceMax = setIterateBound(readOptions, IterateBound.UPPER, range.getMax().retain());
			} else {
				sliceMax = emptyReleasableSlice();
			}
			var rocksIterator = db.newIterator(cfh, readOptions);
			if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
				rocksIterSeekTo(rocksIterator, range.getMin().retain());
			} else {
				rocksIterator.seekToFirst();
			}
			return Tuples.of(rocksIterator, sliceMin, sliceMax);
		} finally {
			range.release();
		}
	}
2020-12-07 22:15:18 +01:00
}