Update Example.java, DatabaseMapDictionary.java, and 4 more files...

This commit is contained in:
Andrea Cavalli 2021-02-01 02:21:53 +01:00
parent 6d63ff3edf
commit 554facde13
6 changed files with 181 additions and 84 deletions

View File

@ -5,20 +5,24 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import it.cavallium.dbengine.database.Column; import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase; import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.FixedLengthSerializer; import it.cavallium.dbengine.database.collections.FixedLengthSerializer;
import it.cavallium.dbengine.database.collections.Serializer; import it.cavallium.dbengine.database.collections.Serializer;
import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes; import it.cavallium.dbengine.database.collections.SubStageGetterSingleBytes;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.CompletionException;
import java.util.function.Function; import java.util.function.Function;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -30,10 +34,11 @@ import reactor.util.function.Tuples;
public class Example { public class Example {
private static final boolean printPreviousValue = false; private static final boolean printPreviousValue = false;
private static final int numRepeats = 500; private static final int numRepeats = 100;
private static final int batchSize = 1000; private static final int batchSize = 10000;
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
/*
testAtPut(); testAtPut();
testPutValueAndGetPrevious(); testPutValueAndGetPrevious();
testPutValue(); testPutValue();
@ -45,6 +50,14 @@ public class Example {
.then(rangeTestPutMulti()) .then(rangeTestPutMulti())
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.blockOptional(); .blockOptional();
*/
testPutMulti()
.then(rangeTestPutMulti())
.subscribeOn(Schedulers.parallel())
.blockOptional();
} }
private static Mono<Void> testAtPut() { private static Mono<Void> testAtPut() {
@ -128,9 +141,8 @@ public class Example {
private static Mono<Void> testPutMulti() { private static Mono<Void> testPutMulti() {
var ssg = new SubStageGetterSingleBytes(); var ssg = new SubStageGetterSingleBytes();
var ser = FixedLengthSerializer.noop(4); var ser = FixedLengthSerializer.noop(4);
int batchSize = 1000;
HashMap<ByteBuf, byte[]> keysToPut = new HashMap<>(); HashMap<ByteBuf, byte[]> keysToPut = new HashMap<>();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < batchSize; i++) {
keysToPut.put(Unpooled.wrappedBuffer(Ints.toByteArray(i * 3)), Ints.toByteArray(i * 11)); keysToPut.put(Unpooled.wrappedBuffer(Ints.toByteArray(i * 3)), Ints.toByteArray(i * 11));
} }
var putMultiFlux = Flux.fromIterable(keysToPut.entrySet()); var putMultiFlux = Flux.fromIterable(keysToPut.entrySet());
@ -243,8 +255,25 @@ public class Example {
} }
private static <U> Mono<? extends LLKeyValueDatabase> tempDb() { private static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
return new LLLocalDatabaseConnection(Path.of("/tmp/"), true) var wrkspcPath = Path.of("/home/ubuntu/tempdb/");
.connect() return Mono
.fromCallable(() -> {
if (Files.exists(wrkspcPath)) {
Files.walk(wrkspcPath)
.sorted(Comparator.reverseOrder())
.forEach(file -> {
try {
Files.delete(file);
} catch (IOException ex) {
throw new CompletionException(ex);
}
});
}
Files.createDirectories(wrkspcPath);
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(wrkspcPath, true).connect())
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false)); .flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false));
} }
@ -256,14 +285,13 @@ public class Example {
Duration WAIT_TIME = Duration.ofSeconds(5); Duration WAIT_TIME = Duration.ofSeconds(5);
Duration WAIT_TIME_END = Duration.ofSeconds(5); Duration WAIT_TIME_END = Duration.ofSeconds(5);
return Mono return Mono
.fromRunnable(() -> instantInit.tryEmitValue(now())) .delay(WAIT_TIME)
.then(Mono.fromRunnable(() -> instantInit.tryEmitValue(now())))
.then(setup) .then(setup)
.delayElement(WAIT_TIME)
.doOnSuccess(s -> instantInitTest.tryEmitValue(now())) .doOnSuccess(s -> instantInitTest.tryEmitValue(now()))
.flatMap(a ->Mono.defer(() -> test.apply(a)).repeat(numRepeats) .flatMap(a ->Mono.defer(() -> test.apply(a)).repeat(numRepeats)
.then() .then()
.doOnSuccess(s -> instantEndTest.tryEmitValue(now())) .doOnSuccess(s -> instantEndTest.tryEmitValue(now()))
.delayElement(WAIT_TIME_END)
.then(close.apply(a))) .then(close.apply(a)))
.doOnSuccess(s -> instantEnd.tryEmitValue(now())) .doOnSuccess(s -> instantEnd.tryEmitValue(now()))
.then(Mono.zip(instantInit.asMono(), instantInitTest.asMono(), instantEndTest.asMono(), instantEnd.asMono())) .then(Mono.zip(instantInit.asMono(), instantInitTest.asMono(), instantEndTest.asMono(), instantEnd.asMono()))
@ -271,23 +299,24 @@ public class Example {
System.out.println("----------------------------------------------------------------------"); System.out.println("----------------------------------------------------------------------");
System.out.println(name); System.out.println(name);
System.out.println( System.out.println(
"\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format(numRepeats) + " times:"); "\t - Executed " + DecimalFormat.getInstance(Locale.ITALY).format((numRepeats * batchSize)) + " times:");
System.out.println("\t - Test time: " + DecimalFormat System.out.println("\t - Test time: " + DecimalFormat
.getInstance(Locale.ITALY) .getInstance(Locale.ITALY)
.format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) numRepeats / (double) 1000000) .format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) (numRepeats * batchSize) / (double) 1000000)
+ "ms"); + "ms");
System.out.println("\t - Test speed: " + DecimalFormat System.out.println("\t - Test speed: " + DecimalFormat
.getInstance(Locale.ITALY) .getInstance(Locale.ITALY)
.format(numRepeats / (Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000 / (double) 1000)) .format((numRepeats * batchSize) / (Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000 / (double) 1000))
+ " tests/s"); + " tests/s");
System.out.println("\t - Total time: " + DecimalFormat System.out.println("\t - Total time: " + DecimalFormat
.getInstance(Locale.ITALY) .getInstance(Locale.ITALY)
.format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000) + "ms"); .format(Duration.between(tuple.getT2(), tuple.getT3()).toNanos() / (double) 1000000) + "ms");
System.out.println("\t - Total time (setup+test+end): " + DecimalFormat System.out.println("\t - Total time (setup+test+end): " + DecimalFormat
.getInstance(Locale.ITALY) .getInstance(Locale.ITALY)
.format(Duration.between(tuple.getT1(), tuple.getT4().minus(WAIT_TIME)).toNanos() / (double) 1000000) + "ms"); .format(Duration.between(tuple.getT1(), tuple.getT4()).toNanos() / (double) 1000000) + "ms");
System.out.println("----------------------------------------------------------------------"); System.out.println("----------------------------------------------------------------------");
}) })
.delayElement(WAIT_TIME_END)
.then(); .then();
} }

View File

@ -136,6 +136,14 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
.map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue()))); .map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
} }
@Override
public Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
return dictionary
.putMulti(entries
.map(entry -> Map.entry(toKey(serializeSuffix(entry.getKey())), serialize(entry.getValue()))), false)
.then();
}
@Override @Override
public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) { public Flux<Entry<T, DatabaseStageEntry<U>>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return dictionary return dictionary

View File

@ -26,7 +26,7 @@ public class SubStageGetterSingle<T> implements SubStageGetter<T, DatabaseStageE
throw new IndexOutOfBoundsException("Found more than one element!"); throw new IndexOutOfBoundsException("Found more than one element!");
} }
return null; return null;
})).thenReturn(new DatabaseSingle(dictionary, keyPrefix, Serializer.noopBytes())); })).thenReturn(new DatabaseSingle<>(dictionary, keyPrefix, serializer));
} }
//todo: temporary wrapper. convert the whole class to buffers //todo: temporary wrapper. convert the whole class to buffers

View File

@ -5,9 +5,12 @@ import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange; import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.LLUtils;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
@ -27,7 +30,7 @@ import org.warp.commonutils.concurrency.atomicity.NotAtomic;
import org.warp.commonutils.type.VariableWrapper; import org.warp.commonutils.type.VariableWrapper;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Scheduler;
@NotAtomic @NotAtomic
public class LLLocalDictionary implements LLDictionary { public class LLLocalDictionary implements LLDictionary {
@ -36,6 +39,7 @@ public class LLLocalDictionary implements LLDictionary {
static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final int MULTI_GET_WINDOW = 500;
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true); static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
private static final byte[] FIRST_KEY = new byte[]{}; private static final byte[] FIRST_KEY = new byte[]{};
@ -44,17 +48,20 @@ public class LLLocalDictionary implements LLDictionary {
private final RocksDB db; private final RocksDB db;
private final ColumnFamilyHandle cfh; private final ColumnFamilyHandle cfh;
private final String databaseName; private final String databaseName;
private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver; private final Function<LLSnapshot, Snapshot> snapshotResolver;
public LLLocalDictionary(@NotNull RocksDB db, public LLLocalDictionary(@NotNull RocksDB db,
@NotNull ColumnFamilyHandle columnFamilyHandle, @NotNull ColumnFamilyHandle columnFamilyHandle,
String databaseName, String databaseName,
Scheduler dbScheduler,
Function<LLSnapshot, Snapshot> snapshotResolver) { Function<LLSnapshot, Snapshot> snapshotResolver) {
Objects.requireNonNull(db); Objects.requireNonNull(db);
this.db = db; this.db = db;
Objects.requireNonNull(columnFamilyHandle); Objects.requireNonNull(columnFamilyHandle);
this.cfh = columnFamilyHandle; this.cfh = columnFamilyHandle;
this.databaseName = databaseName; this.databaseName = databaseName;
this.dbScheduler = dbScheduler;
this.snapshotResolver = snapshotResolver; this.snapshotResolver = snapshotResolver;
} }
@ -95,7 +102,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
@ -129,7 +136,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, byte[] key) { private Mono<Boolean> containsKey(@Nullable LLSnapshot snapshot, byte[] key) {
@ -147,7 +154,7 @@ public class LLLocalDictionary implements LLDictionary {
return size != RocksDB.NOT_FOUND; return size != RocksDB.NOT_FOUND;
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
@ -159,7 +166,7 @@ public class LLLocalDictionary implements LLDictionary {
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
.then(response); .then(response);
} }
@ -172,7 +179,7 @@ public class LLLocalDictionary implements LLDictionary {
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
.then(response); .then(response);
} }
@ -195,7 +202,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
case VOID: case VOID:
return Mono.empty(); return Mono.empty();
default: default:
@ -205,40 +212,57 @@ public class LLLocalDictionary implements LLDictionary {
@Override @Override
public Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys) { public Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys) {
return keys.flatMap(key -> this.get(snapshot, key).map(value -> Map.entry(key, value))); return keys
.window(MULTI_GET_WINDOW)
.flatMap(keysWindowFlux -> keysWindowFlux.collectList()
.flatMapMany(keysWindow -> Mono
.<ArrayList<Entry<byte[], byte[]>>>fromCallable(() -> {
var handlesArray = new ColumnFamilyHandle[keysWindow.size()];
Arrays.fill(handlesArray, cfh);
var handles = ObjectArrayList.wrap(handlesArray, handlesArray.length);
var results = db.multiGetAsList(resolveSnapshot(snapshot), handles, keysWindow);
var mappedResults = new ArrayList<Entry<byte[], byte[]>>(results.size());
for (int i = 0; i < results.size(); i++) {
var val = results.get(i);
if (val != null) {
results.set(i, null);
mappedResults.add(Map.entry(keysWindow.get(i), val));
}
}
return mappedResults;
})
.subscribeOn(dbScheduler)
.<Entry<byte[], byte[]>>flatMapMany(Flux::fromIterable)
)
)
.onErrorMap(IOException::new);
} }
@Override @Override
public Flux<Entry<byte[], byte[]>> putMulti(Flux<Entry<byte[], byte[]>> entries, boolean getOldValues) { public Flux<Entry<byte[], byte[]>> putMulti(Flux<Entry<byte[], byte[]>> entries, boolean getOldValues) {
return Mono return entries
.fromCallable(() -> new CappedWriteBatch(db, .window(Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP))
.publishOn(dbScheduler)
.flatMap(Flux::collectList)
.flatMap(entriesWindow -> this
.getMulti(null, Flux.fromIterable(entriesWindow).map(Entry::getKey))
.concatWith(Mono.fromCallable(() -> {
var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP, CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE, RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS BATCH_WRITE_OPTIONS
)) );
.subscribeOn(Schedulers.boundedElastic()) for (Entry<byte[], byte[]> entry : entriesWindow) {
.flatMapMany(writeBatch -> entries batch.put(entry.getKey(), entry.getValue());
.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
.concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
writeBatch.writeToDbAndClose();
writeBatch.close();
} }
batch.writeToDbAndClose();
batch.close();
return null; return null;
}) })));
.subscribeOn(Schedulers.boundedElastic())
)
.doFinally(signalType -> {
synchronized (writeBatch) {
writeBatch.close();
}
})
)
.onErrorMap(IOException::new);
} }
@NotNull @NotNull
private Mono<Entry<byte[], byte[]>> putEntryToWriteBatch(Entry<byte[], byte[]> newEntry, boolean getOldValues, private Mono<Entry<byte[], byte[]>> putEntryToWriteBatch(Entry<byte[], byte[]> newEntry, boolean getOldValues,
CappedWriteBatch writeBatch) { CappedWriteBatch writeBatch) {
@ -256,10 +280,35 @@ public class LLLocalDictionary implements LLDictionary {
} }
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic())) .subscribeOn(dbScheduler))
.map(oldValue -> Map.entry(newEntry.getKey(), oldValue))); .map(oldValue -> Map.entry(newEntry.getKey(), oldValue)));
} }
@NotNull
private Flux<Entry<byte[], byte[]>> putEntryToWriteBatch(List<Entry<byte[], byte[]>> newEntries, boolean getOldValues,
CappedWriteBatch writeBatch) {
return Flux
.from(Flux
.defer(() -> {
if (getOldValues) {
return getMulti(null, Flux.fromIterable(newEntries).map(Entry::getKey));
} else {
return Flux.empty();
}
})
.concatWith(Mono
.<Entry<byte[], byte[]>>fromCallable(() -> {
synchronized (writeBatch) {
for (Entry<byte[], byte[]> newEntry : newEntries) {
writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
}
}
return null;
}).subscribeOn(dbScheduler)
)
);
}
@Override @Override
public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) { public Flux<Entry<byte[], byte[]>> getRange(@Nullable LLSnapshot snapshot, LLRange range) {
if (range.isSingle()) { if (range.isSingle()) {
@ -287,7 +336,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
return iter; return iter;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
.flatMapMany(rocksIterator -> Flux .flatMapMany(rocksIterator -> Flux
.<Entry<byte[], byte[]>>fromIterable(() -> { .<Entry<byte[], byte[]>>fromIterable(() -> {
VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null); VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
@ -327,7 +376,7 @@ public class LLLocalDictionary implements LLDictionary {
}; };
}) })
.doFinally(signalType -> rocksIterator.close()) .doFinally(signalType -> rocksIterator.close())
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
); );
} }
@ -359,7 +408,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
return iter; return iter;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
.flatMapMany(rocksIterator -> Flux .flatMapMany(rocksIterator -> Flux
.<byte[]>fromIterable(() -> { .<byte[]>fromIterable(() -> {
VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null); VariableWrapper<byte[]> nextKey = new VariableWrapper<>(null);
@ -391,7 +440,7 @@ public class LLLocalDictionary implements LLDictionary {
}; };
}) })
.doFinally(signalType -> rocksIterator.close()) .doFinally(signalType -> rocksIterator.close())
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
); );
} }
@ -404,7 +453,7 @@ public class LLLocalDictionary implements LLDictionary {
} else { } else {
return Mono return Mono
.fromCallable(() -> new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) .fromCallable(() -> new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS))
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
.flatMapMany(writeBatch -> Mono .flatMapMany(writeBatch -> Mono
.fromCallable(() -> { .fromCallable(() -> {
synchronized (writeBatch) { synchronized (writeBatch) {
@ -426,7 +475,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
.thenMany(entries) .thenMany(entries)
.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)) .flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
.concatWith(Mono .concatWith(Mono
@ -437,7 +486,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(dbScheduler)
) )
.doFinally(signalType -> { .doFinally(signalType -> {
synchronized (writeBatch) { synchronized (writeBatch) {
@ -478,7 +527,7 @@ public class LLLocalDictionary implements LLDictionary {
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@ -490,7 +539,7 @@ public class LLLocalDictionary implements LLDictionary {
return Mono return Mono
.fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)) .fromCallable(() -> fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot))
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} else { } else {
return Mono return Mono
.fromCallable(() -> { .fromCallable(() -> {
@ -516,7 +565,7 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
}); });
} }
@ -579,6 +628,6 @@ public class LLLocalDictionary implements LLDictionary {
} }
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
} }

View File

@ -35,6 +35,7 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot; import org.rocksdb.Snapshot;
import org.rocksdb.WALRecoveryMode; import org.rocksdb.WALRecoveryMode;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@ -46,6 +47,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor( private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor(
RocksDB.DEFAULT_COLUMN_FAMILY); RocksDB.DEFAULT_COLUMN_FAMILY);
private final Scheduler dbScheduler;
private final Path dbPath; private final Path dbPath;
private final String name; private final String name;
private RocksDB db; private RocksDB db;
@ -73,6 +75,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
Path dbPath = Paths.get(dbPathString); Path dbPath = Paths.get(dbPathString);
this.dbPath = dbPath; this.dbPath = dbPath;
this.name = name; this.name = name;
this.dbScheduler = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(),
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"db-" + name,
60,
true
);
createIfNotExists(descriptors, options, dbPath, dbPathString); createIfNotExists(descriptors, options, dbPath, dbPathString);
// Create all column families that don't exist // Create all column families that don't exist
@ -301,7 +309,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
defaultValue defaultValue
)) ))
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
@ -310,16 +318,17 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.fromCallable(() -> new LLLocalDictionary(db, .fromCallable(() -> new LLLocalDictionary(db,
handles.get(Column.special(Column.toString(columnName))), handles.get(Column.special(Column.toString(columnName))),
name, name,
dbScheduler,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()) (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber())
)) ))
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
public Mono<Long> getProperty(String propertyName) { public Mono<Long> getProperty(String propertyName) {
return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
@ -331,7 +340,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber); return new LLSnapshot(currentSnapshotSequenceNumber);
}) })
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
@ -345,7 +354,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
db.releaseSnapshot(dbSnapshot); db.releaseSnapshot(dbSnapshot);
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
@Override @Override
@ -361,7 +370,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return null; return null;
}) })
.onErrorMap(IOException::new) .onErrorMap(IOException::new)
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(dbScheduler);
} }
/** /**

View File

@ -53,6 +53,7 @@ import reactor.core.publisher.Sinks.EmissionException;
import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One; import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2; import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
@ -67,6 +68,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
*/ */
private static final ScheduledExecutorService scheduler private static final ScheduledExecutorService scheduler
= Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene")); = Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene"));
private static final Scheduler luceneScheduler = Schedulers.fromExecutorService(scheduler);
private final String luceneIndexName; private final String luceneIndexName;
private final SnapshotDeletionPolicy snapshotter; private final SnapshotDeletionPolicy snapshotter;
@ -141,14 +143,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
public Mono<LLSnapshot> takeSnapshot() { public Mono<LLSnapshot> takeSnapshot() {
return Mono return Mono
.fromCallable(lastSnapshotSeqNo::incrementAndGet) .fromCallable(lastSnapshotSeqNo::incrementAndGet)
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(luceneScheduler)
.flatMap(snapshotSeqNo -> takeLuceneSnapshot() .flatMap(snapshotSeqNo -> takeLuceneSnapshot()
.flatMap(snapshot -> Mono .flatMap(snapshot -> Mono
.fromCallable(() -> { .fromCallable(() -> {
this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot)); this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
return new LLSnapshot(snapshotSeqNo); return new LLSnapshot(snapshotSeqNo);
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(luceneScheduler)
) )
); );
} }
@ -169,7 +171,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
throw ex; throw ex;
} }
} }
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
@Override @Override
@ -187,7 +189,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
// Delete unused files after releasing the snapshot // Delete unused files after releasing the snapshot
indexWriter.deleteUnusedFiles(); indexWriter.deleteUnusedFiles();
return null; return null;
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
@Override @Override
@ -195,7 +197,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.<Void>fromCallable(() -> { return Mono.<Void>fromCallable(() -> {
indexWriter.addDocument(LLUtils.toDocument(doc)); indexWriter.addDocument(LLUtils.toDocument(doc));
return null; return null;
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
@Override @Override
@ -208,7 +210,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriter.addDocuments(LLUtils.toDocuments(docs)); indexWriter.addDocuments(LLUtils.toDocuments(docs));
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic())) .subscribeOn(luceneScheduler))
) )
.then(); .then();
} }
@ -219,7 +221,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.<Void>fromCallable(() -> { return Mono.<Void>fromCallable(() -> {
indexWriter.deleteDocuments(LLUtils.toTerm(id)); indexWriter.deleteDocuments(LLUtils.toTerm(id));
return null; return null;
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
@Override @Override
@ -227,7 +229,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
return Mono.<Void>fromCallable(() -> { return Mono.<Void>fromCallable(() -> {
indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document)); indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
return null; return null;
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
@Override @Override
@ -244,7 +246,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriter.updateDocuments(LLUtils.toTerm(documents.key()), luceneDocuments); indexWriter.updateDocuments(LLUtils.toTerm(documents.key()), luceneDocuments);
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(luceneScheduler)
); );
} }
@ -257,7 +259,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
indexWriter.flush(); indexWriter.flush();
indexWriter.commit(); indexWriter.commit();
return null; return null;
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) { private Mono<IndexSearcher> acquireSearcherWrapper(LLSnapshot snapshot) {
@ -267,7 +269,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} else { } else {
return resolveSnapshot(snapshot).getIndexSearcher(); return resolveSnapshot(snapshot).getIndexSearcher();
} }
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) { private Mono<Void> releaseSearcherWrapper(LLSnapshot snapshot, IndexSearcher indexSearcher) {
@ -279,7 +281,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
e.printStackTrace(); e.printStackTrace();
} }
} }
}).subscribeOn(Schedulers.boundedElastic()); }).subscribeOn(luceneScheduler);
} }
@SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"}) @SuppressWarnings({"Convert2MethodRef", "unchecked", "rawtypes"})
@ -308,7 +310,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
// Get the reference doc and apply it to MoreLikeThis, to generate the query // Get the reference doc and apply it to MoreLikeThis, to generate the query
return mlt.like((Map) mltDocumentFields); return mlt.like((Map) mltDocumentFields);
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(luceneScheduler)
.flatMap(query -> Mono .flatMap(query -> Mono
.fromCallable(() -> { .fromCallable(() -> {
One<Long> totalHitsCountSink = Sinks.one(); One<Long> totalHitsCountSink = Sinks.one();
@ -334,7 +336,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}); });
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(Schedulers.boundedElastic()) }).subscribeOn(luceneScheduler)
).then() ).then()
.materialize() .materialize()
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value)) .flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
@ -356,7 +358,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
org.apache.lucene.search.ScoreMode luceneScoreMode = LLUtils.toScoreMode(scoreMode); org.apache.lucene.search.ScoreMode luceneScoreMode = LLUtils.toScoreMode(scoreMode);
return Tuples.of(query, Optional.ofNullable(luceneSort), luceneScoreMode); return Tuples.of(query, Optional.ofNullable(luceneSort), luceneScoreMode);
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(luceneScheduler)
.flatMap(tuple -> Mono .flatMap(tuple -> Mono
.fromCallable(() -> { .fromCallable(() -> {
Query query = tuple.getT1(); Query query = tuple.getT1();
@ -386,7 +388,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
}); });
return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux())); return new LLSearchResult(totalHitsCountSink.asMono(), Flux.just(topKeysSink.asFlux()));
}).subscribeOn(Schedulers.boundedElastic()) }).subscribeOn(luceneScheduler)
) )
.materialize() .materialize()
.flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value)) .flatMap(value -> releaseSearcherWrapper(snapshot, indexSearcher).thenReturn(value))
@ -403,7 +405,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
directory.close(); directory.close();
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(luceneScheduler);
} }
private void scheduledCommit() { private void scheduledCommit() {