diff --git a/pom.xml b/pom.xml index ec7c0fc..b4e2d8e 100644 --- a/pom.xml +++ b/pom.xml @@ -512,7 +512,6 @@ io.soabase.recordbuilder.processor.RecordBuilderProcessor - --enable-preview 17 17 @@ -547,7 +546,7 @@ - --enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED + --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ci diff --git a/src/main/java/it/cavallium/dbengine/client/Indicizer.java b/src/main/java/it/cavallium/dbengine/client/Indicizer.java index 74c7ebd..c0abac2 100644 --- a/src/main/java/it/cavallium/dbengine/client/Indicizer.java +++ b/src/main/java/it/cavallium/dbengine/client/Indicizer.java @@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.LLUpdateFields; import it.cavallium.dbengine.database.LLUtils; import java.util.Map; import java.util.Set; +import org.apache.lucene.util.BytesRef; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,7 +41,7 @@ public abstract class Indicizer { public abstract @NotNull String getKeyFieldName(); - public abstract @NotNull T getKey(String key); + public abstract @NotNull T getKey(BytesRef key); public abstract IndicizerAnalyzers getPerFieldAnalyzer(); diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java b/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java index 55c98eb..5896f80 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneOptions.java @@ -15,7 +15,7 @@ public record LuceneOptions(Map extraFlags, Optional directIOOptions, boolean allowMemoryMapping, Optional nrtCachingOptions, - int indexWriterBufferSize, + long indexWriterBufferSize, boolean applyAllDeletes, boolean writeAllDeletes, boolean allowNonVolatileCollection, diff --git a/src/main/java/it/cavallium/dbengine/database/LLItem.java b/src/main/java/it/cavallium/dbengine/database/LLItem.java index 01c79db..cae7ca9 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLItem.java +++ b/src/main/java/it/cavallium/dbengine/database/LLItem.java @@ -11,7 +11,9 @@ import java.util.Arrays; import java.util.Objects; import java.util.StringJoiner; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.util.BytesRef; public class LLItem { @@ -25,6 +27,12 @@ public class LLItem { this.data = data; } + public LLItem(LLType type, String name, BytesRef data) { + this.type = type; + this.name = name; + this.data = data; + } + public LLItem(LLType type, String name, KnnFieldData data) { this.type = type; this.name = name; @@ -115,6 +123,11 @@ public class LLItem { return new LLItem(LLType.LongStoredField, name, data); } + public static LLItem newLongStoredFieldND(String name, long... data) { + BytesRef packed = LongPoint.pack(data); + return new LLItem(LLType.BytesStoredField, name, packed); + } + public static LLItem newTextField(String name, String data, Field.Store store) { if (store == Field.Store.YES) { return new LLItem(LLType.TextFieldStored, name, data); diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java index 770c11d..95d4b0e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java @@ -2,7 +2,8 @@ package it.cavallium.dbengine.database; import java.util.Objects; import java.util.StringJoiner; +import org.apache.lucene.util.BytesRef; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; -public record LLKeyScore(int docId, float score, @Nullable String key) {} +public record LLKeyScore(int docId, float score, @Nullable BytesRef key) {} diff --git a/src/main/java/it/cavallium/dbengine/database/LLTerm.java b/src/main/java/it/cavallium/dbengine/database/LLTerm.java index c529a52..f007d82 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLTerm.java +++ b/src/main/java/it/cavallium/dbengine/database/LLTerm.java @@ -1,13 +1,20 @@ package it.cavallium.dbengine.database; import java.util.Objects; +import org.apache.lucene.index.Term; +import org.apache.lucene.util.BytesRef; public class LLTerm { private final String key; - private final String value; + private final BytesRef value; public LLTerm(String key, String value) { + this.key = key; + this.value = new BytesRef(value); + } + + public LLTerm(String key, BytesRef value) { this.key = key; this.value = value; } @@ -16,7 +23,11 @@ public class LLTerm { return key; } - public String getValue() { + public String getValueUTF8() { + return value.utf8ToString(); + } + + public BytesRef getValueBytesRef() { return value; } diff --git a/src/main/java/it/cavallium/dbengine/database/LLType.java b/src/main/java/it/cavallium/dbengine/database/LLType.java index 93ba054..079b708 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLType.java +++ b/src/main/java/it/cavallium/dbengine/database/LLType.java @@ -15,6 +15,7 @@ public enum LLType { FloatPointND, DoublePointND, LongStoredField, + BytesStoredField, NumericDocValuesField, SortedNumericDocValuesField, TextField, diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 7e58cf9..46b5b68 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -217,6 +217,7 @@ public class LLUtils { case FloatPointND -> new FloatPoint(item.getName(), item.floatArrayData()); case DoublePointND -> new DoublePoint(item.getName(), item.doubleArrayData()); case LongStoredField -> new StoredField(item.getName(), item.longData()); + case BytesStoredField -> new StoredField(item.getName(), (BytesRef) item.getData()); case FloatPoint -> new FloatPoint(item.getName(), item.floatData()); case TextField -> new TextField(item.getName(), item.stringValue(), Store.NO); case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Store.YES); @@ -1059,8 +1060,7 @@ public class LLUtils { @Override public BytesRef toBytesRef() { - byte[] data = term.getValue().getBytes(StandardCharsets.UTF_8); - return new BytesRef(data, 0, data.length); + return term.getValueBytesRef(); } } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java index 90ffb50..0656d92 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CachedIndexSearcherManager.java @@ -5,18 +5,16 @@ import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterrupti import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import io.net5.buffer.api.Resource; import io.net5.buffer.api.Send; import it.cavallium.dbengine.database.LLSnapshot; import java.io.IOException; import java.time.Duration; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexWriter; @@ -36,16 +34,20 @@ import reactor.core.scheduler.Schedulers; public class CachedIndexSearcherManager implements IndexSearcherManager { private static final Logger logger = LogManager.getLogger(CachedIndexSearcherManager.class); - private final Executor SEARCH_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), - new ShortNamedThreadFactory("lucene-search").withGroup(new ThreadGroup("lucene-search"))); - private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR); + private final ExecutorService searchExecutor = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), + new ShortNamedThreadFactory("lucene-search") + .setDaemon(true).withGroup(new ThreadGroup("lucene-search")) + ); + private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(searchExecutor); private final SnapshotsManager snapshotsManager; private final Similarity similarity; private final SearcherManager searcherManager; private final Duration queryRefreshDebounceTime; - private final Phaser activeSearchers = new Phaser(1); - private final Phaser activeRefreshes = new Phaser(1); + + private final AtomicLong activeSearchers = new AtomicLong(0); + private final AtomicLong activeRefreshes = new AtomicLong(0); private final LoadingCache>> cachedSnapshotSearchers; private final Mono> cachedMainSearcher; @@ -104,35 +106,30 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { .then(Mono.fromRunnable(() -> { logger.info("Closed IndexSearcherManager"); logger.info("Closing refreshes..."); - if (!activeRefreshes.isTerminated()) { - try { - //noinspection BlockingMethodInNonBlockingContext - activeRefreshes.awaitAdvanceInterruptibly(activeRefreshes.arrive(), 15, TimeUnit.SECONDS); - } catch (Exception ex) { - if (ex instanceof TimeoutException) { - logger.error("Failed to terminate active refreshes: timeout"); - } else { - logger.error("Failed to terminate active refreshes", ex); - } - } + long initTime = System.nanoTime(); + while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) { + LockSupport.parkNanos(50000000); } logger.info("Closed refreshes..."); logger.info("Closing active searchers..."); - if (!activeSearchers.isTerminated()) { - try { - //noinspection BlockingMethodInNonBlockingContext - activeSearchers.awaitAdvanceInterruptibly(activeSearchers.arrive(), 15, TimeUnit.SECONDS); - } catch (Exception ex) { - if (ex instanceof TimeoutException) { - logger.error("Failed to terminate active searchers: timeout"); - } else { - logger.error("Failed to terminate active searchers", ex); - } - } + initTime = System.nanoTime(); + while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) { + LockSupport.parkNanos(50000000); } logger.info("Closed active searchers"); + logger.info("Stopping searcher executor..."); cachedSnapshotSearchers.invalidateAll(); cachedSnapshotSearchers.cleanUp(); + searchExecutor.shutdown(); + try { + //noinspection BlockingMethodInNonBlockingContext + if (!searchExecutor.awaitTermination(15, TimeUnit.SECONDS)) { + searchExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + logger.error("Failed to stop executor", e); + } + logger.info("Stopped searcher executor..."); }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) .publishOn(Schedulers.parallel()) .cache(); @@ -143,14 +140,14 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { if (closeRequested.get()) { return null; } - activeSearchers.register(); + activeSearchers.incrementAndGet(); IndexSearcher indexSearcher; boolean decRef; if (snapshot == null) { indexSearcher = searcherManager.acquire(); decRef = true; } else { - indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR); + indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(searchExecutor); decRef = false; } indexSearcher.setSimilarity(similarity); @@ -161,30 +158,30 @@ public class CachedIndexSearcherManager implements IndexSearcherManager { private void dropCachedIndexSearcher() { // This shouldn't happen more than once per searcher. - activeSearchers.arriveAndDeregister(); + activeSearchers.decrementAndGet(); } @Override public void maybeRefreshBlocking() throws IOException { try { - activeRefreshes.register(); + activeRefreshes.incrementAndGet(); searcherManager.maybeRefreshBlocking(); } catch (AlreadyClosedException ignored) { } finally { - activeRefreshes.arriveAndDeregister(); + activeRefreshes.decrementAndGet(); } } @Override public void maybeRefresh() throws IOException { try { - activeRefreshes.register(); + activeRefreshes.incrementAndGet(); searcherManager.maybeRefresh(); } catch (AlreadyClosedException ignored) { } finally { - activeRefreshes.arriveAndDeregister(); + activeRefreshes.decrementAndGet(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 2a613b0..8f41ca1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -375,26 +375,25 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { @Override public Mono update(LLTerm id, LLIndexRequest request) { - return this - .runSafe(() -> docIndexingTime.recordCallable(() -> { - startedDocIndexings.increment(); - try { - switch (request) { - case LLUpdateDocument updateDocument -> - indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument)); - case LLSoftUpdateDocument softUpdateDocument -> - indexWriter.softUpdateDocument(LLUtils.toTerm(id), toDocument(softUpdateDocument.items()), - toFields(softUpdateDocument.softDeleteItems())); - case LLUpdateFields updateFields -> indexWriter.updateDocValues(LLUtils.toTerm(id), - toFields(updateFields.items())); - case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request); - } - } finally { - endeddDocIndexings.increment(); - } - return null; - })) - .transform(this::ensureOpen); + return this.runSafe(() -> docIndexingTime.recordCallable(() -> { + startedDocIndexings.increment(); + try { + if (request instanceof LLUpdateDocument updateDocument) { + indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument)); + } else if (request instanceof LLSoftUpdateDocument softUpdateDocument) { + indexWriter.softUpdateDocument(LLUtils.toTerm(id), + toDocument(softUpdateDocument.items()), + toFields(softUpdateDocument.softDeleteItems())); + } else if (request instanceof LLUpdateFields updateFields) { + indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items())); + } else { + throw new UnsupportedOperationException("Unexpected request type: " + request); + } + } finally { + endeddDocIndexings.increment(); + } + return null; + })).transform(this::ensureOpen); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 10292eb..03faaa3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper; +import org.apache.lucene.util.StringHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; @@ -116,7 +117,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { } private int getLuceneIndexId(LLTerm id) { - return Math.abs(id.getValue().hashCode()) % luceneIndices.length; + return Math.abs(StringHelper.murmurhash3_x86_32(id.getValueBytesRef(), 7) % luceneIndices.length); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java index c12a130..6cffdb4 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java @@ -19,32 +19,49 @@ public class LLTempLMDBEnv implements Closeable { private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L; public static final int MAX_DATABASES = 1024; private static final AtomicInteger NEXT_LMDB_ENV_ID = new AtomicInteger(0); - private final BitSet freeIds; + private BitSet freeIds; - private final int envId; - private final Path tempDirectory; - private final Env env; + private int envId; + private Path tempDirectory; + private Env env; + private volatile boolean initialized; private volatile boolean closed; - public LLTempLMDBEnv() throws IOException { + public LLTempLMDBEnv() { this.envId = NEXT_LMDB_ENV_ID.getAndIncrement(); - tempDirectory = Files.createTempDirectory("lmdb"); - var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY) - .setMapSize(TWENTY_GIBIBYTES) - .setMaxDbs(MAX_DATABASES); - //env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP); - env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC); - freeIds = BitSet.of(DocIdSetIterator.range(0, MAX_DATABASES), MAX_DATABASES); } public Env getEnv() { if (closed) { throw new IllegalStateException("Environment closed"); } + initializeIfPossible(); return env; } + private void initializeIfPossible() { + if (!initialized) { + synchronized(this) { + if (!initialized) { + try { + tempDirectory = Files.createTempDirectory("lmdb"); + var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY) + .setMapSize(TWENTY_GIBIBYTES) + .setMaxDbs(MAX_DATABASES); + //env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP); + env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC); + freeIds = BitSet.of(DocIdSetIterator.range(0, MAX_DATABASES), MAX_DATABASES); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + public int allocateDb() { + initializeIfPossible(); + //noinspection SynchronizeOnNonFinalField synchronized (freeIds) { var freeBit = freeIds.nextSetBit(0); if (freeBit == DocIdSetIterator.NO_MORE_DOCS) { @@ -61,6 +78,8 @@ public class LLTempLMDBEnv implements Closeable { } public void freeDb(int db) { + initializeIfPossible(); + //noinspection SynchronizeOnNonFinalField synchronized (freeIds) { freeIds.set(db); } @@ -68,6 +87,16 @@ public class LLTempLMDBEnv implements Closeable { @Override public void close() throws IOException { + if (this.closed) { + return; + } + if (!this.initialized) { + synchronized (this) { + closed = true; + initialized = true; + return; + } + } this.closed = true; env.close(); //noinspection ResultOfMethodCallIgnored diff --git a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java index 71dafe3..5c3c25a 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java +++ b/src/main/java/it/cavallium/dbengine/lucene/FullDocs.java @@ -138,17 +138,19 @@ public interface FullDocs extends ResourceIterable { @SuppressWarnings("unchecked") Flux[] fluxes = new Flux[fullDocs.length]; for (int i = 0; i < iterables.length; i++) { var shardIndex = i; - fluxes[i] = iterables[i].map(shard -> switch (shard) { - case LLScoreDoc scoreDoc -> - //noinspection unchecked - (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex); - case LLFieldDoc fieldDoc -> - //noinspection unchecked - (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields()); - case LLSlotDoc slotDoc -> - //noinspection unchecked - (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot()); - case null, default -> throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass())); + fluxes[i] = iterables[i].map(shard -> { + if (shard instanceof LLScoreDoc scoreDoc) { + //noinspection unchecked + return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex); + } else if (shard instanceof LLFieldDoc fieldDoc) { + //noinspection unchecked + return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields()); + } else if (shard instanceof LLSlotDoc slotDoc) { + //noinspection unchecked + return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot()); + } else { + throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass())); + } }); if (fullDocs[i].totalHits().relation == EQUAL_TO) { fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index 75a24a6..0e3ae56 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -71,6 +71,7 @@ import org.apache.lucene.search.similarities.ClassicSimilarity; import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.search.similarities.TFIDFSimilarity; +import org.apache.lucene.util.BytesRef; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.novasearch.lucene.search.similarities.BM25Similarity; @@ -178,7 +179,7 @@ public class LuceneUtils { * @throws IOException when an error occurs when reading the document */ @NotNull - public static String keyOfTopDoc(int docId, IndexReader indexReader, + public static BytesRef keyOfTopDoc(int docId, IndexReader indexReader, String keyFieldName) throws IOException, NoSuchElementException { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called keyOfTopDoc in a nonblocking thread"); @@ -202,7 +203,7 @@ public class LuceneUtils { .map(IndexableField::name) .collect(Collectors.joining(",", "[", "]"))); } else { - return field.stringValue(); + return field.binaryValue(); } } } @@ -392,7 +393,7 @@ public class LuceneUtils { indexSearcher = indexSearchers.get(shardIndex); } try { - String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName); + BytesRef collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName); return new LLKeyScore(shardDocId, score, collectedDoc); } catch (NoSuchElementException ex) { logger.debug("Error: document {} key is not present!", shardDocId); diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java index cea3e1c..23df2b7 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/CountMultiSearcher.java @@ -119,7 +119,7 @@ public class CountMultiSearcher implements MultiSearcher { } }).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic()))) .publishOn(Schedulers.parallel()) - .timeout(queryParams.timeout()); + .transform(TimeoutUtil.timeoutMono(queryParams.timeout())); }, is -> Mono.empty() ) diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java index 8a45672..6fa27a8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/LuceneGenerator.java @@ -66,7 +66,7 @@ public class LuceneGenerator implements Supplier { return s; } ) - .subscribeOn(SCHED, false); + .subscribeOn(SCHED); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/searcher/TimeoutUtil.java b/src/main/java/it/cavallium/dbengine/lucene/searcher/TimeoutUtil.java new file mode 100644 index 0000000..e906530 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/searcher/TimeoutUtil.java @@ -0,0 +1,31 @@ +package it.cavallium.dbengine.lucene.searcher; + +import java.time.Duration; +import java.util.function.Function; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class TimeoutUtil { + + private static final Duration INFINITE = Duration.ofDays(360); + + public static Function, Mono> timeoutMono(Duration timeout) { + return query -> { + if (timeout.isZero() || timeout.isNegative() || timeout.compareTo(INFINITE) > 0) { + return query; + } else { + return query.timeout(timeout); + } + }; + } + + public static Function, Flux> timeoutFlux(Duration timeout) { + return query -> { + if (timeout.compareTo(INFINITE) > 0) { + return query; + } else { + return query.timeout(timeout); + } + }; + } +} diff --git a/src/test/java/it/cavallium/dbengine/StringIndicizer.java b/src/test/java/it/cavallium/dbengine/StringIndicizer.java index 733f8f2..551afe4 100644 --- a/src/test/java/it/cavallium/dbengine/StringIndicizer.java +++ b/src/test/java/it/cavallium/dbengine/StringIndicizer.java @@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import java.util.LinkedList; import org.apache.lucene.document.Field; import org.apache.lucene.document.Field.Store; +import org.apache.lucene.util.BytesRef; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; @@ -51,8 +52,8 @@ public class StringIndicizer extends Indicizer { } @Override - public @NotNull String getKey(String key) { - return key; + public @NotNull String getKey(BytesRef key) { + return key.utf8ToString(); } @Override