Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ mock_instant = { version = "0.6" }
serial_test = { version = "3.2" }

[dependencies]
anyhow.workspace = true
cfg-if.workspace = true
clap.workspace = true
indoc.workspace = true
Expand Down Expand Up @@ -680,6 +681,7 @@ sources-logs-mezmo = [
"sources-fluent",
"sources-http_server",
"sources-http_client",
"sources-mezmo_kubernetes_metrics",
"sources-mezmo_pipeline_state_variable_change",
"sources-mezmo_user_logs",
"sources-kafka",
Expand Down Expand Up @@ -734,6 +736,7 @@ sources-host_metrics = ["heim/cpu", "heim/host", "heim/memory", "heim/net"]
sources-http_client = ["sources-utils-http-client"]
sources-http_server = ["sources-utils-http", "sources-utils-http-headers", "sources-utils-http-query"]
sources-internal_logs = []
sources-mezmo_kubernetes_metrics = ["kubernetes"]
sources-mezmo_pipeline_state_variable_change = []
sources-mezmo_user_logs = ["sources-internal_logs"]
sources-internal_metrics = []
Expand Down
9 changes: 8 additions & 1 deletion lib/vector-core/src/usage_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,12 +677,19 @@ async fn get_flusher(
if let Some(endpoint_url) = endpoint_url {
// Http endpoint used by Pulse
let auth_token = env::var("MEZMO_LOCAL_DEPLOY_AUTH_TOKEN").ok();
let headers = if let Some(token) = auth_token {
let mut headers = if let Some(token) = auth_token {
HashMap::from([("Authorization".into(), format!("Token {token}"))])
} else {
return Err(MetricsPublishingError::AuthNotSetError);
};

if let Some(extra) = env::var("MEZMO_REMOTE_TASK_EXTRA_HEADERS")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make headers already a HeaderMap here? I understand this is a more foundational change. String is case-sensitive, HeaderMap's HeaderName is case-insensitive (as per the HTTP spec).

Regardless of that:

I'd like to suggest the following change:

        let mut headers = env::var("MEZMO_REMOTE_TASK_EXTRA_HEADERS")
            .ok()
            .and_then(|v| serde_json::from_str::<HashMap<String, String>>(&v).ok())
            .unwrap_or_default();

        // Option 1: if `MEZMO_REMOTE_TASK_EXTRA_HEADERS` provides `Authorization:`, DON'T overwrite
        {
            // note that if we switched to hashbrown, we could defer the `to_owned()` until the entry actually needs to be created.
            headers
                .entry("Authorization".to_owned())
                .or_insert_with(|| format!("Token {token}"));
        }

        // Option 2: if we want our `Authorization:` to ALWAYS win:
        {
            headers.insert("Authorization".into(), format!("Token {token}"));
        }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, if we don't expect the value of MEZMO_REMOTE_TASK_EXTRA_HEADERS to change during the runtime of the application, we might want to use a LazyLock to create the hashmap once, and then clone it each time this call happens. That way we skip over the deserialization step.

.ok()
.and_then(|v| serde_json::from_str::<HashMap<String, String>>(&v).ok())
{
headers.extend(extra);
}

return Ok(Arc::new(HttpFlusher::new(
&pod_name,
endpoint_url,
Expand Down
8 changes: 8 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ fn start_remote_task_execution(
runtime: &Runtime,
_config: &ApplicationConfig,
) -> Result<(), ExitCode> {
use serde_json;
use std::env;

#[cfg(feature = "api")]
Expand All @@ -314,6 +315,11 @@ fn start_remote_task_execution(
if let Some(auth_token) = auth_token {
let get_endpoint_url = env::var("MEZMO_TASKS_FETCH_ENDPOINT_URL").ok();
let post_endpoint_url = env::var("MEZMO_TASKS_POST_ENDPOINT_URL").ok();
let extra_headers: std::collections::HashMap<String, String> =
env::var("MEZMO_REMOTE_TASK_EXTRA_HEADERS")
.ok()
.and_then(|v| serde_json::from_str(&v).ok())
.unwrap_or_default();
match (get_endpoint_url, post_endpoint_url) {
(Some(get_endpoint_url), Some(post_endpoint_url)) => {
if !api_config.enabled {
Expand All @@ -328,6 +334,7 @@ fn start_remote_task_execution(
auth_token,
get_endpoint_url,
post_endpoint_url,
extra_headers,
Some(1),
)
.await;
Expand All @@ -340,6 +347,7 @@ fn start_remote_task_execution(
auth_token,
get_endpoint_url,
post_endpoint_url,
extra_headers,
)
.await;
});
Expand Down
56 changes: 49 additions & 7 deletions src/mezmo/remote_task_execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub(crate) async fn start_polling_for_tasks(
auth_token: String,
get_endpoint_url: String,
post_endpoint_url: String,
extra_headers: HashMap<String, String>,
#[cfg(test)] max_iterations: Option<usize>, // for testing only, set to 0 for infinite loop
) {
let task_initial_pool_delay = Duration::from_secs(mezmo_env_config!(
Expand Down Expand Up @@ -87,6 +88,7 @@ pub(crate) async fn start_polling_for_tasks(
&auth_token,
&get_endpoint_url,
&post_endpoint_url,
&extra_headers,
);

if let Err(_) = tokio::time::timeout(task_execution_timeout, task_fut).await {
Expand Down Expand Up @@ -117,8 +119,9 @@ async fn run_task_step(
auth_token: &str,
get_endpoint_url: &str,
post_endpoint_url: &str,
extra_headers: &HashMap<String, String>,
) {
let tasks = fetch_tasks(client, auth_token, get_endpoint_url)
let tasks = fetch_tasks(client, auth_token, get_endpoint_url, extra_headers)
.await
.unwrap_or_else(|e| {
warn!("Remote task fetch failed: {e}");
Expand All @@ -141,7 +144,15 @@ async fn run_task_step(
}

let results = execute_task(&t, config).await;
if let Err(e) = post_task_results(client, auth_token, post_endpoint_url, &t, &results).await
if let Err(e) = post_task_results(
client,
auth_token,
post_endpoint_url,
&t,
&results,
extra_headers,
)
.await
{
warn!(
"There was an error when posting task results for {}: {}",
Expand Down Expand Up @@ -209,7 +220,11 @@ impl FromStr for TaskType {
}
}

fn gen_headers(auth_token: &str, method: Method) -> header::HeaderMap {
fn gen_headers(
auth_token: &str,
method: Method,
extra_headers: &HashMap<String, String>,
) -> header::HeaderMap {
let mut headers = header::HeaderMap::new();
headers.insert(header::USER_AGENT, HeaderValue::from_static("Mezmo Pulse"));
match method {
Expand All @@ -228,6 +243,14 @@ fn gen_headers(auth_token: &str, method: Method) -> header::HeaderMap {
header::AUTHORIZATION,
HeaderValue::from_str(&format!("Token {auth_token}")).unwrap(),
);
for (k, v) in extra_headers {
if let (Ok(name), Ok(value)) = (
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to log something here? This quietly eats any headers that cannot be represented as HeaderName / HeaderValue.

header::HeaderName::from_bytes(k.as_bytes()),
HeaderValue::from_str(v),
) {
headers.insert(name, value);
}
}

headers
}
Expand All @@ -236,10 +259,11 @@ async fn fetch_tasks(
client: &Client,
auth_token: &str,
endpoint_url: &str,
extra_headers: &HashMap<String, String>,
) -> Result<Vec<Task>, Err> {
let resp = client
.get(endpoint_url)
.headers(gen_headers(auth_token, Method::GET))
.headers(gen_headers(auth_token, Method::GET, extra_headers))
.send()
.await
.map_err(|e| format!("Connection error: {e}"))?;
Expand All @@ -262,13 +286,14 @@ async fn post_task_results(
endpoint_url: &str,
task: &Task,
results: &Result<TaskResult, Err>,
extra_headers: &HashMap<String, String>,
) -> Result<(), Err> {
let endpoint_url = endpoint_url.replace(":task_id", &task.task_id);

let resp = client
.post(&endpoint_url)
.json(&results.to_json())
.headers(gen_headers(auth_token, Method::POST))
.headers(gen_headers(auth_token, Method::POST, extra_headers))
.send()
.await
.map_err(|e| format!("Connection error: {e}"))?;
Expand Down Expand Up @@ -448,7 +473,15 @@ mod tests {
let post_url = format!("http://{}{}", server.addr(), post_path);
let client = Client::new();

run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await;
run_task_step(
&Default::default(),
&client,
"token",
&get_url,
&post_url,
&HashMap::new(),
)
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -498,7 +531,15 @@ mod tests {
let post_url = format!("http://{}{}", server.addr(), post_path);
let client = Client::new();

run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await;
run_task_step(
&Default::default(),
&client,
"token",
&get_url,
&post_url,
&HashMap::new(),
)
.await;
}

#[assay(
Expand Down Expand Up @@ -535,6 +576,7 @@ mod tests {
String::from("token"),
get_url,
unused_post_url,
HashMap::new(),
Some(1),
)
.await;
Expand Down
78 changes: 78 additions & 0 deletions src/sources/mezmo_kubernetes_metrics/kube_stats/cluster_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use serde::{Deserialize, Serialize};

/// Cluster-level statistics record of the `mezmo_kubernetes_metrics` source.
///
/// The type derives `Serialize` and `Deserialize`. Every `Option` field is
/// annotated with `skip_serializing_if = "Option::is_none"`, so it is left
/// out of the serialized output for as long as it holds `None`.
///
/// `ClusterStats::new()` (and therefore `Default`) starts every plain counter
/// at `0`, every optional field at `None`, and fills `resource` / `type`
/// with `"cluster"` / `"metric"`.
#[derive(Serialize, Deserialize)]
pub struct ClusterStats {
/// Record kind; `new()` sets this to `"cluster"`.
pub resource: String,
/// Record type; `new()` sets this to `"metric"`. Written as a raw
/// identifier because `type` is a reserved keyword in Rust.
pub r#type: String,
// Container counters. Presumably one counter per container state plus a
// total -- confirm against the code that populates them.
pub containers_init: u32,
pub containers_ready: u32,
pub containers_running: u32,
pub containers_terminated: u32,
pub containers_total: u32,
pub containers_waiting: u32,
// Optional CPU figures; omitted from serialized output while `None`.
// NOTE(review): units are not visible in this file -- confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub cpu_allocatable: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cpu_capacity: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cpu_usage: Option<u32>,
// Optional memory figures; omitted from serialized output while `None`.
// NOTE(review): units are not visible in this file -- confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_allocatable: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_capacity: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_usage: Option<u64>,
// Node counters.
pub nodes_notready: u32,
pub nodes_ready: u32,
pub nodes_total: u32,
// NOTE(review): this is `u64` while the other node counters are `u32` --
// confirm whether the wider type is intentional.
pub nodes_unschedulable: u64,
// Optional pod allocatable/capacity figures; omitted from serialized
// output while `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub pods_allocatable: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pods_capacity: Option<u64>,
// Pod counters. Presumably one counter per pod phase plus a total --
// confirm against the code that populates them.
pub pods_failed: u32,
pub pods_pending: u32,
pub pods_running: u32,
pub pods_succeeded: u32,
pub pods_unknown: u32,
pub pods_total: u32,
}

impl Default for ClusterStats {
fn default() -> Self {
Self::new()
}
}

impl ClusterStats {
pub fn new() -> ClusterStats {
ClusterStats {
containers_init: 0,
containers_ready: 0,
containers_running: 0,
containers_terminated: 0,
containers_total: 0,
containers_waiting: 0,
cpu_allocatable: None,
cpu_capacity: None,
cpu_usage: None,
memory_allocatable: None,
memory_capacity: None,
memory_usage: None,
nodes_notready: 0,
nodes_ready: 0,
nodes_total: 0,
nodes_unschedulable: 0,
pods_allocatable: None,
pods_capacity: None,
pods_failed: 0,
pods_pending: 0,
pods_running: 0,
pods_succeeded: 0,
pods_unknown: 0,
pods_total: 0,
resource: "cluster".to_string(),
r#type: "metric".to_string(),
}
}
}
Loading