package it.cavallium.dbengine.database.disk; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.buffer.api.Send; import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.Callable; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; import org.rocksdb.WriteOptions; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class LLLocalSingleton implements LLSingleton { static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions()); static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions()); private final RocksDBColumn db; private final Function snapshotResolver; private final byte[] name; private final String columnName; private final Mono> nameMono; private final String databaseName; private final Scheduler dbScheduler; public LLLocalSingleton(RocksDBColumn db, Function snapshotResolver, String databaseName, byte[] name, String columnName, Scheduler dbScheduler, byte @Nullable [] defaultValue) throws RocksDBException { this.db = db; this.databaseName = databaseName; this.snapshotResolver = snapshotResolver; this.name = name; this.columnName = columnName; this.nameMono = Mono.fromCallable(() -> { var alloc = db.getAllocator(); try (var nameBuf = alloc.allocate(this.name.length)) { nameBuf.writeBytes(this.name); return nameBuf.send(); } }); this.dbScheduler = dbScheduler; if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Initialized in a nonblocking thread"); } if (defaultValue != null && db.get(EMPTY_READ_OPTIONS, this.name, true) == null) { db.put(EMPTY_WRITE_OPTIONS, this.name, defaultValue); } } private @NotNull Mono runOnDb(Callable<@Nullable T> callable) { return Mono.fromCallable(callable).subscribeOn(dbScheduler); } private ReadOptions resolveSnapshot(LLSnapshot snapshot) { if (snapshot != null) { return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); } else { return EMPTY_READ_OPTIONS; } } @Override public BufferAllocator getAllocator() { return db.getAllocator(); } @Override public Mono> get(@Nullable LLSnapshot snapshot) { return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> { try (Buffer name = nameSend.receive()) { Buffer result = db.get(resolveSnapshot(snapshot), name); if (result != null) { sink.next(result.send()); } else { sink.complete(); } } catch (RocksDBException ex) { sink.error(new IOException("Failed to read " + Arrays.toString(name), ex)); } }); } @Override public Mono set(Mono> valueMono) { return Mono.zip(nameMono, valueMono).publishOn(Schedulers.boundedElastic()).handle((tuple, sink) -> { var nameSend = tuple.getT1(); var valueSend = tuple.getT2(); try (Buffer name = nameSend.receive()) { try (Buffer value = valueSend.receive()) { db.put(EMPTY_WRITE_OPTIONS, name, value); sink.next(true); } } catch (RocksDBException ex) { sink.error(new IOException("Failed to write " + Arrays.toString(name), ex)); } }).switchIfEmpty(unset().thenReturn(true)).then(); } private Mono unset() { return nameMono.publishOn(Schedulers.boundedElastic()).handle((nameSend, sink) -> { try (Buffer name = nameSend.receive()) { db.delete(EMPTY_WRITE_OPTIONS, name); } catch (RocksDBException ex) { sink.error(new IOException("Failed to read " + Arrays.toString(name), ex)); } }); } @Override public Mono> update(SerializationFunction<@Nullable Send, @Nullable Buffer> updater, UpdateReturnMode updateReturnMode) { return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called update in a nonblocking thread"); } UpdateAtomicResultMode returnMode = switch (updateReturnMode) { case NOTHING -> UpdateAtomicResultMode.NOTHING; case GET_NEW_VALUE -> UpdateAtomicResultMode.CURRENT; case GET_OLD_VALUE -> UpdateAtomicResultMode.PREVIOUS; }; UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, returnMode); return switch (updateReturnMode) { case NOTHING -> null; case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); }; }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), keySend -> Mono.fromRunnable(keySend::close)); } @Override public Mono> updateAndGetDelta(SerializationFunction<@Nullable Send, @Nullable Buffer> updater) { return Mono.usingWhen(nameMono, keySend -> runOnDb(() -> { if (Schedulers.isInNonBlockingThread()) { throw new UnsupportedOperationException("Called update in a nonblocking thread"); } UpdateAtomicResult result = db.updateAtomic(EMPTY_READ_OPTIONS, EMPTY_WRITE_OPTIONS, keySend, updater, UpdateAtomicResultMode.DELTA); return ((UpdateAtomicResultDelta) result).delta(); }).onErrorMap(cause -> new IOException("Failed to read or write", cause)), keySend -> Mono.fromRunnable(keySend::close)); } @Override public String getDatabaseName() { return databaseName; } @Override public String getColumnName() { return columnName; } @Override public String getName() { return new String(name); } }