Bugfixes
This commit is contained in:
parent
0c26daba57
commit
0d3157ec3c
|
@ -13,8 +13,10 @@ import it.cavallium.dbengine.database.UpdateMode;
|
|||
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Value;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -380,7 +382,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
|||
.using(
|
||||
() -> serializeSuffix(keySuffix),
|
||||
keySuffixData -> {
|
||||
Flux<ByteBuf> keyFlux = Flux
|
||||
Mono<List<ByteBuf>> debuggingKeysMono = Mono
|
||||
.defer(() -> {
|
||||
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED
|
||||
&& this.subStageGetter.needsDebuggingKeyFlux()) {
|
||||
|
@ -390,16 +392,19 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
|||
extRangeBuf -> this.dictionary
|
||||
.getRangeKeys(resolveSnapshot(snapshot), extRangeBuf.retain()),
|
||||
LLRange::release
|
||||
);
|
||||
)
|
||||
.collectList();
|
||||
} else {
|
||||
return Flux.empty();
|
||||
return Mono.just(List.of());
|
||||
}
|
||||
});
|
||||
return Mono
|
||||
.using(
|
||||
() -> toKeyWithoutExt(keySuffixData.retain()),
|
||||
keyBuf -> this.subStageGetter
|
||||
.subStage(dictionary, snapshot, keyBuf.retain(), keyFlux),
|
||||
keyBuf -> debuggingKeysMono
|
||||
.flatMap(debuggingKeysList -> this.subStageGetter
|
||||
.subStage(dictionary, snapshot, keyBuf.retain(), debuggingKeysList)
|
||||
),
|
||||
ReferenceCounted::release
|
||||
)
|
||||
.doOnDiscard(DatabaseStage.class, DatabaseStage::release);
|
||||
|
@ -447,9 +452,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
|||
.subStage(dictionary,
|
||||
snapshot,
|
||||
buffers.groupKeyWithoutExt.retain(),
|
||||
Flux
|
||||
.fromIterable(rangeKeys)
|
||||
.map(ByteBuf::retain)
|
||||
rangeKeys.stream().map(ByteBuf::retain).collect(Collectors.toList())
|
||||
)
|
||||
.map(us -> Map.entry(this.deserializeSuffix(buffers.groupSuffix.retain()), us))
|
||||
),
|
||||
|
@ -485,7 +488,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
|||
.subStage(dictionary,
|
||||
snapshot,
|
||||
groupKeyWithoutExt.retain(),
|
||||
Flux.empty()
|
||||
List.of()
|
||||
)
|
||||
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix.retain()), us))
|
||||
.doFinally(s -> groupSuffix.release());
|
||||
|
|
|
@ -3,6 +3,8 @@ package it.cavallium.dbengine.database.collections;
|
|||
import io.netty.buffer.ByteBuf;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
@ -12,7 +14,7 @@ public interface SubStageGetter<U, US extends DatabaseStage<U>> {
|
|||
Mono<US> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
ByteBuf prefixKey,
|
||||
Flux<ByteBuf> debuggingKeyFlux);
|
||||
List<ByteBuf> debuggingKeyFlux);
|
||||
|
||||
boolean isMultiKey();
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
|
|||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
@ -44,24 +45,33 @@ public class SubStageGetterHashMap<T, U, TH> implements
|
|||
public Mono<DatabaseMapDictionaryHashed<T, U, TH>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
ByteBuf prefixKey,
|
||||
Flux<ByteBuf> debuggingKeyFlux) {
|
||||
Mono<DatabaseMapDictionaryHashed<T, U, TH>> result = Mono.fromSupplier(() -> DatabaseMapDictionaryHashed.tail(dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer,
|
||||
valueSerializer,
|
||||
keyHashFunction,
|
||||
keyHashSerializer
|
||||
));
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
|
||||
.then(result)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
} else {
|
||||
return debuggingKeyFlux
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.then(result)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
List<ByteBuf> debuggingKeys) {
|
||||
return Mono
|
||||
.defer(() -> {
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
|
||||
} else {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
for (ByteBuf key : debuggingKeys) {
|
||||
key.release();
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
})
|
||||
.then(Mono
|
||||
.fromSupplier(() -> DatabaseMapDictionaryHashed
|
||||
.tail(dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer,
|
||||
valueSerializer,
|
||||
keyHashFunction,
|
||||
keyHashSerializer
|
||||
)
|
||||
)
|
||||
)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -74,15 +84,21 @@ public class SubStageGetterHashMap<T, U, TH> implements
|
|||
return assertsEnabled;
|
||||
}
|
||||
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
|
||||
return keyFlux
|
||||
.doOnNext(key -> {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
|
||||
})
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
|
||||
.then()
|
||||
.doFinally(s -> prefixKey.release());
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
try {
|
||||
for (ByteBuf key : keys) {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
|
||||
}
|
||||
} finally {
|
||||
prefixKey.release();
|
||||
for (ByteBuf key : keys) {
|
||||
key.release();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public int getKeyHashBinaryLength() {
|
||||
|
|
|
@ -7,6 +7,7 @@ import it.cavallium.dbengine.database.LLDictionary;
|
|||
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
@ -42,23 +43,32 @@ public class SubStageGetterHashSet<T, TH> implements
|
|||
public Mono<DatabaseSetDictionaryHashed<T, TH>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
ByteBuf prefixKey,
|
||||
Flux<ByteBuf> debuggingKeyFlux) {
|
||||
Mono<DatabaseSetDictionaryHashed<T, TH>> result = Mono.fromSupplier(() -> DatabaseSetDictionaryHashed.tail(dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer,
|
||||
keyHashFunction,
|
||||
keyHashSerializer
|
||||
));
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
|
||||
.then(result)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
} else {
|
||||
return debuggingKeyFlux
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.then(result)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
List<ByteBuf> debuggingKeys) {
|
||||
return Mono
|
||||
.defer(() -> {
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
|
||||
} else {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
for (ByteBuf key : debuggingKeys) {
|
||||
key.release();
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
})
|
||||
.then(Mono
|
||||
.fromSupplier(() -> DatabaseSetDictionaryHashed
|
||||
.tail(dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer,
|
||||
keyHashFunction,
|
||||
keyHashSerializer
|
||||
)
|
||||
)
|
||||
)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,15 +81,21 @@ public class SubStageGetterHashSet<T, TH> implements
|
|||
return assertsEnabled;
|
||||
}
|
||||
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
|
||||
return keyFlux
|
||||
.doOnNext(key -> {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
|
||||
})
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
|
||||
.then()
|
||||
.doFinally(s -> prefixKey.release());
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
try {
|
||||
for (ByteBuf key : keys) {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyHashBinaryLength();
|
||||
}
|
||||
} finally {
|
||||
prefixKey.release();
|
||||
for (ByteBuf key : keys) {
|
||||
key.release();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public int getKeyHashBinaryLength() {
|
||||
|
|
|
@ -6,6 +6,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
|
|||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -35,28 +36,31 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
|
|||
public Mono<DatabaseMapDictionary<T, U>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
ByteBuf prefixKey,
|
||||
Flux<ByteBuf> debuggingKeyFlux) {
|
||||
List<ByteBuf> debuggingKeys) {
|
||||
return Mono
|
||||
.using(
|
||||
() -> true,
|
||||
b -> Mono
|
||||
.fromSupplier(() -> DatabaseMapDictionary.tail(dictionary, prefixKey.retain(), keySerializer, valueSerializer))
|
||||
.doOnDiscard(DatabaseMapDictionary.class, DatabaseMapDictionary::release)
|
||||
.transformDeferred(result -> {
|
||||
if (assertsEnabled) {
|
||||
return this
|
||||
.checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
|
||||
.then(result);
|
||||
} else {
|
||||
return debuggingKeyFlux
|
||||
.flatMap(buf -> Mono.fromRunnable(buf::release))
|
||||
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
|
||||
.then(result);
|
||||
.defer(() -> {
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
|
||||
} else {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
for (ByteBuf key : debuggingKeys) {
|
||||
key.release();
|
||||
}
|
||||
})
|
||||
.doOnDiscard(DatabaseMapDictionary.class, DatabaseMapDictionary::release),
|
||||
b -> prefixKey.release()
|
||||
);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
})
|
||||
.then(Mono
|
||||
.fromSupplier(() -> DatabaseMapDictionary
|
||||
.tail(dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer,
|
||||
valueSerializer
|
||||
)
|
||||
)
|
||||
)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -69,15 +73,21 @@ public class SubStageGetterMap<T, U> implements SubStageGetter<Map<T, U>, Databa
|
|||
return assertsEnabled;
|
||||
}
|
||||
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
|
||||
return keyFlux
|
||||
.doOnNext(key -> {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
|
||||
})
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
|
||||
.then()
|
||||
.doFinally(s -> prefixKey.release());
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
try {
|
||||
for (ByteBuf key : keys) {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
|
||||
}
|
||||
} finally {
|
||||
prefixKey.release();
|
||||
for (ByteBuf key : keys) {
|
||||
key.release();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public int getKeyBinaryLength() {
|
||||
|
|
|
@ -5,6 +5,7 @@ import io.netty.util.ReferenceCounted;
|
|||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -49,23 +50,30 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
|
|||
public Mono<DatabaseMapDictionaryDeep<T, U, US>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
ByteBuf prefixKey,
|
||||
Flux<ByteBuf> debuggingKeyFlux) {
|
||||
return Flux
|
||||
List<ByteBuf> debuggingKeys) {
|
||||
return Mono
|
||||
.defer(() -> {
|
||||
if (assertsEnabled) {
|
||||
return this
|
||||
.checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux);
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
|
||||
} else {
|
||||
return debuggingKeyFlux.flatMap(buf -> Mono.fromRunnable(buf::release));
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
for (ByteBuf key : debuggingKeys) {
|
||||
key.release();
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
})
|
||||
.then(Mono
|
||||
.fromSupplier(() -> DatabaseMapDictionaryDeep.deepIntermediate(dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer,
|
||||
subStageGetter,
|
||||
keyExtLength
|
||||
))
|
||||
.fromSupplier(() -> DatabaseMapDictionaryDeep
|
||||
.deepIntermediate(dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer,
|
||||
subStageGetter,
|
||||
keyExtLength
|
||||
)
|
||||
)
|
||||
)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
|
@ -80,15 +88,21 @@ public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
|
|||
return assertsEnabled;
|
||||
}
|
||||
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
|
||||
return keyFlux
|
||||
.doOnNext(key -> {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
|
||||
})
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
|
||||
.then()
|
||||
.doFinally(s -> prefixKey.release());
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
try {
|
||||
for (ByteBuf key : keys) {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
|
||||
}
|
||||
} finally {
|
||||
prefixKey.release();
|
||||
for (ByteBuf key : keys) {
|
||||
key.release();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public int getKeyBinaryLength() {
|
||||
|
|
|
@ -6,6 +6,7 @@ import it.cavallium.dbengine.client.CompositeSnapshot;
|
|||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
import it.cavallium.dbengine.database.collections.DatabaseEmpty.Nothing;
|
||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -32,19 +33,31 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
|
|||
public Mono<DatabaseSetDictionary<T>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
ByteBuf prefixKey,
|
||||
Flux<ByteBuf> debuggingKeyFlux) {
|
||||
Mono<DatabaseSetDictionary<T>> result = Mono
|
||||
.fromSupplier(() -> DatabaseSetDictionary.tail(dictionary, prefixKey.retain(), keySerializer));
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeyFlux)
|
||||
.then(result)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
} else {
|
||||
return debuggingKeyFlux
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.then(result)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
List<ByteBuf> debuggingKeys) {
|
||||
return Mono
|
||||
.defer(() -> {
|
||||
if (assertsEnabled) {
|
||||
return checkKeyFluxConsistency(prefixKey.retain(), debuggingKeys);
|
||||
} else {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
for (ByteBuf key : debuggingKeys) {
|
||||
key.release();
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
})
|
||||
.then(Mono
|
||||
.fromSupplier(() -> DatabaseSetDictionary
|
||||
.tail(
|
||||
dictionary,
|
||||
prefixKey.retain(),
|
||||
keySerializer
|
||||
)
|
||||
)
|
||||
)
|
||||
.doFinally(s -> prefixKey.release());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,15 +70,21 @@ public class SubStageGetterSet<T> implements SubStageGetter<Map<T, Nothing>, Dat
|
|||
return assertsEnabled;
|
||||
}
|
||||
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, Flux<ByteBuf> keyFlux) {
|
||||
return keyFlux
|
||||
.doOnNext(key -> {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
|
||||
})
|
||||
.flatMap(key -> Mono.fromRunnable(key::release))
|
||||
.doOnDiscard(ByteBuf.class, ReferenceCounted::release)
|
||||
.then()
|
||||
.doFinally(s -> prefixKey.release());
|
||||
private Mono<Void> checkKeyFluxConsistency(ByteBuf prefixKey, List<ByteBuf> keys) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
try {
|
||||
for (ByteBuf key : keys) {
|
||||
assert key.readableBytes() == prefixKey.readableBytes() + getKeyBinaryLength();
|
||||
}
|
||||
} finally {
|
||||
prefixKey.release();
|
||||
for (ByteBuf key : keys) {
|
||||
key.release();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public int getKeyBinaryLength() {
|
||||
|
|
|
@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.LLDictionary;
|
|||
import it.cavallium.dbengine.database.LLUtils;
|
||||
import it.cavallium.dbengine.database.serialization.Serializer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
@ -31,21 +32,22 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
|
|||
public Mono<DatabaseStageEntry<T>> subStage(LLDictionary dictionary,
|
||||
@Nullable CompositeSnapshot snapshot,
|
||||
ByteBuf keyPrefix,
|
||||
Flux<ByteBuf> debuggingKeyFlux) {
|
||||
return debuggingKeyFlux
|
||||
.singleOrEmpty()
|
||||
.flatMap(key -> Mono
|
||||
.<DatabaseStageEntry<T>>fromCallable(() -> {
|
||||
try {
|
||||
if (!LLUtils.equals(keyPrefix, key)) {
|
||||
throw new IndexOutOfBoundsException("Found more than one element!");
|
||||
}
|
||||
} finally {
|
||||
key.release();
|
||||
List<ByteBuf> debuggingKeys) {
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
try {
|
||||
for (ByteBuf key : debuggingKeys) {
|
||||
if (!LLUtils.equals(keyPrefix, key)) {
|
||||
throw new IndexOutOfBoundsException("Found more than one element!");
|
||||
}
|
||||
return null;
|
||||
})
|
||||
)
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
for (ByteBuf key : debuggingKeys) {
|
||||
key.release();
|
||||
}
|
||||
}
|
||||
})
|
||||
.then(Mono
|
||||
.<DatabaseStageEntry<T>>fromSupplier(() -> new DatabaseSingle<>(dictionary, keyPrefix.retain(), serializer))
|
||||
)
|
||||
|
|
|
@ -89,6 +89,10 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
private static final byte[] NO_DATA = new byte[0];
|
||||
|
||||
private static final boolean ASSERTIONS_ENABLED;
|
||||
/**
|
||||
* Default: true
|
||||
*/
|
||||
private static final boolean USE_DIRECT_BUFFER_BOUNDS = true;
|
||||
|
||||
static {
|
||||
boolean assertionsEnabled = false;
|
||||
|
@ -484,9 +488,9 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
@Nullable ByteBuf newData;
|
||||
ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice();
|
||||
try {
|
||||
newData = updater.apply(
|
||||
prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain());
|
||||
assert prevDataToSendToUpdater == null || prevDataToSendToUpdater.readerIndex() == 0
|
||||
newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain());
|
||||
assert prevDataToSendToUpdater == null
|
||||
|| prevDataToSendToUpdater.readerIndex() == 0
|
||||
|| !prevDataToSendToUpdater.isReadable();
|
||||
} finally {
|
||||
if (prevDataToSendToUpdater != null) {
|
||||
|
@ -928,23 +932,19 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
|
||||
return Flux
|
||||
.using(
|
||||
() -> true,
|
||||
b -> Flux
|
||||
.using(
|
||||
() -> new LLLocalGroupedKeyReactiveRocksIterator(db,
|
||||
alloc,
|
||||
cfh,
|
||||
prefixLength,
|
||||
range.retain(),
|
||||
resolveSnapshot(snapshot),
|
||||
"getRangeKeysGrouped"
|
||||
),
|
||||
LLLocalGroupedReactiveRocksIterator::flux,
|
||||
LLLocalGroupedReactiveRocksIterator::release
|
||||
)
|
||||
.subscribeOn(dbScheduler),
|
||||
b -> range.release()
|
||||
);
|
||||
() -> new LLLocalGroupedKeyReactiveRocksIterator(db,
|
||||
alloc,
|
||||
cfh,
|
||||
prefixLength,
|
||||
range.retain(),
|
||||
resolveSnapshot(snapshot),
|
||||
"getRangeKeysGrouped"
|
||||
),
|
||||
LLLocalGroupedReactiveRocksIterator::flux,
|
||||
LLLocalGroupedReactiveRocksIterator::release
|
||||
)
|
||||
.subscribeOn(dbScheduler)
|
||||
.doFinally(s -> range.release());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1173,10 +1173,18 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
|
||||
private static ReleasableSlice setIterateBound(ReadOptions readOpts, IterateBound boundType, ByteBuf buffer) {
|
||||
try {
|
||||
ByteBuffer nioBuffer = LLUtils.toDirect(buffer);
|
||||
AbstractSlice<?> slice;
|
||||
assert nioBuffer.isDirect();
|
||||
slice = new DirectSlice(nioBuffer);
|
||||
ByteBuffer nioBuffer;
|
||||
if (LLLocalDictionary.USE_DIRECT_BUFFER_BOUNDS) {
|
||||
nioBuffer = LLUtils.toDirect(buffer);
|
||||
assert nioBuffer.isDirect();
|
||||
slice = new DirectSlice(nioBuffer, buffer.readableBytes());
|
||||
assert slice.size() == buffer.readableBytes();
|
||||
assert slice.compare(new Slice(LLUtils.toArray(buffer))) == 0;
|
||||
} else {
|
||||
nioBuffer = null;
|
||||
slice = new Slice(LLUtils.toArray(buffer));
|
||||
}
|
||||
if (boundType == IterateBound.LOWER) {
|
||||
readOpts.setIterateLowerBound(slice);
|
||||
} else {
|
||||
|
@ -1559,18 +1567,18 @@ public class LLLocalDictionary implements LLDictionary {
|
|||
ReleasableSlice sliceMin;
|
||||
ReleasableSlice sliceMax;
|
||||
if (range.hasMin()) {
|
||||
sliceMin = setIterateBound(readOptions, IterateBound.LOWER, range.getMin().retainedSlice());
|
||||
sliceMin = setIterateBound(readOptions, IterateBound.LOWER, range.getMin().retain());
|
||||
} else {
|
||||
sliceMin = emptyReleasableSlice();
|
||||
}
|
||||
if (range.hasMax()) {
|
||||
sliceMax = setIterateBound(readOptions, IterateBound.UPPER, range.getMax().retainedSlice());
|
||||
sliceMax = setIterateBound(readOptions, IterateBound.UPPER, range.getMax().retain());
|
||||
} else {
|
||||
sliceMax = emptyReleasableSlice();
|
||||
}
|
||||
var rocksIterator = db.newIterator(cfh, readOptions);
|
||||
if (!PREFER_SEEK_TO_FIRST && range.hasMin()) {
|
||||
rocksIterSeekTo(rocksIterator, range.getMin().retainedSlice());
|
||||
rocksIterSeekTo(rocksIterator, range.getMin().retain());
|
||||
} else {
|
||||
rocksIterator.seekToFirst();
|
||||
}
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import static it.cavallium.dbengine.database.disk.LLLocalDictionary.getRocksIterator;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
|
@ -51,14 +49,13 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
|
|||
.generate(() -> {
|
||||
var readOptions = new ReadOptions(this.readOptions);
|
||||
readOptions.setFillCache(canFillCache && range.hasMin() && range.hasMax());
|
||||
return getRocksIterator(readOptions, range.retain(), db, cfh);
|
||||
return LLLocalDictionary.getRocksIterator(readOptions, range.retain(), db, cfh);
|
||||
}, (tuple, sink) -> {
|
||||
range.retain();
|
||||
try {
|
||||
var rocksIterator = tuple.getT1();
|
||||
ObjectArrayList<T> values = new ObjectArrayList<>();
|
||||
ByteBuf firstGroupKey = null;
|
||||
|
||||
try {
|
||||
while (rocksIterator.isValid()) {
|
||||
ByteBuf key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
|
||||
|
|
|
@ -61,7 +61,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
|
|||
try {
|
||||
if (firstGroupKey == null) {
|
||||
firstGroupKey = key.retain();
|
||||
} else if (!ByteBufUtil.equals(firstGroupKey, 0, key, 0, prefixLength)) {
|
||||
} else if (!ByteBufUtil.equals(firstGroupKey, firstGroupKey.readerIndex(), key, key.readerIndex(), prefixLength)) {
|
||||
break;
|
||||
}
|
||||
rocksIterator.next();
|
||||
|
|
Loading…
Reference in New Issue
Block a user