From 60bd8015f21fdb63d5409b1191f8ea9d8f1a1b87 Mon Sep 17 00:00:00 2001 From: "gabor@google.com" Date: Thu, 21 Jul 2011 02:40:18 +0000 Subject: [PATCH] Speed up Snappy uncompression, new Logger interface. - Removed one copy of an uncompressed block contents changing the signature of Snappy_Uncompress() so it uncompresses into a flat array instead of a std::string. Speeds up readrandom ~10%. - Instead of a combination of Env/WritableFile, we now have a Logger interface that can be easily overridden applications that want to supply their own logging. - Separated out the gcc and Sun Studio parts of atomic_pointer.h so we can use 'asm', 'volatile' keywords for Sun Studio. git-svn-id: https://leveldb.googlecode.com/svn/trunk@39 62dab493-f737-651d-591e-8d6aee1b9529 --- db/db_bench.cc | 7 +-- db/db_impl.cc | 45 +++++++----------- db/repair.cc | 20 ++++---- db/version_set.cc | 4 +- include/leveldb/env.h | 29 +++++++++--- include/leveldb/options.h | 6 +-- port/atomic_pointer.h | 14 ++++-- port/port_android.h | 8 +++- port/port_chromium.cc | 20 ++++---- port/port_chromium.h | 4 +- port/port_example.h | 12 ++++- port/port_posix.h | 30 ++++++------ table/format.cc | 20 ++++---- util/env.cc | 15 ++++-- util/env_chromium.cc | 74 ++++++----------------------- util/env_posix.cc | 74 +++++------------------------ util/posix_logger.h | 97 +++++++++++++++++++++++++++++++++++++++ 17 files changed, 264 insertions(+), 215 deletions(-) create mode 100644 util/posix_logger.h diff --git a/db/db_bench.cc b/db/db_bench.cc index 53b8c539c..7b4e41aab 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -472,13 +472,14 @@ class Benchmark { std::string compressed; bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); int64_t bytes = 0; - std::string uncompressed; + char* uncompressed = new char[input.size()]; while (ok && bytes < 1024 * 1048576) { // Compress 1G ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), - &uncompressed); - bytes += uncompressed.size(); + uncompressed); + bytes += input.size(); FinishedSingleOp(); } + delete[] uncompressed; if (!ok) { message_ = "(snappy failure)"; diff --git a/db/db_impl.cc b/db/db_impl.cc index 48056da61..5a0648e50 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -68,16 +68,6 @@ struct DBImpl::CompactionState { } }; -namespace { -class NullWritableFile : public WritableFile { - public: - virtual Status Append(const Slice& data) { return Status::OK(); } - virtual Status Close() { return Status::OK(); } - virtual Status Flush() { return Status::OK(); } - virtual Status Sync() { return Status::OK(); } -}; -} - // Fix user-supplied options to be reasonable template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { @@ -96,11 +86,10 @@ Options SanitizeOptions(const std::string& dbname, // Open a log file in the same directory as the db src.env->CreateDir(dbname); // In case it does not exist src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); - Status s = src.env->NewWritableFile(InfoLogFileName(dbname), - &result.info_log); + Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); if (!s.ok()) { // No place suitable for logging - result.info_log = new NullWritableFile; + result.info_log = NULL; } } if (result.block_cache == NULL) { @@ -201,7 +190,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const { if (s->ok() || options_.paranoid_checks) { // No change needed } else { - Log(env_, options_.info_log, "Ignoring error %s", s->ToString().c_str()); + Log(options_.info_log, "Ignoring error %s", s->ToString().c_str()); *s = Status::OK(); } } @@ -247,7 +236,7 @@ void DBImpl::DeleteObsoleteFiles() { if (type == kTableFile) { table_cache_->Evict(number); } - Log(env_, options_.info_log, "Delete type=%d #%lld\n", + Log(options_.info_log, "Delete type=%d #%lld\n", int(type), static_cast(number)); env_->DeleteFile(dbname_ + "/" + filenames[i]); @@ -336,11 +325,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence) { struct LogReporter : public log::Reader::Reporter { Env* env; - WritableFile* info_log; + Logger* info_log; const char* fname; Status* status; // NULL if options_.paranoid_checks==false virtual void Corruption(size_t bytes, const Status& s) { - Log(env, info_log, "%s%s: dropping %d bytes; %s", + Log(info_log, "%s%s: dropping %d bytes; %s", (this->status == NULL ? "(ignoring error) " : ""), fname, static_cast(bytes), s.ToString().c_str()); if (this->status != NULL && this->status->ok()) *this->status = s; @@ -370,7 +359,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // large sequence numbers). log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); - Log(env_, options_.info_log, "Recovering log #%llu", + Log(options_.info_log, "Recovering log #%llu", (unsigned long long) log_number); // Read all the records and add to a memtable @@ -434,7 +423,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, meta.number = versions_->NewFileNumber(); pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); - Log(env_, options_.info_log, "Level-0 table #%llu: started", + Log(options_.info_log, "Level-0 table #%llu: started", (unsigned long long) meta.number); Status s; @@ -444,7 +433,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, mutex_.Lock(); } - Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s", + Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", (unsigned long long) meta.number, (unsigned long long) meta.file_size, s.ToString().c_str()); @@ -613,7 +602,7 @@ void DBImpl::BackgroundCompaction() { f->smallest, f->largest); status = versions_->LogAndApply(c->edit()); VersionSet::LevelSummaryStorage tmp; - Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", + Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, static_cast(f->file_size), @@ -631,7 +620,7 @@ void DBImpl::BackgroundCompaction() { } else if (shutting_down_.Acquire_Load()) { // Ignore compaction errors found during shutting down } else { - Log(env_, options_.info_log, + Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); if (options_.paranoid_checks && bg_error_.ok()) { bg_error_ = status; @@ -727,7 +716,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, s = iter->status(); delete iter; if (s.ok()) { - Log(env_, options_.info_log, + Log(options_.info_log, "Generated table #%llu: %lld keys, %lld bytes", (unsigned long long) output_number, (unsigned long long) current_entries, @@ -740,7 +729,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, Status DBImpl::InstallCompactionResults(CompactionState* compact) { mutex_.AssertHeld(); - Log(env_, options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", + Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), @@ -776,7 +765,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files", + Log(options_.info_log, "Compacting %d@%d + %d@%d files", compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), @@ -859,7 +848,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { last_sequence_for_key = ikey.sequence; } #if 0 - Log(env_, options_.info_log, + Log(options_.info_log, " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " "%d smallest_snapshot: %d", ikey.user_key.ToString().c_str(), @@ -925,7 +914,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { status = InstallCompactionResults(compact); } VersionSet::LevelSummaryStorage tmp; - Log(env_, options_.info_log, + Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); return status; } @@ -1112,7 +1101,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. - Log(env_, options_.info_log, "waiting...\n"); + Log(options_.info_log, "waiting...\n"); bg_cv_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old diff --git a/db/repair.cc b/db/repair.cc index 2e3f50608..5bcdb5651 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -78,7 +78,7 @@ class Repairer { for (size_t i = 0; i < tables_.size(); i++) { bytes += tables_[i].meta.file_size; } - Log(env_, options_.info_log, + Log(options_.info_log, "**** Repaired leveldb %s; " "recovered %d files; %llu bytes. " "Some data may have been lost. " @@ -149,7 +149,7 @@ class Repairer { std::string logname = LogFileName(dbname_, logs_[i]); Status status = ConvertLogToTable(logs_[i]); if (!status.ok()) { - Log(env_, options_.info_log, "Log #%llu: ignoring conversion error: %s", + Log(options_.info_log, "Log #%llu: ignoring conversion error: %s", (unsigned long long) logs_[i], status.ToString().c_str()); } @@ -160,11 +160,11 @@ class Repairer { Status ConvertLogToTable(uint64_t log) { struct LogReporter : public log::Reader::Reporter { Env* env; - WritableFile* info_log; + Logger* info_log; uint64_t lognum; virtual void Corruption(size_t bytes, const Status& s) { // We print error messages for corruption, but continue repairing. - Log(env, info_log, "Log #%llu: dropping %d bytes; %s", + Log(info_log, "Log #%llu: dropping %d bytes; %s", (unsigned long long) lognum, static_cast(bytes), s.ToString().c_str()); @@ -209,7 +209,7 @@ class Repairer { if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { - Log(env_, options_.info_log, "Log #%llu: ignoring %s", + Log(options_.info_log, "Log #%llu: ignoring %s", (unsigned long long) log, status.ToString().c_str()); status = Status::OK(); // Keep going with rest of file @@ -231,7 +231,7 @@ class Repairer { table_numbers_.push_back(meta.number); } } - Log(env_, options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", + Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", (unsigned long long) log, counter, (unsigned long long) meta.number, @@ -247,7 +247,7 @@ class Repairer { Status status = ScanTable(&t); if (!status.ok()) { std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(env_, options_.info_log, "Table #%llu: ignoring %s", + Log(options_.info_log, "Table #%llu: ignoring %s", (unsigned long long) table_numbers_[i], status.ToString().c_str()); ArchiveFile(fname); @@ -270,7 +270,7 @@ class Repairer { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); if (!ParseInternalKey(key, &parsed)) { - Log(env_, options_.info_log, "Table #%llu: unparsable key %s", + Log(options_.info_log, "Table #%llu: unparsable key %s", (unsigned long long) t->meta.number, EscapeString(key).c_str()); continue; @@ -291,7 +291,7 @@ class Repairer { } delete iter; } - Log(env_, options_.info_log, "Table #%llu: %d entries %s", + Log(options_.info_log, "Table #%llu: %d entries %s", (unsigned long long) t->meta.number, counter, status.ToString().c_str()); @@ -373,7 +373,7 @@ class Repairer { new_file.append("/"); new_file.append((slash == NULL) ? fname.c_str() : slash + 1); Status s = env_->RenameFile(fname, new_file); - Log(env_, options_.info_log, "Archiving %s: %s\n", + Log(options_.info_log, "Archiving %s: %s\n", fname.c_str(), s.ToString().c_str()); } }; diff --git a/db/version_set.cc b/db/version_set.cc index 62bd6dd1a..5040b72f6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1124,7 +1124,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) { std::vector expanded1; GetOverlappingInputs(level+1, new_start, new_limit, &expanded1); if (expanded1.size() == c->inputs_[1].size()) { - Log(env_, options_->info_log, + Log(options_->info_log, "Expanding@%d %d+%d to %d+%d\n", level, int(c->inputs_[0].size()), @@ -1147,7 +1147,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) { } if (false) { - Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'", + Log(options_->info_log, "Compacting %d '%s' .. '%s'", level, EscapeString(smallest.Encode()).c_str(), EscapeString(largest.Encode()).c_str()); diff --git a/include/leveldb/env.h b/include/leveldb/env.h index 39f6a1a22..bf5100893 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -22,6 +22,7 @@ namespace leveldb { class FileLock; +class Logger; class RandomAccessFile; class SequentialFile; class Slice; @@ -134,8 +135,8 @@ class Env { // same directory. virtual Status GetTestDirectory(std::string* path) = 0; - // Write an entry to the log file with the specified format. - virtual void Logv(WritableFile* log, const char* format, va_list ap) = 0; + // Create and return a log file for storing informational messages. + virtual Status NewLogger(const std::string& fname, Logger** result) = 0; // Returns the number of micro-seconds since some fixed point in time. Only // useful for computing deltas of time. @@ -210,6 +211,22 @@ class WritableFile { void operator=(const WritableFile&); }; +// An interface for writing log messages. +class Logger { + public: + Logger() { } + virtual ~Logger(); + + // Write an entry to the log file with the specified format. + virtual void Logv(const char* format, va_list ap) = 0; + + private: + // No copying allowed + Logger(const Logger&); + void operator=(const Logger&); +}; + + // Identifies a locked file. class FileLock { public: @@ -222,9 +239,9 @@ class FileLock { }; // Log the specified data to *info_log if info_log is non-NULL. -extern void Log(Env* env, WritableFile* info_log, const char* format, ...) +extern void Log(Logger* info_log, const char* format, ...) # if defined(__GNUC__) || defined(__clang__) - __attribute__((__format__ (__printf__, 3, 4))) + __attribute__((__format__ (__printf__, 2, 3))) # endif ; @@ -284,8 +301,8 @@ class EnvWrapper : public Env { virtual Status GetTestDirectory(std::string* path) { return target_->GetTestDirectory(path); } - virtual void Logv(WritableFile* log, const char* format, va_list ap) { - return target_->Logv(log, format, ap); + virtual Status NewLogger(const std::string& fname, Logger** result) { + return target_->NewLogger(fname, result); } uint64_t NowMicros() { return target_->NowMicros(); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 0d4f6cdd6..381f22891 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -12,8 +12,8 @@ namespace leveldb { class Cache; class Comparator; class Env; +class Logger; class Snapshot; -class WritableFile; // DB contents are stored in a set of blocks, each of which holds a // sequence of key,value pairs. Each block may be compressed before @@ -61,10 +61,10 @@ struct Options { Env* env; // Any internal progress/error information generated by the db will - // be to written to info_log if it is non-NULL, or to a file stored + // be written to info_log if it is non-NULL, or to a file stored // in the same directory as the DB contents if info_log is NULL. // Default: NULL - WritableFile* info_log; + Logger* info_log; // ------------------- // Parameters that affect performance diff --git a/port/atomic_pointer.h b/port/atomic_pointer.h index c61877862..c20b1bdfb 100644 --- a/port/atomic_pointer.h +++ b/port/atomic_pointer.h @@ -48,9 +48,8 @@ namespace port { // http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx #define LEVELDB_HAVE_MEMORY_BARRIER -// Gcc and Sun Studio on x86 -#elif defined(ARCH_CPU_X86_FAMILY) && \ - (defined(__GNUC__) || defined(__SUNPRO_CC)) +// Gcc on x86 +#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__) inline void MemoryBarrier() { // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. @@ -58,6 +57,15 @@ inline void MemoryBarrier() { } #define LEVELDB_HAVE_MEMORY_BARRIER +// Sun Studio +#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC) +inline void MemoryBarrier() { + // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on + // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. + asm volatile("" : : : "memory"); +} +#define LEVELDB_HAVE_MEMORY_BARRIER + // Mac OS #elif defined(OS_MACOSX) inline void MemoryBarrier() { diff --git a/port/port_android.h b/port/port_android.h index 13df9c943..d68b6c035 100644 --- a/port/port_android.h +++ b/port/port_android.h @@ -125,11 +125,17 @@ inline bool Snappy_Compress( return false; } +// TODO(gabor): Implement uncompress +inline bool Snappy_GetUncompressedLength(const char* input, size_t length, + size_t* result) { + return false; +} + // TODO(gabor): Implement uncompress inline bool Snappy_Uncompress( const char* input_data, size_t input_length, - std::string* output) { + char* output) { return false; } diff --git a/port/port_chromium.cc b/port/port_chromium.cc index 2ab49b9fd..7f6de92a2 100644 --- a/port/port_chromium.cc +++ b/port/port_chromium.cc @@ -62,15 +62,19 @@ bool Snappy_Compress(const char* input, size_t input_length, #endif } -bool Snappy_Uncompress(const char* input_data, size_t input_length, - std::string* output) { +bool Snappy_GetUncompressedLength(const char* input, size_t length, + size_t* result) { #if defined(USE_SNAPPY) - size_t ulength; - if (!snappy::GetUncompressedLength(input_data, input_length, &ulength)) { - return false; - } - output->resize(ulength); - return snappy::RawUncompress(input_data, input_length, &(*output)[0]); + return snappy::GetUncompressedLength(input_data, input_length, result); +#else + return false; +#endif +} + +bool Snappy_Uncompress(const char* input_data, size_t input_length, + char* output) { +#if defined(USE_SNAPPY) + return snappy::RawUncompress(input_data, input_length, output); #else return false; #endif diff --git a/port/port_chromium.h b/port/port_chromium.h index 1851e6ec1..feecd5b99 100644 --- a/port/port_chromium.h +++ b/port/port_chromium.h @@ -84,8 +84,10 @@ class AtomicPointer { bool Snappy_Compress(const char* input, size_t input_length, std::string* output); +bool Snappy_GetUncompressedLength(const char* input, size_t length, + size_t* result); bool Snappy_Uncompress(const char* input_data, size_t input_length, - std::string* output); + char* output); inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { return false; diff --git a/port/port_example.h b/port/port_example.h index 8a624f346..6bd9b49eb 100644 --- a/port/port_example.h +++ b/port/port_example.h @@ -96,11 +96,21 @@ class AtomicPointer { extern bool Snappy_Compress(const char* input, size_t input_length, std::string* output); +// If input[0,input_length-1] looks like a valid snappy compressed +// buffer, store the size of the uncompressed data in *result and +// return true. Else return false. +extern bool Snappy_GetUncompressedLength(const char* input, size_t length, + size_t* result); + // Attempt to snappy uncompress input[0,input_length-1] into *output. // Returns true if successful, false if the input is invalid lightweight // compressed data. +// +// REQUIRES: at least the first "n" bytes of output[] must be writable +// where "n" is the result of a successful call to +// Snappy_GetUncompressedLength. extern bool Snappy_Uncompress(const char* input_data, size_t input_length, - std::string* output); + char* output); // ------------------ Miscellaneous ------------------- diff --git a/port/port_posix.h b/port/port_posix.h index 2995026f5..ef01de3e1 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -80,12 +80,12 @@ class CondVar { Mutex* mu_; }; -inline bool Snappy_Compress(const char* input, size_t input_length, +inline bool Snappy_Compress(const char* input, size_t length, ::std::string* output) { #ifdef SNAPPY - output->resize(snappy::MaxCompressedLength(input_length)); + output->resize(snappy::MaxCompressedLength(length)); size_t outlen; - snappy::RawCompress(input, input_length, &(*output)[0], &outlen); + snappy::RawCompress(input, length, &(*output)[0], &outlen); output->resize(outlen); return true; #endif @@ -93,18 +93,22 @@ inline bool Snappy_Compress(const char* input, size_t input_length, return false; } -inline bool Snappy_Uncompress(const char* input_data, size_t input_length, - ::std::string* output) { +inline bool Snappy_GetUncompressedLength(const char* input, size_t length, + size_t* result) { #ifdef SNAPPY - size_t ulength; - if (!snappy::GetUncompressedLength(input_data, input_length, &ulength)) { - return false; - } - output->resize(ulength); - return snappy::RawUncompress(input_data, input_length, &(*output)[0]); -#endif - + return snappy::GetUncompressedLength(input, length, result); +#else return false; +#endif +} + +inline bool Snappy_Uncompress(const char* input, size_t length, + char* output) { +#ifdef SNAPPY + return snappy::RawUncompress(input, length, output); +#else + return false; +#endif } inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { diff --git a/table/format.cc b/table/format.cc index 63971dbe9..ba7838c23 100644 --- a/table/format.cc +++ b/table/format.cc @@ -107,16 +107,20 @@ Status ReadBlock(RandomAccessFile* file, // Ok break; case kSnappyCompression: { - std::string decompressed; - if (!port::Snappy_Uncompress(data, n, &decompressed)) { + size_t ulength = 0; + if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) { delete[] buf; - s = Status::Corruption("corrupted compressed block contents"); - return s; + return Status::Corruption("corrupted compressed block contents"); } - delete[] buf; // Done with uncompressed data - buf = new char[decompressed.size()]; - memcpy(buf, decompressed.data(), decompressed.size()); - n = decompressed.size(); + char* ubuf = new char[ulength]; + if (!port::Snappy_Uncompress(data, n, ubuf)) { + delete[] buf; + delete[] ubuf; + return Status::Corruption("corrupted compressed block contents"); + } + delete[] buf; + buf = ubuf; + n = ulength; break; } default: diff --git a/util/env.cc b/util/env.cc index e5297e794..79e493e99 100644 --- a/util/env.cc +++ b/util/env.cc @@ -18,14 +18,19 @@ RandomAccessFile::~RandomAccessFile() { WritableFile::~WritableFile() { } +Logger::~Logger() { +} + FileLock::~FileLock() { } -void Log(Env* env, WritableFile* info_log, const char* format, ...) { - va_list ap; - va_start(ap, format); - env->Logv(info_log, format, ap); - va_end(ap); +void Log(Logger* info_log, const char* format, ...) { + if (info_log != NULL) { + va_list ap; + va_start(ap, format); + info_log->Logv(format, ap); + va_end(ap); + } } Status WriteStringToFile(Env* env, const Slice& data, diff --git a/util/env_chromium.cc b/util/env_chromium.cc index 1af525a5f..975386baa 100644 --- a/util/env_chromium.cc +++ b/util/env_chromium.cc @@ -23,6 +23,7 @@ #include "leveldb/slice.h" #include "port/port.h" #include "util/logging.h" +#include "util/posix_logger.h" #if defined(OS_WIN) #include @@ -406,9 +407,8 @@ class ChromiumEnv : public Env { return Status::OK(); } - virtual void Logv(WritableFile* info_log, const char* format, va_list ap) { - // TODO(jorlow): We may want to just use Chromium's built in logging. - + // TODO(user,user): Use Chromium's built-in logging? + static uint64_t gettid() { uint64_t thread_id = 0; // Coppied from base/logging.cc. #if defined(OS_WIN) @@ -422,65 +422,17 @@ class ChromiumEnv : public Env { pthread_t tid = pthread_self(); memcpy(&thread_id, &tid, min(sizeof(r), sizeof(tid))); #endif + return thread_id; + } - // We try twice: the first time with a fixed-size stack allocated buffer, - // and the second time with a much larger dynamically allocated buffer. - char buffer[500]; - for (int iter = 0; iter < 2; iter++) { - char* base; - int bufsize; - if (iter == 0) { - bufsize = sizeof(buffer); - base = buffer; - } else { - bufsize = 30000; - base = new char[bufsize]; - } - char* p = base; - char* limit = base + bufsize; - - ::base::Time::Exploded t; - ::base::Time::Now().LocalExplode(&t); - p += snprintf(p, limit - p, - "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", - t.year, - t.month, - t.day_of_month, - t.hour, - t.minute, - t.second, - static_cast(t.millisecond) * 1000, - static_cast(thread_id)); - - // Print the message - if (p < limit) { - va_list backup_ap; - va_copy(backup_ap, ap); - p += vsnprintf(p, limit - p, format, backup_ap); - va_end(backup_ap); - } - - // Truncate to available space if necessary - if (p >= limit) { - if (iter == 0) { - continue; // Try again with larger buffer - } else { - p = limit - 1; - } - } - - // Add newline if necessary - if (p == base || p[-1] != '\n') { - *p++ = '\n'; - } - - assert(p <= limit); - info_log->Append(Slice(base, p - base)); - info_log->Flush(); - if (base != buffer) { - delete[] base; - } - break; + virtual Status NewLogger(const std::string& fname, Logger** result) { + FILE* f = fopen(fname.c_str(), "w"); + if (f == NULL) { + *result = NULL; + return Status::IOError(fname, strerror(errno)); + } else { + *result = new PosixLogger(f, &ChromiumEnv::gettid); + return Status::OK(); } } diff --git a/util/env_posix.cc b/util/env_posix.cc index 46723e25f..5127c8932 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -23,6 +23,7 @@ #include "leveldb/slice.h" #include "port/port.h" #include "util/logging.h" +#include "util/posix_logger.h" namespace leveldb { @@ -427,72 +428,21 @@ class PosixEnv : public Env { return Status::OK(); } - virtual void Logv(WritableFile* info_log, const char* format, va_list ap) { + static uint64_t gettid() { pthread_t tid = pthread_self(); uint64_t thread_id = 0; memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); + return thread_id; + } - // We try twice: the first time with a fixed-size stack allocated buffer, - // and the second time with a much larger dynamically allocated buffer. - char buffer[500]; - for (int iter = 0; iter < 2; iter++) { - char* base; - int bufsize; - if (iter == 0) { - bufsize = sizeof(buffer); - base = buffer; - } else { - bufsize = 30000; - base = new char[bufsize]; - } - char* p = base; - char* limit = base + bufsize; - - struct timeval now_tv; - gettimeofday(&now_tv, NULL); - const time_t seconds = now_tv.tv_sec; - struct tm t; - localtime_r(&seconds, &t); - p += snprintf(p, limit - p, - "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", - t.tm_year + 1900, - t.tm_mon + 1, - t.tm_mday, - t.tm_hour, - t.tm_min, - t.tm_sec, - static_cast(now_tv.tv_usec), - static_cast(thread_id)); - - // Print the message - if (p < limit) { - va_list backup_ap; - va_copy(backup_ap, ap); - p += vsnprintf(p, limit - p, format, backup_ap); - va_end(backup_ap); - } - - // Truncate to available space if necessary - if (p >= limit) { - if (iter == 0) { - continue; // Try again with larger buffer - } else { - p = limit - 1; - } - } - - // Add newline if necessary - if (p == base || p[-1] != '\n') { - *p++ = '\n'; - } - - assert(p <= limit); - info_log->Append(Slice(base, p - base)); - info_log->Flush(); - if (base != buffer) { - delete[] base; - } - break; + virtual Status NewLogger(const std::string& fname, Logger** result) { + FILE* f = fopen(fname.c_str(), "w"); + if (f == NULL) { + *result = NULL; + return IOError(fname, errno); + } else { + *result = new PosixLogger(f, &PosixEnv::gettid); + return Status::OK(); } } diff --git a/util/posix_logger.h b/util/posix_logger.h new file mode 100644 index 000000000..0dbdeaa3a --- /dev/null +++ b/util/posix_logger.h @@ -0,0 +1,97 @@ +// Copyright 2011 Google Inc. All Rights Reserved. +// Author: sanjay@google.com (Sanjay Ghemawat) +// +// Logger implementation that can be shared by all environments +// where enough posix functionality is available. + +#ifndef STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_ +#define STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_ + +#include +#include +#include +#include +#include "leveldb/env.h" + +namespace leveldb { + +class PosixLogger : public Logger { + private: + FILE* file_; + uint64_t (*gettid_)(); // Return the thread id for the current thread + public: + PosixLogger(FILE* f, uint64_t (*gettid)()) : file_(f), gettid_(gettid) { } + virtual ~PosixLogger() { + fclose(file_); + } + virtual void Logv(const char* format, va_list ap) { + const uint64_t thread_id = (*gettid_)(); + + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + for (int iter = 0; iter < 2; iter++) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 30000; + base = new char[bufsize]; + } + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, NULL); + const time_t seconds = now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + p += snprintf(p, limit - p, + "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec, + static_cast(now_tv.tv_usec), + static_cast(thread_id)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + fwrite(base, 1, p - base, file_); + fflush(file_); + if (base != buffer) { + delete[] base; + } + break; + } + } +}; + +} + +#endif // STORAGE_LEVELDB_UTIL_POSIX_LOGGER_H_