rocksdb/util/blob_store.cc
Igor Canadi cb8a7302e4 Implement max_size in BlobStore
Summary:
I added a max_size option to BlobStore. Since we now know the maximum number of buckets we'll ever use, we can allocate an array of buckets up front and access its elements without using any locks! The common-case Get doesn't lock anything now.

Benchmarks on 16KB block size show no impact on speed, though.

Test Plan: unittests + benchmark

Reviewers: dhruba, haobo, kailiu

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13641
2013-10-23 14:38:52 -07:00

265 lines
7.2 KiB
C++

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/blob_store.h"
namespace rocksdb {
using namespace std;
// BlobChunk
// Returns true when `chunk` begins exactly at the block where this chunk
// ends, inside the same bucket. A chunk with size == 0 is a marker rather
// than a block, so it is never considered to precede anything.
// In debug builds, asserts that the two chunks do not overlap.
bool BlobChunk::ImmediatelyBefore(const BlobChunk& chunk) const {
  assert(!Overlap(chunk));
  if (size == 0) {
    // markers do not occupy any blocks
    return false;
  }
  if (bucket_id != chunk.bucket_id) {
    return false;
  }
  return offset + size == chunk.offset;
}
// Returns true when this chunk and `chunk` share at least one block.
// Zero-sized chunks and chunks in different buckets never overlap.
bool BlobChunk::Overlap(const BlobChunk& chunk) const {
  if (size == 0 || chunk.size == 0 || bucket_id != chunk.bucket_id) {
    return false;
  }
  // Two ranges intersect exactly when one of them starts inside the other.
  const bool starts_inside_other =
      offset >= chunk.offset && offset < chunk.offset + chunk.size;
  const bool other_starts_inside =
      chunk.offset >= offset && chunk.offset < offset + size;
  return starts_inside_other || other_starts_inside;
}
// Blob
// Serializes the blob: for every chunk, in order, appends bucket_id,
// offset and size to the result with PutFixed32.
string Blob::ToString() const {
  string encoded;
  for (const auto& chunk : chunks) {
    PutFixed32(&encoded, chunk.bucket_id);
    PutFixed32(&encoded, chunk.offset);
    PutFixed32(&encoded, chunk.size);
  }
  return encoded;
}
// Rebuilds a blob from the encoding produced by ToString(): a sequence of
// records, each made of three fixed-width 32-bit fields
// (bucket_id, offset, size) read with DecodeFixed32.
//
// Malformed input is tolerated rather than rejected:
//  * a final record with only one or two complete fields is still appended,
//    with the missing fields left at 0 (so it becomes a size == 0 marker);
//  * trailing bytes that cannot form a whole field are ignored.
//
// The loop runs only while at least one whole field remains. Previously a
// leftover of 1-3 bytes made no progress and appended chunks forever.
Blob::Blob(const std::string& blob) {
  const size_t kFieldSize = sizeof(uint32_t);
  size_t pos = 0;
  // pos never exceeds blob.size(), so the subtraction cannot wrap around
  while (blob.size() - pos >= kFieldSize) {
    uint32_t fields[3] = {0, 0, 0};
    for (int j = 0; j < 3 && blob.size() - pos >= kFieldSize;
         ++j, pos += kFieldSize) {
      fields[j] = DecodeFixed32(blob.data() + pos);
    }
    chunks.push_back(BlobChunk(fields[0], fields[1], fields[2]));
  }
}
// FreeList
// Returns every chunk of `blob` to the back of the free list and adds its
// size to the free-block count. A chunk that starts right where the current
// tail chunk ends is merged into that tail instead of being appended.
// Always returns Status::OK().
Status FreeList::Free(const Blob& blob) {
  for (const auto& released : blob.chunks) {
    free_blocks_ += released.size;
    const bool extends_tail =
        fifo_free_chunks_.size() > 0 &&
        fifo_free_chunks_.back().ImmediatelyBefore(released);
    if (!extends_tail) {
      fifo_free_chunks_.push_back(released);
      continue;
    }
    // contiguous with the tail: grow the tail chunk in place
    fifo_free_chunks_.back().size += released.size;
  }
  return Status::OK();
}
// Takes `blocks` blocks from the front of the free list and records the
// chunks that cover them in `*blob`; the previous contents of `*blob` are
// discarded. Returns Status::Incomplete, touching neither `*blob` nor the
// free list, when fewer than `blocks` blocks are free.
Status FreeList::Allocate(uint32_t blocks, Blob* blob) {
  if (free_blocks_ < blocks) {
    return Status::Incomplete("");
  }
  free_blocks_ -= blocks;
  blob->chunks.clear();
  while (blocks > 0) {
    assert(fifo_free_chunks_.size() > 0);
    auto& head = fifo_free_chunks_.front();
    if (head.size <= blocks) {
      // the whole head chunk is consumed
      blocks -= head.size;
      blob->chunks.push_back(head);
      fifo_free_chunks_.pop_front();
      continue;
    }
    // the head chunk is larger than needed: split off its first `blocks`
    // blocks and leave the remainder at the front of the list
    blob->chunks.push_back(BlobChunk(head.bucket_id, head.offset, blocks));
    head.offset += blocks;
    head.size -= blocks;
    blocks = 0;
  }
  assert(blocks == 0);
  return Status::OK();
}
// Returns true when any chunk of `blob` overlaps any chunk currently on the
// free list. Every blob chunk is compared against every free chunk.
bool FreeList::Overlap(const Blob& blob) const {
  for (const auto& candidate : blob.chunks) {
    for (const auto& free_chunk : fifo_free_chunks_) {
      if (free_chunk.Overlap(candidate)) {
        return true;
      }
    }
  }
  return false;
}
// BlobStore
// Creates a blob store that keeps its bucket files under `directory`.
//   block_size        - size of one block, in bytes
//   blocks_per_bucket - number of blocks handed to the free list per bucket
//   max_buckets       - capacity of the bucket table; CreateNewBucket()
//                       fails once this many buckets exist
//   env               - environment used for directory and file creation
// The directory is created if missing and the first bucket is opened right
// away. The Status values of both operations are not checked here.
BlobStore::BlobStore(const string& directory,
                     uint64_t block_size,
                     uint32_t blocks_per_bucket,
                     uint32_t max_buckets,
                     Env* env)
    : directory_(directory),
      block_size_(block_size),
      blocks_per_bucket_(blocks_per_bucket),
      env_(env),
      max_buckets_(max_buckets) {
  env_->CreateDirIfMissing(directory_);

  // bucket files are opened with mmap disabled for both reads and writes
  storage_options_.use_mmap_reads = false;
  storage_options_.use_mmap_writes = false;

  // the bucket table is allocated once, at its maximum size
  buckets_ = new unique_ptr<RandomRWFile>[max_buckets_];
  buckets_size_ = 0;

  CreateNewBucket();
}
// Frees the bucket table allocated in the constructor, which also destroys
// the unique_ptr stored in each of its slots.
BlobStore::~BlobStore() {
  // TODO we don't care about recovery for now
  delete[] buckets_;
}
// Stores `value` and describes where it was written in `*blob`.
//
// Enough blocks to hold value.size() bytes (rounded up) are allocated, the
// data is written chunk by chunk, and whatever space remains in a chunk
// after the data is filled with zero bytes.
//
// Returns the failing Status if allocation or any write fails. This now
// includes the zero-padding write, whose result used to be ignored. On a
// write failure the allocated chunks are handed back through Delete();
// `*blob` itself is not cleared.
Status BlobStore::Put(const Slice& value, Blob* blob) {
  // convert size to number of blocks, rounding up
  Status s = Allocate((value.size() + block_size_ - 1) / block_size_, blob);
  if (!s.ok()) {
    return s;
  }
  size_t size_left = value.size();
  uint64_t offset = 0;  // in bytes, not blocks
  for (const auto& chunk : blob->chunks) {
    const uint64_t chunk_bytes = chunk.size * block_size_;
    const uint64_t write_size = std::min<uint64_t>(chunk_bytes, size_left);
    assert(chunk.bucket_id < buckets_size_);
    s = buckets_[chunk.bucket_id]->Write(chunk.offset * block_size_,
                                         Slice(value.data() + offset,
                                               write_size));
    if (!s.ok()) {
      Delete(*blob);
      return s;
    }
    offset += write_size;
    size_left -= write_size;
    if (write_size < chunk_bytes) {
      // if we have any space left in the chunk, fill it up with zeros;
      // Get() depends on this padding to find the end of the value
      string zero_string(chunk_bytes - write_size, '\0');
      s = buckets_[chunk.bucket_id]->Write(
          chunk.offset * block_size_ + write_size, Slice(zero_string));
      if (!s.ok()) {
        Delete(*blob);
        return s;
      }
    }
  }
  if (size_left > 0) {
    Delete(*blob);
    return Status::IOError("Tried to write more data than fits in the blob");
  }
  return Status::OK();
}
// Reads the data described by `blob` into `*value`.
//
// Every chunk is read from its bucket file into consecutive positions of
// `*value`; afterwards the zero bytes at the end (the padding added by
// Put()) are stripped. Only TRAILING zeros are removed, so zero bytes inside
// the value are preserved. Zero bytes that were genuinely at the end of the
// stored value cannot be told apart from padding and are still dropped.
//
// Returns Status::IOError and clears `*value` if a read fails or returns
// fewer bytes than the chunk holds.
Status BlobStore::Get(const Blob& blob,
                      string* value) const {
#ifndef NDEBUG
  {
    // Debug-only sanity check: a live blob must not overlap the free list.
    // The whole scope is compiled out in release builds, so the mutex is
    // not taken there either.
    MutexLock l(&free_list_mutex_);
    assert(!free_list_.Overlap(blob));
  }
#endif
  value->resize(blob.Size() * block_size_);
  uint64_t offset = 0;  // in bytes, not blocks
  for (const auto& chunk : blob.chunks) {
    assert(chunk.bucket_id < buckets_size_);
    if (chunk.size == 0) {
      // size == 0 is a marker, not a block: nothing to read, and
      // value->at(offset) could be out of range for a trailing marker
      continue;
    }
    const uint64_t chunk_bytes = chunk.size * block_size_;
    Slice result;
    Status s = buckets_[chunk.bucket_id]->Read(chunk.offset * block_size_,
                                               chunk_bytes,
                                               &result,
                                               &value->at(offset));
    if (!s.ok() || result.size() < chunk_bytes) {
      value->clear();
      return Status::IOError("Could not read in from file");
    }
    offset += chunk_bytes;
  }
  // remove the '\0's at the end of the string
  const size_t last_data = value->find_last_not_of('\0');
  value->erase(last_data == string::npos ? 0 : last_data + 1);
  return Status::OK();
}
// Hands the chunks of `blob` back to the free list while holding
// free_list_mutex_, and returns the Status reported by the free list.
Status BlobStore::Delete(const Blob& blob) {
  MutexLock l(&free_list_mutex_);
  Status s = free_list_.Free(blob);
  return s;
}
// Calls Sync() on every bucket file opened so far, in index order.
// Stops at the first failure and returns its Status; returns Status::OK()
// when every bucket synced successfully.
Status BlobStore::Sync() {
  size_t i = 0;
  while (i < buckets_size_) {
    const Status s = buckets_[i]->Sync();
    if (!s.ok()) {
      return s;
    }
    ++i;
  }
  return Status::OK();
}
// Reserves `blocks` blocks for `*blob` while holding free_list_mutex_.
// If the free list cannot satisfy the request, one new bucket is created
// and the allocation is retried once; the Status of the retry (or of the
// failed bucket creation) is returned.
Status BlobStore::Allocate(uint32_t blocks, Blob* blob) {
  MutexLock l(&free_list_mutex_);
  Status s = free_list_.Allocate(blocks, blob);
  if (s.ok()) {
    return s;
  }
  // not enough free blocks: add one bucket, then try again
  s = CreateNewBucket();
  if (!s.ok()) {
    return s;
  }
  return free_list_.Allocate(blocks, blob);
}
// Opens the next bucket file, "<directory_>/<bucket id>.bs", stores it in
// the bucket table and adds all of its blocks to the free list.
//
// Returns Status::IOError once max_buckets_ buckets exist, or the Status of
// NewRandomRWFile if the file cannot be opened.
//
// called with free_list_mutex_ held (the constructor also calls it, without
// taking that lock)
Status BlobStore::CreateNewBucket() {
  MutexLock l(&buckets_mutex_);
  if (buckets_size_ >= max_buckets_) {
    return Status::IOError("Max size exceeded\n");
  }
  int new_bucket_id = buckets_size_;
  // Build the path in a string so that a long directory name cannot overflow
  // a fixed-size buffer; only the bucket id goes through a small buffer,
  // which is big enough for any int.
  char id_buf[16];
  snprintf(id_buf, sizeof(id_buf), "%d", new_bucket_id);
  const string fname = directory_ + "/" + id_buf + ".bs";
  Status s = env_->NewRandomRWFile(fname,
                                   &buckets_[new_bucket_id],
                                   storage_options_);
  if (!s.ok()) {
    return s;
  }
  // whether Allocate succeeds or not, does not affect the overall correctness
  // of this function - calling Allocate is really optional
  // (also, tmpfs does not support allocate)
  buckets_[new_bucket_id]->Allocate(0, block_size_ * blocks_per_bucket_);
  // publish the new bucket only after its file has been opened
  buckets_size_ = new_bucket_id + 1;
  return free_list_.Free(Blob(new_bucket_id, 0, blocks_per_bucket_));
}
} // namespace rocksdb