Handle discards and drops

This commit is contained in:
Andrea Cavalli 2022-05-21 22:41:48 +02:00
parent 7f52339a6a
commit 52c216c0df
7 changed files with 62 additions and 22 deletions

View File

@ -57,6 +57,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@ -87,6 +88,11 @@ public class LLUtils {
public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false"));
public static final boolean DEBUG_ALL_DROPS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.drops.log", "false"));
public static final boolean DEBUG_ALL_DISCARDS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.discards.log", "false"));
static {
for (int i1 = 0; i1 < 256; i1++) {
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
@ -624,8 +630,16 @@ public class LLUtils {
return readOptions;
}
public static Mono<Void> closeResource(Resource<?> resource) {
return Mono.fromRunnable(resource::close);
public static Mono<Void> finalizeResource(Resource<?> resource) {
return Mono.fromRunnable(() -> LLUtils.closeResource(resource));
}
public static <V> Flux<V> handleDiscard(Flux<V> flux) {
return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
}
public static <V> Mono<V> handleDiscard(Mono<V> flux) {
return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
}
@Deprecated
@ -847,6 +861,20 @@ public class LLUtils {
}
private static void onNextDropped(Object next) {
if (DEBUG_ALL_DROPS) {
logger.trace("Dropped: {}", () -> next.getClass().getName());
}
closeResource(next);
}
public static void onDiscard(Object next) {
if (DEBUG_ALL_DISCARDS) {
logger.trace("Discarded: {}", () -> next.getClass().getName());
}
closeResource(next);
}
public static void closeResource(Object next) {
if (next instanceof Send<?> send) {
send.close();
} else if (next instanceof Resource<?> resource && resource.isAccessible()) {

View File

@ -440,7 +440,8 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
var beforeWriterOffset = output.writerOffset();
keySuffixSerializer.serialize(keySuffix, output);
var afterWriterOffset = output.writerOffset();
assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset);
assert suffixKeyLengthConsistency(afterWriterOffset - beforeWriterOffset)
: "Invalid key suffix length: " + (afterWriterOffset - beforeWriterOffset) + ". Expected: " + keySuffixLength;
}
@Override

View File

@ -1,6 +1,5 @@
package it.cavallium.dbengine.database.collections;
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Drop;
import io.netty5.buffer.api.Owned;
@ -237,7 +236,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> extends
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return entries.flatMap(entry -> Mono.usingWhen(this.at(null, entry.getKey()),
stage -> stage.setAndGetPrevious(entry.getValue()).map(prev -> Map.entry(entry.getKey(), prev)),
LLUtils::closeResource
LLUtils::finalizeResource
));
}

View File

@ -1,7 +1,5 @@
package it.cavallium.dbengine.database.collections;
import static reactor.core.publisher.Mono.fromRunnable;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils;
@ -34,14 +32,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
default Mono<Boolean> containsKey(@Nullable CompositeSnapshot snapshot, T key) {
return Mono.usingWhen(this.at(snapshot, key),
stage -> stage.isEmpty(snapshot).map(empty -> !empty),
LLUtils::closeResource
LLUtils::finalizeResource
);
}
default Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T key, boolean existsAlmostCertainly) {
return Mono.usingWhen(this.at(snapshot, key),
stage -> stage.get(snapshot, existsAlmostCertainly),
LLUtils::closeResource
LLUtils::finalizeResource
);
}
@ -54,7 +52,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
}
default Mono<Void> putValue(T key, U value) {
return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::closeResource);
return Mono.usingWhen(at(null, key).single(), stage -> stage.set(value), LLUtils::finalizeResource);
}
Mono<UpdateMode> getUpdateMode();
@ -64,7 +62,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
SerializationFunction<@Nullable U, @Nullable U> updater) {
return Mono.usingWhen(at(null, key).single(),
stage -> stage.update(updater, updateReturnMode),
LLUtils::closeResource
LLUtils::finalizeResource
);
}
@ -87,7 +85,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
default Mono<U> putValueAndGetPrevious(T key, U value) {
return Mono.usingWhen(at(null, key).single(),
stage -> stage.setAndGetPrevious(value),
LLUtils::closeResource
LLUtils::finalizeResource
);
}
@ -96,7 +94,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
*/
default Mono<Boolean> putValueAndGetChanged(T key, U value) {
return Mono
.usingWhen(at(null, key).single(), stage -> stage.setAndGetChanged(value), LLUtils::closeResource)
.usingWhen(at(null, key).single(), stage -> stage.setAndGetChanged(value), LLUtils::finalizeResource)
.single();
}
@ -105,7 +103,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends
}
default Mono<U> removeAndGetPrevious(T key) {
return Mono.usingWhen(at(null, key), DatabaseStage::clearAndGetPrevious, LLUtils::closeResource);
return Mono.usingWhen(at(null, key), DatabaseStage::clearAndGetPrevious, LLUtils::finalizeResource);
}
default Mono<Boolean> removeAndGetStatus(T key) {

View File

@ -7,6 +7,7 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
@ -86,6 +87,7 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
.repeatWhen(s -> s.delayElements(queryRefreshDebounceTime))
.takeUntilOther(closeRequestedMono.asMono())
.doAfterTerminate(refresherClosed::tryEmitEmpty)
.transform(LLUtils::handleDiscard)
.subscribe();
this.cachedSnapshotSearchers = CacheBuilder.newBuilder()

View File

@ -11,7 +11,6 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
@ -90,7 +89,6 @@ import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.StringHelper;
import org.jetbrains.annotations.NotNull;
@ -235,7 +233,7 @@ public class LuceneUtils {
DatabaseMapDictionaryDeep<T, Object2ObjectSortedMap<U, V>, ? extends DatabaseStageMap<U, V, ? extends DatabaseStageEntry<V>>> dictionaryDeep) {
return entry -> Mono.usingWhen(dictionaryDeep.at(snapshot, entry.getKey()),
sub -> sub.getValue(snapshot, entry.getValue()),
LLUtils::closeResource
LLUtils::finalizeResource
);
}

View File

@ -6,8 +6,6 @@ import static it.cavallium.dbengine.SyncUtils.*;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
@ -24,6 +22,7 @@ import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@ -132,8 +131,8 @@ public abstract class TestDictionaryMap {
.create(tempDb(getTempDbGenerator(), allocator, db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, mapType, 5))
.flatMap(map -> Mono
.usingWhen(map.at(null, key), v -> v.set(value), LLUtils::closeResource)
.then(Mono.usingWhen(map.at(null, key), v -> v.get(null), LLUtils::closeResource))
.usingWhen(map.at(null, key), v -> v.set(value), LLUtils::finalizeResource)
.then(Mono.usingWhen(map.at(null, key), v -> v.get(null), LLUtils::finalizeResource))
.doFinally(s -> map.close())
)
));
@ -322,6 +321,21 @@ public abstract class TestDictionaryMap {
}
}
@Test
public void testUpdateGetWithCancel() {
tempDb(getTempDbGenerator(), allocator, db -> {
var mapMono = tempDictionary(db, UpdateMode.ALLOW)
.map(dict -> tempDatabaseMapDictionaryMap(dict, MapType.MAP, 5));
var keys = Flux.range(10, 89).map(n -> "key" + n).repeat(100);
return Mono.usingWhen(mapMono,
map -> keys.flatMap(key -> map.updateValue(key, prevValue -> key + "-val")).then(),
LLUtils::finalizeResource
);
}).take(50).blockLast();
}
@ParameterizedTest
@MethodSource("provideArgumentsPut")
public void testPutAndGetChanged(MapType mapType, UpdateMode updateMode, String key, String value, boolean shouldFail) {
@ -427,7 +441,7 @@ public abstract class TestDictionaryMap {
Flux<String> keysFlux = entriesFlux.map(Entry::getKey);
Flux<Optional<String>> resultsFlux = map.setAllValues(entriesFlux).thenMany(map.getMulti(null, keysFlux));
return Flux.zip(keysFlux, resultsFlux, Map::entry);
}, LLUtils::closeResource)
}, LLUtils::finalizeResource)
.filter(k -> k.getValue().isPresent()).map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()));
}
));