Allow concurrent writes to blob db

Summary:
I'm going with the brute-force solution: just have Put() and Write() hold a mutex before writing. Concurrent write throughput may be improved later with finer-grained locking.
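
For context, a minimal standalone sketch of what this brute-force scheme amounts to is below. It uses std::mutex and std::lock_guard in place of RocksDB's port::Mutex/MutexLock, and ToyBlobDB, Put(), and Write() are hypothetical stand-ins rather than the real BlobDBImpl API:

// Sketch only: one coarse mutex serializing every write entry point,
// assuming a simplified in-memory store in place of blob files.
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class ToyBlobDB {
 public:
  // Single-key write path; holds write_mutex_ for the whole call.
  void Put(const std::string& key, const std::string& value) {
    std::lock_guard<std::mutex> l(write_mutex_);
    store_[key] = value;  // stands in for appending to a blob file + index
  }

  // Batched write path; serialized by the same mutex, so a batch is never
  // interleaved with a concurrent Put() or another Write().
  void Write(const std::vector<std::pair<std::string, std::string>>& batch) {
    std::lock_guard<std::mutex> l(write_mutex_);
    for (const auto& kv : batch) {
      store_[kv.first] = kv.second;
    }
  }

 private:
  std::mutex write_mutex_;  // writers must hold this before writing
  std::map<std::string, std::string> store_;
};

The trade-off is that writers are fully serialized: only one Put() or Write() makes progress at a time, which is why finer-grained locking is left as follow-up work.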
Closes https://github.com/facebook/rocksdb/pull/2682

Differential Revision: D5552690

Pulled By: yiwu-arbug

fbshipit-source-id: 039abd675b5d274a7af6428198d1733cafecef4c
Yi Wu 2017-08-03 15:07:01 -07:00
parent 1b33ee8e7b
commit 1339b299bc
3 changed files with 32 additions and 8 deletions

View File

@@ -27,6 +27,7 @@
 #include "util/file_reader_writer.h"
 #include "util/filename.h"
 #include "util/logging.h"
+#include "util/mutexlock.h"
 #include "util/random.h"
 #include "util/timer_queue.h"
 #include "utilities/transactions/optimistic_transaction_db_impl.h"
@@ -882,6 +883,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
     }
   };
+
+  MutexLock l(&write_mutex_);
   SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1;
   BlobInserter blob_inserter(this, sequence);
   updates->Iterate(&blob_inserter);
@@ -957,6 +960,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
 Status BlobDBImpl::PutUntil(const WriteOptions& options,
                             ColumnFamilyHandle* column_family, const Slice& key,
                             const Slice& value_unc, int32_t expiration) {
+  MutexLock l(&write_mutex_);
   UpdateWriteOptions(options);
   std::shared_ptr<BlobFile> bfile =
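
MutexLock (pulled in via util/mutexlock.h above) is a scoped RAII guard, so declaring it at the top of Write() and PutUntil() holds write_mutex_ for the rest of the call and releases it automatically on every return path; that is what serializes the two write entry points against each other.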

View File

@@ -447,6 +447,9 @@ class BlobDBImpl : public BlobDB {
   // HEAVILY TRAFFICKED
   port::RWMutex mutex_;
 
+  // Writers have to hold write_mutex_ before writing.
+  mutable port::Mutex write_mutex_;
+
   // counter for blob file number
   std::atomic<uint64_t> next_file_number_;

View File

@@ -510,18 +510,35 @@ TEST_F(BlobDBTest, Compression) {
 }
 #endif
 
-TEST_F(BlobDBTest, DISABLED_MultipleWriters) {
-  Open();
+TEST_F(BlobDBTest, MultipleWriters) {
+  Open(BlobDBOptions());
+
   std::vector<port::Thread> workers;
-  for (size_t ii = 0; ii < 10; ii++)
-    workers.push_back(port::Thread(&BlobDBTest::InsertBlobs, this));
-
-  for (auto& t : workers) {
-    if (t.joinable()) {
-      t.join();
+  std::vector<std::map<std::string, std::string>> data_set(10);
+  for (uint32_t i = 0; i < 10; i++)
+    workers.push_back(port::Thread(
+        [&](uint32_t id) {
+          Random rnd(301 + id);
+          for (int j = 0; j < 100; j++) {
+            std::string key = "key" + ToString(id) + "_" + ToString(j);
+            if (id < 5) {
+              PutRandom(key, &rnd, &data_set[id]);
+            } else {
+              WriteBatch batch;
+              PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
+              blob_db_->Write(WriteOptions(), &batch);
+            }
+          }
+        },
+        i));
+
+  std::map<std::string, std::string> data;
+  for (size_t i = 0; i < 10; i++) {
+    if (workers[i].joinable()) {
+      workers[i].join();
     }
+    data.insert(data_set[i].begin(), data_set[i].end());
   }
+  VerifyDB(data);
 }
 
 // Test sequence number store in blob file is correct.