提交 dd63f04c 编辑于 作者: Yanqin Jin's avatar Yanqin Jin 提交者: Andrew Kryczka
浏览文件

First step towards handling MANIFEST write error (#6949)

Summary:
This PR provides preliminary support for handling IO error during MANIFEST write.
File write/sync is not guaranteed to be atomic. If we encounter an IOError while writing/syncing to the MANIFEST file, we cannot be sure about the state of the MANIFEST file. The version edits may or may not have reached the file. During cleanup, if we delete the newly-generated SST files referenced by the pending version edit(s), but the version edit(s) actually are persistent in the MANIFEST, then next recovery attempt will process the version edits(s) and then fail since the SST files have already been deleted.
One approach is to truncate the MANIFEST after write/sync error, so that it is safe to delete the SST files. However, file truncation may not be supported on certain file systems. Therefore, we take the following approach.
If an IOError is detected during MANIFEST write/sync, we disable file deletions for the faulty database. Depending on whether the IOError is retryable (set by underlying file system), either RocksDB or application can call `DB::Resume()`, or simply shutdown and restart. During `Resume()`, RocksDB will try to switch to a new MANIFEST and write all existing in-memory version storage in the new file. If this succeeds, then RocksDB may proceed. If all recovery is completed, then file deletions will be re-enabled.
Note that multiple threads can call `LogAndApply()` at the same time, though only one of them will be going through the process MANIFEST write, possibly batching the version edits of other threads. When the leading MANIFEST writer finishes, all of the MANIFEST writing threads in this batch will have the same IOError. They will all call `ErrorHandler::SetBGError()` in which file deletion will be disabled.

Possible future directions:
- Add an `ErrorContext` structure so that it is easier to pass more info to `ErrorHandler`. Currently, as in this example, a new `BackgroundErrorReason` has to be added.

Test plan (dev server):
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6949

Reviewed By: anand1976

Differential Revision: D22026020

Pulled By: riversand963

fbshipit-source-id: f3c68a2ef45d9b505d0d625c7c5e0c88495b91c8
上级 115c9113
......@@ -2,6 +2,7 @@
## 6.11.3 (7/9/2020)
### Bug Fixes
* Fix a bug when index_type == kTwoLevelIndexSearch in PartitionedIndexBuilder to update FlushPolicy to point to internal key partitioner when it changes from user-key mode to internal-key mode in index partition.
* Disable file deletion after MANIFEST write/sync failure until db re-open or Resume() so that subsequent re-open will not see MANIFEST referencing deleted SSTs.
## 6.11.1 (6/23/2020)
### Bug Fixes
......
......@@ -721,7 +721,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
versions_->SetIOStatusOK();
versions_->SetIOStatus(IOStatus::OK());
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
}
......
......@@ -3023,6 +3023,33 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
Close();
}
TEST_F(DBBasicTest, ManifestWriteFailure) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.env = env_;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto* s = reinterpret_cast<Status*>(arg);
ASSERT_OK(*s);
// Manually overwrite return status
*s = Status::IOError();
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key", "value"));
ASSERT_NOK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
......
......@@ -23,57 +23,6 @@
namespace ROCKSDB_NAMESPACE {
Status DBImpl::DisableFileDeletions() {
InstrumentedMutexLock l(&mutex_);
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Disabled, but already disabled. Counter: %d",
disable_delete_obsolete_files_);
}
return Status::OK();
}
Status DBImpl::EnableFileDeletions(bool force) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool file_deletion_enabled = false;
{
InstrumentedMutexLock l(&mutex_);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
} else if (disable_delete_obsolete_files_ > 0) {
--disable_delete_obsolete_files_;
}
if (disable_delete_obsolete_files_ == 0) {
file_deletion_enabled = true;
FindObsoleteFiles(&job_context, true);
bg_cv_.SignalAll();
}
}
if (file_deletion_enabled) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
disable_delete_obsolete_files_);
}
job_context.Clean();
LogFlush(immutable_db_options_.info_log);
return Status::OK();
}
int DBImpl::IsFileDeletionsEnabled() const {
return !disable_delete_obsolete_files_;
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {
......
......@@ -312,8 +312,36 @@ Status DBImpl::ResumeImpl() {
}
// Make sure the IO Status stored in version set is set to OK.
bool file_deletion_disabled = !IsFileDeletionsEnabled();
if (s.ok()) {
versions_->SetIOStatusOK();
IOStatus io_s = versions_->io_status();
if (io_s.IsIOError()) {
// If resuming from IOError resulted from MANIFEST write, then assert
// that we must have already set the MANIFEST writer to nullptr during
// clean-up phase MANIFEST writing. We must have also disabled file
// deletions.
assert(!versions_->descriptor_log_);
assert(file_deletion_disabled);
// Since we are trying to recover from MANIFEST write error, we need to
// switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
// Therefore, force writing a dummy version edit because we do not know
// whether there are flush jobs with non-empty data to flush, triggering
// appends to MANIFEST.
VersionEdit edit;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(default_cf_handle_);
assert(cfh);
ColumnFamilyData* cfd = cfh->cfd();
const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
directories_.GetDbDir());
if (!s.ok()) {
io_s = versions_->io_status();
if (!io_s.ok()) {
s = error_handler_.SetBGError(io_s,
BackgroundErrorReason::kManifestWrite);
}
}
}
}
// We cannot guarantee consistency of the WAL. So force flush Memtables of
......@@ -364,6 +392,13 @@ Status DBImpl::ResumeImpl() {
job_context.Clean();
if (s.ok()) {
assert(versions_->io_status().ok());
// If we reach here, we should re-enable file deletions if it was disabled
// during previous error handling.
if (file_deletion_disabled) {
// Always return ok
EnableFileDeletions(/*force=*/true);
}
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
}
mutex_.Lock();
......@@ -4389,6 +4424,14 @@ Status DBImpl::IngestExternalFiles(
#endif // !NDEBUG
}
}
} else if (versions_->io_status().IsIOError()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
const IOStatus& io_s = versions_->io_status();
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
}
// Resume writes to the DB
......
......@@ -356,6 +356,12 @@ class DBImpl : public DB {
virtual Status Close() override;
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual bool IsFileDeletionsEnabled() const;
Status GetStatsHistory(
uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
......@@ -363,9 +369,6 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE
using DB::ResetStats;
virtual Status ResetStats() override;
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual int IsFileDeletionsEnabled() const;
// All the returned filenames start with "/"
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
......@@ -1780,6 +1783,8 @@ class DBImpl : public DB {
SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
bool* is_blob_index);
Status DisableFileDeletionsWithLock();
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
......
......@@ -210,7 +210,15 @@ Status DBImpl::FlushMemTableToOutputFile(
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
!io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kFlush
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
......@@ -574,7 +582,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// it is not because of CF drop.
if (!s.ok() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kFlush
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
......@@ -2687,7 +2703,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
versions_->SetIOStatusOK();
versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
......@@ -2745,7 +2761,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
}
versions_->SetIOStatusOK();
versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
......@@ -2877,7 +2893,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kCompaction
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
......
......@@ -36,6 +36,62 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
return std::numeric_limits<uint64_t>::max();
}
Status DBImpl::DisableFileDeletions() {
InstrumentedMutexLock l(&mutex_);
return DisableFileDeletionsWithLock();
}
Status DBImpl::DisableFileDeletionsWithLock() {
mutex_.AssertHeld();
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Disabled, but already disabled. Counter: %d",
disable_delete_obsolete_files_);
}
return Status::OK();
}
Status DBImpl::EnableFileDeletions(bool force) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool file_deletion_enabled = false;
{
InstrumentedMutexLock l(&mutex_);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
} else if (disable_delete_obsolete_files_ > 0) {
--disable_delete_obsolete_files_;
}
if (disable_delete_obsolete_files_ == 0) {
file_deletion_enabled = true;
FindObsoleteFiles(&job_context, true);
bg_cv_.SignalAll();
}
}
if (file_deletion_enabled) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
disable_delete_obsolete_files_);
}
job_context.Clean();
LogFlush(immutable_db_options_.info_log);
return Status::OK();
}
bool DBImpl::IsFileDeletionsEnabled() const {
return 0 == disable_delete_obsolete_files_;
}
// * Returns the list of live files in 'sst_live' and 'blob_live'.
// If it's doing full scan:
// * Returns the list of all files in the filesystem in
......
......@@ -2990,10 +2990,11 @@ class ModelDB : public DB {
Status SyncWAL() override { return Status::OK(); }
#ifndef ROCKSDB_LITE
Status DisableFileDeletions() override { return Status::OK(); }
Status EnableFileDeletions(bool /*force*/) override { return Status::OK(); }
#ifndef ROCKSDB_LITE
Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/,
bool /*flush_memtable*/ = true) override {
return Status::OK();
......
......@@ -51,9 +51,19 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kHardError},
// Errors during MANIFEST write
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, Status::SubCode::kNoSpace,
true),
Status::Severity::kHardError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kHardError},
};
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity>
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
Status::Severity>
DefaultErrorSeverityMap = {
// Errors during BG compaction
{std::make_tuple(BackgroundErrorReason::kCompaction,
......@@ -75,11 +85,11 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kCorruption, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kIOError, true),
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kIOError, false),
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
false),
Status::Severity::kNoError},
// Errors during Write
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
......@@ -94,6 +104,12 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, false),
Status::Severity::kFatalError},
};
std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity>
......@@ -247,6 +263,10 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
if (recovery_in_prog_ && recovery_error_.ok()) {
recovery_error_ = bg_io_err;
}
if (BackgroundErrorReason::kManifestWrite == reason) {
// Always returns ok
db_->DisableFileDeletionsWithLock();
}
Status new_bg_io_err = bg_io_err;
Status s;
if (bg_io_err.GetDataLoss()) {
......
......@@ -798,7 +798,7 @@ bool InternalStats::HandleCurrentSuperVersionNumber(uint64_t* value,
bool InternalStats::HandleIsFileDeletionsEnabled(uint64_t* value, DBImpl* db,
Version* /*version*/) {
*value = db->IsFileDeletionsEnabled();
*value = db->IsFileDeletionsEnabled() ? 1 : 0;
return true;
}
......
......@@ -470,7 +470,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
}
// this can release and reacquire the mutex.
vset->SetIOStatusOK();
vset->SetIOStatus(IOStatus::OK());
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
*io_s = vset->io_status();
......
......@@ -4001,12 +4001,16 @@ Status VersionSet::ProcessManifestWrites(
}
if (s.ok()) {
io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
}
}
......@@ -4018,6 +4022,8 @@ Status VersionSet::ProcessManifestWrites(
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
}
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
}
......
......@@ -1162,7 +1162,7 @@ class VersionSet {
IOStatus io_status() const { return io_status_; }
// Set the IO Status to OK. Called before Manifest write if needed.
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
void SetIOStatus(const IOStatus& s) { io_status_ = s; }
protected:
using VersionBuilderMap =
......
......@@ -1266,8 +1266,6 @@ class DB {
// updated, false if user attempted to call if with seqnum <= current value.
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0;
#ifndef ROCKSDB_LITE
// Prevent file deletions. Compactions will continue to occur,
// but no obsolete files will be deleted. Calling this multiple
// times have the same effect as calling it once.
......@@ -1284,6 +1282,7 @@ class DB {
// threads call EnableFileDeletions()
virtual Status EnableFileDeletions(bool force = true) = 0;
#ifndef ROCKSDB_LITE
// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup
// Retrieve the list of all files in the database. The files are
......
......@@ -117,6 +117,7 @@ enum class BackgroundErrorReason {
kCompaction,
kWriteCallback,
kMemTable,
kManifestWrite,
};
enum class WriteStallCondition {
......
支持 Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册