From 3091b81d34a9a4704e18c07bebb35c982e8a61cc Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 2 Sep 2021 17:15:40 +0200 Subject: [PATCH] Fix map tests --- .../dbengine/client/DatabaseOptions.java | 9 +- .../dbengine/client/MappedSerializer.java | 11 +- .../client/MappedSerializerFixedLength.java | 11 +- .../cavallium/dbengine/database/LLDelta.java | 4 +- .../cavallium/dbengine/database/LLEntry.java | 10 +- .../cavallium/dbengine/database/LLUtils.java | 135 ++++++++++-------- .../database/collections/DatabaseEmpty.java | 8 +- .../collections/DatabaseMapDictionary.java | 54 +++---- .../DatabaseMapDictionaryDeep.java | 14 +- .../collections/DatabaseSetDictionary.java | 6 +- .../database/collections/DatabaseSingle.java | 16 ++- .../collections/DatabaseSingleMapped.java | 36 +++-- .../collections/SubStageGetterHashMap.java | 12 +- .../collections/SubStageGetterHashSet.java | 8 +- .../collections/SubStageGetterMap.java | 8 +- .../collections/SubStageGetterMapDeep.java | 4 +- .../collections/SubStageGetterSet.java | 4 +- .../collections/ValueWithHashSerializer.java | 6 +- .../collections/ValuesSetSerializer.java | 11 +- .../database/disk/LLLocalDictionary.java | 34 ++++- .../disk/LLLocalKeyValueDatabase.java | 14 ++ .../disk/LLLocalReactiveRocksIterator.java | 14 +- .../serialization/BufferDataInput.java | 6 + .../serialization/CodecSerializer.java | 8 +- .../database/serialization/Serializer.java | 6 +- .../SerializerFixedBinaryLength.java | 10 +- .../it/cavallium/dbengine/DbTestUtils.java | 8 +- 27 files changed, 277 insertions(+), 190 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java index 119e718..739684c 100644 --- a/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DatabaseOptions.java @@ -14,4 +14,11 @@ public record DatabaseOptions(Map extraFlags, boolean allowMemoryMapping, boolean allowNettyDirect, boolean useNettyDirect, - int maxOpenFiles) {} + int maxOpenFiles) { + + public DatabaseOptions { + if (useNettyDirect && !allowNettyDirect) { + throw new IllegalArgumentException("If allowNettyDirect is false, you must also set useNettyDirect to false"); + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java index 81b585b..790ba2e 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java @@ -6,21 +6,22 @@ import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.Serializer; import org.jetbrains.annotations.NotNull; -public class MappedSerializer implements Serializer> { +public class MappedSerializer implements Serializer { - private final Serializer> serializer; + private final Serializer serializer; private final Mapper keyMapper; - public MappedSerializer(Serializer> serializer, + public MappedSerializer(Serializer serializer, Mapper keyMapper) { this.serializer = serializer; this.keyMapper = keyMapper; } @Override - public @NotNull B deserialize(@NotNull Send serialized) throws SerializationException { + public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException { try (serialized) { - return keyMapper.map(serializer.deserialize(serialized)); + var deserialized = serializer.deserialize(serialized); + return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead()); } } diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java index 22ef285..fc201a1 100644 --- a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java +++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java @@ -6,21 +6,22 @@ import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import org.jetbrains.annotations.NotNull; -public class MappedSerializerFixedLength implements SerializerFixedBinaryLength> { +public class MappedSerializerFixedLength implements SerializerFixedBinaryLength { - private final SerializerFixedBinaryLength> fixedLengthSerializer; + private final SerializerFixedBinaryLength fixedLengthSerializer; private final Mapper keyMapper; - public MappedSerializerFixedLength(SerializerFixedBinaryLength> fixedLengthSerializer, + public MappedSerializerFixedLength(SerializerFixedBinaryLength fixedLengthSerializer, Mapper keyMapper) { this.fixedLengthSerializer = fixedLengthSerializer; this.keyMapper = keyMapper; } @Override - public @NotNull B deserialize(@NotNull Send serialized) throws SerializationException { + public @NotNull DeserializationResult deserialize(@NotNull Send serialized) throws SerializationException { try (serialized) { - return keyMapper.map(fixedLengthSerializer.deserialize(serialized)); + var deserialized = fixedLengthSerializer.deserialize(serialized); + return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead()); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLDelta.java b/src/main/java/it/cavallium/dbengine/database/LLDelta.java index 8d4feaf..f16976e 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDelta.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDelta.java @@ -117,10 +117,10 @@ public class LLDelta extends ResourceSupport { @Override public void drop(LLDelta obj) { - if (obj.previous != null) { + if (obj.previous != null && obj.previous.isAccessible()) { obj.previous.close(); } - if (obj.current != null) { + if (obj.current != null && obj.current.isAccessible()) { obj.current.close(); } delegate.drop(obj); diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java index e5d73b9..f7c39fe 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLEntry.java +++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java @@ -16,9 +16,9 @@ public class LLEntry extends ResourceSupport { private LLEntry(Send key, Send value, Drop drop) { super(new LLEntry.CloseOnDrop(drop)); - assert isAllAccessible(); this.key = key.receive().makeReadOnly(); this.value = value.receive().makeReadOnly(); + assert isAllAccessible(); } private boolean isAllAccessible() { @@ -119,8 +119,12 @@ public class LLEntry extends ResourceSupport { @Override public void drop(LLEntry obj) { - obj.key.close(); - obj.value.close(); + if (obj.key.isAccessible()) { + obj.key.close(); + } + if (obj.value.isAccessible()) { + obj.value.close(); + } delegate.drop(obj); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 49e5ae9..2f57509 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -199,7 +199,7 @@ public class LLUtils { b.append('['); int i = 0; - while(true) { + while (true) { b.append(key.getByte(startIndex + i)); if (i == iLimit) { b.append("…"); @@ -237,10 +237,9 @@ public class LLUtils { /** - * Returns {@code true} if and only if the two specified buffers are - * identical to each other for {@code length} bytes starting at {@code aStartIndex} - * index for the {@code a} buffer and {@code bStartIndex} index for the {@code b} buffer. - * A more compact way to express this is: + * Returns {@code true} if and only if the two specified buffers are identical to each other for {@code length} bytes + * starting at {@code aStartIndex} index for the {@code a} buffer and {@code bStartIndex} index for the {@code b} + * buffer. A more compact way to express this is: *

* {@code a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length]} */ @@ -273,8 +272,9 @@ public class LLUtils { } public static int hashCode(Buffer buf) { - if (buf == null) + if (buf == null) { return 0; + } int result = 1; var cur = buf.openCursor(); @@ -287,7 +287,6 @@ public class LLUtils { } /** - * * @return null if size is equal to RocksDB.NOT_FOUND */ @SuppressWarnings("ConstantConditions") @@ -366,12 +365,21 @@ public class LLUtils { @NotNull public static ByteBuffer obtainDirect(Buffer buffer) { - assert buffer.isAccessible(); - if (buffer.readableBytes() == 0) { - return EMPTY_BYTE_BUFFER; + if (!PlatformDependent.hasUnsafe()) { + throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", + PlatformDependent.getUnsafeUnavailabilityCause() + ); } + if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) { + throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:" + + " DirectBufferNoCleanerConstructor is not available"); + } + assert buffer.isAccessible(); long nativeAddress; if ((nativeAddress = buffer.nativeAddress()) == 0) { + if (buffer.capacity() == 0) { + return EMPTY_BYTE_BUFFER; + } throw new IllegalStateException("Buffer is not direct"); } return PlatformDependent.directBuffer(nativeAddress, buffer.capacity()); @@ -395,7 +403,7 @@ public class LLUtils { } public static Send compositeBuffer(BufferAllocator alloc, Send buffer) { - try (var composite = buffer.receive().compact()) { + try (var composite = buffer.receive()) { return composite.send(); } } @@ -403,18 +411,21 @@ public class LLUtils { public static Send compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2) { try (buffer1) { try (buffer2) { - try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2).compact()) { + try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2)) { return composite.send(); } } } } - public static Send compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2, Send buffer3) { + public static Send compositeBuffer(BufferAllocator alloc, + Send buffer1, + Send buffer2, + Send buffer3) { try (buffer1) { try (buffer2) { try (buffer3) { - try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3).compact()) { + try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3)) { return composite.send(); } } @@ -431,7 +442,7 @@ public class LLUtils { case 2 -> compositeBuffer(alloc, buffers[0], buffers[1]); case 3 -> compositeBuffer(alloc, buffers[0], buffers[1], buffers[2]); default -> { - try (var composite = CompositeBuffer.compose(alloc, buffers).compact()) { + try (var composite = CompositeBuffer.compose(alloc, buffers)) { yield composite.send(); } } @@ -568,30 +579,29 @@ public class LLUtils { } public static Mono handleDiscard(Mono mono) { - return mono - .doOnDiscard(Object.class, obj -> { - if (obj instanceof SafeCloseable o) { - discardRefCounted(o); - } else if (obj instanceof Entry o) { - discardEntry(o); - } else if (obj instanceof Collection o) { - discardCollection(o); - } else if (obj instanceof Tuple3 o) { - discardTuple3(o); - } else if (obj instanceof Tuple2 o) { - discardTuple2(o); - } else if (obj instanceof LLEntry o) { - discardLLEntry(o); - } else if (obj instanceof LLRange o) { - discardLLRange(o); - } else if (obj instanceof Delta o) { - discardDelta(o); - } else if (obj instanceof Send o) { - discardSend(o); - } else if (obj instanceof Map o) { - discardMap(o); - } - }); + return mono.doOnDiscard(Object.class, obj -> { + if (obj instanceof SafeCloseable o) { + discardRefCounted(o); + } else if (obj instanceof Entry o) { + discardEntry(o); + } else if (obj instanceof Collection o) { + discardCollection(o); + } else if (obj instanceof Tuple3 o) { + discardTuple3(o); + } else if (obj instanceof Tuple2 o) { + discardTuple2(o); + } else if (obj instanceof LLEntry o) { + discardLLEntry(o); + } else if (obj instanceof LLRange o) { + discardLLRange(o); + } else if (obj instanceof Delta o) { + discardDelta(o); + } else if (obj instanceof Send o) { + discardSend(o); + } else if (obj instanceof Map o) { + discardMap(o); + } + }); // todo: check if the single object discard hook is more performant /* .doOnDiscard(SafeCloseable.class, LLUtils::discardRefCounted) @@ -609,30 +619,29 @@ public class LLUtils { } public static Flux handleDiscard(Flux mono) { - return mono - .doOnDiscard(Object.class, obj -> { - if (obj instanceof SafeCloseable o) { - discardRefCounted(o); - } else if (obj instanceof Entry o) { - discardEntry(o); - } else if (obj instanceof Collection o) { - discardCollection(o); - } else if (obj instanceof Tuple3 o) { - discardTuple3(o); - } else if (obj instanceof Tuple2 o) { - discardTuple2(o); - } else if (obj instanceof LLEntry o) { - discardLLEntry(o); - } else if (obj instanceof LLRange o) { - discardLLRange(o); - } else if (obj instanceof Delta o) { - discardDelta(o); - } else if (obj instanceof Send o) { - discardSend(o); - } else if (obj instanceof Map o) { - discardMap(o); - } - }); + return mono.doOnDiscard(Object.class, obj -> { + if (obj instanceof SafeCloseable o) { + discardRefCounted(o); + } else if (obj instanceof Entry o) { + discardEntry(o); + } else if (obj instanceof Collection o) { + discardCollection(o); + } else if (obj instanceof Tuple3 o) { + discardTuple3(o); + } else if (obj instanceof Tuple2 o) { + discardTuple2(o); + } else if (obj instanceof LLEntry o) { + discardLLEntry(o); + } else if (obj instanceof LLRange o) { + discardLLRange(o); + } else if (obj instanceof Delta o) { + discardDelta(o); + } else if (obj instanceof Send o) { + discardSend(o); + } else if (obj instanceof Map o) { + discardMap(o); + } + }); // todo: check if the single object discard hook is more performant /* .doOnDiscard(SafeCloseable.class, LLUtils::discardRefCounted) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java index 7e47410..c1b9e66 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseEmpty.java @@ -5,6 +5,7 @@ import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.Send; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.serialization.Serializer; +import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult; import java.util.function.Function; import org.jetbrains.annotations.NotNull; @@ -12,13 +13,14 @@ public class DatabaseEmpty { @SuppressWarnings({"unused", "InstantiationOfUtilityClass"}) public static final Nothing NOTHING = new Nothing(); + public static final DeserializationResult NOTHING_RESULT = new DeserializationResult<>(NOTHING, 0); - public static Serializer> nothingSerializer(BufferAllocator bufferAllocator) { + public static Serializer nothingSerializer(BufferAllocator bufferAllocator) { return new Serializer<>() { @Override - public @NotNull Nothing deserialize(@NotNull Send serialized) { + public @NotNull DeserializationResult deserialize(@NotNull Send serialized) { try (serialized) { - return NOTHING; + return NOTHING_RESULT; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 6e8afd7..9c24eb2 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -5,6 +5,7 @@ import io.netty.buffer.api.Send; import io.netty.buffer.api.internal.ResourceSupport; import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.NoMapper; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.LLDictionary; @@ -40,27 +41,27 @@ import reactor.util.function.Tuples; */ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> { - private final Serializer> valueSerializer; + private final Serializer valueSerializer; protected DatabaseMapDictionary(LLDictionary dictionary, Send prefixKey, - SerializerFixedBinaryLength> keySuffixSerializer, - Serializer> valueSerializer) { + SerializerFixedBinaryLength keySuffixSerializer, + Serializer valueSerializer) { // Do not retain or release or use the prefixKey here super(dictionary, prefixKey, keySuffixSerializer, new SubStageGetterSingle<>(valueSerializer), 0); this.valueSerializer = valueSerializer; } public static DatabaseMapDictionary simple(LLDictionary dictionary, - SerializerFixedBinaryLength> keySerializer, - Serializer> valueSerializer) { + SerializerFixedBinaryLength keySerializer, + Serializer valueSerializer) { return new DatabaseMapDictionary<>(dictionary, dictionary.getAllocator().allocate(0).send(), keySerializer, valueSerializer); } public static DatabaseMapDictionary tail(LLDictionary dictionary, Send prefixKey, - SerializerFixedBinaryLength> keySuffixSerializer, - Serializer> valueSerializer) { + SerializerFixedBinaryLength keySuffixSerializer, + Serializer valueSerializer) { return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer); } @@ -73,7 +74,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep value, SynchronousSink sink) { try { - sink.next(valueSerializer.deserialize(value)); + sink.next(valueSerializer.deserialize(value).deserializedData()); } catch (SerializationException ex) { sink.error(ex); } @@ -86,7 +87,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((entrySend, sink) -> { try (var entry = entrySend.receive()) { var key = deserializeSuffix(stripPrefix(entry.getKey())); - var value = valueSerializer.deserialize(entry.getValue()); + var value = valueSerializer.deserialize(entry.getValue()).deserializedData(); sink.next(Map.entry(key, value)); } catch (SerializationException ex) { sink.error(ex); @@ -133,18 +134,15 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> at(@Nullable CompositeSnapshot snapshot, T keySuffix) { - return Mono - .fromCallable(() -> new DatabaseSingleMapped<>( - new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), Serializer.noop()) - , valueSerializer) - ); + return Mono.fromCallable(() -> + new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer)); } @Override public Mono getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) { return dictionary .get(resolveSnapshot(snapshot), Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))), existsAlmostCertainly) - .handle(this::deserializeValue); + .handle((value, sink) -> deserializeValue(value, sink)); } @Override @@ -170,7 +168,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep toKey(serializeSuffix(keySuffix))); return dictionary .update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly) - .handle(this::deserializeValue); + .handle((value, sink) -> deserializeValue(value, sink)); } @Override @@ -180,7 +178,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep toKey(serializeSuffix(keySuffix))); return dictionary .updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly) - .transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize)); + .transform(mono -> LLUtils.mapLLDelta(mono, + serialized -> valueSerializer.deserialize(serialized).deserializedData() + )); } public SerializationFunction<@Nullable Send, @Nullable Send> getSerializedUpdater( @@ -191,7 +191,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep deserializeValue(value1, sink)); } @Override @@ -238,7 +238,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep valueSerializer.serialize(value)); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle(this::deserializeValue) + .handle((Send value1, SynchronousSink sink) -> deserializeValue(value1, sink)) .map(oldValue -> !Objects.equals(oldValue, value)) .defaultIfEmpty(value != null); } @@ -257,7 +257,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep toKey(serializeSuffix(keySuffix))); return dictionary .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle(this::deserializeValue); + .handle((value, sink) -> deserializeValue(value, sink)); } @Override @@ -285,7 +285,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep valueOpt; if (entry.getT3().isPresent()) { try (var buf = entry.getT3().get()) { - valueOpt = Optional.of(valueSerializer.deserialize(buf)); + valueOpt = Optional.of(valueSerializer.deserialize(buf).deserializedData()); } } else { valueOpt = Optional.empty(); @@ -363,10 +363,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(new DatabaseSingle<>(dictionary, + new DatabaseSingle<>(dictionary, toKey(keySuffixWithExt.send()), - Serializer.noop() - ), valueSerializer) + valueSerializer + ) )); } } catch (SerializationException ex) { @@ -382,7 +382,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep>handle((serializedEntryToReceive, sink) -> { try (var serializedEntry = serializedEntryToReceive.receive()) { sink.next(Map.entry(deserializeSuffix(stripPrefix(serializedEntry.getKey())), - valueSerializer.deserialize(serializedEntry.getValue()))); + valueSerializer.deserialize(serializedEntry.getValue()).deserializedData())); } catch (SerializationException e) { sink.error(e); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 1b4fc83..c70ae92 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -32,7 +32,7 @@ public class DatabaseMapDictionaryDeep> implem protected final LLDictionary dictionary; private final BufferAllocator alloc; protected final SubStageGetter subStageGetter; - protected final SerializerFixedBinaryLength> keySuffixSerializer; + protected final SerializerFixedBinaryLength keySuffixSerializer; protected final Buffer keyPrefix; protected final int keyPrefixLength; protected final int keySuffixLength; @@ -191,14 +191,14 @@ public class DatabaseMapDictionaryDeep> implem */ @Deprecated public static DatabaseMapDictionaryDeep> simple(LLDictionary dictionary, - SerializerFixedBinaryLength> keySerializer, + SerializerFixedBinaryLength keySerializer, SubStageGetterSingle subStageGetter) { return new DatabaseMapDictionaryDeep<>(dictionary, dictionary.getAllocator().allocate(0).send(), keySerializer, subStageGetter, 0); } public static > DatabaseMapDictionaryDeep deepTail(LLDictionary dictionary, - SerializerFixedBinaryLength> keySerializer, + SerializerFixedBinaryLength keySerializer, int keyExtLength, SubStageGetter subStageGetter) { return new DatabaseMapDictionaryDeep<>(dictionary, @@ -211,7 +211,7 @@ public class DatabaseMapDictionaryDeep> implem public static > DatabaseMapDictionaryDeep deepIntermediate(LLDictionary dictionary, Send prefixKey, - SerializerFixedBinaryLength> keySuffixSerializer, + SerializerFixedBinaryLength keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength) { return new DatabaseMapDictionaryDeep<>(dictionary, prefixKey, keySuffixSerializer, subStageGetter, keyExtLength); @@ -219,7 +219,7 @@ public class DatabaseMapDictionaryDeep> implem protected DatabaseMapDictionaryDeep(LLDictionary dictionary, Send prefixKeyToReceive, - SerializerFixedBinaryLength> keySuffixSerializer, + SerializerFixedBinaryLength keySuffixSerializer, SubStageGetter subStageGetter, int keyExtLength) { try (var prefixKey = prefixKeyToReceive.receive()) { @@ -436,7 +436,7 @@ public class DatabaseMapDictionaryDeep> implem .doOnNext(Send::close) .then(); } else { - return dictionary.setRange(LLUtils.lazyRetainRange(range), Flux.empty()); + return dictionary.setRange(rangeMono, Flux.empty()); } }); } @@ -447,7 +447,7 @@ public class DatabaseMapDictionaryDeep> implem assert suffixKeyConsistency(keySuffix.readableBytes()); var result = keySuffixSerializer.deserialize(keySuffix.send()); assert keyPrefix.isAccessible(); - return result; + return result.deserializedData(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java index d2b8161..08f3df0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSetDictionary.java @@ -17,12 +17,12 @@ public class DatabaseSetDictionary extends DatabaseMapDictionary protected DatabaseSetDictionary(LLDictionary dictionary, Send prefixKey, - SerializerFixedBinaryLength> keySuffixSerializer) { + SerializerFixedBinaryLength keySuffixSerializer) { super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator())); } public static DatabaseSetDictionary simple(LLDictionary dictionary, - SerializerFixedBinaryLength> keySerializer) { + SerializerFixedBinaryLength keySerializer) { try (var buf = dictionary.getAllocator().allocate(0)) { return new DatabaseSetDictionary<>(dictionary, buf.send(), keySerializer); } @@ -30,7 +30,7 @@ public class DatabaseSetDictionary extends DatabaseMapDictionary public static DatabaseSetDictionary tail(LLDictionary dictionary, Send prefixKey, - SerializerFixedBinaryLength> keySuffixSerializer) { + SerializerFixedBinaryLength keySuffixSerializer) { return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index 7b03570..fdd1be3 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -27,9 +27,9 @@ public class DatabaseSingle implements DatabaseStageEntry { private final LLDictionary dictionary; private final Buffer key; private final Mono> keyMono; - private final Serializer> serializer; + private final Serializer serializer; - public DatabaseSingle(LLDictionary dictionary, Send key, Serializer> serializer) { + public DatabaseSingle(LLDictionary dictionary, Send key, Serializer serializer) { try (key) { this.dictionary = dictionary; this.key = key.receive(); @@ -48,7 +48,7 @@ public class DatabaseSingle implements DatabaseStageEntry { private void deserializeValue(Send value, SynchronousSink sink) { try { - sink.next(serializer.deserialize(value)); + sink.next(serializer.deserialize(value).deserializedData()); } catch (SerializationException ex) { sink.error(ex); } @@ -74,7 +74,8 @@ public class DatabaseSingle implements DatabaseStageEntry { boolean existsAlmostCertainly) { return dictionary .update(keyMono, (oldValueSer) -> { - var result = updater.apply(oldValueSer == null ? null : serializer.deserialize(oldValueSer)); + var result = updater.apply( + oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData()); if (result == null) { return null; } else { @@ -89,13 +90,16 @@ public class DatabaseSingle implements DatabaseStageEntry { boolean existsAlmostCertainly) { return dictionary .updateAndGetDelta(keyMono, (oldValueSer) -> { - var result = updater.apply(oldValueSer == null ? null : serializer.deserialize(oldValueSer)); + var result = updater.apply( + oldValueSer == null ? null : serializer.deserialize(oldValueSer).deserializedData()); if (result == null) { return null; } else { return serializer.serialize(result); } - }, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, serializer::deserialize)); + }, existsAlmostCertainly).transform(mono -> LLUtils.mapLLDelta(mono, + serialized -> serializer.deserialize(serialized).deserializedData() + )); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index 4def30c..c17fcc9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -1,15 +1,13 @@ package it.cavallium.dbengine.database.collections; -import io.netty.buffer.api.Buffer; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; +import it.cavallium.dbengine.client.Mapper; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; -import it.cavallium.dbengine.database.serialization.Serializer; -import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -19,16 +17,16 @@ import reactor.core.publisher.SynchronousSink; public class DatabaseSingleMapped implements DatabaseStageEntry { private final DatabaseStageEntry serializedSingle; - private final Serializer serializer; + private final Mapper mapper; - public DatabaseSingleMapped(DatabaseStageEntry serializedSingle, Serializer serializer) { + public DatabaseSingleMapped(DatabaseStageEntry serializedSingle, Mapper mapper) { this.serializedSingle = serializedSingle; - this.serializer = serializer; + this.mapper = mapper; } private void deserializeSink(B value, SynchronousSink sink) { try { - sink.next(this.deserialize(value)); + sink.next(this.unMap(value)); } catch (SerializationException ex) { sink.error(ex); } @@ -47,14 +45,14 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { @Override public Mono set(A value) { return Mono - .fromCallable(() -> serialize(value)) + .fromCallable(() -> map(value)) .flatMap(serializedSingle::set); } @Override public Mono setAndGetPrevious(A value) { return Mono - .fromCallable(() -> serialize(value)) + .fromCallable(() -> map(value)) .flatMap(serializedSingle::setAndGetPrevious) .handle(this::deserializeSink); } @@ -62,7 +60,7 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { @Override public Mono setAndGetChanged(A value) { return Mono - .fromCallable(() -> serialize(value)) + .fromCallable(() -> map(value)) .flatMap(serializedSingle::setAndGetChanged) .single(); } @@ -72,11 +70,11 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return serializedSingle.update(oldValue -> { - var result = updater.apply(oldValue == null ? null : this.deserialize(oldValue)); + var result = updater.apply(oldValue == null ? null : this.unMap(oldValue)); if (result == null) { return null; } else { - return this.serialize(result); + return this.map(result); } }, updateReturnMode, existsAlmostCertainly).handle(this::deserializeSink); } @@ -85,13 +83,13 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { public Mono> updateAndGetDelta(SerializationFunction<@Nullable A, @Nullable A> updater, boolean existsAlmostCertainly) { return serializedSingle.updateAndGetDelta(oldValue -> { - var result = updater.apply(oldValue == null ? null : this.deserialize(oldValue)); + var result = updater.apply(oldValue == null ? null : this.unMap(oldValue)); if (result == null) { return null; } else { - return this.serialize(result); + return this.map(result); } - }, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::deserialize)); + }, existsAlmostCertainly).transform(mono -> LLUtils.mapDelta(mono, this::unMap)); } @Override @@ -140,12 +138,12 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { } //todo: temporary wrapper. convert the whole class to buffers - private A deserialize(B bytes) throws SerializationException { - return serializer.deserialize(bytes); + private A unMap(B bytes) throws SerializationException { + return mapper.unmap(bytes); } //todo: temporary wrapper. convert the whole class to buffers - private B serialize(A bytes) throws SerializationException { - return serializer.serialize(bytes); + private B map(A bytes) throws SerializationException { + return mapper.map(bytes); } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java index ee83112..e0ee8f2 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java @@ -17,15 +17,15 @@ import reactor.core.publisher.Mono; public class SubStageGetterHashMap implements SubStageGetter, DatabaseMapDictionaryHashed> { - private final Serializer> keySerializer; - private final Serializer> valueSerializer; + private final Serializer keySerializer; + private final Serializer valueSerializer; private final Function keyHashFunction; - private final SerializerFixedBinaryLength> keyHashSerializer; + private final SerializerFixedBinaryLength keyHashSerializer; - public SubStageGetterHashMap(Serializer> keySerializer, - Serializer> valueSerializer, + public SubStageGetterHashMap(Serializer keySerializer, + Serializer valueSerializer, Function keyHashFunction, - SerializerFixedBinaryLength> keyHashSerializer) { + SerializerFixedBinaryLength keyHashSerializer) { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; this.keyHashFunction = keyHashFunction; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java index 7fb153d..c1375aa 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java @@ -17,13 +17,13 @@ import reactor.core.publisher.Mono; public class SubStageGetterHashSet implements SubStageGetter, DatabaseSetDictionaryHashed> { - private final Serializer> keySerializer; + private final Serializer keySerializer; private final Function keyHashFunction; - private final SerializerFixedBinaryLength> keyHashSerializer; + private final SerializerFixedBinaryLength keyHashSerializer; - public SubStageGetterHashSet(Serializer> keySerializer, + public SubStageGetterHashSet(Serializer keySerializer, Function keyHashFunction, - SerializerFixedBinaryLength> keyHashSerializer) { + SerializerFixedBinaryLength keyHashSerializer) { this.keySerializer = keySerializer; this.keyHashFunction = keyHashFunction; this.keyHashSerializer = keyHashSerializer; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index 8eba44f..2f01ce9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -15,11 +15,11 @@ import reactor.core.publisher.Mono; public class SubStageGetterMap implements SubStageGetter, DatabaseMapDictionary> { - private final SerializerFixedBinaryLength> keySerializer; - private final Serializer> valueSerializer; + private final SerializerFixedBinaryLength keySerializer; + private final Serializer valueSerializer; - public SubStageGetterMap(SerializerFixedBinaryLength> keySerializer, - Serializer> valueSerializer) { + public SubStageGetterMap(SerializerFixedBinaryLength keySerializer, + Serializer valueSerializer) { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index 6ac527c..c4742e5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -16,11 +16,11 @@ public class SubStageGetterMapDeep> implements SubStageGetter, DatabaseMapDictionaryDeep> { private final SubStageGetter subStageGetter; - private final SerializerFixedBinaryLength> keySerializer; + private final SerializerFixedBinaryLength keySerializer; private final int keyExtLength; public SubStageGetterMapDeep(SubStageGetter subStageGetter, - SerializerFixedBinaryLength> keySerializer, + SerializerFixedBinaryLength keySerializer, int keyExtLength) { this.subStageGetter = subStageGetter; this.keySerializer = keySerializer; diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index 1f57fa3..8de364c 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -15,9 +15,9 @@ import reactor.core.publisher.Mono; public class SubStageGetterSet implements SubStageGetter, DatabaseSetDictionary> { - private final SerializerFixedBinaryLength> keySerializer; + private final SerializerFixedBinaryLength keySerializer; - public SubStageGetterSet(SerializerFixedBinaryLength> keySerializer) { + public SubStageGetterSet(SerializerFixedBinaryLength keySerializer) { this.keySerializer = keySerializer; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java index 6be5a5c..0d13357 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValueWithHashSerializer.java @@ -29,7 +29,11 @@ class ValueWithHashSerializer implements Serializer> { throws SerializationException { try (var serialized = serializedToReceive.receive()) { DeserializationResult deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send()); - DeserializationResult deserializedValue = valueSerializer.deserialize(serialized.send()); + DeserializationResult deserializedValue = valueSerializer.deserialize(serialized + .copy(serialized.readerOffset() + deserializedKey.bytesRead(), + serialized.readableBytes() - deserializedKey.bytesRead() + ) + .send()); return new DeserializationResult<>(Map.entry(deserializedKey.deserializedData(), deserializedValue.deserializedData()), deserializedKey.bytesRead() + deserializedValue.bytesRead()); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java index bf744c7..7fb26c3 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/ValuesSetSerializer.java @@ -27,13 +27,17 @@ class ValuesSetSerializer implements Serializer> { @Override public @NotNull DeserializationResult> deserialize(@NotNull Send serializedToReceive) throws SerializationException { try (var serialized = serializedToReceive.receive()) { + int initialReaderOffset = serialized.readerOffset(); int entriesLength = serialized.readInt(); ArrayList deserializedElements = new ArrayList<>(entriesLength); for (int i = 0; i < entriesLength; i++) { - X entry = entrySerializer.deserialize(serialized.copy().send()); - deserializedElements.add(entry); + var deserializationResult = entrySerializer.deserialize(serialized + .copy(serialized.readerOffset(), serialized.readableBytes()) + .send()); + deserializedElements.add(deserializationResult.deserializedData()); + serialized.readerOffset(serialized.readerOffset() + deserializationResult.bytesRead()); } - return new ObjectArraySet<>(deserializedElements); + return new DeserializationResult<>(new ObjectArraySet<>(deserializedElements), serialized.readerOffset() - initialReaderOffset); } } @@ -43,6 +47,7 @@ class ValuesSetSerializer implements Serializer> { output.writeInt(deserialized.size()); for (X entry : deserialized) { try (Buffer serialized = entrySerializer.serialize(entry).receive()) { + output.ensureWritable(serialized.readableBytes()); output.writeBytes(serialized); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index c8281c4..ae8f705 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -376,7 +376,8 @@ public class LLLocalDictionary implements LLDictionary { PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer()); } } else { - try (ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS)) { + ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS); + try { byte[] keyArray = LLUtils.toArray(key); requireNonNull(keyArray); Holder data = existsAlmostCertainly ? null : new Holder<>(); @@ -394,6 +395,10 @@ public class LLLocalDictionary implements LLDictionary { } else { return null; } + } finally { + if (!(validReadOptions instanceof UnreleasableReadOptions)) { + validReadOptions.close(); + } } } } @@ -440,9 +445,9 @@ public class LLLocalDictionary implements LLDictionary { rangeSend -> { try (var range = rangeSend.receive()) { if (range.isSingle()) { - return this.containsKey(snapshot, LLUtils.lazyRetain((range.getSingle().receive()))); + return this.containsKey(snapshot, Mono.fromCallable(range::getSingle)); } else { - return this.containsRange(snapshot, LLUtils.lazyRetainRange(range)); + return this.containsRange(snapshot, rangeMono); } } }, @@ -794,7 +799,7 @@ public class LLLocalDictionary implements LLDictionary { prevData = null; } } else { - var obtainedPrevData = dbGet(cfh, null, key.send(), existsAlmostCertainly); + var obtainedPrevData = dbGet(cfh, null, key.copy().send(), existsAlmostCertainly); if (obtainedPrevData == null) { prevData = null; } else { @@ -852,6 +857,8 @@ public class LLLocalDictionary implements LLDictionary { logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); } + assert key.isAccessible(); + assert newData.isAccessible(); dbPut(cfh, null, key.send(), newData.copy().send()); } return LLDelta.of( @@ -1128,7 +1135,15 @@ public class LLLocalDictionary implements LLDictionary { for (LLEntry entry : entriesWindow) { var k = entry.getKey(); var v = entry.getValue(); - batch.put(cfh, k, v); + if (databaseOptions.allowNettyDirect()) { + batch.put(cfh, k, v); + } else { + try (var key = k.receive()) { + try (var value = v.receive()) { + batch.put(cfh, LLUtils.toArray(key), LLUtils.toArray(value)); + } + } + } } batch.writeToDbAndClose(); batch.close(); @@ -1655,7 +1670,14 @@ public class LLLocalDictionary implements LLDictionary { )) { for (LLEntry entry : entriesList) { assert entry.isAccessible(); - batch.put(cfh, entry.getKey(), entry.getValue()); + if (databaseOptions.allowNettyDirect()) { + batch.put(cfh, entry.getKey(), entry.getValue()); + } else { + batch.put(cfh, + LLUtils.toArray(entry.getKeyUnsafe()), + LLUtils.toArray(entry.getValueUnsafe()) + ); + } } batch.writeToDbAndClose(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 0745e79..b874082 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database.disk; import io.netty.buffer.api.BufferAllocator; +import io.netty.util.internal.PlatformDependent; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.LLKeyValueDatabase; @@ -89,6 +90,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { DatabaseOptions databaseOptions) throws IOException { this.name = name; this.allocator = allocator; + + if (databaseOptions.allowNettyDirect()) { + if (!PlatformDependent.hasUnsafe()) { + throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", + PlatformDependent.getUnsafeUnavailabilityCause() + ); + } + if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) { + throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:" + + " DirectBufferNoCleanerConstructor is not available"); + } + } + Options rocksdbOptions = openRocksDb(path, databaseOptions); try { List descriptors = new LinkedList<>(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 72bb8a3..64771f5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -68,10 +68,20 @@ public abstract class LLLocalReactiveRocksIterator { var rocksIterator = tuple.getT1(); rocksIterator.status(); if (rocksIterator.isValid()) { - try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) { + Buffer key; + if (allowNettyDirect) { + key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive(); + } else { + key = LLUtils.fromByteArray(alloc, rocksIterator.key()); + } + try (key) { Buffer value; if (readValues) { - value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive(); + if (allowNettyDirect) { + value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive(); + } else { + value = LLUtils.fromByteArray(alloc, rocksIterator.value()); + } } else { value = alloc.allocate(0); } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java index 9e0f8be..7d524ec 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/BufferDataInput.java @@ -12,9 +12,11 @@ import org.jetbrains.annotations.NotNull; public class BufferDataInput implements DataInput, SafeCloseable { private final Buffer buf; + private final int initialReaderOffset; public BufferDataInput(Send bufferSend) { this.buf = bufferSend.receive().makeReadOnly(); + this.initialReaderOffset = buf.readerOffset(); } @Override @@ -104,4 +106,8 @@ public class BufferDataInput implements DataInput, SafeCloseable { public void close() { buf.close(); } + + public int getReadBytesCount() { + return buf.readerOffset() - initialReaderOffset; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java index edc173a..08a6eaa 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/CodecSerializer.java @@ -8,7 +8,7 @@ import java.io.IOException; import org.jetbrains.annotations.NotNull; import org.warp.commonutils.error.IndexOutOfBoundsException; -public class CodecSerializer implements Serializer> { +public class CodecSerializer implements Serializer { private final BufferAllocator allocator; private final Codecs deserializationCodecs; @@ -37,8 +37,8 @@ public class CodecSerializer implements Serializer> { } @Override - public @NotNull A deserialize(@NotNull Send serialized) { - try (var is = new BufferDataInput(serialized)) { + public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) { + try (var is = new BufferDataInput(serializedToReceive)) { int codecId; if (microCodecs) { codecId = is.readUnsignedByte(); @@ -46,7 +46,7 @@ public class CodecSerializer implements Serializer> { codecId = is.readInt(); } var serializer = deserializationCodecs.getCodec(codecId); - return serializer.deserialize(is); + return new DeserializationResult<>(serializer.deserialize(is), is.getReadBytesCount()); } catch (IOException ex) { // This shouldn't happen throw new IOError(ex); diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java index 201e1a1..4d54148 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/Serializer.java @@ -39,11 +39,11 @@ public interface Serializer { @Override public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) { try (Buffer serialized = serializedToReceive.receive()) { + assert serialized.isAccessible(); int length = serialized.readInt(); var readerOffset = serialized.readerOffset(); - var readableBytes = serialized.readableBytes(); return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(), - readerOffset, length, StandardCharsets.UTF_8), readableBytes); + readerOffset, length, StandardCharsets.UTF_8), Integer.BYTES + length); } } @@ -51,8 +51,10 @@ public interface Serializer { public @NotNull Send serialize(@NotNull String deserialized) { var bytes = deserialized.getBytes(StandardCharsets.UTF_8); try (Buffer buf = allocator.allocate(Integer.BYTES + bytes.length)) { + assert buf.isAccessible(); buf.writeInt(bytes.length); buf.writeBytes(bytes); + assert buf.isAccessible(); return buf.send(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java index 0f1bd55..24ceb1d 100644 --- a/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java +++ b/src/main/java/it/cavallium/dbengine/database/serialization/SerializerFixedBinaryLength.java @@ -58,9 +58,8 @@ public interface SerializerFixedBinaryLength extends Serializer { + serialized.readableBytes() + " bytes instead"); } var readerOffset = serialized.readerOffset(); - var readableBytes = serialized.readableBytes(); return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(), - readerOffset, length, StandardCharsets.UTF_8), readableBytes); + readerOffset, length, StandardCharsets.UTF_8), length); } } @@ -68,12 +67,14 @@ public interface SerializerFixedBinaryLength extends Serializer { public @NotNull Send serialize(@NotNull String deserialized) throws SerializationException { // UTF-8 uses max. 3 bytes per char, so calculate the worst case. try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) { + assert buf.isAccessible(); buf.writeBytes(deserialized.getBytes(StandardCharsets.UTF_8)); if (buf.readableBytes() != getSerializedBinaryLength()) { throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to serialize an element with " + buf.readableBytes() + " bytes instead"); } + assert buf.isAccessible(); return buf.send(); } } @@ -95,8 +96,7 @@ public interface SerializerFixedBinaryLength extends Serializer { "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " + serialized.readableBytes() + " bytes instead"); } - var readableBytes = serialized.readableBytes(); - return new DeserializationResult<>(serialized.readInt(), readableBytes); + return new DeserializationResult<>(serialized.readInt(), Integer.BYTES); } } @@ -125,7 +125,7 @@ public interface SerializerFixedBinaryLength extends Serializer { + serialized.readableBytes() + " bytes instead"); } var readableBytes = serialized.readableBytes(); - return new DeserializationResult<>(serialized.readLong(), readableBytes); + return new DeserializationResult<>(serialized.readLong(), Long.BYTES); } } diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 339e032..7f62f1f 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -99,7 +99,7 @@ public class DbTestUtils { .flatMap(conn -> conn .getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), - new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, -1) + new DatabaseOptions(Map.of(), true, false, true, false, true, false, false, -1) ) .map(db -> new TempDb(alloc, conn, db, wrkspcPath)) ); @@ -165,12 +165,10 @@ public class DbTestUtils { } @Override - public @NotNull Short deserialize(@NotNull Send serializedToReceive) { + public @NotNull DeserializationResult deserialize(@NotNull Send serializedToReceive) { try (var serialized = serializedToReceive.receive()) { - var prevReaderIdx = serialized.readerOffset(); var val = serialized.readShort(); - serialized.readerOffset(prevReaderIdx + Short.BYTES); - return val; + return new DeserializationResult<>(val, Short.BYTES); } }