diff --git a/pom.xml b/pom.xml index 04de6b5..b134909 100644 --- a/pom.xml +++ b/pom.xml @@ -260,7 +260,7 @@ org.jetbrains annotations - 23.0.0 + 24.0.1 compile diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java index dd5df85..53c68dc 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java @@ -1,10 +1,7 @@ package it.cavallium.dbengine.client; -import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER; -import static it.cavallium.dbengine.utils.StreamUtils.collect; +import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; -import static it.cavallium.dbengine.utils.StreamUtils.fastListing; -import static it.cavallium.dbengine.utils.StreamUtils.toListOn; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; @@ -17,8 +14,6 @@ import it.cavallium.dbengine.database.LLSearchResultShard; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLTerm; import it.cavallium.dbengine.database.LLUtils; -import it.cavallium.dbengine.database.SafeCloseable; -import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; @@ -31,8 +26,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.function.Function; import java.util.stream.Stream; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -95,7 +88,7 @@ public class LuceneIndexImpl implements LuceneIndex { var mltDocumentFields = indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue); - return collectOn(LUCENE_SCHEDULER, luceneIndex.moreLikeThis(resolveSnapshot(queryParams.snapshot()), + return collectOn(LUCENE_POOL, luceneIndex.moreLikeThis(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName(), mltDocumentFields), @@ -104,7 +97,7 @@ public class LuceneIndexImpl implements LuceneIndex { @Override public Hits> search(ClientQueryParams queryParams) { - return collectOn(LUCENE_SCHEDULER, luceneIndex.search(resolveSnapshot(queryParams.snapshot()), + return collectOn(LUCENE_POOL, luceneIndex.search(resolveSnapshot(queryParams.snapshot()), queryParams.toQueryParams(), indicizer.getKeyFieldName()), collectingAndThen(toList(), toHitsCollector(queryParams))); diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index 9dd4aa0..be3eae1 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -12,11 +12,9 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.searcher.BucketParams; import it.cavallium.dbengine.utils.StreamUtils; -import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.Map.Entry; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -76,7 +74,7 @@ public interface LLLuceneIndex extends LLSnapshottable, IBackuppable, SafeClosea false, timeout == null ? Long.MAX_VALUE : timeout.toMillis() ); - return collectOn(StreamUtils.LUCENE_SCHEDULER, + return collectOn(StreamUtils.LUCENE_POOL, this.search(snapshot, params, null).map(LLSearchResultShard::totalHitsCount), fastReducing(TotalHitsCount.of(0, true), (a, b) -> TotalHitsCount.of(a.value() + b.value(), a.exact() && b.exact()) diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java index e51f85c..d941514 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.database; -import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_SCHEDULER; +import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; @@ -18,7 +18,6 @@ import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; -import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -27,7 +26,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.StringJoiner; -import java.util.concurrent.CompletionException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -90,7 +88,7 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { @Override public LLDatabaseConnection connect() { - collectOn(ROCKSDB_SCHEDULER, allConnections.stream(), executing(connection -> { + collectOn(ROCKSDB_POOL, allConnections.stream(), executing(connection -> { try { connection.connect(); } catch (Exception ex) { @@ -168,7 +166,7 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { @Override public void disconnect() { - collectOn(ROCKSDB_SCHEDULER, allConnections.stream(), executing(connection -> { + collectOn(ROCKSDB_POOL, allConnections.stream(), executing(connection -> { try { connection.disconnect(); } catch (Exception ex) { diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java index e17da25..8c2f6cb 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java @@ -2,8 +2,7 @@ package it.cavallium.dbengine.database; import static it.cavallium.dbengine.database.LLUtils.mapList; import static it.cavallium.dbengine.lucene.LuceneUtils.getLuceneIndexId; -import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER; -import static it.cavallium.dbengine.utils.StreamUtils.collect; +import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; import static it.cavallium.dbengine.utils.StreamUtils.fastListing; @@ -23,16 +22,13 @@ import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; import it.cavallium.dbengine.rpc.current.data.LuceneIndexStructure; import it.cavallium.dbengine.rpc.current.data.LuceneOptions; -import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -93,7 +89,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { @Override public long addDocuments(boolean atomic, Stream> documents) { - return collectOn(LUCENE_SCHEDULER, + return collectOn(LUCENE_POOL, partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents) .map(entry -> luceneIndicesById[entry.key()].addDocuments(atomic, entry.values().stream())), fastSummingLong() @@ -112,7 +108,7 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { @Override public long updateDocuments(Stream> documents) { - return collectOn(LUCENE_SCHEDULER, + return collectOn(LUCENE_POOL, partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents) .map(entry -> luceneIndicesById[entry.key()].updateDocuments(entry.values().stream())), fastSummingLong() @@ -189,34 +185,34 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { @Override public void close() { - collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::close)); + collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::close)); } @Override public void flush() { - collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::flush)); + collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::flush)); } @Override public void waitForMerges() { - collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForMerges)); + collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForMerges)); } @Override public void waitForLastMerges() { - collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForLastMerges)); + collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForLastMerges)); } @Override public void refresh(boolean force) { - collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(index -> index.refresh(force))); + collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(index -> index.refresh(force))); } @Override public LLSnapshot takeSnapshot() { // Generate next snapshot index var snapshotIndex = nextSnapshotNumber.getAndIncrement(); - var snapshot = collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream().map(LLSnapshottable::takeSnapshot), fastListing()); + var snapshot = collectOn(LUCENE_POOL, luceneIndicesSet.stream().map(LLSnapshottable::takeSnapshot), fastListing()); registeredSnapshots.put(snapshotIndex, snapshot); return new LLSnapshot(snapshotIndex); } @@ -233,12 +229,12 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { @Override public void pauseForBackup() { - collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::pauseForBackup)); + collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::pauseForBackup)); } @Override public void resumeAfterBackup() { - collectOn(LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::resumeAfterBackup)); + collectOn(LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::resumeAfterBackup)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 81369df..41d027e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.database.collections; -import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_SCHEDULER; +import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.count; import static it.cavallium.dbengine.utils.StreamUtils.executing; @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -105,7 +104,7 @@ public interface DatabaseStageMap> extends Dat } default void putMulti(Stream> entries) { - collectOn(ROCKSDB_SCHEDULER, entries, executing(entry -> this.putValue(entry.getKey(), entry.getValue()))); + collectOn(ROCKSDB_POOL, entries, executing(entry -> this.putValue(entry.getKey(), entry.getValue()))); } Stream> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange); @@ -150,7 +149,7 @@ public interface DatabaseStageMap> extends Dat this.setAllEntries(entries.map(entriesReplacer)); } } else { - collectOn(ROCKSDB_SCHEDULER, + collectOn(ROCKSDB_POOL, this.getAllEntries(null, smallRange).map(entriesReplacer), executing(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue())) ); @@ -158,7 +157,7 @@ public interface DatabaseStageMap> extends Dat } default void replaceAll(Consumer> entriesReplacer) { - collectOn(ROCKSDB_SCHEDULER, this.getAllStages(null, false), executing(entriesReplacer)); + collectOn(ROCKSDB_POOL, this.getAllStages(null, false), executing(entriesReplacer)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 02b79ce..d82b910 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -6,19 +6,14 @@ import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; import static it.cavallium.dbengine.database.LLUtils.mapList; import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; -import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER; -import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_SCHEDULER; -import static it.cavallium.dbengine.utils.StreamUtils.collect; +import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; -import static it.cavallium.dbengine.utils.StreamUtils.fastListing; import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong; import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import static java.util.Objects.requireNonNull; import static it.cavallium.dbengine.utils.StreamUtils.batches; -import com.google.common.collect.Lists; -import com.google.common.collect.Streams; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import it.cavallium.buffer.Buf; @@ -39,20 +34,14 @@ import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.utils.DBException; -import it.cavallium.dbengine.utils.StreamUtils; import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; @@ -497,7 +486,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public void putMulti(Stream entries) { - collectOn(ROCKSDB_SCHEDULER, + collectOn(ROCKSDB_POOL, batches(entries, Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)), executing(entriesWindow -> { try (var writeOptions = new WriteOptions()) { @@ -817,7 +806,7 @@ public class LLLocalDictionary implements LLDictionary { throw new DBException("Failed to set a range: " + ex.getMessage()); } - collectOn(ROCKSDB_SCHEDULER, batches(entries, MULTI_GET_WINDOW), executing(entriesList -> { + collectOn(ROCKSDB_POOL, batches(entries, MULTI_GET_WINDOW), executing(entriesList -> { try (var writeOptions = new WriteOptions()) { if (!USE_WRITE_BATCHES_IN_SET_RANGE) { for (LLEntry entry : entriesList) { @@ -853,7 +842,7 @@ public class LLLocalDictionary implements LLDictionary { if (USE_WRITE_BATCHES_IN_SET_RANGE) { throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix the parameters"); } - collectOn(ROCKSDB_SCHEDULER, this.getRange(null, range, false, smallRange), executing(oldValue -> { + collectOn(ROCKSDB_POOL, this.getRange(null, range, false, smallRange), executing(oldValue -> { try (var writeOptions = new WriteOptions()) { db.delete(writeOptions, oldValue.getKey()); } catch (RocksDBException ex) { @@ -861,7 +850,7 @@ public class LLLocalDictionary implements LLDictionary { } })); - collectOn(ROCKSDB_SCHEDULER, entries, executing(entry -> { + collectOn(ROCKSDB_POOL, entries, executing(entry -> { if (entry.getKey() != null && entry.getValue() != null) { this.putInternal(entry.getKey(), entry.getValue()); } @@ -1092,7 +1081,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (PARALLEL_EXACT_SIZE) { - return collectOn(ROCKSDB_SCHEDULER, IntStream + return collectOn(ROCKSDB_POOL, IntStream .range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length) .mapToObj(idx -> Pair.of(idx == -1 ? new byte[0] : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx], idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index dac9d0d..37918b3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -1,8 +1,7 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.lucene.LuceneUtils.getLuceneIndexId; -import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER; -import static it.cavallium.dbengine.utils.StreamUtils.collect; +import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; import static it.cavallium.dbengine.utils.StreamUtils.fastListing; @@ -45,7 +44,6 @@ import it.cavallium.dbengine.rpc.current.data.LuceneOptions; import it.cavallium.dbengine.utils.DBException; import it.cavallium.dbengine.utils.SimpleResource; import it.cavallium.dbengine.utils.StreamUtils; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.IntList; import java.io.Closeable; import java.io.IOException; @@ -55,11 +53,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map.Entry; import java.util.Objects; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -151,7 +147,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI private LLIndexSearchers getIndexSearchers(LLSnapshot snapshot) { // Resolve the snapshot of each shard - return LLIndexSearchers.of(StreamUtils.toListOn(StreamUtils.LUCENE_SCHEDULER, + return LLIndexSearchers.of(StreamUtils.toListOn(StreamUtils.LUCENE_POOL, Streams.mapWithIndex(this.luceneIndicesSet.stream(), (luceneIndex, index) -> { var subSnapshot = resolveSnapshot(snapshot, (int) index); return luceneIndex.retrieveSearcher(subSnapshot); @@ -166,7 +162,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI @Override public long addDocuments(boolean atomic, Stream> documents) { - return collectOn(LUCENE_SCHEDULER, + return collectOn(LUCENE_POOL, partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents) .map(entry -> luceneIndicesById[entry.key()].addDocuments(atomic, entry.values().stream())), fastSummingLong() @@ -185,7 +181,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI @Override public long updateDocuments(Stream> documents) { - return collectOn(LUCENE_SCHEDULER, + return collectOn(LUCENE_POOL, partitionByInt(term -> getLuceneIndexId(term.getKey(), totalShards), documents) .map(entry -> luceneIndicesById[entry.key()].updateDocuments(entry.values().stream())), fastSummingLong() @@ -279,7 +275,7 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI @Override protected void onClose() { - collectOn(StreamUtils.LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(SafeCloseable::close)); + collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(SafeCloseable::close)); if (multiSearcher instanceof Closeable closeable) { try { closeable.close(); @@ -291,29 +287,29 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI @Override public void flush() { - collectOn(StreamUtils.LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::flush)); + collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::flush)); } @Override public void waitForMerges() { - collectOn(StreamUtils.LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForMerges)); + collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForMerges)); } @Override public void waitForLastMerges() { - collectOn(StreamUtils.LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForLastMerges)); + collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::waitForLastMerges)); } @Override public void refresh(boolean force) { - collectOn(StreamUtils.LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(index -> index.refresh(force))); + collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(index -> index.refresh(force))); } @Override public LLSnapshot takeSnapshot() { // Generate next snapshot index var snapshotIndex = nextSnapshotNumber.getAndIncrement(); - var snapshot = collectOn(StreamUtils.LUCENE_SCHEDULER, + var snapshot = collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream().map(LLSnapshottable::takeSnapshot), fastListing() ); @@ -338,12 +334,12 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI @Override public void pauseForBackup() { - collectOn(StreamUtils.LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::pauseForBackup)); + collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::pauseForBackup)); } @Override public void resumeAfterBackup() { - collectOn(StreamUtils.LUCENE_SCHEDULER, luceneIndicesSet.stream(), executing(LLLuceneIndex::resumeAfterBackup)); + collectOn(StreamUtils.LUCENE_POOL, luceneIndicesSet.stream(), executing(LLLuceneIndex::resumeAfterBackup)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java index 5437aac..e01d6e5 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/DecimalBucketMultiSearcher.java @@ -1,10 +1,9 @@ package it.cavallium.dbengine.lucene.searcher; -import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER; +import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.fastListing; -import com.google.common.collect.Streams; import it.cavallium.dbengine.database.disk.LLIndexSearchers; import it.cavallium.dbengine.lucene.collector.Buckets; import it.cavallium.dbengine.lucene.collector.DecimalBucketMultiCollectorManager; @@ -49,7 +48,7 @@ public class DecimalBucketMultiSearcher { bucketParams.collectionRate(), bucketParams.sampleSize() ); - return cmm.reduce(collectOn(LUCENE_SCHEDULER, indexSearchers.stream().map(indexSearcher -> { + return cmm.reduce(collectOn(LUCENE_POOL, indexSearchers.stream().map(indexSearcher -> { try { return cmm.search(indexSearcher); } catch (IOException e) { diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java index 76f99ad..85f4d93 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/ScoredPagedMultiSearcher.java @@ -1,17 +1,15 @@ package it.cavallium.dbengine.lucene.searcher; import static it.cavallium.dbengine.lucene.searcher.PaginationInfo.MAX_SINGLE_SEARCH_LIMIT; -import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_SCHEDULER; +import static it.cavallium.dbengine.utils.StreamUtils.LUCENE_POOL; import static it.cavallium.dbengine.utils.StreamUtils.fastListing; import static it.cavallium.dbengine.utils.StreamUtils.streamWhileNonNull; import static it.cavallium.dbengine.utils.StreamUtils.toListOn; import com.google.common.collect.Streams; -import it.cavallium.dbengine.client.query.current.data.TotalHitsCount; import it.cavallium.dbengine.database.LLKeyScore; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.disk.LLIndexSearchers; -import it.cavallium.dbengine.lucene.LuceneCloseable; import it.cavallium.dbengine.lucene.LuceneUtils; import it.cavallium.dbengine.lucene.PageLimits; import it.cavallium.dbengine.lucene.collector.ScoringShardsCollectorMultiManager; @@ -21,7 +19,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Stream; @@ -184,7 +181,7 @@ public class ScoredPagedMultiSearcher implements MultiSearcher { return null; }; record IndexedShard(IndexSearcher indexSearcher, long shardIndex) {} - List shardResults = toListOn(LUCENE_SCHEDULER, + List shardResults = toListOn(LUCENE_POOL, Streams.mapWithIndex(indexSearchers.stream(), IndexedShard::new).map(shardWithIndex -> { var index = (int) shardWithIndex.shardIndex(); var shard = shardWithIndex.indexSearcher(); diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index 987ec9f..09988ee 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -1,7 +1,5 @@ package it.cavallium.dbengine.utils; -import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; - import com.google.common.collect.Iterators; import com.google.common.collect.Streams; import it.cavallium.dbengine.utils.PartitionByIntSpliterator.IntPartition; @@ -12,7 +10,6 @@ import java.util.Comparator; import java.util.EnumSet; import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.Spliterator; @@ -31,10 +28,8 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.ToIntFunction; -import java.util.function.ToLongFunction; import java.util.stream.Collector; import java.util.stream.Collector.Characteristics; -import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.jetbrains.annotations.NotNull; @@ -42,11 +37,11 @@ import org.jetbrains.annotations.Nullable; public class StreamUtils { - public static final ForkJoinPool LUCENE_SCHEDULER = newNamedForkJoinPool("Lucene"); + public static final ForkJoinPool LUCENE_POOL = newNamedForkJoinPool("Lucene"); - public static final ForkJoinPool GRAPH_SCHEDULER = newNamedForkJoinPool("Graph"); + public static final ForkJoinPool GRAPH_POOL = newNamedForkJoinPool("Graph"); - public static final ForkJoinPool ROCKSDB_SCHEDULER = newNamedForkJoinPool("RocksDB"); + public static final ForkJoinPool ROCKSDB_POOL = newNamedForkJoinPool("RocksDB"); private static final Collector TO_LIST_FAKE_COLLECTOR = new FakeCollector(); private static final Collector COUNT_FAKE_COLLECTOR = new FakeCollector();