Blob DB: Fix race condition between flush and write

Summary:
A race condition will happen when:
* a user thread writes a value, but it hits the write stop condition because there are too many un-flushed memtables, while holding blob_db_impl.write_mutex_.
* Flush is triggered and call flush begin listener and try to acquire blob_db_impl.write_mutex_.

Fixing it.
Closes https://github.com/facebook/rocksdb/pull/3149

Differential Revision: D6279805

Pulled By: yiwu-arbug

fbshipit-source-id: 0e3c58afb78795ebe3360a2c69e05651e3908c40
This commit is contained in:
Yi Wu 2017-11-08 19:33:12 -08:00
parent 725bb9d665
commit 5d928c795a

View File

@ -745,13 +745,22 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
}; };
Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
MutexLock l(&write_mutex_);
uint32_t default_cf_id = uint32_t default_cf_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID(); reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
// TODO(yiwu): In case there are multiple writers the latest sequence would
// not be the actually sequence we are writting. Need to get the sequence
// from write batch after DB write instead.
SequenceNumber current_seq = GetLatestSequenceNumber() + 1; SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
Status s;
BlobInserter blob_inserter(options, this, default_cf_id, current_seq); BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
Status s = updates->Iterate(&blob_inserter); {
// Release write_mutex_ before DB write to avoid race condition with
// flush begin listener, which also require write_mutex_ to sync
// blob files.
MutexLock l(&write_mutex_);
s = updates->Iterate(&blob_inserter);
}
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -759,7 +768,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1);
// add deleted key to list of keys that have been deleted for book-keeping // add deleted key to list of keys that have been deleted for book-keeping
class DeleteBookkeeper : public WriteBatch::Handler { class DeleteBookkeeper : public WriteBatch::Handler {
@ -849,10 +857,19 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration) { const Slice& value, uint64_t expiration) {
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
MutexLock l(&write_mutex_); Status s;
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
WriteBatch batch; WriteBatch batch;
Status s = PutBlobValue(options, key, value, expiration, sequence, &batch); {
// Release write_mutex_ before DB write to avoid race condition with
// flush begin listener, which also require write_mutex_ to sync
// blob files.
MutexLock l(&write_mutex_);
// TODO(yiwu): In case there are multiple writers the latest sequence would
// not be the actually sequence we are writting. Need to get the sequence
// from write batch after DB write instead.
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
s = PutBlobValue(options, key, value, expiration, sequence, &batch);
}
if (s.ok()) { if (s.ok()) {
s = db_->Write(options, &batch); s = db_->Write(options, &batch);
} }
@ -1198,8 +1215,6 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status::Corruption("Corruption. Blob CRC mismatch"); return Status::Corruption("Corruption. Blob CRC mismatch");
} }
// TODO(yiwu): Should use compression flag in the blob file instead of
// current compression option.
if (bfile->compression() != kNoCompression) { if (bfile->compression() != kNoCompression) {
BlockContents contents; BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily()); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());