Skip to content

Commit 584fc4e

Browse files
committed
Optimize deployment operation traversal and progress polling
1 parent 75d26d2 commit 584fc4e

5 files changed

Lines changed: 739 additions & 134 deletions

File tree

cli/azd/pkg/infra/azure_resource_manager.go

Lines changed: 182 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,14 @@ import (
99
"fmt"
1010
"slices"
1111
"strings"
12-
"time"
13-
14-
"maps"
12+
"sync"
1513

1614
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
1715
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources"
1816
"github.com/azure/azure-dev/cli/azd/internal"
1917
"github.com/azure/azure-dev/cli/azd/pkg/azapi"
2018
"github.com/azure/azure-dev/cli/azd/pkg/azure"
2119
"github.com/azure/azure-dev/cli/azd/pkg/azureutil"
22-
"github.com/azure/azure-dev/cli/azd/pkg/compare"
2320
"github.com/azure/azure-dev/cli/azd/pkg/output"
2421
)
2522

@@ -29,8 +26,11 @@ type AzureResourceManager struct {
2926
}
3027

3128
type ResourceManager interface {
32-
GetDeploymentResourceOperations(
33-
ctx context.Context, deployment Deployment, queryStart *time.Time) ([]*armresources.DeploymentOperation, error)
29+
WalkDeploymentOperations(
30+
ctx context.Context,
31+
deployment Deployment,
32+
fn WalkDeploymentOperationFunc,
33+
) error
3434
GetResourceTypeDisplayName(
3535
ctx context.Context,
3636
subscriptionId string,
@@ -49,6 +49,19 @@ type ResourceManager interface {
4949
) (string, error)
5050
}
5151

52+
// WalkDeploymentOperationFunc is invoked for each deployment operation that has a non-nil ID and Properties.
53+
//
54+
// Returning a non-nil error will halt the traversal and return that error.
55+
// Returning SkipExpand will prevent traversal of any nested deployments within the current operation, but will continue
56+
// traversal of sibling operations.
57+
type WalkDeploymentOperationFunc func(ctx context.Context, operation *armresources.DeploymentOperation) error
58+
59+
// SkipExpand can be returned by WalkDeploymentOperationFunc to skip expanding a nested deployment's children.
60+
var SkipExpand = errors.New("skip deployment expansion")
61+
62+
// maxConcurrentDeploymentFetches limits concurrent ARM API calls when fetching nested deployment operations.
63+
const maxConcurrentDeploymentFetches = 10
64+
5265
func NewAzureResourceManager(
5366
resourceService *azapi.ResourceService,
5467
deploymentService *azapi.StandardDeployments,
@@ -59,31 +72,144 @@ func NewAzureResourceManager(
5972
}
6073
}
6174

62-
// GetDeploymentResourceOperations gets the list of all the resources created as part of the provided deployment.
63-
// Each DeploymentOperation on the list holds a resource and the result of its deployment.
64-
// One deployment operation can trigger new deployment operations, GetDeploymentResourceOperations traverses all
65-
// operations recursively to find the leaf operations.
66-
func (rm *AzureResourceManager) GetDeploymentResourceOperations(
75+
// WalkDeploymentOperations traverses deployment operations in pre-order and allows callers to skip nested expansion.
76+
func (rm *AzureResourceManager) WalkDeploymentOperations(
6777
ctx context.Context,
6878
deployment Deployment,
69-
queryStart *time.Time,
70-
) ([]*armresources.DeploymentOperation, error) {
71-
allOperations := []*armresources.DeploymentOperation{}
72-
79+
fn WalkDeploymentOperationFunc,
80+
) error {
7381
rootDeploymentOperations, err := deployment.Operations(ctx)
7482
if err != nil {
75-
return nil, fmt.Errorf("getting root deployment operations: %w", err)
83+
return fmt.Errorf("getting root deployment operations: %w", err)
7684
}
7785

78-
operationMap := map[string]*armresources.DeploymentOperation{}
79-
if err := rm.appendDeploymentOperationsRecursive(ctx, queryStart, rootDeploymentOperations, operationMap); err != nil {
80-
return nil, err
86+
ctx, cancel := context.WithCancel(ctx)
87+
defer cancel()
88+
89+
jobs := make(chan *arm.ResourceID, maxConcurrentDeploymentFetches)
90+
results := make(chan []*armresources.DeploymentOperation, maxConcurrentDeploymentFetches)
91+
errCh := make(chan error, 1)
92+
93+
var workers sync.WaitGroup
94+
worker := func() {
95+
defer workers.Done()
96+
97+
for resourceID := range jobs {
98+
operations, err := rm.fetchNestedOperations(ctx, resourceID)
99+
if err != nil {
100+
select {
101+
case errCh <- fmt.Errorf("getting deployment operations recursively: %w", err):
102+
default:
103+
}
104+
cancel()
105+
return
106+
}
107+
108+
select {
109+
case <-ctx.Done():
110+
return
111+
case results <- operations:
112+
}
113+
}
114+
}
115+
116+
for range maxConcurrentDeploymentFetches {
117+
workers.Add(1)
118+
go worker()
119+
}
120+
121+
queueNestedDeployments := func(
122+
queue []*arm.ResourceID,
123+
operations []*armresources.DeploymentOperation,
124+
) ([]*arm.ResourceID, error) {
125+
for _, operation := range operations {
126+
if operation.ID == nil || operation.Properties == nil {
127+
continue
128+
}
129+
130+
if fn != nil {
131+
// invoke the walk function for every deployment operation
132+
walkErr := fn(ctx, operation)
133+
if errors.Is(walkErr, SkipExpand) {
134+
continue
135+
} else if walkErr != nil {
136+
return nil, walkErr
137+
}
138+
}
139+
140+
// handle nested deployments
141+
if !isNestedDeployment(operation) {
142+
continue
143+
}
144+
145+
resourceID, err := arm.ParseResourceID(*operation.Properties.TargetResource.ID)
146+
if err != nil {
147+
return nil, fmt.Errorf("parsing deployment resource ID: %w", err)
148+
}
149+
150+
queue = append(queue, resourceID)
151+
}
152+
153+
return queue, nil
154+
}
155+
156+
queue, err := queueNestedDeployments(nil, rootDeploymentOperations)
157+
if err != nil {
158+
close(jobs)
159+
workers.Wait()
160+
return err
81161
}
82162

83-
recursiveOperations := slices.Collect(maps.Values(operationMap))
84-
allOperations = append(allOperations, recursiveOperations...)
163+
pending := 0
85164

86-
return allOperations, nil
165+
// we terminate when there are no more jobs and no more pending operations to fetch
166+
for pending > 0 || len(queue) > 0 {
167+
var nextJob *arm.ResourceID
168+
var jobsCh chan *arm.ResourceID
169+
if len(queue) > 0 {
170+
nextJob = queue[0]
171+
jobsCh = jobs
172+
}
173+
174+
select {
175+
case jobsCh <- nextJob:
176+
queue = queue[1:]
177+
pending++
178+
case <-ctx.Done():
179+
close(jobs)
180+
workers.Wait()
181+
select {
182+
case walkErr := <-errCh:
183+
return walkErr
184+
default:
185+
return ctx.Err()
186+
}
187+
case walkErr := <-errCh:
188+
close(jobs)
189+
workers.Wait()
190+
return walkErr
191+
case nestedOperations := <-results:
192+
pending--
193+
194+
queue, err = queueNestedDeployments(queue, nestedOperations)
195+
if err != nil {
196+
close(jobs)
197+
workers.Wait()
198+
return err
199+
}
200+
}
201+
}
202+
203+
close(jobs)
204+
workers.Wait()
205+
206+
select {
207+
case walkErr := <-errCh:
208+
return walkErr
209+
default:
210+
}
211+
212+
return nil
87213
}
88214

89215
// GetResourceGroupsForEnvironment gets all resources groups for a given environment
@@ -317,64 +443,44 @@ func (rm *AzureResourceManager) getRedisEnterpriseResourceTypeDisplayName(
317443
}
318444
}
319445

320-
// appendDeploymentResourcesRecursive gets the leaf deployment operations and adds them to resourceOperations
321-
// if they are not already in the list.
322-
func (rm *AzureResourceManager) appendDeploymentOperationsRecursive(
323-
ctx context.Context,
324-
queryStart *time.Time,
325-
operations []*armresources.DeploymentOperation,
326-
operationMap map[string]*armresources.DeploymentOperation,
327-
) error {
328-
for _, operation := range operations {
329-
// Operations w/o target data can't be resolved. Ignoring them
330-
if operation.Properties.TargetResource == nil ||
331-
// The time stamp is used to filter only records after the queryStart.
332-
// We ignore the resource if we can't know when it was created
333-
operation.Properties.Timestamp == nil ||
334-
// The resource type is required to resolve the name of the resource.
335-
// If the dep-op is missing this, we can't resolve it.
336-
compare.IsStringNilOrEmpty(operation.Properties.TargetResource.ResourceType) {
337-
continue
338-
}
446+
func isNestedDeployment(operation *armresources.DeploymentOperation) bool {
447+
if operation.Properties.TargetResource == nil ||
448+
operation.Properties.ProvisioningOperation == nil ||
449+
operation.Properties.TargetResource.ResourceType == nil {
450+
return false
451+
}
339452

340-
// Process any nested deployments
341-
if *operation.Properties.TargetResource.ResourceType == string(azapi.AzureResourceTypeDeployment) &&
342-
*operation.Properties.ProvisioningOperation == armresources.ProvisioningOperationCreate {
343-
deploymentResourceId, err := arm.ParseResourceID(*operation.Properties.TargetResource.ID)
344-
if err != nil {
345-
return fmt.Errorf("parsing deployment resource ID: %w", err)
346-
}
453+
return *operation.Properties.TargetResource.ResourceType == string(azapi.AzureResourceTypeDeployment) &&
454+
*operation.Properties.ProvisioningOperation == armresources.ProvisioningOperationCreate
455+
}
347456

348-
var nestedOperations []*armresources.DeploymentOperation
349-
var nestedError error
350-
351-
if deploymentResourceId.ResourceGroupName == "" {
352-
nestedOperations, nestedError = rm.deploymentService.ListSubscriptionDeploymentOperations(
353-
ctx,
354-
deploymentResourceId.SubscriptionID,
355-
deploymentResourceId.Name)
356-
} else {
357-
nestedOperations, nestedError = rm.deploymentService.ListResourceGroupDeploymentOperations(
358-
ctx,
359-
deploymentResourceId.SubscriptionID,
360-
deploymentResourceId.ResourceGroupName,
361-
deploymentResourceId.Name,
362-
)
363-
}
457+
func isTerminalProvisioningState(state *string) bool {
458+
if state == nil {
459+
return false
460+
}
364461

365-
if nestedError != nil {
366-
return fmt.Errorf("getting deployment operations recursively: %w", nestedError)
367-
}
462+
switch *state {
463+
case string(armresources.ProvisioningStateSucceeded),
464+
string(armresources.ProvisioningStateFailed),
465+
string(armresources.ProvisioningStateCanceled):
466+
return true
467+
default:
468+
return false
469+
}
470+
}
368471

369-
if err = rm.appendDeploymentOperationsRecursive(ctx, queryStart, nestedOperations, operationMap); err != nil {
370-
return err
371-
}
372-
} else if *operation.Properties.ProvisioningOperation == armresources.ProvisioningOperationCreate &&
373-
// Only append CREATE operations that started after our queryStart time
374-
operation.Properties.Timestamp.After(*queryStart) {
375-
operationMap[*operation.ID] = operation
376-
}
472+
func (rm *AzureResourceManager) fetchNestedOperations(
473+
ctx context.Context,
474+
resourceID *arm.ResourceID,
475+
) ([]*armresources.DeploymentOperation, error) {
476+
if resourceID.ResourceGroupName == "" {
477+
return rm.deploymentService.ListSubscriptionDeploymentOperations(ctx, resourceID.SubscriptionID, resourceID.Name)
377478
}
378479

379-
return nil
480+
return rm.deploymentService.ListResourceGroupDeploymentOperations(
481+
ctx,
482+
resourceID.SubscriptionID,
483+
resourceID.ResourceGroupName,
484+
resourceID.Name,
485+
)
380486
}

0 commit comments

Comments
 (0)