83 changes: 82 additions & 1 deletion lib/crewai/src/crewai/task.py
@@ -41,6 +41,7 @@
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
@@ -527,16 +528,80 @@ def execute_async(
).start()
return future

@staticmethod
def _get_agent_token_usage(agent: BaseAgent | None) -> UsageMetrics:
"""Get current token usage from an agent's LLM.

Captures a snapshot of the agent's LLM token usage at the current moment.
This is used to calculate per-task token deltas for accurate tracking
when multiple tasks run concurrently.

Args:
agent: The agent to get token usage from.

Returns:
UsageMetrics with current token counts, or empty metrics if unavailable.
"""
if agent is None:
return UsageMetrics()

# Try to get usage from the agent's LLM (BaseLLM instances)
if hasattr(agent, "llm") and agent.llm is not None:
from crewai.llms.base_llm import BaseLLM

if isinstance(agent.llm, BaseLLM):
return agent.llm.get_token_usage_summary()

# Fallback for litellm-based agents
if hasattr(agent, "_token_process"):
return agent._token_process.get_summary()

return UsageMetrics()

@staticmethod
def _calculate_token_delta(before: UsageMetrics, after: UsageMetrics) -> UsageMetrics:
"""Calculate the token usage delta between two snapshots.

Args:
before: Token usage snapshot before task execution.
after: Token usage snapshot after task execution.

Returns:
UsageMetrics containing the difference (tokens used during task).
"""
return UsageMetrics(
total_tokens=after.total_tokens - before.total_tokens,
prompt_tokens=after.prompt_tokens - before.prompt_tokens,
cached_prompt_tokens=after.cached_prompt_tokens - before.cached_prompt_tokens,
completion_tokens=after.completion_tokens - before.completion_tokens,
successful_requests=after.successful_requests - before.successful_requests,
)
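
As a quick illustration of how these two helpers compose (a reviewer sketch, not part of this diff; it assumes the staticmethods live on the Task class in task.py and that agent is any BaseAgent whose LLM records usage):

# Reviewer sketch, not part of the diff: snapshot, do the work, then diff.
before = Task._get_agent_token_usage(agent)          # snapshot before any calls
# ... the agent performs some LLM-backed work here ...
after = Task._get_agent_token_usage(agent)           # snapshot after the calls
delta = Task._calculate_token_delta(before, after)   # tokens used by this work only
print(delta.total_tokens, delta.successful_requests)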

def _execute_task_async(
self,
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
"""Execute the task asynchronously with context handling.

This method captures token usage before and after task execution within
the thread to ensure accurate per-task token tracking even when multiple
async tasks run concurrently.
"""
try:
# Capture token usage BEFORE execution within the thread
tokens_before = self._get_agent_token_usage(agent or self.agent)

result = self._execute_core(agent, context, tools)

# Capture token usage AFTER execution within the thread
tokens_after = self._get_agent_token_usage(agent or self.agent)

# Calculate and store the delta in the result
result.token_usage = self._calculate_token_delta(tokens_before, tokens_after)
Review comment: Redundant token tracking in async execution path (Low Severity)

Token tracking happens twice when _execute_task_async calls _execute_core. Both methods capture tokens_before and tokens_after, and both set token_usage. Since _execute_core returns a TaskOutput with token_usage already set, the subsequent overwrite in _execute_task_async is redundant. Both run in the same thread, so the "within thread" rationale doesn't justify the duplication.

Additional Locations (1)

future.set_result(result)
except Exception as e:
future.set_exception(e)
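
One way to resolve the duplication flagged above, shown as a reviewer sketch rather than the repository's actual change, is to drop the extra snapshot/delta pass here and rely on the token_usage that _execute_core already attaches to its TaskOutput:

def _execute_task_async(
    self,
    agent: BaseAgent | None,
    context: str | None,
    tools: list[Any] | None,
    future: Future[TaskOutput],
) -> None:
    """Execute the task asynchronously with context handling."""
    try:
        # _execute_core runs in this same thread and already captures its own
        # before/after token snapshots, so no second delta is computed here.
        result = self._execute_core(agent, context, tools)
        future.set_result(result)
    except Exception as e:
        future.set_exception(e)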
@@ -568,6 +633,9 @@ async def _aexecute_core(

self.start_time = datetime.datetime.now()

# Capture token usage before execution for accurate per-task tracking
tokens_before = self._get_agent_token_usage(agent)

self.prompt_context = context
tools = tools or self.tools or []

@@ -579,6 +647,10 @@
tools=tools,
)

# Capture token usage after execution
tokens_after = self._get_agent_token_usage(agent)
token_delta = self._calculate_token_delta(tokens_before, tokens_after)

if not self._guardrails and not self._guardrail:
pydantic_output, json_output = self._export_output(result)
else:
@@ -594,6 +666,7 @@
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
token_usage=token_delta,
)

if self._guardrails:
@@ -663,6 +736,9 @@ def _execute_core(

self.start_time = datetime.datetime.now()

# Capture token usage before execution for accurate per-task tracking
tokens_before = self._get_agent_token_usage(agent)

self.prompt_context = context
tools = tools or self.tools or []

@@ -674,6 +750,10 @@
tools=tools,
)

# Capture token usage after execution
tokens_after = self._get_agent_token_usage(agent)
token_delta = self._calculate_token_delta(tokens_before, tokens_after)

Review comment: Token capture misses output conversion LLM calls (Medium Severity)

The tokens_after snapshot is captured BEFORE _export_output(result) is called, but _export_output can make LLM calls through the Converter class when converting task output to Pydantic/JSON models. When the raw result isn't valid JSON and needs LLM-assisted conversion, those additional LLM calls consume tokens that are not included in the task's token_usage. The token capture needs to happen after output conversion completes.

Additional Locations (1)
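
A possible reordering, sketched below (the agent.execute_task call shape is assumed from upstream crewai, and the guardrail branch is omitted for brevity), is to take the second snapshot only after output conversion has finished:

# Reviewer sketch, not part of the diff: capture AFTER _export_output so any
# Converter-driven LLM calls are counted in this task's delta.
result = agent.execute_task(task=self, context=context, tools=tools)

pydantic_output, json_output = self._export_output(result)  # may call the LLM via Converter

tokens_after = self._get_agent_token_usage(agent)
token_delta = self._calculate_token_delta(tokens_before, tokens_after)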

if not self._guardrails and not self._guardrail:
pydantic_output, json_output = self._export_output(result)
else:
@@ -689,6 +769,7 @@
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
token_usage=token_delta,
Review comment: Token usage lost when guardrails trigger task retry (Medium Severity)

The token_usage tracking is incomplete. When _execute_core or _aexecute_core creates a TaskOutput with token_usage=token_delta, this value is lost if a guardrail fails and triggers a retry. The guardrail retry paths (_invoke_guardrail_function and _ainvoke_guardrail_function) create new TaskOutput objects without passing token_usage, resulting in token_usage=None for any task that goes through guardrail retry.

Additional Locations (1)
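
One hedged way to close this gap, using hypothetical helper names since the guardrail retry internals are not shown in this diff, is to re-attach the measured delta to whatever TaskOutput the retry path ultimately returns:

# Reviewer sketch with hypothetical names, not part of the diff.
def _with_token_usage(output: TaskOutput, delta: UsageMetrics) -> TaskOutput:
    # Keep any usage the retry path already recorded; otherwise attach this task's delta.
    if output.token_usage is None:
        output.token_usage = delta
    return output

# e.g. once the guardrail loop settles on a final output:
# task_output = _with_token_usage(task_output, token_delta)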

)

if self._guardrails:
13 changes: 12 additions & 1 deletion lib/crewai/src/crewai/tasks/task_output.py
@@ -1,14 +1,20 @@
"""Task output representation and formatting."""

from __future__ import annotations

import json
from typing import Any
from typing import TYPE_CHECKING, Any

from pydantic import BaseModel, Field, model_validator

from crewai.tasks.output_format import OutputFormat
from crewai.utilities.types import LLMMessage


if TYPE_CHECKING:
from crewai.types.usage_metrics import UsageMetrics


class TaskOutput(BaseModel):
"""Class that represents the result of a task.

@@ -22,6 +28,7 @@ class TaskOutput(BaseModel):
json_dict: JSON dictionary output of the task
agent: Agent that executed the task
output_format: Output format of the task (JSON, PYDANTIC, or RAW)
token_usage: Token usage metrics for this specific task execution
"""

description: str = Field(description="Description of the task")
@@ -42,6 +49,10 @@
description="Output format of the task", default=OutputFormat.RAW
)
messages: list[LLMMessage] = Field(description="Messages of the task", default=[])
token_usage: UsageMetrics | None = Field(
description="Token usage metrics for this specific task execution",
default=None,
)

@model_validator(mode="after")
def set_summary(self):
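
With the new field in place, downstream code could read per-task usage roughly as follows (a sketch; how the TaskOutput instance is obtained is left open):

# Reviewer sketch, illustration only: inspecting per-task token metrics.
output: TaskOutput = ...  # e.g. the TaskOutput returned by a task execution
if output.token_usage is not None:
    usage = output.token_usage
    print(
        f"prompt={usage.prompt_tokens} "
        f"completion={usage.completion_tokens} "
        f"total={usage.total_tokens}"
    )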