package it.cavallium.dbengine.database.collections; import io.net5.buffer.api.Drop; import io.net5.buffer.api.Owned; import io.net5.buffer.api.Send; import it.cavallium.dbengine.client.BadBlock; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLEntry; import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LiveResourceSupport; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.unimi.dsi.fastutil.objects.ObjectArraySet; import it.unimi.dsi.fastutil.objects.ObjectSets; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.function.BiFunction; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.functional.TriFunction; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @SuppressWarnings("unused") public class DatabaseSingleBucket extends LiveResourceSupport, DatabaseSingleBucket> implements DatabaseStageEntry { private final K key; private DatabaseStageEntry>> bucketStage; public DatabaseSingleBucket(DatabaseStageEntry>> bucketStage, K key, Drop> drop) { super(new CloseOnDrop<>(drop)); this.key = key; this.bucketStage = bucketStage; } private DatabaseSingleBucket(Send>>> bucketStage, K key, Drop> drop) { super(new CloseOnDrop<>(drop)); this.key = key; this.bucketStage = (DatabaseStageEntry>>) bucketStage.receive(); } @Override public Mono get(@Nullable CompositeSnapshot snapshot, boolean existsAlmostCertainly) { return bucketStage.get(snapshot, existsAlmostCertainly).flatMap(this::extractValueTransformation); } @Override public Mono getOrDefault(@Nullable CompositeSnapshot snapshot, Mono defaultValue) { return bucketStage.get(snapshot).flatMap(this::extractValueTransformation).switchIfEmpty(defaultValue); } @Override public Mono set(V value) { return this.update(prev -> value, UpdateReturnMode.NOTHING).then(); } @Override public Mono setAndGetPrevious(V value) { return this.update(prev -> value, UpdateReturnMode.GET_OLD_VALUE); } @Override public Mono setAndGetChanged(V value) { return this.updateAndGetDelta(prev -> value).map(LLUtils::isDeltaChanged); } @Override public Mono update(SerializationFunction<@Nullable V, @Nullable V> updater, UpdateReturnMode updateReturnMode, boolean existsAlmostCertainly) { return bucketStage .update(oldBucket -> { V oldValue = extractValue(oldBucket); V newValue = updater.apply(oldValue); if (newValue == null) { return this.removeValueOrDelete(oldBucket); } else { return this.insertValueOrCreate(oldBucket, newValue); } }, updateReturnMode, existsAlmostCertainly) .flatMap(this::extractValueTransformation); } @Override public Mono> updateAndGetDelta(SerializationFunction<@Nullable V, @Nullable V> updater, boolean existsAlmostCertainly) { return bucketStage .updateAndGetDelta(oldBucket -> { V oldValue = extractValue(oldBucket); var result = updater.apply(oldValue); if (result == null) { return this.removeValueOrDelete(oldBucket); } else { return this.insertValueOrCreate(oldBucket, result); } }, existsAlmostCertainly) .transform(mono -> LLUtils.mapDelta(mono, this::extractValue)); } @Override public Mono clear() { return this.update(prev -> null, UpdateReturnMode.NOTHING).then(); } @Override public Mono clearAndGetPrevious() { return this.update(prev -> null, UpdateReturnMode.GET_OLD_VALUE); } @Override public Mono clearAndGetStatus() { return this.updateAndGetDelta(prev -> null).map(LLUtils::isDeltaChanged); } @Override public Mono leavesCount(@Nullable CompositeSnapshot snapshot, boolean fast) { return this.get(snapshot).map(prev -> 1L).defaultIfEmpty(0L); } @Override public Mono isEmpty(@Nullable CompositeSnapshot snapshot) { return this.get(snapshot).map(prev -> true).defaultIfEmpty(true); } @Override public DatabaseStageEntry entry() { return this; } @Override public Flux badBlocks() { return bucketStage.badBlocks(); } private Mono extractValueTransformation(Set> entries) { return Mono.fromCallable(() -> extractValue(entries)); } @Nullable private V extractValue(Set> entries) { if (entries == null) return null; for (Entry entry : entries) { if (Objects.equals(entry.getKey(), key)) { return entry.getValue(); } } return null; } @NotNull private ObjectArraySet> insertValueOrCreate(@Nullable ObjectArraySet> entries, V value) { if (entries != null) { var clonedEntries = entries.clone(); var it = clonedEntries.iterator(); while (it.hasNext()) { var entry = it.next(); if (Objects.equals(entry.getKey(), key)) { it.remove(); break; } } clonedEntries.add(Map.entry(key, value)); return clonedEntries; } else { var oas = new ObjectArraySet>(1); oas.add(Map.entry(key, value)); return oas; } } @Nullable private ObjectArraySet> removeValueOrDelete(@Nullable ObjectArraySet> entries) { if (entries != null) { var clonedEntries = entries.clone(); var it = clonedEntries.iterator(); while (it.hasNext()) { var entry = it.next(); if (Objects.equals(entry.getKey(), key)) { it.remove(); break; } } if (clonedEntries.size() == 0) { return null; } else { return clonedEntries; } } else { return null; } } @Override protected RuntimeException createResourceClosedException() { throw new IllegalStateException("Closed"); } @Override protected Owned> prepareSend() { var bucketStage = this.bucketStage.send(); return drop -> new DatabaseSingleBucket<>(bucketStage, key, drop); } @Override protected void makeInaccessible() { this.bucketStage = null; } private static class CloseOnDrop implements Drop> { private final Drop> delegate; public CloseOnDrop(Drop> drop) { if (drop instanceof CloseOnDrop closeOnDrop) { this.delegate = closeOnDrop.delegate; } else { this.delegate = drop; } } @Override public void drop(DatabaseSingleBucket obj) { obj.bucketStage.close(); delegate.drop(obj); } } }