Don't use arrays
This commit is contained in:
parent
46ac6ca481
commit
aad7195acb
4
pom.xml
4
pom.xml
@ -344,7 +344,7 @@
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>31.0.1-jre</version>
|
||||
<version>31.1-jre</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.warp</groupId>
|
||||
@ -466,7 +466,7 @@
|
||||
<dependency>
|
||||
<groupId>org.jetbrains</groupId>
|
||||
<artifactId>annotations</artifactId>
|
||||
<version>22.0.0</version>
|
||||
<version>23.0.0</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
@ -71,9 +71,11 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
|
||||
|
||||
@Override
|
||||
public Mono<Void> updateDocuments(Flux<Entry<T, U>> entries) {
|
||||
return luceneIndex.updateDocuments(entries.flatMap(entry -> indicizer
|
||||
.toDocument(entry.getKey(), entry.getValue())
|
||||
.map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))));
|
||||
return luceneIndex.updateDocuments(entries.flatMap(entry -> Mono.zip(
|
||||
Mono.just(indicizer.toIndex(entry.getKey())),
|
||||
indicizer.toDocument(entry.getKey(), entry.getValue()).single(),
|
||||
Map::entry
|
||||
)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1,3 +1,5 @@
|
||||
package it.cavallium.dbengine.database;
|
||||
|
||||
/**
 * Soft-update index request: {@code items} are the items to write and
 * {@code softDeleteItems} presumably carry the soft-delete marker fields — TODO confirm against callers.
 * NOTE(review): array record components make the generated equals/hashCode identity-based
 * (arrays compare by reference) and toString unhelpful; List components would restore value semantics.
 */
public record LLSoftUpdateDocument(LLItem[] items, LLItem[] softDeleteItems) implements LLIndexRequest {}
|
||||
import java.util.List;
|
||||
|
||||
/**
 * Soft-update index request: {@code items} are the items to write and
 * {@code softDeleteItems} presumably carry the soft-delete marker fields — TODO confirm against callers.
 * List components give the record value-based equals/hashCode (unlike arrays).
 * NOTE(review): consider {@code List.copyOf} in a compact constructor so callers cannot
 * mutate the lists after construction — verify callers before adding.
 */
public record LLSoftUpdateDocument(List<LLItem> items, List<LLItem> softDeleteItems) implements LLIndexRequest {}
|
||||
|
@ -1,3 +1,5 @@
|
||||
package it.cavallium.dbengine.database;
|
||||
|
||||
/**
 * Full-document index update request wrapping the items to write.
 * NOTE(review): an array record component makes the generated equals/hashCode
 * identity-based (arrays compare by reference); a List component would restore value semantics.
 */
public record LLUpdateDocument(LLItem[] items) implements LLIndexRequest {}
|
||||
import java.util.List;
|
||||
|
||||
/**
 * Full-document index update request wrapping the items to write.
 * The List component gives the record value-based equals/hashCode (unlike an array).
 * NOTE(review): consider {@code List.copyOf} in a compact constructor for immutability —
 * verify callers before adding.
 */
public record LLUpdateDocument(List<LLItem> items) implements LLIndexRequest {}
|
||||
|
@ -1,3 +1,5 @@
|
||||
package it.cavallium.dbengine.database;
|
||||
|
||||
/**
 * Partial index update request carrying only the fields to change.
 * NOTE(review): an array record component makes the generated equals/hashCode
 * identity-based (arrays compare by reference); a List component would restore value semantics.
 */
public record LLUpdateFields(LLItem[] items) implements LLIndexRequest {}
|
||||
import java.util.List;
|
||||
|
||||
/**
 * Partial index update request carrying only the fields to change.
 * The List component gives the record value-based equals/hashCode (unlike an array).
 * NOTE(review): consider {@code List.copyOf} in a compact constructor for immutability —
 * verify callers before adding.
 */
public record LLUpdateFields(List<LLItem> items) implements LLIndexRequest {}
|
||||
|
@ -158,7 +158,7 @@ public class LLUtils {
|
||||
return toDocument(document.items());
|
||||
}
|
||||
|
||||
public static Document toDocument(LLItem[] document) {
|
||||
public static Document toDocument(List<LLItem> document) {
|
||||
Document d = new Document();
|
||||
for (LLItem item : document) {
|
||||
if (item != null) {
|
||||
@ -168,10 +168,10 @@ public class LLUtils {
|
||||
return d;
|
||||
}
|
||||
|
||||
public static Field[] toFields(LLItem... fields) {
|
||||
Field[] d = new Field[fields.length];
|
||||
for (int i = 0; i < fields.length; i++) {
|
||||
d[i] = LLUtils.toField(fields[i]);
|
||||
public static Field[] toFields(List<LLItem> fields) {
|
||||
Field[] d = new Field[fields.size()];
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
d[i] = LLUtils.toField(fields.get(i));
|
||||
}
|
||||
return d;
|
||||
}
|
||||
|
@ -85,8 +85,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
private static final ReentrantLock shutdownLock = new ReentrantLock();
|
||||
private static final Scheduler luceneHeavyTasksScheduler = uninterruptibleScheduler(Schedulers.single(Schedulers.boundedElastic()));
|
||||
//todo: remove after https://github.com/reactor/reactor-core/issues/2960 is fixed
|
||||
private static final Scheduler bulkScheduler = uninterruptibleScheduler(Schedulers.newBoundedElastic(
|
||||
DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "bulkBoundedElastic", 60, true));
|
||||
private static final Scheduler bulkScheduler = Schedulers.boundedElastic();
|
||||
|
||||
static {
|
||||
LLUtils.initHooks();
|
||||
@ -378,6 +377,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
|
||||
indexWriter.updateDocument(LLUtils.toTerm(key), toDocument(value));
|
||||
return null;
|
||||
});
|
||||
logger.trace(MARKER_LUCENE, "Updated document {}: {}", key, value);
|
||||
} catch (Exception ex) {
|
||||
sink.error(ex);
|
||||
return;
|
||||
|
@ -29,11 +29,11 @@ import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
|
||||
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
|
||||
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
|
||||
import it.cavallium.dbengine.rpc.current.data.LuceneOptions;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
|
||||
import it.unimi.dsi.fastutil.ints.IntList;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -216,35 +216,20 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
|
||||
@Override
|
||||
public Mono<Void> updateDocuments(Flux<Entry<LLTerm, LLUpdateDocument>> documents) {
|
||||
if (BYPASS_GROUPBY_BUG) {
|
||||
int bufferSize = 8192;
|
||||
return documents
|
||||
.buffer(8192)
|
||||
.flatMap(inputEntries -> {
|
||||
List<Entry<LLTerm, LLUpdateDocument>>[] sortedEntries = new List[totalShards];
|
||||
Mono<Void>[] results = new Mono[totalShards];
|
||||
|
||||
// Sort entries
|
||||
for(var inputEntry : inputEntries) {
|
||||
int luceneIndexId = LuceneUtils.getLuceneIndexId(inputEntry.getKey(), totalShards);
|
||||
if (sortedEntries[luceneIndexId] == null) {
|
||||
sortedEntries[luceneIndexId] = new ArrayList<>();
|
||||
}
|
||||
sortedEntries[luceneIndexId].add(inputEntry);
|
||||
}
|
||||
|
||||
// Add documents
|
||||
int luceneIndexId = 0;
|
||||
for (List<Entry<LLTerm, LLUpdateDocument>> docs : sortedEntries) {
|
||||
if (docs != null && !docs.isEmpty()) {
|
||||
LLLocalLuceneIndex luceneIndex = Objects.requireNonNull(luceneIndicesById[luceneIndexId]);
|
||||
results[luceneIndexId] = luceneIndex.updateDocuments(Flux.fromIterable(docs));
|
||||
} else {
|
||||
results[luceneIndexId] = Mono.empty();
|
||||
}
|
||||
luceneIndexId++;
|
||||
}
|
||||
|
||||
return Mono.when(results);
|
||||
})
|
||||
.window(bufferSize)
|
||||
.flatMap(bufferFlux -> bufferFlux
|
||||
.collect(Collectors.groupingBy(inputEntry -> LuceneUtils.getLuceneIndexId(inputEntry.getKey(), totalShards),
|
||||
Collectors.collectingAndThen(Collectors.toList(), docs -> {
|
||||
int luceneIndexId = LuceneUtils.getLuceneIndexId(docs.get(0).getKey(), totalShards);
|
||||
LLLocalLuceneIndex luceneIndex = Objects.requireNonNull(luceneIndicesById[luceneIndexId]);
|
||||
return luceneIndex.updateDocuments(Flux.fromIterable(docs));
|
||||
}))
|
||||
)
|
||||
.map(Map::values)
|
||||
.flatMap(Mono::whenDelayError)
|
||||
)
|
||||
.then();
|
||||
} else {
|
||||
return documents
|
||||
|
@ -38,7 +38,7 @@ public class StringIndicizer extends Indicizer<String, String> {
|
||||
fields.add(LLItem.newLongPoint("longpoint", numLong));
|
||||
fields.add(LLItem.newNumericDocValuesField("longsort", numLong));
|
||||
}
|
||||
return new LLUpdateDocument(fields.toArray(LLItem[]::new));
|
||||
return new LLUpdateDocument(fields);
|
||||
});
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user