getPoolArenaMetrics(PooledBufferAllocator allocator) {
- var metric = allocator.metric();
- try {
- // Invoke the method to get the metrics
- return (List) GET_ARENA_METRICS.invoke(metric);
- } catch (Throwable e) {
- return List.of();
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/buffers/Buf.java b/src/main/java/it/cavallium/dbengine/buffers/Buf.java
new file mode 100644
index 0000000..2e678d2
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/buffers/Buf.java
@@ -0,0 +1,182 @@
+package it.cavallium.dbengine.buffers;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import it.unimi.dsi.fastutil.bytes.ByteArrayList;
+import it.unimi.dsi.fastutil.bytes.ByteList;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.RandomAccess;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.stream.SafeByteArrayInputStream;
+import org.warp.commonutils.stream.SafeByteArrayOutputStream;
+import org.warp.commonutils.stream.SafeDataOutput;
+
+public interface Buf extends ByteList, RandomAccess {
+ static Buf wrap(ByteList bytes) {
+ if (bytes instanceof Buf buf) {
+ return buf;
+ } else if (bytes instanceof ByteArrayList byteArrayList) {
+ return ByteListBuf.wrap(byteArrayList.elements(), byteArrayList.size());
+ } else {
+ return ByteListBuf.wrap(bytes.toByteArray());
+ }
+ }
+ static Buf wrap(ByteList bytes, int from, int to) {
+ if (bytes instanceof Buf buf) {
+ return buf.subList(from, to);
+ } else if (bytes instanceof ByteArrayList byteArrayList) {
+ return ByteListBuf.wrap(byteArrayList.elements(), byteArrayList.size()).subList(from, to);
+ } else {
+ return ByteListBuf.wrap(bytes.toByteArray()).subList(from, to);
+ }
+ }
+
+ static Buf wrap(byte[] bytes) {
+ return ByteListBuf.wrap(bytes);
+ }
+
+ static Buf wrap(byte[] bytes, int from, int to) {
+ return ByteListBuf.wrap(bytes, to).subList(from, to);
+ }
+
+ static Buf create(int initialCapacity) {
+ return new ByteListBuf(initialCapacity);
+ }
+
+ static Buf copyOf(byte[] original) {
+ return new ByteListBuf(original);
+ }
+
+ static Buf create() {
+ return new ByteListBuf();
+ }
+
+ static Buf wrap(byte[] array, int length) {
+ return ByteListBuf.wrap(array, length);
+ }
+
+ static Buf createZeroes(int length) {
+ return ByteListBuf.wrap(new byte[length], length);
+ }
+
+ /**
+ * Get this element as an array, converting it if needed
+ */
+ byte @NotNull[] asArray();
+
+ /**
+ * Get this element as an array, only if it's already an array, otherwise return null
+ */
+ byte @Nullable[] asArrayStrict();
+
+ /**
+ * Get this element as an array with equal or bigger size, converting it if needed
+ * The returned array may be bigger than expected!
+ */
+ byte @Nullable[] asUnboundedArray();
+
+ /**
+ * Get this element as an array with equal or bigger size, only if it's already an array, otherwise return null
+ * The returned array may be bigger than expected!
+ */
+ byte @Nullable[] asUnboundedArrayStrict();
+
+ boolean isMutable();
+
+ void freeze();
+
+ @Override
+ Buf subList(int from, int to);
+
+ Buf copy();
+
+ SafeByteArrayInputStream binaryInputStream();
+
+ void writeTo(SafeDataOutput dataOutput);
+
+ default long getLong(int i) {
+ return Longs.fromBytes(getByte(i),
+ getByte(i + 1),
+ getByte(i + 2),
+ getByte(i + 3),
+ getByte(i + 4),
+ getByte(i + 5),
+ getByte(i + 6),
+ getByte(i + 7)
+ );
+ }
+
+ default int getInt(int i) {
+ return Ints.fromBytes(getByte(i),
+ getByte(i + 1),
+ getByte(i + 2),
+ getByte(i + 3)
+ );
+ }
+
+ default float getFloat(int i) {
+ return Float.intBitsToFloat(getInt(i));
+ }
+
+ default double getDouble(int i) {
+ return Double.longBitsToDouble(getLong(i));
+ }
+
+ default boolean getBoolean(int i) {
+ return getByte(i) != 0;
+ }
+
+ default void setBoolean(int i, boolean val) {
+ set(i, val ? (byte) 1 : 0);
+ }
+
+ default void setByte(int i, byte val) {
+ set(i, val);
+ }
+
+ default void setInt(int i, int val) {
+ set(i, (byte) (val >> 24));
+ set(i + 1, (byte) (val >> 16));
+ set(i + 2, (byte) (val >> 8));
+ set(i + 3, (byte) val);
+ }
+
+ default void setLong(int i, long val) {
+ set(i, (byte) (val >> 56));
+ set(i + 1, (byte) (val >> 48));
+ set(i + 2, (byte) (val >> 40));
+ set(i + 3, (byte) (val >> 32));
+ set(i + 4, (byte) (val >> 24));
+ set(i + 5, (byte) (val >> 16));
+ set(i + 6, (byte) (val >> 8));
+ set(i + 7, (byte) val);
+ }
+
+ default void setFloat(int i, float val) {
+ setInt(i, Float.floatToRawIntBits(val));
+ }
+
+ default void setDouble(int i, double val) {
+ setLong(i, Double.doubleToRawLongBits(val));
+ }
+
+ default SafeByteArrayOutputStream binaryOutputStream() {
+ return binaryOutputStream(0, size());
+ }
+
+ default SafeByteArrayOutputStream binaryOutputStream(int from) {
+ return binaryOutputStream(from, size());
+ }
+
+ SafeByteArrayOutputStream binaryOutputStream(int from, int to);
+
+ boolean equals(int aStartIndex, Buf b, int bStartIndex, int length);
+
+ boolean equals(int aStartIndex, byte[] b, int bStartIndex, int length);
+
+ default String toString(Charset charset) {
+ return new String(this.asArray(), charset);
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/buffers/BufDataInput.java b/src/main/java/it/cavallium/dbengine/buffers/BufDataInput.java
new file mode 100644
index 0000000..13db4ac
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/buffers/BufDataInput.java
@@ -0,0 +1,42 @@
+package it.cavallium.dbengine.buffers;
+
+import org.jetbrains.annotations.NotNull;
+import org.warp.commonutils.stream.SafeByteArrayInputStream;
+import org.warp.commonutils.stream.SafeDataInputStream;
+
+
+public class BufDataInput extends SafeDataInputStream {
+
+ /**
+ * Creates a DataInputStream that uses the specified underlying InputStream.
+ *
+ * @param in the specified input stream
+ */
+ private BufDataInput(@NotNull SafeByteArrayInputStream in) {
+ super(in);
+ }
+
+ public static BufDataInput create(Buf byteList) {
+ return new BufDataInput(byteList.binaryInputStream());
+ }
+
+ @Deprecated
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/buffers/BufDataOutput.java b/src/main/java/it/cavallium/dbengine/buffers/BufDataOutput.java
new file mode 100644
index 0000000..acd1260
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/buffers/BufDataOutput.java
@@ -0,0 +1,218 @@
+package it.cavallium.dbengine.buffers;
+
+import it.unimi.dsi.fastutil.Arrays;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+import org.jetbrains.annotations.NotNull;
+import org.warp.commonutils.stream.SafeByteArrayOutputStream;
+import org.warp.commonutils.stream.SafeDataOutputStream;
+
+public class BufDataOutput implements DataOutput {
+
+ private final SafeByteArrayOutputStream buf;
+ private final SafeDataOutputStream dOut;
+ private final int limit;
+
+ private BufDataOutput(SafeByteArrayOutputStream buf) {
+ this.buf = buf;
+ this.dOut = new SafeDataOutputStream(buf);
+ limit = Integer.MAX_VALUE;
+ }
+
+ private BufDataOutput(SafeByteArrayOutputStream buf, int maxSize) {
+ this.buf = buf;
+ this.dOut = new SafeDataOutputStream(buf);
+ this.limit = maxSize;
+ }
+
+ public static BufDataOutput createLimited(int maxSize, int hint) {
+ if (hint >= 0) {
+ if (maxSize < 0 || maxSize == Integer.MAX_VALUE) {
+ return create(hint);
+ } else {
+ return new BufDataOutput(new SafeByteArrayOutputStream(Math.min(maxSize, hint)), maxSize);
+ }
+ } else {
+ return createLimited(maxSize);
+ }
+ }
+
+ public static BufDataOutput createLimited(int maxSize) {
+ if (maxSize < 0 || maxSize == Integer.MAX_VALUE) {
+ return create();
+ } else {
+ return new BufDataOutput(new SafeByteArrayOutputStream(maxSize), maxSize);
+ }
+ }
+
+ public static BufDataOutput create() {
+ return new BufDataOutput(new SafeByteArrayOutputStream());
+ }
+
+ public static BufDataOutput create(int hint) {
+ if (hint >= 0) {
+ return new BufDataOutput(new SafeByteArrayOutputStream(hint));
+ } else {
+ return create();
+ }
+ }
+
+ public static BufDataOutput wrap(Buf buf, int from, int to) {
+ Arrays.ensureFromTo(buf.size(), from, to);
+ if (buf.isEmpty()) {
+ return createLimited(0);
+ } else {
+ return new BufDataOutput(buf.binaryOutputStream(from), to - from);
+ }
+ }
+
+ public static BufDataOutput wrap(Buf buf) {
+ if (buf.isEmpty()) {
+ return createLimited(0);
+ } else {
+ return new BufDataOutput(buf.binaryOutputStream(), buf.size());
+ }
+ }
+
+ private IllegalStateException unreachable(IOException ex) {
+ return new IllegalStateException(ex);
+ }
+
+ @Override
+ public void write(int b) {
+ checkOutOfBounds(1);
+ dOut.write(b);
+ }
+
+ private void checkOutOfBounds(int delta) {
+ if (dOut.size() + delta > limit) {
+ throw new IndexOutOfBoundsException(limit);
+ }
+ }
+
+ @Override
+ public void write(byte @NotNull [] b) {
+ checkOutOfBounds(b.length);
+ dOut.write(b);
+ }
+
+ @Override
+ public void write(byte @NotNull [] b, int off, int len) {
+ checkOutOfBounds(Math.max(0, Math.min(b.length - off, len)));
+ dOut.write(b, off, len);
+ }
+
+ @Override
+ public void writeBoolean(boolean v) {
+ checkOutOfBounds(1);
+ dOut.writeBoolean(v);
+ }
+
+ @Override
+ public void writeByte(int v) {
+ checkOutOfBounds(Byte.BYTES);
+ dOut.writeByte(v);
+ }
+
+ @Override
+ public void writeShort(int v) {
+ checkOutOfBounds(Short.BYTES);
+ dOut.writeShort(v);
+ }
+
+ @Override
+ public void writeChar(int v) {
+ checkOutOfBounds(Character.BYTES);
+ dOut.writeChar(v);
+ }
+
+ @Override
+ public void writeInt(int v) {
+ checkOutOfBounds(Integer.BYTES);
+ dOut.writeInt(v);
+ }
+
+ @Override
+ public void writeLong(long v) {
+ checkOutOfBounds(Long.BYTES);
+ dOut.writeLong(v);
+ }
+
+ @Override
+ public void writeFloat(float v) {
+ checkOutOfBounds(Float.BYTES);
+ dOut.writeFloat(v);
+ }
+
+ @Override
+ public void writeDouble(double v) {
+ checkOutOfBounds(Double.BYTES);
+ dOut.writeDouble(v);
+ }
+
+ public void ensureWritable(int size) {
+ dOut.flush();
+ buf.ensureWritable(size);
+ }
+
+ @Override
+ public void writeBytes(@NotNull String s) {
+ checkOutOfBounds(s.length() * Byte.BYTES);
+ dOut.writeBytes(s);
+ }
+
+ // todo: check
+ public void writeBytes(Buf deserialized) {
+ checkOutOfBounds(deserialized.size());
+ deserialized.writeTo(dOut);
+ }
+
+ public void writeBytes(byte[] b, int off, int len) {
+ write(b, off, len);
+ }
+
+ @Override
+ public void writeChars(@NotNull String s) {
+ checkOutOfBounds(Character.BYTES * s.length());
+ dOut.writeChars(s);
+ }
+
+ @Override
+ public void writeUTF(@NotNull String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Buf asList() {
+ dOut.flush();
+ return Buf.wrap(this.buf.array, this.buf.length);
+ }
+
+ @Override
+ public String toString() {
+ return dOut.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return dOut.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BufDataOutput that = (BufDataOutput) o;
+
+ return Objects.equals(dOut, that.dOut);
+ }
+
+ public int size() {
+ return dOut.size();
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/buffers/ByteListBuf.java b/src/main/java/it/cavallium/dbengine/buffers/ByteListBuf.java
new file mode 100644
index 0000000..3afbb85
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/buffers/ByteListBuf.java
@@ -0,0 +1,467 @@
+package it.cavallium.dbengine.buffers;
+
+import it.unimi.dsi.fastutil.bytes.AbstractByteList;
+import it.unimi.dsi.fastutil.bytes.ByteArrayList;
+import it.unimi.dsi.fastutil.bytes.ByteCollection;
+import it.unimi.dsi.fastutil.bytes.ByteConsumer;
+import it.unimi.dsi.fastutil.bytes.ByteIterator;
+import it.unimi.dsi.fastutil.bytes.ByteIterators;
+import it.unimi.dsi.fastutil.bytes.ByteList;
+import it.unimi.dsi.fastutil.bytes.ByteListIterator;
+import it.unimi.dsi.fastutil.bytes.ByteSpliterator;
+import it.unimi.dsi.fastutil.bytes.ByteSpliterators;
+import java.io.Serial;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.stream.SafeByteArrayInputStream;
+import org.warp.commonutils.stream.SafeByteArrayOutputStream;
+import org.warp.commonutils.stream.SafeDataOutput;
+
+class ByteListBuf extends ByteArrayList implements Buf {
+
+ private boolean mutable = true;
+
+ protected ByteListBuf(byte[] a, boolean wrapped) {
+ super(a, wrapped);
+ }
+
+ public ByteListBuf(int capacity) {
+ super(capacity);
+ }
+
+ public ByteListBuf() {
+ }
+
+ public ByteListBuf(Collection<? extends Byte> c) {
+ super(c);
+ }
+
+ public ByteListBuf(ByteCollection c) {
+ super(c);
+ }
+
+ public ByteListBuf(ByteList l) {
+ super(l);
+ }
+
+ public ByteListBuf(byte[] a) {
+ super(a);
+ }
+
+ public ByteListBuf(byte[] a, int offset, int length) {
+ super(a, offset, length);
+ }
+
+ public ByteListBuf(Iterator<? extends Byte> i) {
+ super(i);
+ }
+
+ public ByteListBuf(ByteIterator i) {
+ super(i);
+ }
+
+ /**
+ * Wraps a given array into an array list of given size.
+ *
+ *
+ * Note it is guaranteed that the type of the array returned by {@link #elements()} will be the same
+ * (see the comments in the class documentation).
+ *
+ * @param a an array to wrap.
+ * @param length the length of the resulting array list.
+ * @return a new array list of the given size, wrapping the given array.
+ */
+ public static ByteListBuf wrap(final byte[] a, final int length) {
+ if (length > a.length) throw new IllegalArgumentException("The specified length (" + length + ") is greater than the array size (" + a.length + ")");
+ final ByteListBuf l = new ByteListBuf(a, true);
+ l.size = length;
+ return l;
+ }
+
+ /**
+ * Wraps a given array into an array list.
+ *
+ *
+ * Note it is guaranteed that the type of the array returned by {@link #elements()} will be the same
+ * (see the comments in the class documentation).
+ *
+ * @param a an array to wrap.
+ * @return a new array list wrapping the given array.
+ */
+ public static ByteListBuf wrap(final byte[] a) {
+ return wrap(a, a.length);
+ }
+
+ /**
+ * Creates a new empty array list.
+ *
+ * @return a new empty array list.
+ */
+ public static ByteListBuf of() {
+ return new ByteListBuf();
+ }
+
+ /**
+ * Creates an array list using an array of elements.
+ *
+ * @param init a the array the will become the new backing array of the array list.
+ * @return a new array list backed by the given array.
+ * @see #wrap
+ */
+
+ public static ByteListBuf of(final byte... init) {
+ return wrap(init);
+ }
+
+ @Override
+ public byte @NotNull [] asArray() {
+ if (this.size() == a.length) {
+ return this.a;
+ } else {
+ return this.toByteArray();
+ }
+ }
+
+ @Override
+ public byte @Nullable [] asArrayStrict() {
+ if (this.size() == a.length) {
+ return a;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public byte @Nullable [] asUnboundedArray() {
+ return a;
+ }
+
+ @Override
+ public byte @Nullable [] asUnboundedArrayStrict() {
+ return a;
+ }
+
+ @Override
+ public boolean isMutable() {
+ return mutable;
+ }
+
+ @Override
+ public void freeze() {
+ mutable = false;
+ }
+
+ @Override
+ public Buf subList(int from, int to) {
+ if (from == 0 && to == size()) return this;
+ ensureIndex(from);
+ ensureIndex(to);
+ if (from > to) throw new IndexOutOfBoundsException("Start index (" + from + ") is greater than end index (" + to + ")");
+ return new SubList(from, to);
+ }
+
+ @Override
+ public Buf copy() {
+ var copied = ByteListBuf.wrap(this.a.clone());
+ copied.size = this.size;
+ return copied;
+ }
+
+ @Override
+ public SafeByteArrayInputStream binaryInputStream() {
+ return new SafeByteArrayInputStream(this.a, 0, this.size);
+ }
+
+ @Override
+ public void writeTo(SafeDataOutput dataOutput) {
+ dataOutput.write(this.a, 0, this.size);
+ }
+
+ @Override
+ public SafeByteArrayOutputStream binaryOutputStream(int from, int to) {
+ it.unimi.dsi.fastutil.Arrays.ensureFromTo(size, from, to);
+ return new SafeByteArrayOutputStream(a, from, to);
+ }
+
+ @Override
+ public boolean equals(int aStartIndex, Buf b, int bStartIndex, int length) {
+ return b.equals(bStartIndex, this.a, aStartIndex, length);
+ }
+
+ @Override
+ public boolean equals(int aStartIndex, byte[] b, int bStartIndex, int length) {
+ if (aStartIndex < 0) return false;
+ if (aStartIndex + length > this.size) {
+ return false;
+ }
+ return Arrays.equals(a, aStartIndex, aStartIndex + length, b, bStartIndex, bStartIndex + length);
+ }
+
+ @Override
+ public String toString(Charset charset) {
+ return new String(a, 0, size, charset);
+ }
+
+ private class SubList extends AbstractByteList.ByteRandomAccessSubList implements Buf {
+ @Serial
+ private static final long serialVersionUID = -3185226345314976296L;
+
+ private boolean subMutable = true;
+
+ protected SubList(int from, int to) {
+ super(ByteListBuf.this, from, to);
+ }
+
+ // Most of the inherited methods should be fine, but we can override a few of them for performance.
+ // Needed because we can't access the parent class' instance variables directly in a different
+ // instance of SubList.
+ private byte[] getParentArray() {
+ return a;
+ }
+
+ @Override
+ public @NotNull Buf subList(int from, int to) {
+ it.unimi.dsi.fastutil.Arrays.ensureFromTo(a.length, from, to);
+ if (from > to) throw new IllegalArgumentException("Start index (" + from + ") is greater than end index (" + to + ")");
+ // Sadly we have to rewrap this, because if there is a sublist of a sublist, and the
+ // subsublist adds, both sublists need to update their "to" value.
+ return new SubList(from, to);
+ }
+
+ @Override
+ public Buf copy() {
+ return Buf.wrap(Arrays.copyOfRange(a, from, to));
+ }
+
+ @Override
+ public SafeByteArrayInputStream binaryInputStream() {
+ return new SafeByteArrayInputStream(a, from, size());
+ }
+
+ @Override
+ public void writeTo(SafeDataOutput dataOutput) {
+ dataOutput.write(a, from, size());
+ }
+
+ @Override
+ public SafeByteArrayOutputStream binaryOutputStream(int from, int to) {
+ it.unimi.dsi.fastutil.Arrays.ensureFromTo(size(), from, to);
+ return new SafeByteArrayOutputStream(a, from + this.from, to + this.from);
+ }
+
+ @Override
+ public boolean equals(int aStartIndex, Buf b, int bStartIndex, int length) {
+ return b.equals(bStartIndex, a, aStartIndex + from, length);
+ }
+
+ @Override
+ public boolean equals(int aStartIndex, byte[] b, int bStartIndex, int length) {
+ var aFrom = from + aStartIndex;
+ var aTo = from + aStartIndex + length;
+ if (aFrom < from) return false;
+ if (aTo > to) return false;
+ return Arrays.equals(a, aFrom, aTo, b, bStartIndex, bStartIndex + length);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ ensureRestrictedIndex(i);
+ return a[i + from];
+ }
+
+ @Override
+ public byte @NotNull [] asArray() {
+ if (this.from == 0 && this.to == a.length) {
+ return a;
+ } else {
+ return toByteArray();
+ }
+ }
+
+ @Override
+ public byte @Nullable [] asArrayStrict() {
+ if (this.from == 0 && this.to == a.length) {
+ return a;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public byte @Nullable [] asUnboundedArray() {
+ if (from == 0) {
+ return a;
+ } else {
+ return toByteArray();
+ }
+ }
+
+ @Override
+ public byte @Nullable [] asUnboundedArrayStrict() {
+ if (from == 0) {
+ return a;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean isMutable() {
+ return mutable && subMutable;
+ }
+
+ @Override
+ public void freeze() {
+ subMutable = false;
+ }
+
+ private final class SubListIterator extends ByteIterators.AbstractIndexBasedListIterator {
+ // We are using pos == 0 to be 0 relative to SubList.from (meaning you need to do a[from + i] when
+ // accessing array).
+ SubListIterator(int index) {
+ super(0, index);
+ }
+
+ @Override
+ protected byte get(int i) {
+ return a[from + i];
+ }
+
+ @Override
+ protected void add(int i, byte k) {
+ ByteListBuf.SubList.this.add(i, k);
+ }
+
+ @Override
+ protected void set(int i, byte k) {
+ ByteListBuf.SubList.this.set(i, k);
+ }
+
+ @Override
+ protected void remove(int i) {
+ ByteListBuf.SubList.this.removeByte(i);
+ }
+
+ @Override
+ protected int getMaxPos() {
+ return to - from;
+ }
+
+ @Override
+ public byte nextByte() {
+ if (!hasNext()) throw new NoSuchElementException();
+ return a[from + (lastReturned = pos++)];
+ }
+
+ @Override
+ public byte previousByte() {
+ if (!hasPrevious()) throw new NoSuchElementException();
+ return a[from + (lastReturned = --pos)];
+ }
+
+ @Override
+ public void forEachRemaining(final ByteConsumer action) {
+ final int max = to - from;
+ while (pos < max) {
+ action.accept(a[from + (lastReturned = pos++)]);
+ }
+ }
+ }
+
+ @Override
+ public @NotNull ByteListIterator listIterator(int index) {
+ return new ByteListBuf.SubList.SubListIterator(index);
+ }
+
+ private final class SubListSpliterator extends ByteSpliterators.LateBindingSizeIndexBasedSpliterator {
+ // We are using pos == 0 to be 0 relative to real array 0
+ SubListSpliterator() {
+ super(from);
+ }
+
+ private SubListSpliterator(int pos, int maxPos) {
+ super(pos, maxPos);
+ }
+
+ @Override
+ protected int getMaxPosFromBackingStore() {
+ return to;
+ }
+
+ @Override
+ protected byte get(int i) {
+ return a[i];
+ }
+
+ @Override
+ protected ByteListBuf.SubList.SubListSpliterator makeForSplit(int pos, int maxPos) {
+ return new ByteListBuf.SubList.SubListSpliterator(pos, maxPos);
+ }
+
+ @Override
+ public boolean tryAdvance(final ByteConsumer action) {
+ if (pos >= getMaxPos()) return false;
+ action.accept(a[pos++]);
+ return true;
+ }
+
+ @Override
+ public void forEachRemaining(final ByteConsumer action) {
+ final int max = getMaxPos();
+ while (pos < max) {
+ action.accept(a[pos++]);
+ }
+ }
+ }
+
+ @Override
+ public ByteSpliterator spliterator() {
+ return new ByteListBuf.SubList.SubListSpliterator();
+ }
+
+ boolean contentsEquals(byte[] otherA, int otherAFrom, int otherATo) {
+ if (a == otherA && from == otherAFrom && to == otherATo) return true;
+ return Arrays.equals(a, from, to, otherA, otherAFrom, otherATo);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if (o == null) return false;
+ if (!(o instanceof java.util.List)) return false;
+ if (o instanceof ByteListBuf other) {
+ return contentsEquals(other.a, 0, other.size());
+ }
+ if (o instanceof SubList other) {
+ return contentsEquals(other.getParentArray(), other.from, other.to);
+ }
+ return super.equals(o);
+ }
+
+ int contentsCompareTo(byte[] otherA, int otherAFrom, int otherATo) {
+ if (a == otherA && from == otherAFrom && to == otherATo) return 0;
+ return Arrays.compareUnsigned(a, from, to, otherA, otherAFrom, otherATo);
+ }
+
+ @Override
+ public int compareTo(final java.util.@NotNull List<? extends Byte> l) {
+ if (l instanceof ByteListBuf other) {
+ return contentsCompareTo(other.a, 0, other.size());
+ }
+ if (l instanceof ByteListBuf.SubList other) {
+ return contentsCompareTo(other.getParentArray(), other.from, other.to);
+ }
+ return super.compareTo(l);
+ }
+
+ @Override
+ public String toString(Charset charset) {
+ return new String(a, from, to, charset);
+ }
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/client/Backuppable.java b/src/main/java/it/cavallium/dbengine/client/Backuppable.java
index a83a6c9..eed25fc 100644
--- a/src/main/java/it/cavallium/dbengine/client/Backuppable.java
+++ b/src/main/java/it/cavallium/dbengine/client/Backuppable.java
@@ -1,8 +1,7 @@
package it.cavallium.dbengine.client;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
public abstract class Backuppable implements IBackuppable {
@@ -13,29 +12,29 @@ public abstract class Backuppable implements IBackuppable {
private final AtomicInteger state = new AtomicInteger();
@Override
- public final Mono<Void> pauseForBackup() {
- return Mono.defer(() -> {
- if (state.compareAndSet(State.RUNNING.ordinal(), State.PAUSING.ordinal())) {
- return onPauseForBackup().doFinally(type -> state.compareAndSet(State.PAUSING.ordinal(),
- type == SignalType.ON_ERROR ? State.RUNNING.ordinal() : State.PAUSED.ordinal()
- ));
- } else {
- return Mono.empty();
+ public final void pauseForBackup() {
+ if (state.compareAndSet(State.RUNNING.ordinal(), State.PAUSING.ordinal())) {
+ try {
+ onPauseForBackup();
+ state.compareAndSet(State.PAUSING.ordinal(), State.PAUSED.ordinal());
+ } catch (Throwable ex) {
+ state.compareAndSet(State.PAUSING.ordinal(), State.RUNNING.ordinal());
+ throw ex;
}
- });
+ }
}
@Override
- public final Mono<Void> resumeAfterBackup() {
- return Mono.defer(() -> {
- if (state.compareAndSet(State.PAUSED.ordinal(), State.RESUMING.ordinal())) {
- return onResumeAfterBackup().doFinally(type -> state.compareAndSet(State.RESUMING.ordinal(),
- type == SignalType.ON_ERROR ? State.PAUSED.ordinal() : State.RUNNING.ordinal()
- ));
- } else {
- return Mono.empty();
+ public final void resumeAfterBackup() {
+ if (state.compareAndSet(State.PAUSED.ordinal(), State.RESUMING.ordinal())) {
+ try {
+ onResumeAfterBackup();
+ state.compareAndSet(State.RESUMING.ordinal(), State.RUNNING.ordinal());
+ } catch (Throwable ex) {
+ state.compareAndSet(State.RESUMING.ordinal(), State.PAUSED.ordinal());
+ throw ex;
}
- });
+ }
}
@Override
@@ -47,9 +46,9 @@ public abstract class Backuppable implements IBackuppable {
return State.values()[state.get()];
}
- protected abstract Mono<Void> onPauseForBackup();
+ protected abstract void onPauseForBackup();
- protected abstract Mono<Void> onResumeAfterBackup();
+ protected abstract void onResumeAfterBackup();
public final void setStopped() {
state.set(State.STOPPED.ordinal());
diff --git a/src/main/java/it/cavallium/dbengine/client/BadBlock.java b/src/main/java/it/cavallium/dbengine/client/BadBlock.java
index 7363a7a..aee9a2c 100644
--- a/src/main/java/it/cavallium/dbengine/client/BadBlock.java
+++ b/src/main/java/it/cavallium/dbengine/client/BadBlock.java
@@ -1,8 +1,8 @@
package it.cavallium.dbengine.client;
+import it.cavallium.dbengine.buffers.Buf;
import it.cavallium.dbengine.rpc.current.data.Column;
-import it.unimi.dsi.fastutil.bytes.ByteList;
import org.jetbrains.annotations.Nullable;
-public record BadBlock(String databaseName, @Nullable Column column, @Nullable ByteList rawKey,
+public record BadBlock(String databaseName, @Nullable Column column, @Nullable Buf rawKey,
@Nullable Throwable ex) {}
diff --git a/src/main/java/it/cavallium/dbengine/client/CastMapper.java b/src/main/java/it/cavallium/dbengine/client/CastMapper.java
index 8f09a10..12f6aba 100644
--- a/src/main/java/it/cavallium/dbengine/client/CastMapper.java
+++ b/src/main/java/it/cavallium/dbengine/client/CastMapper.java
@@ -1,14 +1,14 @@
package it.cavallium.dbengine.client;
-import it.cavallium.dbengine.client.Mapper;
-
public class CastMapper<T, U> implements Mapper<T, U> {
+ @SuppressWarnings("unchecked")
@Override
public U map(T key) {
return (U) key;
}
+ @SuppressWarnings("unchecked")
@Override
public T unmap(U key) {
return (T) key;
diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
index 5540a8f..8d51878 100644
--- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
+++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java
@@ -1,36 +1,32 @@
package it.cavallium.dbengine.client;
import io.micrometer.core.instrument.MeterRegistry;
-import io.netty5.buffer.BufferAllocator;
import it.cavallium.dbengine.database.DatabaseOperations;
import it.cavallium.dbengine.database.DatabaseProperties;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import java.util.stream.Stream;
public interface CompositeDatabase extends DatabaseProperties, DatabaseOperations {
- Mono<Void> preClose();
+ void preClose();
- Mono<Void> close();
+ void close();
/**
* Can return SnapshotException
*/
- Mono<CompositeSnapshot> takeSnapshot();
+ CompositeSnapshot takeSnapshot();
/**
* Can return SnapshotException
*/
- Mono releaseSnapshot(CompositeSnapshot snapshot);
-
- BufferAllocator getAllocator();
+ void releaseSnapshot(CompositeSnapshot snapshot);
MeterRegistry getMeterRegistry();
/**
* Find corrupted items
*/
- Flux<BadBlock> badBlocks();
+ Stream<BadBlock> badBlocks();
- Mono<Void> verifyChecksum();
+ void verifyChecksum();
}
diff --git a/src/main/java/it/cavallium/dbengine/client/CountedStream.java b/src/main/java/it/cavallium/dbengine/client/CountedStream.java
deleted file mode 100644
index 7035bb6..0000000
--- a/src/main/java/it/cavallium/dbengine/client/CountedStream.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package it.cavallium.dbengine.client;
-
-import java.util.Collection;
-import java.util.List;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-public class CountedStream {
-
- private final Flux stream;
- private final long count;
-
- public CountedStream(Flux stream, long count) {
- this.stream = stream;
- this.count = count;
- }
-
- public Flux getStream() {
- return stream;
- }
-
- public long getCount() {
- return count;
- }
-
- @SafeVarargs
- public static CountedStream merge(CountedStream... stream) {
- return merge(List.of(stream));
- }
-
- public static CountedStream merge(Collection> stream) {
- return stream
- .stream()
- .reduce((a, b) -> new CountedStream<>(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount()))
- .orElseGet(() -> new CountedStream<>(Flux.empty(), 0));
- }
-
- public static Mono> merge(Flux> stream) {
- return stream
- .reduce((a, b) -> new CountedStream<>(Flux.merge(a.getStream(), b.getStream()), a.getCount() + b.getCount()))
- .switchIfEmpty(Mono.fromSupplier(() -> new CountedStream<>(Flux.empty(), 0)));
- }
-
- public Mono> collectList() {
- return stream.collectList();
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java
index 196feb9..b5007c5 100644
--- a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java
+++ b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java
@@ -58,7 +58,6 @@ public class DefaultDatabaseOptions {
false,
false,
true,
- true,
Nullableint.empty(),
Nullablelong.empty(),
Nullablelong.empty(),
diff --git a/src/main/java/it/cavallium/dbengine/client/HitEntry.java b/src/main/java/it/cavallium/dbengine/client/HitEntry.java
index 701de4a..b03e1c4 100644
--- a/src/main/java/it/cavallium/dbengine/client/HitEntry.java
+++ b/src/main/java/it/cavallium/dbengine/client/HitEntry.java
@@ -1,9 +1,9 @@
package it.cavallium.dbengine.client;
import org.jetbrains.annotations.NotNull;
-import reactor.core.publisher.Mono;
+import org.jetbrains.annotations.Nullable;
-public record HitEntry<T, U>(T key, U value, float score)
+public record HitEntry<T, U>(T key, @Nullable U value, float score)
 implements Comparable<HitEntry<T, U>> {
@Override
diff --git a/src/main/java/it/cavallium/dbengine/client/HitKey.java b/src/main/java/it/cavallium/dbengine/client/HitKey.java
index 7c1cd8a..122f702 100644
--- a/src/main/java/it/cavallium/dbengine/client/HitKey.java
+++ b/src/main/java/it/cavallium/dbengine/client/HitKey.java
@@ -1,16 +1,13 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
-import java.util.Comparator;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import reactor.core.publisher.Mono;
public record HitKey(T key, float score) implements Comparable> {
- public Mono> withValue(Function> valueGetter) {
- return valueGetter.apply(key).map(value -> new HitEntry<>(key, value, score));
+ public HitEntry withValue(Function valueGetter) {
+ return new HitEntry<>(key, valueGetter.apply(key), score);
}
public HitEntry withNullValue() {
diff --git a/src/main/java/it/cavallium/dbengine/client/Hits.java b/src/main/java/it/cavallium/dbengine/client/Hits.java
index 684a403..1ecc5e6 100644
--- a/src/main/java/it/cavallium/dbengine/client/Hits.java
+++ b/src/main/java/it/cavallium/dbengine/client/Hits.java
@@ -4,29 +4,25 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.collections.ValueGetter;
-import it.cavallium.dbengine.database.collections.ValueTransformer;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.utils.SimpleResource;
-import java.util.Map.Entry;
-import java.util.Optional;
import java.util.function.Function;
+import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
public class Hits extends SimpleResource implements DiscardingCloseable {
private static final Logger LOG = LogManager.getLogger(Hits.class);
- private static final Hits> EMPTY_HITS = new Hits<>(Flux.empty(), TotalHitsCount.of(0, true), false);
- private final Flux results;
+ private static final Hits> EMPTY_HITS = new Hits<>(Stream.empty(), TotalHitsCount.of(0, true), false);
+ private final Stream results;
private final TotalHitsCount totalHitsCount;
- public Hits(Flux results, TotalHitsCount totalHitsCount) {
+ public Hits(Stream results, TotalHitsCount totalHitsCount) {
this(results, totalHitsCount, true);
}
- private Hits(Flux results, TotalHitsCount totalHitsCount, boolean canClose) {
+ private Hits(Stream results, TotalHitsCount totalHitsCount, boolean canClose) {
super(canClose);
this.results = results;
this.totalHitsCount = totalHitsCount;
@@ -37,44 +33,16 @@ public class Hits extends SimpleResource implements DiscardingCloseable {
return (Hits) EMPTY_HITS;
}
- public static Function>, Hits>> generateMapper(
+ public static Function>, Hits>> generateMapper(
ValueGetter valueGetter) {
return result -> {
var hitsToTransform = result.results()
- .map(hit -> new LazyHitEntry<>(Mono.just(hit.key()), valueGetter.get(hit.key()), hit.score()));
+ .map(hit -> new HitEntry<>(hit.key(), valueGetter.get(hit.key()), hit.score()));
return Hits.withResource(hitsToTransform, result.totalHitsCount(), result);
};
}
- public static Function>, Hits>> generateMapper(
- ValueTransformer valueTransformer) {
- return result -> {
- try {
- var sharedHitsFlux = result.results().publish().refCount(3);
- var scoresFlux = sharedHitsFlux.map(HitKey::score);
- var keysFlux = sharedHitsFlux.map(HitKey::key);
-
- var valuesFlux = valueTransformer.transform(keysFlux);
-
- var transformedFlux = Flux.zip((Object[] data) -> {
- //noinspection unchecked
- var keyMono = Mono.just((T) data[0]);
- //noinspection unchecked
- var val = (Entry>) data[1];
- var valMono = Mono.justOrEmpty(val.getValue());
- var score = (Float) data[2];
- return new LazyHitEntry<>(keyMono, valMono, score);
- }, keysFlux, valuesFlux, scoresFlux);
-
- return Hits.withResource(transformedFlux, result.totalHitsCount(), result);
- } catch (Throwable t) {
- result.close();
- throw t;
- }
- };
- }
-
- public static Hits withResource(Flux hits, TotalHitsCount count, SafeCloseable resource) {
+ public static Hits withResource(Stream hits, TotalHitsCount count, SafeCloseable resource) {
if (resource instanceof LuceneCloseable luceneCloseable) {
return new LuceneHits<>(hits, count, luceneCloseable);
} else {
@@ -82,7 +50,7 @@ public class Hits extends SimpleResource implements DiscardingCloseable {
}
}
- public Flux results() {
+ public Stream results() {
ensureOpen();
return results;
}
@@ -105,7 +73,7 @@ public class Hits extends SimpleResource implements DiscardingCloseable {
private final LuceneCloseable resource;
- public LuceneHits(Flux hits, TotalHitsCount count, LuceneCloseable resource) {
+ public LuceneHits(Stream hits, TotalHitsCount count, LuceneCloseable resource) {
super(hits, count);
this.resource = resource;
}
@@ -125,7 +93,7 @@ public class Hits extends SimpleResource implements DiscardingCloseable {
private final SafeCloseable resource;
- public CloseableHits(Flux hits, TotalHitsCount count, SafeCloseable resource) {
+ public CloseableHits(Stream hits, TotalHitsCount count, SafeCloseable resource) {
super(hits, count);
this.resource = resource;
}
diff --git a/src/main/java/it/cavallium/dbengine/client/IBackuppable.java b/src/main/java/it/cavallium/dbengine/client/IBackuppable.java
index 2427f07..fa6ed80 100644
--- a/src/main/java/it/cavallium/dbengine/client/IBackuppable.java
+++ b/src/main/java/it/cavallium/dbengine/client/IBackuppable.java
@@ -1,12 +1,10 @@
package it.cavallium.dbengine.client;
-import reactor.core.publisher.Mono;
-
public interface IBackuppable {
- Mono pauseForBackup();
+ void pauseForBackup();
- Mono resumeAfterBackup();
+ void resumeAfterBackup();
boolean isPaused();
}
diff --git a/src/main/java/it/cavallium/dbengine/client/IndexAction.java b/src/main/java/it/cavallium/dbengine/client/IndexAction.java
deleted file mode 100644
index ec5d680..0000000
--- a/src/main/java/it/cavallium/dbengine/client/IndexAction.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package it.cavallium.dbengine.client;
-
-import it.cavallium.dbengine.client.IndexAction.Add;
-import it.cavallium.dbengine.client.IndexAction.AddMulti;
-import it.cavallium.dbengine.client.IndexAction.Update;
-import it.cavallium.dbengine.client.IndexAction.UpdateMulti;
-import it.cavallium.dbengine.client.IndexAction.Delete;
-import it.cavallium.dbengine.client.IndexAction.DeleteAll;
-import it.cavallium.dbengine.client.IndexAction.TakeSnapshot;
-import it.cavallium.dbengine.client.IndexAction.ReleaseSnapshot;
-import it.cavallium.dbengine.client.IndexAction.Flush;
-import it.cavallium.dbengine.client.IndexAction.Refresh;
-import it.cavallium.dbengine.client.IndexAction.Close;
-import it.cavallium.dbengine.database.LLUpdateDocument;
-import it.cavallium.dbengine.database.LLSnapshot;
-import it.cavallium.dbengine.database.LLTerm;
-import java.util.Map;
-import java.util.Map.Entry;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.MonoSink;
-
-sealed interface IndexAction permits Add, AddMulti, Update, UpdateMulti, Delete, DeleteAll, TakeSnapshot,
- ReleaseSnapshot, Flush, Refresh, Close {
-
- IndexActionType getType();
-
- final record Add(LLTerm key, LLUpdateDocument doc, MonoSink addedFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.ADD;
- }
- }
-
- final record AddMulti(Flux> docsFlux, MonoSink addedMultiFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.ADD_MULTI;
- }
- }
-
- final record Update(LLTerm key, LLUpdateDocument doc, MonoSink updatedFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.UPDATE;
- }
- }
-
- final record UpdateMulti(Map docs, MonoSink updatedMultiFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.UPDATE_MULTI;
- }
- }
-
- final record Delete(LLTerm key, MonoSink deletedFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.DELETE;
- }
- }
-
- final record DeleteAll(MonoSink deletedAllFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.DELETE_ALL;
- }
- }
-
- final record TakeSnapshot(MonoSink snapshotFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.TAKE_SNAPSHOT;
- }
- }
-
- final record ReleaseSnapshot(LLSnapshot snapshot, MonoSink releasedFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.RELEASE_SNAPSHOT;
- }
- }
-
- final record Flush(MonoSink flushFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.FLUSH;
- }
- }
-
- final record Refresh(boolean force, MonoSink refreshFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.REFRESH;
- }
- }
-
- final record Close(MonoSink closeFuture) implements IndexAction {
-
- @Override
- public IndexActionType getType() {
- return IndexActionType.CLOSE;
- }
- }
-
- enum IndexActionType {
- ADD,
- ADD_MULTI,
- UPDATE,
- UPDATE_MULTI,
- DELETE,
- DELETE_ALL,
- TAKE_SNAPSHOT,
- RELEASE_SNAPSHOT,
- FLUSH,
- REFRESH,
- CLOSE
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/client/Indicizer.java b/src/main/java/it/cavallium/dbengine/client/Indicizer.java
index 2a97c92..9d6118e 100644
--- a/src/main/java/it/cavallium/dbengine/client/Indicizer.java
+++ b/src/main/java/it/cavallium/dbengine/client/Indicizer.java
@@ -4,40 +4,33 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import it.cavallium.dbengine.database.LLIndexRequest;
import it.cavallium.dbengine.database.LLSoftUpdateDocument;
-import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLTerm;
+import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLUpdateFields;
-import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers;
import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities;
import java.util.Map;
-import java.util.Set;
import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.util.BytesRef;
import org.jetbrains.annotations.NotNull;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
public abstract class Indicizer {
/**
* Transform a value to an IndexRequest.
*/
- public abstract @NotNull Mono extends LLIndexRequest> toIndexRequest(@NotNull T key, @NotNull U value);
+ public abstract @NotNull LLIndexRequest toIndexRequest(@NotNull T key, @NotNull U value);
- public final @NotNull Mono toDocument(@NotNull T key, @NotNull U value) {
- return toIndexRequest(key, value).map(req -> {
- if (req instanceof LLUpdateFields updateFields) {
- return new LLUpdateDocument(updateFields.items());
- } else if (req instanceof LLUpdateDocument updateDocument) {
- return updateDocument;
- } else if (req instanceof LLSoftUpdateDocument softUpdateDocument) {
- return new LLUpdateDocument(softUpdateDocument.items());
- } else {
- throw new UnsupportedOperationException("Unexpected request type: " + req);
- }
- });
+ public final @NotNull LLUpdateDocument toDocument(@NotNull T key, @NotNull U value) {
+ var req = toIndexRequest(key, value);
+ if (req instanceof LLUpdateFields updateFields) {
+ return new LLUpdateDocument(updateFields.items());
+ } else if (req instanceof LLUpdateDocument updateDocument) {
+ return updateDocument;
+ } else if (req instanceof LLSoftUpdateDocument softUpdateDocument) {
+ return new LLUpdateDocument(softUpdateDocument.items());
+ } else {
+ throw new UnsupportedOperationException("Unexpected request type: " + req);
+ }
}
public abstract @NotNull LLTerm toIndex(@NotNull T key);
diff --git a/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java b/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java
index dca3903..5b6a404 100644
--- a/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java
+++ b/src/main/java/it/cavallium/dbengine/client/IndicizerAnalyzers.java
@@ -1,7 +1,6 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
-import it.cavallium.dbengine.rpc.current.serializers.IndicizerAnalyzersSerializer;
import java.util.Map;
public class IndicizerAnalyzers {
diff --git a/src/main/java/it/cavallium/dbengine/client/IntOpenHashSetJsonAdapter.java b/src/main/java/it/cavallium/dbengine/client/IntOpenHashSetJsonAdapter.java
index 8ac66f8..13c9db2 100644
--- a/src/main/java/it/cavallium/dbengine/client/IntOpenHashSetJsonAdapter.java
+++ b/src/main/java/it/cavallium/dbengine/client/IntOpenHashSetJsonAdapter.java
@@ -2,7 +2,6 @@ package it.cavallium.dbengine.client;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
-import it.cavallium.data.generator.nativedata.Int52;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
diff --git a/src/main/java/it/cavallium/dbengine/client/LazyHitEntry.java b/src/main/java/it/cavallium/dbengine/client/LazyHitEntry.java
deleted file mode 100644
index 03cf552..0000000
--- a/src/main/java/it/cavallium/dbengine/client/LazyHitEntry.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package it.cavallium.dbengine.client;
-
-import org.jetbrains.annotations.NotNull;
-import reactor.core.publisher.Mono;
-
-public record LazyHitEntry(Mono key, Mono value, float score) {
-
- public Mono> resolve() {
- return Mono.zip(key, value, (k, v) -> new HitEntry<>(k, v, score));
- }
-
- public Mono> resolveKey() {
- return key.map(k -> new HitKey<>(k, score));
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/client/LazyHitKey.java b/src/main/java/it/cavallium/dbengine/client/LazyHitKey.java
deleted file mode 100644
index c6c101f..0000000
--- a/src/main/java/it/cavallium/dbengine/client/LazyHitKey.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package it.cavallium.dbengine.client;
-
-import java.util.function.Function;
-import reactor.core.publisher.Mono;
-
-public record LazyHitKey(Mono key, float score) {
-
- public LazyHitEntry withValue(Function> valueGetter) {
- return new LazyHitEntry<>(key, key.flatMap(valueGetter), score);
- }
-
- public Mono> resolve() {
- return key.map(k -> new HitKey<>(k, score));
- }
-
- public Mono> resolveWithValue(Function> valueGetter) {
- return resolve().flatMap(key -> key.withValue(valueGetter));
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
index 46af19f..d9d46cb 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndex.java
@@ -1,78 +1,71 @@
package it.cavallium.dbengine.client;
-import io.netty5.util.Send;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLSnapshottable;
-import it.cavallium.dbengine.database.collections.ValueGetter;
-import it.cavallium.dbengine.database.collections.ValueTransformer;
import it.cavallium.dbengine.lucene.collector.Buckets;
import it.cavallium.dbengine.lucene.searcher.BucketParams;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
public interface LuceneIndex extends LLSnapshottable {
- Mono addDocument(T key, U value);
+ void addDocument(T key, U value);
- Mono addDocuments(boolean atomic, Flux> entries);
+ long addDocuments(boolean atomic, Stream> entries);
- Mono deleteDocument(T key);
+ void deleteDocument(T key);
- Mono updateDocument(T key, @NotNull U value);
+ void updateDocument(T key, @NotNull U value);
- Mono updateDocuments(Flux> entries);
+ long updateDocuments(Stream> entries);
- default Mono updateOrDeleteDocument(T key, @Nullable U value) {
+ default void updateOrDeleteDocument(T key, @Nullable U value) {
if (value == null) {
- return deleteDocument(key);
+ deleteDocument(key);
} else {
- return updateDocument(key, value);
+ updateDocument(key, value);
}
}
- default Mono updateOrDeleteDocumentIfModified(T key, @NotNull Delta delta) {
- return updateOrDeleteDocumentIfModified(key, delta.current(), delta.isModified());
+ default void updateOrDeleteDocumentIfModified(T key, @NotNull Delta delta) {
+ updateOrDeleteDocumentIfModified(key, delta.current(), delta.isModified());
}
- default Mono updateOrDeleteDocumentIfModified(T key, @Nullable U currentValue, boolean modified) {
+ default void updateOrDeleteDocumentIfModified(T key, @Nullable U currentValue, boolean modified) {
if (modified) {
- return updateOrDeleteDocument(key, currentValue);
- } else {
- return Mono.empty();
+ updateOrDeleteDocument(key, currentValue);
}
}
- Mono deleteAll();
+ void deleteAll();
- Mono>> moreLikeThis(ClientQueryParams queryParams, T key,
+ Hits> moreLikeThis(ClientQueryParams queryParams, T key,
U mltDocumentValue);
- Mono>> search(ClientQueryParams queryParams);
+ Hits> search(ClientQueryParams queryParams);
- Mono computeBuckets(@Nullable CompositeSnapshot snapshot,
+ Buckets computeBuckets(@Nullable CompositeSnapshot snapshot,
@NotNull List queries,
@Nullable Query normalizationQuery,
BucketParams bucketParams);
- Mono count(@Nullable CompositeSnapshot snapshot, Query query);
+ TotalHitsCount count(@Nullable CompositeSnapshot snapshot, Query query);
boolean isLowMemoryMode();
void close();
- Mono flush();
+ void flush();
- Mono waitForMerges();
+ void waitForMerges();
- Mono waitForLastMerges();
+ void waitForLastMerges();
- Mono refresh(boolean force);
+ void refresh(boolean force);
}
diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
index f68ebdc..d32c10d 100644
--- a/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
+++ b/src/main/java/it/cavallium/dbengine/client/LuceneIndexImpl.java
@@ -5,7 +5,6 @@ import it.cavallium.dbengine.client.Hits.LuceneHits;
import it.cavallium.dbengine.client.query.ClientQueryParams;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
-import it.cavallium.dbengine.database.DiscardingCloseable;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLSearchResultShard;
@@ -13,8 +12,6 @@ import it.cavallium.dbengine.database.LLSearchResultShard.LuceneLLSearchResultSh
import it.cavallium.dbengine.database.LLSearchResultShard.ResourcesLLSearchResultShard;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLTerm;
-import it.cavallium.dbengine.database.LLUpdateDocument;
-import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
@@ -26,14 +23,12 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.logging.Level;
+import java.util.function.Function;
+import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
public class LuceneIndexImpl implements LuceneIndex {
@@ -56,96 +51,87 @@ public class LuceneIndexImpl implements LuceneIndex {
}
@Override
- public Mono addDocument(T key, U value) {
- return indicizer
- .toDocument(key, value)
- .flatMap(doc -> luceneIndex.addDocument(indicizer.toIndex(key), doc));
+ public void addDocument(T key, U value) {
+ luceneIndex.addDocument(indicizer.toIndex(key), indicizer.toDocument(key, value));
}
@Override
- public Mono addDocuments(boolean atomic, Flux> entries) {
- return luceneIndex.addDocuments(atomic, entries.flatMap(entry -> indicizer
- .toDocument(entry.getKey(), entry.getValue())
- .map(doc -> Map.entry(indicizer.toIndex(entry.getKey()), doc))));
+ public long addDocuments(boolean atomic, Stream> entries) {
+ return luceneIndex.addDocuments(atomic, entries.map(entry ->
+ Map.entry(indicizer.toIndex(entry.getKey()), indicizer.toDocument(entry.getKey(), entry.getValue()))));
}
@Override
- public Mono deleteDocument(T key) {
+ public void deleteDocument(T key) {
LLTerm id = indicizer.toIndex(key);
- return luceneIndex.deleteDocument(id);
+ luceneIndex.deleteDocument(id);
}
@Override
- public Mono updateDocument(T key, @NotNull U value) {
- return indicizer
- .toIndexRequest(key, value)
- .flatMap(doc -> luceneIndex.update(indicizer.toIndex(key), doc));
+ public void updateDocument(T key, @NotNull U value) {
+ luceneIndex.update(indicizer.toIndex(key), indicizer.toIndexRequest(key, value));
}
@Override
- public Mono updateDocuments(Flux> entries) {
- Flux> mappedEntries = entries
- .flatMap(entry -> Mono
- .zip(Mono.just(indicizer.toIndex(entry.getKey())),
- indicizer.toDocument(entry.getKey(), entry.getValue()).single(),
- Map::entry
- )
- .single()
- )
- .log("impl-update-documents", Level.FINEST, false, SignalType.ON_NEXT, SignalType.ON_COMPLETE);
- return luceneIndex.updateDocuments(mappedEntries);
+ public long updateDocuments(Stream> entries) {
+ return luceneIndex.updateDocuments(entries.map(entry ->
+ Map.entry(indicizer.toIndex(entry.getKey()), indicizer.toDocument(entry.getKey(), entry.getValue()))));
}
@Override
- public Mono deleteAll() {
- return luceneIndex.deleteAll();
+ public void deleteAll() {
+ luceneIndex.deleteAll();
}
@Override
- public Mono>> moreLikeThis(ClientQueryParams queryParams,
+ public Hits> moreLikeThis(ClientQueryParams queryParams,
T key,
U mltDocumentValue) {
var mltDocumentFields
= indicizer.getMoreLikeThisDocumentFields(key, mltDocumentValue);
- return luceneIndex
+ var results = luceneIndex
.moreLikeThis(resolveSnapshot(queryParams.snapshot()),
queryParams.toQueryParams(),
indicizer.getKeyFieldName(),
mltDocumentFields
)
- .collectList()
- .mapNotNull(shards -> mergeResults(queryParams, shards))
- .map(llSearchResult -> mapResults(llSearchResult))
- .defaultIfEmpty(Hits.empty())
- .doOnDiscard(DiscardingCloseable.class, LLUtils::onDiscard);
+ .toList();
+ LLSearchResultShard mergedResults = mergeResults(queryParams, results);
+ if (mergedResults != null) {
+ return mapResults(mergedResults);
+ } else {
+ return Hits.empty();
+ }
}
@Override
- public Mono>> search(ClientQueryParams queryParams) {
- return luceneIndex
+ public Hits> search(ClientQueryParams queryParams) {
+ var results = luceneIndex
.search(resolveSnapshot(queryParams.snapshot()),
queryParams.toQueryParams(),
indicizer.getKeyFieldName()
)
- .collectList()
- .mapNotNull(shards -> mergeResults(queryParams, shards))
- .map(llSearchResult -> mapResults(llSearchResult))
- .defaultIfEmpty(Hits.empty())
- .doOnDiscard(DiscardingCloseable.class, LLUtils::onDiscard);
+ .toList();
+
+ var mergedResults = mergeResults(queryParams, results);
+ if (mergedResults != null) {
+ return mapResults(mergedResults);
+ } else {
+ return Hits.empty();
+ }
}
@Override
- public Mono computeBuckets(@Nullable CompositeSnapshot snapshot,
+ public Buckets computeBuckets(@Nullable CompositeSnapshot snapshot,
@NotNull List query,
@Nullable Query normalizationQuery,
BucketParams bucketParams) {
- return luceneIndex.computeBuckets(resolveSnapshot(snapshot), query,
- normalizationQuery, bucketParams).single();
+ return luceneIndex.computeBuckets(resolveSnapshot(snapshot), query, normalizationQuery, bucketParams);
}
private Hits> mapResults(LLSearchResultShard llSearchResult) {
- Flux> scoresWithKeysFlux = llSearchResult.results()
+ Stream> scoresWithKeysFlux = llSearchResult.results()
.map(hit -> new HitKey<>(indicizer.getKey(hit.key()), hit.score()));
if (llSearchResult instanceof LuceneCloseable luceneCloseable) {
@@ -156,10 +142,8 @@ public class LuceneIndexImpl implements LuceneIndex {
}
@Override
- public Mono count(@Nullable CompositeSnapshot snapshot, Query query) {
- return luceneIndex
- .count(resolveSnapshot(snapshot), query, MAX_COUNT_TIME)
- .doOnDiscard(DiscardingCloseable.class, LLUtils::onDiscard);
+ public TotalHitsCount count(@Nullable CompositeSnapshot snapshot, Query query) {
+ return luceneIndex.count(resolveSnapshot(snapshot), query, MAX_COUNT_TIME);
}
@Override
@@ -176,36 +160,36 @@ public class LuceneIndexImpl implements LuceneIndex {
* Flush writes to disk
*/
@Override
- public Mono flush() {
- return luceneIndex.flush();
+ public void flush() {
+ luceneIndex.flush();
}
@Override
- public Mono waitForMerges() {
- return luceneIndex.waitForMerges();
+ public void waitForMerges() {
+ luceneIndex.waitForMerges();
}
@Override
- public Mono waitForLastMerges() {
- return luceneIndex.waitForLastMerges();
+ public void waitForLastMerges() {
+ luceneIndex.waitForLastMerges();
}
/**
* Refresh index searcher
*/
@Override
- public Mono refresh(boolean force) {
- return luceneIndex.refresh(force);
+ public void refresh(boolean force) {
+ luceneIndex.refresh(force);
}
@Override
- public Mono takeSnapshot() {
+ public LLSnapshot takeSnapshot() {
return luceneIndex.takeSnapshot();
}
@Override
- public Mono releaseSnapshot(LLSnapshot snapshot) {
- return luceneIndex.releaseSnapshot(snapshot);
+ public void releaseSnapshot(LLSnapshot snapshot) {
+ luceneIndex.releaseSnapshot(snapshot);
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -217,7 +201,7 @@ public class LuceneIndexImpl implements LuceneIndex {
return shards.get(0);
}
TotalHitsCount count = null;
- ObjectArrayList> results = new ObjectArrayList<>(shards.size());
+ ObjectArrayList> results = new ObjectArrayList<>(shards.size());
ObjectArrayList resources = new ObjectArrayList(shards.size());
boolean luceneResources = false;
for (LLSearchResultShard shard : shards) {
@@ -230,17 +214,17 @@ public class LuceneIndexImpl implements LuceneIndex {
count = LuceneUtils.sum(count, shard.totalHitsCount());
}
var maxLimit = queryParams.offset() + queryParams.limit();
- results.add(shard.results().take(maxLimit, true));
+ results.add(shard.results().limit(maxLimit));
resources.add(shard);
}
Objects.requireNonNull(count);
- Flux resultsFlux;
+ Stream resultsFlux;
if (results.size() == 0) {
- resultsFlux = Flux.empty();
+ resultsFlux = Stream.empty();
} else if (results.size() == 1) {
resultsFlux = results.get(0);
} else {
- resultsFlux = Flux.merge(results);
+ resultsFlux = results.parallelStream().flatMap(Function.identity());
}
if (luceneResources) {
return new LuceneLLSearchResultShard(resultsFlux, count, (List) resources);
diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
index 91d8f74..021be3b 100644
--- a/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
+++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializer.java
@@ -1,11 +1,11 @@
package it.cavallium.dbengine.client;
-import io.netty5.buffer.Buffer;
-import io.netty5.util.Send;
+import it.cavallium.dbengine.buffers.Buf;
+import it.cavallium.dbengine.buffers.BufDataInput;
+import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
public class MappedSerializer implements Serializer {
@@ -19,13 +19,13 @@ public class MappedSerializer implements Serializer {
}
@Override
- public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
- return keyMapper.map(serializer.deserialize(serialized));
+ public @NotNull B deserialize(@NotNull BufDataInput in) throws SerializationException {
+ return keyMapper.map(serializer.deserialize(in));
}
@Override
- public void serialize(@NotNull B deserialized, Buffer output) throws SerializationException {
- serializer.serialize(keyMapper.unmap(deserialized), output);
+ public void serialize(@NotNull B deserialized, BufDataOutput out) throws SerializationException {
+ serializer.serialize(keyMapper.unmap(deserialized), out);
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
index ebe9090..36fc752 100644
--- a/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
+++ b/src/main/java/it/cavallium/dbengine/client/MappedSerializerFixedLength.java
@@ -1,11 +1,11 @@
package it.cavallium.dbengine.client;
-import io.netty5.buffer.Buffer;
-import io.netty5.util.Send;
+import it.cavallium.dbengine.buffers.Buf;
+import it.cavallium.dbengine.buffers.BufDataInput;
+import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
public class MappedSerializerFixedLength implements SerializerFixedBinaryLength {
@@ -19,13 +19,13 @@ public class MappedSerializerFixedLength implements SerializerFixedBinaryL
}
@Override
- public @NotNull B deserialize(@NotNull Buffer serialized) throws SerializationException {
- return keyMapper.map(fixedLengthSerializer.deserialize(serialized));
+ public @NotNull B deserialize(@NotNull BufDataInput in) throws SerializationException {
+ return keyMapper.map(fixedLengthSerializer.deserialize(in));
}
@Override
- public void serialize(@NotNull B deserialized, Buffer output) throws SerializationException {
- fixedLengthSerializer.serialize(keyMapper.unmap(deserialized), output);
+ public void serialize(@NotNull B deserialized, BufDataOutput out) throws SerializationException {
+ fixedLengthSerializer.serialize(keyMapper.unmap(deserialized), out);
}
@Override
diff --git a/src/main/java/it/cavallium/dbengine/client/NoMapper.java b/src/main/java/it/cavallium/dbengine/client/NoMapper.java
index 1b91787..98b3279 100644
--- a/src/main/java/it/cavallium/dbengine/client/NoMapper.java
+++ b/src/main/java/it/cavallium/dbengine/client/NoMapper.java
@@ -1,7 +1,5 @@
package it.cavallium.dbengine.client;
-import it.cavallium.dbengine.client.Mapper;
-
public class NoMapper implements Mapper {
@Override
diff --git a/src/main/java/it/cavallium/dbengine/client/UninterruptibleScheduler.java b/src/main/java/it/cavallium/dbengine/client/UninterruptibleScheduler.java
deleted file mode 100644
index ba05044..0000000
--- a/src/main/java/it/cavallium/dbengine/client/UninterruptibleScheduler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package it.cavallium.dbengine.client;
-
-import java.util.concurrent.TimeUnit;
-import org.jetbrains.annotations.NotNull;
-import reactor.core.Disposable;
-import reactor.core.scheduler.Scheduler;
-
-public class UninterruptibleScheduler {
-
- public static Scheduler uninterruptibleScheduler(Scheduler scheduler) {
- return new Scheduler() {
- @Override
- public @NotNull Disposable schedule(@NotNull Runnable task) {
- scheduler.schedule(task);
- return () -> {};
- }
-
- @Override
- public @NotNull Disposable schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
- scheduler.schedule(task, delay, unit);
- return () -> {};
- }
-
- @Override
- public @NotNull Disposable schedulePeriodically(@NotNull Runnable task,
- long initialDelay,
- long period,
- @NotNull TimeUnit unit) {
- scheduler.schedulePeriodically(task, initialDelay, period, unit);
- return () -> {};
- }
-
- @Override
- public boolean isDisposed() {
- return scheduler.isDisposed();
- }
-
- @Override
- public void dispose() {
- scheduler.dispose();
- }
-
- @Override
- public void start() {
- scheduler.start();
- }
-
- @Override
- public long now(@NotNull TimeUnit unit) {
- return Scheduler.super.now(unit);
- }
-
- @Override
- public @NotNull Worker createWorker() {
- var worker = scheduler.createWorker();
- return new Worker() {
- @Override
- public @NotNull Disposable schedule(@NotNull Runnable task) {
- worker.schedule(task);
- return () -> {};
- }
-
- @Override
- public void dispose() {
- }
-
- @Override
- public boolean isDisposed() {
- return worker.isDisposed();
- }
-
- @Override
- public @NotNull Disposable schedule(@NotNull Runnable task, long delay, @NotNull TimeUnit unit) {
- worker.schedule(task, delay, unit);
- return () -> {};
- }
-
- @Override
- public @NotNull Disposable schedulePeriodically(@NotNull Runnable task,
- long initialDelay,
- long period,
- @NotNull TimeUnit unit) {
- worker.schedulePeriodically(task, initialDelay, period, unit);
- return () -> {};
- }
- };
- }
- };
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java
index 9b36a12..0e04007 100644
--- a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java
+++ b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java
@@ -1,7 +1,6 @@
package it.cavallium.dbengine.client.query;
import io.soabase.recordbuilder.core.RecordBuilder;
-import it.cavallium.data.generator.nativedata.Nullablefloat;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.client.Sort;
import it.cavallium.dbengine.client.query.current.data.NoSort;
diff --git a/src/main/java/it/cavallium/dbengine/client/query/QueryMoshi.java b/src/main/java/it/cavallium/dbengine/client/query/QueryMoshi.java
index 722fa55..e711b5c 100644
--- a/src/main/java/it/cavallium/dbengine/client/query/QueryMoshi.java
+++ b/src/main/java/it/cavallium/dbengine/client/query/QueryMoshi.java
@@ -1,12 +1,19 @@
package it.cavallium.dbengine.client.query;
import com.squareup.moshi.JsonAdapter;
+import it.cavallium.dbengine.buffers.Buf;
import it.cavallium.dbengine.client.IntOpenHashSetJsonAdapter;
import it.cavallium.dbengine.client.query.current.CurrentVersion;
import it.cavallium.dbengine.client.query.current.IBaseType;
import it.cavallium.dbengine.client.query.current.IType;
+import it.cavallium.dbengine.utils.BooleanListJsonAdapter;
+import it.cavallium.dbengine.utils.ByteListJsonAdapter;
+import it.cavallium.dbengine.utils.CharListJsonAdapter;
+import it.cavallium.dbengine.utils.IntListJsonAdapter;
+import it.cavallium.dbengine.utils.LongListJsonAdapter;
+import it.cavallium.dbengine.utils.MoshiPolymorphic;
+import it.cavallium.dbengine.utils.ShortListJsonAdapter;
import it.unimi.dsi.fastutil.booleans.BooleanList;
-import it.unimi.dsi.fastutil.bytes.ByteList;
import it.unimi.dsi.fastutil.chars.CharList;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -18,13 +25,6 @@ import it.unimi.dsi.fastutil.shorts.ShortList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import it.cavallium.dbengine.utils.BooleanListJsonAdapter;
-import it.cavallium.dbengine.utils.ByteListJsonAdapter;
-import it.cavallium.dbengine.utils.CharListJsonAdapter;
-import it.cavallium.dbengine.utils.IntListJsonAdapter;
-import it.cavallium.dbengine.utils.LongListJsonAdapter;
-import it.cavallium.dbengine.utils.MoshiPolymorphic;
-import it.cavallium.dbengine.utils.ShortListJsonAdapter;
public class QueryMoshi extends MoshiPolymorphic {
@@ -57,7 +57,7 @@ public class QueryMoshi extends MoshiPolymorphic {
this.concreteClasses = concreteClasses;
Object2ObjectMap, JsonAdapter>> extraAdapters = new Object2ObjectOpenHashMap<>();
extraAdapters.put(BooleanList.class, new BooleanListJsonAdapter());
- extraAdapters.put(ByteList.class, new ByteListJsonAdapter());
+ extraAdapters.put(Buf.class, new ByteListJsonAdapter());
extraAdapters.put(ShortList.class, new ShortListJsonAdapter());
extraAdapters.put(CharList.class, new CharListJsonAdapter());
extraAdapters.put(IntList.class, new IntListJsonAdapter());
diff --git a/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java b/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java
index 84cf9f2..36bb21f 100644
--- a/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java
+++ b/src/main/java/it/cavallium/dbengine/client/query/QueryParser.java
@@ -51,16 +51,7 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.LowerCaseFilter;
-import org.apache.lucene.analysis.StopFilter;
-import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.Tokenizer;
-import org.apache.lucene.analysis.core.KeywordTokenizer;
-import org.apache.lucene.analysis.en.EnglishPossessiveFilter;
-import org.apache.lucene.analysis.en.PorterStemFilter;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
-import org.apache.lucene.analysis.miscellaneous.SetKeywordMarkerFilter;
-import org.apache.lucene.analysis.standard.StandardTokenizer;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
@@ -89,7 +80,7 @@ public class QueryParser {
return null;
}
switch (query.getBaseType$()) {
- case StandardQuery:
+ case StandardQuery -> {
var standardQuery = (it.cavallium.dbengine.client.query.current.data.StandardQuery) query;
// Fix the analyzer
@@ -98,19 +89,12 @@ public class QueryParser {
.stream()
.collect(Collectors.toMap(Function.identity(), term -> new NoOpAnalyzer()));
analyzer = new PerFieldAnalyzerWrapper(analyzer, customAnalyzers);
-
var standardQueryParser = new StandardQueryParser(analyzer);
-
- standardQueryParser.setPointsConfigMap(standardQuery
- .pointsConfig()
- .stream()
- .collect(Collectors.toMap(
- PointConfig::field,
- pointConfig -> new PointsConfig(
- toNumberFormat(pointConfig.data().numberFormat()),
- toType(pointConfig.data().type())
- )
- )));
+ standardQueryParser.setPointsConfigMap(standardQuery.pointsConfig().stream().collect(
+ Collectors.toMap(PointConfig::field, pointConfig ->
+ new PointsConfig(toNumberFormat(pointConfig.data().numberFormat()), toType(pointConfig.data().type()))
+ ))
+ );
var defaultFields = standardQuery.defaultFields();
try {
Query parsed;
@@ -126,7 +110,8 @@ public class QueryParser {
} catch (QueryNodeException e) {
throw new IllegalStateException("Can't parse query expression \"" + standardQuery.query() + "\"", e);
}
- case BooleanQuery:
+ }
+ case BooleanQuery -> {
var booleanQuery = (it.cavallium.dbengine.client.query.current.data.BooleanQuery) query;
var bq = new Builder();
for (BooleanQueryPart part : booleanQuery.parts()) {
@@ -141,101 +126,127 @@ public class QueryParser {
}
bq.setMinimumNumberShouldMatch(booleanQuery.minShouldMatch());
return bq.build();
- case IntPointExactQuery:
+ }
+ case IntPointExactQuery -> {
var intPointExactQuery = (IntPointExactQuery) query;
return IntPoint.newExactQuery(intPointExactQuery.field(), intPointExactQuery.value());
- case IntNDPointExactQuery:
+ }
+ case IntNDPointExactQuery -> {
var intndPointExactQuery = (IntNDPointExactQuery) query;
var intndValues = intndPointExactQuery.value().toIntArray();
return IntPoint.newRangeQuery(intndPointExactQuery.field(), intndValues, intndValues);
- case LongPointExactQuery:
+ }
+ case LongPointExactQuery -> {
var longPointExactQuery = (LongPointExactQuery) query;
return LongPoint.newExactQuery(longPointExactQuery.field(), longPointExactQuery.value());
- case FloatPointExactQuery:
+ }
+ case FloatPointExactQuery -> {
var floatPointExactQuery = (FloatPointExactQuery) query;
return FloatPoint.newExactQuery(floatPointExactQuery.field(), floatPointExactQuery.value());
- case DoublePointExactQuery:
+ }
+ case DoublePointExactQuery -> {
var doublePointExactQuery = (DoublePointExactQuery) query;
return DoublePoint.newExactQuery(doublePointExactQuery.field(), doublePointExactQuery.value());
- case LongNDPointExactQuery:
+ }
+ case LongNDPointExactQuery -> {
var longndPointExactQuery = (LongNDPointExactQuery) query;
var longndValues = longndPointExactQuery.value().toLongArray();
return LongPoint.newRangeQuery(longndPointExactQuery.field(), longndValues, longndValues);
- case FloatNDPointExactQuery:
+ }
+ case FloatNDPointExactQuery -> {
var floatndPointExactQuery = (FloatNDPointExactQuery) query;
var floatndValues = floatndPointExactQuery.value().toFloatArray();
return FloatPoint.newRangeQuery(floatndPointExactQuery.field(), floatndValues, floatndValues);
- case DoubleNDPointExactQuery:
+ }
+ case DoubleNDPointExactQuery -> {
var doublendPointExactQuery = (DoubleNDPointExactQuery) query;
var doublendValues = doublendPointExactQuery.value().toDoubleArray();
return DoublePoint.newRangeQuery(doublendPointExactQuery.field(), doublendValues, doublendValues);
- case IntPointSetQuery:
+ }
+ case IntPointSetQuery -> {
var intPointSetQuery = (IntPointSetQuery) query;
return IntPoint.newSetQuery(intPointSetQuery.field(), intPointSetQuery.values().toIntArray());
- case LongPointSetQuery:
+ }
+ case LongPointSetQuery -> {
var longPointSetQuery = (LongPointSetQuery) query;
return LongPoint.newSetQuery(longPointSetQuery.field(), longPointSetQuery.values().toLongArray());
- case FloatPointSetQuery:
+ }
+ case FloatPointSetQuery -> {
var floatPointSetQuery = (FloatPointSetQuery) query;
return FloatPoint.newSetQuery(floatPointSetQuery.field(), floatPointSetQuery.values().toFloatArray());
- case DoublePointSetQuery:
+ }
+ case DoublePointSetQuery -> {
var doublePointSetQuery = (DoublePointSetQuery) query;
return DoublePoint.newSetQuery(doublePointSetQuery.field(), doublePointSetQuery.values().toDoubleArray());
- case TermQuery:
+ }
+ case TermQuery -> {
var termQuery = (TermQuery) query;
return new org.apache.lucene.search.TermQuery(toTerm(termQuery.term()));
- case IntTermQuery:
+ }
+ case IntTermQuery -> {
var intTermQuery = (IntTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(intTermQuery.field(),
IntPoint.pack(intTermQuery.value())
));
- case IntNDTermQuery:
+ }
+ case IntNDTermQuery -> {
var intNDTermQuery = (IntNDTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(intNDTermQuery.field(),
IntPoint.pack(intNDTermQuery.value().toIntArray())
));
- case LongTermQuery:
+ }
+ case LongTermQuery -> {
var longTermQuery = (LongTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(longTermQuery.field(),
LongPoint.pack(longTermQuery.value())
));
- case LongNDTermQuery:
+ }
+ case LongNDTermQuery -> {
var longNDTermQuery = (LongNDTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(longNDTermQuery.field(),
LongPoint.pack(longNDTermQuery.value().toLongArray())
));
- case FloatTermQuery:
+ }
+ case FloatTermQuery -> {
var floatTermQuery = (FloatTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(floatTermQuery.field(),
FloatPoint.pack(floatTermQuery.value())
));
- case FloatNDTermQuery:
+ }
+ case FloatNDTermQuery -> {
var floatNDTermQuery = (FloatNDTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(floatNDTermQuery.field(),
FloatPoint.pack(floatNDTermQuery.value().toFloatArray())
));
- case DoubleTermQuery:
+ }
+ case DoubleTermQuery -> {
var doubleTermQuery = (DoubleTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(doubleTermQuery.field(),
DoublePoint.pack(doubleTermQuery.value())
));
- case DoubleNDTermQuery:
+ }
+ case DoubleNDTermQuery -> {
var doubleNDTermQuery = (DoubleNDTermQuery) query;
return new org.apache.lucene.search.TermQuery(new Term(doubleNDTermQuery.field(),
DoublePoint.pack(doubleNDTermQuery.value().toDoubleArray())
));
- case FieldExistsQuery:
+ }
+ case FieldExistsQuery -> {
var fieldExistQuery = (FieldExistsQuery) query;
return new org.apache.lucene.search.FieldExistsQuery(fieldExistQuery.field());
- case BoostQuery:
+ }
+ case BoostQuery -> {
var boostQuery = (BoostQuery) query;
return new org.apache.lucene.search.BoostQuery(toQuery(boostQuery.query(), analyzer), boostQuery.scoreBoost());
- case ConstantScoreQuery:
+ }
+ case ConstantScoreQuery -> {
var constantScoreQuery = (ConstantScoreQuery) query;
return new org.apache.lucene.search.ConstantScoreQuery(toQuery(constantScoreQuery.query(), analyzer));
- case BoxedQuery:
+ }
+ case BoxedQuery -> {
return toQuery(((BoxedQuery) query).query(), analyzer);
- case FuzzyQuery:
+ }
+ case FuzzyQuery -> {
var fuzzyQuery = (it.cavallium.dbengine.client.query.current.data.FuzzyQuery) query;
return new FuzzyQuery(toTerm(fuzzyQuery.term()),
fuzzyQuery.maxEdits(),
@@ -243,56 +254,67 @@ public class QueryParser {
fuzzyQuery.maxExpansions(),
fuzzyQuery.transpositions()
);
- case IntPointRangeQuery:
+ }
+ case IntPointRangeQuery -> {
var intPointRangeQuery = (IntPointRangeQuery) query;
return IntPoint.newRangeQuery(intPointRangeQuery.field(), intPointRangeQuery.min(), intPointRangeQuery.max());
- case IntNDPointRangeQuery:
+ }
+ case IntNDPointRangeQuery -> {
var intndPointRangeQuery = (IntNDPointRangeQuery) query;
return IntPoint.newRangeQuery(intndPointRangeQuery.field(),
intndPointRangeQuery.min().toIntArray(),
intndPointRangeQuery.max().toIntArray()
);
- case LongPointRangeQuery:
+ }
+ case LongPointRangeQuery -> {
var longPointRangeQuery = (LongPointRangeQuery) query;
return LongPoint.newRangeQuery(longPointRangeQuery.field(),
longPointRangeQuery.min(),
longPointRangeQuery.max()
);
- case FloatPointRangeQuery:
+ }
+ case FloatPointRangeQuery -> {
var floatPointRangeQuery = (FloatPointRangeQuery) query;
return FloatPoint.newRangeQuery(floatPointRangeQuery.field(),
floatPointRangeQuery.min(),
floatPointRangeQuery.max()
);
- case DoublePointRangeQuery:
+ }
+ case DoublePointRangeQuery -> {
var doublePointRangeQuery = (DoublePointRangeQuery) query;
return DoublePoint.newRangeQuery(doublePointRangeQuery.field(),
doublePointRangeQuery.min(),
doublePointRangeQuery.max()
);
- case LongNDPointRangeQuery:
+ }
+ case LongNDPointRangeQuery -> {
var longndPointRangeQuery = (LongNDPointRangeQuery) query;
return LongPoint.newRangeQuery(longndPointRangeQuery.field(),
longndPointRangeQuery.min().toLongArray(),
longndPointRangeQuery.max().toLongArray()
);
- case FloatNDPointRangeQuery:
+ }
+ case FloatNDPointRangeQuery -> {
var floatndPointRangeQuery = (FloatNDPointRangeQuery) query;
return FloatPoint.newRangeQuery(floatndPointRangeQuery.field(),
floatndPointRangeQuery.min().toFloatArray(),
floatndPointRangeQuery.max().toFloatArray()
);
- case DoubleNDPointRangeQuery:
+ }
+ case DoubleNDPointRangeQuery -> {
var doublendPointRangeQuery = (DoubleNDPointRangeQuery) query;
return DoublePoint.newRangeQuery(doublendPointRangeQuery.field(),
doublendPointRangeQuery.min().toDoubleArray(),
doublendPointRangeQuery.max().toDoubleArray()
);
- case MatchAllDocsQuery:
+ }
+ case MatchAllDocsQuery -> {
return new MatchAllDocsQuery();
- case MatchNoDocsQuery:
+ }
+ case MatchNoDocsQuery -> {
return new MatchNoDocsQuery();
- case PhraseQuery:
+ }
+ case PhraseQuery -> {
var phraseQuery = (PhraseQuery) query;
var pqb = new org.apache.lucene.search.PhraseQuery.Builder();
for (TermPosition phrase : phraseQuery.phrase()) {
@@ -300,27 +322,31 @@ public class QueryParser {
}
pqb.setSlop(phraseQuery.slop());
return pqb.build();
- case SortedDocFieldExistsQuery:
+ }
+ case SortedDocFieldExistsQuery -> {
var sortedDocFieldExistsQuery = (SortedDocFieldExistsQuery) query;
return new DocValuesFieldExistsQuery(sortedDocFieldExistsQuery.field());
- case SynonymQuery:
+ }
+ case SynonymQuery -> {
var synonymQuery = (SynonymQuery) query;
var sqb = new org.apache.lucene.search.SynonymQuery.Builder(synonymQuery.field());
for (TermAndBoost part : synonymQuery.parts()) {
sqb.addTerm(toTerm(part.term()), part.boost());
}
return sqb.build();
- case SortedNumericDocValuesFieldSlowRangeQuery:
+ }
+ case SortedNumericDocValuesFieldSlowRangeQuery -> {
var sortedNumericDocValuesFieldSlowRangeQuery = (SortedNumericDocValuesFieldSlowRangeQuery) query;
return SortedNumericDocValuesField.newSlowRangeQuery(sortedNumericDocValuesFieldSlowRangeQuery.field(),
sortedNumericDocValuesFieldSlowRangeQuery.min(),
sortedNumericDocValuesFieldSlowRangeQuery.max()
);
- case WildcardQuery:
+ }
+ case WildcardQuery -> {
var wildcardQuery = (WildcardQuery) query;
return new org.apache.lucene.search.WildcardQuery(new Term(wildcardQuery.field(), wildcardQuery.pattern()));
- default:
- throw new IllegalStateException("Unexpected value: " + query.getBaseType$());
+ }
+ default -> throw new IllegalStateException("Unexpected value: " + query.getBaseType$());
}
}
diff --git a/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java b/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java
index b55725c..be7a47b 100644
--- a/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java
+++ b/src/main/java/it/cavallium/dbengine/client/query/QueryUtils.java
@@ -17,7 +17,6 @@ import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.util.QueryBuilder;
import org.jetbrains.annotations.NotNull;
@@ -63,29 +62,17 @@ public class QueryUtils {
for (BooleanClause booleanClause : booleanQuery) {
org.apache.lucene.search.Query queryPartQuery = booleanClause.getQuery();
- Occur occur;
- switch (booleanClause.getOccur()) {
- case MUST:
- occur = OccurMust.of();
- break;
- case FILTER:
- occur = OccurFilter.of();
- break;
- case SHOULD:
- occur = OccurShould.of();
- break;
- case MUST_NOT:
- occur = OccurMustNot.of();
- break;
- default:
- throw new IllegalArgumentException();
- }
+ Occur occur = switch (booleanClause.getOccur()) {
+ case MUST -> OccurMust.of();
+ case FILTER -> OccurFilter.of();
+ case SHOULD -> OccurShould.of();
+ case MUST_NOT -> OccurMustNot.of();
+ };
queryParts.add(BooleanQueryPart.of(transformQuery(field, queryPartQuery), occur));
}
return BooleanQuery.of(List.copyOf(queryParts), booleanQuery.getMinimumNumberShouldMatch());
}
- if (luceneQuery instanceof org.apache.lucene.search.PhraseQuery) {
- var phraseQuery = (org.apache.lucene.search.PhraseQuery) luceneQuery;
+ if (luceneQuery instanceof org.apache.lucene.search.PhraseQuery phraseQuery) {
int slop = phraseQuery.getSlop();
var terms = phraseQuery.getTerms();
var positions = phraseQuery.getPositions();
diff --git a/src/main/java/it/cavallium/dbengine/database/BufSupplier.java b/src/main/java/it/cavallium/dbengine/database/BufSupplier.java
deleted file mode 100644
index 06f212c..0000000
--- a/src/main/java/it/cavallium/dbengine/database/BufSupplier.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package it.cavallium.dbengine.database;
-
-import io.netty5.buffer.Buffer;
-import io.netty5.util.Send;
-import java.util.function.Supplier;
-
-public abstract class BufSupplier implements SafeCloseable, DiscardingCloseable, Supplier {
-
- public static BufSupplier of(Supplier supplier) {
- return new SimpleBufSupplier(supplier);
- }
-
- public static BufSupplier of(Send supplier) {
- return new CopyBufSupplier(supplier.receive());
- }
-
- public static BufSupplier ofOwned(Buffer supplier) {
- return new CopyBufSupplier(supplier);
- }
-
- public static BufSupplier ofShared(Buffer supplier) {
- return new SimpleBufSupplier(() -> supplier.copy());
- }
-
- private static final class SimpleBufSupplier extends BufSupplier {
-
- private final Supplier supplier;
-
- public SimpleBufSupplier(Supplier supplier) {
- this.supplier = supplier;
- }
-
- @Override
- public Buffer get() {
- return supplier.get();
- }
-
- @Override
- public void close() {
-
- }
- }
-
- private static final class CopyBufSupplier extends BufSupplier {
-
- private final Buffer supplier;
-
- public CopyBufSupplier(Buffer supplier) {
- this.supplier = supplier;
- }
-
- @Override
- public Buffer get() {
- return supplier.copy();
- }
-
- @Override
- public void close() {
- supplier.close();
- }
- }
-}
diff --git a/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java b/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java
index 0347bd5..04d59a8 100644
--- a/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java
+++ b/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java
@@ -2,10 +2,9 @@ package it.cavallium.dbengine.database;
import it.cavallium.dbengine.rpc.current.data.Column;
import java.nio.file.Path;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
+import java.util.stream.Stream;
public interface DatabaseOperations {
- Mono ingestSST(Column column, Publisher files, boolean replaceExisting);
+ void ingestSST(Column column, Stream files, boolean replaceExisting);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java b/src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java
index 8c79293..58939ca 100644
--- a/src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java
+++ b/src/main/java/it/cavallium/dbengine/database/DatabaseProperties.java
@@ -2,30 +2,30 @@ package it.cavallium.dbengine.database;
import it.cavallium.dbengine.client.MemoryStats;
import it.cavallium.dbengine.rpc.current.data.Column;
+import java.io.IOException;
import java.util.Map;
+import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
public interface DatabaseProperties {
- Mono getMemoryStats();
+ MemoryStats getMemoryStats();
- Mono getRocksDBStats();
+ String getRocksDBStats();
- Mono