Remove some leaks

Andrea Cavalli 2022-05-20 23:59:56 +02:00
parent d253111233
commit 18d5ddf6e1
13 changed files with 461 additions and 410 deletions

pom.xml

@@ -90,13 +90,20 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.18</version>
<version>2020.0.19</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
<classifier>original</classifier>
<scope>runtime</scope>
<version>3.4.18</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -106,6 +113,16 @@
<groupId>io.netty</groupId>
<artifactId>netty5-buffer</artifactId>
<version>5.0.0.Alpha2</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
@@ -418,11 +435,11 @@
<version>3.12.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<testSourceDirectory>src/test/java</testSourceDirectory>
@@ -600,6 +617,18 @@
</lifecycleMappingMetadata>
</configuration>
</plugin>
<plugin>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-maven-plugin</artifactId>
<version>1.12.10</version>
<configuration>
<transformations>
<transformation>
<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
</transformation>
</transformations>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
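The reactor-tools dependency and the byte-buddy-maven-plugin transformation above wire Reactor's assembly-tracking instrumentation into the build, which makes operator stack traces usable when chasing leaked buffers through reactive chains. A minimal sketch of the runtime equivalent, assuming reactor-core and reactor-tools are on the classpath (the class name and checkpoint label here are illustrative):

import reactor.core.publisher.Mono;
import reactor.tools.agent.ReactorDebugAgent;

public class DebugAgentExample {
    public static void main(String[] args) {
        // Instrument reactor operators in classes loaded from now on.
        ReactorDebugAgent.init();
        // Optionally retransform classes that were already loaded.
        ReactorDebugAgent.processExistingClasses();

        Mono.error(new IllegalStateException("boom"))
                .checkpoint("example-chain") // illustrative checkpoint name
                .onErrorResume(ex -> {
                    ex.printStackTrace(); // the trace now includes assembly information
                    return Mono.empty();
                })
                .block();
    }
}

The byte-buddy transformation configured in the plugin performs the same instrumentation at compile time, so production code pays no class-retransformation cost at startup.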

LLUtils.java

@@ -797,29 +797,21 @@ public class LLUtils {
}
public static Mono<Buffer> resolveLLDelta(Mono<LLDelta> prev, UpdateReturnMode updateReturnMode) {
return prev.handle((delta, sink) -> {
return prev.mapNotNull(delta -> {
final Buffer previous = delta.previousUnsafe();
final Buffer current = delta.currentUnsafe();
switch (updateReturnMode) {
return switch (updateReturnMode) {
case GET_NEW_VALUE -> {
if (previous != null && previous.isAccessible()) {
previous.close();
}
if (current != null) {
sink.next(current);
} else {
sink.complete();
}
yield current;
}
case GET_OLD_VALUE -> {
if (current != null && current.isAccessible()) {
current.close();
}
if (previous != null) {
sink.next(previous);
} else {
sink.complete();
}
yield previous;
}
case NOTHING -> {
if (previous != null && previous.isAccessible()) {
@@ -828,10 +820,9 @@ public class LLUtils {
if (current != null && current.isAccessible()) {
current.close();
}
sink.complete();
yield null;
}
default -> sink.error(new IllegalStateException());
}
};
});
}
@@ -862,27 +853,23 @@
public static <U> Mono<Delta<U>> mapLLDelta(Mono<LLDelta> mono,
SerializationFunction<@NotNull Buffer, @Nullable U> mapper) {
return mono.handle((delta, sink) -> {
try (delta) {
Buffer prev = delta.previousUnsafe();
Buffer curr = delta.currentUnsafe();
U newPrev;
U newCurr;
if (prev != null) {
newPrev = mapper.apply(prev);
} else {
newPrev = null;
}
if (curr != null) {
newCurr = mapper.apply(curr);
} else {
newCurr = null;
}
sink.next(new Delta<>(newPrev, newCurr));
} catch (SerializationException ex) {
sink.error(ex);
return Mono.usingWhen(mono, delta -> Mono.fromCallable(() -> {
Buffer prev = delta.previousUnsafe();
Buffer curr = delta.currentUnsafe();
U newPrev;
U newCurr;
if (prev != null) {
newPrev = mapper.apply(prev);
} else {
newPrev = null;
}
});
if (curr != null) {
newCurr = mapper.apply(curr);
} else {
newCurr = null;
}
return new Delta<>(newPrev, newCurr);
}), delta -> Mono.fromRunnable(delta::close));
}
public static <R, V> boolean isDeltaChanged(Delta<V> delta) {
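The rewritten resolveLLDelta and mapLLDelta hand buffer ownership to Reactor instead of closing by hand inside handle: Mono.usingWhen releases the LLDelta on success, error, and cancellation alike. A minimal self-contained sketch of that pattern, with a stand-in Resource class in place of the project's Buffer/LLDelta types:

import reactor.core.publisher.Mono;

public class UsingWhenExample {

    // Stand-in (assumption) for a reference-counted resource such as LLDelta.
    static final class Resource implements AutoCloseable {
        String read() {
            return "value";
        }

        @Override
        public void close() {
            System.out.println("released");
        }
    }

    static Mono<String> mapResource(Mono<Resource> source) {
        return Mono.usingWhen(
                source,                              // acquire
                res -> Mono.fromCallable(res::read), // use
                res -> Mono.fromRunnable(res::close) // release: runs on complete, error, and cancel
        );
    }

    public static void main(String[] args) {
        mapResource(Mono.fromCallable(Resource::new))
                .subscribe(System.out::println); // prints "value" then "released"
    }
}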

SubStageEntry.java

@@ -0,0 +1,60 @@
package it.cavallium.dbengine.database;
import it.cavallium.dbengine.database.collections.DatabaseStage;
import java.util.Map.Entry;
import java.util.Objects;
public final class SubStageEntry<T, U extends DatabaseStage<?>> implements SafeCloseable, Entry<T, U> {
private final T key;
private final U value;
public SubStageEntry(T key, U value) {
this.key = key;
this.value = value;
}
@Override
public void close() {
if (value != null && value.isAccessible()) {
value.close();
}
}
@Override
public T getKey() {
return key;
}
@Override
public U getValue() {
return value;
}
@Override
public U setValue(U value) {
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
//noinspection rawtypes
var that = (SubStageEntry) obj;
return Objects.equals(this.key, that.key) && Objects.equals(this.value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(key, value);
}
@Override
public String toString() {
return "SubStageEntry[" + "key=" + key + ", " + "value=" + value + ']';
}
}
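SubStageEntry pairs a key with a sub-stage it owns: close() releases the stage if it is still accessible, and setValue throws because the pair is immutable. A self-contained sketch of that ownership contract, with a stand-in SafeValue interface instead of the project's DatabaseStage/SafeCloseable types:

public class SubStageEntryDemo {

    // Stand-in (assumption) for the project's closeable stage types.
    interface SafeValue extends AutoCloseable {
        boolean isAccessible();

        @Override
        void close();
    }

    // Mirrors SubStageEntry: an immutable key/value pair that owns its value.
    record OwnedEntry<T, U extends SafeValue>(T key, U value) implements AutoCloseable {
        @Override
        public void close() {
            if (value != null && value.isAccessible()) {
                value.close();
            }
        }
    }

    public static void main(String[] args) {
        SafeValue stage = new SafeValue() {
            private boolean open = true;

            @Override
            public boolean isAccessible() {
                return open;
            }

            @Override
            public void close() {
                open = false;
                System.out.println("stage closed");
            }
        };
        try (var entry = new OwnedEntry<>("key", stage)) {
            System.out.println(entry.key() + " accessible=" + entry.value().isAccessible());
        } // close() releases the stage exactly once
    }
}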

DatabaseMapDictionary.java

@@ -4,7 +4,6 @@ import static java.util.Objects.requireNonNullElseGet;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.Resource;
import io.netty5.buffer.api.Send;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.BufSupplier;
@@ -14,6 +13,7 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.BinarySerializationFunction;
@@ -126,32 +126,31 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
});
}
private void deserializeValue(T keySuffix, Buffer value, SynchronousSink<U> sink) {
private @Nullable U deserializeValue(T keySuffix, Buffer value) {
try {
sink.next(valueSerializer.deserialize(value));
return valueSerializer.deserialize(value);
} catch (IndexOutOfBoundsException ex) {
var exMessage = ex.getMessage();
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) {
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
+ ":" + dictionary.getColumnName()
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
+ ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
LOG.error(
"Unexpected zero-bytes value at "
+ dictionary.getDatabaseName() + ":" + dictionary.getColumnName()
+ ":" + LLUtils.toStringSafe(this.keyPrefix) + ":" + keySuffix
+ "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
} catch (SerializationException e) {
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
+ ":" + dictionary.getColumnName()
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
+ ":" + keySuffix + "(?) total=" + totalZeroBytesErrors);
LOG.error(
"Unexpected zero-bytes value at " + dictionary.getDatabaseName() + ":" + dictionary.getColumnName()
+ ":" + LLUtils.toStringSafe(this.keyPrefix) + ":" + keySuffix + "(?) total="
+ totalZeroBytesErrors);
}
}
sink.complete();
return null;
} else {
sink.error(ex);
throw ex;
}
} catch (Throwable ex) {
sink.error(ex);
}
}
@@ -278,13 +277,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return dictionary
.get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix)))
.handle((valueToReceive, sink) -> {
try (valueToReceive) {
deserializeValue(keySuffix, valueToReceive, sink);
}
});
return Mono.usingWhen(dictionary
.get(resolveSnapshot(snapshot), Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix))),
value -> Mono.fromCallable(() -> deserializeValue(keySuffix, value)),
value -> Mono.fromRunnable(value::close));
}
@Override
@@ -306,13 +302,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<U> updateValue(T keySuffix, UpdateReturnMode updateReturnMode,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode)
.handle((valueToReceive, sink) -> {
try (valueToReceive) {
deserializeValue(keySuffix, valueToReceive, sink);
}
});
return Mono.usingWhen(dictionary.update(keyMono, getSerializedUpdater(updater), updateReturnMode),
result -> Mono.fromCallable(() -> deserializeValue(keySuffix, result)),
result -> Mono.fromRunnable(result::close)
);
}
@Override
@@ -320,11 +313,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater))
.transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
try (serialized) {
return valueSerializer.deserialize(serialized);
}
}));
.transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize));
}
public BinarySerializationFunction getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
@@ -368,26 +357,21 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle((valueToReceive, sink) -> {
try (valueToReceive) {
deserializeValue(keySuffix, valueToReceive, sink);
}
});
return Mono.usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE),
valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
valueBuf -> Mono.fromRunnable(valueBuf::close)
);
}
@Override
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
var valueMono = Mono.fromCallable(() -> serializeValue(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle((Buffer valueBuf, SynchronousSink<U> sink) -> {
try (valueBuf) {
deserializeValue(keySuffix, valueBuf, sink);
}
})
return Mono
.usingWhen(dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE),
valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
valueBuf -> Mono.fromRunnable(valueBuf::close)
)
.map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null);
}
@@ -404,13 +388,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> removeAndGetPrevious(T keySuffix) {
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle((valueToReceive, sink) -> {
try (valueToReceive) {
deserializeValue(keySuffix, valueToReceive, sink);
}
});
return Mono.usingWhen(dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE),
valueBuf -> Mono.fromCallable(() -> deserializeValue(keySuffix, valueBuf)),
valueBuf -> Mono.fromRunnable(valueBuf::close)
);
}
@Override
@@ -433,15 +414,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
});
return dictionary
.getMulti(resolveSnapshot(snapshot), mappedKeys)
.<Optional<U>>handle((valueBufOpt, sink) -> {
.handle((valueBufOpt, sink) -> {
try {
Optional<U> valueOpt;
if (valueBufOpt.isPresent()) {
valueOpt = Optional.of(valueSerializer.deserialize(valueBufOpt.get()));
} else {
valueOpt = Optional.empty();
}
sink.next(valueOpt);
sink.next(valueBufOpt.map(valueSerializer::deserialize));
} catch (Throwable ex) {
sink.error(ex);
} finally {
@@ -499,7 +474,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
@Override
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
public Flux<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllStages(snapshot, rangeMono, false, smallRange);
}
@@ -530,9 +505,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
/**
* Get all stages
* @param reverse if true, the results will go backwards from the specified key (inclusive)
* @param smallRange
*/
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
public Flux<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
@Nullable T keyMin,
@Nullable T keyMax,
boolean reverse,
@@ -540,39 +514,41 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
if (keyMin == null && keyMax == null) {
return getAllStages(snapshot, smallRange);
} else {
Mono<LLRange> boundedRangeMono = rangeMono
.handle((fullRange, sink) -> {
try (fullRange) {
sink.next(getPatchedRange(fullRange, keyMin, keyMax));
} catch (SerializationException e) {
sink.error(e);
}
});
Mono<LLRange> boundedRangeMono = Mono.usingWhen(rangeMono,
range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)),
range -> Mono.fromRunnable(range::close)
);
return getAllStages(snapshot, boundedRangeMono, reverse, smallRange);
}
}
private Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
private Flux<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
Mono<LLRange> sliceRangeMono, boolean reverse, boolean smallRange) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
.handle((keyBuf, sink) -> {
try {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix;
try (var keyBufCopy = keyBuf.copy()) {
keySuffix = deserializeSuffix(keyBufCopy);
}
var subStage = new DatabaseMapSingle<>(dictionary, BufSupplier.ofOwned(toKey(keyBuf)), valueSerializer, null);
sink.next(Map.entry(keySuffix, subStage));
} catch (Throwable ex) {
keyBuf.close();
sink.error(ex);
}
});
.flatMapSequential(keyBuf -> Mono
.<SubStageEntry<T, DatabaseStageEntry<U>>>fromCallable(() -> {
assert keyBuf.readableBytes() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
splitPrefix(keyBuf).close();
suffixKeyLengthConsistency(keyBuf.readableBytes());
T keySuffix;
try (var keyBufCopy = keyBuf.copy()) {
keySuffix = deserializeSuffix(keyBufCopy);
}
var bufSupplier = BufSupplier.ofOwned(toKey(keyBuf));
var subStage = new DatabaseMapSingle<>(dictionary, bufSupplier, valueSerializer, null);
return new SubStageEntry<>(keySuffix, subStage);
}).doOnCancel(() -> {
if (keyBuf.isAccessible()) {
keyBuf.close();
}
}).doOnError(ex -> {
if (keyBuf.isAccessible()) {
keyBuf.close();
}
})
);
}
@Override
@@ -583,7 +559,6 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
/**
* Get all values
* @param reverse if true, the results will go backwards from the specified key (inclusive)
* @param smallRange
*/
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
@Nullable T keyMin,
@@ -593,14 +568,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
if (keyMin == null && keyMax == null) {
return getAllValues(snapshot, smallRange);
} else {
Mono<LLRange> boundedRangeMono = rangeMono
.handle((fullRange, sink) -> {
try (fullRange) {
sink.next(getPatchedRange(fullRange, keyMin, keyMax));
} catch (SerializationException e) {
sink.error(e);
}
});
Mono<LLRange> boundedRangeMono = Mono.usingWhen(rangeMono,
range -> Mono.fromCallable(() -> getPatchedRange(range, keyMin, keyMax)),
range -> Mono.fromRunnable(range::close));
return getAllValues(snapshot, boundedRangeMono, reverse, smallRange);
}
}
@@ -610,7 +580,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
boolean reverse, boolean smallRange) {
return dictionary
.getRange(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
.<Entry<T, U>>handle((serializedEntry, sink) -> {
.handle((serializedEntry, sink) -> {
try {
Entry<T, U> entry;
try (serializedEntry) {
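The getAllStages rewrite above moves per-key work into flatMapSequential and attaches doOnCancel/doOnError cleanup, so a key buffer taken off the range stream is closed even when the subscriber cancels before the element is consumed. A minimal sketch of that per-element cleanup, with Res standing in for the netty Buffer:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapCleanupExample {

    // Stand-in (assumption) for a closeable per-element resource such as a key Buffer.
    static final class Res {
        private boolean open = true;

        boolean isAccessible() {
            return open;
        }

        void close() {
            open = false;
        }

        String use() {
            return "mapped";
        }
    }

    static Flux<String> mapAll(Flux<Res> source) {
        return source.flatMapSequential(res -> Mono
                .fromCallable(res::use) // normal path: the element is consumed here
                .doOnCancel(() -> {     // downstream cancelled before consumption
                    if (res.isAccessible()) {
                        res.close();
                    }
                })
                .doOnError(ex -> {      // mapping failed
                    if (res.isAccessible()) {
                        res.close();
                    }
                }));
    }

    public static void main(String[] args) {
        mapAll(Flux.just(new Res(), new Res())).subscribe(System.out::println);
    }
}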

DatabaseMapDictionaryDeep.java

@@ -16,6 +16,7 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RangeSupplier;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
import it.cavallium.dbengine.database.serialization.SerializationException;
@@ -370,7 +371,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
}
@Override
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
public Flux<SubStageEntry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return dictionary
.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength, smallRange)
.flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using(
@@ -381,7 +382,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
T deserializedSuffix;
try (var splittedGroupSuffix = splitGroupSuffix(groupKeyWithoutExtSend)) {
deserializedSuffix = this.deserializeSuffix(splittedGroupSuffix);
sink.next(Map.entry(deserializedSuffix, us));
sink.next(new SubStageEntry<>(deserializedSuffix, us));
} catch (SerializationException ex) {
sink.error(ex);
}

DatabaseMapDictionaryHashed.java

@@ -10,6 +10,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import io.netty5.buffer.api.internal.ResourceSupport;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@@ -195,7 +196,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T key) {
return this
.atPrivate(snapshot, key, keySuffixHashFunction.apply(key))
.map(cast -> (DatabaseStageEntry<U>) cast);
.map(cast -> cast);
}
private Mono<DatabaseSingleBucket<T, U, TH>> atPrivate(@Nullable CompositeSnapshot snapshot, T key, TH hash) {
@@ -210,7 +211,8 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
}
@Override
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
public Flux<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
boolean smallRange) {
return subDictionary
.getAllValues(snapshot, smallRange)
.map(Entry::getValue)
@@ -218,8 +220,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
.flatMap(bucket -> Flux
.fromIterable(bucket)
.map(Entry::getKey)
.flatMap(key -> this.at(snapshot, key).map(stage -> Map.entry(key, stage)))
);
.flatMap(key -> this.at(snapshot, key).map(stage -> new SubStageEntry<>(key, stage))));
}
@Override

DatabaseMapSingle.java

@@ -167,11 +167,7 @@ public class DatabaseMapSingle<U> extends ResourceSupport<DatabaseStage<U>, Data
} else {
return serializeValue(result);
}
}).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
try (serialized) {
return serializer.deserialize(serialized);
}
}));
}).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize));
}
@Override

DatabaseSingleBucket.java

@@ -124,17 +124,15 @@ public class DatabaseSingleBucket<K, V, TH>
@Override
public Mono<Delta<V>> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater) {
return bucketStage
.updateAndGetDelta(oldBucket -> {
V oldValue = extractValue(oldBucket);
var result = updater.apply(oldValue);
if (result == null) {
return this.removeValueOrDelete(oldBucket);
} else {
return this.insertValueOrCreate(oldBucket, result);
}
})
.transform(mono -> LLUtils.mapDelta(mono, this::extractValue));
return bucketStage.updateAndGetDelta(oldBucket -> {
V oldValue = extractValue(oldBucket);
var result = updater.apply(oldValue);
if (result == null) {
return this.removeValueOrDelete(oldBucket);
} else {
return this.insertValueOrCreate(oldBucket, result);
}
}).transform(mono -> LLUtils.mapDelta(mono, this::extractValue));
}
@Override

DatabaseSingleton.java

@@ -164,11 +164,7 @@ public class DatabaseSingleton<U> extends ResourceSupport<DatabaseStage<U>, Data
return serializeValue(result);
}
}
}).transform(mono -> LLUtils.mapLLDelta(mono, serialized -> {
try (serialized) {
return serializer.deserialize(serialized);
}
}));
}).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize));
}
@Override

DatabaseStageMap.java

@@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
@@ -120,7 +121,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
return entries.flatMap(entry -> this.putValue(entry.getKey(), entry.getValue())).then();
}
Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange);
Flux<SubStageEntry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange);
default Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return this

LLLocalDictionary.java

@@ -421,26 +421,33 @@ public class LLLocalDictionary implements LLDictionary {
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result;
var readOptions = generateReadOptionsOrStatic(null);
startedUpdates.increment();
try (var writeOptions = new WriteOptions()) {
result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode));
} finally {
endedUpdates.increment();
if (readOptions != EMPTY_READ_OPTIONS) {
readOptions.close();
UpdateAtomicResult result = null;
try {
var readOptions = generateReadOptionsOrStatic(null);
startedUpdates.increment();
try (var writeOptions = new WriteOptions()) {
result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, returnMode));
} finally {
endedUpdates.increment();
if (readOptions != EMPTY_READ_OPTIONS) {
readOptions.close();
}
}
}
assert result != null;
return switch (updateReturnMode) {
case NOTHING -> {
assert result != null;
return switch (updateReturnMode) {
case NOTHING -> {
result.close();
yield null;
}
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
} catch (Throwable ex) {
if (result != null) {
result.close();
yield null;
}
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
throw ex;
}
}), key -> Mono.fromRunnable(key::close));
}
@@ -458,19 +465,27 @@
+ "safe atomic operations");
}
UpdateAtomicResult result;
var readOptions = generateReadOptionsOrStatic(null);
startedUpdates.increment();
try (var writeOptions = new WriteOptions()) {
result = updateTime.recordCallable(() -> db.updateAtomic(readOptions, writeOptions, key, updater, DELTA));
} finally {
endedUpdates.increment();
if (readOptions != EMPTY_READ_OPTIONS) {
readOptions.close();
UpdateAtomicResultDelta result = null;
try {
var readOptions = generateReadOptionsOrStatic(null);
startedUpdates.increment();
try (var writeOptions = new WriteOptions()) {
result = updateTime.recordCallable(() ->
(UpdateAtomicResultDelta) db.updateAtomic(readOptions, writeOptions, key, updater, DELTA));
} finally {
endedUpdates.increment();
if (readOptions != EMPTY_READ_OPTIONS) {
readOptions.close();
}
}
assert result != null;
return result.delta();
} catch (Throwable ex) {
if (result != null && result.delta().isAccessible()) {
result.close();
}
throw ex;
}
assert result != null;
return ((UpdateAtomicResultDelta) result).delta();
}), key -> Mono.fromRunnable(key::close));
}
@@ -938,7 +953,7 @@
if (USE_WINDOW_IN_SET_RANGE) {
return Mono
.usingWhen(rangeMono, range -> runOnDb(true, () -> {
try (var writeOptions = new WriteOptions(); range) {
try (var writeOptions = new WriteOptions()) {
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
try (var opts = LLUtils.generateCustomReadOptions(null, true, isBoundedRange(range), smallRange)) {
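Both LLLocalDictionary update paths now keep the UpdateAtomicResult in a local that is closed in a catch-all before rethrowing, so a failure between the atomic update and the return no longer leaks the result. A self-contained sketch of that close-on-failure shape, with Result standing in for UpdateAtomicResult:

public class CloseOnFailureExample {

    // Stand-in (assumption) for UpdateAtomicResult.
    static final class Result implements AutoCloseable {
        String value() {
            return "updated";
        }

        @Override
        public void close() {
            System.out.println("result closed");
        }
    }

    static Result update(boolean fail) {
        Result result = null;
        try {
            result = new Result(); // acquire
            if (fail) {
                throw new IllegalStateException("update failed");
            }
            return result; // success: the caller takes ownership
        } catch (Throwable ex) {
            if (result != null) {
                result.close(); // failure: release before rethrowing
            }
            throw ex;
        }
    }

    public static void main(String[] args) {
        try (Result r = update(false)) {
            System.out.println(r.value());
        }
        try {
            update(true);
        } catch (IllegalStateException expected) {
            System.out.println("caught: " + expected.getMessage());
        }
    }
}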

OptimisticRocksDBColumn.java

@@ -96,33 +96,37 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
boolean committedSuccessfully;
int retries = 0;
ExponentialPageLimits retryTime = null;
Buffer sentPrevData = null;
Buffer sentCurData = null;
boolean changed;
do {
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = MemoryManager.unsafeWrap(prevDataArray);
prevDataArray = null;
} else {
prevData = null;
}
try (prevData) {
Buffer prevData = null;
Buffer newData = null;
try {
boolean changed;
do {
if (prevData != null && prevData.isAccessible()) {
prevData.close();
}
if (newData != null && newData.isAccessible()) {
newData.close();
}
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
if (prevDataArray != null) {
prevData = MemoryManager.unsafeWrap(prevDataArray);
prevDataArray = null;
} else {
prevData = null;
}
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy().makeReadOnly();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try {
newData = updater.apply(prevDataToSendToUpdater);
} finally {
@@ -130,119 +134,111 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
prevDataToSendToUpdater.close();
}
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else {
changed = false;
committedSuccessfully = true;
tx.rollback();
}
if (sentPrevData != null && sentPrevData.isAccessible()) {
sentPrevData.close();
}
if (sentCurData != null && sentCurData.isAccessible()) {
sentCurData.close();
}
sentPrevData = prevData == null ? null : prevData.copy();
sentCurData = newData == null ? null : newData.copy();
if (!committedSuccessfully) {
tx.undoGetForUpdate(cfh, keyArray);
tx.rollback();
if (sentPrevData != null && sentPrevData.isAccessible()) {
sentPrevData.close();
}
if (sentCurData != null && sentCurData.isAccessible()) {
sentCurData.close();
}
retries++;
tx.put(cfh, keyArray, newDataArray);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else {
changed = false;
committedSuccessfully = true;
tx.rollback();
}
if (!committedSuccessfully) {
tx.undoGetForUpdate(cfh, keyArray);
tx.rollback();
retries++;
if (retries == 1) {
retryTime = new ExponentialPageLimits(0, 2, 2000);
}
long retryNs = 1000000L * retryTime.getPageLimit(retries);
if (retries == 1) {
retryTime = new ExponentialPageLimits(0, 2, 2000);
}
long retryNs = 1000000L * retryTime.getPageLimit(retries);
// +- 30%
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
// +- 30%
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
}
// Wait for n milliseconds
if (retryNs > 0) {
LockSupport.parkNanos(retryNs);
}
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
}
// Wait for n milliseconds
if (retryNs > 0) {
LockSupport.parkNanos(retryNs);
}
}
} while (!committedSuccessfully);
if (retries > 5) {
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
}
} while (!committedSuccessfully);
if (retries > 5) {
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
optimisticAttempts.record(retries);
return switch (returnMode) {
case NOTHING -> {
if (prevData != null) {
prevData.close();
}
if (newData != null) {
newData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (prevData != null) {
prevData.close();
}
yield new UpdateAtomicResultCurrent(newData);
}
case PREVIOUS -> {
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultPrevious(prevData);
}
case BINARY_CHANGED -> {
if (prevData != null) {
prevData.close();
}
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultBinaryChanged(changed);
}
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
};
} catch (Throwable ex) {
if (prevData != null && prevData.isAccessible()) {
prevData.close();
}
if (newData != null && newData.isAccessible()) {
newData.close();
}
throw ex;
}
recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime);
optimisticAttempts.record(retries);
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultBinaryChanged(changed);
}
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData));
};
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
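The optimistic retry loop now declares prevData and newData once, closes whatever the previous attempt left open at the top of each iteration, and releases both buffers in a single catch on any failure, replacing the old sentPrevData/sentCurData copies. A minimal sketch of that loop discipline, with Buf standing in for the netty Buffer:

public class RetryLoopCleanupExample {

    // Stand-in (assumption) for a netty Buffer.
    static final class Buf {
        private boolean open = true;

        boolean isAccessible() {
            return open;
        }

        void close() {
            open = false;
        }
    }

    static Buf updateWithRetries(int attemptsNeeded) {
        Buf prevData = null;
        Buf newData = null;
        try {
            boolean committed;
            int retries = 0;
            do {
                // Release whatever the previous attempt allocated.
                if (prevData != null && prevData.isAccessible()) {
                    prevData.close();
                }
                if (newData != null && newData.isAccessible()) {
                    newData.close();
                }
                prevData = new Buf();
                newData = new Buf();
                committed = ++retries >= attemptsNeeded; // simulated optimistic commit
            } while (!committed);
            if (prevData.isAccessible()) {
                prevData.close(); // only the new value escapes the method
            }
            return newData;
        } catch (Throwable ex) {
            if (prevData != null && prevData.isAccessible()) {
                prevData.close();
            }
            if (newData != null && newData.isAccessible()) {
                newData.close();
            }
            throw ex;
        }
    }

    public static void main(String[] args) {
        Buf result = updateWithRetries(3);
        System.out.println("returned accessible=" + result.isAccessible());
        result.close();
    }
}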

PessimisticRocksDBColumn.java

@@ -64,30 +64,28 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
}
try (var txOpts = new TransactionOptions();
var tx = beginTransaction(writeOptions, txOpts)) {
Buffer sentPrevData;
Buffer sentCurData;
boolean changed;
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
}
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
Buffer prevData = null;
Buffer newData = null;
try {
boolean changed;
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
}
Buffer prevData;
if (prevDataArray != null) {
readValueFoundWithoutBloomBufferSize.record(prevDataArray.length);
prevData = MemoryManager.unsafeWrap(prevDataArray);
} else {
readValueNotFoundWithoutBloomBufferSize.record(0);
prevData = null;
}
try (prevData) {
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
if (prevDataArray != null) {
readValueFoundWithoutBloomBufferSize.record(prevDataArray.length);
prevData = MemoryManager.unsafeWrap(prevDataArray);
} else {
readValueNotFoundWithoutBloomBufferSize.record(0);
}
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy().makeReadOnly();
@@ -95,7 +93,6 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try {
newData = updater.apply(prevDataToSendToUpdater);
} finally {
@@ -103,81 +100,85 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
prevDataToSendToUpdater.close();
}
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
writeValueBufferSize.record(0);
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
LLUtils.toStringSafe(newData)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
writeValueBufferSize.record(0);
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
writeValueBufferSize.record(newDataArray.length);
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
writeValueBufferSize.record(newDataArray.length);
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
}
} finally {
tx.undoGetForUpdate(cfh, keyArray);
}
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> {
if (prevData != null) {
prevData.close();
}
sentPrevData = prevData == null ? null : prevData.copy();
sentCurData = newData == null ? null : newData.copy();
if (newData != null) {
newData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (prevData != null) {
prevData.close();
}
yield new UpdateAtomicResultCurrent(newData);
}
case PREVIOUS -> {
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultPrevious(prevData);
}
case BINARY_CHANGED -> {
if (prevData != null) {
prevData.close();
}
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultBinaryChanged(changed);
}
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
};
} catch (Throwable ex) {
if (prevData != null && prevData.isAccessible()) {
prevData.close();
}
} finally {
tx.undoGetForUpdate(cfh, keyArray);
if (newData != null && newData.isAccessible()) {
newData.close();
}
throw ex;
}
recordAtomicUpdateTime(changed, sentPrevData != null, sentCurData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultBinaryChanged(changed);
}
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData));
};
}
} catch (Throwable ex) {
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);