Compare commits

...

18 Commits

Author SHA1 Message Date
sdong
d903241757 Disable error as warning 2019-11-05 11:04:45 -08:00
sdong
01475b49ee Add one more #include<functional> 2019-11-05 11:04:44 -08:00
sdong
de3fdee3ce Add some include<functional> 2019-10-31 14:24:56 -07:00
sdong
6afcb0d933 [FB Internal] Point to the latest tool chain. 2019-10-31 14:24:29 -07:00
sdong
d6f3ff3629 [fb only] revert unintended change of USE_SSE
The previous change that used gcc-5 set USE_SSE to the wrong flag by mistake. Fix it.
2017-07-17 22:22:40 -07:00
sdong
4e0ea33c3c [FB Only] use gcc-5 2017-07-17 21:55:40 -07:00
sdong
58d8de115f Upgrade to version 4.10.2 2016-09-07 11:03:54 -07:00
sdong
95ef90988b Ignore stale logs while restarting DBs
Summary:
Stale log files can be deleted out of order, for various reasons. One of them is that no data is ever inserted into a column family and we have an optimization to update its log number, but not all of the old log files are cleaned up (the case shown in the unit tests added). It can also happen when we simply delete multiple log files out of order.

This causes data corruption because we simply increase the seqID after processing the next row, and we may end up writing data with a smaller seqID than what is already flushed to memtables.

In DB recovery, for the oldest files we are replaying, if a file contains no data for any column family, we ignore the sequence IDs in the file.

Test Plan: Add two unit tests that fail without the fix.

Reviewers: IslamAbdelRahman, igor, yiwu

Reviewed By: yiwu

Subscribers: hermanlee4, yoshinorim, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60891
2016-09-07 10:57:32 -07:00
krad
1b3810391e Update version to 4.10.1 2016-08-26 14:53:34 -07:00
krad
4f1f992b67 Revert "Update version to 4.10.1"
This reverts commit 89b20d3cce3f2aeabf16d0df7abb43db4e5f0f76.
2016-08-26 14:51:39 -07:00
krad
89b20d3cce Update version to 4.10.1 2016-08-26 14:46:47 -07:00
sdong
a31b8cb7a7 Mitigate regression bug of options.max_successive_merges hit during DB Recovery
Summary:
After 1b8a2e8fdd1db0dac3cb50228065f8e7e43095f0, the DB pointer is passed to WriteBatchInternal::InsertInto() during DB recovery. This can cause a deadlock if options.max_successive_merges is hit: in that case DB::Get() will be called, and Get() will try to acquire the DB mutex, which is already held by DB::Open(), causing a deadlock.

This commit mitigates the problem by not passing the DB pointer unless 2PC is allowed.

Test Plan: Add a new test and run it.

Reviewers: IslamAbdelRahman, andrewkr, kradhakrishnan, horuff

Reviewed By: kradhakrishnan

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D62625
2016-08-25 17:35:38 -07:00
John Alexander
2701f5c267 Remove %z Format Specifier and Fix Windows Build of sim_cache.cc (#1224)
* Replace %zu format specifier with Windows-compatible macro 'ROCKSDB_PRIszt'

* Added "port/port.h" include to sim_cache.cc for call to snprintf().

* Applied cleaner fix to windows build, reverting part of 7bedd94
2016-07-28 16:07:21 -07:00
John Alexander
fdc0675f5c New Statistics to track Compression/Decompression (#1197)
* Added new statistics and refactored to allow ioptions to be passed around as required to access environment and statistics pointers (and, as a convenient side effect, info_log pointer).

* Prevent incrementing compression counter when compression is turned off in options.

* Prevent incrementing compression counter when compression is turned off in options.

* Added two more supported compression types to test code in db_test.cc

* Prevent incrementing compression counter when compression is turned off in options.

* Added new StatsLevel that excludes compression timing.

* Fixed casting error in coding.h

* Fixed CompressionStatsTest for new StatsLevel.

* Removed unused variable that was breaking the Linux build
2016-07-28 16:06:47 -07:00
Islam AbdelRahman
605cd9bfcb Fix Statistics TickersNameMap mismatch with Tickers enum
Summary:
TickersNameMap is not consistent with the Tickers enum.
This causes us to report wrong statistics and sometimes to access TickersNameMap outside its boundary, causing crashes (in Fb303 statistics).

Test Plan: added new unit test

Reviewers: sdong, kradhakrishnan

Reviewed By: kradhakrishnan

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61083
2016-07-26 13:42:44 -07:00
Yi Wu
c7eb50b132 Fix unit test which breaks lite build
Summary: Comment out the assertion on the number of table files in the lite build.

Test Plan:
    OPT=-DROCKSDB_LITE make check

Reviewers: lightmark

Reviewed By: lightmark

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60999
2016-07-21 14:32:49 -07:00
Yi Wu
2462cebfe8 Fix flush not being committed while writing manifest
Summary:
Fix flush not being committed while writing the manifest, a recent bug introduced by D60075.

The issue:
# Options.max_background_flushes > 1
# Background thread A picks up a flush job, flushes, then commits to the manifest. (Note that the mutex is released before writing the manifest.)
# Background thread B picks up another flush job and flushes. When it gets to `MemTableList::InstallMemtableFlushResults`, it notices another thread is committing, so it quits.
# After the first commit, thread A doesn't double-check whether more flush results need to be committed, leaving the second flush uncommitted.

Test Plan: run the test. Also verify that the new test hits a deadlock without the fix.

Reviewers: sdong, igor, lightmark

Reviewed By: lightmark

Subscribers: andrewkr, omegaga, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60969
2016-07-21 13:56:22 -07:00
krad
becc230aa9 Fix version number 2016-07-21 10:59:26 -07:00
41 changed files with 689 additions and 231 deletions

.gitignore vendored

@@ -64,3 +64,5 @@ java/javadoc
 scan_build_report/
 t
 LOG
+db_logs/

CMakeLists.txt

@@ -356,6 +356,7 @@ set(TESTS
 db/db_test2.cc
 db/db_block_cache_test.cc
 db/db_bloom_filter_test.cc
+db/db_flush_test.cc
 db/db_iterator_test.cc
 db/db_sst_test.cc
 db/db_universal_compaction_test.cc
@@ -420,6 +421,7 @@ set(TESTS
 util/options_test.cc
 util/rate_limiter_test.cc
 util/slice_transform_test.cc
+util/statistics_test.cc
 util/thread_list_test.cc
 util/thread_local_test.cc
 utilities/backupable/backupable_db_test.cc

HISTORY.md

@@ -1,5 +1,12 @@
 # Rocksdb Change Log
-## Unreleased
+## 4.10.2
+### Bug Fixes
+* backport the bug fix of the regression data corruption after DB recovery when stale WAL file deletion is reordered.
+## 4.10.1
+### Bug Fixes
+* Fix the regression deadlock bug of DB recovery if options.max_successive_merges hits.
+## 4.10.0
 ### Public API Change
 * options.memtable_prefix_bloom_bits changes to options.memtable_prefix_bloom_bits_ratio and deprecate options.memtable_prefix_bloom_probes
 * enum type CompressionType and PerfLevel changes from char to unsigned char. Value of all PerfLevel shift by one.

Makefile

@@ -212,10 +212,6 @@ default: all
 WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
 -Wno-unused-parameter
-ifndef DISABLE_WARNING_AS_ERROR
-WARNING_FLAGS += -Werror
-endif
 CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
 CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
@@ -274,8 +270,10 @@ TESTS = \
 db_compaction_filter_test \
 db_compaction_test \
 db_dynamic_level_test \
+db_flush_test \
 db_inplace_update_test \
 db_iterator_test \
+db_options_test \
 db_sst_test \
 db_tailing_iter_test \
 db_universal_compaction_test \
@@ -372,6 +370,7 @@ TESTS = \
 ldb_cmd_test \
 iostats_context_test \
 persistent_cache_test \
+statistics_test \

 PARALLEL_TEST = \
 backupable_db_test \
@@ -920,6 +919,9 @@ db_compaction_test: db/db_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 db_dynamic_level_test: db/db_dynamic_level_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
+db_flush_test: db/db_flush_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
 db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
@@ -1199,6 +1201,9 @@ iostats_context_test: util/iostats_context_test.o $(LIBOBJECTS) $(TESTHARNESS)
 persistent_cache_test: utilities/persistent_cache/persistent_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
+statistics_test: util/statistics_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)

 #-------------------------------------------------
 # make install related stuff
 INSTALL_PATH ?= /usr/local

build_tools/build_detect_platform

@@ -52,12 +52,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
 FBCODE_BUILD="true"
 # If we're compiling with TSAN we need pic build
 PIC_BUILD=$COMPILE_WITH_TSAN
-if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
-  source "$PWD/build_tools/fbcode_config.sh"
-else
-  # we need this to build with MySQL. Don't use for other purposes.
-  source "$PWD/build_tools/fbcode_config4.8.1.sh"
-fi
+source "$PWD/build_tools/fbcode_config.sh"
 fi

 # Delete existing output, if it exists

build_tools/dependencies.sh

@@ -1,16 +1,19 @@
-GCC_BASE=/mnt/gvfs/third-party2/gcc/ebc96bc2fb751b5a0300b8d91a95bdf24ac1d88b/4.9.x/centos6-native/108cf83
-CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/b91de48a4974ec839946d824402b098d43454cef/stable/centos6-native/7aaccbe
-LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/53e0eac8911888a105aa98b9a35fe61cf1d8b278/4.9.x/gcc-4.9-glibc-2.20/024dbc3
-GLIBC_BASE=/mnt/gvfs/third-party2/glibc/ee36ac9a72dfac4a995f1b215bb4c0fc8a0f6f91/2.20/gcc-4.9-glibc-2.20/500e281
-SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
-ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2b24f1e99454f9ca7b0301720f94836dae1bf71b/1.2.8/gcc-5-glibc-2.23/9bc6787
-BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/af7c14c9b652cdd5ec34eadd25c3f38a9b306c5d/1.0.6/gcc-5-glibc-2.23/9bc6787
-LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
-ZSTD_BASE=/mnt/gvfs/third-party2/zstd/1408484d03b15492aa54b10356104e9dc22e1cc5/0.6.1/gcc-5-glibc-2.23/9bc6787
-GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/5a5c7a6608cb32f1e1e7f814023d5bdfbd136370/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
-JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/a4c2adecffcaa68d5585d06be2252e3efa52555b/master/gcc-5-glibc-2.23/1c32b4b
-NUMA_BASE=/mnt/gvfs/third-party2/numa/1abc0d3c01743b854676423cf2d3629912f34930/2.0.8/gcc-5-glibc-2.23/9bc6787
-LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
-KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/f24be37d170e04be6e469af487644d4d62e1c6c1/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
-BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/9d65c666b9adf8f2a989fd4b98a9a5e7d3afa233/2.26/centos6-native/da39a3e
-VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/9cee5a3628dc9d4b93897972c58eba865e25b270/3.10.0/gcc-4.9-glibc-2.20/e9936bf
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
+CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
+LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
+GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
+SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
+ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
+BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
+LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
+ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
+GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
+JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
+NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
+LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
+TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
+KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
+BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
+VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
+LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

build_tools/fbcode_config.sh

@@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
 CFLAGS=""

 # libgcc
-LIBGCC_INCLUDE="$LIBGCC_BASE/include"
+LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
 LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"

 # glibc
@@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
   LZ4_INCLUDE=" -I $LZ4_BASE/include/"
   LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
   CFLAGS+=" -DLZ4"
-
-  ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
-  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
-  CFLAGS+=" -DZSTD"
 fi
+
+ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
+if test -z $PIC_BUILD; then
+  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
+else
+  ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
+fi
+CFLAGS+=" -DZSTD"

 # location of gflags headers and libraries
 GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
 if test -z $PIC_BUILD; then
@@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
 else
   GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
 fi
-CFLAGS+=" -DGFLAGS=google"
+CFLAGS+=" -DGFLAGS=gflags"

 # location of jemalloc
 JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@@ -72,13 +76,22 @@ if test -z $PIC_BUILD; then
   LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
 fi

+# location of TBB
+TBB_INCLUDE=" -isystem $TBB_BASE/include/"
+if test -z $PIC_BUILD; then
+  TBB_LIBS="$TBB_BASE/lib/libtbb.a"
+else
+  TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
+fi
+CFLAGS+=" -DTBB"
+
 # use Intel SSE support for checksum calculations
 export USE_SSE=1

 BINUTILS="$BINUTILS_BASE/bin"
 AR="$BINUTILS/ar"

-DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE"
+DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"

 STDLIBS="-L $GCC_BASE/lib64"
@@ -95,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
   CXX="$GCC_BASE/bin/g++"

   CFLAGS+=" -B$BINUTILS/gold"
-  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
+  CFLAGS+=" -isystem $GLIBC_INCLUDE"
   JEMALLOC=1
 else
   # clang
@@ -107,8 +120,8 @@ else
   KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"

   CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
-  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
+  CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
   CFLAGS+=" -isystem $GLIBC_INCLUDE"
   CFLAGS+=" -isystem $LIBGCC_INCLUDE"
   CFLAGS+=" -isystem $CLANG_INCLUDE"
@@ -119,18 +132,21 @@ else
 fi

 CFLAGS+=" $DEPS_INCLUDE"
-CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
+CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
 CXXFLAGS+=" $CFLAGS"

-EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB"
-EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
+EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
+EXEC_LDFLAGS+=" -B$BINUTILS/gold"
+EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
 EXEC_LDFLAGS+=" $LIBUNWIND"
-EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
+EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
+# required by libtbb
+EXEC_LDFLAGS+=" -ldl"

 PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"

-EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS"
+EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"

 VALGRIND_VER="$VALGRIND_BASE/bin/"

 export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD

db/column_family_test.cc

@@ -18,6 +18,7 @@
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
 #include "util/coding.h"
+#include "util/fault_injection_test_env.h"
 #include "util/options_parser.h"
 #include "util/string_util.h"
 #include "util/sync_point.h"
@@ -500,6 +501,135 @@ TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) {
 }
 }
class FlushEmptyCFTestWithParam : public ColumnFamilyTest,
public testing::WithParamInterface<bool> {
public:
FlushEmptyCFTestWithParam() { allow_2pc_ = GetParam(); }
// Required if inheriting from testing::WithParamInterface<>
static void SetUpTestCase() {}
static void TearDownTestCase() {}
bool allow_2pc_;
};
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
db_options_.env = fault_env.get();
db_options_.allow_2pc = allow_2pc_;
Open();
CreateColumnFamilies({"one", "two"});
// Generate log file A.
ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
Reopen();
// Log file A is not dropped after reopening because default column family's
// min log number is 0.
// It flushes to SST file X
ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
// Current log file is file B now. While flushing, a new log file C is created
// and is set to current. Both CFs' min log numbers are set to file C in memory, so
// after flushing file B is deleted. At the same time, the min log number of
// default CF is not written to manifest. Log file A still remains.
// Flushed to SST file Y.
Flush(1);
Flush(0);
ASSERT_OK(Put(1, "bar", "v3")); // seqID 4
ASSERT_OK(Put(1, "foo", "v4")); // seqID 5
// Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false);
std::vector<std::string> names;
for (auto name : names_) {
if (name != "") {
names.push_back(name);
}
}
Close();
fault_env->ResetState();
// Before opening, there are four files:
// Log file A contains seqID 1
// Log file C contains seqID 4, 5
// SST file X contains seqID 1
// SST file Y contains seqID 2, 3
// Min log number:
// default CF: 0
// CF one, two: C
// When opening the DB, all the seqID should be preserved.
Open(names, {});
ASSERT_EQ("v4", Get(1, "foo"));
ASSERT_EQ("v3", Get(1, "bar"));
Close();
db_options_.env = env_;
}
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
db_options_.env = fault_env.get();
db_options_.allow_2pc = allow_2pc_;
Open();
CreateColumnFamilies({"one", "two"});
// Generate log file A.
ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
Reopen();
// Log file A is not dropped after reopening because default column family's
// min log number is 0.
// It flushes to SST file X
ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
// Current log file is file B now. While flushing, a new log file C is created
// and is set to current. Both CFs' min log number is set to file C so after
// flushing file B is deleted. Log file A still remains.
// Flushed to SST file Y.
Flush(1);
ASSERT_OK(Put(0, "bar", "v2")); // seqID 4
ASSERT_OK(Put(2, "bar", "v2")); // seqID 5
ASSERT_OK(Put(1, "bar", "v3")); // seqID 6
// Flushing all column families. This forces all CFs' min log to current. This
// is written to the manifest file. Log file C is cleared.
Flush(0);
Flush(1);
Flush(2);
// Write to log file D
ASSERT_OK(Put(1, "bar", "v4")); // seqID 7
ASSERT_OK(Put(1, "bar", "v5")); // seqID 8
// Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false);
std::vector<std::string> names;
for (auto name : names_) {
if (name != "") {
names.push_back(name);
}
}
Close();
fault_env->ResetState();
// Before opening, there are two logfiles:
// Log file A contains seqID 1
// Log file D contains seqID 7, 8
// Min log number:
// default CF: D
// CF one, two: D
// When opening the DB, log file D should be replayed using the seqID
// specified in the file.
Open(names, {});
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("v5", Get(1, "bar"));
Close();
db_options_.env = env_;
}
INSTANTIATE_TEST_CASE_P(FlushEmptyCFTestWithParam, FlushEmptyCFTestWithParam,
::testing::Bool());
TEST_F(ColumnFamilyTest, AddDrop) {
Open();
CreateColumnFamilies({"one", "two", "three"});

db/db_flush_test.cc (new file)

@@ -0,0 +1,56 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/sync_point.h"
namespace rocksdb {
class DBFlushTest : public DBTestBase {
public:
DBFlushTest() : DBTestBase("/db_flush_test") {}
};
// We had issue when two background threads trying to flush at the same time,
// only one of them get committed. The test verifies the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
Options options;
options.disable_auto_compactions = true;
options.max_background_flushes = 2;
Reopen(options);
FlushOptions no_wait;
no_wait.wait = false;
SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply:WriteManifest",
"DBFlushTest::FlushWhileWritingManifest:1"},
{"MemTableList::InstallMemtableFlushResults:InProgress",
"VersionSet::LogAndApply:WriteManifestDone"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
ASSERT_OK(Put("bar", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
// If the issue is hit we will wait here forever.
dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
ASSERT_EQ(2, TotalTableFiles());
#endif // ROCKSDB_LITE
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

db/db_impl.cc

@@ -1478,9 +1478,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
 }
 recovered_sequence = sequence;
+bool no_prev_seq = true;
 if (*next_sequence == kMaxSequenceNumber) {
 *next_sequence = sequence;
 } else {
+no_prev_seq = false;
 WriteBatchInternal::SetSequence(&batch, *next_sequence);
 }
@@ -1563,10 +1565,24 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
 // insert. We don't want to fail the whole write batch in that case --
 // we just ignore the update.
 // That's why we set ignore missing column families to true
+//
+// If we pass DB through and options.max_successive_merges is hit
+// during recovery, Get() will be issued which will try to acquire
+// DB mutex and cause deadlock, as DB mutex is already held.
+// The DB pointer is not needed unless 2PC is used.
+// TODO(sdong) fix the allow_2pc case too.
+bool has_valid_writes = false;
 status = WriteBatchInternal::InsertInto(
 &batch, column_family_memtables_.get(), &flush_scheduler_, true,
-log_number, this, false /* concurrent_memtable_writes */,
-next_sequence);
+log_number, db_options_.allow_2pc ? this : nullptr, false /* concurrent_memtable_writes */,
+next_sequence, &has_valid_writes);
+// If it is the first log file and there is no column family updated
+// after replaying the file, this file may be a stale file. We ignore
+// sequence IDs from the file. Otherwise, if a newer stale log file that
+// has been deleted, the sequenceID may be wrong.
+if (no_prev_seq && !has_valid_writes) {
+  *next_sequence = kMaxSequenceNumber;
+}
 MaybeIgnoreError(&status);
 if (!status.ok()) {
 // We are treating this as a failure while reading since we read valid
@@ -1575,7 +1591,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
 continue;
 }
-if (!read_only) {
+if (has_valid_writes && !read_only) {
 // we can do this because this is called before client has access to the
 // DB and there is only a single thread operating on DB
 ColumnFamilyData* cfd;
db/db_iterator_test.cc

@@ -11,6 +11,7 @@
 #include "port/stack_trace.h"
 #include "rocksdb/iostats_context.h"
 #include "rocksdb/perf_context.h"
+#include "port/port.h"

 namespace rocksdb {
@@ -979,7 +980,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorRandomized) {
 {
 // Test Seek to random keys
-printf("Testing seek on %zu keys\n", random_keys.size());
+printf("Testing seek on %" ROCKSDB_PRIszt " keys\n", random_keys.size());
 std::vector<Slice> keys_slices;
 std::vector<std::string> true_keys;
 for (auto& k : random_keys) {

db/db_sst_test.cc

@@ -12,6 +12,7 @@
 #include "rocksdb/sst_file_manager.h"
 #include "rocksdb/sst_file_writer.h"
 #include "util/sst_file_manager_impl.h"
+#include "port/port.h"

 namespace rocksdb {
@@ -1455,7 +1456,7 @@ TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) {
 }
 printf(
-"Total: %zu ranges\n"
+"Total: %" ROCKSDB_PRIszt " ranges\n"
 "AddFile()|Success: %d ranges\n"
 "AddFile()|RangeConflict: %d ranges\n"
 "Put(): %d ranges\n",

db/db_test.cc

@@ -1189,6 +1189,12 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
 } else if (LZ4_Supported()) {
 type = kLZ4Compression;
 fprintf(stderr, "using lz4\n");
+} else if (XPRESS_Supported()) {
+type = kXpressCompression;
+fprintf(stderr, "using xpress\n");
+} else if (ZSTD_Supported()) {
+type = kZSTDNotFinalCompression;
+fprintf(stderr, "using ZSTD\n");
 } else {
 fprintf(stderr, "skipping test, compression disabled\n");
 return false;
@@ -4685,6 +4691,75 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
 }
 }
TEST_F(DBTest, CompressionStatsTest) {
  CompressionType type;

  if (Snappy_Supported()) {
    type = kSnappyCompression;
    fprintf(stderr, "using snappy\n");
  } else if (Zlib_Supported()) {
    type = kZlibCompression;
    fprintf(stderr, "using zlib\n");
  } else if (BZip2_Supported()) {
    type = kBZip2Compression;
    fprintf(stderr, "using bzip2\n");
  } else if (LZ4_Supported()) {
    type = kLZ4Compression;
    fprintf(stderr, "using lz4\n");
  } else if (XPRESS_Supported()) {
    type = kXpressCompression;
    fprintf(stderr, "using xpress\n");
  } else if (ZSTD_Supported()) {
    type = kZSTDNotFinalCompression;
    fprintf(stderr, "using ZSTD\n");
  } else {
    fprintf(stderr, "skipping test, compression disabled\n");
    return;
  }

  Options options = CurrentOptions();
  options.compression = type;
  options.statistics = rocksdb::CreateDBStatistics();
  options.statistics->stats_level_ = StatsLevel::kAll;
  DestroyAndReopen(options);

  int kNumKeysWritten = 100000;

  // Check that compressions occur and are counted when compression is turned on
  Random rnd(301);
  for (int i = 0; i < kNumKeysWritten; ++i) {
    // compressible string
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
  }
  ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED), 0);

  for (int i = 0; i < kNumKeysWritten; ++i) {
    auto r = Get(Key(i));
  }
  ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED), 0);

  options.compression = kNoCompression;
  DestroyAndReopen(options);
  uint64_t currentCompressions =
      options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
  uint64_t currentDecompressions =
      options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED);

  // Check that compressions do not occur when turned off
  for (int i = 0; i < kNumKeysWritten; ++i) {
    // compressible string
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
  }
  ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED)
            - currentCompressions, 0);

  for (int i = 0; i < kNumKeysWritten; ++i) {
    auto r = Get(Key(i));
  }
  ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED)
            - currentDecompressions, 0);
}
TEST_F(DBTest, MutexWaitStatsDisabledByDefault) {
  Options options = CurrentOptions();
  options.create_if_missing = true;

@@ -1487,6 +1487,23 @@ TEST_F(DBTest2, SyncPointMarker) {
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif
TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
  Options options;
  options = CurrentOptions(options);
  options.merge_operator = MergeOperators::CreatePutOperator();
  DestroyAndReopen(options);

  db_->Put(WriteOptions(), "foo", "bar");
  ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
  ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
  ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
  ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
  ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));

  options.max_successive_merges = 3;
  Reopen(options);
}
}  // namespace rocksdb

int main(int argc, char** argv) {

@@ -11,14 +11,15 @@
#include <inttypes.h>
#include <string>

#include "db/memtable.h"
#include "db/version_set.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/log_buffer.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"

namespace rocksdb {
@@ -297,69 +298,79 @@ Status MemTableList::InstallMemtableFlushResults(
  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing manifest where mutex is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    // enumerate from the last (earliest) element to see how many batch finished
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        LogToBuffer(log_buffer,
                    "[%s] Level-0 commit table #%" PRIu64 " started",
                    cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
      }
      batch_count++;
    }

    if (batch_count > 0) {
      // this can release and reacquire the mutex.
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);
      // we will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable
      InstallNewVersion();
      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.
      if (s.ok()) {         // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                  ": memtable #%" PRIu64 " done",
                      cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
          MemTable* m = *it;
          // commit failed. setup state so that we can flush again.
          LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
                                  ": memtable #%" PRIu64 " failed",
                      m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }

@@ -331,20 +331,19 @@ class TestPlainTableFactory : public PlainTableFactory {
    TableProperties* props = nullptr;
    auto s =
        ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber,
                            table_reader_options.ioptions, &props);
    EXPECT_TRUE(s.ok());
    if (store_index_in_file_) {
      BlockHandle bloom_block_handle;
      s = FindMetaBlock(file.get(), file_size, kPlainTableMagicNumber,
                        table_reader_options.ioptions,
                        BloomBlockBuilder::kBloomBlock, &bloom_block_handle);
      EXPECT_TRUE(s.ok());

      BlockHandle index_block_handle;
      s = FindMetaBlock(file.get(), file_size, kPlainTableMagicNumber,
                        table_reader_options.ioptions,
                        PlainTableIndexBuilder::kPlainTableIndexBlock,
                        &index_block_handle);
      EXPECT_TRUE(s.ok());

@@ -276,7 +276,7 @@ void TestCustomizedTablePropertiesCollector(
      new test::StringSource(fwf->contents())));
  TableProperties* props;
  Status s = ReadTableProperties(fake_file_reader.get(), fwf->contents().size(),
                                 magic_number, ioptions, &props);
  std::unique_ptr<TableProperties> props_guard(props);
  ASSERT_OK(s);

@@ -417,7 +417,7 @@ void TestInternalKeyPropertiesCollector(
  TableProperties* props;
  Status s =
      ReadTableProperties(reader.get(), fwf->contents().size(), magic_number,
                          ioptions, &props);
  ASSERT_OK(s);
  std::unique_ptr<TableProperties> props_guard(props);

@@ -597,8 +597,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
      new RandomAccessFileReader(std::move(file)));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties);
  if (!s.ok()) {
    return s;
  }

@@ -694,6 +694,7 @@ class MemTableInserter : public WriteBatch::Handler {
  uint64_t log_number_ref_;
  DBImpl* db_;
  const bool concurrent_memtable_writes_;
  bool* has_valid_writes_;
  typedef std::map<MemTable*, MemTablePostProcessInfo> MemPostInfoMap;
  MemPostInfoMap mem_post_info_map_;
  // current recovered transaction we are rebuilding (recovery)

@@ -704,7 +705,8 @@ class MemTableInserter : public WriteBatch::Handler {
                   FlushScheduler* flush_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   bool* has_valid_writes = nullptr)
      : sequence_(sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),

@@ -713,6 +715,7 @@ class MemTableInserter : public WriteBatch::Handler {
        log_number_ref_(0),
        db_(reinterpret_cast<DBImpl*>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr) {
    assert(cf_mems_);
  }

@@ -756,6 +759,10 @@ class MemTableInserter : public WriteBatch::Handler {
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

@@ -978,6 +985,9 @@ class MemTableInserter : public WriteBatch::Handler {
      // we are now iterating through a prepared section
      rebuilding_trx_ = new WriteBatch();
      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    } else {
      // in non-recovery we ignore prepare markers
      // and insert the values directly. making sure we have a

@@ -1031,6 +1041,9 @@ class MemTableInserter : public WriteBatch::Handler {
        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // in non recovery we simply ignore this tag

@@ -1114,16 +1127,15 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
  return s;
}

Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
    uint64_t log_number, DB* db, bool concurrent_memtable_writes,
    SequenceNumber* last_seq_used, bool* has_valid_writes) {
  MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
                            flush_scheduler, ignore_missing_column_families,
                            log_number, db, concurrent_memtable_writes,
                            has_valid_writes);
  Status s = batch->Iterate(&inserter);
  if (last_seq_used != nullptr) {
    *last_seq_used = inserter.get_final_sequence();

@@ -160,7 +160,8 @@ class WriteBatchInternal {
                           bool ignore_missing_column_families = false,
                           uint64_t log_number = 0, DB* db = nullptr,
                           bool concurrent_memtable_writes = false,
                           SequenceNumber* last_seq_used = nullptr,
                           bool* has_valid_writes = nullptr);

  static Status InsertInto(WriteThread::Writer* writer,
                           ColumnFamilyMemTables* memtables,

@@ -179,6 +179,11 @@ enum Tickers : uint32_t {
  NUMBER_SUPERVERSION_ACQUIRES,
  NUMBER_SUPERVERSION_RELEASES,
  NUMBER_SUPERVERSION_CLEANUPS,

  // # of compressions/decompressions executed
  NUMBER_BLOCK_COMPRESSED,
  NUMBER_BLOCK_DECOMPRESSED,

  NUMBER_BLOCK_NOT_COMPRESSED,
  MERGE_OPERATION_TOTAL_TIME,
  FILTER_OPERATION_TOTAL_TIME,

@@ -211,6 +216,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
    {BLOCK_CACHE_BYTES_READ, "rocksdb.block.cache.bytes.read"},
    {BLOCK_CACHE_BYTES_WRITE, "rocksdb.block.cache.bytes.write"},
    {BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful"},
    {PERSISTENT_CACHE_HIT, "rocksdb.persistent.cache.hit"},
    {PERSISTENT_CACHE_MISS, "rocksdb.persistent.cache.miss"},
    {MEMTABLE_HIT, "rocksdb.memtable.hit"},
    {MEMTABLE_MISS, "rocksdb.memtable.miss"},
    {GET_HIT_L0, "rocksdb.l0.hit"},

@@ -260,15 +267,18 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
    {WAL_FILE_BYTES, "rocksdb.wal.bytes"},
    {WRITE_DONE_BY_SELF, "rocksdb.write.self"},
    {WRITE_DONE_BY_OTHER, "rocksdb.write.other"},
    {WRITE_TIMEDOUT, "rocksdb.write.timeout"},
    {WRITE_WITH_WAL, "rocksdb.write.wal"},
    {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
    {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
    {FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"},
    {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
     "rocksdb.number.direct.load.table.properties"},
    {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
    {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
    {NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"},
    {NUMBER_BLOCK_COMPRESSED, "rocksdb.number.block.compressed"},
    {NUMBER_BLOCK_DECOMPRESSED, "rocksdb.number.block.decompressed"},
    {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"},
    {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"},
    {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},

@@ -313,6 +323,14 @@ enum Histograms : uint32_t {
  BYTES_PER_READ,
  BYTES_PER_WRITE,
  BYTES_PER_MULTIGET,

  // number of bytes compressed/decompressed
  // number of bytes is when uncompressed; i.e. before/after respectively
  BYTES_COMPRESSED,
  BYTES_DECOMPRESSED,
  COMPRESSION_TIMES_NANOS,
  DECOMPRESSION_TIMES_NANOS,

  HISTOGRAM_ENUM_MAX,  // TODO(ldemailly): enforce HistogramsNameMap match
};

@@ -343,6 +361,10 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
    {BYTES_PER_READ, "rocksdb.bytes.per.read"},
    {BYTES_PER_WRITE, "rocksdb.bytes.per.write"},
    {BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"},
    {BYTES_COMPRESSED, "rocksdb.bytes.compressed"},
    {BYTES_DECOMPRESSED, "rocksdb.bytes.decompressed"},
    {COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"},
    {DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"},
};

struct HistogramData {

@@ -357,6 +379,9 @@ enum StatsLevel {
  // Collect all stats except the counters requiring to get time inside the
  // mutex lock.
  kExceptTimeForMutex,
  // Collect all stats except time inside mutex lock AND time spent on
  // compression.
  kExceptDetailedTimers,
  // Collect all stats, including measuring duration of mutex operations.
  // If getting time is expensive on the platform to run, it can
  // reduce scalability to more threads, especially for writes.

@@ -5,8 +5,8 @@
#pragma once

#define ROCKSDB_MAJOR 4
#define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 2

// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these
src.mk
@@ -215,6 +215,7 @@ MAIN_SOURCES = \
  db/db_compaction_filter_test.cc \
  db/db_compaction_test.cc \
  db/db_dynamic_level_test.cc \
  db/db_flush_test.cc \
  db/db_inplace_update_test.cc \
  db/db_iterator_test.cc \
  db/db_log_iter_test.cc \

@@ -274,6 +275,7 @@ MAIN_SOURCES = \
  util/env_test.cc \
  util/filelock_test.cc \
  util/histogram_test.cc \
  util/statistics_test.cc \
  utilities/backupable/backupable_db_test.cc \
  utilities/checkpoint/checkpoint_test.cc \
  utilities/document/document_db_test.cc \

@@ -651,11 +651,16 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
  auto type = r->compression_type;
  Slice block_contents;
  bool abort_compression = false;

  StopWatchNano timer(r->ioptions.env,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

  if (raw_block_contents.size() < kCompressionSizeLimit) {
    Slice compression_dict;
    if (is_data_block && r->compression_dict && r->compression_dict->size()) {
      compression_dict = *r->compression_dict;
    }

    block_contents = CompressBlock(raw_block_contents, r->compression_opts,
                                   &type, r->table_options.format_version,
                                   compression_dict, &r->compressed_output);

@@ -668,7 +673,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
      BlockContents contents;
      Status stat = UncompressBlockContentsForCompressionType(
          block_contents.data(), block_contents.size(), &contents,
          r->table_options.format_version, compression_dict, type,
          r->ioptions);
      if (stat.ok()) {
        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;

@@ -698,6 +704,15 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
    type = kNoCompression;
    block_contents = raw_block_contents;
  }
  else if (type != kNoCompression &&
           ShouldReportDetailedTime(r->ioptions.env,
                                    r->ioptions.statistics)) {
    MeasureTime(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
                timer.ElapsedNanos());
    MeasureTime(r->ioptions.statistics, BYTES_COMPRESSED,
                raw_block_contents.size());
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
  }

  WriteRawBlock(block_contents, type, handle);
  r->compressed_output.clear();

@@ -60,14 +60,13 @@ namespace {
// dictionary.
Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
                         const ReadOptions& options, const BlockHandle& handle,
                         std::unique_ptr<Block>* result,
                         const ImmutableCFOptions &ioptions,
                         bool do_uncompress, const Slice& compression_dict,
                         const PersistentCacheOptions& cache_options) {
  BlockContents contents;
  Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions,
                               do_uncompress, compression_dict, cache_options);
  if (s.ok()) {
    result->reset(new Block(std::move(contents)));
  }

@@ -177,19 +176,18 @@ class BinarySearchIndexReader : public IndexReader {
  // On success, index_reader will be populated; otherwise it will remain
  // unmodified.
  static Status Create(RandomAccessFileReader* file, const Footer& footer,
                       const BlockHandle& index_handle,
                       const ImmutableCFOptions &ioptions,
                       const Comparator* comparator, IndexReader** index_reader,
                       const PersistentCacheOptions& cache_options) {
    std::unique_ptr<Block> index_block;
    auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
                               &index_block, ioptions, true /* decompress */,
                               Slice() /*compression dict*/, cache_options);

    if (s.ok()) {
      *index_reader = new BinarySearchIndexReader(
          comparator, std::move(index_block), ioptions.statistics);
    }

    return s;

@@ -226,15 +224,15 @@ class HashIndexReader : public IndexReader {
 public:
  static Status Create(
      const SliceTransform* hash_key_extractor, const Footer& footer,
      RandomAccessFileReader* file, const ImmutableCFOptions &ioptions,
      const Comparator* comparator, const BlockHandle& index_handle,
      InternalIterator* meta_index_iter, IndexReader** index_reader,
      bool hash_index_allow_collision,
      const PersistentCacheOptions& cache_options) {
    std::unique_ptr<Block> index_block;
    auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
                               &index_block, ioptions, true /* decompress */,
                               Slice() /*compression dict*/, cache_options);

    if (!s.ok()) {
      return s;

@@ -245,7 +243,8 @@ class HashIndexReader : public IndexReader {
    // So, Create will succeed regardless, from this point on.
    auto new_index_reader =
        new HashIndexReader(comparator, std::move(index_block),
                            ioptions.statistics);
    *index_reader = new_index_reader;

    // Get prefixes block

@@ -269,14 +268,14 @@ class HashIndexReader : public IndexReader {
    // Read contents for the blocks
    BlockContents prefixes_contents;
    s = ReadBlockContents(file, footer, ReadOptions(), prefixes_handle,
                          &prefixes_contents, ioptions, true /* decompress */,
                          Slice() /*compression dict*/, cache_options);
    if (!s.ok()) {
      return s;
    }
    BlockContents prefixes_meta_contents;
    s = ReadBlockContents(file, footer, ReadOptions(), prefixes_meta_handle,
                          &prefixes_meta_contents, ioptions, true /* decompress */,
                          Slice() /*compression dict*/, cache_options);
    if (!s.ok()) {
      // TODO: log error

@@ -547,7 +546,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
      PersistentCacheOptions(rep->table_options.persistent_cache,
                             std::string(rep->persistent_cache_key_prefix,
                                         rep->persistent_cache_key_prefix_size),
                             rep->ioptions.statistics);

  // Read meta index
  std::unique_ptr<Block> meta;

@@ -585,8 +584,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
  TableProperties* table_properties = nullptr;
  if (s.ok()) {
    s = ReadProperties(meta_iter->value(), rep->file.get(), rep->footer,
                       rep->ioptions, &table_properties);
  }

  if (!s.ok()) {

@@ -613,7 +611,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
    // true.
    unique_ptr<BlockContents> compression_dict_block{new BlockContents()};
    s = rocksdb::ReadMetaBlock(rep->file.get(), file_size,
                               kBlockBasedTableMagicNumber, rep->ioptions,
                               rocksdb::kCompressionDictBlock,
                               compression_dict_block.get());
    if (!s.ok()) {

@@ -745,9 +743,9 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
  std::unique_ptr<Block> meta;
  Status s = ReadBlockFromFile(
      rep->file.get(), rep->footer, ReadOptions(),
      rep->footer.metaindex_handle(), &meta, rep->ioptions,
      true /* decompress */, Slice() /*compression dict*/,
      rep->persistent_cache_options);

  if (!s.ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log,

@@ -764,13 +762,14 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
Status BlockBasedTable::GetDataBlockFromCache(
    const Slice& block_cache_key, const Slice& compressed_block_cache_key,
    Cache* block_cache, Cache* block_cache_compressed,
    const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
    BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
    const Slice& compression_dict) {
  Status s;
  Block* compressed_block = nullptr;
  Cache::Handle* block_cache_compressed_handle = nullptr;
  Statistics* statistics = ioptions.statistics;

  // Lookup uncompressed cache first
  if (block_cache != nullptr) {

@@ -811,7 +810,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
      BlockContents contents;
      s = UncompressBlockContents(compressed_block->data(),
                                  compressed_block->size(), &contents,
                                  format_version, compression_dict,
                                  ioptions);

      // Insert uncompressed block into block cache
      if (s.ok()) {

@@ -840,7 +840,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
Status BlockBasedTable::PutDataBlockToCache(
    const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, Statistics* statistics, const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version, CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict) { const Slice& compression_dict) {
assert(raw_block->compression_type() == kNoCompression || assert(raw_block->compression_type() == kNoCompression ||
@ -849,9 +849,10 @@ Status BlockBasedTable::PutDataBlockToCache(
Status s; Status s;
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) { if (raw_block->compression_type() != kNoCompression) {
s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents, s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents,
format_version, compression_dict); format_version, compression_dict, ioptions);
} }
if (!s.ok()) { if (!s.ok()) {
delete raw_block; delete raw_block;
@ -913,7 +914,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) {
} }
BlockContents block; BlockContents block;
if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(), if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(),
rep->filter_handle, &block, rep->ioptions.env, rep->filter_handle, &block, rep->ioptions,
false /* decompress */, Slice() /*compression dict*/, false /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options) rep->persistent_cache_options)
.ok()) { .ok()) {
@ -1148,7 +1149,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
} }
s = GetDataBlockFromCache( s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, statistics, ro, &block, key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro, &block,
rep->table_options.format_version, compression_dict); rep->table_options.format_version, compression_dict);
if (block.value == nullptr && !no_io && ro.fill_cache) { if (block.value == nullptr && !no_io && ro.fill_cache) {
@ -1156,15 +1157,14 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
{ {
StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&raw_block, rep->ioptions.env, &raw_block, rep->ioptions,
block_cache_compressed == nullptr, block_cache_compressed == nullptr,
compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options);
rep->ioptions.info_log);
} }
if (s.ok()) { if (s.ok()) {
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed, s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
ro, statistics, &block, raw_block.release(), ro, rep->ioptions, &block, raw_block.release(),
rep->table_options.format_version, rep->table_options.format_version,
compression_dict); compression_dict);
} }
@ -1184,9 +1184,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
} }
std::unique_ptr<Block> block_value; std::unique_ptr<Block> block_value;
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&block_value, rep->ioptions.env, true /* compress */, &block_value, rep->ioptions, true /* compress */,
compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options);
rep->ioptions.info_log);
if (s.ok()) { if (s.ok()) {
block.value = block_value.release(); block.value = block_value.release();
} }
@ -1510,8 +1509,9 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
handle, cache_key_storage); handle, cache_key_storage);
Slice ckey; Slice ckey;
s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, nullptr, s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr,
options, &block, rep_->table_options.format_version, rep_->ioptions, options, &block,
rep_->table_options.format_version,
rep_->compression_dict_block rep_->compression_dict_block
? rep_->compression_dict_block->data ? rep_->compression_dict_block->data
: Slice()); : Slice());
@ -1544,11 +1544,8 @@ Status BlockBasedTable::CreateIndexReader(
} }
auto file = rep_->file.get(); auto file = rep_->file.get();
auto env = rep_->ioptions.env;
auto comparator = &rep_->internal_comparator; auto comparator = &rep_->internal_comparator;
const Footer& footer = rep_->footer; const Footer& footer = rep_->footer;
Statistics* stats = rep_->ioptions.statistics;
if (index_type_on_file == BlockBasedTableOptions::kHashSearch && if (index_type_on_file == BlockBasedTableOptions::kHashSearch &&
rep_->ioptions.prefix_extractor == nullptr) { rep_->ioptions.prefix_extractor == nullptr) {
Log(InfoLogLevel::WARN_LEVEL, rep_->ioptions.info_log, Log(InfoLogLevel::WARN_LEVEL, rep_->ioptions.info_log,
@ -1561,8 +1558,8 @@ Status BlockBasedTable::CreateIndexReader(
switch (index_type_on_file) { switch (index_type_on_file) {
case BlockBasedTableOptions::kBinarySearch: { case BlockBasedTableOptions::kBinarySearch: {
return BinarySearchIndexReader::Create( return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), env, comparator, index_reader, file, footer, footer.index_handle(), rep_->ioptions, comparator,
rep_->persistent_cache_options, stats); index_reader, rep_->persistent_cache_options);
} }
case BlockBasedTableOptions::kHashSearch: { case BlockBasedTableOptions::kHashSearch: {
std::unique_ptr<Block> meta_guard; std::unique_ptr<Block> meta_guard;
@ -1577,8 +1574,8 @@ Status BlockBasedTable::CreateIndexReader(
"Unable to read the metaindex block." "Unable to read the metaindex block."
" Fall back to binary search index."); " Fall back to binary search index.");
return BinarySearchIndexReader::Create( return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), env, comparator, file, footer, footer.index_handle(), rep_->ioptions, comparator,
index_reader, rep_->persistent_cache_options, stats); index_reader, rep_->persistent_cache_options);
} }
meta_index_iter = meta_iter_guard.get(); meta_index_iter = meta_iter_guard.get();
} }
@ -1588,10 +1585,9 @@ Status BlockBasedTable::CreateIndexReader(
rep_->internal_prefix_transform.reset( rep_->internal_prefix_transform.reset(
new InternalKeySliceTransform(rep_->ioptions.prefix_extractor)); new InternalKeySliceTransform(rep_->ioptions.prefix_extractor));
return HashIndexReader::Create( return HashIndexReader::Create(
rep_->internal_prefix_transform.get(), footer, file, env, comparator, rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions,
footer.index_handle(), meta_index_iter, index_reader, comparator, footer.index_handle(), meta_index_iter, index_reader,
rep_->hash_index_allow_collision, rep_->persistent_cache_options, rep_->hash_index_allow_collision, rep_->persistent_cache_options);
stats);
} }
default: { default: {
std::string error_message = std::string error_message =
@ -1711,7 +1707,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
BlockContents block; BlockContents block;
if (ReadBlockContents( if (ReadBlockContents(
rep_->file.get(), rep_->footer, ReadOptions(), handle, &block, rep_->file.get(), rep_->footer, ReadOptions(), handle, &block,
rep_->ioptions.env, false /*decompress*/, rep_->ioptions, false /*decompress*/,
Slice() /*compression dict*/, rep_->persistent_cache_options) Slice() /*compression dict*/, rep_->persistent_cache_options)
.ok()) { .ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader( rep_->filter.reset(new BlockBasedFilterBlockReader(


@@ -177,8 +177,8 @@ class BlockBasedTable : public TableReader {
   // dictionary.
   static Status GetDataBlockFromCache(
       const Slice& block_cache_key, const Slice& compressed_block_cache_key,
-      Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics,
-      const ReadOptions& read_options,
+      Cache* block_cache, Cache* block_cache_compressed,
+      const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
       BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
       const Slice& compression_dict);
@@ -195,7 +195,7 @@ class BlockBasedTable : public TableReader {
   static Status PutDataBlockToCache(
       const Slice& block_cache_key, const Slice& compressed_block_cache_key,
       Cache* block_cache, Cache* block_cache_compressed,
-      const ReadOptions& read_options, Statistics* statistics,
+      const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
       CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
       const Slice& compression_dict);


@@ -49,12 +49,16 @@ class CuckooBuilderTest : public testing::Test {
     uint64_t read_file_size;
     ASSERT_OK(env_->GetFileSize(fname, &read_file_size));
 
+    Options options;
+    options.allow_mmap_reads = true;
+    ImmutableCFOptions ioptions(options);
+
     // Assert Table Properties.
     TableProperties* props = nullptr;
     unique_ptr<RandomAccessFileReader> file_reader(
         new RandomAccessFileReader(std::move(read_file)));
     ASSERT_OK(ReadTableProperties(file_reader.get(), read_file_size,
-                                  kCuckooTableMagicNumber, env_, nullptr,
+                                  kCuckooTableMagicNumber, ioptions,
                                   &props));
     // Check unused bucket.
     std::string unused_key = props->user_collected_properties[


@@ -45,7 +45,7 @@ CuckooTableReader::CuckooTableReader(
   }
   TableProperties* props = nullptr;
   status_ = ReadTableProperties(file_.get(), file_size, kCuckooTableMagicNumber,
-                                ioptions.env, ioptions.info_log, &props);
+                                ioptions, &props);
   if (!status_.ok()) {
     return;
   }


@@ -23,6 +23,9 @@
 #include "util/perf_context_imp.h"
 #include "util/string_util.h"
 #include "util/xxhash.h"
+#include "util/statistics.h"
+#include "util/stop_watch.h"
 
 namespace rocksdb {
@@ -39,6 +42,11 @@ const uint64_t kPlainTableMagicNumber = 0;
 #endif
 const uint32_t DefaultStackBufferSize = 5000;
 
+bool ShouldReportDetailedTime(Env* env, Statistics* stats) {
+  return env != nullptr && stats != nullptr &&
+         stats->stats_level_ > kExceptDetailedTimers;
+}
+
 void BlockHandle::EncodeTo(std::string* dst) const {
   // Sanity check that all fields have been set
   assert(offset_ != ~static_cast<uint64_t>(0));
@@ -297,10 +305,10 @@ Status ReadBlock(RandomAccessFileReader* file, const Footer& footer,
 Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
                          const ReadOptions& read_options,
                          const BlockHandle& handle, BlockContents* contents,
-                         Env* env, bool decompression_requested,
+                         const ImmutableCFOptions &ioptions,
+                         bool decompression_requested,
                          const Slice& compression_dict,
-                         const PersistentCacheOptions& cache_options,
-                         Logger* info_log) {
+                         const PersistentCacheOptions& cache_options) {
   Status status;
   Slice slice;
   size_t n = static_cast<size_t>(handle.size());
@@ -318,9 +326,9 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
       return status;
     } else {
       // uncompressed page is not found
-      if (info_log && !status.IsNotFound()) {
+      if (ioptions.info_log && !status.IsNotFound()) {
         assert(!status.ok());
-        Log(InfoLogLevel::INFO_LEVEL, info_log,
+        Log(InfoLogLevel::INFO_LEVEL, ioptions.info_log,
             "Error reading from persistent cache. %s",
             status.ToString().c_str());
       }
@@ -341,9 +349,9 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
       used_buf = heap_buf.get();
       slice = Slice(heap_buf.get(), n);
     } else {
-      if (info_log && !status.IsNotFound()) {
+      if (ioptions.info_log && !status.IsNotFound()) {
        assert(!status.ok());
-        Log(InfoLogLevel::INFO_LEVEL, info_log,
+        Log(InfoLogLevel::INFO_LEVEL, ioptions.info_log,
            "Error reading from persistent cache. %s", status.ToString().c_str());
      }
      // cache miss read from device
@@ -378,7 +386,8 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
   if (decompression_requested && compression_type != kNoCompression) {
     // compressed page, uncompress, update cache
     status = UncompressBlockContents(slice.data(), n, contents,
-                                     footer.version(), compression_dict);
+                                     footer.version(), compression_dict,
+                                     ioptions);
   } else if (slice.data() != used_buf) {
     // the slice content is not the buffer provided
     *contents = BlockContents(Slice(slice.data(), n), false, compression_type);
@@ -405,11 +414,13 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
 Status UncompressBlockContentsForCompressionType(
     const char* data, size_t n, BlockContents* contents,
     uint32_t format_version, const Slice& compression_dict,
-    CompressionType compression_type) {
+    CompressionType compression_type, const ImmutableCFOptions &ioptions) {
   std::unique_ptr<char[]> ubuf;
   assert(compression_type != kNoCompression && "Invalid compression type");
 
+  StopWatchNano timer(ioptions.env,
+                      ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
   int decompress_size = 0;
   switch (compression_type) {
     case kSnappyCompression: {
@@ -501,6 +512,13 @@ Status UncompressBlockContentsForCompressionType(
       return Status::Corruption("bad block type");
   }
 
+  if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) {
+    MeasureTime(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
+                timer.ElapsedNanos());
+    MeasureTime(ioptions.statistics, BYTES_DECOMPRESSED, contents->data.size());
+    RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
+  }
+
   return Status::OK();
 }
 
@@ -513,11 +531,12 @@ Status UncompressBlockContentsForCompressionType(
 // format_version is the block format as defined in include/rocksdb/table.h
 Status UncompressBlockContents(const char* data, size_t n,
                                BlockContents* contents, uint32_t format_version,
-                               const Slice& compression_dict) {
+                               const Slice& compression_dict,
+                               const ImmutableCFOptions &ioptions) {
   assert(data[n] != kNoCompression);
   return UncompressBlockContentsForCompressionType(
       data, n, contents, format_version, compression_dict,
-      (CompressionType)data[n]);
+      (CompressionType)data[n], ioptions);
 }
 
 }  // namespace rocksdb


@@ -24,6 +24,8 @@ class Block;
 class RandomAccessFile;
 struct ReadOptions;
 
+extern bool ShouldReportDetailedTime(Env* env, Statistics* stats);
+
 // the length of the magic number in bytes.
 const int kMagicNumberLengthByte = 8;
@@ -212,10 +214,9 @@ struct BlockContents {
 extern Status ReadBlockContents(
     RandomAccessFileReader* file, const Footer& footer,
     const ReadOptions& options, const BlockHandle& handle,
-    BlockContents* contents, Env* env, bool do_uncompress = true,
-    const Slice& compression_dict = Slice(),
-    const PersistentCacheOptions& cache_options = PersistentCacheOptions(),
-    Logger* info_log = nullptr);
+    BlockContents* contents, const ImmutableCFOptions &ioptions,
+    bool do_uncompress = true, const Slice& compression_dict = Slice(),
+    const PersistentCacheOptions& cache_options = PersistentCacheOptions());
 
 // The 'data' points to the raw block contents read in from file.
 // This method allocates a new heap buffer and the raw block
@@ -227,7 +228,8 @@ extern Status ReadBlockContents(
 extern Status UncompressBlockContents(const char* data, size_t n,
                                       BlockContents* contents,
                                       uint32_t compress_format_version,
-                                      const Slice& compression_dict);
+                                      const Slice& compression_dict,
+                                      const ImmutableCFOptions &ioptions);
 
 // This is an extension to UncompressBlockContents that accepts
 // a specific compression type. This is used by un-wrapped blocks
@@ -235,7 +237,7 @@ extern Status UncompressBlockContents(const char* data, size_t n,
 extern Status UncompressBlockContentsForCompressionType(
     const char* data, size_t n, BlockContents* contents,
     uint32_t compress_format_version, const Slice& compression_dict,
-    CompressionType compression_type);
+    CompressionType compression_type, const ImmutableCFOptions &ioptions);
 
 // Implementation details follow.  Clients should ignore,
// Implementation details follow. Clients should ignore, // Implementation details follow. Clients should ignore,


@@ -150,7 +150,7 @@ bool NotifyCollectTableCollectorsOnFinish(
 }
 
 Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
-                      const Footer& footer, Env* env, Logger* logger,
+                      const Footer& footer, const ImmutableCFOptions &ioptions,
                       TableProperties** table_properties) {
   assert(table_properties);
@@ -165,7 +165,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
   read_options.verify_checksums = false;
   Status s;
   s = ReadBlockContents(file, footer, read_options, handle, &block_contents,
-                        env, false /* decompress */);
+                        ioptions, false /* decompress */);
 
   if (!s.ok()) {
     return s;
@@ -219,7 +219,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
         auto error_msg =
             "Detect malformed value in properties meta-block:"
             "\tkey: " + key + "\tval: " + raw_val.ToString();
-        Log(InfoLogLevel::ERROR_LEVEL, logger, "%s", error_msg.c_str());
+        Log(InfoLogLevel::ERROR_LEVEL, ioptions.info_log, "%s",
+            error_msg.c_str());
         continue;
       }
       *(pos->second) = val;
@@ -251,8 +252,9 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
 }
 
 Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
-                           uint64_t table_magic_number, Env* env,
-                           Logger* info_log, TableProperties** properties) {
+                           uint64_t table_magic_number,
+                           const ImmutableCFOptions &ioptions,
+                           TableProperties** properties) {
   // -- Read metaindex block
   Footer footer;
   auto s = ReadFooterFromFile(file, file_size, &footer, table_magic_number);
@@ -265,7 +267,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
   ReadOptions read_options;
   read_options.verify_checksums = false;
   s = ReadBlockContents(file, footer, read_options, metaindex_handle,
-                        &metaindex_contents, env, false /* decompress */);
+                        &metaindex_contents, ioptions, false /* decompress */);
   if (!s.ok()) {
     return s;
   }
@@ -282,8 +284,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
   TableProperties table_properties;
   if (found_properties_block == true) {
-    s = ReadProperties(meta_iter->value(), file, footer, env, info_log,
-                       properties);
+    s = ReadProperties(meta_iter->value(), file, footer, ioptions, properties);
   } else {
     s = Status::NotFound();
   }
@@ -305,7 +306,8 @@ Status FindMetaBlock(InternalIterator* meta_index_iter,
 }
 
 Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
-                     uint64_t table_magic_number, Env* env,
+                     uint64_t table_magic_number,
+                     const ImmutableCFOptions &ioptions,
                      const std::string& meta_block_name,
                      BlockHandle* block_handle) {
   Footer footer;
@@ -319,7 +321,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
   ReadOptions read_options;
   read_options.verify_checksums = false;
   s = ReadBlockContents(file, footer, read_options, metaindex_handle,
-                        &metaindex_contents, env, false /* do decompression */);
+                        &metaindex_contents, ioptions, false /* do decompression */);
   if (!s.ok()) {
     return s;
   }
@@ -332,7 +334,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
 }
 
 Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
-                     uint64_t table_magic_number, Env* env,
+                     uint64_t table_magic_number,
+                     const ImmutableCFOptions &ioptions,
                      const std::string& meta_block_name,
                      BlockContents* contents) {
   Status status;
@@ -348,7 +351,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
   ReadOptions read_options;
   read_options.verify_checksums = false;
   status = ReadBlockContents(file, footer, read_options, metaindex_handle,
-                             &metaindex_contents, env, false /* decompress */);
+                             &metaindex_contents, ioptions,
+                             false /* decompress */);
   if (!status.ok()) {
     return status;
   }
@@ -368,7 +372,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
   // Reading metablock
   return ReadBlockContents(file, footer, read_options, block_handle, contents,
-                           env, false /* decompress */);
+                           ioptions, false /* decompress */);
 }
 
 }  // namespace rocksdb


@@ -94,7 +94,7 @@ bool NotifyCollectTableCollectorsOnFinish(
 // *table_properties will point to a heap-allocated TableProperties
 // object, otherwise value of `table_properties` will not be modified.
 Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
-                      const Footer& footer, Env* env, Logger* logger,
+                      const Footer& footer, const ImmutableCFOptions &ioptions,
                       TableProperties** table_properties);
 
 // Directly read the properties from the properties block of a plain table.
@@ -102,8 +102,9 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
 // *table_properties will point to a heap-allocated TableProperties
 // object, otherwise value of `table_properties` will not be modified.
 Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
-                           uint64_t table_magic_number, Env* env,
-                           Logger* info_log, TableProperties** properties);
+                           uint64_t table_magic_number,
+                           const ImmutableCFOptions &ioptions,
+                           TableProperties** properties);
 
 // Find the meta block from the meta index block.
 Status FindMetaBlock(InternalIterator* meta_index_iter,
@@ -112,7 +113,8 @@ Status FindMetaBlock(InternalIterator* meta_index_iter,
 // Find the meta block
 Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
-                     uint64_t table_magic_number, Env* env,
+                     uint64_t table_magic_number,
+                     const ImmutableCFOptions &ioptions,
                      const std::string& meta_block_name,
                      BlockHandle* block_handle);
@@ -120,7 +122,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
 // from `file` and initialize `contents` with contents of this block.
 // Return Status::OK in case of success.
 Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
-                     uint64_t table_magic_number, Env* env,
+                     uint64_t table_magic_number,
+                     const ImmutableCFOptions &ioptions,
                      const std::string& meta_block_name,
                      BlockContents* contents);


@@ -128,7 +128,7 @@ Status PlainTableReader::Open(const ImmutableCFOptions& ioptions,
   TableProperties* props = nullptr;
   auto s = ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber,
-                               ioptions.env, ioptions.info_log, &props);
+                               ioptions, &props);
   if (!s.ok()) {
     return s;
   }
@@ -293,13 +293,13 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
   BlockContents bloom_block_contents;
   auto s = ReadMetaBlock(file_info_.file.get(), file_size_,
-                         kPlainTableMagicNumber, ioptions_.env,
+                         kPlainTableMagicNumber, ioptions_,
                          BloomBlockBuilder::kBloomBlock, &bloom_block_contents);
   bool index_in_file = s.ok();
 
   BlockContents index_block_contents;
   s = ReadMetaBlock(
-      file_info_.file.get(), file_size_, kPlainTableMagicNumber, ioptions_.env,
+      file_info_.file.get(), file_size_, kPlainTableMagicNumber, ioptions_,
       PlainTableIndexBuilder::kPlainTableIndexBlock, &index_block_contents);
 
   index_in_file &= s.ok();


@@ -2064,7 +2064,7 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
   TableProperties* props = nullptr;
   auto s = ReadTableProperties(file_reader.get(), ss->contents().size(),
-                               kPlainTableMagicNumber, Env::Default(), nullptr,
+                               kPlainTableMagicNumber, ioptions,
                                &props);
   std::unique_ptr<TableProperties> props_guard(props);
   ASSERT_OK(s);


@@ -219,8 +219,7 @@ Status SstFileReader::ReadTableProperties(uint64_t table_magic_number,
                                           uint64_t file_size) {
   TableProperties* table_properties = nullptr;
   Status s = rocksdb::ReadTableProperties(file, file_size, table_magic_number,
-                                          options_.env, options_.info_log.get(),
-                                          &table_properties);
+                                          ioptions_, &table_properties);
   if (s.ok()) {
     table_properties_.reset(table_properties);
   } else {


@@ -154,7 +154,8 @@ inline void EncodeFixed64(char* buf, uint64_t value) {
 // Pull the last 8 bits and cast it to a character
 inline void PutFixed32(std::string* dst, uint32_t value) {
 #if __BYTE_ORDER__ == __LITTLE_ENDIAN__
-  dst->append(static_cast<const char*>(&value), sizeof(value));
+  dst->append(const_cast<const char*>(reinterpret_cast<char*>(&value)),
+              sizeof(value));
 #else
   char buf[sizeof(value)];
   EncodeFixed32(buf, value);
@@ -164,7 +165,8 @@ inline void PutFixed32(std::string* dst, uint32_t value) {
 inline void PutFixed64(std::string* dst, uint64_t value) {
 #if __BYTE_ORDER__ == __LITTLE_ENDIAN__
-  dst->append(static_const<const char*>(&value), sizeof(value));
+  dst->append(const_cast<const char*>(reinterpret_cast<char*>(&value)),
+              sizeof(value));
 #else
   char buf[sizeof(value)];
   EncodeFixed64(buf, value);

util/statistics_test.cc (new file, 35 lines)

@@ -0,0 +1,35 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//

#include "port/stack_trace.h"
#include "util/testharness.h"
#include "util/testutil.h"

#include "rocksdb/statistics.h"

namespace rocksdb {

class StatisticsTest : public testing::Test {};

// Sanity check to make sure that contents and order of TickersNameMap
// match Tickers enum
TEST_F(StatisticsTest, Sanity) {
  EXPECT_EQ(static_cast<size_t>(Tickers::TICKER_ENUM_MAX),
            TickersNameMap.size());

  for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) {
    auto pair = TickersNameMap[static_cast<size_t>(t)];
    ASSERT_EQ(pair.first, t) << "Miss match at " << pair.second;
  }
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}


@@ -6,6 +6,7 @@
 #include <assert.h>
 #include <condition_variable>
+#include <functional>
 #include <mutex>
 #include <string>
 #include <thread>


@@ -10,6 +10,7 @@
 #pragma once
 #include <atomic>
+#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <vector>


@@ -7,6 +7,7 @@
 #ifndef ROCKSDB_LITE
+#include <functional>
 #include "util/random.h"
 #include "utilities/persistent_cache/hash_table.h"
 #include "utilities/persistent_cache/lrulist.h"


@@ -5,6 +5,7 @@
 #include "rocksdb/utilities/sim_cache.h"
 #include <atomic>
+#include "port/port.h"
 namespace rocksdb {