diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index 148ade0..e4b8765 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -251,9 +251,11 @@ versions: commitDebounceTime: Duration lowMemory: boolean directoryOptions: LuceneDirectoryOptions - indexWriterBufferSize: long - applyAllDeletes: boolean - writeAllDeletes: boolean + indexWriterReaderPooling: -boolean + indexWriterRAMBufferSizeMB: -double + indexWriterMaxBufferedDocs: -int + applyAllDeletes: -boolean + writeAllDeletes: -boolean allowNonVolatileCollection: boolean maxInMemoryResultEntries: int ByteBuffersDirectory: { data: { } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index 49d9f38..92ffef7 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -175,23 +175,23 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } logger.trace("WriterSchedulerMaxThreadCount: {}", writerSchedulerMaxThreadCount); indexWriterConfig.setMergeScheduler(mergeScheduler); - if (luceneOptions.indexWriterBufferSize() == -1) { - //todo: allow to configure maxbuffereddocs fallback - indexWriterConfig.setMaxBufferedDocs(80000); - // disable ram buffer size after enabling maxBufferedDocs - indexWriterConfig.setRAMBufferSizeMB(-1); - } else { - indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterBufferSize() / 1024D / 1024D); + if (luceneOptions.indexWriterRAMBufferSizeMB().isPresent()) { + indexWriterConfig.setRAMBufferSizeMB(luceneOptions.indexWriterRAMBufferSizeMB().get()); + } + if (luceneOptions.indexWriterMaxBufferedDocs().isPresent()) { + indexWriterConfig.setMaxBufferedDocs(luceneOptions.indexWriterMaxBufferedDocs().get()); + } + if (luceneOptions.indexWriterReaderPooling().isPresent()) { + indexWriterConfig.setReaderPooling(luceneOptions.indexWriterReaderPooling().get()); } - indexWriterConfig.setReaderPooling(false); indexWriterConfig.setSimilarity(getLuceneSimilarity()); this.indexWriter = new IndexWriter(directory, indexWriterConfig); this.snapshotsManager = new SnapshotsManager(indexWriter, snapshotter); this.searcherManager = new CachedIndexSearcherManager(indexWriter, snapshotsManager, getLuceneSimilarity(), - luceneOptions.applyAllDeletes(), - luceneOptions.writeAllDeletes(), + luceneOptions.applyAllDeletes().orElse(true), + luceneOptions.writeAllDeletes().orElse(false), luceneOptions.queryRefreshDebounceTime() ); diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneRocksDBManager.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneRocksDBManager.java index 47ff0bb..47ac2a8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneRocksDBManager.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneRocksDBManager.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.lucene; +import io.net5.buffer.api.BufferAllocator; import it.cavallium.dbengine.lucene.directory.RocksDBInstance; import it.cavallium.dbengine.lucene.directory.RocksdbFileStore; import java.io.IOException; @@ -16,6 +17,14 @@ public class LuceneRocksDBManager { private static final Logger LOG = LogManager.getLogger(LuceneRocksDBManager.class); private final List> dbs = new ArrayList<>(); + private BufferAllocator bufferAllocator; + + public synchronized BufferAllocator getAllocator() { + if (bufferAllocator == null) { + bufferAllocator = BufferAllocator.offHeapPooled(); + } + return bufferAllocator; + } public synchronized RocksDBInstance getOrCreate(Path path) { try { @@ -41,5 +50,9 @@ public class LuceneRocksDBManager { } } dbs.clear(); + if (bufferAllocator != null) { + bufferAllocator.close(); + } + bufferAllocator = null; } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java index b357307..6540021 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java +++ b/src/main/java/it/cavallium/dbengine/lucene/LuceneUtils.java @@ -610,7 +610,8 @@ public class LuceneUtils { ); } else if (directoryOptions instanceof RocksDBSharedDirectory rocksDBSharedDirectory) { var dbInstance = rocksDBManager.getOrCreate(rocksDBSharedDirectory.managedPath()); - return new RocksdbDirectory(dbInstance.db(), + return new RocksdbDirectory(rocksDBManager.getAllocator(), + dbInstance.db(), dbInstance.handles(), directoryName, rocksDBSharedDirectory.blockSize() diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksDBSliceInputStream.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksDBSliceInputStream.java new file mode 100644 index 0000000..da63912 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksDBSliceInputStream.java @@ -0,0 +1,168 @@ +package it.cavallium.dbengine.lucene.directory; + +import io.net5.buffer.Unpooled; +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.BufferRef; +import io.net5.buffer.api.adaptor.ByteBufAdaptor; +import org.apache.lucene.store.IndexInput; + +import java.io.EOFException; +import java.io.IOException; + +public class RocksDBSliceInputStream extends IndexInput { + + private static final BufferAllocator HEAP_UNPOOLED_BUFFER = BufferAllocator.onHeapUnpooled(); + + private final int bufferSize; + + private long position; + + private final long length; + + private byte[] currentBuffer; + + private int currentBufferIndex; + + private final RocksdbFileStore store; + + private final String name; + + public RocksDBSliceInputStream(String name, RocksdbFileStore store, int bufferSize) throws IOException { + this(name, store, bufferSize, store.getSize(name)); + } + + public RocksDBSliceInputStream(String name, RocksdbFileStore store, int bufferSize, long length) { + super("RocksDBSliceInputStream(name=" + name + ")"); + this.name = name; + this.store = store; + this.bufferSize = bufferSize; + this.currentBuffer = new byte[this.bufferSize]; + this.currentBufferIndex = bufferSize; + this.position = 0; + this.length = length; + + + } + + @Override + public void close() throws IOException { + //store.close(); + } + + @Override + public long getFilePointer() { + return position; + } + + @Override + public void seek(long pos) { + if (pos < 0 || pos > length) { + throw new IllegalArgumentException("pos must be between 0 and " + length); + } + position = pos; + currentBufferIndex = this.bufferSize; + } + + @Override + public long length() { + return this.length; + } + + @Override + public IndexInput slice(String sliceDescription, final long offset, final long length) throws IOException { + + if (offset < 0 || length < 0 || offset + length > this.length) { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this); + } + + return new RocksDBSliceInputStream(name, store, bufferSize, offset + length) { + { + seek(0L); + } + + @Override + public void seek(long pos) { + if (pos < 0L) { + throw new IllegalArgumentException("Seeking to negative position: " + this); + } + + super.seek(pos + offset); + } + + + @Override + public long getFilePointer() { + return super.getFilePointer() - offset; + } + + @Override + public long length() { + return super.length() - offset; + } + + @Override + public IndexInput slice(String sliceDescription, long ofs, long len) throws IOException { + return super.slice(sliceDescription, offset + ofs, len); + } + }; + } + + + @Override + public byte readByte() throws IOException { + + if (position >= length) { + throw new EOFException("Read end"); + } + loadBufferIfNeed(); + byte b = currentBuffer[currentBufferIndex++]; + position++; + return b; + } + + protected void loadBufferIfNeed() throws IOException { + if (this.currentBufferIndex == this.bufferSize) { + try (var editedBuffer = HEAP_UNPOOLED_BUFFER.copyOf(currentBuffer)) { + int n = store.load(name, position, editedBuffer, 0, bufferSize); + editedBuffer.copyInto(0, currentBuffer, 0, currentBuffer.length); + if (n == -1) { + throw new EOFException("Read end"); + } + } + this.currentBufferIndex = 0; + } + } + + @Override + public void readBytes(byte[] b, int offset, int len) throws IOException { + + if (position >= length) { + throw new EOFException("Read end"); + } + + int f = offset; + int n = Math.min((int) (length - position), len); + do { + loadBufferIfNeed(); + + int r = Math.min(bufferSize - currentBufferIndex, n); + + System.arraycopy(currentBuffer, currentBufferIndex, b, f, r); + + f += r; + position += r; + currentBufferIndex += r; + n -= r; + + } while (n != 0); + } + + @Override + public IndexInput clone() { + RocksDBSliceInputStream in = (RocksDBSliceInputStream) super.clone(); + in.currentBuffer = new byte[bufferSize]; + System.arraycopy(this.currentBuffer, 0, in.currentBuffer, 0, bufferSize); + return in; + } +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java index e08c8c9..743c823 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java @@ -1,6 +1,7 @@ package it.cavallium.dbengine.lucene.directory; import com.google.common.util.concurrent.Striped; +import io.net5.buffer.api.BufferAllocator; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Path; @@ -34,27 +35,33 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { private final AtomicLong nextTempFileCounter = new AtomicLong(); - public RocksdbDirectory(Path path, int blockSize) throws IOException { - this(path, blockSize, new SingleInstanceLockFactory()); + public RocksdbDirectory(BufferAllocator bufferAllocator, Path path, int blockSize) throws IOException { + this(bufferAllocator, path, blockSize, new SingleInstanceLockFactory()); } - public RocksdbDirectory(RocksDB db, Map handles, @Nullable String name, int blockSize) + public RocksdbDirectory(BufferAllocator bufferAllocator, + RocksDB db, + Map handles, + @Nullable String name, + int blockSize) throws IOException { - this(db, handles, name, blockSize, new SingleInstanceLockFactory()); + this(bufferAllocator, db, handles, name, blockSize, new SingleInstanceLockFactory()); } - protected RocksdbDirectory(Path path, int blockSize, LockFactory lockFactory) throws IOException { + protected RocksdbDirectory(BufferAllocator bufferAllocator, Path path, int blockSize, LockFactory lockFactory) + throws IOException { super(lockFactory); - store = RocksdbFileStore.create(path, blockSize, metaLock); + store = RocksdbFileStore.create(bufferAllocator, path, blockSize, metaLock); } - protected RocksdbDirectory(RocksDB db, + protected RocksdbDirectory(BufferAllocator bufferAllocator, + RocksDB db, Map handles, @Nullable String name, int blockSize, LockFactory lockFactory) throws IOException { super(lockFactory); - store = RocksdbFileStore.create(db, handles, name, blockSize, metaLock); + store = RocksdbFileStore.create(bufferAllocator, db, handles, name, blockSize, metaLock); } @Override diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java index 442a605..055cd80 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java @@ -2,11 +2,12 @@ package it.cavallium.dbengine.lucene.directory; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.Striped; -import io.net5.buffer.ByteBuf; -import io.net5.buffer.ByteBufAllocator; +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.BufferAllocator; +import io.net5.buffer.api.ReadableComponent; +import io.net5.buffer.api.WritableComponent; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -19,10 +20,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import org.apache.lucene.store.AlreadyClosedException; import org.jetbrains.annotations.Nullable; +import org.rocksdb.ClockCache; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -60,6 +61,7 @@ public class RocksdbFileStore { private static final ByteBuffer EMPTY_BYTE_BUF = ByteBuffer.allocateDirect(0); private final RocksDB db; + public final BufferAllocator bufferAllocator; private final int blockSize; private final ColumnFamilyHandle headers; private final ColumnFamilyHandle filename; @@ -71,6 +73,7 @@ public class RocksdbFileStore { private volatile boolean closed; private RocksdbFileStore(RocksDB db, + BufferAllocator bufferAllocator, ColumnFamilyHandle headers, ColumnFamilyHandle filename, ColumnFamilyHandle size, @@ -80,6 +83,7 @@ public class RocksdbFileStore { boolean closeDbOnClose) throws IOException { try { this.db = db; + this.bufferAllocator = bufferAllocator; this.closeDbOnClose = closeDbOnClose; this.blockSize = blockSize; this.headers = headers; @@ -110,11 +114,25 @@ public class RocksdbFileStore { } } + private static ByteBuffer readableNioBuffer(Buffer buffer) { + assert buffer.countReadableComponents() == 1 : "Readable components count: " + buffer.countReadableComponents(); + return ((ReadableComponent) buffer).readableBuffer(); + } + + private static ByteBuffer writableNioBuffer(Buffer buffer, int newWriterOffset) { + assert buffer.countWritableComponents() == 1 : "Writable components count: " + buffer.countWritableComponents(); + buffer.writerOffset(0).ensureWritable(newWriterOffset); + var byteBuf = ((WritableComponent) buffer).writableBuffer(); + buffer.writerOffset(newWriterOffset); + assert buffer.capacity() >= newWriterOffset : "Returned capacity " + buffer.capacity() + " < " + newWriterOffset; + return byteBuf; + } + private static DBOptions getDBOptions() { var options = new DBOptions(); options.setWalSizeLimitMB(256); options.setMaxWriteBatchGroupSizeBytes(2 * SizeUnit.MB); - options.setAtomicFlush(false); + //options.setAtomicFlush(false); options.setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery); options.setCreateMissingColumnFamilies(true); options.setCreateIfMissing(true); @@ -122,34 +140,17 @@ public class RocksdbFileStore { options.setAvoidUnnecessaryBlockingIO(true); options.setSkipCheckingSstFileSizesOnDbOpen(true); options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); - options.setAllowMmapReads(true); - options.setAllowMmapWrites(true); + //options.setAllowMmapReads(true); + //options.setAllowMmapWrites(true); + options.setUseDirectReads(true); + options.setUseDirectIoForFlushAndCompaction(true); options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); options.setDeleteObsoleteFilesPeriodMicros(Duration.ofMinutes(15).toNanos() / 1000L); + options.setRowCache(new ClockCache(512 * 1024 * 1024L)); + options.setMaxOpenFiles(500); return options; } - private static List getColumnFamilyDescriptors(@Nullable String name) { - String headersName, filenameName, sizeName, dataName; - if (name != null) { - headersName = (name + "_headers"); - filenameName = (name + "_filename"); - sizeName = (name + "_size"); - dataName = (name + "_data"); - } else { - headersName = DEFAULT_COLUMN_FAMILY_STRING; - filenameName = "filename"; - sizeName = "size"; - dataName = "data"; - } - return List.of( - getColumnFamilyDescriptor(headersName), - getColumnFamilyDescriptor(filenameName), - getColumnFamilyDescriptor(sizeName), - getColumnFamilyDescriptor(dataName) - ); - } - public static ColumnFamilyDescriptor getColumnFamilyDescriptor(String name) { ColumnFamilyOptions opts; if (name.equals(DEFAULT_COLUMN_FAMILY_STRING) || name.endsWith("_headers")) { @@ -174,7 +175,29 @@ public class RocksdbFileStore { return new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), opts); } - public static RocksdbFileStore create(RocksDB db, + private static List getColumnFamilyDescriptors(@Nullable String name) { + String headersName, filenameName, sizeName, dataName; + if (name != null) { + headersName = (name + "_headers"); + filenameName = (name + "_filename"); + sizeName = (name + "_size"); + dataName = (name + "_data"); + } else { + headersName = DEFAULT_COLUMN_FAMILY_STRING; + filenameName = "filename"; + sizeName = "size"; + dataName = "data"; + } + return List.of( + getColumnFamilyDescriptor(headersName), + getColumnFamilyDescriptor(filenameName), + getColumnFamilyDescriptor(sizeName), + getColumnFamilyDescriptor(dataName) + ); + } + + public static RocksdbFileStore create(BufferAllocator bufferAllocator, + RocksDB db, Map existingHandles, @Nullable String name, int blockSize, @@ -193,6 +216,7 @@ public class RocksdbFileStore { handles.add(columnFamilyHandle); } return new RocksdbFileStore(db, + bufferAllocator, handles.get(0), handles.get(1), handles.get(2), @@ -206,7 +230,10 @@ public class RocksdbFileStore { } } - public static RocksdbFileStore create(Path path, int blockSize, Striped metaLock) throws IOException { + public static RocksdbFileStore create(BufferAllocator bufferAllocator, + Path path, + int blockSize, + Striped metaLock) throws IOException { try { DBOptions options = getDBOptions(); List descriptors = getColumnFamilyDescriptors(null); @@ -216,6 +243,7 @@ public class RocksdbFileStore { var handles = new ArrayList(4); RocksDB db = RocksDB.open(options, path.toString(), descriptors, handles); return new RocksdbFileStore(db, + bufferAllocator, handles.get(0), handles.get(1), handles.get(2), @@ -266,18 +294,13 @@ public class RocksdbFileStore { if (id != null) { return id; } else { - var filenameKey = getFilenameKey(key); - var filenameValue = getFilenameValue(); - try { - if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)) + try (var filenameKey = getFilenameKey(key); var filenameValue = getFilenameValue()) { + if (db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey), writableNioBuffer(filenameValue, Long.BYTES)) == RocksDB.NOT_FOUND) { throw new IOException("File not found: " + key); } - filenameValue.writerIndex(Long.BYTES); - return filenameValue.readLongLE(); - } finally { - filenameKey.release(); - filenameValue.release(); + filenameValue.writerOffset(Long.BYTES); + return filenameValue.readLong(); } } } @@ -288,18 +311,13 @@ public class RocksdbFileStore { if (id != null) { return id; } else { - var filenameKey = getFilenameKey(key); - var filenameValue = getFilenameValue(); - try { - if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)) + try (var filenameKey = getFilenameKey(key); var filenameValue = getFilenameValue()) { + if (db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey), writableNioBuffer(filenameValue, Long.BYTES)) == RocksDB.NOT_FOUND) { return null; } - filenameValue.writerIndex(Long.BYTES); - return filenameValue.readLongLE(); - } finally { - filenameKey.release(); - filenameValue.release(); + filenameValue.writerOffset(Long.BYTES); + return filenameValue.readLong(); } } } @@ -309,33 +327,24 @@ public class RocksdbFileStore { if (id != null) { return true; } else { - var filenameKey = getFilenameKey(key); - try { - if (db.keyMayExist(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer())) { - return db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), EMPTY_BYTE_BUF) != RocksDB.NOT_FOUND; + try (var filenameKey = getFilenameKey(key)) { + if (db.keyMayExist(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey))) { + return db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey), EMPTY_BYTE_BUF) != RocksDB.NOT_FOUND; } else { return false; } - } finally { - filenameKey.release(); } } } private void moveFileId(long id, String oldKey, String newKey) throws RocksDBException { - var filenameOldKey = getFilenameKey(oldKey); - var filenameNewKey = getFilenameKey(newKey); var filenameValue = getFilenameValue(); - filenameValue.writeLongLE(id); - try { - db.delete(filename, DEFAULT_WRITE_OPTS, filenameOldKey.nioBuffer()); + filenameValue.writeLong(id); + try (var filenameOldKey = getFilenameKey(oldKey); var filenameNewKey = getFilenameKey(newKey); filenameValue) { + db.delete(filename, DEFAULT_WRITE_OPTS, readableNioBuffer(filenameOldKey)); incFlush(); - db.put(filename, DEFAULT_WRITE_OPTS, filenameNewKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)); + db.put(filename, DEFAULT_WRITE_OPTS, readableNioBuffer(filenameNewKey), readableNioBuffer(filenameValue)); incFlush(); - } finally { - filenameOldKey.release(); - filenameNewKey.release(); - filenameValue.release(); } } @@ -352,45 +361,38 @@ public class RocksdbFileStore { if (id != null) { return id; } else { - var filenameKey = getFilenameKey(key); - var filenameValue = getFilenameValue(); - try { - if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)) + try (var filenameKey = getFilenameKey(key); var filenameValue = getFilenameValue()) { + if (db.get(filename, DEFAULT_READ_OPTS, readableNioBuffer(filenameKey), + writableNioBuffer(filenameValue, Long.BYTES)) == RocksDB.NOT_FOUND) { - filenameValue.writerIndex(0); - filenameValue.readerIndex(0); + filenameValue.writerOffset(0); + filenameValue.readerOffset(0); var newlyAllocatedId = this.nextId.getAndIncrement(); if (newlyAllocatedId % 100 == 99) { - db.put(headers, new byte[] {0x00}, Longs.toByteArray(newlyAllocatedId + 1 + 100)); + db.put(headers, new byte[]{0x00}, Longs.toByteArray(newlyAllocatedId + 1 + 100)); incFlush(); } - filenameValue.writeLongLE(newlyAllocatedId); + filenameValue.writeLong(newlyAllocatedId); db.put(filename, DEFAULT_WRITE_OPTS, - filenameKey.nioBuffer(), - filenameValue.nioBuffer(0, filenameValue.writerIndex()) + readableNioBuffer(filenameKey), + readableNioBuffer(filenameValue) ); incFlush(); filenameToId.put(key, newlyAllocatedId); return newlyAllocatedId; } - filenameValue.readerIndex(0); - filenameValue.writerIndex(Long.BYTES); - return filenameValue.readLongLE(); - } finally { - filenameKey.release(); - filenameValue.release(); + filenameValue.readerOffset(0); + filenameValue.writerOffset(Long.BYTES); + return filenameValue.readLong(); } } } private void dellocateFilename(String key) throws RocksDBException { - var filenameKey = getFilenameKey(key); - try { - db.delete(filename, DEFAULT_WRITE_OPTS, filenameKey.nioBuffer()); + try (var filenameKey = getFilenameKey(key)) { + db.delete(filename, DEFAULT_WRITE_OPTS, readableNioBuffer(filenameKey)); filenameToId.remove(key); - } finally { - filenameKey.release(); } } @@ -405,56 +407,54 @@ public class RocksdbFileStore { } } - private ByteBuf getMetaValueBuf() { - return ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES, Long.BYTES); + private Buffer getMetaValueBuf() { + return bufferAllocator.allocate(Long.BYTES); } - private ByteBuf getDataValueBuf() { - return ByteBufAllocator.DEFAULT.ioBuffer(blockSize, blockSize); + private Buffer getDataValueBuf() { + return bufferAllocator.allocate(blockSize); } - private ByteBuf getFilenameValue() { - return ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES, Long.BYTES); + private Buffer getFilenameValue() { + return bufferAllocator.allocate(Long.BYTES); } - private ByteBuf getMetaKey(long id) { - ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES); - buf.writeLongLE(id); + private Buffer getMetaKey(long id) { + Buffer buf = bufferAllocator.allocate(Long.BYTES); + buf.writeLong(id); return buf; } - private ByteBuf getFilenameKey(String key) { - ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(key.length()); + private Buffer getFilenameKey(String key) { + Buffer buf = bufferAllocator.allocate(key.length()); buf.writeCharSequence(key, StandardCharsets.US_ASCII); return buf; } - private ByteBuf getDataKey(@Nullable ByteBuf buf, long id, int i) { + private Buffer getDataKey(@Nullable Buffer buf, long id, int i) { if (buf == null) { - buf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + Integer.BYTES); + buf = bufferAllocator.allocate(Long.BYTES + Integer.BYTES); } - buf.writeLongLE(id); + buf.writeLong(id); buf.writeInt(i); return buf; } - private ByteBuf getDataKeyPrefix(long id) { - var buf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES); - buf.writeLongLE(id); + private Buffer getDataKeyPrefix(long id) { + var buf = bufferAllocator.allocate(Long.BYTES); + buf.writeLong(id); return buf; } private byte[] getDataKeyByteArray(long id, int i) { ByteBuffer bb = ByteBuffer.wrap(new byte[Long.BYTES + Integer.BYTES]); - bb.order(ByteOrder.LITTLE_ENDIAN); bb.putLong(id); - bb.order(ByteOrder.BIG_ENDIAN); bb.putInt(i); return bb.array(); } - public int load(String name, long position, ByteBuf buf, int offset, int len) throws IOException { + public int load(String name, long position, Buffer buf, int offset, int len) throws IOException { var l = metaLock.get(name).readLock(); l.lock(); try { @@ -477,15 +477,14 @@ public class RocksdbFileStore { int f = offset; int n = len; - ByteBuf valBuf = getDataValueBuf(); - ByteBuffer valBuffer = valBuf.nioBuffer(0, blockSize); - try { + Buffer valBuf = getDataValueBuf(); + try (valBuf) { + ByteBuffer valBuffer = writableNioBuffer(valBuf, blockSize); boolean shouldSeekTo = true; try (var ro = new ReadOptions(itReadOpts)) { ro.setIgnoreRangeDeletions(true); - ByteBuf fileIdPrefix = getDataKeyPrefix(fileId); - try { - try (var lb = new DirectSlice(fileIdPrefix.internalNioBuffer(0, Long.BYTES), Long.BYTES)) { + try (Buffer fileIdPrefix = getDataKeyPrefix(fileId)) { + try (var lb = new DirectSlice(readableNioBuffer(fileIdPrefix), Long.BYTES)) { ro.setIterateLowerBound(lb); ro.setPrefixSameAsStart(true); try (RocksIterator it = db.newIterator(data, itReadOpts)) { @@ -501,11 +500,8 @@ public class RocksdbFileStore { if (shouldSeekTo) { shouldSeekTo = false; - ByteBuf dataKey = getDataKey(null, fileId, i); - try { - it.seek(dataKey.nioBuffer()); - } finally { - dataKey.release(); + try (Buffer dataKey = getDataKey(null, fileId, i)) { + it.seek(readableNioBuffer(dataKey)); } if (!it.isValid()) { throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found"); @@ -518,12 +514,12 @@ public class RocksdbFileStore { } assert Arrays.equals(getDataKeyByteArray(fileId, i), it.key()); int dataRead = it.value(valBuffer); - valBuf.writerIndex(dataRead); + valBuf.writerOffset(dataRead); - valBuf.getBytes(m, buf, f, r); + valBuf.copyInto(m, buf, f, r); - valBuf.writerIndex(0); - valBuf.readerIndex(0); + valBuf.writerOffset(0); + valBuf.readerOffset(0); p += r; f += r; @@ -533,12 +529,8 @@ public class RocksdbFileStore { return (int) (p - position); } } - } finally { - fileIdPrefix.release(); } } - } finally { - valBuf.release(); } } catch (RocksDBException ex) { throw new IOException(ex); @@ -581,19 +573,14 @@ public class RocksdbFileStore { */ private long getSizeInternal(long fileId) throws IOException { try { - ByteBuf metaKey = getMetaKey(fileId); - ByteBuf metaData = getMetaValueBuf(); - try { - if (db.get(size, DEFAULT_READ_OPTS, metaKey.nioBuffer(), metaData.internalNioBuffer(0, Long.BYTES)) + try (Buffer metaKey = getMetaKey(fileId); Buffer metaData = getMetaValueBuf()) { + if (db.get(size, DEFAULT_READ_OPTS, readableNioBuffer(metaKey), writableNioBuffer(metaData, Long.BYTES)) != RocksDB.NOT_FOUND) { - metaData.writerIndex(Long.BYTES); - return metaData.readLongLE(); + metaData.writerOffset(Long.BYTES); + return metaData.readLong(); } else { return -1; } - } finally { - metaData.release(); - metaKey.release(); } } catch (RocksDBException ex) { throw new IOException(ex); @@ -615,27 +602,24 @@ public class RocksdbFileStore { if (size == -1) { return; } - ByteBuf dataKey = null; + Buffer dataKey = null; try { int n = (int) ((size + blockSize - 1) / blockSize); if (n == 1) { dataKey = getDataKey(dataKey, fileId, 0); - db.delete(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer()); + db.delete(data, DEFAULT_WRITE_OPTS, readableNioBuffer(dataKey)); } else if (n > 1) { var dataKey1 = getDataKeyByteArray(fileId, 0); var dataKey2 = getDataKeyByteArray(fileId, n - 1); db.deleteRange(data, DEFAULT_WRITE_OPTS, dataKey1, dataKey2); } - ByteBuf metaKey = getMetaKey(fileId); - try { + try (Buffer metaKey = getMetaKey(fileId)) { dellocateFilename(key); - db.delete(this.size, DEFAULT_WRITE_OPTS, metaKey.nioBuffer()); - } finally { - metaKey.release(); + db.delete(this.size, DEFAULT_WRITE_OPTS, readableNioBuffer(metaKey)); } } finally { if (dataKey != null) { - dataKey.release(); + dataKey.close(); } } } catch (RocksDBException ex) { @@ -689,7 +673,7 @@ public class RocksdbFileStore { return keys; } - public void append(String name, ByteBuf buf, int offset, int len) throws IOException { + public void append(String name, Buffer buf, int offset, int len) throws IOException { var l = metaLock.get(name).writeLock(); l.lock(); try { @@ -707,57 +691,62 @@ public class RocksdbFileStore { n = len; fileId = getFileIdOrAllocate(name); - ByteBuf dataKey = null; - ByteBuf bb = getDataValueBuf(); + Buffer dataKey = null; + Buffer bb = getDataValueBuf(); try { - do { - int m = (int) (size % (long) blockSize); - int r = Math.min(blockSize - m, n); + do { + int m = (int) (size % (long) blockSize); + int r = Math.min(blockSize - m, n); - int i = (int) ((size) / (long) blockSize); - dataKey = getDataKey(dataKey, fileId, i); - if (m != 0) { - int dataRead; - if ((dataRead = db.get(data, DEFAULT_READ_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, blockSize))) - == RocksDB.NOT_FOUND) { - throw new IOException("Block " + name + "(" + fileId + "):" + i + " not found"); - } - bb.writerIndex(dataRead); - dataKey.readerIndex(0); - } else { - bb.writerIndex(0); + int i = (int) ((size) / (long) blockSize); + dataKey = getDataKey(dataKey, fileId, i); + if (m != 0) { + int dataRead; + if ((dataRead = db.get(data, + DEFAULT_READ_OPTS, + readableNioBuffer(dataKey), + writableNioBuffer(bb, blockSize) + )) == RocksDB.NOT_FOUND) { + throw new IOException("Block " + name + "(" + fileId + "):" + i + " not found"); } + bb.writerOffset(dataRead); + dataKey.readerOffset(0); + } else { + bb.writerOffset(0); + } - bb.ensureWritable(r); - bb.setBytes(m, buf, f, r); + bb.ensureWritable(r); + buf.copyInto(f, bb, m, r); - db.put(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, m + r)); - incFlush(); - size += r; - f += r; - n -= r; + var bbBuf = writableNioBuffer(bb, m + r); - dataKey.readerIndex(0); - dataKey.writerIndex(0); - bb.readerIndex(0); - bb.writerIndex(0); - } while (n != 0); + assert bbBuf.capacity() >= m + r : bbBuf.capacity() + " < " + (m + r); + assert bbBuf.position() == 0; + bbBuf.limit(m + r); + assert bbBuf.limit() == m + r; + + db.put(data, DEFAULT_WRITE_OPTS, readableNioBuffer(dataKey), bbBuf); + incFlush(); + size += r; + f += r; + n -= r; + + dataKey.readerOffset(0); + dataKey.writerOffset(0); + bb.readerOffset(0); + bb.writerOffset(0); + } while (n != 0); } finally { if (dataKey != null) { - dataKey.release(); + dataKey.close(); } - bb.release(); + bb.close(); } - ByteBuf metaKey = getMetaKey(fileId); - ByteBuf metaValue = getMetaValueBuf(); - try { - metaValue.writeLongLE(size); - db.put(this.size, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(), metaValue.nioBuffer(0, Long.BYTES)); + try (Buffer metaKey = getMetaKey(fileId); Buffer metaValue = getMetaValueBuf()) { + metaValue.writeLong(size); + db.put(this.size, DEFAULT_WRITE_OPTS, readableNioBuffer(metaKey), readableNioBuffer(metaValue)); incFlush(); - } finally { - metaValue.release(); - metaKey.release(); } } catch (RocksDBException ex) { throw new IOException(ex); diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbInputStream.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbInputStream.java index 961043c..86961b6 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbInputStream.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbInputStream.java @@ -1,8 +1,7 @@ package it.cavallium.dbengine.lucene.directory; -import io.net5.buffer.ByteBuf; -import io.net5.buffer.ByteBufAllocator; -import io.net5.buffer.Unpooled; +import io.net5.buffer.api.Buffer; +import io.net5.buffer.api.BufferAllocator; import java.io.EOFException; import java.io.IOException; import org.apache.lucene.store.IndexInput; @@ -15,7 +14,11 @@ public class RocksdbInputStream extends IndexInput { private final long length; - private ByteBuf currentBuffer; + private Buffer currentBuffer; + + private int currentBufferIndex; + + private boolean closed = false; private final RocksdbFileStore store; @@ -30,24 +33,33 @@ public class RocksdbInputStream extends IndexInput { store, bufferSize, length, - ByteBufAllocator.DEFAULT.ioBuffer(bufferSize, bufferSize).writerIndex(bufferSize) + null ); } - private RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize, long length, ByteBuf currentBuffer) { + private RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize, long length, Buffer currentBuffer) { super("RocksdbInputStream(name=" + name + ")"); this.name = name; this.store = store; this.bufferSize = bufferSize; this.currentBuffer = currentBuffer; - currentBuffer.readerIndex(bufferSize); + this.currentBufferIndex = bufferSize; this.position = 0; this.length = length; + if (currentBuffer != null && bufferSize > currentBuffer.capacity()) { + throw new IllegalArgumentException( + "BufferSize is " + bufferSize + " but the buffer has only a capacity of " + currentBuffer.capacity()); + } } @Override public void close() throws IOException { - currentBuffer.release(); + if (!closed) { + closed = true; + if (currentBuffer != null) { + currentBuffer.close(); + } + } } @Override @@ -61,7 +73,7 @@ public class RocksdbInputStream extends IndexInput { throw new IllegalArgumentException("pos must be between 0 and " + length); } position = pos; - currentBuffer.readerIndex(bufferSize); + currentBufferIndex = this.bufferSize; } @Override @@ -76,11 +88,10 @@ public class RocksdbInputStream extends IndexInput { throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this); } - return new RocksdbInputStream(name, + return new RocksDBSliceInputStream(name, store, bufferSize, - offset + length, - Unpooled.buffer(bufferSize, bufferSize).writerIndex(bufferSize) + offset + length ) { { seek(0L); @@ -121,18 +132,21 @@ public class RocksdbInputStream extends IndexInput { throw new EOFException("Read end"); } loadBufferIfNeed(); - byte b = currentBuffer.readByte(); + byte b = currentBuffer.getByte(currentBufferIndex++); position++; return b; } protected void loadBufferIfNeed() throws IOException { - if (currentBuffer.readerIndex() == this.bufferSize) { + if (currentBuffer == null) { + currentBuffer = store.bufferAllocator.allocate(bufferSize).writerOffset(bufferSize); + } + if (this.currentBufferIndex == this.bufferSize) { int n = store.load(name, position, currentBuffer, 0, bufferSize); if (n == -1) { throw new EOFException("Read end"); } - currentBuffer.readerIndex(0); + this.currentBufferIndex = 0; } } @@ -148,12 +162,13 @@ public class RocksdbInputStream extends IndexInput { do { loadBufferIfNeed(); - int r = Math.min(bufferSize - currentBuffer.readerIndex(), n); + int r = Math.min(bufferSize - currentBufferIndex, n); - currentBuffer.readBytes(b, f, r); + currentBuffer.copyInto(currentBufferIndex, b, f, r); f += r; position += r; + currentBufferIndex += r; n -= r; } while (n != 0); @@ -161,8 +176,6 @@ public class RocksdbInputStream extends IndexInput { @Override public IndexInput clone() { - RocksdbInputStream in = (RocksdbInputStream) super.clone(); - in.currentBuffer = in.currentBuffer.duplicate(); - return in; + return super.clone(); } } \ No newline at end of file diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbOutputStream.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbOutputStream.java index a3ba586..18218eb 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbOutputStream.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbOutputStream.java @@ -2,6 +2,7 @@ package it.cavallium.dbengine.lucene.directory; import io.net5.buffer.ByteBuf; import io.net5.buffer.ByteBufAllocator; +import io.net5.buffer.api.Buffer; import org.apache.lucene.store.BufferedChecksum; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Accountable; @@ -17,7 +18,7 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable { private long position; - private ByteBuf currentBuffer; + private Buffer currentBuffer; private boolean dirty; @@ -32,7 +33,7 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable { this.name = name; this.store = store; this.bufferSize = bufferSize; - this.currentBuffer = ByteBufAllocator.DEFAULT.ioBuffer(bufferSize, bufferSize); + this.currentBuffer = store.bufferAllocator.allocate(bufferSize); this.position = 0; this.dirty = false; if (checksum) { @@ -48,15 +49,15 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable { if (dirty) { flush(); } - currentBuffer.release(); + currentBuffer.close(); currentBuffer = null; } } private void flush() throws IOException { - store.append(name, currentBuffer, 0, currentBuffer.writerIndex()); - currentBuffer.writerIndex(0); + store.append(name, currentBuffer, 0, currentBuffer.writerOffset()); + currentBuffer.writerOffset(0); dirty = false; } @@ -81,7 +82,7 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable { if (crc != null) { crc.update(b); } - if (currentBuffer.writerIndex() == bufferSize) { + if (currentBuffer.writerOffset() == bufferSize) { flush(); } currentBuffer.writeByte(b); @@ -98,10 +99,10 @@ public class RocksdbOutputStream extends IndexOutput implements Accountable { int f = offset; int n = length; do { - if (currentBuffer.writerIndex() == bufferSize) { + if (currentBuffer.writerOffset() == bufferSize) { flush(); } - int r = Math.min(bufferSize - currentBuffer.writerIndex(), n); + int r = Math.min(bufferSize - currentBuffer.writerOffset(), n); currentBuffer.writeBytes(b, f, r); f += r; position += r; diff --git a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java index 3f2746b..0e89a69 100644 --- a/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/LocalTemporaryDbGenerator.java @@ -5,6 +5,7 @@ import static it.cavallium.dbengine.DbTestUtils.ensureNoLeaks; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import it.cavallium.data.generator.nativedata.Nullableboolean; +import it.cavallium.data.generator.nativedata.Nullabledouble; import it.cavallium.data.generator.nativedata.Nullableint; import it.cavallium.data.generator.nativedata.Nullablelong; import it.cavallium.dbengine.DbTestUtils.TempDb; @@ -42,9 +43,11 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator { Duration.ofSeconds(5), false, new ByteBuffersDirectory(), - 16 * 1024 * 1024, - true, - false, + Nullableboolean.empty(), + Nullabledouble.empty(), + Nullableint.empty(), + Nullableboolean.empty(), + Nullableboolean.empty(), true, MAX_IN_MEMORY_RESULT_ENTRIES ); diff --git a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java index d493561..f101d35 100644 --- a/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java +++ b/src/test/java/it/cavallium/dbengine/MemoryTemporaryDbGenerator.java @@ -4,6 +4,7 @@ import static it.cavallium.dbengine.DbTestUtils.MAX_IN_MEMORY_RESULT_ENTRIES; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import it.cavallium.data.generator.nativedata.Nullableboolean; +import it.cavallium.data.generator.nativedata.Nullabledouble; import it.cavallium.data.generator.nativedata.Nullableint; import it.cavallium.data.generator.nativedata.Nullablelong; import it.cavallium.dbengine.DbTestUtils.TempDb; @@ -33,9 +34,11 @@ public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator { Duration.ofSeconds(5), false, new ByteBuffersDirectory(), - 16 * 1024 * 1024, - true, - false, + Nullableboolean.empty(), + Nullabledouble.empty(), + Nullableint.empty(), + Nullableboolean.empty(), + Nullableboolean.empty(), false, MAX_IN_MEMORY_RESULT_ENTRIES );