Compare commits

...

20 Commits

Author SHA1 Message Date
sdong
4844d698b9 Disable error as warning 2019-11-05 11:05:17 -08:00
sdong
157da07b6d Add one more #include<functional> 2019-11-05 11:05:16 -08:00
sdong
951ae63e50 Add some include<functional> 2019-10-31 14:21:04 -07:00
sdong
8df5fdedba [FB Internal] Point to the latest tool chain. 2019-10-31 14:21:03 -07:00
sdong
3c56f032cd [fb only] revert unintended change of USE_SSE
The previuos change that use gcc-5 set USE_SSE to wrong flag by mistake. Fix it.
2017-07-17 22:20:53 -07:00
sdong
368afc9d22 [FB Only] use gcc-5 2017-07-17 21:59:53 -07:00
krad
5339a5e6e6 Add factory method for creating persistent cache that is accessible from public
Summary:
Currently there is no mechanism to create persistent cache from
headers. Adding a simple factory method to create a simple persistent cache with
default or NVM optimized settings.

note: Any idea to test this factory is appreciated.

Test Plan: None

Reviewers: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64527
2016-10-03 16:14:03 -07:00
Islam AbdelRahman
24c3b2b21e Fix conflict between AddFile() and CompactRange()
Summary:
Fix the conflict bug between AddFile() and CompactRange() by
- Make sure that no AddFile calls are running when asking CompactionPicker to pick compaction for manual compaction
- If AddFile() run after we pick the compaction for the manual compaction it will be aware of it since we will add the manual compaction to running_compactions_ after picking it

This will solve these 2 scenarios
- If AddFile() is running, we will wait for it to finish before we pick a compaction for the manual compaction
- If we already picked a manual compaction and then AddFile() started ... we ensure that it never ingest a file in a level that will overlap with the manual compaction

Test Plan: unit tests

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, jkedgar, dhruba

Differential Revision: https://reviews.facebook.net/D64449
2016-09-28 16:16:10 -07:00
Islam AbdelRahman
f0b881629f Fix AddFile() conflict with compaction output [WaitForAddFile()]
Summary:
Since AddFile unlock/lock the mutex inside LogAndApply() we need to ensure that during this period other compactions cannot run since such compactions are not aware of the file we are ingesting and could create a compaction that overlap wit this file

this diff add
- WaitForAddFile() call that will ensure that no AddFile() calls are being processed right now
- Call `WaitForAddFile()` in 3 locations
-- When doing manual Compaction
-- When starting automatic Compaction
-- When  doing CompactFiles()

Test Plan: unit test

Reviewers: lightmark, yiwu, andrewkr, sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, jkedgar, dhruba

Differential Revision: https://reviews.facebook.net/D64383
2016-09-27 10:37:29 -07:00
Islam AbdelRahman
8ee2ee8952 Fix CompactFilesTest.ObsoleteFiles timeout (#1353) 2016-09-26 12:03:39 -07:00
Aaron Gao
af28d114d6 not cut compaction output when compact to level 0
Summary: we should not call ShouldStopBefore() in compaction when the compaction targets level 0. Otherwise, CheckConsistency will fail the assertion of seq number check on level 0.

Test Plan:
make all check -j64
I also manully test that using db_bench to compact files to level 0. Without this line change, the assertion files and multiple files are generated on level 0 after compaction.

Reviewers: yhchiang, andrewkr, yiwu, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64269
2016-09-25 20:08:34 -07:00
yiwu-arbug
a34c1e3373 Recover same sequence id from WAL (#1350)
Summary:
Revert the behavior where we don't read sequence id from WAL, but increase it as we replay the log. We still keep the behave for 2PC for now but will fix later.

This change fixes github issue 1339, where some writes come with WAL disabled and we may recover records with wrong sequence id.

Test Plan: Added unit test.

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D64275
2016-09-25 20:08:24 -07:00
Yi Wu
165cec6ef6 Fix DBWALTest.RecoveryWithLogDataForSomeCFs with mac
Summary: Seems there's no std::array on mac+clang. Use raw array instead.

Test Plan: run ./db_wal_test on mac.

Reviewers: andrewkr

Reviewed By: andrewkr

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64005
2016-09-19 14:29:29 -07:00
Andrew Kryczka
d7d6a9a41d Fix recovery for WALs without data for all CFs
Summary:
if one or more CFs had no data in the WAL, the log number that's used
by FindObsoleteFiles() wasn't updated. We need to treat this case the same as
if the data for that WAL had been flushed.

Test Plan: new unit test

Reviewers: IslamAbdelRahman, yiwu, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63963
2016-09-19 12:51:54 -07:00
Andrew Kryczka
55ab150b46 Fix GetSortedWalFiles when log recycling enabled
Summary:
Previously the sequence number was mistakenly passed in an argument
where the log number should go. This caused the reader to assume the old WAL
format was used, which is incompatible with the WAL recycling format.

Test Plan:
new unit test, verified it fails before this change and passes
afterwards.

Reviewers: yiwu, lightmark, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63987
2016-09-19 12:51:48 -07:00
xh931076284
cba883c538 Fix bug in UnScSigned-off-by: xh931076284 <931076284@qq.com> (#1336)
Fix HdfsEnv::UnSchedule() API error
2016-09-19 12:50:12 -07:00
Yi Wu
c4ffd74608 Fix java makefile dependencies
Summary: Fix dependencies in java makefile, to avoid java build failure.

Test Plan: run "make rocksdbjava" multiple times.

Reviewers: IslamAbdelRahman, sdong, yhchiang

Reviewed By: yhchiang

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64023
2016-09-19 12:49:47 -07:00
Yi Wu
ac7e52c2ba Rename jvalue to jval in rocksjni
Summary: jvalue shadows a global name in <jni.h>. Rename it to jval to fix java build.

Test Plan:
    JAVA_HOME=/usr/local/jdk-7u10-64 make rocksdbjava -j64

Reviewers: adamretter, yhchiang, IslamAbdelRahman

Reviewed By: yhchiang, IslamAbdelRahman

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63981
2016-09-19 12:49:38 -07:00
Adam Retter
28d8501876 Allow an offset as well as a length to be specified for byte[] operations in RocksJava JNI (#1264)
Test Plan: Execute the Java test suite

Reviewers: yhchiang

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61971
2016-09-19 12:47:40 -07:00
Islam AbdelRahman
e8bbb13a97 Release RocksDB 4.12
Summary: Release 4.12

Test Plan: none

Reviewers: andrewkr, yiwu, lightmark, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D63885
2016-09-12 12:56:56 -07:00
30 changed files with 896 additions and 455 deletions

View File

@ -1,5 +1,5 @@
# Rocksdb Change Log
## Unreleased
## 4.12.0 (9/12/2016)
### Public API Change
* CancelAllBackgroundWork() flushes all memtables for databases containing writes that have bypassed the WAL (writes issued with WriteOptions::disableWAL=true) before shutting down background threads.
* Merge options source_compaction_factor, max_grandparent_overlap_bytes and expanded_compaction_factor into max_compaction_bytes.
@ -9,6 +9,7 @@
### New Features
* Introduce NewClockCache, which is based on CLOCK algorithm with better concurrent performance in some cases. It can be used to replace the default LRU-based block cache and table cache. To use it, RocksDB need to be linked with TBB lib.
* Change ticker/histogram statistics implementations to accumulate data in thread-local storage, which improves CPU performance by reducing cache coherency costs. Callers of CreateDBStatistics do not need to change anything to use this feature.
* Block cache mid-point insertion, where index and filter block are inserted into LRU block cache with higher priority. The feature can be enabled by setting BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority to true and high_pri_pool_ratio > 0 when creating NewLRUCache.
## 4.11.0 (8/1/2016)
### Public API Change

View File

@ -216,10 +216,6 @@ default: all
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
-Wno-unused-parameter
ifndef DISABLE_WARNING_AS_ERROR
WARNING_FLAGS += -Werror
endif
CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
@ -393,9 +389,10 @@ PARALLEL_TEST = \
db_compaction_filter_test \
db_compaction_test \
db_sst_test \
external_sst_file_test \
db_test \
db_universal_compaction_test \
db_wal_test \
external_sst_file_test \
fault_injection_test \
inlineskiplist_test \
manual_compaction_test \
@ -599,7 +596,7 @@ gen_parallel_tests:
# 107.816 PASS t/DBTest.EncodeDecompressedBlockSizeTest
#
slow_test_regexp = \
^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$
^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$|^.*RecoverFromCorruptedWALWithoutFlush$$
prioritize_long_running_tests = \
perl -pe 's,($(slow_test_regexp)),100 $$1,' \
| sort -k1,1gr \
@ -820,6 +817,7 @@ clean:
find . -name "*.[oda]" -exec rm -f {} \;
find . -type f -regex ".*\.\(\(gcda\)\|\(gcno\)\)" -exec rm {} \;
rm -rf bzip2* snappy* zlib* lz4*
cd java; $(MAKE) clean
tags:
ctags * -R

View File

@ -51,12 +51,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
FBCODE_BUILD="true"
# If we're compiling with TSAN we need pic build
PIC_BUILD=$COMPILE_WITH_TSAN
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
fi
source "$PWD/build_tools/fbcode_config.sh"
fi
# Delete existing output, if it exists

View File

@ -1,17 +1,19 @@
GCC_BASE=/mnt/gvfs/third-party2/gcc/cf7d14c625ce30bae1a4661c2319c5a283e4dd22/4.9.x/centos6-native/108cf83
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/8598c375b0e94e1448182eb3df034704144a838d/stable/centos6-native/3f16ddd
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/d6e0a7da6faba45f5e5b1638f9edd7afc2f34e7d/4.9.x/gcc-4.9-glibc-2.20/024dbc3
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/d282e6e8f3d20f4e40a516834847bdc038e07973/2.20/gcc-4.9-glibc-2.20/500e281
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/0882df3713c7a84f15abe368dc004581f20b39d7/1.2.8/gcc-5-glibc-2.23/9bc6787
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/740325875f6729f42d28deaa2147b0854f3a347e/1.0.6/gcc-5-glibc-2.23/9bc6787
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/9455f75ff7f4831dc9fda02a6a0f8c68922fad8f/1.0.0/gcc-5-glibc-2.23/9bc6787
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/f001a51b2854957676d07306ef3abf67186b5c8b/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/fc8a13ca1fffa4d0765c716c5a0b49f0c107518f/master/gcc-5-glibc-2.23/1c32b4b
NUMA_BASE=/mnt/gvfs/third-party2/numa/17c514c4d102a25ca15f4558be564eeed76f4b6a/2.0.8/gcc-5-glibc-2.23/9bc6787
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
TBB_BASE=/mnt/gvfs/third-party2/tbb/9d9a554877d0c5bef330fe818ab7178806dd316a/4.0_update2/gcc-4.9-glibc-2.20/e9936bf
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/7c111ff27e0c466235163f00f280a9d617c3d2ec/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/b7fd454c4b10c6a81015d4524ed06cdeab558490/2.26/centos6-native/da39a3e
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d7f4d4d86674a57668e3a96f76f0e17dd0eb8765/3.10.0/gcc-4.9-glibc-2.20/e9936bf
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

View File

@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
CFLAGS=""
# libgcc
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
# glibc
@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
CFLAGS+=" -DLZ4"
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
CFLAGS+=" -DZSTD"
fi
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
if test -z $PIC_BUILD; then
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
else
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
fi
CFLAGS+=" -DZSTD"
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
if test -z $PIC_BUILD; then
@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
else
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
fi
CFLAGS+=" -DGFLAGS=google"
CFLAGS+=" -DGFLAGS=gflags"
# location of jemalloc
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@ -104,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
CXX="$GCC_BASE/bin/g++"
CFLAGS+=" -B$BINUTILS/gold"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
JEMALLOC=1
else
# clang
@ -116,8 +120,8 @@ else
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $CLANG_INCLUDE"
@ -128,13 +132,14 @@ else
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
# required by libtbb
EXEC_LDFLAGS+=" -ldl"

View File

@ -50,6 +50,7 @@ class FlushedFileCollector : public EventListener {
}
return result;
}
void ClearFlushedFiles() { flushed_files_.clear(); }
private:
std::vector<std::string> flushed_files_;
@ -116,13 +117,12 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
TEST_F(CompactFilesTest, ObsoleteFiles) {
Options options;
// to trigger compaction more easily
const int kWriteBufferSize = 10000;
const int kWriteBufferSize = 65536;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone;
// Small slowdown and stop trigger for experimental purpose.
options.level0_slowdown_writes_trigger = 20;
options.level0_stop_writes_trigger = 20;
options.level0_slowdown_writes_trigger = (1 << 30);
options.level0_stop_writes_trigger = (1 << 30);
options.write_buffer_size = kWriteBufferSize;
options.max_write_buffer_number = 2;
options.compression = kNoCompression;
@ -154,6 +154,46 @@ TEST_F(CompactFilesTest, ObsoleteFiles) {
delete db;
}
TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
Options options;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone;
options.level0_slowdown_writes_trigger = 1000;
options.level0_stop_writes_trigger = 1000;
options.write_buffer_size = 65536;
options.max_write_buffer_number = 2;
options.compression = kNoCompression;
options.max_compaction_bytes = 5000;
// Add listener
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
// create couple files
for (int i = 0; i < 500; ++i) {
db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
}
reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
auto l0_files_1 = collector->GetFlushedFiles();
collector->ClearFlushedFiles();
for (int i = 0; i < 500; ++i) {
db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
}
reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
auto l0_files_2 = collector->GetFlushedFiles();
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
// no assertion failure
delete db;
}
TEST_F(CompactFilesTest, CapturingPendingFiles) {
Options options;
options.create_if_missing = true;

View File

@ -762,7 +762,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (end != nullptr &&
cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
break;
} else if (sub_compact->ShouldStopBefore(
} else if (sub_compact->compaction->output_level() != 0 &&
sub_compact->ShouldStopBefore(
key, sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) {
status = FinishCompactionOutputFile(input->status(), sub_compact);

View File

@ -341,6 +341,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
next_job_id_(1),
has_unpersisted_data_(false),
env_options_(db_options_),
num_running_addfile_(0),
#ifndef ROCKSDB_LITE
wal_manager_(db_options_, env_options_),
#endif // ROCKSDB_LITE
@ -1393,7 +1394,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool stop_replay_by_wal_filter = false;
bool stop_replay_for_corruption = false;
bool flushed = false;
SequenceNumber recovered_sequence = 0;
for (auto log_number : log_numbers) {
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
@ -1472,13 +1472,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
WriteBatchInternal::SetContents(&batch, record);
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
// In point-in-time recovery mode, if sequence id of log files are
// consecutive, we continue recovery despite corruption. This could happen
// when we open and write to a corrupted DB, where sequence id will start
// from the last sequence id we recovered.
if (db_options_.wal_recovery_mode ==
WALRecoveryMode::kPointInTimeRecovery) {
if (sequence == recovered_sequence + 1) {
// In point-in-time recovery mode, if sequence id of log files are
// consecutive, we continue recovery despite corruption. This could
// happen when we open and write to a corrupted DB, where sequence id
// will start from the last sequence id we recovered.
if (sequence == *next_sequence) {
stop_replay_for_corruption = false;
}
if (stop_replay_for_corruption) {
@ -1487,13 +1487,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
}
recovered_sequence = sequence;
bool no_prev_seq = true;
if (*next_sequence == kMaxSequenceNumber) {
if (!db_options_.allow_2pc) {
*next_sequence = sequence;
} else {
no_prev_seq = false;
WriteBatchInternal::SetSequence(&batch, *next_sequence);
if (*next_sequence == kMaxSequenceNumber) {
*next_sequence = sequence;
} else {
no_prev_seq = false;
WriteBatchInternal::SetSequence(&batch, *next_sequence);
}
}
#ifndef ROCKSDB_LITE
@ -1590,8 +1593,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// after replaying the file, this file may be a stale file. We ignore
// sequence IDs from the file. Otherwise, if a newer stale log file that
// has been deleted, the sequenceID may be wrong.
if (no_prev_seq && !has_valid_writes) {
*next_sequence = kMaxSequenceNumber;
if (db_options_.allow_2pc) {
if (no_prev_seq && !has_valid_writes) {
*next_sequence = kMaxSequenceNumber;
}
}
MaybeIgnoreError(&status);
if (!status.ok()) {
@ -1701,7 +1706,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// recovered and should be ignored on next reincarnation.
// Since we already recovered max_log_number, we want all logs
// with numbers `<= max_log_number` (includes this one) to be ignored
if (flushed) {
if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
edit->SetLogNumber(max_log_number + 1);
}
// we must mark the next log number as used, even though it's
@ -2105,6 +2110,10 @@ Status DBImpl::CompactFiles(
{
InstrumentedMutexLock l(&mutex_);
// This call will unlock/lock the mutex to wait for current running
// AddFile() calls to finish.
WaitForAddFile();
s = CompactFilesImpl(compact_options, cfd, sv->current,
input_file_names, output_level,
output_path_id, &job_context, &log_buffer);
@ -2696,6 +2705,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.end = &end_storage;
}
TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
InstrumentedMutexLock l(&mutex_);
// When a manual compaction arrives, temporarily disable scheduling of
@ -2764,6 +2775,10 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
ca->m = &manual;
manual.incomplete = false;
bg_compaction_scheduled_++;
// manual.compaction will be added to running_compactions_ and erased
// inside BackgroundCompaction() but we need to put it now since we
// will unlock the mutex.
running_compactions_.insert(manual.compaction);
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
&DBImpl::UnscheduleCallback);
scheduled = true;
@ -3186,6 +3201,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
InstrumentedMutexLock l(&mutex_);
// This call will unlock/lock the mutex to wait for current running
// AddFile() calls to finish.
WaitForAddFile();
num_running_compactions_++;
auto pending_outputs_inserted_elem =
@ -3597,6 +3617,11 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
}
bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
if (num_running_addfile_ > 0) {
// We need to wait for other AddFile() calls to finish
// before running a manual compaction.
return true;
}
if (m->exclusive) {
return (bg_compaction_scheduled_ > 0);
}

View File

@ -620,7 +620,7 @@ class DBImpl : public DB {
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* max_sequence, bool read_only);
SequenceNumber* next_sequence, bool read_only);
// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
@ -650,15 +650,24 @@ class DBImpl : public DB {
int PickLevelForIngestedFile(ColumnFamilyData* cfd,
const ExternalSstFileInfo& file_info);
Status CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id, JobContext* job_context,
LogBuffer* log_buffer);
// Wait for current AddFile() calls to finish.
// REQUIRES: mutex_ held
void WaitForAddFile();
Status CompactFilesImpl(const CompactionOptions& compact_options,
ColumnFamilyData* cfd, Version* version,
const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id,
JobContext* job_context, LogBuffer* log_buffer);
Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
const std::string& file_path,
ExternalSstFileInfo* file_info);
#else
// AddFile is not supported in ROCKSDB_LITE so this function
// will be no-op
void WaitForAddFile() {}
#endif // ROCKSDB_LITE
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
@ -728,6 +737,7 @@ class DBImpl : public DB {
// * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
// (i.e. whenever a flush is done, even if it didn't make any progress)
// * whenever there is an error in background purge, flush or compaction
// * whenever num_running_addfile_ goes to 0.
InstrumentedCondVar bg_cv_;
uint64_t logfile_number_;
std::deque<uint64_t>
@ -973,6 +983,10 @@ class DBImpl : public DB {
// REQUIRES: mutex held
std::unordered_set<Compaction*> running_compactions_;
// Number of running AddFile() calls.
// REQUIRES: mutex held
int num_running_addfile_;
#ifndef ROCKSDB_LITE
WalManager wal_manager_;
#endif // ROCKSDB_LITE

View File

@ -237,12 +237,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
{
InstrumentedMutexLock l(&mutex_);
TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
num_running_addfile_++;
if (!skip_snapshot_check && !snapshots_.empty()) {
// Check that no snapshots are being held
status =
@ -316,7 +320,13 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
ReleaseFileNumberFromPendingOutputs(
pending_outputs_inserted_elem_list[i]);
}
}
num_running_addfile_--;
if (num_running_addfile_ == 0) {
bg_cv_.SignalAll();
}
TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
} // mutex_ is unlocked here;
if (!status.ok()) {
// We failed to add the files to the database
@ -395,6 +405,13 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
return target_level;
}
void DBImpl::WaitForAddFile() {
mutex_.AssertHeld();
while (num_running_addfile_ > 0) {
bg_cv_.Wait();
}
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

View File

@ -9,6 +9,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/options_helper.h"
#include "util/sync_point.h"
@ -292,6 +293,44 @@ TEST_F(DBWALTest, RecoveryWithEmptyLog) {
}
#ifndef ROCKSDB_LITE
TEST_F(DBWALTest, GetSortedWalFiles) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
ASSERT_EQ(0, log_files.size());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
ASSERT_EQ(1, log_files.size());
} while (ChangeOptions());
}
TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
// Test for regression of WAL cleanup missing files that don't contain data
// for every column family.
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Put(1, "foo", "v2"));
uint64_t earliest_log_nums[2];
for (int i = 0; i < 2; ++i) {
if (i > 0) {
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
}
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
if (log_files.size() > 0) {
earliest_log_nums[i] = log_files[0]->LogNumber();
} else {
earliest_log_nums[i] = port::kMaxUint64;
}
}
// Check at least the first WAL was cleaned up during the recovery.
ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
} while (ChangeOptions());
}
TEST_F(DBWALTest, RecoverWithLargeLog) {
do {
{
@ -471,6 +510,41 @@ TEST_F(DBWALTest, SyncMultipleLogs) {
ASSERT_OK(dbfull()->SyncWAL());
}
// Github issue 1339. Prior the fix we read sequence id from the first log to
// a local variable, then keep increase the variable as we replay logs,
// ignoring actual sequence id of the records. This is incorrect if some writes
// come with WAL disabled.
TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_env.get();
options.disable_auto_compactions = true;
// TODO(yiwu): fix for 2PC.
options.allow_2pc = false;
WriteOptions wal_on, wal_off;
wal_on.sync = true;
wal_on.disableWAL = false;
wal_off.disableWAL = true;
CreateAndReopenWithCF({"dummy"}, options);
ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
ASSERT_OK(Put(1, "dummy", "d2", wal_off));
ASSERT_OK(Put(1, "dummy", "d3", wal_off));
ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
ASSERT_OK(Flush(0));
ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
ASSERT_EQ("v5", Get(0, "key"));
// Simulate a crash.
fault_env->SetFilesystemActive(false);
Close();
fault_env->ResetState();
ReopenWithColumnFamilies({"default", "dummy"}, options);
// Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
ASSERT_EQ("v5", Get(0, "key"));
// Destroy DB before destruct fault_env.
Destroy(options);
}
//
// Test WAL recovery for the various modes available
//

View File

@ -984,6 +984,138 @@ TEST_F(ExternalSSTFileTest, PickedLevel) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(ExternalSSTFileTest, PickedLevelBug) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 3;
options.num_levels = 2;
options.env = env_;
DestroyAndReopen(options);
std::vector<int> file_keys;
// file #1 in L0
file_keys = {0, 5, 7};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());
// file #2 in L0
file_keys = {4, 6, 8, 9};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());
// We have 2 overlapping files in L0
EXPECT_EQ(FilesPerLevel(), "2");
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
{"ExternalSSTFileTest::PickedLevelBug:2",
"DBImpl::RunManualCompaction:0"},
{"ExternalSSTFileTest::PickedLevelBug:3",
"DBImpl::RunManualCompaction:1"}});
std::atomic<bool> bg_compact_started(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:Start",
[&](void* arg) { bg_compact_started.store(true); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// While writing the MANIFEST start a thread that will ask for compaction
std::thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
// Start a thread that will ingest a new file
std::thread bg_addfile([&]() {
file_keys = {1, 2, 3};
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1));
});
// Wait for AddFile to start picking levels and writing MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
// We need to verify that no compactions can run while AddFile is
// ingesting the files into the levels it find suitable. So we will
// wait for 2 seconds to give a chance for compactions to run during
// this period, and then make sure that no compactions where able to run
env_->SleepForMicroseconds(1000000 * 2);
ASSERT_FALSE(bg_compact_started.load());
// Hold AddFile from finishing writing the MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
bg_addfile.join();
bg_compact.join();
dbfull()->TEST_WaitForCompact();
int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
total_keys++;
}
ASSERT_EQ(total_keys, 10);
delete iter;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 2;
options.env = env_;
DestroyAndReopen(options);
std::function<void()> bg_compact = [&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
};
int range_id = 0;
std::vector<int> file_keys;
std::function<void()> bg_addfile = [&]() {
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
};
std::vector<std::thread> threads;
while (range_id < 5000) {
int range_start = (range_id * 20);
int range_end = range_start + 10;
file_keys.clear();
for (int k = range_start + 1; k < range_end; k++) {
file_keys.push_back(k);
}
ASSERT_OK(Put(Key(range_start), Key(range_start)));
ASSERT_OK(Put(Key(range_end), Key(range_end)));
ASSERT_OK(Flush());
if (range_id % 10 == 0) {
threads.emplace_back(bg_compact);
}
threads.emplace_back(bg_addfile);
for (auto& t : threads) {
t.join();
}
threads.clear();
range_id++;
}
}
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;

View File

@ -383,7 +383,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
Status s;
if (type == kAliveLogFile) {
std::string fname = LogFileName(db_options_.wal_dir, number);
s = ReadFirstLine(fname, sequence);
s = ReadFirstLine(fname, number, sequence);
if (env_->FileExists(fname).ok() && !s.ok()) {
// return any error that is not caused by non-existing file
return s;
@ -394,7 +394,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
// check if the file got moved to archive.
std::string archived_file =
ArchivedLogFileName(db_options_.wal_dir, number);
s = ReadFirstLine(archived_file, sequence);
s = ReadFirstLine(archived_file, number, sequence);
// maybe the file was deleted from archive dir. If that's the case, return
// Status::OK(). The caller with identify this as empty file because
// *sequence == 0
@ -413,6 +413,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
// the function returns status.ok() and sequence == 0 if the file exists, but is
// empty
Status WalManager::ReadFirstLine(const std::string& fname,
const uint64_t number,
SequenceNumber* sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
@ -449,7 +450,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
true /*checksum*/, 0 /*initial_offset*/, *sequence);
true /*checksum*/, 0 /*initial_offset*/, number);
std::string scratch;
Slice record;

View File

@ -54,9 +54,9 @@ class WalManager {
return ReadFirstRecord(type, number, sequence);
}
Status TEST_ReadFirstLine(const std::string& fname,
Status TEST_ReadFirstLine(const std::string& fname, const uint64_t number,
SequenceNumber* sequence) {
return ReadFirstLine(fname, sequence);
return ReadFirstLine(fname, number, sequence);
}
private:
@ -71,7 +71,8 @@ class WalManager {
Status ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence);
Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
Status ReadFirstLine(const std::string& fname, const uint64_t number,
SequenceNumber* sequence);
// ------- state from DBImpl ------
const DBOptions& db_options_;

View File

@ -119,10 +119,11 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
SequenceNumber s;
ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, &s));
ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
ASSERT_EQ(s, 0U);
ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_OK(
wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
ASSERT_EQ(s, 0U);
unique_ptr<WritableFileWriter> file_writer(

View File

@ -106,7 +106,7 @@ class HdfsEnv : public Env {
}
virtual int UnSchedule(void* tag, Priority pri) {
posixEnv->UnSchedule(tag, pri);
return posixEnv->UnSchedule(tag, pri);
}
virtual void StartThread(void (*function)(void* arg), void* arg) {

View File

@ -9,7 +9,9 @@
#include <stdint.h>
#include <memory>
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
@ -46,4 +48,10 @@ class PersistentCache {
virtual bool IsCompressed() = 0;
};
// Factor method to create a new persistent cache
Status NewPersistentCache(Env* const env, const std::string& path,
const uint64_t size,
const std::shared_ptr<Logger>& log,
const bool optimized_for_nvm,
std::shared_ptr<PersistentCache>* cache);
} // namespace rocksdb

View File

@ -137,7 +137,7 @@ clean:
$(AM_V_at)rm -rf $(SAMPLES_OUTPUT)
javadocs:
javadocs: java
$(AM_V_GEN)mkdir -p $(JAVADOC)
$(AM_V_at)javadoc -d $(JAVADOC) -sourcepath $(MAIN_SRC) -subpackages org
@ -176,7 +176,7 @@ resolve_test_deps:
test -s "$(JAVA_CGLIB_JAR)" || cp $(MVN_LOCAL)/cglib/cglib/2.2.2/cglib-2.2.2.jar $(JAVA_TEST_LIBDIR) || curl -k -L -o "$(JAVA_CGLIB_JAR)" http://search.maven.org/remotecontent?filepath=cglib/cglib/2.2.2/cglib-2.2.2.jar
test -s "$(JAVA_ASSERTJ_JAR)" || cp $(MVN_LOCAL)/org/assertj/assertj-core/1.7.1/assertj-core-1.7.1.jar $(JAVA_TEST_LIBDIR) || curl -k -L -o "$(JAVA_ASSERTJ_JAR)" http://central.maven.org/maven2/org/assertj/assertj-core/1.7.1/assertj-core-1.7.1.jar
java_test: resolve_test_deps
java_test: java resolve_test_deps
$(AM_V_GEN)mkdir -p $(TEST_CLASSES)
$(AM_V_at)javac -cp $(MAIN_CLASSES):$(JAVA_TESTCLASSPATH) -d $(TEST_CLASSES)\
$(TEST_SRC)/org/rocksdb/test/*.java\
@ -184,7 +184,7 @@ java_test: resolve_test_deps
$(TEST_SRC)/org/rocksdb/*.java
$(AM_V_at)javah -cp $(MAIN_CLASSES):$(TEST_CLASSES) -d $(NATIVE_INCLUDE) -jni $(NATIVE_JAVA_TEST_CLASSES)
test: java resolve_test_deps java_test run_test
test: java java_test run_test
run_test:
java -ea -Xcheck:jni -Djava.library.path=target -cp "$(MAIN_CLASSES):$(TEST_CLASSES):$(JAVA_TESTCLASSPATH):target/*" org.rocksdb.test.RocksJunitRunner $(JAVA_TESTS)

View File

@ -4371,6 +4371,20 @@ jlong Java_org_rocksdb_DBOptions_writeThreadSlowYieldUsec(
write_thread_slow_yield_usec;
}
void Java_org_rocksdb_DBOptions_setDelayedWriteRate(
JNIEnv* env, jobject jobj, jlong jhandle, jlong delay_write_rate){
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
delayed_write_rate = static_cast<int64_t>(delay_write_rate);
}
jlong Java_org_rocksdb_DBOptions_delayedWriteRate(
JNIEnv* env, jobject jobj, jlong jhandle){
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
delayed_write_rate;
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::WriteOptions

View File

@ -204,16 +204,19 @@ jobjectArray Java_org_rocksdb_RocksDB_listColumnFamilies(
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Put
void rocksdb_put_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options,
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
void rocksdb_put_helper(JNIEnv* env, rocksdb::DB* db,
const rocksdb::WriteOptions& write_options,
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey,
jint jkey_off, jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
jbyte* value = new jbyte[jval_len];
env->GetByteArrayRegion(jval, jval_off, jval_len, value);
jbyte* key = env->GetByteArrayElements(jkey, 0);
jbyte* value = env->GetByteArrayElements(jentry_value, 0);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value),
jentry_value_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jval_len);
rocksdb::Status s;
if (cf_handle != nullptr) {
@ -223,11 +226,9 @@ void rocksdb_put_helper(
s = db->Put(write_options, key_slice, value_slice);
}
// trigger java unref on key and value.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
// cleanup
delete [] value;
delete [] key;
if (s.ok()) {
return;
@ -238,36 +239,39 @@ void rocksdb_put_helper(
/*
* Class: org_rocksdb_RocksDB
* Method: put
* Signature: (J[BI[BI)V
* Signature: (J[BII[BII)V
*/
void Java_org_rocksdb_RocksDB_put__J_3BI_3BI(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
void Java_org_rocksdb_RocksDB_put__J_3BII_3BII(JNIEnv* env, jobject jdb,
jlong jdb_handle,
jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
rocksdb_put_helper(env, db, default_write_options, nullptr,
jkey, jkey_len,
jentry_value, jentry_value_len);
rocksdb_put_helper(env, db, default_write_options, nullptr, jkey, jkey_off,
jkey_len, jval, jval_off, jval_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: put
* Signature: (J[BI[BIJ)V
* Signature: (J[BII[BIIJ)V
*/
void Java_org_rocksdb_RocksDB_put__J_3BI_3BIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
void Java_org_rocksdb_RocksDB_put__J_3BII_3BIIJ(JNIEnv* env, jobject jdb,
jlong jdb_handle,
jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len,
jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_put_helper(env, db, default_write_options, cf_handle,
jkey, jkey_len, jentry_value, jentry_value_len);
rocksdb_put_helper(env, db, default_write_options, cf_handle, jkey,
jkey_off, jkey_len, jval, jval_off, jval_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -277,39 +281,38 @@ void Java_org_rocksdb_RocksDB_put__J_3BI_3BIJ(
/*
* Class: org_rocksdb_RocksDB
* Method: put
* Signature: (JJ[BI[BI)V
* Signature: (JJ[BII[BII)V
*/
void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BI(
JNIEnv* env, jobject jdb,
jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
void Java_org_rocksdb_RocksDB_put__JJ_3BII_3BII(JNIEnv* env, jobject jdb,
jlong jdb_handle,
jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle);
rocksdb_put_helper(env, db, *write_options, nullptr,
jkey, jkey_len,
jentry_value, jentry_value_len);
rocksdb_put_helper(env, db, *write_options, nullptr, jkey, jkey_off, jkey_len,
jval, jval_off, jval_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: put
* Signature: (JJ[BI[BIJ)V
* Signature: (JJ[BII[BIIJ)V
*/
void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BIJ(
JNIEnv* env, jobject jdb,
jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
void Java_org_rocksdb_RocksDB_put__JJ_3BII_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_put_helper(env, db, *write_options, cf_handle,
jkey, jkey_len, jentry_value, jentry_value_len);
rocksdb_put_helper(env, db, *write_options, cf_handle, jkey, jkey_off,
jkey_len, jval, jval_off, jval_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -363,12 +366,12 @@ void Java_org_rocksdb_RocksDB_write1(
// rocksdb::DB::KeyMayExist
jboolean key_may_exist_helper(JNIEnv* env, rocksdb::DB* db,
const rocksdb::ReadOptions& read_opt,
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len,
jobject jstring_buffer) {
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_off,
jint jkey_len, jobject jstring_buffer) {
std::string value;
bool value_found = false;
jboolean isCopy;
jbyte* key = env->GetByteArrayElements(jkey, &isCopy);
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
bool keyMayExist;
if (cf_handle != nullptr) {
@ -379,6 +382,10 @@ jboolean key_may_exist_helper(JNIEnv* env, rocksdb::DB* db,
&value, &value_found);
}
// cleanup
delete [] key;
// extract the value
if (value_found && !value.empty()) {
jclass clazz = env->GetObjectClass(jstring_buffer);
jmethodID mid = env->GetMethodID(clazz, "append",
@ -386,37 +393,36 @@ jboolean key_may_exist_helper(JNIEnv* env, rocksdb::DB* db,
jstring new_value_str = env->NewStringUTF(value.c_str());
env->CallObjectMethod(jstring_buffer, mid, new_value_str);
}
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
return static_cast<jboolean>(keyMayExist);
}
/*
* Class: org_rocksdb_RocksDB
* Method: keyMayExist
* Signature: (J[BILjava/lang/StringBuffer;)Z
* Signature: (J[BIILjava/lang/StringBuffer;)Z
*/
jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BILjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_len,
jobject jstring_buffer) {
jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIILjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_off,
jint jkey_len, jobject jstring_buffer) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
return key_may_exist_helper(env, db, rocksdb::ReadOptions(),
nullptr, jkey, jkey_len, jstring_buffer);
nullptr, jkey, jkey_off, jkey_len, jstring_buffer);
}
/*
* Class: org_rocksdb_RocksDB
* Method: keyMayExist
* Signature: (J[BIJLjava/lang/StringBuffer;)Z
* Signature: (J[BIIJLjava/lang/StringBuffer;)Z
*/
jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIJLjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_len,
jlong jcf_handle, jobject jstring_buffer) {
jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIIJLjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_off,
jint jkey_len, jlong jcf_handle, jobject jstring_buffer) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(
jcf_handle);
if (cf_handle != nullptr) {
return key_may_exist_helper(env, db, rocksdb::ReadOptions(),
cf_handle, jkey, jkey_len, jstring_buffer);
cf_handle, jkey, jkey_off, jkey_len, jstring_buffer);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -427,26 +433,27 @@ jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIJLjava_lang_StringBuffer_2(
/*
* Class: org_rocksdb_RocksDB
* Method: keyMayExist
* Signature: (JJ[BILjava/lang/StringBuffer;)Z
* Signature: (JJ[BIILjava/lang/StringBuffer;)Z
*/
jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BILjava_lang_StringBuffer_2(
jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIILjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jread_options_handle,
jbyteArray jkey, jint jkey_len, jobject jstring_buffer) {
jbyteArray jkey, jint jkey_off, jint jkey_len, jobject jstring_buffer) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto& read_options = *reinterpret_cast<rocksdb::ReadOptions*>(
jread_options_handle);
return key_may_exist_helper(env, db, read_options,
nullptr, jkey, jkey_len, jstring_buffer);
nullptr, jkey, jkey_off, jkey_len, jstring_buffer);
}
/*
* Class: org_rocksdb_RocksDB
* Method: keyMayExist
* Signature: (JJ[BIJLjava/lang/StringBuffer;)Z
* Signature: (JJ[BIIJLjava/lang/StringBuffer;)Z
*/
jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIJLjava_lang_StringBuffer_2(
jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIIJLjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jread_options_handle,
jbyteArray jkey, jint jkey_len, jlong jcf_handle, jobject jstring_buffer) {
jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle,
jobject jstring_buffer) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto& read_options = *reinterpret_cast<rocksdb::ReadOptions*>(
jread_options_handle);
@ -454,7 +461,7 @@ jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIJLjava_lang_StringBuffer_2(
jcf_handle);
if (cf_handle != nullptr) {
return key_may_exist_helper(env, db, read_options, cf_handle,
jkey, jkey_len, jstring_buffer);
jkey, jkey_off, jkey_len, jstring_buffer);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -468,9 +475,10 @@ jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIJLjava_lang_StringBuffer_2(
jbyteArray rocksdb_get_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::ReadOptions& read_opt,
rocksdb::ColumnFamilyHandle* column_family_handle, jbyteArray jkey,
jint jkey_len) {
jboolean isCopy;
jbyte* key = env->GetByteArrayElements(jkey, &isCopy);
jint jkey_off, jint jkey_len) {
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
rocksdb::Slice key_slice(
reinterpret_cast<char*>(key), jkey_len);
@ -483,10 +491,8 @@ jbyteArray rocksdb_get_helper(
s = db->Get(read_opt, key_slice, &value);
}
// trigger java unref on key.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
// cleanup
delete [] key;
if (s.IsNotFound()) {
return nullptr;
@ -506,30 +512,30 @@ jbyteArray rocksdb_get_helper(
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (J[BI)[B
* Signature: (J[BII)[B
*/
jbyteArray Java_org_rocksdb_RocksDB_get__J_3BI(
jbyteArray Java_org_rocksdb_RocksDB_get__J_3BII(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len) {
jbyteArray jkey, jint jkey_off, jint jkey_len) {
return rocksdb_get_helper(env,
reinterpret_cast<rocksdb::DB*>(jdb_handle),
rocksdb::ReadOptions(), nullptr,
jkey, jkey_len);
jkey, jkey_off, jkey_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (J[BIJ)[B
* Signature: (J[BIIJ)[B
*/
jbyteArray Java_org_rocksdb_RocksDB_get__J_3BIJ(
jbyteArray Java_org_rocksdb_RocksDB_get__J_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle) {
auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
return rocksdb_get_helper(env, db_handle, rocksdb::ReadOptions(),
cf_handle, jkey, jkey_len);
cf_handle, jkey, jkey_off, jkey_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -541,31 +547,31 @@ jbyteArray Java_org_rocksdb_RocksDB_get__J_3BIJ(
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (JJ[BI)[B
* Signature: (JJ[BII)[B
*/
jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BI(
jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BII(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jbyteArray jkey, jint jkey_len) {
jbyteArray jkey, jint jkey_off, jint jkey_len) {
return rocksdb_get_helper(env,
reinterpret_cast<rocksdb::DB*>(jdb_handle),
*reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), nullptr,
jkey, jkey_len);
jkey, jkey_off, jkey_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (JJ[BIJ)[B
* Signature: (JJ[BIIJ)[B
*/
jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BIJ(
jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle) {
auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto& ro_opt = *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
return rocksdb_get_helper(env, db_handle, ro_opt, cf_handle,
jkey, jkey_len);
jkey, jkey_off, jkey_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -574,14 +580,16 @@ jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BIJ(
}
}
jint rocksdb_get_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::ReadOptions& read_options,
rocksdb::ColumnFamilyHandle* column_family_handle, jbyteArray jkey,
jint jkey_len, jbyteArray jentry_value, jint jentry_value_len) {
jint rocksdb_get_helper(JNIEnv* env, rocksdb::DB* db,
const rocksdb::ReadOptions& read_options,
rocksdb::ColumnFamilyHandle* column_family_handle,
jbyteArray jkey, jint jkey_off, jint jkey_len,
jbyteArray jval, jint jval_off, jint jval_len) {
static const int kNotFound = -1;
static const int kStatusError = -2;
jbyte* key = env->GetByteArrayElements(jkey, 0);
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
rocksdb::Slice key_slice(
reinterpret_cast<char*>(key), jkey_len);
@ -596,10 +604,8 @@ jint rocksdb_get_helper(
s = db->Get(read_options, key_slice, &cvalue);
}
// trigger java unref on key.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
// cleanup
delete [] key;
if (s.IsNotFound()) {
return kNotFound;
@ -617,17 +623,17 @@ jint rocksdb_get_helper(
}
jint cvalue_len = static_cast<jint>(cvalue.size());
jint length = std::min(jentry_value_len, cvalue_len);
jint length = std::min(jval_len, cvalue_len);
env->SetByteArrayRegion(
jentry_value, 0, length,
reinterpret_cast<const jbyte*>(cvalue.c_str()));
env->SetByteArrayRegion(jval, jval_off, length,
reinterpret_cast<const jbyte*>(cvalue.c_str()));
return cvalue_len;
}
// cf multi get
jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,
const rocksdb::ReadOptions& rOpt, jobjectArray jkeys,
jintArray jkey_offs, jintArray jkey_lens,
jlongArray jcolumn_family_handles) {
std::vector<rocksdb::ColumnFamilyHandle*> cf_handles;
if (jcolumn_family_handles != nullptr) {
@ -642,24 +648,35 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,
}
std::vector<rocksdb::Slice> keys;
std::vector<std::tuple<jbyteArray, jbyte*, jobject>> keys_to_free;
std::vector<std::pair<jbyte*, jobject>> keys_to_free;
jsize len_keys = env->GetArrayLength(jkeys);
if(env->EnsureLocalCapacity(len_keys) != 0) {
if (env->EnsureLocalCapacity(len_keys) != 0) {
// out of memory
return NULL;
}
for (int i = 0; i < len_keys; i++) {
jobject jk = env->GetObjectArrayElement(jkeys, i);
jbyteArray jk_ba = reinterpret_cast<jbyteArray>(jk);
jsize len_key = env->GetArrayLength(jk_ba);
jbyte* jk_val = env->GetByteArrayElements(jk_ba, NULL);
rocksdb::Slice key_slice(reinterpret_cast<char*>(jk_val), len_key);
jint* jkey_off = env->GetIntArrayElements(jkey_offs, NULL);
jint* jkey_len = env->GetIntArrayElements(jkey_lens, NULL);
for (int i = 0; i < len_keys; i++) {
jobject jkey = env->GetObjectArrayElement(jkeys, i);
jbyteArray jkey_ba = reinterpret_cast<jbyteArray>(jkey);
jint len_key = jkey_len[i];
jbyte* key = new jbyte[len_key];
env->GetByteArrayRegion(jkey_ba, jkey_off[i], len_key, key);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), len_key);
keys.push_back(key_slice);
keys_to_free.push_back(std::make_tuple(jk_ba, jk_val, jk));
keys_to_free.push_back(std::pair<jbyte*, jobject>(key, jkey));
}
// cleanup jkey_off and jken_len
env->ReleaseIntArrayElements(jkey_lens, jkey_len, JNI_ABORT);
env->ReleaseIntArrayElements(jkey_offs, jkey_off, JNI_ABORT);
std::vector<std::string> values;
std::vector<rocksdb::Status> s;
if (cf_handles.size() == 0) {
@ -669,15 +686,11 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,
}
// free up allocated byte arrays
for (std::vector<std::tuple<jbyteArray, jbyte*, jobject>>::size_type i = 0;
i < keys_to_free.size(); i++) {
jobject jk;
jbyteArray jk_ba;
jbyte* jk_val;
std::tie(jk_ba, jk_val, jk) = keys_to_free[i];
env->ReleaseByteArrayElements(jk_ba, jk_val, JNI_ABORT);
env->DeleteLocalRef(jk);
for (auto it = keys_to_free.begin(); it != keys_to_free.end(); ++it) {
delete [] it->first;
env->DeleteLocalRef(it->second);
}
keys_to_free.clear();
// prepare the results
jclass jcls_ba = env->FindClass("[B");
@ -703,80 +716,88 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,
/*
* Class: org_rocksdb_RocksDB
* Method: multiGet
* Signature: (J[[B)[[B
* Signature: (J[[B[I[I)[[B
*/
jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B(
JNIEnv* env, jobject jdb, jlong jdb_handle, jobjectArray jkeys) {
jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3I_3I(
JNIEnv* env, jobject jdb, jlong jdb_handle, jobjectArray jkeys,
jintArray jkey_offs, jintArray jkey_lens) {
return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
rocksdb::ReadOptions(), jkeys, nullptr);
rocksdb::ReadOptions(), jkeys, jkey_offs, jkey_lens, nullptr);
}
/*
* Class: org_rocksdb_RocksDB
* Method: multiGet
* Signature: (J[[B[J)[[B
* Signature: (J[[B[I[I[J)[[B
*/
jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3J(
jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3I_3I_3J(
JNIEnv* env, jobject jdb, jlong jdb_handle, jobjectArray jkeys,
jintArray jkey_offs, jintArray jkey_lens,
jlongArray jcolumn_family_handles) {
return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
rocksdb::ReadOptions(), jkeys, jcolumn_family_handles);
}
/*
* Class: org_rocksdb_RocksDB
* Method: multiGet
* Signature: (JJ[[B)[[B
*/
jobjectArray Java_org_rocksdb_RocksDB_multiGet__JJ_3_3B(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jobjectArray jkeys) {
return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
*reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), jkeys, nullptr);
}
/*
* Class: org_rocksdb_RocksDB
* Method: multiGet
* Signature: (JJ[[B[J)[[B
*/
jobjectArray Java_org_rocksdb_RocksDB_multiGet__JJ_3_3B_3J(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jobjectArray jkeys, jlongArray jcolumn_family_handles) {
return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
*reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), jkeys,
rocksdb::ReadOptions(), jkeys, jkey_offs, jkey_lens,
jcolumn_family_handles);
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (J[BI[BI)I
* Method: multiGet
* Signature: (JJ[[B[I[I)[[B
*/
jint Java_org_rocksdb_RocksDB_get__J_3BI_3BI(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
return rocksdb_get_helper(env,
reinterpret_cast<rocksdb::DB*>(jdb_handle),
rocksdb::ReadOptions(), nullptr,
jkey, jkey_len, jentry_value, jentry_value_len);
jobjectArray Java_org_rocksdb_RocksDB_multiGet__JJ_3_3B_3I_3I(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jobjectArray jkeys, jintArray jkey_offs, jintArray jkey_lens) {
return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
*reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), jkeys, jkey_offs,
jkey_lens, nullptr);
}
/*
* Class: org_rocksdb_RocksDB
* Method: multiGet
* Signature: (JJ[[B[I[I[J)[[B
*/
jobjectArray Java_org_rocksdb_RocksDB_multiGet__JJ_3_3B_3I_3I_3J(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jobjectArray jkeys, jintArray jkey_offs, jintArray jkey_lens,
jlongArray jcolumn_family_handles) {
return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
*reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), jkeys, jkey_offs,
jkey_lens, jcolumn_family_handles);
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (J[BI[BIJ)I
* Signature: (J[BII[BII)I
*/
jint Java_org_rocksdb_RocksDB_get__J_3BI_3BIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
jint Java_org_rocksdb_RocksDB_get__J_3BII_3BII(JNIEnv* env, jobject jdb,
jlong jdb_handle,
jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
return rocksdb_get_helper(env, reinterpret_cast<rocksdb::DB*>(jdb_handle),
rocksdb::ReadOptions(), nullptr, jkey, jkey_off,
jkey_len, jval, jval_off, jval_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (J[BII[BIIJ)I
*/
jint Java_org_rocksdb_RocksDB_get__J_3BII_3BIIJ(JNIEnv* env, jobject jdb,
jlong jdb_handle,
jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len,
jlong jcf_handle) {
auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
return rocksdb_get_helper(env, db_handle, rocksdb::ReadOptions(), cf_handle,
jkey, jkey_len, jentry_value, jentry_value_len);
jkey, jkey_off, jkey_len, jval, jval_off,
jval_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -788,33 +809,35 @@ jint Java_org_rocksdb_RocksDB_get__J_3BI_3BIJ(
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (JJ[BI[BI)I
* Signature: (JJ[BII[BII)I
*/
jint Java_org_rocksdb_RocksDB_get__JJ_3BI_3BI(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
return rocksdb_get_helper(env,
reinterpret_cast<rocksdb::DB*>(jdb_handle),
*reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle),
nullptr, jkey, jkey_len, jentry_value, jentry_value_len);
jint Java_org_rocksdb_RocksDB_get__JJ_3BII_3BII(JNIEnv* env, jobject jdb,
jlong jdb_handle,
jlong jropt_handle,
jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
return rocksdb_get_helper(
env, reinterpret_cast<rocksdb::DB*>(jdb_handle),
*reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), nullptr, jkey,
jkey_off, jkey_len, jval, jval_off, jval_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: get
* Signature: (JJ[BI[BIJ)I
* Signature: (JJ[BII[BIIJ)I
*/
jint Java_org_rocksdb_RocksDB_get__JJ_3BI_3BIJ(
jint Java_org_rocksdb_RocksDB_get__JJ_3BII_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len, jlong jcf_handle) {
auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto& ro_opt = *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
return rocksdb_get_helper(env, db_handle, ro_opt, cf_handle, jkey,
jkey_len, jentry_value, jentry_value_len);
return rocksdb_get_helper(env, db_handle, ro_opt, cf_handle, jkey, jkey_off,
jkey_len, jval, jval_off, jval_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -826,8 +849,10 @@ jint Java_org_rocksdb_RocksDB_get__JJ_3BI_3BIJ(
// rocksdb::DB::Delete()
void rocksdb_delete_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options,
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len) {
jbyte* key = env->GetByteArrayElements(jkey, 0);
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_off,
jint jkey_len) {
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
rocksdb::Status s;
@ -837,10 +862,9 @@ void rocksdb_delete_helper(
// backwards compatibility
s = db->Delete(write_options, key_slice);
}
// trigger java unref on key and value.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
// cleanup
delete [] key;
if (!s.ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
@ -851,33 +875,33 @@ void rocksdb_delete_helper(
/*
* Class: org_rocksdb_RocksDB
* Method: delete
* Signature: (J[BI)V
* Signature: (J[BII)V
*/
void Java_org_rocksdb_RocksDB_delete__J_3BI(
void Java_org_rocksdb_RocksDB_delete__J_3BII(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len) {
jbyteArray jkey, jint jkey_off, jint jkey_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
rocksdb_delete_helper(env, db, default_write_options, nullptr,
jkey, jkey_len);
jkey, jkey_off, jkey_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: delete
* Signature: (J[BIJ)V
* Signature: (J[BIIJ)V
*/
void Java_org_rocksdb_RocksDB_delete__J_3BIJ(
void Java_org_rocksdb_RocksDB_delete__J_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_delete_helper(env, db, default_write_options, cf_handle,
jkey, jkey_len);
jkey, jkey_off, jkey_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -887,30 +911,32 @@ void Java_org_rocksdb_RocksDB_delete__J_3BIJ(
/*
* Class: org_rocksdb_RocksDB
* Method: delete
* Signature: (JJ[BIJ)V
* Signature: (JJ[BII)V
*/
void Java_org_rocksdb_RocksDB_delete__JJ_3BI(
void Java_org_rocksdb_RocksDB_delete__JJ_3BII(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jlong jwrite_options, jbyteArray jkey, jint jkey_len) {
jlong jwrite_options, jbyteArray jkey, jint jkey_off, jint jkey_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(jwrite_options);
rocksdb_delete_helper(env, db, *write_options, nullptr, jkey, jkey_len);
rocksdb_delete_helper(env, db, *write_options, nullptr, jkey, jkey_off,
jkey_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: delete
* Signature: (JJ[BIJ)V
* Signature: (JJ[BIIJ)V
*/
void Java_org_rocksdb_RocksDB_delete__JJ_3BIJ(
void Java_org_rocksdb_RocksDB_delete__JJ_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jlong jwrite_options, jbyteArray jkey, jint jkey_len,
jlong jwrite_options, jbyteArray jkey, jint jkey_off, jint jkey_len,
jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(jwrite_options);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_delete_helper(env, db, *write_options, cf_handle, jkey, jkey_len);
rocksdb_delete_helper(env, db, *write_options, cf_handle, jkey, jkey_off,
jkey_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -1016,16 +1042,17 @@ void Java_org_rocksdb_RocksDB_singleDelete__JJ_3BIJ(
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Merge
void rocksdb_merge_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options,
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
jbyte* key = env->GetByteArrayElements(jkey, 0);
jbyte* value = env->GetByteArrayElements(jentry_value, 0);
void rocksdb_merge_helper(JNIEnv* env, rocksdb::DB* db,
const rocksdb::WriteOptions& write_options,
rocksdb::ColumnFamilyHandle* cf_handle,
jbyteArray jkey, jint jkey_off, jint jkey_len,
jbyteArray jval, jint jval_off, jint jval_len) {
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
jbyte* value = new jbyte[jkey_len];
env->GetByteArrayRegion(jval, jval_off, jval_len, value);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value),
jentry_value_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jval_len);
rocksdb::Status s;
if (cf_handle != nullptr) {
@ -1034,11 +1061,9 @@ void rocksdb_merge_helper(
s = db->Merge(write_options, key_slice, value_slice);
}
// trigger java unref on key and value.
// by passing JNI_ABORT, it will simply release the reference without
// copying the result back to the java byte array.
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
// cleanup
delete [] value;
delete [] key;
if (s.ok()) {
return;
@ -1049,36 +1074,37 @@ void rocksdb_merge_helper(
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (J[BI[BI)V
* Signature: (J[BII[BII)V
*/
void Java_org_rocksdb_RocksDB_merge__J_3BI_3BI(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
void Java_org_rocksdb_RocksDB_merge__J_3BII_3BII(JNIEnv* env, jobject jdb,
jlong jdb_handle,
jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
rocksdb_merge_helper(env, db, default_write_options,
nullptr, jkey, jkey_len, jentry_value, jentry_value_len);
rocksdb_merge_helper(env, db, default_write_options, nullptr, jkey, jkey_off,
jkey_len, jval, jval_off, jval_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (J[BI[BIJ)V
* Signature: (J[BII[BIIJ)V
*/
void Java_org_rocksdb_RocksDB_merge__J_3BI_3BIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
void Java_org_rocksdb_RocksDB_merge__J_3BII_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_off,
jint jkey_len, jbyteArray jval, jint jval_off, jint jval_len,
jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_merge_helper(env, db, default_write_options,
cf_handle, jkey, jkey_len, jentry_value, jentry_value_len);
rocksdb_merge_helper(env, db, default_write_options, cf_handle, jkey,
jkey_off, jkey_len, jval, jval_off, jval_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -1088,38 +1114,36 @@ void Java_org_rocksdb_RocksDB_merge__J_3BI_3BIJ(
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (JJ[BI[BI)V
* Signature: (JJ[BII[BII)V
*/
void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BI(
JNIEnv* env, jobject jdb,
jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len) {
void Java_org_rocksdb_RocksDB_merge__JJ_3BII_3BII(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle);
rocksdb_merge_helper(env, db, *write_options,
nullptr, jkey, jkey_len, jentry_value, jentry_value_len);
rocksdb_merge_helper(env, db, *write_options, nullptr, jkey, jkey_off,
jkey_len, jval, jval_off, jval_len);
}
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (JJ[BI[BIJ)V
* Signature: (JJ[BII[BIIJ)V
*/
void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BIJ(
JNIEnv* env, jobject jdb,
jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
void Java_org_rocksdb_RocksDB_merge__JJ_3BII_3BIIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_merge_helper(env, db, *write_options,
cf_handle, jkey, jkey_len, jentry_value, jentry_value_len);
rocksdb_merge_helper(env, db, *write_options, cf_handle, jkey, jkey_off,
jkey_len, jval, jval_off, jval_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -1696,7 +1720,7 @@ void Java_org_rocksdb_RocksDB_setOptions(JNIEnv* env, jobject jdb,
std::unordered_map<std::string, std::string> options_map;
const jsize len = env->GetArrayLength(jkeys);
assert(len == env->GetArrayLength(jvalues));
for(int i = 0; i < len; i++) {
for (int i = 0; i < len; i++) {
jobject jobj_key = env->GetObjectArrayElement(jkeys, i);
jobject jobj_value = env->GetObjectArrayElement(jvalues, i);
jstring jkey = reinterpret_cast<jstring>(jobj_key);

View File

@ -604,6 +604,17 @@ public class DBOptions extends RocksObject implements DBOptionsInterface {
static final int DEFAULT_NUM_SHARD_BITS = -1;
public DBOptions setDelayedWriteRate(final long delayedWriteRate){
assert(isOwningHandle());
setDelayedWriteRate(nativeHandle_, delayedWriteRate);
return this;
}
public long delayedWriteRate(){
return delayedWriteRate(nativeHandle_);
}
/**
* <p>Private constructor to be used by
* {@link #getDBOptionsFromProps(java.util.Properties)}</p>
@ -725,6 +736,9 @@ public class DBOptions extends RocksObject implements DBOptionsInterface {
long writeThreadSlowYieldUsec);
private native long writeThreadSlowYieldUsec(long handle);
private native void setDelayedWriteRate(long handle, long delayedWriteRate);
private native long delayedWriteRate(long handle);
int numShardBits_;
RateLimiterConfig rateLimiterConfig_;
}

View File

@ -403,7 +403,7 @@ public class RocksDB extends RocksObject {
*/
public void put(final byte[] key, final byte[] value)
throws RocksDBException {
put(nativeHandle_, key, key.length, value, value.length);
put(nativeHandle_, key, 0, key.length, value, 0, value.length);
}
/**
@ -422,7 +422,7 @@ public class RocksDB extends RocksObject {
*/
public void put(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key, final byte[] value) throws RocksDBException {
put(nativeHandle_, key, key.length, value, value.length,
put(nativeHandle_, key, 0, key.length, value, 0, value.length,
columnFamilyHandle.nativeHandle_);
}
@ -439,7 +439,7 @@ public class RocksDB extends RocksObject {
public void put(final WriteOptions writeOpts, final byte[] key,
final byte[] value) throws RocksDBException {
put(nativeHandle_, writeOpts.nativeHandle_,
key, key.length, value, value.length);
key, 0, key.length, value, 0, value.length);
}
/**
@ -461,8 +461,8 @@ public class RocksDB extends RocksObject {
public void put(final ColumnFamilyHandle columnFamilyHandle,
final WriteOptions writeOpts, final byte[] key,
final byte[] value) throws RocksDBException {
put(nativeHandle_, writeOpts.nativeHandle_, key, key.length, value,
value.length, columnFamilyHandle.nativeHandle_);
put(nativeHandle_, writeOpts.nativeHandle_, key, 0, key.length, value,
0, value.length, columnFamilyHandle.nativeHandle_);
}
/**
@ -477,8 +477,8 @@ public class RocksDB extends RocksObject {
* found in block-cache.
* @return boolean value indicating if key does not exist or might exist.
*/
public boolean keyMayExist(final byte[] key, final StringBuffer value){
return keyMayExist(nativeHandle_, key, key.length, value);
public boolean keyMayExist(final byte[] key, final StringBuffer value) {
return keyMayExist(nativeHandle_, key, 0, key.length, value);
}
/**
@ -495,8 +495,8 @@ public class RocksDB extends RocksObject {
* @return boolean value indicating if key does not exist or might exist.
*/
public boolean keyMayExist(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key, final StringBuffer value){
return keyMayExist(nativeHandle_, key, key.length,
final byte[] key, final StringBuffer value) {
return keyMayExist(nativeHandle_, key, 0, key.length,
columnFamilyHandle.nativeHandle_, value);
}
@ -514,9 +514,9 @@ public class RocksDB extends RocksObject {
* @return boolean value indicating if key does not exist or might exist.
*/
public boolean keyMayExist(final ReadOptions readOptions,
final byte[] key, final StringBuffer value){
final byte[] key, final StringBuffer value) {
return keyMayExist(nativeHandle_, readOptions.nativeHandle_,
key, key.length, value);
key, 0, key.length, value);
}
/**
@ -535,9 +535,9 @@ public class RocksDB extends RocksObject {
*/
public boolean keyMayExist(final ReadOptions readOptions,
final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
final StringBuffer value){
final StringBuffer value) {
return keyMayExist(nativeHandle_, readOptions.nativeHandle_,
key, key.length, columnFamilyHandle.nativeHandle_,
key, 0, key.length, columnFamilyHandle.nativeHandle_,
value);
}
@ -581,7 +581,7 @@ public class RocksDB extends RocksObject {
*/
public void merge(final byte[] key, final byte[] value)
throws RocksDBException {
merge(nativeHandle_, key, key.length, value, value.length);
merge(nativeHandle_, key, 0, key.length, value, 0, value.length);
}
/**
@ -597,7 +597,7 @@ public class RocksDB extends RocksObject {
*/
public void merge(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key, final byte[] value) throws RocksDBException {
merge(nativeHandle_, key, key.length, value, value.length,
merge(nativeHandle_, key, 0, key.length, value, 0, value.length,
columnFamilyHandle.nativeHandle_);
}
@ -615,7 +615,7 @@ public class RocksDB extends RocksObject {
public void merge(final WriteOptions writeOpts, final byte[] key,
final byte[] value) throws RocksDBException {
merge(nativeHandle_, writeOpts.nativeHandle_,
key, key.length, value, value.length);
key, 0, key.length, value, 0, value.length);
}
/**
@ -634,7 +634,7 @@ public class RocksDB extends RocksObject {
final WriteOptions writeOpts, final byte[] key,
final byte[] value) throws RocksDBException {
merge(nativeHandle_, writeOpts.nativeHandle_,
key, key.length, value, value.length,
key, 0, key.length, value, 0, value.length,
columnFamilyHandle.nativeHandle_);
}
@ -653,7 +653,7 @@ public class RocksDB extends RocksObject {
* native library.
*/
public int get(final byte[] key, final byte[] value) throws RocksDBException {
return get(nativeHandle_, key, key.length, value, value.length);
return get(nativeHandle_, key, 0, key.length, value, 0, value.length);
}
/**
@ -675,7 +675,7 @@ public class RocksDB extends RocksObject {
*/
public int get(final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
final byte[] value) throws RocksDBException, IllegalArgumentException {
return get(nativeHandle_, key, key.length, value, value.length,
return get(nativeHandle_, key, 0, key.length, value, 0, value.length,
columnFamilyHandle.nativeHandle_);
}
@ -698,7 +698,7 @@ public class RocksDB extends RocksObject {
public int get(final ReadOptions opt, final byte[] key,
final byte[] value) throws RocksDBException {
return get(nativeHandle_, opt.nativeHandle_,
key, key.length, value, value.length);
key, 0, key.length, value, 0, value.length);
}
/**
* Get the value associated with the specified key within column family.
@ -721,8 +721,8 @@ public class RocksDB extends RocksObject {
public int get(final ColumnFamilyHandle columnFamilyHandle,
final ReadOptions opt, final byte[] key, final byte[] value)
throws RocksDBException {
return get(nativeHandle_, opt.nativeHandle_, key, key.length, value,
value.length, columnFamilyHandle.nativeHandle_);
return get(nativeHandle_, opt.nativeHandle_, key, 0, key.length, value,
0, value.length, columnFamilyHandle.nativeHandle_);
}
/**
@ -738,7 +738,7 @@ public class RocksDB extends RocksObject {
* native library.
*/
public byte[] get(final byte[] key) throws RocksDBException {
return get(nativeHandle_, key, key.length);
return get(nativeHandle_, key, 0, key.length);
}
/**
@ -757,7 +757,7 @@ public class RocksDB extends RocksObject {
*/
public byte[] get(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key) throws RocksDBException {
return get(nativeHandle_, key, key.length,
return get(nativeHandle_, key, 0, key.length,
columnFamilyHandle.nativeHandle_);
}
@ -776,7 +776,7 @@ public class RocksDB extends RocksObject {
*/
public byte[] get(final ReadOptions opt, final byte[] key)
throws RocksDBException {
return get(nativeHandle_, opt.nativeHandle_, key, key.length);
return get(nativeHandle_, opt.nativeHandle_, key, 0, key.length);
}
/**
@ -796,7 +796,7 @@ public class RocksDB extends RocksObject {
*/
public byte[] get(final ColumnFamilyHandle columnFamilyHandle,
final ReadOptions opt, final byte[] key) throws RocksDBException {
return get(nativeHandle_, opt.nativeHandle_, key, key.length,
return get(nativeHandle_, opt.nativeHandle_, key, 0, key.length,
columnFamilyHandle.nativeHandle_);
}
@ -814,10 +814,17 @@ public class RocksDB extends RocksObject {
throws RocksDBException {
assert(keys.size() != 0);
final byte[][] values = multiGet(nativeHandle_,
keys.toArray(new byte[keys.size()][]));
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
final int keyOffsets[] = new int[keysArray.length];
final int keyLengths[] = new int[keysArray.length];
for(int i = 0; i < keyLengths.length; i++) {
keyLengths[i] = keysArray[i].length;
}
Map<byte[], byte[]> keyValueMap = new HashMap<>();
final byte[][] values = multiGet(nativeHandle_, keysArray, keyOffsets,
keyLengths);
final Map<byte[], byte[]> keyValueMap = new HashMap<>();
for(int i = 0; i < values.length; i++) {
if(values[i] == null) {
continue;
@ -862,10 +869,18 @@ public class RocksDB extends RocksObject {
for (int i = 0; i < columnFamilyHandleList.size(); i++) {
cfHandles[i] = columnFamilyHandleList.get(i).nativeHandle_;
}
final byte[][] values = multiGet(nativeHandle_,
keys.toArray(new byte[keys.size()][]), cfHandles);
Map<byte[], byte[]> keyValueMap = new HashMap<>();
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
final int keyOffsets[] = new int[keysArray.length];
final int keyLengths[] = new int[keysArray.length];
for(int i = 0; i < keyLengths.length; i++) {
keyLengths[i] = keysArray[i].length;
}
final byte[][] values = multiGet(nativeHandle_, keysArray, keyOffsets,
keyLengths, cfHandles);
final Map<byte[], byte[]> keyValueMap = new HashMap<>();
for(int i = 0; i < values.length; i++) {
if (values[i] == null) {
continue;
@ -890,10 +905,17 @@ public class RocksDB extends RocksObject {
final List<byte[]> keys) throws RocksDBException {
assert(keys.size() != 0);
final byte[][] values = multiGet(nativeHandle_, opt.nativeHandle_,
keys.toArray(new byte[keys.size()][]));
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
final int keyOffsets[] = new int[keysArray.length];
final int keyLengths[] = new int[keysArray.length];
for(int i = 0; i < keyLengths.length; i++) {
keyLengths[i] = keysArray[i].length;
}
Map<byte[], byte[]> keyValueMap = new HashMap<>();
final byte[][] values = multiGet(nativeHandle_, opt.nativeHandle_,
keysArray, keyOffsets, keyLengths);
final Map<byte[], byte[]> keyValueMap = new HashMap<>();
for(int i = 0; i < values.length; i++) {
if(values[i] == null) {
continue;
@ -938,10 +960,18 @@ public class RocksDB extends RocksObject {
for (int i = 0; i < columnFamilyHandleList.size(); i++) {
cfHandles[i] = columnFamilyHandleList.get(i).nativeHandle_;
}
final byte[][] values = multiGet(nativeHandle_, opt.nativeHandle_,
keys.toArray(new byte[keys.size()][]), cfHandles);
Map<byte[], byte[]> keyValueMap = new HashMap<>();
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
final int keyOffsets[] = new int[keysArray.length];
final int keyLengths[] = new int[keysArray.length];
for(int i = 0; i < keyLengths.length; i++) {
keyLengths[i] = keysArray[i].length;
}
final byte[][] values = multiGet(nativeHandle_, opt.nativeHandle_,
keysArray, keyOffsets, keyLengths, cfHandles);
final Map<byte[], byte[]> keyValueMap = new HashMap<>();
for(int i = 0; i < values.length; i++) {
if(values[i] == null) {
continue;
@ -980,7 +1010,7 @@ public class RocksDB extends RocksObject {
* native library.
*/
public void delete(final byte[] key) throws RocksDBException {
delete(nativeHandle_, key, key.length);
delete(nativeHandle_, key, 0, key.length);
}
/**
@ -1017,7 +1047,7 @@ public class RocksDB extends RocksObject {
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key) throws RocksDBException {
delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_);
delete(nativeHandle_, key, 0, key.length, columnFamilyHandle.nativeHandle_);
}
/**
@ -1052,7 +1082,7 @@ public class RocksDB extends RocksObject {
*/
public void delete(final WriteOptions writeOpt, final byte[] key)
throws RocksDBException {
delete(nativeHandle_, writeOpt.nativeHandle_, key, key.length);
delete(nativeHandle_, writeOpt.nativeHandle_, key, 0, key.length);
}
/**
@ -1093,7 +1123,7 @@ public class RocksDB extends RocksObject {
public void delete(final ColumnFamilyHandle columnFamilyHandle,
final WriteOptions writeOpt, final byte[] key)
throws RocksDBException {
delete(nativeHandle_, writeOpt.nativeHandle_, key, key.length,
delete(nativeHandle_, writeOpt.nativeHandle_, key, 0, key.length,
columnFamilyHandle.nativeHandle_);
}
@ -1970,91 +2000,87 @@ public class RocksDB extends RocksObject {
final long[] columnFamilyOptions
) throws RocksDBException;
protected native static byte[][] listColumnFamilies(
long optionsHandle, String path) throws RocksDBException;
protected native void put(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native void put(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native void put(
long handle, long writeOptHandle,
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native void put(
long handle, long writeOptHandle,
byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native static byte[][] listColumnFamilies(long optionsHandle,
String path) throws RocksDBException;
protected native void put(long handle, byte[] key, int keyOffset,
int keyLength, byte[] value, int valueOffset, int valueLength)
throws RocksDBException;
protected native void put(long handle, byte[] key, int keyOffset,
int keyLength, byte[] value, int valueOffset, int valueLength,
long cfHandle) throws RocksDBException;
protected native void put(long handle, long writeOptHandle, byte[] key,
int keyOffset, int keyLength, byte[] value, int valueOffset,
int valueLength) throws RocksDBException;
protected native void put(long handle, long writeOptHandle, byte[] key,
int keyOffset, int keyLength, byte[] value, int valueOffset,
int valueLength, long cfHandle) throws RocksDBException;
protected native void write0(final long handle, long writeOptHandle,
long wbHandle) throws RocksDBException;
protected native void write1(final long handle, long writeOptHandle,
long wbwiHandle) throws RocksDBException;
protected native boolean keyMayExist(final long handle, final byte[] key,
final int keyLen, final StringBuffer stringBuffer);
final int keyOffset, final int keyLength,
final StringBuffer stringBuffer);
protected native boolean keyMayExist(final long handle, final byte[] key,
final int keyLen, final long cfHandle, final StringBuffer stringBuffer);
protected native boolean keyMayExist(final long handle,
final long optionsHandle, final byte[] key, final int keyLen,
final int keyOffset, final int keyLength, final long cfHandle,
final StringBuffer stringBuffer);
protected native boolean keyMayExist(final long handle,
final long optionsHandle, final byte[] key, final int keyLen,
final long cfHandle, final StringBuffer stringBuffer);
protected native void merge(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native void merge(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native void merge(
long handle, long writeOptHandle,
byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native void merge(
long handle, long writeOptHandle,
byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native int get(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native int get(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native int get(
long handle, long readOptHandle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException;
protected native int get(
long handle, long readOptHandle, byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native byte[][] multiGet(final long dbHandle, final byte[][] keys);
final long optionsHandle, final byte[] key, final int keyOffset,
final int keyLength, final StringBuffer stringBuffer);
protected native boolean keyMayExist(final long handle,
final long optionsHandle, final byte[] key, final int keyOffset,
final int keyLength, final long cfHandle,
final StringBuffer stringBuffer);
protected native void merge(long handle, byte[] key, int keyOffset,
int keyLength, byte[] value, int valueOffset, int valueLength)
throws RocksDBException;
protected native void merge(long handle, byte[] key, int keyOffset,
int keyLength, byte[] value, int valueOffset, int valueLength,
long cfHandle) throws RocksDBException;
protected native void merge(long handle, long writeOptHandle, byte[] key,
int keyOffset, int keyLength, byte[] value, int valueOffset,
int valueLength) throws RocksDBException;
protected native void merge(long handle, long writeOptHandle, byte[] key,
int keyOffset, int keyLength, byte[] value, int valueOffset,
int valueLength, long cfHandle) throws RocksDBException;
protected native int get(long handle, byte[] key, int keyOffset,
int keyLength, byte[] value, int valueOffset, int valueLength)
throws RocksDBException;
protected native int get(long handle, byte[] key, int keyOffset,
int keyLength, byte[] value, int valueOffset, int valueLength,
long cfHandle) throws RocksDBException;
protected native int get(long handle, long readOptHandle, byte[] key,
int keyOffset, int keyLength, byte[] value, int valueOffset,
int valueLength) throws RocksDBException;
protected native int get(long handle, long readOptHandle, byte[] key,
int keyOffset, int keyLength, byte[] value, int valueOffset,
int valueLength, long cfHandle) throws RocksDBException;
protected native byte[][] multiGet(final long dbHandle, final byte[][] keys,
final int[] keyOffsets, final int[] keyLengths);
protected native byte[][] multiGet(final long dbHandle, final byte[][] keys,
final int[] keyOffsets, final int[] keyLengths,
final long[] columnFamilyHandles);
protected native byte[][] multiGet(final long dbHandle, final long rOptHandle,
final byte[][] keys);
final byte[][] keys, final int[] keyOffsets, final int[] keyLengths);
protected native byte[][] multiGet(final long dbHandle, final long rOptHandle,
final byte[][] keys, final long[] columnFamilyHandles);
protected native byte[] get(
long handle, byte[] key, int keyLen) throws RocksDBException;
protected native byte[] get(
long handle, byte[] key, int keyLen, long cfHandle)
throws RocksDBException;
protected native byte[] get(
long handle, long readOptHandle,
byte[] key, int keyLen) throws RocksDBException;
protected native byte[] get(
long handle, long readOptHandle,
byte[] key, int keyLen, long cfHandle) throws RocksDBException;
protected native void delete(
long handle, byte[] key, int keyLen) throws RocksDBException;
protected native void delete(
long handle, byte[] key, int keyLen, long cfHandle)
throws RocksDBException;
protected native void delete(
long handle, long writeOptHandle,
byte[] key, int keyLen) throws RocksDBException;
protected native void delete(
long handle, long writeOptHandle,
byte[] key, int keyLen, long cfHandle) throws RocksDBException;
final byte[][] keys, final int[] keyOffsets, final int[] keyLengths,
final long[] columnFamilyHandles);
protected native byte[] get(long handle, byte[] key, int keyOffset,
int keyLength) throws RocksDBException;
protected native byte[] get(long handle, byte[] key, int keyOffset,
int keyLength, long cfHandle) throws RocksDBException;
protected native byte[] get(long handle, long readOptHandle,
byte[] key, int keyOffset, int keyLength) throws RocksDBException;
protected native byte[] get(long handle, long readOptHandle, byte[] key,
int keyOffset, int keyLength, long cfHandle) throws RocksDBException;
protected native void delete(long handle, byte[] key, int keyOffset,
int keyLength) throws RocksDBException;
protected native void delete(long handle, byte[] key, int keyOffset,
int keyLength, long cfHandle) throws RocksDBException;
protected native void delete(long handle, long writeOptHandle, byte[] key,
int keyOffset, int keyLength) throws RocksDBException;
protected native void delete(long handle, long writeOptHandle, byte[] key,
int keyOffset, int keyLength, long cfHandle) throws RocksDBException;
protected native void singleDelete(
long handle, byte[] key, int keyLen) throws RocksDBException;
protected native void singleDelete(
@ -2070,8 +2096,8 @@ public class RocksDB extends RocksObject {
String property, int propertyLength) throws RocksDBException;
protected native String getProperty0(long nativeHandle, long cfHandle,
String property, int propertyLength) throws RocksDBException;
protected native long getLongProperty(long nativeHandle,
String property, int propertyLength) throws RocksDBException;
protected native long getLongProperty(long nativeHandle, String property,
int propertyLength) throws RocksDBException;
protected native long getLongProperty(long nativeHandle, long cfHandle,
String property, int propertyLength) throws RocksDBException;
protected native long iterator(long handle);
@ -2083,8 +2109,7 @@ public class RocksDB extends RocksObject {
final long[] columnFamilyHandles, final long readOptHandle)
throws RocksDBException;
protected native long getSnapshot(long nativeHandle);
protected native void releaseSnapshot(
long nativeHandle, long snapshotHandle);
protected native void releaseSnapshot(long nativeHandle, long snapshotHandle);
@Override protected final native void disposeInternal(final long handle);
private native long getDefaultColumnFamily(long handle);
private native long createColumnFamily(final long handle,
@ -2094,8 +2119,8 @@ public class RocksDB extends RocksObject {
throws RocksDBException;
private native void flush(long handle, long flushOptHandle)
throws RocksDBException;
private native void flush(long handle, long flushOptHandle,
long cfHandle) throws RocksDBException;
private native void flush(long handle, long flushOptHandle, long cfHandle)
throws RocksDBException;
private native void compactRange0(long handle, boolean reduce_level,
int target_level, int target_path_id) throws RocksDBException;
private native void compactRange0(long handle, byte[] begin, int beginLen,
@ -2111,8 +2136,8 @@ public class RocksDB extends RocksObject {
private native void continueBackgroundWork(long handle) throws RocksDBException;
private native long getLatestSequenceNumber(long handle);
private native void disableFileDeletions(long handle) throws RocksDBException;
private native void enableFileDeletions(long handle,
boolean force) throws RocksDBException;
private native void enableFileDeletions(long handle, boolean force)
throws RocksDBException;
private native long getUpdatesSince(long handle, long sequenceNumber)
throws RocksDBException;
private native void setOptions(long handle, long cfHandle, String[] keys,

View File

@ -6,6 +6,7 @@
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

View File

@ -10,6 +10,7 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>

View File

@ -359,6 +359,36 @@ bool BlockCacheTier::Reserve(const size_t size) {
return true;
}
Status NewPersistentCache(Env* const env, const std::string& path,
const uint64_t size,
const std::shared_ptr<Logger>& log,
const bool optimized_for_nvm,
std::shared_ptr<PersistentCache>* cache) {
if (!cache) {
return Status::IOError("invalid argument cache");
}
auto opt = PersistentCacheConfig(env, path, size, log);
if (optimized_for_nvm) {
// the default settings are optimized for SSD
// NVM devices are better accessed with 4K direct IO and written with
// parallelism
opt.enable_direct_writes = true;
opt.writer_qdepth = 4;
opt.writer_dispatch_size = 4 * 1024;
}
auto pcache = std::make_shared<BlockCacheTier>(opt);
Status s = pcache->Open();
if (!s.ok()) {
return s;
}
*cache = pcache;
return s;
}
} // namespace rocksdb
#endif // ifndef ROCKSDB_LITE

View File

@ -49,8 +49,8 @@ class BlockCacheTier : public PersistentCacheTier {
}
virtual ~BlockCacheTier() {
// By contract, the user should have called stop before destroying the
// object
// Close is re-entrant so we can call close even if it is already closed
Close();
assert(!insert_th_.joinable());
}

View File

@ -9,6 +9,7 @@
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>

View File

@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include <list>
#include <memory>
#include <string>

View File

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include "util/random.h"
#include "utilities/persistent_cache/hash_table.h"
#include "utilities/persistent_cache/lrulist.h"

View File

@ -235,6 +235,19 @@ static void UniqueIdCallback(void* arg) {
}
#endif
TEST_F(PersistentCacheTierTest, FactoryTest) {
for (auto nvm_opt : {true, false}) {
ASSERT_FALSE(cache_);
auto log = std::make_shared<ConsoleLogger>();
std::shared_ptr<PersistentCache> cache;
ASSERT_OK(NewPersistentCache(Env::Default(), path_,
/*size=*/1 * 1024 * 1024 * 1024, log, nvm_opt,
&cache));
ASSERT_TRUE(cache);
cache.reset();
}
}
PersistentCacheDBTest::PersistentCacheDBTest() : DBTestBase("/cache_test") {
#ifdef OS_LINUX
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -403,6 +416,7 @@ TEST_F(PersistentCacheDBTest, TieredCacheTest) {
RunTest(std::bind(&MakeTieredCache, dbname_));
}
#endif
} // namespace rocksdb
int main(int argc, char** argv) {