From d58d696ca4af82f869eb3821d3154304621f7a4b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 13 Dec 2021 01:57:37 +0100 Subject: [PATCH] Bugfixes --- .../client/query/ClientQueryParams.java | 6 +- .../cavallium/dbengine/database/LLUtils.java | 5 +- .../database/disk/AbstractRocksDBColumn.java | 8 +-- .../database/disk/LLLocalDictionary.java | 72 ++++++++++--------- .../LLLocalGroupedReactiveRocksIterator.java | 2 +- ...LLLocalKeyPrefixReactiveRocksIterator.java | 2 +- .../disk/LLLocalReactiveRocksIterator.java | 2 +- .../java/org/rocksdb/CappedWriteBatch.java | 8 +-- 8 files changed, 55 insertions(+), 50 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java index 929d422..594444d 100644 --- a/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java +++ b/src/main/java/it/cavallium/dbengine/client/query/ClientQueryParams.java @@ -8,6 +8,7 @@ import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.QueryParamsBuilder; +import java.time.Duration; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -18,7 +19,8 @@ public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot, long limit, @Nullable Float minCompetitiveScore, @Nullable Sort sort, - boolean computePreciseHitsCount) { + boolean computePreciseHitsCount, + @NotNull Duration timeout) { public static ClientQueryParamsBuilder builder() { return ClientQueryParamsBuilder @@ -28,6 +30,7 @@ public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot, .limit(Long.MAX_VALUE) .minCompetitiveScore(null) .sort(null) + .timeout(Duration.ofSeconds(10)) .computePreciseHitsCount(true); } @@ -44,6 +47,7 @@ public final record ClientQueryParams(@Nullable CompositeSnapshot snapshot, .offset(offset()) .limit(limit()) .computePreciseHitsCount(computePreciseHitsCount()) + .timeoutMilliseconds(timeout.toMillis()) .build(); } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 0406942..b27b8d5 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -437,10 +437,9 @@ public class LLUtils { /** * @return null if size is equal to RocksDB.NOT_FOUND */ - @SuppressWarnings("ConstantConditions") @Nullable public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction reader) { - var directBuffer = LLUtils.allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); + var directBuffer = allocateShared(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); assert directBuffer.readerOffset() == 0; assert directBuffer.writerOffset() == 0; var directBufferWriter = ((WritableComponent) directBuffer).writableBuffer(); @@ -464,7 +463,7 @@ public class LLUtils { directBufferWriter = ((WritableComponent) directBuffer).writableBuffer(); assert directBufferWriter.position() == 0; assert directBufferWriter.isDirect(); - reader.applyAsInt(directBufferWriter); + reader.applyAsInt(directBufferWriter.position(0)); return directBuffer.writerOffset(trueSize); } } catch (Throwable t) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 19c1682..e9406d0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -110,13 +110,13 @@ public sealed abstract class AbstractRocksDBColumn implements if (nettyDirect) { // Get the key nio buffer to pass to RocksDB ByteBuffer keyNioBuffer = LLUtils.asReadOnlyDirect(key); - assert keyNioBuffer.isDirect(); boolean mustCloseKey; if (keyNioBuffer == null) { mustCloseKey = true; // If the nio buffer is not available, copy the netty buffer into a new direct buffer keyNioBuffer = LLUtils.copyToNewDirectBuffer(key); } else { + assert keyNioBuffer.isDirect(); mustCloseKey = false; } assert keyNioBuffer.limit() == key.readableBytes(); @@ -144,7 +144,7 @@ public sealed abstract class AbstractRocksDBColumn implements assert keyMayExistValueLength == 0; resultWritable.clear(); // real data size - size = db.get(cfh, readOptions, keyNioBuffer, resultWritable); + size = db.get(cfh, readOptions, keyNioBuffer.position(0), resultWritable); if (size == RocksDB.NOT_FOUND) { resultBuffer.close(); return null; @@ -164,7 +164,7 @@ public sealed abstract class AbstractRocksDBColumn implements assert resultBuffer.readerOffset() == 0; assert resultBuffer.writerOffset() == 0; - size = db.get(cfh, readOptions, keyNioBuffer, resultWritable); + size = db.get(cfh, readOptions, keyNioBuffer.position(0), resultWritable); if (size == RocksDB.NOT_FOUND) { resultBuffer.close(); return null; @@ -301,7 +301,7 @@ public sealed abstract class AbstractRocksDBColumn implements } try { if (db.keyMayExist(cfh, keyNioBuffer)) { - int size = db.get(cfh, readOptions, keyNioBuffer, LLUtils.EMPTY_BYTE_BUFFER); + int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER); return size != RocksDB.NOT_FOUND; } else { return false; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index a195aaf..fa9fe5b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk; import static io.net5.buffer.Unpooled.wrappedBuffer; import static io.net5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; +import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.fromByteArray; import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElse; @@ -258,7 +259,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setFillCache(false); if (range.hasMin()) { - var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe()); + var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); if (nettyDirect && rangeMinInternalByteBuffer != null) { readOpts.setIterateLowerBound(slice1 = new DirectSlice(rangeMinInternalByteBuffer, range.getMinUnsafe().readableBytes())); @@ -267,7 +268,7 @@ public class LLLocalDictionary implements LLDictionary { } } if (range.hasMax()) { - var rangeMaxInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMaxUnsafe()); + var rangeMaxInternalByteBuffer = asReadOnlyDirect(range.getMaxUnsafe()); if (nettyDirect && rangeMaxInternalByteBuffer != null) { readOpts.setIterateUpperBound(slice2 = new DirectSlice(rangeMaxInternalByteBuffer, range.getMaxUnsafe().readableBytes())); @@ -277,7 +278,7 @@ public class LLLocalDictionary implements LLDictionary { } try (RocksIterator rocksIterator = db.newIterator(readOpts)) { if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) { - var rangeMinInternalByteBuffer = LLUtils.asReadOnlyDirect(range.getMinUnsafe()); + var rangeMinInternalByteBuffer = asReadOnlyDirect(range.getMinUnsafe()); if (nettyDirect && rangeMinInternalByteBuffer != null) { rocksIterator.seek(rangeMinInternalByteBuffer); } else { @@ -828,7 +829,7 @@ public class LLLocalDictionary implements LLDictionary { } ro.setVerifyChecksums(true); var rocksIteratorTuple = getRocksIterator(alloc, - nettyDirect, ro, range.send(), db + nettyDirect, ro, range, db ); try { try (var rocksIterator = rocksIteratorTuple.getT1()) { @@ -1193,9 +1194,11 @@ public class LLLocalDictionary implements LLDictionary { private static SafeCloseable rocksIterSeekTo(boolean allowNettyDirect, RocksIterator rocksIterator, Buffer key) { ByteBuffer keyInternalByteBuffer; - if (allowNettyDirect && (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) { + if (allowNettyDirect && (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) { + assert keyInternalByteBuffer.position() == 0; rocksIterator.seek(keyInternalByteBuffer); - return null; + // This is useful to retain the key buffer in memory and avoid deallocations + return key::isAccessible; } else { rocksIterator.seek(LLUtils.toArray(key)); return null; @@ -1208,7 +1211,8 @@ public class LLLocalDictionary implements LLDictionary { AbstractSlice slice; ByteBuffer keyInternalByteBuffer; if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS - && (keyInternalByteBuffer = LLUtils.asReadOnlyDirect(key)) != null) { + && (keyInternalByteBuffer = asReadOnlyDirect(key)) != null) { + assert keyInternalByteBuffer.position() == 0; slice = new DirectSlice(keyInternalByteBuffer, key.readableBytes()); assert slice.size() == key.readableBytes(); assert slice.compare(new Slice(LLUtils.toArray(key))) == 0; @@ -1217,7 +1221,7 @@ public class LLLocalDictionary implements LLDictionary { } else { readOpts.setIterateUpperBound(slice); } - return new ReleasableSliceImpl(slice, null, null); + return new ReleasableSliceImpl(slice, null, key); } else { slice = new Slice(requireNonNull(LLUtils.toArray(key))); if (boundType == IterateBound.LOWER) { @@ -1679,34 +1683,32 @@ public class LLLocalDictionary implements LLDictionary { public static Tuple4 getRocksIterator(BufferAllocator alloc, boolean allowNettyDirect, ReadOptions readOptions, - Send rangeToReceive, + LLRange range, RocksDBColumn db) { - try (var range = rangeToReceive.receive()) { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread"); - } - ReleasableSlice sliceMin; - ReleasableSlice sliceMax; - if (range.hasMin()) { - sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe()); - } else { - sliceMin = emptyReleasableSlice(); - } - if (range.hasMax()) { - sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe()); - } else { - sliceMax = emptyReleasableSlice(); - } - var rocksIterator = db.newIterator(readOptions); - SafeCloseable seekTo; - if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { - seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()), - () -> ((SafeCloseable) () -> {})); - } else { - seekTo = () -> {}; - rocksIterator.seekToFirst(); - } - return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo); + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called getRocksIterator in a nonblocking thread"); } + ReleasableSlice sliceMin; + ReleasableSlice sliceMax; + if (range.hasMin()) { + sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMinUnsafe()); + } else { + sliceMin = emptyReleasableSlice(); + } + if (range.hasMax()) { + sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMaxUnsafe()); + } else { + sliceMax = emptyReleasableSlice(); + } + var rocksIterator = db.newIterator(readOptions); + SafeCloseable seekTo; + if (!PREFER_SEEK_TO_FIRST && range.hasMin()) { + seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMinUnsafe()), + () -> ((SafeCloseable) () -> {})); + } else { + seekTo = () -> {}; + rocksIterator.seekToFirst(); + } + return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java index 944932e..13b98f3 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalGroupedReactiveRocksIterator.java @@ -91,7 +91,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); } - return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db); + return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db); }, (tuple, sink) -> { try { var rocksIterator = tuple.getT1(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index ce41e7f..d7f0c22 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -93,7 +93,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); } - return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, rangeSend, db); + return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db); }, (tuple, sink) -> { try { var rocksIterator = tuple.getT1(); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java index 7ca5330..814c924 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalReactiveRocksIterator.java @@ -83,7 +83,7 @@ public abstract class LLLocalReactiveRocksIterator extends if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range)); } - return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db); + return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range, db); }, (tuple, sink) -> { try { var rocksIterator = tuple.getT1(); diff --git a/src/main/java/org/rocksdb/CappedWriteBatch.java b/src/main/java/org/rocksdb/CappedWriteBatch.java index 7b762f3..b8f96c1 100644 --- a/src/main/java/org/rocksdb/CappedWriteBatch.java +++ b/src/main/java/org/rocksdb/CappedWriteBatch.java @@ -1,5 +1,6 @@ package org.rocksdb; +import static it.cavallium.dbengine.database.LLUtils.asReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.isDirect; import io.net5.buffer.api.Buffer; @@ -111,8 +112,8 @@ public class CappedWriteBatch extends WriteBatch { ByteBuffer keyNioBuffer; ByteBuffer valueNioBuffer; if (USE_FAST_DIRECT_BUFFERS - && (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null - && (valueNioBuffer = LLUtils.asReadOnlyDirect(value)) != null) { + && (keyNioBuffer = asReadOnlyDirect(key)) != null + && (valueNioBuffer = asReadOnlyDirect(value)) != null) { buffersToRelease.add(value); buffersToRelease.add(key); @@ -171,7 +172,7 @@ public class CappedWriteBatch extends WriteBatch { public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send keyToReceive) throws RocksDBException { var key = keyToReceive.receive(); ByteBuffer keyNioBuffer; - if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = LLUtils.asReadOnlyDirect(key)) != null) { + if (USE_FAST_DIRECT_BUFFERS && (keyNioBuffer = asReadOnlyDirect(key)) != null) { buffersToRelease.add(key); removeDirect(nativeHandle_, keyNioBuffer, @@ -179,7 +180,6 @@ public class CappedWriteBatch extends WriteBatch { keyNioBuffer.remaining(), columnFamilyHandle.nativeHandle_ ); - keyNioBuffer.position(keyNioBuffer.limit()); } else { try { super.delete(columnFamilyHandle, LLUtils.toArray(key));