Add sets
This commit is contained in:
parent
d86c92cb61
commit
059da90ef4
@ -0,0 +1,42 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public class DatabaseEmpty {
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static final Nothing NOTHING = new Nothing();
|
||||
private static final byte[] NOTHING_BYTES = new byte[0];
|
||||
private static final Serializer<Nothing, byte[]> NOTHING_SERIALIZER = new Serializer<>() {
|
||||
@Override
|
||||
public @NotNull Nothing deserialize(byte @NotNull [] serialized) {
|
||||
return NOTHING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte @NotNull [] serialize(@NotNull Nothing deserialized) {
|
||||
return NOTHING_BYTES;
|
||||
}
|
||||
};
|
||||
private static final SubStageGetter<Nothing, DatabaseStageEntry<Nothing>> NOTHING_SUB_STAGE_GETTER = new SubStageGetterSingle<>(NOTHING_SERIALIZER);
|
||||
|
||||
private DatabaseEmpty() {
|
||||
}
|
||||
|
||||
public static DatabaseStageEntry<Nothing> create(LLDictionary dictionary, byte[] key) {
|
||||
return new DatabaseSingle<>(dictionary, key, NOTHING_SERIALIZER);
|
||||
}
|
||||
|
||||
public static SubStageGetter<Nothing, DatabaseStageEntry<Nothing>> createSubStageGetter() {
|
||||
return NOTHING_SUB_STAGE_GETTER;
|
||||
}
|
||||
|
||||
public static final class Nothing {
|
||||
|
||||
private Nothing() {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class DatabaseSetDictionary<T> extends DatabaseMapDictionaryDeep<T, Nothing, DatabaseStageEntry<Nothing>> {
|
||||
|
||||
protected DatabaseSetDictionary(LLDictionary dictionary,
|
||||
byte[] prefixKey,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer) {
|
||||
super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.createSubStageGetter(), 0);
|
||||
}
|
||||
|
||||
public static <T> DatabaseSetDictionary<T> simple(LLDictionary dictionary,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySerializer) {
|
||||
return new DatabaseSetDictionary<>(dictionary, EMPTY_BYTES, keySerializer);
|
||||
}
|
||||
|
||||
public static <T> DatabaseSetDictionary<T> tail(LLDictionary dictionary,
|
||||
byte[] prefixKey,
|
||||
SerializerFixedBinaryLength<T, byte[]> keySuffixSerializer) {
|
||||
return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer);
|
||||
}
|
||||
|
||||
public Mono<Set<T>> getKeySet(@Nullable CompositeSnapshot snapshot) {
|
||||
return get(snapshot).map(Map::keySet);
|
||||
}
|
||||
|
||||
public Mono<Set<T>> setAndGetPreviousKeySet(Set<T> value) {
|
||||
var hm = new HashMap<T, Nothing>();
|
||||
for (T t : value) {
|
||||
hm.put(t, DatabaseEmpty.NOTHING);
|
||||
}
|
||||
return setAndGetPrevious(hm).map(Map::keySet);
|
||||
}
|
||||
|
||||
public Mono<Set<T>> clearAndGetPreviousKeySet() {
|
||||
return clearAndGetPrevious().map(Map::keySet);
|
||||
}
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
package it.cavallium.dbengine.database.collections;
|
||||
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, DatabaseSetDictionary<T>> {
|
||||
|
||||
private static final boolean assertsEnabled;
|
||||
static {
|
||||
boolean assertsEnabledTmp = false;
|
||||
//noinspection AssertWithSideEffects
|
||||
assert assertsEnabledTmp = true;
|
||||
//noinspection ConstantConditions
|
||||
assertsEnabled = assertsEnabledTmp;
|
||||
}
|
||||
|
||||
private final SerializerFixedBinaryLength<T, byte[]> keySerializer;
|
||||
|
||||
public SubStageGetterSet(SerializerFixedBinaryLength<T, byte[]> keySerializer) {
|
||||
this.keySerializer = keySerializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
byte[] prefixKey,
|
||||
Flux<byte[]> keyFlux) {
|
||||
Mono<DatabaseSetDictionary<T>> result = Mono.just(DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer));
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey, keyFlux).then(result);
|
||||
} else {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsKeyFlux() {
|
||||
return assertsEnabled;
|
||||
}
|
||||
|
||||
private Mono<Void> checkKeyFluxConsistency(byte[] prefixKey, Flux<byte[]> keyFlux) {
|
||||
return keyFlux.doOnNext(key -> {
|
||||
assert key.length == prefixKey.length + getKeyBinaryLength();
|
||||
}).then();
|
||||
}
|
||||
|
||||
public int getKeyBinaryLength() {
|
||||
return keySerializer.getSerializedBinaryLength();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user