diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java index 54418a1..76baec9 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java @@ -3,7 +3,6 @@ package it.cavallium.dbengine.database.collections; import io.netty5.buffer.api.Buffer; import io.netty5.buffer.api.Resource; import io.netty5.buffer.api.Send; -import io.netty5.buffer.api.internal.ResourceSupport; import it.cavallium.dbengine.client.CompositeSnapshot; import it.cavallium.dbengine.database.Delta; import it.cavallium.dbengine.database.LLDictionary; @@ -22,7 +21,6 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMap; import it.unimi.dsi.fastutil.objects.Object2ObjectSortedMaps; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -35,7 +33,6 @@ import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.SynchronousSink; -import reactor.util.function.Tuple2; /** * Optimized implementation of "DatabaseMapDictionary with SubStageGetterSingle" @@ -73,25 +70,34 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep(dictionary, prefixKey, keySuffixSerializer, valueSerializer, onClose); } - private void deserializeValue(Send valueToReceive, SynchronousSink sink) { + private void deserializeValue(T keySuffix, Send valueToReceive, SynchronousSink sink) { try (var value = valueToReceive.receive()) { - sink.next(valueSerializer.deserialize(value)); - } catch (IndexOutOfBoundsException ex) { - var exMessage = ex.getMessage(); - if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { - var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet(); - if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { - LOG.error("Unexpected zero-bytes value in column " - + dictionary.getDatabaseName() + ":" + dictionary.getColumnName() - + " total=" + totalZeroBytesErrors - ); + try { + sink.next(valueSerializer.deserialize(value)); + } catch (IndexOutOfBoundsException ex) { + var exMessage = ex.getMessage(); + if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { + var totalZeroBytesErrors = this.totalZeroBytesErrors.incrementAndGet(); + if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { + try (var keySuffixBytes = serializeKeySuffixToKey(keySuffix)) { + LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() + + ":" + dictionary.getColumnName() + + ":" + LLUtils.toStringSafe(this.keyPrefix) + + ":" + keySuffix + "(" + LLUtils.toStringSafe(keySuffixBytes) + ") total=" + totalZeroBytesErrors); + } catch (SerializationException e) { + LOG.error("Unexpected zero-bytes value at " + dictionary.getDatabaseName() + + ":" + dictionary.getColumnName() + + ":" + LLUtils.toStringSafe(this.keyPrefix) + + ":" + keySuffix + "(?) total=" + totalZeroBytesErrors); + } + } + sink.complete(); + } else { + sink.error(ex); } - sink.complete(); - } else { + } catch (Throwable ex) { sink.error(ex); } - } catch (Throwable ex) { - sink.error(ex); } } @@ -227,7 +233,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeKeySuffixToKey(keySuffix).send()), existsAlmostCertainly ) - .handle(this::deserializeValue); + .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); } @Override @@ -252,7 +258,7 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeKeySuffixToKey(keySuffix).send()); return dictionary .update(keyMono, getSerializedUpdater(updater), updateReturnMode) - .handle(this::deserializeValue); + .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); } @Override @@ -314,7 +320,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep putValueAndGetPrevious(T keySuffix, U value) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send()); var valueMono = Mono.fromCallable(() -> serializeValue(value).send()); - return dictionary.put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue); + return dictionary + .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) + .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); } @Override @@ -323,7 +331,10 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep serializeValue(value).send()); return dictionary .put(keyMono, valueMono, LLDictionaryResultType.PREVIOUS_VALUE) - .handle(this::deserializeValue) + .handle((Send valueToReceive, SynchronousSink sink) -> deserializeValue(keySuffix, + valueToReceive, + sink + )) .map(oldValue -> !Objects.equals(oldValue, value)) .defaultIfEmpty(value != null); } @@ -340,7 +351,9 @@ public class DatabaseMapDictionary extends DatabaseMapDictionaryDeep removeAndGetPrevious(T keySuffix) { var keyMono = Mono.fromCallable(() -> serializeKeySuffixToKey(keySuffix).send()); - return dictionary.remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE).handle(this::deserializeValue); + return dictionary + .remove(keyMono, LLDictionaryResultType.PREVIOUS_VALUE) + .handle((valueToReceive, sink) -> deserializeValue(keySuffix, valueToReceive, sink)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index 03e97a4..20e1022 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -502,31 +502,36 @@ public class DatabaseMapDictionaryDeep> extend .dictionary .getRange(deepMap.resolveSnapshot(snapshot), deepMap.rangeMono) .handle((entrySend, sink) -> { + K1 key1 = null; + K2 key2 = null; try (var entry = entrySend.receive()) { var keyBuf = entry.getKeyUnsafe(); var valueBuf = entry.getValueUnsafe(); - assert keyBuf != null; - keyBuf.skipReadable(deepMap.keyPrefixLength); - K1 key1; - K2 key2; - try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) { - key1 = keySuffix1Serializer.deserialize(key1Buf); - } - key2 = keySuffix2Serializer.deserialize(keyBuf); - assert valueBuf != null; - var value = valueSerializer.deserialize(valueBuf); - sink.next(merger.apply(key1, key2, value)); - } catch (IndexOutOfBoundsException ex) { - var exMessage = ex.getMessage(); - if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { - var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet(); - if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { - LOG.error("Unexpected zero-bytes value in column " + deepMap.dictionary.getDatabaseName() + ":" - + deepMap.dictionary.getColumnName() + " total=" + totalZeroBytesErrors); + try { + assert keyBuf != null; + keyBuf.skipReadable(deepMap.keyPrefixLength); + try (var key1Buf = keyBuf.split(deepMap.keySuffixLength)) { + key1 = keySuffix1Serializer.deserialize(key1Buf); + } + key2 = keySuffix2Serializer.deserialize(keyBuf); + assert valueBuf != null; + var value = valueSerializer.deserialize(valueBuf); + sink.next(merger.apply(key1, key2, value)); + } catch (IndexOutOfBoundsException ex) { + var exMessage = ex.getMessage(); + if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { + var totalZeroBytesErrors = deepMap.totalZeroBytesErrors.incrementAndGet(); + if (totalZeroBytesErrors < 512 || totalZeroBytesErrors % 10000 == 0) { + LOG.error("Unexpected zero-bytes value at " + deepMap.dictionary.getDatabaseName() + + ":" + deepMap.dictionary.getColumnName() + + ":[" + key1 + + ":" + key2 + + "](" + LLUtils.toStringSafe(keyBuf) + ") total=" + totalZeroBytesErrors); + } + sink.complete(); + } else { + sink.error(ex); } - sink.complete(); - } else { - sink.error(ex); } } catch (SerializationException ex) { sink.error(ex); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java index dfc061d..1898c3e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingle.java @@ -92,8 +92,8 @@ public class DatabaseSingle extends ResourceSupport, Databas } catch (IndexOutOfBoundsException ex) { var exMessage = ex.getMessage(); if (exMessage != null && exMessage.contains("read 0 to 0, write 0 to ")) { - LOG.error("Unexpected zero-bytes value in column " - + dictionary.getDatabaseName() + ":" + dictionary.getColumnName()); + LOG.error("Unexpected zero-bytes value at " + + dictionary.getDatabaseName() + ":" + dictionary.getColumnName() + ":" + LLUtils.toStringSafe(key)); sink.complete(); } else { sink.error(ex); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 6cf3a5d..8cab784 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -4,6 +4,7 @@ import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static it.cavallium.dbengine.database.LLUtils.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES; import static java.util.Objects.requireNonNull; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; +import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; @@ -138,17 +139,32 @@ public sealed abstract class AbstractRocksDBColumn implements resultBuffer.close(); return null; } + // todo: kExistsWithValue is not reliable (read below), + // in some cases it should be treated as kExistsWithoutValue + case kExistsWithValue: case kExistsWithoutValue: { - assert keyMayExistValueLength == 0; - resultWritable.clear(); - // real data size - size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); - if (size == RocksDB.NOT_FOUND) { - resultBuffer.close(); - return null; + boolean isKExistsWithoutValue = false; + if (keyMayExistState == kExistsWithoutValue) { + isKExistsWithoutValue = true; + } else { + // todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken, + // and sometimes it returns an empty array, as if it exists + if (size == 0 || resultWritable.limit() == 0) { + isKExistsWithoutValue = true; + } + } + if (isKExistsWithoutValue) { + assert keyMayExistValueLength == 0; + resultWritable.clear(); + // real data size + size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + if (size == RocksDB.NOT_FOUND) { + resultBuffer.close(); + return null; + } } } - case kExistsWithValue: { + default: { // real data size this.lastDataSizeMetric.set(size); assert size >= 0; @@ -170,9 +186,6 @@ public sealed abstract class AbstractRocksDBColumn implements return resultBuffer.writerOffset(resultWritable.limit()); } } - default: { - throw new IllegalStateException(); - } } } catch (Throwable t) { resultBuffer.close(); @@ -189,7 +202,9 @@ public sealed abstract class AbstractRocksDBColumn implements requireNonNull(keyArray); Holder data = new Holder<>(); if (db.keyMayExist(cfh, readOptions, keyArray, data)) { - if (data.getValue() != null) { + // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it + // returns an empty array, as if it exists + if (data.getValue() != null && data.getValue().length > 0) { return LLUtils.fromByteArray(alloc, data.getValue()); } else { byte[] result = db.get(cfh, readOptions, keyArray); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java index b24682a..9253063 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/PessimisticRocksDBColumn.java @@ -62,67 +62,74 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn sentPrevData; Send sentCurData; boolean changed; - var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Reading {}: {} (before update)", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevDataArray) - ); + logger.trace(MARKER_ROCKSDB, "Reading {} (before update lock)", LLUtils.toStringSafe(key)); } - Buffer prevData; - if (prevDataArray != null) { - prevData = MemoryManager.unsafeWrap(prevDataArray); - } else { - prevData = null; - } - try (prevData) { - Buffer prevDataToSendToUpdater; - if (prevData != null) { - prevDataToSendToUpdater = prevData.copy(); + var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true); + try { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Reading {}: {} (before update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(prevDataArray) + ); + } + Buffer prevData; + if (prevDataArray != null) { + prevData = MemoryManager.unsafeWrap(prevDataArray); } else { - prevDataToSendToUpdater = null; + prevData = null; } - - @Nullable Buffer newData; - try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { - newData = updater.apply(sentData); - } - try (newData) { - var newDataArray = newData == null ? null : LLUtils.toArray(newData); - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, - "Updating {}. previous data: {}, updated data: {}", - LLUtils.toStringSafe(key), - LLUtils.toStringSafe(prevDataArray), - LLUtils.toStringSafe(newDataArray) - ); + try (prevData) { + Buffer prevDataToSendToUpdater; + if (prevData != null) { + prevDataToSendToUpdater = prevData.copy(); + } else { + prevDataToSendToUpdater = null; } - if (prevData != null && newData == null) { - if (logger.isTraceEnabled()) { - logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); - } - tx.delete(cfh, keyArray, true); - changed = true; - tx.commit(); - } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { + + @Nullable Buffer newData; + try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) { + newData = updater.apply(sentData); + } + try (newData) { + var newDataArray = newData == null ? null : LLUtils.toArray(newData); if (logger.isTraceEnabled()) { logger.trace(MARKER_ROCKSDB, - "Writing {}: {} (after update)", + "Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key), - LLUtils.toStringSafe(newData) + LLUtils.toStringSafe(prevDataArray), + LLUtils.toStringSafe(newDataArray) ); } - tx.put(cfh, keyArray, newDataArray); - changed = true; - tx.commit(); - } else { - changed = false; - tx.rollback(); + if (prevData != null && newData == null) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key)); + } + tx.delete(cfh, keyArray, true); + changed = true; + tx.commit(); + } else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) { + if (logger.isTraceEnabled()) { + logger.trace(MARKER_ROCKSDB, + "Writing {}: {} (after update)", + LLUtils.toStringSafe(key), + LLUtils.toStringSafe(newData) + ); + } + tx.put(cfh, keyArray, newDataArray); + changed = true; + tx.commit(); + } else { + changed = false; + tx.rollback(); + } + sentPrevData = prevData == null ? null : prevData.send(); + sentCurData = newData == null ? null : newData.send(); } - sentPrevData = prevData == null ? null : prevData.send(); - sentCurData = newData == null ? null : newData.send(); } + } finally { + tx.undoGetForUpdate(cfh, keyArray); } return switch (returnMode) { case NOTHING -> {