2021-05-02 19:18:15 +02:00
|
|
|
package it.cavallium.dbengine;
|
2021-01-31 12:02:02 +01:00
|
|
|
|
2021-03-14 03:13:19 +01:00
|
|
|
import static it.cavallium.dbengine.client.CompositeDatabasePartLocation.CompositeDatabasePartType.KV_DATABASE;
|
|
|
|
|
2021-04-30 19:15:04 +02:00
|
|
|
import io.netty.buffer.ByteBuf;
|
2021-05-03 21:41:51 +02:00
|
|
|
import io.netty.buffer.PooledByteBufAllocator;
|
2021-04-30 19:15:04 +02:00
|
|
|
import io.netty.buffer.Unpooled;
|
2021-05-02 19:18:15 +02:00
|
|
|
import it.cavallium.dbengine.client.CompositeDatabasePartLocation;
|
|
|
|
import it.cavallium.dbengine.client.CompositeSnapshot;
|
2021-03-14 03:13:19 +01:00
|
|
|
import it.cavallium.dbengine.database.Column;
|
|
|
|
import it.cavallium.dbengine.database.LLKeyValueDatabase;
|
|
|
|
import it.cavallium.dbengine.database.UpdateMode;
|
2021-04-30 19:15:04 +02:00
|
|
|
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
|
2021-03-14 03:13:19 +01:00
|
|
|
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
|
|
|
|
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
|
2021-07-01 21:19:52 +02:00
|
|
|
import it.cavallium.dbengine.client.DatabaseOptions;
|
2021-03-14 03:13:19 +01:00
|
|
|
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
|
|
|
|
import it.cavallium.dbengine.database.serialization.Serializer;
|
|
|
|
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
import java.nio.file.Files;
|
|
|
|
import java.nio.file.Path;
|
|
|
|
import java.util.Comparator;
|
|
|
|
import java.util.LinkedHashSet;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
import java.util.concurrent.CompletionException;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
import reactor.test.StepVerifier;
|
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
2021-04-30 19:15:04 +02:00
|
|
|
public class OldDatabaseTests {
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void testDatabaseAddKeysAndCheckSize() {
|
|
|
|
LinkedHashSet<String> originalKeys = new LinkedHashSet<>(List.of("K1a", "K1b", "K1c"));
|
|
|
|
|
|
|
|
StepVerifier
|
|
|
|
.create(
|
|
|
|
tempDb()
|
|
|
|
.flatMap(db -> db
|
|
|
|
.getDictionary("testmap", UpdateMode.DISALLOW)
|
|
|
|
.map(dictionary -> DatabaseMapDictionary.simple(dictionary,
|
|
|
|
new FixedStringSerializer(3),
|
|
|
|
Serializer.noop()
|
|
|
|
))
|
|
|
|
.flatMap(collection -> Flux
|
|
|
|
.fromIterable(originalKeys)
|
|
|
|
.flatMap(k1 -> collection.putValue(k1, DUMMY_VALUE))
|
|
|
|
.then(collection.leavesCount(null, false))
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.expectNext((long) originalKeys.size())
|
|
|
|
.verifyComplete();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Test
|
|
|
|
public void testDeepDatabaseAddKeysAndCheckSize() {
|
|
|
|
LinkedHashSet<String> originalSuperKeys = new LinkedHashSet<>(List.of("K1a", "K1b", "K1c"));
|
|
|
|
LinkedHashSet<String> originalSubKeys = new LinkedHashSet<>(List.of("K2aa", "K2bb", "K2cc"));
|
|
|
|
|
|
|
|
StepVerifier
|
|
|
|
.create(
|
|
|
|
tempDb()
|
|
|
|
.flatMap(db -> db
|
|
|
|
.getDictionary("testmap", UpdateMode.DISALLOW)
|
|
|
|
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
|
|
|
|
new FixedStringSerializer(3),
|
|
|
|
4,
|
2021-07-18 19:37:24 +02:00
|
|
|
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true)
|
2021-04-30 19:15:04 +02:00
|
|
|
))
|
|
|
|
.flatMap(collection -> Flux
|
|
|
|
.fromIterable(originalSuperKeys)
|
|
|
|
.flatMap(k1 -> collection.at(null, k1))
|
|
|
|
.flatMap(k1at -> Flux
|
|
|
|
.fromIterable(originalSubKeys)
|
|
|
|
.flatMap(k2 -> k1at.putValue(k2, DUMMY_VALUE))
|
|
|
|
)
|
|
|
|
.then(collection.leavesCount(null, false))
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.expectNext((long) originalSuperKeys.size() * originalSubKeys.size())
|
|
|
|
.verifyComplete();
|
|
|
|
}
|
2021-01-31 12:43:28 +01:00
|
|
|
|
2021-03-14 03:13:19 +01:00
|
|
|
@Test
|
|
|
|
public void testDeepDatabaseAddKeysAndConvertToLongerOnes() {
|
|
|
|
LinkedHashSet<String> originalSuperKeys = new LinkedHashSet<>(List.of("K1a", "K1b", "K1c"));
|
|
|
|
LinkedHashSet<String> originalSubKeys = new LinkedHashSet<>(List.of("K2aa", "K2bb", "K2cc"));
|
|
|
|
String newPrefix = "xxx";
|
|
|
|
|
|
|
|
StepVerifier
|
|
|
|
.create(
|
|
|
|
tempDb()
|
|
|
|
.flatMapMany(db -> addKeysAndConvertToLongerOnes(db, originalSuperKeys, originalSubKeys, newPrefix))
|
|
|
|
)
|
|
|
|
.expectNextSequence(originalSuperKeys
|
|
|
|
.stream()
|
|
|
|
.flatMap(superKey -> originalSubKeys
|
|
|
|
.stream()
|
|
|
|
.map(subKey -> Map.entry(newPrefix + superKey, newPrefix + subKey)))
|
|
|
|
.collect(Collectors.toList())
|
|
|
|
)
|
|
|
|
.verifyComplete();
|
|
|
|
}
|
|
|
|
|
|
|
|
public static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
|
2021-04-30 19:15:04 +02:00
|
|
|
var wrkspcPath = Path.of("/tmp/.cache/tempdb-" + DbTestUtils.dbId.incrementAndGet() + "/");
|
2021-03-14 03:13:19 +01:00
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
if (Files.exists(wrkspcPath)) {
|
|
|
|
Files.walk(wrkspcPath)
|
|
|
|
.sorted(Comparator.reverseOrder())
|
|
|
|
.forEach(file -> {
|
|
|
|
try {
|
|
|
|
Files.delete(file);
|
|
|
|
} catch (IOException ex) {
|
|
|
|
throw new CompletionException(ex);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
Files.createDirectories(wrkspcPath);
|
|
|
|
return null;
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
2021-06-27 15:40:56 +02:00
|
|
|
.then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect())
|
|
|
|
.flatMap(conn -> conn.getDatabase("testdb",
|
|
|
|
List.of(Column.dictionary("testmap")),
|
2021-07-18 19:37:24 +02:00
|
|
|
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, true)
|
2021-06-27 15:40:56 +02:00
|
|
|
));
|
2021-03-14 03:13:19 +01:00
|
|
|
}
|
|
|
|
|
2021-04-30 19:15:04 +02:00
|
|
|
private static final ByteBuf DUMMY_VALUE;
|
|
|
|
static {
|
|
|
|
ByteBuf buf = Unpooled.directBuffer(2, 2);
|
|
|
|
buf.writeByte(0x01);
|
|
|
|
buf.writeByte(0x03);
|
|
|
|
DUMMY_VALUE = buf;
|
|
|
|
}
|
2021-03-14 03:13:19 +01:00
|
|
|
|
|
|
|
private Flux<Entry<String, String>> addKeysAndConvertToLongerOnes(LLKeyValueDatabase db,
|
|
|
|
LinkedHashSet<String> originalSuperKeys,
|
|
|
|
LinkedHashSet<String> originalSubKeys,
|
|
|
|
String newPrefix) {
|
|
|
|
return Flux
|
|
|
|
.defer(() -> Mono
|
|
|
|
.zip(
|
|
|
|
db
|
|
|
|
.getDictionary("testmap", UpdateMode.DISALLOW)
|
|
|
|
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
|
|
|
|
new FixedStringSerializer(3),
|
|
|
|
4,
|
2021-07-18 19:37:24 +02:00
|
|
|
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true)
|
2021-03-14 03:13:19 +01:00
|
|
|
)),
|
|
|
|
db
|
|
|
|
.getDictionary("testmap", UpdateMode.DISALLOW)
|
|
|
|
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
|
|
|
|
new FixedStringSerializer(6),
|
|
|
|
7,
|
2021-07-18 19:37:24 +02:00
|
|
|
new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop(), true)
|
2021-03-14 03:13:19 +01:00
|
|
|
))
|
|
|
|
)
|
|
|
|
.single()
|
|
|
|
.flatMap(tuple -> {
|
|
|
|
var db1 = tuple.getT1();
|
|
|
|
return Flux
|
|
|
|
.fromIterable(originalSuperKeys)
|
|
|
|
.flatMapSequential(superKey -> db1.at(null, superKey))
|
|
|
|
.flatMapSequential(at -> Flux
|
|
|
|
.fromIterable(originalSubKeys)
|
|
|
|
.flatMapSequential(subKey -> at.at(null, subKey))
|
|
|
|
.flatMapSequential(at2 -> at2.set(DUMMY_VALUE))
|
|
|
|
)
|
|
|
|
.then(db
|
|
|
|
.takeSnapshot()
|
|
|
|
.map(snapshot -> new CompositeSnapshot(Map.of(CompositeDatabasePartLocation.of(KV_DATABASE,
|
|
|
|
db.getDatabaseName()), snapshot)))
|
|
|
|
)
|
|
|
|
.map(snapshot -> Tuples.of(tuple.getT1(), tuple.getT2(), snapshot))
|
|
|
|
.single();
|
|
|
|
})
|
|
|
|
.single()
|
|
|
|
.flatMap(tuple -> tuple.getT1().clear().thenReturn(tuple))
|
|
|
|
.flatMap(tuple -> tuple
|
|
|
|
.getT1()
|
|
|
|
.leavesCount(null, false)
|
|
|
|
.flatMap(count -> count == 0 ? Mono.just(tuple) : Mono.error(new IllegalStateException(
|
|
|
|
"Failed to clear map. Remaining elements after clear: " + count)))
|
|
|
|
)
|
|
|
|
.flatMapMany(tuple -> {
|
|
|
|
var oldDb = tuple.getT1();
|
|
|
|
var newDb = tuple.getT2();
|
|
|
|
var snapshot = tuple.getT3();
|
|
|
|
|
|
|
|
return oldDb
|
|
|
|
.getAllStages(snapshot)
|
|
|
|
.flatMapSequential(parentEntry -> Mono
|
|
|
|
.fromCallable(() -> newPrefix + parentEntry.getKey())
|
|
|
|
.flatMapMany(newId1 -> parentEntry.getValue()
|
|
|
|
.getAllValues(snapshot)
|
|
|
|
.flatMapSequential(entry -> Mono
|
|
|
|
.fromCallable(() -> newPrefix + entry.getKey())
|
|
|
|
.flatMap(newId2 -> newDb
|
|
|
|
.at(null, newId1)
|
|
|
|
.flatMap(newStage -> newStage.putValue(newId2, entry.getValue()))
|
|
|
|
.thenReturn(Map.entry(newId1, newId2))
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.concatWith(db
|
|
|
|
.releaseSnapshot(snapshot.getSnapshot(db))
|
|
|
|
.then(oldDb.close())
|
|
|
|
.then(newDb.close())
|
|
|
|
.then(Mono.empty())
|
|
|
|
);
|
|
|
|
})
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2021-04-30 19:15:04 +02:00
|
|
|
private static class FixedStringSerializer implements SerializerFixedBinaryLength<String, ByteBuf> {
|
2021-03-14 03:13:19 +01:00
|
|
|
|
|
|
|
private final int size;
|
|
|
|
|
|
|
|
public FixedStringSerializer(int i) {
|
|
|
|
this.size = i;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public int getSerializedBinaryLength() {
|
|
|
|
return size;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-04-30 19:15:04 +02:00
|
|
|
public @NotNull String deserialize(ByteBuf serialized) {
|
|
|
|
try {
|
|
|
|
return serialized.toString(StandardCharsets.US_ASCII);
|
|
|
|
} finally {
|
|
|
|
serialized.release();
|
|
|
|
}
|
2021-03-14 03:13:19 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2021-04-30 19:15:04 +02:00
|
|
|
public ByteBuf serialize(@NotNull String deserialized) {
|
|
|
|
var serialized = deserialized.getBytes(StandardCharsets.US_ASCII);
|
|
|
|
var serializedBuf = Unpooled.directBuffer(serialized.length, serialized.length);
|
|
|
|
serializedBuf.writeBytes(serialized);
|
|
|
|
assert serializedBuf.isDirect();
|
|
|
|
return serializedBuf;
|
2021-03-14 03:13:19 +01:00
|
|
|
}
|
|
|
|
}
|
2021-01-31 12:43:28 +01:00
|
|
|
}
|