> documents);
-
- void deleteAll();
-
- // todo: add a filterer parameter?
- /**
- * @param queryParams the limit is valid for each lucene instance. If you have 15 instances, the number of elements
- * returned can be at most limit * 15
.
- *
- * The additional query will be used with the moreLikeThis query: "mltQuery AND additionalQuery"
- * @return the collection has one or more flux
- */
- Stream moreLikeThis(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- @Nullable String keyFieldName,
- Multimap mltDocumentFields);
-
- // todo: add a filterer parameter?
- /**
- * @param queryParams the limit is valid for each lucene instance. If you have 15 instances, the number of elements
- * returned can be at most limit * 15
- * @return the collection has one or more flux
- */
- Stream search(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- @Nullable String keyFieldName);
-
- /**
- * @return buckets with each value collected into one of the buckets
- */
- Buckets computeBuckets(@Nullable LLSnapshot snapshot,
- @NotNull List queries,
- @Nullable Query normalizationQuery,
- BucketParams bucketParams);
-
- default TotalHitsCount count(@Nullable LLSnapshot snapshot, Query query, @Nullable Duration timeout) {
- QueryParams params = QueryParams.of(query,
- 0,
- 0,
- NoSort.of(),
- false,
- timeout == null ? Long.MAX_VALUE : timeout.toMillis()
- );
- return collectOn(StreamUtils.LUCENE_POOL,
- this.search(snapshot, params, null).map(LLSearchResultShard::totalHitsCount),
- fastReducing(TotalHitsCount.of(0, true),
- (a, b) -> TotalHitsCount.of(a.value() + b.value(), a.exact() && b.exact())
- )
- );
- }
-
- boolean isLowMemoryMode();
-
- /**
- * Flush writes to disk.
- * This does not commit, it syncs the data to the disk
- */
- void flush();
-
- void waitForMerges();
-
- /**
- * Wait for the latest pending merge
- * This disables future merges until shutdown!
- */
- void waitForLastMerges();
-
- /**
- * Refresh index searcher
- */
- void refresh(boolean force);
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java
index 7a08f93..7779fa3 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java
@@ -1,23 +1,14 @@
package it.cavallium.dbengine.database;
import static it.cavallium.dbengine.utils.StreamUtils.collect;
-import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
import static it.cavallium.dbengine.utils.StreamUtils.executing;
import com.google.common.collect.Multimap;
import io.micrometer.core.instrument.MeterRegistry;
import it.cavallium.dbengine.client.ConnectionSettings.ConnectionPart;
-import it.cavallium.dbengine.client.ConnectionSettings.ConnectionPart.ConnectionPartLucene;
import it.cavallium.dbengine.client.ConnectionSettings.ConnectionPart.ConnectionPartRocksDB;
-import it.cavallium.dbengine.lucene.LuceneHacks;
-import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
-import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
-import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -28,31 +19,21 @@ import java.util.Set;
import java.util.StringJoiner;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.jetbrains.annotations.Nullable;
public class LLMultiDatabaseConnection implements LLDatabaseConnection {
private static final Logger LOG = LogManager.getLogger(LLMultiDatabaseConnection.class);
private final Map databaseShardConnections = new HashMap<>();
- private final Map luceneShardConnections = new HashMap<>();
private final Set allConnections = new HashSet<>();
private final LLDatabaseConnection defaultDatabaseConnection;
- private final LLDatabaseConnection defaultLuceneConnection;
private final LLDatabaseConnection anyConnection;
public LLMultiDatabaseConnection(Multimap subConnections) {
LLDatabaseConnection defaultDatabaseConnection = null;
- LLDatabaseConnection defaultLuceneConnection = null;
for (Entry entry : subConnections.entries()) {
var subConnectionSettings = entry.getKey();
var connectionPart = entry.getValue();
- if (connectionPart instanceof ConnectionPartLucene connectionPartLucene) {
- if (connectionPartLucene.name() == null) {
- defaultLuceneConnection = subConnectionSettings;
- } else {
- luceneShardConnections.put(connectionPartLucene.name(), subConnectionSettings);
- }
- } else if (connectionPart instanceof ConnectionPartRocksDB connectionPartRocksDB) {
+ if (connectionPart instanceof ConnectionPartRocksDB connectionPartRocksDB) {
if (connectionPartRocksDB.name() == null) {
defaultDatabaseConnection = subConnectionSettings;
} else {
@@ -63,21 +44,14 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection {
}
}
this.defaultDatabaseConnection = defaultDatabaseConnection;
- this.defaultLuceneConnection = defaultLuceneConnection;
if (defaultDatabaseConnection != null) {
anyConnection = defaultDatabaseConnection;
- } else if (defaultLuceneConnection != null) {
- anyConnection = defaultLuceneConnection;
} else {
anyConnection = subConnections.keySet().stream().findAny().orElse(null);
}
if (defaultDatabaseConnection != null) {
allConnections.add(defaultDatabaseConnection);
}
- if (defaultLuceneConnection != null) {
- allConnections.add(defaultLuceneConnection);
- }
- allConnections.addAll(luceneShardConnections.values());
allConnections.addAll(databaseShardConnections.values());
}
@@ -107,63 +81,6 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection {
return conn.getDatabase(name, columns, databaseOptions);
}
- @Override
- public LLLuceneIndex getLuceneIndex(String clusterName,
- LuceneIndexStructure indexStructure,
- it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers indicizerAnalyzers,
- it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities indicizerSimilarities,
- LuceneOptions luceneOptions,
- @Nullable LuceneHacks luceneHacks) {
- IntSet registeredShards = new IntOpenHashSet();
- Map connectionToShardMap = new HashMap<>();
- for (int activeShard : indexStructure.activeShards()) {
- if (activeShard >= indexStructure.totalShards()) {
- throw new IllegalArgumentException(
- "ActiveShard " + activeShard + " is bigger than total shards count " + indexStructure.totalShards());
- }
- if (!registeredShards.add(activeShard)) {
- throw new IllegalArgumentException("ActiveShard " + activeShard + " has been specified twice");
- }
- var shardName = LuceneUtils.getStandardName(clusterName, activeShard);
- var connection = luceneShardConnections.getOrDefault(shardName, defaultLuceneConnection);
- Objects.requireNonNull(connection, "Null connection");
- connectionToShardMap.computeIfAbsent(connection, k -> new IntOpenHashSet()).add(activeShard);
- }
- if (connectionToShardMap.keySet().size() == 1) {
- return connectionToShardMap
- .keySet()
- .stream()
- .findFirst()
- .orElseThrow()
- .getLuceneIndex(clusterName,
- indexStructure,
- indicizerAnalyzers,
- indicizerSimilarities,
- luceneOptions,
- luceneHacks
- );
- } else {
- record ShardToIndex(int shard, LLLuceneIndex connIndex) {}
- var luceneIndices = new LLLuceneIndex[indexStructure.totalShards()];
- connectionToShardMap.entrySet().stream().flatMap(entry -> {
- var connectionIndexStructure = indexStructure.setActiveShards(new IntArrayList(entry.getValue()));
-
- LLLuceneIndex connIndex = entry.getKey().getLuceneIndex(clusterName, connectionIndexStructure,
- indicizerAnalyzers, indicizerSimilarities, luceneOptions, luceneHacks);
-
- return entry.getValue().intStream().mapToObj(shard -> new ShardToIndex(shard, connIndex));
- }).forEach(index -> luceneIndices[index.shard] = index.connIndex);
- return new LLMultiLuceneIndex(clusterName,
- indexStructure,
- indicizerAnalyzers,
- indicizerSimilarities,
- luceneOptions,
- luceneHacks,
- luceneIndices
- );
- }
- }
-
@Override
public void disconnect() {
collect(allConnections.stream(), executing(connection -> {
@@ -179,10 +96,8 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection {
public String toString() {
return new StringJoiner(", ", LLMultiDatabaseConnection.class.getSimpleName() + "[", "]")
.add("databaseShardConnections=" + databaseShardConnections)
- .add("luceneShardConnections=" + luceneShardConnections)
.add("allConnections=" + allConnections)
.add("defaultDatabaseConnection=" + defaultDatabaseConnection)
- .add("defaultLuceneConnection=" + defaultLuceneConnection)
.add("anyConnection=" + anyConnection)
.toString();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
deleted file mode 100644
index 8c2f6cb..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import static it.cavallium.dbengine.database.LLUtils.mapList;
-import static it.cavallium.dbengine.lucene.LuceneUtils.getLuceneIndexId;
-import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_POOL;
-import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
-import static it.cavallium.dbengine.utils.StreamUtils.executing;
-import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
-import static it.cavallium.dbengine.utils.StreamUtils.fastReducing;
-import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong;
-import static it.cavallium.dbengine.utils.StreamUtils.partitionByInt;
-import static java.util.stream.Collectors.groupingBy;
-
-import com.google.common.collect.Multimap;
-import it.cavallium.dbengine.client.IBackuppable;
-import it.cavallium.dbengine.client.query.current.data.Query;
-import it.cavallium.dbengine.client.query.current.data.QueryParams;
-import it.cavallium.dbengine.lucene.LuceneHacks;
-import it.cavallium.dbengine.lucene.collector.Buckets;
-import it.cavallium.dbengine.lucene.searcher.BucketParams;
-import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
-import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
-import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
-import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-public class LLMultiLuceneIndex implements LLLuceneIndex {
-
-
- private final ConcurrentHashMap> registeredSnapshots = new ConcurrentHashMap<>();
- private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
-
- private final String clusterName;
- private final LuceneIndexStructure indexStructure;
- private final IndicizerAnalyzers indicizerAnalyzers;
- private final IndicizerSimilarities indicizerSimilarities;
- private final LuceneOptions luceneOptions;
- private final LuceneHacks luceneHacks;
- private final LLLuceneIndex[] luceneIndicesById;
- private final List luceneIndicesSet;
- private final int totalShards;
-
- public LLMultiLuceneIndex(String clusterName,
- LuceneIndexStructure indexStructure,
- IndicizerAnalyzers indicizerAnalyzers,
- IndicizerSimilarities indicizerSimilarities,
- LuceneOptions luceneOptions,
- LuceneHacks luceneHacks,
- LLLuceneIndex[] luceneIndices) {
- this.clusterName = clusterName;
- this.indexStructure = indexStructure;
- this.indicizerAnalyzers = indicizerAnalyzers;
- this.indicizerSimilarities = indicizerSimilarities;
- this.luceneOptions = luceneOptions;
- this.luceneHacks = luceneHacks;
- this.luceneIndicesById = luceneIndices;
- this.totalShards = indexStructure.totalShards();
- var luceneIndicesSet = new HashSet();
- for (LLLuceneIndex luceneIndex : luceneIndices) {
- if (luceneIndex != null) {
- luceneIndicesSet.add(luceneIndex);
- }
- }
- this.luceneIndicesSet = new ArrayList<>(luceneIndicesSet);
- }
-
- @Override
- public String getLuceneIndexName() {
- return clusterName;
- }
-
- private LLLuceneIndex getLuceneIndex(LLTerm id) {
- return luceneIndicesById[getLuceneIndexId(id, totalShards)];
- }
-
- @Override
- public void addDocument(LLTerm id, LLUpdateDocument doc) {
- getLuceneIndex(id).addDocument(id, doc);
- }
-
- @Override
- public long addDocuments(boolean atomic, Stream> documents) {
- return collectOn(LUCENE_POOL,
- partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents)
- .map(entry -> luceneIndicesById[entry.key()].addDocuments(atomic, entry.values().stream())),
- fastSummingLong()
- );
- }
-
- @Override
- public void deleteDocument(LLTerm id) {
- getLuceneIndex(id).deleteDocument(id);
- }
-
- @Override
- public void update(LLTerm id, LLIndexRequest request) {
- getLuceneIndex(id).update(id, request);
- }
-
- @Override
- public long updateDocuments(Stream> documents) {
- return collectOn(LUCENE_POOL,
- partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents)
- .map(entry -> luceneIndicesById[entry.key()].updateDocuments(entry.values().stream())),
- fastSummingLong()
- );
- }
-
- @Override
- public void deleteAll() {
- luceneIndicesSet.forEach(LLLuceneIndex::deleteAll);
- }
-
- @Override
- public Stream moreLikeThis(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- @Nullable String keyFieldName,
- Multimap mltDocumentFields) {
- return luceneIndicesSet.stream().flatMap(luceneIndex -> luceneIndex.moreLikeThis(snapshot,
- queryParams,
- keyFieldName,
- mltDocumentFields
- ));
- }
-
- private Buckets mergeShards(List shards) {
- List seriesValues = new ArrayList<>();
- DoubleArrayList totals = new DoubleArrayList(shards.get(0).totals());
-
- for (Buckets shard : shards) {
- if (seriesValues.isEmpty()) {
- seriesValues.addAll(shard.seriesValues());
- } else {
- for (int serieIndex = 0; serieIndex < seriesValues.size(); serieIndex++) {
- DoubleArrayList mergedSerieValues = seriesValues.get(serieIndex);
- for (int dataIndex = 0; dataIndex < mergedSerieValues.size(); dataIndex++) {
- mergedSerieValues.set(dataIndex, mergedSerieValues.getDouble(dataIndex)
- + shard.seriesValues().get(serieIndex).getDouble(dataIndex)
- );
- }
- }
- }
- for (int i = 0; i < totals.size(); i++) {
- totals.set(i, totals.getDouble(i) + shard.totals().getDouble(i));
- }
- }
- return new Buckets(seriesValues, totals);
- }
-
- @Override
- public Stream search(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- @Nullable String keyFieldName) {
- return luceneIndicesSet.stream().flatMap(luceneIndex -> luceneIndex.search(snapshot,
- queryParams,
- keyFieldName
- ));
- }
-
- @Override
- public Buckets computeBuckets(@Nullable LLSnapshot snapshot,
- @NotNull List queries,
- @Nullable Query normalizationQuery,
- BucketParams bucketParams) {
- return mergeShards(mapList(luceneIndicesSet, luceneIndex -> luceneIndex.computeBuckets(snapshot,
- queries,
- normalizationQuery,
- bucketParams
- )));
- }
-
- @Override
- public boolean isLowMemoryMode() {
- return luceneOptions.lowMemory();
- }
-
- @Override
- public void close() {
- collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::close));
- }
-
- @Override
- public void flush() {
- collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::flush));
- }
-
- @Override
- public void waitForMerges() {
- collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForMerges));
- }
-
- @Override
- public void waitForLastMerges() {
- collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForLastMerges));
- }
-
- @Override
- public void refresh(boolean force) {
- collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(index -> index.refresh(force)));
- }
-
- @Override
- public LLSnapshot takeSnapshot() {
- // Generate next snapshot index
- var snapshotIndex = nextSnapshotNumber.getAndIncrement();
- var snapshot = collectOn(LUCENE_POOL, luceneIndicesSet.stream().map(LLSnapshottable::takeSnapshot), fastListing());
- registeredSnapshots.put(snapshotIndex, snapshot);
- return new LLSnapshot(snapshotIndex);
- }
-
- @Override
- public void releaseSnapshot(LLSnapshot snapshot) {
- var list = registeredSnapshots.remove(snapshot.getSequenceNumber());
- for (int shardIndex = 0; shardIndex < list.size(); shardIndex++) {
- var luceneIndex = luceneIndicesSet.get(shardIndex);
- LLSnapshot instanceSnapshot = list.get(shardIndex);
- luceneIndex.releaseSnapshot(instanceSnapshot);
- }
- }
-
- @Override
- public void pauseForBackup() {
- collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::pauseForBackup));
- }
-
- @Override
- public void resumeAfterBackup() {
- collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::resumeAfterBackup));
- }
-
- @Override
- public boolean isPaused() {
- return this.luceneIndicesSet.stream().anyMatch(IBackuppable::isPaused);
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLScoreMode.java b/src/main/java/it/cavallium/dbengine/database/LLScoreMode.java
index 7674e0b..459b33e 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLScoreMode.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLScoreMode.java
@@ -1,7 +1,5 @@
package it.cavallium.dbengine.database;
-import org.apache.lucene.search.Scorer;
-
public enum LLScoreMode {
/**
* Produced scorers will allow visiting all matches and get their score.
@@ -15,7 +13,7 @@ public enum LLScoreMode {
COMPLETE_NO_SCORES,
/**
* Produced scorers will optionally allow skipping over non-competitive
- * hits using the {@link Scorer#setMinCompetitiveScore(float)} API.
+ * hits using the {@link org.apache.lucene.search.Scorer#setMinCompetitiveScore(float)} API.
* This can reduce time if using setMinCompetitiveScore.
*/
TOP_SCORES,
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java
deleted file mode 100644
index d206cc5..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLSearchResult.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import java.util.function.BiFunction;
-import java.util.stream.Stream;
-import org.jetbrains.annotations.NotNull;
-
-public record LLSearchResult(Stream results) {
-
- @NotNull
- public static BiFunction accumulator() {
- return (a, b) -> new LLSearchResult(Stream.concat(a.results, b.results));
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java b/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java
deleted file mode 100644
index 9bdbd81..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLSearchResultShard.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
-import it.cavallium.dbengine.lucene.LuceneCloseable;
-import it.cavallium.dbengine.utils.SimpleResource;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Stream;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class LLSearchResultShard {
-
- private static final Logger LOG = LogManager.getLogger(LLSearchResultShard.class);
-
- private final List results;
- private final TotalHitsCount totalHitsCount;
-
- public LLSearchResultShard(List results, TotalHitsCount totalHitsCount) {
- this.results = results;
- this.totalHitsCount = totalHitsCount;
- }
-
- public List results() {
- return results;
- }
-
- public TotalHitsCount totalHitsCount() {
- return totalHitsCount;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this)
- return true;
- if (obj == null || obj.getClass() != this.getClass())
- return false;
- var that = (LLSearchResultShard) obj;
- return Objects.equals(this.results, that.results) && Objects.equals(this.totalHitsCount, that.totalHitsCount);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(results, totalHitsCount);
- }
-
- @Override
- public String toString() {
- return "LLSearchResultShard[" + "results=" + results + ", " + "totalHitsCount=" + totalHitsCount + ']';
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSoftUpdateDocument.java b/src/main/java/it/cavallium/dbengine/database/LLSoftUpdateDocument.java
deleted file mode 100644
index b11bcba..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLSoftUpdateDocument.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import java.util.List;
-
-public record LLSoftUpdateDocument(List items, List softDeleteItems) implements LLIndexRequest {}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLTerm.java b/src/main/java/it/cavallium/dbengine/database/LLTerm.java
deleted file mode 100644
index e171449..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLTerm.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import java.util.Objects;
-import org.apache.lucene.util.BytesRef;
-
-public class LLTerm {
-
- private final String key;
- private final BytesRef value;
-
- public LLTerm(String key, String value) {
- this.key = key;
- this.value = new BytesRef(value);
- }
-
- public LLTerm(String key, BytesRef value) {
- this.key = key;
- this.value = value;
- }
-
- public String getKey() {
- return key;
- }
-
- public String getValueUTF8() {
- return value.utf8ToString();
- }
-
- public BytesRef getValueBytesRef() {
- return value;
- }
-
- @Override
- public String toString() {
- return "LLTerm{" +
- "key='" + key + '\'' +
- ", value='" + value + '\'' +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- LLTerm llTerm = (LLTerm) o;
- return Objects.equals(key, llTerm.key) &&
- Objects.equals(value, llTerm.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(key, value);
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java b/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java
deleted file mode 100644
index 28dd3ac..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import java.util.Arrays;
-import java.util.Objects;
-
-@SuppressWarnings("unused")
-public class LLTopKeys {
-
- private final long totalHitsCount;
- private final LLKeyScore[] hits;
-
- public LLTopKeys(long totalHitsCount, LLKeyScore[] hits) {
- this.totalHitsCount = totalHitsCount;
- this.hits = hits;
- }
-
- public long getTotalHitsCount() {
- return totalHitsCount;
- }
-
- public LLKeyScore[] getHits() {
- return hits;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- LLTopKeys llTopKeys = (LLTopKeys) o;
- return totalHitsCount == llTopKeys.totalHitsCount &&
- Arrays.equals(hits, llTopKeys.hits);
- }
-
- @Override
- public int hashCode() {
- int result = Objects.hash(totalHitsCount);
- result = 31 * result + Arrays.hashCode(hits);
- return result;
- }
-
- @Override
- public String toString() {
- return "LLTopKeys{" +
- "totalHitsCount=" + totalHitsCount +
- ", hits=" + Arrays.toString(hits) +
- '}';
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUpdateDocument.java b/src/main/java/it/cavallium/dbengine/database/LLUpdateDocument.java
deleted file mode 100644
index 3023125..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLUpdateDocument.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import java.util.List;
-
-public record LLUpdateDocument(List items) implements LLIndexRequest {}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUpdateFields.java b/src/main/java/it/cavallium/dbengine/database/LLUpdateFields.java
deleted file mode 100644
index 86de268..0000000
--- a/src/main/java/it/cavallium/dbengine/database/LLUpdateFields.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import java.util.List;
-
-public record LLUpdateFields(List items) implements LLIndexRequest {}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index b6f9fa9..941345c 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -5,13 +5,8 @@ import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import it.cavallium.buffer.Buf;
-import it.cavallium.dbengine.client.HitEntry;
-import it.cavallium.dbengine.client.HitKey;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
-import it.cavallium.dbengine.lucene.LuceneCloseable;
-import it.cavallium.dbengine.lucene.LuceneUtils;
-import it.cavallium.dbengine.lucene.RandomSortField;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodHandles.Lookup;
@@ -22,49 +17,25 @@ import java.util.Collection;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoublePoint;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.FloatPoint;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.NumericDocValuesField;
-import org.apache.lucene.document.SortedNumericDocValuesField;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
-import org.apache.lucene.search.SortedNumericSortField;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.BytesRefBuilder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.AbstractNativeReference;
-import org.rocksdb.ReadOptions;
@SuppressWarnings("unused")
public class LLUtils {
private static final Logger logger = LogManager.getLogger(LLUtils.class);
public static final Marker MARKER_ROCKSDB = MarkerManager.getMarker("ROCKSDB");
- public static final Marker MARKER_LUCENE = MarkerManager.getMarker("LUCENE");
public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
@@ -144,116 +115,6 @@ public class LLUtils {
return bool ? BUF_TRUE : BUF_FALSE;
}
- @Nullable
- public static Sort toSort(@Nullable LLSort sort) {
- if (sort == null) {
- return null;
- }
- if (sort.getType() == LLSortType.LONG) {
- return new Sort(new SortedNumericSortField(sort.getFieldName(), SortField.Type.LONG, sort.isReverse()));
- } else if (sort.getType() == LLSortType.RANDOM) {
- return new Sort(new RandomSortField());
- } else if (sort.getType() == LLSortType.SCORE) {
- return new Sort(SortField.FIELD_SCORE);
- } else if (sort.getType() == LLSortType.DOC) {
- return new Sort(SortField.FIELD_DOC);
- }
- return null;
- }
-
- public static ScoreMode toScoreMode(LLScoreMode scoreMode) {
- return switch (scoreMode) {
- case COMPLETE -> ScoreMode.COMPLETE;
- case TOP_SCORES -> ScoreMode.TOP_SCORES;
- case COMPLETE_NO_SCORES -> ScoreMode.COMPLETE_NO_SCORES;
- case NO_SCORES -> ScoreMode.TOP_DOCS;
- };
- }
-
- public static Term toTerm(LLTerm term) {
- var valueRef = new FakeBytesRefBuilder(term);
- return new Term(term.getKey(), valueRef);
- }
-
- public static Document toDocument(LLUpdateDocument document) {
- return toDocument(document.items());
- }
-
- public static Document toDocument(List document) {
- Document d = new Document();
- for (LLItem item : document) {
- if (item != null) {
- d.add(LLUtils.toField(item));
- }
- }
- return d;
- }
-
- public static Field[] toFields(List fields) {
- Field[] d = new Field[fields.size()];
- for (int i = 0; i < fields.size(); i++) {
- d[i] = LLUtils.toField(fields.get(i));
- }
- return d;
- }
-
- public static Collection toDocuments(Collection document) {
- List d = new ArrayList<>(document.size());
- for (LLUpdateDocument doc : document) {
- d.add(LLUtils.toDocument(doc));
- }
- return d;
- }
-
- public static Collection toDocumentsFromEntries(Collection> documentsList) {
- ArrayList results = new ArrayList<>(documentsList.size());
- for (Entry entry : documentsList) {
- results.add(LLUtils.toDocument(entry.getValue()));
- }
- return results;
- }
-
- public static Iterable toTerms(Iterable terms) {
- List d = new ArrayList<>();
- for (LLTerm term : terms) {
- d.add(LLUtils.toTerm(term));
- }
- return d;
- }
-
- private static Field toField(LLItem item) {
- return switch (item.getType()) {
- case IntPoint -> new IntPoint(item.getName(), item.intData());
- case DoublePoint -> new DoublePoint(item.getName(), item.doubleData());
- case IntPointND -> new IntPoint(item.getName(), item.intArrayData());
- case LongPoint -> new LongPoint(item.getName(), item.longData());
- case LongPointND -> new LongPoint(item.getName(), item.longArrayData());
- case FloatPointND -> new FloatPoint(item.getName(), item.floatArrayData());
- case DoublePointND -> new DoublePoint(item.getName(), item.doubleArrayData());
- case LongStoredField -> new StoredField(item.getName(), item.longData());
- case BytesStoredField -> new StoredField(item.getName(), (BytesRef) item.getData());
- case FloatPoint -> new FloatPoint(item.getName(), item.floatData());
- case TextField -> new TextField(item.getName(), item.stringValue(), Store.NO);
- case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Store.YES);
- case SortedNumericDocValuesField -> new SortedNumericDocValuesField(item.getName(), item.longData());
- case NumericDocValuesField -> new NumericDocValuesField(item.getName(), item.longData());
- case StringField -> {
- if (item.getData() instanceof BytesRef bytesRef) {
- yield new StringField(item.getName(), bytesRef, Store.NO);
- } else {
- yield new StringField(item.getName(), item.stringValue(), Store.NO);
- }
- }
- case StringFieldStored -> {
- if (item.getData() instanceof BytesRef bytesRef) {
- yield new StringField(item.getName(), bytesRef, Store.YES);
- } else {
- yield new StringField(item.getName(), item.stringValue(), Store.YES);
- }
- }
- };
- }
-
private static int[] getIntArray(byte[] data) {
var count = data.length / Integer.BYTES;
var items = new int[count];
@@ -284,10 +145,6 @@ public class LLUtils {
return items;
}
- public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) {
- return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.shardId(), hit.score(), hit.key());
- }
-
public static String toStringSafe(byte @Nullable[] key) {
if (key != null) {
return toString(key);
@@ -451,15 +308,6 @@ public class LLUtils {
return buf.hashCode();
}
- public static boolean isSet(ScoreDoc[] scoreDocs) {
- for (ScoreDoc scoreDoc : scoreDocs) {
- if (scoreDoc == null) {
- return false;
- }
- }
- return true;
- }
-
public static boolean isBoundedRange(LLRange rangeShared) {
return rangeShared.hasMin() && rangeShared.hasMax();
}
@@ -625,11 +473,7 @@ public class LLUtils {
private static void closeResource(Object next, boolean manual) {
if (next instanceof SafeCloseable closeable) {
if (manual || closeable instanceof DiscardingCloseable) {
- if (!manual && !LuceneUtils.isLuceneThread() && closeable instanceof LuceneCloseable luceneCloseable) {
- luceneCloseable.close();
- } else {
- closeable.close();
- }
+ closeable.close();
}
} else if (next instanceof List> iterable) {
iterable.forEach(obj -> closeResource(obj, manual));
@@ -680,18 +524,4 @@ public class LLUtils {
public static Buf wrapNullable(byte[] array) {
return array != null ? Buf.wrap(array) : null;
}
-
- private static class FakeBytesRefBuilder extends BytesRefBuilder {
-
- private final LLTerm term;
-
- public FakeBytesRefBuilder(LLTerm term) {
- this.term = term;
- }
-
- @Override
- public BytesRef toBytesRef() {
- return term.getValueBytesRef();
- }
- }
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java
deleted file mode 100644
index 30cb399..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java
+++ /dev/null
@@ -1,249 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import it.cavallium.dbengine.database.LLSnapshot;
-import it.cavallium.dbengine.lucene.LuceneCloseable;
-import it.cavallium.dbengine.utils.SimpleResource;
-import java.io.IOException;
-import it.cavallium.dbengine.utils.DBException;
-import java.time.Duration;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.search.SearcherManager;
-import org.apache.lucene.search.similarities.Similarity;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-// todo: deduplicate code between Cached and Simple searcher managers
-public class CachedIndexSearcherManager extends SimpleResource implements IndexSearcherManager, LuceneCloseable {
-
- private static final Logger LOG = LogManager.getLogger(SimpleIndexSearcherManager.class);
- private static final ExecutorService SEARCH_EXECUTOR = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- new LuceneThreadFactory("lucene-search")
- .setDaemon(true).withGroup(new ThreadGroup("lucene-search"))
- );
- private static final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
-
- @Nullable
- private final SnapshotsManager snapshotsManager;
- private final Similarity similarity;
- private final SearcherManager searcherManager;
-
- private final AtomicLong activeSearchers = new AtomicLong(0);
- private final AtomicLong activeRefreshes = new AtomicLong(0);
-
- private final LoadingCache cachedSnapshotSearchers;
- private final ScheduledFuture> refreshSubscription;
-
- public CachedIndexSearcherManager(IndexWriter indexWriter,
- @Nullable SnapshotsManager snapshotsManager,
- ScheduledExecutorService luceneHeavyTasksScheduler,
- Similarity similarity,
- boolean applyAllDeletes,
- boolean writeAllDeletes,
- Duration queryRefreshDebounceTime) {
- this.snapshotsManager = snapshotsManager;
- this.similarity = similarity;
-
- try {
- this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
- } catch (IOException e) {
- throw new DBException(e);
- }
-
- refreshSubscription = luceneHeavyTasksScheduler.scheduleAtFixedRate(() -> {
- try {
- maybeRefresh();
- } catch (Exception ex) {
- LOG.error("Failed to refresh the searcher manager", ex);
- }
- },
- queryRefreshDebounceTime.toMillis(),
- queryRefreshDebounceTime.toMillis(),
- TimeUnit.MILLISECONDS
- );
-
- this.cachedSnapshotSearchers = CacheBuilder.newBuilder()
- .expireAfterWrite(queryRefreshDebounceTime)
- // Max 3 cached non-main index writers
- .maximumSize(3)
- .build(new CacheLoader<>() {
- @Override
- public LLIndexSearcher load(@NotNull LLSnapshot snapshot) {
- return CachedIndexSearcherManager.this.generateCachedSearcher(snapshot);
- }
- });
- }
-
- private LLIndexSearcher generateCachedSearcher(@Nullable LLSnapshot snapshot) {
- if (isClosed()) {
- return null;
- }
- activeSearchers.incrementAndGet();
- try {
- IndexSearcher indexSearcher;
- boolean fromSnapshot;
- if (snapshotsManager == null || snapshot == null) {
- try {
- indexSearcher = searcherManager.acquire();
- } catch (IOException ex) {
- throw new DBException(ex);
- }
- fromSnapshot = false;
- } else {
- indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
- fromSnapshot = true;
- }
- indexSearcher.setSimilarity(similarity);
- assert indexSearcher.getIndexReader().getRefCount() > 0;
- LLIndexSearcher llIndexSearcher;
- if (fromSnapshot) {
- llIndexSearcher = new SnapshotIndexSearcher(indexSearcher);
- } else {
- llIndexSearcher = new MainIndexSearcher(indexSearcher, searcherManager);
- }
- return llIndexSearcher;
- } catch (Throwable ex) {
- activeSearchers.decrementAndGet();
- throw ex;
- }
- }
-
- private void dropCachedIndexSearcher() {
- // This shouldn't happen more than once per searcher.
- activeSearchers.decrementAndGet();
- }
-
- @Override
- public void maybeRefreshBlocking() {
- try {
- activeRefreshes.incrementAndGet();
- searcherManager.maybeRefreshBlocking();
- } catch (AlreadyClosedException ignored) {
-
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- activeRefreshes.decrementAndGet();
- }
- }
-
- @Override
- public void maybeRefresh() {
- try {
- activeRefreshes.incrementAndGet();
- searcherManager.maybeRefresh();
- } catch (AlreadyClosedException ignored) {
-
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- activeRefreshes.decrementAndGet();
- }
- }
-
- @Override
- public LLIndexSearcher retrieveSearcher(@Nullable LLSnapshot snapshot) {
- if (snapshot == null) {
- return this.generateCachedSearcher(null);
- } else {
- return this.cachedSnapshotSearchers.getUnchecked(snapshot);
- }
- }
-
- @Override
- protected void onClose() {
- LOG.debug("Closing IndexSearcherManager...");
- long initTime = System.nanoTime();
- refreshSubscription.cancel(false);
- while (!refreshSubscription.isDone() && (System.nanoTime() - initTime) <= 240000000000L) {
- LockSupport.parkNanos(50000000);
- }
- refreshSubscription.cancel(true);
- LOG.debug("Closed IndexSearcherManager");
- LOG.debug("Closing refreshes...");
- initTime = System.nanoTime();
- while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- LOG.debug("Closed refreshes...");
- LOG.debug("Closing active searchers...");
- initTime = System.nanoTime();
- while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- LOG.debug("Closed active searchers");
- LOG.debug("Stopping searcher executor...");
- cachedSnapshotSearchers.invalidateAll();
- cachedSnapshotSearchers.cleanUp();
- SEARCH_EXECUTOR.shutdown();
- try {
- if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
- SEARCH_EXECUTOR.shutdownNow();
- }
- } catch (InterruptedException e) {
- LOG.error("Failed to stop executor", e);
- }
- LOG.debug("Stopped searcher executor");
- }
-
- public long getActiveSearchers() {
- return activeSearchers.get();
- }
-
- public long getActiveRefreshes() {
- return activeRefreshes.get();
- }
-
- private class MainIndexSearcher extends LLIndexSearcherImpl implements LuceneCloseable {
-
- public MainIndexSearcher(IndexSearcher indexSearcher, SearcherManager searcherManager) {
- super(indexSearcher, () -> releaseOnCleanup(searcherManager, indexSearcher));
- }
-
- private static void releaseOnCleanup(SearcherManager searcherManager, IndexSearcher indexSearcher) {
- try {
- LOG.warn("An index searcher was not closed!");
- searcherManager.release(indexSearcher);
- } catch (IOException ex) {
- LOG.error("Failed to release the index searcher during cleanup: {}", indexSearcher, ex);
- }
- }
-
- @Override
- public void onClose() {
- dropCachedIndexSearcher();
- try {
- searcherManager.release(indexSearcher);
- } catch (IOException ex) {
- throw new DBException(ex);
- }
- }
- }
-
- private class SnapshotIndexSearcher extends LLIndexSearcherImpl {
-
- public SnapshotIndexSearcher(IndexSearcher indexSearcher) {
- super(indexSearcher);
- }
-
- @Override
- public void onClose() {
- dropCachedIndexSearcher();
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/ExecutorSearcherFactory.java b/src/main/java/it/cavallium/dbengine/database/disk/ExecutorSearcherFactory.java
deleted file mode 100644
index 28ab6d9..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/ExecutorSearcherFactory.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import java.util.concurrent.Executor;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.SearcherFactory;
-
-public class ExecutorSearcherFactory extends SearcherFactory {
-
- private final Executor executor;
-
- public ExecutorSearcherFactory(Executor executor) {
- this.executor = executor;
- }
-
- @Override
- public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) {
- return new IndexSearcher(reader, executor);
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java
deleted file mode 100644
index 465bacb..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/IndexSearcherManager.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import it.cavallium.dbengine.database.LLSnapshot;
-import it.cavallium.dbengine.database.SafeCloseable;
-import java.io.IOException;
-import java.util.function.Supplier;
-import org.jetbrains.annotations.Nullable;
-
-public interface IndexSearcherManager extends SafeCloseable {
-
- void maybeRefreshBlocking();
-
- void maybeRefresh();
-
- LLIndexSearcher retrieveSearcher(@Nullable LLSnapshot snapshot);
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java
deleted file mode 100644
index 722aa13..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcher.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import it.cavallium.dbengine.database.DiscardingCloseable;
-import it.cavallium.dbengine.utils.SimpleResource;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.search.IndexSearcher;
-
-public abstract class LLIndexSearcher extends SimpleResource implements DiscardingCloseable {
-
- protected static final Logger LOG = LogManager.getLogger(LLIndexSearcher.class);
-
- public LLIndexSearcher() {
- super();
- }
-
- public LLIndexSearcher(Runnable cleanAction) {
- super(cleanAction);
- }
-
- public IndexSearcher getIndexSearcher() {
- ensureOpen();
- return getIndexSearcherInternal();
- }
-
- protected abstract IndexSearcher getIndexSearcherInternal();
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcherImpl.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcherImpl.java
deleted file mode 100644
index 9610598..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearcherImpl.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.search.IndexSearcher;
-
-public abstract class LLIndexSearcherImpl extends LLIndexSearcher {
-
- protected static final Logger LOG = LogManager.getLogger(LLIndexSearcherImpl.class);
-
- protected final IndexSearcher indexSearcher;
-
- public LLIndexSearcherImpl(IndexSearcher indexSearcher) {
- super();
- this.indexSearcher = indexSearcher;
- }
-
- public LLIndexSearcherImpl(IndexSearcher indexSearcher, Runnable cleanAction) {
- super(cleanAction);
- this.indexSearcher = indexSearcher;
- }
-
- public IndexSearcher getIndexSearcherInternal() {
- return indexSearcher;
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java b/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java
deleted file mode 100644
index 4ea39ee..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLIndexSearchers.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import it.cavallium.dbengine.database.DiscardingCloseable;
-import it.cavallium.dbengine.lucene.LuceneCloseable;
-import it.cavallium.dbengine.lucene.searcher.ShardIndexSearcher;
-import it.cavallium.dbengine.utils.SimpleResource;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import org.apache.lucene.search.IndexSearcher;
-
-public interface LLIndexSearchers extends DiscardingCloseable {
-
- static LLIndexSearchers of(List indexSearchers) {
- return new ShardedIndexSearchers(indexSearchers);
- }
-
- static UnshardedIndexSearchers unsharded(LLIndexSearcher indexSearcher) {
- return new UnshardedIndexSearchers(indexSearcher);
- }
-
- List shards();
-
- List llShards();
-
- IndexSearcher shard(int shardIndex);
-
- LLIndexSearcher llShard(int shardIndex);
-
- class UnshardedIndexSearchers implements LLIndexSearchers, LuceneCloseable {
-
- private final LLIndexSearcher indexSearcher;
-
- public UnshardedIndexSearchers(LLIndexSearcher indexSearcher) {
- Objects.requireNonNull(indexSearcher);
- this.indexSearcher = indexSearcher;
- }
-
- @Override
- public List shards() {
- return List.of(indexSearcher.getIndexSearcher());
- }
-
- @Override
- public List llShards() {
- return Collections.singletonList(indexSearcher);
- }
-
- @Override
- public IndexSearcher shard(int shardIndex) {
- if (shardIndex != -1) {
- throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index");
- }
- return indexSearcher.getIndexSearcher();
- }
-
- @Override
- public LLIndexSearcher llShard(int shardIndex) {
- if (shardIndex != -1) {
- throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid, this is a unsharded index");
- }
- return indexSearcher;
- }
-
- public IndexSearcher shard() {
- return this.shard(-1);
- }
-
- public LLIndexSearcher llShard() {
- return this.llShard(-1);
- }
-
- @Override
- public void close() {
- indexSearcher.close();
- }
- }
-
- class ShardedIndexSearchers implements LLIndexSearchers, LuceneCloseable {
-
- private final List indexSearchers;
- private final List indexSearchersVals;
-
- public ShardedIndexSearchers(List indexSearchers) {
- List shardedIndexSearchersVals = new ArrayList<>(indexSearchers.size());
- for (LLIndexSearcher indexSearcher : indexSearchers) {
- shardedIndexSearchersVals.add(indexSearcher.getIndexSearcher());
- }
- shardedIndexSearchersVals = ShardIndexSearcher.create(shardedIndexSearchersVals);
- this.indexSearchers = indexSearchers;
- this.indexSearchersVals = shardedIndexSearchersVals;
- }
-
- @Override
- public List shards() {
- return Collections.unmodifiableList(indexSearchersVals);
- }
-
- @Override
- public List llShards() {
- return Collections.unmodifiableList(indexSearchers);
- }
-
- @Override
- public IndexSearcher shard(int shardIndex) {
- if (shardIndex < 0) {
- throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid");
- }
- return indexSearchersVals.get(shardIndex);
- }
-
- @Override
- public LLIndexSearcher llShard(int shardIndex) {
- if (shardIndex < 0) {
- throw new IndexOutOfBoundsException("Shard index " + shardIndex + " is invalid");
- }
- return indexSearchers.get(shardIndex);
- }
-
- @Override
- public void close() {
- for (LLIndexSearcher indexSearcher : indexSearchers) {
- indexSearcher.close();
- }
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
index 7b842d7..9845bf3 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
@@ -2,14 +2,8 @@ package it.cavallium.dbengine.database.disk;
import io.micrometer.core.instrument.MeterRegistry;
import it.cavallium.dbengine.database.LLDatabaseConnection;
-import it.cavallium.dbengine.database.LLLuceneIndex;
-import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
-import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
-import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
-import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
-import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
import it.cavallium.dbengine.utils.DBException;
import java.io.IOException;
import java.nio.file.Files;
@@ -18,7 +12,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.jetbrains.annotations.Nullable;
public class LLLocalDatabaseConnection implements LLDatabaseConnection {
@@ -75,38 +68,6 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
return basePath.resolve("database_" + databaseName);
}
- @Override
- public LLLuceneIndex getLuceneIndex(String clusterName,
- LuceneIndexStructure indexStructure,
- IndicizerAnalyzers indicizerAnalyzers,
- IndicizerSimilarities indicizerSimilarities,
- LuceneOptions luceneOptions,
- @Nullable LuceneHacks luceneHacks) {
- if (clusterName == null) {
- throw new IllegalArgumentException("Cluster name must be set");
- }
- if (indexStructure.activeShards().size() != 1) {
- return new LLLocalMultiLuceneIndex(meterRegistry,
- clusterName,
- indexStructure.activeShards(),
- indexStructure.totalShards(),
- indicizerAnalyzers,
- indicizerSimilarities,
- luceneOptions,
- luceneHacks
- );
- } else {
- return new LLLocalLuceneIndex(meterRegistry,
- clusterName,
- indexStructure.activeShards().getInt(0),
- indicizerAnalyzers,
- indicizerSimilarities,
- luceneOptions,
- luceneHacks
- );
- }
- }
-
@Override
public void disconnect() {
if (connected.compareAndSet(true, false)) {
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
deleted file mode 100644
index db505d2..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
+++ /dev/null
@@ -1,882 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import static it.cavallium.dbengine.database.LLUtils.MARKER_LUCENE;
-import static it.cavallium.dbengine.database.LLUtils.toDocument;
-import static it.cavallium.dbengine.database.LLUtils.toFields;
-import static it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite.NO_REWRITE;
-import static it.cavallium.dbengine.lucene.searcher.LuceneSearchResult.EMPTY_COUNT;
-import static it.cavallium.dbengine.utils.StreamUtils.collect;
-import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.collect.Multimap;
-import io.micrometer.core.instrument.Counter;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Tag;
-import io.micrometer.core.instrument.Timer;
-import it.cavallium.dbengine.client.Backuppable;
-import it.cavallium.dbengine.client.IBackuppable;
-import it.cavallium.dbengine.client.query.QueryParser;
-import it.cavallium.dbengine.client.query.current.data.Query;
-import it.cavallium.dbengine.client.query.current.data.QueryParams;
-import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
-import it.cavallium.dbengine.database.LLIndexRequest;
-import it.cavallium.dbengine.database.LLLuceneIndex;
-import it.cavallium.dbengine.database.LLSearchResultShard;
-import it.cavallium.dbengine.database.LLSnapshot;
-import it.cavallium.dbengine.database.LLSoftUpdateDocument;
-import it.cavallium.dbengine.database.LLTerm;
-import it.cavallium.dbengine.database.LLUpdateDocument;
-import it.cavallium.dbengine.database.LLUpdateFields;
-import it.cavallium.dbengine.database.LLUtils;
-import it.cavallium.dbengine.lucene.LuceneCloseable;
-import it.cavallium.dbengine.lucene.LuceneConcurrentMergeScheduler;
-import it.cavallium.dbengine.lucene.LuceneHacks;
-import it.cavallium.dbengine.lucene.LuceneUtils;
-import it.cavallium.dbengine.lucene.collector.Buckets;
-import it.cavallium.dbengine.lucene.directory.Lucene91CodecWithNoFieldCompression;
-import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer;
-import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
-import it.cavallium.dbengine.lucene.searcher.BucketParams;
-import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
-import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
-import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
-import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
-import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
-import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
-import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
-import it.cavallium.dbengine.utils.SimpleResource;
-import java.io.IOException;
-import it.cavallium.dbengine.utils.DBException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.logging.Level;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
-import org.apache.lucene.index.ConcurrentMergeScheduler;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.MergeScheduler;
-import org.apache.lucene.index.NoMergePolicy;
-import org.apache.lucene.index.SerialMergeScheduler;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
-import org.apache.lucene.search.similarities.Similarity;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MMapDirectory;
-import org.apache.lucene.util.IOSupplier;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-public class LLLocalLuceneIndex extends SimpleResource implements IBackuppable, LLLuceneIndex, LuceneCloseable {
-
- protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class);
-
- private final ReentrantLock shutdownLock = new ReentrantLock();
- /**
- * Global lucene index scheduler.
- * There is only a single thread globally to not overwhelm the disk with
- * concurrent commits or concurrent refreshes.
- */
- private static final ScheduledExecutorService luceneHeavyTasksScheduler = Executors.newScheduledThreadPool(4,
- new LuceneThreadFactory("heavy-tasks").setDaemon(true).withGroup(new ThreadGroup("lucene-heavy-tasks"))
- );
- private static final ScheduledExecutorService luceneWriteScheduler = Executors.newScheduledThreadPool(8,
- new LuceneThreadFactory("lucene-write").setDaemon(true).withGroup(new ThreadGroup("lucene-write"))
- );
- private static final ScheduledExecutorService bulkScheduler = luceneWriteScheduler;
-
- private static final boolean ENABLE_SNAPSHOTS
- = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.lucene.snapshot.enable", "true"));
-
- private static final boolean CACHE_SEARCHER_MANAGER
- = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.lucene.cachedsearchermanager.enable", "true"));
-
- private static final LLSnapshot DUMMY_SNAPSHOT = new LLSnapshot(-1);
-
- private final LocalSearcher localSearcher;
- private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
-
- private final Counter startedDocIndexings;
- private final Counter endeddDocIndexings;
- private final Timer docIndexingTime;
- private final Timer snapshotTime;
- private final Timer flushTime;
- private final Timer commitTime;
- private final Timer mergeTime;
- private final Timer refreshTime;
-
- private final String shardName;
- private final IndexWriter indexWriter;
- private final SnapshotsManager snapshotsManager;
- private final IndexSearcherManager searcherManager;
- private final PerFieldAnalyzerWrapper luceneAnalyzer;
- private final Similarity luceneSimilarity;
- private final Directory directory;
- private final LuceneBackuppable backuppable;
- private final boolean lowMemory;
-
- private final Phaser activeTasks = new Phaser(1);
-
- public LLLocalLuceneIndex(MeterRegistry meterRegistry,
- @NotNull String clusterName,
- int shardIndex,
- IndicizerAnalyzers indicizerAnalyzers,
- IndicizerSimilarities indicizerSimilarities,
- LuceneOptions luceneOptions,
- @Nullable LuceneHacks luceneHacks) {
-
- if (clusterName.isBlank()) {
- throw new DBException("Empty lucene database name");
- }
- if (!MMapDirectory.UNMAP_SUPPORTED) {
- logger.error("Unmap is unsupported, lucene will run slower: {}", MMapDirectory.UNMAP_NOT_SUPPORTED_REASON);
- } else {
- logger.debug("Lucene MMap is supported");
- }
- this.lowMemory = luceneOptions.lowMemory();
- this.shardName = LuceneUtils.getStandardName(clusterName, shardIndex);
- try {
- this.directory = LuceneUtils.createLuceneDirectory(luceneOptions.directoryOptions(), shardName);
- } catch (IOException e) {
- throw new DBException(e);
- }
- boolean isFilesystemCompressed = LuceneUtils.getIsFilesystemCompressed(luceneOptions.directoryOptions());
-
- this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
- this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
-
- var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
- if (luceneHacks != null && luceneHacks.customLocalSearcher() != null) {
- localSearcher = luceneHacks.customLocalSearcher().get();
- } else {
- localSearcher = new AdaptiveLocalSearcher(maxInMemoryResultEntries);
- }
-
- var indexWriterConfig = new IndexWriterConfig(luceneAnalyzer);
- IndexDeletionPolicy deletionPolicy;
- deletionPolicy = requireNonNull(indexWriterConfig.getIndexDeletionPolicy());
- if (ENABLE_SNAPSHOTS) {
- deletionPolicy = new SnapshotDeletionPolicy(deletionPolicy);
- }
- indexWriterConfig.setIndexDeletionPolicy(deletionPolicy);
- indexWriterConfig.setCommitOnClose(true);
- int writerSchedulerMaxThreadCount;
- MergeScheduler mergeScheduler;
- if (lowMemory) {
- mergeScheduler = new SerialMergeScheduler();
- writerSchedulerMaxThreadCount = 1;
- } else {
- //noinspection resource
- ConcurrentMergeScheduler concurrentMergeScheduler = new LuceneConcurrentMergeScheduler();
- // false means SSD, true means HDD
- boolean spins = false;
- concurrentMergeScheduler.setDefaultMaxMergesAndThreads(spins);
- // It's true by default, but this makes sure it's true if it's a managed path
- if (LuceneUtils.getManagedPath(luceneOptions.directoryOptions()).isPresent()) {
- concurrentMergeScheduler.enableAutoIOThrottle();
- }
- writerSchedulerMaxThreadCount = concurrentMergeScheduler.getMaxThreadCount();
- mergeScheduler = concurrentMergeScheduler;
- }
- if (isFilesystemCompressed) {
- indexWriterConfig.setUseCompoundFile(false);
- indexWriterConfig.setCodec(new Lucene91CodecWithNoFieldCompression());
- }
- logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount);
- indexWriterConfig.setMergeScheduler(mergeScheduler);
- indexWriterConfig.setMergePolicy(LuceneUtils.getMergePolicy(luceneOptions));
- if (luceneOptions.indexWriterRAMBufferSizeMB().isPresent()) {
- indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterRAMBufferSizeMB().get());
- }
- if (luceneOptions.indexWriterMaxBufferedDocs().isPresent()) {
- indexWriterConfig.setMaxBufferedDocs(luceneOptions.indexWriterMaxBufferedDocs().get());
- }
- if (luceneOptions.indexWriterReaderPooling().isPresent()) {
- indexWriterConfig.setReaderPooling(luceneOptions.indexWriterReaderPooling().get());
- }
- indexWriterConfig.setSimilarity(getLuceneSimilarity());
- try {
- this.indexWriter = new IndexWriter(directory, indexWriterConfig);
- } catch (IOException e) {
- throw new DBException(e);
- }
- if (ENABLE_SNAPSHOTS) {
- this.snapshotsManager = new SnapshotsManager(indexWriter, (SnapshotDeletionPolicy) deletionPolicy);
- } else {
- this.snapshotsManager = null;
- }
- SimpleIndexSearcherManager searcherManager;
- if (CACHE_SEARCHER_MANAGER) {
- searcherManager = new SimpleIndexSearcherManager(indexWriter,
- snapshotsManager,
- luceneHeavyTasksScheduler,
- getLuceneSimilarity(),
- luceneOptions.applyAllDeletes().orElse(true),
- luceneOptions.writeAllDeletes().orElse(false),
- luceneOptions.queryRefreshDebounceTime()
- );
- } else {
- searcherManager = new SimpleIndexSearcherManager(indexWriter,
- snapshotsManager,
- luceneHeavyTasksScheduler,
- getLuceneSimilarity(),
- luceneOptions.applyAllDeletes().orElse(true),
- luceneOptions.writeAllDeletes().orElse(false),
- luceneOptions.queryRefreshDebounceTime());
- }
- this.searcherManager = searcherManager;
-
- this.startedDocIndexings = meterRegistry.counter("index.write.doc.started.counter", "index.name", clusterName);
- this.endeddDocIndexings = meterRegistry.counter("index.write.doc.ended.counter", "index.name", clusterName);
- this.docIndexingTime = Timer.builder("index.write.doc.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
- this.snapshotTime = Timer.builder("index.write.snapshot.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
- this.flushTime = Timer.builder("index.write.flush.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
- this.commitTime = Timer.builder("index.write.commit.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
- this.mergeTime = Timer.builder("index.write.merge.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
- this.refreshTime = Timer.builder("index.search.refresh.timer").publishPercentiles(0.2, 0.5, 0.95).publishPercentileHistogram().tag("index.name", clusterName).register(meterRegistry);
- meterRegistry.gauge("index.snapshot.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getSnapshotsCount);
- meterRegistry.gauge("index.write.flushing.bytes", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterFlushingBytes);
- meterRegistry.gauge("index.write.sequence.completed.max", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterMaxCompletedSequenceNumber);
- meterRegistry.gauge("index.write.doc.pending.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterPendingNumDocs);
- meterRegistry.gauge("index.write.segment.merging.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getIndexWriterMergingSegmentsSize);
- meterRegistry.gauge("index.directory.deletion.pending.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getDirectoryPendingDeletionsCount);
- meterRegistry.gauge("index.doc.counter", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getDocCount);
- meterRegistry.gauge("index.doc.max", List.of(Tag.of("index.name", clusterName)), this, LLLocalLuceneIndex::getMaxDoc);
- meterRegistry.gauge("index.searcher.refreshes.active.count",
- List.of(Tag.of("index.name", clusterName)),
- searcherManager,
- SimpleIndexSearcherManager::getActiveRefreshes
- );
- meterRegistry.gauge("index.searcher.searchers.active.count",
- List.of(Tag.of("index.name", clusterName)),
- searcherManager,
- SimpleIndexSearcherManager::getActiveSearchers
- );
-
- // Start scheduled tasks
- var commitMillis = luceneOptions.commitDebounceTime().toMillis();
- luceneHeavyTasksScheduler.scheduleAtFixedRate(this::scheduledCommit, commitMillis, commitMillis,
- TimeUnit.MILLISECONDS);
-
- this.backuppable = new LuceneBackuppable();
- }
-
- private Similarity getLuceneSimilarity() {
- return luceneSimilarity;
- }
-
- @Override
- public String getLuceneIndexName() {
- return shardName;
- }
-
- @Override
- public LLSnapshot takeSnapshot() {
- return runTask(() -> {
- if (snapshotsManager == null) {
- return DUMMY_SNAPSHOT;
- }
- try {
- return snapshotTime.recordCallable(snapshotsManager::takeSnapshot);
- } catch (Exception e) {
- throw new DBException("Failed to take snapshot", e);
- }
- });
- }
-
- private V runTask(Supplier supplier) {
- if (isClosed()) {
- throw new IllegalStateException("Lucene index is closed");
- } else {
- activeTasks.register();
- try {
- return supplier.get();
- } finally {
- activeTasks.arriveAndDeregister();
- }
- }
- }
-
- @Override
- public void releaseSnapshot(LLSnapshot snapshot) {
- if (snapshotsManager == null) {
- if (snapshot != null && !Objects.equals(snapshot, DUMMY_SNAPSHOT)) {
- throw new IllegalStateException("Can't release snapshot " + snapshot);
- }
- return;
- }
- snapshotsManager.releaseSnapshot(snapshot);
- }
-
- @Override
- public void addDocument(LLTerm key, LLUpdateDocument doc) {
- runTask(() -> {
- try {
- docIndexingTime.recordCallable(() -> {
- startedDocIndexings.increment();
- try {
- indexWriter.addDocument(toDocument(doc));
- } finally {
- endeddDocIndexings.increment();
- }
- return null;
- });
- } catch (Exception e) {
- throw new DBException("Failed to add document", e);
- }
- logger.trace(MARKER_LUCENE, "Added document {}: {}", key, doc);
- return null;
- });
- }
-
- @Override
- public long addDocuments(boolean atomic, Stream> documents) {
- return this.runTask(() -> {
- if (!atomic) {
- LongAdder count = new LongAdder();
- documents.forEach(document -> {
- count.increment();
- LLUpdateDocument value = document.getValue();
- startedDocIndexings.increment();
- try {
- docIndexingTime.recordCallable(() -> {
- indexWriter.addDocument(toDocument(value));
- return null;
- });
- } catch (Exception ex) {
- throw new CompletionException("Failed to add document", ex);
- } finally {
- endeddDocIndexings.increment();
- }
- logger.trace(MARKER_LUCENE, "Added document: {}", document);
- });
- return count.sum();
- } else {
- var documentsList = collect(documents, fastListing());
- assert documentsList != null;
- var count = documentsList.size();
- StopWatch stopWatch = StopWatch.createStarted();
- try {
- startedDocIndexings.increment(count);
- try {
- indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- endeddDocIndexings.increment(count);
- }
- } finally {
- docIndexingTime.record(stopWatch.getTime(TimeUnit.MILLISECONDS) / Math.max(count, 1),
- TimeUnit.MILLISECONDS
- );
- }
- return (long) documentsList.size();
- }
- });
- }
-
-
- @Override
- public void deleteDocument(LLTerm id) {
- this.runTask(() -> {
- try {
- return docIndexingTime.recordCallable(() -> {
- startedDocIndexings.increment();
- try {
- indexWriter.deleteDocuments(LLUtils.toTerm(id));
- } finally {
- endeddDocIndexings.increment();
- }
- return null;
- });
- } catch (Exception e) {
- throw new DBException("Failed to delete document", e);
- }
- });
- }
-
- @Override
- public void update(LLTerm id, LLIndexRequest request) {
- this.runTask(() -> {
- try {
- docIndexingTime.recordCallable(() -> {
- startedDocIndexings.increment();
- try {
- if (request instanceof LLUpdateDocument updateDocument) {
- indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
- } else if (request instanceof LLSoftUpdateDocument softUpdateDocument) {
- indexWriter.softUpdateDocument(LLUtils.toTerm(id),
- toDocument(softUpdateDocument.items()),
- toFields(softUpdateDocument.softDeleteItems())
- );
- } else if (request instanceof LLUpdateFields updateFields) {
- indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items()));
- } else {
- throw new UnsupportedOperationException("Unexpected request type: " + request);
- }
- } finally {
- endeddDocIndexings.increment();
- }
- return null;
- });
- } catch (Exception e) {
- throw new DBException("Failed to update document", e);
- }
- logger.trace(MARKER_LUCENE, "Updated document {}: {}", id, request);
- return null;
- });
- }
-
- @Override
- public long updateDocuments(Stream> documents) {
- return runTask(() -> {
- var count = new LongAdder();
- documents.forEach(document -> {
- count.increment();
- LLTerm key = document.getKey();
- LLUpdateDocument value = document.getValue();
- startedDocIndexings.increment();
- try {
- docIndexingTime.recordCallable(() -> {
- indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
- return null;
- });
- logger.trace(MARKER_LUCENE, "Updated document {}: {}", key, value);
- } catch (Exception ex) {
- throw new CompletionException(ex);
- } finally {
- endeddDocIndexings.increment();
- }
- });
- return count.sum();
- });
- }
-
- @Override
- public void deleteAll() {
- this.runTask(() -> {
- shutdownLock.lock();
- try {
- indexWriter.deleteAll();
- indexWriter.forceMergeDeletes(true);
- indexWriter.commit();
- indexWriter.deleteUnusedFiles();
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- shutdownLock.unlock();
- }
- return null;
- });
- }
-
- @Override
- public Stream moreLikeThis(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- @Nullable String keyFieldName,
- Multimap mltDocumentFieldsFlux) {
- LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
- var searcher = this.searcherManager.retrieveSearcher(snapshot);
- var transformer = new MoreLikeThisTransformer(mltDocumentFieldsFlux, luceneAnalyzer, luceneSimilarity);
-
- var result = localSearcher.collect(searcher, localQueryParams, keyFieldName, transformer, Function.identity());
- return Stream.of(new LLSearchResultShard(result.results(), result.totalHitsCount()));
- }
-
- @Override
- public Stream search(@Nullable LLSnapshot snapshot, QueryParams queryParams,
- @Nullable String keyFieldName) {
- var result = searchInternal(snapshot, queryParams, keyFieldName);
- var shard = new LLSearchResultShard(result.results(), result.totalHitsCount());
- return Stream.of(shard);
- }
-
- public LuceneSearchResult searchInternal(@Nullable LLSnapshot snapshot, QueryParams queryParams,
- @Nullable String keyFieldName) {
- LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
- try (var searcher = searcherManager.retrieveSearcher(snapshot)) {
- if (searcher != null) {
- return localSearcher.collect(searcher, localQueryParams, keyFieldName, NO_REWRITE, Function.identity());
- } else {
- return LuceneSearchResult.EMPTY;
- }
- }
- }
-
- @Override
- public TotalHitsCount count(@Nullable LLSnapshot snapshot, Query query, @Nullable Duration timeout) {
- var params = LuceneUtils.getCountQueryParams(query);
- var result = this.searchInternal(snapshot, params, null);
- if (result != null) {
- return result.totalHitsCount();
- } else {
- return EMPTY_COUNT;
- }
- }
-
- @Override
- public Buckets computeBuckets(@Nullable LLSnapshot snapshot,
- @NotNull List queries,
- @Nullable Query normalizationQuery,
- BucketParams bucketParams) {
- List localQueries = new ArrayList<>(queries.size());
- for (Query query : queries) {
- localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
- }
- var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
- try (LLIndexSearchers searchers = LLIndexSearchers.unsharded(searcherManager.retrieveSearcher(snapshot))) {
-
- return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
- }
- }
-
- public LLIndexSearcher retrieveSearcher(@Nullable LLSnapshot snapshot) {
- return searcherManager.retrieveSearcher(snapshot);
- }
-
- @Override
- protected void onClose() {
- logger.debug("Waiting IndexWriter tasks...");
- activeTasks.arriveAndAwaitAdvance();
- logger.debug("IndexWriter tasks ended");
- shutdownLock.lock();
- try {
- logger.debug("Closing searcher manager...");
- searcherManager.close();
- logger.debug("Searcher manager closed");
- logger.debug("Closing IndexWriter...");
- indexWriter.close();
- directory.close();
- logger.debug("IndexWriter closed");
- } catch (IOException ex) {
- throw new DBException(ex);
- } finally {
- shutdownLock.unlock();
- }
- }
-
- @Override
- public void flush() {
- runTask(() -> {
- if (activeTasks.isTerminated()) return null;
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return null;
- }
- flushTime.recordCallable(() -> {
- indexWriter.flush();
- return null;
- });
- } catch (Exception e) {
- throw new DBException("Failed to flush", e);
- } finally {
- shutdownLock.unlock();
- }
- return null;
- });
- }
-
- @Override
- public void waitForMerges() {
- runTask(() -> {
- if (activeTasks.isTerminated()) return null;
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return null;
- }
- var mergeScheduler = indexWriter.getConfig().getMergeScheduler();
- if (mergeScheduler instanceof ConcurrentMergeScheduler concurrentMergeScheduler) {
- concurrentMergeScheduler.sync();
- }
- } finally {
- shutdownLock.unlock();
- }
- return null;
- });
- }
-
- @Override
- public void waitForLastMerges() {
- runTask(() -> {
- if (activeTasks.isTerminated()) return null;
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return null;
- }
- indexWriter.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);
- var mergeScheduler = indexWriter.getConfig().getMergeScheduler();
- if (mergeScheduler instanceof ConcurrentMergeScheduler concurrentMergeScheduler) {
- concurrentMergeScheduler.sync();
- }
- indexWriter.deleteUnusedFiles();
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- shutdownLock.unlock();
- }
- return null;
- });
- }
-
- @Override
- public void refresh(boolean force) {
- runTask(() -> {
- activeTasks.register();
- try {
- if (activeTasks.isTerminated()) return null;
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return null;
- }
- refreshTime.recordCallable(() -> {
- if (force) {
- searcherManager.maybeRefreshBlocking();
- } else {
- searcherManager.maybeRefresh();
- }
- return null;
- });
- } catch (Exception e) {
- throw new DBException("Failed to refresh", e);
- } finally {
- shutdownLock.unlock();
- }
- } finally {
- activeTasks.arriveAndDeregister();
- }
- return null;
- });
- }
-
- /**
- * Internal method, do not use
- */
- public void scheduledCommit() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return;
- }
- commitTime.recordCallable(() -> {
- indexWriter.commit();
- indexWriter.deleteUnusedFiles();
- return null;
- });
- } catch (Exception ex) {
- logger.error(MARKER_LUCENE, "Failed to execute a scheduled commit", ex);
- } finally {
- shutdownLock.unlock();
- }
- }
-
- /**
- * Internal method, do not use
- */
- public void scheduledMerge() { // Do not use. Merges are done automatically by merge policies
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return;
- }
- mergeTime.recordCallable(() -> {
- indexWriter.maybeMerge();
- return null;
- });
- } catch (Exception ex) {
- logger.error(MARKER_LUCENE, "Failed to execute a scheduled merge", ex);
- } finally {
- shutdownLock.unlock();
- }
- }
-
- @Override
- public boolean isLowMemoryMode() {
- return lowMemory;
- }
-
- private double getSnapshotsCount() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- if (snapshotsManager == null) return 0d;
- return snapshotsManager.getSnapshotsCount();
- } finally {
- shutdownLock.unlock();
- }
- }
-
- private double getIndexWriterFlushingBytes() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- return indexWriter.getFlushingBytes();
- } finally {
- shutdownLock.unlock();
- }
- }
-
- private double getIndexWriterMaxCompletedSequenceNumber() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- return indexWriter.getMaxCompletedSequenceNumber();
- } finally {
- shutdownLock.unlock();
- }
- }
-
- private double getIndexWriterPendingNumDocs() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- return indexWriter.getPendingNumDocs();
- } finally {
- shutdownLock.unlock();
- }
- }
-
- private double getIndexWriterMergingSegmentsSize() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- return indexWriter.getMergingSegments().size();
- } finally {
- shutdownLock.unlock();
- }
- }
-
- private double getDirectoryPendingDeletionsCount() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- return indexWriter.getDirectory().getPendingDeletions().size();
- } catch (IOException e) {
- return 0d;
- } finally {
- shutdownLock.unlock();
- }
- }
-
- private double getDocCount() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- var docStats = indexWriter.getDocStats();
- if (docStats != null) {
- return docStats.numDocs;
- } else {
- return 0d;
- }
- } finally {
- shutdownLock.unlock();
- }
- }
-
- private double getMaxDoc() {
- shutdownLock.lock();
- try {
- if (isClosed()) {
- return 0d;
- }
- var docStats = indexWriter.getDocStats();
- if (docStats != null) {
- return docStats.maxDoc;
- } else {
- return 0d;
- }
- } finally {
- shutdownLock.unlock();
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- LLLocalLuceneIndex that = (LLLocalLuceneIndex) o;
-
- return Objects.equals(shardName, that.shardName);
- }
-
- @Override
- public int hashCode() {
- return shardName.hashCode();
- }
-
- @Override
- public void pauseForBackup() {
- backuppable.pauseForBackup();
- }
-
- @Override
- public void resumeAfterBackup() {
- backuppable.resumeAfterBackup();
- }
-
- @Override
- public boolean isPaused() {
- return backuppable.isPaused();
- }
-
- private class LuceneBackuppable extends Backuppable {
-
- private LLSnapshot snapshot;
-
- @Override
- protected void onPauseForBackup() {
- var snapshot = LLLocalLuceneIndex.this.takeSnapshot();
- if (snapshot == null) {
- logger.error("Can't pause index \"{}\" because snapshots are not enabled!", shardName);
- }
- this.snapshot = snapshot;
- }
-
- @Override
- protected void onResumeAfterBackup() {
- if (snapshot == null) {
- return;
- }
- LLLocalLuceneIndex.this.releaseSnapshot(snapshot);
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
deleted file mode 100644
index e5add52..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java
+++ /dev/null
@@ -1,345 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import static it.cavallium.dbengine.lucene.LuceneUtils.getLuceneIndexId;
-import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_POOL;
-import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
-import static it.cavallium.dbengine.utils.StreamUtils.executing;
-import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
-import static it.cavallium.dbengine.utils.StreamUtils.fastReducing;
-import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong;
-import static it.cavallium.dbengine.utils.StreamUtils.partitionByInt;
-import static java.util.stream.Collectors.groupingBy;
-
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Streams;
-import io.micrometer.core.instrument.MeterRegistry;
-import it.cavallium.dbengine.client.IBackuppable;
-import it.cavallium.dbengine.client.query.QueryParser;
-import it.cavallium.dbengine.client.query.current.data.Query;
-import it.cavallium.dbengine.client.query.current.data.QueryParams;
-import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
-import it.cavallium.dbengine.database.LLIndexRequest;
-import it.cavallium.dbengine.database.LLLuceneIndex;
-import it.cavallium.dbengine.database.LLSearchResultShard;
-import it.cavallium.dbengine.database.LLSnapshot;
-import it.cavallium.dbengine.database.LLSnapshottable;
-import it.cavallium.dbengine.database.LLTerm;
-import it.cavallium.dbengine.database.LLUpdateDocument;
-import it.cavallium.dbengine.database.SafeCloseable;
-import it.cavallium.dbengine.lucene.LuceneCloseable;
-import it.cavallium.dbengine.lucene.LuceneHacks;
-import it.cavallium.dbengine.lucene.LuceneUtils;
-import it.cavallium.dbengine.lucene.collector.Buckets;
-import it.cavallium.dbengine.lucene.mlt.MoreLikeThisTransformer;
-import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
-import it.cavallium.dbengine.lucene.searcher.BucketParams;
-import it.cavallium.dbengine.lucene.searcher.DecimalBucketMultiSearcher;
-import it.cavallium.dbengine.lucene.searcher.GlobalQueryRewrite;
-import it.cavallium.dbengine.lucene.searcher.LocalQueryParams;
-import it.cavallium.dbengine.lucene.searcher.LuceneSearchResult;
-import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
-import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
-import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
-import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
-import it.cavallium.dbengine.utils.DBException;
-import it.cavallium.dbengine.utils.SimpleResource;
-import it.cavallium.dbengine.utils.StreamUtils;
-import it.unimi.dsi.fastutil.ints.IntList;
-import java.io.Closeable;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import java.util.stream.Stream;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
-import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneIndex, LuceneCloseable {
-
- private static final Logger LOG = LogManager.getLogger(LLLuceneIndex.class);
-
- private final String clusterName;
- private final boolean lowMemory;
- private final MeterRegistry meterRegistry;
- private final ConcurrentHashMap> registeredSnapshots = new ConcurrentHashMap<>();
- private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
- private final LLLocalLuceneIndex[] luceneIndicesById;
- private final List luceneIndicesSet;
- private final int totalShards;
- private final PerFieldAnalyzerWrapper luceneAnalyzer;
- private final PerFieldSimilarityWrapper luceneSimilarity;
-
- private final MultiSearcher multiSearcher;
- private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
-
- public LLLocalMultiLuceneIndex(MeterRegistry meterRegistry,
- String clusterName,
- IntList activeShards,
- int totalShards,
- IndicizerAnalyzers indicizerAnalyzers,
- IndicizerSimilarities indicizerSimilarities,
- LuceneOptions luceneOptions,
- @Nullable LuceneHacks luceneHacks) {
-
- if (totalShards <= 1 || totalShards > 100) {
- throw new DBException("Unsupported instances count: " + totalShards);
- }
-
- this.meterRegistry = meterRegistry;
- LLLocalLuceneIndex[] luceneIndices = new LLLocalLuceneIndex[totalShards];
- for (int i = 0; i < totalShards; i++) {
- if (!activeShards.contains(i)) {
- continue;
- }
- luceneIndices[i] = new LLLocalLuceneIndex(meterRegistry,
- clusterName,
- i,
- indicizerAnalyzers,
- indicizerSimilarities,
- luceneOptions,
- luceneHacks
- );
- }
- this.clusterName = clusterName;
- this.totalShards = totalShards;
- this.luceneIndicesById = luceneIndices;
- var luceneIndicesSet = new HashSet();
- for (var luceneIndex : luceneIndices) {
- if (luceneIndex != null) {
- luceneIndicesSet.add(luceneIndex);
- }
- }
- this.luceneIndicesSet = new ArrayList<>(luceneIndicesSet);
- this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
- this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
- this.lowMemory = luceneOptions.lowMemory();
-
- var maxInMemoryResultEntries = luceneOptions.maxInMemoryResultEntries();
- if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) {
- multiSearcher = luceneHacks.customMultiSearcher().get();
- } else {
- multiSearcher = new AdaptiveMultiSearcher(maxInMemoryResultEntries);
- }
- }
-
- private LLLocalLuceneIndex getLuceneIndex(LLTerm id) {
- return Objects.requireNonNull(luceneIndicesById[LuceneUtils.getLuceneIndexId(id, totalShards)]);
- }
-
- @Override
- public String getLuceneIndexName() {
- return clusterName;
- }
-
- private LLIndexSearchers getIndexSearchers(LLSnapshot snapshot) {
- // Resolve the snapshot of each shard
- return LLIndexSearchers.of(StreamUtils.toListOn(StreamUtils.LUCENE_POOL,
- Streams.mapWithIndex(this.luceneIndicesSet.stream(), (luceneIndex, index) -> {
- var subSnapshot = resolveSnapshot(snapshot, (int) index);
- return luceneIndex.retrieveSearcher(subSnapshot);
- })
- ));
- }
-
- @Override
- public void addDocument(LLTerm id, LLUpdateDocument doc) {
- getLuceneIndex(id).addDocument(id, doc);
- }
-
- @Override
- public long addDocuments(boolean atomic, Stream> documents) {
- return collectOn(LUCENE_POOL,
- partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents)
- .map(entry -> luceneIndicesById[entry.key()].addDocuments(atomic, entry.values().stream())),
- fastSummingLong()
- );
- }
-
- @Override
- public void deleteDocument(LLTerm id) {
- getLuceneIndex(id).deleteDocument(id);
- }
-
- @Override
- public void update(LLTerm id, LLIndexRequest request) {
- getLuceneIndex(id).update(id, request);
- }
-
- @Override
- public long updateDocuments(Stream> documents) {
- return collectOn(LUCENE_POOL,
- partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents)
- .map(entry -> luceneIndicesById[entry.key()].updateDocuments(entry.values().stream())),
- fastSummingLong()
- );
- }
-
- @Override
- public void deleteAll() {
- luceneIndicesSet.forEach(LLLuceneIndex::deleteAll);
- }
-
- private LLSnapshot resolveSnapshot(LLSnapshot multiSnapshot, int instanceId) {
- if (multiSnapshot != null) {
- return registeredSnapshots.get(multiSnapshot.getSequenceNumber()).get(instanceId);
- } else {
- return null;
- }
- }
-
- @Override
- public Stream moreLikeThis(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- String keyFieldName,
- Multimap mltDocumentFields) {
- LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
- try (var searchers = this.getIndexSearchers(snapshot)) {
- var transformer = new MoreLikeThisTransformer(mltDocumentFields, luceneAnalyzer, luceneSimilarity);
-
- // Collect all the shards results into a single global result
- LuceneSearchResult result = multiSearcher.collectMulti(searchers,
- localQueryParams,
- keyFieldName,
- transformer,
- Function.identity()
- );
-
- // Transform the result type
- return Stream.of(new LLSearchResultShard(result.results(), result.totalHitsCount()));
- }
- }
-
- @Override
- public Stream search(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- @Nullable String keyFieldName) {
- LuceneSearchResult result = searchInternal(snapshot, queryParams, keyFieldName);
- // Transform the result type
- var shard = new LLSearchResultShard(result.results(), result.totalHitsCount());
- return Stream.of(shard);
- }
-
- private LuceneSearchResult searchInternal(@Nullable LLSnapshot snapshot,
- QueryParams queryParams,
- @Nullable String keyFieldName) {
- LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams, luceneAnalyzer);
- try (var searchers = getIndexSearchers(snapshot)) {
-
- // Collect all the shards results into a single global result
- return multiSearcher.collectMulti(searchers,
- localQueryParams,
- keyFieldName,
- GlobalQueryRewrite.NO_REWRITE,
- Function.identity()
- );
- }
- }
-
- @Override
- public TotalHitsCount count(@Nullable LLSnapshot snapshot, Query query, @Nullable Duration timeout) {
- var params = LuceneUtils.getCountQueryParams(query);
- var result = this.searchInternal(snapshot, params, null);
- return result != null ? result.totalHitsCount() : TotalHitsCount.of(0, true);
- }
-
- @Override
- public Buckets computeBuckets(@Nullable LLSnapshot snapshot,
- @NotNull List queries,
- @Nullable Query normalizationQuery,
- BucketParams bucketParams) {
- List localQueries = new ArrayList<>(queries.size());
- for (Query query : queries) {
- localQueries.add(QueryParser.toQuery(query, luceneAnalyzer));
- }
- var localNormalizationQuery = QueryParser.toQuery(normalizationQuery, luceneAnalyzer);
- try (var searchers = getIndexSearchers(snapshot)) {
-
- // Collect all the shards results into a single global result
- return decimalBucketMultiSearcher.collectMulti(searchers, bucketParams, localQueries, localNormalizationQuery);
- }
- }
-
- @Override
- protected void onClose() {
- collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(SafeCloseable::close));
- if (multiSearcher instanceof Closeable closeable) {
- try {
- closeable.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public void flush() {
- collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::flush));
- }
-
- @Override
- public void waitForMerges() {
- collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForMerges));
- }
-
- @Override
- public void waitForLastMerges() {
- collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForLastMerges));
- }
-
- @Override
- public void refresh(boolean force) {
- collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(index -> index.refresh(force)));
- }
-
- @Override
- public LLSnapshot takeSnapshot() {
- // Generate next snapshot index
- var snapshotIndex = nextSnapshotNumber.getAndIncrement();
- var snapshot = collectOn(StreamUtils.LUCENE_POOL,
- luceneIndicesSet.stream().map(LLSnapshottable::takeSnapshot),
- fastListing()
- );
- registeredSnapshots.put(snapshotIndex, snapshot);
- return new LLSnapshot(snapshotIndex);
- }
-
- @Override
- public void releaseSnapshot(LLSnapshot snapshot) {
- var list = registeredSnapshots.remove(snapshot.getSequenceNumber());
- for (int shardIndex = 0; shardIndex < list.size(); shardIndex++) {
- var luceneIndex = luceneIndicesSet.get(shardIndex);
- LLSnapshot instanceSnapshot = list.get(shardIndex);
- luceneIndex.releaseSnapshot(instanceSnapshot);
- }
- }
-
- @Override
- public boolean isLowMemoryMode() {
- return lowMemory;
- }
-
- @Override
- public void pauseForBackup() {
- collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::pauseForBackup));
- }
-
- @Override
- public void resumeAfterBackup() {
- collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::resumeAfterBackup));
- }
-
- @Override
- public boolean isPaused() {
- return this.luceneIndicesSet.stream().anyMatch(IBackuppable::isPaused);
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java b/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java
deleted file mode 100644
index 8610722..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LuceneIndexSnapshot.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import it.cavallium.dbengine.database.DiscardingCloseable;
-import it.cavallium.dbengine.lucene.LuceneCloseable;
-import it.cavallium.dbengine.utils.SimpleResource;
-import java.io.IOException;
-import it.cavallium.dbengine.utils.DBException;
-import java.util.concurrent.Executor;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.search.IndexSearcher;
-import org.jetbrains.annotations.Nullable;
-
-public class LuceneIndexSnapshot extends SimpleResource implements DiscardingCloseable, LuceneCloseable {
- private final IndexCommit snapshot;
-
- private boolean initialized;
- private boolean failed;
- private boolean closed;
-
- private DirectoryReader indexReader;
- private IndexSearcher indexSearcher;
-
- public LuceneIndexSnapshot(IndexCommit snapshot) {
- this.snapshot = snapshot;
- }
-
- public IndexCommit getSnapshot() {
- return snapshot;
- }
-
- /**
- * Can be called only if the snapshot has not been closed
- * @throws IllegalStateException if closed or failed
- */
- public synchronized IndexSearcher getIndexSearcher(@Nullable Executor searchExecutor) throws IllegalStateException {
- openDirectoryIfNeeded(searchExecutor);
- return indexSearcher;
- }
-
- private synchronized void openDirectoryIfNeeded(@Nullable Executor searchExecutor) throws IllegalStateException {
- if (closed) {
- throw new IllegalStateException("Snapshot is closed");
- }
- if (failed) {
- throw new IllegalStateException("Snapshot failed to open");
- }
- if (!initialized) {
- try {
- var indexReader = DirectoryReader.open(snapshot);
- this.indexReader = indexReader;
- indexSearcher = new IndexSearcher(indexReader, searchExecutor);
-
- initialized = true;
- } catch (IOException e) {
- failed = true;
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- protected synchronized void onClose() {
- closed = true;
-
- if (initialized && !failed) {
- try {
- indexReader.close();
- } catch (IOException e) {
- throw new DBException(e);
- }
- indexSearcher = null;
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LuceneThreadFactory.java b/src/main/java/it/cavallium/dbengine/database/disk/LuceneThreadFactory.java
deleted file mode 100644
index 7ad0f3c..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/LuceneThreadFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import it.cavallium.dbengine.lucene.LuceneThread;
-import it.cavallium.dbengine.utils.ShortNamedThreadFactory;
-import java.util.Locale;
-import org.jetbrains.annotations.NotNull;
-
-public class LuceneThreadFactory extends ShortNamedThreadFactory {
-
- /**
- * Creates a new {@link ShortNamedThreadFactory} instance
- *
- * @param threadNamePrefix the name prefix assigned to each thread created.
- */
- public LuceneThreadFactory(String threadNamePrefix) {
- super(threadNamePrefix);
- }
-
- @Override
- public Thread newThread(@NotNull Runnable r) {
- final Thread t = new LuceneThread(group, r, String.format(Locale.ROOT, "%s-%d",
- this.threadNamePrefix, threadNumber.getAndIncrement()), 0);
- t.setDaemon(daemon);
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java
index e462876..1dd1c06 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java
@@ -10,9 +10,8 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
-import it.cavallium.dbengine.lucene.ExponentialPageLimits;
+import it.cavallium.dbengine.utils.ExponentialLimits;
import it.cavallium.dbengine.utils.DBException;
-import java.io.IOException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;
@@ -21,13 +20,11 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.OptimisticTransactionDB;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn {
@@ -95,7 +92,7 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn refreshSubscription;
-
- public SimpleIndexSearcherManager(IndexWriter indexWriter,
- @Nullable SnapshotsManager snapshotsManager,
- ScheduledExecutorService luceneHeavyTasksScheduler,
- Similarity similarity,
- boolean applyAllDeletes,
- boolean writeAllDeletes,
- Duration queryRefreshDebounceTime) {
- this.snapshotsManager = snapshotsManager;
- this.luceneHeavyTasksScheduler = luceneHeavyTasksScheduler;
- this.similarity = similarity;
- this.queryRefreshDebounceTime = queryRefreshDebounceTime;
-
- try {
- this.searcherManager = new SearcherManager(indexWriter, applyAllDeletes, writeAllDeletes, SEARCHER_FACTORY);
- } catch (IOException e) {
- throw new DBException(e);
- }
-
- refreshSubscription = luceneHeavyTasksScheduler.scheduleAtFixedRate(() -> {
- try {
- maybeRefresh();
- } catch (Exception ex) {
- LOG.error("Failed to refresh the searcher manager", ex);
- }
- }, queryRefreshDebounceTime.toMillis(), queryRefreshDebounceTime.toMillis(), TimeUnit.MILLISECONDS);
- }
-
- private void dropCachedIndexSearcher() {
- // This shouldn't happen more than once per searcher.
- activeSearchers.decrementAndGet();
- }
-
- @Override
- public void maybeRefreshBlocking() {
- try {
- activeRefreshes.incrementAndGet();
- searcherManager.maybeRefreshBlocking();
- } catch (AlreadyClosedException ignored) {
-
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- activeRefreshes.decrementAndGet();
- }
- }
-
- @Override
- public void maybeRefresh() {
- try {
- activeRefreshes.incrementAndGet();
- searcherManager.maybeRefresh();
- } catch (AlreadyClosedException ignored) {
-
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- activeRefreshes.decrementAndGet();
- }
- }
-
- @Override
- public LLIndexSearcher retrieveSearcher(@Nullable LLSnapshot snapshot) {
- if (snapshot == null) {
- return retrieveSearcherInternal(null);
- } else {
- return retrieveSearcherInternal(snapshot);
- }
- }
-
- private LLIndexSearcher retrieveSearcherInternal(@Nullable LLSnapshot snapshot) {
- if (isClosed()) {
- return null;
- }
- try {
- if (snapshotsManager == null || snapshot == null) {
- return new OnDemandIndexSearcher(searcherManager, similarity);
- } else {
- activeSearchers.incrementAndGet();
- IndexSearcher indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
- indexSearcher.setSimilarity(similarity);
- assert indexSearcher.getIndexReader().getRefCount() > 0;
- return new SnapshotIndexSearcher(indexSearcher);
- }
- } catch (Throwable ex) {
- activeSearchers.decrementAndGet();
- throw ex;
- }
- }
-
- @Override
- protected void onClose() {
- LOG.debug("Closing IndexSearcherManager...");
- refreshSubscription.cancel(false);
- long initTime = System.nanoTime();
- while (!refreshSubscription.isDone() && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- refreshSubscription.cancel(true);
- LOG.debug("Closed IndexSearcherManager");
- LOG.debug("Closing refresh tasks...");
- initTime = System.nanoTime();
- while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- if (activeRefreshes.get() > 0) {
- LOG.warn("Some refresh tasks remained active after shutdown: {}", activeRefreshes.get());
- }
- LOG.debug("Closed refresh tasks");
- LOG.debug("Closing active searchers...");
- initTime = System.nanoTime();
- while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
- LockSupport.parkNanos(50000000);
- }
- if (activeSearchers.get() > 0) {
- LOG.warn("Some searchers remained active after shutdown: {}", activeSearchers.get());
- }
- LOG.debug("Closed active searchers");
- LOG.debug("Stopping searcher executor...");
- SEARCH_EXECUTOR.shutdown();
- try {
- if (!SEARCH_EXECUTOR.awaitTermination(15, TimeUnit.SECONDS)) {
- SEARCH_EXECUTOR.shutdownNow();
- }
- } catch (InterruptedException e) {
- LOG.error("Failed to stop executor", e);
- }
- LOG.debug("Stopped searcher executor");
- }
-
- public long getActiveSearchers() {
- return activeSearchers.get();
- }
-
- public long getActiveRefreshes() {
- return activeRefreshes.get();
- }
-
- private class MainIndexSearcher extends LLIndexSearcherImpl implements LuceneCloseable {
-
- public MainIndexSearcher(IndexSearcher indexSearcher) {
- super(indexSearcher, () -> releaseOnCleanup(searcherManager, indexSearcher));
- }
-
- private static void releaseOnCleanup(SearcherManager searcherManager, IndexSearcher indexSearcher) {
- try {
- LOG.warn("An index searcher was not closed!");
- searcherManager.release(indexSearcher);
- } catch (IOException ex) {
- LOG.error("Failed to release the index searcher during cleanup: {}", indexSearcher, ex);
- }
- }
-
- @Override
- public void onClose() {
- dropCachedIndexSearcher();
- try {
- searcherManager.release(indexSearcher);
- } catch (IOException ex) {
- throw new DBException(ex);
- }
- }
- }
-
- private class SnapshotIndexSearcher extends LLIndexSearcherImpl {
-
- public SnapshotIndexSearcher(IndexSearcher indexSearcher) {
- super(indexSearcher);
- }
-
- @Override
- public void onClose() {
- dropCachedIndexSearcher();
- }
- }
-
- private class OnDemandIndexSearcher extends LLIndexSearcher implements LuceneCloseable {
-
- private final SearcherManager searcherManager;
- private final Similarity similarity;
-
- private IndexSearcher indexSearcher = null;
-
- public OnDemandIndexSearcher(SearcherManager searcherManager,
- Similarity similarity) {
- super();
- this.searcherManager = searcherManager;
- this.similarity = similarity;
- }
-
- @Override
- protected IndexSearcher getIndexSearcherInternal() {
- if (indexSearcher != null) {
- return indexSearcher;
- }
- synchronized (this) {
- try {
- var indexSearcher = searcherManager.acquire();
- indexSearcher.setSimilarity(similarity);
- activeSearchers.incrementAndGet();
- this.indexSearcher = indexSearcher;
- return indexSearcher;
- } catch (IOException e) {
- throw new IllegalStateException("Failed to acquire the index searcher", e);
- }
- }
- }
-
- @Override
- protected void onClose() {
- try {
- synchronized (this) {
- if (indexSearcher != null) {
- dropCachedIndexSearcher();
- searcherManager.release(indexSearcher);
- }
- }
- } catch (IOException ex) {
- throw new DBException(ex);
- }
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java b/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java
deleted file mode 100644
index 975fde9..0000000
--- a/src/main/java/it/cavallium/dbengine/database/disk/SnapshotsManager.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package it.cavallium.dbengine.database.disk;
-
-import it.cavallium.dbengine.database.LLSnapshot;
-import it.cavallium.dbengine.utils.SimpleResource;
-import java.io.IOException;
-import it.cavallium.dbengine.utils.DBException;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
-import org.jetbrains.annotations.Nullable;
-
-public class SnapshotsManager extends SimpleResource {
-
- private final IndexWriter indexWriter;
- private final SnapshotDeletionPolicy snapshotter;
- private final Phaser activeTasks = new Phaser(1);
- /**
- * Last snapshot sequence number. 0 is not used
- */
- private final AtomicLong lastSnapshotSeqNo = new AtomicLong(0);
- /**
- * LLSnapshot seq no to index commit point
- */
- private final ConcurrentHashMap snapshots = new ConcurrentHashMap<>();
-
- public SnapshotsManager(IndexWriter indexWriter,
- SnapshotDeletionPolicy snapshotter) {
- this.indexWriter = indexWriter;
- this.snapshotter = snapshotter;
- }
-
- public LuceneIndexSnapshot resolveSnapshot(@Nullable LLSnapshot snapshot) {
- if (snapshot == null) {
- return null;
- }
- return Objects.requireNonNull(snapshots.get(snapshot.getSequenceNumber()),
- () -> "Can't resolve snapshot " + snapshot.getSequenceNumber()
- );
- }
-
- public LLSnapshot takeSnapshot() {
- return takeLuceneSnapshot();
- }
-
- /**
- * Use internally. This method commits before taking the snapshot if there are no commits in a new database,
- * avoiding the exception.
- */
- private LLSnapshot takeLuceneSnapshot() {
- activeTasks.register();
- try {
- if (snapshotter.getSnapshots().isEmpty()) {
- indexWriter.commit();
- }
- var snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet();
- IndexCommit snapshot = snapshotter.snapshot();
- var prevSnapshot = this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
-
- // Unexpectedly found a snapshot
- if (prevSnapshot != null) {
- try {
- prevSnapshot.close();
- } catch (DBException e) {
- throw new IllegalStateException("Can't close snapshot", e);
- }
- }
-
- return new LLSnapshot(snapshotSeqNo);
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- activeTasks.arriveAndDeregister();
- }
- }
-
- public void releaseSnapshot(LLSnapshot snapshot) {
- activeTasks.register();
- try {
- var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
- if (indexSnapshot == null) {
- throw new DBException("LLSnapshot " + snapshot.getSequenceNumber() + " not found!");
- }
-
- var luceneIndexSnapshot = indexSnapshot.getSnapshot();
- snapshotter.release(luceneIndexSnapshot);
- } catch (IOException e) {
- throw new DBException(e);
- } finally {
- activeTasks.arriveAndDeregister();
- }
- }
-
- /**
- * Returns the total number of snapshots currently held.
- */
- public int getSnapshotsCount() {
- return Math.max(snapshots.size(), snapshotter.getSnapshotCount());
- }
-
- @Override
- protected void onClose() {
- if (!activeTasks.isTerminated()) {
- activeTasks.arriveAndAwaitAdvance();
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java
index d3e8c1b..0fac207 100644
--- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDatabaseConnection.java
@@ -3,23 +3,11 @@ package it.cavallium.dbengine.database.memory;
import io.micrometer.core.instrument.MeterRegistry;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
-import it.cavallium.dbengine.database.LLLuceneIndex;
-import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
-import it.cavallium.dbengine.lucene.LuceneHacks;
-import it.cavallium.dbengine.lucene.LuceneUtils;
-import it.cavallium.dbengine.rpc.current.data.ByteBuffersDirectory;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.cavallium.dbengine.rpc.current.data.DatabaseOptions;
-import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
-import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
-import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure;
-import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
-import it.cavallium.dbengine.rpc.current.data.LuceneOptionsBuilder;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.jetbrains.annotations.Nullable;
public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
@@ -50,27 +38,6 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
return new LLMemoryKeyValueDatabase(meterRegistry, name, columns);
}
- @Override
- public LLLuceneIndex getLuceneIndex(String clusterName,
- LuceneIndexStructure indexStructure,
- IndicizerAnalyzers indicizerAnalyzers,
- IndicizerSimilarities indicizerSimilarities,
- LuceneOptions luceneOptions,
- @Nullable LuceneHacks luceneHacks) {
- var memoryLuceneOptions = LuceneOptionsBuilder
- .builder(luceneOptions)
- .directoryOptions(new ByteBuffersDirectory())
- .build();
- return new LLLocalLuceneIndex(meterRegistry,
- clusterName,
- 0,
- indicizerAnalyzers,
- indicizerSimilarities,
- memoryLuceneOptions,
- luceneHacks
- );
- }
-
@Override
public void disconnect() {
connected.compareAndSet(true, false);
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LuceneHacksSerializer.java b/src/main/java/it/cavallium/dbengine/database/remote/LuceneHacksSerializer.java
deleted file mode 100644
index e9255b2..0000000
--- a/src/main/java/it/cavallium/dbengine/database/remote/LuceneHacksSerializer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package it.cavallium.dbengine.database.remote;
-
-import it.cavallium.datagen.DataSerializer;
-import it.cavallium.dbengine.lucene.LuceneHacks;
-import it.cavallium.stream.SafeDataInput;
-import it.cavallium.stream.SafeDataOutput;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.jetbrains.annotations.NotNull;
-
-public class LuceneHacksSerializer implements DataSerializer {
-
- @Override
- public void serialize(SafeDataOutput dataOutput, @NotNull LuceneHacks luceneHacks) {
- if (luceneHacks.customLocalSearcher() != null || luceneHacks.customMultiSearcher() != null) {
- throw new UnsupportedOperationException("Can't encode this type");
- }
- }
-
- @Override
- public @NotNull LuceneHacks deserialize(SafeDataInput dataInput) {
- return new LuceneHacks(null, null);
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/remote/String2FieldAnalyzerMapSerializer.java b/src/main/java/it/cavallium/dbengine/database/remote/String2FieldAnalyzerMapSerializer.java
deleted file mode 100644
index 06b359e..0000000
--- a/src/main/java/it/cavallium/dbengine/database/remote/String2FieldAnalyzerMapSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package it.cavallium.dbengine.database.remote;
-
-import it.cavallium.datagen.DataSerializer;
-import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
-import it.cavallium.stream.SafeDataInput;
-import it.cavallium.stream.SafeDataOutput;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.jetbrains.annotations.NotNull;
-
-public class String2FieldAnalyzerMapSerializer implements DataSerializer