@@ -13,6 +13,8 @@ public sealed class WorkflowRunnerService
1313
/// <summary>
/// Retry settings for a single workflow step.
/// <c>MaxAttempts</c> is the upper bound of the attempt loop in <c>ExecuteStepAsync</c>.
/// <c>DelayMs</c>, <c>BackoffFactor</c> and <c>MaxDelayMs</c> are passed to <c>GetRetryDelayMs</c>;
/// NOTE(review): presumably base delay, multiplier per attempt, and an optional delay cap — confirm against that helper.
/// </summary>
1414 private sealed record RetryPolicy ( int MaxAttempts , int DelayMs , double BackoffFactor , int ? MaxDelayMs ) ;
1515
/// <summary>
/// Execution policy for a single workflow step: its retry settings plus an optional timeout.
/// When <c>TimeoutMs</c> is not null, <c>ExecuteStepAsync</c> gives each attempt a linked
/// cancellation token that is canceled after that many milliseconds; null means no timeout is applied.
/// </summary>
16+ private sealed record StepPolicy ( RetryPolicy Retry , int ? TimeoutMs ) ;
17+
1618 private readonly PlatformDbContext _db ;
1719
1820 public WorkflowRunnerService ( PlatformDbContext db )
@@ -76,6 +78,33 @@ private static void ExecuteRequireAsync(WorkflowStepRun step, JsonObject context
7678 }
7779 }
7880
/// <summary>
/// Builds the execution policy for a step from its raw configuration JSON.
/// </summary>
/// <param name="stepConfigJson">
/// Raw step configuration JSON. May be null, blank, malformed, or a non-object document.
/// </param>
/// <returns>
/// A <c>StepPolicy</c> whose <c>Retry</c> is whatever <c>ParseRetryPolicy</c> returns for the same
/// input, and whose <c>TimeoutMs</c> is the root-level <c>"timeoutMs"</c> property when it is a
/// JSON number that fits in an <c>int</c> and is at least 1; otherwise <c>TimeoutMs</c> is null.
/// </returns>
/// <remarks>
/// Parsing is best-effort: any unusable configuration yields "no timeout" rather than an exception.
/// </remarks>
private static StepPolicy ParseStepPolicy(string? stepConfigJson)
{
    var retry = ParseRetryPolicy(stepConfigJson);
    if (string.IsNullOrWhiteSpace(stepConfigJson))
    {
        return new StepPolicy(Retry: retry, TimeoutMs: null);
    }

    try
    {
        using var doc = JsonDocument.Parse(stepConfigJson);
        var root = doc.RootElement;

        // TryGetProperty throws InvalidOperationException when the element is not an object,
        // and TryGetInt32 throws it when the element is not a number. Check the kinds up
        // front so that unexpected shapes are handled without exceptions as control flow.
        int? timeoutMs = null;
        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("timeoutMs", out var timeoutEl)
            && timeoutEl.ValueKind == JsonValueKind.Number
            && timeoutEl.TryGetInt32(out var parsed)
            && parsed >= 1)
        {
            timeoutMs = parsed;
        }

        return new StepPolicy(Retry: retry, TimeoutMs: timeoutMs);
    }
    catch (JsonException)
    {
        // Malformed JSON: keep the retry policy and run the step without a timeout.
        // Only JsonException is caught so unrelated failures are not silently hidden.
        return new StepPolicy(Retry: retry, TimeoutMs: null);
    }
}
107+
79108 private static void ExecuteSetAsync ( WorkflowStepRun step )
80109 {
81110 if ( string . IsNullOrWhiteSpace ( step . StepConfigJson ) )
@@ -673,32 +702,72 @@ public async Task<WorkflowRun> StartAsync(WorkflowDefinition wf, string traceId,
673702 _db . WorkflowRuns . Add ( run ) ;
674703 await _db . SaveChangesAsync ( ct ) ;
675704
676- foreach ( var step in run . Steps . OrderBy ( x => x . StepKey ) )
705+ try
677706 {
678- await ExecuteStepAsync ( run , step , context , ct ) ;
679- await _db . SaveChangesAsync ( ct ) ;
680-
681- if ( step . State == WorkflowStepRunStates . Succeeded && ! string . IsNullOrWhiteSpace ( step . OutputJson ) )
707+ foreach ( var step in run . Steps . OrderBy ( x => x . StepKey ) )
682708 {
683- try
709+ await ExecuteStepAsync ( run , step , context , ct ) ;
710+ await _db . SaveChangesAsync ( ct ) ;
711+
712+ if ( step . State == WorkflowStepRunStates . Succeeded && ! string . IsNullOrWhiteSpace ( step . OutputJson ) )
713+ {
714+ try
715+ {
716+ context [ step . StepKey ] = JsonNode . Parse ( step . OutputJson ) ;
717+ }
718+ catch
719+ {
720+ // ignore invalid output json; step succeeded but context won't have it
721+ }
722+ }
723+
724+ if ( step . State == WorkflowStepRunStates . Failed )
684725 {
685- context [ step . StepKey ] = JsonNode . Parse ( step . OutputJson ) ;
726+ run . State = WorkflowRunStates . Failed ;
727+ run . ErrorCode = step . LastErrorCode ;
728+ run . ErrorMessage = step . LastErrorMessage ;
729+ run . FinishedAtUtc = DateTime . UtcNow ;
730+ await _db . SaveChangesAsync ( ct ) ;
731+ return run ;
686732 }
687- catch
733+
734+ if ( step . State == WorkflowStepRunStates . Canceled )
688735 {
689- // ignore invalid output json; step succeeded but context won't have it
736+ run . State = WorkflowRunStates . Canceled ;
737+ run . ErrorCode = step . LastErrorCode ;
738+ run . ErrorMessage = step . LastErrorMessage ;
739+ run . FinishedAtUtc = DateTime . UtcNow ;
740+ await _db . SaveChangesAsync ( ct ) ;
741+ return run ;
690742 }
691743 }
744+ }
745+ catch ( OperationCanceledException ) when ( ct . IsCancellationRequested )
746+ {
747+ var now = DateTime . UtcNow ;
748+ run . State = WorkflowRunStates . Canceled ;
749+ run . ErrorCode = "canceled" ;
750+ run . ErrorMessage = "Canceled." ;
751+ run . FinishedAtUtc = now ;
692752
693- if ( step . State == WorkflowStepRunStates . Failed )
753+ foreach ( var step in run . Steps . Where ( x => x . State is WorkflowStepRunStates . Pending or WorkflowStepRunStates . Running ) )
694754 {
695- run . State = WorkflowRunStates . Failed ;
696- run . ErrorCode = step . LastErrorCode ;
697- run . ErrorMessage = step . LastErrorMessage ;
698- run . FinishedAtUtc = DateTime . UtcNow ;
699- await _db . SaveChangesAsync ( ct ) ;
700- return run ;
755+ step . State = WorkflowStepRunStates . Canceled ;
756+ step . FinishedAtUtc ??= now ;
757+ step . LastErrorCode ??= "canceled" ;
758+ step . LastErrorMessage ??= "Canceled." ;
759+ }
760+
761+ try
762+ {
763+ await _db . SaveChangesAsync ( CancellationToken . None ) ;
701764 }
765+ catch
766+ {
767+ // ignore
768+ }
769+
770+ throw ;
702771 }
703772
704773 run . State = WorkflowRunStates . Succeeded ;
@@ -759,17 +828,17 @@ private static List<WorkflowStepRun> BuildSteps(WorkflowDefinition wf)
759828
760829 private async Task ExecuteStepAsync ( WorkflowRun run , WorkflowStepRun step , JsonObject context , CancellationToken ct )
761830 {
762- var policy = ParseRetryPolicy ( step . StepConfigJson ) ;
831+ var policy = ParseStepPolicy ( step . StepConfigJson ) ;
763832
764833 var startedAt = DateTime . UtcNow ;
765834 step . StartedAtUtc = startedAt ;
766835
767836 Exception ? lastEx = null ;
768- for ( var attemptNumber = 1 ; attemptNumber <= policy . MaxAttempts ; attemptNumber += 1 )
837+ for ( var attemptNumber = 1 ; attemptNumber <= policy . Retry . MaxAttempts ; attemptNumber += 1 )
769838 {
770839 ct . ThrowIfCancellationRequested ( ) ;
771840
772- var delayMs = GetRetryDelayMs ( policy , attemptNumber ) ;
841+ var delayMs = GetRetryDelayMs ( policy . Retry , attemptNumber ) ;
773842 if ( delayMs > 0 )
774843 await Task . Delay ( delayMs , ct ) ;
775844
@@ -779,21 +848,59 @@ private async Task ExecuteStepAsync(WorkflowRun run, WorkflowStepRun step, JsonO
779848 step . LastErrorMessage = null ;
780849 step . OutputJson = null ;
781850
851+ CancellationToken attemptCt = ct ;
852+ CancellationTokenSource ? timeoutCts = null ;
853+ if ( policy . TimeoutMs is not null )
854+ {
855+ timeoutCts = CancellationTokenSource . CreateLinkedTokenSource ( ct ) ;
856+ timeoutCts . CancelAfter ( policy . TimeoutMs . Value ) ;
857+ attemptCt = timeoutCts . Token ;
858+ }
859+
782860 try
783861 {
784862 InterpolateStepConfigJson ( step , context ) ;
785863
786- await ExecuteStepBodyAsync ( step , context , ct ) ;
864+ await ExecuteStepBodyAsync ( step , context , attemptCt ) ;
787865
788866 step . State = WorkflowStepRunStates . Succeeded ;
789867 step . FinishedAtUtc = DateTime . UtcNow ;
790868 return ;
791869 }
870+ catch ( OperationCanceledException oce )
871+ {
872+ lastEx = oce ;
873+
874+ if ( ct . IsCancellationRequested )
875+ {
876+ step . State = WorkflowStepRunStates . Canceled ;
877+ step . FinishedAtUtc = DateTime . UtcNow ;
878+ step . LastErrorCode ??= "canceled" ;
879+ step . LastErrorMessage ??= "Canceled." ;
880+
881+ run . ErrorCode = step . LastErrorCode ;
882+ run . ErrorMessage = step . LastErrorMessage ;
883+ return ;
884+ }
885+
886+ // timeout (linked token canceled)
887+ step . LastErrorCode ??= "workflow_step_timed_out" ;
888+ step . LastErrorMessage ??= $ "Step timed out after { policy . TimeoutMs } ms.";
889+
890+ if ( attemptNumber < policy . Retry . MaxAttempts )
891+ continue ;
892+
893+ step . State = WorkflowStepRunStates . Failed ;
894+ step . FinishedAtUtc = DateTime . UtcNow ;
895+ run . ErrorCode = step . LastErrorCode ;
896+ run . ErrorMessage = step . LastErrorMessage ;
897+ return ;
898+ }
792899 catch ( Exception ex )
793900 {
794901 lastEx = ex ;
795902
796- if ( attemptNumber < policy . MaxAttempts )
903+ if ( attemptNumber < policy . Retry . MaxAttempts )
797904 continue ;
798905
799906 step . State = WorkflowStepRunStates . Failed ;
@@ -805,6 +912,10 @@ private async Task ExecuteStepAsync(WorkflowRun run, WorkflowStepRun step, JsonO
805912 run . ErrorMessage = step . LastErrorMessage ;
806913 return ;
807914 }
915+ finally
916+ {
917+ timeoutCts ? . Dispose ( ) ;
918+ }
808919 }
809920
810921 step . State = WorkflowStepRunStates . Failed ;
0 commit comments