Optimize performance

This commit is contained in:
Andrea Cavalli 2021-07-17 11:52:08 +02:00
parent e4bbeeca3a
commit 43439c6f10
20 changed files with 800 additions and 274 deletions

View File

@ -4,13 +4,16 @@ import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLSnapshottable;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueTransformer;
import java.util.Map.Entry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
@SuppressWarnings("unused")
public interface LuceneIndex<T, U> extends LLSnapshottable {
@ -34,8 +37,12 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
}
default Mono<Void> updateOrDeleteDocumentIfModified(T key, @NotNull Delta<U> delta) {
if (delta.isModified()) {
return updateOrDeleteDocument(key, delta.current());
return updateOrDeleteDocumentIfModified(key, delta.current(), delta.isModified());
}
default Mono<Void> updateOrDeleteDocumentIfModified(T key, @Nullable U currentValue, boolean modified) {
if (modified) {
return updateOrDeleteDocument(key, currentValue);
} else {
return Mono.empty();
}
@ -43,17 +50,34 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
Mono<Void> deleteAll();
<V> Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams, T key, U mltDocumentValue);
Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams, T key, U mltDocumentValue);
<V> Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
default Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueGetter<T, U> valueGetter);
ValueGetter<T, U> valueGetter) {
return this.moreLikeThisWithTransformer(queryParams,
key,
mltDocumentValue,
getValueGetterTransformer(valueGetter));
}
<V> Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams);
Mono<SearchResult<T, U>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueTransformer<T, U> valueTransformer);
<V> Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueGetter<T, U> valueGetter);
Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams);
default Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueGetter<T, U> valueGetter) {
return this.searchWithTransformer(queryParams,
getValueGetterTransformer(valueGetter)
);
}
Mono<SearchResult<T, U>> searchWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueTransformer<T, U> valueTransformer);
Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query);
@ -64,4 +88,15 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
Mono<Void> flush();
Mono<Void> refresh();
private static <T, U> ValueTransformer<T, U> getValueGetterTransformer(ValueGetter<T, U> valueGetter) {
return new ValueTransformer<T, U>() {
@Override
public <X> Flux<Tuple3<X, T, U>> transform(Flux<Tuple2<X, T>> keys) {
return keys.flatMapSequential(key -> valueGetter
.get(key.getT2())
.map(result -> Tuples.of(key.getT1(), key.getT2(), result)));
}
};
}
}

View File

@ -2,15 +2,13 @@ package it.cavallium.dbengine.client;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueTransformer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@ -19,6 +17,7 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@ -84,7 +83,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return luceneIndex.deleteAll();
}
private Mono<SearchResultKeys<T>> transformLuceneResult(LLSearchResultShard llSearchResult) {
private Mono<SearchResultKeys<T>> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult) {
return Mono.just(new SearchResultKeys<>(llSearchResult.results()
.map(signal -> new SearchResultKey<>(signal.key().map(indicizer::getKey), signal.score())),
llSearchResult.totalHitsCount(),
@ -92,7 +91,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
));
}
private <V> Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResultShard llSearchResult,
private Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResultShard llSearchResult,
ValueGetter<T, U> valueGetter) {
return Mono.just(new SearchResult<>(llSearchResult.results().map(signal -> {
var key = signal.key().map(indicizer::getKey);
@ -100,35 +99,34 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
}), llSearchResult.totalHitsCount(), llSearchResult.release()));
}
/**
*
* @param queryParams the limit is valid for each lucene instance.
* If you have 15 instances, the number of elements returned
* can be at most <code>limit * 15</code>
* @return the collection has one or more flux
*/
private Mono<SearchResult<T, U>> transformLuceneResultWithTransformer(LLSearchResultShard llSearchResult,
ValueTransformer<T, U> valueTransformer) {
var scoresWithKeysFlux = llSearchResult
.results()
.flatMapSequential(signal -> signal.key().map(indicizer::getKey).map(key -> Tuples.of(signal.score(), key)));
var resultItemsFlux = valueTransformer
.transform(scoresWithKeysFlux)
.map(tuple3 -> new SearchResultItem<>(Mono.just(tuple3.getT2()),
Mono.just(tuple3.getT3()),
tuple3.getT1()
));
return Mono.just(new SearchResult<>(resultItemsFlux, llSearchResult.totalHitsCount(), llSearchResult.release()));
}
@Override
public <V> Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams,
public Mono<SearchResultKeys<T>> moreLikeThis(ClientQueryParams<SearchResultKey<T>> queryParams,
T key,
U mltDocumentValue) {
Flux<Tuple2<String, Set<String>>> mltDocumentFields
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
return luceneIndex
.moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields)
.flatMap(this::transformLuceneResult);
.flatMap(this::transformLuceneResultWithTransformer);
}
/**
*
* @param queryParams the limit is valid for each lucene instance.
* If you have 15 instances, the number of elements returned
* can be at most <code>limit * 15</code>
* @return the collection has one or more flux
*/
@Override
public <V> Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public Mono<SearchResult<T, U>> moreLikeThisWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueGetter<T, U> valueGetter) {
@ -145,41 +143,52 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
));
}
/**
*
* @param queryParams the limit is valid for each lucene instance.
* If you have 15 instances, the number of elements returned
* can be at most <code>limit * 15</code>
* @return the collection has one or more flux
*/
@Override
public <V> Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams) {
public Mono<SearchResult<T, U>> moreLikeThisWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
T key,
U mltDocumentValue,
ValueTransformer<T, U> valueTransformer) {
Flux<Tuple2<String, Set<String>>> mltDocumentFields
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
return luceneIndex
.moreLikeThis(resolveSnapshot(queryParams.snapshot()),
queryParams.toQueryParams(),
indicizer.getKeyFieldName(),
mltDocumentFields
)
.flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer));
}
@Override
public Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()),
queryParams.toQueryParams(),
indicizer.getKeyFieldName()
)
.flatMap(this::transformLuceneResult);
.flatMap(this::transformLuceneResultWithTransformer);
}
/**
*
* @param queryParams the limit is valid for each lucene instance.
* If you have 15 instances, the number of elements returned
* can be at most <code>limit * 15</code>
* @return the collection has one or more flux
*/
@Override
public <V> Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
public Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueGetter<T, U> valueGetter) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, valueGetter));
}
@Override
public Mono<SearchResult<T, U>> searchWithTransformer(ClientQueryParams<SearchResultItem<T, U>> queryParams,
ValueTransformer<T, U> valueTransformer) {
return luceneIndex
.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName())
.flatMap(llSearchResult -> this.transformLuceneResultWithTransformer(llSearchResult, valueTransformer));
}
@Override
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
return this.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
return this
.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
.flatMap(tSearchResultKeys -> tSearchResultKeys.release().thenReturn(tSearchResultKeys.totalHitsCount()));
}

View File

@ -1,6 +1,6 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueGetter;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database;
public record ExtraKeyOperationResult<T, X>(T key, X extra, boolean changed) {}

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database;
public record KeyOperationResult<T>(T key, boolean changed) {}

View File

@ -5,12 +5,14 @@ import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.BadBlock;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
@SuppressWarnings("unused")
@NotAtomic
@ -58,14 +60,19 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType);
Flux<Entry<ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot, Flux<ByteBuf> keys, boolean existsAlmostCertainly);
<K> Flux<Tuple3<K, ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Tuple2<K, ByteBuf>> keys,
boolean existsAlmostCertainly);
default Flux<Entry<ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot, Flux<ByteBuf> keys) {
default <K> Flux<Tuple3<K, ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot, Flux<Tuple2<K, ByteBuf>> keys) {
return getMulti(snapshot, keys, false);
}
Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues);
<X> Flux<ExtraKeyOperationResult<ByteBuf, X>> updateMulti(Flux<Tuple2<ByteBuf, X>> entries,
BiFunction<ByteBuf, X, ByteBuf> updateFunction);
Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, LLRange range, boolean existsAlmostCertainly);
default Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {

View File

@ -0,0 +1,211 @@
package it.cavallium.dbengine.database;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
@SuppressWarnings("ClassCanBeRecord")
public class RepeatedElementList<T> implements List<T> {
private final T element;
private final int size;
public RepeatedElementList(T element, int size) {
this.element = element;
this.size = size;
}
private UnsupportedOperationException uoe() {
return new UnsupportedOperationException();
}
@Override
public int size() {
return size;
}
@Override
public boolean isEmpty() {
return size == 0;
}
@Override
public boolean contains(Object o) {
return Objects.equals(element, o);
}
@NotNull
@Override
public Iterator<T> iterator() {
return this.listIterator();
}
@NotNull
@Override
public Object[] toArray() {
var arr = new Object[size];
Arrays.fill(arr, element);
return arr;
}
@NotNull
@Override
public <T1> T1[] toArray(@NotNull T1[] a) {
var arr = Arrays.copyOf(a, size);
Arrays.fill(arr, element);
return arr;
}
@Override
public boolean add(T t) {
throw uoe();
}
@Override
public boolean remove(Object o) {
throw uoe();
}
@Override
public boolean containsAll(@NotNull Collection<?> c) {
for (Object o : c) {
if (!contains(o)) {
return false;
}
}
return true;
}
@Override
public boolean addAll(@NotNull Collection<? extends T> c) {
throw uoe();
}
@Override
public boolean addAll(int index, @NotNull Collection<? extends T> c) {
throw uoe();
}
@Override
public boolean removeAll(@NotNull Collection<?> c) {
throw uoe();
}
@Override
public boolean retainAll(@NotNull Collection<?> c) {
throw uoe();
}
@Override
public void clear() {
throw uoe();
}
@Override
public T get(int i) {
if (i >= 0 && i < size) {
return element;
} else {
throw new IndexOutOfBoundsException(i);
}
}
@Override
public T set(int index, T element) {
throw uoe();
}
@Override
public void add(int index, T element) {
throw uoe();
}
@Override
public T remove(int index) {
throw uoe();
}
@Override
public int indexOf(Object o) {
if (contains(o)) {
return 0;
} else {
return -1;
}
}
@Override
public int lastIndexOf(Object o) {
return indexOf(o);
}
@NotNull
@Override
public ListIterator<T> listIterator() {
return listIterator(0);
}
@NotNull
@Override
public ListIterator<T> listIterator(int index) {
return new ListIterator<T>() {
int position = index - 1;
@Override
public boolean hasNext() {
return position + 1 < size;
}
@Override
public T next() {
position++;
return get(position);
}
@Override
public boolean hasPrevious() {
return position - 1 >= 0;
}
@Override
public T previous() {
position--;
return get(position);
}
@Override
public int nextIndex() {
return position + 1;
}
@Override
public int previousIndex() {
return position - 1;
}
@Override
public void remove() {
throw uoe();
}
@Override
public void set(T t) {
throw uoe();
}
@Override
public void add(T t) {
throw uoe();
}
};
}
@NotNull
@Override
public List<T> subList(int fromIndex, int toIndex) {
return new RepeatedElementList<>(element, toIndex - fromIndex);
}
}

View File

@ -2,9 +2,10 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.KeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLUtils;
@ -17,10 +18,13 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
/**
* Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle"
@ -173,20 +177,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.update(keyBuf.retain(), oldSerialized -> {
try {
var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()));
if (result == null) {
return null;
} else {
return this.serialize(result);
}
} finally {
if (oldSerialized != null) {
oldSerialized.release();
}
}
}, updateReturnMode, existsAlmostCertainly)
.update(keyBuf.retain(), getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.map(this::deserialize),
ReferenceCounted::release
);
@ -200,25 +191,46 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.updateAndGetDelta(keyBuf.retain(), oldSerialized -> {
try {
var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()));
if (result == null) {
return null;
} else {
return this.serialize(result);
}
} finally {
if (oldSerialized != null) {
oldSerialized.release();
}
}
}, existsAlmostCertainly)
.updateAndGetDelta(keyBuf.retain(), getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapDelta(mono, this::deserialize)),
ReferenceCounted::release
);
}
public Function<@Nullable ByteBuf, @Nullable ByteBuf> getSerializedUpdater(Function<@Nullable U, @Nullable U> updater) {
return oldSerialized -> {
try {
var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()));
if (result == null) {
return null;
} else {
return this.serialize(result);
}
} finally {
if (oldSerialized != null) {
oldSerialized.release();
}
}
};
}
public <X> BiFunction<@Nullable ByteBuf, X, @Nullable ByteBuf> getSerializedUpdater(BiFunction<@Nullable U, X, @Nullable U> updater) {
return (oldSerialized, extra) -> {
try {
var result = updater.apply(oldSerialized == null ? null : this.deserialize(oldSerialized.retain()), extra);
if (result == null) {
return null;
} else {
return this.serialize(result);
}
} finally {
if (oldSerialized != null) {
oldSerialized.release();
}
}
};
}
@Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
return Mono
@ -309,15 +321,17 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
try {
return toKey(keySuffixBuf.retain());
return Tuples.of(keySuffix, toKey(keySuffixBuf.retain()));
} finally {
keySuffixBuf.release();
}
})), existsAlmostCertainly)
)
.flatMap(entry -> Mono
.fromCallable(() -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey(), false)), deserialize(entry.getValue())))
);
.flatMapSequential(entry -> {
entry.getT2().release();
return Mono
.fromCallable(() -> Map.entry(entry.getT1(), deserialize(entry.getT3())));
});
}
private Entry<ByteBuf, ByteBuf> serializeEntry(T key, U value) {
@ -351,6 +365,26 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.then();
}
@Override
public <X> Flux<ExtraKeyOperationResult<T, X>> updateMulti(Flux<Tuple2<T, X>> entries,
BiFunction<@Nullable U, X, @Nullable U> updater) {
Flux<Tuple2<ByteBuf, X>> serializedEntries = entries
.flatMap(entry -> Mono
.fromCallable(() -> Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()))
.doOnDiscard(Entry.class, uncastedEntry -> {
//noinspection unchecked
var castedEntry = (Tuple2<ByteBuf, Object>) uncastedEntry;
castedEntry.getT1().release();
})
);
var serializedUpdater = getSerializedUpdater(updater);
return dictionary.updateMulti(serializedEntries, serializedUpdater)
.map(result -> new ExtraKeyOperationResult<>(deserializeSuffix(result.key()),
result.extra(),
result.changed()
));
}
@Override
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return Flux

View File

@ -19,10 +19,14 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
// todo: implement optimized methods
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {

View File

@ -7,8 +7,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.unimi.dsi.fastutil.objects.ObjectArraySet;
@ -21,11 +20,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
@SuppressWarnings("unused")
public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T, U, DatabaseStageEntry<U>> {

View File

@ -1,19 +1,19 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.ByteBuf;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.KeyOperationResult;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.JoinerBlocking.ValueGetterBlocking;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
@ -55,6 +55,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
);
}
default <X> Flux<ExtraKeyOperationResult<T, X>> updateMulti(Flux<Tuple2<T, X>> entries, BiFunction<@Nullable U, X, @Nullable U> updater) {
return entries
.flatMapSequential(entry -> this
.updateValue(entry.getT1(), prevValue -> updater.apply(prevValue, entry.getT2()))
.map(changed -> new ExtraKeyOperationResult<>(entry.getT1(), entry.getT2(), changed))
);
}
default Mono<U> updateValue(T key, UpdateReturnMode updateReturnMode, Function<@Nullable U, @Nullable U> updater) {
return updateValue(key, updateReturnMode, false, updater);
}
@ -254,10 +262,20 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
return k -> getValue(snapshot, k).block();
}
/**
* Value getter doesn't lock data. Please make sure to lock before getting data.
*/
default ValueGetter<T, U> getAsyncDbValueGetter(@Nullable CompositeSnapshot snapshot) {
return k -> getValue(snapshot, k);
}
default ValueTransformer<T, U> getAsyncDbValueTransformer(@Nullable CompositeSnapshot snapshot) {
return new ValueTransformer<>() {
@Override
public <X> Flux<Tuple3<X, T, U>> transform(Flux<Tuple2<X, T>> keys) {
return Flux.defer(() -> {
ConcurrentHashMap<T, X> extraValues = new ConcurrentHashMap<>();
return getMulti(snapshot, keys.doOnNext(key -> extraValues.put(key.getT2(), key.getT1())).map(Tuple2::getT2))
.map(result -> Tuples.of(extraValues.get(result.getKey()), result.getKey(), result.getValue()));
});
}
};
}
}

View File

@ -1,24 +0,0 @@
package it.cavallium.dbengine.database.collections;
import reactor.core.publisher.Mono;
public interface Joiner<KEY, DB_VALUE, JOINED_VALUE> {
interface ValueGetter<KEY, VALUE> {
/**
* Can return Mono error IOException
*/
Mono<VALUE> get(KEY key);
}
/**
* Warning! You must only join with immutable data to ensure data correctness.
* Good examples: message id, send date, ...
* Bad examples: message content, views, edited, ...
*
* Can return Mono error IOException
*/
Mono<JOINED_VALUE> join(ValueGetter<KEY, DB_VALUE> dbValueGetter, DB_VALUE value);
}

View File

@ -1,18 +0,0 @@
package it.cavallium.dbengine.database.collections;
import java.io.IOException;
public interface JoinerBlocking<KEY, DB_VALUE, JOINED_VALUE> {
interface ValueGetterBlocking<KEY, VALUE> {
VALUE get(KEY key) throws IOException;
}
/**
* Warning! You must only join with immutable data to ensure data correctness.
* Good examples: message id, send date, ...
* Bad examples: message content, views, edited, ...
*/
JOINED_VALUE join(ValueGetterBlocking<KEY, DB_VALUE> dbValueGetter, DB_VALUE value) throws IOException;
}

View File

@ -0,0 +1,11 @@
package it.cavallium.dbengine.database.collections;
import reactor.core.publisher.Mono;
public interface ValueGetter<KEY, VALUE> {
/**
* Can return Mono error IOException
*/
Mono<VALUE> get(KEY key);
}

View File

@ -0,0 +1,8 @@
package it.cavallium.dbengine.database.collections;
import java.io.IOException;
public interface ValueGetterBlocking<KEY, VALUE> {
VALUE get(KEY key) throws IOException;
}

View File

@ -0,0 +1,15 @@
package it.cavallium.dbengine.database.collections;
import java.util.Map.Entry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
public interface ValueTransformer<KEY, VALUE> {
/**
* Can return Flux error IOException
*/
<X> Flux<Tuple3<X, KEY, VALUE>> transform(Flux<Tuple2<X, KEY>> keys);
}

View File

@ -7,17 +7,22 @@ import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.KeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -26,10 +31,12 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -58,6 +65,7 @@ import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import static io.netty.buffer.Unpooled.*;
@ -71,6 +79,7 @@ public class LLLocalDictionary implements LLDictionary {
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final int MULTI_GET_WINDOW = 500;
static final Duration MULTI_GET_WINDOW_TIMEOUT = Duration.ofSeconds(1);
static final ReadOptions EMPTY_READ_OPTIONS = new UnmodifiableReadOptions();
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions();
@ -215,6 +224,14 @@ public class LLLocalDictionary implements LLDictionary {
return list;
}
private <X> IntArrayList getLockIndicesWithExtra(List<Tuple2<ByteBuf, X>> entries) {
var list = new IntArrayList(entries.size());
for (Tuple2<ByteBuf, X> key : entries) {
list.add(getLockIndex(key.getT1()));
}
return list;
}
@Override
public ByteBufAllocator getAllocator() {
return alloc;
@ -869,60 +886,53 @@ public class LLLocalDictionary implements LLDictionary {
private Mono<ByteBuf> getPreviousData(ByteBuf key, LLDictionaryResultType resultType) {
try {
return Mono
.defer(() -> {
switch (resultType) {
case PREVIOUS_VALUE_EXISTENCE:
return this
.containsKey(null, key.retain())
.single()
.map(LLUtils::booleanToResponseByteBuffer)
.doAfterTerminate(() -> {
assert key.refCnt() > 0;
});
case PREVIOUS_VALUE:
return Mono
.fromCallable(() -> {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
.defer(() -> switch (resultType) {
case PREVIOUS_VALUE_EXISTENCE -> this
.containsKey(null, key.retain())
.single()
.map(LLUtils::booleanToResponseByteBuffer)
.doAfterTerminate(() -> {
assert key.refCnt() > 0;
});
case PREVIOUS_VALUE -> Mono
.fromCallable(() -> {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.readLock();
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace("Reading {}", LLUtils.toArray(key));
}
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) {
if (data.getValue() != null) {
return wrappedBuffer(data.getValue());
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace("Reading {}", LLUtils.toArray(key));
}
var data = new Holder<byte[]>();
if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) {
if (data.getValue() != null) {
return wrappedBuffer(data.getValue());
} else {
try {
return dbGet(cfh, null, key.retain(), true);
} finally {
assert key.refCnt() > 0;
}
}
} else {
return null;
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
try {
return dbGet(cfh, null, key.retain(), true);
} finally {
assert key.refCnt() > 0;
}
}
})
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.subscribeOn(dbScheduler);
case VOID:
return Mono.empty();
default:
return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
}
} else {
return null;
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
}
})
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.subscribeOn(dbScheduler);
case VOID -> Mono.empty();
})
.doFirst(key::retain)
.doAfterTerminate(key::release);
@ -932,80 +942,92 @@ public class LLLocalDictionary implements LLDictionary {
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot,
Flux<ByteBuf> keys,
public <K> Flux<Tuple3<K, ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Tuple2<K, ByteBuf>> keys,
boolean existsAlmostCertainly) {
return keys
.window(MULTI_GET_WINDOW)
.flatMap(keysWindowFlux -> keysWindowFlux
.collectList()
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.flatMapMany(keysWindow -> Mono
.fromCallable(() -> {
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
locks = itemsLock.bulkGetAt(getLockIndices(keysWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.readLock());
}
} else {
locks = null;
stamps = null;
}
try {
var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
Arrays.fill(handlesArray, cfh);
var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length);
var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, LLUtils.toArray(keysWindow));
var mappedResults = new ArrayList<Entry<ByteBuf, ByteBuf>>(results.size());
for (int i = 0; i < results.size(); i++) {
var val = results.get(i);
if (val != null) {
results.set(i, null);
mappedResults.add(Map.entry(keysWindow.get(i).retain(), wrappedBuffer(val)));
}
}
return mappedResults;
} finally {
.bufferTimeout(MULTI_GET_WINDOW, MULTI_GET_WINDOW_TIMEOUT)
.doOnDiscard(Tuple2.class, discardedEntry -> {
//noinspection unchecked
var entry = (Tuple2<K, ByteBuf>) discardedEntry;
entry.getT2().release();
})
.doOnDiscard(Tuple3.class, discardedEntry -> {
//noinspection unchecked
var entry = (Tuple3<K, ByteBuf, ByteBuf>) discardedEntry;
entry.getT2().release();
entry.getT3().release();
})
.flatMapSequential(keysWindow -> {
List<ByteBuf> keyBufsWindow = new ArrayList<>(keysWindow.size());
for (Tuple2<K, ByteBuf> objects : keysWindow) {
keyBufsWindow.add(objects.getT2());
}
return Mono
.fromCallable(() -> {
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
locks = itemsLock.bulkGetAt(getLockIndices(keyBufsWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
lock.unlockRead(stamps.get(index));
index++;
stamps.add(lock.readLock());
}
} else {
locks = null;
stamps = null;
}
try {
var columnFamilyHandles = new RepeatedElementList<>(cfh, keysWindow.size());
var results = db.multiGetAsList(resolveSnapshot(snapshot), columnFamilyHandles, LLUtils.toArray(keyBufsWindow));
var mappedResults = new ArrayList<Tuple3<K, ByteBuf, ByteBuf>>(results.size());
for (int i = 0; i < results.size(); i++) {
var val = results.get(i);
if (val != null) {
results.set(i, null);
mappedResults.add(Tuples.of(keysWindow.get(i).getT1(),
keyBufsWindow.get(i).retain(),
wrappedBuffer(val)
));
}
}
return mappedResults;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockRead(stamps.get(index));
index++;
}
}
}
}
})
.subscribeOn(dbScheduler)
.flatMapMany(Flux::fromIterable)
.onErrorMap(cause -> new IOException("Failed to read keys "
+ Arrays.deepToString(keysWindow.toArray(ByteBuf[]::new)), cause))
.doAfterTerminate(() -> keysWindow.forEach(ReferenceCounted::release))
)
)
})
.subscribeOn(dbScheduler)
.flatMapMany(Flux::fromIterable)
.onErrorMap(cause -> new IOException("Failed to read keys "
+ Arrays.deepToString(keyBufsWindow.toArray(ByteBuf[]::new)), cause))
.doAfterTerminate(() -> keyBufsWindow.forEach(ReferenceCounted::release));
}, 2) // Max concurrency is 2 to read data while preparing the next segment
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.doOnDiscard(Tuple3.class, discardedEntry -> {
//noinspection unchecked
var entry = (Tuple3<K, ByteBuf, ByteBuf>) discardedEntry;
entry.getT2().release();
entry.getT3().release();
});
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues) {
return entries
.bufferTime(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMap(Flux::collectList)
.map(Collections::unmodifiableList)
.flatMap(ew -> Mono
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> Mono
.using(
() -> ew,
entriesWindow -> Mono
@ -1054,13 +1076,15 @@ public class LLLocalDictionary implements LLDictionary {
.subscribeOn(dbScheduler)
// Prepend everything to get previous elements
.transformDeferred(transformer -> {
.transform(transformer -> {
var obj = new Object();
if (getOldValues) {
return this
.getMulti(null, Flux
.fromIterable(entriesWindow)
.map(Entry::getKey)
.map(ByteBuf::retain), false)
.map(ByteBuf::retain)
.map(buf -> Tuples.of(obj, buf)), false)
.publishOn(dbScheduler)
.then(transformer);
} else {
@ -1073,8 +1097,7 @@ public class LLLocalDictionary implements LLDictionary {
entry.getValue().release();
}
}
)
)
), 2) // Max concurrency is 2 to read data while preparing the next segment
.doOnDiscard(Entry.class, entry -> {
if (entry.getKey() instanceof ByteBuf && entry.getValue() instanceof ByteBuf) {
//noinspection unchecked
@ -1093,6 +1116,152 @@ public class LLLocalDictionary implements LLDictionary {
});
}
@Override
public <X> Flux<ExtraKeyOperationResult<ByteBuf, X>> updateMulti(Flux<Tuple2<ByteBuf, X>> entries,
BiFunction<ByteBuf, X, ByteBuf> updateFunction) {
return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> Flux
.using(
() -> ew,
entriesWindow -> {
List<ByteBuf> keyBufsWindow = new ArrayList<>(entriesWindow.size());
for (Tuple2<ByteBuf, X> objects : entriesWindow) {
keyBufsWindow.add(objects.getT1());
}
return Mono
.<Iterable<ExtraKeyOperationResult<ByteBuf, X>>>fromCallable(() -> {
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
locks = itemsLock.bulkGetAt(getLockIndicesWithExtra(entriesWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.writeLock());
}
} else {
locks = null;
stamps = null;
}
try {
var columnFamilyHandles = new RepeatedElementList<>(cfh, entriesWindow.size());
ArrayList<Tuple3<ByteBuf, X, Optional<ByteBuf>>> mappedInputs;
{
var inputs = db.multiGetAsList(resolveSnapshot(null), columnFamilyHandles, LLUtils.toArray(keyBufsWindow));
mappedInputs = new ArrayList<>(inputs.size());
for (int i = 0; i < inputs.size(); i++) {
var val = inputs.get(i);
if (val != null) {
inputs.set(i, null);
mappedInputs.add(Tuples.of(
keyBufsWindow.get(i).retain(),
entriesWindow.get(i).getT2(),
Optional.of(wrappedBuffer(val))
));
} else {
mappedInputs.add(Tuples.of(
keyBufsWindow.get(i).retain(),
entriesWindow.get(i).getT2(),
Optional.empty()
));
}
}
}
var updatedValuesToWrite = new ArrayList<ByteBuf>(mappedInputs.size());
var valueChangedResult = new ArrayList<ExtraKeyOperationResult<ByteBuf, X>>(mappedInputs.size());
try {
for (var mappedInput : mappedInputs) {
var updatedValue = updateFunction.apply(mappedInput.getT1().retain(), mappedInput.getT2());
valueChangedResult.add(new ExtraKeyOperationResult<>(mappedInput.getT1(),
mappedInput.getT2(),
!Objects.equals(mappedInput.getT3().orElse(null), updatedValue.retain())
));
updatedValuesToWrite.add(updatedValue);
}
} finally {
for (var mappedInput : mappedInputs) {
mappedInput.getT3().ifPresent(ReferenceCounted::release);
}
}
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
);
int i = 0;
for (Tuple2<ByteBuf, X> entry : entriesWindow) {
var valueToWrite = updatedValuesToWrite.get(i);
if (valueToWrite == null) {
batch.delete(cfh, entry.getT1().retain());
} else {
batch.put(cfh, entry.getT1().retain(), valueToWrite.retain());
}
i++;
}
batch.writeToDbAndClose();
batch.close();
} else {
int i = 0;
for (Tuple2<ByteBuf, X> entry : entriesWindow) {
var valueToWrite = updatedValuesToWrite.get(i);
db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getT1().nioBuffer(), valueToWrite.nioBuffer());
i++;
}
}
return valueChangedResult;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockWrite(stamps.get(index));
index++;
}
}
}
})
.subscribeOn(dbScheduler)
.flatMapMany(Flux::fromIterable);
},
entriesWindow -> {
for (Tuple2<ByteBuf, X> entry : entriesWindow) {
entry.getT1().release();
}
}
), 2 // Max concurrency is 2 to update data while preparing the next segment
)
.doOnDiscard(Tuple2.class, entry -> {
if (entry.getT1() instanceof ByteBuf bb) {
bb.release();
}
if (entry.getT2() instanceof ByteBuf bb) {
bb.release();
}
})
.doOnDiscard(ExtraKeyOperationResult.class, entry -> {
if (entry.key() instanceof ByteBuf bb) {
bb.release();
}
if (entry.extra() instanceof ByteBuf bb) {
bb.release();
}
})
.doOnDiscard(Collection.class, obj -> {
//noinspection unchecked
var castedEntries = (Collection<ExtraKeyOperationResult<Object, Object>>) obj;
for (var entry : castedEntries) {
if (entry.key() instanceof ByteBuf bb) {
bb.release();
}
if (entry.extra() instanceof ByteBuf bb) {
bb.release();
}
}
});
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
LLRange range,
@ -1811,7 +1980,7 @@ public class LLLocalDictionary implements LLDictionary {
}
})
.onErrorMap(cause -> new IOException("Failed to get size of range "
+ range.toString(), cause))
+ range, cause))
.subscribeOn(dbScheduler);
}
})

View File

@ -29,6 +29,7 @@ import org.apache.commons.lang3.time.StopWatch;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ClockCache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
@ -38,6 +39,7 @@ import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.DbPath;
import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.LRUCache;
import org.rocksdb.MemoryUtil;
import org.rocksdb.Options;
@ -315,11 +317,25 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setMaxTotalWalSize(0) // automatic
;
tableOptions
.setBlockCache(new LRUCache(8L * 1024L * 1024L)) // 8MiB
.setCacheIndexAndFilterBlocks(false)
.setPinL0FilterAndIndexBlocksInCache(false)
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setMetadataBlockSize(4096)
.setBlockCache(new ClockCache(8L * 1024L * 1024L)) // 8MiB
.setCacheIndexAndFilterBlocks(true)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinL0FilterAndIndexBlocksInCache(true)
;
options.setWriteBufferManager(new WriteBufferManager(8L * 1024L * 1024L, new LRUCache(8L * 1024L * 1024L))); // 8MiB
options.setWriteBufferManager(new WriteBufferManager(8L * 1024L * 1024L, new ClockCache(8L * 1024L * 1024L))); // 8MiB
if (databaseOptions.useDirectIO()) {
options
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
.setCompactionReadaheadSize(2 * 1024 * 1024) // recommend at least 2MB
// Option to tune write buffer for direct writes
.setWritableFileMaxBufferSize(1024 * 1024)
;
}
} else {
// HIGH MEMORY
options
@ -329,7 +345,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setIncreaseParallelism(Runtime.getRuntime().availableProcessors())
.setBytesPerSync(1 * 1024 * 1024) // 1MiB
.setWalBytesPerSync(10 * 1024 * 1024)
.setMaxOpenFiles(150)
.setMaxOpenFiles(30)
.optimizeLevelStyleCompaction(
128 * 1024 * 1024) // 128MiB of ram will be used for level style compaction
.setWriteBufferSize(64 * 1024 * 1024) // 64MB
@ -338,33 +354,44 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setMaxTotalWalSize(2L * 1024L * 1024L * 1024L) // 2GiB max wal directory size
;
tableOptions
.setBlockCache(new LRUCache(128L * 1024L * 1024L)) // 128MiB
.setIndexType(IndexType.kTwoLevelIndexSearch)
.setPartitionFilters(true)
.setMetadataBlockSize(4096)
.setBlockCache(new ClockCache(512L * 1024L * 1024L)) // 512MiB
.setCacheIndexAndFilterBlocks(true)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setPinL0FilterAndIndexBlocksInCache(true)
;
final BloomFilter bloomFilter = new BloomFilter(10, false);
tableOptions.setOptimizeFiltersForMemory(true);
tableOptions.setFilterPolicy(bloomFilter);
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(128L * 1024L * 1024L))); // 128MiB
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new ClockCache(128L * 1024L * 1024L))); // 128MiB
if (databaseOptions.useDirectIO()) {
options
.setAllowMmapReads(false)
.setAllowMmapWrites(false)
.setUseDirectIoForFlushAndCompaction(true)
.setUseDirectReads(true)
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
.setCompactionReadaheadSize(2 * 1024 * 1024) // recommend at least 2MB
.setCompactionReadaheadSize(4 * 1024 * 1024) // recommend at least 2MB
// Option to tune write buffer for direct writes
.setWritableFileMaxBufferSize(1024 * 1024)
.setWritableFileMaxBufferSize(4 * 1024 * 1024)
;
} else {
options
.setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(databaseOptions.allowMemoryMapping());
}
}
if (databaseOptions.useDirectIO()) {
options
.setAllowMmapReads(false)
.setAllowMmapWrites(false)
.setUseDirectReads(true)
;
} else {
options
.setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(databaseOptions.allowMemoryMapping());
}
if (!databaseOptions.allowMemoryMapping()) {
options.setUseDirectIoForFlushAndCompaction(true);
}
tableOptions.setBlockSize(16 * 1024); // 16MiB
options.setTableFormatConfig(tableOptions);

View File

@ -5,6 +5,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@ -18,10 +19,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
public class LLMemoryDictionary implements LLDictionary {
@ -188,20 +193,20 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot,
Flux<ByteBuf> keys,
public <K> Flux<Tuple3<K, ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot,
Flux<Tuple2<K, ByteBuf>> keys,
boolean existsAlmostCertainly) {
return keys
.handle((key, sink) -> {
try {
var v = snapshots.get(resolveSnapshot(snapshot)).get(k(key));
var v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.getT2()));
if (v == null) {
sink.complete();
} else {
sink.next(Map.entry(key.retain(), kk(v)));
sink.next(Tuples.of(key.getT1(), key.getT2().retain(), kk(v)));
}
} finally {
key.release();
key.getT2().release();
}
});
}
@ -226,6 +231,12 @@ public class LLMemoryDictionary implements LLDictionary {
});
}
@Override
public <X> Flux<ExtraKeyOperationResult<ByteBuf, X>> updateMulti(Flux<Tuple2<ByteBuf, X>> entries,
BiFunction<ByteBuf, X, ByteBuf> updateFunction) {
return Flux.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
LLRange range,

View File

@ -8,7 +8,7 @@ import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import it.cavallium.dbengine.database.collections.ValueGetter;
import it.cavallium.dbengine.lucene.analyzer.NCharGramAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.NCharGramEdgeAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;