Standalone activity heartbeating#8730
Conversation
0527002 to
6ffe4da
Compare
chasm/lib/activity/activity.go
Outdated
| func (a *Activity) HandleStarted( | ||
| ctx chasm.MutableContext, | ||
| request *historyservice.RecordActivityTaskStartedRequest, | ||
| ) (*historyservice.RecordActivityTaskStartedResponse, error) { |
There was a problem hiding this comment.
ignore: formatting only
There was a problem hiding this comment.
Looks like @fretz12 and you have different ideas on how to format these signatures. I personally like the style in this PR better.
There was a problem hiding this comment.
I like this formatting better too, but ideally we should have the linter take care of it.
I'm running gofumpt locally, it seems to do a good job... maybe introduce it as part of the lint-code target?
chasm/lib/activity/activity.go
Outdated
| ctx chasm.Context, | ||
| key chasm.EntityKey, | ||
| response *historyservice.RecordActivityTaskStartedResponse, | ||
| ) error { |
There was a problem hiding this comment.
ignore: formatting only
chasm/lib/activity/activity.go
Outdated
| func (a *Activity) handleCancellationRequested( | ||
| ctx chasm.MutableContext, | ||
| req *activitypb.CancelActivityExecutionRequest, | ||
| ) (*activitypb.CancelActivityExecutionResponse, error) { |
There was a problem hiding this comment.
ignore: formatting only
chasm/lib/activity/activity.go
Outdated
| func (a *Activity) HandleStarted( | ||
| ctx chasm.MutableContext, | ||
| request *historyservice.RecordActivityTaskStartedRequest, | ||
| ) (*historyservice.RecordActivityTaskStartedResponse, error) { |
There was a problem hiding this comment.
Looks like @fretz12 and you have different ideas on how to format these signatures. I personally like the style in this PR better.
| } | ||
|
|
||
| // Validate validates a HeartbeatTimeoutTask. | ||
| func (e *heartbeatTimeoutTaskExecutor) Validate( |
There was a problem hiding this comment.
You will want to validate that the schedule time in the attributes is still relevant here not in the execute function. It should deterministic function of the last heartbeat time and the attempt start time (hbDeadline below should be equal to the schedule time).
chasm/lib/activity/activity_tasks.go
Outdated
| hbTimeout := activity.GetHeartbeatTimeout().AsDuration() | ||
| attemptStartTime := attempt.GetStartedTime().AsTime() | ||
| lastHbTime := lastHb.GetRecordedTime().AsTime() // could be from a previous attempt | ||
| // No heartbeats in the attempt so far is equivalent to a heartbeat having been sent at attempt | ||
| // start time. | ||
| hbDeadline := util.MaxTime(lastHbTime, attemptStartTime).Add(hbTimeout) | ||
|
|
||
| if ctx.Now(activity).Before(hbDeadline) { | ||
| // Deadline has not expired; schedule a new task. | ||
| ctx.AddTask( | ||
| activity, | ||
| chasm.TaskAttributes{ | ||
| ScheduledTime: hbDeadline, | ||
| }, | ||
| &activitypb.HeartbeatTimeoutTask{ | ||
| Attempt: attempt.GetCount(), | ||
| }, | ||
| ) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
This shouldn't happen here. Validate in the Validate function. Schedule a new task any time a heartbeat is received.
There was a problem hiding this comment.
Discussed offline -- design B incurs extra persistence writes that aren't balanced out by the changes to numbers of logical tasks created. I've switched the PR over to the design that @bergundy proposes.
chasm/lib/activity/library.go
Outdated
| l.activityDispatchTaskExecutor, | ||
| l.activityDispatchTaskExecutor, | ||
| ), | ||
| // TODO(dan): why are the task names "FooTimer" but "FooTimeoutTask" in the struct names? |
There was a problem hiding this comment.
We agreed not to add Task to the task names. Not sure if we want to use TaskExecutor vs. just Executor for the struct names but that is not critical and can change easily. The string names cannot be changed easily OTOH because they affect how tasks are represented in persistence.
| Details: input.Request.HeartbeatRequest.GetDetails(), | ||
| }) | ||
| return nil, nil | ||
| return &historyservice.RecordActivityTaskHeartbeatResponse{ |
There was a problem hiding this comment.
Just before returning here you want to generate a new heartbeat task if the heartbeat timeout is set.
There was a problem hiding this comment.
See reply above; I currently still think the proposed design is preferable.
There was a problem hiding this comment.
This is done now -- switched back to your design.
| require.Error(t, err) | ||
| statusErr := serviceerror.ToStatus(err) | ||
| require.NotNil(t, statusErr) | ||
| require.Equal(t, codes.InvalidArgument, statusErr.Code()) |
There was a problem hiding this comment.
Alternatively, you can use require.ErrorAs(err, &invalidArgumentErr)
There was a problem hiding this comment.
Leaving as is for now
chasm/lib/activity/validator.go
Outdated
| ) error { | ||
| if a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_STARTED && | ||
| a.Status != activitystatepb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { | ||
| return serviceerror.NewNotFound("activity task not found") |
There was a problem hiding this comment.
Wrong error message? Also NotFound doesn't feel like the right error type, even though I get it's checking if a token "matches". Perhaps FailPrecondition?
There was a problem hiding this comment.
I used NotFound because it'd what the workflow implementation uses (see IsActivityTaskNotFoundForToken)
chasm/lib/activity/validator.go
Outdated
| return err | ||
| } | ||
| if token.Attempt != attempt.GetCount() { | ||
| return serviceerror.NewNotFound("activity task not found") |
There was a problem hiding this comment.
I used NotFound because it'd what the workflow implementation uses (see IsActivityTaskNotFoundForToken)
| if err := ValidateActivityTaskToken(ctx, a, input.Token); err != nil { | ||
| return nil, err | ||
| } | ||
| a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ |
There was a problem hiding this comment.
@bergundy is there any performance pentalty if we create a new field on every hearbeat?
chasm/lib/activity/activity.go
Outdated
| a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{ | ||
| RecordedTime: timestamppb.New(ctx.Now(a)), | ||
| Details: details, | ||
| Details: input.Request.HeartbeatRequest.GetDetails(), |
There was a problem hiding this comment.
| Details: input.Request.HeartbeatRequest.GetDetails(), | |
| Details: input.Request.GetHeartbeatRequest().GetDetails(), |
chasm/lib/activity/activity_tasks.go
Outdated
| func (e *heartbeatTimeoutTaskExecutor) Execute( | ||
| ctx chasm.MutableContext, | ||
| activity *Activity, | ||
| taskAttrs chasm.TaskAttributes, |
There was a problem hiding this comment.
nit: taskAttrs, task are unused
There was a problem hiding this comment.
Thanks, marked as _ unused parameters
chasm/lib/activity/activity_tasks.go
Outdated
| ctx.AddTask( | ||
| activity, | ||
| chasm.TaskAttributes{ | ||
| ScheduledTime: hbDeadline, |
There was a problem hiding this comment.
If hbDeadline happens to be attemptStartTime + hbTimeout (i.e., no hearbeat recorded yet), won't the next timer basically pop the same time as the current one being executed?
There was a problem hiding this comment.
That should not happen because of this condition
if ctx.Now(activity).Before(hbDeadline)the heartbeat task will not execute before time hbDeadline since the first task is scheduled for hbTimeout.
81b2516 to
c8e5800
Compare
938ff87 to
4ba7671
Compare
|
|
||
| // Set if activity cancelation was requested. | ||
| ActivityCancelState cancel_state = 11; | ||
| ActivityCancelState cancel_state = 12; |
There was a problem hiding this comment.
Bug: Proto field numbers changed breaking wire format compatibility
The new field last_heartbeat_task_scheduled_time was inserted at field number 7, causing all subsequent fields to be renumbered: retry_policy moved from 7→8, status from 8→9, schedule_time from 9→10, priority from 10→11, and cancel_state from 11→12. This breaks protobuf wire format backward compatibility. If any ActivityState messages were previously serialized and stored, deserializing them with the new schema will cause data corruption (e.g., the old retry_policy data would be interpreted as last_heartbeat_task_scheduled_time). New fields should be added with the next available field number (12) without renumbering existing fields.
Additional Locations (1)
chasm/lib/activity/activity.go
Outdated
| if err := TransitionStarted.Apply(a, ctx, nil); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
There was a problem hiding this comment.
The movement here was required to fix a bug that I encountered when implementing the other proposed design. We needed attempt.StartedTime to be available in TransitionStarted so that we could use it in scheduling the first heartbeat task, as well as the base for the start-to-close timeout. Before this PR we were using an ad-hoc ctx.Now() that was close to but not equal to StartedTime.
Reverting it doesn't cause a test failure with the hreartbeating design in this PR, but it's better to set the field state before calling the transition function.
There was a problem hiding this comment.
Reverting this would now cause a test failure since the PR is back to design A.
| @@ -125,14 +125,28 @@ var TransitionStarted = chasm.NewTransition( | |||
| }, | |||
| activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, | |||
| func(a *Activity, ctx chasm.MutableContext, _ any) error { | |||
| attempt := a.LastAttempt.Get(ctx) | |||
| startTime := attempt.GetStartedTime().AsTime() | |||
There was a problem hiding this comment.
This is where we use StartedTime that I referred to above; it wasn't possible before because we were calling the transition function before setting the attempt field state.
Reverting this doesn't cause a test failure in the current PR. But it did with the alternative design, and in any case it's better to use StartedTime rather than an arbitrary timestamp that's close to it in time.
There was a problem hiding this comment.
Reverting this would now cause a test failure since the PR is back to design A.
chasm/lib/activity/activity.go
Outdated
| attempt := a.LastAttempt.Get(ctx) | ||
| attempt.StartedTime = timestamppb.New(ctx.Now(a)) | ||
| attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity() |
There was a problem hiding this comment.
Why is this logic outside of the transition function? Ideally no mutation would be done outside of transitions.
There was a problem hiding this comment.
Good point, moved and updated tests.
chasm/lib/activity/activity_tasks.go
Outdated
| shouldRetry, retryInterval, err := activity.shouldRetry(ctx, 0) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if shouldRetry { | ||
| return TransitionRescheduled.Apply(activity, ctx, rescheduleEvent{ | ||
| retryInterval: retryInterval, | ||
| failure: createHeartbeatTimeoutFailure(), | ||
| }) | ||
| } | ||
| return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_HEARTBEAT) |
There was a problem hiding this comment.
nit: this logic seems duplicated any time an attempt is failed, can we add a function that encapsulates it?
There was a problem hiding this comment.
Good call, refactored along those lines in ffcb338.
chasm/lib/activity/validator.go
Outdated
| func ValidateActivityTaskToken( | ||
| ctx chasm.Context, | ||
| a *Activity, | ||
| token *tokenspb.Task, | ||
| ) error { |
There was a problem hiding this comment.
Make it a method:
| func ValidateActivityTaskToken( | |
| ctx chasm.Context, | |
| a *Activity, | |
| token *tokenspb.Task, | |
| ) error { | |
| func (a *Activity) ValidateTaskToken( | |
| ctx chasm.Context, | |
| token *tokenspb.Task, | |
| ) error { |
There was a problem hiding this comment.
Thanks, yes, agree with both, done.
| require.NotEmpty(t, pollTaskResp.TaskToken) | ||
|
|
||
| // Heartbeat before timeout | ||
| time.Sleep(600 * time.Millisecond) //nolint:forbidigo |
There was a problem hiding this comment.
What value are you getting from adding those sleeps? They are just making the test flaky.
There was a problem hiding this comment.
The sleeps allow us to prove that an attempt with duration longer than the heartbeat interval can succeed, due to the heartbeats. Without the sleep, the attempt would just immediately succeed and we would have demonstrated nothing about the heartbeat timeout.
I'd like to know of better ways to demonstrate that, but tests like that are a necessity while developing the feature. We could delete the test if there is no good way of testing this in a functional test.
This reverts commit 534685045cdb01abd7d6191126c25204cc6493ec.
This reverts commit 3796405.
2a2b8a4 to
d8c8df8
Compare
Implement heartbeating for CHASM activities (standalone activity) - [x] built - [x] added new functional test(s)

What changed?
Implement heartbeating for CHASM activities (standalone activity)
Why?
Required feature
How did you test it?
Note
Implements heartbeating for standalone activities, including heartbeat timers, timeout-driven retry/failure, task-token validation, frontend routing, and supporting proto/executors with comprehensive tests.
RecordHeartbeatwith task-token validation; store heartbeat details/time and schedule heartbeat timer; response signalscancelRequested.TransitionTimedOuthandlesHEARTBEAT;getOrCreateLastHeartbeatused when recording failures.WithTokenwrapper and enforce token validation inHandleCompleted/Failed/Canceledand heartbeats.tryReschedule/shouldRetry/hasEnoughTimeForRetry; start-to-close timeout uses retry-first logic.PopulateRecordStartedResponseincludes priorHeartbeatDetails;TransitionStartedrecords worker/deployment and uses started-time-based scheduling.HeartbeatTimeoutTaskin proto (+ helpers); implement validate/execute executors; register infxand task registry.RecordActivityTaskHeartbeatandRespondActivityTask{Completed,Failed,Canceled}viachasm.UpdateComponentusingWithToken.Written by Cursor Bugbot for commit af7ca83. This will update automatically on new commits. Configure here.