diff --git a/pom.xml b/pom.xml index d429d33..80c3923 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ 9.5.0 7.9.2 5.9.0 - 1.0.244 + 1.0.249 @@ -40,17 +40,6 @@ true - - netty5-snapshots - Netty 5 snapshots - https://oss.sonatype.org/content/repositories/snapshots - - true - - - true - - apache.snapshots Apache Snapshot Repository @@ -97,13 +86,6 @@ hamcrest-library 2.2 - - io.netty - netty-bom - 4.1.86.Final - pom - import - @@ -112,22 +94,6 @@ guava 31.1-jre - - io.netty - netty-buffer - - - io.netty - netty-transport - - - io.netty - netty-codec - - - io.netty - netty-handler - org.yaml snakeyaml @@ -217,7 +183,7 @@ org.slf4j slf4j-api - 2.0.3 + 2.0.5 org.apache.logging.log4j diff --git a/src/example/java/it.cavallium.dbengine.client/CodecsExample.java b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java index 62924ad..48fb563 100644 --- a/src/example/java/it.cavallium.dbengine.client/CodecsExample.java +++ b/src/example/java/it.cavallium.dbengine.client/CodecsExample.java @@ -80,7 +80,7 @@ public class CodecsExample { .then(), SpeedExample.numRepeats, tuple -> tuple.getT1().close() - )).transform(LLUtils::handleDiscard).subscribeOn(Schedulers.parallel()).blockOptional(); + )).subscribeOn(Schedulers.parallel()).blockOptional(); } private static void testConversion() { @@ -88,7 +88,6 @@ public class CodecsExample { .then() .then(readNew()) .subscribeOn(Schedulers.parallel()) - .transform(LLUtils::handleDiscard) .blockOptional(); } diff --git a/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java b/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java index 3e525f8..b9ba410 100644 --- a/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java +++ b/src/example/java/it.cavallium.dbengine.client/IndicizationExample.java @@ -62,7 +62,6 @@ public class IndicizationExample { .then(index.close()) ) .subscribeOn(Schedulers.parallel()) - .transform(LLUtils::handleDiscard) .block(); tempIndex(true) .flatMap(index -> @@ -139,7 +138,6 @@ public class IndicizationExample { .then(index.close()) ) .subscribeOn(Schedulers.parallel()) - .transform(LLUtils::handleDiscard) .block(); } diff --git a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java index 525560c..34f7768 100644 --- a/src/example/java/it.cavallium.dbengine.client/SpeedExample.java +++ b/src/example/java/it.cavallium.dbengine.client/SpeedExample.java @@ -56,7 +56,6 @@ public class SpeedExample { .then(test3LevelPut()) .then(test4LevelPut()) .subscribeOn(Schedulers.parallel()) - .transform(LLUtils::handleDiscard) .blockOptional(); } diff --git a/src/main/java/it/cavallium/dbengine/client/ConnectionSettings.java b/src/main/java/it/cavallium/dbengine/client/ConnectionSettings.java index 3e56f06..59e96a5 100644 --- a/src/main/java/it/cavallium/dbengine/client/ConnectionSettings.java +++ b/src/main/java/it/cavallium/dbengine/client/ConnectionSettings.java @@ -17,9 +17,6 @@ public sealed interface ConnectionSettings { record LocalConnectionSettings(Path dataPath) implements PrimaryConnectionSettings, SubConnectionSettings {} - record QuicConnectionSettings(SocketAddress bindAddress, SocketAddress remoteAddress) implements - PrimaryConnectionSettings, SubConnectionSettings {} - record MultiConnectionSettings(Map parts) implements PrimaryConnectionSettings { diff --git a/src/main/java/it/cavallium/dbengine/client/HitEntry.java b/src/main/java/it/cavallium/dbengine/client/HitEntry.java index b03e1c4..0a88d10 100644 --- a/src/main/java/it/cavallium/dbengine/client/HitEntry.java +++ b/src/main/java/it/cavallium/dbengine/client/HitEntry.java @@ -1,7 +1,11 @@ package it.cavallium.dbengine.client; +import java.util.Map; +import java.util.Map.Entry; +import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.Unmodifiable; public record HitEntry(T key, @Nullable U value, float score) implements Comparable> { @@ -10,4 +14,13 @@ public record HitEntry(T key, @Nullable U value, float score) public int compareTo(@NotNull HitEntry o) { return Float.compare(o.score, this.score); } + + @Contract(pure = true) + public @Nullable @Unmodifiable Entry toEntry() { + if (value != null) { + return Map.entry(key, value); + } else { + return null; + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index 3bd0b36..37c106f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -56,7 +56,7 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS void verifyChecksum(); - void compact() throws RocksDBException; + void compact(); void flush(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index fe825cc..a9cb47a 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -4,7 +4,6 @@ import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import io.netty.util.IllegalReferenceCountException; import it.cavallium.dbengine.buffers.Buf; import it.cavallium.dbengine.client.HitEntry; import it.cavallium.dbengine.client.HitKey; @@ -288,37 +287,25 @@ public class LLUtils { } public static String toStringSafe(byte @Nullable[] key) { - try { - if (key == null) { - return toString(key); - } else { - return "(released)"; - } - } catch (IllegalReferenceCountException ex) { + if (key == null) { + return toString(key); + } else { return "(released)"; } } public static String toStringSafe(@Nullable Buf key) { - try { - if (key == null) { - return toString(key); - } else { - return "(released)"; - } - } catch (IllegalReferenceCountException ex) { + if (key == null) { + return toString(key); + } else { return "(released)"; } } public static String toStringSafe(@Nullable LLRange range) { - try { - if (range == null) { - return toString(range); - } else { - return "(released)"; - } - } catch (IllegalReferenceCountException ex) { + if (range == null) { + return toString(range); + } else { return "(released)"; } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 3bfd2dd..ad64af9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -1,9 +1,5 @@ package it.cavallium.dbengine.database.collections; -import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_SCHEDULER; -import static it.cavallium.dbengine.utils.StreamUtils.collectOn; -import static it.cavallium.dbengine.utils.StreamUtils.fastListing; - import it.cavallium.dbengine.buffers.Buf; import it.cavallium.dbengine.buffers.BufDataInput; import it.cavallium.dbengine.buffers.BufDataOutput; @@ -29,12 +25,13 @@ import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -82,14 +79,15 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep extends DatabaseMapDictionaryDeep a, Object2ObjectLinkedOpenHashMap::new) ); - return map.isEmpty() ? null : map; + return map == null || map.isEmpty() ? null : map; } @Override @@ -446,35 +444,79 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep getAllKeys(@Nullable CompositeSnapshot snapshot, + LLRange sliceRange, boolean reverse, boolean smallRange) { + return dictionary + .getRangeKeys(resolveSnapshot(snapshot), sliceRange, reverse, smallRange) + .map(keyBuf -> { + assert keyBuf.size() == keyPrefixLength + keySuffixLength + keyExtLength; + // Remove prefix. Keep only the suffix and the ext + var suffixAndExtIn = BufDataInput.create(keyBuf); + suffixAndExtIn.skipBytes(keyPrefixLength); + + suffixKeyLengthConsistency(suffixAndExtIn.available()); + return deserializeSuffix(suffixAndExtIn); + }); + } + @Override - public Stream> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) { - return getAllValues(snapshot, range, false, smallRange); + public Stream> getAllEntries(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + return getAllEntries(snapshot, smallRange, Map::entry); + } + + @Override + public Stream getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + return getAllEntries(snapshot, range, false, smallRange, (k, v) -> v); + } + + @Override + public Stream getAllKeys(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + return getAllKeys(snapshot, range, false, smallRange); } /** * Get all values * @param reverse if true, the results will go backwards from the specified key (inclusive) */ - public Stream> getAllValues(@Nullable CompositeSnapshot snapshot, + public Stream> getAllEntries(@Nullable CompositeSnapshot snapshot, @Nullable T keyMin, @Nullable T keyMax, boolean reverse, boolean smallRange) { + return getAllEntries(snapshot, keyMin, keyMax, reverse, smallRange, Map::entry); + } + + /** + * Get all values + * @param reverse if true, the results will go backwards from the specified key (inclusive) + */ + public Stream getAllEntries(@Nullable CompositeSnapshot snapshot, + @Nullable T keyMin, + @Nullable T keyMax, + boolean reverse, + boolean smallRange, + BiFunction mapper) { if (keyMin == null && keyMax == null) { - return getAllValues(snapshot, smallRange); + return getAllEntries(snapshot, smallRange, mapper); } else { LLRange boundedRange = getPatchedRange(range, keyMin, keyMax); - return getAllValues(snapshot, boundedRange, reverse, smallRange); + return getAllEntries(snapshot, boundedRange, reverse, smallRange, mapper); } } - private Stream> getAllValues(@Nullable CompositeSnapshot snapshot, + private Stream getAllEntries(@Nullable CompositeSnapshot snapshot, boolean smallRange, BiFunction mapper) { + return getAllEntries(snapshot, range, false, smallRange, mapper); + } + + private Stream getAllEntries(@Nullable CompositeSnapshot snapshot, LLRange sliceRangeMono, - boolean reverse, boolean smallRange) { + boolean reverse, + boolean smallRange, + BiFunction mapper) { return dictionary .getRange(resolveSnapshot(snapshot), sliceRangeMono, reverse, smallRange) .map((serializedEntry) -> { - Entry entry; + X entry; var keyBuf = serializedEntry.getKey(); assert keyBuf != null; assert keyBuf.size() == keyPrefixLength + keySuffixLength + keyExtLength; @@ -488,14 +530,14 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep> setAllValuesAndGetPrevious(Stream> entries) { - return getAllValues(null, false) + public Stream> setAllEntriesAndGetPrevious(Stream> entries) { + return getAllEntries(null, false) .onClose(() -> dictionary.setRange(range, entries.map(entry -> serializeEntry(entry)), false)); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index fa321ee..6980d0b 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -18,7 +18,6 @@ import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import java.util.Map.Entry; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionException; @@ -298,14 +297,14 @@ public class DatabaseMapDictionaryDeep> implem } @Override - public void setAllValues(Stream> entries) { + public void setAllEntries(Stream> entries) { this.clear(); this.putMulti(entries); } @Override - public Stream> setAllValuesAndGetPrevious(Stream> entries) { - return this.getAllValues(null, false).onClose(() -> setAllValues(entries)); + public Stream> setAllEntriesAndGetPrevious(Stream> entries) { + return this.getAllEntries(null, false).onClose(() -> setAllEntries(entries)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 9cdc6a7..ef2f8c0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -170,7 +170,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap>> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange) { return subDictionary - .getAllValues(snapshot, smallRange) + .getAllEntries(snapshot, smallRange) .map(Entry::getValue) .map(Collections::unmodifiableSet) .flatMap(bucket -> bucket.stream() @@ -179,16 +179,26 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + public Stream> getAllEntries(@Nullable CompositeSnapshot snapshot, boolean smallRange) { return subDictionary - .getAllValues(snapshot, smallRange) + .getAllEntries(snapshot, smallRange) .map(Entry::getValue) .map(Collections::unmodifiableSet) .flatMap(Collection::stream); } @Override - public Stream> setAllValuesAndGetPrevious(Stream> entries) { + public Stream getAllKeys(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + return getAllEntries(snapshot, smallRange).map(Entry::getKey); + } + + @Override + public Stream getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + return getAllEntries(snapshot, smallRange).map(Entry::getValue); + } + + @Override + public Stream> setAllEntriesAndGetPrevious(Stream> entries) { List> prevList = entries.map(entry -> { var prev = this.at(null, entry.getKey()).setAndGetPrevious(entry.getValue()); if (prev != null) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 08cc843..81369df 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -4,18 +4,15 @@ import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_SCHEDULER; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.count; import static it.cavallium.dbengine.utils.StreamUtils.executing; -import static it.cavallium.dbengine.utils.StreamUtils.iterating; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; -import it.cavallium.dbengine.database.LLLuceneIndex; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.SubStageEntry; import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; -import it.cavallium.dbengine.utils.StreamUtils; import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; @@ -23,6 +20,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -112,33 +110,48 @@ public interface DatabaseStageMap> extends Dat Stream> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange); - default Stream> getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + default Stream> getAllEntries(@Nullable CompositeSnapshot snapshot, + boolean smallRange) { return this.getAllStages(snapshot, smallRange).map(stage -> { var val = stage.getValue().get(snapshot); return val != null ? Map.entry(stage.getKey(), val) : null; }).filter(Objects::nonNull); } - default void setAllValues(Stream> entries) { - setAllValuesAndGetPrevious(entries).close(); + default Stream getAllKeys(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + return this + .getAllStages(snapshot, smallRange) + .map(SubStageEntry::getKey) + .filter(Objects::nonNull); } - Stream> setAllValuesAndGetPrevious(Stream> entries); + default Stream getAllValues(@Nullable CompositeSnapshot snapshot, boolean smallRange) { + return this + .getAllEntries(snapshot, smallRange) + .map(Entry::getValue) + .filter(Objects::nonNull); + } + + default void setAllEntries(Stream> entries) { + setAllEntriesAndGetPrevious(entries).close(); + } + + Stream> setAllEntriesAndGetPrevious(Stream> entries); default void clear() { - setAllValues(Stream.empty()); + setAllEntries(Stream.empty()); } - default void replaceAllValues(boolean canKeysChange, + default void replaceAllEntries(boolean canKeysChange, Function, @NotNull Entry> entriesReplacer, boolean smallRange) { if (canKeysChange) { - try (var values = this.getAllValues(null, smallRange)) { - this.setAllValues(values.map(entriesReplacer)); + try (var entries = this.getAllEntries(null, smallRange)) { + this.setAllEntries(entries.map(entriesReplacer)); } } else { collectOn(ROCKSDB_SCHEDULER, - this.getAllValues(null, smallRange).map(entriesReplacer), + this.getAllEntries(null, smallRange).map(entriesReplacer), executing(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue())) ); } @@ -154,7 +167,7 @@ public interface DatabaseStageMap> extends Dat if (value == null) { map = this.clearAndGetPrevious(); } else { - try (var stream = this.setAllValuesAndGetPrevious(value.entrySet().stream())) { + try (var stream = this.setAllEntriesAndGetPrevious(value.entrySet().stream())) { map = stream.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, @@ -185,7 +198,7 @@ public interface DatabaseStageMap> extends Dat if (updateMode == UpdateMode.ALLOW_UNSAFE) { Object2ObjectSortedMap v; - try (var stream = this.getAllValues(null, true)) { + try (var stream = this.getAllEntries(null, true)) { v = stream .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)); } @@ -198,7 +211,7 @@ public interface DatabaseStageMap> extends Dat if (result != null && result.isEmpty()) { result = null; } - this.setAllValues(result != null ? result.entrySet().stream() : null); + this.setAllEntries(result != null ? result.entrySet().stream() : null); return new Delta<>(v, result); } else if (updateMode == UpdateMode.ALLOW) { throw new UnsupportedOperationException("Maps can't be updated atomically"); @@ -216,7 +229,7 @@ public interface DatabaseStageMap> extends Dat @Override default Object2ObjectSortedMap get(@Nullable CompositeSnapshot snapshot) { - try (var stream = this.getAllValues(snapshot, true)) { + try (var stream = this.getAllEntries(snapshot, true)) { Object2ObjectSortedMap map = stream .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, Object2ObjectLinkedOpenHashMap::new)); return map.isEmpty() ? null : map; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 651c4de..f80e8ac 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -10,7 +10,6 @@ import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTAB import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; -import io.netty.util.internal.PlatformDependent; import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.client.Backuppable; import it.cavallium.dbengine.client.MemoryStats; @@ -153,12 +152,6 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa .tags("db.name", name) .register(meterRegistry); - if (!PlatformDependent.hasUnsafe()) { - throw new UnsupportedOperationException("Please enable unsafe support or disable netty direct buffers", - PlatformDependent.getUnsafeUnavailabilityCause() - ); - } - this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false")); if (!enableColumnsBug) { @@ -607,7 +600,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa } } - public void forceCompaction(int volumeId) throws RocksDBException { + public void forceCompaction(int volumeId) { var closeReadLock = closeLock.readLock(); try { ensureOpen(); @@ -615,6 +608,8 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa ensureOwned(cfh); RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger); } + } catch (RocksDBException e) { + throw new DBException("Failed to force compaction", e); } finally { closeLock.unlockRead(closeReadLock); } @@ -1472,7 +1467,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa } @Override - public void compact() throws RocksDBException { + public void compact() { this.forceCompaction(getLastVolumeId()); } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/RPCCodecs.java b/src/main/java/it/cavallium/dbengine/database/remote/RPCCodecs.java deleted file mode 100644 index 7bf4d54..0000000 --- a/src/main/java/it/cavallium/dbengine/database/remote/RPCCodecs.java +++ /dev/null @@ -1,41 +0,0 @@ -package it.cavallium.dbengine.database.remote; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageCodec; -import it.cavallium.dbengine.rpc.current.data.BoxedRPCEvent; -import it.cavallium.dbengine.rpc.current.data.RPCEvent; -import it.cavallium.dbengine.rpc.current.serializers.BoxedRPCEventSerializer; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.util.List; - -public class RPCCodecs { - - public static class RPCEventCodec extends ByteToMessageCodec { - - public static final ChannelHandler INSTANCE = new RPCEventCodec(); - public static final BoxedRPCEventSerializer SERIALIZER_INSTANCE = new BoxedRPCEventSerializer(); - - @Override - protected void encode(ChannelHandlerContext ctx, RPCEvent msg, ByteBuf out) throws Exception { - try (var bbos = new ByteBufOutputStream(out)) { - try (var dos = new DataOutputStream(bbos)) { - SERIALIZER_INSTANCE.serialize(dos, BoxedRPCEvent.of(msg)); - } - } - } - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { - try (var bbis = new ByteBufInputStream(msg)) { - try (var dis = new DataInputStream(bbis)) { - out.add(SERIALIZER_INSTANCE.deserialize(dis).val()); - } - } - } - } -} diff --git a/src/main/java/it/cavallium/dbengine/database/remote/RPCException.java b/src/main/java/it/cavallium/dbengine/database/remote/RPCException.java deleted file mode 100644 index 559941c..0000000 --- a/src/main/java/it/cavallium/dbengine/database/remote/RPCException.java +++ /dev/null @@ -1,10 +0,0 @@ -package it.cavallium.dbengine.database.remote; - -import org.jetbrains.annotations.Nullable; - -public class RPCException extends RuntimeException { - - public RPCException(int code, @Nullable String message) { - super("RPC error " + code + (message != null ? (": " + message) : "")); - } -} diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index 7923d2f..a03538a 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -12,6 +12,8 @@ import java.util.Comparator; import java.util.EnumSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.Spliterator; import java.util.concurrent.ExecutionException; @@ -22,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; @@ -30,6 +33,7 @@ import java.util.function.ToIntFunction; import java.util.function.ToLongFunction; import java.util.stream.Collector; import java.util.stream.Collector.Characteristics; +import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.jetbrains.annotations.NotNull; @@ -43,6 +47,8 @@ public class StreamUtils { private static final Collector TO_LIST_FAKE_COLLECTOR = new FakeCollector(); private static final Collector COUNT_FAKE_COLLECTOR = new FakeCollector(); + private static final Collector FIRST_FAKE_COLLECTOR = new FakeCollector(); + private static final Collector ANY_FAKE_COLLECTOR = new FakeCollector(); private static final Set CH_NOID = Collections.emptySet(); private static final Set CH_CONCURRENT_NOID = Collections.unmodifiableSet(EnumSet.of( @@ -68,12 +74,22 @@ public class StreamUtils { public static Collector> fastListing() { //noinspection unchecked - return (Collector>) TO_LIST_FAKE_COLLECTOR; + return (Collector>) TO_LIST_FAKE_COLLECTOR; } public static Collector fastCounting() { //noinspection unchecked - return (Collector) COUNT_FAKE_COLLECTOR; + return (Collector) COUNT_FAKE_COLLECTOR; + } + + public static Collector> fastFirst() { + //noinspection unchecked + return (Collector>) FIRST_FAKE_COLLECTOR; + } + + public static Collector> fastAny() { + //noinspection unchecked + return (Collector>) ANY_FAKE_COLLECTOR; } @SafeVarargs @@ -141,8 +157,22 @@ public class StreamUtils { return Streams.stream(it); } + @SuppressWarnings("DataFlowIssue") + @NotNull public static List toList(Stream stream) { - return collect(stream, fastListing()); + return StreamUtils.collect(stream, fastListing()); + } + + @SuppressWarnings("DataFlowIssue") + @NotNull + public static Optional toFirst(Stream stream) { + return StreamUtils.collect(stream, fastFirst()); + } + + @SuppressWarnings("DataFlowIssue") + @NotNull + public static Optional toAny(Stream stream) { + return StreamUtils.collect(stream, fastAny()); } @SuppressWarnings("DataFlowIssue") @@ -158,6 +188,16 @@ public class StreamUtils { return collectOn(forkJoinPool, stream, fastCounting()); } + @NotNull + public static Optional toFirstOn(ForkJoinPool forkJoinPool, Stream stream) { + return collectOn(forkJoinPool, stream, fastFirst()); + } + + @NotNull + public static Optional toAnyOn(ForkJoinPool forkJoinPool, Stream stream) { + return collectOn(forkJoinPool, stream, fastAny()); + } + /** * Collects and closes the stream on the specified pool */ @@ -196,6 +236,18 @@ public class StreamUtils { } else { return (R) (Long) 0L; } + } else if (collector == FIRST_FAKE_COLLECTOR) { + if (stream != null) { + return (R) stream.findFirst(); + } else { + return (R) Optional.empty(); + } + } else if (collector == ANY_FAKE_COLLECTOR) { + if (stream != null) { + return (R) stream.findAny(); + } else { + return (R) Optional.empty(); + } } else if (stream == null) { throw new NullPointerException("Stream is null"); } else if (collector == SUMMING_LONG_COLLECTOR) { @@ -253,6 +305,10 @@ public class StreamUtils { return SUMMING_LONG_COLLECTOR; } + public static Stream indexed(Stream stream, BiFunction mapper) { + return Streams.mapWithIndex(stream, mapper::apply); + } + private record BatchSpliterator(Spliterator base, int batchSize) implements Spliterator> { @Override diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index fcbdafb..bf24a73 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -37,15 +37,10 @@ module dbengine { requires org.apache.lucene.codecs; requires org.apache.lucene.backward_codecs; requires lucene.relevance; - requires io.netty.buffer; - requires io.netty.transport; - requires io.netty.codec; requires org.apache.lucene.facet; requires java.management; requires com.ibm.icu; requires org.apache.lucene.analysis.icu; - requires io.netty.handler; - requires io.netty.common; requires org.apache.lucene.queryparser; requires okio; requires moshi.records.reflect; diff --git a/src/test/java/it/cavallium/dbengine/tests/DbTestUtils.java b/src/test/java/it/cavallium/dbengine/tests/DbTestUtils.java index f8cb425..d932eeb 100644 --- a/src/test/java/it/cavallium/dbengine/tests/DbTestUtils.java +++ b/src/test/java/it/cavallium/dbengine/tests/DbTestUtils.java @@ -2,8 +2,6 @@ package it.cavallium.dbengine.tests; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.netty.util.ResourceLeakDetector; -import io.netty.util.ResourceLeakDetector.Level; import it.cavallium.dbengine.buffers.BufDataInput; import it.cavallium.dbengine.buffers.BufDataOutput; import it.cavallium.dbengine.client.LuceneIndex; @@ -79,8 +77,7 @@ public class DbTestUtils { SwappableLuceneSearcher swappableLuceneSearcher, Path path) {} - public static void ensureNoLeaks(boolean printStats, boolean useClassicException) { - ResourceLeakDetector.setLevel(Level.PARANOID); + public static void ensureNoLeaks() { System.gc(); } diff --git a/src/test/java/it/cavallium/dbengine/tests/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/tests/LocalTemporaryDbGenerator.java index 13ca9d2..6ddfbb6 100644 --- a/src/test/java/it/cavallium/dbengine/tests/LocalTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/tests/LocalTemporaryDbGenerator.java @@ -102,7 +102,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { tempDb.swappableLuceneSearcher().close(); tempDb.luceneMulti().close(); tempDb.luceneSingle().close(); - ensureNoLeaks(false, false); + ensureNoLeaks(); if (Files.exists(tempDb.path())) { try (var walk = Files.walk(tempDb.path())) { walk.sorted(Comparator.reverseOrder()).forEach(file -> { diff --git a/src/test/java/it/cavallium/dbengine/tests/TestDictionary.java b/src/test/java/it/cavallium/dbengine/tests/TestDictionary.java index e1a6a95..bfb27be 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestDictionary.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestDictionary.java @@ -25,13 +25,13 @@ public abstract class TestDictionary { @BeforeEach public void beforeEach() { - ensureNoLeaks(false, false); + ensureNoLeaks(); } @AfterEach public void afterEach() { if (!isCIMode()) { - ensureNoLeaks(true, false); + ensureNoLeaks(); } } diff --git a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java index 8b4d115..a3db0e9 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMap.java @@ -88,13 +88,13 @@ public abstract class TestDictionaryMap { @BeforeEach public void beforeEach() { - ensureNoLeaks(false, false); + ensureNoLeaks(); } @AfterEach public void afterEach() { if (!isCIMode() && checkLeaks) { - ensureNoLeaks(true, false); + ensureNoLeaks(); } } @@ -368,7 +368,7 @@ public abstract class TestDictionaryMap { var entriesFlux = entries.entrySet(); var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); - map.setAllValues(entriesFlux.stream()); + map.setAllEntries(entriesFlux.stream()); List> resultsFlux; try (var stream = map.getMulti(null, keysFlux.stream())) { resultsFlux = stream.toList(); @@ -396,8 +396,8 @@ public abstract class TestDictionaryMap { var remainingEntries = new ArrayList>(); var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); - return Arrays.asList(toList(map.setAllValuesAndGetPrevious(entries.entrySet().stream())), - toList(map.setAllValuesAndGetPrevious(entries.entrySet().stream())) + return Arrays.asList(toList(map.setAllEntriesAndGetPrevious(entries.entrySet().stream())), + toList(map.setAllEntriesAndGetPrevious(entries.entrySet().stream())) ); })); if (shouldFail) { @@ -499,7 +499,7 @@ public abstract class TestDictionaryMap { var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { var map = tempDatabaseMapDictionaryMap(tempDictionary(db, updateMode), mapType, 5); map.putMulti(entries.entrySet().stream()); - return toList(map.getAllValues(null, false)); + return toList(map.getAllEntries(null, false)); })); if (shouldFail) { this.checkLeaks = false; diff --git a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java index 40a5970..09f6656 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeep.java @@ -158,13 +158,13 @@ public abstract class TestDictionaryMapDeep { @BeforeEach public void beforeEach() { - ensureNoLeaks(false, false); + ensureNoLeaks(); } @AfterEach public void afterEach() { if (!isCIMode() && checkLeaks) { - ensureNoLeaks(true, false); + ensureNoLeaks(); } } @@ -220,7 +220,7 @@ public abstract class TestDictionaryMapDeep { var result = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); map.putValue(key, value); - try (var stream = map.getAllValues(null, false)) { + try (var stream = map.getAllEntries(null, false)) { return stream.toList(); } })); @@ -249,7 +249,7 @@ public abstract class TestDictionaryMapDeep { return stages .flatMap(stage -> stage .getValue() - .getAllValues(null, false) + .getAllEntries(null, false) .map(r -> new Tuple3<>(stage.getKey(), r.getKey(), r.getValue()))) .toList(); } @@ -626,7 +626,7 @@ public abstract class TestDictionaryMapDeep { var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); var entriesFlux = entries.entrySet(); var keysFlux = entriesFlux.stream().map(Entry::getKey).toList(); - map.setAllValues(entries.entrySet().stream()); + map.setAllEntries(entries.entrySet().stream()); try (var resultsFlux = map.getMulti(null, entries.keySet().stream())) { return Streams .zip(keysFlux.stream(), resultsFlux, Map::entry) @@ -652,11 +652,11 @@ public abstract class TestDictionaryMapDeep { var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); List>> a1; - try (var stream1 = map.setAllValuesAndGetPrevious(entries.entrySet().stream())) { + try (var stream1 = map.setAllEntriesAndGetPrevious(entries.entrySet().stream())) { a1 = stream1.toList(); } List>> a2; - try (var stream2 = map.setAllValuesAndGetPrevious(entries.entrySet().stream())) { + try (var stream2 = map.setAllEntriesAndGetPrevious(entries.entrySet().stream())) { a2 = stream2.toList(); } return List.of(a1, a2); @@ -769,7 +769,7 @@ public abstract class TestDictionaryMapDeep { var stpVer = run(shouldFail, () -> tempDb(getTempDbGenerator(), db -> { var map = tempDatabaseMapDictionaryDeepMap(tempDictionary(db, updateMode), 5, 6); map.putMulti(entries.entrySet().stream()); - try (var values = map.getAllValues(null, false)) { + try (var values = map.getAllEntries(null, false)) { return values.toList(); } })); diff --git a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java index 5ddd055..e07f2e5 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestDictionaryMapDeepHashMap.java @@ -88,13 +88,13 @@ public abstract class TestDictionaryMapDeepHashMap { @BeforeEach public void beforeEach() { - ensureNoLeaks(false, false); + ensureNoLeaks(); } @AfterEach public void afterEach() { if (!isCIMode() && checkLeaks) { - ensureNoLeaks(true, false); + ensureNoLeaks(); } } @@ -105,7 +105,7 @@ public abstract class TestDictionaryMapDeepHashMap { var map = tempDatabaseMapDictionaryDeepMapHashMap(tempDictionary(db, updateMode), 5); map.at(null, key1).putValue(key2, value); return toList(map - .getAllValues(null, false) + .getAllEntries(null, false) .map(Entry::getValue) .flatMap(maps -> maps.entrySet().stream()) .map(Entry::getValue)); diff --git a/src/test/java/it/cavallium/dbengine/tests/TestLLDictionary.java b/src/test/java/it/cavallium/dbengine/tests/TestLLDictionary.java index 90b972a..511d342 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestLLDictionary.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestLLDictionary.java @@ -39,7 +39,7 @@ public abstract class TestLLDictionary { @BeforeEach public void beforeEach() throws IOException { - ensureNoLeaks(false, false); + ensureNoLeaks(); tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(), "TempDB"); db = tempDb.db(); } @@ -47,7 +47,7 @@ public abstract class TestLLDictionary { @AfterEach public void afterEach() throws IOException { getTempDbGenerator().closeTempDb(tempDb); - ensureNoLeaks(true, false); + ensureNoLeaks(); } public static Stream provideArguments() { diff --git a/src/test/java/it/cavallium/dbengine/tests/TestLLDictionaryLeaks.java b/src/test/java/it/cavallium/dbengine/tests/TestLLDictionaryLeaks.java index d897a64..6e3d3d0 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestLLDictionaryLeaks.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestLLDictionaryLeaks.java @@ -34,7 +34,7 @@ public abstract class TestLLDictionaryLeaks { @BeforeEach public void beforeEach() throws IOException { - ensureNoLeaks(false, false); + ensureNoLeaks(); tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(), "TempDB"); db = tempDb.db(); } @@ -42,7 +42,7 @@ public abstract class TestLLDictionaryLeaks { @AfterEach public void afterEach() throws IOException { getTempDbGenerator().closeTempDb(tempDb); - ensureNoLeaks(true, false); + ensureNoLeaks(); } public static Stream provideArguments() { diff --git a/src/test/java/it/cavallium/dbengine/tests/TestLuceneIndex.java b/src/test/java/it/cavallium/dbengine/tests/TestLuceneIndex.java index 590295d..6463ff4 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestLuceneIndex.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestLuceneIndex.java @@ -52,7 +52,7 @@ public class TestLuceneIndex { @BeforeEach public void beforeEach() throws IOException { - ensureNoLeaks(false, false); + ensureNoLeaks(); tempDb = Objects.requireNonNull(getTempDbGenerator().openTempDb(), "TempDB"); luceneSingle = tempDb.luceneSingle(); luceneMulti = tempDb.luceneMulti(); @@ -123,7 +123,7 @@ public class TestLuceneIndex { @AfterEach public void afterEach() throws IOException { getTempDbGenerator().closeTempDb(tempDb); - ensureNoLeaks(true, false); + ensureNoLeaks(); } @AfterAll diff --git a/src/test/java/it/cavallium/dbengine/tests/TestLuceneSearches.java b/src/test/java/it/cavallium/dbengine/tests/TestLuceneSearches.java index effc959..cb9f7d2 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestLuceneSearches.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestLuceneSearches.java @@ -5,7 +5,6 @@ import static it.cavallium.dbengine.tests.DbTestUtils.ensureNoLeaks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -import io.netty.buffer.PooledByteBufAllocator; import it.cavallium.dbengine.tests.DbTestUtils.TempDb; import it.cavallium.dbengine.tests.TestLuceneIndex.Tuple2; import it.cavallium.dbengine.client.HitKey; @@ -71,9 +70,6 @@ public class TestLuceneSearches { private static final Map ELEMENTS; static { - // Start the pool by creating and deleting a direct buffer - PooledByteBufAllocator.DEFAULT.directBuffer().release(); - var modifiableElements = new LinkedHashMap(); modifiableElements.put("test-key-1", "0123456789"); modifiableElements.put("test-key-2", "test 0123456789 test word"); @@ -96,7 +92,7 @@ public class TestLuceneSearches { @BeforeAll public static void beforeAll() throws IOException { - ensureNoLeaks(false, false); + ensureNoLeaks(); tempDb = Objects.requireNonNull(TEMP_DB_GENERATOR.openTempDb(), "TempDB"); luceneSingle = tempDb.luceneSingle(); luceneMulti = tempDb.luceneMulti(); @@ -190,7 +186,7 @@ public class TestLuceneSearches { @AfterAll public static void afterAll() throws IOException { TEMP_DB_GENERATOR.closeTempDb(tempDb); - ensureNoLeaks(true, false); + ensureNoLeaks(); } private LuceneIndex getLuceneIndex(boolean shards, @Nullable LocalSearcher customSearcher) { diff --git a/src/test/java/it/cavallium/dbengine/tests/TestSingletons.java b/src/test/java/it/cavallium/dbengine/tests/TestSingletons.java index c8d3f37..fd43486 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestSingletons.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestSingletons.java @@ -43,12 +43,12 @@ public abstract class TestSingletons { @BeforeEach public void beforeEach() { - ensureNoLeaks(false, false); + ensureNoLeaks(); } @AfterEach public void afterEach() { - ensureNoLeaks(true, false); + ensureNoLeaks(); } @Test diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java index acc18b9..13dd3c4 100644 --- a/src/test/java/module-info.java +++ b/src/test/java/module-info.java @@ -6,13 +6,11 @@ module dbengine.tests { requires org.apache.lucene.core; requires it.unimi.dsi.fastutil; requires org.apache.lucene.queryparser; - requires io.netty.common; requires org.jetbrains.annotations; requires micrometer.core; requires org.junit.jupiter.params; requires com.google.common; requires org.apache.logging.log4j; - requires io.netty.buffer; requires org.apache.commons.lang3; requires rocksdbjni; opens it.cavallium.dbengine.tests;