Compare commits

...

20 Commits

Author SHA1 Message Date
sdong
4844d698b9 Disable error as warning 2019-11-05 11:05:17 -08:00
sdong
157da07b6d Add one more #include<functional> 2019-11-05 11:05:16 -08:00
sdong
951ae63e50 Add some include<functional> 2019-10-31 14:21:04 -07:00
sdong
8df5fdedba [FB Internal] Point to the latest tool chain. 2019-10-31 14:21:03 -07:00
sdong
3c56f032cd [fb only] revert unintended change of USE_SSE
The previous change that used gcc-5 set USE_SSE to the wrong flag by mistake. Fix it.
2017-07-17 22:20:53 -07:00
sdong
368afc9d22 [FB Only] use gcc-5 2017-07-17 21:59:53 -07:00
krad
5339a5e6e6 Add factory method for creating persistent cache that is accessible from public
Summary:
Currently there is no mechanism to create a persistent cache from the public
headers. Add a simple factory method to create a persistent cache with
default or NVM-optimized settings.

Note: any ideas on how to test this factory are appreciated.

Test Plan: None

Reviewers: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64527
2016-10-03 16:14:03 -07:00
Islam AbdelRahman
24c3b2b21e Fix conflict between AddFile() and CompactRange()
Summary:
Fix the conflict bug between AddFile() and CompactRange() by
- Making sure that no AddFile() calls are running when asking the CompactionPicker to pick a compaction for a manual compaction
- Making AddFile() aware of a manual compaction that was picked before it ran, since we add the manual compaction to running_compactions_ right after picking it

This solves these 2 scenarios
- If AddFile() is running, we wait for it to finish before we pick a compaction for the manual compaction
- If we already picked a manual compaction and then AddFile() started, we ensure that it never ingests a file into a level that will overlap with the manual compaction

Test Plan: unit tests

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, jkedgar, dhruba

Differential Revision: https://reviews.facebook.net/D64449
2016-09-28 16:16:10 -07:00
Islam AbdelRahman
f0b881629f Fix AddFile() conflict with compaction output [WaitForAddFile()]
Summary:
Since AddFile() unlocks/relocks the mutex inside LogAndApply(), we need to ensure that no other compactions run during this period, since such compactions are not aware of the file we are ingesting and could create a compaction that overlaps with this file.

This diff adds
- a WaitForAddFile() call that ensures no AddFile() calls are being processed right now
- calls to `WaitForAddFile()` in 3 locations
-- when doing a manual compaction
-- when starting an automatic compaction
-- when doing CompactFiles()

Test Plan: unit test

Reviewers: lightmark, yiwu, andrewkr, sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, jkedgar, dhruba

Differential Revision: https://reviews.facebook.net/D64383
2016-09-27 10:37:29 -07:00
Islam AbdelRahman
8ee2ee8952 Fix CompactFilesTest.ObsoleteFiles timeout (#1353) 2016-09-26 12:03:39 -07:00
Aaron Gao
af28d114d6 not cut compaction output when compact to level 0
Summary: We should not call ShouldStopBefore() in compaction when the compaction targets level 0. Otherwise, CheckConsistency will fail its sequence-number assertion on level 0.

Test Plan:
make all check -j64
I also manually tested compacting files to level 0 using db_bench. Without this change, the assertion fails and multiple files are generated on level 0 after compaction.

Reviewers: yhchiang, andrewkr, yiwu, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64269
2016-09-25 20:08:34 -07:00
yiwu-arbug
a34c1e3373 Recover same sequence id from WAL (#1350)
Summary:
Revert the behavior where we don't read the sequence id from the WAL but instead increase it as we replay the log. We still keep that behavior for 2PC for now but will fix it later.

This change fixes github issue 1339, where some writes came with the WAL disabled and we could recover records with the wrong sequence id.

Test Plan: Added unit test.

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D64275
2016-09-25 20:08:24 -07:00
Yi Wu
165cec6ef6 Fix DBWALTest.RecoveryWithLogDataForSomeCFs with mac
Summary: It seems there's no std::array on mac+clang. Use a raw array instead.

Test Plan: run ./db_wal_test on mac.

Reviewers: andrewkr

Reviewed By: andrewkr

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64005
2016-09-19 14:29:29 -07:00
Andrew Kryczka
d7d6a9a41d Fix recovery for WALs without data for all CFs
Summary:
If one or more CFs had no data in the WAL, the log number that's used
by FindObsoleteFiles() wasn't updated. We need to treat this case the same as
if the data for that WAL had been flushed.

Test Plan: new unit test

Reviewers: IslamAbdelRahman, yiwu, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63963
2016-09-19 12:51:54 -07:00
Andrew Kryczka
55ab150b46 Fix GetSortedWalFiles when log recycling enabled
Summary:
Previously the sequence number was mistakenly passed in the argument
where the log number should go. This caused the reader to assume the old WAL
format was used, which is incompatible with the WAL recycling format.

Test Plan:
new unit test, verified it fails before this change and passes
afterwards.

Reviewers: yiwu, lightmark, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63987
2016-09-19 12:51:48 -07:00
xh931076284
cba883c538 Fix bug in UnSchedule() (#1336)
Fix HdfsEnv::UnSchedule() API error
Signed-off-by: xh931076284 <931076284@qq.com>
2016-09-19 12:50:12 -07:00
Yi Wu
c4ffd74608 Fix java makefile dependencies
Summary: Fix dependencies in java makefile, to avoid java build failure.

Test Plan: run "make rocksdbjava" multiple times.

Reviewers: IslamAbdelRahman, sdong, yhchiang

Reviewed By: yhchiang

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64023
2016-09-19 12:49:47 -07:00
Yi Wu
ac7e52c2ba Rename jvalue to jval in rocksjni
Summary: jvalue shadows a global name in <jni.h>. Rename it to jval to fix the java build.

Test Plan:
    JAVA_HOME=/usr/local/jdk-7u10-64 make rocksdbjava -j64

Reviewers: adamretter, yhchiang, IslamAbdelRahman

Reviewed By: yhchiang, IslamAbdelRahman

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D63981
2016-09-19 12:49:38 -07:00
Adam Retter
28d8501876 Allow an offset as well as a length to be specified for byte[] operations in RocksJava JNI (#1264)
Test Plan: Execute the Java test suite

Reviewers: yhchiang

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61971
2016-09-19 12:47:40 -07:00
Islam AbdelRahman
e8bbb13a97 Release RocksDB 4.12
Summary: Release 4.12

Test Plan: none

Reviewers: andrewkr, yiwu, lightmark, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D63885
2016-09-12 12:56:56 -07:00
30 changed files with 896 additions and 455 deletions

View File

@@ -1,5 +1,5 @@
 # Rocksdb Change Log
-## Unreleased
+## 4.12.0 (9/12/2016)
 ### Public API Change
 * CancelAllBackgroundWork() flushes all memtables for databases containing writes that have bypassed the WAL (writes issued with WriteOptions::disableWAL=true) before shutting down background threads.
 * Merge options source_compaction_factor, max_grandparent_overlap_bytes and expanded_compaction_factor into max_compaction_bytes.
@@ -9,6 +9,7 @@
 ### New Features
 * Introduce NewClockCache, which is based on CLOCK algorithm with better concurrent performance in some cases. It can be used to replace the default LRU-based block cache and table cache. To use it, RocksDB need to be linked with TBB lib.
 * Change ticker/histogram statistics implementations to accumulate data in thread-local storage, which improves CPU performance by reducing cache coherency costs. Callers of CreateDBStatistics do not need to change anything to use this feature.
+* Block cache mid-point insertion, where index and filter block are inserted into LRU block cache with higher priority. The feature can be enabled by setting BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority to true and high_pri_pool_ratio > 0 when creating NewLRUCache.
 ## 4.11.0 (8/1/2016)
 ### Public API Change

View File

@@ -216,10 +216,6 @@ default: all
 WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
 	-Wno-unused-parameter
-ifndef DISABLE_WARNING_AS_ERROR
-	WARNING_FLAGS += -Werror
-endif
 CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
 CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
@@ -393,9 +389,10 @@ PARALLEL_TEST = \
 	db_compaction_filter_test \
 	db_compaction_test \
 	db_sst_test \
-	external_sst_file_test \
 	db_test \
 	db_universal_compaction_test \
+	db_wal_test \
+	external_sst_file_test \
 	fault_injection_test \
 	inlineskiplist_test \
 	manual_compaction_test \
@@ -599,7 +596,7 @@ gen_parallel_tests:
 #   107.816 PASS t/DBTest.EncodeDecompressedBlockSizeTest
 #
 slow_test_regexp = \
-	^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$
+	^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$|^.*RecoverFromCorruptedWALWithoutFlush$$
 prioritize_long_running_tests = \
 	perl -pe 's,($(slow_test_regexp)),100 $$1,' \
 	| sort -k1,1gr \
@@ -820,6 +817,7 @@ clean:
 	find . -name "*.[oda]" -exec rm -f {} \;
 	find . -type f -regex ".*\.\(\(gcda\)\|\(gcno\)\)" -exec rm {} \;
 	rm -rf bzip2* snappy* zlib* lz4*
+	cd java; $(MAKE) clean
 tags:
 	ctags * -R

View File

@@ -51,12 +51,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
   FBCODE_BUILD="true"
   # If we're compiling with TSAN we need pic build
   PIC_BUILD=$COMPILE_WITH_TSAN
-  if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
-    source "$PWD/build_tools/fbcode_config.sh"
-  else
-    # we need this to build with MySQL. Don't use for other purposes.
-    source "$PWD/build_tools/fbcode_config4.8.1.sh"
-  fi
+  source "$PWD/build_tools/fbcode_config.sh"
 fi

 # Delete existing output, if it exists

View File

@@ -1,17 +1,19 @@
-GCC_BASE=/mnt/gvfs/third-party2/gcc/cf7d14c625ce30bae1a4661c2319c5a283e4dd22/4.9.x/centos6-native/108cf83
-CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/8598c375b0e94e1448182eb3df034704144a838d/stable/centos6-native/3f16ddd
-LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/d6e0a7da6faba45f5e5b1638f9edd7afc2f34e7d/4.9.x/gcc-4.9-glibc-2.20/024dbc3
-GLIBC_BASE=/mnt/gvfs/third-party2/glibc/d282e6e8f3d20f4e40a516834847bdc038e07973/2.20/gcc-4.9-glibc-2.20/500e281
-SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
-ZLIB_BASE=/mnt/gvfs/third-party2/zlib/0882df3713c7a84f15abe368dc004581f20b39d7/1.2.8/gcc-5-glibc-2.23/9bc6787
-BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/740325875f6729f42d28deaa2147b0854f3a347e/1.0.6/gcc-5-glibc-2.23/9bc6787
-LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
-ZSTD_BASE=/mnt/gvfs/third-party2/zstd/9455f75ff7f4831dc9fda02a6a0f8c68922fad8f/1.0.0/gcc-5-glibc-2.23/9bc6787
-GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/f001a51b2854957676d07306ef3abf67186b5c8b/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
-JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/fc8a13ca1fffa4d0765c716c5a0b49f0c107518f/master/gcc-5-glibc-2.23/1c32b4b
-NUMA_BASE=/mnt/gvfs/third-party2/numa/17c514c4d102a25ca15f4558be564eeed76f4b6a/2.0.8/gcc-5-glibc-2.23/9bc6787
-LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
-TBB_BASE=/mnt/gvfs/third-party2/tbb/9d9a554877d0c5bef330fe818ab7178806dd316a/4.0_update2/gcc-4.9-glibc-2.20/e9936bf
-KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/7c111ff27e0c466235163f00f280a9d617c3d2ec/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
-BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/b7fd454c4b10c6a81015d4524ed06cdeab558490/2.26/centos6-native/da39a3e
-VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d7f4d4d86674a57668e3a96f76f0e17dd0eb8765/3.10.0/gcc-4.9-glibc-2.20/e9936bf
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
+CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
+LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
+GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
+SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
+ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
+BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
+LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
+ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
+GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
+JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
+NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
+LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
+TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
+KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
+BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
+VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
+LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

View File

@@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
 CFLAGS=""

 # libgcc
-LIBGCC_INCLUDE="$LIBGCC_BASE/include"
+LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
 LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"

 # glibc
@@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
   LZ4_INCLUDE=" -I $LZ4_BASE/include/"
   LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
   CFLAGS+=" -DLZ4"
-  ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
-  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
-  CFLAGS+=" -DZSTD"
 fi
+ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
+if test -z $PIC_BUILD; then
+  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
+else
+  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
+fi
+CFLAGS+=" -DZSTD"

 # location of gflags headers and libraries
 GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
 if test -z $PIC_BUILD; then
@@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
 else
   GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
 fi
-CFLAGS+=" -DGFLAGS=google"
+CFLAGS+=" -DGFLAGS=gflags"

 # location of jemalloc
 JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@@ -104,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
   CXX="$GCC_BASE/bin/g++"

   CFLAGS+=" -B$BINUTILS/gold"
-  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
+  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   JEMALLOC=1
 else
   # clang
@@ -116,8 +120,8 @@ else
   KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"

   CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
   CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
   CFLAGS+=" -isystem $CLANG_INCLUDE"
@@ -128,13 +132,14 @@ else
 fi

 CFLAGS+=" $DEPS_INCLUDE"
-CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
+CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
 CXXFLAGS+=" $CFLAGS"

 EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
-EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
+EXEC_LDFLAGS+=" -B$BINUTILS/gold"
+EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
 EXEC_LDFLAGS+=" $LIBUNWIND"
-EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
+EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"

 # required by libtbb
 EXEC_LDFLAGS+=" -ldl"
@@ -144,4 +149,4 @@ EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GF
 VALGRIND_VER="$VALGRIND_BASE/bin/"

 export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD

View File

@@ -50,6 +50,7 @@ class FlushedFileCollector : public EventListener {
     }
     return result;
   }
+  void ClearFlushedFiles() { flushed_files_.clear(); }

 private:
  std::vector<std::string> flushed_files_;
@@ -116,13 +117,12 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
 TEST_F(CompactFilesTest, ObsoleteFiles) {
   Options options;
   // to trigger compaction more easily
-  const int kWriteBufferSize = 10000;
+  const int kWriteBufferSize = 65536;
   options.create_if_missing = true;
   // Disable RocksDB background compaction.
   options.compaction_style = kCompactionStyleNone;
-  // Small slowdown and stop trigger for experimental purpose.
-  options.level0_slowdown_writes_trigger = 20;
-  options.level0_stop_writes_trigger = 20;
+  options.level0_slowdown_writes_trigger = (1 << 30);
+  options.level0_stop_writes_trigger = (1 << 30);
   options.write_buffer_size = kWriteBufferSize;
   options.max_write_buffer_number = 2;
   options.compression = kNoCompression;
@@ -154,6 +154,46 @@ TEST_F(CompactFilesTest, ObsoleteFiles) {
   delete db;
 }

+TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
+  Options options;
+  options.create_if_missing = true;
+  // Disable RocksDB background compaction.
+  options.compaction_style = kCompactionStyleNone;
+  options.level0_slowdown_writes_trigger = 1000;
+  options.level0_stop_writes_trigger = 1000;
+  options.write_buffer_size = 65536;
+  options.max_write_buffer_number = 2;
+  options.compression = kNoCompression;
+  options.max_compaction_bytes = 5000;
+
+  // Add listener
+  FlushedFileCollector* collector = new FlushedFileCollector();
+  options.listeners.emplace_back(collector);
+
+  DB* db = nullptr;
+  DestroyDB(db_name_, options);
+  Status s = DB::Open(options, db_name_, &db);
+  assert(s.ok());
+  assert(db);
+
+  // create couple files
+  for (int i = 0; i < 500; ++i) {
+    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+  }
+  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
+  auto l0_files_1 = collector->GetFlushedFiles();
+  collector->ClearFlushedFiles();
+  for (int i = 0; i < 500; ++i) {
+    db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
+  }
+  reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
+  auto l0_files_2 = collector->GetFlushedFiles();
+  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
+  ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
+  // no assertion failure
+  delete db;
+}
+
 TEST_F(CompactFilesTest, CapturingPendingFiles) {
   Options options;
   options.create_if_missing = true;

View File

@@ -762,7 +762,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     if (end != nullptr &&
         cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
       break;
-    } else if (sub_compact->ShouldStopBefore(
+    } else if (sub_compact->compaction->output_level() != 0 &&
+               sub_compact->ShouldStopBefore(
                    key, sub_compact->current_output_file_size) &&
                sub_compact->builder != nullptr) {
       status = FinishCompactionOutputFile(input->status(), sub_compact);

View File

@@ -341,6 +341,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       next_job_id_(1),
       has_unpersisted_data_(false),
       env_options_(db_options_),
+      num_running_addfile_(0),
 #ifndef ROCKSDB_LITE
       wal_manager_(db_options_, env_options_),
 #endif  // ROCKSDB_LITE
@@ -1393,7 +1394,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
   bool stop_replay_by_wal_filter = false;
   bool stop_replay_for_corruption = false;
   bool flushed = false;
-  SequenceNumber recovered_sequence = 0;
   for (auto log_number : log_numbers) {
     // The previous incarnation may not have written any MANIFEST
     // records after allocating this log number. So we manually
@@ -1472,13 +1472,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
       WriteBatchInternal::SetContents(&batch, record);
       SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);

-      // In point-in-time recovery mode, if sequence id of log files are
-      // consecutive, we continue recovery despite corruption. This could happen
-      // when we open and write to a corrupted DB, where sequence id will start
-      // from the last sequence id we recovered.
       if (db_options_.wal_recovery_mode ==
           WALRecoveryMode::kPointInTimeRecovery) {
-        if (sequence == recovered_sequence + 1) {
+        // In point-in-time recovery mode, if sequence id of log files are
+        // consecutive, we continue recovery despite corruption. This could
+        // happen when we open and write to a corrupted DB, where sequence id
+        // will start from the last sequence id we recovered.
+        if (sequence == *next_sequence) {
           stop_replay_for_corruption = false;
         }
         if (stop_replay_for_corruption) {
@@ -1487,13 +1487,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
         }
       }

-      recovered_sequence = sequence;
       bool no_prev_seq = true;
-      if (*next_sequence == kMaxSequenceNumber) {
+      if (!db_options_.allow_2pc) {
         *next_sequence = sequence;
       } else {
-        no_prev_seq = false;
-        WriteBatchInternal::SetSequence(&batch, *next_sequence);
+        if (*next_sequence == kMaxSequenceNumber) {
+          *next_sequence = sequence;
+        } else {
+          no_prev_seq = false;
+          WriteBatchInternal::SetSequence(&batch, *next_sequence);
+        }
       }

 #ifndef ROCKSDB_LITE
@@ -1590,8 +1593,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
       // after replaying the file, this file may be a stale file. We ignore
       // sequence IDs from the file. Otherwise, if a newer stale log file that
       // has been deleted, the sequenceID may be wrong.
-      if (no_prev_seq && !has_valid_writes) {
-        *next_sequence = kMaxSequenceNumber;
+      if (db_options_.allow_2pc) {
+        if (no_prev_seq && !has_valid_writes) {
+          *next_sequence = kMaxSequenceNumber;
+        }
       }
       MaybeIgnoreError(&status);
       if (!status.ok()) {
@@ -1701,7 +1706,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
       // recovered and should be ignored on next reincarnation.
       // Since we already recovered max_log_number, we want all logs
       // with numbers `<= max_log_number` (includes this one) to be ignored
-      if (flushed) {
+      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
         edit->SetLogNumber(max_log_number + 1);
       }
       // we must mark the next log number as used, even though it's
@@ -2105,6 +2110,10 @@ Status DBImpl::CompactFiles(
   {
     InstrumentedMutexLock l(&mutex_);

+    // This call will unlock/lock the mutex to wait for current running
+    // AddFile() calls to finish.
+    WaitForAddFile();
+
     s = CompactFilesImpl(compact_options, cfd, sv->current,
                          input_file_names, output_level,
                          output_path_id, &job_context, &log_buffer);
@@ -2696,6 +2705,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
     manual.end = &end_storage;
   }

+  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
+  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
   InstrumentedMutexLock l(&mutex_);

   // When a manual compaction arrives, temporarily disable scheduling of
@@ -2764,6 +2775,10 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
       ca->m = &manual;
       manual.incomplete = false;
       bg_compaction_scheduled_++;
+      // manual.compaction will be added to running_compactions_ and erased
+      // inside BackgroundCompaction() but we need to put it now since we
+      // will unlock the mutex.
+      running_compactions_.insert(manual.compaction);
       env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                      &DBImpl::UnscheduleCallback);
       scheduled = true;
@@ -3186,6 +3201,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
   {
     InstrumentedMutexLock l(&mutex_);

+    // This call will unlock/lock the mutex to wait for current running
+    // AddFile() calls to finish.
+    WaitForAddFile();
+
     num_running_compactions_++;

     auto pending_outputs_inserted_elem =
@@ -3597,6 +3617,11 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
 }

 bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
+  if (num_running_addfile_ > 0) {
+    // We need to wait for other AddFile() calls to finish
+    // before running a manual compaction.
+    return true;
+  }
   if (m->exclusive) {
     return (bg_compaction_scheduled_ > 0);
   }

View File

@@ -620,7 +620,7 @@ class DBImpl : public DB {
  // REQUIRES: log_numbers are sorted in ascending order
  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                         SequenceNumber* next_sequence, bool read_only);
  // The following two methods are used to flush a memtable to
  // storage. The first one is used at database RecoveryTime (when the
@@ -650,15 +650,24 @@ class DBImpl : public DB {
  int PickLevelForIngestedFile(ColumnFamilyData* cfd,
                               const ExternalSstFileInfo& file_info);

  // Wait for current AddFile() calls to finish.
  // REQUIRES: mutex_ held
  void WaitForAddFile();

  Status CompactFilesImpl(const CompactionOptions& compact_options,
                          ColumnFamilyData* cfd, Version* version,
                          const std::vector<std::string>& input_file_names,
                          const int output_level, int output_path_id,
                          JobContext* job_context, LogBuffer* log_buffer);

  Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
                                 const std::string& file_path,
                                 ExternalSstFileInfo* file_info);
#else
  // AddFile is not supported in ROCKSDB_LITE, so this function is a no-op.
  void WaitForAddFile() {}
#endif  // ROCKSDB_LITE

  ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
@@ -728,6 +737,7 @@ class DBImpl : public DB {
  // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
  //   (i.e. whenever a flush is done, even if it didn't make any progress)
  // * whenever there is an error in background purge, flush or compaction
  // * whenever num_running_addfile_ goes to 0.
  InstrumentedCondVar bg_cv_;
  uint64_t logfile_number_;
  std::deque<uint64_t>
@@ -973,6 +983,10 @@ class DBImpl : public DB {
  // REQUIRES: mutex held
  std::unordered_set<Compaction*> running_compactions_;

  // Number of running AddFile() calls.
  // REQUIRES: mutex held
  int num_running_addfile_;

#ifndef ROCKSDB_LITE
  WalManager wal_manager_;
#endif  // ROCKSDB_LITE


@@ -237,12 +237,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
  {
    InstrumentedMutexLock l(&mutex_);
    TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);
    num_running_addfile_++;
    if (!skip_snapshot_check && !snapshots_.empty()) {
      // Check that no snapshots are being held
      status =
@@ -316,7 +320,13 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
      ReleaseFileNumberFromPendingOutputs(
          pending_outputs_inserted_elem_list[i]);
    }
    num_running_addfile_--;
    if (num_running_addfile_ == 0) {
      bg_cv_.SignalAll();
    }
    TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
  }  // mutex_ is unlocked here
  if (!status.ok()) {
    // We failed to add the files to the database
@@ -395,6 +405,13 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
  return target_level;
}

void DBImpl::WaitForAddFile() {
  mutex_.AssertHeld();
  while (num_running_addfile_ > 0) {
    bg_cv_.Wait();
  }
}
#endif  // ROCKSDB_LITE

}  // namespace rocksdb


@@ -9,6 +9,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/options_helper.h"
#include "util/sync_point.h"
@@ -292,6 +293,44 @@ TEST_F(DBWALTest, RecoveryWithEmptyLog) {
}

#ifndef ROCKSDB_LITE
TEST_F(DBWALTest, GetSortedWalFiles) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
ASSERT_EQ(0, log_files.size());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
ASSERT_EQ(1, log_files.size());
} while (ChangeOptions());
}
TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
// Test for regression of WAL cleanup missing files that don't contain data
// for every column family.
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Put(1, "foo", "v2"));
uint64_t earliest_log_nums[2];
for (int i = 0; i < 2; ++i) {
if (i > 0) {
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
}
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
if (log_files.size() > 0) {
earliest_log_nums[i] = log_files[0]->LogNumber();
} else {
earliest_log_nums[i] = port::kMaxUint64;
}
}
// Check at least the first WAL was cleaned up during the recovery.
ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
} while (ChangeOptions());
}
TEST_F(DBWALTest, RecoverWithLargeLog) {
  do {
    {
@@ -471,6 +510,41 @@ TEST_F(DBWALTest, SyncMultipleLogs) {
  ASSERT_OK(dbfull()->SyncWAL());
}
// Github issue 1339. Prior to the fix, we read the sequence id from the first
// log into a local variable, then kept increasing it as we replayed logs,
// ignoring the actual sequence ids of the records. This is incorrect if some
// writes come in with the WAL disabled.
TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_env.get();
options.disable_auto_compactions = true;
// TODO(yiwu): fix for 2PC.
options.allow_2pc = false;
WriteOptions wal_on, wal_off;
wal_on.sync = true;
wal_on.disableWAL = false;
wal_off.disableWAL = true;
CreateAndReopenWithCF({"dummy"}, options);
ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
ASSERT_OK(Put(1, "dummy", "d2", wal_off));
ASSERT_OK(Put(1, "dummy", "d3", wal_off));
ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
ASSERT_OK(Flush(0));
ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
ASSERT_EQ("v5", Get(0, "key"));
// Simulate a crash.
fault_env->SetFilesystemActive(false);
Close();
fault_env->ResetState();
ReopenWithColumnFamilies({"default", "dummy"}, options);
// Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
ASSERT_EQ("v5", Get(0, "key"));
// Destroy DB before destruct fault_env.
Destroy(options);
}
//
// Test WAL recovery for the various modes available
//


@@ -984,6 +984,138 @@ TEST_F(ExternalSSTFileTest, PickedLevel) {
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(ExternalSSTFileTest, PickedLevelBug) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 3;
options.num_levels = 2;
options.env = env_;
DestroyAndReopen(options);
std::vector<int> file_keys;
// file #1 in L0
file_keys = {0, 5, 7};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());
// file #2 in L0
file_keys = {4, 6, 8, 9};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());
// We have 2 overlapping files in L0
EXPECT_EQ(FilesPerLevel(), "2");
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
{"ExternalSSTFileTest::PickedLevelBug:2",
"DBImpl::RunManualCompaction:0"},
{"ExternalSSTFileTest::PickedLevelBug:3",
"DBImpl::RunManualCompaction:1"}});
std::atomic<bool> bg_compact_started(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:Start",
[&](void* arg) { bg_compact_started.store(true); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// While writing the MANIFEST start a thread that will ask for compaction
std::thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
// Start a thread that will ingest a new file
std::thread bg_addfile([&]() {
file_keys = {1, 2, 3};
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1));
});
// Wait for AddFile to start picking levels and writing MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
  // We need to verify that no compactions can run while AddFile is
  // ingesting the files into the levels it finds suitable. So we wait
  // for 2 seconds to give compactions a chance to run during this
  // period, and then make sure that none were able to start.
env_->SleepForMicroseconds(1000000 * 2);
ASSERT_FALSE(bg_compact_started.load());
// Hold AddFile from finishing writing the MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
bg_addfile.join();
bg_compact.join();
dbfull()->TEST_WaitForCompact();
int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
total_keys++;
}
ASSERT_EQ(total_keys, 10);
delete iter;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 2;
options.env = env_;
DestroyAndReopen(options);
std::function<void()> bg_compact = [&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
};
int range_id = 0;
std::vector<int> file_keys;
std::function<void()> bg_addfile = [&]() {
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
};
std::vector<std::thread> threads;
while (range_id < 5000) {
int range_start = (range_id * 20);
int range_end = range_start + 10;
file_keys.clear();
for (int k = range_start + 1; k < range_end; k++) {
file_keys.push_back(k);
}
ASSERT_OK(Put(Key(range_start), Key(range_start)));
ASSERT_OK(Put(Key(range_end), Key(range_end)));
ASSERT_OK(Flush());
if (range_id % 10 == 0) {
threads.emplace_back(bg_compact);
}
threads.emplace_back(bg_addfile);
for (auto& t : threads) {
t.join();
}
threads.clear();
range_id++;
}
}
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = false;


@@ -383,7 +383,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
  Status s;
  if (type == kAliveLogFile) {
    std::string fname = LogFileName(db_options_.wal_dir, number);
    s = ReadFirstLine(fname, number, sequence);
    if (env_->FileExists(fname).ok() && !s.ok()) {
      // return any error that is not caused by a non-existing file
      return s;
@@ -394,7 +394,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
    // check if the file got moved to archive.
    std::string archived_file =
        ArchivedLogFileName(db_options_.wal_dir, number);
    s = ReadFirstLine(archived_file, number, sequence);
    // maybe the file was deleted from the archive dir. If that's the case,
    // return Status::OK(). The caller will identify this as an empty file
    // because *sequence == 0
@@ -413,6 +413,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
// the function returns status.ok() and sequence == 0 if the file exists, but is
// empty
Status WalManager::ReadFirstLine(const std::string& fname,
                                 const uint64_t number,
                                 SequenceNumber* sequence) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
@@ -449,7 +450,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
  reporter.status = &status;
  reporter.ignore_error = !db_options_.paranoid_checks;
  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
                     true /*checksum*/, 0 /*initial_offset*/, number);
  std::string scratch;
  Slice record;


@@ -54,9 +54,9 @@ class WalManager {
    return ReadFirstRecord(type, number, sequence);
  }

  Status TEST_ReadFirstLine(const std::string& fname, const uint64_t number,
                            SequenceNumber* sequence) {
    return ReadFirstLine(fname, number, sequence);
  }

 private:
@@ -71,7 +71,8 @@ class WalManager {
  Status ReadFirstRecord(const WalFileType type, const uint64_t number,
                         SequenceNumber* sequence);
  Status ReadFirstLine(const std::string& fname, const uint64_t number,
                       SequenceNumber* sequence);

  // ------- state from DBImpl ------
  const DBOptions& db_options_;


@@ -119,10 +119,11 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
  ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));

  SequenceNumber s;
  ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, 1 /* number */, &s));
  ASSERT_EQ(s, 0U);

  ASSERT_OK(
      wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1 /* number */, &s));
  ASSERT_EQ(s, 0U);

  unique_ptr<WritableFileWriter> file_writer(


@@ -106,7 +106,7 @@ class HdfsEnv : public Env {
  }

  virtual int UnSchedule(void* tag, Priority pri) {
    return posixEnv->UnSchedule(tag, pri);
  }

  virtual void StartThread(void (*function)(void* arg), void* arg) {


@@ -9,7 +9,9 @@
#include <stdint.h>
#include <memory>
#include <string>

#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
@@ -46,4 +48,10 @@ class PersistentCache {
  virtual bool IsCompressed() = 0;
};

// Factory method to create a new persistent cache
Status NewPersistentCache(Env* const env, const std::string& path,
                          const uint64_t size,
                          const std::shared_ptr<Logger>& log,
                          const bool optimized_for_nvm,
                          std::shared_ptr<PersistentCache>* cache);

}  // namespace rocksdb


@@ -137,7 +137,7 @@ clean:
	$(AM_V_at)rm -rf $(SAMPLES_OUTPUT)

javadocs: java
	$(AM_V_GEN)mkdir -p $(JAVADOC)
	$(AM_V_at)javadoc -d $(JAVADOC) -sourcepath $(MAIN_SRC) -subpackages org
@@ -176,7 +176,7 @@ resolve_test_deps:
	test -s "$(JAVA_CGLIB_JAR)" || cp $(MVN_LOCAL)/cglib/cglib/2.2.2/cglib-2.2.2.jar $(JAVA_TEST_LIBDIR) || curl -k -L -o "$(JAVA_CGLIB_JAR)" http://search.maven.org/remotecontent?filepath=cglib/cglib/2.2.2/cglib-2.2.2.jar
	test -s "$(JAVA_ASSERTJ_JAR)" || cp $(MVN_LOCAL)/org/assertj/assertj-core/1.7.1/assertj-core-1.7.1.jar $(JAVA_TEST_LIBDIR) || curl -k -L -o "$(JAVA_ASSERTJ_JAR)" http://central.maven.org/maven2/org/assertj/assertj-core/1.7.1/assertj-core-1.7.1.jar

java_test: java resolve_test_deps
	$(AM_V_GEN)mkdir -p $(TEST_CLASSES)
	$(AM_V_at)javac -cp $(MAIN_CLASSES):$(JAVA_TESTCLASSPATH) -d $(TEST_CLASSES)\
		$(TEST_SRC)/org/rocksdb/test/*.java\
@@ -184,7 +184,7 @@ java_test: resolve_test_deps
		$(TEST_SRC)/org/rocksdb/*.java
	$(AM_V_at)javah -cp $(MAIN_CLASSES):$(TEST_CLASSES) -d $(NATIVE_INCLUDE) -jni $(NATIVE_JAVA_TEST_CLASSES)

test: java java_test run_test

run_test:
	java -ea -Xcheck:jni -Djava.library.path=target -cp "$(MAIN_CLASSES):$(TEST_CLASSES):$(JAVA_TESTCLASSPATH):target/*" org.rocksdb.test.RocksJunitRunner $(JAVA_TESTS)


@@ -4371,6 +4371,20 @@ jlong Java_org_rocksdb_DBOptions_writeThreadSlowYieldUsec(
      write_thread_slow_yield_usec;
}

void Java_org_rocksdb_DBOptions_setDelayedWriteRate(
    JNIEnv* env, jobject jobj, jlong jhandle, jlong delay_write_rate) {
  reinterpret_cast<rocksdb::DBOptions*>(jhandle)->delayed_write_rate =
      static_cast<int64_t>(delay_write_rate);
}

jlong Java_org_rocksdb_DBOptions_delayedWriteRate(
    JNIEnv* env, jobject jobj, jlong jhandle) {
  return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->delayed_write_rate;
}

//////////////////////////////////////////////////////////////////////////////
// rocksdb::WriteOptions


@ -204,16 +204,19 @@ jobjectArray Java_org_rocksdb_RocksDB_listColumnFamilies(
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Put // rocksdb::DB::Put
void rocksdb_put_helper( void rocksdb_put_helper(JNIEnv* env, rocksdb::DB* db,
JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, const rocksdb::WriteOptions& write_options,
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len, rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey,
jbyteArray jentry_value, jint jentry_value_len) { jint jkey_off, jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
jbyte* key = new jbyte[jkey_len];
env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
jbyte* value = new jbyte[jval_len];
env->GetByteArrayRegion(jval, jval_off, jval_len, value);
jbyte* key = env->GetByteArrayElements(jkey, 0);
jbyte* value = env->GetByteArrayElements(jentry_value, 0);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len); rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value), rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jval_len);
jentry_value_len);
rocksdb::Status s; rocksdb::Status s;
if (cf_handle != nullptr) { if (cf_handle != nullptr) {
@ -223,11 +226,9 @@ void rocksdb_put_helper(
s = db->Put(write_options, key_slice, value_slice); s = db->Put(write_options, key_slice, value_slice);
} }
// trigger java unref on key and value. // cleanup
// by passing JNI_ABORT, it will simply release the reference without delete [] value;
// copying the result back to the java byte array. delete [] key;
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
if (s.ok()) { if (s.ok()) {
return; return;
@ -238,36 +239,39 @@ void rocksdb_put_helper(
/* /*
* Class: org_rocksdb_RocksDB * Class: org_rocksdb_RocksDB
* Method: put * Method: put
* Signature: (J[BI[BI)V * Signature: (J[BII[BII)V
*/ */
void Java_org_rocksdb_RocksDB_put__J_3BI_3BI( void Java_org_rocksdb_RocksDB_put__J_3BII_3BII(JNIEnv* env, jobject jdb,
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jdb_handle,
jbyteArray jkey, jint jkey_len, jbyteArray jkey, jint jkey_off,
jbyteArray jentry_value, jint jentry_value_len) { jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle); auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options = static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions(); rocksdb::WriteOptions();
rocksdb_put_helper(env, db, default_write_options, nullptr, rocksdb_put_helper(env, db, default_write_options, nullptr, jkey, jkey_off,
jkey, jkey_len, jkey_len, jval, jval_off, jval_len);
jentry_value, jentry_value_len);
} }
/* /*
* Class: org_rocksdb_RocksDB * Class: org_rocksdb_RocksDB
* Method: put * Method: put
* Signature: (J[BI[BIJ)V * Signature: (J[BII[BIIJ)V
*/ */
void Java_org_rocksdb_RocksDB_put__J_3BI_3BIJ( void Java_org_rocksdb_RocksDB_put__J_3BII_3BIIJ(JNIEnv* env, jobject jdb,
JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jdb_handle,
jbyteArray jkey, jint jkey_len, jbyteArray jkey, jint jkey_off,
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) { jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len,
jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle); auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options = static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions(); rocksdb::WriteOptions();
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle); auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) { if (cf_handle != nullptr) {
rocksdb_put_helper(env, db, default_write_options, cf_handle, rocksdb_put_helper(env, db, default_write_options, cf_handle, jkey,
jkey, jkey_len, jentry_value, jentry_value_len); jkey_off, jkey_len, jval, jval_off, jval_len);
} else { } else {
rocksdb::RocksDBExceptionJni::ThrowNew(env, rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -277,39 +281,38 @@ void Java_org_rocksdb_RocksDB_put__J_3BI_3BIJ(
/* /*
* Class: org_rocksdb_RocksDB * Class: org_rocksdb_RocksDB
* Method: put * Method: put
* Signature: (JJ[BI[BI)V * Signature: (JJ[BII[BII)V
*/ */
void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BI( void Java_org_rocksdb_RocksDB_put__JJ_3BII_3BII(JNIEnv* env, jobject jdb,
JNIEnv* env, jobject jdb, jlong jdb_handle,
jlong jdb_handle, jlong jwrite_options_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len, jbyteArray jkey, jint jkey_off,
jbyteArray jentry_value, jint jentry_value_len) { jint jkey_len, jbyteArray jval,
jint jval_off, jint jval_len) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle); auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>( auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle); jwrite_options_handle);
rocksdb_put_helper(env, db, *write_options, nullptr, rocksdb_put_helper(env, db, *write_options, nullptr, jkey, jkey_off, jkey_len,
jkey, jkey_len, jval, jval_off, jval_len);
jentry_value, jentry_value_len);
} }
/* /*
* Class: org_rocksdb_RocksDB * Class: org_rocksdb_RocksDB
* Method: put * Method: put
* Signature: (JJ[BI[BIJ)V * Signature: (JJ[BII[BIIJ)V
*/ */
void Java_org_rocksdb_RocksDB_put__JJ_3BI_3BIJ( void Java_org_rocksdb_RocksDB_put__JJ_3BII_3BIIJ(
JNIEnv* env, jobject jdb, JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jwrite_options_handle,
jlong jdb_handle, jlong jwrite_options_handle, jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
jbyteArray jkey, jint jkey_len, jint jval_off, jint jval_len, jlong jcf_handle) {
jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle); auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>( auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle); jwrite_options_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle); auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) { if (cf_handle != nullptr) {
rocksdb_put_helper(env, db, *write_options, cf_handle, rocksdb_put_helper(env, db, *write_options, cf_handle, jkey, jkey_off,
jkey, jkey_len, jentry_value, jentry_value_len); jkey_len, jval, jval_off, jval_len);
} else { } else {
rocksdb::RocksDBExceptionJni::ThrowNew(env, rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -363,12 +366,12 @@ void Java_org_rocksdb_RocksDB_write1(
// rocksdb::DB::KeyMayExist // rocksdb::DB::KeyMayExist
jboolean key_may_exist_helper(JNIEnv* env, rocksdb::DB* db, jboolean key_may_exist_helper(JNIEnv* env, rocksdb::DB* db,
const rocksdb::ReadOptions& read_opt, const rocksdb::ReadOptions& read_opt,
rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len, rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_off,
jobject jstring_buffer) { jint jkey_len, jobject jstring_buffer) {
std::string value; std::string value;
bool value_found = false; bool value_found = false;
jboolean isCopy; jbyte* key = new jbyte[jkey_len];
jbyte* key = env->GetByteArrayElements(jkey, &isCopy); env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len); rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
bool keyMayExist; bool keyMayExist;
if (cf_handle != nullptr) { if (cf_handle != nullptr) {
@ -379,6 +382,10 @@ jboolean key_may_exist_helper(JNIEnv* env, rocksdb::DB* db,
&value, &value_found); &value, &value_found);
} }
// cleanup
delete [] key;
// extract the value
if (value_found && !value.empty()) { if (value_found && !value.empty()) {
jclass clazz = env->GetObjectClass(jstring_buffer); jclass clazz = env->GetObjectClass(jstring_buffer);
jmethodID mid = env->GetMethodID(clazz, "append", jmethodID mid = env->GetMethodID(clazz, "append",
@ -386,37 +393,36 @@ jboolean key_may_exist_helper(JNIEnv* env, rocksdb::DB* db,
jstring new_value_str = env->NewStringUTF(value.c_str()); jstring new_value_str = env->NewStringUTF(value.c_str());
env->CallObjectMethod(jstring_buffer, mid, new_value_str); env->CallObjectMethod(jstring_buffer, mid, new_value_str);
} }
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
return static_cast<jboolean>(keyMayExist); return static_cast<jboolean>(keyMayExist);
} }
/* /*
* Class: org_rocksdb_RocksDB * Class: org_rocksdb_RocksDB
* Method: keyMayExist * Method: keyMayExist
* Signature: (J[BILjava/lang/StringBuffer;)Z * Signature: (J[BIILjava/lang/StringBuffer;)Z
*/ */
jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BILjava_lang_StringBuffer_2( jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIILjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_len, JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_off,
jobject jstring_buffer) { jint jkey_len, jobject jstring_buffer) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle); auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
return key_may_exist_helper(env, db, rocksdb::ReadOptions(), return key_may_exist_helper(env, db, rocksdb::ReadOptions(),
nullptr, jkey, jkey_len, jstring_buffer); nullptr, jkey, jkey_off, jkey_len, jstring_buffer);
} }
/* /*
* Class: org_rocksdb_RocksDB * Class: org_rocksdb_RocksDB
* Method: keyMayExist * Method: keyMayExist
* Signature: (J[BIJLjava/lang/StringBuffer;)Z * Signature: (J[BIIJLjava/lang/StringBuffer;)Z
*/ */
jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIJLjava_lang_StringBuffer_2( jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIIJLjava_lang_StringBuffer_2(
JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_len, JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_off,
jlong jcf_handle, jobject jstring_buffer) { jint jkey_len, jlong jcf_handle, jobject jstring_buffer) {
auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle); auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>( auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(
jcf_handle); jcf_handle);
if (cf_handle != nullptr) { if (cf_handle != nullptr) {
return key_may_exist_helper(env, db, rocksdb::ReadOptions(), return key_may_exist_helper(env, db, rocksdb::ReadOptions(),
cf_handle, jkey, jkey_len, jstring_buffer); cf_handle, jkey, jkey_off, jkey_len, jstring_buffer);
} else { } else {
rocksdb::RocksDBExceptionJni::ThrowNew(env, rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -427,26 +433,27 @@ jboolean Java_org_rocksdb_RocksDB_keyMayExist__J_3BIJLjava_lang_StringBuffer_2(

/*
 * Class: org_rocksdb_RocksDB
 * Method: keyMayExist
 * Signature: (JJ[BIILjava/lang/StringBuffer;)Z
 */
jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIILjava_lang_StringBuffer_2(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jread_options_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jobject jstring_buffer) {
  auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto& read_options = *reinterpret_cast<rocksdb::ReadOptions*>(
      jread_options_handle);
  return key_may_exist_helper(env, db, read_options,
      nullptr, jkey, jkey_off, jkey_len, jstring_buffer);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: keyMayExist
 * Signature: (JJ[BIIJLjava/lang/StringBuffer;)Z
 */
jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIIJLjava_lang_StringBuffer_2(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jread_options_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle,
    jobject jstring_buffer) {
  auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto& read_options = *reinterpret_cast<rocksdb::ReadOptions*>(
      jread_options_handle);
@@ -454,7 +461,7 @@ jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIJLjava_lang_StringBuffer_2(
      jcf_handle);
  if (cf_handle != nullptr) {
    return key_may_exist_helper(env, db, read_options, cf_handle,
        jkey, jkey_off, jkey_len, jstring_buffer);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -468,9 +475,10 @@ jboolean Java_org_rocksdb_RocksDB_keyMayExist__JJ_3BIJLjava_lang_StringBuffer_2(
jbyteArray rocksdb_get_helper(
    JNIEnv* env, rocksdb::DB* db, const rocksdb::ReadOptions& read_opt,
    rocksdb::ColumnFamilyHandle* column_family_handle, jbyteArray jkey,
    jint jkey_off, jint jkey_len) {
  jbyte* key = new jbyte[jkey_len];
  env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
  rocksdb::Slice key_slice(
      reinterpret_cast<char*>(key), jkey_len);
@@ -483,10 +491,8 @@ jbyteArray rocksdb_get_helper(
    s = db->Get(read_opt, key_slice, &value);
  }

  // cleanup
  delete [] key;

  if (s.IsNotFound()) {
    return nullptr;
@@ -506,30 +512,30 @@ jbyteArray rocksdb_get_helper(

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (J[BII)[B
 */
jbyteArray Java_org_rocksdb_RocksDB_get__J_3BII(
    JNIEnv* env, jobject jdb, jlong jdb_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len) {
  return rocksdb_get_helper(env,
      reinterpret_cast<rocksdb::DB*>(jdb_handle),
      rocksdb::ReadOptions(), nullptr,
      jkey, jkey_off, jkey_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (J[BIIJ)[B
 */
jbyteArray Java_org_rocksdb_RocksDB_get__J_3BIIJ(
    JNIEnv* env, jobject jdb, jlong jdb_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle) {
  auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    return rocksdb_get_helper(env, db_handle, rocksdb::ReadOptions(),
        cf_handle, jkey, jkey_off, jkey_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -541,31 +547,31 @@ jbyteArray Java_org_rocksdb_RocksDB_get__J_3BIJ(

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (JJ[BII)[B
 */
jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BII(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len) {
  return rocksdb_get_helper(env,
      reinterpret_cast<rocksdb::DB*>(jdb_handle),
      *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), nullptr,
      jkey, jkey_off, jkey_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (JJ[BIIJ)[B
 */
jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BIIJ(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle) {
  auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto& ro_opt = *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle);
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    return rocksdb_get_helper(env, db_handle, ro_opt, cf_handle,
        jkey, jkey_off, jkey_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -574,14 +580,16 @@ jbyteArray Java_org_rocksdb_RocksDB_get__JJ_3BIJ(
  }
}

jint rocksdb_get_helper(JNIEnv* env, rocksdb::DB* db,
                        const rocksdb::ReadOptions& read_options,
                        rocksdb::ColumnFamilyHandle* column_family_handle,
                        jbyteArray jkey, jint jkey_off, jint jkey_len,
                        jbyteArray jval, jint jval_off, jint jval_len) {
  static const int kNotFound = -1;
  static const int kStatusError = -2;

  jbyte* key = new jbyte[jkey_len];
  env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
  rocksdb::Slice key_slice(
      reinterpret_cast<char*>(key), jkey_len);
@@ -596,10 +604,8 @@ jint rocksdb_get_helper(
    s = db->Get(read_options, key_slice, &cvalue);
  }

  // cleanup
  delete [] key;

  if (s.IsNotFound()) {
    return kNotFound;
@@ -617,17 +623,17 @@ jint rocksdb_get_helper(
  }

  jint cvalue_len = static_cast<jint>(cvalue.size());
  jint length = std::min(jval_len, cvalue_len);

  env->SetByteArrayRegion(jval, jval_off, length,
                          reinterpret_cast<const jbyte*>(cvalue.c_str()));

  return cvalue_len;
}

// cf multi get
jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,
    const rocksdb::ReadOptions& rOpt, jobjectArray jkeys,
    jintArray jkey_offs, jintArray jkey_lens,
    jlongArray jcolumn_family_handles) {
  std::vector<rocksdb::ColumnFamilyHandle*> cf_handles;
  if (jcolumn_family_handles != nullptr) {
@@ -642,24 +648,35 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,
  }

  std::vector<rocksdb::Slice> keys;
  std::vector<std::pair<jbyte*, jobject>> keys_to_free;

  jsize len_keys = env->GetArrayLength(jkeys);
  if (env->EnsureLocalCapacity(len_keys) != 0) {
    // out of memory
    return NULL;
  }

  jint* jkey_off = env->GetIntArrayElements(jkey_offs, NULL);
  jint* jkey_len = env->GetIntArrayElements(jkey_lens, NULL);

  for (int i = 0; i < len_keys; i++) {
    jobject jkey = env->GetObjectArrayElement(jkeys, i);
    jbyteArray jkey_ba = reinterpret_cast<jbyteArray>(jkey);

    jint len_key = jkey_len[i];
    jbyte* key = new jbyte[len_key];
    env->GetByteArrayRegion(jkey_ba, jkey_off[i], len_key, key);

    rocksdb::Slice key_slice(reinterpret_cast<char*>(key), len_key);
    keys.push_back(key_slice);

    keys_to_free.push_back(std::pair<jbyte*, jobject>(key, jkey));
  }

  // cleanup jkey_off and jkey_len
  env->ReleaseIntArrayElements(jkey_lens, jkey_len, JNI_ABORT);
  env->ReleaseIntArrayElements(jkey_offs, jkey_off, JNI_ABORT);

  std::vector<std::string> values;
  std::vector<rocksdb::Status> s;
  if (cf_handles.size() == 0) {
@@ -669,15 +686,11 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,
  }

  // free up allocated byte arrays
  for (auto it = keys_to_free.begin(); it != keys_to_free.end(); ++it) {
    delete [] it->first;
    env->DeleteLocalRef(it->second);
  }
  keys_to_free.clear();

  // prepare the results
  jclass jcls_ba = env->FindClass("[B");
@@ -703,80 +716,88 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject jdb, rocksdb::DB* db,

/*
 * Class: org_rocksdb_RocksDB
 * Method: multiGet
 * Signature: (J[[B[I[I)[[B
 */
jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3I_3I(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jobjectArray jkeys,
    jintArray jkey_offs, jintArray jkey_lens) {
  return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
      rocksdb::ReadOptions(), jkeys, jkey_offs, jkey_lens, nullptr);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: multiGet
 * Signature: (J[[B[I[I[J)[[B
 */
jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3I_3I_3J(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jobjectArray jkeys,
    jintArray jkey_offs, jintArray jkey_lens,
    jlongArray jcolumn_family_handles) {
  return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
      rocksdb::ReadOptions(), jkeys, jkey_offs, jkey_lens,
      jcolumn_family_handles);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: multiGet
 * Signature: (JJ[[B[I[I)[[B
 */
jobjectArray Java_org_rocksdb_RocksDB_multiGet__JJ_3_3B_3I_3I(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
    jobjectArray jkeys, jintArray jkey_offs, jintArray jkey_lens) {
  return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
      *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), jkeys, jkey_offs,
      jkey_lens, nullptr);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: multiGet
 * Signature: (JJ[[B[I[I[J)[[B
 */
jobjectArray Java_org_rocksdb_RocksDB_multiGet__JJ_3_3B_3I_3I_3J(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
    jobjectArray jkeys, jintArray jkey_offs, jintArray jkey_lens,
    jlongArray jcolumn_family_handles) {
  return multi_get_helper(env, jdb, reinterpret_cast<rocksdb::DB*>(jdb_handle),
      *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), jkeys, jkey_offs,
      jkey_lens, jcolumn_family_handles);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (J[BII[BII)I
 */
jint Java_org_rocksdb_RocksDB_get__J_3BII_3BII(JNIEnv* env, jobject jdb,
                                               jlong jdb_handle,
                                               jbyteArray jkey, jint jkey_off,
                                               jint jkey_len, jbyteArray jval,
                                               jint jval_off, jint jval_len) {
  return rocksdb_get_helper(env, reinterpret_cast<rocksdb::DB*>(jdb_handle),
                            rocksdb::ReadOptions(), nullptr, jkey, jkey_off,
                            jkey_len, jval, jval_off, jval_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (J[BII[BIIJ)I
 */
jint Java_org_rocksdb_RocksDB_get__J_3BII_3BIIJ(JNIEnv* env, jobject jdb,
                                                jlong jdb_handle,
                                                jbyteArray jkey, jint jkey_off,
                                                jint jkey_len, jbyteArray jval,
                                                jint jval_off, jint jval_len,
                                                jlong jcf_handle) {
  auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    return rocksdb_get_helper(env, db_handle, rocksdb::ReadOptions(), cf_handle,
                              jkey, jkey_off, jkey_len, jval, jval_off,
                              jval_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -788,33 +809,35 @@ jint Java_org_rocksdb_RocksDB_get__J_3BI_3BIJ(

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (JJ[BII[BII)I
 */
jint Java_org_rocksdb_RocksDB_get__JJ_3BII_3BII(JNIEnv* env, jobject jdb,
                                                jlong jdb_handle,
                                                jlong jropt_handle,
                                                jbyteArray jkey, jint jkey_off,
                                                jint jkey_len, jbyteArray jval,
                                                jint jval_off, jint jval_len) {
  return rocksdb_get_helper(
      env, reinterpret_cast<rocksdb::DB*>(jdb_handle),
      *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle), nullptr, jkey,
      jkey_off, jkey_len, jval, jval_off, jval_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: get
 * Signature: (JJ[BII[BIIJ)I
 */
jint Java_org_rocksdb_RocksDB_get__JJ_3BII_3BIIJ(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
    jint jval_off, jint jval_len, jlong jcf_handle) {
  auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto& ro_opt = *reinterpret_cast<rocksdb::ReadOptions*>(jropt_handle);
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    return rocksdb_get_helper(env, db_handle, ro_opt, cf_handle, jkey, jkey_off,
                              jkey_len, jval, jval_off, jval_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -826,8 +849,10 @@ jint Java_org_rocksdb_RocksDB_get__JJ_3BI_3BIJ(
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Delete()
void rocksdb_delete_helper(
    JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options,
    rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_off,
    jint jkey_len) {
  jbyte* key = new jbyte[jkey_len];
  env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);

  rocksdb::Status s;
@@ -837,10 +862,9 @@ void rocksdb_delete_helper(
    // backwards compatibility
    s = db->Delete(write_options, key_slice);
  }

  // cleanup
  delete [] key;

  if (!s.ok()) {
    rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
@@ -851,33 +875,33 @@ void rocksdb_delete_helper(

/*
 * Class: org_rocksdb_RocksDB
 * Method: delete
 * Signature: (J[BII)V
 */
void Java_org_rocksdb_RocksDB_delete__J_3BII(
    JNIEnv* env, jobject jdb, jlong jdb_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  static const rocksdb::WriteOptions default_write_options =
      rocksdb::WriteOptions();
  rocksdb_delete_helper(env, db, default_write_options, nullptr,
      jkey, jkey_off, jkey_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: delete
 * Signature: (J[BIIJ)V
 */
void Java_org_rocksdb_RocksDB_delete__J_3BIIJ(
    JNIEnv* env, jobject jdb, jlong jdb_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jlong jcf_handle) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  static const rocksdb::WriteOptions default_write_options =
      rocksdb::WriteOptions();
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    rocksdb_delete_helper(env, db, default_write_options, cf_handle,
        jkey, jkey_off, jkey_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -887,30 +911,32 @@ void Java_org_rocksdb_RocksDB_delete__J_3BIJ(

/*
 * Class: org_rocksdb_RocksDB
 * Method: delete
 * Signature: (JJ[BII)V
 */
void Java_org_rocksdb_RocksDB_delete__JJ_3BII(
    JNIEnv* env, jobject jdb, jlong jdb_handle,
    jlong jwrite_options, jbyteArray jkey, jint jkey_off, jint jkey_len) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(jwrite_options);
  rocksdb_delete_helper(env, db, *write_options, nullptr, jkey, jkey_off,
      jkey_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: delete
 * Signature: (JJ[BIIJ)V
 */
void Java_org_rocksdb_RocksDB_delete__JJ_3BIIJ(
    JNIEnv* env, jobject jdb, jlong jdb_handle,
    jlong jwrite_options, jbyteArray jkey, jint jkey_off, jint jkey_len,
    jlong jcf_handle) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(jwrite_options);
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    rocksdb_delete_helper(env, db, *write_options, cf_handle, jkey, jkey_off,
        jkey_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -1016,16 +1042,17 @@ void Java_org_rocksdb_RocksDB_singleDelete__JJ_3BIJ(
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Merge
void rocksdb_merge_helper(JNIEnv* env, rocksdb::DB* db,
                          const rocksdb::WriteOptions& write_options,
                          rocksdb::ColumnFamilyHandle* cf_handle,
                          jbyteArray jkey, jint jkey_off, jint jkey_len,
                          jbyteArray jval, jint jval_off, jint jval_len) {
  jbyte* key = new jbyte[jkey_len];
  env->GetByteArrayRegion(jkey, jkey_off, jkey_len, key);
  // size the value buffer by the value length, not the key length
  jbyte* value = new jbyte[jval_len];
  env->GetByteArrayRegion(jval, jval_off, jval_len, value);
  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
  rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jval_len);

  rocksdb::Status s;
  if (cf_handle != nullptr) {
@@ -1034,11 +1061,9 @@ void rocksdb_merge_helper(
    s = db->Merge(write_options, key_slice, value_slice);
  }

  // cleanup
  delete [] value;
  delete [] key;

  if (s.ok()) {
    return;
@@ -1049,36 +1074,37 @@ void rocksdb_merge_helper(

/*
 * Class: org_rocksdb_RocksDB
 * Method: merge
 * Signature: (J[BII[BII)V
 */
void Java_org_rocksdb_RocksDB_merge__J_3BII_3BII(JNIEnv* env, jobject jdb,
                                                 jlong jdb_handle,
                                                 jbyteArray jkey, jint jkey_off,
                                                 jint jkey_len, jbyteArray jval,
                                                 jint jval_off, jint jval_len) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  static const rocksdb::WriteOptions default_write_options =
      rocksdb::WriteOptions();
  rocksdb_merge_helper(env, db, default_write_options, nullptr, jkey, jkey_off,
                       jkey_len, jval, jval_off, jval_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: merge
 * Signature: (J[BII[BIIJ)V
 */
void Java_org_rocksdb_RocksDB_merge__J_3BII_3BIIJ(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jbyteArray jkey, jint jkey_off,
    jint jkey_len, jbyteArray jval, jint jval_off, jint jval_len,
    jlong jcf_handle) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  static const rocksdb::WriteOptions default_write_options =
      rocksdb::WriteOptions();
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    rocksdb_merge_helper(env, db, default_write_options, cf_handle, jkey,
                         jkey_off, jkey_len, jval, jval_off, jval_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@@ -1088,38 +1114,36 @@ void Java_org_rocksdb_RocksDB_merge__J_3BI_3BIJ(

/*
 * Class: org_rocksdb_RocksDB
 * Method: merge
 * Signature: (JJ[BII[BII)V
 */
void Java_org_rocksdb_RocksDB_merge__JJ_3BII_3BII(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jwrite_options_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
    jint jval_off, jint jval_len) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
      jwrite_options_handle);
  rocksdb_merge_helper(env, db, *write_options, nullptr, jkey, jkey_off,
                       jkey_len, jval, jval_off, jval_len);
}

/*
 * Class: org_rocksdb_RocksDB
 * Method: merge
 * Signature: (JJ[BII[BIIJ)V
 */
void Java_org_rocksdb_RocksDB_merge__JJ_3BII_3BIIJ(
    JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jwrite_options_handle,
    jbyteArray jkey, jint jkey_off, jint jkey_len, jbyteArray jval,
    jint jval_off, jint jval_len, jlong jcf_handle) {
  auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
      jwrite_options_handle);
  auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  if (cf_handle != nullptr) {
    rocksdb_merge_helper(env, db, *write_options, cf_handle, jkey, jkey_off,
                         jkey_len, jval, jval_off, jval_len);
  } else {
    rocksdb::RocksDBExceptionJni::ThrowNew(env,
        rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
@ -1696,7 +1720,7 @@ void Java_org_rocksdb_RocksDB_setOptions(JNIEnv* env, jobject jdb,
std::unordered_map<std::string, std::string> options_map; std::unordered_map<std::string, std::string> options_map;
const jsize len = env->GetArrayLength(jkeys); const jsize len = env->GetArrayLength(jkeys);
assert(len == env->GetArrayLength(jvalues)); assert(len == env->GetArrayLength(jvalues));
for(int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
jobject jobj_key = env->GetObjectArrayElement(jkeys, i); jobject jobj_key = env->GetObjectArrayElement(jkeys, i);
jobject jobj_value = env->GetObjectArrayElement(jvalues, i); jobject jobj_value = env->GetObjectArrayElement(jvalues, i);
jstring jkey = reinterpret_cast<jstring>(jobj_key); jstring jkey = reinterpret_cast<jstring>(jobj_key);


@@ -604,6 +604,17 @@ public class DBOptions extends RocksObject implements DBOptionsInterface {
  static final int DEFAULT_NUM_SHARD_BITS = -1;

  public DBOptions setDelayedWriteRate(final long delayedWriteRate) {
    assert(isOwningHandle());
    setDelayedWriteRate(nativeHandle_, delayedWriteRate);
    return this;
  }

  public long delayedWriteRate() {
    return delayedWriteRate(nativeHandle_);
  }

  /**
   * <p>Private constructor to be used by
   * {@link #getDBOptionsFromProps(java.util.Properties)}</p>
@@ -725,6 +736,9 @@ public class DBOptions extends RocksObject implements DBOptionsInterface {
      long writeThreadSlowYieldUsec);
  private native long writeThreadSlowYieldUsec(long handle);
  private native void setDelayedWriteRate(long handle, long delayedWriteRate);
  private native long delayedWriteRate(long handle);

  int numShardBits_;
  RateLimiterConfig rateLimiterConfig_;
}
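The new `setDelayedWriteRate`/`delayedWriteRate` pair above follows the fluent setter/getter pattern used throughout `DBOptions`: the setter asserts the object still owns its native handle, forwards to a native method, and returns `this` for chaining. A minimal self-contained sketch of that pattern (hypothetical class, plain Java field standing in for the native handle, not the real RocksDB class):

```java
// Sketch of the fluent setter/getter pattern the diff adds to DBOptions.
// DelayedWriteRateSketch is a hypothetical stand-in; a plain long field
// replaces the native-side storage reached through nativeHandle_.
public class DelayedWriteRateSketch {
    private long delayedWriteRate; // stands in for the native-side option

    public DelayedWriteRateSketch setDelayedWriteRate(final long rate) {
        this.delayedWriteRate = rate;
        return this; // returning `this` enables call chaining
    }

    public long delayedWriteRate() {
        return delayedWriteRate;
    }

    public static void main(String[] args) {
        // chained configuration, as DBOptions callers would write it
        long rate = new DelayedWriteRateSketch()
            .setDelayedWriteRate(2 * 1024 * 1024)
            .delayedWriteRate();
        System.out.println(rate); // prints 2097152
    }
}
```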


@@ -403,7 +403,7 @@ public class RocksDB extends RocksObject {
   */
  public void put(final byte[] key, final byte[] value)
      throws RocksDBException {
    put(nativeHandle_, key, 0, key.length, value, 0, value.length);
  }

  /**
@@ -422,7 +422,7 @@ public class RocksDB extends RocksObject {
   */
  public void put(final ColumnFamilyHandle columnFamilyHandle,
      final byte[] key, final byte[] value) throws RocksDBException {
    put(nativeHandle_, key, 0, key.length, value, 0, value.length,
        columnFamilyHandle.nativeHandle_);
  }
@@ -439,7 +439,7 @@ public class RocksDB extends RocksObject {
  public void put(final WriteOptions writeOpts, final byte[] key,
      final byte[] value) throws RocksDBException {
    put(nativeHandle_, writeOpts.nativeHandle_,
        key, 0, key.length, value, 0, value.length);
  }

  /**
@@ -461,8 +461,8 @@ public class RocksDB extends RocksObject {
  public void put(final ColumnFamilyHandle columnFamilyHandle,
      final WriteOptions writeOpts, final byte[] key,
      final byte[] value) throws RocksDBException {
    put(nativeHandle_, writeOpts.nativeHandle_, key, 0, key.length, value,
        0, value.length, columnFamilyHandle.nativeHandle_);
  }

  /**
@@ -477,8 +477,8 @@ public class RocksDB extends RocksObject {
   * found in block-cache.
   * @return boolean value indicating if key does not exist or might exist.
   */
  public boolean keyMayExist(final byte[] key, final StringBuffer value) {
    return keyMayExist(nativeHandle_, key, 0, key.length, value);
  }

  /**
@@ -495,8 +495,8 @@ public class RocksDB extends RocksObject {
   * @return boolean value indicating if key does not exist or might exist.
   */
  public boolean keyMayExist(final ColumnFamilyHandle columnFamilyHandle,
      final byte[] key, final StringBuffer value) {
    return keyMayExist(nativeHandle_, key, 0, key.length,
        columnFamilyHandle.nativeHandle_, value);
  }
@@ -514,9 +514,9 @@ public class RocksDB extends RocksObject {
   * @return boolean value indicating if key does not exist or might exist.
   */
  public boolean keyMayExist(final ReadOptions readOptions,
      final byte[] key, final StringBuffer value) {
    return keyMayExist(nativeHandle_, readOptions.nativeHandle_,
        key, 0, key.length, value);
  }

  /**
@@ -535,9 +535,9 @@ public class RocksDB extends RocksObject {
   */
  public boolean keyMayExist(final ReadOptions readOptions,
      final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
      final StringBuffer value) {
    return keyMayExist(nativeHandle_, readOptions.nativeHandle_,
        key, 0, key.length, columnFamilyHandle.nativeHandle_,
        value);
  }
@@ -581,7 +581,7 @@ public class RocksDB extends RocksObject {
   */
  public void merge(final byte[] key, final byte[] value)
      throws RocksDBException {
    merge(nativeHandle_, key, 0, key.length, value, 0, value.length);
  }

  /**
@@ -597,7 +597,7 @@ public class RocksDB extends RocksObject {
   */
  public void merge(final ColumnFamilyHandle columnFamilyHandle,
      final byte[] key, final byte[] value) throws RocksDBException {
    merge(nativeHandle_, key, 0, key.length, value, 0, value.length,
        columnFamilyHandle.nativeHandle_);
  }
@@ -615,7 +615,7 @@ public class RocksDB extends RocksObject {
  public void merge(final WriteOptions writeOpts, final byte[] key,
      final byte[] value) throws RocksDBException {
    merge(nativeHandle_, writeOpts.nativeHandle_,
        key, 0, key.length, value, 0, value.length);
  }

  /**
@@ -634,7 +634,7 @@ public class RocksDB extends RocksObject {
      final WriteOptions writeOpts, final byte[] key,
      final byte[] value) throws RocksDBException {
    merge(nativeHandle_, writeOpts.nativeHandle_,
        key, 0, key.length, value, 0, value.length,
        columnFamilyHandle.nativeHandle_);
  }
@@ -653,7 +653,7 @@ public class RocksDB extends RocksObject {
   * native library.
   */
  public int get(final byte[] key, final byte[] value) throws RocksDBException {
    return get(nativeHandle_, key, 0, key.length, value, 0, value.length);
  }

  /**
@@ -675,7 +675,7 @@ public class RocksDB extends RocksObject {
   */
  public int get(final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
      final byte[] value) throws RocksDBException, IllegalArgumentException {
    return get(nativeHandle_, key, 0, key.length, value, 0, value.length,
        columnFamilyHandle.nativeHandle_);
  }
@@ -698,7 +698,7 @@ public class RocksDB extends RocksObject {
  public int get(final ReadOptions opt, final byte[] key,
      final byte[] value) throws RocksDBException {
    return get(nativeHandle_, opt.nativeHandle_,
        key, 0, key.length, value, 0, value.length);
  }

  /**
   * Get the value associated with the specified key within column family.
@@ -721,8 +721,8 @@ public class RocksDB extends RocksObject {
  public int get(final ColumnFamilyHandle columnFamilyHandle,
      final ReadOptions opt, final byte[] key, final byte[] value)
      throws RocksDBException {
    return get(nativeHandle_, opt.nativeHandle_, key, 0, key.length, value,
        0, value.length, columnFamilyHandle.nativeHandle_);
  }

  /**
@@ -738,7 +738,7 @@ public class RocksDB extends RocksObject {
   * native library.
   */
  public byte[] get(final byte[] key) throws RocksDBException {
    return get(nativeHandle_, key, 0, key.length);
  }

  /**
@@ -757,7 +757,7 @@ public class RocksDB extends RocksObject {
   */
  public byte[] get(final ColumnFamilyHandle columnFamilyHandle,
      final byte[] key) throws RocksDBException {
    return get(nativeHandle_, key, 0, key.length,
        columnFamilyHandle.nativeHandle_);
  }
@@ -776,7 +776,7 @@ public class RocksDB extends RocksObject {
   */
  public byte[] get(final ReadOptions opt, final byte[] key)
      throws RocksDBException {
    return get(nativeHandle_, opt.nativeHandle_, key, 0, key.length);
  }

  /**
@@ -796,7 +796,7 @@ public class RocksDB extends RocksObject {
   */
  public byte[] get(final ColumnFamilyHandle columnFamilyHandle,
      final ReadOptions opt, final byte[] key) throws RocksDBException {
    return get(nativeHandle_, opt.nativeHandle_, key, 0, key.length,
        columnFamilyHandle.nativeHandle_);
  }
@@ -814,10 +814,17 @@ public class RocksDB extends RocksObject {
      throws RocksDBException {
    assert(keys.size() != 0);

    final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
    final int[] keyOffsets = new int[keysArray.length];
    final int[] keyLengths = new int[keysArray.length];
    for (int i = 0; i < keyLengths.length; i++) {
      keyLengths[i] = keysArray[i].length;
    }

    final byte[][] values = multiGet(nativeHandle_, keysArray, keyOffsets,
        keyLengths);

    final Map<byte[], byte[]> keyValueMap = new HashMap<>();
    for (int i = 0; i < values.length; i++) {
      if (values[i] == null) {
        continue;
@@ -862,10 +869,18 @@ public class RocksDB extends RocksObject {
    for (int i = 0; i < columnFamilyHandleList.size(); i++) {
      cfHandles[i] = columnFamilyHandleList.get(i).nativeHandle_;
    }

    final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
    final int[] keyOffsets = new int[keysArray.length];
    final int[] keyLengths = new int[keysArray.length];
    for (int i = 0; i < keyLengths.length; i++) {
      keyLengths[i] = keysArray[i].length;
    }

    final byte[][] values = multiGet(nativeHandle_, keysArray, keyOffsets,
        keyLengths, cfHandles);

    final Map<byte[], byte[]> keyValueMap = new HashMap<>();
    for (int i = 0; i < values.length; i++) {
      if (values[i] == null) {
        continue;
@@ -890,10 +905,17 @@ public class RocksDB extends RocksObject {
      final List<byte[]> keys) throws RocksDBException {
    assert(keys.size() != 0);

    final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
    final int[] keyOffsets = new int[keysArray.length];
    final int[] keyLengths = new int[keysArray.length];
    for (int i = 0; i < keyLengths.length; i++) {
      keyLengths[i] = keysArray[i].length;
    }

    final byte[][] values = multiGet(nativeHandle_, opt.nativeHandle_,
        keysArray, keyOffsets, keyLengths);

    final Map<byte[], byte[]> keyValueMap = new HashMap<>();
    for (int i = 0; i < values.length; i++) {
      if (values[i] == null) {
        continue;
@@ -938,10 +960,18 @@ public class RocksDB extends RocksObject {
    for (int i = 0; i < columnFamilyHandleList.size(); i++) {
      cfHandles[i] = columnFamilyHandleList.get(i).nativeHandle_;
    }

    final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
    final int[] keyOffsets = new int[keysArray.length];
    final int[] keyLengths = new int[keysArray.length];
    for (int i = 0; i < keyLengths.length; i++) {
      keyLengths[i] = keysArray[i].length;
    }

    final byte[][] values = multiGet(nativeHandle_, opt.nativeHandle_,
        keysArray, keyOffsets, keyLengths, cfHandles);

    final Map<byte[], byte[]> keyValueMap = new HashMap<>();
    for (int i = 0; i < values.length; i++) {
      if (values[i] == null) {
        continue;
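Each of the four `multiGet` overloads in the hunks above repeats the same preparation step: flatten the `List<byte[]>` of keys into a `byte[][]` and build parallel `keyOffsets`/`keyLengths` arrays describing each whole key (offset 0, length `key.length`) for the new offset-aware native call. A self-contained sketch of that step, using a hypothetical helper name:

```java
import java.util.Arrays;
import java.util.List;

public class MultiGetKeyArrays {
    // Hypothetical helper mirroring the pattern repeated in each multiGet
    // overload: produce {keyOffsets, keyLengths} for a flattened key array.
    static int[][] offsetsAndLengths(final byte[][] keysArray) {
        final int[] keyOffsets = new int[keysArray.length]; // all zero: whole keys
        final int[] keyLengths = new int[keysArray.length];
        for (int i = 0; i < keyLengths.length; i++) {
            keyLengths[i] = keysArray[i].length;
        }
        return new int[][]{keyOffsets, keyLengths};
    }

    public static void main(String[] args) {
        List<byte[]> keys = Arrays.asList("k1".getBytes(), "key2".getBytes());
        byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
        int[][] ol = offsetsAndLengths(keysArray);
        System.out.println(Arrays.toString(ol[1])); // prints [2, 4]
    }
}
```

Passing offsets and lengths alongside the arrays lets callers hand a slice of a larger buffer to the native layer without copying it into a fresh `byte[]` first.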
@@ -980,7 +1010,7 @@ public class RocksDB extends RocksObject {
   * native library.
   */
  public void delete(final byte[] key) throws RocksDBException {
    delete(nativeHandle_, key, 0, key.length);
  }

  /**
@@ -1017,7 +1047,7 @@ public class RocksDB extends RocksObject {
   */
  public void delete(final ColumnFamilyHandle columnFamilyHandle,
      final byte[] key) throws RocksDBException {
    delete(nativeHandle_, key, 0, key.length, columnFamilyHandle.nativeHandle_);
  }

  /**
@@ -1052,7 +1082,7 @@ public class RocksDB extends RocksObject {
   */
  public void delete(final WriteOptions writeOpt, final byte[] key)
      throws RocksDBException {
    delete(nativeHandle_, writeOpt.nativeHandle_, key, 0, key.length);
  }

  /**
@@ -1093,7 +1123,7 @@ public class RocksDB extends RocksObject {
  public void delete(final ColumnFamilyHandle columnFamilyHandle,
      final WriteOptions writeOpt, final byte[] key)
      throws RocksDBException {
    delete(nativeHandle_, writeOpt.nativeHandle_, key, 0, key.length,
        columnFamilyHandle.nativeHandle_);
  }
@@ -1970,91 +2000,87 @@ public class RocksDB extends RocksObject {
      final long[] columnFamilyOptions
  ) throws RocksDBException;

  protected native static byte[][] listColumnFamilies(long optionsHandle,
      String path) throws RocksDBException;
  protected native void put(long handle, byte[] key, int keyOffset,
      int keyLength, byte[] value, int valueOffset, int valueLength)
      throws RocksDBException;
  protected native void put(long handle, byte[] key, int keyOffset,
      int keyLength, byte[] value, int valueOffset, int valueLength,
      long cfHandle) throws RocksDBException;
  protected native void put(long handle, long writeOptHandle, byte[] key,
      int keyOffset, int keyLength, byte[] value, int valueOffset,
      int valueLength) throws RocksDBException;
  protected native void put(long handle, long writeOptHandle, byte[] key,
      int keyOffset, int keyLength, byte[] value, int valueOffset,
      int valueLength, long cfHandle) throws RocksDBException;
  protected native void write0(final long handle, long writeOptHandle,
      long wbHandle) throws RocksDBException;
  protected native void write1(final long handle, long writeOptHandle,
      long wbwiHandle) throws RocksDBException;
  protected native boolean keyMayExist(final long handle, final byte[] key,
      final int keyOffset, final int keyLength,
      final StringBuffer stringBuffer);
  protected native boolean keyMayExist(final long handle, final byte[] key,
      final int keyOffset, final int keyLength, final long cfHandle,
      final StringBuffer stringBuffer);
  protected native boolean keyMayExist(final long handle,
      final long optionsHandle, final byte[] key, final int keyOffset,
      final int keyLength, final StringBuffer stringBuffer);
  protected native boolean keyMayExist(final long handle,
      final long optionsHandle, final byte[] key, final int keyOffset,
      final int keyLength, final long cfHandle,
      final StringBuffer stringBuffer);
  protected native void merge(long handle, byte[] key, int keyOffset,
      int keyLength, byte[] value, int valueOffset, int valueLength)
      throws RocksDBException;
  protected native void merge(long handle, byte[] key, int keyOffset,
      int keyLength, byte[] value, int valueOffset, int valueLength,
      long cfHandle) throws RocksDBException;
  protected native void merge(long handle, long writeOptHandle, byte[] key,
      int keyOffset, int keyLength, byte[] value, int valueOffset,
      int valueLength) throws RocksDBException;
  protected native void merge(long handle, long writeOptHandle, byte[] key,
      int keyOffset, int keyLength, byte[] value, int valueOffset,
      int valueLength, long cfHandle) throws RocksDBException;
  protected native int get(long handle, byte[] key, int keyOffset,
      int keyLength, byte[] value, int valueOffset, int valueLength)
      throws RocksDBException;
  protected native int get(long handle, byte[] key, int keyOffset,
      int keyLength, byte[] value, int valueOffset, int valueLength,
      long cfHandle) throws RocksDBException;
  protected native int get(long handle, long readOptHandle, byte[] key,
      int keyOffset, int keyLength, byte[] value, int valueOffset,
      int valueLength) throws RocksDBException;
  protected native int get(long handle, long readOptHandle, byte[] key,
      int keyOffset, int keyLength, byte[] value, int valueOffset,
      int valueLength, long cfHandle) throws RocksDBException;
  protected native byte[][] multiGet(final long dbHandle, final byte[][] keys,
      final int[] keyOffsets, final int[] keyLengths);
  protected native byte[][] multiGet(final long dbHandle, final byte[][] keys,
      final int[] keyOffsets, final int[] keyLengths,
      final long[] columnFamilyHandles);
  protected native byte[][] multiGet(final long dbHandle, final long rOptHandle,
      final byte[][] keys, final int[] keyOffsets, final int[] keyLengths);
  protected native byte[][] multiGet(final long dbHandle, final long rOptHandle,
      final byte[][] keys, final int[] keyOffsets, final int[] keyLengths,
      final long[] columnFamilyHandles);
  protected native byte[] get(long handle, byte[] key, int keyOffset,
      int keyLength) throws RocksDBException;
  protected native byte[] get(long handle, byte[] key, int keyOffset,
      int keyLength, long cfHandle) throws RocksDBException;
  protected native byte[] get(long handle, long readOptHandle,
      byte[] key, int keyOffset, int keyLength) throws RocksDBException;
  protected native byte[] get(long handle, long readOptHandle, byte[] key,
      int keyOffset, int keyLength, long cfHandle) throws RocksDBException;
  protected native void delete(long handle, byte[] key, int keyOffset,
      int keyLength) throws RocksDBException;
  protected native void delete(long handle, byte[] key, int keyOffset,
      int keyLength, long cfHandle) throws RocksDBException;
  protected native void delete(long handle, long writeOptHandle, byte[] key,
      int keyOffset, int keyLength) throws RocksDBException;
  protected native void delete(long handle, long writeOptHandle, byte[] key,
      int keyOffset, int keyLength, long cfHandle) throws RocksDBException;
  protected native void singleDelete(
      long handle, byte[] key, int keyLen) throws RocksDBException;
  protected native void singleDelete(
@@ -2070,8 +2096,8 @@ public class RocksDB extends RocksObject {
      String property, int propertyLength) throws RocksDBException;
  protected native String getProperty0(long nativeHandle, long cfHandle,
      String property, int propertyLength) throws RocksDBException;
  protected native long getLongProperty(long nativeHandle, String property,
      int propertyLength) throws RocksDBException;
  protected native long getLongProperty(long nativeHandle, long cfHandle,
      String property, int propertyLength) throws RocksDBException;
  protected native long iterator(long handle);
@@ -2083,8 +2109,7 @@ public class RocksDB extends RocksObject {
      final long[] columnFamilyHandles, final long readOptHandle)
      throws RocksDBException;
  protected native long getSnapshot(long nativeHandle);
  protected native void releaseSnapshot(long nativeHandle, long snapshotHandle);
  @Override protected final native void disposeInternal(final long handle);
  private native long getDefaultColumnFamily(long handle);
  private native long createColumnFamily(final long handle,
@@ -2094,8 +2119,8 @@ public class RocksDB extends RocksObject {
      throws RocksDBException;
  private native void flush(long handle, long flushOptHandle)
      throws RocksDBException;
  private native void flush(long handle, long flushOptHandle, long cfHandle)
      throws RocksDBException;
  private native void compactRange0(long handle, boolean reduce_level,
      int target_level, int target_path_id) throws RocksDBException;
  private native void compactRange0(long handle, byte[] begin, int beginLen,
@@ -2111,8 +2136,8 @@ public class RocksDB extends RocksObject {
  private native void continueBackgroundWork(long handle) throws RocksDBException;
  private native long getLatestSequenceNumber(long handle);
  private native void disableFileDeletions(long handle) throws RocksDBException;
  private native void enableFileDeletions(long handle, boolean force)
      throws RocksDBException;
  private native long getUpdatesSince(long handle, long sequenceNumber)
      throws RocksDBException;
  private native void setOptions(long handle, long cfHandle, String[] keys,


@@ -6,6 +6,7 @@
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>


@@ -10,6 +10,7 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>


@@ -359,6 +359,36 @@ bool BlockCacheTier::Reserve(const size_t size) {
  return true;
}
Status NewPersistentCache(Env* const env, const std::string& path,
                          const uint64_t size,
                          const std::shared_ptr<Logger>& log,
                          const bool optimized_for_nvm,
                          std::shared_ptr<PersistentCache>* cache) {
  if (!cache) {
    return Status::IOError("invalid argument cache");
  }

  auto opt = PersistentCacheConfig(env, path, size, log);
  if (optimized_for_nvm) {
    // the default settings are optimized for SSD
    // NVM devices are better accessed with 4K direct IO and written with
    // parallelism
    opt.enable_direct_writes = true;
    opt.writer_qdepth = 4;
    opt.writer_dispatch_size = 4 * 1024;
  }

  auto pcache = std::make_shared<BlockCacheTier>(opt);
  Status s = pcache->Open();
  if (!s.ok()) {
    return s;
  }

  *cache = pcache;
  return s;
}
}  // namespace rocksdb
#endif  // ifndef ROCKSDB_LITE


@@ -49,8 +49,8 @@ class BlockCacheTier : public PersistentCacheTier {
  }

  virtual ~BlockCacheTier() {
    // Close is re-entrant so we can call close even if it is already closed
    Close();
    assert(!insert_th_.joinable());
  }


@@ -9,6 +9,7 @@
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <functional>
#include <memory>
#include <vector>


@@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include <list>
#include <memory>
#include <string>


@@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include "util/random.h"
#include "utilities/persistent_cache/hash_table.h"
#include "utilities/persistent_cache/lrulist.h"


@@ -235,6 +235,19 @@ static void UniqueIdCallback(void* arg) {
}
#endif
TEST_F(PersistentCacheTierTest, FactoryTest) {
  for (auto nvm_opt : {true, false}) {
    ASSERT_FALSE(cache_);
    auto log = std::make_shared<ConsoleLogger>();
    std::shared_ptr<PersistentCache> cache;
    ASSERT_OK(NewPersistentCache(Env::Default(), path_,
                                 /*size=*/1 * 1024 * 1024 * 1024, log, nvm_opt,
                                 &cache));
    ASSERT_TRUE(cache);
    cache.reset();
  }
}
PersistentCacheDBTest::PersistentCacheDBTest() : DBTestBase("/cache_test") {
#ifdef OS_LINUX
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@@ -403,6 +416,7 @@ TEST_F(PersistentCacheDBTest, TieredCacheTest) {
  RunTest(std::bind(&MakeTieredCache, dbname_));
}
#endif

}  // namespace rocksdb

int main(int argc, char** argv) {