RocksDB database abstraction

This commit is contained in:
Andrea Cavalli 2021-10-20 01:51:34 +02:00
parent 80d0ced888
commit 1625a5c44b
23 changed files with 1126 additions and 639 deletions

View File

@ -619,6 +619,9 @@ public class LLUtils {
);
}
assert buffer.isAccessible();
if (buffer.readOnly()) {
throw new IllegalStateException("Buffer is read only");
}
buffer.compact();
assert buffer.readerOffset() == 0;
AtomicLong nativeAddress = new AtomicLong(0);

View File

@ -0,0 +1,312 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.MemoryManager;
import io.net5.buffer.api.Send;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.LLUtils.DirectBuffer;
import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.ExponentialPageLimits;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.LockSupport;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.Holder;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Transaction;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements RocksDBColumn
permits StandardRocksDBColumn, OptimisticRocksDBColumn, PessimisticRocksDBColumn {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
private static final byte[] NO_DATA = new byte[0];
protected static final UpdateAtomicResult RESULT_NOTHING = new UpdateAtomicResultNothing();
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private final T db;
private final DatabaseOptions opts;
private final BufferAllocator alloc;
private final ColumnFamilyHandle cfh;
public AbstractRocksDBColumn(T db, DatabaseOptions databaseOptions, BufferAllocator alloc, ColumnFamilyHandle cfh) {
this.db = db;
this.opts = databaseOptions;
this.alloc = alloc;
this.cfh = cfh;
}
protected T getDb() {
return db;
}
protected DatabaseOptions getOpts() {
return opts;
}
protected ColumnFamilyHandle getCfh() {
return cfh;
}
@Override
public @Nullable Send<Buffer> get(@NotNull ReadOptions readOptions,
Send<Buffer> keySend,
boolean existsAlmostCertainly) throws RocksDBException {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbGet in a nonblocking thread");
}
if (opts.allowNettyDirect()) {
//todo: implement keyMayExist if existsAlmostCertainly is false.
// Unfortunately it's not feasible until RocksDB implements keyMayExist with buffers
// Create the key nio buffer to pass to RocksDB
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
// Create a direct result buffer because RocksDB works only with direct buffers
try (Buffer resultBuf = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES)) {
int valueSize;
int assertionReadData = -1;
ByteBuffer resultNioBuf;
do {
// Create the result nio buffer to pass to RocksDB
resultNioBuf = LLUtils.obtainDirect(resultBuf, true);
assert keyNioBuffer.byteBuffer().isDirect();
assert resultNioBuf.isDirect();
valueSize = db.get(cfh,
readOptions,
keyNioBuffer.byteBuffer().position(0),
resultNioBuf
);
if (valueSize != RocksDB.NOT_FOUND) {
// todo: check if position is equal to data that have been read
// todo: check if limit is equal to value size or data that have been read
assert valueSize <= 0 || resultNioBuf.limit() > 0;
// Check if read data is not bigger than the total value size.
// If it's bigger it means that RocksDB is writing the start
// of the result into the result buffer more than once.
assert resultNioBuf.limit() <= valueSize;
if (valueSize <= resultNioBuf.limit()) {
// Return the result ready to be read
return resultBuf.readerOffset(0).writerOffset(valueSize).send();
} else {
//noinspection UnusedAssignment
resultNioBuf = null;
}
// Rewind the keyNioBuf position, making it readable again for the next loop iteration
keyNioBuffer.byteBuffer().rewind();
if (resultBuf.capacity() < valueSize) {
// Expand the resultBuf size if the result is bigger than the current result
// buffer size
resultBuf.ensureWritable(valueSize);
}
}
// Repeat if the result has been found but it's still not finished
} while (valueSize != RocksDB.NOT_FOUND);
// If the value is not found return null
return null;
} finally {
keyNioBuffer.buffer().close();
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
}
} else {
try {
byte[] keyArray = LLUtils.toArray(key);
requireNonNull(keyArray);
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
if (existsAlmostCertainly || db.keyMayExist(cfh, readOptions, keyArray, data)) {
if (!existsAlmostCertainly && data.getValue() != null) {
return LLUtils.fromByteArray(alloc, data.getValue()).send();
} else {
byte[] result = db.get(cfh, readOptions, keyArray);
if (result == null) {
return null;
} else {
return LLUtils.fromByteArray(alloc, result).send();
}
}
} else {
return null;
}
} finally {
if (!(readOptions instanceof UnreleasableReadOptions)) {
readOptions.close();
}
}
}
}
}
@Override
public void put(@NotNull WriteOptions writeOptions, Send<Buffer> keyToReceive,
Send<Buffer> valueToReceive) throws RocksDBException {
try {
try (var key = keyToReceive.receive()) {
try (var value = valueToReceive.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called dbPut in a nonblocking thread");
}
assert key.isAccessible();
assert value.isAccessible();
if (opts.allowNettyDirect()) {
var keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
try (var ignored1 = keyNioBuffer.buffer().receive()) {
assert keyNioBuffer.byteBuffer().isDirect();
var valueNioBuffer = LLUtils.convertToReadableDirect(alloc, value.send());
try (var ignored2 = valueNioBuffer.buffer().receive()) {
assert valueNioBuffer.byteBuffer().isDirect();
db.put(cfh, writeOptions, keyNioBuffer.byteBuffer(), valueNioBuffer.byteBuffer());
} finally {
PlatformDependent.freeDirectBuffer(valueNioBuffer.byteBuffer());
}
} finally {
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
}
} else {
db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value));
}
}
}
} finally {
if (writeOptions != null && !(writeOptions instanceof UnreleasableWriteOptions)) {
writeOptions.close();
}
}
}
@Override
public boolean exists(@NotNull ReadOptions readOptions, Send<Buffer> keySend) throws RocksDBException {
try (var key = keySend.receive()) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called containsKey in a nonblocking thread");
}
int size = RocksDB.NOT_FOUND;
byte[] keyBytes = LLUtils.toArray(key);
Holder<byte[]> data = new Holder<>();
try {
if (db.keyMayExist(cfh, readOptions, keyBytes, data)) {
if (data.getValue() != null) {
size = data.getValue().length;
} else {
size = db.get(cfh, readOptions, keyBytes, NO_DATA);
}
}
} finally {
if (readOptions != null && !(readOptions instanceof UnreleasableReadOptions)) {
readOptions.close();
}
}
return size != RocksDB.NOT_FOUND;
}
}
@Override
public void delete(WriteOptions writeOptions, Send<Buffer> keySend) throws RocksDBException {
try (var key = keySend.receive()) {
if (opts.allowNettyDirect()) {
DirectBuffer keyNioBuffer = LLUtils.convertToReadableDirect(alloc, key.send());
try {
db.delete(cfh, writeOptions, keyNioBuffer.byteBuffer());
} finally {
keyNioBuffer.buffer().close();
PlatformDependent.freeDirectBuffer(keyNioBuffer.byteBuffer());
}
} else {
db.delete(cfh, writeOptions, LLUtils.toArray(key));
}
}
}
@Override
public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException {
db.delete(cfh, writeOptions, key);
}
@Override
public List<byte[]> multiGetAsList(ReadOptions readOptions, List<byte[]> keys) throws RocksDBException {
var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size());
return db.multiGetAsList(readOptions, columnFamilyHandles, keys);
}
@Override
public void suggestCompactRange() throws RocksDBException {
db.suggestCompactRange(cfh);
}
@Override
public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options)
throws RocksDBException {
db.compactRange(cfh, begin, end, options);
}
@Override
public void flush(FlushOptions options) throws RocksDBException {
db.flush(options, cfh);
}
@Override
public void flushWal(boolean sync) throws RocksDBException {
db.flushWal(sync);
}
@Override
public long getLongProperty(String property) throws RocksDBException {
return db.getLongProperty(cfh, property);
}
@Override
public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException {
db.write(writeOptions, writeBatch);
}
/**
* @return true if committed successfully
*/
protected abstract boolean commitOptimistically(Transaction tx) throws RocksDBException;
protected abstract Transaction beginTransaction(@NotNull WriteOptions writeOptions);
@Override
@NotNull
public RocksIterator newIterator(@NotNull ReadOptions readOptions) {
return db.newIterator(cfh, readOptions);
}
@Override
public ColumnFamilyHandle getColumnFamilyHandle() {
return cfh;
}
@Override
public BufferAllocator getAllocator() {
return alloc;
}
}

View File

@ -11,14 +11,12 @@ import org.rocksdb.RocksDB;
public class LLLocalEntryReactiveRocksIterator extends LLLocalReactiveRocksIterator<Send<LLEntry>> {
public LLLocalEntryReactiveRocksIterator(RocksDB db,
BufferAllocator alloc,
ColumnFamilyHandle cfh,
public LLLocalEntryReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, range, allowNettyDirect, readOptions, true);
super(db, range, allowNettyDirect, readOptions, true);
}
@Override

View File

@ -12,13 +12,13 @@ import org.rocksdb.RocksDB;
public class LLLocalGroupedEntryReactiveRocksIterator extends
LLLocalGroupedReactiveRocksIterator<Send<LLEntry>> {
public LLLocalGroupedEntryReactiveRocksIterator(RocksDB db, BufferAllocator alloc, ColumnFamilyHandle cfh,
public LLLocalGroupedEntryReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
Send<LLRange> range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, prefixLength, range, allowNettyDirect, readOptions, false, true);
super(db, prefixLength, range, allowNettyDirect, readOptions, false, true);
}
@Override

View File

@ -10,15 +10,13 @@ import org.rocksdb.RocksDB;
public class LLLocalGroupedKeyReactiveRocksIterator extends LLLocalGroupedReactiveRocksIterator<Send<Buffer>> {
public LLLocalGroupedKeyReactiveRocksIterator(RocksDB db,
BufferAllocator alloc,
ColumnFamilyHandle cfh,
public LLLocalGroupedKeyReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
Send<LLRange> range,
boolean allowNettyDirect,
ReadOptions readOptions,
String debugName) {
super(db, alloc, cfh, prefixLength, range, allowNettyDirect, readOptions, true, false);
super(db, prefixLength, range, allowNettyDirect, readOptions, true, false);
}
@Override

View File

@ -21,9 +21,7 @@ import reactor.core.publisher.Flux;
public abstract class LLLocalGroupedReactiveRocksIterator<T> {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalGroupedReactiveRocksIterator.class);
private final RocksDB db;
private final BufferAllocator alloc;
private final ColumnFamilyHandle cfh;
private final RocksDBColumn db;
private final int prefixLength;
private final LLRange range;
private final boolean allowNettyDirect;
@ -31,7 +29,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
private final boolean canFillCache;
private final boolean readValues;
public LLLocalGroupedReactiveRocksIterator(RocksDB db, BufferAllocator alloc, ColumnFamilyHandle cfh,
public LLLocalGroupedReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
Send<LLRange> range,
boolean allowNettyDirect,
@ -40,8 +38,6 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
boolean readValues) {
try (range) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range.receive();
this.allowNettyDirect = allowNettyDirect;
@ -60,7 +56,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh);
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();
@ -69,7 +65,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
try {
rocksIterator.status();
while (rocksIterator.isValid()) {
try (Buffer key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key)) {
try (Buffer key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key)) {
if (firstGroupKey == null) {
firstGroupKey = key.copy();
} else if (!LLUtils.equals(firstGroupKey, firstGroupKey.readerOffset(),
@ -78,7 +74,7 @@ public abstract class LLLocalGroupedReactiveRocksIterator<T> {
}
@Nullable Buffer value;
if (readValues) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value);
} else {
value = null;
}

View File

@ -18,9 +18,7 @@ import reactor.core.publisher.Flux;
public class LLLocalKeyPrefixReactiveRocksIterator {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyPrefixReactiveRocksIterator.class);
private final RocksDB db;
private final BufferAllocator alloc;
private final ColumnFamilyHandle cfh;
private final RocksDBColumn db;
private final int prefixLength;
private final LLRange range;
private final boolean allowNettyDirect;
@ -28,7 +26,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
private final boolean canFillCache;
private final String debugName;
public LLLocalKeyPrefixReactiveRocksIterator(RocksDB db, BufferAllocator alloc, ColumnFamilyHandle cfh,
public LLLocalKeyPrefixReactiveRocksIterator(RocksDBColumn db,
int prefixLength,
Send<LLRange> range,
boolean allowNettyDirect,
@ -37,8 +35,6 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
String debugName) {
try (range) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range.receive();
this.allowNettyDirect = allowNettyDirect;
@ -62,7 +58,7 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return LLLocalDictionary.getRocksIterator(alloc, allowNettyDirect, readOptions, rangeSend, db, cfh);
return LLLocalDictionary.getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, rangeSend, db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();
@ -72,9 +68,9 @@ public class LLLocalKeyPrefixReactiveRocksIterator {
while (rocksIterator.isValid()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key);
} else {
key = LLUtils.fromByteArray(alloc, rocksIterator.key());
key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key());
}
try (key) {
if (firstGroupKey == null) {

View File

@ -10,13 +10,11 @@ import org.rocksdb.RocksDB;
public class LLLocalKeyReactiveRocksIterator extends LLLocalReactiveRocksIterator<Send<Buffer>> {
public LLLocalKeyReactiveRocksIterator(RocksDB db,
BufferAllocator alloc,
ColumnFamilyHandle cfh,
public LLLocalKeyReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range,
boolean allowNettyDirect,
ReadOptions readOptions) {
super(db, alloc, cfh, range, allowNettyDirect, readOptions, false);
super(db, range, allowNettyDirect, readOptions, false);
}
@Override

View File

@ -44,14 +44,15 @@ import org.rocksdb.DBOptions;
import org.rocksdb.DbPath;
import org.rocksdb.FlushOptions;
import org.rocksdb.IndexType;
import org.rocksdb.MergeOperator;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options;
import org.rocksdb.RateLimiter;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WALRecoveryMode;
import org.rocksdb.WriteBufferManager;
import org.warp.commonutils.log.Logger;
@ -80,7 +81,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final DatabaseOptions databaseOptions;
private final boolean enableColumnsBug;
private OptimisticTransactionDB db;
private RocksDB db;
private final Map<Column, ColumnFamilyHandle> handles;
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
@ -149,11 +150,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
while (true) {
try {
// a factory method that returns a RocksDB instance
this.db = OptimisticTransactionDB.open(new DBOptions(rocksdbOptions),
dbPathString,
descriptors,
handles
);
this.db = TransactionDB.open(new DBOptions(rocksdbOptions), new TransactionDBOptions(), dbPathString, descriptors, handles);
break;
} catch (RocksDBException ex) {
switch (ex.getMessage()) {
@ -197,7 +194,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return name;
}
private void flushAndCloseDb(OptimisticTransactionDB db, List<ColumnFamilyHandle> handles)
private void flushAndCloseDb(RocksDB db, List<ColumnFamilyHandle> handles)
throws RocksDBException {
flushDb(db, handles);
@ -226,7 +223,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
}
private void flushDb(OptimisticTransactionDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called flushDb in a nonblocking thread");
}
@ -243,7 +240,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
@SuppressWarnings("unused")
private void compactDb(OptimisticTransactionDB db, List<ColumnFamilyHandle> handles) {
private void compactDb(TransactionDB db, List<ColumnFamilyHandle> handles) {
if (Schedulers.isInNonBlockingThread()) {
logger.error("Called compactDb in a nonblocking thread");
}
@ -455,7 +452,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
LinkedList<ColumnFamilyHandle> handles = new LinkedList<>();
this.db = OptimisticTransactionDB.open(options, dbPathString);
this.db = TransactionDB.open(options, new TransactionDBOptions(), dbPathString);
for (ColumnFamilyDescriptor columnFamilyDescriptor : descriptorsToCreate) {
handles.add(db.createColumnFamily(columnFamilyDescriptor));
}
@ -484,8 +481,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return Mono
.fromCallable(() -> new LLLocalDictionary(
allocator,
db,
getCfh(columnName),
getRocksDBColumn(db, getCfh(columnName)),
name,
Column.toString(columnName),
dbScheduler,
@ -496,6 +492,16 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.subscribeOn(dbScheduler);
}
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
if (db instanceof OptimisticTransactionDB optimisticTransactionDB) {
return new OptimisticRocksDBColumn(optimisticTransactionDB, databaseOptions, allocator, cfh);
} else if (db instanceof TransactionDB) {
return new PessimisticRocksDBColumn((TransactionDB) db, databaseOptions, allocator, cfh);
} else {
return new StandardRocksDBColumn(db, databaseOptions, allocator, cfh);
}
}
private ColumnFamilyHandle getCfh(byte[] columnName) throws RocksDBException {
ColumnFamilyHandle cfh = handles.get(Column.special(Column.toString(columnName)));
assert enableColumnsBug || Arrays.equals(cfh.getName(), columnName);

View File

@ -23,24 +23,18 @@ public abstract class LLLocalReactiveRocksIterator<T> {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalReactiveRocksIterator.class);
private final AtomicBoolean released = new AtomicBoolean(false);
private final RocksDB db;
private final BufferAllocator alloc;
private final ColumnFamilyHandle cfh;
private final RocksDBColumn db;
private final LLRange range;
private final boolean allowNettyDirect;
private final ReadOptions readOptions;
private final boolean readValues;
public LLLocalReactiveRocksIterator(RocksDB db,
BufferAllocator alloc,
ColumnFamilyHandle cfh,
public LLLocalReactiveRocksIterator(RocksDBColumn db,
Send<LLRange> range,
boolean allowNettyDirect,
ReadOptions readOptions,
boolean readValues) {
this.db = db;
this.alloc = alloc;
this.cfh = cfh;
this.range = range.receive();
this.allowNettyDirect = allowNettyDirect;
this.readOptions = readOptions;
@ -58,7 +52,7 @@ public abstract class LLLocalReactiveRocksIterator<T> {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Range {} started", LLUtils.toStringSafe(range));
}
return getRocksIterator(alloc, allowNettyDirect, readOptions, range.copy().send(), db, cfh);
return getRocksIterator(db.getAllocator(), allowNettyDirect, readOptions, range.copy().send(), db);
}, (tuple, sink) -> {
try {
var rocksIterator = tuple.getT1();
@ -66,17 +60,17 @@ public abstract class LLLocalReactiveRocksIterator<T> {
if (rocksIterator.isValid()) {
Buffer key;
if (allowNettyDirect) {
key = LLUtils.readDirectNioBuffer(alloc, rocksIterator::key);
key = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::key);
} else {
key = LLUtils.fromByteArray(alloc, rocksIterator.key());
key = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.key());
}
try (key) {
Buffer value;
if (readValues) {
if (allowNettyDirect) {
value = LLUtils.readDirectNioBuffer(alloc, rocksIterator::value);
value = LLUtils.readDirectNioBuffer(db.getAllocator(), rocksIterator::value);
} else {
value = LLUtils.fromByteArray(alloc, rocksIterator.value());
value = LLUtils.fromByteArray(db.getAllocator(), rocksIterator.value());
}
} else {
value = null;

View File

@ -0,0 +1,216 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.MemoryManager;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import it.cavallium.dbengine.lucene.ExponentialPageLimits;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status.Code;
import org.rocksdb.Transaction;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import reactor.core.scheduler.Schedulers;
public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn<OptimisticTransactionDB> {
public OptimisticRocksDBColumn(OptimisticTransactionDB db,
DatabaseOptions databaseOptions,
BufferAllocator alloc,
ColumnFamilyHandle cfh) {
super(db, databaseOptions, alloc, cfh);
}
@Override
protected boolean commitOptimistically(Transaction tx) throws RocksDBException {
try {
tx.commit();
return true;
} catch (RocksDBException ex) {
var status = ex.getStatus() != null ? ex.getStatus().getCode() : Code.Ok;
if (status == Code.Busy || status == Code.TryAgain) {
return false;
}
throw ex;
}
}
@Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) {
return getDb().beginTransaction(writeOptions);
}
@Override
public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException {
getDb().write(writeOptions, writeBatch);
}
@Override
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly,
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
try (Buffer key = keySend.receive()) {
var cfh = getCfh();
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
try (var tx = beginTransaction(writeOptions)) {
boolean committedSuccessfully;
int retries = 0;
ExponentialPageLimits retryTime = null;
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
boolean changed;
do {
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = MemoryManager.unsafeWrap(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
changed = true;
committedSuccessfully = commitOptimistically(tx);
} else {
changed = false;
committedSuccessfully = true;
tx.rollback();
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
if (!committedSuccessfully) {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
retries++;
if (retries >= 100 && retries % 100 == 0) {
logger.warn(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting 5ms before retrying for the {} time", LLUtils.toStringSafe(key), retries);
} else if (logger.isDebugEnabled(MARKER_ROCKSDB)) {
logger.debug(MARKER_ROCKSDB, "Failed optimistic transaction {} (update):"
+ " waiting 5ms before retrying", LLUtils.toStringSafe(key));
}
if (retries == 1) {
retryTime = new ExponentialPageLimits(0, 5, 2000);
}
long retryMs = retryTime.getPageLimit(retries);
// +- 20%
retryMs = retryMs + (long) (retryMs * 0.2d * ThreadLocalRandom.current().nextDouble(-1.0d, 1.0d));
// Wait for 5ms
try {
Thread.sleep(retryMs);
} catch (InterruptedException e) {
throw new RocksDBException("Interrupted");
}
}
}
}
} while (!committedSuccessfully);
if (retries > 5) {
logger.warn(MARKER_ROCKSDB, "Took {} retries to update key {}", retries, LLUtils.toStringSafe(key));
}
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
};
}
}
}
@Override
public boolean supportsTransactions() {
return true;
}
}

View File

@ -0,0 +1,165 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.MemoryManager;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteOptions;
import reactor.core.scheduler.Schedulers;
public final class PessimisticRocksDBColumn extends AbstractRocksDBColumn<TransactionDB> {
private static final TransactionOptions DEFAULT_TX_OPTIONS = new TransactionOptions();
public PessimisticRocksDBColumn(TransactionDB db,
DatabaseOptions databaseOptions,
BufferAllocator alloc,
ColumnFamilyHandle cfh) {
super(db, databaseOptions, alloc, cfh);
}
@Override
protected boolean commitOptimistically(Transaction tx) throws RocksDBException {
tx.commit();
return true;
}
@Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) {
return getDb().beginTransaction(writeOptions, DEFAULT_TX_OPTIONS);
}
@Override
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly,
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
try (Buffer key = keySend.receive()) {
var cfh = getCfh();
var keyArray = LLUtils.toArray(key);
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called update in a nonblocking thread");
}
try (var tx = beginTransaction(writeOptions)) {
Send<Buffer> sentPrevData;
Send<Buffer> sentCurData;
boolean changed;
var prevDataArray = tx.getForUpdate(readOptions, cfh, keyArray, true);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Reading {}: {} (before update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray)
);
}
Buffer prevData;
if (prevDataArray != null) {
prevData = MemoryManager.unsafeWrap(prevDataArray);
} else {
prevData = null;
}
try (prevData) {
Buffer prevDataToSendToUpdater;
if (prevData != null) {
prevDataToSendToUpdater = prevData.copy();
} else {
prevDataToSendToUpdater = null;
}
@Nullable Buffer newData;
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
}
try (newData) {
var newDataArray = newData == null ? null : LLUtils.toArray(newData);
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevDataArray),
LLUtils.toStringSafe(newDataArray)
);
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
tx.delete(cfh, keyArray, true);
changed = true;
tx.commit();
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Writing {}: {} (after update)",
LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData)
);
}
tx.put(cfh, keyArray, newDataArray);
changed = true;
tx.commit();
} else {
changed = false;
tx.rollback();
}
sentPrevData = prevData == null ? null : prevData.send();
sentCurData = newData == null ? null : newData.send();
}
}
return switch (returnMode) {
case NOTHING -> {
if (sentPrevData != null) {
sentPrevData.close();
}
if (sentCurData != null) {
sentCurData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (sentPrevData != null) {
sentPrevData.close();
}
yield new UpdateAtomicResultCurrent(sentCurData);
}
case PREVIOUS -> {
if (sentCurData != null) {
sentCurData.close();
}
yield new UpdateAtomicResultPrevious(sentPrevData);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta.of(sentPrevData, sentCurData).send());
};
}
}
}
@Override
public boolean supportsTransactions() {
return true;
}
}

View File

@ -0,0 +1,66 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.RepeatedElementList;
import it.cavallium.dbengine.database.UpdateReturnMode;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
@Nullable
Send<Buffer> get(@NotNull ReadOptions readOptions, Send<Buffer> keySend,
boolean existsAlmostCertainly) throws RocksDBException;
boolean exists(@NotNull ReadOptions readOptions, Send<Buffer> keySend) throws RocksDBException;
void put(@NotNull WriteOptions writeOptions, Send<Buffer> keyToReceive,
Send<Buffer> valueToReceive) throws RocksDBException;
@NotNull RocksIterator newIterator(@NotNull ReadOptions readOptions);
@NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions, @NotNull WriteOptions writeOptions,
Send<Buffer> keySend, SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly, UpdateAtomicResultMode returnMode) throws RocksDBException, IOException;
void delete(WriteOptions writeOptions, Send<Buffer> keySend) throws RocksDBException;
void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException;
List<byte[]> multiGetAsList(ReadOptions resolveSnapshot, List<byte[]> keys) throws RocksDBException;
void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException;
void suggestCompactRange() throws RocksDBException;
void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException;
void flush(FlushOptions options) throws RocksDBException;
void flushWal(boolean sync) throws RocksDBException;
long getLongProperty(String property) throws RocksDBException;
ColumnFamilyHandle getColumnFamilyHandle();
BufferAllocator getAllocator();
boolean supportsTransactions();
}

View File

@ -0,0 +1,171 @@
package it.cavallium.dbengine.database.disk;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.serialization.SerializationFunction;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.Holder;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;
public final class StandardRocksDBColumn extends AbstractRocksDBColumn<RocksDB> {
public StandardRocksDBColumn(RocksDB db,
DatabaseOptions databaseOptions,
BufferAllocator alloc,
ColumnFamilyHandle cfh) {
super(db, databaseOptions, alloc, cfh);
}
@Override
protected boolean commitOptimistically(Transaction tx) {
throw new UnsupportedOperationException("Transactions not supported");
}
@Override
protected Transaction beginTransaction(@NotNull WriteOptions writeOptions) {
throw new UnsupportedOperationException("Transactions not supported");
}
@Override
public @NotNull UpdateAtomicResult updateAtomic(@NotNull ReadOptions readOptions,
@NotNull WriteOptions writeOptions,
Send<Buffer> keySend,
SerializationFunction<@Nullable Send<Buffer>, @Nullable Send<Buffer>> updater,
boolean existsAlmostCertainly,
UpdateAtomicResultMode returnMode) throws IOException, RocksDBException {
try (Buffer key = keySend.receive()) {
var cfh = getCfh();
var db = getDb();
var alloc = getAllocator();
@Nullable Buffer prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
if (prevDataBytes != null) {
prevData = LLUtils.fromByteArray(alloc, prevDataBytes);
} else {
prevData = null;
}
} else {
var obtainedPrevData = this.get(readOptions, key.copy().send(), existsAlmostCertainly);
if (obtainedPrevData == null) {
prevData = null;
} else {
prevData = obtainedPrevData.receive();
}
}
} else {
prevData = null;
}
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Reading {}: {} (before update)", LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData));
}
try {
@Nullable Buffer newData;
try (Buffer prevDataToSendToUpdater = prevData == null ? null : prevData.copy()) {
try (var sentData = prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.send()) {
try (var newDataToReceive = updater.apply(sentData)) {
if (newDataToReceive != null) {
newData = newDataToReceive.receive();
} else {
newData = null;
}
}
}
}
boolean changed;
assert newData == null || newData.isAccessible();
try {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB,
"Updating {}. previous data: {}, updated data: {}", LLUtils.toStringSafe(key),
LLUtils.toStringSafe(prevData), LLUtils.toStringSafe(newData));
}
if (prevData != null && newData == null) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Deleting {} (after update)", LLUtils.toStringSafe(key));
}
this.delete(writeOptions, key.send());
changed = true;
} else if (newData != null && (prevData == null || !LLUtils.equals(prevData, newData))) {
if (logger.isTraceEnabled()) {
logger.trace(MARKER_ROCKSDB, "Writing {}: {} (after update)", LLUtils.toStringSafe(key),
LLUtils.toStringSafe(newData));
}
Buffer dataToPut;
if (returnMode == UpdateAtomicResultMode.CURRENT) {
dataToPut = newData.copy();
} else {
dataToPut = newData;
}
try {
this.put(writeOptions, key.send(), dataToPut.send());
changed = true;
} finally {
if (dataToPut != newData) {
dataToPut.close();
}
}
} else {
changed = false;
}
return switch (returnMode) {
case NOTHING -> {
if (prevData != null) {
prevData.close();
}
if (newData != null) {
newData.close();
}
yield RESULT_NOTHING;
}
case CURRENT -> {
if (prevData != null) {
prevData.close();
}
yield new UpdateAtomicResultCurrent(newData != null ? newData.send() : null);
}
case PREVIOUS -> {
if (newData != null) {
newData.close();
}
yield new UpdateAtomicResultPrevious(prevData != null ? prevData.send() : null);
}
case BINARY_CHANGED -> new UpdateAtomicResultBinaryChanged(changed);
case DELTA -> new UpdateAtomicResultDelta(LLDelta
.of(prevData != null ? prevData.send() : null, newData != null ? newData.send() : null)
.send());
};
} finally {
if (newData != null) {
newData.close();
}
}
} finally {
if (prevData != null) {
prevData.close();
}
}
}
}
@Override
public boolean supportsTransactions() {
return false;
}
}

View File

@ -0,0 +1,4 @@
package it.cavallium.dbengine.database.disk;
public sealed interface UpdateAtomicResult permits UpdateAtomicResultBinaryChanged, UpdateAtomicResultDelta,
UpdateAtomicResultNothing, UpdateAtomicResultPrevious, UpdateAtomicResultCurrent {}

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database.disk;
public final record UpdateAtomicResultBinaryChanged(boolean changed) implements UpdateAtomicResult {}

View File

@ -0,0 +1,6 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Send;
public final record UpdateAtomicResultCurrent(Send<Buffer> current) implements UpdateAtomicResult {}

View File

@ -0,0 +1,6 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta;
public final record UpdateAtomicResultDelta(Send<LLDelta> delta) implements UpdateAtomicResult {}

View File

@ -0,0 +1,8 @@
package it.cavallium.dbengine.database.disk;
public enum UpdateAtomicResultMode {
NOTHING,
PREVIOUS,
CURRENT, BINARY_CHANGED,
DELTA
}

View File

@ -0,0 +1,3 @@
package it.cavallium.dbengine.database.disk;
public final class UpdateAtomicResultNothing implements UpdateAtomicResult {}

View File

@ -0,0 +1,6 @@
package it.cavallium.dbengine.database.disk;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.Send;
public final record UpdateAtomicResultPrevious(Send<Buffer> previous) implements UpdateAtomicResult {}

View File

@ -7,6 +7,7 @@ import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.Send;
import io.net5.util.internal.PlatformDependent;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.disk.RocksDBColumn;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -19,7 +20,7 @@ public class CappedWriteBatch extends WriteBatch {
* Default: true, Use false to debug problems with direct buffers
*/
private static final boolean USE_FAST_DIRECT_BUFFERS = true;
private final RocksDB db;
private final RocksDBColumn db;
private final BufferAllocator alloc;
private final int cap;
private final WriteOptions writeOptions;
@ -28,9 +29,10 @@ public class CappedWriteBatch extends WriteBatch {
private final List<ByteBuffer> byteBuffersToRelease;
/**
* @param db
* @param cap The limit of operations
*/
public CappedWriteBatch(RocksDB db,
public CappedWriteBatch(RocksDBColumn db,
BufferAllocator alloc,
int cap,
int reservedWriteBatchSize,