Remove unneeded parameter
This commit is contained in:
parent
ed37a769e2
commit
77af845a8a
@ -32,27 +32,14 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
|
||||
|
||||
default Mono<Send<Buffer>> update(Mono<Send<Buffer>> key,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly) {
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return this
|
||||
.updateAndGetDelta(key, updater, existsAlmostCertainly)
|
||||
.updateAndGetDelta(key, updater)
|
||||
.transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode));
|
||||
}
|
||||
|
||||
default Mono<Send<Buffer>> update(Mono<Send<Buffer>> key,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateReturnMode returnMode) {
|
||||
return update(key, updater, returnMode, false);
|
||||
}
|
||||
|
||||
Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
boolean existsAlmostCertainly);
|
||||
|
||||
default Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> key,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
|
||||
return updateAndGetDelta(key, updater, false);
|
||||
}
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater);
|
||||
|
||||
Mono<Void> clear();
|
||||
|
||||
|
@ -228,21 +228,19 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
@Override
|
||||
public Mono<U> updateValue(T keySuffix,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly,
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
|
||||
return dictionary
|
||||
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
|
||||
.update(keyMono, getSerializedUpdater(updater), updateReturnMode)
|
||||
.handle(this::deserializeValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
|
||||
boolean existsAlmostCertainly,
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
|
||||
return dictionary
|
||||
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
|
||||
.updateAndGetDelta(keyMono, getSerializedUpdater(updater))
|
||||
.transform(mono -> LLUtils.mapLLDelta(mono, serializedToReceive -> {
|
||||
try (var serialized = serializedToReceive.receive()) {
|
||||
return valueSerializer.deserialize(serialized);
|
||||
|
@ -117,8 +117,7 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
|
||||
|
||||
@Override
|
||||
public Mono<U> update(SerializationFunction<@Nullable U, @Nullable U> updater,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly) {
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return dictionary
|
||||
.update(keyMono, (oldValueSer) -> {
|
||||
try (oldValueSer) {
|
||||
@ -138,13 +137,12 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
|
||||
return serializeValue(result).receive();
|
||||
}
|
||||
}
|
||||
}, updateReturnMode, existsAlmostCertainly)
|
||||
}, updateReturnMode)
|
||||
.handle(this::deserializeValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Delta<U>> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater,
|
||||
boolean existsAlmostCertainly) {
|
||||
public Mono<Delta<U>> updateAndGetDelta(SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return dictionary
|
||||
.updateAndGetDelta(keyMono, (oldValueSer) -> {
|
||||
try (oldValueSer) {
|
||||
@ -164,7 +162,7 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
|
||||
return serializeValue(result).receive();
|
||||
}
|
||||
}
|
||||
}, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
|
||||
}).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
|
||||
try (var valueBuf = serialized.receive()) {
|
||||
return serializer.deserialize(valueBuf);
|
||||
}
|
||||
|
@ -107,9 +107,7 @@ public class DatabaseSingleBucket<K, V, TH>
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<V> update(SerializationFunction<@Nullable V, @Nullable V> updater,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly) {
|
||||
public Mono<V> update(SerializationFunction<@Nullable V, @Nullable V> updater, UpdateReturnMode updateReturnMode) {
|
||||
return bucketStage
|
||||
.update(oldBucket -> {
|
||||
V oldValue = extractValue(oldBucket);
|
||||
@ -120,13 +118,12 @@ public class DatabaseSingleBucket<K, V, TH>
|
||||
} else {
|
||||
return this.insertValueOrCreate(oldBucket, newValue);
|
||||
}
|
||||
}, updateReturnMode, existsAlmostCertainly)
|
||||
}, updateReturnMode)
|
||||
.flatMap(this::extractValueTransformation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Delta<V>> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater,
|
||||
boolean existsAlmostCertainly) {
|
||||
public Mono<Delta<V>> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater) {
|
||||
return bucketStage
|
||||
.updateAndGetDelta(oldBucket -> {
|
||||
V oldValue = extractValue(oldBucket);
|
||||
@ -136,7 +133,7 @@ public class DatabaseSingleBucket<K, V, TH>
|
||||
} else {
|
||||
return this.insertValueOrCreate(oldBucket, result);
|
||||
}
|
||||
}, existsAlmostCertainly)
|
||||
})
|
||||
.transform(mono -> LLUtils.mapDelta(mono, this::extractValue));
|
||||
}
|
||||
|
||||
|
@ -113,8 +113,7 @@ public class DatabaseSingleMapped<A, B> extends ResourceSupport<DatabaseStage<A>
|
||||
|
||||
@Override
|
||||
public Mono<A> update(SerializationFunction<@Nullable A, @Nullable A> updater,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly) {
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return serializedSingle.update(oldValue -> {
|
||||
var result = updater.apply(oldValue == null ? null : this.unMap(oldValue));
|
||||
if (result == null) {
|
||||
@ -122,12 +121,11 @@ public class DatabaseSingleMapped<A, B> extends ResourceSupport<DatabaseStage<A>
|
||||
} else {
|
||||
return this.map(result);
|
||||
}
|
||||
}, updateReturnMode, existsAlmostCertainly).handle(this::deserializeSink);
|
||||
}, updateReturnMode).handle(this::deserializeSink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Delta<A>> updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater,
|
||||
boolean existsAlmostCertainly) {
|
||||
public Mono<Delta<A>> updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater) {
|
||||
return serializedSingle.updateAndGetDelta(oldValue -> {
|
||||
var result = updater.apply(oldValue == null ? null : this.unMap(oldValue));
|
||||
if (result == null) {
|
||||
@ -135,7 +133,7 @@ public class DatabaseSingleMapped<A, B> extends ResourceSupport<DatabaseStage<A>
|
||||
} else {
|
||||
return this.map(result);
|
||||
}
|
||||
}, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::unMap));
|
||||
}).transform(mono -> LLUtils.mapDelta(mono, this::unMap));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -47,23 +47,13 @@ public interface DatabaseStage<T> extends DatabaseStageWithEntry<T>, Resource<Da
|
||||
}
|
||||
|
||||
default Mono<T> update(SerializationFunction<@Nullable T, @Nullable T> updater,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly) {
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return this
|
||||
.updateAndGetDelta(updater, existsAlmostCertainly)
|
||||
.updateAndGetDelta(updater)
|
||||
.transform(prev -> LLUtils.resolveDelta(prev, updateReturnMode));
|
||||
}
|
||||
|
||||
default Mono<T> update(SerializationFunction<@Nullable T, @Nullable T> updater, UpdateReturnMode updateReturnMode) {
|
||||
return update(updater, updateReturnMode, false);
|
||||
}
|
||||
|
||||
Mono<Delta<T>> updateAndGetDelta(SerializationFunction<@Nullable T, @Nullable T> updater,
|
||||
boolean existsAlmostCertainly);
|
||||
|
||||
default Mono<Delta<T>> updateAndGetDelta(SerializationFunction<@Nullable T, @Nullable T> updater) {
|
||||
return updateAndGetDelta(updater, false);
|
||||
}
|
||||
Mono<Delta<T>> updateAndGetDelta(SerializationFunction<@Nullable T, @Nullable T> updater);
|
||||
|
||||
default Mono<Void> clear() {
|
||||
return clearAndGetStatus().then();
|
||||
|
@ -55,41 +55,27 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
|
||||
default Mono<U> updateValue(T key,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly,
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return LLUtils.usingResource(this.at(null, key).single(),
|
||||
stage -> stage.update(updater, updateReturnMode, existsAlmostCertainly), true);
|
||||
stage -> stage.update(updater, updateReturnMode), true);
|
||||
}
|
||||
|
||||
default Flux<Boolean> updateMulti(Flux<T> keys, KVSerializationFunction<T, @Nullable U, @Nullable U> updater) {
|
||||
return keys.flatMapSequential(key -> this.updateValue(key, prevValue -> updater.apply(key, prevValue)));
|
||||
}
|
||||
|
||||
default Mono<U> updateValue(T key, UpdateReturnMode updateReturnMode, SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return updateValue(key, updateReturnMode, false, updater);
|
||||
}
|
||||
|
||||
default Mono<Boolean> updateValue(T key, SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return updateValueAndGetDelta(key, false, updater).map(LLUtils::isDeltaChanged).single();
|
||||
}
|
||||
|
||||
default Mono<Boolean> updateValue(T key, boolean existsAlmostCertainly, SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return updateValueAndGetDelta(key, existsAlmostCertainly, updater).map(LLUtils::isDeltaChanged).single();
|
||||
return updateValueAndGetDelta(key, updater).map(LLUtils::isDeltaChanged).single();
|
||||
}
|
||||
|
||||
default Mono<Delta<U>> updateValueAndGetDelta(T key,
|
||||
boolean existsAlmostCertainly,
|
||||
SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
var stageMono = this.at(null, key).single();
|
||||
return stageMono.flatMap(stage -> stage
|
||||
.updateAndGetDelta(updater, existsAlmostCertainly)
|
||||
.updateAndGetDelta(updater)
|
||||
.doFinally(s -> stage.close()));
|
||||
}
|
||||
|
||||
default Mono<Delta<U>> updateValueAndGetDelta(T key, SerializationFunction<@Nullable U, @Nullable U> updater) {
|
||||
return updateValueAndGetDelta(key, false, updater);
|
||||
}
|
||||
|
||||
default Mono<U> putValueAndGetPrevious(T key, U value) {
|
||||
return LLUtils.usingResource(at(null, key).single(), stage -> stage.setAndGetPrevious(value), true);
|
||||
}
|
||||
@ -203,8 +189,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
default Mono<Delta<Object2ObjectSortedMap<T, U>>> updateAndGetDelta(SerializationFunction<@Nullable Object2ObjectSortedMap<T, U>, @Nullable Object2ObjectSortedMap<T, U>> updater,
|
||||
boolean existsAlmostCertainly) {
|
||||
default Mono<Delta<Object2ObjectSortedMap<T, U>>> updateAndGetDelta(SerializationFunction<@Nullable Object2ObjectSortedMap<T, U>, @Nullable Object2ObjectSortedMap<T, U>> updater) {
|
||||
return this
|
||||
.getUpdateMode()
|
||||
.single()
|
||||
|
@ -83,7 +83,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key, boolean existsAlmostCertainly)
|
||||
public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key)
|
||||
throws RocksDBException {
|
||||
if (Schedulers.isInNonBlockingThread()) {
|
||||
throw new UnsupportedOperationException("Called dbGet in a nonblocking thread");
|
||||
@ -177,9 +177,9 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
try {
|
||||
byte[] keyArray = LLUtils.toArray(key);
|
||||
requireNonNull(keyArray);
|
||||
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
|
||||
if (existsAlmostCertainly || db.keyMayExist(cfh, readOptions, keyArray, data)) {
|
||||
if (!existsAlmostCertainly && data.getValue() != null) {
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, readOptions, keyArray, data)) {
|
||||
if (data.getValue() != null) {
|
||||
return LLUtils.fromByteArray(alloc, data.getValue());
|
||||
} else {
|
||||
byte[] result = db.get(cfh, readOptions, keyArray);
|
||||
|
@ -261,7 +261,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
Buffer result;
|
||||
startedGet.increment();
|
||||
try {
|
||||
result = getTime.recordCallable(() -> db.get(readOptions, key, existsAlmostCertainly));
|
||||
result = getTime.recordCallable(() -> db.get(readOptions, key));
|
||||
} finally {
|
||||
endedGet.increment();
|
||||
}
|
||||
@ -431,8 +431,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
@Override
|
||||
public Mono<Send<Buffer>> update(Mono<Send<Buffer>> keyMono,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateReturnMode updateReturnMode,
|
||||
boolean existsAlmostCertainly) {
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return keyMono
|
||||
.publishOn(dbScheduler)
|
||||
.handle((keySend, sink) -> {
|
||||
@ -451,7 +450,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
startedUpdates.increment();
|
||||
try {
|
||||
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
|
||||
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, returnMode));
|
||||
EMPTY_WRITE_OPTIONS, keySend, updater, returnMode));
|
||||
} finally {
|
||||
endedUpdates.increment();
|
||||
}
|
||||
@ -475,8 +474,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
@Override
|
||||
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
boolean existsAlmostCertainly) {
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
|
||||
return keyMono
|
||||
.publishOn(dbScheduler)
|
||||
.handle((keySend, sink) -> {
|
||||
@ -496,7 +494,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
startedUpdates.increment();
|
||||
try {
|
||||
result = updateTime.recordCallable(() -> db.updateAtomic(EMPTY_READ_OPTIONS,
|
||||
EMPTY_WRITE_OPTIONS, keySend, updater, existsAlmostCertainly, UpdateAtomicResultMode.DELTA));
|
||||
EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA));
|
||||
} finally {
|
||||
endedUpdates.increment();
|
||||
}
|
||||
@ -556,7 +554,7 @@ public class LLLocalDictionary implements LLDictionary {
|
||||
.handle((keySend, sink) -> {
|
||||
try (var key = keySend.receive()) {
|
||||
assert !Schedulers.isInNonBlockingThread() : "Called getPreviousData in a nonblocking thread";
|
||||
var result = db.get(EMPTY_READ_OPTIONS, key, existsAlmostCertainly);
|
||||
var result = db.get(EMPTY_READ_OPTIONS, key);
|
||||
logger.trace(MARKER_ROCKSDB, "Reading {}: {}", () -> toStringSafe(key), () -> toStringSafe(result));
|
||||
if (result == null) {
|
||||
sink.complete();
|
||||
|
@ -119,8 +119,8 @@ public class LLLocalSingleton implements LLSingleton {
|
||||
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
|
||||
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
|
||||
};
|
||||
UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater,
|
||||
true, returnMode);
|
||||
UpdateAtomicResult result
|
||||
= db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, returnMode);
|
||||
return switch (updateReturnMode) {
|
||||
case NOTHING -> null;
|
||||
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
|
||||
|
@ -68,7 +68,6 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
boolean existsAlmostCertainly,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
try {
|
||||
|
@ -50,7 +50,6 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
boolean existsAlmostCertainly,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
try {
|
||||
|
@ -5,13 +5,9 @@ import io.net5.buffer.api.Buffer;
|
||||
import io.net5.buffer.api.BufferAllocator;
|
||||
import io.net5.buffer.api.Send;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.RepeatedElementList;
|
||||
import it.cavallium.dbengine.database.UpdateReturnMode;
|
||||
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
@ -20,8 +16,6 @@ import org.rocksdb.FlushOptions;
|
||||
import org.rocksdb.ReadOptions;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.RocksIterator;
|
||||
import org.rocksdb.Transaction;
|
||||
import org.rocksdb.TransactionOptions;
|
||||
import org.rocksdb.WriteBatch;
|
||||
import org.rocksdb.WriteOptions;
|
||||
|
||||
@ -34,7 +28,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
|
||||
var allocator = getAllocator();
|
||||
try (var keyBuf = allocator.allocate(key.length)) {
|
||||
keyBuf.writeBytes(key);
|
||||
var result = this.get(readOptions, keyBuf, existsAlmostCertainly);
|
||||
var result = this.get(readOptions, keyBuf);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
@ -43,8 +37,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
Buffer get(@NotNull ReadOptions readOptions, Buffer key,
|
||||
boolean existsAlmostCertainly) throws RocksDBException;
|
||||
Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException;
|
||||
|
||||
boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException;
|
||||
|
||||
@ -68,7 +61,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
|
||||
|
||||
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
boolean existsAlmostCertainly, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException;
|
||||
UpdateAtomicResultMode returnMode) throws RocksDBException, IOException;
|
||||
|
||||
void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException;
|
||||
|
||||
|
@ -14,7 +14,6 @@ import java.io.IOException;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.Holder;
|
||||
import org.rocksdb.ReadOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
@ -45,43 +44,23 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
@NotNull WriteOptions writeOptions,
|
||||
Send<Buffer> keySend,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
boolean existsAlmostCertainly,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
try {
|
||||
var cfh = getCfh();
|
||||
var db = getDb();
|
||||
var alloc = getAllocator();
|
||||
@Nullable Buffer prevData;
|
||||
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
|
||||
if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
|
||||
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
|
||||
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
|
||||
if (prevDataBytes != null) {
|
||||
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
|
||||
} else {
|
||||
prevData = null;
|
||||
}
|
||||
} else {
|
||||
prevData = this.get(readOptions, key, existsAlmostCertainly);
|
||||
@Nullable Buffer prevData = this.get(readOptions, key);
|
||||
try (prevData) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Reading {}: {} (before update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
prevData = null;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Reading {}: {} (before update)", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData));
|
||||
}
|
||||
try {
|
||||
@Nullable Buffer newData;
|
||||
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
try (var newDataToReceive = updater.apply(sentData)) {
|
||||
if (newDataToReceive != null) {
|
||||
newData = newDataToReceive;
|
||||
} else {
|
||||
newData = null;
|
||||
}
|
||||
newData = newDataToReceive;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -90,8 +69,11 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData), LLUtils.toStringSafe(newData));
|
||||
"Updating {}. previous data: {}, updated data: {}",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData),
|
||||
LLUtils.toStringSafe(newData)
|
||||
);
|
||||
}
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
@ -101,8 +83,11 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
changed = true;
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Writing {}: {} (after update)", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData));
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Writing {}: {} (after update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData)
|
||||
);
|
||||
}
|
||||
Buffer dataToPut;
|
||||
if (returnMode == UpdateAtomicResultMode.CURRENT) {
|
||||
@ -153,10 +138,6 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
||||
newData.close();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (prevData != null) {
|
||||
prevData.close();
|
||||
}
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
|
||||
|
@ -197,8 +197,7 @@ public class LLMemoryDictionary implements LLDictionary {
|
||||
|
||||
@Override
|
||||
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
boolean existsAlmostCertainly) {
|
||||
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater) {
|
||||
return Mono.usingWhen(keyMono,
|
||||
key -> Mono.fromCallable(() -> {
|
||||
try (key) {
|
||||
|
@ -73,6 +73,6 @@ public class LLMemorySingleton implements LLSingleton {
|
||||
@Override
|
||||
public Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
||||
UpdateReturnMode updateReturnMode) {
|
||||
return dict.update(singletonNameBufMono, updater, updateReturnMode, true);
|
||||
return dict.update(singletonNameBufMono, updater, updateReturnMode);
|
||||
}
|
||||
}
|
||||
|
@ -229,19 +229,19 @@ public abstract class TestDictionaryMap {
|
||||
Assertions.assertNull(old);
|
||||
return "error?";
|
||||
}),
|
||||
map.updateValue(key, false, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
Assertions.assertEquals("error?", old);
|
||||
return "error?";
|
||||
}),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
Assertions.assertEquals("error?", old);
|
||||
return "error?";
|
||||
}),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
Assertions.assertEquals("error?", old);
|
||||
return value;
|
||||
}),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
Assertions.assertEquals(value, old);
|
||||
return value;
|
||||
})
|
||||
@ -278,7 +278,7 @@ public abstract class TestDictionaryMap {
|
||||
|
||||
Mono
|
||||
.fromRunnable(() -> log.debug("2. Updating value: {}", key))
|
||||
.then(map.updateValue(key, false, old -> {
|
||||
.then(map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, "error?");
|
||||
return "error?";
|
||||
}))
|
||||
@ -287,7 +287,7 @@ public abstract class TestDictionaryMap {
|
||||
|
||||
Mono
|
||||
.fromRunnable(() -> log.debug("3. Updating value: {}", key))
|
||||
.then(map.updateValue(key, true, old -> {
|
||||
.then(map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, "error?");
|
||||
return "error?";
|
||||
}))
|
||||
@ -296,7 +296,7 @@ public abstract class TestDictionaryMap {
|
||||
|
||||
Mono
|
||||
.fromRunnable(() -> log.debug("4. Updating value: {}", key))
|
||||
.then(map.updateValue(key, true, old -> {
|
||||
.then(map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, "error?");
|
||||
return value;
|
||||
}))
|
||||
@ -305,7 +305,7 @@ public abstract class TestDictionaryMap {
|
||||
|
||||
Mono
|
||||
.fromRunnable(() -> log.debug("5. Updating value: {}", key))
|
||||
.then(map.updateValue(key, true, old -> {
|
||||
.then(map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, value);
|
||||
return value;
|
||||
}))
|
||||
|
@ -563,19 +563,19 @@ public abstract class TestDictionaryMapDeep {
|
||||
assert old == null;
|
||||
return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."));
|
||||
}),
|
||||
map.updateValue(key, false, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, Map.of("error?", "error."));
|
||||
return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."));
|
||||
}),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, Map.of("error?", "error."));
|
||||
return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."));
|
||||
}),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, Map.of("error?", "error."));
|
||||
return value;
|
||||
}),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, value);
|
||||
return value;
|
||||
})
|
||||
@ -652,19 +652,19 @@ public abstract class TestDictionaryMapDeep {
|
||||
assert old == null;
|
||||
return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."));
|
||||
}).then(map.getValue(null, key)),
|
||||
map.updateValue(key, false, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, Map.of("error?", "error."));
|
||||
return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."));
|
||||
}).then(map.getValue(null, key)),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, Map.of("error?", "error."));
|
||||
return new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error."));
|
||||
}).then(map.getValue(null, key)),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, new Object2ObjectLinkedOpenHashMap<>(Map.of("error?", "error.")));
|
||||
return value;
|
||||
}).then(map.getValue(null, key)),
|
||||
map.updateValue(key, true, old -> {
|
||||
map.updateValue(key, old -> {
|
||||
assert Objects.equals(old, value);
|
||||
return value;
|
||||
}).then(map.getValue(null, key))
|
||||
|
@ -210,12 +210,12 @@ public abstract class TestLLDictionary {
|
||||
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
|
||||
long afterSize;
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, true).doOnNext(Send::close).then()
|
||||
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then()
|
||||
);
|
||||
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
|
||||
assertEquals(0, afterSize - beforeSize);
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode, false).doOnNext(Send::close).then()
|
||||
dict.update(keyEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then()
|
||||
);
|
||||
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
|
||||
assertEquals(0, afterSize - beforeSize);
|
||||
@ -235,12 +235,12 @@ public abstract class TestLLDictionary {
|
||||
var beforeSize = run(dict.sizeRange(null, RANGE_ALL, false));
|
||||
long afterSize;
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, true).doOnNext(Send::close).then()
|
||||
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then()
|
||||
);
|
||||
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
|
||||
assertEquals(expected, afterSize - beforeSize);
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode, false).doOnNext(Send::close).then()
|
||||
dict.update(keyNonEx, old -> fromString("test-value"), updateReturnMode).doOnNext(Send::close).then()
|
||||
);
|
||||
afterSize = run(dict.sizeRange(null, RANGE_ALL, false));
|
||||
assertEquals(expected, afterSize - beforeSize);
|
||||
|
@ -160,10 +160,10 @@ public abstract class TestLLDictionaryLeaks {
|
||||
var dict = getDict(updateMode);
|
||||
var key = Mono.fromCallable(() -> fromString("test-key"));
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.update(key, this::pass, updateReturnMode, true).then()
|
||||
dict.update(key, this::pass, updateReturnMode).then()
|
||||
);
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.update(key, this::pass, updateReturnMode, false).then()
|
||||
dict.update(key, this::pass, updateReturnMode).then()
|
||||
);
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.update(key, this::pass, updateReturnMode).then()
|
||||
@ -180,10 +180,10 @@ public abstract class TestLLDictionaryLeaks {
|
||||
var dict = getDict(updateMode);
|
||||
var key = Mono.fromCallable(() -> fromString("test-key"));
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.updateAndGetDelta(key, this::pass, true).then()
|
||||
dict.updateAndGetDelta(key, this::pass).then()
|
||||
);
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.updateAndGetDelta(key, this::pass, false).then()
|
||||
dict.updateAndGetDelta(key, this::pass).then()
|
||||
);
|
||||
runVoid(updateMode == UpdateMode.DISALLOW,
|
||||
dict.updateAndGetDelta(key, this::pass).then()
|
||||
|
@ -334,7 +334,7 @@ public class TestLuceneSearches {
|
||||
@MethodSource("provideQueryArgumentsScoreModeAndSort")
|
||||
public void testSearchAdvancedText(boolean shards, Sort multiSort) throws Throwable {
|
||||
var queryBuilder = ClientQueryParams
|
||||
.<LazyHitKey<String>>builder()
|
||||
.builder()
|
||||
.query(new BooleanQuery(List.of(
|
||||
new BooleanQueryPart(new BoostQuery(new TermQuery(new Term("text", "hello")), 3), new OccurShould()),
|
||||
new BooleanQueryPart(new TermQuery(new Term("text", "world")), new OccurShould()),
|
||||
|
Loading…
Reference in New Issue
Block a user