Skip to content

Commit 3796405

Browse files
committed
Add hwm
1 parent 16192e2 commit 3796405

4 files changed

Lines changed: 81 additions & 63 deletions

File tree

chasm/lib/activity/activity_tasks.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"go.temporal.io/server/common/resource"
1010
"go.temporal.io/server/common/util"
1111
"go.uber.org/fx"
12+
"google.golang.org/protobuf/types/known/timestamppb"
1213
)
1314

1415
type activityDispatchTaskExecutorOptions struct {
@@ -159,41 +160,43 @@ func newHeartbeatTimeoutTaskExecutor() *heartbeatTimeoutTaskExecutor {
159160
func (e *heartbeatTimeoutTaskExecutor) Validate(
160161
ctx chasm.Context,
161162
activity *Activity,
162-
_ chasm.TaskAttributes,
163+
taskAttrs chasm.TaskAttributes,
163164
task *activitypb.HeartbeatTimeoutTask,
164165
) (bool, error) {
165166
validStatus := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED ||
166167
activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED
167-
return validStatus && activity.LastAttempt.Get(ctx).GetCount() == task.Attempt, nil
168+
if !validStatus || activity.LastAttempt.Get(ctx).GetCount() != task.Attempt {
169+
return false, nil
170+
}
171+
// High-water-mark: reject tasks that have already been executed.
172+
hwm := activity.GetLastHeartbeatTaskScheduledTime().AsTime()
173+
return taskAttrs.ScheduledTime.After(hwm), nil
168174
}
169175

170176
// Execute executes a HeartbeatTimeoutTask.
171177
func (e *heartbeatTimeoutTaskExecutor) Execute(
172178
ctx chasm.MutableContext,
173179
activity *Activity,
174-
_ chasm.TaskAttributes,
180+
taskAttrs chasm.TaskAttributes,
175181
_ *activitypb.HeartbeatTimeoutTask,
176182
) error {
177-
// Let T = user-configured heartbeat timeout and let hb_i be the time of the ith user-submitted
178-
// heartbeat request. (hb_0 = 0 since we always start a timer task when an attempt starts).
179-
180183
// There are two concurrent processes:
181-
// 1. A worker is sending heartbeats at times hb_i.
184+
// 1. A worker is sending heartbeats.
182185
// 2. This task is being executed at (shortly after) certain scheduled times.
183186

184187
// Each time we execute this function, our task is to look back into the past and determine
185-
// whether more than T has elapsed since the last heartbeat. If it has, we fail the attempt (and
186-
// decide between retrying or failing the activity). If it has not, then we schedule a new timer
187-
// task to execute this function at the new deadline, i.e. lastHeartbeatTime+HeartbeatTimeout.
188-
//
189-
// Task validation has established that an attempt is currently in progress and that it is the
190-
// attempt for which this heartbeat timer was originally set.
188+
// whether more than (user-configured heartbeat timeout) has elapsed since the last heartbeat.
189+
// If it has, we fail the attempt (and decide between retrying or failing the activity). If it
190+
// has not, then we schedule a new timer task to execute this function at the new deadline.
191+
192+
// Update high-water-mark so this task is invalidated after execution.
193+
activity.LastHeartbeatTaskScheduledTime = timestamppb.New(taskAttrs.ScheduledTime)
191194

192195
attempt := activity.LastAttempt.Get(ctx)
193-
lastHb, _ := activity.LastHeartbeat.TryGet(ctx)
194196
hbTimeout := activity.GetHeartbeatTimeout().AsDuration()
195197
attemptStartTime := attempt.GetStartedTime().AsTime()
196-
lastHbTime := lastHb.GetRecordedTime().AsTime() // could be from a previous attempt or could be zero
198+
heartbeat, _ := activity.LastHeartbeat.TryGet(ctx)
199+
lastHbTime := heartbeat.GetRecordedTime().AsTime() // could be from a previous attempt or could be zero
197200
// No heartbeats in the attempt so far is equivalent to a heartbeat having been sent at attempt
198201
// start time.
199202
hbDeadline := util.MaxTime(lastHbTime, attemptStartTime).Add(hbTimeout)

chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go

Lines changed: 54 additions & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/activity/proto/v1/activity_state.proto

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,25 @@ message ActivityState {
8080
google.protobuf.Duration start_to_close_timeout = 5;
8181
// Maximum permitted time between successful worker heartbeats.
8282
google.protobuf.Duration heartbeat_timeout = 6;
83+
// High-water-mark for heartbeat timeout task validation. Updated when a heartbeat
84+
// timeout task executes. Tasks with ScheduledTime <= this value are stale.
85+
google.protobuf.Timestamp last_heartbeat_task_scheduled_time = 7;
8386
// The retry policy for the activity. Will never exceed `schedule_to_close_timeout`.
84-
temporal.api.common.v1.RetryPolicy retry_policy = 7;
87+
temporal.api.common.v1.RetryPolicy retry_policy = 8;
8588

8689
// All of the possible activity statuses (covers both the public ActivityExecutionStatus and PendingActivityState).
8790
// TODO: consider moving this into ActivityAttemptState and renaming that message. This could save mutating two
8891
// components on each attempt transition.
89-
ActivityExecutionStatus status = 8;
92+
ActivityExecutionStatus status = 9;
9093

9194
// Time the activity was originally scheduled via a StartActivityExecution request.
92-
google.protobuf.Timestamp schedule_time = 9;
95+
google.protobuf.Timestamp schedule_time = 10;
9396

9497
// Priority metadata.
95-
temporal.api.common.v1.Priority priority = 10;
98+
temporal.api.common.v1.Priority priority = 11;
9699

97100
// Set if activity cancelation was requested.
98-
ActivityCancelState cancel_state = 11;
101+
ActivityCancelState cancel_state = 12;
99102
}
100103

101104
message ActivityCancelState {

chasm/lib/scheduler/proto/v1/message.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import "google/protobuf/timestamp.proto";
1313

1414
// CHASM scheduler top-level state.
1515
message SchedulerState {
16-
// Scheduler request parameters and metadata.
16+
// Scheduler request parameters and metadata.
1717
temporal.api.schedule.v1.Schedule schedule = 2;
1818
temporal.api.schedule.v1.ScheduleInfo info = 3;
1919

0 commit comments

Comments
 (0)