Test more numbers

This commit is contained in:
Andrea Cavalli 2021-07-18 19:37:24 +02:00
parent ff9ee54857
commit aa1aa7a6fb
25 changed files with 317 additions and 138 deletions

View File

@ -44,12 +44,4 @@ public class CountedStream<T> {
public Mono<List<T>> collectList() {
return stream.collectList();
}
public static <T> Mono<CountedStream<T>> counted(Flux<T> flux) {
var publishedFlux = flux.cache();
return publishedFlux
.count()
.map(count -> new CountedStream<>(publishedFlux, count))
.switchIfEmpty(Mono.fromSupplier(() -> new CountedStream<>(Flux.empty(), 0)));
}
}

View File

@ -13,4 +13,5 @@ public record DatabaseOptions(Map<String, String> extraFlags,
boolean useDirectIO,
boolean allowMemoryMapping,
boolean allowNettyDirect,
boolean useNettyDirect) {}
boolean useNettyDirect,
boolean enableDbAssertionsWhenUsingAssertions) {}

View File

@ -87,7 +87,7 @@ public interface LuceneIndex<T, U> extends LLSnapshottable {
Mono<Void> flush();
Mono<Void> refresh();
Mono<Void> refresh(boolean force);
private static <T, U> ValueTransformer<T, U> getValueGetterTransformer(ValueGetter<T, U> valueGetter) {
return new ValueTransformer<T, U>() {

View File

@ -93,7 +93,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
private Mono<SearchResult<T, U>> transformLuceneResultWithValues(LLSearchResultShard llSearchResult,
ValueGetter<T, U> valueGetter) {
return Mono.just(new SearchResult<>(llSearchResult.results().map(signal -> {
return Mono.fromCallable(() -> new SearchResult<>(llSearchResult.results().map(signal -> {
var key = signal.key().map(indicizer::getKey);
return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score());
}), llSearchResult.totalHitsCount(), llSearchResult.release()));
@ -110,7 +110,10 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
Mono.just(tuple3.getT3()),
tuple3.getT1()
));
return Mono.just(new SearchResult<>(resultItemsFlux, llSearchResult.totalHitsCount(), llSearchResult.release()));
return Mono.fromCallable(() -> new SearchResult<>(resultItemsFlux,
llSearchResult.totalHitsCount(),
llSearchResult.release()
));
}
@Override
@ -214,8 +217,8 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
* Refresh index searcher
*/
@Override
public Mono<Void> refresh() {
return luceneIndex.refresh();
public Mono<Void> refresh(boolean force) {
return luceneIndex.refresh(force);
}
@Override

View File

@ -70,5 +70,5 @@ public interface LLLuceneIndex extends LLSnapshottable {
/**
* Refresh index searcher
*/
Mono<Void> refresh();
Mono<Void> refresh(boolean force);
}

View File

@ -12,7 +12,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
@ -115,12 +117,20 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
return removeAndGetPrevious(key).map(o -> true).defaultIfEmpty(false);
}
/**
* GetMulti must return the elements in sequence!
*/
default Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
return keys.flatMapSequential(key -> this
.getValue(snapshot, key, existsAlmostCertainly)
.map(value -> Map.entry(key, value)));
return keys
.flatMapSequential(key -> this
.getValue(snapshot, key, existsAlmostCertainly)
.map(value -> Map.entry(key, value))
);
}
/**
* GetMulti must return the elements in sequence!
*/
default Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
return getMulti(snapshot, keys, false);
}
@ -271,9 +281,14 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
@Override
public <X> Flux<Tuple3<X, T, U>> transform(Flux<Tuple2<X, T>> keys) {
return Flux.defer(() -> {
ConcurrentHashMap<T, X> extraValues = new ConcurrentHashMap<>();
return getMulti(snapshot, keys.doOnNext(key -> extraValues.put(key.getT2(), key.getT1())).map(Tuple2::getT2))
.map(result -> Tuples.of(extraValues.get(result.getKey()), result.getKey(), result.getValue()));
ConcurrentLinkedQueue<X> extraValues = new ConcurrentLinkedQueue<>();
return getMulti(snapshot, keys.map(key -> {
extraValues.add(key.getT1());
return key.getT2();
})).map(result -> {
var extraValue = extraValues.remove();
return Tuples.of(extraValue, result.getKey(), result.getValue());
});
});
}
};

View File

@ -30,15 +30,18 @@ public class SubStageGetterHashMap<T, U, TH> implements
private final Serializer<U, ByteBuf> valueSerializer;
private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer;
private final boolean enableAssertionsWhenUsingAssertions;
public SubStageGetterHashMap(Serializer<T, ByteBuf> keySerializer,
Serializer<U, ByteBuf> valueSerializer,
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer) {
SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer,
boolean enableAssertionsWhenUsingAssertions) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.keyHashFunction = keyHashFunction;
this.keyHashSerializer = keyHashSerializer;
this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
}
@Override
@ -49,7 +52,7 @@ public class SubStageGetterHashMap<T, U, TH> implements
try {
return Mono
.defer(() -> {
if (assertsEnabled) {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
@ -86,7 +89,7 @@ public class SubStageGetterHashMap<T, U, TH> implements
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {

View File

@ -30,13 +30,16 @@ public class SubStageGetterHashSet<T, TH> implements
private final Serializer<T, ByteBuf> keySerializer;
private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer;
private final boolean enableAssertionsWhenUsingAssertions;
public SubStageGetterHashSet(Serializer<T, ByteBuf> keySerializer,
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer) {
SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer,
boolean enableAssertionsWhenUsingAssertions) {
this.keySerializer = keySerializer;
this.keyHashFunction = keyHashFunction;
this.keyHashSerializer = keyHashSerializer;
this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
}
@Override
@ -47,7 +50,7 @@ public class SubStageGetterHashSet<T, TH> implements
try {
return Mono
.defer(() -> {
if (assertsEnabled) {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
@ -83,7 +86,7 @@ public class SubStageGetterHashSet<T, TH> implements
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {

View File

@ -25,11 +25,13 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
private final SerializerFixedBinaryLength<T, ByteBuf> keySerializer;
private final Serializer<U, ByteBuf> valueSerializer;
private final boolean enableAssertionsWhenUsingAssertions;
public SubStageGetterMap(SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
Serializer<U, ByteBuf> valueSerializer) {
Serializer<U, ByteBuf> valueSerializer, boolean enableAssertionsWhenUsingAssertions) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
}
@Override
@ -40,7 +42,7 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
try {
return Mono
.defer(() -> {
if (assertsEnabled) {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
@ -75,7 +77,7 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {

View File

@ -25,14 +25,16 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
private final SubStageGetter<U, US> subStageGetter;
private final SerializerFixedBinaryLength<T, ByteBuf> keySerializer;
private final int keyExtLength;
private final boolean enableAssertionsWhenUsingAssertions;
public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter,
SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
int keyExtLength) {
int keyExtLength, boolean enableAssertionsWhenUsingAssertions) {
this.subStageGetter = subStageGetter;
this.keySerializer = keySerializer;
this.keyExtLength = keyExtLength;
assert keyExtConsistency();
this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
}
@ -54,7 +56,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
try {
return Mono
.defer(() -> {
if (assertsEnabled) {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
@ -90,7 +92,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {

View File

@ -24,9 +24,12 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
}
private final SerializerFixedBinaryLength<T, ByteBuf> keySerializer;
private final boolean enableAssertionsWhenUsingAssertions;
public SubStageGetterSet(SerializerFixedBinaryLength<T, ByteBuf> keySerializer) {
public SubStageGetterSet(SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
boolean enableAssertionsWhenUsingAssertions) {
this.keySerializer = keySerializer;
this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
}
@Override
@ -37,7 +40,7 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
try {
return Mono
.defer(() -> {
if (assertsEnabled) {
if (assertsEnabled && enableAssertionsWhenUsingAssertions) {
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
} else {
return Mono
@ -72,7 +75,7 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
@Override
public boolean needsDebuggingKeyFlux() {
return assertsEnabled;
return assertsEnabled && enableAssertionsWhenUsingAssertions;
}
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {

View File

@ -287,7 +287,9 @@ public class LLLocalDictionary implements LLDictionary {
throw new RocksDBException("Key buffer must be direct");
}
ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
assert keyNioBuffer.isDirect();
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert keyNioBuffer.isDirect();
}
// Create a direct result buffer because RocksDB works only with direct buffers
ByteBuf resultBuf = alloc.directBuffer(LLLocalDictionary.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
try {
@ -297,35 +299,39 @@ public class LLLocalDictionary implements LLDictionary {
do {
// Create the result nio buffer to pass to RocksDB
resultNioBuf = resultBuf.nioBuffer(0, resultBuf.capacity());
assert keyNioBuffer.isDirect();
assert resultNioBuf.isDirect();
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert keyNioBuffer.isDirect();
assert resultNioBuf.isDirect();
}
valueSize = db.get(cfh,
Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS),
keyNioBuffer.position(0),
resultNioBuf
);
if (valueSize != RocksDB.NOT_FOUND) {
// todo: check if position is equal to data that have been read
// todo: check if limit is equal to value size or data that have been read
assert valueSize <= 0 || resultNioBuf.limit() > 0;
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
// todo: check if position is equal to data that have been read
// todo: check if limit is equal to value size or data that have been read
assert valueSize <= 0 || resultNioBuf.limit() > 0;
// If the locking is enabled the data is safe, so since we are appending data to the end,
// we need to check if it has been appended correctly or it it has been overwritten.
// We must not do this check otherwise because if there is no locking the data can be
// overwritten with a smaller value the next time.
if (updateMode == UpdateMode.ALLOW) {
// Check if read data is larger than previously read data.
// If it's smaller or equals it means that RocksDB is overwriting the beginning of the result buffer.
assert resultNioBuf.limit() > assertionReadData;
if (ASSERTIONS_ENABLED) {
assertionReadData = resultNioBuf.limit();
// If the locking is enabled the data is safe, so since we are appending data to the end,
// we need to check if it has been appended correctly or it it has been overwritten.
// We must not do this check otherwise because if there is no locking the data can be
// overwritten with a smaller value the next time.
if (updateMode == UpdateMode.ALLOW) {
// Check if read data is larger than previously read data.
// If it's smaller or equals it means that RocksDB is overwriting the beginning of the result buffer.
assert resultNioBuf.limit() > assertionReadData;
if (ASSERTIONS_ENABLED) {
assertionReadData = resultNioBuf.limit();
}
}
}
// Check if read data is not bigger than the total value size.
// If it's bigger it means that RocksDB is writing the start of the result into the result
// buffer more than once.
assert resultNioBuf.limit() <= valueSize;
// Check if read data is not bigger than the total value size.
// If it's bigger it means that RocksDB is writing the start of the result into the result
// buffer more than once.
assert resultNioBuf.limit() <= valueSize;
}
if (valueSize <= resultNioBuf.limit()) {
// Return the result ready to be read
@ -392,13 +398,17 @@ public class LLLocalDictionary implements LLDictionary {
if (!value.isDirect()) {
throw new RocksDBException("Value buffer must be direct");
}
var keyNioBuffer = LLUtils.toDirect(key);
var keyNioBuffer = LLUtils.toDirect(key);
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert keyNioBuffer.isDirect();
}
var valueNioBuffer = LLUtils.toDirect(value);
var valueNioBuffer = LLUtils.toDirect(value);
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert valueNioBuffer.isDirect();
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer);
}
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer);
} else {
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key), LLUtils.toArray(value));
}
@ -750,9 +760,11 @@ public class LLLocalDictionary implements LLDictionary {
ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice();
try {
newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain());
assert prevDataToSendToUpdater == null
|| prevDataToSendToUpdater.readerIndex() == 0
|| !prevDataToSendToUpdater.isReadable();
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert prevDataToSendToUpdater == null
|| prevDataToSendToUpdater.readerIndex() == 0
|| !prevDataToSendToUpdater.isReadable();
}
} finally {
if (prevDataToSendToUpdater != null) {
prevDataToSendToUpdater.release();
@ -892,7 +904,9 @@ public class LLLocalDictionary implements LLDictionary {
.single()
.map(LLUtils::booleanToResponseByteBuffer)
.doAfterTerminate(() -> {
assert key.refCnt() > 0;
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert key.refCnt() > 0;
}
});
case PREVIOUS_VALUE -> Mono
.fromCallable(() -> {
@ -918,7 +932,9 @@ public class LLLocalDictionary implements LLDictionary {
try {
return dbGet(cfh, null, key.retain(), true);
} finally {
assert key.refCnt() > 0;
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
assert key.refCnt() > 0;
}
}
}
} else {

View File

@ -470,8 +470,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName)));
//noinspection RedundantIfStatement
if (!enableColumnsBug) {
assert Arrays.equals(cfh.getName(), columnName);
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
if (!enableColumnsBug) {
assert Arrays.equals(cfh.getName(), columnName);
}
}
return cfh;
}

View File

@ -91,7 +91,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene",
Integer.MAX_VALUE,
true
false
);
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
private final Scheduler luceneSearcherScheduler = Schedulers.newBoundedElastic(
@ -99,7 +99,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-searcher",
60,
true
false
);
// Scheduler used to get callback values of LuceneStreamSearcher without creating deadlocks
private final Scheduler luceneWriterScheduler = Schedulers.newBoundedElastic(
4,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"lucene-writer",
60,
false
);
private final String luceneIndexName;
@ -353,12 +361,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.addDocument(LLUtils.toDocument(doc));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(Schedulers.boundedElastic());
}).subscribeOn(luceneWriterScheduler);
}
@Override
@ -369,13 +378,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.addDocuments(LLUtils.toDocumentsFromEntries(documentsList));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
})
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(luceneWriterScheduler)
);
}
@ -385,12 +395,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null;
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
}).subscribeOn(Schedulers.boundedElastic());
}).subscribeOn(luceneWriterScheduler);
}
@Override
@ -398,12 +409,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
//noinspection BlockingMethodInNonBlockingContext
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
} finally {
scheduledTasksLifecycle.endScheduledTask();
}
return null;
}).subscribeOn(Schedulers.boundedElastic());
}).subscribeOn(luceneWriterScheduler);
}
@Override
@ -419,6 +431,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
for (Entry<LLTerm, LLDocument> entry : documentsMap.entrySet()) {
LLTerm key = entry.getKey();
LLDocument value = entry.getValue();
//noinspection BlockingMethodInNonBlockingContext
indexWriter.updateDocument(LLUtils.toTerm(key), LLUtils.toDocument(value));
}
return null;
@ -426,7 +439,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
scheduledTasksLifecycle.endScheduledTask();
}
})
.subscribeOn(Schedulers.boundedElastic());
.subscribeOn(luceneWriterScheduler);
}
@Override
@ -634,14 +647,20 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> refresh() {
public Mono<Void> refresh(boolean force) {
return Mono
.<Void>fromCallable(() -> {
scheduledTasksLifecycle.startScheduledTask();
try {
if (scheduledTasksLifecycle.isCancelled()) return null;
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefresh();
if (force) {
if (scheduledTasksLifecycle.isCancelled()) return null;
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefreshBlocking();
} else {
//noinspection BlockingMethodInNonBlockingContext
searcherManager.maybeRefresh();
}
} finally {
scheduledTasksLifecycle.endScheduledTask();
}

View File

@ -269,10 +269,10 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
}
@Override
public Mono<Void> refresh() {
public Mono<Void> refresh(boolean force) {
return Flux
.fromArray(luceneIndices)
.flatMap(LLLocalLuceneIndex::refresh)
.flatMap(index -> index.refresh(force))
.then();
}

View File

@ -252,6 +252,22 @@ public class BinaryLexicographicList implements ByteList {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o instanceof List) {
int i = 0;
for (Object o1 : ((List<?>) o)) {
if (i >= size()) {
return false;
}
if (!(o1 instanceof Byte)) {
return false;
}
if (this.bytes[i] != (Byte) o1) {
return false;
}
i++;
}
return (size() == i);
}
return false;
}
BinaryLexicographicList bytes1 = (BinaryLexicographicList) o;

View File

@ -19,6 +19,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
@ -150,7 +151,7 @@ public class LLMemoryDictionary implements LLDictionary {
public Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) {
try {
return Mono
.fromCallable(() -> mainDb.put(k(key),k(value)))
.fromCallable(() -> mainDb.put(k(key), k(value)))
.transform(result -> this.transformResult(result, resultType))
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.doFirst(key::retain)
@ -169,7 +170,23 @@ public class LLMemoryDictionary implements LLDictionary {
public Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
boolean existsAlmostCertainly) {
return null;
return Mono.fromCallable(() -> {
AtomicReference<ByteBuf> oldRef = new AtomicReference<>(null);
var newValue = mainDb.compute(k(key), (_unused, old) -> {
if (old != null) {
oldRef.set(kk(old));
}
var v = updater.apply(old != null ? kk(old) : null);
try {
return k(v);
} finally {
if (v != null) {
v.release();
}
}
});
return new Delta<>(oldRef.get(), kk(newValue));
});
}
@Override
@ -197,13 +214,13 @@ public class LLMemoryDictionary implements LLDictionary {
Flux<Tuple2<K, ByteBuf>> keys,
boolean existsAlmostCertainly) {
return keys
.handle((key, sink) -> {
.flatMapSequential(key -> {
try {
var v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.getT2()));
ByteList v = snapshots.get(resolveSnapshot(snapshot)).get(k(key.getT2()));
if (v == null) {
sink.complete();
return Flux.empty();
} else {
sink.next(Tuples.of(key.getT1(), key.getT2().retain(), kk(v)));
return Flux.just(Tuples.of(key.getT1(), key.getT2().retain(), kk(v)));
}
} finally {
key.getT2().release();

View File

@ -34,6 +34,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
private final ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>> snapshots = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>> mainDb;
private final ConcurrentHashMap<String, LLMemoryDictionary> singletons = new ConcurrentHashMap<>();
public LLMemoryKeyValueDatabase(ByteBufAllocator allocator, String name, List<Column> columns) {
this.allocator = allocator;
@ -46,8 +47,21 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
}
@Override
public Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
public Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] singletonName, byte[] defaultValue) {
var columnNameString = new String(singletonListColumnName, StandardCharsets.UTF_8);
var dict = singletons.computeIfAbsent(columnNameString, _unused -> new LLMemoryDictionary(allocator,
name,
columnNameString,
UpdateMode.ALLOW,
snapshots,
mainDb
));
return Mono
.fromCallable(() -> new LLMemorySingleton(dict, singletonName)).flatMap(singleton -> singleton
.get(null)
.switchIfEmpty(singleton.set(defaultValue).then(Mono.empty()))
.thenReturn(singleton)
);
}
@Override

View File

@ -0,0 +1,57 @@
package it.cavallium.dbengine.database.memory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
public class LLMemorySingleton implements LLSingleton {
private final LLMemoryDictionary dict;
private final byte[] singletonName;
public LLMemorySingleton(LLMemoryDictionary dict, byte[] singletonName) {
this.dict = dict;
this.singletonName = singletonName;
}
@Override
public String getDatabaseName() {
return dict.getDatabaseName();
}
@Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
var bb = Unpooled.wrappedBuffer(singletonName);
return Mono
.defer(() -> dict.get(snapshot, bb.retain(), false))
.map(b -> {
try {
return LLUtils.toArray(b);
} finally {
b.release();
}
})
.doAfterTerminate(bb::release)
.doFirst(bb::retain);
}
@Override
public Mono<Void> set(byte[] value) {
var bbKey = Unpooled.wrappedBuffer(singletonName);
var bbVal = Unpooled.wrappedBuffer(value);
return Mono
.defer(() -> dict
.put(bbKey.retain(), bbVal.retain(), LLDictionaryResultType.VOID)
)
.doAfterTerminate(bbKey::release)
.doAfterTerminate(bbVal::release)
.doFirst(bbKey::retain)
.doFirst(bbVal::retain)
.then();
}
}

View File

@ -3,9 +3,11 @@ package it.cavallium.dbengine.lucene;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.query.BasicType;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.ValueGetter;
@ -432,6 +434,6 @@ public class LuceneUtils {
}
public static int totalHitsThreshold() {
return 0;
return 1;
}
}

View File

@ -142,8 +142,8 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
return new LuceneSearchResult(result.totalHits.value,
firstPageHits
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
.concatWith(nextHits),
//.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
release
);
})

View File

@ -56,48 +56,58 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
.take(queryParams.limit(), true);
Flux<LLKeyScore> nextHits = Flux.defer(() -> {
if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
return Flux.empty();
}
return Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
if (s.last() != null && s.remainingLimit() > 0) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
s.currentPageLimit(),
s.last(),
LuceneUtils.totalHitsThreshold()
);
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(queryParams.query(), collector);
pageTopDocs = collector.topDocs();
} catch (IOException e) {
sink.error(e);
Flux<LLKeyScore> nextHits;
if (paginationInfo.forceSinglePage() || paginationInfo.totalLimit() - paginationInfo.firstPageLimit() <= 0) {
nextHits = null;
} else {
nextHits = Flux.defer(() -> {
return Flux
.<TopDocs, CurrentPageInfo>generate(
() -> new CurrentPageInfo(LuceneUtils.getLastScoreDoc(firstPageTopDocs.scoreDocs), paginationInfo.totalLimit() - paginationInfo.firstPageLimit(), 1),
(s, sink) -> {
if (s.last() != null && s.remainingLimit() > 0) {
TopDocs pageTopDocs;
try {
TopDocsCollector<ScoreDoc> collector = TopDocsSearcher.getTopDocsCollector(queryParams.sort(),
s.currentPageLimit(),
s.last(),
LuceneUtils.totalHitsThreshold()
);
//noinspection BlockingMethodInNonBlockingContext
indexSearcher.search(queryParams.query(), collector);
pageTopDocs = collector.topDocs();
} catch (IOException e) {
sink.error(e);
return EMPTY_STATUS;
}
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
sink.complete();
return EMPTY_STATUS;
}
var pageLastDoc = LuceneUtils.getLastScoreDoc(pageTopDocs.scoreDocs);
sink.next(pageTopDocs);
return new CurrentPageInfo(pageLastDoc, s.remainingLimit() - s.currentPageLimit(), s.pageIndex() + 1);
} else {
sink.complete();
return EMPTY_STATUS;
}
},
s -> {}
)
.subscribeOn(scheduler)
.concatMap(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler)
);
});
},
s -> {}
)
.subscribeOn(scheduler)
.concatMap(topFieldDoc -> LuceneUtils
.convertHits(topFieldDoc.scoreDocs, IndexSearchers.unsharded(indexSearcher), keyFieldName, scheduler)
);
});
}
return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
Flux<LLKeyScore> combinedFlux;
if (nextHits != null) {
combinedFlux = firstPageMono
.concatWith(nextHits);
} else {
combinedFlux = firstPageMono;
}
return new LuceneSearchResult(firstPageTopDocs.totalHits.value, combinedFlux,
//.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
releaseIndexSearcher
);
})

View File

@ -131,8 +131,8 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
});
return new LuceneSearchResult(result.totalHits.value, firstPageHits
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
.concatWith(nextHits),
//.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
release
);
})

View File

@ -58,7 +58,7 @@ public class DbTestUtils {
.then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true)
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, true)
)),
action,
db -> db.close().then(Mono.fromCallable(() -> {
@ -149,7 +149,8 @@ public class DbTestUtils {
SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes),
key2Bytes,
new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key2Bytes),
Serializer.utf8(DbTestUtils.ALLOCATOR)
Serializer.utf8(DbTestUtils.ALLOCATOR),
true
)
);
}
@ -164,7 +165,8 @@ public class DbTestUtils {
new SubStageGetterHashMap<>(Serializer.utf8(DbTestUtils.ALLOCATOR),
Serializer.utf8(DbTestUtils.ALLOCATOR),
String::hashCode,
SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR)
SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR),
true
)
);
}

View File

@ -75,7 +75,7 @@ public class OldDatabaseTests {
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(3),
4,
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop())
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true)
))
.flatMap(collection -> Flux
.fromIterable(originalSuperKeys)
@ -135,7 +135,7 @@ public class OldDatabaseTests {
.then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap")),
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true)
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, true)
));
}
@ -159,14 +159,14 @@ public class OldDatabaseTests {
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(3),
4,
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop())
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true)
)),
db
.getDictionary("testmap", UpdateMode.DISALLOW)
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(6),
7,
new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop())
new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop(), true)
))
)
.single()