2020-12-07 22:15:18 +01:00
|
|
|
package it.cavallium.dbengine.database;
|
|
|
|
|
2022-10-02 03:09:50 +02:00
|
|
|
import static io.netty5.buffer.StandardAllocationTypes.OFF_HEAP;
|
|
|
|
import static io.netty5.buffer.internal.InternalBufferUtils.NO_OP_DROP;
|
2022-07-23 02:42:48 +02:00
|
|
|
import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler;
|
2021-09-23 02:15:58 +02:00
|
|
|
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
import com.google.common.primitives.Ints;
|
|
|
|
import com.google.common.primitives.Longs;
|
2022-10-02 03:09:50 +02:00
|
|
|
import io.netty5.buffer.AllocatorControl;
|
|
|
|
import io.netty5.buffer.Buffer;
|
|
|
|
import io.netty5.buffer.BufferAllocator;
|
|
|
|
import io.netty5.buffer.BufferComponent;
|
|
|
|
import io.netty5.buffer.CompositeBuffer;
|
|
|
|
import io.netty5.buffer.Drop;
|
2022-07-15 02:44:50 +02:00
|
|
|
import io.netty5.util.Resource;
|
|
|
|
import io.netty5.util.Send;
|
2022-03-16 13:47:56 +01:00
|
|
|
import io.netty5.util.IllegalReferenceCountException;
|
2021-08-22 21:23:22 +02:00
|
|
|
import it.cavallium.dbengine.database.serialization.SerializationException;
|
|
|
|
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
2022-07-23 02:42:48 +02:00
|
|
|
import it.cavallium.dbengine.lucene.LuceneCloseable;
|
2022-07-23 14:25:59 +02:00
|
|
|
import it.cavallium.dbengine.lucene.LuceneUtils;
|
2021-01-30 22:14:48 +01:00
|
|
|
import it.cavallium.dbengine.lucene.RandomSortField;
|
2022-07-29 00:32:08 +02:00
|
|
|
import java.lang.invoke.MethodHandle;
|
|
|
|
import java.lang.invoke.MethodHandles;
|
|
|
|
import java.lang.invoke.MethodHandles.Lookup;
|
|
|
|
import java.lang.invoke.MethodType;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.nio.ByteBuffer;
|
2021-08-29 23:18:03 +02:00
|
|
|
import java.nio.charset.Charset;
|
2022-07-28 23:41:10 +02:00
|
|
|
import java.time.Duration;
|
2021-04-30 19:15:04 +02:00
|
|
|
import java.util.ArrayList;
|
2021-05-28 16:04:59 +02:00
|
|
|
import java.util.Collection;
|
2020-12-07 22:15:18 +01:00
|
|
|
import java.util.List;
|
2021-08-22 21:23:22 +02:00
|
|
|
import java.util.Map;
|
2021-05-28 16:04:59 +02:00
|
|
|
import java.util.Map.Entry;
|
2021-05-08 03:09:00 +02:00
|
|
|
import java.util.Objects;
|
2021-08-28 22:42:51 +02:00
|
|
|
import java.util.Optional;
|
2022-10-24 01:17:08 +02:00
|
|
|
import java.util.Set;
|
2022-07-28 23:41:10 +02:00
|
|
|
import java.util.concurrent.TimeUnit;
|
2022-01-26 14:22:54 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
2022-07-28 23:44:23 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
2021-09-19 12:01:11 +02:00
|
|
|
import java.util.function.Function;
|
2021-04-30 19:15:04 +02:00
|
|
|
import java.util.function.ToIntFunction;
|
2021-12-17 01:48:49 +01:00
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
|
import org.apache.logging.log4j.Marker;
|
|
|
|
import org.apache.logging.log4j.MarkerManager;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.document.Document;
|
2022-02-22 02:10:36 +01:00
|
|
|
import org.apache.lucene.document.DoublePoint;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.document.Field;
|
2022-02-22 02:10:36 +01:00
|
|
|
import org.apache.lucene.document.Field.Store;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.document.FloatPoint;
|
|
|
|
import org.apache.lucene.document.IntPoint;
|
|
|
|
import org.apache.lucene.document.LongPoint;
|
2021-11-20 16:09:00 +01:00
|
|
|
import org.apache.lucene.document.NumericDocValuesField;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.document.SortedNumericDocValuesField;
|
2021-11-19 19:03:31 +01:00
|
|
|
import org.apache.lucene.document.StoredField;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.document.StringField;
|
|
|
|
import org.apache.lucene.document.TextField;
|
|
|
|
import org.apache.lucene.index.Term;
|
2021-09-22 11:03:39 +02:00
|
|
|
import org.apache.lucene.search.ScoreDoc;
|
2021-01-29 17:19:01 +01:00
|
|
|
import org.apache.lucene.search.ScoreMode;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.apache.lucene.search.Sort;
|
|
|
|
import org.apache.lucene.search.SortField;
|
|
|
|
import org.apache.lucene.search.SortedNumericSortField;
|
2022-02-09 20:01:26 +01:00
|
|
|
import org.apache.lucene.util.BytesRef;
|
|
|
|
import org.apache.lucene.util.BytesRefBuilder;
|
2021-04-30 19:15:04 +02:00
|
|
|
import org.jetbrains.annotations.NotNull;
|
2020-12-07 22:15:18 +01:00
|
|
|
import org.jetbrains.annotations.Nullable;
|
2022-05-20 10:20:00 +02:00
|
|
|
import org.rocksdb.AbstractImmutableNativeReference;
|
2022-07-29 00:32:08 +02:00
|
|
|
import org.rocksdb.AbstractNativeReference;
|
|
|
|
import org.rocksdb.ColumnFamilyHandle;
|
2022-03-19 16:36:59 +01:00
|
|
|
import org.rocksdb.ReadOptions;
|
2021-04-30 19:15:04 +02:00
|
|
|
import org.rocksdb.RocksDB;
|
2022-07-28 23:41:10 +02:00
|
|
|
import reactor.core.Disposable;
|
2022-10-18 18:01:45 +02:00
|
|
|
import reactor.core.Fuseable.QueueSubscription;
|
2021-08-22 21:23:22 +02:00
|
|
|
import reactor.core.publisher.Flux;
|
2022-01-26 14:22:54 +01:00
|
|
|
import reactor.core.publisher.Hooks;
|
2021-05-08 03:09:00 +02:00
|
|
|
import reactor.core.publisher.Mono;
|
2022-07-28 23:41:10 +02:00
|
|
|
import reactor.core.scheduler.Scheduler;
|
2021-09-18 18:34:21 +02:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
2021-04-30 19:15:04 +02:00
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
@SuppressWarnings("unused")
|
2020-12-07 22:15:18 +01:00
|
|
|
public class LLUtils {
|
|
|
|
|
2021-12-17 01:48:49 +01:00
|
|
|
private static final Logger logger = LogManager.getLogger(LLUtils.class);
|
|
|
|
public static final Marker MARKER_ROCKSDB = MarkerManager.getMarker("ROCKSDB");
|
|
|
|
public static final Marker MARKER_LUCENE = MarkerManager.getMarker("LUCENE");
|
2021-08-28 22:42:51 +02:00
|
|
|
|
2021-12-12 02:17:36 +01:00
|
|
|
public static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
|
|
|
|
public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
|
|
|
|
private static final AllocatorControl NO_OP_ALLOCATION_CONTROL = (AllocatorControl) BufferAllocator.offHeapUnpooled();
|
2020-12-07 22:15:18 +01:00
|
|
|
private static final byte[] RESPONSE_TRUE = new byte[]{1};
|
|
|
|
private static final byte[] RESPONSE_FALSE = new byte[]{0};
|
2021-04-30 19:15:04 +02:00
|
|
|
private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1};
|
|
|
|
private static final byte[] RESPONSE_FALSE_BUF = new byte[]{0};
|
2021-03-18 19:53:32 +01:00
|
|
|
public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
|
2022-01-26 14:22:54 +01:00
|
|
|
public static final AtomicBoolean hookRegistered = new AtomicBoolean();
|
2022-03-20 14:33:27 +01:00
|
|
|
public static final boolean MANUAL_READAHEAD = false;
|
2022-05-12 19:14:27 +02:00
|
|
|
public static final boolean ALLOW_STATIC_OPTIONS = false;
|
2021-03-18 19:53:32 +01:00
|
|
|
|
2022-04-15 16:49:01 +02:00
|
|
|
public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION
|
|
|
|
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false"));
|
|
|
|
|
2022-05-21 22:41:48 +02:00
|
|
|
public static final boolean DEBUG_ALL_DROPS
|
|
|
|
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.drops.log", "false"));
|
|
|
|
public static final boolean DEBUG_ALL_DISCARDS
|
|
|
|
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.discards.log", "false"));
|
|
|
|
|
2022-07-29 00:32:08 +02:00
|
|
|
private static final Lookup PUBLIC_LOOKUP = MethodHandles.publicLookup();
|
|
|
|
|
|
|
|
private static final MethodHandle IS_ACCESSIBLE_METHOD_HANDLE;
|
|
|
|
|
2021-03-18 19:53:32 +01:00
|
|
|
static {
|
|
|
|
for (int i1 = 0; i1 < 256; i1++) {
|
|
|
|
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
|
|
|
|
b[0] = (byte) i1;
|
|
|
|
}
|
2022-07-29 00:32:08 +02:00
|
|
|
var methodType = MethodType.methodType(boolean.class);
|
|
|
|
MethodHandle isAccessibleMethodHandle = null;
|
|
|
|
try {
|
|
|
|
isAccessibleMethodHandle = PUBLIC_LOOKUP.findVirtual(AbstractNativeReference.class, "isAccessible", methodType);
|
|
|
|
} catch (NoSuchMethodException | IllegalAccessException e) {
|
|
|
|
logger.debug("Failed to find isAccessible()", e);
|
|
|
|
}
|
|
|
|
IS_ACCESSIBLE_METHOD_HANDLE = isAccessibleMethodHandle;
|
2022-01-26 14:22:54 +01:00
|
|
|
initHooks();
|
|
|
|
}
|
|
|
|
|
|
|
|
public static void initHooks() {
|
|
|
|
if (hookRegistered.compareAndSet(false, true)) {
|
|
|
|
Hooks.onNextDropped(LLUtils::onNextDropped);
|
2022-10-24 01:17:08 +02:00
|
|
|
//todo: add Hooks.onDiscard when it will be implemented
|
|
|
|
// Hooks.onDiscard(LLUtils::onDiscard);
|
2022-01-26 14:22:54 +01:00
|
|
|
}
|
2021-03-18 19:53:32 +01:00
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
|
|
|
|
public static boolean responseToBoolean(byte[] response) {
|
|
|
|
return response[0] == 1;
|
|
|
|
}
|
|
|
|
|
2021-08-31 09:14:46 +02:00
|
|
|
public static boolean responseToBoolean(Send<Buffer> responseToReceive) {
|
|
|
|
try (var response = responseToReceive.receive()) {
|
2021-04-30 19:15:04 +02:00
|
|
|
assert response.readableBytes() == 1;
|
2021-08-29 23:18:03 +02:00
|
|
|
return response.getByte(response.readerOffset()) == 1;
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
public static boolean responseToBoolean(Buffer response) {
|
|
|
|
try (response) {
|
|
|
|
assert response.readableBytes() == 1;
|
|
|
|
return response.getByte(response.readerOffset()) == 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
public static byte[] booleanToResponse(boolean bool) {
|
|
|
|
return bool ? RESPONSE_TRUE : RESPONSE_FALSE;
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
public static Buffer booleanToResponseByteBuffer(BufferAllocator alloc, boolean bool) {
|
|
|
|
return alloc.allocate(1).writeByte(bool ? (byte) 1 : 0);
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
@Nullable
|
|
|
|
public static Sort toSort(@Nullable LLSort sort) {
|
|
|
|
if (sort == null) {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
if (sort.getType() == LLSortType.LONG) {
|
|
|
|
return new Sort(new SortedNumericSortField(sort.getFieldName(), SortField.Type.LONG, sort.isReverse()));
|
|
|
|
} else if (sort.getType() == LLSortType.RANDOM) {
|
|
|
|
return new Sort(new RandomSortField());
|
2021-02-04 22:42:57 +01:00
|
|
|
} else if (sort.getType() == LLSortType.SCORE) {
|
|
|
|
return new Sort(SortField.FIELD_SCORE);
|
|
|
|
} else if (sort.getType() == LLSortType.DOC) {
|
|
|
|
return new Sort(SortField.FIELD_DOC);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
|
2021-01-29 17:19:01 +01:00
|
|
|
public static ScoreMode toScoreMode(LLScoreMode scoreMode) {
|
2021-08-22 21:23:22 +02:00
|
|
|
return switch (scoreMode) {
|
|
|
|
case COMPLETE -> ScoreMode.COMPLETE;
|
|
|
|
case TOP_SCORES -> ScoreMode.TOP_SCORES;
|
|
|
|
case COMPLETE_NO_SCORES -> ScoreMode.COMPLETE_NO_SCORES;
|
2021-10-13 00:23:56 +02:00
|
|
|
case NO_SCORES -> ScoreMode.TOP_DOCS;
|
2021-08-22 21:23:22 +02:00
|
|
|
};
|
2021-01-29 17:19:01 +01:00
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
public static Term toTerm(LLTerm term) {
|
2022-02-09 20:22:32 +01:00
|
|
|
var valueRef = new FakeBytesRefBuilder(term);
|
2022-02-09 20:01:26 +01:00
|
|
|
return new Term(term.getKey(), valueRef);
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2021-11-07 17:46:40 +01:00
|
|
|
public static Document toDocument(LLUpdateDocument document) {
|
2021-11-07 18:34:34 +01:00
|
|
|
return toDocument(document.items());
|
|
|
|
}
|
|
|
|
|
2022-03-18 19:16:06 +01:00
|
|
|
public static Document toDocument(List<LLItem> document) {
|
2020-12-07 22:15:18 +01:00
|
|
|
Document d = new Document();
|
2021-11-07 18:34:34 +01:00
|
|
|
for (LLItem item : document) {
|
2021-11-24 16:39:22 +01:00
|
|
|
if (item != null) {
|
|
|
|
d.add(LLUtils.toField(item));
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
return d;
|
|
|
|
}
|
|
|
|
|
2022-03-18 19:16:06 +01:00
|
|
|
public static Field[] toFields(List<LLItem> fields) {
|
|
|
|
Field[] d = new Field[fields.size()];
|
|
|
|
for (int i = 0; i < fields.size(); i++) {
|
|
|
|
d[i] = LLUtils.toField(fields.get(i));
|
2021-11-07 17:46:40 +01:00
|
|
|
}
|
|
|
|
return d;
|
|
|
|
}
|
|
|
|
|
|
|
|
public static Collection<Document> toDocuments(Collection<LLUpdateDocument> document) {
|
2021-05-28 16:04:59 +02:00
|
|
|
List<Document> d = new ArrayList<>(document.size());
|
2021-11-07 17:46:40 +01:00
|
|
|
for (LLUpdateDocument doc : document) {
|
2020-12-07 22:15:18 +01:00
|
|
|
d.add(LLUtils.toDocument(doc));
|
|
|
|
}
|
|
|
|
return d;
|
|
|
|
}
|
|
|
|
|
2021-11-07 17:46:40 +01:00
|
|
|
public static Collection<Document> toDocumentsFromEntries(Collection<Entry<LLTerm, LLUpdateDocument>> documentsList) {
|
2021-05-28 16:04:59 +02:00
|
|
|
ArrayList<Document> results = new ArrayList<>(documentsList.size());
|
2021-11-07 17:46:40 +01:00
|
|
|
for (Entry<LLTerm, LLUpdateDocument> entry : documentsList) {
|
2021-05-28 16:04:59 +02:00
|
|
|
results.add(LLUtils.toDocument(entry.getValue()));
|
|
|
|
}
|
|
|
|
return results;
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
public static Iterable<Term> toTerms(Iterable<LLTerm> terms) {
|
2021-05-28 16:04:59 +02:00
|
|
|
List<Term> d = new ArrayList<>();
|
2020-12-07 22:15:18 +01:00
|
|
|
for (LLTerm term : terms) {
|
|
|
|
d.add(LLUtils.toTerm(term));
|
|
|
|
}
|
|
|
|
return d;
|
|
|
|
}
|
|
|
|
|
2021-11-07 17:46:40 +01:00
|
|
|
private static Field toField(LLItem item) {
|
2021-08-22 21:23:22 +02:00
|
|
|
return switch (item.getType()) {
|
2022-02-22 02:10:36 +01:00
|
|
|
case IntPoint -> new IntPoint(item.getName(), item.intData());
|
|
|
|
case DoublePoint -> new DoublePoint(item.getName(), item.doubleData());
|
|
|
|
case IntPointND -> new IntPoint(item.getName(), item.intArrayData());
|
|
|
|
case LongPoint -> new LongPoint(item.getName(), item.longData());
|
|
|
|
case LongPointND -> new LongPoint(item.getName(), item.longArrayData());
|
|
|
|
case FloatPointND -> new FloatPoint(item.getName(), item.floatArrayData());
|
|
|
|
case DoublePointND -> new DoublePoint(item.getName(), item.doubleArrayData());
|
|
|
|
case LongStoredField -> new StoredField(item.getName(), item.longData());
|
2022-02-25 15:46:32 +01:00
|
|
|
case BytesStoredField -> new StoredField(item.getName(), (BytesRef) item.getData());
|
2022-02-22 02:10:36 +01:00
|
|
|
case FloatPoint -> new FloatPoint(item.getName(), item.floatData());
|
|
|
|
case TextField -> new TextField(item.getName(), item.stringValue(), Store.NO);
|
|
|
|
case TextFieldStored -> new TextField(item.getName(), item.stringValue(), Store.YES);
|
|
|
|
case SortedNumericDocValuesField -> new SortedNumericDocValuesField(item.getName(), item.longData());
|
|
|
|
case NumericDocValuesField -> new NumericDocValuesField(item.getName(), item.longData());
|
2022-06-04 19:18:51 +02:00
|
|
|
case StringField -> {
|
|
|
|
if (item.getData() instanceof BytesRef bytesRef) {
|
|
|
|
yield new StringField(item.getName(), bytesRef, Store.NO);
|
|
|
|
} else {
|
|
|
|
yield new StringField(item.getName(), item.stringValue(), Store.NO);
|
|
|
|
}
|
|
|
|
}
|
2022-03-15 11:46:00 +01:00
|
|
|
case StringFieldStored -> {
|
|
|
|
if (item.getData() instanceof BytesRef bytesRef) {
|
|
|
|
yield new StringField(item.getName(), bytesRef, Store.YES);
|
|
|
|
} else {
|
|
|
|
yield new StringField(item.getName(), item.stringValue(), Store.YES);
|
|
|
|
}
|
|
|
|
}
|
2021-08-22 21:23:22 +02:00
|
|
|
};
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
|
|
|
|
2022-02-06 19:29:23 +01:00
|
|
|
private static int[] getIntArray(byte[] data) {
|
|
|
|
var count = data.length / Integer.BYTES;
|
|
|
|
var items = new int[count];
|
|
|
|
for (int i = 0; i < items.length; i++) {
|
|
|
|
items[i] = Ints.fromBytes(data[i * Integer.BYTES],
|
|
|
|
data[i * Integer.BYTES + 1],
|
|
|
|
data[i * Integer.BYTES + 2],
|
|
|
|
data[i * Integer.BYTES + 3]
|
|
|
|
);
|
|
|
|
}
|
|
|
|
return items;
|
|
|
|
}
|
|
|
|
|
|
|
|
private static long[] getLongArray(byte[] data) {
|
|
|
|
var count = data.length / Long.BYTES;
|
|
|
|
var items = new long[count];
|
|
|
|
for (int i = 0; i < items.length; i++) {
|
|
|
|
items[i] = Longs.fromBytes(data[i * Long.BYTES],
|
|
|
|
data[i * Long.BYTES + 1],
|
|
|
|
data[i * Long.BYTES + 2],
|
|
|
|
data[i * Long.BYTES + 3],
|
|
|
|
data[i * Long.BYTES + 4],
|
|
|
|
data[i * Long.BYTES + 5],
|
|
|
|
data[i * Long.BYTES + 6],
|
|
|
|
data[i * Long.BYTES + 7]
|
|
|
|
);
|
|
|
|
}
|
|
|
|
return items;
|
|
|
|
}
|
|
|
|
|
2020-12-07 22:15:18 +01:00
|
|
|
public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) {
|
2022-03-15 11:46:00 +01:00
|
|
|
return new it.cavallium.dbengine.database.LLKeyScore(hit.docId(), hit.shardId(), hit.score(), hit.key());
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
|
2021-09-23 02:15:58 +02:00
|
|
|
public static String toStringSafe(@Nullable Buffer key) {
|
2021-05-12 21:41:47 +02:00
|
|
|
try {
|
2021-09-23 02:15:58 +02:00
|
|
|
if (key == null || key.isAccessible()) {
|
2021-05-12 21:41:47 +02:00
|
|
|
return toString(key);
|
|
|
|
} else {
|
|
|
|
return "(released)";
|
|
|
|
}
|
|
|
|
} catch (IllegalReferenceCountException ex) {
|
|
|
|
return "(released)";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-19 00:22:05 +02:00
|
|
|
public static String toStringSafe(byte @Nullable[] key) {
|
|
|
|
try {
|
|
|
|
if (key == null) {
|
|
|
|
return toString(key);
|
|
|
|
} else {
|
|
|
|
return "(released)";
|
|
|
|
}
|
|
|
|
} catch (IllegalReferenceCountException ex) {
|
|
|
|
return "(released)";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-23 11:30:44 +02:00
|
|
|
public static String toStringSafe(@Nullable LLRange range) {
|
|
|
|
try {
|
2022-07-19 23:45:39 +02:00
|
|
|
if (range == null || !range.isClosed()) {
|
2021-09-23 11:30:44 +02:00
|
|
|
return toString(range);
|
|
|
|
} else {
|
|
|
|
return "(released)";
|
|
|
|
}
|
|
|
|
} catch (IllegalReferenceCountException ex) {
|
|
|
|
return "(released)";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public static String toString(@Nullable LLRange range) {
|
|
|
|
if (range == null) {
|
|
|
|
return "null";
|
|
|
|
} else if (range.isAll()) {
|
|
|
|
return "ξ";
|
|
|
|
} else if (range.hasMin() && range.hasMax()) {
|
|
|
|
return "[" + toStringSafe(range.getMinUnsafe()) + "," + toStringSafe(range.getMaxUnsafe()) + ")";
|
|
|
|
} else if (range.hasMin()) {
|
|
|
|
return "[" + toStringSafe(range.getMinUnsafe()) + ",*)";
|
|
|
|
} else if (range.hasMax()) {
|
|
|
|
return "[*," + toStringSafe(range.getMaxUnsafe()) + ")";
|
|
|
|
} else {
|
|
|
|
return "∅";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-23 02:15:58 +02:00
|
|
|
public static String toString(@Nullable Buffer key) {
|
2021-04-30 19:15:04 +02:00
|
|
|
if (key == null) {
|
|
|
|
return "null";
|
|
|
|
} else {
|
2021-08-29 23:18:03 +02:00
|
|
|
int startIndex = key.readerOffset();
|
2021-04-30 19:15:04 +02:00
|
|
|
int iMax = key.readableBytes() - 1;
|
|
|
|
int iLimit = 128;
|
|
|
|
if (iMax <= -1) {
|
|
|
|
return "[]";
|
|
|
|
} else {
|
2021-09-23 02:15:58 +02:00
|
|
|
StringBuilder arraySB = new StringBuilder();
|
|
|
|
StringBuilder asciiSB = new StringBuilder();
|
|
|
|
boolean isAscii = true;
|
|
|
|
arraySB.append('[');
|
2021-04-30 19:15:04 +02:00
|
|
|
int i = 0;
|
|
|
|
|
2021-09-02 17:15:40 +02:00
|
|
|
while (true) {
|
2021-09-23 02:15:58 +02:00
|
|
|
var byteVal = key.getUnsignedByte(startIndex + i);
|
|
|
|
arraySB.append(byteVal);
|
|
|
|
if (isAscii) {
|
|
|
|
if (byteVal >= 32 && byteVal < 127) {
|
|
|
|
asciiSB.append((char) byteVal);
|
2021-09-23 11:30:44 +02:00
|
|
|
} else if (byteVal == 0) {
|
|
|
|
asciiSB.append('␀');
|
2021-09-23 02:15:58 +02:00
|
|
|
} else {
|
|
|
|
isAscii = false;
|
|
|
|
asciiSB = null;
|
|
|
|
}
|
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
if (i == iLimit) {
|
2021-09-23 02:15:58 +02:00
|
|
|
arraySB.append("…");
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
if (i == iMax || i == iLimit) {
|
2021-09-23 02:15:58 +02:00
|
|
|
if (isAscii) {
|
|
|
|
return asciiSB.insert(0, "\"").append("\"").toString();
|
|
|
|
} else {
|
|
|
|
return arraySB.append(']').toString();
|
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-09-23 02:15:58 +02:00
|
|
|
arraySB.append(", ");
|
2021-04-30 19:15:04 +02:00
|
|
|
++i;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-19 00:22:05 +02:00
|
|
|
public static String toString(byte @Nullable[] key) {
|
|
|
|
if (key == null) {
|
|
|
|
return "null";
|
|
|
|
} else {
|
|
|
|
int startIndex = 0;
|
|
|
|
int iMax = key.length - 1;
|
|
|
|
int iLimit = 128;
|
|
|
|
if (iMax <= -1) {
|
|
|
|
return "[]";
|
|
|
|
} else {
|
|
|
|
StringBuilder arraySB = new StringBuilder();
|
|
|
|
StringBuilder asciiSB = new StringBuilder();
|
|
|
|
boolean isAscii = true;
|
|
|
|
arraySB.append('[');
|
|
|
|
int i = 0;
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
var byteVal = (int) key[startIndex + i];
|
|
|
|
arraySB.append(byteVal);
|
|
|
|
if (isAscii) {
|
|
|
|
if (byteVal >= 32 && byteVal < 127) {
|
|
|
|
asciiSB.append((char) byteVal);
|
|
|
|
} else if (byteVal == 0) {
|
|
|
|
asciiSB.append('␀');
|
|
|
|
} else {
|
|
|
|
isAscii = false;
|
|
|
|
asciiSB = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (i == iLimit) {
|
|
|
|
arraySB.append("…");
|
|
|
|
}
|
|
|
|
if (i == iMax || i == iLimit) {
|
|
|
|
if (isAscii) {
|
|
|
|
return asciiSB.insert(0, "\"").append("\"").toString();
|
|
|
|
} else {
|
|
|
|
return arraySB.append(']').toString();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
arraySB.append(", ");
|
|
|
|
++i;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public static boolean equals(Buffer a, Buffer b) {
|
2021-04-30 19:15:04 +02:00
|
|
|
if (a == null && b == null) {
|
|
|
|
return true;
|
|
|
|
} else if (a != null && b != null) {
|
2021-08-29 23:18:03 +02:00
|
|
|
var aCur = a.openCursor();
|
|
|
|
var bCur = b.openCursor();
|
|
|
|
if (aCur.bytesLeft() != bCur.bytesLeft()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
while (aCur.readByte() && bCur.readByte()) {
|
|
|
|
if (aCur.getByte() != bCur.getByte()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
2021-04-30 19:15:04 +02:00
|
|
|
} else {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-31 09:14:46 +02:00
|
|
|
|
|
|
|
/**
|
2021-09-02 17:15:40 +02:00
|
|
|
* Returns {@code true} if and only if the two specified buffers are identical to each other for {@code length} bytes
|
|
|
|
* starting at {@code aStartIndex} index for the {@code a} buffer and {@code bStartIndex} index for the {@code b}
|
|
|
|
* buffer. A more compact way to express this is:
|
2021-08-31 09:14:46 +02:00
|
|
|
* <p>
|
|
|
|
* {@code a[aStartIndex : aStartIndex + length] == b[bStartIndex : bStartIndex + length]}
|
|
|
|
*/
|
|
|
|
public static boolean equals(Buffer a, int aStartIndex, Buffer b, int bStartIndex, int length) {
|
|
|
|
var aCur = a.openCursor(aStartIndex, length);
|
|
|
|
var bCur = b.openCursor(bStartIndex, length);
|
|
|
|
if (aCur.bytesLeft() != bCur.bytesLeft()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
while (aCur.readByte() && bCur.readByte()) {
|
|
|
|
if (aCur.getByte() != bCur.getByte()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-09-23 02:15:58 +02:00
|
|
|
public static byte[] toArray(@Nullable Buffer key) {
|
|
|
|
if (key == null) {
|
|
|
|
return EMPTY_BYTE_ARRAY;
|
|
|
|
}
|
2021-08-29 23:18:03 +02:00
|
|
|
byte[] array = new byte[key.readableBytes()];
|
|
|
|
key.copyInto(key.readerOffset(), array, 0, key.readableBytes());
|
|
|
|
return array;
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public static List<byte[]> toArray(List<Buffer> input) {
|
2021-04-30 19:15:04 +02:00
|
|
|
List<byte[]> result = new ArrayList<>(input.size());
|
2021-08-29 23:18:03 +02:00
|
|
|
for (Buffer byteBuf : input) {
|
2021-04-30 19:15:04 +02:00
|
|
|
result.add(toArray(byteBuf));
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public static int hashCode(Buffer buf) {
|
2021-09-02 17:15:40 +02:00
|
|
|
if (buf == null) {
|
2021-08-29 23:18:03 +02:00
|
|
|
return 0;
|
2021-09-02 17:15:40 +02:00
|
|
|
}
|
2021-08-29 23:18:03 +02:00
|
|
|
|
|
|
|
int result = 1;
|
|
|
|
var cur = buf.openCursor();
|
|
|
|
while (cur.readByte()) {
|
|
|
|
var element = cur.getByte();
|
|
|
|
result = 31 * result + element;
|
|
|
|
}
|
|
|
|
|
|
|
|
return result;
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
/**
|
|
|
|
* @return null if size is equal to RocksDB.NOT_FOUND
|
|
|
|
*/
|
2021-04-30 19:15:04 +02:00
|
|
|
@Nullable
|
2021-09-22 18:33:28 +02:00
|
|
|
public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
|
2022-03-16 19:19:26 +01:00
|
|
|
if (alloc.getAllocationType() != OFF_HEAP) {
|
|
|
|
throw new UnsupportedOperationException("Allocator type is not direct: " + alloc);
|
|
|
|
}
|
|
|
|
var directBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
|
2021-09-01 00:01:56 +02:00
|
|
|
try {
|
2022-03-16 19:19:26 +01:00
|
|
|
assert directBuffer.readerOffset() == 0;
|
|
|
|
assert directBuffer.writerOffset() == 0;
|
2022-10-02 03:09:50 +02:00
|
|
|
var directBufferWriter = ((BufferComponent) directBuffer).writableBuffer();
|
2022-03-16 19:19:26 +01:00
|
|
|
assert directBufferWriter.position() == 0;
|
|
|
|
assert directBufferWriter.capacity() >= directBuffer.capacity();
|
|
|
|
assert directBufferWriter.isDirect();
|
2021-12-12 02:17:36 +01:00
|
|
|
int trueSize = reader.applyAsInt(directBufferWriter);
|
|
|
|
if (trueSize == RocksDB.NOT_FOUND) {
|
|
|
|
directBuffer.close();
|
|
|
|
return null;
|
2021-09-01 00:01:56 +02:00
|
|
|
}
|
2021-12-12 02:17:36 +01:00
|
|
|
int readSize = directBufferWriter.limit();
|
|
|
|
if (trueSize < readSize) {
|
|
|
|
throw new IllegalStateException();
|
|
|
|
} else if (trueSize == readSize) {
|
|
|
|
return directBuffer.writerOffset(directBufferWriter.limit());
|
|
|
|
} else {
|
|
|
|
assert directBuffer.readerOffset() == 0;
|
|
|
|
directBuffer.ensureWritable(trueSize);
|
|
|
|
assert directBuffer.writerOffset() == 0;
|
2022-10-02 03:09:50 +02:00
|
|
|
directBufferWriter = ((BufferComponent) directBuffer).writableBuffer();
|
2021-12-12 02:17:36 +01:00
|
|
|
assert directBufferWriter.position() == 0;
|
|
|
|
assert directBufferWriter.isDirect();
|
2021-12-13 01:57:37 +01:00
|
|
|
reader.applyAsInt(directBufferWriter.position(0));
|
2021-12-12 02:17:36 +01:00
|
|
|
return directBuffer.writerOffset(trueSize);
|
|
|
|
}
|
|
|
|
} catch (Throwable t) {
|
|
|
|
directBuffer.close();
|
|
|
|
throw t;
|
2021-09-01 00:01:56 +02:00
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-09-18 18:34:21 +02:00
|
|
|
public static void ensureBlocking() {
|
|
|
|
if (Schedulers.isInNonBlockingThread()) {
|
|
|
|
throw new UnsupportedOperationException("Called collect in a nonblocking thread");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-30 17:43:59 +02:00
|
|
|
// todo: remove this ugly method
|
2021-09-20 12:51:27 +02:00
|
|
|
/**
|
|
|
|
* cleanup resource
|
|
|
|
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
|
|
|
|
*/
|
|
|
|
public static <U, T extends Resource<T>> Mono<U> usingSendResource(Mono<Send<T>> resourceSupplier,
|
|
|
|
Function<T, Mono<U>> resourceClosure,
|
|
|
|
boolean cleanupOnSuccess) {
|
|
|
|
return Mono.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> {
|
|
|
|
if (cleanupOnSuccess) {
|
2021-09-22 11:03:39 +02:00
|
|
|
return Mono.fromRunnable(() -> r.close());
|
2021-09-20 12:51:27 +02:00
|
|
|
} else {
|
|
|
|
return Mono.empty();
|
|
|
|
}
|
2021-12-17 16:24:18 +01:00
|
|
|
}, (r, ex) -> Mono.fromRunnable(() -> {
|
|
|
|
if (r.isAccessible()) {
|
|
|
|
r.close();
|
|
|
|
}
|
|
|
|
}), r -> Mono.fromRunnable(() -> {
|
|
|
|
if (r.isAccessible()) {
|
|
|
|
r.close();
|
|
|
|
}
|
2022-01-26 14:22:54 +01:00
|
|
|
}));
|
2021-09-19 12:01:11 +02:00
|
|
|
}
|
|
|
|
|
2021-09-22 11:03:39 +02:00
|
|
|
public static boolean isSet(ScoreDoc[] scoreDocs) {
|
|
|
|
for (ScoreDoc scoreDoc : scoreDocs) {
|
|
|
|
if (scoreDoc == null) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
2021-09-20 16:22:39 +02:00
|
|
|
}
|
|
|
|
|
2021-09-23 02:15:58 +02:00
|
|
|
public static Send<Buffer> empty(BufferAllocator allocator) {
|
2021-09-23 15:48:27 +02:00
|
|
|
try {
|
|
|
|
return allocator.allocate(0).send();
|
|
|
|
} catch (Exception ex) {
|
|
|
|
try (var empty = CompositeBuffer.compose(allocator)) {
|
|
|
|
assert empty.readableBytes() == 0;
|
|
|
|
assert empty.capacity() == 0;
|
|
|
|
return empty.send();
|
|
|
|
}
|
2021-09-23 02:15:58 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public static Send<Buffer> copy(BufferAllocator allocator, Buffer buf) {
|
|
|
|
if (CompositeBuffer.isComposite(buf) && buf.capacity() == 0) {
|
|
|
|
return empty(allocator);
|
|
|
|
} else {
|
|
|
|
return buf.copy().send();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-24 23:56:23 +01:00
|
|
|
public static boolean isBoundedRange(LLRange rangeShared) {
|
2022-03-19 16:36:59 +01:00
|
|
|
return rangeShared.hasMin() && rangeShared.hasMax();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
2022-05-10 16:57:41 +02:00
|
|
|
* Generate a ReadOptions, with some parameters modified to help with bulk iterations
|
2022-05-11 00:29:42 +02:00
|
|
|
* @param readOptions the read options to start with, it will be modified
|
2022-03-19 16:36:59 +01:00
|
|
|
* @param canFillCache true to fill the cache. If closedRange is false, this field will be ignored
|
2022-03-24 23:56:23 +01:00
|
|
|
* @param boundedRange true if the range is bounded from both sides
|
|
|
|
* @param smallRange true if the range is small
|
2022-05-10 16:57:41 +02:00
|
|
|
* @return the passed instance of ReadOptions, or a new one if the passed readOptions is null
|
2022-03-19 16:36:59 +01:00
|
|
|
*/
|
2022-05-20 10:20:00 +02:00
|
|
|
public static ReadOptions generateCustomReadOptions(@Nullable ReadOptions readOptions,
|
2022-03-22 11:50:30 +01:00
|
|
|
boolean canFillCache,
|
2022-03-24 23:56:23 +01:00
|
|
|
boolean boundedRange,
|
|
|
|
boolean smallRange) {
|
2022-05-10 16:57:41 +02:00
|
|
|
if (readOptions == null) {
|
|
|
|
//noinspection resource
|
2022-05-20 10:20:00 +02:00
|
|
|
readOptions = new ReadOptions();
|
2022-03-19 16:36:59 +01:00
|
|
|
}
|
2022-03-24 23:56:23 +01:00
|
|
|
if (boundedRange || smallRange) {
|
2022-05-20 10:20:00 +02:00
|
|
|
readOptions.setFillCache(canFillCache);
|
2022-03-22 11:50:30 +01:00
|
|
|
} else {
|
2022-05-20 10:20:00 +02:00
|
|
|
if (readOptions.readaheadSize() <= 0) {
|
|
|
|
readOptions.setReadaheadSize(4 * 1024 * 1024); // 4MiB
|
2022-04-26 17:12:22 +02:00
|
|
|
}
|
2022-05-20 10:20:00 +02:00
|
|
|
readOptions.setFillCache(false);
|
|
|
|
readOptions.setVerifyChecksums(false);
|
2022-03-19 16:36:59 +01:00
|
|
|
}
|
2022-03-22 11:50:30 +01:00
|
|
|
|
2022-04-15 16:49:01 +02:00
|
|
|
if (FORCE_DISABLE_CHECKSUM_VERIFICATION) {
|
2022-05-20 10:20:00 +02:00
|
|
|
readOptions.setVerifyChecksums(false);
|
2022-04-15 16:49:01 +02:00
|
|
|
}
|
|
|
|
|
2022-03-19 16:36:59 +01:00
|
|
|
return readOptions;
|
|
|
|
}
|
|
|
|
|
2022-05-21 22:41:48 +02:00
|
|
|
public static Mono<Void> finalizeResource(Resource<?> resource) {
|
2022-07-23 14:36:40 +02:00
|
|
|
Mono<Void> runnable = Mono.fromRunnable(() -> LLUtils.finalizeResourceNow(resource));
|
|
|
|
if (resource instanceof LuceneCloseable) {
|
|
|
|
return runnable.transform(LuceneUtils::scheduleLucene);
|
|
|
|
} else {
|
|
|
|
return runnable;
|
|
|
|
}
|
2022-05-21 22:41:48 +02:00
|
|
|
}
|
|
|
|
|
2022-07-02 11:44:13 +02:00
|
|
|
public static Mono<Void> finalizeResource(SafeCloseable resource) {
|
2022-07-23 02:42:48 +02:00
|
|
|
Mono<Void> runnable = Mono.fromRunnable(resource::close);
|
|
|
|
if (resource instanceof LuceneCloseable) {
|
2022-07-23 14:25:59 +02:00
|
|
|
return runnable.transform(LuceneUtils::scheduleLucene);
|
2022-07-23 02:42:48 +02:00
|
|
|
} else {
|
|
|
|
return runnable;
|
|
|
|
}
|
2022-06-20 12:30:33 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public static void finalizeResourceNow(Resource<?> resource) {
|
|
|
|
if (resource.isAccessible()) {
|
|
|
|
resource.close();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-02 11:44:13 +02:00
|
|
|
public static void finalizeResourceNow(SafeCloseable resource) {
|
2022-06-20 12:30:33 +02:00
|
|
|
resource.close();
|
2022-06-14 18:05:26 +02:00
|
|
|
}
|
|
|
|
|
2022-05-21 22:41:48 +02:00
|
|
|
public static <V> Flux<V> handleDiscard(Flux<V> flux) {
|
|
|
|
return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
|
|
|
|
}
|
|
|
|
|
|
|
|
public static <V> Mono<V> handleDiscard(Mono<V> flux) {
|
|
|
|
return flux.doOnDiscard(Object.class, LLUtils::onDiscard);
|
2022-05-21 15:28:52 +02:00
|
|
|
}
|
|
|
|
|
2022-06-14 17:46:49 +02:00
|
|
|
/**
|
|
|
|
* Obtain the resource, then run the closure.
|
|
|
|
* If the closure publisher returns a single element, then the resource is kept open,
|
|
|
|
* otherwise it is closed.
|
|
|
|
*/
|
|
|
|
public static <T extends AutoCloseable, U> Mono<U> singleOrClose(Mono<T> resourceMono,
|
|
|
|
Function<T, Mono<U>> closure) {
|
2022-07-23 14:25:59 +02:00
|
|
|
return Mono.usingWhen(resourceMono, resource -> {
|
|
|
|
if (resource instanceof LuceneCloseable) {
|
|
|
|
return closure.apply(resource).publishOn(luceneScheduler()).doOnSuccess(s -> {
|
2022-06-14 17:46:49 +02:00
|
|
|
if (s == null) {
|
|
|
|
try {
|
|
|
|
resource.close();
|
|
|
|
} catch (Exception e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
}
|
2022-07-23 14:25:59 +02:00
|
|
|
}).publishOn(Schedulers.parallel());
|
|
|
|
} else {
|
|
|
|
return closure.apply(resource).doOnSuccess(s -> {
|
|
|
|
if (s == null) {
|
|
|
|
try {
|
|
|
|
resource.close();
|
|
|
|
} catch (Exception e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}, resource -> Mono.empty(), (resource, ex) -> Mono.fromCallable(() -> {
|
|
|
|
resource.close();
|
|
|
|
return null;
|
|
|
|
}), r -> (r instanceof SafeCloseable s) ? LLUtils.finalizeResource(s) : Mono.fromCallable(() -> {
|
|
|
|
r.close();
|
|
|
|
return null;
|
|
|
|
}));
|
2022-06-14 17:46:49 +02:00
|
|
|
}
|
|
|
|
|
2022-07-28 23:41:10 +02:00
|
|
|
public static Disposable scheduleRepeated(Scheduler scheduler, Runnable action, Duration delay) {
|
2022-07-28 23:44:23 +02:00
|
|
|
var currentDisposable = new AtomicReference<Disposable>();
|
2022-07-28 23:48:45 +02:00
|
|
|
var disposed = new AtomicBoolean(false);
|
|
|
|
scheduleRepeatedInternal(scheduler, action, delay, currentDisposable, disposed);
|
|
|
|
return () -> {
|
|
|
|
disposed.set(true);
|
|
|
|
currentDisposable.get().dispose();
|
|
|
|
};
|
2022-07-28 23:44:23 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
private static void scheduleRepeatedInternal(Scheduler scheduler,
|
|
|
|
Runnable action,
|
|
|
|
Duration delay,
|
2022-07-28 23:48:45 +02:00
|
|
|
AtomicReference<Disposable> currentDisposable,
|
|
|
|
AtomicBoolean disposed) {
|
|
|
|
if (disposed.get()) return;
|
2022-07-28 23:44:23 +02:00
|
|
|
currentDisposable.set(scheduler.schedule(() -> {
|
2022-07-28 23:48:45 +02:00
|
|
|
if (disposed.get()) return;
|
2022-07-28 23:41:10 +02:00
|
|
|
try {
|
|
|
|
action.run();
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
logger.error(ex);
|
|
|
|
}
|
2022-07-28 23:48:45 +02:00
|
|
|
scheduleRepeatedInternal(scheduler, action, delay, currentDisposable, disposed);
|
2022-07-28 23:44:23 +02:00
|
|
|
}, delay.toMillis(), TimeUnit.MILLISECONDS));
|
2022-07-28 23:41:10 +02:00
|
|
|
}
|
|
|
|
|
2022-07-29 00:32:08 +02:00
|
|
|
public static boolean isAccessible(AbstractNativeReference abstractNativeReference) {
|
|
|
|
if (IS_ACCESSIBLE_METHOD_HANDLE != null) {
|
|
|
|
try {
|
|
|
|
return (boolean) IS_ACCESSIBLE_METHOD_HANDLE.invoke(abstractNativeReference);
|
|
|
|
} catch (Throwable e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-12-12 02:17:36 +01:00
|
|
|
@Deprecated
|
|
|
|
public record DirectBuffer(@NotNull Buffer buffer, @NotNull ByteBuffer byteBuffer) {}
|
2021-08-29 23:18:03 +02:00
|
|
|
|
2021-09-01 00:01:56 +02:00
|
|
|
@NotNull
|
2021-12-12 02:17:36 +01:00
|
|
|
public static ByteBuffer newDirect(int size) {
|
|
|
|
return ByteBuffer.allocateDirect(size);
|
2021-09-01 00:01:56 +02:00
|
|
|
}
|
2021-08-29 23:18:03 +02:00
|
|
|
|
2022-03-16 19:19:26 +01:00
|
|
|
private static Drop<Buffer> drop() {
|
|
|
|
// We cannot reliably drop unsafe memory. We have to rely on the cleaner to do that.
|
2022-10-02 03:09:50 +02:00
|
|
|
return NO_OP_DROP;
|
2021-12-12 02:17:36 +01:00
|
|
|
}
|
|
|
|
|
2022-03-16 19:19:26 +01:00
|
|
|
public static boolean isReadOnlyDirect(Buffer inputBuffer) {
|
2022-10-02 03:09:50 +02:00
|
|
|
return inputBuffer instanceof BufferComponent component && component.readableNativeAddress() != 0;
|
2021-05-02 19:18:15 +02:00
|
|
|
}
|
|
|
|
|
2022-03-16 19:19:26 +01:00
|
|
|
public static ByteBuffer getReadOnlyDirect(Buffer inputBuffer) {
|
|
|
|
assert isReadOnlyDirect(inputBuffer);
|
2022-10-02 03:09:50 +02:00
|
|
|
return ((BufferComponent) inputBuffer).readableBuffer();
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {
|
|
|
|
Buffer result = alloc.allocate(array.length);
|
2021-05-05 17:31:21 +02:00
|
|
|
result.writeBytes(array);
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
2021-04-30 19:15:04 +02:00
|
|
|
@NotNull
|
2021-09-22 18:33:28 +02:00
|
|
|
public static Buffer readDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
|
|
|
|
var nullable = readNullableDirectNioBuffer(alloc, reader);
|
|
|
|
if (nullable == null) {
|
|
|
|
throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element");
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
2021-09-22 18:33:28 +02:00
|
|
|
return nullable;
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-09-22 18:33:28 +02:00
|
|
|
public static Buffer compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer) {
|
|
|
|
return buffer.receive();
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-09-23 02:15:58 +02:00
|
|
|
@NotNull
|
|
|
|
public static Buffer compositeBuffer(BufferAllocator alloc,
|
|
|
|
@NotNull Send<Buffer> buffer1,
|
|
|
|
@NotNull Send<Buffer> buffer2) {
|
2021-09-23 23:45:41 +02:00
|
|
|
var b1 = buffer1.receive();
|
|
|
|
try (var b2 = buffer2.receive()) {
|
|
|
|
if (b1.writerOffset() < b1.capacity() || b2.writerOffset() < b2.capacity()) {
|
|
|
|
b1.ensureWritable(b2.readableBytes(), b2.readableBytes(), true);
|
|
|
|
b2.copyInto(b2.readerOffset(), b1, b1.writerOffset(), b2.readableBytes());
|
|
|
|
b1.writerOffset(b1.writerOffset() + b2.readableBytes());
|
|
|
|
return b1;
|
|
|
|
} else {
|
2022-05-18 00:52:01 +02:00
|
|
|
return alloc.compose(List.of(b1.send(), b2.send()));
|
2021-09-23 23:45:41 +02:00
|
|
|
}
|
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-09-23 02:15:58 +02:00
|
|
|
@NotNull
|
2021-09-22 18:33:28 +02:00
|
|
|
public static Buffer compositeBuffer(BufferAllocator alloc,
|
2021-09-23 02:15:58 +02:00
|
|
|
@NotNull Send<Buffer> buffer1,
|
|
|
|
@NotNull Send<Buffer> buffer2,
|
|
|
|
@NotNull Send<Buffer> buffer3) {
|
2021-09-23 23:45:41 +02:00
|
|
|
var b1 = buffer1.receive();
|
|
|
|
try (var b2 = buffer2.receive()) {
|
|
|
|
try (var b3 = buffer3.receive()) {
|
|
|
|
if (b1.writerOffset() < b1.capacity()
|
|
|
|
|| b2.writerOffset() < b2.capacity()
|
|
|
|
|| b3.writerOffset() < b3.capacity()) {
|
|
|
|
b1.ensureWritable(b2.readableBytes(), b2.readableBytes(), true);
|
|
|
|
b2.copyInto(b2.readerOffset(), b1, b1.writerOffset(), b2.readableBytes());
|
|
|
|
b1.writerOffset(b1.writerOffset() + b2.readableBytes());
|
|
|
|
|
|
|
|
b1.ensureWritable(b3.readableBytes(), b3.readableBytes(), true);
|
|
|
|
b3.copyInto(b3.readerOffset(), b1, b1.writerOffset(), b3.readableBytes());
|
|
|
|
b1.writerOffset(b1.writerOffset() + b3.readableBytes());
|
|
|
|
return b1;
|
|
|
|
} else {
|
2022-05-18 00:52:01 +02:00
|
|
|
return alloc.compose(List.of(b1.send(), b2.send(), b3.send()));
|
2021-09-23 23:45:41 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-04-30 19:15:04 +02:00
|
|
|
}
|
|
|
|
|
2021-05-08 03:09:00 +02:00
|
|
|
public static <T> Mono<T> resolveDelta(Mono<Delta<T>> prev, UpdateReturnMode updateReturnMode) {
|
|
|
|
return prev.handle((delta, sink) -> {
|
|
|
|
switch (updateReturnMode) {
|
2021-08-22 21:23:22 +02:00
|
|
|
case GET_NEW_VALUE -> {
|
2021-05-21 00:19:40 +02:00
|
|
|
var current = delta.current();
|
2021-05-08 03:09:00 +02:00
|
|
|
if (current != null) {
|
|
|
|
sink.next(current);
|
|
|
|
} else {
|
|
|
|
sink.complete();
|
|
|
|
}
|
2021-08-22 21:23:22 +02:00
|
|
|
}
|
|
|
|
case GET_OLD_VALUE -> {
|
2021-05-21 00:19:40 +02:00
|
|
|
var previous = delta.previous();
|
2021-05-08 03:09:00 +02:00
|
|
|
if (previous != null) {
|
|
|
|
sink.next(previous);
|
|
|
|
} else {
|
|
|
|
sink.complete();
|
|
|
|
}
|
2021-08-22 21:23:22 +02:00
|
|
|
}
|
|
|
|
case NOTHING -> sink.complete();
|
|
|
|
default -> sink.error(new IllegalStateException());
|
2021-05-08 03:09:00 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
public static Mono<Buffer> resolveLLDelta(Mono<LLDelta> prev, UpdateReturnMode updateReturnMode) {
|
2022-05-20 23:59:56 +02:00
|
|
|
return prev.mapNotNull(delta -> {
|
2022-05-20 18:31:05 +02:00
|
|
|
final Buffer previous = delta.previousUnsafe();
|
|
|
|
final Buffer current = delta.currentUnsafe();
|
2022-05-20 23:59:56 +02:00
|
|
|
return switch (updateReturnMode) {
|
2022-05-20 18:31:05 +02:00
|
|
|
case GET_NEW_VALUE -> {
|
|
|
|
if (previous != null && previous.isAccessible()) {
|
|
|
|
previous.close();
|
2021-08-29 23:18:03 +02:00
|
|
|
}
|
2022-05-20 23:59:56 +02:00
|
|
|
yield current;
|
2021-08-29 23:18:03 +02:00
|
|
|
}
|
2022-05-20 18:31:05 +02:00
|
|
|
case GET_OLD_VALUE -> {
|
|
|
|
if (current != null && current.isAccessible()) {
|
|
|
|
current.close();
|
|
|
|
}
|
2022-05-20 23:59:56 +02:00
|
|
|
yield previous;
|
2022-05-20 18:31:05 +02:00
|
|
|
}
|
|
|
|
case NOTHING -> {
|
|
|
|
if (previous != null && previous.isAccessible()) {
|
|
|
|
previous.close();
|
|
|
|
}
|
|
|
|
if (current != null && current.isAccessible()) {
|
|
|
|
current.close();
|
|
|
|
}
|
2022-05-20 23:59:56 +02:00
|
|
|
yield null;
|
2022-05-20 18:31:05 +02:00
|
|
|
}
|
2022-05-20 23:59:56 +02:00
|
|
|
};
|
2021-08-29 23:18:03 +02:00
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-08-22 21:23:22 +02:00
|
|
|
public static <T, U> Mono<Delta<U>> mapDelta(Mono<Delta<T>> mono,
|
|
|
|
SerializationFunction<@NotNull T, @Nullable U> mapper) {
|
|
|
|
return mono.handle((delta, sink) -> {
|
|
|
|
try {
|
|
|
|
T prev = delta.previous();
|
|
|
|
T curr = delta.current();
|
|
|
|
U newPrev;
|
|
|
|
U newCurr;
|
|
|
|
if (prev != null) {
|
|
|
|
newPrev = mapper.apply(prev);
|
|
|
|
} else {
|
|
|
|
newPrev = null;
|
|
|
|
}
|
|
|
|
if (curr != null) {
|
|
|
|
newCurr = mapper.apply(curr);
|
|
|
|
} else {
|
|
|
|
newCurr = null;
|
|
|
|
}
|
|
|
|
sink.next(new Delta<>(newPrev, newCurr));
|
|
|
|
} catch (SerializationException ex) {
|
|
|
|
sink.error(ex);
|
2021-05-08 03:09:00 +02:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2022-05-20 10:20:00 +02:00
|
|
|
public static <U> Mono<Delta<U>> mapLLDelta(Mono<LLDelta> mono,
|
|
|
|
SerializationFunction<@NotNull Buffer, @Nullable U> mapper) {
|
2022-05-20 23:59:56 +02:00
|
|
|
return Mono.usingWhen(mono, delta -> Mono.fromCallable(() -> {
|
2022-05-22 16:48:08 +02:00
|
|
|
Buffer prev = delta.previousUnsafe();
|
|
|
|
Buffer curr = delta.currentUnsafe();
|
2022-05-20 23:59:56 +02:00
|
|
|
U newPrev;
|
|
|
|
U newCurr;
|
|
|
|
if (prev != null) {
|
|
|
|
newPrev = mapper.apply(prev);
|
|
|
|
} else {
|
|
|
|
newPrev = null;
|
2021-08-29 23:18:03 +02:00
|
|
|
}
|
2022-05-20 23:59:56 +02:00
|
|
|
if (curr != null) {
|
|
|
|
newCurr = mapper.apply(curr);
|
|
|
|
} else {
|
|
|
|
newCurr = null;
|
|
|
|
}
|
|
|
|
return new Delta<>(newPrev, newCurr);
|
2022-06-20 12:30:33 +02:00
|
|
|
}), LLUtils::finalizeResource);
|
2021-08-29 23:18:03 +02:00
|
|
|
}
|
|
|
|
|
2021-05-08 03:09:00 +02:00
|
|
|
public static <R, V> boolean isDeltaChanged(Delta<V> delta) {
|
2021-05-21 00:19:40 +02:00
|
|
|
return !Objects.equals(delta.previous(), delta.current());
|
2021-05-08 03:09:00 +02:00
|
|
|
}
|
2021-08-22 18:20:05 +02:00
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public static boolean isDirect(Buffer key) {
|
2021-08-31 15:50:11 +02:00
|
|
|
var readableComponents = key.countReadableComponents();
|
|
|
|
if (readableComponents == 0) {
|
|
|
|
return true;
|
|
|
|
} else if (readableComponents == 1) {
|
2022-10-02 03:09:50 +02:00
|
|
|
return key.isDirect();
|
2021-08-29 23:18:03 +02:00
|
|
|
} else {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public static String deserializeString(Send<Buffer> bufferSend, int readerOffset, int length, Charset charset) {
|
|
|
|
try (var buffer = bufferSend.receive()) {
|
|
|
|
byte[] bytes = new byte[Math.min(length, buffer.readableBytes())];
|
|
|
|
buffer.copyInto(readerOffset, bytes, 0, length);
|
|
|
|
return new String(bytes, charset);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-19 00:22:05 +02:00
|
|
|
public static String deserializeString(@NotNull Buffer buffer, int readerOffset, int length, Charset charset) {
|
|
|
|
byte[] bytes = new byte[Math.min(length, buffer.readableBytes())];
|
|
|
|
buffer.copyInto(readerOffset, bytes, 0, length);
|
|
|
|
return new String(bytes, charset);
|
|
|
|
}
|
|
|
|
|
2021-08-29 23:18:03 +02:00
|
|
|
public static int utf8MaxBytes(String deserialized) {
|
|
|
|
return deserialized.length() * 3;
|
|
|
|
}
|
2022-01-26 14:22:54 +01:00
|
|
|
|
|
|
|
private static void onNextDropped(Object next) {
|
2022-05-21 22:41:48 +02:00
|
|
|
if (DEBUG_ALL_DROPS) {
|
|
|
|
logger.trace("Dropped: {}", () -> next.getClass().getName());
|
|
|
|
}
|
2022-06-30 15:06:10 +02:00
|
|
|
closeResource(next, false);
|
2022-05-21 22:41:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public static void onDiscard(Object next) {
|
|
|
|
if (DEBUG_ALL_DISCARDS) {
|
|
|
|
logger.trace("Discarded: {}", () -> next.getClass().getName());
|
|
|
|
}
|
2022-06-30 15:06:10 +02:00
|
|
|
closeResource(next, false);
|
2022-05-21 22:41:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
public static void closeResource(Object next) {
|
2022-06-30 15:06:10 +02:00
|
|
|
closeResource(next, true);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void closeResource(Object next, boolean manual) {
|
2022-01-26 14:22:54 +01:00
|
|
|
if (next instanceof Send<?> send) {
|
|
|
|
send.close();
|
2022-06-30 15:06:10 +02:00
|
|
|
} if (next instanceof SafeCloseable closeable) {
|
|
|
|
if (manual || closeable instanceof DiscardingCloseable) {
|
2022-07-23 14:36:40 +02:00
|
|
|
if (!manual && !LuceneUtils.isLuceneThread() && closeable instanceof LuceneCloseable luceneCloseable) {
|
|
|
|
luceneScheduler().schedule(() -> luceneCloseable.close());
|
|
|
|
} else {
|
|
|
|
closeable.close();
|
|
|
|
}
|
2022-06-30 15:06:10 +02:00
|
|
|
}
|
2022-05-20 18:31:05 +02:00
|
|
|
} else if (next instanceof Resource<?> resource && resource.isAccessible()) {
|
2022-01-26 14:22:54 +01:00
|
|
|
resource.close();
|
2022-10-24 01:17:08 +02:00
|
|
|
} else if (next instanceof List<?> iterable) {
|
|
|
|
iterable.forEach(obj -> closeResource(obj, manual));
|
|
|
|
} else if (next instanceof Set<?> iterable) {
|
|
|
|
iterable.forEach(obj -> closeResource(obj, manual));
|
2022-05-20 10:20:00 +02:00
|
|
|
} else if (next instanceof AbstractImmutableNativeReference rocksObj) {
|
|
|
|
if (rocksObj.isOwningHandle()) {
|
|
|
|
rocksObj.close();
|
|
|
|
}
|
2022-01-26 14:22:54 +01:00
|
|
|
} else if (next instanceof Optional<?> optional) {
|
2022-10-24 01:17:08 +02:00
|
|
|
optional.ifPresent(obj -> closeResource(obj, manual));
|
2022-01-26 14:22:54 +01:00
|
|
|
} else if (next instanceof Map.Entry<?, ?> entry) {
|
|
|
|
var key = entry.getKey();
|
|
|
|
if (key != null) {
|
2022-10-24 01:17:08 +02:00
|
|
|
closeResource(key, manual);
|
2022-01-26 14:22:54 +01:00
|
|
|
}
|
|
|
|
var value = entry.getValue();
|
|
|
|
if (value != null) {
|
2022-10-24 01:17:08 +02:00
|
|
|
closeResource(value, manual);
|
2022-01-26 14:22:54 +01:00
|
|
|
}
|
|
|
|
} else if (next instanceof Delta<?> delta) {
|
|
|
|
var previous = delta.previous();
|
|
|
|
if (previous != null) {
|
2022-10-24 01:17:08 +02:00
|
|
|
closeResource(previous, manual);
|
2022-01-26 14:22:54 +01:00
|
|
|
}
|
|
|
|
var current = delta.current();
|
|
|
|
if (current != null) {
|
2022-10-24 01:17:08 +02:00
|
|
|
closeResource(current, manual);
|
2022-01-26 14:22:54 +01:00
|
|
|
}
|
|
|
|
} else if (next instanceof Map<?, ?> map) {
|
|
|
|
map.forEach((key, value) -> {
|
|
|
|
if (key != null) {
|
2022-10-24 01:17:08 +02:00
|
|
|
closeResource(key, manual);
|
2022-01-26 14:22:54 +01:00
|
|
|
}
|
|
|
|
if (value != null) {
|
2022-10-24 01:17:08 +02:00
|
|
|
closeResource(value, manual);
|
2022-01-26 14:22:54 +01:00
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
2022-02-09 20:22:32 +01:00
|
|
|
|
|
|
|
private static class FakeBytesRefBuilder extends BytesRefBuilder {
|
|
|
|
|
|
|
|
private final LLTerm term;
|
|
|
|
|
|
|
|
public FakeBytesRefBuilder(LLTerm term) {
|
|
|
|
this.term = term;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public BytesRef toBytesRef() {
|
2022-02-25 15:46:32 +01:00
|
|
|
return term.getValueBytesRef();
|
2022-02-09 20:22:32 +01:00
|
|
|
}
|
|
|
|
}
|
2020-12-07 22:15:18 +01:00
|
|
|
}
|