diff --git a/src/main/java/it/cavallium/dbengine/client/CountedStream.java b/src/main/java/it/cavallium/dbengine/client/CountedStream.java index cd032a9..2262558 100644 --- a/src/main/java/it/cavallium/dbengine/client/CountedStream.java +++ b/src/main/java/it/cavallium/dbengine/client/CountedStream.java @@ -31,13 +31,13 @@ public class CountedStream { public static CountedStream merge(Collection> stream) { return stream .stream() - .reduce((a, b) -> new CountedStream(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount())) + .reduce((a, b) -> new CountedStream<>(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount())) .orElseGet(() -> new CountedStream<>(Flux.empty(), 0)); } public static Mono> merge(Flux> stream) { return stream - .reduce((a, b) -> new CountedStream(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount())) + .reduce((a, b) -> new CountedStream<>(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount())) .switchIfEmpty(Mono.fromSupplier(() -> new CountedStream<>(Flux.empty(), 0))); } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java index 10a52b4..aca2d5a 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java @@ -2,292 +2,47 @@ package it.cavallium.dbengine.client; import it.cavallium.dbengine.client.query.ClientQueryParams; import it.cavallium.dbengine.client.query.current.data.Query; -import it.cavallium.dbengine.client.query.current.data.QueryParams; -import it.cavallium.dbengine.database.LLLuceneIndex; -import it.cavallium.dbengine.database.LLScoreMode; -import it.cavallium.dbengine.database.LLSearchResult; -import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshottable; -import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; -import it.cavallium.dbengine.lucene.LuceneUtils; -import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -public class LuceneIndex implements LLSnapshottable { +@SuppressWarnings("unused") +public interface LuceneIndex extends LLSnapshottable { - private final LLLuceneIndex luceneIndex; - private final Indicizer indicizer; + Mono addDocument(T key, U value); - public LuceneIndex(LLLuceneIndex luceneIndex, Indicizer indicizer) { - this.luceneIndex = luceneIndex; - this.indicizer = indicizer; - } + Mono addDocuments(Flux> entries); + Mono deleteDocument(T key); - private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) { - if (snapshot == null) { - return null; - } else { - return snapshot.getSnapshot(luceneIndex); - } - } + Mono updateDocument(T key, U value); - public Mono addDocument(T key, U value) { - return indicizer - .toDocument(key, value) - .flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc)); - } + Mono updateDocuments(Flux> entries); - public Mono addDocuments(Flux> entries) { - return luceneIndex - .addDocuments(entries - .flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) - .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) - .groupBy(Entry::getKey, Entry::getValue) - ); - } + Mono deleteAll(); - public Mono deleteDocument(T key) { - LLTerm id = indicizer.toIndex(key); - return luceneIndex.deleteDocument(id); - } + Mono> moreLikeThis(ClientQueryParams> queryParams, T key, U mltDocumentValue); - public Mono updateDocument(T key, U value) { - return indicizer - .toDocument(key, value) - .flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc)); - } - - public Mono updateDocuments(Flux> entries) { - return luceneIndex - .updateDocuments(entries - .flatMap(entry -> indicizer - .toDocument(entry.getKey(), entry.getValue()) - .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) - .groupBy(Entry::getKey, Entry::getValue) - ); - } - - public Mono deleteAll() { - return luceneIndex.deleteAll(); - } - - private static QueryParams fixOffset(LLLuceneIndex luceneIndex, QueryParams queryParams) { - if (luceneIndex.supportsOffset()) { - return queryParams; - } else { - return queryParams.setOffset(0); - } - } - - private static long fixTransformOffset(LLLuceneIndex luceneIndex, long offset) { - if (luceneIndex.supportsOffset()) { - return 0; - } else { - return offset; - } - } - - private Mono> transformLuceneResult(LLSearchResult llSearchResult, - @Nullable MultiSort> sort, - LLScoreMode scoreMode, - long offset, - @Nullable Long limit) { - Flux> mappedKeys = llSearchResult - .getResults() - .map(flux -> new SearchResultKeys<>(flux.getResults().map(signal -> { - return new SearchResultKey(indicizer.getKey(signal.getKey()), - signal.getScore() - ); - }), flux.getTotalHitsCount())); - MultiSort> finalSort; - if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { - finalSort = MultiSort.topScore(); - } else { - finalSort = sort; - } - - MultiSort> mappedSort; - if (finalSort != null) { - mappedSort = new MultiSort<>(finalSort.getQuerySort(), (signal1, signal2) -> { - return finalSort.getResultSort().compare((signal1), signal2); - }); - } else { - mappedSort = null; - } - return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, offset, limit); - } - - private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, - @Nullable MultiSort> sort, - LLScoreMode scoreMode, - long offset, - @Nullable Long limit, - ValueGetter valueGetter) { - Flux> mappedKeys = llSearchResult - .getResults() - .map(flux -> new SearchResult<>(flux.getResults().flatMapSequential(signal -> { - var key = indicizer.getKey(signal.getKey()); - return valueGetter - .get(key) - .map(value -> new SearchResultItem<>(key, value, signal.getScore())); - }), flux.getTotalHitsCount())); - MultiSort> finalSort; - if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { - finalSort = MultiSort.topScoreWithValues(); - } else { - finalSort = sort; - } - - MultiSort> mappedSort; - if (finalSort != null) { - mappedSort = new MultiSort<>(finalSort.getQuerySort(), (signal1, signal2) -> { - return finalSort.getResultSort().compare((signal1), signal2); - }); - } else { - mappedSort = null; - } - return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, offset, limit); - } - - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ - public Mono> moreLikeThis( - ClientQueryParams> queryParams, - T key, - U mltDocumentValue) { - Flux>> mltDocumentFields - = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); - return luceneIndex - .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields) - .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, - queryParams.getSort(), - queryParams.getScoreMode(), - fixTransformOffset(luceneIndex, queryParams.getOffset()), - queryParams.getLimit() - )); - - } - - - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ - public Mono> moreLikeThisWithValues( - ClientQueryParams> queryParams, + Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, T key, U mltDocumentValue, - ValueGetter valueGetter) { - Flux>> mltDocumentFields - = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); - return luceneIndex - .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), - fixOffset(luceneIndex, queryParams.toQueryParams()), - indicizer.getKeyFieldName(), - mltDocumentFields - ) - .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, - queryParams.getSort(), - queryParams.getScoreMode(), - fixTransformOffset(luceneIndex, queryParams.getOffset()), - queryParams.getLimit(), - valueGetter - )); - } + ValueGetter valueGetter); - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ - public Mono> search( - ClientQueryParams> queryParams) { - return luceneIndex - .search(resolveSnapshot(queryParams.getSnapshot()), - fixOffset(luceneIndex, queryParams.toQueryParams()), - indicizer.getKeyFieldName() - ) - .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, - queryParams.getSort(), - queryParams.getScoreMode(), - fixTransformOffset(luceneIndex, queryParams.getOffset()), - queryParams.getLimit() - )); - } + Mono> search(ClientQueryParams> queryParams); - /** - * - * @param queryParams the limit is valid for each lucene instance. - * If you have 15 instances, the number of elements returned - * can be at most limit * 15 - * @return the collection has one or more flux - */ - public Mono> searchWithValues( - ClientQueryParams> queryParams, - ValueGetter valueGetter) { - return luceneIndex - .search(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName()) - .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, - queryParams.getSort(), - queryParams.getScoreMode(), - fixTransformOffset(luceneIndex, queryParams.getOffset()), - queryParams.getLimit(), - valueGetter - )); - } + Mono> searchWithValues(ClientQueryParams> queryParams, + ValueGetter valueGetter); - public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { - return this.search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) - .map(SearchResultKeys::getTotalHitsCount); - } + Mono count(@Nullable CompositeSnapshot snapshot, Query query); - public boolean isLowMemoryMode() { - return luceneIndex.isLowMemoryMode(); - } + boolean isLowMemoryMode(); - public Mono close() { - return luceneIndex.close(); - } + Mono close(); - /** - * Flush writes to disk - */ - public Mono flush() { - return luceneIndex.flush(); - } + Mono flush(); - /** - * Refresh index searcher - */ - public Mono refresh() { - return luceneIndex.refresh(); - } - - @Override - public Mono takeSnapshot() { - return luceneIndex.takeSnapshot(); - } - - @Override - public Mono releaseSnapshot(LLSnapshot snapshot) { - return luceneIndex.releaseSnapshot(snapshot); - } + Mono refresh(); } diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java new file mode 100644 index 0000000..8a501bf --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -0,0 +1,304 @@ +package it.cavallium.dbengine.client; + +import it.cavallium.dbengine.client.query.ClientQueryParams; +import it.cavallium.dbengine.client.query.current.data.Query; +import it.cavallium.dbengine.client.query.current.data.QueryParams; +import it.cavallium.dbengine.database.LLLuceneIndex; +import it.cavallium.dbengine.database.LLScoreMode; +import it.cavallium.dbengine.database.LLSearchResult; +import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.LLTerm; +import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; +import it.cavallium.dbengine.lucene.LuceneUtils; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; + +public class LuceneIndexImpl implements LuceneIndex { + + private final LLLuceneIndex luceneIndex; + private final Indicizer indicizer; + + public LuceneIndexImpl(LLLuceneIndex luceneIndex, Indicizer indicizer) { + this.luceneIndex = luceneIndex; + this.indicizer = indicizer; + } + + private LLSnapshot resolveSnapshot(CompositeSnapshot snapshot) { + if (snapshot == null) { + return null; + } else { + return snapshot.getSnapshot(luceneIndex); + } + } + + @Override + public Mono addDocument(T key, U value) { + return indicizer + .toDocument(key, value) + .flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc)); + } + + @Override + public Mono addDocuments(Flux> entries) { + return luceneIndex + .addDocuments(entries + .flatMap(entry -> indicizer + .toDocument(entry.getKey(), entry.getValue()) + .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) + .groupBy(Entry::getKey, Entry::getValue) + ); + } + + @Override + public Mono deleteDocument(T key) { + LLTerm id = indicizer.toIndex(key); + return luceneIndex.deleteDocument(id); + } + + @Override + public Mono updateDocument(T key, U value) { + return indicizer + .toDocument(key, value) + .flatMap(doc -> luceneIndex.updateDocument(indicizer.toIndex(key), doc)); + } + + @Override + public Mono updateDocuments(Flux> entries) { + return luceneIndex + .updateDocuments(entries + .flatMap(entry -> indicizer + .toDocument(entry.getKey(), entry.getValue()) + .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))) + .groupBy(Entry::getKey, Entry::getValue) + ); + } + + @Override + public Mono deleteAll() { + return luceneIndex.deleteAll(); + } + + private static QueryParams fixOffset(LLLuceneIndex luceneIndex, QueryParams queryParams) { + if (luceneIndex.supportsOffset()) { + return queryParams; + } else { + return queryParams.setOffset(0); + } + } + + private static long fixTransformOffset(LLLuceneIndex luceneIndex, long offset) { + if (luceneIndex.supportsOffset()) { + return 0; + } else { + return offset; + } + } + + private Mono> transformLuceneResult(LLSearchResult llSearchResult, + @Nullable MultiSort> sort, + LLScoreMode scoreMode, + long offset, + @Nullable Long limit) { + Flux> mappedKeys = llSearchResult + .getResults() + .map(flux -> new SearchResultKeys<>(flux + .getResults() + .map(signal -> new SearchResultKey<>(indicizer.getKey(signal.getKey()), signal.getScore())), + flux.getTotalHitsCount() + )); + MultiSort> finalSort; + if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { + finalSort = MultiSort.topScore(); + } else { + finalSort = sort; + } + + MultiSort> mappedSort; + if (finalSort != null) { + mappedSort = new MultiSort<>( + finalSort.getQuerySort(), + (signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2) + ); + } else { + mappedSort = null; + } + return LuceneUtils.mergeSignalStreamKeys(mappedKeys, mappedSort, offset, limit); + } + + private Mono> transformLuceneResultWithValues(LLSearchResult llSearchResult, + @Nullable MultiSort> sort, + LLScoreMode scoreMode, + long offset, + @Nullable Long limit, + ValueGetter valueGetter) { + Flux> mappedKeys = llSearchResult + .getResults() + .map(flux -> new SearchResult<>(flux.getResults().flatMapSequential(signal -> { + var key = indicizer.getKey(signal.getKey()); + return valueGetter + .get(key) + .map(value -> new SearchResultItem<>(key, value, signal.getScore())); + }), flux.getTotalHitsCount())); + MultiSort> finalSort; + if (scoreMode != LLScoreMode.COMPLETE_NO_SCORES && sort == null) { + finalSort = MultiSort.topScoreWithValues(); + } else { + finalSort = sort; + } + + MultiSort> mappedSort; + if (finalSort != null) { + mappedSort = new MultiSort<>( + finalSort.getQuerySort(), + (signal1, signal2) -> finalSort.getResultSort().compare((signal1), signal2) + ); + } else { + mappedSort = null; + } + return LuceneUtils.mergeSignalStreamItems(mappedKeys, mappedSort, offset, limit); + } + + /** + * + * @param queryParams the limit is valid for each lucene instance. + * If you have 15 instances, the number of elements returned + * can be at most limit * 15 + * @return the collection has one or more flux + */ + @Override + public Mono> moreLikeThis(ClientQueryParams> queryParams, + T key, + U mltDocumentValue) { + Flux>> mltDocumentFields + = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); + return luceneIndex + .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName(), mltDocumentFields) + .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, + queryParams.getSort(), + queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), + queryParams.getLimit() + )); + + } + + + /** + * + * @param queryParams the limit is valid for each lucene instance. + * If you have 15 instances, the number of elements returned + * can be at most limit * 15 + * @return the collection has one or more flux + */ + @Override + public Mono> moreLikeThisWithValues(ClientQueryParams> queryParams, + T key, + U mltDocumentValue, + ValueGetter valueGetter) { + Flux>> mltDocumentFields + = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); + return luceneIndex + .moreLikeThis(resolveSnapshot(queryParams.getSnapshot()), + fixOffset(luceneIndex, queryParams.toQueryParams()), + indicizer.getKeyFieldName(), + mltDocumentFields + ) + .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, + queryParams.getSort(), + queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), + queryParams.getLimit(), + valueGetter + )); + } + + /** + * + * @param queryParams the limit is valid for each lucene instance. + * If you have 15 instances, the number of elements returned + * can be at most limit * 15 + * @return the collection has one or more flux + */ + @Override + public Mono> search(ClientQueryParams> queryParams) { + return luceneIndex + .search(resolveSnapshot(queryParams.getSnapshot()), + fixOffset(luceneIndex, queryParams.toQueryParams()), + indicizer.getKeyFieldName() + ) + .flatMap(llSearchResult -> this.transformLuceneResult(llSearchResult, + queryParams.getSort(), + queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), + queryParams.getLimit() + )); + } + + /** + * + * @param queryParams the limit is valid for each lucene instance. + * If you have 15 instances, the number of elements returned + * can be at most limit * 15 + * @return the collection has one or more flux + */ + @Override + public Mono> searchWithValues(ClientQueryParams> queryParams, + ValueGetter valueGetter) { + return luceneIndex + .search(resolveSnapshot(queryParams.getSnapshot()), fixOffset(luceneIndex, queryParams.toQueryParams()), indicizer.getKeyFieldName()) + .flatMap(llSearchResult -> this.transformLuceneResultWithValues(llSearchResult, + queryParams.getSort(), + queryParams.getScoreMode(), + fixTransformOffset(luceneIndex, queryParams.getOffset()), + queryParams.getLimit(), + valueGetter + )); + } + + @Override + public Mono count(@Nullable CompositeSnapshot snapshot, Query query) { + return this.search(ClientQueryParams.>builder().snapshot(snapshot).query(query).limit(0).build()) + .map(SearchResultKeys::getTotalHitsCount); + } + + @Override + public boolean isLowMemoryMode() { + return luceneIndex.isLowMemoryMode(); + } + + @Override + public Mono close() { + return luceneIndex.close(); + } + + /** + * Flush writes to disk + */ + @Override + public Mono flush() { + return luceneIndex.flush(); + } + + /** + * Refresh index searcher + */ + @Override + public Mono refresh() { + return luceneIndex.refresh(); + } + + @Override + public Mono takeSnapshot() { + return luceneIndex.takeSnapshot(); + } + + @Override + public Mono releaseSnapshot(LLSnapshot snapshot) { + return luceneIndex.releaseSnapshot(snapshot); + } +} diff --git a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java index d42390b..bb4d011 100644 --- a/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java +++ b/src/main/java/it/cavallium/dbengine/client/SearchResultKeys.java @@ -4,6 +4,7 @@ import it.cavallium.dbengine.database.collections.Joiner.ValueGetter; import lombok.Value; import reactor.core.publisher.Flux; +@SuppressWarnings("unused") @Value public class SearchResultKeys { diff --git a/src/main/java/it/cavallium/dbengine/client/query/QueryGson.java b/src/main/java/it/cavallium/dbengine/client/query/QueryGson.java index 9ecb66c..59e235f 100644 --- a/src/main/java/it/cavallium/dbengine/client/query/QueryGson.java +++ b/src/main/java/it/cavallium/dbengine/client/query/QueryGson.java @@ -39,6 +39,7 @@ public class QueryGson { return gsonBuilder; } + @SuppressWarnings("DuplicatedCode") public static class DbClassesGenericSerializer implements JsonSerializer, JsonDeserializer { private final BiMap> subTypes; diff --git a/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java b/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java index b138ea1..08faccb 100644 --- a/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java +++ b/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java @@ -15,20 +15,12 @@ import it.cavallium.dbengine.client.query.current.data.TermPosition; import it.cavallium.dbengine.client.query.current.data.TermQuery; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; -import java.io.IOException; -import java.io.StringReader; import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.TokenStream; -import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute; -import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute; -import org.apache.lucene.index.Term; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.util.QueryBuilder; import org.jetbrains.annotations.NotNull; +@SuppressWarnings("unused") public class QueryUtils { public static Query approximateSearch(TextFieldsAnalyzer preferredAnalyzer, String field, String text) { @@ -100,28 +92,4 @@ public class QueryUtils { .toArray(TermAndBoost[]::new) ); } - - private static List getTerms(TextFieldsAnalyzer preferredAnalyzer, String field, String text) throws IOException { - Analyzer analyzer = LuceneUtils.getAnalyzer(preferredAnalyzer); - TokenStream ts = analyzer.tokenStream(field, new StringReader(text)); - return getTerms(ts, field); - } - - private static List getTerms(TokenStream ts, String field) throws IOException { - TermToBytesRefAttribute charTermAttr = ts.addAttribute(TermToBytesRefAttribute.class); - PositionIncrementAttribute positionIncrementTermAttr = ts.addAttribute(PositionIncrementAttribute.class); - List terms = new LinkedList<>(); - try (ts) { - ts.reset(); // Resets this stream to the beginning. (Required) - int termPosition = -1; - while (ts.incrementToken()) { - var tokenPositionIncrement = positionIncrementTermAttr.getPositionIncrement(); - termPosition += tokenPositionIncrement; - terms.add(TermPosition.of(QueryParser.toQueryTerm(new Term(field, charTermAttr.getBytesRef())), termPosition)); - } - ts.end(); // Perform end-of-stream operations, e.g. set the final offset. - } - // Release resources associated with this stream. - return terms; - } } diff --git a/src/main/java/it/cavallium/dbengine/database/EnglishItalianStopFilter.java b/src/main/java/it/cavallium/dbengine/database/EnglishItalianStopFilter.java index 56853cf..6e322f5 100644 --- a/src/main/java/it/cavallium/dbengine/database/EnglishItalianStopFilter.java +++ b/src/main/java/it/cavallium/dbengine/database/EnglishItalianStopFilter.java @@ -1010,6 +1010,7 @@ public class EnglishItalianStopFilter extends StopFilter { .collect(Collectors.toSet())); } + @SuppressWarnings("unused") public static CharArraySet getStopWords() { return stopWords; } diff --git a/src/main/java/it/cavallium/dbengine/database/LLCollectionStatisticsGetter.java b/src/main/java/it/cavallium/dbengine/database/LLCollectionStatisticsGetter.java deleted file mode 100644 index 33aa869..0000000 --- a/src/main/java/it/cavallium/dbengine/database/LLCollectionStatisticsGetter.java +++ /dev/null @@ -1,9 +0,0 @@ -package it.cavallium.dbengine.database; - -import java.io.IOException; -import org.apache.lucene.search.CollectionStatistics; - -public interface LLCollectionStatisticsGetter { - - CollectionStatistics collectionStatistics(String field) throws IOException; -} diff --git a/src/main/java/it/cavallium/dbengine/database/LLItem.java b/src/main/java/it/cavallium/dbengine/database/LLItem.java index 10ba409..26c4cca 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLItem.java +++ b/src/main/java/it/cavallium/dbengine/database/LLItem.java @@ -14,42 +14,35 @@ public class LLItem { private final LLType type; private final String name; private final byte[] data; - // nullable - private final byte[] data2; - public LLItem(LLType type, String name, byte[] data, byte[] data2) { + public LLItem(LLType type, String name, byte[] data) { this.type = type; this.name = name; this.data = data; - this.data2 = data2; } private LLItem(LLType type, String name, String data) { this.type = type; this.name = name; this.data = data.getBytes(StandardCharsets.UTF_8); - this.data2 = null; } private LLItem(LLType type, String name, int data) { this.type = type; this.name = name; this.data = Ints.toByteArray(data); - this.data2 = null; } private LLItem(LLType type, String name, float data) { this.type = type; this.name = name; - this.data = ByteBuffer.allocate(4).putFloat(data).array();; - this.data2 = null; + this.data = ByteBuffer.allocate(4).putFloat(data).array(); } private LLItem(LLType type, String name, long data) { this.type = type; this.name = name; this.data = Longs.toByteArray(data); - this.data2 = null; } public static LLItem newIntPoint(String name, int data) { @@ -96,10 +89,6 @@ public class LLItem { return data; } - public byte[] getData2() { - return data2; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -111,15 +100,13 @@ public class LLItem { LLItem llItem = (LLItem) o; return type == llItem.type && Objects.equals(name, llItem.name) && - Arrays.equals(data, llItem.data) && - Arrays.equals(data2, llItem.data2); + Arrays.equals(data, llItem.data); } @Override public int hashCode() { int result = Objects.hash(type, name); result = 31 * result + Arrays.hashCode(data); - result = 31 * result + Arrays.hashCode(data2); return result; } @@ -129,12 +116,9 @@ public class LLItem { .add("type=" + type) .add("name='" + name + "'"); if (data != null && data.length > 0) { - sj.add("data=" + new String(data)); + sj.add("data=" + new String(data)); } - if (data2 != null && data2.length > 0) { - sj.add("data2=" + new String(data2)); - } - return sj.toString(); + return sj.toString(); } public String stringValue() { diff --git a/src/main/java/it/cavallium/dbengine/database/LLSort.java b/src/main/java/it/cavallium/dbengine/database/LLSort.java index 2817d7f..f52db22 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSort.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSort.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.database; import java.util.Objects; +@SuppressWarnings("unused") public class LLSort { private final String fieldName; diff --git a/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java b/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java index 6c926e6..28dd3ac 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java +++ b/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database; import java.util.Arrays; import java.util.Objects; +@SuppressWarnings("unused") public class LLTopKeys { private final long totalHitsCount; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java index c73ce4a..e460f62 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java @@ -6,7 +6,7 @@ import org.jetbrains.annotations.NotNull; public class DatabaseEmpty { - @SuppressWarnings("unused") + @SuppressWarnings({"unused", "InstantiationOfUtilityClass"}) public static final Nothing NOTHING = new Nothing(); private static final byte[] NOTHING_BYTES = new byte[0]; private static final Serializer NOTHING_SERIALIZER = new Serializer<>() { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 767f1d0..9ab9735 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -50,24 +50,20 @@ public class DatabaseMapDictionaryDeep> implem } static byte[] firstRangeKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { - return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + return zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength); } static byte[] nextRangeKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { - byte[] nonIncremented = fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + byte[] nonIncremented = zeroFillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength); return incrementPrefix(nonIncremented, prefixLength); } - protected static byte[] fillKeySuffixAndExt(byte[] prefixKey, - int prefixLength, - int suffixLength, - int extLength, - byte fillValue) { + protected static byte[] zeroFillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) { assert prefixKey.length == prefixLength; assert suffixLength > 0; assert extLength >= 0; byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); - Arrays.fill(result, prefixLength, result.length, fillValue); + Arrays.fill(result, prefixLength, result.length, (byte) 0); return result; } @@ -76,7 +72,7 @@ public class DatabaseMapDictionaryDeep> implem int prefixLength, int suffixLength, int extLength) { - return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + return zeroFillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength); } static byte[] nextRangeKey(byte[] prefixKey, @@ -84,23 +80,22 @@ public class DatabaseMapDictionaryDeep> implem int prefixLength, int suffixLength, int extLength) { - byte[] nonIncremented = fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00); + byte[] nonIncremented = zeroFillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength); return incrementPrefix(nonIncremented, prefixLength + suffixLength); } - protected static byte[] fillKeyExt(byte[] prefixKey, + protected static byte[] zeroFillKeyExt(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, - int extLength, - byte fillValue) { + int extLength) { assert prefixKey.length == prefixLength; assert suffixKey.length == suffixLength; assert suffixLength > 0; assert extLength >= 0; byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength); System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength); - Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue); + Arrays.fill(result, prefixLength + suffixLength, result.length, (byte) 0); return result; } @@ -210,7 +205,6 @@ public class DatabaseMapDictionaryDeep> implem return dictionary.isRangeEmpty(resolveSnapshot(snapshot), range); } - @SuppressWarnings("ReactiveStreamsUnusedPublisher") @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { byte[] keySuffixData = serializeSuffix(keySuffix); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMappable.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMappable.java deleted file mode 100644 index 5b8713c..0000000 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMappable.java +++ /dev/null @@ -1,6 +0,0 @@ -package it.cavallium.dbengine.database.collections; - -public interface DatabaseMappable> { - - DatabaseStageMap map(); -} diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java index 6bebfb1..c86d55d 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java @@ -10,6 +10,7 @@ import java.util.Set; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; +@SuppressWarnings("unused") public class DatabaseSetDictionary extends DatabaseMapDictionaryDeep> { protected DatabaseSetDictionary(LLDictionary dictionary, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java b/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java index 2902e6e..5e8ccd1 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/Joiner.java @@ -2,7 +2,7 @@ package it.cavallium.dbengine.database.collections; import reactor.core.publisher.Mono; -public interface Joiner { +public interface Joiner { interface ValueGetter { @@ -19,9 +19,6 @@ public interface Joiner { * * Can return Mono error IOException */ - Mono join(ValueGetter dbValueGetter, DBVALUE value); + Mono join(ValueGetter dbValueGetter, DB_VALUE value); - static Joiner direct() { - return (dbValueGetter, value) -> Mono.just(value); - }; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java b/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java index 331bb6a..815d3c9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/JoinerBlocking.java @@ -2,8 +2,7 @@ package it.cavallium.dbengine.database.collections; import java.io.IOException; -@SuppressWarnings("SpellCheckingInspection") -public interface JoinerBlocking { +public interface JoinerBlocking { interface ValueGetterBlocking { VALUE get(KEY key) throws IOException; @@ -14,9 +13,6 @@ public interface JoinerBlocking { * Good examples: message id, send date, ... * Bad examples: message content, views, edited, ... */ - JOINEDVALUE join(ValueGetterBlocking dbValueGetter, DBVALUE value) throws IOException; + JOINED_VALUE join(ValueGetterBlocking dbValueGetter, DB_VALUE value) throws IOException; - static JoinerBlocking direct() { - return (dbValueGetter, value) -> value; - }; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 2f68053..e199f40 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -40,12 +40,7 @@ public class SubStageGetterSingle implements SubStageGetter { - return new DatabaseSingle<>(dictionary, - keyPrefix, - serializer - ); - })); + .then(Mono.fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix, serializer))); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index fac377d..61a3292 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -44,6 +44,8 @@ import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; @NotAtomic public class LLLocalDictionary implements LLDictionary { @@ -299,6 +301,7 @@ public class LLLocalDictionary implements LLDictionary { Optional newData = value.apply(prevData); if (prevData.isPresent() && newData.isEmpty()) { + //noinspection DuplicatedCode if (updateMode == UpdateMode.ALLOW) { var ws = lock.tryConvertToWriteLock(stamp); if (ws != 0) { @@ -315,6 +318,7 @@ public class LLLocalDictionary implements LLDictionary { db.delete(cfh, key); } else if (newData.isPresent() && (prevData.isEmpty() || !Arrays.equals(prevData.get(), newData.get()))) { + //noinspection DuplicatedCode if (updateMode == UpdateMode.ALLOW) { var ws = lock.tryConvertToWriteLock(stamp); if (ws != 0) { @@ -519,12 +523,10 @@ public class LLLocalDictionary implements LLDictionary { @NotNull private Mono> putEntryToWriteBatch(Entry newEntry, - boolean getOldValues, - boolean existsAlmostCertainly, - CappedWriteBatch writeBatch) { + boolean getOldValues, CappedWriteBatch writeBatch) { Mono getOldValueMono; if (getOldValues) { - getOldValueMono = get(null, newEntry.getKey(), existsAlmostCertainly); + getOldValueMono = get(null, newEntry.getKey(), false); } else { getOldValueMono = Mono.empty(); } @@ -656,7 +658,7 @@ public class LLLocalDictionary implements LLDictionary { }) .subscribeOn(dbScheduler) .thenMany(entries) - .flatMapSequential(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, false, writeBatch)), + .flatMapSequential(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)), writeBatch -> Mono .fromCallable(() -> { try (writeBatch) { @@ -670,29 +672,6 @@ public class LLLocalDictionary implements LLDictionary { .onErrorMap(cause -> new IOException("Failed to write range", cause)); } - private void deleteSmallRange(LLRange range) - throws RocksDBException { - var readOpts = getReadOptions(null); - readOpts.setFillCache(false); - if (range.hasMin()) { - readOpts.setIterateLowerBound(new Slice(range.getMin())); - } - if (range.hasMax()) { - readOpts.setIterateUpperBound(new Slice(range.getMax())); - } - try (var rocksIterator = db.newIterator(cfh, readOpts)) { - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - while (rocksIterator.isValid()) { - db.delete(cfh, rocksIterator.key()); - rocksIterator.next(); - } - } - } - private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range) throws RocksDBException { var readOpts = getReadOptions(null); @@ -716,29 +695,6 @@ public class LLLocalDictionary implements LLDictionary { } } - private static byte[] incrementLexicographically(byte[] key) { - boolean remainder = true; - int prefixLength = key.length; - final byte ff = (byte) 0xFF; - for (int i = prefixLength - 1; i >= 0; i--) { - if (key[i] != ff) { - key[i]++; - remainder = false; - break; - } else { - key[i] = 0x00; - remainder = true; - } - } - - if (remainder) { - Arrays.fill(key, 0, prefixLength, (byte) 0xFF); - return Arrays.copyOf(key, key.length + 1); - } else { - return key; - } - } - public Mono clear() { return Mono .fromCallable(() -> { @@ -1025,4 +981,32 @@ public class LLLocalDictionary implements LLDictionary { .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) .subscribeOn(dbScheduler); } + + @NotNull + public static Tuple3, Optional> getRocksIterator(ReadOptions readOptions, + LLRange range, + RocksDB db, + ColumnFamilyHandle cfh) { + Slice sliceMin; + Slice sliceMax; + if (range.hasMin()) { + sliceMin = new Slice(range.getMin()); + readOptions.setIterateLowerBound(sliceMin); + } else { + sliceMin = null; + } + if (range.hasMax()) { + sliceMax = new Slice(range.getMax()); + readOptions.setIterateUpperBound(sliceMax); + } else { + sliceMax = null; + } + var rocksIterator = db.newIterator(cfh, readOptions); + if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax)); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java index f91244c..685733e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedEntryReactiveRocksIterator.java @@ -16,7 +16,7 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends LLRange range, ReadOptions readOptions, String debugName) { - super(db, cfh, prefixLength, range, readOptions, false, true, debugName); + super(db, cfh, prefixLength, range, readOptions, false, true); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java index e6ddf86..d881f35 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedKeyReactiveRocksIterator.java @@ -13,7 +13,7 @@ public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReacti LLRange range, ReadOptions readOptions, String debugName) { - super(db, cfh, prefixLength, range, readOptions, true, false, debugName); + super(db, cfh, prefixLength, range, readOptions, true, false); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 92b0018..a575fcd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -1,17 +1,16 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator; + import it.cavallium.dbengine.database.LLRange; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import java.util.Arrays; import java.util.List; -import java.util.Optional; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksMutableObject; -import org.rocksdb.Slice; import reactor.core.publisher.Flux; -import reactor.util.function.Tuples; public abstract class LLLocalGroupedReactiveRocksIterator { @@ -24,7 +23,6 @@ public abstract class LLLocalGroupedReactiveRocksIterator { private final ReadOptions readOptions; private final boolean canFillCache; private final boolean readValues; - private final String debugName; public LLLocalGroupedReactiveRocksIterator(RocksDB db, ColumnFamilyHandle cfh, @@ -32,8 +30,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator { LLRange range, ReadOptions readOptions, boolean canFillCache, - boolean readValues, - String debugName) { + boolean readValues) { this.db = db; this.cfh = cfh; this.prefixLength = prefixLength; @@ -41,37 +38,15 @@ public abstract class LLLocalGroupedReactiveRocksIterator { this.readOptions = readOptions; this.canFillCache = canFillCache; this.readValues = readValues; - this.debugName = debugName; } - @SuppressWarnings("Convert2MethodRef") public Flux> flux() { return Flux .generate(() -> { var readOptions = new ReadOptions(this.readOptions); readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax()); - Slice sliceMin; - Slice sliceMax; - if (range.hasMin()) { - sliceMin = new Slice(range.getMin()); - readOptions.setIterateLowerBound(sliceMin); - } else { - sliceMin = null; - } - if (range.hasMax()) { - sliceMax = new Slice(range.getMax()); - readOptions.setIterateUpperBound(sliceMax); - } else { - sliceMax = null; - } - var rocksIterator = db.newIterator(cfh, readOptions); - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax)); + return getRocksIterator(readOptions, range, db, cfh); }, (tuple, sink) -> { var rocksIterator = tuple.getT1(); ObjectArrayList values = new ObjectArrayList<>(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index 8268761..fc41939 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -2,14 +2,11 @@ package it.cavallium.dbengine.database.disk; import it.cavallium.dbengine.database.LLRange; import java.util.Arrays; -import java.util.Optional; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksMutableObject; -import org.rocksdb.Slice; import reactor.core.publisher.Flux; -import reactor.util.function.Tuples; public class LLLocalKeyPrefixReactiveRocksIterator { @@ -48,27 +45,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator { //readOptions.setReadaheadSize(2 * 1024 * 1024); readOptions.setFillCache(canFillCache); } - Slice sliceMin; - Slice sliceMax; - if (range.hasMin()) { - sliceMin = new Slice(range.getMin()); - readOptions.setIterateLowerBound(sliceMin); - } else { - sliceMin = null; - } - if (range.hasMax()) { - sliceMax = new Slice(range.getMax()); - readOptions.setIterateUpperBound(sliceMax); - } else { - sliceMax = null; - } - var rocksIterator = db.newIterator(cfh, readOptions); - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax)); + return LLLocalDictionary.getRocksIterator(readOptions, range, db, cfh); }, (tuple, sink) -> { var rocksIterator = tuple.getT1(); byte[] firstGroupKey = null; @@ -96,4 +73,5 @@ public class LLLocalKeyPrefixReactiveRocksIterator { tuple.getT3().ifPresent(RocksMutableObject::close); }); } + } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 7ea8660..564b151 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -143,7 +143,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // end force flush } - private void compactDb(RocksDB db, List handles) throws RocksDBException { + @SuppressWarnings("unused") + private void compactDb(RocksDB db, List handles) { // force compact the database for (ColumnFamilyHandle cfh : handles) { var t = new Thread(() -> { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 783b813..65c01c1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -95,10 +95,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { private static final Supplier lowMemorySchedulerSupplier = Suppliers.memoize(() -> Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "lucene-low-memory", Integer.MAX_VALUE))::get; + @SuppressWarnings("FieldCanBeLocal") private final Supplier querySchedulerSupplier = USE_STANDARD_SCHEDULERS ? Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("query"))::get; + @SuppressWarnings("FieldCanBeLocal") private final Supplier blockingSchedulerSupplier = USE_STANDARD_SCHEDULERS ? Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("blocking"))::get; + @SuppressWarnings("FieldCanBeLocal") private final Supplier blockingLuceneSearchSchedulerSupplier = USE_STANDARD_SCHEDULERS ? Schedulers::boundedElastic : Suppliers.memoize(() -> boundedSchedulerSupplier.apply("search-blocking"))::get; /** @@ -603,15 +606,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { AtomicBoolean cancelled = new AtomicBoolean(); Semaphore requests = new Semaphore(0); - sink.onDispose(() -> { - cancelled.set(true); - }); - sink.onCancel(() -> { - cancelled.set(true); - }); - sink.onRequest(delta -> { - requests.release((int) Math.min(delta, Integer.MAX_VALUE)); - }); + sink.onDispose(() -> cancelled.set(true)); + sink.onCancel(() -> cancelled.set(true)); + sink.onRequest(delta -> requests.release((int) Math.min(delta, Integer.MAX_VALUE))); try { //noinspection BlockingMethodInNonBlockingContext diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index e268c8f..83e5c90 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -247,6 +247,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { distributedPre = Mono.empty(); } + //noinspection DuplicatedCode return distributedPre.then(Flux .fromArray(luceneIndices) .index() @@ -314,6 +315,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { ) .then(); } + //noinspection DuplicatedCode return distributedPre .then(Flux .fromArray(luceneIndices) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 82cb36d..1054a66 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -1,14 +1,13 @@ package it.cavallium.dbengine.database.disk; +import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator; + import it.cavallium.dbengine.database.LLRange; -import java.util.Optional; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksMutableObject; -import org.rocksdb.Slice; import reactor.core.publisher.Flux; -import reactor.util.function.Tuples; public abstract class LLLocalReactiveRocksIterator { @@ -40,27 +39,7 @@ public abstract class LLLocalReactiveRocksIterator { readOptions.setReadaheadSize(2 * 1024 * 1024); readOptions.setFillCache(false); } - Slice sliceMin; - Slice sliceMax; - if (range.hasMin()) { - sliceMin = new Slice(range.getMin()); - readOptions.setIterateLowerBound(sliceMin); - } else { - sliceMin = null; - } - if (range.hasMax()) { - sliceMax = new Slice(range.getMax()); - readOptions.setIterateUpperBound(sliceMax); - } else { - sliceMax = null; - } - var rocksIterator = db.newIterator(cfh, readOptions); - if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax)); + return getRocksIterator(readOptions, range, db, cfh); }, (tuple, sink) -> { var rocksIterator = tuple.getT1(); if (rocksIterator.isValid()) { diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java index 7305187..0ad8f87 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java @@ -71,6 +71,7 @@ public class CodecSerializer implements Serializer { } } + @SuppressWarnings("unused") public int getCodecHeadersBytes() { if (microCodecs) { return Byte.BYTES; diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index d117ae7..ffcc4ab 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -247,8 +247,11 @@ public class LuceneUtils { Long limit) { return mappedKeys.reduce( new SearchResultKeys<>(Flux.empty(), 0L), - (a, b) -> new SearchResultKeys(LuceneUtils - .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, offset, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) + (a, b) -> new SearchResultKeys<>(LuceneUtils.mergeStream(Flux.just(a.getResults(), b.getResults()), + sort, + offset, + limit + ), a.getTotalHitsCount() + b.getTotalHitsCount()) ); } @@ -258,8 +261,11 @@ public class LuceneUtils { Long limit) { return mappedKeys.reduce( new SearchResult<>(Flux.empty(), 0L), - (a, b) -> new SearchResult(LuceneUtils - .mergeStream(Flux.just(a.getResults(), b.getResults()), sort, offset, limit), a.getTotalHitsCount() + b.getTotalHitsCount()) + (a, b) -> new SearchResult<>(LuceneUtils.mergeStream(Flux.just(a.getResults(), b.getResults()), + sort, + offset, + limit + ), a.getTotalHitsCount() + b.getTotalHitsCount()) ); } diff --git a/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramAnalyzer.java b/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramAnalyzer.java index bd4d6df..cffc9a5 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramAnalyzer.java +++ b/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramAnalyzer.java @@ -26,11 +26,10 @@ public class NCharGramAnalyzer extends Analyzer { TokenStream tokenStream; if (words) { tokenizer = new StandardTokenizer(); - tokenStream = tokenizer; } else { tokenizer = new KeywordTokenizer(); - tokenStream = tokenizer; } + tokenStream = tokenizer; tokenStream = LuceneUtils.newCommonFilter(tokenStream, words); tokenStream = new NGramTokenFilter(tokenStream, minGram, maxGram, false); diff --git a/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramEdgeAnalyzer.java b/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramEdgeAnalyzer.java index f5965e4..ee613ec 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramEdgeAnalyzer.java +++ b/src/main/java/it/cavallium/dbengine/lucene/analyzer/NCharGramEdgeAnalyzer.java @@ -26,11 +26,10 @@ public class NCharGramEdgeAnalyzer extends Analyzer { TokenStream tokenStream; if (words) { tokenizer = new StandardTokenizer(); - tokenStream = tokenizer; } else { tokenizer = new KeywordTokenizer(); - tokenStream = tokenizer; } + tokenStream = tokenizer; tokenStream = LuceneUtils.newCommonFilter(tokenStream, words); tokenStream = new EdgeNGramTokenFilter(tokenStream, minGram, maxGram, false); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java index 772666a..b45d2a2 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/AllowOnlyQueryParsingCollectorStreamSearcher.java @@ -80,12 +80,12 @@ public class AllowOnlyQueryParsingCollectorStreamSearcher implements LuceneStrea return new LuceneSearchInstance() { @Override - public long getTotalHitsCount() throws IOException { + public long getTotalHitsCount() { throw new IllegalArgumentException("Total hits consumer not allowed"); } @Override - public void getResults(ResultItemConsumer consumer) throws IOException { + public void getResults(ResultItemConsumer consumer) { throw new IllegalArgumentException("Results consumer not allowed"); } }; diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java index b34251b..4658cab 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneStreamSearcher.java @@ -24,8 +24,6 @@ public interface LuceneStreamSearcher { * @param scoreMode score mode * @param minCompetitiveScore minimum score accepted * @param keyFieldName the name of the key field - * @param resultsConsumer the consumer of results - * @param totalHitsConsumer the consumer of total count of results * @throws IOException thrown if there is an error */ LuceneSearchInstance search(IndexSearcher indexSearcher, diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java index 0c31fc1..99e4bb1 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ParallelCollectorStreamSearcher.java @@ -46,7 +46,7 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher { long totalHitsCount = countStreamSearcher.countLong(indexSearcher, query); @Override - public long getTotalHitsCount() throws IOException { + public long getTotalHitsCount() { return totalHitsCount; }