Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"syscall"
"time"

"github.com/kubev2v/assisted-migration-agent/pkg/vmware"

"github.com/ecordell/optgen/helpers"
"github.com/fatih/color"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -109,7 +111,7 @@ func NewRunCommand(cfg *config.Configuration) *cobra.Command {
collectorSrv := services.NewCollectorService(sched, s, workBuilder)

// create inspector service
inspectorSrv := services.NewInspectorService(sched, s).WithBuilder(models.UnimplementedInspectorWorkBuilder{})
inspectorSrv := services.NewInspectorService(sched, s).WithBuilder(vmware.UnimplementedInspectionWorkBuilder{})

consoleSrv, err := services.NewConsoleService(cfg.Agent, sched, consoleClient, collectorSrv, s)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions internal/models/inspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,16 @@ type InspectionStatus struct {
State InspectionState
Error error
}

// VMWorkflow represents a collection of steps needed by one vm during the workflow.
type VMWorkflow struct {
Validate InspectorWorkUnit
CreateSnapshot InspectorWorkUnit
Inspect InspectorWorkUnit
Save InspectorWorkUnit
RemoveSnapshot InspectorWorkUnit
}

type InspectionWorkBuilder interface {
Build(string) VMWorkflow
}
23 changes: 0 additions & 23 deletions internal/models/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package models

import (
"context"
"time"

"go.uber.org/zap"
)

// InspectorState represents the current state of the Inspector.
Expand Down Expand Up @@ -33,27 +30,7 @@ type InspectorStatus struct {
Error error
}

type InspectorWorkBuilder interface {
Build(string) []InspectorWorkUnit
}

// InspectorWorkUnit represents a unit of work in the collector workflow.
type InspectorWorkUnit struct {
Work func() func(ctx context.Context) (any, error)
}

type UnimplementedInspectorWorkBuilder struct{}

func (u UnimplementedInspectorWorkBuilder) Build(id string) []InspectorWorkUnit {
return []InspectorWorkUnit{
{
Work: func() func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {
time.Sleep(10 * time.Second)
zap.S().Named("inspector_service").Infof("unimplemented work finsished for: %s", id)
return nil, nil
}
},
},
}
}
25 changes: 20 additions & 5 deletions internal/services/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type InspectorService struct {
scheduler *scheduler.Scheduler
store *store.Store
builder models.InspectorWorkBuilder
builder models.InspectionWorkBuilder

status models.InspectorStatus

Expand Down Expand Up @@ -188,7 +188,7 @@ func (c *InspectorService) IsBusy() bool {
}
}

func (c *InspectorService) WithBuilder(builder models.InspectorWorkBuilder) *InspectorService {
func (c *InspectorService) WithBuilder(builder models.InspectionWorkBuilder) *InspectorService {
c.builder = builder
return c
}
Expand Down Expand Up @@ -234,12 +234,15 @@ func (c *InspectorService) run(ctx context.Context, done chan any) {
switch {
case errors.As(err, &e):
if setError := c.setVmErrorStatus(ctx, id, err); setError != nil {
c.setErrorStatus(err)
c.setErrorStatus(setError)
return
}
continue // VM failed, move to next VM
case errors.Is(err, context.Canceled):
c.setState(models.InspectorStateCanceled)
if err := c.setVmState(ctx, id, models.InspectionStateCanceled); err != nil {
c.setErrorStatus(err)
}
return
default:
c.setErrorStatus(err)
Expand All @@ -260,15 +263,27 @@ func (c *InspectorService) run(ctx context.Context, done chan any) {
zap.S().Info("inspector finished work")
}

func (c *InspectorService) runVMWork(ctx context.Context, id string, units []models.InspectorWorkUnit) error {
func (c *InspectorService) runVMWork(ctx context.Context, id string, workflow models.VMWorkflow) error {
units := []models.InspectorWorkUnit{
workflow.Validate,
workflow.CreateSnapshot,
workflow.Inspect,
workflow.Save,
}

// In any case we would to try removing the snapshot
defer c.scheduler.AddWork(func(ctx context.Context) (any, error) {
return workflow.RemoveSnapshot.Work()(context.Background())
})

// Todo: add a channel that can stop the running vm
for _, unit := range units {

future := c.scheduler.AddWork(func(ctx context.Context) (any, error) {
return unit.Work()(ctx)
})

select {
// Todo: handle the context done case. we may want to run some cleanup tasks
case <-ctx.Done():
future.Stop()
return context.Canceled
Expand Down
30 changes: 26 additions & 4 deletions internal/services/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/kubev2v/assisted-migration-agent/pkg/vmware"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

Expand All @@ -29,7 +31,7 @@ func getVCenterCredentials() *models.Credentials {
}
}

// testsMockInspectorWorkBuilder implements models.InspectorWorkBuilder for testing
// testsMockInspectorWorkBuilder implements models.InspectionWorkBuilder for testing
type testsMockInspectorWorkBuilder struct {
vmWorkErr map[string]error // per-VM errors
workDelay time.Duration
Expand Down Expand Up @@ -62,9 +64,9 @@ func (m *testsMockInspectorWorkBuilder) getInspectedVMs() []string {
return result
}

func (m *testsMockInspectorWorkBuilder) Build(vmID string) []models.InspectorWorkUnit {
return []models.InspectorWorkUnit{
{
func (m *testsMockInspectorWorkBuilder) Build(vmID string) models.VMWorkflow {
return models.VMWorkflow{
Validate: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {
if m.workDelay > 0 {
Expand All @@ -87,6 +89,26 @@ func (m *testsMockInspectorWorkBuilder) Build(vmID string) []models.InspectorWor
}
},
},
CreateSnapshot: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return vmware.UnimplementedVMWorkUnit(0, "")
},
},
Inspect: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return vmware.UnimplementedVMWorkUnit(0, "")
},
},
Save: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return vmware.UnimplementedVMWorkUnit(0, "")
},
},
RemoveSnapshot: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return vmware.UnimplementedVMWorkUnit(0, "")
},
},
}
}

Expand Down
103 changes: 96 additions & 7 deletions pkg/vmware/work_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vmware

import (
"context"
"time"

"go.uber.org/zap"

Expand All @@ -21,14 +22,22 @@ func NewInspectorWorkBuilder(operator VMOperator) *InsWorkBuilder {
}

// Build creates the sequence of WorkUnits for the Inspector workflow.
func (b *InsWorkBuilder) Build(id string) []models.InspectorWorkUnit {
func (b *InsWorkBuilder) Build(id string) models.VMWorkflow {
return b.vmWork(id)
}

func (b *InsWorkBuilder) vmWork(id string) []models.InspectorWorkUnit {
var units []models.InspectorWorkUnit
func (b *InsWorkBuilder) vmWork(id string) models.VMWorkflow {
return models.VMWorkflow{
Validate: b.validate(id),
CreateSnapshot: b.createSnapshot(id),
Inspect: b.inspect(id),
Save: b.save(id),
RemoveSnapshot: b.removeSnapshot(id),
}
}

inspect := models.InspectorWorkUnit{
func (b *InsWorkBuilder) validate(id string) models.InspectorWorkUnit {
return models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {
zap.S().Named("inspector_service").Info("validate privileges on VM")
Expand All @@ -38,6 +47,16 @@ func (b *InsWorkBuilder) vmWork(id string) []models.InspectorWorkUnit {
return nil, err
}

return nil, nil
}
},
}
}

func (b *InsWorkBuilder) createSnapshot(id string) models.InspectorWorkUnit {
return models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {
zap.S().Named("inspector_service").Infow("creating VM snapshot", "vmId", id)
req := CreateSnapshotRequest{
VmId: id,
Expand All @@ -54,7 +73,39 @@ func (b *InsWorkBuilder) vmWork(id string) []models.InspectorWorkUnit {

zap.S().Named("inspector_service").Infow("VM snapshot created", "vmId", id)

// Todo: add the inspection logic here
return nil, nil
}
},
}
}

func (b *InsWorkBuilder) inspect(id string) models.InspectorWorkUnit {
return models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {

return nil, nil
}
},
}
}

func (b *InsWorkBuilder) save(id string) models.InspectorWorkUnit {
return models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {

return nil, nil
}
},
}
}

func (b *InsWorkBuilder) removeSnapshot(id string) models.InspectorWorkUnit {
return models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {
zap.S().Named("inspector_service").Infow("removing VM snapshot", "vmId", id)

removeSnapReq := RemoveSnapshotRequest{
VmId: id,
Expand All @@ -73,8 +124,46 @@ func (b *InsWorkBuilder) vmWork(id string) []models.InspectorWorkUnit {
}
},
}
}

units = append(units, inspect)
type UnimplementedInspectionWorkBuilder struct{}

return units
func (u UnimplementedInspectionWorkBuilder) Build(id string) models.VMWorkflow {
return models.VMWorkflow{
Validate: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return UnimplementedVMWorkUnit(time.Second, "unimplemented Validate step finished for: %s", id)
},
},
CreateSnapshot: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return UnimplementedVMWorkUnit(time.Second, "unimplemented CreateSnapshot step finished for: %s", id)
},
},
Inspect: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return UnimplementedVMWorkUnit(time.Second, "unimplemented Inspect step finished for: %s", id)
},
},
Save: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return UnimplementedVMWorkUnit(time.Second, "unimplemented Save step finished for: %s", id)
},
},
RemoveSnapshot: models.InspectorWorkUnit{
Work: func() func(ctx context.Context) (any, error) {
return UnimplementedVMWorkUnit(time.Second, "unimplemented RemoveSnapshot step finished for: %s", id)
},
},
}
}

func UnimplementedVMWorkUnit(delay time.Duration, msg string, args ...string) func(ctx context.Context) (any, error) {
return func(ctx context.Context) (any, error) {
time.Sleep(delay)
if msg != "" {
zap.S().Named("inspector_service").Infof(msg, args)
}
return nil, nil
}
}
Loading