1515#include < vector>
1616
1717#include " file/filename.h"
18+ #include " file/line_file_reader.h"
1819#include " port/port_posix.h"
1920#include " rocksdb/advanced_options.h"
2021#include " rocksdb/convenience.h"
2122#include " rocksdb/db.h"
2223#include " rocksdb/env.h"
24+ #include " rocksdb/file_system.h"
2325#include " rocksdb/options.h"
26+ #include " rocksdb/slice.h"
2427#include " rocksdb/status.h"
2528#include " rocksdb/types.h"
2629#include " rocksdb/utilities/options_util.h"
@@ -40,6 +43,13 @@ DEFINE_string(
4043 " The names of cf to operate with, multiple cf shoule be comma-separated" );
4144DEFINE_string (command, " " , " The command to run" );
4245DEFINE_uint32 (workers, 4 , " The number of worker to run repair process" );
46+ DEFINE_string (corrupt_sst_path, " _corrupted_sst_list.txt" ,
47+ " The file path to store corruption sst file names" );
48+ DEFINE_bool (verbose, false , " Whether if print more informations" );
49+ DEFINE_bool (
50+ wal_recovery_skip_corrupted, false ,
51+ " Whether if set wal_recovery_mode to "
52+ " WALRecoveryMode::kSkipAnyCorruptedRecords, in case of wal corruption" );
4353
4454class CfRepairer {
4555 public:
@@ -48,6 +58,7 @@ class CfRepairer {
4858 void Run (int argc, char ** argv);
4959
5060 private:
61+ void InitialTargetCf ();
5162 void OpenDB (bool read_only);
5263 void CloseDB ();
5364 void RunSstCheck ();
@@ -58,6 +69,12 @@ class CfRepairer {
5869 void ReceiveCheckResults (std::string, rocksdb::channel<std::string>*);
5970 rocksdb::Status CheckSst (const std::string&);
6071
72+ void StoreCheckResults ();
73+
74+ bool ParseLine (const std::string&, std::string*, std::vector<std::string>*);
75+ void ReadCheckResults ();
76+ void ShowCorruptSsts ();
77+
6178 rocksdb::DB* db_;
6279 rocksdb::ConfigOptions config_options_;
6380 rocksdb::Options options_;
@@ -72,17 +89,19 @@ class CfRepairer {
7289 std::shared_ptr<rocksdb::Logger> logger_;
7390
7491 std::vector<std::unique_ptr<rocksdb::port::Thread>> threads_;
75- // std::vector<std::vector<std::string>> chunked_sst_lists_;
7692 std::unordered_map<std::string, std::vector<std::string>> corruption_ssts_;
7793};
7894
7995const char * USAGE =
8096 " USAGE: \n "
8197 " cf_repairer -db_path <DBPATH> -cf_name <CFNAME> -command <COMMAND> "
8298 " [OPTIONS]...\n " ;
99+
83100const char * STAGE_0 = " LoadOptions" ;
84101const char * STAGE_1 = " OpenDB" ;
85102const char * STAGE_2 = " CheckSst" ;
103+ const char * STAGE_3 = " StoreCheckResult" ;
104+ const char * STAGE_4 = " ReadCheckResult" ;
86105
87106void ChunkSstFiles (const std::vector<std::string>& files,
88107 std::vector<std::vector<std::string>>* results) {
@@ -110,7 +129,9 @@ void ChunkSstFiles(const std::vector<std::string>& files,
110129CfRepairer::CfRepairer ()
111130 : db_(nullptr ), db_path_(FLAGS_db_path), cp_path_suffix_(" _checkpoint" ) {
112131 logger_.reset (new rocksdb::StderrLogger ());
132+ }
113133
134+ void CfRepairer::InitialTargetCf () {
114135 if (db_path_.empty ()) {
115136 fprintf (stdout, " [%s] dbpath not specified!\n " , STAGE_0);
116137 Help ();
@@ -163,7 +184,6 @@ CfRepairer::CfRepairer()
163184 STAGE_0, column_families_.size ());
164185
165186 std::unordered_set<std::string> found_cf_names;
166-
167187 REPAIRER_LOG (logger_, " found column families: " );
168188 for (auto cf : column_families_) {
169189 found_cf_names.emplace (cf.name .c_str ());
@@ -189,6 +209,10 @@ void CfRepairer::Help() { fprintf(stdout, "%s\n", USAGE); }
189209
190210void CfRepairer::OpenDB (bool read_only) {
191211 rocksdb::Status s;
212+ if (FLAGS_wal_recovery_skip_corrupted) {
213+ fprintf (stdout, " [%s] would use kSkipAnyCorruptedRecords to OpenDB" , STAGE_1);
214+ options_.wal_recovery_mode = rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords ;
215+ }
192216 if (read_only) {
193217 s = rocksdb::DB::OpenForReadOnly (options_, db_path_, column_families_,
194218 &cf_handles_, &db_);
@@ -220,14 +244,18 @@ void CfRepairer::Run(int argc, char** argv) {
220244 std::string comm (FLAGS_command);
221245
222246 if (comm == " cf_sst_check" ) {
247+ InitialTargetCf ();
223248 OpenDB (true );
224249 RunSstCheck ();
250+ } else if (comm == " cf_show_sst_check_result" ) {
251+ ReadCheckResults ();
225252 } else if (comm == " cf_sst_archive" ) {
226253 } else if (comm == " cf_restore_health_sst" ) {
227254 } else {
228255 fprintf (stdout,
229256 " Unknown command: %s, available:\n "
230- " cf_sst_check, cf_sst_archive, cf_restore_health_sst\n " ,
257+ " cf_sst_check, cf_sst_archive, cf_show_sst_check_result, "
258+ " cf_restore_health_sst\n " ,
231259 comm.c_str ());
232260 }
233261
@@ -254,6 +282,11 @@ rocksdb::Status CfRepairer::CheckSst(const std::string& file_path) {
254282 rocksdb::Status s;
255283 s = dumper.VerifyChecksum ();
256284
285+ if (FLAGS_verbose) {
286+ REPAIRER_LOG (logger_, " [%s] sst file %s: %s" , STAGE_2, file_path.c_str (),
287+ s.ToString ().c_str ());
288+ }
289+
257290 // we don't need to actually read every kv out?
258291 //
259292 // if (s.ok()) {
@@ -367,6 +400,12 @@ void CfRepairer::RunSstCheck() {
367400 fprintf (stdout, " [%s] [cf = %s] has found %ld sst files to check.\n " ,
368401 STAGE_2, cf.c_str (), cf_sst_files.size ());
369402
403+ if (cf_sst_files.empty ()) {
404+ fprintf (stdout, " [%s] [cf = %s] has no ssts, skipped ... \n " , STAGE_2,
405+ cf.c_str ());
406+ continue ;
407+ }
408+
370409 // chunk sst
371410 std::vector<std::vector<std::string>> chunked_sst_lists;
372411 ChunkSstFiles (cf_sst_files, &chunked_sst_lists);
@@ -416,10 +455,139 @@ void CfRepairer::RunSstCheck() {
416455 " [%s] [cf = %s] + all sst check workers results kept finished.\n " ,
417456 STAGE_2, cf.c_str ());
418457 }
458+
459+ StoreCheckResults ();
460+ fprintf (stdout, " [%s] + store checking results into %s.\n " , STAGE_2,
461+ FLAGS_corrupt_sst_path.c_str ());
462+ }
463+
464+ void CfRepairer::StoreCheckResults () {
465+ size_t total = 0 ;
466+ for (const auto & i : corruption_ssts_) {
467+ total += i.second .size ();
468+ }
469+
470+ std::string output_path (FLAGS_corrupt_sst_path);
471+ const rocksdb::EnvOptions soptions;
472+ std::unique_ptr<rocksdb::WritableFile> output_file;
473+ rocksdb::Status s =
474+ options_.env ->NewWritableFile (output_path, &output_file, soptions);
475+ if (!s.ok ()) {
476+ fprintf (stdout, " [%s] Open output file(%s) failed: %s \n " , STAGE_3,
477+ output_path.c_str (), s.ToString ().c_str ());
478+ return ;
479+ }
480+ fprintf (stdout, " [%s] Output %ld sst names to file: %s \n " , STAGE_3, total,
481+ output_path.c_str ());
482+
483+ for (const auto & cf : corruption_ssts_) {
484+ output_file->Append (" CF:" );
485+ output_file->Append (cf.first );
486+ output_file->Append (" ;" );
487+ for (const auto & item : cf.second ) {
488+ output_file->Append (item);
489+ output_file->Append (" ," );
490+ }
491+ output_file->Append (" \n " );
492+ }
493+
494+ if (total == 0 ) {
495+ fprintf (stdout, " [%s] No corrupted sst found! \n " , STAGE_3);
496+ }
497+ output_file->Fsync ();
498+ output_file->Close ();
499+ }
500+
501+ bool CfRepairer::ParseLine (const std::string& line, std::string* cf,
502+ std::vector<std::string>* ssts) {
503+ cf->clear ();
504+ ssts->clear ();
505+ rocksdb::Slice s (line);
506+
507+ if (line.empty ()) {
508+ fprintf (stdout, " [%s] empty line, skip! \n " , STAGE_4);
509+ return false ;
510+ }
511+ if (!s.starts_with (" CF:" )) {
512+ fprintf (stdout, " [%s] parse failed, origianl text: %s \n " , STAGE_4,
513+ line.c_str ());
514+ return false ;
515+ }
516+ std::string raw = line.substr (3 );
517+ std::vector<std::string> cf_and_ssts = rocksdb::StringSplit (raw, ' ;' );
518+ if (cf_and_ssts.size () != 2 ) {
519+ fprintf (stdout, " [%s] parse failed, origianl text: %s \n " , STAGE_4,
520+ line.c_str ());
521+ return false ;
522+ }
523+
524+ cf->assign (cf_and_ssts[0 ]);
525+ if (!cf_and_ssts[1 ].empty ()) {
526+ std::vector<std::string> sst_names =
527+ rocksdb::StringSplit (cf_and_ssts[1 ], ' ,' );
528+ for (const auto & i : sst_names) {
529+ if (!i.empty ()) {
530+ ssts->emplace_back (i);
531+ }
532+ }
533+ }
534+ fprintf (stdout, " [%s] parse success, cf = %s, sst counts = %ld \n " , STAGE_4,
535+ cf->c_str (), ssts->size ());
536+ return true ;
537+ }
538+
539+ void CfRepairer::ReadCheckResults () {
540+ const rocksdb::EnvOptions soptions;
541+ std::shared_ptr<rocksdb::FileSystem> fs = options_.env ->GetFileSystem ();
542+ std::unique_ptr<rocksdb::LineFileReader> input_file;
543+
544+ std::string input_path (FLAGS_corrupt_sst_path);
545+ rocksdb::Status s = rocksdb::LineFileReader::Create (
546+ fs, input_path, rocksdb::FileOptions (), &input_file, nullptr , nullptr );
547+ if (!s.ok ()) {
548+ fprintf (stdout, " [%s] Open input file(%s) failed: %s \n " , STAGE_4,
549+ input_path.c_str (), s.ToString ().c_str ());
550+ return ;
551+ }
552+
553+ int lines = 0 ;
554+ std::string buf;
555+ std::string cf;
556+ std::vector<std::string> ssts;
557+
558+ while (input_file->ReadLine (&buf, rocksdb::Env::IO_TOTAL)) {
559+ if (ParseLine (buf, &cf, &ssts)) {
560+ corruption_ssts_.insert (std::make_pair (cf, ssts));
561+ }
562+ lines++;
563+ }
564+ if (!buf.empty ()) {
565+ if (ParseLine (buf, &cf, &ssts)) {
566+ corruption_ssts_.insert (std::make_pair (cf, ssts));
567+ }
568+ }
569+
570+ fprintf (stdout,
571+ " [%s] read corrupted sst list from %s success, %d lines parsed\n " ,
572+ STAGE_4, input_path.c_str (), lines);
573+ ShowCorruptSsts ();
574+ }
575+
576+ void CfRepairer::ShowCorruptSsts () {
577+ REPAIRER_LOG (logger_, " ======Corrupted Ssts=======" );
578+ for (const auto & cf : corruption_ssts_) {
579+ REPAIRER_LOG (logger_, " - CF: %s" , cf.first .c_str ());
580+ std::string sst_list;
581+ for (const auto & sst : cf.second ) {
582+ sst_list.append (sst);
583+ sst_list.append (" ," );
584+ }
585+ REPAIRER_LOG (logger_, " %s" , sst_list.c_str ());
586+ }
587+ REPAIRER_LOG (logger_, " ===========================" );
419588}
420589
421590int main (int argc, char ** argv) {
422- fprintf (stdout, " %d\n " , argc);
423591 gflags::SetVersionString (rocksdb::GetRocksVersionAsString (true ));
424592 gflags::SetUsageMessage (USAGE);
425593 if (argc < 2 ) {
0 commit comments