From 53519fbc4eb4d8705674dba9e1686687b0abd075 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 28 Feb 2022 00:40:17 +0100 Subject: [PATCH] Use WriteBatch --- .../client/LuceneDirectoryOptions.java | 60 +- .../database/disk/LLLocalLuceneIndex.java | 27 +- .../lucene/directory/RocksDBInstance.java | 7 + .../lucene/directory/RocksdbDirectory.java | 108 +-- .../lucene/directory/RocksdbFileStore.java | 748 ++++++++++++++---- 5 files changed, 737 insertions(+), 213 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/lucene/directory/RocksDBInstance.java diff --git a/src/main/java/it/cavallium/dbengine/client/LuceneDirectoryOptions.java b/src/main/java/it/cavallium/dbengine/client/LuceneDirectoryOptions.java index cf1d6fa..f334d45 100644 --- a/src/main/java/it/cavallium/dbengine/client/LuceneDirectoryOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/LuceneDirectoryOptions.java @@ -3,6 +3,7 @@ package it.cavallium.dbengine.client; import it.cavallium.dbengine.lucene.directory.RocksdbDirectory; import java.io.IOException; import java.nio.file.Path; +import java.util.Map; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -10,6 +11,8 @@ import org.apache.lucene.misc.store.DirectIODirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Constants; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; public sealed interface LuceneDirectoryOptions { @@ -17,6 +20,8 @@ public sealed interface LuceneDirectoryOptions { Optional getManagedPath(); + boolean isStorageCompressed(); + record ByteBuffersDirectory() implements LuceneDirectoryOptions { @Override @@ -28,6 +33,11 @@ public sealed interface LuceneDirectoryOptions { public Optional getManagedPath() { return Optional.empty(); } + + @Override + public boolean isStorageCompressed() { + return false; + } } record MemoryMappedFSDirectory(Path managedPath) implements StandardFSDirectoryOptions { @@ -36,6 +46,11 @@ public sealed interface LuceneDirectoryOptions { public FSDirectory createLuceneDirectory(String directoryName) throws IOException { return FSDirectory.open(managedPath.resolve(directoryName + ".lucene.db")); } + + @Override + public boolean isStorageCompressed() { + return false; + } } record NIOFSDirectory(Path managedPath) implements StandardFSDirectoryOptions { @@ -44,6 +59,11 @@ public sealed interface LuceneDirectoryOptions { public FSDirectory createLuceneDirectory(String directoryName) throws IOException { return org.apache.lucene.store.NIOFSDirectory.open(managedPath.resolve(directoryName + ".lucene.db")); } + + @Override + public boolean isStorageCompressed() { + return false; + } } record DirectIOFSDirectory(StandardFSDirectoryOptions delegate, Optional mergeBufferSize, @@ -73,14 +93,44 @@ public sealed interface LuceneDirectoryOptions { public Optional getManagedPath() { return delegate.getManagedPath(); } + + @Override + public boolean isStorageCompressed() { + return delegate.isStorageCompressed(); + } } - record RocksDBDirectory(Path managedPath) implements PathDirectoryOptions { + record RocksDBStandaloneDirectory(Path managedPath, int blockSize) implements PathDirectoryOptions { @Override public Directory createLuceneDirectory(String directoryName) throws IOException { - return new RocksdbDirectory(managedPath.resolve(directoryName + ".lucene.db")); + return new RocksdbDirectory(managedPath.resolve(directoryName + ".lucene.db"), blockSize); } + + 
@Override + public boolean isStorageCompressed() { + return true; + } + } + + record RocksDBSharedDirectory(RocksDB db, Map handles, int blockSize) implements + LuceneDirectoryOptions { + + @Override + public Directory createLuceneDirectory(String directoryName) throws IOException { + return new RocksdbDirectory(db, handles, directoryName, blockSize); + } + + @Override + public Optional getManagedPath() { + return Optional.empty(); + } + + @Override + public boolean isStorageCompressed() { + return true; + } + } record NRTCachingDirectory(LuceneDirectoryOptions delegate, long maxMergeSizeBytes, long maxCachedBytes) implements @@ -99,6 +149,11 @@ public sealed interface LuceneDirectoryOptions { public Optional getManagedPath() { return delegate.getManagedPath(); } + + @Override + public boolean isStorageCompressed() { + return delegate.isStorageCompressed(); + } } sealed interface StandardFSDirectoryOptions extends PathDirectoryOptions { @@ -115,5 +170,6 @@ public sealed interface LuceneDirectoryOptions { default Optional getManagedPath() { return Optional.of(managedPath()); } + } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java index bfb0302..d724b21 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java @@ -51,6 +51,9 @@ import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene90.Lucene90Codec; +import org.apache.lucene.codecs.lucene90.Lucene90Codec.Mode; import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -61,6 +64,7 @@ import org.apache.lucene.index.SimpleMergedSegmentWarmer; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.similarities.Similarity; +import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.util.InfoStream; @@ -135,6 +139,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } this.lowMemory = luceneOptions.lowMemory(); this.directory = luceneOptions.directoryOptions().createLuceneDirectory(luceneIndexName); + boolean compressCodec = !luceneOptions.directoryOptions().isStorageCompressed(); this.luceneIndexName = luceneIndexName; var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); @@ -450,6 +455,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (activeTasks.isTerminated()) return null; shutdownLock.lock(); try { + if (closeRequested.get()) { + return null; + } flushTime.recordCallable(() -> { indexWriter.flush(); return null; @@ -472,6 +480,9 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { if (activeTasks.isTerminated()) return null; shutdownLock.lock(); try { + if (closeRequested.get()) { + return null; + } refreshTime.recordCallable(() -> { if (force) { searcherManager.maybeRefreshBlocking(); @@ -491,9 +502,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { .subscribeOn(luceneHeavyTasksScheduler); } - private void scheduledCommit() { + /** + * 
Internal method, do not use + */ + public void scheduledCommit() { shutdownLock.lock(); try { + if (closeRequested.get()) { + return; + } commitTime.recordCallable(() -> { indexWriter.commit(); return null; @@ -505,9 +522,15 @@ public class LLLocalLuceneIndex implements LLLuceneIndex { } } - private void scheduledMerge() { // Do not use. Merges are done automatically by merge policies + /** + * Internal method, do not use + */ + public void scheduledMerge() { // Do not use. Merges are done automatically by merge policies shutdownLock.lock(); try { + if (closeRequested.get()) { + return; + } mergeTime.recordCallable(() -> { indexWriter.maybeMerge(); return null; diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksDBInstance.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksDBInstance.java new file mode 100644 index 0000000..cc9584e --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksDBInstance.java @@ -0,0 +1,7 @@ +package it.cavallium.dbengine.lucene.directory; + +import java.util.Map; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; + +public record RocksDBInstance(RocksDB db, Map handles) {} diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java index 005d37f..e08c8c9 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbDirectory.java @@ -1,62 +1,60 @@ package it.cavallium.dbengine.lucene.directory; -import org.apache.lucene.index.IndexFileNames; -import org.apache.lucene.store.*; -import org.apache.lucene.util.Accountable; - +import com.google.common.util.concurrent.Striped; import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.store.BaseDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.SingleInstanceLockFactory; +import org.apache.lucene.util.Accountable; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -/** - * Created by wens on 16-3-10. - */ public class RocksdbDirectory extends BaseDirectory implements Accountable { private static final int BUFFER_SIZE = 10 * 1024; + @SuppressWarnings("UnstableApiUsage") + protected final Striped metaLock = Striped.readWriteLock(64); + protected final RocksdbFileStore store; - protected final AtomicLong sizeInBytes = new AtomicLong(); - - /** Used to generate temp file names in {@link #createTempOutput}. */ private final AtomicLong nextTempFileCounter = new AtomicLong(); - public RocksdbDirectory(Path path) throws IOException { - this(path, new SingleInstanceLockFactory()); + public RocksdbDirectory(Path path, int blockSize) throws IOException { + this(path, blockSize, new SingleInstanceLockFactory()); } - /** - * Sole constructor. 
- * - */ - protected RocksdbDirectory(Path path, LockFactory lockFactory) throws IOException { + public RocksdbDirectory(RocksDB db, Map handles, @Nullable String name, int blockSize) + throws IOException { + this(db, handles, name, blockSize, new SingleInstanceLockFactory()); + } + + protected RocksdbDirectory(Path path, int blockSize, LockFactory lockFactory) throws IOException { super(lockFactory); - store = new RocksdbFileStore(path); + store = RocksdbFileStore.create(path, blockSize, metaLock); } - - public RocksdbDirectory(Path path, FSDirectory dir, IOContext context) throws IOException { - this(path, dir, false, context); - } - - private RocksdbDirectory(Path path, FSDirectory dir, boolean closeDir, IOContext context) throws IOException { - this(path); - for (String file : dir.listAll()) { - if (!Files.isDirectory(dir.getDirectory().resolve(file))) { - copyFrom(dir, file, file, context); - } - } - if (closeDir) { - dir.close(); - } + protected RocksdbDirectory(RocksDB db, + Map handles, + @Nullable String name, + int blockSize, + LockFactory lockFactory) throws IOException { + super(lockFactory); + store = RocksdbFileStore.create(db, handles, name, blockSize, metaLock); } @Override @@ -88,12 +86,17 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { @Override public void deleteFile(String name) throws IOException { ensureOpen(); - long size = store.getSize(name); - if (size != -1) { - sizeInBytes.addAndGet(-size); - store.remove(name); - } else { - throw new FileNotFoundException(name); + var l = metaLock.get(name).writeLock(); + l.lock(); + try { + long size = store.getSize(name); + if (size != -1) { + store.remove(name); + } else { + throw new FileNotFoundException(name); + } + } finally { + l.unlock(); } } @@ -103,6 +106,8 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); + var l = metaLock.get(name).writeLock(); + l.lock(); try { if (store.contains(name)) { store.remove(name); @@ -111,6 +116,8 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { return new RocksdbOutputStream(name, store, BUFFER_SIZE, true); } catch (RocksDBException ex) { throw new IOException(ex); + } finally { + l.unlock(); } } @Override @@ -134,17 +141,22 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { } @Override - public void sync(Collection names) { + public void sync(Collection names) throws IOException { + // System.out.println("Syncing " + names.size() + " files"); } @Override - public void syncMetaData() { - + public void syncMetaData() throws IOException { + // System.out.println("Syncing meta"); } @Override public void rename(String source, String dest) throws IOException { ensureOpen(); + var l = metaLock.bulkGet(List.of(source, dest)); + for (ReadWriteLock ll : l) { + ll.writeLock().lock(); + } try { if (!store.contains(source)) { throw new FileNotFoundException(source); @@ -152,6 +164,10 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { store.move(source, dest); } catch (RocksDBException ex) { throw new IOException(ex); + } finally { + for (ReadWriteLock ll : l) { + ll.writeLock().unlock(); + } } } @@ -161,6 +177,8 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); + var l = metaLock.get(name).readLock(); + 
l.lock(); try { if (!store.contains(name)) { throw new FileNotFoundException(name); @@ -169,6 +187,8 @@ public class RocksdbDirectory extends BaseDirectory implements Accountable { return new RocksdbInputStream(name, store, BUFFER_SIZE); } catch (RocksDBException ex) { throw new IOException(ex); + } finally { + l.unlock(); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java index 2a5d3de..f7f3459 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java +++ b/src/main/java/it/cavallium/dbengine/lucene/directory/RocksdbFileStore.java @@ -1,82 +1,398 @@ package it.cavallium.dbengine.lucene.directory; +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.Striped; import io.net5.buffer.ByteBuf; import io.net5.buffer.ByteBufAllocator; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.lucene.store.AlreadyClosedException; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.Options; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.WALRecoveryMode; +import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; +import org.rocksdb.util.SizeUnit; /** * Created by wens on 16-3-10. 
*/ public class RocksdbFileStore { + private static final byte[] NEXT_ID_KEY = new byte[]{0x0}; + private static final String DEFAULT_COLUMN_FAMILY_STRING = new String(RocksDB.DEFAULT_COLUMN_FAMILY, StandardCharsets.US_ASCII); + static { RocksDB.loadLibrary(); } - private static final int BLOCK_SIZE = 10 * 1024; + @SuppressWarnings("UnstableApiUsage") + private final Striped metaLock; + private static final ReadOptions DEFAULT_READ_OPTS = new ReadOptions(); + private static final ReadOptions DEFAULT_IT_READ_OPTS = new ReadOptions() + .setReadaheadSize(SizeUnit.MB) + .setVerifyChecksums(false); private static final WriteOptions DEFAULT_WRITE_OPTS = new WriteOptions(); private static final ByteBuffer EMPTY_BYTE_BUF = ByteBuffer.allocateDirect(0); - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final RocksDB db; - private final ColumnFamilyHandle meta; + private final int blockSize; + private final ColumnFamilyHandle headers; + private final ColumnFamilyHandle filename; + private final ColumnFamilyHandle size; private final ColumnFamilyHandle data; + private final ConcurrentHashMap filenameToId = new ConcurrentHashMap<>(); + private final AtomicLong nextId; + private final AtomicLong flushCounter = new AtomicLong(); + private volatile boolean closed; - public RocksdbFileStore(Path path) throws IOException { - var options = new DBOptions(); - options.setCreateIfMissing(true); - if (Files.notExists(path)) { - Files.createDirectories(path); - } - options.setCreateMissingColumnFamilies(true); + private RocksdbFileStore(RocksDB db, + ColumnFamilyHandle headers, + ColumnFamilyHandle filename, + ColumnFamilyHandle size, + ColumnFamilyHandle data, + int blockSize, + Striped metaLock) throws IOException { try { - var handles = new ArrayList(2); - this.db = RocksDB.open(options, - path.toString(), - List.of( - new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY), - new ColumnFamilyDescriptor("metadata".getBytes(StandardCharsets.US_ASCII)) - ), - handles - ); - this.meta = handles.get(0); - this.data = handles.get(1); + this.db = db; + this.blockSize = blockSize; + this.headers = headers; + this.filename = filename; + this.size = size; + this.data = data; + this.metaLock = metaLock; + byte[] nextIdBytes = db.get(headers, NEXT_ID_KEY); + if (nextIdBytes != null) { + this.nextId = new AtomicLong(Longs.fromByteArray(nextIdBytes)); + } else { + this.nextId = new AtomicLong(); + incFlush(); + db.put(headers, NEXT_ID_KEY, Longs.toByteArray(100)); + incFlush(); + } } catch (RocksDBException e) { throw new IOException("Failed to open RocksDB meta file store", e); } } + private static DBOptions getDBOptions() { + var options = new DBOptions(); + options.setWalSizeLimitMB(256); + options.setMaxWriteBatchGroupSizeBytes(2 * SizeUnit.MB); + options.setMaxLogFileSize(256 * SizeUnit.MB); + options.setAtomicFlush(false); + options.setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery); + options.setCreateMissingColumnFamilies(true); + options.setCreateIfMissing(true); + options.setUnorderedWrite(true); + options.setAvoidUnnecessaryBlockingIO(true); + options.setSkipCheckingSstFileSizesOnDbOpen(true); + options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL); + options.setAllowMmapReads(true); + options.setAllowMmapWrites(true); + options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); + options.setDeleteObsoleteFilesPeriodMicros(Duration.ofMinutes(15).toNanos() / 1000L); + options.setRecycleLogFileNum(10); + return options; + } - public boolean contains(String key) 
throws RocksDBException { - lock.readLock().lock(); + private static List getColumnFamilyDescriptors(@Nullable String name) { + String headersName, filenameName, sizeName, dataName; + if (name != null) { + headersName = (name + "_headers"); + filenameName = (name + "_filename"); + sizeName = (name + "_size"); + dataName = (name + "_data"); + } else { + headersName = DEFAULT_COLUMN_FAMILY_STRING; + filenameName = "filename"; + sizeName = "size"; + dataName = "data"; + } + return List.of( + getColumnFamilyDescriptor(headersName), + getColumnFamilyDescriptor(filenameName), + getColumnFamilyDescriptor(sizeName), + getColumnFamilyDescriptor(dataName) + ); + } + + public static ColumnFamilyDescriptor getColumnFamilyDescriptor(String name) { + ColumnFamilyOptions opts; + if (name.equals(DEFAULT_COLUMN_FAMILY_STRING) || name.endsWith("_headers")) { + opts = new ColumnFamilyOptions() + .setCompressionType(CompressionType.NO_COMPRESSION) + .setTargetFileSizeBase(SizeUnit.KB); + } else if (name.endsWith("_filename")) { + opts = new ColumnFamilyOptions() + .setCompressionType(CompressionType.NO_COMPRESSION) + .setTargetFileSizeBase(32L * SizeUnit.MB); + } else if (name.endsWith("_size")) { + opts = new ColumnFamilyOptions() + .setCompressionType(CompressionType.NO_COMPRESSION) + .setTargetFileSizeBase(32L * SizeUnit.MB); + } else if (name.endsWith("_data")) { + opts = new ColumnFamilyOptions() + .setCompressionType(CompressionType.LZ4_COMPRESSION) + .setTargetFileSizeBase(128L * SizeUnit.MB); + } else { + opts = new ColumnFamilyOptions(); + } + return new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), opts); + } + + public static RocksdbFileStore create(RocksDB db, + Map existingHandles, + @Nullable String name, + int blockSize, + Striped metaLock) throws IOException { + List columnFamilyDescriptors = getColumnFamilyDescriptors(name); try { - ByteBuf metaKey = getMetaKey(key); - try { - return db.get(meta, DEFAULT_READ_OPTS, metaKey.nioBuffer(), EMPTY_BYTE_BUF) != RocksDB.NOT_FOUND; - } finally { - metaKey.release(); + List handles = new ArrayList<>(columnFamilyDescriptors.size()); + for (ColumnFamilyDescriptor columnFamilyDescriptor : columnFamilyDescriptors) { + var columnFamilyName = new String(columnFamilyDescriptor.getName(), StandardCharsets.US_ASCII); + ColumnFamilyHandle columnFamilyHandle; + if (existingHandles.containsKey(columnFamilyName)) { + columnFamilyHandle = existingHandles.get(columnFamilyName); + } else { + columnFamilyHandle = db.createColumnFamily(columnFamilyDescriptor); + } + handles.add(columnFamilyHandle); } + return new RocksdbFileStore(db, + handles.get(0), + handles.get(1), + handles.get(2), + handles.get(3), + blockSize, + metaLock + ); + } catch (RocksDBException e) { + throw new IOException(e); + } + } + + public static RocksdbFileStore create(Path path, int blockSize, Striped metaLock) throws IOException { + try { + DBOptions options = getDBOptions(); + List descriptors = getColumnFamilyDescriptors(null); + if (Files.notExists(path)) { + Files.createDirectories(path); + } + var handles = new ArrayList(4); + RocksDB db = RocksDB.open(options, path.toString(), descriptors, handles); + return new RocksdbFileStore(db, + handles.get(0), + handles.get(1), + handles.get(2), + handles.get(3), + blockSize, + metaLock + ); + } catch (RocksDBException e) { + throw new IOException("Failed to open RocksDB meta file store", e); + } + } + + public static RocksDBInstance createEmpty(Path path) throws IOException { + try { + DBOptions options = getDBOptions(); + List 
descriptors; + if (Files.exists(path)) { + descriptors = RocksDB + .listColumnFamilies(new Options(), path.toString()) + .stream() + .map(nameBytes -> { + var name = new String(nameBytes, StandardCharsets.US_ASCII); + return getColumnFamilyDescriptor(name); + }) + .toList(); + } else { + descriptors = List.of(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); + } + if (Files.notExists(path)) { + Files.createDirectories(path); + } + var handles = new ArrayList(descriptors.size()); + RocksDB db = RocksDB.open(options, path.toString(), descriptors, handles); + var handlesMap = new HashMap(); + for (int i = 0; i < handles.size(); i++) { + var name = new String(descriptors.get(i).getName(), StandardCharsets.US_ASCII); + handlesMap.put(name, handles.get(i)); + } + return new RocksDBInstance(db, Collections.unmodifiableMap(handlesMap)); + } catch (RocksDBException e) { + throw new IOException("Failed to open RocksDB meta file store", e); + } + } + + private long getFileId(String key) throws RocksDBException, IOException { + Long id = filenameToId.get(key); + if (id != null) { + return id; + } else { + var filenameKey = getFilenameKey(key); + var filenameValue = getFilenameValue(); + try { + if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)) + == RocksDB.NOT_FOUND) { + throw new IOException("File not found: " + key); + } + filenameValue.writerIndex(Long.BYTES); + return filenameValue.readLongLE(); + } finally { + filenameKey.release(); + filenameValue.release(); + } + } + } + + @Nullable + private Long getFileIdOrNull(String key) throws RocksDBException { + Long id = filenameToId.get(key); + if (id != null) { + return id; + } else { + var filenameKey = getFilenameKey(key); + var filenameValue = getFilenameValue(); + try { + if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)) + == RocksDB.NOT_FOUND) { + return null; + } + filenameValue.writerIndex(Long.BYTES); + return filenameValue.readLongLE(); + } finally { + filenameKey.release(); + filenameValue.release(); + } + } + } + + private boolean containsFileId(String key) throws RocksDBException { + Long id = filenameToId.get(key); + if (id != null) { + return true; + } else { + var filenameKey = getFilenameKey(key); + try { + if (db.keyMayExist(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer())) { + return db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), EMPTY_BYTE_BUF) != RocksDB.NOT_FOUND; + } else { + return false; + } + } finally { + filenameKey.release(); + } + } + } + + private void moveFileId(long id, String oldKey, String newKey) throws RocksDBException { + var filenameOldKey = getFilenameKey(oldKey); + var filenameNewKey = getFilenameKey(newKey); + var filenameValue = getFilenameValue(); + filenameValue.writeLongLE(id); + try { + db.delete(filename, DEFAULT_WRITE_OPTS, filenameOldKey.nioBuffer()); + incFlush(); + db.put(filename, DEFAULT_WRITE_OPTS, filenameNewKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)); + incFlush(); } finally { - lock.readLock().unlock(); + filenameOldKey.release(); + filenameNewKey.release(); + filenameValue.release(); + } + } + + private void incFlush() throws RocksDBException { + /* + if ((flushCounter.incrementAndGet() % 1) == 0) { + db.flushWal(false); + } + */ + } + + private long getFileIdOrAllocate(String key) throws RocksDBException { + Long id = filenameToId.get(key); + if (id != null) { + return id; + } else { + var filenameKey = getFilenameKey(key); + var filenameValue 
= getFilenameValue(); + try { + if (db.get(filename, DEFAULT_READ_OPTS, filenameKey.nioBuffer(), filenameValue.nioBuffer(0, Long.BYTES)) + == RocksDB.NOT_FOUND) { + filenameValue.writerIndex(0); + filenameValue.readerIndex(0); + var newlyAllocatedId = this.nextId.getAndIncrement(); + if (newlyAllocatedId % 100 == 99) { + db.put(headers, NEXT_ID_KEY, Longs.toByteArray(newlyAllocatedId + 1 + 100)); + incFlush(); + } + filenameValue.writeLongLE(newlyAllocatedId); + db.put(filename, + DEFAULT_WRITE_OPTS, + filenameKey.nioBuffer(), + filenameValue.nioBuffer(0, filenameValue.writerIndex()) + ); + incFlush(); + filenameToId.put(key, newlyAllocatedId); + return newlyAllocatedId; + } + filenameValue.readerIndex(0); + filenameValue.writerIndex(Long.BYTES); + return filenameValue.readLongLE(); + } finally { + filenameKey.release(); + filenameValue.release(); + } + } + } + + private void deallocateFilename(String key) throws RocksDBException { + var filenameKey = getFilenameKey(key); + try { + db.delete(filename, DEFAULT_WRITE_OPTS, filenameKey.nioBuffer()); + filenameToId.remove(key); + } finally { + filenameKey.release(); + } + } + + public boolean contains(String key) throws RocksDBException, IOException { + var l = metaLock.get(key).readLock(); + l.lock(); + try { + ensureOpen(); + return containsFileId(key); + } finally { + l.unlock(); } } @@ -85,31 +401,53 @@ public class RocksdbFileStore { } private ByteBuf getDataValueBuf() { - return ByteBufAllocator.DEFAULT.ioBuffer(BLOCK_SIZE, BLOCK_SIZE); + return ByteBufAllocator.DEFAULT.ioBuffer(blockSize, blockSize); } - private ByteBuf getMetaKey(String key) { + private ByteBuf getFilenameValue() { + return ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES, Long.BYTES); + } + + private ByteBuf getMetaKey(long id) { + ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES); + buf.writeLongLE(id); + return buf; + } + + private ByteBuf getFilenameKey(String key) { ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(key.length()); buf.writeCharSequence(key, StandardCharsets.US_ASCII); return buf; } - private ByteBuf getDataKey(@Nullable ByteBuf buf, String key, int i) { + private ByteBuf getDataKey(@Nullable ByteBuf buf, long id, int i) { if (buf == null) { - buf = ByteBufAllocator.DEFAULT.buffer(key.length() + 1 + Integer.BYTES); + buf = ByteBufAllocator.DEFAULT.buffer(Long.BYTES + Integer.BYTES); } - buf.writerIndex(0); - buf.writeCharSequence(key, StandardCharsets.US_ASCII); - buf.writeByte('\0'); - buf.writeIntLE(i); + buf.writeLongLE(id); + // Block index is big-endian so lexicographic key order matches numeric block order + buf.writeInt(i); return buf; } + private byte[] getDataKeyByteArray(long id, int i) { + // Must produce the same bytes as getDataKey: little-endian file id, big-endian block index + ByteBuffer bb = ByteBuffer.wrap(new byte[Long.BYTES + Integer.BYTES]); + bb.order(ByteOrder.LITTLE_ENDIAN); + bb.putLong(id); + bb.order(ByteOrder.BIG_ENDIAN); + bb.putInt(i); + return bb.array(); + } + public int load(String name, long position, byte[] buf, int offset, int len) throws IOException { - lock.readLock().lock(); + var l = metaLock.get(name).readLock(); + l.lock(); try { - long size = getSize(name); + ensureOpen(); + Long fileId = getFileIdOrNull(name); + if (fileId == null) { + return -1; + } + long size = getSizeInternal(fileId); if (position >= size) { return -1; @@ -123,29 +461,43 @@ public class RocksdbFileStore { int f = offset; int n = len; - ByteBuf keyBuf = null; ByteBuf valBuf = getDataValueBuf(); - ByteBuffer valBuffer = valBuf.nioBuffer(0, BLOCK_SIZE); - try { + ByteBuffer valBuffer = valBuf.nioBuffer(0, blockSize); + boolean shouldSeekTo = true; + try (RocksIterator it = db.newIterator(data, DEFAULT_IT_READ_OPTS)) { int m; int r; int i;
do { - m = (int) (p % (long) BLOCK_SIZE); - r = Math.min(BLOCK_SIZE - m, n); - i = (int) (p / (long) BLOCK_SIZE); + m = (int) (p % (long) blockSize); + r = Math.min(blockSize - m, n); + i = (int) (p / (long) blockSize); - keyBuf = getDataKey(keyBuf, name, i); - if (db.get(data, DEFAULT_READ_OPTS, keyBuf.nioBuffer(), valBuffer) - == RocksDB.NOT_FOUND) { - throw new IOException("Block " + name + ":" + i + " not found"); + //System.out.println("Reading block " + name + "(" + fileId + "):" + i); + + if (shouldSeekTo) { + shouldSeekTo = false; + ByteBuf dataKey = getDataKey(null, fileId, i); + try { + it.seek(dataKey.nioBuffer()); + } finally { + dataKey.release(); + } + if (!it.isValid()) { + throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found"); + } + } else { + it.next(); + if (!it.isValid()) { + throw new IOException("Block " + name + "(" + fileId + ")" + ":" + i + " not found"); + } + assert Arrays.equals(getDataKeyByteArray(fileId, i), it.key()); } - valBuf.writerIndex(BLOCK_SIZE); + int dataRead = it.value(valBuffer); + valBuf.writerIndex(dataRead); valBuf.getBytes(m, buf, f, r); - keyBuf.writerIndex(0); - keyBuf.readerIndex(0); valBuf.writerIndex(0); valBuf.readerIndex(0); @@ -156,15 +508,12 @@ public class RocksdbFileStore { return (int) (p - position); } finally { - if (keyBuf != null) { - keyBuf.release(); - } valBuf.release(); } } catch (RocksDBException ex) { throw new IOException(ex); } finally { - lock.readLock().unlock(); + l.unlock(); } } @@ -172,12 +521,40 @@ public class RocksdbFileStore { * @return not exist return -1 */ public long getSize(String key) throws IOException { - lock.readLock().lock(); + var l = metaLock.get(key).readLock(); + l.lock(); try { - ByteBuf metaKey = getMetaKey(key); + ensureOpen(); + return getSizeInternal(key); + } finally { + l.unlock(); + } + } + + /** + * @return not exist return -1 + */ + private long getSizeInternal(String key) throws IOException { + try { + Long fileId = getFileIdOrNull(key); + if (fileId == null) { + return -1; + } + return getSizeInternal(fileId); + } catch (RocksDBException ex) { + throw new IOException(ex); + } + } + + /** + * @return not exist return -1 + */ + private long getSizeInternal(long fileId) throws IOException { + try { + ByteBuf metaKey = getMetaKey(fileId); ByteBuf metaData = getMetaValueBuf(); try { - if (db.get(meta, DEFAULT_READ_OPTS, metaKey.nioBuffer(), metaData.internalNioBuffer(0, Long.BYTES)) + if (db.get(size, DEFAULT_READ_OPTS, metaKey.nioBuffer(), metaData.internalNioBuffer(0, Long.BYTES)) != RocksDB.NOT_FOUND) { metaData.writerIndex(Long.BYTES); return metaData.readLongLE(); @@ -190,32 +567,39 @@ public class RocksdbFileStore { } } catch (RocksDBException ex) { throw new IOException(ex); - } finally { - lock.readLock().unlock(); } } public void remove(String key) throws IOException { - lock.writeLock().lock(); + var l = metaLock.get(key).writeLock(); + l.lock(); try { - long size = getSize(key); - + ensureOpen(); + Long fileId = getFileIdOrNull(key); + if (fileId == null) { + return; + } + long size; + size = getSizeInternal(fileId); if (size == -1) { return; } ByteBuf dataKey = null; try { - int n = (int) ((size + BLOCK_SIZE - 1) / BLOCK_SIZE); - for (int i = 0; i < n; i++) { - dataKey = getDataKey(dataKey, key, i); + int n = (int) ((size + blockSize - 1) / blockSize); + if (n == 1) { + dataKey = getDataKey(dataKey, fileId, 0); db.delete(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer()); - dataKey.readerIndex(0); - dataKey.writerIndex(0); + } else if (n > 1) { + var 
dataKey1 = getDataKeyByteArray(fileId, 0); + // deleteRange's end key is exclusive: bound at block n so that block n - 1 is deleted too + var dataKey2 = getDataKeyByteArray(fileId, n); + db.deleteRange(data, DEFAULT_WRITE_OPTS, dataKey1, dataKey2); } - ByteBuf metaKey = getMetaKey(key); + ByteBuf metaKey = getMetaKey(fileId); try { - db.delete(meta, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(0, Long.BYTES)); + deallocateFilename(key); + db.delete(this.size, DEFAULT_WRITE_OPTS, metaKey.nioBuffer()); } finally { metaKey.release(); } @@ -227,84 +611,118 @@ public class RocksdbFileStore { } catch (RocksDBException ex) { throw new IOException(ex); } finally { - lock.writeLock().unlock(); + l.unlock(); } } public void clear() throws IOException { - lock.writeLock().lock(); + Lock[] locks = new Lock[metaLock.size()]; + for (int i = 0; i < metaLock.size(); i++) { + locks[i] = metaLock.getAt(i).writeLock(); + } + for (Lock lock : locks) { + lock.lock(); + } try { - List<String> keySet = listKey(); + ensureOpen(); + List<String> keySet = listKeyInternal(); for (String key : keySet) { remove(key); } } finally { - lock.writeLock().unlock(); + for (Lock lock : locks) { + lock.unlock(); + } } } public List<String> listKey() { - List<String> keys = new ArrayList<>(); - lock.readLock().lock(); + Lock[] locks = new Lock[metaLock.size()]; + for (int i = 0; i < metaLock.size(); i++) { + locks[i] = metaLock.getAt(i).readLock(); + } + for (Lock lock : locks) { + lock.lock(); + } try { - RocksIterator iterator = db.newIterator(meta); - iterator.seekToFirst(); - while (iterator.isValid()) { - keys.add(new String(iterator.key()).intern()); - iterator.next(); - } + ensureOpen(); + return listKeyInternal(); } finally { - lock.readLock().unlock(); + for (Lock lock : locks) { + lock.unlock(); + } + } + } + + private List<String> listKeyInternal() { + List<String> keys = new ArrayList<>(); + // RocksIterator owns native resources, so close it deterministically + try (RocksIterator iterator = db.newIterator(filename)) { + iterator.seekToFirst(); + while (iterator.isValid()) { + keys.add(new String(iterator.key(), StandardCharsets.US_ASCII).intern()); + iterator.next(); + } } return keys; } public void append(String name, byte[] buf, int offset, int len) throws IOException { - lock.writeLock().lock(); + var l = metaLock.get(name).writeLock(); + l.lock(); try { - long size = getSize(name); + ensureOpen(); + long size; + long fileId; + int f; + int n; + size = getSizeInternal(name); if (size == -1) { size = 0; } - int f = offset; - int n = len; + f = offset; + n = len; + fileId = getFileIdOrAllocate(name); ByteBuf dataKey = null; ByteBuf bb = getDataValueBuf(); try { - do { - int m = (int) (size % (long) BLOCK_SIZE); - int r = Math.min(BLOCK_SIZE - m, n); + try (var wb = new WriteBatch(len)) { + do { + int m = (int) (size % (long) blockSize); + int r = Math.min(blockSize - m, n); - int i = (int) ((size) / (long) BLOCK_SIZE); - dataKey = getDataKey(dataKey, name, i); - if (m != 0) { - if (db.get(data, DEFAULT_READ_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, BLOCK_SIZE)) - == RocksDB.NOT_FOUND) { - throw new IOException("Block " + name + ":" + i + " not found"); + int i = (int) ((size) / (long) blockSize); + dataKey = getDataKey(dataKey, fileId, i); + if (m != 0) { + int dataRead; + if ((dataRead = db.get(data, DEFAULT_READ_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, blockSize))) + == RocksDB.NOT_FOUND) { + throw new IOException("Block " + name + "(" + fileId + "):" + i + " not found"); + } + bb.writerIndex(dataRead); + dataKey.readerIndex(0); + } else { + bb.writerIndex(0); } - bb.writerIndex(BLOCK_SIZE); + + bb.ensureWritable(r); + bb.setBytes(m, buf, f, r); + + wb.put(data, dataKey.nioBuffer(), bb.internalNioBuffer(0, m + r)); +
incFlush(); + size += r; + f += r; + n -= r; + dataKey.readerIndex(0); - } else { + dataKey.writerIndex(0); + bb.readerIndex(0); bb.writerIndex(0); - } - - bb.ensureWritable(BLOCK_SIZE); - bb.writerIndex(BLOCK_SIZE); - bb.setBytes(m, buf, f, r); - - db.put(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer(), bb.nioBuffer(0, BLOCK_SIZE)); - size += r; - f += r; - n -= r; - - dataKey.readerIndex(0); - dataKey.writerIndex(0); - bb.readerIndex(0); - bb.writerIndex(0); - } while (n != 0); + } while (n != 0); + db.write(DEFAULT_WRITE_OPTS, wb); + wb.clear(); + } } finally { if (dataKey != null) { dataKey.release(); @@ -312,11 +730,12 @@ public class RocksdbFileStore { bb.release(); } - ByteBuf metaKey = getMetaKey(name); + ByteBuf metaKey = getMetaKey(fileId); ByteBuf metaValue = getMetaValueBuf(); try { metaValue.writeLongLE(size); - db.put(meta, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(), metaValue.nioBuffer(0, Long.BYTES)); + db.put(this.size, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(), metaValue.nioBuffer(0, Long.BYTES)); + incFlush(); } finally { metaValue.release(); metaKey.release(); @@ -324,64 +743,63 @@ public class RocksdbFileStore { } catch (RocksDBException ex) { throw new IOException(ex); } finally { - lock.writeLock().unlock(); + l.unlock(); } } public void move(String source, String dest) throws IOException { - - lock.writeLock().lock(); + var locks = metaLock.bulkGet(List.of(source, dest)); + for (ReadWriteLock lock : locks) { + lock.writeLock().lock(); + } try { - - long s_size = getSize(source); - var metaKey = getMetaKey(dest); - var metaValue = getMetaValueBuf(); - try { - metaValue.writeLongLE(s_size); - db.put(meta, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(), metaValue.nioBuffer(0, Long.BYTES)); - } finally { - metaValue.release(); - metaKey.release(); - } - - int n = (int) ((s_size + BLOCK_SIZE - 1) / BLOCK_SIZE); - - ByteBuf sourceKey = null; - ByteBuf destKey = null; - ByteBuf dataBuf = getDataValueBuf(); - try { - for (int i = 0; i < n; i++) { - sourceKey = getDataKey(sourceKey, source, i); - destKey = getDataKey(destKey, dest, i); - var nioBuf = dataBuf.nioBuffer(0, BLOCK_SIZE); - nioBuf.limit(BLOCK_SIZE); - db.get(data, DEFAULT_READ_OPTS, sourceKey.nioBuffer(), nioBuf); - nioBuf.position(BLOCK_SIZE); - db.put(data, DEFAULT_WRITE_OPTS, destKey.nioBuffer(), nioBuf); - sourceKey.writerIndex(0); - sourceKey.readerIndex(0); - destKey.writerIndex(0); - destKey.readerIndex(0); - } - } finally { - if (sourceKey != null) { - sourceKey.release(); - } - if (destKey != null) { - destKey.release(); - } - dataBuf.release(); - } - remove(source); + ensureOpen(); + long sourceFileId = getFileId(source); + moveFileId(sourceFileId, source, dest); } catch (RocksDBException ex) { throw new IOException(ex); } finally { - lock.writeLock().unlock(); + for (ReadWriteLock lock : locks) { + lock.writeLock().unlock(); + } } + } + private void ensureOpen() { + if (closed) { + throw new AlreadyClosedException("Index already closed"); + } } public void close() throws IOException { - db.close(); + if (closed) { + return; + } + Lock[] locks = new Lock[metaLock.size()]; + for (int i = 0; i < metaLock.size(); i++) { + locks[i] = metaLock.getAt(i).writeLock(); + } + for (Lock lock : locks) { + lock.lock(); + } + try { + if (closed) { + return; + } + db.close(); + closed = true; + } finally { + for (Lock lock : locks) { + lock.unlock(); + } + } + } + + public void sync() throws RocksDBException { + /* + db.flushWal(true); + db.flush(new FlushOptions().setAllowWriteStall(true).setWaitForFlush(true)); + + */ } } \ No 
newline at end of file
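
Note on the codec flag: LLLocalLuceneIndex now computes boolean compressCodec = !luceneOptions.directoryOptions().isStorageCompressed(), but the hunk that consumes the flag is not part of this patch. The sketch below is a hypothetical wiring, assuming the intent is to avoid double compression: when the RocksDB directory already applies LZ4 to its data column family, the Lucene codec can stay on BEST_SPEED. CodecSelectionSketch, configFor and the setCodec call are illustrative, not taken from the patch.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.lucene90.Lucene90Codec;
import org.apache.lucene.codecs.lucene90.Lucene90Codec.Mode;
import org.apache.lucene.index.IndexWriterConfig;

public final class CodecSelectionSketch {

  static IndexWriterConfig configFor(boolean storageCompressed) {
    // Mirrors the patch's flag: compress at the codec level only when storage does not
    boolean compressCodec = !storageCompressed;
    IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
    config.setCodec(new Lucene90Codec(compressCodec ? Mode.BEST_COMPRESSION : Mode.BEST_SPEED));
    return config;
  }
}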
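
The store replaces its single global ReentrantReadWriteLock with Guava's Striped read-write locks keyed by file name, shared between RocksdbDirectory and RocksdbFileStore. A minimal self-contained sketch of the two patterns used above, assuming Guava is on the classpath (StripedLockSketch and its method names are illustrative):

import com.google.common.util.concurrent.Striped;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

public final class StripedLockSketch {

  // 64 stripes: two names may hash to the same stripe, which only costs
  // some concurrency, never correctness.
  private final Striped<ReadWriteLock> metaLock = Striped.readWriteLock(64);

  void withFileReadLock(String name, Runnable body) {
    Lock l = metaLock.get(name).readLock();
    l.lock();
    try {
      body.run(); // e.g. getSize/load for one file
    } finally {
      l.unlock();
    }
  }

  void withPairWriteLock(String source, String dest, Runnable body) {
    // bulkGet returns the stripes in a stable order, so two concurrent
    // renames over the same pair of names cannot deadlock.
    Iterable<ReadWriteLock> locks = metaLock.bulkGet(List.of(source, dest));
    for (ReadWriteLock rw : locks) {
      rw.writeLock().lock();
    }
    try {
      body.run(); // e.g. move(source, dest)
    } finally {
      for (ReadWriteLock rw : locks) {
        rw.writeLock().unlock();
      }
    }
  }
}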
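
Data keys are 12 bytes: the file id (little-endian long) followed by the block index (big-endian int). The index must be big-endian because RocksDB's default comparator orders keys as unsigned byte strings, and load() depends on it.next() landing exactly on block i + 1 after block i; the file id can stay little-endian because it is a fixed-width prefix shared by every block of a file. A self-contained check of that ordering property (BlockKeyOrder is an illustrative name):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public final class BlockKeyOrder {

  // Same layout as the store's data keys: LE file id, BE block index.
  static byte[] dataKey(long fileId, int block) {
    ByteBuffer bb = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
    bb.order(ByteOrder.LITTLE_ENDIAN).putLong(fileId);
    bb.order(ByteOrder.BIG_ENDIAN).putInt(block);
    return bb.array();
  }

  public static void main(String[] args) {
    // Unsigned lexicographic comparison, like RocksDB's default comparator.
    for (int i = 0; i < 100_000; i++) {
      if (Arrays.compareUnsigned(dataKey(42L, i), dataKey(42L, i + 1)) >= 0) {
        throw new AssertionError("keys out of order at block " + i);
      }
    }
    System.out.println("block keys sort in numeric block order");
  }
}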
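
load() now opens one iterator per call, seeks to the first requested block, and advances with next() instead of issuing a point get per block; DEFAULT_IT_READ_OPTS adds a 1 MiB readahead hint and skips checksum verification for these sequential scans. A minimal sketch of that access pattern (the path and sample data are illustrative):

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public final class SequentialScanSketch {

  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
        RocksDB db = RocksDB.open(options, "/tmp/scan-sketch");
        ReadOptions readOpts = new ReadOptions()
            .setReadaheadSize(1024 * 1024) // sequential-scan hint, as in the patch
            .setVerifyChecksums(false)) {
      for (byte i = 0; i < 4; i++) {
        db.put(new byte[] {i}, new byte[] {(byte) (i * 10)});
      }
      try (RocksIterator it = db.newIterator(readOpts)) {
        it.seek(new byte[] {1}); // position once on the first wanted key...
        while (it.isValid()) {   // ...then step forward instead of point gets
          System.out.println(it.key()[0] + " -> " + it.value()[0]);
          it.next();
        }
      }
    }
  }
}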
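
The change the commit is named after: append() collects every block written by one call into a WriteBatch and commits it with a single db.write(), so a multi-block append is applied atomically and avoids per-put overhead. A standalone sketch (path, keys and the capacity hint are illustrative; as with new WriteBatch(len) above, the constructor argument only reserves memory):

import java.nio.charset.StandardCharsets;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public final class WriteBatchSketch {

  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
        RocksDB db = RocksDB.open(options, "/tmp/wb-sketch");
        WriteOptions writeOpts = new WriteOptions();
        WriteBatch wb = new WriteBatch(1024)) {
      for (int block = 0; block < 3; block++) {
        wb.put(("file1:block" + block).getBytes(StandardCharsets.US_ASCII),
            new byte[] {(byte) block});
      }
      db.write(writeOpts, wb); // all three puts become visible atomically
    }
  }
}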
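
remove() drops all of a file's blocks with a single deleteRange instead of one delete per block. RocksDB's deleteRange treats the end key as exclusive, which is why the range above must end at block n rather than block n - 1. A standalone demonstration (path and keys are illustrative):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class DeleteRangeSketch {

  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
        RocksDB db = RocksDB.open(options, "/tmp/dr-sketch")) {
      db.put(new byte[] {0}, new byte[] {10});
      db.put(new byte[] {1}, new byte[] {11});
      db.put(new byte[] {2}, new byte[] {12});
      // Deletes keys in [0x00, 0x02): key 0x02 survives the call.
      db.deleteRange(new byte[] {0}, new byte[] {2});
      System.out.println(db.get(new byte[] {2}) != null); // prints true
    }
  }
}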