CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/LLUtils.java

483 lines
14 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
2021-05-03 21:41:51 +02:00
import io.netty.buffer.Unpooled;
import io.netty.util.IllegalReferenceCountException;
2021-01-30 22:14:48 +01:00
import it.cavallium.dbengine.lucene.RandomSortField;
2020-12-07 22:15:18 +01:00
import java.nio.ByteBuffer;
import java.util.ArrayList;
2021-05-03 21:41:51 +02:00
import java.util.Arrays;
2020-12-07 22:15:18 +01:00
import java.util.LinkedList;
import java.util.List;
2021-05-08 03:09:00 +02:00
import java.util.Objects;
import java.util.function.Function;
import java.util.function.ToIntFunction;
2020-12-07 22:15:18 +01:00
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ScoreMode;
2020-12-07 22:15:18 +01:00
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.jetbrains.annotations.NotNull;
2020-12-07 22:15:18 +01:00
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDB;
2021-05-08 03:09:00 +02:00
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
2020-12-07 22:15:18 +01:00
public class LLUtils {
private static final byte[] RESPONSE_TRUE = new byte[]{1};
private static final byte[] RESPONSE_FALSE = new byte[]{0};
private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1};
private static final byte[] RESPONSE_FALSE_BUF = new byte[]{0};
2021-03-18 19:53:32 +01:00
public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
static {
for (int i1 = 0; i1 < 256; i1++) {
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
b[0] = (byte) i1;
}
}
2020-12-07 22:15:18 +01:00
public static boolean responseToBoolean(byte[] response) {
return response[0] == 1;
}
public static boolean responseToBoolean(ByteBuf response) {
try {
assert response.readableBytes() == 1;
return response.getByte(response.readerIndex()) == 1;
} finally {
response.release();
}
}
2020-12-07 22:15:18 +01:00
public static byte[] booleanToResponse(boolean bool) {
return bool ? RESPONSE_TRUE : RESPONSE_FALSE;
}
public static ByteBuf booleanToResponseByteBuffer(boolean bool) {
2021-05-03 21:41:51 +02:00
return Unpooled.wrappedBuffer(booleanToResponse(bool));
}
2020-12-07 22:15:18 +01:00
@Nullable
public static Sort toSort(@Nullable LLSort sort) {
if (sort == null) {
return null;
}
if (sort.getType() == LLSortType.LONG) {
return new Sort(new SortedNumericSortField(sort.getFieldName(), SortField.Type.LONG, sort.isReverse()));
} else if (sort.getType() == LLSortType.RANDOM) {
return new Sort(new RandomSortField());
} else if (sort.getType() == LLSortType.SCORE) {
return new Sort(SortField.FIELD_SCORE);
} else if (sort.getType() == LLSortType.DOC) {
return new Sort(SortField.FIELD_DOC);
2020-12-07 22:15:18 +01:00
}
return null;
}
public static ScoreMode toScoreMode(LLScoreMode scoreMode) {
switch (scoreMode) {
2021-01-30 22:29:33 +01:00
case COMPLETE:
return ScoreMode.COMPLETE;
case TOP_SCORES:
return ScoreMode.TOP_SCORES;
case COMPLETE_NO_SCORES:
return ScoreMode.COMPLETE_NO_SCORES;
default:
throw new IllegalStateException("Unexpected value: " + scoreMode);
}
}
2020-12-07 22:15:18 +01:00
public static Term toTerm(LLTerm term) {
return new Term(term.getKey(), term.getValue());
}
public static Document toDocument(LLDocument document) {
Document d = new Document();
for (LLItem item : document.getItems()) {
d.add(LLUtils.toField(item));
}
return d;
}
public static Iterable<Document> toDocuments(Iterable<LLDocument> document) {
List<Document> d = new LinkedList<>();
for (LLDocument doc : document) {
d.add(LLUtils.toDocument(doc));
}
return d;
}
public static Iterable<Term> toTerms(Iterable<LLTerm> terms) {
List<Term> d = new LinkedList<>();
for (LLTerm term : terms) {
d.add(LLUtils.toTerm(term));
}
return d;
}
private static IndexableField toField(LLItem item) {
switch (item.getType()) {
case IntPoint:
return new IntPoint(item.getName(), Ints.fromByteArray(item.getData()));
case LongPoint:
return new LongPoint(item.getName(), Longs.fromByteArray(item.getData()));
case FloatPoint:
return new FloatPoint(item.getName(), ByteBuffer.wrap(item.getData()).getFloat());
case TextField:
return new TextField(item.getName(), item.stringValue(), Field.Store.NO);
case TextFieldStored:
return new TextField(item.getName(), item.stringValue(), Field.Store.YES);
case SortedNumericDocValuesField:
return new SortedNumericDocValuesField(item.getName(), Longs.fromByteArray(item.getData()));
case StringField:
return new StringField(item.getName(), item.stringValue(), Field.Store.NO);
case StringFieldStored:
return new StringField(item.getName(), item.stringValue(), Field.Store.YES);
}
throw new UnsupportedOperationException("Unsupported field type");
}
public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) {
return new it.cavallium.dbengine.database.LLKeyScore(hit.getKey(), hit.getScore());
}
public static String toStringSafe(ByteBuf key) {
try {
if (key.refCnt() > 0) {
return toString(key);
} else {
return "(released)";
}
} catch (IllegalReferenceCountException ex) {
return "(released)";
}
}
public static String toString(ByteBuf key) {
if (key == null) {
return "null";
} else {
int startIndex = key.readerIndex();
int iMax = key.readableBytes() - 1;
int iLimit = 128;
if (iMax <= -1) {
return "[]";
} else {
StringBuilder b = new StringBuilder();
b.append('[');
int i = 0;
while(true) {
b.append(key.getByte(startIndex + i));
if (i == iLimit) {
b.append("…");
}
if (i == iMax || i == iLimit) {
return b.append(']').toString();
}
b.append(", ");
++i;
}
}
}
}
public static boolean equals(ByteBuf a, ByteBuf b) {
if (a == null && b == null) {
return true;
} else if (a != null && b != null) {
return ByteBufUtil.equals(a, b);
} else {
return false;
}
}
public static byte[] toArray(ByteBuf key) {
2021-05-03 21:41:51 +02:00
if (key.hasArray()) {
return Arrays.copyOfRange(key.array(), key.arrayOffset() + key.readerIndex(), key.arrayOffset() + key.writerIndex());
} else {
byte[] keyBytes = new byte[key.readableBytes()];
key.getBytes(key.readerIndex(), keyBytes, 0, key.readableBytes());
return keyBytes;
}
}
public static List<byte[]> toArray(List<ByteBuf> input) {
List<byte[]> result = new ArrayList<>(input.size());
for (ByteBuf byteBuf : input) {
result.add(toArray(byteBuf));
}
return result;
}
public static int hashCode(ByteBuf buf) {
return buf == null ? 0 : buf.hashCode();
}
@Nullable
public static ByteBuf readNullableDirectNioBuffer(ByteBufAllocator alloc, ToIntFunction<ByteBuffer> reader) {
ByteBuf buffer = alloc.directBuffer();
2021-05-02 19:18:15 +02:00
ByteBuf directBuffer = null;
ByteBuffer nioBuffer;
int size;
Boolean mustBeCopied = null;
do {
if (mustBeCopied == null || !mustBeCopied) {
nioBuffer = LLUtils.toDirectFast(buffer);
if (nioBuffer != null) {
nioBuffer.limit(nioBuffer.capacity());
}
2021-05-02 19:18:15 +02:00
} else {
nioBuffer = null;
}
if ((mustBeCopied != null && mustBeCopied) || nioBuffer == null) {
2021-05-03 21:41:51 +02:00
directBuffer = buffer;
2021-05-02 19:18:15 +02:00
nioBuffer = directBuffer.nioBuffer(0, directBuffer.capacity());
mustBeCopied = true;
} else {
mustBeCopied = false;
}
try {
assert nioBuffer.isDirect();
size = reader.applyAsInt(nioBuffer);
if (size != RocksDB.NOT_FOUND) {
if (mustBeCopied) {
buffer.writerIndex(0).writeBytes(nioBuffer);
}
2021-05-02 19:18:15 +02:00
if (size == nioBuffer.limit()) {
buffer.setIndex(0, size);
return buffer;
} else {
assert size > nioBuffer.limit();
assert nioBuffer.limit() > 0;
buffer.capacity(size);
}
}
2021-05-02 19:18:15 +02:00
} finally {
if (nioBuffer != null) {
nioBuffer = null;
}
if(directBuffer != null) {
directBuffer.release();
directBuffer = null;
}
}
} while (size != RocksDB.NOT_FOUND);
return null;
}
@Nullable
public static ByteBuffer toDirectFast(ByteBuf buffer) {
2021-05-02 19:18:15 +02:00
ByteBuffer result = buffer.nioBuffer(0, buffer.capacity());
if (result.isDirect()) {
result.limit(buffer.writerIndex());
2021-05-02 19:18:15 +02:00
assert result.isDirect();
assert result.capacity() == buffer.capacity();
assert buffer.readerIndex() == result.position();
assert result.limit() - result.position() == buffer.readableBytes();
2021-05-02 19:18:15 +02:00
return result;
} else {
return null;
}
}
public static ByteBuffer toDirect(ByteBuf buffer) {
ByteBuffer result = toDirectFast(buffer);
if (result == null) {
throw new IllegalArgumentException("The supplied ByteBuf is not direct "
+ "(if it's a CompositeByteBuf it must be consolidated before)");
}
2021-05-02 19:18:15 +02:00
assert result.isDirect();
return result;
}
2021-05-03 21:41:51 +02:00
/*
public static ByteBuf toDirectCopy(ByteBuf buffer) {
try {
2021-05-03 21:41:51 +02:00
ByteBuf directCopyBuf = buffer.alloc().buffer(buffer.capacity(), buffer.maxCapacity());
directCopyBuf.writeBytes(buffer, 0, buffer.writerIndex());
return directCopyBuf;
} finally {
buffer.release();
}
}
2021-05-03 21:41:51 +02:00
*/
2021-05-05 17:31:21 +02:00
public static ByteBuf convertToDirectByteBuf(ByteBufAllocator alloc, ByteBuf buffer) {
ByteBuf result;
2021-05-03 21:41:51 +02:00
ByteBuf directCopyBuf = alloc.buffer(buffer.capacity(), buffer.maxCapacity());
directCopyBuf.writeBytes(buffer, 0, buffer.writerIndex());
directCopyBuf.readerIndex(buffer.readerIndex());
result = directCopyBuf;
assert result.isDirect();
assert result.capacity() == buffer.capacity();
assert buffer.readerIndex() == result.readerIndex();
return result;
}
2021-05-05 17:31:21 +02:00
public static ByteBuf fromByteArray(ByteBufAllocator alloc, byte[] array) {
ByteBuf result = alloc.buffer(array.length);
result.writeBytes(array);
return result;
}
@NotNull
public static ByteBuf readDirectNioBuffer(ByteBufAllocator alloc, ToIntFunction<ByteBuffer> reader) {
var buffer = readNullableDirectNioBuffer(alloc, reader);
if (buffer == null) {
throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element");
}
return buffer;
}
2021-05-03 21:41:51 +02:00
public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf buffer) {
2021-05-03 12:49:16 +02:00
return buffer;
}
2021-05-03 21:41:51 +02:00
public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf buffer1, ByteBuf buffer2) {
2021-05-02 19:18:15 +02:00
try {
if (buffer1.readableBytes() == 0) {
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffer2.retain());
2021-05-02 19:18:15 +02:00
} else if (buffer2.readableBytes() == 0) {
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffer1.retain());
2021-05-02 19:18:15 +02:00
}
2021-05-03 21:41:51 +02:00
CompositeByteBuf result = alloc.compositeBuffer(2);
2021-05-02 19:18:15 +02:00
try {
2021-05-03 12:49:16 +02:00
result.addComponent(true, buffer1.retain());
result.addComponent(true, buffer2.retain());
return result.consolidate().retain();
2021-05-02 19:18:15 +02:00
} finally {
result.release();
}
} finally {
buffer1.release();
buffer2.release();
}
}
2021-05-03 21:41:51 +02:00
public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf buffer1, ByteBuf buffer2, ByteBuf buffer3) {
2021-05-02 19:18:15 +02:00
try {
if (buffer1.readableBytes() == 0) {
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffer2.retain(), buffer3.retain());
2021-05-02 19:18:15 +02:00
} else if (buffer2.readableBytes() == 0) {
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffer1.retain(), buffer3.retain());
2021-05-02 19:18:15 +02:00
} else if (buffer3.readableBytes() == 0) {
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffer1.retain(), buffer2.retain());
2021-05-02 19:18:15 +02:00
}
2021-05-03 21:41:51 +02:00
CompositeByteBuf result = alloc.compositeBuffer(3);
2021-05-02 19:18:15 +02:00
try {
2021-05-03 12:49:16 +02:00
result.addComponent(true, buffer1.retain());
result.addComponent(true, buffer2.retain());
result.addComponent(true, buffer3.retain());
return result.consolidate().retain();
2021-05-02 19:18:15 +02:00
} finally {
result.release();
}
} finally {
buffer1.release();
buffer2.release();
buffer3.release();
}
}
2021-05-03 21:41:51 +02:00
public static ByteBuf compositeBuffer(ByteBufAllocator alloc, ByteBuf... buffers) {
2021-05-02 19:18:15 +02:00
try {
switch (buffers.length) {
case 0:
2021-05-03 21:41:51 +02:00
return alloc.buffer(0);
2021-05-02 19:18:15 +02:00
case 1:
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffers[0].retain().retain());
2021-05-02 19:18:15 +02:00
case 2:
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffers[0].retain(), buffers[1].retain());
2021-05-02 19:18:15 +02:00
case 3:
2021-05-03 21:41:51 +02:00
return compositeBuffer(alloc, buffers[0].retain(), buffers[1].retain(), buffers[2].retain());
2021-05-02 19:18:15 +02:00
default:
2021-05-03 21:41:51 +02:00
CompositeByteBuf result = alloc.compositeBuffer(buffers.length);
2021-05-02 19:18:15 +02:00
try {
for (ByteBuf buffer : buffers) {
2021-05-03 12:49:16 +02:00
result.addComponent(true, buffer.retain());
2021-05-02 19:18:15 +02:00
}
2021-05-03 12:49:16 +02:00
return result.consolidate().retain();
2021-05-02 19:18:15 +02:00
} finally {
result.release();
}
}
} finally {
for (ByteBuf buffer : buffers) {
buffer.release();
}
}
}
2021-05-08 03:09:00 +02:00
public static <T> Mono<T> resolveDelta(Mono<Delta<T>> prev, UpdateReturnMode updateReturnMode) {
return prev.handle((delta, sink) -> {
switch (updateReturnMode) {
case GET_NEW_VALUE:
2021-05-21 00:19:40 +02:00
var current = delta.current();
2021-05-08 03:09:00 +02:00
if (current != null) {
sink.next(current);
} else {
sink.complete();
}
break;
case GET_OLD_VALUE:
2021-05-21 00:19:40 +02:00
var previous = delta.previous();
2021-05-08 03:09:00 +02:00
if (previous != null) {
sink.next(previous);
} else {
sink.complete();
}
break;
case NOTHING:
sink.complete();
break;
default:
sink.error(new IllegalStateException());
}
});
}
public static <T, U> Mono<Delta<U>> mapDelta(Mono<Delta<T>> mono, Function<@NotNull T, @Nullable U> mapper) {
return mono.map(delta -> {
2021-05-21 00:19:40 +02:00
T prev = delta.previous();
T curr = delta.current();
2021-05-08 03:09:00 +02:00
U newPrev;
U newCurr;
if (prev != null) {
newPrev = mapper.apply(prev);
} else {
newPrev = null;
}
if (curr != null) {
newCurr = mapper.apply(curr);
} else {
newCurr = null;
}
2021-05-21 00:19:40 +02:00
return new Delta<>(newPrev, newCurr);
2021-05-08 03:09:00 +02:00
});
}
public static <R, V> boolean isDeltaChanged(Delta<V> delta) {
2021-05-21 00:19:40 +02:00
return !Objects.equals(delta.previous(), delta.current());
2021-05-08 03:09:00 +02:00
}
2020-12-07 22:15:18 +01:00
}