Remove netty 5 and unused code

Andrea Cavalli 2023-02-28 23:10:31 +01:00
parent 024c4ee226
commit 011c8f839c
29 changed files with 228 additions and 219 deletions

pom.xml

@@ -16,7 +16,7 @@
<lucene.version>9.5.0</lucene.version>
<rocksdb.version>7.9.2</rocksdb.version>
<junit.jupiter.version>5.9.0</junit.jupiter.version>
<data.generator.version>1.0.244</data.generator.version>
<data.generator.version>1.0.249</data.generator.version>
</properties>
<repositories>
<repository>
@@ -40,17 +40,6 @@
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>netty5-snapshots</id>
<name>Netty 5 snapshots</name>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>apache.snapshots</id>
<name>Apache Snapshot Repository</name>
@@ -97,13 +86,6 @@
<artifactId>hamcrest-library</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>4.1.86.Final</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
@@ -112,22 +94,6 @@
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
@@ -217,7 +183,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.3</version>
<version>2.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>

CodecsExample.java

@@ -80,7 +80,7 @@ public class CodecsExample {
.then(),
SpeedExample.numRepeats,
tuple -> tuple.getT1().close()
)).transform(LLUtils::handleDiscard).subscribeOn(Schedulers.parallel()).blockOptional();
)).subscribeOn(Schedulers.parallel()).blockOptional();
}
private static void testConversion() {
@@ -88,7 +88,6 @@ public class CodecsExample {
.then()
.then(readNew())
.subscribeOn(Schedulers.parallel())
.transform(LLUtils::handleDiscard)
.blockOptional();
}

IndicizationExample.java

@@ -62,7 +62,6 @@ public class IndicizationExample {
.then(index.close())
)
.subscribeOn(Schedulers.parallel())
.transform(LLUtils::handleDiscard)
.block();
tempIndex(true)
.flatMap(index ->
@@ -139,7 +138,6 @@ public class IndicizationExample {
.then(index.close())
)
.subscribeOn(Schedulers.parallel())
.transform(LLUtils::handleDiscard)
.block();
}

SpeedExample.java

@@ -56,7 +56,6 @@ public class SpeedExample {
.then(test3LevelPut())
.then(test4LevelPut())
.subscribeOn(Schedulers.parallel())
.transform(LLUtils::handleDiscard)
.blockOptional();
}

ConnectionSettings.java

@@ -17,9 +17,6 @@ public sealed interface ConnectionSettings {
record LocalConnectionSettings(Path dataPath) implements PrimaryConnectionSettings, SubConnectionSettings {}
record QuicConnectionSettings(SocketAddress bindAddress, SocketAddress remoteAddress) implements
PrimaryConnectionSettings, SubConnectionSettings {}
record MultiConnectionSettings(Map<ConnectionPart, SubConnectionSettings> parts) implements
PrimaryConnectionSettings {

HitEntry.java

@@ -1,7 +1,11 @@
package it.cavallium.dbengine.client;
import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.Unmodifiable;
public record HitEntry<T, U>(T key, @Nullable U value, float score)
implements Comparable<HitEntry<T, U>> {
@@ -10,4 +14,13 @@ public record HitEntry<T, U>(T key, @Nullable U value, float score)
public int compareTo(@NotNull HitEntry<T, U> o) {
return Float.compare(o.score, this.score);
}
@Contract(pure = true)
public @Nullable @Unmodifiable Entry<T, U> toEntry() {
if (value != null) {
return Map.entry(key, value);
} else {
return null;
}
}
}
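The new toEntry() helper turns a scored hit into a plain Map.Entry, or null when the hit carries no value, and compareTo still orders hits by descending score. A minimal usage sketch (the hit variable and its contents are hypothetical, not part of this commit):

import it.cavallium.dbengine.client.HitEntry;
import java.util.Map;

class HitEntryExample {
	static void printHit(HitEntry<String, String> hit) {
		// toEntry() returns null when value == null, so unresolved hits
		// can be filtered with a plain null check.
		Map.Entry<String, String> entry = hit.toEntry();
		if (entry != null) {
			System.out.println(entry.getKey() + " -> " + entry.getValue() + " (score " + hit.score() + ")");
		}
	}
}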

LLKeyValueDatabase.java

@@ -56,7 +56,7 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
void verifyChecksum();
void compact() throws RocksDBException;
void compact();
void flush();

LLUtils.java

@@ -4,7 +4,6 @@ import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty.util.IllegalReferenceCountException;
import it.cavallium.dbengine.buffers.Buf;
import it.cavallium.dbengine.client.HitEntry;
import it.cavallium.dbengine.client.HitKey;
@@ -288,37 +287,25 @@ public class LLUtils {
}
public static String toStringSafe(byte @Nullable[] key) {
try {
if (key != null) {
return toString(key);
} else {
return "(released)";
}
} catch (IllegalReferenceCountException ex) {
if (key != null) {
return toString(key);
} else {
return "(released)";
}
}
public static String toStringSafe(@Nullable Buf key) {
try {
if (key != null) {
return toString(key);
} else {
return "(released)";
}
} catch (IllegalReferenceCountException ex) {
if (key != null) {
return toString(key);
} else {
return "(released)";
}
}
public static String toStringSafe(@Nullable LLRange range) {
try {
if (range != null) {
return toString(range);
} else {
return "(released)";
}
} catch (IllegalReferenceCountException ex) {
if (range != null) {
return toString(range);
} else {
return "(released)";
}
}

DatabaseMapDictionary.java

@@ -1,9 +1,5 @@
package it.cavallium.dbengine.database.collections;
import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_SCHEDULER;
import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
import static it.cavallium.dbengine.utils.StreamUtils.fastListing;
import it.cavallium.dbengine.buffers.Buf;
import it.cavallium.dbengine.buffers.BufDataInput;
import it.cavallium.dbengine.buffers.BufDataOutput;
@@ -29,12 +25,13 @@ import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
@@ -82,14 +79,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
boolean smallRange) {
if (keyMin != null || keyMax != null) {
return databaseMapDictionary.getAllValues(snapshot,
return databaseMapDictionary.getAllEntries(snapshot,
keyMin,
keyMax,
reverse,
smallRange
smallRange,
Map::entry
);
} else {
return databaseMapDictionary.getAllValues(snapshot, smallRange);
return databaseMapDictionary.getAllEntries(snapshot, smallRange, Map::entry);
}
}
@@ -192,7 +190,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
var map = StreamUtils.collect(stream,
Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)
);
return map.isEmpty() ? null : map;
return map == null || map.isEmpty() ? null : map;
}
@Override
@@ -446,35 +444,79 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
});
}
private Stream<T> getAllKeys(@Nullable CompositeSnapshot snapshot,
LLRange sliceRange, boolean reverse, boolean smallRange) {
return dictionary
.getRangeKeys(resolveSnapshot(snapshot), sliceRange, reverse, smallRange)
.map(keyBuf -> {
assert keyBuf.size() == keyPrefixLength + keySuffixLength + keyExtLength;
// Remove prefix. Keep only the suffix and the ext
var suffixAndExtIn = BufDataInput.create(keyBuf);
suffixAndExtIn.skipBytes(keyPrefixLength);
suffixKeyLengthConsistency(suffixAndExtIn.available());
return deserializeSuffix(suffixAndExtIn);
});
}
@Override
public Stream<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllValues(snapshot, range, false, smallRange);
public Stream<Entry<T, U>> getAllEntries(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllEntries(snapshot, smallRange, Map::entry);
}
@Override
public Stream<U> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllEntries(snapshot, range, false, smallRange, (k, v) -> v);
}
@Override
public Stream<T> getAllKeys(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllKeys(snapshot, range, false, smallRange);
}
/**
* Get all values
* @param reverse if true, the results will go backwards from the specified key (inclusive)
*/
public Stream<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
public Stream<Entry<T, U>> getAllEntries(@Nullable CompositeSnapshot snapshot,
@Nullable T keyMin,
@Nullable T keyMax,
boolean reverse,
boolean smallRange) {
return getAllEntries(snapshot, keyMin, keyMax, reverse, smallRange, Map::entry);
}
/**
* Get all values
* @param reverse if true, the results will go backwards from the specified key (inclusive)
*/
public <X> Stream<X> getAllEntries(@Nullable CompositeSnapshot snapshot,
@Nullable T keyMin,
@Nullable T keyMax,
boolean reverse,
boolean smallRange,
BiFunction<T, U, X> mapper) {
if (keyMin == null && keyMax == null) {
return getAllValues(snapshot, smallRange);
return getAllEntries(snapshot, smallRange, mapper);
} else {
LLRange boundedRange = getPatchedRange(range, keyMin, keyMax);
return getAllValues(snapshot, boundedRange, reverse, smallRange);
return getAllEntries(snapshot, boundedRange, reverse, smallRange, mapper);
}
}
private Stream<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot,
private <X> Stream<X> getAllEntries(@Nullable CompositeSnapshot snapshot, boolean smallRange, BiFunction<T, U, X> mapper) {
return getAllEntries(snapshot, range, false, smallRange, mapper);
}
private <X> Stream<X> getAllEntries(@Nullable CompositeSnapshot snapshot,
LLRange sliceRangeMono,
boolean reverse, boolean smallRange) {
boolean reverse,
boolean smallRange,
BiFunction<T, U, X> mapper) {
return dictionary
.getRange(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange)
.map((serializedEntry) -> {
Entry<T, U> entry;
X entry;
var keyBuf = serializedEntry.getKey();
assert keyBuf != null;
assert keyBuf.size() == keyPrefixLength + keySuffixLength + keyExtLength;
@@ -488,14 +530,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
assert serializedEntry.getValue() != null;
U value = valueSerializer.deserialize(BufDataInput.create(serializedEntry.getValue()));
entry = Map.entry(keySuffix, value);
entry = mapper.apply(keySuffix, value);
return entry;
});
}
@Override
public Stream<Entry<T, U>> setAllValuesAndGetPrevious(Stream<Entry<T, U>> entries) {
return getAllValues(null, false)
public Stream<Entry<T, U>> setAllEntriesAndGetPrevious(Stream<Entry<T, U>> entries) {
return getAllEntries(null, false)
.onClose(() -> dictionary.setRange(range, entries.map(entry -> serializeEntry(entry)), false));
}
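The BiFunction mapper makes getAllEntries the single range-scan primitive: getAllValues passes (k, v) -> v and getAllKeys deserializes only key suffixes, so neither allocates a Map.Entry per row. A sketch under the assumption that dict is an open DatabaseMapDictionary<String, String> (the variable and key bounds are hypothetical):

import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import java.util.stream.Stream;

class RangeScanExample {
	static void printValuesInRange(DatabaseMapDictionary<String, String> dict) {
		// Forward scan of a bounded range; the mapper projects each row
		// straight to its value, skipping the Map.entry allocation.
		try (Stream<String> values = dict.getAllEntries(null, "key-a", "key-z", false, true, (k, v) -> v)) {
			values.forEach(System.out::println);
		}
	}
}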

DatabaseMapDictionaryDeep.java

@@ -18,7 +18,6 @@ import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
@@ -298,14 +297,14 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
}
@Override
public void setAllValues(Stream<Entry<T, U>> entries) {
public void setAllEntries(Stream<Entry<T, U>> entries) {
this.clear();
this.putMulti(entries);
}
@Override
public Stream<Entry<T, U>> setAllValuesAndGetPrevious(Stream<Entry<T, U>> entries) {
return this.getAllValues(null, false).onClose(() -> setAllValues(entries));
public Stream<Entry<T, U>> setAllEntriesAndGetPrevious(Stream<Entry<T, U>> entries) {
return this.getAllEntries(null, false).onClose(() -> setAllEntries(entries));
}
@Override

DatabaseMapDictionaryHashed.java

@@ -170,7 +170,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
public Stream<SubStageEntry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot,
boolean smallRange) {
return subDictionary
.getAllValues(snapshot, smallRange)
.getAllEntries(snapshot, smallRange)
.map(Entry::getValue)
.map(Collections::unmodifiableSet)
.flatMap(bucket -> bucket.stream()
@@ -179,16 +179,26 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
}
@Override
public Stream<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
public Stream<Entry<T, U>> getAllEntries(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return subDictionary
.getAllValues(snapshot, smallRange)
.getAllEntries(snapshot, smallRange)
.map(Entry::getValue)
.map(Collections::unmodifiableSet)
.flatMap(Collection::stream);
}
@Override
public Stream<Entry<T, U>> setAllValuesAndGetPrevious(Stream<Entry<T, U>> entries) {
public Stream<T> getAllKeys(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllEntries(snapshot, smallRange).map(Entry::getKey);
}
@Override
public Stream<U> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return getAllEntries(snapshot, smallRange).map(Entry::getValue);
}
@Override
public Stream<Entry<T, U>> setAllEntriesAndGetPrevious(Stream<Entry<T, U>> entries) {
List<Entry<T, U>> prevList = entries.map(entry -> {
var prev = this.at(null, entry.getKey()).setAndGetPrevious(entry.getValue());
if (prev != null) {

DatabaseStageMap.java

@@ -4,18 +4,15 @@ import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_SCHEDULER;
import static it.cavallium.dbengine.utils.StreamUtils.collectOn;
import static it.cavallium.dbengine.utils.StreamUtils.count;
import static it.cavallium.dbengine.utils.StreamUtils.executing;
import static it.cavallium.dbengine.utils.StreamUtils.iterating;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.SubStageEntry;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.KVSerializationFunction;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.utils.StreamUtils;
import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
@@ -23,6 +20,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -112,33 +110,48 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
Stream<SubStageEntry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange);
default Stream<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
default Stream<Entry<T, U>> getAllEntries(@Nullable CompositeSnapshot snapshot,
boolean smallRange) {
return this.getAllStages(snapshot, smallRange).map(stage -> {
var val = stage.getValue().get(snapshot);
return val != null ? Map.entry(stage.getKey(), val) : null;
}).filter(Objects::nonNull);
}
default void setAllValues(Stream<Entry<T, U>> entries) {
setAllValuesAndGetPrevious(entries).close();
default Stream<T> getAllKeys(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return this
.getAllStages(snapshot, smallRange)
.map(SubStageEntry::getKey)
.filter(Objects::nonNull);
}
Stream<Entry<T, U>> setAllValuesAndGetPrevious(Stream<Entry<T, U>> entries);
default Stream<U> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) {
return this
.getAllEntries(snapshot, smallRange)
.map(Entry::getValue)
.filter(Objects::nonNull);
}
default void setAllEntries(Stream<Entry<T, U>> entries) {
setAllEntriesAndGetPrevious(entries).close();
}
Stream<Entry<T, U>> setAllEntriesAndGetPrevious(Stream<Entry<T, U>> entries);
default void clear() {
setAllValues(Stream.empty());
setAllEntries(Stream.empty());
}
default void replaceAllValues(boolean canKeysChange,
default void replaceAllEntries(boolean canKeysChange,
Function<Entry<T, U>, @NotNull Entry<T, U>> entriesReplacer,
boolean smallRange) {
if (canKeysChange) {
try (var values = this.getAllValues(null, smallRange)) {
this.setAllValues(values.map(entriesReplacer));
try (var entries = this.getAllEntries(null, smallRange)) {
this.setAllEntries(entries.map(entriesReplacer));
}
} else {
collectOn(ROCKSDB_SCHEDULER,
this.getAllValues(null, smallRange).map(entriesReplacer),
this.getAllEntries(null, smallRange).map(entriesReplacer),
executing(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue()))
);
}
@@ -154,7 +167,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
if (value == null) {
map = this.clearAndGetPrevious();
} else {
try (var stream = this.setAllValuesAndGetPrevious(value.entrySet().stream())) {
try (var stream = this.setAllEntriesAndGetPrevious(value.entrySet().stream())) {
map = stream.collect(Collectors.toMap(Entry::getKey,
Entry::getValue,
(a, b) -> a,
@@ -185,7 +198,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
if (updateMode == UpdateMode.ALLOW_UNSAFE) {
Object2ObjectSortedMap<T, U> v;
try (var stream = this.getAllValues(null, true)) {
try (var stream = this.getAllEntries(null, true)) {
v = stream
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new));
}
@@ -198,7 +211,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
if (result != null && result.isEmpty()) {
result = null;
}
this.setAllValues(result != null ? result.entrySet().stream() : null);
this.setAllEntries(result != null ? result.entrySet().stream() : null);
return new Delta<>(v, result);
} else if (updateMode == UpdateMode.ALLOW) {
throw new UnsupportedOperationException("Maps can't be updated atomically");
@@ -216,7 +229,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
@Override
default Object2ObjectSortedMap<T, U> get(@Nullable CompositeSnapshot snapshot) {
try (var stream = this.getAllValues(snapshot, true)) {
try (var stream = this.getAllEntries(snapshot, true)) {
Object2ObjectSortedMap<T, U> map = stream
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new));
return map.isEmpty() ? null : map;
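On DatabaseStageMap the rename also simplifies the defaults: getAllKeys projects getAllStages, getAllValues projects getAllEntries, and the setAllValues/replaceAllValues family becomes setAllEntries/replaceAllEntries. A call-site sketch (map is a hypothetical DatabaseStageMap<String, String, ?>):

import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import java.util.Map;

class StageMapExample {
	static void upperCaseAllValues(DatabaseStageMap<String, String, ?> map) {
		// canKeysChange=false takes the at(key).set(value) path instead of
		// clearing and rewriting the whole range.
		map.replaceAllEntries(false, e -> Map.entry(e.getKey(), e.getValue().toUpperCase()), true);
		// getAllKeys is now a default projection over getAllStages.
		try (var keys = map.getAllKeys(null, true)) {
			keys.forEach(System.out::println);
		}
	}
}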

LLLocalKeyValueDatabase.java

@@ -10,7 +10,6 @@ import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTAB
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.netty.util.internal.PlatformDependent;
import it.cavallium.data.generator.nativedata.NullableString;
import it.cavallium.dbengine.client.Backuppable;
import it.cavallium.dbengine.client.MemoryStats;
@@ -153,12 +152,6 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
.tags("db.name", name)
.register(meterRegistry);
if (!PlatformDependent.hasUnsafe()) {
throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers",
PlatformDependent.getUnsafeUnavailabilityCause()
);
}
this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));
if (!enableColumnsBug) {
@@ -607,7 +600,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
}
}
public void forceCompaction(int volumeId) throws RocksDBException {
public void forceCompaction(int volumeId) {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
@@ -615,6 +608,8 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
ensureOwned(cfh);
RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger);
}
} catch (RocksDBException e) {
throw new DBException("Failed to force compaction", e);
} finally {
closeLock.unlockRead(closeReadLock);
}
@@ -1472,7 +1467,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
}
@Override
public void compact() throws RocksDBException {
public void compact() {
this.forceCompaction(getLastVolumeId());
}
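compact() and forceCompaction() drop the checked RocksDBException from their signatures; a failed compaction now surfaces as the unchecked DBException wrapper shown in the hunk above. A call-site sketch (db and logger are hypothetical, and the DBException import is omitted since the diff only shows its use):

// Before this commit the caller had to catch or declare RocksDBException;
// now only the unchecked wrapper can escape, with the original
// RocksDBException attached as its cause.
try {
	db.compact();
} catch (DBException ex) {
	logger.error("Compaction failed", ex);
}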

RPCCodecs.java

@@ -1,41 +0,0 @@
package it.cavallium.dbengine.database.remote;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import it.cavallium.dbengine.rpc.current.data.BoxedRPCEvent;
import it.cavallium.dbengine.rpc.current.data.RPCEvent;
import it.cavallium.dbengine.rpc.current.serializers.BoxedRPCEventSerializer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.List;
public class RPCCodecs {
public static class RPCEventCodec extends ByteToMessageCodec<RPCEvent> {
public static final ChannelHandler INSTANCE = new RPCEventCodec();
public static final BoxedRPCEventSerializer SERIALIZER_INSTANCE = new BoxedRPCEventSerializer();
@Override
protected void encode(ChannelHandlerContext ctx, RPCEvent msg, ByteBuf out) throws Exception {
try (var bbos = new ByteBufOutputStream(out)) {
try (var dos = new DataOutputStream(bbos)) {
SERIALIZER_INSTANCE.serialize(dos, BoxedRPCEvent.of(msg));
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
try (var bbis = new ByteBufInputStream(msg)) {
try (var dis = new DataInputStream(bbis)) {
out.add(SERIALIZER_INSTANCE.deserialize(dis).val());
}
}
}
}
}

RPCException.java

@@ -1,10 +0,0 @@
package it.cavallium.dbengine.database.remote;
import org.jetbrains.annotations.Nullable;
public class RPCException extends RuntimeException {
public RPCException(int code, @Nullable String message) {
super("RPC error " + code + (message != null ? (": " + message) : ""));
}
}

StreamUtils.java

@@ -12,6 +12,8 @@ import java.util.Comparator;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ExecutionException;
@@ -22,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -30,6 +33,7 @@ import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.jetbrains.annotations.NotNull;
@@ -43,6 +47,8 @@ public class StreamUtils {
private static final Collector<?, ?, ?> TO_LIST_FAKE_COLLECTOR = new FakeCollector();
private static final Collector<?, ?, ?> COUNT_FAKE_COLLECTOR = new FakeCollector();
private static final Collector<?, ?, ?> FIRST_FAKE_COLLECTOR = new FakeCollector();
private static final Collector<?, ?, ?> ANY_FAKE_COLLECTOR = new FakeCollector();
private static final Set<Collector.Characteristics> CH_NOID = Collections.emptySet();
private static final Set<Characteristics> CH_CONCURRENT_NOID = Collections.unmodifiableSet(EnumSet.of(
@@ -68,12 +74,22 @@
public static <T> Collector<T, ?, @NotNull List<T>> fastListing() {
//noinspection unchecked
return (Collector<T, ?, List<T>>) TO_LIST_FAKE_COLLECTOR;
return (Collector<T, ?, @NotNull List<T>>) TO_LIST_FAKE_COLLECTOR;
}
public static <T> Collector<T, ?, @NotNull Long> fastCounting() {
//noinspection unchecked
return (Collector<T, ?, Long>) COUNT_FAKE_COLLECTOR;
return (Collector<T, ?, @NotNull Long>) COUNT_FAKE_COLLECTOR;
}
public static <T> Collector<T, ?, @NotNull Optional<T>> fastFirst() {
//noinspection unchecked
return (Collector<T, ?, @NotNull Optional<T>>) FIRST_FAKE_COLLECTOR;
}
public static <T> Collector<T, ?, @NotNull Optional<T>> fastAny() {
//noinspection unchecked
return (Collector<T, ?, @NotNull Optional<T>>) ANY_FAKE_COLLECTOR;
}
@SafeVarargs
@@ -141,8 +157,22 @@
return Streams.stream(it);
}
@SuppressWarnings("DataFlowIssue")
@NotNull
public static <X> List<X> toList(Stream<X> stream) {
return collect(stream, fastListing());
return StreamUtils.collect(stream, fastListing());
}
@SuppressWarnings("DataFlowIssue")
@NotNull
public static <X> Optional<X> toFirst(Stream<X> stream) {
return StreamUtils.collect(stream, fastFirst());
}
@SuppressWarnings("DataFlowIssue")
@NotNull
public static <X> Optional<X> toAny(Stream<X> stream) {
return StreamUtils.collect(stream, fastAny());
}
@SuppressWarnings("DataFlowIssue")
@@ -158,6 +188,16 @@
return collectOn(forkJoinPool, stream, fastCounting());
}
@NotNull
public static <X> Optional<X> toFirstOn(ForkJoinPool forkJoinPool, Stream<X> stream) {
return collectOn(forkJoinPool, stream, fastFirst());
}
@NotNull
public static <X> Optional<X> toAnyOn(ForkJoinPool forkJoinPool, Stream<X> stream) {
return collectOn(forkJoinPool, stream, fastAny());
}
/**
* Collects and closes the stream on the specified pool
*/
@@ -196,6 +236,18 @@
} else {
return (R) (Long) 0L;
}
} else if (collector == FIRST_FAKE_COLLECTOR) {
if (stream != null) {
return (R) stream.findFirst();
} else {
return (R) Optional.empty();
}
} else if (collector == ANY_FAKE_COLLECTOR) {
if (stream != null) {
return (R) stream.findAny();
} else {
return (R) Optional.empty();
}
} else if (stream == null) {
throw new NullPointerException("Stream is null");
} else if (collector == SUMMING_LONG_COLLECTOR) {
@@ -253,6 +305,10 @@
return SUMMING_LONG_COLLECTOR;
}
public static <X, Y> Stream<Y> indexed(Stream<X> stream, BiFunction<X, Long, Y> mapper) {
return Streams.mapWithIndex(stream, mapper::apply);
}
private record BatchSpliterator<E>(Spliterator<E> base, int batchSize) implements Spliterator<List<E>> {
@Override
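fastFirst() and fastAny() reuse the fake-collector trick already used by fastListing(): collect() recognizes the sentinel instance and short-circuits to findFirst()/findAny() instead of draining the stream, while indexed() delegates to Guava's Streams.mapWithIndex. A small usage sketch:

import it.cavallium.dbengine.utils.StreamUtils;
import java.util.Optional;
import java.util.stream.Stream;

class StreamUtilsExample {
	public static void main(String[] args) {
		// Short-circuits via findFirst() inside collect().
		Optional<String> first = StreamUtils.toFirst(Stream.of("a", "b", "c")); // Optional[a]
		// Pairs each element with its long index: "0:x", "1:y".
		Stream<String> numbered = StreamUtils.indexed(Stream.of("x", "y"), (v, i) -> i + ":" + v);
		numbered.forEach(System.out::println);
		System.out.println(first.orElseThrow());
	}
}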

module-info.java

@@ -37,15 +37,10 @@ module dbengine {
requires org.apache.lucene.codecs;
requires org.apache.lucene.backward_codecs;
requires lucene.relevance;
requires io.netty.buffer;
requires io.netty.transport;
requires io.netty.codec;
requires org.apache.lucene.facet;
requires java.management;
requires com.ibm.icu;
requires org.apache.lucene.analysis.icu;
requires io.netty.handler;
requires io.netty.common;
requires org.apache.lucene.queryparser;
requires okio;
requires moshi.records.reflect;

DbTestUtils.java

@@ -2,8 +2,6 @@ package it.cavallium.dbengine.tests;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetector.Level;
import it.cavallium.dbengine.buffers.BufDataInput;
import it.cavallium.dbengine.buffers.BufDataOutput;
import it.cavallium.dbengine.client.LuceneIndex;
@@ -79,8 +77,7 @@ public class DbTestUtils {
SwappableLuceneSearcher swappableLuceneSearcher,
Path path) {}
public static void ensureNoLeaks(boolean printStats, boolean useClassicException) {
ResourceLeakDetector.setLevel(Level.PARANOID);
public static void ensureNoLeaks() {
System.gc();
}

LocalTemporaryDbGenerator.java

@@ -102,7 +102,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
tempDb.swappableLuceneSearcher().close();
tempDb.luceneMulti().close();
tempDb.luceneSingle().close();
ensureNoLeaks(false, false);
ensureNoLeaks();
if (Files.exists(tempDb.path())) {
try (var walk = Files.walk(tempDb.path())) {
walk.sorted(Comparator.reverseOrder()).forEach(file -> {

TestDictionary.java

@@ -25,13 +25,13 @@ public abstract class TestDictionary {
@BeforeEach
public void beforeEach() {
ensureNoLeaks(false, false);
ensureNoLeaks();
}
@AfterEach
public void afterEach() {
if (!isCIMode()) {
ensureNoLeaks(true, false);
ensureNoLeaks();
}
}

TestDictionaryMap.java

@@ -88,13 +88,13 @@ public abstract class TestDictionaryMap {
@BeforeEach
public void beforeEach() {
ensureNoLeaks(false, false);
ensureNoLeaks();
}
@AfterEach
public void afterEach() {
if (!isCIMode() && checkLeaks) {
ensureNoLeaks(true, false);
ensureNoLeaks();
}
}
@@ -368,7 +368,7 @@
var entriesFlux = entries.entrySet();
var keysFlux = entriesFlux.stream().map(Entry::getKey).toList();
map.setAllValues(entriesFlux.stream());
map.setAllEntries(entriesFlux.stream());
List<Optional<String>> resultsFlux;
try (var stream = map.getMulti(null, keysFlux.stream())) {
resultsFlux = stream.toList();
@@ -396,8 +396,8 @@
var remainingEntries = new ArrayList<Entry<String, String>>();
var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
return Arrays.asList(toList(map.setAllValuesAndGetPrevious(entries.entrySet().stream())),
toList(map.setAllValuesAndGetPrevious(entries.entrySet().stream()))
return Arrays.asList(toList(map.setAllEntriesAndGetPrevious(entries.entrySet().stream())),
toList(map.setAllEntriesAndGetPrevious(entries.entrySet().stream()))
);
}));
if (shouldFail) {
@@ -499,7 +499,7 @@
var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5);
map.putMulti(entries.entrySet().stream());
return toList(map.getAllValues(null, false));
return toList(map.getAllEntries(null, false));
}));
if (shouldFail) {
this.checkLeaks = false;

TestDictionaryMapDeep.java

@@ -158,13 +158,13 @@ public abstract class TestDictionaryMapDeep {
@BeforeEach
public void beforeEach() {
ensureNoLeaks(false, false);
ensureNoLeaks();
}
@AfterEach
public void afterEach() {
if (!isCIMode() && checkLeaks) {
ensureNoLeaks(true, false);
ensureNoLeaks();
}
}
@@ -220,7 +220,7 @@
var result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6);
map.putValue(key, value);
try (var stream = map.getAllValues(null, false)) {
try (var stream = map.getAllEntries(null, false)) {
return stream.toList();
}
}));
@@ -249,7 +249,7 @@
return stages
.flatMap(stage -> stage
.getValue()
.getAllValues(null, false)
.getAllEntries(null, false)
.map(r -> new Tuple3<>(stage.getKey(), r.getKey(), r.getValue())))
.toList();
}
@@ -626,7 +626,7 @@
var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6);
var entriesFlux = entries.entrySet();
var keysFlux = entriesFlux.stream().map(Entry::getKey).toList();
map.setAllValues(entries.entrySet().stream());
map.setAllEntries(entries.entrySet().stream());
try (var resultsFlux = map.getMulti(null, entries.keySet().stream())) {
return Streams
.zip(keysFlux.stream(), resultsFlux, Map::entry)
@@ -652,11 +652,11 @@
var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6);
List<Entry<String, Object2ObjectSortedMap<String, String>>> a1;
try (var stream1 = map.setAllValuesAndGetPrevious(entries.entrySet().stream())) {
try (var stream1 = map.setAllEntriesAndGetPrevious(entries.entrySet().stream())) {
a1 = stream1.toList();
}
List<Entry<String, Object2ObjectSortedMap<String, String>>> a2;
try (var stream2 = map.setAllValuesAndGetPrevious(entries.entrySet().stream())) {
try (var stream2 = map.setAllEntriesAndGetPrevious(entries.entrySet().stream())) {
a2 = stream2.toList();
}
return List.of(a1, a2);
@@ -769,7 +769,7 @@
var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> {
var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6);
map.putMulti(entries.entrySet().stream());
try (var values = map.getAllValues(null, false)) {
try (var values = map.getAllEntries(null, false)) {
return values.toList();
}
}));

TestDictionaryMapDeepHashMap.java

@@ -88,13 +88,13 @@ public abstract class TestDictionaryMapDeepHashMap {
@BeforeEach
public void beforeEach() {
ensureNoLeaks(false, false);
ensureNoLeaks();
}
@AfterEach
public void afterEach() {
if (!isCIMode() && checkLeaks) {
ensureNoLeaks(true, false);
ensureNoLeaks();
}
}
@@ -105,7 +105,7 @@
var map = tempDatabaseMapDictionaryDeepMapHashMap(tempDictionary(db, updateMode), 5);
map.at(null, key1).putValue(key2, value);
return toList(map
.getAllValues(null, false)
.getAllEntries(null, false)
.map(Entry::getValue)
.flatMap(maps -> maps.entrySet().stream())
.map(Entry::getValue));

TestLLDictionary.java

@@ -39,7 +39,7 @@ public abstract class TestLLDictionary {
@BeforeEach
public void beforeEach() throws IOException {
ensureNoLeaks(false, false);
ensureNoLeaks();
tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(), "TempDB");
db = tempDb.db();
}
@@ -47,7 +47,7 @@
@AfterEach
public void afterEach() throws IOException {
getTempDbGenerator().closeTempDb(tempDb);
ensureNoLeaks(true, false);
ensureNoLeaks();
}
public static Stream<Arguments> provideArguments() {

TestLLDictionaryLeaks.java

@@ -34,7 +34,7 @@ public abstract class TestLLDictionaryLeaks {
@BeforeEach
public void beforeEach() throws IOException {
ensureNoLeaks(false, false);
ensureNoLeaks();
tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(), "TempDB");
db = tempDb.db();
}
@@ -42,7 +42,7 @@
@AfterEach
public void afterEach() throws IOException {
getTempDbGenerator().closeTempDb(tempDb);
ensureNoLeaks(true, false);
ensureNoLeaks();
}
public static Stream<Arguments> provideArguments() {

TestLuceneIndex.java

@@ -52,7 +52,7 @@ public class TestLuceneIndex {
@BeforeEach
public void beforeEach() throws IOException {
ensureNoLeaks(false, false);
ensureNoLeaks();
tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(), "TempDB");
luceneSingle = tempDb.luceneSingle();
luceneMulti = tempDb.luceneMulti();
@@ -123,7 +123,7 @@
@AfterEach
public void afterEach() throws IOException {
getTempDbGenerator().closeTempDb(tempDb);
ensureNoLeaks(true, false);
ensureNoLeaks();
}
@AfterAll

TestLuceneSearches.java

@@ -5,7 +5,6 @@ import static it.cavallium.dbengine.tests.DbTestUtils.ensureNoLeaks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import io.netty.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.tests.DbTestUtils.TempDb;
import it.cavallium.dbengine.tests.TestLuceneIndex.Tuple2;
import it.cavallium.dbengine.client.HitKey;
@@ -71,9 +70,6 @@ public class TestLuceneSearches {
private static final Map<String, String> ELEMENTS;
static {
// Start the pool by creating and deleting a direct buffer
PooledByteBufAllocator.DEFAULT.directBuffer().release();
var modifiableElements = new LinkedHashMap<String, String>();
modifiableElements.put("test-key-1", "0123456789");
modifiableElements.put("test-key-2", "test 0123456789 test word");
@@ -96,7 +92,7 @@
@BeforeAll
public static void beforeAll() throws IOException {
ensureNoLeaks(false, false);
ensureNoLeaks();
tempDb = Objects.requireNonNull(TEMP_DB_GENERATOR.openTempDb(), "TempDB");
luceneSingle = tempDb.luceneSingle();
luceneMulti = tempDb.luceneMulti();
@@ -190,7 +186,7 @@
@AfterAll
public static void afterAll() throws IOException {
TEMP_DB_GENERATOR.closeTempDb(tempDb);
ensureNoLeaks(true, false);
ensureNoLeaks();
}
private LuceneIndex<String, String> getLuceneIndex(boolean shards, @Nullable LocalSearcher customSearcher) {

TestSingletons.java

@@ -43,12 +43,12 @@ public abstract class TestSingletons {
@BeforeEach
public void beforeEach() {
ensureNoLeaks(false, false);
ensureNoLeaks();
}
@AfterEach
public void afterEach() {
ensureNoLeaks(true, false);
ensureNoLeaks();
}
@Test

module-info.java (tests)

@@ -6,13 +6,11 @@ module dbengine.tests {
requires org.apache.lucene.core;
requires it.unimi.dsi.fastutil;
requires org.apache.lucene.queryparser;
requires io.netty.common;
requires org.jetbrains.annotations;
requires micrometer.core;
requires org.junit.jupiter.params;
requires com.google.common;
requires org.apache.logging.log4j;
requires io.netty.buffer;
requires org.apache.commons.lang3;
requires rocksdbjni;
opens it.cavallium.dbengine.tests;