From d411675c2bc12ec28dec6e99f6262b79934d3a4e Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 31 Jul 2021 18:00:53 +0200 Subject: [PATCH] Use flatMapIterable when possible --- .../DatabaseMapDictionaryHashed.java | 2 +- .../database/disk/LLLocalDictionary.java | 90 +++++++++---------- .../cavallium/dbengine/TestDictionaryMap.java | 6 +- .../dbengine/TestDictionaryMapDeep.java | 6 +- 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 0951a30..0b44fc8 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -201,7 +201,7 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap list); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 2010fd6..9a002e7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -980,54 +980,54 @@ public class LLLocalDictionary implements LLDictionary { keyBufsWindow.add(objects.getT2()); } return Mono - .fromCallable(() -> { - Iterable locks; - ArrayList stamps; - if (updateMode == UpdateMode.ALLOW) { - locks = itemsLock.bulkGetAt(getLockIndices(keyBufsWindow)); - stamps = new ArrayList<>(); - for (var lock : locks) { + .fromCallable(() -> { + Iterable locks; + ArrayList stamps; + if (updateMode == UpdateMode.ALLOW) { + locks = itemsLock.bulkGetAt(getLockIndices(keyBufsWindow)); + stamps = new ArrayList<>(); + for (var lock : locks) { - stamps.add(lock.readLock()); - } + stamps.add(lock.readLock()); + } + } else { + locks = null; + stamps = null; + } + try { + var columnFamilyHandles = new RepeatedElementList<>(cfh, keysWindow.size()); + var results = db.multiGetAsList(resolveSnapshot(snapshot), columnFamilyHandles, LLUtils.toArray(keyBufsWindow)); + var mappedResults = new ArrayList>>(results.size()); + for (int i = 0; i < results.size(); i++) { + byte[] val = results.get(i); + Optional valueOpt; + if (val != null) { + results.set(i, null); + valueOpt = Optional.of(wrappedBuffer(val)); } else { - locks = null; - stamps = null; + valueOpt = Optional.empty(); } - try { - var columnFamilyHandles = new RepeatedElementList<>(cfh, keysWindow.size()); - var results = db.multiGetAsList(resolveSnapshot(snapshot), columnFamilyHandles, LLUtils.toArray(keyBufsWindow)); - var mappedResults = new ArrayList>>(results.size()); - for (int i = 0; i < results.size(); i++) { - byte[] val = results.get(i); - Optional valueOpt; - if (val != null) { - results.set(i, null); - valueOpt = Optional.of(wrappedBuffer(val)); - } else { - valueOpt = Optional.empty(); - } - mappedResults.add(Tuples.of(keysWindow.get(i).getT1(), - keyBufsWindow.get(i).retain(), - valueOpt - )); - } - return mappedResults; - } finally { - if (updateMode == UpdateMode.ALLOW) { - int index = 0; - for (var lock : locks) { - lock.unlockRead(stamps.get(index)); - index++; - } - } + mappedResults.add(Tuples.of(keysWindow.get(i).getT1(), + keyBufsWindow.get(i).retain(), + valueOpt + )); + } + return mappedResults; + } finally { + if (updateMode == UpdateMode.ALLOW) { + int index = 0; + for (var lock : locks) { + lock.unlockRead(stamps.get(index)); + index++; } - }) - .subscribeOn(dbScheduler) - .flatMapMany(Flux::fromIterable) - .onErrorMap(cause -> new IOException("Failed to read keys " - + Arrays.deepToString(keyBufsWindow.toArray(ByteBuf[]::new)), cause)) - .doAfterTerminate(() -> keyBufsWindow.forEach(ReferenceCounted::release)); + } + } + }) + .subscribeOn(dbScheduler) + .flatMapIterable(list -> list) + .onErrorMap(cause -> new IOException("Failed to read keys " + + Arrays.deepToString(keyBufsWindow.toArray(ByteBuf[]::new)), cause)) + .doAfterTerminate(() -> keyBufsWindow.forEach(ReferenceCounted::release)); }, 2) // Max concurrency is 2 to read data while preparing the next segment .doOnDiscard(Entry.class, discardedEntry -> { //noinspection unchecked @@ -1243,7 +1243,7 @@ public class LLLocalDictionary implements LLDictionary { } }) .subscribeOn(dbScheduler) - .flatMapMany(Flux::fromIterable); + .concatMapIterable(list -> list); }, entriesWindow -> { for (Tuple2 entry : entriesWindow) { diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java index 47eddd8..b2f69cf 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMap.java @@ -477,7 +477,7 @@ public class TestDictionaryMap { .flatMapMany(map -> Flux .concat(map.setAndGetPrevious(entries), map.setAndGetPrevious(entries)) .map(Map::entrySet) - .flatMap(Flux::fromIterable) + .concatMapIterable(list -> list) .doAfterTerminate(map::release) ) )); @@ -502,7 +502,7 @@ public class TestDictionaryMap { .flatMapMany(map -> Flux .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) .map(Map::entrySet) - .flatMap(Flux::fromIterable) + .concatMapIterable(list -> list) .doAfterTerminate(map::release) ) )); @@ -555,7 +555,7 @@ public class TestDictionaryMap { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.get(null) .map(Map::entrySet) - .flatMapMany(Flux::fromIterable) + .flatMapIterable(list -> list) ) .doAfterTerminate(map::release) ) diff --git a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java index a04faed..31ab5bf 100644 --- a/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java +++ b/src/test/java/it/cavallium/dbengine/TestDictionaryMapDeep.java @@ -822,7 +822,7 @@ public class TestDictionaryMapDeep { map.setAndGetPrevious(entries) ) .map(Map::entrySet) - .flatMap(Flux::fromIterable) + .concatMapIterable(list -> list) .doAfterTerminate(map::release) ) )); @@ -847,7 +847,7 @@ public class TestDictionaryMapDeep { .flatMapMany(map -> Flux .concat(map.set(entries).then(Mono.empty()), map.clearAndGetPrevious(), map.get(null)) .map(Map::entrySet) - .flatMap(Flux::fromIterable) + .concatMapIterable(list -> list) .doAfterTerminate(map::release) ) )); @@ -900,7 +900,7 @@ public class TestDictionaryMapDeep { map.putMulti(Flux.fromIterable(entries.entrySet())).then(Mono.empty()), map.get(null) .map(Map::entrySet) - .flatMapMany(Flux::fromIterable) + .flatMapIterable(list -> list) ) .doAfterTerminate(map::release) )