Use BytesRef when possible

Andrea Cavalli 2022-02-25 15:46:32 +01:00
parent f5729f0b65
commit 85642621df
18 changed files with 186 additions and 99 deletions

View File

@ -512,7 +512,6 @@
<annotationProcessors>
<annotationProcessor>io.soabase.recordbuilder.processor.RecordBuilderProcessor</annotationProcessor>
</annotationProcessors>
<compilerArgs>--enable-preview</compilerArgs>
<source>17</source>
<target>17</target>
</configuration>
@ -547,7 +546,7 @@
</dependency>
</dependencies>
<configuration>
<argLine>--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED</argLine>
<argLine>--add-modules jdk.incubator.foreign -Dforeign.restricted=permit --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --enable-native-access=ALL-UNNAMED</argLine>
<systemProperties>
<property>
<name>ci</name>

View File

@ -10,6 +10,7 @@ import it.cavallium.dbengine.database.LLUpdateFields;
import it.cavallium.dbengine.database.LLUtils;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.util.BytesRef;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -40,7 +41,7 @@ public abstract class Indicizer<T, U> {
public abstract @NotNull String getKeyFieldName();
public abstract @NotNull T getKey(String key);
public abstract @NotNull T getKey(BytesRef key);
public abstract IndicizerAnalyzers getPerFieldAnalyzer();

View File

@ -15,7 +15,7 @@ public record LuceneOptions(Map<String, String> extraFlags,
Optional<DirectIOOptions> directIOOptions,
boolean allowMemoryMapping,
Optional<NRTCachingOptions> nrtCachingOptions,
int indexWriterBufferSize,
long indexWriterBufferSize,
boolean applyAllDeletes,
boolean writeAllDeletes,
boolean allowNonVolatileCollection,

View File

@ -11,7 +11,9 @@ import java.util.Arrays;
import java.util.Objects;
import java.util.StringJoiner;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.util.BytesRef;
public class LLItem {
@ -25,6 +27,12 @@ public class LLItem {
this.data = data;
}
public LLItem(LLType type, String name, BytesRef data) {
this.type = type;
this.name = name;
this.data = data;
}
public LLItem(LLType type, String name, KnnFieldData data) {
this.type = type;
this.name = name;
@ -115,6 +123,11 @@ public class LLItem {
return new LLItem(LLType.LongStoredField, name, data);
}
public static LLItem newLongStoredFieldND(String name, long... data) {
BytesRef packed = LongPoint.pack(data);
return new LLItem(LLType.BytesStoredField, name, packed);
}
public static LLItem newTextField(String name, String data, Field.Store store) {
if (store == Field.Store.YES) {
return new LLItem(LLType.TextFieldStored, name, data);
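
The packed value produced by LongPoint.pack can be decoded dimension by dimension with LongPoint.decodeDimension. A minimal sketch of how the new newLongStoredFieldND factory's packed BytesRef might be used (field values and class name are illustrative):

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.util.BytesRef;

class LongStoredFieldNDExample {
    public static void main(String[] args) {
        // Illustrative values: a 2-dimensional long field, e.g. a numeric range.
        long[] values = {10L, 20L};

        // Same packing newLongStoredFieldND performs: all dimensions end up
        // in one BytesRef, 8 bytes per long in Lucene's sortable encoding.
        BytesRef packed = LongPoint.pack(values);

        // Reading the stored value back: decode each dimension at its offset.
        long[] decoded = new long[values.length];
        for (int dim = 0; dim < values.length; dim++) {
            decoded[dim] = LongPoint.decodeDimension(packed.bytes, packed.offset + dim * Long.BYTES);
        }
        System.out.println(java.util.Arrays.toString(decoded)); // prints [10, 20]
    }
}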

View File

@ -2,7 +2,8 @@ package it.cavallium.dbengine.database;
import java.util.Objects;
import java.util.StringJoiner;
import org.apache.lucene.util.BytesRef;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public record LLKeyScore(int docId, float score, @Nullable String key) {}
public record LLKeyScore(int docId, float score, @Nullable BytesRef key) {}

View File

@ -1,13 +1,20 @@
package it.cavallium.dbengine.database;
import java.util.Objects;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.BytesRef;
public class LLTerm {
private final String key;
private final String value;
private final BytesRef value;
public LLTerm(String key, String value) {
this.key = key;
this.value = new BytesRef(value);
}
public LLTerm(String key, BytesRef value) {
this.key = key;
this.value = value;
}
@ -16,7 +23,11 @@ public class LLTerm {
return key;
}
public String getValue() {
public String getValueUTF8() {
return value.utf8ToString();
}
public BytesRef getValueBytesRef() {
return value;
}
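
A quick sketch of how the widened LLTerm API might be used (key and value are illustrative): the String constructor still works and simply encodes to a BytesRef eagerly, while BytesRef callers skip the round trip.

import it.cavallium.dbengine.database.LLTerm;
import org.apache.lucene.util.BytesRef;

class LLTermExample {
    public static void main(String[] args) {
        LLTerm fromString = new LLTerm("id", "user-42");               // encoded to UTF-8 internally
        LLTerm fromBytes = new LLTerm("id", new BytesRef("user-42"));  // no conversion needed

        BytesRef raw = fromString.getValueBytesRef();   // raw bytes, no re-encoding
        String text = fromString.getValueUTF8();        // decodes back to "user-42"
        System.out.println(raw.equals(fromBytes.getValueBytesRef()) + " " + text);
    }
}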

View File

@ -15,6 +15,7 @@ public enum LLType {
FloatPointND,
DoublePointND,
LongStoredField,
BytesStoredField,
NumericDocValuesField,
SortedNumericDocValuesField,
TextField,

View File

@ -217,6 +217,7 @@ public class LLUtils {
case FloatPointND -> new FloatPoint(item.getName(), item.floatArrayData());
case DoublePointND -> new DoublePoint(item.getName(), item.doubleArrayData());
case LongStoredField -> new StoredField(item.getName(), item.longData());
case BytesStoredField -> new StoredField(item.getName(), (BytesRef) item.getData());
case FloatPoint -> new FloatPoint(item.getName(), item.floatData());
case TextField -> new TextField(item.getName(), item.stringValue(), Store.NO);
case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Store.YES);
@ -1059,8 +1060,7 @@ public class LLUtils {
@Override
public BytesRef toBytesRef() {
byte[] data = term.getValue().getBytes(StandardCharsets.UTF_8);
return new BytesRef(data, 0, data.length);
return term.getValueBytesRef();
}
}
}
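
With the value already held as a BytesRef, building a Lucene Term no longer needs a round trip through String. A minimal sketch of the equivalent direct construction (the helper name is illustrative):

import it.cavallium.dbengine.database.LLTerm;
import org.apache.lucene.index.Term;

class TermConversionExample {
    static Term toLuceneTerm(LLTerm term) {
        // The BytesRef stored inside LLTerm can back the Lucene Term directly,
        // instead of decoding to String and re-encoding to UTF-8 as before.
        return new Term(term.getKey(), term.getValueBytesRef());
    }
}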

View File

@ -5,18 +5,16 @@ import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterrupti
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLSnapshot;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
@ -36,16 +34,20 @@ import reactor.core.scheduler.Schedulers;
public class CachedIndexSearcherManager implements IndexSearcherManager {
private static final Logger logger = LogManager.getLogger(CachedIndexSearcherManager.class);
private final Executor SEARCH_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new ShortNamedThreadFactory("lucene-search").withGroup(new ThreadGroup("lucene-search")));
private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(SEARCH_EXECUTOR);
private final ExecutorService searchExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
new ShortNamedThreadFactory("lucene-search")
.setDaemon(true).withGroup(new ThreadGroup("lucene-search"))
);
private final SearcherFactory SEARCHER_FACTORY = new ExecutorSearcherFactory(searchExecutor);
private final SnapshotsManager snapshotsManager;
private final Similarity similarity;
private final SearcherManager searcherManager;
private final Duration queryRefreshDebounceTime;
private final Phaser activeSearchers = new Phaser(1);
private final Phaser activeRefreshes = new Phaser(1);
private final AtomicLong activeSearchers = new AtomicLong(0);
private final AtomicLong activeRefreshes = new AtomicLong(0);
private final LoadingCache<LLSnapshot, Mono<Send<LLIndexSearcher>>> cachedSnapshotSearchers;
private final Mono<Send<LLIndexSearcher>> cachedMainSearcher;
@ -104,35 +106,30 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
.then(Mono.<Void>fromRunnable(() -> {
logger.info("Closed IndexSearcherManager");
logger.info("Closing refreshes...");
if (!activeRefreshes.isTerminated()) {
try {
//noinspection BlockingMethodInNonBlockingContext
activeRefreshes.awaitAdvanceInterruptibly(activeRefreshes.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
logger.error("Failed to terminate active refreshes: timeout");
} else {
logger.error("Failed to terminate active refreshes", ex);
}
}
long initTime = System.nanoTime();
while (activeRefreshes.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
logger.info("Closed refreshes...");
logger.info("Closing active searchers...");
if (!activeSearchers.isTerminated()) {
try {
//noinspection BlockingMethodInNonBlockingContext
activeSearchers.awaitAdvanceInterruptibly(activeSearchers.arrive(), 15, TimeUnit.SECONDS);
} catch (Exception ex) {
if (ex instanceof TimeoutException) {
logger.error("Failed to terminate active searchers: timeout");
} else {
logger.error("Failed to terminate active searchers", ex);
}
}
initTime = System.nanoTime();
while (activeSearchers.get() > 0 && (System.nanoTime() - initTime) <= 15000000000L) {
LockSupport.parkNanos(50000000);
}
logger.info("Closed active searchers");
logger.info("Stopping searcher executor...");
cachedSnapshotSearchers.invalidateAll();
cachedSnapshotSearchers.cleanUp();
searchExecutor.shutdown();
try {
//noinspection BlockingMethodInNonBlockingContext
if (!searchExecutor.awaitTermination(15, TimeUnit.SECONDS)) {
searchExecutor.shutdownNow();
}
} catch (InterruptedException e) {
logger.error("Failed to stop executor", e);
}
logger.info("Stopped searcher executor...");
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.cache();
@ -143,14 +140,14 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
if (closeRequested.get()) {
return null;
}
activeSearchers.register();
activeSearchers.incrementAndGet();
IndexSearcher indexSearcher;
boolean decRef;
if (snapshot == null) {
indexSearcher = searcherManager.acquire();
decRef = true;
} else {
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(SEARCH_EXECUTOR);
indexSearcher = snapshotsManager.resolveSnapshot(snapshot).getIndexSearcher(searchExecutor);
decRef = false;
}
indexSearcher.setSimilarity(similarity);
@ -161,30 +158,30 @@ public class CachedIndexSearcherManager implements IndexSearcherManager {
private void dropCachedIndexSearcher() {
// This shouldn't happen more than once per searcher.
activeSearchers.arriveAndDeregister();
activeSearchers.decrementAndGet();
}
@Override
public void maybeRefreshBlocking() throws IOException {
try {
activeRefreshes.register();
activeRefreshes.incrementAndGet();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException ignored) {
} finally {
activeRefreshes.arriveAndDeregister();
activeRefreshes.decrementAndGet();
}
}
@Override
public void maybeRefresh() throws IOException {
try {
activeRefreshes.register();
activeRefreshes.incrementAndGet();
searcherManager.maybeRefresh();
} catch (AlreadyClosedException ignored) {
} finally {
activeRefreshes.arriveAndDeregister();
activeRefreshes.decrementAndGet();
}
}
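
The Phaser-based shutdown above was replaced by plain AtomicLong counters polled with LockSupport.parkNanos. A minimal sketch of that wait-until-idle pattern in isolation (the 15-second budget and 50 ms poll interval mirror the constants used above; the class and method names are illustrative):

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

final class AwaitIdleExample {
    /** Waits until the counter drops to zero, giving up after timeoutNanos. */
    static boolean awaitIdle(AtomicLong activeTasks, long timeoutNanos) {
        long start = System.nanoTime();
        while (activeTasks.get() > 0 && (System.nanoTime() - start) <= timeoutNanos) {
            LockSupport.parkNanos(50_000_000L); // poll roughly every 50 ms
        }
        return activeTasks.get() == 0; // false if the budget ran out first
    }
}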

View File

@ -375,26 +375,25 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> update(LLTerm id, LLIndexRequest request) {
return this
.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
switch (request) {
case LLUpdateDocument updateDocument ->
indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
case LLSoftUpdateDocument softUpdateDocument ->
indexWriter.softUpdateDocument(LLUtils.toTerm(id), toDocument(softUpdateDocument.items()),
toFields(softUpdateDocument.softDeleteItems()));
case LLUpdateFields updateFields -> indexWriter.updateDocValues(LLUtils.toTerm(id),
toFields(updateFields.items()));
case null, default -> throw new UnsupportedOperationException("Unexpected request type: " + request);
}
} finally {
endeddDocIndexings.increment();
}
return null;
}))
.transform(this::ensureOpen);
return this.<Void>runSafe(() -> docIndexingTime.recordCallable(() -> {
startedDocIndexings.increment();
try {
if (request instanceof LLUpdateDocument updateDocument) {
indexWriter.updateDocument(LLUtils.toTerm(id), toDocument(updateDocument));
} else if (request instanceof LLSoftUpdateDocument softUpdateDocument) {
indexWriter.softUpdateDocument(LLUtils.toTerm(id),
toDocument(softUpdateDocument.items()),
toFields(softUpdateDocument.softDeleteItems()));
} else if (request instanceof LLUpdateFields updateFields) {
indexWriter.updateDocValues(LLUtils.toTerm(id), toFields(updateFields.items()));
} else {
throw new UnsupportedOperationException("Unexpected request type: " + request);
}
} finally {
endeddDocIndexings.increment();
}
return null;
})).transform(this::ensureOpen);
}
@Override

View File

@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
import org.apache.lucene.util.StringHelper;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
@ -116,7 +117,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
private int getLuceneIndexId(LLTerm id) {
return Math.abs(id.getValue().hashCode()) % luceneIndices.length;
return Math.abs(StringHelper.murmurhash3_x86_32(id.getValueBytesRef(), 7) % luceneIndices.length);
}
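
StringHelper.murmurhash3_x86_32 hashes the term's raw bytes, so routing a term to a shard no longer materializes a String. A standalone sketch of the same routing computation (the seed 7 matches the code above; the shard count and key are illustrative):

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;

class ShardRoutingExample {
    public static void main(String[] args) {
        int shardCount = 8;                      // illustrative number of shards
        BytesRef key = new BytesRef("user-42");  // illustrative term value

        // The remainder lies in (-shardCount, shardCount), so Math.abs keeps
        // the shard index inside [0, shardCount) even for negative hashes.
        int shard = Math.abs(StringHelper.murmurhash3_x86_32(key, 7) % shardCount);
        System.out.println(shard);
    }
}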
@Override

View File

@ -19,32 +19,49 @@ public class LLTempLMDBEnv implements Closeable {
private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L;
public static final int MAX_DATABASES = 1024;
private static final AtomicInteger NEXT_LMDB_ENV_ID = new AtomicInteger(0);
private final BitSet freeIds;
private BitSet freeIds;
private final int envId;
private final Path tempDirectory;
private final Env<ByteBuf> env;
private int envId;
private Path tempDirectory;
private Env<ByteBuf> env;
private volatile boolean initialized;
private volatile boolean closed;
public LLTempLMDBEnv() throws IOException {
public LLTempLMDBEnv() {
this.envId = NEXT_LMDB_ENV_ID.getAndIncrement();
tempDirectory = Files.createTempDirectory("lmdb");
var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY)
.setMapSize(TWENTY_GIBIBYTES)
.setMaxDbs(MAX_DATABASES);
//env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP);
env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC);
freeIds = BitSet.of(DocIdSetIterator.range(0, MAX_DATABASES), MAX_DATABASES);
}
public Env<ByteBuf> getEnv() {
if (closed) {
throw new IllegalStateException("Environment closed");
}
initializeIfPossible();
return env;
}
private void initializeIfPossible() {
if (!initialized) {
synchronized(this) {
if (!initialized) {
try {
tempDirectory = Files.createTempDirectory("lmdb");
var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY)
.setMapSize(TWENTY_GIBIBYTES)
.setMaxDbs(MAX_DATABASES);
//env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP);
env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC);
freeIds = BitSet.of(DocIdSetIterator.range(0, MAX_DATABASES), MAX_DATABASES);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
public int allocateDb() {
initializeIfPossible();
//noinspection SynchronizeOnNonFinalField
synchronized (freeIds) {
var freeBit = freeIds.nextSetBit(0);
if (freeBit == DocIdSetIterator.NO_MORE_DOCS) {
@ -61,6 +78,8 @@ public class LLTempLMDBEnv implements Closeable {
}
public void freeDb(int db) {
initializeIfPossible();
//noinspection SynchronizeOnNonFinalField
synchronized (freeIds) {
freeIds.set(db);
}
@ -68,6 +87,16 @@ public class LLTempLMDBEnv implements Closeable {
@Override
public void close() throws IOException {
if (this.closed) {
return;
}
if (!this.initialized) {
synchronized (this) {
closed = true;
initialized = true;
return;
}
}
this.closed = true;
env.close();
//noinspection ResultOfMethodCallIgnored
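
initializeIfPossible follows the standard double-checked locking idiom: a volatile flag is tested once outside and once inside the synchronized block so the expensive setup runs at most once. A stripped-down sketch of the idiom itself (the LMDB specifics are omitted; writing the flag last is what publishes the built resource to other threads):

class LazyResource {
    private volatile boolean initialized;
    private Object resource; // only written inside the synchronized block

    void initializeIfPossible() {
        if (!initialized) {              // fast path once initialization is done
            synchronized (this) {
                if (!initialized) {      // re-check under the lock
                    resource = new Object(); // stands in for the real setup
                    initialized = true;      // volatile write must come last
                }
            }
        }
    }
}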

View File

@ -138,17 +138,19 @@ public interface FullDocs<T extends LLDoc> extends ResourceIterable<T> {
@SuppressWarnings("unchecked") Flux<T>[] fluxes = new Flux[fullDocs.length];
for (int i = 0; i < iterables.length; i++) {
var shardIndex = i;
fluxes[i] = iterables[i].<T>map(shard -> switch (shard) {
case LLScoreDoc scoreDoc ->
//noinspection unchecked
(T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
case LLFieldDoc fieldDoc ->
//noinspection unchecked
(T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
case LLSlotDoc slotDoc ->
//noinspection unchecked
(T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
case null, default -> throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
fluxes[i] = iterables[i].<T>map(shard -> {
if (shard instanceof LLScoreDoc scoreDoc) {
//noinspection unchecked
return (T) new LLScoreDoc(scoreDoc.doc(), scoreDoc.score(), shardIndex);
} else if (shard instanceof LLFieldDoc fieldDoc) {
//noinspection unchecked
return (T) new LLFieldDoc(fieldDoc.doc(), fieldDoc.score(), shardIndex, fieldDoc.fields());
} else if (shard instanceof LLSlotDoc slotDoc) {
//noinspection unchecked
return (T) new LLSlotDoc(slotDoc.doc(), slotDoc.score(), shardIndex, slotDoc.slot());
} else {
throw new UnsupportedOperationException("Unsupported type " + (shard == null ? null : shard.getClass()));
}
});
if (fullDocs[i].totalHits().relation == EQUAL_TO) {
fluxes[i] = fluxes[i].take(fullDocs[i].totalHits().value, true);

View File

@ -71,6 +71,7 @@ import org.apache.lucene.search.similarities.ClassicSimilarity;
import org.apache.lucene.search.similarities.PerFieldSimilarityWrapper;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.TFIDFSimilarity;
import org.apache.lucene.util.BytesRef;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.novasearch.lucene.search.similarities.BM25Similarity;
@ -178,7 +179,7 @@ public class LuceneUtils {
* @throws IOException when an error occurs when reading the document
*/
@NotNull
public static String keyOfTopDoc(int docId, IndexReader indexReader,
public static BytesRef keyOfTopDoc(int docId, IndexReader indexReader,
String keyFieldName) throws IOException, NoSuchElementException {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called keyOfTopDoc in a nonblocking thread");
@ -202,7 +203,7 @@ public class LuceneUtils {
.map(IndexableField::name)
.collect(Collectors.joining(",", "[", "]")));
} else {
return field.stringValue();
return field.binaryValue();
}
}
}
@ -392,7 +393,7 @@ public class LuceneUtils {
indexSearcher = indexSearchers.get(shardIndex);
}
try {
String collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName);
BytesRef collectedDoc = keyOfTopDoc(shardDocId, indexSearcher.getIndexReader(), keyFieldName);
return new LLKeyScore(shardDocId, score, collectedDoc);
} catch (NoSuchElementException ex) {
logger.debug("Error: document {} key is not present!", shardDocId);
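
Because keyOfTopDoc now returns the stored field's binaryValue(), callers that still need a textual key decode at the boundary. A minimal sketch (the helper and the "_id" field name are illustrative; it assumes the LuceneUtils class shown above is on the classpath):

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.BytesRef;

class KeyLookupExample {
    static String keyAsString(int docId, IndexReader reader) throws IOException {
        BytesRef keyBytes = LuceneUtils.keyOfTopDoc(docId, reader, "_id");
        // BytesRef is only an offset/length view over a byte[]; decode to a
        // String only where one is actually required.
        return keyBytes.utf8ToString();
    }
}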

View File

@ -119,7 +119,7 @@ public class CountMultiSearcher implements MultiSearcher {
}
}).subscribeOn(uninterruptibleScheduler(Schedulers.boundedElastic())))
.publishOn(Schedulers.parallel())
.timeout(queryParams.timeout());
.transform(TimeoutUtil.timeoutMono(queryParams.timeout()));
},
is -> Mono.empty()
)

View File

@ -66,7 +66,7 @@ public class LuceneGenerator implements Supplier<ScoreDoc> {
return s;
}
)
.subscribeOn(SCHED, false);
.subscribeOn(SCHED);
}
@Override

View File

@ -0,0 +1,31 @@
package it.cavallium.dbengine.lucene.searcher;
import java.time.Duration;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TimeoutUtil {
private static final Duration INFINITE = Duration.ofDays(360);
public static <T> Function<Mono<T>, Mono<T>> timeoutMono(Duration timeout) {
return query -> {
if (timeout.isZero() || timeout.isNegative() || timeout.compareTo(INFINITE) > 0) {
return query;
} else {
return query.timeout(timeout);
}
};
}
public static <T> Function<Flux<T>, Flux<T>> timeoutFlux(Duration timeout) {
return query -> {
if (timeout.compareTo(INFINITE) > 0) {
return query;
} else {
return query.timeout(timeout);
}
};
}
}
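
Usage mirrors the CountMultiSearcher change above: the timeout is applied through transform, so a zero, negative, or effectively infinite duration leaves the pipeline untouched instead of installing a pointless timer. A small sketch (method and element type are illustrative):

import java.time.Duration;
import it.cavallium.dbengine.lucene.searcher.TimeoutUtil;
import reactor.core.publisher.Mono;

class TimeoutUtilExample {
    static Mono<String> withQueryTimeout(Mono<String> query, Duration timeout) {
        // Duration.ZERO (or anything beyond ~360 days) means "no timeout".
        return query.transform(TimeoutUtil.timeoutMono(timeout));
    }
}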

View File

@ -13,6 +13,7 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.util.LinkedList;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.util.BytesRef;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
@ -51,8 +52,8 @@ public class StringIndicizer extends Indicizer<String, String> {
}
@Override
public @NotNull String getKey(String key) {
return key;
public @NotNull String getKey(BytesRef key) {
return key.utf8ToString();
}
@Override