This is Validio's experimental proof-of-concept (POC) code for different Airflow integrations. The goal is to serve as an example of how different operators and approaches can be used to bring Validio into an Airflow setup.
For these Airflow components to work, validio-sdk and
validio-airflow (this module) need to exist in your Python environment. The
entry point required to register the plugin also needs to exist.
Additionally, validio-cli needs to be installed if you want to
use the validio config command. Either install with pip (or any other preferred
tool) or ensure both modules exist in your PYTHONPATH and that the entry point
exists before starting Airflow.
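To check the requirements above before starting Airflow, you can run a small stdlib-only script in the same environment. This is a sketch and not part of validio-airflow. The import names `validio_sdk` and `validio_airflow` are assumptions about how the packages expose themselves. It also assumes that the provider registers through Airflow's standard `apache_airflow_provider` entry-point group.

```python
import importlib.util
from importlib.metadata import entry_points
from typing import Iterable, List

# Assumed top-level import names for validio-sdk and validio-airflow.
REQUIRED_MODULES = ["validio_sdk", "validio_airflow"]


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found on sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def airflow_provider_entry_points() -> List[str]:
    """List the values of entry points in the (assumed) provider group."""
    eps = entry_points()
    if hasattr(eps, "select"):  # Python 3.10+ supports selecting by group
        group = eps.select(group="apache_airflow_provider")
    else:  # Python 3.8/3.9 return a plain dict of groups
        group = eps.get("apache_airflow_provider", [])
    return [ep.value for ep in group]
```

If `missing_modules(REQUIRED_MODULES)` returns an empty list, both packages are importable. If the provider's entry point does not appear in `airflow_provider_entry_points()`, the package is probably on PYTHONPATH without being installed.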
We use Poetry for development, so if you clone this repository you
can use Poetry to set up your environment.
# Install poetry
pip install poetry
# Install this provider and its dependencies in a virtual environment
poetry install
# Start Airflow inside the virtual environment
poetry run airflow standalone

The hook that we use to connect to Validio works with the Airflow Connection
type. This provider contains a specific connection type called validio. You
can set it up either in the web interface or with the Airflow CLI.
Here's how you can create the connection via the CLI if you have validio-cli
and the Airflow CLI installed.

NOTE This example requires jq to be installed. Copy the values manually if
you don't want to install jq.
# Get the Validio config.
read -r host login password \
<<<$(validio config get -o json --show-secrets | jq -r '. | "\(.endpoint) \(.access_key) \(.access_secret)"')
# Create the connection
airflow connections add 'validio_default' \
--conn-json '{
"conn_type": "validio",
"description": "Validio API connection",
"host": "'$host'",
"login": "'$login'",
"password": "'$password'"
}'

NOTE We do not take any extra measures to keep this information secret and rely fully on the Airflow configuration for sensitive data!
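If you prefer not to install jq, you can build the same `--conn-json` payload in Python. The sketch below assumes that the output of `validio config get -o json --show-secrets` contains the `endpoint`, `access_key` and `access_secret` fields used by the jq filter above. The function name `build_conn_json` is hypothetical.

```python
import json


def build_conn_json(validio_config: dict) -> str:
    """Map Validio CLI config fields to an Airflow --conn-json payload.

    Assumes validio_config holds the endpoint/access_key/access_secret keys
    that the jq example above extracts.
    """
    payload = {
        "conn_type": "validio",
        "description": "Validio API connection",
        "host": validio_config["endpoint"],
        "login": validio_config["access_key"],
        "password": validio_config["access_secret"],
    }
    # json.dumps escapes quotes and special characters in the values,
    # which manual string concatenation in the shell does not.
    return json.dumps(payload)
```

Pass it the parsed CLI output, for example `build_conn_json(json.load(sys.stdin))` in a script fed by `validio config get -o json --show-secrets`. Then hand the printed string to `airflow connections add 'validio_default' --conn-json`.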
The ValidioHook works with Airflow connections but falls back to your system
configuration (environment variables or configuration file) if no connection
exists. To create a hook, simply instantiate it.
validio_hook = ValidioHook(conn_id="validio_default")

The hook can return a ValidioAPIClient for any other needs.
validio_client = validio_hook.get_client()

Hooks are often used to abstract the most common use cases, but for now we only expose a method to get metrics for the last 10 minutes. This means there might be inconsistencies: some API calls happen on the hook, while others first get a client from the hook and then make the request with the Validio SDK client.
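The fallback order described for the ValidioHook can be illustrated with a small, self-contained sketch. It is not the hook's actual implementation. The environment variable names are hypothetical placeholders and not necessarily the ones validio-sdk reads. The configuration-file fallback is omitted.

```python
import os
from typing import Mapping, Optional

# Hypothetical environment variable names, for illustration only.
ENV_VARS = {
    "endpoint": "VALIDIO_ENDPOINT",
    "access_key": "VALIDIO_ACCESS_KEY",
    "access_secret": "VALIDIO_ACCESS_SECRET",
}


def resolve_credentials(
    connection: Optional[Mapping[str, str]],
    environ: Mapping[str, str] = os.environ,
) -> dict:
    """Prefer the Airflow connection fields; otherwise use environment variables."""
    if connection:
        # Same field mapping as the connection created with the CLI above.
        return {
            "endpoint": connection["host"],
            "access_key": connection["login"],
            "access_secret": connection["password"],
        }
    creds = {key: environ.get(var) for key, var in ENV_VARS.items()}
    missing = [ENV_VARS[key] for key, value in creds.items() if not value]
    if missing:
        raise ValueError(f"No connection and missing variables: {missing}")
    return creds
```

The connection takes precedence, so a configured `validio_default` connection overrides anything set in the environment.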
The ValidioPollSourceOperator is an operator that aims to trigger a source
poll in Validio. Instead of relying on a cron schedule, it can be used to
trigger a poll in Validio as soon as we know that data has landed in the data
source.
The ValidioMetricsOperator is an operator that fetches metrics for a
validator and segment. It generates a CSV file at the specified location
containing the metric start time, end time, incident flag, and value.

NOTE This is currently only an example of how operators can be built and is not designed for production use. In this example it's not possible to get anything other than the last hours of metrics, and we blindly pick a segment at random if none is provided.
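The CSV output and the random segment fallback described above can be sketched with the standard library alone. `MetricPoint`, `pick_segment` and `write_metrics_csv` are hypothetical names. The column names are assumptions, and the real operator works on Validio SDK responses rather than this dataclass.

```python
import csv
import random
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Sequence


@dataclass
class MetricPoint:
    # Hypothetical in-memory shape of one metric value.
    start_time: datetime
    end_time: datetime
    is_incident: bool
    value: float


def pick_segment(segment_ids: Sequence[str], segment_id: Optional[str] = None) -> str:
    """Use the requested segment, or pick one at random like the example operator."""
    if segment_id is not None:
        return segment_id
    if not segment_ids:
        raise ValueError("validator has no segments")
    return random.choice(segment_ids)


def write_metrics_csv(path: str, metrics: Iterable[MetricPoint]) -> int:
    """Write one CSV row per metric and return the number of rows written."""
    fieldnames = ["start_time", "end_time", "is_incident", "value"]
    rows = 0
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for m in metrics:
            writer.writerow(
                {
                    "start_time": m.start_time.isoformat(),
                    "end_time": m.end_time.isoformat(),
                    "is_incident": m.is_incident,
                    "value": m.value,
                }
            )
            rows += 1
    return rows
```

Picking a segment at random keeps the example short, but it makes runs non-reproducible. A production operator would more likely require an explicit segment.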
The ValidioIncidentsSensor is a sensor that pokes the API for incidents from
the last 10 minutes. The sensor can be configured to allow a certain number of
incidents within this time period, as well as certain incident severities. The
sensor triggers once there are more than the configured number of incidents
outside the allowed severities.
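The sensor's trigger condition can be expressed as a pure function, which is handy for unit testing the thresholds without calling the API. This is a sketch of the rule described above and not the sensor's code. `Incident` and `should_trigger` are hypothetical, and the severity strings in the usage below are only example labels.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Collection, Iterable, Optional


@dataclass
class Incident:
    # Hypothetical minimal incident record: when it happened and its severity.
    created_at: datetime
    severity: str


def should_trigger(
    incidents: Iterable[Incident],
    max_incidents: int,
    allowed_severities: Collection[str] = (),
    now: Optional[datetime] = None,
    window: timedelta = timedelta(minutes=10),
) -> bool:
    """True when more than max_incidents incidents inside the window
    have a severity outside allowed_severities."""
    now = now or datetime.now(timezone.utc)
    window_start = now - window
    counted = sum(
        1
        for incident in incidents
        if window_start <= incident.created_at <= now
        and incident.severity not in allowed_severities
    )
    return counted > max_incidents
```

For example, with `max_incidents=0` and `allowed_severities={"LOW"}`, a single non-LOW incident from the last 10 minutes is enough to trigger. Incidents older than the window are ignored.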
We've bundled some examples in the examples directory that serve as a starting point for your Airflow setup.