package it.cavallium.dbengine.database; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.CompositeBuffer; import io.netty5.buffer.api.Send; import io.netty5.util.IllegalReferenceCountException; import io.netty5.util.internal.PlatformDependent; import it.cavallium.dbengine.database.collections.DatabaseStage; import it.cavallium.dbengine.database.disk.MemorySegmentUtils; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; import java.util.function.ToIntFunction; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.FloatPoint; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.RocksDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; @SuppressWarnings("unused") public class LLUtils { private static final Logger logger = LoggerFactory.getLogger(LLUtils.class); private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0); private static final byte[] RESPONSE_TRUE = new byte[]{1}; private static final byte[] RESPONSE_FALSE = new byte[]{0}; private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1}; private static final byte[] RESPONSE_FALSE_BUF = new byte[]{0}; public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1]; static { for (int i1 = 0; i1 < 256; i1++) { var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; b[0] = (byte) i1; } } public static boolean responseToBoolean(byte[] response) { return response[0] == 1; } public static boolean responseToBoolean(Send responseToReceive) { try (var response = responseToReceive.receive()) { assert response.readableBytes() == 1; return response.getByte(response.readerOffset()) == 1; } } public static byte[] booleanToResponse(boolean bool) { return bool ? RESPONSE_TRUE : RESPONSE_FALSE; } public static Send booleanToResponseByteBuffer(BufferAllocator alloc, boolean bool) { return alloc.allocate(1).writeByte(bool ? (byte) 1 : 0).send(); } @Nullable public static Sort toSort(@Nullable LLSort sort) { if (sort == null) { return null; } if (sort.getType() == LLSortType.LONG) { return new Sort(new SortedNumericSortField(sort.getFieldName(), SortField.Type.LONG, sort.isReverse())); } else if (sort.getType() == LLSortType.RANDOM) { return new Sort(new RandomSortField()); } else if (sort.getType() == LLSortType.SCORE) { return new Sort(SortField.FIELD_SCORE); } else if (sort.getType() == LLSortType.DOC) { return new Sort(SortField.FIELD_DOC); } return null; } public static ScoreMode toScoreMode(LLScoreMode scoreMode) { return switch (scoreMode) { case COMPLETE -> ScoreMode.COMPLETE; case TOP_SCORES -> ScoreMode.TOP_SCORES; case COMPLETE_NO_SCORES -> ScoreMode.COMPLETE_NO_SCORES; default -> throw new IllegalStateException("Unexpected value: " + scoreMode); }; } public static Term toTerm(LLTerm term) { return new Term(term.getKey(), term.getValue()); } public static Document toDocument(LLDocument document) { Document d = new Document(); for (LLItem item : document.getItems()) { d.add(LLUtils.toField(item)); } return d; } public static Collection toDocuments(Collection document) { List d = new ArrayList<>(document.size()); for (LLDocument doc : document) { d.add(LLUtils.toDocument(doc)); } return d; } public static Collection toDocumentsFromEntries(Collection> documentsList) { ArrayList results = new ArrayList<>(documentsList.size()); for (Entry entry : documentsList) { results.add(LLUtils.toDocument(entry.getValue())); } return results; } public static Iterable toTerms(Iterable terms) { List d = new ArrayList<>(); for (LLTerm term : terms) { d.add(LLUtils.toTerm(term)); } return d; } private static IndexableField toField(LLItem item) { return switch (item.getType()) { case IntPoint -> new IntPoint(item.getName(), Ints.fromByteArray(item.getData())); case LongPoint -> new LongPoint(item.getName(), Longs.fromByteArray(item.getData())); case FloatPoint -> new FloatPoint(item.getName(), ByteBuffer.wrap(item.getData()).getFloat()); case TextField -> new TextField(item.getName(), item.stringValue(), Field.Store.NO); case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Field.Store.YES); case SortedNumericDocValuesField -> new SortedNumericDocValuesField(item.getName(), Longs.fromByteArray(item.getData()) ); case StringField -> new StringField(item.getName(), item.stringValue(), Field.Store.NO); case StringFieldStored -> new StringField(item.getName(), item.stringValue(), Field.Store.YES); }; } public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) { return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.score(), hit.key()); } public static String toStringSafe(Buffer key) { try { if (key.isAccessible()) { return toString(key); } else { return "(released)"; } } catch (IllegalReferenceCountException ex) { return "(released)"; } } public static String toString(Buffer key) { if (key == null) { return "null"; } else { int startIndex = key.readerOffset(); int iMax = key.readableBytes() - 1; int iLimit = 128; if (iMax <= -1) { return "[]"; } else { StringBuilder b = new StringBuilder(); b.append('['); int i = 0; while (true) { b.append(key.getByte(startIndex + i)); if (i == iLimit) { b.append("…"); } if (i == iMax || i == iLimit) { return b.append(']').toString(); } b.append(", "); ++i; } } } } public static boolean equals(Buffer a, Buffer b) { if (a == null && b == null) { return true; } else if (a != null && b != null) { var aCur = a.openCursor(); var bCur = b.openCursor(); if (aCur.bytesLeft() != bCur.bytesLeft()) { return false; } while (aCur.readByte() && bCur.readByte()) { if (aCur.getByte() != bCur.getByte()) { return false; } } return true; } else { return false; } } /** * Returns {@code true} if and only if the two specified buffers are identical to each other for {@code length} bytes * starting at {@code aStartIndex} index for the {@code a} buffer and {@code bStartIndex} index for the {@code b} * buffer. A more compact way to express this is: *

* {@code a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length]} */ public static boolean equals(Buffer a, int aStartIndex, Buffer b, int bStartIndex, int length) { var aCur = a.openCursor(aStartIndex, length); var bCur = b.openCursor(bStartIndex, length); if (aCur.bytesLeft() != bCur.bytesLeft()) { return false; } while (aCur.readByte() && bCur.readByte()) { if (aCur.getByte() != bCur.getByte()) { return false; } } return true; } public static byte[] toArray(Buffer key) { byte[] array = new byte[key.readableBytes()]; key.copyInto(key.readerOffset(), array, 0, key.readableBytes()); return array; } public static List toArray(List input) { List result = new ArrayList<>(input.size()); for (Buffer byteBuf : input) { result.add(toArray(byteBuf)); } return result; } public static int hashCode(Buffer buf) { if (buf == null) { return 0; } int result = 1; var cur = buf.openCursor(); while (cur.readByte()) { var element = cur.getByte(); result = 31 * result + element; } return result; } /** * @return null if size is equal to RocksDB.NOT_FOUND */ @SuppressWarnings("ConstantConditions") @Nullable public static Send readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction reader) { ByteBuffer directBuffer; Buffer buffer; { var direct = LLUtils.newDirect(alloc, 4096); directBuffer = direct.byteBuffer(); buffer = direct.buffer().receive(); } try { int size; do { directBuffer.limit(directBuffer.capacity()); assert directBuffer.isDirect(); size = reader.applyAsInt(directBuffer); if (size != RocksDB.NOT_FOUND) { if (size == directBuffer.limit()) { buffer.readerOffset(0).writerOffset(size); return buffer.send(); } else { assert size > directBuffer.limit(); assert directBuffer.limit() > 0; // Free the buffer if (directBuffer != null) { // todo: check if free is needed PlatformDependent.freeDirectBuffer(directBuffer); directBuffer = null; } directBuffer = LLUtils.obtainDirect(buffer); buffer.ensureWritable(size); } } } while (size != RocksDB.NOT_FOUND); // Return null if size is equal to RocksDB.NOT_FOUND return null; } finally { // Free the buffer if (directBuffer != null) { // todo: check if free is needed PlatformDependent.freeDirectBuffer(directBuffer); directBuffer = null; } buffer.close(); } } public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {} @NotNull public static DirectBuffer newDirect(BufferAllocator allocator, int size) { try (var buf = allocator.allocate(size)) { var direct = obtainDirect(buf); return new DirectBuffer(buf.send(), direct); } } @NotNull public static DirectBuffer convertToDirect(BufferAllocator allocator, Send content) { try (var buf = content.receive()) { if (buf.nativeAddress() != 0) { var direct = obtainDirect(buf); return new DirectBuffer(buf.send(), direct); } else { var direct = newDirect(allocator, buf.readableBytes()); try (var buf2 = direct.buffer().receive()) { buf.copyInto(buf.readerOffset(), buf2, buf2.writerOffset(), buf.readableBytes()); return new DirectBuffer(buf2.send(), direct.byteBuffer()); } } } } @NotNull public static ByteBuffer obtainDirect(Buffer buffer) { if (!PlatformDependent.hasUnsafe()) { throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", PlatformDependent.getUnsafeUnavailabilityCause() ); } if (!MemorySegmentUtils.isSupported()) { throw new UnsupportedOperationException("Foreign Memory Access API support is disabled." + " Please set \"--enable-preview --add-modules jdk.incubator.foreign -Dforeign.restricted=permit\""); } assert buffer.isAccessible(); long nativeAddress; if ((nativeAddress = buffer.nativeAddress()) == 0) { if (buffer.capacity() == 0) { return EMPTY_BYTE_BUFFER; } if (!buffer.isAccessible()) { throw new IllegalStateException("Buffer is not accessible"); } throw new IllegalStateException("Buffer is not direct"); } return MemorySegmentUtils.directBuffer(nativeAddress, buffer.capacity()); } public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { Buffer result = alloc.allocate(array.length); result.writeBytes(array); return result; } @NotNull public static Send readDirectNioBuffer(BufferAllocator alloc, ToIntFunction reader) { var nullableSend = readNullableDirectNioBuffer(alloc, reader); try (var buffer = nullableSend != null ? nullableSend.receive() : null) { if (buffer == null) { throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element"); } return buffer.send(); } } public static Send compositeBuffer(BufferAllocator alloc, Send buffer) { try (var composite = buffer.receive()) { return composite.send(); } } public static Send compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2) { try (var buf1 = buffer1.receive()) { try (var buf2 = buffer2.receive()) { try (var composite = CompositeBuffer.compose(alloc, buf1.split().send(), buf2.split().send())) { return composite.send(); } } } } public static Send compositeBuffer(BufferAllocator alloc, Send buffer1, Send buffer2, Send buffer3) { try (var buf1 = buffer1.receive()) { try (var buf2 = buffer2.receive()) { try (var buf3 = buffer3.receive()) { try (var composite = CompositeBuffer.compose(alloc, buf1.split().send(), buf2.split().send(), buf3.split().send() )) { return composite.send(); } } } } } @SafeVarargs public static Send compositeBuffer(BufferAllocator alloc, Send... buffers) { try { return switch (buffers.length) { case 0 -> alloc.allocate(0).send(); case 1 -> compositeBuffer(alloc, buffers[0]); case 2 -> compositeBuffer(alloc, buffers[0], buffers[1]); case 3 -> compositeBuffer(alloc, buffers[0], buffers[1], buffers[2]); default -> { Buffer[] bufs = new Buffer[buffers.length]; for (int i = 0; i < buffers.length; i++) { bufs[i] = buffers[i].receive(); } try { //noinspection unchecked Send[] sentBufs = new Send[buffers.length]; for (int i = 0; i < buffers.length; i++) { sentBufs[i] = bufs[i].split().send(); } try (var composite = CompositeBuffer.compose(alloc, sentBufs)) { yield composite.send(); } } finally { for (Buffer buf : bufs) { buf.close(); } } } }; } finally { for (Send buffer : buffers) { buffer.close(); } } } public static Mono resolveDelta(Mono> prev, UpdateReturnMode updateReturnMode) { return prev.handle((delta, sink) -> { switch (updateReturnMode) { case GET_NEW_VALUE -> { var current = delta.current(); if (current != null) { sink.next(current); } else { sink.complete(); } } case GET_OLD_VALUE -> { var previous = delta.previous(); if (previous != null) { sink.next(previous); } else { sink.complete(); } } case NOTHING -> sink.complete(); default -> sink.error(new IllegalStateException()); } }); } public static Mono> resolveLLDelta(Mono> prev, UpdateReturnMode updateReturnMode) { return prev.handle((deltaToReceive, sink) -> { try (var delta = deltaToReceive.receive()) { switch (updateReturnMode) { case GET_NEW_VALUE -> { var current = delta.current(); if (current != null) { sink.next(current); } else { sink.complete(); } } case GET_OLD_VALUE -> { var previous = delta.previous(); if (previous != null) { sink.next(previous); } else { sink.complete(); } } case NOTHING -> sink.complete(); default -> sink.error(new IllegalStateException()); } } }); } public static Mono> mapDelta(Mono> mono, SerializationFunction<@NotNull T, @Nullable U> mapper) { return mono.handle((delta, sink) -> { try { T prev = delta.previous(); T curr = delta.current(); U newPrev; U newCurr; if (prev != null) { newPrev = mapper.apply(prev); } else { newPrev = null; } if (curr != null) { newCurr = mapper.apply(curr); } else { newCurr = null; } sink.next(new Delta<>(newPrev, newCurr)); } catch (SerializationException ex) { sink.error(ex); } }); } public static Mono> mapLLDelta(Mono> mono, SerializationFunction<@NotNull Send, @Nullable U> mapper) { return mono.handle((deltaToReceive, sink) -> { try (var delta = deltaToReceive.receive()) { try (Send prev = delta.previous()) { try (Send curr = delta.current()) { U newPrev; U newCurr; if (prev != null) { newPrev = mapper.apply(prev); } else { newPrev = null; } if (curr != null) { newCurr = mapper.apply(curr); } else { newCurr = null; } sink.next(new Delta<>(newPrev, newCurr)); } } } catch (SerializationException ex) { sink.error(ex); } }); } public static boolean isDeltaChanged(Delta delta) { return !Objects.equals(delta.previous(), delta.current()); } public static Mono> lazyRetain(Buffer buf) { return Mono.just(buf).map(b -> b.copy().send()); } public static Mono> lazyRetainRange(LLRange range) { return Mono.just(range).map(r -> r.copy().send()); } public static Mono> lazyRetain(Callable> bufCallable) { return Mono.fromCallable(bufCallable); } public static Mono> lazyRetainRange(Callable> rangeCallable) { return Mono.fromCallable(rangeCallable); } public static Mono handleDiscard(Mono mono) { return mono.doOnDiscard(Object.class, obj -> { if (obj instanceof SafeCloseable o) { discardRefCounted(o); } else if (obj instanceof Entry o) { discardEntry(o); } else if (obj instanceof Collection o) { discardCollection(o); } else if (obj instanceof Tuple3 o) { discardTuple3(o); } else if (obj instanceof Tuple2 o) { discardTuple2(o); } else if (obj instanceof LLEntry o) { discardLLEntry(o); } else if (obj instanceof LLRange o) { discardLLRange(o); } else if (obj instanceof LLDelta o) { discardLLDelta(o); } else if (obj instanceof Delta o) { discardDelta(o); } else if (obj instanceof Send o) { discardSend(o); } else if (obj instanceof Map o) { discardMap(o); } else if (obj instanceof DatabaseStage o) { discardStage(o); } }); // todo: check if the single object discard hook is more performant /* .doOnDiscard(SafeCloseable.class, LLUtils::discardRefCounted) .doOnDiscard(Map.Entry.class, LLUtils::discardEntry) .doOnDiscard(Collection.class, LLUtils::discardCollection) .doOnDiscard(Tuple2.class, LLUtils::discardTuple2) .doOnDiscard(Tuple3.class, LLUtils::discardTuple3) .doOnDiscard(LLEntry.class, LLUtils::discardLLEntry) .doOnDiscard(LLRange.class, LLUtils::discardLLRange) .doOnDiscard(Delta.class, LLUtils::discardDelta) .doOnDiscard(LLDelta.class, LLUtils::discardLLDelta) .doOnDiscard(Send.class, LLUtils::discardSend) .doOnDiscard(Map.class, LLUtils::discardMap) .doOnDiscard(DatabaseStage.class, LLUtils::discardStage); */ } public static Flux handleDiscard(Flux mono) { return mono.doOnDiscard(Object.class, obj -> { if (obj instanceof SafeCloseable o) { discardRefCounted(o); } else if (obj instanceof Entry o) { discardEntry(o); } else if (obj instanceof Collection o) { discardCollection(o); } else if (obj instanceof Tuple3 o) { discardTuple3(o); } else if (obj instanceof Tuple2 o) { discardTuple2(o); } else if (obj instanceof LLEntry o) { discardLLEntry(o); } else if (obj instanceof LLRange o) { discardLLRange(o); } else if (obj instanceof Delta o) { discardDelta(o); } else if (obj instanceof LLDelta o) { discardLLDelta(o); } else if (obj instanceof Send o) { discardSend(o); } else if (obj instanceof Map o) { discardMap(o); } else if (obj instanceof DatabaseStage o) { discardStage(o); } }); // todo: check if the single object discard hook is more performant /* .doOnDiscard(SafeCloseable.class, LLUtils::discardRefCounted) .doOnDiscard(Map.Entry.class, LLUtils::discardEntry) .doOnDiscard(Collection.class, LLUtils::discardCollection) .doOnDiscard(Tuple2.class, LLUtils::discardTuple2) .doOnDiscard(Tuple3.class, LLUtils::discardTuple3) .doOnDiscard(LLEntry.class, LLUtils::discardLLEntry) .doOnDiscard(LLRange.class, LLUtils::discardLLRange) .doOnDiscard(Delta.class, LLUtils::discardDelta) .doOnDiscard(LLDelta.class, LLUtils::discardLLDelta) .doOnDiscard(Send.class, LLUtils::discardSend) .doOnDiscard(Map.class, LLUtils::discardMap) .doOnDiscard(DatabaseStage.class, LLUtils::discardStage); */ } private static void discardLLEntry(LLEntry entry) { logger.trace("Releasing discarded Buffer"); entry.close(); } private static void discardLLRange(LLRange range) { logger.trace("Releasing discarded Buffer"); range.close(); } private static void discardLLDelta(LLDelta delta) { logger.trace("Releasing discarded LLDelta"); delta.close(); } private static void discardEntry(Map.Entry e) { if (e.getKey() instanceof Buffer bb) { bb.close(); } if (e.getValue() instanceof Buffer bb) { bb.close(); } } private static void discardTuple2(Tuple2 e) { if (e.getT1() instanceof Buffer bb) { bb.close(); } if (e.getT2() instanceof Buffer bb) { bb.close(); } } private static void discardTuple3(Tuple3 e) { if (e.getT1() instanceof Buffer bb) { bb.close(); } else if (e.getT1() instanceof Optional opt) { if (opt.isPresent() && opt.get() instanceof Buffer bb) { bb.close(); } } if (e.getT2() instanceof Buffer bb) { bb.close(); } else if (e.getT1() instanceof Optional opt) { if (opt.isPresent() && opt.get() instanceof Buffer bb) { bb.close(); } } if (e.getT3() instanceof Buffer bb) { bb.close(); } else if (e.getT1() instanceof Optional opt) { if (opt.isPresent() && opt.get() instanceof Buffer bb) { bb.close(); } } } private static void discardRefCounted(SafeCloseable safeCloseable) { safeCloseable.close(); } private static void discardCollection(Collection collection) { for (Object o : collection) { if (o instanceof SafeCloseable safeCloseable) { safeCloseable.close(); } else if (o instanceof Map.Entry entry) { if (entry.getKey() instanceof SafeCloseable bb) { bb.close(); } if (entry.getValue() instanceof SafeCloseable bb) { bb.close(); } } else { break; } } } private static void discardDelta(Delta delta) { if (delta.previous() instanceof Buffer bb) { bb.close(); } if (delta.current() instanceof Buffer bb) { bb.close(); } } private static void discardSend(Send send) { send.close(); } private static void discardMap(Map map) { for (Entry entry : map.entrySet()) { boolean hasByteBuf = false; if (entry.getKey() instanceof Buffer bb) { bb.close(); hasByteBuf = true; } if (entry.getValue() instanceof Buffer bb) { bb.close(); hasByteBuf = true; } if (!hasByteBuf) { break; } } } private static void discardStage(DatabaseStage stage) { stage.release(); } public static boolean isDirect(Buffer key) { var readableComponents = key.countReadableComponents(); if (readableComponents == 0) { return true; } else if (readableComponents == 1) { return key.forEachReadable(0, (index, component) -> component.readableBuffer().isDirect()) >= 0; } else { return false; } } public static String deserializeString(Send bufferSend, int readerOffset, int length, Charset charset) { try (var buffer = bufferSend.receive()) { byte[] bytes = new byte[Math.min(length, buffer.readableBytes())]; buffer.copyInto(readerOffset, bytes, 0, length); return new String(bytes, charset); } } public static int utf8MaxBytes(String deserialized) { return deserialized.length() * 3; } }