Add sst_file_dumper status check (#7315)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/7315 Test Plan: `ASSERT_STATUS_CHECKED=1 make sst_dump_test && ./sst_dump_test` And manually run `./sst_dump --file=*.sst` before and after the change. Reviewed By: pdillinger Differential Revision: D23361669 Pulled By: jay-zhuang fbshipit-source-id: 5bf51a2a90ee35c8c679e5f604732ec2aef5949a
This commit is contained in:
parent
ef32f11004
commit
27aa443a15
1
Makefile
1
Makefile
@ -607,6 +607,7 @@ ifdef ASSERT_STATUS_CHECKED
|
|||||||
repeatable_thread_test \
|
repeatable_thread_test \
|
||||||
skiplist_test \
|
skiplist_test \
|
||||||
slice_test \
|
slice_test \
|
||||||
|
sst_dump_test \
|
||||||
statistics_test \
|
statistics_test \
|
||||||
thread_local_test \
|
thread_local_test \
|
||||||
env_timed_test \
|
env_timed_test \
|
||||||
|
@ -3180,18 +3180,16 @@ Status BlockBasedTable::GetKVPairsFromDataBlocks(
|
|||||||
}
|
}
|
||||||
|
|
||||||
Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
||||||
|
auto out_file_wrapper = WritableFileStringStreamAdapter(out_file);
|
||||||
|
std::ostream out_stream(&out_file_wrapper);
|
||||||
// Output Footer
|
// Output Footer
|
||||||
out_file->Append(
|
out_stream << "Footer Details:\n"
|
||||||
"Footer Details:\n"
|
"--------------------------------------\n";
|
||||||
"--------------------------------------\n"
|
out_stream << " " << rep_->footer.ToString() << "\n";
|
||||||
" ");
|
|
||||||
out_file->Append(rep_->footer.ToString().c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
|
|
||||||
// Output MetaIndex
|
// Output MetaIndex
|
||||||
out_file->Append(
|
out_stream << "Metaindex Details:\n"
|
||||||
"Metaindex Details:\n"
|
"--------------------------------------\n";
|
||||||
"--------------------------------------\n");
|
|
||||||
std::unique_ptr<Block> metaindex;
|
std::unique_ptr<Block> metaindex;
|
||||||
std::unique_ptr<InternalIterator> metaindex_iter;
|
std::unique_ptr<InternalIterator> metaindex_iter;
|
||||||
ReadOptions ro;
|
ReadOptions ro;
|
||||||
@ -3204,27 +3202,22 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
|||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
if (metaindex_iter->key() == ROCKSDB_NAMESPACE::kPropertiesBlock) {
|
if (metaindex_iter->key() == kPropertiesBlock) {
|
||||||
out_file->Append(" Properties block handle: ");
|
out_stream << " Properties block handle: "
|
||||||
out_file->Append(metaindex_iter->value().ToString(true).c_str());
|
<< metaindex_iter->value().ToString(true) << "\n";
|
||||||
out_file->Append("\n");
|
} else if (metaindex_iter->key() == kCompressionDictBlock) {
|
||||||
} else if (metaindex_iter->key() ==
|
out_stream << " Compression dictionary block handle: "
|
||||||
ROCKSDB_NAMESPACE::kCompressionDictBlock) {
|
<< metaindex_iter->value().ToString(true) << "\n";
|
||||||
out_file->Append(" Compression dictionary block handle: ");
|
|
||||||
out_file->Append(metaindex_iter->value().ToString(true).c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
} else if (strstr(metaindex_iter->key().ToString().c_str(),
|
} else if (strstr(metaindex_iter->key().ToString().c_str(),
|
||||||
"filter.rocksdb.") != nullptr) {
|
"filter.rocksdb.") != nullptr) {
|
||||||
out_file->Append(" Filter block handle: ");
|
out_stream << " Filter block handle: "
|
||||||
out_file->Append(metaindex_iter->value().ToString(true).c_str());
|
<< metaindex_iter->value().ToString(true) << "\n";
|
||||||
out_file->Append("\n");
|
} else if (metaindex_iter->key() == kRangeDelBlock) {
|
||||||
} else if (metaindex_iter->key() == ROCKSDB_NAMESPACE::kRangeDelBlock) {
|
out_stream << " Range deletion block handle: "
|
||||||
out_file->Append(" Range deletion block handle: ");
|
<< metaindex_iter->value().ToString(true) << "\n";
|
||||||
out_file->Append(metaindex_iter->value().ToString(true).c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
out_file->Append("\n");
|
out_stream << "\n";
|
||||||
} else {
|
} else {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
@ -3234,25 +3227,19 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
|||||||
table_properties = rep_->table_properties.get();
|
table_properties = rep_->table_properties.get();
|
||||||
|
|
||||||
if (table_properties != nullptr) {
|
if (table_properties != nullptr) {
|
||||||
out_file->Append(
|
out_stream << "Table Properties:\n"
|
||||||
"Table Properties:\n"
|
"--------------------------------------\n";
|
||||||
"--------------------------------------\n"
|
out_stream << " " << table_properties->ToString("\n ", ": ") << "\n";
|
||||||
" ");
|
|
||||||
out_file->Append(table_properties->ToString("\n ", ": ").c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rep_->filter) {
|
if (rep_->filter) {
|
||||||
out_file->Append(
|
out_stream << "Filter Details:\n"
|
||||||
"Filter Details:\n"
|
"--------------------------------------\n";
|
||||||
"--------------------------------------\n"
|
out_stream << " " << rep_->filter->ToString() << "\n";
|
||||||
" ");
|
|
||||||
out_file->Append(rep_->filter->ToString().c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Output Index block
|
// Output Index block
|
||||||
s = DumpIndexBlock(out_file);
|
s = DumpIndexBlock(out_stream);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
@ -3271,15 +3258,10 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
|||||||
assert(uncompression_dict.GetValue());
|
assert(uncompression_dict.GetValue());
|
||||||
|
|
||||||
const Slice& raw_dict = uncompression_dict.GetValue()->GetRawDict();
|
const Slice& raw_dict = uncompression_dict.GetValue()->GetRawDict();
|
||||||
out_file->Append(
|
out_stream << "Compression Dictionary:\n"
|
||||||
"Compression Dictionary:\n"
|
"--------------------------------------\n";
|
||||||
"--------------------------------------\n");
|
out_stream << " size (bytes): " << raw_dict.size() << "\n\n";
|
||||||
out_file->Append(" size (bytes): ");
|
out_stream << " HEX " << raw_dict.ToString(true) << "\n\n";
|
||||||
out_file->Append(ROCKSDB_NAMESPACE::ToString(raw_dict.size()));
|
|
||||||
out_file->Append("\n\n");
|
|
||||||
out_file->Append(" HEX ");
|
|
||||||
out_file->Append(raw_dict.ToString(true).c_str());
|
|
||||||
out_file->Append("\n\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Output range deletions block
|
// Output range deletions block
|
||||||
@ -3287,39 +3269,44 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
|
|||||||
if (range_del_iter != nullptr) {
|
if (range_del_iter != nullptr) {
|
||||||
range_del_iter->SeekToFirst();
|
range_del_iter->SeekToFirst();
|
||||||
if (range_del_iter->Valid()) {
|
if (range_del_iter->Valid()) {
|
||||||
out_file->Append(
|
out_stream << "Range deletions:\n"
|
||||||
"Range deletions:\n"
|
"--------------------------------------\n";
|
||||||
"--------------------------------------\n"
|
|
||||||
" ");
|
|
||||||
for (; range_del_iter->Valid(); range_del_iter->Next()) {
|
for (; range_del_iter->Valid(); range_del_iter->Next()) {
|
||||||
DumpKeyValue(range_del_iter->key(), range_del_iter->value(), out_file);
|
DumpKeyValue(range_del_iter->key(), range_del_iter->value(),
|
||||||
|
out_stream);
|
||||||
}
|
}
|
||||||
out_file->Append("\n");
|
out_stream << "\n";
|
||||||
}
|
}
|
||||||
delete range_del_iter;
|
delete range_del_iter;
|
||||||
}
|
}
|
||||||
// Output Data blocks
|
// Output Data blocks
|
||||||
s = DumpDataBlocks(out_file);
|
s = DumpDataBlocks(out_stream);
|
||||||
|
|
||||||
return s;
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!out_stream.good()) {
|
||||||
|
return Status::IOError("Failed to write to output file");
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
|
Status BlockBasedTable::DumpIndexBlock(std::ostream& out_stream) {
|
||||||
out_file->Append(
|
out_stream << "Index Details:\n"
|
||||||
"Index Details:\n"
|
"--------------------------------------\n";
|
||||||
"--------------------------------------\n");
|
|
||||||
std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
|
std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
|
||||||
NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
|
NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
|
||||||
/*input_iter=*/nullptr, /*get_context=*/nullptr,
|
/*input_iter=*/nullptr, /*get_context=*/nullptr,
|
||||||
/*lookup_contex=*/nullptr));
|
/*lookup_contex=*/nullptr));
|
||||||
Status s = blockhandles_iter->status();
|
Status s = blockhandles_iter->status();
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
out_file->Append("Can not read Index Block \n\n");
|
out_stream << "Can not read Index Block \n\n";
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
out_file->Append(" Block key hex dump: Data block handle\n");
|
out_stream << " Block key hex dump: Data block handle\n";
|
||||||
out_file->Append(" Block key ascii\n\n");
|
out_stream << " Block key ascii\n\n";
|
||||||
for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
|
for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
|
||||||
blockhandles_iter->Next()) {
|
blockhandles_iter->Next()) {
|
||||||
s = blockhandles_iter->status();
|
s = blockhandles_iter->status();
|
||||||
@ -3336,13 +3323,10 @@ Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
|
|||||||
user_key = ikey.user_key();
|
user_key = ikey.user_key();
|
||||||
}
|
}
|
||||||
|
|
||||||
out_file->Append(" HEX ");
|
out_stream << " HEX " << user_key.ToString(true) << ": "
|
||||||
out_file->Append(user_key.ToString(true).c_str());
|
<< blockhandles_iter->value().ToString(true,
|
||||||
out_file->Append(": ");
|
rep_->index_has_first_key)
|
||||||
out_file->Append(blockhandles_iter->value()
|
<< "\n";
|
||||||
.ToString(true, rep_->index_has_first_key)
|
|
||||||
.c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
|
|
||||||
std::string str_key = user_key.ToString();
|
std::string str_key = user_key.ToString();
|
||||||
std::string res_key("");
|
std::string res_key("");
|
||||||
@ -3351,22 +3335,21 @@ Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
|
|||||||
res_key.append(&str_key[i], 1);
|
res_key.append(&str_key[i], 1);
|
||||||
res_key.append(1, cspace);
|
res_key.append(1, cspace);
|
||||||
}
|
}
|
||||||
out_file->Append(" ASCII ");
|
out_stream << " ASCII " << res_key << "\n";
|
||||||
out_file->Append(res_key.c_str());
|
out_stream << " ------\n";
|
||||||
out_file->Append("\n ------\n");
|
|
||||||
}
|
}
|
||||||
out_file->Append("\n");
|
out_stream << "\n";
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) {
|
Status BlockBasedTable::DumpDataBlocks(std::ostream& out_stream) {
|
||||||
std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
|
std::unique_ptr<InternalIteratorBase<IndexValue>> blockhandles_iter(
|
||||||
NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
|
NewIndexIterator(ReadOptions(), /*need_upper_bound_check=*/false,
|
||||||
/*input_iter=*/nullptr, /*get_context=*/nullptr,
|
/*input_iter=*/nullptr, /*get_context=*/nullptr,
|
||||||
/*lookup_contex=*/nullptr));
|
/*lookup_contex=*/nullptr));
|
||||||
Status s = blockhandles_iter->status();
|
Status s = blockhandles_iter->status();
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
out_file->Append("Can not read Index Block \n\n");
|
out_stream << "Can not read Index Block \n\n";
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3388,12 +3371,9 @@ Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) {
|
|||||||
datablock_size_max = std::max(datablock_size_max, datablock_size);
|
datablock_size_max = std::max(datablock_size_max, datablock_size);
|
||||||
datablock_size_sum += datablock_size;
|
datablock_size_sum += datablock_size;
|
||||||
|
|
||||||
out_file->Append("Data Block # ");
|
out_stream << "Data Block # " << block_id << " @ "
|
||||||
out_file->Append(ROCKSDB_NAMESPACE::ToString(block_id));
|
<< blockhandles_iter->value().handle.ToString(true) << "\n";
|
||||||
out_file->Append(" @ ");
|
out_stream << "--------------------------------------\n";
|
||||||
out_file->Append(blockhandles_iter->value().handle.ToString(true).c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
out_file->Append("--------------------------------------\n");
|
|
||||||
|
|
||||||
std::unique_ptr<InternalIterator> datablock_iter;
|
std::unique_ptr<InternalIterator> datablock_iter;
|
||||||
datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
|
datablock_iter.reset(NewDataBlockIterator<DataBlockIter>(
|
||||||
@ -3404,7 +3384,7 @@ Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) {
|
|||||||
s = datablock_iter->status();
|
s = datablock_iter->status();
|
||||||
|
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
out_file->Append("Error reading the block - Skipped \n\n");
|
out_stream << "Error reading the block - Skipped \n\n";
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3412,44 +3392,37 @@ Status BlockBasedTable::DumpDataBlocks(WritableFile* out_file) {
|
|||||||
datablock_iter->Next()) {
|
datablock_iter->Next()) {
|
||||||
s = datablock_iter->status();
|
s = datablock_iter->status();
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
out_file->Append("Error reading the block - Skipped \n");
|
out_stream << "Error reading the block - Skipped \n";
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
DumpKeyValue(datablock_iter->key(), datablock_iter->value(), out_file);
|
DumpKeyValue(datablock_iter->key(), datablock_iter->value(), out_stream);
|
||||||
}
|
}
|
||||||
out_file->Append("\n");
|
out_stream << "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t num_datablocks = block_id - 1;
|
uint64_t num_datablocks = block_id - 1;
|
||||||
if (num_datablocks) {
|
if (num_datablocks) {
|
||||||
double datablock_size_avg =
|
double datablock_size_avg =
|
||||||
static_cast<double>(datablock_size_sum) / num_datablocks;
|
static_cast<double>(datablock_size_sum) / num_datablocks;
|
||||||
out_file->Append("Data Block Summary:\n");
|
out_stream << "Data Block Summary:\n";
|
||||||
out_file->Append("--------------------------------------");
|
out_stream << "--------------------------------------\n";
|
||||||
out_file->Append("\n # data blocks: ");
|
out_stream << " # data blocks: " << num_datablocks << "\n";
|
||||||
out_file->Append(ROCKSDB_NAMESPACE::ToString(num_datablocks));
|
out_stream << " min data block size: " << datablock_size_min << "\n";
|
||||||
out_file->Append("\n min data block size: ");
|
out_stream << " max data block size: " << datablock_size_max << "\n";
|
||||||
out_file->Append(ROCKSDB_NAMESPACE::ToString(datablock_size_min));
|
out_stream << " avg data block size: " << ToString(datablock_size_avg)
|
||||||
out_file->Append("\n max data block size: ");
|
<< "\n";
|
||||||
out_file->Append(ROCKSDB_NAMESPACE::ToString(datablock_size_max));
|
|
||||||
out_file->Append("\n avg data block size: ");
|
|
||||||
out_file->Append(ROCKSDB_NAMESPACE::ToString(datablock_size_avg));
|
|
||||||
out_file->Append("\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
|
void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
|
||||||
WritableFile* out_file) {
|
std::ostream& out_stream) {
|
||||||
InternalKey ikey;
|
InternalKey ikey;
|
||||||
ikey.DecodeFrom(key);
|
ikey.DecodeFrom(key);
|
||||||
|
|
||||||
out_file->Append(" HEX ");
|
out_stream << " HEX " << ikey.user_key().ToString(true) << ": "
|
||||||
out_file->Append(ikey.user_key().ToString(true).c_str());
|
<< value.ToString(true) << "\n";
|
||||||
out_file->Append(": ");
|
|
||||||
out_file->Append(value.ToString(true).c_str());
|
|
||||||
out_file->Append("\n");
|
|
||||||
|
|
||||||
std::string str_key = ikey.user_key().ToString();
|
std::string str_key = ikey.user_key().ToString();
|
||||||
std::string str_value = value.ToString();
|
std::string str_value = value.ToString();
|
||||||
@ -3472,11 +3445,8 @@ void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
|
|||||||
res_value.append(1, cspace);
|
res_value.append(1, cspace);
|
||||||
}
|
}
|
||||||
|
|
||||||
out_file->Append(" ASCII ");
|
out_stream << " ASCII " << res_key << ": " << res_value << "\n";
|
||||||
out_file->Append(res_key.c_str());
|
out_stream << " ------\n";
|
||||||
out_file->Append(": ");
|
|
||||||
out_file->Append(res_value.c_str());
|
|
||||||
out_file->Append("\n ------\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
@ -472,10 +472,10 @@ class BlockBasedTable : public TableReader {
|
|||||||
uint64_t data_size) const;
|
uint64_t data_size) const;
|
||||||
|
|
||||||
// Helper functions for DumpTable()
|
// Helper functions for DumpTable()
|
||||||
Status DumpIndexBlock(WritableFile* out_file);
|
Status DumpIndexBlock(std::ostream& out_stream);
|
||||||
Status DumpDataBlocks(WritableFile* out_file);
|
Status DumpDataBlocks(std::ostream& out_stream);
|
||||||
void DumpKeyValue(const Slice& key, const Slice& value,
|
void DumpKeyValue(const Slice& key, const Slice& value,
|
||||||
WritableFile* out_file);
|
std::ostream& out_stream);
|
||||||
|
|
||||||
// A cumulative data block file read in MultiGet lower than this size will
|
// A cumulative data block file read in MultiGet lower than this size will
|
||||||
// use a stack buffer
|
// use a stack buffer
|
||||||
@ -642,4 +642,40 @@ struct BlockBasedTable::Rep {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// This is an adapter class for `WritableFile` to be used for `std::ostream`.
|
||||||
|
// The adapter wraps a `WritableFile`, which can be passed to a `std::ostream`
|
||||||
|
// constructor for storing streaming data.
|
||||||
|
// Note:
|
||||||
|
// * This adapter doesn't provide any buffering, each write is forwarded to
|
||||||
|
// `WritableFile->Append()` directly.
|
||||||
|
// * For a failed write, the user needs to check the status by `ostream.good()`
|
||||||
|
class WritableFileStringStreamAdapter : public std::stringbuf {
|
||||||
|
public:
|
||||||
|
explicit WritableFileStringStreamAdapter(WritableFile* writable_file)
|
||||||
|
: file_(writable_file) {}
|
||||||
|
|
||||||
|
// This is to handle `std::endl`, `endl` is written by `os.put()` directly
|
||||||
|
// without going through `xsputn()`. As we explicitly disabled buffering,
|
||||||
|
// every write, not captured by xsputn, is an overflow.
|
||||||
|
int overflow(int ch = EOF) override {
|
||||||
|
if (ch == '\n') {
|
||||||
|
file_->Append("\n");
|
||||||
|
return ch;
|
||||||
|
}
|
||||||
|
return EOF;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::streamsize xsputn(char const* p, std::streamsize n) override {
|
||||||
|
Status s = file_->Append(Slice(p, n));
|
||||||
|
if (!s.ok()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
WritableFile* file_;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
@ -105,8 +105,8 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
|
|||||||
: file_size;
|
: file_size;
|
||||||
uint64_t prefetch_off = file_size - prefetch_size;
|
uint64_t prefetch_off = file_size - prefetch_size;
|
||||||
IOOptions opts;
|
IOOptions opts;
|
||||||
prefetch_buffer.Prefetch(opts, file_.get(), prefetch_off,
|
s = prefetch_buffer.Prefetch(opts, file_.get(), prefetch_off,
|
||||||
static_cast<size_t>(prefetch_size));
|
static_cast<size_t>(prefetch_size));
|
||||||
|
|
||||||
s = ReadFooterFromFile(opts, file_.get(), &prefetch_buffer, file_size,
|
s = ReadFooterFromFile(opts, file_.get(), &prefetch_buffer, file_size,
|
||||||
&footer);
|
&footer);
|
||||||
@ -130,9 +130,9 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
|
|||||||
? &prefetch_buffer
|
? &prefetch_buffer
|
||||||
: nullptr)
|
: nullptr)
|
||||||
.ok()) {
|
.ok()) {
|
||||||
SetTableOptionsByMagicNumber(magic_number);
|
s = SetTableOptionsByMagicNumber(magic_number);
|
||||||
} else {
|
} else {
|
||||||
SetOldTableOptions();
|
s = SetOldTableOptions();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,16 +178,23 @@ Status SstFileDumper::DumpTable(const std::string& out_filename) {
|
|||||||
Env* env = options_.env;
|
Env* env = options_.env;
|
||||||
env->NewWritableFile(out_filename, &out_file, soptions_);
|
env->NewWritableFile(out_filename, &out_file, soptions_);
|
||||||
Status s = table_reader_->DumpTable(out_file.get());
|
Status s = table_reader_->DumpTable(out_file.get());
|
||||||
out_file->Close();
|
if (!s.ok()) {
|
||||||
return s;
|
// close the file before return error, ignore the close error if there's any
|
||||||
|
out_file->Close().PermitUncheckedError();
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
return out_file->Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t SstFileDumper::CalculateCompressedTableSize(
|
Status SstFileDumper::CalculateCompressedTableSize(
|
||||||
const TableBuilderOptions& tb_options, size_t block_size,
|
const TableBuilderOptions& tb_options, size_t block_size,
|
||||||
uint64_t* num_data_blocks) {
|
uint64_t* num_data_blocks, uint64_t* compressed_table_size) {
|
||||||
std::unique_ptr<WritableFile> out_file;
|
std::unique_ptr<WritableFile> out_file;
|
||||||
std::unique_ptr<Env> env(NewMemEnv(options_.env));
|
std::unique_ptr<Env> env(NewMemEnv(options_.env));
|
||||||
env->NewWritableFile(testFileName, &out_file, soptions_);
|
Status s = env->NewWritableFile(testFileName, &out_file, soptions_);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
std::unique_ptr<WritableFileWriter> dest_writer;
|
std::unique_ptr<WritableFileWriter> dest_writer;
|
||||||
dest_writer.reset(
|
dest_writer.reset(
|
||||||
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(out_file)),
|
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(out_file)),
|
||||||
@ -206,23 +213,21 @@ uint64_t SstFileDumper::CalculateCompressedTableSize(
|
|||||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||||
table_builder->Add(iter->key(), iter->value());
|
table_builder->Add(iter->key(), iter->value());
|
||||||
}
|
}
|
||||||
if (!iter->status().ok()) {
|
s = iter->status();
|
||||||
fputs(iter->status().ToString().c_str(), stderr);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
Status s = table_builder->Finish();
|
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
fputs(s.ToString().c_str(), stderr);
|
return s;
|
||||||
exit(1);
|
|
||||||
}
|
}
|
||||||
uint64_t size = table_builder->FileSize();
|
s = table_builder->Finish();
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
*compressed_table_size = table_builder->FileSize();
|
||||||
assert(num_data_blocks != nullptr);
|
assert(num_data_blocks != nullptr);
|
||||||
*num_data_blocks = table_builder->GetTableProperties().num_data_blocks;
|
*num_data_blocks = table_builder->GetTableProperties().num_data_blocks;
|
||||||
env->DeleteFile(testFileName);
|
return env->DeleteFile(testFileName);
|
||||||
return size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int SstFileDumper::ShowAllCompressionSizes(
|
Status SstFileDumper::ShowAllCompressionSizes(
|
||||||
size_t block_size,
|
size_t block_size,
|
||||||
const std::vector<std::pair<CompressionType, const char*>>&
|
const std::vector<std::pair<CompressionType, const char*>>&
|
||||||
compression_types,
|
compression_types,
|
||||||
@ -238,18 +243,21 @@ int SstFileDumper::ShowAllCompressionSizes(
|
|||||||
for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
|
for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
|
||||||
fprintf(stdout, "Compression level: %d", j);
|
fprintf(stdout, "Compression level: %d", j);
|
||||||
compress_opt.level = j;
|
compress_opt.level = j;
|
||||||
ShowCompressionSize(block_size, i.first, compress_opt);
|
Status s = ShowCompressionSize(block_size, i.first, compress_opt);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fprintf(stdout, "Unsupported compression type: %s.\n", i.second);
|
fprintf(stdout, "Unsupported compression type: %s.\n", i.second);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
int SstFileDumper::ShowCompressionSize(size_t block_size,
|
Status SstFileDumper::ShowCompressionSize(
|
||||||
CompressionType compress_type,
|
size_t block_size, CompressionType compress_type,
|
||||||
const CompressionOptions& compress_opt) {
|
const CompressionOptions& compress_opt) {
|
||||||
Options opts;
|
Options opts;
|
||||||
opts.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
|
opts.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
|
||||||
opts.statistics->set_stats_level(StatsLevel::kAll);
|
opts.statistics->set_stats_level(StatsLevel::kAll);
|
||||||
@ -269,8 +277,13 @@ int SstFileDumper::ShowCompressionSize(size_t block_size,
|
|||||||
uint64_t num_data_blocks = 0;
|
uint64_t num_data_blocks = 0;
|
||||||
std::chrono::steady_clock::time_point start =
|
std::chrono::steady_clock::time_point start =
|
||||||
std::chrono::steady_clock::now();
|
std::chrono::steady_clock::now();
|
||||||
uint64_t file_size =
|
uint64_t file_size;
|
||||||
CalculateCompressedTableSize(tb_opts, block_size, &num_data_blocks);
|
Status s = CalculateCompressedTableSize(tb_opts, block_size, &num_data_blocks,
|
||||||
|
&file_size);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
|
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
|
||||||
fprintf(stdout, " Size: %10" PRIu64, file_size);
|
fprintf(stdout, " Size: %10" PRIu64, file_size);
|
||||||
fprintf(stdout, " Blocks: %6" PRIu64, num_data_blocks);
|
fprintf(stdout, " Blocks: %6" PRIu64, num_data_blocks);
|
||||||
@ -313,7 +326,7 @@ int SstFileDumper::ShowCompressionSize(size_t block_size,
|
|||||||
ratio_not_compressed_blocks, ratio_not_compressed_pcnt);
|
ratio_not_compressed_blocks, ratio_not_compressed_pcnt);
|
||||||
fprintf(stdout, " Not compressed (abort): %6" PRIu64 " (%5.1f%%)\n",
|
fprintf(stdout, " Not compressed (abort): %6" PRIu64 " (%5.1f%%)\n",
|
||||||
not_compressed_blocks, not_compressed_pcnt);
|
not_compressed_blocks, not_compressed_pcnt);
|
||||||
return 0;
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number,
|
Status SstFileDumper::ReadTableProperties(uint64_t table_magic_number,
|
||||||
|
@ -35,17 +35,15 @@ class SstFileDumper {
|
|||||||
Status DumpTable(const std::string& out_filename);
|
Status DumpTable(const std::string& out_filename);
|
||||||
Status getStatus() { return init_result_; }
|
Status getStatus() { return init_result_; }
|
||||||
|
|
||||||
int ShowAllCompressionSizes(
|
Status ShowAllCompressionSizes(
|
||||||
size_t block_size,
|
size_t block_size,
|
||||||
const std::vector<std::pair<CompressionType, const char*>>&
|
const std::vector<std::pair<CompressionType, const char*>>&
|
||||||
compression_types,
|
compression_types,
|
||||||
int32_t compress_level_from, int32_t compress_level_to,
|
int32_t compress_level_from, int32_t compress_level_to,
|
||||||
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes);
|
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes);
|
||||||
|
|
||||||
int ShowCompressionSize(
|
Status ShowCompressionSize(size_t block_size, CompressionType compress_type,
|
||||||
size_t block_size,
|
const CompressionOptions& compress_opt);
|
||||||
CompressionType compress_type,
|
|
||||||
const CompressionOptions& compress_opt);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Get the TableReader implementation for the sst file
|
// Get the TableReader implementation for the sst file
|
||||||
@ -54,9 +52,10 @@ class SstFileDumper {
|
|||||||
RandomAccessFileReader* file, uint64_t file_size,
|
RandomAccessFileReader* file, uint64_t file_size,
|
||||||
FilePrefetchBuffer* prefetch_buffer);
|
FilePrefetchBuffer* prefetch_buffer);
|
||||||
|
|
||||||
uint64_t CalculateCompressedTableSize(const TableBuilderOptions& tb_options,
|
Status CalculateCompressedTableSize(const TableBuilderOptions& tb_options,
|
||||||
size_t block_size,
|
size_t block_size,
|
||||||
uint64_t* num_data_blocks);
|
uint64_t* num_data_blocks,
|
||||||
|
uint64_t* compressed_table_size);
|
||||||
|
|
||||||
Status SetTableOptionsByMagicNumber(uint64_t table_magic_number);
|
Status SetTableOptionsByMagicNumber(uint64_t table_magic_number);
|
||||||
Status SetOldTableOptions();
|
Status SetOldTableOptions();
|
||||||
|
@ -39,48 +39,12 @@ static std::string MakeValue(int i) {
|
|||||||
return key.Encode().ToString();
|
return key.Encode().ToString();
|
||||||
}
|
}
|
||||||
|
|
||||||
void createSST(const Options& opts, const std::string& file_name) {
|
|
||||||
Env* env = opts.env;
|
|
||||||
EnvOptions env_options(opts);
|
|
||||||
ReadOptions read_options;
|
|
||||||
const ImmutableCFOptions imoptions(opts);
|
|
||||||
const MutableCFOptions moptions(opts);
|
|
||||||
ROCKSDB_NAMESPACE::InternalKeyComparator ikc(opts.comparator);
|
|
||||||
std::unique_ptr<TableBuilder> tb;
|
|
||||||
|
|
||||||
std::unique_ptr<WritableFile> file;
|
|
||||||
ASSERT_OK(env->NewWritableFile(file_name, &file, env_options));
|
|
||||||
|
|
||||||
std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
|
|
||||||
int_tbl_prop_collector_factories;
|
|
||||||
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
|
|
||||||
NewLegacyWritableFileWrapper(std::move(file)), file_name, EnvOptions()));
|
|
||||||
std::string column_family_name;
|
|
||||||
int unknown_level = -1;
|
|
||||||
tb.reset(opts.table_factory->NewTableBuilder(
|
|
||||||
TableBuilderOptions(
|
|
||||||
imoptions, moptions, ikc, &int_tbl_prop_collector_factories,
|
|
||||||
CompressionType::kNoCompression, 0 /* sample_for_compression */,
|
|
||||||
CompressionOptions(), false /* skip_filters */, column_family_name,
|
|
||||||
unknown_level),
|
|
||||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
|
|
||||||
file_writer.get()));
|
|
||||||
|
|
||||||
// Populate slightly more than 1K keys
|
|
||||||
uint32_t num_keys = 1024;
|
|
||||||
for (uint32_t i = 0; i < num_keys; i++) {
|
|
||||||
tb->Add(MakeKey(i), MakeValue(i));
|
|
||||||
}
|
|
||||||
tb->Finish();
|
|
||||||
file_writer->Close();
|
|
||||||
}
|
|
||||||
|
|
||||||
void cleanup(const Options& opts, const std::string& file_name) {
|
void cleanup(const Options& opts, const std::string& file_name) {
|
||||||
Env* env = opts.env;
|
Env* env = opts.env;
|
||||||
env->DeleteFile(file_name);
|
ASSERT_OK(env->DeleteFile(file_name));
|
||||||
std::string outfile_name = file_name.substr(0, file_name.length() - 4);
|
std::string outfile_name = file_name.substr(0, file_name.length() - 4);
|
||||||
outfile_name.append("_dump.txt");
|
outfile_name.append("_dump.txt");
|
||||||
env->DeleteFile(outfile_name);
|
env->DeleteFile(outfile_name).PermitUncheckedError();
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
@ -127,8 +91,50 @@ class SSTDumpToolTest : public testing::Test {
|
|||||||
snprintf(usage[1], kOptLength, "%s", command);
|
snprintf(usage[1], kOptLength, "%s", command);
|
||||||
snprintf(usage[2], kOptLength, "--file=%s", file_path.c_str());
|
snprintf(usage[2], kOptLength, "--file=%s", file_path.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void createSST(const Options& opts, const std::string& file_name) {
|
||||||
|
Env* env = opts.env;
|
||||||
|
EnvOptions env_options(opts);
|
||||||
|
ReadOptions read_options;
|
||||||
|
const ImmutableCFOptions imoptions(opts);
|
||||||
|
const MutableCFOptions moptions(opts);
|
||||||
|
ROCKSDB_NAMESPACE::InternalKeyComparator ikc(opts.comparator);
|
||||||
|
std::unique_ptr<TableBuilder> tb;
|
||||||
|
|
||||||
|
std::unique_ptr<WritableFile> file;
|
||||||
|
ASSERT_OK(env->NewWritableFile(file_name, &file, env_options));
|
||||||
|
|
||||||
|
std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
|
||||||
|
int_tbl_prop_collector_factories;
|
||||||
|
std::unique_ptr<WritableFileWriter> file_writer(
|
||||||
|
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
|
||||||
|
file_name, EnvOptions()));
|
||||||
|
std::string column_family_name;
|
||||||
|
int unknown_level = -1;
|
||||||
|
tb.reset(opts.table_factory->NewTableBuilder(
|
||||||
|
TableBuilderOptions(
|
||||||
|
imoptions, moptions, ikc, &int_tbl_prop_collector_factories,
|
||||||
|
CompressionType::kNoCompression, 0 /* sample_for_compression */,
|
||||||
|
CompressionOptions(), false /* skip_filters */, column_family_name,
|
||||||
|
unknown_level),
|
||||||
|
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
|
||||||
|
file_writer.get()));
|
||||||
|
|
||||||
|
// Populate slightly more than 1K keys
|
||||||
|
uint32_t num_keys = kNumKey;
|
||||||
|
for (uint32_t i = 0; i < num_keys; i++) {
|
||||||
|
tb->Add(MakeKey(i), MakeValue(i));
|
||||||
|
}
|
||||||
|
ASSERT_OK(tb->Finish());
|
||||||
|
file_writer->Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
constexpr static int kNumKey = 1024;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
constexpr int SSTDumpToolTest::kNumKey;
|
||||||
|
|
||||||
TEST_F(SSTDumpToolTest, HelpAndVersion) {
|
TEST_F(SSTDumpToolTest, HelpAndVersion) {
|
||||||
Options opts;
|
Options opts;
|
||||||
opts.env = env();
|
opts.env = env();
|
||||||
@ -356,6 +362,43 @@ TEST_F(SSTDumpToolTest, ValidSSTPath) {
|
|||||||
delete[] usage[i];
|
delete[] usage[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(SSTDumpToolTest, RawOutput) {
|
||||||
|
Options opts;
|
||||||
|
opts.env = env();
|
||||||
|
std::string file_path = MakeFilePath("rocksdb_sst_test.sst");
|
||||||
|
createSST(opts, file_path);
|
||||||
|
|
||||||
|
char* usage[3];
|
||||||
|
PopulateCommandArgs(file_path, "--command=raw", usage);
|
||||||
|
|
||||||
|
ROCKSDB_NAMESPACE::SSTDumpTool tool;
|
||||||
|
ASSERT_TRUE(!tool.Run(3, usage, opts));
|
||||||
|
|
||||||
|
const std::string raw_path = MakeFilePath("rocksdb_sst_test_dump.txt");
|
||||||
|
std::ifstream raw_file(raw_path);
|
||||||
|
|
||||||
|
std::string tp;
|
||||||
|
bool is_data_block = false;
|
||||||
|
int key_count = 0;
|
||||||
|
while (getline(raw_file, tp)) {
|
||||||
|
if (tp.find("Data Block #") != std::string::npos) {
|
||||||
|
is_data_block = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_data_block && tp.find("HEX") != std::string::npos) {
|
||||||
|
key_count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(kNumKey, key_count);
|
||||||
|
|
||||||
|
cleanup(opts, file_path);
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
delete[] usage[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
||||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||||
|
@ -399,11 +399,15 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (command == "recompress") {
|
if (command == "recompress") {
|
||||||
dumper.ShowAllCompressionSizes(
|
st = dumper.ShowAllCompressionSizes(
|
||||||
set_block_size ? block_size : 16384,
|
set_block_size ? block_size : 16384,
|
||||||
compression_types.empty() ? kCompressions : compression_types,
|
compression_types.empty() ? kCompressions : compression_types,
|
||||||
compress_level_from, compress_level_to, compression_max_dict_bytes,
|
compress_level_from, compress_level_to, compression_max_dict_bytes,
|
||||||
compression_zstd_max_train_bytes);
|
compression_zstd_max_train_bytes);
|
||||||
|
if (!st.ok()) {
|
||||||
|
fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str());
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user