Andrea Cavalli 2021-09-01 00:01:56 +02:00
parent 013d26387d
commit ff7823656e
14 changed files with 459 additions and 267 deletions

View File

@@ -3,11 +3,15 @@ package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
+import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.Send;
+import io.netty.buffer.api.unsafe.UnsafeMemoryManager;
import io.netty.util.IllegalReferenceCountException;
+import io.netty.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
@@ -47,6 +51,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
+import reactor.util.function.Tuples;
+import sun.misc.Unsafe;
@SuppressWarnings("unused")
public class LLUtils {
@@ -284,23 +290,36 @@ public class LLUtils {
*
* @return null if size is equal to RocksDB.NOT_FOUND
*/
+@SuppressWarnings("ConstantConditions")
@Nullable
-public static Buffer readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
+public static Send<Buffer> readNullableDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
-Buffer buffer = alloc.allocate(4096);
-ByteBuffer nioBuffer;
+ByteBuffer directBuffer;
+Buffer buffer;
+{
+var direct = LLUtils.newDirect(alloc, 4096);
+directBuffer = direct.byteBuffer();
+buffer = direct.buffer().receive();
+}
+try {
int size;
do {
-nioBuffer = LLUtils.toDirect(buffer);
-nioBuffer.limit(nioBuffer.capacity());
-assert nioBuffer.isDirect();
-size = reader.applyAsInt(nioBuffer);
+directBuffer.limit(directBuffer.capacity());
+assert directBuffer.isDirect();
+size = reader.applyAsInt(directBuffer);
if (size != RocksDB.NOT_FOUND) {
-if (size == nioBuffer.limit()) {
+if (size == directBuffer.limit()) {
buffer.readerOffset(0).writerOffset(size);
-return buffer;
+return buffer.send();
} else {
-assert size > nioBuffer.limit();
-assert nioBuffer.limit() > 0;
+assert size > directBuffer.limit();
+assert directBuffer.limit() > 0;
+// Free the buffer
+if (directBuffer != null) {
+// todo: check if free is needed
+PlatformDependent.freeDirectBuffer(directBuffer);
+directBuffer = null;
+}
+directBuffer = LLUtils.obtainDirect(buffer);
buffer.ensureWritable(size);
}
}
@@ -308,45 +327,54 @@ public class LLUtils {
// Return null if size is equal to RocksDB.NOT_FOUND
return null;
+} finally {
+// Free the buffer
+if (directBuffer != null) {
+// todo: check if free is needed
+PlatformDependent.freeDirectBuffer(directBuffer);
+directBuffer = null;
+}
+buffer.close();
+}
}
-@Nullable
-public static ByteBuffer toDirectFast(Buffer buffer) {
-int readableComponents = buffer.countReadableComponents();
-if (readableComponents > 0) {
-AtomicReference<ByteBuffer> byteBufferReference = new AtomicReference<>(null);
-buffer.forEachReadable(0, (index, component) -> {
-byteBufferReference.setPlain(component.readableBuffer());
-return false;
-});
-ByteBuffer byteBuffer = byteBufferReference.getPlain();
-if (byteBuffer != null && byteBuffer.isDirect()) {
-byteBuffer.limit(buffer.writerOffset());
-assert byteBuffer.isDirect();
-assert byteBuffer.capacity() == buffer.capacity();
-assert buffer.readerOffset() == byteBuffer.position();
-assert byteBuffer.limit() - byteBuffer.position() == buffer.readableBytes();
-return byteBuffer;
-} else {
-return null;
-}
-} else if (readableComponents == 0) {
-return EMPTY_BYTE_BUFFER;
-} else {
-return null;
-}
-}
-public static ByteBuffer toDirect(Buffer buffer) {
-ByteBuffer result = toDirectFast(buffer);
-if (result == null) {
-throw new IllegalArgumentException("The supplied Buffer is not direct "
-+ "(if it's a CompositeByteBuf it must be consolidated before)");
-}
-assert result.isDirect();
-return result;
-}
+public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {}
+@NotNull
+public static DirectBuffer newDirect(BufferAllocator allocator, int size) {
+try (var buf = allocator.allocate(size)) {
+var direct = obtainDirect(buf);
+return new DirectBuffer(buf.send(), direct);
+}
+}
+@NotNull
+public static DirectBuffer convertToDirect(BufferAllocator allocator, Send<Buffer> content) {
+try (var buf = content.receive()) {
+if (buf.nativeAddress() != 0) {
+var direct = obtainDirect(buf);
+return new DirectBuffer(buf.send(), direct);
+} else {
+var direct = newDirect(allocator, buf.readableBytes());
+try (var buf2 = direct.buffer().receive()) {
+buf.copyInto(buf.readerOffset(), buf2, buf2.writerOffset(), buf.readableBytes());
+return new DirectBuffer(buf2.send(), direct.byteBuffer());
+}
+}
+}
+}
+@NotNull
+public static ByteBuffer obtainDirect(Buffer buffer) {
+assert buffer.isAccessible();
+if (buffer.readableBytes() == 0) {
+return EMPTY_BYTE_BUFFER;
+}
+long nativeAddress;
+if ((nativeAddress = buffer.nativeAddress()) == 0) {
+throw new IllegalStateException("Buffer is not direct");
+}
+return PlatformDependent.directBuffer(nativeAddress, buffer.capacity());
+}
}
public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {
@@ -356,12 +384,14 @@ public class LLUtils {
}
@NotNull
-public static Buffer readDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
+public static Send<Buffer> readDirectNioBuffer(BufferAllocator alloc, ToIntFunction<ByteBuffer> reader) {
-var buffer = readNullableDirectNioBuffer(alloc, reader);
+var nullableSend = readNullableDirectNioBuffer(alloc, reader);
+try (var buffer = nullableSend != null ? nullableSend.receive() : null) {
if (buffer == null) {
throw new IllegalStateException("A non-nullable buffer read operation tried to return a \"not found\" element");
}
-return buffer;
+return buffer.send();
+}
}
public static Send<Buffer> compositeBuffer(BufferAllocator alloc, Send<Buffer> buffer) {
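
A minimal usage sketch of the DirectBuffer helpers added above (newDirect/convertToDirect/obtainDirect), mirroring the close-then-free cleanup pattern that LLLocalDictionary adopts later in this diff; the DirectBufferExample class and useDirect method are hypothetical names, not part of the commit.

import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.LLUtils.DirectBuffer;

// Hypothetical example: obtain a direct ByteBuffer view of a Buffer for a native API,
// then release both the Send<Buffer> and the direct ByteBuffer, as LLLocalDictionary does below.
final class DirectBufferExample {
	static void useDirect(BufferAllocator alloc, Buffer key) {
		DirectBuffer direct = LLUtils.convertToDirect(alloc, key.send());
		try {
			assert direct.byteBuffer().isDirect();
			// pass direct.byteBuffer() to a native call, e.g. a RocksDB get/put
		} finally {
			direct.buffer().close();
			PlatformDependent.freeDirectBuffer(direct.byteBuffer());
		}
	}
}
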

View File

@@ -40,10 +40,10 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
protected DatabaseMapDictionaryHashed(LLDictionary dictionary,
Send<Buffer> prefixKey,
-Serializer<T, Send<Buffer>> keySuffixSerializer,
+Serializer<T> keySuffixSerializer,
-Serializer<U, Send<Buffer>> valueSerializer,
+Serializer<U> valueSerializer,
Function<T, TH> keySuffixHashFunction,
-SerializerFixedBinaryLength<TH, Send<Buffer>> keySuffixHashSerializer) {
+SerializerFixedBinaryLength<TH> keySuffixHashSerializer) {
if (dictionary.getUpdateMode().block() != UpdateMode.ALLOW) {
throw new IllegalArgumentException("Hashed maps only works when UpdateMode is ALLOW");
}
@@ -61,10 +61,10 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
}
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> simple(LLDictionary dictionary,
-Serializer<T, Send<Buffer>> keySerializer,
+Serializer<T> keySerializer,
-Serializer<U, Send<Buffer>> valueSerializer,
+Serializer<U> valueSerializer,
Function<T, UH> keyHashFunction,
-SerializerFixedBinaryLength<UH, Send<Buffer>> keyHashSerializer) {
+SerializerFixedBinaryLength<UH> keyHashSerializer) {
return new DatabaseMapDictionaryHashed<>(
dictionary,
dictionary.getAllocator().allocate(0).send(),
@@ -77,10 +77,10 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
public static <T, U, UH> DatabaseMapDictionaryHashed<T, U, UH> tail(LLDictionary dictionary,
Send<Buffer> prefixKey,
-Serializer<T, Send<Buffer>> keySuffixSerializer,
+Serializer<T> keySuffixSerializer,
-Serializer<U, Send<Buffer>> valueSerializer,
+Serializer<U> valueSerializer,
Function<T, UH> keySuffixHashFunction,
-SerializerFixedBinaryLength<UH, Send<Buffer>> keySuffixHashSerializer) {
+SerializerFixedBinaryLength<UH> keySuffixHashSerializer) {
return new DatabaseMapDictionaryHashed<>(dictionary,
prefixKey,
keySuffixSerializer,

View File

@@ -19,9 +19,9 @@ public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHas
protected DatabaseSetDictionaryHashed(LLDictionary dictionary,
Send<Buffer> prefixKey,
-Serializer<T, Send<Buffer>> keySuffixSerializer,
+Serializer<T> keySuffixSerializer,
Function<T, TH> keySuffixHashFunction,
-SerializerFixedBinaryLength<TH, Send<Buffer>> keySuffixHashSerializer) {
+SerializerFixedBinaryLength<TH> keySuffixHashSerializer) {
super(dictionary,
prefixKey,
keySuffixSerializer,
@@ -32,9 +32,9 @@ public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHas
}
public static <T, TH> DatabaseSetDictionaryHashed<T, TH> simple(LLDictionary dictionary,
-Serializer<T, Send<Buffer>> keySerializer,
+Serializer<T> keySerializer,
Function<T, TH> keyHashFunction,
-SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) {
+SerializerFixedBinaryLength<TH> keyHashSerializer) {
return new DatabaseSetDictionaryHashed<>(dictionary,
dictionary.getAllocator().allocate(0).send(),
keySerializer,
@@ -45,9 +45,9 @@ public class DatabaseSetDictionaryHashed<T, TH> extends DatabaseMapDictionaryHas
public static <T, TH> DatabaseSetDictionaryHashed<T, TH> tail(LLDictionary dictionary,
Send<Buffer> prefixKey,
-Serializer<T, Send<Buffer>> keySuffixSerializer,
+Serializer<T> keySuffixSerializer,
Function<T, TH> keyHashFunction,
-SerializerFixedBinaryLength<TH, Send<Buffer>> keyHashSerializer) {
+SerializerFixedBinaryLength<TH> keyHashSerializer) {
return new DatabaseSetDictionaryHashed<>(dictionary,
prefixKey,
keySuffixSerializer,

View File

@@ -14,9 +14,9 @@ import reactor.core.publisher.Mono;
public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageEntry<T>> {
-private final Serializer<T, Send<Buffer>> serializer;
+private final Serializer<T> serializer;
-public SubStageGetterSingle(Serializer<T, Send<Buffer>> serializer) {
+public SubStageGetterSingle(Serializer<T> serializer) {
this.serializer = serializer;
}

View File

@@ -10,26 +10,28 @@ import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.NotNull;
-class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>, Send<Buffer>> {
+class ValueWithHashSerializer<X, Y> implements Serializer<Entry<X, Y>> {
private final BufferAllocator allocator;
-private final Serializer<X, Send<Buffer>> keySuffixSerializer;
+private final Serializer<X> keySuffixSerializer;
-private final Serializer<Y, Send<Buffer>> valueSerializer;
+private final Serializer<Y> valueSerializer;
ValueWithHashSerializer(BufferAllocator allocator,
-Serializer<X, Send<Buffer>> keySuffixSerializer,
+Serializer<X> keySuffixSerializer,
-Serializer<Y, Send<Buffer>> valueSerializer) {
+Serializer<Y> valueSerializer) {
this.allocator = allocator;
this.keySuffixSerializer = keySuffixSerializer;
this.valueSerializer = valueSerializer;
}
@Override
-public @NotNull Entry<X, Y> deserialize(@NotNull Send<Buffer> serializedToReceive) throws SerializationException {
+public @NotNull DeserializationResult<Entry<X, Y>> deserialize(@NotNull Send<Buffer> serializedToReceive)
+throws SerializationException {
try (var serialized = serializedToReceive.receive()) {
-X deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send());
+DeserializationResult<X> deserializedKey = keySuffixSerializer.deserialize(serialized.copy().send());
-Y deserializedValue = valueSerializer.deserialize(serialized.send());
+DeserializationResult<Y> deserializedValue = valueSerializer.deserialize(serialized.send());
-return Map.entry(deserializedKey, deserializedValue);
+return new DeserializationResult<>(Map.entry(deserializedKey.deserializedData(),
+deserializedValue.deserializedData()), deserializedKey.bytesRead() + deserializedValue.bytesRead());
}
}

View File

@@ -14,23 +14,23 @@ import java.util.HashSet;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
-class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>, Send<Buffer>> {
+class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
private final BufferAllocator allocator;
-private final Serializer<X, Send<Buffer>> entrySerializer;
+private final Serializer<X> entrySerializer;
-ValuesSetSerializer(BufferAllocator allocator, Serializer<X, Send<Buffer>> entrySerializer) {
+ValuesSetSerializer(BufferAllocator allocator, Serializer<X> entrySerializer) {
this.allocator = allocator;
this.entrySerializer = entrySerializer;
}
@Override
-public @NotNull ObjectArraySet<X> deserialize(@NotNull Send<Buffer> serializedToReceive) throws SerializationException {
+public @NotNull DeserializationResult<ObjectArraySet<X>> deserialize(@NotNull Send<Buffer> serializedToReceive) throws SerializationException {
try (var serialized = serializedToReceive.receive()) {
int entriesLength = serialized.readInt();
ArrayList<X> deserializedElements = new ArrayList<>(entriesLength);
for (int i = 0; i < entriesLength; i++) {
-X entry = entrySerializer.deserialize(serialized.send());
+X entry = entrySerializer.deserialize(serialized.copy().send());
deserializedElements.add(entry);
}
return new ObjectArraySet<>(deserializedElements);

View File

@@ -2,8 +2,6 @@ package it.cavallium.dbengine.database.disk;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
-import static it.cavallium.dbengine.database.LLUtils.isDirect;
-import static it.cavallium.dbengine.database.LLUtils.toDirect;
import static java.util.Objects.requireNonNull;
import io.netty.buffer.api.Buffer;
@@ -12,6 +10,7 @@ import io.netty.buffer.api.Resource;
import io.netty.buffer.api.Send;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.util.ReferenceCounted;
+import io.netty.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Column;
@@ -24,12 +23,15 @@ import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.LLUtils.DirectBuffer;
import it.cavallium.dbengine.database.RepeatedElementList;
+import it.cavallium.dbengine.database.SafeCloseable;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.BiSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.unimi.dsi.fastutil.ints.IntArrayList;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
@@ -76,6 +78,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
+import reactor.util.function.Tuple4;
import reactor.util.function.Tuples;
@NotAtomic
@@ -294,17 +297,13 @@ public class LLLocalDictionary implements LLDictionary {
Send<Buffer> keySend,
boolean existsAlmostCertainly) throws RocksDBException {
try (var key = keySend.receive()) {
-if (databaseOptions.allowNettyDirect() && isDirect(key)) {
+if (databaseOptions.allowNettyDirect()) {
//todo: implement keyMayExist if existsAlmostCertainly is false.
// Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers
// Create the key nio buffer to pass to RocksDB
-if (!isDirect(key)) {
-throw new RocksDBException("Key buffer must be direct");
-}
-ByteBuffer keyNioBuffer = toDirect(key);
-assert keyNioBuffer.isDirect();
+var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
// Create a direct result buffer because RocksDB works only with direct buffers
try (Buffer resultBuf = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) {
int valueSize;
@@ -312,12 +311,12 @@ public class LLLocalDictionary implements LLDictionary {
ByteBuffer resultNioBuf;
do {
// Create the result nio buffer to pass to RocksDB
-resultNioBuf = toDirect(resultBuf);
+resultNioBuf = LLUtils.obtainDirect(resultBuf);
-assert keyNioBuffer.isDirect();
+assert keyNioBuffer.byteBuffer().isDirect();
assert resultNioBuf.isDirect();
valueSize = db.get(cfh,
Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS),
-keyNioBuffer.position(0),
+keyNioBuffer.byteBuffer().position(0),
resultNioBuf
);
if (valueSize != RocksDB.NOT_FOUND) {
@@ -361,7 +360,7 @@ public class LLLocalDictionary implements LLDictionary {
resultNioBuf = null;
}
// Rewind the keyNioBuf position, making it readable again for the next loop iteration
-keyNioBuffer.rewind();
+keyNioBuffer.byteBuffer().rewind();
if (resultBuf.capacity() < valueSize) {
// Expand the resultBuf size if the result is bigger than the current result
// buffer size
@@ -372,6 +371,9 @@ public class LLLocalDictionary implements LLDictionary {
} while (valueSize != RocksDB.NOT_FOUND);
// If the value is not found return null
return null;
+} finally {
+keyNioBuffer.buffer().close();
+PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
}
} else {
try (ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS)) {
@@ -407,19 +409,19 @@ public class LLLocalDictionary implements LLDictionary {
try (var key = keyToReceive.receive()) {
try (var value = valueToReceive.receive()) {
if (databaseOptions.allowNettyDirect()) {
-if (!isDirect(key)) {
-throw new RocksDBException("Key buffer must be direct");
-}
-if (!isDirect(value)) {
-throw new RocksDBException("Value buffer must be direct");
-}
-var keyNioBuffer = toDirect(key);
-assert keyNioBuffer.isDirect();
-var valueNioBuffer = toDirect(value);
-assert valueNioBuffer.isDirect();
-db.put(cfh, validWriteOptions, keyNioBuffer, valueNioBuffer);
+var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
+try (var ignored1 = keyNioBuffer.buffer().receive()) {
+assert keyNioBuffer.byteBuffer().isDirect();
+var valueNioBuffer = LLUtils.convertToDirect(alloc, value.send());
+try (var ignored2 = valueNioBuffer.buffer().receive()) {
+assert valueNioBuffer.byteBuffer().isDirect();
+db.put(cfh, validWriteOptions, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer());
+} finally {
+PlatformDependent.freeDirectBuffer(valueNioBuffer.byteBuffer());
+}
+} finally {
+PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
+}
} else {
db.put(cfh, validWriteOptions, LLUtils.toArray(key), LLUtils.toArray(value));
}
@@ -455,6 +457,11 @@ public class LLLocalDictionary implements LLDictionary {
Buffer cloned1 = null;
Buffer cloned2 = null;
Buffer cloned3 = null;
+ByteBuffer direct1 = null;
+ByteBuffer direct2 = null;
+ByteBuffer direct3 = null;
+AbstractSlice<?> slice1 = null;
+AbstractSlice<?> slice2 = null;
try {
try (var range = rangeSend.receive()) {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
@@ -463,22 +470,24 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMin()) {
try (var rangeMin = range.getMin().receive()) {
if (databaseOptions.allowNettyDirect()) {
-ByteBuffer directBuf = toDirect(cloned1 = rangeMin.copy());
-requireNonNull(directBuf, "This range must use direct buffers");
-readOpts.setIterateLowerBound(new DirectSlice(directBuf));
+var directBuf = LLUtils.convertToDirect(alloc, rangeMin.send());
+cloned1 = directBuf.buffer().receive();
+direct1 = directBuf.byteBuffer();
+readOpts.setIterateLowerBound(slice1 = new DirectSlice(directBuf.byteBuffer()));
} else {
-readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(rangeMin)));
+readOpts.setIterateLowerBound(slice1 = new Slice(LLUtils.toArray(rangeMin)));
}
}
}
if (range.hasMax()) {
try (var rangeMax = range.getMax().receive()) {
if (databaseOptions.allowNettyDirect()) {
-var directBuf = toDirect(cloned2 = rangeMax.copy());
-requireNonNull(directBuf, "This range must use direct buffers");
-readOpts.setIterateUpperBound(new DirectSlice(directBuf));
+var directBuf = LLUtils.convertToDirect(alloc, rangeMax.send());
+cloned2 = directBuf.buffer().receive();
+direct2 = directBuf.byteBuffer();
+readOpts.setIterateUpperBound(slice2 = new DirectSlice(directBuf.byteBuffer()));
} else {
-readOpts.setIterateUpperBound(new Slice(LLUtils.toArray(rangeMax)));
+readOpts.setIterateUpperBound(slice2 = new Slice(LLUtils.toArray(rangeMax)));
}
}
}
@@ -486,9 +495,10 @@ public class LLLocalDictionary implements LLDictionary {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
try (var rangeMin = range.getMin().receive()) {
if (databaseOptions.allowNettyDirect()) {
-var directBuf = toDirect(cloned3 = rangeMin.copy());
-requireNonNull(directBuf, "This range must use direct buffers");
-rocksIterator.seek(directBuf);
+var directBuf = LLUtils.convertToDirect(alloc, rangeMin.send());
+cloned3 = directBuf.buffer().receive();
+direct3 = directBuf.byteBuffer();
+rocksIterator.seek(directBuf.byteBuffer());
} else {
rocksIterator.seek(LLUtils.toArray(rangeMin));
}
@@ -505,6 +515,9 @@ public class LLLocalDictionary implements LLDictionary {
if (cloned1 != null) cloned1.close();
if (cloned2 != null) cloned2.close();
if (cloned3 != null) cloned3.close();
+if (direct1 != null) PlatformDependent.freeDirectBuffer(direct1);
+if (direct2 != null) PlatformDependent.freeDirectBuffer(direct2);
+if (direct3 != null) PlatformDependent.freeDirectBuffer(direct3);
}
}).onErrorMap(cause -> new IOException("Failed to read range", cause)),
rangeSend -> Mono.fromRunnable(rangeSend::close));
@@ -872,11 +885,13 @@ public class LLLocalDictionary implements LLDictionary {
try (var key = keyToReceive.receive()) {
var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS);
if (databaseOptions.allowNettyDirect()) {
-if (!isDirect(key)) {
-throw new IllegalArgumentException("Key must be a direct buffer");
-}
-var keyNioBuffer = toDirect(key);
-db.delete(cfh, validWriteOptions, keyNioBuffer);
+var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
+try {
+db.delete(cfh, validWriteOptions, keyNioBuffer.byteBuffer());
+} finally {
+keyNioBuffer.buffer().close();
+PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
+}
} else {
db.delete(cfh, validWriteOptions, LLUtils.toArray(key));
}
@@ -1104,6 +1119,7 @@ public class LLLocalDictionary implements LLDictionary {
}
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
+alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
@@ -1118,10 +1134,18 @@ public class LLLocalDictionary implements LLDictionary {
batch.close();
} else {
for (LLEntry entry : entriesWindow) {
-try (var k = entry.getKey().receive()) {
-try (var v = entry.getValue().receive()) {
-db.put(cfh, EMPTY_WRITE_OPTIONS, toDirect(k), toDirect(v));
-}
-}
+var k = LLUtils.convertToDirect(alloc, entry.getKey());
+try {
+var v = LLUtils.convertToDirect(alloc, entry.getValue());
+try {
+db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer());
+} finally {
+v.buffer().close();
+PlatformDependent.freeDirectBuffer(v.byteBuffer());
+}
+} finally {
+k.buffer().close();
+PlatformDependent.freeDirectBuffer(k.byteBuffer());
+}
}
}
@@ -1219,6 +1243,7 @@ public class LLLocalDictionary implements LLDictionary {
if (USE_WRITE_BATCHES_IN_PUT_MULTI) {
var batch = new CappedWriteBatch(db,
+alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
@@ -1239,8 +1264,18 @@ public class LLLocalDictionary implements LLDictionary {
} else {
int i = 0;
for (Tuple2<Buffer, X> entry : entriesWindow) {
-try (var valueToWrite = updatedValuesToWrite.get(i).receive()) {
-db.put(cfh, EMPTY_WRITE_OPTIONS, toDirect(entry.getT1()), toDirect(valueToWrite));
-}
+var k = LLUtils.convertToDirect(alloc, entry.getT1().send());
+try {
+var v = LLUtils.convertToDirect(alloc, updatedValuesToWrite.get(i));
+try {
+db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer());
+} finally {
+v.buffer().close();
+PlatformDependent.freeDirectBuffer(v.byteBuffer());
+}
+} finally {
+k.buffer().close();
+PlatformDependent.freeDirectBuffer(k.byteBuffer());
+}
i++;
}
@@ -1407,7 +1442,8 @@ public class LLLocalDictionary implements LLDictionary {
ro.setReadaheadSize(32 * 1024);
}
ro.setVerifyChecksums(true);
-var rocksIteratorTuple = getRocksIterator(databaseOptions.allowNettyDirect(), ro, range.send(), db, cfh);
+var rocksIteratorTuple = getRocksIterator(alloc,
+databaseOptions.allowNettyDirect(), ro, range.send(), db, cfh);
try {
try (var rocksIterator = rocksIteratorTuple.getT1()) {
rocksIterator.seekToFirst();
@@ -1428,6 +1464,7 @@ public class LLLocalDictionary implements LLDictionary {
} finally {
rocksIteratorTuple.getT2().close();
rocksIteratorTuple.getT3().close();
+rocksIteratorTuple.getT4().close();
}
sink.complete();
} catch (Throwable ex) {
@@ -1504,7 +1541,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound;
if (range.hasMin()) {
-minBound = setIterateBound(databaseOptions.allowNettyDirect(),
+minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(),
opts,
IterateBound.LOWER,
range.getMin()
@@ -1515,7 +1552,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
-maxBound = setIterateBound(databaseOptions.allowNettyDirect(),
+maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(),
opts,
IterateBound.UPPER,
range.getMax()
@@ -1525,18 +1562,26 @@ public class LLLocalDictionary implements LLDictionary {
}
assert cfh.isOwningHandle();
assert opts.isOwningHandle();
+SafeCloseable seekTo;
try (RocksIterator it = db.newIterator(cfh, opts)) {
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(databaseOptions.allowNettyDirect(), it, range.getMin());
+seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), it, range.getMin());
} else {
+seekTo = null;
it.seekToFirst();
}
+try {
it.status();
while (it.isValid()) {
db.delete(cfh, it.key());
it.next();
it.status();
}
+} finally {
+if (seekTo != null) {
+seekTo.close();
+}
+}
} finally {
maxBound.close();
}
@@ -1546,6 +1591,7 @@ public class LLLocalDictionary implements LLDictionary {
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
+alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
@@ -1585,14 +1631,23 @@ public class LLLocalDictionary implements LLDictionary {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
for (LLEntry entry : entriesList) {
assert entry.isAccessible();
-try (var k = entry.getKey().receive()) {
-try (var v = entry.getValue().receive()) {
-db.put(cfh, EMPTY_WRITE_OPTIONS, toDirect(k), toDirect(v));
-}
-}
+var k = LLUtils.convertToDirect(alloc, entry.getKey());
+try {
+var v = LLUtils.convertToDirect(alloc, entry.getValue());
+try {
+db.put(cfh, EMPTY_WRITE_OPTIONS, k.byteBuffer(), v.byteBuffer());
+} finally {
+v.buffer().close();
+PlatformDependent.freeDirectBuffer(v.byteBuffer());
+}
+} finally {
+k.buffer().close();
+PlatformDependent.freeDirectBuffer(k.byteBuffer());
+}
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
+alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
@@ -1667,29 +1722,37 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
-minBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
+minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
-maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, range.getMax());
+maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER, range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
+seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
} else {
+seekTo = null;
rocksIterator.seekToFirst();
}
+try {
rocksIterator.status();
while (rocksIterator.isValid()) {
-writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send());
+writeBatch.delete(cfh, LLUtils.readDirectNioBuffer(alloc, rocksIterator::key));
rocksIterator.next();
rocksIterator.status();
}
+} finally {
+if (seekTo != null) {
+seekTo.close();
+}
+}
} finally {
maxBound.close();
}
@@ -1709,30 +1772,38 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
-minBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
+minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER, range.getMin());
} else {
minBound = emptyReleasableSlice();
}
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
-maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
+maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
+seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
} else {
+seekTo = null;
rocksIterator.seekToFirst();
}
+try {
rocksIterator.status();
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
rocksIterator.status();
}
+} finally {
+if (seekTo != null) {
+seekTo.close();
+}
+}
} finally {
maxBound.close();
}
@@ -1743,29 +1814,36 @@ public class LLLocalDictionary implements LLDictionary {
}
}
-private static void rocksIterSeekTo(boolean allowNettyDirect, RocksIterator rocksIterator,
+@Nullable
+private static SafeCloseable rocksIterSeekTo(BufferAllocator alloc, boolean allowNettyDirect, RocksIterator rocksIterator,
Send<Buffer> bufferToReceive) {
try (var buffer = bufferToReceive.receive()) {
if (allowNettyDirect) {
-ByteBuffer nioBuffer = toDirect(buffer);
-assert nioBuffer.isDirect();
-rocksIterator.seek(nioBuffer);
+var direct = LLUtils.convertToDirect(alloc, buffer.send());
+assert direct.byteBuffer().isDirect();
+rocksIterator.seek(direct.byteBuffer());
+return () -> {
+direct.buffer().close();
+PlatformDependent.freeDirectBuffer(direct.byteBuffer());
+};
} else {
rocksIterator.seek(LLUtils.toArray(buffer));
+return null;
}
}
}
-private static ReleasableSlice setIterateBound(boolean allowNettyDirect, ReadOptions readOpts,
+private static ReleasableSlice setIterateBound(BufferAllocator alloc, boolean allowNettyDirect, ReadOptions readOpts,
IterateBound boundType, Send<Buffer> bufferToReceive) {
var buffer = bufferToReceive.receive();
try {
requireNonNull(buffer);
AbstractSlice<?> slice;
if (allowNettyDirect && LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS) {
-ByteBuffer nioBuffer = toDirect(buffer);
-assert nioBuffer.isDirect();
-slice = new DirectSlice(nioBuffer, buffer.readableBytes());
+var direct = LLUtils.convertToDirect(alloc, buffer.send());
+buffer = direct.buffer().receive();
+assert direct.byteBuffer().isDirect();
+slice = new DirectSlice(direct.byteBuffer(), buffer.readableBytes());
assert slice.size() == buffer.readableBytes();
assert slice.compare(new Slice(LLUtils.toArray(buffer))) == 0;
if (boundType == IterateBound.LOWER) {
@@ -1773,17 +1851,19 @@ public class LLLocalDictionary implements LLDictionary {
} else {
readOpts.setIterateUpperBound(slice);
}
-return new ReleasableSliceImpl(slice, buffer, nioBuffer);
+return new ReleasableSliceImpl(slice, buffer, direct.byteBuffer());
} else {
-try (buffer) {
+try {
slice = new Slice(requireNonNull(LLUtils.toArray(buffer)));
-}
if (boundType == IterateBound.LOWER) {
readOpts.setIterateLowerBound(slice);
} else {
readOpts.setIterateUpperBound(slice);
}
return new ReleasableSliceImpl(slice, null, null);
+} finally {
+buffer.close();
+}
}
} catch (Throwable e) {
buffer.close();
@@ -1810,6 +1890,9 @@ public class LLLocalDictionary implements LLDictionary {
if (byteBuf != null) {
byteBuf.close();
}
+if (additionalData instanceof ByteBuffer bb && bb.isDirect()) {
+PlatformDependent.freeDirectBuffer(bb);
+}
}
}
@@ -1823,6 +1906,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false);
readOpts.setReadaheadSize(32 * 1024); // 32KiB
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
+alloc,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
@@ -1886,7 +1970,7 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
if (range.hasMin()) {
-minBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
+minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
} else {
minBound = emptyReleasableSlice();
@@ -1894,7 +1978,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
-maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
+maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
} else {
maxBound = emptyReleasableSlice();
@@ -1905,12 +1989,15 @@ public class LLLocalDictionary implements LLDictionary {
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator,
+seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator,
range.getMin());
} else {
+seekTo = null;
rocksIterator.seekToFirst();
}
+try {
long i = 0;
rocksIterator.status();
while (rocksIterator.isValid()) {
@@ -1919,6 +2006,11 @@ public class LLLocalDictionary implements LLDictionary {
i++;
}
return i;
+} finally {
+if (seekTo != null) {
+seekTo.close();
+}
+}
}
} finally {
maxBound.close();
@@ -1943,7 +2035,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
-minBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
+minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
} else {
minBound = emptyReleasableSlice();
@@ -1951,27 +2043,35 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
-maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
+maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
+seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
} else {
+seekTo = null;
rocksIterator.seekToFirst();
}
+try {
rocksIterator.status();
if (rocksIterator.isValid()) {
-try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
+try (var key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
-try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
+try (var value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive()) {
return LLEntry.of(key.send(), value.send()).send();
}
}
} else {
return null;
}
+} finally {
+if (seekTo != null) {
+seekTo.close();
+}
+}
} finally {
maxBound.close();
}
@@ -1993,7 +2093,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
-minBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
+minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
} else {
minBound = emptyReleasableSlice();
@@ -2001,23 +2101,31 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
-maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
+maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
+seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
} else {
+seekTo = null;
rocksIterator.seekToFirst();
}
+try {
rocksIterator.status();
if (rocksIterator.isValid()) {
-return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).send();
+return LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
} else {
return null;
}
+} finally {
+if (seekTo != null) {
+seekTo.close();
+}
+}
} finally {
maxBound.close();
}
@@ -2147,7 +2255,7 @@ public class LLLocalDictionary implements LLDictionary {
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
-minBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
+minBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.LOWER,
range.getMin());
} else {
minBound = emptyReleasableSlice();
@@ -2155,27 +2263,35 @@ public class LLLocalDictionary implements LLDictionary {
try {
ReleasableSlice maxBound;
if (range.hasMax()) {
-maxBound = setIterateBound(databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
+maxBound = setIterateBound(alloc, databaseOptions.allowNettyDirect(), readOpts, IterateBound.UPPER,
range.getMax());
} else {
maxBound = emptyReleasableSlice();
}
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
+SafeCloseable seekTo;
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
+seekTo = rocksIterSeekTo(alloc, databaseOptions.allowNettyDirect(), rocksIterator, range.getMin());
} else {
+seekTo = null;
rocksIterator.seekToFirst();
}
+try {
rocksIterator.status();
if (!rocksIterator.isValid()) {
return null;
}
-try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
+try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
-try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value)) {
+try (Buffer value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive()) {
dbDelete(cfh, null, key.copy().send());
return LLEntry.of(key.send(), value.send()).send();
}
}
+} finally {
+if (seekTo != null) {
+seekTo.close();
+}
+}
} finally {
maxBound.close();
}
@@ -2190,7 +2306,8 @@ public class LLLocalDictionary implements LLDictionary {
}
@NotNull
-public static Tuple3<RocksIterator, ReleasableSlice, ReleasableSlice> getRocksIterator(boolean allowNettyDirect,
+public static Tuple4<RocksIterator, ReleasableSlice, ReleasableSlice, SafeCloseable> getRocksIterator(BufferAllocator alloc,
+boolean allowNettyDirect,
ReadOptions readOptions,
Send<LLRange> rangeToReceive,
RocksDB db,
@@ -2199,22 +2316,26 @@ public class LLLocalDictionary implements LLDictionary {
ReleasableSlice sliceMin;
ReleasableSlice sliceMax;
if (range.hasMin()) {
-sliceMin = setIterateBound(allowNettyDirect, readOptions, IterateBound.LOWER, range.getMin());
+sliceMin = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.LOWER, range.getMin());
} else {
sliceMin = emptyReleasableSlice();
}
if (range.hasMax()) {
-sliceMax = setIterateBound(allowNettyDirect, readOptions, IterateBound.UPPER, range.getMax());
+sliceMax = setIterateBound(alloc, allowNettyDirect, readOptions, IterateBound.UPPER, range.getMax());
} else {
sliceMax = emptyReleasableSlice();
}
var rocksIterator = db.newIterator(cfh, readOptions);
+SafeCloseable seekTo;
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
-rocksIterSeekTo(allowNettyDirect, rocksIterator, range.getMin());
+seekTo = Objects.requireNonNullElseGet(rocksIterSeekTo(alloc, allowNettyDirect, rocksIterator, range.getMin()),
+() -> ((SafeCloseable) () -> {})
+);
} else {
+seekTo = () -> {};
rocksIterator.seekToFirst();
}
-return Tuples.of(rocksIterator, sliceMin, sliceMax);
+return Tuples.of(rocksIterator, sliceMin, sliceMax, seekTo);
}
}
}
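
A hedged caller sketch of the widened getRocksIterator contract shown above: it now returns a Tuple4 whose fourth element is the seekTo SafeCloseable, and every element must be closed, exactly as the reactive iterator classes later in this commit do. The RocksIteratorExample class is illustrative, not part of the commit.

import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Hypothetical caller: consume the Tuple4 returned by getRocksIterator and close every element,
// including the new SafeCloseable seekTo handle in position four.
final class RocksIteratorExample {
	static void scan(BufferAllocator alloc, boolean allowNettyDirect, ReadOptions readOptions,
			Send<LLRange> range, RocksDB db, ColumnFamilyHandle cfh) throws RocksDBException {
		var tuple = LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, range, db, cfh);
		try (var rocksIterator = tuple.getT1()) {
			rocksIterator.status();
			while (rocksIterator.isValid()) {
				// read rocksIterator.key() / rocksIterator.value() here
				rocksIterator.next();
				rocksIterator.status();
			}
		} finally {
			tuple.getT2().close();
			tuple.getT3().close();
			tuple.getT4().close();
		}
	}
}
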

View File

@ -55,7 +55,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
.generate(() -> { .generate(() -> {
var readOptions = new ReadOptions(this.readOptions); var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax()); readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax());
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.copy().send(), db, cfh); return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh);
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.getT1(); var rocksIterator = tuple.getT1();
@ -64,7 +64,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
try { try {
rocksIterator.status(); rocksIterator.status();
while (rocksIterator.isValid()) { while (rocksIterator.isValid()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
if (firstGroupKey == null) { if (firstGroupKey == null) {
firstGroupKey = key.copy(); firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), } else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(),
@ -73,7 +73,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
} }
Buffer value; Buffer value;
if (readValues) { if (readValues) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive();
} else { } else {
value = alloc.allocate(0); value = alloc.allocate(0);
} }
@ -106,6 +106,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
rocksIterator.close(); rocksIterator.close();
tuple.getT2().close(); tuple.getT2().close();
tuple.getT3().close(); tuple.getT3().close();
tuple.getT4().close();
}); });
} }
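In the iterator body above, keys and values now arrive as Send<Buffer> from LLUtils.readDirectNioBuffer: receive() claims ownership exactly once, and try-with-resources releases the buffer as soon as it has been consumed. A short sketch of that pattern (the toArray copy is only there to show extracting data before the buffer closes):

```java
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
	byte[] keyBytes = LLUtils.toArray(key); // copy the contents out before the buffer is closed
	// ... use keyBytes; the direct buffer is released at the end of this block ...
}
```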


@ -55,7 +55,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(canFillCache); readOptions.setFillCache(canFillCache);
} }
return LLLocalDictionary.getRocksIterator(allowNettyDirect, readOptions, range.copy().send(), db, cfh); return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh);
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.getT1(); var rocksIterator = tuple.getT1();
@ -63,7 +63,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
Buffer firstGroupKey = null; Buffer firstGroupKey = null;
try { try {
while (rocksIterator.isValid()) { while (rocksIterator.isValid()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
if (firstGroupKey == null) { if (firstGroupKey == null) {
firstGroupKey = key.copy(); firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(), prefixLength)) { } else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(), prefixLength)) {
@ -93,6 +93,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
rocksIterator.close(); rocksIterator.close();
tuple.getT2().close(); tuple.getT2().close();
tuple.getT3().close(); tuple.getT3().close();
tuple.getT4().close();
}); });
} }
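The grouping decision above is the five-argument LLUtils.equals call: two keys belong to the same group when their first prefixLength bytes match. An illustrative helper (not part of the commit) that captures the check in one place:

```java
// Returns true when `key` starts a new prefix group relative to the current group's first key.
static boolean startsNewGroup(@Nullable Buffer firstGroupKey, Buffer key, int prefixLength) {
	return firstGroupKey == null
			|| !LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(),
					key, key.readerOffset(), prefixLength);
}
```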


@ -62,16 +62,16 @@ public abstract class LLLocalReactiveRocksIterator<T> {
readOptions.setReadaheadSize(32 * 1024); // 32KiB readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(false); readOptions.setFillCache(false);
} }
return getRocksIterator(allowNettyDirect, readOptions, range.copy().send(), db, cfh); return getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh);
}, (tuple, sink) -> { }, (tuple, sink) -> {
try { try {
var rocksIterator = tuple.getT1(); var rocksIterator = tuple.getT1();
rocksIterator.status(); rocksIterator.status();
if (rocksIterator.isValid()) { if (rocksIterator.isValid()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) { try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
Buffer value; Buffer value;
if (readValues) { if (readValues) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value).receive();
} else { } else {
value = alloc.allocate(0); value = alloc.allocate(0);
} }
@ -95,6 +95,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
rocksIterator.close(); rocksIterator.close();
tuple.getT2().close(); tuple.getT2().close();
tuple.getT3().close(); tuple.getT3().close();
tuple.getT4().close();
}); });
} }
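All of these reactive iterators use Reactor's three-argument Flux.generate: the state supplier opens the RocksDB iterator (now the four-element tuple), the generator emits at most one element per request, and the state consumer closes every native resource, including the new fourth element. A condensed sketch with the emitted type reduced to byte[] and the per-iteration ReadOptions tuning omitted:

```java
Flux<byte[]> keys = Flux.generate(
		() -> LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions,
				range.copy().send(), db, cfh),
		(tuple, sink) -> {
			try {
				var it = tuple.getT1();
				it.status();
				if (it.isValid()) {
					sink.next(it.key()); // byte[] copy of the current key
					it.next();
				} else {
					sink.complete();
				}
			} catch (RocksDBException ex) {
				sink.error(ex);
			}
			return tuple;
		},
		tuple -> {
			// Runs once per subscription: release the iterator, both slices and the seek closeable.
			tuple.getT1().close();
			tuple.getT2().close();
			tuple.getT3().close();
			tuple.getT4().close();
		});
```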


@ -7,16 +7,21 @@ import it.cavallium.dbengine.database.LLUtils;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
public interface Serializer<A, B> { public interface Serializer<A> {
@NotNull A deserialize(@NotNull B serialized) throws SerializationException; record DeserializationResult<T>(T deserializedData, int bytesRead) {}
@NotNull B serialize(@NotNull A deserialized) throws SerializationException; @NotNull DeserializationResult<A> deserialize(@NotNull Send<Buffer> serialized) throws SerializationException;
Serializer<Send<Buffer>, Send<Buffer>> NOOP_SERIALIZER = new Serializer<>() { @NotNull Send<Buffer> serialize(@NotNull A deserialized) throws SerializationException;
Serializer<Send<Buffer>> NOOP_SERIALIZER = new Serializer<>() {
@Override @Override
public @NotNull Send<Buffer> deserialize(@NotNull Send<Buffer> serialized) { public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
return serialized; try (var serializedBuf = serialized.receive()) {
var readableBytes = serializedBuf.readableBytes();
return new DeserializationResult<>(serializedBuf.send(), readableBytes);
}
} }
@Override @Override
@ -25,17 +30,20 @@ public interface Serializer<A, B> {
} }
}; };
static Serializer<Send<Buffer>, Send<Buffer>> noop() { static Serializer<Send<Buffer>> noop() {
return NOOP_SERIALIZER; return NOOP_SERIALIZER;
} }
static Serializer<String, Send<Buffer>> utf8(BufferAllocator allocator) { static Serializer<String> utf8(BufferAllocator allocator) {
return new Serializer<>() { return new Serializer<>() {
@Override @Override
public @NotNull String deserialize(@NotNull Send<Buffer> serializedToReceive) { public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive) {
try (Buffer serialized = serializedToReceive.receive()) { try (Buffer serialized = serializedToReceive.receive()) {
int length = serialized.readInt(); int length = serialized.readInt();
return LLUtils.deserializeString(serialized.send(), serialized.readerOffset(), length, StandardCharsets.UTF_8); var readerOffset = serialized.readerOffset();
var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(),
readerOffset, length, StandardCharsets.UTF_8), readableBytes);
} }
} }
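Serializer now takes a single type parameter and reports the number of consumed bytes through the DeserializationResult record. As an illustration only, a serializer for java.util.UUID written against the new shape; the 16-byte layout and the allocator-based factory style are assumptions of this sketch, mirroring the utf8 factory above:

```java
static Serializer<UUID> uuid(BufferAllocator allocator) {
	return new Serializer<>() {
		@Override
		public @NotNull DeserializationResult<UUID> deserialize(@NotNull Send<Buffer> serialized) {
			try (var buf = serialized.receive()) {
				var value = new UUID(buf.readLong(), buf.readLong());
				return new DeserializationResult<>(value, Long.BYTES * 2);
			}
		}

		@Override
		public @NotNull Send<Buffer> serialize(@NotNull UUID deserialized) {
			var buf = allocator.allocate(Long.BYTES * 2);
			buf.writeLong(deserialized.getMostSignificantBits());
			buf.writeLong(deserialized.getLeastSignificantBits());
			return buf.send();
		}
	};
}
```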


@ -8,21 +8,22 @@ import java.nio.charset.StandardCharsets;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@SuppressWarnings("unused") @SuppressWarnings("unused")
public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> { public interface SerializerFixedBinaryLength<A> extends Serializer<A> {
int getSerializedBinaryLength(); int getSerializedBinaryLength();
static SerializerFixedBinaryLength<Send<Buffer>, Send<Buffer>> noop(int length) { static SerializerFixedBinaryLength<Send<Buffer>> noop(int length) {
return new SerializerFixedBinaryLength<>() { return new SerializerFixedBinaryLength<>() {
@Override @Override
public @NotNull Send<Buffer> deserialize(@NotNull Send<Buffer> serialized) { public @NotNull DeserializationResult<Send<Buffer>> deserialize(@NotNull Send<Buffer> serialized) {
try (var buf = serialized.receive()) { try (var buf = serialized.receive()) {
if (buf.readableBytes() != getSerializedBinaryLength()) { if (buf.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ buf.readableBytes() + " bytes instead"); + buf.readableBytes() + " bytes instead");
} }
return buf.send(); var readableBytes = buf.readableBytes();
return new DeserializationResult<>(buf.send(), readableBytes);
} }
} }
@ -45,10 +46,11 @@ public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
}; };
} }
static SerializerFixedBinaryLength<String, Send<Buffer>> utf8(BufferAllocator allocator, int length) { static SerializerFixedBinaryLength<String> utf8(BufferAllocator allocator, int length) {
return new SerializerFixedBinaryLength<>() { return new SerializerFixedBinaryLength<>() {
@Override @Override
public @NotNull String deserialize(@NotNull Send<Buffer> serializedToReceive) throws SerializationException { public @NotNull DeserializationResult<String> deserialize(@NotNull Send<Buffer> serializedToReceive)
throws SerializationException {
try (var serialized = serializedToReceive.receive()) { try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) { if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new SerializationException( throw new SerializationException(
@ -56,7 +58,9 @@ public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
+ serialized.readableBytes() + " bytes instead"); + serialized.readableBytes() + " bytes instead");
} }
var readerOffset = serialized.readerOffset(); var readerOffset = serialized.readerOffset();
return LLUtils.deserializeString(serialized.send(), readerOffset, length, StandardCharsets.UTF_8); var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(LLUtils.deserializeString(serialized.send(),
readerOffset, length, StandardCharsets.UTF_8), readableBytes);
} }
} }
@ -81,17 +85,18 @@ public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
}; };
} }
static SerializerFixedBinaryLength<Integer, Send<Buffer>> intSerializer(BufferAllocator allocator) { static SerializerFixedBinaryLength<Integer> intSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() { return new SerializerFixedBinaryLength<>() {
@Override @Override
public @NotNull Integer deserialize(@NotNull Send<Buffer> serializedToReceive) { public @NotNull DeserializationResult<Integer> deserialize(@NotNull Send<Buffer> serializedToReceive) {
try (var serialized = serializedToReceive.receive()) { try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) { if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead"); + serialized.readableBytes() + " bytes instead");
} }
return serialized.readInt(); var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(serialized.readInt(), readableBytes);
} }
} }
@ -109,17 +114,18 @@ public interface SerializerFixedBinaryLength<A, B> extends Serializer<A, B> {
}; };
} }
static SerializerFixedBinaryLength<Long, Send<Buffer>> longSerializer(BufferAllocator allocator) { static SerializerFixedBinaryLength<Long> longSerializer(BufferAllocator allocator) {
return new SerializerFixedBinaryLength<>() { return new SerializerFixedBinaryLength<>() {
@Override @Override
public @NotNull Long deserialize(@NotNull Send<Buffer> serializedToReceive) { public @NotNull DeserializationResult<Long> deserialize(@NotNull Send<Buffer> serializedToReceive) {
try (var serialized = serializedToReceive.receive()) { try (var serialized = serializedToReceive.receive()) {
if (serialized.readableBytes() != getSerializedBinaryLength()) { if (serialized.readableBytes() != getSerializedBinaryLength()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with " "Fixed serializer with " + getSerializedBinaryLength() + " bytes has tried to deserialize an element with "
+ serialized.readableBytes() + " bytes instead"); + serialized.readableBytes() + " bytes instead");
} }
return serialized.readLong(); var readableBytes = serialized.readableBytes();
return new DeserializationResult<>(serialized.readLong(), readableBytes);
} }
} }
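Callers of the fixed-length factories now receive both the value and the number of bytes that were read. A round-trip usage sketch, assuming longSerializer writes exactly Long.BYTES bytes (which its own length check implies):

```java
static void roundTrip(BufferAllocator alloc) throws SerializationException {
	var serializer = SerializerFixedBinaryLength.longSerializer(alloc);
	Send<Buffer> serialized = serializer.serialize(42L);
	var result = serializer.deserialize(serialized);
	assert result.deserializedData() == 42L;
	assert result.bytesRead() == Long.BYTES;
}
```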


@ -3,7 +3,9 @@ package org.rocksdb;
import static it.cavallium.dbengine.database.LLUtils.isDirect; import static it.cavallium.dbengine.database.LLUtils.isDirect;
import io.netty.buffer.api.Buffer; import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Send; import io.netty.buffer.api.Send;
import io.netty.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -25,25 +27,30 @@ public class CappedWriteBatch extends WriteBatch {
*/ */
private static final boolean USE_FAST_DIRECT_BUFFERS = true; private static final boolean USE_FAST_DIRECT_BUFFERS = true;
private final RocksDB db; private final RocksDB db;
private final BufferAllocator alloc;
private final int cap; private final int cap;
private final WriteOptions writeOptions; private final WriteOptions writeOptions;
private final List<Buffer> buffersToRelease; private final List<Buffer> buffersToRelease;
private final List<ByteBuffer> byteBuffersToRelease;
/** /**
* @param cap The limit of operations * @param cap The limit of operations
*/ */
public CappedWriteBatch(RocksDB db, public CappedWriteBatch(RocksDB db,
BufferAllocator alloc,
int cap, int cap,
int reservedWriteBatchSize, int reservedWriteBatchSize,
long maxWriteBatchSize, long maxWriteBatchSize,
WriteOptions writeOptions) { WriteOptions writeOptions) {
super(reservedWriteBatchSize); super(reservedWriteBatchSize);
this.db = db; this.db = db;
this.alloc = alloc;
this.cap = cap; this.cap = cap;
this.writeOptions = writeOptions; this.writeOptions = writeOptions;
this.setMaxBytes(maxWriteBatchSize); this.setMaxBytes(maxWriteBatchSize);
this.buffersToRelease = new ArrayList<>(); this.buffersToRelease = new ArrayList<>();
this.byteBuffersToRelease = new ArrayList<>();
} }
private synchronized void flushIfNeeded(boolean force) throws RocksDBException { private synchronized void flushIfNeeded(boolean force) throws RocksDBException {
@ -61,6 +68,12 @@ public class CappedWriteBatch extends WriteBatch {
} }
buffersToRelease.clear(); buffersToRelease.clear();
} }
if (!byteBuffersToRelease.isEmpty()) {
for (var byteBuffer : byteBuffersToRelease) {
PlatformDependent.freeDirectBuffer(byteBuffer);
}
byteBuffersToRelease.clear();
}
} }
@Override @Override
@ -98,14 +111,20 @@ public class CappedWriteBatch extends WriteBatch {
var key = keyToReceive.receive(); var key = keyToReceive.receive();
var value = valueToReceive.receive(); var value = valueToReceive.receive();
if (USE_FAST_DIRECT_BUFFERS && isDirect(key) && isDirect(value)) { if (USE_FAST_DIRECT_BUFFERS && isDirect(key) && isDirect(value)) {
buffersToRelease.add(key);
buffersToRelease.add(value); buffersToRelease.add(value);
ByteBuffer keyNioBuffer = LLUtils.toDirect(key); var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
assert keyNioBuffer.isDirect(); key = keyNioBuffer.buffer().receive();
buffersToRelease.add(key);
byteBuffersToRelease.add(keyNioBuffer.byteBuffer());
assert keyNioBuffer.byteBuffer().isDirect();
ByteBuffer valueNioBuffer = LLUtils.toDirect(value); var valueNioBuffer = LLUtils.convertToDirect(alloc, value.send());
assert valueNioBuffer.isDirect(); value = valueNioBuffer.buffer().receive();
super.put(columnFamilyHandle, keyNioBuffer, valueNioBuffer); buffersToRelease.add(value);
byteBuffersToRelease.add(valueNioBuffer.byteBuffer());
assert valueNioBuffer.byteBuffer().isDirect();
super.put(columnFamilyHandle, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer());
} else { } else {
try { try {
byte[] keyArray = LLUtils.toArray(key); byte[] keyArray = LLUtils.toArray(key);
@ -160,16 +179,18 @@ public class CappedWriteBatch extends WriteBatch {
public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException { public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, Send<Buffer> keyToReceive) throws RocksDBException {
var key = keyToReceive.receive(); var key = keyToReceive.receive();
if (USE_FAST_DIRECT_BUFFERS) { if (USE_FAST_DIRECT_BUFFERS) {
var keyNioBuffer = LLUtils.convertToDirect(alloc, key.send());
key = keyNioBuffer.buffer().receive();
buffersToRelease.add(key); buffersToRelease.add(key);
ByteBuffer keyNioBuffer = LLUtils.toDirect(key); byteBuffersToRelease.add(keyNioBuffer.byteBuffer());
assert keyNioBuffer.isDirect(); assert keyNioBuffer.byteBuffer().isDirect();
removeDirect(nativeHandle_, removeDirect(nativeHandle_,
keyNioBuffer, keyNioBuffer.byteBuffer(),
keyNioBuffer.position(), keyNioBuffer.byteBuffer().position(),
keyNioBuffer.remaining(), keyNioBuffer.byteBuffer().remaining(),
columnFamilyHandle.nativeHandle_ columnFamilyHandle.nativeHandle_
); );
keyNioBuffer.position(keyNioBuffer.limit()); keyNioBuffer.byteBuffer().position(keyNioBuffer.byteBuffer().limit());
} else { } else {
try { try {
super.delete(columnFamilyHandle, LLUtils.toArray(key)); super.delete(columnFamilyHandle, LLUtils.toArray(key));
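CappedWriteBatch now takes a BufferAllocator and performs the Buffer-to-direct-ByteBuffer conversion itself, tracking the Netty buffers and the raw ByteBuffers separately so the latter can be freed after a flush. A usage sketch under assumptions: the constructor shape comes from the hunk above, the put signature is inferred from its parameter names (keyToReceive/valueToReceive), and the sizes and surrounding variables are placeholders; the flush-to-database step is not shown in this hunk, so it is omitted here:

```java
static void batchExample(RocksDB db, BufferAllocator alloc, ColumnFamilyHandle cfh,
		WriteOptions writeOptions, Buffer key, Buffer value, Buffer otherKey) throws RocksDBException {
	try (var batch = new CappedWriteBatch(db, alloc,
			/* cap */ 1024,
			/* reservedWriteBatchSize */ 2 * 1024 * 1024,
			/* maxWriteBatchSize */ 16L * 1024 * 1024,
			writeOptions)) {
		batch.put(cfh, key.send(), value.send()); // the batch takes ownership of both buffers
		batch.delete(cfh, otherKey.send());
	} // WriteBatch is a RocksObject, so try-with-resources releases the native handle
}
```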


@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
@ -211,19 +212,19 @@ public class TestDictionaryMap {
return "error?"; return "error?";
}), }),
map.updateValue(key, false, old -> { map.updateValue(key, false, old -> {
assert Objects.equals(old, "error?"); Assertions.assertEquals("error?", old);
return "error?"; return "error?";
}), }),
map.updateValue(key, true, old -> { map.updateValue(key, true, old -> {
assert Objects.equals(old, "error?"); Assertions.assertEquals("error?", old);
return "error?"; return "error?";
}), }),
map.updateValue(key, true, old -> { map.updateValue(key, true, old -> {
assert Objects.equals(old, "error?"); Assertions.assertEquals("error?", old);
return value; return value;
}), }),
map.updateValue(key, true, old -> { map.updateValue(key, true, old -> {
assert Objects.equals(old, value); Assertions.assertEquals(value, old);
return value; return value;
}) })
) )