CavalliumDBEngine/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java

150 lines
5.2 KiB
Java
Raw Normal View History

2021-10-20 01:51:34 +02:00
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.micrometer.core.instrument.MeterRegistry;
2023-03-06 12:19:08 +01:00
import it.cavallium.buffer.Buf;
2021-10-20 01:51:34 +02:00
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
2023-05-22 19:12:05 +02:00
import it.cavallium.dbengine.database.disk.rocksdb.LLReadOptions;
import it.cavallium.dbengine.database.disk.rocksdb.LLWriteOptions;
2023-03-27 22:00:32 +02:00
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.utils.DBException;
2021-10-20 01:51:34 +02:00
import java.io.IOException;
import java.util.concurrent.locks.StampedLock;
2021-10-20 01:51:34 +02:00
import org.jetbrains.annotations.NotNull;
2023-03-27 22:00:32 +02:00
import org.jetbrains.annotations.Nullable;
2021-10-20 01:51:34 +02:00
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteOptions;
public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<TransactionDB> {
private static final TransactionOptions DEFAULT_TX_OPTIONS = new TransactionOptions();
public PessimisticRocksDBColumn(TransactionDB db,
String dbName,
ColumnFamilyHandle cfh,
MeterRegistry meterRegistry,
StampedLock closeLock) {
super(db, dbName, cfh, meterRegistry, closeLock);
2021-10-20 01:51:34 +02:00
}
@Override
protected boolean commitOptimistically(Transaction tx) throws RocksDBException {
tx.commit();
2021-10-20 01:51:34 +02:00
return true;
}
@Override
2023-05-22 19:12:05 +02:00
protected Transaction beginTransaction(@NotNull LLWriteOptions writeOptions,
TransactionOptions txOpts) {
2023-05-22 19:12:05 +02:00
return getDb().beginTransaction(writeOptions.getUnsafe(), txOpts);
2021-10-20 01:51:34 +02:00
}
@Override
2023-05-22 19:12:05 +02:00
public @NotNull UpdateAtomicResult updateAtomicImpl(@NotNull LLReadOptions readOptions,
@NotNull LLWriteOptions writeOptions,
Buf key,
2023-03-27 22:00:32 +02:00
SerializationFunction<@Nullable Buf, @Nullable Buf> updater,
UpdateAtomicResultMode returnMode) {
long initNanoTime = System.nanoTime();
try {
var cfh = getCfh();
var keyArray = LLUtils.asArray(key);
if (LLUtils.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
2023-05-22 19:12:05 +02:00
try (var txOpts = new TransactionOptions()) {
try (var tx = beginTransaction(writeOptions, txOpts)) {
Buf prevData;
Buf newData;
boolean changed;
2021-12-29 00:31:35 +01:00
if (logger.isTraceEnabled()) {
2023-05-22 19:12:05 +02:00
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
2023-04-17 23:31:16 +02:00
}
2023-05-22 19:12:05 +02:00
var prevDataArray = tx.getForUpdate(readOptions.getUnsafe(), cfh, keyArray, true);
2023-04-17 23:31:16 +02:00
try {
if (logger.isTraceEnabled()) {
2023-05-22 19:12:05 +02:00
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
if (prevDataArray != null) {
readValueFoundWithoutBloomBufferSize.record(prevDataArray.length);
prevData = Buf.wrap(prevDataArray);
} else {
readValueNotFoundWithoutBloomBufferSize.record(0);
prevData = null;
2022-05-20 18:31:05 +02:00
}
2023-05-22 19:12:05 +02:00
if (prevData != null) {
prevData.freeze();
}
try {
newData = updater.apply(prevData);
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key) + ". The previous value was:\n" + LLUtils.toStringSafe(prevData, 8192), ex);
}
var newDataArray = newData == null ? null : LLUtils.asArray(newData);
2022-05-20 23:59:56 +02:00
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
2023-05-22 19:12:05 +02:00
"Updating {}. previous data: {}, updated data: {}",
2022-05-20 23:59:56 +02:00
LLUtils.toStringSafe(key),
2023-05-22 19:12:05 +02:00
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
2022-05-20 23:59:56 +02:00
);
}
2023-05-22 19:12:05 +02:00
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
writeValueBufferSize.record(0);
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
writeValueBufferSize.record(newDataArray.length);
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
}
} finally {
tx.undoGetForUpdate(cfh, keyArray);
2022-05-20 18:31:05 +02:00
}
2023-05-22 19:12:05 +02:00
recordAtomicUpdateTime(changed, prevData != null, newData != null, initNanoTime);
return switch (returnMode) {
case NOTHING -> RESULT_NOTHING;
case CURRENT -> new UpdateAtomicResultCurrent(newData);
case PREVIOUS -> new UpdateAtomicResultPrevious(prevData);
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(prevData, newData));
};
2022-05-20 23:59:56 +02:00
}
2021-10-20 01:51:34 +02:00
}
} catch (Exception ex) {
throw new DBException("Failed to update key " + LLUtils.toStringSafe(key), ex);
2021-10-20 01:51:34 +02:00
}
}
@Override
public boolean supportsTransactions() {
return true;
}
}