CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java

183 lines
6.1 KiB
Java
Raw Normal View History

2020-12-07 22:15:18 +01:00
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA;
2022-03-16 13:47:56 +01:00
import io.netty5.buffer.api.Buffer;
import io.netty5.buffer.api.BufferAllocator;
import io.netty5.buffer.api.Send;
2022-03-20 14:33:27 +01:00
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
2021-11-12 02:05:44 +01:00
import it.cavallium.dbengine.database.UpdateReturnMode;
2020-12-07 22:15:18 +01:00
import java.io.IOException;
2021-03-04 22:01:50 +01:00
import java.util.Arrays;
2021-11-12 02:05:44 +01:00
import java.util.concurrent.Callable;
2020-12-07 22:15:18 +01:00
import java.util.function.Function;
2021-11-12 02:05:44 +01:00
import org.jetbrains.annotations.NotNull;
2020-12-07 22:15:18 +01:00
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
2021-11-12 02:05:44 +01:00
import org.rocksdb.WriteOptions;
2021-01-30 10:52:14 +01:00
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
2020-12-07 22:15:18 +01:00
public class LLLocalSingleton implements LLSingleton {
2021-11-12 02:05:44 +01:00
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
private final RocksDBColumn db;
2020-12-07 22:15:18 +01:00
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final byte[] name;
2022-03-20 14:33:27 +01:00
private final String columnName;
2021-11-12 02:05:44 +01:00
private final Mono<Send<Buffer>> nameMono;
2020-12-07 22:15:18 +01:00
private final String databaseName;
2022-04-05 13:58:12 +02:00
private final Scheduler dbWScheduler;
private final Scheduler dbRScheduler;
2020-12-07 22:15:18 +01:00
2021-11-12 02:05:44 +01:00
public LLLocalSingleton(RocksDBColumn db,
2020-12-07 22:15:18 +01:00
Function<LLSnapshot, Snapshot> snapshotResolver,
String databaseName,
byte[] name,
2022-03-20 14:33:27 +01:00
String columnName,
2022-04-05 13:58:12 +02:00
Scheduler dbWScheduler,
Scheduler dbRScheduler,
2022-03-20 14:33:27 +01:00
byte @Nullable [] defaultValue) throws RocksDBException {
2020-12-07 22:15:18 +01:00
this.db = db;
this.databaseName = databaseName;
this.snapshotResolver = snapshotResolver;
this.name = name;
2022-03-20 14:33:27 +01:00
this.columnName = columnName;
2021-11-12 02:05:44 +01:00
this.nameMono = Mono.fromCallable(() -> {
var alloc = db.getAllocator();
try (var nameBuf = alloc.allocate(this.name.length)) {
nameBuf.writeBytes(this.name);
return nameBuf.send();
}
});
2022-04-05 13:58:12 +02:00
this.dbWScheduler = dbWScheduler;
this.dbRScheduler = dbRScheduler;
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Initialized in a nonblocking thread");
}
2022-03-20 14:33:27 +01:00
if (defaultValue != null && db.get(EMPTY_READ_OPTIONS, this.name, true) == null) {
2021-11-12 02:05:44 +01:00
db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue);
2020-12-07 22:15:18 +01:00
}
}
2022-04-05 13:58:12 +02:00
private <T> @NotNull Mono<T> runOnDb(boolean write, Callable<@Nullable T> callable) {
return Mono.fromCallable(callable).subscribeOn(write ? dbWScheduler : dbRScheduler);
2021-11-12 02:05:44 +01:00
}
2020-12-07 22:15:18 +01:00
private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot));
} else {
return EMPTY_READ_OPTIONS;
}
}
2021-11-12 02:05:44 +01:00
@Override
public BufferAllocator getAllocator() {
return db.getAllocator();
}
2020-12-07 22:15:18 +01:00
@Override
2022-03-20 14:33:27 +01:00
public Mono<Send<Buffer>> get(@Nullable LLSnapshot snapshot) {
2022-04-05 13:58:12 +02:00
return nameMono.publishOn(dbRScheduler).handle((nameSend, sink) -> {
2022-03-20 14:33:27 +01:00
try (Buffer name = nameSend.receive()) {
Buffer result = db.get(resolveSnapshot(snapshot), name);
if (result != null) {
sink.next(result.send());
} else {
sink.complete();
}
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to read " + Arrays.toString(name), ex));
}
});
2020-12-07 22:15:18 +01:00
}
@Override
2022-03-20 14:33:27 +01:00
public Mono<Void> set(Mono<Send<Buffer>> valueMono) {
2022-04-05 13:58:12 +02:00
return Mono.zip(nameMono, valueMono).publishOn(dbWScheduler).handle((tuple, sink) -> {
2022-03-20 14:33:27 +01:00
var nameSend = tuple.getT1();
var valueSend = tuple.getT2();
try (Buffer name = nameSend.receive()) {
try (Buffer value = valueSend.receive()) {
2021-11-12 02:05:44 +01:00
db.put(EMPTY_WRITE_OPTIONS, name, value);
2022-03-20 14:33:27 +01:00
sink.next(true);
}
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to write " + Arrays.toString(name), ex));
}
}).switchIfEmpty(unset().thenReturn(true)).then();
}
private Mono<Void> unset() {
2022-04-05 13:58:12 +02:00
return nameMono.publishOn(dbWScheduler).handle((nameSend, sink) -> {
2022-03-20 14:33:27 +01:00
try (Buffer name = nameSend.receive()) {
db.delete(EMPTY_WRITE_OPTIONS, name);
} catch (RocksDBException ex) {
sink.error(new IOException("Failed to read " + Arrays.toString(name), ex));
}
});
2020-12-07 22:15:18 +01:00
}
2021-11-12 02:05:44 +01:00
@Override
public Mono<Send<Buffer>> update(BinarySerializationFunction updater,
2021-11-12 02:05:44 +01:00
UpdateReturnMode updateReturnMode) {
2022-04-05 13:58:12 +02:00
return Mono.usingWhen(nameMono, keySend -> runOnDb(true, () -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResultMode returnMode = switch (updateReturnMode) {
case NOTHING -> UpdateAtomicResultMode.NOTHING;
case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT;
case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS;
};
UpdateAtomicResult result;
try (var key = keySend.receive()) {
result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, returnMode);
}
return switch (updateReturnMode) {
case NOTHING -> null;
case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current();
case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous();
};
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
2021-11-12 02:05:44 +01:00
}
2022-03-20 14:33:27 +01:00
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(BinarySerializationFunction updater) {
2022-04-05 13:58:12 +02:00
return Mono.usingWhen(nameMono, keySend -> runOnDb(true, () -> {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
UpdateAtomicResult result;
try (var key = keySend.receive()) {
result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, key, updater, DELTA);
}
return ((UpdateAtomicResultDelta) result).delta();
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close));
2022-03-20 14:33:27 +01:00
}
2020-12-07 22:15:18 +01:00
@Override
public String getDatabaseName() {
return databaseName;
}
2022-03-20 14:33:27 +01:00
@Override
public String getColumnName() {
return columnName;
}
@Override
public String getName() {
return new String(name);
}
2020-12-07 22:15:18 +01:00
}