Finish refactoring

This commit is contained in:
Andrea Cavalli 2021-09-02 21:14:26 +02:00
parent 3091b81d34
commit a8028024c8
15 changed files with 244 additions and 221 deletions

View File

@ -2,16 +2,13 @@ package it.cavallium.dbengine.database;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledDirectByteBuf;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.Send;
import io.netty.buffer.api.unsafe.UnsafeMemoryManager;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.RandomSortField;
@ -25,8 +22,6 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToIntFunction;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@ -51,8 +46,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import sun.misc.Unsafe;
@SuppressWarnings("unused")
public class LLUtils {
@ -370,9 +363,9 @@ public class LLUtils {
PlatformDependent.getUnsafeUnavailabilityCause()
);
}
if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:"
+ " DirectBufferNoCleanerConstructor is not available");
if (!MemorySegmentUtils.isSupported()) {
throw new UnsupportedOperationException("Please enable Foreign Memory Access API support or disable"
+ " netty direct buffers");
}
assert buffer.isAccessible();
long nativeAddress;
@ -382,7 +375,7 @@ public class LLUtils {
}
throw new IllegalStateException("Buffer is not direct");
}
return PlatformDependent.directBuffer(nativeAddress, buffer.capacity());
return MemorySegmentUtils.directBuffer(nativeAddress, buffer.capacity());
}
public static Buffer fromByteArray(BufferAllocator alloc, byte[] array) {

View File

@ -5,7 +5,6 @@ import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Resource;
import io.netty.buffer.api.Send;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
@ -14,16 +13,14 @@ import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
// todo: implement optimized methods (which?)
@ -44,7 +41,6 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
private static Send<Buffer> incrementPrefix(BufferAllocator alloc, Send<Buffer> originalKeySend, int prefixLength) {
try (var originalKey = originalKeySend.receive()) {
assert originalKey.readableBytes() >= prefixLength;
var originalKeyStartOffset = originalKey.readerOffset();
var originalKeyLength = originalKey.readableBytes();
try (Buffer copiedBuf = alloc.allocate(originalKey.readableBytes())) {
boolean overflowed = true;
@ -278,15 +274,6 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
}
/**
* Remove ext from full key
*/
protected Send<Buffer> removeExtFromFullKey(Send<Buffer> keyToReceive) {
try (var key = keyToReceive.receive()) {
return key.copy(key.readerOffset(), keyPrefixLength + keySuffixLength).send();
}
}
/**
* Add prefix to suffix
*/
@ -308,26 +295,6 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
}
protected Send<LLRange> toExtRange(Buffer keySuffix) {
try (Buffer first = firstRangeKey(alloc,
keyPrefix.copy().send(),
keySuffix.copy().send(),
keyPrefixLength,
keySuffixLength,
keyExtLength
).receive()) {
try (Buffer end = nextRangeKey(alloc,
keyPrefix.copy().send(),
keySuffix.copy().send(),
keyPrefixLength,
keySuffixLength,
keyExtLength
).receive()) {
return LLRange.of(first.send(), end.send()).send();
}
}
}
@Override
public Mono<Long> leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) {
return dictionary.sizeRange(resolveSnapshot(snapshot), rangeMono, fast);
@ -340,16 +307,10 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono.using(
() -> serializeSuffix(keySuffix).receive(),
keySuffixData -> Mono.using(
() -> toKeyWithoutExt(keySuffixData.send()).receive(),
keyWithoutExt -> this.subStageGetter
.subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt)),
Resource::close
),
Resource::close
).transform(LLUtils::handleDiscard).doOnDiscard(DatabaseStage.class, DatabaseStage::release);
return this.subStageGetter
.subStage(dictionary, snapshot, Mono.fromCallable(() -> toKeyWithoutExt(serializeSuffix(keySuffix))))
.transform(LLUtils::handleDiscard)
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
}
@Override
@ -362,45 +323,45 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return dictionary.badBlocks(rangeMono);
}
private static record GroupBuffers(Buffer groupKeyWithExt, Buffer groupKeyWithoutExt, Buffer groupSuffix) {}
@Override
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return Flux
.defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength))
.flatMapSequential(groupKeyWithoutExtSend -> Mono
.using(
() -> {
try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) {
try (var groupSuffix = this.stripPrefix(groupKeyWithoutExt.copy().send()).receive()) {
assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
return Tuples.of(groupKeyWithoutExt, groupSuffix);
}
}
},
groupKeyWithoutExtAndGroupSuffix -> this.subStageGetter
.subStage(dictionary,
snapshot,
LLUtils.lazyRetain(groupKeyWithoutExtAndGroupSuffix.getT1())
)
return dictionary
.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength)
.flatMapSequential(groupKeyWithoutExtSend_ -> Mono.using(
groupKeyWithoutExtSend_::receive,
groupKeyWithoutExtSend -> this.subStageGetter
.subStage(dictionary, snapshot, getGroupKeyWithoutExt(groupKeyWithoutExtSend.copy().send()))
.<Entry<T, US>>handle((us, sink) -> {
try {
sink.next(Map.entry(this.deserializeSuffix(groupKeyWithoutExtAndGroupSuffix.getT2().send()),
sink.next(Map.entry(this.deserializeSuffix(getGroupSuffix(groupKeyWithoutExtSend.send())),
us));
} catch (SerializationException ex) {
sink.error(ex);
}
}),
entry -> {
entry.getT1().close();
entry.getT2().close();
}
)
)
Resource::close
))
.transform(LLUtils::handleDiscard);
}
private Send<Buffer> getGroupSuffix(Send<Buffer> groupKeyWithoutExtSend) {
try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) {
try (var groupSuffix = this.stripPrefix(groupKeyWithoutExt.copy().send()).receive()) {
assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
return groupSuffix.send();
}
}
}
private Mono<Send<Buffer>> getGroupKeyWithoutExt(Send<Buffer> groupKeyWithoutExtSend) {
return Mono.fromCallable(() -> {
try (var groupKeyWithoutExt = groupKeyWithoutExtSend.receive()) {
assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
return groupKeyWithoutExt.send();
}
});
}
private boolean subStageKeysConsistency(int totalKeyLength) {
if (subStageGetter instanceof SubStageGetterMapDeep) {
return totalKeyLength
@ -432,7 +393,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return dictionary.clear();
} else if (range.isSingle()) {
return dictionary
.remove(LLUtils.lazyRetain(range::getSingle), LLDictionaryResultType.VOID)
.remove(Mono.fromCallable(range::getSingle), LLDictionaryResultType.VOID)
.doOnNext(Send::close)
.then();
} else {

View File

@ -16,5 +16,4 @@ public interface SubStageGetter<U, US extends DatabaseStage<U>> {
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKey);
boolean isMultiKey();
}

View File

@ -6,11 +6,9 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
@ -52,11 +50,6 @@ public class SubStageGetterHashMap<T, U, TH> implements
);
}
@Override
public boolean isMultiKey() {
return true;
}
public int getKeyHashBinaryLength() {
return keyHashSerializer.getSerializedBinaryLength();
}

View File

@ -10,7 +10,6 @@ import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.Map;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings({"unused", "ClassCanBeRecord"})
@ -47,11 +46,6 @@ public class SubStageGetterHashSet<T, TH> implements
);
}
@Override
public boolean isMultiKey() {
return true;
}
public int getKeyHashBinaryLength() {
return keyHashSerializer.getSerializedBinaryLength();
}

View File

@ -2,15 +2,12 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, DatabaseMapDictionary<T, U>> {
@ -29,23 +26,12 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionary
.tail(dictionary,
prefixKey,
keySerializer,
valueSerializer
)
),
prefixKey -> Mono.fromSupplier(() -> DatabaseMapDictionary
.tail(dictionary, prefixKey, keySerializer, valueSerializer)),
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
}
@Override
public boolean isMultiKey() {
return true;
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength();
}

View File

@ -2,14 +2,11 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@ -44,24 +41,12 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
@Nullable CompositeSnapshot snapshot,
Mono<Send<Buffer>> prefixKeyMono) {
return Mono.usingWhen(prefixKeyMono,
prefixKey -> Mono
.fromSupplier(() -> DatabaseMapDictionaryDeep
.deepIntermediate(dictionary,
prefixKey,
keySerializer,
subStageGetter,
keyExtLength
)
),
prefixKey -> Mono.fromSupplier(() -> DatabaseMapDictionaryDeep
.deepIntermediate(dictionary, prefixKey, keySerializer, subStageGetter, keyExtLength)),
prefixKey -> Mono.fromRunnable(prefixKey::close)
);
}
@Override
public boolean isMultiKey() {
return true;
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength() + keyExtLength;
}

View File

@ -2,15 +2,12 @@ package it.cavallium.dbengine.database.collections;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, DatabaseSetDictionary<T>> {
@ -32,11 +29,6 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
);
}
@Override
public boolean isMultiKey() {
return true;
}
public int getKeyBinaryLength() {
return keySerializer.getSerializedBinaryLength();
}

View File

@ -4,12 +4,8 @@ import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Send;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.Serializer;
import java.util.Arrays;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageEntry<T>> {
@ -32,9 +28,4 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
);
}
@Override
public boolean isMultiKey() {
return false;
}
}

View File

@ -1506,8 +1506,8 @@ public class LLLocalDictionary implements LLDictionary {
true,
"getRangeKeysGrouped"
),
LLLocalKeyPrefixReactiveRocksIterator::flux,
LLLocalKeyPrefixReactiveRocksIterator::release
it -> it.flux(),
it -> it.release()
)
.subscribeOn(dbScheduler),
rangeSend -> Mono.fromRunnable(rangeSend::close)

View File

@ -48,14 +48,16 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
public Flux<Send<Buffer>> flux() {
return Flux
return Flux.using(
() -> range.copy().send(),
rangeSend -> Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
if (!range.hasMin() || !range.hasMax()) {
readOptions.setReadaheadSize(32 * 1024); // 32KiB
readOptions.setFillCache(canFillCache);
}
return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh);
return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, rangeSend, db, cfh);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();
@ -63,10 +65,17 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
Buffer firstGroupKey = null;
try {
while (rocksIterator.isValid()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key).receive();
} else {
key = LLUtils.fromByteArray(alloc, rocksIterator.key());
}
try (key) {
if (firstGroupKey == null) {
firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(), prefixLength)) {
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(), key, key.readerOffset(),
prefixLength)) {
break;
}
rocksIterator.next();
@ -75,6 +84,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
}
if (firstGroupKey != null) {
var groupKeyPrefix = firstGroupKey.copy(firstGroupKey.readerOffset(), prefixLength);
assert groupKeyPrefix.isAccessible();
sink.next(groupKeyPrefix.send());
} else {
sink.complete();
@ -94,7 +104,9 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
tuple.getT2().close();
tuple.getT3().close();
tuple.getT4().close();
});
}),
Send::close
);
}
public void release() {

View File

@ -97,9 +97,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
PlatformDependent.getUnsafeUnavailabilityCause()
);
}
if (!PlatformDependent.hasDirectBufferNoCleanerConstructor()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers:"
+ " DirectBufferNoCleanerConstructor is not available");
if (!MemorySegmentUtils.isSupported()) {
throw new UnsupportedOperationException("Please enable Foreign Memory Access API support or disable"
+ " netty direct buffers");
}
}

View File

@ -0,0 +1,94 @@
package it.cavallium.dbengine.database.disk;
import io.netty.util.internal.PlatformDependent;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
public class MemorySegmentUtils {
private static final MethodHandle OF_NATIVE_RESTRICTED;
private static final MethodHandle AS_SLICE;
private static final MethodHandle AS_BYTE_BUFFER;
private static Throwable cause;
private static final Object NATIVE;
static {
Lookup lookup = MethodHandles.publicLookup();
Object nativeVal = null;
MethodHandle ofNativeRestricted;
try {
ofNativeRestricted = lookup.findStatic(Class.forName("jdk.incubator.foreign.MemorySegment"),
"ofNativeRestricted",
MethodType.methodType(Class.forName("jdk.incubator.foreign.MemorySegment"))
);
try {
nativeVal = ofNativeRestricted.invoke();
} catch (Throwable e) {
cause = e;
}
} catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) {
ofNativeRestricted = null;
cause = e;
}
OF_NATIVE_RESTRICTED = ofNativeRestricted;
MethodHandle asSlice;
try {
asSlice = lookup.findVirtual(Class.forName("jdk.incubator.foreign.MemorySegment"),
"asSlice",
MethodType.methodType(Class.forName("jdk.incubator.foreign.MemorySegment"), long.class, long.class)
);
} catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) {
asSlice = null;
cause = e;
}
AS_SLICE = asSlice;
MethodHandle asByteBuffer;
try {
asByteBuffer = lookup.findVirtual(Class.forName("jdk.incubator.foreign.MemorySegment"),
"asByteBuffer", MethodType.methodType(ByteBuffer.class));
} catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) {
asByteBuffer = null;
cause = e;
}
AS_BYTE_BUFFER = asByteBuffer;
NATIVE = nativeVal;
}
public static ByteBuffer directBuffer(long address, long size) {
if (address <= 0) {
throw new IllegalArgumentException("Address is " + address);
}
if (size > Integer.MAX_VALUE || size < 0) {
throw new IllegalArgumentException("size is " + size);
}
try {
if (!isSupported()) {
if (PlatformDependent.hasDirectBufferNoCleanerConstructor()) {
return PlatformDependent.directBuffer(address, (int) size);
}
throw new UnsupportedOperationException("Foreign Memory Access API is not enabled!");
}
var memorySegment = AS_SLICE.invoke(NATIVE, address, size);
return (ByteBuffer) AS_BYTE_BUFFER.invoke(memorySegment);
} catch (Throwable e) {
throw new UnsupportedOperationException("Foreign Memory Access API is not enabled!", e);
}
}
public static boolean isSupported() {
return OF_NATIVE_RESTRICTED != null && AS_SLICE != null && AS_BYTE_BUFFER != null && NATIVE != null;
}
public static Throwable getUnsupportedCause() {
return cause;
}
}

View File

@ -2,16 +2,12 @@ package it.cavallium.dbengine;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.Send;
import io.netty.buffer.api.pool.BufferAllocatorMetric;
import io.netty.buffer.api.pool.PooledBufferAllocator;
import io.netty.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary;
@ -26,6 +22,7 @@ import it.cavallium.dbengine.database.collections.SubStageGetterHashMap;
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.disk.MemorySegmentUtils;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.io.IOException;
@ -78,6 +75,7 @@ public class DbTestUtils {
Path path) {}
public static Mono<TempDb> openTempDb(TestAllocator alloc) {
boolean canUseNettyDirect = computeCanUseNettyDirect();
return Mono.defer(() -> {
var wrkspcPath = Path.of("/tmp/.cache/tempdb-" + dbId.incrementAndGet() + "/");
return Mono
@ -99,13 +97,33 @@ public class DbTestUtils {
.flatMap(conn -> conn
.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, false, false, -1)
new DatabaseOptions(Map.of(), true, false, true, false, true, canUseNettyDirect, canUseNettyDirect, -1)
)
.map(db -> new TempDb(alloc, conn, db, wrkspcPath))
);
});
}
private static boolean computeCanUseNettyDirect() {
boolean canUse = true;
if (!PlatformDependent.hasUnsafe()) {
System.err.println("Warning! Unsafe is not available!"
+ " Netty direct buffers will not be used in tests!");
canUse = false;
}
if (!MemorySegmentUtils.isSupported()) {
System.err.println("Warning! Foreign Memory Access API is not available!"
+ " Netty direct buffers will not be used in tests!"
+ " Please set \"--enable-preview --add-modules jdk.incubator.foreign --foreign.restricted=permit\"");
if (MemorySegmentUtils.getUnsupportedCause() != null) {
System.err.println("\tCause: " + MemorySegmentUtils.getUnsupportedCause().getClass().getName()
+ ":" + MemorySegmentUtils.getUnsupportedCause().getLocalizedMessage());
}
canUse = false;
}
return canUse;
}
public static Mono<Void> closeTempDb(TempDb tempDb) {
return tempDb.db().close().then(tempDb.connection().disconnect()).then(Mono.fromCallable(() -> {
ensureNoLeaks(tempDb.allocator().allocator(), false);

View File

@ -10,6 +10,7 @@ import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -214,18 +215,22 @@ public class TestDictionaryMapDeep {
Step<Tuple3<String, String, String>> stpVer = StepVerifier
.create(tempDb(allocator, db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.flatMapMany(map -> map
.flatMapMany(map_ -> Flux.using(
() -> map_,
map -> map
.at(null, key)
.flatMap(v -> v
.set(value)
.doAfterTerminate(v::release)
)
.flatMap(v_ -> Mono.using(
() -> v_,
v -> v.set(value),
DatabaseMapDictionaryDeep::release
))
.then(map
.at(null, "capra")
.flatMap(v -> v
.set(Map.of("normal", "123", "ormaln", "456"))
.doAfterTerminate(v::release)
)
.flatMap(v_ -> Mono.using(
() -> v_,
v -> v.set(Map.of("normal", "123", "ormaln", "456")),
DatabaseMapDictionaryDeep::release
))
)
.thenMany(map
.getAllStages(null)
@ -234,9 +239,9 @@ public class TestDictionaryMapDeep {
.map(result -> Tuples.of(v.getKey(), result.getKey(), result.getValue()))
.doAfterTerminate(() -> v.getValue().release())
)
)
.doAfterTerminate(map::release)
)
),
DatabaseMapDictionaryDeep::release
))
));
if (shouldFail) {
stpVer.verifyError();