提交 3514cf51 编辑于 作者: Yanqin Jin's avatar Yanqin Jin 提交者: Andrew Kryczka
浏览文件

Fix data race to VersionSet::io_status_ (#7034)

Summary:
After https://github.com/facebook/rocksdb/issues/6949 , VersionSet::io_status_ can be concurrently accessed by multiple
threads without lock, causing tsan test to fail. For example, a bg flush thread
resets io_status_ before calling LogAndApply(), while another thread already in
the process of LogAndApply() reads io_status_. This is a bug.

We do not have to reset io_status_ each time we call LogAndApply(). io_status_
is part of the state of VersionSet, and it indicates the outcome of preceding
MANIFEST/CURRENT files IO operations. Its value should be updated only when:

1. MANIFEST/CURRENT files IO fail for the first time.
2. MANIFEST/CURRENT files IO succeed as part of recovering from a prior
   failure without process restart, e.g. calling Resume().

Test Plan (devserver):
COMPILE_WITH_TSAN=1 make check
COMPILE_WITH_TSAN=1 make db_test2
./db_test2 --gtest_filter=DBTest2.CompactionStall
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7034

Reviewed By: zhichao-cao

Differential Revision: D22247137

Pulled By: riversand963

fbshipit-source-id: 77b83e05390f3ee3cd2d96d3fdd6fe4f225e3216
上级 dd63f04c
......@@ -721,7 +721,6 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
versions_->SetIOStatus(IOStatus::OK());
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
}
......
......@@ -2703,7 +2703,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
......@@ -2761,7 +2760,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
}
versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
......
......@@ -470,7 +470,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
}
// this can release and reacquire the mutex.
vset->SetIOStatus(IOStatus::OK());
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
*io_s = vset->io_status();
......
......@@ -3878,10 +3878,6 @@ Status VersionSet::ProcessManifestWrites(
}
#endif // NDEBUG
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
......@@ -3911,6 +3907,9 @@ Status VersionSet::ProcessManifestWrites(
}
}
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
{
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
mu->Unlock();
......@@ -3947,9 +3946,9 @@ Status VersionSet::ProcessManifestWrites(
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
std::unique_ptr<FSWritableFile> descriptor_file;
s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
opt_file_opts);
if (s.ok()) {
io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
opt_file_opts);
if (io_s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);
......@@ -3958,7 +3957,10 @@ Status VersionSet::ProcessManifestWrites(
nullptr, db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
io_s);
} else {
s = io_s;
}
}
......@@ -3994,7 +3996,6 @@ Status VersionSet::ProcessManifestWrites(
#endif /* !NDEBUG */
io_s = descriptor_log_->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
break;
}
......@@ -4005,12 +4006,9 @@ Status VersionSet::ProcessManifestWrites(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
}
}
......@@ -4020,10 +4018,7 @@ Status VersionSet::ProcessManifestWrites(
io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
db_directory);
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
}
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
}
......@@ -4044,6 +4039,14 @@ Status VersionSet::ProcessManifestWrites(
mu->Lock();
}
if (!io_s.ok()) {
if (io_status_.ok()) {
io_status_ = io_s;
}
} else if (!io_status_.ok()) {
io_status_ = io_s;
}
// Append the old manifest file to the obsolete_manifest_ list to be deleted
// by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) {
......@@ -5297,7 +5300,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
Status VersionSet::WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log) {
log::Writer* log, IOStatus& io_s) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
// WARNING: This method doesn't hold a mutex!!
......@@ -5306,6 +5309,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate.
assert(io_s.ok());
if (db_options_->write_dbid_to_manifest) {
VersionEdit edit_for_db_id;
assert(!db_id_.empty());
......@@ -5315,10 +5319,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption("Unable to Encode VersionEdit:" +
edit_for_db_id.DebugString(true));
}
IOStatus io_s = log->AddRecord(db_id_record);
io_s = log->AddRecord(db_id_record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}
......@@ -5345,10 +5348,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
IOStatus io_s = log->AddRecord(record);
io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}
......@@ -5398,10 +5400,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
IOStatus io_s = log->AddRecord(record);
io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
return io_s;
}
}
}
......
......@@ -1159,10 +1159,7 @@ class VersionSet {
static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
// Get the IO Status returned by written Manifest.
IOStatus io_status() const { return io_status_; }
// Set the IO Status to OK. Called before Manifest write if needed.
void SetIOStatus(const IOStatus& s) { io_status_ = s; }
const IOStatus& io_status() const { return io_status_; }
protected:
using VersionBuilderMap =
......@@ -1205,7 +1202,7 @@ class VersionSet {
// Save current contents to *log
Status WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log);
log::Writer* log, IOStatus& io_s);
void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
......
支持 Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册