Shared block cache in RocksJava
Summary: Changes to support sharing block cache using the Java API. Previously DB instances could share the block cache only when the same Options instance is passed to all the DB instances. But now, with this change, it is possible to explicitly create a cache and pass it to multiple options instances, to share the block cache. Implementing this for [Rocksandra](https://github.com/instagram/cassandra/tree/rocks_3.0), but this feature has been requested by many java api users over the years. Closes https://github.com/facebook/rocksdb/pull/3623 Differential Revision: D7305794 Pulled By: sagar0 fbshipit-source-id: 03e4e8ed7aeee6f88bada4a8365d4279ede2ad71
This commit is contained in:
parent
f1b7b790c9
commit
d5585bb605
@ -38,13 +38,14 @@ jlong Java_org_rocksdb_PlainTableConfig_newTableFactoryHandle(
|
||||
/*
|
||||
* Class: org_rocksdb_BlockBasedTableConfig
|
||||
* Method: newTableFactoryHandle
|
||||
* Signature: (ZJIJIIZIZZZJIBBI)J
|
||||
* Signature: (ZJIJJIIZIZZZJIBBI)J
|
||||
*/
|
||||
jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
|
||||
JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size,
|
||||
jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation,
|
||||
jint block_restart_interval, jboolean whole_key_filtering,
|
||||
jlong jfilterPolicy, jboolean cache_index_and_filter_blocks,
|
||||
JNIEnv *env, jobject jobj, jboolean no_block_cache, jlong block_cache_size,
|
||||
jint block_cache_num_shardbits, jlong jblock_cache, jlong block_size,
|
||||
jint block_size_deviation, jint block_restart_interval,
|
||||
jboolean whole_key_filtering, jlong jfilter_policy,
|
||||
jboolean cache_index_and_filter_blocks,
|
||||
jboolean pin_l0_filter_and_index_blocks_in_cache,
|
||||
jboolean hash_index_allow_collision, jlong block_cache_compressed_size,
|
||||
jint block_cache_compressd_num_shard_bits, jbyte jchecksum_type,
|
||||
@ -52,22 +53,28 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
|
||||
rocksdb::BlockBasedTableOptions options;
|
||||
options.no_block_cache = no_block_cache;
|
||||
|
||||
if (!no_block_cache && block_cache_size > 0) {
|
||||
if (block_cache_num_shardbits > 0) {
|
||||
options.block_cache =
|
||||
rocksdb::NewLRUCache(block_cache_size, block_cache_num_shardbits);
|
||||
} else {
|
||||
options.block_cache = rocksdb::NewLRUCache(block_cache_size);
|
||||
if (!no_block_cache) {
|
||||
if (jblock_cache > 0) {
|
||||
std::shared_ptr<rocksdb::Cache> *pCache =
|
||||
reinterpret_cast<std::shared_ptr<rocksdb::Cache> *>(jblock_cache);
|
||||
options.block_cache = *pCache;
|
||||
} else if (block_cache_size > 0) {
|
||||
if (block_cache_num_shardbits > 0) {
|
||||
options.block_cache =
|
||||
rocksdb::NewLRUCache(block_cache_size, block_cache_num_shardbits);
|
||||
} else {
|
||||
options.block_cache = rocksdb::NewLRUCache(block_cache_size);
|
||||
}
|
||||
}
|
||||
}
|
||||
options.block_size = block_size;
|
||||
options.block_size_deviation = block_size_deviation;
|
||||
options.block_restart_interval = block_restart_interval;
|
||||
options.whole_key_filtering = whole_key_filtering;
|
||||
if (jfilterPolicy > 0) {
|
||||
if (jfilter_policy > 0) {
|
||||
std::shared_ptr<rocksdb::FilterPolicy> *pFilterPolicy =
|
||||
reinterpret_cast<std::shared_ptr<rocksdb::FilterPolicy> *>(
|
||||
jfilterPolicy);
|
||||
jfilter_policy);
|
||||
options.filter_policy = *pFilterPolicy;
|
||||
}
|
||||
options.cache_index_and_filter_blocks = cache_index_and_filter_blocks;
|
||||
|
@ -15,6 +15,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
|
||||
noBlockCache_ = false;
|
||||
blockCacheSize_ = 8 * 1024 * 1024;
|
||||
blockCacheNumShardBits_ = 0;
|
||||
blockCache_ = null;
|
||||
blockSize_ = 4 * 1024;
|
||||
blockSizeDeviation_ = 10;
|
||||
blockRestartInterval_ = 16;
|
||||
@ -71,6 +72,24 @@ public class BlockBasedTableConfig extends TableFormatConfig {
|
||||
return blockCacheSize_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use the specified cache for blocks.
|
||||
* When not null this take precedence even if the user sets a block cache size.
|
||||
*
|
||||
* {@link org.rocksdb.Cache} should not be disposed before options instances
|
||||
* using this cache is disposed.
|
||||
*
|
||||
* {@link org.rocksdb.Cache} instance can be re-used in multiple options
|
||||
* instances.
|
||||
*
|
||||
* @param cache {@link org.rocksdb.Cache} Cache java instance (e.g. LRUCache).
|
||||
* @return the reference to the current config.
|
||||
*/
|
||||
public BlockBasedTableConfig setBlockCache(final Cache cache) {
|
||||
blockCache_ = cache;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Controls the number of shards for the block cache.
|
||||
* This is applied only if cacheSize is set to non-negative.
|
||||
@ -413,25 +432,25 @@ public class BlockBasedTableConfig extends TableFormatConfig {
|
||||
filterHandle = filter_.nativeHandle_;
|
||||
}
|
||||
|
||||
return newTableFactoryHandle(noBlockCache_, blockCacheSize_,
|
||||
blockCacheNumShardBits_, blockSize_, blockSizeDeviation_,
|
||||
blockRestartInterval_, wholeKeyFiltering_,
|
||||
filterHandle, cacheIndexAndFilterBlocks_,
|
||||
pinL0FilterAndIndexBlocksInCache_,
|
||||
hashIndexAllowCollision_, blockCacheCompressedSize_,
|
||||
blockCacheCompressedNumShardBits_,
|
||||
checksumType_.getValue(), indexType_.getValue(),
|
||||
long blockCacheHandle = 0;
|
||||
if (blockCache_ != null) {
|
||||
blockCacheHandle = blockCache_.nativeHandle_;
|
||||
}
|
||||
|
||||
return newTableFactoryHandle(noBlockCache_, blockCacheSize_, blockCacheNumShardBits_,
|
||||
blockCacheHandle, blockSize_, blockSizeDeviation_, blockRestartInterval_,
|
||||
wholeKeyFiltering_, filterHandle, cacheIndexAndFilterBlocks_,
|
||||
pinL0FilterAndIndexBlocksInCache_, hashIndexAllowCollision_, blockCacheCompressedSize_,
|
||||
blockCacheCompressedNumShardBits_, checksumType_.getValue(), indexType_.getValue(),
|
||||
formatVersion_);
|
||||
}
|
||||
|
||||
private native long newTableFactoryHandle(
|
||||
boolean noBlockCache, long blockCacheSize, int blockCacheNumShardBits,
|
||||
long blockSize, int blockSizeDeviation, int blockRestartInterval,
|
||||
boolean wholeKeyFiltering, long filterPolicyHandle,
|
||||
private native long newTableFactoryHandle(boolean noBlockCache, long blockCacheSize,
|
||||
int blockCacheNumShardBits, long blockCacheHandle, long blockSize, int blockSizeDeviation,
|
||||
int blockRestartInterval, boolean wholeKeyFiltering, long filterPolicyHandle,
|
||||
boolean cacheIndexAndFilterBlocks, boolean pinL0FilterAndIndexBlocksInCache,
|
||||
boolean hashIndexAllowCollision, long blockCacheCompressedSize,
|
||||
int blockCacheCompressedNumShardBits, byte checkSumType,
|
||||
byte indexType, int formatVersion);
|
||||
int blockCacheCompressedNumShardBits, byte checkSumType, byte indexType, int formatVersion);
|
||||
|
||||
private boolean cacheIndexAndFilterBlocks_;
|
||||
private boolean pinL0FilterAndIndexBlocksInCache_;
|
||||
@ -442,6 +461,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
|
||||
private long blockSize_;
|
||||
private long blockCacheSize_;
|
||||
private int blockCacheNumShardBits_;
|
||||
private Cache blockCache_;
|
||||
private long blockCacheCompressedSize_;
|
||||
private int blockCacheCompressedNumShardBits_;
|
||||
private int blockSizeDeviation_;
|
||||
|
@ -6,7 +6,11 @@
|
||||
package org.rocksdb;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@ -16,6 +20,8 @@ public class BlockBasedTableConfigTest {
|
||||
public static final RocksMemoryResource rocksMemoryResource =
|
||||
new RocksMemoryResource();
|
||||
|
||||
@Rule public TemporaryFolder dbFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void noBlockCache() {
|
||||
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
|
||||
@ -31,6 +37,31 @@ public class BlockBasedTableConfigTest {
|
||||
isEqualTo(8 * 1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sharedBlockCache() throws RocksDBException {
|
||||
try (final Cache cache = new LRUCache(8 * 1024 * 1024);
|
||||
final Statistics statistics = new Statistics()) {
|
||||
for (int shard = 0; shard < 8; shard++) {
|
||||
try (final Options options =
|
||||
new Options()
|
||||
.setCreateIfMissing(true)
|
||||
.setStatistics(statistics)
|
||||
.setTableFormatConfig(new BlockBasedTableConfig().setBlockCache(cache));
|
||||
final RocksDB db =
|
||||
RocksDB.open(options, dbFolder.getRoot().getAbsolutePath() + "/" + shard)) {
|
||||
final byte[] key = "some-key".getBytes(StandardCharsets.UTF_8);
|
||||
final byte[] value = "some-value".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
db.put(key, value);
|
||||
db.flush(new FlushOptions());
|
||||
db.get(key);
|
||||
|
||||
assertThat(statistics.getTickerCount(TickerType.BLOCK_CACHE_ADD)).isEqualTo(shard + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void blockSizeDeviation() {
|
||||
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig();
|
||||
@ -148,6 +179,14 @@ public class BlockBasedTableConfigTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void blockBasedTableWithBlockCache() {
|
||||
try (final Options options = new Options().setTableFormatConfig(
|
||||
new BlockBasedTableConfig().setBlockCache(new LRUCache(17 * 1024 * 1024)))) {
|
||||
assertThat(options.tableFactoryName()).isEqualTo("BlockBasedTable");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void blockBasedTableFormatVersion() {
|
||||
BlockBasedTableConfig config = new BlockBasedTableConfig();
|
||||
|
Loading…
Reference in New Issue
Block a user