Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ class PikaServer : public pstd::noncopyable {
*/
void DoTimingTask();
void AutoCompactRange();
void AutoProgressiveCompact();
void AutoBinlogPurge();
void AutoServerlogPurge();
void AutoDeleteExpiredDump();
Expand Down Expand Up @@ -552,6 +553,7 @@ class PikaServer : public pstd::noncopyable {
*/
bool have_scheduled_crontask_ = false;
struct timeval last_check_compact_time_;
struct timeval last_progressive_compact_time_;

/*
* ResumeDB used
Expand Down
58 changes: 46 additions & 12 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,8 @@ int PikaServer::ClientPubSubChannelPatternSize(const std::shared_ptr<NetConn>& c
void PikaServer::DoTimingTask() {
// Maybe schedule compactrange
AutoCompactRange();
// Progressive compact
AutoProgressiveCompact();
// Purge serverlog
AutoServerlogPurge();
// Purge binlog
Expand Down Expand Up @@ -1249,6 +1251,33 @@ void PikaServer::AutoCompactRange() {
}
}

void PikaServer::AutoProgressiveCompact() {
struct timeval now;
gettimeofday(&now, nullptr);

// Execute progressive compact every 60 seconds
if (last_progressive_compact_time_.tv_sec == 0 ||
now.tv_sec - last_progressive_compact_time_.tv_sec >= 60) {
gettimeofday(&last_progressive_compact_time_, nullptr);

std::shared_lock db_rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
db_item.second->DBLockShared();
auto storage = db_item.second->storage();
if (storage) {
Status s = storage->LongestNotCompactionSstCompact(storage::DataType::kAll);
if (!s.ok()) {
LOG(WARNING) << "Progressive compact for DB: " << db_item.first
<< " failed: " << s.ToString();
} else {
LOG(INFO) << "Progressive compact for DB: " << db_item.first << " completed";
}
}
db_item.second->DBUnlockShared();
}
}
}

void PikaServer::AutoBinlogPurge() { DoSameThingEveryDB(TaskType::kPurgeLog); }

void PikaServer::AutoServerlogPurge() {
Expand Down Expand Up @@ -1309,25 +1338,30 @@ void PikaServer::AutoServerlogPurge() {

// Process files for each log level
for (auto& [level, files] : log_files_by_level) {
// Sort by time in descending order
// Sort by time in descending order (newest first)
std::sort(files.begin(), files.end(),
[](const auto& a, const auto& b) { return a.second > b.second; });

bool has_recent_file = false;
// Keep the most recent file for each level, delete others that exceed retention_time
bool is_first = true;
for (const auto& [file, log_timestamp] : files) {
double diff_seconds = difftime(now_timestamp, log_timestamp);
int64_t interval_days = static_cast<int64_t>(diff_seconds / 86400);
if (interval_days <= retention_time) {
has_recent_file = true;
// Always keep the most recent file for each log level
if (is_first) {
is_first = false;
continue;
}
if (!has_recent_file) {
has_recent_file = true;
continue;

double diff_seconds = difftime(now_timestamp, log_timestamp);
int64_t interval_days = static_cast<int64_t>(diff_seconds / 86400);

// Delete files that exceed the retention time
if (interval_days > retention_time) {
std::string log_file = log_path + "/" + file;
LOG(INFO) << "Deleting out of date log file: " << log_file;
if(!pstd::DeleteFile(log_file)) {
LOG(ERROR) << "Failed to delete log file: " << log_file;
}
}
std::string log_file = log_path + "/" + file;
LOG(INFO) << "Deleting out of date log file: " << log_file;
if(!pstd::DeleteFile(log_file)) LOG(ERROR) << "Failed to delete log file: " << log_file;
}
}
}
Expand Down
Loading