(unfinished) Netty 5 refactoring

This commit is contained in:
Andrea Cavalli 2021-08-31 09:14:46 +02:00
parent 3b55e8bd24
commit 0faef5316e
29 changed files with 603 additions and 499 deletions

View File

@ -1,32 +1,31 @@
package it.cavallium.dbengine.client;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
public class MappedSerializer<A, B> implements Serializer<B, Buffer> {
public class MappedSerializer<A, B> implements Serializer<B, Send<Buffer>> {
private final Serializer<A, Buffer> serializer;
private final Serializer<A, Send<Buffer>> serializer;
private final Mapper<A, B> keyMapper;
public MappedSerializer(Serializer<A, Buffer> serializer,
public MappedSerializer(Serializer<A, Send<Buffer>> serializer,
Mapper<A, B> keyMapper) {
this.serializer = serializer;
this.keyMapper = keyMapper;
}
@Override
public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
try {
return keyMapper.map(serializer.deserialize(serialized.retain()));
} finally {
serialized.release();
public @NotNull B deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) {
return keyMapper.map(serializer.deserialize(serialized));
}
}
@Override
public @NotNull Buffer serialize(@NotNull B deserialized) throws SerializationException {
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return serializer.serialize(keyMapper.unmap(deserialized));
}
}

View File

@ -1,32 +1,31 @@
package it.cavallium.dbengine.client;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import org.jetbrains.annotations.NotNull;
public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryLength<B, Buffer> {
public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryLength<B, Send<Buffer>> {
private final SerializerFixedBinaryLength<A, Buffer> fixedLengthSerializer;
private final SerializerFixedBinaryLength<A, Send<Buffer>> fixedLengthSerializer;
private final Mapper<A, B> keyMapper;
public MappedSerializerFixedLength(SerializerFixedBinaryLength<A, Buffer> fixedLengthSerializer,
public MappedSerializerFixedLength(SerializerFixedBinaryLength<A, Send<Buffer>> fixedLengthSerializer,
Mapper<A, B> keyMapper) {
this.fixedLengthSerializer = fixedLengthSerializer;
this.keyMapper = keyMapper;
}
@Override
public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
try {
return keyMapper.map(fixedLengthSerializer.deserialize(serialized.retain()));
} finally {
serialized.release();
public @NotNull B deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
try (serialized) {
return keyMapper.map(fixedLengthSerializer.deserialize(serialized));
}
}
@Override
public @NotNull Buffer serialize(@NotNull B deserialized) throws SerializationException {
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized));
}

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
@ -70,8 +71,8 @@ public class LLUtils {
return response[0] == 1;
}
public static boolean responseToBoolean(Buffer response) {
try (response) {
public static boolean responseToBoolean(Send<Buffer> responseToReceive) {
try (var response = responseToReceive.receive()) {
assert response.readableBytes() == 1;
return response.getByte(response.readerOffset()) == 1;
}
@ -228,6 +229,29 @@ public class LLUtils {
}
}
/**
* Returns {@code true} if and only if the two specified buffers are
* identical to each other for {@code length} bytes starting at {@code aStartIndex}
* index for the {@code a} buffer and {@code bStartIndex} index for the {@code b} buffer.
* A more compact way to express this is:
* <p>
* {@code a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length]}
*/
public static boolean equals(Buffer a, int aStartIndex, Buffer b, int bStartIndex, int length) {
var aCur = a.openCursor(aStartIndex, length);
var bCur = b.openCursor(bStartIndex, length);
if (aCur.bytesLeft() != bCur.bytesLeft()) {
return false;
}
while (aCur.readByte() && bCur.readByte()) {
if (aCur.getByte() != bCur.getByte()) {
return false;
}
}
return true;
}
public static byte[] toArray(Buffer key) {
byte[] array = new byte[key.readableBytes()];
key.copyInto(key.readerOffset(), array, 0, key.readableBytes());
@ -732,8 +756,4 @@ public class LLUtils {
public static int utf8MaxBytes(String deserialized) {
return deserialized.length() * 3;
}
public static void writeString(Buffer buf, String deserialized, Charset charset) {
buf.writeBytes(deserialized.getBytes(charset));
}
}

View File

@ -1,44 +1,39 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import static io.netty.buffer.Unpooled.*;
public class DatabaseEmpty {
@SuppressWarnings({"unused", "InstantiationOfUtilityClass"})
public static final Nothing NOTHING = new Nothing();
public static final Serializer<Nothing, Buffer> NOTHING_SERIALIZER = new Serializer<>() {
@Override
public @NotNull Nothing deserialize(@NotNull Buffer serialized) {
try {
return NOTHING;
} finally {
serialized.release();
}
}
@Override
public @NotNull Buffer serialize(@NotNull Nothing deserialized) {
return EMPTY_BUFFER;
}
};
public static final Function<Nothing, Nothing> NOTHING_HASH_FUNCTION = nothing -> nothing;
private static final SubStageGetter<Nothing, DatabaseStageEntry<Nothing>> NOTHING_SUB_STAGE_GETTER
= new SubStageGetterSingle<>(NOTHING_SERIALIZER);
public static Serializer<Nothing, Send<Buffer>> nothingSerializer(BufferAllocator bufferAllocator) {
return new Serializer<>() {
@Override
public @NotNull Nothing deserialize(@NotNull Send<Buffer> serialized) {
try (serialized) {
return NOTHING;
}
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Nothing deserialized) {
return bufferAllocator.allocate(0).send();
}
};
}
private DatabaseEmpty() {
}
public static DatabaseStageEntry<Nothing> create(LLDictionary dictionary, Buffer key) {
return new DatabaseSingle<>(dictionary, key, NOTHING_SERIALIZER);
}
public static SubStageGetter<Nothing, DatabaseStageEntry<Nothing>> createSubStageGetter() {
return NOTHING_SUB_STAGE_GETTER;
public static DatabaseStageEntry<Nothing> create(LLDictionary dictionary, Send<Buffer> key) {
return new DatabaseSingle<>(dictionary, key, nothingSerializer(dictionary.getAllocator()));
}
public static final class Nothing {

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
@ -82,9 +83,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Map<T, U>> get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) {
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono, existsAlmostCertainly)
.<Entry<T, U>>handle((entry, sink) -> {
try {
var key = deserializeSuffix(stripPrefix(entry.getKey(), false));
.<Entry<T, U>>handle((entrySend, sink) -> {
try (var entry = entrySend.receive()) {
var key = deserializeSuffix(stripPrefix(entry.getKey()));
var value = valueSerializer.deserialize(entry.getValue());
sink.next(Map.entry(key, value));
} catch (SerializationException ex) {
@ -103,8 +104,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.fromIterable(Collections.unmodifiableMap(value).entrySet())
.handle((entry, sink) -> {
try {
sink.next(new LLEntry(this.toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())));
sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
}
@ -141,30 +142,19 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.get(resolveSnapshot(snapshot), LLUtils.lazyRetain(keyBuf), existsAlmostCertainly)
.handle(this::deserializeValue),
ReferenceCounted::release
);
return dictionary
.get(resolveSnapshot(snapshot), Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))), existsAlmostCertainly)
.handle(this::deserializeValue);
}
@Override
public Mono<Void> putValue(T keySuffix, U value) {
return Mono.using(() -> serializeSuffix(keySuffix),
keySuffixBuf -> Mono.using(() -> toKey(keySuffixBuf.retain()),
keyBuf -> Mono.using(() -> valueSerializer.serialize(value),
valueBuf -> dictionary
.put(LLUtils.lazyRetain(keyBuf), LLUtils.lazyRetain(valueBuf), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release),
ReferenceCounted::release
),
ReferenceCounted::release
),
ReferenceCounted::release
).then();
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then();
}
@Override
@ -177,212 +167,149 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.update(LLUtils.lazyRetain(keyBuf), getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle(this::deserializeValue),
ReferenceCounted::release
);
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle(this::deserializeValue);
}
@Override
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.updateAndGetDelta(LLUtils.lazyRetain(keyBuf), getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapDelta(mono, valueSerializer::deserialize)),
ReferenceCounted::release
);
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapLLDelta(mono, valueSerializer::deserialize));
}
public SerializationFunction<@Nullable Buffer, @Nullable Buffer> getSerializedUpdater(SerializationFunction<@Nullable U, @Nullable U> updater) {
public SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> getSerializedUpdater(
SerializationFunction<@Nullable U, @Nullable U> updater) {
return oldSerialized -> {
try {
try (oldSerialized) {
U result;
if (oldSerialized == null) {
result = updater.apply(null);
} else {
result = updater.apply(valueSerializer.deserialize(oldSerialized.retain()));
result = updater.apply(valueSerializer.deserialize(oldSerialized));
}
if (result == null) {
return null;
} else {
return valueSerializer.serialize(result);
}
} finally {
if (oldSerialized != null) {
oldSerialized.release();
}
}
};
}
public <X> BiSerializationFunction<@Nullable Buffer, X, @Nullable Buffer> getSerializedUpdater(
public <X> BiSerializationFunction<@Nullable Send<Buffer>, X, @Nullable Send<Buffer>> getSerializedUpdater(
BiSerializationFunction<@Nullable U, X, @Nullable U> updater) {
return (oldSerialized, extra) -> {
try {
try (oldSerialized) {
U result;
if (oldSerialized == null) {
result = updater.apply(null, extra);
} else {
result = updater.apply(valueSerializer.deserialize(oldSerialized.retain()), extra);
result = updater.apply(valueSerializer.deserialize(oldSerialized), extra);
}
if (result == null) {
return null;
} else {
return valueSerializer.serialize(result);
}
} finally {
if (oldSerialized != null) {
oldSerialized.release();
}
}
};
}
@Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
return Mono
.using(
() -> serializeSuffix(keySuffix),
keySuffixBuf -> Mono
.using(
() -> toKey(keySuffixBuf.retain()),
keyBuf -> Mono
.using(() -> valueSerializer.serialize(value),
valueBuf -> dictionary
.put(LLUtils.lazyRetain(keyBuf),
LLUtils.lazyRetain(valueBuf),
LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue),
ReferenceCounted::release
),
ReferenceCounted::release
),
ReferenceCounted::release
);
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono,
valueMono,
LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue);
}
@Override
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
return Mono
.using(
() -> serializeSuffix(keySuffix),
keySuffixBuf -> Mono
.using(
() -> toKey(keySuffixBuf.retain()),
keyBuf -> Mono
.using(() -> valueSerializer.serialize(value),
valueBuf -> dictionary
.put(LLUtils.lazyRetain(keyBuf),
LLUtils.lazyRetain(valueBuf),
LLDictionaryResultType.PREVIOUS_VALUE
)
.handle(this::deserializeValue)
.map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null),
ReferenceCounted::release
),
ReferenceCounted::release
),
ReferenceCounted::release
);
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue)
.map(oldValue -> !Objects.equals(oldValue, value))
.defaultIfEmpty(value != null);
}
@Override
public Mono<Void> remove(T keySuffix) {
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then(),
ReferenceCounted::release
);
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then();
}
@Override
public Mono<U> removeAndGetPrevious(T keySuffix) {
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue),
ReferenceCounted::release
);
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle(this::deserializeValue);
}
@Override
public Mono<Boolean> removeAndGetStatus(T keySuffix) {
return Mono
.using(
() -> toKey(serializeSuffix(keySuffix)),
keyBuf -> dictionary
.remove(LLUtils.lazyRetain(keyBuf), LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean),
ReferenceCounted::release
);
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
}
@Override
public Flux<Entry<T, Optional<U>>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
return dictionary.getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
Buffer keySuffixBuf = serializeSuffix(keySuffix);
try {
var key = toKey(keySuffixBuf.retain());
try {
return Tuples.of(keySuffix, key.retain());
} finally {
key.release();
}
} finally {
keySuffixBuf.release();
}
})), existsAlmostCertainly).flatMapSequential(entry -> {
entry.getT2().release();
return Mono.fromCallable(() -> {
Optional<U> valueOpt;
if (entry.getT3().isPresent()) {
var buf = entry.getT3().get();
var mappedKeys = keys
.<Tuple2<T, Send<Buffer>>>handle((keySuffix, sink) -> {
try {
valueOpt = Optional.of(valueSerializer.deserialize(buf.retain()));
} finally {
buf.release();
sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix))));
} catch (SerializationException ex) {
sink.error(ex);
}
} else {
valueOpt = Optional.empty();
}
return Map.entry(entry.getT1(), valueOpt);
});
}).transform(LLUtils::handleDiscard);
});
return dictionary
.getMulti(resolveSnapshot(snapshot), mappedKeys, existsAlmostCertainly)
.<Entry<T, Optional<U>>>handle((entry, sink) -> {
try {
Optional<U> valueOpt;
if (entry.getT3().isPresent()) {
try (var buf = entry.getT3().get()) {
valueOpt = Optional.of(valueSerializer.deserialize(buf));
}
} else {
valueOpt = Optional.empty();
}
sink.next(Map.entry(entry.getT1(), valueOpt));
} catch (SerializationException ex) {
sink.error(ex);
}
})
.transform(LLUtils::handleDiscard);
}
private LLEntry serializeEntry(T key, U value) throws SerializationException {
Buffer serializedKey = toKey(serializeSuffix(key));
try {
Buffer serializedValue = valueSerializer.serialize(value);
try {
return new LLEntry(serializedKey.retain(), serializedValue.retain());
} finally {
serializedValue.release();
private Send<LLEntry> serializeEntry(T key, U value) throws SerializationException {
try (var serializedKey = toKey(serializeSuffix(key)).receive()) {
try (var serializedValue = valueSerializer.serialize(value).receive()) {
return LLEntry.of(serializedKey.send(), serializedValue.send()).send();
}
} finally {
serializedKey.release();
}
}
@Override
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
var serializedEntries = entries
.<LLEntry>handle((entry, sink) -> {
.<Send<LLEntry>>handle((entry, sink) -> {
try {
sink.next(serializeEntry(entry.getKey(), entry.getValue()));
} catch (SerializationException e) {
@ -392,26 +319,26 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return dictionary
.putMulti(serializedEntries, false)
.then()
.doOnDiscard(LLEntry.class, entry -> {
if (!entry.isReleased()) {
entry.release();
}
});
.doOnDiscard(LLEntry.class, ResourceSupport::close);
}
@Override
public <X> Flux<ExtraKeyOperationResult<T, X>> updateMulti(Flux<Tuple2<T, X>> entries,
BiSerializationFunction<@Nullable U, X, @Nullable U> updater) {
Flux<Tuple2<Buffer, X>> serializedEntries = entries
.flatMap(entry -> Mono
.fromCallable(() -> Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()))
)
var serializedEntries = entries
.<Tuple2<Send<Buffer>, X>>handle((entry, sink) -> {
try {
sink.next(Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()));
} catch (SerializationException ex) {
sink.error(ex);
}
})
.doOnDiscard(Tuple2.class, uncastedEntry -> {
if (uncastedEntry.getT1() instanceof Buffer byteBuf) {
byteBuf.release();
byteBuf.close();
}
if (uncastedEntry.getT2() instanceof Buffer byteBuf) {
byteBuf.release();
byteBuf.close();
}
});
var serializedUpdater = getSerializedUpdater(updater);
@ -433,22 +360,17 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), rangeMono)
.handle((key, sink) -> {
try {
Buffer keySuffixWithExt = stripPrefix(key.retain(), false);
try {
sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.retainedSlice()),
try (key) {
try (var keySuffixWithExt = stripPrefix(key).receive()) {
sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.copy().send()),
new DatabaseSingleMapped<>(new DatabaseSingle<>(dictionary,
toKey(keySuffixWithExt.retainedSlice()),
toKey(keySuffixWithExt.send()),
Serializer.noop()
), valueSerializer)
));
} catch (SerializationException ex) {
sink.error(ex);
} finally {
keySuffixWithExt.release();
}
} finally {
key.release();
} catch (SerializationException ex) {
sink.error(ex);
}
});
}
@ -457,34 +379,20 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
return dictionary
.getRange(resolveSnapshot(snapshot), rangeMono)
.<Entry<T, U>>handle((serializedEntry, sink) -> {
Buffer key = serializedEntry.getKey();
Buffer value = serializedEntry.getValue();
try {
Buffer keySuffix = stripPrefix(key.retain(), false);
try {
sink.next(Map.entry(deserializeSuffix(keySuffix.retain()),
valueSerializer.deserialize(value.retain())));
} finally {
keySuffix.release();
}
.<Entry<T, U>>handle((serializedEntryToReceive, sink) -> {
try (var serializedEntry = serializedEntryToReceive.receive()) {
sink.next(Map.entry(deserializeSuffix(stripPrefix(serializedEntry.getKey())),
valueSerializer.deserialize(serializedEntry.getValue())));
} catch (SerializationException e) {
sink.error(e);
} finally {
key.release();
value.release();
}
})
.doOnDiscard(Entry.class, uncastedEntry -> {
if (uncastedEntry.getKey() instanceof Buffer byteBuf) {
if (byteBuf.refCnt() > 0) {
byteBuf.release();
}
byteBuf.close();
}
if (uncastedEntry.getValue() instanceof Buffer byteBuf) {
if (byteBuf.refCnt() > 0) {
byteBuf.release();
}
byteBuf.close();
}
});
}
@ -495,17 +403,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
this.getAllValues(null),
dictionary.setRange(rangeMono, entries.handle((entry, sink) -> {
try {
Buffer serializedKey = toKey(serializeSuffix(entry.getKey()));
try {
Buffer serializedValue = valueSerializer.serialize(entry.getValue());
try {
sink.next(new LLEntry(serializedKey.retain(), serializedValue.retain()));
} finally {
serializedValue.release();
}
} finally {
serializedKey.release();
}
sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey())),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
}
@ -515,18 +414,16 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> clear() {
return Mono.defer(() -> {
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(LLUtils.lazyRetain(range.getSingle()), LLDictionaryResultType.VOID)
.doOnNext(ReferenceCounted::release)
.then();
} else {
return dictionary.setRange(rangeMono, Flux.empty());
}
});
if (range.isAll()) {
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(Mono.fromCallable(range::getSingle), LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then();
} else {
return dictionary.setRange(rangeMono, Flux.empty());
}
}
}

View File

@ -43,7 +43,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
Serializer<T, Send<Buffer>> keySuffixSerializer,
Serializer<U, Send<Buffer>> valueSerializer,
Function<T, TH> keySuffixHashFunction,
SerializerFixedBinaryLength<TH, Buffer> keySuffixHashSerializer) {
SerializerFixedBinaryLength<TH, Send<Buffer>> keySuffixHashSerializer) {
if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) {
throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW");
}
@ -61,13 +61,13 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
}
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> simple(LLDictionary dictionary,
Serializer<T, Buffer> keySerializer,
Serializer<U, Buffer> valueSerializer,
Serializer<T, Send<Buffer>> keySerializer,
Serializer<U, Send<Buffer>> valueSerializer,
Function<T, UH> keyHashFunction,
SerializerFixedBinaryLength<UH, Buffer> keyHashSerializer) {
SerializerFixedBinaryLength<UH, Send<Buffer>> keyHashSerializer) {
return new DatabaseMapDictionaryHashed<>(
dictionary,
dictionary.getAllocator().buffer(0),
dictionary.getAllocator().allocate(0).send(),
keySerializer,
valueSerializer,
keyHashFunction,
@ -76,11 +76,11 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
}
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> tail(LLDictionary dictionary,
Buffer prefixKey,
Serializer<T, Buffer> keySuffixSerializer,
Serializer<U, Buffer> valueSerializer,
Send<Buffer> prefixKey,
Serializer<T, Send<Buffer>> keySuffixSerializer,
Serializer<U, Send<Buffer>> valueSerializer,
Function<T, UH> keySuffixHashFunction,
SerializerFixedBinaryLength<UH, Buffer> keySuffixHashSerializer) {
SerializerFixedBinaryLength<UH, Send<Buffer>> keySuffixHashSerializer) {
return new DatabaseMapDictionaryHashed<>(dictionary,
prefixKey,
keySuffixSerializer,

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
@ -15,24 +16,21 @@ import reactor.core.publisher.Mono;
public class DatabaseSetDictionary<T> extends DatabaseMapDictionary<T, Nothing> {
protected DatabaseSetDictionary(LLDictionary dictionary,
Buffer prefixKey,
SerializerFixedBinaryLength<T, Buffer> keySuffixSerializer) {
super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.NOTHING_SERIALIZER);
Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer) {
super(dictionary, prefixKey, keySuffixSerializer, DatabaseEmpty.nothingSerializer(dictionary.getAllocator()));
}
public static <T> DatabaseSetDictionary<T> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T, Buffer> keySerializer) {
var buf = dictionary.getAllocator().buffer(0);
try {
return new DatabaseSetDictionary<>(dictionary, buf, keySerializer);
} finally {
buf.release();
SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer) {
try (var buf = dictionary.getAllocator().allocate(0)) {
return new DatabaseSetDictionary<>(dictionary, buf.send(), keySerializer);
}
}
public static <T> DatabaseSetDictionary<T> tail(LLDictionary dictionary,
Buffer prefixKey,
SerializerFixedBinaryLength<T, Buffer> keySuffixSerializer) {
Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T, Send<Buffer>> keySuffixSerializer) {
return new DatabaseSetDictionary<>(dictionary, prefixKey, keySuffixSerializer);
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
@ -17,25 +18,25 @@ import reactor.core.publisher.Mono;
public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHashed<T, Nothing, TH> {
protected DatabaseSetDictionaryHashed(LLDictionary dictionary,
Buffer prefixKey,
Serializer<T, Buffer> keySuffixSerializer,
Send<Buffer> prefixKey,
Serializer<T, Send<Buffer>> keySuffixSerializer,
Function<T, TH> keySuffixHashFunction,
SerializerFixedBinaryLength<TH, Buffer> keySuffixHashSerializer) {
SerializerFixedBinaryLength<TH, Send<Buffer>> keySuffixHashSerializer) {
super(dictionary,
prefixKey,
keySuffixSerializer,
DatabaseEmpty.NOTHING_SERIALIZER,
DatabaseEmpty.nothingSerializer(dictionary.getAllocator()),
keySuffixHashFunction,
keySuffixHashSerializer
);
}
public static <T, TH> DatabaseSetDictionaryHashed<T, TH> simple(LLDictionary dictionary,
Serializer<T, Buffer> keySerializer,
Serializer<T, Send<Buffer>> keySerializer,
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, Buffer> keyHashSerializer) {
SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) {
return new DatabaseSetDictionaryHashed<>(dictionary,
dictionary.getAllocator().buffer(0),
dictionary.getAllocator().allocate(0).send(),
keySerializer,
keyHashFunction,
keyHashSerializer
@ -43,10 +44,10 @@ public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHas
}
public static <T, TH> DatabaseSetDictionaryHashed<T, TH> tail(LLDictionary dictionary,
Buffer prefixKey,
Serializer<T, Buffer> keySuffixSerializer,
Send<Buffer> prefixKey,
Serializer<T, Send<Buffer>> keySuffixSerializer,
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, Buffer> keyHashSerializer) {
SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) {
return new DatabaseSetDictionaryHashed<>(dictionary,
prefixKey,
keySuffixSerializer,

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
@ -16,15 +17,15 @@ import reactor.core.publisher.Mono;
public class SubStageGetterHashMap<T, U, TH> implements
SubStageGetter<Map<T, U>, DatabaseMapDictionaryHashed<T, U, TH>> {
private final Serializer<T, Buffer> keySerializer;
private final Serializer<U, Buffer> valueSerializer;
private final Serializer<T, Send<Buffer>> keySerializer;
private final Serializer<U, Send<Buffer>> valueSerializer;
private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH, Buffer> keyHashSerializer;
private final SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer;
public SubStageGetterHashMap(Serializer<T, Buffer> keySerializer,
Serializer<U, Buffer> valueSerializer,
public SubStageGetterHashMap(Serializer<T, Send<Buffer>> keySerializer,
Serializer<U, Send<Buffer>> valueSerializer,
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, Buffer> keyHashSerializer) {
SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.keyHashFunction = keyHashFunction;
@ -34,20 +35,20 @@ public class SubStageGetterHashMap<T, U, TH> implements
@Override
public Mono<DatabaseMapDictionaryHashed<T, U, TH>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Buffer> prefixKeyMono) {
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(
prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionaryHashed
.tail(dictionary,
prefixKey.retain(),
prefixKey,
keySerializer,
valueSerializer,
keyHashFunction,
keyHashSerializer
)
),
prefixKey -> Mono.fromRunnable(prefixKey::release)
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
@ -16,13 +17,13 @@ import reactor.core.publisher.Mono;
public class SubStageGetterHashSet<T, TH> implements
SubStageGetter<Map<T, Nothing>, DatabaseSetDictionaryHashed<T, TH>> {
private final Serializer<T, Buffer> keySerializer;
private final Serializer<T, Send<Buffer>> keySerializer;
private final Function<T, TH> keyHashFunction;
private final SerializerFixedBinaryLength<TH, Buffer> keyHashSerializer;
private final SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer;
public SubStageGetterHashSet(Serializer<T, Buffer> keySerializer,
public SubStageGetterHashSet(Serializer<T, Send<Buffer>> keySerializer,
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH, Buffer> keyHashSerializer) {
SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) {
this.keySerializer = keySerializer;
this.keyHashFunction = keyHashFunction;
this.keyHashSerializer = keyHashSerializer;
@ -31,18 +32,18 @@ public class SubStageGetterHashSet<T, TH> implements
@Override
public Mono<DatabaseSetDictionaryHashed<T, TH>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Buffer> prefixKeyMono) {
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseSetDictionaryHashed
.tail(dictionary,
prefixKey.retain(),
prefixKey,
keySerializer,
keyHashFunction,
keyHashSerializer
)
),
prefixKey -> Mono.fromRunnable(prefixKey::release)
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -14,11 +15,11 @@ import reactor.core.publisher.Mono;
public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, DatabaseMapDictionary<T, U>> {
private final SerializerFixedBinaryLength<T, Buffer> keySerializer;
private final Serializer<U, Buffer> valueSerializer;
private final SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer;
private final Serializer<U, Send<Buffer>> valueSerializer;
public SubStageGetterMap(SerializerFixedBinaryLength<T, Buffer> keySerializer,
Serializer<U, Buffer> valueSerializer) {
public SubStageGetterMap(SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer,
Serializer<U, Send<Buffer>> valueSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
}
@ -26,17 +27,17 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
@Override
public Mono<DatabaseMapDictionary<T, U>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Buffer> prefixKeyMono) {
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionary
.tail(dictionary,
prefixKey.retain(),
prefixKey,
keySerializer,
valueSerializer
)
),
prefixKey -> Mono.fromRunnable(prefixKey::release)
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -15,11 +16,11 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
SubStageGetter<Map<T, U>, DatabaseMapDictionaryDeep<T, U, US>> {
private final SubStageGetter<U, US> subStageGetter;
private final SerializerFixedBinaryLength<T, Buffer> keySerializer;
private final SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer;
private final int keyExtLength;
public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter,
SerializerFixedBinaryLength<T, Buffer> keySerializer,
SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer,
int keyExtLength) {
this.subStageGetter = subStageGetter;
this.keySerializer = keySerializer;
@ -41,18 +42,18 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@Override
public Mono<DatabaseMapDictionaryDeep<T, U, US>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Buffer> prefixKeyMono) {
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionaryDeep
.deepIntermediate(dictionary,
prefixKey.retain(),
prefixKey,
keySerializer,
subStageGetter,
keyExtLength
)
),
prefixKey -> Mono.fromRunnable(prefixKey::release)
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
}
@ -61,23 +62,6 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
return true;
}
private Mono<Void> checkKeyFluxConsistency(Buffer prefixKey, List<Buffer> keys) {
return Mono
.fromCallable(() -> {
try {
for (Buffer key : keys) {
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
}
} finally {
prefixKey.release();
for (Buffer key : keys) {
key.release();
}
}
return null;
});
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength() + keyExtLength;
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -14,20 +15,20 @@ import reactor.core.publisher.Mono;
public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, DatabaseSetDictionary<T>> {
private final SerializerFixedBinaryLength<T, Buffer> keySerializer;
private final SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer;
public SubStageGetterSet(SerializerFixedBinaryLength<T, Buffer> keySerializer) {
public SubStageGetterSet(SerializerFixedBinaryLength<T, Send<Buffer>> keySerializer) {
this.keySerializer = keySerializer;
}
@Override
public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot,
Mono<Buffer> prefixKeyMono) {
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey.retain(), keySerializer)),
prefixKey -> Mono.fromRunnable(prefixKey::release)
.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey, keySerializer)),
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
}

View File

@ -1,9 +1,10 @@
package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.Serializer;
public class SubStageGetterSingleBytes extends SubStageGetterSingle<Buffer> {
public class SubStageGetterSingleBytes extends SubStageGetterSingle<Send<Buffer>> {
public SubStageGetterSingleBytes() {
super(Serializer.noop());

View File

@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.disk;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.BufferUtil;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
@ -37,15 +36,17 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
ReadOptions readOptions,
boolean canFillCache,
boolean readValues) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range;
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.canFillCache = canFillCache;
this.readValues = readValues;
try (range) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range.receive();
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.canFillCache = canFillCache;
this.readValues = readValues;
}
}
@ -54,9 +55,8 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax());
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh);
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.copy().send(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {
var rocksIterator = tuple.getT1();
ObjectArrayList<T> values = new ObjectArrayList<>();
@ -64,34 +64,32 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
try {
rocksIterator.status();
while (rocksIterator.isValid()) {
Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
if (firstGroupKey == null) {
firstGroupKey = key.retain();
} else if (!ByteBufUtil.equals(firstGroupKey, firstGroupKey.readerIndex(), key, key.readerIndex(), prefixLength)) {
firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(),
key, key.readerOffset(), prefixLength)) {
break;
}
Buffer value;
if (readValues) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
} else {
value = alloc.buffer(0);
value = alloc.allocate(0);
}
try {
rocksIterator.next();
rocksIterator.status();
T entry = getEntry(key.retain(), value.retain());
T entry = getEntry(key.send(), value.send());
values.add(entry);
} finally {
value.release();
value.close();
}
} finally {
key.release();
}
}
} finally {
if (firstGroupKey != null) {
firstGroupKey.release();
firstGroupKey.close();
}
}
if (!values.isEmpty()) {
@ -101,21 +99,19 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
}
} catch (RocksDBException ex) {
sink.error(ex);
} finally {
range.release();
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
tuple.getT2().release();
tuple.getT3().release();
tuple.getT2().close();
tuple.getT3().close();
});
}
public abstract T getEntry(Send<Buffer> key, Send<Buffer> value);
public void release() {
range.release();
range.close();
}
}

View File

@ -2,7 +2,6 @@ package it.cavallium.dbengine.database.disk;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.BufferUtil;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
@ -34,15 +33,17 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
ReadOptions readOptions,
boolean canFillCache,
String debugName) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range;
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.canFillCache = canFillCache;
this.debugName = debugName;
try (range) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range.receive();
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.canFillCache = canFillCache;
this.debugName = debugName;
}
}
@ -54,54 +55,48 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(canFillCache);
}
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.retain(), db, cfh);
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.copy().send(), db, cfh);
}, (tuple, sink) -> {
range.retain();
try {
var rocksIterator = tuple.getT1();
rocksIterator.status();
Buffer firstGroupKey = null;
try {
while (rocksIterator.isValid()) {
Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
try {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
if (firstGroupKey == null) {
firstGroupKey = key.retain();
} else if (!ByteBufUtil.equals(firstGroupKey, firstGroupKey.readerIndex(), key, key.readerIndex(), prefixLength)) {
firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(), prefixLength)) {
break;
}
rocksIterator.next();
rocksIterator.status();
} finally {
key.release();
}
}
if (firstGroupKey != null) {
var groupKeyPrefix = firstGroupKey.slice(0, prefixLength);
sink.next(groupKeyPrefix.retain());
var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength);
sink.next(groupKeyPrefix.send());
} else {
sink.complete();
}
} finally {
if (firstGroupKey != null) {
firstGroupKey.release();
firstGroupKey.close();
}
}
} catch (RocksDBException ex) {
sink.error(ex);
} finally {
range.release();
}
return tuple;
}, tuple -> {
var rocksIterator = tuple.getT1();
rocksIterator.close();
tuple.getT2().release();
tuple.getT3().release();
tuple.getT2().close();
tuple.getT3().close();
});
}
public void release() {
range.release();
range.close();
}
}

View File

@ -26,14 +26,14 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
JMXNettyMonitoringManager.initialize();
}
private final ByteBufAllocator allocator;
private final BufferAllocator allocator;
public LLMemoryDatabaseConnection(ByteBufAllocator allocator) {
public LLMemoryDatabaseConnection(BufferAllocator allocator) {
this.allocator = allocator;
}
@Override
public ByteBufAllocator getAllocator() {
public BufferAllocator getAllocator() {
return allocator;
}

View File

@ -38,7 +38,7 @@ public class LLMemoryDictionary implements LLDictionary {
private final String databaseName;
private final String columnName;
private final ByteBufAllocator allocator;
private final BufferAllocator allocator;
private final UpdateMode updateMode;
private final Getter<Long, ConcurrentSkipListMap<ByteList, ByteList>> snapshots;
private final ConcurrentSkipListMap<ByteList, ByteList> mainDb;
@ -47,7 +47,7 @@ public class LLMemoryDictionary implements LLDictionary {
U get(T argument);
}
public LLMemoryDictionary(ByteBufAllocator allocator,
public LLMemoryDictionary(BufferAllocator allocator,
String databaseName,
String columnName,
UpdateMode updateMode,
@ -67,7 +67,7 @@ public class LLMemoryDictionary implements LLDictionary {
}
@Override
public ByteBufAllocator getAllocator() {
public BufferAllocator getAllocator() {
return allocator;
}

View File

@ -28,7 +28,7 @@ import reactor.core.publisher.Mono;
public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
private final ByteBufAllocator allocator;
private final BufferAllocator allocator;
private final String name;
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
@ -36,7 +36,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
private final ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>> mainDb;
private final ConcurrentHashMap<String, LLMemoryDictionary> singletons = new ConcurrentHashMap<>();
public LLMemoryKeyValueDatabase(ByteBufAllocator allocator, String name, List<Column> columns) {
public LLMemoryKeyValueDatabase(BufferAllocator allocator, String name, List<Column> columns) {
this.allocator = allocator;
this.name = name;
this.mainDb = new ConcurrentHashMap<>();
@ -87,7 +87,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
}
@Override
public ByteBufAllocator getAllocator() {
public BufferAllocator getAllocator() {
return allocator;
}

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.memory;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.Unpooled;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
@ -18,8 +19,10 @@ public class LLMemorySingleton implements LLSingleton {
public LLMemorySingleton(LLMemoryDictionary dict, byte[] singletonName) {
this.dict = dict;
this.singletonName = singletonName;
Buffer singletonNameBuf = Unpooled.wrappedBuffer(singletonName);
this.singletonNameBufMono = Mono.just(singletonNameBuf).map(Buffer::retain);
this.singletonNameBufMono = Mono.fromCallable(() -> dict
.getAllocator()
.allocate(singletonName.length)
.writeBytes(singletonName));
}
@Override
@ -32,18 +35,16 @@ public class LLMemorySingleton implements LLSingleton {
return dict
.get(snapshot, singletonNameBufMono, false)
.map(b -> {
try {
try (b) {
return LLUtils.toArray(b);
} finally {
b.release();
}
});
}
@Override
public Mono<Void> set(byte[] value) {
var bbKey = Mono.just(Unpooled.wrappedBuffer(singletonName)).map(Buffer::retain);
var bbVal = Mono.just(Unpooled.wrappedBuffer(value)).map(Buffer::retain);
var bbKey = singletonNameBufMono;
var bbVal = Mono.fromCallable(() -> dict.getAllocator().allocate(value.length).writeBytes(value));
return dict
.put(bbKey, bbVal, LLDictionaryResultType.VOID)
.then();

View File

@ -0,0 +1,107 @@
package it.cavallium.dbengine.database.serialization;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.DataInput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
public class BufferDataInput implements DataInput, SafeCloseable {
private final Buffer buf;
public BufferDataInput(Send<Buffer> bufferSend) {
this.buf = bufferSend.receive().makeReadOnly();
}
@Override
public void readFully(byte @NotNull [] b) {
this.readFully(b, 0, b.length);
}
@Override
public void readFully(byte @NotNull [] b, int off, int len) {
buf.copyInto(buf.readerOffset(), b, off, len);
buf.readerOffset(buf.readerOffset() + len);
}
@Override
public int skipBytes(int n) {
n = Math.min(n, buf.readerOffset() - buf.writerOffset());
buf.readerOffset(buf.readerOffset() + n);
return n;
}
@Override
public boolean readBoolean() {
return buf.readUnsignedByte() != 0;
}
@Override
public byte readByte() {
return buf.readByte();
}
@Override
public int readUnsignedByte() {
return buf.readUnsignedByte();
}
@Override
public short readShort() {
return buf.readShort();
}
@Override
public int readUnsignedShort() {
return buf.readUnsignedShort();
}
@Override
public char readChar() {
return buf.readChar();
}
@Override
public int readInt() {
return buf.readInt();
}
@Override
public long readLong() {
return buf.readLong();
}
@Override
public float readFloat() {
return buf.readFloat();
}
@Override
public double readDouble() {
return buf.readDouble();
}
@Override
public String readLine() {
throw new UnsupportedOperationException();
}
@NotNull
@Override
public String readUTF() {
var len = buf.readUnsignedShort();
try (var copiedBuf = buf.copy(buf.readerOffset(), len)) {
var off = copiedBuf.readerOffset();
return LLUtils.deserializeString(copiedBuf.send(), off, len, StandardCharsets.UTF_8);
}
}
@Override
public void close() {
buf.close();
}
}

View File

@ -0,0 +1,91 @@
package it.cavallium.dbengine.database.serialization;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
public class BufferDataOutput implements DataOutput {
private final Buffer buf;
public BufferDataOutput(Buffer bufferSend) {
this.buf = bufferSend;
}
@Override
public void write(int b) {
buf.writeUnsignedByte(b);
}
@Override
public void write(byte @NotNull [] b) {
buf.writeBytes(b);
}
@Override
public void write(byte @NotNull [] b, int off, int len) {
buf.writeBytes(b, off, len);
}
@Override
public void writeBoolean(boolean v) {
buf.writeUnsignedByte(v ? 1 : 0);
}
@Override
public void writeByte(int v) {
buf.writeByte((byte) v);
}
@Override
public void writeShort(int v) {
buf.writeShort((short) v);
}
@Override
public void writeChar(int v) {
buf.writeChar((char) v);
}
@Override
public void writeInt(int v) {
buf.writeInt(v);
}
@Override
public void writeLong(long v) {
buf.writeLong(v);
}
@Override
public void writeFloat(float v) {
buf.writeFloat(v);
}
@Override
public void writeDouble(double v) {
buf.writeDouble(v);
}
@Override
public void writeBytes(@NotNull String s) {
buf.writeBytes(s.getBytes());
}
@Override
public void writeChars(@NotNull String s) {
s.chars().forEach(c -> buf.writeChar((char) c));
}
@Override
public void writeUTF(@NotNull String s) {
var bytes = s.getBytes(StandardCharsets.UTF_8);
buf.writeUnsignedShort(bytes.length);
buf.writeBytes(bytes);
}
}

View File

@ -1,13 +1,13 @@
package it.cavallium.dbengine.database.serialization;
import io.netty.buffer.api.BufferInputStream;
import io.netty.buffer.api.BufferOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
public interface Codec<A> {
@NotNull A deserialize(@NotNull ByteBufInputStream serialized) throws IOException;
@NotNull A deserialize(@NotNull BufferDataInput serialized) throws IOException;
void serialize(@NotNull ByteBufOutputStream outputStream, @NotNull A deserialized) throws IOException;
void serialize(@NotNull BufferDataOutput outputStream, @NotNull A deserialized) throws IOException;
}

View File

@ -2,18 +2,15 @@ package it.cavallium.dbengine.database.serialization;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.BufferInputStream;
import io.netty.buffer.api.BufferOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.api.Send;
import java.io.IOError;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
import org.warp.commonutils.error.IndexOutOfBoundsException;
public class CodecSerializer<A> implements Serializer<A, Buffer> {
public class CodecSerializer<A> implements Serializer<A, Send<Buffer>> {
private final ByteBufAllocator allocator;
private final BufferAllocator allocator;
private final Codecs<A> deserializationCodecs;
private final Codec<A> serializationCodec;
private final int serializationCodecId;
@ -24,7 +21,7 @@ public class CodecSerializer<A> implements Serializer<A, Buffer> {
* @param microCodecs if true, allow only codecs with a value from 0 to 255 to save disk space
*/
public CodecSerializer(
ByteBufAllocator allocator,
BufferAllocator allocator,
Codecs<A> deserializationCodecs,
Codec<A> serializationCodec,
int serializationCodecId,
@ -40,8 +37,8 @@ public class CodecSerializer<A> implements Serializer<A, Buffer> {
}
@Override
public @NotNull A deserialize(@NotNull Buffer serialized) {
try (var is = new ByteBufInputStream(serialized)) {
public @NotNull A deserialize(@NotNull Send<Buffer> serialized) {
try (var is = new BufferDataInput(serialized)) {
int codecId;
if (microCodecs) {
codecId = is.readUnsignedByte();
@ -53,26 +50,24 @@ public class CodecSerializer<A> implements Serializer<A, Buffer> {
} catch (IOException ex) {
// This shouldn't happen
throw new IOError(ex);
} finally {
serialized.release();
}
}
@Override
public @NotNull Buffer serialize(@NotNull A deserialized) {
Buffer buf = allocator.buffer();
try (var os = new ByteBufOutputStream(buf)) {
public @NotNull Send<Buffer> serialize(@NotNull A deserialized) {
try (Buffer buf = allocator.allocate(64)) {
var os = new BufferDataOutput(buf);
if (microCodecs) {
os.writeByte(serializationCodecId);
} else {
os.writeInt(serializationCodecId);
}
serializationCodec.serialize(os, deserialized);
return buf.send();
} catch (IOException ex) {
// This shouldn't happen
throw new IOError(ex);
}
return buf;
}
@SuppressWarnings("unused")

View File

@ -41,11 +41,10 @@ public interface Serializer<A, B> {
@Override
public @NotNull Send<Buffer> serialize(@NotNull String deserialized) {
// UTF-8 uses max. 3 bytes per char, so calculate the worst case.
int length = LLUtils.utf8MaxBytes(deserialized);
try (Buffer buf = allocator.allocate(Integer.BYTES + length)) {
buf.writeInt(length);
LLUtils.writeString(buf, deserialized, StandardCharsets.UTF_8);
var bytes = deserialized.getBytes(StandardCharsets.UTF_8);
try (Buffer buf = allocator.allocate(Integer.BYTES + bytes.length)) {
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
return buf.send();
}
}

View File

@ -64,7 +64,7 @@ public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
public @NotNull Send<Buffer> serialize(@NotNull String deserialized) throws SerializationException {
// UTF-8 uses max. 3 bytes per char, so calculate the worst case.
try (Buffer buf = allocator.allocate(LLUtils.utf8MaxBytes(deserialized))) {
LLUtils.writeString(buf, deserialized, StandardCharsets.UTF_8);
buf.writeBytes(deserialized.getBytes(StandardCharsets.UTF_8));
if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException("Fixed serializer with " + getSerializedBinaryLength()
+ " bytes has tried to serialize an element with "

View File

@ -1,14 +1,14 @@
package it.cavallium.dbengine.netty;
import io.netty.buffer.api.BufferAllocatorMetric;
public class JMXNettyMonitoring implements JMXNettyMonitoringMBean {
private final String name;
private final ByteBufAllocatorMetric metric;
protected final boolean direct;
private final io.netty.buffer.api.pool.BufferAllocatorMetric metric;
public JMXNettyMonitoring(String name, io.netty.buffer.api.BufferAllocatorMetric metric) {
public JMXNettyMonitoring(String name, boolean direct, io.netty.buffer.api.pool.BufferAllocatorMetric metric) {
this.name = name;
this.direct = direct;
this.metric = metric;
}
@ -19,12 +19,12 @@ public class JMXNettyMonitoring implements JMXNettyMonitoringMBean {
@Override
public Long getHeapUsed() {
return metric.usedHeapMemory();
return direct ? 0 : metric.usedMemory();
}
@Override
public Long getDirectUsed() {
return metric.usedDirectMemory();
return direct ? metric.usedMemory() : 0;
}
@Override

View File

@ -1,10 +1,11 @@
package it.cavallium.dbengine.netty;
import io.netty.buffer.api.BufferAllocatorMetric;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.api.pool.PooledBufferAllocator;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
@ -37,50 +38,23 @@ public class JMXNettyMonitoringManager {
public synchronized static JMXNettyMonitoringManager getInstance() {
if (instance == null) {
instance = new JMXNettyMonitoringManager();
instance.initializeInternal();
}
return instance;
}
private void initializeInternal() {
Map<String, ByteBufAllocatorMetric> allocators = new HashMap<>();
allocators.put("unpooled", UnpooledByteBufAllocator.DEFAULT.metric());
allocators.put("pooled", PooledByteBufAllocator.DEFAULT.metric());
for (var entry : allocators.entrySet()) {
register(entry.getKey(), entry.getValue());
}
}
public void register(String name, ByteBufAllocatorMetric metric) {
public void register(String name, BufferAllocator metric) {
try {
name = name.replaceAll("[^\\p{IsAlphabetic}\\p{IsDigit}_]", "_");
String type;
StandardMBean mbean;
if (metric instanceof PooledByteBufAllocatorMetric) {
var pooledMetric = (PooledByteBufAllocatorMetric) metric;
for (var arenaEntry : (Iterable<Entry<String, PoolArenaMetric>>) Stream.concat(
pooledMetric.directArenas().stream().map(arena -> Map.entry("direct", arena)),
pooledMetric.heapArenas().stream().map(arena -> Map.entry("heap", arena))
)::iterator) {
var arenaType = arenaEntry.getKey();
var arenaMetric = arenaEntry.getValue();
var jmx = new JMXPoolArenaNettyMonitoring(arenaMetric);
mbean = new StandardMBean(jmx, JMXPoolArenaNettyMonitoringMBean.class);
ObjectName botObjectName = new ObjectName("io.netty.stats:name=PoolArena,type=" + arenaType + ",arenaId=" + nextArenaId.getAndIncrement());
platformMBeanServer.registerMBean(mbean, botObjectName);
}
if (metric instanceof PooledBufferAllocator pooledMetric) {
var jmx = new JMXPooledNettyMonitoring(name, pooledMetric);
type = "pooled";
mbean = new StandardMBean(jmx, JMXNettyMonitoringMBean.class);
} else {
var jmx = new JMXNettyMonitoring(name, metric);
type = "unpooled";
mbean = new StandardMBean(jmx, JMXNettyMonitoringMBean.class);
}
ObjectName botObjectName = new ObjectName("io.netty.stats:name=ByteBufAllocator,allocatorName=" + name + ",type=" + type);
platformMBeanServer.registerMBean(mbean, botObjectName);
ObjectName botObjectName = new ObjectName("io.netty.stats:name=ByteBufAllocator,allocatorName=" + name + ",type=" + type);
platformMBeanServer.registerMBean(mbean, botObjectName);
}
} catch (MalformedObjectNameException | NotCompliantMBeanException | InstanceAlreadyExistsException | MBeanRegistrationException e) {
throw new RuntimeException(e);
}

View File

@ -1,50 +1,98 @@
package it.cavallium.dbengine.netty;
import io.netty.buffer.api.BufferAllocatorMetric;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.buffer.api.pool.BufferAllocatorMetric;
import io.netty.buffer.api.pool.BufferAllocatorMetric;
import io.netty.buffer.api.pool.PooledBufferAllocator;
import java.lang.reflect.Field;
public class JMXPooledNettyMonitoring extends JMXNettyMonitoring implements JMXNettyMonitoringMBean {
private final PooledByteBufAllocatorMetric metric;
private final PooledBufferAllocator alloc;
private final BufferAllocatorMetric metric;
private Field smallCacheSize;
private Field numThreadLocalCaches;
private Field normalCacheSize;
private Field chunkSize;
public JMXPooledNettyMonitoring(String name, PooledByteBufAllocatorMetric metric) {
super(name, metric);
this.metric = metric;
public JMXPooledNettyMonitoring(String name, PooledBufferAllocator alloc) {
super(name, alloc.isDirectBufferPooled(), alloc.metric());
this.alloc = alloc;
this.metric = alloc.metric();
try {
//noinspection JavaReflectionMemberAccess
numThreadLocalCaches = metric.getClass().getDeclaredField("numThreadLocalCaches");
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
try {
//noinspection JavaReflectionMemberAccess
smallCacheSize = metric.getClass().getDeclaredField("smallCacheSize");
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
try {
//noinspection JavaReflectionMemberAccess
normalCacheSize = metric.getClass().getDeclaredField("normalCacheSize");
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
try {
//noinspection JavaReflectionMemberAccess
chunkSize = metric.getClass().getDeclaredField("chunkSize");
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}
@Override
public Integer getNumHeapArenas() {
return metric.numHeapArenas();
return direct ? 0 : alloc.numArenas();
}
@Override
public Integer getNumDirectArenas() {
return metric.numDirectArenas();
return direct ? alloc.numArenas() : 0;
}
@Override
public Integer getNumThreadLocalCachesArenas() {
return metric.numThreadLocalCaches();
try {
return numThreadLocalCaches.getInt(metric);
} catch (IllegalAccessException e) {
return 0;
}
}
@Deprecated
@Override
public Integer getTinyCacheSize() {
return metric.tinyCacheSize();
return 0;
}
@Override
public Integer getSmallCacheSize() {
return metric.smallCacheSize();
try {
return smallCacheSize.getInt(metric);
} catch (IllegalAccessException e) {
return 0;
}
}
@Override
public Integer getNormalCacheSize() {
return metric.normalCacheSize();
try {
return normalCacheSize.getInt(metric);
} catch (IllegalAccessException e) {
return 0;
}
}
@Override
public Integer getChunkSize() {
return metric.chunkSize();
try {
return chunkSize.getInt(metric);
} catch (IllegalAccessException e) {
return 0;
}
}
}