Conversation
… data Introduce a per-process SystemContext singleton that gives decorators access to execution phase (LAUNCH/TRAMPOLINE/TASK) and infrastructure data (flow, graph, run_id, task_id, etc.) without passing them as positional arguments. The runtime progressively populates the context as information becomes available throughout the execution lifecycle. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sced hook Add _ctx variant class attributes (all None by default) to StepDecorator and FlowDecorator. When overridden in subclasses, these take precedence over legacy hooks and receive context via self.system_ctx instead of positional arguments. The task_step_completed hook coalesces task_post_step and task_exception into a single method. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion Add shared state methods (publish, get_published, has_published, get_all_published) scoped per step for inter-decorator communication, and register_step_decorators/get_step_decorators for decorator discovery. Wire step_name updates and registration into _init_step_decorators and _process_late_attached_decorator. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Make @timeout publish its seconds value and @resources publish its attributes via system_ctx during step_init. Update aws_utils compute_resource_attributes to read from system_context first, falling back to decorator iteration for backward compatibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
93aedb0 to
aa72c1a
Compare
Greptile SummaryThis PR introduces a Key changes:
Issues found:
Confidence Score: 2/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant CLI as cli.py (start)
participant SC as system_context singleton
participant D as Decorator hooks
participant RT as runtime.py (NativeRuntime)
participant TK as task.py (MetaflowTask)
CLI->>SC: _update(phase, flow, graph, environment, flow_datastore, metadata, logger)
CLI->>D: _init_flow_decorators → flow_init_ctx OR flow_init(...)
CLI->>D: _init_step_decorators → step_init_ctx OR step_init(...)
RT->>SC: _update(package, run_id)
RT->>D: runtime_init_ctx OR runtime_init(flow, graph, package, run_id)
RT->>SC: _update(ubf_context, task_datastore, input_paths)
Note over RT,SC: ⚠ task_id / split_index / is_cloned NOT set
RT->>D: runtime_task_created_ctx OR runtime_task_created(...)
RT->>SC: _update(retry_count, max_user_code_retries)
RT->>D: runtime_step_cli_ctx(cli_args) OR runtime_step_cli(...)
TK->>SC: _update(step_name, run_id, task_id, task_datastore, retry_count, ...)
TK->>D: task_pre_step_ctx OR task_pre_step(...)
TK->>D: task_decorate_ctx(step_func) OR task_decorate(...)
TK->>D: task_step_completed() OR task_post_step(...)
TK->>D: task_step_completed(exception) OR task_exception(...)
TK->>D: task_finished_ctx(is_task_ok) OR task_finished(...)
RT->>D: runtime_finished_ctx(exception) OR runtime_finished(exception)
|
| # Open the output datastore only if the task is not being cloned. | ||
| if not self._is_cloned: | ||
| self.new_attempt() | ||
| from .system_context import system_context | ||
|
|
||
| system_context._update( | ||
| ubf_context=ubf_context, | ||
| task_datastore=self._ds, | ||
| input_paths=input_paths, | ||
| ) | ||
| for deco in decos: | ||
| deco.runtime_task_created( | ||
| self._ds, | ||
| task_id, | ||
| split_index, | ||
| input_paths, | ||
| self._is_cloned, | ||
| ubf_context, | ||
| ) | ||
| if deco.runtime_task_created_ctx is not None: |
There was a problem hiding this comment.
task_id and split_index missing from system_context before runtime_task_created_ctx
Before dispatching to runtime_task_created_ctx, the system context is updated with ubf_context, task_datastore, and input_paths — but task_id and split_index are available in the local scope yet never written to system_context. Any decorator implementing runtime_task_created_ctx would call self.system_ctx.task_id and get None, silently breaking logic that depended on those values.
The legacy hook receives all six arguments:
deco.runtime_task_created(self._ds, task_id, split_index, input_paths, self._is_cloned, ubf_context)but the context update only covers three of them:
system_context._update(
ubf_context=ubf_context,
task_datastore=self._ds,
input_paths=input_paths,
# task_id and split_index are missing!
)The fix is to include the missing fields:
system_context._update(
task_id=task_id,
split_index=split_index,
ubf_context=ubf_context,
task_datastore=self._ds,
input_paths=input_paths,
)
Note that is_cloned is also not representable at all — SystemContext has no _is_cloned attribute — so decorators using the new-style hook lose access to that flag entirely.
| def __init__(self): | ||
| self._phase = None | ||
| # Flow-level — available after flow_init / step_init | ||
| self._flow = None | ||
| self._graph = None | ||
| self._environment = None | ||
| self._flow_datastore = None | ||
| self._metadata = None | ||
| self._logger = None | ||
| # Runtime-level — available after runtime_init (LAUNCH phase) | ||
| self._package = None | ||
| self._run_id = None | ||
| # Runtime-task-level — available after runtime_task_created (LAUNCH phase) | ||
| self._input_paths = None | ||
| # Task-level — available during task_* hooks (TASK phase) | ||
| self._task_id = None | ||
| self._task_datastore = None | ||
| self._retry_count = None | ||
| self._max_user_code_retries = None | ||
| self._ubf_context = None | ||
| self._inputs = None | ||
| self._split_index = None | ||
| # Step | ||
| self._step_name = None | ||
| # Inter-decorator shared state (keyed by step_name) | ||
| self._shared = {} # { step_name: { namespace: { key: value } } } | ||
| self._step_decorators = {} # { step_name: [StepDecorator, ...] } |
There was a problem hiding this comment.
is_cloned has no representation in SystemContext
The legacy runtime_task_created hook exposes an is_cloned flag that indicates whether the task is being resumed from a prior run (cloned). This flag is used by several decorators to skip work that was already done. When a decorator migrates to the new runtime_task_created_ctx API it has no way to retrieve this information, because SystemContext has no corresponding field.
Every other argument from runtime_task_created has a slot (_task_datastore, _input_paths, _ubf_context…) except is_cloned. This should either be added as a field here or documented clearly as a known omission.
# In SystemContext.__init__:
self._is_cloned = None # populated before runtime_task_created_ctx firesAnd update _update() accordingly (it already validates attribute names, so no extra change needed there).
| step_init_ctx = None | ||
| runtime_init_ctx = None | ||
| runtime_task_created_ctx = None | ||
| runtime_step_cli_ctx = None | ||
| runtime_finished_ctx = None | ||
| task_pre_step_ctx = None | ||
| task_decorate_ctx = None | ||
| task_step_completed = None # coalesces task_post_step + task_exception | ||
| task_finished_ctx = None |
There was a problem hiding this comment.
task_step_completed breaks the _ctx naming convention
Every other new-style hook variant is consistently suffixed with _ctx:
step_init_ctxruntime_init_ctxruntime_task_created_ctxruntime_step_cli_ctxruntime_finished_ctxtask_pre_step_ctxtask_decorate_ctxtask_finished_ctx
The coalesced post/exception hook is named task_step_completed without the _ctx suffix. This breaks the naming pattern that decorators and tooling would naturally rely on to enumerate all new-style hooks, and makes it harder to distinguish at a glance whether this is a legacy hook override or a new-style one.
Consider renaming to task_step_completed_ctx for consistency:
| step_init_ctx = None | |
| runtime_init_ctx = None | |
| runtime_task_created_ctx = None | |
| runtime_step_cli_ctx = None | |
| runtime_finished_ctx = None | |
| task_pre_step_ctx = None | |
| task_decorate_ctx = None | |
| task_step_completed = None # coalesces task_post_step + task_exception | |
| task_finished_ctx = None | |
| step_init_ctx = None | |
| runtime_init_ctx = None | |
| runtime_task_created_ctx = None | |
| runtime_step_cli_ctx = None | |
| runtime_finished_ctx = None | |
| task_pre_step_ctx = None | |
| task_decorate_ctx = None | |
| task_step_completed_ctx = None # coalesces task_post_step + task_exception | |
| task_finished_ctx = None |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
PR Type
Summary
Clean up the decorator hooks to use a system context instead of the arguments passed to the decorators. Also make explciit the phase of the runtime the code executing is in. All previous hooks continue working.
Issue
Fixes #
Reproduction
Runtime:
Commands to run:
# paste exact commandsWhere evidence shows up:
Before (error / log snippet)
After (evidence that fix works)
Root Cause
Why This Fix Is Correct
Failure Modes Considered
Tests
Non-Goals
AI Tool Usage
Guided Claude for the implementation. This is a WIP to allow review of the direction at this time. Proper code review is forthcoming.