This commit is contained in:
Andrea Cavalli 2021-06-19 12:14:14 +02:00
parent 6039241d2b
commit b1d48d36ff
3 changed files with 37 additions and 2 deletions

View File

@ -16,6 +16,7 @@ public class LLRange {
private static final LLRange RANGE_ALL = new LLRange(null, null);
private final ByteBuf min;
private final ByteBuf max;
private volatile int refCnt = 1;
private LLRange(ByteBuf min, ByteBuf max) {
assert min == null || min.refCnt() > 0;
@ -49,12 +50,18 @@ public class LLRange {
}
public boolean isAll() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
return min == null && max == null;
}
public boolean isSingle() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
if (min == null || max == null) return false;
@ -62,12 +69,18 @@ public class LLRange {
}
public boolean hasMin() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
return min != null;
}
public ByteBuf getMin() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
assert min != null;
@ -75,12 +88,18 @@ public class LLRange {
}
public boolean hasMax() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
return max != null;
}
public ByteBuf getMax() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
assert max != null;
@ -88,6 +107,9 @@ public class LLRange {
}
public ByteBuf getSingle() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
assert min == null || min.refCnt() > 0;
assert max == null || max.refCnt() > 0;
assert isSingle();
@ -122,6 +144,9 @@ public class LLRange {
}
public LLRange retain() {
if (refCnt <= 0) {
throw new IllegalStateException("Released");
}
if (min != null) {
min.retain();
}
@ -132,6 +157,10 @@ public class LLRange {
}
public void release() {
refCnt--;
if (refCnt < 0) {
throw new IllegalStateException("Already released");
}
if (min != null) {
min.release();
}

View File

@ -61,7 +61,7 @@ public class DatabaseSingleBucket<K, V, TH> implements DatabaseStageEntry<V> {
return bucketStage
.update(oldBucket -> {
V oldValue = extractValue(oldBucket);
V newValue = updater.apply(oldValue);
V newValue = updater.apply(oldValue);
if (newValue == null) {
return this.removeValueOrDelete(oldBucket);

View File

@ -7,6 +7,7 @@ import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.ColumnFamilyHandle;
@ -22,6 +23,7 @@ import static it.cavallium.dbengine.database.disk.LLLocalDictionary.logger;
public abstract class LLLocalReactiveRocksIterator<T> {
private final AtomicBoolean released = new AtomicBoolean(false);
private final RocksDB db;
private final ByteBufAllocator alloc;
private final ColumnFamilyHandle cfh;
@ -97,6 +99,10 @@ public abstract class LLLocalReactiveRocksIterator<T> {
public abstract T getEntry(ByteBuf key, ByteBuf value);
public void release() {
range.release();
if (released.compareAndSet(false, true)) {
range.release();
} else {
throw new IllegalStateException("Already released");
}
}
}