Group writes together to speed up writes

Andrea Cavalli 2021-10-25 01:41:02 +02:00
parent e48f8b25eb
commit 7a01b6a2a4
2 changed files with 137 additions and 23 deletions
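The change batches writes for both LMDB-backed structures: instead of touching the write transaction on every set/add call, the serialized key and value buffers are appended to a bounded in-memory deque (WRITE_QUEUE_MAX_BOUND = 10_000) and flushed in a single write transaction the next time a mode switch or an overflowing write forces it, while reads consult the pending queue first. A minimal sketch of this write-grouping pattern, with hypothetical names and a plain Map standing in for the LMDB database (not the actual LMDBArray/LMDBPriorityQueue API), could look like:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the write-grouping pattern introduced by this commit:
    // writes are queued in memory and flushed as one batch instead of one put each.
    final class BatchedStore {
        private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
        private final Deque<Long> toWriteKeys = new ArrayDeque<>();
        private final Deque<String> toWriteValues = new ArrayDeque<>();
        private final Map<Long, String> db = new HashMap<>(); // stands in for the LMDB dbi

        void set(long key, String value) {
            if (toWriteKeys.size() < WRITE_QUEUE_MAX_BOUND) {
                // Cheap path: remember the write, no transaction is opened.
                toWriteKeys.add(key);
                toWriteValues.add(value);
            } else {
                // Queue is full: flush everything, then write directly.
                flush();
                db.put(key, value);
            }
        }

        String get(long key) {
            // Pending writes must be visible to reads; in this sketch the newest
            // queued write wins (an assumption, kept simple on purpose).
            var ki = toWriteKeys.descendingIterator();
            var vi = toWriteValues.descendingIterator();
            while (ki.hasNext()) {
                var k = ki.next();
                var v = vi.next();
                if (k == key) {
                    return v;
                }
            }
            return db.get(key);
        }

        private void flush() {
            // In the real code this loop runs inside a single LMDB write transaction.
            var ki = toWriteKeys.iterator();
            var vi = toWriteValues.iterator();
            while (ki.hasNext()) {
                db.put(ki.next(), vi.next());
            }
            toWriteKeys.clear();
            toWriteValues.clear();
        }
    }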


@@ -8,6 +8,8 @@ import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;
@@ -29,6 +31,11 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
private Txn<ByteBuf> readTxn;
private Txn<ByteBuf> rwTxn;
// Cache
private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
private final Deque<ByteBuf> toWriteKeys = new ArrayDeque<>();
private final Deque<ByteBuf> toWriteValues = new ArrayDeque<>();
private long allocatedSize = 0;
private final long virtualSize;
@@ -54,6 +61,35 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
}
private void switchToMode(boolean write) {
if (toWriteKeys.size() > 0) {
switchToModeUncached(true);
try {
var ki = toWriteKeys.iterator();
var vi = toWriteValues.iterator();
while (ki.hasNext()) {
var k = ki.next();
var v = vi.next();
if (lmdb.put(rwTxn, k, v)) {
allocatedSize++;
}
}
} finally {
endMode();
for (ByteBuf toWriteKey : toWriteKeys) {
toWriteKey.release();
}
for (ByteBuf toWriteValue : toWriteValues) {
toWriteValue.release();
}
toWriteKeys.clear();
toWriteValues.clear();
}
}
switchToModeUncached(write);
}
private void switchToModeUncached(boolean write) {
if (write) {
if (!writing) {
writing = true;
@@ -109,10 +145,14 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
public void set(long index, @Nullable V value) {
ensureBounds(index);
ensureThread();
switchToMode(true);
var keyBuf = allocate(Long.BYTES);
var valueBuf = valueCodec.serialize(this::allocate, value);
keyBuf.writeLong(index);
if (toWriteKeys.size() < WRITE_QUEUE_MAX_BOUND) {
toWriteKeys.add(keyBuf);
toWriteValues.add(valueBuf);
} else {
switchToMode(true);
try {
if (lmdb.put(rwTxn, keyBuf, valueBuf)) {
allocatedSize++;
@@ -124,6 +164,7 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
valueBuf.release();
}
}
}
@Override
public void reset(long index) {
@@ -146,10 +187,33 @@ public class LMDBArray<V> implements IArray<V>, Closeable {
public @Nullable V get(long index) {
ensureBounds(index);
ensureThread();
switchToMode(false);
if (!toWriteKeys.isEmpty()) {
var ki = toWriteKeys.iterator();
var vi = toWriteValues.iterator();
while (ki.hasNext()) {
var k = ki.next();
var v = vi.next();
var readIndex = k.getLong(0);
if (readIndex == index) {
var ri = v.readerIndex();
var wi = v.writerIndex();
var c = v.capacity();
try {
return valueCodec.deserialize(v);
} finally {
v.readerIndex(ri);
v.writerIndex(wi);
v.capacity(c);
}
}
}
}
var keyBuf = allocate(Long.BYTES);
keyBuf.writeLong(index);
try {
switchToModeUncached(false);
var value = lmdb.get(readTxn, keyBuf);
if (value != null) {
return valueCodec.deserialize(value);

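The get() path above peeks at a queued value without consuming it: the value buffer is deserialized, and its reader index, writer index and capacity are restored afterwards so the same buffer can still be written out unchanged when the queue is flushed into the next write transaction. A small sketch of that save-and-restore pattern, using the mainline Netty ByteBuf accessors (the code above calls the same methods on the io.net5 fork's buffers), might be:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    // Sketch: read a value out of a queued buffer without disturbing it.
    final class BufferPeek {
        static long peekLong(ByteBuf queued) {
            var ri = queued.readerIndex();
            var wi = queued.writerIndex();
            var c = queued.capacity();
            try {
                // Deserialization advances the reader index...
                return queued.readLong();
            } finally {
                // ...so all indices are put back, keeping the buffer intact
                // for the later flush into the LMDB write transaction.
                queued.readerIndex(ri);
                queued.writerIndex(wi);
                queued.capacity(c);
            }
        }

        public static void main(String[] args) {
            ByteBuf buf = Unpooled.buffer(Long.BYTES);
            buf.writeLong(42L);
            System.out.println(peekLong(buf));      // 42
            System.out.println(buf.readerIndex());  // still 0
            buf.release();
        }
    }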

@@ -7,8 +7,11 @@ import io.net5.buffer.PooledByteBufAllocator;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +44,11 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
private Txn<ByteBuf> rwTxn;
private Cursor<ByteBuf> cur;
// Cache
private static final int WRITE_QUEUE_MAX_BOUND = 10_000;
private final Deque<ByteBuf> toWriteKeys = new ArrayDeque<>();
private final Deque<ByteBuf> toWriteValues = new ArrayDeque<>();
private boolean topValid = true;
private T top = null;
private long size = 0;
@@ -67,6 +75,36 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
}
private void switchToMode(boolean write, boolean wantCursor) {
if (iterating) {
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
}
if (toWriteKeys.size() > 0) {
switchToModeUncached(true, false);
try {
var ki = toWriteKeys.iterator();
var vi = toWriteValues.iterator();
while (ki.hasNext()) {
var k = ki.next();
var v = vi.next();
lmdb.put(rwTxn, k, v);
}
} finally {
endMode();
for (ByteBuf toWriteKey : toWriteKeys) {
toWriteKey.release();
}
for (ByteBuf toWriteValue : toWriteValues) {
toWriteValue.release();
}
toWriteKeys.clear();
toWriteValues.clear();
}
}
switchToModeUncached(write, wantCursor);
}
private void switchToModeUncached(boolean write, boolean wantCursor) {
if (iterating) {
throw new IllegalStateException("Tried to " + (write ? "write" : "read") + " while still iterating");
}
@@ -151,10 +189,21 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
@Override
public void add(T element) {
ensureThread();
switchToMode(true, false);
var buf = codec.serialize(this::allocate, element);
var uid = allocate(Long.BYTES);
uid.writeLong(NEXT_ITEM_UID.getAndIncrement());
if (toWriteKeys.size() < WRITE_QUEUE_MAX_BOUND) {
toWriteKeys.add(buf);
toWriteValues.add(uid);
if (++size == 1) {
topValid = true;
top = element;
} else {
topValid = false;
}
} else {
switchToMode(true, false);
try {
if (lmdb.put(rwTxn, buf, uid)) {
if (++size == 1) {
@@ -169,6 +218,7 @@ public class LMDBPriorityQueue<T> implements PriorityQueue<T>, Reversable<Revers
buf.release();
uid.release();
}
}
assert topSingleValid(element);
}
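In add() above, both the queued path and the flush-then-put path keep the same cached-top bookkeeping: the cache is only trusted when the freshly added element is the sole element, otherwise it is invalidated so the top can be recomputed later. A sketch of that bookkeeping in isolation, with a hypothetical TopTracker name:

    // Hypothetical sketch of the lazy top-element cache maintained by add().
    final class TopTracker<T> {
        private boolean topValid = true;
        private T top = null;
        private long size = 0;

        void onAdded(T element) {
            if (++size == 1) {
                // A single element is necessarily the top of the queue.
                topValid = true;
                top = element;
            } else {
                // The top may have changed; it will be recomputed from storage
                // on the next access instead of being tracked eagerly here.
                topValid = false;
            }
        }

        boolean isTopCached() {
            return topValid;
        }

        T cachedTop() {
            return top;
        }
    }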