Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,29 @@ class CompactCmd : public Cmd {
std::set<std::string> compact_dbs_;
};

// we can use pika/tests/helpers/test_queue.py to test this command
class CompactRangeCmd : public Cmd {

@AlexStocks AlexStocks Nov 24, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

非 Redis 指令而是作为Pika 内置指令的话, 建议改为 PkCompactRangeCmd

public:
CompactRangeCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new CompactRangeCmd(*this); }

private:
void DoInitial() override;
void Clear() override {
struct_type_.clear();
compact_dbs_.clear();
start_key_.clear();
end_key_.clear();
}
std::string struct_type_;
std::set<std::string> compact_dbs_;
std::string start_key_;
std::string end_key_;
};

class PurgelogstoCmd : public Cmd {
public:
PurgelogstoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
Expand Down
2 changes: 1 addition & 1 deletion include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Version final : public pstd::noncopyable {

void debug() {
std::shared_lock l(rwlock_);
printf("Current pro_num %u pro_offset %llu\n", pro_num_, pro_offset_);
printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_);
}

private:
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const std::string kCmdNameDbSlaveof = "dbslaveof";
const std::string kCmdNameAuth = "auth";
const std::string kCmdNameBgsave = "bgsave";
const std::string kCmdNameCompact = "compact";
const std::string kCmdNameCompactRange = "compactrange";

@AlexStocks AlexStocks Nov 24, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议改为 pkcompactrange,类似于 pksetatex pkrange,标明这是一个 pika 内置指令,区别于 redis 指令

const std::string kCmdNamePurgelogsto = "purgelogsto";
const std::string kCmdNamePing = "ping";
const std::string kCmdNameSelect = "select";
Expand Down
1 change: 1 addition & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {

// Compact use;
void Compact(const storage::DataType& type);
void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end);

void LeaveAllSlot();
std::set<uint32_t> GetSlotIDs();
Expand Down
14 changes: 13 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ enum TaskType {
kStartKeyScan,
kStopKeyScan,
kBgSave,
kCompactRangeStrings,
kCompactRangeHashes,
kCompactRangeSets,
kCompactRangeZSets,
kCompactRangeList,
};

struct TaskArg {
TaskType type;
std::vector<std::string> argv;
TaskArg(TaskType t) : type(t) {}
TaskArg(TaskType t, const std::vector<std::string>& a) : type(t), argv(a) {}
};

void DoBgslotscleanup(void* arg);
Expand Down Expand Up @@ -180,7 +192,7 @@ class PikaServer : public pstd::noncopyable {
bool IsDBExist(const std::string& db_name);
bool IsDBSlotExist(const std::string& db_name, uint32_t slot_id);
bool IsDBBinlogIoError(const std::string& db_name);
pstd::Status DoSameThingSpecificDB(const TaskType& type, const std::set<std::string>& dbs = {});
pstd::Status DoSameThingSpecificDB(const std::set<std::string>& dbs, const TaskArg& arg);
std::shared_mutex& GetDBLock() {
return dbs_rw_;
}
Expand Down
1 change: 1 addition & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
std::shared_ptr<storage::Storage> db() const;

void Compact(const storage::DataType& type);
void CompactRange(const storage::DataType& type, const std::string& start, const std::string& end);

void DbRWLockWriter();
void DbRWLockReader();
Expand Down
3 changes: 2 additions & 1 deletion src/net/src/net_multiplexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ bool NetMultiplexer::Register(const NetItem& it, bool force) {
}
notify_queue_protector_.unlock();
if (success) {
write(notify_send_fd_, "", 1);
ssize_t n = write(notify_send_fd_, "", 1);
(void)(n);
}
return success;
}
Expand Down
9 changes: 6 additions & 3 deletions src/net/src/net_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ int PubSubThread::Publish(const std::string& channel, const std::string& msg) {
channel_ = channel;
message_ = msg;
// Send signal to ThreadMain()
write(msg_pfd_[1], "", 1);
ssize_t n = write(msg_pfd_[1], "", 1);
(void)(n);
std::unique_lock lock(receiver_mutex_);
receiver_rsignal_.wait(lock, [this]() { return receivers_ != -1; });

Expand Down Expand Up @@ -345,7 +346,8 @@ void* PubSubThread::ThreadMain() {
pfe = (net_multiplexer_->FiredEvents()) + i;
if (pfe->fd == net_multiplexer_->NotifyReceiveFd()) { // New connection comming
if (pfe->mask & kReadable) {
read(net_multiplexer_->NotifyReceiveFd(), triger, 1);
ssize_t n = read(net_multiplexer_->NotifyReceiveFd(), triger, 1);
(void)(n);
{
NetItem ti = net_multiplexer_->NotifyQueuePop();
if (ti.notify_type() == kNotiClose) {
Expand All @@ -365,7 +367,8 @@ void* PubSubThread::ThreadMain() {
}
if (pfe->fd == msg_pfd_[0]) { // Publish message
if (pfe->mask & kReadable) {
read(msg_pfd_[0], triger, 1);
ssize_t n = read(msg_pfd_[0], triger, 1);
(void)(n);
std::string channel;
std::string msg;
int32_t receivers = 0;
Expand Down
65 changes: 55 additions & 10 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void BgsaveCmd::DoInitial() {
}

void BgsaveCmd::Do(std::shared_ptr<Slot> slot) {
g_pika_server->DoSameThingSpecificDB(TaskType::kBgSave, bgsave_dbs_);
g_pika_server->DoSameThingSpecificDB(bgsave_dbs_, {TaskType::kBgSave});
LogCommand();
res_.AppendContent("+Background saving started");
}
Expand Down Expand Up @@ -342,17 +342,62 @@ void CompactCmd::DoInitial() {

void CompactCmd::Do(std::shared_ptr<Slot> slot) {
if (strcasecmp(struct_type_.data(), "all") == 0) {
g_pika_server->DoSameThingSpecificDB(TaskType::kCompactAll, compact_dbs_);
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactAll});
} else if (strcasecmp(struct_type_.data(), "string") == 0) {
g_pika_server->DoSameThingSpecificDB(TaskType::kCompactStrings, compact_dbs_);
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactStrings});
} else if (strcasecmp(struct_type_.data(), "hash") == 0) {
g_pika_server->DoSameThingSpecificDB(TaskType::kCompactHashes, compact_dbs_);
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactHashes});
} else if (strcasecmp(struct_type_.data(), "set") == 0) {
g_pika_server->DoSameThingSpecificDB(TaskType::kCompactSets, compact_dbs_);
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactSets});
} else if (strcasecmp(struct_type_.data(), "zset") == 0) {
g_pika_server->DoSameThingSpecificDB(TaskType::kCompactZSets, compact_dbs_);
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactZSets});
} else if (strcasecmp(struct_type_.data(), "list") == 0) {
g_pika_server->DoSameThingSpecificDB(TaskType::kCompactList, compact_dbs_);
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactList});
} else {
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
}
LogCommand();
res_.SetRes(CmdRes::kOk);
}

void CompactRangeCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameCompactRange);
return;
}

if (g_pika_server->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The info keyspace operation is executing, Try again later");
return;
}

std::vector<std::string> dbs;
pstd::StringSplit(argv_[1], COMMA, dbs);
for (const auto& db : dbs) {
if (!g_pika_server->IsDBExist(db)) {
res_.SetRes(CmdRes::kInvalidDB, db);
return;
} else {
compact_dbs_.insert(db);
}
}
struct_type_ = argv_[2];
start_key_ = argv_[3];
end_key_ = argv_[4];
}

void CompactRangeCmd::Do(std::shared_ptr<Slot> slot) {
if (strcasecmp(struct_type_.data(), "string") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeStrings, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "hash") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeHashes, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "set") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeSets, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "zset") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeZSets, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "list") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeList, {start_key_, end_key_}});
} else {
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
Expand Down Expand Up @@ -677,7 +722,7 @@ void ClientCmd::Do(std::shared_ptr<Slot> slot) {
std::sort(clients.begin(), clients.end(), IdleCompare);
}
while (iter != clients.end()) {
snprintf(buf, sizeof(buf), "addr=%s fd=%d idle=%lld\n", iter->ip_port.c_str(), iter->fd,
snprintf(buf, sizeof(buf), "addr=%s fd=%d idle=%ld\n", iter->ip_port.c_str(), iter->fd,
iter->last_interaction == 0 ? 0 : now.tv_sec - iter->last_interaction); // NOLINT
reply.append(buf);
iter++;
Expand Down Expand Up @@ -1188,7 +1233,7 @@ void InfoCmd::InfoReplication(std::string& info) {

void InfoCmd::InfoKeyspace(std::string& info) {
if (off_) {
g_pika_server->DoSameThingSpecificDB(TaskType::kStopKeyScan, keyspace_scan_dbs_);
g_pika_server->DoSameThingSpecificDB(keyspace_scan_dbs_, {TaskType::kStopKeyScan});
info.append("OK\r\n");
return;
}
Expand Down Expand Up @@ -1247,7 +1292,7 @@ void InfoCmd::InfoKeyspace(std::string& info) {
info.append(tmp_stream.str());

if (rescan_) {
g_pika_server->DoSameThingSpecificDB(TaskType::kStartKeyScan, keyspace_scan_dbs_);
g_pika_server->DoSameThingSpecificDB(keyspace_scan_dbs_, {TaskType::kStartKeyScan});
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBgsave, std::move(bgsaveptr)));
std::unique_ptr<Cmd> compactptr = std::make_unique<CompactCmd>(kCmdNameCompact, -1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameCompact, std::move(compactptr)));
std::unique_ptr<Cmd> compactrangeptr = std::make_unique<CompactRangeCmd>(kCmdNameCompactRange, 5, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameCompactRange, std::move(compactrangeptr)));
std::unique_ptr<Cmd> purgelogsto =
std::make_unique<PurgelogstoCmd>(kCmdNamePurgelogsto, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePurgelogsto, std::move(purgelogsto)));
Expand Down
7 changes: 7 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ void DB::Compact(const storage::DataType& type) {
}
}

void DB::CompactRange(const storage::DataType& type, const std::string& start, const std::string& end) {
std::lock_guard rwl(slots_rw_);
for (const auto& item : slots_) {
item.second->CompactRange(type, start, end);
}
}

void DB::DoKeyScan(void* arg) {
std::unique_ptr <BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));
bg_task_arg->db->RunKeyScan();
Expand Down
92 changes: 56 additions & 36 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,44 +433,57 @@ bool PikaServer::IsDBBinlogIoError(const std::string& db_name) {
return db ? db->IsBinlogIoError() : true;
}

// If no collection of specified dbs is given, we execute task in all dbs
Status PikaServer::DoSameThingSpecificDB(const TaskType& type, const std::set<std::string>& dbs) {
Status PikaServer::DoSameThingSpecificDB(const std::set<std::string>& dbs, const TaskArg& arg) {
std::shared_lock rwl(dbs_rw_);
for (const auto& db_item : dbs_) {
if (!dbs.empty() && dbs.find(db_item.first) == dbs.end()) {
if (dbs.find(db_item.first) == dbs.end()) {
continue;
} else {
switch (type) {
case TaskType::kCompactAll:
db_item.second->Compact(storage::DataType::kAll);
break;
case TaskType::kCompactStrings:
db_item.second->Compact(storage::DataType::kStrings);
break;
case TaskType::kCompactHashes:
db_item.second->Compact(storage::DataType::kHashes);
break;
case TaskType::kCompactSets:
db_item.second->Compact(storage::DataType::kSets);
break;
case TaskType::kCompactZSets:
db_item.second->Compact(storage::DataType::kZSets);
break;
case TaskType::kCompactList:
db_item.second->Compact(storage::DataType::kLists);
break;
case TaskType::kStartKeyScan:
db_item.second->KeyScan();
break;
case TaskType::kStopKeyScan:
db_item.second->StopKeyScan();
break;
case TaskType::kBgSave:
db_item.second->BgSaveDB();
break;
default:
break;
}
}
switch (arg.type) {
case TaskType::kCompactAll:
db_item.second->Compact(storage::DataType::kAll);
break;
case TaskType::kCompactStrings:
db_item.second->Compact(storage::DataType::kStrings);
break;
case TaskType::kCompactHashes:
db_item.second->Compact(storage::DataType::kHashes);
break;
case TaskType::kCompactSets:
db_item.second->Compact(storage::DataType::kSets);
break;
case TaskType::kCompactZSets:
db_item.second->Compact(storage::DataType::kZSets);
break;
case TaskType::kCompactList:
db_item.second->Compact(storage::DataType::kLists);
break;
case TaskType::kStartKeyScan:
db_item.second->KeyScan();
break;
case TaskType::kStopKeyScan:
db_item.second->StopKeyScan();
break;
case TaskType::kBgSave:
db_item.second->BgSaveDB();
break;
case TaskType::kCompactRangeStrings:
db_item.second->CompactRange(storage::DataType::kStrings, arg.argv[0], arg.argv[1]);
break;
case TaskType::kCompactRangeHashes:
db_item.second->CompactRange(storage::DataType::kHashes, arg.argv[0], arg.argv[1]);
break;
case TaskType::kCompactRangeSets:
db_item.second->CompactRange(storage::DataType::kSets, arg.argv[0], arg.argv[1]);
break;
case TaskType::kCompactRangeZSets:
db_item.second->CompactRange(storage::DataType::kZSets, arg.argv[0], arg.argv[1]);
break;
case TaskType::kCompactRangeList:
db_item.second->CompactRange(storage::DataType::kLists, arg.argv[0], arg.argv[1]);
break;
default:
break;
}
}
return Status::OK();
Expand Down Expand Up @@ -1333,7 +1346,14 @@ void PikaServer::AutoCompactRange() {
if (last_check_compact_time_.tv_sec == 0 || now.tv_sec - last_check_compact_time_.tv_sec >= interval * 3600) {
gettimeofday(&last_check_compact_time_, nullptr);
if ((static_cast<double>(free_size) / static_cast<double>(total_size)) * 100 >= usage) {
Status s = DoSameThingSpecificDB(TaskType::kCompactAll);
std::set<std::string> dbs;
{
std::shared_lock l(dbs_rw_);
for (const auto& db_item : dbs_) {
dbs.insert(db_item.first);
}
}
Status s = DoSameThingSpecificDB(dbs, {TaskType::kCompactAll});
if (s.ok()) {
LOG(INFO) << "[Interval]schedule compactRange, freesize: " << free_size / 1048576
<< "MB, disksize: " << total_size / 1048576 << "MB";
Expand Down
7 changes: 7 additions & 0 deletions src/pika_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ void Slot::Compact(const storage::DataType& type) {
db_->Compact(type);
}

void Slot::CompactRange(const storage::DataType& type, const std::string& start, const std::string& end) {
if (!opened_) {
return;
}
db_->CompactRange(type, start, end);
}

void Slot::DbRWLockWriter() { db_rwlock_.lock(); }

void Slot::DbRWLockReader() { db_rwlock_.lock_shared(); }
Expand Down
2 changes: 1 addition & 1 deletion src/pika_slot_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,7 @@ void SlotsReloadCmd::Do(std::shared_ptr<Slot> slot) {
g_pika_server->Bgslotsreload(slot);
const PikaServer::BGSlotsReload &info = g_pika_server->bgslots_reload();
char buf[256];
snprintf(buf, sizeof(buf), "+%s : %lld", info.s_start_time.c_str(), g_pika_server->GetSlotsreloadingCursor());
snprintf(buf, sizeof(buf), "+%s : %ld", info.s_start_time.c_str(), g_pika_server->GetSlotsreloadingCursor());
res_.AppendContent(buf);
return;
}
Expand Down
Loading