Fix all warnings

This commit is contained in:
Andrea Cavalli 2021-02-03 14:37:02 +01:00
parent 9ac9d27f07
commit db5c444b92
35 changed files with 152 additions and 229 deletions

17
pom.xml
View File

@ -26,6 +26,23 @@
<url>https://mvn.mchv.eu/repository/mchv-snapshot</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>mchv-release-distribution</id>
<name>MCHV Release Apache Maven Packages Distribution</name>
<url>https://mvn.mchv.eu/repository/mchv</url>
</repository>
<snapshotRepository>
<id>mchv-snapshot-distribution</id>
<name>MCHV Snapshot Apache Maven Packages Distribution</name>
<url>https://mvn.mchv.eu/repository/mchv-snapshot</url>
</snapshotRepository>
</distributionManagement>
<scm>
<connection>scm:git:https://git.ignuranza.net/andreacavalli/CavalliumDBEngine.git</connection>
<developerConnection>scm:git:https://git.ignuranza.net/andreacavalli/CavalliumDBEngine.git</developerConnection>
<tag>HEAD</tag>
</scm>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>

View File

@ -8,13 +8,12 @@ import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.cavallium.dbengine.database.collections.QueryableBuilder;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.collections.SubStageGetterMapDeep;
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@ -55,36 +54,6 @@ public class SpeedExample {
.blockOptional();
}
private static Mono<Void> testCreateQueryable() {
var ssg = new SubStageGetterSingleBytes();
var ser = SerializerFixedBinaryLength.noop(4);
var itemKey = new byte[]{0, 1, 2, 3};
var newValue = new byte[]{4, 5, 6, 7};
return test("Create Queryable",
tempDb()
.flatMap(db -> db.getDictionary("testmap").map(dict -> Tuples.of(db, dict)))
.map(tuple -> tuple.mapT2(dict -> {
var builder = new QueryableBuilder(2);
return builder.wrap(DatabaseMapDictionaryDeep.simple(dict, builder.serializer(), builder.tail(ssg, ser)));
})),
tuple -> Flux.range(0, batchSize).flatMap(n -> Mono
.defer(() -> Mono
.fromRunnable(() -> {
if (printPreviousValue)
System.out.println("Setting new value at key " + Arrays.toString(itemKey) + ": " + Arrays.toString(newValue));
})
.then(tuple.getT2().at(null, itemKey))
.flatMap(handle -> handle.setAndGetPrevious(newValue))
.doOnSuccess(oldValue -> {
if (printPreviousValue)
System.out.println("Old value: " + (oldValue == null ? "None" : Arrays.toString(oldValue)));
})
))
.then(),
numRepeats,
tuple -> tuple.getT1().close());
}
private static Mono<Void> test2LevelPut() {
var k1ser = SerializerFixedBinaryLength.noop(4);
var k2ser = SerializerFixedBinaryLength.noop(4);

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.client;
import java.util.Objects;
import java.util.StringJoiner;
@SuppressWarnings("unused")
public class CompositeDatabasePartLocation {
private final CompositeDatabasePartType partType;
private final String partName;

View File

@ -5,6 +5,7 @@ import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Mono;
@SuppressWarnings("UnusedReturnValue")
public interface LLDatabaseConnection {
Mono<? extends LLDatabaseConnection> connect();

View File

@ -8,6 +8,7 @@ import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
@NotAtomic
public interface LLDictionary extends LLKeyValueDatabaseStructure {

View File

@ -1,5 +1,5 @@
package it.cavallium.dbengine.database;
public enum LLDictionaryResultType {
VOID, VALUE_CHANGED, PREVIOUS_VALUE;
VOID, VALUE_CHANGED, PREVIOUS_VALUE
}

View File

@ -8,17 +8,16 @@ import java.util.Arrays;
import java.util.Objects;
import java.util.StringJoiner;
import org.apache.lucene.document.Field;
import org.jetbrains.annotations.Nullable;
public class LLItem {
private final LLType type;
private final String name;
private final byte[] data;
@Nullable
// nullable
private final byte[] data2;
public LLItem(LLType type, String name, byte[] data, @Nullable byte[] data2) {
public LLItem(LLType type, String name, byte[] data, byte[] data2) {
this.type = type;
this.name = name;
this.data = data;

View File

@ -74,10 +74,6 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
HashMap::new);
}
private Entry<byte[], byte[]> stripPrefix(Entry<byte[], byte[]> entry) {
return Map.entry(stripPrefix(entry.getKey()), entry.getValue());
}
@Override
public Mono<Map<T, U>> clearAndGetPrevious() {
return dictionary

View File

@ -14,7 +14,6 @@ import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
// todo: implement optimized methods
@SuppressWarnings("Convert2MethodRef")
public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
public static final byte[] EMPTY_BYTES = new byte[0];
@ -143,15 +142,6 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return Arrays.copyOfRange(key, this.keyPrefix.length, key.length);
}
/**
* Remove ext from suffix
*/
protected byte[] trimSuffix(byte[] keySuffix) {
if (keySuffix.length == keySuffixLength)
return keySuffix;
return Arrays.copyOf(keySuffix, keySuffixLength);
}
/**
* Remove ext from full key
*/
@ -170,15 +160,6 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return result;
}
/**
* Remove suffix from keySuffix, returning probably an empty byte array
*/
protected byte[] stripSuffix(byte[] keySuffix) {
if (keySuffix.length == this.keySuffixLength)
return EMPTY_BYTES;
return Arrays.copyOfRange(keySuffix, this.keySuffixLength, keySuffix.length);
}
protected LLSnapshot resolveSnapshot(@Nullable CompositeSnapshot snapshot) {
if (snapshot == null) {
return null;

View File

@ -5,7 +5,7 @@ import java.util.Objects;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public interface DatabaseStage<T> extends DatabaseEntryable<T> {
public interface DatabaseStage<T> extends DatabaseStageWithEntry<T> {
Mono<T> get(@Nullable CompositeSnapshot snapshot);

View File

@ -1,3 +0,0 @@
package it.cavallium.dbengine.database.collections;
public interface DatabaseStageQueryable<T, U> {}

View File

@ -1,6 +1,6 @@
package it.cavallium.dbengine.database.collections;
public interface DatabaseEntryable<T> {
public interface DatabaseStageWithEntry<T> {
DatabaseStageEntry<T> entry();
}

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import java.io.IOException;
@SuppressWarnings("SpellCheckingInspection")
public interface JoinerBlocking<KEY, DBVALUE, JOINEDVALUE> {
interface ValueGetterBlocking<KEY, VALUE> {

View File

@ -1,24 +0,0 @@
package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
public class QueryableBuilder {
public QueryableBuilder(int stagesNumber) {
}
public SerializerFixedBinaryLength<byte[], byte[]> serializer() {
return null;
}
public <T, U extends SubStageGetterSingle<T>> SubStageGetterSingleBytes tail(U ssg,
SerializerFixedBinaryLength<T, byte[]> ser) {
return null;
}
public <T, U, US extends DatabaseStage<U>, M extends DatabaseStageMap<T, U, US>> M wrap(M map) {
return null;
}
}

View File

@ -21,12 +21,10 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
@Nullable CompositeSnapshot snapshot,
byte[] keyPrefix,
Flux<byte[]> keyFlux) {
//System.out.println(Thread.currentThread() + "subStageGetterSingle1");
return keyFlux
.singleOrEmpty()
.flatMap(key -> Mono
.<DatabaseStageEntry<T>>fromCallable(() -> {
//System.out.println(Thread.currentThread() + "subStageGetterSingle2");
if (!Arrays.equals(keyPrefix, key)) {
throw new IndexOutOfBoundsException("Found more than one element!");
}
@ -34,8 +32,7 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
})
)
.then(Mono.fromSupplier(() -> {
//System.out.println(Thread.currentThread() + "subStageGetterSingle3");
return new DatabaseSingle<T>(dictionary,
return new DatabaseSingle<>(dictionary,
keyPrefix,
serializer
);
@ -47,13 +44,4 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
return true;
}
//todo: temporary wrapper. convert the whole class to buffers
private T deserialize(byte[] bytes) {
return serializer.deserialize(bytes);
}
//todo: temporary wrapper. convert the whole class to buffers
private byte[] serialize(T bytes) {
return serializer.serialize(bytes);
}
}

View File

@ -37,7 +37,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
@Override
public Mono<LLLocalKeyValueDatabase> getDatabase(String name, List<Column> columns, boolean lowMemory) {
return Mono
.<LLLocalKeyValueDatabase>fromCallable(() -> new LLLocalKeyValueDatabase(name,
.fromCallable(() -> new LLLocalKeyValueDatabase(name,
basePath.resolve("database_" + name),
columns,
new LinkedList<>(),

View File

@ -224,7 +224,7 @@ public class LLLocalDictionary implements LLDictionary {
.window(MULTI_GET_WINDOW)
.flatMap(keysWindowFlux -> keysWindowFlux.collectList()
.flatMapMany(keysWindow -> Mono
.<ArrayList<Entry<byte[], byte[]>>>fromCallable(() -> {
.fromCallable(() -> {
var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
Arrays.fill(handlesArray, cfh);
var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length);
@ -240,7 +240,7 @@ public class LLLocalDictionary implements LLDictionary {
return mappedResults;
})
.subscribeOn(dbScheduler)
.<Entry<byte[], byte[]>>flatMapMany(Flux::fromIterable)
.flatMapMany(Flux::fromIterable)
)
)
.onErrorMap(IOException::new);
@ -292,31 +292,6 @@ public class LLLocalDictionary implements LLDictionary {
.map(oldValue -> Map.entry(newEntry.getKey(), oldValue)));
}
@NotNull
private Flux<Entry<byte[], byte[]>> putEntryToWriteBatch(List<Entry<byte[], byte[]>> newEntries, boolean getOldValues,
CappedWriteBatch writeBatch) {
return Flux
.from(Flux
.defer(() -> {
if (getOldValues) {
return getMulti(null, Flux.fromIterable(newEntries).map(Entry::getKey));
} else {
return Flux.empty();
}
})
.concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
for (Entry<byte[], byte[]> newEntry : newEntries) {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
}
}
return null;
}).subscribeOn(dbScheduler)
)
);
}
@Override
public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
return Flux.defer(() -> {
@ -408,7 +383,7 @@ public class LLLocalDictionary implements LLDictionary {
if (!currentGroupValues.isEmpty()) {
sink.next(currentGroupValues);
}
} finally {;
} finally {
sink.complete();
}
})
@ -448,16 +423,14 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
break;
}
if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
currentGroupValues.add(key);
} else {
if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
if (!currentGroupValues.isEmpty()) {
sink.next(currentGroupValues);
}
firstGroupKey = key;
currentGroupValues = new ArrayList<>();
currentGroupValues.add(key);
}
currentGroupValues.add(key);
rocksIterator.next();
}
if (!currentGroupValues.isEmpty()) {

View File

@ -61,9 +61,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
/**
* Global lucene index scheduler.
* There is only a single thread globally to not overwhelm the disk with
* parallel commits or parallel refreshes.
* concurrent commits or concurrent refreshes.
*/
private static final Scheduler luceneScheduler = Schedulers.newBoundedElastic(1,
private static final Scheduler luceneBlockingScheduler = Schedulers.newBoundedElastic(1,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"Lucene",
120,
@ -124,7 +124,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
private void registerScheduledFixedTask(Runnable task, Duration duration) {
scheduledTasksLifecycle.registerScheduledTask(luceneScheduler.schedulePeriodically(() -> {
scheduledTasksLifecycle.registerScheduledTask(luceneBlockingScheduler.schedulePeriodically(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
task.run();
@ -143,14 +143,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(lastSnapshotSeqNo::incrementAndGet)
.subscribeOn(luceneScheduler)
.subscribeOn(luceneBlockingScheduler)
.flatMap(snapshotSeqNo -> takeLuceneSnapshot()
.flatMap(snapshot -> Mono
.fromCallable(() -> {
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
return new LLSnapshot(snapshotSeqNo);
})
.subscribeOn(luceneScheduler)
.subscribeOn(luceneBlockingScheduler)
)
);
}
@ -160,18 +160,23 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
* avoiding the exception.
*/
private Mono<IndexCommit> takeLuceneSnapshot() {
return Mono.fromCallable(() -> {
try {
return snapshotter.snapshot();
} catch (IllegalStateException ex) {
if ("No index commit to snapshot".equals(ex.getMessage())) {
indexWriter.commit();
return snapshotter.snapshot();
} else {
throw ex;
}
}
}).subscribeOn(luceneScheduler);
return Mono
.fromCallable(() -> {
try {
//noinspection BlockingMethodInNonBlockingContext
return snapshotter.snapshot();
} catch (IllegalStateException ex) {
if ("No index commit to snapshot".equals(ex.getMessage())) {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
//noinspection BlockingMethodInNonBlockingContext
return snapshotter.snapshot();
} else {
throw ex;
}
}
})
.subscribeOn(luceneBlockingScheduler);
}
@Override
@ -182,22 +187,26 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
}
//noinspection BlockingMethodInNonBlockingContext
indexSnapshot.close();
var luceneIndexSnapshot = indexSnapshot.getSnapshot();
//noinspection BlockingMethodInNonBlockingContext
snapshotter.release(luceneIndexSnapshot);
// Delete unused files after releasing the snapshot
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteUnusedFiles();
return null;
}).subscribeOn(luceneScheduler);
}).subscribeOn(luceneBlockingScheduler);
}
@Override
public Mono<Void> addDocument(LLTerm key, LLDocument doc) {
return Mono.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
}).subscribeOn(luceneScheduler);
}).subscribeOn(luceneBlockingScheduler);
}
@Override
@ -207,10 +216,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.collectList()
.flatMap(docs -> Mono
.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.addDocuments(LLUtils.toDocuments(docs));
return null;
})
.subscribeOn(luceneScheduler))
.subscribeOn(luceneBlockingScheduler))
)
.then();
}
@ -219,17 +229,19 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return Mono.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
}).subscribeOn(luceneScheduler);
}).subscribeOn(luceneBlockingScheduler);
}
@Override
public Mono<Void> updateDocument(LLTerm id, LLDocument document) {
return Mono.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
return null;
}).subscribeOn(luceneScheduler);
}).subscribeOn(luceneBlockingScheduler);
}
@Override
@ -243,45 +255,53 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.collectList()
.flatMap(luceneDocuments -> Mono
.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.updateDocuments(LLUtils.toTerm(documents.key()), luceneDocuments);
return null;
})
.subscribeOn(luceneScheduler)
.subscribeOn(luceneBlockingScheduler)
);
}
@Override
public Mono<Void> deleteAll() {
return Mono.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteAll();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.forceMergeDeletes(true);
//noinspection BlockingMethodInNonBlockingContext
indexWriter.flush();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
return null;
}).subscribeOn(luceneScheduler);
}).subscribeOn(luceneBlockingScheduler);
}
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
if (snapshot == null) {
//noinspection BlockingMethodInNonBlockingContext
return searcherManager.acquire();
} else {
return resolveSnapshot(snapshot).getIndexSearcher();
}
}).subscribeOn(luceneScheduler);
}).subscribeOn(luceneBlockingScheduler);
}
private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
return Mono.<Void>fromRunnable(() -> {
if (snapshot == null) {
try {
//noinspection BlockingMethodInNonBlockingContext
searcherManager.release(indexSearcher);
} catch (IOException e) {
e.printStackTrace();
}
}
}).subscribeOn(luceneScheduler);
}).subscribeOn(luceneBlockingScheduler);
}
@SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
@ -308,9 +328,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
mlt.setBoost(true);
// Get the reference doc and apply it to MoreLikeThis, to generate the query
//noinspection BlockingMethodInNonBlockingContext
return mlt.like((Map) mltDocumentFields);
})
.subscribeOn(luceneScheduler)
.subscribeOn(luceneBlockingScheduler)
.flatMap(query -> Mono
.fromCallable(() -> {
One<Long> totalHitsCountSink = Sinks.one();
@ -319,7 +340,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
luceneScheduler.schedule(() -> {
luceneBlockingScheduler.schedule(() -> {
try {
streamSearcher.search(indexSearcher,
query,
@ -347,7 +368,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
});
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(luceneScheduler)
}).subscribeOn(luceneBlockingScheduler)
).then()
.materialize()
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
@ -369,7 +390,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
org.apache.lucene.search.ScoreMode luceneScoreMode = LLUtils.toScoreMode(scoreMode);
return Tuples.of(luceneQuery, Optional.ofNullable(luceneSort), luceneScoreMode);
})
.subscribeOn(luceneScheduler)
.subscribeOn(luceneBlockingScheduler)
.flatMap(tuple -> Mono
.fromCallable(() -> {
Query luceneQuery = tuple.getT1();
@ -382,7 +403,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));
luceneScheduler.schedule(() -> {
luceneBlockingScheduler.schedule(() -> {
try {
streamSearcher.search(indexSearcher,
luceneQuery,
@ -410,7 +431,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
});
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(luceneScheduler)
}).subscribeOn(luceneBlockingScheduler)
)
.materialize()
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
@ -423,11 +444,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.cancelAndWait();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.close();
//noinspection BlockingMethodInNonBlockingContext
directory.close();
return null;
})
.subscribeOn(luceneScheduler);
.subscribeOn(luceneBlockingScheduler);
}
@Override
@ -436,14 +459,16 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.commit();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.flush();
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
return null;
})
.subscribeOn(luceneScheduler);
.subscribeOn(luceneBlockingScheduler);
}
@Override
@ -452,13 +477,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefreshBlocking();
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
return null;
})
.subscribeOn(luceneScheduler);
.subscribeOn(luceneBlockingScheduler);
}
private void scheduledCommit() {

View File

@ -9,15 +9,11 @@ import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.serializer.Query;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
@ -25,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.batch.ParallelUtils;
import org.warp.commonutils.functional.IOBiConsumer;
import org.warp.commonutils.functional.TriFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
@ -95,34 +90,6 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
return documents.flatMap(docs -> getLuceneIndex(docs.key()).addDocuments(documents)).then();
}
private Mono<Void> runPerInstance(Iterable<LLTerm> keys,
Iterable<LLDocument> documents,
TriFunction<LLLuceneIndex, Iterable<LLTerm>, Iterable<LLDocument>, Mono<Void>> consumer) {
var keysIt = keys.iterator();
var docsIt = documents.iterator();
Int2ObjectMap<List<LLTerm>> perInstanceKeys = new Int2ObjectOpenHashMap<>();
Int2ObjectMap<List<LLDocument>> perInstanceDocs = new Int2ObjectOpenHashMap<>();
while (keysIt.hasNext()) {
LLTerm key = keysIt.next();
LLDocument doc = docsIt.next();
var instanceId = getLuceneIndexId(key);
perInstanceKeys.computeIfAbsent(instanceId, iid -> new ArrayList<>()).add(key);
perInstanceDocs.computeIfAbsent(instanceId, iid -> new ArrayList<>()).add(doc);
}
return Flux
.fromIterable(perInstanceKeys.int2ObjectEntrySet())
.flatMap(currentInstanceEntry -> {
int instanceId = currentInstanceEntry.getIntKey();
List<LLTerm> currentInstanceKeys = currentInstanceEntry.getValue();
return consumer.apply(this.luceneIndices[instanceId], currentInstanceKeys, perInstanceDocs.get(instanceId));
})
.then();
}
@Override
public Mono<Void> deleteDocument(LLTerm id) {
return getLuceneIndex(id).deleteDocument(id);

View File

@ -5,6 +5,7 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.IndexSearcher;
@SuppressWarnings("unused")
public class LuceneIndexSnapshot {
private final IndexCommit snapshot;

View File

@ -11,6 +11,7 @@ import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.client.CompositeSnapshot;
import reactor.core.publisher.Mono;
@SuppressWarnings("SpellCheckingInspection")
public class JoinedIndicizerWriter<KEY, DBTYPE, JOINEDTYPE> implements LuceneIndicizerWriter<KEY, DBTYPE> {
private final LuceneIndicizerWriter<KEY, JOINEDTYPE> indicizerWriter;

View File

@ -12,6 +12,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@SuppressWarnings("unused")
public interface LuceneIndicizerWriter<T, U> {
Mono<Void> add(T key, U value);

View File

@ -4,6 +4,7 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.jetbrains.annotations.NotNull;
@SuppressWarnings("unused")
public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
int getSerializedBinaryLength();

View File

@ -65,7 +65,7 @@ public class RandomFieldComparator extends FieldComparator<Float> implements Lea
var randomizedScorer = new Scorable() {
@Override
public float score() throws IOException {
public float score() {
return randomize(scorer.docID());
}
@ -81,9 +81,10 @@ public class RandomFieldComparator extends FieldComparator<Float> implements Lea
}
}
@SuppressWarnings("RedundantCast")
@Override
public Float value(int slot) {
return Float.valueOf(scores[slot]);
return (float) scores[slot];
}
// Override because we sort reverse of natural Float order:

View File

@ -14,7 +14,7 @@ public class RandomFieldComparatorSource extends FieldComparatorSource {
}
@Override
public FieldComparator<?> newComparator(String fieldname, int numHits, int sortPos, boolean reversed) {
public FieldComparator<?> newComparator(String fieldName, int numHits, int sortPos, boolean reversed) {
return new RandomFieldComparator(rand.iterator(), numHits);
}
}

View File

@ -49,7 +49,7 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
totalHitsConsumer.accept(lastTopDocs.totalHits.value);
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, scoreMode, keyFieldName, resultsConsumer);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, keyFieldName, resultsConsumer);
// Run the searches for each page until the end
boolean finished = currentAllowedResults.var <= 0;
@ -57,7 +57,7 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
lastTopDocs = indexSearcher.searchAfter(lastScoreDoc, query, MAX_ITEMS_PER_PAGE, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
if (lastTopDocs.scoreDocs.length > 0) {
lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, scoreMode, keyFieldName, resultsConsumer);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, keyFieldName, resultsConsumer);
}
if (lastTopDocs.scoreDocs.length < MAX_ITEMS_PER_PAGE || currentAllowedResults.var <= 0) {
finished = true;
@ -69,7 +69,6 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
private void consumeHits(IntWrapper currentAllowedResults,
ScoreDoc[] hits,
IndexSearcher indexSearcher,
ScoreMode scoreMode,
String keyFieldName,
Consumer<LLKeyScore> resultsConsumer) throws IOException {
for (ScoreDoc hit : hits) {

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.serializer;
import java.util.Collection;
@SuppressWarnings("unused")
public class BooleanQuery implements Query {
private final BooleanQueryPart[] parts;
@ -26,8 +27,8 @@ public class BooleanQuery implements Query {
StringifyUtils.stringifyInt(data, minShouldMatch);
StringBuilder listData = new StringBuilder();
listData.append(parts.length).append('|');
for (int i = 0; i < parts.length; i++) {
parts[i].stringify(listData);
for (BooleanQueryPart part : parts) {
part.stringify(listData);
}
StringifyUtils.writeHeader(data, QueryConstructorType.BOOLEAN_QUERY_INFO_LIST, listData);
StringifyUtils.writeHeader(output, QueryConstructorType.BOOLEAN_QUERY, data);

View File

@ -4,6 +4,7 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.util.automaton.LevenshteinAutomata;
@SuppressWarnings("unused")
public class FuzzyQuery implements Query {
private final Term term;

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.serializer;
import org.apache.lucene.search.BooleanClause;
@SuppressWarnings("unused")
public class Occur implements SerializedQueryObject {
private final BooleanClause.Occur occur;

View File

@ -14,8 +14,8 @@ public class PhraseQuery implements Query {
StringBuilder data = new StringBuilder();
StringBuilder listData = new StringBuilder();
listData.append(parts.length).append('|');
for (int i = 0; i < parts.length; i++) {
StringifyUtils.stringifyTermPosition(listData, parts[i]);
for (TermPosition part : parts) {
StringifyUtils.stringifyTermPosition(listData, part);
}
StringifyUtils.writeHeader(data, QueryConstructorType.TERM_POSITION_LIST, listData);
StringifyUtils.writeHeader(output, QueryConstructorType.PHRASE_QUERY, data);

View File

@ -16,7 +16,7 @@ import org.apache.lucene.index.Term;
public interface Query extends SerializedQueryObject {
static Query approximativeSearch(TextFieldsAnalyzer yotsubaAnalyzer, String field, String text) {
static Query approximateSearch(TextFieldsAnalyzer yotsubaAnalyzer, String field, String text) {
try {
var terms = getTerms(yotsubaAnalyzer, field, text);

View File

@ -23,8 +23,7 @@ public class QueryParser {
public static Query parse(String text) throws ParseException {
try {
var builtQuery = (Query) parse(text, new AtomicInteger(0));
return builtQuery;
return (Query) parse(text, new AtomicInteger(0));
} catch (Exception e) {
throw new ParseException(e);
}
@ -72,10 +71,13 @@ public class QueryParser {
switch (type) {
case TERM_QUERY:
Term term = (Term) parse(completeText, position);
assert term != null;
return new TermQuery(term);
case BOOST_QUERY:
Query query = (Query) parse(completeText, position);
Float numb = (Float) parse(completeText, position);
assert query != null;
assert numb != null;
return new BoostQuery(query, numb);
case FUZZY_QUERY:
Term fqTerm = (Term) parse(completeText, position);
@ -83,11 +85,16 @@ public class QueryParser {
Integer numb2 = (Integer) parse(completeText, position);
Integer numb3 = (Integer) parse(completeText, position);
Boolean bool1 = (Boolean) parse(completeText, position);
assert fqTerm != null;
assert numb1 != null;
assert numb2 != null;
assert numb3 != null;
assert bool1 != null;
return new FuzzyQuery(fqTerm, numb1, numb2, numb3, bool1);
case PHRASE_QUERY:
//noinspection unchecked
TermPosition[] pqTerms = (TermPosition[]) parse(completeText, position);
var pqB = new PhraseQuery.Builder();
assert pqTerms != null;
for (TermPosition pqTerm : pqTerms) {
if (pqTerm != null) {
pqB.add(pqTerm.getTerm(), pqTerm.getPosition());
@ -99,7 +106,6 @@ public class QueryParser {
//noinspection ConstantConditions
int minShouldMatch = (Integer) parse(completeText, position);
bqB.setMinimumNumberShouldMatch(minShouldMatch);
//noinspection unchecked
BooleanQueryInfo[] bqTerms = (BooleanQueryInfo[]) parse(completeText, position);
assert bqTerms != null;
for (BooleanQueryInfo bqTerm : bqTerms) {
@ -113,26 +119,39 @@ public class QueryParser {
case INT_POINT_EXACT_QUERY:
String string1 = (String) parse(completeText, position);
Integer int1 = (Integer) parse(completeText, position);
assert string1 != null;
assert int1 != null;
return IntPoint.newExactQuery(string1, int1);
case LONG_POINT_EXACT_QUERY:
String string5 = (String) parse(completeText, position);
Long long3 = (Long) parse(completeText, position);
assert string5 != null;
assert long3 != null;
return LongPoint.newExactQuery(string5, long3);
case SORTED_SLOW_RANGE_QUERY:
String string2 = (String) parse(completeText, position);
Long long1 = (Long) parse(completeText, position);
Long long2 = (Long) parse(completeText, position);
assert string2 != null;
assert long1 != null;
assert long2 != null;
return SortedNumericDocValuesField.newSlowRangeQuery(string2, long1, long2);
case LONG_POINT_RANGE_QUERY:
String stringx2 = (String) parse(completeText, position);
Long longx1 = (Long) parse(completeText, position);
Long longx2 = (Long) parse(completeText, position);
return LongPoint.newRangeQuery(stringx2, longx1, longx2);
String stringX2 = (String) parse(completeText, position);
Long longX1 = (Long) parse(completeText, position);
Long longX2 = (Long) parse(completeText, position);
assert stringX2 != null;
assert longX1 != null;
assert longX2 != null;
return LongPoint.newRangeQuery(stringX2, longX1, longX2);
case INT_POINT_RANGE_QUERY:
String stringx3 = (String) parse(completeText, position);
Integer intx1 = (Integer) parse(completeText, position);
Integer intx2 = (Integer) parse(completeText, position);
return IntPoint.newRangeQuery(stringx3, intx1, intx2);
String stringX3 = (String) parse(completeText, position);
Integer intX1 = (Integer) parse(completeText, position);
Integer intX2 = (Integer) parse(completeText, position);
assert stringX3 != null;
assert intX1 != null;
assert intX2 != null;
return IntPoint.newRangeQuery(stringX3, intX1, intX2);
case INT:
position.addAndGet(toParse.length());
return Integer.parseInt(toParse);
@ -142,11 +161,13 @@ public class QueryParser {
case TERM:
String string3 = (String) parse(completeText, position);
String string4 = (String) parse(completeText, position);
assert string4 != null;
return new Term(string3, string4);
case TERM_POSITION:
Term term1 = (Term) parse(completeText, position);
Integer intx3 = (Integer) parse(completeText, position);
return new TermPosition(term1, intx3);
Integer intX3 = (Integer) parse(completeText, position);
assert intX3 != null;
return new TermPosition(term1, intX3);
case FLOAT:
position.addAndGet(toParse.length());
return Float.parseFloat(toParse);

View File

@ -3,7 +3,7 @@ package it.cavallium.dbengine.lucene.serializer;
public interface SerializedQueryObject {
/**
* @return length|type|---data---
* returns length|type|---data---
*/
void stringify(StringBuilder output);
}

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene.serializer;
@SuppressWarnings("unused")
public class SortedNumericDocValuesFieldSlowRangeQuery implements Query {
private final String name;

View File

@ -4,6 +4,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.lucene.index.Term;
@SuppressWarnings("unused")
public class StringifyUtils {
public static void stringifyFloat(StringBuilder output, float value) {