
Add HookToolset and SQLToolset for agentic LLM workflows#62785

Merged

kaxil merged 5 commits into apache:main from astronomer:aip-99-toolsets on Mar 3, 2026

Conversation

Member

@kaxil kaxil commented Mar 3, 2026

Airflow Hooks as AI Agent Tools

Airflow's 350+ provider hooks already form the largest authenticated tool registry in the data ecosystem. Each hook has typed methods, rich docstrings, and managed credentials stored in Airflow's secrets backend. This PR adds a thin adapter layer that exposes them as pydantic-ai tools, turning Airflow into an AI agent toolkit. This is a continuation of my and Pavan's talk at Airflow Summit 2025 [YouTube].

For context: MCP servers typically expose ~30 tools each and require separate authentication setup. Airflow hooks cover thousands of methods across AWS, GCP, Azure, databases, HTTP APIs, Slack, and more — all pre-authenticated through Airflow Connections.

What's in this PR

HookToolset — generic adapter that turns any Airflow Hook into a set of pydantic-ai tools via introspection:

from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.common.ai.toolsets.hook import HookToolset

# Turn 2 HttpHook methods into agent tools
HookToolset(
    HttpHook(http_conn_id="my_api"),
    allowed_methods=["run", "check_response"],
    tool_name_prefix="http_",
)

The introspection engine builds JSON Schema from method signatures (inspect.signature + get_type_hints) and enriches tool descriptions from docstrings (Sphinx :param: and Google Args: styles). This works with any hook — S3Hook, GCSHook, SlackHook, DbApiHook, etc.
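As a rough sketch of that introspection approach (illustrative only: `build_tool_schema` and `DemoHook` are hypothetical names, not the provider's actual code), a minimal version might look like:

```python
import inspect
import re
from typing import get_type_hints

# Map Python annotations to JSON Schema type names (illustrative subset).
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def build_tool_schema(method):
    """Build a minimal JSON Schema for a bound method's parameters."""
    sig = inspect.signature(method)
    hints = get_type_hints(method)
    # Pull ":param name: description" lines from a Sphinx-style docstring.
    doc = inspect.getdoc(method) or ""
    param_docs = dict(re.findall(r":param (\w+): (.+)", doc))

    properties, required = {}, []
    for name, param in sig.parameters.items():
        if name == "self":
            continue
        prop = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if name in param_docs:
            prop["description"] = param_docs[name]
        properties[name] = prop
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}


class DemoHook:
    def run(self, endpoint: str, retries: int = 3) -> str:
        """Call an endpoint.

        :param endpoint: URL path to call.
        :param retries: Number of retry attempts.
        """


schema = build_tool_schema(DemoHook().run)
```

Parameters with defaults (like `retries`) end up optional in the schema, while required parameters come from the absence of a default, so the LLM sees the same contract a Python caller would.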

SQLToolset — curated 4-tool database toolset inspired by LangChain's SQLDatabaseToolkit (one of their most-used agent features):

| Tool | What it does |
| --- | --- |
| list_tables | Lists available table names (filtered by allowed_tables) |
| get_schema | Returns column names and types for a table |
| query | Executes SQL and returns rows as JSON (with max_rows truncation) |
| check_query | Validates SQL syntax without executing |

from airflow.providers.common.ai.toolsets.sql import SQLToolset

SQLToolset(
    db_conn_id="postgres_default",
    allowed_tables=["customers", "orders"],
    allow_writes=False,                       # Default — validates SQL
    max_rows=50,                              # Default — truncate results
)
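The max_rows truncation behaviour can be illustrated with a throwaway in-memory SQLite database (`run_query` below is a hypothetical stand-in for the query tool, not the provider's implementation):

```python
import sqlite3


def run_query(conn, sql: str, max_rows: int = 50) -> list[dict]:
    """Execute SQL and return at most max_rows rows as dicts (JSON-ready)."""
    cur = conn.execute(sql)
    cols = [c[0] for c in cur.description]
    rows = cur.fetchmany(max_rows)  # truncate instead of fetching everything
    return [dict(zip(cols, r)) for r in rows]


# Throwaway demo table with 100 rows; only 5 come back.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(100)])
rows = run_query(conn, "SELECT n FROM t ORDER BY n", max_rows=5)
```

Truncating at fetch time keeps a runaway `SELECT *` from flooding the LLM's context window with thousands of rows.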

Documentation — full toolsets.rst with usage, parameters, security section (defense layers table, allowed_tables limitation, HookToolset guidelines, recommended configurations, production checklist).

Safety

  • HookToolset: Requires explicit allowed_methods list. No auto-discovery — DAG authors must opt in to each method. Methods validated with hasattr + callable at instantiation time.
  • SQLToolset: allow_writes=False by default, validates every query through validate_sql() and rejects INSERT/UPDATE/DELETE/DROP. allowed_tables filters metadata visibility (not query-level — documented clearly). max_rows truncates results.
  • Both: sequential=True on all tool definitions to prevent concurrent sync I/O on shared hook state.
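The write-blocking check described above can be sketched roughly like this (`validate_sql` here is an illustrative stand-in under the stated behaviour, not the provider's actual implementation):

```python
# Statement keywords rejected in read-only mode (illustrative list).
FORBIDDEN = ("INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE")


def validate_sql(sql: str, allow_writes: bool = False) -> None:
    """Raise ValueError when a write statement is submitted in read-only mode."""
    if allow_writes:
        return
    stripped = sql.strip()
    first_word = stripped.split(None, 1)[0].upper() if stripped else ""
    if first_word in FORBIDDEN:
        raise ValueError(f"Write statement rejected in read-only mode: {first_word}")


validate_sql("SELECT * FROM customers")  # passes: read-only query
```

A keyword check on the leading statement is a coarse guard by design; as the description notes, real access control belongs at the database permission level.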

Design decisions

Why custom introspection instead of pydantic-ai's _function_schema? Hook methods are bound methods with self, decorators like @provide_bucket_name, and complex signatures. Our lightweight approach avoids coupling to pydantic-ai internals.

Why allowed_tables is metadata-only? Parsing SQL for table references (CTEs, subqueries, aliases, vendor-specific syntax) is complex and error-prone. Providing a false sense of security is worse than being honest about the limitation. Real access control belongs at the DB permission level.

Why not auto-discover hook methods? Auto-discovery would expose every public method, including run(), get_connection(), etc., giving an LLM broad unintended access. Explicit listing forces DAG authors to think about the blast radius.

Demo DAG:

"""
Demo: AgentOperator + SQLToolset against a local SQLite database.

Setup (run once inside breeze shell):

    # 1. Create the demo SQLite database
    python /files/dags/demo_agent_sql.py

    # 2. Create the SQLite connection
    airflow connections add sqlite_demo \
        --conn-type sqlite \
        --conn-host /tmp/demo_agent.db

    # 3. Create the LLM connection — Claude via Anthropic API
    airflow connections add pydantic_ai_default \
        --conn-type pydantic_ai \
        --conn-password "$ANTHROPIC_API_KEY" \
        --conn-extra '{"model": "anthropic:claude-sonnet-4-20250514"}'

    # Then trigger the DAG from the UI or:
    #   airflow dags test demo_agent_sql
"""

from __future__ import annotations

import sqlite3
from pathlib import Path

from airflow.providers.common.ai.operators.agent import AgentOperator
from airflow.providers.common.ai.toolsets.sql import SQLToolset
from airflow.providers.common.compat.sdk import dag, task

DB_PATH = "/tmp/demo_agent.db"


def _init_demo_db():
    """Create a small demo database with customers and orders."""
    db = Path(DB_PATH)
    if db.exists():
        db.unlink()

    conn = sqlite3.connect(DB_PATH)
    cur = conn.cursor()

    cur.execute("""
        CREATE TABLE customers (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            city TEXT NOT NULL,
            tier TEXT NOT NULL
        )
    """)
    cur.executemany(
        "INSERT INTO customers (id, name, city, tier) VALUES (?, ?, ?, ?)",
        [
            (1, "Acme Corp", "San Francisco", "enterprise"),
            (2, "Globex", "New York", "enterprise"),
            (3, "Initech", "Austin", "mid-market"),
            (4, "Umbrella Corp", "Chicago", "enterprise"),
            (5, "Stark Industries", "Los Angeles", "enterprise"),
            (6, "Wayne Enterprises", "Gotham", "enterprise"),
            (7, "Pied Piper", "Palo Alto", "startup"),
            (8, "Hooli", "Mountain View", "enterprise"),
            (9, "Dunder Mifflin", "Scranton", "mid-market"),
            (10, "Prestige Worldwide", "Denver", "startup"),
        ],
    )

    cur.execute("""
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY,
            customer_id INTEGER NOT NULL REFERENCES customers(id),
            amount REAL NOT NULL,
            product TEXT NOT NULL,
            created_at TEXT NOT NULL
        )
    """)
    cur.executemany(
        "INSERT INTO orders (id, customer_id, amount, product, created_at) VALUES (?, ?, ?, ?, ?)",
        [
            (1, 1, 15000.00, "Platform License", "2025-01-15"),
            (2, 1, 3200.00, "Support Plan", "2025-02-01"),
            (3, 2, 42000.00, "Platform License", "2025-01-20"),
            (4, 3, 8500.00, "Team License", "2025-02-10"),
            (5, 4, 27000.00, "Platform License", "2025-01-25"),
            (6, 5, 55000.00, "Enterprise License", "2025-03-01"),
            (7, 2, 7800.00, "Support Plan", "2025-03-05"),
            (8, 6, 31000.00, "Platform License", "2025-03-10"),
            (9, 7, 2400.00, "Starter License", "2025-03-15"),
            (10, 8, 48000.00, "Enterprise License", "2025-03-20"),
            (11, 1, 5600.00, "Add-on Pack", "2025-04-01"),
            (12, 9, 6200.00, "Team License", "2025-04-05"),
            (13, 3, 9100.00, "Team License", "2025-04-10"),
            (14, 5, 12000.00, "Support Plan", "2025-04-15"),
            (15, 10, 1800.00, "Starter License", "2025-04-20"),
        ],
    )

    conn.commit()
    conn.close()
    print(f"Created demo database at {DB_PATH} with 10 customers and 15 orders")


@dag
def demo_agent_sql():
    @task
    def setup_db():
        """Ensure the demo database exists with fresh data."""
        _init_demo_db()

    analyze = AgentOperator(
        task_id="analyze_revenue",
        prompt=(
            "Which customers have spent the most in total? "
            "Show the top 5 with their total spend and number of orders. "
            "Also tell me which product category generates the most revenue."
        ),
        llm_conn_id="pydantic_ai_default",
        system_prompt=(
            "You are a sales analytics assistant. "
            "Use the available SQL tools to query the database and answer questions. "
            "Always start by listing tables and checking their schema before writing queries."
        ),
        toolsets=[
            SQLToolset(
                db_conn_id="sqlite_demo",
                allowed_tables=["customers", "orders"],
                max_rows=20,
            ),
        ],
    )

    @task
    def print_result(result):
        print("=" * 60)
        print("AGENT RESPONSE:")
        print("=" * 60)
        print(result)
        print("=" * 60)

    setup_db() >> analyze
    print_result(analyze.output)


demo_agent_sql()
INFO - DAG bundles loaded: dags-folder
INFO - Filling up the DagBag from /files/dags/demo_agent_sql.py
INFO - ============================================================
INFO - Done. Returned value was: None
INFO - AGENT RESPONSE:
INFO - ============================================================
INFO - ## Results Summary
INFO - 
INFO - ### Top 5 Customers by Total Spend:
INFO - 
INFO - 1. **Stark Industries** - $67,000.00 total spend (2 orders)
INFO - 2. **Globex** - $49,800.00 total spend (2 orders)
INFO - 3. **Hooli** - $48,000.00 total spend (1 order)
INFO - 4. **Wayne Enterprises** - $31,000.00 total spend (1 order)
INFO - 5. **Umbrella Corp** - $27,000.00 total spend (1 order)
INFO - 
INFO - ### Product Category Revenue Analysis:
INFO - 
INFO - The **Platform License** category generates the most revenue with $115,000.00 total revenue from 4 orders. Here's the complete ranking by revenue:
INFO - 
INFO - 1. **Platform License** - $115,000.00 (4 orders)
INFO - 2. **Enterprise License** - $103,000.00 (2 orders)
INFO - 3. **Team License** - $23,800.00 (3 orders)
INFO - 4. **Support Plan** - $23,000.00 (3 orders)
INFO - 5. **Add-on Pack** - $5,600.00 (1 order)
INFO - 6. **Starter License** - $4,200.00 (2 orders)
INFO - 
INFO - ### Key Insights:
INFO - - Stark Industries is your top customer, spending significantly more than others
INFO - - Platform License and Enterprise License are your highest-revenue products, together accounting for over $218,000 in revenue
INFO - - Some high-value customers like Hooli make large single purchases, while others like Stark Industries and Globex make multiple purchases
INFO - ============================================================

HookToolset: Generic adapter that exposes any Airflow Hook's methods
as pydantic-ai tools via introspection. Requires explicit
allowed_methods list (no auto-discovery). Builds JSON Schema from
method signatures and enriches tool descriptions from docstrings.

SQLToolset: Curated 4-tool database toolset (list_tables, get_schema,
query, check_query) wrapping DbApiHook. Read-only by default with SQL
validation, allowed_tables metadata filtering, and max_rows truncation.

Both implement pydantic-ai's AbstractToolset interface with
sequential=True on all tool definitions to prevent concurrent sync I/O.
kaxil added 3 commits March 3, 2026 12:44

- Fix mypy error: annotate result variable in SQLToolset._query. The list comprehension in the else branch produces list[list[Any]] while the if branch produces list[dict[str, Any]]; an explicit type annotation satisfies mypy.
- Add toolset/agentic/ctx to spelling wordlist. Sphinx autoapi generates RST from pydantic-ai's AbstractToolset base class docstrings; these words appear in the auto-generated docs and need to be in the global wordlist.
- Docs for HookToolset (generic hook→tools adapter) and SQLToolset (curated 4-tool DB toolset), including a defense layers table, the allowed_tables limitation, HookToolset guidelines, recommended configurations, and a production checklist.
@kaxil kaxil force-pushed the aip-99-toolsets branch from 1619202 to 9ad18e2 Compare March 3, 2026 17:39
@kaxil kaxil marked this pull request as ready for review March 3, 2026 19:30
@kaxil kaxil requested a review from gopidesupavan as a code owner March 3, 2026 19:30
Member

@gopidesupavan gopidesupavan left a comment


woohoo Great work kaxil.

Remove toolsets.rst from how-to-guide list in provider.yaml — the
validation script only recognizes operators/, sensors/, and transfer/
doc paths. The toolsets docs remain accessible via the index.rst toctree.

Add "hardcode" to the spelling wordlist.
@gopidesupavan
Member

Maybe we should add ModelRetry if the agent fails to call a tool or hits any errors?

@kaxil kaxil changed the title to Add HookToolset and SQLToolset for agentic LLM workflows Mar 3, 2026
@kaxil kaxil moved this from Backlog to In review in AIP-99 Common Data Access Pattern + AI Mar 3, 2026
@kaxil kaxil merged commit 73d0ee8 into apache:main Mar 3, 2026
124 checks passed
dominikhei pushed a commit to dominikhei/airflow that referenced this pull request Mar 11, 2026