Update to java 16
This commit is contained in:
parent
211d2fc99e
commit
2d24436b93
19
pom.xml
19
pom.xml
|
@ -235,6 +235,12 @@
|
|||
<artifactId>data-generator</artifactId>
|
||||
<version>[0.9.26,)</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.soabase.record-builder</groupId>
|
||||
<artifactId>record-builder-core</artifactId>
|
||||
<version>1.19</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<testSourceDirectory>src/test/java</testSourceDirectory>
|
||||
|
@ -283,13 +289,24 @@
|
|||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>3.8.1</version>
|
||||
<configuration>
|
||||
<release>11</release>
|
||||
<release>16</release>
|
||||
<annotationProcessorPaths>
|
||||
<annotationProcessorPath>
|
||||
<groupId>io.soabase.record-builder</groupId>
|
||||
<artifactId>record-builder-processor</artifactId>
|
||||
<version>1.19</version>
|
||||
</annotationProcessorPath>
|
||||
</annotationProcessorPaths>
|
||||
<annotationProcessors>
|
||||
<annotationProcessor>io.soabase.recordbuilder.processor.RecordBuilderProcessor</annotationProcessor>
|
||||
</annotationProcessors>
|
||||
<useIncrementalCompilation>false</useIncrementalCompilation>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>it.cavallium</groupId>
|
||||
<artifactId>data-generator</artifactId>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>generate-lucene-query-sources</id>
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package it.cavallium.dbengine.client;
|
||||
|
||||
import it.cavallium.dbengine.client.query.ClientQueryParams;
|
||||
import it.cavallium.dbengine.client.query.ClientQueryParamsBuilder;
|
||||
import it.cavallium.dbengine.client.query.current.data.Query;
|
||||
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||
import it.cavallium.dbengine.database.LLLuceneIndex;
|
||||
|
@ -105,11 +106,11 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||
long offset,
|
||||
@Nullable Long limit) {
|
||||
Flux<SearchResultKeys<T>> mappedKeys = llSearchResult
|
||||
.getResults()
|
||||
.results()
|
||||
.map(flux -> new SearchResultKeys<>(flux
|
||||
.getResults()
|
||||
.results()
|
||||
.map(signal -> new SearchResultKey<>(indicizer.getKey(signal.getKey()), signal.getScore())),
|
||||
flux.getTotalHitsCount()
|
||||
flux.totalHitsCount()
|
||||
));
|
||||
MultiSort<SearchResultKey<T>> finalSort;
|
||||
if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) {
|
||||
|
@ -137,13 +138,13 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||
@Nullable Long limit,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
Flux<SearchResult<T, U>> mappedKeys = llSearchResult
|
||||
.getResults()
|
||||
.map(flux -> new SearchResult<>(flux.getResults().flatMapSequential(signal -> {
|
||||
.results()
|
||||
.map(flux -> new SearchResult<>(flux.results().flatMapSequential(signal -> {
|
||||
var key = indicizer.getKey(signal.getKey());
|
||||
return valueGetter
|
||||
.get(key)
|
||||
.map(value -> new SearchResultItem<>(key, value, signal.getScore()));
|
||||
}), flux.getTotalHitsCount()));
|
||||
}), flux.totalHitsCount()));
|
||||
MultiSort<SearchResultItem<T, U>> finalSort;
|
||||
if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) {
|
||||
finalSort = MultiSort.topScoreWithValues();
|
||||
|
@ -177,12 +178,12 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||
Flux<Tuple2<String, Set<String>>> mltDocumentFields
|
||||
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
|
||||
return luceneIndex
|
||||
.moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields)
|
||||
.moreLikeThis(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields)
|
||||
.flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult,
|
||||
queryParams.getSort(),
|
||||
queryParams.getScoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.getOffset()),
|
||||
queryParams.getLimit()
|
||||
queryParams.sort(),
|
||||
queryParams.scoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.offset()),
|
||||
queryParams.limit()
|
||||
));
|
||||
|
||||
}
|
||||
|
@ -203,16 +204,16 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||
Flux<Tuple2<String, Set<String>>> mltDocumentFields
|
||||
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
|
||||
return luceneIndex
|
||||
.moreLikeThis(resolveSnapshot(queryParams.getSnapshot()),
|
||||
.moreLikeThis(resolveSnapshot(queryParams.snapshot()),
|
||||
fixOffset(luceneIndex, queryParams.toQueryParams()),
|
||||
indicizer.getKeyFieldName(),
|
||||
mltDocumentFields
|
||||
)
|
||||
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
|
||||
queryParams.getSort(),
|
||||
queryParams.getScoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.getOffset()),
|
||||
queryParams.getLimit(),
|
||||
queryParams.sort(),
|
||||
queryParams.scoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.offset()),
|
||||
queryParams.limit(),
|
||||
valueGetter
|
||||
));
|
||||
}
|
||||
|
@ -227,15 +228,15 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||
@Override
|
||||
public Mono<SearchResultKeys<T>> search(ClientQueryParams<SearchResultKey<T>> queryParams) {
|
||||
return luceneIndex
|
||||
.search(resolveSnapshot(queryParams.getSnapshot()),
|
||||
.search(resolveSnapshot(queryParams.snapshot()),
|
||||
fixOffset(luceneIndex, queryParams.toQueryParams()),
|
||||
indicizer.getKeyFieldName()
|
||||
)
|
||||
.flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult,
|
||||
queryParams.getSort(),
|
||||
queryParams.getScoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.getOffset()),
|
||||
queryParams.getLimit()
|
||||
queryParams.sort(),
|
||||
queryParams.scoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.offset()),
|
||||
queryParams.limit()
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -250,12 +251,12 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||
public Mono<SearchResult<T, U>> searchWithValues(ClientQueryParams<SearchResultItem<T, U>> queryParams,
|
||||
ValueGetter<T, U> valueGetter) {
|
||||
return luceneIndex
|
||||
.search(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName())
|
||||
.search(resolveSnapshot(queryParams.snapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName())
|
||||
.flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult,
|
||||
queryParams.getSort(),
|
||||
queryParams.getScoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.getOffset()),
|
||||
queryParams.getLimit(),
|
||||
queryParams.sort(),
|
||||
queryParams.scoreMode(),
|
||||
fixTransformOffset(luceneIndex, queryParams.offset()),
|
||||
queryParams.limit(),
|
||||
valueGetter
|
||||
));
|
||||
}
|
||||
|
@ -263,7 +264,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
|||
@Override
|
||||
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
|
||||
return this.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
|
||||
.map(SearchResultKeys::getTotalHitsCount);
|
||||
.map(SearchResultKeys::totalHitsCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,13 +1,8 @@
|
|||
package it.cavallium.dbengine.client;
|
||||
|
||||
import lombok.Value;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@Value
|
||||
public class SearchResult<T, U> {
|
||||
|
||||
Flux<SearchResultItem<T, U>> results;
|
||||
long totalHitsCount;
|
||||
public record SearchResult<T, U>(Flux<SearchResultItem<T, U>> results, long totalHitsCount) {
|
||||
|
||||
public static <T, U> SearchResult<T, U> empty() {
|
||||
return new SearchResult<>(Flux.empty(), 0L);
|
||||
|
|
|
@ -1,15 +1,10 @@
|
|||
package it.cavallium.dbengine.client;
|
||||
|
||||
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
|
||||
import lombok.Value;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@Value
|
||||
public class SearchResultKeys<T> {
|
||||
|
||||
Flux<SearchResultKey<T>> results;
|
||||
long totalHitsCount;
|
||||
public record SearchResultKeys<T>(Flux<SearchResultKey<T>> results, long totalHitsCount) {
|
||||
|
||||
public static <T, U> SearchResultKeys<T> empty() {
|
||||
return new SearchResultKeys<>(Flux.empty(), 0L);
|
||||
|
|
|
@ -1,85 +1,56 @@
|
|||
package it.cavallium.dbengine.client.query;
|
||||
|
||||
import io.soabase.recordbuilder.core.RecordBuilder;
|
||||
import it.cavallium.data.generator.nativedata.Nullablefloat;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.client.MultiSort;
|
||||
import it.cavallium.dbengine.client.query.current.data.NoSort;
|
||||
import it.cavallium.dbengine.client.query.current.data.Query;
|
||||
import it.cavallium.dbengine.client.query.current.data.QueryParams;
|
||||
import it.cavallium.dbengine.client.query.current.data.QueryParamsBuilder;
|
||||
import it.cavallium.dbengine.client.query.current.data.ScoreMode;
|
||||
import it.cavallium.dbengine.database.LLScoreMode;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Builder.Default;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.NonNull;
|
||||
import lombok.ToString;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@EqualsAndHashCode
|
||||
@AllArgsConstructor(
|
||||
staticName = "of"
|
||||
)
|
||||
@Data
|
||||
@Builder
|
||||
@ToString
|
||||
public final class ClientQueryParams<T> {
|
||||
@RecordBuilder
|
||||
public final record ClientQueryParams<T>(@Nullable CompositeSnapshot snapshot,
|
||||
@NotNull Query query,
|
||||
long offset,
|
||||
long limit,
|
||||
@Nullable Float minCompetitiveScore,
|
||||
@Nullable MultiSort<T> sort,
|
||||
@NotNull LLScoreMode scoreMode) {
|
||||
|
||||
@Nullable
|
||||
@Default
|
||||
private CompositeSnapshot snapshot = null;
|
||||
|
||||
@NotNull
|
||||
@NonNull
|
||||
private Query query;
|
||||
|
||||
@Default
|
||||
private long offset = 0;
|
||||
|
||||
@Default
|
||||
private long limit = Long.MAX_VALUE;
|
||||
|
||||
@Nullable
|
||||
@Default
|
||||
private Float minCompetitiveScore = null;
|
||||
|
||||
@Nullable
|
||||
@Default
|
||||
private MultiSort<T> sort = null;
|
||||
|
||||
@NotNull
|
||||
@NonNull
|
||||
@Default
|
||||
private LLScoreMode scoreMode = LLScoreMode.COMPLETE;
|
||||
public static <T> ClientQueryParamsBuilder<T> builder() {
|
||||
return ClientQueryParamsBuilder
|
||||
.<T>builder()
|
||||
.snapshot(null)
|
||||
.offset(0)
|
||||
.limit(Long.MAX_VALUE)
|
||||
.minCompetitiveScore(null)
|
||||
.sort(null)
|
||||
.scoreMode(LLScoreMode.COMPLETE);
|
||||
}
|
||||
|
||||
public ScoreMode toScoreMode() {
|
||||
ScoreMode scoreMode;
|
||||
switch (getScoreMode()) {
|
||||
case COMPLETE:
|
||||
scoreMode = ScoreMode.of(false, true);
|
||||
break;
|
||||
case COMPLETE_NO_SCORES:
|
||||
scoreMode = ScoreMode.of(false, false);
|
||||
break;
|
||||
case TOP_SCORES:
|
||||
scoreMode = ScoreMode.of(true, true);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
return scoreMode;
|
||||
return switch (this.scoreMode()) {
|
||||
case COMPLETE -> ScoreMode.of(false, true);
|
||||
case COMPLETE_NO_SCORES -> ScoreMode.of(false, false);
|
||||
case TOP_SCORES -> ScoreMode.of(true, true);
|
||||
//noinspection UnnecessaryDefault
|
||||
default -> throw new IllegalArgumentException();
|
||||
};
|
||||
}
|
||||
|
||||
public QueryParams toQueryParams() {
|
||||
return QueryParams
|
||||
return QueryParamsBuilder
|
||||
.builder()
|
||||
.query(getQuery())
|
||||
.sort(getSort() != null ? getSort().getQuerySort() : NoSort.of())
|
||||
.minCompetitiveScore(Nullablefloat.ofNullable(getMinCompetitiveScore()))
|
||||
.offset(getOffset())
|
||||
.limit(getLimit())
|
||||
.query(query())
|
||||
.sort(sort() != null ? sort().getQuerySort() : NoSort.of())
|
||||
.minCompetitiveScore(Nullablefloat.ofNullable(minCompetitiveScore()))
|
||||
.offset(offset())
|
||||
.limit(limit())
|
||||
.scoreMode(toScoreMode())
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -1,10 +1,5 @@
|
|||
package it.cavallium.dbengine.database;
|
||||
|
||||
import lombok.Value;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
@Value(staticConstructor = "of")
|
||||
public class Delta<T> {
|
||||
@Nullable T previous;
|
||||
@Nullable T current;
|
||||
}
|
||||
public record Delta<T>(@Nullable T previous, @Nullable T current) {}
|
||||
|
|
|
@ -53,8 +53,8 @@ public interface LLLuceneIndex extends LLSnapshottable {
|
|||
default Mono<Long> count(@Nullable LLSnapshot snapshot, Query query) {
|
||||
QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false));
|
||||
return Mono.from(this.search(snapshot, params, null)
|
||||
.flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.getResults(), null, 0, null))
|
||||
.map(LLSearchResultShard::getTotalHitsCount)
|
||||
.flatMap(results -> LuceneUtils.mergeSignalStreamRaw(results.results(), null, 0, null))
|
||||
.map(LLSearchResultShard::totalHitsCount)
|
||||
.defaultIfEmpty(0L));
|
||||
}
|
||||
|
||||
|
|
|
@ -1,14 +1,10 @@
|
|||
package it.cavallium.dbengine.database;
|
||||
|
||||
import java.util.function.BiFunction;
|
||||
import lombok.Value;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@Value
|
||||
public class LLSearchResult {
|
||||
|
||||
Flux<LLSearchResultShard> results;
|
||||
public record LLSearchResult(Flux<LLSearchResultShard> results) {
|
||||
|
||||
@NotNull
|
||||
public static BiFunction<LLSearchResult, LLSearchResult, LLSearchResult> accumulator() {
|
||||
|
|
|
@ -1,11 +1,5 @@
|
|||
package it.cavallium.dbengine.database;
|
||||
|
||||
import lombok.Value;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@Value
|
||||
public class LLSearchResultShard {
|
||||
|
||||
Flux<LLKeyScore> results;
|
||||
long totalHitsCount;
|
||||
}
|
||||
public record LLSearchResultShard (Flux<LLKeyScore> results, long totalHitsCount) {}
|
||||
|
|
|
@ -432,7 +432,7 @@ public class LLUtils {
|
|||
return prev.handle((delta, sink) -> {
|
||||
switch (updateReturnMode) {
|
||||
case GET_NEW_VALUE:
|
||||
var current = delta.getCurrent();
|
||||
var current = delta.current();
|
||||
if (current != null) {
|
||||
sink.next(current);
|
||||
} else {
|
||||
|
@ -440,7 +440,7 @@ public class LLUtils {
|
|||
}
|
||||
break;
|
||||
case GET_OLD_VALUE:
|
||||
var previous = delta.getPrevious();
|
||||
var previous = delta.previous();
|
||||
if (previous != null) {
|
||||
sink.next(previous);
|
||||
} else {
|
||||
|
@ -458,8 +458,8 @@ public class LLUtils {
|
|||
|
||||
public static <T, U> Mono<Delta<U>> mapDelta(Mono<Delta<T>> mono, Function<@NotNull T, @Nullable U> mapper) {
|
||||
return mono.map(delta -> {
|
||||
T prev = delta.getPrevious();
|
||||
T curr = delta.getCurrent();
|
||||
T prev = delta.previous();
|
||||
T curr = delta.current();
|
||||
U newPrev;
|
||||
U newCurr;
|
||||
if (prev != null) {
|
||||
|
@ -472,11 +472,11 @@ public class LLUtils {
|
|||
} else {
|
||||
newCurr = null;
|
||||
}
|
||||
return Delta.of(newPrev, newCurr);
|
||||
return new Delta<>(newPrev, newCurr);
|
||||
});
|
||||
}
|
||||
|
||||
public static <R, V> boolean isDeltaChanged(Delta<V> delta) {
|
||||
return !Objects.equals(delta.getPrevious(), delta.getCurrent());
|
||||
return !Objects.equals(delta.previous(), delta.current());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,14 +11,12 @@ import it.cavallium.dbengine.database.LLSnapshot;
|
|||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.UpdateMode;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Value;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
@ -423,12 +421,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
|||
return dictionary.getUpdateMode();
|
||||
}
|
||||
|
||||
@Value
|
||||
private static class GroupBuffers {
|
||||
ByteBuf groupKeyWithExt;
|
||||
ByteBuf groupKeyWithoutExt;
|
||||
ByteBuf groupSuffix;
|
||||
}
|
||||
private static record GroupBuffers(ByteBuf groupKeyWithExt, ByteBuf groupKeyWithoutExt, ByteBuf groupSuffix) {}
|
||||
|
||||
@Override
|
||||
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||
|
|
|
@ -207,7 +207,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
|
|||
.flatMap(result -> Mono
|
||||
.justOrEmpty(result.getT2())
|
||||
.flatMap(values -> this.setAllValues(Flux.fromIterable(values.entrySet())))
|
||||
.thenReturn(Delta.of(result.getT1().orElse(null), result.getT2().orElse(null)))
|
||||
.thenReturn(new Delta<>(result.getT1().orElse(null), result.getT2().orElse(null)))
|
||||
);
|
||||
} else if (updateMode == UpdateMode.ALLOW) {
|
||||
return Mono.fromCallable(() -> {
|
||||
|
|
|
@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.disk;
|
|||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import it.cavallium.dbengine.database.Delta;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
|
@ -30,8 +29,6 @@ import java.util.concurrent.locks.StampedLock;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
@ -617,16 +614,13 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
}
|
||||
dbPut(cfh, null, key.retain(), newData.retain());
|
||||
}
|
||||
switch (updateReturnMode) {
|
||||
case GET_NEW_VALUE:
|
||||
return newData != null ? newData.retain() : null;
|
||||
case GET_OLD_VALUE:
|
||||
return prevData != null ? prevData.retain() : null;
|
||||
case NOTHING:
|
||||
return null;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
return switch (updateReturnMode) {
|
||||
case GET_NEW_VALUE -> newData != null ? newData.retain() : null;
|
||||
case GET_OLD_VALUE -> prevData != null ? prevData.retain() : null;
|
||||
case NOTHING -> null;
|
||||
//noinspection UnnecessaryDefault
|
||||
default -> throw new IllegalArgumentException();
|
||||
};
|
||||
} finally {
|
||||
if (newData != null) {
|
||||
newData.release();
|
||||
|
@ -744,7 +738,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
}
|
||||
dbPut(cfh, null, key.retain(), newData.retain());
|
||||
}
|
||||
return Delta.of(
|
||||
return new Delta<>(
|
||||
prevData != null ? prevData.retain() : null,
|
||||
newData != null ? newData.retain() : null
|
||||
);
|
||||
|
@ -1531,7 +1525,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
} else {
|
||||
readOpts.setIterateUpperBound(slice);
|
||||
}
|
||||
return new ReleasableSlice(slice, buffer.retain(), nioBuffer);
|
||||
return new ReleasableSliceImpl(slice, buffer.retain(), nioBuffer);
|
||||
} else {
|
||||
slice = new Slice(Objects.requireNonNull(LLUtils.toArray(buffer)));
|
||||
if (boundType == IterateBound.LOWER) {
|
||||
|
@ -1539,7 +1533,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
} else {
|
||||
readOpts.setIterateUpperBound(slice);
|
||||
}
|
||||
return new ReleasableSlice(slice, null, null);
|
||||
return new ReleasableSliceImpl(slice, null, null);
|
||||
}
|
||||
} finally {
|
||||
buffer.release();
|
||||
|
@ -1548,20 +1542,17 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
|
||||
private static ReleasableSlice emptyReleasableSlice() {
|
||||
var arr = new byte[0];
|
||||
return new ReleasableSlice(new Slice(arr), null, arr) {
|
||||
@Override
|
||||
public void release() {
|
||||
}
|
||||
};
|
||||
|
||||
return new SimpleSliceWithoutRelease(new Slice(arr), null, arr);
|
||||
}
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public static class ReleasableSlice {
|
||||
AbstractSlice<?> slice;
|
||||
@Nullable ByteBuf byteBuf;
|
||||
private @Nullable Object additionalData;
|
||||
public static record SimpleSliceWithoutRelease(AbstractSlice<?> slice, @Nullable ByteBuf byteBuf,
|
||||
@Nullable Object additionalData) implements ReleasableSlice {}
|
||||
|
||||
public static record ReleasableSliceImpl(AbstractSlice<?> slice, @Nullable ByteBuf byteBuf,
|
||||
@Nullable Object additionalData) implements ReleasableSlice {
|
||||
|
||||
@Override
|
||||
public void release() {
|
||||
slice.clear();
|
||||
if (byteBuf != null) {
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import lombok.Value;
|
||||
import org.apache.lucene.search.CollectionStatistics;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
@ -41,7 +40,6 @@ import reactor.core.scheduler.Schedulers;
|
|||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
@SuppressWarnings("UnstableApiUsage")
|
||||
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
|
||||
private final Long2ObjectMap<LLSnapshot[]> registeredSnapshots = new Long2ObjectOpenHashMap<>();
|
||||
|
@ -287,21 +285,21 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|||
queryParams,
|
||||
keyFieldName,
|
||||
mltDocumentFieldsShared,
|
||||
distributedSearch.getActionId(),
|
||||
distributedSearch.getScoreDivisor()
|
||||
distributedSearch.actionId(),
|
||||
distributedSearch.scoreDivisor()
|
||||
)
|
||||
)
|
||||
.reduce(LLSearchResult.accumulator())
|
||||
.map(result -> {
|
||||
if (distributedSearch.getActionId() != -1) {
|
||||
if (distributedSearch.actionId() != -1) {
|
||||
Flux<LLSearchResultShard> resultsWithTermination = result
|
||||
.getResults()
|
||||
.results()
|
||||
.map(flux -> new LLSearchResultShard(Flux
|
||||
.using(
|
||||
distributedSearch::getActionId,
|
||||
actionId -> flux.getResults(),
|
||||
distributedSearch::actionId,
|
||||
actionId -> flux.results(),
|
||||
this::completedAction
|
||||
), flux.getTotalHitsCount())
|
||||
), flux.totalHitsCount())
|
||||
);
|
||||
return new LLSearchResult(resultsWithTermination);
|
||||
} else {
|
||||
|
@ -309,18 +307,14 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|||
}
|
||||
})
|
||||
.doOnError(ex -> {
|
||||
if (distributedSearch.getActionId() != -1) {
|
||||
completedAction(distributedSearch.getActionId());
|
||||
if (distributedSearch.actionId() != -1) {
|
||||
completedAction(distributedSearch.actionId());
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@Value
|
||||
private static class DistributedSearch {
|
||||
long actionId;
|
||||
int scoreDivisor;
|
||||
}
|
||||
private static record DistributedSearch(long actionId, int scoreDivisor) {}
|
||||
|
||||
@Override
|
||||
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
|
||||
|
@ -366,20 +360,20 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|||
.distributedSearch(tuple.getT2().orElse(null),
|
||||
queryParams,
|
||||
keyFieldName,
|
||||
distributedSearch.getActionId(),
|
||||
distributedSearch.getScoreDivisor()
|
||||
distributedSearch.actionId(),
|
||||
distributedSearch.scoreDivisor()
|
||||
))
|
||||
.reduce(LLSearchResult.accumulator())
|
||||
.map(result -> {
|
||||
if (distributedSearch.getActionId() != -1) {
|
||||
if (distributedSearch.actionId() != -1) {
|
||||
Flux<LLSearchResultShard> resultsWithTermination = result
|
||||
.getResults()
|
||||
.results()
|
||||
.map(flux -> new LLSearchResultShard(Flux
|
||||
.using(
|
||||
distributedSearch::getActionId,
|
||||
actionId -> flux.getResults(),
|
||||
distributedSearch::actionId,
|
||||
actionId -> flux.results(),
|
||||
this::completedAction
|
||||
), flux.getTotalHitsCount())
|
||||
), flux.totalHitsCount())
|
||||
);
|
||||
return new LLSearchResult(resultsWithTermination);
|
||||
} else {
|
||||
|
@ -387,8 +381,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
|||
}
|
||||
})
|
||||
.doOnError(ex -> {
|
||||
if (distributedSearch.getActionId() != -1) {
|
||||
completedAction(distributedSearch.getActionId());
|
||||
if (distributedSearch.actionId() != -1) {
|
||||
completedAction(distributedSearch.actionId());
|
||||
}
|
||||
})
|
||||
);
|
||||
|
|
|
@ -7,7 +7,6 @@ import io.netty.buffer.ByteBufAllocator;
|
|||
import it.cavallium.dbengine.database.LLRange;
|
||||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
||||
import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSlice;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.ReadOptions;
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.rocksdb.AbstractSlice;
|
||||
|
||||
public interface ReleasableSlice {
|
||||
|
||||
default void release() {
|
||||
|
||||
}
|
||||
|
||||
AbstractSlice<?> slice();
|
||||
|
||||
ByteBuf byteBuf();
|
||||
|
||||
Object additionalData();
|
||||
}
|
|
@ -243,11 +243,11 @@ public class LuceneUtils {
|
|||
Long limit) {
|
||||
return mappedKeys.reduce(
|
||||
new SearchResultKeys<>(Flux.empty(), 0L),
|
||||
(a, b) -> new SearchResultKeys<>(LuceneUtils.mergeStream(Flux.just(a.getResults(), b.getResults()),
|
||||
(a, b) -> new SearchResultKeys<>(LuceneUtils.mergeStream(Flux.just(a.results(), b.results()),
|
||||
sort,
|
||||
offset,
|
||||
limit
|
||||
), a.getTotalHitsCount() + b.getTotalHitsCount())
|
||||
), a.totalHitsCount() + b.totalHitsCount())
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -257,11 +257,11 @@ public class LuceneUtils {
|
|||
Long limit) {
|
||||
return mappedKeys.reduce(
|
||||
new SearchResult<>(Flux.empty(), 0L),
|
||||
(a, b) -> new SearchResult<>(LuceneUtils.mergeStream(Flux.just(a.getResults(), b.getResults()),
|
||||
(a, b) -> new SearchResult<>(LuceneUtils.mergeStream(Flux.just(a.results(), b.results()),
|
||||
sort,
|
||||
offset,
|
||||
limit
|
||||
), a.getTotalHitsCount() + b.getTotalHitsCount())
|
||||
), a.totalHitsCount() + b.totalHitsCount())
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -272,8 +272,8 @@ public class LuceneUtils {
|
|||
return mappedKeys.reduce(
|
||||
new LLSearchResultShard(Flux.empty(), 0),
|
||||
(s1, s2) -> new LLSearchResultShard(
|
||||
LuceneUtils.mergeStream(Flux.just(s1.getResults(), s2.getResults()), mappedSort, offset, limit),
|
||||
s1.getTotalHitsCount() + s2.getTotalHitsCount()
|
||||
LuceneUtils.mergeStream(Flux.just(s1.results(), s2.results()), mappedSort, offset, limit),
|
||||
s1.totalHitsCount() + s2.totalHitsCount()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user