Support memory segments

This commit is contained in:
Andrea Cavalli 2021-09-22 18:33:28 +02:00
parent 8b73a05177
commit e034f3b778
26 changed files with 297 additions and 277 deletions

View File

@ -99,6 +99,10 @@
<groupId>io.net5</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.net5.incubator</groupId>
<artifactId>netty-incubator-buffer-memseg</artifactId>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
@ -266,6 +270,11 @@
<artifactId>netty-buffer</artifactId>
<version>5.0.0.Final-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.net5.incubator</groupId>
<artifactId>netty-incubator-buffer-memseg</artifactId>
<version>0.0.1.Final-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>

View File

@ -5,6 +5,7 @@ import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class MappedSerializer<A, B> implements Serializer<B> {
@ -18,7 +19,7 @@ public class MappedSerializer<A, B> implements Serializer<B> {
}
@Override
public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
public @NotNull DeserializationResult<B> deserialize(@Nullable Send<Buffer> serialized) throws SerializationException {
try (serialized) {
var deserialized = serializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
@ -26,7 +27,7 @@ public class MappedSerializer<A, B> implements Serializer<B> {
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
public @Nullable Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return serializer.serialize(keyMapper.unmap(deserialized));
}
}

View File

@ -5,6 +5,7 @@ import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryLength<B> {
@ -18,7 +19,7 @@ public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryL
}
@Override
public @NotNull DeserializationResult<B> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException {
public @NotNull DeserializationResult<B> deserialize(@Nullable Send<Buffer> serialized) throws SerializationException {
try (serialized) {
var deserialized = fixedLengthSerializer.deserialize(serialized);
return new DeserializationResult<>(keyMapper.map(deserialized.deserializedData()), deserialized.bytesRead());
@ -26,7 +27,7 @@ public class MappedSerializerFixedLength<A, B> implements SerializerFixedBinaryL
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
public @Nullable Send<Buffer> serialize(@NotNull B deserialized) throws SerializationException {
return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized));
}

View File

@ -313,7 +313,7 @@ public class LLUtils {
*/
@SuppressWarnings("ConstantConditions")
@Nullable
public static Send<Buffer> readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
ByteBuffer directBuffer;
Buffer buffer;
{
@ -330,7 +330,7 @@ public class LLUtils {
if (size != RocksDB.NOT_FOUND) {
if (size == directBuffer.limit()) {
buffer.readerOffset(0).writerOffset(size);
return buffer.send();
return buffer;
} else {
assert size > directBuffer.limit();
assert directBuffer.limit() > 0;
@ -522,85 +522,32 @@ public class LLUtils {
}
@NotNull
public static Send<Buffer> readDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
var nullableSend = readNullableDirectNioBuffer(alloc, reader);
try (var buffer = nullableSend != null ? nullableSend.receive() : null) {
if (buffer == null) {
throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element");
}
return buffer.send();
public static Buffer readDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
var nullable = readNullableDirectNioBuffer(alloc, reader);
if (nullable == null) {
throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element");
}
return nullable;
}
public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer) {
try (var composite = buffer.receive()) {
return composite.send();
}
public static Buffer compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer) {
return buffer.receive();
}
public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer1, Send<Buffer> buffer2) {
try (var buf1 = buffer1.receive()) {
try (var buf2 = buffer2.receive()) {
try (var composite = CompositeBuffer.compose(alloc, buf1.split().send(), buf2.split().send())) {
return composite.send();
}
}
}
public static Buffer compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer1, Send<Buffer> buffer2) {
return CompositeBuffer.compose(alloc, buffer1, buffer2);
}
public static Send<Buffer> compositeBuffer(BufferAllocator alloc,
public static Buffer compositeBuffer(BufferAllocator alloc,
Send<Buffer> buffer1,
Send<Buffer> buffer2,
Send<Buffer> buffer3) {
try (var buf1 = buffer1.receive()) {
try (var buf2 = buffer2.receive()) {
try (var buf3 = buffer3.receive()) {
try (var composite = CompositeBuffer.compose(alloc,
buf1.split().send(),
buf2.split().send(),
buf3.split().send()
)) {
return composite.send();
}
}
}
}
return CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3);
}
@SafeVarargs
public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer>... buffers) {
try {
return switch (buffers.length) {
case 0 -> alloc.allocate(0).send();
case 1 -> compositeBuffer(alloc, buffers[0]);
case 2 -> compositeBuffer(alloc, buffers[0], buffers[1]);
case 3 -> compositeBuffer(alloc, buffers[0], buffers[1], buffers[2]);
default -> {
Buffer[] bufs = new Buffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
bufs[i] = buffers[i].receive();
}
try {
//noinspection unchecked
Send<Buffer>[] sentBufs = new Send[buffers.length];
for (int i = 0; i < buffers.length; i++) {
sentBufs[i] = bufs[i].split().send();
}
try (var composite = CompositeBuffer.compose(alloc, sentBufs)) {
yield composite.send();
}
} finally {
for (Buffer buf : bufs) {
buf.close();
}
}
}
};
} finally {
for (Send<Buffer> buffer : buffers) {
buffer.close();
}
}
public static Buffer compositeBuffer(BufferAllocator alloc, Send<Buffer>... buffers) {
return CompositeBuffer.compose(alloc, buffers);
}
public static <T> Mono<T> resolveDelta(Mono<Delta<T>> prev, UpdateReturnMode updateReturnMode) {

View File

@ -46,7 +46,7 @@ public class RepeatedElementList<T> implements List<T> {
@NotNull
@Override
public Object[] toArray() {
public Object @NotNull [] toArray() {
var arr = new Object[size];
Arrays.fill(arr, element);
return arr;
@ -54,7 +54,7 @@ public class RepeatedElementList<T> implements List<T> {
@NotNull
@Override
public <T1> T1[] toArray(@NotNull T1[] a) {
public <T1> T1 @NotNull [] toArray(@NotNull T1 @NotNull [] a) {
var arr = Arrays.copyOf(a, size);
Arrays.fill(arr, element);
return arr;
@ -152,8 +152,9 @@ public class RepeatedElementList<T> implements List<T> {
@NotNull
@Override
public ListIterator<T> listIterator(int index) {
return new ListIterator<T>() {
return new ListIterator<>() {
int position = index - 1;
@Override
public boolean hasNext() {
return position + 1 < size;

View File

@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.Serializer.DeserializationResult;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class DatabaseEmpty {
@ -17,15 +18,15 @@ public class DatabaseEmpty {
public static Serializer<Nothing> nothingSerializer(BufferAllocator bufferAllocator) {
return new Serializer<>() {
@Override
public @NotNull DeserializationResult<Nothing> deserialize(@NotNull Send<Buffer> serialized) {
public @NotNull DeserializationResult<Nothing> deserialize(@Nullable Send<Buffer> serialized) {
try (serialized) {
return NOTHING_RESULT;
}
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Nothing deserialized) {
return bufferAllocator.allocate(0).send();
public @Nullable Send<Buffer> serialize(@NotNull Nothing deserialized) {
return null;
}
};
}

View File

@ -39,7 +39,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
private final Serializer<U> valueSerializer;
protected DatabaseMapDictionary(LLDictionary dictionary,
Send<Buffer> prefixKey,
@Nullable Send<Buffer> prefixKey,
SerializerFixedBinaryLength<T> keySuffixSerializer,
Serializer<U> valueSerializer) {
// Do not retain or release or use the prefixKey here
@ -50,7 +50,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public static <T, U> DatabaseMapDictionary<T, U> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T> keySerializer,
Serializer<U> valueSerializer) {
return new DatabaseMapDictionary<>(dictionary, dictionary.getAllocator().allocate(0).send(), keySerializer, valueSerializer);
return new DatabaseMapDictionary<>(dictionary, null, keySerializer, valueSerializer);
}
public static <T, U> DatabaseMapDictionary<T, U> tail(LLDictionary dictionary,
@ -60,10 +60,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer);
}
private Send<Buffer> toKey(Send<Buffer> suffixKeyToSend) {
try (var suffixKey = suffixKeyToSend.receive()) {
assert suffixKeyConsistency(suffixKey.readableBytes());
private Buffer toKey(Send<Buffer> suffixKeyToSend) {
var suffixKey = suffixKeyToSend.receive();
assert suffixKeyConsistency(suffixKey.readableBytes());
if (keyPrefix != null) {
return LLUtils.compositeBuffer(dictionary.getAllocator(), keyPrefix.copy().send(), suffixKey.send());
} else {
return suffixKey;
}
}
@ -100,7 +103,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.fromIterable(Collections.unmodifiableMap(value).entrySet())
.handle((entry, sink) -> {
try {
sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey())),
sink.next(LLEntry.of(this.toKey(serializeSuffix(entry.getKey()).send()).send(),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);
@ -130,19 +133,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.fromCallable(() ->
new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix)), valueSerializer));
new DatabaseSingle<>(dictionary, toKey(serializeSuffix(keySuffix).send()).send(), valueSerializer));
}
@Override
public Mono<U> getValue(@Nullable CompositeSnapshot snapshot, T keySuffix, boolean existsAlmostCertainly) {
return dictionary
.get(resolveSnapshot(snapshot), Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix))), existsAlmostCertainly)
.get(resolveSnapshot(snapshot),
Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send()),
existsAlmostCertainly
)
.handle((value, sink) -> deserializeValue(value, sink));
}
@Override
public Mono<Void> putValue(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.VOID)
@ -160,7 +166,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
return dictionary
.update(keyMono, getSerializedUpdater(updater), updateReturnMode, existsAlmostCertainly)
.handle((value, sink) -> deserializeValue(value, sink));
@ -170,7 +176,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
public Mono<Delta<U>> updateValueAndGetDelta(T keySuffix,
boolean existsAlmostCertainly,
SerializationFunction<@Nullable U, @Nullable U> updater) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
return dictionary
.updateAndGetDelta(keyMono, getSerializedUpdater(updater), existsAlmostCertainly)
.transform(mono -> LLUtils.mapLLDelta(mono,
@ -218,7 +224,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono,
@ -229,7 +235,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Boolean> putValueAndGetChanged(T keySuffix, U value) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
var valueMono = Mono.fromCallable(() -> valueSerializer.serialize(value));
return dictionary
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
@ -240,7 +246,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Void> remove(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
return dictionary
.remove(keyMono, LLDictionaryResultType.VOID)
.doOnNext(Send::close)
@ -249,7 +255,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<U> removeAndGetPrevious(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
.handle((value, sink) -> deserializeValue(value, sink));
@ -257,7 +263,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
@Override
public Mono<Boolean> removeAndGetStatus(T keySuffix) {
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix)));
var keyMono = Mono.fromCallable(() -> toKey(serializeSuffix(keySuffix).send()).send());
return dictionary
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE)
.map(LLUtils::responseToBoolean);
@ -268,7 +274,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var mappedKeys = keys
.<Tuple2<T, Send<Buffer>>>handle((keySuffix, sink) -> {
try {
sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix))));
sink.next(Tuples.of(keySuffix, toKey(serializeSuffix(keySuffix).send()).send()));
} catch (SerializationException ex) {
sink.error(ex);
}
@ -297,7 +303,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
}
private Send<LLEntry> serializeEntry(T key, U value) throws SerializationException {
try (var serializedKey = toKey(serializeSuffix(key)).receive()) {
try (var serializedKey = toKey(serializeSuffix(key).send())) {
try (var serializedValue = valueSerializer.serialize(value).receive()) {
return LLEntry.of(serializedKey.send(), serializedValue.send()).send();
}
@ -337,7 +343,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var serializedEntries = entries
.<Tuple2<Send<Buffer>, X>>handle((entry, sink) -> {
try {
sink.next(Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()));
sink.next(Tuples.of(serializeSuffix(entry.getT1()).send(), entry.getT2()));
} catch (SerializationException ex) {
sink.error(ex);
}
@ -373,7 +379,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
try (var keySuffixWithExt = stripPrefix(key).receive()) {
sink.next(Map.entry(deserializeSuffix(keySuffixWithExt.copy().send()),
new DatabaseSingle<>(dictionary,
toKey(keySuffixWithExt.send()),
toKey(keySuffixWithExt.send()).send(),
valueSerializer
)
));
@ -412,7 +418,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
this.getAllValues(null),
dictionary.setRange(rangeMono, entries.handle((entry, sink) -> {
try {
sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey())),
sink.next(LLEntry.of(toKey(serializeSuffix(entry.getKey()).send()).send(),
valueSerializer.serialize(entry.getValue())).send());
} catch (SerializationException e) {
sink.error(e);

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.CompositeBuffer;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.util.IllegalReferenceCountException;
@ -36,53 +37,39 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
protected final Mono<Send<LLRange>> rangeMono;
private volatile boolean released;
private static Send<Buffer> incrementPrefix(BufferAllocator alloc, Send<Buffer> originalKeySend, int prefixLength) {
try (var originalKey = originalKeySend.receive()) {
assert originalKey.readableBytes() >= prefixLength;
var originalKeyLength = originalKey.readableBytes();
try (Buffer copiedBuf = alloc.allocate(originalKey.readableBytes())) {
boolean overflowed = true;
final int ff = 0xFF;
int writtenBytes = 0;
copiedBuf.writerOffset(prefixLength);
for (int i = prefixLength - 1; i >= 0; i--) {
int iByte = originalKey.getUnsignedByte(i);
if (iByte != ff) {
copiedBuf.setUnsignedByte(i, iByte + 1);
writtenBytes++;
overflowed = false;
break;
} else {
copiedBuf.setUnsignedByte(i, 0x00);
writtenBytes++;
overflowed = true;
}
}
assert prefixLength - writtenBytes >= 0;
if (prefixLength - writtenBytes > 0) {
originalKey.copyInto(0, copiedBuf, 0, (prefixLength - writtenBytes));
}
copiedBuf.writerOffset(originalKeyLength);
if (originalKeyLength - prefixLength > 0) {
originalKey.copyInto(prefixLength, copiedBuf, prefixLength, originalKeyLength - prefixLength);
}
if (overflowed) {
copiedBuf.ensureWritable(originalKeyLength + 1);
copiedBuf.writerOffset(originalKeyLength + 1);
for (int i = 0; i < originalKeyLength; i++) {
copiedBuf.setUnsignedByte(i, 0xFF);
}
copiedBuf.setUnsignedByte(originalKeyLength, (byte) 0x00);
}
return copiedBuf.send();
private static void incrementPrefix(Buffer prefix, int prefixLength) {
assert prefix.readableBytes() >= prefixLength;
assert prefix.readerOffset() == 0;
final var originalKeyLength = prefix.readableBytes();
boolean overflowed = true;
final int ff = 0xFF;
int writtenBytes = 0;
for (int i = prefixLength - 1; i >= 0; i--) {
int iByte = prefix.getUnsignedByte(i);
if (iByte != ff) {
prefix.setUnsignedByte(i, iByte + 1);
writtenBytes++;
overflowed = false;
break;
} else {
prefix.setUnsignedByte(i, 0x00);
writtenBytes++;
}
}
assert prefixLength - writtenBytes >= 0;
if (overflowed) {
assert prefix.writerOffset() == originalKeyLength;
prefix.ensureWritable(1, 1, true);
prefix.writerOffset(originalKeyLength + 1);
for (int i = 0; i < originalKeyLength; i++) {
prefix.setUnsignedByte(i, 0xFF);
}
prefix.setUnsignedByte(originalKeyLength, (byte) 0x00);
}
}
static Send<Buffer> firstRangeKey(BufferAllocator alloc,
static Buffer firstRangeKey(BufferAllocator alloc,
Send<Buffer> prefixKey,
int prefixLength,
int suffixLength,
@ -90,40 +77,44 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength);
}
static Send<Buffer> nextRangeKey(BufferAllocator alloc,
static Buffer nextRangeKey(BufferAllocator alloc,
Send<Buffer> prefixKey,
int prefixLength,
int suffixLength,
int extLength) {
try (prefixKey) {
try (Send<Buffer> nonIncremented = zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength,
extLength)) {
return incrementPrefix(alloc, nonIncremented, prefixLength);
}
Buffer nonIncremented = zeroFillKeySuffixAndExt(alloc, prefixKey, prefixLength, suffixLength, extLength);
incrementPrefix(nonIncremented, prefixLength);
return nonIncremented;
}
}
protected static Send<Buffer> zeroFillKeySuffixAndExt(BufferAllocator alloc,
Send<Buffer> prefixKeySend,
protected static Buffer zeroFillKeySuffixAndExt(BufferAllocator alloc,
@Nullable Send<Buffer> prefixKeySend,
int prefixLength,
int suffixLength,
int extLength) {
try (var prefixKey = prefixKeySend.receive()) {
assert prefixKey.readableBytes() == prefixLength;
assert suffixLength > 0;
assert extLength >= 0;
try (Buffer zeroSuffixAndExt = alloc.allocate(suffixLength + extLength)) {
try (var result = prefixKeySend == null ? null : prefixKeySend.receive()) {
if (result == null) {
assert prefixLength == 0;
var buf = alloc.allocate(prefixLength + suffixLength + extLength);
buf.writerOffset(prefixLength + suffixLength + extLength);
buf.fill((byte) 0);
return buf;
} else {
assert result.readableBytes() == prefixLength;
assert suffixLength > 0;
assert extLength >= 0;
result.ensureWritable(suffixLength + extLength, suffixLength + extLength, true);
for (int i = 0; i < suffixLength + extLength; i++) {
zeroSuffixAndExt.writeByte((byte) 0x0);
}
try (Buffer result = LLUtils.compositeBuffer(alloc, prefixKey.send(), zeroSuffixAndExt.send()).receive()) {
return result.send();
result.writeByte((byte) 0x0);
}
return result;
}
}
}
static Send<Buffer> firstRangeKey(
static Buffer firstRangeKey(
BufferAllocator alloc,
Send<Buffer> prefixKey,
Send<Buffer> suffixKey,
@ -133,25 +124,25 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return zeroFillKeyExt(alloc, prefixKey, suffixKey, prefixLength, suffixLength, extLength);
}
static Send<Buffer> nextRangeKey(
static Buffer nextRangeKey(
BufferAllocator alloc,
Send<Buffer> prefixKey,
Send<Buffer> suffixKey,
int prefixLength,
int suffixLength,
int extLength) {
try (Send<Buffer> nonIncremented = zeroFillKeyExt(alloc,
Buffer nonIncremented = zeroFillKeyExt(alloc,
prefixKey,
suffixKey,
prefixLength,
suffixLength,
extLength
)) {
return incrementPrefix(alloc, nonIncremented, prefixLength + suffixLength);
}
);
incrementPrefix(nonIncremented, prefixLength + suffixLength);
return nonIncremented;
}
protected static Send<Buffer> zeroFillKeyExt(
protected static Buffer zeroFillKeyExt(
BufferAllocator alloc,
Send<Buffer> prefixKeySend,
Send<Buffer> suffixKeySend,
@ -165,17 +156,14 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
assert suffixLength > 0;
assert extLength >= 0;
try (var ext = alloc.allocate(extLength)) {
for (int i = 0; i < extLength; i++) {
ext.writeByte((byte) 0);
}
try (Buffer result = LLUtils.compositeBuffer(alloc, prefixKey.send(), suffixKey.send(), ext.send())
.receive()) {
assert result.readableBytes() == prefixLength + suffixLength + extLength;
return result.send();
}
Buffer result = LLUtils.compositeBuffer(alloc, prefixKey.send(), suffixKey.send());
result.ensureWritable(extLength, extLength, true);
for (int i = 0; i < extLength; i++) {
result.writeByte((byte) 0);
}
assert result.readableBytes() == prefixLength + suffixLength + extLength;
return result;
}
}
}
@ -187,8 +175,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
public static <T, U> DatabaseMapDictionaryDeep<T, U, DatabaseStageEntry<U>> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T> keySerializer,
SubStageGetterSingle<U> subStageGetter) {
return new DatabaseMapDictionaryDeep<>(dictionary, dictionary.getAllocator().allocate(0).send(),
keySerializer, subStageGetter, 0);
return new DatabaseMapDictionaryDeep<>(dictionary, null, keySerializer, subStageGetter, 0);
}
public static <T, U, US extends DatabaseStage<U>> DatabaseMapDictionaryDeep<T, U, US> deepTail(LLDictionary dictionary,
@ -196,7 +183,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
int keyExtLength,
SubStageGetter<U, US> subStageGetter) {
return new DatabaseMapDictionaryDeep<>(dictionary,
dictionary.getAllocator().allocate(0).send(),
null,
keySerializer,
subStageGetter,
keyExtLength
@ -212,39 +199,34 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
protected DatabaseMapDictionaryDeep(LLDictionary dictionary,
Send<Buffer> prefixKeyToReceive,
@Nullable Send<Buffer> prefixKeyToReceive,
SerializerFixedBinaryLength<T> keySuffixSerializer,
SubStageGetter<U, US> subStageGetter,
int keyExtLength) {
try (var prefixKey = prefixKeyToReceive.receive()) {
try (var prefixKey = prefixKeyToReceive == null ? null : prefixKeyToReceive.receive()) {
this.dictionary = dictionary;
this.alloc = dictionary.getAllocator();
this.subStageGetter = subStageGetter;
this.keySuffixSerializer = keySuffixSerializer;
this.keyPrefix = prefixKey.copy();
assert keyPrefix.isAccessible();
this.keyPrefixLength = keyPrefix.readableBytes();
assert prefixKey == null || prefixKey.isAccessible();
this.keyPrefixLength = prefixKey == null ? 0 : prefixKey.readableBytes();
this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
this.keyExtLength = keyExtLength;
try (Buffer firstKey = firstRangeKey(alloc,
prefixKey.copy().send(),
keyPrefixLength,
keySuffixLength,
keyExtLength
).receive().compact()) {
try (Buffer nextRangeKey = nextRangeKey(alloc,
prefixKey.copy().send(),
keyPrefixLength,
keySuffixLength,
keyExtLength
).receive().compact()) {
assert keyPrefix.isAccessible();
Buffer firstKey = firstRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(), keyPrefixLength,
keySuffixLength, keyExtLength);
try (firstKey) {
var nextRangeKey = nextRangeKey(alloc, prefixKey == null ? null : prefixKey.copy().send(),
keyPrefixLength, keySuffixLength, keyExtLength);
try (nextRangeKey) {
assert prefixKey == null || prefixKey.isAccessible();
assert keyPrefixLength == 0 || !LLUtils.equals(firstKey, nextRangeKey);
this.range = keyPrefixLength == 0 ? LLRange.all() : LLRange.of(firstKey.send(), nextRangeKey.send());
this.rangeMono = LLUtils.lazyRetainRange(this.range);
assert subStageKeysConsistency(keyPrefixLength + keySuffixLength + keyExtLength);
}
}
this.keyPrefix = prefixKey == null ? null : prefixKey.send().receive();
}
}
@ -278,7 +260,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
protected Send<Buffer> toKeyWithoutExt(Send<Buffer> suffixKeyToReceive) {
try (var suffixKey = suffixKeyToReceive.receive()) {
assert suffixKey.readableBytes() == keySuffixLength;
try (Buffer result = LLUtils.compositeBuffer(alloc, keyPrefix.copy().send(), suffixKey.send()).receive()) {
try (Buffer result = LLUtils.compositeBuffer(alloc, keyPrefix.copy().send(), suffixKey.send())) {
assert result.readableBytes() == keyPrefixLength + keySuffixLength;
return result.send();
}
@ -306,7 +288,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return this.subStageGetter
.subStage(dictionary, snapshot, Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix))))
.subStage(dictionary, snapshot, Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix).send())))
.transform(LLUtils::handleDiscard)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
}
@ -411,12 +393,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
//todo: temporary wrapper. convert the whole class to buffers
protected Send<Buffer> serializeSuffix(T keySuffix) throws SerializationException {
try (Buffer suffixData = keySuffixSerializer.serialize(keySuffix).receive()) {
assert suffixKeyConsistency(suffixData.readableBytes());
assert keyPrefix.isAccessible();
return suffixData.send();
}
protected Buffer serializeSuffix(T keySuffix) throws SerializationException {
Buffer suffixData = keySuffixSerializer.serialize(keySuffix).receive();
assert suffixKeyConsistency(suffixData.readableBytes());
assert keyPrefix.isAccessible();
return suffixData;
}
@Override

View File

@ -58,7 +58,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
SerializerFixedBinaryLength<UH> keyHashSerializer) {
return new DatabaseMapDictionaryHashed<>(
dictionary,
dictionary.getAllocator().allocate(0).send(),
null,
keySerializer,
valueSerializer,
keyHashFunction,

View File

@ -23,9 +23,7 @@ public class DatabaseSetDictionary<T> extends DatabaseMapDictionary<T, Nothing>
public static <T> DatabaseSetDictionary<T> simple(LLDictionary dictionary,
SerializerFixedBinaryLength<T> keySerializer) {
try (var buf = dictionary.getAllocator().allocate(0)) {
return new DatabaseSetDictionary<>(dictionary, buf.send(), keySerializer);
}
return new DatabaseSetDictionary<>(dictionary, null, keySerializer);
}
public static <T> DatabaseSetDictionary<T> tail(LLDictionary dictionary,

View File

@ -36,7 +36,7 @@ public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHas
Function<T, TH> keyHashFunction,
SerializerFixedBinaryLength<TH> keyHashSerializer) {
return new DatabaseSetDictionaryHashed<>(dictionary,
dictionary.getAllocator().allocate(0).send(),
null,
keySerializer,
keyHashFunction,
keyHashSerializer

View File

@ -8,7 +8,9 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>> {
@ -25,8 +27,9 @@ class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>> {
}
@Override
public @NotNull DeserializationResult<Entry<X, Y>> deserialize(@NotNull Send<Buffer> serializedToReceive)
public @NotNull DeserializationResult<Entry<X, Y>> deserialize(@Nullable Send<Buffer> serializedToReceive)
throws SerializationException {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
DeserializationResult<X> deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send());
DeserializationResult<Y> deserializedValue = valueSerializer.deserialize(serialized
@ -40,11 +43,17 @@ class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>> {
}
@Override
public @NotNull Send<Buffer> serialize(@NotNull Entry<X, Y> deserialized) throws SerializationException {
try (Buffer keySuffix = keySuffixSerializer.serialize(deserialized.getKey()).receive()) {
try (Buffer value = valueSerializer.serialize(deserialized.getValue()).receive()) {
return LLUtils.compositeBuffer(allocator, keySuffix.send(), value.send());
}
public @Nullable Send<Buffer> serialize(@NotNull Entry<X, Y> deserialized) throws SerializationException {
var keySuffix = keySuffixSerializer.serialize(deserialized.getKey());
var value = valueSerializer.serialize(deserialized.getValue());
if (value == null && keySuffix == null) {
return null;
} else if (value == null) {
return keySuffix;
} else if (keySuffix == null) {
return value;
} else {
return LLUtils.compositeBuffer(allocator, keySuffix, value).send();
}
}
}

View File

@ -7,7 +7,9 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.unimi.dsi.fastutil.objects.ObjectArraySet;
import java.util.ArrayList;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
@ -20,7 +22,8 @@ class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
}
@Override
public @NotNull DeserializationResult<ObjectArraySet<X>> deserialize(@NotNull Send<Buffer> serializedToReceive) throws SerializationException {
public @NotNull DeserializationResult<ObjectArraySet<X>> deserialize(@Nullable Send<Buffer> serializedToReceive) throws SerializationException {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
int initialReaderOffset = serialized.readerOffset();
int entriesLength = serialized.readInt();
@ -41,9 +44,12 @@ class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
try (Buffer output = allocator.allocate(64)) {
output.writeInt(deserialized.size());
for (X entry : deserialized) {
try (Buffer serialized = entrySerializer.serialize(entry).receive()) {
output.ensureWritable(serialized.readableBytes());
output.writeBytes(serialized);
var serializedToReceive = entrySerializer.serialize(entry);
if (serializedToReceive != null) {
try (Buffer serialized = serializedToReceive.receive()) {
output.ensureWritable(serialized.readableBytes());
output.writeBytes(serialized);
}
}
}
return output.send();

View File

@ -1565,7 +1565,8 @@ public class LLLocalDictionary implements LLDictionary {
return Flux.usingWhen(rangeMono,
rangeSend -> Flux.using(
() -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, rangeSend,
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot), getRangeKeysMultiDebugName),
databaseOptions.allowNettyDirect(), resolveSnapshot(snapshot)
),
llLocalKeyReactiveRocksIterator -> llLocalKeyReactiveRocksIterator.flux().subscribeOn(dbScheduler),
LLLocalReactiveRocksIterator::release
).transform(LLUtils::handleDiscard),
@ -1799,7 +1800,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
rocksIterator.status();
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
rocksIterator.next();
rocksIterator.status();
}
@ -2123,8 +2124,8 @@ public class LLLocalDictionary implements LLDictionary {
try {
rocksIterator.status();
if (rocksIterator.isValid()) {
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive()) {
try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
return LLEntry.of(key.send(), value.send()).send();
}
}
@ -2184,7 +2185,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
rocksIterator.status();
if (rocksIterator.isValid()) {
return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send();
} else {
return null;
}
@ -2354,8 +2355,8 @@ public class LLLocalDictionary implements LLDictionary {
if (!rocksIterator.isValid()) {
return null;
}
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
dbDelete(cfh, null, key.copy().send());
return LLEntry.of(key.send(), value.send()).send();
}

View File

@ -18,7 +18,7 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, range, allowNettyDirect, readOptions, true, debugName);
super(db, alloc, cfh, range, allowNettyDirect, readOptions, true);
}
@Override

View File

@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@ -60,26 +61,28 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
try {
rocksIterator.status();
while (rocksIterator.isValid()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
if (firstGroupKey == null) {
firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(),
key, key.readerOffset(), prefixLength)) {
break;
}
Buffer value;
@Nullable Buffer value;
if (readValues) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive();
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
} else {
value = alloc.allocate(0);
value = null;
}
try {
rocksIterator.next();
rocksIterator.status();
T entry = getEntry(key.send(), value.send());
T entry = getEntry(key.send(), value == null ? null : value.send());
values.add(entry);
} finally {
value.close();
if (value != null) {
value.close();
}
}
}
}
@ -106,7 +109,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
});
}
public abstract T getEntry(Send<Buffer> key, Send<Buffer> value);
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);
public void release() {
range.close();

View File

@ -64,7 +64,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
while (rocksIterator.isValid()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive();
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
} else {
key = LLUtils.fromByteArray(alloc, rocksIterator.key());
}

View File

@ -15,9 +15,8 @@ public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterato
ColumnFamilyHandle cfh,
Send<LLRange> range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, range, allowNettyDirect, readOptions, false, debugName);
ReadOptions readOptions) {
super(db, alloc, cfh, range, allowNettyDirect, readOptions, false);
}
@Override

View File

@ -9,6 +9,7 @@ import io.net5.util.IllegalReferenceCountException;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@ -25,7 +26,6 @@ public abstract class LLLocalReactiveRocksIterator<T> {
private final boolean allowNettyDirect;
private final ReadOptions readOptions;
private final boolean readValues;
private final String debugName;
public LLLocalReactiveRocksIterator(RocksDB db,
BufferAllocator alloc,
@ -33,8 +33,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
Send<LLRange> range,
boolean allowNettyDirect,
ReadOptions readOptions,
boolean readValues,
String debugName) {
boolean readValues) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
@ -42,7 +41,6 @@ public abstract class LLLocalReactiveRocksIterator<T> {
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
this.readValues = readValues;
this.debugName = debugName;
}
public Flux<T> flux() {
@ -61,7 +59,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
if (rocksIterator.isValid()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive();
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
} else {
key = LLUtils.fromByteArray(alloc, rocksIterator.key());
}
@ -69,19 +67,21 @@ public abstract class LLLocalReactiveRocksIterator<T> {
Buffer value;
if (readValues) {
if (allowNettyDirect) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive();
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
} else {
value = LLUtils.fromByteArray(alloc, rocksIterator.value());
}
} else {
value = alloc.allocate(0);
value = null;
}
try {
rocksIterator.next();
rocksIterator.status();
sink.next(getEntry(key.send(), value.send()));
sink.next(getEntry(key.send(), value == null ? null : value.send()));
} finally {
value.close();
if (value != null) {
value.close();
}
}
}
} else {
@ -100,7 +100,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
});
}
public abstract T getEntry(Send<Buffer> key, Send<Buffer> value);
public abstract T getEntry(@Nullable Send<Buffer> key, @Nullable Send<Buffer> value);
public void release() {
if (released.compareAndSet(false, true)) {

View File

@ -6,15 +6,17 @@ import it.cavallium.dbengine.database.SafeCloseable;
import java.io.DataInput;
import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class BufferDataInput implements DataInput, SafeCloseable {
@Nullable
private final Buffer buf;
private final int initialReaderOffset;
public BufferDataInput(Send<Buffer> bufferSend) {
this.buf = bufferSend.receive().makeReadOnly();
this.initialReaderOffset = buf.readerOffset();
public BufferDataInput(@Nullable Send<Buffer> bufferSend) {
this.buf = bufferSend == null ? null : bufferSend.receive().makeReadOnly();
this.initialReaderOffset = buf == null ? 0 : buf.readerOffset();
}
@Override
@ -24,75 +26,100 @@ public class BufferDataInput implements DataInput, SafeCloseable {
@Override
public void readFully(byte @NotNull [] b, int off, int len) {
buf.copyInto(buf.readerOffset(), b, off, len);
buf.readerOffset(buf.readerOffset() + len);
if (buf == null) {
if (len != 0) {
throw new IndexOutOfBoundsException();
}
} else {
buf.copyInto(buf.readerOffset(), b, off, len);
buf.readerOffset(buf.readerOffset() + len);
}
}
@Override
public int skipBytes(int n) {
n = Math.min(n, buf.readerOffset() - buf.writerOffset());
buf.readerOffset(buf.readerOffset() + n);
return n;
if (buf == null) {
if (n != 0) {
throw new IndexOutOfBoundsException();
}
return 0;
} else {
n = Math.min(n, buf.readerOffset() - buf.writerOffset());
buf.readerOffset(buf.readerOffset() + n);
return n;
}
}
@Override
public boolean readBoolean() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte() != 0;
}
@Override
public byte readByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readByte();
}
@Override
public int readUnsignedByte() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedByte();
}
@Override
public short readShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readShort();
}
@Override
public int readUnsignedShort() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readUnsignedShort();
}
@Override
public char readChar() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readChar();
}
@Override
public int readInt() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readInt();
}
@Override
public long readLong() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readLong();
}
@Override
public float readFloat() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readFloat();
}
@Override
public double readDouble() {
if (buf == null) throw new IndexOutOfBoundsException();
return buf.readDouble();
}
@Override
public String readLine() {
if (buf == null) throw new IndexOutOfBoundsException();
throw new UnsupportedOperationException();
}
@NotNull
@Override
public String readUTF() {
if (buf == null) throw new IndexOutOfBoundsException();
var len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.copyInto(buf.readerOffset(), bytes, 0, len);
@ -102,10 +129,16 @@ public class BufferDataInput implements DataInput, SafeCloseable {
@Override
public void close() {
buf.close();
if (buf != null) {
buf.close();
}
}
public int getReadBytesCount() {
return buf.readerOffset() - initialReaderOffset;
if (buf == null) {
return 0;
} else {
return buf.readerOffset() - initialReaderOffset;
}
}
}

View File

@ -6,6 +6,7 @@ import io.net5.buffer.api.Send;
import java.io.IOError;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.error.IndexOutOfBoundsException;
public class CodecSerializer<A> implements Serializer<A> {
@ -37,7 +38,7 @@ public class CodecSerializer<A> implements Serializer<A> {
}
@Override
public @NotNull DeserializationResult<A> deserialize(@NotNull Send<Buffer> serializedToReceive) {
public @NotNull DeserializationResult<A> deserialize(@Nullable Send<Buffer> serializedToReceive) {
try (var is = new BufferDataInput(serializedToReceive)) {
int codecId;
if (microCodecs) {

View File

@ -5,22 +5,24 @@ import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public interface Serializer<A> {
record DeserializationResult<T>(T deserializedData, int bytesRead) {}
@NotNull DeserializationResult<A> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException;
@NotNull DeserializationResult<A> deserialize(@Nullable Send<Buffer> serialized) throws SerializationException;
@NotNull Send<Buffer> serialize(@NotNull A deserialized) throws SerializationException;
@Nullable Send<Buffer> serialize(@NotNull A deserialized) throws SerializationException;
Serializer<Send<Buffer>> NOOP_SERIALIZER = new Serializer<>() {
@Override
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
try (var serializedBuf = serialized.receive()) {
var readableBytes = serializedBuf.readableBytes();
return new DeserializationResult<>(serializedBuf.send(), readableBytes);
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@Nullable Send<Buffer> serialized) {
try (var serializedBuf = serialized == null ? null : serialized.receive()) {
var readableBytes = serializedBuf == null ? 0 : serializedBuf.readableBytes();
return new DeserializationResult<>(serializedBuf == null ? null : serializedBuf.send(), readableBytes);
}
}
@ -37,7 +39,8 @@ public interface Serializer<A> {
static Serializer<String> utf8(BufferAllocator allocator) {
return new Serializer<>() {
@Override
public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive) {
public @NotNull DeserializationResult<String> deserialize(@Nullable Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (Buffer serialized = serializedToReceive.receive()) {
assert serialized.isAccessible();
int length = serialized.readInt();

View File

@ -5,7 +5,9 @@ import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@SuppressWarnings("unused")
public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
@ -15,7 +17,11 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Send<Buffer>> noop(int length) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
public @NotNull DeserializationResult<Send<Buffer>> deserialize(@Nullable Send<Buffer> serialized) {
if (length == 0 && serialized == null) {
return new DeserializationResult<>(null, 0);
}
Objects.requireNonNull(serialized);
try (var buf = serialized.receive()) {
if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException(
@ -49,8 +55,12 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<String> utf8(BufferAllocator allocator, int length) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive)
public @NotNull DeserializationResult<String> deserialize(@Nullable Send<Buffer> serializedToReceive)
throws SerializationException {
if (length == 0 && serializedToReceive == null) {
return new DeserializationResult<>(null, 0);
}
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException(
@ -89,7 +99,11 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Integer> intSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Integer> deserialize(@NotNull Send<Buffer> serializedToReceive) {
public @NotNull DeserializationResult<Integer> deserialize(@Nullable Send<Buffer> serializedToReceive) {
if (getSerializedBinaryLength() == 0 && serializedToReceive == null) {
return new DeserializationResult<>(null, 0);
}
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException(
@ -117,7 +131,11 @@ public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
static SerializerFixedBinaryLength<Long> longSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() {
@Override
public @NotNull DeserializationResult<Long> deserialize(@NotNull Send<Buffer> serializedToReceive) {
public @NotNull DeserializationResult<Long> deserialize(@Nullable Send<Buffer> serializedToReceive) {
if (getSerializedBinaryLength() == 0 && serializedToReceive == null) {
return new DeserializationResult<>(null, 0);
}
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException(

View File

@ -190,7 +190,6 @@ public class LuceneUtils {
*
* @return false if the result is not relevant
*/
@Nullable
public static boolean filterTopDoc(float score, Float minCompetitiveScore) {
return minCompetitiveScore == null || score >= minCompetitiveScore;
}
@ -327,7 +326,7 @@ public class LuceneUtils {
assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)";
pos += (long)i;
pos += i;
}
assert readLength == 0;

View File

@ -25,8 +25,10 @@ import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.nio.file.Path;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -158,7 +160,8 @@ public class DbTestUtils {
}
@Override
public @NotNull DeserializationResult<Short> deserialize(@NotNull Send<Buffer> serializedToReceive) {
public @NotNull DeserializationResult<Short> deserialize(@Nullable Send<Buffer> serializedToReceive) {
Objects.requireNonNull(serializedToReceive);
try (var serialized = serializedToReceive.receive()) {
var val = serialized.readShort();
return new DeserializationResult<>(val, Short.BYTES);

View File

@ -51,7 +51,7 @@ public class TestRanges {
byte[] firstRangeKey;
try (var firstRangeKeyBuf = DatabaseMapDictionaryDeep.firstRangeKey(alloc,
alloc.allocate(prefixKey.length).writeBytes(prefixKey).send(),
prefixKey.length, 7, 3).receive()) {
prefixKey.length, 7, 3)) {
firstRangeKey = LLUtils.toArray(firstRangeKeyBuf);
}
byte[] nextRangeKey;
@ -60,7 +60,7 @@ public class TestRanges {
prefixKey.length,
7,
3
).receive()) {
)) {
nextRangeKey = LLUtils.toArray(nextRangeKeyBuf);
}
@ -114,7 +114,7 @@ public class TestRanges {
prefixKey.length,
3,
7
).receive()) {
)) {
firstRangeKey = LLUtils.toArray(firstRangeKeyBuf);
}
try (var nextRangeKeyBuf = DatabaseMapDictionaryDeep.nextRangeKey(alloc,
@ -123,7 +123,7 @@ public class TestRanges {
prefixKey.length,
3,
7
).receive()) {
)) {
byte[] nextRangeKey = LLUtils.toArray(nextRangeKeyBuf);
if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}) && Arrays.equals(suffixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) {