Fix pessimistic db

Andrea Cavalli 2021-12-27 18:44:54 +01:00
parent dffb8eb3eb
commit 7993a6210b
4 changed files with 33 additions and 20 deletions

ValuesSetSerializer.java

@@ -8,11 +8,15 @@ import it.cavallium.dbengine.database.serialization.Serializer;
 import it.unimi.dsi.fastutil.objects.ObjectArraySet;
 import java.util.ArrayList;
 import java.util.Objects;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
+	private static final Logger logger = LogManager.getLogger(ValuesSetSerializer.class);
 	private final Serializer<X> entrySerializer;
 	ValuesSetSerializer(Serializer<X> entrySerializer) {
@@ -21,14 +25,23 @@ class ValuesSetSerializer<X> implements Serializer<ObjectArraySet<X>> {
 	@Override
 	public @NotNull ObjectArraySet<X> deserialize(@NotNull Buffer serialized) throws SerializationException {
-		Objects.requireNonNull(serialized);
-		int entriesLength = serialized.readInt();
-		ArrayList<X> deserializedElements = new ArrayList<>(entriesLength);
-		for (int i = 0; i < entriesLength; i++) {
-			var deserializationResult = entrySerializer.deserialize(serialized);
-			deserializedElements.add(deserializationResult);
-		}
-		return new ObjectArraySet<>(deserializedElements);
+		try {
+			Objects.requireNonNull(serialized);
+			if (serialized.readableBytes() == 0) {
+				logger.error("Can't deserialize, 0 bytes are readable");
+				return new ObjectArraySet<>();
+			}
+			int entriesLength = serialized.readInt();
+			ArrayList<X> deserializedElements = new ArrayList<>(entriesLength);
+			for (int i = 0; i < entriesLength; i++) {
+				var deserializationResult = entrySerializer.deserialize(serialized);
+				deserializedElements.add(deserializationResult);
+			}
+			return new ObjectArraySet<>(deserializedElements);
+		} catch (IndexOutOfBoundsException ex) {
+			logger.error("Error during deserialization of value set, returning an empty set", ex);
+			return new ObjectArraySet<>();
+		}
 	}
 	@Override
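For context, a minimal sketch (not part of the commit) of the behaviour the hardened deserializer now guarantees. stringSerializer, emptyBuffer and truncatedBuffer are hypothetical stand-ins for a concrete element Serializer and io.net5 Buffers; the fragment assumes it runs in the same package as the package-private ValuesSetSerializer and declares SerializationException.

// Hypothetical element serializer for the set entries.
ValuesSetSerializer<String> setSerializer = new ValuesSetSerializer<>(stringSerializer);

// Zero readable bytes: instead of failing while reading the length prefix,
// the serializer now logs "Can't deserialize, 0 bytes are readable" and
// returns an empty ObjectArraySet.
ObjectArraySet<String> fromEmpty = setSerializer.deserialize(emptyBuffer);

// Truncated payload (length prefix promises more entries than the buffer holds):
// the IndexOutOfBoundsException is caught, logged, and an empty set is returned
// rather than propagated to the caller.
ObjectArraySet<String> fromTruncated = setSerializer.deserialize(truncatedBuffer);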

LLLocalKeyValueDatabase.java

@@ -6,9 +6,9 @@ import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.net5.buffer.api.BufferAllocator;
 import io.net5.util.internal.PlatformDependent;
+import it.cavallium.dbengine.client.DatabaseOptions;
 import it.cavallium.dbengine.client.DatabaseVolume;
 import it.cavallium.dbengine.database.Column;
-import it.cavallium.dbengine.client.DatabaseOptions;
 import it.cavallium.dbengine.database.LLKeyValueDatabase;
 import it.cavallium.dbengine.database.LLSnapshot;
 import it.cavallium.dbengine.database.UpdateMode;
@@ -44,7 +44,6 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.CompactRangeOptions;
 import org.rocksdb.CompactionPriority;
 import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionOptions;
 import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.DbPath;
@@ -56,6 +55,8 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
 import org.rocksdb.TransactionDB;
+import org.rocksdb.TransactionDBOptions;
+import org.rocksdb.TxnDBWritePolicy;
 import org.rocksdb.WALRecoveryMode;
 import org.rocksdb.WriteBufferManager;
 import reactor.core.publisher.Mono;
@@ -159,7 +160,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 				if (databaseOptions.optimistic()) {
 					this.db = OptimisticTransactionDB.open(new DBOptions(rocksdbOptions), dbPathString, descriptors, handles);
 				} else {
-					this.db = TransactionDB.open(new DBOptions(rocksdbOptions), dbPathString, descriptors, handles);
+					this.db = TransactionDB.open(new DBOptions(rocksdbOptions),
+							new TransactionDBOptions().setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED),
+							dbPathString,
+							descriptors,
+							handles
+					);
 				}
 				break;
 			} catch (RocksDBException ex) {
@@ -519,8 +525,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 	private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
 		if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
 			return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, cfh, meterRegistry);
-		} else if (db instanceof TransactionDB) {
-			return new PessimisticRocksDBColumn((TransactionDB) db, databaseOptions, allocator, cfh, meterRegistry);
+		} else if (db instanceof TransactionDB transactionDB) {
+			return new PessimisticRocksDBColumn(transactionDB, databaseOptions, allocator, cfh, meterRegistry);
 		} else {
 			return new StandardRocksDBColumn(db, databaseOptions, allocator, cfh, meterRegistry);
 		}

PessimisticRocksDBColumn.java

@@ -86,13 +86,7 @@ public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<Transa
 			@Nullable Buffer newData;
 			try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
-				try (var newDataToReceive = updater.apply(sentData)) {
-					if (newDataToReceive != null) {
-						newData = newDataToReceive;
-					} else {
-						newData = null;
-					}
-				}
+				newData = updater.apply(sentData);
 			}
 			try (newData) {
 				var newDataArray = newData == null ? null : LLUtils.toArray(newData);
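The simplified branch hands the updater's result straight to the pre-existing try (newData) block below, rather than wrapping it in a nested try-with-resources that would close it first. A small self-contained sketch (using java.io.InputStream in place of the io.net5 Buffer, purely for illustration) of the Java 9 idiom the new code relies on, where an existing, possibly null variable is named in the try header:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class TryWithExistingResource {

	public static void main(String[] args) throws IOException {
		// Stand-in for the updater: it may return a fresh resource or null ("no value").
		InputStream newData = produce(args.length > 0);

		// Java 9+ allows an existing, effectively final variable in the try header.
		// The resource stays open until this block ends, and if it is null the
		// close() call is simply skipped, so no explicit null wrapper is needed.
		try (newData) {
			if (newData != null) {
				System.out.println("first byte: " + newData.read());
			} else {
				System.out.println("updater returned null");
			}
		}
	}

	private static InputStream produce(boolean present) {
		return present ? new ByteArrayInputStream(new byte[] {42}) : null;
	}
}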

LocalTemporaryDbGenerator.java

@@ -78,7 +78,7 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
 				conn.getDatabase("testdb",
 						List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
 						new DatabaseOptions(List.of(), Map.of(), true, false, true, false,
-								true, canUseNettyDirect, true, -1, null)
+								true, canUseNettyDirect, false, -1, null)
 				),
 				conn.getLuceneIndex("testluceneindex1",
 						1,