diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 5698c93..cc394e2 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -4,13 +4,16 @@ import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLSnapshottable; -import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; +import it.cavallium.dbengine.database.collections.ValueGetter; +import it.cavallium.dbengine.database.collections.ValueTransformer; import java.util.Map.Entry; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; @SuppressWarnings("unused") public interface LuceneIndex extends LLSnapshottable { @@ -34,8 +37,12 @@ public interface LuceneIndex extends LLSnapshottable { } default Mono updateOrDeleteDocumentIfModified(T key, @NotNull Delta delta) { - if (delta.isModified()) { - return updateOrDeleteDocument(key, delta.current()); + return updateOrDeleteDocumentIfModified(key, delta.current(), delta.isModified()); + } + + default Mono updateOrDeleteDocumentIfModified(T key, @Nullable U currentValue, boolean modified) { + if (modified) { + return updateOrDeleteDocument(key, currentValue); } else { return Mono.empty(); } @@ -43,17 +50,34 @@ public interface LuceneIndex extends LLSnapshottable { Mono deleteAll(); - Mono> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue); + Mono> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue); - Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, + default Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, T key, U mltDocumentValue, - ValueGetter valueGetter); + ValueGetter valueGetter) { + return this.moreLikeThisWithTransformer(queryParams, + key, + mltDocumentValue, + getValueGetterTransformer(valueGetter)); + } - Mono> search(ClientQueryParams> queryParams); + Mono> moreLikeThisWithTransformer(ClientQueryParams> queryParams, + T key, + U mltDocumentValue, + ValueTransformer valueTransformer); - Mono> searchWithValues(ClientQueryParams> queryParams, - ValueGetter valueGetter); + Mono> search(ClientQueryParams> queryParams); + + default Mono> searchWithValues(ClientQueryParams> queryParams, + ValueGetter valueGetter) { + return this.searchWithTransformer(queryParams, + getValueGetterTransformer(valueGetter) + ); + } + + Mono> searchWithTransformer(ClientQueryParams> queryParams, + ValueTransformer valueTransformer); Mono count(@Nullable CompositeSnapshot snapshot, Query query); @@ -64,4 +88,15 @@ public interface LuceneIndex extends LLSnapshottable { Mono flush(); Mono refresh(); + + private static ValueTransformer getValueGetterTransformer(ValueGetter valueGetter) { + return new ValueTransformer() { + @Override + public Flux> transform(Flux> keys) { + return keys.flatMapSequential(key -> valueGetter + .get(key.getT2()) + .map(result -> Tuples.of(key.getT1(), key.getT2(), result))); + } + }; + } } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index 35b8628..7390288 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -2,15 +2,13 @@ package it.cavallium.dbengine.client; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; -import it.cavallium.dbengine.client.query.current.data.QueryParams; +import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLLuceneIndex; -import it.cavallium.dbengine.database.LLScoreMode; -import it.cavallium.dbengine.database.LLSearchResult; import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; -import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; -import it.cavallium.dbengine.lucene.LuceneUtils; +import it.cavallium.dbengine.database.collections.ValueGetter; +import it.cavallium.dbengine.database.collections.ValueTransformer; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -19,6 +17,7 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; public class LuceneIndexImpl implements LuceneIndex { @@ -84,7 +83,7 @@ public class LuceneIndexImpl implements LuceneIndex { return luceneIndex.deleteAll(); } - private Mono> transformLuceneResult(LLSearchResultShard llSearchResult) { + private Mono> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult) { return Mono.just(new SearchResultKeys<>(llSearchResult.results() .map(signal -> new SearchResultKey<>(signal.key().map(indicizer::getKey), signal.score())), llSearchResult.totalHitsCount(), @@ -92,7 +91,7 @@ public class LuceneIndexImpl implements LuceneIndex { )); } - private Mono> transformLuceneResultWithValues(LLSearchResultShard llSearchResult, + private Mono> transformLuceneResultWithValues(LLSearchResultShard llSearchResult, ValueGetter valueGetter) { return Mono.just(new SearchResult<>(llSearchResult.results().map(signal -> { var key = signal.key().map(indicizer::getKey); @@ -100,35 +99,34 @@ public class LuceneIndexImpl implements LuceneIndex { }), llSearchResult.totalHitsCount(), llSearchResult.release())); } - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ + private Mono> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult, + ValueTransformer valueTransformer) { + var scoresWithKeysFlux = llSearchResult + .results() + .flatMapSequential(signal -> signal.key().map(indicizer::getKey).map(key -> Tuples.of(signal.score(), key))); + var resultItemsFlux = valueTransformer + .transform(scoresWithKeysFlux) + .map(tuple3 -> new SearchResultItem<>(Mono.just(tuple3.getT2()), + Mono.just(tuple3.getT3()), + tuple3.getT1() + )); + return Mono.just(new SearchResult<>(resultItemsFlux, llSearchResult.totalHitsCount(), llSearchResult.release())); + } + @Override - public Mono> moreLikeThis(ClientQueryParams> queryParams, + public Mono> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue) { Flux>> mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); return luceneIndex .moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields) - .flatMap(this::transformLuceneResult); + .flatMap(this::transformLuceneResultWithTransformer); } - - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ @Override - public Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, + public Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, T key, U mltDocumentValue, ValueGetter valueGetter) { @@ -145,41 +143,52 @@ public class LuceneIndexImpl implements LuceneIndex { )); } - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ @Override - public Mono> search(ClientQueryParams> queryParams) { + public Mono> moreLikeThisWithTransformer(ClientQueryParams> queryParams, + T key, + U mltDocumentValue, + ValueTransformer valueTransformer) { + Flux>> mltDocumentFields + = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); + return luceneIndex + .moreLikeThis(resolveSnapshot(queryParams.snapshot()), + queryParams.toQueryParams(), + indicizer.getKeyFieldName(), + mltDocumentFields + ) + .flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer)); + } + + @Override + public Mono> search(ClientQueryParams> queryParams) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName() ) - .flatMap(this::transformLuceneResult); + .flatMap(this::transformLuceneResultWithTransformer); } - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ @Override - public Mono> searchWithValues(ClientQueryParams> queryParams, + public Mono> searchWithValues(ClientQueryParams> queryParams, ValueGetter valueGetter) { return luceneIndex .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, valueGetter)); } + @Override + public Mono> searchWithTransformer(ClientQueryParams> queryParams, + ValueTransformer valueTransformer) { + return luceneIndex + .search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()) + .flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer)); + } + @Override public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { - return this.search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) + return this + .search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) .flatMap(tSearchResultKeys -> tSearchResultKeys.release().thenReturn(tSearchResultKeys.totalHitsCount())); } diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java index 8d5a722..d0ce1f3 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.client; -import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; +import it.cavallium.dbengine.database.collections.ValueGetter; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/src/main/java/it/cavallium/dbengine/database/ExtraKeyOperationResult.java b/src/main/java/it/cavallium/dbengine/database/ExtraKeyOperationResult.java new file mode 100644 index 0000000..8bea532 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/ExtraKeyOperationResult.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database; + +public record ExtraKeyOperationResult(T key, X extra, boolean changed) {} diff --git a/src/main/java/it/cavallium/dbengine/database/KeyOperationResult.java b/src/main/java/it/cavallium/dbengine/database/KeyOperationResult.java new file mode 100644 index 0000000..697b21c --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/KeyOperationResult.java @@ -0,0 +1,3 @@ +package it.cavallium.dbengine.database; + +public record KeyOperationResult(T key, boolean changed) {} diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 53e8186..1332503 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -5,12 +5,14 @@ import io.netty.buffer.ByteBufAllocator; import it.cavallium.dbengine.client.BadBlock; import java.util.List; import java.util.Map.Entry; +import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.concurrency.atomicity.NotAtomic; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; @SuppressWarnings("unused") @NotAtomic @@ -58,14 +60,19 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure { Mono remove(ByteBuf key, LLDictionaryResultType resultType); - Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys, boolean existsAlmostCertainly); + Flux> getMulti(@Nullable LLSnapshot snapshot, + Flux> keys, + boolean existsAlmostCertainly); - default Flux> getMulti(@Nullable LLSnapshot snapshot, Flux keys) { + default Flux> getMulti(@Nullable LLSnapshot snapshot, Flux> keys) { return getMulti(snapshot, keys, false); } Flux> putMulti(Flux> entries, boolean getOldValues); + Flux> updateMulti(Flux> entries, + BiFunction updateFunction); + Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range, boolean existsAlmostCertainly); default Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range) { diff --git a/src/main/java/it/cavallium/dbengine/database/RepeatedElementList.java b/src/main/java/it/cavallium/dbengine/database/RepeatedElementList.java new file mode 100644 index 0000000..503f4ac --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/RepeatedElementList.java @@ -0,0 +1,211 @@ +package it.cavallium.dbengine.database; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Objects; +import org.jetbrains.annotations.NotNull; + +@SuppressWarnings("ClassCanBeRecord") +public class RepeatedElementList implements List { + + private final T element; + private final int size; + + public RepeatedElementList(T element, int size) { + this.element = element; + this.size = size; + } + + private UnsupportedOperationException uoe() { + return new UnsupportedOperationException(); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size == 0; + } + + @Override + public boolean contains(Object o) { + return Objects.equals(element, o); + } + + @NotNull + @Override + public Iterator iterator() { + return this.listIterator(); + } + + @NotNull + @Override + public Object[] toArray() { + var arr = new Object[size]; + Arrays.fill(arr, element); + return arr; + } + + @NotNull + @Override + public T1[] toArray(@NotNull T1[] a) { + var arr = Arrays.copyOf(a, size); + Arrays.fill(arr, element); + return arr; + } + + @Override + public boolean add(T t) { + throw uoe(); + } + + @Override + public boolean remove(Object o) { + throw uoe(); + } + + @Override + public boolean containsAll(@NotNull Collection c) { + for (Object o : c) { + if (!contains(o)) { + return false; + } + } + return true; + } + + @Override + public boolean addAll(@NotNull Collection c) { + throw uoe(); + } + + @Override + public boolean addAll(int index, @NotNull Collection c) { + throw uoe(); + } + + @Override + public boolean removeAll(@NotNull Collection c) { + throw uoe(); + } + + @Override + public boolean retainAll(@NotNull Collection c) { + throw uoe(); + } + + @Override + public void clear() { + throw uoe(); + } + + @Override + public T get(int i) { + if (i >= 0 && i < size) { + return element; + } else { + throw new IndexOutOfBoundsException(i); + } + } + + @Override + public T set(int index, T element) { + throw uoe(); + } + + @Override + public void add(int index, T element) { + throw uoe(); + } + + @Override + public T remove(int index) { + throw uoe(); + } + + @Override + public int indexOf(Object o) { + if (contains(o)) { + return 0; + } else { + return -1; + } + } + + @Override + public int lastIndexOf(Object o) { + return indexOf(o); + } + + @NotNull + @Override + public ListIterator listIterator() { + return listIterator(0); + } + + @NotNull + @Override + public ListIterator listIterator(int index) { + return new ListIterator() { + int position = index - 1; + @Override + public boolean hasNext() { + return position + 1 < size; + } + + @Override + public T next() { + position++; + return get(position); + } + + @Override + public boolean hasPrevious() { + return position - 1 >= 0; + } + + @Override + public T previous() { + position--; + return get(position); + } + + @Override + public int nextIndex() { + return position + 1; + } + + @Override + public int previousIndex() { + return position - 1; + } + + @Override + public void remove() { + throw uoe(); + } + + @Override + public void set(T t) { + throw uoe(); + } + + @Override + public void add(T t) { + throw uoe(); + } + }; + } + + @NotNull + @Override + public List subList(int fromIndex, int toIndex) { + return new RepeatedElementList<>(element, toIndex - fromIndex); + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 065ea89..322a4d0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -2,9 +2,10 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.ByteBuf; import io.netty.util.ReferenceCounted; -import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.ExtraKeyOperationResult; +import it.cavallium.dbengine.database.KeyOperationResult; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLUtils; @@ -17,10 +18,13 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; /** * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle" @@ -173,20 +177,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary - .update(keyBuf.retain(), oldSerialized -> { - try { - var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain())); - if (result == null) { - return null; - } else { - return this.serialize(result); - } - } finally { - if (oldSerialized != null) { - oldSerialized.release(); - } - } - }, updateReturnMode, existsAlmostCertainly) + .update(keyBuf.retain(), getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly) .map(this::deserialize), ReferenceCounted::release ); @@ -200,25 +191,46 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep toKey(serializeSuffix(keySuffix)), keyBuf -> dictionary - .updateAndGetDelta(keyBuf.retain(), oldSerialized -> { - try { - var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain())); - if (result == null) { - return null; - } else { - return this.serialize(result); - } - } finally { - if (oldSerialized != null) { - oldSerialized.release(); - } - } - }, existsAlmostCertainly) + .updateAndGetDelta(keyBuf.retain(), getSerializedUpdater(updater), existsAlmostCertainly) .transform(mono -> LLUtils.mapDelta(mono, this::deserialize)), ReferenceCounted::release ); } + public Function<@Nullable ByteBuf, @Nullable ByteBuf> getSerializedUpdater(Function<@Nullable U, @Nullable U> updater) { + return oldSerialized -> { + try { + var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain())); + if (result == null) { + return null; + } else { + return this.serialize(result); + } + } finally { + if (oldSerialized != null) { + oldSerialized.release(); + } + } + }; + } + + public BiFunction<@Nullable ByteBuf, X, @Nullable ByteBuf> getSerializedUpdater(BiFunction<@Nullable U, X, @Nullable U> updater) { + return (oldSerialized, extra) -> { + try { + var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()), extra); + if (result == null) { + return null; + } else { + return this.serialize(result); + } + } finally { + if (oldSerialized != null) { + oldSerialized.release(); + } + } + }; + } + @Override public Mono putValueAndGetPrevious(T keySuffix, U value) { return Mono @@ -309,15 +321,17 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Mono.fromCallable(() -> { ByteBuf keySuffixBuf = serializeSuffix(keySuffix); try { - return toKey(keySuffixBuf.retain()); + return Tuples.of(keySuffix, toKey(keySuffixBuf.retain())); } finally { keySuffixBuf.release(); } })), existsAlmostCertainly) ) - .flatMap(entry -> Mono - .fromCallable(() -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey(), false)), deserialize(entry.getValue()))) - ); + .flatMapSequential(entry -> { + entry.getT2().release(); + return Mono + .fromCallable(() -> Map.entry(entry.getT1(), deserialize(entry.getT3()))); + }); } private Entry serializeEntry(T key, U value) { @@ -351,6 +365,26 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Flux> updateMulti(Flux> entries, + BiFunction<@Nullable U, X, @Nullable U> updater) { + Flux> serializedEntries = entries + .flatMap(entry -> Mono + .fromCallable(() -> Tuples.of(serializeSuffix(entry.getT1()), entry.getT2())) + .doOnDiscard(Entry.class, uncastedEntry -> { + //noinspection unchecked + var castedEntry = (Tuple2) uncastedEntry; + castedEntry.getT1().release(); + }) + ); + var serializedUpdater = getSerializedUpdater(updater); + return dictionary.updateMulti(serializedEntries, serializedUpdater) + .map(result -> new ExtraKeyOperationResult<>(deserializeSuffix(result.key()), + result.extra(), + result.changed() + )); + } + @Override public Flux>> getAllStages(@Nullable CompositeSnapshot snapshot) { return Flux diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index a1c8df0..e3dab72 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -19,10 +19,14 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; // todo: implement optimized methods public class DatabaseMapDictionaryDeep> implements DatabaseStageMap { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 1016d12..0951a30 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -7,8 +7,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.UpdateMode; -import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; -import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking; +import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.unimi.dsi.fastutil.objects.ObjectArraySet; @@ -21,11 +20,15 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; @SuppressWarnings("unused") public class DatabaseMapDictionaryHashed implements DatabaseStageMap> { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 1bbfa7c..85b34f7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -1,19 +1,19 @@ package it.cavallium.dbengine.database.collections; -import io.netty.buffer.ByteBuf; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.ExtraKeyOperationResult; +import it.cavallium.dbengine.database.KeyOperationResult; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; -import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; -import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -55,6 +55,14 @@ public interface DatabaseStageMap> extends Dat ); } + default Flux> updateMulti(Flux> entries, BiFunction<@Nullable U, X, @Nullable U> updater) { + return entries + .flatMapSequential(entry -> this + .updateValue(entry.getT1(), prevValue -> updater.apply(prevValue, entry.getT2())) + .map(changed -> new ExtraKeyOperationResult<>(entry.getT1(), entry.getT2(), changed)) + ); + } + default Mono updateValue(T key, UpdateReturnMode updateReturnMode, Function<@Nullable U, @Nullable U> updater) { return updateValue(key, updateReturnMode, false, updater); } @@ -254,10 +262,20 @@ public interface DatabaseStageMap> extends Dat return k -> getValue(snapshot, k).block(); } - /** - * Value getter doesn't lock data. Please make sure to lock before getting data. - */ default ValueGetter getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) { return k -> getValue(snapshot, k); } + + default ValueTransformer getAsyncDbValueTransformer(@Nullable CompositeSnapshot snapshot) { + return new ValueTransformer<>() { + @Override + public Flux> transform(Flux> keys) { + return Flux.defer(() -> { + ConcurrentHashMap extraValues = new ConcurrentHashMap<>(); + return getMulti(snapshot, keys.doOnNext(key -> extraValues.put(key.getT2(), key.getT1())).map(Tuple2::getT2)) + .map(result -> Tuples.of(extraValues.get(result.getKey()), result.getKey(), result.getValue())); + }); + } + }; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java b/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java deleted file mode 100644 index 5e8ccd1..0000000 --- a/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java +++ /dev/null @@ -1,24 +0,0 @@ -package it.cavallium.dbengine.database.collections; - -import reactor.core.publisher.Mono; - -public interface Joiner { - - interface ValueGetter { - - /** - * Can return Mono error IOException - */ - Mono get(KEY key); - } - - /** - * Warning! You must only join with immutable data to ensure data correctness. - * Good examples: message id, send date, ... - * Bad examples: message content, views, edited, ... - * - * Can return Mono error IOException - */ - Mono join(ValueGetter dbValueGetter, DB_VALUE value); - -} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java b/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java deleted file mode 100644 index 815d3c9..0000000 --- a/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java +++ /dev/null @@ -1,18 +0,0 @@ -package it.cavallium.dbengine.database.collections; - -import java.io.IOException; - -public interface JoinerBlocking { - - interface ValueGetterBlocking { - VALUE get(KEY key) throws IOException; - } - - /** - * Warning! You must only join with immutable data to ensure data correctness. - * Good examples: message id, send date, ... - * Bad examples: message content, views, edited, ... - */ - JOINED_VALUE join(ValueGetterBlocking dbValueGetter, DB_VALUE value) throws IOException; - -} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueGetter.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueGetter.java new file mode 100644 index 0000000..e7376b9 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueGetter.java @@ -0,0 +1,11 @@ +package it.cavallium.dbengine.database.collections; + +import reactor.core.publisher.Mono; + +public interface ValueGetter { + + /** + * Can return Mono error IOException + */ + Mono get(KEY key); +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueGetterBlocking.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueGetterBlocking.java new file mode 100644 index 0000000..8720b98 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueGetterBlocking.java @@ -0,0 +1,8 @@ +package it.cavallium.dbengine.database.collections; + +import java.io.IOException; + +public interface ValueGetterBlocking { + + VALUE get(KEY key) throws IOException; +} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueTransformer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueTransformer.java new file mode 100644 index 0000000..7ec9fe4 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueTransformer.java @@ -0,0 +1,15 @@ +package it.cavallium.dbengine.database.collections; + +import java.util.Map.Entry; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; + +public interface ValueTransformer { + + /** + * Can return Flux error IOException + */ + Flux> transform(Flux> keys); +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 11b02a3..b4144d1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -7,17 +7,22 @@ import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.ExtraKeyOperationResult; +import it.cavallium.dbengine.database.KeyOperationResult; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; +import it.unimi.dsi.fastutil.booleans.BooleanArrayList; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -26,10 +31,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.locks.StampedLock; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -58,6 +65,7 @@ import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; import static io.netty.buffer.Unpooled.*; @@ -71,6 +79,7 @@ public class LLLocalDictionary implements LLDictionary { static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int MULTI_GET_WINDOW = 500; + static final Duration MULTI_GET_WINDOW_TIMEOUT = Duration.ofSeconds(1); static final ReadOptions EMPTY_READ_OPTIONS = new UnmodifiableReadOptions(); static final WriteOptions EMPTY_WRITE_OPTIONS = new UnmodifiableWriteOptions(); static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions(); @@ -215,6 +224,14 @@ public class LLLocalDictionary implements LLDictionary { return list; } + private IntArrayList getLockIndicesWithExtra(List> entries) { + var list = new IntArrayList(entries.size()); + for (Tuple2 key : entries) { + list.add(getLockIndex(key.getT1())); + } + return list; + } + @Override public ByteBufAllocator getAllocator() { return alloc; @@ -869,60 +886,53 @@ public class LLLocalDictionary implements LLDictionary { private Mono getPreviousData(ByteBuf key, LLDictionaryResultType resultType) { try { return Mono - .defer(() -> { - switch (resultType) { - case PREVIOUS_VALUE_EXISTENCE: - return this - .containsKey(null, key.retain()) - .single() - .map(LLUtils::booleanToResponseByteBuffer) - .doAfterTerminate(() -> { - assert key.refCnt() > 0; - }); - case PREVIOUS_VALUE: - return Mono - .fromCallable(() -> { - StampedLock lock; - long stamp; - if (updateMode == UpdateMode.ALLOW) { - lock = itemsLock.getAt(getLockIndex(key)); + .defer(() -> switch (resultType) { + case PREVIOUS_VALUE_EXISTENCE -> this + .containsKey(null, key.retain()) + .single() + .map(LLUtils::booleanToResponseByteBuffer) + .doAfterTerminate(() -> { + assert key.refCnt() > 0; + }); + case PREVIOUS_VALUE -> Mono + .fromCallable(() -> { + StampedLock lock; + long stamp; + if (updateMode == UpdateMode.ALLOW) { + lock = itemsLock.getAt(getLockIndex(key)); - stamp = lock.readLock(); + stamp = lock.readLock(); + } else { + lock = null; + stamp = 0; + } + try { + if (logger.isTraceEnabled()) { + logger.trace("Reading {}", LLUtils.toArray(key)); + } + var data = new Holder(); + if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) { + if (data.getValue() != null) { + return wrappedBuffer(data.getValue()); } else { - lock = null; - stamp = 0; - } - try { - if (logger.isTraceEnabled()) { - logger.trace("Reading {}", LLUtils.toArray(key)); - } - var data = new Holder(); - if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) { - if (data.getValue() != null) { - return wrappedBuffer(data.getValue()); - } else { - try { - return dbGet(cfh, null, key.retain(), true); - } finally { - assert key.refCnt() > 0; - } - } - } else { - return null; - } - } finally { - if (updateMode == UpdateMode.ALLOW) { - lock.unlockRead(stamp); + try { + return dbGet(cfh, null, key.retain(), true); + } finally { + assert key.refCnt() > 0; } } - }) - .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) - .subscribeOn(dbScheduler); - case VOID: - return Mono.empty(); - default: - return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); - } + } else { + return null; + } + } finally { + if (updateMode == UpdateMode.ALLOW) { + lock.unlockRead(stamp); + } + } + }) + .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)) + .subscribeOn(dbScheduler); + case VOID -> Mono.empty(); }) .doFirst(key::retain) .doAfterTerminate(key::release); @@ -932,80 +942,92 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Flux> getMulti(@Nullable LLSnapshot snapshot, - Flux keys, + public Flux> getMulti(@Nullable LLSnapshot snapshot, + Flux> keys, boolean existsAlmostCertainly) { return keys - .window(MULTI_GET_WINDOW) - .flatMap(keysWindowFlux -> keysWindowFlux - .collectList() - .doOnDiscard(Entry.class, discardedEntry -> { - //noinspection unchecked - var entry = (Entry) discardedEntry; - entry.getKey().release(); - entry.getValue().release(); - }) - .flatMapMany(keysWindow -> Mono - .fromCallable(() -> { - Iterable locks; - ArrayList stamps; - if (updateMode == UpdateMode.ALLOW) { - locks = itemsLock.bulkGetAt(getLockIndices(keysWindow)); - stamps = new ArrayList<>(); - for (var lock : locks) { - - stamps.add(lock.readLock()); - } - } else { - locks = null; - stamps = null; - } - try { - var handlesArray = new ColumnFamilyHandle[keysWindow.size()]; - Arrays.fill(handlesArray, cfh); - var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length); - var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, LLUtils.toArray(keysWindow)); - var mappedResults = new ArrayList>(results.size()); - for (int i = 0; i < results.size(); i++) { - var val = results.get(i); - if (val != null) { - results.set(i, null); - mappedResults.add(Map.entry(keysWindow.get(i).retain(), wrappedBuffer(val))); - } - } - return mappedResults; - } finally { + .bufferTimeout(MULTI_GET_WINDOW, MULTI_GET_WINDOW_TIMEOUT) + .doOnDiscard(Tuple2.class, discardedEntry -> { + //noinspection unchecked + var entry = (Tuple2) discardedEntry; + entry.getT2().release(); + }) + .doOnDiscard(Tuple3.class, discardedEntry -> { + //noinspection unchecked + var entry = (Tuple3) discardedEntry; + entry.getT2().release(); + entry.getT3().release(); + }) + .flatMapSequential(keysWindow -> { + List keyBufsWindow = new ArrayList<>(keysWindow.size()); + for (Tuple2 objects : keysWindow) { + keyBufsWindow.add(objects.getT2()); + } + return Mono + .fromCallable(() -> { + Iterable locks; + ArrayList stamps; if (updateMode == UpdateMode.ALLOW) { - int index = 0; + locks = itemsLock.bulkGetAt(getLockIndices(keyBufsWindow)); + stamps = new ArrayList<>(); for (var lock : locks) { - lock.unlockRead(stamps.get(index)); - index++; + + stamps.add(lock.readLock()); + } + } else { + locks = null; + stamps = null; + } + try { + var columnFamilyHandles = new RepeatedElementList<>(cfh, keysWindow.size()); + var results = db.multiGetAsList(resolveSnapshot(snapshot), columnFamilyHandles, LLUtils.toArray(keyBufsWindow)); + var mappedResults = new ArrayList>(results.size()); + for (int i = 0; i < results.size(); i++) { + var val = results.get(i); + if (val != null) { + results.set(i, null); + mappedResults.add(Tuples.of(keysWindow.get(i).getT1(), + keyBufsWindow.get(i).retain(), + wrappedBuffer(val) + )); + } + } + return mappedResults; + } finally { + if (updateMode == UpdateMode.ALLOW) { + int index = 0; + for (var lock : locks) { + lock.unlockRead(stamps.get(index)); + index++; + } } } - } - }) - .subscribeOn(dbScheduler) - .flatMapMany(Flux::fromIterable) - .onErrorMap(cause -> new IOException("Failed to read keys " - + Arrays.deepToString(keysWindow.toArray(ByteBuf[]::new)), cause)) - .doAfterTerminate(() -> keysWindow.forEach(ReferenceCounted::release)) - ) - ) + }) + .subscribeOn(dbScheduler) + .flatMapMany(Flux::fromIterable) + .onErrorMap(cause -> new IOException("Failed to read keys " + + Arrays.deepToString(keyBufsWindow.toArray(ByteBuf[]::new)), cause)) + .doAfterTerminate(() -> keyBufsWindow.forEach(ReferenceCounted::release)); + }, 2) // Max concurrency is 2 to read data while preparing the next segment .doOnDiscard(Entry.class, discardedEntry -> { //noinspection unchecked var entry = (Entry) discardedEntry; entry.getKey().release(); entry.getValue().release(); + }) + .doOnDiscard(Tuple3.class, discardedEntry -> { + //noinspection unchecked + var entry = (Tuple3) discardedEntry; + entry.getT2().release(); + entry.getT3().release(); }); } @Override public Flux> putMulti(Flux> entries, boolean getOldValues) { return entries - .bufferTime(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) - .flatMap(Flux::collectList) - .map(Collections::unmodifiableList) - .flatMap(ew -> Mono + .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) + .flatMapSequential(ew -> Mono .using( () -> ew, entriesWindow -> Mono @@ -1054,13 +1076,15 @@ public class LLLocalDictionary implements LLDictionary { .subscribeOn(dbScheduler) // Prepend everything to get previous elements - .transformDeferred(transformer -> { + .transform(transformer -> { + var obj = new Object(); if (getOldValues) { return this .getMulti(null, Flux .fromIterable(entriesWindow) .map(Entry::getKey) - .map(ByteBuf::retain), false) + .map(ByteBuf::retain) + .map(buf -> Tuples.of(obj, buf)), false) .publishOn(dbScheduler) .then(transformer); } else { @@ -1073,8 +1097,7 @@ public class LLLocalDictionary implements LLDictionary { entry.getValue().release(); } } - ) - ) + ), 2) // Max concurrency is 2 to read data while preparing the next segment .doOnDiscard(Entry.class, entry -> { if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) { //noinspection unchecked @@ -1093,6 +1116,152 @@ public class LLLocalDictionary implements LLDictionary { }); } + @Override + public Flux> updateMulti(Flux> entries, + BiFunction updateFunction) { + return entries + .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) + .flatMapSequential(ew -> Flux + .using( + () -> ew, + entriesWindow -> { + List keyBufsWindow = new ArrayList<>(entriesWindow.size()); + for (Tuple2 objects : entriesWindow) { + keyBufsWindow.add(objects.getT1()); + } + return Mono + .>>fromCallable(() -> { + Iterable locks; + ArrayList stamps; + if (updateMode == UpdateMode.ALLOW) { + locks = itemsLock.bulkGetAt(getLockIndicesWithExtra(entriesWindow)); + stamps = new ArrayList<>(); + for (var lock : locks) { + stamps.add(lock.writeLock()); + } + } else { + locks = null; + stamps = null; + } + try { + var columnFamilyHandles = new RepeatedElementList<>(cfh, entriesWindow.size()); + ArrayList>> mappedInputs; + { + var inputs = db.multiGetAsList(resolveSnapshot(null), columnFamilyHandles, LLUtils.toArray(keyBufsWindow)); + mappedInputs = new ArrayList<>(inputs.size()); + for (int i = 0; i < inputs.size(); i++) { + var val = inputs.get(i); + if (val != null) { + inputs.set(i, null); + mappedInputs.add(Tuples.of( + keyBufsWindow.get(i).retain(), + entriesWindow.get(i).getT2(), + Optional.of(wrappedBuffer(val)) + )); + } else { + mappedInputs.add(Tuples.of( + keyBufsWindow.get(i).retain(), + entriesWindow.get(i).getT2(), + Optional.empty() + )); + } + } + } + var updatedValuesToWrite = new ArrayList(mappedInputs.size()); + var valueChangedResult = new ArrayList>(mappedInputs.size()); + try { + for (var mappedInput : mappedInputs) { + var updatedValue = updateFunction.apply(mappedInput.getT1().retain(), mappedInput.getT2()); + valueChangedResult.add(new ExtraKeyOperationResult<>(mappedInput.getT1(), + mappedInput.getT2(), + !Objects.equals(mappedInput.getT3().orElse(null), updatedValue.retain()) + )); + updatedValuesToWrite.add(updatedValue); + } + } finally { + for (var mappedInput : mappedInputs) { + mappedInput.getT3().ifPresent(ReferenceCounted::release); + } + } + + if (USE_WRITE_BATCHES_IN_PUT_MULTI) { + var batch = new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + ); + int i = 0; + for (Tuple2 entry : entriesWindow) { + var valueToWrite = updatedValuesToWrite.get(i); + if (valueToWrite == null) { + batch.delete(cfh, entry.getT1().retain()); + } else { + batch.put(cfh, entry.getT1().retain(), valueToWrite.retain()); + } + i++; + } + batch.writeToDbAndClose(); + batch.close(); + } else { + int i = 0; + for (Tuple2 entry : entriesWindow) { + var valueToWrite = updatedValuesToWrite.get(i); + db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getT1().nioBuffer(), valueToWrite.nioBuffer()); + i++; + } + } + return valueChangedResult; + } finally { + if (updateMode == UpdateMode.ALLOW) { + int index = 0; + for (var lock : locks) { + lock.unlockWrite(stamps.get(index)); + index++; + } + } + } + }) + .subscribeOn(dbScheduler) + .flatMapMany(Flux::fromIterable); + }, + entriesWindow -> { + for (Tuple2 entry : entriesWindow) { + entry.getT1().release(); + } + } + ), 2 // Max concurrency is 2 to update data while preparing the next segment + ) + .doOnDiscard(Tuple2.class, entry -> { + if (entry.getT1() instanceof ByteBuf bb) { + bb.release(); + } + if (entry.getT2() instanceof ByteBuf bb) { + bb.release(); + } + }) + .doOnDiscard(ExtraKeyOperationResult.class, entry -> { + if (entry.key() instanceof ByteBuf bb) { + bb.release(); + } + if (entry.extra() instanceof ByteBuf bb) { + bb.release(); + } + }) + .doOnDiscard(Collection.class, obj -> { + //noinspection unchecked + var castedEntries = (Collection>) obj; + for (var entry : castedEntries) { + if (entry.key() instanceof ByteBuf bb) { + bb.release(); + } + if (entry.extra() instanceof ByteBuf bb) { + bb.release(); + } + } + }); + } + @Override public Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range, @@ -1811,7 +1980,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .onErrorMap(cause -> new IOException("Failed to get size of range " - + range.toString(), cause)) + + range, cause)) .subscribeOn(dbScheduler); } }) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index bfc4afb..98c2a64 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.time.StopWatch; import org.jetbrains.annotations.Nullable; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; +import org.rocksdb.ClockCache; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactRangeOptions; @@ -38,6 +39,7 @@ import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; import org.rocksdb.DbPath; import org.rocksdb.FlushOptions; +import org.rocksdb.IndexType; import org.rocksdb.LRUCache; import org.rocksdb.MemoryUtil; import org.rocksdb.Options; @@ -315,11 +317,25 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setMaxTotalWalSize(0) // automatic ; tableOptions - .setBlockCache(new LRUCache(8L * 1024L * 1024L)) // 8MiB - .setCacheIndexAndFilterBlocks(false) - .setPinL0FilterAndIndexBlocksInCache(false) + .setIndexType(IndexType.kTwoLevelIndexSearch) + .setPartitionFilters(true) + .setMetadataBlockSize(4096) + .setBlockCache(new ClockCache(8L * 1024L * 1024L)) // 8MiB + .setCacheIndexAndFilterBlocks(true) + .setCacheIndexAndFilterBlocksWithHighPriority(true) + .setPinL0FilterAndIndexBlocksInCache(true) ; - options.setWriteBufferManager(new WriteBufferManager(8L * 1024L * 1024L, new LRUCache(8L * 1024L * 1024L))); // 8MiB + options.setWriteBufferManager(new WriteBufferManager(8L * 1024L * 1024L, new ClockCache(8L * 1024L * 1024L))); // 8MiB + + if (databaseOptions.useDirectIO()) { + options + // Option to enable readahead in compaction + // If not set, it will be set to 2MB internally + .setCompactionReadaheadSize(2 * 1024 * 1024) // recommend at least 2MB + // Option to tune write buffer for direct writes + .setWritableFileMaxBufferSize(1024 * 1024) + ; + } } else { // HIGH MEMORY options @@ -329,7 +345,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setIncreaseParallelism(Runtime.getRuntime().availableProcessors()) .setBytesPerSync(1 * 1024 * 1024) // 1MiB .setWalBytesPerSync(10 * 1024 * 1024) - .setMaxOpenFiles(150) + .setMaxOpenFiles(30) .optimizeLevelStyleCompaction( 128 * 1024 * 1024) // 128MiB of ram will be used for level style compaction .setWriteBufferSize(64 * 1024 * 1024) // 64MB @@ -338,33 +354,44 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setMaxTotalWalSize(2L * 1024L * 1024L * 1024L) // 2GiB max wal directory size ; tableOptions - .setBlockCache(new LRUCache(128L * 1024L * 1024L)) // 128MiB + .setIndexType(IndexType.kTwoLevelIndexSearch) + .setPartitionFilters(true) + .setMetadataBlockSize(4096) + .setBlockCache(new ClockCache(512L * 1024L * 1024L)) // 512MiB .setCacheIndexAndFilterBlocks(true) + .setCacheIndexAndFilterBlocksWithHighPriority(true) .setPinL0FilterAndIndexBlocksInCache(true) ; final BloomFilter bloomFilter = new BloomFilter(10, false); tableOptions.setOptimizeFiltersForMemory(true); tableOptions.setFilterPolicy(bloomFilter); - options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(128L * 1024L * 1024L))); // 128MiB + options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new ClockCache(128L * 1024L * 1024L))); // 128MiB if (databaseOptions.useDirectIO()) { options - .setAllowMmapReads(false) - .setAllowMmapWrites(false) - .setUseDirectIoForFlushAndCompaction(true) - .setUseDirectReads(true) // Option to enable readahead in compaction // If not set, it will be set to 2MB internally - .setCompactionReadaheadSize(2 * 1024 * 1024) // recommend at least 2MB + .setCompactionReadaheadSize(4 * 1024 * 1024) // recommend at least 2MB // Option to tune write buffer for direct writes - .setWritableFileMaxBufferSize(1024 * 1024) + .setWritableFileMaxBufferSize(4 * 1024 * 1024) ; - } else { - options - .setAllowMmapReads(databaseOptions.allowMemoryMapping()) - .setAllowMmapWrites(databaseOptions.allowMemoryMapping()); } } + if (databaseOptions.useDirectIO()) { + options + .setAllowMmapReads(false) + .setAllowMmapWrites(false) + .setUseDirectReads(true) + ; + } else { + options + .setAllowMmapReads(databaseOptions.allowMemoryMapping()) + .setAllowMmapWrites(databaseOptions.allowMemoryMapping()); + } + + if (!databaseOptions.allowMemoryMapping()) { + options.setUseDirectIoForFlushAndCompaction(true); + } tableOptions.setBlockSize(16 * 1024); // 16MiB options.setTableFormatConfig(tableOptions); diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index 9a0095c..7c458b0 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -5,6 +5,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.database.Delta; +import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLRange; @@ -18,10 +19,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; public class LLMemoryDictionary implements LLDictionary { @@ -188,20 +193,20 @@ public class LLMemoryDictionary implements LLDictionary { } @Override - public Flux> getMulti(@Nullable LLSnapshot snapshot, - Flux keys, + public Flux> getMulti(@Nullable LLSnapshot snapshot, + Flux> keys, boolean existsAlmostCertainly) { return keys .handle((key, sink) -> { try { - var v = snapshots.get(resolveSnapshot(snapshot)).get(k(key)); + var v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.getT2())); if (v == null) { sink.complete(); } else { - sink.next(Map.entry(key.retain(), kk(v))); + sink.next(Tuples.of(key.getT1(), key.getT2().retain(), kk(v))); } } finally { - key.release(); + key.getT2().release(); } }); } @@ -226,6 +231,12 @@ public class LLMemoryDictionary implements LLDictionary { }); } + @Override + public Flux> updateMulti(Flux> entries, + BiFunction updateFunction) { + return Flux.error(new UnsupportedOperationException("Not implemented")); + } + @Override public Flux> getRange(@Nullable LLSnapshot snapshot, LLRange range, diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 7dd0fd4..8c0c9cc 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -8,7 +8,7 @@ import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; -import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; +import it.cavallium.dbengine.database.collections.ValueGetter; import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer; import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;