diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java index c47d0ed..dfce40b 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLTempLMDBEnv.java @@ -6,11 +6,10 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Duration; import java.util.Comparator; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.BitSet; import org.lmdbjava.Net5ByteBufProxy; import org.lmdbjava.Env; import static org.lmdbjava.EnvFlags.*; @@ -18,19 +17,24 @@ import static org.lmdbjava.EnvFlags.*; public class LLTempLMDBEnv implements Closeable { private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L; - private static final int MAX_DATABASES = 16777216; + public static final int MAX_DATABASES = 1024; + private static final AtomicInteger NEXT_LMDB_ENV_ID = new AtomicInteger(0); + private final BitSet freeIds; + private final int envId; private final Path tempDirectory; private final Env env; private volatile boolean closed; public LLTempLMDBEnv() throws IOException { + this.envId = NEXT_LMDB_ENV_ID.getAndIncrement(); tempDirectory = Files.createTempDirectory("lmdb"); var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY) .setMapSize(TWENTY_GIBIBYTES) .setMaxDbs(MAX_DATABASES); //env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP); env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC); + freeIds = BitSet.of(DocIdSetIterator.range(0, MAX_DATABASES), MAX_DATABASES); } public Env getEnv() { @@ -40,6 +44,28 @@ public class LLTempLMDBEnv implements Closeable { return env; } + public int allocateDb() { + synchronized (freeIds) { + var freeBit = freeIds.nextSetBit(0); + if (freeBit == DocIdSetIterator.NO_MORE_DOCS) { + throw new IllegalStateException("LMDB databases limit has been reached in environment " + + envId + ": " + MAX_DATABASES); + } + freeIds.clear(freeBit); + return freeBit; + } + } + + public static String stringifyDbId(int bit) { + return "$db_" + bit; + } + + public void freeDb(int db) { + synchronized (freeIds) { + freeIds.set(db); + } + } + @Override public void close() throws IOException { this.closed = true; diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java index a3760a8..23f3a34 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBArray.java @@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; import java.io.Closeable; import java.io.IOException; import java.util.ArrayDeque; @@ -23,11 +25,12 @@ import org.lmdbjava.Txn; public class LMDBArray implements IArray, Closeable { - private static final AtomicLong NEXT_LMDB_ARRAY_ID = new AtomicLong(0); private final AtomicBoolean closed = new AtomicBoolean(); private final LMDBCodec valueCodec; + private final LLTempLMDBEnv tempEnv; private final Env env; + private final int lmdbDbId; private final Dbi lmdb; private final V defaultValue; @@ -43,10 +46,11 @@ public class LMDBArray implements IArray, Closeable { private final long virtualSize; public LMDBArray(LLTempLMDBEnv env, LMDBCodec codec, long size, @Nullable V defaultValue) { - var name = "$array_" + NEXT_LMDB_ARRAY_ID.getAndIncrement(); this.valueCodec = codec; + this.tempEnv = env; this.env = env.getEnv(); - this.lmdb = this.env.openDbi(name, MDB_CREATE); + this.lmdbDbId = env.allocateDb(); + this.lmdb = this.env.openDbi(LLTempLMDBEnv.stringifyDbId(lmdbDbId), MDB_CREATE); this.defaultValue = defaultValue; this.writing = true; @@ -252,6 +256,7 @@ public class LMDBArray implements IArray, Closeable { txn.commit(); } lmdb.close(); + this.tempEnv.freeDb(lmdbDbId); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java index f2b3546..92f71e6 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LMDBPriorityQueue.java @@ -30,12 +30,13 @@ import reactor.util.function.Tuples; public class LMDBPriorityQueue implements PriorityQueue, Reversable>, ReversableResourceIterable { - private static final AtomicLong NEXT_LMDB_QUEUE_ID = new AtomicLong(0); private static final AtomicLong NEXT_ITEM_UID = new AtomicLong(0); private final AtomicBoolean closed = new AtomicBoolean(); private final LMDBSortedCodec codec; + private final LLTempLMDBEnv tempEnv; private final Env env; + private final int lmdbDbId; private final Dbi lmdb; private boolean writing; @@ -54,10 +55,11 @@ public class LMDBPriorityQueue implements PriorityQueue, Reversable codec) { - var name = "$queue_" + NEXT_LMDB_QUEUE_ID.getAndIncrement(); this.codec = codec; + this.tempEnv = env; this.env = env.getEnv(); - this.lmdb = this.env.openDbi(name, codec::compareDirect, MDB_CREATE, MDB_DUPSORT, MDB_DUPFIXED); + this.lmdbDbId = env.allocateDb(); + this.lmdb = this.env.openDbi(LLTempLMDBEnv.stringifyDbId(lmdbDbId), codec::compareDirect, MDB_CREATE, MDB_DUPSORT, MDB_DUPFIXED); this.writing = true; this.iterating = false; @@ -520,6 +522,7 @@ public class LMDBPriorityQueue implements PriorityQueue, Reversable