-
Notifications
You must be signed in to change notification settings - Fork 1.4k
PollComponent and PollActivityExecution #8563
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f8ccafb
46eb18e
11cac65
11c6b59
b24a022
0db9858
c888969
5740320
1d27df4
1107a99
2ee2b75
ea12fcc
5c2c41e
c02c869
e1a52d4
24ef87c
5412f7e
612426f
7958efd
3904355
00ef0ff
da60095
6d5fc28
a604d80
6917450
27aafa7
c623906
e05af5a
6bd6d57
f325df8
099e6da
7b23898
b120e9c
7e67862
055044f
affc857
fa4e0ea
f6c5290
b540eb0
d1045de
f48a2b8
c98c8f9
18159b6
c4a0f1a
310ae97
4ca1d33
ccd63a4
7178c01
a42fb44
c0ed2df
5bd95d7
ecf4139
74e0a69
e904d87
5b8a8be
88e153d
a500aa0
23fe864
04a462d
e22f108
ff48584
a84784c
3485fb9
2f0b863
0fe23be
8a410b6
8e83fb0
d8d7721
65b83a0
8dbd866
f34f222
4146871
ddff1b5
dc248f4
1b46b23
c7f4ab0
270c13b
fbc77f3
e3d74a7
1f169fb
2f10ee2
0dda6cc
d3c709c
1e5ca51
36fb610
e5d900b
07cc8cf
b7e5c98
5cb1637
561b090
8087fa5
e4dd35d
1e43970
1ad72b5
6c95de3
794238b
dcd3077
78478bb
a7fd245
3e17cb2
e50b04e
0654825
d547542
f414f2d
da1aada
b2420de
09d2dee
4332601
98f5e13
f3115e3
afef2ac
fb8d285
bf9eef7
68ff227
d4936d4
59d7496
1533b51
5f9a521
bcdecdc
7fd8e83
7c3391f
3d78f0e
40b40df
c1fdc4c
9e218dc
956c047
389a5d1
179c1ab
a6835e0
1b821f3
8aa7189
f12d710
1018228
4a3e8c3
34b3f8a
f28fe2a
26c6058
605beb3
ba38880
eae7532
f8f4992
6820744
ab0e5cc
ead88c0
ab1a8ab
1f413ce
9902b0d
2c24dfc
1d59e66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,10 +42,12 @@ type Engine interface { | |
| PollComponent( | ||
| context.Context, | ||
| ComponentRef, | ||
| func(Context, Component) (any, bool, error), | ||
| func(MutableContext, Component, any) error, | ||
| func(Context, Component) (bool, error), | ||
| ...TransitionOption, | ||
| ) ([]byte, error) | ||
|
|
||
| // NotifyExecution notifies any PollComponent callers waiting on the execution. | ||
| NotifyExecution(EntityKey) | ||
| } | ||
|
|
||
| type BusinessIDReusePolicy int | ||
|
|
@@ -176,6 +178,9 @@ func UpdateWithNewEntity[C Component, I any, O1 any, O2 any]( | |
| // - consider remove ComponentRef from the return value and allow components to get | ||
| // the ref in the transition function. There are some caveats there, check the | ||
| // comment of the NewRef method in MutableContext. | ||
| // | ||
| // UpdateComponent applies updateFn to the component identified by the supplied component reference. | ||
| // It returns the result, along with the new component reference. opts are currently ignored. | ||
| func UpdateComponent[C Component, R []byte | ComponentRef, I any, O any]( | ||
| ctx context.Context, | ||
| r R, | ||
|
|
@@ -207,6 +212,8 @@ func UpdateComponent[C Component, R []byte | ComponentRef, I any, O any]( | |
| return output, newSerializedRef, err | ||
| } | ||
|
|
||
| // ReadComponent returns the result of evaluating readFn against the component identified by the | ||
| // component reference. opts are currently ignored. | ||
| func ReadComponent[C Component, R []byte | ComponentRef, I any, O any]( | ||
| ctx context.Context, | ||
| r R, | ||
|
|
@@ -234,18 +241,18 @@ func ReadComponent[C Component, R []byte | ComponentRef, I any, O any]( | |
| return output, err | ||
| } | ||
|
|
||
| type PollComponentRequest[C Component, I any, O any] struct { | ||
| Ref ComponentRef | ||
| PredicateFn func(C, Context, I) bool | ||
| OperationFn func(C, MutableContext, I) (O, error) | ||
| Input I | ||
| } | ||
|
|
||
| func PollComponent[C Component, R []byte | ComponentRef, I any, O any, T any]( | ||
| // PollComponent waits until the predicate is true when evaluated against the component identified | ||
| // by the supplied component reference. If this times out due to a server-imposed long-poll timeout | ||
| // then it returns (nil, nil, nil), as an indication that the caller should continue long-polling. | ||
| // Otherwise it returns (output, ref, err), where output is the output of the predicate function, | ||
| // and ref is a component reference identifying the state at which the predicate was satisfied. The | ||
| // predicate must be monotonic: if it returns true at execution state transition s then it must | ||
| // return true at all transitions t > s. If the predicate is true at the outset then PollComponent | ||
| // returns immediately. opts are currently ignored. | ||
| func PollComponent[C Component, R []byte | ComponentRef, I any, O any]( | ||
| ctx context.Context, | ||
| r R, | ||
| predicateFn func(C, Context, I) (T, bool, error), | ||
| operationFn func(C, MutableContext, I, T) (O, error), | ||
| monotonicPredicate func(C, Context, I) (O, bool, error), | ||
| input I, | ||
| opts ...TransitionOption, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can get rid of the transition options right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can. |
||
| ) (O, []byte, error) { | ||
|
|
@@ -259,13 +266,12 @@ func PollComponent[C Component, R []byte | ComponentRef, I any, O any, T any]( | |
| newSerializedRef, err := engineFromContext(ctx).PollComponent( | ||
| ctx, | ||
| ref, | ||
| func(ctx Context, c Component) (any, bool, error) { | ||
| return predicateFn(c.(C), ctx, input) | ||
| }, | ||
| func(ctx MutableContext, c Component, t any) error { | ||
| var err error | ||
| output, err = operationFn(c.(C), ctx, input, t.(T)) | ||
| return err | ||
| func(ctx Context, c Component) (bool, error) { | ||
| out, satisfied, err := monotonicPredicate(c.(C), ctx, input) | ||
| if satisfied { | ||
| output = out | ||
| } | ||
| return satisfied, err | ||
| }, | ||
| opts..., | ||
| ) | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,11 +4,13 @@ import ( | |
| "fmt" | ||
| "time" | ||
|
|
||
| "go.temporal.io/api/activity/v1" | ||
| commonpb "go.temporal.io/api/common/v1" | ||
| deploymentpb "go.temporal.io/api/deployment/v1" | ||
| enumspb "go.temporal.io/api/enums/v1" | ||
| failurepb "go.temporal.io/api/failure/v1" | ||
| historypb "go.temporal.io/api/history/v1" | ||
| "go.temporal.io/api/serviceerror" | ||
| "go.temporal.io/api/workflowservice/v1" | ||
| "go.temporal.io/server/api/historyservice/v1" | ||
| "go.temporal.io/server/api/matchingservice/v1" | ||
|
|
@@ -37,14 +39,12 @@ type Activity struct { | |
|
|
||
| *activitypb.ActivityState | ||
|
|
||
| // Standalone only | ||
| Visibility chasm.Field[*chasm.Visibility] | ||
| Attempt chasm.Field[*activitypb.ActivityAttemptState] | ||
| LastHeartbeat chasm.Field[*activitypb.ActivityHeartbeatState] | ||
| // Standalone only | ||
| RequestData chasm.Field[*activitypb.ActivityRequestData] | ||
| Outcome chasm.Field[*activitypb.ActivityOutcome] | ||
|
|
||
| // Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to | ||
| // the workflow. For a standalone activity this would be nil. | ||
| // TODO: revisit a standalone activity pointing to itself once we handle storing it more efficiently. | ||
|
|
@@ -314,3 +314,141 @@ func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.P | |
| }) | ||
| return nil, nil | ||
| } | ||
|
|
||
| func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.ActivityExecutionInfo, error) { | ||
| // TODO(dan): support pause states | ||
| var status enumspb.ActivityExecutionStatus | ||
| var runState enumspb.PendingActivityState | ||
| switch a.GetStatus() { | ||
fretz12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| case activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_SCHEDULED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_STARTED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_FAILED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_FAILED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| case activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT: | ||
| status = enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT | ||
| runState = enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED | ||
| default: | ||
| return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus()) | ||
| } | ||
|
|
||
| requestData, err := a.RequestData.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| attempt, err := a.Attempt.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| key := ctx.ExecutionKey() | ||
|
|
||
| info := &activity.ActivityExecutionInfo{ | ||
| ActivityId: key.BusinessID, | ||
| RunId: key.EntityID, | ||
| ActivityType: a.GetActivityType(), | ||
| Status: status, | ||
| RunState: runState, | ||
| ScheduledTime: a.GetScheduledTime(), | ||
| Priority: a.GetPriority(), | ||
| Header: requestData.GetHeader(), | ||
| LastFailure: attempt.GetLastFailureDetails().GetFailure(), | ||
| // TODO(dan): populate remaining fields | ||
| } | ||
|
|
||
| return info, nil | ||
| } | ||
|
|
||
| func (a *Activity) buildPollActivityExecutionResponse( | ||
| ctx chasm.Context, | ||
| req *activitypb.PollActivityExecutionRequest, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can just pass in the frontend request here and save the conversion and confusion between
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I get the thinking but I'm not sure it's an improvement: (1) It feels more correct for a (2) The change you're proposing would turn this: if waitPolicy == nil {
return chasm.ReadComponent(ctx, ref, (*Activity).buildPollActivityExecutionResponse, req, nil)
}into this: if waitPolicy == nil {
response, err := chasm.ReadComponent(ctx, ref, (*Activity).buildPollActivityExecutionResponse, req.GetFrontendRequest(), nil)
if err != nil {
return nil, err
}
return &activitypb.PollActivityExecutionResponse{
FrontendResponse: response,
}, nil
}
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WDYT?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I'd forgotten to submit my pending replies. Reply above.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not strongly opinionated but let's be consistent. |
||
| ) (*activitypb.PollActivityExecutionResponse, error) { | ||
| request := req.GetFrontendRequest() | ||
|
|
||
| token, err := ctx.Ref(a) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| var info *activity.ActivityExecutionInfo | ||
| if request.GetIncludeInfo() { | ||
| info, err = a.buildActivityExecutionInfo(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| var input *commonpb.Payloads | ||
| if request.GetIncludeInput() { | ||
| activityRequest, err := a.RequestData.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| input = activityRequest.GetInput() | ||
| } | ||
|
|
||
| response := &workflowservice.PollActivityExecutionResponse{ | ||
| Info: info, | ||
| RunId: ctx.ExecutionKey().EntityID, | ||
bergundy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Input: input, | ||
| StateChangeLongPollToken: token, | ||
| } | ||
|
|
||
| if request.GetIncludeOutcome() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this logic looks good but maybe @fretz12 can take a look too.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's not correct yet: it's checking for The result will be that we fail to obtain the failure from the attempt last failure details. I'll creating a failing test case on a subsequent PR; marking with a TODO on this branch as its not testable here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've updated this branch with a correct implementation.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like the way we set the outcome failure to empty struct on handling |
||
| activityOutcome, err := a.Outcome.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| // There are two places where a failure might be stored but only one place where a | ||
| // successful outcome is stored. | ||
| if successful := activityOutcome.GetSuccessful(); successful != nil { | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Result{ | ||
| Result: successful.GetOutput(), | ||
| } | ||
| } else if failure := activityOutcome.GetFailed().GetFailure(); failure != nil { | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{ | ||
| Failure: failure, | ||
| } | ||
| } else { | ||
| shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED) | ||
|
|
||
| if shouldHaveFailure { | ||
| attempt, err := a.Attempt.Get(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if details := attempt.GetLastFailureDetails(); details != nil { | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{ | ||
| Failure: details.GetFailure(), | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return &activitypb.PollActivityExecutionResponse{ | ||
| FrontendResponse: response, | ||
| }, nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| package activity | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "go.temporal.io/server/common/dynamicconfig" | ||
| ) | ||
|
|
||
| var ( | ||
| LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting( | ||
| "chasm.activity.longPollTimeout", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not blocking this PR but I don't think putting |
||
| 20*time.Second, | ||
| `Timeout for activity long-poll requests.`, | ||
| ) | ||
|
|
||
| LongPollBuffer = dynamicconfig.NewNamespaceDurationSetting( | ||
| "chasm.activity.longPollBuffer", | ||
| time.Second, | ||
| `A buffer used to adjust the activity long-poll timeouts. | ||
| Specifically, activity long-poll requests are timed out at a time which leaves at least the buffer's duration | ||
| remaining before the caller's deadline, if permitted by the caller's deadline.`, | ||
| ) | ||
| ) | ||
|
|
||
| type Config struct { | ||
| LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter | ||
| LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter | ||
| } | ||
|
|
||
| func ConfigProvider(dc *dynamicconfig.Collection) *Config { | ||
| return &Config{ | ||
| LongPollTimeout: LongPollTimeout.Get(dc), | ||
| LongPollBuffer: LongPollBuffer.Get(dc), | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed
operationFnfromPollComponentsince we don't need poll-with-mutation yet.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This totally works for me. We can discuss how to introduce this when the requirement comes up.