2021-10-20 01:51:34 +02:00
|
|
|
package it.cavallium.dbengine.database.disk;
|
|
|
|
|
|
|
|
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
|
|
|
|
|
2021-10-30 11:13:46 +02:00
|
|
|
import io.micrometer.core.instrument.MeterRegistry;
|
2021-10-20 01:51:34 +02:00
|
|
|
import io.net5.buffer.api.Buffer;
|
|
|
|
import io.net5.buffer.api.BufferAllocator;
|
|
|
|
import io.net5.buffer.api.Send;
|
|
|
|
import it.cavallium.dbengine.client.DatabaseOptions;
|
|
|
|
import it.cavallium.dbengine.database.LLDelta;
|
|
|
|
import it.cavallium.dbengine.database.LLUtils;
|
|
|
|
import it.cavallium.dbengine.database.serialization.SerializationFunction;
|
|
|
|
import java.io.IOException;
|
|
|
|
import org.jetbrains.annotations.NotNull;
|
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import org.rocksdb.ColumnFamilyHandle;
|
|
|
|
import org.rocksdb.Holder;
|
|
|
|
import org.rocksdb.ReadOptions;
|
|
|
|
import org.rocksdb.RocksDB;
|
|
|
|
import org.rocksdb.RocksDBException;
|
|
|
|
import org.rocksdb.Transaction;
|
|
|
|
import org.rocksdb.WriteOptions;
|
|
|
|
|
|
|
|
public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB> {
|
|
|
|
|
|
|
|
public StandardRocksDBColumn(RocksDB db,
|
|
|
|
DatabaseOptions databaseOptions,
|
|
|
|
BufferAllocator alloc,
|
2021-10-30 11:13:46 +02:00
|
|
|
ColumnFamilyHandle cfh, MeterRegistry meterRegistry) {
|
|
|
|
super(db, databaseOptions, alloc, cfh, meterRegistry);
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected boolean commitOptimistically(Transaction tx) {
|
|
|
|
throw new UnsupportedOperationException("Transactions not supported");
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) {
|
|
|
|
throw new UnsupportedOperationException("Transactions not supported");
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
|
|
|
|
@NotNull WriteOptions writeOptions,
|
|
|
|
Send<Buffer> keySend,
|
2021-11-08 16:33:41 +01:00
|
|
|
SerializationFunction<@Nullable Send<Buffer>, @Nullable Buffer> updater,
|
2021-10-20 01:51:34 +02:00
|
|
|
boolean existsAlmostCertainly,
|
|
|
|
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
|
|
|
try (Buffer key = keySend.receive()) {
|
2021-12-29 00:31:35 +01:00
|
|
|
try {
|
|
|
|
var cfh = getCfh();
|
|
|
|
var db = getDb();
|
|
|
|
var alloc = getAllocator();
|
|
|
|
@Nullable Buffer prevData;
|
|
|
|
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
|
|
|
|
if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
|
|
|
|
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
|
|
|
|
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
|
|
|
|
if (prevDataBytes != null) {
|
|
|
|
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
|
|
|
|
} else {
|
|
|
|
prevData = null;
|
|
|
|
}
|
2021-10-20 01:51:34 +02:00
|
|
|
} else {
|
2021-12-29 00:31:35 +01:00
|
|
|
prevData = this.get(readOptions, key, existsAlmostCertainly);
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
|
|
|
} else {
|
2021-12-29 00:31:35 +01:00
|
|
|
prevData = null;
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
|
logger.trace(MARKER_ROCKSDB, "Reading {}: {} (before update)", LLUtils.toStringSafe(key),
|
|
|
|
LLUtils.toStringSafe(prevData));
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
|
|
|
try {
|
2021-12-29 00:31:35 +01:00
|
|
|
@Nullable Buffer newData;
|
|
|
|
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
|
|
|
|
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
|
|
|
try (var newDataToReceive = updater.apply(sentData)) {
|
|
|
|
if (newDataToReceive != null) {
|
|
|
|
newData = newDataToReceive;
|
|
|
|
} else {
|
|
|
|
newData = null;
|
|
|
|
}
|
|
|
|
}
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
}
|
|
|
|
boolean changed;
|
|
|
|
assert newData == null || newData.isAccessible();
|
|
|
|
try {
|
2021-10-20 01:51:34 +02:00
|
|
|
if (logger.isTraceEnabled()) {
|
2021-12-29 00:31:35 +01:00
|
|
|
logger.trace(MARKER_ROCKSDB,
|
|
|
|
"Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key),
|
|
|
|
LLUtils.toStringSafe(prevData), LLUtils.toStringSafe(newData));
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
if (prevData != null && newData == null) {
|
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
|
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
|
|
|
}
|
|
|
|
this.delete(writeOptions, key);
|
2021-10-20 01:51:34 +02:00
|
|
|
changed = true;
|
2021-12-29 00:31:35 +01:00
|
|
|
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
|
logger.trace(MARKER_ROCKSDB, "Writing {}: {} (after update)", LLUtils.toStringSafe(key),
|
|
|
|
LLUtils.toStringSafe(newData));
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
Buffer dataToPut;
|
|
|
|
if (returnMode == UpdateAtomicResultMode.CURRENT) {
|
|
|
|
dataToPut = newData.copy();
|
|
|
|
} else {
|
|
|
|
dataToPut = newData;
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
try {
|
|
|
|
this.put(writeOptions, key, dataToPut);
|
|
|
|
changed = true;
|
|
|
|
} finally {
|
|
|
|
if (dataToPut != newData) {
|
|
|
|
dataToPut.close();
|
|
|
|
}
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
} else {
|
|
|
|
changed = false;
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
return switch (returnMode) {
|
|
|
|
case NOTHING -> {
|
|
|
|
if (prevData != null) {
|
|
|
|
prevData.close();
|
|
|
|
}
|
|
|
|
if (newData != null) {
|
|
|
|
newData.close();
|
|
|
|
}
|
|
|
|
yield RESULT_NOTHING;
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
case CURRENT -> {
|
|
|
|
if (prevData != null) {
|
|
|
|
prevData.close();
|
|
|
|
}
|
|
|
|
yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null);
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
case PREVIOUS -> {
|
|
|
|
if (newData != null) {
|
|
|
|
newData.close();
|
|
|
|
}
|
|
|
|
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null);
|
|
|
|
}
|
|
|
|
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
|
|
|
case DELTA -> new UpdateAtomicResultDelta(LLDelta
|
|
|
|
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
|
|
|
|
.send());
|
|
|
|
};
|
|
|
|
} finally {
|
|
|
|
if (newData != null) {
|
|
|
|
newData.close();
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
}
|
2021-10-20 01:51:34 +02:00
|
|
|
} finally {
|
2021-12-29 00:31:35 +01:00
|
|
|
if (prevData != null) {
|
|
|
|
prevData.close();
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
|
|
|
}
|
2021-12-29 00:31:35 +01:00
|
|
|
} catch (Throwable ex) {
|
|
|
|
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
|
2021-10-20 01:51:34 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean supportsTransactions() {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|