Optimistic updates

This commit is contained in:
Andrea Cavalli 2021-10-17 19:52:43 +02:00
parent df2b757fdd
commit aad5f8c96c
4 changed files with 363 additions and 626 deletions

View File

@ -74,9 +74,9 @@ public class LLDelta extends ResourceSupport<LLDelta, LLDelta> {
return true;
}
public static LLDelta of(Send<Buffer> min, Send<Buffer> max) {
assert (min == null && max == null) || (min != max);
return new LLDelta(min, max, null);
public static LLDelta of(Send<Buffer> previous, Send<Buffer> current) {
assert (previous == null && current == null) || (previous != current);
return new LLDelta(previous, current, null);
}
public Send<Buffer> previous() {

View File

@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.MemoryManager;
import io.net5.buffer.api.Resource;
import io.net5.buffer.api.Send;
import io.net5.buffer.api.internal.ResourceSupport;
@ -40,6 +41,7 @@ import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -54,12 +56,18 @@ import org.rocksdb.CompactRangeOptions;
import org.rocksdb.DirectSlice;
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.OptimisticTransactionOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.Status;
import org.rocksdb.Status.Code;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
@ -88,6 +96,7 @@ public class LLLocalDictionary implements LLDictionary {
static final ReadOptions EMPTY_READ_OPTIONS = new UnreleasableReadOptions(new UnmodifiableReadOptions());
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final WriteOptions BATCH_WRITE_OPTIONS = new UnreleasableWriteOptions(new UnmodifiableWriteOptions());
static final OptimisticTransactionOptions DEFAULT_OPTIMISTIC_TX_OPTIONS = new OptimisticTransactionOptions();
static final boolean PREFER_SEEK_TO_FIRST = false;
/**
* It used to be false,
@ -131,6 +140,7 @@ public class LLLocalDictionary implements LLDictionary {
* 1KiB dummy buffer, write only, used for debugging purposes
*/
private static final ByteBuffer DUMMY_WRITE_ONLY_BYTE_BUFFER = ByteBuffer.allocateDirect(1024);
private static final BufferAllocator BYTE_ARRAY_BUFFERS = BufferAllocator.onHeapPooled();
static {
boolean assertionsEnabled = false;
@ -140,13 +150,12 @@ public class LLLocalDictionary implements LLDictionary {
ASSERTIONS_ENABLED = assertionsEnabled;
}
private final RocksDB db;
private final OptimisticTransactionDB db;
private final ColumnFamilyHandle cfh;
private final String databaseName;
private final String columnName;
private final Scheduler dbScheduler;
private final Function<LLSnapshot, Snapshot> snapshotResolver;
private final Striped<StampedLock> itemsLock = Striped.readWriteStampedLock(STRIPES);
private final UpdateMode updateMode;
private final BufferAllocator alloc;
private final String getRangeMultiDebugName;
@ -155,7 +164,7 @@ public class LLLocalDictionary implements LLDictionary {
public LLLocalDictionary(
BufferAllocator allocator,
@NotNull RocksDB db,
@NotNull OptimisticTransactionDB db,
@NotNull ColumnFamilyHandle columnFamilyHandle,
String databaseName,
String columnName,
@ -255,17 +264,6 @@ public class LLLocalDictionary implements LLDictionary {
return Mono.usingWhen(keyMono,
keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
try {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
Buffer logKey;
if (logger.isTraceEnabled(MARKER_ROCKSDB)) {
@ -284,11 +282,6 @@ public class LLLocalDictionary implements LLDictionary {
return result;
}
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
}
} catch (Exception ex) {
throw new IOException("Failed to read " + LLUtils.toStringSafe(key), ex);
}
@ -556,17 +549,6 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called containsKey in a nonblocking thread");
}
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
int size = RocksDB.NOT_FOUND;
byte[] keyBytes = LLUtils.toArray(key);
Holder<byte[]> data = new Holder<>();
@ -585,11 +567,6 @@ public class LLLocalDictionary implements LLDictionary {
}
}
return size != RocksDB.NOT_FOUND;
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
}
}
}).onErrorMap(cause -> new IOException("Failed to read", cause)),
keySend -> Mono.fromRunnable(keySend::close)
@ -609,28 +586,12 @@ public class LLLocalDictionary implements LLDictionary {
try (var value = valueSend.receive()) {
assert key.isAccessible();
assert value.isAccessible();
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.writeLock();
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(value));
}
dbPut(cfh, null, key.send(), value.send());
return null;
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockWrite(stamp);
}
}
}
}
}),
@ -653,61 +614,34 @@ public class LLLocalDictionary implements LLDictionary {
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
UpdateReturnMode updateReturnMode,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> {
try (Buffer key = keySend.receive()) {
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
try (var tx = db.beginTransaction(EMPTY_WRITE_OPTIONS, DEFAULT_OPTIMISTIC_TX_OPTIONS)) {
var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true);
Buffer prevData;
if (prevDataArray != null) {
prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length);
prevData.writeBytes(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
while (true) {
@Nullable Buffer prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
if (existsAlmostCertainly
|| db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
if (prevDataBytes != null) {
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
} else {
prevData = null;
}
} else {
var obtainedPrevData = dbGet(cfh, null, key.copy().send(), existsAlmostCertainly);
if (obtainedPrevData == null) {
prevData = null;
} else {
prevData = obtainedPrevData.receive();
}
}
} else {
prevData = null;
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData)
);
}
try {
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var sentData = prevDataToSendToUpdater == null ? null
: prevDataToSendToUpdater.send()) {
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
@ -716,48 +650,15 @@ public class LLLocalDictionary implements LLDictionary {
}
}
}
}
assert newData == null || newData.isAccessible();
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData),
LLUtils.toStringSafe(newData)
);
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
continue;
}
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
&& (prevData == null || !LLUtils.equals(prevData, newData))) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
continue;
}
}
tx.delete(cfh, keyArray, true);
commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
@ -765,19 +666,10 @@ public class LLLocalDictionary implements LLDictionary {
LLUtils.toStringSafe(newData)
);
}
Buffer dataToPut;
if (updateReturnMode == UpdateReturnMode.GET_NEW_VALUE) {
dataToPut = newData.copy();
tx.put(cfh, keyArray, newDataArray);
commitOptimistically(tx);
} else {
dataToPut = newData;
}
try {
dbPut(cfh, null, key.send(), dataToPut.send());
} finally {
if (dataToPut != newData) {
dataToPut.close();
}
}
tx.rollback();
}
return switch (updateReturnMode) {
case GET_NEW_VALUE -> newData != null ? newData.send() : null;
@ -786,89 +678,64 @@ public class LLLocalDictionary implements LLDictionary {
//noinspection UnnecessaryDefault
default -> throw new IllegalArgumentException();
};
} finally {
if (newData != null) {
newData.close();
}
}
} finally {
if (prevData != null) {
prevData.close();
}
}
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlock(stamp);
}
}
}
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close)
);
keySend -> Mono.fromRunnable(keySend::close));
}
private void commitOptimistically(Transaction tx) throws RocksDBException {
Code commitStatusCode = null;
do {
try {
tx.commit();
} catch (RocksDBException ex) {
if (ex.getStatus() != null && ex.getStatus().getCode() == Code.TryAgain) {
commitStatusCode = Code.TryAgain;
// Park for maximum 5ms
LockSupport.parkNanos(5000000);
} else {
throw ex;
}
}
} while (commitStatusCode == Code.TryAgain);
}
// Remember to change also update() if you are modifying this function
@SuppressWarnings("DuplicatedCode")
@Override
public Mono<Send<LLDelta>> updateAndGetDelta(Mono<Send<Buffer>> keyMono,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono,
keySend -> runOnDb(() -> {
try (var key = keySend.receive()) {
return Mono.usingWhen(keyMono, keySend -> runOnDb(() -> {
try (Buffer key = keySend.receive()) {
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
try (var tx = db.beginTransaction(EMPTY_WRITE_OPTIONS, DEFAULT_OPTIMISTIC_TX_OPTIONS)) {
var prevDataArray = tx.getForUpdate(EMPTY_READ_OPTIONS, cfh, keyArray, true);
Buffer prevData;
if (prevDataArray != null) {
prevData = BYTE_ARRAY_BUFFERS.allocate(prevDataArray.length);
prevData.writeBytes(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
while (true) {
@Nullable Buffer prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
if (existsAlmostCertainly
|| db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
if (prevDataBytes != null) {
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
} else {
prevData = null;
}
} else {
var obtainedPrevData = dbGet(cfh, null, key.copy().send(), existsAlmostCertainly);
if (obtainedPrevData == null) {
prevData = null;
} else {
prevData = obtainedPrevData.receive();
}
}
} else {
prevData = null;
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData)
);
}
try {
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var sentData = prevDataToSendToUpdater == null ? null
: prevDataToSendToUpdater.send()) {
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
@ -877,48 +744,15 @@ public class LLLocalDictionary implements LLDictionary {
}
}
}
}
assert newData == null || newData.isAccessible();
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData),
LLUtils.toStringSafe(newData)
);
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (prevData != null && newData == null) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
continue;
}
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
} else if (newData != null
&& (prevData == null || !LLUtils.equals(prevData, newData))) {
//noinspection DuplicatedCode
if (updateMode == UpdateMode.ALLOW) {
var ws = lock.tryConvertToWriteLock(stamp);
if (ws != 0) {
stamp = ws;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
continue;
}
}
tx.delete(cfh, keyArray, true);
commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
@ -926,38 +760,20 @@ public class LLLocalDictionary implements LLDictionary {
LLUtils.toStringSafe(newData)
);
}
assert key.isAccessible();
assert newData.isAccessible();
dbPut(cfh, null, key.send(), newData.copy().send());
tx.put(cfh, keyArray, newDataArray);
commitOptimistically(tx);
} else {
tx.rollback();
}
if (newData == prevData && newData != null) {
newData = newData.copy();
return LLDelta
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
.send();
}
assert (prevData == null && newData == null) || newData != prevData;
return LLDelta.of(
prevData != null ? prevData.send() : null,
newData != null ? newData.send() : null
).send();
} finally {
if (newData != null) {
newData.close();
}
}
} finally {
if (prevData != null) {
prevData.close();
}
}
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlock(stamp);
}
}
}
}).onErrorMap(cause -> new IOException("Failed to read or write", cause)),
keySend -> Mono.fromRunnable(keySend::close)
);
keySend -> Mono.fromRunnable(keySend::close));
}
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, Send<Buffer> keyToReceive)
@ -989,27 +805,11 @@ public class LLLocalDictionary implements LLDictionary {
.concatWith(this
.<Send<Buffer>>runOnDb(() -> {
try (var key = keySend.receive()) {
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.writeLock();
} else {
lock = null;
stamp = 0;
}
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {}", LLUtils.toStringSafe(key));
}
dbDelete(cfh, null, key.send());
return null;
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockWrite(stamp);
}
}
}
})
.onErrorMap(cause -> new IOException("Failed to delete", cause))
@ -1033,17 +833,6 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getPreviousData in a nonblocking thread");
}
StampedLock lock;
long stamp;
if (updateMode == UpdateMode.ALLOW) {
lock = itemsLock.getAt(getLockIndex(key));
stamp = lock.readLock();
} else {
lock = null;
stamp = 0;
}
try {
var data = new Holder<byte[]>();
Buffer bufferResult;
if (db.keyMayExist(cfh, LLUtils.toArray(key), data)) {
@ -1063,11 +852,6 @@ public class LLLocalDictionary implements LLDictionary {
}
return bufferResult == null ? null : bufferResult.send();
}
} finally {
if (updateMode == UpdateMode.ALLOW) {
lock.unlockRead(stamp);
}
}
}
})
.onErrorMap(cause -> new IOException("Failed to read ", cause)),
@ -1109,20 +893,6 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called getMulti in a nonblocking thread");
}
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
locks = itemsLock.bulkGetAt(getLockIndices(keyBufsWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.readLock());
}
} else {
locks = null;
stamps = null;
}
try {
var columnFamilyHandles = new RepeatedElementList<>(cfh, keysWindow.size());
List<byte[]> results = db.multiGetAsList(resolveSnapshot(snapshot),
columnFamilyHandles, LLUtils.toArray(keyBufsWindow));
@ -1142,15 +912,6 @@ public class LLLocalDictionary implements LLDictionary {
));
}
return mappedResults;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockRead(stamps.get(index));
index++;
}
}
}
} finally {
for (Buffer buffer : keyBufsWindow) {
buffer.close();
@ -1188,19 +949,6 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called putMulti in a nonblocking thread");
}
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
locks = itemsLock.bulkGetAt(getLockIndicesEntries(entriesWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.writeLock());
}
} else {
locks = null;
stamps = null;
}
try {
ArrayList<Send<LLEntry>> oldValues;
if (getOldValues) {
oldValues = new ArrayList<>(entriesWindow.size());
@ -1258,15 +1006,6 @@ public class LLLocalDictionary implements LLDictionary {
}
}
return oldValues;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockWrite(stamps.get(index));
index++;
}
}
}
} finally {
for (LLEntry llEntry : entriesWindow) {
llEntry.close();
@ -1295,20 +1034,6 @@ public class LLLocalDictionary implements LLDictionary {
for (Tuple2<Buffer, X> objects : entriesWindow) {
keyBufsWindow.add(objects.getT1());
}
Iterable<StampedLock> locks;
ArrayList<Long> stamps;
if (updateMode == UpdateMode.ALLOW) {
locks = itemsLock.bulkGetAt(getLockIndicesWithExtra(entriesWindow));
stamps = new ArrayList<>();
for (var lock : locks) {
stamps.add(lock.writeLock());
}
} else {
locks = null;
stamps = null;
}
try {
var columnFamilyHandles = new RepeatedElementList<>(cfh, entriesWindow.size());
ArrayList<Tuple3<Send<Buffer>, X, Optional<Send<Buffer>>>> mappedInputs;
{
@ -1392,15 +1117,6 @@ public class LLLocalDictionary implements LLDictionary {
}
}
return valueChangedResult;
} finally {
if (updateMode == UpdateMode.ALLOW) {
int index = 0;
for (var lock : locks) {
lock.unlockWrite(stamps.get(index));
index++;
}
}
}
} finally {
for (Tuple2<Buffer, X> tuple : entriesWindow) {
tuple.getT1().close();
@ -2026,6 +1742,7 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called clear in a nonblocking thread");
}
boolean shouldCompactLater = false;
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
@ -2043,6 +1760,16 @@ public class LLLocalDictionary implements LLDictionary {
byte[] firstDeletedKey = null;
byte[] lastDeletedKey = null;
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
//noinspection ConstantConditions
if (db instanceof OptimisticTransactionDB) {
rocksIterator.seekToFirst();
rocksIterator.status();
while (rocksIterator.isValid()) {
writeBatch.delete(cfh, rocksIterator.key());
rocksIterator.next();
rocksIterator.status();
}
} else {
rocksIterator.seekToLast();
rocksIterator.status();
@ -2051,12 +1778,15 @@ public class LLLocalDictionary implements LLDictionary {
lastDeletedKey = rocksIterator.key();
writeBatch.deleteRange(cfh, FIRST_KEY, rocksIterator.key());
writeBatch.delete(cfh, rocksIterator.key());
shouldCompactLater = true;
}
}
}
writeBatch.writeToDbAndClose();
if (shouldCompactLater) {
// Compact range
db.suggestCompactRange(cfh);
if (firstDeletedKey != null && lastDeletedKey != null) {
@ -2069,6 +1799,7 @@ public class LLLocalDictionary implements LLDictionary {
.setChangeLevel(false)
);
}
}
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
db.flushWal(true);

View File

@ -44,11 +44,14 @@ import org.rocksdb.DBOptions;
import org.rocksdb.DbPath;
import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.MergeOperator;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options;
import org.rocksdb.RateLimiter;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.TransactionDB;
import org.rocksdb.WALRecoveryMode;
import org.rocksdb.WriteBufferManager;
import org.warp.commonutils.log.Logger;
@ -77,7 +80,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final DatabaseOptions databaseOptions;
private final boolean enableColumnsBug;
private RocksDB db;
private OptimisticTransactionDB db;
private final Map<Column, ColumnFamilyHandle> handles;
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
@ -146,7 +149,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
while (true) {
try {
// a factory method that returns a RocksDB instance
this.db = RocksDB.open(new DBOptions(rocksdbOptions),
this.db = OptimisticTransactionDB.open(new DBOptions(rocksdbOptions),
dbPathString,
descriptors,
handles
@ -194,7 +197,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return name;
}
private void flushAndCloseDb(RocksDB db, List<ColumnFamilyHandle> handles)
private void flushAndCloseDb(OptimisticTransactionDB db, List<ColumnFamilyHandle> handles)
throws RocksDBException {
flushDb(db, handles);
@ -223,7 +226,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
}
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
private void flushDb(OptimisticTransactionDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called flushDb in a nonblocking thread");
}
@ -240,7 +243,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
@SuppressWarnings("unused")
private void compactDb(RocksDB db, List<ColumnFamilyHandle> handles) {
private void compactDb(OptimisticTransactionDB db, List<ColumnFamilyHandle> handles) {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called compactDb in a nonblocking thread");
}
@ -452,7 +455,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
LinkedList<ColumnFamilyHandle> handles = new LinkedList<>();
this.db = RocksDB.open(options, dbPathString);
this.db = OptimisticTransactionDB.open(options, dbPathString);
for (ColumnFamilyDescriptor columnFamilyDescriptor : descriptorsToCreate) {
handles.add(db.createColumnFamily(columnFamilyDescriptor));
}

View File

@ -48,11 +48,14 @@ public class CappedWriteBatch extends WriteBatch {
private synchronized void flushIfNeeded(boolean force) throws RocksDBException {
if (this.count() >= (force ? 1 : cap)) {
try {
db.write(writeOptions, this.getWriteBatch());
this.clear();
} finally {
releaseAllBuffers();
}
}
}
private synchronized void releaseAllBuffers() {
if (!buffersToRelease.isEmpty()) {