From 46152d53bf58748fc3ed0681d8970c342bcfc47a Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Mon, 30 Apr 2018 12:23:45 -0700 Subject: [PATCH] Second attempt at db_stress crash-recovery verification Summary: - Original commit: a4fb1f8c049ee9d61a9da8cf23b64d2c7d36a33f - Revert commit (we reverted as a quick fix to get crash tests passing): 6afe22db2e667799d8c903db61750d676bffe152 This PR includes the contents of the original commit plus two bug fixes, which are: - In the whitebox crash test, only set `--expected_values_path` for `db_stress` runs in the first half of the crash test's duration. In the second half, a fresh DB is created for each `db_stress` run, so we cannot maintain expected state across `db_stress` runs. - Made `Exists()` return true for `UNKNOWN_SENTINEL` values. I previously had an assert in `Exists()` that the value was not `UNKNOWN_SENTINEL`. But it is possible for post-crash-recovery expected values to be `UNKNOWN_SENTINEL` (i.e., if the crash happens in the middle of an update), in which case this assertion would be tripped. The effect of returning true in this case is that there may be cases where a `SingleDelete` deletes no data. But if we had returned false, the effect would be calling `SingleDelete` on a key with multiple older versions, which is not supported. 
Closes https://github.com/facebook/rocksdb/pull/3793 Differential Revision: D7811671 Pulled By: ajkr fbshipit-source-id: 67e0295bfb1695ff9674837f2e05bb29c50efc30 --- env/env.cc | 2 + env/env_posix.cc | 41 +++++++++ env/env_test.cc | 35 ++++++++ env/io_posix.cc | 5 ++ env/io_posix.h | 6 ++ include/rocksdb/env.h | 22 +++++ tools/db_crashtest.py | 19 ++++- tools/db_stress.cc | 192 +++++++++++++++++++++++++++++++++++------- 8 files changed, 290 insertions(+), 32 deletions(-) diff --git a/env/env.cc b/env/env.cc index 1943f6ad8..66e293337 100644 --- a/env/env.cc +++ b/env/env.cc @@ -87,6 +87,8 @@ RandomAccessFile::~RandomAccessFile() { WritableFile::~WritableFile() { } +MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {} + Logger::~Logger() {} Status Logger::Close() { diff --git a/env/env_posix.cc b/env/env_posix.cc index 707625f3f..a9895ec78 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -457,6 +457,47 @@ class PosixEnv : public Env { return Status::OK(); } + virtual Status NewMemoryMappedFileBuffer( + const std::string& fname, + unique_ptr* result) override { + int fd = -1; + Status status; + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), O_RDWR, 0644); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + status = + IOError("While open file for raw mmap buffer access", fname, errno); + break; + } + } + uint64_t size; + if (status.ok()) { + status = GetFileSize(fname, &size); + } + void* base; + if (status.ok()) { + base = mmap(nullptr, static_cast(size), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + status = IOError("while mmap file for read", fname, errno); + } + } + if (status.ok()) { + result->reset( + new PosixMemoryMappedFileBuffer(base, static_cast(size))); + } + if (fd >= 0) { + // don't need to keep it open after mmap has been called + close(fd); + } + return status; + } + virtual Status NewDirectory(const std::string& name, unique_ptr* result) 
override { result->reset(); diff --git a/env/env_test.cc b/env/env_test.cc index 4a87094b5..3e5892437 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -200,6 +200,41 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) { } #endif +TEST_F(EnvPosixTest, MemoryMappedFileBuffer) { + const int kFileBytes = 1 << 15; // 32 KB + std::string expected_data; + std::string fname = test::TmpDir(env_) + "/" + "testfile"; + { + unique_ptr wfile; + const EnvOptions soptions; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + Random rnd(301); + test::RandomString(&rnd, kFileBytes, &expected_data); + ASSERT_OK(wfile->Append(expected_data)); + } + + std::unique_ptr mmap_buffer; + Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer); + // it should be supported at least on linux +#if !defined(OS_LINUX) + if (status.IsNotSupported()) { + fprintf(stderr, + "skipping EnvPosixTest.MemoryMappedFileBuffer due to " + "unsupported Env::NewMemoryMappedFileBuffer\n"); + return; + } +#endif // !defined(OS_LINUX) + + ASSERT_OK(status); + ASSERT_NE(nullptr, mmap_buffer.get()); + ASSERT_NE(nullptr, mmap_buffer->base); + ASSERT_EQ(kFileBytes, mmap_buffer->length); + std::string actual_data(static_cast(mmap_buffer->base), + mmap_buffer->length); + ASSERT_EQ(expected_data, actual_data); +} + TEST_P(EnvPosixTestWithParam, UnSchedule) { std::atomic called(false); env_->SetBackgroundThreads(1, Env::LOW); diff --git a/env/io_posix.cc b/env/io_posix.cc index da6b516c9..a411b5639 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -1052,6 +1052,11 @@ Status PosixRandomRWFile::Close() { return Status::OK(); } +PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() { + // TODO should have error handling though not much we can do... 
+ munmap(this->base, length); +} + /* * PosixDirectory */ diff --git a/env/io_posix.h b/env/io_posix.h index f29a159ae..106f6df65 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -236,6 +236,12 @@ class PosixRandomRWFile : public RandomRWFile { int fd_; }; +struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer { + PosixMemoryMappedFileBuffer(void* _base, size_t _length) + : MemoryMappedFileBuffer(_base, _length) {} + virtual ~PosixMemoryMappedFileBuffer(); +}; + class PosixDirectory : public Directory { public: explicit PosixDirectory(int fd) : fd_(fd) {} diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 81b31bdbb..1f1f06010 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -42,6 +42,7 @@ class SequentialFile; class Slice; class WritableFile; class RandomRWFile; +struct MemoryMappedFileBuffer; class Directory; struct DBOptions; struct ImmutableDBOptions; @@ -204,6 +205,16 @@ class Env { return Status::NotSupported("RandomRWFile is not implemented in this Env"); } + // Opens `fname` as a memory-mapped file for read and write (in-place updates + // only, i.e., no appends). On success, stores a raw buffer covering the whole + // file in `*result`. The file must exist prior to this call. + virtual Status NewMemoryMappedFileBuffer( + const std::string& /*fname*/, + unique_ptr* /*result*/) { + return Status::NotSupported( + "MemoryMappedFileBuffer is not implemented in this Env"); + } + // Create an object that represents a directory. Will fail if directory // doesn't exist. If the directory exists, it will open the directory // and create a new Directory object. @@ -809,6 +820,17 @@ class RandomRWFile { RandomRWFile& operator=(const RandomRWFile&) = delete; }; +// MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer. +// Subclasses should release the mapping upon destruction. 
+struct MemoryMappedFileBuffer { + MemoryMappedFileBuffer(void* _base, size_t _length) + : base(_base), length(_length) {} + virtual ~MemoryMappedFileBuffer() = 0; + + void* const base; + const size_t length; +}; + // Directory object represents collection of files and implements // filesystem operations that can be executed on directories. class Directory { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 01a36a6ac..344caf52f 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -16,14 +16,19 @@ import argparse # for simple: # simple_default_params < blackbox|whitebox_simple_default_params < args +expected_values_file = tempfile.NamedTemporaryFile() + default_params = { "acquire_snapshot_one_in": 10000, "block_size": 16384, "cache_size": 1048576, + "clear_column_family_one_in": 0, + "compression_type": "snappy", "use_clock_cache": "false", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, + "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": 0, "iterpercent": 10, "max_background_compactions": 20, @@ -89,10 +94,13 @@ simple_default_params = { "block_size": 16384, "cache_size": 1048576, "use_clock_cache": "false", + "clear_column_family_one_in": 0, "column_families": 1, + "compression_type": "snappy", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, + "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": lambda: random.randint(0, 1), "iterpercent": 10, "max_background_compactions": 1, @@ -192,13 +200,16 @@ def blackbox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" + + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") while time.time() < exit_time: 
run_had_errors = False killtime = time.time() + cmd_params['interval'] - cmd = gen_cmd(dict(cmd_params.items() + {'db': dbname}.items())) + cmd = gen_cmd(dict( + cmd_params.items() + + {'db': dbname}.items())) child = subprocess.Popen(cmd, stderr=subprocess.PIPE) print("Running db_stress with pid=%d: %s\n\n" @@ -255,7 +266,8 @@ def whitebox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" + + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") total_check_mode = 4 check_mode = 0 @@ -360,6 +372,7 @@ def whitebox_crash_main(args): # we need to clean up after ourselves -- only do this on test # success shutil.rmtree(dbname, True) + cmd_params.pop('expected_values_path', None) check_mode = (check_mode + 1) % total_check_mode time.sleep(1) # time to stabilize after a kill diff --git a/tools/db_stress.cc b/tools/db_stress.cc index c88e4ba70..64e7b2a05 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -306,6 +306,14 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter" DEFINE_string(db, "", "Use the db with the following name."); +DEFINE_string( + expected_values_path, "", + "File where the array of expected uint32_t values will be stored. If " + "provided and non-empty, the DB state will be verified against these " + "values after recovery. --max_key and --column_family must be kept the " + "same across invocations of this program that use the same " + "--expected_values_path."); + DEFINE_bool(verify_checksum, false, "Verify checksum for every block read from storage"); @@ -777,7 +785,11 @@ class Stats { // State shared by all concurrent executions of the same benchmark. 
class SharedState { public: - static const uint32_t SENTINEL; + // indicates a key may have any value (or not be present) as an operation on + // it is incomplete. + static const uint32_t UNKNOWN_SENTINEL; + // indicates a key should definitely be deleted + static const uint32_t DELETION_SENTINEL; explicit SharedState(StressTest* stress_test) : cv_(&mu_), @@ -795,7 +807,8 @@ class SharedState { bg_thread_finished_(false), stress_test_(stress_test), verification_failure_(false), - no_overwrite_ids_(FLAGS_column_families) { + no_overwrite_ids_(FLAGS_column_families), + values_(nullptr) { // Pick random keys in each column family that will not experience // overwrite @@ -829,15 +842,69 @@ class SharedState { } delete[] permutation; + size_t expected_values_size = + sizeof(std::atomic) * FLAGS_column_families * max_key_; + bool values_init_needed = false; + Status status; + if (!FLAGS_expected_values_path.empty()) { + if (!std::atomic{}.is_lock_free()) { + status = Status::InvalidArgument( + "Cannot use --expected_values_path on platforms without lock-free " + "std::atomic"); + } + if (status.ok() && FLAGS_clear_column_family_one_in > 0) { + status = Status::InvalidArgument( + "Cannot use --expected_values_path on when " + "--clear_column_family_one_in is greater than zero."); + } + size_t size; + if (status.ok()) { + status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size); + } + unique_ptr wfile; + if (status.ok() && size == 0) { + const EnvOptions soptions; + status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile, + soptions); + } + if (status.ok() && size == 0) { + std::string buf(expected_values_size, '\0'); + status = wfile->Append(buf); + values_init_needed = true; + } + if (status.ok()) { + status = FLAGS_env->NewMemoryMappedFileBuffer( + FLAGS_expected_values_path, &expected_mmap_buffer_); + } + if (status.ok()) { + assert(expected_mmap_buffer_->length == expected_values_size); + values_ = + static_cast*>(expected_mmap_buffer_->base); 
+ assert(values_ != nullptr); + } else { + fprintf(stderr, "Failed opening shared file '%s' with error: %s\n", + FLAGS_expected_values_path.c_str(), status.ToString().c_str()); + assert(values_ == nullptr); + } + } + if (values_ == nullptr) { + values_ = + static_cast*>(malloc(expected_values_size)); + values_init_needed = true; + } + assert(values_ != nullptr); + if (values_init_needed) { + for (int i = 0; i < FLAGS_column_families; ++i) { + for (int j = 0; j < max_key_; ++j) { + Delete(i, j, false /* pending */); + } + } + } + if (FLAGS_test_batches_snapshots) { fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); return; } - values_.resize(FLAGS_column_families); - - for (int i = 0; i < FLAGS_column_families; ++i) { - values_[i] = std::vector(max_key_, SENTINEL); - } long num_locks = static_cast(max_key_ >> log2_keys_per_lock_); if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { @@ -944,27 +1011,57 @@ class SharedState { } } + std::atomic& Value(int cf, int64_t key) const { + return values_[cf * max_key_ + key]; + } + void ClearColumnFamily(int cf) { - std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL); + std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), + DELETION_SENTINEL); } - void Put(int cf, int64_t key, uint32_t value_base) { - values_[cf][key] = value_base; + // @param pending True if the update may have started but is not yet + // guaranteed finished. This is useful for crash-recovery testing when the + // process may crash before updating the expected values array. + void Put(int cf, int64_t key, uint32_t value_base, bool pending) { + if (!pending) { + // prevent expected-value update from reordering before Write + std::atomic_thread_fence(std::memory_order_release); + } + Value(cf, key).store(pending ? 
UNKNOWN_SENTINEL : value_base, + std::memory_order_relaxed); + if (pending) { + // prevent Write from reordering before expected-value update + std::atomic_thread_fence(std::memory_order_release); + } } - uint32_t Get(int cf, int64_t key) const { return values_[cf][key]; } + uint32_t Get(int cf, int64_t key) const { return Value(cf, key); } - void Delete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } + // @param pending See comment above Put() + // Returns true if the key was not yet deleted. + bool Delete(int cf, int64_t key, bool pending) { + if (Value(cf, key) == DELETION_SENTINEL) { + return false; + } + Put(cf, key, DELETION_SENTINEL, pending); + return true; + } - void SingleDelete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } + // @param pending See comment above Put() + // Returns true if the key was not yet deleted. + bool SingleDelete(int cf, int64_t key, bool pending) { + return Delete(cf, key, pending); + } - int DeleteRange(int cf, int64_t begin_key, int64_t end_key) { + // @param pending See comment above Put() + // Returns number of keys deleted by the call. + int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { int covered = 0; for (int64_t key = begin_key; key < end_key; ++key) { - if (values_[cf][key] != SENTINEL) { + if (Delete(cf, key, pending)) { ++covered; } - values_[cf][key] = SENTINEL; } return covered; } @@ -973,7 +1070,15 @@ class SharedState { return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end(); } - bool Exists(int cf, int64_t key) { return values_[cf][key] != SENTINEL; } + bool Exists(int cf, int64_t key) { + // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite + // is disallowed can't be accidentally added a second time, in which case + // SingleDelete wouldn't be able to properly delete the key. It does allow + // the case where a SingleDelete might be added which covers nothing, but + // that's not a correctness issue. 
+ uint32_t expected_value = Value(cf, key).load(); + return expected_value != DELETION_SENTINEL; + } uint32_t GetSeed() const { return seed_; } @@ -985,6 +1090,10 @@ class SharedState { bool BgThreadFinished() const { return bg_thread_finished_; } + bool ShouldVerifyAtBeginning() const { + return expected_mmap_buffer_.get() != nullptr; + } + private: port::Mutex mu_; port::CondVar cv_; @@ -1006,13 +1115,15 @@ class SharedState { // Keys that should not be overwritten std::vector > no_overwrite_ids_; - std::vector> values_; + std::atomic* values_; // Has to make it owned by a smart ptr as port::Mutex is not copyable // and storing it in the container may require copying depending on the impl. std::vector > > key_locks_; + std::unique_ptr expected_mmap_buffer_; }; -const uint32_t SharedState::SENTINEL = 0xffffffff; +const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; +const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; // Per-thread state for concurrent executions of the same benchmark. 
struct ThreadState { @@ -1320,6 +1431,13 @@ class StressTest { while (!shared.AllInitialized()) { shared.GetCondVar()->Wait(); } + if (shared.ShouldVerifyAtBeginning()) { + if (shared.HasVerificationFailedYet()) { + printf("Crash-recovery verification failed :(\n"); + } else { + printf("Crash-recovery verification passed :)\n"); + } + } auto now = FLAGS_env->NowMicros(); fprintf(stdout, "%s Starting database operations\n", @@ -1415,6 +1533,9 @@ class StressTest { ThreadState* thread = reinterpret_cast(v); SharedState* shared = thread->shared; + if (shared->ShouldVerifyAtBeginning()) { + thread->shared->GetStressTest()->VerifyDb(thread); + } { MutexLock l(shared->GetMutex()); shared->IncInitialized(); @@ -1996,7 +2117,7 @@ class StressTest { } } else if (prefixBound <= prob_op && prob_op < writeBound) { // OPERATION write - uint32_t value_base = thread->rand.Next(); + uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); if (!FLAGS_test_batches_snapshots) { @@ -2024,7 +2145,8 @@ class StressTest { break; } } - shared->Put(rand_column_family, rand_key, value_base); + shared->Put(rand_column_family, rand_key, value_base, + true /* pending */); Status s; if (FLAGS_use_merge) { if (!FLAGS_use_txn) { @@ -2057,6 +2179,8 @@ class StressTest { #endif } } + shared->Put(rand_column_family, rand_key, value_base, + false /* pending */); if (!s.ok()) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); std::terminate(); @@ -2088,7 +2212,7 @@ class StressTest { // Use delete if the key may be overwritten and a single deletion // otherwise. 
if (shared->AllowsOverwrite(rand_column_family, rand_key)) { - shared->Delete(rand_column_family, rand_key); + shared->Delete(rand_column_family, rand_key, true /* pending */); Status s; if (!FLAGS_use_txn) { s = db_->Delete(write_opts, column_family, key); @@ -2104,13 +2228,15 @@ class StressTest { } #endif } + shared->Delete(rand_column_family, rand_key, false /* pending */); thread->stats.AddDeletes(1); if (!s.ok()) { fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); std::terminate(); } } else { - shared->SingleDelete(rand_column_family, rand_key); + shared->SingleDelete(rand_column_family, rand_key, + true /* pending */); Status s; if (!FLAGS_use_txn) { s = db_->SingleDelete(write_opts, column_family, key); @@ -2126,6 +2252,8 @@ class StressTest { } #endif } + shared->SingleDelete(rand_column_family, rand_key, + false /* pending */); thread->stats.AddSingleDeletes(1); if (!s.ok()) { fprintf(stderr, "single delete error: %s\n", @@ -2159,21 +2287,24 @@ class StressTest { shared->GetMutexForKey(rand_column_family, rand_key + j))); } } + shared->DeleteRange(rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, + true /* pending */); keystr = Key(rand_key); key = keystr; column_family = column_families_[rand_column_family]; std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); Slice end_key = end_keystr; - int covered = shared->DeleteRange( - rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width); Status s = db_->DeleteRange(write_opts, column_family, key, end_key); if (!s.ok()) { fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); std::terminate(); } + int covered = shared->DeleteRange( + rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, false /* pending */); thread->stats.AddRangeDeletions(1); thread->stats.AddCoveredByRangeDeletions(covered); } @@ -2285,12 +2416,15 @@ class StressTest { // compare value_from_db with the value in the shared state char 
value[kValueMaxLen]; uint32_t value_base = shared->Get(cf, key); - if (value_base == SharedState::SENTINEL && !strict) { + if (value_base == SharedState::UNKNOWN_SENTINEL) { + return true; + } + if (value_base == SharedState::DELETION_SENTINEL && !strict) { return true; } if (s.ok()) { - if (value_base == SharedState::SENTINEL) { + if (value_base == SharedState::DELETION_SENTINEL) { VerificationAbort(shared, "Unexpected value found", cf, key); return false; } @@ -2305,7 +2439,7 @@ class StressTest { return false; } } else { - if (value_base != SharedState::SENTINEL) { + if (value_base != SharedState::DELETION_SENTINEL) { VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); return false; }