Use buffer api

This commit is contained in:
Andrea Cavalli 2022-03-09 02:29:38 +01:00
parent 35a70efec5
commit faa7118b8e
11 changed files with 437 additions and 237 deletions

View File

@ -251,9 +251,11 @@ versions:
commitDebounceTime: Duration commitDebounceTime: Duration
lowMemory: boolean lowMemory: boolean
directoryOptions: LuceneDirectoryOptions directoryOptions: LuceneDirectoryOptions
indexWriterBufferSize: long indexWriterReaderPooling: -boolean
applyAllDeletes: boolean indexWriterRAMBufferSizeMB: -double
writeAllDeletes: boolean indexWriterMaxBufferedDocs: -int
applyAllDeletes: -boolean
writeAllDeletes: -boolean
allowNonVolatileCollection: boolean allowNonVolatileCollection: boolean
maxInMemoryResultEntries: int maxInMemoryResultEntries: int
ByteBuffersDirectory: { data: { } } ByteBuffersDirectory: { data: { } }

View File

@ -175,23 +175,23 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
} }
logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount); logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount);
indexWriterConfig.setMergeScheduler(mergeScheduler); indexWriterConfig.setMergeScheduler(mergeScheduler);
if (luceneOptions.indexWriterBufferSize() == -1) { if (luceneOptions.indexWriterRAMBufferSizeMB().isPresent()) {
//todo: allow to configure maxbuffereddocs fallback indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterRAMBufferSizeMB().get());
indexWriterConfig.setMaxBufferedDocs(80000); }
// disable ram buffer size after enabling maxBufferedDocs if (luceneOptions.indexWriterMaxBufferedDocs().isPresent()) {
indexWriterConfig.setRAMBufferSizeMB(-1); indexWriterConfig.setMaxBufferedDocs(luceneOptions.indexWriterMaxBufferedDocs().get());
} else { }
indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D); if (luceneOptions.indexWriterReaderPooling().isPresent()) {
indexWriterConfig.setReaderPooling(luceneOptions.indexWriterReaderPooling().get());
} }
indexWriterConfig.setReaderPooling(false);
indexWriterConfig.setSimilarity(getLuceneSimilarity()); indexWriterConfig.setSimilarity(getLuceneSimilarity());
this.indexWriter = new IndexWriter(directory, indexWriterConfig); this.indexWriter = new IndexWriter(directory, indexWriterConfig);
this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter); this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter);
this.searcherManager = new CachedIndexSearcherManager(indexWriter, this.searcherManager = new CachedIndexSearcherManager(indexWriter,
snapshotsManager, snapshotsManager,
getLuceneSimilarity(), getLuceneSimilarity(),
luceneOptions.applyAllDeletes(), luceneOptions.applyAllDeletes().orElse(true),
luceneOptions.writeAllDeletes(), luceneOptions.writeAllDeletes().orElse(false),
luceneOptions.queryRefreshDebounceTime() luceneOptions.queryRefreshDebounceTime()
); );

View File

@ -1,5 +1,6 @@
package it.cavallium.dbengine.lucene; package it.cavallium.dbengine.lucene;
import io.net5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.lucene.directory.RocksDBInstance; import it.cavallium.dbengine.lucene.directory.RocksDBInstance;
import it.cavallium.dbengine.lucene.directory.RocksdbFileStore; import it.cavallium.dbengine.lucene.directory.RocksdbFileStore;
import java.io.IOException; import java.io.IOException;
@ -16,6 +17,14 @@ public class LuceneRocksDBManager {
private static final Logger LOG = LogManager.getLogger(LuceneRocksDBManager.class); private static final Logger LOG = LogManager.getLogger(LuceneRocksDBManager.class);
private final List<Map.Entry<Path, RocksDBInstance>> dbs = new ArrayList<>(); private final List<Map.Entry<Path, RocksDBInstance>> dbs = new ArrayList<>();
private BufferAllocator bufferAllocator;
public synchronized BufferAllocator getAllocator() {
if (bufferAllocator == null) {
bufferAllocator = BufferAllocator.offHeapPooled();
}
return bufferAllocator;
}
public synchronized RocksDBInstance getOrCreate(Path path) { public synchronized RocksDBInstance getOrCreate(Path path) {
try { try {
@ -41,5 +50,9 @@ public class LuceneRocksDBManager {
} }
} }
dbs.clear(); dbs.clear();
if (bufferAllocator != null) {
bufferAllocator.close();
}
bufferAllocator = null;
} }
} }

View File

@ -610,7 +610,8 @@ public class LuceneUtils {
); );
} else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) { } else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) {
var dbInstance = rocksDBManager.getOrCreate(rocksDBSharedDirectory.managedPath()); var dbInstance = rocksDBManager.getOrCreate(rocksDBSharedDirectory.managedPath());
return new RocksdbDirectory(dbInstance.db(), return new RocksdbDirectory(rocksDBManager.getAllocator(),
dbInstance.db(),
dbInstance.handles(), dbInstance.handles(),
directoryName, directoryName,
rocksDBSharedDirectory.blockSize() rocksDBSharedDirectory.blockSize()

View File

@ -0,0 +1,168 @@
package it.cavallium.dbengine.lucene.directory;
import io.net5.buffer.Unpooled;
import io.net5.buffer.api.Buffer;
import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.BufferRef;
import io.net5.buffer.api.adaptor.ByteBufAdaptor;
import org.apache.lucene.store.IndexInput;
import java.io.EOFException;
import java.io.IOException;
public class RocksDBSliceInputStream extends IndexInput {
private static final BufferAllocator HEAP_UNPOOLED_BUFFER = BufferAllocator.onHeapUnpooled();
private final int bufferSize;
private long position;
private final long length;
private byte[] currentBuffer;
private int currentBufferIndex;
private final RocksdbFileStore store;
private final String name;
public RocksDBSliceInputStream(String name, RocksdbFileStore store, int bufferSize) throws IOException {
this(name, store, bufferSize, store.getSize(name));
}
public RocksDBSliceInputStream(String name, RocksdbFileStore store, int bufferSize, long length) {
super("RocksDBSliceInputStream(name=" + name + ")");
this.name = name;
this.store = store;
this.bufferSize = bufferSize;
this.currentBuffer = new byte[this.bufferSize];
this.currentBufferIndex = bufferSize;
this.position = 0;
this.length = length;
}
@Override
public void close() throws IOException {
//store.close();
}
@Override
public long getFilePointer() {
return position;
}
@Override
public void seek(long pos) {
if (pos < 0 || pos > length) {
throw new IllegalArgumentException("pos must be between 0 and " + length);
}
position = pos;
currentBufferIndex = this.bufferSize;
}
@Override
public long length() {
return this.length;
}
@Override
public IndexInput slice(String sliceDescription, final long offset, final long length) throws IOException {
if (offset < 0 || length < 0 || offset + length > this.length) {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this);
}
return new RocksDBSliceInputStream(name, store, bufferSize, offset + length) {
{
seek(0L);
}
@Override
public void seek(long pos) {
if (pos < 0L) {
throw new IllegalArgumentException("Seeking to negative position: " + this);
}
super.seek(pos + offset);
}
@Override
public long getFilePointer() {
return super.getFilePointer() - offset;
}
@Override
public long length() {
return super.length() - offset;
}
@Override
public IndexInput slice(String sliceDescription, long ofs, long len) throws IOException {
return super.slice(sliceDescription, offset + ofs, len);
}
};
}
@Override
public byte readByte() throws IOException {
if (position >= length) {
throw new EOFException("Read end");
}
loadBufferIfNeed();
byte b = currentBuffer[currentBufferIndex++];
position++;
return b;
}
protected void loadBufferIfNeed() throws IOException {
if (this.currentBufferIndex == this.bufferSize) {
try (var editedBuffer = HEAP_UNPOOLED_BUFFER.copyOf(currentBuffer)) {
int n = store.load(name, position, editedBuffer, 0, bufferSize);
editedBuffer.copyInto(0, currentBuffer, 0, currentBuffer.length);
if (n == -1) {
throw new EOFException("Read end");
}
}
this.currentBufferIndex = 0;
}
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
if (position >= length) {
throw new EOFException("Read end");
}
int f = offset;
int n = Math.min((int) (length - position), len);
do {
loadBufferIfNeed();
int r = Math.min(bufferSize - currentBufferIndex, n);
System.arraycopy(currentBuffer, currentBufferIndex, b, f, r);
f += r;
position += r;
currentBufferIndex += r;
n -= r;
} while (n != 0);
}
@Override
public IndexInput clone() {
RocksDBSliceInputStream in = (RocksDBSliceInputStream) super.clone();
in.currentBuffer = new byte[bufferSize];
System.arraycopy(this.currentBuffer, 0, in.currentBuffer, 0, bufferSize);
return in;
}
}

View File

@ -1,6 +1,7 @@
package it.cavallium.dbengine.lucene.directory; package it.cavallium.dbengine.lucene.directory;
import com.google.common.util.concurrent.Striped; import com.google.common.util.concurrent.Striped;
import io.net5.buffer.api.BufferAllocator;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
@ -34,27 +35,33 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable {
private final AtomicLong nextTempFileCounter = new AtomicLong(); private final AtomicLong nextTempFileCounter = new AtomicLong();
public RocksdbDirectory(Path path, int blockSize) throws IOException { public RocksdbDirectory(BufferAllocator bufferAllocator, Path path, int blockSize) throws IOException {
this(path, blockSize, new SingleInstanceLockFactory()); this(bufferAllocator, path, blockSize, new SingleInstanceLockFactory());
} }
public RocksdbDirectory(RocksDB db, Map<String, ColumnFamilyHandle> handles, @Nullable String name, int blockSize) public RocksdbDirectory(BufferAllocator bufferAllocator,
RocksDB db,
Map<String, ColumnFamilyHandle> handles,
@Nullable String name,
int blockSize)
throws IOException { throws IOException {
this(db, handles, name, blockSize, new SingleInstanceLockFactory()); this(bufferAllocator, db, handles, name, blockSize, new SingleInstanceLockFactory());
} }
protected RocksdbDirectory(Path path, int blockSize, LockFactory lockFactory) throws IOException { protected RocksdbDirectory(BufferAllocator bufferAllocator, Path path, int blockSize, LockFactory lockFactory)
throws IOException {
super(lockFactory); super(lockFactory);
store = RocksdbFileStore.create(path, blockSize, metaLock); store = RocksdbFileStore.create(bufferAllocator, path, blockSize, metaLock);
} }
protected RocksdbDirectory(RocksDB db, protected RocksdbDirectory(BufferAllocator bufferAllocator,
RocksDB db,
Map<String, ColumnFamilyHandle> handles, Map<String, ColumnFamilyHandle> handles,
@Nullable String name, @Nullable String name,
int blockSize, int blockSize,
LockFactory lockFactory) throws IOException { LockFactory lockFactory) throws IOException {
super(lockFactory); super(lockFactory);
store = RocksdbFileStore.create(db, handles, name, blockSize, metaLock); store = RocksdbFileStore.create(bufferAllocator, db, handles, name, blockSize, metaLock);
} }
@Override @Override

View File

@ -2,11 +2,12 @@ package it.cavallium.dbengine.lucene.directory;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Striped; import com.google.common.util.concurrent.Striped;
import io.net5.buffer.ByteBuf; import io.net5.buffer.api.Buffer;
import io.net5.buffer.ByteBufAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.api.ReadableComponent;
import io.net5.buffer.api.WritableComponent;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -19,10 +20,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.rocksdb.ClockCache;
import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.ColumnFamilyOptions;
@ -60,6 +61,7 @@ public class RocksdbFileStore {
private static final ByteBuffer EMPTY_BYTE_BUF = ByteBuffer.allocateDirect(0); private static final ByteBuffer EMPTY_BYTE_BUF = ByteBuffer.allocateDirect(0);
private final RocksDB db; private final RocksDB db;
public final BufferAllocator bufferAllocator;
private final int blockSize; private final int blockSize;
private final ColumnFamilyHandle headers; private final ColumnFamilyHandle headers;
private final ColumnFamilyHandle filename; private final ColumnFamilyHandle filename;
@ -71,6 +73,7 @@ public class RocksdbFileStore {
private volatile boolean closed; private volatile boolean closed;
private RocksdbFileStore(RocksDB db, private RocksdbFileStore(RocksDB db,
BufferAllocator bufferAllocator,
ColumnFamilyHandle headers, ColumnFamilyHandle headers,
ColumnFamilyHandle filename, ColumnFamilyHandle filename,
ColumnFamilyHandle size, ColumnFamilyHandle size,
@ -80,6 +83,7 @@ public class RocksdbFileStore {
boolean closeDbOnClose) throws IOException { boolean closeDbOnClose) throws IOException {
try { try {
this.db = db; this.db = db;
this.bufferAllocator = bufferAllocator;
this.closeDbOnClose = closeDbOnClose; this.closeDbOnClose = closeDbOnClose;
this.blockSize = blockSize; this.blockSize = blockSize;
this.headers = headers; this.headers = headers;
@ -110,11 +114,25 @@ public class RocksdbFileStore {
} }
} }
private static ByteBuffer readableNioBuffer(Buffer buffer) {
assert buffer.countReadableComponents() == 1 : "Readable components count: " + buffer.countReadableComponents();
return ((ReadableComponent) buffer).readableBuffer();
}
private static ByteBuffer writableNioBuffer(Buffer buffer, int newWriterOffset) {
assert buffer.countWritableComponents() == 1 : "Writable components count: " + buffer.countWritableComponents();
buffer.writerOffset(0).ensureWritable(newWriterOffset);
var byteBuf = ((WritableComponent) buffer).writableBuffer();
buffer.writerOffset(newWriterOffset);
assert buffer.capacity() >= newWriterOffset : "Returned capacity " + buffer.capacity() + " < " + newWriterOffset;
return byteBuf;
}
private static DBOptions getDBOptions() { private static DBOptions getDBOptions() {
var options = new DBOptions(); var options = new DBOptions();
options.setWalSizeLimitMB(256); options.setWalSizeLimitMB(256);
options.setMaxWriteBatchGroupSizeBytes(2 * SizeUnit.MB); options.setMaxWriteBatchGroupSizeBytes(2 * SizeUnit.MB);
options.setAtomicFlush(false); //options.setAtomicFlush(false);
options.setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery); options.setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery);
options.setCreateMissingColumnFamilies(true); options.setCreateMissingColumnFamilies(true);
options.setCreateIfMissing(true); options.setCreateIfMissing(true);
@ -122,34 +140,17 @@ public class RocksdbFileStore {
options.setAvoidUnnecessaryBlockingIO(true); options.setAvoidUnnecessaryBlockingIO(true);
options.setSkipCheckingSstFileSizesOnDbOpen(true); options.setSkipCheckingSstFileSizesOnDbOpen(true);
options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
options.setAllowMmapReads(true); //options.setAllowMmapReads(true);
options.setAllowMmapWrites(true); //options.setAllowMmapWrites(true);
options.setUseDirectReads(true);
options.setUseDirectIoForFlushAndCompaction(true);
options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
options.setDeleteObsoleteFilesPeriodMicros(Duration.ofMinutes(15).toNanos() / 1000L); options.setDeleteObsoleteFilesPeriodMicros(Duration.ofMinutes(15).toNanos() / 1000L);
options.setRowCache(new ClockCache(512 * 1024 * 1024L));
options.setMaxOpenFiles(500);
return options; return options;
} }
private static List<ColumnFamilyDescriptor> getColumnFamilyDescriptors(@Nullable String name) {
String headersName, filenameName, sizeName, dataName;
if (name != null) {
headersName = (name + "_headers");
filenameName = (name + "_filename");
sizeName = (name + "_size");
dataName = (name + "_data");
} else {
headersName = DEFAULT_COLUMN_FAMILY_STRING;
filenameName = "filename";
sizeName = "size";
dataName = "data";
}
return List.of(
getColumnFamilyDescriptor(headersName),
getColumnFamilyDescriptor(filenameName),
getColumnFamilyDescriptor(sizeName),
getColumnFamilyDescriptor(dataName)
);
}
public static ColumnFamilyDescriptor getColumnFamilyDescriptor(String name) { public static ColumnFamilyDescriptor getColumnFamilyDescriptor(String name) {
ColumnFamilyOptions opts; ColumnFamilyOptions opts;
if (name.equals(DEFAULT_COLUMN_FAMILY_STRING) || name.endsWith("_headers")) { if (name.equals(DEFAULT_COLUMN_FAMILY_STRING) || name.endsWith("_headers")) {
@ -174,7 +175,29 @@ public class RocksdbFileStore {
return new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), opts); return new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), opts);
} }
public static RocksdbFileStore create(RocksDB db, private static List<ColumnFamilyDescriptor> getColumnFamilyDescriptors(@Nullable String name) {
String headersName, filenameName, sizeName, dataName;
if (name != null) {
headersName = (name + "_headers");
filenameName = (name + "_filename");
sizeName = (name + "_size");
dataName = (name + "_data");
} else {
headersName = DEFAULT_COLUMN_FAMILY_STRING;
filenameName = "filename";
sizeName = "size";
dataName = "data";
}
return List.of(
getColumnFamilyDescriptor(headersName),
getColumnFamilyDescriptor(filenameName),
getColumnFamilyDescriptor(sizeName),
getColumnFamilyDescriptor(dataName)
);
}
public static RocksdbFileStore create(BufferAllocator bufferAllocator,
RocksDB db,
Map<String, ColumnFamilyHandle> existingHandles, Map<String, ColumnFamilyHandle> existingHandles,
@Nullable String name, @Nullable String name,
int blockSize, int blockSize,
@ -193,6 +216,7 @@ public class RocksdbFileStore {
handles.add(columnFamilyHandle); handles.add(columnFamilyHandle);
} }
return new RocksdbFileStore(db, return new RocksdbFileStore(db,
bufferAllocator,
handles.get(0), handles.get(0),
handles.get(1), handles.get(1),
handles.get(2), handles.get(2),
@ -206,7 +230,10 @@ public class RocksdbFileStore {
} }
} }
public static RocksdbFileStore create(Path path, int blockSize, Striped<ReadWriteLock> metaLock) throws IOException { public static RocksdbFileStore create(BufferAllocator bufferAllocator,
Path path,
int blockSize,
Striped<ReadWriteLock> metaLock) throws IOException {
try { try {
DBOptions options = getDBOptions(); DBOptions options = getDBOptions();
List<ColumnFamilyDescriptor> descriptors = getColumnFamilyDescriptors(null); List<ColumnFamilyDescriptor> descriptors = getColumnFamilyDescriptors(null);
@ -216,6 +243,7 @@ public class RocksdbFileStore {
var handles = new ArrayList<ColumnFamilyHandle>(4); var handles = new ArrayList<ColumnFamilyHandle>(4);
RocksDB db = RocksDB.open(options, path.toString(), descriptors, handles); RocksDB db = RocksDB.open(options, path.toString(), descriptors, handles);
return new RocksdbFileStore(db, return new RocksdbFileStore(db,
bufferAllocator,
handles.get(0), handles.get(0),
handles.get(1), handles.get(1),
handles.get(2), handles.get(2),
@ -266,18 +294,13 @@ public class RocksdbFileStore {
if (id != null) { if (id != null) {
return id; return id;
} else { } else {
var filenameKey = getFilenameKey(key); try (var filenameKey = getFilenameKey(key); var filenameValue = getFilenameValue()) {
var filenameValue = getFilenameValue(); if (db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey), writableNioBuffer(filenameValue, Long.BYTES))
try {
if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES))
== RocksDB.NOT_FOUND) { == RocksDB.NOT_FOUND) {
throw new IOException("File not found: " + key); throw new IOException("File not found: " + key);
} }
filenameValue.writerIndex(Long.BYTES); filenameValue.writerOffset(Long.BYTES);
return filenameValue.readLongLE(); return filenameValue.readLong();
} finally {
filenameKey.release();
filenameValue.release();
} }
} }
} }
@ -288,18 +311,13 @@ public class RocksdbFileStore {
if (id != null) { if (id != null) {
return id; return id;
} else { } else {
var filenameKey = getFilenameKey(key); try (var filenameKey = getFilenameKey(key); var filenameValue = getFilenameValue()) {
var filenameValue = getFilenameValue(); if (db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey), writableNioBuffer(filenameValue, Long.BYTES))
try {
if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES))
== RocksDB.NOT_FOUND) { == RocksDB.NOT_FOUND) {
return null; return null;
} }
filenameValue.writerIndex(Long.BYTES); filenameValue.writerOffset(Long.BYTES);
return filenameValue.readLongLE(); return filenameValue.readLong();
} finally {
filenameKey.release();
filenameValue.release();
} }
} }
} }
@ -309,33 +327,24 @@ public class RocksdbFileStore {
if (id != null) { if (id != null) {
return true; return true;
} else { } else {
var filenameKey = getFilenameKey(key); try (var filenameKey = getFilenameKey(key)) {
try { if (db.keyMayExist(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey))) {
if (db.keyMayExist(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer())) { return db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey), EMPTY_BYTE_BUF) != RocksDB.NOT_FOUND;
return db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), EMPTY_BYTE_BUF) != RocksDB.NOT_FOUND;
} else { } else {
return false; return false;
} }
} finally {
filenameKey.release();
} }
} }
} }
private void moveFileId(long id, String oldKey, String newKey) throws RocksDBException { private void moveFileId(long id, String oldKey, String newKey) throws RocksDBException {
var filenameOldKey = getFilenameKey(oldKey);
var filenameNewKey = getFilenameKey(newKey);
var filenameValue = getFilenameValue(); var filenameValue = getFilenameValue();
filenameValue.writeLongLE(id); filenameValue.writeLong(id);
try { try (var filenameOldKey = getFilenameKey(oldKey); var filenameNewKey = getFilenameKey(newKey); filenameValue) {
db.delete(filename, DEFAULT_WRITE_OPTS, filenameOldKey.nioBuffer()); db.delete(filename, DEFAULT_WRITE_OPTS, readableNioBuffer(filenameOldKey));
incFlush(); incFlush();
db.put(filename, DEFAULT_WRITE_OPTS, filenameNewKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)); db.put(filename, DEFAULT_WRITE_OPTS, readableNioBuffer(filenameNewKey), readableNioBuffer(filenameValue));
incFlush(); incFlush();
} finally {
filenameOldKey.release();
filenameNewKey.release();
filenameValue.release();
} }
} }
@ -352,45 +361,38 @@ public class RocksdbFileStore {
if (id != null) { if (id != null) {
return id; return id;
} else { } else {
var filenameKey = getFilenameKey(key); try (var filenameKey = getFilenameKey(key); var filenameValue = getFilenameValue()) {
var filenameValue = getFilenameValue(); if (db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey),
try { writableNioBuffer(filenameValue, Long.BYTES))
if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES))
== RocksDB.NOT_FOUND) { == RocksDB.NOT_FOUND) {
filenameValue.writerIndex(0); filenameValue.writerOffset(0);
filenameValue.readerIndex(0); filenameValue.readerOffset(0);
var newlyAllocatedId = this.nextId.getAndIncrement(); var newlyAllocatedId = this.nextId.getAndIncrement();
if (newlyAllocatedId % 100 == 99) { if (newlyAllocatedId % 100 == 99) {
db.put(headers, new byte[] {0x00}, Longs.toByteArray(newlyAllocatedId + 1 + 100)); db.put(headers, new byte[]{0x00}, Longs.toByteArray(newlyAllocatedId + 1 + 100));
incFlush(); incFlush();
} }
filenameValue.writeLongLE(newlyAllocatedId); filenameValue.writeLong(newlyAllocatedId);
db.put(filename, db.put(filename,
DEFAULT_WRITE_OPTS, DEFAULT_WRITE_OPTS,
filenameKey.nioBuffer(), readableNioBuffer(filenameKey),
filenameValue.nioBuffer(0, filenameValue.writerIndex()) readableNioBuffer(filenameValue)
); );
incFlush(); incFlush();
filenameToId.put(key, newlyAllocatedId); filenameToId.put(key, newlyAllocatedId);
return newlyAllocatedId; return newlyAllocatedId;
} }
filenameValue.readerIndex(0); filenameValue.readerOffset(0);
filenameValue.writerIndex(Long.BYTES); filenameValue.writerOffset(Long.BYTES);
return filenameValue.readLongLE(); return filenameValue.readLong();
} finally {
filenameKey.release();
filenameValue.release();
} }
} }
} }
private void dellocateFilename(String key) throws RocksDBException { private void dellocateFilename(String key) throws RocksDBException {
var filenameKey = getFilenameKey(key); try (var filenameKey = getFilenameKey(key)) {
try { db.delete(filename, DEFAULT_WRITE_OPTS, readableNioBuffer(filenameKey));
db.delete(filename, DEFAULT_WRITE_OPTS, filenameKey.nioBuffer());
filenameToId.remove(key); filenameToId.remove(key);
} finally {
filenameKey.release();
} }
} }
@ -405,56 +407,54 @@ public class RocksdbFileStore {
} }
} }
private ByteBuf getMetaValueBuf() { private Buffer getMetaValueBuf() {
return ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES, Long.BYTES); return bufferAllocator.allocate(Long.BYTES);
} }
private ByteBuf getDataValueBuf() { private Buffer getDataValueBuf() {
return ByteBufAllocator.DEFAULT.ioBuffer(blockSize, blockSize); return bufferAllocator.allocate(blockSize);
} }
private ByteBuf getFilenameValue() { private Buffer getFilenameValue() {
return ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES, Long.BYTES); return bufferAllocator.allocate(Long.BYTES);
} }
private ByteBuf getMetaKey(long id) { private Buffer getMetaKey(long id) {
ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES); Buffer buf = bufferAllocator.allocate(Long.BYTES);
buf.writeLongLE(id); buf.writeLong(id);
return buf; return buf;
} }
private ByteBuf getFilenameKey(String key) { private Buffer getFilenameKey(String key) {
ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(key.length()); Buffer buf = bufferAllocator.allocate(key.length());
buf.writeCharSequence(key, StandardCharsets.US_ASCII); buf.writeCharSequence(key, StandardCharsets.US_ASCII);
return buf; return buf;
} }
private ByteBuf getDataKey(@Nullable ByteBuf buf, long id, int i) { private Buffer getDataKey(@Nullable Buffer buf, long id, int i) {
if (buf == null) { if (buf == null) {
buf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + Integer.BYTES); buf = bufferAllocator.allocate(Long.BYTES + Integer.BYTES);
} }
buf.writeLongLE(id); buf.writeLong(id);
buf.writeInt(i); buf.writeInt(i);
return buf; return buf;
} }
private ByteBuf getDataKeyPrefix(long id) { private Buffer getDataKeyPrefix(long id) {
var buf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES); var buf = bufferAllocator.allocate(Long.BYTES);
buf.writeLongLE(id); buf.writeLong(id);
return buf; return buf;
} }
private byte[] getDataKeyByteArray(long id, int i) { private byte[] getDataKeyByteArray(long id, int i) {
ByteBuffer bb = ByteBuffer.wrap(new byte[Long.BYTES + Integer.BYTES]); ByteBuffer bb = ByteBuffer.wrap(new byte[Long.BYTES + Integer.BYTES]);
bb.order(ByteOrder.LITTLE_ENDIAN);
bb.putLong(id); bb.putLong(id);
bb.order(ByteOrder.BIG_ENDIAN);
bb.putInt(i); bb.putInt(i);
return bb.array(); return bb.array();
} }
public int load(String name, long position, ByteBuf buf, int offset, int len) throws IOException { public int load(String name, long position, Buffer buf, int offset, int len) throws IOException {
var l = metaLock.get(name).readLock(); var l = metaLock.get(name).readLock();
l.lock(); l.lock();
try { try {
@ -477,15 +477,14 @@ public class RocksdbFileStore {
int f = offset; int f = offset;
int n = len; int n = len;
ByteBuf valBuf = getDataValueBuf(); Buffer valBuf = getDataValueBuf();
ByteBuffer valBuffer = valBuf.nioBuffer(0, blockSize); try (valBuf) {
try { ByteBuffer valBuffer = writableNioBuffer(valBuf, blockSize);
boolean shouldSeekTo = true; boolean shouldSeekTo = true;
try (var ro = new ReadOptions(itReadOpts)) { try (var ro = new ReadOptions(itReadOpts)) {
ro.setIgnoreRangeDeletions(true); ro.setIgnoreRangeDeletions(true);
ByteBuf fileIdPrefix = getDataKeyPrefix(fileId); try (Buffer fileIdPrefix = getDataKeyPrefix(fileId)) {
try { try (var lb = new DirectSlice(readableNioBuffer(fileIdPrefix), Long.BYTES)) {
try (var lb = new DirectSlice(fileIdPrefix.internalNioBuffer(0, Long.BYTES), Long.BYTES)) {
ro.setIterateLowerBound(lb); ro.setIterateLowerBound(lb);
ro.setPrefixSameAsStart(true); ro.setPrefixSameAsStart(true);
try (RocksIterator it = db.newIterator(data, itReadOpts)) { try (RocksIterator it = db.newIterator(data, itReadOpts)) {
@ -501,11 +500,8 @@ public class RocksdbFileStore {
if (shouldSeekTo) { if (shouldSeekTo) {
shouldSeekTo = false; shouldSeekTo = false;
ByteBuf dataKey = getDataKey(null, fileId, i); try (Buffer dataKey = getDataKey(null, fileId, i)) {
try { it.seek(readableNioBuffer(dataKey));
it.seek(dataKey.nioBuffer());
} finally {
dataKey.release();
} }
if (!it.isValid()) { if (!it.isValid()) {
throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found"); throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found");
@ -518,12 +514,12 @@ public class RocksdbFileStore {
} }
assert Arrays.equals(getDataKeyByteArray(fileId, i), it.key()); assert Arrays.equals(getDataKeyByteArray(fileId, i), it.key());
int dataRead = it.value(valBuffer); int dataRead = it.value(valBuffer);
valBuf.writerIndex(dataRead); valBuf.writerOffset(dataRead);
valBuf.getBytes(m, buf, f, r); valBuf.copyInto(m, buf, f, r);
valBuf.writerIndex(0); valBuf.writerOffset(0);
valBuf.readerIndex(0); valBuf.readerOffset(0);
p += r; p += r;
f += r; f += r;
@ -533,12 +529,8 @@ public class RocksdbFileStore {
return (int) (p - position); return (int) (p - position);
} }
} }
} finally {
fileIdPrefix.release();
} }
} }
} finally {
valBuf.release();
} }
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
throw new IOException(ex); throw new IOException(ex);
@ -581,19 +573,14 @@ public class RocksdbFileStore {
*/ */
private long getSizeInternal(long fileId) throws IOException { private long getSizeInternal(long fileId) throws IOException {
try { try {
ByteBuf metaKey = getMetaKey(fileId); try (Buffer metaKey = getMetaKey(fileId); Buffer metaData = getMetaValueBuf()) {
ByteBuf metaData = getMetaValueBuf(); if (db.get(size, DEFAULT_READ_OPTS, readableNioBuffer(metaKey), writableNioBuffer(metaData, Long.BYTES))
try {
if (db.get(size, DEFAULT_READ_OPTS, metaKey.nioBuffer(), metaData.internalNioBuffer(0, Long.BYTES))
!= RocksDB.NOT_FOUND) { != RocksDB.NOT_FOUND) {
metaData.writerIndex(Long.BYTES); metaData.writerOffset(Long.BYTES);
return metaData.readLongLE(); return metaData.readLong();
} else { } else {
return -1; return -1;
} }
} finally {
metaData.release();
metaKey.release();
} }
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
throw new IOException(ex); throw new IOException(ex);
@ -615,27 +602,24 @@ public class RocksdbFileStore {
if (size == -1) { if (size == -1) {
return; return;
} }
ByteBuf dataKey = null; Buffer dataKey = null;
try { try {
int n = (int) ((size + blockSize - 1) / blockSize); int n = (int) ((size + blockSize - 1) / blockSize);
if (n == 1) { if (n == 1) {
dataKey = getDataKey(dataKey, fileId, 0); dataKey = getDataKey(dataKey, fileId, 0);
db.delete(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer()); db.delete(data, DEFAULT_WRITE_OPTS, readableNioBuffer(dataKey));
} else if (n > 1) { } else if (n > 1) {
var dataKey1 = getDataKeyByteArray(fileId, 0); var dataKey1 = getDataKeyByteArray(fileId, 0);
var dataKey2 = getDataKeyByteArray(fileId, n - 1); var dataKey2 = getDataKeyByteArray(fileId, n - 1);
db.deleteRange(data, DEFAULT_WRITE_OPTS, dataKey1, dataKey2); db.deleteRange(data, DEFAULT_WRITE_OPTS, dataKey1, dataKey2);
} }
ByteBuf metaKey = getMetaKey(fileId); try (Buffer metaKey = getMetaKey(fileId)) {
try {
dellocateFilename(key); dellocateFilename(key);
db.delete(this.size, DEFAULT_WRITE_OPTS, metaKey.nioBuffer()); db.delete(this.size, DEFAULT_WRITE_OPTS, readableNioBuffer(metaKey));
} finally {
metaKey.release();
} }
} finally { } finally {
if (dataKey != null) { if (dataKey != null) {
dataKey.release(); dataKey.close();
} }
} }
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
@ -689,7 +673,7 @@ public class RocksdbFileStore {
return keys; return keys;
} }
public void append(String name, ByteBuf buf, int offset, int len) throws IOException { public void append(String name, Buffer buf, int offset, int len) throws IOException {
var l = metaLock.get(name).writeLock(); var l = metaLock.get(name).writeLock();
l.lock(); l.lock();
try { try {
@ -707,57 +691,62 @@ public class RocksdbFileStore {
n = len; n = len;
fileId = getFileIdOrAllocate(name); fileId = getFileIdOrAllocate(name);
ByteBuf dataKey = null; Buffer dataKey = null;
ByteBuf bb = getDataValueBuf(); Buffer bb = getDataValueBuf();
try { try {
do { do {
int m = (int) (size % (long) blockSize); int m = (int) (size % (long) blockSize);
int r = Math.min(blockSize - m, n); int r = Math.min(blockSize - m, n);
int i = (int) ((size) / (long) blockSize); int i = (int) ((size) / (long) blockSize);
dataKey = getDataKey(dataKey, fileId, i); dataKey = getDataKey(dataKey, fileId, i);
if (m != 0) { if (m != 0) {
int dataRead; int dataRead;
if ((dataRead = db.get(data, DEFAULT_READ_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, blockSize))) if ((dataRead = db.get(data,
== RocksDB.NOT_FOUND) { DEFAULT_READ_OPTS,
throw new IOException("Block " + name + "(" + fileId + "):" + i + " not found"); readableNioBuffer(dataKey),
} writableNioBuffer(bb, blockSize)
bb.writerIndex(dataRead); )) == RocksDB.NOT_FOUND) {
dataKey.readerIndex(0); throw new IOException("Block " + name + "(" + fileId + "):" + i + " not found");
} else {
bb.writerIndex(0);
} }
bb.writerOffset(dataRead);
dataKey.readerOffset(0);
} else {
bb.writerOffset(0);
}
bb.ensureWritable(r); bb.ensureWritable(r);
bb.setBytes(m, buf, f, r); buf.copyInto(f, bb, m, r);
db.put(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, m + r)); var bbBuf = writableNioBuffer(bb, m + r);
incFlush();
size += r;
f += r;
n -= r;
dataKey.readerIndex(0); assert bbBuf.capacity() >= m + r : bbBuf.capacity() + " < " + (m + r);
dataKey.writerIndex(0); assert bbBuf.position() == 0;
bb.readerIndex(0); bbBuf.limit(m + r);
bb.writerIndex(0); assert bbBuf.limit() == m + r;
} while (n != 0);
db.put(data, DEFAULT_WRITE_OPTS, readableNioBuffer(dataKey), bbBuf);
incFlush();
size += r;
f += r;
n -= r;
dataKey.readerOffset(0);
dataKey.writerOffset(0);
bb.readerOffset(0);
bb.writerOffset(0);
} while (n != 0);
} finally { } finally {
if (dataKey != null) { if (dataKey != null) {
dataKey.release(); dataKey.close();
} }
bb.release(); bb.close();
} }
ByteBuf metaKey = getMetaKey(fileId); try (Buffer metaKey = getMetaKey(fileId); Buffer metaValue = getMetaValueBuf()) {
ByteBuf metaValue = getMetaValueBuf(); metaValue.writeLong(size);
try { db.put(this.size, DEFAULT_WRITE_OPTS, readableNioBuffer(metaKey), readableNioBuffer(metaValue));
metaValue.writeLongLE(size);
db.put(this.size, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(), metaValue.nioBuffer(0, Long.BYTES));
incFlush(); incFlush();
} finally {
metaValue.release();
metaKey.release();
} }
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
throw new IOException(ex); throw new IOException(ex);

View File

@ -1,8 +1,7 @@
package it.cavallium.dbengine.lucene.directory; package it.cavallium.dbengine.lucene.directory;
import io.net5.buffer.ByteBuf; import io.net5.buffer.api.Buffer;
import io.net5.buffer.ByteBufAllocator; import io.net5.buffer.api.BufferAllocator;
import io.net5.buffer.Unpooled;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
@ -15,7 +14,11 @@ public class RocksdbInputStream extends IndexInput {
private final long length; private final long length;
private ByteBuf currentBuffer; private Buffer currentBuffer;
private int currentBufferIndex;
private boolean closed = false;
private final RocksdbFileStore store; private final RocksdbFileStore store;
@ -30,24 +33,33 @@ public class RocksdbInputStream extends IndexInput {
store, store,
bufferSize, bufferSize,
length, length,
ByteBufAllocator.DEFAULT.ioBuffer(bufferSize, bufferSize).writerIndex(bufferSize) null
); );
} }
private RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize, long length, ByteBuf currentBuffer) { private RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize, long length, Buffer currentBuffer) {
super("RocksdbInputStream(name=" + name + ")"); super("RocksdbInputStream(name=" + name + ")");
this.name = name; this.name = name;
this.store = store; this.store = store;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.currentBuffer = currentBuffer; this.currentBuffer = currentBuffer;
currentBuffer.readerIndex(bufferSize); this.currentBufferIndex = bufferSize;
this.position = 0; this.position = 0;
this.length = length; this.length = length;
if (currentBuffer != null && bufferSize > currentBuffer.capacity()) {
throw new IllegalArgumentException(
"BufferSize is " + bufferSize + " but the buffer has only a capacity of " + currentBuffer.capacity());
}
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
currentBuffer.release(); if (!closed) {
closed = true;
if (currentBuffer != null) {
currentBuffer.close();
}
}
} }
@Override @Override
@ -61,7 +73,7 @@ public class RocksdbInputStream extends IndexInput {
throw new IllegalArgumentException("pos must be between 0 and " + length); throw new IllegalArgumentException("pos must be between 0 and " + length);
} }
position = pos; position = pos;
currentBuffer.readerIndex(bufferSize); currentBufferIndex = this.bufferSize;
} }
@Override @Override
@ -76,11 +88,10 @@ public class RocksdbInputStream extends IndexInput {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this); throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this);
} }
return new RocksdbInputStream(name, return new RocksDBSliceInputStream(name,
store, store,
bufferSize, bufferSize,
offset + length, offset + length
Unpooled.buffer(bufferSize, bufferSize).writerIndex(bufferSize)
) { ) {
{ {
seek(0L); seek(0L);
@ -121,18 +132,21 @@ public class RocksdbInputStream extends IndexInput {
throw new EOFException("Read end"); throw new EOFException("Read end");
} }
loadBufferIfNeed(); loadBufferIfNeed();
byte b = currentBuffer.readByte(); byte b = currentBuffer.getByte(currentBufferIndex++);
position++; position++;
return b; return b;
} }
protected void loadBufferIfNeed() throws IOException { protected void loadBufferIfNeed() throws IOException {
if (currentBuffer.readerIndex() == this.bufferSize) { if (currentBuffer == null) {
currentBuffer = store.bufferAllocator.allocate(bufferSize).writerOffset(bufferSize);
}
if (this.currentBufferIndex == this.bufferSize) {
int n = store.load(name, position, currentBuffer, 0, bufferSize); int n = store.load(name, position, currentBuffer, 0, bufferSize);
if (n == -1) { if (n == -1) {
throw new EOFException("Read end"); throw new EOFException("Read end");
} }
currentBuffer.readerIndex(0); this.currentBufferIndex = 0;
} }
} }
@ -148,12 +162,13 @@ public class RocksdbInputStream extends IndexInput {
do { do {
loadBufferIfNeed(); loadBufferIfNeed();
int r = Math.min(bufferSize - currentBuffer.readerIndex(), n); int r = Math.min(bufferSize - currentBufferIndex, n);
currentBuffer.readBytes(b, f, r); currentBuffer.copyInto(currentBufferIndex, b, f, r);
f += r; f += r;
position += r; position += r;
currentBufferIndex += r;
n -= r; n -= r;
} while (n != 0); } while (n != 0);
@ -161,8 +176,6 @@ public class RocksdbInputStream extends IndexInput {
@Override @Override
public IndexInput clone() { public IndexInput clone() {
RocksdbInputStream in = (RocksdbInputStream) super.clone(); return super.clone();
in.currentBuffer = in.currentBuffer.duplicate();
return in;
} }
} }

View File

@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.directory;
import io.net5.buffer.ByteBuf; import io.net5.buffer.ByteBuf;
import io.net5.buffer.ByteBufAllocator; import io.net5.buffer.ByteBufAllocator;
import io.net5.buffer.api.Buffer;
import org.apache.lucene.store.BufferedChecksum; import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
@ -17,7 +18,7 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
private long position; private long position;
private ByteBuf currentBuffer; private Buffer currentBuffer;
private boolean dirty; private boolean dirty;
@ -32,7 +33,7 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
this.name = name; this.name = name;
this.store = store; this.store = store;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.currentBuffer = ByteBufAllocator.DEFAULT.ioBuffer(bufferSize, bufferSize); this.currentBuffer = store.bufferAllocator.allocate(bufferSize);
this.position = 0; this.position = 0;
this.dirty = false; this.dirty = false;
if (checksum) { if (checksum) {
@ -48,15 +49,15 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
if (dirty) { if (dirty) {
flush(); flush();
} }
currentBuffer.release(); currentBuffer.close();
currentBuffer = null; currentBuffer = null;
} }
} }
private void flush() throws IOException { private void flush() throws IOException {
store.append(name, currentBuffer, 0, currentBuffer.writerIndex()); store.append(name, currentBuffer, 0, currentBuffer.writerOffset());
currentBuffer.writerIndex(0); currentBuffer.writerOffset(0);
dirty = false; dirty = false;
} }
@ -81,7 +82,7 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
if (crc != null) { if (crc != null) {
crc.update(b); crc.update(b);
} }
if (currentBuffer.writerIndex() == bufferSize) { if (currentBuffer.writerOffset() == bufferSize) {
flush(); flush();
} }
currentBuffer.writeByte(b); currentBuffer.writeByte(b);
@ -98,10 +99,10 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable {
int f = offset; int f = offset;
int n = length; int n = length;
do { do {
if (currentBuffer.writerIndex() == bufferSize) { if (currentBuffer.writerOffset() == bufferSize) {
flush(); flush();
} }
int r = Math.min(bufferSize - currentBuffer.writerIndex(), n); int r = Math.min(bufferSize - currentBuffer.writerOffset(), n);
currentBuffer.writeBytes(b, f, r); currentBuffer.writeBytes(b, f, r);
f += r; f += r;
position += r; position += r;

View File

@ -5,6 +5,7 @@ import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import it.cavallium.data.generator.nativedata.Nullableboolean; import it.cavallium.data.generator.nativedata.Nullableboolean;
import it.cavallium.data.generator.nativedata.Nullabledouble;
import it.cavallium.data.generator.nativedata.Nullableint; import it.cavallium.data.generator.nativedata.Nullableint;
import it.cavallium.data.generator.nativedata.Nullablelong; import it.cavallium.data.generator.nativedata.Nullablelong;
import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TempDb;
@ -42,9 +43,11 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
Duration.ofSeconds(5), Duration.ofSeconds(5),
false, false,
new ByteBuffersDirectory(), new ByteBuffersDirectory(),
16 * 1024 * 1024, Nullableboolean.empty(),
true, Nullabledouble.empty(),
false, Nullableint.empty(),
Nullableboolean.empty(),
Nullableboolean.empty(),
true, true,
MAX_IN_MEMORY_RESULT_ENTRIES MAX_IN_MEMORY_RESULT_ENTRIES
); );

View File

@ -4,6 +4,7 @@ import static it.cavallium.dbengine.DbTestUtils.MAX_IN_MEMORY_RESULT_ENTRIES;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import it.cavallium.data.generator.nativedata.Nullableboolean; import it.cavallium.data.generator.nativedata.Nullableboolean;
import it.cavallium.data.generator.nativedata.Nullabledouble;
import it.cavallium.data.generator.nativedata.Nullableint; import it.cavallium.data.generator.nativedata.Nullableint;
import it.cavallium.data.generator.nativedata.Nullablelong; import it.cavallium.data.generator.nativedata.Nullablelong;
import it.cavallium.dbengine.DbTestUtils.TempDb; import it.cavallium.dbengine.DbTestUtils.TempDb;
@ -33,9 +34,11 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
Duration.ofSeconds(5), Duration.ofSeconds(5),
false, false,
new ByteBuffersDirectory(), new ByteBuffersDirectory(),
16 * 1024 * 1024, Nullableboolean.empty(),
true, Nullabledouble.empty(),
false, Nullableint.empty(),
Nullableboolean.empty(),
Nullableboolean.empty(),
false, false,
MAX_IN_MEMORY_RESULT_ENTRIES MAX_IN_MEMORY_RESULT_ENTRIES
); );