Use flatMapIterable when possible
parent 7597e54bac
commit d411675c2b
@@ -201,7 +201,7 @@ public class DatabaseMapDictionaryHashed<T, U, TH> implements DatabaseStageMap<T
 				.getAllValues(snapshot)
 				.map(Entry::getValue)
 				.map(Collections::unmodifiableSet)
-				.flatMap(Flux::fromIterable);
+				.concatMapIterable(list -> list);
 	}
 
 	@Override
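The change above replaces Reactor's flatMap(Flux::fromIterable), which wraps every emitted collection in an inner Flux and merges those publishers, with concatMapIterable, which iterates each collection in place. A minimal sketch of the difference (illustrative names, not code from this repository):

import java.util.List;
import reactor.core.publisher.Flux;

public class ConcatMapIterableSketch {
	public static void main(String[] args) {
		Flux<List<Integer>> source = Flux.just(List.of(1, 2), List.of(3, 4));

		// Before: one inner Flux is created and merged per emitted list.
		Flux<Integer> viaInnerPublishers = source.flatMap(Flux::fromIterable);

		// After: each list is iterated directly; order is preserved and no
		// inner publisher is allocated per element.
		Flux<Integer> viaIterable = source.concatMapIterable(list -> list);

		viaInnerPublishers.subscribe(System.out::println); // 1 2 3 4
		viaIterable.subscribe(System.out::println);        // 1 2 3 4
	}
}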
|
@@ -980,54 +980,54 @@ public class LLLocalDictionary implements LLDictionary {
 					keyBufsWindow.add(objects.getT2());
 				}
 				return Mono
 						.fromCallable(() -> {
 							Iterable<StampedLock> locks;
 							ArrayList<Long> stamps;
 							if (updateMode == UpdateMode.ALLOW) {
 								locks = itemsLock.bulkGetAt(getLockIndices(keyBufsWindow));
 								stamps = new ArrayList<>();
 								for (var lock : locks) {
 									stamps.add(lock.readLock());
 								}
 							} else {
 								locks = null;
 								stamps = null;
 							}
 							try {
 								var columnFamilyHandles = new RepeatedElementList<>(cfh, keysWindow.size());
 								var results = db.multiGetAsList(resolveSnapshot(snapshot), columnFamilyHandles, LLUtils.toArray(keyBufsWindow));
 								var mappedResults = new ArrayList<Tuple3<K, ByteBuf, Optional<ByteBuf>>>(results.size());
 								for (int i = 0; i < results.size(); i++) {
 									byte[] val = results.get(i);
 									Optional<ByteBuf> valueOpt;
 									if (val != null) {
 										results.set(i, null);
 										valueOpt = Optional.of(wrappedBuffer(val));
 									} else {
 										valueOpt = Optional.empty();
 									}
 									mappedResults.add(Tuples.of(keysWindow.get(i).getT1(),
 											keyBufsWindow.get(i).retain(),
 											valueOpt
 									));
 								}
 								return mappedResults;
 							} finally {
 								if (updateMode == UpdateMode.ALLOW) {
 									int index = 0;
 									for (var lock : locks) {
 										lock.unlockRead(stamps.get(index));
 										index++;
 									}
 								}
 							}
 						})
 						.subscribeOn(dbScheduler)
-						.flatMapMany(Flux::fromIterable)
+						.flatMapIterable(list -> list)
 						.onErrorMap(cause -> new IOException("Failed to read keys "
 								+ Arrays.deepToString(keyBufsWindow.toArray(ByteBuf[]::new)), cause))
 						.doAfterTerminate(() -> keyBufsWindow.forEach(ReferenceCounted::release));
 			}, 2) // Max concurrency is 2 to read data while preparing the next segment
 			.doOnDiscard(Entry.class, discardedEntry -> {
 				//noinspection unchecked
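Here the batch read produces a Mono carrying a list, so the old code expanded it with flatMapMany(Flux::fromIterable); the new code uses Mono#flatMapIterable, which consumes the list directly instead of building an intermediate Flux. A minimal sketch under assumed types (not this codebase's):

import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFlatMapIterableSketch {
	public static void main(String[] args) {
		// Stand-in for the Mono<List<...>> produced by the multi-get callable.
		Mono<List<String>> batch = Mono.just(List.of("a", "b", "c"));

		// Before: the list is first wrapped in an inner Flux, then flattened.
		Flux<String> viaInnerFlux = batch.flatMapMany(Flux::fromIterable);

		// After: the list is iterated directly as an Iterable.
		Flux<String> viaIterable = batch.flatMapIterable(list -> list);

		viaInnerFlux.subscribe(System.out::println); // a b c
		viaIterable.subscribe(System.out::println);  // a b c
	}
}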
@@ -1243,7 +1243,7 @@ public class LLLocalDictionary implements LLDictionary {
 					}
 				})
 				.subscribeOn(dbScheduler)
-				.flatMapMany(Flux::fromIterable);
+				.concatMapIterable(list -> list);
 		},
 		entriesWindow -> {
 			for (Tuple2<ByteBuf, X> entry : entriesWindow) {
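A note on the two operators this commit swaps in (general Reactor behavior, not something the commit itself states): on Flux, concatMapIterable and flatMapIterable are documented as equivalent, both iterating each Iterable sequentially, so the choice between them in these hunks is a naming preference; on Mono only flatMapIterable exists, which is why the multiGet hunk above uses that variant.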
|
@@ -477,7 +477,7 @@ public class TestDictionaryMap {
 				.flatMapMany(map -> Flux
 						.concat(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries))
 						.map(Map::entrySet)
-						.flatMap(Flux::fromIterable)
+						.concatMapIterable(list -> list)
 						.doAfterTerminate(map::release)
 				)
 		));
@@ -502,7 +502,7 @@ public class TestDictionaryMap {
 				.flatMapMany(map -> Flux
 						.concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null))
 						.map(Map::entrySet)
-						.flatMap(Flux::fromIterable)
+						.concatMapIterable(list -> list)
 						.doAfterTerminate(map::release)
 				)
 		));
@@ -555,7 +555,7 @@ public class TestDictionaryMap {
 						map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
 						map.get(null)
 								.map(Map::entrySet)
-								.flatMapMany(Flux::fromIterable)
+								.flatMapIterable(list -> list)
 				)
 				.doAfterTerminate(map::release)
 		)
|
@@ -822,7 +822,7 @@ public class TestDictionaryMapDeep {
 								map.setAndGetPrevious(entries)
 						)
 						.map(Map::entrySet)
-						.flatMap(Flux::fromIterable)
+						.concatMapIterable(list -> list)
 						.doAfterTerminate(map::release)
 				)
 		));
@@ -847,7 +847,7 @@ public class TestDictionaryMapDeep {
 				.flatMapMany(map -> Flux
 						.concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null))
 						.map(Map::entrySet)
-						.flatMap(Flux::fromIterable)
+						.concatMapIterable(list -> list)
 						.doAfterTerminate(map::release)
 				)
 		));
@@ -900,7 +900,7 @@ public class TestDictionaryMapDeep {
 						map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()),
 						map.get(null)
 								.map(Map::entrySet)
-								.flatMapMany(Flux::fromIterable)
+								.flatMapIterable(list -> list)
 				)
 				.doAfterTerminate(map::release)
 		)
|