-
Notifications
You must be signed in to change notification settings - Fork 2
feat(k8s): add kubernetes metrics source #66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -677,12 +677,19 @@ async fn get_flusher( | |
| if let Some(endpoint_url) = endpoint_url { | ||
| // Http endpoint used by Pulse | ||
| let auth_token = env::var("MEZMO_LOCAL_DEPLOY_AUTH_TOKEN").ok(); | ||
| let headers = if let Some(token) = auth_token { | ||
| let mut headers = if let Some(token) = auth_token { | ||
| HashMap::from([("Authorization".into(), format!("Token {token}"))]) | ||
| } else { | ||
| return Err(MetricsPublishingError::AuthNotSetError); | ||
| }; | ||
|
|
||
| if let Some(extra) = env::var("MEZMO_REMOTE_TASK_EXTRA_HEADERS") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, if we don't expect the value of |
||
| .ok() | ||
| .and_then(|v| serde_json::from_str::<HashMap<String, String>>(&v).ok()) | ||
| { | ||
| headers.extend(extra); | ||
| } | ||
|
|
||
| return Ok(Arc::new(HttpFlusher::new( | ||
| &pod_name, | ||
| endpoint_url, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,7 @@ pub(crate) async fn start_polling_for_tasks( | |
| auth_token: String, | ||
| get_endpoint_url: String, | ||
| post_endpoint_url: String, | ||
| extra_headers: HashMap<String, String>, | ||
| #[cfg(test)] max_iterations: Option<usize>, // for testing only, set to 0 for infinite loop | ||
| ) { | ||
| let task_initial_pool_delay = Duration::from_secs(mezmo_env_config!( | ||
|
|
@@ -87,6 +88,7 @@ pub(crate) async fn start_polling_for_tasks( | |
| &auth_token, | ||
| &get_endpoint_url, | ||
| &post_endpoint_url, | ||
| &extra_headers, | ||
| ); | ||
|
|
||
| if let Err(_) = tokio::time::timeout(task_execution_timeout, task_fut).await { | ||
|
|
@@ -117,8 +119,9 @@ async fn run_task_step( | |
| auth_token: &str, | ||
| get_endpoint_url: &str, | ||
| post_endpoint_url: &str, | ||
| extra_headers: &HashMap<String, String>, | ||
| ) { | ||
| let tasks = fetch_tasks(client, auth_token, get_endpoint_url) | ||
| let tasks = fetch_tasks(client, auth_token, get_endpoint_url, extra_headers) | ||
| .await | ||
| .unwrap_or_else(|e| { | ||
| warn!("Remote task fetch failed: {e}"); | ||
|
|
@@ -141,7 +144,15 @@ async fn run_task_step( | |
| } | ||
|
|
||
| let results = execute_task(&t, config).await; | ||
| if let Err(e) = post_task_results(client, auth_token, post_endpoint_url, &t, &results).await | ||
| if let Err(e) = post_task_results( | ||
| client, | ||
| auth_token, | ||
| post_endpoint_url, | ||
| &t, | ||
| &results, | ||
| extra_headers, | ||
| ) | ||
| .await | ||
| { | ||
| warn!( | ||
| "There was an error when posting task results for {}: {}", | ||
|
|
@@ -209,7 +220,11 @@ impl FromStr for TaskType { | |
| } | ||
| } | ||
|
|
||
| fn gen_headers(auth_token: &str, method: Method) -> header::HeaderMap { | ||
| fn gen_headers( | ||
| auth_token: &str, | ||
| method: Method, | ||
| extra_headers: &HashMap<String, String>, | ||
| ) -> header::HeaderMap { | ||
| let mut headers = header::HeaderMap::new(); | ||
| headers.insert(header::USER_AGENT, HeaderValue::from_static("Mezmo Pulse")); | ||
| match method { | ||
|
|
@@ -228,6 +243,14 @@ fn gen_headers(auth_token: &str, method: Method) -> header::HeaderMap { | |
| header::AUTHORIZATION, | ||
| HeaderValue::from_str(&format!("Token {auth_token}")).unwrap(), | ||
| ); | ||
| for (k, v) in extra_headers { | ||
| if let (Ok(name), Ok(value)) = ( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want log something here? This quietly eats any headers that cannot be represented as |
||
| header::HeaderName::from_bytes(k.as_bytes()), | ||
| HeaderValue::from_str(v), | ||
| ) { | ||
| headers.insert(name, value); | ||
| } | ||
| } | ||
|
|
||
| headers | ||
| } | ||
|
|
@@ -236,10 +259,11 @@ async fn fetch_tasks( | |
| client: &Client, | ||
| auth_token: &str, | ||
| endpoint_url: &str, | ||
| extra_headers: &HashMap<String, String>, | ||
| ) -> Result<Vec<Task>, Err> { | ||
| let resp = client | ||
| .get(endpoint_url) | ||
| .headers(gen_headers(auth_token, Method::GET)) | ||
| .headers(gen_headers(auth_token, Method::GET, extra_headers)) | ||
| .send() | ||
| .await | ||
| .map_err(|e| format!("Connection error: {e}"))?; | ||
|
|
@@ -262,13 +286,14 @@ async fn post_task_results( | |
| endpoint_url: &str, | ||
| task: &Task, | ||
| results: &Result<TaskResult, Err>, | ||
| extra_headers: &HashMap<String, String>, | ||
| ) -> Result<(), Err> { | ||
| let endpoint_url = endpoint_url.replace(":task_id", &task.task_id); | ||
|
|
||
| let resp = client | ||
| .post(&endpoint_url) | ||
| .json(&results.to_json()) | ||
| .headers(gen_headers(auth_token, Method::POST)) | ||
| .headers(gen_headers(auth_token, Method::POST, extra_headers)) | ||
| .send() | ||
| .await | ||
| .map_err(|e| format!("Connection error: {e}"))?; | ||
|
|
@@ -448,7 +473,15 @@ mod tests { | |
| let post_url = format!("http://{}{}", server.addr(), post_path); | ||
| let client = Client::new(); | ||
|
|
||
| run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await; | ||
| run_task_step( | ||
| &Default::default(), | ||
| &client, | ||
| "token", | ||
| &get_url, | ||
| &post_url, | ||
| &HashMap::new(), | ||
| ) | ||
| .await; | ||
| } | ||
|
|
||
| #[tokio::test] | ||
|
|
@@ -498,7 +531,15 @@ mod tests { | |
| let post_url = format!("http://{}{}", server.addr(), post_path); | ||
| let client = Client::new(); | ||
|
|
||
| run_task_step(&Default::default(), &client, "token", &get_url, &post_url).await; | ||
| run_task_step( | ||
| &Default::default(), | ||
| &client, | ||
| "token", | ||
| &get_url, | ||
| &post_url, | ||
| &HashMap::new(), | ||
| ) | ||
| .await; | ||
| } | ||
|
|
||
| #[assay( | ||
|
|
@@ -535,6 +576,7 @@ mod tests { | |
| String::from("token"), | ||
| get_url, | ||
| unused_post_url, | ||
| HashMap::new(), | ||
| Some(1), | ||
| ) | ||
| .await; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| use serde::{Deserialize, Serialize}; | ||
|
|
||
| #[derive(Serialize, Deserialize)] | ||
| pub struct ClusterStats { | ||
| pub resource: String, | ||
| pub r#type: String, | ||
| pub containers_init: u32, | ||
| pub containers_ready: u32, | ||
| pub containers_running: u32, | ||
| pub containers_terminated: u32, | ||
| pub containers_total: u32, | ||
| pub containers_waiting: u32, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub cpu_allocatable: Option<u32>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub cpu_capacity: Option<u32>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub cpu_usage: Option<u32>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub memory_allocatable: Option<u64>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub memory_capacity: Option<u64>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub memory_usage: Option<u64>, | ||
| pub nodes_notready: u32, | ||
| pub nodes_ready: u32, | ||
| pub nodes_total: u32, | ||
| pub nodes_unschedulable: u64, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub pods_allocatable: Option<u64>, | ||
| #[serde(skip_serializing_if = "Option::is_none")] | ||
| pub pods_capacity: Option<u64>, | ||
| pub pods_failed: u32, | ||
| pub pods_pending: u32, | ||
| pub pods_running: u32, | ||
| pub pods_succeeded: u32, | ||
| pub pods_unknown: u32, | ||
| pub pods_total: u32, | ||
| } | ||
|
|
||
| impl Default for ClusterStats { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl ClusterStats { | ||
| pub fn new() -> ClusterStats { | ||
| ClusterStats { | ||
| containers_init: 0, | ||
| containers_ready: 0, | ||
| containers_running: 0, | ||
| containers_terminated: 0, | ||
| containers_total: 0, | ||
| containers_waiting: 0, | ||
| cpu_allocatable: None, | ||
| cpu_capacity: None, | ||
| cpu_usage: None, | ||
| memory_allocatable: None, | ||
| memory_capacity: None, | ||
| memory_usage: None, | ||
| nodes_notready: 0, | ||
| nodes_ready: 0, | ||
| nodes_total: 0, | ||
| nodes_unschedulable: 0, | ||
| pods_allocatable: None, | ||
| pods_capacity: None, | ||
| pods_failed: 0, | ||
| pods_pending: 0, | ||
| pods_running: 0, | ||
| pods_succeeded: 0, | ||
| pods_unknown: 0, | ||
| pods_total: 0, | ||
| resource: "cluster".to_string(), | ||
| r#type: "metric".to_string(), | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make
headersalready aHeaderMaphere? I understand this is a more foundational change.Stringis case-sensitive,HeaderMap'sHeaderNameis case-insensitive (as per the HTTP spec).Regardless of that:
I'd like to suggest the following change: