Compare commits

18 Commits

Author SHA1 Message Date
sdong
d903241757 Disable error as warning 2019-11-05 11:04:45 -08:00
sdong
01475b49ee Add one more #include<functional> 2019-11-05 11:04:44 -08:00
sdong
de3fdee3ce Add some include<functional> 2019-10-31 14:24:56 -07:00
sdong
6afcb0d933 [FB Internal] Point to the latest tool chain. 2019-10-31 14:24:29 -07:00
sdong
d6f3ff3629 [fb only] revert unintended change of USE_SSE
The previous change that switched to gcc-5 set USE_SSE to the wrong flag by mistake. Fix it.
2017-07-17 22:22:40 -07:00
sdong
4e0ea33c3c [FB Only] use gcc-5 2017-07-17 21:55:40 -07:00
sdong
58d8de115f Upgrade to version 4.10.2 2016-09-07 11:03:54 -07:00
sdong
95ef90988b Ignore stale logs while restarting DBs
Summary:
Stale log files can be deleted out of order. This can happen for various reasons. One reason is that no data is ever inserted into a column family and we have an optimization to update its log number, but not all the old log files are cleaned up (the case shown in the unit tests added). It can also happen when we simply delete multiple log files out of order.

This causes data corruption because we simply increase seqID after processing the next row, so we may end up writing data with a smaller seqID than what is already flushed to memtables.

In DB recovery, if the oldest file we are replaying contains no data for any column family, we ignore the sequence IDs in that file.

Test Plan: Add two unit tests that fail without the fix.

Reviewers: IslamAbdelRahman, igor, yiwu

Reviewed By: yiwu

Subscribers: hermanlee4, yoshinorim, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60891
2016-09-07 10:57:32 -07:00
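
The recovery rule described in the commit above can be sketched as a small, simplified model. This is not the RocksDB implementation: `SketchBatch`, `kNoSequence`, and `ReplayForNextSequence` are hypothetical names, and the model assumes every batch advances the sequence counter by its entry count, whether or not its entries are applied.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical sentinel meaning "no sequence number seen yet".
constexpr uint64_t kNoSequence = std::numeric_limits<uint64_t>::max();

// Hypothetical, simplified view of one write batch read from a log file.
struct SketchBatch {
  uint64_t sequence;      // sequence number stored in the batch
  uint32_t count;         // number of entries in the batch
  bool has_valid_writes;  // false if no column family accepted any entry
};

// Replays batches in log order and returns the next sequence number to use.
uint64_t ReplayForNextSequence(const std::vector<SketchBatch>& batches) {
  uint64_t next_sequence = kNoSequence;
  for (const SketchBatch& b : batches) {
    const bool no_prev_seq = (next_sequence == kNoSequence);
    if (no_prev_seq) {
      // First usable batch: trust the sequence number stored in the log.
      next_sequence = b.sequence;
    }
    // Otherwise the batch would be re-stamped with next_sequence.
    next_sequence += b.count;
    if (no_prev_seq && !b.has_valid_writes) {
      // Possibly a stale log: forget its sequence numbers entirely.
      next_sequence = kNoSequence;
    }
  }
  return next_sequence;
}
```

In this model, a stale batch at seqID 1 with no valid writes followed by a live batch at seqIDs 4 and 5 leaves the counter at 6. Without the reset, the live batch would be re-stamped starting at 2, below data that was already flushed.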
krad
1b3810391e Update version to 4.10.1 2016-08-26 14:53:34 -07:00
krad
4f1f992b67 Revert "Update version to 4.10.1"
This reverts commit 89b20d3cce.
2016-08-26 14:51:39 -07:00
krad
89b20d3cce Update version to 4.10.1 2016-08-26 14:46:47 -07:00
sdong
a31b8cb7a7 Mitigate regression bug of options.max_successive_merges hit during DB Recovery
Summary:
After 1b8a2e8fdd, the DB pointer is passed to WriteBatchInternal::InsertInto() during DB recovery. This can cause a deadlock if options.max_successive_merges is hit. In that case DB::Get() will be called. Get() will try to acquire the DB mutex, which is already held by DB::Open(), causing a deadlock.

This commit mitigates the problem by not passing the DB pointer unless 2PC is allowed.

Test Plan: Add a new test and run it.

Reviewers: IslamAbdelRahman, andrewkr, kradhakrishnan, horuff

Reviewed By: kradhakrishnan

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D62625
2016-08-25 17:35:38 -07:00
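
The mitigation in the commit above can be illustrated with a toy model. Everything here is an assumption made for illustration: `SketchDb`, `DbForRecoveryInsert`, and `MergeWouldDeadlock` are hypothetical, and the mutex is modeled as a plain flag so the example terminates instead of actually deadlocking.

```cpp
#include <cassert>

// Hypothetical stand-in for the DB object; the mutex is modeled as a flag.
struct SketchDb {
  bool mutex_held = false;
  // Models Get(): it needs the DB mutex, so it cannot make progress while
  // the caller already holds that mutex.
  bool GetWouldBlock() const { return mutex_held; }
};

// Mirrors the mitigation: hand the DB to the inserter only when 2PC needs it.
SketchDb* DbForRecoveryInsert(SketchDb* db, bool allow_2pc) {
  return allow_2pc ? db : nullptr;
}

// Models the merge path: once the successive-merge limit is hit and a DB
// pointer is available, the existing value is read through the DB.
// Returns true if that read would block forever.
bool MergeWouldDeadlock(const SketchDb* db, int successive_merges,
                        int max_successive_merges) {
  const bool limit_hit = max_successive_merges > 0 &&
                         successive_merges >= max_successive_merges;
  return limit_hit && db != nullptr && db->GetWouldBlock();
}
```

With the mutex held, as it is during open, passing `nullptr` avoids the blocking read. The `allow_2pc` case still passes the DB pointer, which matches the commit's wording that this is a mitigation rather than a full fix.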
John Alexander
2701f5c267 Remove %z Format Specifier and Fix Windows Build of sim_cache.cc (#1224)
* Replace %zu format specifier with Windows-compatible macro 'ROCKSDB_PRIszt'

* Added "port/port.h" include to sim_cache.cc for call to snprintf().

* Applied cleaner fix to windows build, reverting part of 7bedd94
2016-07-28 16:07:21 -07:00
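
The format-specifier change above follows a common portability pattern: hide the `size_t` length modifier behind a macro. The sketch below uses a hypothetical macro `SKETCH_PRIszt` and a hypothetical helper `FormatKeyCount`; it assumes that older Windows C runtimes want `Iu` where other platforms accept `zu`, and it is not the definition used in `port/port.h`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <string>

// Hypothetical macro for illustration; the real one lives in port/port.h.
#if defined(_WIN32)
#define SKETCH_PRIszt "Iu"
#else
#define SKETCH_PRIszt "zu"
#endif

// Formats a size_t without hard-coding "%zu" in the format string.
std::string FormatKeyCount(std::size_t n) {
  char buf[64];
  std::snprintf(buf, sizeof(buf), "Testing seek on %" SKETCH_PRIszt " keys", n);
  return std::string(buf);
}
```

Adjacent string literals are concatenated by the compiler, so the call site keeps a single format string on every platform.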
John Alexander
fdc0675f5c New Statistics to track Compression/Decompression (#1197)
* Added new statistics and refactored to allow ioptions to be passed around as required to access environment and statistics pointers (and, as a convenient side effect, info_log pointer).

* Prevent incrementing compression counter when compression is turned off in options.

* Added two more supported compression types to test code in db_test.cc

* Added new StatsLevel that excludes compression timing.

* Fixed casting error in coding.h

* Fixed CompressionStatsTest for new StatsLevel.

* Removed unused variable that was breaking the Linux build
2016-07-28 16:06:47 -07:00
Islam AbdelRahman
605cd9bfcb Fix Statistics TickersNameMap mismatch with Tickers enum
Summary:
TickersNameMap is not consistent with the Tickers enum.
This causes us to report wrong statistics and sometimes to access TickersNameMap outside its bounds, causing crashes (in Fb303 statistics).

Test Plan: added new unit test

Reviewers: sdong, kradhakrishnan

Reviewed By: kradhakrishnan

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61083
2016-07-26 13:42:44 -07:00
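
One way to guard against the kind of mismatch described above is a consistency check between the enum and its name table. The sketch below is an assumption about how such a check could look, not the unit test added by the commit; `SketchTicker`, its sentinel, and `NameMapMatchesEnum` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, shortened ticker enum with a trailing sentinel.
enum SketchTicker : uint32_t {
  NUMBER_BLOCK_COMPRESSED = 0,
  NUMBER_BLOCK_DECOMPRESSED,
  SKETCH_TICKER_ENUM_MAX
};

using SketchNameMap = std::vector<std::pair<SketchTicker, std::string>>;

// Returns true only if the map has exactly one entry per ticker and entry i
// describes ticker i, so indexing the map by ticker value stays in bounds.
bool NameMapMatchesEnum(const SketchNameMap& names) {
  if (names.size() != static_cast<std::size_t>(SKETCH_TICKER_ENUM_MAX)) {
    return false;
  }
  for (std::size_t i = 0; i < names.size(); ++i) {
    if (static_cast<std::size_t>(names[i].first) != i) {
      return false;
    }
  }
  return true;
}
```

A check like this fails when an enum value is added without a matching name entry, or when entries are listed in a different order than the enum.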
Yi Wu
c7eb50b132 Fix unit test which breaks lite build
Summary: Comment out assertion of number of table files from lite build.

Test Plan:
    OPT=-DROCKSDB_LITE make check

Reviewers: lightmark

Reviewed By: lightmark

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60999
2016-07-21 14:32:49 -07:00
Yi Wu
2462cebfe8 Fix flush not being committed while writing manifest
Summary:
Fix flush not being committed while writing manifest, which is a recent bug introduced by D60075.

The issue:
# Options.max_background_flushes > 1
# Background thread A picks up a flush job, flushes, then commits to manifest. (Note that the mutex is released before writing the manifest.)
# Background thread B picks up another flush job and flushes. When it gets to `MemTableList::InstallMemtableFlushResults`, it notices another thread is committing, so it quits.
# After the first commit, thread A doesn't double-check whether there are more flush results that need to be committed, leaving the second flush uncommitted.

Test Plan: run the test. Also verify the new test hits a deadlock without the fix.

Reviewers: sdong, igor, lightmark

Reviewed By: lightmark

Subscribers: andrewkr, omegaga, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60969
2016-07-21 13:56:22 -07:00
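
The race described in the commit above, and the retry loop that fixes it, can be sketched in a single-threaded model. This is a simplification under stated assumptions: `SketchMemTable` and `InstallFlushResults` are hypothetical, the oldest memtable sits at the front of the list (the real list keeps it at the back), and the `write_manifest` callback stands in for the window where the mutex is released and another thread's flush can complete.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>

// Hypothetical, simplified immutable memtable record.
struct SketchMemTable {
  int id;
  bool flush_completed;
};

// Commits completed flushes, oldest first, and returns how many were
// committed. With retry == false the function commits one batch and stops,
// which models the behavior before the fix.
int InstallFlushResults(std::deque<SketchMemTable>& memlist,
                        const std::function<void()>& write_manifest,
                        bool retry) {
  int committed = 0;
  do {
    // Count the leading run of memtables whose flush has finished.
    std::size_t batch = 0;
    for (const SketchMemTable& m : memlist) {
      if (!m.flush_completed) break;
      ++batch;
    }
    if (batch == 0) break;
    // Other flushes may complete while the manifest is being written.
    write_manifest();
    memlist.erase(memlist.begin(),
                  memlist.begin() + static_cast<std::ptrdiff_t>(batch));
    committed += static_cast<int>(batch);
  } while (retry);
  return committed;
}
```

If the callback marks a second memtable as flushed during the first manifest write, the retrying version commits both, while the single-pass version leaves the second one behind.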
krad
becc230aa9 Fix version number 2016-07-21 10:59:26 -07:00
41 changed files with 689 additions and 231 deletions

2
.gitignore vendored
@ -64,3 +64,5 @@ java/javadoc
scan_build_report/
t
LOG
db_logs/

@ -356,6 +356,7 @@ set(TESTS
db/db_test2.cc
db/db_block_cache_test.cc
db/db_bloom_filter_test.cc
db/db_flush_test.cc
db/db_iterator_test.cc
db/db_sst_test.cc
db/db_universal_compaction_test.cc
@ -420,6 +421,7 @@ set(TESTS
util/options_test.cc
util/rate_limiter_test.cc
util/slice_transform_test.cc
util/statistics_test.cc
util/thread_list_test.cc
util/thread_local_test.cc
utilities/backupable/backupable_db_test.cc

@ -1,5 +1,12 @@
# Rocksdb Change Log
## Unreleased
## 4.10.2
### Bug Fixes
* backport the bug fix of the regression data corruption after DB recovery when stale WAL file deletion is reordered.
## 4.10.1
### Bug Fixes
* Fix the regression deadlock bug of DB recovery if options.max_successive_merges hits.
## 4.10.0
### Public API Change
* options.memtable_prefix_bloom_bits changes to options.memtable_prefix_bloom_bits_ratio and deprecate options.memtable_prefix_bloom_probes
* enum type CompressionType and PerfLevel changes from char to unsigned char. Value of all PerfLevel shift by one.

@ -212,10 +212,6 @@ default: all
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
-Wno-unused-parameter
ifndef DISABLE_WARNING_AS_ERROR
WARNING_FLAGS += -Werror
endif
CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers
@ -274,8 +270,10 @@ TESTS = \
db_compaction_filter_test \
db_compaction_test \
db_dynamic_level_test \
db_flush_test \
db_inplace_update_test \
db_iterator_test \
db_options_test \
db_sst_test \
db_tailing_iter_test \
db_universal_compaction_test \
@ -372,6 +370,7 @@ TESTS = \
ldb_cmd_test \
iostats_context_test \
persistent_cache_test \
statistics_test \
PARALLEL_TEST = \
backupable_db_test \
@ -920,6 +919,9 @@ db_compaction_test: db/db_compaction_test.o db/db_test_util.o $(LIBOBJECTS) $(TE
db_dynamic_level_test: db/db_dynamic_level_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_flush_test: db/db_flush_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
@ -1199,6 +1201,9 @@ iostats_context_test: util/iostats_context_test.o $(LIBOBJECTS) $(TESTHARNESS)
persistent_cache_test: utilities/persistent_cache/persistent_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
statistics_test: util/statistics_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local

@ -52,12 +52,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
FBCODE_BUILD="true"
# If we're compiling with TSAN we need pic build
PIC_BUILD=$COMPILE_WITH_TSAN
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
source "$PWD/build_tools/fbcode_config.sh"
else
# we need this to build with MySQL. Don't use for other purposes.
source "$PWD/build_tools/fbcode_config4.8.1.sh"
fi
source "$PWD/build_tools/fbcode_config.sh"
fi
# Delete existing output, if it exists

@ -1,16 +1,19 @@
GCC_BASE=/mnt/gvfs/third-party2/gcc/ebc96bc2fb751b5a0300b8d91a95bdf24ac1d88b/4.9.x/centos6-native/108cf83
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/b91de48a4974ec839946d824402b098d43454cef/stable/centos6-native/7aaccbe
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/53e0eac8911888a105aa98b9a35fe61cf1d8b278/4.9.x/gcc-4.9-glibc-2.20/024dbc3
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/ee36ac9a72dfac4a995f1b215bb4c0fc8a0f6f91/2.20/gcc-4.9-glibc-2.20/500e281
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2b24f1e99454f9ca7b0301720f94836dae1bf71b/1.2.8/gcc-5-glibc-2.23/9bc6787
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/af7c14c9b652cdd5ec34eadd25c3f38a9b306c5d/1.0.6/gcc-5-glibc-2.23/9bc6787
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/1408484d03b15492aa54b10356104e9dc22e1cc5/0.6.1/gcc-5-glibc-2.23/9bc6787
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/5a5c7a6608cb32f1e1e7f814023d5bdfbd136370/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/a4c2adecffcaa68d5585d06be2252e3efa52555b/master/gcc-5-glibc-2.23/1c32b4b
NUMA_BASE=/mnt/gvfs/third-party2/numa/1abc0d3c01743b854676423cf2d3629912f34930/2.0.8/gcc-5-glibc-2.23/9bc6787
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/f24be37d170e04be6e469af487644d4d62e1c6c1/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/9d65c666b9adf8f2a989fd4b98a9a5e7d3afa233/2.26/centos6-native/da39a3e
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/9cee5a3628dc9d4b93897972c58eba865e25b270/3.10.0/gcc-4.9-glibc-2.20/e9936bf
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832

@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
CFLAGS=""
# libgcc
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
# glibc
@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
CFLAGS+=" -DLZ4"
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
CFLAGS+=" -DZSTD"
fi
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
if test -z $PIC_BUILD; then
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
else
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
fi
CFLAGS+=" -DZSTD"
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
if test -z $PIC_BUILD; then
@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
else
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
fi
CFLAGS+=" -DGFLAGS=google"
CFLAGS+=" -DGFLAGS=gflags"
# location of jemalloc
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
@ -72,13 +76,22 @@ if test -z $PIC_BUILD; then
LIBUNWIND="$LIBUNWIND_BASE/lib/libunwind.a"
fi
# location of TBB
TBB_INCLUDE=" -isystem $TBB_BASE/include/"
if test -z $PIC_BUILD; then
TBB_LIBS="$TBB_BASE/lib/libtbb.a"
else
TBB_LIBS="$TBB_BASE/lib/libtbb_pic.a"
fi
CFLAGS+=" -DTBB"
# use Intel SSE support for checksum calculations
export USE_SSE=1
BINUTILS="$BINUTILS_BASE/bin"
AR="$BINUTILS/ar"
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE"
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
STDLIBS="-L $GCC_BASE/lib64"
@ -95,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
CXX="$GCC_BASE/bin/g++"
CFLAGS+=" -B$BINUTILS/gold"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $GLIBC_INCLUDE"
JEMALLOC=1
else
# clang
@ -107,8 +120,8 @@ else
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
CFLAGS+=" -isystem $GLIBC_INCLUDE"
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
CFLAGS+=" -isystem $CLANG_INCLUDE"
@ -119,18 +132,21 @@ else
fi
CFLAGS+=" $DEPS_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
CXXFLAGS+=" $CFLAGS"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
EXEC_LDFLAGS+=" $LIBUNWIND"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
# required by libtbb
EXEC_LDFLAGS+=" -ldl"
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS"
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
VALGRIND_VER="$VALGRIND_BASE/bin/"
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD

@ -18,6 +18,7 @@
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "util/coding.h"
#include "util/fault_injection_test_env.h"
#include "util/options_parser.h"
#include "util/string_util.h"
#include "util/sync_point.h"
@ -500,6 +501,135 @@ TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) {
}
}
class FlushEmptyCFTestWithParam : public ColumnFamilyTest,
public testing::WithParamInterface<bool> {
public:
FlushEmptyCFTestWithParam() { allow_2pc_ = GetParam(); }
// Required if inheriting from testing::WithParamInterface<>
static void SetUpTestCase() {}
static void TearDownTestCase() {}
bool allow_2pc_;
};
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
db_options_.env = fault_env.get();
db_options_.allow_2pc = allow_2pc_;
Open();
CreateColumnFamilies({"one", "two"});
// Generate log file A.
ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
Reopen();
// Log file A is not dropped after reopening because default column family's
// min log number is 0.
// It flushes to SST file X
ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
// Current log file is file B now. While flushing, a new log file C is created
// and is set to current. Boths' min log number is set to file C in memory, so
// after flushing file B is deleted. At the same time, the min log number of
// default CF is not written to manifest. Log file A still remains.
// Flushed to SST file Y.
Flush(1);
Flush(0);
ASSERT_OK(Put(1, "bar", "v3")); // seqID 4
ASSERT_OK(Put(1, "foo", "v4")); // seqID 5
// Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false);
std::vector<std::string> names;
for (auto name : names_) {
if (name != "") {
names.push_back(name);
}
}
Close();
fault_env->ResetState();
// Before opening, there are four files:
// Log file A contains seqID 1
// Log file C contains seqID 4, 5
// SST file X contains seqID 1
// SST file Y contains seqID 2, 3
// Min log number:
// default CF: 0
// CF one, two: C
// When opening the DB, all the seqID should be preserved.
Open(names, {});
ASSERT_EQ("v4", Get(1, "foo"));
ASSERT_EQ("v3", Get(1, "bar"));
Close();
db_options_.env = env_;
}
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
db_options_.env = fault_env.get();
db_options_.allow_2pc = allow_2pc_;
Open();
CreateColumnFamilies({"one", "two"});
// Generate log file A.
ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
Reopen();
// Log file A is not dropped after reopening because default column family's
// min log number is 0.
// It flushes to SST file X
ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
// Current log file is file B now. While flushing, a new log file C is created
// and is set to current. Both CFs' min log number is set to file C so after
// flushing file B is deleted. Log file A still remains.
// Flushed to SST file Y.
Flush(1);
ASSERT_OK(Put(0, "bar", "v2")); // seqID 4
ASSERT_OK(Put(2, "bar", "v2")); // seqID 5
ASSERT_OK(Put(1, "bar", "v3")); // seqID 6
// Flushing all column families. This forces all CFs' min log to current. This
// is written to the manifest file. Log file C is cleared.
Flush(0);
Flush(1);
Flush(2);
// Write to log file D
ASSERT_OK(Put(1, "bar", "v4")); // seqID 7
ASSERT_OK(Put(1, "bar", "v5")); // seqID 8
// Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false);
std::vector<std::string> names;
for (auto name : names_) {
if (name != "") {
names.push_back(name);
}
}
Close();
fault_env->ResetState();
// Before opening, there are two logfiles:
// Log file A contains seqID 1
// Log file D contains seqID 7, 8
// Min log number:
// default CF: D
// CF one, two: D
// When opening the DB, log file D should be replayed using the seqID
// specified in the file.
Open(names, {});
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("v5", Get(1, "bar"));
Close();
db_options_.env = env_;
}
INSTANTIATE_TEST_CASE_P(FlushEmptyCFTestWithParam, FlushEmptyCFTestWithParam,
::testing::Bool());
TEST_F(ColumnFamilyTest, AddDrop) {
Open();
CreateColumnFamilies({"one", "two", "three"});

56
db/db_flush_test.cc Normal file
@ -0,0 +1,56 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/sync_point.h"
namespace rocksdb {
class DBFlushTest : public DBTestBase {
public:
DBFlushTest() : DBTestBase("/db_flush_test") {}
};
// We had issue when two background threads trying to flush at the same time,
// only one of them get committed. The test verifies the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
Options options;
options.disable_auto_compactions = true;
options.max_background_flushes = 2;
Reopen(options);
FlushOptions no_wait;
no_wait.wait = false;
SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply:WriteManifest",
"DBFlushTest::FlushWhileWritingManifest:1"},
{"MemTableList::InstallMemtableFlushResults:InProgress",
"VersionSet::LogAndApply:WriteManifestDone"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
ASSERT_OK(Put("bar", "v"));
ASSERT_OK(dbfull()->Flush(no_wait));
// If the issue is hit we will wait here forever.
dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
ASSERT_EQ(2, TotalTableFiles());
#endif // ROCKSDB_LITE
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -1478,9 +1478,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
recovered_sequence = sequence;
bool no_prev_seq = true;
if (*next_sequence == kMaxSequenceNumber) {
*next_sequence = sequence;
} else {
no_prev_seq = false;
WriteBatchInternal::SetSequence(&batch, *next_sequence);
}
@ -1563,10 +1565,24 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// insert. We don't want to fail the whole write batch in that case --
// we just ignore the update.
// That's why we set ignore missing column families to true
//
// If we pass DB through and options.max_successive_merges is hit
// during recovery, Get() will be issued which will try to acquire
// DB mutex and cause deadlock, as DB mutex is already held.
// The DB pointer is not needed unless 2PC is used.
// TODO(sdong) fix the allow_2pc case too.
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, false /* concurrent_memtable_writes */,
next_sequence);
log_number, db_options_.allow_2pc ? this : nullptr, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes);
// If it is the first log file and there is no column family updated
// after replaying the file, this file may be a stale file. We ignore
// sequence IDs from the file. Otherwise, if a newer stale log file that
// has been deleted, the sequenceID may be wrong.
if (no_prev_seq && !has_valid_writes) {
*next_sequence = kMaxSequenceNumber;
}
MaybeIgnoreError(&status);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
@ -1575,7 +1591,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue;
}
if (!read_only) {
if (has_valid_writes && !read_only) {
// we can do this because this is called before client has access to the
// DB and there is only a single thread operating on DB
ColumnFamilyData* cfd;

@ -11,6 +11,7 @@
#include "port/stack_trace.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/perf_context.h"
#include "port/port.h"
namespace rocksdb {
@ -979,7 +980,7 @@ TEST_F(DBIteratorTest, PinnedDataIteratorRandomized) {
{
// Test Seek to random keys
printf("Testing seek on %zu keys\n", random_keys.size());
printf("Testing seek on %" ROCKSDB_PRIszt " keys\n", random_keys.size());
std::vector<Slice> keys_slices;
std::vector<std::string> true_keys;
for (auto& k : random_keys) {

@ -12,6 +12,7 @@
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/sst_file_writer.h"
#include "util/sst_file_manager_impl.h"
#include "port/port.h"
namespace rocksdb {
@ -1455,7 +1456,7 @@ TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) {
}
printf(
"Total: %zu ranges\n"
"Total: %" ROCKSDB_PRIszt " ranges\n"
"AddFile()|Success: %d ranges\n"
"AddFile()|RangeConflict: %d ranges\n"
"Put(): %d ranges\n",

@ -1189,6 +1189,12 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
} else if (LZ4_Supported()) {
type = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (XPRESS_Supported()) {
type = kXpressCompression;
fprintf(stderr, "using xpress\n");
} else if (ZSTD_Supported()) {
type = kZSTDNotFinalCompression;
fprintf(stderr, "using ZSTD\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return false;
@ -4685,6 +4691,75 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
}
}
TEST_F(DBTest, CompressionStatsTest) {
CompressionType type;
if (Snappy_Supported()) {
type = kSnappyCompression;
fprintf(stderr, "using snappy\n");
} else if (Zlib_Supported()) {
type = kZlibCompression;
fprintf(stderr, "using zlib\n");
} else if (BZip2_Supported()) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else if (LZ4_Supported()) {
type = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (XPRESS_Supported()) {
type = kXpressCompression;
fprintf(stderr, "using xpress\n");
} else if (ZSTD_Supported()) {
type = kZSTDNotFinalCompression;
fprintf(stderr, "using ZSTD\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return;
}
Options options = CurrentOptions();
options.compression = type;
options.statistics = rocksdb::CreateDBStatistics();
options.statistics->stats_level_ = StatsLevel::kAll;
DestroyAndReopen(options);
int kNumKeysWritten = 100000;
// Check that compressions occur and are counted when compression is turned on
Random rnd(301);
for (int i = 0; i < kNumKeysWritten; ++i) {
// compressible string
ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
}
ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED), 0);
for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
}
ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED), 0);
options.compression = kNoCompression;
DestroyAndReopen(options);
uint64_t currentCompressions =
options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
uint64_t currentDecompressions =
options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED);
// Check that compressions do not occur when turned off
for (int i = 0; i < kNumKeysWritten; ++i) {
// compressible string
ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
}
ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED)
- currentCompressions, 0);
for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
}
ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED)
- currentDecompressions, 0);
}
TEST_F(DBTest, MutexWaitStatsDisabledByDefault) {
Options options = CurrentOptions();
options.create_if_missing = true;

@ -1487,6 +1487,23 @@ TEST_F(DBTest2, SyncPointMarker) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif
TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
Options options;
options = CurrentOptions(options);
options.merge_operator = MergeOperators::CreatePutOperator();
DestroyAndReopen(options);
db_->Put(WriteOptions(), "foo", "bar");
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "bar"));
options.max_successive_merges = 3;
Reopen(options);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -11,14 +11,15 @@
#include <inttypes.h>
#include <string>
#include "rocksdb/db.h"
#include "db/memtable.h"
#include "db/version_set.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/log_buffer.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"
namespace rocksdb {
@ -297,69 +298,79 @@ Status MemTableList::InstallMemtableFlushResults(
// if some other thread is already committing, then return
Status s;
if (commit_in_progress_) {
TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress");
return s;
}
// Only a single thread can be executing this piece of code
commit_in_progress_ = true;
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created.
uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
auto& memlist = current_->memlist_;
// enumerate from the last (earliest) element to see how many batch finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_completed_) {
// Retry until all completed flushes are committed. New flushes can finish
// while the current thread is writing manifest where mutex is released.
while (s.ok()) {
auto& memlist = current_->memlist_;
if (memlist.empty() || !memlist.back()->flush_completed_) {
break;
}
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_);
}
batch_count++;
}
if (batch_count > 0) {
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.
if (s.ok()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
++mem_id;
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memtables
// are always committed in the order that they were created.
uint64_t batch_file_number = 0;
size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
// enumerate from the last (earliest) element to see how many batch finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_completed_) {
break;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
LogToBuffer(log_buffer,
"[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_);
}
batch_count++;
}
if (batch_count > 0) {
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
InstallNewVersion();
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed.
if (s.ok()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
++mem_id;
}
} else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again.
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release);
++mem_id;
}
}
}
}
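
The batching scan in this hunk (walk from the earliest memtable, stop at the first unfinished flush, and queue one edit per distinct file number) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from the commit: `Mem` and `CollectBatch` are hypothetical names, and a `std::deque` stands in for `memlist_` with the newest entry at the front.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for MemTable; only the two fields the scan reads.
struct Mem {
  uint64_t file_number;
  bool flush_completed;
};

// Walks from the oldest entry (back of the list) toward the newest and stops
// at the first memtable whose flush has not completed. Returns one file number
// per distinct output file in that run; *batch_count receives the run length.
std::vector<uint64_t> CollectBatch(const std::deque<Mem>& memlist,
                                   std::size_t* batch_count) {
  std::vector<uint64_t> files;
  uint64_t batch_file_number = 0;
  *batch_count = 0;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    if (!it->flush_completed) {
      break;  // later memtables must wait so commits stay in creation order
    }
    if (it == memlist.rbegin() || batch_file_number != it->file_number) {
      batch_file_number = it->file_number;
      files.push_back(it->file_number);  // one edit per output file
    }
    ++*batch_count;
  }
  return files;
}
```

Stopping at the first unfinished flush is what keeps memtables committed in creation order, even when a newer flush finishes before an older one.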


@ -331,20 +331,19 @@ class TestPlainTableFactory : public PlainTableFactory {
TableProperties* props = nullptr;
auto s =
ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber,
table_reader_options.ioptions.env,
table_reader_options.ioptions.info_log, &props);
table_reader_options.ioptions, &props);
EXPECT_TRUE(s.ok());
if (store_index_in_file_) {
BlockHandle bloom_block_handle;
s = FindMetaBlock(file.get(), file_size, kPlainTableMagicNumber,
table_reader_options.ioptions.env,
table_reader_options.ioptions,
BloomBlockBuilder::kBloomBlock, &bloom_block_handle);
EXPECT_TRUE(s.ok());
BlockHandle index_block_handle;
s = FindMetaBlock(file.get(), file_size, kPlainTableMagicNumber,
table_reader_options.ioptions.env,
table_reader_options.ioptions,
PlainTableIndexBuilder::kPlainTableIndexBlock,
&index_block_handle);
EXPECT_TRUE(s.ok());


@ -276,7 +276,7 @@ void TestCustomizedTablePropertiesCollector(
new test::StringSource(fwf->contents())));
TableProperties* props;
Status s = ReadTableProperties(fake_file_reader.get(), fwf->contents().size(),
magic_number, Env::Default(), nullptr, &props);
magic_number, ioptions, &props);
std::unique_ptr<TableProperties> props_guard(props);
ASSERT_OK(s);
@ -417,7 +417,7 @@ void TestInternalKeyPropertiesCollector(
TableProperties* props;
Status s =
ReadTableProperties(reader.get(), fwf->contents().size(), magic_number,
Env::Default(), nullptr, &props);
ioptions, &props);
ASSERT_OK(s);
std::unique_ptr<TableProperties> props_guard(props);


@ -597,8 +597,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
new RandomAccessFileReader(std::move(file)));
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
Footer::kInvalidTableMagicNumber /* table's magic number */, vset_->env_,
ioptions->info_log, &raw_table_properties);
Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions, &raw_table_properties);
if (!s.ok()) {
return s;
}


@ -694,6 +694,7 @@ class MemTableInserter : public WriteBatch::Handler {
uint64_t log_number_ref_;
DBImpl* db_;
const bool concurrent_memtable_writes_;
bool* has_valid_writes_;
typedef std::map<MemTable*, MemTablePostProcessInfo> MemPostInfoMap;
MemPostInfoMap mem_post_info_map_;
// current recovered transaction we are rebuilding (recovery)
@ -704,7 +705,8 @@ class MemTableInserter : public WriteBatch::Handler {
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes)
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr)
: sequence_(sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
@ -713,6 +715,7 @@ class MemTableInserter : public WriteBatch::Handler {
log_number_ref_(0),
db_(reinterpret_cast<DBImpl*>(db)),
concurrent_memtable_writes_(concurrent_memtable_writes),
has_valid_writes_(has_valid_writes),
rebuilding_trx_(nullptr) {
assert(cf_mems_);
}
@ -756,6 +759,10 @@ class MemTableInserter : public WriteBatch::Handler {
return false;
}
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
if (log_number_ref_ > 0) {
cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
}
@ -978,6 +985,9 @@ class MemTableInserter : public WriteBatch::Handler {
// we are now iterating through a prepared section
rebuilding_trx_ = new WriteBatch();
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
} else {
// in non-recovery we ignore prepare markers
// and insert the values directly. making sure we have a
@ -1031,6 +1041,9 @@ class MemTableInserter : public WriteBatch::Handler {
if (s.ok()) {
db_->DeleteRecoveredTransaction(name.ToString());
}
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
}
} else {
// in non recovery we simply ignore this tag
@ -1114,16 +1127,15 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
return s;
}
Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t log_number, DB* db,
bool concurrent_memtable_writes,
SequenceNumber* last_seq_used) {
Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
uint64_t log_number, DB* db, bool concurrent_memtable_writes,
SequenceNumber* last_seq_used, bool* has_valid_writes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
flush_scheduler, ignore_missing_column_families,
log_number, db, concurrent_memtable_writes);
log_number, db, concurrent_memtable_writes,
has_valid_writes);
Status s = batch->Iterate(&inserter);
if (last_seq_used != nullptr) {
*last_seq_used = inserter.get_final_sequence();
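
The optional `has_valid_writes` out-parameter threaded through `InsertInto` and `MemTableInserter` can be sketched as follows. This is a simplified illustration, assuming stand-in types: `Record`, `Inserter`, and `ReplayLog` are hypothetical and are not RocksDB classes. It only shows how a null-checked flag lets a caller learn whether a replayed log contained anything applicable.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for one write-batch record.
struct Record {
  bool known_column_family;
  std::string key;
};

// Simplified analogue of the handler: applies records and, when the caller
// supplied a flag, reports that at least one valid write was seen.
class Inserter {
 public:
  explicit Inserter(bool* has_valid_writes = nullptr)
      : has_valid_writes_(has_valid_writes) {}

  void Apply(const Record& r) {
    if (!r.known_column_family) {
      return;  // skipped records never touch the flag
    }
    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }
    ++applied_;
  }

  int applied() const { return applied_; }

 private:
  bool* has_valid_writes_;  // optional; nullptr means the caller does not care
  int applied_ = 0;
};

// Replays one "log" and returns true if it held at least one valid write.
bool ReplayLog(const std::vector<Record>& log) {
  bool has_valid_writes = false;
  Inserter inserter(&has_valid_writes);
  for (const Record& r : log) {
    inserter.Apply(r);
  }
  return has_valid_writes;
}
```

Defaulting the pointer to `nullptr` keeps existing call sites compiling unchanged, which matches the default arguments added in the header hunk.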


@ -160,7 +160,8 @@ class WriteBatchInternal {
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
SequenceNumber* last_seq_used = nullptr);
SequenceNumber* last_seq_used = nullptr,
bool* has_valid_writes = nullptr);
static Status InsertInto(WriteThread::Writer* writer,
ColumnFamilyMemTables* memtables,


@ -179,6 +179,11 @@ enum Tickers : uint32_t {
NUMBER_SUPERVERSION_ACQUIRES,
NUMBER_SUPERVERSION_RELEASES,
NUMBER_SUPERVERSION_CLEANUPS,
// # of compressions/decompressions executed
NUMBER_BLOCK_COMPRESSED,
NUMBER_BLOCK_DECOMPRESSED,
NUMBER_BLOCK_NOT_COMPRESSED,
MERGE_OPERATION_TOTAL_TIME,
FILTER_OPERATION_TOTAL_TIME,
@ -211,6 +216,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{BLOCK_CACHE_BYTES_READ, "rocksdb.block.cache.bytes.read"},
{BLOCK_CACHE_BYTES_WRITE, "rocksdb.block.cache.bytes.write"},
{BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful"},
{PERSISTENT_CACHE_HIT, "rocksdb.persistent.cache.hit"},
{PERSISTENT_CACHE_MISS, "rocksdb.persistent.cache.miss"},
{MEMTABLE_HIT, "rocksdb.memtable.hit"},
{MEMTABLE_MISS, "rocksdb.memtable.miss"},
{GET_HIT_L0, "rocksdb.l0.hit"},
@ -260,15 +267,18 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{WAL_FILE_BYTES, "rocksdb.wal.bytes"},
{WRITE_DONE_BY_SELF, "rocksdb.write.self"},
{WRITE_DONE_BY_OTHER, "rocksdb.write.other"},
{WRITE_TIMEDOUT, "rocksdb.write.timeout"},
{WRITE_WITH_WAL, "rocksdb.write.wal"},
{FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"},
{COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
{COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
{FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"},
{NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
"rocksdb.number.direct.load.table.properties"},
{NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
{NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
{NUMBER_SUPERVERSION_CLEANUPS, "rocksdb.number.superversion_cleanups"},
{NUMBER_BLOCK_COMPRESSED, "rocksdb.number.block.compressed"},
{NUMBER_BLOCK_DECOMPRESSED, "rocksdb.number.block.decompressed"},
{NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"},
{MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"},
{FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},
@ -313,6 +323,14 @@ enum Histograms : uint32_t {
BYTES_PER_READ,
BYTES_PER_WRITE,
BYTES_PER_MULTIGET,
// number of bytes compressed/decompressed
// number of bytes is when uncompressed; i.e. before/after respectively
BYTES_COMPRESSED,
BYTES_DECOMPRESSED,
COMPRESSION_TIMES_NANOS,
DECOMPRESSION_TIMES_NANOS,
HISTOGRAM_ENUM_MAX, // TODO(ldemailly): enforce HistogramsNameMap match
};
@ -343,6 +361,10 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{BYTES_PER_READ, "rocksdb.bytes.per.read"},
{BYTES_PER_WRITE, "rocksdb.bytes.per.write"},
{BYTES_PER_MULTIGET, "rocksdb.bytes.per.multiget"},
{BYTES_COMPRESSED, "rocksdb.bytes.compressed"},
{BYTES_DECOMPRESSED, "rocksdb.bytes.decompressed"},
{COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"},
{DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"},
};
struct HistogramData {
@ -357,6 +379,9 @@ enum StatsLevel {
// Collect all stats except the counters requiring to get time inside the
// mutex lock.
kExceptTimeForMutex,
// Collect all stats except time inside mutex lock AND time spent on
// compression
kExceptDetailedTimers,
// Collect all stats, including measuring duration of mutex operations.
// If getting time is expensive on the platform to run, it can
// reduce scalability to more threads, especially for writes.
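
The `TODO` on `HISTOGRAM_ENUM_MAX` asks for the name map to be kept in step with the enum. One possible check is sketched below; it is an assumption about how such enforcement could look, not something this change set adds. `NameMapIsComplete` is a hypothetical helper, and the enum is trimmed to the four histograms introduced here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Trimmed copy of the enum: only the entries added in this diff.
enum Histograms : uint32_t {
  BYTES_COMPRESSED,
  BYTES_DECOMPRESSED,
  COMPRESSION_TIMES_NANOS,
  DECOMPRESSION_TIMES_NANOS,
  HISTOGRAM_ENUM_MAX,
};

const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
    {BYTES_COMPRESSED, "rocksdb.bytes.compressed"},
    {BYTES_DECOMPRESSED, "rocksdb.bytes.decompressed"},
    {COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"},
    {DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"},
};

// Hypothetical helper: true when every enumerator below max_value appears
// exactly once in the name map.
template <typename E>
bool NameMapIsComplete(const std::vector<std::pair<E, std::string>>& names,
                       E max_value) {
  std::set<uint32_t> seen;
  for (const auto& entry : names) {
    const uint32_t id = static_cast<uint32_t>(entry.first);
    if (id >= static_cast<uint32_t>(max_value) || !seen.insert(id).second) {
      return false;  // out of range or duplicated
    }
  }
  return seen.size() == static_cast<std::size_t>(max_value);
}
```

A check of this shape could run in a unit test, so a ticker or histogram added without a name fails fast instead of being silently unreported.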


@ -5,8 +5,8 @@
#pragma once
#define ROCKSDB_MAJOR 4
#define ROCKSDB_MINOR 9
#define ROCKSDB_PATCH 0
#define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 2
// Do not use these. We made the mistake of declaring macros starting with
// double underscore. Now we have to live with our choice. We'll deprecate these

src.mk

@ -215,6 +215,7 @@ MAIN_SOURCES = \
db/db_compaction_filter_test.cc \
db/db_compaction_test.cc \
db/db_dynamic_level_test.cc \
db/db_flush_test.cc \
db/db_inplace_update_test.cc \
db/db_iterator_test.cc \
db/db_log_iter_test.cc \
@ -274,6 +275,7 @@ MAIN_SOURCES = \
util/env_test.cc \
util/filelock_test.cc \
util/histogram_test.cc \
util/statistics_test.cc \
utilities/backupable/backupable_db_test.cc \
utilities/checkpoint/checkpoint_test.cc \
utilities/document/document_db_test.cc \


@ -651,11 +651,16 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
auto type = r->compression_type;
Slice block_contents;
bool abort_compression = false;
StopWatchNano timer(r->ioptions.env,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
if (raw_block_contents.size() < kCompressionSizeLimit) {
Slice compression_dict;
if (is_data_block && r->compression_dict && r->compression_dict->size()) {
compression_dict = *r->compression_dict;
}
block_contents = CompressBlock(raw_block_contents, r->compression_opts,
&type, r->table_options.format_version,
compression_dict, &r->compressed_output);
@ -668,7 +673,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockContents contents;
Status stat = UncompressBlockContentsForCompressionType(
block_contents.data(), block_contents.size(), &contents,
r->table_options.format_version, compression_dict, type);
r->table_options.format_version, compression_dict, type,
r->ioptions);
if (stat.ok()) {
bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
@ -698,6 +704,15 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
type = kNoCompression;
block_contents = raw_block_contents;
}
else if (type != kNoCompression &&
ShouldReportDetailedTime(r->ioptions.env,
r->ioptions.statistics)) {
MeasureTime(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
MeasureTime(r->ioptions.statistics, BYTES_COMPRESSED,
raw_block_contents.size());
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();


@ -60,14 +60,13 @@ namespace {
// dictionary.
Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<Block>* result, Env* env,
std::unique_ptr<Block>* result,
const ImmutableCFOptions &ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
Logger* info_log) {
const PersistentCacheOptions& cache_options) {
BlockContents contents;
Status s = ReadBlockContents(file, footer, options, handle, &contents, env,
do_uncompress, compression_dict, cache_options,
info_log);
Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions,
do_uncompress, compression_dict, cache_options);
if (s.ok()) {
result->reset(new Block(std::move(contents)));
}
@ -177,19 +176,18 @@ class BinarySearchIndexReader : public IndexReader {
// On success, index_reader will be populated; otherwise it will remain
// unmodified.
static Status Create(RandomAccessFileReader* file, const Footer& footer,
const BlockHandle& index_handle, Env* env,
const BlockHandle& index_handle,
const ImmutableCFOptions &ioptions,
const Comparator* comparator, IndexReader** index_reader,
const PersistentCacheOptions& cache_options,
Statistics* statistics) {
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
&index_block, env, true /* decompress */,
Slice() /*compression dict*/, cache_options,
/*info_log*/ nullptr);
&index_block, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options);
if (s.ok()) {
*index_reader = new BinarySearchIndexReader(
comparator, std::move(index_block), statistics);
comparator, std::move(index_block), ioptions.statistics);
}
return s;
@ -226,15 +224,15 @@ class HashIndexReader : public IndexReader {
public:
static Status Create(
const SliceTransform* hash_key_extractor, const Footer& footer,
RandomAccessFileReader* file, Env* env, const Comparator* comparator,
const BlockHandle& index_handle, InternalIterator* meta_index_iter,
IndexReader** index_reader, bool hash_index_allow_collision,
const PersistentCacheOptions& cache_options, Statistics* statistics) {
RandomAccessFileReader* file, const ImmutableCFOptions &ioptions,
const Comparator* comparator, const BlockHandle& index_handle,
InternalIterator* meta_index_iter, IndexReader** index_reader,
bool hash_index_allow_collision,
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
&index_block, env, true /* decompress */,
Slice() /*compression dict*/, cache_options,
/*info_log*/ nullptr);
&index_block, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options);
if (!s.ok()) {
return s;
@ -245,7 +243,8 @@ class HashIndexReader : public IndexReader {
// So, Create will succeed regardless, from this point on.
auto new_index_reader =
new HashIndexReader(comparator, std::move(index_block), statistics);
new HashIndexReader(comparator, std::move(index_block),
ioptions.statistics);
*index_reader = new_index_reader;
// Get prefixes block
@ -269,14 +268,14 @@ class HashIndexReader : public IndexReader {
// Read contents for the blocks
BlockContents prefixes_contents;
s = ReadBlockContents(file, footer, ReadOptions(), prefixes_handle,
&prefixes_contents, env, true /* decompress */,
&prefixes_contents, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options);
if (!s.ok()) {
return s;
}
BlockContents prefixes_meta_contents;
s = ReadBlockContents(file, footer, ReadOptions(), prefixes_meta_handle,
&prefixes_meta_contents, env, true /* decompress */,
&prefixes_meta_contents, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options);
if (!s.ok()) {
// TODO: log error
@ -547,7 +546,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
PersistentCacheOptions(rep->table_options.persistent_cache,
std::string(rep->persistent_cache_key_prefix,
rep->persistent_cache_key_prefix_size),
rep->ioptions.statistics);
rep->ioptions.statistics);
// Read meta index
std::unique_ptr<Block> meta;
@ -585,8 +584,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
TableProperties* table_properties = nullptr;
if (s.ok()) {
s = ReadProperties(meta_iter->value(), rep->file.get(), rep->footer,
rep->ioptions.env, rep->ioptions.info_log,
&table_properties);
rep->ioptions, &table_properties);
}
if (!s.ok()) {
@ -613,7 +611,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
// true.
unique_ptr<BlockContents> compression_dict_block{new BlockContents()};
s = rocksdb::ReadMetaBlock(rep->file.get(), file_size,
kBlockBasedTableMagicNumber, rep->ioptions.env,
kBlockBasedTableMagicNumber, rep->ioptions,
rocksdb::kCompressionDictBlock,
compression_dict_block.get());
if (!s.ok()) {
@ -745,9 +743,9 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
std::unique_ptr<Block> meta;
Status s = ReadBlockFromFile(
rep->file.get(), rep->footer, ReadOptions(),
rep->footer.metaindex_handle(), &meta, rep->ioptions.env,
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, rep->ioptions.info_log);
rep->persistent_cache_options);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log,
@ -764,13 +762,14 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics,
const ReadOptions& read_options,
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
Statistics* statistics = ioptions.statistics;
// Lookup uncompressed cache first
if (block_cache != nullptr) {
@ -811,7 +810,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
BlockContents contents;
s = UncompressBlockContents(compressed_block->data(),
compressed_block->size(), &contents,
format_version, compression_dict);
format_version, compression_dict,
ioptions);
// Insert uncompressed block into block cache
if (s.ok()) {
@ -840,7 +840,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
Status BlockBasedTable::PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, Statistics* statistics,
const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict) {
assert(raw_block->compression_type() == kNoCompression ||
@ -849,9 +849,10 @@ Status BlockBasedTable::PutDataBlockToCache(
Status s;
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) {
s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents,
format_version, compression_dict);
format_version, compression_dict, ioptions);
}
if (!s.ok()) {
delete raw_block;
@ -913,7 +914,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep) {
}
BlockContents block;
if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(),
rep->filter_handle, &block, rep->ioptions.env,
rep->filter_handle, &block, rep->ioptions,
false /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options)
.ok()) {
@ -1148,7 +1149,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, statistics, ro, &block,
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro, &block,
rep->table_options.format_version, compression_dict);
if (block.value == nullptr && !no_io && ro.fill_cache) {
@ -1156,15 +1157,14 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
{
StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&raw_block, rep->ioptions.env,
&raw_block, rep->ioptions,
block_cache_compressed == nullptr,
compression_dict, rep->persistent_cache_options,
rep->ioptions.info_log);
compression_dict, rep->persistent_cache_options);
}
if (s.ok()) {
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
ro, statistics, &block, raw_block.release(),
ro, rep->ioptions, &block, raw_block.release(),
rep->table_options.format_version,
compression_dict);
}
@ -1184,9 +1184,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
std::unique_ptr<Block> block_value;
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&block_value, rep->ioptions.env, true /* compress */,
compression_dict, rep->persistent_cache_options,
rep->ioptions.info_log);
&block_value, rep->ioptions, true /* compress */,
compression_dict, rep->persistent_cache_options);
if (s.ok()) {
block.value = block_value.release();
}
@ -1510,8 +1509,9 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
handle, cache_key_storage);
Slice ckey;
s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, nullptr,
options, &block, rep_->table_options.format_version,
s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr,
rep_->ioptions, options, &block,
rep_->table_options.format_version,
rep_->compression_dict_block
? rep_->compression_dict_block->data
: Slice());
@ -1544,11 +1544,8 @@ Status BlockBasedTable::CreateIndexReader(
}
auto file = rep_->file.get();
auto env = rep_->ioptions.env;
auto comparator = &rep_->internal_comparator;
const Footer& footer = rep_->footer;
Statistics* stats = rep_->ioptions.statistics;
if (index_type_on_file == BlockBasedTableOptions::kHashSearch &&
rep_->ioptions.prefix_extractor == nullptr) {
Log(InfoLogLevel::WARN_LEVEL, rep_->ioptions.info_log,
@ -1561,8 +1558,8 @@ Status BlockBasedTable::CreateIndexReader(
switch (index_type_on_file) {
case BlockBasedTableOptions::kBinarySearch: {
return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), env, comparator, index_reader,
rep_->persistent_cache_options, stats);
file, footer, footer.index_handle(), rep_->ioptions, comparator,
index_reader, rep_->persistent_cache_options);
}
case BlockBasedTableOptions::kHashSearch: {
std::unique_ptr<Block> meta_guard;
@ -1577,8 +1574,8 @@ Status BlockBasedTable::CreateIndexReader(
"Unable to read the metaindex block."
" Fall back to binary search index.");
return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), env, comparator,
index_reader, rep_->persistent_cache_options, stats);
file, footer, footer.index_handle(), rep_->ioptions, comparator,
index_reader, rep_->persistent_cache_options);
}
meta_index_iter = meta_iter_guard.get();
}
@ -1588,10 +1585,9 @@ Status BlockBasedTable::CreateIndexReader(
rep_->internal_prefix_transform.reset(
new InternalKeySliceTransform(rep_->ioptions.prefix_extractor));
return HashIndexReader::Create(
rep_->internal_prefix_transform.get(), footer, file, env, comparator,
footer.index_handle(), meta_index_iter, index_reader,
rep_->hash_index_allow_collision, rep_->persistent_cache_options,
stats);
rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions,
comparator, footer.index_handle(), meta_index_iter, index_reader,
rep_->hash_index_allow_collision, rep_->persistent_cache_options);
}
default: {
std::string error_message =
@ -1711,7 +1707,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
BlockContents block;
if (ReadBlockContents(
rep_->file.get(), rep_->footer, ReadOptions(), handle, &block,
rep_->ioptions.env, false /*decompress*/,
rep_->ioptions, false /*decompress*/,
Slice() /*compression dict*/, rep_->persistent_cache_options)
.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader(


@ -177,8 +177,8 @@ class BlockBasedTable : public TableReader {
// dictionary.
static Status GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics,
const ReadOptions& read_options,
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict);
@ -195,7 +195,7 @@ class BlockBasedTable : public TableReader {
static Status PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, Statistics* statistics,
const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict);


@ -49,12 +49,16 @@ class CuckooBuilderTest : public testing::Test {
uint64_t read_file_size;
ASSERT_OK(env_->GetFileSize(fname, &read_file_size));
Options options;
options.allow_mmap_reads = true;
ImmutableCFOptions ioptions(options);
// Assert Table Properties.
TableProperties* props = nullptr;
unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(std::move(read_file)));
ASSERT_OK(ReadTableProperties(file_reader.get(), read_file_size,
kCuckooTableMagicNumber, env_, nullptr,
kCuckooTableMagicNumber, ioptions,
&props));
// Check unused bucket.
std::string unused_key = props->user_collected_properties[


@ -45,7 +45,7 @@ CuckooTableReader::CuckooTableReader(
}
TableProperties* props = nullptr;
status_ = ReadTableProperties(file_.get(), file_size, kCuckooTableMagicNumber,
ioptions.env, ioptions.info_log, &props);
ioptions, &props);
if (!status_.ok()) {
return;
}


@ -23,6 +23,9 @@
#include "util/perf_context_imp.h"
#include "util/string_util.h"
#include "util/xxhash.h"
#include "util/statistics.h"
#include "util/stop_watch.h"
namespace rocksdb {
@ -39,6 +42,11 @@ const uint64_t kPlainTableMagicNumber = 0;
#endif
const uint32_t DefaultStackBufferSize = 5000;
bool ShouldReportDetailedTime(Env* env, Statistics* stats) {
return env != nullptr && stats != nullptr &&
stats->stats_level_ > kExceptDetailedTimers;
}
void BlockHandle::EncodeTo(std::string* dst) const {
// Sanity check that all fields have been set
assert(offset_ != ~static_cast<uint64_t>(0));
@ -297,10 +305,10 @@ Status ReadBlock(RandomAccessFileReader* file, const Footer& footer,
Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
const ReadOptions& read_options,
const BlockHandle& handle, BlockContents* contents,
Env* env, bool decompression_requested,
const ImmutableCFOptions &ioptions,
bool decompression_requested,
const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
Logger* info_log) {
const PersistentCacheOptions& cache_options) {
Status status;
Slice slice;
size_t n = static_cast<size_t>(handle.size());
@ -318,9 +326,9 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
return status;
} else {
// uncompressed page is not found
if (info_log && !status.IsNotFound()) {
if (ioptions.info_log && !status.IsNotFound()) {
assert(!status.ok());
Log(InfoLogLevel::INFO_LEVEL, info_log,
Log(InfoLogLevel::INFO_LEVEL, ioptions.info_log,
"Error reading from persistent cache. %s",
status.ToString().c_str());
}
@ -341,9 +349,9 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
used_buf = heap_buf.get();
slice = Slice(heap_buf.get(), n);
} else {
if (info_log && !status.IsNotFound()) {
if (ioptions.info_log && !status.IsNotFound()) {
assert(!status.ok());
Log(InfoLogLevel::INFO_LEVEL, info_log,
Log(InfoLogLevel::INFO_LEVEL, ioptions.info_log,
"Error reading from persistent cache. %s", status.ToString().c_str());
}
// cache miss read from device
@ -378,7 +386,8 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
if (decompression_requested && compression_type != kNoCompression) {
// compressed page, uncompress, update cache
status = UncompressBlockContents(slice.data(), n, contents,
footer.version(), compression_dict);
footer.version(), compression_dict,
ioptions);
} else if (slice.data() != used_buf) {
// the slice content is not the buffer provided
*contents = BlockContents(Slice(slice.data(), n), false, compression_type);
@ -405,11 +414,13 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
Status UncompressBlockContentsForCompressionType(
const char* data, size_t n, BlockContents* contents,
uint32_t format_version, const Slice& compression_dict,
CompressionType compression_type) {
CompressionType compression_type, const ImmutableCFOptions &ioptions) {
std::unique_ptr<char[]> ubuf;
assert(compression_type != kNoCompression && "Invalid compression type");
StopWatchNano timer(ioptions.env,
ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
int decompress_size = 0;
switch (compression_type) {
case kSnappyCompression: {
@ -501,6 +512,13 @@ Status UncompressBlockContentsForCompressionType(
return Status::Corruption("bad block type");
}
if(ShouldReportDetailedTime(ioptions.env, ioptions.statistics)){
MeasureTime(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
MeasureTime(ioptions.statistics, BYTES_DECOMPRESSED, contents->data.size());
RecordTick(ioptions.statistics, NUMBER_BLOCK_DECOMPRESSED);
}
return Status::OK();
}
@ -513,11 +531,12 @@ Status UncompressBlockContentsForCompressionType(
// format_version is the block format as defined in include/rocksdb/table.h
Status UncompressBlockContents(const char* data, size_t n,
BlockContents* contents, uint32_t format_version,
const Slice& compression_dict) {
const Slice& compression_dict,
const ImmutableCFOptions &ioptions) {
assert(data[n] != kNoCompression);
return UncompressBlockContentsForCompressionType(
data, n, contents, format_version, compression_dict,
(CompressionType)data[n]);
(CompressionType)data[n], ioptions);
}
} // namespace rocksdb
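
The gating used around decompression in this file (start the timer only when detailed timing is enabled, then record time and a tick afterwards) can be sketched with standard-library pieces. This is a rough illustration under stated assumptions: `Stats`, `NanoTimer`, and `TimedDecompress` are hypothetical stand-ins for `Statistics`, `StopWatchNano`, and the call site; the `Env*` check is dropped; and the enum is cut down to two levels, with `kAll` assumed as the name of the most detailed one.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Reduced stand-in for StatsLevel; other levels are omitted.
enum StatsLevel { kExceptDetailedTimers, kAll };

// Hypothetical stand-in for rocksdb::Statistics.
struct Stats {
  StatsLevel stats_level = kExceptDetailedTimers;
  uint64_t blocks_decompressed = 0;
  uint64_t decompress_nanos = 0;
};

// Same shape as the helper in the diff, minus the Env* argument.
bool ShouldReportDetailedTime(const Stats* stats) {
  return stats != nullptr && stats->stats_level > kExceptDetailedTimers;
}

// Timer that reads the clock only when asked to start.
class NanoTimer {
 public:
  explicit NanoTimer(bool auto_start) : started_(auto_start) {
    if (started_) {
      start_ = std::chrono::steady_clock::now();
    }
  }
  uint64_t ElapsedNanos() const {
    if (!started_) {
      return 0;
    }
    const auto d = std::chrono::steady_clock::now() - start_;
    return static_cast<uint64_t>(
        std::chrono::duration_cast<std::chrono::nanoseconds>(d).count());
  }

 private:
  bool started_;
  std::chrono::steady_clock::time_point start_;
};

// Runs work() and records timing and a tick only when detailed stats are on.
template <typename Work>
void TimedDecompress(Stats* stats, Work work) {
  const bool detailed = ShouldReportDetailedTime(stats);
  NanoTimer timer(detailed);
  work();
  if (detailed) {
    stats->decompress_nanos += timer.ElapsedNanos();
    ++stats->blocks_decompressed;
  }
}
```

Evaluating the gate once up front means the clock is never read on the hot path when detailed timers are disabled, which is the point of introducing a separate stats level for them.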


@ -24,6 +24,8 @@ class Block;
class RandomAccessFile;
struct ReadOptions;
extern bool ShouldReportDetailedTime(Env* env, Statistics* stats);
// the length of the magic number in bytes.
const int kMagicNumberLengthByte = 8;
@ -212,10 +214,9 @@ struct BlockContents {
extern Status ReadBlockContents(
RandomAccessFileReader* file, const Footer& footer,
const ReadOptions& options, const BlockHandle& handle,
BlockContents* contents, Env* env, bool do_uncompress = true,
const Slice& compression_dict = Slice(),
const PersistentCacheOptions& cache_options = PersistentCacheOptions(),
Logger* info_log = nullptr);
BlockContents* contents, const ImmutableCFOptions &ioptions,
bool do_uncompress = true, const Slice& compression_dict = Slice(),
const PersistentCacheOptions& cache_options = PersistentCacheOptions());
// The 'data' points to the raw block contents read in from file.
// This method allocates a new heap buffer and the raw block
@ -227,7 +228,8 @@ extern Status ReadBlockContents(
extern Status UncompressBlockContents(const char* data, size_t n,
BlockContents* contents,
uint32_t compress_format_version,
const Slice& compression_dict);
const Slice& compression_dict,
const ImmutableCFOptions &ioptions);
// This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks
@ -235,7 +237,7 @@ extern Status UncompressBlockContents(const char* data, size_t n,
extern Status UncompressBlockContentsForCompressionType(
const char* data, size_t n, BlockContents* contents,
uint32_t compress_format_version, const Slice& compression_dict,
CompressionType compression_type);
CompressionType compression_type, const ImmutableCFOptions &ioptions);
// Implementation details follow. Clients should ignore,

@ -150,7 +150,7 @@ bool NotifyCollectTableCollectorsOnFinish(
}
Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
const Footer& footer, Env* env, Logger* logger,
const Footer& footer, const ImmutableCFOptions &ioptions,
TableProperties** table_properties) {
assert(table_properties);
@ -165,7 +165,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
read_options.verify_checksums = false;
Status s;
s = ReadBlockContents(file, footer, read_options, handle, &block_contents,
env, false /* decompress */);
ioptions, false /* decompress */);
if (!s.ok()) {
return s;
@ -219,7 +219,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
auto error_msg =
"Detect malformed value in properties meta-block:"
"\tkey: " + key + "\tval: " + raw_val.ToString();
Log(InfoLogLevel::ERROR_LEVEL, logger, "%s", error_msg.c_str());
Log(InfoLogLevel::ERROR_LEVEL, ioptions.info_log, "%s",
error_msg.c_str());
continue;
}
*(pos->second) = val;
@ -251,8 +252,9 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
}
Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
Logger* info_log, TableProperties** properties) {
uint64_t table_magic_number,
const ImmutableCFOptions &ioptions,
TableProperties** properties) {
// -- Read metaindex block
Footer footer;
auto s = ReadFooterFromFile(file, file_size, &footer, table_magic_number);
@ -265,7 +267,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
ReadOptions read_options;
read_options.verify_checksums = false;
s = ReadBlockContents(file, footer, read_options, metaindex_handle,
&metaindex_contents, env, false /* decompress */);
&metaindex_contents, ioptions, false /* decompress */);
if (!s.ok()) {
return s;
}
@ -282,8 +284,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
TableProperties table_properties;
if (found_properties_block == true) {
s = ReadProperties(meta_iter->value(), file, footer, env, info_log,
properties);
s = ReadProperties(meta_iter->value(), file, footer, ioptions, properties);
} else {
s = Status::NotFound();
}
@ -305,7 +306,8 @@ Status FindMetaBlock(InternalIterator* meta_index_iter,
}
Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
uint64_t table_magic_number,
const ImmutableCFOptions &ioptions,
const std::string& meta_block_name,
BlockHandle* block_handle) {
Footer footer;
@ -319,7 +321,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
ReadOptions read_options;
read_options.verify_checksums = false;
s = ReadBlockContents(file, footer, read_options, metaindex_handle,
&metaindex_contents, env, false /* do decompression */);
&metaindex_contents, ioptions, false /* do decompression */);
if (!s.ok()) {
return s;
}
@ -332,7 +334,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
}
Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
uint64_t table_magic_number,
const ImmutableCFOptions &ioptions,
const std::string& meta_block_name,
BlockContents* contents) {
Status status;
@ -348,7 +351,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
ReadOptions read_options;
read_options.verify_checksums = false;
status = ReadBlockContents(file, footer, read_options, metaindex_handle,
&metaindex_contents, env, false /* decompress */);
&metaindex_contents, ioptions,
false /* decompress */);
if (!status.ok()) {
return status;
}
@ -368,7 +372,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
// Reading metablock
return ReadBlockContents(file, footer, read_options, block_handle, contents,
env, false /* decompress */);
ioptions, false /* decompress */);
}
} // namespace rocksdb

@ -94,7 +94,7 @@ bool NotifyCollectTableCollectorsOnFinish(
// *table_properties will point to a heap-allocated TableProperties
// object, otherwise value of `table_properties` will not be modified.
Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
const Footer& footer, Env* env, Logger* logger,
const Footer& footer, const ImmutableCFOptions &ioptions,
TableProperties** table_properties);
// Directly read the properties from the properties block of a plain table.
@ -102,8 +102,9 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
// *table_properties will point to a heap-allocated TableProperties
// object, otherwise value of `table_properties` will not be modified.
Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
Logger* info_log, TableProperties** properties);
uint64_t table_magic_number,
const ImmutableCFOptions &ioptions,
TableProperties** properties);
// Find the meta block from the meta index block.
Status FindMetaBlock(InternalIterator* meta_index_iter,
@ -112,7 +113,8 @@ Status FindMetaBlock(InternalIterator* meta_index_iter,
// Find the meta block
Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
uint64_t table_magic_number,
const ImmutableCFOptions &ioptions,
const std::string& meta_block_name,
BlockHandle* block_handle);
@ -120,7 +122,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
// from `file` and initialize `contents` with contents of this block.
// Return Status::OK in case of success.
Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
uint64_t table_magic_number,
const ImmutableCFOptions &ioptions,
const std::string& meta_block_name,
BlockContents* contents);

@ -128,7 +128,7 @@ Status PlainTableReader::Open(const ImmutableCFOptions& ioptions,
TableProperties* props = nullptr;
auto s = ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber,
ioptions.env, ioptions.info_log, &props);
ioptions, &props);
if (!s.ok()) {
return s;
}
@ -293,13 +293,13 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
BlockContents bloom_block_contents;
auto s = ReadMetaBlock(file_info_.file.get(), file_size_,
kPlainTableMagicNumber, ioptions_.env,
kPlainTableMagicNumber, ioptions_,
BloomBlockBuilder::kBloomBlock, &bloom_block_contents);
bool index_in_file = s.ok();
BlockContents index_block_contents;
s = ReadMetaBlock(
file_info_.file.get(), file_size_, kPlainTableMagicNumber, ioptions_.env,
file_info_.file.get(), file_size_, kPlainTableMagicNumber, ioptions_,
PlainTableIndexBuilder::kPlainTableIndexBlock, &index_block_contents);
index_in_file &= s.ok();

@ -2064,7 +2064,7 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
TableProperties* props = nullptr;
auto s = ReadTableProperties(file_reader.get(), ss->contents().size(),
kPlainTableMagicNumber, Env::Default(), nullptr,
kPlainTableMagicNumber, ioptions,
&props);
std::unique_ptr<TableProperties> props_guard(props);
ASSERT_OK(s);

@ -219,8 +219,7 @@ Status SstFileReader::ReadTableProperties(uint64_t table_magic_number,
uint64_t file_size) {
TableProperties* table_properties = nullptr;
Status s = rocksdb::ReadTableProperties(file, file_size, table_magic_number,
options_.env, options_.info_log.get(),
&table_properties);
ioptions_, &table_properties);
if (s.ok()) {
table_properties_.reset(table_properties);
} else {

@ -154,7 +154,8 @@ inline void EncodeFixed64(char* buf, uint64_t value) {
// Pull the last 8 bits and cast it to a character
inline void PutFixed32(std::string* dst, uint32_t value) {
#if __BYTE_ORDER__ == __LITTLE_ENDIAN__
dst->append(static_cast<const char*>(&value), sizeof(value));
dst->append(const_cast<const char*>(reinterpret_cast<char*>(&value)),
sizeof(value));
#else
char buf[sizeof(value)];
EncodeFixed32(buf, value);
@ -164,7 +165,8 @@ inline void PutFixed32(std::string* dst, uint32_t value) {
inline void PutFixed64(std::string* dst, uint64_t value) {
#if __BYTE_ORDER__ == __LITTLE_ENDIAN__
dst->append(static_const<const char*>(&value), sizeof(value));
dst->append(const_cast<const char*>(reinterpret_cast<char*>(&value)),
sizeof(value));
#else
char buf[sizeof(value)];
EncodeFixed64(buf, value);
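
Note on the cast change in the two hunks above: `static_cast` cannot convert a `uint32_t*` or `uint64_t*` to `const char*`, because the pointer types are unrelated, so the little-endian branch needs `reinterpret_cast`. The sketch below is only an illustration of that point, not code from this commit. `AppendRawBytes32` is a hypothetical helper name, and it copies bytes in host order with no endianness check.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helper (not part of the diff): appends the raw bytes of
// `value` to `dst` in host byte order.
inline void AppendRawBytes32(std::string* dst, uint32_t value) {
  // static_cast<const char*>(&value) would be rejected by the compiler:
  // uint32_t* and const char* are unrelated pointer types. reinterpret_cast
  // is the cast that allows viewing the object as a sequence of bytes, and
  // it may add const in the same step.
  dst->append(reinterpret_cast<const char*>(&value), sizeof(value));
}
```

A single `reinterpret_cast<const char*>` should behave the same as the `const_cast` wrapped around `reinterpret_cast<char*>` used in the diff. The diff's two-step form is kept as written above.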

util/statistics_test.cc Normal file
@ -0,0 +1,35 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "port/stack_trace.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "rocksdb/statistics.h"
namespace rocksdb {
class StatisticsTest : public testing::Test {};
// Sanity check to make sure that contents and order of TickersNameMap
// match Tickers enum
TEST_F(StatisticsTest, Sanity) {
EXPECT_EQ(static_cast<size_t>(Tickers::TICKER_ENUM_MAX),
TickersNameMap.size());
for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) {
auto pair = TickersNameMap[static_cast<size_t>(t)];
ASSERT_EQ(pair.first, t) << "Miss match at " << pair.second;
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -6,6 +6,7 @@
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

@ -10,6 +10,7 @@
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include "util/random.h"
#include "utilities/persistent_cache/hash_table.h"
#include "utilities/persistent_cache/lrulist.h"

@ -5,6 +5,7 @@
#include "rocksdb/utilities/sim_cache.h"
#include <atomic>
#include "port/port.h"
namespace rocksdb {