Create rocksdb directory

Andrea Cavalli 2022-02-26 22:51:22 +01:00
parent 743919b831
commit 86263af6f7
13 changed files with 1019 additions and 110 deletions

LuceneDirectoryOptions.java

@@ -0,0 +1,119 @@
package it.cavallium.dbengine.client;
import it.cavallium.dbengine.lucene.directory.RocksdbDirectory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Constants;
public sealed interface LuceneDirectoryOptions {
Directory createLuceneDirectory(String directoryName) throws IOException;
Optional<Path> getManagedPath();
record ByteBuffersDirectory() implements LuceneDirectoryOptions {
@Override
public Directory createLuceneDirectory(String directoryName) {
return new org.apache.lucene.store.ByteBuffersDirectory();
}
@Override
public Optional<Path> getManagedPath() {
return Optional.empty();
}
}
record MemoryMappedFSDirectory(Path managedPath) implements StandardFSDirectoryOptions {
@Override
public FSDirectory createLuceneDirectory(String directoryName) throws IOException {
return FSDirectory.open(managedPath.resolve(directoryName + ".lucene.db"));
}
}
record NIOFSDirectory(Path managedPath) implements StandardFSDirectoryOptions {
@Override
public FSDirectory createLuceneDirectory(String directoryName) throws IOException {
return org.apache.lucene.store.NIOFSDirectory.open(managedPath.resolve(directoryName + ".lucene.db"));
}
}
record DirectIOFSDirectory(StandardFSDirectoryOptions delegate, Optional<Integer> mergeBufferSize,
Optional<Long> minBytesDirect) implements LuceneDirectoryOptions {
private static final Logger logger = LogManager.getLogger(DirectIOFSDirectory.class);
@Override
public Directory createLuceneDirectory(String directoryName) throws IOException {
var delegateDirectory = delegate.createLuceneDirectory(directoryName);
if (Constants.LINUX || Constants.MAC_OS_X) {
try {
int mergeBufferSize = mergeBufferSize().orElse(DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE);
long minBytesDirect = minBytesDirect().orElse(DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT);
return new DirectIODirectory(delegateDirectory, mergeBufferSize, minBytesDirect);
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
return delegateDirectory;
}
} else {
logger.warn("Failed to open FSDirectory with DIRECT flag because the operating system is Windows");
return delegateDirectory;
}
}
@Override
public Optional<Path> getManagedPath() {
return delegate.getManagedPath();
}
}
record RocksDBDirectory(Path managedPath) implements PathDirectoryOptions {
@Override
public Directory createLuceneDirectory(String directoryName) throws IOException {
return new RocksdbDirectory(managedPath.resolve(directoryName + ".lucene.db"));
}
}
record NRTCachingDirectory(LuceneDirectoryOptions delegate, long maxMergeSizeBytes, long maxCachedBytes) implements
LuceneDirectoryOptions {
@Override
public Directory createLuceneDirectory(String directoryName) throws IOException {
var delegateDirectory = delegate.createLuceneDirectory(directoryName);
return new org.apache.lucene.store.NRTCachingDirectory(delegateDirectory,
maxMergeSizeBytes / 1024D / 1024D,
maxCachedBytes / 1024D / 1024D
);
}
@Override
public Optional<Path> getManagedPath() {
return delegate.getManagedPath();
}
}
sealed interface StandardFSDirectoryOptions extends PathDirectoryOptions {
@Override
FSDirectory createLuceneDirectory(String directoryName) throws IOException;
}
sealed interface PathDirectoryOptions extends LuceneDirectoryOptions {
Path managedPath();
@Override
default Optional<Path> getManagedPath() {
return Optional.of(managedPath());
}
}
}
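
A rough usage sketch (not part of this commit; the base path and cache sizes are made up) showing how these records compose, here an NRT cache over direct I/O over an NIO FSDirectory:

import it.cavallium.dbengine.client.LuceneDirectoryOptions.DirectIOFSDirectory;
import it.cavallium.dbengine.client.LuceneDirectoryOptions.NIOFSDirectory;
import it.cavallium.dbengine.client.LuceneDirectoryOptions.NRTCachingDirectory;
import java.nio.file.Path;
import java.util.Optional;
import org.apache.lucene.store.Directory;

class DirectoryOptionsExample {
    public static void main(String[] args) throws Exception {
        var options = new NRTCachingDirectory(
                new DirectIOFSDirectory(
                        new NIOFSDirectory(Path.of("/tmp/lucene-indexes")), // hypothetical managed path
                        Optional.empty(), // falls back to DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE
                        Optional.empty()  // falls back to DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT
                ),
                32L * 1024 * 1024, // maxMergeSizeBytes
                64L * 1024 * 1024  // maxCachedBytes
        );
        // Resolves /tmp/lucene-indexes/example.lucene.db, wraps it in DirectIODirectory
        // (on Linux/macOS only) and then in Lucene's NRTCachingDirectory
        try (Directory directory = options.createLuceneDirectory("example")) {
            System.out.println(directory);
        }
    }
}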

LuceneOptions.java

@@ -4,6 +4,7 @@ import io.soabase.recordbuilder.core.RecordBuilder;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.lucene.store.Directory;
import org.jetbrains.annotations.Nullable;
@RecordBuilder
@@ -11,12 +12,10 @@ public record LuceneOptions(Map<String, String> extraFlags,
Duration queryRefreshDebounceTime,
Duration commitDebounceTime,
boolean lowMemory,
boolean inMemory,
Optional<DirectIOOptions> directIOOptions,
boolean allowMemoryMapping,
Optional<NRTCachingOptions> nrtCachingOptions,
LuceneDirectoryOptions directoryOptions,
long indexWriterBufferSize,
boolean applyAllDeletes,
boolean writeAllDeletes,
boolean allowNonVolatileCollection,
int maxInMemoryResultEntries) {}
int maxInMemoryResultEntries) {
}

NRTCachingOptions.java

@@ -1,6 +0,0 @@
package it.cavallium.dbengine.client;
import io.soabase.recordbuilder.core.RecordBuilder;
@RecordBuilder
public record NRTCachingOptions(double maxMergeSizeMB, double maxCachedMB) {}

LLLocalDatabaseConnection.java

@@ -105,7 +105,6 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
}
Objects.requireNonNull(env, "Environment not set");
return new LLLocalMultiLuceneIndex(env,
luceneOptions.inMemory() ? null : basePath.resolve("lucene"),
meterRegistry,
clusterName,
instancesCount,
@@ -115,7 +114,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
luceneHacks
);
} else {
return new LLLocalLuceneIndex(env, luceneOptions.inMemory() ? null : basePath.resolve("lucene"),
return new LLLocalLuceneIndex(env,
meterRegistry,
clusterName,
shardName,

LLLocalLuceneIndex.java

@@ -11,11 +9,9 @@ import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.net5.buffer.api.Send;
import it.cavallium.dbengine.client.DirectIOOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.client.query.QueryParser;
import it.cavallium.dbengine.client.query.current.data.Query;
import it.cavallium.dbengine.client.query.current.data.QueryParams;
@@ -28,7 +26,6 @@ import it.cavallium.dbengine.database.LLTerm;
import it.cavallium.dbengine.database.LLUpdateDocument;
import it.cavallium.dbengine.database.LLUpdateFields;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.lucene.AlwaysDirectIOFSDirectory;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.lucene.LuceneUtils;
import it.cavallium.dbengine.lucene.collector.Buckets;
@@ -63,15 +60,9 @@ import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.SimpleMergedSegmentWarmer;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.InfoStream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -121,7 +112,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
private final AtomicBoolean closeRequested = new AtomicBoolean();
public LLLocalLuceneIndex(LLTempLMDBEnv env,
@Nullable Path luceneBasePath,
MeterRegistry meterRegistry,
@Nullable String clusterName,
@Nullable String shardName,
@@ -135,14 +125,6 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
String logName = Objects.requireNonNullElse(clusterName, shardName);
String luceneIndexName = Objects.requireNonNullElse(shardName, clusterName);
Path directoryPath;
if (luceneOptions.inMemory() != (luceneBasePath == null)) {
throw new IllegalArgumentException();
} else if (luceneBasePath != null) {
directoryPath = luceneBasePath.resolve(shardName + ".lucene.db");
} else {
directoryPath = null;
}
if (luceneIndexName.length() == 0) {
throw new IOException("Empty lucene database name");
}
@@ -151,67 +133,11 @@
} else {
logger.debug("Lucene MMap is supported");
}
boolean lowMemory = luceneOptions.lowMemory();
if (luceneOptions.inMemory()) {
this.directory = new ByteBuffersDirectory();
} else {
Directory directory;
{
Directory forcedDirectFsDirectory = null;
if (luceneOptions.directIOOptions().isPresent()) {
DirectIOOptions directIOOptions = luceneOptions.directIOOptions().get();
if (directIOOptions.alwaysForceDirectIO()) {
try {
forcedDirectFsDirectory = new AlwaysDirectIOFSDirectory(directoryPath);
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
}
}
}
if (forcedDirectFsDirectory != null) {
directory = forcedDirectFsDirectory;
} else {
FSDirectory fsDirectory;
if (luceneOptions.allowMemoryMapping()) {
fsDirectory = FSDirectory.open(directoryPath);
} else {
fsDirectory = new NIOFSDirectory(directoryPath);
}
if (Constants.LINUX || Constants.MAC_OS_X) {
try {
int mergeBufferSize;
long minBytesDirect;
if (luceneOptions.directIOOptions().isPresent()) {
var directIOOptions = luceneOptions.directIOOptions().get();
mergeBufferSize = directIOOptions.mergeBufferSize();
minBytesDirect = directIOOptions.minBytesDirect();
} else {
mergeBufferSize = DirectIODirectory.DEFAULT_MERGE_BUFFER_SIZE;
minBytesDirect = DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT;
}
directory = new DirectIODirectory(fsDirectory, mergeBufferSize, minBytesDirect);
} catch (UnsupportedOperationException ex) {
logger.warn("Failed to open FSDirectory with DIRECT flag", ex);
directory = fsDirectory;
}
} else {
directory = fsDirectory;
}
}
}
if (luceneOptions.nrtCachingOptions().isPresent()) {
NRTCachingOptions nrtCachingOptions = luceneOptions.nrtCachingOptions().get();
directory = new NRTCachingDirectory(directory, nrtCachingOptions.maxMergeSizeMB(),
nrtCachingOptions.maxCachedMB());
}
this.directory = directory;
}
this.lowMemory = luceneOptions.lowMemory();
this.directory = luceneOptions.directoryOptions().createLuceneDirectory(luceneIndexName);
this.luceneIndexName = luceneIndexName;
var snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.lowMemory = lowMemory;
this.luceneAnalyzer = LuceneUtils.toPerFieldAnalyzerWrapper(indicizerAnalyzers);
this.luceneSimilarity = LuceneUtils.toPerFieldSimilarityWrapper(indicizerSimilarities);
@@ -239,7 +165,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
var concurrentMergeScheduler = new ConcurrentMergeScheduler();
// false means SSD, true means HDD
concurrentMergeScheduler.setDefaultMaxMergesAndThreads(false);
if (luceneOptions.inMemory()) {
if (luceneOptions.directoryOptions().getManagedPath().isEmpty()) {
concurrentMergeScheduler.disableAutoIOThrottle();
} else {
concurrentMergeScheduler.enableAutoIOThrottle();

LLLocalMultiLuceneIndex.java

@@ -66,7 +66,6 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
private final DecimalBucketMultiSearcher decimalBucketMultiSearcher = new DecimalBucketMultiSearcher();
public LLLocalMultiLuceneIndex(LLTempLMDBEnv env,
Path lucene,
MeterRegistry meterRegistry,
String clusterName,
int instancesCount,
@@ -89,7 +88,6 @@ public class LLLocalMultiLuceneIndex implements LLLuceneIndex {
shardName = clusterName + "_" + String.format("%03d", i);
}
luceneIndices[i] = new LLLocalLuceneIndex(env,
lucene,
meterRegistry,
clusterName,
shardName,

LLMemoryDatabaseConnection.java

@@ -5,7 +5,9 @@ import io.net5.buffer.api.BufferAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneDirectoryOptions.ByteBuffersDirectory;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.LuceneOptionsBuilder;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLDatabaseConnection;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
@@ -14,7 +16,6 @@ import it.cavallium.dbengine.database.disk.LLLocalLuceneIndex;
import it.cavallium.dbengine.database.disk.LLTempLMDBEnv;
import it.cavallium.dbengine.lucene.LuceneHacks;
import it.cavallium.dbengine.netty.JMXNettyMonitoringManager;
import java.nio.file.Files;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -84,19 +85,22 @@ public class LLMemoryDatabaseConnection implements LLDatabaseConnection {
int instancesCount,
IndicizerAnalyzers indicizerAnalyzers,
IndicizerSimilarities indicizerSimilarities,
LuceneOptions luceneOptions,
LuceneOptions inputLuceneOptions,
@Nullable LuceneHacks luceneHacks) {
var memoryLuceneOptions = LuceneOptionsBuilder
.builder(inputLuceneOptions)
.directoryOptions(new ByteBuffersDirectory())
.build();
return Mono
.<LLLuceneIndex>fromCallable(() -> {
var env = this.env.get();
return new LLLocalLuceneIndex(env,
null,
meterRegistry,
clusterName,
shardName,
indicizerAnalyzers,
indicizerSimilarities,
luceneOptions,
memoryLuceneOptions,
luceneHacks
);
})

RocksdbDirectory.java

@@ -0,0 +1,202 @@
package it.cavallium.dbengine.lucene.directory;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.*;
import org.apache.lucene.util.Accountable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.rocksdb.RocksDBException;
/**
* Created by wens on 16-3-10.
*/
public class RocksdbDirectory extends BaseDirectory implements Accountable {
private static final int BUFFER_SIZE = 10 * 1024;
protected final RocksdbFileStore store;
protected final AtomicLong sizeInBytes = new AtomicLong();
/** Used to generate temp file names in {@link #createTempOutput}. */
private final AtomicLong nextTempFileCounter = new AtomicLong();
public RocksdbDirectory(Path path) throws IOException {
this(path, new SingleInstanceLockFactory());
}
/**
* Base constructor; the other constructors delegate to this one.
*
*/
protected RocksdbDirectory(Path path, LockFactory lockFactory) throws IOException {
super(lockFactory);
store = new RocksdbFileStore(path);
}
public RocksdbDirectory(Path path, FSDirectory dir, IOContext context) throws IOException {
this(path, dir, false, context);
}
private RocksdbDirectory(Path path, FSDirectory dir, boolean closeDir, IOContext context) throws IOException {
this(path);
for (String file : dir.listAll()) {
if (!Files.isDirectory(dir.getDirectory().resolve(file))) {
copyFrom(dir, file, file, context);
}
}
if (closeDir) {
dir.close();
}
}
@Override
public final String[] listAll() {
ensureOpen();
return store.listKey().toArray(String[]::new);
}
/**
* Returns the length in bytes of a file in the directory.
*
* @throws IOException if the file does not exist
*/
@Override
public final long fileLength(String name) throws IOException {
ensureOpen();
long size = store.getSize(name);
if (size == -1) {
throw new FileNotFoundException(name);
}
return size;
}
/**
* Removes an existing file in the directory.
*
* @throws IOException if the file does not exist
*/
@Override
public void deleteFile(String name) throws IOException {
ensureOpen();
long size = store.getSize(name);
if (size != -1) {
sizeInBytes.addAndGet(-size);
store.remove(name);
} else {
throw new FileNotFoundException(name);
}
}
/**
* Creates a new, empty file in the directory with the given name. Returns a stream writing this file.
*/
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
try {
if (store.contains(name)) {
store.remove(name);
}
return new RocksdbOutputStream(name, store, BUFFER_SIZE, true);
} catch (RocksDBException ex) {
throw new IOException(ex);
}
}
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
ensureOpen();
String name = getTempFileName(prefix, suffix, nextTempFileCounter.getAndIncrement());
return new RocksdbOutputStream(name, store, BUFFER_SIZE, true);
}
/**
* Creates a file name for a temporary file. The name will start with {@code prefix}, end with
* {@code suffix} and have a reserved file extension {@code .tmp}.
*
* @see #createTempOutput(String, String, IOContext)
*/
protected static String getTempFileName(String prefix, String suffix, long counter) {
return IndexFileNames.segmentFileName(
prefix, suffix + "_" + Long.toString(counter, Character.MAX_RADIX), "tmp");
}
@Override
public void sync(Collection<String> names) {
}
@Override
public void syncMetaData() {
}
@Override
public void rename(String source, String dest) throws IOException {
ensureOpen();
try {
if (!store.contains(source)) {
throw new FileNotFoundException(source);
}
store.move(source, dest);
} catch (RocksDBException ex) {
throw new IOException(ex);
}
}
/**
* Returns a stream reading an existing file.
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
try {
if (!store.contains(name)) {
throw new FileNotFoundException(name);
}
return new RocksdbInputStream(name, store, BUFFER_SIZE);
} catch (RocksDBException ex) {
throw new IOException(ex);
}
}
/**
* Closes the store to future operations, releasing associated memory.
*/
@Override
public void close() {
isOpen = false;
try {
store.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Set<String> getPendingDeletions() {
return Set.of();
}
@Override
public long ramBytesUsed() {
return 0;
}
@Override
public Collection<Accountable> getChildResources() {
return List.of();
}
}
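
A minimal end-to-end sketch (illustrative only; the path and analyzer choice are assumptions, not part of this commit) of indexing through the RocksDB-backed directory:

import it.cavallium.dbengine.lucene.directory.RocksdbDirectory;
import java.nio.file.Path;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;

class RocksdbDirectoryExample {
    public static void main(String[] args) throws Exception {
        try (var directory = new RocksdbDirectory(Path.of("/tmp/example.lucene.db"));
                var writer = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()))) {
            var doc = new Document();
            doc.add(new TextField("body", "stored as 10 KiB blocks inside RocksDB", Store.YES));
            writer.addDocument(doc);
            writer.commit(); // segment files become RocksDB key-value entries, not files on disk
        }
    }
}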

RocksdbFileStore.java

@@ -0,0 +1,387 @@
package it.cavallium.dbengine.lucene.directory;
import io.net5.buffer.ByteBuf;
import io.net5.buffer.ByteBufAllocator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
/**
* Created by wens on 16-3-10.
*/
public class RocksdbFileStore {
static {
RocksDB.loadLibrary();
}
private static final int BLOCK_SIZE = 10 * 1024;
private static final ReadOptions DEFAULT_READ_OPTS = new ReadOptions();
private static final WriteOptions DEFAULT_WRITE_OPTS = new WriteOptions();
private static final ByteBuffer EMPTY_BYTE_BUF = ByteBuffer.allocateDirect(0);
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final RocksDB db;
private final ColumnFamilyHandle meta;
private final ColumnFamilyHandle data;
public RocksdbFileStore(Path path) throws IOException {
var options = new DBOptions();
options.setCreateIfMissing(true);
if (Files.notExists(path)) {
Files.createDirectories(path);
}
options.setCreateMissingColumnFamilies(true);
try {
var handles = new ArrayList<ColumnFamilyHandle>(2);
this.db = RocksDB.open(options,
path.toString(),
List.of(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
new ColumnFamilyDescriptor("metadata".getBytes(StandardCharsets.US_ASCII))
),
handles
);
// handles.get(0) is the default column family (file blocks); handles.get(1) is "metadata" (file sizes)
this.data = handles.get(0);
this.meta = handles.get(1);
} catch (RocksDBException e) {
throw new IOException("Failed to open RocksDB meta file store", e);
}
}
public boolean contains(String key) throws RocksDBException {
lock.readLock().lock();
try {
ByteBuf metaKey = getMetaKey(key);
try {
return db.get(meta, DEFAULT_READ_OPTS, metaKey.nioBuffer(), EMPTY_BYTE_BUF) != RocksDB.NOT_FOUND;
} finally {
metaKey.release();
}
} finally {
lock.readLock().unlock();
}
}
private ByteBuf getMetaValueBuf() {
return ByteBufAllocator.DEFAULT.ioBuffer(Long.BYTES, Long.BYTES);
}
private ByteBuf getDataValueBuf() {
return ByteBufAllocator.DEFAULT.ioBuffer(BLOCK_SIZE, BLOCK_SIZE);
}
private ByteBuf getMetaKey(String key) {
ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(key.length());
buf.writeCharSequence(key, StandardCharsets.US_ASCII);
return buf;
}
private ByteBuf getDataKey(@Nullable ByteBuf buf, String key, int i) {
if (buf == null) {
buf = ByteBufAllocator.DEFAULT.buffer(key.length() + 1 + Integer.BYTES);
}
buf.writerIndex(0);
buf.writeCharSequence(key, StandardCharsets.US_ASCII);
buf.writeByte('\0');
buf.writeIntLE(i);
return buf;
}
public int load(String name, long position, byte[] buf, int offset, int len) throws IOException {
lock.readLock().lock();
try {
long size = getSize(name);
if (position >= size) {
return -1;
}
if (buf.length < offset + len) {
throw new IllegalArgumentException("len is too long");
}
long p = position;
int f = offset;
int n = len;
ByteBuf keyBuf = null;
ByteBuf valBuf = getDataValueBuf();
ByteBuffer valBuffer = valBuf.nioBuffer(0, BLOCK_SIZE);
try {
int m;
int r;
int i;
do {
m = (int) (p % (long) BLOCK_SIZE);
r = Math.min(BLOCK_SIZE - m, n);
i = (int) (p / (long) BLOCK_SIZE);
keyBuf = getDataKey(keyBuf, name, i);
if (db.get(data, DEFAULT_READ_OPTS, keyBuf.nioBuffer(), valBuffer)
== RocksDB.NOT_FOUND) {
throw new IOException("Block " + name + ":" + i + " not found");
}
valBuf.writerIndex(BLOCK_SIZE);
valBuf.getBytes(m, buf, f, r);
keyBuf.writerIndex(0);
keyBuf.readerIndex(0);
valBuf.writerIndex(0);
valBuf.readerIndex(0);
p += r;
f += r;
n -= r;
} while (n != 0 && p < size);
return (int) (p - position);
} finally {
if (keyBuf != null) {
keyBuf.release();
}
valBuf.release();
}
} catch (RocksDBException ex) {
throw new IOException(ex);
} finally {
lock.readLock().unlock();
}
}
/**
* @return not exist return -1
*/
public long getSize(String key) throws IOException {
lock.readLock().lock();
try {
ByteBuf metaKey = getMetaKey(key);
ByteBuf metaData = getMetaValueBuf();
try {
if (db.get(meta, DEFAULT_READ_OPTS, metaKey.nioBuffer(), metaData.internalNioBuffer(0, Long.BYTES))
!= RocksDB.NOT_FOUND) {
metaData.writerIndex(Long.BYTES);
return metaData.readLongLE();
} else {
return -1;
}
} finally {
metaData.release();
metaKey.release();
}
} catch (RocksDBException ex) {
throw new IOException(ex);
} finally {
lock.readLock().unlock();
}
}
public void remove(String key) throws IOException {
lock.writeLock().lock();
try {
long size = getSize(key);
if (size == -1) {
return;
}
ByteBuf dataKey = null;
try {
int n = (int) ((size + BLOCK_SIZE - 1) / BLOCK_SIZE);
for (int i = 0; i < n; i++) {
dataKey = getDataKey(dataKey, key, i);
db.delete(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer());
dataKey.readerIndex(0);
dataKey.writerIndex(0);
}
ByteBuf metaKey = getMetaKey(key);
try {
db.delete(meta, DEFAULT_WRITE_OPTS, metaKey.nioBuffer());
} finally {
metaKey.release();
}
} finally {
if (dataKey != null) {
dataKey.release();
}
}
} catch (RocksDBException ex) {
throw new IOException(ex);
} finally {
lock.writeLock().unlock();
}
}
public void clear() throws IOException {
lock.writeLock().lock();
try {
List<String> keySet = listKey();
for (String key : keySet) {
remove(key);
}
} finally {
lock.writeLock().unlock();
}
}
public List<String> listKey() {
List<String> keys = new ArrayList<>();
lock.readLock().lock();
try {
try (RocksIterator iterator = db.newIterator(meta)) {
iterator.seekToFirst();
while (iterator.isValid()) {
keys.add(new String(iterator.key(), StandardCharsets.US_ASCII));
iterator.next();
}
}
} finally {
lock.readLock().unlock();
}
return keys;
}
public void append(String name, byte[] buf, int offset, int len) throws IOException {
lock.writeLock().lock();
try {
long size = getSize(name);
if (size == -1) {
size = 0;
}
int f = offset;
int n = len;
ByteBuf dataKey = null;
ByteBuf bb = getDataValueBuf();
try {
do {
int m = (int) (size % (long) BLOCK_SIZE);
int r = Math.min(BLOCK_SIZE - m, n);
int i = (int) ((size) / (long) BLOCK_SIZE);
dataKey = getDataKey(dataKey, name, i);
if (m != 0) {
if (db.get(data, DEFAULT_READ_OPTS, dataKey.nioBuffer(), bb.internalNioBuffer(0, BLOCK_SIZE))
== RocksDB.NOT_FOUND) {
throw new IOException("Block " + name + ":" + i + " not found");
}
bb.writerIndex(BLOCK_SIZE);
dataKey.readerIndex(0);
} else {
bb.writerIndex(0);
}
bb.ensureWritable(BLOCK_SIZE);
bb.writerIndex(BLOCK_SIZE);
bb.setBytes(m, buf, f, r);
db.put(data, DEFAULT_WRITE_OPTS, dataKey.nioBuffer(), bb.nioBuffer(0, BLOCK_SIZE));
size += r;
f += r;
n -= r;
dataKey.readerIndex(0);
dataKey.writerIndex(0);
bb.readerIndex(0);
bb.writerIndex(0);
} while (n != 0);
} finally {
if (dataKey != null) {
dataKey.release();
}
bb.release();
}
ByteBuf metaKey = getMetaKey(name);
ByteBuf metaValue = getMetaValueBuf();
try {
metaValue.writeLongLE(size);
db.put(meta, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(), metaValue.nioBuffer(0, Long.BYTES));
} finally {
metaValue.release();
metaKey.release();
}
} catch (RocksDBException ex) {
throw new IOException(ex);
} finally {
lock.writeLock().unlock();
}
}
public void move(String source, String dest) throws IOException {
lock.writeLock().lock();
try {
long s_size = getSize(source);
var metaKey = getMetaKey(dest);
var metaValue = getMetaValueBuf();
try {
metaValue.writeLongLE(s_size);
db.put(meta, DEFAULT_WRITE_OPTS, metaKey.nioBuffer(), metaValue.nioBuffer(0, Long.BYTES));
} finally {
metaValue.release();
metaKey.release();
}
int n = (int) ((s_size + BLOCK_SIZE - 1) / BLOCK_SIZE);
ByteBuf sourceKey = null;
ByteBuf destKey = null;
ByteBuf dataBuf = getDataValueBuf();
try {
for (int i = 0; i < n; i++) {
sourceKey = getDataKey(sourceKey, source, i);
destKey = getDataKey(destKey, dest, i);
var nioBuf = dataBuf.nioBuffer(0, BLOCK_SIZE);
nioBuf.limit(BLOCK_SIZE);
db.get(data, DEFAULT_READ_OPTS, sourceKey.nioBuffer(), nioBuf);
// After get() the position is still 0 and the limit is clamped to the stored
// block length, so the buffer can be written back as-is without repositioning
db.put(data, DEFAULT_WRITE_OPTS, destKey.nioBuffer(), nioBuf);
sourceKey.writerIndex(0);
sourceKey.readerIndex(0);
destKey.writerIndex(0);
destKey.readerIndex(0);
}
} finally {
if (sourceKey != null) {
sourceKey.release();
}
if (destKey != null) {
destKey.release();
}
dataBuf.release();
}
remove(source);
} catch (RocksDBException ex) {
throw new IOException(ex);
} finally {
lock.writeLock().unlock();
}
}
public void close() throws IOException {
db.close();
}
}
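
For reference, the layout implied by the store above: one column family maps each ASCII file name to its length as a little-endian long (see getMetaKey and getSize), while the other maps name + '\0' + a little-endian int block index to fixed 10 KiB blocks (see getDataKey). A small sketch of the block arithmetic shared by load() and append() (the values are arbitrary):

class BlockMathExample {
    private static final int BLOCK_SIZE = 10 * 1024; // mirrors RocksdbFileStore.BLOCK_SIZE

    public static void main(String[] args) {
        long position = 25_000;                          // absolute offset inside a stored file
        int blockIndex = (int) (position / BLOCK_SIZE);  // 2 -> data key "name\0" + 02 00 00 00
        int blockOffset = (int) (position % BLOCK_SIZE); // 4520 -> byte offset inside that block
        int readable = BLOCK_SIZE - blockOffset;         // 5720 bytes left before the next block key
        System.out.println(blockIndex + " " + blockOffset + " " + readable);
    }
}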

RocksdbInputStream.java

@@ -0,0 +1,161 @@
package it.cavallium.dbengine.lucene.directory;
import org.apache.lucene.store.IndexInput;
import java.io.EOFException;
import java.io.IOException;
/**
* Created by wens on 16-3-10.
*/
public class RocksdbInputStream extends IndexInput {
private final int bufferSize;
private long position;
private final long length;
private byte[] currentBuffer;
private int currentBufferIndex;
private final RocksdbFileStore store;
private final String name;
public RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize) throws IOException {
this(name, store, bufferSize, store.getSize(name));
}
public RocksdbInputStream(String name, RocksdbFileStore store, int bufferSize, long length) {
super("RocksdbInputStream(name=" + name + ")");
this.name = name;
this.store = store;
this.bufferSize = bufferSize;
this.currentBuffer = new byte[this.bufferSize];
this.currentBufferIndex = bufferSize;
this.position = 0;
this.length = length;
}
@Override
public void close() throws IOException {
//store.close();
}
@Override
public long getFilePointer() {
return position;
}
@Override
public void seek(long pos) {
if (pos < 0 || pos > length) {
throw new IllegalArgumentException("pos must be between 0 and " + length);
}
position = pos;
currentBufferIndex = this.bufferSize;
}
@Override
public long length() {
return this.length;
}
@Override
public IndexInput slice(String sliceDescription, final long offset, final long length) throws IOException {
if (offset < 0 || length < 0 || offset + length > this.length) {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this);
}
return new RocksdbInputStream(name, store, bufferSize, offset + length) {
{
seek(0L);
}
@Override
public void seek(long pos) {
if (pos < 0L) {
throw new IllegalArgumentException("Seeking to negative position: " + this);
}
super.seek(pos + offset);
}
@Override
public long getFilePointer() {
return super.getFilePointer() - offset;
}
@Override
public long length() {
return super.length() - offset;
}
@Override
public IndexInput slice(String sliceDescription, long ofs, long len) throws IOException {
return super.slice(sliceDescription, offset + ofs, len);
}
};
}
@Override
public byte readByte() throws IOException {
if (position >= length) {
throw new EOFException("Read end");
}
loadBufferIfNeed();
byte b = currentBuffer[currentBufferIndex++];
position++;
return b;
}
protected void loadBufferIfNeed() throws IOException {
if (this.currentBufferIndex == this.bufferSize) {
int n = store.load(name, position, currentBuffer, 0, bufferSize);
if (n == -1) {
throw new EOFException("Read end");
}
this.currentBufferIndex = 0;
}
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
if (position >= length) {
throw new EOFException("Read end");
}
int f = offset;
int n = Math.min((int) (length - position), len);
do {
loadBufferIfNeed();
int r = Math.min(bufferSize - currentBufferIndex, n);
System.arraycopy(currentBuffer, currentBufferIndex, b, f, r);
f += r;
position += r;
currentBufferIndex += r;
n -= r;
} while (n != 0);
}
@Override
public IndexInput clone() {
RocksdbInputStream in = (RocksdbInputStream) super.clone();
in.currentBuffer = new byte[bufferSize];
System.arraycopy(this.currentBuffer, 0, in.currentBuffer, 0, bufferSize);
return in;
}
}

RocksdbOutputStream.java

@@ -0,0 +1,126 @@
package it.cavallium.dbengine.lucene.directory;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Accountable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
/**
* Created by wens on 16-3-10.
*/
public class RocksdbOutputStream extends IndexOutput implements Accountable {
private final int bufferSize;
private long position;
private final byte[] currentBuffer;
private int currentBufferIndex;
private boolean dirty;
private final Checksum crc;
private final RocksdbFileStore store;
private final String name;
public RocksdbOutputStream(String name, RocksdbFileStore store, int bufferSize, boolean checksum) {
super("RocksdbOutputStream(name=" + name + ")", name);
this.name = name;
this.store = store;
this.bufferSize = bufferSize;
this.currentBuffer = new byte[this.bufferSize];
this.position = 0;
this.dirty = false;
if (checksum) {
crc = new BufferedChecksum(new CRC32());
} else {
crc = null;
}
}
@Override
public void close() throws IOException {
if (dirty) {
flush();
}
//store.close();
}
private void flush() throws IOException {
store.append(name, currentBuffer, 0, currentBufferIndex);
currentBufferIndex = 0;
dirty = false;
}
@Override
public long getFilePointer() {
return position;
}
@Override
public long getChecksum() {
if (crc != null) {
return crc.getValue();
} else {
throw new IllegalStateException("crc is null");
}
}
@Override
public void writeByte(byte b) throws IOException {
if (crc != null) {
crc.update(b);
}
if (currentBufferIndex == bufferSize) {
flush();
}
currentBuffer[currentBufferIndex++] = b;
position++;
dirty = true;
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
if (crc != null) {
crc.update(b, offset, length);
}
int f = offset;
int n = length;
do {
if (currentBufferIndex == bufferSize) {
flush();
}
int r = Math.min(bufferSize - currentBufferIndex, n);
System.arraycopy(b, f, currentBuffer, currentBufferIndex, r);
f += r;
currentBufferIndex += r;
position += r;
n -= r;
dirty = true;
} while (n != 0);
}
@Override
public long ramBytesUsed() {
return position;
}
@Override
public Collection<Accountable> getChildResources() {
return List.of();
}
}
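
A short sketch (illustrative; the file name and bytes are placeholders) of how the buffered CRC32 surfaces through the Lucene API:

import it.cavallium.dbengine.lucene.directory.RocksdbDirectory;
import java.nio.file.Path;
import org.apache.lucene.store.IOContext;

class ChecksumExample {
    public static void main(String[] args) throws Exception {
        try (var dir = new RocksdbDirectory(Path.of("/tmp/example.lucene.db"))) {
            try (var out = dir.createOutput("demo.bin", IOContext.DEFAULT)) {
                out.writeBytes(new byte[] {1, 2, 3}, 0, 3);
                // CRC32 of everything written so far, accumulated by BufferedChecksum
                System.out.println(Long.toHexString(out.getChecksum()));
            }
        }
    }
}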

LocalTemporaryDbGenerator.java

@@ -9,8 +9,9 @@ import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneDirectoryOptions;
import it.cavallium.dbengine.client.LuceneDirectoryOptions.ByteBuffersDirectory;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
@@ -34,15 +35,11 @@ public class LocalTemporaryDbGenerator implements TemporaryDbGenerator {
private static final AtomicInteger dbId = new AtomicInteger(0);
private static final Optional<NRTCachingOptions> NRT = Optional.empty();
private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(),
Duration.ofSeconds(5),
Duration.ofSeconds(5),
false,
true,
Optional.empty(),
true,
NRT,
new ByteBuffersDirectory(),
16 * 1024 * 1024,
true,
false,

MemoryTemporaryDbGenerator.java

@@ -8,8 +8,8 @@ import it.cavallium.dbengine.DbTestUtils.TestAllocator;
import it.cavallium.dbengine.client.DatabaseOptions;
import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.client.LuceneDirectoryOptions.ByteBuffersDirectory;
import it.cavallium.dbengine.client.LuceneOptions;
import it.cavallium.dbengine.client.NRTCachingOptions;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.memory.LLMemoryDatabaseConnection;
import it.cavallium.dbengine.lucene.LuceneHacks;
@@ -18,18 +18,15 @@ import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import reactor.core.publisher.Mono;
public class MemoryTemporaryDbGenerator implements TemporaryDbGenerator {
private static final Optional<NRTCachingOptions> NRT = Optional.empty();
private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(), Duration.ofSeconds(5), Duration.ofSeconds(5),
private static final LuceneOptions LUCENE_OPTS = new LuceneOptions(Map.of(),
Duration.ofSeconds(5),
Duration.ofSeconds(5),
false,
true,
Optional.empty(),
true,
NRT,
new ByteBuffersDirectory(),
16 * 1024 * 1024,
true,
false,