Skip to content

Commit 219b656

Browse files
authored
Standalone activity heartbeating (#8730)
## What changed?
Implement heartbeating for CHASM activities (standalone activity).

## How did you test it?
- [x] built
- [x] added new functional test(s)
1 parent aea1e85 commit 219b656

14 files changed

Lines changed: 777 additions & 110 deletions

File tree

chasm/lib/activity/activity.go

Lines changed: 126 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ import (
77

88
"go.temporal.io/api/activity/v1"
99
commonpb "go.temporal.io/api/common/v1"
10-
deploymentpb "go.temporal.io/api/deployment/v1"
1110
enumspb "go.temporal.io/api/enums/v1"
1211
failurepb "go.temporal.io/api/failure/v1"
1312
historypb "go.temporal.io/api/history/v1"
1413
"go.temporal.io/api/serviceerror"
1514
"go.temporal.io/api/workflowservice/v1"
1615
"go.temporal.io/server/api/historyservice/v1"
1716
"go.temporal.io/server/api/matchingservice/v1"
17+
tokenspb "go.temporal.io/server/api/token/v1"
1818
"go.temporal.io/server/chasm"
1919
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
2020
"go.temporal.io/server/common"
@@ -54,6 +54,12 @@ type Activity struct {
5454
Store chasm.Field[ActivityStore]
5555
}
5656

57+
// WithToken wraps a request with its deserialized task token.
58+
type WithToken[R any] struct {
59+
Token *tokenspb.Task
60+
Request R
61+
}
62+
5763
func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
5864
switch a.Status {
5965
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
@@ -134,25 +140,15 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s
134140
func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) (
135141
*historyservice.RecordActivityTaskStartedResponse, error,
136142
) {
137-
if err := TransitionStarted.Apply(a, ctx, nil); err != nil {
143+
if err := TransitionStarted.Apply(a, ctx, request); err != nil {
138144
return nil, err
139145
}
140-
141-
attempt := a.LastAttempt.Get(ctx)
142-
attempt.StartedTime = timestamppb.New(ctx.Now(a))
143-
attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity()
144-
145-
if versionDirective := request.GetVersionDirective().GetDeploymentVersion(); versionDirective != nil {
146-
attempt.LastDeploymentVersion = &deploymentpb.WorkerDeploymentVersion{
147-
BuildId: versionDirective.GetBuildId(),
148-
DeploymentName: versionDirective.GetDeploymentName(),
149-
}
150-
}
151146
response := &historyservice.RecordActivityTaskStartedResponse{}
152147
err := a.StoreOrSelf(ctx).PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response)
153148
return response, err
154149
}
155150

151+
// PopulateRecordStartedResponse populates the response for HandleStarted.
156152
func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error {
157153
lastHeartbeat, _ := a.LastHeartbeat.TryGet(ctx)
158154
if lastHeartbeat != nil {
@@ -183,15 +179,22 @@ func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.Ex
183179
return nil
184180
}
185181

182+
// RecordCompleted applies the provided function to record activity completion.
186183
func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
187184
return applyFn(ctx)
188185
}
189186

190187
// HandleCompleted updates the activity on activity completion.
191-
func (a *Activity) HandleCompleted(ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCompletedRequest) (
192-
*historyservice.RespondActivityTaskCompletedResponse, error,
193-
) {
194-
if err := TransitionCompleted.Apply(a, ctx, request); err != nil {
188+
func (a *Activity) HandleCompleted(
189+
ctx chasm.MutableContext,
190+
input WithToken[*historyservice.RespondActivityTaskCompletedRequest],
191+
) (*historyservice.RespondActivityTaskCompletedResponse, error) {
192+
// TODO(dan): add test coverage for this validation
193+
if err := a.validateActivityTaskToken(ctx, input.Token); err != nil {
194+
return nil, err
195+
}
196+
197+
if err := TransitionCompleted.Apply(a, ctx, input.Request); err != nil {
195198
return nil, err
196199
}
197200

@@ -200,40 +203,50 @@ func (a *Activity) HandleCompleted(ctx chasm.MutableContext, request *historyser
200203

201204
// HandleFailed updates the activity on activity failure. If the activity is retryable, it will be rescheduled
202205
// for retry instead.
203-
func (a *Activity) HandleFailed(ctx chasm.MutableContext, req *historyservice.RespondActivityTaskFailedRequest) (
204-
*historyservice.RespondActivityTaskFailedResponse, error,
205-
) {
206-
failure := req.GetFailedRequest().GetFailure()
207-
208-
shouldRetry, retryInterval, err := a.shouldRetryOnFailure(ctx, failure)
209-
if err != nil {
206+
func (a *Activity) HandleFailed(
207+
ctx chasm.MutableContext,
208+
input WithToken[*historyservice.RespondActivityTaskFailedRequest],
209+
) (*historyservice.RespondActivityTaskFailedResponse, error) {
210+
// TODO(dan): add test coverage for this validation
211+
if err := a.validateActivityTaskToken(ctx, input.Token); err != nil {
210212
return nil, err
211213
}
212214

213-
if shouldRetry {
214-
if err := TransitionRescheduled.Apply(a, ctx, rescheduleEvent{
215-
retryInterval: retryInterval,
216-
failure: failure,
217-
}); err != nil {
215+
failure := input.Request.GetFailedRequest().GetFailure()
216+
217+
appFailure := failure.GetApplicationFailureInfo()
218+
isRetryable := appFailure != nil &&
219+
!appFailure.GetNonRetryable() &&
220+
!slices.Contains(a.GetRetryPolicy().GetNonRetryableErrorTypes(), appFailure.GetType())
221+
222+
if isRetryable {
223+
rescheduled, err := a.tryReschedule(ctx, appFailure.GetNextRetryDelay().AsDuration(), failure)
224+
if err != nil {
218225
return nil, err
219226
}
220-
221-
return &historyservice.RespondActivityTaskFailedResponse{}, nil
227+
if rescheduled {
228+
return &historyservice.RespondActivityTaskFailedResponse{}, nil
229+
}
222230
}
223231

224-
// No more retries, transition to failed state
225-
if err := TransitionFailed.Apply(a, ctx, req); err != nil {
232+
if err := TransitionFailed.Apply(a, ctx, input.Request); err != nil {
226233
return nil, err
227234
}
228235

229236
return &historyservice.RespondActivityTaskFailedResponse{}, nil
230237
}
231238

232239
// HandleCanceled updates the activity on activity cancellation.
233-
func (a *Activity) HandleCanceled(ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCanceledRequest) (
234-
*historyservice.RespondActivityTaskCanceledResponse, error,
235-
) {
236-
if err := TransitionCanceled.Apply(a, ctx, request.GetCancelRequest().GetDetails()); err != nil {
240+
func (a *Activity) HandleCanceled(
241+
ctx chasm.MutableContext,
242+
input WithToken[*historyservice.RespondActivityTaskCanceledRequest],
243+
) (*historyservice.RespondActivityTaskCanceledResponse, error) {
244+
// TODO(dan): add test coverage for this validation
245+
if err := a.validateActivityTaskToken(ctx, input.Token); err != nil {
246+
return nil, err
247+
}
248+
249+
if err := TransitionCanceled.Apply(a, ctx, input.Request.GetCancelRequest().GetDetails()); err != nil {
237250
return nil, err
238251
}
239252

@@ -250,9 +263,9 @@ func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.Te
250263
return &activitypb.TerminateActivityExecutionResponse{}, nil
251264
}
252265

253-
// getLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created
266+
// getOrCreateLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created
254267
// to avoid unnecessary writes when heartbeats are not used.
255-
func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {
268+
func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {
256269
heartbeat, ok := a.LastHeartbeat.TryGet(ctx)
257270
if !ok {
258271
heartbeat = &activitypb.ActivityHeartbeatState{}
@@ -300,26 +313,6 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *ac
300313
return &activitypb.RequestCancelActivityExecutionResponse{}, nil
301314
}
302315

303-
func (a *Activity) shouldRetryOnFailure(ctx chasm.Context, failure *failurepb.Failure) (bool, time.Duration, error) {
304-
var isRetryable bool
305-
306-
if failure.GetApplicationFailureInfo() != nil {
307-
appFailure := failure.GetApplicationFailureInfo()
308-
isRetryable = !appFailure.GetNonRetryable() && !slices.Contains(
309-
a.GetRetryPolicy().GetNonRetryableErrorTypes(),
310-
appFailure.GetType(),
311-
)
312-
}
313-
314-
if !isRetryable {
315-
return false, 0, nil
316-
}
317-
318-
overridingRetryInterval := failure.GetApplicationFailureInfo().GetNextRetryDelay().AsDuration()
319-
320-
return a.shouldRetry(ctx, overridingRetryInterval)
321-
}
322-
323316
// recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
324317
// set the outcome failure directly and leave the attempt failure as is.
325318
func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
@@ -361,8 +354,6 @@ func (a *Activity) recordFailedAttempt(
361354
}
362355
attempt.CompleteTime = currentTime
363356

364-
// If the activity has exhausted retries, mark the outcome failure as well but don't store duplicate failure info.
365-
// Also reset the retry interval as there won't be any more retries.
366357
if noRetriesLeft {
367358
attempt.CurrentRetryInterval = nil
368359
} else {
@@ -371,24 +362,38 @@ func (a *Activity) recordFailedAttempt(
371362
return nil
372363
}
373364

374-
func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
365+
// tryReschedule attempts to reschedule the activity for retry. Returns true if rescheduled, false
366+
// if retry is not possible.
367+
func (a *Activity) tryReschedule(
368+
ctx chasm.MutableContext,
369+
overridingRetryInterval time.Duration,
370+
failure *failurepb.Failure,
371+
) (bool, error) {
372+
shouldRetry, retryInterval := a.shouldRetry(ctx, overridingRetryInterval)
373+
if !shouldRetry {
374+
return false, nil
375+
}
376+
return true, TransitionRescheduled.Apply(a, ctx, rescheduleEvent{
377+
retryInterval: retryInterval,
378+
failure: failure,
379+
})
380+
}
381+
382+
func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration) {
375383
if !TransitionRescheduled.Possible(a) {
376-
return false, 0, nil
384+
return false, 0
377385
}
378386
attempt := a.LastAttempt.Get(ctx)
379387
retryPolicy := a.RetryPolicy
380388

381389
enoughAttempts := retryPolicy.GetMaximumAttempts() == 0 || attempt.GetCount() < retryPolicy.GetMaximumAttempts()
382-
enoughTime, retryInterval, err := a.hasEnoughTimeForRetry(ctx, overridingRetryInterval)
383-
if err != nil {
384-
return false, 0, err
385-
}
386-
return enoughAttempts && enoughTime, retryInterval, nil
390+
enoughTime, retryInterval := a.hasEnoughTimeForRetry(ctx, overridingRetryInterval)
391+
return enoughAttempts && enoughTime, retryInterval
387392
}
388393

389394
// hasEnoughTimeForRetry checks if there is enough time left in the schedule-to-close timeout. If sufficient time
390-
// remains, it will also return a valid retry interval
391-
func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
395+
// remains, it will also return a valid retry interval.
396+
func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration) {
392397
attempt := a.LastAttempt.Get(ctx)
393398

394399
// Use overriding retry interval if provided, else calculate based on retry policy
@@ -399,11 +404,11 @@ func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInter
399404

400405
scheduleToClose := a.GetScheduleToCloseTimeout().AsDuration()
401406
if scheduleToClose == 0 {
402-
return true, retryInterval, nil
407+
return true, retryInterval
403408
}
404409

405410
deadline := a.ScheduleTime.AsTime().Add(scheduleToClose)
406-
return ctx.Now(a).Add(retryInterval).Before(deadline), retryInterval, nil
411+
return ctx.Now(a).Add(retryInterval).Before(deadline), retryInterval
407412
}
408413

409414
func createStartToCloseTimeoutFailure() *failurepb.Failure {
@@ -417,12 +422,43 @@ func createStartToCloseTimeoutFailure() *failurepb.Failure {
417422
}
418423
}
419424

420-
func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.Payloads) (chasm.NoValue, error) {
425+
func createHeartbeatTimeoutFailure() *failurepb.Failure {
426+
return &failurepb.Failure{
427+
Message: fmt.Sprintf(common.FailureReasonActivityTimeout, enumspb.TIMEOUT_TYPE_HEARTBEAT.String()),
428+
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{
429+
TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
430+
TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT,
431+
},
432+
},
433+
}
434+
}
435+
436+
// RecordHeartbeat records a heartbeat for the activity.
437+
func (a *Activity) RecordHeartbeat(
438+
ctx chasm.MutableContext,
439+
input WithToken[*historyservice.RecordActivityTaskHeartbeatRequest],
440+
) (*historyservice.RecordActivityTaskHeartbeatResponse, error) {
441+
err := a.validateActivityTaskToken(ctx, input.Token)
442+
if err != nil {
443+
return nil, err
444+
}
421445
a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{
422446
RecordedTime: timestamppb.New(ctx.Now(a)),
423-
Details: details,
447+
Details: input.Request.GetHeartbeatRequest().GetDetails(),
424448
})
425-
return nil, nil
449+
ctx.AddTask(
450+
a,
451+
chasm.TaskAttributes{
452+
ScheduledTime: ctx.Now(a).Add(a.GetHeartbeatTimeout().AsDuration()),
453+
},
454+
&activitypb.HeartbeatTimeoutTask{
455+
Attempt: a.LastAttempt.Get(ctx).GetCount(),
456+
},
457+
)
458+
return &historyservice.RecordActivityTaskHeartbeatResponse{
459+
CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
460+
// TODO(dan): ActivityPaused, ActivityReset
461+
}, nil
426462
}
427463

428464
func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) {
@@ -577,3 +613,18 @@ func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore {
577613
}
578614
return a
579615
}
616+
617+
// validateActivityTaskToken validates a task token against the current activity state.
618+
func (a *Activity) validateActivityTaskToken(
619+
ctx chasm.Context,
620+
token *tokenspb.Task,
621+
) error {
622+
if a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
623+
a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
624+
return serviceerror.NewNotFound("activity task not found")
625+
}
626+
if token.Attempt != a.LastAttempt.Get(ctx).GetCount() {
627+
return serviceerror.NewNotFound("activity task not found")
628+
}
629+
return nil
630+
}

0 commit comments

Comments
 (0)