From 36b76d81ed272d1b60f273d3b135d241c5d92978 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 22 Nov 2022 18:44:45 +0100 Subject: [PATCH] flag --- .../it/cavallium/dbengine/database/DatabaseOperations.java | 2 +- .../dbengine/database/disk/LLLocalKeyValueDatabase.java | 4 ++-- .../dbengine/database/memory/LLMemoryKeyValueDatabase.java | 2 +- .../cavallium/dbengine/database/remote/LLQuicConnection.java | 3 +-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java b/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java index 2864d55..0347bd5 100644 --- a/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java +++ b/src/main/java/it/cavallium/dbengine/database/DatabaseOperations.java @@ -7,5 +7,5 @@ import reactor.core.publisher.Mono; public interface DatabaseOperations { - Mono ingestSST(Column column, Publisher files); + Mono ingestSST(Column column, Publisher files, boolean replaceExisting); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index d0f96f7..b42f0ee 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -704,7 +704,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa } @Override - public Mono ingestSST(Column column, Publisher files) { + public Mono ingestSST(Column column, Publisher files, boolean replaceExisting) { var columnHandle = handles.get(column); if (columnHandle == null) { logger.warn("Column {} doesn't exist", column); @@ -712,7 +712,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa } return Flux.from(files).concatMap(sst -> Mono.fromCallable(() -> { try (var opts = new IngestExternalFileOptions()) { - opts.setIngestBehind(true); + opts.setIngestBehind(!replaceExisting); opts.setSnapshotConsistency(false); opts.setAllowBlockingFlush(true); opts.setMoveFiles(true); diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index 5b9d353..22f5ae4 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -232,7 +232,7 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { } @Override - public Mono ingestSST(Column column, Publisher files) { + public Mono ingestSST(Column column, Publisher files, boolean replaceExisting) { return Mono.error(new UnsupportedOperationException("Memory db doesn't support SST files")); } } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index 31bcd86..142e055 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -6,7 +6,6 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.incubator.codec.quic.QuicSslContextBuilder; import io.netty5.buffer.Buffer; import io.netty5.buffer.BufferAllocator; -import io.netty5.util.Send; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; @@ -235,7 +234,7 @@ public class LLQuicConnection implements LLDatabaseConnection { .map(id -> new LLKeyValueDatabase() { @Override - public Mono ingestSST(Column column, Publisher files) { + public Mono ingestSST(Column column, Publisher files, boolean replaceExisting) { return null; }