This commit is contained in:
Andrea Cavalli 2022-11-22 18:44:45 +01:00
parent 6aa7bb6040
commit 36b76d81ed
4 changed files with 5 additions and 6 deletions

View File

@ -7,5 +7,5 @@ import reactor.core.publisher.Mono;
public interface DatabaseOperations { public interface DatabaseOperations {
Mono<Void> ingestSST(Column column, Publisher<Path> files); Mono<Void> ingestSST(Column column, Publisher<Path> files, boolean replaceExisting);
} }

View File

@ -704,7 +704,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
} }
@Override @Override
public Mono<Void> ingestSST(Column column, Publisher<Path> files) { public Mono<Void> ingestSST(Column column, Publisher<Path> files, boolean replaceExisting) {
var columnHandle = handles.get(column); var columnHandle = handles.get(column);
if (columnHandle == null) { if (columnHandle == null) {
logger.warn("Column {} doesn't exist", column); logger.warn("Column {} doesn't exist", column);
@ -712,7 +712,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa
} }
return Flux.from(files).concatMap(sst -> Mono.fromCallable(() -> { return Flux.from(files).concatMap(sst -> Mono.fromCallable(() -> {
try (var opts = new IngestExternalFileOptions()) { try (var opts = new IngestExternalFileOptions()) {
opts.setIngestBehind(true); opts.setIngestBehind(!replaceExisting);
opts.setSnapshotConsistency(false); opts.setSnapshotConsistency(false);
opts.setAllowBlockingFlush(true); opts.setAllowBlockingFlush(true);
opts.setMoveFiles(true); opts.setMoveFiles(true);

View File

@ -232,7 +232,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
} }
@Override @Override
public Mono<Void> ingestSST(Column column, Publisher<Path> files) { public Mono<Void> ingestSST(Column column, Publisher<Path> files, boolean replaceExisting) {
return Mono.error(new UnsupportedOperationException("Memory db doesn't support SST files")); return Mono.error(new UnsupportedOperationException("Memory db doesn't support SST files"));
} }
} }

View File

@ -6,7 +6,6 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.incubator.codec.quic.QuicSslContextBuilder; import io.netty.incubator.codec.quic.QuicSslContextBuilder;
import io.netty5.buffer.Buffer; import io.netty5.buffer.Buffer;
import io.netty5.buffer.BufferAllocator; import io.netty5.buffer.BufferAllocator;
import io.netty5.util.Send;
import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.client.MemoryStats;
import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams; import it.cavallium.dbengine.client.query.current.data.QueryParams;
@ -235,7 +234,7 @@ public class LLQuicConnection implements LLDatabaseConnection {
.map(id -> new LLKeyValueDatabase() { .map(id -> new LLKeyValueDatabase() {
@Override @Override
public Mono<Void> ingestSST(Column column, Publisher<Path> files) { public Mono<Void> ingestSST(Column column, Publisher<Path> files, boolean replaceExisting) {
return null; return null;
} }