From a8028024c811c81cb951a21879cb83eec3907a20 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 2 Sep 2021 21:14:26 +0200 Subject: [PATCH] Finish refactoring --- .../cavallium/dbengine/database/LLUtils.java | 17 +-- .../DatabaseMapDictionaryDeep.java | 117 ++++++------------ .../database/collections/SubStageGetter.java | 1 - .../collections/SubStageGetterHashMap.java | 7 -- .../collections/SubStageGetterHashSet.java | 6 - .../collections/SubStageGetterMap.java | 18 +-- .../collections/SubStageGetterMapDeep.java | 19 +-- .../collections/SubStageGetterSet.java | 8 -- .../collections/SubStageGetterSingle.java | 9 -- .../database/disk/LLLocalDictionary.java | 4 +- ...LLLocalKeyPrefixReactiveRocksIterator.java | 100 ++++++++------- .../disk/LLLocalKeyValueDatabase.java | 6 +- .../database/disk/MemorySegmentUtils.java | 94 ++++++++++++++ .../it/cavallium/dbengine/DbTestUtils.java | 30 ++++- .../dbengine/TestDictionaryMapDeep.java | 29 +++-- 15 files changed, 244 insertions(+), 221 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 2f57509..7b28d43 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -2,16 +2,13 @@ package it.cavallium.dbengine.database; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.UnpooledDirectByteBuf; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.CompositeBuffer; -import io.netty.buffer.api.MemoryManager; import io.netty.buffer.api.Send; -import io.netty.buffer.api.unsafe.UnsafeMemoryManager; import io.netty.util.IllegalReferenceCountException; import io.netty.util.internal.PlatformDependent; +import it.cavallium.dbengine.database.disk.MemorySegmentUtils; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.RandomSortField; @@ -25,8 +22,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.ToIntFunction; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -51,8 +46,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; -import reactor.util.function.Tuples; -import sun.misc.Unsafe; @SuppressWarnings("unused") public class LLUtils { @@ -370,9 +363,9 @@ public class LLUtils { PlatformDependent.getUnsafeUnavailabilityCause() ); } - if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) { - throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:" - + " DirectBufferNoCleanerConstructor is not available"); + if (!MemorySegmentUtils.isSupported()) { + throw new UnsupportedOperationException("Please enable Foreign Memory Access API support or disable" + + " netty direct buffers"); } assert buffer.isAccessible(); long nativeAddress; @@ -382,7 +375,7 @@ public class LLUtils { } throw new IllegalStateException("Buffer is not direct"); } - return PlatformDependent.directBuffer(nativeAddress, buffer.capacity()); + return MemorySegmentUtils.directBuffer(nativeAddress, buffer.capacity()); } public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index c70ae92..65b5dbf 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -5,7 +5,6 @@ import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.Resource; import io.netty.buffer.api.Send; import io.netty.util.IllegalReferenceCountException; -import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; @@ -14,16 +13,14 @@ import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; -import it.cavallium.dbengine.database.disk.LLLocalDictionary; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; import reactor.util.function.Tuples; // todo: implement optimized methods (which?) @@ -44,7 +41,6 @@ public class DatabaseMapDictionaryDeep> implem private static Send incrementPrefix(BufferAllocator alloc, Send originalKeySend, int prefixLength) { try (var originalKey = originalKeySend.receive()) { assert originalKey.readableBytes() >= prefixLength; - var originalKeyStartOffset = originalKey.readerOffset(); var originalKeyLength = originalKey.readableBytes(); try (Buffer copiedBuf = alloc.allocate(originalKey.readableBytes())) { boolean overflowed = true; @@ -278,15 +274,6 @@ public class DatabaseMapDictionaryDeep> implem } } - /** - * Remove ext from full key - */ - protected Send removeExtFromFullKey(Send keyToReceive) { - try (var key = keyToReceive.receive()) { - return key.copy(key.readerOffset(), keyPrefixLength + keySuffixLength).send(); - } - } - /** * Add prefix to suffix */ @@ -308,26 +295,6 @@ public class DatabaseMapDictionaryDeep> implem } } - protected Send toExtRange(Buffer keySuffix) { - try (Buffer first = firstRangeKey(alloc, - keyPrefix.copy().send(), - keySuffix.copy().send(), - keyPrefixLength, - keySuffixLength, - keyExtLength - ).receive()) { - try (Buffer end = nextRangeKey(alloc, - keyPrefix.copy().send(), - keySuffix.copy().send(), - keyPrefixLength, - keySuffixLength, - keyExtLength - ).receive()) { - return LLRange.of(first.send(), end.send()).send(); - } - } - } - @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast); @@ -340,16 +307,10 @@ public class DatabaseMapDictionaryDeep> implem @Override public Mono at(@Nullable CompositeSnapshot snapshot, T keySuffix) { - return Mono.using( - () -> serializeSuffix(keySuffix).receive(), - keySuffixData -> Mono.using( - () -> toKeyWithoutExt(keySuffixData.send()).receive(), - keyWithoutExt -> this.subStageGetter - .subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt)), - Resource::close - ), - Resource::close - ).transform(LLUtils::handleDiscard).doOnDiscard(DatabaseStage.class, DatabaseStage::release); + return this.subStageGetter + .subStage(dictionary, snapshot, Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix)))) + .transform(LLUtils::handleDiscard) + .doOnDiscard(DatabaseStage.class, DatabaseStage::release); } @Override @@ -362,45 +323,45 @@ public class DatabaseMapDictionaryDeep> implem return dictionary.badBlocks(rangeMono); } - private static record GroupBuffers(Buffer groupKeyWithExt, Buffer groupKeyWithoutExt, Buffer groupSuffix) {} - @Override public Flux> getAllStages(@Nullable CompositeSnapshot snapshot) { - - return Flux - .defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength)) - .flatMapSequential(groupKeyWithoutExtSend -> Mono - .using( - () -> { - try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) { - try (var groupSuffix = this.stripPrefix(groupKeyWithoutExt.copy().send()).receive()) { - assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength); - return Tuples.of(groupKeyWithoutExt, groupSuffix); - } + return dictionary + .getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength) + .flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using( + groupKeyWithoutExtSend_::receive, + groupKeyWithoutExtSend -> this.subStageGetter + .subStage(dictionary, snapshot, getGroupKeyWithoutExt(groupKeyWithoutExtSend.copy().send())) + .>handle((us, sink) -> { + try { + sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())), + us)); + } catch (SerializationException ex) { + sink.error(ex); } - }, - groupKeyWithoutExtAndGroupSuffix -> this.subStageGetter - .subStage(dictionary, - snapshot, - LLUtils.lazyRetain(groupKeyWithoutExtAndGroupSuffix.getT1()) - ) - .>handle((us, sink) -> { - try { - sink.next(Map.entry(this.deserializeSuffix(groupKeyWithoutExtAndGroupSuffix.getT2().send()), - us)); - } catch (SerializationException ex) { - sink.error(ex); - } - }), - entry -> { - entry.getT1().close(); - entry.getT2().close(); - } - ) - ) + }), + Resource::close + )) .transform(LLUtils::handleDiscard); } + private Send getGroupSuffix(Send groupKeyWithoutExtSend) { + try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) { + try (var groupSuffix = this.stripPrefix(groupKeyWithoutExt.copy().send()).receive()) { + assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength); + return groupSuffix.send(); + } + } + } + + private Mono> getGroupKeyWithoutExt(Send groupKeyWithoutExtSend) { + return Mono.fromCallable(() -> { + try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) { + assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength); + return groupKeyWithoutExt.send(); + } + }); + } + private boolean subStageKeysConsistency(int totalKeyLength) { if (subStageGetter instanceof SubStageGetterMapDeep) { return totalKeyLength @@ -432,7 +393,7 @@ public class DatabaseMapDictionaryDeep> implem return dictionary.clear(); } else if (range.isSingle()) { return dictionary - .remove(LLUtils.lazyRetain(range::getSingle), LLDictionaryResultType.VOID) + .remove(Mono.fromCallable(range::getSingle), LLDictionaryResultType.VOID) .doOnNext(Send::close) .then(); } else { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java index 1e15434..173ce0e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetter.java @@ -16,5 +16,4 @@ public interface SubStageGetter> { @Nullable CompositeSnapshot snapshot, Mono> prefixKey); - boolean isMultiKey(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java index e0ee8f2..731f8c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashMap.java @@ -6,11 +6,9 @@ import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import java.util.List; import java.util.Map; import java.util.function.Function; import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") @@ -52,11 +50,6 @@ public class SubStageGetterHashMap implements ); } - @Override - public boolean isMultiKey() { - return true; - } - public int getKeyHashBinaryLength() { return keyHashSerializer.getSerializedBinaryLength(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java index c1375aa..15b2078 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterHashSet.java @@ -10,7 +10,6 @@ import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.util.Map; import java.util.function.Function; import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings({"unused", "ClassCanBeRecord"}) @@ -47,11 +46,6 @@ public class SubStageGetterHashSet implements ); } - @Override - public boolean isMultiKey() { - return true; - } - public int getKeyHashBinaryLength() { return keyHashSerializer.getSerializedBinaryLength(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java index 2f01ce9..25879a5 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMap.java @@ -2,15 +2,12 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.Send; -import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import java.util.List; import java.util.Map; import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class SubStageGetterMap implements SubStageGetter, DatabaseMapDictionary> { @@ -29,23 +26,12 @@ public class SubStageGetterMap implements SubStageGetter, Databa @Nullable CompositeSnapshot snapshot, Mono> prefixKeyMono) { return Mono.usingWhen(prefixKeyMono, - prefixKey -> Mono - .fromSupplier(() -> DatabaseMapDictionary - .tail(dictionary, - prefixKey, - keySerializer, - valueSerializer - ) - ), + prefixKey -> Mono.fromSupplier(() -> DatabaseMapDictionary + .tail(dictionary, prefixKey, keySerializer, valueSerializer)), prefixKey -> Mono.fromRunnable(prefixKey::close) ); } - @Override - public boolean isMultiKey() { - return true; - } - public int getKeyBinaryLength() { return keySerializer.getSerializedBinaryLength(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java index c4742e5..a5b21ba 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterMapDeep.java @@ -2,14 +2,11 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.Send; -import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import java.util.List; import java.util.Map; import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class SubStageGetterMapDeep> implements @@ -44,24 +41,12 @@ public class SubStageGetterMapDeep> implements @Nullable CompositeSnapshot snapshot, Mono> prefixKeyMono) { return Mono.usingWhen(prefixKeyMono, - prefixKey -> Mono - .fromSupplier(() -> DatabaseMapDictionaryDeep - .deepIntermediate(dictionary, - prefixKey, - keySerializer, - subStageGetter, - keyExtLength - ) - ), + prefixKey -> Mono.fromSupplier(() -> DatabaseMapDictionaryDeep + .deepIntermediate(dictionary, prefixKey, keySerializer, subStageGetter, keyExtLength)), prefixKey -> Mono.fromRunnable(prefixKey::close) ); } - @Override - public boolean isMultiKey() { - return true; - } - public int getKeyBinaryLength() { return keySerializer.getSerializedBinaryLength() + keyExtLength; } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java index 8de364c..a3bcf22 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSet.java @@ -2,15 +2,12 @@ package it.cavallium.dbengine.database.collections; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.Send; -import io.netty.util.ReferenceCounted; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; -import java.util.List; import java.util.Map; import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class SubStageGetterSet implements SubStageGetter, DatabaseSetDictionary> { @@ -32,11 +29,6 @@ public class SubStageGetterSet implements SubStageGetter, Dat ); } - @Override - public boolean isMultiKey() { - return true; - } - public int getKeyBinaryLength() { return keySerializer.getSerializedBinaryLength(); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java index 273e504..4fc3069 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/SubStageGetterSingle.java @@ -4,12 +4,8 @@ import io.netty.buffer.api.Buffer; import io.netty.buffer.api.Send; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.LLDictionary; -import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.serialization.Serializer; -import java.util.Arrays; -import java.util.List; import org.jetbrains.annotations.Nullable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class SubStageGetterSingle implements SubStageGetter> { @@ -32,9 +28,4 @@ public class SubStageGetterSingle implements SubStageGetter it.flux(), + it -> it.release() ) .subscribeOn(dbScheduler), rangeSend -> Mono.fromRunnable(rangeSend::close) diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java index 2f25138..263d05e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyPrefixReactiveRocksIterator.java @@ -48,53 +48,65 @@ public class LLLocalKeyPrefixReactiveRocksIterator { public Flux> flux() { - return Flux - .generate(() -> { - var readOptions = new ReadOptions(this.readOptions); - if (!range.hasMin() || !range.hasMax()) { - readOptions.setReadaheadSize(32 * 1024); // 32KiB - readOptions.setFillCache(canFillCache); - } - return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh); - }, (tuple, sink) -> { - try { - var rocksIterator = tuple.getT1(); - rocksIterator.status(); - Buffer firstGroupKey = null; - try { - while (rocksIterator.isValid()) { - try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) { - if (firstGroupKey == null) { - firstGroupKey = key.copy(); - } else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(), prefixLength)) { - break; + return Flux.using( + () -> range.copy().send(), + rangeSend -> Flux + .generate(() -> { + var readOptions = new ReadOptions(this.readOptions); + if (!range.hasMin() || !range.hasMax()) { + readOptions.setReadaheadSize(32 * 1024); // 32KiB + readOptions.setFillCache(canFillCache); + } + return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, rangeSend, db, cfh); + }, (tuple, sink) -> { + try { + var rocksIterator = tuple.getT1(); + rocksIterator.status(); + Buffer firstGroupKey = null; + try { + while (rocksIterator.isValid()) { + Buffer key; + if (allowNettyDirect) { + key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive(); + } else { + key = LLUtils.fromByteArray(alloc, rocksIterator.key()); + } + try (key) { + if (firstGroupKey == null) { + firstGroupKey = key.copy(); + } else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(), + prefixLength)) { + break; + } + rocksIterator.next(); + rocksIterator.status(); + } + } + if (firstGroupKey != null) { + var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength); + assert groupKeyPrefix.isAccessible(); + sink.next(groupKeyPrefix.send()); + } else { + sink.complete(); + } + } finally { + if (firstGroupKey != null) { + firstGroupKey.close(); } - rocksIterator.next(); - rocksIterator.status(); } + } catch (RocksDBException ex) { + sink.error(ex); } - if (firstGroupKey != null) { - var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength); - sink.next(groupKeyPrefix.send()); - } else { - sink.complete(); - } - } finally { - if (firstGroupKey != null) { - firstGroupKey.close(); - } - } - } catch (RocksDBException ex) { - sink.error(ex); - } - return tuple; - }, tuple -> { - var rocksIterator = tuple.getT1(); - rocksIterator.close(); - tuple.getT2().close(); - tuple.getT3().close(); - tuple.getT4().close(); - }); + return tuple; + }, tuple -> { + var rocksIterator = tuple.getT1(); + rocksIterator.close(); + tuple.getT2().close(); + tuple.getT3().close(); + tuple.getT4().close(); + }), + Send::close + ); } public void release() { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index b874082..53655df 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -97,9 +97,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { PlatformDependent.getUnsafeUnavailabilityCause() ); } - if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) { - throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:" - + " DirectBufferNoCleanerConstructor is not available"); + if (!MemorySegmentUtils.isSupported()) { + throw new UnsupportedOperationException("Please enable Foreign Memory Access API support or disable" + + " netty direct buffers"); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java new file mode 100644 index 0000000..da20cc6 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/MemorySegmentUtils.java @@ -0,0 +1,94 @@ +package it.cavallium.dbengine.database.disk; + +import io.netty.util.internal.PlatformDependent; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.invoke.MethodType; +import java.nio.ByteBuffer; + +public class MemorySegmentUtils { + + private static final MethodHandle OF_NATIVE_RESTRICTED; + private static final MethodHandle AS_SLICE; + private static final MethodHandle AS_BYTE_BUFFER; + private static Throwable cause; + + private static final Object NATIVE; + + static { + Lookup lookup = MethodHandles.publicLookup(); + + Object nativeVal = null; + + MethodHandle ofNativeRestricted; + try { + ofNativeRestricted = lookup.findStatic(Class.forName("jdk.incubator.foreign.MemorySegment"), + "ofNativeRestricted", + MethodType.methodType(Class.forName("jdk.incubator.foreign.MemorySegment")) + ); + try { + nativeVal = ofNativeRestricted.invoke(); + } catch (Throwable e) { + cause = e; + } + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { + ofNativeRestricted = null; + cause = e; + } + OF_NATIVE_RESTRICTED = ofNativeRestricted; + + MethodHandle asSlice; + try { + asSlice = lookup.findVirtual(Class.forName("jdk.incubator.foreign.MemorySegment"), + "asSlice", + MethodType.methodType(Class.forName("jdk.incubator.foreign.MemorySegment"), long.class, long.class) + ); + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { + asSlice = null; + cause = e; + } + AS_SLICE = asSlice; + + MethodHandle asByteBuffer; + try { + asByteBuffer = lookup.findVirtual(Class.forName("jdk.incubator.foreign.MemorySegment"), + "asByteBuffer", MethodType.methodType(ByteBuffer.class)); + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) { + asByteBuffer = null; + cause = e; + } + AS_BYTE_BUFFER = asByteBuffer; + + NATIVE = nativeVal; + } + + public static ByteBuffer directBuffer(long address, long size) { + if (address <= 0) { + throw new IllegalArgumentException("Address is " + address); + } + if (size > Integer.MAX_VALUE || size < 0) { + throw new IllegalArgumentException("size is " + size); + } + try { + if (!isSupported()) { + if (PlatformDependent.hasDirectBufferNoCleanerConstructor()) { + return PlatformDependent.directBuffer(address, (int) size); + } + throw new UnsupportedOperationException("Foreign Memory Access API is not enabled!"); + } + var memorySegment = AS_SLICE.invoke(NATIVE, address, size); + return (ByteBuffer) AS_BYTE_BUFFER.invoke(memorySegment); + } catch (Throwable e) { + throw new UnsupportedOperationException("Foreign Memory Access API is not enabled!", e); + } + } + + public static boolean isSupported() { + return OF_NATIVE_RESTRICTED != null && AS_SLICE != null && AS_BYTE_BUFFER != null && NATIVE != null; + } + + public static Throwable getUnsupportedCause() { + return cause; + } +} diff --git a/src/test/java/it/cavallium/dbengine/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/DbTestUtils.java index 7f62f1f..0672788 100644 --- a/src/test/java/it/cavallium/dbengine/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/DbTestUtils.java @@ -2,16 +2,12 @@ package it.cavallium.dbengine; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PoolArenaMetric; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.MemoryManager; import io.netty.buffer.api.Send; import io.netty.buffer.api.pool.BufferAllocatorMetric; import io.netty.buffer.api.pool.PooledBufferAllocator; +import io.netty.util.internal.PlatformDependent; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.LLDatabaseConnection; import it.cavallium.dbengine.database.LLDictionary; @@ -26,6 +22,7 @@ import it.cavallium.dbengine.database.collections.SubStageGetterHashMap; import it.cavallium.dbengine.database.collections.SubStageGetterMap; import it.cavallium.dbengine.client.DatabaseOptions; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; +import it.cavallium.dbengine.database.disk.MemorySegmentUtils; import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import java.io.IOException; @@ -78,6 +75,7 @@ public class DbTestUtils { Path path) {} public static Mono openTempDb(TestAllocator alloc) { + boolean canUseNettyDirect = computeCanUseNettyDirect(); return Mono.defer(() -> { var wrkspcPath = Path.of("/tmp/.cache/tempdb-" + dbId.incrementAndGet() + "/"); return Mono @@ -99,13 +97,33 @@ public class DbTestUtils { .flatMap(conn -> conn .getDatabase("testdb", List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), - new DatabaseOptions(Map.of(), true, false, true, false, true, false, false, -1) + new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1) ) .map(db -> new TempDb(alloc, conn, db, wrkspcPath)) ); }); } + private static boolean computeCanUseNettyDirect() { + boolean canUse = true; + if (!PlatformDependent.hasUnsafe()) { + System.err.println("Warning! Unsafe is not available!" + + " Netty direct buffers will not be used in tests!"); + canUse = false; + } + if (!MemorySegmentUtils.isSupported()) { + System.err.println("Warning! Foreign Memory Access API is not available!" + + " Netty direct buffers will not be used in tests!" + + " Please set \"--enable-preview --add-modules jdk.incubator.foreign --foreign.restricted=permit\""); + if (MemorySegmentUtils.getUnsupportedCause() != null) { + System.err.println("\tCause: " + MemorySegmentUtils.getUnsupportedCause().getClass().getName() + + ":" + MemorySegmentUtils.getUnsupportedCause().getLocalizedMessage()); + } + canUse = false; + } + return canUse; + } + public static Mono closeTempDb(TempDb tempDb) { return tempDb.db().close().then(tempDb.connection().disconnect()).then(Mono.fromCallable(() -> { ensureNoLeaks(tempDb.allocator().allocator(), false); diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index b6216dc..9e489d5 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -10,6 +10,7 @@ import static it.cavallium.dbengine.DbTestUtils.tempDictionary; import it.cavallium.dbengine.DbTestUtils.TestAllocator; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -214,18 +215,22 @@ public class TestDictionaryMapDeep { Step> stpVer = StepVerifier .create(tempDb(allocator, db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) - .flatMapMany(map -> map + .flatMapMany(map_ -> Flux.using( + () -> map_, + map -> map .at(null, key) - .flatMap(v -> v - .set(value) - .doAfterTerminate(v::release) - ) + .flatMap(v_ -> Mono.using( + () -> v_, + v -> v.set(value), + DatabaseMapDictionaryDeep::release + )) .then(map .at(null, "capra") - .flatMap(v -> v - .set(Map.of("normal", "123", "ormaln", "456")) - .doAfterTerminate(v::release) - ) + .flatMap(v_ -> Mono.using( + () -> v_, + v -> v.set(Map.of("normal", "123", "ormaln", "456")), + DatabaseMapDictionaryDeep::release + )) ) .thenMany(map .getAllStages(null) @@ -234,9 +239,9 @@ public class TestDictionaryMapDeep { .map(result -> Tuples.of(v.getKey(), result.getKey(), result.getValue())) .doAfterTerminate(() -> v.getValue().release()) ) - ) - .doAfterTerminate(map::release) - ) + ), + DatabaseMapDictionaryDeep::release + )) )); if (shouldFail) { stpVer.verifyError();