Update LLLuceneIndex.java, LLScoreMode.java, and 22 more files...

This commit is contained in:
Andrea Cavalli 2021-01-29 17:19:01 +01:00
parent 4b442e4066
commit dd1fb834b5
24 changed files with 372 additions and 2255 deletions

View File

@ -2,13 +2,10 @@ package it.cavallium.dbengine.database;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
public interface LLLuceneIndex extends Closeable, LLSnapshottable {
@ -31,20 +28,12 @@ public interface LLLuceneIndex extends Closeable, LLSnapshottable {
* @param limit the limit is valid for each lucene instance.
* If you have 15 instances, the number of elements returned
* can be at most <code>limit * 15</code>
* @return the collection has one or more flux
*/
Collection<LLTopKeys> search(@Nullable LLSnapshot snapshot, String query, int limit, @Nullable LLSort sort, String keyFieldName)
throws IOException;
/**
*
* @param limit the limit is valid for each lucene instance.
* If you have 15 instances, the number of elements returned
* can be at most <code>limit * 15</code>
*/
Collection<LLTopKeys> moreLikeThis(@Nullable LLSnapshot snapshot,
Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Map<String, Set<String>> mltDocumentFields,
int limit,
String keyFieldName) throws IOException;
String keyFieldName);
/**
*
@ -53,10 +42,11 @@ public interface LLLuceneIndex extends Closeable, LLSnapshottable {
* can be at most <code>limit * 15</code>
* @return the collection has one or more flux
*/
Tuple2<Mono<Long>, Collection<Flux<String>>> searchStream(@Nullable LLSnapshot snapshot,
Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
String query,
int limit,
@Nullable LLSort sort,
LLScoreMode scoreMode,
String keyFieldName);
long count(@Nullable LLSnapshot snapshot, String query) throws IOException;

View File

@ -0,0 +1,7 @@
package it.cavallium.dbengine.database;
public enum LLScoreMode {
COMPLETE,
TOP_SCORES,
COMPLETE_NO_SCORES
}

View File

@ -0,0 +1,71 @@
package it.cavallium.dbengine.database;
import java.util.Objects;
import java.util.function.BiFunction;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class LLSearchResult {
private final Mono<Long> totalHitsCount;
private final Flux<Flux<LLKeyScore>> results;
public LLSearchResult(Mono<Long> totalHitsCount, Flux<Flux<LLKeyScore>> results) {
this.totalHitsCount = totalHitsCount;
this.results = results;
}
public static LLSearchResult empty() {
return new LLSearchResult(Mono.just(0L), Flux.just(Flux.empty()));
}
@NotNull
public static BiFunction<LLSearchResult, LLSearchResult, LLSearchResult> accumulator() {
return (a, b) -> {
var mergedTotals = a.totalHitsCount.flatMap(aL -> b.totalHitsCount.map(bL -> aL + bL));
var mergedResults = Flux.merge(a.results, b.results);
return new LLSearchResult(mergedTotals, mergedResults);
};
}
public Mono<Long> totalHitsCount() {
return this.totalHitsCount;
}
public Flux<Flux<LLKeyScore>> results() {
return this.results;
}
public boolean equals(final Object o) {
if (o == this) {
return true;
}
if (!(o instanceof LLSearchResult)) {
return false;
}
final LLSearchResult other = (LLSearchResult) o;
final Object this$totalHitsCount = this.totalHitsCount();
final Object other$totalHitsCount = other.totalHitsCount();
if (!Objects.equals(this$totalHitsCount, other$totalHitsCount)) {
return false;
}
final Object this$results = this.results();
final Object other$results = other.results();
return Objects.equals(this$results, other$results);
}
public int hashCode() {
final int PRIME = 59;
int result = 1;
final Object $totalHitsCount = this.totalHitsCount();
result = result * PRIME + ($totalHitsCount == null ? 43 : $totalHitsCount.hashCode());
final Object $results = this.results();
result = result * PRIME + ($results == null ? 43 : $results.hashCode());
return result;
}
public String toString() {
return "LLSearchResult(totalHitsCount=" + this.totalHitsCount() + ", results=" + this.results() + ")";
}
}

View File

@ -4,6 +4,8 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
import it.cavallium.dbengine.database.utils.RandomSortField;
import it.cavallium.dbengine.proto.LLKeyScore;
import it.cavallium.dbengine.proto.LLType;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
@ -19,13 +21,13 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.jetbrains.annotations.Nullable;
import it.cavallium.dbengine.proto.LLKeyScore;
import it.cavallium.dbengine.proto.LLType;
@SuppressWarnings("unused")
public class LLUtils {
private static final byte[] RESPONSE_TRUE = new byte[]{1};
@ -52,6 +54,15 @@ public class LLUtils {
return null;
}
public static ScoreMode toScoreMode(LLScoreMode scoreMode) {
switch (scoreMode) {
case COMPLETE: return ScoreMode.COMPLETE;
case TOP_SCORES: return ScoreMode.TOP_SCORES;
case COMPLETE_NO_SCORES: return ScoreMode.COMPLETE_NO_SCORES;
default: throw new IllegalStateException("Unexpected value: " + scoreMode);
}
}
public static Term toTerm(LLTerm term) {
return new Term(term.getKey(), term.getValue());
}
@ -176,6 +187,7 @@ public class LLUtils {
return termItemsList.stream().map(LLUtils::toLocal).collect(Collectors.toList());
}
@SuppressWarnings("ConstantConditions")
private static LLItem toLocal(it.cavallium.dbengine.proto.LLItem item) {
var data2 = item.getData2() != null ? item.getData2().toByteArray() : null;
return new LLItem(it.cavallium.dbengine.database.LLType.valueOf(item.getType().toString()),

View File

@ -3,58 +3,57 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLTopKeys;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.LuceneUtils;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.database.luceneutil.AdaptiveStreamSearcher;
import it.cavallium.dbengine.database.luceneutil.LuceneStreamSearcher;
import it.cavallium.dbengine.database.luceneutil.PagedStreamSearcher;
import it.cavallium.luceneserializer.luceneserializer.ParseException;
import it.cavallium.luceneserializer.luceneserializer.QueryParser;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.queries.mlt.MoreLikeThis;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.executor.ScheduledTaskLifecycle;
import org.warp.commonutils.functional.IOFunction;
import org.warp.commonutils.type.ShortNamedThreadFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmissionException;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class LLLocalLuceneIndex implements LLLuceneIndex {
@ -221,36 +220,41 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriter.commit();
}
@Override
public Collection<LLTopKeys> search(@Nullable LLSnapshot snapshot, String queryString, int limit, @Nullable LLSort sort,
String keyFieldName)
throws IOException {
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) {
return Mono.fromCallable(() -> {
if (snapshot == null) {
return searcherManager.acquire();
} else {
return resolveSnapshot(snapshot).getIndexSearcher();
}
}).subscribeOn(Schedulers.boundedElastic());
}
private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
return Mono.<Void>fromRunnable(() -> {
if (snapshot == null) {
try {
var luceneIndexSnapshot = resolveSnapshot(snapshot);
Query query = QueryParser.parse(queryString);
Sort luceneSort = LLUtils.toSort(sort);
return Collections.singleton(runSearch(luceneIndexSnapshot, (indexSearcher) -> {
return blockingSearch(indexSearcher, limit, query, luceneSort, keyFieldName);
}));
} catch (ParseException e) {
throw new IOException("Error during query count!", e);
searcherManager.release(indexSearcher);
} catch (IOException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.boundedElastic());
}
@SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
@Override
public Collection<LLTopKeys> moreLikeThis(@Nullable LLSnapshot snapshot, Map<String, Set<String>> mltDocumentFields, int limit,
String keyFieldName)
throws IOException {
var luceneIndexSnapshot = resolveSnapshot(snapshot);
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Map<String, Set<String>> mltDocumentFields,
int limit,
String keyFieldName) {
if (mltDocumentFields.isEmpty()) {
return Collections.singleton(new LLTopKeys(0, new LLKeyScore[0]));
return Mono.just(LLSearchResult.empty());
}
return Collections.singleton(runSearch(luceneIndexSnapshot, (indexSearcher) -> {
return acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> Mono
.fromCallable(() -> {
var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
mlt.setAnalyzer(indexWriter.getAnalyzer());
mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
@ -259,109 +263,91 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
mlt.setBoost(true);
// Get the reference doc and apply it to MoreLikeThis, to generate the query
@SuppressWarnings({"unchecked", "rawtypes"})
Query query = mlt.like((Map) mltDocumentFields);
return mlt.like((Map) mltDocumentFields);
})
.subscribeOn(Schedulers.boundedElastic())
.flatMap(query -> Mono
.fromCallable(() -> {
One<Long> totalHitsCountSink = Sinks.one();
Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
// Search
return blockingSearch(indexSearcher, limit, query, null, keyFieldName);
}));
streamSearcher.search(indexSearcher,
query,
limit,
null,
ScoreMode.COMPLETE,
keyFieldName,
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(Schedulers.boundedElastic())
).then()
.materialize()
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
.dematerialize()
);
}
private static LLTopKeys blockingSearch(IndexSearcher indexSearcher,
int limit,
Query query,
Sort luceneSort,
String keyFieldName) throws IOException {
TopDocs results;
List<LLKeyScore> keyScores;
results = luceneSort != null ? indexSearcher.search(query, limit, luceneSort)
: indexSearcher.search(query, limit);
var hits = ObjectArrayList.wrap(results.scoreDocs);
keyScores = new LinkedList<>();
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
if (d.getFields().isEmpty()) {
System.err.println("The document docId:" + docId + ",score:" + score + " is empty.");
var realFields = indexSearcher.doc(docId).getFields();
if (!realFields.isEmpty()) {
System.err.println("Present fields:");
for (IndexableField field : realFields) {
System.err.println(" - " + field.name());
}
}
} else {
var field = d.getField(keyFieldName);
if (field == null) {
System.err.println("Can't get key of document docId:" + docId + ",score:" + score);
} else {
keyScores.add(new LLKeyScore(field.stringValue(), score));
}
}
}
return new LLTopKeys(results.totalHits.value, keyScores.toArray(new LLKeyScore[0]));
}
@SuppressWarnings("UnnecessaryLocalVariable")
@SuppressWarnings("Convert2MethodRef")
@Override
public Tuple2<Mono<Long>, Collection<Flux<String>>> searchStream(@Nullable LLSnapshot snapshot, String queryString, int limit,
@Nullable LLSort sort, String keyFieldName) {
try {
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot, String queryString, int limit,
@Nullable LLSort sort, LLScoreMode scoreMode, String keyFieldName) {
return acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> Mono
.fromCallable(() -> {
Query query = QueryParser.parse(queryString);
Sort luceneSort = LLUtils.toSort(sort);
var acquireSearcherWrappedBlocking = Mono
.fromCallable(() -> {
if (snapshot == null) {
return searcherManager.acquire();
} else {
return resolveSnapshot(snapshot).getIndexSearcher();
}
org.apache.lucene.search.ScoreMode luceneScoreMode = LLUtils.toScoreMode(scoreMode);
return Tuples.of(query, Optional.ofNullable(luceneSort), luceneScoreMode);
})
.subscribeOn(Schedulers.boundedElastic());
.subscribeOn(Schedulers.boundedElastic())
.flatMap(tuple -> Mono
.fromCallable(() -> {
Query query = tuple.getT1();
Sort luceneSort = tuple.getT2().orElse(null);
ScoreMode luceneScoreMode = tuple.getT3();
EmitterProcessor<Long> countProcessor = EmitterProcessor.create();
EmitterProcessor<String> resultsProcessor = EmitterProcessor.create();
One<Long> totalHitsCountSink = Sinks.one();
Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));
var publisher = acquireSearcherWrappedBlocking.flatMapMany(indexSearcher -> {
return Flux.<Object>push(sink -> {
try {
Long approximatedTotalResultsCount = streamSearcher.streamSearch(indexSearcher,
streamSearcher.search(indexSearcher,
query,
limit,
luceneSort,
luceneScoreMode,
keyFieldName,
sink::next
);
sink.next(approximatedTotalResultsCount);
sink.complete();
} catch (IOException e) {
sink.error(e);
}
}).subscribeOn(Schedulers.boundedElastic())
.doOnTerminate(() -> {
if (snapshot == null) {
try {
searcherManager.release(indexSearcher);
} catch (IOException e) {
e.printStackTrace();
keyScore -> {
EmitResult result = topKeysSink.tryEmitNext(keyScore);
if (result.isFailure()) {
throw new EmissionException(result);
}
},
totalHitsCount -> {
EmitResult result = totalHitsCountSink.tryEmitValue(totalHitsCount);
if (result.isFailure()) {
throw new EmissionException(result);
}
});
}).publish();
publisher.filter(item -> item instanceof Long).cast(Long.class).subscribe(countProcessor);
publisher.filter(item -> item instanceof String).cast(String.class).subscribe(resultsProcessor);
publisher.connect();
return Tuples.of(countProcessor.single(0L), Collections.singleton(resultsProcessor));
} catch (ParseException e) {
var error = new IOException("Error during query count!", e);
return Tuples.of(Mono.error(error), Collections.singleton(Flux.error(error)));
}
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(Schedulers.boundedElastic())
)
.materialize()
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
.dematerialize()
);
}
@Override
@ -394,11 +380,11 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
}
@SuppressWarnings("unused")
private void scheduledQueryRefresh() {
try {
if (!searcherManager.maybeRefresh()) {
// skipped refreshing because another thread is currently refreshing
}
boolean refreshStarted = searcherManager.maybeRefresh();
// if refreshStarted == false, another thread is currently already refreshing
} catch (IOException ex) {
ex.printStackTrace();
}

View File

@ -3,6 +3,8 @@ package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLSearchResult;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
@ -21,12 +23,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.batch.ParallelUtils;
import org.warp.commonutils.functional.IOBiConsumer;
@ -35,7 +34,7 @@ import org.warp.commonutils.functional.IOTriConsumer;
import org.warp.commonutils.locks.LockUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
@ -166,26 +165,6 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
});
}
@Override
public Collection<LLTopKeys> search(@Nullable LLSnapshot snapshot,
String query,
int limit,
@Nullable LLSort sort,
String keyFieldName) throws IOException {
return LockUtils.readLockIO(access, () -> {
Collection<Collection<LLTopKeys>> result = new ConcurrentLinkedQueue<>();
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, LLSnapshot> s) -> {
for (int i = 0; i < luceneIndices.length; i++) {
s.consume(luceneIndices[i], resolveSnapshot(snapshot, i));
}
}, maxQueueSize, luceneIndices.length, 1, (instance, instanceSnapshot) -> {
result.add(instance.search(instanceSnapshot, query, limit, sort, keyFieldName));
});
return result;
}).stream().flatMap(Collection::stream).collect(Collectors.toList());
}
private LLTopKeys mergeTopKeys(Collection<LLTopKeys> multi) {
long totalHitsCount = 0;
LLKeyScore[] hits;
@ -215,57 +194,61 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Collection<LLTopKeys> moreLikeThis(@Nullable LLSnapshot snapshot,
public Mono<LLSearchResult> moreLikeThis(@Nullable LLSnapshot snapshot,
Map<String, Set<String>> mltDocumentFields,
int limit,
String keyFieldName) throws IOException {
return LockUtils.readLockIO(access, () -> {
Collection<Collection<LLTopKeys>> result = new ConcurrentLinkedQueue<>();
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, LLSnapshot> s) -> {
for (int i = 0; i < luceneIndices.length; i++) {
s.consume(luceneIndices[i], resolveSnapshot(snapshot, i));
}
}, maxQueueSize, luceneIndices.length, 1, (instance, instanceSnapshot) -> {
result.add(instance.moreLikeThis(instanceSnapshot, mltDocumentFields, limit, keyFieldName));
});
return result;
}).stream().flatMap(Collection::stream).collect(Collectors.toList());
String keyFieldName) {
return Mono
.fromSupplier(access::readLock)
.subscribeOn(Schedulers.boundedElastic())
.flatMap(stamp -> Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1()))
.subscribeOn(Schedulers.boundedElastic())
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple.getT1().moreLikeThis(tuple.getT2(), mltDocumentFields, limit, keyFieldName))
.reduce(LLSearchResult.accumulator())
.materialize()
.flatMap(signal -> Mono
.fromRunnable(() -> access.unlockRead(stamp))
.subscribeOn(Schedulers.boundedElastic())
.thenReturn(signal)
)
.dematerialize()
);
}
@Override
public Tuple2<Mono<Long>, Collection<Flux<String>>> searchStream(@Nullable LLSnapshot snapshot,
public Mono<LLSearchResult> search(@Nullable LLSnapshot snapshot,
String query,
int limit,
@Nullable LLSort sort,
LLScoreMode scoreMode,
String keyFieldName) {
Collection<Tuple2<Mono<Long>, Collection<Flux<String>>>> multi = LockUtils.readLock(access, () -> {
Collection<Tuple2<Mono<Long>, Collection<Flux<String>>>> result = new ConcurrentLinkedQueue<>();
ParallelUtils.parallelize((BiConsumer<LLLuceneIndex, LLSnapshot> s) -> {
for (int i = 0; i < luceneIndices.length; i++) {
s.accept(luceneIndices[i], resolveSnapshot(snapshot, i));
}
}, maxQueueSize, luceneIndices.length, 1, (instance, instanceSnapshot) -> {
result.add(instance.searchStream(instanceSnapshot, query, limit, sort, keyFieldName));
});
return result;
});
Mono<Long> result1;
Collection<Flux<String>> result2;
result1 = Mono.zip(multi.stream().map(Tuple2::getT1).collect(Collectors.toList()), (items) -> {
long total = 0;
for (Object item : items) {
total += (Long) item;
}
return total;
});
result2 = multi.stream().map(Tuple2::getT2).flatMap(Collection::stream).collect(Collectors.toList());
return Tuples.of(result1, result2);
return Mono
.fromSupplier(access::readLock)
.subscribeOn(Schedulers.boundedElastic())
.flatMap(stamp -> Flux
.fromArray(luceneIndices)
.index()
.flatMap(tuple -> Mono
.fromCallable(() -> resolveSnapshot(snapshot, (int) (long) tuple.getT1()))
.subscribeOn(Schedulers.boundedElastic())
.map(luceneSnapshot -> Tuples.of(tuple.getT2(), luceneSnapshot))
)
.flatMap(tuple -> tuple.getT1().search(tuple.getT2(), query, limit, sort, scoreMode, keyFieldName))
.reduce(LLSearchResult.accumulator())
.materialize()
.flatMap(signal -> Mono
.fromRunnable(() -> access.unlockRead(stamp))
.subscribeOn(Schedulers.boundedElastic())
.thenReturn(signal)
)
.dematerialize()
);
}
@Override

View File

@ -1,9 +1,12 @@
package it.cavallium.dbengine.database.luceneutil;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
@ -25,21 +28,23 @@ public class AdaptiveStreamSearcher implements LuceneStreamSearcher {
}
@Override
public Long streamSearch(IndexSearcher indexSearcher,
public void search(IndexSearcher indexSearcher,
Query query,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
String keyFieldName,
Consumer<String> consumer) throws IOException {
Consumer<LLKeyScore> consumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit == 0) {
return countStreamSearcher.count(indexSearcher, query);
totalHitsConsumer.accept(countStreamSearcher.count(indexSearcher, query));
} else if (luceneSort == null) {
return parallelCollectorStreamSearcher.streamSearch(indexSearcher, query, limit, null, keyFieldName, consumer);
parallelCollectorStreamSearcher.search(indexSearcher, query, limit, null, scoreMode, keyFieldName, consumer, totalHitsConsumer);
} else {
if (limit > PagedStreamSearcher.MAX_ITEMS_PER_PAGE) {
return pagedStreamSearcher.streamSearch(indexSearcher, query, limit, luceneSort, keyFieldName, consumer);
pagedStreamSearcher.search(indexSearcher, query, limit, luceneSort, scoreMode, keyFieldName, consumer, totalHitsConsumer);
} else {
return simpleStreamSearcher.streamSearch(indexSearcher, query, limit, luceneSort, keyFieldName, consumer);
simpleStreamSearcher.search(indexSearcher, query, limit, luceneSort, scoreMode, keyFieldName, consumer, totalHitsConsumer);
}
}
}

View File

@ -1,9 +1,12 @@
package it.cavallium.dbengine.database.luceneutil;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
@ -13,25 +16,27 @@ import org.jetbrains.annotations.Nullable;
public class CountStreamSearcher implements LuceneStreamSearcher {
@Override
public Long streamSearch(IndexSearcher indexSearcher,
public void search(IndexSearcher indexSearcher,
Query query,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
String keyFieldName,
Consumer<String> consumer) throws IOException {
Consumer<LLKeyScore> resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit != 0) {
throw new IllegalArgumentException("CountStream doesn't support a limit different than 0");
}
if (luceneSort != null) {
throw new IllegalArgumentException("CountStream doesn't support sorting");
}
if (consumer != null) {
if (resultsConsumer != null) {
throw new IllegalArgumentException("CountStream doesn't support a results consumer");
}
if (keyFieldName != null) {
throw new IllegalArgumentException("CountStream doesn't support a key field");
}
return count(indexSearcher, query);
totalHitsConsumer.accept(count(indexSearcher, query));
}
public long count(IndexSearcher indexSearcher, Query query) throws IOException {

View File

@ -1,9 +1,12 @@
package it.cavallium.dbengine.database.luceneutil;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
@ -15,15 +18,18 @@ public interface LuceneStreamSearcher {
* @param query the query
* @param limit the maximum number of results
* @param luceneSort the sorting method used for the search
* @param scoreMode score mode
* @param keyFieldName the name of the key field
* @param consumer the consumer of results
* @return the approximated total count of results
* @param resultsConsumer the consumer of results
* @param totalHitsConsumer the consumer of total count of results
* @throws IOException thrown if there is an error
*/
Long streamSearch(IndexSearcher indexSearcher,
void search(IndexSearcher indexSearcher,
Query query,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
String keyFieldName,
Consumer<String> consumer) throws IOException;
Consumer<LLKeyScore> resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException;
}

View File

@ -1,13 +1,16 @@
package it.cavallium.dbengine.database.luceneutil;
import it.cavallium.dbengine.database.LLKeyScore;
import java.io.IOException;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
@ -26,45 +29,49 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
}
@Override
public Long streamSearch(IndexSearcher indexSearcher,
public void search(IndexSearcher indexSearcher,
Query query,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
String keyFieldName,
Consumer<String> consumer) throws IOException {
Consumer<LLKeyScore> resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (limit < MAX_ITEMS_PER_PAGE) {
// Use a normal search method because the limit is low
return baseStreamSearcher.streamSearch(indexSearcher, query, limit, luceneSort, keyFieldName, consumer);
baseStreamSearcher.search(indexSearcher, query, limit, luceneSort, scoreMode, keyFieldName, resultsConsumer, totalHitsConsumer);
return;
}
IntWrapper currentAllowedResults = new IntWrapper(limit);
// Run the first page search
TopDocs lastTopDocs = indexSearcher.search(query, MAX_ITEMS_PER_PAGE, luceneSort);
TopDocs lastTopDocs = indexSearcher.search(query, MAX_ITEMS_PER_PAGE, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
totalHitsConsumer.accept(lastTopDocs.totalHits.value);
if (lastTopDocs.scoreDocs.length > 0) {
ScoreDoc lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, keyFieldName, consumer);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, scoreMode, keyFieldName, resultsConsumer);
// Run the searches for each page until the end
boolean finished = currentAllowedResults.var <= 0;
while (!finished) {
lastTopDocs = indexSearcher.searchAfter(lastScoreDoc, query, MAX_ITEMS_PER_PAGE, luceneSort);
lastTopDocs = indexSearcher.searchAfter(lastScoreDoc, query, MAX_ITEMS_PER_PAGE, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
if (lastTopDocs.scoreDocs.length > 0) {
lastScoreDoc = getLastItem(lastTopDocs.scoreDocs);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, keyFieldName, consumer);
consumeHits(currentAllowedResults, lastTopDocs.scoreDocs, indexSearcher, scoreMode, keyFieldName, resultsConsumer);
}
if (lastTopDocs.scoreDocs.length < MAX_ITEMS_PER_PAGE || currentAllowedResults.var <= 0) {
finished = true;
}
}
}
return lastTopDocs.totalHits.value;
}
private void consumeHits(IntWrapper currentAllowedResults,
ScoreDoc[] hits,
IndexSearcher indexSearcher,
ScoreMode scoreMode,
String keyFieldName,
Consumer<String> consumer) throws IOException {
Consumer<LLKeyScore> resultsConsumer) throws IOException {
for (ScoreDoc hit : hits) {
int docId = hit.doc;
float score = hit.score;
@ -85,7 +92,7 @@ public class PagedStreamSearcher implements LuceneStreamSearcher {
if (field == null) {
System.err.println("Can't get key of document docId:" + docId + ",score:" + score);
} else {
consumer.accept(field.stringValue());
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
}
}
} else {

View File

@ -1,15 +1,18 @@
package it.cavallium.dbengine.database.luceneutil;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.utils.LuceneParallelStreamCollectorManager;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.jetbrains.annotations.Nullable;
@ -19,19 +22,21 @@ import org.jetbrains.annotations.Nullable;
public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
@Override
public Long streamSearch(IndexSearcher indexSearcher,
public void search(IndexSearcher indexSearcher,
Query query,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
String keyFieldName,
Consumer<String> consumer) throws IOException {
Consumer<LLKeyScore> resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
if (luceneSort != null) {
throw new IllegalArgumentException("ParallelCollectorStreamSearcher doesn't support sorted searches");
}
AtomicInteger currentCount = new AtomicInteger();
var result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(docId -> {
var result = indexSearcher.search(query, LuceneParallelStreamCollectorManager.fromConsumer(scoreMode, (docId, score) -> {
if (currentCount.getAndIncrement() >= limit) {
return false;
} else {
@ -51,7 +56,7 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
if (field == null) {
System.err.println("Can't get key of document docId:" + docId);
} else {
consumer.accept(field.stringValue());
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
}
}
} catch (IOException e) {
@ -62,6 +67,6 @@ public class ParallelCollectorStreamSearcher implements LuceneStreamSearcher {
}
}));
//todo: check the accuracy of our hits counter!
return result.getTotalHitsCount();
totalHitsConsumer.accept(result.getTotalHitsCount());
}
}

View File

@ -1,14 +1,17 @@
package it.cavallium.dbengine.database.luceneutil;
import it.cavallium.dbengine.database.LLKeyScore;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.jetbrains.annotations.Nullable;
@ -19,13 +22,16 @@ import org.jetbrains.annotations.Nullable;
public class SimpleStreamSearcher implements LuceneStreamSearcher {
@Override
public Long streamSearch(IndexSearcher indexSearcher,
public void search(IndexSearcher indexSearcher,
Query query,
int limit,
@Nullable Sort luceneSort,
ScoreMode scoreMode,
String keyFieldName,
Consumer<String> consumer) throws IOException {
TopDocs topDocs = indexSearcher.search(query, limit, luceneSort);
Consumer<LLKeyScore> resultsConsumer,
LongConsumer totalHitsConsumer) throws IOException {
TopDocs topDocs = indexSearcher.search(query, limit, luceneSort, scoreMode != ScoreMode.COMPLETE_NO_SCORES);
totalHitsConsumer.accept(topDocs.totalHits.value);
var hits = ObjectArrayList.wrap(topDocs.scoreDocs);
for (ScoreDoc hit : hits) {
int docId = hit.doc;
@ -45,10 +51,9 @@ public class SimpleStreamSearcher implements LuceneStreamSearcher {
if (field == null) {
System.err.println("Can't get key of document docId:" + docId + ",score:" + score);
} else {
consumer.accept(field.stringValue());
resultsConsumer.accept(new LLKeyScore(field.stringValue(), score));
}
}
}
return topDocs.totalHits.value;
}
}

View File

@ -1,76 +0,0 @@
package it.cavallium.dbengine.database.remote.client;
import io.grpc.ManagedChannel;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import it.cavallium.dbengine.proto.CavalliumDBEngineServiceGrpc;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import javax.net.ssl.SSLException;
public class DbClientFunctions extends CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceImplBase {
private static final Logger logger = Logger.getLogger(DbClientFunctions.class.getName());
private static final boolean SSL = false;
private final ManagedChannel channel;
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub blockingStub;
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceStub stub;
public CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub getBlockingStub() {
return blockingStub;
}
public CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceStub getStub() {
return stub;
}
public static SslContext buildSslContext(Path trustCertCollectionFilePath,
Path clientCertChainFilePath,
Path clientPrivateKeyFilePath) throws SSLException {
SslContextBuilder builder = GrpcSslContexts.forClient();
if (trustCertCollectionFilePath != null) {
builder.trustManager(trustCertCollectionFilePath.toFile());
}
if (clientCertChainFilePath != null && clientPrivateKeyFilePath != null) {
builder.keyManager(clientCertChainFilePath.toFile(), clientPrivateKeyFilePath.toFile());
}
return builder.build();
}
/**
* Construct client connecting to HelloWorld server at {@code host:port}.
*/
public DbClientFunctions(String host,
int port,
SslContext sslContext) throws SSLException {
this(generateThis(host, port, sslContext));
}
private static ManagedChannel generateThis(String host, int port, SslContext sslContext) {
var builder = NettyChannelBuilder.forAddress(host, port);
if (SSL) {
builder.sslContext(sslContext);
} else {
builder.usePlaintext();
}
return builder.build();
}
/**
* Construct client for accessing RouteGuide server using the existing channel.
*/
DbClientFunctions(ManagedChannel channel) {
this.channel = channel;
blockingStub = CavalliumDBEngineServiceGrpc.newBlockingStub(channel);
stub = CavalliumDBEngineServiceGrpc.newStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
}

View File

@ -1,131 +0,0 @@
package it.cavallium.dbengine.database.remote.client;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import it.cavallium.dbengine.proto.CavalliumDBEngineServiceGrpc;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.net.ssl.SSLException;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.proto.DatabaseOpenRequest;
import it.cavallium.dbengine.proto.Empty;
import it.cavallium.dbengine.proto.LuceneIndexOpenRequest;
import it.cavallium.dbengine.proto.ResetConnectionRequest;
public class LLRemoteDatabaseConnection implements LLDatabaseConnection {
private final String address;
private final int port;
private final Path trustCertCollectionFilePath;
private final Path clientCertChainFilePath;
private final Path clientPrivateKeyFilePath;
private DbClientFunctions client;
private CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub blockingStub;
public LLRemoteDatabaseConnection(String address, int port, Path trustCertCollectionFilePath,
Path clientCertChainFilePath,
Path clientPrivateKeyFilePath) {
this.address = address;
this.port = port;
this.trustCertCollectionFilePath = trustCertCollectionFilePath;
this.clientCertChainFilePath = clientCertChainFilePath;
this.clientPrivateKeyFilePath = clientPrivateKeyFilePath;
}
@Override
public void connect() throws IOException {
try {
this.client = new DbClientFunctions(address, port,
DbClientFunctions.buildSslContext(trustCertCollectionFilePath, clientCertChainFilePath,
clientPrivateKeyFilePath));
this.blockingStub = client.getBlockingStub();
//noinspection ResultOfMethodCallIgnored
blockingStub.resetConnection(ResetConnectionRequest.newBuilder().build());
} catch (SSLException | StatusRuntimeException e) {
throw new IOException(e);
}
}
@Override
public LLKeyValueDatabase getDatabase(String name, List<Column> columns, boolean lowMemory) throws IOException {
try {
var response = blockingStub.databaseOpen(DatabaseOpenRequest.newBuilder()
.setName(ByteString.copyFrom(name, StandardCharsets.US_ASCII))
.addAllColumnName(columns.stream().map(
(column) -> ByteString.copyFrom(column.getName().getBytes(StandardCharsets.US_ASCII)))
.collect(Collectors.toList()))
.setLowMemory(lowMemory)
.build());
int handle = response.getHandle();
return new LLRemoteKeyValueDatabase(name, client, handle);
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public LLLuceneIndex getLuceneIndex(String name,
int instancesCount,
TextFieldsAnalyzer textFieldsAnalyzer,
Duration queryRefreshDebounceTime,
java.time.Duration commitDebounceTime,
boolean lowMemory) throws IOException {
try {
var response = blockingStub.luceneIndexOpen(LuceneIndexOpenRequest.newBuilder()
.setName(name)
.setTextFieldsAnalyzer(textFieldsAnalyzer.ordinal())
.setQueryRefreshDebounceTime((int) queryRefreshDebounceTime.toMillis())
.setCommitDebounceTime((int) commitDebounceTime.toMillis())
.setLowMemory(lowMemory)
.setInstancesCount(instancesCount)
.build());
int handle = response.getHandle();
return new LLRemoteLuceneIndex(client, name, handle, lowMemory, instancesCount);
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void disconnect() throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.resetConnection(ResetConnectionRequest.newBuilder().build());
client.shutdown();
} catch (InterruptedException | StatusRuntimeException e) {
throw new IOException(e);
}
}
@Override
public void ping() throws IOException {
try {
blockingStub.ping(Empty.newBuilder().build());
} catch (StatusRuntimeException e) {
throw new IOException(e);
}
}
@Override
public double getMediumLatencyMillis() throws IOException {
int cap = 3;
long[] results = new long[cap];
for (int i = 0; i < cap; i++) {
long time1 = System.nanoTime();
ping();
long time2 = System.nanoTime();
results[i] = time2 - time1;
}
return LongStream.of(results).average().orElseThrow() / 1000000;
}
}

View File

@ -1,278 +0,0 @@
package it.cavallium.dbengine.database.remote.client;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.proto.CavalliumDBEngineServiceGrpc;
import it.cavallium.dbengine.proto.DictionaryMethodClearRequest;
import it.cavallium.dbengine.proto.DictionaryMethodContainsRequest;
import it.cavallium.dbengine.proto.DictionaryMethodForEachRequest;
import it.cavallium.dbengine.proto.DictionaryMethodGetRequest;
import it.cavallium.dbengine.proto.DictionaryMethodIsEmptyRequest;
import it.cavallium.dbengine.proto.DictionaryMethodPutMultiRequest;
import it.cavallium.dbengine.proto.DictionaryMethodPutRequest;
import it.cavallium.dbengine.proto.DictionaryMethodRemoveOneRequest;
import it.cavallium.dbengine.proto.DictionaryMethodRemoveRequest;
import it.cavallium.dbengine.proto.DictionaryMethodReplaceAllRequest;
import it.cavallium.dbengine.proto.DictionaryMethodSizeRequest;
import java.io.IOError;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.functional.CancellableBiConsumer;
import org.warp.commonutils.functional.CancellableBiFunction;
import org.warp.commonutils.functional.ConsumerResult;
@NotAtomic
public class LLRemoteDictionary implements LLDictionary {
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub blockingStub;
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceStub stub;
private final int handle;
private final String name;
public LLRemoteDictionary(DbClientFunctions clientFunctions, int handle, String name) {
this.blockingStub = clientFunctions.getBlockingStub();
this.stub = clientFunctions.getStub();
this.handle = handle;
this.name = name;
}
@Override
public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
try {
var request = DictionaryMethodGetRequest.newBuilder()
.setDictionaryHandle(handle)
.setKey(ByteString.copyFrom(key));
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var response = blockingStub.dictionaryMethodGet(request.build());
var value = response.getValue();
if (value != null) {
return Optional.of(value.toByteArray());
} else {
return Optional.empty();
}
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
try {
var request = DictionaryMethodContainsRequest.newBuilder()
.setDictionaryHandle(handle)
.setKey(ByteString.copyFrom(key));
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var response = blockingStub.dictionaryMethodContains(request.build());
return response.getValue();
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public Optional<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType)
throws IOException {
try {
return put_(key, value, resultType);
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
private Optional<byte[]> put_(byte[] key, byte[] value, LLDictionaryResultType resultType) {
var response = blockingStub.dictionaryMethodPut(DictionaryMethodPutRequest.newBuilder()
.setDictionaryHandle(handle)
.setKey(ByteString.copyFrom(key))
.setValue(ByteString.copyFrom(value))
.setResultType(resultType.toProto())
.build());
var bytes = response.getValue();
if (bytes != null) {
return Optional.of(bytes.toByteArray());
} else {
return Optional.empty();
}
}
@Override
public void putMulti(byte[][] key, byte[][] value, LLDictionaryResultType resultType,
Consumer<byte[]> responses) throws IOException {
try {
var response = blockingStub
.dictionaryMethodPutMulti(DictionaryMethodPutMultiRequest.newBuilder()
.setDictionaryHandle(handle)
.addAllKey(
List.of(key).stream().map(ByteString::copyFrom).collect(Collectors.toList()))
.addAllValue(
List.of(value).stream().map(ByteString::copyFrom).collect(Collectors.toList()))
.setResultType(resultType.toProto())
.build());
if (response.getValueList() != null) {
for (ByteString byteString : response.getValueList()) {
responses.accept(byteString.toByteArray());
}
}
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public Optional<byte[]> remove(byte[] key, LLDictionaryResultType resultType) throws IOException {
try {
return remove_(key, resultType);
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
private Optional<byte[]> remove_(byte[] key, LLDictionaryResultType resultType) {
var response = blockingStub.dictionaryMethodRemove(DictionaryMethodRemoveRequest.newBuilder()
.setDictionaryHandle(handle)
.setKey(ByteString.copyFrom(key))
.setResultType(resultType.toProto())
.build());
var bytes = response.getValue();
if (bytes != null) {
return Optional.of(bytes.toByteArray());
} else {
return Optional.empty();
}
}
@Override
public ConsumerResult forEach(@Nullable LLSnapshot snapshot, int parallelism, CancellableBiConsumer<byte[], byte[]> consumer) {
try {
var request = DictionaryMethodForEachRequest.newBuilder().setDictionaryHandle(handle);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var response = blockingStub.dictionaryMethodForEach(request.build());
while (response.hasNext()) {
var entry = response.next();
var key = entry.getKey().toByteArray();
var value = entry.getValue().toByteArray();
var cancelled = consumer.acceptCancellable(key, value);
if (cancelled.isCancelled()) {
return ConsumerResult.cancelNext();
}
}
return ConsumerResult.result();
} catch (StatusRuntimeException ex) {
throw new IOError(ex);
}
}
@Override
public ConsumerResult replaceAll(int parallelism, boolean replaceKeys, CancellableBiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
try {
//todo: reimplement remote replaceAll using writeBatch
//todo: implement cancellation during iteration
var response = blockingStub
.dictionaryMethodReplaceAll(DictionaryMethodReplaceAllRequest.newBuilder()
.setDictionaryHandle(handle)
.setReplaceKeys(replaceKeys)
.build());
response.forEachRemaining((entry) -> {
var key = entry.getKey().toByteArray();
var value = entry.getValue().toByteArray();
var singleResponse = consumer.applyCancellable(key, value);
boolean keyDiffers = false;
if (!Arrays.equals(key, singleResponse.getValue().getKey())) {
remove_(key, LLDictionaryResultType.VOID);
keyDiffers = true;
}
// put if changed
if (keyDiffers || !Arrays.equals(value, singleResponse.getValue().getValue())) {
put_(singleResponse.getValue().getKey(), singleResponse.getValue().getValue(), LLDictionaryResultType.VOID);
}
});
return ConsumerResult.result();
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void clear() throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.dictionaryMethodClear(DictionaryMethodClearRequest.newBuilder()
.setDictionaryHandle(handle)
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
try {
var request = DictionaryMethodSizeRequest.newBuilder().setDictionaryHandle(handle);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var response = fast ? blockingStub.dictionaryMethodFastSize(request.build())
: blockingStub.dictionaryMethodExactSize(request.build());
return response.getSize();
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public boolean isEmpty(@Nullable LLSnapshot snapshot) throws IOException {
try {
var request = DictionaryMethodIsEmptyRequest
.newBuilder()
.setDictionaryHandle(handle);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var response = blockingStub.dictionaryMethodIsEmpty(request.build());
return response.getEmpty();
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public Optional<Entry<byte[], byte[]>> removeOne() throws IOException {
try {
var response = blockingStub.dictionaryMethodRemoveOne(DictionaryMethodRemoveOneRequest
.newBuilder()
.setDictionaryHandle(handle)
.build());
var keyBytes = response.getKey();
var valueBytes = response.getValue();
if (keyBytes != null && valueBytes != null) {
return Optional.of(Map.entry(keyBytes.toByteArray(), valueBytes.toByteArray()));
} else {
return Optional.empty();
}
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public String getDatabaseName() {
return name;
}
}

View File

@ -1,124 +0,0 @@
package it.cavallium.dbengine.database.remote.client;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLDeepDictionary;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.proto.DatabaseCloseRequest;
import it.cavallium.dbengine.proto.DatabaseSnapshotReleaseRequest;
import it.cavallium.dbengine.proto.DatabaseSnapshotTakeRequest;
import it.cavallium.dbengine.proto.DictionaryOpenRequest;
import it.cavallium.dbengine.proto.SingletonOpenRequest;
import it.cavallium.dbengine.proto.CavalliumDBEngineServiceGrpc;
public class LLRemoteKeyValueDatabase implements LLKeyValueDatabase {
private final String name;
private final DbClientFunctions clientFunctions;
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub blockingStub;
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceStub stub;
private final int handle;
public LLRemoteKeyValueDatabase(String name, DbClientFunctions clientFunctions, int handle) {
this.name = name;
this.clientFunctions = clientFunctions;
this.blockingStub = clientFunctions.getBlockingStub();
this.stub = clientFunctions.getStub();
this.handle = handle;
}
@Override
public String getDatabaseName() {
return name;
}
@Override
public LLSingleton getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue)
throws IOException {
try {
var response = blockingStub.singletonOpen(SingletonOpenRequest.newBuilder()
.setDatabaseHandle(this.handle)
.setSingletonListColumnName(ByteString.copyFrom(singletonListColumnName))
.setName(ByteString.copyFrom(name))
.setDefaultValue(ByteString.copyFrom(defaultValue))
.build());
int handle = response.getHandle();
return new LLRemoteSingleton(LLRemoteKeyValueDatabase.this.name, blockingStub, handle);
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public LLDictionary getDictionary(byte[] columnName) throws IOException {
try {
var response = blockingStub.dictionaryOpen(DictionaryOpenRequest.newBuilder()
.setDatabaseHandle(this.handle)
.setColumnName(ByteString.copyFrom(columnName))
.build());
int handle = response.getHandle();
return new LLRemoteDictionary(clientFunctions, handle, name);
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public LLDeepDictionary getDeepDictionary(byte[] columnName, int keySize, int key2Size) throws IOException {
try {
var response = blockingStub.dictionaryOpen(DictionaryOpenRequest.newBuilder()
.setDatabaseHandle(this.handle)
.setColumnName(ByteString.copyFrom(columnName))
.build());
int handle = response.getHandle();
throw new UnsupportedOperationException("Deep dictionaries are not implemented in remote databases!"); //todo: implement
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public LLSnapshot takeSnapshot() throws IOException {
try {
var response = blockingStub.databaseSnapshotTake(DatabaseSnapshotTakeRequest.newBuilder()
.setDatabaseHandle(this.handle)
.build());
long sequenceNumber = response.getSequenceNumber();
return new LLSnapshot(sequenceNumber);
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void releaseSnapshot(LLSnapshot snapshot) throws IOException {
try {
var response = blockingStub.databaseSnapshotRelease(DatabaseSnapshotReleaseRequest.newBuilder()
.setDatabaseHandle(this.handle)
.setSequenceNumber(snapshot.getSequenceNumber())
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public long getProperty(String propertyName) throws IOException {
throw new UnsupportedOperationException("Not implemented"); //todo: implement
}
@Override
public void close() throws IOException {
try {
var response = blockingStub.databaseClose(DatabaseCloseRequest.newBuilder()
.setDatabaseHandle(this.handle)
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
}

View File

@ -1,339 +0,0 @@
package it.cavallium.dbengine.database.remote.client;
import io.grpc.StatusRuntimeException;
import it.cavallium.dbengine.database.LLDocument;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSort;
import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLTopKeys;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.proto.CavalliumDBEngineServiceGrpc;
import it.cavallium.dbengine.proto.LuceneIndexCloseRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodAddDocumentMultiRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodAddDocumentRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodCountRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodDeleteAllRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodDeleteDocumentRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodMoreLikeThisRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchResponse;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchStreamRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodUpdateDocumentMultiRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodUpdateDocumentRequest;
import it.cavallium.dbengine.proto.LuceneIndexSnapshotReleaseRequest;
import it.cavallium.dbengine.proto.LuceneIndexSnapshotTakeRequest;
import it.cavallium.dbengine.proto.MltField;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.batch.ParallelUtils;
import org.warp.commonutils.functional.IOConsumer;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class LLRemoteLuceneIndex implements LLLuceneIndex {
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub blockingStub;
private final String luceneIndexName;
private final int handle;
private final boolean lowMemory;
private final int instancesCount;
public LLRemoteLuceneIndex(DbClientFunctions clientFunctions,
String name,
int handle,
boolean lowMemory,
int instancesCount) {
this.blockingStub = clientFunctions.getBlockingStub();
this.luceneIndexName = name;
this.handle = handle;
this.lowMemory = lowMemory;
this.instancesCount = instancesCount;
}
@Override
public String getLuceneIndexName() {
return luceneIndexName;
}
@Override
public LLSnapshot takeSnapshot() throws IOException {
try {
var searchResult = blockingStub
.luceneIndexSnapshotTake(LuceneIndexSnapshotTakeRequest.newBuilder()
.setHandle(handle).build());
return new LLSnapshot(searchResult.getSequenceNumber());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void releaseSnapshot(LLSnapshot snapshot) throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.luceneIndexSnapshotRelease(LuceneIndexSnapshotReleaseRequest.newBuilder()
.setHandle(handle)
.setSequenceNumber(snapshot.getSequenceNumber())
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void addDocument(LLTerm key, LLDocument doc) throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.luceneIndexMethodAddDocument(LuceneIndexMethodAddDocumentRequest.newBuilder()
.setHandle(handle)
.setKey(LLUtils.toGrpc(key))
.addAllDocumentItems(LLUtils.toGrpc(doc.getItems()))
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void addDocuments(Iterable<LLTerm> keys, Iterable<LLDocument> docs) throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub
.luceneIndexMethodAddDocumentMulti(LuceneIndexMethodAddDocumentMultiRequest.newBuilder()
.setHandle(handle)
.addAllKey(LLUtils.toGrpcKey(keys))
.addAllDocuments(LLUtils.toGrpc(docs))
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void deleteDocument(LLTerm id) throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub
.luceneIndexMethodDeleteDocument(LuceneIndexMethodDeleteDocumentRequest.newBuilder()
.setHandle(handle)
.setKey(LLUtils.toGrpc(id))
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void updateDocument(LLTerm id, LLDocument document) throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub
.luceneIndexMethodUpdateDocument(LuceneIndexMethodUpdateDocumentRequest.newBuilder()
.setHandle(handle)
.setKey(LLUtils.toGrpc(id))
.addAllDocumentItems(LLUtils.toGrpc(document.getItems()))
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void updateDocuments(Iterable<LLTerm> ids, Iterable<LLDocument> documents)
throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.luceneIndexMethodUpdateDocumentMulti(
LuceneIndexMethodUpdateDocumentMultiRequest.newBuilder()
.setHandle(handle)
.addAllKey(LLUtils.toGrpcKey(ids))
.addAllDocuments(LLUtils.toGrpc(documents))
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void deleteAll() throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.luceneIndexMethodDeleteAll(LuceneIndexMethodDeleteAllRequest.newBuilder()
.setHandle(handle)
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public Collection<LLTopKeys> search(@Nullable LLSnapshot snapshot,
String query,
int limit,
@Nullable LLSort sort,
String keyFieldName) throws IOException {
try {
ConcurrentLinkedQueue<LLTopKeys> multiResult = new ConcurrentLinkedQueue<>();
ParallelUtils.parallelizeIO((IOConsumer<Integer> c) -> {
for (int shardIndex = 0; shardIndex < instancesCount; shardIndex++) {
c.consume(shardIndex);
}
}, 0, instancesCount, 1, shardIndex -> {
var request = LuceneIndexMethodSearchRequest.newBuilder()
.setHandle(handle)
.setQuery(query)
.setLimit(limit)
.setKeyFieldName(keyFieldName);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
if (sort != null) {
request.setSort(LLUtils.toGrpc(sort));
}
var searchMultiResults = blockingStub.luceneIndexMethodSearch(request.build());
for (LuceneIndexMethodSearchResponse response : searchMultiResults.getResponseList()) {
var result = new LLTopKeys(response.getTotalHitsCount(),
response.getHitsList().stream().map(LLUtils::toKeyScore).toArray(LLKeyScore[]::new)
);
multiResult.add(result);
}
});
return multiResult;
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public Collection<LLTopKeys> moreLikeThis(@Nullable LLSnapshot snapshot, Map<String, Set<String>> mltDocumentFields,
int limit,
String keyFieldName) throws IOException {
try {
ConcurrentLinkedQueue<LLTopKeys> multiResult = new ConcurrentLinkedQueue<>();
ParallelUtils.parallelizeIO((IOConsumer<Integer> c) -> {
for (int shardIndex = 0; shardIndex < instancesCount; shardIndex++) {
c.consume(shardIndex);
}
}, 0, instancesCount, 1, shardIndex -> {
var request = LuceneIndexMethodMoreLikeThisRequest.newBuilder()
.setHandle(handle)
.addAllMltFields(mltDocumentFields
.entrySet()
.stream()
.map(entry -> MltField.newBuilder().setKey(entry.getKey()).addAllValues(entry.getValue()).build())
.collect(Collectors.toList()))
.setLimit(limit)
.setKeyFieldName(keyFieldName);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var searchMultiResult = blockingStub.luceneIndexMethodMoreLikeThis(request.build());
for (LuceneIndexMethodSearchResponse response : searchMultiResult.getResponseList()) {
var result = new LLTopKeys(response.getTotalHitsCount(),
response.getHitsList().stream().map(LLUtils::toKeyScore).toArray(LLKeyScore[]::new)
);
multiResult.add(result);
}
});
return multiResult;
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public Tuple2<Mono<Long>, Collection<Flux<String>>> searchStream(@Nullable LLSnapshot snapshot, String query, int limit, @Nullable LLSort sort, String keyFieldName) {
try {
var request = LuceneIndexMethodSearchStreamRequest.newBuilder()
.setHandle(handle)
.setQuery(query)
.setLimit(limit)
.setKeyFieldName(keyFieldName);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
if (sort != null) {
request.setSort(LLUtils.toGrpc(sort));
}
var searchResult = blockingStub.luceneIndexMethodSearchStream(request.build());
EmitterProcessor<Long> approximatedTotalHitsCount = EmitterProcessor.create();
ArrayList<EmitterProcessor<String>> results = new ArrayList<>();
for (int shardIndex = 0; shardIndex < instancesCount; shardIndex++) {
results.add(EmitterProcessor.create());
}
searchResult.forEachRemaining((result) -> {
if (result.getIsKey()) {
results.get(result.getShardIndex()).onNext(result.getKey());
} else {
approximatedTotalHitsCount.onNext(result.getApproximatedTotalCount());
}
});
return Tuples.of(approximatedTotalHitsCount.single(0L),
results.stream().map(EmitterProcessor::asFlux).collect(Collectors.toList())
);
} catch (RuntimeException ex) {
var error = new IOException(ex);
return Tuples.of(Mono.error(error), Collections.singleton(Flux.error(error)));
}
}
@Override
public long count(@Nullable LLSnapshot snapshot, String query) throws IOException {
try {
var request = LuceneIndexMethodCountRequest.newBuilder()
.setHandle(handle)
.setQuery(query);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var searchResult = blockingStub
.luceneIndexMethodCount(request.build());
return searchResult.getCount();
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void close() throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.luceneIndexClose(LuceneIndexCloseRequest.newBuilder()
.setHandle(handle)
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public boolean isLowMemoryMode() {
return lowMemory;
}
}

View File

@ -1,59 +0,0 @@
package it.cavallium.dbengine.database.remote.client;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.proto.CavalliumDBEngineServiceGrpc;
import it.cavallium.dbengine.proto.SingletonMethodGetRequest;
import it.cavallium.dbengine.proto.SingletonMethodSetRequest;
import java.io.IOException;
import org.jetbrains.annotations.Nullable;
public class LLRemoteSingleton implements LLSingleton {
private final CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub blockingStub;
private final int handle;
private final String databaseName;
public LLRemoteSingleton(
String databaseName,
CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceBlockingStub blockingStub, int handle) {
this.databaseName = databaseName;
this.blockingStub = blockingStub;
this.handle = handle;
}
@Override
public byte[] get(@Nullable LLSnapshot snapshot) throws IOException {
try {
var request = SingletonMethodGetRequest.newBuilder()
.setSingletonHandle(handle);
if (snapshot != null) {
request.setSequenceNumber(snapshot.getSequenceNumber());
}
var response = blockingStub.singletonMethodGet(request.build());
return response.getValue().toByteArray();
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public void set(byte[] value) throws IOException {
try {
//noinspection ResultOfMethodCallIgnored
blockingStub.singletonMethodSet(SingletonMethodSetRequest.newBuilder()
.setSingletonHandle(handle)
.setValue(ByteString.copyFrom(value))
.build());
} catch (StatusRuntimeException ex) {
throw new IOException(ex);
}
}
@Override
public String getDatabaseName() {
return databaseName;
}
}

View File

@ -1,842 +0,0 @@
package it.cavallium.dbengine.database.remote.server;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTopKeys;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.proto.CavalliumDBEngineServiceGrpc;
import it.cavallium.dbengine.proto.DatabaseCloseRequest;
import it.cavallium.dbengine.proto.DatabaseOpenRequest;
import it.cavallium.dbengine.proto.DatabaseSnapshotReleaseRequest;
import it.cavallium.dbengine.proto.DatabaseSnapshotTakeRequest;
import it.cavallium.dbengine.proto.DatabaseSnapshotTakeResult;
import it.cavallium.dbengine.proto.DictionaryMethodClearRequest;
import it.cavallium.dbengine.proto.DictionaryMethodContainsRequest;
import it.cavallium.dbengine.proto.DictionaryMethodContainsResponse;
import it.cavallium.dbengine.proto.DictionaryMethodForEachRequest;
import it.cavallium.dbengine.proto.DictionaryMethodGetRequest;
import it.cavallium.dbengine.proto.DictionaryMethodGetResponse;
import it.cavallium.dbengine.proto.DictionaryMethodIsEmptyRequest;
import it.cavallium.dbengine.proto.DictionaryMethodIsEmptyResponse;
import it.cavallium.dbengine.proto.DictionaryMethodMultiStandardResult;
import it.cavallium.dbengine.proto.DictionaryMethodPutMultiRequest;
import it.cavallium.dbengine.proto.DictionaryMethodPutRequest;
import it.cavallium.dbengine.proto.DictionaryMethodRemoveOneRequest;
import it.cavallium.dbengine.proto.DictionaryMethodRemoveRequest;
import it.cavallium.dbengine.proto.DictionaryMethodSizeRequest;
import it.cavallium.dbengine.proto.DictionaryMethodSizeResponse;
import it.cavallium.dbengine.proto.DictionaryMethodStandardEntityResponse;
import it.cavallium.dbengine.proto.DictionaryMethodStandardResult;
import it.cavallium.dbengine.proto.DictionaryOpenRequest;
import it.cavallium.dbengine.proto.Empty;
import it.cavallium.dbengine.proto.HandleResult;
import it.cavallium.dbengine.proto.LLDocument;
import it.cavallium.dbengine.proto.LLTerm;
import it.cavallium.dbengine.proto.LuceneIndexCloseRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodAddDocumentMultiRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodAddDocumentRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodCountRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodCountResponse;
import it.cavallium.dbengine.proto.LuceneIndexMethodDeleteAllRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodDeleteDocumentRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodMoreLikeThisRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchMultiResponse;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchResponse;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchStreamItem;
import it.cavallium.dbengine.proto.LuceneIndexMethodSearchStreamRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodUpdateDocumentMultiRequest;
import it.cavallium.dbengine.proto.LuceneIndexMethodUpdateDocumentRequest;
import it.cavallium.dbengine.proto.LuceneIndexOpenRequest;
import it.cavallium.dbengine.proto.LuceneIndexSnapshotReleaseRequest;
import it.cavallium.dbengine.proto.LuceneIndexSnapshotTakeRequest;
import it.cavallium.dbengine.proto.LuceneIndexSnapshotTakeResult;
import it.cavallium.dbengine.proto.MltField;
import it.cavallium.dbengine.proto.ResetConnectionRequest;
import it.cavallium.dbengine.proto.SingletonMethodGetRequest;
import it.cavallium.dbengine.proto.SingletonMethodGetResponse;
import it.cavallium.dbengine.proto.SingletonMethodSetRequest;
import it.cavallium.dbengine.proto.SingletonOpenRequest;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.warp.commonutils.functional.ConsumerResult;
public class DbServerFunctions extends CavalliumDBEngineServiceGrpc.CavalliumDBEngineServiceImplBase {
private final AtomicInteger firstFreeDbHandle = new AtomicInteger(0);
private final AtomicInteger firstFreeLuceneHandle = new AtomicInteger(0);
private final AtomicInteger firstFreeStructureHandle = new AtomicInteger(0);
private final ConcurrentHashMap<Integer, LLKeyValueDatabase> databases = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, LLLuceneIndex> luceneIndices = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, Set<Integer>> databasesRelatedHandles = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, LLSingleton> singletons = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, LLDictionary> dictionaries = new ConcurrentHashMap<>();
private final LLLocalDatabaseConnection localDatabaseConnection;
public DbServerFunctions(LLLocalDatabaseConnection localDatabaseConnection) {
this.localDatabaseConnection = localDatabaseConnection;
}
@Override
public void resetConnection(ResetConnectionRequest request,
StreamObserver<Empty> responseObserver) {
System.out.println("Resetting connection...");
int lastHandle = firstFreeDbHandle.get();
databases.forEach((handle, db) -> {
System.out.println("Closing db " + handle);
try {
db.close();
} catch (IOException e) {
e.printStackTrace();
}
});
for (int i = 0; i < lastHandle; i++) {
var relatedHandles = databasesRelatedHandles.remove(i);
if (relatedHandles != null) {
for (Integer relatedHandle : relatedHandles) {
singletons.remove(relatedHandle);
dictionaries.remove(relatedHandle);
}
}
databases.remove(i);
}
responseObserver.onNext(Empty.newBuilder().build());
responseObserver.onCompleted();
System.out.println("Connection reset.");
}
@Override
public void databaseOpen(DatabaseOpenRequest request,
StreamObserver<HandleResult> responseObserver) {
var response = HandleResult.newBuilder();
int handle = firstFreeDbHandle.getAndIncrement();
System.out.println("Opening db " + handle + ".");
String dbName = Column.toString(request.getName().toByteArray());
List<Column> columns = request.getColumnNameList().stream()
.map((nameBinary) -> Column.special(Column.toString(nameBinary.toByteArray())))
.collect(Collectors.toList());
boolean lowMemory = request.getLowMemory();
try {
var database = localDatabaseConnection.getDatabase(dbName, columns, lowMemory);
databases.put(handle, database);
response.setHandle(handle);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void databaseClose(DatabaseCloseRequest request, StreamObserver<Empty> responseObserver) {
try {
System.out.println("Closing db " + request.getDatabaseHandle() + ".");
var db = databases.remove(request.getDatabaseHandle());
db.close();
responseObserver.onNext(Empty.newBuilder().build());
} catch (Exception e) {
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexOpen(LuceneIndexOpenRequest request,
StreamObserver<HandleResult> responseObserver) {
var response = HandleResult.newBuilder();
int handle = firstFreeLuceneHandle.getAndIncrement();
System.out.println("Opening lucene " + handle + ".");
String name = request.getName();
TextFieldsAnalyzer textFieldsAnalyzer = TextFieldsAnalyzer.values()[request.getTextFieldsAnalyzer()];
var queryRefreshDebounceTime = Duration.ofMillis(request.getQueryRefreshDebounceTime());
var commitDebounceTime = Duration.ofMillis(request.getCommitDebounceTime());
var lowMemory = request.getLowMemory();
var instancesCount = request.getInstancesCount();
try {
var luceneIndex = localDatabaseConnection.getLuceneIndex(name,
instancesCount,
textFieldsAnalyzer,
queryRefreshDebounceTime,
commitDebounceTime,
lowMemory
);
luceneIndices.put(handle, luceneIndex);
response.setHandle(handle);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexClose(LuceneIndexCloseRequest request,
StreamObserver<Empty> responseObserver) {
try {
System.out.println("Closing lucene " + request.getHandle() + ".");
var luceneIndex = luceneIndices.remove(request.getHandle());
luceneIndex.close();
responseObserver.onNext(Empty.newBuilder().build());
} catch (Exception e) {
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexSnapshotTake(LuceneIndexSnapshotTakeRequest request, StreamObserver<LuceneIndexSnapshotTakeResult> responseObserver) {
var response = LuceneIndexSnapshotTakeResult.newBuilder();
int handle = request.getHandle();
try {
var snapshot = luceneIndices.get(handle).takeSnapshot();
response.setSequenceNumber(snapshot.getSequenceNumber());
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@SuppressWarnings("DuplicatedCode")
@Override
public void luceneIndexSnapshotRelease(LuceneIndexSnapshotReleaseRequest request, StreamObserver<Empty> responseObserver) {
var response = Empty.newBuilder();
int handle = request.getHandle();
long sequenceNumber = request.getSequenceNumber();
try {
luceneIndices.get(handle).releaseSnapshot(new LLSnapshot(sequenceNumber));
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void singletonOpen(SingletonOpenRequest request,
StreamObserver<HandleResult> responseObserver) {
var response = HandleResult.newBuilder();
int handle = firstFreeStructureHandle.getAndIncrement();
int dbHandle = request.getDatabaseHandle();
byte[] singletonListColumnName = request.getSingletonListColumnName().toByteArray();
byte[] name = request.getName().toByteArray();
byte[] defaultValue = request.getDefaultValue().toByteArray();
try {
var singleton = databases.get(dbHandle)
.getSingleton(singletonListColumnName, name, defaultValue);
singletons.put(handle, singleton);
response.setHandle(handle);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryOpen(DictionaryOpenRequest request,
StreamObserver<HandleResult> responseObserver) {
var response = HandleResult.newBuilder();
int handle = firstFreeStructureHandle.getAndIncrement();
int dbHandle = request.getDatabaseHandle();
byte[] columnName = request.getColumnName().toByteArray();
try {
var dict = databases.get(dbHandle).getDictionary(columnName);
dictionaries.put(handle, dict);
response.setHandle(handle);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void databaseSnapshotTake(DatabaseSnapshotTakeRequest request, StreamObserver<DatabaseSnapshotTakeResult> responseObserver) {
var response = DatabaseSnapshotTakeResult.newBuilder();
int dbHandle = request.getDatabaseHandle();
try {
var snapshot = databases.get(dbHandle).takeSnapshot();
response.setSequenceNumber(snapshot.getSequenceNumber());
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void databaseSnapshotRelease(DatabaseSnapshotReleaseRequest request, StreamObserver<Empty> responseObserver) {
var response = Empty.newBuilder();
int dbHandle = request.getDatabaseHandle();
long sequenceNumber = request.getSequenceNumber();
try {
databases.get(dbHandle).releaseSnapshot(new LLSnapshot(sequenceNumber));
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodGet(DictionaryMethodGetRequest request,
StreamObserver<DictionaryMethodGetResponse> responseObserver) {
var response = DictionaryMethodGetResponse.newBuilder();
int handle = request.getDictionaryHandle();
long sequenceNumber = request.getSequenceNumber();
LLSnapshot snapshot = sequenceNumber == 0 ? null : new LLSnapshot(sequenceNumber);
byte[] key = request.getKey().toByteArray();
try {
var dict = dictionaries.get(handle);
Optional<byte[]> value = dict.get(snapshot, key);
value.ifPresent(bytes -> response.setValue(ByteString.copyFrom(bytes)));
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodContains(DictionaryMethodContainsRequest request,
StreamObserver<DictionaryMethodContainsResponse> responseObserver) {
var response = DictionaryMethodContainsResponse.newBuilder();
int handle = request.getDictionaryHandle();
long sequenceNumber = request.getSequenceNumber();
LLSnapshot snapshot = sequenceNumber == 0 ? null : new LLSnapshot(sequenceNumber);
byte[] key = request.getKey().toByteArray();
try {
var dict = dictionaries.get(handle);
boolean value = dict.contains(snapshot, key);
response.setValue(value);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodPut(DictionaryMethodPutRequest request,
StreamObserver<DictionaryMethodStandardResult> responseObserver) {
var response = DictionaryMethodStandardResult.newBuilder();
int handle = request.getDictionaryHandle();
byte[] key = request.getKey().toByteArray();
byte[] value = request.getValue().toByteArray();
var resultType = LLDictionaryResultType
.valueOf(it.cavallium.dbengine.proto.LLDictionaryResultType.forNumber(request.getResultTypeValue()));
try {
var dict = dictionaries.get(handle);
Optional<byte[]> result = dict.put(key, value, resultType);
result.ifPresent((data) -> response.setValue(ByteString.copyFrom(data)));
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodPutMulti(DictionaryMethodPutMultiRequest request,
StreamObserver<DictionaryMethodMultiStandardResult> responseObserver) {
var response = DictionaryMethodMultiStandardResult.newBuilder();
int handle = request.getDictionaryHandle();
byte[][] key = request.getKeyList().stream().map(ByteString::toByteArray)
.toArray(byte[][]::new);
byte[][] value = request.getValueList().stream().map(ByteString::toByteArray)
.toArray(byte[][]::new);
var resultType = LLDictionaryResultType
.valueOf(it.cavallium.dbengine.proto.LLDictionaryResultType.forNumber(request.getResultTypeValue()));
try {
var dict = dictionaries.get(handle);
List<ByteString> responses = new LinkedList<>();
dict.putMulti(key, value, resultType, (bytes) -> responses.add(ByteString.copyFrom(bytes)));
if (!responses.isEmpty()) {
response.addAllValue(responses);
}
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodRemove(DictionaryMethodRemoveRequest request,
StreamObserver<DictionaryMethodStandardResult> responseObserver) {
var response = DictionaryMethodStandardResult.newBuilder();
int handle = request.getDictionaryHandle();
byte[] key = request.getKey().toByteArray();
var resultType = LLDictionaryResultType
.valueOf(it.cavallium.dbengine.proto.LLDictionaryResultType.forNumber(request.getResultTypeValue()));
try {
var dict = dictionaries.get(handle);
Optional<byte[]> result = dict.remove(key, resultType);
result.ifPresent((data) -> response.setValue(ByteString.copyFrom(data)));
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodClear(DictionaryMethodClearRequest request,
StreamObserver<Empty> responseObserver) {
int handle = request.getDictionaryHandle();
try {
var dict = dictionaries.get(handle);
dict.clear();
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodFastSize(DictionaryMethodSizeRequest request,
StreamObserver<DictionaryMethodSizeResponse> responseObserver) {
var response = DictionaryMethodSizeResponse.newBuilder();
int handle = request.getDictionaryHandle();
long sequenceNumber = request.getSequenceNumber();
LLSnapshot snapshot = sequenceNumber == 0 ? null : new LLSnapshot(sequenceNumber);
try {
var dict = dictionaries.get(handle);
long result = dict.size(snapshot, true);
response.setSize(result);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodExactSize(DictionaryMethodSizeRequest request,
StreamObserver<DictionaryMethodSizeResponse> responseObserver) {
var response = DictionaryMethodSizeResponse.newBuilder();
int handle = request.getDictionaryHandle();
long sequenceNumber = request.getSequenceNumber();
LLSnapshot snapshot = sequenceNumber == 0 ? null : new LLSnapshot(sequenceNumber);
try {
var dict = dictionaries.get(handle);
long result = dict.size(snapshot, false);
response.setSize(result);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodIsEmpty(DictionaryMethodIsEmptyRequest request,
StreamObserver<DictionaryMethodIsEmptyResponse> responseObserver) {
var response = DictionaryMethodIsEmptyResponse.newBuilder();
int handle = request.getDictionaryHandle();
long sequenceNumber = request.getSequenceNumber();
LLSnapshot snapshot = sequenceNumber == 0 ? null : new LLSnapshot(sequenceNumber);
try {
var dict = dictionaries.get(handle);
boolean result = dict.isEmpty(snapshot);
response.setEmpty(result);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodRemoveOne(DictionaryMethodRemoveOneRequest request,
StreamObserver<DictionaryMethodStandardEntityResponse> responseObserver) {
var response = DictionaryMethodStandardEntityResponse.newBuilder();
int handle = request.getDictionaryHandle();
try {
var dict = dictionaries.get(handle);
Optional<Entry<byte[], byte[]>> result = dict.removeOne();
result.ifPresent((data) -> {
response.setKey(ByteString.copyFrom(data.getKey()));
response.setValue(ByteString.copyFrom(data.getValue()));
});
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void dictionaryMethodForEach(DictionaryMethodForEachRequest request,
StreamObserver<DictionaryMethodStandardEntityResponse> responseObserver) {
int handle = request.getDictionaryHandle();
long sequenceNumber = request.getSequenceNumber();
LLSnapshot snapshot = sequenceNumber == 0 ? null : new LLSnapshot(sequenceNumber);
var dict = dictionaries.get(handle);
dict.forEach(snapshot, 1, (key, val) -> {
var response = DictionaryMethodStandardEntityResponse.newBuilder();
response.setKey(ByteString.copyFrom(key));
response.setValue(ByteString.copyFrom(val));
responseObserver.onNext(response.build());
return ConsumerResult.result();
});
responseObserver.onCompleted();
}
@Override
public void singletonMethodGet(SingletonMethodGetRequest request,
StreamObserver<SingletonMethodGetResponse> responseObserver) {
var response = SingletonMethodGetResponse.newBuilder();
int handle = request.getSingletonHandle();
long sequenceNumber = request.getSequenceNumber();
LLSnapshot snapshot = sequenceNumber == 0 ? null : new LLSnapshot(sequenceNumber);
try {
var singleton = singletons.get(handle);
byte[] result = singleton.get(snapshot);
response.setValue(ByteString.copyFrom(result));
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void singletonMethodSet(SingletonMethodSetRequest request,
StreamObserver<Empty> responseObserver) {
int handle = request.getSingletonHandle();
byte[] value = request.getValue().toByteArray();
try {
var singleton = singletons.get(handle);
singleton.set(value);
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodAddDocument(LuceneIndexMethodAddDocumentRequest request,
StreamObserver<Empty> responseObserver) {
int handle = request.getHandle();
var documentKey = request.getKey();
var documentItemsList = request.getDocumentItemsList();
try {
var luceneIndex = luceneIndices.get(handle);
luceneIndex.addDocument(LLUtils.toLocal(documentKey), LLUtils.toLocal(documentItemsList));
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodAddDocumentMulti(LuceneIndexMethodAddDocumentMultiRequest request,
StreamObserver<Empty> responseObserver) {
int handle = request.getHandle();
List<LLTerm> keyList = request.getKeyList();
List<LLDocument> documentItemsList = request.getDocumentsList();
try {
var luceneIndex = luceneIndices.get(handle);
luceneIndex.addDocuments(LLUtils.toLocalTerms(keyList), LLUtils.toLocalDocuments(documentItemsList));
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodDeleteDocument(LuceneIndexMethodDeleteDocumentRequest request,
StreamObserver<Empty> responseObserver) {
int handle = request.getHandle();
var key = request.getKey();
try {
var luceneIndex = luceneIndices.get(handle);
luceneIndex.deleteDocument(LLUtils.toLocal(key));
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodUpdateDocument(LuceneIndexMethodUpdateDocumentRequest request,
StreamObserver<Empty> responseObserver) {
int handle = request.getHandle();
var key = request.getKey();
var documentItemsList = request.getDocumentItemsList();
try {
var luceneIndex = luceneIndices.get(handle);
luceneIndex.updateDocument(LLUtils.toLocal(key), LLUtils.toLocal(documentItemsList));
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodUpdateDocumentMulti(
LuceneIndexMethodUpdateDocumentMultiRequest request, StreamObserver<Empty> responseObserver) {
int handle = request.getHandle();
List<LLTerm> keyList = request.getKeyList();
List<LLDocument> documentItemsList = request.getDocumentsList();
try {
var luceneIndex = luceneIndices.get(handle);
luceneIndex.updateDocuments(LLUtils.toLocalTerms(keyList),
LLUtils.toLocalDocuments(documentItemsList));
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodDeleteAll(LuceneIndexMethodDeleteAllRequest request,
StreamObserver<Empty> responseObserver) {
int handle = request.getHandle();
try {
var luceneIndex = luceneIndices.get(handle);
luceneIndex.deleteAll();
responseObserver.onNext(Empty.newBuilder().build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodSearch(LuceneIndexMethodSearchRequest request,
StreamObserver<LuceneIndexMethodSearchMultiResponse> responseObserver) {
int handle = request.getHandle();
var snapshot = request.getSequenceNumber() == 0 ? null : new LLSnapshot(request.getSequenceNumber());
var query = request.getQuery();
var limit = request.getLimit();
var sort = request.hasSort() ? LLUtils.toLocal(request.getSort()) : null;
var keyFieldName = request.getKeyFieldName();
try {
var luceneIndex = luceneIndices.get(handle);
var multiResults = luceneIndex.search(snapshot, query, limit, sort, keyFieldName);
List<LuceneIndexMethodSearchResponse> responses = new ArrayList<>();
for (LLTopKeys result : multiResults) {
var response = LuceneIndexMethodSearchResponse.newBuilder()
.setTotalHitsCount(result.getTotalHitsCount())
.addAllHits(ObjectArrayList.wrap(result.getHits()).stream().map(LLUtils::toGrpc)
.collect(Collectors.toList()));
responses.add(response.build());
}
responseObserver.onNext(LuceneIndexMethodSearchMultiResponse.newBuilder().addAllResponse(responses).build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodMoreLikeThis(LuceneIndexMethodMoreLikeThisRequest request,
StreamObserver<LuceneIndexMethodSearchMultiResponse> responseObserver) {
int handle = request.getHandle();
var snapshot = request.getSequenceNumber() == 0 ? null : new LLSnapshot(request.getSequenceNumber());
var mltFieldsList = request.getMltFieldsList();
var limit = request.getLimit();
var keyFieldName = request.getKeyFieldName();
try {
var luceneIndex = luceneIndices.get(handle);
var mltFields = new HashMap<String, Set<String>>();
for (MltField mltField : mltFieldsList) {
mltFields.put(mltField.getKey(), new HashSet<>(mltField.getValuesList()));
}
var multiResults = luceneIndex.moreLikeThis(snapshot, mltFields, limit, keyFieldName);
List<LuceneIndexMethodSearchResponse> responses = new ArrayList<>();
for (LLTopKeys result : multiResults) {
var response = LuceneIndexMethodSearchResponse
.newBuilder()
.setTotalHitsCount(result.getTotalHitsCount())
.addAllHits(ObjectArrayList.wrap(result.getHits()).stream().map(LLUtils::toGrpc).collect(Collectors.toList()));
responses.add(response.build());
}
responseObserver.onNext(LuceneIndexMethodSearchMultiResponse.newBuilder().addAllResponse(responses).build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void luceneIndexMethodSearchStream(LuceneIndexMethodSearchStreamRequest request,
StreamObserver<LuceneIndexMethodSearchStreamItem> responseObserver) {
int handle = request.getHandle();
var snapshot = request.getSequenceNumber() == 0 ? null : new LLSnapshot(request.getSequenceNumber());
var query = request.getQuery();
var limit = request.getLimit();
var sort = request.hasSort() ? LLUtils.toLocal(request.getSort()) : null;
var keyFieldName = request.getKeyFieldName();
try {
var luceneIndex = luceneIndices.get(handle);
var results = luceneIndex.searchStream(snapshot, query, limit, sort, keyFieldName);
int shardIndex = 0;
for (var flux : results.getT2()) {
int shardIndexF = shardIndex;
flux.subscribe(resultKey -> responseObserver.onNext(LuceneIndexMethodSearchStreamItem
.newBuilder()
.setShardIndex(shardIndexF)
.setIsKey(true)
.setKey(resultKey)
.build()), responseObserver::onError, responseObserver::onCompleted);
shardIndex++;
}
results
.getT1()
.subscribe(count -> responseObserver.onNext(LuceneIndexMethodSearchStreamItem
.newBuilder()
.setIsKey(false)
.setApproximatedTotalCount(count)
.build()), responseObserver::onError, responseObserver::onCompleted);
} catch (Exception e) {
e.printStackTrace();
responseObserver.onError(e);
}
}
@Override
public void luceneIndexMethodCount(LuceneIndexMethodCountRequest request,
StreamObserver<LuceneIndexMethodCountResponse> responseObserver) {
int handle = request.getHandle();
var snapshot = request.getSequenceNumber() == 0 ? null : new LLSnapshot(request.getSequenceNumber());
var query = request.getQuery();
try {
var luceneIndex = luceneIndices.get(handle);
var result = luceneIndex.count(snapshot, query);
var response = LuceneIndexMethodCountResponse.newBuilder()
.setCount(result);
responseObserver.onNext(response.build());
} catch (IOException e) {
e.printStackTrace();
responseObserver.onError(e);
}
responseObserver.onCompleted();
}
@Override
public void ping(Empty request, StreamObserver<Empty> responseObserver) {
responseObserver.onNext(Empty.newBuilder().build());
responseObserver.onCompleted();
}
}

View File

@ -1,95 +0,0 @@
package it.cavallium.dbengine.database.remote.server;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.GrpcSslContexts;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import java.io.IOException;
import java.nio.file.Path;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
public class DbServerManager {
private Server server;
public boolean stopped;
private final LLLocalDatabaseConnection databaseConnection;
private final String host;
private final int port;
private final Path certChainFilePath;
private final Path privateKeyFilePath;
private final Path trustCertCollectionFilePath;
public DbServerManager(LLLocalDatabaseConnection databaseConnection, String host, int port,
Path certChainFilePath, Path privateKeyFilePath, Path trustCertCollectionFilePath) {
this.databaseConnection = databaseConnection;
this.host = host;
this.port = port;
this.certChainFilePath = certChainFilePath;
this.privateKeyFilePath = privateKeyFilePath;
this.trustCertCollectionFilePath = trustCertCollectionFilePath;
}
private SslContextBuilder getSslContextBuilder() {
SslContextBuilder sslClientContextBuilder = SslContextBuilder
.forServer(certChainFilePath.toFile(),
privateKeyFilePath.toFile());
if (trustCertCollectionFilePath != null) {
sslClientContextBuilder.trustManager(trustCertCollectionFilePath.toFile());
sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
}
return GrpcSslContexts.configure(sslClientContextBuilder,
SslProvider.OPENSSL);
}
public void start() throws IOException {
var srvBuilder = ServerBuilder.forPort(port)
.addService(new DbServerFunctions(databaseConnection));
server = srvBuilder.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (!stopped) {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
this.stop();
try {
databaseConnection.disconnect();
} catch (IOException e) {
e.printStackTrace();
}
System.err.println("*** server shut down");
}
}));
System.out.println("Server started, listening on " + port);
}
public void stop() {
stopped = true;
if (server != null) {
try {
server.shutdown();
} catch (Exception ex) {
ex.printStackTrace();
}
try {
blockUntilShutdown();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
System.out.println("Server stopped.");
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
}

View File

@ -1,29 +0,0 @@
package it.cavallium.dbengine.database.remote.server;
import java.io.IOException;
import java.nio.file.Paths;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
public class Main {
/**
* @param args [database-path] [host] [port] [cert-chain-file-path] [private-key-file-path]
* [trust-cert-collection-file-path]
*/
public static void main(String[] args) throws IOException, InterruptedException {
if (args.length != 7) {
System.out.println(
"Usage: java -jar dataserver.jar <database-path> <host> <port> <cert-chain-file-path> <private-key-file-path> <trust-cert-collection-file-path> <crash-if-wal-errored>");
} else {
System.out.println("Database server starting...");
var dbConnection = new LLLocalDatabaseConnection(Paths.get(args[0]),
Boolean.parseBoolean(args[6]));
dbConnection.connect();
var serverManager = new DbServerManager(dbConnection, args[1], Integer.parseInt(args[2]),
Paths.get(args[3]), Paths.get(args[4]), Paths.get(args[5]));
serverManager.start();
serverManager.blockUntilShutdown();
System.out.println("Database has been terminated.");
}
}
}

View File

@ -12,15 +12,18 @@ import org.apache.lucene.search.ScoreMode;
public class LuceneParallelStreamCollector implements Collector, LeafCollector {
private final int base;
private final ScoreMode scoreMode;
private final LuceneParallelStreamConsumer streamConsumer;
private final AtomicBoolean stopped;
private final AtomicLong totalHitsCounter;
private final ReentrantLock lock;
private final int base;
private Scorable scorer;
public LuceneParallelStreamCollector(int base, LuceneParallelStreamConsumer streamConsumer,
public LuceneParallelStreamCollector(int base, ScoreMode scoreMode, LuceneParallelStreamConsumer streamConsumer,
AtomicBoolean stopped, AtomicLong totalHitsCounter, ReentrantLock lock) {
this.base = base;
this.scoreMode = scoreMode;
this.streamConsumer = streamConsumer;
this.stopped = stopped;
this.totalHitsCounter = totalHitsCounter;
@ -28,13 +31,13 @@ public class LuceneParallelStreamCollector implements Collector, LeafCollector {
}
@Override
public final LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
return new LuceneParallelStreamCollector(context.docBase, streamConsumer, stopped, totalHitsCounter, lock);
public final LeafCollector getLeafCollector(LeafReaderContext context) {
return new LuceneParallelStreamCollector(context.docBase, scoreMode, streamConsumer, stopped, totalHitsCounter, lock);
}
@Override
public void setScorer(Scorable scorer) throws IOException {
public void setScorer(Scorable scorer) {
this.scorer = scorer;
}
@Override
@ -44,7 +47,8 @@ public class LuceneParallelStreamCollector implements Collector, LeafCollector {
lock.lock();
try {
if (!stopped.get()) {
if (!streamConsumer.consume(doc)) {
assert (scorer == null) || scorer.docID() == doc;
if (!streamConsumer.consume(doc, scorer == null ? 0 : scorer.score())) {
stopped.set(true);
}
}
@ -55,6 +59,6 @@ public class LuceneParallelStreamCollector implements Collector, LeafCollector {
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
return scoreMode;
}
}

View File

@ -1,26 +1,29 @@
package it.cavallium.dbengine.database.utils;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ScoreMode;
public class LuceneParallelStreamCollectorManager implements
CollectorManager<LuceneParallelStreamCollector, LuceneParallelStreamCollectorResult> {
private final ScoreMode scoreMode;
private final LuceneParallelStreamConsumer streamConsumer;
private final AtomicBoolean stopped;
private final AtomicLong totalHitsCounter;
private final ReentrantLock lock;
public static LuceneParallelStreamCollectorManager fromConsumer(
ScoreMode scoreMode,
LuceneParallelStreamConsumer streamConsumer) {
return new LuceneParallelStreamCollectorManager(streamConsumer);
return new LuceneParallelStreamCollectorManager(scoreMode, streamConsumer);
}
public LuceneParallelStreamCollectorManager(LuceneParallelStreamConsumer streamConsumer) {
public LuceneParallelStreamCollectorManager(ScoreMode scoreMode, LuceneParallelStreamConsumer streamConsumer) {
this.scoreMode = scoreMode;
this.streamConsumer = streamConsumer;
this.stopped = new AtomicBoolean();
this.totalHitsCounter = new AtomicLong();
@ -28,13 +31,13 @@ public class LuceneParallelStreamCollectorManager implements
}
@Override
public LuceneParallelStreamCollector newCollector() throws IOException {
return new LuceneParallelStreamCollector(0, streamConsumer, stopped, totalHitsCounter, lock);
public LuceneParallelStreamCollector newCollector() {
return new LuceneParallelStreamCollector(0, scoreMode, streamConsumer, stopped, totalHitsCounter, lock);
}
@Override
public LuceneParallelStreamCollectorResult reduce(
Collection<LuceneParallelStreamCollector> collectors) throws IOException {
Collection<LuceneParallelStreamCollector> collectors) {
return new LuceneParallelStreamCollectorResult(totalHitsCounter.get());
}

View File

@ -3,8 +3,9 @@ package it.cavallium.dbengine.database.utils;
public interface LuceneParallelStreamConsumer {
/**
* @param docId
* @param docId document id
* @param score score of document
* @return true to continue, false to stop the execution
*/
boolean consume(int docId);
boolean consume(int docId, float score);
}