Add heap kv database implementation

Andrea Cavalli 2021-07-10 20:52:01 +02:00
parent 5f3c8a2515
commit 58a9121978
20 changed files with 722 additions and 82 deletions

View File

@@ -87,7 +87,8 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
private Mono<SearchResultKeys<T>> transformLuceneResult(LLSearchResultShard llSearchResult) {
return Mono.just(new SearchResultKeys<>(llSearchResult.results()
.map(signal -> new SearchResultKey<>(signal.key().map(indicizer::getKey), signal.score())),
llSearchResult.totalHitsCount()
llSearchResult.totalHitsCount(),
llSearchResult.release()
));
}
@@ -96,7 +97,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
return Mono.just(new SearchResult<>(llSearchResult.results().map(signal -> {
var key = signal.key().map(indicizer::getKey);
return new SearchResultItem<>(key, key.flatMap(valueGetter::get), signal.score());
}), llSearchResult.totalHitsCount()));
}), llSearchResult.totalHitsCount(), llSearchResult.release()));
}
/**
@@ -179,7 +180,7 @@ public class LuceneIndexImpl<T, U> implements LuceneIndex<T, U> {
@Override
public Mono<Long> count(@Nullable CompositeSnapshot snapshot, Query query) {
return this.search(ClientQueryParams.<SearchResultKey<T>>builder().snapshot(snapshot).query(query).limit(0).build())
.map(SearchResultKeys::totalHitsCount);
.flatMap(tSearchResultKeys -> tSearchResultKeys.release().thenReturn(tSearchResultKeys.totalHitsCount()));
}
@Override
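
The count path above now owns the result's lifecycle: it captures totalHitsCount(), runs release(), then emits the captured value. A minimal runnable sketch of that flatMap/thenReturn shape, assuming only a reactor-core dependency (the Keys record below is a stand-in, not the real SearchResultKeys):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CountReleaseSketch {

    // Stand-in for the real SearchResultKeys record; only the members used
    // by count() are reproduced here (an assumption, not the real API).
    record Keys(Flux<String> results, long totalHitsCount, Mono<Void> release) {}

    public static void main(String[] args) {
        Mono<Keys> search = Mono.just(new Keys(
                Flux.empty(),
                42L,
                Mono.fromRunnable(() -> System.out.println("released"))));

        // Same shape as count() above: run the release mono first, then emit
        // the already-captured hit count.
        Long count = search
                .flatMap(keys -> keys.release().thenReturn(keys.totalHitsCount()))
                .block();

        System.out.println("count = " + count); // prints "released", then "count = 42"
    }
}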

View File

@@ -1,10 +1,20 @@
package it.cavallium.dbengine.client;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public record SearchResult<T, U>(Flux<SearchResultItem<T, U>> results, long totalHitsCount) {
public record SearchResult<T, U>(Flux<SearchResultItem<T, U>> results, long totalHitsCount, Mono<Void> release) {
public static <T, U> SearchResult<T, U> empty() {
return new SearchResult<>(Flux.empty(), 0L);
return new SearchResult<>(Flux.empty(), 0L, Mono.empty());
}
public Flux<SearchResultItem<T, U>> resultsThenRelease() {
return Flux
.usingWhen(
Mono.just(true),
_unused -> results,
_unused -> release
);
}
}
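
resultsThenRelease ties the release mono to stream termination through Flux.usingWhen, with a throwaway Boolean as the "resource". The same construct in isolation (a sketch, reactor-core only):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ResultsThenReleaseSketch {

    public static void main(String[] args) {
        Flux<String> results = Flux.just("a", "b", "c");
        Mono<Void> release = Mono.fromRunnable(() -> System.out.println("released"));

        // The Boolean "resource" is a placeholder: usingWhen only needs it to
        // run the cleanup publisher on the stream's termination signal
        // (complete, error or cancel), so consumers cannot forget to release.
        Flux<String> resultsThenRelease = Flux.usingWhen(
                Mono.just(true),
                unused -> results,
                unused -> release);

        resultsThenRelease.subscribe(System.out::println); // a, b, c, then "released"
    }
}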

View File

@@ -1,19 +1,30 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.collections.Joiner.ValueGetter;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@SuppressWarnings("unused")
public record SearchResultKeys<T>(Flux<SearchResultKey<T>> results, long totalHitsCount) {
public record SearchResultKeys<T>(Flux<SearchResultKey<T>> results, long totalHitsCount, Mono<Void> release) {
public static <T, U> SearchResultKeys<T> empty() {
return new SearchResultKeys<>(Flux.empty(), 0L);
return new SearchResultKeys<>(Flux.empty(), 0L, Mono.empty());
}
public <U> SearchResult<T, U> withValues(ValueGetter<T, U> valuesGetter) {
return new SearchResult<>(results.map(item -> new SearchResultItem<>(item.key(),
item.key().flatMap(valuesGetter::get),
item.score()
)), totalHitsCount);
)), totalHitsCount, release);
}
public Flux<SearchResultKey<T>> resultsThenRelease() {
return Flux
.usingWhen(
Mono.just(true),
_unused -> results,
_unused -> release
);
}
}
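
withValues resolves each key to its value lazily: the value Mono is derived from the key Mono, so no lookup happens until a consumer subscribes. A sketch with hypothetical stand-in records (Key and Item here are not the real SearchResultKey/SearchResultItem):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WithValuesSketch {

    // Hypothetical stand-ins for SearchResultKey / SearchResultItem.
    record Key<T>(Mono<T> key) {}
    record Item<T, U>(Mono<T> key, Mono<U> value) {}

    public static void main(String[] args) {
        Flux<Key<String>> keys = Flux.just(new Key<>(Mono.just("k1")), new Key<>(Mono.just("k2")));

        // As in withValues: each value Mono is built from its key Mono, so
        // the lookup only runs when someone subscribes to that item's value.
        Flux<Item<String, String>> items = keys
                .map(k -> new Item<>(k.key(), k.key().map(key -> key + "-value")));

        items.flatMap(Item::value).subscribe(System.out::println);
    }
}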

View File

@@ -54,7 +54,7 @@ public interface LLLuceneIndex extends LLSnapshottable {
default Mono<Long> count(@Nullable LLSnapshot snapshot, Query query) {
QueryParams params = QueryParams.of(query, 0, 0, Nullablefloat.empty(), NoSort.of(), ScoreMode.of(false, false));
return Mono.from(this.search(snapshot, params, null)
.map(LLSearchResultShard::totalHitsCount)
.flatMap(llSearchResultShard -> llSearchResultShard.release().thenReturn(llSearchResultShard.totalHitsCount()))
.defaultIfEmpty(0L));
}

View File

@@ -1,5 +1,6 @@
package it.cavallium.dbengine.database;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public record LLSearchResultShard (Flux<LLKeyScore> results, long totalHitsCount) {}
public record LLSearchResultShard (Flux<LLKeyScore> results, long totalHitsCount, Mono<Void> release) {}

View File

@@ -26,6 +26,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.time.StopWatch;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -80,7 +81,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@SuppressWarnings("SwitchStatementWithTooFewBranches")
public LLLocalKeyValueDatabase(ByteBufAllocator allocator,
String name,
Path path,
@Nullable Path path,
List<Column> columns,
List<ColumnFamilyHandle> handles,
DatabaseOptions databaseOptions) throws IOException {
@@ -127,7 +128,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// a factory method that returns a RocksDB instance
this.db = RocksDB.open(new DBOptions(rocksdbOptions),
dbPathString,
databaseOptions.inMemory() ? List.of(DEFAULT_COLUMN_FAMILY) : descriptors,
descriptors,
handles
);
break;
@@ -145,7 +146,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
}
}
createInMemoryColumns(descriptors, databaseOptions, handles);
this.handles = new HashMap<>();
if (enableColumnsBug && !databaseOptions.inMemory()) {
for (int i = 0; i < columns.size(); i++) {
@@ -189,7 +189,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
db.closeE();
} catch (RocksDBException ex) {
if ("Cannot close DB with unreleased snapshot.".equals(ex.getMessage())) {
snapshotsHandles.forEach((id, snapshot) -> {;
snapshotsHandles.forEach((id, snapshot) -> {
try {
db.releaseSnapshot(snapshot);
} catch (Exception ex2) {
@@ -246,12 +246,17 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
@SuppressWarnings({"CommentedOutCode", "PointlessArithmeticExpression"})
private static Options openRocksDb(Path path, DatabaseOptions databaseOptions) throws IOException {
private static Options openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException {
// Get databases directory path
Path databasesDirPath = path.toAbsolutePath().getParent();
// Create base directories
if (Files.notExists(databasesDirPath)) {
Files.createDirectories(databasesDirPath);
Path databasesDirPath;
if (path != null) {
databasesDirPath = path.toAbsolutePath().getParent();
// Create base directories
if (Files.notExists(databasesDirPath)) {
Files.createDirectories(databasesDirPath);
}
} else {
databasesDirPath = null;
}
// the Options class contains a set of configurable DB options
@@ -280,7 +285,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
options.setKeepLogFileNum(10);
options.setAllowFAllocate(true);
options.setRateLimiter(new RateLimiter(10L * 1024L * 1024L)); // 10MiB/s max compaction write speed
var paths = List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
List<DbPath> paths = List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
10L * 1024L * 1024L * 1024L), // 10GiB
new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
100L * 1024L * 1024L * 1024L), // 100GiB
@@ -367,27 +373,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return options;
}
private void createInMemoryColumns(List<ColumnFamilyDescriptor> totalDescriptors,
DatabaseOptions databaseOptions,
List<ColumnFamilyHandle> handles)
throws RocksDBException {
if (!databaseOptions.inMemory()) {
return;
}
List<byte[]> columnFamiliesToCreate = new LinkedList<>();
for (ColumnFamilyDescriptor descriptor : totalDescriptors) {
columnFamiliesToCreate.add(descriptor.getName());
}
for (byte[] name : columnFamiliesToCreate) {
if (!Arrays.equals(name, DEFAULT_COLUMN_FAMILY.getName())) {
var descriptor = new ColumnFamilyDescriptor(name);
handles.add(db.createColumnFamily(descriptor));
}
}
}
private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors,
Options options,
DatabaseOptions databaseOptions,
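
The DbPath list above tiers SST files into a hot directory (first 10GiB) and a cold one. Roughly the same setup in isolation, assuming a rocksdbjni dependency (the /tmp path and "mydb" name are placeholders):

import java.nio.file.Path;
import java.util.List;
import org.rocksdb.DbPath;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class DbPathSketch {

    public static void main(String[] args) {
        RocksDB.loadLibrary();
        Path base = Path.of("/tmp/databases"); // placeholder directory
        String name = "mydb";                  // placeholder database name

        // Tiered storage as in openRocksDb: earlier paths hold newer/hotter
        // SST files up to their target size, later paths take the overflow.
        List<DbPath> paths = List.of(
                new DbPath(base.resolve(name + "_hot"), 10L * 1024 * 1024 * 1024),    // 10GiB
                new DbPath(base.resolve(name + "_cold"), 100L * 1024 * 1024 * 1024)); // 100GiB

        try (Options options = new Options()) {
            options.setDbPaths(paths);
        }
    }
}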

View File

@@ -120,15 +120,22 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private final ScheduledTaskLifecycle scheduledTasksLifecycle;
public LLLocalLuceneIndex(Path luceneBasePath,
public LLLocalLuceneIndex(@Nullable Path luceneBasePath,
String name,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions) throws IOException {
Path directoryPath;
if (luceneOptions.inMemory() != (luceneBasePath == null)) {
throw new IllegalArgumentException();
} else if (luceneBasePath != null) {
directoryPath = luceneBasePath.resolve(name + ".lucene.db");
} else {
directoryPath = null;
}
if (name.length() == 0) {
throw new IOException("Empty lucene database name");
}
Path directoryPath = luceneBasePath.resolve(name + ".lucene.db");
if (!MMapDirectory.UNMAP_SUPPORTED) {
logger.error("Unmap is unsupported, lucene will run slower: {}", MMapDirectory.UNMAP_NOT_SUPPORTED_REASON);
} else {
@@ -471,13 +478,14 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
String keyFieldName,
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux) {
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
.flatMap(modifiedLocalQuery -> Mono
.usingWhen(
this.acquireSearcherWrapper(snapshot),
indexSearcher -> localSearcher.collect(indexSearcher, modifiedLocalQuery, keyFieldName, luceneSearcherScheduler),
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()))
.flatMap(modifiedLocalQuery -> this.acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
return localSearcher
.collect(indexSearcher, releaseMono, modifiedLocalQuery, keyFieldName, luceneSearcherScheduler)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()))
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
})
);
}
@@ -486,12 +494,13 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
Flux<Tuple2<String, Set<String>>> mltDocumentFieldsFlux,
LuceneShardSearcher shardSearcher) {
return getMoreLikeThisQuery(snapshot, LuceneUtils.toLocalQueryParams(queryParams), mltDocumentFieldsFlux)
.flatMap(modifiedLocalQuery -> Mono
.usingWhen(
this.acquireSearcherWrapper(snapshot),
indexSearcher -> shardSearcher.searchOn(indexSearcher, modifiedLocalQuery, luceneSearcherScheduler),
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
)
.flatMap(modifiedLocalQuery -> this.acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
return shardSearcher
.searchOn(indexSearcher, releaseMono, modifiedLocalQuery, luceneSearcherScheduler)
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
})
);
}
@@ -569,25 +578,26 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
@Override
public Mono<LLSearchResultShard> search(@Nullable LLSnapshot snapshot, QueryParams queryParams, String keyFieldName) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
return Mono
.usingWhen(
this.acquireSearcherWrapper(snapshot),
indexSearcher -> localSearcher.collect(indexSearcher, localQueryParams, keyFieldName, luceneSearcherScheduler),
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()));
return this.acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
return localSearcher
.collect(indexSearcher, releaseMono, localQueryParams, keyFieldName, luceneSearcherScheduler)
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()))
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
});
}
public Mono<Void> distributedSearch(@Nullable LLSnapshot snapshot,
QueryParams queryParams,
LuceneShardSearcher shardSearcher) {
LocalQueryParams localQueryParams = LuceneUtils.toLocalQueryParams(queryParams);
return Mono
.usingWhen(
this.acquireSearcherWrapper(snapshot),
indexSearcher -> shardSearcher.searchOn(indexSearcher, localQueryParams, luceneSearcherScheduler),
indexSearcher -> releaseSearcherWrapper(snapshot, indexSearcher)
);
return this.acquireSearcherWrapper(snapshot)
.flatMap(indexSearcher -> {
Mono<Void> releaseMono = releaseSearcherWrapper(snapshot, indexSearcher);
return shardSearcher.searchOn(indexSearcher, releaseMono, localQueryParams, luceneSearcherScheduler)
.onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
});
}
@Override
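
Both search methods replace Mono.usingWhen with a manual acquire/flatMap: on failure the searcher is released before the error is re-emitted; on success the release mono travels to the caller inside the result instead of running eagerly. The error half of that contract, sketched with a plain String standing in for the IndexSearcher:

import reactor.core.publisher.Mono;

public class AcquireReleaseOnErrorSketch {

    public static void main(String[] args) {
        Mono<String> acquire = Mono.fromCallable(() -> "searcher");

        Mono<String> search = acquire.flatMap(searcher -> {
            Mono<Void> releaseMono = Mono.fromRunnable(() -> System.out.println("released " + searcher));
            // A search that fails: release the searcher, then re-emit the error.
            // On success, releaseMono would instead be handed over inside the result.
            return Mono.<String>error(new IllegalStateException("search failed"))
                    .onErrorResume(ex -> releaseMono.then(Mono.error(ex)));
        });

        search.onErrorReturn("fallback").block(); // prints "released searcher"
    }
}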

View File

@@ -221,7 +221,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.then(shardSearcher.collect(localQueryParams, keyFieldName, Schedulers.boundedElastic()))
)
// Fix the result type
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()));
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()));
}
@Override
@@ -249,7 +249,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
.then(shardSearcher.collect(localQueryParams, keyFieldName, Schedulers.boundedElastic()))
)
// Fix the result type
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount()));
.map(result -> new LLSearchResultShard(result.results(), result.totalHitsCount(), result.release()));
}
@Override

View File

@@ -0,0 +1,78 @@
package it.cavallium.dbengine.database.memory;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase;
import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
import it.cavallium.dbengine.database.disk.LLLocalMultiLuceneIndex;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
static {
JMXNettyMonitoringManager.initialize();
}
private final ByteBufAllocator allocator;
public LLMemoryDatabaseConnection(ByteBufAllocator allocator) {
this.allocator = allocator;
}
@Override
public ByteBufAllocator getAllocator() {
return allocator;
}
@Override
public Mono<LLDatabaseConnection> connect() {
return Mono.empty();
}
@Override
public Mono<LLKeyValueDatabase> getDatabase(String name,
List<Column> columns,
DatabaseOptions databaseOptions) {
return Mono
.<LLKeyValueDatabase>fromCallable(() -> new LLMemoryKeyValueDatabase(
allocator,
name,
columns
))
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<LLLuceneIndex> getLuceneIndex(String name,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions) {
return Mono
.<LLLuceneIndex>fromCallable(() -> new LLLocalLuceneIndex(null,
name,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions
))
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public Mono<Void> disconnect() {
return Mono.empty();
}
}
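
Both factory methods follow the same Reactor idiom: wrap the (potentially blocking) constructor in Mono.fromCallable and shift it onto the bounded-elastic scheduler, so the caller's thread is never blocked. The bare pattern, as a sketch:

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FromCallableSketch {

    public static void main(String[] args) {
        // Defer a potentially blocking constructor onto boundedElastic, as
        // getDatabase and getLuceneIndex do above.
        Mono<Object> database = Mono
                .fromCallable(() -> {
                    System.out.println("constructed on " + Thread.currentThread().getName());
                    return new Object();
                })
                .subscribeOn(Schedulers.boundedElastic());

        database.block();
    }
}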

View File

@@ -0,0 +1,331 @@
package it.cavallium.dbengine.database.memory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCounted;
import it.cavallium.dbengine.client.BadBlock;
import it.cavallium.dbengine.database.Delta;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateMode;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class LLMemoryDictionary implements LLDictionary {
private final String databaseName;
private final String columnName;
private final ByteBufAllocator allocator;
private final UpdateMode updateMode;
private final Getter<Long, ConcurrentSkipListMap<ByteList, ByteList>> snapshots;
private final ConcurrentSkipListMap<ByteList, ByteList> mainDb;
private interface Getter<T, U> {
U get(T argument);
}
public LLMemoryDictionary(ByteBufAllocator allocator,
String databaseName,
String columnName,
UpdateMode updateMode,
ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>> snapshots,
ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>> mainDb) {
this.databaseName = databaseName;
this.columnName = columnName;
this.allocator = allocator;
this.updateMode = updateMode;
this.snapshots = (snapshotId) -> snapshots.get(snapshotId).get(columnName);
this.mainDb = mainDb.get(columnName);
}
@Override
public String getColumnName() {
return columnName;
}
@Override
public ByteBufAllocator getAllocator() {
return allocator;
}
private long resolveSnapshot(LLSnapshot snapshot) {
if (snapshot == null) {
return Long.MIN_VALUE + 1L;
} else if (snapshot.getSequenceNumber() == Long.MIN_VALUE + 1L) {
throw new IllegalStateException();
} else {
return snapshot.getSequenceNumber();
}
}
private Mono<ByteBuf> transformResult(Mono<ByteList> result, LLDictionaryResultType resultType) {
if (resultType == LLDictionaryResultType.PREVIOUS_VALUE) {
// Don't retain the result because it has been removed from the skip list
return result.map(this::kk);
} else if (resultType == LLDictionaryResultType.PREVIOUS_VALUE_EXISTENCE) {
return result
.map(prev -> true)
.defaultIfEmpty(false)
.map(LLUtils::booleanToResponseByteBuffer);
} else {
return result.then(Mono.empty());
}
}
private ByteList k(ByteBuf buf) {
return ByteList.of(LLUtils.toArray(buf));
}
private ByteBuf kk(ByteList bytesList) {
var buffer = getAllocator().buffer(bytesList.size());
buffer.writeBytes(bytesList.toByteArray());
return buffer;
}
private Map<ByteList, ByteList> mapSlice(LLSnapshot snapshot, LLRange range) {
if (range.isAll()) {
return snapshots.get(resolveSnapshot(snapshot));
} else if (range.isSingle()) {
var key = k(range.getSingle());
var value = snapshots
.get(resolveSnapshot(snapshot))
.get(key);
if (value != null) {
return Map.of(key, value);
} else {
return Map.of();
}
} else if (range.hasMin() && range.hasMax()) {
var min = k(range.getMin());
var max = k(range.getMax());
if (min.compareTo(max) > 0) {
return Map.of();
}
return snapshots
.get(resolveSnapshot(snapshot))
.subMap(min, true, max, false);
} else if (range.hasMin()) {
return snapshots
.get(resolveSnapshot(snapshot))
.tailMap(k(range.getMin()), true);
} else {
return snapshots
.get(resolveSnapshot(snapshot))
.headMap(k(range.getMax()), false);
}
}
@Override
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, ByteBuf key, boolean existsAlmostCertainly) {
try {
return Mono
.fromCallable(() -> snapshots.get(resolveSnapshot(snapshot)).get(k(key)))
.map(this::kk)
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
}
@Override
public Mono<ByteBuf> put(ByteBuf key, ByteBuf value, LLDictionaryResultType resultType) {
try {
return Mono
.fromCallable(() -> mainDb.put(k(key), k(value)))
.transform(result -> this.transformResult(result, resultType))
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
}
@Override
public Mono<UpdateMode> getUpdateMode() {
return Mono.just(updateMode);
}
@Override
public Mono<Delta<ByteBuf>> updateAndGetDelta(ByteBuf key,
Function<@Nullable ByteBuf, @Nullable ByteBuf> updater,
boolean existsAlmostCertainly) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<Void> clear() {
return Mono.fromRunnable(mainDb::clear);
}
@Override
public Mono<ByteBuf> remove(ByteBuf key, LLDictionaryResultType resultType) {
try {
return Mono
.fromCallable(() -> mainDb.remove(k(key)))
// Don't retain the result because it has been removed from the skip list
.onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause))
.map(this::kk)
.doFirst(key::retain)
.doAfterTerminate(key::release);
} finally {
key.release();
}
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getMulti(@Nullable LLSnapshot snapshot,
Flux<ByteBuf> keys,
boolean existsAlmostCertainly) {
return keys
.handle((key, sink) -> {
try {
var v = snapshots.get(resolveSnapshot(snapshot)).get(k(key));
if (v == null) {
sink.complete();
} else {
sink.next(Map.entry(key.retain(), kk(v)));
}
} finally {
key.release();
}
});
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> putMulti(Flux<Entry<ByteBuf, ByteBuf>> entries, boolean getOldValues) {
return entries
.handle((entry, sink) -> {
var key = entry.getKey();
var val = entry.getValue();
try {
var v = mainDb.put(k(key), k(val));
if (v == null || !getOldValues) {
sink.complete();
} else {
sink.next(Map.entry(key.retain(), kk(v)));
}
} finally {
key.release();
val.release();
}
});
}
@Override
public Flux<Entry<ByteBuf, ByteBuf>> getRange(@Nullable LLSnapshot snapshot,
LLRange range,
boolean existsAlmostCertainly) {
try {
if (range.isSingle()) {
return Mono.fromCallable(() -> {
var element = snapshots.get(resolveSnapshot(snapshot))
.get(k(range.getSingle()));
return Map.entry(range.getSingle().retain(), kk(element));
}).flux();
} else {
return Mono
.fromCallable(() -> mapSlice(snapshot, range))
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.map(entry -> Map.entry(kk(entry.getKey()), kk(entry.getValue())));
}
} finally {
range.release();
}
}
@Override
public Flux<List<Entry<ByteBuf, ByteBuf>>> getRangeGrouped(@Nullable LLSnapshot snapshot,
LLRange range,
int prefixLength,
boolean existsAlmostCertainly) {
return Flux.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Flux<ByteBuf> getRangeKeys(@Nullable LLSnapshot snapshot, LLRange range) {
try {
if (range.isSingle()) {
return Mono.fromCallable(() -> {
var contains = snapshots.get(resolveSnapshot(snapshot))
.containsKey(k(range.getSingle()));
return contains ? range.getSingle().retain() : null;
}).flux();
} else {
return Mono
.fromCallable(() -> mapSlice(snapshot, range))
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.map(entry -> kk(entry.getKey()));
}
} finally {
range.release();
}
}
@Override
public Flux<List<ByteBuf>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return getRangeKeys(snapshot, range)
.bufferUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), LLUtils::equals);
}
@Override
public Flux<ByteBuf> getRangeKeyPrefixes(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return getRangeKeys(snapshot, range)
.distinctUntilChanged(k -> k.slice(k.readerIndex(), prefixLength), LLUtils::equals)
.map(k -> k.slice(k.readerIndex(), prefixLength));
}
@Override
public Flux<BadBlock> badBlocks(LLRange range) {
return Flux.empty();
}
@Override
public Mono<Void> setRange(LLRange range, Flux<Entry<ByteBuf, ByteBuf>> entries) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<Boolean> isRangeEmpty(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<Long> sizeRange(@Nullable LLSnapshot snapshot, LLRange range, boolean fast) {
return Mono.fromCallable(() -> (long) mapSlice(snapshot, range).size());
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> getOne(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<ByteBuf> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public String getDatabaseName() {
return databaseName;
}
}
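
mapSlice reduces every LLRange shape to a ConcurrentSkipListMap view: subMap for min+max (min inclusive, max exclusive), tailMap for min-only, headMap for max-only. The same bound conventions on a toy map with String keys:

import java.util.concurrent.ConcurrentSkipListMap;

public class MapSliceSketch {

    public static void main(String[] args) {
        var map = new ConcurrentSkipListMap<String, String>();
        map.put("a", "1");
        map.put("b", "2");
        map.put("c", "3");

        // Same bound conventions as mapSlice: min inclusive, max exclusive.
        System.out.println(map.subMap("a", true, "c", false)); // {a=1, b=2}
        System.out.println(map.tailMap("b", true));            // {b=2, c=3}
        System.out.println(map.headMap("c", false));           // {a=1, b=2}
    }
}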

View File

@@ -0,0 +1,119 @@
package it.cavallium.dbengine.database.memory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure;
import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLSnapshottable;
import it.cavallium.dbengine.database.UpdateMode;
import it.unimi.dsi.fastutil.bytes.ByteList;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
private final ByteBufAllocator allocator;
private final String name;
private final AtomicLong nextSnapshotNumber = new AtomicLong(1);
private final ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>> snapshots = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>> mainDb;
public LLMemoryKeyValueDatabase(ByteBufAllocator allocator, String name, List<Column> columns) {
this.allocator = allocator;
this.name = name;
this.mainDb = new ConcurrentHashMap<>();
for (Column column : columns) {
mainDb.put(column.name(), new ConcurrentSkipListMap<>());
}
this.snapshots.put(Long.MIN_VALUE + 1L, this.mainDb);
}
@Override
public Mono<? extends LLSingleton> getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue) {
return Mono.error(new UnsupportedOperationException("Not implemented"));
}
@Override
public Mono<? extends LLDictionary> getDictionary(byte[] columnName, UpdateMode updateMode) {
var columnNameString = new String(columnName, StandardCharsets.UTF_8);
return Mono.fromCallable(() -> new LLMemoryDictionary(allocator,
name,
columnNameString,
updateMode,
snapshots,
mainDb
));
}
@Override
public Mono<Long> getProperty(String propertyName) {
return Mono.empty();
}
@Override
public Mono<Void> verifyChecksum() {
return Mono.empty();
}
@Override
public ByteBufAllocator getAllocator() {
return allocator;
}
@Override
public Mono<Void> close() {
return Mono
.fromRunnable(() -> {
snapshots.forEach((snapshot, dbs) -> dbs.forEach((columnName, db) -> {
db.clear();
}));
mainDb.forEach((columnName, db) -> {
db.clear();
});
});
}
@Override
public String getDatabaseName() {
return name;
}
@Override
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(() -> {
var snapshotNumber = nextSnapshotNumber.getAndIncrement();
var snapshot = new ConcurrentHashMap<String, ConcurrentSkipListMap<ByteList, ByteList>>();
mainDb.forEach((columnName, column) -> {
var cloned = column.clone();
snapshot.put(columnName, cloned);
});
snapshots.put(snapshotNumber, snapshot);
return new LLSnapshot(snapshotNumber);
});
}
@Override
public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
return Mono
.fromCallable(() -> snapshots.remove(snapshot.getSequenceNumber()))
.then();
}
}
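
takeSnapshot is copy-on-snapshot: each column's skip list is clone()d under a fresh sequence number, so writes that land in mainDb afterwards never reach the snapshot. A compact sketch of that semantics, with String keys standing in for ByteList:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SnapshotCloneSketch {

    public static void main(String[] args) {
        // One sorted map per column, as in LLMemoryKeyValueDatabase.
        var mainDb = new ConcurrentHashMap<String, ConcurrentSkipListMap<String, String>>();
        mainDb.put("main", new ConcurrentSkipListMap<>());
        mainDb.get("main").put("k1", "v1");

        // takeSnapshot(): clone every column under a fresh snapshot number.
        var snapshots = new ConcurrentHashMap<Long, ConcurrentHashMap<String, ConcurrentSkipListMap<String, String>>>();
        var snapshot = new ConcurrentHashMap<String, ConcurrentSkipListMap<String, String>>();
        mainDb.forEach((columnName, column) -> snapshot.put(columnName, column.clone()));
        snapshots.put(1L, snapshot);

        // Writes after the snapshot stay invisible to it.
        mainDb.get("main").put("k2", "v2");
        System.out.println(snapshots.get(1L).get("main").containsKey("k2")); // false
    }
}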

View File

@@ -13,13 +13,14 @@ public class AdaptiveLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
if (queryParams.limit() == 0) {
return countSearcher.collect(indexSearcher, queryParams, keyFieldName, scheduler);
return countSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler);
} else {
return localSearcher.collect(indexSearcher, queryParams, keyFieldName, scheduler);
return localSearcher.collect(indexSearcher, releaseIndexSearcher, queryParams, keyFieldName, scheduler);
}
}
}

View File

@@ -11,6 +11,7 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
@@ -18,7 +19,8 @@ public class CountLuceneLocalSearcher implements LuceneLocalSearcher {
return Mono
.fromCallable(() -> new LuceneSearchResult(
indexSearcher.count(queryParams.query()),
Flux.empty())
Flux.empty(),
releaseIndexSearcher)
)
.subscribeOn(scheduler);
}

View File

@@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@@ -16,13 +17,18 @@ public class CountLuceneMultiSearcher implements LuceneMultiSearcher {
return Mono
.fromCallable(() -> {
AtomicLong totalHits = new AtomicLong(0);
ConcurrentLinkedQueue<Mono<Void>> release = new ConcurrentLinkedQueue<>();
return new LuceneShardSearcher() {
@Override
public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) {
public Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
Scheduler scheduler) {
return Mono
.<Void>fromCallable(() -> {
//noinspection BlockingMethodInNonBlockingContext
totalHits.addAndGet(indexSearcher.count(queryParams.query()));
release.add(releaseIndexSearcher);
return null;
})
.subscribeOn(scheduler);
@@ -30,7 +36,7 @@ public class CountLuceneMultiSearcher implements LuceneMultiSearcher {
@Override
public Mono<LuceneSearchResult> collect(LocalQueryParams queryParams, String keyFieldName, Scheduler scheduler) {
return Mono.fromCallable(() -> new LuceneSearchResult(totalHits.get(), Flux.empty()));
return Mono.fromCallable(() -> new LuceneSearchResult(totalHits.get(), Flux.empty(), Mono.when(release)));
}
};
});
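
Each searchOn call queues its shard's release mono; collect then folds the queue with Mono.when, which completes only once every queued release has completed. The combinator in isolation:

import java.util.concurrent.ConcurrentLinkedQueue;
import reactor.core.publisher.Mono;

public class MonoWhenReleaseSketch {

    public static void main(String[] args) {
        var release = new ConcurrentLinkedQueue<Mono<Void>>();
        release.add(Mono.fromRunnable(() -> System.out.println("release shard 1")));
        release.add(Mono.fromRunnable(() -> System.out.println("release shard 2")));

        // Mono.when subscribes to every queued release and completes only
        // after all of them complete, as in collect() above.
        Mono.when(release).block();
    }
}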

View File

@@ -14,6 +14,7 @@ public interface LuceneLocalSearcher {
* @param scheduler a blocking scheduler
*/
Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler);

View File

@@ -1,8 +1,73 @@
package it.cavallium.dbengine.lucene.searcher;
import it.cavallium.dbengine.database.LLKeyScore;
import it.cavallium.dbengine.database.disk.LLLocalKeyValueDatabase;
import java.io.IOException;
import java.util.Objects;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public record LuceneSearchResult(long totalHitsCount, Flux<LLKeyScore> results) {
public final class LuceneSearchResult {
protected static final Logger logger = LoggerFactory.getLogger(LuceneSearchResult.class);
private volatile boolean releaseCalled;
private final long totalHitsCount;
private final Flux<LLKeyScore> results;
private final Mono<Void> release;
public LuceneSearchResult(long totalHitsCount, Flux<LLKeyScore> results, Mono<Void> release) {
this.totalHitsCount = totalHitsCount;
this.results = results;
this.release = Mono.fromRunnable(() -> {
if (releaseCalled) {
logger.warn("LuceneSearchResult::release has been called twice!");
}
releaseCalled = true;
}).then(release);
}
@SuppressWarnings("deprecation")
@Override
protected void finalize() throws Throwable {
if (!releaseCalled) {
logger.warn("LuceneSearchResult::release has not been called before class finalization!");
}
super.finalize();
}
public long totalHitsCount() {
return totalHitsCount;
}
public Flux<LLKeyScore> results() {
return results;
}
public Mono<Void> release() {
return release;
}
@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (LuceneSearchResult) obj;
return this.totalHitsCount == that.totalHitsCount && Objects.equals(this.results, that.results);
}
@Override
public int hashCode() {
return Objects.hash(totalHitsCount, results);
}
@Override
public String toString() {
return "LuceneSearchResult[" + "totalHitsCount=" + totalHitsCount + ", " + "results=" + results + ']';
}
}
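
The wrapper around release is a guard, not a gate: a second subscription logs a warning but still runs the inner release, and the finalizer warns if release was never subscribed at all. The guard on its own, with System.err standing in for the logger:

import reactor.core.publisher.Mono;

public class ReleaseGuardSketch {

    private static volatile boolean releaseCalled;

    public static void main(String[] args) {
        Mono<Void> inner = Mono.fromRunnable(() -> System.out.println("releasing searcher"));

        // Same guard as above: warn on a double call, then delegate anyway.
        Mono<Void> guarded = Mono.<Void>fromRunnable(() -> {
            if (releaseCalled) {
                System.err.println("release has been called twice!");
            }
            releaseCalled = true;
        }).then(inner);

        guarded.block();
        guarded.block(); // warns, then releases again
    }
}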

View File

@@ -13,6 +13,7 @@ public interface LuceneShardSearcher {
* @param scheduler a blocking scheduler
*/
Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> indexSearcherRelease,
LocalQueryParams queryParams,
Scheduler scheduler);

View File

@@ -27,6 +27,7 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<Mono<Void>> indexSearcherReleasersArray = new ArrayList<>();
private final List<TopFieldCollector> collectors = new ArrayList<>();
private final CollectorManager<TopFieldCollector, TopDocs> firstPageSharedManager;
private final Query luceneQuery;
@@ -40,13 +41,17 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
}
@Override
public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) {
public Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
Scheduler scheduler) {
return Mono.<Void>fromCallable(() -> {
TopFieldCollector collector;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
collector = firstPageSharedManager.newCollector();
indexSearchersArray.add(indexSearcher);
indexSearcherReleasersArray.add(releaseIndexSearcher);
collectors.add(collector);
}
//noinspection BlockingMethodInNonBlockingContext
@@ -65,9 +70,11 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
return Mono
.fromCallable(() -> {
TopDocs result;
Mono<Void> release;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
result = firstPageSharedManager.reduce(collectors);
release = Mono.when(indexSearcherReleasersArray);
}
IndexSearchers indexSearchers;
synchronized (lock) {
@@ -136,7 +143,8 @@ class ScoredSimpleLuceneShardSearcher implements LuceneShardSearcher {
return new LuceneSearchResult(result.totalHits.value,
firstPageHits
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams))
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
release
);
})
.subscribeOn(scheduler);

View File

@@ -18,6 +18,7 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
@Override
public Mono<LuceneSearchResult> collect(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
String keyFieldName,
Scheduler scheduler) {
@@ -85,7 +86,8 @@ public class SimpleLuceneLocalSearcher implements LuceneLocalSearcher {
return new LuceneSearchResult(firstPageTopDocs.totalHits.value, firstPageMono
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams))
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
releaseIndexSearcher
);
})
.subscribeOn(scheduler);

View File

@@ -26,6 +26,7 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
private final Object lock = new Object();
private final List<IndexSearcher> indexSearchersArray = new ArrayList<>();
private final List<Mono<Void>> indexSearcherReleasersArray = new ArrayList<>();
private final List<TopDocsCollector<ScoreDoc>> collectors = new ArrayList<>();
private final CollectorManager<TopDocsCollector<ScoreDoc>, TopDocs> firstPageUnsortedCollectorManager;
private final Query luceneQuery;
@@ -40,13 +41,17 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
}
@Override
public Mono<Void> searchOn(IndexSearcher indexSearcher, LocalQueryParams queryParams, Scheduler scheduler) {
public Mono<Void> searchOn(IndexSearcher indexSearcher,
Mono<Void> releaseIndexSearcher,
LocalQueryParams queryParams,
Scheduler scheduler) {
return Mono.<Void>fromCallable(() -> {
TopDocsCollector<ScoreDoc> collector;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
collector = firstPageUnsortedCollectorManager.newCollector();
indexSearchersArray.add(indexSearcher);
indexSearcherReleasersArray.add(releaseIndexSearcher);
collectors.add(collector);
}
//noinspection BlockingMethodInNonBlockingContext
@@ -60,9 +65,11 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
return Mono
.fromCallable(() -> {
TopDocs result;
Mono<Void> release;
synchronized (lock) {
//noinspection BlockingMethodInNonBlockingContext
result = firstPageUnsortedCollectorManager.reduce(collectors);
release = Mono.when(indexSearcherReleasersArray);
}
IndexSearchers indexSearchers;
synchronized (lock) {
@@ -125,7 +132,8 @@ class UnscoredLuceneShardSearcher implements LuceneShardSearcher {
return new LuceneSearchResult(result.totalHits.value, firstPageHits
.concatWith(nextHits)
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams))
.transform(flux -> LuceneUtils.filterTopDoc(flux, queryParams)),
release
);
})
.subscribeOn(scheduler);