-
Notifications
You must be signed in to change notification settings - Fork 1
ROX-31430: delegate TLS to host implementation #168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,57 +1,25 @@ | ||
| use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration}; | ||
| use std::{sync::Arc, time::Duration}; | ||
|
|
||
| use anyhow::bail; | ||
| use anyhow::{bail, Context}; | ||
| use fact_api::file_activity_service_client::FileActivityServiceClient; | ||
| use hyper_tls::HttpsConnector; | ||
| use hyper_util::client::legacy::connect::HttpConnector; | ||
| use log::{debug, info, warn}; | ||
| use native_tls::{Certificate, Identity}; | ||
| use openssl::{ec::EcKey, pkey::PKey}; | ||
| use tokio::{ | ||
| fs, | ||
| sync::{broadcast, watch}, | ||
| time::sleep, | ||
| }; | ||
| use tokio_stream::{ | ||
| wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, | ||
| StreamExt, | ||
| }; | ||
| use tonic::{ | ||
| metadata::MetadataValue, | ||
| service::Interceptor, | ||
| transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity}, | ||
| }; | ||
| use tonic::transport::Channel; | ||
|
|
||
| use crate::{config::GrpcConfig, event::Event, metrics::EventCounter}; | ||
|
|
||
| struct Certs { | ||
| pub ca: Certificate, | ||
| pub identity: Identity, | ||
| } | ||
|
|
||
| impl TryFrom<&Path> for Certs { | ||
| type Error = anyhow::Error; | ||
|
|
||
| fn try_from(path: &Path) -> Result<Self, Self::Error> { | ||
| let ca = read_to_string(path.join("ca.pem"))?; | ||
| let ca = Certificate::from_pem(ca); | ||
| let cert = read_to_string(path.join("cert.pem"))?; | ||
| let key = read_to_string(path.join("key.pem"))?; | ||
| let identity = Identity::from_pem(cert, key); | ||
|
|
||
| Ok(Self { ca, identity }) | ||
| } | ||
| } | ||
|
|
||
| struct UserAgentInterceptor {} | ||
|
|
||
| impl Interceptor for UserAgentInterceptor { | ||
| fn call( | ||
| &mut self, | ||
| mut request: tonic::Request<()>, | ||
| ) -> Result<tonic::Request<()>, tonic::Status> { | ||
| request | ||
| .metadata_mut() | ||
| .insert("user-agent", MetadataValue::from_static("Rox SFA Agent")); | ||
| Ok(request) | ||
| } | ||
| } | ||
|
|
||
| pub struct Client { | ||
| rx: broadcast::Receiver<Arc<Event>>, | ||
| running: watch::Receiver<bool>, | ||
|
|
@@ -89,46 +57,84 @@ impl Client { | |
| info!("Stopping gRPC output..."); | ||
| break; | ||
| } | ||
| Err(e) => warn!("gRPC error: {e}"), | ||
| Err(e) => warn!("gRPC error: {e:?}"), | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| fn create_channel(&self) -> anyhow::Result<Endpoint> { | ||
| let config = self.config.borrow(); | ||
| let Some(url) = config.url() else { | ||
| bail!("Attempting to run gRPC client with no URL"); | ||
| async fn get_connector(&self) -> anyhow::Result<Option<HttpsConnector<HttpConnector>>> { | ||
| let certs = { | ||
| let config = self.config.borrow(); | ||
| let Some(certs) = config.certs() else { | ||
| return Ok(None); | ||
| }; | ||
| certs.to_owned() | ||
| }; | ||
| let (ca, cert, key) = tokio::try_join!( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do I see it correctly, that
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not needed per se, we could very easily do something like: let ca = fs::read(certs.join("ca.pem")).await?;
let cert = fs::read(certs.join("cert.pem")).await?;
let key = fs::read(certs.join("key.pem")).await?;Do note that doing it this way, if we were to fail reading
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I'm missing something, but my question is mostly why any kind of async is even required here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way We could make the entire
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the question is why we weren't doing that already, the answer would be that we were doing it wrong (my bad).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
And what's the problem with that? I guess at the bootstrap the concurrency is not yet needed, and if it would lead to a more straightforward implementation, that sounds like win for me.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is we have hot-realoading implemented, so the grpc client could block a thread at any point in time, not just when it starts up. We can do either, I think the async approach is more correct while being only slightly more complicated.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, you're saying the hot-reloading implementation doesn't stop the grpc service for the time of reloading? I need to look at it again, but I was assuming when reloading the configuration, only the bpf receiving service has to be active to not loose events.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When a change in configuration that requires the grpc client to restart it will attempt to read the certificates again, this is done sequentially in a single task in a loop. If reading the certificates blocks the thread the task it is running from, the grpc client will work correctly but it is potentially stealing away compute time from other tasks (like the BPF worker, the configuration reloader, etc...). This is because the threads used by all tasks are shared and there is a userspace scheduler in charge of handling what tasks get assigned to what threads in a collaborative manner, if we block a thread forcefully, we will mess with this in ways that may be detrimental to the program. In the big picture, it is probably not that big a deal because we are only reading a few small files once in a full moon, but why risk it? |
||
| fs::read(certs.join("ca.pem")), | ||
| fs::read(certs.join("cert.pem")), | ||
| fs::read(certs.join("key.pem")), | ||
| )?; | ||
| let ca = Certificate::from_pem(&ca).context("Failed to parse CA")?; | ||
|
|
||
| // The key is in PKCS#1 format using EC algorithm, we need it | ||
| // in PKCS#8 format for native-tls, so we convert it here | ||
| let key = EcKey::private_key_from_pem(&key)?; | ||
| let key = PKey::from_ec_key(key)?; | ||
| let key = key.private_key_to_pem_pkcs8()?; | ||
|
|
||
| let id = Identity::from_pkcs8(&cert, &key).context("Failed to create TLS identity")?; | ||
| let connector = native_tls::TlsConnector::builder() | ||
| .add_root_certificate(ca) | ||
| .identity(id) | ||
| .request_alpns(&["h2"]) | ||
| .build()?; | ||
| let connector = tokio_native_tls::TlsConnector::from(connector); | ||
|
|
||
| // Wrap the TLS connector into the final HTTPs connector | ||
| let mut http = HttpConnector::new(); | ||
| http.enforce_http(false); | ||
| let mut connector = HttpsConnector::from((http, connector)); | ||
| connector.https_only(true); | ||
|
|
||
| Ok(Some(connector)) | ||
| } | ||
|
|
||
| async fn create_channel( | ||
| &self, | ||
| connector: Option<HttpsConnector<HttpConnector>>, | ||
| ) -> anyhow::Result<Channel> { | ||
| let url = match self.config.borrow().url() { | ||
| Some(url) => url.to_string(), | ||
| None => bail!("Attempting to run gRPC client with no URL"), | ||
| }; | ||
| let channel = Channel::from_shared(url)?; | ||
| let channel = match connector { | ||
| Some(connector) => channel.connect_with_connector(connector).await?, | ||
| None => { | ||
| warn!("Using unencrypted gRPC channel"); | ||
| channel.connect().await? | ||
| } | ||
| }; | ||
| let url = url.to_string(); | ||
| let certs = config.certs().map(Certs::try_from).transpose()?; | ||
| let mut channel = Channel::from_shared(url)?; | ||
| if let Some(certs) = certs { | ||
| let tls = ClientTlsConfig::new() | ||
| .domain_name("sensor.stackrox.svc") | ||
| .ca_certificate(certs.ca.clone()) | ||
| .identity(certs.identity.clone()); | ||
| channel = channel.tls_config(tls)?; | ||
| } | ||
| Ok(channel) | ||
| } | ||
|
|
||
| async fn run(&mut self) -> anyhow::Result<bool> { | ||
| let channel = self.create_channel()?; | ||
| let connector = self.get_connector().await?; | ||
| loop { | ||
| info!("Attempting to connect to gRPC server..."); | ||
| let channel = match channel.connect().await { | ||
| let channel = match self.create_channel(connector.clone()).await { | ||
| Ok(channel) => channel, | ||
| Err(e) => { | ||
| debug!("Failed to connect to server: {e}"); | ||
| sleep(Duration::new(1, 0)).await; | ||
| debug!("Failed to connect to server: {e:?}"); | ||
| sleep(Duration::from_secs(1)).await; | ||
| continue; | ||
| } | ||
| }; | ||
| info!("Successfully connected to gRPC server"); | ||
|
|
||
| let mut client = | ||
| FileActivityServiceClient::with_interceptor(channel, UserAgentInterceptor {}); | ||
| let mut client = FileActivityServiceClient::new(channel); | ||
|
|
||
| let metrics = self.metrics.clone(); | ||
| let rx = | ||
|
|
@@ -149,7 +155,7 @@ impl Client { | |
| res = client.communicate(rx) => { | ||
| match res { | ||
| Ok(_) => info!("gRPC stream ended"), | ||
| Err(e) => warn!("gRPC stream error: {e}"), | ||
| Err(e) => warn!("gRPC stream error: {e:?}"), | ||
| } | ||
| } | ||
| _ = self.config.changed() => return Ok(true), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part I don't follow,
certscould just beNoneand passed further?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the
let-elseclause is making it so ifconfig.certs()isNonethen we go through the else branch and returnOk(None), the rest of the method is skipped.https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, so the whole call chain is just short cut with
None.