//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef ROCKSDB_LITE

12
#include "rocksdb/utilities/backupable_db.h"
13
14
15
#include "port/port.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/transaction_log.h"
16
#include "util/channel.h"
Igor Canadi's avatar
Igor Canadi committed
17
#include "util/coding.h"
Lei Jin's avatar
Lei Jin committed
18
#include "util/crc32c.h"
19
#include "util/file_reader_writer.h"
20
#include "util/filename.h"
21
#include "util/logging.h"
sdong's avatar
sdong committed
22
#include "util/string_util.h"
23
#include "util/sync_point.h"
24
#include "utilities/checkpoint/checkpoint_impl.h"
Igor Canadi's avatar
Igor Canadi committed
25

liuhuahang's avatar
liuhuahang committed
26
#ifndef __STDC_FORMAT_MACROS
Igor Canadi's avatar
Igor Canadi committed
27
#define __STDC_FORMAT_MACROS
28
#endif  // __STDC_FORMAT_MACROS
Igor Canadi's avatar
Igor Canadi committed
29
30

#include <inttypes.h>
31
#include <stdlib.h>
Igor Canadi's avatar
Igor Canadi committed
32
#include <algorithm>
Wanning Jiang's avatar
Wanning Jiang committed
33
#include <atomic>
34
#include <functional>
Wanning Jiang's avatar
Wanning Jiang committed
35
36
#include <future>
#include <limits>
Igor Canadi's avatar
Igor Canadi committed
37
#include <map>
38
#include <mutex>
39
#include <sstream>
Igor Canadi's avatar
Igor Canadi committed
40
#include <string>
41
#include <thread>
42
#include <unordered_map>
43
#include <unordered_set>
Wanning Jiang's avatar
Wanning Jiang committed
44
#include <vector>
Igor Canadi's avatar
Igor Canadi committed
45
46
47

namespace rocksdb {

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Bumps the counter of successful backups by one.
void BackupStatistics::IncrementNumberSuccessBackup() {
  ++number_success_backup;
}
// Bumps the counter of failed backups by one.
void BackupStatistics::IncrementNumberFailBackup() {
  ++number_fail_backup;
}

// Returns the current value of the successful-backup counter.
uint32_t BackupStatistics::GetNumberSuccessBackup() const {
  return number_success_backup;
}
// Returns the current value of the failed-backup counter.
uint32_t BackupStatistics::GetNumberFailBackup() const {
  return number_fail_backup;
}

// Renders both counters as one human-readable line:
//   "# success backup: <n>, # fail backup: <m>"
std::string BackupStatistics::ToString() const {
  // The fixed text is 35 characters and each %u can expand to 10 digits, so
  // the longest possible output needs 56 bytes including the terminator.
  // The previous 50-byte buffer silently truncated large counter values.
  char result[64];
  snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
           GetNumberSuccessBackup(), GetNumberFailBackup());
  return result;
}

Igor Canadi's avatar
Igor Canadi committed
69
// Writes every option of this BackupableDBOptions to `logger` at INFO level,
// one option per log line. Boolean options are printed as 0/1 and pointer
// options as addresses.
void BackupableDBOptions::Dump(Logger* logger) const {
  ROCKS_LOG_INFO(logger, "               Options.backup_dir: %s",
                 backup_dir.c_str());
  // %p formally requires a void* argument, hence the explicit casts.
  ROCKS_LOG_INFO(logger, "               Options.backup_env: %p",
                 static_cast<void*>(backup_env));
  ROCKS_LOG_INFO(logger, "        Options.share_table_files: %d",
                 static_cast<int>(share_table_files));
  ROCKS_LOG_INFO(logger, "                 Options.info_log: %p",
                 static_cast<void*>(info_log));
  ROCKS_LOG_INFO(logger, "                     Options.sync: %d",
                 static_cast<int>(sync));
  ROCKS_LOG_INFO(logger, "         Options.destroy_old_data: %d",
                 static_cast<int>(destroy_old_data));
  ROCKS_LOG_INFO(logger, "         Options.backup_log_files: %d",
                 static_cast<int>(backup_log_files));
  ROCKS_LOG_INFO(logger, "        Options.backup_rate_limit: %" PRIu64,
                 backup_rate_limit);
  ROCKS_LOG_INFO(logger, "       Options.restore_rate_limit: %" PRIu64,
                 restore_rate_limit);
  ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
                 max_background_operations);
}

Igor Canadi's avatar
Igor Canadi committed
90
91
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
Igor Canadi's avatar
Igor Canadi committed
92
 public:
Igor Canadi's avatar
Igor Canadi committed
93
94
  BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
                   bool read_only = false);
Igor Canadi's avatar
Igor Canadi committed
95
  ~BackupEngineImpl();
96
97
98
99
  Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata,
                                     bool flush_before_backup = false,
                                     std::function<void()> progress_callback =
                                         []() {}) override;
Igor Sugak's avatar
Igor Sugak committed
100
101
102
  Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
  Status DeleteBackup(BackupID backup_id) override;
  void StopBackup() override {
103
104
    stop_backup_.store(true, std::memory_order_release);
  }
Igor Sugak's avatar
Igor Sugak committed
105
106
  Status GarbageCollect() override;

107
108
  // The returned BackupInfos are in chronological order, which means the
  // latest backup comes last.
Igor Sugak's avatar
Igor Sugak committed
109
110
111
112
113
114
115
116
  void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
  void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
  Status RestoreDBFromBackup(
      BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override;
  // Convenience wrapper: restores from the backup identified by
  // latest_valid_backup_id_ by forwarding to RestoreDBFromBackup().
  Status RestoreDBFromLatestBackup(
      const std::string& db_dir, const std::string& wal_dir,
      const RestoreOptions& restore_options = RestoreOptions()) override {
    return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir,
                               restore_options);
  }

121
122
  virtual Status VerifyBackup(BackupID backup_id) override;

123
124
  Status Initialize();

Igor Canadi's avatar
Igor Canadi committed
125
 private:
126
127
  void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);

128
  // Extends the "result" map with pathname->size mappings for the contents of
129
  // "dir" in "env". Pathnames are prefixed with "dir".
130
  Status InsertPathnameToSizeBytes(
131
      const std::string& dir, Env* env,
132
133
      std::unordered_map<std::string, uint64_t>* result);

Lei Jin's avatar
Lei Jin committed
134
135
136
137
  // Immutable description of one backed-up file plus a mutable `refs`
  // counter. Non-copyable.
  struct FileInfo {
    // @param fname    file name stored in `filename`
    // @param sz       file size in bytes
    // @param checksum checksum recorded for the file
    FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
        : refs(0), filename(fname), size(sz), checksum_value(checksum) {}

    FileInfo(const FileInfo&) = delete;
    FileInfo& operator=(const FileInfo&) = delete;

    // Starts at 0; presumably the number of backups referencing this file --
    // confirm against BackupMeta::AddFile / BackupMeta::Delete.
    int refs;
    const std::string filename;
    const uint64_t size;
    const uint32_t checksum_value;
  };

Igor Canadi's avatar
Igor Canadi committed
147
148
  class BackupMeta {
   public:
149
150
    BackupMeta(
        const std::string& meta_filename, const std::string& meta_tmp_filename,
151
152
        std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
        Env* env)
153
154
155
156
157
158
159
        : timestamp_(0),
          sequence_number_(0),
          size_(0),
          meta_filename_(meta_filename),
          meta_tmp_filename_(meta_tmp_filename),
          file_infos_(file_infos),
          env_(env) {}
Igor Canadi's avatar
Igor Canadi committed
160

161
162
163
    BackupMeta(const BackupMeta&) = delete;
    BackupMeta& operator=(const BackupMeta&) = delete;

Igor Canadi's avatar
Igor Canadi committed
164
165
166
167
168
169
170
171
172
173
174
    ~BackupMeta() {}

    void RecordTimestamp() {
      env_->GetCurrentTime(&timestamp_);
    }
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }
175
    uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
176
177
178
179
180
181
    void SetSequenceNumber(uint64_t sequence_number) {
      sequence_number_ = sequence_number;
    }
    uint64_t GetSequenceNumber() {
      return sequence_number_;
    }
Igor Canadi's avatar
Igor Canadi committed
182

183
184
185
186
187
188
    const std::string& GetAppMetadata() const { return app_metadata_; }

    void SetAppMetadata(const std::string& app_metadata) {
      app_metadata_ = app_metadata;
    }

189
    Status AddFile(std::shared_ptr<FileInfo> file_info);
Lei Jin's avatar
Lei Jin committed
190

191
    Status Delete(bool delete_meta = true);
Igor Canadi's avatar
Igor Canadi committed
192
193
194
195
196

    bool Empty() {
      return files_.empty();
    }

197
    std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
198
199
200
      auto it = file_infos_->find(filename);
      if (it == file_infos_->end())
        return nullptr;
201
      return it->second;
202
203
    }

204
    const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
Igor Canadi's avatar
Igor Canadi committed
205
206
207
      return files_;
    }

208
209
    // @param abs_path_to_size Pre-fetched file sizes (bytes).
    Status LoadFromFile(
210
        const std::string& backup_dir,
211
        const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
Igor Canadi's avatar
Igor Canadi committed
212
213
    Status StoreToFile(bool sync);

214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
    std::string GetInfoString() {
      std::ostringstream ss;
      ss << "Timestamp: " << timestamp_ << std::endl;
      char human_size[16];
      AppendHumanBytes(size_, human_size, sizeof(human_size));
      ss << "Size: " << human_size << std::endl;
      ss << "Files:" << std::endl;
      for (const auto& file : files_) {
        AppendHumanBytes(file->size, human_size, sizeof(human_size));
        ss << file->filename << ", size " << human_size << ", refs "
           << file->refs << std::endl;
      }
      return ss.str();
    }

Igor Canadi's avatar
Igor Canadi committed
229
230
   private:
    int64_t timestamp_;
231
232
233
    // sequence number is only approximate, should not be used
    // by clients
    uint64_t sequence_number_;
Igor Canadi's avatar
Igor Canadi committed
234
    uint64_t size_;
235
    std::string app_metadata_;
Igor Canadi's avatar
Igor Canadi committed
236
    std::string const meta_filename_;
237
    std::string const meta_tmp_filename_;
Igor Canadi's avatar
Igor Canadi committed
238
    // files with relative paths (without "/" prefix!!)
239
240
    std::vector<std::shared_ptr<FileInfo>> files_;
    std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
Igor Canadi's avatar
Igor Canadi committed
241
    Env* env_;
242

243
244
    static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
  };  // BackupMeta
Igor Canadi's avatar
Igor Canadi committed
245
246
247
248
249
250
251
252
253

  // Joins options_.backup_dir and `relative_path` with a single '/'.
  // `relative_path` must not itself start with '/'; an empty value yields
  // the backup directory followed by a trailing '/'.
  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.empty() || relative_path.front() != '/');
    std::string absolute_path = options_.backup_dir;
    absolute_path += "/";
    absolute_path += relative_path;
    return absolute_path;
  }
  // Name of the per-backup ("private") directory, relative to the backup
  // root.
  inline std::string GetPrivateDirRel() const {
    return std::string("private");
  }
254
255
256
  // Name of the checksum-qualified shared directory, relative to the backup
  // root.
  inline std::string GetSharedChecksumDirRel() const {
    return std::string("shared_checksum");
  }
Igor Canadi's avatar
Igor Canadi committed
257
  inline std::string GetPrivateFileRel(BackupID backup_id,
258
259
                                       bool tmp = false,
                                       const std::string& file = "") const {
Igor Canadi's avatar
Igor Canadi committed
260
    assert(file.size() == 0 || file[0] != '/');
sdong's avatar
sdong committed
261
    return GetPrivateDirRel() + "/" + rocksdb::ToString(backup_id) +
262
           (tmp ? ".tmp" : "") + "/" + file;
Igor Canadi's avatar
Igor Canadi committed
263
  }
264
265
  // Relative path of `file` inside the shared directory: "shared/<file>", or
  // "shared/.<file>.tmp" when `tmp` is set. `file` must not start with '/'.
  inline std::string GetSharedFileRel(const std::string& file = "",
                                      bool tmp = false) const {
    assert(file.size() == 0 || file[0] != '/');
    return std::string("shared/") + (tmp ? "." : "") + file +
           (tmp ? ".tmp" : "");
  }
270
271
272
  inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
                                                  bool tmp = false) const {
    assert(file.size() == 0 || file[0] != '/');
273
274
    return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
           (tmp ? ".tmp" : "");
275
276
277
278
279
280
281
  }
  inline std::string GetSharedFileWithChecksum(const std::string& file,
                                               const uint32_t checksum_value,
                                               const uint64_t file_size) const {
    assert(file.size() == 0 || file[0] != '/');
    std::string file_copy = file;
    return file_copy.insert(file_copy.find_last_of('.'),
sdong's avatar
sdong committed
282
283
                            "_" + rocksdb::ToString(checksum_value) + "_" +
                                rocksdb::ToString(file_size));
284
285
286
287
288
289
290
291
  }
  // Removes everything from the first '_' up to (but not including) the last
  // '.' from `file`, e.g. "000007_<checksum>_<size>.sst" -> "000007.sst".
  //
  // Edge cases:
  //  - no '_' at all: `file` is returned unchanged (std::string::erase at
  //    npos would otherwise throw std::out_of_range);
  //  - no '.' after the first '_': everything from the first '_' to the end
  //    is removed. This matches the previous behaviour, which relied on
  //    unsigned wrap-around of the erase length.
  inline std::string GetFileFromChecksumFile(const std::string& file) const {
    assert(file.size() == 0 || file[0] != '/');
    const size_t first_underscore = file.find_first_of('_');
    if (first_underscore == std::string::npos) {
      return file;
    }
    const size_t last_dot = file.find_last_of('.');
    std::string file_copy = file;
    if (last_dot == std::string::npos || last_dot < first_underscore) {
      file_copy.erase(first_underscore);
    } else {
      file_copy.erase(first_underscore, last_dot - first_underscore);
    }
    return file_copy;
  }
Igor Canadi's avatar
Igor Canadi committed
292
293
294
  // Absolute path of the directory holding the backup meta files.
  inline std::string GetBackupMetaDir() const {
    const std::string meta_dir_rel("meta");
    return GetAbsolutePath(meta_dir_rel);
  }
295
296
297
  // Absolute path of the meta file of `backup_id`: "<meta dir>/<backup_id>",
  // or "<meta dir>/.<backup_id>.tmp" when `tmp` is set.
  inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
    return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
           rocksdb::ToString(backup_id) + (tmp ? ".tmp" : "");
  }

300
301
302
303
304
305
306
307
308
309
310
311
312
  // If size_limit == 0, there is no size limit, copy everything.
  //
  // Exactly one of src and contents must be non-empty.
  //
  // @param src If non-empty, the file is copied from this pathname.
  // @param contents If non-empty, the file will be created with these contents.
  Status CopyOrCreateFile(const std::string& src, const std::string& dst,
                          const std::string& contents, Env* src_env,
                          Env* dst_env, bool sync, RateLimiter* rate_limiter,
                          uint64_t* size = nullptr,
                          uint32_t* checksum_value = nullptr,
                          uint64_t size_limit = 0,
                          std::function<void()> progress_callback = []() {});
Lei Jin's avatar
Lei Jin committed
313
314
315
316
317
318

  Status CalculateChecksum(const std::string& src,
                           Env* src_env,
                           uint64_t size_limit,
                           uint32_t* checksum_value);

319
  // Outcome of one CopyOrCreateFile() call as delivered through the work
  // item's promise: the size and checksum outputs of the call and its Status.
  struct CopyOrCreateResult {
    uint64_t size;
    uint32_t checksum_value;
    Status status;
  };
324
325
326
327
328

  // Exactly one of src_path and contents must be non-empty. If src_path is
  // non-empty, the file is copied from this pathname. Otherwise, if contents is
  // non-empty, the file will be created at dst_path with these contents.
  struct CopyOrCreateWorkItem {
329
330
    std::string src_path;
    std::string dst_path;
331
    std::string contents;
332
333
334
    Env* src_env;
    Env* dst_env;
    bool sync;
335
    RateLimiter* rate_limiter;
336
    uint64_t size_limit;
337
    std::promise<CopyOrCreateResult> result;
338
    std::function<void()> progress_callback;
339

340
341
342
343
344
345
346
347
348
349
    CopyOrCreateWorkItem()
      : src_path(""),
        dst_path(""),
        contents(""),
        src_env(nullptr),
        dst_env(nullptr),
        sync(false),
        rate_limiter(nullptr),
        size_limit(0) {}

350
351
    CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
    CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
352

353
354
355
    CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
      *this = std::move(o);
    }
356

357
    CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
358
359
      src_path = std::move(o.src_path);
      dst_path = std::move(o.dst_path);
360
      contents = std::move(o.contents);
361
362
363
364
365
366
      src_env = o.src_env;
      dst_env = o.dst_env;
      sync = o.sync;
      rate_limiter = o.rate_limiter;
      size_limit = o.size_limit;
      result = std::move(o.result);
367
      progress_callback = std::move(o.progress_callback);
368
369
370
      return *this;
    }

371
372
373
374
375
    CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
                         std::string _contents, Env* _src_env, Env* _dst_env,
                         bool _sync, RateLimiter* _rate_limiter,
                         uint64_t _size_limit,
                         std::function<void()> _progress_callback = []() {})
376
377
        : src_path(std::move(_src_path)),
          dst_path(std::move(_dst_path)),
378
          contents(std::move(_contents)),
379
380
381
382
          src_env(_src_env),
          dst_env(_dst_env),
          sync(_sync),
          rate_limiter(_rate_limiter),
383
384
          size_limit(_size_limit),
          progress_callback(_progress_callback) {}
385
386
  };

387
388
  struct BackupAfterCopyOrCreateWorkItem {
    std::future<CopyOrCreateResult> result;
389
390
391
392
393
394
    bool shared;
    bool needed_to_copy;
    Env* backup_env;
    std::string dst_path_tmp;
    std::string dst_path;
    std::string dst_relative;
395
396
397
398
399
400
401
    BackupAfterCopyOrCreateWorkItem()
      : shared(false),
        needed_to_copy(false),
        backup_env(nullptr),
        dst_path_tmp(""),
        dst_path(""),
        dst_relative("") {}
402

403
404
    BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
        ROCKSDB_NOEXCEPT {
405
406
407
      *this = std::move(o);
    }

408
409
    BackupAfterCopyOrCreateWorkItem& operator=(
        BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
410
411
412
413
414
415
416
417
418
419
      result = std::move(o.result);
      shared = o.shared;
      needed_to_copy = o.needed_to_copy;
      backup_env = o.backup_env;
      dst_path_tmp = std::move(o.dst_path_tmp);
      dst_path = std::move(o.dst_path);
      dst_relative = std::move(o.dst_relative);
      return *this;
    }

420
421
422
423
424
    BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
                                    bool _shared, bool _needed_to_copy,
                                    Env* _backup_env, std::string _dst_path_tmp,
                                    std::string _dst_path,
                                    std::string _dst_relative)
425
426
427
428
429
430
431
432
433
        : result(std::move(_result)),
          shared(_shared),
          needed_to_copy(_needed_to_copy),
          backup_env(_backup_env),
          dst_path_tmp(std::move(_dst_path_tmp)),
          dst_path(std::move(_dst_path)),
          dst_relative(std::move(_dst_relative)) {}
  };

434
435
  struct RestoreAfterCopyOrCreateWorkItem {
    std::future<CopyOrCreateResult> result;
436
    uint32_t checksum_value;
437
438
    RestoreAfterCopyOrCreateWorkItem()
      : checksum_value(0) {}
439
440
    RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
                                     uint32_t _checksum_value)
sdong's avatar
sdong committed
441
        : result(std::move(_result)), checksum_value(_checksum_value) {}
442
443
    RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
        ROCKSDB_NOEXCEPT {
444
445
446
      *this = std::move(o);
    }

447
448
    RestoreAfterCopyOrCreateWorkItem& operator=(
        RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
449
450
451
452
      result = std::move(o.result);
      checksum_value = o.checksum_value;
      return *this;
    }
453
454
  };

455
  bool initialized_;
456
  std::mutex byte_report_mutex_;
457
  channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
Dmitri Smirnov's avatar
Dmitri Smirnov committed
458
  std::vector<port::Thread> threads_;
459

460
461
462
463
464
465
466
467
468
  // Adds a file to the backup work queue to be copied or created if it doesn't
  // already exist.
  //
  // Exactly one of src_dir and contents must be non-empty.
  //
  // @param src_dir If non-empty, the file in this directory named fname will be
  //    copied.
  // @param fname Name of destination file and, in case of copy, source file.
  // @param contents If non-empty, the file will be created with these contents.
469
  Status AddBackupFileWorkItem(
470
      std::unordered_set<std::string>& live_dst_paths,
471
      std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
472
      BackupID backup_id, bool shared, const std::string& src_dir,
473
      const std::string& fname,  // starts with "/"
474
      RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit = 0,
475
      bool shared_checksum = false,
476
477
      std::function<void()> progress_callback = []() {},
      const std::string& contents = std::string());
478

Igor Canadi's avatar
Igor Canadi committed
479
480
  // backup state data
  BackupID latest_backup_id_;
481
  BackupID latest_valid_backup_id_;
482
483
484
485
486
  std::map<BackupID, unique_ptr<BackupMeta>> backups_;
  std::map<BackupID,
           std::pair<Status, unique_ptr<BackupMeta>>> corrupt_backups_;
  std::unordered_map<std::string,
                     std::shared_ptr<FileInfo>> backuped_file_infos_;
487
  std::atomic<bool> stop_backup_;
Igor Canadi's avatar
Igor Canadi committed
488
489
490
491
492
493

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

Igor Canadi's avatar
Igor Canadi committed
494
495
496
497
498
499
  // directories
  unique_ptr<Directory> backup_directory_;
  unique_ptr<Directory> shared_directory_;
  unique_ptr<Directory> meta_directory_;
  unique_ptr<Directory> private_directory_;

Igor Canadi's avatar
Igor Canadi committed
500
501
  static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
  size_t copy_file_buffer_size_;
Igor Canadi's avatar
Igor Canadi committed
502
  bool read_only_;
503
  BackupStatistics backup_statistics_;
504
  static const size_t kMaxAppMetaSize = 1024 * 1024;  // 1MB
Igor Canadi's avatar
Igor Canadi committed
505
506
};

507
// Creates a BackupEngineImpl for `env`/`options` and initializes it.
//
// @param backup_engine_ptr receives the new engine on success (caller takes
//                          ownership) and nullptr on failure.
// @return OK on success, otherwise the Status returned by Initialize().
Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
                          BackupEngine** backup_engine_ptr) {
  // Held in a unique_ptr so the engine is destroyed if Initialize() fails.
  std::unique_ptr<BackupEngineImpl> backup_engine(
      new BackupEngineImpl(env, options));
  auto s = backup_engine->Initialize();
  if (!s.ok()) {
    *backup_engine_ptr = nullptr;
    return s;
  }
  *backup_engine_ptr = backup_engine.release();
  return Status::OK();
}

Igor Canadi's avatar
Igor Canadi committed
520
// Stores the configuration only; directories, metadata and worker threads
// are set up later by Initialize().
//
// backup_env_ falls back to `db_env` when options.backup_env is null. When
// no explicit rate limiter object was supplied but a positive byte rate was,
// a generic rate limiter is created for backup and/or restore respectively.
BackupEngineImpl::BackupEngineImpl(Env* db_env,
                                   const BackupableDBOptions& options,
                                   bool read_only)
    : initialized_(false),
      latest_backup_id_(0),
      latest_valid_backup_id_(0),
      stop_backup_(false),
      options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
      read_only_(read_only) {
  if (options_.backup_rate_limiter == nullptr &&
      options_.backup_rate_limit > 0) {
    options_.backup_rate_limiter.reset(
        NewGenericRateLimiter(options_.backup_rate_limit));
  }
  if (options_.restore_rate_limiter == nullptr &&
      options_.restore_rate_limit > 0) {
    options_.restore_rate_limiter.reset(
        NewGenericRateLimiter(options_.restore_rate_limit));
  }
}
543
544

// Signals end-of-input on the work queue so that the worker loops started in
// Initialize() terminate, joins every worker thread and flushes the info
// log.
BackupEngineImpl::~BackupEngineImpl() {
  files_to_copy_or_create_.sendEof();
  for (auto& t : threads_) {
    t.join();
  }
  LogFlush(options_.info_log);
}
551

552
553
554
// One-time setup of the engine; must be called exactly once before any
// other operation (asserted via initialized_).
//
// Steps, in order:
//  1. Unless read-only, create (if missing) and open the backup root, the
//     shared directory selected by the options, the private directory and
//     the meta directory.
//  2. List the meta directory and register one BackupMeta per file whose
//     name is exactly the decimal rendering of a non-zero backup id. Other
//     files are deleted unless the engine is read-only.
//  3. Either destroy all existing backups (destroy_old_data) or load backup
//     metadata from newest to oldest, stopping after
//     max_valid_backups_to_open backups loaded successfully. Corrupted
//     backups are moved to corrupt_backups_; any other load error aborts.
//  4. Start max_background_operations worker threads that service
//     files_to_copy_or_create_.
//
// @return OK on success, otherwise the first error from the backup Env or
//         from purging/garbage collection.
Status BackupEngineImpl::Initialize() {
  assert(!initialized_);
  initialized_ = true;
  if (read_only_) {
    ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
  }
  options_.Dump(options_.info_log);

  if (!read_only_) {
    // gather the list of directories that we need to create
    std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
        directories;
    directories.emplace_back(GetAbsolutePath(), &backup_directory_);
    if (options_.share_table_files) {
      if (options_.share_files_with_checksum) {
        directories.emplace_back(
            GetAbsolutePath(GetSharedFileWithChecksumRel()),
            &shared_directory_);
      } else {
        directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
                                 &shared_directory_);
      }
    }
    directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
                             &private_directory_);
    directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
    // create all the dirs we need
    for (const auto& d : directories) {
      auto s = backup_env_->CreateDirIfMissing(d.first);
      if (s.ok()) {
        s = backup_env_->NewDirectory(d.first, d.second);
      }
      if (!s.ok()) {
        return s;
      }
    }
  }

  std::vector<std::string> backup_meta_files;
  {
    auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
    if (s.IsNotFound()) {
      return Status::NotFound(GetBackupMetaDir() + " is missing");
    } else if (!s.ok()) {
      return s;
    }
  }
  // create backups_ structure
  for (auto& file : backup_meta_files) {
    if (file == "." || file == "..") {
      continue;
    }
    ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
    // backup_id stays 0 when the name does not start with a number; the
    // round-trip comparison below additionally rejects names with trailing
    // characters or leading zeros.
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%u", &backup_id);
    if (backup_id == 0 || file != rocksdb::ToString(backup_id)) {
      if (!read_only_) {
        // invalid file name, delete that
        auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
        ROCKS_LOG_INFO(options_.info_log,
                       "Unrecognized meta file %s, deleting -- %s",
                       file.c_str(), s.ToString().c_str());
      }
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    backups_.insert(std::make_pair(
        backup_id, unique_ptr<BackupMeta>(new BackupMeta(
                       GetBackupMetaFile(backup_id, false /* tmp */),
                       GetBackupMetaFile(backup_id, true /* tmp */),
                       &backuped_file_infos_, backup_env_))));
  }

  latest_backup_id_ = 0;
  latest_valid_backup_id_ = 0;
  if (options_.destroy_old_data) {  // Destroy old data
    assert(!read_only_);
    ROCKS_LOG_INFO(
        options_.info_log,
        "Backup Engine started with destroy_old_data == true, deleting all "
        "backups");
    auto s = PurgeOldBackups(0);
    if (s.ok()) {
      s = GarbageCollect();
    }
    if (!s.ok()) {
      return s;
    }
  } else {  // Load data from storage
    // Pre-fetch the sizes of the files in both shared directories. The
    // returned Status is not checked here.
    std::unordered_map<std::string, uint64_t> abs_path_to_size;
    for (const auto& rel_dir :
         {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
      const auto abs_dir = GetAbsolutePath(rel_dir);
      InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
    }
    // load the backups if any, until valid_backups_to_open of the latest
    // non-corrupted backups have been successfully opened.
    int valid_backups_to_open = options_.max_valid_backups_to_open;
    for (auto backup_iter = backups_.rbegin();
         backup_iter != backups_.rend();
         ++backup_iter) {
      assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
      if (latest_backup_id_ == 0) {
        latest_backup_id_ = backup_iter->first;
      }
      if (valid_backups_to_open == 0) {
        break;
      }

      // Add this backup's private files to the size map before loading it.
      InsertPathnameToSizeBytes(
          GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
          &abs_path_to_size);
      Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
                                                   abs_path_to_size);
      if (s.IsCorruption()) {
        ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
                       backup_iter->first, s.ToString().c_str());
        // The meta object is moved out; the now-empty map entry is erased
        // from backups_ after the loop.
        corrupt_backups_.insert(
            std::make_pair(backup_iter->first,
                           std::make_pair(s, std::move(backup_iter->second))));
      } else if (!s.ok()) {
        // Distinguish corruption errors from errors in the backup Env.
        // Errors in the backup Env (i.e., this code path) will cause Open() to
        // fail, whereas corruption errors would not cause Open() failures.
        return s;
      } else {
        ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
                       backup_iter->first,
                       backup_iter->second->GetInfoString().c_str());
        assert(latest_valid_backup_id_ == 0 ||
               latest_valid_backup_id_ > backup_iter->first);
        if (latest_valid_backup_id_ == 0) {
          latest_valid_backup_id_ = backup_iter->first;
        }
        --valid_backups_to_open;
      }
    }

    for (const auto& corrupt : corrupt_backups_) {
      backups_.erase(backups_.find(corrupt.first));
    }
    // erase the backups before max_valid_backups_to_open
    int num_unopened_backups;
    if (options_.max_valid_backups_to_open == 0) {
      num_unopened_backups = 0;
    } else {
      num_unopened_backups =
          std::max(0, static_cast<int>(backups_.size()) -
                          options_.max_valid_backups_to_open);
    }
    for (int i = 0; i < num_unopened_backups; ++i) {
      assert(backups_.begin()->second->Empty());
      backups_.erase(backups_.begin());
    }
  }

  ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
  ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
                 latest_valid_backup_id_);

  // set up threads perform copies from files_to_copy_or_create_ in the
  // background
  for (int t = 0; t < options_.max_background_operations; t++) {
    threads_.emplace_back([this]() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
      pthread_setname_np(pthread_self(), "backup_engine");
#endif
#endif
      // Each worker runs until read() returns false and reports the outcome
      // of every item through that item's promise.
      CopyOrCreateWorkItem work_item;
      while (files_to_copy_or_create_.read(work_item)) {
        CopyOrCreateResult result;
        result.status = CopyOrCreateFile(
            work_item.src_path, work_item.dst_path, work_item.contents,
            work_item.src_env, work_item.dst_env, work_item.sync,
            work_item.rate_limiter, &result.size, &result.checksum_value,
            work_item.size_limit, work_item.progress_callback);
        work_item.result.set_value(std::move(result));
      }
    });
  }
  ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");

  return Status::OK();
}
Igor Canadi's avatar
Igor Canadi committed
737

738
739
740
// Creates a new backup of `db` under the id `latest_backup_id_ + 1`.
//
// Parameters:
//   db                  - database whose live files are backed up. File
//                         deletions are disabled on it while files are being
//                         scheduled and copied, and re-enabled afterwards.
//   app_metadata        - opaque application metadata stored with the backup;
//                         rejected with InvalidArgument when larger than
//                         kMaxAppMetaSize.
//   flush_before_backup - when true, 0 is passed as the last argument of
//                         CreateCustomCheckpoint(); otherwise port::kMaxUint64
//                         is passed.
//   progress_callback   - forwarded unchanged to every AddBackupFileWorkItem()
//                         call.
//
// Returns OK on success. On any failure the partially created backup is
// removed via DeleteBackup() + GarbageCollect() and the failing status is
// returned.
Status BackupEngineImpl::CreateNewBackupWithMetadata(
    DB* db, const std::string& app_metadata, bool flush_before_backup,
    std::function<void()> progress_callback) {
  assert(initialized_);
  assert(!read_only_);
  if (app_metadata.size() > kMaxAppMetaSize) {
    return Status::InvalidArgument("App metadata too large");
  }

  BackupID new_backup_id = latest_backup_id_ + 1;

  assert(backups_.find(new_backup_id) == backups_.end());

  auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
  Status s = backup_env_->FileExists(private_dir);
  if (s.ok()) {
    // maybe last backup failed and left partial state behind, clean it up.
    // need to do this before updating backups_ such that a private dir
    // named after new_backup_id will be cleaned up
    s = GarbageCollect();
  } else if (s.IsNotFound()) {
    // normal case, the new backup's private dir doesn't exist yet
    s = Status::OK();
  }
  // Any other FileExists() error stays in `s`; the steps below that are
  // guarded by s.ok() are skipped and the failure path at the end cleans up.

  auto ret = backups_.insert(std::make_pair(
      new_backup_id, unique_ptr<BackupMeta>(new BackupMeta(
                         GetBackupMetaFile(new_backup_id, false /* tmp */),
                         GetBackupMetaFile(new_backup_id, true /* tmp */),
                         &backuped_file_infos_, backup_env_))));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  new_backup->RecordTimestamp();
  new_backup->SetAppMetadata(app_metadata);

  auto start_backup = backup_env_->NowMicros();

  ROCKS_LOG_INFO(options_.info_log,
                 "Started the backup process -- creating backup %u",
                 new_backup_id);
  if (s.ok()) {
    s = backup_env_->CreateDir(private_dir);
  }

  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
  if (rate_limiter) {
    copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes();
  }

  // A set into which we will insert the dst_paths that are calculated for live
  // files and live WAL files.
  // This is used to check whether a live file shares a dst_path with another
  // live file.
  std::unordered_set<std::string> live_dst_paths;

  std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
  // Add a CopyOrCreateWorkItem to the channel for each live file
  db->DisableFileDeletions();
  if (s.ok()) {
    CheckpointImpl checkpoint(db);
    uint64_t sequence_number = 0;
    s = checkpoint.CreateCustomCheckpoint(
        db->GetDBOptions(),
        [&](const std::string& /* src_dirname */,
            const std::string& /* fname */, FileType) {
          // custom checkpoint will switch to calling copy_file_cb after it sees
          // NotSupported returned from link_file_cb.
          return Status::NotSupported();
        } /* link_file_cb */,
        [&](const std::string& src_dirname, const std::string& fname,
            uint64_t size_limit_bytes, FileType type) {
          if (type == kLogFile && !options_.backup_log_files) {
            return Status::OK();
          }
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          uint64_t size_bytes = 0;
          Status st;
          // The source size is only looked up for table files; for every
          // other file type size_bytes stays 0.
          if (type == kTableFile) {
            st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
          }
          if (st.ok()) {
            st = AddBackupFileWorkItem(
                live_dst_paths, backup_items_to_finish, new_backup_id,
                options_.share_table_files && type == kTableFile, src_dirname,
                fname, rate_limiter, size_bytes, size_limit_bytes,
                options_.share_files_with_checksum && type == kTableFile,
                progress_callback);
          }
          return st;
        } /* copy_file_cb */,
        [&](const std::string& fname, const std::string& contents, FileType) {
          Log(options_.info_log, "add file for backup %s", fname.c_str());
          return AddBackupFileWorkItem(
              live_dst_paths, backup_items_to_finish, new_backup_id,
              false /* shared */, "" /* src_dir */, fname, rate_limiter,
              contents.size(), 0 /* size_limit */, false /* shared_checksum */,
              progress_callback, contents);
        } /* create_file_cb */,
        &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
    if (s.ok()) {
      new_backup->SetSequenceNumber(sequence_number);
    }
  }
  ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");

  // Wait for every scheduled item, even after a failure, so that no work item
  // is left outstanding when we return. The last failing item determines `s`.
  Status item_status;
  for (auto& item : backup_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    item_status = result.status;
    if (item_status.ok() && item.shared && item.needed_to_copy) {
      // shared files are written under a temporary name and renamed into
      // place once the copy has succeeded
      item_status =
          item.backup_env->RenameFile(item.dst_path_tmp, item.dst_path);
    }
    if (item_status.ok()) {
      item_status = new_backup->AddFile(std::make_shared<FileInfo>(
          item.dst_relative, result.size, result.checksum_value));
    }
    if (!item_status.ok()) {
      s = item_status;
    }
  }

  // we copied all the files, enable file deletions
  db->EnableFileDeletions(false);

  auto backup_time = backup_env_->NowMicros() - start_backup;

  if (s.ok()) {
    // persist the backup metadata on the disk
    s = new_backup->StoreToFile(options_.sync);
  }
  if (s.ok() && options_.sync) {
    // Directory syncs are best effort: their return values are not checked.
    unique_ptr<Directory> backup_private_directory;
    backup_env_->NewDirectory(
        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
        &backup_private_directory);
    if (backup_private_directory != nullptr) {
      backup_private_directory->Fsync();
    }
    if (private_directory_ != nullptr) {
      private_directory_->Fsync();
    }
    if (meta_directory_ != nullptr) {
      meta_directory_->Fsync();
    }
    if (shared_directory_ != nullptr) {
      shared_directory_->Fsync();
    }
    if (backup_directory_ != nullptr) {
      backup_directory_->Fsync();
    }
  }

  if (s.ok()) {
    backup_statistics_.IncrementNumberSuccessBackup();
  }
  if (!s.ok()) {
    backup_statistics_.IncrementNumberFailBackup();
    // clean all the files we might have created
    ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
                   s.ToString().c_str());
    ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
                   backup_statistics_.ToString().c_str());
    // delete files that we might have already written
    // (`new_backup` refers into backups_ and must not be used after this)
    DeleteBackup(new_backup_id);
    GarbageCollect();
    return s;
  }

  // here we know that the backup succeeded and that its metadata was
  // persisted by StoreToFile() above
  latest_backup_id_ = new_backup_id;
  latest_valid_backup_id_ = new_backup_id;
  ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");

  // backup_speed is bytes per microsecond divided by 1.048576, i.e. MiB per
  // second (logged as MB/s). backup_time can be 0 when the clock did not
  // advance; report 0 then instead of dividing by zero.
  double backup_speed = 0.0;
  if (backup_time > 0) {
    backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
  }
  ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
                 new_backup->GetNumberFiles());
  char human_size[16];
  AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
  ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
  ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
                 backup_time);
  ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
  ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
                 backup_statistics_.ToString().c_str());
  return s;
}

Igor Canadi's avatar
Igor Canadi committed
930
// Deletes backups from the front of backups_ until at most
// `num_backups_to_keep` entries remain (presumably the lowest/oldest ids,
// assuming backups_ is ordered by BackupID -- confirm against its
// declaration).
//
// Returns the first non-OK status produced by DeleteBackup(), in which case
// the remaining candidates are left untouched; otherwise OK.
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
  assert(initialized_);
  assert(!read_only_);
  ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
                 num_backups_to_keep);

  // Collect the ids first: DeleteBackup() erases from backups_, so deleting
  // while walking the container would invalidate the iterator.
  std::vector<BackupID> to_delete;
  auto itr = backups_.begin();
  while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
    to_delete.push_back(itr->first);
    ++itr;
  }

  for (auto backup_id : to_delete) {
    auto s = DeleteBackup(backup_id);
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}

Igor Canadi's avatar
Igor Canadi committed
950
// Deletes the backup identified by `backup_id`.
//
// The id is looked up in backups_ first and in corrupt_backups_ second. The
// matching meta object's Delete() is called and the entry is erased from its
// map. Afterwards every file in backuped_file_infos_ whose reference count
// is zero is deleted, and removal of the backup's private directory is
// attempted.
//
// Returns NotFound when the id is in neither map, or the error from the
// meta's Delete() when that fails. Failures while deleting individual files
// or the private directory are only logged; the function still returns OK.
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
  assert(initialized_);
  assert(!read_only_);
  ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);

  auto backup = backups_.find(backup_id);
  if (backup != backups_.end()) {
    auto s = backup->second->Delete();
    if (!s.ok()) {
      return s;
    }
    backups_.erase(backup);
  } else {
    auto corrupt = corrupt_backups_.find(backup_id);
    if (corrupt == corrupt_backups_.end()) {
      return Status::NotFound("Backup not found");
    }
    auto s = corrupt->second.second->Delete();
    if (!s.ok()) {
      return s;
    }
    corrupt_backups_.erase(corrupt);
  }

  // Gather the unreferenced entries first and erase them afterwards, so that
  // backuped_file_infos_ is not modified while it is being iterated.
  std::vector<std::string> to_delete;
  for (const auto& file_entry : backuped_file_infos_) {
    if (file_entry.second->refs == 0) {
      Status s = backup_env_->DeleteFile(GetAbsolutePath(file_entry.first));
      ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                     file_entry.first.c_str(), s.ToString().c_str());
      // The entry is dropped even when DeleteFile() failed; the failure is
      // visible only in the log line above.
      to_delete.push_back(file_entry.first);
    }
  }
  for (const auto& td : to_delete) {
    backuped_file_infos_.erase(td);
  }

  // take care of private dirs -- GarbageCollect() will take care of them
  // if they are not empty
  std::string private_dir = GetPrivateFileRel(backup_id);
  Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
  ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
                 private_dir.c_str(), s.ToString().c_str());
  return Status::OK();
}

Igor Canadi's avatar
Igor Canadi committed
995
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
996
  assert(initialized_);
Igor Canadi's avatar
Igor Canadi committed
997
998
  backup_info->reserve(backups_.size());
  for (auto& backup : backups_) {
999
    if (!backup.second->Empty()) {
1000
      backup_info->push_back(BackupInfo(
For faster browsing, not all history is shown. View entire blame