Skip to content

Commit 0ba9efc

Browse files
committed
CHASM update consistency level
1 parent d0764be commit 0ba9efc

15 files changed

Lines changed: 435 additions & 182 deletions

File tree

chasm/engine.go

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,27 @@ type Engine interface {
2424
context.Context,
2525
ComponentRef,
2626
func(MutableContext) (RootComponent, error),
27-
func(MutableContext, Component) error,
27+
func(MutableContext, Component, *Registry) error,
2828
...TransitionOption,
2929
) (EngineUpdateWithStartExecutionResult, error)
3030

3131
UpdateComponent(
3232
context.Context,
3333
ComponentRef,
34-
func(MutableContext, Component) error,
34+
func(MutableContext, Component, *Registry) error,
3535
...TransitionOption,
3636
) ([]byte, error)
3737
ReadComponent(
3838
context.Context,
3939
ComponentRef,
40-
func(Context, Component) error,
40+
func(Context, Component, *Registry) error,
4141
...TransitionOption,
4242
) error
4343

4444
PollComponent(
4545
context.Context,
4646
ComponentRef,
47-
func(Context, Component) (bool, error),
47+
func(Context, Component, *Registry) (bool, error),
4848
...TransitionOption,
4949
) ([]byte, error)
5050

@@ -65,6 +65,25 @@ type DeleteExecutionRequest struct {
6565
TerminateComponentRequest
6666
}
6767

68+
// ConsistencyLevel controls how strictly a [ComponentRef] is validated
69+
// against the current execution state. Ordered from strongest to loosest.
70+
type ConsistencyLevel int
71+
72+
const (
73+
// ConsistencyLevelExecution is the default. Validates [ComponentRef.executionLastUpdateVT].
74+
ConsistencyLevelExecution ConsistencyLevel = iota
75+
// ConsistencyLevelComponent validates only [ComponentRef.componentInitialVT].
76+
// Use when the token may have been generated before a failover changed the
77+
// execution-level VT, but the component itself is still valid on the current branch.
78+
ConsistencyLevelComponent
79+
// ConsistencyLevelBusinessID skips all [VersionedTransition] checks and matches
80+
// by [ComponentRef.componentPath] only. Falls back to the latest open run if the
81+
// referenced [ComponentRef.RunID] points to a closed execution.
82+
// Use when the original run may have been reset: the operation should target the
83+
// same component in the new run, matched by path and deduplicated by request ID.
84+
ConsistencyLevelBusinessID
85+
)
86+
6887
type BusinessIDReusePolicy int
6988

7089
const (
@@ -82,10 +101,11 @@ const (
82101
)
83102

84103
type TransitionOptions struct {
85-
ReusePolicy BusinessIDReusePolicy
86-
ConflictPolicy BusinessIDConflictPolicy
87-
RequestID string
88-
Speculative bool
104+
ReusePolicy BusinessIDReusePolicy
105+
ConflictPolicy BusinessIDConflictPolicy
106+
RequestID string
107+
Speculative bool
108+
ConsistencyLevel ConsistencyLevel
89109
}
90110

91111
type TransitionOption func(*TransitionOptions)
@@ -172,6 +192,16 @@ func WithRequestID(
172192
}
173193
}
174194

195+
// WithConsistencyLevel sets the consistency level used when resolving a component reference.
196+
// See [ConsistencyLevel] for details on each level.
197+
func WithConsistencyLevel(
198+
level ConsistencyLevel,
199+
) TransitionOption {
200+
return func(opts *TransitionOptions) {
201+
opts.ConsistencyLevel = level
202+
}
203+
}
204+
175205
// Not needed for V1
176206
// func WithEagerLoading(
177207
// paths []ComponentPath,
@@ -255,7 +285,7 @@ func UpdateWithStartExecution[C RootComponent, I any, O any](
255285
c, err = startFn(mutableContext, input)
256286
return c, err
257287
},
258-
func(mutableContext MutableContext, c Component) (retErr error) {
288+
func(mutableContext MutableContext, c Component, _ *Registry) (retErr error) {
259289
defer log.CapturePanic(mutableContext.Logger(), &retErr)
260290

261291
var err error
@@ -306,7 +336,7 @@ func UpdateComponent[C any, R []byte | ComponentRef, I any, O any](
306336
newSerializedRef, err := engineFromContext(ctx).UpdateComponent(
307337
ctx,
308338
ref,
309-
func(mutableContext MutableContext, c Component) (retErr error) {
339+
func(mutableContext MutableContext, c Component, _ *Registry) (retErr error) {
310340
defer log.CapturePanic(mutableContext.Logger(), &retErr)
311341

312342
var err error
@@ -345,7 +375,7 @@ func ReadComponent[C any, R []byte | ComponentRef, I any, O any](
345375
err = engineFromContext(ctx).ReadComponent(
346376
ctx,
347377
ref,
348-
func(chasmContext Context, c Component) (retErr error) {
378+
func(chasmContext Context, c Component, _ *Registry) (retErr error) {
349379
defer log.CapturePanic(chasmContext.Logger(), &retErr)
350380

351381
var err error
@@ -386,7 +416,7 @@ func PollComponent[C any, R []byte | ComponentRef, I any, O any](
386416
newSerializedRef, err := engineFromContext(ctx).PollComponent(
387417
ctx,
388418
ref,
389-
func(chasmContext Context, c Component) (_ bool, retErr error) {
419+
func(chasmContext Context, c Component, _ *Registry) (_ bool, retErr error) {
390420
defer log.CapturePanic(chasmContext.Logger(), &retErr)
391421

392422
out, satisfied, err := monotonicPredicate(

chasm/engine_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/field_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (s *fieldSuite) TestFieldGetComponent() {
119119

120120
chasmContext := NewMutableContext(context.Background(), node)
121121

122-
c, err := node.Component(chasmContext, ComponentRef{componentPath: rootPath})
122+
c, err := node.Component(chasmContext, ComponentRef{componentPath: rootPath}, ConsistencyLevelExecution)
123123
s.NoError(err)
124124
s.NotNil(c)
125125

@@ -216,7 +216,7 @@ func (s *fieldSuite) TestDeferredPointerResolution() {
216216

217217
// Get components from tree to mark nodes as needing sync.
218218

219-
rootComponentInterface, err := rootNode.Component(ctx, ComponentRef{})
219+
rootComponentInterface, err := rootNode.Component(ctx, ComponentRef{}, ConsistencyLevelExecution)
220220
s.NoError(err)
221221
rootComponent = rootComponentInterface.(*TestComponent)
222222
sc1 = rootComponent.SubComponent1.Get(ctx)
@@ -293,7 +293,7 @@ func (s *fieldSuite) TestMixedPointerScenario() {
293293
s.NoError(err)
294294

295295
// Get components from tree to mark nodes as needing sync.
296-
rootComponentInterface, err := rootNode.Component(ctx, ComponentRef{})
296+
rootComponentInterface, err := rootNode.Component(ctx, ComponentRef{}, ConsistencyLevelExecution)
297297
s.NoError(err)
298298
rootComponent = rootComponentInterface.(*TestComponent)
299299

@@ -308,7 +308,7 @@ func (s *fieldSuite) TestMixedPointerScenario() {
308308
// otherwise those nodes will not be marked as dirty.
309309

310310
ctx2 := NewMutableContext(context.Background(), rootNode)
311-
rootComponentInterface, err = rootNode.Component(ctx2, ComponentRef{})
311+
rootComponentInterface, err = rootNode.Component(ctx2, ComponentRef{}, ConsistencyLevelExecution)
312312
s.NoError(err)
313313

314314
rootComponent = rootComponentInterface.(*TestComponent)
@@ -372,7 +372,7 @@ func (s *fieldSuite) TestUnresolvableDeferredPointerError() {
372372
s.NoError(err)
373373

374374
// Get component from tree to mark node as needing sync.
375-
rootComponentInterface, err := rootNode.Component(ctx, ComponentRef{})
375+
rootComponentInterface, err := rootNode.Component(ctx, ComponentRef{}, ConsistencyLevelExecution)
376376
s.NoError(err)
377377
rootComponent = rootComponentInterface.(*TestComponent)
378378

chasm/lib/callback/tasks_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
229229
gomock.Any(),
230230
gomock.Any(),
231231
gomock.Any(),
232-
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, readFn func(chasm.Context, chasm.Component) error, opts ...chasm.TransitionOption) error {
232+
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, readFn func(chasm.Context, chasm.Component, *chasm.Registry) error, opts ...chasm.TransitionOption) error {
233233
mockCtx := &chasm.MockContext{
234234
HandleNow: func(component chasm.Component) time.Time {
235235
return timeSource.Now()
@@ -238,14 +238,14 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
238238
return []byte{}, nil
239239
},
240240
}
241-
return readFn(mockCtx, callback)
241+
return readFn(mockCtx, callback, nil)
242242
})
243243

244244
mockEngine.EXPECT().UpdateComponent(
245245
gomock.Any(),
246246
gomock.Any(),
247247
gomock.Any(),
248-
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component) error, opts ...chasm.TransitionOption) ([]any, error) {
248+
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component, *chasm.Registry) error, opts ...chasm.TransitionOption) ([]any, error) {
249249
mockCtx := &chasm.MockMutableContext{
250250
MockContext: chasm.MockContext{
251251
HandleNow: func(component chasm.Component) time.Time {
@@ -256,7 +256,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
256256
},
257257
},
258258
}
259-
err := updateFn(mockCtx, callback)
259+
err := updateFn(mockCtx, callback, nil)
260260
return nil, err
261261
})
262262

@@ -613,7 +613,7 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
613613
gomock.Any(),
614614
gomock.Any(),
615615
gomock.Any(),
616-
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, readFn func(chasm.Context, chasm.Component) error, opts ...chasm.TransitionOption) error {
616+
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, readFn func(chasm.Context, chasm.Component, *chasm.Registry) error, opts ...chasm.TransitionOption) error {
617617
// Create a mock context
618618
mockCtx := &chasm.MockContext{
619619
HandleNow: func(component chasm.Component) time.Time {
@@ -632,14 +632,14 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
632632
}
633633

634634
// Call the readFn with our callback
635-
return readFn(mockCtx, callback)
635+
return readFn(mockCtx, callback, nil)
636636
})
637637

638638
mockEngine.EXPECT().UpdateComponent(
639639
gomock.Any(),
640640
gomock.Any(),
641641
gomock.Any(),
642-
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component) error, opts ...chasm.TransitionOption) ([]any, error) {
642+
).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component, *chasm.Registry) error, opts ...chasm.TransitionOption) ([]any, error) {
643643
// Create a mock mutable context
644644
mockCtx := &chasm.MockMutableContext{
645645
MockContext: chasm.MockContext{
@@ -653,7 +653,7 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
653653
}
654654

655655
// Call the updateFn with our callback
656-
err := updateFn(mockCtx, callback)
656+
err := updateFn(mockCtx, callback, nil)
657657
return nil, err
658658
})
659659

chasm/lib/scheduler/backfiller_tasks_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type backfillTestCase struct {
2929

3030
func runBackfillTestCase(t *testing.T, env *testEnv, c *backfillTestCase) {
3131
ctx := env.MutableContext()
32-
schedComponent, err := env.Node.Component(ctx, chasm.ComponentRef{})
32+
schedComponent, err := env.Node.Component(ctx, chasm.ComponentRef{}, chasm.ConsistencyLevelExecution)
3333
require.NoError(t, err)
3434
sched := schedComponent.(*scheduler.Scheduler)
3535
invoker := sched.Invoker.Get(ctx)
@@ -229,7 +229,7 @@ func TestBackfillTask_PartialFill(t *testing.T) {
229229
}
230230

231231
ctx := env.MutableContext()
232-
schedComponent, err := env.Node.Component(ctx, chasm.ComponentRef{})
232+
schedComponent, err := env.Node.Component(ctx, chasm.ComponentRef{}, chasm.ConsistencyLevelExecution)
233233
require.NoError(t, err)
234234
sched := schedComponent.(*scheduler.Scheduler)
235235
backfiller := sched.NewRangeBackfiller(ctx, request)
@@ -241,7 +241,7 @@ func TestBackfillTask_PartialFill(t *testing.T) {
241241

242242
// Backfiller should still exist (not complete).
243243
ctx = env.MutableContext()
244-
schedComponent, err = env.Node.Component(ctx, chasm.ComponentRef{})
244+
schedComponent, err = env.Node.Component(ctx, chasm.ComponentRef{}, chasm.ConsistencyLevelExecution)
245245
require.NoError(t, err)
246246
sched = schedComponent.(*scheduler.Scheduler)
247247
_, ok := sched.Backfillers[backfiller.BackfillId].TryGet(ctx)

chasm/lib/scheduler/helper_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,8 @@ func (e *testEnv) ExpectReadComponent(ctx chasm.Context, returnedComponent chasm
299299
e.t.Fatal("ExpectReadComponent requires withMockEngine() option")
300300
}
301301
e.MockEngine.EXPECT().ReadComponent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
302-
DoAndReturn(func(_ context.Context, _ chasm.ComponentRef, readFn func(chasm.Context, chasm.Component) error, _ ...chasm.TransitionOption) error {
303-
return readFn(ctx, returnedComponent)
302+
DoAndReturn(func(_ context.Context, _ chasm.ComponentRef, readFn func(chasm.Context, chasm.Component, *chasm.Registry) error, _ ...chasm.TransitionOption) error {
303+
return readFn(ctx, returnedComponent, nil)
304304
}).Times(1)
305305
}
306306

@@ -310,8 +310,8 @@ func (e *testEnv) ExpectUpdateComponent(ctx chasm.MutableContext, componentToUpd
310310
e.t.Fatal("ExpectUpdateComponent requires withMockEngine() option")
311311
}
312312
e.MockEngine.EXPECT().UpdateComponent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
313-
DoAndReturn(func(_ context.Context, _ chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component) error, _ ...chasm.TransitionOption) ([]byte, error) {
314-
err := updateFn(ctx, componentToUpdate)
313+
DoAndReturn(func(_ context.Context, _ chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component, *chasm.Registry) error, _ ...chasm.TransitionOption) ([]byte, error) {
314+
err := updateFn(ctx, componentToUpdate, nil)
315315
return nil, err
316316
}).Times(1)
317317
}

chasm/ref.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ type ComponentRef struct {
5454
validationFn func(NodeBackend, Context, Component, *Registry) error
5555
}
5656

57+
// RelaxToBusinessID prepares a ComponentRef for cross-run resolution by clearing all
58+
// versioned transition fields. After this call, the ref identifies a component solely
59+
// by its path within the execution tree. The RunID is also cleared so that the framework
60+
// will resolve the ref against the latest open execution for the business ID.
61+
// This is used internally when ConsistencyLevelBusinessID is in effect and the original
62+
// run is closed.
63+
func (r *ComponentRef) RelaxToBusinessID() {
64+
r.RunID = ""
65+
r.executionLastUpdateVT = nil
66+
r.componentInitialVT = nil
67+
}
68+
5769
// NewComponentRef creates a new ComponentRef with a registered root component go type.
5870
//
5971
// In V1, if you don't have a ref,

chasm/ref_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,40 @@ func (s *componentRefSuite) TestSerializeDeserialize() {
9898
s.Equal(ref.ExecutionKey, deserializedRef.ExecutionKey)
9999
s.Equal(ref.componentPath, deserializedRef.componentPath)
100100
}
101+
102+
func (s *componentRefSuite) TestRelaxToBusinessID() {
103+
ref := ComponentRef{
104+
ExecutionKey: ExecutionKey{
105+
NamespaceID: primitives.NewUUID().String(),
106+
BusinessID: primitives.NewUUID().String(),
107+
RunID: primitives.NewUUID().String(),
108+
},
109+
archetypeID: 42,
110+
executionLastUpdateVT: &persistencespb.VersionedTransition{
111+
NamespaceFailoverVersion: rand.Int63(),
112+
TransitionCount: rand.Int63(),
113+
},
114+
componentPath: []string{"root", "child"},
115+
componentInitialVT: &persistencespb.VersionedTransition{
116+
NamespaceFailoverVersion: rand.Int63(),
117+
TransitionCount: rand.Int63(),
118+
},
119+
}
120+
121+
originalNamespaceID := ref.NamespaceID
122+
originalBusinessID := ref.BusinessID
123+
originalComponentPath := ref.componentPath
124+
125+
ref.RelaxToBusinessID()
126+
127+
// RunID and all VTs should be cleared.
128+
s.Empty(ref.RunID)
129+
s.Nil(ref.executionLastUpdateVT)
130+
s.Nil(ref.componentInitialVT)
131+
132+
// Other fields should be preserved.
133+
s.Equal(originalNamespaceID, ref.NamespaceID)
134+
s.Equal(originalBusinessID, ref.BusinessID)
135+
s.Equal(ArchetypeID(42), ref.archetypeID)
136+
s.Equal(originalComponentPath, ref.componentPath)
137+
}

0 commit comments

Comments
 (0)