Update DatabaseMapDictionary.java, DatabaseMapDictionaryDeep.java, and 7 more files...

Andrea Cavalli 2021-02-01 11:00:27 +01:00
parent f537303b90
commit ffc7e3c35a
9 changed files with 84 additions and 35 deletions

DatabaseMapDictionary.java

@@ -21,7 +21,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private final Serializer<U, ByteBuf> valueSerializer;
protected DatabaseMapDictionary(LLDictionary dictionary, byte[] prefixKey, SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer, Serializer<U, ByteBuf> valueSerializer) {
protected DatabaseMapDictionary(LLDictionary dictionary,
byte[] prefixKey,
SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer,
Serializer<U, ByteBuf> valueSerializer) {
super(dictionary, new SubStageGetterSingle<>(valueSerializer), keySuffixSerializer, prefixKey, 0);
this.valueSerializer = valueSerializer;
}
@@ -50,7 +53,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRange(resolveSnapshot(snapshot), range)
.collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new);
.collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey())),
entry -> deserialize(entry.getValue()),
HashMap::new);
}
@Override
@@ -62,7 +68,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.map(entry -> Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))),
true
)
.collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new);
.collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey())),
entry -> deserialize(entry.getValue()),
HashMap::new);
}
private Entry<byte[], byte[]> stripPrefix(Entry<byte[], byte[]> entry) {
@@ -74,7 +83,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Map<T, U>> clearAndGetPrevious() {
return dictionary
.setRange(range, Flux.empty(), true)
.collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new);
.collectMap(
entry -> deserializeSuffix(stripPrefix(entry.getKey())),
entry -> deserialize(entry.getValue()),
HashMap::new);
}
@Override
@@ -84,7 +96,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer));
return Mono
.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes()))
.map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer));
}
@Override
@@ -141,7 +155,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
return dictionary
.putMulti(entries
.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), false)
.map(entry -> Map
.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), false)
.then();
}
@@ -150,7 +165,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), range)
.map(keySuffix -> Map.entry(deserializeSuffix(stripPrefix(keySuffix)),
new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(stripPrefix(keySuffix)), Serializer.noopBytes()),
new DatabaseSingleMapped<>(
new DatabaseSingle<>(dictionary,
toKey(stripPrefix(keySuffix)),
Serializer.noopBytes()),
valueSerializer
)
));
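Note: the hunks in this file repeatedly rewrap single-line collectMap calls into the three-argument form. For reference, a minimal standalone sketch of Reactor's Flux.collectMap with an explicit HashMap supplier; the String/Integer entries are placeholders standing in for the deserialized range entries, not the project's types.

import java.util.HashMap;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectMapSketch {
	public static void main(String[] args) {
		// Placeholder entries standing in for the dictionary's range entries.
		Mono<Map<String, Integer>> asMap = Flux
				.just(Map.entry("a", "1"), Map.entry("b", "2"))
				.collectMap(
						Map.Entry::getKey,                           // key mapper, like deserializeSuffix(stripPrefix(...))
						entry -> Integer.parseInt(entry.getValue()), // value mapper, like deserialize(...)
						HashMap::new);                               // explicit map supplier, as in the diff
		asMap.subscribe(System.out::println); // prints {a=1, b=2}
	}
}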

DatabaseMapDictionaryDeep.java

@@ -35,7 +35,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
}
protected static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) {
protected static byte[] fillKeySuffixAndExt(byte[] prefixKey,
int prefixLength,
int suffixLength,
int extLength,
byte fillValue) {
assert prefixKey.length == prefixLength;
assert suffixLength > 0;
assert extLength > 0;
@@ -44,11 +48,19 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return result;
}
protected static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
protected static byte[] firstKey(byte[] prefixKey,
byte[] suffixKey,
int prefixLength,
int suffixLength,
int extLength) {
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
}
protected static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) {
protected static byte[] lastKey(byte[] prefixKey,
byte[] suffixKey,
int prefixLength,
int suffixLength,
int extLength) {
return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
}
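Note: fillKeyExt itself is not part of this diff. Below is a minimal sketch of what it plausibly does, inferred from the assertions and the 0x00 / 0xFF fill values used by firstKey and lastKey above: copy prefix and suffix, then pad the ext bytes with a constant so the two results bound the key range for that prefix and suffix. The body is an assumption, not the repository's implementation.

import java.util.Arrays;

public class FillKeyExtSketch {

	static byte[] fillKeyExt(byte[] prefixKey, byte[] suffixKey,
			int prefixLength, int suffixLength, int extLength, byte fillValue) {
		assert prefixKey.length == prefixLength;
		assert suffixKey.length == suffixLength;
		// Copy the prefix, append the suffix, then pad the ext bytes with fillValue.
		byte[] result = Arrays.copyOf(prefixKey, prefixLength + suffixLength + extLength);
		System.arraycopy(suffixKey, 0, result, prefixLength, suffixLength);
		Arrays.fill(result, prefixLength + suffixLength, result.length, fillValue);
		return result;
	}

	public static void main(String[] args) {
		byte[] prefix = {1};
		byte[] suffix = {2, 3};
		// 0x00 padding gives the lowest key in the range, 0xFF the highest.
		System.out.println(Arrays.toString(fillKeyExt(prefix, suffix, 1, 2, 2, (byte) 0x00))); // [1, 2, 3, 0, 0]
		System.out.println(Arrays.toString(fillKeyExt(prefix, suffix, 1, 2, 2, (byte) 0xFF))); // [1, 2, 3, -1, -1]
	}
}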
@@ -72,20 +84,23 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
* Use DatabaseMapDictionaryRange.simple instead
*/
@Deprecated
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(
LLDictionary dictionary,
SubStageGetterSingle<U> subStageGetter,
SerializerFixedBinaryLength<T, ByteBuf> keySerializer) {
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0);
}
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(LLDictionary dictionary,
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(
LLDictionary dictionary,
SubStageGetter<U, US> subStageGetter,
SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
int keyExtLength) {
return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength);
}
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(LLDictionary dictionary,
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(
LLDictionary dictionary,
SubStageGetter<U, US> subStageGetter,
SerializerFixedBinaryLength<T, ByteBuf> keySuffixSerializer,
byte[] prefixKey,

SubStageGetter.java

@@ -8,5 +8,8 @@ import reactor.core.publisher.Mono;
public interface SubStageGetter<U, US extends DatabaseStage<U>> {
Mono<US> subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux<byte[]> keyFlux);
Mono<US> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
byte[] prefixKey,
Flux<byte[]> keyFlux);
}
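Note: for context, a hypothetical implementation in the spirit of SubStageGetterSingle (whose code is not in this diff). It ignores keyFlux and wraps the prefix key in the same DatabaseSingle / DatabaseSingleMapped composition used by DatabaseMapDictionary.at(...) above; the generic parameters, annotation package, and project-local imports are assumptions.

import io.netty.buffer.ByteBuf;
import org.jetbrains.annotations.Nullable; // assumed annotation package
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// LLDictionary, CompositeSnapshot, DatabaseStageEntry, DatabaseSingle,
// DatabaseSingleMapped and Serializer are imported from the surrounding project.

public class SubStageGetterSingleSketch<U> implements SubStageGetter<U, DatabaseStageEntry<U>> {

	private final Serializer<U, ByteBuf> valueSerializer;

	public SubStageGetterSingleSketch(Serializer<U, ByteBuf> valueSerializer) {
		this.valueSerializer = valueSerializer;
	}

	@Override
	public Mono<DatabaseStageEntry<U>> subStage(LLDictionary dictionary,
			@Nullable CompositeSnapshot snapshot,
			byte[] prefixKey,
			Flux<byte[]> keyFlux) {
		// Single-value stage: the prefix key is already the full key, so keyFlux is unused.
		return Mono
				.just(new DatabaseSingle<>(dictionary, prefixKey, Serializer.noopBytes()))
				.map(single -> new DatabaseSingleMapped<>(single, valueSerializer));
	}
}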

SubStageGetterMap.java

@@ -13,7 +13,8 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
private final SerializerFixedBinaryLength<T, ByteBuf> keySerializer;
private final Serializer<U, ByteBuf> valueSerializer;
public SubStageGetterMap(SerializerFixedBinaryLength<T, ByteBuf> keySerializer, Serializer<U, ByteBuf> valueSerializer) {
public SubStageGetterMap(SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
Serializer<U, ByteBuf> valueSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
}

CappedWriteBatch.java

@@ -19,10 +19,13 @@ public class CappedWriteBatch implements WriteBatchInterface, AutoCloseable {
private final WriteBatch writeBatch;
/**
*
* @param cap The limit of operations
*/
public CappedWriteBatch(RocksDB db, int cap, int reservedWriteBatchSize, long maxWriteBatchSize, WriteOptions writeOptions) {
public CappedWriteBatch(RocksDB db,
int cap,
int reservedWriteBatchSize,
long maxWriteBatchSize,
WriteOptions writeOptions) {
this.db = db;
this.cap = cap;
this.writeOptions = writeOptions;
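Note: a hedged usage sketch of the constructor above. The numeric arguments are placeholders, not the project's CAPPED_WRITE_BATCH_CAP / RESERVED_WRITE_BATCH_SIZE / MAX_WRITE_BATCH_SIZE constants, and flushing once the cap of operations is reached is assumed from the javadoc rather than shown in this hunk.

import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;

public class CappedWriteBatchUsageSketch {
	static void writeOne(RocksDB db, WriteOptions writeOptions, byte[] key, byte[] value) throws Exception {
		// try-with-resources works because CappedWriteBatch is AutoCloseable.
		try (var batch = new CappedWriteBatch(db, 10_000, 2 * 1024 * 1024, 16L * 1024 * 1024, writeOptions)) {
			batch.put(key, value); // inherited from RocksDB's WriteBatchInterface
		}
	}
}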

LLLocalDictionary.java

@@ -452,7 +452,12 @@ public class LLLocalDictionary implements LLDictionary {
return clear().thenMany(Flux.empty());
} else {
return Mono
.fromCallable(() -> new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS))
.fromCallable(() -> new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
))
.subscribeOn(dbScheduler)
.flatMapMany(writeBatch -> Mono
.fromCallable(() -> {
@@ -478,22 +483,18 @@ public class LLLocalDictionary implements LLDictionary {
.subscribeOn(dbScheduler)
.thenMany(entries)
.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
.concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> {
.concatWith(Mono.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.writeToDbAndClose();
writeBatch.close();
}
return null;
})
.subscribeOn(dbScheduler)
)
}).subscribeOn(dbScheduler))
.doFinally(signalType -> {
synchronized (writeBatch) {
writeBatch.close();
}
})
)
}))
.onErrorMap(IOException::new);
}
}
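Note: the reshuffled operator chain above follows a common Reactor shape: stream the entries, append one final blocking flush via concatWith, and release the resource in doFinally regardless of how the stream ends. A standalone sketch of that shape with placeholder names; a StringBuilder stands in for the write batch and boundedElastic for the DB scheduler.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlushPatternSketch {
	public static void main(String[] args) {
		var resource = new StringBuilder(); // stands in for the write batch
		Flux
				.just("a", "b", "c")
				.doOnNext(resource::append)
				.concatWith(Mono.<String>fromCallable(() -> {
					synchronized (resource) {
						System.out.println("flush: " + resource); // stands in for writeToDbAndClose()
					}
					return null; // fromCallable treats null as "complete empty", mirroring the diff
				}).subscribeOn(Schedulers.boundedElastic()))
				.doFinally(signalType -> {
					synchronized (resource) {
						resource.setLength(0); // stands in for writeBatch.close()
					}
				})
				.blockLast();
	}
}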

LLLocalKeyValueDatabase.java

@@ -174,7 +174,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
options.setWalTtlSeconds(30); // flush wal after 30 seconds
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup
options.setWalRecoveryMode(crashIfWalError ? WALRecoveryMode.AbsoluteConsistency
options.setWalRecoveryMode(crashIfWalError
? WALRecoveryMode.AbsoluteConsistency
: WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted. Default: TolerateCorruptedTailRecords
options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
options.setPreserveDeletes(false);
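Note: for reference, the WAL-related settings touched above collected into one standalone snippet; crashIfWalError here is a placeholder flag mirroring the field used in the diff.

import org.rocksdb.Options;
import org.rocksdb.WALRecoveryMode;

public class WalOptionsSketch {
	static Options walOptions(boolean crashIfWalError) {
		return new Options()
				.setWalTtlSeconds(30)                // keep archived WAL files for 30 seconds
				.setAvoidFlushDuringShutdown(false)  // flush all WALs during shutdown
				.setAvoidFlushDuringRecovery(false)  // flush all WALs during startup
				.setWalRecoveryMode(crashIfWalError
						? WALRecoveryMode.AbsoluteConsistency    // fail hard on corrupted WALs
						: WALRecoveryMode.PointInTimeRecovery);  // recover up to the last consistent point
	}
}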

LLLocalLuceneIndex.java

@@ -314,7 +314,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
.flatMap(query -> Mono
.fromCallable(() -> {
One<Long> totalHitsCountSink = Sinks.one();
Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
Many<LLKeyScore> topKeysSink = Sinks
.many()
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(1000));
streamSearcher.search(indexSearcher,
query,
@@ -366,7 +369,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
ScoreMode luceneScoreMode = tuple.getT3();
One<Long> totalHitsCountSink = Sinks.one();
Many<LLKeyScore> topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));
Many<LLKeyScore> topKeysSink = Sinks
.many()
.unicast()
.onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE));
streamSearcher.search(indexSearcher,
query,
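Note: both hunks in this file reshape the same Sinks builder chain. A standalone sketch of that pattern, a one-shot sink for the total hit count plus a unicast many-sink backed by a bounded queue for the top keys; the String element type, the emitted values, and the queue size are placeholders.

import java.util.concurrent.ArrayBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class SinksSketch {
	public static void main(String[] args) {
		Sinks.One<Long> totalHitsCountSink = Sinks.one();
		Sinks.Many<String> topKeysSink = Sinks
				.many()
				.unicast()
				.onBackpressureBuffer(new ArrayBlockingQueue<>(1000));

		// Producer side (in the diff this is driven by streamSearcher.search(...)).
		topKeysSink.tryEmitNext("doc-1");
		topKeysSink.tryEmitNext("doc-2");
		topKeysSink.tryEmitComplete();
		totalHitsCountSink.tryEmitValue(2L);

		// Consumer side: expose the sinks as a Flux and a Mono.
		Flux<String> topKeys = topKeysSink.asFlux();
		Mono<Long> totalHits = totalHitsCountSink.asMono();
		topKeys.subscribe(System.out::println);
		totalHits.subscribe(count -> System.out.println("total hits: " + count));
	}
}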

LLLocalMultiLuceneIndex.java

@@ -196,7 +196,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(() -> {
CopyOnWriteArrayList<LLSnapshot> instancesSnapshots = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]);
CopyOnWriteArrayList<LLSnapshot> instancesSnapshots
= new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]);
var snapIndex = nextSnapshotNumber.getAndIncrement();
ParallelUtils.parallelizeIO((IOBiConsumer<LLLuceneIndex, Integer> s) -> {
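Note: the reflowed declaration above relies on a small CopyOnWriteArrayList trick: constructing it from a pre-sized array yields one null slot per index, so parallel workers can later fill their own slot with set(index, value) instead of racing on add. A standalone sketch, with String standing in for LLSnapshot and a plain loop standing in for ParallelUtils.parallelizeIO.

import java.util.concurrent.CopyOnWriteArrayList;

public class PresizedCowListSketch {
	public static void main(String[] args) {
		int shards = 3; // stands in for luceneIndices.length
		CopyOnWriteArrayList<String> snapshots = new CopyOnWriteArrayList<>(new String[shards]);

		// Each (possibly parallel) worker fills its own slot by index.
		for (int i = 0; i < shards; i++) {
			snapshots.set(i, "snapshot-" + i);
		}
		System.out.println(snapshots); // [snapshot-0, snapshot-1, snapshot-2]
	}
}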