Improve temporary LMDB database lifecycle

This commit is contained in:
Andrea Cavalli 2021-10-16 14:35:04 +02:00
parent a441c2fb85
commit f77784fc50
9 changed files with 132 additions and 95 deletions

View File

@ -12,8 +12,13 @@ import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.checkerframework.checker.units.qual.A;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -24,8 +29,10 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
JMXNettyMonitoringManager.initialize();
}
private final AtomicBoolean connected = new AtomicBoolean();
private final BufferAllocator allocator;
private final Path basePath;
private final AtomicReference<LLTempLMDBEnv> env = new AtomicReference<>();
public LLLocalDatabaseConnection(BufferAllocator allocator, Path basePath) {
this.allocator = allocator;
@ -41,9 +48,16 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
public Mono<LLDatabaseConnection> connect() {
return Mono
.<LLDatabaseConnection>fromCallable(() -> {
if (!connected.compareAndSet(false, true)) {
throw new IllegalStateException("Already connected");
}
if (Files.notExists(basePath)) {
Files.createDirectories(basePath);
}
var prev = env.getAndSet(new LLTempLMDBEnv());
if (prev != null) {
throw new IllegalStateException("Env was already set");
}
return this;
})
.subscribeOn(Schedulers.boundedElastic());
@ -75,7 +89,10 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
return Mono
.fromCallable(() -> {
if (instancesCount != 1) {
return new LLLocalMultiLuceneIndex(basePath.resolve("lucene"),
var env = this.env.get();
Objects.requireNonNull(env, "Environment not set");
return new LLLocalMultiLuceneIndex(env,
basePath.resolve("lucene"),
name,
instancesCount,
indicizerAnalyzers,
@ -98,6 +115,14 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
@Override
public Mono<Void> disconnect() {
return Mono.empty();
return Mono.<Void>fromCallable(() -> {
if (connected.compareAndSet(true, false)) {
var env = this.env.get();
if (env != null) {
env.close(Duration.ofSeconds(30));
}
}
return null;
}).subscribeOn(Schedulers.boundedElastic());
}
}

View File

@ -47,7 +47,8 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
private final MultiSearcher multiSearcher;
public LLLocalMultiLuceneIndex(Path lucene,
public LLLocalMultiLuceneIndex(LLTempLMDBEnv env,
Path lucene,
String name,
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
@ -82,7 +83,7 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
if (luceneHacks != null && luceneHacks.customMultiSearcher() != null) {
multiSearcher = luceneHacks.customMultiSearcher().get();
} else {
multiSearcher = new AdaptiveMultiSearcher();
multiSearcher = new AdaptiveMultiSearcher(env);
}
}

View File

@ -6,32 +6,39 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Comparator;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.lmdbjava.Net5ByteBufProxy;
import org.lmdbjava.Env;
import static org.lmdbjava.EnvFlags.*;
public class LLTempLMDBEnv implements Closeable {
private static final long TEN_MEBYBYTES = 10_485_760;
private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L;
private static final int MAX_DATABASES = 1024;
private final Phaser resources = new Phaser(1);
private final Path tempDirectory;
private final Env<ByteBuf> env;
private volatile boolean closed;
public LLTempLMDBEnv() throws IOException {
tempDirectory = Files.createTempDirectory("lmdb");
var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY)
.setMapSize(TEN_MEBYBYTES)
.setMapSize(TWENTY_GIBIBYTES)
.setMaxDbs(MAX_DATABASES);
//env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP);
env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_WRITEMAP, MDB_NORDAHEAD);
}
public Env<ByteBuf> getEnvAndIncrementRef() {
if (closed) {
throw new IllegalStateException("Environment closed");
}
resources.register();
return env;
}
@ -42,8 +49,20 @@ public class LLTempLMDBEnv implements Closeable {
@Override
public void close() throws IOException {
this.closed = true;
resources.arriveAndAwaitAdvance();
closeInternal();
}
public void close(Duration timeout) throws InterruptedException, TimeoutException, IOException {
this.closed = true;
int phase = resources.arrive();
resources.awaitAdvanceInterruptibly(phase, timeout.toMillis(), TimeUnit.MILLISECONDS);
closeInternal();
}
private void closeInternal() throws IOException {
this.closed = true;
env.close();
//noinspection ResultOfMethodCallIgnored
Files.walk(tempDirectory)

View File

@ -3,12 +3,13 @@ package it.cavallium.dbengine.lucene.searcher;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLIndexSearchers;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.Closeable;
import java.io.IOException;
import reactor.core.publisher.Mono;
public class AdaptiveMultiSearcher implements MultiSearcher, Closeable {
public class AdaptiveMultiSearcher implements MultiSearcher {
private static final MultiSearcher count
= new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher());
@ -25,9 +26,9 @@ public class AdaptiveMultiSearcher implements MultiSearcher, Closeable {
private final SortedScoredFullMultiSearcher sortedScoredFull;
public AdaptiveMultiSearcher() throws IOException {
unsortedScoredFull = new UnsortedScoredFullMultiSearcher();
sortedScoredFull = new SortedScoredFullMultiSearcher();
public AdaptiveMultiSearcher(LLTempLMDBEnv env) {
unsortedScoredFull = new UnsortedScoredFullMultiSearcher(env);
sortedScoredFull = new SortedScoredFullMultiSearcher(env);
}
@Override
@ -76,12 +77,6 @@ public class AdaptiveMultiSearcher implements MultiSearcher, Closeable {
}, true);
}
@Override
public void close() throws IOException {
sortedScoredFull.close();
unsortedScoredFull.close();
}
@Override
public String getName() {
return "adaptive local";

View File

@ -25,14 +25,14 @@ import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class OfficialSearcher implements MultiSearcher, Closeable {
public class OfficialSearcher implements MultiSearcher {
protected static final Logger logger = LoggerFactory.getLogger(OfficialSearcher.class);
private final LLTempLMDBEnv env;
public OfficialSearcher() throws IOException {
this.env = new LLTempLMDBEnv();
public OfficialSearcher(LLTempLMDBEnv env) {
this.env = env;
}
@Override
@ -119,11 +119,6 @@ public class OfficialSearcher implements MultiSearcher, Closeable {
});
}
@Override
public void close() throws IOException {
env.close();
}
@Override
public String getName() {
return "official";

View File

@ -14,20 +14,21 @@ import it.cavallium.dbengine.lucene.collector.LMDBFullScoreDocCollector;
import it.cavallium.dbengine.lucene.searcher.LLSearchTransformer.TransformerInput;
import java.io.Closeable;
import java.io.IOException;
import java.util.ServiceLoader;
import org.apache.lucene.search.IndexSearcher;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SortedScoredFullMultiSearcher implements MultiSearcher, Closeable {
public class SortedScoredFullMultiSearcher implements MultiSearcher {
protected static final Logger logger = LoggerFactory.getLogger(SortedScoredFullMultiSearcher.class);
private final LLTempLMDBEnv env;
public SortedScoredFullMultiSearcher() throws IOException {
this.env = new LLTempLMDBEnv();
public SortedScoredFullMultiSearcher(LLTempLMDBEnv env) {
this.env = env;
}
@Override
@ -108,11 +109,6 @@ public class SortedScoredFullMultiSearcher implements MultiSearcher, Closeable {
});
}
@Override
public void close() throws IOException {
env.close();
}
@Override
public String getName() {
return "sorted scored full multi";

View File

@ -20,14 +20,14 @@ import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class UnsortedScoredFullMultiSearcher implements MultiSearcher, Closeable {
public class UnsortedScoredFullMultiSearcher implements MultiSearcher {
protected static final Logger logger = LoggerFactory.getLogger(UnsortedScoredFullMultiSearcher.class);
private final LLTempLMDBEnv env;
public UnsortedScoredFullMultiSearcher() throws IOException {
this.env = new LLTempLMDBEnv();
public UnsortedScoredFullMultiSearcher(LLTempLMDBEnv env) {
this.env = env;
}
@Override
@ -114,11 +114,6 @@ public class UnsortedScoredFullMultiSearcher implements MultiSearcher, Closeable
});
}
@Override
public void close() throws IOException {
env.close();
}
@Override
public String getName() {
return "unsorted scored full multi";

View File

@ -15,6 +15,7 @@ import it.cavallium.dbengine.client.SearchResultKey;
import it.cavallium.dbengine.client.query.current.data.MatchAllDocsQuery;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.CountLocalSearcher;
@ -22,11 +23,16 @@ import it.cavallium.dbengine.lucene.searcher.LocalSearcher;
import it.cavallium.dbengine.lucene.searcher.MultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredSimpleMultiSearcher;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@ -40,6 +46,7 @@ import reactor.util.function.Tuples;
public class TestLuceneIndex {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static LLTempLMDBEnv ENV;
private TestAllocator allocator;
private TempDb tempDb;
@ -50,6 +57,11 @@ public class TestLuceneIndex {
return new MemoryTemporaryDbGenerator();
}
@BeforeAll
public static void beforeAll() throws IOException {
ENV = new LLTempLMDBEnv();
}
@BeforeEach
public void beforeEach() {
this.allocator = newAllocator();
@ -106,6 +118,11 @@ public class TestLuceneIndex {
destroyAllocator(allocator);
}
@AfterAll
public static void afterAll() throws IOException, InterruptedException, TimeoutException {
ENV.close(Duration.ofSeconds(10));
}
private LuceneIndex<String, String> getLuceneIndex(boolean shards, @Nullable LocalSearcher customSearcher) {
LuceneIndex<String, String> index = run(DbTestUtils.tempLuceneIndex(shards ? luceneSingle : luceneMulti));
index.updateDocument("test-key-1", "0123456789").block();
@ -127,22 +144,18 @@ public class TestLuceneIndex {
tempDb.swappableLuceneSearcher().setSingle(new CountLocalSearcher());
tempDb.swappableLuceneSearcher().setMulti(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher()));
assertCount(index, 1000 + 15);
try {
if (customSearcher != null) {
tempDb.swappableLuceneSearcher().setSingle(customSearcher);
if (shards) {
if (customSearcher instanceof MultiSearcher multiSearcher) {
tempDb.swappableLuceneSearcher().setMulti(multiSearcher);
} else {
throw new IllegalArgumentException("Expected a LuceneMultiSearcher, got a LuceneLocalSearcher: " + customSearcher.getName());
}
if (customSearcher != null) {
tempDb.swappableLuceneSearcher().setSingle(customSearcher);
if (shards) {
if (customSearcher instanceof MultiSearcher multiSearcher) {
tempDb.swappableLuceneSearcher().setMulti(multiSearcher);
} else {
throw new IllegalArgumentException("Expected a LuceneMultiSearcher, got a LuceneLocalSearcher: " + customSearcher.getName());
}
} else {
tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher());
tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher());
}
} catch (IOException e) {
fail(e);
} else {
tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher());
tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV));
}
return index;
}

View File

@ -33,6 +33,7 @@ import it.cavallium.dbengine.client.query.current.data.TotalHitsCount;
import it.cavallium.dbengine.database.LLLuceneIndex;
import it.cavallium.dbengine.database.LLScoreMode;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.searcher.AdaptiveLocalSearcher;
import it.cavallium.dbengine.lucene.searcher.AdaptiveMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.CountLocalSearcher;
@ -46,6 +47,7 @@ import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredSimpleMultiSearcher
import it.cavallium.dbengine.lucene.searcher.UnsortedScoredFullMultiSearcher;
import it.cavallium.dbengine.lucene.searcher.UnsortedUnscoredStreamingMultiSearcher;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -55,6 +57,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.function.FailableConsumer;
@ -75,6 +78,7 @@ import reactor.util.function.Tuples;
public class TestLuceneSearches {
private static final Logger log = LoggerFactory.getLogger(TestLuceneSearches.class);
private static LLTempLMDBEnv ENV;
private static final MemoryTemporaryDbGenerator TEMP_DB_GENERATOR = new MemoryTemporaryDbGenerator();
private static TestAllocator allocator;
@ -110,12 +114,13 @@ public class TestLuceneSearches {
}
@BeforeAll
public static void beforeAll() {
public static void beforeAll() throws IOException {
allocator = newAllocator();
ensureNoLeaks(allocator.allocator(), false, false);
tempDb = Objects.requireNonNull(TEMP_DB_GENERATOR.openTempDb(allocator).block(), "TempDB");
luceneSingle = tempDb.luceneSingle();
luceneMulti = tempDb.luceneMulti();
ENV = new LLTempLMDBEnv();
setUpIndex(true);
setUpIndex(false);
@ -158,35 +163,31 @@ public class TestLuceneSearches {
private static Flux<LocalSearcher> getSearchers(ExpectedQueryType info) {
return Flux.push(sink -> {
try {
if (info.shard()) {
if (info.onlyCount()) {
sink.next(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher()));
} else {
sink.next(new ScoredPagedMultiSearcher());
if (info.sorted() && !info.sortedByScore()) {
sink.next(new SortedScoredFullMultiSearcher());
} else {
sink.next(new UnsortedScoredFullMultiSearcher());
}
if (!info.sorted()) {
sink.next(new UnsortedUnscoredSimpleMultiSearcher(new PagedLocalSearcher()));
sink.next(new UnsortedUnscoredStreamingMultiSearcher());
}
}
sink.next(new AdaptiveMultiSearcher());
if (info.shard()) {
if (info.onlyCount()) {
sink.next(new UnsortedUnscoredSimpleMultiSearcher(new CountLocalSearcher()));
} else {
if (info.onlyCount()) {
sink.next(new CountLocalSearcher());
sink.next(new ScoredPagedMultiSearcher());
if (info.sorted() && !info.sortedByScore()) {
sink.next(new SortedScoredFullMultiSearcher(ENV));
} else {
sink.next(new PagedLocalSearcher());
sink.next(new UnsortedScoredFullMultiSearcher(ENV));
}
if (!info.sorted()) {
sink.next(new UnsortedUnscoredSimpleMultiSearcher(new PagedLocalSearcher()));
sink.next(new UnsortedUnscoredStreamingMultiSearcher());
}
sink.next(new AdaptiveLocalSearcher());
}
sink.complete();
} catch (IOException e) {
sink.error(e);
sink.next(new AdaptiveMultiSearcher(ENV));
} else {
if (info.onlyCount()) {
sink.next(new CountLocalSearcher());
} else {
sink.next(new PagedLocalSearcher());
}
sink.next(new AdaptiveLocalSearcher());
}
sink.complete();
}, OverflowStrategy.BUFFER);
}
@ -211,29 +212,26 @@ public class TestLuceneSearches {
}
@AfterAll
public static void afterAll() {
public static void afterAll() throws IOException, InterruptedException, TimeoutException {
TEMP_DB_GENERATOR.closeTempDb(tempDb).block();
ENV.close(Duration.ofSeconds(10));
ensureNoLeaks(allocator.allocator(), true, false);
destroyAllocator(allocator);
}
private LuceneIndex<String, String> getLuceneIndex(boolean shards, @Nullable LocalSearcher customSearcher) {
try {
if (customSearcher != null) {
tempDb.swappableLuceneSearcher().setSingle(customSearcher);
if (shards) {
if (customSearcher instanceof MultiSearcher multiSearcher) {
tempDb.swappableLuceneSearcher().setMulti(multiSearcher);
} else {
throw new IllegalArgumentException("Expected a LuceneMultiSearcher, got a LuceneLocalSearcher: " + customSearcher.getName());
}
if (customSearcher != null) {
tempDb.swappableLuceneSearcher().setSingle(customSearcher);
if (shards) {
if (customSearcher instanceof MultiSearcher multiSearcher) {
tempDb.swappableLuceneSearcher().setMulti(multiSearcher);
} else {
throw new IllegalArgumentException("Expected a LuceneMultiSearcher, got a LuceneLocalSearcher: " + customSearcher.getName());
}
} else {
tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher());
tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher());
}
} catch (IOException e) {
fail(e);
} else {
tempDb.swappableLuceneSearcher().setSingle(new AdaptiveLocalSearcher());
tempDb.swappableLuceneSearcher().setMulti(new AdaptiveMultiSearcher(ENV));
}
return shards ? multiIndex : localIndex;
}
@ -276,7 +274,7 @@ public class TestLuceneSearches {
Assertions.assertTrue(keys.size() >= hits.value());
}
var officialSearcher = new OfficialSearcher();
var officialSearcher = new OfficialSearcher(ENV);
luceneIndex = getLuceneIndex(expectedQueryType.shard(), officialSearcher);
var officialQuery = queryParamsBuilder.limit(ELEMENTS.size() * 2L).build();
try (var officialResults = run(luceneIndex.search(officialQuery)).receive()) {