Update Example.java, LLDatabaseConnection.java, and 4 more files...

This commit is contained in:
Andrea Cavalli 2021-01-31 12:43:28 +01:00
parent da0ba7d6bb
commit 345bc81252
6 changed files with 84 additions and 42 deletions

View File

@ -0,0 +1,17 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import java.nio.file.Path;
import java.util.List;
public class Example {
public static void main(String[] args) {
System.out.println("Test");
new LLLocalDatabaseConnection(Path.of("/tmp/"), true)
.connect()
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.hashMap("testmap")), false))
.block();
}
}

View File

@ -1,23 +1,22 @@
package it.cavallium.dbengine.database; package it.cavallium.dbengine.database;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public interface LLDatabaseConnection { public interface LLDatabaseConnection {
Mono<Void> connect(); Mono<? extends LLDatabaseConnection> connect();
LLKeyValueDatabase getDatabase(String name, List<Column> columns, boolean lowMemory) throws IOException; Mono<? extends LLKeyValueDatabase> getDatabase(String name, List<Column> columns, boolean lowMemory);
LLLuceneIndex getLuceneIndex(String name, Mono<? extends LLLuceneIndex> getLuceneIndex(String name,
int instancesCount, int instancesCount,
TextFieldsAnalyzer textFieldsAnalyzer, TextFieldsAnalyzer textFieldsAnalyzer,
Duration queryRefreshDebounceTime, Duration queryRefreshDebounceTime,
Duration commitDebounceTime, Duration commitDebounceTime,
boolean lowMemory) throws IOException; boolean lowMemory);
Mono<Void> disconnect(); Mono<Void> disconnect();
} }

View File

@ -7,24 +7,35 @@ import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public class SubStageGetterMapDeep<U, US extends DatabaseStage<U>> implements public class SubStageGetterMapDeep<T, U, US extends DatabaseStage<U>> implements
SubStageGetter<Map<byte[], U>, DatabaseStageEntry<Map<byte[], U>>> { SubStageGetter<Map<T, U>, DatabaseStageEntry<Map<T, U>>> {
private final SubStageGetter<U, US> subStageGetter; private final SubStageGetter<U, US> subStageGetter;
private final FixedLengthSerializer<T> keySerializer;
private final int keyLength; private final int keyLength;
private final int keyExtLength; private final int keyExtLength;
public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter, int keyLength, int keyExtLength) { public SubStageGetterMapDeep(SubStageGetter<U, US> subStageGetter,
FixedLengthSerializer<T> keySerializer,
int keyLength,
int keyExtLength) {
this.subStageGetter = subStageGetter; this.subStageGetter = subStageGetter;
this.keySerializer = keySerializer;
this.keyLength = keyLength; this.keyLength = keyLength;
this.keyExtLength = keyExtLength; this.keyExtLength = keyExtLength;
} }
@Override @Override
public Mono<DatabaseStageEntry<Map<byte[], U>>> subStage(LLDictionary dictionary, public Mono<DatabaseStageEntry<Map<T, U>>> subStage(LLDictionary dictionary,
@Nullable CompositeSnapshot snapshot, @Nullable CompositeSnapshot snapshot,
byte[] prefixKey, byte[] prefixKey,
Flux<byte[]> keyFlux) { Flux<byte[]> keyFlux) {
return Mono.just(new DatabaseMapDictionary<>(dictionary, subStageGetter, prefixKey, keyLength, keyExtLength)); return Mono.just(new DatabaseMapDictionary<>(dictionary,
subStageGetter,
keySerializer,
prefixKey,
keyLength,
keyExtLength
));
} }
} }

View File

@ -1,15 +1,14 @@
package it.cavallium.dbengine.database.disk; package it.cavallium.dbengine.database.disk;
import java.io.IOException; import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -24,48 +23,59 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
} }
@Override @Override
public Mono<Void> connect() { public Mono<LLDatabaseConnection> connect() {
return Mono return Mono
.<Void>fromCallable(() -> { .<LLDatabaseConnection>fromCallable(() -> {
if (Files.notExists(basePath)) { if (Files.notExists(basePath)) {
Files.createDirectories(basePath); Files.createDirectories(basePath);
} }
return null; return this;
}) })
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public LLLocalKeyValueDatabase getDatabase(String name, List<Column> columns, boolean lowMemory) throws IOException { public Mono<LLLocalKeyValueDatabase> getDatabase(String name, List<Column> columns, boolean lowMemory) {
return new LLLocalKeyValueDatabase(name, basePath.resolve("database_" + name), columns, new LinkedList<>(), return Mono
crashIfWalError, lowMemory); .<LLLocalKeyValueDatabase>fromCallable(() -> new LLLocalKeyValueDatabase(name,
basePath.resolve("database_" + name),
columns,
new LinkedList<>(),
crashIfWalError,
lowMemory
))
.subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override
public LLLuceneIndex getLuceneIndex(String name, public Mono<LLLuceneIndex> getLuceneIndex(String name,
int instancesCount, int instancesCount,
TextFieldsAnalyzer textFieldsAnalyzer, TextFieldsAnalyzer textFieldsAnalyzer,
Duration queryRefreshDebounceTime, Duration queryRefreshDebounceTime,
Duration commitDebounceTime, Duration commitDebounceTime,
boolean lowMemory) throws IOException { boolean lowMemory) {
if (instancesCount != 1) { return Mono
return new LLLocalMultiLuceneIndex(basePath.resolve("lucene"), .fromCallable(() -> {
name, if (instancesCount != 1) {
instancesCount, return new LLLocalMultiLuceneIndex(basePath.resolve("lucene"),
textFieldsAnalyzer, name,
queryRefreshDebounceTime, instancesCount,
commitDebounceTime, textFieldsAnalyzer,
lowMemory queryRefreshDebounceTime,
); commitDebounceTime,
} else { lowMemory
return new LLLocalLuceneIndex(basePath.resolve("lucene"), );
name, } else {
textFieldsAnalyzer, return new LLLocalLuceneIndex(basePath.resolve("lucene"),
queryRefreshDebounceTime, name,
commitDebounceTime, textFieldsAnalyzer,
lowMemory queryRefreshDebounceTime,
); commitDebounceTime,
} lowMemory
);
}
})
.subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override

View File

@ -61,6 +61,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
Options options = openRocksDb(path, crashIfWalError, lowMemory); Options options = openRocksDb(path, crashIfWalError, lowMemory);
try { try {
List<ColumnFamilyDescriptor> descriptors = new LinkedList<>(); List<ColumnFamilyDescriptor> descriptors = new LinkedList<>();
descriptors
.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
for (Column column : columns) { for (Column column : columns) {
descriptors descriptors
.add(new ColumnFamilyDescriptor(column.getName().getBytes(StandardCharsets.US_ASCII))); .add(new ColumnFamilyDescriptor(column.getName().getBytes(StandardCharsets.US_ASCII)));
@ -240,6 +242,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}); });
List<ColumnFamilyDescriptor> descriptors = new LinkedList<>(); List<ColumnFamilyDescriptor> descriptors = new LinkedList<>();
descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
for (byte[] existingColumnFamily : existingColumnFamilies) { for (byte[] existingColumnFamily : existingColumnFamilies) {
descriptors.add(new ColumnFamilyDescriptor(existingColumnFamily)); descriptors.add(new ColumnFamilyDescriptor(existingColumnFamily));
} }

View File

@ -1,3 +1,5 @@
package it.cavallium.dbengine.client; package it.cavallium.dbengine.client;
public class Database {} public class Database {
}