Implement db maintenance operations

This commit is contained in:
Andrea Cavalli 2022-11-22 17:36:31 +01:00
parent 889f59772c
commit 6aa7bb6040
6 changed files with 50 additions and 7 deletions

View File

@ -2,11 +2,12 @@ package it.cavallium.dbengine.client;
import io.micrometer.core.instrument.MeterRegistry;
import io.netty5.buffer.BufferAllocator;
import it.cavallium.dbengine.database.DatabaseOperations;
import it.cavallium.dbengine.database.DatabaseProperties;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface CompositeDatabase extends DatabaseProperties {
public interface CompositeDatabase extends DatabaseProperties, DatabaseOperations {
Mono<Void> preClose();

View File

@ -0,0 +1,11 @@
package it.cavallium.dbengine.database;
import it.cavallium.dbengine.rpc.current.data.Column;
import java.nio.file.Path;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
public interface DatabaseOperations {
Mono<Void> ingestSST(Column column, Publisher<Path> files);
}

View File

@ -16,7 +16,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure, DatabaseProperties,
IBackuppable {
IBackuppable, DatabaseOperations {
Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte @Nullable[] defaultValue);

View File

@ -33,7 +33,6 @@ import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions;
import it.cavallium.dbengine.rpc.current.data.NoFilter;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@ -56,12 +55,12 @@ import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ChecksumType;
import org.rocksdb.ClockCache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
@ -77,9 +76,6 @@ import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.LogFile;
import org.rocksdb.MutableDBOptions;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache;
import org.rocksdb.PrepopulateBlobCache;
@ -707,6 +703,27 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
return resumeWrites();
}
@Override
public Mono<Void> ingestSST(Column column, Publisher<Path> files) {
var columnHandle = handles.get(column);
if (columnHandle == null) {
logger.warn("Column {} doesn't exist", column);
return Mono.empty();
}
return Flux.from(files).concatMap(sst -> Mono.fromCallable(() -> {
try (var opts = new IngestExternalFileOptions()) {
opts.setIngestBehind(true);
opts.setSnapshotConsistency(false);
opts.setAllowBlockingFlush(true);
opts.setMoveFiles(true);
db.ingestExternalFile(columnHandle, List.of(sst.toString()), opts);
} catch (RocksDBException ex) {
return new IOException("Failed to ingest SST file " + sst, ex);
}
return null;
})).then();
}
private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {}
private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) {
var compressionType = levelOptions.compression().getType();

View File

@ -17,12 +17,14 @@ import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.rpc.current.data.Column;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -228,4 +230,9 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
public boolean isPaused() {
return false;
}
@Override
public Mono<Void> ingestSST(Column column, Publisher<Path> files) {
return Mono.error(new UnsupportedOperationException("Memory db doesn't support SST files"));
}
}

View File

@ -63,6 +63,7 @@ import it.cavallium.dbengine.rpc.current.data.nullables.NullableLLSnapshot;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.File;
import java.net.SocketAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@ -71,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.incubator.quic.QuicClient;
@ -232,6 +234,11 @@ public class LLQuicConnection implements LLDatabaseConnection {
.map(GeneratedEntityId::id)
.map(id -> new LLKeyValueDatabase() {
@Override
public Mono<Void> ingestSST(Column column, Publisher<Path> files) {
return null;
}
@Override
public Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName,
byte[] name,