提交 0c0ddab7 编辑于 作者: rui xia's avatar rui xia
浏览文件

Enable trivial move in CompactFiles API

上级 75fe6464
......@@ -236,6 +236,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
trivial_move_ignore_compaction_filter_(false),
is_trivial_move_(false),
compaction_reason_(_compaction_reason) {
MarkFilesBeingCompacted(true);
......@@ -307,7 +308,8 @@ bool Compaction::IsTrivialMove() const {
if (is_manual_compaction_ &&
(immutable_cf_options_.compaction_filter != nullptr ||
immutable_cf_options_.compaction_filter_factory != nullptr)) {
immutable_cf_options_.compaction_filter_factory != nullptr) &&
!trivial_move_ignore_compaction_filter_) {
// This is a manual compaction and we have a compaction filter that should
// be executed, we cannot do a trivial move
return false;
......
......@@ -293,6 +293,10 @@ class Compaction {
uint64_t MinInputFileOldestAncesterTime() const;
void set_trivial_move_ignore_compaction_filter(bool trivial_move_ignore_compaction_filter) {
trivial_move_ignore_compaction_filter_ = trivial_move_ignore_compaction_filter;
}
private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
......@@ -358,6 +362,9 @@ class Compaction {
// Is this compaction requested by the client?
const bool is_manual_compaction_;
// enable a trivial move by ignoring compaction filter
bool trivial_move_ignore_compaction_filter_;
// True if we can do trivial move in Universal multi level
// compaction
bool is_trivial_move_;
......
......@@ -1026,52 +1026,108 @@ Status DBImpl::CompactFilesImpl(
// deletion compaction currently not allowed in CompactFiles.
assert(!c->deletion_compaction());
Status status;
CompactionJobStats compaction_job_stats;
// preparation for normal compaction
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
SnapshotChecker* snapshot_checker;
GetSnapshotContext(job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
&earliest_write_conflict_snapshot, &snapshot_checker);
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJobStats compaction_job_stats;
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
file_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
&error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
// being compacted). Since we just changed compaction score, we recalculate it
// here.
version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
*c->mutable_cf_options());
compaction_job.Prepare();
// if the manual CompactionFiles is a trivial move, do not create CompactionJob.
c->set_trivial_move_ignore_compaction_filter(compact_options.trivial_move_ignore_compaction_filter);
if (c->IsTrivialMove()) {
compaction_job_stats.num_input_files = c->num_input_files(0);
mutex_.Unlock();
TEST_SYNC_POINT("CompactFilesImpl:0");
TEST_SYNC_POINT("CompactFilesImpl:1");
compaction_job.Run();
TEST_SYNC_POINT("CompactFilesImpl:2");
TEST_SYNC_POINT("CompactFilesImpl:3");
mutex_.Lock();
// move files to next level
int32_t moved_files = 0;
int32_t moved_bytes = 0;
for (unsigned int l = 0; l < c->num_input_levels(); l++) {
if (c->level(l) == c->output_level()) {
continue;
}
Status status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
for (size_t i = 0; i < c->num_input_files(l); i++) {
FileMetaData* f = c->input(l, i);
c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->file_checksum,
f->file_checksum_func_name);
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
c->output_level(), f->fd.GetFileSize());
++moved_files;
moved_bytes += f->fd.GetFileSize();
}
}
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
moved_bytes);
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(
log_buffer,
"[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
c->column_family_data()->GetName().c_str(), moved_files,
c->output_level(), moved_bytes, status.ToString().c_str(),
c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
} else {
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
file_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
&error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
// being compacted). Since we just changed compaction score, we recalculate it
// here.
version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
*c->mutable_cf_options());
compaction_job.Prepare();
mutex_.Unlock();
TEST_SYNC_POINT("CompactFilesImpl:0");
TEST_SYNC_POINT("CompactFilesImpl:1");
compaction_job.Run();
TEST_SYNC_POINT("CompactFilesImpl:2");
TEST_SYNC_POINT("CompactFilesImpl:3");
mutex_.Lock();
status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
}
}
c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
// Need to make sure SstFileManager does its bookkeeping
......
......@@ -1477,11 +1477,14 @@ struct CompactionOptions {
uint64_t output_file_size_limit;
// If > 0, it will replace the option in the DBOptions for this compaction.
uint32_t max_subcompactions;
// ignore compaction filter to trigger a trivial move
bool trivial_move_ignore_compaction_filter;
CompactionOptions()
: compression(kSnappyCompression),
output_file_size_limit(std::numeric_limits<uint64_t>::max()),
max_subcompactions(0) {}
max_subcompactions(0),
trivial_move_ignore_compaction_filter(false) {}
};
// For level based compaction, we can configure if we want to skip/force
......
......@@ -114,3 +114,11 @@ void Java_org_rocksdb_CompactionOptions_setMaxSubcompactions(
compact_opts->max_subcompactions =
static_cast<uint32_t>(jmax_subcompactions);
}
void Java_org_rocksdb_CompactionOptions_setTrivialMoveIgnoreCompactionFilter(
JNIEnv*, jclass, jlong jhandle, jboolean jtrivial_move_ignore_compaction_filter) {
auto* compact_opts =
reinterpret_cast<ROCKSDB_NAMESPACE::CompactionOptions*>(jhandle);
compact_opts->trivial_move_ignore_compaction_filter = jtrivial_move_ignore_compaction_filter;
}
......@@ -106,6 +106,11 @@ public class CompactionOptions extends RocksObject {
return this;
}
public CompactionOptions setTrivialMoveIgnoreCompactionFilter(final boolean trivialMoveIgnoreCompactionFilter) {
setTrivialMoveIgnoreCompactionFilter(nativeHandle_, trivialMoveIgnoreCompactionFilter);
return this;
}
private static native long newCompactionOptions();
@Override protected final native void disposeInternal(final long handle);
......@@ -118,4 +123,6 @@ public class CompactionOptions extends RocksObject {
private static native int maxSubcompactions(final long handle);
private static native void setMaxSubcompactions(final long handle,
final int maxSubcompactions);
private static native void setTrivialMoveIgnoreCompactionFilter(final long handle,
final boolean trivialMoveIgnoreCompactionFilter);
}
Supports Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册