package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Drop; import io.net5.buffer.api.Owned; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLUtils; import io.net5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.objects.ObjectArraySet; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") public class DatabaseSingleBucket extends ResourceSupport, DatabaseSingleBucket> implements DatabaseStageEntry { private static final Logger logger = LogManager.getLogger(DatabaseSingleBucket.class); private static final Drop> DROP = new Drop<>() { @Override public void drop(DatabaseSingleBucket obj) { try { if (obj.bucketStage != null) { obj.bucketStage.close(); } } catch (Throwable ex) { logger.error("Failed to close bucketStage", ex); } try { if (obj.onClose != null) { obj.onClose.run(); } } catch (Throwable ex) { logger.error("Failed to close onClose", ex); } } @Override public Drop> fork() { return this; } @Override public void attach(DatabaseSingleBucket obj) { } }; private final K key; private DatabaseStageEntry>> bucketStage; private Runnable onClose; @SuppressWarnings({"unchecked", "rawtypes"}) public DatabaseSingleBucket(DatabaseStageEntry>> bucketStage, K key, Runnable onClose) { super((Drop>) (Drop) DROP); this.key = key; this.bucketStage = bucketStage; this.onClose = onClose; } @SuppressWarnings({"unchecked", "rawtypes"}) private DatabaseSingleBucket(Send>>> bucketStage, K key, Runnable onClose) { super((Drop>) (Drop) DROP); this.key = key; this.bucketStage = (DatabaseStageEntry>>) bucketStage.receive(); this.onClose = onClose; } @Override public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return bucketStage.get(snapshot, existsAlmostCertainly).flatMap(this::extractValueTransformation); } @Override public Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue) { return bucketStage.get(snapshot).flatMap(this::extractValueTransformation).switchIfEmpty(defaultValue); } @Override public Mono set(V value) { return this.update(prev -> value, UpdateReturnMode.NOTHING).then(); } @Override public Mono setAndGetPrevious(V value) { return this.update(prev -> value, UpdateReturnMode.GET_OLD_VALUE); } @Override public Mono setAndGetChanged(V value) { return this.updateAndGetDelta(prev -> value).map(LLUtils::isDeltaChanged); } @Override public Mono update(SerializationFunction<@Nullable V, @Nullable V> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return bucketStage .update(oldBucket -> { V oldValue = extractValue(oldBucket); V newValue = updater.apply(oldValue); if (newValue == null) { return this.removeValueOrDelete(oldBucket); } else { return this.insertValueOrCreate(oldBucket, newValue); } }, updateReturnMode, existsAlmostCertainly) .flatMap(this::extractValueTransformation); } @Override public Mono> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater, boolean existsAlmostCertainly) { return bucketStage .updateAndGetDelta(oldBucket -> { V oldValue = extractValue(oldBucket); var result = updater.apply(oldValue); if (result == null) { return this.removeValueOrDelete(oldBucket); } else { return this.insertValueOrCreate(oldBucket, result); } }, existsAlmostCertainly) .transform(mono -> LLUtils.mapDelta(mono, this::extractValue)); } @Override public Mono clear() { return this.update(prev -> null, UpdateReturnMode.NOTHING).then(); } @Override public Mono clearAndGetPrevious() { return this.update(prev -> null, UpdateReturnMode.GET_OLD_VALUE); } @Override public Mono clearAndGetStatus() { return this.updateAndGetDelta(prev -> null).map(LLUtils::isDeltaChanged); } @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return this.get(snapshot).map(prev -> 1L).defaultIfEmpty(0L); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { return this.get(snapshot).map(prev -> true).defaultIfEmpty(true); } @Override public DatabaseStageEntry entry() { return this; } @Override public Flux badBlocks() { return bucketStage.badBlocks(); } private Mono extractValueTransformation(Set> entries) { return Mono.fromCallable(() -> extractValue(entries)); } @Nullable private V extractValue(Set> entries) { if (entries == null) return null; for (Entry entry : entries) { if (Objects.equals(entry.getKey(), key)) { return entry.getValue(); } } return null; } @NotNull private ObjectArraySet> insertValueOrCreate(@Nullable ObjectArraySet> entries, V value) { if (entries != null) { var clonedEntries = entries.clone(); var it = clonedEntries.iterator(); while (it.hasNext()) { var entry = it.next(); if (Objects.equals(entry.getKey(), key)) { it.remove(); break; } } clonedEntries.add(Map.entry(key, value)); return clonedEntries; } else { var oas = new ObjectArraySet>(1); oas.add(Map.entry(key, value)); return oas; } } @Nullable private ObjectArraySet> removeValueOrDelete(@Nullable ObjectArraySet> entries) { if (entries != null) { var clonedEntries = entries.clone(); var it = clonedEntries.iterator(); while (it.hasNext()) { var entry = it.next(); if (Objects.equals(entry.getKey(), key)) { it.remove(); break; } } if (clonedEntries.size() == 0) { return null; } else { return clonedEntries; } } else { return null; } } @Override protected RuntimeException createResourceClosedException() { throw new IllegalStateException("Closed"); } @Override protected Owned> prepareSend() { var bucketStage = this.bucketStage.send(); var onClose = this.onClose; return drop -> { var instance = new DatabaseSingleBucket(bucketStage, key, onClose); drop.attach(instance); return instance; }; } @Override protected void makeInaccessible() { this.bucketStage = null; this.onClose = null; } }