Skip to content

Commit 4f05a00

Browse files
committed
wrr_locality: rewrite as standalone LB with extracted OrcaWeightManager and locality_lb_config support
Signed-off-by: jukie <isaac.wilson514@gmail.com>
1 parent e7b6400 commit 4f05a00

15 files changed

Lines changed: 1337 additions & 85 deletions

File tree

api/envoy/extensions/load_balancing_policies/wrr_locality/v3/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ licenses(["notice"]) # Apache 2
77
api_proto_package(
88
deps = [
99
"//envoy/config/cluster/v3:pkg",
10+
"//envoy/extensions/load_balancing_policies/common/v3:pkg",
1011
"@xds//udpa/annotations:pkg",
1112
],
1213
)

api/envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ syntax = "proto3";
33
package envoy.extensions.load_balancing_policies.wrr_locality.v3;
44

55
import "envoy/config/cluster/v3/cluster.proto";
6+
import "envoy/extensions/load_balancing_policies/common/v3/common.proto";
67

78
import "udpa/annotations/status.proto";
89
import "validate/validate.proto";
@@ -22,4 +23,10 @@ message WrrLocality {
2223
// The child LB policy to create for endpoint-picking within the chosen locality.
2324
config.cluster.v3.LoadBalancingPolicy endpoint_picking_policy = 1
2425
[(validate.rules).message = {required: true}];
26+
27+
// Optional locality LB configuration. When set with ``zone_aware_lb_config``,
28+
// enables zone-aware routing with full feature access (``force_local_zone``,
29+
// ``locality_basis``, etc.). When set with ``locality_weighted_lb_config`` or
30+
// when not set, enables locality-weighted load balancing (current default behavior).
31+
common.v3.LocalityLbConfig locality_lb_config = 2;
2532
}

source/extensions/load_balancing_policies/common/BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ envoy_cc_library(
4444
],
4545
)
4646

47+
envoy_cc_library(
48+
name = "orca_weight_manager_lib",
49+
srcs = ["orca_weight_manager.cc"],
50+
hdrs = ["orca_weight_manager.h"],
51+
deps = [
52+
"//envoy/upstream:upstream_interface",
53+
"//source/common/common:callback_impl_lib",
54+
"//source/common/common:minimal_logger_lib",
55+
"//source/common/orca:orca_load_metrics_lib",
56+
"@xds//xds/data/orca/v3:pkg_cc_proto",
57+
],
58+
)
59+
4760
envoy_cc_library(
4861
name = "load_balancer_lib",
4962
srcs = ["load_balancer_impl.cc"],
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
#include "source/extensions/load_balancing_policies/common/orca_weight_manager.h"
2+
3+
#include <algorithm>
4+
#include <cstdint>
5+
#include <limits>
6+
#include <memory>
7+
#include <optional>
8+
#include <string>
9+
10+
#include "envoy/common/time.h"
11+
#include "envoy/upstream/upstream.h"
12+
13+
#include "source/common/orca/orca_load_metrics.h"
14+
15+
#include "absl/status/status.h"
16+
#include "xds/data/orca/v3/orca_load_report.pb.h"
17+
18+
namespace Envoy {
19+
namespace Extensions {
20+
namespace LoadBalancingPolicies {
21+
namespace Common {
22+
23+
namespace {
24+
std::string getHostAddress(const Upstream::Host* host) {
25+
if (host == nullptr || host->address() == nullptr) {
26+
return "unknown";
27+
}
28+
return host->address()->asString();
29+
}
30+
} // namespace
31+
32+
// OrcaLoadReportHandler
33+
34+
OrcaLoadReportHandler::OrcaLoadReportHandler(const OrcaWeightManagerConfig& config,
35+
TimeSource& time_source)
36+
: metric_names_for_computing_utilization_(config.metric_names_for_computing_utilization),
37+
error_utilization_penalty_(config.error_utilization_penalty), time_source_(time_source) {}
38+
39+
double OrcaLoadReportHandler::getUtilizationFromOrcaReport(
40+
const OrcaLoadReportProto& orca_load_report,
41+
const std::vector<std::string>& metric_names_for_computing_utilization) {
42+
// If application_utilization is valid, use it as the utilization metric.
43+
double utilization = orca_load_report.application_utilization();
44+
if (utilization > 0) {
45+
return utilization;
46+
}
47+
// Otherwise, find the most constrained utilization metric.
48+
utilization =
49+
Envoy::Orca::getMaxUtilization(metric_names_for_computing_utilization, orca_load_report);
50+
if (utilization > 0) {
51+
return utilization;
52+
}
53+
// If utilization is <= 0, use cpu_utilization.
54+
return orca_load_report.cpu_utilization();
55+
}
56+
57+
absl::StatusOr<uint32_t> OrcaLoadReportHandler::calculateWeightFromOrcaReport(
58+
const OrcaLoadReportProto& orca_load_report,
59+
const std::vector<std::string>& metric_names_for_computing_utilization,
60+
double error_utilization_penalty) {
61+
double qps = orca_load_report.rps_fractional();
62+
if (qps <= 0) {
63+
return absl::InvalidArgumentError("QPS must be positive");
64+
}
65+
66+
double utilization =
67+
getUtilizationFromOrcaReport(orca_load_report, metric_names_for_computing_utilization);
68+
// If there are errors, then increase utilization to lower the weight.
69+
utilization += error_utilization_penalty * orca_load_report.eps() / qps;
70+
71+
if (utilization <= 0) {
72+
return absl::InvalidArgumentError("Utilization must be positive");
73+
}
74+
75+
// Calculate the weight.
76+
double weight = qps / utilization;
77+
78+
// Limit the weight to uint32_t max.
79+
if (weight > std::numeric_limits<uint32_t>::max()) {
80+
weight = std::numeric_limits<uint32_t>::max();
81+
}
82+
return weight;
83+
}
84+
85+
absl::Status OrcaLoadReportHandler::updateClientSideDataFromOrcaLoadReport(
86+
const OrcaLoadReportProto& orca_load_report, OrcaHostLbPolicyData& client_side_data) {
87+
const absl::StatusOr<uint32_t> weight = calculateWeightFromOrcaReport(
88+
orca_load_report, metric_names_for_computing_utilization_, error_utilization_penalty_);
89+
if (!weight.ok()) {
90+
return weight.status();
91+
}
92+
93+
client_side_data.updateWeightNow(weight.value(), time_source_.monotonicTime());
94+
return absl::OkStatus();
95+
}
96+
97+
// OrcaHostLbPolicyData
98+
99+
absl::Status OrcaHostLbPolicyData::onOrcaLoadReport(const Upstream::OrcaLoadReport& report,
100+
const StreamInfo::StreamInfo&) {
101+
ASSERT(report_handler_ != nullptr);
102+
return report_handler_->updateClientSideDataFromOrcaLoadReport(report, *this);
103+
}
104+
105+
// OrcaWeightManager
106+
107+
OrcaWeightManager::OrcaWeightManager(const OrcaWeightManagerConfig& config,
108+
const Upstream::PrioritySet& priority_set,
109+
TimeSource& time_source, Event::Dispatcher& dispatcher,
110+
std::function<void()> on_weights_updated)
111+
: priority_set_(priority_set), time_source_(time_source),
112+
blackout_period_(config.blackout_period),
113+
weight_expiration_period_(config.weight_expiration_period),
114+
weight_update_period_(config.weight_update_period),
115+
on_weights_updated_(std::move(on_weights_updated)) {
116+
report_handler_ = std::make_shared<OrcaLoadReportHandler>(config, time_source_);
117+
weight_calculation_timer_ = dispatcher.createTimer([this]() -> void {
118+
updateWeightsOnMainThread();
119+
weight_calculation_timer_->enableTimer(weight_update_period_);
120+
});
121+
}
122+
123+
absl::Status OrcaWeightManager::initialize() {
124+
// Ensure that all hosts have LB policy data.
125+
for (const Upstream::HostSetPtr& host_set : priority_set_.hostSetsPerPriority()) {
126+
addLbPolicyDataToHosts(host_set->hosts());
127+
}
128+
129+
// Setup a callback to receive priority set updates.
130+
priority_update_cb_ = priority_set_.addPriorityUpdateCb(
131+
[this](uint32_t, const Upstream::HostVector& hosts_added, const Upstream::HostVector&) {
132+
addLbPolicyDataToHosts(hosts_added);
133+
updateWeightsOnMainThread();
134+
});
135+
136+
weight_calculation_timer_->enableTimer(weight_update_period_);
137+
138+
return absl::OkStatus();
139+
}
140+
141+
void OrcaWeightManager::updateWeightsOnMainThread() {
142+
ENVOY_LOG(trace, "updateWeightsOnMainThread");
143+
bool updated = false;
144+
for (const Upstream::HostSetPtr& host_set : priority_set_.hostSetsPerPriority()) {
145+
updated = updateWeightsOnHosts(host_set->hosts()) || updated;
146+
}
147+
if (updated) {
148+
on_weights_updated_();
149+
}
150+
}
151+
152+
bool OrcaWeightManager::updateWeightsOnHosts(const Upstream::HostVector& hosts) {
153+
std::vector<uint32_t> weights;
154+
Upstream::HostVector hosts_with_default_weight;
155+
bool weights_updated = false;
156+
const MonotonicTime now = time_source_.monotonicTime();
157+
const MonotonicTime max_non_empty_since = now - blackout_period_;
158+
const MonotonicTime min_last_update_time = now - weight_expiration_period_;
159+
weights.reserve(hosts.size());
160+
hosts_with_default_weight.reserve(hosts.size());
161+
ENVOY_LOG(trace, "updateWeights hosts.size() = {}, time since epoch = {}", hosts.size(),
162+
now.time_since_epoch().count());
163+
for (const auto& host_ptr : hosts) {
164+
absl::optional<uint32_t> client_side_weight =
165+
getWeightIfValidFromHost(*host_ptr, max_non_empty_since, min_last_update_time);
166+
if (client_side_weight.has_value()) {
167+
const uint32_t new_weight = client_side_weight.value();
168+
weights.push_back(new_weight);
169+
if (new_weight != host_ptr->weight()) {
170+
host_ptr->weight(new_weight);
171+
ENVOY_LOG(trace, "updateWeights hostWeight {} = {}", getHostAddress(host_ptr.get()),
172+
host_ptr->weight());
173+
weights_updated = true;
174+
}
175+
} else {
176+
hosts_with_default_weight.push_back(host_ptr);
177+
}
178+
}
179+
if (!hosts_with_default_weight.empty()) {
180+
uint32_t default_weight = 1;
181+
if (!weights.empty()) {
182+
const auto median_it = weights.begin() + weights.size() / 2;
183+
std::nth_element(weights.begin(), median_it, weights.end());
184+
if (weights.size() % 2 == 1) {
185+
default_weight = *median_it;
186+
} else {
187+
const auto lower_median_it = std::max_element(weights.begin(), median_it);
188+
default_weight = static_cast<uint32_t>(
189+
(static_cast<uint64_t>(*lower_median_it) + static_cast<uint64_t>(*median_it)) / 2);
190+
}
191+
}
192+
for (const auto& host_ptr : hosts_with_default_weight) {
193+
if (default_weight != host_ptr->weight()) {
194+
host_ptr->weight(default_weight);
195+
ENVOY_LOG(trace, "updateWeights default hostWeight {} = {}", getHostAddress(host_ptr.get()),
196+
host_ptr->weight());
197+
weights_updated = true;
198+
}
199+
}
200+
}
201+
return weights_updated;
202+
}
203+
204+
void OrcaWeightManager::addLbPolicyDataToHosts(const Upstream::HostVector& hosts) {
205+
for (const auto& host_ptr : hosts) {
206+
if (!host_ptr->lbPolicyData().has_value()) {
207+
ENVOY_LOG(trace, "Adding LB policy data to Host {}", getHostAddress(host_ptr.get()));
208+
host_ptr->setLbPolicyData(std::make_unique<OrcaHostLbPolicyData>(report_handler_));
209+
}
210+
}
211+
}
212+
213+
absl::optional<uint32_t>
214+
OrcaWeightManager::getWeightIfValidFromHost(const Upstream::Host& host,
215+
MonotonicTime max_non_empty_since,
216+
MonotonicTime min_last_update_time) {
217+
auto client_side_data = host.typedLbPolicyData<OrcaHostLbPolicyData>();
218+
if (!client_side_data.has_value()) {
219+
ENVOY_LOG_MISC(trace, "Host does not have OrcaHostLbPolicyData {}", getHostAddress(&host));
220+
return std::nullopt;
221+
}
222+
return client_side_data->getWeightIfValid(max_non_empty_since, min_last_update_time);
223+
}
224+
225+
} // namespace Common
226+
} // namespace LoadBalancingPolicies
227+
} // namespace Extensions
228+
} // namespace Envoy

0 commit comments

Comments
 (0)