提交 75fe6464 编辑于 作者: rui xia's avatar rui xia
浏览文件

Fix L0 compaction sanity check

上级 18203969
......@@ -876,6 +876,11 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
continue;
}
// Here, we separate the sanity check for L1+ and L0.
// For L1+, the check logic is the same as the origin.
// For L0, we do NOT expand inputs, if there is any extra L0 file
// with index greater than first_included has overlapped key with
// [smallestkey, largest], we will abort the compaction.
if (l != 0) {
// expend the compaction input of the current level if it
// has overlapping key-range with other non-compaction input
......@@ -896,32 +901,18 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
}
last_included++;
}
} else if (output_level > 0) {
last_included = static_cast<int>(current_files.size() - 1);
}
// include all files between the first and the last compaction input files.
for (int f = first_included; f <= last_included; ++f) {
if (current_files[f].being_compacted) {
return Status::Aborted("Necessary compaction input file " +
current_files[f].name +
" is currently being compacted.");
}
input_files->insert(TableFileNameToNumber(current_files[f].name));
}
// update smallest and largest key
if (l == 0) {
// include all files between the first and the last compaction input files.
for (int f = first_included; f <= last_included; ++f) {
if (comparator->Compare(smallestkey, current_files[f].smallestkey) >
0) {
smallestkey = current_files[f].smallestkey;
}
if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) {
largestkey = current_files[f].largestkey;
if (current_files[f].being_compacted) {
return Status::Aborted("Necessary compaction input file " +
current_files[f].name +
" is currently being compacted.");
}
input_files->insert(TableFileNameToNumber(current_files[f].name));
}
} else {
// update smallest and largest key
if (comparator->Compare(smallestkey,
current_files[first_included].smallestkey) > 0) {
smallestkey = current_files[first_included].smallestkey;
......@@ -930,6 +921,58 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
current_files[last_included].largestkey) < 0) {
largestkey = current_files[last_included].largestkey;
}
} else {
// consider the following case:
// L0-f1, [L0-f2], L0-f3, [L0-f4], L0-f5
// / /
// first last
// In the original sanity check, we need to compaction L0-f2, L0-f3, L0-f4,
// and L0-f5, for the last_included is expanded to current_files.size() - 1.
// Here we will check if L0-f4 overlaps with L0-f2 or L0-f3. If not, we assert
// the compaction is legal, and do not include [L0-f4]. Otherwise, we abort
// the compaction. Currently, we do not fix the compaction files, and force
// the user to pay attention to the file picked for compaction.
std::vector<const SstFileMetaData*> picked_l0_files;
picked_l0_files.reserve(input_files->size());
// Re-iterate the L0 files in *inputs to get the smallestKey and largestKey.
// Also sanity check the beingCompaction status of the L0 inputs.
for (size_t f = 0; f < current_files.size(); ++f) {
if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
input_files->end()) {
if (current_files[f].being_compacted) {
return Status::Aborted("Necessary compaction input file " +
current_files[f].name +
" is currently being compacted.");
}
picked_l0_files.push_back(&current_files[f]);
if (comparator->Compare(smallestkey, current_files[f].smallestkey) >
0) {
smallestkey = current_files[f].smallestkey;
}
if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) {
largestkey = current_files[f].largestkey;
}
}
}
// iterate L0 files from first_included to the last file in L0,
// if the file is not picked in *inputs and overlapped with any
// file in *inputs, abort the compaction.
for (size_t f = first_included; f < current_files.size(); ++f) {
if (input_files->find(TableFileNameToNumber(current_files[f].name)) ==
input_files->end()) {
for (auto picked_l0_file : picked_l0_files) {
if (HaveOverlappingKeyRanges(comparator, *picked_l0_file, current_files[f])) {
return Status::Aborted(
"L0 File " + current_files[f].name +
" is overlapped with another L0 file " + picked_l0_file->name +
", but not included in the compaction inputs");
}
}
}
}
}
SstFileMetaData aggregated_file_meta;
......
......@@ -23,7 +23,8 @@ public class FlinkRescalingCompactionFilter extends AbstractCompactionFilter<Sli
}
public FlinkRescalingCompactionFilter(ConfigHolder configHolder, Logger logger) {
super(createNewFlinkRescalingCompactionFilter0(configHolder.nativeHandle_, logger.nativeHandle_));
super(createNewFlinkRescalingCompactionFilter0(configHolder.nativeHandle_,
logger == null ? 0 : logger.nativeHandle_));
}
private native static long createNewFlinkRescalingCompactionFilter0(
......
Supports Markdown
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册