forked from mcuadros/ofelia
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathrunjob.go
More file actions
313 lines (269 loc) · 8.63 KB
/
runjob.go
File metadata and controls
313 lines (269 loc) · 8.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
// Copyright (c) 2025-2026 Netresearch DTT GmbH
// SPDX-License-Identifier: MIT
package core
import (
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"sync"
	"time"

	"github.com/gobs/args"
	"github.com/netresearch/ofelia/core/domain"
)
// RunJob schedules a command in a Docker container. It either builds a fresh
// container from Image or runs against the existing container named by
// Container; Validate requires at least one of the two to be set.
type RunJob struct {
	BareJob `mapstructure:",squash"`
	// Provider is the Docker backend used for all container operations.
	Provider DockerProvider `json:"-"` // SDK-based Docker provider

	// User specifies the user to run the container as.
	// If not set, uses the global default-user setting (default: "nobody").
	// Set to "default" to explicitly use the container's default user, overriding global setting.
	User string `hash:"true"`

	// ContainerName specifies the name of the container to be created. If
	// nil, the job name will be used. If set to an empty string, Docker
	// will assign a random name.
	ContainerName *string `gcfg:"container-name" mapstructure:"container-name" hash:"true"`

	// TTY allocates a pseudo-TTY for the created container when true.
	TTY bool `default:"false" hash:"true"`

	// do not use bool values with "default:true" because if
	// user would set it to "false" explicitly, it still will be
	// changed to "true" https://github.com/netresearch/ofelia/issues/135
	// so lets use strings here as workaround

	// Delete ("true"/"false" string) controls whether a container created by
	// this job is removed after the run.
	Delete string `default:"true" hash:"true"`
	// Pull ("true"/"false" string) controls whether the image is pulled
	// before each run.
	Pull string `default:"true" hash:"true"`
	// Image is the image to create a new container from (job-run mode).
	Image string `hash:"true"`
	// Network, when set, names a Docker network the container is connected to.
	Network string `hash:"true"`
	// Hostname sets the container hostname.
	Hostname string `hash:"true"`
	// Entrypoint, when non-nil, overrides the image entrypoint (parsed as
	// shell-style arguments).
	Entrypoint *string `hash:"true"`
	// Container names an existing container to run instead of creating one.
	Container string `hash:"true"`
	// Volume lists bind specifications passed to the host config.
	Volume []string `hash:"true"`
	// VolumesFrom lists containers to inherit volumes from.
	VolumesFrom []string `gcfg:"volumes-from" mapstructure:"volumes-from," hash:"true"`
	// Environment lists explicit KEY=VALUE environment entries.
	Environment []string `mapstructure:"environment" hash:"true"`
	// EnvFile lists env files whose entries are merged into the environment.
	EnvFile []string `gcfg:"env-file" mapstructure:"env-file," hash:"true"`
	// EnvFrom lists additional environment sources resolved via the provider.
	EnvFrom []string `gcfg:"env-from" mapstructure:"env-from," hash:"true"`
	// WorkingDir sets the container working directory.
	WorkingDir string `gcfg:"working-dir" mapstructure:"working-dir" hash:"true"`
	// Annotations are user labels merged with Ofelia's default annotations.
	Annotations []string `mapstructure:"annotations" hash:"true"`
	// MaxRuntime, when > 0, bounds how long the container may run before the
	// job is cancelled with ErrMaxTimeRunning.
	MaxRuntime time.Duration `gcfg:"max-runtime" mapstructure:"max-runtime"`

	// containerID is the ID of the container currently in use; guarded by mu.
	containerID string
	mu          sync.RWMutex // Protect containerID access
}
// NewRunJob returns a RunJob wired to the given Docker provider.
func NewRunJob(provider DockerProvider) *RunJob {
	job := &RunJob{}
	job.Provider = provider
	return job
}
// InitializeRuntimeFields initializes fields that depend on the Docker provider.
// This should be called after the Provider field is set. It is currently a
// no-op kept for interface/lifecycle compatibility.
func (j *RunJob) InitializeRuntimeFields() {
	// No additional initialization needed with DockerProvider
}
// Validate reports whether the job configuration is usable.
// A run job needs either an Image to build a container from or the name of
// an existing Container; having neither is an error.
func (j *RunJob) Validate() error {
	if j.Image != "" || j.Container != "" {
		return nil
	}
	return ErrImageOrContainer
}
// setContainerID records the active container ID under the job's write lock.
func (j *RunJob) setContainerID(id string) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.containerID = id
}
// getContainerID returns the active container ID under a read lock.
func (j *RunJob) getContainerID() string {
	j.mu.RLock()
	id := j.containerID
	j.mu.RUnlock()
	return id
}
// entrypointSlice converts an optional entrypoint override into an argument
// slice. A nil pointer yields nil so Docker keeps the image's own entrypoint.
func entrypointSlice(ep *string) []string {
	if ep != nil {
		return args.GetArgs(*ep)
	}
	return nil
}
// Run executes the job: it ensures the image is available (when building a
// new container), creates or inspects the target container, starts it, waits
// for completion and streams its logs. Containers created by the job are
// removed afterwards according to the Delete option.
func (j *RunJob) Run(ctx *Context) error {
	shouldPull, _ := strconv.ParseBool(j.Pull)

	// Propagate the middleware chain's context so scheduler shutdown, job
	// removal and max-runtime cancellation reach the Docker API calls.
	runCtx := ctx.Ctx
	if runCtx == nil {
		runCtx = context.Background()
	}

	if j.Container == "" && j.Image != "" {
		if err := j.ensureImageAvailable(runCtx, ctx, shouldPull); err != nil {
			return err
		}
	}

	id, err := j.createOrInspectContainer(runCtx)
	if err != nil {
		return err
	}
	j.setContainerID(id)

	if j.Container == "" {
		// This job created the container, so it is responsible for cleanup.
		defer func() {
			if delErr := j.deleteContainer(runCtx); delErr != nil {
				ctx.Warn("failed to delete container: " + delErr.Error())
			}
		}()
	}

	return j.startAndWait(runCtx, ctx)
}
// ensureImageAvailable pulls the job image or verifies its local presence,
// depending on the pull flag, and logs that it is ready for use.
func (j *RunJob) ensureImageAvailable(ctx context.Context, jobCtx *Context, pull bool) error {
	err := j.Provider.EnsureImage(ctx, j.Image, pull)
	if err != nil {
		return fmt.Errorf("ensuring image: %w", err)
	}
	jobCtx.Log("Image " + j.Image + " is available")
	return nil
}
// createOrInspectContainer returns the ID of the container to run: a freshly
// built one when only Image is configured, otherwise the existing container
// looked up by name.
func (j *RunJob) createOrInspectContainer(ctx context.Context) (string, error) {
	if j.Container == "" && j.Image != "" {
		return j.buildContainer(ctx)
	}
	existing, err := j.Provider.InspectContainer(ctx, j.Container)
	if err != nil {
		return "", fmt.Errorf("inspecting container: %w", err)
	}
	return existing.ID, nil
}
// startAndWait starts the container, waits for it to finish (honouring
// MaxRuntime when set) and copies the logs produced since start into the
// execution's output stream.
//
// It returns ErrMaxTimeRunning when the runtime limit elapses, a wrapped
// error for unexpected wait failures, or a NonZeroExitError for a non-zero
// exit code. Problems fetching or streaming logs are only warned about so
// they never mask the container's real exit status.
func (j *RunJob) startAndWait(ctx context.Context, jobCtx *Context) error {
	startTime := time.Now()
	if err := j.startContainer(ctx); err != nil {
		return err
	}

	// Bound the wait with MaxRuntime when configured.
	watchCtx := ctx
	var cancel context.CancelFunc
	if j.MaxRuntime > 0 {
		watchCtx, cancel = context.WithTimeout(ctx, j.MaxRuntime)
		defer cancel()
	}

	err := j.watchContainer(watchCtx)
	if errors.Is(err, ErrUnexpected) {
		return err
	}

	// Fetch only the logs produced since this run started.
	logsOpts := ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Since:      startTime,
		Follow:     false,
	}
	reader, logsErr := j.Provider.GetContainerLogs(ctx, j.getContainerID(), logsOpts)
	if logsErr != nil {
		jobCtx.Warn("failed to fetch container logs: " + logsErr.Error())
	} else if reader != nil {
		defer reader.Close()
		// io.Copy replaces the previous hand-rolled read loop, which
		// silently dropped read errors; a failed copy is now surfaced
		// as a warning instead of being swallowed.
		if _, copyErr := io.Copy(jobCtx.Execution.OutputStream, reader); copyErr != nil {
			jobCtx.Warn("failed to stream container logs: " + copyErr.Error())
		}
	}

	return err
}
// buildContainer creates a new container from the job configuration and
// returns its ID. The container name defaults to the job name unless
// ContainerName overrides it; when Network is set, the container is connected
// to every network matching that name after creation.
//
// On network errors the already-created container ID is returned alongside
// the error so the caller's cleanup path can still remove it.
func (j *RunJob) buildContainer(ctx context.Context) (string, error) {
	name := j.Name
	if j.ContainerName != nil {
		name = *j.ContainerName
	}

	// Merge user annotations with the default Ofelia annotations.
	defaults := getDefaultAnnotations(j.Name, "run")
	annotations := mergeAnnotations(j.Annotations, defaults)

	// Resolve environment from env-file, env-from, and explicit environment.
	mergedEnv, err := ResolveJobEnvironment(ctx, j.EnvFile, j.EnvFrom, j.Environment, j.Provider, nil)
	if err != nil {
		return "", err
	}

	// Build container configuration using domain types.
	config := &domain.ContainerConfig{
		Image:        j.Image,
		Cmd:          args.GetArgs(j.Command),
		Entrypoint:   entrypointSlice(j.Entrypoint),
		Env:          mergedEnv,
		WorkingDir:   j.WorkingDir,
		User:         j.User,
		Hostname:     j.Hostname,
		AttachStdin:  false,
		AttachStdout: true,
		AttachStderr: true,
		Tty:          j.TTY,
		Name:         name,
		Labels:       annotations,
		HostConfig: &domain.HostConfig{
			Binds:       j.Volume,
			VolumesFrom: j.VolumesFrom,
		},
	}

	containerID, err := j.Provider.CreateContainer(ctx, config, name)
	if err != nil {
		return "", fmt.Errorf("creating container: %w", err)
	}

	// Connect to the configured network, if any.
	if j.Network != "" {
		networks, findErr := j.Provider.FindNetworkByName(ctx, j.Network)
		if findErr != nil {
			// Previously a lookup failure was silently ignored, leaving the
			// container detached from its configured network with no signal.
			return containerID, fmt.Errorf("finding network %q: %w", j.Network, findErr)
		}
		for _, network := range networks {
			if connErr := j.Provider.ConnectNetwork(ctx, network.ID, containerID); connErr != nil {
				return containerID, fmt.Errorf("connecting network: %w", connErr)
			}
		}
	}
	return containerID, nil
}
// startContainer starts the job's current container via the provider.
func (j *RunJob) startContainer(ctx context.Context) error {
	err := j.Provider.StartContainer(ctx, j.getContainerID())
	if err != nil {
		return fmt.Errorf("starting container: %w", err)
	}
	return nil
}
// stopContainer stops the job's current container, giving it the supplied
// grace period before it is killed.
func (j *RunJob) stopContainer(ctx context.Context, timeout time.Duration) error {
	err := j.Provider.StopContainer(ctx, j.getContainerID(), &timeout)
	if err != nil {
		return fmt.Errorf("stopping container: %w", err)
	}
	return nil
}
// getContainer inspects and returns the job's current container.
func (j *RunJob) getContainer(ctx context.Context) (*domain.Container, error) {
	c, err := j.Provider.InspectContainer(ctx, j.getContainerID())
	if err != nil {
		return nil, fmt.Errorf("getting container: %w", err)
	}
	return c, nil
}
// watchContainer blocks until the container exits and translates the exit
// status into a job result: nil for exit code 0, ErrUnexpected for -1,
// ErrMaxTimeRunning when the context was cancelled or timed out, and a
// NonZeroExitError for any other exit code.
func (j *RunJob) watchContainer(ctx context.Context) error {
	// Provider.WaitContainer blocks efficiently until the container stops.
	code, waitErr := j.Provider.WaitContainer(ctx, j.getContainerID())
	if waitErr != nil {
		// A done context means MaxRuntime (or shutdown) interrupted the wait.
		if ctx.Err() != nil {
			return ErrMaxTimeRunning
		}
		return fmt.Errorf("waiting for container: %w", waitErr)
	}
	if code == 0 {
		return nil
	}
	if code == -1 {
		return ErrUnexpected
	}
	return NonZeroExitError{ExitCode: int(code)}
}
// deleteContainer removes the job's container unless the Delete option is
// set to false (unparseable values also disable deletion).
func (j *RunJob) deleteContainer(ctx context.Context) error {
	doDelete, _ := strconv.ParseBool(j.Delete)
	if !doDelete {
		return nil
	}
	if err := j.Provider.RemoveContainer(ctx, j.getContainerID(), false); err != nil {
		return fmt.Errorf("removing container: %w", err)
	}
	return nil
}