From 4325002b4a7424f96bb187b0e964fe63223c1c1b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 28 Nov 2025 17:32:49 -0500 Subject: [PATCH 1/2] Fix ScheduleToClose bug --- chasm/lib/activity/activity_tasks.go | 12 +--- .../activity/gen/activitypb/v1/tasks.pb.go | 17 +---- chasm/lib/activity/proto/v1/tasks.proto | 3 - chasm/lib/activity/statemachine.go | 4 +- tests/standalone_activity_test.go | 66 +++++++++++++++++-- 5 files changed, 68 insertions(+), 34 deletions(-) diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index c1b0634014..6924dfaad3 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -103,18 +103,12 @@ func newScheduleToCloseTimeoutTaskExecutor() *scheduleToCloseTimeoutTaskExecutor } func (e *scheduleToCloseTimeoutTaskExecutor) Validate( - ctx chasm.Context, + _ chasm.Context, activity *Activity, _ chasm.TaskAttributes, - task *activitypb.ScheduleToCloseTimeoutTask, + _ *activitypb.ScheduleToCloseTimeoutTask, ) (bool, error) { - attempt, err := activity.LastAttempt.Get(ctx) - if err != nil { - return false, err - } - - valid := TransitionTimedOut.Possible(activity) && task.Attempt == attempt.Count - return valid, nil + return TransitionTimedOut.Possible(activity), nil } func (e *scheduleToCloseTimeoutTaskExecutor) Execute( diff --git a/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go b/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go index 00843c6bcb..2da7102b85 100644 --- a/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go @@ -115,10 +115,7 @@ func (x *ScheduleToStartTimeoutTask) GetAttempt() int32 { } type ScheduleToCloseTimeoutTask struct { - state protoimpl.MessageState `protogen:"open.v1"` - // The current attempt number for this activity execution. Since task validation/exec happen outside of a lock, we - // need to guard against any concurrent operations where the originally intended task may be outdated. - Attempt int32 `protobuf:"varint,1,opt,name=attempt,proto3" json:"attempt,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -153,13 +150,6 @@ func (*ScheduleToCloseTimeoutTask) Descriptor() ([]byte, []int) { return file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDescGZIP(), []int{2} } -func (x *ScheduleToCloseTimeoutTask) GetAttempt() int32 { - if x != nil { - return x.Attempt - } - return 0 -} - type StartToCloseTimeoutTask struct { state protoimpl.MessageState `protogen:"open.v1"` // The current attempt number for this activity execution. Since task validation/exec happen outside of a lock, we @@ -214,9 +204,8 @@ const file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDesc = "" "\x14ActivityDispatchTask\x12\x18\n" + "\aattempt\x18\x01 \x01(\x05R\aattempt\"6\n" + "\x1aScheduleToStartTimeoutTask\x12\x18\n" + - "\aattempt\x18\x01 \x01(\x05R\aattempt\"6\n" + - "\x1aScheduleToCloseTimeoutTask\x12\x18\n" + - "\aattempt\x18\x01 \x01(\x05R\aattempt\"3\n" + + "\aattempt\x18\x01 \x01(\x05R\aattempt\"\x1c\n" + + "\x1aScheduleToCloseTimeoutTask\"3\n" + "\x17StartToCloseTimeoutTask\x12\x18\n" + "\aattempt\x18\x01 \x01(\x05R\aattemptBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" diff --git a/chasm/lib/activity/proto/v1/tasks.proto b/chasm/lib/activity/proto/v1/tasks.proto index 8c559e5258..8015d8134a 100644 --- a/chasm/lib/activity/proto/v1/tasks.proto +++ b/chasm/lib/activity/proto/v1/tasks.proto @@ -17,9 +17,6 @@ message ScheduleToStartTimeoutTask { } message ScheduleToCloseTimeoutTask { - // The current attempt number for this activity execution. Since task validation/exec happen outside of a lock, we - // need to guard against any concurrent operations where the originally intended task may be outdated. - int32 attempt = 1; } message StartToCloseTimeoutTask { diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 19e59e9bc5..29a915a65f 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -62,9 +62,7 @@ var TransitionScheduled = chasm.NewTransition( chasm.TaskAttributes{ ScheduledTime: currentTime.Add(timeout), }, - &activitypb.ScheduleToCloseTimeoutTask{ - Attempt: attempt.GetCount(), - }) + &activitypb.ScheduleToCloseTimeoutTask{}) } ctx.AddTask( diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index c9cf20590b..1fdd7a8f4e 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -760,6 +760,67 @@ func (s *standaloneActivityTestSuite) TestCompletedActivity_CannotTerminate() { require.Error(t, err) } +func (s *standaloneActivityTestSuite) TestScheduleToCloseTimeout_WithRetry() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + // Start an activity + startResp, err := s.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + ActivityType: &commonpb.ActivityType{ + Name: "test-activity-type", + }, + Options: &activitypb.ActivityOptions{ + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + }, + // It's not possible to guarantee (e.g. via NextRetryDelay or RetryPolicy) that a retry + // will start with a delay <1s because of the use of TimerProcessorMaxTimeShift in the + // timer queue. Therefore we allow 1s for the ActivityDispatchTask to be executed, and + // time out the activity 1s into Attempt 2. + ScheduleToCloseTimeout: durationpb.New(2 * time.Second), + }, + }) + require.NoError(t, err) + + // Fail attempt 1, causing the attempt counter to increment. + pollTaskResp, err := s.pollActivityTaskQueue(ctx, taskQueue) + require.NoError(t, err) + _, err = s.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: s.Namespace().String(), + TaskToken: pollTaskResp.TaskToken, + Failure: &failurepb.Failure{ + Message: "Retryable failure", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + NonRetryable: false, + NextRetryDelay: durationpb.New(1 * time.Second), + }}, + }, + }) + require.NoError(t, err) + pollTaskResp, err = s.pollActivityTaskQueue(ctx, taskQueue) + require.NoError(t, err) + + // Wait for schedule-to-close timeout. + pollResp, err := s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + IncludeInfo: true, + IncludeOutcome: true, + WaitPolicy: &workflowservice.PollActivityExecutionRequest_WaitCompletion{ + WaitCompletion: &workflowservice.PollActivityExecutionRequest_CompletionWaitOptions{}, + }, + }) + require.NoError(t, err) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, pollResp.GetInfo().GetStatus()) + require.Equal(t, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType()) +} + // TestStartToCloseTimeout tests that a start-to-close timeout is recorded after the activity is // started. It also verifies that PollActivityExecution can be used to poll for a TimedOut state // change caused by execution of a timer task. @@ -865,11 +926,6 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() { "expected StartToCloseTimeout but is %s", pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType()) } -func (s *standaloneActivityTestSuite) TestScheduleToCloseTimeout() { - // TODO implement when we have PollActivityExecution. Make sure we check the attempt vs. outcome failure population. - s.T().Skip("Temporarily disabled") -} - func (s *standaloneActivityTestSuite) TestPollActivityExecution_NoWait() { t := s.T() ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) From 58f13fc34b57f2a383d3d0b0a978438984ef7700 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Dec 2025 19:21:02 -0500 Subject: [PATCH 2/2] Lint --- tests/standalone_activity_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 1fdd7a8f4e..e696d0c331 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -802,7 +802,7 @@ func (s *standaloneActivityTestSuite) TestScheduleToCloseTimeout_WithRetry() { }, }) require.NoError(t, err) - pollTaskResp, err = s.pollActivityTaskQueue(ctx, taskQueue) + _, err = s.pollActivityTaskQueue(ctx, taskQueue) require.NoError(t, err) // Wait for schedule-to-close timeout.