Add update method to single values

This commit is contained in:
Andrea Cavalli 2021-11-12 02:05:44 +01:00
parent 42c4b6e651
commit 23d5f700fb
6 changed files with 165 additions and 12 deletions

View File

@ -1,11 +1,23 @@
package it.cavallium.dbengine.database; package it.cavallium.dbengine.database;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface LLSingleton extends LLKeyValueDatabaseStructure { public interface LLSingleton extends LLKeyValueDatabaseStructure {
BufferAllocator getAllocator();
Mono<byte[]> get(@Nullable LLSnapshot snapshot); Mono<byte[]> get(@Nullable LLSnapshot snapshot);
Mono<Void> set(byte[] value); Mono<Void> set(byte[] value);
Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode);
} }

View File

@ -5,6 +5,7 @@ import com.google.common.primitives.Longs;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateReturnMode;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -26,6 +27,39 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure {
}); });
} }
public Mono<Long> incrementAndGet() {
return incrementAnd(UpdateReturnMode.GET_NEW_VALUE);
}
public Mono<Long> getAndIncrement() {
return incrementAnd(UpdateReturnMode.GET_OLD_VALUE);
}
private Mono<Long> incrementAnd(UpdateReturnMode updateReturnMode) {
return singleton.update(prev -> {
if (prev != null) {
try (var prevBuf = prev.receive()) {
var prevLong = prevBuf.readLong();
var alloc = singleton.getAllocator();
try (var buf = alloc.allocate(Long.BYTES)) {
buf.writeLong(prevLong + 1);
return buf;
}
}
} else {
var alloc = singleton.getAllocator();
try (var buf = alloc.allocate(Long.BYTES)) {
buf.writeLong(1);
return buf;
}
}
}, updateReturnMode).map(send -> {
try (var buf = send.receive()) {
return buf.readLong();
}
}).single();
}
public Mono<Void> set(long value) { public Mono<Void> set(long value) {
return singleton.set(Longs.toByteArray(value)); return singleton.set(Longs.toByteArray(value));
} }

View File

@ -473,8 +473,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@Override @Override
public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) { public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) {
return Mono return Mono
.fromCallable(() -> new LLLocalSingleton(db, .fromCallable(() -> new LLLocalSingleton(
getCfh(singletonListColumnName), getRocksDBColumn(db, getCfh(singletonListColumnName)),
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
LLLocalKeyValueDatabase.this.name, LLLocalKeyValueDatabase.this.name,
name, name,

View File

@ -1,50 +1,72 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions; import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot; import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public class LLLocalSingleton implements LLSingleton { public class LLLocalSingleton implements LLSingleton {
private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions(); static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
private final RocksDB db; static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
private final ColumnFamilyHandle cfh; private final RocksDBColumn db;
private final Function<LLSnapshot, Snapshot> snapshotResolver; private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final byte[] name; private final byte[] name;
private final Mono<Send<Buffer>> nameMono;
private final String databaseName; private final String databaseName;
private final Scheduler dbScheduler; private final Scheduler dbScheduler;
public LLLocalSingleton(RocksDB db, ColumnFamilyHandle singletonListColumn, public LLLocalSingleton(RocksDBColumn db,
Function<LLSnapshot, Snapshot> snapshotResolver, Function<LLSnapshot, Snapshot> snapshotResolver,
String databaseName, String databaseName,
byte[] name, byte[] name,
Scheduler dbScheduler, Scheduler dbScheduler,
byte[] defaultValue) throws RocksDBException { byte[] defaultValue) throws RocksDBException {
this.db = db; this.db = db;
this.cfh = singletonListColumn;
this.databaseName = databaseName; this.databaseName = databaseName;
this.snapshotResolver = snapshotResolver; this.snapshotResolver = snapshotResolver;
this.name = name; this.name = name;
this.nameMono = Mono.fromCallable(() -> {
var alloc = db.getAllocator();
try (var nameBuf = alloc.allocate(this.name.length)) {
nameBuf.writeBytes(this.name);
return nameBuf.send();
}
});
this.dbScheduler = dbScheduler; this.dbScheduler = dbScheduler;
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Initialized in a nonblocking thread"); throw new UnsupportedOperationException("Initialized in a nonblocking thread");
} }
if (db.get(cfh, this.name) == null) { if (db.get(EMPTY_READ_OPTIONS, this.name, true) == null) {
db.put(cfh, this.name, defaultValue); db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue);
} }
} }
private <T> @NotNull Mono<T> runOnDb(Callable<@Nullable T> callable) {
return Mono.fromCallable(callable).subscribeOn(dbScheduler);
}
private ReadOptions resolveSnapshot(LLSnapshot snapshot) { private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
if (snapshot != null) { if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot));
@ -53,6 +75,11 @@ public class LLLocalSingleton implements LLSingleton {
} }
} }
@Override
public BufferAllocator getAllocator() {
return db.getAllocator();
}
@Override @Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) { public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
return Mono return Mono
@ -60,7 +87,7 @@ public class LLLocalSingleton implements LLSingleton {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called get in a nonblocking thread"); throw new UnsupportedOperationException("Called get in a nonblocking thread");
} }
return db.get(cfh, resolveSnapshot(snapshot), name); return db.get(resolveSnapshot(snapshot), name, true);
}) })
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
@ -73,13 +100,36 @@ public class LLLocalSingleton implements LLSingleton {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called set in a nonblocking thread"); throw new UnsupportedOperationException("Called set in a nonblocking thread");
} }
db.put(cfh, name, value); db.put(EMPTY_WRITE_OPTIONS, name, value);
return null; return null;
}) })
.onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause)) .onErrorMap(cause -> new IOException("Failed to write " + Arrays.toString(name), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
@Override
public Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode) {
return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
case NOTHING -> UpdateAtomicResultMode.NOTHING;
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater,
true, returnMode);
return switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
}
@Override @Override
public String getDatabaseName() { public String getDatabaseName() {
return databaseName; return databaseName;

View File

@ -4,6 +4,7 @@ import io.micrometer.core.instrument.MeterRegistry;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.RepeatedElementList; import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction;
@ -26,6 +27,23 @@ import org.rocksdb.WriteOptions;
public sealed interface RocksDBColumn permits AbstractRocksDBColumn { public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
default byte @Nullable [] get(@NotNull ReadOptions readOptions,
byte[] key,
boolean existsAlmostCertainly)
throws RocksDBException {
var allocator = getAllocator();
try (var keyBuf = allocator.allocate(key.length)) {
keyBuf.writeBytes(key);
var result = this.get(readOptions, keyBuf.send(), existsAlmostCertainly);
if (result == null) {
return null;
}
try (var resultBuf = result.receive()) {
return LLUtils.toArray(resultBuf);
}
}
}
@Nullable @Nullable
Send<Buffer> get(@NotNull ReadOptions readOptions, Send<Buffer> keySend, Send<Buffer> get(@NotNull ReadOptions readOptions, Send<Buffer> keySend,
boolean existsAlmostCertainly) throws RocksDBException; boolean existsAlmostCertainly) throws RocksDBException;
@ -35,6 +53,19 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
void put(@NotNull WriteOptions writeOptions, Send<Buffer> keyToReceive, void put(@NotNull WriteOptions writeOptions, Send<Buffer> keyToReceive,
Send<Buffer> valueToReceive) throws RocksDBException; Send<Buffer> valueToReceive) throws RocksDBException;
default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value)
throws RocksDBException {
var allocator = getAllocator();
try (var keyBuf = allocator.allocate(key.length)) {
keyBuf.writeBytes(key);
try (var valBuf = allocator.allocate(value.length)) {
valBuf.writeBytes(value);
this.put(writeOptions, keyBuf.send(), valBuf.send());
}
}
}
@NotNull RocksIterator newIterator(@NotNull ReadOptions readOptions); @NotNull RocksIterator newIterator(@NotNull ReadOptions readOptions);
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions, @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions,
@ -45,7 +76,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException; void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException;
List<byte[]> multiGetAsList(ReadOptions resolveSnapshot, List<byte[]> keys) throws RocksDBException; List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException;
void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException; void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException;

View File

@ -1,13 +1,28 @@
package it.cavallium.dbengine.database.memory; package it.cavallium.dbengine.database.memory;
import io.net5.buffer.api.Buffer; import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send; import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionaryResultType; import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResult;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultCurrent;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultMode;
import it.cavallium.dbengine.database.disk.UpdateAtomicResultPrevious;
import it.cavallium.dbengine.database.serialization.SerializationException;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class LLMemorySingleton implements LLSingleton { public class LLMemorySingleton implements LLSingleton {
@ -30,6 +45,11 @@ public class LLMemorySingleton implements LLSingleton {
return dict.getDatabaseName(); return dict.getDatabaseName();
} }
@Override
public BufferAllocator getAllocator() {
return dict.getAllocator();
}
@Override @Override
public Mono<byte[]> get(@Nullable LLSnapshot snapshot) { public Mono<byte[]> get(@Nullable LLSnapshot snapshot) {
return dict return dict
@ -49,4 +69,10 @@ public class LLMemorySingleton implements LLSingleton {
.put(bbKey, bbVal, LLDictionaryResultType.VOID) .put(bbKey, bbVal, LLDictionaryResultType.VOID)
.then(); .then();
} }
@Override
public Mono<Send<Buffer>> update(SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
UpdateReturnMode updateReturnMode) {
return dict.update(singletonNameBufMono, updater, updateReturnMode, true);
}
} }