提交 61cc9ef7 编辑于 作者: Yanqin Jin's avatar Yanqin Jin
浏览文件

Fail recovery when MANIFEST record checksum mismatch (#6996)

Summary:
https://github.com/facebook/rocksdb/issues/5411 refactored `VersionSet::Recover` but introduced a bug, explained as follows.
Before, once a checksum mismatch happens, `reporter` will set `s` to be non-ok. Therefore, Recover will stop processing the MANIFEST any further.
```
// Correct
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
while (reader.ReadRecord() && s.ok()) {
...
}
```
The bug is that, the local variable `s` in `ReadAndRecover` won't be updated by `reporter` while reading the MANIFEST. It is possible that the reader sees a checksum mismatch in a record, but `ReadRecord` retries internally read and finds the next valid record. The mismatched record will be ignored and no error is reported.
```
// Incorrect
// Inside Recover
LogReporter reporter;
reporter.status = &s;
log::Reader reader(..., reporter);
s = ReadAndRecover(reader, ...);

// Inside ReadAndRecover
  Status s;  // Shadows the s in Recover.
  while (reader.ReadRecord() && s.ok()) {
   ...
  }
```
`LogReporter` can use a separate `log_read_status` to track the errors while reading the MANIFEST. RocksDB can process more MANIFEST entries only if `log_read_status.ok()`.
Test plan (devserver):
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6996

Reviewed By: ajkr

Differential Revision: D22105746

Pulled By: riversand963

fbshipit-source-id: b22f717a423457a41ca152a242abbb64cf91fc38
上级 f5d4dbbe
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s). * Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early. * In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
* Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too. * Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too.
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
### Public API Change ### Public API Change
* Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request. * Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request.
......
...@@ -2268,6 +2268,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) { ...@@ -2268,6 +2268,33 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
TEST_F(DBBasicTest, ManifestChecksumMismatch) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("bar", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", [&](void* arg) {
auto* crc = reinterpret_cast<uint32_t*>(arg);
*crc = *crc + 1;
});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions write_opts;
write_opts.disableWAL = true;
Status s = db_->Put(write_opts, "foo", "value");
ASSERT_OK(s);
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(Put("foo", "value1"));
ASSERT_OK(Flush());
s = TryReopen(options);
ASSERT_TRUE(s.IsCorruption());
}
class DBBasicTestMultiGet : public DBTestBase { class DBBasicTestMultiGet : public DBTestBase {
public: public:
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache, DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
......
...@@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { ...@@ -147,6 +147,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
// Compute the crc of the record type and the payload. // Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, ptr, n); crc = crc32c::Extend(crc, ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage crc = crc32c::Mask(crc); // Adjust for storage
TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
&crc);
EncodeFixed32(buf, crc); EncodeFixed32(buf, crc);
// Write the header and the payload // Write the header and the payload
......
...@@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler( ...@@ -27,12 +27,17 @@ VersionEditHandler::VersionEditHandler(
assert(version_set_ != nullptr); assert(version_set_ != nullptr);
} }
Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id) {
Slice record; Slice record;
std::string scratch; std::string scratch;
assert(log_read_status);
assert(log_read_status->ok());
size_t recovered_edits = 0; size_t recovered_edits = 0;
Status s = Initialize(); Status s = Initialize();
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit; VersionEdit edit;
s = edit.DecodeFrom(record); s = edit.DecodeFrom(record);
if (!s.ok()) { if (!s.ok()) {
...@@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) { ...@@ -70,13 +75,15 @@ Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
} }
} }
} }
if (!log_read_status->ok()) {
s = *log_read_status;
}
CheckIterationResult(reader, &s); CheckIterationResult(reader, &s);
if (!s.ok()) { if (!s.ok()) {
status_ = s; status_ = s;
} }
return s;
} }
Status VersionEditHandler::Initialize() { Status VersionEditHandler::Initialize() {
......
...@@ -40,7 +40,8 @@ class VersionEditHandler { ...@@ -40,7 +40,8 @@ class VersionEditHandler {
virtual ~VersionEditHandler() {} virtual ~VersionEditHandler() {}
Status Iterate(log::Reader& reader, std::string* db_id); void Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id);
const Status& status() const { return status_; } const Status& status() const { return status_; }
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "db/version_set.h" #include "db/version_set.h"
#include <stdio.h> #include <stdio.h>
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <cinttypes> #include <cinttypes>
...@@ -19,6 +20,7 @@ ...@@ -19,6 +20,7 @@
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "compaction/compaction.h" #include "compaction/compaction.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/log_reader.h" #include "db/log_reader.h"
...@@ -50,6 +52,7 @@ ...@@ -50,6 +52,7 @@
#include "table/table_reader.h" #include "table/table_reader.h"
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
...@@ -4444,24 +4447,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname, ...@@ -4444,24 +4447,26 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
if (dbname.back() != '/') { if (dbname.back() != '/') {
manifest_path->push_back('/'); manifest_path->push_back('/');
} }
*manifest_path += fname; manifest_path->append(fname);
return Status::OK(); return Status::OK();
} }
Status VersionSet::ReadAndRecover( Status VersionSet::ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer, log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options, const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found, std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders, builders,
VersionEditParams* version_edit_params, std::string* db_id) { Status* log_read_status, VersionEditParams* version_edit_params,
assert(reader != nullptr); std::string* db_id) {
assert(read_buffer != nullptr); assert(read_buffer != nullptr);
assert(log_read_status != nullptr);
Status s; Status s;
Slice record; Slice record;
std::string scratch; std::string scratch;
size_t recovered_edits = 0; size_t recovered_edits = 0;
while (reader->ReadRecord(&record, &scratch) && s.ok()) { while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
VersionEdit edit; VersionEdit edit;
s = edit.DecodeFrom(record); s = edit.DecodeFrom(record);
if (!s.ok()) { if (!s.ok()) {
...@@ -4505,6 +4510,9 @@ Status VersionSet::ReadAndRecover( ...@@ -4505,6 +4510,9 @@ Status VersionSet::ReadAndRecover(
} }
} }
} }
if (!log_read_status->ok()) {
s = *log_read_status;
}
if (!s.ok()) { if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit. // Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear(); read_buffer->Clear();
...@@ -4551,8 +4559,7 @@ Status VersionSet::Recover( ...@@ -4551,8 +4559,7 @@ Status VersionSet::Recover(
db_options_->log_readahead_size)); db_options_->log_readahead_size));
} }
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> VersionBuilderMap builders;
builders;
// add default column family // add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
...@@ -4574,12 +4581,13 @@ Status VersionSet::Recover( ...@@ -4574,12 +4581,13 @@ Status VersionSet::Recover(
VersionEditParams version_edit_params; VersionEditParams version_edit_params;
{ {
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */); true /* checksum */, 0 /* log_number */);
AtomicGroupReadBuffer read_buffer; AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options, s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders, column_families_not_found, builders, &log_read_status,
&version_edit_params, db_id); &version_edit_params, db_id);
current_manifest_file_size = reader.GetReadOffset(); current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0); assert(current_manifest_file_size != 0);
...@@ -4845,21 +4853,20 @@ Status VersionSet::TryRecoverFromOneManifest( ...@@ -4845,21 +4853,20 @@ Status VersionSet::TryRecoverFromOneManifest(
db_options_->log_readahead_size)); db_options_->log_readahead_size));
} }
assert(s.ok());
VersionSet::LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0); /*checksum=*/true, /*log_num=*/0);
{ VersionEditHandlerPointInTime handler_pit(read_only, column_families,
VersionEditHandlerPointInTime handler_pit(read_only, column_families, const_cast<VersionSet*>(this));
const_cast<VersionSet*>(this));
s = handler_pit.Iterate(reader, db_id); handler_pit.Iterate(reader, &s, db_id);
assert(nullptr != has_missing_table_file); assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles(); *has_missing_table_file = handler_pit.HasMissingFiles();
}
return s; return handler_pit.status();
} }
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families, Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
...@@ -5980,8 +5987,7 @@ Status ReactiveVersionSet::Recover( ...@@ -5980,8 +5987,7 @@ Status ReactiveVersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be // In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier. // initialized earlier.
default_cfd->set_initialized(); default_cfd->set_initialized();
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> VersionBuilderMap builders;
builders;
std::unordered_map<int, std::string> column_families_not_found; std::unordered_map<int, std::string> column_families_not_found;
builders.insert( builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>( std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
...@@ -5989,7 +5995,7 @@ Status ReactiveVersionSet::Recover( ...@@ -5989,7 +5995,7 @@ Status ReactiveVersionSet::Recover(
manifest_reader_status->reset(new Status()); manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter()); manifest_reporter->reset(new LogReporter());
static_cast<LogReporter*>(manifest_reporter->get())->status = static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
manifest_reader_status->get(); manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get(); log::Reader* reader = manifest_reader->get();
...@@ -5998,10 +6004,9 @@ Status ReactiveVersionSet::Recover( ...@@ -5998,10 +6004,9 @@ Status ReactiveVersionSet::Recover(
VersionEdit version_edit; VersionEdit version_edit;
while (s.ok() && retry < 1) { while (s.ok() && retry < 1) {
assert(reader != nullptr); assert(reader != nullptr);
Slice record; s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
std::string scratch; column_families_not_found, builders,
s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options, manifest_reader_status->get(), &version_edit);
column_families_not_found, builders, &version_edit);
if (s.ok()) { if (s.ok()) {
bool enough = version_edit.has_next_file_number_ && bool enough = version_edit.has_next_file_number_ &&
version_edit.has_log_number_ && version_edit.has_log_number_ &&
......
...@@ -1165,6 +1165,10 @@ class VersionSet { ...@@ -1165,6 +1165,10 @@ class VersionSet {
void SetIOStatusOK() { io_status_ = IOStatus::OK(); } void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
protected: protected:
using VersionBuilderMap =
std::unordered_map<uint32_t,
std::unique_ptr<BaseReferencedVersionBuilder>>;
struct ManifestWriter; struct ManifestWriter;
friend class Version; friend class Version;
...@@ -1176,7 +1180,9 @@ class VersionSet { ...@@ -1176,7 +1180,9 @@ class VersionSet {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Status* status; Status* status;
virtual void Corruption(size_t /*bytes*/, const Status& s) override { virtual void Corruption(size_t /*bytes*/, const Status& s) override {
if (this->status->ok()) *this->status = s; if (status->ok()) {
*status = s;
}
} }
}; };
...@@ -1207,13 +1213,14 @@ class VersionSet { ...@@ -1207,13 +1213,14 @@ class VersionSet {
const VersionEdit* edit); const VersionEdit* edit);
Status ReadAndRecover( Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer, log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& const std::unordered_map<std::string, ColumnFamilyOptions>&
name_to_options, name_to_options,
std::unordered_map<int, std::string>& column_families_not_found, std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map< std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders, uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
VersionEditParams* version_edit, std::string* db_id = nullptr); Status* log_read_status, VersionEditParams* version_edit,
std::string* db_id = nullptr);
// REQUIRES db mutex // REQUIRES db mutex
Status ApplyOneVersionEditToBuilder( Status ApplyOneVersionEditToBuilder(
...@@ -1342,8 +1349,7 @@ class ReactiveVersionSet : public VersionSet { ...@@ -1342,8 +1349,7 @@ class ReactiveVersionSet : public VersionSet {
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader); std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
private: private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> VersionBuilderMap active_version_builders_;
active_version_builders_;
AtomicGroupReadBuffer read_buffer_; AtomicGroupReadBuffer read_buffer_;
// Number of version edits to skip by ReadAndApply at the beginning of a new // Number of version edits to skip by ReadAndApply at the beginning of a new
// MANIFEST created by primary. // MANIFEST created by primary.
......
支持 Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册