diff --git a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java index 3a91ba5..5540a8f 100644 --- a/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java +++ b/src/main/java/it/cavallium/dbengine/client/CompositeDatabase.java @@ -2,11 +2,12 @@ package it.cavallium.dbengine.client; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.BufferAllocator; +import it.cavallium.dbengine.database.DatabaseOperations; import it.cavallium.dbengine.database.DatabaseProperties; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface CompositeDatabase extends DatabaseProperties { +public interface CompositeDatabase extends DatabaseProperties, DatabaseOperations { Mono preClose(); diff --git a/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java b/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java new file mode 100644 index 0000000..2864d55 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java @@ -0,0 +1,11 @@ +package it.cavallium.dbengine.database; + +import it.cavallium.dbengine.rpc.current.data.Column; +import java.nio.file.Path; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +public interface DatabaseOperations { + + Mono ingestSST(Column column, Publisher files); +} diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index cd8e4c4..f9e5b18 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -16,7 +16,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure, DatabaseProperties, - IBackuppable { + IBackuppable, DatabaseOperations { Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte @Nullable[] defaultValue); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index a699c95..d0f96f7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -33,7 +33,6 @@ import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import it.cavallium.dbengine.rpc.current.data.NoFilter; import java.io.File; import java.io.IOException; -import java.lang.invoke.MethodHandles; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -56,12 +55,12 @@ import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import org.rocksdb.AbstractImmutableNativeReference; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; import org.rocksdb.Cache; import org.rocksdb.ChecksumType; -import org.rocksdb.ClockCache; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -77,9 +76,6 @@ import org.rocksdb.FlushOptions; import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; import org.rocksdb.IngestExternalFileOptions; -import org.rocksdb.LRUCache; -import org.rocksdb.LogFile; -import org.rocksdb.MutableDBOptions; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.PersistentCache; import org.rocksdb.PrepopulateBlobCache; @@ -707,6 +703,27 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa return resumeWrites(); } + @Override + public Mono ingestSST(Column column, Publisher files) { + var columnHandle = handles.get(column); + if (columnHandle == null) { + logger.warn("Column {} doesn't exist", column); + return Mono.empty(); + } + return Flux.from(files).concatMap(sst -> Mono.fromCallable(() -> { + try (var opts = new IngestExternalFileOptions()) { + opts.setIngestBehind(true); + opts.setSnapshotConsistency(false); + opts.setAllowBlockingFlush(true); + opts.setMoveFiles(true); + db.ingestExternalFile(columnHandle, List.of(sst.toString()), opts); + } catch (RocksDBException ex) { + return new IOException("Failed to ingest SST file " + sst, ex); + } + return null; + })).then(); + } + private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) { var compressionType = levelOptions.compression().getType(); diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index c33932e..5b9d353 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -17,12 +17,14 @@ import it.cavallium.dbengine.database.UpdateMode; import it.cavallium.dbengine.rpc.current.data.Column; import it.unimi.dsi.fastutil.bytes.ByteList; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -228,4 +230,9 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { public boolean isPaused() { return false; } + + @Override + public Mono ingestSST(Column column, Publisher files) { + return Mono.error(new UnsupportedOperationException("Memory db doesn't support SST files")); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index a038263..31bcd86 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -63,6 +63,7 @@ import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot; import it.unimi.dsi.fastutil.bytes.ByteList; import java.io.File; import java.net.SocketAddress; +import java.nio.file.Path; import java.time.Duration; import java.util.List; import java.util.Map; @@ -71,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.incubator.quic.QuicClient; @@ -232,6 +234,11 @@ public class LLQuicConnection implements LLDatabaseConnection { .map(GeneratedEntityId::id) .map(id -> new LLKeyValueDatabase() { + @Override + public Mono ingestSST(Column column, Publisher files) { + return null; + } + @Override public Mono getSingleton(byte[] singletonListColumnName, byte[] name,