Fix ClassCastException

This commit is contained in:
Andrea Cavalli 2021-09-29 11:47:17 +02:00
parent 74d20204ab
commit 6b8c1025d1

View File

@ -1083,15 +1083,17 @@ public class LLLocalDictionary implements LLDictionary {
return keys return keys
.buffer(MULTI_GET_WINDOW) .buffer(MULTI_GET_WINDOW)
.doOnDiscard(Tuple2.class, discardedEntry -> { .doOnDiscard(Tuple2.class, discardedEntry -> {
//noinspection unchecked if (discardedEntry.getT2() instanceof Resource<?> resource) {
var entry = (Tuple2<K, Buffer>) discardedEntry; resource.close();
entry.getT2().close(); }
}) })
.doOnDiscard(Tuple3.class, discardedEntry -> { .doOnDiscard(Tuple3.class, discardedEntry -> {
//noinspection unchecked if (discardedEntry.getT2() instanceof Resource<?> resource) {
var entry = (Tuple3<K, Buffer, Buffer>) discardedEntry; resource.close();
entry.getT2().close(); }
entry.getT3().close(); if (discardedEntry.getT3() instanceof Resource<?> resource) {
resource.close();
}
}) })
.flatMapSequential(keysWindow -> { .flatMapSequential(keysWindow -> {
List<Send<Buffer>> keyBufsWindowSend = new ArrayList<>(keysWindow.size()); List<Send<Buffer>> keyBufsWindowSend = new ArrayList<>(keysWindow.size());
@ -1280,7 +1282,7 @@ public class LLLocalDictionary implements LLDictionary {
BiSerializationFunction<Send<Buffer>, X, Send<Buffer>> updateFunction) { BiSerializationFunction<Send<Buffer>, X, Send<Buffer>> updateFunction) {
return entries return entries
.buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)) .buffer(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.flatMapSequential(ew -> this.<Iterable<ExtraKeyOperationResult<Send<Buffer>, X>>>runOnDb(() -> { .flatMapSequential(ew -> this.<List<ExtraKeyOperationResult<Send<Buffer>, X>>>runOnDb(() -> {
List<Tuple2<Buffer, X>> entriesWindow = new ArrayList<>(ew.size()); List<Tuple2<Buffer, X>> entriesWindow = new ArrayList<>(ew.size());
for (Tuple2<Send<Buffer>, X> tuple : ew) { for (Tuple2<Send<Buffer>, X> tuple : ew) {
entriesWindow.add(tuple.mapT1(Send::receive)); entriesWindow.add(tuple.mapT1(Send::receive));
@ -1421,15 +1423,17 @@ public class LLLocalDictionary implements LLDictionary {
bb.close(); bb.close();
} }
}) })
.doOnDiscard(Collection.class, obj -> { .doOnDiscard(List.class, obj -> {
//noinspection unchecked if (!obj.isEmpty() && obj.get(0) instanceof ExtraKeyOperationResult<?, ?>) {
var castedEntries = (Collection<ExtraKeyOperationResult<Object, Object>>) obj; //noinspection unchecked
for (var entry : castedEntries) { var castedEntries = (List<ExtraKeyOperationResult<?, ?>>) obj;
if (entry.key() instanceof Buffer bb) { for (ExtraKeyOperationResult<?, ?> entry : castedEntries) {
bb.close(); if (entry.key() instanceof Resource<?> bb) {
} bb.close();
if (entry.extra() instanceof Buffer bb) { }
bb.close(); if (entry.extra() instanceof Resource<?> bb) {
bb.close();
}
} }
} }
}); });