Fix map tests

This commit is contained in:
Andrea Cavalli 2021-09-02 17:15:40 +02:00
parent ff7823656e
commit 3091b81d34
27 changed files with 277 additions and 190 deletions

View File

@ -14,4 +14,11 @@ public record DatabaseOptions(Map<String, String> extraFlags,
boolean allowMemoryMapping, boolean allowMemoryMapping,
boolean allowNettyDirect, boolean allowNettyDirect,
boolean useNettyDirect, boolean useNettyDirect,
int maxOpenFiles) {} int maxOpenFiles) {
public DatabaseOptions {
if (useNettyDirect && !allowNettyDirect) {
throw new IllegalArgumentException("If allowNettyDirect is false, you must also set useNettyDirect to false");
}
}
}

View File

@ -6,21 +6,22 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
public class MappedSerializer<A, B> implements Serializer<B, Send<Buffer>> { public class MappedSerializer<A, B> implements Serializer<B> {
private final Serializer<A, Send<Buffer>> serializer; private final Serializer<A> serializer;
private final Mapper<A, B> keyMapper; private final Mapper<A, B> keyMapper;
public MappedSerializer(Serializer<A, Send<Buffer>> serializer, public MappedSerializer(Serializer<A> serializer,
Mapper<A, B> keyMapper) { Mapper<A, B> keyMapper) {
this.serializer = serializer; this.serializer = serializer;
this.keyMapper = keyMapper; this.keyMapper = keyMapper;
} }
@Override @Override
public @NotNull B deserialize(@NotNull Send<Buffer> serialized) throws SerializationException { public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) { try (serialized) {
return keyMapper.map(serializer.deserialize(serialized)); var deserialized = serializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
} }
} }

View File

@ -6,21 +6,22 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryLength<B, Send<Buffer>> { public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryLength<B> {
private final SerializerFixedBinaryLength<A, Send<Buffer>> fixedLengthSerializer; private final SerializerFixedBinaryLength<A> fixedLengthSerializer;
private final Mapper<A, B> keyMapper; private final Mapper<A, B> keyMapper;
public MappedSerializerFixedLength(SerializerFixedBinaryLength<A, Send<Buffer>> fixedLengthSerializer, public MappedSerializerFixedLength(SerializerFixedBinaryLength<A> fixedLengthSerializer,
Mapper<A, B> keyMapper) { Mapper<A, B> keyMapper) {
this.fixedLengthSerializer = fixedLengthSerializer; this.fixedLengthSerializer = fixedLengthSerializer;
this.keyMapper = keyMapper; this.keyMapper = keyMapper;
} }
@Override @Override
public @NotNull B deserialize(@NotNull Send<Buffer> serialized) throws SerializationException { public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) { try (serialized) {
return keyMapper.map(fixedLengthSerializer.deserialize(serialized)); var deserialized = fixedLengthSerializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
} }
} }

View File

@ -117,10 +117,10 @@ public class LLDelta extends ResourceSupport<LLDelta, LLDelta> {
@Override @Override
public void drop(LLDelta obj) { public void drop(LLDelta obj) {
if (obj.previous != null) { if (obj.previous != null && obj.previous.isAccessible()) {
obj.previous.close(); obj.previous.close();
} }
if (obj.current != null) { if (obj.current != null && obj.current.isAccessible()) {
obj.current.close(); obj.current.close();
} }
delegate.drop(obj); delegate.drop(obj);

View File

@ -16,9 +16,9 @@ public class LLEntry extends ResourceSupport<LLEntry, LLEntry> {
private LLEntry(Send<Buffer> key, Send<Buffer> value, Drop<LLEntry> drop) { private LLEntry(Send<Buffer> key, Send<Buffer> value, Drop<LLEntry> drop) {
super(new LLEntry.CloseOnDrop(drop)); super(new LLEntry.CloseOnDrop(drop));
assert isAllAccessible();
this.key = key.receive().makeReadOnly(); this.key = key.receive().makeReadOnly();
this.value = value.receive().makeReadOnly(); this.value = value.receive().makeReadOnly();
assert isAllAccessible();
} }
private boolean isAllAccessible() { private boolean isAllAccessible() {
@ -119,8 +119,12 @@ public class LLEntry extends ResourceSupport<LLEntry, LLEntry> {
@Override @Override
public void drop(LLEntry obj) { public void drop(LLEntry obj) {
if (obj.key.isAccessible()) {
obj.key.close(); obj.key.close();
}
if (obj.value.isAccessible()) {
obj.value.close(); obj.value.close();
}
delegate.drop(obj); delegate.drop(obj);
} }
} }

View File

@ -199,7 +199,7 @@ public class LLUtils {
b.append('['); b.append('[');
int i = 0; int i = 0;
while(true) { while (true) {
b.append(key.getByte(startIndex + i)); b.append(key.getByte(startIndex + i));
if (i == iLimit) { if (i == iLimit) {
b.append(""); b.append("");
@ -237,10 +237,9 @@ public class LLUtils {
/** /**
* Returns {@code true} if and only if the two specified buffers are * Returns {@code true} if and only if the two specified buffers are identical to each other for {@code length} bytes
* identical to each other for {@code length} bytes starting at {@code aStartIndex} * starting at {@code aStartIndex} index for the {@code a} buffer and {@code bStartIndex} index for the {@code b}
* index for the {@code a} buffer and {@code bStartIndex} index for the {@code b} buffer. * buffer. A more compact way to express this is:
* A more compact way to express this is:
* <p> * <p>
* {@code a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length]} * {@code a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length]}
*/ */
@ -273,8 +272,9 @@ public class LLUtils {
} }
public static int hashCode(Buffer buf) { public static int hashCode(Buffer buf) {
if (buf == null) if (buf == null) {
return 0; return 0;
}
int result = 1; int result = 1;
var cur = buf.openCursor(); var cur = buf.openCursor();
@ -287,7 +287,6 @@ public class LLUtils {
} }
/** /**
*
* @return null if size is equal to RocksDB.NOT_FOUND * @return null if size is equal to RocksDB.NOT_FOUND
*/ */
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
@ -366,12 +365,21 @@ public class LLUtils {
@NotNull @NotNull
public static ByteBuffer obtainDirect(Buffer buffer) { public static ByteBuffer obtainDirect(Buffer buffer) {
assert buffer.isAccessible(); if (!PlatformDependent.hasUnsafe()) {
if (buffer.readableBytes() == 0) { throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers",
return EMPTY_BYTE_BUFFER; PlatformDependent.getUnsafeUnavailabilityCause()
);
} }
if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:"
+ " DirectBufferNoCleanerConstructor is not available");
}
assert buffer.isAccessible();
long nativeAddress; long nativeAddress;
if ((nativeAddress = buffer.nativeAddress()) == 0) { if ((nativeAddress = buffer.nativeAddress()) == 0) {
if (buffer.capacity() == 0) {
return EMPTY_BYTE_BUFFER;
}
throw new IllegalStateException("Buffer is not direct"); throw new IllegalStateException("Buffer is not direct");
} }
return PlatformDependent.directBuffer(nativeAddress, buffer.capacity()); return PlatformDependent.directBuffer(nativeAddress, buffer.capacity());
@ -395,7 +403,7 @@ public class LLUtils {
} }
public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer) { public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer) {
try (var composite = buffer.receive().compact()) { try (var composite = buffer.receive()) {
return composite.send(); return composite.send();
} }
} }
@ -403,18 +411,21 @@ public class LLUtils {
public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer1, Send<Buffer> buffer2) { public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer1, Send<Buffer> buffer2) {
try (buffer1) { try (buffer1) {
try (buffer2) { try (buffer2) {
try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2).compact()) { try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2)) {
return composite.send(); return composite.send();
} }
} }
} }
} }
public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer1, Send<Buffer> buffer2, Send<Buffer> buffer3) { public static Send<Buffer> compositeBuffer(BufferAllocator alloc,
Send<Buffer> buffer1,
Send<Buffer> buffer2,
Send<Buffer> buffer3) {
try (buffer1) { try (buffer1) {
try (buffer2) { try (buffer2) {
try (buffer3) { try (buffer3) {
try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3).compact()) { try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3)) {
return composite.send(); return composite.send();
} }
} }
@ -431,7 +442,7 @@ public class LLUtils {
case 2 -> compositeBuffer(alloc, buffers[0], buffers[1]); case 2 -> compositeBuffer(alloc, buffers[0], buffers[1]);
case 3 -> compositeBuffer(alloc, buffers[0], buffers[1], buffers[2]); case 3 -> compositeBuffer(alloc, buffers[0], buffers[1], buffers[2]);
default -> { default -> {
try (var composite = CompositeBuffer.compose(alloc, buffers).compact()) { try (var composite = CompositeBuffer.compose(alloc, buffers)) {
yield composite.send(); yield composite.send();
} }
} }
@ -568,8 +579,7 @@ public class LLUtils {
} }
public static <T> Mono<T> handleDiscard(Mono<T> mono) { public static <T> Mono<T> handleDiscard(Mono<T> mono) {
return mono return mono.doOnDiscard(Object.class, obj -> {
.doOnDiscard(Object.class, obj -> {
if (obj instanceof SafeCloseable o) { if (obj instanceof SafeCloseable o) {
discardRefCounted(o); discardRefCounted(o);
} else if (obj instanceof Entry o) { } else if (obj instanceof Entry o) {
@ -609,8 +619,7 @@ public class LLUtils {
} }
public static <T> Flux<T> handleDiscard(Flux<T> mono) { public static <T> Flux<T> handleDiscard(Flux<T> mono) {
return mono return mono.doOnDiscard(Object.class, obj -> {
.doOnDiscard(Object.class, obj -> {
if (obj instanceof SafeCloseable o) { if (obj instanceof SafeCloseable o) {
discardRefCounted(o); discardRefCounted(o);
} else if (obj instanceof Entry o) { } else if (obj instanceof Entry o) {

View File

@ -5,6 +5,7 @@ import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Send; import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -12,13 +13,14 @@ public class DatabaseEmpty {
@SuppressWarnings({"unused", "InstantiationOfUtilityClass"}) @SuppressWarnings({"unused", "InstantiationOfUtilityClass"})
public static final Nothing NOTHING = new Nothing(); public static final Nothing NOTHING = new Nothing();
public static final DeserializationResult<Nothing> NOTHING_RESULT = new DeserializationResult<>(NOTHING, 0);
public static Serializer<Nothing, Send<Buffer>> nothingSerializer(BufferAllocator bufferAllocator) { public static Serializer<Nothing> nothingSerializer(BufferAllocator bufferAllocator) {
return new Serializer<>() { return new Serializer<>() {
@Override @Override
public @NotNull Nothing deserialize(@NotNull Send<Buffer> serialized) { public @NotNull DeserializationResult<Nothing> deserialize(@NotNull Send<Buffer> serialized) {
try (serialized) { try (serialized) {
return NOTHING; return NOTHING_RESULT;
} }
} }

View File

@ -5,6 +5,7 @@ import io.netty.buffer.api.Send;
import io.netty.buffer.api.internal.ResourceSupport; import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.util.ReferenceCounted; import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.NoMapper;
import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionary;
@ -40,27 +41,27 @@ import reactor.util.function.Tuples;
*/ */
public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> { public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> {
private final Serializer<U, Send<Buffer>> valueSerializer; private final Serializer<U> valueSerializer;
protected DatabaseMapDictionary(LLDictionary dictionary, protected DatabaseMapDictionary(LLDictionary dictionary,
Send<Buffer> prefixKey, Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer, SerializerFixedBinaryLength<T> keySuffixSerializer,
Serializer<U, Send<Buffer>> valueSerializer) { Serializer<U> valueSerializer) {
// Do not retain or release or use the prefixKey here // Do not retain or release or use the prefixKey here
super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0); super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0);
this.valueSerializer = valueSerializer; this.valueSerializer = valueSerializer;
} }
public static <T, U> DatabaseMapDictionary<T, U> simple(LLDictionary dictionary, public static <T, U> DatabaseMapDictionary<T, U> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer, SerializerFixedBinaryLength<T> keySerializer,
Serializer<U, Send<Buffer>> valueSerializer) { Serializer<U> valueSerializer) {
return new DatabaseMapDictionary<>(dictionary, dictionary.getAllocator().allocate(0).send(), keySerializer, valueSerializer); return new DatabaseMapDictionary<>(dictionary, dictionary.getAllocator().allocate(0).send(), keySerializer, valueSerializer);
} }
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary, public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
Send<Buffer> prefixKey, Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer, SerializerFixedBinaryLength<T> keySuffixSerializer,
Serializer<U, Send<Buffer>> valueSerializer) { Serializer<U> valueSerializer) {
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer); return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer);
} }
@ -73,7 +74,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) { private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) {
try { try {
sink.next(valueSerializer.deserialize(value)); sink.next(valueSerializer.deserialize(value).deserializedData());
} catch (SerializationException ex) { } catch (SerializationException ex) {
sink.error(ex); sink.error(ex);
} }
@ -86,7 +87,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.<Entry<T, U>>handle((entrySend, sink) -> { .<Entry<T, U>>handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) { try (var entry = entrySend.receive()) {
var key = deserializeSuffix(stripPrefix(entry.getKey())); var key = deserializeSuffix(stripPrefix(entry.getKey()));
var value = valueSerializer.deserialize(entry.getValue()); var value = valueSerializer.deserialize(entry.getValue()).deserializedData();
sink.next(Map.entry(key, value)); sink.next(Map.entry(key, value));
} catch (SerializationException ex) { } catch (SerializationException ex) {
sink.error(ex); sink.error(ex);
@ -133,18 +134,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override @Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono return Mono.fromCallable(() ->
.fromCallable(() -> new DatabaseSingleMapped<>( new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer));
new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop())
, valueSerializer)
);
} }
@Override @Override
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return dictionary return dictionary
.get(resolveSnapshot(snapshot), Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))), existsAlmostCertainly) .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))), existsAlmostCertainly)
.handle(this::deserializeValue); .handle((value, sink) -> deserializeValue(value, sink));
} }
@Override @Override
@ -170,7 +168,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly) .update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle(this::deserializeValue); .handle((value, sink) -> deserializeValue(value, sink));
} }
@Override @Override
@ -180,7 +178,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly) .updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize)); .transform(mono -> LLUtils.mapLLDelta(mono,
serialized -> valueSerializer.deserialize(serialized).deserializedData()
));
} }
public SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> getSerializedUpdater( public SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> getSerializedUpdater(
@ -191,7 +191,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
if (oldSerialized == null) { if (oldSerialized == null) {
result = updater.apply(null); result = updater.apply(null);
} else { } else {
result = updater.apply(valueSerializer.deserialize(oldSerialized)); result = updater.apply(valueSerializer.deserialize(oldSerialized).deserializedData());
} }
if (result == null) { if (result == null) {
return null; return null;
@ -210,7 +210,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
if (oldSerialized == null) { if (oldSerialized == null) {
result = updater.apply(null, extra); result = updater.apply(null, extra);
} else { } else {
result = updater.apply(valueSerializer.deserialize(oldSerialized), extra); result = updater.apply(valueSerializer.deserialize(oldSerialized).deserializedData(), extra);
} }
if (result == null) { if (result == null) {
return null; return null;
@ -229,7 +229,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.put(keyMono, .put(keyMono,
valueMono, valueMono,
LLDictionaryResultType.PREVIOUS_VALUE) LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue); .handle((value1, sink) -> deserializeValue(value1, sink));
} }
@Override @Override
@ -238,7 +238,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value)); var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue) .handle((Send<Buffer> value1, SynchronousSink<U> sink) -> deserializeValue(value1, sink))
.map(oldValue -> !Objects.equals(oldValue, value)) .map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null); .defaultIfEmpty(value != null);
} }
@ -257,7 +257,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))); var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue); .handle((value, sink) -> deserializeValue(value, sink));
} }
@Override @Override
@ -285,7 +285,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
Optional<U> valueOpt; Optional<U> valueOpt;
if (entry.getT3().isPresent()) { if (entry.getT3().isPresent()) {
try (var buf = entry.getT3().get()) { try (var buf = entry.getT3().get()) {
valueOpt = Optional.of(valueSerializer.deserialize(buf)); valueOpt = Optional.of(valueSerializer.deserialize(buf).deserializedData());
} }
} else { } else {
valueOpt = Optional.empty(); valueOpt = Optional.empty();
@ -363,10 +363,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try (key) { try (key) {
try (var keySuffixWithExt = stripPrefix(key).receive()) { try (var keySuffixWithExt = stripPrefix(key).receive()) {
sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.copy().send()), sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.copy().send()),
new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary, new DatabaseSingle<>(dictionary,
toKey(keySuffixWithExt.send()), toKey(keySuffixWithExt.send()),
Serializer.noop() valueSerializer
), valueSerializer) )
)); ));
} }
} catch (SerializationException ex) { } catch (SerializationException ex) {
@ -382,7 +382,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.<Entry<T, U>>handle((serializedEntryToReceive, sink) -> { .<Entry<T, U>>handle((serializedEntryToReceive, sink) -> {
try (var serializedEntry = serializedEntryToReceive.receive()) { try (var serializedEntry = serializedEntryToReceive.receive()) {
sink.next(Map.entry(deserializeSuffix(stripPrefix(serializedEntry.getKey())), sink.next(Map.entry(deserializeSuffix(stripPrefix(serializedEntry.getKey())),
valueSerializer.deserialize(serializedEntry.getValue()))); valueSerializer.deserialize(serializedEntry.getValue()).deserializedData()));
} catch (SerializationException e) { } catch (SerializationException e) {
sink.error(e); sink.error(e);
} }

View File

@ -32,7 +32,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
protected final LLDictionary dictionary; protected final LLDictionary dictionary;
private final BufferAllocator alloc; private final BufferAllocator alloc;
protected final SubStageGetter<U, US> subStageGetter; protected final SubStageGetter<U, US> subStageGetter;
protected final SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer; protected final SerializerFixedBinaryLength<T> keySuffixSerializer;
protected final Buffer keyPrefix; protected final Buffer keyPrefix;
protected final int keyPrefixLength; protected final int keyPrefixLength;
protected final int keySuffixLength; protected final int keySuffixLength;
@ -191,14 +191,14 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
*/ */
@Deprecated @Deprecated
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary, public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer, SerializerFixedBinaryLength<T> keySerializer,
SubStageGetterSingle<U> subStageGetter) { SubStageGetterSingle<U> subStageGetter) {
return new DatabaseMapDictionaryDeep<>(dictionary, dictionary.getAllocator().allocate(0).send(), return new DatabaseMapDictionaryDeep<>(dictionary, dictionary.getAllocator().allocate(0).send(),
keySerializer, subStageGetter, 0); keySerializer, subStageGetter, 0);
} }
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(LLDictionary dictionary, public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(LLDictionary dictionary,
SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer, SerializerFixedBinaryLength<T> keySerializer,
int keyExtLength, int keyExtLength,
SubStageGetter<U, US> subStageGetter) { SubStageGetter<U, US> subStageGetter) {
return new DatabaseMapDictionaryDeep<>(dictionary, return new DatabaseMapDictionaryDeep<>(dictionary,
@ -211,7 +211,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(LLDictionary dictionary, public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepIntermediate(LLDictionary dictionary,
Send<Buffer> prefixKey, Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer, SerializerFixedBinaryLength<T> keySuffixSerializer,
SubStageGetter<U, US> subStageGetter, SubStageGetter<U, US> subStageGetter,
int keyExtLength) { int keyExtLength) {
return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength); return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength);
@ -219,7 +219,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
protected DatabaseMapDictionaryDeep(LLDictionary dictionary, protected DatabaseMapDictionaryDeep(LLDictionary dictionary,
Send<Buffer> prefixKeyToReceive, Send<Buffer> prefixKeyToReceive,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer, SerializerFixedBinaryLength<T> keySuffixSerializer,
SubStageGetter<U, US> subStageGetter, SubStageGetter<U, US> subStageGetter,
int keyExtLength) { int keyExtLength) {
try (var prefixKey = prefixKeyToReceive.receive()) { try (var prefixKey = prefixKeyToReceive.receive()) {
@ -436,7 +436,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
.doOnNext(Send::close) .doOnNext(Send::close)
.then(); .then();
} else { } else {
return dictionary.setRange(LLUtils.lazyRetainRange(range), Flux.empty()); return dictionary.setRange(rangeMono, Flux.empty());
} }
}); });
} }
@ -447,7 +447,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert suffixKeyConsistency(keySuffix.readableBytes()); assert suffixKeyConsistency(keySuffix.readableBytes());
var result = keySuffixSerializer.deserialize(keySuffix.send()); var result = keySuffixSerializer.deserialize(keySuffix.send());
assert keyPrefix.isAccessible(); assert keyPrefix.isAccessible();
return result; return result.deserializedData();
} }
} }

View File

@ -17,12 +17,12 @@ public class DatabaseSetDictionary<T> extends DatabaseMapDictionary<T, Nothing>
protected DatabaseSetDictionary(LLDictionary dictionary, protected DatabaseSetDictionary(LLDictionary dictionary,
Send<Buffer> prefixKey, Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer) { SerializerFixedBinaryLength<T> keySuffixSerializer) {
super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator())); super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()));
} }
public static <T> DatabaseSetDictionary<T> simple(LLDictionary dictionary, public static <T> DatabaseSetDictionary<T> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer) { SerializerFixedBinaryLength<T> keySerializer) {
try (var buf = dictionary.getAllocator().allocate(0)) { try (var buf = dictionary.getAllocator().allocate(0)) {
return new DatabaseSetDictionary<>(dictionary, buf.send(), keySerializer); return new DatabaseSetDictionary<>(dictionary, buf.send(), keySerializer);
} }
@ -30,7 +30,7 @@ public class DatabaseSetDictionary<T> extends DatabaseMapDictionary<T, Nothing>
public static <T> DatabaseSetDictionary<T> tail(LLDictionary dictionary, public static <T> DatabaseSetDictionary<T> tail(LLDictionary dictionary,
Send<Buffer> prefixKey, Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer) { SerializerFixedBinaryLength<T> keySuffixSerializer) {
return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer); return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer);
} }

View File

@ -27,9 +27,9 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
private final LLDictionary dictionary; private final LLDictionary dictionary;
private final Buffer key; private final Buffer key;
private final Mono<Send<Buffer>> keyMono; private final Mono<Send<Buffer>> keyMono;
private final Serializer<U, Send<Buffer>> serializer; private final Serializer<U> serializer;
public DatabaseSingle(LLDictionary dictionary, Send<Buffer> key, Serializer<U, Send<Buffer>> serializer) { public DatabaseSingle(LLDictionary dictionary, Send<Buffer> key, Serializer<U> serializer) {
try (key) { try (key) {
this.dictionary = dictionary; this.dictionary = dictionary;
this.key = key.receive(); this.key = key.receive();
@ -48,7 +48,7 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) { private void deserializeValue(Send<Buffer> value, SynchronousSink<U> sink) {
try { try {
sink.next(serializer.deserialize(value)); sink.next(serializer.deserialize(value).deserializedData());
} catch (SerializationException ex) { } catch (SerializationException ex) {
sink.error(ex); sink.error(ex);
} }
@ -74,7 +74,8 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return dictionary return dictionary
.update(keyMono, (oldValueSer) -> { .update(keyMono, (oldValueSer) -> {
var result = updater.apply(oldValueSer == null ? null : serializer.deserialize(oldValueSer)); var result = updater.apply(
oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData());
if (result == null) { if (result == null) {
return null; return null;
} else { } else {
@ -89,13 +90,16 @@ public class DatabaseSingle<U> implements DatabaseStageEntry<U> {
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return dictionary return dictionary
.updateAndGetDelta(keyMono, (oldValueSer) -> { .updateAndGetDelta(keyMono, (oldValueSer) -> {
var result = updater.apply(oldValueSer == null ? null : serializer.deserialize(oldValueSer)); var result = updater.apply(
oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData());
if (result == null) { if (result == null) {
return null; return null;
} else { } else {
return serializer.serialize(result); return serializer.serialize(result);
} }
}, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize)); }, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono,
serialized -> serializer.deserialize(serialized).deserializedData()
));
} }
@Override @Override

View File

@ -1,15 +1,13 @@
package it.cavallium.dbengine.database.collections; package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.Mapper;
import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -19,16 +17,16 @@ import reactor.core.publisher.SynchronousSink;
public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> { public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
private final DatabaseStageEntry<B> serializedSingle; private final DatabaseStageEntry<B> serializedSingle;
private final Serializer<A, B> serializer; private final Mapper<A, B> mapper;
public DatabaseSingleMapped(DatabaseStageEntry<B> serializedSingle, Serializer<A, B> serializer) { public DatabaseSingleMapped(DatabaseStageEntry<B> serializedSingle, Mapper<A, B> mapper) {
this.serializedSingle = serializedSingle; this.serializedSingle = serializedSingle;
this.serializer = serializer; this.mapper = mapper;
} }
private void deserializeSink(B value, SynchronousSink<A> sink) { private void deserializeSink(B value, SynchronousSink<A> sink) {
try { try {
sink.next(this.deserialize(value)); sink.next(this.unMap(value));
} catch (SerializationException ex) { } catch (SerializationException ex) {
sink.error(ex); sink.error(ex);
} }
@ -47,14 +45,14 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
@Override @Override
public Mono<Void> set(A value) { public Mono<Void> set(A value) {
return Mono return Mono
.fromCallable(() -> serialize(value)) .fromCallable(() -> map(value))
.flatMap(serializedSingle::set); .flatMap(serializedSingle::set);
} }
@Override @Override
public Mono<A> setAndGetPrevious(A value) { public Mono<A> setAndGetPrevious(A value) {
return Mono return Mono
.fromCallable(() -> serialize(value)) .fromCallable(() -> map(value))
.flatMap(serializedSingle::setAndGetPrevious) .flatMap(serializedSingle::setAndGetPrevious)
.handle(this::deserializeSink); .handle(this::deserializeSink);
} }
@ -62,7 +60,7 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
@Override @Override
public Mono<Boolean> setAndGetChanged(A value) { public Mono<Boolean> setAndGetChanged(A value) {
return Mono return Mono
.fromCallable(() -> serialize(value)) .fromCallable(() -> map(value))
.flatMap(serializedSingle::setAndGetChanged) .flatMap(serializedSingle::setAndGetChanged)
.single(); .single();
} }
@ -72,11 +70,11 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
UpdateReturnMode updateReturnMode, UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return serializedSingle.update(oldValue -> { return serializedSingle.update(oldValue -> {
var result = updater.apply(oldValue == null ? null : this.deserialize(oldValue)); var result = updater.apply(oldValue == null ? null : this.unMap(oldValue));
if (result == null) { if (result == null) {
return null; return null;
} else { } else {
return this.serialize(result); return this.map(result);
} }
}, updateReturnMode, existsAlmostCertainly).handle(this::deserializeSink); }, updateReturnMode, existsAlmostCertainly).handle(this::deserializeSink);
} }
@ -85,13 +83,13 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
public Mono<Delta<A>> updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater, public Mono<Delta<A>> updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return serializedSingle.updateAndGetDelta(oldValue -> { return serializedSingle.updateAndGetDelta(oldValue -> {
var result = updater.apply(oldValue == null ? null : this.deserialize(oldValue)); var result = updater.apply(oldValue == null ? null : this.unMap(oldValue));
if (result == null) { if (result == null) {
return null; return null;
} else { } else {
return this.serialize(result); return this.map(result);
} }
}, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize)); }, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::unMap));
} }
@Override @Override
@ -140,12 +138,12 @@ public class DatabaseSingleMapped<A, B> implements DatabaseStageEntry<A> {
} }
//todo: temporary wrapper. convert the whole class to buffers //todo: temporary wrapper. convert the whole class to buffers
private A deserialize(B bytes) throws SerializationException { private A unMap(B bytes) throws SerializationException {
return serializer.deserialize(bytes); return mapper.unmap(bytes);
} }
//todo: temporary wrapper. convert the whole class to buffers //todo: temporary wrapper. convert the whole class to buffers
private B serialize(A bytes) throws SerializationException { private B map(A bytes) throws SerializationException {
return serializer.serialize(bytes); return mapper.map(bytes);
} }
} }

View File

@ -17,15 +17,15 @@ import reactor.core.publisher.Mono;
public class SubStageGetterHashMap<T, U, TH> implements public class SubStageGetterHashMap<T, U, TH> implements
SubStageGetter<Map<T, U>, DatabaseMapDictionaryHashed<T, U, TH>> { SubStageGetter<Map<T, U>, DatabaseMapDictionaryHashed<T, U, TH>> {
private final Serializer<T, Send<Buffer>> keySerializer; private final Serializer<T> keySerializer;
private final Serializer<U, Send<Buffer>> valueSerializer; private final Serializer<U> valueSerializer;
private final Function<T, TH> keyHashFunction; private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer; private final SerializerFixedBinaryLength<TH> keyHashSerializer;
public SubStageGetterHashMap(Serializer<T, Send<Buffer>> keySerializer, public SubStageGetterHashMap(Serializer<T> keySerializer,
Serializer<U, Send<Buffer>> valueSerializer, Serializer<U> valueSerializer,
Function<T, TH> keyHashFunction, Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) { SerializerFixedBinaryLength<TH> keyHashSerializer) {
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer; this.valueSerializer = valueSerializer;
this.keyHashFunction = keyHashFunction; this.keyHashFunction = keyHashFunction;

View File

@ -17,13 +17,13 @@ import reactor.core.publisher.Mono;
public class SubStageGetterHashSet<T, TH> implements public class SubStageGetterHashSet<T, TH> implements
SubStageGetter<Map<T, Nothing>, DatabaseSetDictionaryHashed<T, TH>> { SubStageGetter<Map<T, Nothing>, DatabaseSetDictionaryHashed<T, TH>> {
private final Serializer<T, Send<Buffer>> keySerializer; private final Serializer<T> keySerializer;
private final Function<T, TH> keyHashFunction; private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer; private final SerializerFixedBinaryLength<TH> keyHashSerializer;
public SubStageGetterHashSet(Serializer<T, Send<Buffer>> keySerializer, public SubStageGetterHashSet(Serializer<T> keySerializer,
Function<T, TH> keyHashFunction, Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) { SerializerFixedBinaryLength<TH> keyHashSerializer) {
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
this.keyHashFunction = keyHashFunction; this.keyHashFunction = keyHashFunction;
this.keyHashSerializer = keyHashSerializer; this.keyHashSerializer = keyHashSerializer;

View File

@ -15,11 +15,11 @@ import reactor.core.publisher.Mono;
public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, DatabaseMapDictionary<T, U>> { public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, DatabaseMapDictionary<T, U>> {
private final SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer; private final SerializerFixedBinaryLength<T> keySerializer;
private final Serializer<U, Send<Buffer>> valueSerializer; private final Serializer<U> valueSerializer;
public SubStageGetterMap(SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer, public SubStageGetterMap(SerializerFixedBinaryLength<T> keySerializer,
Serializer<U, Send<Buffer>> valueSerializer) { Serializer<U> valueSerializer) {
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer; this.valueSerializer = valueSerializer;
} }

View File

@ -16,11 +16,11 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
SubStageGetter<Map<T, U>, DatabaseMapDictionaryDeep<T, U, US>> { SubStageGetter<Map<T, U>, DatabaseMapDictionaryDeep<T, U, US>> {
private final SubStageGetter<U, US> subStageGetter; private final SubStageGetter<U, US> subStageGetter;
private final SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer; private final SerializerFixedBinaryLength<T> keySerializer;
private final int keyExtLength; private final int keyExtLength;
public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter, public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter,
SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer, SerializerFixedBinaryLength<T> keySerializer,
int keyExtLength) { int keyExtLength) {
this.subStageGetter = subStageGetter; this.subStageGetter = subStageGetter;
this.keySerializer = keySerializer; this.keySerializer = keySerializer;

View File

@ -15,9 +15,9 @@ import reactor.core.publisher.Mono;
public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, DatabaseSetDictionary<T>> { public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, DatabaseSetDictionary<T>> {
private final SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer; private final SerializerFixedBinaryLength<T> keySerializer;
public SubStageGetterSet(SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer) { public SubStageGetterSet(SerializerFixedBinaryLength<T> keySerializer) {
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
} }

View File

@ -29,7 +29,11 @@ class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>> {
throws SerializationException { throws SerializationException {
try (var serialized = serializedToReceive.receive()) { try (var serialized = serializedToReceive.receive()) {
DeserializationResult<X> deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send()); DeserializationResult<X> deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send());
DeserializationResult<Y> deserializedValue = valueSerializer.deserialize(serialized.send()); DeserializationResult<Y> deserializedValue = valueSerializer.deserialize(serialized
.copy(serialized.readerOffset() + deserializedKey.bytesRead(),
serialized.readableBytes() - deserializedKey.bytesRead()
)
.send());
return new DeserializationResult<>(Map.entry(deserializedKey.deserializedData(), return new DeserializationResult<>(Map.entry(deserializedKey.deserializedData(),
deserializedValue.deserializedData()), deserializedKey.bytesRead() + deserializedValue.bytesRead()); deserializedValue.deserializedData()), deserializedKey.bytesRead() + deserializedValue.bytesRead());
} }

View File

@ -27,13 +27,17 @@ class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
@Override @Override
public @NotNull DeserializationResult<ObjectArraySet<X>> deserialize(@NotNull Send<Buffer> serializedToReceive) throws SerializationException { public @NotNull DeserializationResult<ObjectArraySet<X>> deserialize(@NotNull Send<Buffer> serializedToReceive) throws SerializationException {
try (var serialized = serializedToReceive.receive()) { try (var serialized = serializedToReceive.receive()) {
int initialReaderOffset = serialized.readerOffset();
int entriesLength = serialized.readInt(); int entriesLength = serialized.readInt();
ArrayList<X> deserializedElements = new ArrayList<>(entriesLength); ArrayList<X> deserializedElements = new ArrayList<>(entriesLength);
for (int i = 0; i < entriesLength; i++) { for (int i = 0; i < entriesLength; i++) {
X entry = entrySerializer.deserialize(serialized.copy().send()); var deserializationResult = entrySerializer.deserialize(serialized
deserializedElements.add(entry); .copy(serialized.readerOffset(), serialized.readableBytes())
.send());
deserializedElements.add(deserializationResult.deserializedData());
serialized.readerOffset(serialized.readerOffset() + deserializationResult.bytesRead());
} }
return new ObjectArraySet<>(deserializedElements); return new DeserializationResult<>(new ObjectArraySet<>(deserializedElements), serialized.readerOffset() - initialReaderOffset);
} }
} }
@ -43,6 +47,7 @@ class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
output.writeInt(deserialized.size()); output.writeInt(deserialized.size());
for (X entry : deserialized) { for (X entry : deserialized) {
try (Buffer serialized = entrySerializer.serialize(entry).receive()) { try (Buffer serialized = entrySerializer.serialize(entry).receive()) {
output.ensureWritable(serialized.readableBytes());
output.writeBytes(serialized); output.writeBytes(serialized);
} }
} }

View File

@ -376,7 +376,8 @@ public class LLLocalDictionary implements LLDictionary {
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer()); PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
} }
} else { } else {
try (ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS)) { ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS);
try {
byte[] keyArray = LLUtils.toArray(key); byte[] keyArray = LLUtils.toArray(key);
requireNonNull(keyArray); requireNonNull(keyArray);
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>(); Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
@ -394,6 +395,10 @@ public class LLLocalDictionary implements LLDictionary {
} else { } else {
return null; return null;
} }
} finally {
if (!(validReadOptions instanceof UnreleasableReadOptions)) {
validReadOptions.close();
}
} }
} }
} }
@ -440,9 +445,9 @@ public class LLLocalDictionary implements LLDictionary {
rangeSend -> { rangeSend -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
if (range.isSingle()) { if (range.isSingle()) {
return this.containsKey(snapshot, LLUtils.lazyRetain((range.getSingle().receive()))); return this.containsKey(snapshot, Mono.fromCallable(range::getSingle));
} else { } else {
return this.containsRange(snapshot, LLUtils.lazyRetainRange(range)); return this.containsRange(snapshot, rangeMono);
} }
} }
}, },
@ -794,7 +799,7 @@ public class LLLocalDictionary implements LLDictionary {
prevData = null; prevData = null;
} }
} else { } else {
var obtainedPrevData = dbGet(cfh, null, key.send(), existsAlmostCertainly); var obtainedPrevData = dbGet(cfh, null, key.copy().send(), existsAlmostCertainly);
if (obtainedPrevData == null) { if (obtainedPrevData == null) {
prevData = null; prevData = null;
} else { } else {
@ -852,6 +857,8 @@ public class LLLocalDictionary implements LLDictionary {
logger.trace("Writing {}: {}", logger.trace("Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
} }
assert key.isAccessible();
assert newData.isAccessible();
dbPut(cfh, null, key.send(), newData.copy().send()); dbPut(cfh, null, key.send(), newData.copy().send());
} }
return LLDelta.of( return LLDelta.of(
@ -1128,7 +1135,15 @@ public class LLLocalDictionary implements LLDictionary {
for (LLEntry entry : entriesWindow) { for (LLEntry entry : entriesWindow) {
var k = entry.getKey(); var k = entry.getKey();
var v = entry.getValue(); var v = entry.getValue();
if (databaseOptions.allowNettyDirect()) {
batch.put(cfh, k, v); batch.put(cfh, k, v);
} else {
try (var key = k.receive()) {
try (var value = v.receive()) {
batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value));
}
}
}
} }
batch.writeToDbAndClose(); batch.writeToDbAndClose();
batch.close(); batch.close();
@ -1655,7 +1670,14 @@ public class LLLocalDictionary implements LLDictionary {
)) { )) {
for (LLEntry entry : entriesList) { for (LLEntry entry : entriesList) {
assert entry.isAccessible(); assert entry.isAccessible();
if (databaseOptions.allowNettyDirect()) {
batch.put(cfh, entry.getKey(), entry.getValue()); batch.put(cfh, entry.getKey(), entry.getValue());
} else {
batch.put(cfh,
LLUtils.toArray(entry.getKeyUnsafe()),
LLUtils.toArray(entry.getValueUnsafe())
);
}
} }
batch.writeToDbAndClose(); batch.writeToDbAndClose();
} }

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.BufferAllocator;
import io.netty.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
@ -89,6 +90,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
DatabaseOptions databaseOptions) throws IOException { DatabaseOptions databaseOptions) throws IOException {
this.name = name; this.name = name;
this.allocator = allocator; this.allocator = allocator;
if (databaseOptions.allowNettyDirect()) {
if (!PlatformDependent.hasUnsafe()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers",
PlatformDependent.getUnsafeUnavailabilityCause()
);
}
if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:"
+ " DirectBufferNoCleanerConstructor is not available");
}
}
Options rocksdbOptions = openRocksDb(path, databaseOptions); Options rocksdbOptions = openRocksDb(path, databaseOptions);
try { try {
List<ColumnFamilyDescriptor> descriptors = new LinkedList<>(); List<ColumnFamilyDescriptor> descriptors = new LinkedList<>();

View File

@ -68,10 +68,20 @@ public abstract class LLLocalReactiveRocksIterator<T> {
var rocksIterator = tuple.getT1(); var rocksIterator = tuple.getT1();
rocksIterator.status(); rocksIterator.status();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) { Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive();
} else {
key = LLUtils.fromByteArray(alloc, rocksIterator.key());
}
try (key) {
Buffer value; Buffer value;
if (readValues) { if (readValues) {
if (allowNettyDirect) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive(); value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive();
} else {
value = LLUtils.fromByteArray(alloc, rocksIterator.value());
}
} else { } else {
value = alloc.allocate(0); value = alloc.allocate(0);
} }

View File

@ -12,9 +12,11 @@ import org.jetbrains.annotations.NotNull;
public class BufferDataInput implements DataInput, SafeCloseable { public class BufferDataInput implements DataInput, SafeCloseable {
private final Buffer buf; private final Buffer buf;
private final int initialReaderOffset;
public BufferDataInput(Send<Buffer> bufferSend) { public BufferDataInput(Send<Buffer> bufferSend) {
this.buf = bufferSend.receive().makeReadOnly(); this.buf = bufferSend.receive().makeReadOnly();
this.initialReaderOffset = buf.readerOffset();
} }
@Override @Override
@ -104,4 +106,8 @@ public class BufferDataInput implements DataInput, SafeCloseable {
public void close() { public void close() {
buf.close(); buf.close();
} }
public int getReadBytesCount() {
return buf.readerOffset() - initialReaderOffset;
}
} }

View File

@ -8,7 +8,7 @@ import java.io.IOException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.warp.commonutils.error.IndexOutOfBoundsException; import org.warp.commonutils.error.IndexOutOfBoundsException;
public class CodecSerializer<A> implements Serializer<A, Send<Buffer>> { public class CodecSerializer<A> implements Serializer<A> {
private final BufferAllocator allocator; private final BufferAllocator allocator;
private final Codecs<A> deserializationCodecs; private final Codecs<A> deserializationCodecs;
@ -37,8 +37,8 @@ public class CodecSerializer<A> implements Serializer<A, Send<Buffer>> {
} }
@Override @Override
public @NotNull A deserialize(@NotNull Send<Buffer> serialized) { public @NotNull DeserializationResult<A> deserialize(@NotNull Send<Buffer> serializedToReceive) {
try (var is = new BufferDataInput(serialized)) { try (var is = new BufferDataInput(serializedToReceive)) {
int codecId; int codecId;
if (microCodecs) { if (microCodecs) {
codecId = is.readUnsignedByte(); codecId = is.readUnsignedByte();
@ -46,7 +46,7 @@ public class CodecSerializer<A> implements Serializer<A, Send<Buffer>> {
codecId = is.readInt(); codecId = is.readInt();
} }
var serializer = deserializationCodecs.getCodec(codecId); var serializer = deserializationCodecs.getCodec(codecId);
return serializer.deserialize(is); return new DeserializationResult<>(serializer.deserialize(is), is.getReadBytesCount());
} catch (IOException ex) { } catch (IOException ex) {
// This shouldn't happen // This shouldn't happen
throw new IOError(ex); throw new IOError(ex);

View File

@ -39,11 +39,11 @@ public interface Serializer<A> {
@Override @Override
public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive) { public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive) {
try (Buffer serialized = serializedToReceive.receive()) { try (Buffer serialized = serializedToReceive.receive()) {
assert serialized.isAccessible();
int length = serialized.readInt(); int length = serialized.readInt();
var readerOffset = serialized.readerOffset(); var readerOffset = serialized.readerOffset();
var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(), return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(),
readerOffset, length, StandardCharsets.UTF_8), readableBytes); readerOffset, length, StandardCharsets.UTF_8), Integer.BYTES + length);
} }
} }
@ -51,8 +51,10 @@ public interface Serializer<A> {
public @NotNull Send<Buffer> serialize(@NotNull String deserialized) { public @NotNull Send<Buffer> serialize(@NotNull String deserialized) {
var bytes = deserialized.getBytes(StandardCharsets.UTF_8); var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
try (Buffer buf = allocator.allocate(Integer.BYTES + bytes.length)) { try (Buffer buf = allocator.allocate(Integer.BYTES + bytes.length)) {
assert buf.isAccessible();
buf.writeInt(bytes.length); buf.writeInt(bytes.length);
buf.writeBytes(bytes); buf.writeBytes(bytes);
assert buf.isAccessible();
return buf.send(); return buf.send();
} }
} }

View File

@ -58,9 +58,8 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
+ serialized.readableBytes() + " bytes instead"); + serialized.readableBytes() + " bytes instead");
} }
var readerOffset = serialized.readerOffset(); var readerOffset = serialized.readerOffset();
var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(), return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(),
readerOffset, length, StandardCharsets.UTF_8), readableBytes); readerOffset, length, StandardCharsets.UTF_8), length);
} }
} }
@ -68,12 +67,14 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
public @NotNull Send<Buffer> serialize(@NotNull String deserialized) throws SerializationException { public @NotNull Send<Buffer> serialize(@NotNull String deserialized) throws SerializationException {
// UTF-8 uses max. 3 bytes per char, so calculate the worst case. // UTF-8 uses max. 3 bytes per char, so calculate the worst case.
try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) { try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) {
assert buf.isAccessible();
buf.writeBytes(deserialized.getBytes(StandardCharsets.UTF_8)); buf.writeBytes(deserialized.getBytes(StandardCharsets.UTF_8));
if (buf.readableBytes() != getSerializedBinaryLength()) { if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength() throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength()
+ " bytes has tried to serialize an element with " + " bytes has tried to serialize an element with "
+ buf.readableBytes() + " bytes instead"); + buf.readableBytes() + " bytes instead");
} }
assert buf.isAccessible();
return buf.send(); return buf.send();
} }
} }
@ -95,8 +96,7 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead"); + serialized.readableBytes() + " bytes instead");
} }
var readableBytes = serialized.readableBytes(); return new DeserializationResult<>(serialized.readInt(), Integer.BYTES);
return new DeserializationResult<>(serialized.readInt(), readableBytes);
} }
} }
@ -125,7 +125,7 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
+ serialized.readableBytes() + " bytes instead"); + serialized.readableBytes() + " bytes instead");
} }
var readableBytes = serialized.readableBytes(); var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(serialized.readLong(), readableBytes); return new DeserializationResult<>(serialized.readLong(), Long.BYTES);
} }
} }

View File

@ -99,7 +99,7 @@ public class DbTestUtils {
.flatMap(conn -> conn .flatMap(conn -> conn
.getDatabase("testdb", .getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, -1) new DatabaseOptions(Map.of(), true, false, true, false, true, false, false, -1)
) )
.map(db -> new TempDb(alloc, conn, db, wrkspcPath)) .map(db -> new TempDb(alloc, conn, db, wrkspcPath))
); );
@ -165,12 +165,10 @@ public class DbTestUtils {
} }
@Override @Override
public @NotNull Short deserialize(@NotNull Send<Buffer> serializedToReceive) { public @NotNull DeserializationResult<Short> deserialize(@NotNull Send<Buffer> serializedToReceive) {
try (var serialized = serializedToReceive.receive()) { try (var serialized = serializedToReceive.receive()) {
var prevReaderIdx = serialized.readerOffset();
var val = serialized.readShort(); var val = serialized.readShort();
serialized.readerOffset(prevReaderIdx + Short.BYTES); return new DeserializationResult<>(val, Short.BYTES);
return val;
} }
} }