Skip to content

Commit 876ee5b

Browse files
author
Samu
authored
Chunk instance volumes to avoid API limits (#635)
1 parent d7fd67a commit 876ee5b

5 files changed

Lines changed: 83 additions & 39 deletions

File tree

pkg/cloudprovider/aws/storage_state.go

Lines changed: 26 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,8 @@ package aws
33
import (
44
"context"
55
"fmt"
6+
"maps"
7+
"slices"
68

79
"github.com/aws/aws-sdk-go-v2/aws"
810
"github.com/aws/aws-sdk-go-v2/service/ec2"
@@ -11,6 +13,10 @@ import (
1113
"github.com/castai/kvisor/pkg/cloudprovider/types"
1214
)
1315

16+
const (
17+
instanceChunkSize = 20
18+
)
19+
1420
func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (*types.StorageState, error) {
1521
p.log.Debug("refreshing storage state")
1622

@@ -19,7 +25,7 @@ func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (
1925
Provider: types.TypeAWS,
2026
}
2127

22-
instanceVolumes, err := p.fetchInstanceVolumes(ctx, instanceIds...)
28+
instanceVolumes, err := p.chunkAndFetchInstanceVolumes(ctx, instanceIds...)
2329
if err != nil {
2430
return nil, fmt.Errorf("fetching volumes: %w", err)
2531
}
@@ -28,14 +34,31 @@ func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (
2834
return state, nil
2935
}
3036

31-
// fetchInstanceVolumes retrieves instance volumes from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Volume.html
32-
func (p *Provider) fetchInstanceVolumes(ctx context.Context, instanceIds ...string) (map[string][]types.Volume, error) {
37+
// chunkAndFetchInstanceVolumes chunks instance IDs into smaller batches and fetches volumes for each batch
38+
// to avoid API limits
39+
func (p *Provider) chunkAndFetchInstanceVolumes(ctx context.Context, instanceIds ...string) (map[string][]types.Volume, error) {
3340
instanceVolumes := make(map[string][]types.Volume)
3441

3542
if len(instanceIds) == 0 {
3643
return instanceVolumes, nil
3744
}
3845

46+
for chunk := range slices.Chunk(instanceIds, instanceChunkSize) {
47+
chunkVolumes, err := p.fetchInstanceVolumes(ctx, chunk)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
maps.Copy(instanceVolumes, chunkVolumes)
53+
}
54+
55+
return instanceVolumes, nil
56+
}
57+
58+
// fetchInstanceVolumes retrieves instance volumes from https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_Volume.html
59+
func (p *Provider) fetchInstanceVolumes(ctx context.Context, instanceIds []string) (map[string][]types.Volume, error) {
60+
instanceVolumes := make(map[string][]types.Volume)
61+
3962
input := &ec2.DescribeVolumesInput{
4063
Filters: []ec2types.Filter{
4164
{

pkg/cloudprovider/aws/test/.env.example

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,8 @@
66
AWS_PROFILE=default
77

88
# Required: EC2 Instance ID to test volume listing
9-
# Example: i-1234567890abcdef0
10-
AWS_INSTANCE_ID=
9+
# Example: i-1234567890abcdef0,i-0987654321fedcba0
10+
AWS_INSTANCE_IDS=
1111

1212
# Optional: AWS Account ID
1313
AWS_ACCOUNT_ID=

pkg/cloudprovider/aws/test/integration_test.go

Lines changed: 24 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ package integration_test
55
import (
66
"context"
77
"os"
8+
"strings"
89
"testing"
910

1011
"github.com/joho/godotenv"
@@ -14,7 +15,7 @@ import (
1415
)
1516

1617
// Use .env file or run tests with environment variables:
17-
// AWS_PROFILE=default AWS_INSTANCE_ID=i-1234567890abcdef0 go test -v ./pkg/cloudprovider/aws/test/...
18+
// AWS_PROFILE=default AWS_INSTANCE_IDS=i-1234567890abcdef0,i-0987654321fedcba0 go test -v ./pkg/cloudprovider/aws/test/...
1819
func init() {
1920
_ = godotenv.Load(".env")
2021
}
@@ -60,34 +61,34 @@ func TestGetStorageState(t *testing.T) {
6061

6162
p := provider.(*aws.Provider)
6263

63-
// Test storage state refresh with instance ID
64-
instanceID := os.Getenv("AWS_INSTANCE_ID")
65-
if instanceID == "" {
66-
t.Fatal("AWS_INSTANCE_ID not set")
64+
instanceIDsStr := os.Getenv("AWS_INSTANCE_IDS")
65+
if instanceIDsStr == "" {
66+
t.Fatal("AWS_INSTANCE_IDS not set")
6767
}
6868

69-
// Get the cached storage state
70-
state, err := p.GetStorageState(ctx, instanceID)
71-
if err != nil {
72-
t.Fatalf("GetStorageState failed: %v", err)
69+
instanceIDs := strings.Split(instanceIDsStr, ",")
70+
for i := range instanceIDs {
71+
instanceIDs[i] = strings.TrimSpace(instanceIDs[i])
7372
}
7473

75-
volumes, ok := state.InstanceVolumes[instanceID]
76-
if !ok {
77-
t.Fatalf("No volumes found for instance %s", instanceID)
74+
state, err := p.GetStorageState(ctx, instanceIDs...)
75+
if err != nil {
76+
t.Fatalf("GetStorageState failed: %v", err)
7877
}
7978

80-
t.Logf("Found %d volumes attached to instance %s:", len(volumes), instanceID)
81-
for _, v := range volumes {
82-
t.Logf(" Volume:")
83-
t.Logf(" VolumeID: %s", v.VolumeID)
84-
t.Logf(" VolumeType: %s", v.VolumeType)
85-
t.Logf(" VolumeState: %s", v.VolumeState)
86-
t.Logf(" SizeBytes: %d", v.SizeBytes)
87-
t.Logf(" AvailabilityZone: %s", v.Zone)
88-
t.Logf(" Encrypted: %v", v.Encrypted)
89-
t.Logf(" IOPS: %d", v.IOPS)
90-
t.Logf(" ThroughputBytes: %d B/s", v.ThroughputBytes)
79+
for instanceID, volumes := range state.InstanceVolumes {
80+
t.Logf("Found %d volumes attached to instance %s:", len(volumes), instanceID)
81+
for _, v := range volumes {
82+
t.Logf(" Volume:")
83+
t.Logf(" VolumeID: %s", v.VolumeID)
84+
t.Logf(" VolumeType: %s", v.VolumeType)
85+
t.Logf(" VolumeState: %s", v.VolumeState)
86+
t.Logf(" SizeBytes: %d", v.SizeBytes)
87+
t.Logf(" Zone: %s", v.Zone)
88+
t.Logf(" Encrypted: %v", v.Encrypted)
89+
t.Logf(" IOPS: %d", v.IOPS)
90+
t.Logf(" ThroughputBytes: %d B/s", v.ThroughputBytes)
91+
}
9192
}
9293
}
9394

pkg/cloudprovider/gcp/storage_state.go

Lines changed: 30 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"maps"
78
"math"
89
"path"
10+
"slices"
911
"strings"
1012

1113
"cloud.google.com/go/compute/apiv1/computepb"
@@ -15,6 +17,10 @@ import (
1517
"github.com/castai/kvisor/pkg/cloudprovider/types"
1618
)
1719

20+
const (
21+
instanceChunkSize = 20
22+
)
23+
1824
func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (*types.StorageState, error) {
1925
p.log.Debug("refreshing storage state")
2026

@@ -23,7 +29,7 @@ func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (
2329
Provider: types.TypeGCP,
2430
}
2531

26-
instanceVolumes, err := p.fetchInstanceVolumes(ctx, instanceIds...)
32+
instanceVolumes, err := p.chunkAndFetchInstanceVolumes(ctx, instanceIds...)
2733
if err != nil {
2834
return nil, fmt.Errorf("fetching volumes: %w", err)
2935
}
@@ -32,14 +38,31 @@ func (p *Provider) GetStorageState(ctx context.Context, instanceIds ...string) (
3238
return state, nil
3339
}
3440

35-
// fetchInstanceVolumes retrieves instance volumes from https://docs.cloud.google.com/compute/docs/reference/rest/v1/disks/aggregatedList
36-
func (p *Provider) fetchInstanceVolumes(ctx context.Context, instanceIds ...string) (map[string][]types.Volume, error) {
41+
// chunkAndFetchInstanceVolumes chunks instance IDs into smaller batches and fetches volumes for each batch
42+
// to avoid API limits
43+
func (p *Provider) chunkAndFetchInstanceVolumes(ctx context.Context, instanceIds ...string) (map[string][]types.Volume, error) {
3744
instanceVolumes := make(map[string][]types.Volume, len(instanceIds))
3845

3946
if len(instanceIds) == 0 {
4047
return instanceVolumes, nil
4148
}
4249

50+
for chunk := range slices.Chunk(instanceIds, instanceChunkSize) {
51+
chunkVolumes, err := p.fetchInstanceVolumes(ctx, chunk)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
maps.Copy(instanceVolumes, chunkVolumes)
57+
}
58+
59+
return instanceVolumes, nil
60+
}
61+
62+
// fetchInstanceVolumes retrieves instance volumes from https://docs.cloud.google.com/compute/docs/reference/rest/v1/disks/aggregatedList
63+
func (p *Provider) fetchInstanceVolumes(ctx context.Context, instanceIds []string) (map[string][]types.Volume, error) {
64+
instanceVolumes := make(map[string][]types.Volume, len(instanceIds))
65+
4366
instanceUrlsMap := make(map[string]string, len(instanceIds))
4467
for _, instanceId := range instanceIds {
4568
url := buildInstanceUrlFromId(instanceId)
@@ -50,6 +73,10 @@ func (p *Provider) fetchInstanceVolumes(ctx context.Context, instanceIds ...stri
5073
instanceUrlsMap[url] = instanceId
5174
}
5275

76+
if len(instanceUrlsMap) == 0 {
77+
return instanceVolumes, nil
78+
}
79+
5380
filter := buildDisksUsedByInstanceFilter(lo.Keys(instanceUrlsMap))
5481

5582
req := &computepb.AggregatedListDisksRequest{

pkg/cloudprovider/gcp/test/integration_test.go

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -113,14 +113,7 @@ func TestGetStorageState(t *testing.T) {
113113
t.Fatalf("GetStorageState failed: %v", err)
114114
}
115115

116-
for _, instanceID := range instanceIDs {
117-
t.Logf("Testing instance: %s", instanceID)
118-
119-
volumes, ok := state.InstanceVolumes[instanceID]
120-
if !ok {
121-
t.Fatalf("No volumes found for instance %s", instanceID)
122-
}
123-
116+
for instanceID, volumes := range state.InstanceVolumes {
124117
t.Logf("Found %d volumes attached to instance %s:", len(volumes), instanceID)
125118
for _, v := range volumes {
126119
t.Logf(" Volume:")

0 commit comments

Comments
 (0)