Better errors logging, avoid zero-bytes bug in keyMayExist
This commit is contained in:
parent
59c37c0fc9
commit
28b4fdee50
@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.collections;
|
||||
import io.netty5.buffer.api.Buffer;
|
||||
import io.netty5.buffer.api.Resource;
|
||||
import io.netty5.buffer.api.Send;
|
||||
import io.netty5.buffer.api.internal.ResourceSupport;
|
||||
import it.cavallium.dbengine.client.CompositeSnapshot;
|
||||
import it.cavallium.dbengine.database.Delta;
|
||||
import it.cavallium.dbengine.database.LLDictionary;
|
||||
@ -22,7 +21,6 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap;
|
||||
import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
@ -35,7 +33,6 @@ import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.SynchronousSink;
|
||||
import reactor.util.function.Tuple2;
|
||||
|
||||
/**
|
||||
* Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle"
|
||||
@ -73,25 +70,34 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
return new DatabaseMapDictionary<>(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose);
|
||||
}
|
||||
|
||||
private void deserializeValue(Send<Buffer> valueToReceive, SynchronousSink<U> sink) {
|
||||
private void deserializeValue(T keySuffix, Send<Buffer> valueToReceive, SynchronousSink<U> sink) {
|
||||
try (var value = valueToReceive.receive()) {
|
||||
sink.next(valueSerializer.deserialize(value));
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
var exMessage = ex.getMessage();
|
||||
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
|
||||
var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
|
||||
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
|
||||
LOG.error("Unexpected zero-bytes value in column "
|
||||
+ dictionary.getDatabaseName() + ":" + dictionary.getColumnName()
|
||||
+ " total=" + totalZeroBytesErrors
|
||||
);
|
||||
try {
|
||||
sink.next(valueSerializer.deserialize(value));
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
var exMessage = ex.getMessage();
|
||||
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
|
||||
var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet();
|
||||
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
|
||||
try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) {
|
||||
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
|
||||
+ ":" + dictionary.getColumnName()
|
||||
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
|
||||
+ ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors);
|
||||
} catch (SerializationException e) {
|
||||
LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName()
|
||||
+ ":" + dictionary.getColumnName()
|
||||
+ ":" + LLUtils.toStringSafe(this.keyPrefix)
|
||||
+ ":" + keySuffix + "(?) total=" + totalZeroBytesErrors);
|
||||
}
|
||||
}
|
||||
sink.complete();
|
||||
} else {
|
||||
sink.error(ex);
|
||||
}
|
||||
sink.complete();
|
||||
} else {
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
sink.error(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,7 +233,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send()),
|
||||
existsAlmostCertainly
|
||||
)
|
||||
.handle(this::deserializeValue);
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -252,7 +258,7 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
|
||||
return dictionary
|
||||
.update(keyMono, getSerializedUpdater(updater), updateReturnMode)
|
||||
.handle(this::deserializeValue);
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -314,7 +320,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
public Mono<U> putValueAndGetPrevious(T keySuffix, U value) {
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
|
||||
var valueMono = Mono.fromCallable(() -> serializeValue(value).send());
|
||||
return dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
|
||||
return dictionary
|
||||
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -323,7 +331,10 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
var valueMono = Mono.fromCallable(() -> serializeValue(value).send());
|
||||
return dictionary
|
||||
.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.handle(this::deserializeValue)
|
||||
.handle((Send<Buffer> valueToReceive, SynchronousSink<U> sink) -> deserializeValue(keySuffix,
|
||||
valueToReceive,
|
||||
sink
|
||||
))
|
||||
.map(oldValue -> !Objects.equals(oldValue, value))
|
||||
.defaultIfEmpty(value != null);
|
||||
}
|
||||
@ -340,7 +351,9 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
|
||||
@Override
|
||||
public Mono<U> removeAndGetPrevious(T keySuffix) {
|
||||
var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send());
|
||||
return dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue);
|
||||
return dictionary
|
||||
.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE)
|
||||
.handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -502,31 +502,36 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> extend
|
||||
.dictionary
|
||||
.getRange(deepMap.resolveSnapshot(snapshot), deepMap.rangeMono)
|
||||
.handle((entrySend, sink) -> {
|
||||
K1 key1 = null;
|
||||
K2 key2 = null;
|
||||
try (var entry = entrySend.receive()) {
|
||||
var keyBuf = entry.getKeyUnsafe();
|
||||
var valueBuf = entry.getValueUnsafe();
|
||||
assert keyBuf != null;
|
||||
keyBuf.skipReadable(deepMap.keyPrefixLength);
|
||||
K1 key1;
|
||||
K2 key2;
|
||||
try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) {
|
||||
key1 = keySuffix1Serializer.deserialize(key1Buf);
|
||||
}
|
||||
key2 = keySuffix2Serializer.deserialize(keyBuf);
|
||||
assert valueBuf != null;
|
||||
var value = valueSerializer.deserialize(valueBuf);
|
||||
sink.next(merger.apply(key1, key2, value));
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
var exMessage = ex.getMessage();
|
||||
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
|
||||
var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet();
|
||||
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
|
||||
LOG.error("Unexpected zero-bytes value in column " + deepMap.dictionary.getDatabaseName() + ":"
|
||||
+ deepMap.dictionary.getColumnName() + " total=" + totalZeroBytesErrors);
|
||||
try {
|
||||
assert keyBuf != null;
|
||||
keyBuf.skipReadable(deepMap.keyPrefixLength);
|
||||
try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) {
|
||||
key1 = keySuffix1Serializer.deserialize(key1Buf);
|
||||
}
|
||||
key2 = keySuffix2Serializer.deserialize(keyBuf);
|
||||
assert valueBuf != null;
|
||||
var value = valueSerializer.deserialize(valueBuf);
|
||||
sink.next(merger.apply(key1, key2, value));
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
var exMessage = ex.getMessage();
|
||||
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
|
||||
var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet();
|
||||
if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) {
|
||||
LOG.error("Unexpected zero-bytes value at " + deepMap.dictionary.getDatabaseName()
|
||||
+ ":" + deepMap.dictionary.getColumnName()
|
||||
+ ":[" + key1
|
||||
+ ":" + key2
|
||||
+ "](" + LLUtils.toStringSafe(keyBuf) + ") total=" + totalZeroBytesErrors);
|
||||
}
|
||||
sink.complete();
|
||||
} else {
|
||||
sink.error(ex);
|
||||
}
|
||||
sink.complete();
|
||||
} else {
|
||||
sink.error(ex);
|
||||
}
|
||||
} catch (SerializationException ex) {
|
||||
sink.error(ex);
|
||||
|
@ -92,8 +92,8 @@ public class DatabaseSingle<U> extends ResourceSupport<DatabaseStage<U>, Databas
|
||||
} catch (IndexOutOfBoundsException ex) {
|
||||
var exMessage = ex.getMessage();
|
||||
if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) {
|
||||
LOG.error("Unexpected zero-bytes value in column "
|
||||
+ dictionary.getDatabaseName() + ":" + dictionary.getColumnName());
|
||||
LOG.error("Unexpected zero-bytes value at "
|
||||
+ dictionary.getDatabaseName() + ":" + dictionary.getColumnName() + ":" + LLUtils.toStringSafe(key));
|
||||
sink.complete();
|
||||
} else {
|
||||
sink.error(ex);
|
||||
|
@ -4,6 +4,7 @@ import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
|
||||
import static it.cavallium.dbengine.database.LLUtils.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
|
||||
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
|
||||
|
||||
import io.micrometer.core.instrument.Gauge;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
@ -138,17 +139,32 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
}
|
||||
// todo: kExistsWithValue is not reliable (read below),
|
||||
// in some cases it should be treated as kExistsWithoutValue
|
||||
case kExistsWithValue:
|
||||
case kExistsWithoutValue: {
|
||||
assert keyMayExistValueLength == 0;
|
||||
resultWritable.clear();
|
||||
// real data size
|
||||
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
if (size == RocksDB.NOT_FOUND) {
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
boolean isKExistsWithoutValue = false;
|
||||
if (keyMayExistState == kExistsWithoutValue) {
|
||||
isKExistsWithoutValue = true;
|
||||
} else {
|
||||
// todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken,
|
||||
// and sometimes it returns an empty array, as if it exists
|
||||
if (size == 0 || resultWritable.limit() == 0) {
|
||||
isKExistsWithoutValue = true;
|
||||
}
|
||||
}
|
||||
if (isKExistsWithoutValue) {
|
||||
assert keyMayExistValueLength == 0;
|
||||
resultWritable.clear();
|
||||
// real data size
|
||||
size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear());
|
||||
if (size == RocksDB.NOT_FOUND) {
|
||||
resultBuffer.close();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
case kExistsWithValue: {
|
||||
default: {
|
||||
// real data size
|
||||
this.lastDataSizeMetric.set(size);
|
||||
assert size >= 0;
|
||||
@ -170,9 +186,6 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
return resultBuffer.writerOffset(resultWritable.limit());
|
||||
}
|
||||
}
|
||||
default: {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
resultBuffer.close();
|
||||
@ -189,7 +202,9 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
|
||||
requireNonNull(keyArray);
|
||||
Holder<byte[]> data = new Holder<>();
|
||||
if (db.keyMayExist(cfh, readOptions, keyArray, data)) {
|
||||
if (data.getValue() != null) {
|
||||
// todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it
|
||||
// returns an empty array, as if it exists
|
||||
if (data.getValue() != null && data.getValue().length > 0) {
|
||||
return LLUtils.fromByteArray(alloc, data.getValue());
|
||||
} else {
|
||||
byte[] result = db.get(cfh, readOptions, keyArray);
|
||||
|
@ -62,67 +62,74 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
|
||||
Send<Buffer> sentPrevData;
|
||||
Send<Buffer> sentCurData;
|
||||
boolean changed;
|
||||
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Reading {}: {} (before update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray)
|
||||
);
|
||||
logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
Buffer prevData;
|
||||
if (prevDataArray != null) {
|
||||
prevData = MemoryManager.unsafeWrap(prevDataArray);
|
||||
} else {
|
||||
prevData = null;
|
||||
}
|
||||
try (prevData) {
|
||||
Buffer prevDataToSendToUpdater;
|
||||
if (prevData != null) {
|
||||
prevDataToSendToUpdater = prevData.copy();
|
||||
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Reading {}: {} (before update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray)
|
||||
);
|
||||
}
|
||||
Buffer prevData;
|
||||
if (prevDataArray != null) {
|
||||
prevData = MemoryManager.unsafeWrap(prevDataArray);
|
||||
} else {
|
||||
prevDataToSendToUpdater = null;
|
||||
prevData = null;
|
||||
}
|
||||
|
||||
@Nullable Buffer newData;
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
newData = updater.apply(sentData);
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Updating {}. previous data: {}, updated data: {}",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(prevDataArray),
|
||||
LLUtils.toStringSafe(newDataArray)
|
||||
);
|
||||
try (prevData) {
|
||||
Buffer prevDataToSendToUpdater;
|
||||
if (prevData != null) {
|
||||
prevDataToSendToUpdater = prevData.copy();
|
||||
} else {
|
||||
prevDataToSendToUpdater = null;
|
||||
}
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
tx.delete(cfh, keyArray, true);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
|
||||
@Nullable Buffer newData;
|
||||
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
|
||||
newData = updater.apply(sentData);
|
||||
}
|
||||
try (newData) {
|
||||
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Writing {}: {} (after update)",
|
||||
"Updating {}. previous data: {}, updated data: {}",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData)
|
||||
LLUtils.toStringSafe(prevDataArray),
|
||||
LLUtils.toStringSafe(newDataArray)
|
||||
);
|
||||
}
|
||||
tx.put(cfh, keyArray, newDataArray);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else {
|
||||
changed = false;
|
||||
tx.rollback();
|
||||
if (prevData != null && newData == null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
|
||||
}
|
||||
tx.delete(cfh, keyArray, true);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(MARKER_ROCKSDB,
|
||||
"Writing {}: {} (after update)",
|
||||
LLUtils.toStringSafe(key),
|
||||
LLUtils.toStringSafe(newData)
|
||||
);
|
||||
}
|
||||
tx.put(cfh, keyArray, newDataArray);
|
||||
changed = true;
|
||||
tx.commit();
|
||||
} else {
|
||||
changed = false;
|
||||
tx.rollback();
|
||||
}
|
||||
sentPrevData = prevData == null ? null : prevData.send();
|
||||
sentCurData = newData == null ? null : newData.send();
|
||||
}
|
||||
sentPrevData = prevData == null ? null : prevData.send();
|
||||
sentCurData = newData == null ? null : newData.send();
|
||||
}
|
||||
} finally {
|
||||
tx.undoGetForUpdate(cfh, keyArray);
|
||||
}
|
||||
return switch (returnMode) {
|
||||
case NOTHING -> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user