Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 175 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,28 @@ clap = { version = "4.5.41", features = ["derive", "env"] }
env_logger = { version = "0.11.5", default-features = false, features = ["humantime"] }
http-body-util = "0.1.3"
hyper = { version = "1.6.0", default-features = false }
hyper-tls = "0.6.0"
hyper-util = { version = "0.1.16", default-features = false }
libc = { version = "0.2.159", default-features = false }
log = { version = "0.4.22", default-features = false }
native-tls = { version = "0.2.14", features = ["alpn"] }
openssl = "0.10.75"
prometheus-client = { version = "0.24.0", default-features = false }
prost = "0.14.0"
prost-types = "0.14.0"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.142"
tokio = { version = "1.40.0", default-features = false, features = [
"fs",
"macros",
"rt",
"rt-multi-thread",
"net",
"signal",
] }
tokio-native-tls = "0.3.1"
tokio-stream = { version = "0.1.17", features = ["sync"] }
tonic = { version = "0.14.0", features = ["tls-ring"] }
tonic = { version = "0.14.0" }
tonic-prost = "0.14.0"
tonic-prost-build = "0.14.0"
uuid = { version = "1.17.0", features = ["v4"] }
Expand Down
10 changes: 9 additions & 1 deletion Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ FROM quay.io/centos/centos:stream9 AS builder
RUN dnf install --enablerepo=crb -y \
clang \
libbpf-devel \
openssl-devel \
protobuf-compiler \
protobuf-devel && \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | \
Expand All @@ -22,7 +23,14 @@ RUN --mount=type=cache,target=/root/.cargo/registry \
cargo build --release && \
cp target/release/fact fact

FROM registry.access.redhat.com/ubi9/ubi-micro:latest
FROM registry.access.redhat.com/ubi9/ubi-minimal:latest

RUN microdnf install -y openssl-libs && \
microdnf clean all && \
rpm --verbose -e --nodeps $( \
rpm -qa 'curl' '*rpm*' '*dnf*' '*libsolv*' '*hawkey*' 'yum*' 'libyaml*' 'libarchive*' \
) && \
rm -rf /var/cache/yum

COPY --from=build /app/fact /usr/local/bin

Expand Down
4 changes: 4 additions & 0 deletions fact/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ clap = { workspace = true }
env_logger = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-tls = { workspace = true }
hyper-util = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
native-tls = { workspace = true }
openssl = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
tokio-native-tls = { workspace = true }
tokio-stream = { workspace = true }
prometheus-client = { workspace = true }
prost = { workspace = true }
Expand Down
130 changes: 68 additions & 62 deletions fact/src/output/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,25 @@
use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use anyhow::bail;
use anyhow::{bail, Context};
use fact_api::file_activity_service_client::FileActivityServiceClient;
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::connect::HttpConnector;
use log::{debug, info, warn};
use native_tls::{Certificate, Identity};
use openssl::{ec::EcKey, pkey::PKey};
use tokio::{
fs,
sync::{broadcast, watch},
time::sleep,
};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use tonic::{
metadata::MetadataValue,
service::Interceptor,
transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity},
};
use tonic::transport::Channel;

use crate::{config::GrpcConfig, event::Event, metrics::EventCounter};

struct Certs {
pub ca: Certificate,
pub identity: Identity,
}

impl TryFrom<&Path> for Certs {
type Error = anyhow::Error;

fn try_from(path: &Path) -> Result<Self, Self::Error> {
let ca = read_to_string(path.join("ca.pem"))?;
let ca = Certificate::from_pem(ca);
let cert = read_to_string(path.join("cert.pem"))?;
let key = read_to_string(path.join("key.pem"))?;
let identity = Identity::from_pem(cert, key);

Ok(Self { ca, identity })
}
}

struct UserAgentInterceptor {}

impl Interceptor for UserAgentInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
request
.metadata_mut()
.insert("user-agent", MetadataValue::from_static("Rox SFA Agent"));
Ok(request)
}
}

pub struct Client {
rx: broadcast::Receiver<Arc<Event>>,
running: watch::Receiver<bool>,
Expand Down Expand Up @@ -89,46 +57,84 @@ impl Client {
info!("Stopping gRPC output...");
break;
}
Err(e) => warn!("gRPC error: {e}"),
Err(e) => warn!("gRPC error: {e:?}"),
}
}
});
}

fn create_channel(&self) -> anyhow::Result<Endpoint> {
let config = self.config.borrow();
let Some(url) = config.url() else {
bail!("Attempting to run gRPC client with no URL");
async fn get_connector(&self) -> anyhow::Result<Option<HttpsConnector<HttpConnector>>> {
let certs = {
let config = self.config.borrow();
let Some(certs) = config.certs() else {
return Ok(None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part I don't follow, certs could just be None and passed further?

Copy link
Contributor Author

@Molter73 Molter73 Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the let-else clause is making it so if config.certs() is None then we go through the else branch and return Ok(None), the rest of the method is skipped.

https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html

Copy link
Contributor

@erthalion erthalion Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so the whole call chain is just short cut with None.

};
certs.to_owned()
};
let (ca, cert, key) = tokio::try_join!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I see it correctly, that try_join! macro joins concurrent activity? If yes, why is it needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not needed per se, we could very easily do something like:

let ca = fs::read(certs.join("ca.pem")).await?;
let cert = fs::read(certs.join("cert.pem")).await?;
let key = fs::read(certs.join("key.pem")).await?;

Do note that doing it this way, if we were to fail reading key.pem we still need to finish reading ca.pem and cert.pem first, the try_join! approach will try all of them concurrently and exit as soon as one of the tasks fails. Since this is not in a hot path though, there's not that much to be gained, I just felt like the macro approach was easier to read and a bit neater honestly.

https://docs.rs/tokio/latest/tokio/macro.try_join.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but my question is mostly why any kind of async is even required here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way tokio works is that it uses a thread pool to run tasks concurrently, so any operation that would block a thread could affect how the tasks are run. get_tls_connector is called from an async context (the grpc::Client::run method), so it is better to use async I/O operations here for it to not interfere with the other tasks that are running concurrently.

We could make the entire get_tls_connector method sync by using std::fs::read instead of tokio::fs::read, but the one from std will cause the calling thread to block waiting for the file to be read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the question is why we weren't doing that already, the answer would be that we were doing it wrong (my bad).

Copy link
Contributor

@erthalion erthalion Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the one from std will cause the calling thread to block waiting for the file to be read.

And what's the problem with that? I guess at the bootstrap the concurrency is not yet needed, and if it would lead to a more straightforward implementation, that sounds like win for me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is we have hot-realoading implemented, so the grpc client could block a thread at any point in time, not just when it starts up. We can do either, I think the async approach is more correct while being only slightly more complicated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, you're saying the hot-reloading implementation doesn't stop the grpc service for the time of reloading? I need to look at it again, but I was assuming when reloading the configuration, only the bpf receiving service has to be active to not loose events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a change in configuration that requires the grpc client to restart it will attempt to read the certificates again, this is done sequentially in a single task in a loop. If reading the certificates blocks the thread the task it is running from, the grpc client will work correctly but it is potentially stealing away compute time from other tasks (like the BPF worker, the configuration reloader, etc...). This is because the threads used by all tasks are shared and there is a userspace scheduler in charge of handling what tasks get assigned to what threads in a collaborative manner, if we block a thread forcefully, we will mess with this in ways that may be detrimental to the program.

In the big picture, it is probably not that big a deal because we are only reading a few small files once in a full moon, but why risk it?

fs::read(certs.join("ca.pem")),
fs::read(certs.join("cert.pem")),
fs::read(certs.join("key.pem")),
)?;
let ca = Certificate::from_pem(&ca).context("Failed to parse CA")?;

// The key is in PKCS#1 format using EC algorithm, we need it
// in PKCS#8 format for native-tls, so we convert it here
let key = EcKey::private_key_from_pem(&key)?;
let key = PKey::from_ec_key(key)?;
let key = key.private_key_to_pem_pkcs8()?;

let id = Identity::from_pkcs8(&cert, &key).context("Failed to create TLS identity")?;
let connector = native_tls::TlsConnector::builder()
.add_root_certificate(ca)
.identity(id)
.request_alpns(&["h2"])
.build()?;
let connector = tokio_native_tls::TlsConnector::from(connector);

// Wrap the TLS connector into the final HTTPs connector
let mut http = HttpConnector::new();
http.enforce_http(false);
let mut connector = HttpsConnector::from((http, connector));
connector.https_only(true);

Ok(Some(connector))
}

async fn create_channel(
&self,
connector: Option<HttpsConnector<HttpConnector>>,
) -> anyhow::Result<Channel> {
let url = match self.config.borrow().url() {
Some(url) => url.to_string(),
None => bail!("Attempting to run gRPC client with no URL"),
};
let channel = Channel::from_shared(url)?;
let channel = match connector {
Some(connector) => channel.connect_with_connector(connector).await?,
None => {
warn!("Using unencrypted gRPC channel");
channel.connect().await?
}
};
let url = url.to_string();
let certs = config.certs().map(Certs::try_from).transpose()?;
let mut channel = Channel::from_shared(url)?;
if let Some(certs) = certs {
let tls = ClientTlsConfig::new()
.domain_name("sensor.stackrox.svc")
.ca_certificate(certs.ca.clone())
.identity(certs.identity.clone());
channel = channel.tls_config(tls)?;
}
Ok(channel)
}

async fn run(&mut self) -> anyhow::Result<bool> {
let channel = self.create_channel()?;
let connector = self.get_connector().await?;
loop {
info!("Attempting to connect to gRPC server...");
let channel = match channel.connect().await {
let channel = match self.create_channel(connector.clone()).await {
Ok(channel) => channel,
Err(e) => {
debug!("Failed to connect to server: {e}");
sleep(Duration::new(1, 0)).await;
debug!("Failed to connect to server: {e:?}");
sleep(Duration::from_secs(1)).await;
continue;
}
};
info!("Successfully connected to gRPC server");

let mut client =
FileActivityServiceClient::with_interceptor(channel, UserAgentInterceptor {});
let mut client = FileActivityServiceClient::new(channel);

let metrics = self.metrics.clone();
let rx =
Expand All @@ -149,7 +155,7 @@ impl Client {
res = client.communicate(rx) => {
match res {
Ok(_) => info!("gRPC stream ended"),
Err(e) => warn!("gRPC stream error: {e}"),
Err(e) => warn!("gRPC stream error: {e:?}"),
}
}
_ = self.config.changed() => return Ok(true),
Expand Down
10 changes: 9 additions & 1 deletion konflux.Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ RUN echo "Checking required FACT_TAG"; [[ "${FACT_TAG}" != "" ]]
RUN dnf install -y \
clang \
libbpf-devel \
openssl-devel \
protobuf-compiler \
protobuf-devel \
cargo \
Expand All @@ -17,7 +18,7 @@ COPY . .

RUN cargo build --release

FROM registry.access.redhat.com/ubi9/ubi-micro@sha256:f45ee3d1f8ea8cd490298769daac2ac61da902e83715186145ac2e65322ddfc8
FROM registry.access.redhat.com/ubi9/ubi-minimal@sha256:6fc28bcb6776e387d7a35a2056d9d2b985dc4e26031e98a2bd35a7137cd6fd71

ARG FACT_TAG

Expand All @@ -39,6 +40,13 @@ LABEL \
# We also set it to not inherit one from a base stage in case it's RHEL or UBI.
release="1"

RUN microdnf install -y openssl-libs && \
microdnf clean all && \
rpm --verbose -e --nodeps $( \
rpm -qa 'curl' '*rpm*' '*dnf*' '*libsolv*' '*hawkey*' 'yum*' 'libyaml*' 'libarchive*' \
) && \
rm -rf /var/cache/yum

COPY --from=builder /app/target/release/fact /usr/local/bin

ENTRYPOINT ["fact"]
2 changes: 2 additions & 0 deletions rpms.in.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ packages:
- cargo
- clang
- libbpf-devel
- openssl-libs
- openssl-devel
- protobuf-compiler
- protobuf-devel
- rust
Expand Down
14 changes: 14 additions & 0 deletions rpms.lock.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ arches:
name: llvm-libs
evr: 20.1.8-3.el9
sourcerpm: llvm-20.1.8-3.el9.src.rpm
- url: https://cdn.redhat.com/content/dist/rhel9/9/aarch64/appstream/os/Packages/o/openssl-devel-3.5.1-4.el9_7.aarch64.rpm
repoid: rhel-9-for-aarch64-appstream-rpms
size: 4996902
checksum: sha256:a250fae31cced54a51c0c4aacdd44855044652eb39f4141e23fe197d2528ff0b
name: openssl-devel
evr: 1:3.5.1-4.el9_7
sourcerpm: openssl-3.5.1-4.el9_7.src.rpm
- url: https://cdn.redhat.com/content/dist/rhel9/9/aarch64/appstream/os/Packages/p/policycoreutils-python-utils-3.6-3.el9.noarch.rpm
repoid: rhel-9-for-aarch64-appstream-rpms
size: 77697
Expand Down Expand Up @@ -1499,6 +1506,13 @@ arches:
name: llvm-libs
evr: 20.1.8-3.el9
sourcerpm: llvm-20.1.8-3.el9.src.rpm
- url: https://cdn.redhat.com/content/dist/rhel9/9/x86_64/appstream/os/Packages/o/openssl-devel-3.5.1-4.el9_7.x86_64.rpm
repoid: rhel-9-for-x86_64-appstream-rpms
size: 4997984
checksum: sha256:3aeba34c9a9c3313b16166111a1dfe61a29ffaff671bb8f0be95eb0e2dede860
name: openssl-devel
evr: 1:3.5.1-4.el9_7
sourcerpm: openssl-3.5.1-4.el9_7.src.rpm
- url: https://cdn.redhat.com/content/dist/rhel9/9/x86_64/appstream/os/Packages/p/policycoreutils-python-utils-3.6-3.el9.noarch.rpm
repoid: rhel-9-for-x86_64-appstream-rpms
size: 77697
Expand Down
Loading