This commit is contained in:
Andrea Cavalli 2021-12-13 01:57:37 +01:00
parent 907561d93c
commit d58d696ca4
8 changed files with 55 additions and 50 deletions

View File

@ -8,6 +8,7 @@ import it.cavallium.dbengine.client.query.current.data.NoSort;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import it.cavallium.dbengine.client.query.current.data.QueryParamsBuilder;
import java.time.Duration;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -18,7 +19,8 @@ public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot,
long limit,
@Nullable Float minCompetitiveScore,
@Nullable Sort sort,
boolean computePreciseHitsCount) {
boolean computePreciseHitsCount,
@NotNull Duration timeout) {
public static ClientQueryParamsBuilder builder() {
return ClientQueryParamsBuilder
@ -28,6 +30,7 @@ public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot,
.limit(Long.MAX_VALUE)
.minCompetitiveScore(null)
.sort(null)
.timeout(Duration.ofSeconds(10))
.computePreciseHitsCount(true);
}
@ -44,6 +47,7 @@ public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot,
.offset(offset())
.limit(limit())
.computePreciseHitsCount(computePreciseHitsCount())
.timeoutMilliseconds(timeout.toMillis())
.build();
}
}

View File

@ -437,10 +437,9 @@ public class LLUtils {
/**
* @return null if size is equal to RocksDB.NOT_FOUND
*/
@SuppressWarnings("ConstantConditions")
@Nullable
public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
var directBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
var directBuffer = allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
assert directBuffer.readerOffset() == 0;
assert directBuffer.writerOffset() == 0;
var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer();
@ -464,7 +463,7 @@ public class LLUtils {
directBufferWriter = ((WritableComponent) directBuffer).writableBuffer();
assert directBufferWriter.position() == 0;
assert directBufferWriter.isDirect();
reader.applyAsInt(directBufferWriter);
reader.applyAsInt(directBufferWriter.position(0));
return directBuffer.writerOffset(trueSize);
}
} catch (Throwable t) {

View File

@ -110,13 +110,13 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
if (nettyDirect) {
// Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key);
assert keyNioBuffer.isDirect();
boolean mustCloseKey;
if (keyNioBuffer == null) {
mustCloseKey = true;
// If the nio buffer is not available, copy the netty buffer into a new direct buffer
keyNioBuffer = LLUtils.copyToNewDirectBuffer(key);
} else {
assert keyNioBuffer.isDirect();
mustCloseKey = false;
}
assert keyNioBuffer.limit() == key.readableBytes();
@ -144,7 +144,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert keyMayExistValueLength == 0;
resultWritable.clear();
// real data size
size = db.get(cfh, readOptions, keyNioBuffer, resultWritable);
size = db.get(cfh, readOptions, keyNioBuffer.position(0), resultWritable);
if (size == RocksDB.NOT_FOUND) {
resultBuffer.close();
return null;
@ -164,7 +164,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
assert resultBuffer.readerOffset() == 0;
assert resultBuffer.writerOffset() == 0;
size = db.get(cfh, readOptions, keyNioBuffer, resultWritable);
size = db.get(cfh, readOptions, keyNioBuffer.position(0), resultWritable);
if (size == RocksDB.NOT_FOUND) {
resultBuffer.close();
return null;
@ -301,7 +301,7 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
try {
if (db.keyMayExist(cfh, keyNioBuffer)) {
int size = db.get(cfh, readOptions, keyNioBuffer, LLUtils.EMPTY_BYTE_BUFFER);
int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER);
return size != RocksDB.NOT_FOUND;
} else {
return false;

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import static io.net5.buffer.Unpooled.wrappedBuffer;
import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
@ -258,7 +259,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe());
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer,
range.getMinUnsafe().readableBytes()));
@ -267,7 +268,7 @@ public class LLLocalDictionary implements LLDictionary {
}
}
if (range.hasMax()) {
var rangeMaxInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMaxUnsafe());
var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe());
if (nettyDirect && rangeMaxInternalByteBuffer != null) {
readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer,
range.getMaxUnsafe().readableBytes()));
@ -277,7 +278,7 @@ public class LLLocalDictionary implements LLDictionary {
}
try (RocksIterator rocksIterator = db.newIterator(readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe());
var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe());
if (nettyDirect && rangeMinInternalByteBuffer != null) {
rocksIterator.seek(rangeMinInternalByteBuffer);
} else {
@ -828,7 +829,7 @@ public class LLLocalDictionary implements LLDictionary {
}
ro.setVerifyChecksums(true);
var rocksIteratorTuple = getRocksIterator(alloc,
nettyDirect, ro, range.send(), db
nettyDirect, ro, range, db
);
try {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
@ -1193,9 +1194,11 @@ public class LLLocalDictionary implements LLDictionary {
private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect,
RocksIterator rocksIterator, Buffer key) {
ByteBuffer keyInternalByteBuffer;
if (allowNettyDirect && (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) {
if (allowNettyDirect && (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) {
assert keyInternalByteBuffer.position() == 0;
rocksIterator.seek(keyInternalByteBuffer);
return null;
// This is useful to retain the key buffer in memory and avoid deallocations
return key::isAccessible;
} else {
rocksIterator.seek(LLUtils.toArray(key));
return null;
@ -1208,7 +1211,8 @@ public class LLLocalDictionary implements LLDictionary {
AbstractSlice<?> slice;
ByteBuffer keyInternalByteBuffer;
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS
&& (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) {
&& (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) {
assert keyInternalByteBuffer.position() == 0;
slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes());
assert slice.size() == key.readableBytes();
assert slice.compare(new Slice(LLUtils.toArray(key))) == 0;
@ -1217,7 +1221,7 @@ public class LLLocalDictionary implements LLDictionary {
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, null, null);
return new ReleasableSliceImpl(slice, null, key);
} else {
slice = new Slice(requireNonNull(LLUtils.toArray(key)));
if (boundType == IterateBound.LOWER) {
@ -1679,34 +1683,32 @@ public class LLLocalDictionary implements LLDictionary {
public static Tuple4<RocksIterator, ReleasableSlice, ReleasableSlice, SafeCloseable> getRocksIterator(BufferAllocator alloc,
boolean allowNettyDirect,
ReadOptions readOptions,
Send<LLRange> rangeToReceive,
LLRange range,
RocksDBColumn db) {
try (var range = rangeToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread");
}
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe());
} else {
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe());
} else {
sliceMax = emptyReleasableSlice();
}
var rocksIterator = db.newIterator(readOptions);
SafeCloseable seekTo;
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()),
() -> ((SafeCloseable) () -> {}));
} else {
seekTo = () -> {};
rocksIterator.seekToFirst();
}
return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread");
}
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe());
} else {
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe());
} else {
sliceMax = emptyReleasableSlice();
}
var rocksIterator = db.newIterator(readOptions);
SafeCloseable seekTo;
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()),
() -> ((SafeCloseable) () -> {}));
} else {
seekTo = () -> {};
rocksIterator.seekToFirst();
}
return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo);
}
}

View File

@ -91,7 +91,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> extends
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db);
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();

View File

@ -93,7 +93,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, rangeSend, db);
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();

View File

@ -83,7 +83,7 @@ public abstract class LLLocalReactiveRocksIterator<T> extends
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db);
return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();

View File

@ -1,5 +1,6 @@
package org.rocksdb;
import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.isDirect;
import io.net5.buffer.api.Buffer;
@ -111,8 +112,8 @@ public class CappedWriteBatch extends WriteBatch {
ByteBuffer keyNioBuffer;
ByteBuffer valueNioBuffer;
if (USE_FAST_DIRECT_BUFFERS
&& (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null
&& (valueNioBuffer = LLUtils.asReadOnlyDirect(value)) != null) {
&& (keyNioBuffer = asReadOnlyDirect(key)) != null
&& (valueNioBuffer = asReadOnlyDirect(value)) != null) {
buffersToRelease.add(value);
buffersToRelease.add(key);
@ -171,7 +172,7 @@ public class CappedWriteBatch extends WriteBatch {
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException {
var key = keyToReceive.receive();
ByteBuffer keyNioBuffer;
if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null) {
if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = asReadOnlyDirect(key)) != null) {
buffersToRelease.add(key);
removeDirect(nativeHandle_,
keyNioBuffer,
@ -179,7 +180,6 @@ public class CappedWriteBatch extends WriteBatch {
keyNioBuffer.remaining(),
columnFamilyHandle.nativeHandle_
);
keyNioBuffer.position(keyNioBuffer.limit());
} else {
try {
super.delete(columnFamilyHandle, LLUtils.toArray(key));