Pass manual_wal_flush also to the first wal file
Summary: Currently manual_wal_flush if set in the options will be used only for the wal files created during wal switch. The configuration thus does not affect the first wal file. The patch fixes that and also update the related unit tests. This PR is built on top of https://github.com/facebook/rocksdb/pull/3756 Closes https://github.com/facebook/rocksdb/pull/3824 Differential Revision: D7909153 Pulled By: maysamyabandeh fbshipit-source-id: 024ed99d2555db06bf096c902b998e432bb7b9ce
This commit is contained in:
parent
66c7aa32fb
commit
718c1c9c1f
@ -718,6 +718,11 @@ Status DBImpl::FlushWAL(bool sync) {
|
|||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
|
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
|
||||||
s.ToString().c_str());
|
s.ToString().c_str());
|
||||||
|
// In case there is a fs error we should set it globally to prevent the
|
||||||
|
// future writes
|
||||||
|
WriteStatusCheck(s);
|
||||||
|
// whether sync or not, we should abort the rest of function upon error
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
if (!sync) {
|
if (!sync) {
|
||||||
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
|
||||||
|
@ -221,6 +221,7 @@ class DBImpl : public DB {
|
|||||||
virtual Status Flush(const FlushOptions& options,
|
virtual Status Flush(const FlushOptions& options,
|
||||||
ColumnFamilyHandle* column_family) override;
|
ColumnFamilyHandle* column_family) override;
|
||||||
virtual Status FlushWAL(bool sync) override;
|
virtual Status FlushWAL(bool sync) override;
|
||||||
|
bool TEST_WALBufferIsEmpty();
|
||||||
virtual Status SyncWAL() override;
|
virtual Status SyncWAL() override;
|
||||||
|
|
||||||
virtual SequenceNumber GetLatestSequenceNumber() const override;
|
virtual SequenceNumber GetLatestSequenceNumber() const override;
|
||||||
|
@ -25,6 +25,12 @@ void DBImpl::TEST_SwitchWAL() {
|
|||||||
SwitchWAL(&write_context);
|
SwitchWAL(&write_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool DBImpl::TEST_WALBufferIsEmpty() {
|
||||||
|
InstrumentedMutexLock wl(&log_write_mutex_);
|
||||||
|
log::Writer* cur_log_writer = logs_.back().writer;
|
||||||
|
return cur_log_writer->TEST_BufferIsEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
|
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
|
||||||
ColumnFamilyHandle* column_family) {
|
ColumnFamilyHandle* column_family) {
|
||||||
ColumnFamilyData* cfd;
|
ColumnFamilyData* cfd;
|
||||||
|
@ -1090,7 +1090,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
new_log_number,
|
new_log_number,
|
||||||
new log::Writer(
|
new log::Writer(
|
||||||
std::move(file_writer), new_log_number,
|
std::move(file_writer), new_log_number,
|
||||||
impl->immutable_db_options_.recycle_log_file_num > 0));
|
impl->immutable_db_options_.recycle_log_file_num > 0,
|
||||||
|
impl->immutable_db_options_.manual_wal_flush));
|
||||||
}
|
}
|
||||||
|
|
||||||
// set column family handles
|
// set column family handles
|
||||||
@ -1217,6 +1218,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
|
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
|
||||||
LogFlush(impl->immutable_db_options_.info_log);
|
LogFlush(impl->immutable_db_options_.info_log);
|
||||||
|
assert(impl->TEST_WALBufferIsEmpty());
|
||||||
|
// If the assert above fails then we need to FlushWAL before returning
|
||||||
|
// control back to the user.
|
||||||
if (!persist_options_status.ok()) {
|
if (!persist_options_status.ok()) {
|
||||||
s = Status::IOError(
|
s = Status::IOError(
|
||||||
"DB::Open() failed --- Unable to persist Options file",
|
"DB::Open() failed --- Unable to persist Options file",
|
||||||
|
@ -50,6 +50,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
|
|||||||
std::atomic<int> leader_count{0};
|
std::atomic<int> leader_count{0};
|
||||||
std::vector<port::Thread> threads;
|
std::vector<port::Thread> threads;
|
||||||
mock_env->SetFilesystemActive(false);
|
mock_env->SetFilesystemActive(false);
|
||||||
|
|
||||||
// Wait until all threads linked to write threads, to make sure
|
// Wait until all threads linked to write threads, to make sure
|
||||||
// all threads join the same batch group.
|
// all threads join the same batch group.
|
||||||
SyncPoint::GetInstance()->SetCallBack(
|
SyncPoint::GetInstance()->SetCallBack(
|
||||||
@ -68,7 +69,13 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
|
|||||||
threads.push_back(port::Thread(
|
threads.push_back(port::Thread(
|
||||||
[&](int index) {
|
[&](int index) {
|
||||||
// All threads should fail.
|
// All threads should fail.
|
||||||
ASSERT_FALSE(Put("key" + ToString(index), "value").ok());
|
auto res = Put("key" + ToString(index), "value");
|
||||||
|
if (options.manual_wal_flush) {
|
||||||
|
ASSERT_TRUE(res.ok());
|
||||||
|
// we should see fs error when we do the flush
|
||||||
|
res = dbfull()->FlushWAL(false);
|
||||||
|
}
|
||||||
|
ASSERT_FALSE(res.ok());
|
||||||
},
|
},
|
||||||
i));
|
i));
|
||||||
}
|
}
|
||||||
@ -80,6 +87,22 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
|
|||||||
Close();
|
Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
|
||||||
|
Options options = GetOptions();
|
||||||
|
Reopen(options);
|
||||||
|
// try the 1st WAL created during open
|
||||||
|
ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
|
||||||
|
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
|
||||||
|
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
|
||||||
|
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
|
||||||
|
// try the 2nd wal created during SwitchWAL
|
||||||
|
dbfull()->TEST_SwitchWAL();
|
||||||
|
ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
|
||||||
|
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
|
||||||
|
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
|
||||||
|
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
|
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
|
||||||
std::unique_ptr<FaultInjectionTestEnv> mock_env(
|
std::unique_ptr<FaultInjectionTestEnv> mock_env(
|
||||||
new FaultInjectionTestEnv(Env::Default()));
|
new FaultInjectionTestEnv(Env::Default()));
|
||||||
@ -90,7 +113,15 @@ TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
|
|||||||
// Forcibly fail WAL write for the first Put only. Subsequent Puts should
|
// Forcibly fail WAL write for the first Put only. Subsequent Puts should
|
||||||
// fail due to read-only mode
|
// fail due to read-only mode
|
||||||
mock_env->SetFilesystemActive(i != 0);
|
mock_env->SetFilesystemActive(i != 0);
|
||||||
ASSERT_FALSE(Put("key" + ToString(i), "value").ok());
|
auto res = Put("key" + ToString(i), "value");
|
||||||
|
if (options.manual_wal_flush && i == 0) {
|
||||||
|
// even with manual_wal_flush the 2nd Put should return error because of
|
||||||
|
// the read-only mode
|
||||||
|
ASSERT_TRUE(res.ok());
|
||||||
|
// we should see fs error when we do the flush
|
||||||
|
res = dbfull()->FlushWAL(false);
|
||||||
|
}
|
||||||
|
ASSERT_FALSE(res.ok());
|
||||||
}
|
}
|
||||||
// Close before mock_env destruct.
|
// Close before mock_env destruct.
|
||||||
Close();
|
Close();
|
||||||
|
@ -92,6 +92,8 @@ Status Writer::AddRecord(const Slice& slice) {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); }
|
||||||
|
|
||||||
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
|
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
|
||||||
assert(n <= 0xffff); // Must fit in two bytes
|
assert(n <= 0xffff); // Must fit in two bytes
|
||||||
|
|
||||||
|
@ -85,6 +85,8 @@ class Writer {
|
|||||||
|
|
||||||
Status WriteBuffer();
|
Status WriteBuffer();
|
||||||
|
|
||||||
|
bool TEST_BufferIsEmpty();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
unique_ptr<WritableFileWriter> dest_;
|
unique_ptr<WritableFileWriter> dest_;
|
||||||
size_t block_offset_; // Current offset in block
|
size_t block_offset_; // Current offset in block
|
||||||
|
@ -124,6 +124,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
|
|||||||
immutable_db_options.allow_ingest_behind;
|
immutable_db_options.allow_ingest_behind;
|
||||||
options.preserve_deletes =
|
options.preserve_deletes =
|
||||||
immutable_db_options.preserve_deletes;
|
immutable_db_options.preserve_deletes;
|
||||||
|
options.two_write_queues = immutable_db_options.two_write_queues;
|
||||||
|
options.manual_wal_flush = immutable_db_options.manual_wal_flush;
|
||||||
|
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
@ -189,6 +189,8 @@ class WritableFileWriter {
|
|||||||
|
|
||||||
bool use_direct_io() { return writable_file_->use_direct_io(); }
|
bool use_direct_io() { return writable_file_->use_direct_io(); }
|
||||||
|
|
||||||
|
bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Used when os buffering is OFF and we are writing
|
// Used when os buffering is OFF and we are writing
|
||||||
// DMA such as in Direct I/O mode
|
// DMA such as in Direct I/O mode
|
||||||
|
Loading…
Reference in New Issue
Block a user