package it.cavallium.dbengine.database;

import static io.netty5.buffer.StandardAllocationTypes.OFF_HEAP;
import static io.netty5.buffer.internal.InternalBufferUtils.NO_OP_DROP;
import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler;
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;

import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty5.buffer.AllocatorControl;
import io.netty5.buffer.Buffer;
import io.netty5.buffer.BufferAllocator;
import io.netty5.buffer.BufferComponent;
import io.netty5.buffer.CompositeBuffer;
import io.netty5.buffer.Drop;
import io.netty5.util.Resource;
import io.netty5.util.Send;
import io.netty5.util.IllegalReferenceCountException;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.LuceneCloseable;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.RandomSortField;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.AbstractNativeReference;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import reactor.core.Disposable;
import reactor.core.Fuseable.QueueSubscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@SuppressWarnings("unused")
public class LLUtils {

	private static final Logger logger = LogManager.getLogger(LLUtils.class);

	public static final Marker MARKER_ROCKSDB = MarkerManager.getMarker("ROCKSDB");
	public static final Marker MARKER_LUCENE = MarkerManager.getMarker("LUCENE");
	public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
	public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
	private static final AllocatorControl NO_OP_ALLOCATION_CONTROL = (AllocatorControl) BufferAllocator.offHeapUnpooled();
	private static final byte[] RESPONSE_TRUE = new byte[]{1};
	private static final byte[] RESPONSE_FALSE = new byte[]{0};
	private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1};
	private static final byte[] RESPONSE_FALSE_BUF = new byte[]{0};
	public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
	public static final AtomicBoolean hookRegistered = new AtomicBoolean();
	public static final boolean MANUAL_READAHEAD = false;
	public static final boolean ALLOW_STATIC_OPTIONS = false;
	public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION
			= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false"));
	public static final boolean DEBUG_ALL_DROPS
			= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.drops.log", "false"));
	public static final boolean DEBUG_ALL_DISCARDS
			= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.discards.log", "false"));
	private static final Lookup PUBLIC_LOOKUP = MethodHandles.publicLookup();
	private static final MethodHandle IS_ACCESSIBLE_METHOD_HANDLE;

	static {
		for (int i1 = 0; i1 < 256; i1++) {
			var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
			b[0] = (byte) i1;
		}
		var methodType = MethodType.methodType(boolean.class);
		MethodHandle isAccessibleMethodHandle = null;
		try {
			isAccessibleMethodHandle = PUBLIC_LOOKUP.findVirtual(AbstractNativeReference.class, "isAccessible", methodType);
		} catch (NoSuchMethodException e) {
			logger.debug("Failed to find isAccessible(): no such method");
		} catch (IllegalAccessException e) {
			logger.debug("Failed to find isAccessible()", e);
		}
		IS_ACCESSIBLE_METHOD_HANDLE = isAccessibleMethodHandle;
		initHooks();
	}

	public static void initHooks() {
		if (hookRegistered.compareAndSet(false, true)) {
			Hooks.onNextDropped(LLUtils::onNextDropped);
			//todo: add Hooks.onDiscard when it will be implemented
			// Hooks.onDiscard(LLUtils::onDiscard);
		}
	}

	public static boolean responseToBoolean(byte[] response) {
		return response[0] == 1;
	}

	public static boolean responseToBoolean(Send<Buffer> responseToReceive) {
		try (var response = responseToReceive.receive()) {
			assert response.readableBytes() == 1;
			return response.getByte(response.readerOffset()) == 1;
		}
	}

	public static boolean responseToBoolean(Buffer response) {
		try (response) {
			assert response.readableBytes() == 1;
			return response.getByte(response.readerOffset()) == 1;
		}
	}

	public static byte[] booleanToResponse(boolean bool) {
		return bool ? RESPONSE_TRUE : RESPONSE_FALSE;
	}

	public static Buffer booleanToResponseByteBuffer(BufferAllocator alloc, boolean bool) {
		return alloc.allocate(1).writeByte(bool ? (byte) 1 : 0);
	}
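	// Illustrative round trip (sketch added for clarity, not part of the original sources): a boolean
	// response is encoded as a single byte (1 = true, 0 = false) and decoded by reading that byte back.
	//
	//   byte[] encoded = LLUtils.booleanToResponse(true);     // {1}
	//   boolean decoded = LLUtils.responseToBoolean(encoded); // true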
	@Nullable
	public static Sort toSort(@Nullable LLSort sort) {
		if (sort == null) {
			return null;
		}
		if (sort.getType() == LLSortType.LONG) {
			return new Sort(new SortedNumericSortField(sort.getFieldName(), SortField.Type.LONG, sort.isReverse()));
		} else if (sort.getType() == LLSortType.RANDOM) {
			return new Sort(new RandomSortField());
		} else if (sort.getType() == LLSortType.SCORE) {
			return new Sort(SortField.FIELD_SCORE);
		} else if (sort.getType() == LLSortType.DOC) {
			return new Sort(SortField.FIELD_DOC);
		}
		return null;
	}

	public static ScoreMode toScoreMode(LLScoreMode scoreMode) {
		return switch (scoreMode) {
			case COMPLETE -> ScoreMode.COMPLETE;
			case TOP_SCORES -> ScoreMode.TOP_SCORES;
			case COMPLETE_NO_SCORES -> ScoreMode.COMPLETE_NO_SCORES;
			case NO_SCORES -> ScoreMode.TOP_DOCS;
		};
	}

	public static Term toTerm(LLTerm term) {
		var valueRef = new FakeBytesRefBuilder(term);
		return new Term(term.getKey(), valueRef);
	}

	public static Document toDocument(LLUpdateDocument document) {
		return toDocument(document.items());
	}

	public static Document toDocument(List<LLItem> document) {
		Document d = new Document();
		for (LLItem item : document) {
			if (item != null) {
				d.add(LLUtils.toField(item));
			}
		}
		return d;
	}

	public static Field[] toFields(List<LLItem> fields) {
		Field[] d = new Field[fields.size()];
		for (int i = 0; i < fields.size(); i++) {
			d[i] = LLUtils.toField(fields.get(i));
		}
		return d;
	}

	public static Collection<Document> toDocuments(Collection<LLUpdateDocument> document) {
		List<Document> d = new ArrayList<>(document.size());
		for (LLUpdateDocument doc : document) {
			d.add(LLUtils.toDocument(doc));
		}
		return d;
	}

	public static Collection<Document> toDocumentsFromEntries(Collection<Entry<LLTerm, LLUpdateDocument>> documentsList) {
		ArrayList<Document> results = new ArrayList<>(documentsList.size());
		for (Entry<LLTerm, LLUpdateDocument> entry : documentsList) {
			results.add(LLUtils.toDocument(entry.getValue()));
		}
		return results;
	}

	public static Iterable<Term> toTerms(Iterable<LLTerm> terms) {
		List<Term> d = new ArrayList<>();
		for (LLTerm term : terms) {
			d.add(LLUtils.toTerm(term));
		}
		return d;
	}

	private static Field toField(LLItem item) {
		return switch (item.getType()) {
			case IntPoint -> new IntPoint(item.getName(), item.intData());
			case DoublePoint -> new DoublePoint(item.getName(), item.doubleData());
			case IntPointND -> new IntPoint(item.getName(), item.intArrayData());
			case LongPoint -> new LongPoint(item.getName(), item.longData());
			case LongPointND -> new LongPoint(item.getName(), item.longArrayData());
			case FloatPointND -> new FloatPoint(item.getName(), item.floatArrayData());
			case DoublePointND -> new DoublePoint(item.getName(), item.doubleArrayData());
			case LongStoredField -> new StoredField(item.getName(), item.longData());
			case BytesStoredField -> new StoredField(item.getName(), (BytesRef) item.getData());
			case FloatPoint -> new FloatPoint(item.getName(), item.floatData());
			case TextField -> new TextField(item.getName(), item.stringValue(), Store.NO);
			case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Store.YES);
			case SortedNumericDocValuesField -> new SortedNumericDocValuesField(item.getName(), item.longData());
			case NumericDocValuesField -> new NumericDocValuesField(item.getName(), item.longData());
			case StringField -> {
				if (item.getData() instanceof BytesRef bytesRef) {
					yield new StringField(item.getName(), bytesRef, Store.NO);
				} else {
					yield new StringField(item.getName(), item.stringValue(), Store.NO);
				}
			}
			case StringFieldStored -> {
				if (item.getData() instanceof BytesRef bytesRef) {
					yield new StringField(item.getName(), bytesRef, Store.YES);
				} else {
					yield new StringField(item.getName(), item.stringValue(), Store.YES);
				}
			}
		};
	}
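	// Note (added for clarity, not part of the original sources): the two helpers below decode
	// fixed-width big-endian values packed back to back in a byte array, using Guava's
	// Ints.fromBytes / Longs.fromBytes. For example:
	//
	//   int[] decoded = getIntArray(new byte[]{0, 0, 0, 1, 0, 0, 0, 2}); // {1, 2}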
	private static int[] getIntArray(byte[] data) {
		var count = data.length / Integer.BYTES;
		var items = new int[count];
		for (int i = 0; i < items.length; i++) {
			items[i] = Ints.fromBytes(data[i * Integer.BYTES],
					data[i * Integer.BYTES + 1],
					data[i * Integer.BYTES + 2],
					data[i * Integer.BYTES + 3]
			);
		}
		return items;
	}

	private static long[] getLongArray(byte[] data) {
		var count = data.length / Long.BYTES;
		var items = new long[count];
		for (int i = 0; i < items.length; i++) {
			items[i] = Longs.fromBytes(data[i * Long.BYTES],
					data[i * Long.BYTES + 1],
					data[i * Long.BYTES + 2],
					data[i * Long.BYTES + 3],
					data[i * Long.BYTES + 4],
					data[i * Long.BYTES + 5],
					data[i * Long.BYTES + 6],
					data[i * Long.BYTES + 7]
			);
		}
		return items;
	}

	public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) {
		return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.shardId(), hit.score(), hit.key());
	}

	public static String toStringSafe(@Nullable Buffer key) {
		try {
			if (key == null || key.isAccessible()) {
				return toString(key);
			} else {
				return "(released)";
			}
		} catch (IllegalReferenceCountException ex) {
			return "(released)";
		}
	}

	public static String toStringSafe(byte @Nullable[] key) {
		try {
			// A byte array has no "released" state, so it can always be printed
			return toString(key);
		} catch (IllegalReferenceCountException ex) {
			return "(released)";
		}
	}

	public static String toStringSafe(@Nullable LLRange range) {
		try {
			if (range == null || !range.isClosed()) {
				return toString(range);
			} else {
				return "(released)";
			}
		} catch (IllegalReferenceCountException ex) {
			return "(released)";
		}
	}

	public static String toString(@Nullable LLRange range) {
		if (range == null) {
			return "null";
		} else if (range.isAll()) {
			return "ξ";
		} else if (range.hasMin() && range.hasMax()) {
			return "[" + toStringSafe(range.getMinUnsafe()) + "," + toStringSafe(range.getMaxUnsafe()) + ")";
		} else if (range.hasMin()) {
			return "[" + toStringSafe(range.getMinUnsafe()) + ",*)";
		} else if (range.hasMax()) {
			return "[*," + toStringSafe(range.getMaxUnsafe()) + ")";
		} else {
			return "∅";
		}
	}

	public static String toString(@Nullable Buffer key) {
		if (key == null) {
			return "null";
		} else {
			int startIndex = key.readerOffset();
			int iMax = key.readableBytes() - 1;
			int iLimit = 128;
			if (iMax <= -1) {
				return "[]";
			} else {
				StringBuilder arraySB = new StringBuilder();
				StringBuilder asciiSB = new StringBuilder();
				boolean isAscii = true;
				arraySB.append('[');
				int i = 0;
				while (true) {
					var byteVal = key.getUnsignedByte(startIndex + i);
					arraySB.append(byteVal);
					if (isAscii) {
						if (byteVal >= 32 && byteVal < 127) {
							asciiSB.append((char) byteVal);
						} else if (byteVal == 0) {
							asciiSB.append('␀');
						} else {
							isAscii = false;
							asciiSB = null;
						}
					}
					if (i == iLimit) {
						arraySB.append("…");
					}
					if (i == iMax || i == iLimit) {
						if (isAscii) {
							return asciiSB.insert(0, "\"").append("\"").toString();
						} else {
							return arraySB.append(']').toString();
						}
					}
					arraySB.append(", ");
					++i;
				}
			}
		}
	}

	public static String toString(byte @Nullable[] key) {
		if (key == null) {
			return "null";
		} else {
			int startIndex = 0;
			int iMax = key.length - 1;
			int iLimit = 128;
			if (iMax <= -1) {
				return "[]";
			} else {
				StringBuilder arraySB = new StringBuilder();
				StringBuilder asciiSB = new StringBuilder();
				boolean isAscii = true;
				arraySB.append('[');
				int i = 0;
				while (true) {
					var byteVal = (int) key[startIndex + i];
					arraySB.append(byteVal);
					if (isAscii) {
						if (byteVal >= 32 && byteVal < 127) {
							asciiSB.append((char) byteVal);
						} else if (byteVal == 0) {
							asciiSB.append('␀');
						} else {
							isAscii = false;
							asciiSB = null;
						}
					}
					if (i == iLimit) {
						arraySB.append("…");
					}
					if (i == iMax || i == iLimit) {
						if (isAscii) {
							return asciiSB.insert(0, "\"").append("\"").toString();
						} else {
							return arraySB.append(']').toString();
						}
					}
					arraySB.append(", ");
					++i;
				}
			}
		}
	}
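	// Illustrative behaviour of the printers above (sketch, not part of the original sources):
	// keys made only of printable ASCII are rendered as quoted strings, everything else as a numeric
	// array, truncated with "…" after 128 bytes.
	//
	//   LLUtils.toString(new byte[]{104, 105}); // "hi"
	//   LLUtils.toString(new byte[]{1, 2, 3});  // [1, 2, 3]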
	public static boolean equals(Buffer a, Buffer b) {
		if (a == null && b == null) {
			return true;
		} else if (a != null && b != null) {
			var aCur = a.openCursor();
			var bCur = b.openCursor();
			if (aCur.bytesLeft() != bCur.bytesLeft()) {
				return false;
			}
			while (aCur.readByte() && bCur.readByte()) {
				if (aCur.getByte() != bCur.getByte()) {
					return false;
				}
			}
			return true;
		} else {
			return false;
		}
	}

	/**
	 * Returns {@code true} if and only if the two specified buffers are identical to each other for {@code length}
	 * bytes starting at {@code aStartIndex} index for the {@code a} buffer and {@code bStartIndex} index for the
	 * {@code b} buffer. A more compact way to express this is:
	 * <p>
	 * {@code a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length]}
	 */
	public static boolean equals(Buffer a, int aStartIndex, Buffer b, int bStartIndex, int length) {
		var aCur = a.openCursor(aStartIndex, length);
		var bCur = b.openCursor(bStartIndex, length);
		if (aCur.bytesLeft() != bCur.bytesLeft()) {
			return false;
		}
		while (aCur.readByte() && bCur.readByte()) {
			if (aCur.getByte() != bCur.getByte()) {
				return false;
			}
		}
		return true;
	}

	public static byte[] toArray(@Nullable Buffer key) {
		if (key == null) {
			return EMPTY_BYTE_ARRAY;
		}
		byte[] array = new byte[key.readableBytes()];
		key.copyInto(key.readerOffset(), array, 0, key.readableBytes());
		return array;
	}

	public static List<byte[]> toArray(List<Buffer> input) {
		List<byte[]> result = new ArrayList<>(input.size());
		for (Buffer byteBuf : input) {
			result.add(toArray(byteBuf));
		}
		return result;
	}

	public static int hashCode(Buffer buf) {
		if (buf == null) {
			return 0;
		}
		int result = 1;
		var cur = buf.openCursor();
		while (cur.readByte()) {
			var element = cur.getByte();
			result = 31 * result + element;
		}
		return result;
	}

	/**
	 * @return null if size is equal to RocksDB.NOT_FOUND
	 */
	@Nullable
	public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
		if (alloc.getAllocationType() != OFF_HEAP) {
			throw new UnsupportedOperationException("Allocator type is not direct: " + alloc);
		}
		var directBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
		try {
			assert directBuffer.readerOffset() == 0;
			assert directBuffer.writerOffset() == 0;
			var directBufferWriter = ((BufferComponent) directBuffer).writableBuffer();
			assert directBufferWriter.position() == 0;
			assert directBufferWriter.capacity() >= directBuffer.capacity();
			assert directBufferWriter.isDirect();
			int trueSize = reader.applyAsInt(directBufferWriter);
			if (trueSize == RocksDB.NOT_FOUND) {
				directBuffer.close();
				return null;
			}
			int readSize = directBufferWriter.limit();
			if (trueSize < readSize) {
				throw new IllegalStateException();
			} else if (trueSize == readSize) {
				return directBuffer.writerOffset(directBufferWriter.limit());
			} else {
				assert directBuffer.readerOffset() == 0;
				directBuffer.ensureWritable(trueSize);
				assert directBuffer.writerOffset() == 0;
				directBufferWriter = ((BufferComponent) directBuffer).writableBuffer();
				assert directBufferWriter.position() == 0;
				assert directBufferWriter.isDirect();
				reader.applyAsInt(directBufferWriter.position(0));
				return directBuffer.writerOffset(trueSize);
			}
		} catch (Throwable t) {
			directBuffer.close();
			throw t;
		}
	}

	public static void ensureBlocking() {
		if (Schedulers.isInNonBlockingThread()) {
			throw new UnsupportedOperationException("Called collect in a nonblocking thread");
		}
	}

	// todo: remove this ugly method
	/**
	 * cleanup resource
	 * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
	 */
	public static <U, T extends Resource<T>> Mono<U> usingSendResource(Mono<Send<T>> resourceSupplier,
			Function<T, Mono<U>> resourceClosure,
			boolean cleanupOnSuccess) {
		return Mono.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> {
			if (cleanupOnSuccess) {
				return Mono.fromRunnable(() -> r.close());
			} else {
				return Mono.empty();
			}
		}, (r, ex) -> Mono.fromRunnable(() -> {
			if (r.isAccessible()) {
				r.close();
			}
		}), r -> Mono.fromRunnable(() -> {
			if (r.isAccessible()) {
				r.close();
			}
		}));
	}

	public static boolean isSet(ScoreDoc[] scoreDocs) {
		for (ScoreDoc scoreDoc : scoreDocs) {
			if (scoreDoc == null) {
				return false;
			}
		}
		return true;
	}

	public static Send<Buffer> empty(BufferAllocator allocator) {
		try {
			return allocator.allocate(0).send();
		} catch (Exception ex) {
			try (var empty = CompositeBuffer.compose(allocator)) {
				assert empty.readableBytes() == 0;
				assert empty.capacity() == 0;
				return empty.send();
			}
		}
	}
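	// Illustrative use of readNullableDirectNioBuffer (sketch with hypothetical variables db, cfh,
	// readOptions and keyNioBuffer, not part of the original sources). The reader callback receives a
	// direct ByteBuffer and must return the value size reported by RocksDB, or RocksDB.NOT_FOUND when
	// the key is absent; a size larger than the buffer triggers a second read into a bigger buffer:
	//
	//   Buffer value = LLUtils.readNullableDirectNioBuffer(alloc, valueBuffer -> {
	//       try {
	//           return db.get(cfh, readOptions, keyNioBuffer, valueBuffer);
	//       } catch (RocksDBException e) {
	//           throw new RuntimeException(e);
	//       }
	//   });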
	public static Send<Buffer> copy(BufferAllocator allocator, Buffer buf) {
		if (CompositeBuffer.isComposite(buf) && buf.capacity() == 0) {
			return empty(allocator);
		} else {
			return buf.copy().send();
		}
	}

	public static boolean isBoundedRange(LLRange rangeShared) {
		return rangeShared.hasMin() && rangeShared.hasMax();
	}

	/**
	 * Generate a ReadOptions, with some parameters modified to help with bulk iterations
	 * @param readOptions the read options to start with, it will be modified
	 * @param canFillCache true to fill the cache. If closedRange is false, this field will be ignored
	 * @param boundedRange true if the range is bounded from both sides
	 * @param smallRange true if the range is small
	 * @return the passed instance of ReadOptions, or a new one if the passed readOptions is null
	 */
	public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions,
			boolean canFillCache,
			boolean boundedRange,
			boolean smallRange) {
		if (readOptions == null) {
			//noinspection resource
			readOptions = new ReadOptions();
		}
		if (boundedRange || smallRange) {
			readOptions.setFillCache(canFillCache);
		} else {
			if (readOptions.readaheadSize() <= 0) {
				readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB
			}
			readOptions.setFillCache(false);
			readOptions.setVerifyChecksums(false);
		}
		if (FORCE_DISABLE_CHECKSUM_VERIFICATION) {
			readOptions.setVerifyChecksums(false);
		}
		return readOptions;
	}

	public static Mono<Void> finalizeResource(Resource<?> resource) {
		Mono<Void> runnable = Mono.fromRunnable(() -> LLUtils.finalizeResourceNow(resource));
		if (resource instanceof LuceneCloseable) {
			return runnable.transform(LuceneUtils::scheduleLucene);
		} else {
			return runnable;
		}
	}

	public static Mono<Void> finalizeResource(SafeCloseable resource) {
		Mono<Void> runnable = Mono.fromRunnable(resource::close);
		if (resource instanceof LuceneCloseable) {
			return runnable.transform(LuceneUtils::scheduleLucene);
		} else {
			return runnable;
		}
	}

	public static void finalizeResourceNow(Resource<?> resource) {
		if (resource.isAccessible()) {
			resource.close();
		}
	}

	public static void finalizeResourceNow(SafeCloseable resource) {
		resource.close();
	}

	public static <T> Flux<T> handleDiscard(Flux<T> flux) {
		return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
	}

	public static <T> Mono<T> handleDiscard(Mono<T> flux) {
		return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
	}

	/**
	 * Obtain the resource, then run the closure.
	 * If the closure publisher returns a single element, then the resource is kept open,
	 * otherwise it is closed.
	 */
	public static <U, T extends AutoCloseable> Mono<U> singleOrClose(Mono<T> resourceMono,
			Function<T, Mono<U>> closure) {
		return Mono.usingWhen(resourceMono, resource -> {
			if (resource instanceof LuceneCloseable) {
				return closure.apply(resource).publishOn(luceneScheduler()).doOnSuccess(s -> {
					if (s == null) {
						try {
							resource.close();
						} catch (Exception e) {
							throw new RuntimeException(e);
						}
					}
				}).publishOn(Schedulers.parallel());
			} else {
				return closure.apply(resource).doOnSuccess(s -> {
					if (s == null) {
						try {
							resource.close();
						} catch (Exception e) {
							throw new RuntimeException(e);
						}
					}
				});
			}
		}, resource -> Mono.empty(), (resource, ex) -> Mono.fromCallable(() -> {
			resource.close();
			return null;
		}), r -> (r instanceof SafeCloseable s) ? LLUtils.finalizeResource(s) : Mono.fromCallable(() -> {
			r.close();
			return null;
		}));
	}
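	// Illustrative use of generateCustomReadOptions (sketch with hypothetical variables db and cfh,
	// not part of the original sources): for a large unbounded scan it disables cache filling and
	// checksum verification and enables readahead, matching the intent documented above.
	//
	//   ReadOptions scanOptions = LLUtils.generateCustomReadOptions(null, false, false, false);
	//   try (var it = db.newIterator(cfh, scanOptions)) {
	//       for (it.seekToFirst(); it.isValid(); it.next()) { ... }
	//   }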
	public static Disposable scheduleRepeated(Scheduler scheduler, Runnable action, Duration delay) {
		var currentDisposable = new AtomicReference<Disposable>();
		var disposed = new AtomicBoolean(false);
		scheduleRepeatedInternal(scheduler, action, delay, currentDisposable, disposed);
		return () -> {
			disposed.set(true);
			currentDisposable.get().dispose();
		};
	}

	private static void scheduleRepeatedInternal(Scheduler scheduler,
			Runnable action,
			Duration delay,
			AtomicReference<Disposable> currentDisposable,
			AtomicBoolean disposed) {
		if (disposed.get()) return;
		currentDisposable.set(scheduler.schedule(() -> {
			if (disposed.get()) return;
			try {
				action.run();
			} catch (Throwable ex) {
				logger.error(ex);
			}
			scheduleRepeatedInternal(scheduler, action, delay, currentDisposable, disposed);
		}, delay.toMillis(), TimeUnit.MILLISECONDS));
	}

	public static boolean isAccessible(AbstractNativeReference abstractNativeReference) {
		if (IS_ACCESSIBLE_METHOD_HANDLE != null) {
			try {
				return (boolean) IS_ACCESSIBLE_METHOD_HANDLE.invoke(abstractNativeReference);
			} catch (Throwable e) {
				throw new RuntimeException(e);
			}
		}
		return true;
	}

	@Deprecated
	public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}

	@NotNull
	public static ByteBuffer newDirect(int size) {
		return ByteBuffer.allocateDirect(size);
	}

	private static Drop<Buffer> drop() {
		// We cannot reliably drop unsafe memory. We have to rely on the cleaner to do that.
		return NO_OP_DROP;
	}

	public static boolean isReadOnlyDirect(Buffer inputBuffer) {
		return inputBuffer instanceof BufferComponent component && component.readableNativeAddress() != 0;
	}

	public static ByteBuffer getReadOnlyDirect(Buffer inputBuffer) {
		assert isReadOnlyDirect(inputBuffer);
		return ((BufferComponent) inputBuffer).readableBuffer();
	}

	public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {
		Buffer result = alloc.allocate(array.length);
		result.writeBytes(array);
		return result;
	}

	@NotNull
	public static Buffer readDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
		var nullable = readNullableDirectNioBuffer(alloc, reader);
		if (nullable == null) {
			throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element");
		}
		return nullable;
	}

	public static Buffer compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer) {
		return buffer.receive();
	}

	@NotNull
	public static Buffer compositeBuffer(BufferAllocator alloc, @NotNull Send<Buffer> buffer1, @NotNull Send<Buffer> buffer2) {
		var b1 = buffer1.receive();
		try (var b2 = buffer2.receive()) {
			if (b1.writerOffset() < b1.capacity() || b2.writerOffset() < b2.capacity()) {
				b1.ensureWritable(b2.readableBytes(), b2.readableBytes(), true);
				b2.copyInto(b2.readerOffset(), b1, b1.writerOffset(), b2.readableBytes());
				b1.writerOffset(b1.writerOffset() + b2.readableBytes());
				return b1;
			} else {
				return alloc.compose(List.of(b1.send(), b2.send()));
			}
		}
	}

	@NotNull
	public static Buffer compositeBuffer(BufferAllocator alloc,
			@NotNull Send<Buffer> buffer1,
			@NotNull Send<Buffer> buffer2,
			@NotNull Send<Buffer> buffer3) {
		var b1 = buffer1.receive();
		try (var b2 = buffer2.receive()) {
			try (var b3 = buffer3.receive()) {
				if (b1.writerOffset() < b1.capacity() || b2.writerOffset() < b2.capacity() || b3.writerOffset() < b3.capacity()) {
					b1.ensureWritable(b2.readableBytes(), b2.readableBytes(), true);
					b2.copyInto(b2.readerOffset(), b1, b1.writerOffset(), b2.readableBytes());
					b1.writerOffset(b1.writerOffset() + b2.readableBytes());
					b1.ensureWritable(b3.readableBytes(), b3.readableBytes(), true);
					b3.copyInto(b3.readerOffset(), b1, b1.writerOffset(), b3.readableBytes());
					b1.writerOffset(b1.writerOffset() + b3.readableBytes());
					return b1;
				} else {
					return alloc.compose(List.of(b1.send(), b2.send(), b3.send()));
				}
			}
		}
	}
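	// Illustrative use of scheduleRepeated (sketch with a hypothetical flushCaches() task, not part
	// of the original sources): the action re-schedules itself after each run until the returned
	// Disposable is disposed.
	//
	//   Disposable flushTask = LLUtils.scheduleRepeated(Schedulers.boundedElastic(),
	//       () -> flushCaches(), Duration.ofSeconds(30));
	//   // ... later, on shutdown:
	//   flushTask.dispose();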
	public static <T> Mono<T> resolveDelta(Mono<Delta<T>> prev, UpdateReturnMode updateReturnMode) {
		return prev.handle((delta, sink) -> {
			switch (updateReturnMode) {
				case GET_NEW_VALUE -> {
					var current = delta.current();
					if (current != null) {
						sink.next(current);
					} else {
						sink.complete();
					}
				}
				case GET_OLD_VALUE -> {
					var previous = delta.previous();
					if (previous != null) {
						sink.next(previous);
					} else {
						sink.complete();
					}
				}
				case NOTHING -> sink.complete();
				default -> sink.error(new IllegalStateException());
			}
		});
	}

	public static Mono<Buffer> resolveLLDelta(Mono<LLDelta> prev, UpdateReturnMode updateReturnMode) {
		return prev.mapNotNull(delta -> {
			final Buffer previous = delta.previousUnsafe();
			final Buffer current = delta.currentUnsafe();
			return switch (updateReturnMode) {
				case GET_NEW_VALUE -> {
					if (previous != null && previous.isAccessible()) {
						previous.close();
					}
					yield current;
				}
				case GET_OLD_VALUE -> {
					if (current != null && current.isAccessible()) {
						current.close();
					}
					yield previous;
				}
				case NOTHING -> {
					if (previous != null && previous.isAccessible()) {
						previous.close();
					}
					if (current != null && current.isAccessible()) {
						current.close();
					}
					yield null;
				}
			};
		});
	}

	public static <T, U> Mono<Delta<U>> mapDelta(Mono<Delta<T>> mono,
			SerializationFunction<@NotNull T, @Nullable U> mapper) {
		return mono.handle((delta, sink) -> {
			try {
				T prev = delta.previous();
				T curr = delta.current();
				U newPrev;
				U newCurr;
				if (prev != null) {
					newPrev = mapper.apply(prev);
				} else {
					newPrev = null;
				}
				if (curr != null) {
					newCurr = mapper.apply(curr);
				} else {
					newCurr = null;
				}
				sink.next(new Delta<>(newPrev, newCurr));
			} catch (SerializationException ex) {
				sink.error(ex);
			}
		});
	}

	public static <U> Mono<Delta<U>> mapLLDelta(Mono<LLDelta> mono,
			SerializationFunction<@NotNull Buffer, @Nullable U> mapper) {
		return Mono.usingWhen(mono, delta -> Mono.fromCallable(() -> {
			Buffer prev = delta.previousUnsafe();
			Buffer curr = delta.currentUnsafe();
			U newPrev;
			U newCurr;
			if (prev != null) {
				newPrev = mapper.apply(prev);
			} else {
				newPrev = null;
			}
			if (curr != null) {
				newCurr = mapper.apply(curr);
			} else {
				newCurr = null;
			}
			return new Delta<>(newPrev, newCurr);
		}), LLUtils::finalizeResource);
	}

	public static boolean isDeltaChanged(Delta<?> delta) {
		return !Objects.equals(delta.previous(), delta.current());
	}

	public static boolean isDirect(Buffer key) {
		var readableComponents = key.countReadableComponents();
		if (readableComponents == 0) {
			return true;
		} else if (readableComponents == 1) {
			return key.isDirect();
		} else {
			return false;
		}
	}

	public static String deserializeString(Send<Buffer> bufferSend, int readerOffset, int length, Charset charset) {
		try (var buffer = bufferSend.receive()) {
			byte[] bytes = new byte[Math.min(length, buffer.readableBytes())];
			buffer.copyInto(readerOffset, bytes, 0, length);
			return new String(bytes, charset);
		}
	}

	public static String deserializeString(@NotNull Buffer buffer, int readerOffset, int length, Charset charset) {
		byte[] bytes = new byte[Math.min(length, buffer.readableBytes())];
		buffer.copyInto(readerOffset, bytes, 0, length);
		return new String(bytes, charset);
	}

	public static int utf8MaxBytes(String deserialized) {
		return deserialized.length() * 3;
	}

	private static void onNextDropped(Object next) {
		if (DEBUG_ALL_DROPS) {
			logger.trace("Dropped: {}", () -> next.getClass().getName());
		}
		closeResource(next, false);
	}

	public static void onDiscard(Object next) {
		if (DEBUG_ALL_DISCARDS) {
			logger.trace("Discarded: {}", () -> next.getClass().getName());
		}
		closeResource(next, false);
	}
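	// Illustrative use of resolveDelta (sketch with a hypothetical dict.update(...) call returning a
	// Mono<Delta<String>>, not part of the original sources): only the requested side of the delta is
	// emitted, and the Mono completes empty when that side is null.
	//
	//   Mono<String> newValue = LLUtils
	//       .resolveDelta(dict.update(key, updater), UpdateReturnMode.GET_NEW_VALUE);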
	public static void closeResource(Object next) {
		closeResource(next, true);
	}

	private static void closeResource(Object next, boolean manual) {
		if (next instanceof Send<?> send) {
			send.close();
		}
		if (next instanceof SafeCloseable closeable) {
			if (manual || closeable instanceof DiscardingCloseable) {
				if (!manual && !LuceneUtils.isLuceneThread() && closeable instanceof LuceneCloseable luceneCloseable) {
					luceneScheduler().schedule(() -> luceneCloseable.close());
				} else {
					closeable.close();
				}
			}
		} else if (next instanceof Resource<?> resource && resource.isAccessible()) {
			resource.close();
		} else if (next instanceof List<?> iterable) {
			iterable.forEach(obj -> closeResource(obj, manual));
		} else if (next instanceof Set<?> iterable) {
			iterable.forEach(obj -> closeResource(obj, manual));
		} else if (next instanceof AbstractImmutableNativeReference rocksObj) {
			if (rocksObj.isOwningHandle()) {
				rocksObj.close();
			}
		} else if (next instanceof Optional<?> optional) {
			optional.ifPresent(obj -> closeResource(obj, manual));
		} else if (next instanceof Map.Entry<?, ?> entry) {
			var key = entry.getKey();
			if (key != null) {
				closeResource(key, manual);
			}
			var value = entry.getValue();
			if (value != null) {
				closeResource(value, manual);
			}
		} else if (next instanceof Delta<?> delta) {
			var previous = delta.previous();
			if (previous != null) {
				closeResource(previous, manual);
			}
			var current = delta.current();
			if (current != null) {
				closeResource(current, manual);
			}
		} else if (next instanceof Map<?, ?> map) {
			map.forEach((key, value) -> {
				if (key != null) {
					closeResource(key, manual);
				}
				if (value != null) {
					closeResource(value, manual);
				}
			});
		}
	}

	private static class FakeBytesRefBuilder extends BytesRefBuilder {

		private final LLTerm term;

		public FakeBytesRefBuilder(LLTerm term) {
			this.term = term;
		}

		@Override
		public BytesRef toBytesRef() {
			return term.getValueBytesRef();
		}
	}
}