Skip to content

New decorator hooks#2969

Open
romain-intel wants to merge 5 commits intomasterfrom
feat/new_decorators
Open

New decorator hooks#2969
romain-intel wants to merge 5 commits intomasterfrom
feat/new_decorators

Conversation

@romain-intel
Copy link
Contributor

PR Type

  • Bug fix
  • [ X] New feature
  • [ X] Core Runtime change (higher bar -- see CONTRIBUTING.md)
  • Docs / tooling
  • [ X] Refactoring

Summary

Clean up the decorator hooks to use a system context instead of the arguments passed to the decorators. Also make explciit the phase of the runtime the code executing is in. All previous hooks continue working.

Issue

Fixes #

Reproduction

Runtime:

Commands to run:

# paste exact commands

Where evidence shows up:

Before (error / log snippet)
paste here
After (evidence that fix works)
paste here

Root Cause

Why This Fix Is Correct

Failure Modes Considered

Tests

  • Unit tests added/updated
  • Reproduction script provided (required for Core Runtime)
  • CI passes
  • If tests are impractical: explain why below and provide manual evidence above

Non-Goals

AI Tool Usage

  • No AI tools were used in this contribution
  • [ X] AI tools were used (describe below)

Guided Claude for the implementation. This is a WIP to allow review of the direction at this time. Proper code review is forthcoming.

romain-intel and others added 5 commits March 11, 2026 18:09
… data

Introduce a per-process SystemContext singleton that gives decorators
access to execution phase (LAUNCH/TRAMPOLINE/TASK) and infrastructure
data (flow, graph, run_id, task_id, etc.) without passing them as
positional arguments. The runtime progressively populates the context
as information becomes available throughout the execution lifecycle.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sced hook

Add _ctx variant class attributes (all None by default) to StepDecorator
and FlowDecorator. When overridden in subclasses, these take precedence
over legacy hooks and receive context via self.system_ctx instead of
positional arguments. The task_step_completed hook coalesces
task_post_step and task_exception into a single method.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion

Add shared state methods (publish, get_published, has_published,
get_all_published) scoped per step for inter-decorator communication,
and register_step_decorators/get_step_decorators for decorator
discovery. Wire step_name updates and registration into
_init_step_decorators and _process_late_attached_decorator.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Make @timeout publish its seconds value and @resources publish its
attributes via system_ctx during step_init. Update aws_utils
compute_resource_attributes to read from system_context first,
falling back to decorator iteration for backward compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@romain-intel romain-intel marked this pull request as ready for review March 12, 2026 17:21
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 12, 2026

Greptile Summary

This PR introduces a SystemContext singleton (system_context) and an ExecutionPhase enum to give decorators structured, context-aware access to runtime state instead of receiving it as positional arguments on every hook call. It also adds a dual-dispatch mechanism so that existing decorator hooks continue to work unchanged while new-style _ctx variants (e.g. step_init_ctx, task_pre_step_ctx) can opt in to reading from system_ctx directly.

Key changes:

  • New metaflow/system_context.py with SystemContext (progressive-update singleton) and ExecutionPhase (LAUNCH / TRAMPOLINE / TASK).
  • Decorator base class gains a system_ctx property; FlowDecorator and StepDecorator declare all new-style hook class-attributes (defaulting to None).
  • cli.py populates the singleton at process start; runtime.py and task.py enrich it at each lifecycle stage and dispatch to _ctx variants when defined.
  • TRAMPOLINE_CLIS_DESC is introduced to separate compute-backend CLIs (batch, kubernetes) from regular CLIs, enabling accurate phase detection.
  • @resources and @timeout are updated to publish their configuration via system_ctx.publish() so other decorators can read it without scanning the decorator list.
  • A comprehensive unit test suite for the new module is added.

Issues found:

  • runtime_task_created_ctx receives an incomplete context: task_id and split_index are available in the local scope but not written to system_context before the hook fires; is_cloned has no corresponding field in SystemContext at all, so it is silently dropped when migrating to the new API.
  • Race condition in LAUNCH phase: system_context is a process-level singleton, and NativeRuntime can launch up to 16 concurrent workers that all mutate it (retry count, task datastore, etc.) before calling decorator hooks. Decorators in the LAUNCH phase may observe another task's values.
  • task_step_completed breaks the naming convention: All other new-style hooks are suffixed _ctx; this one is not, making it harder to enumerate or document the new API surface consistently.
  • Step-scoping hazard in get_run_time_limit_for_task: The helper reads system_context.get_published("timeout", "seconds") which is scoped to the current system_context._step_name. In the LAUNCH process this may have advanced to a different step by the time the helper is called, causing a silent fallback to DEFAULT_RUNTIME_LIMIT.

Confidence Score: 2/5

  • Not safe to merge — there are multiple logic issues including a race condition in the LAUNCH phase and missing context fields that would silently break new-style hook implementations.
  • The design direction is sound and backward compatibility is preserved, but two runtime correctness issues lower confidence significantly: the concurrent-worker race condition on the singleton in the LAUNCH phase, and the incomplete context update before runtime_task_created_ctx (missing task_id, split_index, and is_cloned). The naming inconsistency for task_step_completed and the step-scoping hazard in get_run_time_limit_for_task add further concerns. The PR author already flags this as a WIP for direction review, which is appropriate given these open issues.
  • metaflow/runtime.py (race condition + missing context fields) and metaflow/system_context.py (missing is_cloned field) need the most attention before this can be considered production-ready.

Important Files Changed

Filename Overview
metaflow/system_context.py New singleton SystemContext class with ExecutionPhase enum — well-structured, but missing is_cloned field needed to fully replace the runtime_task_created hook's argument surface.
metaflow/runtime.py Dispatches to _ctx hook variants and updates the singleton with per-task data; has two serious issues: task_id/split_index missing from the context update before runtime_task_created_ctx, and concurrent workers mutating the same singleton create a race condition.
metaflow/task.py Correctly updates the system context singleton with all task-level fields before task hooks fire; the merge of task_post_step/task_exception into task_step_completed is handled cleanly.
metaflow/decorators.py Adds system_ctx property to the base Decorator class and declares new-style hook class attributes; task_step_completed breaks the consistent _ctx naming pattern used by all other new-style hooks.
metaflow/cli.py Populates the system context singleton in the start command with phase, flow, graph, environment, and other flow-level objects; straightforward and correct.
metaflow/extension_support/plugins.py Introduces _trampoline_cli_names tracking and merges TRAMPOLINE_CLIS_DESC entries into the CLI pipeline; logic is consistent for both core and extension plugins.
metaflow/plugins/init.py Moves batch and kubernetes from CLIS_DESC into new TRAMPOLINE_CLIS_DESC so they can be tracked as trampoline backends; straightforward separation of concerns.
metaflow/plugins/timeout_decorator.py Uses system_ctx.publish in step_init and adds get_run_time_limit_for_task; the step-scoped lookup in the new helper may resolve the wrong step's timeout when called from the LAUNCH phase after multiple steps have been initialized.
test/unit/test_system_context.py Comprehensive unit tests covering phase detection, progressive updates, shared state isolation, decorator registration, and _ctx variant dispatch; good coverage of the singleton's API surface.

Sequence Diagram

sequenceDiagram
    participant CLI as cli.py (start)
    participant SC as system_context singleton
    participant D as Decorator hooks
    participant RT as runtime.py (NativeRuntime)
    participant TK as task.py (MetaflowTask)

    CLI->>SC: _update(phase, flow, graph, environment, flow_datastore, metadata, logger)
    CLI->>D: _init_flow_decorators → flow_init_ctx OR flow_init(...)
    CLI->>D: _init_step_decorators → step_init_ctx OR step_init(...)

    RT->>SC: _update(package, run_id)
    RT->>D: runtime_init_ctx OR runtime_init(flow, graph, package, run_id)

    RT->>SC: _update(ubf_context, task_datastore, input_paths)
    Note over RT,SC: ⚠ task_id / split_index / is_cloned NOT set
    RT->>D: runtime_task_created_ctx OR runtime_task_created(...)

    RT->>SC: _update(retry_count, max_user_code_retries)
    RT->>D: runtime_step_cli_ctx(cli_args) OR runtime_step_cli(...)

    TK->>SC: _update(step_name, run_id, task_id, task_datastore, retry_count, ...)
    TK->>D: task_pre_step_ctx OR task_pre_step(...)
    TK->>D: task_decorate_ctx(step_func) OR task_decorate(...)
    TK->>D: task_step_completed() OR task_post_step(...)
    TK->>D: task_step_completed(exception) OR task_exception(...)
    TK->>D: task_finished_ctx(is_task_ok) OR task_finished(...)

    RT->>D: runtime_finished_ctx(exception) OR runtime_finished(exception)
Loading

Comments Outside Diff (2)

  1. metaflow/runtime.py, line 2260-2282 (link)

    Singleton mutation in concurrent worker context is a race condition

    system_context is a process-level singleton. In the LAUNCH phase the NativeRuntime can have up to MAX_WORKERS=16 concurrent workers, and each Worker._launch() now mutates the singleton:

    system_context._update(
        retry_count=self.task.retries,
        max_user_code_retries=self.task.user_code_retries,
    )
    for deco in self.task.decos:
        if deco.runtime_step_cli_ctx is not None:
            deco.runtime_step_cli_ctx(args)

    If two workers run concurrently (e.g. Worker A updates retry_count=0 for task-1, then Worker B updates retry_count=2 for task-2 before Worker A's hook executes), the decorator for task-1 will see task-2's retry count.

    The same race exists for the system_context._update(...) in Task.__init__ (which sets ubf_context, task_datastore, input_paths).

    Using a singleton for per-task state that is mutated concurrently is fundamentally unsafe. Options to consider:

    • Use a thread-local or task-local context rather than a global singleton for the per-task fields updated in the LAUNCH phase.
    • Document that these fields are only safe to use in the TASK phase (where each task is its own subprocess with its own singleton).
  2. metaflow/plugins/timeout_decorator.py, line 104-116 (link)

    get_run_time_limit_for_task step-scoping issue

    system_context.get_published("timeout", "seconds") reads the published timeout from system_context._step_name. In the LAUNCH phase during _init_step_decorators, system_context._step_name is updated per-step just before each step's step_init hooks fire. However, when get_run_time_limit_for_task is called inside Worker._launch() (from the LAUNCH phase), system_context._step_name may already have been advanced to a different step if multiple steps have been initialized since the original step_init call.

    This means the lookup system_context.get_published("timeout", "seconds") could return None (wrong step name) or the timeout for a different step, silently falling back to DEFAULT_RUNTIME_LIMIT.

    step_decos is already passed in as the fallback path, so consider either:

    1. Passing the step name explicitly to this function and scoping the lookup, or
    2. Relying solely on the decorator-iteration fallback until the step-scoping behavior is verified.
    def get_run_time_limit_for_task(step_decos, step_name=None):
        from metaflow.system_context import system_context
        timeout = system_context.get_published("timeout", "seconds", step_name)
        ...

Last reviewed commit: aa72c1a

Comment on lines 1799 to +1810
# Open the output datastore only if the task is not being cloned.
if not self._is_cloned:
self.new_attempt()
from .system_context import system_context

system_context._update(
ubf_context=ubf_context,
task_datastore=self._ds,
input_paths=input_paths,
)
for deco in decos:
deco.runtime_task_created(
self._ds,
task_id,
split_index,
input_paths,
self._is_cloned,
ubf_context,
)
if deco.runtime_task_created_ctx is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task_id and split_index missing from system_context before runtime_task_created_ctx

Before dispatching to runtime_task_created_ctx, the system context is updated with ubf_context, task_datastore, and input_paths — but task_id and split_index are available in the local scope yet never written to system_context. Any decorator implementing runtime_task_created_ctx would call self.system_ctx.task_id and get None, silently breaking logic that depended on those values.

The legacy hook receives all six arguments:

deco.runtime_task_created(self._ds, task_id, split_index, input_paths, self._is_cloned, ubf_context)

but the context update only covers three of them:

system_context._update(
    ubf_context=ubf_context,
    task_datastore=self._ds,
    input_paths=input_paths,
    # task_id and split_index are missing!
)

The fix is to include the missing fields:

system_context._update(
    task_id=task_id,
    split_index=split_index,
    ubf_context=ubf_context,
    task_datastore=self._ds,
    input_paths=input_paths,
)

Note that is_cloned is also not representable at all — SystemContext has no _is_cloned attribute — so decorators using the new-style hook lose access to that flag entirely.

Comment on lines +97 to +123
def __init__(self):
self._phase = None
# Flow-level — available after flow_init / step_init
self._flow = None
self._graph = None
self._environment = None
self._flow_datastore = None
self._metadata = None
self._logger = None
# Runtime-level — available after runtime_init (LAUNCH phase)
self._package = None
self._run_id = None
# Runtime-task-level — available after runtime_task_created (LAUNCH phase)
self._input_paths = None
# Task-level — available during task_* hooks (TASK phase)
self._task_id = None
self._task_datastore = None
self._retry_count = None
self._max_user_code_retries = None
self._ubf_context = None
self._inputs = None
self._split_index = None
# Step
self._step_name = None
# Inter-decorator shared state (keyed by step_name)
self._shared = {} # { step_name: { namespace: { key: value } } }
self._step_decorators = {} # { step_name: [StepDecorator, ...] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_cloned has no representation in SystemContext

The legacy runtime_task_created hook exposes an is_cloned flag that indicates whether the task is being resumed from a prior run (cloned). This flag is used by several decorators to skip work that was already done. When a decorator migrates to the new runtime_task_created_ctx API it has no way to retrieve this information, because SystemContext has no corresponding field.

Every other argument from runtime_task_created has a slot (_task_datastore, _input_paths, _ubf_context…) except is_cloned. This should either be added as a field here or documented clearly as a known omission.

# In SystemContext.__init__:
self._is_cloned = None  # populated before runtime_task_created_ctx fires

And update _update() accordingly (it already validates attribute names, so no extra change needed there).

Comment on lines +362 to +370
step_init_ctx = None
runtime_init_ctx = None
runtime_task_created_ctx = None
runtime_step_cli_ctx = None
runtime_finished_ctx = None
task_pre_step_ctx = None
task_decorate_ctx = None
task_step_completed = None # coalesces task_post_step + task_exception
task_finished_ctx = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task_step_completed breaks the _ctx naming convention

Every other new-style hook variant is consistently suffixed with _ctx:

  • step_init_ctx
  • runtime_init_ctx
  • runtime_task_created_ctx
  • runtime_step_cli_ctx
  • runtime_finished_ctx
  • task_pre_step_ctx
  • task_decorate_ctx
  • task_finished_ctx

The coalesced post/exception hook is named task_step_completed without the _ctx suffix. This breaks the naming pattern that decorators and tooling would naturally rely on to enumerate all new-style hooks, and makes it harder to distinguish at a glance whether this is a legacy hook override or a new-style one.

Consider renaming to task_step_completed_ctx for consistency:

Suggested change
step_init_ctx = None
runtime_init_ctx = None
runtime_task_created_ctx = None
runtime_step_cli_ctx = None
runtime_finished_ctx = None
task_pre_step_ctx = None
task_decorate_ctx = None
task_step_completed = None # coalesces task_post_step + task_exception
task_finished_ctx = None
step_init_ctx = None
runtime_init_ctx = None
runtime_task_created_ctx = None
runtime_step_cli_ctx = None
runtime_finished_ctx = None
task_pre_step_ctx = None
task_decorate_ctx = None
task_step_completed_ctx = None # coalesces task_post_step + task_exception
task_finished_ctx = None

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant