diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 8ba96b3..83c17e0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -21,7 +21,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep valueSerializer; - protected DatabaseMapDictionary(LLDictionary dictionary, byte[] prefixKey, SerializerFixedBinaryLength keySuffixSerializer, Serializer valueSerializer) { + protected DatabaseMapDictionary(LLDictionary dictionary, + byte[] prefixKey, + SerializerFixedBinaryLength keySuffixSerializer, + Serializer valueSerializer) { super(dictionary, new SubStageGetterSingle<>(valueSerializer), keySuffixSerializer, prefixKey, 0); this.valueSerializer = valueSerializer; } @@ -50,7 +53,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> get(@Nullable CompositeSnapshot snapshot) { return dictionary .getRange(resolveSnapshot(snapshot), range) - .collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new); + .collectMap( + entry -> deserializeSuffix(stripPrefix(entry.getKey())), + entry -> deserialize(entry.getValue()), + HashMap::new); } @Override @@ -62,7 +68,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Map.entry(serializeSuffix(entry.getKey()), serialize(entry.getValue()))), true ) - .collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new); + .collectMap( + entry -> deserializeSuffix(stripPrefix(entry.getKey())), + entry -> deserialize(entry.getValue()), + HashMap::new); } private Entry stripPrefix(Entry entry) { @@ -74,7 +83,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> clearAndGetPrevious() { return dictionary .setRange(range, Flux.empty(), true) - .collectMap(entry -> deserializeSuffix(stripPrefix(entry.getKey())), entry -> deserialize(entry.getValue()), HashMap::new); + .collectMap( + entry -> deserializeSuffix(stripPrefix(entry.getKey())), + entry -> deserialize(entry.getValue()), + HashMap::new); } @Override @@ -84,7 +96,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { - return Mono.just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())).map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)); + return Mono + .just(new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noopBytes())) + .map(entry -> new DatabaseSingleMapped<>(entry, valueSerializer)); } @Override @@ -141,7 +155,8 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putMulti(Flux> entries) { return dictionary .putMulti(entries - .map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), false) + .map(entry -> Map + .entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), false) .then(); } @@ -150,7 +165,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep Map.entry(deserializeSuffix(stripPrefix(keySuffix)), - new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, toKey(stripPrefix(keySuffix)), Serializer.noopBytes()), + new DatabaseSingleMapped<>( + new DatabaseSingle<>(dictionary, + toKey(stripPrefix(keySuffix)), + Serializer.noopBytes()), valueSerializer ) )); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 43a6b4c..eb373ad 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -35,7 +35,11 @@ public class DatabaseMapDictionaryDeep> implem return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); } - protected static byte[] fillKeySuffixAndExt(byte[] prefixKey, int prefixLength, int suffixLength, int extLength, byte fillValue) { + protected static byte[] fillKeySuffixAndExt(byte[] prefixKey, + int prefixLength, + int suffixLength, + int extLength, + byte fillValue) { assert prefixKey.length == prefixLength; assert suffixLength > 0; assert extLength > 0; @@ -44,11 +48,19 @@ public class DatabaseMapDictionaryDeep> implem return result; } - protected static byte[] firstKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { + protected static byte[] firstKey(byte[] prefixKey, + byte[] suffixKey, + int prefixLength, + int suffixLength, + int extLength) { return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00); } - protected static byte[] lastKey(byte[] prefixKey, byte[] suffixKey, int prefixLength, int suffixLength, int extLength) { + protected static byte[] lastKey(byte[] prefixKey, + byte[] suffixKey, + int prefixLength, + int suffixLength, + int extLength) { return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF); } @@ -72,20 +84,23 @@ public class DatabaseMapDictionaryDeep> implem * Use DatabaseMapDictionaryRange.simple instead */ @Deprecated - public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary, + public static DatabaseMapDictionaryDeep> simple( + LLDictionary dictionary, SubStageGetterSingle subStageGetter, SerializerFixedBinaryLength keySerializer) { return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, 0); } - public static > DatabaseMapDictionaryDeep deepTail(LLDictionary dictionary, + public static > DatabaseMapDictionaryDeep deepTail( + LLDictionary dictionary, SubStageGetter subStageGetter, SerializerFixedBinaryLength keySerializer, int keyExtLength) { return new DatabaseMapDictionaryDeep<>(dictionary, subStageGetter, keySerializer, EMPTY_BYTES, keyExtLength); } - public static > DatabaseMapDictionaryDeep deepIntermediate(LLDictionary dictionary, + public static > DatabaseMapDictionaryDeep deepIntermediate( + LLDictionary dictionary, SubStageGetter subStageGetter, SerializerFixedBinaryLength keySuffixSerializer, byte[] prefixKey, diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java index 49eda7e..5b47ef8 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java @@ -8,5 +8,8 @@ import reactor.core.publisher.Mono; public interface SubStageGetter> { - Mono subStage(LLDictionary dictionary, @Nullable CompositeSnapshot snapshot, byte[] prefixKey, Flux keyFlux); + Mono subStage(LLDictionary dictionary, + @Nullable CompositeSnapshot snapshot, + byte[] prefixKey, + Flux keyFlux); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index 43245cd..de36ec9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -13,7 +13,8 @@ public class SubStageGetterMap implements SubStageGetter, Databa private final SerializerFixedBinaryLength keySerializer; private final Serializer valueSerializer; - public SubStageGetterMap(SerializerFixedBinaryLength keySerializer, Serializer valueSerializer) { + public SubStageGetterMap(SerializerFixedBinaryLength keySerializer, + Serializer valueSerializer) { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java index a3b93ec..563eff9 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java @@ -19,10 +19,13 @@ public class CappedWriteBatch implements WriteBatchInterface, AutoCloseable { private final WriteBatch writeBatch; /** - * * @param cap The limit of operations */ - public CappedWriteBatch(RocksDB db, int cap, int reservedWriteBatchSize, long maxWriteBatchSize, WriteOptions writeOptions) { + public CappedWriteBatch(RocksDB db, + int cap, + int reservedWriteBatchSize, + long maxWriteBatchSize, + WriteOptions writeOptions) { this.db = db; this.cap = cap; this.writeOptions = writeOptions; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 793ead1..0e3ea3b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -452,7 +452,12 @@ public class LLLocalDictionary implements LLDictionary { return clear().thenMany(Flux.empty()); } else { return Mono - .fromCallable(() -> new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) + .fromCallable(() -> new CappedWriteBatch(db, + CAPPED_WRITE_BATCH_CAP, + RESERVED_WRITE_BATCH_SIZE, + MAX_WRITE_BATCH_SIZE, + BATCH_WRITE_OPTIONS + )) .subscribeOn(dbScheduler) .flatMapMany(writeBatch -> Mono .fromCallable(() -> { @@ -478,22 +483,18 @@ public class LLLocalDictionary implements LLDictionary { .subscribeOn(dbScheduler) .thenMany(entries) .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)) - .concatWith(Mono - .>fromCallable(() -> { - synchronized (writeBatch) { - writeBatch.writeToDbAndClose(); - writeBatch.close(); - } - return null; - }) - .subscribeOn(dbScheduler) - ) + .concatWith(Mono.>fromCallable(() -> { + synchronized (writeBatch) { + writeBatch.writeToDbAndClose(); + writeBatch.close(); + } + return null; + }).subscribeOn(dbScheduler)) .doFinally(signalType -> { synchronized (writeBatch) { writeBatch.close(); } - }) - ) + })) .onErrorMap(IOException::new); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 5375000..96be045 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -174,8 +174,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setWalTtlSeconds(30); // flush wal after 30 seconds options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup - options.setWalRecoveryMode(crashIfWalError ? WALRecoveryMode.AbsoluteConsistency - : WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted. Default: TolerateCorruptedTailRecords + options.setWalRecoveryMode(crashIfWalError + ? WALRecoveryMode.AbsoluteConsistency + : WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds options.setPreserveDeletes(false); options.setKeepLogFileNum(10); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 6ddf833..9bfb58c 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -314,7 +314,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .flatMap(query -> Mono .fromCallable(() -> { One totalHitsCountSink = Sinks.one(); - Many topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(1000)); + Many topKeysSink = Sinks + .many() + .unicast() + .onBackpressureBuffer(new ArrayBlockingQueue<>(1000)); streamSearcher.search(indexSearcher, query, @@ -366,7 +369,10 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { ScoreMode luceneScoreMode = tuple.getT3(); One totalHitsCountSink = Sinks.one(); - Many topKeysSink = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE)); + Many topKeysSink = Sinks + .many() + .unicast() + .onBackpressureBuffer(new ArrayBlockingQueue<>(PagedStreamSearcher.MAX_ITEMS_PER_PAGE)); streamSearcher.search(indexSearcher, query, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index 0f222c5..87f20b2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -196,7 +196,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex { public Mono takeSnapshot() { return Mono .fromCallable(() -> { - CopyOnWriteArrayList instancesSnapshots = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]); + CopyOnWriteArrayList instancesSnapshots + = new CopyOnWriteArrayList<>(new LLSnapshot[luceneIndices.length]); var snapIndex = nextSnapshotNumber.getAndIncrement(); ParallelUtils.parallelizeIO((IOBiConsumer s) -> {