diff --git a/src/main/java/it/cavallium/dbengine/client/Backuppable.java b/src/main/java/it/cavallium/dbengine/client/Backuppable.java new file mode 100644 index 0000000..a83a6c9 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/Backuppable.java @@ -0,0 +1,57 @@ +package it.cavallium.dbengine.client; + +import java.util.concurrent.atomic.AtomicInteger; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; + +public abstract class Backuppable implements IBackuppable { + + public enum State { + RUNNING, PAUSING, PAUSED, RESUMING, STOPPED + } + + private final AtomicInteger state = new AtomicInteger(); + + @Override + public final Mono pauseForBackup() { + return Mono.defer(() -> { + if (state.compareAndSet(State.RUNNING.ordinal(), State.PAUSING.ordinal())) { + return onPauseForBackup().doFinally(type -> state.compareAndSet(State.PAUSING.ordinal(), + type == SignalType.ON_ERROR ? State.RUNNING.ordinal() : State.PAUSED.ordinal() + )); + } else { + return Mono.empty(); + } + }); + } + + @Override + public final Mono resumeAfterBackup() { + return Mono.defer(() -> { + if (state.compareAndSet(State.PAUSED.ordinal(), State.RESUMING.ordinal())) { + return onResumeAfterBackup().doFinally(type -> state.compareAndSet(State.RESUMING.ordinal(), + type == SignalType.ON_ERROR ? State.PAUSED.ordinal() : State.RUNNING.ordinal() + )); + } else { + return Mono.empty(); + } + }); + } + + @Override + public final boolean isPaused() { + return state.get() == State.PAUSED.ordinal(); + } + + public final State getState() { + return State.values()[state.get()]; + } + + protected abstract Mono onPauseForBackup(); + + protected abstract Mono onResumeAfterBackup(); + + public final void setStopped() { + state.set(State.STOPPED.ordinal()); + } +} diff --git a/src/main/java/it/cavallium/dbengine/client/IBackuppable.java b/src/main/java/it/cavallium/dbengine/client/IBackuppable.java new file mode 100644 index 0000000..2427f07 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/client/IBackuppable.java @@ -0,0 +1,12 @@ +package it.cavallium.dbengine.client; + +import reactor.core.publisher.Mono; + +public interface IBackuppable { + + Mono pauseForBackup(); + + Mono resumeAfterBackup(); + + boolean isPaused(); +} diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index 8b8623f..590b117 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -4,6 +4,7 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.buffer.api.BufferAllocator; +import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.database.collections.DatabaseInt; import it.cavallium.dbengine.database.collections.DatabaseLong; @@ -14,7 +15,8 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure, DatabaseProperties { +public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseStructure, DatabaseProperties, + IBackuppable { Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte @Nullable[] defaultValue); diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java index e96c490..f49d4fa 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.database; import com.google.common.collect.Multimap; +import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; @@ -16,7 +17,7 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface LLLuceneIndex extends LLSnapshottable, SafeCloseable { +public interface LLLuceneIndex extends LLSnapshottable, IBackuppable, SafeCloseable { String getLuceneIndexName(); diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java index a7729ed..0378b38 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiLuceneIndex.java @@ -1,6 +1,8 @@ package it.cavallium.dbengine.database; +import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; +import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.rpc.current.data.IndicizerAnalyzers; import it.cavallium.dbengine.rpc.current.data.IndicizerSimilarities; import it.cavallium.dbengine.client.query.current.data.Query; @@ -242,4 +244,24 @@ public class LLMultiLuceneIndex implements LLLuceneIndex { }) .then(); } + + @Override + public Mono pauseForBackup() { + return Mono.whenDelayError(Iterables.transform(this.luceneIndicesSet, IBackuppable::pauseForBackup)); + } + + @Override + public Mono resumeAfterBackup() { + return Mono.whenDelayError(Iterables.transform(this.luceneIndicesSet, IBackuppable::resumeAfterBackup)); + } + + @Override + public boolean isPaused() { + for (LLLuceneIndex llLuceneIndex : this.luceneIndicesSet) { + if (llLuceneIndex.isPaused()) { + return true; + } + } + return false; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 0b20e9d..686304a 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -12,6 +12,7 @@ import io.micrometer.core.instrument.Timer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.util.internal.PlatformDependent; import it.cavallium.data.generator.nativedata.NullableString; +import it.cavallium.dbengine.client.Backuppable; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.database.ColumnProperty; import it.cavallium.dbengine.database.ColumnUtils; @@ -98,7 +99,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { +public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDatabase { private static final boolean DELETE_LOG_FILES = false; private static final boolean FOLLOW_ROCKSDB_OPTIMIZATIONS = true; @@ -687,6 +688,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { }).subscribeOn(dbWScheduler); } + @Override + protected Mono onPauseForBackup() { + return pauseWrites(); + } + + @Override + protected Mono onResumeAfterBackup() { + return resumeWrites(); + } + private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) { var compressionType = levelOptions.compression().getType(); @@ -1617,6 +1628,22 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .subscribeOn(dbWScheduler); } + private Mono pauseWrites() { + return Mono.fromCallable(() -> { + db.pauseBackgroundWork(); + db.disableFileDeletions(); + return null; + }).subscribeOn(dbWScheduler); + } + + private Mono resumeWrites() { + return Mono.fromCallable(() -> { + db.continueBackgroundWork(); + db.enableFileDeletions(false); + return null; + }).subscribeOn(dbWScheduler); + } + /** * Call this method ONLY AFTER flushing completely a db and closing it! */ diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 706f1f2..fbb0ff2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -14,6 +14,8 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; +import it.cavallium.dbengine.client.Backuppable; +import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.Query; import it.cavallium.dbengine.client.query.current.data.QueryParams; @@ -82,7 +84,7 @@ import reactor.core.publisher.SignalType; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex, LuceneCloseable { +public class LLLocalLuceneIndex extends SimpleResource implements IBackuppable, LLLuceneIndex, LuceneCloseable { protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class); @@ -138,6 +140,7 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex, private final Similarity luceneSimilarity; private final LuceneRocksDBManager rocksDBManager; private final Directory directory; + private final LuceneBackuppable backuppable; private final boolean lowMemory; private final Phaser activeTasks = new Phaser(1); @@ -280,6 +283,8 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex, var commitMillis = luceneOptions.commitDebounceTime().toMillis(); luceneHeavyTasksScheduler.schedulePeriodically(this::scheduledCommit, commitMillis, commitMillis, TimeUnit.MILLISECONDS); + + this.backuppable = new LuceneBackuppable(); } private Similarity getLuceneSimilarity() { @@ -848,4 +853,42 @@ public class LLLocalLuceneIndex extends SimpleResource implements LLLuceneIndex, public int hashCode() { return shardName.hashCode(); } + + @Override + public Mono pauseForBackup() { + return backuppable.pauseForBackup(); + } + + @Override + public Mono resumeAfterBackup() { + return backuppable.resumeAfterBackup(); + } + + @Override + public boolean isPaused() { + return backuppable.isPaused(); + } + + private class LuceneBackuppable extends Backuppable { + + private LLSnapshot snapshot; + + @Override + protected Mono onPauseForBackup() { + return LLLocalLuceneIndex.this.takeSnapshot().doOnSuccess(snapshot -> { + if (snapshot == null) { + logger.error("Can't pause index \"{}\" because snapshots are not enabled!", shardName); + } + this.snapshot = snapshot; + }).then(); + } + + @Override + protected Mono onResumeAfterBackup() { + if (snapshot == null) { + return Mono.empty(); + } + return LLLocalLuceneIndex.this.releaseSnapshot(snapshot); + } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java index cae6584..7769f70 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalMultiLuceneIndex.java @@ -3,9 +3,11 @@ package it.cavallium.dbengine.database.disk; import static it.cavallium.dbengine.client.UninterruptibleScheduler.uninterruptibleScheduler; import static it.cavallium.dbengine.lucene.LuceneUtils.luceneScheduler; +import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import io.micrometer.core.instrument.MeterRegistry; import io.netty5.util.Send; +import it.cavallium.dbengine.client.IBackuppable; import it.cavallium.dbengine.client.query.QueryParser; import it.cavallium.dbengine.client.query.current.data.NoSort; import it.cavallium.dbengine.client.query.current.data.Query; @@ -415,4 +417,24 @@ public class LLLocalMultiLuceneIndex extends SimpleResource implements LLLuceneI public boolean isLowMemoryMode() { return lowMemory; } + + @Override + public Mono pauseForBackup() { + return Mono.whenDelayError(Iterables.transform(this.luceneIndicesSet, IBackuppable::pauseForBackup)); + } + + @Override + public Mono resumeAfterBackup() { + return Mono.whenDelayError(Iterables.transform(this.luceneIndicesSet, IBackuppable::resumeAfterBackup)); + } + + @Override + public boolean isPaused() { + for (LLLuceneIndex llLuceneIndex : this.luceneIndicesSet) { + if (llLuceneIndex.isPaused()) { + return true; + } + } + return false; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index 8425baf..a2b1c8a 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -213,4 +213,19 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { .fromCallable(() -> snapshots.remove(snapshot.getSequenceNumber())) .then(); } + + @Override + public Mono pauseForBackup() { + return Mono.empty(); + } + + @Override + public Mono resumeAfterBackup() { + return Mono.empty(); + } + + @Override + public boolean isPaused() { + return false; + } } diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index b3a9a68..4de7c20 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -231,6 +231,7 @@ public class LLQuicConnection implements LLDatabaseConnection { .cast(GeneratedEntityId.class) .map(GeneratedEntityId::id) .map(id -> new LLKeyValueDatabase() { + @Override public Mono getSingleton(byte[] singletonListColumnName, byte[] name, @@ -423,6 +424,21 @@ public class LLQuicConnection implements LLDatabaseConnection { public Mono releaseSnapshot(LLSnapshot snapshot) { return null; } + + @Override + public Mono pauseForBackup() { + return Mono.empty(); + } + + @Override + public Mono resumeAfterBackup() { + return Mono.empty(); + } + + @Override + public boolean isPaused() { + return false; + } }); } @@ -446,6 +462,7 @@ public class LLQuicConnection implements LLDatabaseConnection { .cast(GeneratedEntityId.class) .map(GeneratedEntityId::id) .map(id -> new LLLuceneIndex() { + @Override public String getLuceneIndexName() { return clusterName; @@ -543,6 +560,21 @@ public class LLQuicConnection implements LLDatabaseConnection { public Mono releaseSnapshot(LLSnapshot snapshot) { return null; } + + @Override + public Mono pauseForBackup() { + return null; + } + + @Override + public Mono resumeAfterBackup() { + return null; + } + + @Override + public boolean isPaused() { + return false; + } }); }