Compare commits
30 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
3500b728d1 | ||
|
275488d483 | ||
|
b8674494bc | ||
|
f659afc64a | ||
|
68c5ae432f | ||
|
8277cd14aa | ||
|
acb2e98a90 | ||
|
241c45d1df | ||
|
d19646e1c1 | ||
|
462c21dda6 | ||
|
d6cfa64721 | ||
|
1f055723b7 | ||
|
dcafd40aed | ||
|
a3576c7c4b | ||
|
6fa04041de | ||
|
4abd57290c | ||
|
f35574a16d | ||
|
00c6c53fc8 | ||
|
c2ca7a9f99 | ||
|
d4b0ac2760 | ||
|
a7a2005873 | ||
|
acf6d6bcab | ||
|
241267966c | ||
|
b642e94c88 | ||
|
53571a7a70 | ||
|
7079b78827 | ||
|
512e441819 | ||
|
62a1c55418 | ||
|
adce9c0a87 | ||
|
dabcce9dfb |
@ -1,5 +1,5 @@
|
||||
# Rocksdb Change Log
|
||||
## Unreleased
|
||||
## 5.1.0 (01/13/2017)
|
||||
### Public API Change
|
||||
* Support dynamically changing the `delete_obsolete_files_period_micros` option via SetDBOptions().
|
||||
* Added EventListener::OnExternalFileIngested, which will be called when IngestExternalFile() adds a file successfully.
|
||||
|
5
Makefile
5
Makefile
@ -223,11 +223,6 @@ default: all
|
||||
WARNING_FLAGS = -W -Wextra -Wall -Wsign-compare -Wshadow \
|
||||
-Wno-unused-parameter
|
||||
|
||||
ifndef DISABLE_WARNING_AS_ERROR
|
||||
WARNING_FLAGS += -Werror
|
||||
endif
|
||||
|
||||
|
||||
ifdef LUA_PATH
|
||||
|
||||
ifndef LUA_INCLUDE
|
||||
|
@ -51,12 +51,7 @@ if [ -z "$ROCKSDB_NO_FBCODE" -a -d /mnt/gvfs/third-party ]; then
|
||||
FBCODE_BUILD="true"
|
||||
# If we're compiling with TSAN we need pic build
|
||||
PIC_BUILD=$COMPILE_WITH_TSAN
|
||||
if [ -z "$ROCKSDB_FBCODE_BUILD_WITH_481" ]; then
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
else
|
||||
# we need this to build with MySQL. Don't use for other purposes.
|
||||
source "$PWD/build_tools/fbcode_config4.8.1.sh"
|
||||
fi
|
||||
source "$PWD/build_tools/fbcode_config.sh"
|
||||
fi
|
||||
|
||||
# Delete existing output, if it exists
|
||||
|
@ -1,18 +1,19 @@
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/cf7d14c625ce30bae1a4661c2319c5a283e4dd22/4.9.x/centos6-native/108cf83
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/8598c375b0e94e1448182eb3df034704144a838d/stable/centos6-native/3f16ddd
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/d6e0a7da6faba45f5e5b1638f9edd7afc2f34e7d/4.9.x/gcc-4.9-glibc-2.20/024dbc3
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/d282e6e8f3d20f4e40a516834847bdc038e07973/2.20/gcc-4.9-glibc-2.20/500e281
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/8c38a4c1e52b4c2cc8a9cdc31b9c947ed7dbfcb4/1.1.3/gcc-4.9-glibc-2.20/e9936bf
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/0882df3713c7a84f15abe368dc004581f20b39d7/1.2.8/gcc-5-glibc-2.23/9bc6787
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/740325875f6729f42d28deaa2147b0854f3a347e/1.0.6/gcc-5-glibc-2.23/9bc6787
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0e790b441e2d9acd68d51e1d2e028f88c6a79ddf/r131/gcc-5-glibc-2.23/9bc6787
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/9455f75ff7f4831dc9fda02a6a0f8c68922fad8f/1.0.0/gcc-5-glibc-2.23/9bc6787
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/f001a51b2854957676d07306ef3abf67186b5c8b/2.1.1/gcc-4.8.1-glibc-2.17/c3f970a
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/fc8a13ca1fffa4d0765c716c5a0b49f0c107518f/master/gcc-5-glibc-2.23/1c32b4b
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/17c514c4d102a25ca15f4558be564eeed76f4b6a/2.0.8/gcc-5-glibc-2.23/9bc6787
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/ad576de2a1ea560c4d3434304f0fc4e079bede42/trunk/gcc-5-glibc-2.23/b1847cb
|
||||
TBB_BASE=/mnt/gvfs/third-party2/tbb/9d9a554877d0c5bef330fe818ab7178806dd316a/4.0_update2/gcc-4.9-glibc-2.20/e9936bf
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/7c111ff27e0c466235163f00f280a9d617c3d2ec/4.0.9-36_fbk5_2933_gd092e3f/gcc-5-glibc-2.23/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/b7fd454c4b10c6a81015d4524ed06cdeab558490/2.26/centos6-native/da39a3e
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d7f4d4d86674a57668e3a96f76f0e17dd0eb8765/3.10.0/gcc-4.9-glibc-2.20/e9936bf
|
||||
LUA_BASE=/mnt/gvfs/third-party2/lua/61e4abf5813bbc39bc4f548757ccfcadde175a48/5.2.3/gcc-4.9-glibc-2.20/690f0d7
|
||||
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
|
||||
GCC_BASE=/mnt/gvfs/third-party2/gcc/7331085db891a2ef4a88a48a751d834e8d68f4cb/7.x/centos7-native/b2ef2b6
|
||||
CLANG_BASE=/mnt/gvfs/third-party2/llvm-fb/963d9aeda70cc4779885b1277484fe7544a04e3e/9.0.0/platform007/9e92d53/
|
||||
LIBGCC_BASE=/mnt/gvfs/third-party2/libgcc/6ace84e956873d53638c738b6f65f3f469cca74c/7.x/platform007/5620abc
|
||||
GLIBC_BASE=/mnt/gvfs/third-party2/glibc/192b0f42d63dcf6210d6ceae387b49af049e6e0c/2.26/platform007/f259413
|
||||
SNAPPY_BASE=/mnt/gvfs/third-party2/snappy/7f9bdaada18f59bc27ec2b0871eb8a6144343aef/1.1.3/platform007/ca4da3d
|
||||
ZLIB_BASE=/mnt/gvfs/third-party2/zlib/2d9f0b9a4274cc21f61272a9e89bdb859bce8f1f/1.2.8/platform007/ca4da3d
|
||||
BZIP2_BASE=/mnt/gvfs/third-party2/bzip2/dc49a21c5fceec6456a7a28a94dcd16690af1337/1.0.6/platform007/ca4da3d
|
||||
LZ4_BASE=/mnt/gvfs/third-party2/lz4/0f607f8fc442ea7d6b876931b1898bb573d5e5da/1.9.1/platform007/ca4da3d
|
||||
ZSTD_BASE=/mnt/gvfs/third-party2/zstd/ca22bc441a4eb709e9e0b1f9fec9750fed7b31c5/1.4.x/platform007/15a3614
|
||||
GFLAGS_BASE=/mnt/gvfs/third-party2/gflags/0b9929d2588991c65a57168bf88aff2db87c5d48/2.2.0/platform007/ca4da3d
|
||||
JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b863246eb/master/platform007/c26c002
|
||||
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
|
||||
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
|
||||
LUA_BASE=/mnt/gvfs/third-party2/lua/f0cd714433206d5139df61659eb7b28b1dea6683/5.3.4/platform007/5007832
|
||||
|
@ -13,7 +13,7 @@ source "$BASEDIR/dependencies.sh"
|
||||
CFLAGS=""
|
||||
|
||||
# libgcc
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include"
|
||||
LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/7.3.0"
|
||||
LIBGCC_LIBS=" -L $LIBGCC_BASE/lib"
|
||||
|
||||
# glibc
|
||||
@ -43,12 +43,16 @@ if test -z $PIC_BUILD; then
|
||||
LZ4_INCLUDE=" -I $LZ4_BASE/include/"
|
||||
LZ4_LIBS=" $LZ4_BASE/lib/liblz4.a"
|
||||
CFLAGS+=" -DLZ4"
|
||||
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
CFLAGS+=" -DZSTD"
|
||||
fi
|
||||
|
||||
ZSTD_INCLUDE=" -I $ZSTD_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd.a"
|
||||
else
|
||||
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DZSTD"
|
||||
|
||||
# location of gflags headers and libraries
|
||||
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
@ -56,7 +60,7 @@ if test -z $PIC_BUILD; then
|
||||
else
|
||||
GFLAGS_LIBS=" $GFLAGS_BASE/lib/libgflags_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DGFLAGS=google"
|
||||
CFLAGS+=" -DGFLAGS=gflags"
|
||||
|
||||
# location of jemalloc
|
||||
JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/"
|
||||
@ -82,7 +86,7 @@ fi
|
||||
CFLAGS+=" -DTBB"
|
||||
|
||||
# use Intel SSE support for checksum calculations
|
||||
export USE_SSE=1
|
||||
export USE_SSE=" -msse -msse4.2 "
|
||||
|
||||
BINUTILS="$BINUTILS_BASE/bin"
|
||||
AR="$BINUTILS/ar"
|
||||
@ -104,8 +108,8 @@ if [ -z "$USE_CLANG" ]; then
|
||||
CXX="$GCC_BASE/bin/g++"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
JEMALLOC=1
|
||||
else
|
||||
# clang
|
||||
@ -116,8 +120,8 @@ else
|
||||
KERNEL_HEADERS_INCLUDE="$KERNEL_HEADERS_BASE/include"
|
||||
|
||||
CFLAGS+=" -B$BINUTILS/gold -nostdinc -nostdlib"
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/4.9.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x "
|
||||
CFLAGS+=" -isystem $LIBGCC_BASE/include/c++/7.x/x86_64-facebook-linux "
|
||||
CFLAGS+=" -isystem $GLIBC_INCLUDE"
|
||||
CFLAGS+=" -isystem $LIBGCC_INCLUDE"
|
||||
CFLAGS+=" -isystem $CLANG_INCLUDE"
|
||||
@ -128,13 +132,14 @@ else
|
||||
fi
|
||||
|
||||
CFLAGS+=" $DEPS_INCLUDE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
|
||||
CXXFLAGS+=" $CFLAGS"
|
||||
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.9-glibc-2.20/lib/ld.so"
|
||||
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
|
||||
EXEC_LDFLAGS+=" $LIBUNWIND"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/gcc-4.9-glibc-2.20/lib"
|
||||
EXEC_LDFLAGS+=" -Wl,-rpath=/usr/local/fbcode/platform007/lib"
|
||||
# required by libtbb
|
||||
EXEC_LDFLAGS+=" -ldl"
|
||||
|
||||
@ -144,12 +149,4 @@ EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GF
|
||||
|
||||
VALGRIND_VER="$VALGRIND_BASE/bin/"
|
||||
|
||||
LUA_PATH="$LUA_BASE"
|
||||
|
||||
if test -z $PIC_BUILD; then
|
||||
LUA_LIB=" $LUA_PATH/lib/liblua.a"
|
||||
else
|
||||
LUA_LIB=" $LUA_PATH/lib/liblua_pic.a"
|
||||
fi
|
||||
|
||||
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD LUA_PATH LUA_LIB
|
||||
export CC CXX AR CFLAGS CXXFLAGS EXEC_LDFLAGS EXEC_LDFLAGS_SHARED VALGRIND_VER JEMALLOC_LIB JEMALLOC_INCLUDE CLANG_ANALYZER CLANG_SCAN_BUILD
|
||||
|
@ -370,7 +370,8 @@ ColumnFamilyData::ColumnFamilyData(
|
||||
column_family_set_(column_family_set),
|
||||
pending_flush_(false),
|
||||
pending_compaction_(false),
|
||||
prev_compaction_needed_bytes_(0) {
|
||||
prev_compaction_needed_bytes_(0),
|
||||
allow_2pc_(db_options.allow_2pc) {
|
||||
Ref();
|
||||
|
||||
// Convert user defined table properties collector factories to internal ones.
|
||||
@ -492,6 +493,25 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
|
||||
return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
|
||||
}
|
||||
|
||||
uint64_t ColumnFamilyData::OldestLogToKeep() {
|
||||
auto current_log = GetLogNumber();
|
||||
|
||||
if (allow_2pc_) {
|
||||
auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
|
||||
auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
|
||||
|
||||
if (imm_prep_log > 0 && imm_prep_log < current_log) {
|
||||
current_log = imm_prep_log;
|
||||
}
|
||||
|
||||
if (mem_prep_log > 0 && mem_prep_log < current_log) {
|
||||
current_log = mem_prep_log;
|
||||
}
|
||||
}
|
||||
|
||||
return current_log;
|
||||
}
|
||||
|
||||
const double kIncSlowdownRatio = 0.8;
|
||||
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
|
||||
const double kNearStopSlowdownRatio = 0.6;
|
||||
|
@ -239,6 +239,9 @@ class ColumnFamilyData {
|
||||
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
|
||||
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
|
||||
|
||||
// calculate the oldest log needed for the durability of this column family
|
||||
uint64_t OldestLogToKeep();
|
||||
|
||||
// See Memtable constructor for explanation of earliest_seq param.
|
||||
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
|
||||
SequenceNumber earliest_seq);
|
||||
@ -404,6 +407,9 @@ class ColumnFamilyData {
|
||||
bool pending_compaction_;
|
||||
|
||||
uint64_t prev_compaction_needed_bytes_;
|
||||
|
||||
// if the database was opened with 2pc enabled
|
||||
bool allow_2pc_;
|
||||
};
|
||||
|
||||
// ColumnFamilySet has interesting thread-safety requirements
|
||||
|
@ -253,6 +253,61 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
|
||||
delete db;
|
||||
}
|
||||
|
||||
TEST_F(CompactFilesTest, CompactionFilterWithGetSv) {
|
||||
class FilterWithGet : public CompactionFilter {
|
||||
public:
|
||||
virtual bool Filter(int level, const Slice& key, const Slice& value,
|
||||
std::string* new_value,
|
||||
bool* value_changed) const override {
|
||||
if (db_ == nullptr) {
|
||||
return true;
|
||||
}
|
||||
std::string res;
|
||||
db_->Get(ReadOptions(), "", &res);
|
||||
return true;
|
||||
}
|
||||
|
||||
void SetDB(DB* db) {
|
||||
db_ = db;
|
||||
}
|
||||
|
||||
virtual const char* Name() const override { return "FilterWithGet"; }
|
||||
|
||||
private:
|
||||
DB* db_;
|
||||
};
|
||||
|
||||
|
||||
std::shared_ptr<FilterWithGet> cf(new FilterWithGet());
|
||||
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
options.compaction_filter = cf.get();
|
||||
|
||||
DB* db = nullptr;
|
||||
DestroyDB(db_name_, options);
|
||||
Status s = DB::Open(options, db_name_, &db);
|
||||
ASSERT_OK(s);
|
||||
|
||||
cf->SetDB(db);
|
||||
|
||||
// Write one L0 file
|
||||
db->Put(WriteOptions(), "K1", "V1");
|
||||
db->Flush(FlushOptions());
|
||||
|
||||
// Compact all L0 files using CompactFiles
|
||||
rocksdb::ColumnFamilyMetaData meta;
|
||||
db->GetColumnFamilyMetaData(&meta);
|
||||
for (auto& file : meta.levels[0].files) {
|
||||
std::string fname = file.db_path + "/" + file.name;
|
||||
ASSERT_OK(
|
||||
db->CompactFiles(rocksdb::CompactionOptions(), {fname}, 0));
|
||||
}
|
||||
|
||||
|
||||
delete db;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -771,9 +771,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
||||
key, sub_compact->current_output_file_size) &&
|
||||
sub_compact->builder != nullptr) {
|
||||
CompactionIterationStats range_del_out_stats;
|
||||
status =
|
||||
FinishCompactionOutputFile(input->status(), sub_compact,
|
||||
range_del_agg.get(), &range_del_out_stats);
|
||||
status = FinishCompactionOutputFile(input->status(), sub_compact,
|
||||
range_del_agg.get(),
|
||||
&range_del_out_stats, &key);
|
||||
RecordDroppedKeys(range_del_out_stats,
|
||||
&sub_compact->compaction_job_stats);
|
||||
if (!status.ok()) {
|
||||
|
@ -9,6 +9,7 @@
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "util/fault_injection_test_env.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -47,6 +48,38 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
|
||||
#endif // ROCKSDB_LITE
|
||||
}
|
||||
|
||||
TEST_F(DBFlushTest, SyncFail) {
|
||||
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
|
||||
new FaultInjectionTestEnv(Env::Default()));
|
||||
Options options;
|
||||
options.disable_auto_compactions = true;
|
||||
options.env = fault_injection_env.get();
|
||||
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
|
||||
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Reopen(options);
|
||||
Put("key", "value");
|
||||
auto* cfd =
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
|
||||
->cfd();
|
||||
int refs_before = cfd->current()->TEST_refs();
|
||||
FlushOptions flush_options;
|
||||
flush_options.wait = false;
|
||||
ASSERT_OK(dbfull()->Flush(flush_options));
|
||||
fault_injection_env->SetFilesystemActive(false);
|
||||
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
|
||||
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
|
||||
fault_injection_env->SetFilesystemActive(true);
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
ASSERT_EQ("", FilesPerLevel()); // flush failed.
|
||||
// Flush job should release ref count to current version.
|
||||
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -342,6 +342,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
||||
last_stats_dump_time_microsec_(0),
|
||||
next_job_id_(1),
|
||||
has_unpersisted_data_(false),
|
||||
unable_to_flush_oldest_log_(false),
|
||||
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
|
||||
num_running_ingest_file_(0),
|
||||
#ifndef ROCKSDB_LITE
|
||||
@ -663,6 +664,10 @@ void DBImpl::MaybeDumpStats() {
|
||||
}
|
||||
|
||||
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
|
||||
if (!allow_2pc()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t min_log = 0;
|
||||
|
||||
// we must look through the memtables for two phase transactions
|
||||
@ -707,6 +712,11 @@ void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
|
||||
}
|
||||
|
||||
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
|
||||
|
||||
if (!allow_2pc()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
|
||||
uint64_t min_log = 0;
|
||||
|
||||
@ -1832,6 +1842,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||
}
|
||||
|
||||
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
|
||||
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
|
||||
mutex_.AssertHeld();
|
||||
autovector<log::Writer*, 1> logs_to_sync;
|
||||
uint64_t current_log_number = logfile_number_;
|
||||
@ -1868,6 +1879,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
|
||||
MarkLogsSynced(current_log_number - 1, true, s);
|
||||
if (!s.ok()) {
|
||||
bg_error_ = s;
|
||||
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
|
||||
return s;
|
||||
}
|
||||
}
|
||||
@ -1918,6 +1930,8 @@ Status DBImpl::FlushMemTableToOutputFile(
|
||||
// is unlocked by the current thread.
|
||||
if (s.ok()) {
|
||||
s = flush_job.Run(&file_meta);
|
||||
} else {
|
||||
flush_job.Cancel();
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
@ -2135,7 +2149,7 @@ Status DBImpl::CompactFiles(
|
||||
immutable_db_options_.info_log.get());
|
||||
|
||||
// Perform CompactFiles
|
||||
SuperVersion* sv = GetAndRefSuperVersion(cfd);
|
||||
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
|
||||
@ -2147,7 +2161,12 @@ Status DBImpl::CompactFiles(
|
||||
input_file_names, output_level,
|
||||
output_path_id, &job_context, &log_buffer);
|
||||
}
|
||||
ReturnAndCleanupSuperVersion(cfd, sv);
|
||||
if (sv->Unref()) {
|
||||
mutex_.Lock();
|
||||
sv->Cleanup();
|
||||
mutex_.Unlock();
|
||||
delete sv;
|
||||
}
|
||||
|
||||
// Find and delete obsolete files
|
||||
{
|
||||
@ -2505,7 +2524,7 @@ Status DBImpl::SetDBOptions(
|
||||
mutable_db_options_ = new_options;
|
||||
|
||||
if (total_log_size_ > GetMaxTotalWalSize()) {
|
||||
FlushColumnFamilies();
|
||||
MaybeFlushColumnFamilies();
|
||||
}
|
||||
|
||||
persist_options_status = PersistOptions();
|
||||
@ -2752,7 +2771,7 @@ void DBImpl::MarkLogsSynced(
|
||||
++it;
|
||||
}
|
||||
}
|
||||
assert(logs_.empty() || logs_[0].number > up_to ||
|
||||
assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
|
||||
(logs_.size() == 1 && !logs_[0].getting_synced));
|
||||
log_sync_cv_.SignalAll();
|
||||
}
|
||||
@ -4698,9 +4717,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
||||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
|
||||
|
||||
if (UNLIKELY(!single_column_family_mode_ &&
|
||||
!alive_log_files_.begin()->getting_flushed &&
|
||||
total_log_size_ > GetMaxTotalWalSize())) {
|
||||
FlushColumnFamilies();
|
||||
MaybeFlushColumnFamilies();
|
||||
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
|
||||
// Before a new memtable is added in SwitchMemtable(),
|
||||
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
|
||||
@ -5018,28 +5036,40 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
||||
return status;
|
||||
}
|
||||
|
||||
void DBImpl::FlushColumnFamilies() {
|
||||
void DBImpl::MaybeFlushColumnFamilies() {
|
||||
mutex_.AssertHeld();
|
||||
|
||||
WriteContext context;
|
||||
|
||||
if (alive_log_files_.begin()->getting_flushed) {
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
|
||||
alive_log_files_.begin()->getting_flushed = true;
|
||||
auto oldest_alive_log = alive_log_files_.begin()->number;
|
||||
auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
|
||||
|
||||
if (allow_2pc() &&
|
||||
unable_to_flush_oldest_log_ &&
|
||||
oldest_log_with_uncommited_prep > 0 &&
|
||||
oldest_log_with_uncommited_prep <= oldest_alive_log) {
|
||||
// we already attempted to flush all column families dependent on
|
||||
// the oldest alive log but the log still contained uncommitted transactions.
|
||||
// the oldest alive log STILL contains uncommitted transactions so there
|
||||
// is still nothing that we can do.
|
||||
return;
|
||||
}
|
||||
|
||||
WriteContext context;
|
||||
|
||||
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
|
||||
"Flushing all column families with data in WAL number %" PRIu64
|
||||
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
|
||||
flush_column_family_if_log_file, total_log_size_, GetMaxTotalWalSize());
|
||||
oldest_alive_log, total_log_size_, GetMaxTotalWalSize());
|
||||
// no need to refcount because drop is happening in write thread, so can't
|
||||
// happen while we're in the write thread
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
|
||||
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
|
||||
auto status = SwitchMemtable(cfd, &context);
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
@ -5049,6 +5079,26 @@ void DBImpl::FlushColumnFamilies() {
|
||||
}
|
||||
}
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
|
||||
// we only mark this log as getting flushed if we have successfully
|
||||
// flushed all data in this log. If this log contains outstanding prepared
|
||||
// transactions then we cannot flush this log until those transactions are committed.
|
||||
|
||||
unable_to_flush_oldest_log_ = false;
|
||||
|
||||
if (allow_2pc()) {
|
||||
if (oldest_log_with_uncommited_prep == 0 ||
|
||||
oldest_log_with_uncommited_prep > oldest_alive_log) {
|
||||
// this log contains no outstanding prepared transactions
|
||||
alive_log_files_.begin()->getting_flushed = true;
|
||||
} else {
|
||||
Log(InfoLogLevel::WARN_LEVEL, immutable_db_options_.info_log,
|
||||
"Unable to release oldest log due to uncommited transaction");
|
||||
unable_to_flush_oldest_log_ = true;
|
||||
}
|
||||
} else {
|
||||
alive_log_files_.begin()->getting_flushed = true;
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t DBImpl::GetMaxTotalWalSize() const {
|
||||
|
21
db/db_impl.h
21
db/db_impl.h
@ -308,6 +308,16 @@ class DBImpl : public DB {
|
||||
ColumnFamilyHandle* column_family = nullptr,
|
||||
bool disallow_trivial_move = false);
|
||||
|
||||
void TEST_MaybeFlushColumnFamilies();
|
||||
|
||||
bool TEST_UnableToFlushOldestLog() {
|
||||
return unable_to_flush_oldest_log_;
|
||||
}
|
||||
|
||||
bool TEST_IsLogGettingFlushed() {
|
||||
return alive_log_files_.begin()->getting_flushed;
|
||||
}
|
||||
|
||||
// Force current memtable contents to be flushed.
|
||||
Status TEST_FlushMemTable(bool wait = true,
|
||||
ColumnFamilyHandle* cfh = nullptr);
|
||||
@ -734,7 +744,7 @@ class DBImpl : public DB {
|
||||
// REQUIRES: mutex locked
|
||||
Status PersistOptions();
|
||||
|
||||
void FlushColumnFamilies();
|
||||
void MaybeFlushColumnFamilies();
|
||||
|
||||
uint64_t GetMaxTotalWalSize() const;
|
||||
|
||||
@ -994,6 +1004,15 @@ class DBImpl : public DB {
|
||||
// Used when disableWAL is true.
|
||||
bool has_unpersisted_data_;
|
||||
|
||||
|
||||
// if an attempt was made to flush all column families that
|
||||
// the oldest log depends on but uncommitted data in the oldest
|
||||
// log prevents the log from being released.
|
||||
// We must attempt to free the dependent memtables again
|
||||
// at a later time after the transaction in the oldest
|
||||
// log is fully committed.
|
||||
bool unable_to_flush_oldest_log_;
|
||||
|
||||
static const int KEEP_LOG_FILE_NUM = 1000;
|
||||
// MSVC version 1800 still does not have constexpr for ::max()
|
||||
static const uint64_t kNoTimeOut = port::kMaxUint64;
|
||||
|
@ -19,6 +19,11 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
|
||||
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
|
||||
}
|
||||
|
||||
void DBImpl::TEST_MaybeFlushColumnFamilies() {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
MaybeFlushColumnFamilies();
|
||||
}
|
||||
|
||||
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
|
||||
ColumnFamilyHandle* column_family) {
|
||||
ColumnFamilyData* cfd;
|
||||
|
@ -341,6 +341,7 @@ void DBIter::Next() {
|
||||
//
|
||||
// NOTE: In between, saved_key_ can point to a user key that has
|
||||
// a delete marker or a sequence number higher than sequence_
|
||||
// saved_key_ MUST have a proper user_key before calling this function
|
||||
//
|
||||
// The prefix_check parameter controls whether we check the iterated
|
||||
// keys against the prefix of the seeked key. Set to false when
|
||||
@ -946,7 +947,6 @@ void DBIter::Seek(const Slice& target) {
|
||||
StopWatch sw(env_, statistics_, DB_SEEK);
|
||||
ReleaseTempPinnedData();
|
||||
saved_key_.Clear();
|
||||
// now saved_key is used to store internal key.
|
||||
saved_key_.SetInternalKey(target, sequence_);
|
||||
|
||||
{
|
||||
@ -961,6 +961,9 @@ void DBIter::Seek(const Slice& target) {
|
||||
}
|
||||
direction_ = kForward;
|
||||
ClearSavedValue();
|
||||
// convert the InternalKey to UserKey in saved_key_ before
|
||||
// passed to FindNextUserEntry
|
||||
saved_key_.Reserve(saved_key_.Size() - 8);
|
||||
FindNextUserEntry(false /* not skipping */, prefix_same_as_start_);
|
||||
if (!valid_) {
|
||||
prefix_start_key_.clear();
|
||||
@ -1039,6 +1042,8 @@ void DBIter::SeekToFirst() {
|
||||
|
||||
RecordTick(statistics_, NUMBER_DB_SEEK);
|
||||
if (iter_->Valid()) {
|
||||
saved_key_.SetKey(ExtractUserKey(iter_->key()),
|
||||
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
|
||||
FindNextUserEntry(false /* not skipping */, false /* no prefix check */);
|
||||
if (statistics_ != nullptr) {
|
||||
if (valid_) {
|
||||
|
@ -1897,6 +1897,66 @@ TEST_F(DBIteratorTest, DBIterator12) {
|
||||
ASSERT_FALSE(db_iter->Valid());
|
||||
}
|
||||
|
||||
TEST_F(DBIteratorTest, DBIterator13) {
|
||||
Options options;
|
||||
options.merge_operator = nullptr;
|
||||
|
||||
std::string key;
|
||||
key.resize(9);
|
||||
key.assign(9, static_cast<char>(0));
|
||||
key[0] = 'b';
|
||||
|
||||
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
|
||||
internal_iter->AddPut(key, "0");
|
||||
internal_iter->AddPut(key, "1");
|
||||
internal_iter->AddPut(key, "2");
|
||||
internal_iter->AddPut(key, "3");
|
||||
internal_iter->AddPut(key, "4");
|
||||
internal_iter->AddPut(key, "5");
|
||||
internal_iter->AddPut(key, "6");
|
||||
internal_iter->AddPut(key, "7");
|
||||
internal_iter->AddPut(key, "8");
|
||||
internal_iter->Finish();
|
||||
|
||||
std::unique_ptr<Iterator> db_iter(
|
||||
NewDBIterator(env_, ImmutableCFOptions(options), BytewiseComparator(),
|
||||
internal_iter, 2, 3, 0));
|
||||
db_iter->Seek("b");
|
||||
ASSERT_TRUE(db_iter->Valid());
|
||||
ASSERT_EQ(db_iter->key().ToString(), key);
|
||||
ASSERT_EQ(db_iter->value().ToString(), "2");
|
||||
}
|
||||
|
||||
TEST_F(DBIteratorTest, DBIterator14) {
|
||||
Options options;
|
||||
options.merge_operator = nullptr;
|
||||
|
||||
std::string key("b");
|
||||
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
|
||||
internal_iter->AddPut("b", "0");
|
||||
internal_iter->AddPut("b", "1");
|
||||
internal_iter->AddPut("b", "2");
|
||||
internal_iter->AddPut("b", "3");
|
||||
internal_iter->AddPut("a", "4");
|
||||
internal_iter->AddPut("a", "5");
|
||||
internal_iter->AddPut("a", "6");
|
||||
internal_iter->AddPut("c", "7");
|
||||
internal_iter->AddPut("c", "8");
|
||||
internal_iter->AddPut("c", "9");
|
||||
internal_iter->Finish();
|
||||
|
||||
std::unique_ptr<Iterator> db_iter(
|
||||
NewDBIterator(env_, ImmutableCFOptions(options), BytewiseComparator(),
|
||||
internal_iter, 4, 1, 0));
|
||||
db_iter->Seek("b");
|
||||
ASSERT_TRUE(db_iter->Valid());
|
||||
ASSERT_EQ(db_iter->key().ToString(), "b");
|
||||
ASSERT_EQ(db_iter->value().ToString(), "3");
|
||||
db_iter->SeekToFirst();
|
||||
ASSERT_EQ(db_iter->key().ToString(), "a");
|
||||
ASSERT_EQ(db_iter->value().ToString(), "4");
|
||||
}
|
||||
|
||||
class DBIterWithMergeIterTest : public testing::Test {
|
||||
public:
|
||||
DBIterWithMergeIterTest()
|
||||
|
@ -112,6 +112,81 @@ TEST_F(DBRangeDelTest, CompactionOutputFilesExactlyFilled) {
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
|
||||
// Ensures range deletion spanning multiple compaction output files that are
|
||||
// cut by max_compaction_bytes will have non-overlapping key-ranges.
|
||||
// https://github.com/facebook/rocksdb/issues/1778
|
||||
const int kNumFiles = 2, kNumPerFile = 1 << 8, kBytesPerVal = 1 << 12;
|
||||
Options opts = CurrentOptions();
|
||||
opts.comparator = test::Uint64Comparator();
|
||||
opts.disable_auto_compactions = true;
|
||||
opts.level0_file_num_compaction_trigger = kNumFiles;
|
||||
opts.max_compaction_bytes = kNumPerFile * kBytesPerVal;
|
||||
opts.memtable_factory.reset(new SpecialSkipListFactory(kNumPerFile));
|
||||
// Want max_compaction_bytes to trigger the end of compaction output file, not
|
||||
// target_file_size_base, so make the latter much bigger
|
||||
opts.target_file_size_base = 100 * opts.max_compaction_bytes;
|
||||
Reopen(opts);
|
||||
|
||||
// snapshot protects range tombstone from dropping due to becoming obsolete.
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
|
||||
// It spans the whole key-range, thus will be included in all output files
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
GetNumericStr(0),
|
||||
GetNumericStr(kNumFiles * kNumPerFile - 1)));
|
||||
Random rnd(301);
|
||||
for (int i = 0; i < kNumFiles; ++i) {
|
||||
std::vector<std::string> values;
|
||||
// Write 1MB (256 values, each 4K)
|
||||
for (int j = 0; j < kNumPerFile; j++) {
|
||||
values.push_back(RandomString(&rnd, kBytesPerVal));
|
||||
ASSERT_OK(Put(GetNumericStr(kNumPerFile * i + j), values[j]));
|
||||
}
|
||||
// extra entry to trigger SpecialSkipListFactory's flush
|
||||
ASSERT_OK(Put(GetNumericStr(kNumPerFile), ""));
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
|
||||
}
|
||||
|
||||
dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
|
||||
true /* disallow_trivial_move */);
|
||||
ASSERT_EQ(0, NumTableFilesAtLevel(0));
|
||||
ASSERT_GE(NumTableFilesAtLevel(1), 2);
|
||||
|
||||
std::vector<std::vector<FileMetaData>> files;
|
||||
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
|
||||
|
||||
for (size_t i = 0; i < files[1].size() - 1; ++i) {
|
||||
ASSERT_TRUE(InternalKeyComparator(opts.comparator)
|
||||
.Compare(files[1][i].largest, files[1][i + 1].smallest) <
|
||||
0);
|
||||
}
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, SentinelsOmittedFromOutputFile) {
|
||||
// Regression test for bug where sentinel range deletions (i.e., ones with
|
||||
// sequence number of zero) were included in output files.
|
||||
// snapshot protects range tombstone from dropping due to becoming obsolete.
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
|
||||
// gaps between ranges create sentinels in our internal representation
|
||||
std::vector<std::pair<std::string, std::string>> range_dels = {{"a", "b"}, {"c", "d"}, {"e", "f"}};
|
||||
for (const auto& range_del : range_dels) {
|
||||
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||
range_del.first, range_del.second));
|
||||
}
|
||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||
ASSERT_EQ(1, NumTableFilesAtLevel(0));
|
||||
|
||||
std::vector<std::vector<FileMetaData>> files;
|
||||
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
|
||||
ASSERT_GT(files[0][0].smallest_seqno, 0);
|
||||
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, FlushRangeDelsSameStartKey) {
|
||||
db_->Put(WriteOptions(), "b1", "val");
|
||||
ASSERT_OK(
|
||||
@ -475,6 +550,36 @@ TEST_F(DBRangeDelTest, GetCoveredKeyFromSst) {
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, GetCoveredMergeOperandFromMemtable) {
|
||||
const int kNumMergeOps = 10;
|
||||
Options opts = CurrentOptions();
|
||||
opts.merge_operator = MergeOperators::CreateUInt64AddOperator();
|
||||
Reopen(opts);
|
||||
|
||||
for (int i = 0; i < kNumMergeOps; ++i) {
|
||||
std::string val;
|
||||
PutFixed64(&val, i);
|
||||
db_->Merge(WriteOptions(), "key", val);
|
||||
if (i == kNumMergeOps / 2) {
|
||||
// deletes [0, 5]
|
||||
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "key",
|
||||
"key_");
|
||||
}
|
||||
}
|
||||
|
||||
ReadOptions read_opts;
|
||||
std::string expected, actual;
|
||||
ASSERT_OK(db_->Get(read_opts, "key", &actual));
|
||||
PutFixed64(&expected, 30); // 6+7+8+9
|
||||
ASSERT_EQ(expected, actual);
|
||||
|
||||
expected.clear();
|
||||
read_opts.ignore_range_deletions = true;
|
||||
ASSERT_OK(db_->Get(read_opts, "key", &actual));
|
||||
PutFixed64(&expected, 45); // 0+1+2+...+9
|
||||
ASSERT_EQ(expected, actual);
|
||||
}
|
||||
|
||||
TEST_F(DBRangeDelTest, GetIgnoresRangeDeletions) {
|
||||
Options opts = CurrentOptions();
|
||||
opts.max_write_buffer_number = 4;
|
||||
|
@ -2216,8 +2216,40 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, OptimizeForPointLookup) {
|
||||
Options options = CurrentOptions();
|
||||
Close();
|
||||
options.OptimizeForPointLookup(2);
|
||||
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
||||
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
ASSERT_EQ("v1", Get("foo"));
|
||||
Flush();
|
||||
ASSERT_EQ("v1", Get("foo"));
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
TEST_F(DBTest2, DirectIO) {
|
||||
if (!IsDirectIOSupported()) {
|
||||
return;
|
||||
}
|
||||
Options options = CurrentOptions();
|
||||
options.use_direct_reads = options.use_direct_writes = true;
|
||||
options.allow_mmap_reads = options.allow_mmap_writes = false;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
ASSERT_OK(Put(Key(0), "a"));
|
||||
ASSERT_OK(Put(Key(5), "a"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
ASSERT_OK(Put(Key(10), "a"));
|
||||
ASSERT_OK(Put(Key(15), "a"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||
Reopen(options);
|
||||
}
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -231,6 +231,20 @@ class SpecialEnv : public EnvWrapper {
|
||||
return base_->Append(data);
|
||||
}
|
||||
}
|
||||
Status PositionedAppend(const Slice& data, uint64_t offset) override {
|
||||
if (env_->table_write_callback_) {
|
||||
(*env_->table_write_callback_)();
|
||||
}
|
||||
if (env_->drop_writes_.load(std::memory_order_acquire)) {
|
||||
// Drop writes on the floor
|
||||
return Status::OK();
|
||||
} else if (env_->no_space_.load(std::memory_order_acquire)) {
|
||||
return Status::NoSpace("No space left on device");
|
||||
} else {
|
||||
env_->bytes_written_ += data.size();
|
||||
return base_->PositionedAppend(data, offset);
|
||||
}
|
||||
}
|
||||
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
|
||||
Status Close() override {
|
||||
// SyncPoint is not supported in Released Windows Mode.
|
||||
@ -257,6 +271,9 @@ class SpecialEnv : public EnvWrapper {
|
||||
Env::IOPriority GetIOPriority() override {
|
||||
return base_->GetIOPriority();
|
||||
}
|
||||
bool use_direct_io() const override {
|
||||
return base_->use_direct_io();
|
||||
}
|
||||
};
|
||||
class ManifestFile : public WritableFile {
|
||||
public:
|
||||
@ -358,7 +375,14 @@ class SpecialEnv : public EnvWrapper {
|
||||
return Status::IOError("simulated write error");
|
||||
}
|
||||
|
||||
Status s = target()->NewWritableFile(f, r, soptions);
|
||||
EnvOptions optimized = soptions;
|
||||
if (strstr(f.c_str(), "MANIFEST") != nullptr ||
|
||||
strstr(f.c_str(), "log") != nullptr) {
|
||||
optimized.use_mmap_writes = false;
|
||||
optimized.use_direct_writes = false;
|
||||
}
|
||||
|
||||
Status s = target()->NewWritableFile(f, r, optimized);
|
||||
if (s.ok()) {
|
||||
if (strstr(f.c_str(), ".sst") != nullptr) {
|
||||
r->reset(new SSTableFile(this, std::move(*r)));
|
||||
|
@ -230,6 +230,12 @@ Status FlushJob::Run(FileMetaData* file_meta) {
|
||||
return s;
|
||||
}
|
||||
|
||||
void FlushJob::Cancel() {
|
||||
db_mutex_->AssertHeld();
|
||||
assert(base_ != nullptr);
|
||||
base_->Unref();
|
||||
}
|
||||
|
||||
Status FlushJob::WriteLevel0Table() {
|
||||
AutoThreadOperationStageUpdater stage_updater(
|
||||
ThreadStatus::STAGE_FLUSH_WRITE_L0);
|
||||
|
@ -67,9 +67,11 @@ class FlushJob {
|
||||
|
||||
~FlushJob();
|
||||
|
||||
// Require db_mutex held
|
||||
// Require db_mutex held.
|
||||
// Once PickMemTable() is called, either Run() or Cancel() has to be called.
|
||||
void PickMemTable();
|
||||
Status Run(FileMetaData* file_meta = nullptr);
|
||||
void Cancel();
|
||||
TableProperties GetTableProperties() const { return table_properties_; }
|
||||
|
||||
private:
|
||||
|
@ -865,7 +865,7 @@ void InternalStats::DumpCFMapStats(std::map<std::string, double>* cf_stats) {
|
||||
}
|
||||
}
|
||||
|
||||
int InternalStats::DumpCFMapStats(
|
||||
void InternalStats::DumpCFMapStats(
|
||||
std::map<int, std::map<LevelStatType, double>>* levels_stats,
|
||||
CompactionStats* compaction_stats_sum) {
|
||||
const VersionStorageInfo* vstorage = cfd_->current()->storage_info();
|
||||
@ -925,7 +925,6 @@ int InternalStats::DumpCFMapStats(
|
||||
PrepareLevelStats(&sum_stats, total_files, total_files_being_compacted,
|
||||
total_file_size, 0, w_amp, *compaction_stats_sum);
|
||||
(*levels_stats)[-1] = sum_stats; // -1 is for the Sum level
|
||||
return num_levels_to_check;
|
||||
}
|
||||
|
||||
void InternalStats::DumpCFStats(std::string* value) {
|
||||
@ -937,8 +936,8 @@ void InternalStats::DumpCFStats(std::string* value) {
|
||||
// Print stats for each level
|
||||
std::map<int, std::map<LevelStatType, double>> levels_stats;
|
||||
CompactionStats compaction_stats_sum(0);
|
||||
int levels = DumpCFMapStats(&levels_stats, &compaction_stats_sum);
|
||||
for (int l = 0; l < levels; ++l) {
|
||||
DumpCFMapStats(&levels_stats, &compaction_stats_sum);
|
||||
for (int l = 0; l < number_levels_; ++l) {
|
||||
if (levels_stats.find(l) != levels_stats.end()) {
|
||||
PrintLevelStats(buf, sizeof(buf), "L" + ToString(l), levels_stats[l]);
|
||||
value->append(buf);
|
||||
|
@ -273,7 +273,7 @@ class InternalStats {
|
||||
private:
|
||||
void DumpDBStats(std::string* value);
|
||||
void DumpCFMapStats(std::map<std::string, double>* cf_stats);
|
||||
int DumpCFMapStats(
|
||||
void DumpCFMapStats(
|
||||
std::map<int, std::map<LevelStatType, double>>* level_stats,
|
||||
CompactionStats* compaction_stats_sum);
|
||||
void DumpCFStats(std::string* value);
|
||||
|
@ -560,7 +560,7 @@ static bool SaveValue(void* arg, const char* entry) {
|
||||
ValueType type;
|
||||
UnPackSequenceAndType(tag, &s->seq, &type);
|
||||
|
||||
if ((type == kTypeValue || type == kTypeDeletion) &&
|
||||
if ((type == kTypeValue || type == kTypeMerge) &&
|
||||
range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) {
|
||||
type = kTypeRangeDeletion;
|
||||
}
|
||||
|
@ -221,6 +221,8 @@ class MemTableList {
|
||||
// PickMemtablesToFlush() is called.
|
||||
void FlushRequested() { flush_requested_ = true; }
|
||||
|
||||
bool HasFlushRequested() { return flush_requested_; }
|
||||
|
||||
// Copying allowed
|
||||
// MemTableList(const MemTableList&);
|
||||
// void operator=(const MemTableList&);
|
||||
|
@ -420,9 +420,10 @@ void RangeDelAggregator::AddToBuilder(
|
||||
RangeTombstone tombstone;
|
||||
if (collapse_deletions_) {
|
||||
auto next_tombstone_map_iter = std::next(tombstone_map_iter);
|
||||
if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end()) {
|
||||
// it's the sentinel tombstone
|
||||
break;
|
||||
if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end() ||
|
||||
tombstone_map_iter->second.seq_ == 0) {
|
||||
// it's a sentinel tombstone
|
||||
continue;
|
||||
}
|
||||
tombstone.start_key_ = tombstone_map_iter->first;
|
||||
tombstone.end_key_ = next_tombstone_map_iter->first;
|
||||
|
@ -520,6 +520,8 @@ class Version {
|
||||
return next_;
|
||||
}
|
||||
|
||||
int TEST_refs() const { return refs_; }
|
||||
|
||||
VersionStorageInfo* storage_info() { return &storage_info_; }
|
||||
|
||||
VersionSet* version_set() { return vset_; }
|
||||
|
@ -317,7 +317,7 @@ void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
|
||||
|
||||
while (w != pg->last_writer) {
|
||||
// Writers that won't write don't get sequence allotment
|
||||
if (!w->CallbackFailed()) {
|
||||
if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
|
||||
sequence += WriteBatchInternal::Count(w->batch);
|
||||
}
|
||||
w = w->link_newer;
|
||||
|
@ -5,8 +5,8 @@
|
||||
#pragma once
|
||||
|
||||
#define ROCKSDB_MAJOR 5
|
||||
#define ROCKSDB_MINOR 0
|
||||
#define ROCKSDB_PATCH 0
|
||||
#define ROCKSDB_MINOR 1
|
||||
#define ROCKSDB_PATCH 5
|
||||
|
||||
// Do not use these. We made the mistake of declaring macros starting with
|
||||
// double underscore. Now we have to live with our choice. We'll deprecate these
|
||||
|
@ -149,7 +149,7 @@ Status TestWritableFile::Flush() {
|
||||
|
||||
Status TestWritableFile::Sync() {
|
||||
if (!env_->IsFilesystemActive()) {
|
||||
return Status::OK();
|
||||
return Status::IOError("FaultInjectionTestEnv: not active");
|
||||
}
|
||||
// No need to actually sync.
|
||||
state_.pos_at_last_sync_ = state_.pos_;
|
||||
|
@ -468,13 +468,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
|
||||
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
|
||||
size_t readahead_size)
|
||||
: file_(std::move(file)),
|
||||
readahead_size_(readahead_size),
|
||||
alignment_(file_->GetRequiredBufferAlignment()),
|
||||
readahead_size_(Roundup(readahead_size, alignment_)),
|
||||
forward_calls_(file_->ShouldForwardRawRequest()),
|
||||
buffer_(),
|
||||
buffer_offset_(0),
|
||||
buffer_len_(0) {
|
||||
if (!forward_calls_) {
|
||||
buffer_.reset(new char[readahead_size_]);
|
||||
buffer_.Alignment(alignment_);
|
||||
buffer_.AllocateNewBuffer(readahead_size_ + alignment_);
|
||||
} else if (readahead_size_ > 0) {
|
||||
file_->EnableReadAhead();
|
||||
}
|
||||
@ -500,31 +502,45 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
|
||||
|
||||
std::unique_lock<std::mutex> lk(lock_);
|
||||
|
||||
size_t copied = 0;
|
||||
// if offset between [buffer_offset_, buffer_offset_ + buffer_len>
|
||||
if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
|
||||
uint64_t offset_in_buffer = offset - buffer_offset_;
|
||||
copied = std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
|
||||
memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
|
||||
if (copied == n) {
|
||||
// fully cached
|
||||
*result = Slice(scratch, n);
|
||||
return Status::OK();
|
||||
}
|
||||
size_t cached_len = 0;
|
||||
// Check if there is a cache hit, i.e. [offset, offset + n) is either
|
||||
// completely or partially in the buffer
|
||||
// If it's completely cached, including end of file case when offset + n is
|
||||
// greater than EOF, return
|
||||
if (TryReadFromCache_(offset, n, &cached_len, scratch) &&
|
||||
(cached_len == n ||
|
||||
// End of file
|
||||
buffer_len_ < readahead_size_ + alignment_)) {
|
||||
*result = Slice(scratch, cached_len);
|
||||
return Status::OK();
|
||||
}
|
||||
size_t advanced_offset = offset + cached_len;
|
||||
// In the case of a cache hit advanced_offset is already aligned, meaning that
|
||||
// chunk_offset equals advanced_offset
|
||||
size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
|
||||
Slice readahead_result;
|
||||
Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
|
||||
buffer_.get());
|
||||
Status s = file_->Read(chunk_offset, readahead_size_ + alignment_,
|
||||
&readahead_result, buffer_.BufferStart());
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
// In the case of cache miss, i.e. when cached_len equals 0, an offset can
|
||||
// exceed the file end position, so the following check is required
|
||||
if (advanced_offset < chunk_offset + readahead_result.size()) {
|
||||
// In the case of cache miss, the first chunk_padding bytes in buffer_ are
|
||||
// stored for alignment only and must be skipped
|
||||
size_t chunk_padding = advanced_offset - chunk_offset;
|
||||
auto remaining_len =
|
||||
std::min(readahead_result.size() - chunk_padding, n - cached_len);
|
||||
memcpy(scratch + cached_len, readahead_result.data() + chunk_padding,
|
||||
remaining_len);
|
||||
*result = Slice(scratch, cached_len + remaining_len);
|
||||
} else {
|
||||
*result = Slice(scratch, cached_len);
|
||||
}
|
||||
|
||||
auto left_to_copy = std::min(readahead_result.size(), n - copied);
|
||||
memcpy(scratch + copied, readahead_result.data(), left_to_copy);
|
||||
*result = Slice(scratch, copied + left_to_copy);
|
||||
|
||||
if (readahead_result.data() == buffer_.get()) {
|
||||
buffer_offset_ = offset + copied;
|
||||
if (readahead_result.data() == buffer_.BufferStart()) {
|
||||
buffer_offset_ = chunk_offset;
|
||||
buffer_len_ = readahead_result.size();
|
||||
} else {
|
||||
buffer_len_ = 0;
|
||||
@ -543,13 +559,31 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
|
||||
return file_->InvalidateCache(offset, length);
|
||||
}
|
||||
|
||||
virtual bool use_direct_io() const override {
|
||||
return file_->use_direct_io();
|
||||
}
|
||||
|
||||
private:
|
||||
bool TryReadFromCache_(uint64_t offset, size_t n, size_t* cached_len,
|
||||
char* scratch) const {
|
||||
if (offset < buffer_offset_ || offset >= buffer_offset_ + buffer_len_) {
|
||||
*cached_len = 0;
|
||||
return false;
|
||||
}
|
||||
uint64_t offset_in_buffer = offset - buffer_offset_;
|
||||
*cached_len =
|
||||
std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
|
||||
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::unique_ptr<RandomAccessFile> file_;
|
||||
const size_t alignment_;
|
||||
size_t readahead_size_;
|
||||
const bool forward_calls_;
|
||||
|
||||
mutable std::mutex lock_;
|
||||
mutable std::unique_ptr<char[]> buffer_;
|
||||
mutable AlignedBuffer buffer_;
|
||||
mutable uint64_t buffer_offset_;
|
||||
mutable size_t buffer_len_;
|
||||
};
|
||||
|
@ -3,10 +3,12 @@
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
#include <vector>
|
||||
#include "util/file_reader_writer.h"
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
#include "util/random.h"
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -125,6 +127,108 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
|
||||
ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
|
||||
}
|
||||
|
||||
class ReadaheadRandomAccessFileTest
|
||||
: public testing::Test,
|
||||
public testing::WithParamInterface<size_t> {
|
||||
public:
|
||||
static std::vector<size_t> GetReadaheadSizeList() {
|
||||
return {1lu << 12, 1lu << 16};
|
||||
}
|
||||
virtual void SetUp() override {
|
||||
readahead_size_ = GetParam();
|
||||
scratch_.reset(new char[2 * readahead_size_]);
|
||||
ResetSourceStr();
|
||||
}
|
||||
ReadaheadRandomAccessFileTest() : control_contents_() {}
|
||||
std::string Read(uint64_t offset, size_t n) {
|
||||
Slice result;
|
||||
test_read_holder_->Read(offset, n, &result, scratch_.get());
|
||||
return std::string(result.data(), result.size());
|
||||
}
|
||||
void ResetSourceStr(const std::string& str = "") {
|
||||
auto write_holder = std::unique_ptr<WritableFileWriter>(
|
||||
test::GetWritableFileWriter(new test::StringSink(&control_contents_)));
|
||||
write_holder->Append(Slice(str));
|
||||
write_holder->Flush();
|
||||
auto read_holder = std::unique_ptr<RandomAccessFile>(
|
||||
new test::StringSource(control_contents_));
|
||||
test_read_holder_ =
|
||||
NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
|
||||
}
|
||||
size_t GetReadaheadSize() const { return readahead_size_; }
|
||||
|
||||
private:
|
||||
size_t readahead_size_;
|
||||
Slice control_contents_;
|
||||
std::unique_ptr<RandomAccessFile> test_read_holder_;
|
||||
std::unique_ptr<char[]> scratch_;
|
||||
};
|
||||
|
||||
TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStrTest) {
|
||||
ASSERT_EQ("", Read(0, 1));
|
||||
ASSERT_EQ("", Read(0, 0));
|
||||
ASSERT_EQ("", Read(13, 13));
|
||||
}
|
||||
|
||||
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) {
|
||||
std::string str = "abcdefghijklmnopqrs";
|
||||
ResetSourceStr(str);
|
||||
ASSERT_EQ(str.substr(3, 4), Read(3, 4));
|
||||
ASSERT_EQ(str.substr(0, 3), Read(0, 3));
|
||||
ASSERT_EQ(str, Read(0, str.size()));
|
||||
ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
|
||||
Read(7, 30));
|
||||
ASSERT_EQ("", Read(100, 100));
|
||||
}
|
||||
|
||||
TEST_P(ReadaheadRandomAccessFileTest,
|
||||
SourceStrLenCanBeGreaterThanReadaheadSizeTest) {
|
||||
Random rng(42);
|
||||
for (int k = 0; k < 100; ++k) {
|
||||
size_t strLen = k * GetReadaheadSize() +
|
||||
rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||
std::string str =
|
||||
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
|
||||
ResetSourceStr(str);
|
||||
for (int test = 1; test <= 100; ++test) {
|
||||
size_t offset = rng.Uniform(static_cast<int>(strLen));
|
||||
size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
|
||||
Read(offset, n));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) {
|
||||
Random rng(7);
|
||||
size_t strLen = 4 * GetReadaheadSize() +
|
||||
rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||
std::string str =
|
||||
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
|
||||
ResetSourceStr(str);
|
||||
for (int test = 1; test <= 100; ++test) {
|
||||
size_t offset = rng.Uniform(static_cast<int>(strLen));
|
||||
size_t n =
|
||||
GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
|
||||
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
|
||||
Read(offset, n));
|
||||
}
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
EmptySourceStrTest, ReadaheadRandomAccessFileTest,
|
||||
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest,
|
||||
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
SourceStrLenCanBeGreaterThanReadaheadSizeTest,
|
||||
ReadaheadRandomAccessFileTest,
|
||||
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
NExceedReadaheadTest, ReadaheadRandomAccessFileTest,
|
||||
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -265,11 +265,6 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
|
||||
// An error: return a non-ok status
|
||||
s = IOError(filename_, errno);
|
||||
}
|
||||
if (!use_direct_io()) {
|
||||
// we need to fadvise away the entire range of pages because
|
||||
// we do not want readahead pages to be cached.
|
||||
Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
|
||||
}
|
||||
*result = Slice(scratch, (r < 0) ? 0 : n - left);
|
||||
return s;
|
||||
}
|
||||
@ -661,6 +656,17 @@ Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status PosixWritableFile::Truncate(uint64_t size) {
|
||||
Status s;
|
||||
int r = ftruncate(fd_, size);
|
||||
if (r < 0) {
|
||||
s = IOError(filename_, errno);
|
||||
} else {
|
||||
filesize_ = size;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status PosixWritableFile::Close() {
|
||||
Status s;
|
||||
|
||||
@ -760,7 +766,9 @@ Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) {
|
||||
return IOError(filename_, errno);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef OS_LINUX
|
||||
Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) {
|
||||
assert(offset <= std::numeric_limits<off_t>::max());
|
||||
assert(nbytes <= std::numeric_limits<off_t>::max());
|
||||
|
@ -93,9 +93,9 @@ class PosixWritableFile : public WritableFile {
|
||||
const EnvOptions& options);
|
||||
virtual ~PosixWritableFile();
|
||||
|
||||
// Means Close() will properly take care of truncate
|
||||
// and it does not need any additional information
|
||||
virtual Status Truncate(uint64_t size) override { return Status::OK(); }
|
||||
// Need to implement this so the file is truncated correctly
|
||||
// with direct I/O
|
||||
virtual Status Truncate(uint64_t size) override;
|
||||
virtual Status Close() override;
|
||||
virtual Status Append(const Slice& data) override;
|
||||
virtual Status PositionedAppend(const Slice& data, uint64_t offset) override;
|
||||
@ -113,6 +113,8 @@ class PosixWritableFile : public WritableFile {
|
||||
virtual Status InvalidateCache(size_t offset, size_t length) override;
|
||||
#ifdef ROCKSDB_FALLOCATE_PRESENT
|
||||
virtual Status Allocate(uint64_t offset, uint64_t len) override;
|
||||
#endif
|
||||
#ifdef OS_LINUX
|
||||
virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override;
|
||||
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
|
||||
#endif
|
||||
|
@ -700,7 +700,7 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForPointLookup(
|
||||
block_based_options.block_cache =
|
||||
NewLRUCache(static_cast<size_t>(block_cache_size_mb * 1024 * 1024));
|
||||
table_factory.reset(new BlockBasedTableFactory(block_based_options));
|
||||
memtable_factory.reset(NewHashLinkListRepFactory());
|
||||
memtable_prefix_bloom_size_ratio = 0.02;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -536,6 +536,10 @@ bool ParseOptionHelper(char* opt_address, const OptionType& opt_type,
|
||||
return ParseEnum<CompactionStyle>(
|
||||
compaction_style_string_map, value,
|
||||
reinterpret_cast<CompactionStyle*>(opt_address));
|
||||
case OptionType::kCompactionPri:
|
||||
return ParseEnum<CompactionPri>(
|
||||
compaction_pri_string_map, value,
|
||||
reinterpret_cast<CompactionPri*>(opt_address));
|
||||
case OptionType::kCompressionType:
|
||||
return ParseEnum<CompressionType>(
|
||||
compression_type_string_map, value,
|
||||
@ -617,6 +621,10 @@ bool SerializeSingleOptionHelper(const char* opt_address,
|
||||
return SerializeEnum<CompactionStyle>(
|
||||
compaction_style_string_map,
|
||||
*(reinterpret_cast<const CompactionStyle*>(opt_address)), value);
|
||||
case OptionType::kCompactionPri:
|
||||
return SerializeEnum<CompactionPri>(
|
||||
compaction_pri_string_map,
|
||||
*(reinterpret_cast<const CompactionPri*>(opt_address)), value);
|
||||
case OptionType::kCompressionType:
|
||||
return SerializeEnum<CompressionType>(
|
||||
compression_type_string_map,
|
||||
|
@ -93,6 +93,7 @@ enum class OptionType {
|
||||
kString,
|
||||
kDouble,
|
||||
kCompactionStyle,
|
||||
kCompactionPri,
|
||||
kSliceTransform,
|
||||
kCompressionType,
|
||||
kVectorCompressionType,
|
||||
@ -560,8 +561,10 @@ static std::unordered_map<std::string, OptionTypeInfo> cf_options_type_info = {
|
||||
OptionType::kMergeOperator, OptionVerificationType::kByName, false, 0}},
|
||||
{"compaction_style",
|
||||
{offsetof(struct ColumnFamilyOptions, compaction_style),
|
||||
OptionType::kCompactionStyle, OptionVerificationType::kNormal, false,
|
||||
0}}};
|
||||
OptionType::kCompactionStyle, OptionVerificationType::kNormal, false, 0}},
|
||||
{"compaction_pri",
|
||||
{offsetof(struct ColumnFamilyOptions, compaction_pri),
|
||||
OptionType::kCompactionPri, OptionVerificationType::kNormal, false, 0}}};
|
||||
|
||||
static std::unordered_map<std::string, OptionTypeInfo>
|
||||
block_based_table_type_info = {
|
||||
@ -688,6 +691,13 @@ static std::unordered_map<std::string, CompactionStyle>
|
||||
{"kCompactionStyleFIFO", kCompactionStyleFIFO},
|
||||
{"kCompactionStyleNone", kCompactionStyleNone}};
|
||||
|
||||
static std::unordered_map<std::string, CompactionPri>
|
||||
compaction_pri_string_map = {
|
||||
{"kByCompensatedSize", kByCompensatedSize},
|
||||
{"kOldestLargestSeqFirst", kOldestLargestSeqFirst},
|
||||
{"kOldestSmallestSeqFirst", kOldestSmallestSeqFirst},
|
||||
{"kMinOverlappingRatio", kMinOverlappingRatio}};
|
||||
|
||||
static std::unordered_map<std::string,
|
||||
WALRecoveryMode> wal_recovery_mode_string_map = {
|
||||
{"kTolerateCorruptedTailRecords",
|
||||
|
@ -542,6 +542,9 @@ bool AreEqualOptions(
|
||||
case OptionType::kCompactionStyle:
|
||||
return (*reinterpret_cast<const CompactionStyle*>(offset1) ==
|
||||
*reinterpret_cast<const CompactionStyle*>(offset2));
|
||||
case OptionType::kCompactionPri:
|
||||
return (*reinterpret_cast<const CompactionPri*>(offset1) ==
|
||||
*reinterpret_cast<const CompactionPri*>(offset2));
|
||||
case OptionType::kCompressionType:
|
||||
return (*reinterpret_cast<const CompressionType*>(offset1) ==
|
||||
*reinterpret_cast<const CompressionType*>(offset2));
|
||||
|
@ -372,7 +372,6 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
|
||||
// Following options are not settable through
|
||||
// GetColumnFamilyOptionsFromString():
|
||||
options->rate_limit_delay_max_milliseconds = 33;
|
||||
options->compaction_pri = CompactionPri::kOldestSmallestSeqFirst;
|
||||
options->compaction_options_universal = CompactionOptionsUniversal();
|
||||
options->compression_opts = CompressionOptions();
|
||||
options->hard_rate_limit = 0;
|
||||
@ -432,6 +431,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
|
||||
"level_compaction_dynamic_level_bytes=false;"
|
||||
"inplace_update_support=false;"
|
||||
"compaction_style=kCompactionStyleFIFO;"
|
||||
"compaction_pri=kMinOverlappingRatio;"
|
||||
"purge_redundant_kvs_while_flush=true;"
|
||||
"hard_pending_compaction_bytes_limit=0;"
|
||||
"disable_auto_compactions=false;"
|
||||
|
@ -77,6 +77,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
||||
{"arena_block_size", "22"},
|
||||
{"disable_auto_compactions", "true"},
|
||||
{"compaction_style", "kCompactionStyleLevel"},
|
||||
{"compaction_pri", "kOldestSmallestSeqFirst"},
|
||||
{"verify_checksums_in_compaction", "false"},
|
||||
{"compaction_options_fifo", "23"},
|
||||
{"max_sequential_skip_in_iterations", "24"},
|
||||
@ -176,6 +177,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
|
||||
ASSERT_EQ(new_cf_opt.disable_auto_compactions, true);
|
||||
ASSERT_EQ(new_cf_opt.compaction_style, kCompactionStyleLevel);
|
||||
ASSERT_EQ(new_cf_opt.verify_checksums_in_compaction, false);
|
||||
ASSERT_EQ(new_cf_opt.compaction_pri, kOldestSmallestSeqFirst);
|
||||
ASSERT_EQ(new_cf_opt.compaction_options_fifo.max_table_files_size,
|
||||
static_cast<uint64_t>(23));
|
||||
ASSERT_EQ(new_cf_opt.max_sequential_skip_in_iterations,
|
||||
|
@ -1355,6 +1355,108 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
|
||||
delete cfb;
|
||||
}
|
||||
|
||||
TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
|
||||
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
|
||||
|
||||
Status s;
|
||||
ColumnFamilyHandle *cfa, *cfb;
|
||||
|
||||
ColumnFamilyOptions cf_options;
|
||||
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
|
||||
ASSERT_OK(s);
|
||||
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
|
||||
ASSERT_OK(s);
|
||||
|
||||
WriteOptions wopts;
|
||||
wopts.disableWAL = false;
|
||||
wopts.sync = true;
|
||||
|
||||
auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(cfa);
|
||||
auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(cfb);
|
||||
|
||||
TransactionOptions topts1;
|
||||
Transaction* txn1 = db->BeginTransaction(wopts, topts1);
|
||||
s = txn1->SetName("xid1");
|
||||
ASSERT_OK(s);
|
||||
s = txn1->Put(cfa, "boys", "girls1");
|
||||
ASSERT_OK(s);
|
||||
|
||||
Transaction* txn2 = db->BeginTransaction(wopts, topts1);
|
||||
s = txn2->SetName("xid2");
|
||||
ASSERT_OK(s);
|
||||
s = txn2->Put(cfb, "up", "down1");
|
||||
ASSERT_OK(s);
|
||||
|
||||
// prepare transaction in LOG A
|
||||
s = txn1->Prepare();
|
||||
ASSERT_OK(s);
|
||||
|
||||
// prepare transaction in LOG A
|
||||
s = txn2->Prepare();
|
||||
ASSERT_OK(s);
|
||||
|
||||
// regular put so that mem table can actually be flushed for log rolling
|
||||
s = db->Put(wopts, "cats", "dogs1");
|
||||
ASSERT_OK(s);
|
||||
|
||||
auto prepare_log_no = txn1->GetLogNumber();
|
||||
|
||||
// roll to LOG B
|
||||
s = db_impl->TEST_FlushMemTable(true);
|
||||
ASSERT_OK(s);
|
||||
|
||||
// now we pause background work so that
|
||||
// imm()s are not flushed before we can check their status
|
||||
s = db_impl->PauseBackgroundWork();
|
||||
ASSERT_OK(s);
|
||||
|
||||
ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
|
||||
ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
|
||||
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
|
||||
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
|
||||
prepare_log_no);
|
||||
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
|
||||
|
||||
// commit in LOG B
|
||||
s = txn1->Commit();
|
||||
ASSERT_OK(s);
|
||||
|
||||
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);
|
||||
|
||||
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
|
||||
|
||||
// request a flush for all column families such that the earliest
|
||||
// alive log file can be killed
|
||||
db_impl->TEST_MaybeFlushColumnFamilies();
|
||||
// log cannot be flushed because txn2 has not been committed
|
||||
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
|
||||
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog());
|
||||
|
||||
// assert that cfa has a flush requested
|
||||
ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
|
||||
|
||||
// cfb should not be flushed because it has no data from LOG A
|
||||
ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
|
||||
|
||||
// cfb now has data from LOG A
|
||||
s = txn2->Commit();
|
||||
ASSERT_OK(s);
|
||||
|
||||
db_impl->TEST_MaybeFlushColumnFamilies();
|
||||
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
|
||||
|
||||
// we should see that cfb now has a flush requested
|
||||
ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
|
||||
|
||||
// all data in LOG A resides in a memtable that has been
|
||||
// requested for a flush
|
||||
ASSERT_TRUE(db_impl->TEST_IsLogGettingFlushed());
|
||||
|
||||
delete txn1;
|
||||
delete txn2;
|
||||
delete cfa;
|
||||
delete cfb;
|
||||
}
|
||||
/*
|
||||
* 1) use prepare to keep first log around to determine starting sequence
|
||||
* during recovery.
|
||||
|
Loading…
Reference in New Issue
Block a user