Fix discard errors
parent abde1d1aab
commit c5d353e02a
@@ -355,10 +355,12 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 				.flatMap(entry -> Mono
 						.fromCallable(() -> serializeEntry(entry.getKey(), entry.getValue()))
 						.doOnDiscard(Entry.class, uncastedEntry -> {
-							//noinspection unchecked
-							var castedEntry = (Entry<ByteBuf, ByteBuf>) uncastedEntry;
-							castedEntry.getKey().release();
-							castedEntry.getValue().release();
+							if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) {
+								byteBuf.release();
+							}
+							if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
+								byteBuf.release();
+							}
 						})
 				);
 		return dictionary
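
Note: the hunk above replaces an unchecked cast to Entry<ByteBuf, ByteBuf> with Java 16+ pattern matching, so the key and value of a discarded entry are released only when they really are Netty ByteBufs. Below is a minimal, self-contained sketch of that pattern, not the repository's code: the class name EntryDiscardSketch and the sample data are invented, and filter() merely stands in for whatever downstream operator ends up discarding elements.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Flux;

public class EntryDiscardSketch {

	public static void main(String[] args) {
		// Invented sample data: key/value pairs backed by reference-counted ByteBufs
		Flux<Map.Entry<ByteBuf, ByteBuf>> entries = Flux.range(0, 4)
				.map(i -> Map.entry(
						Unpooled.buffer(1).writeByte(i),
						Unpooled.copiedBuffer("value-" + i, StandardCharsets.UTF_8)));

		entries
				// entries rejected by this predicate are handed to the discard hook below
				.filter(entry -> entry.getKey().getByte(0) % 2 == 0)
				// doOnDiscard applies to the operators upstream of it; each discarded
				// element arrives untyped, so pattern-match instead of casting blindly
				.doOnDiscard(Map.Entry.class, uncastedEntry -> {
					if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) {
						byteBuf.release();
					}
					if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
						byteBuf.release();
					}
				})
				// entries that survive the filter reach the subscriber, which releases them itself
				.subscribe(entry -> {
					entry.getKey().release();
					entry.getValue().release();
				});
	}
}
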
@@ -372,12 +374,15 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 		Flux<Tuple2<ByteBuf, X>> serializedEntries = entries
 				.flatMap(entry -> Mono
 						.fromCallable(() -> Tuples.of(serializeSuffix(entry.getT1()), entry.getT2()))
-						.doOnDiscard(Entry.class, uncastedEntry -> {
-							//noinspection unchecked
-							var castedEntry = (Tuple2<ByteBuf, Object>) uncastedEntry;
-							castedEntry.getT1().release();
-						})
-				);
+				)
+				.doOnDiscard(Tuple2.class, uncastedEntry -> {
+					if (uncastedEntry.getT1() instanceof ByteBuf byteBuf) {
+						byteBuf.release();
+					}
+					if (uncastedEntry.getT2() instanceof ByteBuf byteBuf) {
+						byteBuf.release();
+					}
+				});
 		var serializedUpdater = getSerializedUpdater(updater);
 		return dictionary.updateMulti(serializedEntries, serializedUpdater)
 				.map(result -> new ExtraKeyOperationResult<>(deserializeSuffix(result.key()),
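
Note: besides the same instanceof check, this hunk moves the discard hook off the inner Mono and registers it once on the outer Flux, keyed on Tuple2 rather than Entry. A hedged sketch of that shape follows; the class name TupleDiscardSketch and the sample data are invented, and only tuple components that are actually ByteBufs are released (the String component is left alone).

import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class TupleDiscardSketch {

	public static void main(String[] args) {
		// Invented sample data: a serialized key paired with a plain String payload
		Flux<Tuple2<ByteBuf, String>> serialized = Flux.range(0, 4)
				.map(i -> Tuples.of(
						Unpooled.copiedBuffer("key-" + i, StandardCharsets.UTF_8),
						"payload-" + i));

		serialized
				// tuples rejected here are routed to the discard hook registered below
				.filter(tuple -> tuple.getT2().endsWith("0"))
				.doOnDiscard(Tuple2.class, uncastedEntry -> {
					if (uncastedEntry.getT1() instanceof ByteBuf byteBuf) {
						byteBuf.release();
					}
					if (uncastedEntry.getT2() instanceof ByteBuf byteBuf) {
						byteBuf.release();
					}
				})
				// the subscriber releases the buffers of the tuples it did receive
				.subscribe(tuple -> tuple.getT1().release());
	}
}
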
@@ -419,11 +424,13 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
 						deserializeSuffix(stripPrefix(serializedEntry.getKey(), false)),
 						valueSerializer.deserialize(serializedEntry.getValue())
 				))
-				.doOnDiscard(Entry.class, entry -> {
-					//noinspection unchecked
-					var castedEntry = (Entry<ByteBuf, ByteBuf>) entry;
-					castedEntry.getKey().release();
-					castedEntry.getValue().release();
+				.doOnDiscard(Entry.class, uncastedEntry -> {
+					if (uncastedEntry.getKey() instanceof ByteBuf byteBuf) {
+						byteBuf.release();
+					}
+					if (uncastedEntry.getValue() instanceof ByteBuf byteBuf) {
+						byteBuf.release();
+					}
 				})
 				.doFirst(range::retain)
 				.doAfterTerminate(range::release);
@@ -480,10 +480,10 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
 						})
 				)
 				.doOnDiscard(Collection.class, discardedCollection -> {
-					//noinspection unchecked
-					var rangeKeys = (Collection<ByteBuf>) discardedCollection;
-					for (ByteBuf rangeKey : rangeKeys) {
-						rangeKey.release();
+					for (Object o : discardedCollection) {
+						if (o instanceof ByteBuf byteBuf) {
+							byteBuf.release();
+						}
 					}
 				});
 		} else {
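
Note: this last hunk stops casting the discarded Collection to Collection<ByteBuf> and instead inspects each element before releasing it, so collections that happen to contain non-buffer objects are handled safely. A small illustrative sketch, assuming a Flux of ByteBufs grouped with buffer(); the class name CollectionDiscardSketch, the grouping size and the filter predicate are invented.

import java.util.Collection;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Flux;

public class CollectionDiscardSketch {

	public static void main(String[] args) {
		// Invented sample data: single-byte range keys
		Flux<ByteBuf> rangeKeys = Flux.range(0, 8)
				.map(i -> Unpooled.buffer(1).writeByte(i));

		rangeKeys
				.buffer(4) // Flux<List<ByteBuf>>
				// a rejected group is discarded as a whole Collection
				.filter(group -> group.get(0).getByte(0) == 0)
				.doOnDiscard(Collection.class, discardedCollection -> {
					for (Object o : discardedCollection) {
						if (o instanceof ByteBuf byteBuf) {
							byteBuf.release();
						}
					}
				})
				// groups that pass the filter are released by the subscriber
				.subscribe(group -> group.forEach(ByteBuf::release));
	}
}
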