PollComponent and PollActivityExecution #8563
PollComponent and PollActivityExecution #8563dandavison merged 149 commits intostandalone-activityfrom
Conversation
cb5f326 to
21cdb37
Compare
ded2e1b to
efc4722
Compare
3281080 to
9c65e47
Compare
ce95cbe to
29a0bd9
Compare
29a0bd9 to
33e981e
Compare
40e0b6b to
28665bb
Compare
| ref, err := e.predicateSatisfied(ctx, monotonicPredicate, requestRef, executionLease) | ||
| if err != nil { | ||
| if errors.Is(err, consts.ErrStaleState) { | ||
| err = serviceerror.NewUnavailable("please retry") |
There was a problem hiding this comment.
I need a better error message here, but I haven't decided what.
"request data ahead of persisted data"
?
Also, should it be PollActivityExecution that converts this to serviceerror?
|
|
||
| var ( | ||
| LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting( | ||
| "chasm.activity.longPollTimeout", |
There was a problem hiding this comment.
Not blocking this PR but I don't think putting chasm. as a prefix for user configs is good practice. We will likely want to change this across all libraries though.
bergundy
left a comment
There was a problem hiding this comment.
I didn't review the tests very closely.
There are still come open comments, please address before merging but I do not feel like I need another pass here.
chasm/lib/activity/handler.go
Outdated
| // TODO(dan): include execution key in error message; we may do this at the CHASM | ||
| // framework level. |
There was a problem hiding this comment.
TBH I don't think this is needed. Not blocking the PR.
There was a problem hiding this comment.
I've removed the comment
| if len(token) == 0 { | ||
| return chasm.ReadComponent(ctx, ref, (*Activity).buildPollActivityExecutionResponse, req, nil) | ||
| } |
There was a problem hiding this comment.
This seems more like an invalid argument to me.
There was a problem hiding this comment.
OK, we will address when we split the API
service/history/chasm_notifier.go
Outdated
| type ( | ||
| // ChasmNotifier allows subscribers to receive notifications relating to a CHASM execution. | ||
| ChasmNotifier struct { | ||
| executions map[chasm.EntityKey]*subscriptionTracker |
There was a problem hiding this comment.
Not blocking, but maybe put a TODO here to use the sharded map which will guarantee less lock contention?
There was a problem hiding this comment.
Done, and internally tracking need for an audit of TODOs in code
service/history/history_engine.go
Outdated
| e.eventNotifier.NotifyNewHistoryEvent(notification) | ||
| } | ||
|
|
||
| func (e *historyEngineImpl) ChasmEngine() chasm.Engine { |
There was a problem hiding this comment.
Is this called anywhere? Maybe I missed it?
tests/standalone_activity_test.go
Outdated
| ) | ||
|
|
||
| var ( | ||
| defaultInput = &commonpb.Payloads{ |
There was a problem hiding this comment.
I mentioned this before I believe, use payloads.EncodeString() from common
There was a problem hiding this comment.
Done (not specifically related to this PR so it may conflict, but good to do now in case we forget)
tests/standalone_activity_test.go
Outdated
|
|
||
| func (s *standaloneActivityTestSuite) SetupSuite() { | ||
| s.FunctionalTestBase.SetupSuite() | ||
| s.tv = testvars.New(s.T()) |
There was a problem hiding this comment.
I think this needs to be done in SetupTest
| } | ||
| input := createDefaultInput() | ||
| taskQueue := uuid.New().String() | ||
| taskQueue := testcore.RandomizeStr(t.Name()) |
There was a problem hiding this comment.
If you're already using testvars, might as well fully use it. IMHO that utility doesn't give us much but leaving it up to you.
There was a problem hiding this comment.
I've switched them over
Co-authored-by: Roey Berman <roey@temporal.io>
Co-authored-by: Roey Berman <roey@temporal.io>
Co-authored-by: Roey Berman <roey@temporal.io>
Co-authored-by: Roey Berman <roey@temporal.io>
yycptt
left a comment
There was a problem hiding this comment.
Also need to change all EntityKey -> ExecutionKey
chasm/ref.go
Outdated
| } | ||
| var pRef persistencespb.ChasmComponentRef | ||
| if err := pRef.Unmarshal(data); err != nil { | ||
| return ComponentRef{}, err |
chasm/ref.go
Outdated
| // ErrMalformedComponentRef is returned when component ref bytes cannot be deserialized. | ||
| var ErrMalformedComponentRef = errors.New("malformed component ref") | ||
|
|
||
| // ErrInvalidComponentRef is returned when component ref bytes deserialize to an invalid component ref. | ||
| var ErrInvalidComponentRef = errors.New("invalid component ref") |
There was a problem hiding this comment.
I'd return invalidRequest here unless we are sure all api handlers have proper error conversion logic.
service/history/interfaces/engine.go
Outdated
|
|
||
| NotifyNewHistoryEvent(event *events.Notification) | ||
| NotifyNewTasks(tasks map[tasks.Category][]tasks.Task) | ||
| ChasmEngine() chasm.Engine |
| // Notify for current workflow if it has CHASM updates | ||
| if len(currentWorkflowMutation.UpsertChasmNodes) > 0 || | ||
| len(currentWorkflowMutation.DeleteChasmNodes) > 0 { | ||
| engine.NotifyChasmExecution(chasm.EntityKey{ |
There was a problem hiding this comment.
let's do it in ConflictResolveExecution as well. Create execution is probably fine, I guess there won't be any poller before execution is created.
There was a problem hiding this comment.
hmm I haven't think through if it needs to be in OperationPossiblySucceeded. Can you elaborate your thoughts a bit here.
There was a problem hiding this comment.
Thanks, let's address this in one more PR to target standalone-activity. It will all arrive in main at the same time.
| if ref != nil { | ||
| return ref, nil | ||
| } | ||
| case <-ctx.Done(): |
There was a problem hiding this comment.
do we have some tail room here to return an empty response and avoid a timeout error?
There was a problem hiding this comment.
In the design that @bergundy and I have settled on, the caller sets the tail room. See PollActivityExecution in chasm/lib/activity/handler.go.
| // behind the requested reference. However, getExecutionLease does not currently guarantee that | ||
| // execution VT >= ref VT, therefore we call IsStale() again here and return any error (which at | ||
| // this point must be ErrStaleState; ErrStaleReference has already been eliminated). | ||
| err := chasmTree.IsStale(ref) |
There was a problem hiding this comment.
this is already checked in getExecutionLease? or it's for fixing the bug we discussed before that getExecutionLease needs to do another stale check after reload?
There was a problem hiding this comment.
Yes that's right. We can remove this when that bug is fixed.
Right, we will do that shortly when we merge/rebase. It will be nice to have that done. |
chasm/lib/activity/handler.go
Outdated
| req *activitypb.PollActivityExecutionRequest, | ||
| ) (*activitypb.PollActivityExecutionResponse, bool, error) { | ||
| // TODO(dan): check for terminal activity states | ||
| panic("pollActivityExecutionWaitCompletion is not implemented") |
There was a problem hiding this comment.
Bug: Panic in WaitCompletion handler crashes server
The PollActivityExecution handler contains a panic("pollActivityExecutionWaitCompletion is not implemented") statement in the WaitCompletion case branch. If a user sends a PollActivityExecutionRequest with a WaitCompletion wait policy, this will crash the server. This panic should be replaced with returning a proper error like serviceerror.NewUnimplemented("WaitCompletion is not yet implemented") to avoid server crashes.
| waitPolicy := req.GetFrontendRequest().GetWaitPolicy() | ||
|
|
||
| if waitPolicy == nil { | ||
| return chasm.ReadComponent(ctx, ref, (*Activity).buildPollActivityExecutionResponse, req, nil) |
There was a problem hiding this comment.
Bug: Deferred error transformation bypassed on early returns
The deferred function at lines 80-85 transforms NotFound errors into a user-friendly message by modifying the named return variable err. However, the direct return chasm.ReadComponent(...) statements on lines 90 and 109 bypass the named return variable entirely. In Go, when using return expr1, expr2 with named returns, the expressions go directly to the caller without updating the named variables. This means NotFound errors from the waitPolicy == nil and len(token) == 0 code paths won't be transformed to "activity execution not found".
Additional Locations (1)
| ref, err := DeserializeComponentRef(refBytes) | ||
| if err != nil { | ||
| return false, ErrMalformedComponentRef | ||
| } |
There was a problem hiding this comment.
Bug: ExecutionStateChanged always returns ErrMalformedComponentRef discarding original error type
The function ExecutionStateChanged unconditionally returns ErrMalformedComponentRef when DeserializeComponentRef fails, but DeserializeComponentRef can return either ErrMalformedComponentRef or ErrInvalidComponentRef (for empty data or missing fields). The function's doc comment claims it "may return ErrInvalidComponentRef or ErrMalformedComponentRef" but the implementation always substitutes ErrMalformedComponentRef on deserialization error. The original error should be returned directly (return false, err) instead of always returning ErrMalformedComponentRef.
What changed?
history.ChasmNotifierfor subscribing to CHASM execution state transitionschasm.PollComponentPollActivityExecutionAPI handlerWhy?
How did you test it?
Note
Implements long‑polling for CHASM components and standalone activities via new PollComponent API, PollActivityExecution endpoint, and an execution notifier, with validation, ref handling, and tests.
Engine.PollComponent(withNotifyExecution) and helperExecutionStateChanged; extendContextwithstructuredRef.ChasmNotifierfor execution-level subscriptions; wire into history engine; emit notifications on CHASM mutations.ErrMalformedComponentRef/ErrInvalidComponentRefand validation; exposestructuredRefin tree.PollActivityExecutionfrontend and history handler usingPollComponent; buildActivityExecutionInfoand response assembly.LongPollTimeout,LongPollBuffer); provide via FX modules.PollActivityExecution{Request,Response}messages and service RPC; generate client/grpc helpers.PollComponent(no-wait, wait, stale).PollComponentsignature andNotifyExecution; history engine interface addsNotifyChasmExecution.Written by Cursor Bugbot for commit 1d59e66. This will update automatically on new commits. Configure here.