Print errored key
This commit is contained in:
parent
24493eb4ff
commit
5769bc7076
|
@ -71,142 +71,146 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<Optimis
|
|||
boolean existsAlmostCertainly,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
var cfh = getCfh();
|
||||
var keyArray = LLUtils.toArray(key);
|
||||
if (Schedulers.isInNonBlockingThread()) {
|
||||
throw new UnsupportedOperationException("Called update in a nonblocking thread");
|
||||
}
|
||||
try (var tx = beginTransaction(writeOptions)) {
|
||||
boolean committedSuccessfully;
|
||||
int retries = 0;
|
||||
ExponentialPageLimits retryTime = null;
|
||||
Send<Buffer> sentPrevData;
|
||||
Send<Buffer> sentCurData;
|
||||
boolean changed;
|
||||
do {
|
||||
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Reading {}: {} (before update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray)
|
||||
);
|
||||
}
|
||||
Buffer prevData;
|
||||
if (prevDataArray != null) {
|
||||
prevData = MemoryManager.unsafeWrap(prevDataArray);
|
||||
} else {
|
||||
prevData = null;
|
||||
}
|
||||
try (prevData) {
|
||||
Buffer prevDataToSendToUpdater;
|
||||
if (prevData != null) {
|
||||
prevDataToSendToUpdater = prevData.copy();
|
||||
try {
|
||||
var cfh = getCfh();
|
||||
var keyArray = LLUtils.toArray(key);
|
||||
if (Schedulers.isInNonBlockingThread()) {
|
||||
throw new UnsupportedOperationException("Called update in a nonblocking thread");
|
||||
}
|
||||
try (var tx = beginTransaction(writeOptions)) {
|
||||
boolean committedSuccessfully;
|
||||
int retries = 0;
|
||||
ExponentialPageLimits retryTime = null;
|
||||
Send<Buffer> sentPrevData;
|
||||
Send<Buffer> sentCurData;
|
||||
boolean changed;
|
||||
do {
|
||||
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Reading {}: {} (before update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray)
|
||||
);
|
||||
}
|
||||
Buffer prevData;
|
||||
if (prevDataArray != null) {
|
||||
prevData = MemoryManager.unsafeWrap(prevDataArray);
|
||||
} else {
|
||||
prevDataToSendToUpdater = null;
|
||||
prevData = null;
|
||||
}
|
||||
|
||||
@Nullable Buffer newData;
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
newData = updater.apply(sentData);
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Updating {}. previous data: {}, updated data: {}",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray),
|
||||
LLUtils.toStringSafe(newDataArray)
|
||||
);
|
||||
try (prevData) {
|
||||
Buffer prevDataToSendToUpdater;
|
||||
if (prevData != null) {
|
||||
prevDataToSendToUpdater = prevData.copy();
|
||||
} else {
|
||||
prevDataToSendToUpdater = null;
|
||||
}
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
tx.delete(cfh, keyArray, true);
|
||||
changed = true;
|
||||
committedSuccessfully = commitOptimistically(tx);
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
|
||||
@Nullable Buffer newData;
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
newData = updater.apply(sentData);
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Writing {}: {} (after update)",
|
||||
"Updating {}. previous data: {}, updated data: {}",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData)
|
||||
LLUtils.toStringSafe(prevDataArray),
|
||||
LLUtils.toStringSafe(newDataArray)
|
||||
);
|
||||
}
|
||||
tx.put(cfh, keyArray, newDataArray);
|
||||
changed = true;
|
||||
committedSuccessfully = commitOptimistically(tx);
|
||||
} else {
|
||||
changed = false;
|
||||
committedSuccessfully = true;
|
||||
tx.rollback();
|
||||
}
|
||||
sentPrevData = prevData == null ? null : prevData.send();
|
||||
sentCurData = newData == null ? null : newData.send();
|
||||
if (!committedSuccessfully) {
|
||||
tx.undoGetForUpdate(cfh, keyArray);
|
||||
tx.rollback();
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
tx.delete(cfh, keyArray, true);
|
||||
changed = true;
|
||||
committedSuccessfully = commitOptimistically(tx);
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Writing {}: {} (after update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData)
|
||||
);
|
||||
}
|
||||
tx.put(cfh, keyArray, newDataArray);
|
||||
changed = true;
|
||||
committedSuccessfully = commitOptimistically(tx);
|
||||
} else {
|
||||
changed = false;
|
||||
committedSuccessfully = true;
|
||||
tx.rollback();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
retries++;
|
||||
sentPrevData = prevData == null ? null : prevData.send();
|
||||
sentCurData = newData == null ? null : newData.send();
|
||||
if (!committedSuccessfully) {
|
||||
tx.undoGetForUpdate(cfh, keyArray);
|
||||
tx.rollback();
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
retries++;
|
||||
|
||||
if (retries == 1) {
|
||||
retryTime = new ExponentialPageLimits(0, 2, 2000);
|
||||
}
|
||||
long retryNs = 1000000L * retryTime.getPageLimit(retries);
|
||||
if (retries == 1) {
|
||||
retryTime = new ExponentialPageLimits(0, 2, 2000);
|
||||
}
|
||||
long retryNs = 1000000L * retryTime.getPageLimit(retries);
|
||||
|
||||
// +- 30%
|
||||
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
|
||||
// +- 30%
|
||||
retryNs = retryNs + ThreadLocalRandom.current().nextLong(-retryNs * 30L / 100L, retryNs * 30L / 100L);
|
||||
|
||||
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
|
||||
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
|
||||
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
|
||||
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
|
||||
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
|
||||
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
|
||||
}
|
||||
// Wait for n milliseconds
|
||||
if (retryNs > 0) {
|
||||
LockSupport.parkNanos(retryNs);
|
||||
if (retries >= 5 && retries % 5 == 0 || ALWAYS_PRINT_OPTIMISTIC_RETRIES) {
|
||||
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
|
||||
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
|
||||
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
|
||||
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
|
||||
+ " waiting {} ms before retrying for the {} time", LLUtils.toStringSafe(key), retryNs / 1000000d, retries);
|
||||
}
|
||||
// Wait for n milliseconds
|
||||
if (retryNs > 0) {
|
||||
LockSupport.parkNanos(retryNs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (!committedSuccessfully);
|
||||
if (retries > 5) {
|
||||
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
|
||||
}
|
||||
} while (!committedSuccessfully);
|
||||
if (retries > 5) {
|
||||
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield RESULT_NOTHING;
|
||||
}
|
||||
case CURRENT -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultCurrent(sentCurData);
|
||||
}
|
||||
case PREVIOUS -> {
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(sentPrevData);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
|
||||
};
|
||||
}
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield RESULT_NOTHING;
|
||||
}
|
||||
case CURRENT -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultCurrent(sentCurData);
|
||||
}
|
||||
case PREVIOUS -> {
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(sentPrevData);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
|
||||
};
|
||||
} catch (Throwable ex) {
|
||||
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,102 +53,106 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
|||
boolean existsAlmostCertainly,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
var cfh = getCfh();
|
||||
var keyArray = LLUtils.toArray(key);
|
||||
if (Schedulers.isInNonBlockingThread()) {
|
||||
throw new UnsupportedOperationException("Called update in a nonblocking thread");
|
||||
}
|
||||
try (var tx = beginTransaction(writeOptions)) {
|
||||
Send<Buffer> sentPrevData;
|
||||
Send<Buffer> sentCurData;
|
||||
boolean changed;
|
||||
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Reading {}: {} (before update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray)
|
||||
);
|
||||
try {
|
||||
var cfh = getCfh();
|
||||
var keyArray = LLUtils.toArray(key);
|
||||
if (Schedulers.isInNonBlockingThread()) {
|
||||
throw new UnsupportedOperationException("Called update in a nonblocking thread");
|
||||
}
|
||||
Buffer prevData;
|
||||
if (prevDataArray != null) {
|
||||
prevData = MemoryManager.unsafeWrap(prevDataArray);
|
||||
} else {
|
||||
prevData = null;
|
||||
}
|
||||
try (prevData) {
|
||||
Buffer prevDataToSendToUpdater;
|
||||
if (prevData != null) {
|
||||
prevDataToSendToUpdater = prevData.copy();
|
||||
try (var tx = beginTransaction(writeOptions)) {
|
||||
Send<Buffer> sentPrevData;
|
||||
Send<Buffer> sentCurData;
|
||||
boolean changed;
|
||||
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Reading {}: {} (before update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray)
|
||||
);
|
||||
}
|
||||
Buffer prevData;
|
||||
if (prevDataArray != null) {
|
||||
prevData = MemoryManager.unsafeWrap(prevDataArray);
|
||||
} else {
|
||||
prevDataToSendToUpdater = null;
|
||||
prevData = null;
|
||||
}
|
||||
|
||||
@Nullable Buffer newData;
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
newData = updater.apply(sentData);
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Updating {}. previous data: {}, updated data: {}",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray),
|
||||
LLUtils.toStringSafe(newDataArray)
|
||||
);
|
||||
try (prevData) {
|
||||
Buffer prevDataToSendToUpdater;
|
||||
if (prevData != null) {
|
||||
prevDataToSendToUpdater = prevData.copy();
|
||||
} else {
|
||||
prevDataToSendToUpdater = null;
|
||||
}
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
tx.delete(cfh, keyArray, true);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
|
||||
@Nullable Buffer newData;
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
newData = updater.apply(sentData);
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Writing {}: {} (after update)",
|
||||
"Updating {}. previous data: {}, updated data: {}",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData)
|
||||
LLUtils.toStringSafe(prevDataArray),
|
||||
LLUtils.toStringSafe(newDataArray)
|
||||
);
|
||||
}
|
||||
tx.put(cfh, keyArray, newDataArray);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else {
|
||||
changed = false;
|
||||
tx.rollback();
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
tx.delete(cfh, keyArray, true);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Writing {}: {} (after update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData)
|
||||
);
|
||||
}
|
||||
tx.put(cfh, keyArray, newDataArray);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else {
|
||||
changed = false;
|
||||
tx.rollback();
|
||||
}
|
||||
sentPrevData = prevData == null ? null : prevData.send();
|
||||
sentCurData = newData == null ? null : newData.send();
|
||||
}
|
||||
sentPrevData = prevData == null ? null : prevData.send();
|
||||
sentCurData = newData == null ? null : newData.send();
|
||||
}
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield RESULT_NOTHING;
|
||||
}
|
||||
case CURRENT -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultCurrent(sentCurData);
|
||||
}
|
||||
case PREVIOUS -> {
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(sentPrevData);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
|
||||
};
|
||||
}
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield RESULT_NOTHING;
|
||||
}
|
||||
case CURRENT -> {
|
||||
if (sentPrevData != null) {
|
||||
sentPrevData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultCurrent(sentCurData);
|
||||
}
|
||||
case PREVIOUS -> {
|
||||
if (sentCurData != null) {
|
||||
sentCurData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(sentPrevData);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
|
||||
};
|
||||
} catch (Throwable ex) {
|
||||
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,114 +48,118 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB>
|
|||
boolean existsAlmostCertainly,
|
||||
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
|
||||
try (Buffer key = keySend.receive()) {
|
||||
var cfh = getCfh();
|
||||
var db = getDb();
|
||||
var alloc = getAllocator();
|
||||
@Nullable Buffer prevData;
|
||||
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
|
||||
if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
|
||||
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
|
||||
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
|
||||
if (prevDataBytes != null) {
|
||||
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
|
||||
try {
|
||||
var cfh = getCfh();
|
||||
var db = getDb();
|
||||
var alloc = getAllocator();
|
||||
@Nullable Buffer prevData;
|
||||
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
|
||||
if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
|
||||
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
|
||||
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
|
||||
if (prevDataBytes != null) {
|
||||
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
|
||||
} else {
|
||||
prevData = null;
|
||||
}
|
||||
} else {
|
||||
prevData = null;
|
||||
prevData = this.get(readOptions, key, existsAlmostCertainly);
|
||||
}
|
||||
} else {
|
||||
prevData = this.get(readOptions, key, existsAlmostCertainly);
|
||||
prevData = null;
|
||||
}
|
||||
} else {
|
||||
prevData = null;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Reading {}: {} (before update)", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData));
|
||||
}
|
||||
try {
|
||||
@Nullable Buffer newData;
|
||||
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
try (var newDataToReceive = updater.apply(sentData)) {
|
||||
if (newDataToReceive != null) {
|
||||
newData = newDataToReceive;
|
||||
} else {
|
||||
newData = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Reading {}: {} (before update)", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData));
|
||||
}
|
||||
boolean changed;
|
||||
assert newData == null || newData.isAccessible();
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData), LLUtils.toStringSafe(newData));
|
||||
@Nullable Buffer newData;
|
||||
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
try (var newDataToReceive = updater.apply(sentData)) {
|
||||
if (newDataToReceive != null) {
|
||||
newData = newDataToReceive;
|
||||
} else {
|
||||
newData = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (prevData != null && newData == null) {
|
||||
boolean changed;
|
||||
assert newData == null || newData.isAccessible();
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevData), LLUtils.toStringSafe(newData));
|
||||
}
|
||||
this.delete(writeOptions, key);
|
||||
changed = true;
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Writing {}: {} (after update)", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData));
|
||||
}
|
||||
Buffer dataToPut;
|
||||
if (returnMode == UpdateAtomicResultMode.CURRENT) {
|
||||
dataToPut = newData.copy();
|
||||
} else {
|
||||
dataToPut = newData;
|
||||
}
|
||||
try {
|
||||
this.put(writeOptions, key, dataToPut);
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
this.delete(writeOptions, key);
|
||||
changed = true;
|
||||
} finally {
|
||||
if (dataToPut != newData) {
|
||||
dataToPut.close();
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Writing {}: {} (after update)", LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData));
|
||||
}
|
||||
Buffer dataToPut;
|
||||
if (returnMode == UpdateAtomicResultMode.CURRENT) {
|
||||
dataToPut = newData.copy();
|
||||
} else {
|
||||
dataToPut = newData;
|
||||
}
|
||||
try {
|
||||
this.put(writeOptions, key, dataToPut);
|
||||
changed = true;
|
||||
} finally {
|
||||
if (dataToPut != newData) {
|
||||
dataToPut.close();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
changed = false;
|
||||
}
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
if (prevData != null) {
|
||||
prevData.close();
|
||||
}
|
||||
if (newData != null) {
|
||||
newData.close();
|
||||
}
|
||||
yield RESULT_NOTHING;
|
||||
}
|
||||
case CURRENT -> {
|
||||
if (prevData != null) {
|
||||
prevData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null);
|
||||
}
|
||||
case PREVIOUS -> {
|
||||
if (newData != null) {
|
||||
newData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta
|
||||
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
|
||||
.send());
|
||||
};
|
||||
} finally {
|
||||
if (newData != null) {
|
||||
newData.close();
|
||||
}
|
||||
} else {
|
||||
changed = false;
|
||||
}
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
if (prevData != null) {
|
||||
prevData.close();
|
||||
}
|
||||
if (newData != null) {
|
||||
newData.close();
|
||||
}
|
||||
yield RESULT_NOTHING;
|
||||
}
|
||||
case CURRENT -> {
|
||||
if (prevData != null) {
|
||||
prevData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null);
|
||||
}
|
||||
case PREVIOUS -> {
|
||||
if (newData != null) {
|
||||
newData.close();
|
||||
}
|
||||
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null);
|
||||
}
|
||||
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
|
||||
case DELTA -> new UpdateAtomicResultDelta(LLDelta
|
||||
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
|
||||
.send());
|
||||
};
|
||||
} finally {
|
||||
if (newData != null) {
|
||||
newData.close();
|
||||
if (prevData != null) {
|
||||
prevData.close();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (prevData != null) {
|
||||
prevData.close();
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
throw new IOException("Failed to update key " + LLUtils.toStringSafe(key), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user