diff --git a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java index 65135c0..4eb2b18 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java @@ -1,11 +1,23 @@ package it.cavallium.dbengine.database; +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.unimi.dsi.fastutil.bytes.ByteList; +import java.util.function.Function; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; public interface LLSingleton extends LLKeyValueDatabaseStructure { + + BufferAllocator getAllocator(); + Mono get(@Nullable LLSnapshot snapshot); Mono set(byte[] value); + + Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + UpdateReturnMode updateReturnMode); } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java index bc5d52d..57a0f90 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java @@ -5,6 +5,7 @@ import com.google.common.primitives.Longs; import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.UpdateReturnMode; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; @@ -26,6 +27,39 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure { }); } + public Mono incrementAndGet() { + return incrementAnd(UpdateReturnMode.GET_NEW_VALUE); + } + + public Mono getAndIncrement() { + return incrementAnd(UpdateReturnMode.GET_OLD_VALUE); + } + + private Mono incrementAnd(UpdateReturnMode updateReturnMode) { + return singleton.update(prev -> { + if (prev != null) { + try (var prevBuf = prev.receive()) { + var prevLong = prevBuf.readLong(); + var alloc = singleton.getAllocator(); + try (var buf = alloc.allocate(Long.BYTES)) { + buf.writeLong(prevLong + 1); + return buf; + } + } + } else { + var alloc = singleton.getAllocator(); + try (var buf = alloc.allocate(Long.BYTES)) { + buf.writeLong(1); + return buf; + } + } + }, updateReturnMode).map(send -> { + try (var buf = send.receive()) { + return buf.readLong(); + } + }).single(); + } + public Mono set(long value) { return singleton.set(Longs.toByteArray(value)); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 49b1db7..068a171 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -473,8 +473,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @Override public Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) { return Mono - .fromCallable(() -> new LLLocalSingleton(db, - getCfh(singletonListColumnName), + .fromCallable(() -> new LLLocalSingleton( + getRocksDBColumn(db, getCfh(singletonListColumnName)), (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), LLLocalKeyValueDatabase.this.name, name, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 71d49e3..bbb6086 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -1,50 +1,72 @@ package it.cavallium.dbengine.database.disk; +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; +import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.serialization.SerializationFunction; +import it.unimi.dsi.fastutil.bytes.ByteList; import java.io.IOException; import java.util.Arrays; +import java.util.concurrent.Callable; import java.util.function.Function; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; +import org.rocksdb.WriteOptions; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class LLLocalSingleton implements LLSingleton { - private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); - private final RocksDB db; - private final ColumnFamilyHandle cfh; + static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions()); + static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); + private final RocksDBColumn db; private final Function snapshotResolver; private final byte[] name; + private final Mono> nameMono; private final String databaseName; private final Scheduler dbScheduler; - public LLLocalSingleton(RocksDB db, ColumnFamilyHandle singletonListColumn, + public LLLocalSingleton(RocksDBColumn db, Function snapshotResolver, String databaseName, byte[] name, Scheduler dbScheduler, byte[] defaultValue) throws RocksDBException { this.db = db; - this.cfh = singletonListColumn; this.databaseName = databaseName; this.snapshotResolver = snapshotResolver; this.name = name; + this.nameMono = Mono.fromCallable(() -> { + var alloc = db.getAllocator(); + try (var nameBuf = alloc.allocate(this.name.length)) { + nameBuf.writeBytes(this.name); + return nameBuf.send(); + } + }); this.dbScheduler = dbScheduler; if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Initialized in a nonblocking thread"); } - if (db.get(cfh, this.name) == null) { - db.put(cfh, this.name, defaultValue); + if (db.get(EMPTY_READ_OPTIONS, this.name, true) == null) { + db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue); } } + private @NotNull Mono runOnDb(Callable<@Nullable T> callable) { + return Mono.fromCallable(callable).subscribeOn(dbScheduler); + } + private ReadOptions resolveSnapshot(LLSnapshot snapshot) { if (snapshot != null) { return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); @@ -53,6 +75,11 @@ public class LLLocalSingleton implements LLSingleton { } } + @Override + public BufferAllocator getAllocator() { + return db.getAllocator(); + } + @Override public Mono get(@Nullable LLSnapshot snapshot) { return Mono @@ -60,7 +87,7 @@ public class LLLocalSingleton implements LLSingleton { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called get in a nonblocking thread"); } - return db.get(cfh, resolveSnapshot(snapshot), name); + return db.get(resolveSnapshot(snapshot), name, true); }) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) .subscribeOn(dbScheduler); @@ -73,13 +100,36 @@ public class LLLocalSingleton implements LLSingleton { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called set in a nonblocking thread"); } - db.put(cfh, name, value); + db.put(EMPTY_WRITE_OPTIONS, name, value); return null; }) .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)) .subscribeOn(dbScheduler); } + @Override + public Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + UpdateReturnMode updateReturnMode) { + return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called update in a nonblocking thread"); + } + UpdateAtomicResultMode returnMode = switch (updateReturnMode) { + case NOTHING -> UpdateAtomicResultMode.NOTHING; + case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; + case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; + }; + UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, + true, returnMode); + return switch (updateReturnMode) { + case NOTHING -> null; + case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); + case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); + }; + }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), + keySend -> Mono.fromRunnable(keySend::close)); + } + @Override public String getDatabaseName() { return databaseName; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java index 2f16656..51cf683 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBColumn.java @@ -4,6 +4,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.net5.buffer.api.Buffer; import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; @@ -26,6 +27,23 @@ import org.rocksdb.WriteOptions; public sealed interface RocksDBColumn permits AbstractRocksDBColumn { + default byte @Nullable [] get(@NotNull ReadOptions readOptions, + byte[] key, + boolean existsAlmostCertainly) + throws RocksDBException { + var allocator = getAllocator(); + try (var keyBuf = allocator.allocate(key.length)) { + keyBuf.writeBytes(key); + var result = this.get(readOptions, keyBuf.send(), existsAlmostCertainly); + if (result == null) { + return null; + } + try (var resultBuf = result.receive()) { + return LLUtils.toArray(resultBuf); + } + } + } + @Nullable Send get(@NotNull ReadOptions readOptions, Send keySend, boolean existsAlmostCertainly) throws RocksDBException; @@ -35,6 +53,19 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { void put(@NotNull WriteOptions writeOptions, Send keyToReceive, Send valueToReceive) throws RocksDBException; + default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) + throws RocksDBException { + var allocator = getAllocator(); + try (var keyBuf = allocator.allocate(key.length)) { + keyBuf.writeBytes(key); + try (var valBuf = allocator.allocate(value.length)) { + valBuf.writeBytes(value); + + this.put(writeOptions, keyBuf.send(), valBuf.send()); + } + } + } + @NotNull RocksIterator newIterator(@NotNull ReadOptions readOptions); @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, @@ -45,7 +76,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn { void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException; - List multiGetAsList(ReadOptions resolveSnapshot, List keys) throws RocksDBException; + List multiGetAsList(ReadOptions readOptions, List keys) throws RocksDBException; void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException; diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java index 11ddf72..ee15dfc 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java @@ -1,13 +1,28 @@ package it.cavallium.dbengine.database.memory; import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.Send; +import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLUtils; +import it.cavallium.dbengine.database.UpdateMode; +import it.cavallium.dbengine.database.UpdateReturnMode; +import it.cavallium.dbengine.database.disk.UpdateAtomicResult; +import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent; +import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode; +import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious; +import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.database.serialization.SerializationFunction; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class LLMemorySingleton implements LLSingleton { @@ -30,6 +45,11 @@ public class LLMemorySingleton implements LLSingleton { return dict.getDatabaseName(); } + @Override + public BufferAllocator getAllocator() { + return dict.getAllocator(); + } + @Override public Mono get(@Nullable LLSnapshot snapshot) { return dict @@ -49,4 +69,10 @@ public class LLMemorySingleton implements LLSingleton { .put(bbKey, bbVal, LLDictionaryResultType.VOID) .then(); } + + @Override + public Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, + UpdateReturnMode updateReturnMode) { + return dict.update(singletonNameBufMono, updater, updateReturnMode, true); + } }