Optimize array
This commit is contained in:
parent
f9b656986c
commit
1899ef1723
@ -6,6 +6,10 @@ import io.net5.buffer.ByteBuf;
|
|||||||
import io.net5.buffer.PooledByteBufAllocator;
|
import io.net5.buffer.PooledByteBufAllocator;
|
||||||
import it.cavallium.dbengine.database.LLUtils;
|
import it.cavallium.dbengine.database.LLUtils;
|
||||||
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
|
||||||
|
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||||
|
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
|
||||||
|
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
||||||
|
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
@ -33,8 +37,7 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
|
|||||||
|
|
||||||
// Cache
|
// Cache
|
||||||
private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
|
private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
|
||||||
private final Deque<ByteBuf> toWriteKeys = new ArrayDeque<>();
|
private final Long2ObjectMap<ByteBuf> toWrite = new Long2ObjectOpenHashMap<>();
|
||||||
private final Deque<ByteBuf> toWriteValues = new ArrayDeque<>();
|
|
||||||
|
|
||||||
private long allocatedSize = 0;
|
private long allocatedSize = 0;
|
||||||
private final long virtualSize;
|
private final long virtualSize;
|
||||||
@ -61,28 +64,22 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void switchToMode(boolean write) {
|
private void switchToMode(boolean write) {
|
||||||
if (toWriteKeys.size() > 0) {
|
if (toWrite.size() > 0) {
|
||||||
switchToModeUncached(true);
|
switchToModeUncached(true);
|
||||||
try {
|
try {
|
||||||
var ki = toWriteKeys.iterator();
|
toWrite.forEach((ki, v) -> {
|
||||||
var vi = toWriteValues.iterator();
|
var keyBuf = allocate(Long.BYTES);
|
||||||
while (ki.hasNext()) {
|
keyBuf.writeLong(ki);
|
||||||
var k = ki.next();
|
if (lmdb.put(rwTxn, keyBuf, v)) {
|
||||||
var v = vi.next();
|
|
||||||
if (lmdb.put(rwTxn, k, v)) {
|
|
||||||
allocatedSize++;
|
allocatedSize++;
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
} finally {
|
} finally {
|
||||||
endMode();
|
endMode();
|
||||||
for (ByteBuf toWriteKey : toWriteKeys) {
|
for (ByteBuf value : toWrite.values()) {
|
||||||
toWriteKey.release();
|
value.release();
|
||||||
}
|
}
|
||||||
for (ByteBuf toWriteValue : toWriteValues) {
|
toWrite.clear();
|
||||||
toWriteValue.release();
|
|
||||||
}
|
|
||||||
toWriteKeys.clear();
|
|
||||||
toWriteValues.clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,13 +142,15 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
|
|||||||
public void set(long index, @Nullable V value) {
|
public void set(long index, @Nullable V value) {
|
||||||
ensureBounds(index);
|
ensureBounds(index);
|
||||||
ensureThread();
|
ensureThread();
|
||||||
var keyBuf = allocate(Long.BYTES);
|
|
||||||
var valueBuf = valueCodec.serialize(this::allocate, value);
|
var valueBuf = valueCodec.serialize(this::allocate, value);
|
||||||
keyBuf.writeLong(index);
|
if (toWrite.size() < WRITE_QUEUE_MAX_BOUND) {
|
||||||
if (toWriteKeys.size() < WRITE_QUEUE_MAX_BOUND) {
|
var prev = toWrite.put(index, valueBuf);
|
||||||
toWriteKeys.add(keyBuf);
|
if (prev != null) {
|
||||||
toWriteValues.add(valueBuf);
|
prev.release();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
var keyBuf = allocate(Long.BYTES);
|
||||||
|
keyBuf.writeLong(index);
|
||||||
switchToMode(true);
|
switchToMode(true);
|
||||||
try {
|
try {
|
||||||
if (lmdb.put(rwTxn, keyBuf, valueBuf)) {
|
if (lmdb.put(rwTxn, keyBuf, valueBuf)) {
|
||||||
@ -188,24 +187,18 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
|
|||||||
ensureBounds(index);
|
ensureBounds(index);
|
||||||
ensureThread();
|
ensureThread();
|
||||||
|
|
||||||
if (!toWriteKeys.isEmpty()) {
|
if (!toWrite.isEmpty()) {
|
||||||
var ki = toWriteKeys.iterator();
|
var v = toWrite.get(index);
|
||||||
var vi = toWriteValues.iterator();
|
if (v != null) {
|
||||||
while (ki.hasNext()) {
|
var ri = v.readerIndex();
|
||||||
var k = ki.next();
|
var wi = v.writerIndex();
|
||||||
var v = vi.next();
|
var c = v.capacity();
|
||||||
var readIndex = k.getLong(0);
|
try {
|
||||||
if (readIndex == index) {
|
return valueCodec.deserialize(v);
|
||||||
var ri = v.readerIndex();
|
} finally {
|
||||||
var wi = v.writerIndex();
|
v.readerIndex(ri);
|
||||||
var c = v.capacity();
|
v.writerIndex(wi);
|
||||||
try {
|
v.capacity(c);
|
||||||
return valueCodec.deserialize(v);
|
|
||||||
} finally {
|
|
||||||
v.readerIndex(ri);
|
|
||||||
v.writerIndex(wi);
|
|
||||||
v.capacity(c);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -244,14 +237,10 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
|
|||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (closed.compareAndSet(false, true)) {
|
if (closed.compareAndSet(false, true)) {
|
||||||
ensureThread();
|
ensureThread();
|
||||||
for (ByteBuf toWriteKey : toWriteKeys) {
|
for (ByteBuf value : toWrite.values()) {
|
||||||
toWriteKey.release();
|
value.release();
|
||||||
}
|
}
|
||||||
for (ByteBuf toWriteValue : toWriteValues) {
|
toWrite.clear();
|
||||||
toWriteValue.release();
|
|
||||||
}
|
|
||||||
toWriteKeys.clear();
|
|
||||||
toWriteValues.clear();
|
|
||||||
if (rwTxn != null) {
|
if (rwTxn != null) {
|
||||||
rwTxn.close();
|
rwTxn.close();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user