diff --git a/pom.xml b/pom.xml
index b9dd848..c740646 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,13 @@
false
true
+
+ netty5-snapshots
+ Netty 5 snapshots
+ https://oss.sonatype.org/content/repositories/snapshots
+ true
+ true
+
@@ -245,7 +252,7 @@
io.netty
netty-buffer
- 4.1.63.Final
+ 5.0.0.Final-SNAPSHOT
javax.xml.bind
diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
index 3e97b1c..cc8c66b 100644
--- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
@@ -1,6 +1,6 @@
package it.cavallium.dbengine.client;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.api.BufferAllocator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -18,7 +18,7 @@ public interface CompositeDatabase {
*/
Mono releaseSnapshot(CompositeSnapshot snapshot);
- ByteBufAllocator getAllocator();
+ BufferAllocator getAllocator();
/**
* Find corrupted items
diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
index de2ff04..5b3a0dd 100644
--- a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
+++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
@@ -1,23 +1,23 @@
package it.cavallium.dbengine.client;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.api.Buffer;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
-public class MappedSerializer implements Serializer {
+public class MappedSerializer implements Serializer {
- private final Serializer serializer;
+ private final Serializer serializer;
private final Mapper keyMapper;
- public MappedSerializer(Serializer serializer,
+ public MappedSerializer(Serializer serializer,
Mapper keyMapper) {
this.serializer = serializer;
this.keyMapper = keyMapper;
}
@Override
- public @NotNull B deserialize(@NotNull ByteBuf serialized) throws SerializationException {
+ public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
try {
return keyMapper.map(serializer.deserialize(serialized.retain()));
} finally {
@@ -26,7 +26,7 @@ public class MappedSerializer implements Serializer {
}
@Override
- public @NotNull ByteBuf serialize(@NotNull B deserialized) throws SerializationException {
+ public @NotNull Buffer serialize(@NotNull B deserialized) throws SerializationException {
return serializer.serialize(keyMapper.unmap(deserialized));
}
}
diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
index f2d2508..e6b28b0 100644
--- a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
+++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
@@ -1,23 +1,23 @@
package it.cavallium.dbengine.client;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.api.Buffer;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import org.jetbrains.annotations.NotNull;
-public class MappedSerializerFixedLength implements SerializerFixedBinaryLength {
+public class MappedSerializerFixedLength implements SerializerFixedBinaryLength {
- private final SerializerFixedBinaryLength fixedLengthSerializer;
+ private final SerializerFixedBinaryLength fixedLengthSerializer;
private final Mapper keyMapper;
- public MappedSerializerFixedLength(SerializerFixedBinaryLength fixedLengthSerializer,
+ public MappedSerializerFixedLength(SerializerFixedBinaryLength fixedLengthSerializer,
Mapper keyMapper) {
this.fixedLengthSerializer = fixedLengthSerializer;
this.keyMapper = keyMapper;
}
@Override
- public @NotNull B deserialize(@NotNull ByteBuf serialized) throws SerializationException {
+ public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
try {
return keyMapper.map(fixedLengthSerializer.deserialize(serialized.retain()));
} finally {
@@ -26,7 +26,7 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL
}
@Override
- public @NotNull ByteBuf serialize(@NotNull B deserialized) throws SerializationException {
+ public @NotNull Buffer serialize(@NotNull B deserialized) throws SerializationException {
return fixedLengthSerializer.serialize(keyMapper.unmap(deserialized));
}
diff --git a/src/main/java/it/cavallium/dbengine/database/Delta.java b/src/main/java/it/cavallium/dbengine/database/Delta.java
index 7546717..425b7f6 100644
--- a/src/main/java/it/cavallium/dbengine/database/Delta.java
+++ b/src/main/java/it/cavallium/dbengine/database/Delta.java
@@ -3,9 +3,46 @@ package it.cavallium.dbengine.database;
import java.util.Objects;
import org.jetbrains.annotations.Nullable;
-public record Delta(@Nullable T previous, @Nullable T current) {
+public class Delta {
+
+ private final @Nullable T previous;
+ private final @Nullable T current;
+
+ public Delta(@Nullable T previous, @Nullable T current) {
+ this.previous = previous;
+ this.current = current;
+ }
public boolean isModified() {
return !Objects.equals(previous, current);
}
+
+ public @Nullable T previous() {
+ return previous;
+ }
+
+ public @Nullable T current() {
+ return current;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+ if (obj == null || obj.getClass() != this.getClass())
+ return false;
+ var that = (Delta) obj;
+ return Objects.equals(this.previous, that.previous) && Objects.equals(this.current, that.current);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(previous, current);
+ }
+
+ @Override
+ public String toString() {
+ return "Delta[" + "previous=" + previous + ", " + "current=" + current + ']';
+ }
+
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
index 39709b0..defeefa 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
@@ -1,6 +1,6 @@
package it.cavallium.dbengine.database;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.api.BufferAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
@@ -11,7 +11,7 @@ import reactor.core.publisher.Mono;
@SuppressWarnings("UnusedReturnValue")
public interface LLDatabaseConnection {
- ByteBufAllocator getAllocator();
+ BufferAllocator getAllocator();
Mono extends LLDatabaseConnection> connect();
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDelta.java b/src/main/java/it/cavallium/dbengine/database/LLDelta.java
new file mode 100644
index 0000000..8d4feaf
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLDelta.java
@@ -0,0 +1,129 @@
+package it.cavallium.dbengine.database;
+
+import io.netty.buffer.api.Buffer;
+import io.netty.buffer.api.Drop;
+import io.netty.buffer.api.Owned;
+import io.netty.buffer.api.Send;
+import io.netty.buffer.api.internal.ResourceSupport;
+import java.util.StringJoiner;
+import org.jetbrains.annotations.Nullable;
+
+public class LLDelta extends ResourceSupport {
+ @Nullable
+ private final Buffer previous;
+ @Nullable
+ private final Buffer current;
+
+ private LLDelta(@Nullable Send previous, @Nullable Send current, Drop drop) {
+ super(new LLDelta.CloseOnDrop(drop));
+ assert isAllAccessible();
+ this.previous = previous != null ? previous.receive().makeReadOnly() : null;
+ this.current = current != null ? current.receive().makeReadOnly() : null;
+ }
+
+ private boolean isAllAccessible() {
+ assert previous == null || previous.isAccessible();
+ assert current == null || current.isAccessible();
+ assert this.isAccessible();
+ assert this.isOwned();
+ return true;
+ }
+
+ public static LLDelta of(Send min, Send max) {
+ return new LLDelta(min, max, d -> {});
+ }
+
+ public Send previous() {
+ ensureOwned();
+ return previous != null ? previous.copy().send() : null;
+ }
+
+ public Send current() {
+ ensureOwned();
+ return current != null ? current.copy().send() : null;
+ }
+
+ public boolean isModified() {
+ return !LLUtils.equals(previous, current);
+ }
+
+ private void ensureOwned() {
+ assert isAllAccessible();
+ if (!isOwned()) {
+ if (!isAccessible()) {
+ throw this.createResourceClosedException();
+ } else {
+ throw new IllegalStateException("Resource not owned");
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LLDelta LLDelta = (LLDelta) o;
+ return LLUtils.equals(previous, LLDelta.previous) && LLUtils.equals(current, LLDelta.current);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = LLUtils.hashCode(previous);
+ result = 31 * result + LLUtils.hashCode(current);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", LLDelta.class.getSimpleName() + "[", "]")
+ .add("min=" + LLUtils.toString(previous))
+ .add("max=" + LLUtils.toString(current))
+ .toString();
+ }
+
+ public LLDelta copy() {
+ ensureOwned();
+ return new LLDelta(previous != null ? previous.copy().send() : null,
+ current != null ? current.copy().send() : null,
+ d -> {}
+ );
+ }
+
+ @Override
+ protected RuntimeException createResourceClosedException() {
+ return new IllegalStateException("Closed");
+ }
+
+ @Override
+ protected Owned prepareSend() {
+ Send minSend;
+ Send maxSend;
+ minSend = this.previous != null ? this.previous.send() : null;
+ maxSend = this.current != null ? this.current.send() : null;
+ return drop -> new LLDelta(minSend, maxSend, drop);
+ }
+
+ private static class CloseOnDrop implements Drop {
+
+ private final Drop delegate;
+
+ public CloseOnDrop(Drop drop) {
+ this.delegate = drop;
+ }
+
+ @Override
+ public void drop(LLDelta obj) {
+ if (obj.previous != null) {
+ obj.previous.close();
+ }
+ if (obj.current != null) {
+ obj.current.close();
+ }
+ delegate.drop(obj);
+ }
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
index 5e2c135..4f18cdc 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
@@ -1,7 +1,8 @@
package it.cavallium.dbengine.database;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.api.Buffer;
+import io.netty.buffer.api.BufferAllocator;
+import io.netty.buffer.api.Send;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.serialization.BiSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
@@ -23,89 +24,90 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
String getColumnName();
- ByteBufAllocator getAllocator();
+ BufferAllocator getAllocator();
- Mono get(@Nullable LLSnapshot snapshot, Mono key, boolean existsAlmostCertainly);
+ Mono> get(@Nullable LLSnapshot snapshot, Mono> key, boolean existsAlmostCertainly);
- default Mono get(@Nullable LLSnapshot snapshot, Mono key) {
+ default Mono> get(@Nullable LLSnapshot snapshot, Mono> key) {
return get(snapshot, key, false);
}
- Mono put(Mono key, Mono value, LLDictionaryResultType resultType);
+ Mono> put(Mono> key, Mono> value, LLDictionaryResultType resultType);
Mono getUpdateMode();
- default Mono update(Mono key,
- SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater,
+ default Mono> update(Mono> key,
+ SerializationFunction<@Nullable Send, @Nullable Send> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
return this
.updateAndGetDelta(key, updater, existsAlmostCertainly)
- .transform(prev -> LLUtils.resolveDelta(prev, updateReturnMode));
+ .transform(prev -> LLUtils.resolveLLDelta(prev, updateReturnMode));
}
- default Mono update(Mono key,
- SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater,
+ default Mono> update(Mono> key,
+ SerializationFunction<@Nullable Send, @Nullable Send> updater,
UpdateReturnMode returnMode) {
return update(key, updater, returnMode, false);
}
- Mono> updateAndGetDelta(Mono key,
- SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater,
+ Mono updateAndGetDelta(Mono> key,
+ SerializationFunction<@Nullable Send, @Nullable Send> updater,
boolean existsAlmostCertainly);
- default Mono> updateAndGetDelta(Mono key,
- SerializationFunction<@Nullable ByteBuf, @Nullable ByteBuf> updater) {
+ default Mono updateAndGetDelta(Mono> key,
+ SerializationFunction<@Nullable Send, @Nullable Send> updater) {
return updateAndGetDelta(key, updater, false);
}
Mono clear();
- Mono remove(Mono key, LLDictionaryResultType resultType);
+ Mono> remove(Mono> key, LLDictionaryResultType resultType);
- Flux>> getMulti(@Nullable LLSnapshot snapshot,
- Flux> keys,
+ Flux, Optional>>> getMulti(@Nullable LLSnapshot snapshot,
+ Flux>> keys,
boolean existsAlmostCertainly);
- default Flux>> getMulti(@Nullable LLSnapshot snapshot, Flux> keys) {
+ default Flux, Optional>>> getMulti(@Nullable LLSnapshot snapshot,
+ Flux>> keys) {
return getMulti(snapshot, keys, false);
}
- Flux putMulti(Flux entries, boolean getOldValues);
+ Flux> putMulti(Flux> entries, boolean getOldValues);
- Flux> updateMulti(Flux> entries,
- BiSerializationFunction updateFunction);
+ Flux, X>> updateMulti(Flux, X>> entries,
+ BiSerializationFunction, X, Send> updateFunction);
- Flux getRange(@Nullable LLSnapshot snapshot, Mono range, boolean existsAlmostCertainly);
+ Flux> getRange(@Nullable LLSnapshot snapshot, Mono> range, boolean existsAlmostCertainly);
- default Flux getRange(@Nullable LLSnapshot snapshot, Mono range) {
+ default Flux> getRange(@Nullable LLSnapshot snapshot, Mono> range) {
return getRange(snapshot, range, false);
}
- Flux> getRangeGrouped(@Nullable LLSnapshot snapshot,
- Mono range,
+ Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+ Mono> range,
int prefixLength,
boolean existsAlmostCertainly);
- default Flux> getRangeGrouped(@Nullable LLSnapshot snapshot,
- Mono range,
+ default Flux>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+ Mono> range,
int prefixLength) {
return getRangeGrouped(snapshot, range, prefixLength, false);
}
- Flux getRangeKeys(@Nullable LLSnapshot snapshot, Mono range);
+ Flux> getRangeKeys(@Nullable LLSnapshot snapshot, Mono> range);
- Flux> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, Mono range, int prefixLength);
+ Flux>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, Mono> range, int prefixLength);
- Flux getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono range, int prefixLength);
+ Flux> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono> range, int prefixLength);
- Flux badBlocks(Mono range);
+ Flux badBlocks(Mono> range);
- Mono setRange(Mono range, Flux entries);
+ Mono setRange(Mono> range, Flux> entries);
- default Mono replaceRange(Mono range,
+ default Mono replaceRange(Mono> range,
boolean canKeysChange,
- Function> entriesReplacer,
+ Function, Mono>> entriesReplacer,
boolean existsAlmostCertainly) {
return Mono.defer(() -> {
if (canKeysChange) {
@@ -124,19 +126,19 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
});
}
- default Mono replaceRange(Mono range,
+ default Mono replaceRange(Mono> range,
boolean canKeysChange,
- Function> entriesReplacer) {
+ Function, Mono>> entriesReplacer) {
return replaceRange(range, canKeysChange, entriesReplacer, false);
}
- Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono range);
+ Mono isRangeEmpty(@Nullable LLSnapshot snapshot, Mono> range);
- Mono sizeRange(@Nullable LLSnapshot snapshot, Mono range, boolean fast);
+ Mono sizeRange(@Nullable LLSnapshot snapshot, Mono> range, boolean fast);
- Mono getOne(@Nullable LLSnapshot snapshot, Mono range);
+ Mono> getOne(@Nullable LLSnapshot snapshot, Mono> range);
- Mono getOneKey(@Nullable LLSnapshot snapshot, Mono range);
+ Mono> getOneKey(@Nullable LLSnapshot snapshot, Mono> range);
- Mono removeOne(Mono range);
+ Mono> removeOne(Mono> range);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLEntry.java b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
index 3a7a46a..e5d73b9 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLEntry.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLEntry.java
@@ -1,74 +1,127 @@
package it.cavallium.dbengine.database;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.IllegalReferenceCountException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.warp.commonutils.log.Logger;
-import org.warp.commonutils.log.LoggerFactory;
+import io.netty.buffer.api.Buffer;
+import io.netty.buffer.api.Drop;
+import io.netty.buffer.api.Owned;
+import io.netty.buffer.api.Send;
+import io.netty.buffer.api.internal.ResourceSupport;
+import java.util.StringJoiner;
+import org.jetbrains.annotations.NotNull;
-public class LLEntry {
+public class LLEntry extends ResourceSupport {
+ @NotNull
+ private final Buffer key;
+ @NotNull
+ private final Buffer value;
- private static final Logger logger = LoggerFactory.getLogger(LLEntry.class);
-
- private final AtomicInteger refCnt = new AtomicInteger(1);
-
- private final ByteBuf key;
- private final ByteBuf value;
-
- public LLEntry(ByteBuf key, ByteBuf value) {
- try {
- this.key = key.retain();
- this.value = value.retain();
- } finally {
- key.release();
- value.release();
- }
+ private LLEntry(Send key, Send value, Drop drop) {
+ super(new LLEntry.CloseOnDrop(drop));
+ assert isAllAccessible();
+ this.key = key.receive().makeReadOnly();
+ this.value = value.receive().makeReadOnly();
}
- public ByteBuf getKey() {
- if (refCnt.get() <= 0) {
- throw new IllegalReferenceCountException(refCnt.get());
- }
+ private boolean isAllAccessible() {
+ assert key.isAccessible();
+ assert value.isAccessible();
+ assert this.isAccessible();
+ assert this.isOwned();
+ return true;
+ }
+
+ public static LLEntry of(Send key, Send value) {
+ return new LLEntry(key, value, d -> {});
+ }
+
+ public Send getKey() {
+ ensureOwned();
+ return key.copy().send();
+ }
+
+ public Buffer getKeyUnsafe() {
return key;
}
- public ByteBuf getValue() {
- if (refCnt.get() <= 0) {
- throw new IllegalReferenceCountException(refCnt.get());
- }
+ public Send getValue() {
+ ensureOwned();
+ return value.copy().send();
+ }
+
+
+ public Buffer getValueUnsafe() {
return value;
}
- public void retain() {
- if (refCnt.getAndIncrement() <= 0) {
- throw new IllegalReferenceCountException(refCnt.get(), 1);
+ private void ensureOwned() {
+ assert isAllAccessible();
+ if (!isOwned()) {
+ if (!isAccessible()) {
+ throw this.createResourceClosedException();
+ } else {
+ throw new IllegalStateException("Resource not owned");
+ }
}
- key.retain();
- value.retain();
- }
-
- public void release() {
- if (refCnt.decrementAndGet() < 0) {
- throw new IllegalReferenceCountException(refCnt.get(), -1);
- }
- if (key.refCnt() > 0) {
- key.release();
- }
- if (value.refCnt() > 0) {
- value.release();
- }
- }
-
- public boolean isReleased() {
- return refCnt.get() <= 0;
}
@Override
- protected void finalize() throws Throwable {
- if (refCnt.get() > 0) {
- logger.warn(this.getClass().getName() + "::release has not been called!");
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LLEntry LLEntry = (LLEntry) o;
+ return LLUtils.equals(key, LLEntry.key) && LLUtils.equals(value, LLEntry.value);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = LLUtils.hashCode(key);
+ result = 31 * result + LLUtils.hashCode(value);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", LLEntry.class.getSimpleName() + "[", "]")
+ .add("key=" + LLUtils.toString(key))
+ .add("value=" + LLUtils.toString(value))
+ .toString();
+ }
+
+ public LLEntry copy() {
+ ensureOwned();
+ return new LLEntry(key.copy().send(), value.copy().send(), d -> {});
+ }
+
+ @Override
+ protected RuntimeException createResourceClosedException() {
+ return new IllegalStateException("Closed");
+ }
+
+ @Override
+ protected Owned prepareSend() {
+ Send keySend;
+ Send valueSend;
+ keySend = this.key.send();
+ valueSend = this.value.send();
+ return drop -> new LLEntry(keySend, valueSend, drop);
+ }
+
+ private static class CloseOnDrop implements Drop {
+
+ private final Drop delegate;
+
+ public CloseOnDrop(Drop drop) {
+ this.delegate = drop;
+ }
+
+ @Override
+ public void drop(LLEntry obj) {
+ obj.key.close();
+ obj.value.close();
+ delegate.drop(obj);
}
- super.finalize();
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java
index 568f5c6..61023e3 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java
@@ -2,7 +2,7 @@ package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.api.BufferAllocator;
import it.cavallium.dbengine.database.collections.DatabaseInt;
import it.cavallium.dbengine.database.collections.DatabaseLong;
import java.nio.charset.StandardCharsets;
@@ -46,7 +46,7 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
Mono verifyChecksum();
- ByteBufAllocator getAllocator();
+ BufferAllocator getAllocator();
Mono close();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java
index c31831d..3bb8fd1 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLRange.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java
@@ -1,117 +1,146 @@
package it.cavallium.dbengine.database;
import static io.netty.buffer.Unpooled.wrappedBuffer;
-import static io.netty.buffer.Unpooled.wrappedUnmodifiableBuffer;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.util.IllegalReferenceCountException;
-import java.util.Arrays;
+import io.netty.buffer.api.Buffer;
+import io.netty.buffer.api.Drop;
+import io.netty.buffer.api.Owned;
+import io.netty.buffer.api.Send;
+import io.netty.buffer.api.internal.ResourceSupport;
import java.util.StringJoiner;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Range of data, from min (inclusive),to max (exclusive)
*/
-public class LLRange {
+public class LLRange extends ResourceSupport {
- private static final LLRange RANGE_ALL = new LLRange(null, null, false);
- private final ByteBuf min;
- private final ByteBuf max;
- private final boolean releasable;
- private final AtomicInteger refCnt = new AtomicInteger(1);
+ private static final LLRange RANGE_ALL = new LLRange(null, null, null, d -> {});
+ private Buffer min;
+ private Buffer max;
+ private Buffer single;
- private LLRange(ByteBuf min, ByteBuf max, boolean releasable) {
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- this.min = min;
- this.max = max;
- this.releasable = releasable;
+ private LLRange(Send min, Send max, Send single, Drop drop) {
+ super(new CloseOnDrop(drop));
+ assert isAllAccessible();
+ assert single == null || (min == null && max == null);
+ this.min = min != null ? min.receive().makeReadOnly() : null;
+ this.max = max != null ? max.receive().makeReadOnly() : null;
+ this.single = single != null ? single.receive().makeReadOnly() : null;
+ }
+
+ private boolean isAllAccessible() {
+ assert min == null || min.isAccessible();
+ assert max == null || max.isAccessible();
+ assert single == null || single.isAccessible();
+ assert this.isAccessible();
+ assert this.isOwned();
+ return true;
}
public static LLRange all() {
- return RANGE_ALL;
+ return RANGE_ALL.copy();
}
- public static LLRange from(ByteBuf min) {
- return new LLRange(min, null, true);
+ public static LLRange from(Send min) {
+ return new LLRange(min, null, null, d -> {});
}
- public static LLRange to(ByteBuf max) {
- return new LLRange(null, max, true);
+ public static LLRange to(Send max) {
+ return new LLRange(null, max, null, d -> {});
}
- public static LLRange single(ByteBuf single) {
- try {
- return new LLRange(single.retain(), single.retain(), true);
- } finally {
- single.release();
- }
+ public static LLRange single(Send single) {
+ return new LLRange(null, null, single, d -> {});
}
- public static LLRange of(ByteBuf min, ByteBuf max) {
- return new LLRange(min, max, true);
+ public static LLRange of(Send min, Send max) {
+ return new LLRange(min, max, null, d -> {});
}
public boolean isAll() {
- checkReleased();
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- return min == null && max == null;
+ ensureOwned();
+ return min == null && max == null && single == null;
}
public boolean isSingle() {
- checkReleased();
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- if (min == null || max == null) return false;
- return LLUtils.equals(min, max);
+ ensureOwned();
+ return single != null;
}
public boolean hasMin() {
- checkReleased();
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- return min != null;
+ ensureOwned();
+ return min != null || single != null;
}
- public ByteBuf getMin() {
- checkReleased();
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- assert min != null;
- return min;
+ public Send getMin() {
+ ensureOwned();
+ if (min != null) {
+ return min.copy().send();
+ } else if (single != null) {
+ return single.copy().send();
+ } else {
+ return null;
+ }
+ }
+
+ public Buffer getMinUnsafe() {
+ ensureOwned();
+ if (min != null) {
+ return min;
+ } else if (single != null) {
+ return single;
+ } else {
+ return null;
+ }
}
public boolean hasMax() {
- checkReleased();
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- return max != null;
+ ensureOwned();
+ return max != null || single != null;
}
- public ByteBuf getMax() {
- checkReleased();
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- assert max != null;
- return max;
- }
-
- public ByteBuf getSingle() {
- checkReleased();
- assert min == null || min.refCnt() > 0;
- assert max == null || max.refCnt() > 0;
- assert isSingle();
- return min;
- }
-
- private void checkReleased() {
- if (!releasable) {
- return;
+ public Send getMax() {
+ ensureOwned();
+ if (max != null) {
+ return max.copy().send();
+ } else if (single != null) {
+ return single.copy().send();
+ } else {
+ return null;
}
- if (refCnt.get() <= 0) {
- throw new IllegalReferenceCountException(0);
+ }
+
+ public Buffer getMaxUnsafe() {
+ ensureOwned();
+ if (max != null) {
+ return max;
+ } else if (single != null) {
+ return single;
+ } else {
+ return null;
+ }
+ }
+
+ public Send getSingle() {
+ ensureOwned();
+ assert isSingle();
+ return single != null ? single.copy().send() : null;
+ }
+
+ public Buffer getSingleUnsafe() {
+ ensureOwned();
+ assert isSingle();
+ return single;
+ }
+
+ private void ensureOwned() {
+ assert isAllAccessible();
+ if (!isOwned()) {
+ if (!isAccessible()) {
+ throw this.createResourceClosedException();
+ } else {
+ throw new IllegalStateException("Resource not owned");
+ }
}
}
@@ -142,34 +171,53 @@ public class LLRange {
.toString();
}
- public LLRange retain() {
- if (!releasable) {
- return this;
- }
- if (refCnt.updateAndGet(refCnt -> refCnt <= 0 ? 0 : (refCnt + 1)) <= 0) {
- throw new IllegalReferenceCountException(0, 1);
- }
- if (min != null) {
- min.retain();
- }
- if (max != null) {
- max.retain();
- }
- return this;
+ public LLRange copy() {
+ ensureOwned();
+ return new LLRange(min != null ? min.copy().send() : null,
+ max != null ? max.copy().send() : null,
+ single != null ? single.copy().send(): null,
+ d -> {}
+ );
}
- public void release() {
- if (!releasable) {
- return;
+ @Override
+ protected RuntimeException createResourceClosedException() {
+ return new IllegalStateException("Closed");
+ }
+
+ @Override
+ protected Owned prepareSend() {
+ Send minSend;
+ Send maxSend;
+ Send singleSend;
+ minSend = this.min != null ? this.min.send() : null;
+ maxSend = this.max != null ? this.max.send() : null;
+ singleSend = this.single != null ? this.single.send() : null;
+ this.makeInaccessible();
+ return drop -> new LLRange(minSend, maxSend, singleSend, drop);
+ }
+
+ private void makeInaccessible() {
+ this.min = null;
+ this.max = null;
+ this.single = null;
+ }
+
+ private static class CloseOnDrop implements Drop {
+
+ private final Drop delegate;
+
+ public CloseOnDrop(Drop drop) {
+ this.delegate = drop;
}
- if (refCnt.decrementAndGet() < 0) {
- throw new IllegalReferenceCountException(0, -1);
- }
- if (min != null) {
- min.release();
- }
- if (max != null) {
- max.release();
+
+ @Override
+ public void drop(LLRange obj) {
+ if (obj.min != null) obj.min.close();
+ if (obj.max != null) obj.max.close();
+ if (obj.single != null) obj.single.close();
+ obj.makeInaccessible();
+ delegate.drop(obj);
}
}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index 9c16c7e..77e537f 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -2,21 +2,17 @@ package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.AbstractReferenceCounted;
+import io.netty.buffer.api.Buffer;
+import io.netty.buffer.api.BufferAllocator;
+import io.netty.buffer.api.CompositeBuffer;
+import io.netty.buffer.api.Send;
import io.netty.util.IllegalReferenceCountException;
-import io.netty.util.ReferenceCounted;
-import it.cavallium.dbengine.database.disk.ReleasableSlice;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -24,7 +20,8 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToIntFunction;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@@ -45,7 +42,6 @@ import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.warp.commonutils.functional.IOFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@@ -56,6 +52,7 @@ public class LLUtils {
private static final Logger logger = LoggerFactory.getLogger(LLUtils.class);
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0);
private static final byte[] RESPONSE_TRUE = new byte[]{1};
private static final byte[] RESPONSE_FALSE = new byte[]{0};
private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1};
@@ -73,12 +70,10 @@ public class LLUtils {
return response[0] == 1;
}
- public static boolean responseToBoolean(ByteBuf response) {
- try {
+ public static boolean responseToBoolean(Buffer response) {
+ try (response) {
assert response.readableBytes() == 1;
- return response.getByte(response.readerIndex()) == 1;
- } finally {
- response.release();
+ return response.getByte(response.readerOffset()) == 1;
}
}
@@ -86,8 +81,8 @@ public class LLUtils {
return bool ? RESPONSE_TRUE : RESPONSE_FALSE;
}
- public static ByteBuf booleanToResponseByteBuffer(boolean bool) {
- return Unpooled.wrappedBuffer(booleanToResponse(bool));
+ public static Buffer booleanToResponseByteBuffer(BufferAllocator alloc, boolean bool) {
+ return alloc.allocate(1).writeByte(bool ? (byte) 1 : 0);
}
@Nullable
@@ -171,9 +166,9 @@ public class LLUtils {
return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.score(), hit.key());
}
- public static String toStringSafe(ByteBuf key) {
+ public static String toStringSafe(Buffer key) {
try {
- if (key.refCnt() > 0) {
+ if (key.isAccessible()) {
return toString(key);
} else {
return "(released)";
@@ -183,11 +178,11 @@ public class LLUtils {
}
}
- public static String toString(ByteBuf key) {
+ public static String toString(Buffer key) {
if (key == null) {
return "null";
} else {
- int startIndex = key.readerIndex();
+ int startIndex = key.readerOffset();
int iMax = key.readableBytes() - 1;
int iLimit = 128;
if (iMax <= -1) {
@@ -213,111 +208,117 @@ public class LLUtils {
}
}
- public static boolean equals(ByteBuf a, ByteBuf b) {
+ public static boolean equals(Buffer a, Buffer b) {
if (a == null && b == null) {
return true;
} else if (a != null && b != null) {
- return ByteBufUtil.equals(a, b);
+ var aCur = a.openCursor();
+ var bCur = b.openCursor();
+ if (aCur.bytesLeft() != bCur.bytesLeft()) {
+ return false;
+ }
+ while (aCur.readByte() && bCur.readByte()) {
+ if (aCur.getByte() != bCur.getByte()) {
+ return false;
+ }
+ }
+ return true;
} else {
return false;
}
}
- public static byte[] toArray(ByteBuf key) {
- if (key.hasArray()) {
- return Arrays.copyOfRange(key.array(), key.arrayOffset() + key.readerIndex(), key.arrayOffset() + key.writerIndex());
- } else {
- byte[] keyBytes = new byte[key.readableBytes()];
- key.getBytes(key.readerIndex(), keyBytes, 0, key.readableBytes());
- return keyBytes;
- }
+ public static byte[] toArray(Buffer key) {
+ byte[] array = new byte[key.readableBytes()];
+ key.copyInto(key.readerOffset(), array, 0, key.readableBytes());
+ return array;
}
- public static List toArray(List input) {
+ public static List toArray(List input) {
List result = new ArrayList<>(input.size());
- for (ByteBuf byteBuf : input) {
+ for (Buffer byteBuf : input) {
result.add(toArray(byteBuf));
}
return result;
}
- public static int hashCode(ByteBuf buf) {
- return buf == null ? 0 : buf.hashCode();
+ public static int hashCode(Buffer buf) {
+ if (buf == null)
+ return 0;
+
+ int result = 1;
+ var cur = buf.openCursor();
+ while (cur.readByte()) {
+ var element = cur.getByte();
+ result = 31 * result + element;
+ }
+
+ return result;
}
+ /**
+ *
+ * @return null if size is equal to RocksDB.NOT_FOUND
+ */
@Nullable
- public static ByteBuf readNullableDirectNioBuffer(ByteBufAllocator alloc, ToIntFunction reader) {
- ByteBuf buffer = alloc.directBuffer();
- ByteBuf directBuffer = null;
+ public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction reader) {
+ Buffer buffer = alloc.allocate(4096);
ByteBuffer nioBuffer;
int size;
- Boolean mustBeCopied = null;
do {
- if (mustBeCopied == null || !mustBeCopied) {
- nioBuffer = LLUtils.toDirectFast(buffer);
- if (nioBuffer != null) {
- nioBuffer.limit(nioBuffer.capacity());
- }
- } else {
- nioBuffer = null;
- }
- if ((mustBeCopied != null && mustBeCopied) || nioBuffer == null) {
- directBuffer = buffer;
- nioBuffer = directBuffer.nioBuffer(0, directBuffer.capacity());
- mustBeCopied = true;
- } else {
- mustBeCopied = false;
- }
- try {
- assert nioBuffer.isDirect();
- size = reader.applyAsInt(nioBuffer);
- if (size != RocksDB.NOT_FOUND) {
- if (mustBeCopied) {
- buffer.writerIndex(0).writeBytes(nioBuffer);
- }
- if (size == nioBuffer.limit()) {
- buffer.setIndex(0, size);
- return buffer;
- } else {
- assert size > nioBuffer.limit();
- assert nioBuffer.limit() > 0;
- buffer.capacity(size);
- }
- }
- } finally {
- if (nioBuffer != null) {
- nioBuffer = null;
- }
- if(directBuffer != null) {
- directBuffer.release();
- directBuffer = null;
+ nioBuffer = LLUtils.toDirect(buffer);
+ nioBuffer.limit(nioBuffer.capacity());
+ assert nioBuffer.isDirect();
+ size = reader.applyAsInt(nioBuffer);
+ if (size != RocksDB.NOT_FOUND) {
+ if (size == nioBuffer.limit()) {
+ buffer.readerOffset(0).writerOffset(size);
+ return buffer;
+ } else {
+ assert size > nioBuffer.limit();
+ assert nioBuffer.limit() > 0;
+ buffer.ensureWritable(size);
}
}
} while (size != RocksDB.NOT_FOUND);
+
+ // Return null if size is equal to RocksDB.NOT_FOUND
return null;
}
@Nullable
- public static ByteBuffer toDirectFast(ByteBuf buffer) {
- ByteBuffer result = buffer.nioBuffer(0, buffer.capacity());
- if (result.isDirect()) {
- result.limit(buffer.writerIndex());
+ public static ByteBuffer toDirectFast(Buffer buffer) {
+ int readableComponents = buffer.countReadableComponents();
+ if (readableComponents > 0) {
+ AtomicReference byteBufferReference = new AtomicReference<>(null);
+ buffer.forEachReadable(0, (index, component) -> {
+ byteBufferReference.setPlain(component.readableBuffer());
+ return false;
+ });
+ ByteBuffer byteBuffer = byteBufferReference.getPlain();
+ if (byteBuffer != null && byteBuffer.isDirect()) {
+ byteBuffer.limit(buffer.writerOffset());
- assert result.isDirect();
- assert result.capacity() == buffer.capacity();
- assert buffer.readerIndex() == result.position();
- assert result.limit() - result.position() == buffer.readableBytes();
+ assert byteBuffer.isDirect();
+ assert byteBuffer.capacity() == buffer.capacity();
+ assert buffer.readerOffset() == byteBuffer.position();
+ assert byteBuffer.limit() - byteBuffer.position() == buffer.readableBytes();
- return result;
+ return byteBuffer;
+ } else {
+ return null;
+ }
+ } else if (readableComponents == 0) {
+ return EMPTY_BYTE_BUFFER;
} else {
return null;
}
}
- public static ByteBuffer toDirect(ByteBuf buffer) {
+ public static ByteBuffer toDirect(Buffer buffer) {
ByteBuffer result = toDirectFast(buffer);
if (result == null) {
- throw new IllegalArgumentException("The supplied ByteBuf is not direct "
+ throw new IllegalArgumentException("The supplied Buffer is not direct "
+ "(if it's a CompositeByteBuf it must be consolidated before)");
}
assert result.isDirect();
@@ -325,9 +326,9 @@ public class LLUtils {
}
/*
- public static ByteBuf toDirectCopy(ByteBuf buffer) {
+ public static Buffer toDirectCopy(Buffer buffer) {
try {
- ByteBuf directCopyBuf = buffer.alloc().buffer(buffer.capacity(), buffer.maxCapacity());
+ Buffer directCopyBuf = buffer.alloc().buffer(buffer.capacity(), buffer.maxCapacity());
directCopyBuf.writeBytes(buffer, 0, buffer.writerIndex());
return directCopyBuf;
} finally {
@@ -336,26 +337,14 @@ public class LLUtils {
}
*/
- public static ByteBuf convertToDirectByteBuf(ByteBufAllocator alloc, ByteBuf buffer) {
- ByteBuf result;
- ByteBuf directCopyBuf = alloc.buffer(buffer.capacity(), buffer.maxCapacity());
- directCopyBuf.writeBytes(buffer, 0, buffer.writerIndex());
- directCopyBuf.readerIndex(buffer.readerIndex());
- result = directCopyBuf;
- assert result.isDirect();
- assert result.capacity() == buffer.capacity();
- assert buffer.readerIndex() == result.readerIndex();
- return result;
- }
-
- public static ByteBuf fromByteArray(ByteBufAllocator alloc, byte[] array) {
- ByteBuf result = alloc.buffer(array.length);
+ public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {
+ Buffer result = alloc.allocate(array.length);
result.writeBytes(array);
return result;
}
@NotNull
- public static ByteBuf readDirectNioBuffer(ByteBufAllocator alloc, ToIntFunction reader) {
+ public static Buffer readDirectNioBuffer(BufferAllocator alloc, ToIntFunction reader) {
var buffer = readNullableDirectNioBuffer(alloc, reader);
if (buffer == null) {
throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element");
@@ -363,81 +352,54 @@ public class LLUtils {
return buffer;
}
- public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf buffer) {
- return buffer;
- }
-
- public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf buffer1, ByteBuf buffer2) {
- try {
- if (buffer1.readableBytes() == 0) {
- return compositeBuffer(alloc, buffer2.retain());
- } else if (buffer2.readableBytes() == 0) {
- return compositeBuffer(alloc, buffer1.retain());
- }
- CompositeByteBuf result = alloc.compositeBuffer(2);
- try {
- result.addComponent(true, buffer1.retain());
- result.addComponent(true, buffer2.retain());
- return result.consolidate().retain();
- } finally {
- result.release();
- }
- } finally {
- buffer1.release();
- buffer2.release();
+ public static Send compositeBuffer(BufferAllocator alloc, Send buffer) {
+ try (var composite = buffer.receive().compact()) {
+ assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0;
+ return composite.send();
}
}
- public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf buffer1, ByteBuf buffer2, ByteBuf buffer3) {
- try {
- if (buffer1.readableBytes() == 0) {
- return compositeBuffer(alloc, buffer2.retain(), buffer3.retain());
- } else if (buffer2.readableBytes() == 0) {
- return compositeBuffer(alloc, buffer1.retain(), buffer3.retain());
- } else if (buffer3.readableBytes() == 0) {
- return compositeBuffer(alloc, buffer1.retain(), buffer2.retain());
+ public static Send compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2) {
+ try (buffer1) {
+ try (buffer2) {
+ try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2).compact()) {
+ assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0;
+ return composite.send();
+ }
}
- CompositeByteBuf result = alloc.compositeBuffer(3);
- try {
- result.addComponent(true, buffer1.retain());
- result.addComponent(true, buffer2.retain());
- result.addComponent(true, buffer3.retain());
- return result.consolidate().retain();
- } finally {
- result.release();
- }
- } finally {
- buffer1.release();
- buffer2.release();
- buffer3.release();
}
}
- public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf... buffers) {
- try {
- switch (buffers.length) {
- case 0:
- return alloc.buffer(0);
- case 1:
- return compositeBuffer(alloc, buffers[0].retain().retain());
- case 2:
- return compositeBuffer(alloc, buffers[0].retain(), buffers[1].retain());
- case 3:
- return compositeBuffer(alloc, buffers[0].retain(), buffers[1].retain(), buffers[2].retain());
- default:
- CompositeByteBuf result = alloc.compositeBuffer(buffers.length);
- try {
- for (ByteBuf buffer : buffers) {
- result.addComponent(true, buffer.retain());
- }
- return result.consolidate().retain();
- } finally {
- result.release();
+ public static Send compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2, Send buffer3) {
+ try (buffer1) {
+ try (buffer2) {
+ try (buffer3) {
+ try (var composite = CompositeBuffer.compose(alloc, buffer1, buffer2, buffer3).compact()) {
+ assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0;
+ return composite.send();
}
+ }
}
+ }
+ }
+
+ public static Send compositeBuffer(BufferAllocator alloc, Send... buffers) {
+ try {
+ return switch (buffers.length) {
+ case 0 -> alloc.allocate(0).send();
+ case 1 -> compositeBuffer(alloc, buffers[0]);
+ case 2 -> compositeBuffer(alloc, buffers[0], buffers[1]);
+ case 3 -> compositeBuffer(alloc, buffers[0], buffers[1], buffers[2]);
+ default -> {
+ try (var composite = CompositeBuffer.compose(alloc, buffers).compact()) {
+ assert composite.countReadableComponents() == 1 || composite.countReadableComponents() == 0;
+ yield composite.send();
+ }
+ }
+ };
} finally {
- for (ByteBuf buffer : buffers) {
- buffer.release();
+ for (Send buffer : buffers) {
+ buffer.close();
}
}
}
@@ -467,6 +429,33 @@ public class LLUtils {
});
}
+ public static Mono> resolveLLDelta(Mono prev, UpdateReturnMode updateReturnMode) {
+ return prev.handle((delta, sink) -> {
+ try (delta) {
+ switch (updateReturnMode) {
+ case GET_NEW_VALUE -> {
+ var current = delta.current();
+ if (current != null) {
+ sink.next(current);
+ } else {
+ sink.complete();
+ }
+ }
+ case GET_OLD_VALUE -> {
+ var previous = delta.previous();
+ if (previous != null) {
+ sink.next(previous);
+ } else {
+ sink.complete();
+ }
+ }
+ case NOTHING -> sink.complete();
+ default -> sink.error(new IllegalStateException());
+ }
+ }
+ });
+ }
+
public static Mono> mapDelta(Mono> mono,
SerializationFunction<@NotNull T, @Nullable U> mapper) {
return mono.handle((delta, sink) -> {
@@ -492,38 +481,57 @@ public class LLUtils {
});
}
+ public static Mono> mapLLDelta(Mono mono,
+ SerializationFunction<@NotNull Send, @Nullable U> mapper) {
+ return mono.handle((delta, sink) -> {
+ try {
+ try (Send prev = delta.previous()) {
+ try (Send curr = delta.current()) {
+ U newPrev;
+ U newCurr;
+ if (prev != null) {
+ newPrev = mapper.apply(prev);
+ } else {
+ newPrev = null;
+ }
+ if (curr != null) {
+ newCurr = mapper.apply(curr);
+ } else {
+ newCurr = null;
+ }
+ sink.next(new Delta<>(newPrev, newCurr));
+ }
+ }
+ } catch (SerializationException ex) {
+ sink.error(ex);
+ }
+ });
+ }
+
public static boolean isDeltaChanged(Delta delta) {
return !Objects.equals(delta.previous(), delta.current());
}
- public static Mono lazyRetain(ByteBuf buf) {
- return Mono.just(buf).map(ByteBuf::retain);
+ public static Mono> lazyRetain(Buffer buf) {
+ return Mono.just(buf).map(b -> b.copy().send());
}
- public static Mono lazyRetainRange(LLRange range) {
- return Mono.just(range).map(LLRange::retain);
+ public static Mono> lazyRetainRange(LLRange range) {
+ return Mono.just(range).map(r -> r.copy().send());
}
- public static Mono lazyRetain(Callable bufCallable) {
- return Mono.fromCallable(bufCallable).cacheInvalidateIf(byteBuf -> {
- // Retain if the value has been cached previously
- byteBuf.retain();
- return false;
- });
+ public static Mono> lazyRetain(Callable> bufCallable) {
+ return Mono.fromCallable(bufCallable);
}
- public static Mono lazyRetainRange(Callable