db/log_test: add recycle log test

This currently fails because we do not properly map a
corrupt header to the logical end of the log.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2015-12-11 14:12:03 -05:00 committed by krad
parent 4e7e41ba77
commit 847e471db6
2 changed files with 68 additions and 0 deletions

View File

@ -174,6 +174,8 @@ class LogTest : public ::testing::TestWithParam<int> {
3 * header_size;
}
Slice* get_reader_contents() { return &reader_contents_; }
void Write(const std::string& msg) {
writer_.AddRecord(Slice(msg));
}
@ -690,6 +692,29 @@ TEST_P(LogTest, ClearEofError2) {
ASSERT_EQ("OK", MatchError("read error"));
}
TEST_P(LogTest, Recycle) {
if (!GetParam()) {
return; // test is only valid for recycled logs
}
Write("foo");
Write("bar");
Write("baz");
Write("bif");
Write("blitz");
while (get_reader_contents()->size() < log::kBlockSize * 2) {
Write("xxxxxxxxxxxxxxxx");
}
unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
new test::OverwritingStringSink(get_reader_contents())));
Writer recycle_writer(std::move(dest_holder), 123, true);
recycle_writer.AddRecord(Slice("foooo"));
recycle_writer.AddRecord(Slice("bar"));
ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
ASSERT_EQ("foooo", Read());
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
}
INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
} // namespace log

View File

@ -230,6 +230,49 @@ class StringSink: public WritableFile {
size_t last_flush_;
};
// Like StringSink, this writes into a string. Unlink StringSink, it
// has some initial content and overwrites it, just like a recycled
// log file.
class OverwritingStringSink : public WritableFile {
public:
explicit OverwritingStringSink(Slice* reader_contents)
: WritableFile(),
contents_(""),
reader_contents_(reader_contents),
last_flush_(0) {}
const std::string& contents() const { return contents_; }
virtual Status Truncate(uint64_t size) override {
contents_.resize(static_cast<size_t>(size));
return Status::OK();
}
virtual Status Close() override { return Status::OK(); }
virtual Status Flush() override {
if (last_flush_ < contents_.size()) {
assert(reader_contents_->size() >= contents_.size());
memcpy((char*)reader_contents_->data() + last_flush_,
contents_.data() + last_flush_, contents_.size() - last_flush_);
last_flush_ = contents_.size();
}
return Status::OK();
}
virtual Status Sync() override { return Status::OK(); }
virtual Status Append(const Slice& slice) override {
contents_.append(slice.data(), slice.size());
return Status::OK();
}
void Drop(size_t bytes) {
contents_.resize(contents_.size() - bytes);
if (last_flush_ > contents_.size()) last_flush_ = contents_.size();
}
private:
std::string contents_;
Slice* reader_contents_;
size_t last_flush_;
};
class StringSource: public RandomAccessFile {
public:
explicit StringSource(const Slice& contents, uint64_t uniq_id = 0,