Pool LMDB databases
This commit is contained in:
parent
0e9c8c089e
commit
638595f518
@ -6,11 +6,10 @@ import java.io.File;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.concurrent.Phaser;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.TimeUnit;
|
import org.apache.lucene.search.DocIdSetIterator;
|
||||||
import java.util.concurrent.TimeoutException;
|
import org.apache.lucene.util.BitSet;
|
||||||
import org.lmdbjava.Net5ByteBufProxy;
|
import org.lmdbjava.Net5ByteBufProxy;
|
||||||
import org.lmdbjava.Env;
|
import org.lmdbjava.Env;
|
||||||
import static org.lmdbjava.EnvFlags.*;
|
import static org.lmdbjava.EnvFlags.*;
|
||||||
@ -18,19 +17,24 @@ import static org.lmdbjava.EnvFlags.*;
|
|||||||
public class LLTempLMDBEnv implements Closeable {
|
public class LLTempLMDBEnv implements Closeable {
|
||||||
|
|
||||||
private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L;
|
private static final long TWENTY_GIBIBYTES = 20L * 1024L * 1024L * 1024L;
|
||||||
private static final int MAX_DATABASES = 16777216;
|
public static final int MAX_DATABASES = 1024;
|
||||||
|
private static final AtomicInteger NEXT_LMDB_ENV_ID = new AtomicInteger(0);
|
||||||
|
private final BitSet freeIds;
|
||||||
|
|
||||||
|
private final int envId;
|
||||||
private final Path tempDirectory;
|
private final Path tempDirectory;
|
||||||
private final Env<ByteBuf> env;
|
private final Env<ByteBuf> env;
|
||||||
private volatile boolean closed;
|
private volatile boolean closed;
|
||||||
|
|
||||||
public LLTempLMDBEnv() throws IOException {
|
public LLTempLMDBEnv() throws IOException {
|
||||||
|
this.envId = NEXT_LMDB_ENV_ID.getAndIncrement();
|
||||||
tempDirectory = Files.createTempDirectory("lmdb");
|
tempDirectory = Files.createTempDirectory("lmdb");
|
||||||
var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY)
|
var envBuilder = Env.create(Net5ByteBufProxy.PROXY_NETTY)
|
||||||
.setMapSize(TWENTY_GIBIBYTES)
|
.setMapSize(TWENTY_GIBIBYTES)
|
||||||
.setMaxDbs(MAX_DATABASES);
|
.setMaxDbs(MAX_DATABASES);
|
||||||
//env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP);
|
//env = envBuilder.open(tempDirectory.toFile(), MDB_NOLOCK, MDB_NOSYNC, MDB_NOTLS, MDB_NORDAHEAD, MDB_WRITEMAP);
|
||||||
env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC);
|
env = envBuilder.open(tempDirectory.toFile(), MDB_NOTLS, MDB_NOSYNC, MDB_NORDAHEAD, MDB_NOMETASYNC);
|
||||||
|
freeIds = BitSet.of(DocIdSetIterator.range(0, MAX_DATABASES), MAX_DATABASES);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Env<ByteBuf> getEnv() {
|
public Env<ByteBuf> getEnv() {
|
||||||
@ -40,6 +44,28 @@ public class LLTempLMDBEnv implements Closeable {
|
|||||||
return env;
|
return env;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int allocateDb() {
|
||||||
|
synchronized (freeIds) {
|
||||||
|
var freeBit = freeIds.nextSetBit(0);
|
||||||
|
if (freeBit == DocIdSetIterator.NO_MORE_DOCS) {
|
||||||
|
throw new IllegalStateException("LMDB databases limit has been reached in environment "
|
||||||
|
+ envId + ": " + MAX_DATABASES);
|
||||||
|
}
|
||||||
|
freeIds.clear(freeBit);
|
||||||
|
return freeBit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String stringifyDbId(int bit) {
|
||||||
|
return "$db_" + bit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void freeDb(int db) {
|
||||||
|
synchronized (freeIds) {
|
||||||
|
freeIds.set(db);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
this.closed = true;
|
this.closed = true;
|
||||||
|
@ -10,6 +10,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
|||||||
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
||||||
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
||||||
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
|
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
|
||||||
|
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
|
||||||
|
import it.unimi.dsi.fastutil.longs.LongSet;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
@ -23,11 +25,12 @@ import org.lmdbjava.Txn;
|
|||||||
|
|
||||||
public class LMDBArray<V> implements IArray<V>, Closeable {
|
public class LMDBArray<V> implements IArray<V>, Closeable {
|
||||||
|
|
||||||
private static final AtomicLong NEXT_LMDB_ARRAY_ID = new AtomicLong(0);
|
|
||||||
|
|
||||||
private final AtomicBoolean closed = new AtomicBoolean();
|
private final AtomicBoolean closed = new AtomicBoolean();
|
||||||
private final LMDBCodec<V> valueCodec;
|
private final LMDBCodec<V> valueCodec;
|
||||||
|
private final LLTempLMDBEnv tempEnv;
|
||||||
private final Env<ByteBuf> env;
|
private final Env<ByteBuf> env;
|
||||||
|
private final int lmdbDbId;
|
||||||
private final Dbi<ByteBuf> lmdb;
|
private final Dbi<ByteBuf> lmdb;
|
||||||
private final V defaultValue;
|
private final V defaultValue;
|
||||||
|
|
||||||
@ -43,10 +46,11 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
|
|||||||
private final long virtualSize;
|
private final long virtualSize;
|
||||||
|
|
||||||
public LMDBArray(LLTempLMDBEnv env, LMDBCodec<V> codec, long size, @Nullable V defaultValue) {
|
public LMDBArray(LLTempLMDBEnv env, LMDBCodec<V> codec, long size, @Nullable V defaultValue) {
|
||||||
var name = "$array_" + NEXT_LMDB_ARRAY_ID.getAndIncrement();
|
|
||||||
this.valueCodec = codec;
|
this.valueCodec = codec;
|
||||||
|
this.tempEnv = env;
|
||||||
this.env = env.getEnv();
|
this.env = env.getEnv();
|
||||||
this.lmdb = this.env.openDbi(name, MDB_CREATE);
|
this.lmdbDbId = env.allocateDb();
|
||||||
|
this.lmdb = this.env.openDbi(LLTempLMDBEnv.stringifyDbId(lmdbDbId), MDB_CREATE);
|
||||||
this.defaultValue = defaultValue;
|
this.defaultValue = defaultValue;
|
||||||
|
|
||||||
this.writing = true;
|
this.writing = true;
|
||||||
@ -252,6 +256,7 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
|
|||||||
txn.commit();
|
txn.commit();
|
||||||
}
|
}
|
||||||
lmdb.close();
|
lmdb.close();
|
||||||
|
this.tempEnv.freeDb(lmdbDbId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,12 +30,13 @@ import reactor.util.function.Tuples;
|
|||||||
|
|
||||||
public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<ReversableResourceIterable<T>>, ReversableResourceIterable<T> {
|
public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<ReversableResourceIterable<T>>, ReversableResourceIterable<T> {
|
||||||
|
|
||||||
private static final AtomicLong NEXT_LMDB_QUEUE_ID = new AtomicLong(0);
|
|
||||||
private static final AtomicLong NEXT_ITEM_UID = new AtomicLong(0);
|
private static final AtomicLong NEXT_ITEM_UID = new AtomicLong(0);
|
||||||
|
|
||||||
private final AtomicBoolean closed = new AtomicBoolean();
|
private final AtomicBoolean closed = new AtomicBoolean();
|
||||||
private final LMDBSortedCodec<T> codec;
|
private final LMDBSortedCodec<T> codec;
|
||||||
|
private final LLTempLMDBEnv tempEnv;
|
||||||
private final Env<ByteBuf> env;
|
private final Env<ByteBuf> env;
|
||||||
|
private final int lmdbDbId;
|
||||||
private final Dbi<ByteBuf> lmdb;
|
private final Dbi<ByteBuf> lmdb;
|
||||||
|
|
||||||
private boolean writing;
|
private boolean writing;
|
||||||
@ -54,10 +55,11 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
|
|||||||
private long size = 0;
|
private long size = 0;
|
||||||
|
|
||||||
public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBSortedCodec<T> codec) {
|
public LMDBPriorityQueue(LLTempLMDBEnv env, LMDBSortedCodec<T> codec) {
|
||||||
var name = "$queue_" + NEXT_LMDB_QUEUE_ID.getAndIncrement();
|
|
||||||
this.codec = codec;
|
this.codec = codec;
|
||||||
|
this.tempEnv = env;
|
||||||
this.env = env.getEnv();
|
this.env = env.getEnv();
|
||||||
this.lmdb = this.env.openDbi(name, codec::compareDirect, MDB_CREATE, MDB_DUPSORT, MDB_DUPFIXED);
|
this.lmdbDbId = env.allocateDb();
|
||||||
|
this.lmdb = this.env.openDbi(LLTempLMDBEnv.stringifyDbId(lmdbDbId), codec::compareDirect, MDB_CREATE, MDB_DUPSORT, MDB_DUPFIXED);
|
||||||
|
|
||||||
this.writing = true;
|
this.writing = true;
|
||||||
this.iterating = false;
|
this.iterating = false;
|
||||||
@ -520,6 +522,7 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
|
|||||||
txn.commit();
|
txn.commit();
|
||||||
}
|
}
|
||||||
lmdb.close();
|
lmdb.close();
|
||||||
|
this.tempEnv.freeDb(lmdbDbId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user