Andrea Cavalli 2021-08-28 22:42:51 +02:00
parent 0c17af2ae5
commit 03b5876001
28 changed files with 963 additions and 567 deletions

File: pom.xml

@@ -377,17 +377,17 @@
 		<dependency>
 			<groupId>io.projectreactor</groupId>
 			<artifactId>reactor-core</artifactId>
-			<version>3.4.8</version>
+			<version>3.4.9</version>
 		</dependency>
 		<dependency>
 			<groupId>io.projectreactor</groupId>
 			<artifactId>reactor-tools</artifactId>
-			<version>3.4.8</version>
+			<version>3.4.9</version>
 		</dependency>
 		<dependency>
 			<groupId>io.projectreactor</groupId>
 			<artifactId>reactor-test</artifactId>
-			<version>3.4.8</version>
+			<version>3.4.9</version>
 		</dependency>
 		<dependency>
 			<groupId>org.novasearch</groupId>

File: DatabaseOptions.java

@@ -14,5 +14,4 @@ public record DatabaseOptions(Map<String, String> extraFlags,
 		boolean allowMemoryMapping,
 		boolean allowNettyDirect,
 		boolean useNettyDirect,
-		boolean enableDbAssertionsWhenUsingAssertions,
 		int maxOpenFiles) {}

File: LLDictionary.java

@@ -71,23 +71,23 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
 		return getMulti(snapshot, keys, false);
 	}
-	Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues);
+	Flux<LLEntry> putMulti(Flux<LLEntry> entries, boolean getOldValues);
 	<X> Flux<ExtraKeyOperationResult<ByteBuf, X>> updateMulti(Flux<Tuple2<ByteBuf, X>> entries,
 			BiSerializationFunction<ByteBuf, X, ByteBuf> updateFunction);
-	Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range, boolean existsAlmostCertainly);
+	Flux<LLEntry> getRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range, boolean existsAlmostCertainly);
-	default Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range) {
+	default Flux<LLEntry> getRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range) {
 		return getRange(snapshot, range, false);
 	}
-	Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+	Flux<List<LLEntry>> getRangeGrouped(@Nullable LLSnapshot snapshot,
 			Mono<LLRange> range,
 			int prefixLength,
 			boolean existsAlmostCertainly);
-	default Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
+	default Flux<List<LLEntry>> getRangeGrouped(@Nullable LLSnapshot snapshot,
 			Mono<LLRange> range,
 			int prefixLength) {
 		return getRangeGrouped(snapshot, range, prefixLength, false);
@@ -101,11 +101,11 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
 	Flux<BadBlock> badBlocks(Mono<LLRange> range);
-	Mono<Void> setRange(Mono<LLRange> range, Flux<Entry<ByteBuf, ByteBuf>> entries);
+	Mono<Void> setRange(Mono<LLRange> range, Flux<LLEntry> entries);
 	default Mono<Void> replaceRange(Mono<LLRange> range,
 			boolean canKeysChange,
-			Function<Entry<ByteBuf, ByteBuf>, Mono<Entry<ByteBuf, ByteBuf>>> entriesReplacer,
+			Function<LLEntry, Mono<LLEntry>> entriesReplacer,
 			boolean existsAlmostCertainly) {
 		return Mono.defer(() -> {
 			if (canKeysChange) {
@@ -126,7 +126,7 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
 	default Mono<Void> replaceRange(Mono<LLRange> range,
 			boolean canKeysChange,
-			Function<Entry<ByteBuf, ByteBuf>, Mono<Entry<ByteBuf, ByteBuf>>> entriesReplacer) {
+			Function<LLEntry, Mono<LLEntry>> entriesReplacer) {
 		return replaceRange(range, canKeysChange, entriesReplacer, false);
 	}
@@ -134,9 +134,9 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
 	Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, Mono<LLRange> range, boolean fast);
-	Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> range);
+	Mono<LLEntry> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> range);
 	Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, Mono<LLRange> range);
-	Mono<Entry<ByteBuf, ByteBuf>> removeOne(Mono<LLRange> range);
+	Mono<LLEntry> removeOne(Mono<LLRange> range);
 }
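The interface now moves key-value pairs as the dedicated LLEntry type (introduced in the next file) instead of Map.Entry<ByteBuf, ByteBuf>, so both buffers travel as one reference-counted unit. A minimal caller sketch, not part of this commit (class and variable names are hypothetical; assumes the same package as LLDictionary and LLEntry):

	import io.netty.buffer.ByteBuf;
	import io.netty.buffer.ByteBufAllocator;
	import java.nio.charset.StandardCharsets;
	import reactor.core.publisher.Flux;

	class PutMultiExample {

		// LLEntry's constructor takes over the caller's reference to each
		// buffer, so the buffers are not released separately here.
		static void putTwo(LLDictionary dictionary, ByteBufAllocator alloc) {
			Flux<LLEntry> entries = Flux.just(
					new LLEntry(utf8(alloc, "k1"), utf8(alloc, "v1")),
					new LLEntry(utf8(alloc, "k2"), utf8(alloc, "v2"))
			);
			dictionary.putMulti(entries, false).then().block();
		}

		static ByteBuf utf8(ByteBufAllocator alloc, String s) {
			ByteBuf buf = alloc.buffer();
			buf.writeCharSequence(s, StandardCharsets.UTF_8);
			return buf;
		}
	}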

File: LLEntry.java (new file)

@@ -0,0 +1,74 @@
+package it.cavallium.dbengine.database;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.IllegalReferenceCountException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.warp.commonutils.log.Logger;
+import org.warp.commonutils.log.LoggerFactory;
+
+public class LLEntry {
+
+	private static final Logger logger = LoggerFactory.getLogger(LLEntry.class);
+
+	private final AtomicInteger refCnt = new AtomicInteger(1);
+	private final ByteBuf key;
+	private final ByteBuf value;
+
+	public LLEntry(ByteBuf key, ByteBuf value) {
+		try {
+			this.key = key.retain();
+			this.value = value.retain();
+		} finally {
+			key.release();
+			value.release();
+		}
+	}
+
+	public ByteBuf getKey() {
+		if (refCnt.get() <= 0) {
+			throw new IllegalReferenceCountException(refCnt.get());
+		}
+		return key;
+	}
+
+	public ByteBuf getValue() {
+		if (refCnt.get() <= 0) {
+			throw new IllegalReferenceCountException(refCnt.get());
+		}
+		return value;
+	}
+
+	public void retain() {
+		if (refCnt.getAndIncrement() <= 0) {
+			throw new IllegalReferenceCountException(refCnt.get(), 1);
+		}
+		key.retain();
+		value.retain();
+	}
+
+	public void release() {
+		if (refCnt.decrementAndGet() < 0) {
+			throw new IllegalReferenceCountException(refCnt.get(), -1);
+		}
+		if (key.refCnt() > 0) {
+			key.release();
+		}
+		if (value.refCnt() > 0) {
+			value.release();
+		}
+	}
+
+	public boolean isReleased() {
+		return refCnt.get() <= 0;
+	}
+
+	@Override
+	protected void finalize() throws Throwable {
+		if (refCnt.get() > 0) {
+			logger.warn(this.getClass().getName() + "::release has not been called!");
+		}
+		super.finalize();
+	}
+}
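The constructor retains and then releases both buffers, which nets out to taking over the caller's single reference; from then on the wrapper's own counter drives the buffers' lifetime, and the finalize() override only logs a leak warning. A lifecycle sketch (illustrative, not from this commit):

	import io.netty.buffer.ByteBuf;
	import io.netty.buffer.Unpooled;

	class LLEntryLifecycle {
		public static void main(String[] args) {
			ByteBuf key = Unpooled.copiedBuffer(new byte[]{1});   // refCnt = 1
			ByteBuf value = Unpooled.copiedBuffer(new byte[]{2}); // refCnt = 1

			// retain()+release() inside the constructor leave each buffer at
			// refCnt 1, but that single reference now belongs to the entry.
			LLEntry entry = new LLEntry(key, value);

			entry.retain();   // entry refCnt 2; key/value refCnt 2
			entry.release();  // entry refCnt 1; key/value refCnt 1
			entry.release();  // entry refCnt 0; key and value are released
			assert entry.isReleased();
			// entry.getKey() would now throw IllegalReferenceCountException
		}
	}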

File: LLUtils.java

@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.function.Function;
 import java.util.function.ToIntFunction;
@@ -42,13 +43,19 @@ import org.apache.lucene.search.SortedNumericSortField;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.warp.commonutils.functional.IOFunction;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
 @SuppressWarnings("unused")
 public class LLUtils {
+	private static final Logger logger = LoggerFactory.getLogger(LLUtils.class);
 	private static final byte[] RESPONSE_TRUE = new byte[]{1};
 	private static final byte[] RESPONSE_FALSE = new byte[]{0};
 	private static final byte[] RESPONSE_TRUE_BUF = new byte[]{1};
@@ -514,35 +521,154 @@ public class LLUtils {
 	}
 	public static <T> Mono<T> handleDiscard(Mono<T> mono) {
-		return mono.doOnDiscard(Map.Entry.class, e -> {
-			if (e.getKey() instanceof ByteBuf bb) {
-				if (bb.refCnt() > 0) {
-					bb.release();
-				}
-			}
-			if (e.getValue() instanceof ByteBuf bb) {
-				if (bb.refCnt() > 0) {
-					bb.release();
-				}
-			}
-		});
+		return mono
+				.doOnDiscard(Object.class, obj -> {
+					if (obj instanceof ReferenceCounted o) {
+						discardRefCounted(o);
+					} else if (obj instanceof Entry o) {
+						discardEntry(o);
+					} else if (obj instanceof Collection o) {
+						discardCollection(o);
+					} else if (obj instanceof Tuple3 o) {
+						discardTuple3(o);
+					} else if (obj instanceof Tuple2 o) {
+						discardTuple2(o);
+					} else if (obj instanceof LLEntry o) {
+						discardLLEntry(o);
+					} else if (obj instanceof LLRange o) {
+						discardLLRange(o);
+					} else if (obj instanceof Delta o) {
+						discardDelta(o);
+					} else if (obj instanceof Map o) {
+						discardMap(o);
+					}
+				});
+		// todo: check if the single object discard hook is more performant
+		/*
+				.doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted)
+				.doOnDiscard(Map.Entry.class, LLUtils::discardEntry)
+				.doOnDiscard(Collection.class, LLUtils::discardCollection)
+				.doOnDiscard(Tuple2.class, LLUtils::discardTuple2)
+				.doOnDiscard(Tuple3.class, LLUtils::discardTuple3)
+				.doOnDiscard(LLEntry.class, LLUtils::discardLLEntry)
+				.doOnDiscard(LLRange.class, LLUtils::discardLLRange)
+				.doOnDiscard(Delta.class, LLUtils::discardDelta)
+				.doOnDiscard(Map.class, LLUtils::discardMap);
+		*/
 	}
 	public static <T> Flux<T> handleDiscard(Flux<T> mono) {
 		return mono
-				.doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted)
-				.doOnDiscard(Map.Entry.class, LLUtils::discardEntry)
-				.doOnDiscard(Collection.class, LLUtils::discardCollection);
+				.doOnDiscard(Object.class, obj -> {
+					if (obj instanceof ReferenceCounted o) {
+						discardRefCounted(o);
+					} else if (obj instanceof Entry o) {
+						discardEntry(o);
+					} else if (obj instanceof Collection o) {
+						discardCollection(o);
+					} else if (obj instanceof Tuple3 o) {
+						discardTuple3(o);
+					} else if (obj instanceof Tuple2 o) {
+						discardTuple2(o);
+					} else if (obj instanceof LLEntry o) {
+						discardLLEntry(o);
+					} else if (obj instanceof LLRange o) {
+						discardLLRange(o);
+					} else if (obj instanceof Delta o) {
+						discardDelta(o);
+					} else if (obj instanceof Map o) {
+						discardMap(o);
+					} else {
+						System.err.println(obj.getClass().getName());
+					}
+				});
+		// todo: check if the single object discard hook is more performant
+		/*
+				.doOnDiscard(ReferenceCounted.class, LLUtils::discardRefCounted)
+				.doOnDiscard(Map.Entry.class, LLUtils::discardEntry)
+				.doOnDiscard(Collection.class, LLUtils::discardCollection)
+				.doOnDiscard(Tuple2.class, LLUtils::discardTuple2)
+				.doOnDiscard(Tuple3.class, LLUtils::discardTuple3)
+				.doOnDiscard(LLEntry.class, LLUtils::discardLLEntry)
+				.doOnDiscard(LLRange.class, LLUtils::discardLLRange)
+				.doOnDiscard(Delta.class, LLUtils::discardDelta)
+				.doOnDiscard(Map.class, LLUtils::discardMap);
+		*/
 	}
+	private static void discardLLEntry(LLEntry entry) {
+		logger.trace("Releasing discarded ByteBuf");
+		entry.release();
+	}
+	private static void discardLLRange(LLRange range) {
+		logger.trace("Releasing discarded ByteBuf");
+		range.release();
+	}
 	private static void discardEntry(Map.Entry<?, ?> e) {
 		if (e.getKey() instanceof ByteBuf bb) {
 			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
 				bb.release();
 			}
 		}
 		if (e.getValue() instanceof ByteBuf bb) {
 			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
 				bb.release();
 			}
 		}
 	}
+	private static void discardTuple2(Tuple2<?, ?> e) {
+		if (e.getT1() instanceof ByteBuf bb) {
+			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		}
+		if (e.getT2() instanceof ByteBuf bb) {
+			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		}
+	}
+	private static void discardTuple3(Tuple3<?, ?, ?> e) {
+		if (e.getT1() instanceof ByteBuf bb) {
+			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		} else if (e.getT1() instanceof Optional opt) {
+			if (opt.isPresent() && opt.get() instanceof ByteBuf bb) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		}
+		if (e.getT2() instanceof ByteBuf bb) {
+			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		} else if (e.getT1() instanceof Optional opt) {
+			if (opt.isPresent() && opt.get() instanceof ByteBuf bb) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		}
+		if (e.getT3() instanceof ByteBuf bb) {
+			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		} else if (e.getT1() instanceof Optional opt) {
+			if (opt.isPresent() && opt.get() instanceof ByteBuf bb) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		}
+	}
@@ -550,6 +676,7 @@ public class LLUtils {
 	private static void discardRefCounted(ReferenceCounted referenceCounted) {
 		if (referenceCounted.refCnt() > 0) {
+			logger.trace("Releasing discarded ByteBuf");
 			referenceCounted.release();
 		}
 	}
@@ -558,16 +685,19 @@ public class LLUtils {
 		for (Object o : collection) {
 			if (o instanceof ReferenceCounted referenceCounted) {
 				if (referenceCounted.refCnt() > 0) {
+					logger.trace("Releasing discarded ByteBuf");
 					referenceCounted.release();
 				}
 			} else if (o instanceof Map.Entry entry) {
 				if (entry.getKey() instanceof ReferenceCounted bb) {
 					if (bb.refCnt() > 0) {
+						logger.trace("Releasing discarded ByteBuf");
 						bb.release();
 					}
 				}
 				if (entry.getValue() instanceof ReferenceCounted bb) {
 					if (bb.refCnt() > 0) {
+						logger.trace("Releasing discarded ByteBuf");
 						bb.release();
 					}
 				}
@@ -576,4 +706,42 @@ public class LLUtils {
 			}
 		}
 	}
+	private static void discardDelta(Delta<?> delta) {
+		if (delta.previous() instanceof ByteBuf bb) {
+			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		}
+		if (delta.current() instanceof ByteBuf bb) {
+			if (bb.refCnt() > 0) {
+				logger.trace("Releasing discarded ByteBuf");
+				bb.release();
+			}
+		}
+	}
+	private static void discardMap(Map<?, ?> map) {
+		for (Entry<?, ?> entry : map.entrySet()) {
+			boolean hasByteBuf = false;
+			if (entry.getKey() instanceof ByteBuf bb) {
+				if (bb.refCnt() > 0) {
+					logger.trace("Releasing discarded ByteBuf");
+					bb.release();
+				}
+				hasByteBuf = true;
+			}
+			if (entry.getValue() instanceof ByteBuf bb) {
+				if (bb.refCnt() > 0) {
+					logger.trace("Releasing discarded ByteBuf");
+					bb.release();
+				}
+				hasByteBuf = true;
+			}
+			if (!hasByteBuf) {
+				break;
+			}
+		}
+	}
 }
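Both handleDiscard overloads now install a single doOnDiscard(Object.class, ...) hook that dispatches on the runtime type, rather than one hook per class; the commented-out chain is kept for the benchmark suggested by the todo, and the Flux variant prints any unmatched type to stderr. (One apparent slip worth noting for review: in discardTuple3, the T2 and T3 branches re-test e.getT1() instanceof Optional instead of getT2()/getT3().) A usage sketch, with hypothetical dictionary and range variables:

	import reactor.core.publisher.Flux;
	import reactor.core.publisher.Mono;

	class DiscardExample {
		// take(1) cancels upstream; entries already prefetched but never
		// delivered are routed to the doOnDiscard hook installed by
		// handleDiscard, which releases their ByteBufs instead of leaking them.
		static LLEntry firstEntry(LLDictionary dictionary, Mono<LLRange> rangeMono) {
			Flux<LLEntry> entries = dictionary.getRange(null, rangeMono);
			return LLUtils.handleDiscard(entries).take(1).blockLast();
		}
	}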

File: DatabaseEmpty.java

@@ -14,7 +14,11 @@ public class DatabaseEmpty {
 	public static final Serializer<Nothing, ByteBuf> NOTHING_SERIALIZER = new Serializer<>() {
 		@Override
 		public @NotNull Nothing deserialize(@NotNull ByteBuf serialized) {
-			return NOTHING;
+			try {
+				return NOTHING;
+			} finally {
+				serialized.release();
+			}
 		}
 		@Override
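The fix makes NOTHING_SERIALIZER consume (release) the buffer it is handed, so deserializing the Nothing placeholder no longer leaks one reference per call. A serializer for a real payload following the same convention might look like this (illustrative sketch; Serializer's serialize() signature is assumed from context):

	import io.netty.buffer.ByteBuf;
	import io.netty.buffer.Unpooled;
	import java.nio.charset.StandardCharsets;
	import org.jetbrains.annotations.NotNull;

	class Utf8Serializer implements Serializer<String, ByteBuf> {
		@Override
		public @NotNull String deserialize(@NotNull ByteBuf serialized) {
			try {
				return serialized.toString(StandardCharsets.UTF_8);
			} finally {
				serialized.release(); // consume the input, like NOTHING_SERIALIZER
			}
		}

		@Override
		public @NotNull ByteBuf serialize(@NotNull String deserialized) {
			return Unpooled.copiedBuffer(deserialized, StandardCharsets.UTF_8);
		}
	}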

File: DatabaseMapDictionary.java

@@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.Delta;
 import it.cavallium.dbengine.database.ExtraKeyOperationResult;
 import it.cavallium.dbengine.database.LLDictionary;
 import it.cavallium.dbengine.database.LLDictionaryResultType;
+import it.cavallium.dbengine.database.LLEntry;
 import it.cavallium.dbengine.database.LLUtils;
 import it.cavallium.dbengine.database.UpdateMode;
 import it.cavallium.dbengine.database.UpdateReturnMode;
@@ -15,6 +16,7 @@ import it.cavallium.dbengine.database.serialization.SerializationException;
 import it.cavallium.dbengine.database.serialization.SerializationFunction;
 import it.cavallium.dbengine.database.serialization.Serializer;
 import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +30,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.SynchronousSink;
 import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 /**
@@ -87,6 +90,8 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 						sink.next(Map.entry(key, value));
 					} catch (SerializationException ex) {
 						sink.error(ex);
+					} finally {
+						entry.release();
 					}
 				})
 				.collectMap(Entry::getKey, Entry::getValue, HashMap::new)
@@ -101,7 +106,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 				.fromIterable(Collections.unmodifiableMap(value).entrySet())
 				.handle((entry, sink) -> {
 					try {
-						sink.next(Map.entry(this.toKey(serializeSuffix(entry.getKey())),
+						sink.next(new LLEntry(this.toKey(serializeSuffix(entry.getKey())),
 								valueSerializer.serialize(entry.getValue())));
 					} catch (SerializationException e) {
 						sink.error(e);
@@ -151,26 +156,18 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 	@Override
 	public Mono<Void> putValue(T keySuffix, U value) {
-		return Mono
-				.using(
-						() -> serializeSuffix(keySuffix),
-						keySuffixBuf -> Mono
-								.using(
-										() -> toKey(keySuffixBuf.retain()),
-										keyBuf -> Mono
-												.using(() -> valueSerializer.serialize(value),
-														valueBuf -> dictionary
-																.put(LLUtils.lazyRetain(keyBuf),
-																		LLUtils.lazyRetain(valueBuf),
-																		LLDictionaryResultType.VOID)
-																.doOnNext(ReferenceCounted::release),
-														ReferenceCounted::release
-												),
-										ReferenceCounted::release
-								),
-						ReferenceCounted::release
-				)
-				.then();
+		return Mono.using(() -> serializeSuffix(keySuffix),
+				keySuffixBuf -> Mono.using(() -> toKey(keySuffixBuf.retain()),
+						keyBuf -> Mono.using(() -> valueSerializer.serialize(value),
+								valueBuf -> dictionary
+										.put(LLUtils.lazyRetain(keyBuf), LLUtils.lazyRetain(valueBuf), LLDictionaryResultType.VOID)
+										.doOnNext(ReferenceCounted::release),
+								ReferenceCounted::release
+						),
+						ReferenceCounted::release
+				),
+				ReferenceCounted::release
+		).then();
 	}
 	@Override
@@ -340,35 +337,43 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 	@Override
 	public Flux<Entry<T, Optional<U>>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys, boolean existsAlmostCertainly) {
-		return dictionary
-				.getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
-					ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
-					try {
-						return Tuples.of(keySuffix, toKey(keySuffixBuf.retain()));
-					} finally {
-						keySuffixBuf.release();
-					}
-				})), existsAlmostCertainly)
-				.flatMapSequential(entry -> {
-					entry.getT2().release();
-					return Mono.fromCallable(() -> {
-						Optional<U> valueOpt;
-						if (entry.getT3().isPresent()) {
-							valueOpt = Optional.of(valueSerializer.deserialize(entry.getT3().get()));
-						} else {
-							valueOpt = Optional.empty();
-						}
-						return Map.entry(entry.getT1(), valueOpt);
-					});
-				});
+		return dictionary.getMulti(resolveSnapshot(snapshot), keys.flatMap(keySuffix -> Mono.fromCallable(() -> {
+			ByteBuf keySuffixBuf = serializeSuffix(keySuffix);
+			try {
+				var key = toKey(keySuffixBuf.retain());
+				try {
+					return Tuples.of(keySuffix, key.retain());
+				} finally {
+					key.release();
+				}
+			} finally {
+				keySuffixBuf.release();
+			}
+		})), existsAlmostCertainly).flatMapSequential(entry -> {
+			entry.getT2().release();
+			return Mono.fromCallable(() -> {
+				Optional<U> valueOpt;
+				if (entry.getT3().isPresent()) {
+					var buf = entry.getT3().get();
+					try {
+						valueOpt = Optional.of(valueSerializer.deserialize(buf.retain()));
+					} finally {
+						buf.release();
+					}
+				} else {
+					valueOpt = Optional.empty();
+				}
+				return Map.entry(entry.getT1(), valueOpt);
+			});
+		}).transform(LLUtils::handleDiscard);
 	}
-	private Entry<ByteBuf, ByteBuf> serializeEntry(T key, U value) throws SerializationException {
+	private LLEntry serializeEntry(T key, U value) throws SerializationException {
 		ByteBuf serializedKey = toKey(serializeSuffix(key));
 		try {
 			ByteBuf serializedValue = valueSerializer.serialize(value);
 			try {
-				return Map.entry(serializedKey.retain(), serializedValue.retain());
+				return new LLEntry(serializedKey.retain(), serializedValue.retain());
 			} finally {
 				serializedValue.release();
 			}
@@ -380,20 +385,21 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 	@Override
 	public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
 		var serializedEntries = entries
-				.flatMap(entry -> Mono
-						.fromCallable(() -> serializeEntry(entry.getKey(), entry.getValue()))
-						.doOnDiscard(Entry.class, uncastedEntry -> {
-							if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) {
-								byteBuf.release();
-							}
-							if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
-								byteBuf.release();
-							}
-						})
-				);
+				.<LLEntry>handle((entry, sink) -> {
+					try {
+						sink.next(serializeEntry(entry.getKey(), entry.getValue()));
+					} catch (SerializationException e) {
+						sink.error(e);
+					}
+				});
 		return dictionary
 				.putMulti(serializedEntries, false)
-				.then();
+				.then()
+				.doOnDiscard(LLEntry.class, entry -> {
+					if (!entry.isReleased()) {
+						entry.release();
+					}
+				});
 	}
 	@Override
@@ -455,21 +461,33 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 		return dictionary
 				.getRange(resolveSnapshot(snapshot), rangeMono)
 				.<Entry<T, U>>handle((serializedEntry, sink) -> {
+					ByteBuf key = serializedEntry.getKey();
+					ByteBuf value = serializedEntry.getValue();
 					try {
-						sink.next(Map.entry(
-								deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)),
-								valueSerializer.deserialize(serializedEntry.getValue())
-						));
+						ByteBuf keySuffix = stripPrefix(key.retain(), false);
+						try {
+							sink.next(Map.entry(deserializeSuffix(keySuffix.retain()),
+									valueSerializer.deserialize(value.retain())));
+						} finally {
+							keySuffix.release();
+						}
 					} catch (SerializationException e) {
 						sink.error(e);
+					} finally {
+						key.release();
+						value.release();
 					}
 				})
 				.doOnDiscard(Entry.class, uncastedEntry -> {
 					if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) {
-						byteBuf.release();
+						if (byteBuf.refCnt() > 0) {
+							byteBuf.release();
+						}
 					}
 					if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
-						byteBuf.release();
+						if (byteBuf.refCnt() > 0) {
+							byteBuf.release();
+						}
 					}
 				});
 	}
@@ -481,8 +499,22 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 				b -> getAllValues(null),
 				b -> dictionary.setRange(rangeMono, entries.handle((entry, sink) -> {
 					try {
-						ByteBuf serializedValue = valueSerializer.serialize(entry.getValue());
-						sink.next(Map.entry(toKey(serializeSuffix(entry.getKey())), serializedValue));
+						ByteBuf serializedKeySuffix = serializeSuffix(entry.getKey());
+						try {
+							ByteBuf serializedKey = toKey(serializedKeySuffix);
+							try {
+								ByteBuf serializedValue = valueSerializer.serialize(entry.getValue());
+								try {
+									sink.next(new LLEntry(serializedKey.retain(), serializedValue.retain()));
+								} finally {
+									serializedValue.release();
+								}
+							} finally {
+								serializedKey.release();
+							}
+						} finally {
+							serializedKeySuffix.release();
+						}
 					} catch (SerializationException e) {
 						sink.error(e);
 					}
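The recurring shape in these hunks is one try/finally (or Mono.using) scope per ref-counted buffer: the scope keeps its own reference and releases it on every exit path, while each consumer receives its own reference via retain(). The nested Mono.using calls in putValue can be distilled into a helper like this (illustrative sketch, not part of this commit):

	import io.netty.buffer.ByteBuf;
	import io.netty.util.ReferenceCounted;
	import java.util.concurrent.Callable;
	import java.util.function.Function;
	import reactor.core.publisher.Mono;

	class WithBuffer {
		// Ties one buffer to the subscription lifetime: the cleanup callback
		// releases it on completion, on error and on cancellation alike.
		static <T> Mono<T> withBuffer(Callable<ByteBuf> allocator,
				Function<ByteBuf, Mono<T>> pipeline) {
			return Mono.using(allocator, pipeline, ReferenceCounted::release);
		}
		// Usage mirrors putValue:
		// withBuffer(() -> serializeSuffix(keySuffix),
		//     keySuffixBuf -> withBuffer(() -> toKey(keySuffixBuf.retain()), keyBuf -> ...))
	}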

File: DatabaseMapDictionaryDeep.java

@@ -22,6 +22,7 @@ import java.util.Map.Entry;
 import org.jetbrains.annotations.Nullable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
 // todo: implement optimized methods (which?)
 public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implements DatabaseStageMap<T, U, US> {
@@ -393,25 +394,10 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
 		return Mono.using(
 				() -> serializeSuffix(keySuffix),
 				keySuffixData -> {
-					Flux<ByteBuf> debuggingKeysFlux = Mono.<List<ByteBuf>>defer(() -> {
-						if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
-								&& this.subStageGetter.needsDebuggingKeyFlux()) {
-							return Flux
-									.using(
-											() -> toExtRange(keySuffixData.retain()),
-											extRangeBuf -> this.dictionary
-													.getRangeKeys(resolveSnapshot(snapshot), LLUtils.lazyRetainRange(extRangeBuf)),
-											LLRange::release
-									)
-									.collectList();
-						} else {
-							return Mono.just(List.of());
-						}
-					}).flatMapIterable(it -> it);
 					return Mono.using(
 							() -> toKeyWithoutExt(keySuffixData.retain()),
 							keyWithoutExt -> this.subStageGetter
-									.subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt), debuggingKeysFlux),
+									.subStage(dictionary, snapshot, LLUtils.lazyRetain(keyWithoutExt)),
 							ReferenceCounted::release
 					);
 				},
@@ -433,87 +419,43 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
 	@Override
 	public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
 		return Flux
-				.defer(() -> {
-					if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) {
-						return dictionary
-								.getRangeKeysGrouped(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength)
-								.concatMap(rangeKeys -> Flux
-										.using(
-												() -> {
-													assert this.subStageGetter.isMultiKey() || rangeKeys.size() == 1;
-													ByteBuf groupKeyWithExt = rangeKeys.get(0).retainedSlice();
-													ByteBuf groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt.retain(), true);
-													ByteBuf groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
-													return new GroupBuffers(groupKeyWithExt, groupKeyWithoutExt, groupSuffix);
-												},
-												buffers -> Mono
-														.fromCallable(() -> {
-															assert subStageKeysConsistency(buffers.groupKeyWithExt.readableBytes());
-															return null;
-														})
-														.then(this.subStageGetter
-																.subStage(dictionary,
-																		snapshot,
-																		LLUtils.lazyRetain(buffers.groupKeyWithoutExt),
-																		Flux.fromIterable(rangeKeys).map(ByteBuf::retain)
-																)
-																.<Entry<T, US>>handle((us, sink) -> {
-																	try {
-																		var deserializedSuffix = this.deserializeSuffix(buffers.groupSuffix.retain());
-																		sink.next(Map.entry(deserializedSuffix, us));
-																	} catch (SerializationException ex) {
-																		sink.error(ex);
-																	}
-																})
-														),
-												buffers -> {
-													buffers.groupSuffix.release();
-													buffers.groupKeyWithoutExt.release();
-													buffers.groupKeyWithExt.release();
-												}
-										)
-										.doAfterTerminate(() -> {
-											for (ByteBuf rangeKey : rangeKeys) {
-												rangeKey.release();
-											}
-										})
-								)
-								.doOnDiscard(Collection.class, discardedCollection -> {
-									for (Object o : discardedCollection) {
-										if (o instanceof ByteBuf byteBuf) {
-											byteBuf.release();
-										}
-									}
-								});
-					} else {
-						return Flux
-								.defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength))
-								.flatMapSequential(groupKeyWithoutExt -> Mono
-										.using(
-												() -> {
-													var groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
-													assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
-													return groupSuffix;
-												},
-												groupSuffix -> this.subStageGetter
-														.subStage(dictionary,
-																snapshot,
-																LLUtils.lazyRetain(groupKeyWithoutExt),
-																Flux.empty()
-														)
-														.<Entry<T, US>>handle((us, sink) -> {
-															try {
-																sink.next(Map.entry(this.deserializeSuffix(groupSuffix.retain()), us));
-															} catch (SerializationException ex) {
-																sink.error(ex);
-															}
-														}),
-												ReferenceCounted::release
-										)
-								);
-					}
-				});
+				.defer(() -> dictionary.getRangeKeyPrefixes(resolveSnapshot(snapshot), rangeMono, keyPrefixLength + keySuffixLength))
+				.flatMapSequential(groupKeyWithoutExt -> Mono
+						.using(
+								() -> {
+									try {
+										var groupSuffix = this.stripPrefix(groupKeyWithoutExt.retain(), true);
+										try {
+											assert subStageKeysConsistency(groupKeyWithoutExt.readableBytes() + keyExtLength);
+											return Tuples.of(groupKeyWithoutExt.retain(), groupSuffix.retain());
+										} finally {
+											groupSuffix.release();
+										}
+									} finally {
+										groupKeyWithoutExt.release();
+									}
+								},
+								groupKeyWithoutExtAndGroupSuffix -> this.subStageGetter
+										.subStage(dictionary,
+												snapshot,
+												LLUtils.lazyRetain(groupKeyWithoutExtAndGroupSuffix.getT1())
+										)
+										.<Entry<T, US>>handle((us, sink) -> {
+											try {
+												sink.next(Map.entry(this.deserializeSuffix(groupKeyWithoutExtAndGroupSuffix.getT2().retain()), us));
+											} catch (SerializationException ex) {
+												sink.error(ex);
+											}
+										}),
+								entry -> {
+									entry.getT1().release();
+									entry.getT2().release();
+								}
+						)
+				)
+				.transform(LLUtils::handleDiscard);
 	}
 	private boolean subStageKeysConsistency(int totalKeyLength) {
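getAllStages now bundles the prefix key and its suffix into a single Tuple2 resource so that one Mono.using cleanup releases both, even when the downstream is cancelled; this is why Tuples is imported above. The pattern in isolation (illustrative helper, not part of this commit):

	import io.netty.buffer.ByteBuf;
	import java.util.function.Function;
	import reactor.core.publisher.Mono;
	import reactor.util.function.Tuple2;
	import reactor.util.function.Tuples;

	class WithKeyAndSuffix {
		// Bundling both buffers into one Tuple2 resource means a single
		// cleanup callback releases them, on any termination path.
		static <T> Mono<T> withKeyAndSuffix(ByteBuf key, ByteBuf suffix,
				Function<Tuple2<ByteBuf, ByteBuf>, Mono<T>> pipeline) {
			return Mono.using(
					() -> Tuples.of(key.retain(), suffix.retain()),
					pipeline,
					tuple -> {
						tuple.getT1().release();
						tuple.getT2().release();
					}
			);
		}
	}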

File: SubStageGetter.java

@@ -13,10 +13,7 @@ public interface SubStageGetter<U, US extends DatabaseStage<U>> {
 	Mono<US> subStage(LLDictionary dictionary,
 			@Nullable CompositeSnapshot snapshot,
-			Mono<ByteBuf> prefixKey,
-			@Nullable Flux<ByteBuf> debuggingKeyFlux);
+			Mono<ByteBuf> prefixKey);
 	boolean isMultiKey();
-	boolean needsDebuggingKeyFlux();
 }
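With the debugging flux gone, the contract is down to two methods: build a stage from the prefix key, and declare whether the getter spans multiple keys. A minimal sketch of an implementation against the slimmed-down interface (illustrative; it mirrors the shape of SubStageGetterSingle as changed below):

	import io.netty.buffer.ByteBuf;
	import org.jetbrains.annotations.Nullable;
	import reactor.core.publisher.Mono;

	class ExampleSubStageGetter<T> implements SubStageGetter<T, DatabaseStageEntry<T>> {

		private final Serializer<T, ByteBuf> serializer;

		ExampleSubStageGetter(Serializer<T, ByteBuf> serializer) {
			this.serializer = serializer;
		}

		@Override
		public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
				@Nullable CompositeSnapshot snapshot,
				Mono<ByteBuf> prefixKeyMono) {
			// The prefix key is released when the subscription terminates,
			// matching the usingWhen pattern used by every getter in this commit.
			return Mono.usingWhen(prefixKeyMono,
					prefixKey -> Mono.<DatabaseStageEntry<T>>fromSupplier(() ->
							new DatabaseSingle<>(dictionary, prefixKey.retain(), serializer)),
					prefixKey -> Mono.fromRunnable(prefixKey::release));
		}

		@Override
		public boolean isMultiKey() {
			return false;
		}
	}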

File: SubStageGetterHashMap.java

@@ -16,38 +16,25 @@ import reactor.core.publisher.Mono;
 public class SubStageGetterHashMap<T, U, TH> implements
 		SubStageGetter<Map<T, U>, DatabaseMapDictionaryHashed<T, U, TH>> {
-	private static final boolean assertsEnabled;
-	static {
-		boolean assertsEnabledTmp = false;
-		//noinspection AssertWithSideEffects
-		assert assertsEnabledTmp = true;
-		//noinspection ConstantConditions
-		assertsEnabled = assertsEnabledTmp;
-	}
 	private final Serializer<T, ByteBuf> keySerializer;
 	private final Serializer<U, ByteBuf> valueSerializer;
 	private final Function<T, TH> keyHashFunction;
 	private final SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer;
-	private final boolean enableAssertionsWhenUsingAssertions;
 	public SubStageGetterHashMap(Serializer<T, ByteBuf> keySerializer,
 			Serializer<U, ByteBuf> valueSerializer,
 			Function<T, TH> keyHashFunction,
-			SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer,
-			boolean enableAssertionsWhenUsingAssertions) {
+			SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer) {
 		this.keySerializer = keySerializer;
 		this.valueSerializer = valueSerializer;
 		this.keyHashFunction = keyHashFunction;
 		this.keyHashSerializer = keyHashSerializer;
-		this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
 	}
 	@Override
 	public Mono<DatabaseMapDictionaryHashed<T, U, TH>> subStage(LLDictionary dictionary,
 			@Nullable CompositeSnapshot snapshot,
-			Mono<ByteBuf> prefixKeyMono,
-			@Nullable Flux<ByteBuf> debuggingKeysFlux) {
+			Mono<ByteBuf> prefixKeyMono) {
 		return Mono.usingWhen(
 				prefixKeyMono,
 				prefixKey -> Mono
@@ -59,24 +46,7 @@ public class SubStageGetterHashMap<T, U, TH> implements
 								keyHashFunction,
 								keyHashSerializer
 						)
-				)
-				.transform(mono -> {
-					if (debuggingKeysFlux != null) {
-						return debuggingKeysFlux.handle((key, sink) -> {
-							try {
-								if (key.readableBytes() != prefixKey.readableBytes() + getKeyHashBinaryLength()) {
-									sink.error(new IndexOutOfBoundsException());
-								} else {
-									sink.complete();
-								}
-							} finally {
-								key.release();
-							}
-						}).then(mono);
-					} else {
-						return mono;
-					}
-				}),
+				),
 				prefixKey -> Mono.fromRunnable(prefixKey::release)
 		);
 	}
@@ -86,11 +56,6 @@ public class SubStageGetterHashMap<T, U, TH> implements
 		return true;
 	}
-	@Override
-	public boolean needsDebuggingKeyFlux() {
-		return assertsEnabled && enableAssertionsWhenUsingAssertions;
-	}
 	public int getKeyHashBinaryLength() {
 		return keyHashSerializer.getSerializedBinaryLength();
 	}

File: SubStageGetterHashSet.java

@@ -16,35 +16,22 @@ import reactor.core.publisher.Mono;
 public class SubStageGetterHashSet<T, TH> implements
 		SubStageGetter<Map<T, Nothing>, DatabaseSetDictionaryHashed<T, TH>> {
-	private static final boolean assertsEnabled;
-	static {
-		boolean assertsEnabledTmp = false;
-		//noinspection AssertWithSideEffects
-		assert assertsEnabledTmp = true;
-		//noinspection ConstantConditions
-		assertsEnabled = assertsEnabledTmp;
-	}
 	private final Serializer<T, ByteBuf> keySerializer;
 	private final Function<T, TH> keyHashFunction;
 	private final SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer;
-	private final boolean enableAssertionsWhenUsingAssertions;
 	public SubStageGetterHashSet(Serializer<T, ByteBuf> keySerializer,
 			Function<T, TH> keyHashFunction,
-			SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer,
-			boolean enableAssertionsWhenUsingAssertions) {
+			SerializerFixedBinaryLength<TH, ByteBuf> keyHashSerializer) {
 		this.keySerializer = keySerializer;
 		this.keyHashFunction = keyHashFunction;
 		this.keyHashSerializer = keyHashSerializer;
-		this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
 	}
 	@Override
 	public Mono<DatabaseSetDictionaryHashed<T, TH>> subStage(LLDictionary dictionary,
 			@Nullable CompositeSnapshot snapshot,
-			Mono<ByteBuf> prefixKeyMono,
-			@Nullable Flux<ByteBuf> debuggingKeysFlux) {
+			Mono<ByteBuf> prefixKeyMono) {
 		return Mono.usingWhen(prefixKeyMono,
 				prefixKey -> Mono
 						.fromSupplier(() -> DatabaseSetDictionaryHashed
@@ -54,24 +41,7 @@ public class SubStageGetterHashSet<T, TH> implements
 								keyHashFunction,
 								keyHashSerializer
 						)
-				)
-				.transform(mono -> {
-					if (debuggingKeysFlux != null) {
-						return debuggingKeysFlux.handle((key, sink) -> {
-							try {
-								if (key.readableBytes() != prefixKey.readableBytes() + getKeyHashBinaryLength()) {
-									sink.error(new IndexOutOfBoundsException());
-								} else {
-									sink.complete();
-								}
-							} finally {
-								key.release();
-							}
-						}).then(mono);
-					} else {
-						return mono;
-					}
-				}),
+				),
 				prefixKey -> Mono.fromRunnable(prefixKey::release)
 		);
 	}
@@ -81,11 +51,6 @@ public class SubStageGetterHashSet<T, TH> implements
 		return true;
 	}
-	@Override
-	public boolean needsDebuggingKeyFlux() {
-		return assertsEnabled && enableAssertionsWhenUsingAssertions;
-	}
 	public int getKeyHashBinaryLength() {
 		return keyHashSerializer.getSerializedBinaryLength();
 	}

File: SubStageGetterMap.java

@@ -14,31 +14,19 @@ import reactor.core.publisher.Mono;
 public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, DatabaseMapDictionary<T, U>> {
-	private static final boolean assertsEnabled;
-	static {
-		boolean assertsEnabledTmp = false;
-		//noinspection AssertWithSideEffects
-		assert assertsEnabledTmp = true;
-		//noinspection ConstantConditions
-		assertsEnabled = assertsEnabledTmp;
-	}
 	private final SerializerFixedBinaryLength<T, ByteBuf> keySerializer;
 	private final Serializer<U, ByteBuf> valueSerializer;
-	private final boolean enableAssertionsWhenUsingAssertions;
 	public SubStageGetterMap(SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
-			Serializer<U, ByteBuf> valueSerializer, boolean enableAssertionsWhenUsingAssertions) {
+			Serializer<U, ByteBuf> valueSerializer) {
 		this.keySerializer = keySerializer;
 		this.valueSerializer = valueSerializer;
-		this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
 	}
 	@Override
 	public Mono<DatabaseMapDictionary<T, U>> subStage(LLDictionary dictionary,
 			@Nullable CompositeSnapshot snapshot,
-			Mono<ByteBuf> prefixKeyMono,
-			@Nullable Flux<ByteBuf> debuggingKeysFlux) {
+			Mono<ByteBuf> prefixKeyMono) {
 		return Mono.usingWhen(prefixKeyMono,
 				prefixKey -> Mono
 						.fromSupplier(() -> DatabaseMapDictionary
@@ -47,24 +35,7 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
 								keySerializer,
 								valueSerializer
 						)
-				)
-				.transform(mono -> {
-					if (debuggingKeysFlux != null) {
-						return debuggingKeysFlux.handle((key, sink) -> {
-							try {
-								if (key.readableBytes() != prefixKey.readableBytes() + getKeyBinaryLength()) {
-									sink.error(new IndexOutOfBoundsException());
-								} else {
-									sink.complete();
-								}
-							} finally {
-								key.release();
-							}
-						}).then(mono);
-					} else {
-						return mono;
-					}
-				}),
+				),
 				prefixKey -> Mono.fromRunnable(prefixKey::release)
 		);
 	}
@@ -74,11 +45,6 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
 		return true;
 	}
-	@Override
-	public boolean needsDebuggingKeyFlux() {
-		return assertsEnabled && enableAssertionsWhenUsingAssertions;
-	}
 	public int getKeyBinaryLength() {
 		return keySerializer.getSerializedBinaryLength();
 	}

File: SubStageGetterMapDeep.java

@@ -14,28 +14,17 @@ import reactor.core.publisher.Mono;
 public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
 		SubStageGetter<Map<T, U>, DatabaseMapDictionaryDeep<T, U, US>> {
-	private static final boolean assertsEnabled;
-	static {
-		boolean assertsEnabledTmp = false;
-		//noinspection AssertWithSideEffects
-		assert assertsEnabledTmp = true;
-		//noinspection ConstantConditions
-		assertsEnabled = assertsEnabledTmp;
-	}
 	private final SubStageGetter<U, US> subStageGetter;
 	private final SerializerFixedBinaryLength<T, ByteBuf> keySerializer;
 	private final int keyExtLength;
-	private final boolean enableAssertionsWhenUsingAssertions;
 	public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter,
 			SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
-			int keyExtLength, boolean enableAssertionsWhenUsingAssertions) {
+			int keyExtLength) {
 		this.subStageGetter = subStageGetter;
 		this.keySerializer = keySerializer;
 		this.keyExtLength = keyExtLength;
 		assert keyExtConsistency();
-		this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
 	}
@@ -52,8 +41,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
 	@Override
 	public Mono<DatabaseMapDictionaryDeep<T, U, US>> subStage(LLDictionary dictionary,
 			@Nullable CompositeSnapshot snapshot,
-			Mono<ByteBuf> prefixKeyMono,
-			@Nullable Flux<ByteBuf> debuggingKeysFlux) {
+			Mono<ByteBuf> prefixKeyMono) {
 		return Mono.usingWhen(prefixKeyMono,
 				prefixKey -> Mono
 						.fromSupplier(() -> DatabaseMapDictionaryDeep
@@ -63,24 +51,7 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
 								subStageGetter,
 								keyExtLength
 						)
-				)
-				.transform(mono -> {
-					if (debuggingKeysFlux != null) {
-						return debuggingKeysFlux.handle((key, sink) -> {
-							try {
-								if (key.readableBytes() != prefixKey.readableBytes() + getKeyBinaryLength()) {
-									sink.error(new IndexOutOfBoundsException());
-								} else {
-									sink.complete();
-								}
-							} finally {
-								key.release();
-							}
-						}).then(mono);
-					} else {
-						return mono;
-					}
-				}),
+				),
 				prefixKey -> Mono.fromRunnable(prefixKey::release)
 		);
 	}
@@ -90,11 +61,6 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
 		return true;
 	}
-	@Override
-	public boolean needsDebuggingKeyFlux() {
-		return assertsEnabled && enableAssertionsWhenUsingAssertions;
-	}
 	private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
 		return Mono
 				.fromCallable(() -> {

File: SubStageGetterSet.java

@@ -14,49 +14,19 @@ import reactor.core.publisher.Mono;
 public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, DatabaseSetDictionary<T>> {
-	private static final boolean assertsEnabled;
-	static {
-		boolean assertsEnabledTmp = false;
-		//noinspection AssertWithSideEffects
-		assert assertsEnabledTmp = true;
-		//noinspection ConstantConditions
-		assertsEnabled = assertsEnabledTmp;
-	}
 	private final SerializerFixedBinaryLength<T, ByteBuf> keySerializer;
-	private final boolean enableAssertionsWhenUsingAssertions;
-	public SubStageGetterSet(SerializerFixedBinaryLength<T, ByteBuf> keySerializer,
-			boolean enableAssertionsWhenUsingAssertions) {
+	public SubStageGetterSet(SerializerFixedBinaryLength<T, ByteBuf> keySerializer) {
 		this.keySerializer = keySerializer;
-		this.enableAssertionsWhenUsingAssertions = enableAssertionsWhenUsingAssertions;
 	}
 	@Override
 	public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary,
 			@Nullable CompositeSnapshot snapshot,
-			Mono<ByteBuf> prefixKeyMono,
-			@Nullable Flux<ByteBuf> debuggingKeysFlux) {
+			Mono<ByteBuf> prefixKeyMono) {
 		return Mono.usingWhen(prefixKeyMono,
 				prefixKey -> Mono
-						.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey.retain(), keySerializer))
-						.transform(mono -> {
-							if (debuggingKeysFlux != null) {
-								return debuggingKeysFlux.handle((key, sink) -> {
-									try {
-										if (key.readableBytes() != prefixKey.readableBytes() + getKeyBinaryLength()) {
-											sink.error(new IndexOutOfBoundsException());
-										} else {
-											sink.complete();
-										}
-									} finally {
-										key.release();
-									}
-								}).then(mono);
-							} else {
-								return mono;
-							}
-						}),
+						.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey.retain(), keySerializer)),
 				prefixKey -> Mono.fromRunnable(prefixKey::release)
 		);
 	}
@@ -66,11 +36,6 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
 		return true;
 	}
-	@Override
-	public boolean needsDebuggingKeyFlux() {
-		return assertsEnabled && enableAssertionsWhenUsingAssertions;
-	}
 	public int getKeyBinaryLength() {
 		return keySerializer.getSerializedBinaryLength();
 	}

File: SubStageGetterSingle.java

@@ -13,15 +13,6 @@ import reactor.core.publisher.Mono;
 public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageEntry<T>> {
-	private static final boolean assertsEnabled;
-	static {
-		boolean assertsEnabledTmp = false;
-		//noinspection AssertWithSideEffects
-		assert assertsEnabledTmp = true;
-		//noinspection ConstantConditions
-		assertsEnabled = assertsEnabledTmp;
-	}
 	private final Serializer<T, ByteBuf> serializer;
 	public SubStageGetterSingle(Serializer<T, ByteBuf> serializer) {
@@ -31,29 +22,11 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
 	@Override
 	public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
 			@Nullable CompositeSnapshot snapshot,
-			Mono<ByteBuf> keyPrefixMono,
-			@Nullable Flux<ByteBuf> debuggingKeysFlux) {
+			Mono<ByteBuf> keyPrefixMono) {
 		return Mono.usingWhen(
 				keyPrefixMono,
 				keyPrefix -> Mono
-						.<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer))
-						.transform(mono -> {
-							if (debuggingKeysFlux != null) {
-								return debuggingKeysFlux.handle((key, sink) -> {
-									try {
-										if (needsDebuggingKeyFlux() && !LLUtils.equals(keyPrefix, key)) {
-											sink.error(new IndexOutOfBoundsException("Found more than one element!"));
-										} else {
-											sink.complete();
-										}
-									} finally {
-										key.release();
-									}
-								}).then(mono);
-							} else {
-								return mono;
-							}
-						}),
+						.<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer)),
 				keyPrefix -> Mono.fromRunnable(keyPrefix::release)
 		);
 	}
@@ -63,9 +36,4 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
 		return false;
 	}
-	@Override
-	public boolean needsDebuggingKeyFlux() {
-		return assertsEnabled;
-	}
 }

File: LLLocalDictionary.java

@@ -12,6 +12,7 @@ import it.cavallium.dbengine.database.Delta;
 import it.cavallium.dbengine.database.ExtraKeyOperationResult;
 import it.cavallium.dbengine.database.LLDictionary;
 import it.cavallium.dbengine.database.LLDictionaryResultType;
+import it.cavallium.dbengine.database.LLEntry;
 import it.cavallium.dbengine.database.LLRange;
 import it.cavallium.dbengine.database.LLSnapshot;
 import it.cavallium.dbengine.database.LLUtils;
@@ -88,7 +89,6 @@ public class LLLocalDictionary implements LLDictionary {
 	 * now it's true to avoid crashes during iterations on completely corrupted files
 	 */
 	static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true;
-	public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
 	/**
 	 * Default: true. Use false to debug problems with windowing.
 	 */
@@ -218,9 +218,9 @@ public class LLLocalDictionary implements LLDictionary {
 		return list;
 	}
-	private IntArrayList getLockIndicesEntries(List<Entry<ByteBuf, ByteBuf>> keys) {
+	private IntArrayList getLockIndicesEntries(List<LLEntry> keys) {
 		var list = new IntArrayList(keys.size());
-		for (Entry<ByteBuf, ByteBuf> key : keys) {
+		for (LLEntry key : keys) {
 			list.add(getLockIndex(key.getKey()));
 		}
 		return list;
@@ -290,7 +290,7 @@ public class LLLocalDictionary implements LLDictionary {
 				throw new RocksDBException("Key buffer must be direct");
 			}
 			ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
-			assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect();
+			assert keyNioBuffer.isDirect();
 			// Create a direct result buffer because RocksDB works only with direct buffers
 			ByteBuf resultBuf = alloc.directBuffer(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
 			try {
@@ -300,17 +300,15 @@ public class LLLocalDictionary implements LLDictionary {
 				do {
 					// Create the result nio buffer to pass to RocksDB
 					resultNioBuf = resultBuf.nioBuffer(0, resultBuf.capacity());
-					if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
-						assert keyNioBuffer.isDirect();
-						assert resultNioBuf.isDirect();
-					}
+					assert keyNioBuffer.isDirect();
+					assert resultNioBuf.isDirect();
 					valueSize = db.get(cfh,
 							Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS),
 							keyNioBuffer.position(0),
 							resultNioBuf
 					);
 					if (valueSize != RocksDB.NOT_FOUND) {
-						if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) {
+						if (ASSERTIONS_ENABLED) {
 							// todo: check if position is equal to data that have been read
 							// todo: check if limit is equal to value size or data that have been read
 							assert valueSize <= 0 || resultNioBuf.limit() > 0;
@@ -408,11 +406,11 @@ public class LLLocalDictionary implements LLDictionary {
 					throw new RocksDBException("Value buffer must be direct");
 				}
 				var keyNioBuffer = LLUtils.toDirect(key);
-				assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect();
+				assert keyNioBuffer.isDirect();
 				var valueNioBuffer = LLUtils.toDirect(value);
-				assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || valueNioBuffer.isDirect();
+				assert valueNioBuffer.isDirect();
 				db.put(cfh, validWriteOptions, keyNioBuffer, valueNioBuffer);
 			} else {
 				db.put(cfh, validWriteOptions, LLUtils.toArray(key), LLUtils.toArray(value));
@@ -750,8 +748,7 @@ public class LLLocalDictionary implements LLDictionary {
 						newData = updater.apply(prevDataToSendToUpdater == null
 								? null
 								: prevDataToSendToUpdater.retain());
-						assert !databaseOptions.enableDbAssertionsWhenUsingAssertions()
-								|| prevDataToSendToUpdater == null
+						assert prevDataToSendToUpdater == null
 								|| prevDataToSendToUpdater.readerIndex() == 0
 								|| !prevDataToSendToUpdater.isReadable();
 					} finally {
@@ -886,7 +883,7 @@ public class LLLocalDictionary implements LLDictionary {
 						.single()
 						.map(LLUtils::booleanToResponseByteBuffer)
 						.doAfterTerminate(() -> {
-							assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || key.refCnt() > 0;
+							assert key.refCnt() > 0;
 						});
 				case PREVIOUS_VALUE -> Mono
 						.fromCallable(() -> {
@@ -912,7 +909,7 @@ public class LLLocalDictionary implements LLDictionary {
 								try {
 									return dbGet(cfh, null, key.retain(), true);
 								} finally {
-									assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || key.refCnt() > 0;
+									assert key.refCnt() > 0;
 								}
 							}
 						} else {
@@ -1005,8 +1002,7 @@ public class LLLocalDictionary implements LLDictionary {
 							.doAfterTerminate(() -> keyBufsWindow.forEach(ReferenceCounted::release));
 				}, 2) // Max concurrency is 2 to read data while preparing the next segment
 				.doOnDiscard(Entry.class, discardedEntry -> {
-					//noinspection unchecked
-					var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
+					var entry = (LLEntry) discardedEntry;
 					entry.getKey().release();
 					entry.getValue().release();
 				})
@@ -1019,14 +1015,14 @@ public class LLLocalDictionary implements LLDictionary {
 	}
 	@Override
-	public Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues) {
+	public Flux<LLEntry> putMulti(Flux<LLEntry> entries, boolean getOldValues) {
 		return entries
 				.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
 				.flatMapSequential(ew -> Mono
 						.using(
 								() -> ew,
 								entriesWindow -> Mono
-										.<Entry<ByteBuf, ByteBuf>>fromCallable(() -> {
+										.<LLEntry>fromCallable(() -> {
 											Iterable<StampedLock> locks;
 											ArrayList<Long> stamps;
 											if (updateMode == UpdateMode.ALLOW) {
@@ -1047,13 +1043,15 @@ public class LLLocalDictionary implements LLDictionary {
 														MAX_WRITE_BATCH_SIZE,
 														BATCH_WRITE_OPTIONS
 												);
-												for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) {
-													batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
+												for (LLEntry entry : entriesWindow) {
+													var k = entry.getKey().retain();
+													var v = entry.getValue().retain();
+													batch.put(cfh, k, v);
 												}
 												batch.writeToDbAndClose();
 												batch.close();
 											} else {
-												for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) {
+												for (LLEntry entry : entriesWindow) {
 													db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer());
 												}
 											}
@ -1077,8 +1075,7 @@ public class LLLocalDictionary implements LLDictionary {
return this return this
.getMulti(null, Flux .getMulti(null, Flux
.fromIterable(entriesWindow) .fromIterable(entriesWindow)
.map(Entry::getKey) .map(entry -> entry.getKey().retain())
.map(ByteBuf::retain)
.map(buf -> Tuples.of(obj, buf)), false) .map(buf -> Tuples.of(obj, buf)), false)
.publishOn(dbScheduler) .publishOn(dbScheduler)
.then(transformer); .then(transformer);
@ -1087,9 +1084,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
}), }),
entriesWindow -> { entriesWindow -> {
for (Entry<ByteBuf, ByteBuf> entry : entriesWindow) { for (LLEntry entry : entriesWindow) {
entry.getKey().release(); entry.release();
entry.getValue().release();
} }
} }
), 2) // Max concurrency is 2 to read data while preparing the next segment ), 2) // Max concurrency is 2 to read data while preparing the next segment
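The windowed putMulti above retains each key/value buffer before handing it to the write batch and releases the whole LLEntry in the cleanup callback. Below is a minimal sketch of that retain-before-use / release-in-finally discipline on plain Netty ByteBufs; writeToDb is a hypothetical stand-in for the batch, only retain()/release() are real Netty API.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public class RetainReleaseSketch {
	// Consumes one reference of each buffer it is given.
	static void writeToDb(ByteBuf key, ByteBuf value) {
		try {
			// ... write key/value somewhere ...
		} finally {
			key.release();
			value.release();
		}
	}

	public static void main(String[] args) {
		ByteBuf key = Unpooled.copiedBuffer("k", StandardCharsets.UTF_8);
		ByteBuf value = Unpooled.copiedBuffer("v", StandardCharsets.UTF_8);
		try {
			// retain() before transferring one reference to the consumer,
			// mirroring batch.put(cfh, entry.getKey().retain(), ...) above
			writeToDb(key.retain(), value.retain());
		} finally {
			// the surrounding window still owns the original reference
			key.release();
			value.release();
		}
	}
}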
@ -1244,7 +1240,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Override @Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, public Flux<LLEntry> getRange(@Nullable LLSnapshot snapshot,
Mono<LLRange> rangeMono, Mono<LLRange> rangeMono,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
@ -1260,7 +1256,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Override @Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot, public Flux<List<LLEntry>> getRangeGrouped(@Nullable LLSnapshot snapshot,
Mono<LLRange> rangeMono, Mono<LLRange> rangeMono,
int prefixLength, boolean existsAlmostCertainly) { int prefixLength, boolean existsAlmostCertainly) {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
@ -1276,18 +1272,18 @@ public class LLLocalDictionary implements LLDictionary {
); );
} }
private Flux<Entry<ByteBuf, ByteBuf>> getRangeSingle(LLSnapshot snapshot, private Flux<LLEntry> getRangeSingle(LLSnapshot snapshot,
Mono<ByteBuf> keyMono, Mono<ByteBuf> keyMono,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return Flux.usingWhen(keyMono, return Flux.usingWhen(keyMono,
key -> this key -> this
.get(snapshot, Mono.just(key).map(ByteBuf::retain), existsAlmostCertainly) .get(snapshot, Mono.just(key).map(ByteBuf::retain), existsAlmostCertainly)
.map(value -> Map.entry(key.retain(), value)), .map(value -> new LLEntry(key.retain(), value)),
key -> Mono.fromRunnable(key::release) key -> Mono.fromRunnable(key::release)
); ).transform(LLUtils::handleDiscard);
} }
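The `.transform(LLUtils::handleDiscard)` call added here (and used throughout the tests below) is not defined in this diff. A plausible sketch, under the assumption that it registers a doOnDiscard hook that releases reference-counted elements dropped by downstream operators; the real helper may cover more types:

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import reactor.core.publisher.Flux;

public class HandleDiscardSketch {
	public static <T> Flux<T> handleDiscard(Flux<T> flux) {
		// Release any ByteBuf an operator drops without consuming it
		// (e.g. due to filter(), take() or cancellation).
		return flux.doOnDiscard(ByteBuf.class, ReferenceCountUtil::safeRelease);
	}
}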
private Flux<Entry<ByteBuf, ByteBuf>> getRangeMulti(LLSnapshot snapshot, Mono<LLRange> rangeMono) { private Flux<LLEntry> getRangeMulti(LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
range -> Flux.using( range -> Flux.using(
() -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, range.retain(), () -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, range.retain(),
@ -1299,7 +1295,7 @@ public class LLLocalDictionary implements LLDictionary {
); );
} }
private Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<LLRange> rangeMono, int prefixLength) { private Flux<List<LLEntry>> getRangeMultiGrouped(LLSnapshot snapshot, Mono<LLRange> rangeMono, int prefixLength) {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
range -> Flux.using( range -> Flux.using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db, alloc, cfh, prefixLength, range.retain(), () -> new LLLocalGroupedEntryReactiveRocksIterator(db, alloc, cfh, prefixLength, range.retain(),
@ -1436,7 +1432,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Override @Override
public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<Entry<ByteBuf, ByteBuf>> entries) { public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<LLEntry> entries) {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
range -> { range -> {
if (USE_WINDOW_IN_SET_RANGE) { if (USE_WINDOW_IN_SET_RANGE) {
@ -1520,17 +1516,14 @@ public class LLLocalDictionary implements LLDictionary {
) )
.flatMap(keysWindowFlux -> keysWindowFlux .flatMap(keysWindowFlux -> keysWindowFlux
.collectList() .collectList()
.doOnDiscard(Entry.class, discardedEntry -> {
//noinspection unchecked
var entry = (Entry<ByteBuf, ByteBuf>) discardedEntry;
entry.getKey().release();
entry.getValue().release();
})
.flatMap(entriesList -> Mono .flatMap(entriesList -> Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
try { try {
if (!USE_WRITE_BATCHES_IN_SET_RANGE) { if (!USE_WRITE_BATCHES_IN_SET_RANGE) {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) { for (LLEntry entry : entriesList) {
assert !entry.isReleased();
assert entry.getKey().refCnt() > 0;
assert entry.getValue().refCnt() > 0;
db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer()); db.put(cfh, EMPTY_WRITE_OPTIONS, entry.getKey().nioBuffer(), entry.getValue().nioBuffer());
} }
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) { } else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
@ -1540,14 +1533,20 @@ public class LLLocalDictionary implements LLDictionary {
MAX_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS BATCH_WRITE_OPTIONS
)) { )) {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) { for (LLEntry entry : entriesList) {
assert !entry.isReleased();
assert entry.getKey().refCnt() > 0;
assert entry.getValue().refCnt() > 0;
batch.put(cfh, entry.getKey().retain(), entry.getValue().retain()); batch.put(cfh, entry.getKey().retain(), entry.getValue().retain());
} }
batch.writeToDbAndClose(); batch.writeToDbAndClose();
} }
} else { } else {
try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) { try (var batch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) { for (LLEntry entry : entriesList) {
assert !entry.isReleased();
assert entry.getKey().refCnt() > 0;
assert entry.getValue().refCnt() > 0;
batch.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue())); batch.put(cfh, LLUtils.toArray(entry.getKey()), LLUtils.toArray(entry.getValue()));
} }
db.write(EMPTY_WRITE_OPTIONS, batch); db.write(EMPTY_WRITE_OPTIONS, batch);
@ -1556,9 +1555,9 @@ public class LLLocalDictionary implements LLDictionary {
} }
return null; return null;
} finally { } finally {
for (Entry<ByteBuf, ByteBuf> entry : entriesList) { for (LLEntry entry : entriesList) {
entry.getKey().release(); assert !entry.isReleased();
entry.getValue().release(); entry.release();
} }
} }
}) })
@ -1903,7 +1902,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Override @Override
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) { public Mono<LLEntry> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
range -> runOnDb(() -> { range -> runOnDb(() -> {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
@ -1940,7 +1939,7 @@ public class LLLocalDictionary implements LLDictionary {
try { try {
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
try { try {
return Map.entry(key.retain(), value.retain()); return new LLEntry(key, value);
} finally { } finally {
value.release(); value.release();
} }
@ -2123,7 +2122,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Override @Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(Mono<LLRange> rangeMono) { public Mono<LLEntry> removeOne(Mono<LLRange> rangeMono) {
return Mono.usingWhen(rangeMono, return Mono.usingWhen(rangeMono,
range -> runOnDb(() -> { range -> runOnDb(() -> {
try (var readOpts = new ReadOptions(getReadOptions(null))) { try (var readOpts = new ReadOptions(getReadOptions(null))) {
@ -2161,7 +2160,7 @@ public class LLLocalDictionary implements LLDictionary {
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key); ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value); ByteBuf value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
dbDelete(cfh, null, key); dbDelete(cfh, null, key);
return Map.entry(key, value); return new LLEntry(key, value);
} finally { } finally {
maxBound.release(); maxBound.release();
} }

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -9,7 +10,7 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator<Entry<ByteBuf, ByteBuf>> { public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator<LLEntry> {
public LLLocalEntryReactiveRocksIterator(RocksDB db, public LLLocalEntryReactiveRocksIterator(RocksDB db,
ByteBufAllocator alloc, ByteBufAllocator alloc,
@ -22,7 +23,7 @@ public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksItera
} }
@Override @Override
public Entry<ByteBuf, ByteBuf> getEntry(ByteBuf key, ByteBuf value) { public LLEntry getEntry(ByteBuf key, ByteBuf value) {
return Map.entry(key, value); return new LLEntry(key, value);
} }
} }
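The new LLEntry type replacing Map.entry is not shown in this diff. A sketch reconstructed purely from how it is used here (getKey/getValue, release(), isReleased(), a constructor taking ownership of both buffers); the real class may differ:

import io.netty.buffer.ByteBuf;

public class LLEntrySketch {
	private final ByteBuf key;
	private final ByteBuf value;
	private volatile boolean released;

	public LLEntrySketch(ByteBuf key, ByteBuf value) {
		// Takes ownership of one reference to each buffer.
		this.key = key;
		this.value = value;
	}

	public ByteBuf getKey() { return key; }
	public ByteBuf getValue() { return value; }
	public boolean isReleased() { return released; }

	// Releases both buffers in one call, so callers no longer release
	// key and value separately as they did with Map.entry.
	public void release() {
		released = true;
		key.release();
		value.release();
	}
}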

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.disk;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -10,7 +11,7 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
public class LLLocalGroupedEntryReactiveRocksIterator extends public class LLLocalGroupedEntryReactiveRocksIterator extends
LLLocalGroupedReactiveRocksIterator<Entry<ByteBuf, ByteBuf>> { LLLocalGroupedReactiveRocksIterator<LLEntry> {
public LLLocalGroupedEntryReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh, public LLLocalGroupedEntryReactiveRocksIterator(RocksDB db, ByteBufAllocator alloc, ColumnFamilyHandle cfh,
int prefixLength, int prefixLength,
@ -22,7 +23,7 @@ public class LLLocalGroupedEntryReactiveRocksIterator extends
} }
@Override @Override
public Entry<ByteBuf, ByteBuf> getEntry(ByteBuf key, ByteBuf value) { public LLEntry getEntry(ByteBuf key, ByteBuf value) {
return Map.entry(key, value); return new LLEntry(key, value);
} }
} }

View File

@ -469,12 +469,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException { private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName))); ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName)));
if (databaseOptions.enableDbAssertionsWhenUsingAssertions()) { assert enableColumnsBug || Arrays.equals(cfh.getName(), columnName);
//noinspection RedundantIfStatement
if (!enableColumnsBug) {
assert Arrays.equals(cfh.getName(), columnName);
}
}
return cfh; return cfh;
} }

View File

@ -7,10 +7,12 @@ import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.ExtraKeyOperationResult; import it.cavallium.dbengine.database.ExtraKeyOperationResult;
import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLEntry;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.disk.ReleasableSlice;
import it.cavallium.dbengine.database.serialization.BiSerializationFunction; import it.cavallium.dbengine.database.serialization.BiSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction;
@ -245,7 +247,7 @@ public class LLMemoryDictionary implements LLDictionary {
} }
@Override @Override
public Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues) { public Flux<LLEntry> putMulti(Flux<LLEntry> entries, boolean getOldValues) {
return entries return entries
.handle((entry, sink) -> { .handle((entry, sink) -> {
var key = entry.getKey(); var key = entry.getKey();
@ -255,7 +257,7 @@ public class LLMemoryDictionary implements LLDictionary {
if (v == null || !getOldValues) { if (v == null || !getOldValues) {
sink.complete(); sink.complete();
} else { } else {
sink.next(Map.entry(key.retain(), kk(v))); sink.next(new LLEntry(key.retain(), kk(v)));
} }
} finally { } finally {
key.release(); key.release();
@ -271,7 +273,7 @@ public class LLMemoryDictionary implements LLDictionary {
} }
@Override @Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, public Flux<LLEntry> getRange(@Nullable LLSnapshot snapshot,
Mono<LLRange> rangeMono, Mono<LLRange> rangeMono,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return Flux.usingWhen(rangeMono, return Flux.usingWhen(rangeMono,
@ -280,13 +282,13 @@ public class LLMemoryDictionary implements LLDictionary {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
var element = snapshots.get(resolveSnapshot(snapshot)) var element = snapshots.get(resolveSnapshot(snapshot))
.get(k(range.getSingle())); .get(k(range.getSingle()));
return Map.entry(range.getSingle().retain(), kk(element)); return new LLEntry(range.getSingle().retain(), kk(element));
}).flux(); }).flux();
} else { } else {
return Mono return Mono
.fromCallable(() -> mapSlice(snapshot, range)) .fromCallable(() -> mapSlice(snapshot, range))
.flatMapMany(map -> Flux.fromIterable(map.entrySet())) .flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.map(entry -> Map.entry(kk(entry.getKey()), kk(entry.getValue()))); .map(entry -> new LLEntry(kk(entry.getKey()), kk(entry.getValue())));
} }
}, },
range -> Mono.fromRunnable(range::release) range -> Mono.fromRunnable(range::release)
@ -294,7 +296,7 @@ public class LLMemoryDictionary implements LLDictionary {
} }
@Override @Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot, public Flux<List<LLEntry>> getRangeGrouped(@Nullable LLSnapshot snapshot,
Mono<LLRange> rangeMono, Mono<LLRange> rangeMono,
int prefixLength, int prefixLength,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
@ -333,8 +335,16 @@ public class LLMemoryDictionary implements LLDictionary {
@Override @Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, int prefixLength) { public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono, int prefixLength) {
return getRangeKeys(snapshot, rangeMono) return getRangeKeys(snapshot, rangeMono)
.distinctUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), LLUtils::equals) .distinctUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), (a, b) -> {
.map(k -> k.slice(k.readerIndex(), prefixLength)); if (LLUtils.equals(a, b)) {
b.release();
return true;
} else {
return false;
}
})
.map(k -> k.slice(k.readerIndex(), prefixLength))
.transform(LLUtils::handleDiscard);
} }
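The comparator change above releases the later duplicate when two adjacent keys share a prefix. A standalone sketch of the same distinctUntilChanged shape (key selector plus custom equality), using plain Strings so it runs without a database; Strings need no release, the comment marks where the ByteBuf version releases the duplicate:

import reactor.core.publisher.Flux;

public class PrefixDedupSketch {
	public static void main(String[] args) {
		int prefixLength = 2;
		Flux.just("aa1", "aa2", "ab1", "ab2")
				.distinctUntilChanged(
						k -> k.substring(0, prefixLength), // key selector: the prefix
						(a, b) -> {
							boolean equal = a.equals(b);
							// in the real code the duplicate ByteBuf 'b' is released here
							return equal;
						})
				.map(k -> k.substring(0, prefixLength))
				.subscribe(System.out::println); // prints "aa", "ab"
	}
}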
@Override @Override
@ -343,7 +353,7 @@ public class LLMemoryDictionary implements LLDictionary {
} }
@Override @Override
public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<Entry<ByteBuf, ByteBuf>> entries) { public Mono<Void> setRange(Mono<LLRange> rangeMono, Flux<LLEntry> entries) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return Mono.error(new UnsupportedOperationException("Not implemented"));
} }
@ -361,7 +371,7 @@ public class LLMemoryDictionary implements LLDictionary {
} }
@Override @Override
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) { public Mono<LLEntry> getOne(@Nullable LLSnapshot snapshot, Mono<LLRange> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return Mono.error(new UnsupportedOperationException("Not implemented"));
} }
@ -371,7 +381,7 @@ public class LLMemoryDictionary implements LLDictionary {
} }
@Override @Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(Mono<LLRange> rangeMono) { public Mono<LLEntry> removeOne(Mono<LLRange> rangeMono) {
return Mono.error(new UnsupportedOperationException("Not implemented")); return Mono.error(new UnsupportedOperationException("Not implemented"));
} }

View File

@ -1,9 +1,14 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLDictionary; import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
@ -35,47 +40,129 @@ import reactor.core.scheduler.Schedulers;
public class DbTestUtils { public class DbTestUtils {
public static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true); private volatile static ByteBufAllocator POOLED_ALLOCATOR = null;
public static synchronized ByteBufAllocator getUncachedAllocator() {
try {
ensureNoLeaks(POOLED_ALLOCATOR);
} catch (Throwable ex) {
POOLED_ALLOCATOR = null;
}
if (POOLED_ALLOCATOR == null) {
POOLED_ALLOCATOR = new PooledByteBufAllocator(false, 1, 0, 8192, 11, 0, 0, true);
}
return POOLED_ALLOCATOR;
}
public static synchronized ByteBufAllocator getUncachedAllocatorUnsafe() {
return POOLED_ALLOCATOR;
}
public static final AtomicInteger dbId = new AtomicInteger(0); public static final AtomicInteger dbId = new AtomicInteger(0);
@SuppressWarnings("SameParameterValue")
private static int getActiveBuffers(ByteBufAllocator allocator) {
int directActive = 0, directAlloc = 0, directDealloc = 0;
if (allocator instanceof PooledByteBufAllocator alloc) {
for (PoolArenaMetric arena : alloc.directArenas()) {
directActive += arena.numActiveAllocations();
directAlloc += arena.numAllocations();
directDealloc += arena.numDeallocations();
}
} else if (allocator instanceof UnpooledByteBufAllocator alloc) {
directActive += alloc.metric().usedDirectMemory();
} else {
throw new UnsupportedOperationException();
}
System.out.println("directActive " + directActive + " directAlloc " + directAlloc + " directDealloc " + directDealloc);
return directActive;
}
@SuppressWarnings("SameParameterValue")
private static int getActiveHeapBuffers(ByteBufAllocator allocator) {
int heapActive = 0, heapAlloc = 0, heapDealloc = 0;
if (allocator instanceof PooledByteBufAllocator alloc) {
for (PoolArenaMetric arena : alloc.heapArenas()) {
heapActive += arena.numActiveAllocations();
heapAlloc += arena.numAllocations();
heapDealloc += arena.numDeallocations();
}
} else if (allocator instanceof UnpooledByteBufAllocator alloc) {
heapActive += alloc.metric().usedHeapMemory();
} else {
throw new UnsupportedOperationException();
}
System.out.println("heapActive " + heapActive + " heapAlloc " + heapAlloc + " heapDealloc " + heapDealloc);
return heapActive;
}
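These helpers detect leaked buffers by reading pooled-arena metrics. A minimal self-contained check using the same Netty metrics API (here via alloc.metric(), the non-deprecated accessor for the arena list):

import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;

public class ArenaMetricsSketch {
	public static void main(String[] args) {
		var alloc = new PooledByteBufAllocator(true);
		var buf = alloc.directBuffer(16);
		int active = 0;
		for (PoolArenaMetric arena : alloc.metric().directArenas()) {
			active += arena.numActiveAllocations();
		}
		// >= 1 while buf is still held; back to 0 once every buffer is released
		System.out.println("active direct allocations: " + active);
		buf.release();
	}
}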
public static <U> Flux<U> tempDb(Function<LLKeyValueDatabase, Publisher<U>> action) { public static <U> Flux<U> tempDb(Function<LLKeyValueDatabase, Publisher<U>> action) {
var wrkspcPath = Path.of("/tmp/.cache/tempdb-" + dbId.incrementAndGet() + "/"); return Flux.usingWhen(openTempDb(),
return Flux.usingWhen(Mono tempDb -> action.apply(tempDb.db()),
.<LLKeyValueDatabase>fromCallable(() -> { DbTestUtils::closeTempDb
if (Files.exists(wrkspcPath)) {
Files.walk(wrkspcPath).sorted(Comparator.reverseOrder()).forEach(file -> {
try {
Files.delete(file);
} catch (IOException ex) {
throw new CompletionException(ex);
}
});
}
Files.createDirectories(wrkspcPath);
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, true, -1)
)),
action,
db -> db.close().then(Mono.fromCallable(() -> {
if (Files.exists(wrkspcPath)) {
Files.walk(wrkspcPath).sorted(Comparator.reverseOrder()).forEach(file -> {
try {
Files.delete(file);
} catch (IOException ex) {
throw new CompletionException(ex);
}
});
}
return null;
}).subscribeOn(Schedulers.boundedElastic()))
); );
} }
public static record TempDb(ByteBufAllocator allocator, LLDatabaseConnection connection, LLKeyValueDatabase db,
Path path) {}
public static Mono<TempDb> openTempDb() {
return Mono.defer(() -> {
var wrkspcPath = Path.of("/tmp/.cache/tempdb-" + dbId.incrementAndGet() + "/");
var alloc = getUncachedAllocator();
return Mono
.<LLKeyValueDatabase>fromCallable(() -> {
if (Files.exists(wrkspcPath)) {
Files.walk(wrkspcPath).sorted(Comparator.reverseOrder()).forEach(file -> {
try {
Files.delete(file);
} catch (IOException ex) {
throw new CompletionException(ex);
}
});
}
Files.createDirectories(wrkspcPath);
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(alloc, wrkspcPath).connect())
.flatMap(conn -> conn
.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, -1)
)
.map(db -> new TempDb(alloc, conn, db, wrkspcPath))
);
});
}
public static Mono<Void> closeTempDb(TempDb tempDb) {
return tempDb.db().close().then(tempDb.connection().disconnect()).then(Mono.fromCallable(() -> {
ensureNoLeaks(tempDb.allocator());
if (tempDb.allocator() instanceof PooledByteBufAllocator pooledByteBufAllocator) {
pooledByteBufAllocator.trimCurrentThreadCache();
pooledByteBufAllocator.freeThreadLocalCache();
}
if (Files.exists(tempDb.path())) {
Files.walk(tempDb.path()).sorted(Comparator.reverseOrder()).forEach(file -> {
try {
Files.delete(file);
} catch (IOException ex) {
throw new CompletionException(ex);
}
});
}
return null;
}).subscribeOn(Schedulers.boundedElastic())).then();
}
public static void ensureNoLeaks(ByteBufAllocator allocator) {
if (allocator != null) {
assertEquals(0, getActiveBuffers(allocator));
assertEquals(0, getActiveHeapBuffers(allocator));
}
}
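The refactored tempDb delegates to openTempDb/closeTempDb through Flux.usingWhen, the reactive acquire-use-cleanup pattern. A toy sketch of that shape, with a hypothetical Resource standing in for the TempDb record:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UsingWhenSketch {
	record Resource(String name) {}

	public static void main(String[] args) {
		Flux.usingWhen(
						Mono.fromCallable(() -> new Resource("tempdb")), // open
						res -> Flux.just(res.name() + ": step1", res.name() + ": step2"), // use
						res -> Mono.fromRunnable(() -> System.out.println("closed " + res.name()))) // async cleanup
				.subscribe(System.out::println);
	}
}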
public static Mono<? extends LLDictionary> tempDictionary(LLKeyValueDatabase database, UpdateMode updateMode) { public static Mono<? extends LLDictionary> tempDictionary(LLKeyValueDatabase database, UpdateMode updateMode) {
return tempDictionary(database, "testmap", updateMode); return tempDictionary(database, "testmap", updateMode);
} }
@ -98,13 +185,13 @@ public class DbTestUtils {
int keyBytes) { int keyBytes) {
if (dbType == DbType.MAP) { if (dbType == DbType.MAP) {
return DatabaseMapDictionary.simple(dictionary, return DatabaseMapDictionary.simple(dictionary,
SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, keyBytes), SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), keyBytes),
Serializer.utf8(DbTestUtils.ALLOCATOR) Serializer.utf8(dictionary.getAllocator())
); );
} else { } else {
return DatabaseMapDictionaryHashed.simple(dictionary, return DatabaseMapDictionaryHashed.simple(dictionary,
Serializer.utf8(DbTestUtils.ALLOCATOR), Serializer.utf8(dictionary.getAllocator()),
Serializer.utf8(DbTestUtils.ALLOCATOR), Serializer.utf8(dictionary.getAllocator()),
s -> (short) s.hashCode(), s -> (short) s.hashCode(),
new SerializerFixedBinaryLength<>() { new SerializerFixedBinaryLength<>() {
@Override @Override
@ -126,7 +213,7 @@ public class DbTestUtils {
@Override @Override
public @NotNull ByteBuf serialize(@NotNull Short deserialized) { public @NotNull ByteBuf serialize(@NotNull Short deserialized) {
var out = DbTestUtils.ALLOCATOR.directBuffer(Short.BYTES); var out = dictionary.getAllocator().directBuffer(Short.BYTES);
try { try {
out.writeShort(deserialized); out.writeShort(deserialized);
out.writerIndex(Short.BYTES); out.writerIndex(Short.BYTES);
@ -140,33 +227,31 @@ public class DbTestUtils {
} }
} }
public static <T, U> DatabaseMapDictionaryDeep<String, Map<String, String>, public static DatabaseMapDictionaryDeep<String, Map<String, String>,
DatabaseMapDictionary<String, String>> tempDatabaseMapDictionaryDeepMap( DatabaseMapDictionary<String, String>> tempDatabaseMapDictionaryDeepMap(
LLDictionary dictionary, LLDictionary dictionary,
int key1Bytes, int key1Bytes,
int key2Bytes) { int key2Bytes) {
return DatabaseMapDictionaryDeep.deepTail(dictionary, return DatabaseMapDictionaryDeep.deepTail(dictionary,
SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes), SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key1Bytes),
key2Bytes, key2Bytes,
new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key2Bytes), new SubStageGetterMap<>(SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key2Bytes),
Serializer.utf8(DbTestUtils.ALLOCATOR), Serializer.utf8(dictionary.getAllocator())
true
) )
); );
} }
public static <T, U> DatabaseMapDictionaryDeep<String, Map<String, String>, public static DatabaseMapDictionaryDeep<String, Map<String, String>,
DatabaseMapDictionaryHashed<String, String, Integer>> tempDatabaseMapDictionaryDeepMapHashMap( DatabaseMapDictionaryHashed<String, String, Integer>> tempDatabaseMapDictionaryDeepMapHashMap(
LLDictionary dictionary, LLDictionary dictionary,
int key1Bytes) { int key1Bytes) {
return DatabaseMapDictionaryDeep.deepTail(dictionary, return DatabaseMapDictionaryDeep.deepTail(dictionary,
SerializerFixedBinaryLength.utf8(DbTestUtils.ALLOCATOR, key1Bytes), SerializerFixedBinaryLength.utf8(dictionary.getAllocator(), key1Bytes),
Integer.BYTES, Integer.BYTES,
new SubStageGetterHashMap<>(Serializer.utf8(DbTestUtils.ALLOCATOR), new SubStageGetterHashMap<>(Serializer.utf8(dictionary.getAllocator()),
Serializer.utf8(DbTestUtils.ALLOCATOR), Serializer.utf8(dictionary.getAllocator()),
String::hashCode, String::hashCode,
SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR), SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator())
true
) )
); );
} }
@ -174,10 +259,10 @@ public class DbTestUtils {
public static <T, U> DatabaseMapDictionaryHashed<String, String, Integer> tempDatabaseMapDictionaryHashMap( public static <T, U> DatabaseMapDictionaryHashed<String, String, Integer> tempDatabaseMapDictionaryHashMap(
LLDictionary dictionary) { LLDictionary dictionary) {
return DatabaseMapDictionaryHashed.simple(dictionary, return DatabaseMapDictionaryHashed.simple(dictionary,
Serializer.utf8(DbTestUtils.ALLOCATOR), Serializer.utf8(dictionary.getAllocator()),
Serializer.utf8(DbTestUtils.ALLOCATOR), Serializer.utf8(dictionary.getAllocator()),
String::hashCode, String::hashCode,
SerializerFixedBinaryLength.intSerializer(DbTestUtils.ALLOCATOR) SerializerFixedBinaryLength.intSerializer(dictionary.getAllocator())
); );
} }
} }

View File

@ -75,7 +75,7 @@ public class OldDatabaseTests {
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(3), new FixedStringSerializer(3),
4, 4,
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true) new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop())
)) ))
.flatMap(collection -> Flux .flatMap(collection -> Flux
.fromIterable(originalSuperKeys) .fromIterable(originalSuperKeys)
@ -135,7 +135,7 @@ public class OldDatabaseTests {
.then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect()) .then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb", .flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap")), List.of(Column.dictionary("testmap")),
new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, true, -1) new DatabaseOptions(Map.of(), true, false, true, false, true, true, true, -1)
)); ));
} }
@ -159,14 +159,14 @@ public class OldDatabaseTests {
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(3), new FixedStringSerializer(3),
4, 4,
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop(), true) new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop())
)), )),
db db
.getDictionary("testmap", UpdateMode.DISALLOW) .getDictionary("testmap", UpdateMode.DISALLOW)
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary, .map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(6), new FixedStringSerializer(6),
7, 7,
new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop(), true) new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop())
)) ))
) )
.single() .single()

View File

@ -1,5 +1,8 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe;
import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDb;
import static it.cavallium.dbengine.DbTestUtils.tempDictionary; import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
@ -7,6 +10,8 @@ import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import java.util.Arrays; import java.util.Arrays;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -18,6 +23,16 @@ public class TestDictionary {
return Arrays.stream(UpdateMode.values()).map(Arguments::of); return Arrays.stream(UpdateMode.values()).map(Arguments::of);
} }
@BeforeEach
public void beforeEach() {
ensureNoLeaks(getUncachedAllocator());
}
@AfterEach
public void afterEach() {
ensureNoLeaks(getUncachedAllocatorUnsafe());
}
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsCreate") @MethodSource("provideArgumentsCreate")
public void testCreate(UpdateMode updateMode) { public void testCreate(UpdateMode updateMode) {

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.*; import static it.cavallium.dbengine.DbTestUtils.*;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -13,6 +14,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -74,6 +77,16 @@ public class TestDictionaryMap {
.map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4(), fullTuple.getT5())); .map(fullTuple -> Arguments.of(fullTuple.getT1(), fullTuple.getT2(), fullTuple.getT3(), fullTuple.getT4(), fullTuple.getT5()));
} }
@BeforeEach
public void beforeEach() {
ensureNoLeaks(getUncachedAllocator());
}
@AfterEach
public void afterEach() {
ensureNoLeaks(getUncachedAllocatorUnsafe());
}
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPut") @MethodSource("provideArgumentsPut")
public void testPut(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) { public void testPut(DbType dbType, UpdateMode updateMode, String key, String value, boolean shouldFail) {
@ -338,6 +351,7 @@ public class TestDictionaryMap {
) )
.filter(k -> k.getValue().isPresent()) .filter(k -> k.getValue().isPresent())
.map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow())) .map(k -> Map.entry(k.getKey(), k.getValue().orElseThrow()))
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();
@ -390,6 +404,7 @@ public class TestDictionaryMap {
) )
.doAfterTerminate(map::release) .doAfterTerminate(map::release)
) )
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();
@ -527,6 +542,7 @@ public class TestDictionaryMap {
) )
.doAfterTerminate(map::release) .doAfterTerminate(map::release)
) )
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();
@ -555,6 +571,7 @@ public class TestDictionaryMap {
) )
.doAfterTerminate(map::release) .doAfterTerminate(map::release)
) )
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();
@ -588,6 +605,7 @@ public class TestDictionaryMap {
) )
.doAfterTerminate(map::release) .doAfterTerminate(map::release)
) )
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();
@ -616,6 +634,7 @@ public class TestDictionaryMap {
.doAfterTerminate(map::release) .doAfterTerminate(map::release)
) )
.flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val))
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();
@ -627,7 +646,6 @@ public class TestDictionaryMap {
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPutMulti") @MethodSource("provideArgumentsPutMulti")
public void testPutMultiClear(DbType dbType, UpdateMode updateMode, Map<String, String> entries, boolean shouldFail) { public void testPutMultiClear(DbType dbType, UpdateMode updateMode, Map<String, String> entries, boolean shouldFail) {
var remainingEntries = new ConcurrentHashMap<Entry<String, String>, Boolean>().keySet(true);
Step<Boolean> stpVer = StepVerifier Step<Boolean> stpVer = StepVerifier
.create(tempDb(db -> tempDictionary(db, updateMode) .create(tempDb(db -> tempDictionary(db, updateMode)
.map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5)) .map(dict -> tempDatabaseMapDictionaryMap(dict, dbType, 5))
@ -642,6 +660,7 @@ public class TestDictionaryMap {
.doAfterTerminate(map::release) .doAfterTerminate(map::release)
) )
.flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val)) .flatMap(val -> shouldFail ? Mono.empty() : Mono.just(val))
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();

View File

@ -1,9 +1,13 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe;
import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMap; import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMap;
import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDb;
import static it.cavallium.dbengine.DbTestUtils.tempDictionary; import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateMode;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
@ -13,6 +17,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -140,6 +146,16 @@ public class TestDictionaryMapDeep {
.toStream(); .toStream();
} }
@BeforeEach
public void beforeEach() {
ensureNoLeaks(getUncachedAllocator());
}
@AfterEach
public void afterEach() {
ensureNoLeaks(getUncachedAllocatorUnsafe());
}
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsSet") @MethodSource("provideArgumentsSet")
public void testSetValueGetValue(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) { public void testSetValueGetValue(UpdateMode updateMode, String key, Map<String, String> value, boolean shouldFail) {
@ -520,35 +536,31 @@ public class TestDictionaryMapDeep {
if (updateMode != UpdateMode.ALLOW_UNSAFE && !isTestBadKeysEnabled()) { if (updateMode != UpdateMode.ALLOW_UNSAFE && !isTestBadKeysEnabled()) {
return; return;
} }
var stpVer = StepVerifier var stpVer = StepVerifier.create(tempDb(db -> tempDictionary(db, updateMode)
.create(tempDb(db -> tempDictionary(db, updateMode) .map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6))
.map(dict -> tempDatabaseMapDictionaryDeepMap(dict, 5, 6)) .flatMapMany(map -> Flux.concat(
.flatMapMany(map -> Flux map.updateValue(key, old -> {
.concat( assert old == null;
map.updateValue(key, old -> { return Map.of("error?", "error.");
assert old == null; }).then(map.getValue(null, key)),
return Map.of("error?", "error."); map.updateValue(key, false, old -> {
}).then(map.getValue(null, key)), assert Objects.equals(old, Map.of("error?", "error."));
map.updateValue(key, false, old -> { return Map.of("error?", "error.");
assert Objects.equals(old, Map.of("error?", "error.")); }).then(map.getValue(null, key)),
return Map.of("error?", "error."); map.updateValue(key, true, old -> {
}).then(map.getValue(null, key)), assert Objects.equals(old, Map.of("error?", "error."));
map.updateValue(key, true, old -> { return Map.of("error?", "error.");
assert Objects.equals(old, Map.of("error?", "error.")); }).then(map.getValue(null, key)),
return Map.of("error?", "error."); map.updateValue(key, true, old -> {
}).then(map.getValue(null, key)), assert Objects.equals(old, Map.of("error?", "error."));
map.updateValue(key, true, old -> { return value;
assert Objects.equals(old, Map.of("error?", "error.")); }).then(map.getValue(null, key)),
return value; map.updateValue(key, true, old -> {
}).then(map.getValue(null, key)), assert Objects.equals(old, value);
map.updateValue(key, true, old -> { return value;
assert Objects.equals(old, value); }).then(map.getValue(null, key))
return value; ).doAfterTerminate(map::release))
}).then(map.getValue(null, key)) ));
)
.doAfterTerminate(map::release)
)
));
if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) { if (updateMode != UpdateMode.ALLOW_UNSAFE || shouldFail) {
stpVer.verifyError(); stpVer.verifyError();
} else { } else {
@ -795,6 +807,7 @@ public class TestDictionaryMapDeep {
) )
.doAfterTerminate(map::release); .doAfterTerminate(map::release);
}) })
.transform(LLUtils::handleDiscard)
)); ));
if (shouldFail) { if (shouldFail) {
stpVer.verifyError(); stpVer.verifyError();

View File

@ -1,5 +1,8 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe;
import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMapHashMap; import static it.cavallium.dbengine.DbTestUtils.tempDatabaseMapDictionaryDeepMapHashMap;
import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDb;
import static it.cavallium.dbengine.DbTestUtils.tempDictionary; import static it.cavallium.dbengine.DbTestUtils.tempDictionary;
@ -13,6 +16,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -94,6 +99,16 @@ public class TestDictionaryMapDeepHashMap {
.toStream(); .toStream();
} }
@BeforeEach
public void beforeEach() {
ensureNoLeaks(getUncachedAllocator());
}
@AfterEach
public void afterEach() {
ensureNoLeaks(getUncachedAllocatorUnsafe());
}
@ParameterizedTest @ParameterizedTest
@MethodSource("provideArgumentsPut") @MethodSource("provideArgumentsPut")
public void testAtPutValueGetAllValues(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) { public void testAtPutValueGetAllValues(UpdateMode updateMode, String key1, String key2, String value, boolean shouldFail) {

View File

@ -0,0 +1,217 @@
package it.cavallium.dbengine;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import io.netty.buffer.ByteBuf;
import it.cavallium.dbengine.DbTestUtils.TempDb;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Flow.Publisher;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class TestLLDictionaryLeaks {
private TempDb tempDb;
private LLKeyValueDatabase db;
@BeforeEach
public void beforeEach() {
tempDb = Objects.requireNonNull(DbTestUtils.openTempDb().block(), "TempDB");
db = tempDb.db();
}
public static Stream<Arguments> provideArguments() {
return Arrays.stream(UpdateMode.values()).map(Arguments::of);
}
public static Stream<Arguments> providePutArguments() {
var updateModes = Arrays.stream(UpdateMode.values());
return updateModes.flatMap(updateMode -> {
var resultTypes = Arrays.stream(LLDictionaryResultType.values());
return resultTypes.map(resultType -> Arguments.of(updateMode, resultType));
});
}
public static Stream<Arguments> provideUpdateArguments() {
var updateModes = Arrays.stream(UpdateMode.values());
return updateModes.flatMap(updateMode -> {
var resultTypes = Arrays.stream(UpdateReturnMode.values());
return resultTypes.map(resultType -> Arguments.of(updateMode, resultType));
});
}
private LLDictionary getDict(UpdateMode updateMode) {
var dict = DbTestUtils.tempDictionary(db, updateMode).block();
var key1 = Mono.fromCallable(() -> fromString("test-key-1"));
var key2 = Mono.fromCallable(() -> fromString("test-key-2"));
var key3 = Mono.fromCallable(() -> fromString("test-key-3"));
var key4 = Mono.fromCallable(() -> fromString("test-key-4"));
var value = Mono.fromCallable(() -> fromString("test-value"));
dict.put(key1, value, LLDictionaryResultType.VOID).block();
dict.put(key2, value, LLDictionaryResultType.VOID).block();
dict.put(key3, value, LLDictionaryResultType.VOID).block();
dict.put(key4, value, LLDictionaryResultType.VOID).block();
return dict;
}
private ByteBuf fromString(String s) {
var sb = s.getBytes(StandardCharsets.UTF_8);
var b = db.getAllocator().buffer(sb.length);
b.writeBytes(sb); // write the UTF-8 bytes; writing the empty buffer into itself was a bug
return b;
}
private void run(Flux<?> publisher) {
publisher.subscribeOn(Schedulers.immediate()).blockLast();
}
private void runVoid(Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).block();
}
private <T> T run(boolean shouldFail, Mono<T> publisher) {
return publisher.subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
private void runVoid(boolean shouldFail, Mono<Void> publisher) {
publisher.then().subscribeOn(Schedulers.immediate()).transform(mono -> {
if (shouldFail) {
return mono.onErrorResume(ex -> Mono.empty());
} else {
return mono;
}
}).block();
}
@Test
public void testNoOp() {
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetDict(UpdateMode updateMode) {
var dict = getDict(updateMode);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetColumnName(UpdateMode updateMode) {
var dict = getDict(updateMode);
dict.getColumnName();
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetAllocator(UpdateMode updateMode) {
var dict = getDict(updateMode);
dict.getAllocator();
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGet(UpdateMode updateMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test"));
runVoid(dict.get(null, key).then().transform(LLUtils::handleDiscard));
runVoid(dict.get(null, key, true).then().transform(LLUtils::handleDiscard));
runVoid(dict.get(null, key, false).then().transform(LLUtils::handleDiscard));
}
@ParameterizedTest
@MethodSource("providePutArguments")
public void testPut(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
var value = Mono.fromCallable(() -> fromString("test-value"));
runVoid(dict.put(key, value, resultType).then());
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testGetUpdateMode(UpdateMode updateMode) {
var dict = getDict(updateMode);
assertEquals(updateMode, run(dict.getUpdateMode()));
}
@ParameterizedTest
@MethodSource("provideUpdateArguments")
public void testUpdate(UpdateMode updateMode, UpdateReturnMode updateReturnMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, true).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode, false).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.update(key, old -> old, updateReturnMode).then().transform(LLUtils::handleDiscard)
);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testUpdateAndGetDelta(UpdateMode updateMode) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, true).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old, false).then().transform(LLUtils::handleDiscard)
);
runVoid(updateMode == UpdateMode.DISALLOW,
dict.updateAndGetDelta(key, old -> old).then().transform(LLUtils::handleDiscard)
);
}
@ParameterizedTest
@MethodSource("provideArguments")
public void testClear(UpdateMode updateMode) {
var dict = getDict(updateMode);
runVoid(dict.clear());
}
@ParameterizedTest
@MethodSource("providePutArguments")
public void testRemove(UpdateMode updateMode, LLDictionaryResultType resultType) {
var dict = getDict(updateMode);
var key = Mono.fromCallable(() -> fromString("test-key"));
runVoid(dict.remove(key, resultType).then());
}
@AfterEach
public void afterEach() {
DbTestUtils.closeTempDb(tempDb).block();
}
}

View File

@ -1,11 +1,16 @@
package it.cavallium.dbengine; package it.cavallium.dbengine;
import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocator;
import static it.cavallium.dbengine.DbTestUtils.getUncachedAllocatorUnsafe;
import static it.cavallium.dbengine.DbTestUtils.tempDb; import static it.cavallium.dbengine.DbTestUtils.tempDb;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseInt;
import it.cavallium.dbengine.database.collections.DatabaseLong; import it.cavallium.dbengine.database.collections.DatabaseLong;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.Arguments;
@ -34,6 +39,16 @@ public class TestSingletons {
); );
} }
@BeforeEach
public void beforeEach() {
ensureNoLeaks(getUncachedAllocator());
}
@AfterEach
public void afterEach() {
ensureNoLeaks(getUncachedAllocatorUnsafe());
}
@Test @Test
public void testCreateInteger() { public void testCreateInteger() {
StepVerifier StepVerifier