2021-10-15 22:03:53 +02:00
|
|
|
package it.cavallium.dbengine.lucene;
|
|
|
|
|
|
|
|
import static org.lmdbjava.DbiFlags.MDB_CREATE;
|
|
|
|
|
|
|
|
import io.net5.buffer.ByteBuf;
|
|
|
|
import io.net5.buffer.PooledByteBufAllocator;
|
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
|
|
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
2021-10-25 01:54:37 +02:00
|
|
|
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
|
|
|
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
|
|
|
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
|
|
|
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
|
2021-10-15 22:03:53 +02:00
|
|
|
import java.io.Closeable;
|
|
|
|
import java.io.IOException;
|
2021-10-25 01:41:02 +02:00
|
|
|
import java.util.ArrayDeque;
|
|
|
|
import java.util.Deque;
|
2021-10-15 22:03:53 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import org.lmdbjava.Dbi;
|
|
|
|
import org.lmdbjava.Env;
|
|
|
|
import org.lmdbjava.Txn;
|
|
|
|
|
|
|
|
public class LMDBArray<V> implements IArray<V>, Closeable {
|
|
|
|
|
|
|
|
private static final AtomicLong NEXT_LMDB_ARRAY_ID = new AtomicLong(0);
|
|
|
|
|
|
|
|
private final AtomicBoolean closed = new AtomicBoolean();
|
|
|
|
private final LMDBCodec<V> valueCodec;
|
|
|
|
private final Env<ByteBuf> env;
|
|
|
|
private final Dbi<ByteBuf> lmdb;
|
|
|
|
private final V defaultValue;
|
|
|
|
|
|
|
|
private boolean writing;
|
|
|
|
private Txn<ByteBuf> readTxn;
|
|
|
|
private Txn<ByteBuf> rwTxn;
|
|
|
|
|
2021-10-25 01:41:02 +02:00
|
|
|
// Cache
|
|
|
|
private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
|
2021-10-25 01:54:37 +02:00
|
|
|
private final Long2ObjectMap<ByteBuf> toWrite = new Long2ObjectOpenHashMap<>();
|
2021-10-25 01:41:02 +02:00
|
|
|
|
2021-10-15 22:03:53 +02:00
|
|
|
private long allocatedSize = 0;
|
|
|
|
private final long virtualSize;
|
|
|
|
|
|
|
|
public LMDBArray(LLTempLMDBEnv env, LMDBCodec<V> codec, long size, @Nullable V defaultValue) {
|
|
|
|
var name = "$array_" + NEXT_LMDB_ARRAY_ID.getAndIncrement();
|
|
|
|
this.valueCodec = codec;
|
2021-10-16 14:59:38 +02:00
|
|
|
this.env = env.getEnv();
|
2021-10-15 22:03:53 +02:00
|
|
|
this.lmdb = this.env.openDbi(name, MDB_CREATE);
|
|
|
|
this.defaultValue = defaultValue;
|
|
|
|
|
|
|
|
this.writing = true;
|
2021-10-25 00:44:30 +02:00
|
|
|
this.rwTxn = null;
|
2021-10-15 22:03:53 +02:00
|
|
|
this.readTxn = null;
|
|
|
|
this.virtualSize = size;
|
|
|
|
}
|
|
|
|
|
|
|
|
public LMDBCodec<V> getValueCodec() {
|
|
|
|
return valueCodec;
|
|
|
|
}
|
|
|
|
|
|
|
|
private ByteBuf allocate(int size) {
|
|
|
|
return PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void switchToMode(boolean write) {
|
2021-10-25 01:54:37 +02:00
|
|
|
if (toWrite.size() > 0) {
|
2021-10-25 01:41:02 +02:00
|
|
|
switchToModeUncached(true);
|
|
|
|
try {
|
2021-10-25 01:54:37 +02:00
|
|
|
toWrite.forEach((ki, v) -> {
|
|
|
|
var keyBuf = allocate(Long.BYTES);
|
|
|
|
keyBuf.writeLong(ki);
|
|
|
|
if (lmdb.put(rwTxn, keyBuf, v)) {
|
2021-10-25 01:41:02 +02:00
|
|
|
allocatedSize++;
|
|
|
|
}
|
2021-10-25 01:54:37 +02:00
|
|
|
});
|
2021-10-25 01:41:02 +02:00
|
|
|
} finally {
|
|
|
|
endMode();
|
2021-10-25 01:54:37 +02:00
|
|
|
for (ByteBuf value : toWrite.values()) {
|
|
|
|
value.release();
|
2021-10-25 01:41:02 +02:00
|
|
|
}
|
2021-10-25 01:54:37 +02:00
|
|
|
toWrite.clear();
|
2021-10-25 01:41:02 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
switchToModeUncached(write);
|
|
|
|
}
|
|
|
|
|
|
|
|
private void switchToModeUncached(boolean write) {
|
2021-10-15 22:03:53 +02:00
|
|
|
if (write) {
|
|
|
|
if (!writing) {
|
|
|
|
writing = true;
|
|
|
|
readTxn.close();
|
|
|
|
readTxn = null;
|
|
|
|
assert rwTxn == null;
|
|
|
|
rwTxn = env.txnWrite();
|
|
|
|
} else if (rwTxn == null) {
|
|
|
|
assert readTxn == null;
|
|
|
|
rwTxn = env.txnWrite();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (writing) {
|
|
|
|
writing = false;
|
|
|
|
if (rwTxn != null) {
|
|
|
|
rwTxn.commit();
|
|
|
|
rwTxn.close();
|
|
|
|
rwTxn = null;
|
|
|
|
}
|
|
|
|
assert readTxn == null;
|
|
|
|
readTxn = env.txnRead();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void endMode() {
|
2021-10-25 00:44:30 +02:00
|
|
|
writing = true;
|
|
|
|
if (readTxn != null) {
|
|
|
|
readTxn.commit();
|
|
|
|
readTxn.close();
|
|
|
|
readTxn = null;
|
|
|
|
}
|
|
|
|
if (rwTxn != null) {
|
|
|
|
rwTxn.commit();
|
|
|
|
rwTxn.close();
|
|
|
|
rwTxn = null;
|
2021-10-15 22:03:53 +02:00
|
|
|
}
|
|
|
|
assert readTxn == null;
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void ensureThread() {
|
|
|
|
LLUtils.ensureBlocking();
|
|
|
|
}
|
|
|
|
|
|
|
|
private static void ensureItThread() {
|
|
|
|
ensureThread();
|
|
|
|
//if (!(Thread.currentThread() instanceof LMDBThread)) {
|
|
|
|
// throw new IllegalStateException("Must run in LMDB scheduler");
|
|
|
|
//}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void set(long index, @Nullable V value) {
|
|
|
|
ensureBounds(index);
|
|
|
|
ensureThread();
|
|
|
|
var valueBuf = valueCodec.serialize(this::allocate, value);
|
2021-10-25 01:54:37 +02:00
|
|
|
if (toWrite.size() < WRITE_QUEUE_MAX_BOUND) {
|
|
|
|
var prev = toWrite.put(index, valueBuf);
|
|
|
|
if (prev != null) {
|
|
|
|
prev.release();
|
|
|
|
}
|
2021-10-25 01:41:02 +02:00
|
|
|
} else {
|
2021-10-25 01:54:37 +02:00
|
|
|
var keyBuf = allocate(Long.BYTES);
|
|
|
|
keyBuf.writeLong(index);
|
2021-10-25 01:41:02 +02:00
|
|
|
switchToMode(true);
|
|
|
|
try {
|
|
|
|
if (lmdb.put(rwTxn, keyBuf, valueBuf)) {
|
|
|
|
allocatedSize++;
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
endMode();
|
2021-10-15 22:03:53 +02:00
|
|
|
|
2021-10-25 01:41:02 +02:00
|
|
|
keyBuf.release();
|
|
|
|
valueBuf.release();
|
|
|
|
}
|
2021-10-15 22:03:53 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void reset(long index) {
|
|
|
|
ensureBounds(index);
|
|
|
|
ensureThread();
|
|
|
|
switchToMode(true);
|
|
|
|
var keyBuf = allocate(Long.BYTES);
|
|
|
|
keyBuf.writeLong(index);
|
|
|
|
try {
|
|
|
|
if (lmdb.delete(rwTxn, keyBuf)) {
|
|
|
|
allocatedSize--;
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
endMode();
|
|
|
|
keyBuf.release();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public @Nullable V get(long index) {
|
|
|
|
ensureBounds(index);
|
|
|
|
ensureThread();
|
2021-10-25 01:41:02 +02:00
|
|
|
|
2021-10-25 01:54:37 +02:00
|
|
|
if (!toWrite.isEmpty()) {
|
|
|
|
var v = toWrite.get(index);
|
|
|
|
if (v != null) {
|
|
|
|
var ri = v.readerIndex();
|
|
|
|
var wi = v.writerIndex();
|
|
|
|
var c = v.capacity();
|
|
|
|
try {
|
|
|
|
return valueCodec.deserialize(v);
|
|
|
|
} finally {
|
|
|
|
v.readerIndex(ri);
|
|
|
|
v.writerIndex(wi);
|
|
|
|
v.capacity(c);
|
2021-10-25 01:41:02 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-15 22:03:53 +02:00
|
|
|
var keyBuf = allocate(Long.BYTES);
|
|
|
|
keyBuf.writeLong(index);
|
|
|
|
try {
|
2021-10-25 01:41:02 +02:00
|
|
|
switchToModeUncached(false);
|
2021-10-15 22:03:53 +02:00
|
|
|
var value = lmdb.get(readTxn, keyBuf);
|
|
|
|
if (value != null) {
|
|
|
|
return valueCodec.deserialize(value);
|
|
|
|
} else {
|
|
|
|
return defaultValue;
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
endMode();
|
|
|
|
keyBuf.release();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void ensureBounds(long index) {
|
|
|
|
if (index < 0 || index >= virtualSize) throw new IndexOutOfBoundsException();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public long size() {
|
|
|
|
ensureThread();
|
|
|
|
return virtualSize;
|
|
|
|
}
|
|
|
|
|
|
|
|
public long allocatedSize() {
|
|
|
|
return allocatedSize;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void close() throws IOException {
|
|
|
|
if (closed.compareAndSet(false, true)) {
|
2021-10-16 14:59:38 +02:00
|
|
|
ensureThread();
|
2021-10-25 01:54:37 +02:00
|
|
|
for (ByteBuf value : toWrite.values()) {
|
|
|
|
value.release();
|
2021-10-25 01:44:12 +02:00
|
|
|
}
|
2021-10-25 01:54:37 +02:00
|
|
|
toWrite.clear();
|
2021-10-16 14:59:38 +02:00
|
|
|
if (rwTxn != null) {
|
|
|
|
rwTxn.close();
|
|
|
|
}
|
|
|
|
if (readTxn != null) {
|
|
|
|
readTxn.close();
|
|
|
|
}
|
|
|
|
try (var txn = env.txnWrite()) {
|
|
|
|
lmdb.drop(txn, true);
|
|
|
|
txn.commit();
|
2021-10-15 22:03:53 +02:00
|
|
|
}
|
2021-10-16 14:59:38 +02:00
|
|
|
lmdb.close();
|
2021-10-15 22:03:53 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String toString() {
|
|
|
|
return "lmdb_array[" + virtualSize + " (allocated=" + allocatedSize + ")]";
|
|
|
|
}
|
|
|
|
}
|