Handle multi-subscriber refcnt

This commit is contained in:
Andrea Cavalli 2021-05-12 01:25:59 +02:00
parent 2f404c477e
commit f4242218da

View File

@ -193,6 +193,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) { public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
StampedLock lock; StampedLock lock;
@ -218,7 +219,11 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
private ByteBuf dbGet(ColumnFamilyHandle cfh, private ByteBuf dbGet(ColumnFamilyHandle cfh,
@ -359,6 +364,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) { public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono return Mono
.defer(() -> { .defer(() -> {
if (range.isSingle()) { if (range.isSingle()) {
@ -368,10 +374,15 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.map(isContained -> !isContained) .map(isContained -> !isContained)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, LLRange range) { public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot); var readOpts = resolveSnapshot(snapshot);
@ -412,10 +423,15 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause)) .onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, ByteBuf key) { private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, ByteBuf key) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
StampedLock lock; StampedLock lock;
@ -448,11 +464,16 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause)) .onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
@Override @Override
public Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) { public Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) {
try {
return Mono return Mono
.defer(() -> getPreviousData(key.retain(), resultType)) .defer(() -> getPreviousData(key.retain(), resultType))
.concatWith(Mono .concatWith(Mono
@ -483,10 +504,18 @@ public class LLLocalDictionary implements LLDictionary {
.onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause)) .onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toString(key), cause))
) )
.singleOrEmpty() .singleOrEmpty()
.doOnSubscribe(s -> {
key.retain();
value.retain();
})
.doFinally(s -> { .doFinally(s -> {
key.release(); key.release();
value.release(); value.release();
}); });
} finally {
key.release();
value.release();
}
} }
@Override @Override
@ -501,6 +530,7 @@ public class LLLocalDictionary implements LLDictionary {
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
UpdateReturnMode updateReturnMode, UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
if (updateMode == UpdateMode.DISALLOW) { if (updateMode == UpdateMode.DISALLOW) {
@ -616,7 +646,11 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
// Remember to change also update() if you are modifying this function // Remember to change also update() if you are modifying this function
@ -625,6 +659,7 @@ public class LLLocalDictionary implements LLDictionary {
public Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key, public Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater, Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed");
@ -732,7 +767,11 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause)) .onErrorMap(cause -> new IOException("Failed to read or write " + (key.refCnt() > 0 ? LLUtils.toString(key) : "(released)"), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key) private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key)
@ -754,6 +793,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType) { public Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType) {
try {
return Mono return Mono
.defer(() -> getPreviousData(key.retain(), resultType)) .defer(() -> getPreviousData(key.retain(), resultType))
.concatWith(Mono .concatWith(Mono
@ -783,11 +823,17 @@ public class LLLocalDictionary implements LLDictionary {
.onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toString(key), cause)) .onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toString(key), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.then(Mono.empty()) .then(Mono.empty())
).singleOrEmpty() )
.singleOrEmpty()
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
private Mono<ByteBuf> getPreviousData(ByteBuf key, LLDictionaryResultType resultType) { private Mono<ByteBuf> getPreviousData(ByteBuf key, LLDictionaryResultType resultType) {
try {
return Mono return Mono
.defer(() -> { .defer(() -> {
switch (resultType) { switch (resultType) {
@ -844,7 +890,11 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.error(new IllegalStateException("Unexpected value: " + resultType)); return Mono.error(new IllegalStateException("Unexpected value: " + resultType));
} }
}) })
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
@Override @Override
@ -1016,21 +1066,11 @@ public class LLLocalDictionary implements LLDictionary {
}); });
} }
@NotNull
private Mono<Void> putEntryToWriteBatch(Entry<ByteBuf, ByteBuf> newEntry, CappedWriteBatch writeBatch) {
return Mono
.<Void>fromCallable(() -> {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
return null;
})
.subscribeOn(dbScheduler);
}
@Override @Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot, public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
LLRange range, LLRange range,
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
try {
return Flux return Flux
.defer(() -> { .defer(() -> {
if (range.isSingle()) { if (range.isSingle()) {
@ -1039,13 +1079,18 @@ public class LLLocalDictionary implements LLDictionary {
return getRangeMulti(snapshot, range.retain()); return getRangeMulti(snapshot, range.retain());
} }
}) })
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
@Override @Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot, public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range, LLRange range,
int prefixLength, boolean existsAlmostCertainly) { int prefixLength, boolean existsAlmostCertainly) {
try {
return Flux return Flux
.defer(() -> { .defer(() -> {
if (range.isSingle()) { if (range.isSingle()) {
@ -1054,18 +1099,28 @@ public class LLLocalDictionary implements LLDictionary {
return getRangeMultiGrouped(snapshot, range.retain(), prefixLength); return getRangeMultiGrouped(snapshot, range.retain(), prefixLength);
} }
}) })
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
private Flux<Entry<ByteBuf, ByteBuf>> getRangeSingle(LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) { private Flux<Entry<ByteBuf, ByteBuf>> getRangeSingle(LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
try {
return Mono return Mono
.defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly)) .defer(() -> this.get(snapshot, key.retain(), existsAlmostCertainly))
.map(value -> Map.entry(key.retain(), value)) .map(value -> Map.entry(key.retain(), value))
.flux() .flux()
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
private Flux<Entry<ByteBuf, ByteBuf>> getRangeMulti(LLSnapshot snapshot, LLRange range) { private Flux<Entry<ByteBuf, ByteBuf>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
try {
return Flux return Flux
.using( .using(
() -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)), () -> new LLLocalEntryReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)),
@ -1079,10 +1134,15 @@ public class LLLocalDictionary implements LLDictionary {
castedEntry.getValue().release(); castedEntry.getValue().release();
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
private Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) { private Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
try {
return Flux return Flux
.using( .using(
() -> new LLLocalGroupedEntryReactiveRocksIterator(db, () -> new LLLocalGroupedEntryReactiveRocksIterator(db,
@ -1097,11 +1157,16 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalGroupedReactiveRocksIterator::release LLLocalGroupedReactiveRocksIterator::release
) )
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
@Override @Override
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) { public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Flux return Flux
.defer(() -> { .defer(() -> {
if (range.isSingle()) { if (range.isSingle()) {
@ -1110,11 +1175,16 @@ public class LLLocalDictionary implements LLDictionary {
return this.getRangeKeysMulti(snapshot, range.retain()); return this.getRangeKeysMulti(snapshot, range.retain());
} }
}) })
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
@Override @Override
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
try {
return Flux return Flux
.using( .using(
() -> new LLLocalGroupedKeyReactiveRocksIterator(db, () -> new LLLocalGroupedKeyReactiveRocksIterator(db,
@ -1129,11 +1199,16 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalGroupedReactiveRocksIterator::release LLLocalGroupedReactiveRocksIterator::release
) )
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
@Override @Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) { public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
try {
return Flux return Flux
.using( .using(
() -> new LLLocalKeyPrefixReactiveRocksIterator(db, () -> new LLLocalKeyPrefixReactiveRocksIterator(db,
@ -1149,10 +1224,15 @@ public class LLLocalDictionary implements LLDictionary {
LLLocalKeyPrefixReactiveRocksIterator::release LLLocalKeyPrefixReactiveRocksIterator::release
) )
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
private Flux<ByteBuf> getRangeKeysSingle(LLSnapshot snapshot, ByteBuf key) { private Flux<ByteBuf> getRangeKeysSingle(LLSnapshot snapshot, ByteBuf key) {
try {
return Mono return Mono
.defer(() -> this.containsKey(snapshot, key.retain())) .defer(() -> this.containsKey(snapshot, key.retain()))
.flux() .flux()
@ -1164,10 +1244,15 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.doOnDiscard(ByteBuf.class, ReferenceCounted::release) .doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.doOnSubscribe(s -> key.retain())
.doFinally(s -> key.release()); .doFinally(s -> key.release());
} finally {
key.release();
}
} }
private Flux<ByteBuf> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) { private Flux<ByteBuf> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
try {
return Flux return Flux
.using( .using(
() -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)), () -> new LLLocalKeyReactiveRocksIterator(db, alloc, cfh, range.retain(), resolveSnapshot(snapshot)),
@ -1176,11 +1261,16 @@ public class LLLocalDictionary implements LLDictionary {
) )
.doOnDiscard(ByteBuf.class, ReferenceCounted::release) .doOnDiscard(ByteBuf.class, ReferenceCounted::release)
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
@Override @Override
public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) { public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) {
try {
if (USE_WINDOW_IN_SET_RANGE) { if (USE_WINDOW_IN_SET_RANGE) {
return Mono return Mono
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
@ -1295,6 +1385,7 @@ public class LLLocalDictionary implements LLDictionary {
) )
.then() .then()
.onErrorMap(cause -> new IOException("Failed to write range", cause)) .onErrorMap(cause -> new IOException("Failed to write range", cause))
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} else { } else {
if (USE_WRITE_BATCHES_IN_SET_RANGE) { if (USE_WRITE_BATCHES_IN_SET_RANGE) {
@ -1322,8 +1413,12 @@ public class LLLocalDictionary implements LLDictionary {
.then(Mono.<Void>empty()) .then(Mono.<Void>empty())
) )
.onErrorMap(cause -> new IOException("Failed to write range", cause)) .onErrorMap(cause -> new IOException("Failed to write range", cause))
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} }
} finally {
range.release();
}
} }
//todo: this is broken, check why //todo: this is broken, check why
@ -1532,6 +1627,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) { public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) {
try {
Mono<Long> result; Mono<Long> result;
if (range.isAll()) { if (range.isAll()) {
result = Mono result = Mono
@ -1586,11 +1682,15 @@ public class LLLocalDictionary implements LLDictionary {
+ range.toString(), cause)) + range.toString(), cause))
.subscribeOn(dbScheduler); .subscribeOn(dbScheduler);
} }
return result.doFinally(s -> range.release()); return result.doOnSubscribe(s -> range.retain()).doFinally(s -> range.release());
} finally {
range.release();
}
} }
@Override @Override
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, LLRange range) { public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot); var readOpts = resolveSnapshot(snapshot);
@ -1636,11 +1736,16 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
@Override @Override
public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) { public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot); var readOpts = resolveSnapshot(snapshot);
@ -1678,7 +1783,11 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
private long fastSizeAll(@Nullable LLSnapshot snapshot) { private long fastSizeAll(@Nullable LLSnapshot snapshot) {
@ -1784,6 +1893,7 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range) { public Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range) {
try {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
var readOpts = getReadOptions(null); var readOpts = getReadOptions(null);
@ -1822,7 +1932,11 @@ public class LLLocalDictionary implements LLDictionary {
}) })
.onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause)) .onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause))
.subscribeOn(dbScheduler) .subscribeOn(dbScheduler)
.doOnSubscribe(s -> range.retain())
.doFinally(s -> range.release()); .doFinally(s -> range.release());
} finally {
range.release();
}
} }
@NotNull @NotNull